1
2
3
4 """
5 A class for queueing IceTrayConfig jobs on various queueing systems.
6
7 copyright (c) 2005 the icecube collaboration
8
9 @version: $Revision: $
10 @date: $Date: $
11 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
12 @todo: Add support for other queueing systems.
13 @todo: Move system dependent stuff into corresponding modules
14 (e.g. condor.py, pbs.py)
15 """
16
17 import os, sys, re, time,string
18 import platform, getpass
19 import logging
20 import cPickle
21 import resource
22 import grid
23 import plugins
24 from job import i3Job
25 from threading import Thread
26 from os.path import expandvars,basename
27
28 import iceprod
29 from iceprod.core.metadata import *
30 from iceprod.core.functions import *
31 from iceprod.core.dataclasses import *
32 from iceprod.core.xmlparser import IceTrayXMLParser
33 from iceprod.core.xmlwriter import IceTrayXMLWriter
34 from iceprod.core.lex import ExpParser
35 from dbqueue import *
36
37 logger = logging.getLogger('i3ProdQueue')
38
class UnsupportedSystemException(Exception):
    """
    Raised when no plugin exists for the configured batch system.

    NOTE(review): the class/def header lines were lost in extraction; the
    class name is grounded by 'raise UnsupportedSystemException' in Submit,
    and the bare 'return "..."' strongly suggests a __str__ override.
    """
    def __str__(self):
        # Fixed message regardless of constructor arguments.
        return "Batch System is not currently supported!!!"
42
class IncompleteConfigException(Exception):
    """
    Raised when a steering configuration is missing required pieces
    (e.g. metadata -- see the 'Missing metadata' raise in this module).

    NOTE(review): the class/def header lines were lost in extraction; the
    class name is grounded by 'raise IncompleteConfigException' elsewhere
    in this file and the bare 'return' suggests a __str__ override.
    """
    def __str__(self):
        # Fixed message regardless of constructor arguments.
        return "Incomplete Steering Configuration"
46
def isurl(path):
    """
    Return True if *path* starts with a recognized URL scheme.

    Recognized schemes: http, ftp, gsiftp, file.
    (Header line reconstructed -- the name is grounded by the
    isurl(depurl) call in GetIPModules.)

    @param path: string to test
    @return: True if path looks like a URL, False otherwise
    """
    # startswith accepts a tuple of prefixes -- one call instead of a
    # chained 'or' over four startswith calls.
    return path.startswith(('http://', 'ftp://', 'gsiftp://', 'file://'))
52
54 """
55 This class interfaces IceTray with several queuing systems and is used
56 for simplifying the submission and monitoring of jobs.
57 Additionally, the information for jobs submitted can be entered into a
58 database through the class db.ConfigDB
59 """
60
def __init__(self, config):
    """
    Initialize queue-manager state from an iceprod configuration.

    (Header line reconstructed from 'self.config = config' and the class
    docstring; extraction stripped the original 'def'.)

    @param config: ConfigParser-like object; must provide get/getboolean/
                   has_option for the queue/path/monitoring sections
    """
    self.config = config
    self.queuesize = 50           # max jobs held in the local queue
    self.i3configdb = None        # set externally before Submit/upload
    self.i3monitordb = None       # set externally before Submit/upload
    self.grid_id = 0
    self.jobs_at_once = 20
    self.rootdir = None           # iceprod install root (SetRootDir)
    self.default_archives = []
    self.submitter = None
    self.institution = None
    self.grid = self.config.get('queue', 'name')
    self.maxjobs = int(self.config.get('queue', 'maxjobs'))
    self.batchsys = self.config.get('queue', 'batchsys')
    logger.debug("system: %s" % self.batchsys)
    self.rundir = self.config.get('path', 'submitdir')
    try:
        self.submithost = self.config.get('monitoring', 'server')
    except Exception:
        # No monitoring server configured: fall back to local hostname.
        # (Was a bare 'except:' -- narrowed so KeyboardInterrupt and
        # SystemExit are no longer swallowed.)
        self.submithost = os.uname()[1]

    # SSL is on unless either the monitoring or security section
    # explicitly disables it (both must agree to enable).
    self.ssl = True
    if self.config.has_option('monitoring', 'USESSL'):
        self.ssl = self.config.getboolean('monitoring', 'USESSL')
    if self.config.has_option('security', 'USESSL'):
        self.ssl = self.ssl and self.config.getboolean('security', 'USESSL')
    logger.info("Monitoring server is configured with SSL?: %s" % self.ssl)
88
93
95 self.grid_id = grid_id
96
98 self.jobs_at_once = njobs
99
101 self.queuesize = maxsize
102
104 self.submithost = submithost
105
107 self.submitter = submitter
108
110 self.institution = institution
111
112
def MakeDir(self, dir):
    """
    Create a temporary directory where the current job(s) will be
    submitted from (or will run in depending on the queueing system)

    (Header line reconstructed -- grounded by self.MakeDir(...) calls
    in Submit.)

    @param dir: directory path to create (intermediate dirs included)
    """
    # NOTE(review): 'dir' shadows the builtin, but the parameter name is
    # kept for interface compatibility with existing callers.
    if not os.path.exists(dir):
        os.makedirs(dir)
120
122 """
123 Remove temporary directory where the current job(s) was
124 submitted from.
125 """
126 if os.path.exists(dir):
127 try:
128 os.removedirs(dir)
129 except OSError,e:
130 logger.error(e)
131
133 self.queuesize = maxsize
134
136 self.submithost = submithost
137
139 self.submitter = submitter
140
142 self.institution = institution
143
144
146 """
147 Create a temporary directory where the current job(s) will be
148 submitted from (or will run in depending on the queueing system)
149 """
150 if not os.path.exists(dir):
151 os.makedirs(dir)
152
153
155 """
156 Insert IceTrayConfig configuration to the database
157 @param steering:
158 """
159
160
161 opts = {'dataset':0,'nproc':1,'procnum':0}
162 parser = ExpParser(opts,steering)
163
164
165 if steering.GetExtra('Grid'):
166 grid = steering.GetExtra('Grid')
167 grids = map(string.strip,grid.split("|"))
168 elif steering.GetParameter('BATCHSYS'):
169 grid = steering.GetParameter('BATCHSYS')
170 grids = map(string.strip,grid.GetValue().split("|"))
171 elif steering.GetParameter('Grid'):
172 grid = steering.GetParameter('Grid')
173 grids = map(string.strip,grid.GetValue().split("|"))
174 else:
175 logger.warn('No grid specified. local grid asumed.')
176 grids = [self.grid]
177
178 offlinefilt = steering.GetOF()
179
180
181 if steering.GetExtra('Maxjobs'):
182 maxjobs = int(steering.GetExtra('Maxjobs'))
183 elif steering.GetParameter('MAXJOBS'):
184 maxjobs = steering.GetParameter('MAXJOBS')
185 maxjobs = int(parser.parse(maxjobs.GetValue()))
186 elif steering.GetOF():
187 logger.warn("processing OF filesystem. This could take a while!")
188 maxjobs = offlinefilt.Search()
189 else:
190 logger.warn("Dataset size was not specified!")
191 maxjobs = 0
192 parser.opts['nproc'] = maxjobs
193
194 self.i3configdb.connect()
195 difplus = steering.GetExtra('Metadata')
196 if difplus == None:
197 raise IncompleteConfigException, 'Missing metadata'
198
199 ticket = steering.GetExtra('Ticket')
200 if ticket == None: ticket = 0
201
202 isTemplate = (steering.GetDatasetType() == "TEMPLATE" or maxjobs == 0)
203
204 dataset_id = self.i3configdb.upload_config(steering,ticket,isTemplate,maxjobs)
205 difplus.GetPlus().SetSubCategory(steering.GetCategory())
206 prio = 0
207 try:
208 if steering.GetParameter('PRIORITY'):
209 prio = int(parser.parse(steering.GetParameter('PRIORITY').GetValue()))
210 elif steering.GetParameter('priority'):
211 prio = int(parser.parse(steering.GetParameter('priority').GetValue()))
212 except Exception,e:
213 logger.error('could not set priority: %s' % str(e) )
214 logger.info('dataset %d has priority %s' % (dataset_id,prio))
215
216 if dataset_id:
217 self.i3configdb.upload_metadata(dataset_id,difplus)
218
219
220
221
222 self.i3monitordb.InitializeGridStats(grids,dataset_id)
223
224
225 self.i3configdb.SetStorageURL(dataset_id,steering)
226
227 initial_state = 'WAITING'
228 if steering.GetJobDependencies():
229 initial_state = 'IDLE'
230 if maxjobs > 0:
231 self.i3monitordb.InitializeJobTable(maxjobs,dataset_id,prio,status=initial_state)
232 if offlinefilt:
233 self.i3configdb.load_filelist(offlinefilt.GetFileDict(),dataset_id)
234 else:
235 raise Exception, "Failed to upload dataset"
236
237 cookie = I3Cookie()
238 cookie.dataset_id = dataset_id
239
240 if self.ssl:
241 cookie.url = "https://%s:%s" % (self.config.get('server','server'),self.config.get('server','port'))
242 else:
243 cookie.url = "http://%s:%s" % (self.config.get('server','server'),self.config.get('server','port'))
244
245 status = "dataset has been enqueued with id %d" % dataset_id
246 return status,cookie
247
248
249 - def mkdataset(self,start=0,end=0,dataset=0):
260
261
262 - def Submit(self,steering=None,production=False,first=0,last=0,npid=None):
263 """
264 Submit and monitor jobs
265 """
266 starttime = time.time()
267 cookie = I3Cookie()
268 cookie.dataset_id = 0
269 difplus = DIF_Plus()
270 cluster = plugins.get_plugin(self.batchsys)()
271
272
273 nonproddb = dict()
274 fifo = self.config.getboolean('queue','fifo')
275 cluster_size = self.config.getint('queue','jobclustering')
276
277 if production:
278 self.i3monitordb.connect()
279
280 job_list = cluster.QueueJobs(self.i3monitordb,self.maxjobs,self.grid_id,self.jobs_at_once,fifo)
281
282 job_name = self.config.get('queue','jobprefix')
283 else:
284 job_name = "ipnp"
285 try:
286 import bsddb
287 nonproddb_location = self.config.get('database','non-production')
288 nonproddb = bsddb.btopen(nonproddb_location, 'c')
289 except: pass
290
291 if npid:
292 cookie = cPickle.loads(nonproddb[str(npid)])
293 elif nonproddb.keys():
294 npid = int(nonproddb.last()[0]) + 1
295 else:
296 npid = 1
297 cookie.dataset_id = npid
298
299 pmaxjobs = steering.GetParameter('MAXJOBS')
300 maxjobs = int(pmaxjobs.GetValue())
301 if last == 0: last = maxjobs
302 job_list = self.mkdataset(start=first,end=last,dataset=npid)
303 steering.AddDIFPlus(self.MakeMetadata(difplus,0,simcat="Non-Production"))
304
305
306
307
308 globus_proxy = None
309 globus_libs = None
310 if self.config.has_section('globus') and self.config.getboolean('globus','useproxy'):
311 globus_proxy = expandvars(self.config.get('globus','proxy'))
312 if self.config.has_option('globus','libs'):
313 globus_libs = self.config.get('globus','libs')
314 globus_location = self.config.get('globus','job_globus_location')
315 gpass = self.config.get('globus','passwd',raw=True)
316 runtime = self.config.getint('queue','max_job_processing_time')
317 runtime += self.config.getint('queue','max_job_idle_time')
318 logger.info("checking proxyfile...")
319 if not self.CheckProxy(globus_proxy,runtime):
320 logger.info("proxy %s will likely expire before job completes" % globus_proxy)
321 logger.info("generating new proxyfile.")
322 self.MakeProxy(globus_proxy,runtime*2,gpass)
323
324 status = ""
325 for dataset_id in job_list.keys():
326 logger.info('queueing %d jobs for dataset %d' % (len(job_list[dataset_id]),dataset_id))
327 if production:
328 steering = self.i3configdb.download_config(dataset_id)
329 difplus = self.i3configdb.download_metadata(dataset_id)
330 steering.AddDIFPlus(difplus)
331
332
333
334 cluster = plugins.get_plugin(self.batchsys)()
335
336 if not cluster:
337 logger.error("%s not yet implemented" % self.batchsys)
338 raise UnsupportedSystemException, self.batchsys
339
340 cluster.SetMonitorDB(self.i3monitordb)
341
342
343
344 if self.config.has_section('batchopts'):
345 for o in self.config.options('batchopts'):
346 cluster.AddParam(o,self.config.get('batchopts',o))
347 cluster.SetSteering(steering)
348
349 lib_url = self.config.get('path','lib_url')
350 steering.AddSysOpt(SysOpt("lib_url",lib_url))
351
352 dtdfile = os.path.join(self.GetRootDir(),"shared","iceprod.v2.dtd")
353 xmluri = 'file:iceprod.v2.dtd'
354 try:
355 xmluri = self.config.get('path','uri')
356 except: pass
357
358 cluster.AddArgOption("grid",self.grid_id)
359 cluster.AddArgOption("lib",iceprod.zipfile()+'.zip')
360 target = self.config.get('path','target_url',raw=True)
361 steering.AddSysOpt(SysOpt("targeturl",target))
362
363
364
365 stageout = self.config.getboolean('queue','jobstageout')
366 steering.AddSysOpt(SysOpt("stageout",str(stageout)))
367 cluster.AddArgOption("stageout",int(stageout))
368
369
370 zipsafe = self.config.getboolean('queue','zipsafe')
371 steering.AddSysOpt(SysOpt("zipsafe",str(zipsafe)))
372 if not zipsafe:
373 cluster.AddArgOption("zipsafe",0)
374
375
376 if self.config.has_option('path','dagtemp'):
377 dagtemp = self.config.get('path','dagtemp',raw=True)
378 if dagtemp:
379 steering.AddSysOpt(SysOpt("dagtemp",dagtemp))
380
381
382
383 nocopy = False
384 if self.config.has_option('queue','nocopy') and self.config.getboolean('queue','nocopy'):
385 nocopy = True
386 cluster.AddArgOption("nocopy",int(nocopy))
387 steering.AddSysOpt(SysOpt("nocopy",str(nocopy)))
388
389 if self.config.getboolean('system','validatexml'):
390 cluster.AddArgOption("validate",1)
391 else:
392 cluster.AddArgOption("validate",0)
393
394
395 if self.config.has_option('path','tar_output') and self.config.getboolean('path','tar_output'):
396 cluster.AddArgOption("mktar")
397
398
399 if self.config.has_option('path','cache'):
400
401 steering.AddSysOpt(SysOpt("cache",self.config.get('path','cache')))
402
403
404 cluster.AddArgOption("ping",self.config.getint('queue','ping_interval'))
405
406
407
408
409 if self.config.has_option('environment','platform'):
410 platform = self.config.get('environment','platform')
411 if platform and platform != 'system':
412 cluster.AddArgOption("platform",platform)
413 steering.AddSysOpt(SysOpt("platform",platform))
414
415
416 steering.AddSysOpt(SysOpt("gridname",self.grid))
417 steering.AddSysOpt(SysOpt("gridid",self.grid_id))
418
419
420 pyhome = self.config.get('environment','pythonhome')
421 cluster.AddEnv("PYROOT",pyhome)
422 if self.config.has_option('environment','pythonpath'):
423 pythonpath = self.config.get('environment','pythonpath')
424 cluster.AddEnv("PYTHONPATH",pythonpath)
425
426
427 if self.config.has_option('environment','rootsys'):
428 rootsys = self.config.get('environment','rootsys')
429 cluster.AddEnv("ROOTSYS",rootsys)
430 steering.AddSysOpt(SysOpt("rootsys",rootsys))
431
432
433 if self.config.has_option('environment','photontablesdir'):
434 photontablesdir = self.config.get('environment','photontablesdir')
435 photontables = os.path.join(photontablesdir,'tables')
436 cluster.AddEnv("PHOTONTABLES",photontables)
437 cluster.AddEnv("PHOTON_TABLES_DIR",photontablesdir)
438
439
440
441 for o in self.config.options('system'):
442 steering.AddSysOpt(SysOpt(o,self.config.get('system',o)))
443
444
445 corelibs = os.path.join(self.GetRootDir(),"shared",iceprod.zipfile())
446 if not os.path.exists(corelibs+'.zip'):
447 libdir = os.path.join(self.GetRootDir(),"lib")
448 iceprod.mktar(libdir,'iceprod/__init__.py',corelibs)
449 iceprod.mktar(libdir,'iceprod/core',corelibs,'a')
450 iceprod.mktar(libdir,'iceprod/modules',corelibs,'a')
451
452
453 if self.config.has_option('environment','java'):
454 javahome = self.config.get('environment','java')
455 cluster.AddEnv("JAVA_HOME",javahome)
456 steering.AddSysOpt(SysOpt("javahome",javahome))
457
458
459 if self.config.has_option('environment','scratch'):
460 scratchdir = self.config.get('environment','scratch')
461 else:
462 scratchdir = '$PWD'
463 cluster.AddEnv("I3SCRATCH",scratchdir)
464 steering.AddSysOpt(SysOpt("scratch",scratchdir))
465
466
467
468 steering.AddSysOpt(SysOpt("proxy_delegate", self.config.get('globus','delegate')))
469 if globus_proxy:
470 if self.config.getboolean('globus','delegate'):
471 cluster.AddEnv("X509_USER_PROXY",os.path.basename(globus_proxy))
472 steering.AddSysOpt(SysOpt("globus_proxy",os.path.basename(globus_proxy)))
473 else:
474 steering.AddSysOpt(SysOpt("globus_proxy",'$X509_USER_PROXY'))
475 steering.AddSysOpt(SysOpt("globus_location",globus_location))
476
477
478 if self.config.has_option('system','gridftp_local'):
479 try:
480 for p in steering.GetParameters():
481 v = p.GetValue()
482 if 'gsiftp:' in v:
483
484 pos = v.find('gsiftp')
485 pos2 = v.find('/data/sim')
486 if pos > 0 and pos2 > 0:
487 p.SetValue(v[:pos]+'file:'+v[pos2:])
488 steering.AddParameter(p)
489 elif 'http://x2100' in v:
490
491 pos = v.find('http://x2100')
492 if 'downloads' in v:
493 pos2 = v.find('downloads')
494 if pos2 > 0:
495 p.SetValue(v[:pos]+'file:/data/sim/sim-new/'+v[pos2:])
496 steering.AddParameter(p)
497 elif 'svn' in v:
498 pos2 = v.find('svn')
499 if pos2 > 0:
500 p.SetValue(v[:pos]+'file:/data/sim/sim-new/'+v[pos2:])
501 steering.AddParameter(p)
502
503 for i,t in enumerate(steering.trays):
504 for m in t.iceprodpre:
505 for p in t.iceprodpre[m].GetParameters():
506 if p.GetType() != 'string':
507 continue
508 v = p.GetValue()
509 setval = False
510 if isinstance(v,Value):
511 v = v.GetValue()
512 setval = True
513 logger.info('%r,%r,%r', p, p.GetType(), v)
514 if not isinstance(v,str):
515 logger.warning('param not a string')
516 continue
517 modify = False
518 if 'gsiftp:' in v:
519
520 pos = v.find('gsiftp')
521 pos2 = v.find('/data/sim')
522 if pos >= 0 and pos2 >= 0:
523 v = v[:pos]+'file:'+v[pos2:]
524 modify = True
525 elif 'http://x2100' in v:
526
527 pos = v.find('http://x2100')
528 if 'downloads' in v:
529 pos2 = v.find('downloads')
530 if pos2 >= 0:
531 v = v[:pos]+'file:/data/sim/sim-new/'+v[pos2:]
532 modify = True
533 elif 'svn' in v:
534 pos2 = v.find('svn')
535 if pos2 >= 0:
536 v = v[:pos]+'file:/data/sim/sim-new/'+v[pos2:]
537 modify = True
538 if modify:
539 if setval:
540 v = Value(v)
541 p.SetValue(v)
542 t.iceprodpre[m].AddParameter(p)
543 except Exception,e:
544 logger.error('error processing gridftp_local: %r',e,exc_info=True)
545
546
547 if production:
548 cluster.SetInitialdir(os.path.join(self.rundir, str(dataset_id)))
549 else:
550 cluster.SetInitialdir(os.path.join(self.rundir,'non-production',str(dataset_id)))
551 inidir = os.path.join(cluster.GetInitialdir(),'iceprod_%f' % time.time())
552 self.MakeDir(cluster.GetInitialdir())
553 xcfg = os.path.join(cluster.GetInitialdir(),'config.xml')
554 writer = IceTrayXMLWriter(steering,xmluri)
555 if self.config.has_section('substitutions'):
556 substitutions = {}
557 for o in self.config.options('substitutions'):
558 substitutions[o] =self.config.get('substitutions',o)
559 writer.AddSubstitutions(substitutions)
560 writer.write_to_file(xcfg)
561 steeringfile = xcfg
562
563
564 cachemod = starttime + 240 > time.time()
565
566
567 if self.config.has_option('path','wgetrc'):
568 wgetrc = expandvars(self.config.get('path','wgetrc',raw=True))
569 else:
570 wgetrc = os.path.join(self.GetRootDir(),"etc","wgetrc")
571 if not os.path.exists(wgetrc):
572 wgetrc = os.path.join(self.GetRootDir(),"shared","wgetrc")
573 logger.info('wgetrc '+wgetrc)
574 wgetrcoptions = []
575 try:
576 if os.path.isfile(wgetrc):
577 wgetrcfile = open(wgetrc,'r')
578 for line in wgetrcfile.readlines():
579 wgetrctmp = line.split('=')
580 if len(wgetrctmp) < 2:
581 continue
582 wgetrcfirst = wgetrctmp[0].strip().lower()
583 wgetrcsecond = wgetrctmp[1].strip()
584 if wgetrcfirst in ('http_user','httpuser'):
585 wgetrcoptions.append('http-user='+wgetrcsecond)
586 elif wgetrcfirst in ('http_passwd','httppasswd'):
587 wgetrcoptions.append('http-password='+wgetrcsecond)
588 logger.info('wgetrcoptions: '+str(wgetrcoptions))
589 wgetrcfile.close()
590 except Exception, e:
591 logger.info('wgetrc options error: %s',str(e))
592
593
594 if self.config.getboolean('queue','stage_pymods'):
595 try:
596 pymods = self.GetIPModules(steering,cluster.GetInitialdir(),cachemod,wgetrcoptions)
597 except Exception,e:
598 logger.error("failed to submit jobs: %s" % str(e))
599 if production:
600 for job in job_list[dataset_id]:
601 self.i3monitordb.jobabort(job.GetProcNum(),job.GetDatasetId(),3, "failed to submit jobs "+str(e))
602 continue
603
604 for o in self.config.options('job-env'):
605 cluster.AddEnv(o.upper(),self.config.get('job-env',o))
606
607
608 logger.info("processing jobs for dataset %u" % dataset_id)
609 for job in job_list[dataset_id]:
610
611 self.check_task_dependencies(job,steering)
612
613 job.name = job_name
614 if production:
615 inidir = os.path.join(cluster.GetInitialdir(),'iceprod_%d.%f' % (job.GetProcNum(),time.time()))
616 job.SetOutputURL(target)
617 job.AddInputFile(dtdfile)
618
619
620 if self.config.getboolean('queue','stage_pymods'):
621 for p in pymods:
622 job.AddInputFile(p)
623
624 if not production:
625
626 if steering.GetParameter('LIB_URL'):
627 lib_url = steering.GetParameter('LIB_URL').GetValue()
628
629 gcdbase = cluster.GetInitialdir()
630 else:
631
632 gcdbase = os.path.join(self.rundir, str(job.GetDatasetId()))
633 gcdpath = os.path.join(gcdbase, "GeoCalibDetectorStatus_%06d.i3.gz" % job.GetDatasetId())
634 i3db_params = ""
635 opts = {
636 'dataset':job.GetDatasetId(),
637 'nproc':int(steering.GetParameter('MAXJOBS').GetValue()),
638 'procnum':job.GetProcNum()
639 }
640 parser = ExpParser(opts,steering)
641 job.AddParser(parser)
642 job.AddSteering(steering)
643 for param in steering.GetParameters():
644 if param.GetName().startswith("I3Db::"):
645 i3dbparamname = param.GetName().replace("I3Db::","")
646 i3dbval = parser.parse(param.GetValue())
647 if param.GetName() == "I3Db::outfile":
648 i3dbval = os.path.join(gcdbase,i3dbval)
649 gcdpath = i3dbval
650 i3db_params += " --%s %s" % (i3dbparamname.lower(),i3dbval)
651
652
653
654 if i3db_params and not os.path.exists(gcdpath):
655 if not self.config.has_option('path','i3dbclient'):
656 raise Exception, "i3dbclient is not configured."
657 i3db_gcd_client = self.config.get('path','i3dbclient')
658 basedir = self.config.get('path','basedir')
659 i3db_gcd_client = os.path.join(basedir, 'bin',i3db_gcd_client)
660 logger.info(i3db_gcd_client + " " + i3db_params)
661
662 if not os.path.exists(os.path.dirname(gcdpath)):
663 os.makedirs(os.path.dirname(gcdpath))
664 if os.system(i3db_gcd_client + " " + i3db_params):
665 raise Exception, "Unable to fetch GCD"
666 if i3db_params:
667 job.AddInputFile(gcdpath)
668
669 job.SetSubmitHost(self.submithost)
670 job.SetInitialdir(inidir)
671 self.MakeDir(job.GetInitialdir())
672 job.SetRootDir(self.GetRootDir())
673
674
675 job.SetExecutable(
676 os.path.join(self.GetRootDir(),"bin","i3exec.py"))
677 job.AddInputFile(corelibs+'.zip')
678
679 job.AddSteering(steering)
680 if not production:
681 url = steering.GetParameter('URL')
682 if url: cluster.SetURL(url.GetValue())
683
684 job.SetLogFile( "%s.log" % job.Prefix() )
685 job.SetOutputFile( "%s.out" % job.Prefix() )
686 job.SetErrorFile( "%s.err" % job.Prefix() )
687
688
689
690 if job.GetOutputURL() and job.GetOutputURL().startswith('auto:'):
691 job.AddArgOption("nocopy")
692
693 if self.config.getint('queue','sockettimeout'):
694 job.AddArgOption("timeout", self.config.getint('queue','sockettimeout'))
695
696 job.AddArgOption("procnum",job.GetProcNum())
697 if steering.GetParameter('DEBUG'):
698 job.AddArgOption("debug",steering.GetParameter('DEBUG').GetValue())
699
700
701 job.AddArgOption("fetch",lib_url)
702
703
704 if os.path.exists(wgetrc):
705 job.AddInputFile(wgetrc)
706 job.AddEnv("WGETRC",basename(wgetrc))
707
708 if globus_proxy and self.config.getboolean('globus','delegate'):
709 proxy_copy = basename(expandvars(globus_proxy))
710 proxy_copy = os.path.join(job.GetInitialdir(),proxy_copy)
711 copy(expandvars(globus_proxy),proxy_copy)
712 job.AddInputFile(proxy_copy)
713 if globus_libs and expandvars(globus_libs):
714 job.AddInputFile(expandvars(globus_libs))
715
716 job.AddEnv("SUBMITDIR",job.GetInitialdir())
717
718
719 if production:
720 moniurl = self.GetServerURL()
721 job.AddArgOption("url",moniurl)
722 job.AddArgOption("dataset",job.GetDatasetId())
723 steeringfile = os.path.join(job.GetInitialdir(),os.path.basename(xcfg))
724 os.system('cp %s %s' % (xcfg,steeringfile))
725 logger.info("writting %s in %s"%(os.path.basename(xcfg),job.GetInitialdir()))
726 else:
727 pmaxjobs = steering.GetParameter('MAXJOBS')
728 maxjobs = int(pmaxjobs.GetValue())
729 job.AddArgOption("nproc",maxjobs)
730
731 job.AddArgument(os.path.basename(steeringfile))
732 job.AddInputFile(steeringfile)
733
734
735 for opt in steering.GetBatchOpts():
736 logger.debug("%s = %s" % (opt.GetName(),opt.GetValue()))
737 if opt.GetType().lower() in [self.batchsys.lower(),'*',self.batchsys.split('.')[0].lower()]:
738 job.AddBatchOpt(opt.GetName(),opt.GetValue())
739
740
741 file = os.path.join(job.GetInitialdir(),'%s.%s' % (job.Prefix(),cluster.Suffix()))
742 job.SetConfigFile(file)
743
744
745 cluster.PushJob(job)
746
747
748 cluster.SetProduction(production)
749
750
751 cluster.Submit(cookie)
752
753 if self.ssl:
754 cookie.url = "https://%s:%s" % (self.config.get('server','server'),self.config.get('server','port'))
755 else:
756 cookie.url = "http://%s:%s" % (self.config.get('server','server'),self.config.get('server','port'))
757
758 if production:
759 self.i3monitordb.disconnect()
760 self.i3configdb.disconnect()
761 else:
762 try:
763 nonproddb[str(npid)] = cPickle.dumps(cookie)
764 nonproddb.sync()
765 except Exception,e:
766 logger.error(e)
767
768 status = "dataset %s:%d,%d enqueued" % (npid,first,last)
769 logger.info("Submit done")
770 return status,cookie
771
def check_task_dependencies(self, job, steering):
    """
    Check that dependencies are sane

    For every finished (tray, iteration) task of each task definition,
    verify that all parent task definitions are also finished; if a
    parent is not finished, reset the finished child task to WAITING so
    it will be re-run after its parents.

    (Header line reconstructed -- grounded by the
    self.check_task_dependencies(job, steering) call in Submit.)

    @param job: i3Job whose database id keys the task lookups
    @param steering: steering object providing the task definitions
    """
    task_defs = steering.GetTaskDefinitions()
    job_id = job.GetDatabaseId()
    for taskname, td in task_defs.items():
        td_id = td.GetId()
        trays = td.GetTrays()
        for idx, tray in trays.items():
            for iter in tray.GetIters():
                if self.i3monitordb.task_is_finished(td_id, job_id, idx, iter):
                    for parent in td.GetParents():
                        parent_td = steering.GetTaskDefinition(parent)
                        if not self.i3monitordb.task_is_finished(parent_td.GetId(), job_id):
                            tid = self.i3monitordb.get_task_id(td_id, job_id, idx, iter)
                            if tid:
                                logger.info("Resetting task %s" % taskname)
                                self.i3monitordb.task_update_status(tid, 'WAITING', job.GetArgOpt("key"))
                            # 'continue' kept from the original; as the
                            # last statement of the loop body it is a
                            # no-op at any nesting level.
                            continue
792
def GetIPModules(self, steering, destination, cache=False, wgetrcoptions=None):
    """
    Fetch any files from svn which are needed by i3.IceTray

    Downloads the steering file dependencies (source files only) and the
    IPModuleURL / IPModuleDependencies of any i3.IceTray / i3.Processing
    pre/post modules into *destination*.

    @param steering: steering object (parameters, dependencies, trays)
    @param destination: directory to download module files into
    @param cache: passed through to wget() to allow cached downloads
    @param wgetrcoptions: optional list of wget options (http-user etc.)
    @return: list of local paths of the downloaded files (no duplicates)
    """
    # BUGFIX: the default used to be a shared mutable list ([]).
    if wgetrcoptions is None:
        wgetrcoptions = []
    pymods = []

    opts = {
        'dataset': int(steering.GetParentId()),
        'nproc': int(steering.GetParameter('MAXJOBS').GetValue()),
        'procnum': 0
    }
    parser = ExpParser(opts, steering)

    # Plain file dependencies: only source files are staged with the job.
    for dependency in steering.GetDependencies():
        try:
            if not isinstance(dependency, str):
                dependency = dependency.GetPath()
            url = parser.parse(dependency)
            if (url.rsplit('.', 1)[1]) not in ('py', 'c', 'cxx', 'cpp'):
                # was logging.info (root logger) -- use the module logger
                logger.info('skipping %s', url)
                continue

            logger.info("fetching %s", url)
            if wget(url, destination, cache, wgetrcoptions):
                raise Exception("cannot retrieve file from %s" % url)
            pymod = os.path.join(destination, os.path.basename(url))
            if pymod not in pymods:
                pymods.append(pymod)
        except Exception as e:
            # Best-effort staging, as before -- but log instead of the
            # old silent 'except: pass' so failures are visible.
            logger.warn('failed to stage dependency %s: %s' % (dependency, str(e)))

    # IceProd pre/post modules of class i3.IceTray / i3.Processing need
    # their source (and declared dependencies) staged as well.  Failures
    # here are fatal (logged and re-raised), unlike the loop above.
    for tray in steering.GetTrays():
        for mod in tray.GetIceProdPres() + tray.GetIceProdPosts():
            mclass = mod.GetClass()
            try:
                if mclass in ("i3.IceTray", "i3.Processing"):
                    if not mod.GetParameter('IPModuleURL'):
                        raise Exception("cannot download icetray module without a URL!!!")
                    url = parser.parse(mod.GetParameter('IPModuleURL').GetValue().value)

                    if mod.GetParameter('IPModuleRevision'):
                        # NOTE(review): parsed but never used downstream;
                        # kept so parse errors still surface as before.
                        rev = parser.parse(mod.GetParameter('IPModuleRevision').GetValue().value)

                    logger.info("fetching %s" % url)
                    if wget(url, destination, cache, wgetrcoptions):
                        raise Exception("cannot retrieve file from %s" % url)

                    pymod = os.path.join(destination, os.path.basename(url))
                    if pymod not in pymods:
                        pymods.append(pymod)

                    if mod.GetParameter("IPModuleDependencies"):
                        for dep in mod.GetParameter("IPModuleDependencies").GetValue():
                            depurl = parser.parse(dep.value)
                            if not isurl(depurl):
                                # Relative dependency: resolve against the
                                # module URL's directory.
                                depurl = parser.parse(os.path.join(os.path.dirname(url), dep.value))
                            if wget(depurl, destination, cache, wgetrcoptions):
                                raise Exception("Failed to retrieve i3filter from '%s'" % depurl)
                            pymod = os.path.join(destination, os.path.basename(depurl))
                            if pymod not in pymods:
                                pymods.append(pymod)
            except Exception as e:
                logger.error(e)
                raise

    return pymods
861
862
def GetServerURL(self):
    """
    Return the monitoring server URL that jobs report back to.

    An explicit [monitoring] url option wins outright; otherwise the URL
    is built from server/port, with natserver/natport overriding when a
    site sits behind NAT, and the scheme chosen by self.ssl.

    (Header line reconstructed -- grounded by the self.GetServerURL()
    call in Submit.)

    @return: URL string, e.g. 'https://host:port'
    """
    if self.config.has_option('monitoring', 'url'):
        return self.config.get('monitoring', 'url')

    monihost = self.config.get('monitoring', 'server')
    moniport = self.config.get('monitoring', 'port')

    # NAT'ed sites publish a different externally visible host/port.
    if self.config.has_option('monitoring', 'natserver'):
        monihost = self.config.get('monitoring', 'natserver')
    if self.config.has_option('monitoring', 'natport'):
        moniport = self.config.get('monitoring', 'natport')

    if self.ssl:
        moniurl = "https://%s:%s" % (monihost, moniport)
    else:
        moniurl = "http://%s:%s" % (monihost, moniport)
    return moniurl
882
894
895
897 """
898 Set the root directory where the iceprod software is stored
899 @param rootdir: directory path
900 """
901 if rootdir:
902 self.rootdir = rootdir
903
def GetRootDir(self):
    """
    Get the root directory where the iceprod software is stored

    (Header line reconstructed -- grounded by the many
    self.GetRootDir() calls in Submit/GetIPModules.)

    @return: the path set via SetRootDir (None until set in __init__)
    """
    return self.rootdir
909
911 """
912 Set the list of default archives that should be included with each
913 submission.
914 @param archive: archive or file needed by job (to be shipped with job)
915 """
916 if archive:
917 self.default_archives.append(archive)
918
919
920
942
943
def CheckProxy(self, proxy, runtime):
    """
    Check validity of certificate proxy for gridFTP

    Runs the configured grid proxy-info tool and compares its remaining
    lifetime (reported in seconds, converted to minutes here) against
    the requested runtime.

    (Header line reconstructed -- grounded by the
    self.CheckProxy(globus_proxy, runtime) call in Submit.)

    @param proxy: path to proxy certificate
    @param runtime: minimum validity time left on proxy (minutes, per
                    the seconds/60 comparison below)
    @return: True if proxy exists and has >= runtime minutes left
    """
    globusdir = self.config.get('globus', 'directory')
    globuslib = os.path.join(globusdir, 'lib')
    proxy_info = self.config.get('globus', 'proxy-info')
    proxy_info_opts = ''
    if self.config.has_option('globus', 'proxy-info-opts'):
        proxy_info_opts = self.config.get('globus', 'proxy-info-opts')
    # BUGFIX: os.getenv() returns None when LD_LIBRARY_PATH is unset,
    # which crashed the .split(':') -- default to '' instead.
    ldlist = (os.getenv('LD_LIBRARY_PATH') or '').split(':')
    if globuslib not in ldlist:
        ldlist.append(globuslib)
    os.putenv('LD_LIBRARY_PATH', expandvars(":".join(ldlist)))
    os.putenv('GLOBUS_LOCATION', expandvars(globusdir))
    if os.path.exists(proxy):
        executable = os.path.join(expandvars(globusdir), 'bin', proxy_info)
        if not os.path.exists(executable):
            raise Exception("executable %s does not exist" % executable)
        cmd = '%s %s -file %s -timeleft' % (executable, proxy_info_opts, proxy)
        logger.debug(cmd)
        output = os.popen(cmd)
        # proxy-info -timeleft prints seconds; floor-divide to minutes
        # ('//' matches the old py2 '/' on ints and is py3-safe).
        timeleft = int(output.read().strip()) // 60
        logger.info("time left on proxy is %d min." % timeleft)
        try:
            output.close()
        except Exception:
            pass
        return timeleft >= runtime
    return False
977
978
980 """
981 Check validity of certificate proxy for gridFTP
982 @param proxy: path to proxy certificate
983 @param runtime: minimum validity time left on proxy
984 """
985 globusdir = self.config.get('globus','directory')
986 proxy_init = self.config.get('globus','proxy-init')
987 proxy_init_opts = ''
988 if self.config.has_option('globus','proxy-init-opts'):
989 proxy_init_opts = self.config.get('globus','proxy-init-opts')
990 if self.config.has_option('globus','certdir'):
991 proxy_init_opts += ' -certdir %s ' % self.config.get('globus','certdir')
992 globuslib = os.path.join(globusdir,'lib')
993 ldlist = os.getenv('LD_LIBRARY_PATH').split(':')
994 if globuslib not in ldlist:
995 ldlist.append(globuslib)
996 os.putenv('LD_LIBRARY_PATH',expandvars(":".join(ldlist)))
997 os.putenv('GLOBUS_LOCATION',expandvars(globusdir))
998 executable = os.path.join(expandvars(globusdir),'bin',proxy_init)
999 cmd = '%s %s -pwstdin -valid %u:%u -out %s' % (executable,proxy_init_opts,runtime/60,runtime%60,proxy)
1000 logger.debug(cmd)
1001 try:
1002 from subprocess import Popen,PIPE
1003 except ImportError:
1004 from popen2 import Popen3
1005 p4 = Popen3(cmd,True)
1006
1007 p4.tochild.write(passwd)
1008 p4.tochild.close()
1009
1010 o = p4.fromchild.read()
1011 logger.info(o)
1012 p4.fromchild.close()
1013
1014 e = p4.childerr.read()
1015 logger.debug(e)
1016 p4.childerr.close()
1017
1018 while p4.poll() < 0: time.sleep(1)
1019 if p4.poll():
1020 logger.error(e)
1021 raise Exception, "unable to generate proxy file %s. %s returned %d" % (proxy,executable,p4.poll())
1022 del p4
1023
1024 else:
1025 p4 = Popen(cmd,stdin=PIPE,stdout=PIPE,stderr=PIPE,close_fds=True, shell=True)
1026 o,e = p4.communicate(passwd)
1027 logger.info(o)
1028 logger.debug(e)
1029 if p4.returncode:
1030 logger.error(e)
1031 raise Exception, "unable to generate proxy file %s. %s returned %d" % (proxy,executable,p4.poll())
1032 del p4
1033