Package iceprod :: Package core :: Module runconfig
[hide private]
[frames] | no frames]

Source Code for Module iceprod.core.runconfig

  1  #! /usr/bin/env python 
  2  # 
  3   
  4  """ 
  5     Module for submitting IceTray jobs in the form of Python IceTrayConfig 
  6     objects. 
  7   
  8     copyright  (c) 2005 the icecube collaboration 
  9   
 10     Usage::  
 11   
 12         runconfig.py [options] <file> 
 13       
 14      where options is one or more of 
 15       
 16         --nproc=<number of processes> : Specify a value to substitute $arg(nproc) 
 17       
 18         --procnum=<process number> : Specify a process number to substitute $arg(procnum) 
 19         --dataset=<dataset id> : Specify a unique dataset id when in production 
 20   
 21   
 22     Runconfig allows the user (or production system) to substitute values in a 
 23     generic configuration file with those passed as options in the commandline. 
 24     This is useful for submitting a number of jobs in which only a few parameters  
 25     change for each job. 
 26   
 27   
 28     For example one might set a parameter in the configfile like:: 
 29   
 30         <service class='I3SPRNGRandomServiceFactory'> 
 31        <name>sprngrandom</name> 
 32        <parameters> 
 33          <int> 
 34            <name>NStreams</name> 
 35            <value>$args(nproc)</value> 
 36          </int> 
 37          <int> 
 38            <name>StreamNum</name> 
 39            <value>$args(procnum)</value> 
 40          </int> 
 41          <int> 
 42            <name>Seed</name> 
 43            <value>11</value> 
 44          </int> 
 45        </parameters> 
 46      </service> 
 47   
 48     then run 'runconfig.py --nproc=2 --procnum=0 config.xml' 
 49       
 50   
 51   
 52     @version: $Revision: 1.7 $ 
 53     @date: $Date: 2005/04/06 17:32:56 $ 
 54     @author: T. McCauley <tpmccauley@lbl.gov> 
 55     @author: Paolo Desiati <paolo.desiati@icecube.wisc.edu> 
 56     @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
 57   
 58     @todo: Add support for more project content (the "optional" option) 
 59     @todo: Be a bit more clever about handling of type tags 
 60     @todo: Units 
 61     @todo: Commas and whitespace handling in projects attribute 
 62     @todo: What to do if libraries are included more than once? 
 63     @todo: Add some overall elegance 
 64     @todo: Fix the "inbox" problem 
 65  """ 
 66   
 67  import sys,os,time 
 68  import string 
 69  import re,getopt 
 70  import glob 
 71  from iceprod.core.dataclasses import * 
 72  from iceprod.core.dataclasses import VectorTypes 
 73  from iceprod.core.xmlparser import IceTrayXMLParser 
 74  from os.path import expandvars 
 75  from iceprod.core.lex import ExpParser 
 76  import logging 
 77  from iceprod.core import resample,functions 
 78   
# Install the default root handler so log records from this module are
# emitted even if the importing application never configured logging.
logging.basicConfig()
# Module-wide logger.  At INFO level the logger.debug() calls in this file
# are dropped unless the level is lowered elsewhere.
logger = logging.getLogger('RunConfig')
logger.setLevel(logging.INFO)

# Second field of os.uname(), i.e. the node name of the machine running the job.
host = os.uname()[1]
 84   
class RunConfig:
    """
    Drive an IceTray job from a steering description.

    Holds the steering object, the option dictionary and an ExpParser built
    from both, and uses them to run external programs, load libraries,
    configure services/modules on a tray and execute it.
    """

    # Libraries already loaded, keyed by library name.  Defined on the class,
    # so the record is shared by every RunConfig instance in the process.
    loaded_libs = {}

    # printf-style conversion specification understood by format_string():
    # optional flags, width, precision and length modifier, then the type.
    _FORMAT_SPEC = re.compile(
        r'%[-+ #0]*[0-9]*(?:\.[0-9]*)?[hlL]?[csridufegExXo]')

    def __init__(self, steering, opts):
        """
        @param steering: steering object describing the job
        @param opts: option dictionary used for expression substitution
        """
        self.steering = steering
        self.opts = opts
        self.expparser = ExpParser(self.opts, self.steering)

    def ProcExtern(self):
        """
        Run every external program listed by the steering object.

        Each executable is made executable with chmod, its arguments and
        redirection targets are expanded with parseval(), its steering files
        are written, and the command is run through the shell.  The process
        exits with status 5 as soon as the accumulated status is non-zero.

        @return: accumulated exit status (0 when every command succeeded)
        """
        retval = 0
        for extern in self.steering.GetExterns():
            print("running external ")
            print("name: %s" % extern.GetName())
            print("version: %s" % extern.GetVersion())
            os.system("chmod +x " + extern.GetExec())
            cmd = "%s %s" % (extern.GetExec(), self.parseval(extern.GetArgs()))
            print("cmd: %s" % cmd)

            # Optional stdin/stdout/stderr redirections
            if extern.GetInFile():
                cmd += " <" + self.parseval(extern.GetInFile())
                extern.WriteInputFile(extern.GetInFile())
            if extern.GetOutFile():
                cmd += " 1>" + self.parseval(extern.GetOutFile())
            if extern.GetErrFile():
                cmd += " 2>" + self.parseval(extern.GetErrFile())

            for steering_file in extern.GetSteering():
                steering_file.Write()

            print(cmd)
            retval += os.system(cmd)
            if retval != 0:
                logger.fatal("Error while processing %s" % extern.GetName())
                os._exit(5)
        return retval

    def load_library(self, library, i3config):
        """
        Load library. If library depends on other libraries,
        recursively load them first (can't have closed loops)

        @param library: library to be loaded
        @param i3config: configuration used to look up dependencies by name
        """
        from iceprod.core import py3

        for dep_name in library.GetDependencies():
            dependency = i3config.GetProject(dep_name)
            if dependency:
                # A library naming itself as a dependency is skipped so the
                # recursion cannot loop on it.
                if dependency.GetName() != library.GetName():
                    self.load_library(dependency, i3config)
            else:
                # NOTE(review): the extracted source lost its indentation;
                # this warning is taken to be the "dependency not found in
                # the configuration" branch - confirm against the original.
                logger.warning("'%s' depends on '%s'." % (library.GetName(), dep_name))

        if library.GetType() == 'python':
            libname = library.GetName()
            if libname not in self.loaded_libs:
                self.loaded_libs[libname] = __import__(libname, globals(), locals())
        else:  # 'c++'
            libname = 'lib' + library.GetName()
            if libname not in self.loaded_libs:
                py3.load(str(libname))
                self.loaded_libs[libname] = True

    def LoadLibraries(self, i3config):
        """
        Load libraries into memory

        Projects of every metaproject are loaded first, then the projects
        listed directly on the configuration.  load_library() skips names
        that are already recorded in loaded_libs.

        @param i3config: configuration listing metaprojects and projects
        """
        for metaproject in i3config.GetMetaProjectList():
            for lib in metaproject.GetProjectList():
                self.load_library(lib, i3config)

        for lib in i3config.GetProjectList():
            self.load_library(lib, i3config)

    def format_string(self, pstring, pformat):
        """
        Format string using the arguments in pformat

        Each argument is first cast according to the conversion
        specification it is paired with (see cast_string), then applied with
        the % operator.  A TypeError raised while formatting is logged and
        reported through sys.excepthook, and pstring is returned unformatted.

        @param pstring: printf-style format string
        @param pformat: sequence of argument values, or a false value for none
        @return: the formatted string, or pstring itself on failure
        """
        return_string = pstring
        if not pformat:
            return return_string

        try:
            specs = self._FORMAT_SPEC.findall(pstring)
            args = list(pformat)
            # Pad the shorter list with None (what Python 2's multi-sequence
            # map() did) so surplus arguments still reach the % operator and
            # trigger its "not all arguments converted" error.
            width = max(len(specs), len(args))
            specs += [None] * (width - len(specs))
            args += [None] * (width - len(args))
            pformat = [self.cast_string(spec, arg) for spec, arg in zip(specs, args)]
            return_string = pstring % tuple(pformat)
        except TypeError:
            excinfo = sys.exc_info()
            logger.error(str(excinfo[1]))
            logger.error('Unable to format string parameter ' + str(pstring)
                         + ' with args ' + str(pformat))
            sys.excepthook(*excinfo)

        return return_string

    def cast_string(self, fstring, arg):
        """
        Cast arg to the Python type required by a conversion specification.

        @param fstring: conversion specification such as '%5.2f'; a false
                        value means "no specification" and arg is returned
        @param arg: value to cast (normally a string)
        @return: arg converted for the specification's final type character;
                 arg unchanged (with a warning) if the character is unknown
        """
        if not fstring:
            return arg

        conversion = fstring[-1]
        if conversion in ('c', 's'):
            return str(arg)
        if conversion == 'r':
            return repr(arg)
        if conversion in ('i', 'd', 'u'):
            return int(arg)
        if conversion in ('f', 'e', 'g', 'E'):
            return float(arg)
        if conversion in ('x', 'X'):
            # The argument holds hexadecimal digits
            return int(str(arg), 16)
        if conversion == 'o':
            # The argument holds octal digits
            return int(str(arg), 8)

        logger.warning('Unable to cast %s using format %s' % (arg, fstring))
        return arg

    def parseval(self, pvalue):
        """
        Parse parameter value string to see if it is an external
        variable and replace it with its value if found.

        @param pvalue: raw parameter value
        @return: whatever the expression parser produces for pvalue
        """
        return self.expparser.parse(pvalue)

    def EvaluateParameter(self, val, stype):
        """
        Turn a configured parameter value into the typed value for the tray.

        @param val: value object; a sequence of value objects for vector
                    types, an object with stringid/omid for 'OMKey',
                    otherwise an object with value/unit/format attributes
        @param stype: parameter type name
        @return: typed value, a list of them for vector types, or an OMKey
        """
        from iceprod.core.py3 import OMKey

        if stype == 'stringv':
            # Evaluate every element as a 'string', then expand $glob(...)
            # elements into the matching file names.
            expanded = []
            for item in [self.EvaluateParameter(x, stype[:-1]) for x in val]:
                if item.startswith('$glob('):
                    expanded.extend(glob.glob(item[len('$glob('):-1]))
                else:
                    expanded.append(item)
            return expanded

        if stype in VectorTypes:
            # Element type is the vector type name without its last character
            return [self.EvaluateParameter(x, stype[:-1]) for x in val]

        if stype == 'OMKey':
            stringid = self.TypeValue(self.parseval(val.stringid), 'int', None)
            omid = self.TypeValue(self.parseval(val.omid), 'int', None)
            return OMKey(stringid, omid)

        pvalue = val.value
        punit = val.unit
        pformat = val.format

        if stype == 'string' and pformat is not None:  # format string
            pformat = [self.parseval(piece) for piece in pformat.split(',')]
            pvalue = self.format_string(pvalue, pformat)

        # parse value string
        pvalue = self.parseval(pvalue)
        return self.TypeValue(pvalue, stype, punit)

    def _set_parameters(self, tray, owner, parameters, exit_code, echo=False):
        """
        Evaluate parameters and set them on the tray for one service/module.

        A TypeError is logged, reported through sys.excepthook and ends the
        process with exit_code.

        @param tray: tray receiving SetParameter() calls
        @param owner: name of the service or module owning the parameters
        @param parameters: parameter objects (GetName/GetType/GetValue)
        @param exit_code: process exit status used on TypeError
        @param echo: print "type name value" for every parameter set
        """
        for param in parameters:
            pname = None
            try:
                pname = param.GetName()
                stype = param.GetType()
                pvalue = self.EvaluateParameter(param.GetValue(), stype)
                tray.SetParameter(str(owner), str(pname), pvalue)
                if echo:
                    print("%s %s %s" % (stype, pname, pvalue))
            except TypeError:
                excinfo = sys.exc_info()
                logger.error('%s: Type error thrown: %s' % (pname, str(excinfo[1])))
                sys.excepthook(*excinfo)
                os._exit(exit_code)

    def _configure_iceprod_modules(self, modules, tray, exit_code):
        """
        Add IceProd pre/post modules to a tray, hand each the expression
        parser and set its parameters.

        @param modules: module objects to add
        @param tray: pre or post tray being configured
        @param exit_code: process exit status used on TypeError
        """
        for mod in modules:
            mname = mod.GetName()
            mclass = mod.GetClass()

            tray.AddModule(str(mclass), str(mname))
            logger.debug("loading class '%s' as module '%s'" % (mclass, mname))

            tray.SetParser(str(mname), self.expparser)
            self._set_parameters(tray, mname, mod.GetParameters(), exit_code)

    def ConfigureServices(self, i3config, tray):
        """
        Add the configured services to the tray and set their parameters.

        Exits the process with status 50 on a TypeError.

        @param i3config: configuration providing GetServices()
        @param tray: tray to configure
        @return: False when there are no services, True otherwise
        """
        if not i3config.GetServices():
            return False
        for serv in i3config.GetServices():
            sname = serv.GetName()
            sclass = serv.GetClass()
            tray.AddService(str(sclass), str(sname))
            logger.debug("loading class '%s' as service '%s'" % (sclass, sname))
            self._set_parameters(tray, sname, serv.GetParameters(), 50, echo=True)
        return True

    def ConfigureModules(self, i3config, tray):
        """
        Add the configured modules to the tray and set their parameters.

        Modules whose first project is of type 'python' are imported and the
        class object is handed to the tray; all others are added by class
        name.  Exits the process with status 51 on a TypeError.

        @param i3config: configuration providing GetModules()
        @param tray: tray to configure
        @return: False when there are no modules, True otherwise
        """
        if not i3config.GetModules():
            return False
        for mod in i3config.GetModules():
            mname = str(mod.GetName())
            mclass = str(mod.GetClass())

            # support for python modules
            projects = mod.GetProjectList()
            if projects and projects[0].GetType() == 'python':
                # __import__ with a fromlist returns the module itself, so
                # the class has to be fetched from it explicitly.
                pymodule = __import__(projects[0].GetName(),
                                      globals(), locals(), [mclass])
                mclass = getattr(pymodule, mclass)

            tray.AddModule(mclass, mname)
            logger.debug("loading class '%s' as module '%s'" % (mclass, mname))
            self._set_parameters(tray, mname, mod.GetParameters(), 51, echo=True)
        return True

    def ConfigurePre(self, i3config, tray):
        """
        Configure the IceProd pre modules on the given pre tray.

        Exits the process with status 52 on a TypeError.
        """
        self._configure_iceprod_modules(i3config.GetIceProdPres(), tray, 52)

    def ConfigurePost(self, i3config, tray):
        """
        Configure the IceProd post modules on the given post tray.

        Exits the process with status 53 on a TypeError.
        """
        self._configure_iceprod_modules(i3config.GetIceProdPosts(), tray, 53)

    def ConnectBoxes(self, i3config, tray):
        """
        Connect module outboxes to the receiving modules on the tray.

        @param i3config: configuration providing GetConnections()
        @param tray: tray to configure
        @return: False when there are no connections, True otherwise
        """
        if not i3config.GetConnections():
            return False
        for conn in i3config.GetConnections():
            outbox = conn.GetOutbox()
            from_module = outbox.GetModule()
            to_module = conn.GetInbox().GetModule()
            tray.ConnectBoxes(str(from_module),
                              str(outbox.GetBoxName()), str(to_module))
            logger.debug("%s: Connection: from %s, to %s %s" %
                         (host, from_module, outbox.GetBoxName(), to_module))
        return True

    def TypeValue(self, pvalue, ptype, punit=None):
        """
        Convert string to a typed value

        Exits the process with status 40 for an unsupported type.

        @param pvalue: parameter value (string)
        @param ptype: parameter type (string)
        @param punit: optional parameter I3Unit (string)
        @return: the converted value
        """
        if ptype == 'bool':
            return pvalue not in ["False", "0", "[]"]
        if ptype == 'int':
            return int(pvalue)
        if ptype == 'long':
            return long(pvalue)
        if ptype == 'float' or ptype == 'double':
            from iceprod.core import py3
            return py3.units(float(pvalue), punit)
        if ptype == 'string':
            return str(pvalue)
        if ptype.endswith("v"):
            if ptype.startswith("string"):
                # Every double-quoted item, quotes included
                return re.findall(r'\"[^\"]*\"', pvalue)
            return pvalue.split(",")

        logger.error('Unsupported type "%s"' % ptype)
        os._exit(40)

    def Run(self, i3config):
        """
        Execute icetray

        For steering versions above 1 the IceProd pre modules are executed
        before, and the post modules after, the tray.  The I3Tray itself is
        only built when the configuration has modules.

        @param i3config: tray configuration to execute
        """
        from iceprod.core.lex import XMLSummaryParser

        logger.debug('I3Tray')

        # config versions prior to v2 did not have pre/post
        config_version = 1
        try:
            config_version = self.steering.GetVersion()
            logger.info("steering version %d" % config_version)
        except Exception:
            # GetVersion() may have returned something "%d" cannot format;
            # reset so the value agrees with the message below.
            config_version = 1
            logger.warning(str(sys.exc_info()[1]) + ": reverting to config version 1")

        summaryfile = 'summary.xml'
        summary_param = self.steering.GetParameter('summaryfile')
        if summary_param:
            summaryfile = summary_param.GetValue()
        # NOTE(review): the prefix is taken to apply to the default name too;
        # the original indentation is not recoverable - confirm.
        summaryfile = 'iceprod.' + self.expparser.parse(summaryfile)

        if config_version > 1:
            # execute pre modules first
            # I3PreTray is not imported by name in this file; presumably it
            # comes from the iceprod.core.dataclasses star import.
            pre = I3PreTray()
            pre_summary = XMLSummaryParser()
            if os.path.exists(summaryfile):
                pre_summary.ParseFile(summaryfile)
            # NOTE(review): assigned whether or not a summary file existed;
            # the original nesting of this line is ambiguous - confirm.
            pre.stats = pre_summary.summary_map

            self.ConfigurePre(i3config, pre)
            pre.Execute()
            pre_summary.Write(summaryfile)

        if i3config.GetModules():  # has I3 modules
            from iceprod.core.py3 import I3Tray

            self.LoadLibraries(i3config)
            tray = I3Tray()

            events = int(self.parseval(i3config.GetEvents()))

            self.ConfigureServices(i3config, tray)
            modules = self.ConfigureModules(i3config, tray)
            self.ConnectBoxes(i3config, tray)

            if modules:
                # A non-positive event count means "no explicit limit"
                if events > 0:
                    tray.Execute(events)
                else:
                    tray.Execute()
                tray.Finish()

        # Now execute posts
        if config_version > 1:
            post_summary = XMLSummaryParser()

            post = I3PostTray()
            if os.path.exists(summaryfile):
                post_summary.ParseFile(summaryfile)
            # NOTE(review): same nesting ambiguity as pre.stats above.
            post.stats = post_summary.summary_map

            self.ConfigurePost(i3config, post)
            post.Execute()

            ## At present we cannot process summary file in post tray since
            ## Summary service won't write it until destructor is called
            #post_summary.Write(summaryfile)

            # pre and post only exist for config versions above 1
            pre.Finish()
            post.Finish()

        return

########################################################################
def parse_opts(arglist):
    """
    Build the option dictionary from getopt-style (option, value) pairs.

    Starts from the built-in defaults, stores the validate flag as an int
    and the grid ID under 'grid'; every other option is stored under its
    name with any "--" removed.

    @param arglist: iterable of (option, value) pairs
    @return: dictionary of option values
    """
    settings = {
        'extern': 0, 'nproc': 1, 'procnum': 0, 'dataset': 0,
        'iter': 0, 'tray': 0, 'step': 0, 'validate': 0,
    }
    for flag, value in arglist:
        if flag == "-v" or flag == "--validate":  # should validate
            settings["validate"] = int(value)
            continue
        if flag == "-g" or flag == "--grid":  # specify grid ID
            settings["grid"] = value
            continue
        settings[flag.replace("--", "")] = value
    return settings
def usage(arguments):
    """
    Write the usage/help text to standard error.

    @param arguments: cmdline args passed to program; the first entry is
                      shown as the program name
    """
    # NOTE(review): the whitespace inside this text was lost when the source
    # was extracted; the wording is unchanged, the layout is a reconstruction.
    help_text = """
    Usage: %s [options] <file>

    where options is one or more of

    --nproc=<number of proceses> : Specify a value to substitute $arg(nproc)

    --procnum=<process number> : Specify a process number to substitute $arg(procnum)

    --dataset=<i3simdb key> : Specify the dataset that this job belongs to

    --url=<soap url> : Url of soap server to comunicate with for lookup tables

    """ % arguments[0]
    sys.stderr.write(help_text + "\n")
518 -def main(opts,args,steering=None):
519 520 logging.basicConfig() 521 print '\n----------- environment contents ------------' 522 for envkey in os.environ.keys(): 523 print envkey, ":" 524 for env_val in os.environ[envkey].split(":"): 525 print "\t", env_val 526 527 optdict = parse_opts(opts) 528 xmlvalidate = False 529 if optdict.has_key('validate'): 530 xmlvalidate = optdict['validate'] 531 532 # Instantiate the IceTray object 533 if not steering: 534 steering = Steering() 535 xparser = IceTrayXMLParser(steering) 536 xparser.ParseFile(args[0],validate=xmlvalidate) 537 538 # Set node-dependent sys opts 539 steering.AddSysOpt(SysOpt('hostname', functions.gethostname())) 540 if optdict.has_key('cache'): 541 steering.AddSysOpt(SysOpt('cachedir',optdict['cache'])) 542 543 # Extract the tray out of the steering file 544 itray = int(optdict['tray']) 545 iiter = int(optdict['iter']) 546 logger.info("tray %u, iter %u" % (itray,iiter)) 547 if itray < len(steering.GetTrays()): 548 549 tray = steering.GetTrays()[itray] 550 551 max_iter = int(tray.GetIterations()) -1 552 optdict['max_iter'] = max_iter 553 prev_max_iter = 1 554 if itray > 0: 555 prev_max_iter = int(steering.GetTrays()[itray-1].GetIterations())-1 556 optdict['prev_max_iter'] = prev_max_iter 557 558 else: 559 print >> sys.stderr, 'assertion failed: itray < len(steering.GetTrays())' 560 sys.exit(11) 561 562 runconfig = RunConfig(steering,optdict) 563 564 if int(optdict['extern']): 565 if len(steering.GetExterns()): 566 try: # Process any external proc 567 logger.info("processing externs") 568 retval = runconfig.ProcExtern() 569 os._exit(retval) 570 except Exception,e: 571 excinfo = sys.exc_info() 572 sys.excepthook(excinfo[0],excinfo[1],excinfo[2]) 573 os._exit(1) 574 else: 575 logger.info("no externs to process") 576 os._exit(0) 577 578 logger.debug('Execute') 579 logger.info('/'.join(os.uname())) 580 581 # Run it! 582 runconfig.Run( tray ) 583 print 'done' 584 return 0
if __name__ == '__main__':
    # main() takes getopt-style (option, value) pairs plus the positional
    # arguments, so the command line has to be parsed before calling it.
    # NOTE(review): the option list is assembled from the usage text and the
    # keys main()/parse_opts() read; extend it if other options are expected.
    try:
        opts, args = getopt.getopt(
            sys.argv[1:], 'v:g:',
            ['extern=', 'nproc=', 'procnum=', 'dataset=', 'iter=', 'tray=',
             'step=', 'validate=', 'grid=', 'url=', 'cache='])
    except getopt.GetoptError:
        sys.stderr.write("%s\n" % sys.exc_info()[1])
        usage(sys.argv)
        sys.exit(2)

    if not args:
        # The configuration file is mandatory
        usage(sys.argv)
        sys.exit(2)

    retval = main(opts, args)
    os._exit(retval)