1
2
3
4
5
6
7
8
9 import os,sys,time,string,re
10 import signal
11 import random
12 import getpass
13 import os.path,getopt
14 import commands
15 import platform
16 import logging
17 import glob
18 import time
19 from os.path import expandvars,join,exists,basename,dirname,abspath
20 from iceprod.core.dataclasses import *
21 from iceprod.core.xmlparser import IceTrayXMLParser
22 from iceprod.core.lex import *
23 from iceprod.core import functions
24 from iceprod.core.constants import *
25
26 import xmlrpclib
27 import socket
28 import cPickle
29 from ConfigParser import SafeConfigParser
30 from ipxml import PrettyPrint
31
32 import traceback
33
34 _platform = None
35 _inventory = "sp_outfile_inventory.list"
36 _done = False
37 _clean = True
38
39 EVICTED = 1
40 FAILED = 2
41 python_version = platform.python_version()[:3]
42
43 _child = 0
44
45 _baseurl = 'http://icecube.wisc.edu/simulation/downloads'
46 _tmpstorage = '/tmp/%s_icesoft' % getpass.getuser()
47
48
50 if not exists(tmpstorage):
51 try: os.makedirs(tmpstorage)
52 except: pass
53
54 lockfile = join(tmpstorage,"%s.lock" % meta.filebase)
55 cache = join(tmpstorage,meta.filebase + meta.suffix)
56 cachedir = join(tmpstorage,meta.filebase)
57
58
59 md5url = meta.url + '.md5sum'
60 print "retrieving %s..." % md5url
61 failed = wget(md5url)
62 if failed: raise Exception, "unable to fetch file from %s" % md5url
63
64 cwd = os.getcwd()
65
66 trials = 5
67 while (not exists(cache)) or \
68 (not exists(cachedir)) or \
69 (not checksum(join(cwd,meta.md5sum),join(cachedir,meta.md5sum))):
70
71 os.chdir(tmpstorage)
72 try:
73 os.mknod(lockfile)
74 except OSError,oserr:
75 print "%s %s ." % (oserr,lockfile)
76 time.sleep(300)
77 try:
78 if (time.time() - os.stat(lockfile).st_mtime) > 2400:
79 os.remove(lockfile)
80 except: pass
81 continue
82
83 os.system("rm -rf %s %s" % (cache,cachedir))
84 print "retrieving %s..." % (meta.url + meta.suffix)
85 failed = wget(meta.url+meta.suffix)
86 if failed:
87 os.remove(lockfile)
88 raise Exception, "unable to fetch file from %s" % meta.url + meta.suffix
89
90 if os.path.isdir(meta.filebase):
91 os.system("rm -rf " + meta.filebase)
92
93
94 os.mkdir(meta.filebase)
95 os.system("tar -C%s -xzf %s" % (basedir,cache))
96 os.system("md5sum %s > %s" % (meta.filebase+meta.suffix,join(cachedir,meta.md5sum)))
97 os.remove(lockfile)
98 os.chdir(cwd)
99
100 if trials < 1:
101 raise Exception, "Max no. of trails reached to stage metaproject '%s'" % meta.filebase
102 trials -= 1
103
104 os.chdir(cwd)
105 meta.path = join(tmpstorage,meta.filebase)
106 return meta
107
def tail(logpath, chars=75):
    """
    Return the tail end of a logfile for status reporting.

    @param logpath: path to logfile
    @param chars: number of characters to read from the end of the file
    @return: up to the last `chars` characters, lines joined with '<br>'
    """
    if not os.path.exists(logpath):
        return "no log output found %s" % logpath
    st_size = os.path.getsize(logpath)
    # Context manager guarantees the handle is closed even on error
    # (the previous version leaked the open file object and also built
    # an unused header string).
    with open(logpath, 'r') as logfile:
        # Seek so that at most `chars` characters remain to be read.
        logfile.seek(max(0, st_size - chars))
        return "<br>".join(logfile.readlines())
125
126
128
130 if not self.stats.has_key(name):
131 self.stats[name] = value
132 elif type(value) == type(float()):
133 self.stats[name] += value
134 elif type(value) == type(int()):
135 self.stats[name] += value
136 else:
137 self.stats[name] = value
138
140 global _baseurl
141 self.logger = logging.getLogger('iceprod::exe')
142
143 self.python = sys.executable
144 self.starttime = time.time()
145 self.env = {}
146 self.opts = opts
147 self.passkey = None
148 self.production = False
149 self.server = None
150 self.nproc = 1
151 self.procnum = 0
152 self.dataset = 0
153 self.configfile = 'config.xml'
154 self.statsfile = 'summary.xml'
155 self.url = None
156 self.topdir = None
157 self.workdir = "."
158 self.build = None
159 self.baseurl = _baseurl
160 self.stats = {}
161 self.statusmsg = []
162 self.excludefiles = []
163 self.debug = False
164 self.mktar = False
165 self.suffix_in = []
166 self.mpdownloaded = {}
167 self.logoutput = "icetray.log"
168 self.err = "icetray.err"
169 self.out = "icetray.out"
170 self.ping = 0
171 self.stageout = False
172 self.nocopy = False
173 self.target = None
174 self.cache = '/tmp/%s_icesoft' % getpass.getuser()
175 self.localhost = os.uname()[1]
176 self.grid = 0
177 self.socktimeout = 0
178 self.validate = 0
179
180
181 if self.opts.has_key("url"):
182 self.url = self.opts["url"]
183 self.production = True
184 if self.opts.has_key("ping"):
185 self.ping = int(self.opts["ping"])
186 if self.opts.has_key("dataset"):
187 self.dataset = int(self.opts["dataset"])
188 if self.opts.has_key("procnum"):
189 self.procnum = int(self.opts["procnum"])
190 if self.opts.has_key("nproc"):
191 self.nproc = int(self.opts["nproc"])
192 if self.opts.has_key("key"):
193 self.passkey = self.opts['key']
194 if self.opts.has_key("fetch"):
195 self.baseurl = self.opts['fetch']
196 if self.opts.has_key("mktar"):
197 self.mktar = True
198 if self.opts.has_key("debug"):
199 self.debug = int(self.opts["debug"])
200 if self.opts.has_key("target"):
201 self.target = self.opts['target']
202 if self.opts.has_key("stageout"):
203 self.stageout = int(self.opts['stageout'])
204 if self.opts.has_key("nocopy"):
205 self.nocopy = True
206 if self.opts.has_key("grid"):
207 self.grid = int(self.opts["grid"])
208 if self.opts.has_key("timeout"):
209 self.socktimeout = int(self.opts["timeout"])
210 if self.opts.has_key("validate"):
211 self.validate = int(self.opts["validate"])
212
213
214
284
285
287 """
288 Get current environment and save it
289 """
290 self.env['PYTHONPATH'] = expandvars("$PYTHONPATH")
291 self.env['PATH'] = expandvars("$PATH")
292 self.env['LD_LIBRARY_PATH'] = expandvars("$LD_LIBRARY_PATH")
293 self.env['DYLD_LIBRARY_PATH'] = expandvars("$DYLD_LIBRARY_PATH")
294
296 """
297 Configure environment for run
298 @param i3build: working directory path
299 """
300
301 if self.build == i3build:
302 print "setenv: Using previously selected build environment in"
303 print i3build
304 return
305 self.build = i3build
306
307
308 platform = getplatform()
309 myputenv('I3_PLATFORM','')
310 if platform == 'system':
311 platform = getplatform(reset=True)
312 myputenv('I3_PLATFORM',platform)
313 myputenv('PLATFORM',platform)
314 print platform
315 print "Host info:"
316 print ' '.join(os.uname())
317 print "running in "+os.getcwd()
318 myputenv('I3_BUILD', i3build)
319 myputenv('I3_WORK', os.path.join(i3build,'projects'))
320 myputenv('I3_SRC', i3build)
321 myputenv('I3_PORTS', i3build)
322 myputenv('I3_TOOLS', i3build)
323 rootsys = os.path.join(i3build,'cernroot')
324 if os.path.exists(rootsys):
325 myputenv('ROOTSYS', rootsys)
326 elif self.steering.GetSysOpt("rootsys"):
327 rootsys = self.steering.GetSysOpt("rootsys").GetValue()
328 myputenv('ROOTSYS', expandvars(rootsys))
329 else:
330 myputenv('ROOTSYS', i3build)
331
332
333
334 javasearchpath = expandvars('$JAVA_HOME:/usr/java:/usr/local/java')
335 java = functions.findjava(javasearchpath)
336
337 ldlibpath = []
338 if java:
339 myputenv('JAVA_HOME',java[0])
340 ldlibpath.append(java[1])
341 else:
342 print >> sys.stderr,"!!!!!!!!!JAVA NOT FOUND IN %s!!!!!!!!!!!"% javasearchpath
343 print expandvars("using java in $JAVA_HOME")
344
345
346 ldlibpath.append(expandvars("$ROOTSYS/lib"))
347 ldlibpath.append(expandvars("$I3_BUILD/lib"))
348 ldlibpath.append(expandvars("$I3_BUILD/lib/tools"))
349 if os.uname()[4].endswith("64"):
350 ldlibpath.append(expandvars("/usr/lib64"))
351 ldlibpath.append(expandvars("/usr/lib"))
352 ldlibpath.append(self.env['LD_LIBRARY_PATH'])
353 myputenv('DYLD_LIBRARY_PATH', ":".join(ldlibpath))
354 myputenv('LD_LIBRARY_PATH', ":".join(ldlibpath))
355
356
357 pythonpath = []
358 pythonpath.append(expandvars("."))
359 pythonpath.append(expandvars("$I3_BUILD/lib"))
360 pythonpath.append(expandvars("$I3_BUILD/lib/python"))
361 pythonpath.append(expandvars("$I3_BUILD/lib/python/site-packages"))
362 pythonpath.append(self.env['PYTHONPATH'])
363 myputenv('PYTHONPATH', ":".join(pythonpath))
364
365 path = []
366 path.append(expandvars("."))
367 path.append(expandvars("$I3_BUILD/bin"))
368 path.append(expandvars("$PYROOT/bin"))
369 path.append(expandvars("/bin"))
370 path.append(expandvars("/usr/bin"))
371 path.append(self.env["PATH"])
372 myputenv('PATH', ":".join(path))
373
374 myputenv('USER_CLASSPATH',expandvars("$I3_WORK/monolith-reader/resources/triggerUtil.jar"))
375 myputenv('ICETRAY_CLASSPATH',expandvars("$I3_BUILD/lib/mmc.jar"))
376
377
378 print '\n----------- environment contents ------------'
379 for envkey in os.environ.keys():
380 print envkey, ":"
381 for env_val in os.environ[envkey].split(":"):
382 print "\t", env_val
383
384
438
439
441 """
442 Download files needed from server.
443 """
444 print "fetching dependencies %s" % self.baseurl
445 for d in self.steering.GetDependencies():
446 fetchurl = self.parser.parse(d)
447 if not isurl(fetchurl):
448 fetchurl = self.parser.parse("%s/%s" % (self.baseurl,fetchurl))
449 print "retrieving %s..." % fetchurl
450 failed = wget(fetchurl)
451 if failed:
452 raise Exception, "unable to fetch file from %s" % fetchurl
453 self.excludefiles.append(basename(fetchurl))
454 return self.excludefiles
455
456
458
459 if not os.uname()[0] == 'Linux':
460 self.logger.info("getmemusage: Not a Linux machine")
461 return
462 usage = open("/proc/self/status")
463 for line in usage.readlines():
464 if line.startswith('Vm'):
465 name,value,unit = line.split()
466 value = float(value.strip())
467 if unit == "MB":
468 value = value*1024
469 if not stats.has_key(name):
470 stats[name] = value
471 else:
472 stats[name] = max(value,stats[name])
473 usage.close()
474
476
477 if not file: file = self.statsfile
478 stats = {}
479 if not os.path.exists(file):
480 print "Unable to find statsfile", file
481 return self.stats
482 if file.endswith('.xml'):
483 stats = XMLSummaryParser().ParseFile(file)
484 else:
485 if file:
486 statsfile = open(file,'r')
487 else:
488 statsfile = open(self.statsfile,'r')
489 for line in statsfile:
490 s = line.split(':')
491 if len(s) > 1:
492 stats[s[0].strip()] = s[1].strip()
493 statsfile.close()
494
495 for key in stats.keys():
496 try:
497 self.setstats(key,float(stats[key]))
498 except:
499 self.setstats(key,stats[key])
500
501
502 return stats
503
505 workpath = dirname(self.workdir)
506 workdir = basename(self.workdir)
507 os.system("tar -C%s -czf %s/work.tgz ./%s" % (workpath,self.topdir,workdir))
508
510 externstats = '.extern.stats'
511 print '\n\n------ begin proc %d externs -------\n' % self.procnum
512 outfile = os.path.join(self.workdir,"i3exe_%d.out" % self.procnum)
513 cmd = "/usr/bin/time -o %s " % externstats
514 cmd += " -f \""
515 cmd += "mem_heap: %K\\n"
516 cmd += "mem_heap_peak:%M\\n"
517 cmd += "user_time:%U\\n"
518 cmd += "sys_time:%S\\n"
519 cmd += "real_time:%e\" -- "
520 cmd += "%s i3exec.py --nproc=%d " % (self.python ,self.nproc)
521 cmd += " --procnum=%d --dataset=%d " % (self.procnum,self.dataset)
522 cmd += " --fetch=%s " % self.opts['fetch']
523 cmd += " --extern=1"
524 cmd += " --lib=" + " --lib=".join(self.opts['lib'])
525 if self.url:
526 cmd += " --url=%s " % self.url
527 cmd += " --runcfg"
528 cmd += " %s" % self.configfile
529 cmd = expandvars(cmd)
530 print cmd
531 sys.stdout.flush()
532
533
534 retval = os.system(cmd)
535 stats = self.getstats(externstats)
536 return retval,stats
537
539 """
540 Extract number of trays and iterations in each tray
541 @return: array of indices indicating the number of iterations in each tray
542 """
543 iters = map(IceTrayConfig.GetIterations,self.steering.GetTrays())
544 return map(int,iters)
545
547 """
548 Configure log4cplus to write output logging
549 @return: path to log output file
550 """
551 lg4cppath = os.path.join(self.workdir,"log4cplus.conf")
552 myputenv('I3LOGGING_CONFIG', lg4cppath)
553 lg4cp = open(lg4cppath,'w')
554 print >> lg4cp, "log4cplus.appender.default=log4cplus::FileAppender"
555 print >> lg4cp, "log4cplus.appender.default.File=%s" % self.logoutput
556 print >> lg4cp, "log4cplus.appender.default.layout.ConversionPattern=%d [%d{%Z}] %-5p %c %x: %F:% L %m%n"
557 if self.debug:
558 print >> lg4cp, "log4cplus.rootLogger=INFO, default"
559 else:
560 print >> lg4cp, "log4cplus.rootLogger=WARN, default"
561 lg4cp.close()
562
563
564 handler = logging.FileHandler(self.logoutput)
565 formatter = logging.Formatter("iceprod : %(levelname)s %(name)s : %(message)s")
566 handler.setFormatter(formatter)
567 self.logger.addHandler(handler)
568
569 os.system("touch %s" % self.logoutput)
570 return self.logoutput
571
573 """
574 Cat out/err files
575 """
576 os.system("cat %s" % self.out)
577 os.system("cat %s >&2" % self.err)
578
580 """
581 Execute each of the icetray iterations in configuration
582 @return: execution value (0 if everything went ok)
583 """
584 retval = 0
585 tray = 0
586
587 msg = "Configuring log4cplus....."
588 print msg
589 try:
590 self.setlog4cplus()
591 msg += 'ok'
592 except Exception,e:
593 msg += 'failed'
594 print >> sys.stderr, exc_type, e
595 print >> sys.stderr, msg
596 self.statusmsg.append(msg)
597
598 msg = "%s - IceTray externs....." % time.strftime("%H:%M:%S")
599 print msg
600 retval,stats = self.runextern()
601 print "return value", retval
602 if retval==0:
603 msg += 'ok'
604 self.statusmsg.append(msg)
605 else:
606 msg += 'failed'
607 self.statusmsg.append(msg)
608 if self.debug: self.tarwork()
609 return 5
610
611
612 for iterations in self.get_trays():
613
614
615 build = self.fetch_metaproject(tray,tmpstorage=self.cache)
616 os.system("touch %s" % build)
617
618 self.setenv(build)
619
620 for iter in range(iterations):
621 msg = "%s - IceTray %d iteration %d....." % (time.strftime("%H:%M:%S"),tray,iter)
622 print msg
623 iter_start = time.time()
624 retval = self.execute(tray,iter)
625 print "return value", retval
626 self.setstats("tray(%u,%u) exec time" % (tray,iter),time.time()-iter_start)
627 if retval==0:
628 msg += 'ok'
629 self.statusmsg.append(msg)
630 print msg
631 if self.production and (self.ping > 0) and (iter%2 == 0):
632 try:
633 self.server.ping(self.dataset,self.procnum,self.localhost,self.passkey,tray,iter+1)
634 except Exception,e:
635 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
636 else:
637 msg += 'failed'
638 self.statusmsg.append(msg)
639 self.statusmsg.append('<br>---- log4cplus ----<br>')
640 self.statusmsg.append(tail(self.logoutput,100))
641 self.statusmsg.append('<br>---- stdout ----<br>')
642 self.statusmsg.append(tail(self.out,200))
643 self.statusmsg.append('<br>---- stderr ----<br>')
644 self.statusmsg.append(tail(self.err,350))
645 print >> sys.stderr, msg
646 if self.debug: self.tarwork()
647 return retval
648 tray +=1
649 print "statsfile",self.statsfile
650 if self.statsfile.endswith('.xml'):
651 try:
652 self.getstats(self.statsfile)
653 except Exception,e:
654 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
655 print "unable to process summary file %s" % self.statsfile
656 if self.statsfile.endswith('.xml') and os.path.exists('iceprod.'+self.statsfile):
657 try:
658 self.getstats('iceprod.'+self.statsfile)
659 except Exception,e:
660 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
661 print "unable to process summary file iceprod.%s" % self.statsfile
662
663 if self.finish():
664 return retval
665 else: return 7
666
668 """
669 Stage out generated files and generate an inventory
670 """
671
672 tar = os.path.join(self.topdir,'output_%d_%d.tar' % (self.dataset,self.procnum))
673 procnum = self.procnum
674 dataset = self.dataset
675 passkey = self.passkey
676
677
678
679 filelist = []
680 if self.target: filelist.append(basename(self.logoutput))
681
682
683 exclude_gunzip = map(lambda s: s.replace(".gz",""),self.excludefiles)
684 self.excludefiles.extend(exclude_gunzip)
685
686
687 suffix = ["*.root","*.i3"]
688 suffix.extend( self.suffix_in )
689 suffix_gz= map(lambda s: s+".gz",suffix)
690 suffix.extend( suffix_gz )
691 suffix_in_gz= map(lambda s: s+".gz",self.suffix_in)
692 self.suffix_in.extend( suffix_in_gz )
693 map(filelist.extend,map(glob.glob,suffix))
694 excludelist = []
695 map(excludelist.extend,map(glob.glob,self.excludefiles))
696 print 'filelist',filelist
697 print 'excludelist',excludelist
698
699
700 if self.production:
701 try:
702 tray = len(self.get_trays()) - 1
703 iter = self.get_trays()[-1]
704 self.server.ping(self.dataset,self.procnum,self.localhost,self.passkey,tray,iter)
705 except: pass
706
707 print "copying data"
708 if self.stageout and self.production:
709 self.logger.info("setting status to COPYING")
710 maxtries = 5
711 for trial in range(maxtries):
712 try:
713 self.server.copying(self.dataset,self.procnum,self.passkey)
714 break
715 except Exception, e:
716 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
717 self.logger.error(e)
718 if trial >= maxtries-1:
719 self.statusmsg.append('%s: Job was unable set status to copying' % e)
720 self.logger.error("COPYING failed")
721 self.abort(FAILED)
722 return False
723
724 inventorylist = []
725 inventory = open(os.path.join(self.topdir,_inventory),'w')
726
727 self.logger.info("generating inventory")
728 self.nocopy = self.stageout
729 if not self.nocopy:
730 for file in filelist:
731 if re.match(r'^\.',file) or file in excludelist:
732 if file not in self.suffix_in: continue
733
734 print file
735 if not (file.endswith('.gz') or file.endswith('.root')):
736 cmd ="gzip "+file+" -c > "+file+".gz"
737 print cmd
738 os.system(cmd)
739 file = file+".gz"
740 if self.mktar:
741 dir = os.path.dirname(file)
742 cmd = "tar"
743 if os.path.isdir(dir):
744 cmd += " -C%s " % dir
745 if not os.path.exists(tar):
746 cmd += " -cf"
747 else:
748 cmd += " -uf"
749 cmd += " %s %s" %(tar,basename(file))
750 print cmd
751 os.system(cmd)
752 else:
753 cmd = "mv %s %s" % (file,self.topdir)
754 print cmd
755 if not basename(file) in inventorylist:
756 inventorylist.append(basename(file))
757 print >> inventory, basename(file)
758 os.system(cmd)
759 inventory.close()
760 self.logger.info("inventory done")
761 self.logger.info("Setting i3exec runtime")
762 self.setstats("i3exec runtime",time.time()-self.starttime)
763
764 self.logger.info("production: %s" % self.production)
765 if self.production:
766 self.logger.info("Setting final job status")
767 try:
768 self.server.finish(self.dataset,self.procnum,cPickle.dumps(self.stats),self.passkey,self.stageout)
769 except Exception, e:
770 self.statusmsg.append('%s: Job was unable to set finish status' % e)
771 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
772 print e
773 self.logger.error("setting status failed")
774 self.abort(FAILED)
775 return True
776
def abort(self, error, message=""):
    """
    Flush buffered output and report a fatal error to the server.

    @param error: numeric error code (e.g. FAILED, EVICTED)
    @param message: optional extra text appended to the status messages
    """
    self.flush_output()
    if message:
        # Drop any bytes that are not valid utf8 before queueing.
        self.statusmsg.append(message.decode("utf8", "ignore"))
    if not self.production:
        return
    summary = '<br>'.join(self.statusmsg)
    args = (self.procnum, self.dataset, error, summary,
            self.passkey, cPickle.dumps(self.stats))
    safe_abort(self.server.abort, args)
784
785 - def startsoap(self,url,dataset,procnum,passkey):
786 self.localhost = functions.gethostname()
787 if url:
788 self.production = True
789 print "connecting to %s ..." % url
790 try:
791 self.server = xmlrpclib.ServerProxy(url)
792 status = self.server.start(self.localhost,dataset,procnum,passkey,self.grid)
793 dataset = status[0]
794 nproc = status[1]
795 procnum = status[2]
796
797 if len(status) > 3:
798
799 self.passkey = status[3]
800
801 print "dataset: %d, nproc: %d, procnum:%d"%(dataset,nproc,procnum)
802 if dataset < 1:
803 print "Nothing to do. Exiting"
804 return False
805 else:
806 print "processing proc %d from dataset %d" % (procnum,dataset)
807 self.dataset = dataset
808 self.nproc = nproc
809 self.procnum = procnum
810 self.passkey = passkey
811
812
813 self.opts["nproc"] = self.nproc
814 self.opts["procnum"] = self.procnum
815 self.opts["dataset"] = self.dataset
816 self.opts["key"] = self.passkey
817
818 set_sig_handler(self,procnum,dataset,passkey)
819 return True
820
821 except Exception, e:
822 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
823 print >> sys.stderr, e , "could not fetch job from server"
824 return False
825 else:
826 return True
827
828
830 print '\n\n------ begin proc %d iteration %d in tray %d -------\n' % (self.procnum,iter,tray)
831 cmd = ""
832 cmd += "%s i3exec.py --nproc=%d " % (self.python,self.nproc)
833 cmd += " --procnum=%d --dataset=%d " % (self.procnum,self.dataset)
834 cmd += " --lib=" + " --lib=".join(self.opts['lib'])
835 cmd += " --iter=%d --tray=%d " % (iter,tray)
836 cmd += " --validate=%d " % self.validate
837 cmd += " --runcfg"
838 cmd += " --fetch=%s " % self.opts['fetch']
839 if self.url:
840 cmd += " --url=%s " % self.url
841 cmd += " --key=%s " % self.passkey
842 if self.opts.has_key("zipsafe"):
843 cmd += " --zipsafe=%s " % self.opts['zipsafe']
844 cmd += " %s" % self.configfile
845 cmd += " 2>>%s" % self.err
846 cmd += " 1>>%s" % self.out
847 cmd = expandvars(cmd)
848 print cmd
849 try:
850 return os.system(cmd)
851 except Exception,e:
852 print "Caught exception: ",e
853 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
854 return 1
855
856
857
858
861 """RunInstance constructor for multipart jobs."""
862 RunInstance.__init__(self,opts)
863 self.task = None
864 self.taskname = ''
865 self.dagtemp = ''
866 self.tray = False
867 self.iter = False
868 self.protocol = ''
869 self.host = ''
870 self.basedir = ''
871
872 if self.opts.has_key("tray"):
873 self.tray = int(self.opts["tray"])
874 if self.opts.has_key("iter"):
875 self.iter = int(self.opts["iter"])
876 if self.opts.has_key("task"):
877 self.taskname = self.opts["task"]
878 if self.opts.has_key("dagtemp"):
879 self.dagtemp = self.opts["dagtemp"]
880 pos = self.dagtemp.find("://")
881 self.protocol = self.dagtemp[0:pos]
882 host_end = self.dagtemp.find("/",pos+3)
883 self.host = self.dagtemp[pos+3:host_end]
884 self.basedir = self.dagtemp[host_end:]
885
886 if self.protocol != 'gsiftp':
887
888 self.basedir = self.host + self.basedir
889 self.host = ''
890
891
892
894 """Runs a task in a multipart simulation job."""
895
896 self.task = self.steering.GetTaskDefinition(self.taskname)
897 if not self.task:
898 print "task '%s' not found for multipart job" % self.taskname
899 return TASK_NOT_FOUND
900
901 if not self.dagtemp:
902 print "no storage location specified for temporary files"
903 return TASK_CONFIGURATION_ERROR
904
905 print "multipart job enabled, executing task %s" % self.taskname
906
907 print "using %s://%s%s for temporary file storage" \
908 % (self.protocol, self.host, self.basedir)
909
910 retval = 0
911
912 msg = "Configuring log4cplus....."
913 print msg
914 try:
915 self.setlog4cplus()
916 msg += 'ok'
917 except:
918 msg += 'failed'
919 self.handle_exception(msg)
920 self.statusmsg.append(msg)
921
922 trays = self.task.GetTrays().values()
923 if not self.tray is False:
924 trays = [self.task.GetTray(self.tray)]
925
926 input_needed = True
927 for tray in trays:
928 idx = tray.GetIndex()
929 if not self.iter is False:
930 iters = [int(self.iter)]
931 else:
932 iters = tray.GetIters()
933 for iter in iters:
934 task_id = self.server_start_task(idx, iter)
935 if task_id == TASK_ERROR_ID:
936 print "Bad task ID received from server"
937 return TASK_INVALID
938
939 if input_needed:
940 self.server_copying_input(task_id)
941 msg = "Fetching input from previous tasks....."
942 retval = self.fetch_input()
943 if not self.check_retval(retval, msg):
944 self.server_abort(task_id)
945 return retval
946 input_needed = False
947
948 retval, stats = self.execute(task_id, idx, iter)
949 if retval != 0:
950 self.server_abort(task_id)
951 return retval
952 else:
953 self.server_finish_task(task_id, self.stats)
954
955 if not self.task.IsCleanup():
956 self.server_copying_output(task_id)
957 msg = "Storing intermediate output....."
958 retval = self.store_output()
959 if not self.check_retval(retval, msg):
960 self.server_abort(task_id)
961 return retval
962
963 self.server_finish_task(task_id)
964
965 if self.task.IsCleanup():
966 self.server_finish_job()
967
968 return 0
969
970 - def execute(self, task_id, tray, iter):
971 """Executes a given tray and iteration within this task."""
972 self.server_processing(task_id)
973
974 if self.task.IsCleanup():
975 return self.task_run_cleanup()
976
977 if iter == TASK_EXTERN_ITER:
978 return self.task_run_externs()
979
980 if not self.stats:
981 self.stats = {}
982
983
984 build = self.fetch_metaproject(tray, tmpstorage=self.cache)
985 print "metaproject fetched"
986
987 self.setenv(build)
988
989 print "environment setup complete"
990 msg = "%s - IceTray %d iteration %d....." % (time.strftime("%H:%M:%S"),tray,iter)
991 print msg
992
993 print '\n\n------ begin proc %d iteration %d in tray %d -------\n' \
994 % (self.procnum,iter,tray)
995 cmd = ""
996 cmd += "python -u i3exec.py --nproc=%d " % self.nproc
997 cmd += " --procnum=%d --dataset=%d " % (self.procnum,self.dataset)
998 cmd += " --lib=" + " --lib=".join(self.opts['lib'])
999 cmd += " --iter=%d --tray=%d " % (iter,tray)
1000 cmd += " --runcfg"
1001 cmd += " --dag"
1002 cmd += " --fetch=%s " % self.opts['fetch']
1003 if self.url:
1004 cmd += " --url=%s " % self.url
1005 cmd += " --key=%s " % self.passkey
1006 if self.opts.has_key("zipsafe"):
1007 cmd += " --zipsafe=%s " % self.opts['zipsafe']
1008 if self.dagtemp:
1009 cmd += " --dagtemp=%s" % self.dagtemp
1010 cmd += " %s" % self.configfile
1011 cmd = expandvars(cmd)
1012 print cmd
1013 try:
1014 retval = os.system(cmd)
1015 except Exception:
1016 self.handle_exception("error when executing iteration %d in tray %d" \
1017 % (iter, tray))
1018 retval = TASK_RUN_EXCEPTION
1019
1020 if not self.check_retval(retval, msg):
1021 return retval, self.stats
1022
1023 print "statsfile",self.statsfile
1024 if self.statsfile.endswith('.xml'):
1025 try:
1026 self.getstats(self.statsfile)
1027 except:
1028 self.handle_exception("unable to process summary file %s" \
1029 % self.statsfile)
1030
1031 return retval, self.stats
1032
1034 """Set task statistics. This version is non-cumulative."""
1035 self.stats[name] = value
1036
1037 - def startsoap(self,url,dataset,procnum,passkey):
1038 """Connects to the monitoring daemon and gets the task ID."""
1039 self.localhost = os.uname()[1]
1040 if url:
1041 self.production = True
1042 print "connecting to %s ..." % url
1043 try:
1044 self.server = xmlrpclib.ServerProxy(url)
1045
1046 dataset,nproc,procnum = self.server.multipart_job_start(dataset,procnum,passkey)
1047 print "dataset: %d, nproc: %d, procnum:%d"%(dataset,nproc,procnum)
1048 if dataset < 1:
1049 print "Nothing to do. Exiting"
1050 return False
1051 else:
1052 print "processing proc %d from dataset %d" % (procnum,dataset)
1053 self.dataset = dataset
1054 self.nproc = nproc
1055 self.procnum = procnum
1056 self.passkey = passkey
1057
1058
1059 self.opts["nproc"] = self.nproc
1060 self.opts["procnum"] = self.procnum
1061 self.opts["dataset"] = self.dataset
1062
1063 set_sig_handler(self,procnum,dataset,passkey)
1064 return True
1065 except:
1066 self.handle_exception("could not fetch job from server")
1067 return False
1068 else:
1069 return True
1070
1072 """Does nothing; dependencies will be fetched during fetch_input()."""
1073 return self.excludefiles
1074
1075
1076
1077
1078
1080 if retval == 0:
1081 if msg:
1082 msg += "ok"
1083 self.statusmsg.append(msg)
1084 print msg
1085 print "return value", retval
1086 return True
1087 else:
1088 if self.debug:
1089 self.tarwork()
1090 if msg:
1091 msg += "failed"
1092 self.statusmsg.append(msg)
1093 print msg
1094 print >> sys.stderr, msg
1095 self.statusmsg.append(tail(self.logoutput, 200))
1096 print "return value", retval
1097 return False
1098
1109
1111 try:
1112 self.server.task_abort(task_id, self.passkey)
1113 return True
1114 except:
1115 self.handle_exception("unable to abort task %d" % task_id)
1116 return False
1117
1119 try:
1120 self.server.task_finish(task_id, cPickle.dumps(stats), self.passkey)
1121 return True
1122 except:
1123 self.handle_exception("unable to mark task %d as finished" % task_id)
1124 return False
1125
1127 try:
1128 self.server.multipart_job_finish(self.dataset, self.procnum, self.passkey)
1129 return True
1130 except:
1131 self.handle_exception("unable to mark job %d.%d as finished" \
1132 % (self.dataset, self.procnum))
1133 return False
1134
1136 try:
1137 self.server.task_processing(task_id, self.passkey)
1138 return True
1139 except:
1140 self.handle_exception("unable to mark task %d as processing" % task_id)
1141 return False
1142
1150
1158
1160 msg = "%s - IceTray externs....." % time.strftime("%H:%M:%S")
1161 print msg
1162 retval, stats = self.runextern()
1163 self.check_retval(retval, msg)
1164 return retval, stats
1165
1167 """
1168 Remove the intermediate output from all parts of this multipart job.
1169
1170 This executed by the trashcan job that's added to the end of all the
1171 DAGs; it deletes the incidental files created by each node of the DAG
1172 and marks the job itself as complete.
1173 """
1174 targetdir = self.get_intermediate_directory()
1175 if self.protocol == 'gsiftp':
1176 print "cleanup will use GridFTP (%s, %s)" % (self.host, targetdir)
1177 retval = self.uberftp(self.host,"rm -r %s" % targetdir)
1178 else:
1179 print "cleanup will use local filesystem (%s)" % (targetdir)
1180 cmd = "rm -rf %s" % (targetdir)
1181 print cmd
1182 retval = os.system(cmd)
1183
1184 msg = "deleting intermediate output....."
1185 self.check_retval(retval, msg)
1186 stats = {}
1187
1188 return retval, stats
1189
1191 traceback.print_exc(file=sys.stderr)
1192 if msg:
1193 print msg
1194 print >> sys.stderr, msg
1195
1197 """Store intermediate output from this task."""
1198 return self.handle_file_manifest("output")
1199
1203
1205 """Processes a file manifest and copies the files contained therein."""
1206 name = self.task.GetName()
1207 if self.task.ParallelExecutionEnabled():
1208 name = name + "_" + str(self.iter)
1209 manifest = "iceprod.%u.%u.%s.%s" % (int(self.dataset), int(self.procnum), \
1210 name, type)
1211 print "Reading %s manifest from %s" % (type, manifest)
1212 if os.path.exists(manifest):
1213 print "%s manifest found" % type
1214 f = open(manifest, "r")
1215 size = 0
1216 nw_size = 0
1217 xfer_time = 0
1218 nw_xfer_time = 0
1219 exclude = []
1220 for line in f:
1221 pieces = line.split()
1222 photonics = False
1223 extract = True
1224 shared = False
1225 if len(pieces) == 2:
1226 source, dest = pieces
1227 else:
1228 source = pieces[0]
1229 dest = pieces[1]
1230 if pieces[2] == "photonics":
1231 photonics = True
1232 elif pieces[2] == "dontextract":
1233 extract = False
1234 elif pieces[2] == "global":
1235 shared = True
1236
1237 locked = False
1238 if type == "input":
1239
1240 if photonics:
1241 photon_dir = os.getenv("PHOTON_TABLES_DIR")
1242 if exists(photon_dir):
1243 print "skipping %s because PHOTON_TABLES_DIR (%s) exists" % (source, photon_dir)
1244 continue
1245 myputenv("PHOTON_TABLES_DIR", self.workdir)
1246
1247
1248 retval = self.fetch_checksum(source, dest)
1249 if retval != 0:
1250 print "unable to retrieve checksum, return value: %u" % int(retval)
1251 print "integrity check disabled for %s" % dest
1252 verify = retval == 0
1253 dest_checksum = dest + TASK_CHECKSUM_SUFFIX
1254
1255
1256 if shared or photonics:
1257 if not exists(self.cache):
1258 try: os.makedirs(self.cache)
1259 except: pass
1260 cache_dest = os.path.join(self.cache, dest)
1261 cache_checksum = dest + ".cache" + TASK_CHECKSUM_SUFFIX
1262 lockfile = cache_dest + ".lock"
1263 while not locked:
1264 try:
1265 os.mknod(lockfile)
1266 locked = True
1267 except OSError, oserr:
1268 print "%s %s ." % (oserr,lockfile)
1269 time.sleep(300)
1270 try:
1271 if (time.time() - os.stat(lockfile).st_mtime) > 2400:
1272 os.remove(lockfile)
1273 except: pass
1274 if verify:
1275 retval = self.generate_checksum(cache_dest, cache_checksum)
1276 if retval == 0:
1277 if checksum(dest_checksum, cache_checksum):
1278
1279 source = cache_dest
1280 else:
1281
1282 os.remove(cache_dest)
1283
1284 local_file = dest
1285 if not extract:
1286 exclude.append(expandvars("${TMPWORK}/" + dest))
1287 else:
1288
1289 retval = self.store_checksum(source,dest)
1290 if retval != 0:
1291 print "unable to store checksum, return value: %u" % int(retval)
1292 print "future tasks will be unable to check integrity for %s" % source
1293 local_file = source
1294
1295 start = time.time()
1296 retval = self.copy_file(source, dest)
1297 if retval != 0:
1298 if locked:
1299 os.remove(lockfile)
1300 return retval
1301 duration = time.time() - start
1302 file_size = os.path.getsize(expandvars("${TMPWORK}/" + local_file)) / (1024 * 1024)
1303 size += file_size
1304 xfer_time += duration
1305 if not (source.startswith("file://") or source.startswith("/")):
1306 nw_size += file_size
1307 nw_xfer_time += duration
1308
1309 if locked:
1310 os.remove(lockfile)
1311
1312 if type == "input":
1313 retval = self.verify_checksum(local_file)
1314 if retval != 0:
1315 print "integrity verification failed for %s" % local_file
1316 return retval
1317
1318 if size > 0:
1319 if xfer_time > 0:
1320 rate = size / xfer_time
1321 else:
1322 rate = size
1323 else:
1324 rate = 0
1325
1326 if nw_size > 0:
1327 if nw_xfer_time > 0:
1328 nw_rate = nw_size / nw_xfer_time
1329 else:
1330 nw_rate = nw_size
1331 else:
1332 nw_rate = 0
1333
1334 self.setstats("%s size (MB)" % type, size)
1335 self.setstats("network %s size (MB)" % type, nw_size)
1336 self.setstats("%s rate (MB/s)" % type, rate)
1337 self.setstats("network %s rate (MB/s)" % type, nw_rate)
1338 f.close()
1339
1340 if type == "input":
1341 unpack(exclude)
1342 else:
1343 print "No %s manifest found" % type
1344 return 0
1345
1348
1358
def generate_checksum(self, file, checksum):
    # NOTE(review): the original `def` line was lost in this mangled
    # listing; signature reconstructed from the call site
    # generate_checksum(cache_dest, cache_checksum) -- confirm upstream.
    """
    Run md5sum over *file* and write the digest line to *checksum*.

    @param file: path of the file to hash
    @param checksum: path of the checksum output file
    @return: exit status of the md5sum command (0 on success)
    """
    # WARNING: paths are interpolated into a shell command unescaped;
    # file names containing shell metacharacters are unsafe here.
    cmd = "md5sum %s > %s" % (file, checksum)
    print(cmd)
    return os.system(cmd)
1363
def verify_checksum(self, file):
    # NOTE(review): `def` line lost in the mangled listing; signature
    # reconstructed from the call site verify_checksum(local_file).
    """
    Check *file* against its stored md5 checksum, if one exists.

    @param file: path of the file to verify
    @return: 0 when no checksum file is present, otherwise the exit
             status of `md5sum -c` (0 on success)
    """
    checksum_path = file + TASK_CHECKSUM_SUFFIX
    if os.path.exists(checksum_path):
        cmd = "md5sum -c %s" % checksum_path
        print(cmd)
        return os.system(cmd)
    print("integrity verification skipped, no checksum available for %s" % file)
    return 0
1372
def copy_file(self, source, dest):
    # NOTE(review): `def` line lost in the mangled listing; signature
    # reconstructed from the call site copy_file(source, dest).
    """
    Transfer a single file from *source* to *dest*.

    Supported protocols: file (local copy, hardlink attempted first),
    http/ftp (wget, source side only) and gsiftp (globus-url-copy).
    A name with no protocol prefix is treated as file://${TMPWORK}/<name>.

    @param source: source path or URL
    @param dest: destination path or URL
    @return: 0 on success; a non-zero command exit status or one of the
             TASK_XFER_* error codes on failure
    """
    info = {}
    for end, path in (("in", source), ("out", dest)):
        info[end] = {}
        pos = path.find(":")
        if pos == -1:
            # bare name: stage relative to the working directory
            path = os.path.join("file://${TMPWORK}", path)
            pos = 4
        protocol = path[:pos]
        info[end]["protocol"] = protocol
        if protocol == "file":
            # assumes the "file://" form; a "file:/x" URL would be
            # mis-sliced here -- TODO confirm callers never pass that
            info[end]["path"] = path[7:]
        elif protocol == "gsiftp" or protocol == "http":
            path = path[pos + 3:]
            slash = path.find("/")
            info[end]["host"] = path[:slash]
            info[end]["path"] = path[slash:]
        else:
            print("Unknown transfer protocol: %s" % protocol)
            return TASK_XFER_PROTO_ERROR

    if info["out"]["protocol"] == "http" or info["out"]["protocol"] == "ftp":
        print("Unsupported destination protocol: %s" % info["out"]["protocol"])
        return TASK_XFER_CONFIG_ERROR

    # Fix: build the URL list in a fixed (source, destination) order.
    # The original iterated the dict (`iteritems`), which left the order
    # unspecified although files[0]/files[1] are used positionally below.
    files = []
    for end in ("in", "out"):
        entry = info[end]
        url = "%s://" % entry["protocol"]
        if "host" in entry:
            url += entry["host"] + "/"
        url += entry["path"]
        files.append(url)

    print(files)
    if info["in"]["protocol"] == "gsiftp" or info["out"]["protocol"] == "gsiftp":
        proxy = self.steering.GetSysOpt("globus_proxy").GetValue()
        os.chmod(proxy, 0o600)  # globus requires a private proxy file
        globus_loc = self.steering.GetSysOpt("globus_location").GetValue()
        bin = os.path.join(globus_loc, "bin", 'globus-url-copy')
        certs = os.path.join(globus_loc, "certificates")
        lib = os.path.join(globus_loc, "lib")
        cmd = "LD_LIBRARY_PATH=%s:$LD_LIBRARY_PATH DYLD_LIBRARY_PATH=%s:$DYLD_LIBRARY_PATH"
        cmd += " X509_CERT_DIR=%s X509_USER_PROXY=%s %s -cd -r -nodcau %s %s"
        cmd = cmd % (lib, lib, certs, proxy, bin, files[0], files[1])
    elif info["in"]["protocol"] == "http" or info["in"]["protocol"] == "ftp":
        cmd = "wget -nv -O %s %s" % (info["out"]["path"], files[0])
    elif info["in"]["protocol"] == "file" and info["out"]["protocol"] == "file":
        # ensure the destination directory exists (the shell expands ${...})
        cmd = "mkdir -p %s" % os.path.dirname(info["out"]["path"])
        print(cmd)
        retval = os.system(cmd)
        if retval != 0:
            return retval
        # try a cheap hardlink first; fall back to a real copy below
        cmd = "cp -l %s %s" % (info["in"]["path"], info["out"]["path"])
        print(cmd)
        retval = os.system(cmd)
        if retval == 0:
            return 0
        cmd = "cp %s %s" % (info["in"]["path"], info["out"]["path"])
    else:
        print("Unknown transfer protocol")
        return TASK_XFER_PROTO_ERROR
    cmd = expandvars(cmd)
    print(cmd)
    return os.system(cmd)
1446
1447
1449 """Perform an FTP command on a remote host using uberftp."""
1450 proxy = self.steering.GetSysOpt("globus_proxy").GetValue()
1451 os.chmod(proxy,0600)
1452
1453 globus_loc = self.steering.GetSysOpt("globus_location").GetValue()
1454 path = os.path.join(globus_loc, "bin")
1455 bin = os.path.join(path, 'uberftp')
1456
1457 certs = os.path.join(globus_loc, "certificates")
1458
1459 cmd = 'X509_CERT_DIR=%s X509_USER_PROXY=%s %s %s "%s"' \
1460 % (certs, proxy, bin, host, cmd)
1461 print cmd
1462 return os.system(cmd)
1463
1469
1470
1471
1472
1473
1475 """
1476 install signal handler to handle errors and evictions
1477
1478 @param run: RunInstance
1479 @param procnum: process number within dataset
1480 @param dataset: dataset id
1481 @param passkey: random passkey assigned to process by daemon
1482 """
1483 def handler(signum,frame):
1484 print >> sys.stderr, "aborting"
1485 print >> sys.stderr, str(frame)
1486 run.flush_output()
1487 run.server.abort(procnum,dataset,EVICTED,'Job has been evicted.',passkey)
1488 _done = True
1489 os._exit(5)
1490 signal.signal(signal.SIGTERM, handler)
1491
def setup_workdir(top, work):
    # NOTE(review): `def` line lost in the mangled listing; name and
    # parameter order reconstructed from the body -- confirm against callers.
    """
    Create and enter a scratch work directory, exporting TMPWORK/TMPTOP.

    Every entry of *top* (except .out/.err log files) is symlinked into
    the new directory so the job sees the staged files.

    @param top: staging/parent directory
    @param work: work directory to create and chdir into
    @return: the work directory path
    """
    os.makedirs(work)
    os.chdir(work)
    myputenv('TMPWORK', work)
    myputenv('TMPTOP', top)
    # (removed unused local `cwd = os.getcwd()` from the original)
    for entry in os.listdir(top):
        if entry.endswith((".out", ".err")):
            # skip job log files
            continue
        if not os.path.exists(entry):
            os.symlink(join(top, entry), entry)
    return work
1504
def cleanup_workdir(top, work):
    # NOTE(review): `def` line lost in the mangled listing; name and
    # parameter order reconstructed from the body -- confirm against callers.
    """Return to *top* and delete *work* unless the module flag _clean is unset."""
    os.chdir(top)
    if not _clean:
        return
    print("Cleaning workspace %s" % work)
    os.system('rm -rf %s' % work)
1510
def safe_abort(abortfunc, args, msg='', tries=4, wait=2):
    # fix: removed the stray leading "-" (diff residue) from the def line
    """
    Call *abortfunc* with *args*, retrying on transient socket errors.

    @param abortfunc: callable performing the server-side abort
    @param args: argument tuple passed to abortfunc
    @param msg: unused; kept for interface compatibility
    @param tries: maximum number of attempts
    @param wait: base backoff in seconds (scaled by the failure count)
    @return: abortfunc's return value, or None if every attempt failed
    """
    failures = 0
    while failures < tries:
        print("resetting job status")
        try:
            # `apply` is deprecated; argument unpacking is the same call
            return abortfunc(*args)
        except socket.error as e:
            sys.stderr.write("%s retrying\n" % e)
            time.sleep(wait * failures)
            failures += 1
1521
1522
def wget(url, dest='./'):
    # fix: removed the stray leading "-" (diff residue) from the def line
    """
    Fetch *url* to *dest* using a protocol-appropriate command.

    Supports http/ftp (wget), lfn (lcg-cp), file (cp) and gsiftp
    (globus-url-copy).  If *dest* is a directory the source basename is
    appended.

    @param url: source URL (environment variables are expanded)
    @param dest: destination path or directory (default: cwd)
    @return: exit status of the underlying command (0 on success)
    @raise Exception: for an unsupported URL scheme
    """
    dest = os.path.abspath(expandvars(dest))
    url = expandvars(url)
    if os.path.isdir(dest.replace('file:', '')):
        dest = os.path.join(dest, os.path.basename(url))
    if not dest.startswith("file:"):
        dest = "file:" + dest

    if url.startswith("http://") or url.startswith("ftp://"):
        dest = dest.replace("file:", "")
        cmd = 'wget -nv --tries=4 --output-document=%s %s' % (dest, url)
    elif url.startswith("lfn:"):
        cmd = "lcg-cp --vo icecube -v %s %s" % (url, dest)
    elif url.startswith("file:"):
        src = url.replace("file:", "")
        dest = dest.replace("file:", "")
        cmd = 'cp %s %s' % (src, dest)
    elif url.startswith("gsiftp://"):
        globus = expandvars("$GLOBUS_LOCATION")
        lib = os.path.join(globus, 'lib')
        bin = os.path.join(globus, 'bin', 'globus-url-copy')
        print("globus %s" % globus)
        # fix: removed a leftover debug call os.system("/bin/ls") here
        if not dest.startswith("file:"):
            dest = "file:%s" % os.path.abspath(dest)
        cmd = "LD_LIBRARY_PATH=%s:$LD_LIBRARY_PATH DYLD_LIBRARY_PATH=%s:$DYLD_LIBRARY_PATH"
        cmd += " %s -cd -r -nodcau %s %s"
        cmd = cmd % (lib, lib, bin, url, dest)
    else:
        raise Exception("unsupported protocol %s" % url)
    print(cmd)
    return os.system(cmd)
1556
1557
def checksum(sum1, sum2):
    # NOTE(review): `def` line lost in the mangled listing; signature
    # reconstructed from the body and the call sites -- confirm upstream.
    """
    Compare two checksum files with diff.

    @param sum1: first checksum file path
    @param sum2: second checksum file path
    @return: True when both files exist and are identical, else False
    """
    if exists(sum1) and exists(sum2):
        return not os.system("diff %s %s" % (sum1, sum2))
    return False
1562
def isurl(url):
    # NOTE(review): `def` line lost in the mangled listing; name inferred
    # from the body -- confirm against callers.
    """Return True if *url* uses a plain-download scheme (http/ftp/file)."""
    return url.startswith(("http://", "ftp://", "file://"))
1567
1568
1576
1599
1625
1626
def myputenv(name, value):
    # NOTE(review): `def` line lost in the mangled listing; name taken
    # from the visible call sites myputenv('TMPWORK', work) etc.
    """
    Set environment variable *name* to *value*.

    A None value is not exported; it is only reported (preserving the
    original behaviour), leaving any existing value untouched.

    @param name: environment variable name
    @param value: string value, or None to skip
    """
    if value is None:  # fix: identity test instead of `== None`
        # best-effort diagnostic; same output as the old `print name, value`
        print("%s %s" % (name, value))
    else:
        os.environ[name] = value
1630
1631
1633 print "unpacking %s..." % os.getcwd()
1634 for archive in [filename for filename in os.listdir(".") if filename.endswith(".gz") and not filename in exclude]:
1635 print "gunzip "+archive
1636 base,suffix = os.path.splitext(archive)
1637 os.system("gunzip "+archive+" -c > "+base)
1638
1639 for archive in [filename for filename in os.listdir(".") if filename.endswith(".bz2") and not filename in exclude]:
1640 print "bunzip2 "+archive
1641 os.system("bunzip2 "+archive)
1642
1643 for archive in [filename for filename in os.listdir(".") if filename.endswith(".tar") and not filename in exclude]:
1644 print "tar xf "+archive
1645 os.system("tar xf "+archive)
1646
1647 for archive in [filename for filename in os.listdir(".") if filename.endswith(".tgz") and not filename in exclude]:
1648 print "tar xzf "+archive
1649 os.system("tar xzf "+archive)
1650