
Source Code for Module iceprod.server.queue

#!/bin/env python
#

"""
A class for queuing IceTrayConfig jobs on various queueing systems.

copyright  (c) 2005 the icecube collaboration

@version: $Revision: $
@date: $Date: $
@author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
@todo: Add support for other queueing systems.
@todo: Move system-dependent code into corresponding modules
       (e.g. condor.py, pbs.py)
"""

import os, sys, re, time, string
import platform, getpass
import logging
import cPickle
import resource        # Resource usage information.
import grid
import plugins
from job import i3Job
from threading import Thread
from os.path import expandvars, basename

import iceprod
from iceprod.core.metadata import *
from iceprod.core.functions import *
from iceprod.core.dataclasses import *
from iceprod.core.xmlparser import IceTrayXMLParser
from iceprod.core.xmlwriter import IceTrayXMLWriter
from iceprod.core.lex import ExpParser
from dbqueue import *

logger = logging.getLogger('i3ProdQueue')

class UnsupportedSystemException(Exception):
    def __repr__(self):
        return "Batch System is not currently supported!!!"

class IncompleteConfigException(Exception):
    def __repr__(self):
        return "Incomplete Steering Configuration"

def isurl(path):
    return path.startswith('http://') \
        or path.startswith('ftp://') \
        or path.startswith('gsiftp://') \
        or path.startswith('file://')

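# --- Editor's note (not in the original source): for example,
# isurl('gsiftp://gridftp.example.edu/data/file.i3') is True, while a plain
# filesystem path such as '/data/sim/file.i3' is not treated as a URL.
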
class i3ProdQueue:
    """
    This class interfaces IceTray with several queuing systems and is used
    to simplify the submission and monitoring of jobs. Additionally,
    information about submitted jobs can be entered into a database
    through the class db.ConfigDB.
    """

    def __init__(self, config):
        self.config = config
        self.queuesize = 50
        self.i3configdb = None
        self.i3monitordb = None
        self.grid_id = 0
        self.jobs_at_once = 20
        self.rootdir = None
        self.default_archives = []
        self.submitter = None
        self.institution = None
        self.grid = self.config.get('queue','name')
        self.maxjobs = int(self.config.get('queue','maxjobs'))
        self.batchsys = self.config.get('queue','batchsys')
        logger.debug("system: %s" % self.batchsys)
        self.rundir = self.config.get('path','submitdir')
        try:
            self.submithost = self.config.get('monitoring','server')
        except:
            self.submithost = os.uname()[1]

        self.ssl = True
        if self.config.has_option('monitoring','USESSL'):
            self.ssl = self.config.getboolean('monitoring','USESSL')
        if self.config.has_option('security','USESSL'):
            self.ssl = self.ssl and self.config.getboolean('security','USESSL')
        logger.info("Monitoring server is configured with SSL?: %s" % self.ssl)

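    # --- Editor's sketch (not in the original source): a minimal way to
    # construct the queue, assuming a ConfigParser-style object providing
    # the [queue], [path] and [monitoring] options read above. The config
    # file path is hypothetical.
    #
    #   from ConfigParser import SafeConfigParser
    #   cfg = SafeConfigParser()
    #   cfg.read(expandvars('$I3PROD/etc/iceprod.cfg'))  # hypothetical path
    #   queue = i3ProdQueue(cfg)
    #   queue.SetGridId(1)
    #   queue.SetRootDir('/opt/iceprod')                 # hypothetical path
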
    def SetConfigDB(self, db):
        self.i3configdb = db

    def SetMonitorDB(self, db):
        self.i3monitordb = db

    def SetGridId(self, grid_id):
        self.grid_id = grid_id

    def SetJobsAtOnce(self, njobs):
        self.jobs_at_once = njobs

    def SetQueueSize(self, maxsize):
        self.queuesize = maxsize

    def SetSubmitHost(self, submithost):
        self.submithost = submithost

    def SetSubmitter(self, submitter):
        self.submitter = submitter

    def SetInstitution(self, institution):
        self.institution = institution

    def MakeDir(self, dir):
        """
        Create a temporary directory from which the current job(s) will be
        submitted (or in which they will run, depending on the queueing
        system).
        """
        if not os.path.exists(dir):
            os.makedirs(dir)

    def CleanDir(self, dir):
        """
        Remove the temporary directory from which the current job(s) were
        submitted.
        """
        if os.path.exists(dir):
            try:
                # removedirs is assumed to come from the
                # iceprod.core.functions star import above
                removedirs(dir)
            except OSError, e:
                logger.error(e)

    def EnQueue(self, steering):
        """
        Insert IceTrayConfig configuration into the database.
        @param steering: Steering configuration object for the dataset
        """
        # set up a parser with default values
        opts = {'dataset':0, 'nproc':1, 'procnum':0}
        parser = ExpParser(opts, steering)

        # Get list of grids to run on
        if steering.GetExtra('Grid'):
            grid = steering.GetExtra('Grid')
            grids = map(string.strip, grid.split("|"))
        elif steering.GetParameter('BATCHSYS'):
            grid = steering.GetParameter('BATCHSYS')
            grids = map(string.strip, grid.GetValue().split("|"))
        elif steering.GetParameter('Grid'):
            grid = steering.GetParameter('Grid')
            grids = map(string.strip, grid.GetValue().split("|"))
        else:
            logger.warn('No grid specified. Local grid assumed.')
            grids = [self.grid]  # use default

        offlinefilt = steering.GetOF()

        # Get number of jobs to submit
        if steering.GetExtra('Maxjobs'):
            maxjobs = int(steering.GetExtra('Maxjobs'))
            steering.AddParameter(Parameter("int","MAXJOBS",maxjobs))
        elif steering.GetParameter('MAXJOBS'):
            maxjobs = steering.GetParameter('MAXJOBS')
            maxjobs = int(parser.parse(maxjobs.GetValue()))
        elif steering.GetOF():
            logger.warn("processing OF filesystem. This could take a while!")
            maxjobs = offlinefilt.Search()
        else:
            raise Exception, "Dataset size was not specified!"
        parser.opts['nproc'] = maxjobs  # update parser's argopt dictionary

        self.i3configdb.connect()
        difplus = steering.GetExtra('Metadata')
        if difplus == None:
            raise IncompleteConfigException, 'Missing metadata'

        ticket = steering.GetExtra('Ticket')
        if ticket == None: ticket = 0

        isTemplate = (steering.GetDatasetType() == "TEMPLATE" or maxjobs == 0)

        dataset_id = self.i3configdb.upload_config(steering,ticket,isTemplate,maxjobs)
        difplus.GetPlus().SetSubCategory(steering.GetCategory())
        prio = 0
        try:
            if steering.GetParameter('PRIORITY'):
                prio = int(parser.parse(steering.GetParameter('PRIORITY').GetValue()))
            elif steering.GetParameter('priority'):
                prio = int(parser.parse(steering.GetParameter('priority').GetValue()))
        except Exception, e:
            logger.error('could not set priority: %s' % str(e))
        logger.info('dataset %d has priority %s' % (dataset_id,prio))

        if dataset_id:
            self.i3configdb.upload_metadata(dataset_id,difplus)
            self.i3monitordb.InitializeGridStats(grids,dataset_id)

            # update storage information
            self.i3configdb.SetStorageURL(dataset_id,steering)

            if maxjobs > 0:
                self.i3monitordb.InitializeJobTable(maxjobs,dataset_id,prio)
            if offlinefilt:  # load directory dictionary for offline filtering
                self.i3configdb.load_filelist(offlinefilt.GetFileDict(),dataset_id)
        else:
            raise Exception, "Failed to upload dataset"

        cookie = I3Cookie()
        cookie.dataset_id = dataset_id

        if self.ssl:
            cookie.url = "https://%s:%s" % (self.config.get('server','server'),self.config.get('server','port'))
        else:
            cookie.url = "http://%s:%s" % (self.config.get('server','server'),self.config.get('server','port'))

        status = "dataset has been enqueued with id %d" % dataset_id
        return status, cookie

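    # --- Editor's sketch (not in the original source): driving EnQueue,
    # assuming `steering` was built with IceTrayXMLParser from a steering
    # XML file and that both database handles have been attached.
    #
    #   queue.SetConfigDB(configdb)     # db.ConfigDB instance (assumed)
    #   queue.SetMonitorDB(monitordb)   # monitoring DB instance (assumed)
    #   status, cookie = queue.EnQueue(steering)
    #   logger.info(status)             # "dataset has been enqueued with id N"
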
    def mkdataset(self, start=0, end=0, dataset=0):
        """
        Create a list of i3Job objects for process numbers start through
        end-1, keyed by dataset id.
        """
        job_list = []
        for i in range(start, end):
            job = i3Job()
            job.SetProcNum(i)
            job.SetDatasetId(dataset)
            job_list.append(job)
        return {dataset: job_list}

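    # --- Editor's note (not in the original source): mkdataset returns a
    # single-entry dict keyed by dataset id, e.g.
    #
    #   jobs = queue.mkdataset(start=0, end=3, dataset=7)
    #   jobs.keys()                        # [7]
    #   [j.GetProcNum() for j in jobs[7]]  # [0, 1, 2]
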
    def Submit(self, steering=None, production=False, first=0, last=0, npid=None):
        """
        Submit and monitor jobs
        """
        starttime = time.time()
        cookie = I3Cookie()
        cookie.dataset_id = 0
        difplus = DIF_Plus()
        cluster = plugins.get_plugin(self.batchsys)()
        nonproddb = dict()
        fifo = self.config.getboolean('queue','fifo')

        if production:
            self.i3monitordb.connect()
            job_list = self.i3monitordb.QueueJobs(self.maxjobs,self.grid_id,self.jobs_at_once,fifo)
            job_name = self.config.get('queue','jobprefix')
        else:
            job_name = "ipnp"
            try:
                import bsddb
                nonproddb_location = self.config.get('database','non-production')
                nonproddb = bsddb.btopen(nonproddb_location, 'c')
            except: pass

            if npid:  # has a non-production id
                cookie = cPickle.loads(nonproddb[str(npid)])
            elif nonproddb.keys():
                npid = int(nonproddb.last()[0]) + 1
            else:
                npid = 1

            pmaxjobs = steering.GetParameter('MAXJOBS')
            maxjobs = int(pmaxjobs.GetValue())
            if last == 0: last = maxjobs
            job_list = self.mkdataset(start=first,end=last,dataset=npid)  # single dataset
            steering.AddDIFPlus(self.MakeMetadata(difplus,0,simcat="Non-Production"))

        # Include proxy for globus authentication
        # --- This will probably change once we have played around with
        # --- gridFTP some more.
        globus_proxy = None
        globus_libs = None
        if self.config.has_section('globus') and self.config.getboolean('globus','useproxy'):
            globus_proxy = expandvars(self.config.get('globus','proxy'))
            if self.config.has_option('globus','libs'):
                globus_libs = self.config.get('globus','libs')
            globus_location = self.config.get('globus','job_globus_location')
            gpass = self.config.get('globus','passwd',raw=True)
            runtime = self.config.getint('queue','max_job_processing_time')
            runtime += self.config.getint('queue','max_job_idle_time')
            logger.info("checking proxyfile...")
            if not self.CheckProxy(globus_proxy,runtime):
                logger.info("proxy %s will likely expire before job completes" % globus_proxy)
                logger.info("generating new proxyfile.")
                self.MakeProxy(globus_proxy,runtime*2,gpass)

        status = ""
        for dataset_id in job_list.keys():
            logger.info('queueing %d jobs for dataset %d' % (len(job_list[dataset_id]),dataset_id))
            if production:
                steering = self.i3configdb.download_config(dataset_id)
                difplus = self.i3configdb.download_metadata(dataset_id)
                steering.AddDIFPlus(difplus)

            # Determine the requested batch system and build submission request
            # Call constructor for plugin
            cluster = plugins.get_plugin(self.batchsys)()
            if not cluster:
                logger.error("%s not yet implemented" % self.batchsys)
                raise UnsupportedSystemException, self.batchsys

            cluster.SetMonitorDB(self.i3monitordb)

            # Batch system options:
            # first set any batch system options from the server configuration
            if self.config.has_section('batchopts'):
                for o in self.config.options('batchopts'):
                    cluster.AddParam(o,self.config.get('batchopts',o))
            cluster.SetSteering(steering)

            lib_url = self.config.get('path','lib_url')
            steering.AddSysOpt(SysOpt("lib_url",lib_url))

            dtdfile = os.path.join(self.GetRootDir(),"shared","icetray.v2.dtd")
            xmluri = 'file:icetray.v2.dtd'
            try:
                xmluri = self.config.get('path','uri')
            except: pass

            cluster.AddArgOption("grid",self.grid_id)
            cluster.AddArgOption("lib",iceprod.zipfile()+'.zip')
            target = self.config.get('path','target_url',raw=True)
            steering.AddSysOpt(SysOpt("targeturl",target))
            #cluster.AddArgOption("target",target)

            # Job will automatically move data and set status
            stageout = self.config.getboolean('queue','jobstageout')
            steering.AddSysOpt(SysOpt("stageout",str(stageout)))
            cluster.AddArgOption("stageout",int(stageout))

            # Is python zipsafe (can it read zipped modules)?
            zipsafe = self.config.getboolean('queue','zipsafe')
            steering.AddSysOpt(SysOpt("zipsafe",str(zipsafe)))
            if not zipsafe:
                cluster.AddArgOption("zipsafe",0)

            # need a place to store temporary output for multi-job execution
            if self.config.has_option('path','dagtemp'):
                dagtemp = self.config.get('path','dagtemp',raw=True)
                if dagtemp:
                    steering.AddSysOpt(SysOpt("dagtemp",dagtemp))
                    cluster.AddArgOption("dagtemp",dagtemp)

            # data will be copied back to submit node
            nocopy = False
            if self.config.has_option('queue','nocopy') and self.config.getboolean('queue','nocopy'):
                nocopy = True
            cluster.AddArgOption("nocopy",int(nocopy))
            steering.AddSysOpt(SysOpt("nocopy",str(nocopy)))

            if self.config.getboolean('system','validatexml'):
                cluster.AddArgOption("validate",1)
            else:
                cluster.AddArgOption("validate",0)

            # tar output from job?
            if self.config.has_option('path','tar_output') and self.config.getboolean('path','tar_output'):
                cluster.AddArgOption("mktar")

            # cache directory on nodes
            if self.config.has_option('path','cache'):
                #cluster.AddArgOption("cache",self.config.get('path','cache'))
                steering.AddSysOpt(SysOpt("cache",self.config.get('path','cache')))

            # periodically update server of job status
            cluster.AddArgOption("ping",self.config.getint('queue','ping_interval'))

            # If specified, jobs will force a platform instead of auto
            # detecting it. This is useful for homogeneous clusters where
            # nodes don't have gcc.
            if self.config.has_option('environment','platform'):
                platform = self.config.get('environment','platform')
                if platform and platform != 'system':
                    cluster.AddArgOption("platform",platform)
                    steering.AddSysOpt(SysOpt("platform",platform))

            # Add system information
            steering.AddSysOpt(SysOpt("gridname",self.grid))
            steering.AddSysOpt(SysOpt("gridid",self.grid_id))

            # Tell job where to find the python environment
            pyhome = self.config.get('environment','pythonhome')
            cluster.AddEnv("PYROOT",pyhome)
            if self.config.has_option('environment','pythonpath'):
                pythonpath = self.config.get('environment','pythonpath')
                cluster.AddEnv("PYTHONPATH",pythonpath)

            # Tell job where to find the ROOT environment
            if self.config.has_option('environment','rootsys'):
                rootsys = self.config.get('environment','rootsys')
                cluster.AddEnv("ROOTSYS",rootsys)
                steering.AddSysOpt(SysOpt("rootsys",rootsys))

            # Tell job where to find the photon tables
            if self.config.has_option('environment','photontablesdir'):
                photontablesdir = self.config.get('environment','photontablesdir')
                photontables = os.path.join(photontablesdir,'tables')
            else:
                photontables = self.config.get('environment','photontables')
                photontablesdir = os.path.join(photontables,'../')
            cluster.AddEnv("PHOTONTABLES",photontables)
            cluster.AddEnv("PHOTON_TABLES_DIR",photontablesdir)
            steering.AddSysOpt(SysOpt("photontablesdir",photontablesdir))

            # Add any server-configured parameters that might be used in
            # the configuration
            for o in self.config.options('system'):
                steering.AddSysOpt(SysOpt(o,self.config.get('system',o)))

            # Include the iceprod libs as a zip archive
            corelibs = os.path.join(self.GetRootDir(),"shared",iceprod.zipfile())
            if not os.path.exists(corelibs+'.zip'):
                libdir = os.path.join(self.GetRootDir(),"lib")
                iceprod.mktar(libdir,'iceprod/__init__.py',corelibs)
                iceprod.mktar(libdir,'iceprod/core',corelibs,'a')
                iceprod.mktar(libdir,'iceprod/modules',corelibs,'a')

            # Tell job where to find its java environment
            if self.config.has_option('environment','java'):
                javahome = self.config.get('environment','java')
                cluster.AddEnv("JAVA_HOME",javahome)
                steering.AddSysOpt(SysOpt("javahome",javahome))

            # Tell job where to write temporary files
            if self.config.has_option('environment','scratch'):
                scratchdir = self.config.get('environment','scratch')
            else:
                scratchdir = '$PWD'
            cluster.AddEnv("I3SCRATCH",scratchdir)
            steering.AddSysOpt(SysOpt("scratch",scratchdir))

            # Only set the X509_USER_PROXY variable if this is not done
            # automatically by the system
            steering.AddSysOpt(SysOpt("proxy_delegate", self.config.get('globus','delegate')))
            if globus_proxy:
                if self.config.getboolean('globus','delegate'):
                    cluster.AddEnv("X509_USER_PROXY",os.path.basename(globus_proxy))
                    steering.AddSysOpt(SysOpt("globus_proxy",os.path.basename(globus_proxy)))
                else:
                    steering.AddSysOpt(SysOpt("globus_proxy",'$X509_USER_PROXY'))
                steering.AddSysOpt(SysOpt("globus_location",globus_location))

            # write XML config file to submit directory
            if production:
                cluster.SetInitialdir(os.path.join(self.rundir, str(dataset_id)))
            else:
                cluster.SetInitialdir(os.path.join(self.rundir,'non-production',str(dataset_id)))
            inidir = os.path.join(cluster.GetInitialdir(),'iceprod_%f' % time.time())
            self.MakeDir(cluster.GetInitialdir())
            xcfg = os.path.join(cluster.GetInitialdir(),'config.xml')
            writer = IceTrayXMLWriter(steering,xmluri)
            writer.write_to_file(xcfg)
            steeringfile = xcfg

            # check timer for caching svn modules
            cachemod = starttime + 240 < time.time()

            # download modules needed
            if self.config.getboolean('queue','stage_pymods'):
                pymods = self.GetIPModules(steering,cluster.GetInitialdir(),cachemod)

            for o in self.config.options('job-env'):
                cluster.AddEnv(o.upper(),self.config.get('job-env',o))

            # Add jobs
            logger.info("processing jobs for dataset %u" % dataset_id)
            for job in job_list[dataset_id]:

                job.name = job_name
                cluster.PushJob(job)
                if production:
                    inidir = os.path.join(cluster.GetInitialdir(),'iceprod_%d.%f' % (job.GetProcNum(),time.time()))
                job.SetOutputURL(target)
                job.AddInputFile(dtdfile)

                # Add python modules as dependencies
                if self.config.getboolean('queue','stage_pymods'):
                    for p in pymods:
                        job.AddInputFile(p)

                if not production:
                    # Tell job where to download libs from
                    if steering.GetParameter('LIB_URL'):
                        lib_url = steering.GetParameter('LIB_URL').GetValue()

                    gcdbase = cluster.GetInitialdir()
                else:
                    # write GeoCalibDetStat .i3 file to submit directory
                    gcdbase = os.path.join(self.rundir, str(job.GetDatasetId()))
                gcdpath = os.path.join(gcdbase, "GeoCalibDetectorStatus_%06d.i3.gz" % job.GetDatasetId())
                i3db_params = ""
                opts = {
                    'dataset':job.GetDatasetId(),
                    'nproc':int(steering.GetParameter('MAXJOBS').GetValue()),
                    'procnum':job.GetProcNum()
                }
                parser = ExpParser(opts,steering)
                job.AddParser(parser)
                job.AddSteering(steering)
                for param in steering.GetParameters():
                    if param.GetName().startswith("I3Db::"):
                        i3dbparamname = param.GetName().replace("I3Db::","")
                        i3dbval = parser.parse(param.GetValue())
                        if param.GetName() == "I3Db::outfile":
                            i3dbval = os.path.join(gcdbase,i3dbval)
                            gcdpath = i3dbval
                        i3db_params += " --%s %s" % (i3dbparamname.lower(),i3dbval)

                # if I3Db is configured in steering section but gcd file does not
                # already exist, then create it.
                if i3db_params and not os.path.exists(gcdpath):
                    if not self.config.has_option('path','i3dbclient'):
                        raise Exception, "i3dbclient is not configured."
                    i3db_gcd_client = self.config.get('path','i3dbclient')
                    basedir = self.config.get('path','basedir')
                    i3db_gcd_client = os.path.join(basedir,'bin',i3db_gcd_client)
                    logger.info(i3db_gcd_client + " " + i3db_params)

                    if not os.path.exists(os.path.dirname(gcdpath)):
                        os.makedirs(os.path.dirname(gcdpath))
                    if os.system(i3db_gcd_client + " " + i3db_params):
                        raise Exception, "Unable to fetch GCD"
                if i3db_params:
                    job.AddInputFile(gcdpath)

                job.SetSubmitHost(self.submithost)
                job.SetInitialdir(inidir)
                self.MakeDir(job.GetInitialdir())
                job.SetRootDir(self.GetRootDir())

                # Set the main executable of job and add core libs
                job.SetExecutable(
                        os.path.join(self.GetRootDir(),"bin","i3exec.py"))
                job.AddInputFile(corelibs+'.zip')

                job.AddSteering(steering)
                if not production:
                    url = steering.GetParameter('URL')
                    if url: cluster.SetURL(url.GetValue())

                job.SetLogFile("%s.log" % job.Prefix())
                job.SetOutputFile("%s.out" % job.Prefix())
                job.SetErrorFile("%s.err" % job.Prefix())

                ### See if this is a job-initiated file transfer ####
                # Swegrid style data movement
                if job.GetOutputURL() and job.GetOutputURL().startswith('auto:'):
                    job.AddArgOption("nocopy")

                if self.config.getint('queue','sockettimeout'):
                    job.AddArgOption("timeout", self.config.getint('queue','sockettimeout'))

                job.AddArgOption("procnum",job.GetProcNum())
                if steering.GetParameter('DEBUG'):
                    job.AddArgOption("debug",steering.GetParameter('DEBUG').GetValue())

                # Tell job where to download dependencies from
                job.AddArgOption("fetch",lib_url)

                # Include wgetrc file for http/ftp authentication
                if self.config.has_option('path','wgetrc'):
                    wgetrc = expandvars(self.config.get('path','wgetrc',raw=True))
                else:
                    wgetrc = os.path.join(self.GetRootDir(),"etc","wgetrc")
                    if not os.path.exists(wgetrc):
                        wgetrc = os.path.join(self.GetRootDir(),"shared","wgetrc")
                if os.path.exists(wgetrc):
                    job.AddInputFile(wgetrc)
                    job.AddEnv("WGETRC",basename(wgetrc))

                if globus_proxy and self.config.getboolean('globus','delegate'):
                    proxy_copy = basename(expandvars(globus_proxy))
                    proxy_copy = os.path.join(job.GetInitialdir(),proxy_copy)
                    copy(expandvars(globus_proxy),proxy_copy)
                    job.AddInputFile(proxy_copy)
                if globus_libs:
                    job.AddInputFile(expandvars(globus_libs))

                job.AddEnv("SUBMITDIR",job.GetInitialdir())

                # Give job soapmon URL
                if production:
                    moniurl = self.GetServerURL()
                    job.AddArgOption("url",moniurl)
                    job.AddArgOption("dataset",job.GetDatasetId())
                    steeringfile = os.path.join(job.GetInitialdir(),os.path.basename(xcfg))
                    os.system('cp %s %s' % (xcfg,steeringfile))
                    logger.info("writing %s in %s" % (os.path.basename(xcfg),job.GetInitialdir()))
                else:
                    pmaxjobs = steering.GetParameter('MAXJOBS')
                    maxjobs = int(pmaxjobs.GetValue())
                    job.AddArgOption("nproc",maxjobs)

                job.AddArgument(os.path.basename(steeringfile))
                job.AddInputFile(steeringfile)

                # Get batch-system-specific options from database
                for opt in steering.GetBatchOpts():
                    logger.debug("%s = %s" % (opt.GetName(),opt.GetValue()))
                    if opt.GetType().lower() in [self.batchsys.lower(),'*',self.batchsys.split('.')[0].lower()]:
                        job.AddBatchOpt(opt.GetName(),opt.GetValue())

                # Submit to cluster
                file = os.path.join(job.GetInitialdir(),'%s.%s' % (job.Prefix(),cluster.Suffix()))
                cwdir = os.getcwd()
                os.chdir(job.GetInitialdir())
                logger.info("submitting job %u.%u" % (job.GetDatasetId(),job.GetProcNum()))
                retval,out = cluster.Submit(job,file)
                cookie.AddJobId(job.GetJobId())
                status += out
                if production:
                    if retval == 0:
                        self.i3monitordb.jobsubmitted(
                                job.GetDatasetId(), job.GetProcNum(),
                                job.GetInitialdir(), job.GetJobId())
                    else:
                        logger.error("failed to submit jobs:"+status)
                        self.i3monitordb.jobabort(job.GetProcNum(),job.GetDatasetId(),3,
                                "failed to submit jobs:"+status)
                        os.chdir('/tmp')
                        self.CleanDir(job.GetInitialdir())
                os.chdir(cwdir)

        if self.ssl:
            cookie.url = "https://%s:%s" % (self.config.get('server','server'),self.config.get('server','port'))
        else:
            cookie.url = "http://%s:%s" % (self.config.get('server','server'),self.config.get('server','port'))

        if production:
            self.i3monitordb.disconnect()
            self.i3configdb.disconnect()
        else:
            try:
                nonproddb[str(npid)] = cPickle.dumps(cookie)
                nonproddb.sync()
            except Exception, e:
                logger.error(e)

            status = "dataset %s:%d,%d enqueued" % (npid,first,last)
        logger.info("Submit done")
        return status, cookie

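    # --- Editor's sketch (not in the original source): a non-production
    # submission of the first ten jobs of a steering configuration. The
    # npid is assigned from the bsddb cache named by the [database]
    # non-production option.
    #
    #   status, cookie = queue.Submit(steering=steering, production=False,
    #                                 first=0, last=10)
    #   logger.info(status)              # "dataset <npid>:0,10 enqueued"
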
    def GetIPModules(self, steering, destination, cache=False):
        """
        Fetch any files from svn which are needed by i3.IceTray modules.
        """
        pymods = []

        opts = {
            'dataset': int(steering.GetParentId()),
            'nproc': int(steering.GetParameter('MAXJOBS').GetValue()),
            'procnum': 0
        }
        parser = ExpParser(opts, steering)

        for tray in steering.GetTrays():
            for mod in tray.GetIceProdPres() + tray.GetIceProdPosts():
                mclass = mod.GetClass()
                if mclass == "i3.IceTray":
                    if not mod.GetParameter('IPModuleURL'):
                        raise Exception, "cannot download icetray module without a URL!!!"
                    url = parser.parse(mod.GetParameter('IPModuleURL').GetValue().value)

                    if mod.GetParameter('IPModuleRevision'):
                        rev = parser.parse(mod.GetParameter('IPModuleRevision').GetValue().value)

                    logger.info("fetching %s" % url)
                    if wget(url, destination, cache):
                        raise Exception, "cannot retrieve file from %s" % url
                    pymod = os.path.join(destination, os.path.basename(url))
                    if pymod not in pymods:
                        pymods.append(pymod)
        return pymods

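    # --- Editor's note (not in the original source): GetIPModules is
    # driven by i3.IceTray pre/post modules in the steering file whose
    # IPModuleURL parameter names a python file to stage, e.g. a URL like
    # 'http://code.icecube.wisc.edu/svn/projects/mymodule.py'
    # (hypothetical). Each file is fetched with wget(), assumed to come
    # from the iceprod.core.functions star import, and later shipped as a
    # job input file.
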
    def GetServerURL(self):
        if self.config.has_option('monitoring','url'):
            return self.config.get('monitoring','url')

        monihost = self.config.get('monitoring','server')
        moniport = self.config.get('monitoring','port')

        # Check for network address translation (NAT),
        # i.e. the server address is different on the compute-node network
        if self.config.has_option('monitoring','natserver'):
            monihost = self.config.get('monitoring','natserver')
        if self.config.has_option('monitoring','natport'):
            moniport = self.config.get('monitoring','natport')

        if self.ssl:
            moniurl = "https://%s:%s" % (monihost,moniport)
        else:
            moniurl = "http://%s:%s" % (monihost,moniport)
        return moniurl

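    # --- Editor's sketch (not in the original source): a [monitoring]
    # section exercising the NAT override above (host names hypothetical):
    #
    #   [monitoring]
    #   server    = monitor.example.edu
    #   port      = 9080
    #   natserver = 10.0.0.5    ; address as seen from the compute nodes
    #   natport   = 9080
    #
    # With SSL enabled, GetServerURL() would return 'https://10.0.0.5:9080'.
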
    def GetPlatform(self):
        """
        @return: a string representing the platform of the current system
        in the IceTray format.
        """
        # local name kept distinct from the os module to avoid shadowing it
        osname = platform.system().replace('Darwin','MacOSX')
        arch = platform.machine()
        arch = arch.replace('Power Macintosh','ppc')
        arch = arch.replace('i686','i386')
        return "%s-%s" % (osname, arch)

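    # --- Editor's note (not in the original source): on a 64-bit Linux
    # host platform.system() == 'Linux' and platform.machine() == 'x86_64',
    # so GetPlatform() returns 'Linux-x86_64'; a Darwin host is reported
    # as 'MacOSX-<arch>', with 'Power Macintosh' and 'i686' normalized to
    # 'ppc' and 'i386'.
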
    def SetRootDir(self, rootdir):
        """
        Set the root directory where the iceprod software is stored.
        @param rootdir: directory path
        """
        if rootdir:
            self.rootdir = rootdir

    def GetRootDir(self):
        """
        Get the root directory where the iceprod software is stored.
        """
        return self.rootdir

    def AddDefaultArchive(self, archive):
        """
        Add to the list of default archives that should be included with
        each submission.
        @param archive: archive or file needed by job (to be shipped with job)
        """
        if archive:
            self.default_archives.append(archive)

    def MakeMetadata(self, difplus, run_id, simcat=""):
        """
        Fill in catalog information about this dataset.
        @param run_id: primary key for this job in the configuration database
        """
        dif = difplus.GetDIF()
        dif.SetEntryTitle('IceProd NonProduction Dataset')
        dif.SetSummary('IceProd NonProduction Dataset')
        dif.SetParameters("SPACE SCIENCE > Astrophysics > Neutrinos")
        dif.GetDataCenter().AddPersonnel(metadata.Personnel())

        plus = difplus.GetPlus()
        plus.SetSimDBKey(run_id)
        plus.SetSteeringFile('dataset_%06d_config.xml' % int(run_id))
        plus.SetStartDatetime(time.strftime("%Y-%m-%dT%H:%M:%S"))
        plus.SetEndDatetime(time.strftime("%Y-%m-%dT%H:%M:%S"))

        # We are no longer using the simdb_key as a subcategory
        plus.SetCategory('generated')
        plus.SetSubCategory(simcat)
        return difplus

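    # --- Editor's note (not in the original source): Submit calls this as
    # MakeMetadata(difplus, 0, simcat="Non-Production"), so the plus block
    # carries steering file 'dataset_000000_config.xml', category
    # 'generated' and sub-category 'Non-Production'.
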
    def CheckProxy(self, proxy, runtime):
        """
        Check validity of certificate proxy for gridFTP.
        @param proxy: path to proxy certificate
        @param runtime: minimum remaining proxy lifetime, in minutes
        """
        globusdir = self.config.get('globus','directory')
        globuslib = os.path.join(globusdir,'lib')
        proxy_info = self.config.get('globus','proxy-info')
        proxy_info_opts = ''
        if self.config.has_option('globus','proxy-info-opts'):
            proxy_info_opts = self.config.get('globus','proxy-info-opts')
        # default to '' so a missing LD_LIBRARY_PATH does not raise
        ldlist = os.getenv('LD_LIBRARY_PATH','').split(':')
        if globuslib not in ldlist:
            ldlist.append(globuslib)
        os.putenv('LD_LIBRARY_PATH',expandvars(":".join(ldlist)))
        os.putenv('GLOBUS_LOCATION',expandvars(globusdir))
        if os.path.exists(proxy):
            executable = os.path.join(expandvars(globusdir),'bin',proxy_info)
            if not os.path.exists(executable):
                raise Exception, "executable %s does not exist" % executable
            cmd = '%s %s -file %s -timeleft' % (executable,proxy_info_opts,proxy)
            logger.debug(cmd)
            output = os.popen(cmd)

            # grid-proxy-info returns seconds
            timeleft = int(output.read().strip())/60
            logger.info("time left on proxy is %d min." % timeleft)
            try:
                output.close()
            except: pass
            return timeleft >= runtime
        return False

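    # --- Editor's note (not in the original source): grid-proxy-info
    # -timeleft prints seconds, so a proxy with 7200 s remaining gives
    # timeleft == 120 min; CheckProxy(proxy, 90) is then True and
    # CheckProxy(proxy, 180) is False.
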
    def MakeProxy(self, proxy, runtime, passwd):
        """
        Generate a new certificate proxy for gridFTP.
        @param proxy: path to proxy certificate
        @param runtime: requested proxy lifetime, in minutes
        @param passwd: passphrase passed to grid-proxy-init on stdin
        """
        globusdir = self.config.get('globus','directory')
        certdir = self.config.get('globus','certdir')
        proxy_init = self.config.get('globus','proxy-init')
        proxy_init_opts = ''
        if self.config.has_option('globus','proxy-init-opts'):
            proxy_init_opts = self.config.get('globus','proxy-init-opts')
        proxy_init_opts += ' -certdir %s ' % certdir
        globuslib = os.path.join(globusdir,'lib')
        # default to '' so a missing LD_LIBRARY_PATH does not raise
        ldlist = os.getenv('LD_LIBRARY_PATH','').split(':')
        if globuslib not in ldlist:
            ldlist.append(globuslib)
        os.putenv('LD_LIBRARY_PATH',expandvars(":".join(ldlist)))
        os.putenv('GLOBUS_LOCATION',expandvars(globusdir))
        executable = os.path.join(expandvars(globusdir),'bin',proxy_init)
        cmd = '%s %s -pwstdin -valid %u:%u -out %s' % (executable,proxy_init_opts,runtime/60,runtime%60,proxy)
        logger.debug(cmd)
        try:
            from subprocess import Popen, PIPE
        except ImportError:
            # fall back to popen2 on older pythons
            from popen2 import Popen3
            p4 = Popen3(cmd, True)

            p4.tochild.write(passwd)
            p4.tochild.close()

            o = p4.fromchild.read()
            logger.info(o)
            p4.fromchild.close()

            e = p4.childerr.read()
            logger.debug(e)
            p4.childerr.close()

            while p4.poll() < 0: time.sleep(1)
            if p4.poll():  # return status
                logger.error(e)
                raise Exception, "unable to generate proxy file %s. %s returned %d" % (proxy,executable,p4.poll())
            del p4

        else:
            p4 = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True, shell=True)
            o,e = p4.communicate(passwd)
            logger.info(o)
            logger.debug(e)
            if p4.returncode:  # return status
                logger.error(e)
                raise Exception, "unable to generate proxy file %s. %s returned %d" % (proxy,executable,p4.returncode)
            del p4

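    # --- Editor's note (not in the original source): `runtime` is taken
    # in minutes, so MakeProxy(proxy, 150, passwd) builds a command line
    # containing '-valid 2:30' (2 hours, 30 minutes) for grid-proxy-init.
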