1
2
3
4 """
5 A class for quequing IceTrayConfig jobs on various queueing systems.
6
7 copyright (c) 2005 the icecube collaboration
8
9 @version: $Revision: $
10 @date: $Date: $
11 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
12 @todo: Add support for other queueing systems.
13 @todo: Move system dependent stuff into corresponding modules
14 (e.g. condor.py, pbs.py)
15 """
16
17 import os, sys, re, time,string
18 import platform, getpass
19 import logging
20 import cPickle
21 import resource
22 import grid
23 import plugins
24 from job import i3Job
25 from threading import Thread
26 from os.path import expandvars,basename
27
28 import iceprod
29 from iceprod.core.metadata import *
30 from iceprod.core.functions import *
31 from iceprod.core.dataclasses import *
32 from iceprod.core.xmlparser import IceTrayXMLParser
33 from iceprod.core.xmlwriter import IceTrayXMLWriter
34 from iceprod.core.lex import ExpParser
35 from dbqueue import *
36
37 logger = logging.getLogger('i3ProdQueue')
38
41 return "Batch System is not currently supported!!!"
42
45 return "Incomplete Steering Configuration"
46
48 return path.startswith('http://') \
49 or path.startswith('ftp://') \
50 or path.startswith('gsiftp://') \
51 or path.startswith('file://')
52
54 """
55 This class interfaces IceTray with several queuing systems and is used
56 for simplifying the submission and monitoring of jobs.
57 Additionally, the information for jobs submitted can be entered into a
58 database through the class db.ConfigDB
59 """
60
62 self.config = config
63 self.queuesize = 50
64 self.i3configdb = None
65 self.i3monitordb = None
66 self.grid_id = 0
67 self.jobs_at_once = 20
68 self.rootdir = None
69 self.default_archives = []
70 self.submitter = None
71 self.institution = None
72 self.grid = self.config.get('queue','name')
73 self.maxjobs = int(self.config.get('queue','maxjobs'))
74 self.batchsys = self.config.get('queue','batchsys')
75 logger.debug("system: %s" % self.batchsys)
76 self.rundir = self.config.get('path','submitdir')
77 try:
78 self.submithost = self.config.get('monitoring','server')
79 except:
80 self.submithost = os.uname()[1]
81
82 self.ssl = True
83 if self.config.has_option('monitoring','USESSL'):
84 self.ssl = self.config.getboolean('monitoring','USESSL')
85 if self.config.has_option('security','USESSL'):
86 self.ssl = self.ssl and self.config.getboolean('security','USESSL')
87 logger.info("Monitoring server is configured with SSL?: %s" % self.ssl)
88
93
95 self.grid_id = grid_id
96
98 self.jobs_at_once = njobs
99
101 self.queuesize = maxsize
102
104 self.submithost = submithost
105
107 self.submitter = submitter
108
110 self.institution = institution
111
112
114 """
115 Create a temporary directory where the current job(s) will be
116 submitted from (or will run in depending on the queueing system)
117 """
118 if not os.path.exists(dir):
119 os.makedirs(dir)
120
122 """
123 Remove temporary directory where the current job(s) was
124 submitted from.
125 """
126 if os.path.exists(dir):
127 try:
128 removedirs(dir)
129 except OSError,e:
130 logger.error(e)
131
133 self.queuesize = maxsize
134
136 self.submithost = submithost
137
139 self.submitter = submitter
140
142 self.institution = institution
143
144
146 """
147 Create a temporary directory where the current job(s) will be
148 submitted from (or will run in depending on the queueing system)
149 """
150 if not os.path.exists(dir):
151 os.makedirs(dir)
152
153
155 """
156 Insert IceTrayConfig configuration to the database
157 @param steering:
158 """
159
160
161 opts = {'dataset':0,'nproc':1,'procnum':0}
162 parser = ExpParser(opts,steering)
163
164
165 if steering.GetExtra('Grid'):
166 grid = steering.GetExtra('Grid')
167 grids = map(string.strip,grid.split("|"))
168 elif steering.GetParameter('BATCHSYS'):
169 grid = steering.GetParameter('BATCHSYS')
170 grids = map(string.strip,grid.GetValue().split("|"))
171 elif steering.GetParameter('Grid'):
172 grid = steering.GetParameter('Grid')
173 grids = map(string.strip,grid.GetValue().split("|"))
174 else:
175 logger.warn('No grid specified. local grid asumed.')
176 grids = [self.grid]
177
178 offlinefilt = steering.GetOF()
179
180
181 if steering.GetExtra('Maxjobs'):
182 maxjobs = int(steering.GetExtra('Maxjobs'))
183 steering.AddParameter(Parameter("int","MAXJOBS",maxjobs))
184 elif steering.GetParameter('MAXJOBS'):
185 maxjobs = steering.GetParameter('MAXJOBS')
186 maxjobs = int(parser.parse(maxjobs.GetValue()))
187 elif steering.GetOF():
188 logger.warn("processing OF filesystem. This could take a while!")
189 maxjobs = offlinefilt.Search()
190 else:
191 raise Exception, "Dataset size was not specified!"
192 parser.opts['nproc'] = maxjobs
193
194 self.i3configdb.connect()
195 difplus = steering.GetExtra('Metadata')
196 if difplus == None:
197 raise IncompleteConfigException, 'Missing metadata'
198
199 ticket = steering.GetExtra('Ticket')
200 if ticket == None: ticket = 0
201
202 isTemplate = (steering.GetDatasetType() == "TEMPLATE" or maxjobs == 0)
203
204 dataset_id = self.i3configdb.upload_config(steering,ticket,isTemplate,maxjobs)
205 difplus.GetPlus().SetSubCategory(steering.GetCategory())
206 prio = 0
207 try:
208 if steering.GetParameter('PRIORITY'):
209 prio = int(parser.parse(steering.GetParameter('PRIORITY').GetValue()))
210 elif steering.GetParameter('priority'):
211 prio = int(parser.parse(steering.GetParameter('priority').GetValue()))
212 except Exception,e:
213 logger.error('could not set priority: %s' % str(e) )
214 logger.info('dataset %d has priority %s' % (dataset_id,prio))
215
216 if dataset_id:
217 self.i3configdb.upload_metadata(dataset_id,difplus)
218 self.i3monitordb.InitializeGridStats(grids,dataset_id)
219
220
221 self.i3configdb.SetStorageURL(dataset_id,steering)
222
223 if maxjobs > 0:
224 self.i3monitordb.InitializeJobTable(maxjobs,dataset_id,prio)
225 if offlinefilt:
226 self.i3configdb.load_filelist(offlinefilt.GetFileDict(),dataset_id)
227 else:
228 raise Exception, "Failed to upload dataset"
229
230 cookie = I3Cookie()
231 cookie.dataset_id = dataset_id
232
233 if self.ssl:
234 cookie.url = "https://%s:%s" % (self.config.get('server','server'),self.config.get('server','port'))
235 else:
236 cookie.url = "http://%s:%s" % (self.config.get('server','server'),self.config.get('server','port'))
237
238 status = "dataset has been enqueued with id %d" % dataset_id
239 return status,cookie
240
241
242 - def mkdataset(self,start=0,end=0,dataset=0):
253
254
255 - def Submit(self,steering=None,production=False,first=0,last=0,npid=None):
256 """
257 Submit and monitor jobs
258 """
259 starttime = time.time()
260 cookie = I3Cookie()
261 cookie.dataset_id = 0
262 difplus = DIF_Plus()
263 cluster = plugins.get_plugin(self.batchsys)()
264 nonproddb = dict()
265 fifo = self.config.getboolean('queue','fifo')
266
267 if production:
268 self.i3monitordb.connect()
269 job_list = self.i3monitordb.QueueJobs(self.maxjobs,self.grid_id,self.jobs_at_once,fifo)
270 job_name = self.config.get('queue','jobprefix')
271 else:
272 job_name = "ipnp"
273 try:
274 import bsddb
275 nonproddb_location = self.config.get('database','non-production')
276 nonproddb = bsddb.btopen(nonproddb_location, 'c')
277 except: pass
278
279 if npid:
280 cookie = cPickle.loads(nonproddb[str(npid)])
281 elif nonproddb.keys():
282 npid = int(nonproddb.last()[0]) + 1
283 else:
284 npid = 1
285
286 pmaxjobs = steering.GetParameter('MAXJOBS')
287 maxjobs = int(pmaxjobs.GetValue())
288 if last == 0: last = maxjobs
289 job_list = self.mkdataset(start=first,end=last,dataset=npid)
290 steering.AddDIFPlus(self.MakeMetadata(difplus,0,simcat="Non-Production"))
291
292
293
294
295 globus_proxy = None
296 globus_libs = None
297 if self.config.has_section('globus') and self.config.getboolean('globus','useproxy'):
298 globus_proxy = expandvars(self.config.get('globus','proxy'))
299 if self.config.has_option('globus','libs'):
300 globus_libs = self.config.get('globus','libs')
301 globus_location = self.config.get('globus','job_globus_location')
302 gpass = self.config.get('globus','passwd',raw=True)
303 runtime = self.config.getint('queue','max_job_processing_time')
304 runtime += self.config.getint('queue','max_job_idle_time')
305 logger.info("checking proxyfile...")
306 if not self.CheckProxy(globus_proxy,runtime):
307 logger.info("proxy %s will likely expire before job completes" % globus_proxy)
308 logger.info("generating new proxyfile.")
309 self.MakeProxy(globus_proxy,runtime*2,gpass)
310
311 status = ""
312 for dataset_id in job_list.keys():
313 logger.info('queueing %d jobs for dataset %d' % (len(job_list[dataset_id]),dataset_id))
314 if production:
315 steering = self.i3configdb.download_config(dataset_id)
316 difplus = self.i3configdb.download_metadata(dataset_id)
317 steering.AddDIFPlus(difplus)
318
319
320
321 cluster = plugins.get_plugin(self.batchsys)()
322 if not cluster:
323 logger.error("%s not yet implemented" % self.batchsys)
324 raise UnsupportedSystemException, self.batchsys
325
326 cluster.SetMonitorDB(self.i3monitordb)
327
328
329
330 if self.config.has_section('batchopts'):
331 for o in self.config.options('batchopts'):
332 cluster.AddParam(o,self.config.get('batchopts',o))
333 cluster.SetSteering(steering)
334
335 lib_url = self.config.get('path','lib_url')
336 steering.AddSysOpt(SysOpt("lib_url",lib_url))
337
338 dtdfile = os.path.join(self.GetRootDir(),"shared","icetray.v2.dtd")
339 xmluri = 'file:icetray.v2.dtd'
340 try:
341 xmluri = self.config.get('path','uri')
342 except: pass
343
344 cluster.AddArgOption("grid",self.grid_id)
345 cluster.AddArgOption("lib",iceprod.zipfile()+'.zip')
346 target = self.config.get('path','target_url',raw=True)
347 steering.AddSysOpt(SysOpt("targeturl",target))
348
349
350
351 stageout = self.config.getboolean('queue','jobstageout')
352 steering.AddSysOpt(SysOpt("stageout",str(stageout)))
353 cluster.AddArgOption("stageout",int(stageout))
354
355
356 zipsafe = self.config.getboolean('queue','zipsafe')
357 steering.AddSysOpt(SysOpt("zipsafe",str(zipsafe)))
358 if not zipsafe:
359 cluster.AddArgOption("zipsafe",0)
360
361
362 if self.config.has_option('path','dagtemp'):
363 dagtemp = self.config.get('path','dagtemp',raw=True)
364 if dagtemp:
365 steering.AddSysOpt(SysOpt("dagtemp",dagtemp))
366 cluster.AddArgOption("dagtemp",dagtemp)
367
368
369 nocopy = False
370 if self.config.has_option('queue','nocopy') and self.config.getboolean('queue','nocopy'):
371 nocopy = True
372 cluster.AddArgOption("nocopy",int(nocopy))
373 steering.AddSysOpt(SysOpt("nocopy",str(nocopy)))
374
375 if self.config.getboolean('system','validatexml'):
376 cluster.AddArgOption("validate",1)
377 else:
378 cluster.AddArgOption("validate",0)
379
380
381 if self.config.has_option('path','tar_output') and self.config.getboolean('path','tar_output'):
382 cluster.AddArgOption("mktar")
383
384
385 if self.config.has_option('path','cache'):
386
387 steering.AddSysOpt(SysOpt("cache",self.config.get('path','cache')))
388
389
390 cluster.AddArgOption("ping",self.config.getint('queue','ping_interval'))
391
392
393
394
395 if self.config.has_option('environment','platform'):
396 platform = self.config.get('environment','platform')
397 if platform and platform != 'system':
398 cluster.AddArgOption("platform",platform)
399 steering.AddSysOpt(SysOpt("platform",platform))
400
401
402 steering.AddSysOpt(SysOpt("gridname",self.grid))
403 steering.AddSysOpt(SysOpt("gridid",self.grid_id))
404
405
406 pyhome = self.config.get('environment','pythonhome')
407 cluster.AddEnv("PYROOT",pyhome)
408 if self.config.has_option('environment','pythonpath'):
409 pythonpath = self.config.get('environment','pythonpath')
410 cluster.AddEnv("PYTHONPATH",pythonpath)
411
412
413 if self.config.has_option('environment','rootsys'):
414 rootsys = self.config.get('environment','rootsys')
415 cluster.AddEnv("ROOTSYS",rootsys)
416 steering.AddSysOpt(SysOpt("rootsys",rootsys))
417
418
419 if self.config.has_option('environment','photontablesdir'):
420 photontablesdir = self.config.get('environment','photontablesdir')
421 photontables = os.path.join(photontablesdir,'tables')
422 else:
423 photontables = self.config.get('environment','photontables')
424 photontablesdir = os.path.join(photontables,'../')
425 cluster.AddEnv("PHOTONTABLES",photontables)
426 cluster.AddEnv("PHOTON_TABLES_DIR",photontablesdir)
427 steering.AddSysOpt(SysOpt("photontablesdir",photontablesdir))
428
429
430
431 for o in self.config.options('system'):
432 steering.AddSysOpt(SysOpt(o,self.config.get('system',o)))
433
434
435 corelibs = os.path.join(self.GetRootDir(),"shared",iceprod.zipfile())
436 if not os.path.exists(corelibs+'.zip'):
437 libdir = os.path.join(self.GetRootDir(),"lib")
438 iceprod.mktar(libdir,'iceprod/__init__.py',corelibs)
439 iceprod.mktar(libdir,'iceprod/core',corelibs,'a')
440 iceprod.mktar(libdir,'iceprod/modules',corelibs,'a')
441
442
443 if self.config.has_option('environment','java'):
444 javahome = self.config.get('environment','java')
445 cluster.AddEnv("JAVA_HOME",javahome)
446 steering.AddSysOpt(SysOpt("javahome",javahome))
447
448
449 if self.config.has_option('environment','scratch'):
450 scratchdir = self.config.get('environment','scratch')
451 else:
452 scratchdir = '$PWD'
453 cluster.AddEnv("I3SCRATCH",scratchdir)
454 steering.AddSysOpt(SysOpt("scratch",scratchdir))
455
456
457
458 steering.AddSysOpt(SysOpt("proxy_delegate", self.config.get('globus','delegate')))
459 if globus_proxy:
460 if self.config.getboolean('globus','delegate'):
461 cluster.AddEnv("X509_USER_PROXY",os.path.basename(globus_proxy))
462 steering.AddSysOpt(SysOpt("globus_proxy",os.path.basename(globus_proxy)))
463 else:
464 steering.AddSysOpt(SysOpt("globus_proxy",'$X509_USER_PROXY'))
465 steering.AddSysOpt(SysOpt("globus_location",globus_location))
466
467
468 if production:
469 cluster.SetInitialdir(os.path.join(self.rundir, str(dataset_id)))
470 else:
471 cluster.SetInitialdir(os.path.join(self.rundir,'non-production',str(dataset_id)))
472 inidir = os.path.join(cluster.GetInitialdir(),'iceprod_%f' % time.time())
473 self.MakeDir(cluster.GetInitialdir())
474 xcfg = os.path.join(cluster.GetInitialdir(),'config.xml')
475 writer = IceTrayXMLWriter(steering,xmluri)
476 writer.write_to_file(xcfg)
477 steeringfile = xcfg
478
479
480 cachemod = starttime + 240 < time.time()
481
482
483 if self.config.getboolean('queue','stage_pymods'):
484 pymods = self.GetIPModules(steering,cluster.GetInitialdir(),cachemod)
485
486 for o in self.config.options('job-env'):
487 cluster.AddEnv(o.upper(),self.config.get('job-env',o))
488
489
490 logger.info("processing jobs for dataset %u" % dataset_id)
491 for job in job_list[dataset_id]:
492
493 job.name = job_name
494 cluster.PushJob(job)
495 if production:
496 inidir = os.path.join(cluster.GetInitialdir(),'iceprod_%d.%f' % (job.GetProcNum(),time.time()))
497 job.SetOutputURL(target)
498 job.AddInputFile(dtdfile)
499
500
501 if self.config.getboolean('queue','stage_pymods'):
502 for p in pymods:
503 job.AddInputFile(p)
504
505 if not production:
506
507 if steering.GetParameter('LIB_URL'):
508 lib_url = steering.GetParameter('LIB_URL').GetValue()
509
510 gcdbase = cluster.GetInitialdir()
511 else:
512
513 gcdbase = os.path.join(self.rundir, str(job.GetDatasetId()))
514 gcdpath = os.path.join(gcdbase, "GeoCalibDetectorStatus_%06d.i3.gz" % job.GetDatasetId())
515 i3db_params = ""
516 opts = {
517 'dataset':job.GetDatasetId(),
518 'nproc':int(steering.GetParameter('MAXJOBS').GetValue()),
519 'procnum':job.GetProcNum()
520 }
521 parser = ExpParser(opts,steering)
522 job.AddParser(parser)
523 job.AddSteering(steering)
524 for param in steering.GetParameters():
525 if param.GetName().startswith("I3Db::"):
526 i3dbparamname = param.GetName().replace("I3Db::","")
527 i3dbval = parser.parse(param.GetValue())
528 if param.GetName() == "I3Db::outfile":
529 i3dbval = os.path.join(gcdbase,i3dbval)
530 gcdpath = i3dbval
531 i3db_params += " --%s %s" % (i3dbparamname.lower(),i3dbval)
532
533
534
535 if i3db_params and not os.path.exists(gcdpath):
536 if not self.config.has_option('path','i3dbclient'):
537 raise Exception, "i3dbclient is not configured."
538 i3db_gcd_client = self.config.get('path','i3dbclient')
539 basedir = self.config.get('path','basedir')
540 i3db_gcd_client = os.path.join(basedir, 'bin',i3db_gcd_client)
541 logger.info(i3db_gcd_client + " " + i3db_params)
542
543 if not os.path.exists(os.path.dirname(gcdpath)):
544 os.makedirs(os.path.dirname(gcdpath))
545 if os.system(i3db_gcd_client + " " + i3db_params):
546 raise Exception, "Unable to fetch GCD"
547 if i3db_params:
548 job.AddInputFile(gcdpath)
549
550 job.SetSubmitHost(self.submithost)
551 job.SetInitialdir(inidir)
552 self.MakeDir(job.GetInitialdir())
553 job.SetRootDir(self.GetRootDir())
554
555
556 job.SetExecutable(
557 os.path.join(self.GetRootDir(),"bin","i3exec.py"))
558 job.AddInputFile(corelibs+'.zip')
559
560 job.AddSteering(steering)
561 if not production:
562 url = steering.GetParameter('URL')
563 if url: cluster.SetURL(url.GetValue())
564
565 job.SetLogFile( "%s.log" % job.Prefix() )
566 job.SetOutputFile( "%s.out" % job.Prefix() )
567 job.SetErrorFile( "%s.err" % job.Prefix() )
568
569
570
571 if job.GetOutputURL() and job.GetOutputURL().startswith('auto:'):
572 job.AddArgOption("nocopy")
573
574 if self.config.getint('queue','sockettimeout'):
575 job.AddArgOption("timeout", self.config.getint('queue','sockettimeout'))
576
577 job.AddArgOption("procnum",job.GetProcNum())
578 if steering.GetParameter('DEBUG'):
579 job.AddArgOption("debug",steering.GetParameter('DEBUG').GetValue())
580
581
582 job.AddArgOption("fetch",lib_url)
583
584
585 if self.config.has_option('path','wgetrc'):
586 wgetrc = expandvars(self.config.get('path','wgetrc',raw=True))
587 else:
588 wgetrc = os.path.join(self.GetRootDir(),"etc","wgetrc")
589 if not os.path.exists(wgetrc):
590 wgetrc = os.path.join(self.GetRootDir(),"shared","wgetrc")
591 if os.path.exists(wgetrc):
592 job.AddInputFile(wgetrc)
593 job.AddEnv("WGETRC",basename(wgetrc))
594
595 if globus_proxy and self.config.getboolean('globus','delegate'):
596 proxy_copy = basename(expandvars(globus_proxy))
597 proxy_copy = os.path.join(job.GetInitialdir(),proxy_copy)
598 copy(expandvars(globus_proxy),proxy_copy)
599 job.AddInputFile(proxy_copy)
600 if globus_libs:
601 job.AddInputFile(expandvars(globus_libs))
602
603 job.AddEnv("SUBMITDIR",job.GetInitialdir())
604
605
606 if production:
607 moniurl = self.GetServerURL()
608 job.AddArgOption("url",moniurl)
609 job.AddArgOption("dataset",job.GetDatasetId())
610 steeringfile = os.path.join(job.GetInitialdir(),os.path.basename(xcfg))
611 os.system('cp %s %s' % (xcfg,steeringfile))
612 logger.info("writting %s in %s"%(os.path.basename(xcfg),job.GetInitialdir()))
613 else:
614 pmaxjobs = steering.GetParameter('MAXJOBS')
615 maxjobs = int(pmaxjobs.GetValue())
616 job.AddArgOption("nproc",maxjobs)
617
618 job.AddArgument(os.path.basename(steeringfile))
619 job.AddInputFile(steeringfile)
620
621
622 for opt in steering.GetBatchOpts():
623 logger.debug("%s = %s" % (opt.GetName(),opt.GetValue()))
624 if opt.GetType().lower() in [self.batchsys.lower(),'*',self.batchsys.split('.')[0].lower()]:
625 job.AddBatchOpt(opt.GetName(),opt.GetValue())
626
627
628 file = os.path.join(job.GetInitialdir(),'%s.%s' % (job.Prefix(),cluster.Suffix()))
629 cwdir = os.getcwd()
630 os.chdir(job.GetInitialdir())
631 logger.info("submitting job %u.%u" %(job.GetDatasetId(),job.GetProcNum()))
632 retval,out = cluster.Submit(job,file)
633 cookie.AddJobId(job.GetJobId())
634 status += out
635 if production:
636 if retval == 0:
637 self.i3monitordb.jobsubmitted(
638 job.GetDatasetId(), job.GetProcNum(),
639 job.GetInitialdir(), job.GetJobId())
640 else:
641 logger.error("failed to submit jobs:"+status)
642 self.i3monitordb.jobabort(job.GetProcNum(),job.GetDatasetId(),3,
643 "failed to submit jobs:"+status)
644 os.chdir('/tmp')
645 self.CleanDir(job.GetInitialdir())
646 os.chdir(cwdir)
647
648 if self.ssl:
649 cookie.url = "https://%s:%s" % (self.config.get('server','server'),self.config.get('server','port'))
650 else:
651 cookie.url = "http://%s:%s" % (self.config.get('server','server'),self.config.get('server','port'))
652
653 if production:
654 self.i3monitordb.disconnect()
655 self.i3configdb.disconnect()
656 else:
657 try:
658 nonproddb[str(npid)] = cPickle.dumps(cookie)
659 nonproddb.sync()
660 except Exception,e:
661 logger.error(e)
662
663 status = "dataset %s:%d,%d enqueued" % (npid,first,last)
664 logger.info("Submit done")
665 return status,cookie
666
668 """
669 Fetch any files from svn which are needed by i3.IceTray
670 """
671 pymods = []
672
673 opts = {
674 'dataset': int(steering.GetParentId()),
675 'nproc' : int(steering.GetParameter('MAXJOBS').GetValue()),
676 'procnum': 0
677 }
678 parser = ExpParser(opts,steering)
679
680 for tray in steering.GetTrays():
681 for mod in tray.GetIceProdPres() + tray.GetIceProdPosts():
682 mclass = mod.GetClass()
683 if mclass == "i3.IceTray":
684 if not mod.GetParameter('IPModuleURL'):
685 raise Exception, "cannot download icetray module without a URL!!!"
686 url = parser.parse(mod.GetParameter('IPModuleURL').GetValue().value)
687
688 if mod.GetParameter('IPModuleRevision'):
689 rev = parser.parse(mod.GetParameter('IPModuleRevision').GetValue().value)
690
691 logger.info("fetching %s" %url)
692 if wget(url,destination,cache): raise Exception, "cannot retrieve file from %s" %url
693 pymod = os.path.join(destination,os.path.basename(url))
694 if pymod not in pymods:
695 pymods.append(pymod)
696 return pymods
697
698
700 if self.config.has_option('monitoring','url'):
701 return self.config.get('monitoring','url')
702
703 monihost = self.config.get('monitoring','server')
704 moniport = self.config.get('monitoring','port')
705
706
707
708 if self.config.has_option('monitoring','natserver'):
709 monihost = self.config.get('monitoring','natserver')
710 if self.config.has_option('monitoring','natport'):
711 moniport = self.config.get('monitoring','natport')
712
713 if self.ssl:
714 moniurl = "https://%s:%s" % (monihost,moniport)
715 else:
716 moniurl = "http://%s:%s" % (monihost,moniport)
717 return moniurl
718
730
731
733 """
734 Set the root directory where the iceprod software is stored
735 @param rootdir: directory path
736 """
737 if rootdir:
738 self.rootdir = rootdir
739
741 """
742 Get the root directory where the iceprod software is stored
743 """
744 return self.rootdir
745
747 """
748 Set the list of default archives that should be included with each
749 submission.
750 @param archive: archive or file needed by job (to be shipped with job)
751 """
752 if archive:
753 self.default_archives.append(archive)
754
755
756
778
779
781 """
782 Check validity of certificate proxy for gridFTP
783 @param proxy: path to proxy certificate
784 @param runtime: minimum validity time left on proxy
785 """
786 globusdir = self.config.get('globus','directory')
787 globuslib = os.path.join(globusdir,'lib')
788 proxy_info = self.config.get('globus','proxy-info')
789 proxy_info_opts = ''
790 if self.config.has_option('globus','proxy-info-opts'):
791 proxy_info_opts = self.config.get('globus','proxy-info-opts')
792 ldlist = os.getenv('LD_LIBRARY_PATH').split(':')
793 if globuslib not in ldlist:
794 ldlist.append(globuslib)
795 os.putenv('LD_LIBRARY_PATH',expandvars(":".join(ldlist)))
796 os.putenv('GLOBUS_LOCATION',expandvars(globusdir))
797 if os.path.exists(proxy):
798 executable = os.path.join(expandvars(globusdir),'bin',proxy_info)
799 if not os.path.exists(executable):
800 raise Exception, "executable %s does not exist" % executable
801 cmd = '%s %s -file %s -timeleft' % (executable,proxy_info_opts,proxy)
802 logger.debug(cmd)
803 output = os.popen(cmd)
804
805
806 timeleft = int(output.read().strip())/60
807 logger.info("time left on proxy is %d min." % timeleft)
808 try:
809 output.close()
810 except: pass
811 return timeleft >= runtime
812 return False
813
814
816 """
817 Check validity of certificate proxy for gridFTP
818 @param proxy: path to proxy certificate
819 @param runtime: minimum validity time left on proxy
820 """
821 globusdir = self.config.get('globus','directory')
822 certdir = self.config.get('globus','certdir')
823 proxy_init = self.config.get('globus','proxy-init')
824 proxy_init_opts = ''
825 if self.config.has_option('globus','proxy-init-opts'):
826 proxy_init_opts = self.config.get('globus','proxy-init-opts')
827 proxy_init_opts += ' -certdir %s ' % certdir
828 globuslib = os.path.join(globusdir,'lib')
829 ldlist = os.getenv('LD_LIBRARY_PATH').split(':')
830 if globuslib not in ldlist:
831 ldlist.append(globuslib)
832 os.putenv('LD_LIBRARY_PATH',expandvars(":".join(ldlist)))
833 os.putenv('GLOBUS_LOCATION',expandvars(globusdir))
834 executable = os.path.join(expandvars(globusdir),'bin',proxy_init)
835 cmd = '%s %s -pwstdin -valid %u:%u -out %s' % (executable,proxy_init_opts,runtime/60,runtime%60,proxy)
836 logger.debug(cmd)
837 try:
838 from subprocess import Popen,PIPE
839 except ImportError:
840 from popen2 import Popen3
841 p4 = Popen3(cmd,True)
842
843 p4.tochild.write(passwd)
844 p4.tochild.close()
845
846 o = p4.fromchild.read()
847 logger.info(o)
848 p4.fromchild.close()
849
850 e = p4.childerr.read()
851 logger.debug(e)
852 p4.childerr.close()
853
854 while p4.poll() < 0: time.sleep(1)
855 if p4.poll():
856 logger.error(e)
857 raise Exception, "unable to generate proxy file %s. %s returned %d" % (proxy,executable,p4.poll())
858 del p4
859
860 else:
861 p4 = Popen(cmd,stdin=PIPE,stdout=PIPE,stderr=PIPE,close_fds=True, shell=True)
862 o,e = p4.communicate(passwd)
863 logger.info(o)
864 logger.debug(e)
865 if p4.returncode:
866 logger.error(e)
867 raise Exception, "unable to generate proxy file %s. %s returned %d" % (proxy,executable,p4.poll())
868 del p4
869