Package iceprod :: Package core :: Module exe
[hide private]
[frames] | [no frames]

Source Code for Module iceprod.core.exe

   1  #!/usr/bin/python 
   2  # @author: Juan Carlos Diaz-Velez 
   3  # @date:   04 May 2005 
   4  # (c) IceCube collaboration. 
   5  # @brief:  Steering script for running I3Sim on Condor. 
   6  # Evaluator: Module for parsing and evaluating expressions in IceTray values 
   7  # 
   8  ###################################################################### 
   9  import os,sys,time,string,re 
  10  import signal 
  11  import random 
  12  import getpass 
  13  import os.path,getopt 
  14  import commands 
  15  import platform 
  16  import logging 
  17  import glob 
  18  import time 
  19  import re 
  20  from os.path import expandvars,join,exists,basename,dirname,abspath 
  21  import iceprod.core 
  22  from iceprod.core.dataclasses import * 
  23  from iceprod.core.xmlparser import IceTrayXMLParser 
  24  from iceprod.core.lex import * 
  25  from iceprod.core import functions 
  26  from iceprod.core.constants import * 
  27  from iceprod.core.inventory import FileInventory 
  28   
  29  import xmlrpclib 
  30  import socket 
  31  import cPickle 
  32  from ConfigParser import SafeConfigParser 
  33  from ipxml import PrettyPrint 
  34   
  35  import traceback 
  36   
# ---- module-level state and constants -------------------------------------
_platform = None                 # cached platform string (populated elsewhere)
_inventory = "sp_outfile_inventory.list"   # name of the plain-text output inventory file
_done  = False                   # module-level "job finished" flag
_clean = True                    # module-level "clean up on exit" flag

# job abort reason codes passed to RunInstance.abort()
EVICTED = 1
FAILED  = 2
# major.minor of the running interpreter, e.g. "2.7"
python_version = platform.python_version()[:3]

_child = 0                       # pid of forked child process (0 = no child)

# default download server and per-user host-local software cache
_baseurl = 'http://icecube.wisc.edu/simulation/downloads'
_tmpstorage = '/tmp/%s_icesoft' % getpass.getuser()
  50   
def get_tmpstorage(tmpstorage):
    """
    Resolve a cache-directory specification to a single usable path.

    @param tmpstorage: a single path, or a colon-separated list of
           candidate paths in order of preference
    @return: the given path unchanged when it contains no colon;
             otherwise the first candidate whose parent directory exists
             and which itself exists or can be created, falling back to
             the module default C{_tmpstorage} when none qualifies.
    """
    if ':' not in tmpstorage:
        return tmpstorage
    # more than one path specified: take the first workable candidate
    for candidate in tmpstorage.split(':'):
        if not exists(dirname(candidate)):
            continue
        if not exists(candidate):
            try:
                os.makedirs(candidate)
            except:
                # could not create it (permissions, race, ...): next one
                continue
        return candidate
    return _tmpstorage
69
def fetch_file(url,tmpstorage=_tmpstorage,basedir='.'):
    """
    Download a file via a host-local cache directory.

    An '<url>.md5sum' sidecar is fetched first and compared against the
    cached copy's recorded checksum; the cached file is re-downloaded only
    on mismatch.  A lockfile in the cache directory serializes concurrent
    downloads of the same file by multiple jobs on one host.  On success a
    symlink to the cached file is created in the current directory.

    @param url: URL of the file to fetch
    @param tmpstorage: cache directory (possibly colon-separated list)
    @param basedir: unused here — NOTE(review): dead parameter, kept for
           signature compatibility with fetch_tarball
    @return: the return value of the last wget call (falsy on success)
    """
    tmpstorage = get_tmpstorage(tmpstorage)

    filename = basename(url)
    lockfile = join(tmpstorage,"%s.lock" % filename)
    cache = join(tmpstorage,filename)
    mdcache = cache + '.md5sum'

    #fetch checksum
    md5url = url + '.md5sum'
    print "retrieving %s..." % md5url
    failed = wget(md5url)
    if failed: # No md5sum found. Revert to simple wget
        return wget(url)

    cwd = os.getcwd()
    # See if we need to download new tarball
    trials = 5 # limit number of attempts
    while (not exists(cache)) or \
          (not checksum(join(cwd,filename+'.md5sum'),mdcache)):

        print cache, "does not exist. "
        os.chdir(tmpstorage)
        try: # create lockfile (os.mknod fails if it already exists)
            os.mknod(lockfile)
        except OSError,oserr: # file exists: another job is downloading
            print "%s %s ." % (oserr,lockfile)
            time.sleep(300) # 5min
            try:
                # break stale locks older than 40 minutes
                if (time.time() - os.stat(lockfile).st_mtime) > 2400:
                    os.remove(lockfile)
            except: pass
            # NOTE(review): 'trials' is not decremented on this path, so a
            # persistently stale lock can loop indefinitely — confirm intended.
            continue # check cache again

        # we hold the lock: refresh the cached copy
        os.system("rm -rf %s %s" % (cache,mdcache))
        print "retrieving %s..." % (url)
        failed = wget(url)
        if failed:
            os.remove(lockfile)
            raise Exception, "unable to fetch file from %s" % url

        # create md5sum record for the freshly cached file
        cmd = "md5sum %s >%s" % (filename,mdcache)
        print cmd
        os.system(cmd)
        os.remove(lockfile)
        os.chdir(cwd)


        if trials < 1:
            # NOTE(review): "trails" is a typo for "trials" in this message
            raise Exception, "Max no. of trails reached to fetch '%s'" % url
        trials -= 1 # decrement counter

    os.chdir(cwd) # if not there already

    # expose the cached file in the working directory via a symlink
    if abspath(filename) != abspath(cache):
        if exists(filename):
            os.remove(filename)
        os.system("ln -s %s" % cache)
    return failed
def fetch_tarball(meta,tmpstorage=_tmpstorage,basedir='.'):
    """
    Download and unpack a metaproject tarball via a host-local cache.

    Uses the same lockfile + md5sum protocol as fetch_file: the tarball is
    re-downloaded and re-extracted only when the cached copy or its
    unpacked directory is missing or its checksum record disagrees with
    the expected one.

    @param meta: I3Tarball-like object; reads .url .suffix .filebase
           .md5sum, sets .path to the unpacked cache directory
    @param tmpstorage: cache directory (possibly colon-separated list)
    @param basedir: directory tar extracts into (passed to tar -C)
    @return: meta, with meta.path set
    """
    tmpstorage = get_tmpstorage(tmpstorage)

    lockfile = join(tmpstorage,"%s.lock" % meta.filebase)
    cache = join(tmpstorage,meta.filebase + meta.suffix)
    cachedir = join(tmpstorage,meta.filebase)

    #fetch checksum
    md5url = meta.url + '.md5sum'
    print "retrieving %s..." % md5url
    failed = wget(md5url)
    if failed: raise Exception, "unable to fetch file from %s" % md5url

    cwd = os.getcwd()
    # See if we need to download new tarball
    trials = 5 # limit number of attempts
    while (not exists(cache)) or \
          (not exists(cachedir)) or \
          (not checksum(join(cwd,meta.md5sum),join(cachedir,meta.md5sum))):

        os.chdir(tmpstorage)
        try: # create lockfile (os.mknod fails if it already exists)
            os.mknod(lockfile)
        except OSError,oserr: # file exists: another job is downloading
            print "%s %s ." % (oserr,lockfile)
            time.sleep(300) # 5min
            try:
                # break stale locks older than 40 minutes
                if (time.time() - os.stat(lockfile).st_mtime) > 2400:
                    os.remove(lockfile)
            except: pass
            continue # check cache again

        # we hold the lock: refresh cached tarball and unpacked tree
        os.system("rm -rf %s %s" % (cache,cachedir))
        print "retrieving %s..." % (meta.url + meta.suffix)
        failed = wget(meta.url+meta.suffix)
        if failed:
            os.remove(lockfile)
            # NOTE(review): %% binds tighter than +, so the message is
            # ("unable to fetch file from <url>") + suffix — harmless but
            # probably meant (meta.url + meta.suffix) inside the format.
            raise Exception, "unable to fetch file from %s" % meta.url + meta.suffix

        if os.path.isdir(meta.filebase): # clean old i3_work
            os.system("rm -rf " + meta.filebase)

        # create md5sum record for the freshly cached tarball
        os.mkdir(meta.filebase) # create i3_work
        os.system("tar -C%s -xzf %s" % (basedir,cache))
        os.system("md5sum %s > %s" % (meta.filebase+meta.suffix,join(cachedir,meta.md5sum)))
        os.remove(lockfile)
        os.chdir(cwd)

        trials -= 1 # decrement counter
        if trials < 1:
            # NOTE(review): "trails" is a typo for "trials" in this message
            raise Exception, "Max no. of trails reached to stage metaproject '%s'" % meta.filebase

    os.chdir(cwd) # if not there already
    meta.path = join(tmpstorage,meta.filebase)
    return meta
188 189 190 #------ start of class Run -----
191 -class RunInstance:
192
193 - def setstats(self,name,value):
194 if not self.stats.has_key(name): 195 self.stats[name] = value 196 elif type(value) == type(float()): 197 self.stats[name] += value 198 elif type(value) == type(int()): 199 self.stats[name] += value 200 else: 201 self.stats[name] = value
202
203 - def setpython(self,tray):
204 if self.steering.GetTrays()[tray].GetPython(): 205 self.python = self.steering.GetTrays()[tray].GetPython() 206 else: 207 self.python = sys.executable 208 if not os.path.exists(self.python): 209 meta = I3Tarball() 210 meta.platform = getplatform() 211 meta.name = self.python 212 baseurl = os.path.join(self.baseurl,'python') 213 if functions.isurl(self.python): 214 meta.name = os.path.basename(self.python) 215 baseurl = os.path.dirname(self.python) 216 meta.suffix = ".tar.gz" 217 meta.filebase = "%s.%s" % (meta.name, meta.platform) 218 meta.md5sum = "%s.md5sum" % meta.filebase 219 meta.url = "%s/%s" % (baseurl,meta.filebase) 220 meta = fetch_tarball(meta,self.cache) 221 self.python = os.path.join(meta.path,'bin','python')
222
    def __init__(self,opts):
        """
        Initialize a run instance with defaults, then override them from
        the command-line option dictionary.
        @param opts: dict of command-line options (string keys)
        """
        global _baseurl
        self.logger = logging.getLogger('iceprod::exe')

        self.python = sys.executable      # interpreter used to run i3exec.py
        self.starttime = time.time()      # wall-clock start, for runtime stats
        self.env = {}                     # snapshot of inherited environment (getenv)
        self.opts = opts
        self.passkey = None               # server authentication key
        self.production = False           # True when talking to a production server
        self.server = None                # xmlrpclib.ServerProxy (set in startsoap)
        self.nproc = 1                    # total processes in the dataset
        self.procnum = 0                  # this job's process number
        self.dataset = 0
        self.configfile = 'config.xml'
        self.statsfile = 'summary.xml'
        self.url = None                   # monitoring server URL
        self.topdir = None                # job top directory (set by caller)
        self.workdir = "."
        self.build = None                 # currently selected build dir (setenv)
        self.baseurl = _baseurl           # software download server
        self.stats = {}                   # accumulated job statistics
        self.statusmsg = []               # status lines reported back to server
        self.excludefiles = []            # files excluded from stage-out
        self.debug = False
        self.mktar = False                # tar output files instead of copying
        self.suffix_in = []               # extra output-file glob patterns
        self.mpdownloaded = {}            # metaproject name -> staged I3Tarball
        self.logoutput = "icetray.log"
        self.err = "icetray.err"
        self.out = "icetray.out"
        self.ping = 0                     # ping-server interval flag
        self.stageout = False
        self.nocopy = False
        self.target = None                # stage-out target URL
        self.cache = '/tmp/%s_icesoft' % getpass.getuser()
        self.localhost = os.uname()[1]
        self.grid = 0
        self.socktimeout = 0              # XMLRPC socket timeout (seconds)
        self.validate = 0                 # XML validation flag

        # override defaults with cmdline options
        if self.opts.has_key("url"):
            self.url = self.opts["url"]
            self.production = True        # a url implies production mode
        if self.opts.has_key("ping"):
            self.ping = int(self.opts["ping"])
        if self.opts.has_key("dataset"):
            self.dataset = int(self.opts["dataset"])
        if self.opts.has_key("procnum"):
            self.procnum = int(self.opts["procnum"])
        if self.opts.has_key("nproc"):
            self.nproc = int(self.opts["nproc"])
        if self.opts.has_key("key"):
            self.passkey = self.opts['key']
        if self.opts.has_key("fetch"):
            self.baseurl = self.opts['fetch']
        if self.opts.has_key("mktar"):
            self.mktar = True
        if self.opts.has_key("debug"):
            self.debug = int(self.opts["debug"])
        if self.opts.has_key("target"):
            self.target = self.opts['target']
        if self.opts.has_key("stageout"):
            self.stageout = int(self.opts['stageout'])
        if self.opts.has_key("nocopy"):
            self.nocopy = True
        if self.opts.has_key("grid"):
            self.grid = int(self.opts["grid"])
        if self.opts.has_key("timeout"):
            self.socktimeout = int(self.opts["timeout"])
        if self.opts.has_key("validate"):
            self.validate = int(self.opts["validate"])

        if self.opts.has_key("python"):
            self.python = self.opts["python"]
    def configure(self):
        """
        Configure I3Run instance and set status for monitoring system.

        Loads the steering document (from the server in production mode,
        otherwise from the local config file), selects a software cache
        directory, parses DAG/globus/output-file options, and registers
        with the monitoring server.
        @return: True if job was configured successfully
        """
        print "getting config for dataset %d " % self.dataset
        self.out = os.path.join(self.topdir,"iceprod.dataset_%d.queue_%d.out" % (self.dataset,self.procnum))
        self.err = os.path.join(self.topdir,"iceprod.dataset_%d.queue_%d.err" % (self.dataset,self.procnum))
        myputenv("ICEPROD_STDOUT",os.path.abspath(self.out))
        myputenv("ICEPROD_STDERR",os.path.abspath(self.err))

        # Set the timeout for XMLRPC connections
        if self.socktimeout:
            socket.setdefaulttimeout(self.socktimeout)

        try:
            if self.production and not os.path.exists(self.configfile):
                # fetch pickled DOM tree from server and write a local copy
                domtree = cPickle.loads(self.server.getconfig(self.dataset))
                self.steering = IceTrayXMLParser(Steering()).LoadDocument(domtree)
                cfg = open(self.configfile,'w')
                PrettyPrint(domtree,cfg)
                cfg.close()
            else:
                self.steering = IceTrayXMLParser(Steering()).ParseFile(self.configfile,validate=self.validate)
        except Exception, e:
            print >> sys.stderr, e,"aborting"
            sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
            return False

        self.steering.AddSysOpt(SysOpt('hostname', functions.gethostname()))

        # configure parameter parser
        steerdict = self.steering.get_param_dict()
        self.parser = ExpParser(self.opts,self.steering)

        # choose a writable software cache dir from the steering "cache"
        # option (colon-separated candidates), probing writability by
        # creating and removing a 'test' subdirectory
        if self.steering.GetSysOpt("cache"):
            cache = '/tmp'
            for c in self.steering.GetSysOpt("cache").GetValue().split(':'):
                c = expandvars(c)
                if not os.path.exists(c):
                    continue
                c = os.path.join(c,'%s_icesoft' % getpass.getuser())
                if not exists(c):
                    try: os.makedirs(c)
                    except: continue
                try: os.makedirs(os.path.join(c,'test'))
                except: continue
                os.rmdir(os.path.join(c,'test'))
                cache = c
                break
            self.cache = cache
        if not exists(self.cache):
            try: os.makedirs(self.cache)
            except: pass

        # set dag temporary storage for intermediate files,
        # splitting "<protocol>://<host>/<basedir>"
        if self.steering.GetSysOpt("dagtemp"):
            self.dagtemp = self.parser.parse(self.steering.GetSysOpt("dagtemp").GetValue())
            pos = self.dagtemp.find("://")
            self.protocol = self.dagtemp[0:pos]
            host_end = self.dagtemp.find("/",pos+3)
            self.host = self.dagtemp[pos+3:host_end]
            self.basedir = self.dagtemp[host_end:]

            if self.protocol != 'gsiftp':
                # only support either GridFTP or local file
                self.basedir = self.host + self.basedir
                self.host = ''

        # set globus environment
        if self.steering.GetSysOpt("proxy_delegate") and \
           self.steering.GetSysOpt("proxy_delegate").GetValue() == 'True':
            unpack()
            globus_proxy = abspath(expandvars(self.steering.GetSysOpt("globus_proxy").GetValue()))
            globus_loc = abspath(expandvars(self.steering.GetSysOpt("globus_location").GetValue()))
            globus_path = os.path.join(globus_loc, "bin")
            globus_certs = os.path.join(globus_loc, "certificates")
            globus_lib = os.path.join(globus_loc, "lib")
            os.chmod(globus_proxy,0600)  # proxy must be user-only readable
            myputenv("GLOBUS_LOCATION",globus_loc)
            myputenv("X509_CERT_DIR",globus_certs)
            myputenv("X509_USER_PROXY",globus_proxy)


        # comma-separated output/exclude/summary file options
        if steerdict.has_key('outputfiles'):
            outfiles = self.parser.parse(steerdict['outputfiles'])
            self.suffix_in.extend(map(string.strip,outfiles.split(',')))
        if steerdict.has_key('excludefiles'):
            excludefiles = self.parser.parse(steerdict['excludefiles'])
            self.excludefiles.extend(map(string.strip,excludefiles.split(',')))
            print "excluded files:",self.excludefiles
        if steerdict.has_key('summaryfile'):
            self.statsfile = self.parser.parse(steerdict['summaryfile'])

        os.system("touch %s/work.tgz" % self.topdir )
        self.getenv()
        print self.url,self.dataset,self.procnum,self.passkey
        return self.startsoap(self.url,self.dataset,self.procnum,self.passkey)
398 399
400 - def getenv(self):
401 """ 402 Get current environment and save it 403 """ 404 self.env['PYTHONPATH'] = expandvars("$PYTHONPATH") 405 self.env['PATH'] = expandvars("$PATH") 406 self.env['LD_LIBRARY_PATH'] = expandvars("$LD_LIBRARY_PATH") 407 self.env['DYLD_LIBRARY_PATH'] = expandvars("$DYLD_LIBRARY_PATH")
408
    def setenv(self,i3build):
        """
        Configure environment for run: platform/I3_* variables, ROOTSYS,
        Java, library/python/binary search paths, and classpaths.
        @param i3build: working directory path
        """
        # See if we need to do anything with the environment
        if self.build == i3build:
            print "setenv: Using previously selected build environment in"
            print i3build
            return
        self.build = i3build

        # determine the plaform we are running on
        platform = getplatform()
        myputenv('I3_PLATFORM','')
        if platform == 'system': # forcing a getplatform
            platform = getplatform(reset=True)
        myputenv('I3_PLATFORM',platform)
        myputenv('PLATFORM',platform)
        print platform
        print "Host info:"
        print ' '.join(os.uname())
        print "running in "+os.getcwd()
        # all I3_* trees point into the build directory
        myputenv('I3_BUILD', i3build)
        myputenv('I3_WORK', os.path.join(i3build,'projects'))
        myputenv('I3_SRC', i3build)
        myputenv('I3_PORTS', i3build)
        myputenv('I3_TOOLS', i3build)
        # ROOTSYS: bundled cernroot, steering override, or the build itself
        rootsys = os.path.join(i3build,'cernroot')
        if os.path.exists(rootsys):
            myputenv('ROOTSYS', rootsys)
        elif self.steering.GetSysOpt("rootsys"):
            rootsys = self.steering.GetSysOpt("rootsys").GetValue()
            myputenv('ROOTSYS', expandvars(rootsys))
        else:
            myputenv('ROOTSYS', i3build)

        # SETUP JAVA ENVIRONMENT
        #find any java installed on system
        javasearchpath = expandvars('$JAVA_HOME:/usr/java:/usr/local/java')
        java = functions.findjava(javasearchpath)

        ldlibpath = []
        if java:
            myputenv('JAVA_HOME',java[0])
            ldlibpath.append(java[1])
        else:
            print >> sys.stderr,"!!!!!!!!!JAVA NOT FOUND IN %s!!!!!!!!!!!"% javasearchpath
        print expandvars("using java in $JAVA_HOME")

        #Set library path (prepended before the inherited one)
        ldlibpath.append(expandvars("$ROOTSYS/lib"))
        ldlibpath.append(expandvars("$I3_BUILD/lib"))
        ldlibpath.append(expandvars("$I3_BUILD/lib/tools"))
        if os.uname()[4].endswith("64"):
            ldlibpath.append(expandvars("/usr/lib64"))
        ldlibpath.append(expandvars("/usr/lib"))
        # lib dirs shipped alongside a staged python interpreter
        ldlibpath.extend(glob.glob(self.python.replace("bin/python","lib*")))
        ldlibpath.append(self.env['LD_LIBRARY_PATH'])
        myputenv('DYLD_LIBRARY_PATH', ":".join(ldlibpath))
        myputenv('LD_LIBRARY_PATH', ":".join(ldlibpath))

        #Set other environment paths
        pythonpath = []
        pythonpath.append(expandvars("."))
        pythonpath.append(expandvars("$I3_BUILD/lib"))
        pythonpath.append(expandvars("$I3_BUILD/lib/python"))
        pythonpath.append(expandvars("$I3_BUILD/lib/python/site-packages"))
        pythonpath.extend(glob.glob(self.python.replace("bin/python","lib*/python*/site-packages")))
        pythonpath.append(self.env['PYTHONPATH'])
        myputenv('PYTHONPATH', ":".join(pythonpath))


        path = []
        path.append(expandvars("."))
        path.append(expandvars("$I3_BUILD/bin"))
        path.append(expandvars("$PYROOT/bin"))
        path.append(expandvars("/bin"))
        path.append(expandvars("/usr/bin"))
        path.append(self.env["PATH"])
        myputenv('PATH', ":".join(path))

        myputenv('USER_CLASSPATH',expandvars("$I3_WORK/monolith-reader/resources/triggerUtil.jar"))
        myputenv('ICETRAY_CLASSPATH',expandvars("$I3_BUILD/lib/mmc.jar"))


        # dump the resulting environment for the job log
        print '\n----------- environment contents ------------'
        for envkey in os.environ.keys():
            print envkey, ":"
            for env_val in os.environ[envkey].split(":"):
                print "\t", env_val
500 501
    def fetch_metaproject(self,tray=0,tmpstorage=_tmpstorage):
        """
        Cached download of metaproject tarball.
        @param tray: integer tray index in config file
        @param tmpstorage: location to cache software
        @return: path of the unpacked metaproject directory
        """
        tmpstorage = get_tmpstorage(tmpstorage)

        # check disk space via `df -P`
        dfout = os.popen("df -P %s" % tmpstorage)
        dfout.readline() # pop header line
        avail_disk = int(dfout.readline().strip().split()[3])
        if avail_disk < 6*1039501: # approx 5G
            print "cachdir %s does not have enough space" % tmpstorage
            # clean up old meta-projects, unless the cache is inside topdir
            if not os.path.abspath(tmpstorage).startswith(os.path.abspath(self.topdir)):
                for dir in os.listdir(tmpstorage):
                    dir_age = time.time() - os.stat(os.path.join(tmpstorage,dir)).st_mtime
                    if dir_age > 3600*24*14: # anything older than a two weeks
                        functions.removedirs(dir)
            # fall back to staging directly in the job's top directory
            tmpstorage = self.topdir

        if not tray < len(self.steering.GetTrays()):
            raise Exception,"requested a tray index > number of trays in configuration"

        # Check if there is metaproject to fetch
        if not self.steering.GetTrays()[tray].GetMetaProjectList():
            return "/tmp/"

        # Extract the nth tray and get its metaproject
        mp = self.steering.GetTrays()[tray].GetMetaProjectList()[0]
        mpname = mp.GetName()
        mpver = mp.GetVersion()

        # reuse a tarball already staged for an earlier tray
        if self.mpdownloaded.has_key(mpname) and self.mpdownloaded[mpname].version == mpver:
            print "fetch_metaproject: tray %d " % tray
            print "%s.%s is already downloaded from previous tray." % (mpname,mpver)
            print self.mpdownloaded[mpname].path
            print "Skipping download"
            return self.mpdownloaded[mpname].path
        meta = I3Tarball()
        meta.name = mpname
        meta.version = mpver
        self.mpdownloaded[meta.name] = None

        # ask the server which tarball matches our platform/compiler
        platform,gccversion = getplatform().split('.gcc-') # determine the plaform
        gccversion = 'gcc-'+gccversion
        meta.suffix = ".tar.gz"
        server = xmlrpclib.ServerProxy(self.url)
        url = server.get_tarball(meta.name,meta.version,platform,gccversion)
        if not url:
            raise Exception, "No compatible plaforms found (%s.%s)" % (platform, gccversion)

        meta.url = os.path.join(self.baseurl,url.replace(meta.suffix,''))
        meta.filebase = os.path.basename(meta.url)
        meta.md5sum = "%s.md5sum" % meta.filebase
        self.mpdownloaded[meta.name] = fetch_tarball(meta,tmpstorage)
        return meta.path
560 561
    def fetch_dependencies(self):
        """
        Download files needed from server.

        Dependency entries may be plain strings or objects with GetName();
        relative names are resolved against self.baseurl.  Fetched files
        are added to self.excludefiles so they are not staged out later.
        @return: the updated exclude-file list
        """
        print "fetching dependencies %s" % self.baseurl
        for d in self.steering.GetDependencies():
            if type(d) in types.StringTypes:
                fetchurl = self.parser.parse(d)
            else:
                fetchurl = self.parser.parse(d.GetName())
            if not isurl(fetchurl):
                # relative path: resolve against the download server
                fetchurl = self.parser.parse("%s/%s" % (self.baseurl,fetchurl))

            print "retrieving %s..." % fetchurl
            failed = wget(fetchurl)
            #failed = fetch_file(fetchurl,tmpstorage=self.cache)
            if failed:
                raise Exception, "unable to fetch file from %s" % fetchurl
            self.excludefiles.append(basename(fetchurl))
        return self.excludefiles
582 583
584 - def getmemusage(self, stats):
585 # Get memory usage info 586 if not os.uname()[0] == 'Linux': 587 self.logger.info("getmemusage: Not a Linux machine") 588 return 589 usage = open("/proc/self/status") 590 for line in usage.readlines(): 591 if line.startswith('Vm'): 592 name,value,unit = line.split() 593 value = float(value.strip()) 594 if unit == "MB": 595 value = value*1024 596 if not stats.has_key(name): 597 stats[name] = value 598 else: 599 stats[name] = max(value,stats[name]) 600 usage.close()
601
    def getstats(self, file=None):
        """
        Read a summary map produced by IceTray and fold it into
        self.stats via setstats (floats accumulate).
        @param file: stats file path; defaults to self.statsfile.
               '.xml' files go through XMLSummaryParser, anything else is
               parsed as 'key: value' lines.
        @return: the parsed stats dict (or self.stats when file missing)
        """
        # read summary map from IceTray
        if not file: file = self.statsfile
        stats = {}
        if not os.path.exists(file):
            print "Unable to find statsfile", file
            return self.stats
        if file.endswith('.xml'):
            stats = XMLSummaryParser().ParseFile(file)
        else:
            if file:
                statsfile = open(file,'r')
            else:
                # NOTE(review): unreachable — file was defaulted to
                # self.statsfile above, so it is always truthy here
                statsfile = open(self.statsfile,'r')
            for line in statsfile:
                s = line.split(':')
                if len(s) > 1:
                    stats[s[0].strip()] = s[1].strip()
            statsfile.close()

        # accumulate numerically where possible, otherwise store raw
        for key in stats.keys():
            try:
                self.setstats(key,float(stats[key]))
            except:
                self.setstats(key,stats[key])

        # self.getmemusage(self.stats)
        return stats
630
631 - def tarwork(self):
632 workpath = dirname(self.workdir) 633 workdir = basename(self.workdir) 634 os.system("tar -C%s -czf %s/work.tgz ./%s" % (workpath,self.topdir,workdir))
635
    def runextern(self):
        """
        Run the extern phase of i3exec.py as a subprocess, wrapped in
        /usr/bin/time (when available) to collect memory/cpu statistics.
        @return: (exit status, stats dict parsed from the time output)
        """
        externstats = '.extern.stats'
        print '\n\n------ begin proc %d externs -------\n' % self.procnum
        outfile = os.path.join(self.workdir,"i3exe_%d.out" % self.procnum)
        cmd = ""
        if os.path.exists("/usr/bin/time"):
            # GNU time custom format writes 'key:value' lines to externstats
            cmd += "/usr/bin/time -o %s " % externstats
            cmd += " -f \""
            cmd += "mem_heap: %K\\n"
            cmd += "mem_heap_peak:%M\\n"
            cmd += "user_time:%U\\n"
            cmd += "sys_time:%S\\n"
            cmd += "real_time:%e\" -- "
        cmd += "%s i3exec.py --nproc=%d " % (self.python ,self.nproc)
        cmd += " --procnum=%d --dataset=%d " % (self.procnum,self.dataset)
        cmd += " --fetch=%s " % self.opts['fetch']
        cmd += " --extern=1"
        cmd += " --lib=" + " --lib=".join(self.opts['lib'])
        if self.url:
            cmd += " --url=%s " % self.url
        cmd += " --runcfg"
        cmd += " %s" % self.configfile
        cmd = expandvars(cmd)
        print cmd
        sys.stdout.flush()

        #retval = os.system("%s>%s" % (cmd,outfile))
        retval = os.system(cmd)
        stats = self.getstats(externstats)
        return retval,stats
666
667 - def get_trays(self):
668 """ 669 Extract number of trays and iterations in each tray 670 @return: array of indices indicating the number of iterations in each tray 671 """ 672 iters = map(IceTrayConfig.GetIterations,self.steering.GetTrays()) 673 return map(int,iters)
674
675 - def setlog4cplus(self):
676 """ 677 Configure log4cplus to write output logging 678 @return: path to log output file 679 """ 680 lg4cppath = os.path.join(self.workdir,"log4cplus.conf") 681 myputenv('I3LOGGING_CONFIG', lg4cppath) 682 lg4cp = open(lg4cppath,'w') 683 print >> lg4cp, "log4cplus.appender.default=log4cplus::FileAppender" 684 print >> lg4cp, "log4cplus.appender.default.File=%s" % self.logoutput 685 print >> lg4cp, "log4cplus.appender.default.layout.ConversionPattern=%d [%d{%Z}] %-5p %c %x: %F:% L %m%n" 686 if self.debug: 687 print >> lg4cp, "log4cplus.rootLogger=INFO, default" 688 else: 689 print >> lg4cp, "log4cplus.rootLogger=WARN, default" 690 lg4cp.close() 691 692 # Now add python loggin 693 handler = logging.FileHandler(self.logoutput) 694 formatter = logging.Formatter("iceprod : %(levelname)s %(name)s : %(message)s") 695 handler.setFormatter(formatter) 696 self.logger.addHandler(handler) 697 698 os.system("touch %s" % self.logoutput) 699 return self.logoutput
700
701 - def flush_output(self):
702 """ 703 Cat out/err files 704 """ 705 os.system("cat %s" % self.out) 706 os.system("cat %s >&2" % self.err) 707 sys.stdout.flush()
708
    def run(self):
        """
        Execute each of the icetray iterations in configuration:
        log4cplus setup, extern phase, then for each tray stage its
        metaproject/interpreter and run every iteration, finally
        collecting stats and staging out via finish().
        @return: execution value (0 if everything went ok; 5 when the
                 extern phase fails, 7 when finish() fails)
        """
        retval = 0
        tray = 0

        msg = "Configuring log4cplus....."
        print msg
        try:
            self.setlog4cplus()
            msg += 'ok'
        except Exception,e:
            msg += 'failed'
            # NOTE(review): 'exc_type' is not defined in this scope — this
            # line raises NameError if the except branch is ever taken;
            # probably meant sys.exc_type
            print >> sys.stderr, exc_type, e
            print >> sys.stderr, msg
        self.statusmsg.append(msg.decode("utf8","ignore"))

        msg = "%s - IceTray externs....." % time.strftime("%H:%M:%S")
        print msg
        retval,stats = self.runextern()
        print "return value", retval
        if retval==0:
            msg += 'ok'
            self.statusmsg.append(msg.decode("utf8","ignore"))
        else:
            msg += 'failed'
            self.statusmsg.append(msg.decode("utf8","ignore"))
            if self.debug: self.tarwork()
            return 5


        for iterations in self.get_trays():

            # download and setup metaproject tarball (if needed)
            build = self.fetch_metaproject(tray,tmpstorage=self.cache)
            os.system("touch %s" % build)  # refresh mtime so cache cleanup skips it
            self.setpython(tray)
            self.setenv(build)

            for iter in range(iterations):
                msg = "%s - IceTray %d iteration %d....." % (time.strftime("%H:%M:%S"),tray,iter)
                print msg
                iter_start = time.time()
                retval = self.execute(tray,iter)
                print "return value", retval
                self.setstats("tray(%u,%u) exec time" % (tray,iter),time.time()-iter_start)
                if retval==0:
                    msg += 'ok'
                    self.statusmsg.append(msg.decode("utf8","ignore"))
                    print msg
                    # heartbeat the server on every other iteration
                    if self.production and (self.ping > 0) and (iter%2 == 0):
                        try:
                            self.server.ping(self.dataset,self.procnum,self.localhost,self.passkey,tray,iter+1)
                        except Exception,e:
                            sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
                else:
                    # iteration failed: attach log/stdout/stderr tails to the
                    # status report and stop
                    msg += 'failed'
                    self.statusmsg.append(msg.decode("utf8","ignore"))
                    self.statusmsg.append('<br>---- log4cplus ----<br>')
                    self.statusmsg.append(functions.tail(self.logoutput,100))
                    self.statusmsg.append('<br>---- stdout ----<br>')
                    self.statusmsg.append(functions.tail(self.out,200).decode("utf8","ignore"))
                    self.statusmsg.append('<br>---- stderr ----<br>')
                    self.statusmsg.append(functions.tail(self.err,350).decode("utf8","ignore"))
                    print >> sys.stderr, msg
                    if self.debug: self.tarwork()
                    return retval
            tray +=1
        print "statsfile",self.statsfile
        if self.statsfile.endswith('.xml'):
            try:
                self.getstats(self.statsfile)
            except Exception,e:
                sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
                print "unable to process summary file %s" % self.statsfile
        if self.statsfile.endswith('.xml') and os.path.exists('iceprod.'+self.statsfile):
            try:
                self.getstats('iceprod.'+self.statsfile)
            except Exception,e:
                sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
                print "unable to process summary file iceprod.%s" % self.statsfile

        if self.finish():
            return retval
        else: return 7
796
    def finish(self):
        """
        Stage out generated files and generate an inventory.

        Collects output files by glob pattern, optionally gzips/tars
        them, reports final status and statistics to the server in
        production mode, and moves inventory-listed local files to the
        top directory.
        @return: True on success, False when the COPYING status could
                 not be set on the server
        """
        # set location of output tarball
        tar = os.path.join(self.topdir,'output_%d_%d.tar' % (self.dataset,self.procnum))
        procnum = self.procnum
        dataset = self.dataset
        passkey = self.passkey

        # copy output files to top directory
        # exclude invisible files and .err .out
        filelist = []
        if self.target: filelist.append(basename(self.logoutput))

        # also exclude gunzipped version of gzipped files
        exclude_gunzip = map(lambda s: s.replace(".gz",""),self.excludefiles)
        self.excludefiles.extend(exclude_gunzip)

        # include all .root .i3 files (and their .gz variants)
        suffix = ["*.root","*.i3"]
        suffix.extend( self.suffix_in )
        suffix_gz= map(lambda s: s+".gz",suffix)
        suffix.extend( suffix_gz )
        suffix_in_gz= map(lambda s: s+".gz",self.suffix_in)
        self.suffix_in.extend( suffix_in_gz )
        map(filelist.extend,map(glob.glob,suffix))
        excludelist = []
        map(excludelist.extend,map(glob.glob,self.excludefiles))
        print 'filelist',filelist
        print 'excludelist',excludelist

        # Set final progress of job
        if self.production:
            try:
                tray = len(self.get_trays()) - 1
                iter = self.get_trays()[-1]
                self.server.ping(self.dataset,self.procnum,self.localhost,self.passkey,tray,iter)
            except: pass

        print "copying data"
        if self.stageout and self.production:
            self.logger.info("setting status to COPYING")
            maxtries = 5
            for trial in range(maxtries):
                try:
                    self.server.copying(self.dataset,self.procnum,self.passkey)
                    break
                except Exception, e:
                    sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
                    self.logger.error(e)
                    if trial >= maxtries-1:
                        self.statusmsg.append('%s: Job was unable set status to copying' % e)
                        self.logger.error("COPYING failed")
                        self.abort(FAILED)
                        return False
        # generate inventory
        inventorylist = []
        inventory = open(os.path.join(self.topdir,_inventory),'w')

        self.logger.info("generating inventory")
        # NOTE(review): this overwrites the --nocopy command-line option
        # with the stageout flag — confirm intended
        self.nocopy = self.stageout
        if not self.nocopy:
            for file in filelist:
                # skip hidden/excluded files unless explicitly requested
                if re.match(r'^\.',file) or file in excludelist:
                    if file not in self.suffix_in: continue

                print file
                if not (file.endswith('.gz') or file.endswith('.root')):
                    cmd ="gzip "+file+" -c > "+file+".gz"
                    print cmd
                    os.system(cmd)
                    file = file+".gz"
                if self.mktar:
                    # create the tar on first file, update it afterwards
                    dir = os.path.dirname(file)
                    cmd = "tar"
                    if os.path.isdir(dir):
                        cmd += " -C%s " % dir
                    if not os.path.exists(tar):
                        cmd += " -cf"
                    else:
                        cmd += " -uf"
                    cmd += " %s %s" %(tar,basename(file))
                    print cmd
                    os.system(cmd)

        # NOTE(review): nothing is ever written to this inventory file
        inventory.close()
        self.logger.info("inventory done")
        self.logger.info("Setting i3exec runtime")
        self.setstats("i3exec runtime",time.time()-self.starttime)

        self.logger.info("production: %s" % self.production)
        if self.production:
            self.logger.info("Setting final job status")

            try:
                stats = {}
                for key,value in self.stats.items():
                    self.logger.info("%s = %s" % (key,value))

                    # NOTE(review): key comes from self.stats, so this test is
                    # always true and the GetStatistics() filter below is
                    # unreachable dead code
                    if self.stats.has_key(key):
                        stats[key] = value
                        continue

                    # NOTE(review): 'steering' is undefined here (NameError if
                    # reached) — presumably meant self.steering
                    for keepkey in steering.GetStatistics():
                        if re.match(keepkey,key):
                            stats[key] = value
                            break

                self.server.finish(self.dataset,self.procnum,cPickle.dumps(stats),self.passkey,self.stageout)
            except Exception, e:
                self.statusmsg.append('%s: Job was unable to set finish status' % e)
                sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
                print e
                self.logger.error("setting status failed")
                self.abort(FAILED)

        # move locally produced files listed in the XML inventory to topdir
        xmlinventory = os.path.join(self.topdir,iceprod.core.inventory.name)
        self.logger.info("xmlinventory %s" % xmlinventory)
        try:
            oi = FileInventory()
            if os.path.exists(xmlinventory):
                self.logger.info("xmlinventory %s exists" % xmlinventory)
                oi.Read(xmlinventory)

            oi.Write(xmlinventory)
            for file in oi.filelist:
                source = file['source']
                print source
                if source.startswith('file:'): # replace path with localpath
                    source = os.path.abspath(basename(source.replace('file:','')))
                    if os.path.exists(source):
                        cmd = "mv %s %s/" % (source,self.topdir)
                        print cmd
                        os.system(cmd)
            self.logger.info("xmlinventory done")
        except Exception,e:
            self.logger.info("inventory exception: %s" % e)

        return True
937
938 - def abort(self,error,message=""):
939 self.flush_output() 940 if message: 941 self.statusmsg.append(message.decode("utf8","ignore")) 942 if self.production: 943 safe_abort(self.server.abort, 944 (self.procnum, 945 self.dataset,error,'<br>'.join(self.statusmsg), 946 self.passkey,cPickle.dumps(self.stats)))
947
    def startsoap(self,url,dataset,procnum,passkey):
        """
        Register this job with the monitoring server over XMLRPC and
        adopt the dataset/nproc/procnum assignment it returns.
        @param url: server URL (falsy to run without a server)
        @param dataset: requested dataset id
        @param procnum: requested process number
        @param passkey: authentication key
        @return: True when registration succeeded or no server is used;
                 False when the server has no work or the call failed
        """
        #self.localhost = functions.gethostname()
        self.localhost = functions.gethostid()
        if url and passkey:
            self.production = True
            print "connecting to %s ..." % url
            try:
                self.server = xmlrpclib.ServerProxy(url)
                status = self.server.start(self.localhost,dataset,procnum,passkey,self.grid)
                dataset = status[0]
                nproc = status[1]
                procnum = status[2]

                if len(status) > 3:
                    # in case we are talking to an old version of the server
                    # NOTE(review): this passkey is overwritten a few lines
                    # below by the original argument — confirm intended
                    self.passkey = status[3]

                print "dataset: %d, nproc: %d, procnum:%d"%(dataset,nproc,procnum)
                if dataset < 1:
                    print "Nothing to do. Exiting"
                    return False
                else:
                    print "processing proc %d from dataset %d" % (procnum,dataset)
                self.dataset = dataset
                self.nproc = nproc
                self.procnum = procnum
                self.passkey = passkey

                # overwrite option dictionary with newly set values from server
                self.opts["nproc"] = self.nproc
                self.opts["procnum"] = self.procnum
                self.opts["dataset"] = self.dataset
                self.opts["key"] = self.passkey

                set_sig_handler(self,procnum,dataset,passkey)
                return True

            except Exception, e:
                sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
                print >> sys.stderr, e , "could not fetch job from server"
                return False
        else:
            return True
991 992
    def execute(self,tray,iter):
        """
        Run one (tray, iteration) of the job by shelling out to i3exec.py.

        @param tray: tray index to execute
        @param iter: iteration number within the tray
        @return: exit status of the i3exec.py invocation (0 on success,
                 1 when launching it raised an exception)
        """
        print '\n\n------ begin proc %d iteration %d in tray %d -------\n' % (self.procnum,iter,tray)
        # assemble the i3exec.py command line from the job options
        cmd = ""
        cmd += "%s i3exec.py --nproc=%d " % (self.python,self.nproc)
        cmd += " --procnum=%d --dataset=%d " % (self.procnum,self.dataset)
        cmd += " --lib=" + " --lib=".join(self.opts['lib'])
        cmd += " --iter=%d --tray=%d " % (iter,tray)
        cmd += " --validate=%d " % self.validate
        cmd += " --cache=%s " % self.cache
        cmd += " --runcfg"
        cmd += " --fetch=%s " % self.opts['fetch']
        if self.opts.has_key('gpu'):
            cmd += " --gpu=%d " % self.opts['gpu']
        if self.opts.has_key('run'):
            cmd += " --run=%d " % self.opts['run']
        if self.opts.has_key('subrun'):
            cmd += " --subrun=%d " % self.opts['subrun']
        if self.opts.has_key('date'):
            cmd += " --date=%s " % self.opts['date']
        if self.url:
            cmd += " --url=%s " % self.url
            cmd += " --key=%s " % self.passkey
        if self.opts.has_key("zipsafe"):
            cmd += " --zipsafe=%s " % self.opts['zipsafe']
        cmd += " %s" % self.configfile
        # child appends to our own log files
        cmd += " 2>>%s" % self.err
        cmd += " 1>>%s" % self.out
        cmd = expandvars(cmd)
        print cmd
        try:
            return os.system(cmd)
        except Exception,e:
            print "Caught exception: ",e
            sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
            return 1
1028 #------ end of class Run ----- 1029 1030 #------ start of class RunInstanceDAG --------------------------------------- 1031
1032 -class RunInstanceDAG(RunInstance):
    def __init__(self,opts):
        """RunInstance constructor for multipart jobs.

        @param opts: option dictionary; recognizes 'tray', 'iter', 'task',
                     'abort-on-fail' and 'dagtemp' in addition to the base
                     class options
        """
        RunInstance.__init__(self,opts)
        self.task = None           # TaskDefinition looked up in run()
        self.taskname = ''         # name of the task this DAG node executes
        self.dagtemp = ''          # URL of shared intermediate-file storage
        self.tray = False          # tray override (False = all trays)
        self.iter = False          # iteration override (False = all iters)
        self.protocol = ''         # scheme parsed from dagtemp
        self.host = ''             # host parsed from dagtemp ('' = local)
        self.basedir = ''          # path component of dagtemp
        self.abort_on_fail = True  # report failures back to the server

        if self.opts.has_key("tray"):
            self.tray = int(self.opts["tray"])
        if self.opts.has_key("abort-on-fail"):
            self.abort_on_fail = bool(self.opts["abort-on-fail"])
        if self.opts.has_key("iter"):
            self.iter = int(self.opts["iter"])
        if self.opts.has_key("task"):
            self.taskname = self.opts["task"]
        if self.opts.has_key("dagtemp"):
            # split dagtemp of the form <protocol>://<host>/<path>
            self.dagtemp = expandvars(self.opts["dagtemp"])
            pos = self.dagtemp.find("://")
            self.protocol = self.dagtemp[0:pos]
            host_end = self.dagtemp.find("/",pos+3)
            self.host = self.dagtemp[pos+3:host_end]
            self.basedir = self.dagtemp[host_end:]

            if self.protocol != 'gsiftp':
                # only support either GridFTP or local file
                self.basedir = self.host + self.basedir
                self.host = ''
1066 1067 #------ start of functions redefined from RunInstance ------------------------- 1068
1069 - def abort(self,error,message=""):
1070 """ 1071 Handle job errors and report back to server 1072 """ 1073 self.flush_output() 1074 if message: 1075 self.statusmsg.append(message.decode("utf8","ignore")) 1076 if self.production and self.abort_on_fail: 1077 safe_abort(self.server.abort, 1078 (self.procnum, 1079 self.dataset,error,'<br>'.join(self.statusmsg), 1080 self.passkey,cPickle.dumps(self.stats)))
1081 1082 1083
    def run(self):
        """Runs a task in a multipart simulation job.

        @return: 0 on success, or a TASK_* error code
        """

        self.task = self.steering.GetTaskDefinition(self.taskname)
        if not self.task:
            print "task '%s' not found for multipart job" % self.taskname
            return TASK_NOT_FOUND

        if not self.dagtemp:
            print "no storage location specified for temporary files"
            return TASK_CONFIGURATION_ERROR

        print "multipart job enabled, executing task %s" % self.taskname

        print "using %s://%s%s for temporary file storage" \
              % (self.protocol, self.host, self.basedir)

        # the trashcan node at the end of the DAG only deletes scratch files
        if self.task.IsCleanup():
            tray = self.task.GetTray(0)
            idx = tray.GetIndex()
            task_id = self.server_start_task(idx, 0)
            if task_id != TASK_ERROR_ID:
                self.server_finish_task(task_id, self.stats)
                try:
                    self.task_run_cleanup()
                except:
                    self.handle_exception('cleaning dagtemp failed.')
                    #self.server_abort(task_id)
                self.server_finish_job()
            return 0

        retval = 0

        msg = "Configuring log4cplus....."
        print msg
        try:
            self.setlog4cplus()
            msg += 'ok'
        except:
            msg += 'failed'
            self.handle_exception(msg)
        self.statusmsg.append(msg.decode("utf8","ignore"))

        trays = self.task.GetTrays().values()
        if not self.tray is False:
            # a specific tray was requested on the command line
            trays = [self.task.GetTray(self.tray)]

        # input only needs to be staged once, before the first iteration
        input_needed = True
        taskids = []
        for tray in trays:
            idx = tray.GetIndex()
            if not self.iter is False:
                iters = [int(self.iter)]
            else:
                iters = tray.GetIters()
            for iter in iters:
                task_id = self.server_start_task(idx, iter)
                if task_id == TASK_ERROR_ID:
                    print "Bad task ID received from server"
                    return TASK_INVALID
                taskids.append(task_id)

                if input_needed:
                    self.server_copying_input(task_id)
                    msg = "Fetching input from previous tasks....."
                    retval = self.fetch_input()
                    if not self.check_retval(retval, msg):
                        self.server_abort(task_id)
                        return retval
                    input_needed = False

                retval, stats = self.execute(task_id, idx, iter)
                if retval != 0:
                    self.server_abort(task_id)
                    return retval

                if not self.task.IsCleanup():
                    self.server_copying_output(task_id)
                    msg = "Storing intermediate output....."
                    retval = self.store_output()
                    if not self.check_retval(retval, msg):
                        self.server_abort(task_id)
                        return retval
        else:
            # for/else: runs after all trays completed without an early return;
            # mark every started task as finished on the server
            for task_id in taskids:
                fin = self.server_finish_task(task_id, self.stats)
                if not fin:
                    # abort first task so job starts back from this point
                    self.server_abort(taskids[0])
                    self.server_abort(task_id)
                    return TASK_SERVER_ERROR

        return 0
1177
    def execute(self, task_id, tray, iter):
        """Executes a given tray and iteration within this task.

        @param task_id: server-assigned ID for this task
        @param tray: tray index to run
        @param iter: iteration number (TASK_EXTERN_ITER runs the externs)
        @return: (retval, stats) tuple; retval is 0 on success
        """
        self.server_processing(task_id)

        if self.task.IsCleanup():
            return self.task_run_cleanup()

        if iter == TASK_EXTERN_ITER:
            return self.task_run_externs()

        if not self.stats:
            self.stats = {}

        # download and setup metaproject tarball (if needed)
        build = self.fetch_metaproject(tray, tmpstorage=self.cache)
        print "metaproject fetched"

        self.setpython(tray)
        self.setenv(build)
        print "set python to %s" % self.python

        print "environment setup complete"
        msg = "%s - IceTray %d iteration %d....." % (time.strftime("%H:%M:%S"),tray,iter)
        print msg

        print '\n\n------ begin proc %d iteration %d in tray %d -------\n' \
              % (self.procnum,iter,tray)
        # assemble the i3exec.py command line (DAG variant: --dag, --dagtemp)
        cmd = ""
        cmd += "%s i3exec.py --nproc=%d " % (self.python,self.nproc)
        cmd += " --procnum=%d --dataset=%d " % (self.procnum,self.dataset)
        cmd += " --lib=" + " --lib=".join(self.opts['lib'])
        cmd += " --iter=%d --tray=%d " % (iter,tray)
        cmd += " --cache=%s " % self.cache
        cmd += " --runcfg"
        cmd += " --dag"
        cmd += " --fetch=%s " % self.opts['fetch']
        if self.opts.has_key('run'):
            cmd += " --run=%d " % self.opts['run']
        if self.opts.has_key('date'):
            cmd += " --date=%s " % self.opts['date']
        if self.url:
            cmd += " --url=%s " % self.url
            cmd += " --key=%s " % self.passkey
        if self.opts.has_key("zipsafe"):
            cmd += " --zipsafe=%s " % self.opts['zipsafe']
        if self.dagtemp:
            cmd += " --dagtemp=%s" % self.dagtemp
        cmd += " %s" % self.configfile
        cmd += " 2>>%s" % self.err
        cmd += " 1>>%s" % self.out
        cmd = expandvars(cmd)
        print cmd
        try:
            retval = os.system(cmd)
        except Exception:
            self.handle_exception("error when executing iteration %d in tray %d" \
                                  % (iter, tray))
            retval = TASK_RUN_EXCEPTION

        if not self.check_retval(retval, msg):
            return retval, self.stats

        print "statsfile",self.statsfile
        if self.statsfile.endswith('.xml'):
            # collect both the job summary and iceprod's own summary file
            try:
                self.getstats(self.statsfile)
            except:
                self.handle_exception("unable to process summary file %s" \
                                      % self.statsfile)
            try:
                self.getstats("iceprod."+self.statsfile)
            except:
                self.logger.error("unable to process summary file iceprod.%s" % self.statsfile)

        return retval, self.stats
1253
1254 - def setstats(self,name,value):
1255 """Set task statistics. This version is non-cumulative.""" 1256 self.stats[name] = value
1257
    def startsoap(self,url,dataset,procnum,passkey):
        """Connects to the monitoring daemon and gets the task ID.

        @param url: XML-RPC endpoint of the monitoring server
        @param dataset: requested dataset id (server may reassign)
        @param procnum: requested process number (server may reassign)
        @param passkey: authentication key for this job
        @return: False when there is no work or the server failed,
                 True otherwise (including non-production mode)
        """
        self.localhost = functions.gethostid()
        if url and passkey:
            self.production = True
            print "connecting to %s ..." % url
            try:
                self.server = xmlrpclib.ServerProxy(url)

                dataset,nproc,procnum = self.server.multipart_job_start(dataset,procnum,passkey)
                print "dataset: %d, nproc: %d, procnum:%d"%(dataset,nproc,procnum)
                if dataset < 1:
                    print "Nothing to do. Exiting"
                    return False
                else:
                    print "processing proc %d from dataset %d" % (procnum,dataset)
                    self.dataset = dataset
                    self.nproc = nproc
                    self.procnum = procnum
                    self.passkey = passkey

                    # overwrite option dictionary with newly set values from server
                    self.opts["nproc"] = self.nproc
                    self.opts["procnum"] = self.procnum
                    self.opts["dataset"] = self.dataset

                    set_sig_handler(self,procnum,dataset,passkey)
                    return True
            except:
                self.handle_exception("could not fetch job from server")
                return False
        else:
            return True
1291
1292 - def fetch_dependencies(self):
1293 """Does nothing; dependencies will be fetched during fetch_input().""" 1294 return self.excludefiles
1295 1296 #------ end of functions redefined from RunInstance --------------------------- 1297 1298 #------ start of new functions for MultiRunInstance --------------------------- 1299
    def check_retval(self, retval, msg=''):
        """
        Log the outcome of a job step and update the status report.

        @param retval: shell-style return value (0 == success)
        @param msg: optional progress line; "ok"/"failed" is appended to it
        @return: True if retval == 0, False otherwise
        """
        if retval == 0:
            if msg:
                msg += "ok"
                self.statusmsg.append(msg.decode("utf8","ignore"))
                print msg
            print "return value", retval
            return True
        else:
            if self.debug:
                # keep the work directory around for post-mortem debugging
                self.tarwork()
            if msg:
                msg += "failed"
                self.statusmsg.append(msg.decode("utf8","ignore"))
                print msg
                print >> sys.stderr, msg
            # attach the tail of the log output to the status report
            self.statusmsg.append(functions.tail(self.logoutput, 200).decode("utf8","ignore"))

            print "return value", retval
            return False
1320
1321 - def server_start_task(self, idx, iter):
1322 try: 1323 print "starting task %s tray %s iter %s" \ 1324 % (self.task.GetName(), idx, iter) 1325 return self.server.task_start(self.dataset, self.procnum, \ 1326 self.task.GetName(), idx, iter, \ 1327 self.localhost, self.passkey) 1328 except: 1329 self.handle_exception("unable to get task ID from server") 1330 return TASK_ERROR_ID
1331
1332 - def server_abort(self, task_id):
1333 try: 1334 self.server.task_abort(task_id, self.passkey) 1335 return True 1336 except: 1337 self.handle_exception("unable to abort task %d" % task_id) 1338 return False
1339
1340 - def server_finish_task(self, task_id, stats={}):
1341 stats['hostname'] = functions.gethostname() 1342 try: 1343 self.server.task_finish(task_id, cPickle.dumps(stats), self.passkey) 1344 return True 1345 except: 1346 self.handle_exception("unable to mark task %d as finished" % task_id) 1347 return False
1348
1349 - def server_finish_job(self):
1350 try: 1351 self.server.multipart_job_finish(self.dataset, self.procnum, self.passkey) 1352 return True 1353 except: 1354 self.handle_exception("unable to mark job %d.%d as finished" \ 1355 % (self.dataset, self.procnum)) 1356 return False
1357
1358 - def server_processing(self, task_id):
1359 try: 1360 self.server.task_processing(task_id, self.passkey) 1361 return True 1362 except: 1363 self.handle_exception("unable to mark task %d as processing" % task_id) 1364 return False
1365
1366 - def server_copying_input(self, task_id):
1367 try: 1368 self.server.task_copying_input(task_id, self.passkey) 1369 return True 1370 except: 1371 self.handle_exception("unable to mark task %d as copying output" % task_id) 1372 return False
1373
1374 - def server_copying_output(self, task_id):
1375 try: 1376 self.server.task_copying_output(task_id, self.passkey) 1377 return True 1378 except: 1379 self.handle_exception("unable to mark task %d as copying output" % task_id) 1380 return False
1381
1382 - def task_run_externs(self):
1383 msg = "%s - IceTray externs....." % time.strftime("%H:%M:%S") 1384 print msg 1385 retval, stats = self.runextern() 1386 self.check_retval(retval, msg) 1387 return retval, stats
1388
    def task_run_cleanup(self):
        """
        Remove the intermediate output from all parts of this multipart job.

        This executed by the trashcan job that's added to the end of all the
        DAGs; it deletes the incidental files created by each node of the DAG
        and marks the job itself as complete.

        @return: (retval, stats) where retval is the shell exit status
        @raise Exception: when the delete command fails
        """
        targetdir = self.get_intermediate_directory()
        if self.protocol == 'gsiftp':
            print "cleanup will use GridFTP (%s, %s)" % (self.host, targetdir)
            retval = self.uberftp(self.host,"rm -r %s" % targetdir)
        else: # anything other than GridFTP is assumed to be local file
            print "cleanup will use local filesystem (%s)" % (targetdir)
            cmd = "rm -rf %s" % (targetdir)
            print cmd
            retval = os.system(cmd)

        msg = "deleting intermediate output....."
        if not self.check_retval(retval, msg):
            raise Exception, "Unable to delete intermediate output"
        stats = {}

        return retval, stats
1413
1414 - def handle_exception(self, msg=None):
1415 traceback.print_exc(file=sys.stderr) 1416 if msg: 1417 print msg 1418 print >> sys.stderr, msg
1419
1420 - def store_output(self):
1421 """Store intermediate output from this task.""" 1422 return self.handle_file_manifest("output")
1423
1424 - def fetch_input(self):
1425 """Fetch input from previous tasks.""" 1426 return self.handle_file_manifest("input")
1427
    def handle_file_manifest(self, type="input"):
        """Processes a file manifest and copies the files contained therein.

        Manifest lines have the form "<source> <dest> [photonics|dontextract|global]".
        Transfer sizes and rates are recorded via setstats().

        @param type: "input" (download + verify) or "output" (upload + checksum)
        @return: 0 on success (or when no manifest exists), else the failing
                 copy/verify return value
        """
        name = self.task.GetName()
        if self.task.ParallelExecutionEnabled():
            name = name + "_" + str(self.iter)
        manifest = "iceprod.%u.%u.%s.%s" % (int(self.dataset), int(self.procnum), \
                                            name, type)
        print "Reading %s manifest from %s" % (type, manifest)
        if os.path.exists(manifest):
            print "%s manifest found" % type
            f = open(manifest, "r")
            size = 0            # total MB transferred
            nw_size = 0         # MB transferred over the network only
            xfer_time = 0
            nw_xfer_time = 0
            exclude = []        # archives that must not be unpacked
            for line in f:
                print "handling manifest %s" % line
                pieces = line.split()
                photonics = False
                extract = True
                shared = False
                verify = True
                if len(pieces) == 2:
                    source, dest = pieces
                else:
                    # third token carries per-file flags
                    source = pieces[0]
                    dest = pieces[1]
                    if pieces[2] == "photonics":
                        photonics = True
                    elif pieces[2] == "dontextract":
                        extract = False
                    elif pieces[2] == "global":
                        shared = True
                    print "pieces[2]=%s"%(pieces[2])
                print "src=%s | dest=%s"%(source,dest)

                locked = False
                if type == "input":
                    # is this a request for photonics tables?
                    if photonics:
                        photon_dir = os.getenv("PHOTON_TABLES_DIR")
                        if exists(photon_dir):
                            print "skipping %s because PHOTON_TABLES_DIR (%s) exists" % (source, photon_dir)
                            continue
                        myputenv("PHOTON_TABLES_DIR", self.workdir)

                    # fetch MD5 checksum
                    retval = self.fetch_checksum(source, dest)
                    if retval != 0:
                        print "unable to retrieve checksum, return value: %u" % int(retval)
                        print "integrity check disabled for %s" % dest
                    verify = retval == 0
                    dest_checksum = dest + TASK_CHECKSUM_SUFFIX

                    # photonics tables and global files get put in common storage
                    if shared or photonics:
                        if not exists(self.cache):
                            try: os.makedirs(self.cache)
                            except: pass
                        cache_dest = os.path.join(self.cache, dest)
                        cache_checksum = dest + ".cache" + TASK_CHECKSUM_SUFFIX
                        # mknod-based lock serializes concurrent jobs filling
                        # the shared cache; stale locks expire after 40 min
                        lockfile = cache_dest + ".lock"
                        while not locked:
                            try:
                                os.mknod(lockfile)
                                locked = True
                            except OSError, oserr:
                                print "%s %s ." % (oserr,lockfile)
                                time.sleep(300) # 5min
                                try:
                                    if (time.time() - os.stat(lockfile).st_mtime) > 2400:
                                        os.remove(lockfile)
                                except: pass
                        if verify:
                            retval = self.generate_checksum(cache_dest, cache_checksum)
                            if retval == 0:
                                if checksum(dest_checksum, cache_checksum):
                                    # file is ok, copy from cache to work dir
                                    source = cache_dest
                                else:
                                    # verification failed, remove file
                                    os.remove(cache_dest)

                    local_file = dest
                    if not extract:
                        # NOTE(review): unpack() compares bare listdir() names,
                        # but these are expanded absolute paths -- confirm the
                        # exclusion actually takes effect
                        exclude.append(expandvars("${TMPWORK}/" + dest))
                else:
                    # generate MD5 sum
                    retval = self.store_checksum(source,dest)
                    if retval != 0:
                        print "unable to store checksum, return value: %u" % int(retval)
                        print "future tasks will be unable to check integrity for %s" % source
                    local_file = source

                start = time.time()
                retval = self.copy_file(source, dest)
                if retval != 0:
                    if locked:
                        os.remove(lockfile)
                    return retval

                duration = time.time() - start
                # NOTE: integer division in Python 2 -- sizes are whole MB
                file_size = os.path.getsize(expandvars("${TMPWORK}/" + local_file)) / (1024 * 1024)
                size += file_size
                xfer_time += duration
                if not (source.startswith("file:") or source.startswith("/")):
                    nw_size += file_size
                    nw_xfer_time += duration

                if locked:
                    os.remove(lockfile)

                if type == "input" and verify:
                    retval = self.verify_checksum(local_file)
                    if retval != 0:
                        print "integrity verification failed for %s" % local_file
                        return retval

            if size > 0:
                if xfer_time > 0:
                    rate = size / xfer_time
                else:
                    rate = size
            else:
                rate = 0

            if nw_size > 0:
                if nw_xfer_time > 0:
                    nw_rate = nw_size / nw_xfer_time
                else:
                    nw_rate = nw_size
            else:
                nw_rate = 0

            self.setstats("%s size (MB)" % type, size)
            self.setstats("network %s size (MB)" % type, nw_size)
            self.setstats("%s rate (MB/s)" % type, rate)
            self.setstats("network %s rate (MB/s)" % type, nw_rate)
            f.close()

            if type == "input":
                unpack(exclude)
        else:
            print "No %s manifest found" % type
        return 0
1573
1574 - def fetch_checksum(self,source,dest):
1575 return self.copy_file(source + TASK_CHECKSUM_SUFFIX, dest + TASK_CHECKSUM_SUFFIX)
1576
1577 - def store_checksum(self,source,dest):
1578 checksum_src = source + TASK_CHECKSUM_SUFFIX 1579 checksum_dst = dest + TASK_CHECKSUM_SUFFIX 1580 retval = self.generate_checksum(source, checksum_src) 1581 if retval != 0: 1582 print "unable to generate checksum for %s" % source 1583 return retval 1584 retval = self.copy_file(checksum_src, checksum_dst) 1585 return retval
1586
1587 - def generate_checksum(self,file,checksum):
1588 cmd = "md5sum %s > %s" % (file, checksum) 1589 print cmd 1590 return os.system(cmd)
1591
1592 - def verify_checksum(self,file):
1593 checksum = file + TASK_CHECKSUM_SUFFIX 1594 if not os.path.exists(checksum): 1595 print "integrity verification skipped, no checksum available for %s" % file 1596 return 0 1597 cmd = "md5sum -c %s" % checksum 1598 print cmd 1599 return os.system(cmd)
1600
    def copy_file(self,source,dest):
        """
        Copy one file between the supported storage schemes.

        Scheme-less names are treated as files in ${TMPWORK}.  Supported
        transfers: gsiftp (globus-url-copy), http/ftp source (wget) and
        local file-to-file (cp).

        @param source: source URL or bare filename
        @param dest: destination URL or bare filename
        @return: 0 on success, TASK_XFER_* on configuration errors, else
                 the shell exit status
        """
        info = {}
        # parse source ("in") and destination ("out") into protocol/host/path
        # NOTE(review): files[0]/files[1] below rely on this dict iterating
        # "in" before "out" -- dict order is not guaranteed in old Python;
        # confirm this holds on the deployed interpreter
        for type, file in {"in": source, "out": dest}.items():
            if not info.has_key(type):
                info[type] = {}
            pos = file.find(":")
            if pos == -1:
                # no scheme given: assume a file in the work directory
                file = os.path.join("file:${TMPWORK}", file)
                pos = 4
            info[type]['file'] = file
            protocol = file[:pos]
            info[type]["protocol"] = protocol
            if protocol == "file":
                info[type]["path"] = file[5:]
            elif protocol == "gsiftp" or protocol == "http":
                file = file[pos + 3:]
                pos = file.find("/")

                info[type]["host"] = file[:pos]
                info[type]["path"] = file[pos:]
            else:
                print "Unknown transfer protocol: %s" % protocol
                return TASK_XFER_PROTO_ERROR

        if info["out"]["protocol"] == "http" or info["out"]["protocol"] == "ftp":
            print "Unsupported destination protocol: %s" % info["out"]["protocol"]
            return TASK_XFER_CONFIG_ERROR

        # build the [source, dest] pair handed to the transfer command
        files = []
        for type, file in info.iteritems():
            url = "%s://" % file["protocol"]
            if file.has_key("host"):
                url += file["host"] + "/"
            url += file["path"]
            # NOTE(review): the URL rebuilt above is immediately discarded in
            # favor of the raw parsed form -- confirm which one is intended
            url = file['file']
            files.append(url)

        print files
        if info["in"]["protocol"] == "gsiftp" or info["out"]["protocol"] == "gsiftp":
            proxy = self.steering.GetSysOpt("globus_proxy").GetValue()
            # Globus tools refuse world-readable proxies
            os.chmod(proxy,0600)
            globus_loc = self.steering.GetSysOpt("globus_location").GetValue()
            path = os.path.join(globus_loc, "bin")
            bin = os.path.join(path, 'globus-url-copy')
            certs = os.path.join(globus_loc, "certificates")
            lib = os.path.join(globus_loc, "lib")
            cmd = "LD_LIBRARY_PATH=%s:$LD_LIBRARY_PATH DYLD_LIBRARY_PATH=%s:$DYLD_LIBRARY_PATH"
            cmd += " X509_CERT_DIR=%s X509_USER_PROXY=%s %s -cd -r -nodcau %s %s"
            cmd = cmd % (lib, lib, certs, proxy, bin, files[0], files[1])
        elif info["in"]["protocol"] == "http" or info["in"]["protocol"] == "ftp":
            cmd = "wget -nv -O %s %s" % (info["out"]["path"], files[0])
        elif info["in"]["protocol"] == "file" and \
             info["out"]["protocol"] == "file":
            # ensure source exists
            if not os.path.exists(info["in"]["path"]):
                print "file %s does not exits!!!!!" % info["in"]["path"]
            # ensure destination directory exists
            cmd = "mkdir -p %s" % os.path.dirname(info["out"]["path"])
            print cmd
            retval = os.system(cmd)
            if retval != 0:
                return retval

            # plain copy between local paths
            cmd = "cp %s %s" % (info["in"]["path"], info["out"]["path"])
        else:
            print "Unknown transfer protocol"
            return TASK_XFER_PROTO_ERROR
        cmd = expandvars(cmd)

        print cmd
        return os.system(cmd)
1672 1673
    def uberftp(self,host,cmd):
        """Perform an FTP command on a remote host using uberftp.

        @param host: GridFTP server to contact
        @param cmd: uberftp command string to execute remotely
        @return: exit status of the uberftp invocation
        """
        proxy = self.steering.GetSysOpt("globus_proxy").GetValue()
        # Globus tools refuse world-readable proxies
        os.chmod(proxy,0600)

        globus_loc = self.steering.GetSysOpt("globus_location").GetValue()
        path = os.path.join(globus_loc, "bin")
        bin = os.path.join(path, 'uberftp')

        certs = os.path.join(globus_loc, "certificates")

        cmd = 'X509_CERT_DIR=%s X509_USER_PROXY=%s %s %s "%s"' \
              % (certs, proxy, bin, host, cmd)
        print cmd
        return os.system(cmd)
1689
1690 - def get_intermediate_directory(self):
1691 directory = self.basedir 1692 directory = os.path.join(self.basedir, str(self.dataset)) 1693 directory = os.path.join(directory, str(self.procnum)) 1694 return directory
1695 1696 #------ end of new functions for MultiRunInstance ----------------------------- 1697 1698 #------ end of class MultiRunInstance ----------------------------------------- 1699
def set_sig_handler(run,procnum,dataset,passkey):
    """
    install signal handler to handle errors and evictions

    @param run: RunInstance
    @param procnum: process number within dataset
    @param dataset: dataset id
    @param passkey: random passkey assigned to process by daemon
    """
    def handler(signum,frame):
        # runs on SIGTERM (batch-system eviction): report the eviction to
        # the server, then hard-exit without further cleanup
        # fix: without the global statement the assignment below created a
        # dead local and the module-level _done flag was never updated
        global _done
        sys.stderr.write("aborting\n")
        sys.stderr.write("%s\n" % str(frame))
        run.flush_output()
        run.server.abort(procnum,dataset,EVICTED,'Job has been evicted.',passkey)
        _done = True
        os._exit(5)
    signal.signal(signal.SIGTERM, handler)
def create_workspace(work,top):
    """Create and enter the work directory, exporting TMPWORK/TMPTOP.

    Everything in top (except .out/.err logs) is symlinked into the new
    directory so the job sees its input files locally.

    @return: the work directory path
    """
    os.makedirs(work)
    os.chdir(work)
    myputenv('TMPWORK',work)
    myputenv('TMPTOP',top)
    cwd = os.getcwd()
    for entry in os.listdir(top):
        if entry.endswith((".out", ".err")):
            continue
        if not os.path.exists(entry):
            os.symlink(join(top, entry), entry)
    return work
1730
def clean_workspace(top,work):
    """Return to top and, unless cleaning is globally disabled, delete work."""
    os.chdir(top)
    if not _clean:
        return
    sys.stdout.write("Cleaning workspace %s\n" % work)
    os.system('rm -rf %s' % work)
1736
def safe_abort(abortfunc,args,msg='',tries=4,wait=2):
    """Call abortfunc(*args), retrying on socket errors with growing back-off.

    @param abortfunc: callable to invoke (typically an XML-RPC proxy method)
    @param args: positional argument tuple for abortfunc
    @param msg: unused, kept for interface compatibility
    @param tries: maximum number of attempts
    @param wait: back-off multiplier in seconds (sleep = wait * attempt)
    @return: abortfunc's return value, or None if every attempt failed
    """
    for attempt in range(tries):
        sys.stdout.write("resetting job status\n")
        try:
            return abortfunc(*args)
        except socket.error as e:
            sys.stderr.write("%s %s\n" % (e, "retrying"))
            time.sleep(wait*attempt)
1747 1748
def wget(url,dest='./'):
    """
    Download (or locally link/copy) url into dest.

    Supports http/ftp (wget), lfn (lcg-cp), file (cp) and gsiftp
    (globus-url-copy).  If the file already exists at the destination, or
    was shipped with the job in $I3_TOPDIR, no transfer is performed.

    @param url: source URL (environment variables are expanded)
    @param dest: destination path or directory
    @return: 0 on success, non-zero shell/transfer status otherwise
    @raise Exception: for an unsupported URL scheme
    """
    # NOTE(review): abspath() is applied before any 'file:' prefix is
    # stripped from dest -- confirm callers never pass dest as 'file:...'
    dest = os.path.abspath(expandvars(dest))
    url = expandvars(url)
    if os.path.isdir(dest.replace('file:','')):
        # destination is a directory: keep the source's basename
        dest = os.path.join(dest, os.path.basename(url))
    if dest.startswith("file:"):
        destpath = dest[5:]
    else:
        destpath = dest
        dest = "file:" + dest
    if os.path.exists(destpath):
        print "wget: file already downloaded"
        return 0
    # files shipped with the job land in $I3_TOPDIR; just symlink them
    updest = os.path.join(os.environ['I3_TOPDIR'],os.path.basename(dest))
    if os.path.exists(updest):
        print "file was sent with job"
        try:
            os.symlink(updest,destpath)
        except Exception, e:
            print 'symlink exception: %s'%str(e)
        else:
            # symlink succeeded: nothing left to transfer
            return 0

    if url.startswith("http://") or url.startswith("ftp://"):
        dest = dest.replace("file:","")
        cmd = 'wget -nv --tries=4 --output-document=%s %s'%(dest,url)
    elif url.startswith("lfn:"):
        cmd = "lcg-cp --vo icecube -v %s %s" % (url,dest)
    elif url.startswith("file:"):
        src = url.replace("file:","")
        dest = dest.replace("file:","")
        if not os.path.isfile(src):
            print "wget: not a file %s" % src
            return 2
        cmd = 'cp %s %s'% (src,dest)
    elif url.startswith("gsiftp://"):
        globus = expandvars("$GLOBUS_LOCATION")
        lib = os.path.join(globus,'lib')
        path = os.path.join(globus,'bin')
        bin = os.path.join(path,'globus-url-copy')
        # debug output left in place
        print "globus",globus
        os.system("/bin/ls")
        if not dest.startswith("file:"):
            dest = "file:%s" % os.path.abspath(dest)
        cmd = "LD_LIBRARY_PATH=%s:$LD_LIBRARY_PATH DYLD_LIBRARY_PATH=%s:$DYLD_LIBRARY_PATH"
        cmd += " %s -cd -r -nodcau %s %s"
        cmd = cmd % (lib, lib, bin, url, dest)
    else:
        raise Exception, "unsupported protocol %s" % url
    print cmd
    return os.system(cmd)
1800 1801
def checksum(sum1,sum2):
    """Return True iff both checksum files exist and have identical contents.

    Comparison is delegated to the external `diff` command.
    """
    if not (exists(sum1) and exists(sum2)):
        return False
    if os.system("diff %s %s" % (sum1,sum2)) == 0:
        sys.stdout.write("sums match\n")
        return True
    sys.stdout.write("sums differ\n")
    return False
1811
def isurl(url):
    """Return True for http://, ftp:// and file: URLs, False otherwise."""
    return url.startswith(("http://", "ftp://", "file:"))
1816 1817
def setplatform(platform):
    """
    Override the cached platform string returned by getplatform().

    @param platform: icetray-style platform string to cache
    """
    global _platform
    _platform = platform
1825 1826
def getplatform(reset=False,legacymode=False):
    """
    Get platform information from system and format it
    according to icetray standard

    @param reset: ignore any cached value and re-detect
    @param legacymode: unused, kept for interface compatibility
    @return: platform string "<os>-<arch>.gcc-<version>"
    """
    global _platform
    # serve the cached value unless a reset was requested
    if not reset and _platform:
        return _platform

    my_sys = os.uname()
    my_os = my_sys[0]
    my_arch = my_sys[4]

    handle = os.popen('gcc -dumpversion','r')
    gccversion = handle.readline().strip()
    handle.close()
    # fix: the old code also split gccversion and ran map(int, ...) into
    # unused variables, raising ValueError whenever gcc was not installed
    # (empty output); the split results were never used

    _platform = "%s-%s.gcc-%s" % (my_os,my_arch,gccversion)
    return _platform
1849 1850
def myputenv(name,value):
    """
    Set environment variable name, skipping (and logging) None values.

    @param name: environment variable name
    @param value: string value; None is only reported, never stored
    """
    # fix: compare to None with 'is', not '=='; a None value would raise
    # inside os.environ, so it is just echoed instead
    if value is None:
        sys.stdout.write("%s %s\n" % (name, value))
    else:
        os.environ[name] = value
1854 1855
def unpack(exclude=[]):
    """
    Decompress and untar every archive in the current working directory.

    @param exclude: filenames to skip (read-only here; the shared default
                    list is never mutated)

    NOTE(review): callers such as handle_file_manifest() put expanded
    absolute paths ("${TMPWORK}/<name>") into exclude, while the names
    compared below come from os.listdir() and are bare basenames --
    confirm the exclusion actually matches.
    """
    print "unpacking %s..." % os.getcwd()
    #for archive in [filename for filename in os.listdir(".") if filename.endswith(".gz") and not filename in exclude]:
        #print "gunzip "+archive
        #base,suffix = os.path.splitext(archive)
        #os.system("gunzip "+archive+" -c > "+base)

    for archive in [filename for filename in os.listdir(".") if filename.endswith(".bz2") and not filename in exclude]:
        print "bunzip2 "+archive
        os.system("bunzip2 -k "+archive)

    # plain tarballs, including any produced by bunzip2 above
    for archive in [filename for filename in os.listdir(".") if filename.endswith(".tar") and not filename in exclude]:
        print "tar xf "+archive
        os.system("tar xf "+archive)

    for archive in [filename for filename in os.listdir(".") if filename.endswith(".tgz") and not filename in exclude]:
        print "tar xzf "+archive
        os.system("tar xzf "+archive)

    for archive in [filename for filename in os.listdir(".") if filename.endswith(".tar.gz") and not filename in exclude]:
        print "tar xzf "+archive
        os.system("tar xzf "+archive)
1878