
Source Code for Module iceprod.core.exe

#!/usr/bin/python
# @author: Juan Carlos Diaz-Velez
# @date:   04 May 2005
# (c) IceCube collaboration.
# @brief:  Steering script for running I3Sim on Condor.
# Evaluator: Module for parsing and evaluating expressions in IceTray values
#
######################################################################
import os,sys,time,string,re
import signal
import random
import getpass
import os.path,getopt
import commands
import platform
import logging
import glob
from os.path import expandvars,join,exists,basename,dirname,abspath
from iceprod.core.dataclasses import *
from iceprod.core.xmlparser import IceTrayXMLParser
from iceprod.core.lex import *
from iceprod.core import functions
from iceprod.core.constants import *

import xmlrpclib
import socket
import cPickle
from ConfigParser import SafeConfigParser
from ipxml import PrettyPrint

import traceback

_platform = None
_inventory = "sp_outfile_inventory.list"
_done  = False
_clean = True

EVICTED = 1
FAILED  = 2
python_version = platform.python_version()[:3]

_child = 0

_baseurl = 'http://icecube.wisc.edu/simulation/downloads'
_tmpstorage = '/tmp/%s_icesoft' % getpass.getuser()

def fetch_tarball(meta,tmpstorage=_tmpstorage,basedir='.'):
    if not exists(tmpstorage):
        try: os.makedirs(tmpstorage)
        except: pass

    lockfile = join(tmpstorage,"%s.lock" % meta.filebase)
    cache = join(tmpstorage,meta.filebase + meta.suffix)
    cachedir = join(tmpstorage,meta.filebase)

    # fetch checksum
    md5url = meta.url + '.md5sum'
    print "retrieving %s..." % md5url
    failed = wget(md5url)
    if failed: raise Exception, "unable to fetch file from %s" % md5url

    cwd = os.getcwd()
    # See if we need to download a new tarball
    trials = 5 # limit number of attempts
    while (not exists(cache)) or \
          (not exists(cachedir)) or \
          (not checksum(join(cwd,meta.md5sum),join(cachedir,meta.md5sum))):

        os.chdir(tmpstorage)
        try: # create lockfile
            os.mknod(lockfile)
        except OSError,oserr: # file exists
            print "%s %s ." % (oserr,lockfile)
            time.sleep(300) # 5min
            try:
                # remove stale locks older than 40 minutes
                if (time.time() - os.stat(lockfile).st_mtime) > 2400:
                    os.remove(lockfile)
            except: pass
            continue # check cache again

        os.system("rm -rf %s %s" % (cache,cachedir))
        print "retrieving %s..." % (meta.url + meta.suffix)
        failed = wget(meta.url + meta.suffix)
        if failed:
            os.remove(lockfile)
            raise Exception, "unable to fetch file from %s" % (meta.url + meta.suffix)

        if os.path.isdir(meta.filebase): # clean old i3_work
            os.system("rm -rf " + meta.filebase)

        # unpack tarball and record its md5sum
        os.mkdir(meta.filebase) # create i3_work
        os.system("tar -C%s -xzf %s" % (basedir,cache))
        os.system("md5sum %s > %s" % (meta.filebase+meta.suffix,join(cachedir,meta.md5sum)))
        os.remove(lockfile)
        os.chdir(cwd)

        if trials < 1:
            raise Exception, "Max no. of trials reached to stage metaproject '%s'" % meta.filebase
        trials -= 1 # decrement counter

    os.chdir(cwd) # if not there already
    meta.path = join(tmpstorage,meta.filebase)
    return meta

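# Usage sketch (illustrative, not from the original source): fetch_tarball()
# expects an I3Tarball whose url/filebase/suffix/md5sum fields are already
# filled in, the way fetch_metaproject() below fills them; the values here
# are hypothetical.
#
#   meta = I3Tarball()
#   meta.name     = "simulation"
#   meta.version  = "V02-05-00"
#   meta.filebase = "simulation.V02-05-00.Linux-x86_64.gcc-4.1.2"
#   meta.suffix   = ".tar.gz"
#   meta.md5sum   = meta.filebase + ".md5sum"
#   meta.url      = "%s/IceTray/simulation/%s" % (_baseurl, meta.filebase)
#   meta = fetch_tarball(meta)   # downloads, verifies, unpacks into the cache
#   print meta.path              # e.g. /tmp/<user>_icesoft/<filebase>
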
def tail(logpath,chars=75):
    """
    Read the tail end of a log file
    @param logpath: path to logfile
    @param chars: number of characters to read from the end of the file
    @return: last n characters in file
    """
    if not os.path.exists(logpath):
        return "no log output found %s" % logpath
    logfile = open(logpath,'r')

    # Find the size of the file and move to the end
    st_results = os.stat(logpath)
    st_size = st_results[6]
    logfile.seek(max(0,st_size-chars))
    header = '<br>----%s----:<br>' % os.path.basename(logpath)
    return header + "<br>".join(logfile.readlines())

#------ start of class RunInstance -----

class RunInstance:

    def setstats(self,name,value):
        if not self.stats.has_key(name):
            self.stats[name] = value
        elif type(value) == type(float()):
            self.stats[name] += value
        elif type(value) == type(int()):
            self.stats[name] += value
        else:
            self.stats[name] = value

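    # Note on semantics (illustrative, not in the original source): numeric
    # values accumulate across calls, anything else is overwritten, e.g.
    #
    #   run.setstats("events", 10)
    #   run.setstats("events", 5)      # run.stats["events"] is now 15
    #   run.setstats("host", "nodeA")  # non-numeric: simply replaced
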
    def __init__(self,opts):
        global _baseurl
        self.logger = logging.getLogger('iceprod::exe')

        self.python = sys.executable
        self.starttime = time.time()
        self.env = {}
        self.opts = opts
        self.passkey = None
        self.production = False
        self.server = None
        self.nproc = 1
        self.procnum = 0
        self.dataset = 0
        self.configfile = 'config.xml'
        self.statsfile = 'summary.xml'
        self.url = None
        self.topdir = None
        self.workdir = "."
        self.build = None
        self.baseurl = _baseurl
        self.stats = {}
        self.statusmsg = []
        self.excludefiles = []
        self.debug = False
        self.mktar = False
        self.suffix_in = []
        self.mpdownloaded = {}
        self.logoutput = "icetray.log"
        self.err = "icetray.err"
        self.out = "icetray.out"
        self.ping = 0
        self.stageout = False
        self.nocopy = False
        self.target = None
        self.cache = '/tmp/%s_icesoft' % getpass.getuser()
        self.localhost = os.uname()[1]
        self.grid = 0
        self.socktimeout = 0
        self.validate = 0

        # override defaults with cmdline options
        if self.opts.has_key("url"):
            self.url = self.opts["url"]
            self.production = True
        if self.opts.has_key("ping"):
            self.ping = int(self.opts["ping"])
        if self.opts.has_key("dataset"):
            self.dataset = int(self.opts["dataset"])
        if self.opts.has_key("procnum"):
            self.procnum = int(self.opts["procnum"])
        if self.opts.has_key("nproc"):
            self.nproc = int(self.opts["nproc"])
        if self.opts.has_key("key"):
            self.passkey = self.opts['key']
        if self.opts.has_key("fetch"):
            self.baseurl = self.opts['fetch']
        if self.opts.has_key("mktar"):
            self.mktar = True
        if self.opts.has_key("debug"):
            self.debug = int(self.opts["debug"])
        if self.opts.has_key("target"):
            self.target = self.opts['target']
        if self.opts.has_key("stageout"):
            self.stageout = int(self.opts['stageout'])
        if self.opts.has_key("nocopy"):
            self.nocopy = True
        if self.opts.has_key("grid"):
            self.grid = int(self.opts["grid"])
        if self.opts.has_key("timeout"):
            self.socktimeout = int(self.opts["timeout"])
        if self.opts.has_key("validate"):
            self.validate = int(self.opts["validate"])

    def configure(self):
        """
        Configure I3Run instance and set status for monitoring system
        @return: True if job was configured successfully
        """
        print "getting config for dataset %d " % self.dataset
        self.out = os.path.join(self.topdir,self.out)
        self.err = os.path.join(self.topdir,self.err)

        self.stats['host_id'] = functions.gethostid()
        self.stats['platform'] = getplatform()

        # Set the timeout for XMLRPC connections
        if self.socktimeout:
            socket.setdefaulttimeout(self.socktimeout)

        try:
            if self.production and not os.path.exists(self.configfile):
                domtree = cPickle.loads(self.server.getconfig(self.dataset))
                self.steering = IceTrayXMLParser(Steering()).LoadDocument(domtree)
                cfg = open(self.configfile,'w')
                PrettyPrint(domtree,cfg)
                cfg.close()
            else:
                self.steering = IceTrayXMLParser(Steering()).ParseFile(self.configfile,validate=self.validate)
        except Exception, e:
            print >> sys.stderr, e, "aborting"
            sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
            return False

        self.steering.AddSysOpt(SysOpt('hostname',self.localhost))
        if self.steering.GetSysOpt("cache"):
            cache = expandvars(self.steering.GetSysOpt("cache").GetValue())
            self.cache = os.path.join(cache,'%s_icesoft' % getpass.getuser())
            if not exists(self.cache):
                try: os.makedirs(self.cache)
                except: pass

        # set globus environment
        if self.steering.GetSysOpt("proxy_delegate") and \
           self.steering.GetSysOpt("proxy_delegate").GetValue() == 'True':
            unpack()
            globus_proxy = abspath(expandvars(self.steering.GetSysOpt("globus_proxy").GetValue()))
            globus_loc = abspath(expandvars(self.steering.GetSysOpt("globus_location").GetValue()))
            globus_path = os.path.join(globus_loc, "bin")
            globus_certs = os.path.join(globus_loc, "certificates")
            globus_lib = os.path.join(globus_loc, "lib")
            os.chmod(globus_proxy,0600)
            myputenv("GLOBUS_LOCATION",globus_loc)
            myputenv("X509_CERT_DIR",globus_certs)
            myputenv("X509_USER_PROXY",globus_proxy)

        steerdict = self.steering.get_param_dict()
        self.parser = ExpParser(self.opts,self.steering)
        if steerdict.has_key('outputfiles'):
            outfiles = self.parser.parse(steerdict['outputfiles'])
            self.suffix_in.extend(map(string.strip,outfiles.split(',')))
        if steerdict.has_key('excludefiles'):
            excludefiles = self.parser.parse(steerdict['excludefiles'])
            self.excludefiles.extend(map(string.strip,excludefiles.split(',')))
            print "excluded files:",self.excludefiles
        if steerdict.has_key('summaryfile'):
            self.statsfile = self.parser.parse(steerdict['summaryfile'])

        os.system("touch %s/work.tgz" % self.topdir)
        self.getenv()
        print self.url,self.dataset,self.procnum,self.passkey
        return self.startsoap(self.url,self.dataset,self.procnum,self.passkey)

    def getenv(self):
        """
        Get current environment and save it
        """
        self.env['PYTHONPATH'] = expandvars("$PYTHONPATH")
        self.env['PATH'] = expandvars("$PATH")
        self.env['LD_LIBRARY_PATH'] = expandvars("$LD_LIBRARY_PATH")
        self.env['DYLD_LIBRARY_PATH'] = expandvars("$DYLD_LIBRARY_PATH")

    def setenv(self,i3build):
        """
        Configure environment for run
        @param i3build: working directory path
        """
        # See if we need to do anything with the environment
        if self.build == i3build:
            print "setenv: Using previously selected build environment in"
            print i3build
            return
        self.build = i3build

        # determine the platform we are running on
        platform = getplatform()
        myputenv('I3_PLATFORM','')
        if platform == 'system': # forcing a getplatform
            platform = getplatform(reset=True)
        myputenv('I3_PLATFORM',platform)
        myputenv('PLATFORM',platform)
        print platform
        print "Host info:"
        print ' '.join(os.uname())
        print "running in "+os.getcwd()
        myputenv('I3_BUILD', i3build)
        myputenv('I3_WORK', os.path.join(i3build,'projects'))
        myputenv('I3_SRC', i3build)
        myputenv('I3_PORTS', i3build)
        myputenv('I3_TOOLS', i3build)
        rootsys = os.path.join(i3build,'cernroot')
        if os.path.exists(rootsys):
            myputenv('ROOTSYS', rootsys)
        elif self.steering.GetSysOpt("rootsys"):
            rootsys = self.steering.GetSysOpt("rootsys").GetValue()
            myputenv('ROOTSYS', expandvars(rootsys))
        else:
            myputenv('ROOTSYS', i3build)

        # SETUP JAVA ENVIRONMENT
        # find any java installed on system
        javasearchpath = expandvars('$JAVA_HOME:/usr/java:/usr/local/java')
        java = functions.findjava(javasearchpath)

        ldlibpath = []
        if java:
            myputenv('JAVA_HOME',java[0])
            ldlibpath.append(java[1])
        else:
            print >> sys.stderr,"!!!!!!!!!JAVA NOT FOUND IN %s!!!!!!!!!!!" % javasearchpath
            print expandvars("using java in $JAVA_HOME")

        # Set library path
        ldlibpath.append(expandvars("$ROOTSYS/lib"))
        ldlibpath.append(expandvars("$I3_BUILD/lib"))
        ldlibpath.append(expandvars("$I3_BUILD/lib/tools"))
        if os.uname()[4].endswith("64"):
            ldlibpath.append(expandvars("/usr/lib64"))
        ldlibpath.append(expandvars("/usr/lib"))
        ldlibpath.append(self.env['LD_LIBRARY_PATH'])
        myputenv('DYLD_LIBRARY_PATH', ":".join(ldlibpath))
        myputenv('LD_LIBRARY_PATH', ":".join(ldlibpath))

        # Set other environment paths
        pythonpath = []
        pythonpath.append(expandvars("."))
        pythonpath.append(expandvars("$I3_BUILD/lib"))
        pythonpath.append(expandvars("$I3_BUILD/lib/python"))
        pythonpath.append(expandvars("$I3_BUILD/lib/python/site-packages"))
        pythonpath.append(self.env['PYTHONPATH'])
        myputenv('PYTHONPATH', ":".join(pythonpath))

        path = []
        path.append(expandvars("."))
        path.append(expandvars("$I3_BUILD/bin"))
        path.append(expandvars("$PYROOT/bin"))
        path.append(expandvars("/bin"))
        path.append(expandvars("/usr/bin"))
        path.append(self.env["PATH"])
        myputenv('PATH', ":".join(path))

        myputenv('USER_CLASSPATH',expandvars("$I3_WORK/monolith-reader/resources/triggerUtil.jar"))
        myputenv('ICETRAY_CLASSPATH',expandvars("$I3_BUILD/lib/mmc.jar"))

        print '\n----------- environment contents ------------'
        for envkey in os.environ.keys():
            print envkey, ":"
            for env_val in os.environ[envkey].split(":"):
                print "\t", env_val

    def fetch_metaproject(self,tray=0,tmpstorage=_tmpstorage):
        """
        Cached download of metaproject tarball
        @param tray: integer tray index in config file
        @param tmpstorage: location to cache software
        """
        # check disk space
        dfout = os.popen("df -P %s" % tmpstorage)
        dfout.readline() # pop header line
        avail_disk = int(dfout.readline().strip().split()[3])
        if avail_disk < 1039501: # approx 1GB
            print "cache dir %s does not have enough space" % tmpstorage
            # clean up old metaprojects
            if not os.path.abspath(tmpstorage).startswith(os.path.abspath(self.topdir)):
                for dir in os.listdir(tmpstorage):
                    dirpath = os.path.join(tmpstorage,dir)
                    dir_age = time.time() - os.stat(dirpath).st_mtime
                    if dir_age > 3600*24*14: # anything older than two weeks
                        functions.removedirs(dirpath)
            tmpstorage = self.topdir

        if not tray < len(self.steering.GetTrays()):
            raise Exception,"requested a tray index > number of trays in configuration"

        # Extract the nth tray and get its metaproject
        mp = self.steering.GetTrays()[tray].GetMetaProjectList()[0]
        mpname = mp.GetName()
        mpver = mp.GetVersion()

        if self.mpdownloaded.has_key(mpname) and self.mpdownloaded[mpname].version == mpver:
            print "fetch_metaproject: tray %d " % tray
            print "%s.%s is already downloaded from previous tray." % (mpname,mpver)
            print self.mpdownloaded[mpname].path
            print "Skipping download"
            return self.mpdownloaded[mpname].path

        meta = I3Tarball()
        meta.name = mpname
        meta.version = mpver
        self.mpdownloaded[meta.name] = None

        platform = getplatform() # determine the platform
        while not self.mpdownloaded[meta.name]:
            meta.filebase = "%s.%s.%s" % (meta.name,meta.version,platform)
            meta.suffix = ".tar.gz"
            meta.md5sum = "%s.md5sum" % meta.filebase
            meta.url = "%s/IceTray/%s/%s" % (self.baseurl,meta.name,meta.filebase)
            try:
                self.mpdownloaded[meta.name] = fetch_tarball(meta,tmpstorage)
            except Exception,e:
                # try an older gcc
                print e
                platform = next_platform(platform)
        return meta.path

    def fetch_dependencies(self):
        """
        Download files needed from server.
        """
        print "fetching dependencies %s" % self.baseurl
        for d in self.steering.GetDependencies():
            fetchurl = self.parser.parse(d)
            if not isurl(fetchurl):
                fetchurl = self.parser.parse("%s/%s" % (self.baseurl,fetchurl))
            print "retrieving %s..." % fetchurl
            failed = wget(fetchurl)
            if failed:
                raise Exception, "unable to fetch file from %s" % fetchurl
            self.excludefiles.append(basename(fetchurl))
        return self.excludefiles

    def getmemusage(self, stats):
        # Get memory usage info
        if not os.uname()[0] == 'Linux':
            self.logger.info("getmemusage: Not a Linux machine")
            return
        usage = open("/proc/self/status")
        for line in usage.readlines():
            if line.startswith('Vm'):
                name,value,unit = line.split()
                value = float(value.strip())
                if unit == "MB":
                    value = value*1024
                if not stats.has_key(name):
                    stats[name] = value
                else:
                    stats[name] = max(value,stats[name])
        usage.close()

    def getstats(self, file=None):
        # read summary map from IceTray
        if not file: file = self.statsfile
        stats = {}
        if not os.path.exists(file):
            print "Unable to find statsfile", file
            return self.stats
        if file.endswith('.xml'):
            stats = XMLSummaryParser().ParseFile(file)
        else:
            statsfile = open(file,'r')
            for line in statsfile:
                s = line.split(':')
                if len(s) > 1:
                    stats[s[0].strip()] = s[1].strip()
            statsfile.close()

        for key in stats.keys():
            try:
                self.setstats(key,float(stats[key]))
            except:
                self.setstats(key,stats[key])

        # self.getmemusage(self.stats)
        return stats

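    # Illustrative format of a non-XML stats file (one "key: value" pair per
    # line), as parsed above; the key and value names are hypothetical:
    #
    #   user_time: 1234.5
    #   sys_time: 67.8
    #   events: 1000
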
    def tarwork(self):
        workpath = dirname(self.workdir)
        workdir = basename(self.workdir)
        os.system("tar -C%s -czf %s/work.tgz ./%s" % (workpath,self.topdir,workdir))

    def runextern(self):
        externstats = '.extern.stats'
        print '\n\n------ begin proc %d externs -------\n' % self.procnum
        outfile = os.path.join(self.workdir,"i3exe_%d.out" % self.procnum)
        cmd = "/usr/bin/time -o %s " % externstats
        cmd += " -f \""
        cmd += "mem_heap: %K\\n"
        cmd += "mem_heap_peak:%M\\n"
        cmd += "user_time:%U\\n"
        cmd += "sys_time:%S\\n"
        cmd += "real_time:%e\" -- "
        cmd += "%s i3exec.py --nproc=%d " % (self.python,self.nproc)
        cmd += " --procnum=%d --dataset=%d " % (self.procnum,self.dataset)
        cmd += " --fetch=%s " % self.opts['fetch']
        cmd += " --extern=1"
        cmd += " --lib=" + " --lib=".join(self.opts['lib'])
        if self.url:
            cmd += " --url=%s " % self.url
        cmd += " --runcfg"
        cmd += " %s" % self.configfile
        cmd = expandvars(cmd)
        print cmd
        sys.stdout.flush()

        #retval = os.system("%s>%s" % (cmd,outfile))
        retval = os.system(cmd)
        stats = self.getstats(externstats)
        return retval,stats

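    # With the GNU time format string above, .extern.stats ends up containing
    # one "name:value" line per resource, which getstats() then parses, e.g.
    # (values hypothetical):
    #
    #   mem_heap: 0
    #   mem_heap_peak:102400
    #   user_time:356.12
    #   sys_time:4.80
    #   real_time:361.77
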
    def get_trays(self):
        """
        Extract number of trays and iterations in each tray
        @return: array of indices indicating the number of iterations in each tray
        """
        iters = map(IceTrayConfig.GetIterations,self.steering.GetTrays())
        return map(int,iters)

    def setlog4cplus(self):
        """
        Configure log4cplus to write output logging
        @return: path to log output file
        """
        lg4cppath = os.path.join(self.workdir,"log4cplus.conf")
        myputenv('I3LOGGING_CONFIG', lg4cppath)
        lg4cp = open(lg4cppath,'w')
        print >> lg4cp, "log4cplus.appender.default=log4cplus::FileAppender"
        print >> lg4cp, "log4cplus.appender.default.File=%s" % self.logoutput
        print >> lg4cp, "log4cplus.appender.default.layout.ConversionPattern=%d [%d{%Z}] %-5p %c %x: %F:%L %m%n"
        if self.debug:
            print >> lg4cp, "log4cplus.rootLogger=INFO, default"
        else:
            print >> lg4cp, "log4cplus.rootLogger=WARN, default"
        lg4cp.close()

        # Now add python logging
        handler = logging.FileHandler(self.logoutput)
        formatter = logging.Formatter("iceprod : %(levelname)s %(name)s : %(message)s")
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

        os.system("touch %s" % self.logoutput)
        return self.logoutput

    def flush_output(self):
        """
        Cat out/err files
        """
        os.system("cat %s" % self.out)
        os.system("cat %s >&2" % self.err)

    def run(self):
        """
        Execute each of the icetray iterations in configuration
        @return: execution value (0 if everything went ok)
        """
        retval = 0
        tray = 0

        msg = "Configuring log4cplus....."
        print msg
        try:
            self.setlog4cplus()
            msg += 'ok'
        except Exception,e:
            msg += 'failed'
            print >> sys.stderr, sys.exc_type, e
            print >> sys.stderr, msg
        self.statusmsg.append(msg)

        msg = "%s - IceTray externs....." % time.strftime("%H:%M:%S")
        print msg
        retval,stats = self.runextern()
        print "return value", retval
        if retval == 0:
            msg += 'ok'
            self.statusmsg.append(msg)
        else:
            msg += 'failed'
            self.statusmsg.append(msg)
            if self.debug: self.tarwork()
            return 5

        for iterations in self.get_trays():

            # download and setup metaproject tarball (if needed)
            build = self.fetch_metaproject(tray,tmpstorage=self.cache)
            os.system("touch %s" % build)

            self.setenv(build)

            for iter in range(iterations):
                msg = "%s - IceTray %d iteration %d....." % (time.strftime("%H:%M:%S"),tray,iter)
                print msg
                iter_start = time.time()
                retval = self.execute(tray,iter)
                print "return value", retval
                self.setstats("tray(%u,%u) exec time" % (tray,iter),time.time()-iter_start)
                if retval == 0:
                    msg += 'ok'
                    self.statusmsg.append(msg)
                    print msg
                    if self.production and (self.ping > 0) and (iter%2 == 0):
                        try:
                            self.server.ping(self.dataset,self.procnum,self.localhost,self.passkey,tray,iter+1)
                        except Exception,e:
                            sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
                else:
                    msg += 'failed'
                    self.statusmsg.append(msg)
                    self.statusmsg.append('<br>---- log4cplus ----<br>')
                    self.statusmsg.append(tail(self.logoutput,100))
                    self.statusmsg.append('<br>---- stdout ----<br>')
                    self.statusmsg.append(tail(self.out,200))
                    self.statusmsg.append('<br>---- stderr ----<br>')
                    self.statusmsg.append(tail(self.err,350))
                    print >> sys.stderr, msg
                    if self.debug: self.tarwork()
                    return retval
            tray += 1

        print "statsfile",self.statsfile
        if self.statsfile.endswith('.xml'):
            try:
                self.getstats(self.statsfile)
            except Exception,e:
                sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
                print "unable to process summary file %s" % self.statsfile
        if self.statsfile.endswith('.xml') and os.path.exists('iceprod.'+self.statsfile):
            try:
                self.getstats('iceprod.'+self.statsfile)
            except Exception,e:
                sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
                print "unable to process summary file iceprod.%s" % self.statsfile

        if self.finish():
            return retval
        else:
            return 7

    def finish(self):
        """
        Stage out generated files and generate an inventory
        """
        # set location of output tarball
        tar = os.path.join(self.topdir,'output_%d_%d.tar' % (self.dataset,self.procnum))
        procnum = self.procnum
        dataset = self.dataset
        passkey = self.passkey

        # copy output files to top directory
        # exclude invisible files and .err .out
        filelist = []
        if self.target: filelist.append(basename(self.logoutput))

        # also exclude gunzipped version of gzipped files
        exclude_gunzip = map(lambda s: s.replace(".gz",""),self.excludefiles)
        self.excludefiles.extend(exclude_gunzip)

        # include all .root .i3 files
        suffix = ["*.root","*.i3"]
        suffix.extend( self.suffix_in )
        suffix_gz = map(lambda s: s+".gz",suffix)
        suffix.extend( suffix_gz )
        suffix_in_gz = map(lambda s: s+".gz",self.suffix_in)
        self.suffix_in.extend( suffix_in_gz )
        map(filelist.extend,map(glob.glob,suffix))
        excludelist = []
        map(excludelist.extend,map(glob.glob,self.excludefiles))
        print 'filelist',filelist
        print 'excludelist',excludelist

        # Set final progress of job
        if self.production:
            try:
                tray = len(self.get_trays()) - 1
                iter = self.get_trays()[-1]
                self.server.ping(self.dataset,self.procnum,self.localhost,self.passkey,tray,iter)
            except: pass

        print "copying data"
        if self.stageout and self.production:
            self.logger.info("setting status to COPYING")
            maxtries = 5
            for trial in range(maxtries):
                try:
                    self.server.copying(self.dataset,self.procnum,self.passkey)
                    break
                except Exception, e:
                    sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
                    self.logger.error(e)
                    if trial >= maxtries-1:
                        self.statusmsg.append('%s: Job was unable to set status to copying' % e)
                        self.logger.error("COPYING failed")
                        self.abort(FAILED)
                        return False

        # generate inventory
        inventorylist = []
        inventory = open(os.path.join(self.topdir,_inventory),'w')

        self.logger.info("generating inventory")
        self.nocopy = self.stageout
        if not self.nocopy:
            for file in filelist:
                if re.match(r'^\.',file) or file in excludelist:
                    if file not in self.suffix_in: continue

                print file
                if not (file.endswith('.gz') or file.endswith('.root')):
                    cmd = "gzip "+file+" -c > "+file+".gz"
                    print cmd
                    os.system(cmd)
                    file = file+".gz"
                if self.mktar:
                    dir = os.path.dirname(file)
                    cmd = "tar"
                    if os.path.isdir(dir):
                        cmd += " -C%s " % dir
                    if not os.path.exists(tar):
                        cmd += " -cf"
                    else:
                        cmd += " -uf"
                    cmd += " %s %s" % (tar,basename(file))
                    print cmd
                    os.system(cmd)
                else:
                    cmd = "mv %s %s" % (file,self.topdir)
                    print cmd
                    if not basename(file) in inventorylist:
                        inventorylist.append(basename(file))
                        print >> inventory, basename(file)
                    os.system(cmd)
        inventory.close()
        self.logger.info("inventory done")
        self.logger.info("Setting i3exec runtime")
        self.setstats("i3exec runtime",time.time()-self.starttime)

        self.logger.info("production: %s" % self.production)
        if self.production:
            self.logger.info("Setting final job status")
            try:
                self.server.finish(self.dataset,self.procnum,cPickle.dumps(self.stats),self.passkey,self.stageout)
            except Exception, e:
                self.statusmsg.append('%s: Job was unable to set finish status' % e)
                sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
                print e
                self.logger.error("setting status failed")
                self.abort(FAILED)

        return True

    def abort(self,error,message=""):
        self.flush_output()
        if message:
            self.statusmsg.append(message.decode("utf8","ignore"))
        if self.production:
            safe_abort(self.server.abort,
                       (self.procnum,self.dataset,error,'<br>'.join(self.statusmsg),self.passkey,cPickle.dumps(self.stats)))

    def startsoap(self,url,dataset,procnum,passkey):
        self.localhost = functions.gethostname()
        if url:
            self.production = True
            print "connecting to %s ..." % url
            try:
                self.server = xmlrpclib.ServerProxy(url)
                status = self.server.start(self.localhost,dataset,procnum,passkey,self.grid)
                dataset = status[0]
                nproc = status[1]
                procnum = status[2]

                if len(status) > 3:
                    # in case we are talking to an old version of the server
                    self.passkey = status[3]

                print "dataset: %d, nproc: %d, procnum:%d" % (dataset,nproc,procnum)
                if dataset < 1:
                    print "Nothing to do. Exiting"
                    return False
                else:
                    print "processing proc %d from dataset %d" % (procnum,dataset)
                    self.dataset = dataset
                    self.nproc = nproc
                    self.procnum = procnum
                    self.passkey = passkey

                    # overwrite option dictionary with newly set values from server
                    self.opts["nproc"] = self.nproc
                    self.opts["procnum"] = self.procnum
                    self.opts["dataset"] = self.dataset
                    self.opts["key"] = self.passkey

                    set_sig_handler(self,procnum,dataset,passkey)
                    return True

            except Exception, e:
                sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
                print >> sys.stderr, e, "could not fetch job from server"
                return False
        else:
            return True

    def execute(self,tray,iter):
        print '\n\n------ begin proc %d iteration %d in tray %d -------\n' % (self.procnum,iter,tray)
        cmd = ""
        cmd += "%s i3exec.py --nproc=%d " % (self.python,self.nproc)
        cmd += " --procnum=%d --dataset=%d " % (self.procnum,self.dataset)
        cmd += " --lib=" + " --lib=".join(self.opts['lib'])
        cmd += " --iter=%d --tray=%d " % (iter,tray)
        cmd += " --validate=%d " % self.validate
        cmd += " --runcfg"
        cmd += " --fetch=%s " % self.opts['fetch']
        if self.url:
            cmd += " --url=%s " % self.url
            cmd += " --key=%s " % self.passkey
        if self.opts.has_key("zipsafe"):
            cmd += " --zipsafe=%s " % self.opts['zipsafe']
        cmd += " %s" % self.configfile
        cmd += " 2>>%s" % self.err
        cmd += " 1>>%s" % self.out
        cmd = expandvars(cmd)
        print cmd
        try:
            return os.system(cmd)
        except Exception,e:
            print "Caught exception: ",e
            sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
            return 1

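    # For reference, the assembled command looks roughly like this (all
    # values hypothetical, wrapped here for readability):
    #
    #   /usr/bin/python i3exec.py --nproc=100 --procnum=7 --dataset=1234
    #       --lib=libsim --iter=0 --tray=0 --validate=1 --runcfg
    #       --fetch=http://icecube.wisc.edu/simulation/downloads
    #       --url=http://host:9080 --key=SECRET config.xml
    #       2>>icetray.err 1>>icetray.out
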
#------ end of class RunInstance -----

#------ start of class RunInstanceDAG ---------------------------------------

class RunInstanceDAG(RunInstance):

    def __init__(self,opts):
        """RunInstance constructor for multipart jobs."""
        RunInstance.__init__(self,opts)
        self.task = None
        self.taskname = ''
        self.dagtemp = ''
        self.tray = False
        self.iter = False
        self.protocol = ''
        self.host = ''
        self.basedir = ''

        if self.opts.has_key("tray"):
            self.tray = int(self.opts["tray"])
        if self.opts.has_key("iter"):
            self.iter = int(self.opts["iter"])
        if self.opts.has_key("task"):
            self.taskname = self.opts["task"]
        if self.opts.has_key("dagtemp"):
            self.dagtemp = self.opts["dagtemp"]
            pos = self.dagtemp.find("://")
            self.protocol = self.dagtemp[0:pos]
            host_end = self.dagtemp.find("/",pos+3)
            self.host = self.dagtemp[pos+3:host_end]
            self.basedir = self.dagtemp[host_end:]

            if self.protocol != 'gsiftp':
                # only support either GridFTP or local file
                self.basedir = self.host + self.basedir
                self.host = ''

    #------ start of functions redefined from RunInstance -------------------

    def run(self):
        """Runs a task in a multipart simulation job."""
        self.task = self.steering.GetTaskDefinition(self.taskname)
        if not self.task:
            print "task '%s' not found for multipart job" % self.taskname
            return TASK_NOT_FOUND

        if not self.dagtemp:
            print "no storage location specified for temporary files"
            return TASK_CONFIGURATION_ERROR

        print "multipart job enabled, executing task %s" % self.taskname
        print "using %s://%s%s for temporary file storage" \
              % (self.protocol, self.host, self.basedir)

        retval = 0

        msg = "Configuring log4cplus....."
        print msg
        try:
            self.setlog4cplus()
            msg += 'ok'
        except:
            msg += 'failed'
            self.handle_exception(msg)
        self.statusmsg.append(msg)

        trays = self.task.GetTrays().values()
        if not self.tray is False:
            trays = [self.task.GetTray(self.tray)]

        input_needed = True
        for tray in trays:
            idx = tray.GetIndex()
            if not self.iter is False:
                iters = [int(self.iter)]
            else:
                iters = tray.GetIters()
            for iter in iters:
                task_id = self.server_start_task(idx, iter)
                if task_id == TASK_ERROR_ID:
                    print "Bad task ID received from server"
                    return TASK_INVALID

                if input_needed:
                    self.server_copying_input(task_id)
                    msg = "Fetching input from previous tasks....."
                    retval = self.fetch_input()
                    if not self.check_retval(retval, msg):
                        self.server_abort(task_id)
                        return retval
                    input_needed = False

                retval, stats = self.execute(task_id, idx, iter)
                if retval != 0:
                    self.server_abort(task_id)
                    return retval
                else:
                    self.server_finish_task(task_id, self.stats)

        if not self.task.IsCleanup():
            self.server_copying_output(task_id)
            msg = "Storing intermediate output....."
            retval = self.store_output()
            if not self.check_retval(retval, msg):
                self.server_abort(task_id)
                return retval

            self.server_finish_task(task_id)

        if self.task.IsCleanup():
            self.server_finish_job()

        return 0

    def execute(self, task_id, tray, iter):
        """Executes a given tray and iteration within this task."""
        self.server_processing(task_id)

        if self.task.IsCleanup():
            return self.task_run_cleanup()

        if iter == TASK_EXTERN_ITER:
            return self.task_run_externs()

        if not self.stats:
            self.stats = {}

        # download and setup metaproject tarball (if needed)
        build = self.fetch_metaproject(tray, tmpstorage=self.cache)
        print "metaproject fetched"

        self.setenv(build)
        print "environment setup complete"

        msg = "%s - IceTray %d iteration %d....." % (time.strftime("%H:%M:%S"),tray,iter)
        print msg

        print '\n\n------ begin proc %d iteration %d in tray %d -------\n' \
              % (self.procnum,iter,tray)
        cmd = ""
        cmd += "python -u i3exec.py --nproc=%d " % self.nproc
        cmd += " --procnum=%d --dataset=%d " % (self.procnum,self.dataset)
        cmd += " --lib=" + " --lib=".join(self.opts['lib'])
        cmd += " --iter=%d --tray=%d " % (iter,tray)
        cmd += " --runcfg"
        cmd += " --dag"
        cmd += " --fetch=%s " % self.opts['fetch']
        if self.url:
            cmd += " --url=%s " % self.url
            cmd += " --key=%s " % self.passkey
        if self.opts.has_key("zipsafe"):
            cmd += " --zipsafe=%s " % self.opts['zipsafe']
        if self.dagtemp:
            cmd += " --dagtemp=%s" % self.dagtemp
        cmd += " %s" % self.configfile
        cmd = expandvars(cmd)
        print cmd
        try:
            retval = os.system(cmd)
        except Exception:
            self.handle_exception("error when executing iteration %d in tray %d" \
                                  % (iter, tray))
            retval = TASK_RUN_EXCEPTION

        if not self.check_retval(retval, msg):
            return retval, self.stats

        print "statsfile",self.statsfile
        if self.statsfile.endswith('.xml'):
            try:
                self.getstats(self.statsfile)
            except:
                self.handle_exception("unable to process summary file %s" \
                                      % self.statsfile)

        return retval, self.stats

    def setstats(self,name,value):
        """Set task statistics. This version is non-cumulative."""
        self.stats[name] = value

    def startsoap(self,url,dataset,procnum,passkey):
        """Connects to the monitoring daemon and gets the task ID."""
        self.localhost = os.uname()[1]
        if url:
            self.production = True
            print "connecting to %s ..." % url
            try:
                self.server = xmlrpclib.ServerProxy(url)

                dataset,nproc,procnum = self.server.multipart_job_start(dataset,procnum,passkey)
                print "dataset: %d, nproc: %d, procnum:%d" % (dataset,nproc,procnum)
                if dataset < 1:
                    print "Nothing to do. Exiting"
                    return False
                else:
                    print "processing proc %d from dataset %d" % (procnum,dataset)
                    self.dataset = dataset
                    self.nproc = nproc
                    self.procnum = procnum
                    self.passkey = passkey

                    # overwrite option dictionary with newly set values from server
                    self.opts["nproc"] = self.nproc
                    self.opts["procnum"] = self.procnum
                    self.opts["dataset"] = self.dataset

                    set_sig_handler(self,procnum,dataset,passkey)
                    return True
            except:
                self.handle_exception("could not fetch job from server")
                return False
        else:
            return True

    def fetch_dependencies(self):
        """Does nothing; dependencies will be fetched during fetch_input()."""
        return self.excludefiles

    #------ end of functions redefined from RunInstance ----------------------

    #------ start of new functions for RunInstanceDAG ------------------------

    def check_retval(self, retval, msg=''):
        if retval == 0:
            if msg:
                msg += "ok"
                self.statusmsg.append(msg)
                print msg
            print "return value", retval
            return True
        else:
            if self.debug:
                self.tarwork()
            if msg:
                msg += "failed"
                self.statusmsg.append(msg)
                print msg
            print >> sys.stderr, msg
            self.statusmsg.append(tail(self.logoutput, 200))
            print "return value", retval
            return False

    def server_start_task(self, idx, iter):
        try:
            print "starting task %s tray %s iter %s" \
                  % (self.task.GetName(), idx, iter)
            return self.server.task_start(self.dataset, self.procnum, \
                                          self.task.GetName(), idx, iter, \
                                          self.localhost, self.passkey)
        except:
            self.handle_exception("unable to get task ID from server")
            return TASK_ERROR_ID

    def server_abort(self, task_id):
        try:
            self.server.task_abort(task_id, self.passkey)
            return True
        except:
            self.handle_exception("unable to abort task %d" % task_id)
            return False

    def server_finish_task(self, task_id, stats={}):
        try:
            self.server.task_finish(task_id, cPickle.dumps(stats), self.passkey)
            return True
        except:
            self.handle_exception("unable to mark task %d as finished" % task_id)
            return False

    def server_finish_job(self):
        try:
            self.server.multipart_job_finish(self.dataset, self.procnum, self.passkey)
            return True
        except:
            self.handle_exception("unable to mark job %d.%d as finished" \
                                  % (self.dataset, self.procnum))
            return False

    def server_processing(self, task_id):
        try:
            self.server.task_processing(task_id, self.passkey)
            return True
        except:
            self.handle_exception("unable to mark task %d as processing" % task_id)
            return False

    def server_copying_input(self, task_id):
        try:
            self.server.task_copying_input(task_id, self.passkey)
            return True
        except:
            self.handle_exception("unable to mark task %d as copying input" % task_id)
            return False

    def server_copying_output(self, task_id):
        try:
            self.server.task_copying_output(task_id, self.passkey)
            return True
        except:
            self.handle_exception("unable to mark task %d as copying output" % task_id)
            return False

    def task_run_externs(self):
        msg = "%s - IceTray externs....." % time.strftime("%H:%M:%S")
        print msg
        retval, stats = self.runextern()
        self.check_retval(retval, msg)
        return retval, stats

    def task_run_cleanup(self):
        """
        Remove the intermediate output from all parts of this multipart job.

        This is executed by the trashcan job that's added to the end of all
        the DAGs; it deletes the incidental files created by each node of the
        DAG and marks the job itself as complete.
        """
        targetdir = self.get_intermediate_directory()
        if self.protocol == 'gsiftp':
            print "cleanup will use GridFTP (%s, %s)" % (self.host, targetdir)
            retval = self.uberftp(self.host,"rm -r %s" % targetdir)
        else: # anything other than GridFTP is assumed to be local file
            print "cleanup will use local filesystem (%s)" % (targetdir)
            cmd = "rm -rf %s" % (targetdir)
            print cmd
            retval = os.system(cmd)

        msg = "deleting intermediate output....."
        self.check_retval(retval, msg)
        stats = {}

        return retval, stats

    def handle_exception(self, msg=None):
        traceback.print_exc(file=sys.stderr)
        if msg:
            print msg
            print >> sys.stderr, msg

    def store_output(self):
        """Store intermediate output from this task."""
        return self.handle_file_manifest("output")

    def fetch_input(self):
        """Fetch input from previous tasks."""
        return self.handle_file_manifest("input")

    def handle_file_manifest(self, type="input"):
        """Processes a file manifest and copies the files contained therein."""
        name = self.task.GetName()
        if self.task.ParallelExecutionEnabled():
            name = name + "_" + str(self.iter)
        manifest = "iceprod.%u.%u.%s.%s" % (int(self.dataset), int(self.procnum), name, type)
        print "Reading %s manifest from %s" % (type, manifest)
        if os.path.exists(manifest):
            print "%s manifest found" % type
            f = open(manifest, "r")
            size = 0
            nw_size = 0
            xfer_time = 0
            nw_xfer_time = 0
            exclude = []
            for line in f:
                pieces = line.split()
                photonics = False
                extract = True
                shared = False
                if len(pieces) == 2:
                    source, dest = pieces
                else:
                    source = pieces[0]
                    dest = pieces[1]
                    if pieces[2] == "photonics":
                        photonics = True
                    elif pieces[2] == "dontextract":
                        extract = False
                    elif pieces[2] == "global":
                        shared = True

                locked = False
                if type == "input":
                    # is this a request for photonics tables?
                    if photonics:
                        photon_dir = os.getenv("PHOTON_TABLES_DIR")
                        if photon_dir and exists(photon_dir):
                            print "skipping %s because PHOTON_TABLES_DIR (%s) exists" % (source, photon_dir)
                            continue
                        myputenv("PHOTON_TABLES_DIR", self.workdir)

                    # fetch MD5 checksum
                    retval = self.fetch_checksum(source, dest)
                    if retval != 0:
                        print "unable to retrieve checksum, return value: %u" % int(retval)
                        print "integrity check disabled for %s" % dest
                    verify = retval == 0
                    dest_checksum = dest + TASK_CHECKSUM_SUFFIX

                    # photonics tables and global files get put in common storage
                    if shared or photonics:
                        if not exists(self.cache):
                            try: os.makedirs(self.cache)
                            except: pass
                        cache_dest = os.path.join(self.cache, dest)
                        cache_checksum = dest + ".cache" + TASK_CHECKSUM_SUFFIX
                        lockfile = cache_dest + ".lock"
                        while not locked:
                            try:
                                os.mknod(lockfile)
                                locked = True
                            except OSError, oserr:
                                print "%s %s ." % (oserr,lockfile)
                                time.sleep(300) # 5min
                                try:
                                    if (time.time() - os.stat(lockfile).st_mtime) > 2400:
                                        os.remove(lockfile)
                                except: pass
                        if verify:
                            retval = self.generate_checksum(cache_dest, cache_checksum)
                            if retval == 0:
                                if checksum(dest_checksum, cache_checksum):
                                    # file is ok, copy from cache to work dir
                                    source = cache_dest
                                else:
                                    # verification failed, remove file
                                    os.remove(cache_dest)

                    local_file = dest
                    if not extract:
                        exclude.append(expandvars("${TMPWORK}/" + dest))
                else:
                    # generate MD5 sum
                    retval = self.store_checksum(source,dest)
                    if retval != 0:
                        print "unable to store checksum, return value: %u" % int(retval)
                        print "future tasks will be unable to check integrity for %s" % source
                    local_file = source

                start = time.time()
                retval = self.copy_file(source, dest)
                if retval != 0:
                    if locked:
                        os.remove(lockfile)
                    return retval
                duration = time.time() - start
                file_size = os.path.getsize(expandvars("${TMPWORK}/" + local_file)) / (1024 * 1024)
                size += file_size
                xfer_time += duration
                if not (source.startswith("file://") or source.startswith("/")):
                    nw_size += file_size
                    nw_xfer_time += duration

                if locked:
                    os.remove(lockfile)

                if type == "input":
                    retval = self.verify_checksum(local_file)
                    if retval != 0:
                        print "integrity verification failed for %s" % local_file
                        return retval

            if size > 0:
                if xfer_time > 0:
                    rate = size / xfer_time
                else:
                    rate = size
            else:
                rate = 0

            if nw_size > 0:
                if nw_xfer_time > 0:
                    nw_rate = nw_size / nw_xfer_time
                else:
                    nw_rate = nw_size
            else:
                nw_rate = 0

            self.setstats("%s size (MB)" % type, size)
            self.setstats("network %s size (MB)" % type, nw_size)
            self.setstats("%s rate (MB/s)" % type, rate)
            self.setstats("network %s rate (MB/s)" % type, nw_rate)
            f.close()

            if type == "input":
                unpack(exclude)
        else:
            print "No %s manifest found" % type
        return 0

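    # Illustrative manifest layout (not from the original source): each line
    # is "source destination" with an optional third token of "photonics",
    # "dontextract", or "global", e.g. (paths hypothetical):
    #
    #   gsiftp://gridftp.example.edu/tmp/1234/7/step1.i3.gz step1.i3.gz
    #   http://host.example.edu/tables/photon_tables.tgz tables.tgz photonics
    #   /cvmfs/shared/geometry.i3.gz geometry.i3.gz global
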
    def fetch_checksum(self,source,dest):
        return self.copy_file(source + TASK_CHECKSUM_SUFFIX, dest + TASK_CHECKSUM_SUFFIX)

    def store_checksum(self,source,dest):
        checksum_src = source + TASK_CHECKSUM_SUFFIX
        checksum_dst = dest + TASK_CHECKSUM_SUFFIX
        retval = self.generate_checksum(source, checksum_src)
        if retval != 0:
            print "unable to generate checksum for %s" % source
            return retval
        retval = self.copy_file(checksum_src, checksum_dst)
        return retval

    def generate_checksum(self,file,checksum):
        cmd = "md5sum %s > %s" % (file, checksum)
        print cmd
        return os.system(cmd)

    def verify_checksum(self,file):
        checksum = file + TASK_CHECKSUM_SUFFIX
        if not os.path.exists(checksum):
            print "integrity verification skipped, no checksum available for %s" % file
            return 0
        cmd = "md5sum -c %s" % checksum
        print cmd
        return os.system(cmd)

    def copy_file(self,source,dest):
        info = {}
        for type, file in {"in": source, "out": dest}.items():
            if not info.has_key(type):
                info[type] = {}
            pos = file.find(":")
            if pos == -1:
                file = os.path.join("file://${TMPWORK}", file)
                pos = 4
            protocol = file[:pos]
            info[type]["protocol"] = protocol
            if protocol == "file":
                info[type]["path"] = file[7:]
            elif protocol == "gsiftp" or protocol == "http":
                file = file[pos + 3:]
                pos = file.find("/")

                info[type]["host"] = file[:pos]
                info[type]["path"] = file[pos:]
            else:
                print "Unknown transfer protocol: %s" % protocol
                return TASK_XFER_PROTO_ERROR

        if info["out"]["protocol"] == "http" or info["out"]["protocol"] == "ftp":
            print "Unsupported destination protocol: %s" % info["out"]["protocol"]
            return TASK_XFER_CONFIG_ERROR

        files = []
        for type, file in info.iteritems():
            url = "%s://" % file["protocol"]
            if file.has_key("host"):
                url += file["host"] + "/"
            url += file["path"]
            files.append(url)

        print files
        if info["in"]["protocol"] == "gsiftp" or info["out"]["protocol"] == "gsiftp":
            proxy = self.steering.GetSysOpt("globus_proxy").GetValue()
            os.chmod(proxy,0600)
            globus_loc = self.steering.GetSysOpt("globus_location").GetValue()
            path = os.path.join(globus_loc, "bin")
            bin = os.path.join(path, 'globus-url-copy')
            certs = os.path.join(globus_loc, "certificates")
            lib = os.path.join(globus_loc, "lib")
            cmd = "LD_LIBRARY_PATH=%s:$LD_LIBRARY_PATH DYLD_LIBRARY_PATH=%s:$DYLD_LIBRARY_PATH"
            cmd += " X509_CERT_DIR=%s X509_USER_PROXY=%s %s -cd -r -nodcau %s %s"
            cmd = cmd % (lib, lib, certs, proxy, bin, files[0], files[1])
        elif info["in"]["protocol"] == "http" or info["in"]["protocol"] == "ftp":
            cmd = "wget -nv -O %s %s" % (info["out"]["path"], files[0])
        elif info["in"]["protocol"] == "file" and info["out"]["protocol"] == "file":
            # ensure destination directory exists
            cmd = "mkdir -p %s" % os.path.dirname(info["out"]["path"])
            print cmd
            retval = os.system(cmd)
            if retval != 0:
                return retval

            # try to copy using hard links first
            cmd = "cp -l %s %s" % (info["in"]["path"], info["out"]["path"])
            print cmd
            retval = os.system(cmd)
            if retval == 0:
                return 0

            # hard link failed, so try normal copy
            cmd = "cp %s %s" % (info["in"]["path"], info["out"]["path"])
        else:
            print "Unknown transfer protocol"
            return TASK_XFER_PROTO_ERROR
        cmd = expandvars(cmd)

        print cmd
        return os.system(cmd)

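    # Usage sketch (hypothetical paths): relative names are resolved against
    # ${TMPWORK} and treated as local files, so this copies via hard link
    # when possible, falling back to cp:
    #
    #   self.copy_file("step1.i3.gz", "file:///data/scratch/step1.i3.gz")
    #
    # while a GridFTP source or destination is handed to globus-url-copy:
    #
    #   self.copy_file("gsiftp://gridftp.example.edu/tmp/step1.i3.gz", "step1.i3.gz")
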
    def uberftp(self,host,cmd):
        """Perform an FTP command on a remote host using uberftp."""
        proxy = self.steering.GetSysOpt("globus_proxy").GetValue()
        os.chmod(proxy,0600)

        globus_loc = self.steering.GetSysOpt("globus_location").GetValue()
        path = os.path.join(globus_loc, "bin")
        bin = os.path.join(path, 'uberftp')

        certs = os.path.join(globus_loc, "certificates")

        cmd = 'X509_CERT_DIR=%s X509_USER_PROXY=%s %s %s "%s"' \
              % (certs, proxy, bin, host, cmd)
        print cmd
        return os.system(cmd)

    def get_intermediate_directory(self):
        directory = os.path.join(self.basedir, str(self.dataset))
        directory = os.path.join(directory, str(self.procnum))
        return directory

#------ end of new functions for RunInstanceDAG -------------------------------

#------ end of class RunInstanceDAG -------------------------------------------

def set_sig_handler(run,procnum,dataset,passkey):
    """
    Install signal handler to handle errors and evictions

    @param run: RunInstance
    @param procnum: process number within dataset
    @param dataset: dataset id
    @param passkey: random passkey assigned to process by daemon
    """
    def handler(signum,frame):
        global _done
        print >> sys.stderr, "aborting"
        print >> sys.stderr, str(frame)
        run.flush_output()
        run.server.abort(procnum,dataset,EVICTED,'Job has been evicted.',passkey)
        _done = True
        os._exit(5)
    signal.signal(signal.SIGTERM, handler)

def create_workspace(work,top):
    os.makedirs(work)
    os.chdir(work)
    myputenv('TMPWORK',work)
    myputenv('TMPTOP',top)
    cwd = os.getcwd()
    for file in os.listdir(top):
        if file.endswith(".out") or file.endswith(".err"):
            continue
        if not os.path.exists(file):
            os.symlink(join(top,file),file)
    return work

def clean_workspace(top,work):
    os.chdir(top)
    if _clean:
        print "Cleaning workspace %s" % work
        os.system('rm -rf %s' % work)

def safe_abort(abortfunc,args,msg='',tries=4,wait=2):
    failures = 0
    while failures < tries:
        print "resetting job status"
        try:
            return apply(abortfunc,args)
        except socket.error, e:
            print >> sys.stderr, e, "retrying"
            time.sleep(wait*failures)
            failures += 1

def wget(url,dest='./'):
    dest = os.path.abspath(expandvars(dest))
    url = expandvars(url)
    if os.path.isdir(dest.replace('file:','')):
        dest = os.path.join(dest, os.path.basename(url))
    if not dest.startswith("file:"):
        dest = "file:" + dest

    if url.startswith("http://") or url.startswith("ftp://"):
        dest = dest.replace("file:","")
        cmd = 'wget -nv --tries=4 --output-document=%s %s' % (dest,url)
    elif url.startswith("lfn:"):
        cmd = "lcg-cp --vo icecube -v %s %s" % (url,dest)
    elif url.startswith("file:"):
        src = url.replace("file:","")
        dest = dest.replace("file:","")
        cmd = 'cp %s %s' % (src,dest)
    elif url.startswith("gsiftp://"):
        globus = expandvars("$GLOBUS_LOCATION")
        lib = os.path.join(globus,'lib')
        path = os.path.join(globus,'bin')
        bin = os.path.join(path,'globus-url-copy')
        print "globus",globus
        os.system("/bin/ls")
        if not dest.startswith("file:"):
            dest = "file:%s" % os.path.abspath(dest)
        cmd = "LD_LIBRARY_PATH=%s:$LD_LIBRARY_PATH DYLD_LIBRARY_PATH=%s:$DYLD_LIBRARY_PATH"
        cmd += " %s -cd -r -nodcau %s %s"
        cmd = cmd % (lib, lib, bin, url, dest)
    else:
        raise Exception, "unsupported protocol %s" % url
    print cmd
    return os.system(cmd)

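# Supported URL schemes, as dispatched above (illustrative calls; the return
# value is the shell exit status, nonzero meaning failure):
#
#   wget("http://icecube.wisc.edu/simulation/downloads/foo.tar.gz")  # wget
#   wget("lfn:/grid/icecube/foo.tar.gz")      # LCG catalog copy via lcg-cp
#   wget("file:/data/sim/foo.tar.gz")         # plain cp
#   wget("gsiftp://gridftp.example.edu/foo")  # GridFTP via globus-url-copy
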
def checksum(sum1,sum2):
    if not exists(sum1): return False
    if not exists(sum2): return False
    return not os.system("diff %s %s" % (sum1,sum2))

def isurl(url):
    return url.startswith("http://") \
        or url.startswith("ftp://") \
        or url.startswith("file://")

def setplatform(platform):
    """
    Override the cached platform string returned by getplatform()
    """
    global _platform
    _platform = platform

def getplatform(reset=False,legacymode=False):
    """
    Get platform information from system and format it
    according to icetray standard
    @return: platform string
    """
    global _platform
    if not reset:
        if _platform: return _platform

    my_sys = os.uname()
    my_os = my_sys[0]
    my_arch = my_sys[4]

    handle = os.popen('gcc -dumpversion','r')
    gccversion = handle.readline().strip()
    handle.close()
    compiler_version = gccversion.split('.')
    compiler_major, compiler_minor = map(int,compiler_version[:2])

    _platform = "%s-%s.gcc-%s" % (my_os,my_arch,gccversion)
    return _platform

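# Example result (hypothetical host): on a 64-bit Linux machine with gcc 4.1.2
# installed, getplatform() caches and returns
#
#   "Linux-x86_64.gcc-4.1.2"
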
def next_platform(plat):
    """
    Get next (older) platform tag to fall back to
    """
    known_versions = [
        [4,1,3],
        [4,1,2],
        [3,4,6],
    ]
    p = plat.split(".gcc-")
    gcc = map(int,p[1].split('.'))
    if gcc <= [3,4,6]:
        raise Exception, "No compatible platforms found"
    if gcc > [4,1,3]:
        for i in range(3):
            index = 2-i
            ver = gcc[index]
            gcc[index] = (gcc[index] - 1) % 10
            if gcc[index] < ver: break
    else:
        for ver in known_versions:
            if ver < gcc:
                gcc = ver
                break
    return "%s.gcc-%u.%u.%u" % (p[0],gcc[0],gcc[1],gcc[2])

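# Illustrative fallback sequence when a tarball for the native compiler is
# missing (used by fetch_metaproject above; tags hypothetical):
#
#   next_platform("Linux-x86_64.gcc-4.2.0")  -> "Linux-x86_64.gcc-4.1.9"
#   next_platform("Linux-x86_64.gcc-4.1.3")  -> "Linux-x86_64.gcc-4.1.2"
#   next_platform("Linux-x86_64.gcc-3.4.6")  -> raises "No compatible platforms found"
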
def myputenv(name,value):
    if value == None: print name,value
    else: os.environ[name] = value

def unpack(exclude=[]):
    print "unpacking %s..." % os.getcwd()
    for archive in [filename for filename in os.listdir(".") if filename.endswith(".gz") and not filename in exclude]:
        print "gunzip "+archive
        base,suffix = os.path.splitext(archive)
        os.system("gunzip "+archive+" -c > "+base)

    for archive in [filename for filename in os.listdir(".") if filename.endswith(".bz2") and not filename in exclude]:
        print "bunzip2 "+archive
        os.system("bunzip2 "+archive)

    for archive in [filename for filename in os.listdir(".") if filename.endswith(".tar") and not filename in exclude]:
        print "tar xf "+archive
        os.system("tar xf "+archive)

    for archive in [filename for filename in os.listdir(".") if filename.endswith(".tgz") and not filename in exclude]:
        print "tar xzf "+archive
        os.system("tar xzf "+archive)