1
2
3
4
5
6
7
8
9 import os,sys,time,string,re
10 import signal
11 import random
12 import getpass
13 import os.path,getopt
14 import commands
15 import platform
16 import logging
17 import glob
18 import time
19 import re
20 from os.path import expandvars,join,exists,basename,dirname,abspath
21 import iceprod.core
22 from iceprod.core.dataclasses import *
23 from iceprod.core.xmlparser import IceTrayXMLParser
24 from iceprod.core.lex import *
25 from iceprod.core import functions
26 from iceprod.core.constants import *
27 from iceprod.core.inventory import FileInventory
28
29 import xmlrpclib
30 import socket
31 import cPickle
32 from ConfigParser import SafeConfigParser
33 from ipxml import PrettyPrint
34
35 import traceback
36
# Cached platform string; None until first computed (presumably by a
# getplatform() helper defined elsewhere in this file -- confirm).
_platform = None

# Name of the plain-text inventory file written to the job's top directory
# during stage-out (opened in finish()).
_inventory = "sp_outfile_inventory.list"

# Module-level state flags; their consumers are outside this view -- verify.
_done = False
_clean = True

# Error codes passed to abort() and reported to the monitoring server.
EVICTED = 1
FAILED = 2

# Short interpreter version string, e.g. "2.7" (first three characters).
python_version = platform.python_version()[:3]

# PID of a forked child process, 0 when none -- set elsewhere, TODO confirm.
_child = 0

# Default base URL for fetching IceCube software tarballs and dependencies.
_baseurl = 'http://icecube.wisc.edu/simulation/downloads'
# Default per-user scratch directory used to cache downloaded tarballs.
_tmpstorage = '/tmp/%s_icesoft' % getpass.getuser()
50
52 if ':' in tmpstorage:
53
54 storage = None
55 for t in tmpstorage.split(':'):
56 base = dirname(t)
57 if not exists(base):
58 continue
59 if not exists(t):
60 try: os.makedirs(t)
61 except: continue
62 storage = t
63 break
64 if storage:
65 tmpstorage = storage
66 else:
67 tmpstorage = _tmpstorage
68 return tmpstorage
69
71 tmpstorage = get_tmpstorage(tmpstorage)
72
73 filename = basename(url)
74 lockfile = join(tmpstorage,"%s.lock" % filename)
75 cache = join(tmpstorage,filename)
76 mdcache = cache + '.md5sum'
77
78
79 md5url = url + '.md5sum'
80 print "retrieving %s..." % md5url
81 failed = wget(md5url)
82 if failed:
83 return wget(url)
84
85 cwd = os.getcwd()
86
87 trials = 5
88 while (not exists(cache)) or \
89 (not checksum(join(cwd,filename+'.md5sum'),mdcache)):
90
91 print cache, "does not exist. "
92 os.chdir(tmpstorage)
93 try:
94 os.mknod(lockfile)
95 except OSError,oserr:
96 print "%s %s ." % (oserr,lockfile)
97 time.sleep(300)
98 try:
99 if (time.time() - os.stat(lockfile).st_mtime) > 2400:
100 os.remove(lockfile)
101 except: pass
102 continue
103
104 os.system("rm -rf %s %s" % (cache,mdcache))
105 print "retrieving %s..." % (url)
106 failed = wget(url)
107 if failed:
108 os.remove(lockfile)
109 raise Exception, "unable to fetch file from %s" % url
110
111
112 cmd = "md5sum %s >%s" % (filename,mdcache)
113 print cmd
114 os.system(cmd)
115 os.remove(lockfile)
116 os.chdir(cwd)
117
118
119 if trials < 1:
120 raise Exception, "Max no. of trails reached to fetch '%s'" % url
121 trials -= 1
122
123 os.chdir(cwd)
124
125 if abspath(filename) != abspath(cache):
126 if exists(filename):
127 os.remove(filename)
128 os.system("ln -s %s" % cache)
129 return failed
130
131
133 tmpstorage = get_tmpstorage(tmpstorage)
134
135 lockfile = join(tmpstorage,"%s.lock" % meta.filebase)
136 cache = join(tmpstorage,meta.filebase + meta.suffix)
137 cachedir = join(tmpstorage,meta.filebase)
138
139
140 md5url = meta.url + '.md5sum'
141 print "retrieving %s..." % md5url
142 failed = wget(md5url)
143 if failed: raise Exception, "unable to fetch file from %s" % md5url
144
145 cwd = os.getcwd()
146
147 trials = 5
148 while (not exists(cache)) or \
149 (not exists(cachedir)) or \
150 (not checksum(join(cwd,meta.md5sum),join(cachedir,meta.md5sum))):
151
152 os.chdir(tmpstorage)
153 try:
154 os.mknod(lockfile)
155 except OSError,oserr:
156 print "%s %s ." % (oserr,lockfile)
157 time.sleep(300)
158 try:
159 if (time.time() - os.stat(lockfile).st_mtime) > 2400:
160 os.remove(lockfile)
161 except: pass
162 continue
163
164 os.system("rm -rf %s %s" % (cache,cachedir))
165 print "retrieving %s..." % (meta.url + meta.suffix)
166 failed = wget(meta.url+meta.suffix)
167 if failed:
168 os.remove(lockfile)
169 raise Exception, "unable to fetch file from %s" % meta.url + meta.suffix
170
171 if os.path.isdir(meta.filebase):
172 os.system("rm -rf " + meta.filebase)
173
174
175 os.mkdir(meta.filebase)
176 os.system("tar -C%s -xzf %s" % (basedir,cache))
177 os.system("md5sum %s > %s" % (meta.filebase+meta.suffix,join(cachedir,meta.md5sum)))
178 os.remove(lockfile)
179 os.chdir(cwd)
180
181 trials -= 1
182 if trials < 1:
183 raise Exception, "Max no. of trails reached to stage metaproject '%s'" % meta.filebase
184
185 os.chdir(cwd)
186 meta.path = join(tmpstorage,meta.filebase)
187 return meta
188
189
190
192
194 if not self.stats.has_key(name):
195 self.stats[name] = value
196 elif type(value) == type(float()):
197 self.stats[name] += value
198 elif type(value) == type(int()):
199 self.stats[name] += value
200 else:
201 self.stats[name] = value
202
204 if self.steering.GetTrays()[tray].GetPython():
205 self.python = self.steering.GetTrays()[tray].GetPython()
206 else:
207 self.python = sys.executable
208 if not os.path.exists(self.python):
209 meta = I3Tarball()
210 meta.platform = getplatform()
211 meta.name = self.python
212 baseurl = os.path.join(self.baseurl,'python')
213 if functions.isurl(self.python):
214 meta.name = os.path.basename(self.python)
215 baseurl = os.path.dirname(self.python)
216 meta.suffix = ".tar.gz"
217 meta.filebase = "%s.%s" % (meta.name, meta.platform)
218 meta.md5sum = "%s.md5sum" % meta.filebase
219 meta.url = "%s/%s" % (baseurl,meta.filebase)
220 meta = fetch_tarball(meta,self.cache)
221 self.python = os.path.join(meta.path,'bin','python')
222
224 global _baseurl
225 self.logger = logging.getLogger('iceprod::exe')
226
227 self.python = sys.executable
228 self.starttime = time.time()
229 self.env = {}
230 self.opts = opts
231 self.passkey = None
232 self.production = False
233 self.server = None
234 self.nproc = 1
235 self.procnum = 0
236 self.dataset = 0
237 self.configfile = 'config.xml'
238 self.statsfile = 'summary.xml'
239 self.url = None
240 self.topdir = None
241 self.workdir = "."
242 self.build = None
243 self.baseurl = _baseurl
244 self.stats = {}
245 self.statusmsg = []
246 self.excludefiles = []
247 self.debug = False
248 self.mktar = False
249 self.suffix_in = []
250 self.mpdownloaded = {}
251 self.logoutput = "icetray.log"
252 self.err = "icetray.err"
253 self.out = "icetray.out"
254 self.ping = 0
255 self.stageout = False
256 self.nocopy = False
257 self.target = None
258 self.cache = '/tmp/%s_icesoft' % getpass.getuser()
259 self.localhost = os.uname()[1]
260 self.grid = 0
261 self.socktimeout = 0
262 self.validate = 0
263
264
265 if self.opts.has_key("url"):
266 self.url = self.opts["url"]
267 self.production = True
268 if self.opts.has_key("ping"):
269 self.ping = int(self.opts["ping"])
270 if self.opts.has_key("dataset"):
271 self.dataset = int(self.opts["dataset"])
272 if self.opts.has_key("procnum"):
273 self.procnum = int(self.opts["procnum"])
274 if self.opts.has_key("nproc"):
275 self.nproc = int(self.opts["nproc"])
276 if self.opts.has_key("key"):
277 self.passkey = self.opts['key']
278 if self.opts.has_key("fetch"):
279 self.baseurl = self.opts['fetch']
280 if self.opts.has_key("mktar"):
281 self.mktar = True
282 if self.opts.has_key("debug"):
283 self.debug = int(self.opts["debug"])
284 if self.opts.has_key("target"):
285 self.target = self.opts['target']
286 if self.opts.has_key("stageout"):
287 self.stageout = int(self.opts['stageout'])
288 if self.opts.has_key("nocopy"):
289 self.nocopy = True
290 if self.opts.has_key("grid"):
291 self.grid = int(self.opts["grid"])
292 if self.opts.has_key("timeout"):
293 self.socktimeout = int(self.opts["timeout"])
294 if self.opts.has_key("validate"):
295 self.validate = int(self.opts["validate"])
296
297 if self.opts.has_key("python"):
298 self.python = self.opts["python"]
299
398
399
401 """
402 Get current environment and save it
403 """
404 self.env['PYTHONPATH'] = expandvars("$PYTHONPATH")
405 self.env['PATH'] = expandvars("$PATH")
406 self.env['LD_LIBRARY_PATH'] = expandvars("$LD_LIBRARY_PATH")
407 self.env['DYLD_LIBRARY_PATH'] = expandvars("$DYLD_LIBRARY_PATH")
408
410 """
411 Configure environment for run
412 @param i3build: working directory path
413 """
414
415 if self.build == i3build:
416 print "setenv: Using previously selected build environment in"
417 print i3build
418 return
419 self.build = i3build
420
421
422 platform = getplatform()
423 myputenv('I3_PLATFORM','')
424 if platform == 'system':
425 platform = getplatform(reset=True)
426 myputenv('I3_PLATFORM',platform)
427 myputenv('PLATFORM',platform)
428 print platform
429 print "Host info:"
430 print ' '.join(os.uname())
431 print "running in "+os.getcwd()
432 myputenv('I3_BUILD', i3build)
433 myputenv('I3_WORK', os.path.join(i3build,'projects'))
434 myputenv('I3_SRC', i3build)
435 myputenv('I3_PORTS', i3build)
436 myputenv('I3_TOOLS', i3build)
437 rootsys = os.path.join(i3build,'cernroot')
438 if os.path.exists(rootsys):
439 myputenv('ROOTSYS', rootsys)
440 elif self.steering.GetSysOpt("rootsys"):
441 rootsys = self.steering.GetSysOpt("rootsys").GetValue()
442 myputenv('ROOTSYS', expandvars(rootsys))
443 else:
444 myputenv('ROOTSYS', i3build)
445
446
447
448 javasearchpath = expandvars('$JAVA_HOME:/usr/java:/usr/local/java')
449 java = functions.findjava(javasearchpath)
450
451 ldlibpath = []
452 if java:
453 myputenv('JAVA_HOME',java[0])
454 ldlibpath.append(java[1])
455 else:
456 print >> sys.stderr,"!!!!!!!!!JAVA NOT FOUND IN %s!!!!!!!!!!!"% javasearchpath
457 print expandvars("using java in $JAVA_HOME")
458
459
460 ldlibpath.append(expandvars("$ROOTSYS/lib"))
461 ldlibpath.append(expandvars("$I3_BUILD/lib"))
462 ldlibpath.append(expandvars("$I3_BUILD/lib/tools"))
463 if os.uname()[4].endswith("64"):
464 ldlibpath.append(expandvars("/usr/lib64"))
465 ldlibpath.append(expandvars("/usr/lib"))
466 ldlibpath.extend(glob.glob(self.python.replace("bin/python","lib*")))
467 ldlibpath.append(self.env['LD_LIBRARY_PATH'])
468 myputenv('DYLD_LIBRARY_PATH', ":".join(ldlibpath))
469 myputenv('LD_LIBRARY_PATH', ":".join(ldlibpath))
470
471
472 pythonpath = []
473 pythonpath.append(expandvars("."))
474 pythonpath.append(expandvars("$I3_BUILD/lib"))
475 pythonpath.append(expandvars("$I3_BUILD/lib/python"))
476 pythonpath.append(expandvars("$I3_BUILD/lib/python/site-packages"))
477 pythonpath.extend(glob.glob(self.python.replace("bin/python","lib*/python*/site-packages")))
478 pythonpath.append(self.env['PYTHONPATH'])
479 myputenv('PYTHONPATH', ":".join(pythonpath))
480
481
482 path = []
483 path.append(expandvars("."))
484 path.append(expandvars("$I3_BUILD/bin"))
485 path.append(expandvars("$PYROOT/bin"))
486 path.append(expandvars("/bin"))
487 path.append(expandvars("/usr/bin"))
488 path.append(self.env["PATH"])
489 myputenv('PATH', ":".join(path))
490
491 myputenv('USER_CLASSPATH',expandvars("$I3_WORK/monolith-reader/resources/triggerUtil.jar"))
492 myputenv('ICETRAY_CLASSPATH',expandvars("$I3_BUILD/lib/mmc.jar"))
493
494
495 print '\n----------- environment contents ------------'
496 for envkey in os.environ.keys():
497 print envkey, ":"
498 for env_val in os.environ[envkey].split(":"):
499 print "\t", env_val
500
501
560
561
563 """
564 Download files needed from server.
565 """
566 print "fetching dependencies %s" % self.baseurl
567 for d in self.steering.GetDependencies():
568 if type(d) in types.StringTypes:
569 fetchurl = self.parser.parse(d)
570 else:
571 fetchurl = self.parser.parse(d.GetName())
572 if not isurl(fetchurl):
573 fetchurl = self.parser.parse("%s/%s" % (self.baseurl,fetchurl))
574
575 print "retrieving %s..." % fetchurl
576 failed = wget(fetchurl)
577
578 if failed:
579 raise Exception, "unable to fetch file from %s" % fetchurl
580 self.excludefiles.append(basename(fetchurl))
581 return self.excludefiles
582
583
585
586 if not os.uname()[0] == 'Linux':
587 self.logger.info("getmemusage: Not a Linux machine")
588 return
589 usage = open("/proc/self/status")
590 for line in usage.readlines():
591 if line.startswith('Vm'):
592 name,value,unit = line.split()
593 value = float(value.strip())
594 if unit == "MB":
595 value = value*1024
596 if not stats.has_key(name):
597 stats[name] = value
598 else:
599 stats[name] = max(value,stats[name])
600 usage.close()
601
603
604 if not file: file = self.statsfile
605 stats = {}
606 if not os.path.exists(file):
607 print "Unable to find statsfile", file
608 return self.stats
609 if file.endswith('.xml'):
610 stats = XMLSummaryParser().ParseFile(file)
611 else:
612 if file:
613 statsfile = open(file,'r')
614 else:
615 statsfile = open(self.statsfile,'r')
616 for line in statsfile:
617 s = line.split(':')
618 if len(s) > 1:
619 stats[s[0].strip()] = s[1].strip()
620 statsfile.close()
621
622 for key in stats.keys():
623 try:
624 self.setstats(key,float(stats[key]))
625 except:
626 self.setstats(key,stats[key])
627
628
629 return stats
630
632 workpath = dirname(self.workdir)
633 workdir = basename(self.workdir)
634 os.system("tar -C%s -czf %s/work.tgz ./%s" % (workpath,self.topdir,workdir))
635
637 externstats = '.extern.stats'
638 print '\n\n------ begin proc %d externs -------\n' % self.procnum
639 outfile = os.path.join(self.workdir,"i3exe_%d.out" % self.procnum)
640 cmd = ""
641 if os.path.exists("/usr/bin/time"):
642 cmd += "/usr/bin/time -o %s " % externstats
643 cmd += " -f \""
644 cmd += "mem_heap: %K\\n"
645 cmd += "mem_heap_peak:%M\\n"
646 cmd += "user_time:%U\\n"
647 cmd += "sys_time:%S\\n"
648 cmd += "real_time:%e\" -- "
649 cmd += "%s i3exec.py --nproc=%d " % (self.python ,self.nproc)
650 cmd += " --procnum=%d --dataset=%d " % (self.procnum,self.dataset)
651 cmd += " --fetch=%s " % self.opts['fetch']
652 cmd += " --extern=1"
653 cmd += " --lib=" + " --lib=".join(self.opts['lib'])
654 if self.url:
655 cmd += " --url=%s " % self.url
656 cmd += " --runcfg"
657 cmd += " %s" % self.configfile
658 cmd = expandvars(cmd)
659 print cmd
660 sys.stdout.flush()
661
662
663 retval = os.system(cmd)
664 stats = self.getstats(externstats)
665 return retval,stats
666
668 """
669 Extract number of trays and iterations in each tray
670 @return: array of indices indicating the number of iterations in each tray
671 """
672 iters = map(IceTrayConfig.GetIterations,self.steering.GetTrays())
673 return map(int,iters)
674
676 """
677 Configure log4cplus to write output logging
678 @return: path to log output file
679 """
680 lg4cppath = os.path.join(self.workdir,"log4cplus.conf")
681 myputenv('I3LOGGING_CONFIG', lg4cppath)
682 lg4cp = open(lg4cppath,'w')
683 print >> lg4cp, "log4cplus.appender.default=log4cplus::FileAppender"
684 print >> lg4cp, "log4cplus.appender.default.File=%s" % self.logoutput
685 print >> lg4cp, "log4cplus.appender.default.layout.ConversionPattern=%d [%d{%Z}] %-5p %c %x: %F:% L %m%n"
686 if self.debug:
687 print >> lg4cp, "log4cplus.rootLogger=INFO, default"
688 else:
689 print >> lg4cp, "log4cplus.rootLogger=WARN, default"
690 lg4cp.close()
691
692
693 handler = logging.FileHandler(self.logoutput)
694 formatter = logging.Formatter("iceprod : %(levelname)s %(name)s : %(message)s")
695 handler.setFormatter(formatter)
696 self.logger.addHandler(handler)
697
698 os.system("touch %s" % self.logoutput)
699 return self.logoutput
700
702 """
703 Cat out/err files
704 """
705 os.system("cat %s" % self.out)
706 os.system("cat %s >&2" % self.err)
707 sys.stdout.flush()
708
710 """
711 Execute each of the icetray iterations in configuration
712 @return: execution value (0 if everything went ok)
713 """
714 retval = 0
715 tray = 0
716
717 msg = "Configuring log4cplus....."
718 print msg
719 try:
720 self.setlog4cplus()
721 msg += 'ok'
722 except Exception,e:
723 msg += 'failed'
724 print >> sys.stderr, exc_type, e
725 print >> sys.stderr, msg
726 self.statusmsg.append(msg.decode("utf8","ignore"))
727
728 msg = "%s - IceTray externs....." % time.strftime("%H:%M:%S")
729 print msg
730 retval,stats = self.runextern()
731 print "return value", retval
732 if retval==0:
733 msg += 'ok'
734 self.statusmsg.append(msg.decode("utf8","ignore"))
735 else:
736 msg += 'failed'
737 self.statusmsg.append(msg.decode("utf8","ignore"))
738 if self.debug: self.tarwork()
739 return 5
740
741
742 for iterations in self.get_trays():
743
744
745 build = self.fetch_metaproject(tray,tmpstorage=self.cache)
746 os.system("touch %s" % build)
747 self.setpython(tray)
748 self.setenv(build)
749
750 for iter in range(iterations):
751 msg = "%s - IceTray %d iteration %d....." % (time.strftime("%H:%M:%S"),tray,iter)
752 print msg
753 iter_start = time.time()
754 retval = self.execute(tray,iter)
755 print "return value", retval
756 self.setstats("tray(%u,%u) exec time" % (tray,iter),time.time()-iter_start)
757 if retval==0:
758 msg += 'ok'
759 self.statusmsg.append(msg.decode("utf8","ignore"))
760 print msg
761 if self.production and (self.ping > 0) and (iter%2 == 0):
762 try:
763 self.server.ping(self.dataset,self.procnum,self.localhost,self.passkey,tray,iter+1)
764 except Exception,e:
765 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
766 else:
767 msg += 'failed'
768 self.statusmsg.append(msg.decode("utf8","ignore"))
769 self.statusmsg.append('<br>---- log4cplus ----<br>')
770 self.statusmsg.append(functions.tail(self.logoutput,100))
771 self.statusmsg.append('<br>---- stdout ----<br>')
772 self.statusmsg.append(functions.tail(self.out,200).decode("utf8","ignore"))
773 self.statusmsg.append('<br>---- stderr ----<br>')
774 self.statusmsg.append(functions.tail(self.err,350).decode("utf8","ignore"))
775 print >> sys.stderr, msg
776 if self.debug: self.tarwork()
777 return retval
778 tray +=1
779 print "statsfile",self.statsfile
780 if self.statsfile.endswith('.xml'):
781 try:
782 self.getstats(self.statsfile)
783 except Exception,e:
784 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
785 print "unable to process summary file %s" % self.statsfile
786 if self.statsfile.endswith('.xml') and os.path.exists('iceprod.'+self.statsfile):
787 try:
788 self.getstats('iceprod.'+self.statsfile)
789 except Exception,e:
790 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
791 print "unable to process summary file iceprod.%s" % self.statsfile
792
793 if self.finish():
794 return retval
795 else: return 7
796
798 """
799 Stage out generated files and generate an inventory
800 """
801
802 tar = os.path.join(self.topdir,'output_%d_%d.tar' % (self.dataset,self.procnum))
803 procnum = self.procnum
804 dataset = self.dataset
805 passkey = self.passkey
806
807
808
809 filelist = []
810 if self.target: filelist.append(basename(self.logoutput))
811
812
813 exclude_gunzip = map(lambda s: s.replace(".gz",""),self.excludefiles)
814 self.excludefiles.extend(exclude_gunzip)
815
816
817 suffix = ["*.root","*.i3"]
818 suffix.extend( self.suffix_in )
819 suffix_gz= map(lambda s: s+".gz",suffix)
820 suffix.extend( suffix_gz )
821 suffix_in_gz= map(lambda s: s+".gz",self.suffix_in)
822 self.suffix_in.extend( suffix_in_gz )
823 map(filelist.extend,map(glob.glob,suffix))
824 excludelist = []
825 map(excludelist.extend,map(glob.glob,self.excludefiles))
826 print 'filelist',filelist
827 print 'excludelist',excludelist
828
829
830 if self.production:
831 try:
832 tray = len(self.get_trays()) - 1
833 iter = self.get_trays()[-1]
834 self.server.ping(self.dataset,self.procnum,self.localhost,self.passkey,tray,iter)
835 except: pass
836
837 print "copying data"
838 if self.stageout and self.production:
839 self.logger.info("setting status to COPYING")
840 maxtries = 5
841 for trial in range(maxtries):
842 try:
843 self.server.copying(self.dataset,self.procnum,self.passkey)
844 break
845 except Exception, e:
846 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
847 self.logger.error(e)
848 if trial >= maxtries-1:
849 self.statusmsg.append('%s: Job was unable set status to copying' % e)
850 self.logger.error("COPYING failed")
851 self.abort(FAILED)
852 return False
853
854 inventorylist = []
855 inventory = open(os.path.join(self.topdir,_inventory),'w')
856
857 self.logger.info("generating inventory")
858 self.nocopy = self.stageout
859 if not self.nocopy:
860 for file in filelist:
861 if re.match(r'^\.',file) or file in excludelist:
862 if file not in self.suffix_in: continue
863
864 print file
865 if not (file.endswith('.gz') or file.endswith('.root')):
866 cmd ="gzip "+file+" -c > "+file+".gz"
867 print cmd
868 os.system(cmd)
869 file = file+".gz"
870 if self.mktar:
871 dir = os.path.dirname(file)
872 cmd = "tar"
873 if os.path.isdir(dir):
874 cmd += " -C%s " % dir
875 if not os.path.exists(tar):
876 cmd += " -cf"
877 else:
878 cmd += " -uf"
879 cmd += " %s %s" %(tar,basename(file))
880 print cmd
881 os.system(cmd)
882
883 inventory.close()
884 self.logger.info("inventory done")
885 self.logger.info("Setting i3exec runtime")
886 self.setstats("i3exec runtime",time.time()-self.starttime)
887
888 self.logger.info("production: %s" % self.production)
889 if self.production:
890 self.logger.info("Setting final job status")
891
892 try:
893 stats = {}
894 for key,value in self.stats.items():
895 self.logger.info("%s = %s" % (key,value))
896
897 if self.stats.has_key(key):
898 stats[key] = value
899 continue
900
901 for keepkey in steering.GetStatistics():
902 if re.match(keepkey,key):
903 stats[key] = value
904 break
905
906 self.server.finish(self.dataset,self.procnum,cPickle.dumps(stats),self.passkey,self.stageout)
907 except Exception, e:
908 self.statusmsg.append('%s: Job was unable to set finish status' % e)
909 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
910 print e
911 self.logger.error("setting status failed")
912 self.abort(FAILED)
913
914 xmlinventory = os.path.join(self.topdir,iceprod.core.inventory.name)
915 self.logger.info("xmlinventory %s" % xmlinventory)
916 try:
917 oi = FileInventory()
918 if os.path.exists(xmlinventory):
919 self.logger.info("xmlinventory %s exists" % xmlinventory)
920 oi.Read(xmlinventory)
921
922 oi.Write(xmlinventory)
923 for file in oi.filelist:
924 source = file['source']
925 print source
926 if source.startswith('file:'):
927 source = os.path.abspath(basename(source.replace('file:','')))
928 if os.path.exists(source):
929 cmd = "mv %s %s/" % (source,self.topdir)
930 print cmd
931 os.system(cmd)
932 self.logger.info("xmlinventory done")
933 except Exception,e:
934 self.logger.info("inventory exception: %s" % e)
935
936 return True
937
938 - def abort(self,error,message=""):
939 self.flush_output()
940 if message:
941 self.statusmsg.append(message.decode("utf8","ignore"))
942 if self.production:
943 safe_abort(self.server.abort,
944 (self.procnum,
945 self.dataset,error,'<br>'.join(self.statusmsg),
946 self.passkey,cPickle.dumps(self.stats)))
947
    def startsoap(self,url,dataset,procnum,passkey):
        """
        Connect to the monitoring daemon and negotiate a job to process.

        @param url: XML-RPC server URL; falsy value disables production mode
        @param dataset: requested dataset id (server may reassign)
        @param procnum: requested process number (server may reassign)
        @param passkey: authentication key presented to the server
        @return: False when there is no work or the request failed, else True
        """
        self.localhost = functions.gethostid()
        if url and passkey:
            self.production = True
            print "connecting to %s ..." % url
            try:
                self.server = xmlrpclib.ServerProxy(url)
                # Server returns at least (dataset, nproc, procnum).
                status = self.server.start(self.localhost,dataset,procnum,passkey,self.grid)
                dataset = status[0]
                nproc = status[1]
                procnum = status[2]

                if len(status) > 3:
                    # Optional fourth element: a refreshed passkey.
                    self.passkey = status[3]

                print "dataset: %d, nproc: %d, procnum:%d"%(dataset,nproc,procnum)
                if dataset < 1:
                    # dataset < 1 is the server's "no work available" signal.
                    print "Nothing to do. Exiting"
                    return False
                else:
                    print "processing proc %d from dataset %d" % (procnum,dataset)
                    self.dataset = dataset
                    self.nproc = nproc
                    self.procnum = procnum
                    # NOTE(review): this overwrites any refreshed key taken
                    # from status[3] above with the original argument --
                    # confirm that is intended.
                    self.passkey = passkey

                    # Mirror the negotiated values into opts so later option
                    # lookups see them.
                    self.opts["nproc"] = self.nproc
                    self.opts["procnum"] = self.procnum
                    self.opts["dataset"] = self.dataset
                    self.opts["key"] = self.passkey

                    set_sig_handler(self,procnum,dataset,passkey)
                    return True

            except Exception, e:
                sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
                print >> sys.stderr, e , "could not fetch job from server"
                return False
        else:
            # Non-production run: no server handshake needed.
            return True
991
992
994 print '\n\n------ begin proc %d iteration %d in tray %d -------\n' % (self.procnum,iter,tray)
995 cmd = ""
996 cmd += "%s i3exec.py --nproc=%d " % (self.python,self.nproc)
997 cmd += " --procnum=%d --dataset=%d " % (self.procnum,self.dataset)
998 cmd += " --lib=" + " --lib=".join(self.opts['lib'])
999 cmd += " --iter=%d --tray=%d " % (iter,tray)
1000 cmd += " --validate=%d " % self.validate
1001 cmd += " --cache=%s " % self.cache
1002 cmd += " --runcfg"
1003 cmd += " --fetch=%s " % self.opts['fetch']
1004 if self.opts.has_key('gpu'):
1005 cmd += " --gpu=%d " % self.opts['gpu']
1006 if self.opts.has_key('run'):
1007 cmd += " --run=%d " % self.opts['run']
1008 if self.opts.has_key('subrun'):
1009 cmd += " --subrun=%d " % self.opts['subrun']
1010 if self.opts.has_key('date'):
1011 cmd += " --date=%s " % self.opts['date']
1012 if self.url:
1013 cmd += " --url=%s " % self.url
1014 cmd += " --key=%s " % self.passkey
1015 if self.opts.has_key("zipsafe"):
1016 cmd += " --zipsafe=%s " % self.opts['zipsafe']
1017 cmd += " %s" % self.configfile
1018 cmd += " 2>>%s" % self.err
1019 cmd += " 1>>%s" % self.out
1020 cmd = expandvars(cmd)
1021 print cmd
1022 try:
1023 return os.system(cmd)
1024 except Exception,e:
1025 print "Caught exception: ",e
1026 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
1027 return 1
1028
1029
1030
1031
1034 """RunInstance constructor for multipart jobs."""
1035 RunInstance.__init__(self,opts)
1036 self.task = None
1037 self.taskname = ''
1038 self.dagtemp = ''
1039 self.tray = False
1040 self.iter = False
1041 self.protocol = ''
1042 self.host = ''
1043 self.basedir = ''
1044 self.abort_on_fail = True
1045
1046 if self.opts.has_key("tray"):
1047 self.tray = int(self.opts["tray"])
1048 if self.opts.has_key("abort-on-fail"):
1049 self.abort_on_fail = bool(self.opts["abort-on-fail"])
1050 if self.opts.has_key("iter"):
1051 self.iter = int(self.opts["iter"])
1052 if self.opts.has_key("task"):
1053 self.taskname = self.opts["task"]
1054 if self.opts.has_key("dagtemp"):
1055 self.dagtemp = expandvars(self.opts["dagtemp"])
1056 pos = self.dagtemp.find("://")
1057 self.protocol = self.dagtemp[0:pos]
1058 host_end = self.dagtemp.find("/",pos+3)
1059 self.host = self.dagtemp[pos+3:host_end]
1060 self.basedir = self.dagtemp[host_end:]
1061
1062 if self.protocol != 'gsiftp':
1063
1064 self.basedir = self.host + self.basedir
1065 self.host = ''
1066
1067
1068
1069 - def abort(self,error,message=""):
1070 """
1071 Handle job errors and report back to server
1072 """
1073 self.flush_output()
1074 if message:
1075 self.statusmsg.append(message.decode("utf8","ignore"))
1076 if self.production and self.abort_on_fail:
1077 safe_abort(self.server.abort,
1078 (self.procnum,
1079 self.dataset,error,'<br>'.join(self.statusmsg),
1080 self.passkey,cPickle.dumps(self.stats)))
1081
1082
1083
1085 """Runs a task in a multipart simulation job."""
1086
1087 self.task = self.steering.GetTaskDefinition(self.taskname)
1088 if not self.task:
1089 print "task '%s' not found for multipart job" % self.taskname
1090 return TASK_NOT_FOUND
1091
1092 if not self.dagtemp:
1093 print "no storage location specified for temporary files"
1094 return TASK_CONFIGURATION_ERROR
1095
1096 print "multipart job enabled, executing task %s" % self.taskname
1097
1098 print "using %s://%s%s for temporary file storage" \
1099 % (self.protocol, self.host, self.basedir)
1100
1101 if self.task.IsCleanup():
1102 tray = self.task.GetTray(0)
1103 idx = tray.GetIndex()
1104 task_id = self.server_start_task(idx, 0)
1105 if task_id != TASK_ERROR_ID:
1106 self.server_finish_task(task_id, self.stats)
1107 try:
1108 self.task_run_cleanup()
1109 except:
1110 self.handle_exception('cleaning dagtemp failed.')
1111
1112 self.server_finish_job()
1113 return 0
1114
1115 retval = 0
1116
1117 msg = "Configuring log4cplus....."
1118 print msg
1119 try:
1120 self.setlog4cplus()
1121 msg += 'ok'
1122 except:
1123 msg += 'failed'
1124 self.handle_exception(msg)
1125 self.statusmsg.append(msg.decode("utf8","ignore"))
1126
1127 trays = self.task.GetTrays().values()
1128 if not self.tray is False:
1129 trays = [self.task.GetTray(self.tray)]
1130
1131 input_needed = True
1132 taskids = []
1133 for tray in trays:
1134 idx = tray.GetIndex()
1135 if not self.iter is False:
1136 iters = [int(self.iter)]
1137 else:
1138 iters = tray.GetIters()
1139 for iter in iters:
1140 task_id = self.server_start_task(idx, iter)
1141 if task_id == TASK_ERROR_ID:
1142 print "Bad task ID received from server"
1143 return TASK_INVALID
1144 taskids.append(task_id)
1145
1146 if input_needed:
1147 self.server_copying_input(task_id)
1148 msg = "Fetching input from previous tasks....."
1149 retval = self.fetch_input()
1150 if not self.check_retval(retval, msg):
1151 self.server_abort(task_id)
1152 return retval
1153 input_needed = False
1154
1155 retval, stats = self.execute(task_id, idx, iter)
1156 if retval != 0:
1157 self.server_abort(task_id)
1158 return retval
1159
1160 if not self.task.IsCleanup():
1161 self.server_copying_output(task_id)
1162 msg = "Storing intermediate output....."
1163 retval = self.store_output()
1164 if not self.check_retval(retval, msg):
1165 self.server_abort(task_id)
1166 return retval
1167 else:
1168 for task_id in taskids:
1169 fin = self.server_finish_task(task_id, self.stats)
1170 if not fin:
1171
1172 self.server_abort(taskids[0])
1173 self.server_abort(task_id)
1174 return TASK_SERVER_ERROR
1175
1176 return 0
1177
    def execute(self, task_id, tray, iter):
        """
        Executes a given tray and iteration within this task.

        Builds and runs an i3exec.py child process for the (tray, iter)
        pair after staging the metaproject and environment.

        @param task_id: server-side id for this task (status updates)
        @param tray: tray index to execute
        @param iter: iteration index, or TASK_EXTERN_ITER for externs
        @return: (retval, stats) tuple for the normal path; the cleanup and
                 extern branches return whatever task_run_cleanup() /
                 task_run_externs() return -- confirm callers handle both
        """
        self.server_processing(task_id)

        if self.task.IsCleanup():
            return self.task_run_cleanup()

        if iter == TASK_EXTERN_ITER:
            return self.task_run_externs()

        if not self.stats:
            self.stats = {}

        # Stage the metaproject tarball and point the environment at it.
        build = self.fetch_metaproject(tray, tmpstorage=self.cache)
        print "metaproject fetched"

        self.setpython(tray)
        self.setenv(build)
        print "set python to %s" % self.python

        print "environment setup complete"
        msg = "%s - IceTray %d iteration %d....." % (time.strftime("%H:%M:%S"),tray,iter)
        print msg

        print '\n\n------ begin proc %d iteration %d in tray %d -------\n' \
            % (self.procnum,iter,tray)
        # Assemble the i3exec.py command line; --dag marks this as a
        # multipart (DAG) job. stderr/stdout are appended to self.err/self.out.
        cmd = ""
        cmd += "%s i3exec.py --nproc=%d " % (self.python,self.nproc)
        cmd += " --procnum=%d --dataset=%d " % (self.procnum,self.dataset)
        cmd += " --lib=" + " --lib=".join(self.opts['lib'])
        cmd += " --iter=%d --tray=%d " % (iter,tray)
        cmd += " --cache=%s " % self.cache
        cmd += " --runcfg"
        cmd += " --dag"
        cmd += " --fetch=%s " % self.opts['fetch']
        if self.opts.has_key('run'):
            cmd += " --run=%d " % self.opts['run']
        if self.opts.has_key('date'):
            cmd += " --date=%s " % self.opts['date']
        if self.url:
            cmd += " --url=%s " % self.url
            cmd += " --key=%s " % self.passkey
        if self.opts.has_key("zipsafe"):
            cmd += " --zipsafe=%s " % self.opts['zipsafe']
        if self.dagtemp:
            cmd += " --dagtemp=%s" % self.dagtemp
        cmd += " %s" % self.configfile
        cmd += " 2>>%s" % self.err
        cmd += " 1>>%s" % self.out
        cmd = expandvars(cmd)
        print cmd
        try:
            retval = os.system(cmd)
        except Exception:
            self.handle_exception("error when executing iteration %d in tray %d" \
                % (iter, tray))
            retval = TASK_RUN_EXCEPTION

        if not self.check_retval(retval, msg):
            return retval, self.stats

        # Fold the child's summary file(s) into our statistics.
        print "statsfile",self.statsfile
        if self.statsfile.endswith('.xml'):
            try:
                self.getstats(self.statsfile)
            except:
                self.handle_exception("unable to process summary file %s" \
                    % self.statsfile)
            try:
                self.getstats("iceprod."+self.statsfile)
            except:
                self.logger.error("unable to process summary file iceprod.%s" % self.statsfile)

        return retval, self.stats
1253
1255 """Set task statistics. This version is non-cumulative."""
1256 self.stats[name] = value
1257
    def startsoap(self,url,dataset,procnum,passkey):
        """
        Connects to the monitoring daemon and gets the task ID.

        Multipart-job variant: uses multipart_job_start instead of start.

        @param url: XML-RPC server URL; falsy value disables production mode
        @param dataset: requested dataset id (server may reassign)
        @param procnum: requested process number (server may reassign)
        @param passkey: authentication key presented to the server
        @return: False when there is no work or the request failed, else True
        """
        self.localhost = functions.gethostid()
        if url and passkey:
            self.production = True
            print "connecting to %s ..." % url
            try:
                self.server = xmlrpclib.ServerProxy(url)

                dataset,nproc,procnum = self.server.multipart_job_start(dataset,procnum,passkey)
                print "dataset: %d, nproc: %d, procnum:%d"%(dataset,nproc,procnum)
                if dataset < 1:
                    # dataset < 1 is the server's "no work available" signal.
                    print "Nothing to do. Exiting"
                    return False
                else:
                    print "processing proc %d from dataset %d" % (procnum,dataset)
                    self.dataset = dataset
                    self.nproc = nproc
                    self.procnum = procnum
                    self.passkey = passkey

                    # Mirror the negotiated values into opts so later option
                    # lookups see them. NOTE(review): unlike the base class,
                    # opts["key"] is not updated here -- confirm intended.
                    self.opts["nproc"] = self.nproc
                    self.opts["procnum"] = self.procnum
                    self.opts["dataset"] = self.dataset

                    set_sig_handler(self,procnum,dataset,passkey)
                    return True
            except:
                self.handle_exception("could not fetch job from server")
                return False
        else:
            # Non-production run: no server handshake needed.
            return True
1291
1293 """Does nothing; dependencies will be fetched during fetch_input()."""
1294 return self.excludefiles
1295
1296
1297
1298
1299
        # Zero exit status means the child command succeeded.
        if retval == 0:
            if msg:
                msg += "ok"
                # statusmsg entries are reported back to the server; drop
                # any bytes that are not valid utf8.
                self.statusmsg.append(msg.decode("utf8","ignore"))
                print msg
            print "return value", retval
            return True
        else:
            # On failure optionally archive the work directory for debugging.
            if self.debug:
                self.tarwork()
            if msg:
                msg += "failed"
                self.statusmsg.append(msg.decode("utf8","ignore"))
                print msg
                print >> sys.stderr, msg
            # Attach the tail of the log output so the server records why
            # the command failed.
            self.statusmsg.append(functions.tail(self.logoutput, 200).decode("utf8","ignore"))

            print "return value", retval
            return False
1320
1331
1339
1348
1357
1365
1373
1381
        # Timestamped status line used both on stdout and as the server
        # status message.
        msg = "%s - IceTray externs....." % time.strftime("%H:%M:%S")
        print msg
        retval, stats = self.runextern()
        # check_retval records success/failure in statusmsg; its boolean
        # result is intentionally ignored here -- the caller gets retval.
        self.check_retval(retval, msg)
        return retval, stats
1388
1390 """
1391 Remove the intermediate output from all parts of this multipart job.
1392
1393 This executed by the trashcan job that's added to the end of all the
1394 DAGs; it deletes the incidental files created by each node of the DAG
1395 and marks the job itself as complete.
1396 """
1397 targetdir = self.get_intermediate_directory()
1398 if self.protocol == 'gsiftp':
1399 print "cleanup will use GridFTP (%s, %s)" % (self.host, targetdir)
1400 retval = self.uberftp(self.host,"rm -r %s" % targetdir)
1401 else:
1402 print "cleanup will use local filesystem (%s)" % (targetdir)
1403 cmd = "rm -rf %s" % (targetdir)
1404 print cmd
1405 retval = os.system(cmd)
1406
1407 msg = "deleting intermediate output....."
1408 if not self.check_retval(retval, msg):
1409 raise Exception, "Unable to delete intermediate output"
1410 stats = {}
1411
1412 return retval, stats
1413
        # Dump the current traceback to stderr, plus an optional context
        # message to both stdout and stderr.
        traceback.print_exc(file=sys.stderr)
        if msg:
            print msg
            print >> sys.stderr, msg
1419
1421 """Store intermediate output from this task."""
1422 return self.handle_file_manifest("output")
1423
1427
1429 """Processes a file manifest and copies the files contained therein."""
1430 name = self.task.GetName()
1431 if self.task.ParallelExecutionEnabled():
1432 name = name + "_" + str(self.iter)
1433 manifest = "iceprod.%u.%u.%s.%s" % (int(self.dataset), int(self.procnum), \
1434 name, type)
1435 print "Reading %s manifest from %s" % (type, manifest)
1436 if os.path.exists(manifest):
1437 print "%s manifest found" % type
1438 f = open(manifest, "r")
1439 size = 0
1440 nw_size = 0
1441 xfer_time = 0
1442 nw_xfer_time = 0
1443 exclude = []
1444 for line in f:
1445 print "handling manifest %s" % line
1446 pieces = line.split()
1447 photonics = False
1448 extract = True
1449 shared = False
1450 verify = True
1451 if len(pieces) == 2:
1452 source, dest = pieces
1453 else:
1454 source = pieces[0]
1455 dest = pieces[1]
1456 if pieces[2] == "photonics":
1457 photonics = True
1458 elif pieces[2] == "dontextract":
1459 extract = False
1460 elif pieces[2] == "global":
1461 shared = True
1462 print "pieces[2]=%s"%(pieces[2])
1463 print "src=%s | dest=%s"%(source,dest)
1464
1465 locked = False
1466 if type == "input":
1467
1468 if photonics:
1469 photon_dir = os.getenv("PHOTON_TABLES_DIR")
1470 if exists(photon_dir):
1471 print "skipping %s because PHOTON_TABLES_DIR (%s) exists" % (source, photon_dir)
1472 continue
1473 myputenv("PHOTON_TABLES_DIR", self.workdir)
1474
1475
1476 retval = self.fetch_checksum(source, dest)
1477 if retval != 0:
1478 print "unable to retrieve checksum, return value: %u" % int(retval)
1479 print "integrity check disabled for %s" % dest
1480 verify = retval == 0
1481 dest_checksum = dest + TASK_CHECKSUM_SUFFIX
1482
1483
1484 if shared or photonics:
1485 if not exists(self.cache):
1486 try: os.makedirs(self.cache)
1487 except: pass
1488 cache_dest = os.path.join(self.cache, dest)
1489 cache_checksum = dest + ".cache" + TASK_CHECKSUM_SUFFIX
1490 lockfile = cache_dest + ".lock"
1491 while not locked:
1492 try:
1493 os.mknod(lockfile)
1494 locked = True
1495 except OSError, oserr:
1496 print "%s %s ." % (oserr,lockfile)
1497 time.sleep(300)
1498 try:
1499 if (time.time() - os.stat(lockfile).st_mtime) > 2400:
1500 os.remove(lockfile)
1501 except: pass
1502 if verify:
1503 retval = self.generate_checksum(cache_dest, cache_checksum)
1504 if retval == 0:
1505 if checksum(dest_checksum, cache_checksum):
1506
1507 source = cache_dest
1508 else:
1509
1510 os.remove(cache_dest)
1511
1512 local_file = dest
1513 if not extract:
1514 exclude.append(expandvars("${TMPWORK}/" + dest))
1515 else:
1516
1517 retval = self.store_checksum(source,dest)
1518 if retval != 0:
1519 print "unable to store checksum, return value: %u" % int(retval)
1520 print "future tasks will be unable to check integrity for %s" % source
1521 local_file = source
1522
1523 start = time.time()
1524 retval = self.copy_file(source, dest)
1525 if retval != 0:
1526 if locked:
1527 os.remove(lockfile)
1528 return retval
1529 duration = time.time() - start
1530 file_size = os.path.getsize(expandvars("${TMPWORK}/" + local_file)) / (1024 * 1024)
1531 size += file_size
1532 xfer_time += duration
1533 if not (source.startswith("file:") or source.startswith("/")):
1534 nw_size += file_size
1535 nw_xfer_time += duration
1536
1537 if locked:
1538 os.remove(lockfile)
1539
1540 if type == "input" and verify:
1541 retval = self.verify_checksum(local_file)
1542 if retval != 0:
1543 print "integrity verification failed for %s" % local_file
1544 return retval
1545
1546 if size > 0:
1547 if xfer_time > 0:
1548 rate = size / xfer_time
1549 else:
1550 rate = size
1551 else:
1552 rate = 0
1553
1554 if nw_size > 0:
1555 if nw_xfer_time > 0:
1556 nw_rate = nw_size / nw_xfer_time
1557 else:
1558 nw_rate = nw_size
1559 else:
1560 nw_rate = 0
1561
1562 self.setstats("%s size (MB)" % type, size)
1563 self.setstats("network %s size (MB)" % type, nw_size)
1564 self.setstats("%s rate (MB/s)" % type, rate)
1565 self.setstats("network %s rate (MB/s)" % type, nw_rate)
1566 f.close()
1567
1568 if type == "input":
1569 unpack(exclude)
1570 else:
1571 print "No %s manifest found" % type
1572 return 0
1573
1576
1586
        # Write the md5 digest of *file* into *checksum* via a shell
        # redirect; returns the md5sum exit status.
        cmd = "md5sum %s > %s" % (file, checksum)
        print cmd
        return os.system(cmd)
1591
        """Verify *file* against its stored md5sum, if one exists.

        @return: 0 on success or when no checksum file is available,
                 otherwise the non-zero md5sum exit status
        """
        # NOTE(review): this local name shadows the module-level
        # checksum() helper -- confirm before renaming.
        checksum = file + TASK_CHECKSUM_SUFFIX
        if not os.path.exists(checksum):
            print "integrity verification skipped, no checksum available for %s" % file
            return 0
        cmd = "md5sum -c %s" % checksum
        print cmd
        return os.system(cmd)
1600
        """Copy a single file between file/gsiftp/http endpoints.

        Scheme-less names are treated as paths relative to ${TMPWORK}.

        @param source: source URL or path
        @param dest: destination URL or path
        @return: 0 on success, a TASK_XFER_* code or shell exit status
                 on failure
        """
        info = {}
        # Parse both endpoints into {file, protocol, host, path} dicts.
        for type, file in {"in": source, "out": dest}.items():
            if not info.has_key(type):
                info[type] = {}
            pos = file.find(":")
            if pos == -1:
                # No scheme given: treat as a file in the work directory.
                file = os.path.join("file:${TMPWORK}", file)
                pos = 4
            info[type]['file'] = file
            protocol = file[:pos]
            info[type]["protocol"] = protocol
            if protocol == "file":
                info[type]["path"] = file[5:]
            elif protocol == "gsiftp" or protocol == "http":
                # Split "proto://host/path" into host and path parts.
                file = file[pos + 3:]
                pos = file.find("/")

                info[type]["host"] = file[:pos]
                info[type]["path"] = file[pos:]
            else:
                print "Unknown transfer protocol: %s" % protocol
                return TASK_XFER_PROTO_ERROR

        # http/ftp endpoints can only be read from, not written to.
        if info["out"]["protocol"] == "http" or info["out"]["protocol"] == "ftp":
            print "Unsupported destination protocol: %s" % info["out"]["protocol"]
            return TASK_XFER_CONFIG_ERROR

        files = []
        for type, file in info.iteritems():
            url = "%s://" % file["protocol"]
            if file.has_key("host"):
                url += file["host"] + "/"
            url += file["path"]
            # NOTE(review): the url assembled above is discarded in favor
            # of the original string -- confirm whether that's intended.
            url = file['file']
            files.append(url)

        # NOTE(review): files[0]/files[1] rely on the iteration order of
        # the two-key dict above yielding "in" before "out" -- confirm.
        print files
        if info["in"]["protocol"] == "gsiftp" or info["out"]["protocol"] == "gsiftp":
            # GridFTP transfer via globus-url-copy, authenticated with
            # the job's delegated proxy (must be chmod 0600 for GSI).
            proxy = self.steering.GetSysOpt("globus_proxy").GetValue()
            os.chmod(proxy,0600)
            globus_loc = self.steering.GetSysOpt("globus_location").GetValue()
            path = os.path.join(globus_loc, "bin")
            bin = os.path.join(path, 'globus-url-copy')
            certs = os.path.join(globus_loc, "certificates")
            lib = os.path.join(globus_loc, "lib")
            cmd = "LD_LIBRARY_PATH=%s:$LD_LIBRARY_PATH DYLD_LIBRARY_PATH=%s:$DYLD_LIBRARY_PATH"
            cmd += " X509_CERT_DIR=%s X509_USER_PROXY=%s %s -cd -r -nodcau %s %s"
            cmd = cmd % (lib, lib, certs, proxy, bin, files[0], files[1])
        elif info["in"]["protocol"] == "http" or info["in"]["protocol"] == "ftp":
            cmd = "wget -nv -O %s %s" % (info["out"]["path"], files[0])
        elif info["in"]["protocol"] == "file" and info["out"]["protocol"] == "file":

            # Local copy; warn (but still attempt) if the source is gone.
            if not os.path.exists(info["in"]["path"]):
                print "file %s does not exits!!!!!" % info["in"]["path"]

            # Make sure the destination directory exists before copying.
            cmd = "mkdir -p %s" % os.path.dirname(info["out"]["path"])
            print cmd
            retval = os.system(cmd)
            if retval != 0:
                return retval

            cmd = "cp %s %s" % (info["in"]["path"], info["out"]["path"])
        else:
            print "Unknown transfer protocol"
            return TASK_XFER_PROTO_ERROR
        cmd = expandvars(cmd)

        print cmd
        return os.system(cmd)
1672
1673
1675 """Perform an FTP command on a remote host using uberftp."""
1676 proxy = self.steering.GetSysOpt("globus_proxy").GetValue()
1677 os.chmod(proxy,0600)
1678
1679 globus_loc = self.steering.GetSysOpt("globus_location").GetValue()
1680 path = os.path.join(globus_loc, "bin")
1681 bin = os.path.join(path, 'uberftp')
1682
1683 certs = os.path.join(globus_loc, "certificates")
1684
1685 cmd = 'X509_CERT_DIR=%s X509_USER_PROXY=%s %s %s "%s"' \
1686 % (certs, proxy, bin, host, cmd)
1687 print cmd
1688 return os.system(cmd)
1689
1695
1696
1697
1698
1699
1701 """
1702 install signal handler to handle errors and evictions
1703
1704 @param run: RunInstance
1705 @param procnum: process number within dataset
1706 @param dataset: dataset id
1707 @param passkey: random passkey assigned to process by daemon
1708 """
1709 def handler(signum,frame):
1710 print >> sys.stderr, "aborting"
1711 print >> sys.stderr, str(frame)
1712 run.flush_output()
1713 run.server.abort(procnum,dataset,EVICTED,'Job has been evicted.',passkey)
1714 _done = True
1715 os._exit(5)
1716 signal.signal(signal.SIGTERM, handler)
1717
    # Create the workspace directory, make it the cwd, and export its
    # location for child processes.
    os.makedirs(work)
    os.chdir(work)
    myputenv('TMPWORK',work)
    myputenv('TMPTOP',top)
    cwd = os.getcwd()  # NOTE(review): unused -- confirm before removing
    # Symlink the job's top-level files into the workspace so relative
    # paths keep working; log files (.out/.err) are excluded.
    for file in os.listdir(top):
        if file.endswith(".out") or file.endswith(".err"):
            continue
        if not os.path.exists(file):
            os.symlink(join(top,file),file)
    return work
1730
1732 os.chdir(top)
1733 if _clean:
1734 print "Cleaning workspace %s" % work
1735 os.system('rm -rf %s' % work)
1736
def safe_abort(abortfunc,args,msg='',tries=4,wait=2):
    """Call abortfunc(*args), retrying on transient socket errors.

    Used to report a job abort to the monitoring server even when the
    network is flaky.

    @param abortfunc: callable to invoke (e.g. server.abort)
    @param args: tuple of positional arguments for abortfunc
    @param msg: unused; kept for backward compatibility
    @param tries: maximum number of attempts
    @param wait: linear backoff factor; attempt i sleeps wait*i seconds
                 after failing (so the first retry is immediate)
    @return: whatever abortfunc returns, or None if every attempt failed
    """
    failures = 0
    while failures < tries:
        print("resetting job status")
        try:
            # apply() is deprecated/removed; call the function directly.
            return abortfunc(*args)
        except socket.error as e:
            sys.stderr.write("%s retrying\n" % e)
            time.sleep(wait*failures)
            failures += 1
    # All attempts failed: give up silently, as callers expect.
    return None
1747
1748
def wget(url,dest='./'):
    """Fetch *url* into *dest* using a protocol-appropriate command.

    Supported schemes: http/ftp (wget), lfn (lcg-cp), file (cp) and
    gsiftp (globus-url-copy).  If the file already exists at the
    destination, or was shipped with the job in $I3_TOPDIR, no transfer
    is performed.

    @param url: source URL (may contain environment variables)
    @param dest: destination directory or file path (optionally
                 'file:'-prefixed)
    @return: 0 on success; 2 for a missing local source file; otherwise
             the shell exit status of the transfer command
    @raise Exception: if the URL scheme is not supported
    """
    dest = os.path.abspath(expandvars(dest))
    url = expandvars(url)
    # If dest is a directory, download into it under the URL's basename.
    if os.path.isdir(dest.replace('file:','')):
        dest = os.path.join(dest, os.path.basename(url))
    if dest.startswith("file:"):
        destpath = dest[5:]
    else:
        destpath = dest
        dest = "file:" + dest
    if os.path.exists(destpath):
        print("wget: file already downloaded")
        return 0

    # The file may have been shipped with the job payload: symlink it
    # instead of downloading.  Guard against I3_TOPDIR being unset
    # (previously an unconditional os.environ lookup raised KeyError).
    topdir = os.environ.get('I3_TOPDIR')
    if topdir:
        updest = os.path.join(topdir, os.path.basename(dest))
        if os.path.exists(updest):
            print("file was sent with job")
            try:
                os.symlink(updest, destpath)
            except Exception as e:
                # Fall through and attempt a real transfer instead.
                print('symlink exception: %s' % str(e))
            else:
                return 0

    if url.startswith("http://") or url.startswith("ftp://"):
        dest = dest.replace("file:","")
        cmd = 'wget -nv --tries=4 --output-document=%s %s' % (dest,url)
    elif url.startswith("lfn:"):
        cmd = "lcg-cp --vo icecube -v %s %s" % (url,dest)
    elif url.startswith("file:"):
        src = url.replace("file:","")
        dest = dest.replace("file:","")
        if not os.path.isfile(src):
            print("wget: not a file %s" % src)
            return 2
        cmd = 'cp %s %s' % (src,dest)
    elif url.startswith("gsiftp://"):
        # GridFTP download via globus-url-copy.  (Leftover debug calls
        # -- an os.system("/bin/ls") and a globus print -- removed.)
        globus = expandvars("$GLOBUS_LOCATION")
        lib = os.path.join(globus,'lib')
        path = os.path.join(globus,'bin')
        bin = os.path.join(path,'globus-url-copy')
        if not dest.startswith("file:"):
            dest = "file:%s" % os.path.abspath(dest)
        cmd = "LD_LIBRARY_PATH=%s:$LD_LIBRARY_PATH DYLD_LIBRARY_PATH=%s:$DYLD_LIBRARY_PATH"
        cmd += " %s -cd -r -nodcau %s %s"
        cmd = cmd % (lib, lib, bin, url, dest)
    else:
        raise Exception("unsupported protocol %s" % url)
    print(cmd)
    return os.system(cmd)
1800
1801
    """Return True iff both md5sum files exist and have identical contents."""
    if not exists(sum1): return False
    if not exists(sum2): return False
    # diff exits non-zero when the files differ (or on error).
    if os.system("diff %s %s" % (sum1,sum2)):
        print "sums differ"
        return False
    else:
        print "sums match"
        return True
1811
1813 return url.startswith("http://") \
1814 or url.startswith("ftp://") \
1815 or url.startswith("file:")
1816
1817
1825
1826
1849
1850
1852 if value == None: print name,value
1853 else: os.environ[name]=value
1854
1855
1857 print "unpacking %s..." % os.getcwd()
1858
1859
1860
1861
1862
1863 for archive in [filename for filename in os.listdir(".") if filename.endswith(".bz2") and not filename in exclude]:
1864 print "bunzip2 "+archive
1865 os.system("bunzip2 -k "+archive)
1866
1867 for archive in [filename for filename in os.listdir(".") if filename.endswith(".tar") and not filename in exclude]:
1868 print "tar xf "+archive
1869 os.system("tar xf "+archive)
1870
1871 for archive in [filename for filename in os.listdir(".") if filename.endswith(".tgz") and not filename in exclude]:
1872 print "tar xzf "+archive
1873 os.system("tar xzf "+archive)
1874
1875 for archive in [filename for filename in os.listdir(".") if filename.endswith(".tar.gz") and not filename in exclude]:
1876 print "tar xzf "+archive
1877 os.system("tar xzf "+archive)
1878