
Source Code for Module iceprod.server.queue

#!/bin/env python
#

"""
 A class for queueing IceTrayConfig jobs on various queueing systems.

 copyright  (c) 2005 the icecube collaboration

 @version: $Revision: $
 @date: $Date: $
 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
 @todo: Add support for other queueing systems.
 @todo: Move system-dependent stuff into the corresponding modules
 (e.g. condor.py, pbs.py)
"""

import os, sys, re, time, string
import platform, getpass
import logging
import cPickle
import resource        # Resource usage information.
import grid
import plugins
from job import i3Job
from threading import Thread
from os.path import expandvars, basename

import iceprod
from iceprod.core.metadata import *
from iceprod.core.functions import *
from iceprod.core.dataclasses import *
from iceprod.core.xmlparser import IceTrayXMLParser
from iceprod.core.xmlwriter import IceTrayXMLWriter
from iceprod.core.lex import ExpParser
from dbqueue import *

logger = logging.getLogger('i3ProdQueue')

class UnsupportedSystemException(Exception):
    def __repr__(self):
        return "Batch System is not currently supported!!!"


class IncompleteConfigException(Exception):
    def __repr__(self):
        return "Incomplete Steering Configuration"


def isurl(path):
    return path.startswith('http://') \
        or path.startswith('ftp://') \
        or path.startswith('gsiftp://') \
        or path.startswith('file://')

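# Quick illustration of isurl (the values below are hypothetical):
#
#   >>> isurl('gsiftp://gridftp.example.org/file.i3')
#   True
#   >>> isurl('/data/sim/file.i3')
#   False
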
class i3ProdQueue:
    """
    This class interfaces IceTray with several queueing systems and is used
    for simplifying the submission and monitoring of jobs.
    Additionally, the information for jobs submitted can be entered into a
    database through the class db.ConfigDB
    """

    def __init__(self, config):
        self.config = config
        self.queuesize = 50
        self.i3configdb = None
        self.i3monitordb = None
        self.grid_id = 0
        self.jobs_at_once = 20
        self.rootdir = None
        self.default_archives = []
        self.submitter = None
        self.institution = None
        self.grid = self.config.get('queue','name')
        self.maxjobs = int(self.config.get('queue','maxjobs'))
        self.batchsys = self.config.get('queue','batchsys')
        logger.debug("system: %s" % self.batchsys)
        self.rundir = self.config.get('path','submitdir')
        try:
            self.submithost = self.config.get('monitoring','server')
        except:
            self.submithost = os.uname()[1]

        self.ssl = True
        if self.config.has_option('monitoring','USESSL'):
            self.ssl = self.config.getboolean('monitoring','USESSL')
        if self.config.has_option('security','USESSL'):
            self.ssl = self.ssl and self.config.getboolean('security','USESSL')
        logger.info("Monitoring server is configured with SSL?: %s" % self.ssl)

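    # A hedged usage sketch (not part of the original module; the names
    # config, configdb and monitordb, and the paths, are illustrative
    # assumptions): a server component typically constructs the queue from
    # its ConfigParser-style configuration and wires in the databases
    # before submitting:
    #
    #   >>> q = i3ProdQueue(config)        # iceprod server configuration
    #   >>> q.SetConfigDB(configdb)        # dataset/configuration database
    #   >>> q.SetMonitorDB(monitordb)      # job-monitoring database
    #   >>> q.SetGridId(1)
    #   >>> q.SetRootDir('/opt/iceprod')   # where the iceprod software lives
    #   >>> status, cookie = q.Submit(production=True)
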
    def SetConfigDB(self, db):
        self.i3configdb = db

    def SetMonitorDB(self, db):
        self.i3monitordb = db

    def SetGridId(self, grid_id):
        self.grid_id = grid_id

    def SetJobsAtOnce(self, njobs):
        self.jobs_at_once = njobs

    def SetQueueSize(self, maxsize):
        self.queuesize = maxsize

    def SetSubmitHost(self, submithost):
        self.submithost = submithost

    def SetSubmitter(self, submitter):
        self.submitter = submitter

    def SetInstitution(self, institution):
        self.institution = institution

    def MakeDir(self, dir):
        """
        Create a temporary directory from which the current job(s) will be
        submitted (or in which they will run, depending on the queueing system).
        """
        if not os.path.exists(dir):
            os.makedirs(dir)

    def CleanDir(self, dir):
        """
        Remove the temporary directory from which the current job(s) were
        submitted.
        """
        if os.path.exists(dir):
            try:
                os.removedirs(dir)
            except OSError, e:
                logger.error(e)

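    # Note: os.removedirs() only removes *empty* directories (the leaf and
    # then any empty parents), so a submit directory that still holds job
    # files is left in place and the OSError is merely logged. Hypothetical
    # example:
    #
    #   >>> q.MakeDir('/tmp/iceprod/1234')    # illustrative path
    #   >>> q.CleanDir('/tmp/iceprod/1234')   # removed only if empty
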
    def EnQueue(self, steering):
        """
        Insert IceTrayConfig configuration into the database
        @param steering:
        """

        # set up a parser with default values
        opts = {'dataset':0,'nproc':1,'procnum':0}
        parser = ExpParser(opts,steering)

        # Get list of grids to run on
        if steering.GetExtra('Grid'):
            grid = steering.GetExtra('Grid')
            grids = map(string.strip,grid.split("|"))
        elif steering.GetParameter('BATCHSYS'):
            grid = steering.GetParameter('BATCHSYS')
            grids = map(string.strip,grid.GetValue().split("|"))
        elif steering.GetParameter('Grid'):
            grid = steering.GetParameter('Grid')
            grids = map(string.strip,grid.GetValue().split("|"))
        else:
            logger.warn('No grid specified. Local grid assumed.')
            grids = [self.grid] # use default

        offlinefilt = steering.GetOF()

        # Get number of jobs to submit
        if steering.GetExtra('Maxjobs'):
            maxjobs = int(steering.GetExtra('Maxjobs'))
        elif steering.GetParameter('MAXJOBS'):
            maxjobs = steering.GetParameter('MAXJOBS')
            maxjobs = int(parser.parse(maxjobs.GetValue()))
        elif steering.GetOF():
            logger.warn("processing OF filesystem. This could take a while!")
            maxjobs = offlinefilt.Search()
        else:
            logger.warn("Dataset size was not specified!")
            maxjobs = 0
        parser.opts['nproc'] = maxjobs # update parser's argopt dictionary

        self.i3configdb.connect()
        difplus = steering.GetExtra('Metadata')
        if difplus == None:
            raise IncompleteConfigException, 'Missing metadata'

        ticket = steering.GetExtra('Ticket')
        if ticket == None: ticket = 0

        isTemplate = (steering.GetDatasetType() == "TEMPLATE" or maxjobs == 0)

        dataset_id = self.i3configdb.upload_config(steering,ticket,isTemplate,maxjobs)
        difplus.GetPlus().SetSubCategory(steering.GetCategory())
        prio = 0
        try:
            if steering.GetParameter('PRIORITY'):
                prio = int(parser.parse(steering.GetParameter('PRIORITY').GetValue()))
            elif steering.GetParameter('priority'):
                prio = int(parser.parse(steering.GetParameter('priority').GetValue()))
        except Exception, e:
            logger.error('could not set priority: %s' % str(e))
        logger.info('dataset %d has priority %s' % (dataset_id,prio))

        if dataset_id:
            self.i3configdb.upload_metadata(dataset_id,difplus)
            #if steering.GetTaskDefinitions(): # DAG
            #    self.i3monitordb.InitializeGridStatsDAG(steering,dataset_id)
            #else: # Non DAG
            #    self.i3monitordb.InitializeGridStats(grids,dataset_id)
            self.i3monitordb.InitializeGridStats(grids,dataset_id)

            # update storage information
            self.i3configdb.SetStorageURL(dataset_id,steering)

            initial_state = 'WAITING'
            if steering.GetJobDependencies(): # This dataset's jobs depend on others
                initial_state = 'IDLE'
            if maxjobs > 0:
                self.i3monitordb.InitializeJobTable(maxjobs,dataset_id,prio,status=initial_state)
            if offlinefilt: # load directory dictionary for offline filtering
                self.i3configdb.load_filelist(offlinefilt.GetFileDict(),dataset_id)
        else:
            raise Exception, "Failed to upload dataset"

        cookie = I3Cookie()
        cookie.dataset_id = dataset_id

        if self.ssl:
            cookie.url = "https://%s:%s" % (self.config.get('server','server'),self.config.get('server','port'))
        else:
            cookie.url = "http://%s:%s" % (self.config.get('server','server'),self.config.get('server','port'))

        status = "dataset has been enqueued with id %d" % dataset_id
        return status,cookie

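    # Sketch of the grid-list parsing above (values are hypothetical): a
    # steering extra such as Grid="npx2 | glow | swegrid" is split on '|'
    # and stripped, so one dataset can be registered on several grids:
    #
    #   >>> map(string.strip, "npx2 | glow | swegrid".split("|"))
    #   ['npx2', 'glow', 'swegrid']
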
    def mkdataset(self, start=0, end=0, dataset=0):
        """
        Create a list of i3Job objects for proc numbers start..end-1
        """
        job_list = []
        for i in range(start,end):
            job = i3Job()
            job.SetProcNum(i)
            job.SetDatasetId(dataset)
            job_list.append(job)
        return {dataset:job_list}

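    # Example (hypothetical dataset id 42): non-production mode uses this to
    # build the same {dataset_id: [i3Job, ...]} mapping that production mode
    # obtains from the database:
    #
    #   >>> jobs = q.mkdataset(start=0, end=3, dataset=42)
    #   >>> [j.GetProcNum() for j in jobs[42]]
    #   [0, 1, 2]
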
    def Submit(self, steering=None, production=False, first=0, last=0, npid=None):
        """
        Submit and monitor jobs
        """
        starttime = time.time()
        cookie = I3Cookie()
        cookie.dataset_id = 0
        difplus = DIF_Plus()
        cluster = plugins.get_plugin(self.batchsys)()
        #from plugins import dag
        #cluster = dag.TaskQ( plugins.get_plugin(self.batchsys)() )
        nonproddb = dict()
        fifo = self.config.getboolean('queue','fifo')
        cluster_size = self.config.getint('queue','jobclustering')

        if production:
            self.i3monitordb.connect()
            #job_list = self.i3monitordb.QueueJobs(self.maxjobs,self.grid_id,self.jobs_at_once,fifo)
            job_list = cluster.QueueJobs(self.i3monitordb,self.maxjobs,self.grid_id,self.jobs_at_once,fifo)
            #job_list += cluster.QueueTasks(self.i3monitordb,self.maxjobs,self.grid_id,self.jobs_at_once,fifo)
            job_name = self.config.get('queue','jobprefix')
        else:
            job_name = "ipnp"
            try:
                import bsddb
                nonproddb_location = self.config.get('database','non-production')
                nonproddb = bsddb.btopen(nonproddb_location, 'c')
            except: pass

            if npid: # has a non-production id
                cookie = cPickle.loads(nonproddb[str(npid)])
            elif nonproddb.keys():
                npid = int(nonproddb.last()[0]) + 1
            else:
                npid = 1
            cookie.dataset_id = npid

            pmaxjobs = steering.GetParameter('MAXJOBS')
            maxjobs = int(pmaxjobs.GetValue())
            if last == 0: last = maxjobs
            job_list = self.mkdataset(start=first,end=last,dataset=npid) # single dataset
            steering.AddDIFPlus(self.MakeMetadata(difplus,0,simcat="Non-Production"))

        # Include proxy for globus authentication
        # --- This will probably change once we have played around with
        # --- gridFTP some more.
        globus_proxy = None
        globus_libs = None
        if self.config.has_section('globus') and self.config.getboolean('globus','useproxy'):
            globus_proxy = expandvars(self.config.get('globus','proxy'))
            if self.config.has_option('globus','libs'):
                globus_libs = self.config.get('globus','libs')
            globus_location = self.config.get('globus','job_globus_location')
            gpass = self.config.get('globus','passwd',raw=True)
            runtime = self.config.getint('queue','max_job_processing_time')
            runtime += self.config.getint('queue','max_job_idle_time')
            logger.info("checking proxyfile...")
            if not self.CheckProxy(globus_proxy,runtime):
                logger.info("proxy %s will likely expire before job completes" % globus_proxy)
                logger.info("generating new proxyfile.")
                self.MakeProxy(globus_proxy,runtime*2,gpass)

        status = ""
        for dataset_id in job_list.keys():
            logger.info('queueing %d jobs for dataset %d' % (len(job_list[dataset_id]),dataset_id))
            if production:
                steering = self.i3configdb.download_config(dataset_id)
                difplus = self.i3configdb.download_metadata(dataset_id)
                steering.AddDIFPlus(difplus)

            # Determine the requested batch system and build submission request
            # Call constructor for plugin
            cluster = plugins.get_plugin(self.batchsys)()
            #cluster = dag.TaskQ( plugins.get_plugin(self.batchsys)() )
            if not cluster:
                logger.error("%s not yet implemented" % self.batchsys)
                raise UnsupportedSystemException, self.batchsys

            cluster.SetMonitorDB(self.i3monitordb)

            # Batch system options:
            # first set any batch system options from the server configuration
            if self.config.has_section('batchopts'):
                for o in self.config.options('batchopts'):
                    cluster.AddParam(o,self.config.get('batchopts',o))
            cluster.SetSteering(steering)

            lib_url = self.config.get('path','lib_url')
            steering.AddSysOpt(SysOpt("lib_url",lib_url))

            dtdfile = os.path.join(self.GetRootDir(),"shared","iceprod.v2.dtd")
            xmluri = 'file:iceprod.v2.dtd'
            try:
                xmluri = self.config.get('path','uri')
            except: pass

            cluster.AddArgOption("grid",self.grid_id)
            cluster.AddArgOption("lib",iceprod.zipfile()+'.zip')
            target = self.config.get('path','target_url',raw=True)
            steering.AddSysOpt(SysOpt("targeturl",target))
            #cluster.AddArgOption("target",target)

            # Job will automatically move data and set status
            stageout = self.config.getboolean('queue','jobstageout')
            steering.AddSysOpt(SysOpt("stageout",str(stageout)))
            cluster.AddArgOption("stageout",int(stageout))

            # Is python zipsafe (can it read zipped modules)?
            zipsafe = self.config.getboolean('queue','zipsafe')
            steering.AddSysOpt(SysOpt("zipsafe",str(zipsafe)))
            if not zipsafe:
                cluster.AddArgOption("zipsafe",0)

            # need a place to store temporary output for multi-job execution
            if self.config.has_option('path','dagtemp'):
                dagtemp = self.config.get('path','dagtemp',raw=True)
                if dagtemp:
                    steering.AddSysOpt(SysOpt("dagtemp",dagtemp))
                    #cluster.AddArgOption("dagtemp",dagtemp)

            # data will be copied back to submit node
            nocopy = False
            if self.config.has_option('queue','nocopy') and self.config.getboolean('queue','nocopy'):
                nocopy = True
            cluster.AddArgOption("nocopy",int(nocopy))
            steering.AddSysOpt(SysOpt("nocopy",str(nocopy)))

            if self.config.getboolean('system','validatexml'):
                cluster.AddArgOption("validate",1)
            else:
                cluster.AddArgOption("validate",0)

            # tar output from job?
            if self.config.has_option('path','tar_output') and self.config.getboolean('path','tar_output'):
                cluster.AddArgOption("mktar")

            # cache directory on nodes
            if self.config.has_option('path','cache'):
                #cluster.AddArgOption("cache",self.config.get('path','cache'))
                steering.AddSysOpt(SysOpt("cache",self.config.get('path','cache')))

            # periodically update server of job status
            cluster.AddArgOption("ping",self.config.getint('queue','ping_interval'))

            # If specified, jobs will force a platform instead of auto-detecting
            # it. This is useful for homogeneous clusters where nodes don't
            # have gcc.
            if self.config.has_option('environment','platform'):
                platform = self.config.get('environment','platform')
                if platform and platform != 'system':
                    cluster.AddArgOption("platform",platform)
                    steering.AddSysOpt(SysOpt("platform",platform))

            # Add system information
            steering.AddSysOpt(SysOpt("gridname",self.grid))
            steering.AddSysOpt(SysOpt("gridid",self.grid_id))

            # Tell job where to find the python environment
            pyhome = self.config.get('environment','pythonhome')
            cluster.AddEnv("PYROOT",pyhome)
            if self.config.has_option('environment','pythonpath'):
                pythonpath = self.config.get('environment','pythonpath')
                cluster.AddEnv("PYTHONPATH",pythonpath)

            # Tell job where to find the ROOT environment
            if self.config.has_option('environment','rootsys'):
                rootsys = self.config.get('environment','rootsys')
                cluster.AddEnv("ROOTSYS",rootsys)
                steering.AddSysOpt(SysOpt("rootsys",rootsys))

            # Tell job where to find the photon tables
            if self.config.has_option('environment','photontablesdir'):
                photontablesdir = self.config.get('environment','photontablesdir')
                photontables = os.path.join(photontablesdir,'tables')
                cluster.AddEnv("PHOTONTABLES",photontables)
                cluster.AddEnv("PHOTON_TABLES_DIR",photontablesdir)

            # Add any server-configured parameters that might be used in
            # the configuration
            for o in self.config.options('system'):
                steering.AddSysOpt(SysOpt(o,self.config.get('system',o)))

            # Include the iceprod libs as a zip archive
            corelibs = os.path.join(self.GetRootDir(),"shared",iceprod.zipfile())
            if not os.path.exists(corelibs+'.zip'):
                libdir = os.path.join(self.GetRootDir(),"lib")
                iceprod.mktar(libdir,'iceprod/__init__.py',corelibs)
                iceprod.mktar(libdir,'iceprod/core',corelibs,'a')
                iceprod.mktar(libdir,'iceprod/modules',corelibs,'a')

            # Tell job where to find its java environment
            if self.config.has_option('environment','java'):
                javahome = self.config.get('environment','java')
                cluster.AddEnv("JAVA_HOME",javahome)
                steering.AddSysOpt(SysOpt("javahome",javahome))

            # Tell job where to write temporary files
            if self.config.has_option('environment','scratch'):
                scratchdir = self.config.get('environment','scratch')
            else:
                scratchdir = '$PWD'
            cluster.AddEnv("I3SCRATCH",scratchdir)
            steering.AddSysOpt(SysOpt("scratch",scratchdir))

            # Only set the X509_USER_PROXY variable if this is not done
            # automatically by the system
            steering.AddSysOpt(SysOpt("proxy_delegate", self.config.get('globus','delegate')))
            if globus_proxy:
                if self.config.getboolean('globus','delegate'):
                    cluster.AddEnv("X509_USER_PROXY",os.path.basename(globus_proxy))
                    steering.AddSysOpt(SysOpt("globus_proxy",os.path.basename(globus_proxy)))
                else:
                    steering.AddSysOpt(SysOpt("globus_proxy",'$X509_USER_PROXY'))
                steering.AddSysOpt(SysOpt("globus_location",globus_location))

            # modify all gridftp calls to be local if necessary
            if self.config.has_option('system','gridftp_local'):
                try:
                    for p in steering.GetParameters():
                        v = p.GetValue()
                        if 'gsiftp:' in v:
                            # modify parameter
                            pos = v.find('gsiftp')
                            pos2 = v.find('/data/sim')
                            if pos > 0 and pos2 > 0:
                                p.SetValue(v[:pos]+'file:'+v[pos2:])
                                steering.AddParameter(p)
                        elif 'http://x2100' in v:
                            # modify parameter
                            pos = v.find('http://x2100')
                            if 'downloads' in v:
                                pos2 = v.find('downloads')
                                if pos2 > 0:
                                    p.SetValue(v[:pos]+'file:/data/sim/sim-new/'+v[pos2:])
                                    steering.AddParameter(p)
                            elif 'svn' in v:
                                pos2 = v.find('svn')
                                if pos2 > 0:
                                    p.SetValue(v[:pos]+'file:/data/sim/sim-new/'+v[pos2:])
                                    steering.AddParameter(p)

                    for i,t in enumerate(steering.trays):
                        for m in t.iceprodpre:
                            for p in t.iceprodpre[m].GetParameters():
                                if p.GetType() != 'string':
                                    continue
                                v = p.GetValue()
                                setval = False
                                if isinstance(v,Value):
                                    v = v.GetValue()
                                    setval = True
                                logger.info('%r,%r,%r', p, p.GetType(), v)
                                if not isinstance(v,str):
                                    logger.warning('param not a string')
                                    continue
                                modify = False
                                if 'gsiftp:' in v:
                                    # modify parameter
                                    pos = v.find('gsiftp')
                                    pos2 = v.find('/data/sim')
                                    if pos >= 0 and pos2 >= 0:
                                        v = v[:pos]+'file:'+v[pos2:]
                                        modify = True
                                elif 'http://x2100' in v:
                                    # modify parameter
                                    pos = v.find('http://x2100')
                                    if 'downloads' in v:
                                        pos2 = v.find('downloads')
                                        if pos2 >= 0:
                                            v = v[:pos]+'file:/data/sim/sim-new/'+v[pos2:]
                                            modify = True
                                    elif 'svn' in v:
                                        pos2 = v.find('svn')
                                        if pos2 >= 0:
                                            v = v[:pos]+'file:/data/sim/sim-new/'+v[pos2:]
                                            modify = True
                                if modify:
                                    if setval:
                                        v = Value(v)
                                    p.SetValue(v)
                                    t.iceprodpre[m].AddParameter(p)
                except Exception,e:
                    logger.error('error processing gridftp_local: %r',e,exc_info=True)

            # write XML config file to submit directory
            if production:
                cluster.SetInitialdir(os.path.join(self.rundir, str(dataset_id)))
            else:
                cluster.SetInitialdir(os.path.join(self.rundir,'non-production',str(dataset_id)))
            inidir = os.path.join(cluster.GetInitialdir(),'iceprod_%f' % time.time())
            self.MakeDir(cluster.GetInitialdir())
            xcfg = os.path.join(cluster.GetInitialdir(),'config.xml')
            writer = IceTrayXMLWriter(steering,xmluri)
            if self.config.has_section('substitutions'):
                substitutions = {}
                for o in self.config.options('substitutions'):
                    substitutions[o] = self.config.get('substitutions',o)
                writer.AddSubstitutions(substitutions)
            writer.write_to_file(xcfg)
            steeringfile = xcfg

            # check timer for caching svn modules
            cachemod = starttime + 240 > time.time()

            # set wgetrc
            if self.config.has_option('path','wgetrc'):
                wgetrc = expandvars(self.config.get('path','wgetrc',raw=True))
            else:
                wgetrc = os.path.join(self.GetRootDir(),"etc","wgetrc")
                if not os.path.exists(wgetrc):
                    wgetrc = os.path.join(self.GetRootDir(),"shared","wgetrc")
            logger.info('wgetrc '+wgetrc)
            wgetrcoptions = []
            try:
                if os.path.isfile(wgetrc):
                    wgetrcfile = open(wgetrc,'r')
                    for line in wgetrcfile.readlines():
                        wgetrctmp = line.split('=')
                        if len(wgetrctmp) < 2:
                            continue
                        wgetrcfirst = wgetrctmp[0].strip().lower()
                        wgetrcsecond = wgetrctmp[1].strip()
                        if wgetrcfirst in ('http_user','httpuser'):
                            wgetrcoptions.append('http-user='+wgetrcsecond)
                        elif wgetrcfirst in ('http_passwd','httppasswd'):
                            wgetrcoptions.append('http-password='+wgetrcsecond)
                    logger.info('wgetrcoptions: '+str(wgetrcoptions))
                    wgetrcfile.close()
            except Exception, e:
                logger.info('wgetrc options error: %s',str(e))

            # download needed python modules
            if self.config.getboolean('queue','stage_pymods'):
                try:
                    pymods = self.GetIPModules(steering,cluster.GetInitialdir(),cachemod,wgetrcoptions)
                except Exception,e:
                    logger.error("failed to submit jobs: %s" % str(e))
                    if production:
                        for job in job_list[dataset_id]:
                            self.i3monitordb.jobabort(job.GetProcNum(),job.GetDatasetId(),3,
                                                      "failed to submit jobs "+str(e))
                    continue # move on to next batch

            for o in self.config.options('job-env'):
                cluster.AddEnv(o.upper(),self.config.get('job-env',o))

            # Add jobs
            logger.info("processing jobs for dataset %u" % dataset_id)
            for job in job_list[dataset_id]:
                # check that task dependencies are met
                self.check_task_dependencies(job,steering)

                job.name = job_name
                if production:
                    inidir = os.path.join(cluster.GetInitialdir(),'iceprod_%d.%f' % (job.GetProcNum(),time.time()))
                job.SetOutputURL(target)
                job.AddInputFile(dtdfile)

                # Add python modules as dependencies
                if self.config.getboolean('queue','stage_pymods'):
                    for p in pymods:
                        job.AddInputFile(p)

                if not production:
                    # Tell job where to download libs from
                    if steering.GetParameter('LIB_URL'):
                        lib_url = steering.GetParameter('LIB_URL').GetValue()

                    gcdbase = cluster.GetInitialdir()
                else:
                    # write GeoCalibDetStat .i3 file to submit directory
                    gcdbase = os.path.join(self.rundir, str(job.GetDatasetId()))
                gcdpath = os.path.join(gcdbase, "GeoCalibDetectorStatus_%06d.i3.gz" % job.GetDatasetId())
                i3db_params = ""
                opts = {
                    'dataset':job.GetDatasetId(),
                    'nproc':int(steering.GetParameter('MAXJOBS').GetValue()),
                    'procnum':job.GetProcNum()
                }
                parser = ExpParser(opts,steering)
                job.AddParser(parser)
                job.AddSteering(steering)
                for param in steering.GetParameters():
                    if param.GetName().startswith("I3Db::"):
                        i3dbparamname = param.GetName().replace("I3Db::","")
                        i3dbval = parser.parse(param.GetValue())
                        if param.GetName() == "I3Db::outfile":
                            i3dbval = os.path.join(gcdbase,i3dbval)
                            gcdpath = i3dbval
                        i3db_params += " --%s %s" % (i3dbparamname.lower(),i3dbval)

                # if I3Db is configured in the steering section but the gcd file
                # does not already exist, then create it.
                if i3db_params and not os.path.exists(gcdpath):
                    if not self.config.has_option('path','i3dbclient'):
                        raise Exception, "i3dbclient is not configured."
                    i3db_gcd_client = self.config.get('path','i3dbclient')
                    basedir = self.config.get('path','basedir')
                    i3db_gcd_client = os.path.join(basedir,'bin',i3db_gcd_client)
                    logger.info(i3db_gcd_client + " " + i3db_params)

                    if not os.path.exists(os.path.dirname(gcdpath)):
                        os.makedirs(os.path.dirname(gcdpath))
                    if os.system(i3db_gcd_client + " " + i3db_params):
                        raise Exception, "Unable to fetch GCD"
                if i3db_params:
                    job.AddInputFile(gcdpath)

                job.SetSubmitHost(self.submithost)
                job.SetInitialdir(inidir)
                self.MakeDir(job.GetInitialdir())
                job.SetRootDir(self.GetRootDir())

                # Set the main executable of job and add core libs
                job.SetExecutable(
                    os.path.join(self.GetRootDir(),"bin","i3exec.py"))
                job.AddInputFile(corelibs+'.zip')

                job.AddSteering(steering)
                if not production:
                    url = steering.GetParameter('URL')
                    if url: cluster.SetURL(url.GetValue())

                job.SetLogFile("%s.log" % job.Prefix())
                job.SetOutputFile("%s.out" % job.Prefix())
                job.SetErrorFile("%s.err" % job.Prefix())

                ### See if this is a job-initiated file transfer ####
                # Swegrid-style data movement
                if job.GetOutputURL() and job.GetOutputURL().startswith('auto:'):
                    job.AddArgOption("nocopy")

                if self.config.getint('queue','sockettimeout'):
                    job.AddArgOption("timeout", self.config.getint('queue','sockettimeout'))

                job.AddArgOption("procnum",job.GetProcNum())
                if steering.GetParameter('DEBUG'):
                    job.AddArgOption("debug",steering.GetParameter('DEBUG').GetValue())

                # Tell job where to download dependencies from
                job.AddArgOption("fetch",lib_url)

                # Include wgetrc file for http/ftp authentication
                if os.path.exists(wgetrc):
                    job.AddInputFile(wgetrc)
                    job.AddEnv("WGETRC",basename(wgetrc))

                if globus_proxy and self.config.getboolean('globus','delegate'):
                    proxy_copy = basename(expandvars(globus_proxy))
                    proxy_copy = os.path.join(job.GetInitialdir(),proxy_copy)
                    copy(expandvars(globus_proxy),proxy_copy)
                    job.AddInputFile(proxy_copy)
                if globus_libs and expandvars(globus_libs):
                    job.AddInputFile(expandvars(globus_libs))

                job.AddEnv("SUBMITDIR",job.GetInitialdir())

                # Give job soapmon URL
                if production:
                    moniurl = self.GetServerURL()
                    job.AddArgOption("url",moniurl)
                    job.AddArgOption("dataset",job.GetDatasetId())
                    steeringfile = os.path.join(job.GetInitialdir(),os.path.basename(xcfg))
                    os.system('cp %s %s' % (xcfg,steeringfile))
                    logger.info("writing %s in %s" % (os.path.basename(xcfg),job.GetInitialdir()))
                else:
                    pmaxjobs = steering.GetParameter('MAXJOBS')
                    maxjobs = int(pmaxjobs.GetValue())
                    job.AddArgOption("nproc",maxjobs)

                job.AddArgument(os.path.basename(steeringfile))
                job.AddInputFile(steeringfile)

                # Get batch-system-specific options from database
                for opt in steering.GetBatchOpts():
                    logger.debug("%s = %s" % (opt.GetName(),opt.GetValue()))
                    if opt.GetType().lower() in [self.batchsys.lower(),'*',self.batchsys.split('.')[0].lower()]:
                        job.AddBatchOpt(opt.GetName(),opt.GetValue())

                # Set config file
                file = os.path.join(job.GetInitialdir(),'%s.%s' % (job.Prefix(),cluster.Suffix()))
                job.SetConfigFile(file)

                # Add job to cluster submit list
                cluster.PushJob(job)

            # set production
            cluster.SetProduction(production)

            # Submit all jobs to cluster at once
            cluster.Submit(cookie)

        if self.ssl:
            cookie.url = "https://%s:%s" % (self.config.get('server','server'),self.config.get('server','port'))
        else:
            cookie.url = "http://%s:%s" % (self.config.get('server','server'),self.config.get('server','port'))

        if production:
            self.i3monitordb.disconnect()
            self.i3configdb.disconnect()
        else:
            try:
                nonproddb[str(npid)] = cPickle.dumps(cookie)
                nonproddb.sync()
            except Exception,e:
                logger.error(e)

            status = "dataset %s:%d,%d enqueued" % (npid,first,last)
        logger.info("Submit done")
        return status,cookie

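    # Hedged sketch of the two submission modes (q and steering are
    # assumptions for illustration):
    #
    #   >>> status, cookie = q.Submit(production=True)
    #   # production: job list, steering and metadata come from the databases
    #
    #   >>> status, cookie = q.Submit(steering=steering, first=0, last=10)
    #   # non-production: jobs 0..9 of a single ad-hoc dataset are created
    #   # and the returned cookie is tracked in the local bsddb file
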
    def check_task_dependencies(self, job, steering):
        """
        Check that dependencies are sane
        """
        task_defs = steering.GetTaskDefinitions()
        job_id = job.GetDatabaseId()
        for taskname,td in task_defs.items():
            td_id = td.GetId()
            trays = td.GetTrays()
            for idx,tray in trays.items():
                for iter in tray.GetIters():
                    if self.i3monitordb.task_is_finished(td_id, job_id, idx, iter):
                        for parent in td.GetParents(): # check dependencies
                            parent_td = steering.GetTaskDefinition(parent)
                            if not self.i3monitordb.task_is_finished(parent_td.GetId(), job_id):
                                tid = self.i3monitordb.get_task_id(td_id,job_id, idx, iter)
                                if tid:
                                    logger.info("Resetting task %s" % taskname)
                                    self.i3monitordb.task_update_status(tid,'WAITING',job.GetArgOpt("key"))
                                continue # no need to check more dependencies

    def GetIPModules(self, steering, destination, cache=False, wgetrcoptions=[]):
        """
        Fetch any files from svn which are needed by i3.IceTray
        """
        pymods = []

        opts = {
            'dataset': int(steering.GetParentId()),
            'nproc' : int(steering.GetParameter('MAXJOBS').GetValue()),
            'procnum': 0
        }
        parser = ExpParser(opts,steering)

        # get steering dependencies
        for dependency in steering.GetDependencies():
            try:
                if not isinstance(dependency,str):
                    dependency = dependency.GetPath()
                url = parser.parse(dependency)
                if (url.rsplit('.',1)[1]) not in ('py','c','cxx','cpp'):
                    logger.info('skipping %s',url)
                    continue # don't cache non-script files

                logger.info("fetching %s",url)
                if wget(url,destination,cache,wgetrcoptions):
                    raise Exception, "cannot retrieve file from %s" % url
                pymod = os.path.join(destination,os.path.basename(url))
                if pymod not in pymods:
                    pymods.append(pymod)
            except:
                pass

        # get module scripts
        for tray in steering.GetTrays():
            for mod in tray.GetIceProdPres() + tray.GetIceProdPosts():
                mclass = mod.GetClass()
                try:
                    if mclass in ("i3.IceTray","i3.Processing"):
                        if not mod.GetParameter('IPModuleURL'):
                            raise Exception, "cannot download icetray module without a URL!!!"
                        url = parser.parse(mod.GetParameter('IPModuleURL').GetValue().value)

                        if mod.GetParameter('IPModuleRevision'):
                            rev = parser.parse(mod.GetParameter('IPModuleRevision').GetValue().value)

                        logger.info("fetching %s" % url)
                        if wget(url,destination,cache,wgetrcoptions):
                            raise Exception, "cannot retrieve file from %s" % url

                        pymod = os.path.join(destination,os.path.basename(url))
                        if pymod not in pymods:
                            pymods.append(pymod)

                        if mod.GetParameter("IPModuleDependencies"):
                            for dep in mod.GetParameter("IPModuleDependencies").GetValue():
                                depurl = parser.parse(dep.value)
                                if not isurl(depurl):
                                    depurl = parser.parse(os.path.join(os.path.dirname(url),dep.value))
                                if wget(depurl,destination,cache,wgetrcoptions):
                                    raise Exception, "Failed to retrieve i3filter from '%s'" % depurl
                                pymod = os.path.join(destination,os.path.basename(depurl))
                                if pymod not in pymods:
                                    pymods.append(pymod)
                except Exception,e:
                    logger.error(e)
                    raise

        return pymods

    def GetServerURL(self):
        if self.config.has_option('monitoring','url'):
            return self.config.get('monitoring','url')

        monihost = self.config.get('monitoring','server')
        moniport = self.config.get('monitoring','port')

        # Check for network address translation (NAT),
        # i.e. the server address differs on the compute-node network
        if self.config.has_option('monitoring','natserver'):
            monihost = self.config.get('monitoring','natserver')
        if self.config.has_option('monitoring','natport'):
            moniport = self.config.get('monitoring','natport')

        if self.ssl:
            moniurl = "https://%s:%s" % (monihost,moniport)
        else:
            moniurl = "http://%s:%s" % (monihost,moniport)
        return moniurl

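    # Example (hypothetical [monitoring] settings): with server=iceprod.example.edu
    # and port=9080, an SSL-enabled server reports
    #
    #   >>> q.GetServerURL()
    #   'https://iceprod.example.edu:9080'
    #
    # natserver/natport, when present, override these for compute nodes that
    # reach the server through network address translation.
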
    def GetPlatform(self):
        """
        @return: a string representing the platform of the current system
        in the IceTray format.
        """
        osname = platform.system().replace('Darwin','MacOSX')
        arch = platform.machine()
        arch = arch.replace('Power Macintosh','ppc')
        arch = arch.replace('i686','i386')
        return "%s-%s" % (osname,arch)

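    # Example: on a 64-bit Linux host,
    #
    #   >>> q.GetPlatform()
    #   'Linux-x86_64'
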
    def SetRootDir(self, rootdir):
        """
        Set the root directory where the iceprod software is stored
        @param rootdir: directory path
        """
        if rootdir:
            self.rootdir = rootdir

    def GetRootDir(self):
        """
        Get the root directory where the iceprod software is stored
        """
        return self.rootdir

    def AddDefaultArchive(self, archive):
        """
        Append to the list of default archives that should be included with
        each submission.
        @param archive: archive or file needed by job (to be shipped with job)
        """
        if archive:
            self.default_archives.append(archive)


    def MakeMetadata(self, difplus, run_id, simcat=""):
        """
        Fill in the DIF Plus catalog metadata for this dataset.
        @param run_id: primary key for this job on the configuration database
        """
        dif = difplus.GetDIF()
        dif.SetEntryTitle('IceProd NonProduction Dataset')
        dif.SetSummary('IceProd NonProduction Dataset')
        dif.SetParameters("SPACE SCIENCE > Astrophysics > Neutrinos")
        dif.GetDataCenter().AddPersonnel(metadata.Personnel())

        plus = difplus.GetPlus()
        plus.SetSimDBKey(run_id)
        plus.SetSteeringFile('dataset_%06d_config.xml' % int(run_id))
        plus.SetStartDatetime(time.strftime("%Y-%m-%dT%H:%M:%S"))
        plus.SetEndDatetime(time.strftime("%Y-%m-%dT%H:%M:%S"))

        # We are no longer using the simdb_key as a subcategory
        plus.SetCategory('generated')
        plus.SetSubCategory(simcat)
        return difplus

    def CheckProxy(self, proxy, runtime):
        """
        Check validity of certificate proxy for gridFTP
        @param proxy: path to proxy certificate
        @param runtime: minimum validity time left on proxy (in minutes)
        """
        globusdir = self.config.get('globus','directory')
        globuslib = os.path.join(globusdir,'lib')
        proxy_info = self.config.get('globus','proxy-info')
        proxy_info_opts = ''
        if self.config.has_option('globus','proxy-info-opts'):
            proxy_info_opts = self.config.get('globus','proxy-info-opts')
        ldlist = os.getenv('LD_LIBRARY_PATH').split(':')
        if globuslib not in ldlist:
            ldlist.append(globuslib)
        os.putenv('LD_LIBRARY_PATH',expandvars(":".join(ldlist)))
        os.putenv('GLOBUS_LOCATION',expandvars(globusdir))
        if os.path.exists(proxy):
            executable = os.path.join(expandvars(globusdir),'bin',proxy_info)
            if not os.path.exists(executable):
                raise Exception, "executable %s does not exist" % executable
            cmd = '%s %s -file %s -timeleft' % (executable,proxy_info_opts,proxy)
            logger.debug(cmd)
            output = os.popen(cmd)

            # grid-proxy-info returns seconds
            timeleft = int(output.read().strip())/60
            logger.info("time left on proxy is %d min." % timeleft)
            try:
                output.close()
            except: pass
            return timeleft >= runtime
        return False

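    # Sketch of the command CheckProxy builds, assuming the configured
    # proxy-info tool is Globus's grid-proxy-info and a hypothetical proxy
    # path:
    #
    #   $GLOBUS_LOCATION/bin/grid-proxy-info -file /tmp/x509up_u1000 -timeleft
    #
    # The tool prints the remaining lifetime in seconds, hence the division
    # by 60 before comparing against runtime (minutes).
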
    def MakeProxy(self, proxy, runtime, passwd):
        """
        Generate a new certificate proxy for gridFTP.
        @param proxy: path to proxy certificate
        @param runtime: requested proxy lifetime (in minutes)
        @param passwd: grid certificate passphrase (fed to -pwstdin)
        """
        globusdir = self.config.get('globus','directory')
        proxy_init = self.config.get('globus','proxy-init')
        proxy_init_opts = ''
        if self.config.has_option('globus','proxy-init-opts'):
            proxy_init_opts = self.config.get('globus','proxy-init-opts')
        if self.config.has_option('globus','certdir'):
            proxy_init_opts += ' -certdir %s ' % self.config.get('globus','certdir')
        globuslib = os.path.join(globusdir,'lib')
        ldlist = os.getenv('LD_LIBRARY_PATH').split(':')
        if globuslib not in ldlist:
            ldlist.append(globuslib)
        os.putenv('LD_LIBRARY_PATH',expandvars(":".join(ldlist)))
        os.putenv('GLOBUS_LOCATION',expandvars(globusdir))
        executable = os.path.join(expandvars(globusdir),'bin',proxy_init)
        cmd = '%s %s -pwstdin -valid %u:%u -out %s' % (executable,proxy_init_opts,runtime/60,runtime%60,proxy)
        logger.debug(cmd)
        try:
            from subprocess import Popen,PIPE
        except ImportError:
            from popen2 import Popen3
            p4 = Popen3(cmd,True)

            p4.tochild.write(passwd)
            p4.tochild.close()

            o = p4.fromchild.read()
            logger.info(o)
            p4.fromchild.close()

            e = p4.childerr.read()
            logger.debug(e)
            p4.childerr.close()

            while p4.poll() < 0: time.sleep(1)
            if p4.poll(): # return status
                logger.error(e)
                raise Exception, "unable to generate proxy file %s. %s returned %d" % (proxy,executable,p4.poll())
            del p4

        else:
            p4 = Popen(cmd,stdin=PIPE,stdout=PIPE,stderr=PIPE,close_fds=True, shell=True)
            o,e = p4.communicate(passwd)
            logger.info(o)
            logger.debug(e)
            if p4.returncode: # return status
                logger.error(e)
                raise Exception, "unable to generate proxy file %s. %s returned %d" % (proxy,executable,p4.returncode)
            del p4