Package iceprod :: Package core :: Module runconfig
[hide private]
[frames] | no frames]

Source Code for Module iceprod.core.runconfig

  1  #! /usr/bin/env python 
  2  # 
  3   
  4  """ 
  5     Module for submitting IceTray jobs in the form of Python IceTrayConfig 
  6     objects. 
  7   
  8     copyright  (c) 2005 the icecube collaboration 
  9   
 10     Usage::  
 11   
 12         runconfig.py [options] <file> 
 13       
 14      where options is one or more of 
 15       
 16         --nproc=<number of processes> : Specify a value to substitute $args(nproc) 
 17       
 18         --procnum=<process number> : Specify a process number to substitute $args(procnum) 
 19         --dataset=<dataset id> : Specify a unique dataset id when in production 
 20   
 21   
 22     Runconfig allows the user (or production system) to substitute values in a 
 23     generic configuration file with those passed as options in the commandline. 
 24     This is useful for submitting a number of jobs in which only a few parameters  
 25     change for each job. 
 26   
 27   
 28     For example one might set a parameter in the configfile like:: 
 29   
 30         <service class='I3SPRNGRandomServiceFactory'> 
 31        <name>sprngrandom</name> 
 32        <parameters> 
 33          <int> 
 34            <name>NStreams</name> 
 35            <value>$args(nproc)</value> 
 36          </int> 
 37          <int> 
 38            <name>StreamNum</name> 
 39            <value>$args(procnum)</value> 
 40          </int> 
 41          <int> 
 42            <name>Seed</name> 
 43            <value>11</value> 
 44          </int> 
 45        </parameters> 
 46      </service> 
 47   
 48     then run 'runconfig.py --nproc=2 --procnum=0 config.xml' 
 49       
 50   
 51   
 52     @version: $Revision: 1.7 $ 
 53     @date: $Date: 2005/04/06 17:32:56 $ 
 54     @author: T. McCauley <tpmccauley@lbl.gov> 
 55     @author: Paolo Desiati <paolo.desiati@icecube.wisc.edu> 
 56     @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
 57   
 58     @todo: Add support for more project content (the "optional" option) 
 59     @todo: Be a bit more clever about handling of type tags 
 60     @todo: Units 
 61     @todo: Commas and whitespace handling in projects attribute 
 62     @todo: What to do if libraries are included more than once? 
 63     @todo: Add some overall elegance 
 64     @todo: Fix the "inbox" problem 
 65  """ 
 66   
 67  import sys,os,time 
 68  import string 
 69  import re,getopt 
 70  import glob 
 71  from iceprod.core.dataclasses import * 
 72  from iceprod.core.dataclasses import VectorTypes 
 73  from iceprod.core.xmlparser import IceTrayXMLParser 
 74  from os.path import expandvars 
 75  from iceprod.core.lex import ExpParser 
 76  import logging 
 77  from iceprod.core import resample,functions 
 78   
 79  logging.basicConfig()  # install the default root handler so log records are emitted
 80  logger = logging.getLogger('RunConfig')  # module-wide logger used by RunConfig and main()
 81  logger.setLevel(logging.INFO)  # INFO and above; logger.debug() calls are suppressed
 82   
 83  host = os.uname()[1]  # node name of this machine, used as a prefix in log messages
 84   
class RunConfig:
    """
    Configure and execute an IceTray job from a parsed steering object.

    An instance keeps the steering object and the command-line option
    dictionary, and builds an ExpParser from both.  Every parameter value
    is passed through that parser (see L{parseval}) before it is typed
    and handed to the tray.
    """

    # Libraries loaded so far.  This is a class attribute, so it is shared
    # by all instances.  Keys are the python module name, or 'lib'+name
    # for c++ libraries; values are the imported module or True.
    loaded_libs = {}

    def __init__(self,steering,opts):
        """
        @param steering: steering object describing the job
        @param opts: dictionary of command-line options
        """
        self.steering = steering
        self.opts = opts
        self.expparser = ExpParser(self.opts,self.steering)

    def ProcExtern(self):
        """
        Run every external executable declared in the steering.

        For each extern the executable is made executable, the command
        line is assembled (arguments, optional stdin/stdout/stderr
        redirections), its steering files are written and the command is
        run through the shell.  The process exits with code 5 as soon as
        the accumulated status is non-zero.

        @return: accumulated os.system status (0 when this returns)
        """
        retval = 0
        for extern in self.steering.GetExterns():
            print("running external ")
            print("name: %s" % extern.GetName())
            print("version: %s" % extern.GetVersion())

            # NOTE(review): commands are built as shell strings from values
            # in the steering file; only safe for trusted configurations.
            os.system("chmod +x " + extern.GetExec())
            cmd = "%s %s" % (extern.GetExec(), self.parseval(extern.GetArgs()))
            print("cmd: %s" % cmd)

            if extern.GetInFile():
                cmd += " <" + self.parseval(extern.GetInFile())
                # NOTE(review): the file is written under the unparsed name
                # while the redirection uses the parsed one - confirm.
                extern.WriteInputFile(extern.GetInFile())
            if extern.GetOutFile():
                cmd += " 1>" + self.parseval(extern.GetOutFile())
            if extern.GetErrFile():
                cmd += " 2>" + self.parseval(extern.GetErrFile())

            for steering_file in extern.GetSteering():
                steering_file.Write()

            print(cmd)
            retval += os.system(cmd)
            if retval != 0:
                logger.fatal("Error while processing %s" % extern.GetName())
                os._exit(5)
        return retval

    def load_library(self,library,i3config,_visiting=None):
        """
        Load library. If library depends on other libraries,
        recursively load them first.

        @param library: library to be loaded
        @param i3config: configuration used to look up dependencies by name
        @param _visiting: internal; names of the libraries currently being
                          resolved, used to stop on circular dependencies
        """
        from iceprod.core import py3

        if _visiting is None:
            _visiting = set()
        name = library.GetName()
        _visiting.add(name)

        for dep_name in library.GetDependencies():
            dependency = i3config.GetProject(dep_name)
            # NOTE(review): the original nesting of the warning branch was
            # lost; it is emitted both for an unknown dependency and for a
            # dependency that leads back to a library being resolved.
            if not dependency or dependency.GetName() in _visiting:
                logger.warning("'%s' depends on '%s'." % (name,dep_name))
                continue
            self.load_library(dependency,i3config,_visiting)

        _visiting.discard(name)

        if library.GetType() == 'python':
            libname = name
            if libname not in self.loaded_libs:
                self.loaded_libs[libname] = __import__(libname, globals(),locals())
        else: # 'c++'
            libname = 'lib'+name
            if libname not in self.loaded_libs:
                py3.load(str(libname))
                self.loaded_libs[libname] = True

    def LoadLibraries(self,i3config):
        """
        Load libraries into memory

        Projects of every metaproject are loaded first, then the projects
        listed directly in the configuration.  load_library skips the
        ones that are already loaded.

        @param i3config: tray configuration listing (meta)projects
        """
        for mp in i3config.GetMetaProjectList():
            for lib in mp.GetProjectList():
                self.load_library(lib,i3config)

        for lib in i3config.GetProjectList():
            self.load_library(lib,i3config)

    def format_string(self,pstring,pformat):
        """
        Format string using the arguments in pformat

        Each argument is cast according to the conversion it is paired
        with (see L{cast_string}) and the result is interpolated with the
        % operator.  A TypeError raised while formatting is logged and
        reported through sys.excepthook; pstring is then returned
        unchanged.

        @param pstring: template containing %-style conversions
        @param pformat: sequence of arguments (may be empty or None)
        @return: the formatted string
        """
        if not pformat:
            return pstring

        return_string = pstring
        args = list(pformat)
        try:
            fstrings = re.findall(r'\%[0-9]*.{0,1}[0-9]*[csridufegExXo]',pstring)
            # Pair conversions with arguments, padding the shorter list
            # with None (what Python 2's multi-sequence map() did).
            width = max(len(fstrings),len(args))
            fstrings = fstrings + [None]*(width-len(fstrings))
            padded = args + [None]*(width-len(args))
            cast = [self.cast_string(f,a) for f,a in zip(fstrings,padded)]
            return_string = pstring % tuple(cast)
        except TypeError as e:
            logger.error( str(e) )
            logger.error('Unable to format string parameter ' \
                  + str(pstring) + ' with args ' + str(args))
            excinfo = sys.exc_info()
            sys.excepthook(excinfo[0],excinfo[1],excinfo[2])

        return return_string

    def cast_string(self,fstring,arg):
        """
        Cast arg to the python type required by a %-conversion.

        @param fstring: conversion specifier such as '%s' or '%04d'
                        (empty or None returns arg unchanged)
        @param arg: value to cast, normally a string
        @return: the cast value; arg itself if the conversion is unknown
        """
        if not fstring: return arg

        conversion = fstring[-1]
        if conversion in ('c','s'): return str(arg)
        if conversion == 'r': return repr(arg)
        if conversion in ('i','d','u'): return int(arg)
        if conversion in ('f','e','g','E'): return float(arg)
        # An explicit base works on Python 2 and 3 and also accepts a
        # value that already carries its '0x' prefix.
        if conversion in ('x','X'): return int(str(arg), 16)
        if conversion == 'o': return int(str(arg), 8)

        logger.warning('Unable to cast %s using format %s' % (arg,fstring))
        return arg

    def parseval(self,pvalue):
        """
        Parse parameter value string to see if it is an external
        variable and replace it with its value if found.
        """
        return self.expparser.parse(pvalue)

    def EvaluateParameter(self,val,stype):
        """
        Turn a configured parameter value into a typed python value.

        @param val: value object (or list of them for vector types)
        @param stype: type name, e.g. 'int', 'string', 'stringv', 'OMKey'
        @return: the typed value; a list for vector types
        """
        if stype == 'stringv':
            retvals = []
            for item in val:
                v = self.EvaluateParameter(item,stype[:-1])
                if v.startswith('$glob('):
                    # expand '$glob(pattern)' into the matching file names
                    retvals.extend(glob.glob(v[len('$glob('):len(v)-1]))
                else:
                    retvals.append(v)
            return retvals

        if stype in VectorTypes:
            # element type is the vector type without its trailing 'v'
            return [self.EvaluateParameter(item,stype[:-1]) for item in val]

        if stype == 'OMKey':
            from iceprod.core.py3 import OMKey

            stringid = self.TypeValue(self.parseval(val.stringid),'int',None)
            omid = self.TypeValue(self.parseval(val.omid),'int',None)
            return OMKey(stringid,omid)

        pvalue = val.value
        punit = val.unit
        pformat = val.format

        if stype == 'string' and pformat is not None: #format string
            pformat = [self.parseval(arg) for arg in pformat.split(',')]
            pvalue = self.format_string(pvalue,pformat)

        #parse value string
        pvalue = self.parseval(pvalue)
        return self.TypeValue(pvalue, stype, punit)

    def _set_parameters(self,tray,owner,params,exit_code,echo=False):
        """
        Evaluate params and set them on the service/module named owner.

        A TypeError is logged, reported through sys.excepthook and ends
        the process with exit_code.

        @param tray: tray receiving the SetParameter calls
        @param owner: name of the service or module being configured
        @param params: parameter objects (GetName/GetType/GetValue)
        @param exit_code: process exit code used on TypeError
        @param echo: if True print 'type name value' for each parameter
        """
        for param in params:
            pname = param.GetName()
            try:
                stype = param.GetType()
                pvalue = self.EvaluateParameter(param.GetValue(),stype)
                tray.SetParameter(str(owner), str(pname), pvalue)
                if echo:
                    print("%s %s %s" % (stype,pname,pvalue))
            except TypeError as e:
                logger.error('%s: Type error thrown: %s' % (pname,str(e)))
                excinfo = sys.exc_info()
                sys.excepthook(excinfo[0],excinfo[1],excinfo[2])
                os._exit(exit_code)

    def ConfigureServices(self,i3config,tray):
        """
        Add every configured service to the tray and set its parameters.

        @return: False if the configuration has no services, else True
        """
        if not i3config.GetServices(): return False
        for serv in i3config.GetServices():
            sname = serv.GetName()
            sclass = serv.GetClass()
            tray.AddService(str(sclass), str(sname))
            logger.debug("loading class '%s' as service '%s'" % (sclass,sname))
            self._set_parameters(tray,sname,serv.GetParameters(),50,echo=True)
        return True

    def ConfigureModules(self,i3config,tray):
        """
        Add every configured module to the tray and set its parameters.

        Modules whose first project is of type 'python' or 'segment' are
        imported and the object named by the module class is added;
        anything else is added by class name.

        @return: False if the configuration has no modules, else True
        """
        if not i3config.GetModules(): return False
        for mod in i3config.GetModules():
            mname = str(mod.GetName())
            mclass = str(mod.GetClass())
            projects = mod.GetProjectList()
            libtype = None
            if projects:
                libtype = projects[0].GetType()

            # support for python modules
            if libtype in ('python','segment'):
                pymodule = __import__(projects[0].GetName(), globals(),locals(),[mclass])
                # __import__ returns the module; pick the named object out
                # of it and fall back to the module if it has no such name.
                mclass = getattr(pymodule, mclass, pymodule)
                if libtype == 'segment':
                    tray.AddSegment(mclass, mname)
                else:
                    tray.AddModule(mclass, mname)
            else:
                tray.AddModule(mclass, mname)
            logger.debug("loading class '%s' as module '%s'" % (mclass,mname))
            self._set_parameters(tray,mname,mod.GetParameters(),51,echo=True)
        return True

    def _configure_iceprod_modules(self,modules,tray,exit_code):
        """
        Add IceProd pre/post modules to tray, attach the expression
        parser to each of them and set their parameters.
        """
        for mod in modules:
            mname = mod.GetName()
            mclass = mod.GetClass()

            tray.AddModule(str(mclass), str(mname))
            logger.debug("loading class '%s' as module '%s'" % (mclass,mname))

            tray.SetParser(str(mname),self.expparser)
            self._set_parameters(tray,mname,mod.GetParameters(),exit_code)

    def ConfigurePre(self,i3config,tray):
        """
        Configure the modules returned by i3config.GetIceProdPres().
        """
        self._configure_iceprod_modules(i3config.GetIceProdPres(),tray,52)

    def ConfigurePost(self,i3config,tray):
        """
        Configure the modules returned by i3config.GetIceProdPosts().
        """
        self._configure_iceprod_modules(i3config.GetIceProdPosts(),tray,53)

    def ConnectBoxes(self,i3config,tray):
        """
        Connect module outboxes to the modules listed in the configuration.

        @return: False if the configuration has no connections, else True
        """
        if not i3config.GetConnections(): return False
        for conn in i3config.GetConnections():
            outbox = conn.GetOutbox()
            from_module = outbox.GetModule()
            to_module = conn.GetInbox().GetModule()
            tray.ConnectBoxes(str(from_module),
                    str(outbox.GetBoxName()), str(to_module))
            logger.debug("%s: Connection: from %s, to %s %s" % \
                    (host,from_module,outbox.GetBoxName(), to_module))
        return True

    def TypeValue(self,pvalue, ptype,punit=None):
        """
        Convert string to a typed value

        Exits the process with code 40 if ptype is not supported.

        @param pvalue: parameter value (string)
        @param ptype: parameter type (string)
        @param punit: optional parameter I3Unit (string)
        @return: the typed value
        """
        if ptype == 'bool':
            return pvalue not in ["False","0","[]"]
        if ptype == 'int':
            return int(pvalue)
        if ptype == 'long':
            try:
                return long(pvalue)
            except NameError: # Python 3 has no separate long type
                return int(pvalue)
        if ptype == 'float' or ptype == 'double':
            if punit:
                from iceprod.core import py3
                return py3.units(float(pvalue),punit)
            return float(pvalue)
        if ptype == 'string':
            return str(pvalue)
        if ptype.endswith("v"):
            if ptype.startswith("string"):
                # NOTE(review): the quotes are kept in each element, as the
                # original pattern did - confirm that is what callers want.
                return re.findall(r'\"[^\"]*\"',pvalue)
            return pvalue.split(",")

        logger.error('Unsupported type "%s"' % ptype )
        os._exit(40)

    def _run_iceprod_tray(self,stage_tray,configure,i3config,summaryfile):
        """
        Run an IceProd pre/post tray and write its summary.

        The tray's stats are bound to the map of a summary parser (seeded
        from summaryfile when that file exists), the tray is configured
        with configure(i3config, tray), executed and finished, and the
        summary is written back to summaryfile.
        """
        from iceprod.core.lex import XMLSummaryParser

        summary = XMLSummaryParser()
        if os.path.exists(summaryfile):
            summary.ParseFile(summaryfile)
        # NOTE(review): assumed to apply whether or not the file existed,
        # so that Write() below sees what the tray recorded - confirm.
        stage_tray.stats = summary.summary_map

        configure(i3config,stage_tray)
        stage_tray.Execute()
        stage_tray.Finish()
        summary.Write(summaryfile)

    def Run(self,i3config):
        """
        Execute icetray

        For steering versions above 1 the IceProd pre modules run first
        and the post modules last; in between, the I3Tray is built and
        executed if the configuration has modules.

        @param i3config: tray configuration to execute
        """
        logger.debug('I3Tray')

        # config versions prior to v2 did not have pre/post
        config_version = 1
        try:
            config_version = self.steering.GetVersion()
            logger.info("steering version %d" % config_version)
        except Exception as e:
            logger.warning(str(e) +": reverting to config version 1")

        summaryfile = 'summary.xml'
        if self.steering.GetParameter('summaryfile'):
            summaryfile = self.steering.GetParameter('summaryfile').GetValue()
        # NOTE(review): the prefix is assumed to apply to the default name
        # as well; the original indentation was lost - confirm.
        summaryfile = 'iceprod.' + self.expparser.parse(summaryfile)

        if config_version > 1:
            # execute pre modules first
            self._run_iceprod_tray(I3PreTray(),self.ConfigurePre,
                    i3config,summaryfile)

        if i3config.GetModules(): # has I3 modules
            from iceprod.core.py3 import I3Tray

            self.LoadLibraries(i3config)
            tray = I3Tray()

            events = int(self.parseval(i3config.GetEvents()))

            self.ConfigureServices(i3config,tray)
            modules = self.ConfigureModules(i3config,tray)
            self.ConnectBoxes(i3config,tray)

            if modules:
                if events > 0:
                    tray.Execute(events)
                else:
                    tray.Execute()

            # print memory usage at end of tray
            for k,v in functions.getmemusage().items():
                print("%s %s" % (k,v))

            tray.Finish()
            del tray

        # Now execute posts
        if config_version > 1:
            ## At present we cannot process summary file in post tray since
            ## Summary service won't write it until destructor is called
            self._run_iceprod_tray(I3PostTray(),self.ConfigurePost,
                    i3config,summaryfile)

########################################################################
def parse_opts(arglist):
    """
    Convert getopt-style (option, value) pairs into an option dictionary.

    The dictionary starts from defaults for extern, nproc, procnum,
    dataset, iter, tray, step and validate.  '--date' additionally fills
    'run.year', 'run.month' and 'run.day'.  Any other option is stored
    under its name without the leading '--'.

    @param arglist: list of (option, value) pairs as returned by getopt
    @return: dictionary of option names to values
    """
    opt_dict = {
        'extern':0,
        'nproc':1,
        'procnum':0,
        'dataset':0,
        'iter':0,
        'tray':0,
        'step':0,
        'validate':0
    }
    for o,a in arglist:
        if o in ("-v", "--validate"): # should validate
            # a bare flag arrives with an empty value: treat it as "on"
            if a in (None, ""):
                opt_dict["validate"] = 1
            else:
                opt_dict["validate"] = int(a)
        elif o in ("-g", "--grid"): # specify grid ID
            opt_dict["grid"] = a
        elif o == "--date":
            opt_dict["date"] = a
            rundate = time.strptime(a, "%Y-%m-%d")
            opt_dict["run.year"] = time.strftime("%Y",rundate)
            opt_dict["run.month"] = time.strftime("%m",rundate)
            opt_dict["run.day"] = time.strftime("%d",rundate)
        else:
            # strip only the leading '--'; a '--' inside the name is kept
            name = o
            if name.startswith("--"):
                name = name[2:]
            opt_dict[name] = a
    return opt_dict
514 515
def usage(arguments):
    """
    Write the usage/help text to standard error.

    @param arguments: cmdline args passed to program; only the first
                      element (the program name) is used
    """
    help_text = """
    Usage: %s [options] <file>

    where options is one or more of

    --nproc=<number of proceses> : Specify a value to substitute $arg(nproc)

    --procnum=<process number> : Specify a process number to substitute $arg(procnum)

    --dataset=<i3simdb key> : Specify the dataset that this job belongs to

    --url=<soap url> : Url of soap server to comunicate with for lookup tables

    """
    program = arguments[0]
    sys.stderr.write(help_text % program)
    sys.stderr.write("\n")
536 537
def main(opts,args,steering=None):
    """
    Run one tray of a steering configuration.

    Prints the environment, parses the options, reads the steering file
    unless a steering object is supplied, selects the tray given by the
    'tray' option and either processes the externs (option 'extern') or
    executes the tray.

    The process exits with code 11 if the tray index is out of range and
    with code 2 if no steering is given and no configuration file is named.
    When 'extern' is set the process always ends through os._exit.

    @param opts: list of (option, value) pairs as returned by getopt
    @param args: positional arguments; args[0] is the configuration file
    @param steering: optional, already populated steering object
    @return: 0 after the tray has run
    """
    logging.basicConfig()
    print('\n----------- environment contents ------------')
    for envkey in os.environ.keys():
        print("%s :" % envkey)
        for env_val in os.environ[envkey].split(":"):
            print("\t%s" % env_val)

    optdict = parse_opts(opts)
    xmlvalidate = False
    if 'validate' in optdict:
        xmlvalidate = optdict['validate']

    # Instantiate the IceTray object
    if not steering:
        if not args:
            # without this the failure is a bare IndexError on args[0]
            usage(sys.argv)
            sys.exit(2)
        steering = Steering()
        xparser = IceTrayXMLParser(steering)
        xparser.ParseFile(args[0],validate=xmlvalidate)

    # Set node-dependent sys opts
    steering.AddSysOpt(SysOpt('hostname', functions.gethostname()))
    if 'cache' in optdict:
        steering.AddSysOpt(SysOpt('cachedir',optdict['cache']))

    # Extract the tray out of the steering file
    itray = int(optdict['tray'])
    iiter = int(optdict['iter'])
    logger.info("tray %u, iter %u" % (itray,iiter))
    trays = steering.GetTrays()
    if itray >= len(trays):
        sys.stderr.write('assertion failed: itray < len(steering.GetTrays())\n')
        sys.exit(11)

    tray = trays[itray]

    # index of the last iteration of this tray and of the previous tray,
    # exposed to the expression parser through the option dictionary
    optdict['max_iter'] = int(tray.GetIterations()) - 1
    prev_max_iter = 1
    if itray > 0:
        prev_max_iter = int(trays[itray-1].GetIterations()) - 1
    optdict['prev_max_iter'] = prev_max_iter

    runconfig = RunConfig(steering,optdict)

    if int(optdict['extern']):
        if len(steering.GetExterns()):
            try: # Process any external proc
                logger.info("processing externs")
                retval = runconfig.ProcExtern()
                os._exit(retval)
            except Exception:
                excinfo = sys.exc_info()
                sys.excepthook(excinfo[0],excinfo[1],excinfo[2])
                os._exit(1)
        else:
            logger.info("no externs to process")
            os._exit(0)

    logger.debug('Execute')
    logger.info('/'.join(os.uname()))

    # Run it!
    runconfig.Run( tray )
    print('done')
    return 0
if __name__ == '__main__':
    # main() takes getopt-style (option, value) pairs and the positional
    # arguments separately, so the command line is parsed here.
    # NOTE(review): the long options are the ones this file itself names;
    # extend the list if the production system passes others.
    try:
        opts, args = getopt.getopt(
            sys.argv[1:], 'hv:g:',
            ['help', 'validate=', 'grid=', 'date=', 'extern=', 'nproc=',
             'procnum=', 'dataset=', 'iter=', 'tray=', 'step=', 'url=',
             'cache='])
    except getopt.GetoptError as err:
        sys.stderr.write("%s\n" % err)
        usage(sys.argv)
        sys.exit(2)

    if [o for o, a in opts if o in ('-h', '--help')]:
        usage(sys.argv)
        sys.exit(0)
    if not args:
        # the configuration file is mandatory
        usage(sys.argv)
        sys.exit(2)

    retval = main(opts, args)
    os._exit(retval)