
Source Code for Module iceprod.server.plugins.pbs

#!/bin/env python
#

"""
 A basic submit-file generator for submitting and monitoring jobs on PBS.
 This module implements only a small subset of PBS features.
 Its interface, however, is modeled on that of Condor
 (http://www.cs.wisc.edu/condor).
 Inherits from i3Queue.

 copyright  (c) 2005 the icecube collaboration

 @version: $Revision: $
 @date: $Date: $
 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
 @todo: implement more functionality of pbs.
"""

import os
import re
import sys
import math
import random
import dircache
import time
import string
import logging
import os.path
import getpass
import commands
from os import popen2
from iceprod.core import metadata
from iceprod.core.lex import ExpParser
from iceprod.server.grid import iGrid
from iceprod.server.job import i3Job, i3Task
from os.path import expandvars

localdb = None


pbs_status = {'Q': 'QUEUED', 'R': 'PROCESSING', 'C': 'FINISHED'}
def cstatus(istatus):
    """Map a one-letter PBS job state to an iceprod status string."""
    if pbs_status.has_key(istatus):
        return pbs_status[istatus]
    return 'FINISHED'
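# A quick illustration of the mapping above (values illustrative):
#
#   cstatus('Q')  ->  'QUEUED'
#   cstatus('R')  ->  'PROCESSING'
#   cstatus('C')  ->  'FINISHED'
#   cstatus('H')  ->  'FINISHED'   (any unmapped state, e.g. held,
#                                   is treated as finished)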
class Pbs(iGrid):
    """
    This class represents a job or cluster on a PBS system.
    """
    def __init__(self):
        iGrid.__init__(self)
        self.proc = 0
        self.sleeptime = 6
        self.enqueue_cmd = "qsub"
        self.checkqueue_cmd = "qstat -u %s " % getpass.getuser()
        self.queue_rm_cmd = "qdel"
        self.suffix = "pbs"
        self.logger = logging.getLogger('PBS')
    def _choose_queue(self, queue_list):
        from random import choice
        weighted_qlist = []
        for q in queue_list:
            if len(q.split()) > 1:
                queue, weight = q.split()
            else:
                queue, weight = q, 1
            try:
                weight = int(weight)
            except Exception, e:
                self.logger.error("Exception: " + str(e))
                self.logger.warn("Unable to get queue weight for: " + q)
                weight = 1
            self.logger.debug("%s:%u " % (queue, weight))
            weighted_qlist.extend([queue] * weight)
        return choice(weighted_qlist)
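    # Example (queue names hypothetical): a parameter list of
    # ["long 3", "short"] expands to ['long', 'long', 'long', 'short'],
    # so 'long' is chosen with probability 3/4:
    #
    #   q._choose_queue(["long 3", "short"])  ->  'long' on ~3 of 4 calls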
    def WriteConfig(self, job, config_file):
        """
        Write PBS submit file to a file.
        @param job: i3Job object
        @param config_file: path to file where the submit file will be written
        """
        self.logger.debug('WriteConfig')

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        submitfile = open("%s" % config_file, 'w')
        outfile = os.path.join(job.GetInitialdir(), job.GetOutputFile())
        errfile = os.path.join(job.GetInitialdir(), job.GetErrorFile())

        job.Write(submitfile, "#!/bin/sh")
        job.Write(submitfile, "#PBS -o %s" % outfile)
        job.Write(submitfile, "#PBS -e %s" % errfile)

        # Add general batch options
        queue_list = []
        for key in self.GetParamKeys():
            if not key.startswith("queue"):
                job.Write(submitfile, "#PBS %s" % (self.GetParam(key)))
            else:
                queue_list.append(self.GetParam(key)[2:])

        if queue_list:
            chosen_queue = self._choose_queue(queue_list)
            job.Write(submitfile, "#PBS -q %s" % chosen_queue)

        # Add job specific batch options
        for key, opt in job.GetBatchOpts().items():
            job.Write(submitfile, "#PBS %s " % opt)

        job.Write(submitfile, "export PBS_O_WORKDIR=%s" % job.GetInitialdir())
        job.Write(submitfile, "ARCH=`uname -m | sed -e 's/Power Macintosh/ppc/ ; s/i686/i386/'`")
        job.Write(submitfile, "PVER=`uname -r | awk '{split($1,a,\".\"); print a[2]}'`")
        job.Write(submitfile, "if [ $ARCH == 'i386' ]; then", parse=False)
        job.Write(submitfile, "   if [ $PVER == '4' ]; then", parse=False)
        job.Write(submitfile, "      PLATFORM=Linux-i386")
        job.Write(submitfile, "   elif [ $PVER == '6' ]; then", parse=False)
        job.Write(submitfile, "      PLATFORM=Linux-libstdc++6-i386")
        job.Write(submitfile, "   fi")
        job.Write(submitfile, "elif [ $ARCH == 'x86_64' ]; then", parse=False)
        job.Write(submitfile, "   if [ $PVER == '4' ]; then", parse=False)
        job.Write(submitfile, "      PLATFORM=Linux-x86_64")
        job.Write(submitfile, "   elif [ $PVER == '6' ]; then", parse=False)
        job.Write(submitfile, "      PLATFORM=Linux-libstdc++6-x86_64")
        job.Write(submitfile, "   fi")
        job.Write(submitfile, "fi")

        # Export environment variable,value pairs
        for var in self.env.keys():
            job.Write(submitfile, "export %s=%s" % (var, self.env[var]), parse=False)
        for var in job.env.keys():
            job.Write(submitfile, "export %s=%s" % (var, job.env[var]), parse=False)
        job.Write(submitfile, "export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH", parse=False)
        job.Write(submitfile, "unset I3SIMPRODPATH")

        job.Write(submitfile, "RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(), time.time()), parse=False)
        job.Write(submitfile, "mkdir -p $RUNDIR")
        job.Write(submitfile, "echo \"running on $HOSTNAME:$RUNDIR\"", parse=False)

        for file in job.GetInputFiles() + [job.GetExecutable()]:
            job.Write(submitfile, "cp %s $RUNDIR" % file, parse=False)

        job.Write(submitfile, "cd $RUNDIR", parse=False)

        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = job.GetMainScript() + " " + " ".join(argopts)
        executable = os.path.basename(job.GetExecutable())
        job.Write(submitfile,
                  "$PYROOT/bin/python %s %s 1>%s.tmp 2>%s.tmp" % (executable, argstr, outfile, errfile),
                  parse=False)
        job.Write(submitfile, "cat %s.tmp>&1" % outfile, parse=False)
        job.Write(submitfile, "cat %s.tmp>&2" % errfile, parse=False)

        job.Write(submitfile, "rm -f wgetrc")

        if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
            job.Write(submitfile, "for file in *; do")
            job.Write(submitfile, "   if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
            job.Write(submitfile, "      mv $file %s" % job.GetInitialdir())
            job.Write(submitfile, "   fi; done")

        job.Write(submitfile, "#clean directory")
        job.Write(submitfile, "cd /tmp")
        job.Write(submitfile, "rm -rf $RUNDIR ", parse=False)
        job.Write(submitfile, "exit 0")

        submitfile.close()
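    # For orientation, a script produced by WriteConfig looks roughly like
    # the following (paths, queue, and process number are illustrative):
    #
    #   #!/bin/sh
    #   #PBS -o /data/sim/dataset/iceprod.0.out
    #   #PBS -e /data/sim/dataset/iceprod.0.err
    #   #PBS -q long
    #   export PBS_O_WORKDIR=/data/sim/dataset
    #   ...platform detection and environment exports...
    #   RUNDIR=$I3SCRATCH/i3sim_0_1234567890.0
    #   mkdir -p $RUNDIR
    #   cp /data/sim/dataset/wrapper.py $RUNDIR
    #   cd $RUNDIR
    #   $PYROOT/bin/python wrapper.py main.py ... 1>out.tmp 2>err.tmp
    #   ...stage-out of produced files...
    #   rm -rf $RUNDIR
    #   exit 0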
    def get_id(self, submit_status):
        """
        Parse string returned by qsub on submission to extract the
        id of the job cluster.

        @param submit_status: string returned by qsub
        """
        matches = re.findall(r'[0-9]+\.[0-9a-zA-Z_\-]*', submit_status)
        self.logger.debug(submit_status)
        if matches:
            cluster_info = matches[0].split('.')
            job_id = cluster_info[0]

            self.job_ids.append(job_id)
            return job_id
        else:
            self.logger.warn('could not parse job id from "%s"' % submit_status)
            return -1
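    # qsub prints an id of the form <sequence>.<server>; the pattern keeps
    # only the numeric sequence part (server name below is hypothetical):
    #
    #   q.get_id("12345.pbs01.example.edu")  ->  '12345'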
    def CheckJobStatus(self, jobs):
        """
        Query status of job(s) on the PBS queue.
        """
        if isinstance(jobs, list):
            job_list = jobs
        else:
            job_list = [jobs]
        for job in job_list:
            job.SetStatus('?')
            if job.GetJobId() < 0: continue
            job_id = job.GetJobId()
            cmd = "qstat -f %s " % job_id
            status, output = commands.getstatusoutput(cmd)
            # 153 is qstat's 'Unknown Job Id' exit code (the job has already
            # left the queue); 39168 is the same code as a raw wait status
            # (153 << 8).
            if status in [153, 39168]:
                job.SetStatus('FINISHED')
            elif status:
                job.SetStatus('?')
                self.logger.error("%s: %s: %s" % (cmd, status, output))
            else:
                for line in output.split('\n'):
                    line = line.strip()
                    if line.startswith('job_state'):
                        status = cstatus(line.split('=')[1].strip())
                        job.SetStatus(status)
                    if line.startswith('exec_host'):
                        host = line.split('=')[1].strip()
                        job.SetHost(host)
        return 1
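    # The relevant portion of `qstat -f` output looks like this
    # (id and host are illustrative):
    #
    #   Job Id: 12345.pbs01.example.edu
    #       job_state = R
    #       exec_host = node042/3
    #
    # which would be reported as status PROCESSING on host node042/3.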
    def CheckQ(self, db=None):
        """
        Query status of cluster or job on the PBS queue.
        """
        cmd = self.checkqueue_cmd
        for id in self.job_ids:
            cmd += " %s" % id
        status, output = commands.getstatusoutput(cmd)
        return output
    def CleanQ(self, jobs=None):
        """
        Remove any iceprod jobs found on the queue that are not in the
        given job list.
        """
        if not jobs: return 0

        if isinstance(jobs, list):
            job_list = jobs
        else:
            job_list = [jobs]

        job_dict = dict()
        for job in job_list:
            for job_id in str(job.GetJobId()).split(":"):
                job_dict[job_id] = job

        cmd = self.checkqueue_cmd + " | grep iceprod"
        status, output = commands.getstatusoutput(cmd)
        if not status:
            for line in output.split('\n'):
                if line.strip() == '':
                    continue
                try:
                    tok = line.split()
                    jobId = tok[0].split(".")[0]
                    user = tok[1]
                    queue = tok[2]
                    executable = tok[3]
                    sid = tok[4]
                    nds = tok[5]
                    tsk = tok[6]
                    memory = tok[7]
                    runtime = tok[8]
                    jobStatus = tok[9]
                    if executable.startswith("iceprod."):
                        if not job_dict.has_key(jobId):
                            self.logger.warn("removing job %s with status %s. Reason: job not found in list" % \
                                             (jobId, cstatus(jobStatus)))
                            self.logger.debug("job list [%s]" % str(job_dict.keys()))
                            os.system("%s %s" % (self.queue_rm_cmd, jobId))
                except Exception, e:
                    self.logger.error("%s:%s" % (e, line))
                    sys.excepthook(sys.exc_type, sys.exc_value, sys.exc_traceback)
        else:
            self.logger.error(cmd + ": " + output)
        return status
    def QRemove(self, job):
        """
        Remove cluster or job from queue.
        """
        if not job: return "Unknown jobid. Cannot remove job."
        if isinstance(job, i3Job):
            cmd = "%s %s" % (self.queue_rm_cmd, job.GetJobId())
        else:
            cmd = "%s %s" % (self.queue_rm_cmd, job)
        self.logger.info(cmd)

        handle = os.popen(cmd, 'r')
        status = string.join(handle.readlines())
        self.logger.info(status)
        handle.close()
        return status
class PBSDag(Pbs):
    """
    This class represents a job that executes in multiple parts using a DAG.
    """
    def __init__(self):
        Pbs.__init__(self)
        self.enqueue_cmd = "/bin/bash"
        self.localdb = None
        self.logger = logging.getLogger('PbsDAG')
        try:
            import sqlite3
            self.localdb = sqlite3.connect(expandvars("$I3PROD/shared/localqueue.db"))
        except:
            self.logger.error("sqlite3 missing. will try sqlite.")
            try:
                import sqlite
                self.localdb = sqlite.connect(expandvars("$I3PROD/shared/localqueue.db"))
            except:
                self.logger.error("sqlite missing. won't try to maintain queue sanity")
        if self.localdb:
            cursor = self.localdb.cursor()
            try:
                cursor.execute('CREATE TABLE IF NOT EXISTS queue (parent_id VARCHAR(80), child_id VARCHAR(80), status VARCHAR(80))')
            except Exception, e:
                self.logger.error(e)
            else:
                self.localdb.commit()
            cursor.close()
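    # The `queue` table ties the first job of a DAG (parent_id) to every job
    # submitted for that DAG (child_id), so that get_id, CheckJobStatus, and
    # QRemove below can recover all PBS ids belonging to one submission.
    # A hypothetical dump of the table:
    #
    #   parent_id  child_id  status
    #   12345      12345     QUEUED
    #   12345      12346     QUEUED
    #   12345      12347     QUEUED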
    def __del__(self):
        if self.localdb:
            self.localdb.close()
    def GetLogs(self, job):
        """
        Read output logs from job.
        """
        status, output = commands.getstatusoutput("tail -n 100 %s" % job.GetLogFile().replace(".log", "*.log"))
        job.log = output
        status, output = commands.getstatusoutput("tail -n 100 %s" % job.GetLogFile().replace(".err", "*.err"))
        job.err = output
        status, output = commands.getstatusoutput("tail -n 100 %s" % job.GetLogFile().replace(".out", "*.out"))
        job.out = output
    def WriteConfig(self, job, config_file):
        """
        Write PBS DAG driver script to a file.
        @param job: i3Job object
        @param config_file: path to file where the script will be written
        """
        if not job.GetExecutable():
            raise Exception, "no executable configured"

        from iceprod.core.dataclasses import IceTrayConfig

        db = self.GetMonitorDB()

        steering = job.GetSteering()
        task_defs = steering.GetTaskDefinitions()
        self.logger.debug("Task definitions: %s" % task_defs)
        if not len(task_defs):
            # not a DAG
            self.logger.debug("No tasks specified in config file; doing regular submit")
            self.enqueue_cmd = "qsub"
            return Pbs.WriteConfig(self, job, config_file)

        dagfile = open(config_file, 'w')
        job.Write(dagfile, "#!/bin/bash")
        job_id = job.GetDatabaseId()
        job.dag = dict()  # create dependency tree

        file_catalog = {}
        for taskname, td in task_defs.items():
            args = self.GetArguments(job, td, output="dict")
            file_catalog[taskname] = self.GetFiles(job, td, args)
            job.dag[taskname] = None

        for taskname, td in task_defs.items():
            filename = config_file.replace('.pbs', ".%s.pbs" % taskname)
            td_id = td.GetId()
            self.logger.debug("Got task %s (ID=%u) with filename %s" % (taskname, td_id, filename))
            tmp_parents = self.EnumerateParentNodes(steering, td)
            parents = []
            # only add a parent if it is not completed
            for parent in tmp_parents:
                parent_td = steering.GetTaskDefinition(parent)
                if not db.task_is_finished(parent_td.GetId(), job.GetDatabaseId()):
                    self.logger.error("task id %s, '%s' job id %s" % (parent_td.GetId(), parent_td.GetName(), job.GetDatabaseId()))
                    parents.append(parent)
            job.dag[taskname] = parents
            if td.ParallelExecutionEnabled():
                trays = td.GetTrays()
                for idx, tray in trays.items():
                    for iter in tray.GetIters():
                        if not iter == -1:
                            nodename = "%s_%u" % (taskname, iter)
                        else:
                            nodename = "%s_ext" % taskname
                        filename = config_file.replace('.pbs', ".%s.pbs" % nodename)
                        done = db.task_is_finished(td_id, job_id, idx, iter)
                        args = self.GetArguments(job, td, idx, iter, output="dict")

                        input, output, notes = self.GetFiles(job, td, args, idx, iter, file_catalog)
                        self.WriteFileManifest(job, filename, input, output, notes)
                        self.WriteSubmitFile(job, filename, td, idx, iter)
                        self.WriteDAGNode(job, dagfile, nodename, filename, parents, done)
            else:
                done = db.task_is_finished(td_id, job_id)
                if taskname == 'trashcan':
                    done = False
                args = self.GetArguments(job, td, output="dict")

                input, output, notes = self.GetFiles(job, td, args, catalog=file_catalog)
                self.WriteFileManifest(job, filename, input, output, notes)
                self.WriteSubmitFile(job, filename, td)
                self.WriteDAGNode(job, dagfile, taskname, filename, parents, done)
            db.commit()

        self.FinishDAGNode(job, dagfile)
    def IsUrl(self, path):
        return bool(re.match("[^/:]+://?.*$", path))
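    # Examples (illustrative): anything with a scheme prefix counts as a URL.
    #
    #   IsUrl("file:///tmp")         ->  True
    #   IsUrl("gsiftp://host/data")  ->  True
    #   IsUrl("/tmp/data")           ->  False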
    def GetFiles(self, job, td, args, idx=False, iter=False, catalog=False):
        steering = job.GetSteering()
        parser = ExpParser(args, steering)
        if idx is not False:
            td_trays = {idx: td.GetTray(idx)}
        else:
            td_trays = td.GetTrays()

        if steering.GetSysOpt("dagtemp"):
            tmp_dir = parser.parse(steering.GetSysOpt("dagtemp").GetValue())
        elif args.has_key('dagtemp'):
            tmp_dir = parser.parse(args['dagtemp'])
        else:
            tmp_dir = 'file:///tmp'  # TODO: give this a sane default or make it an error

        if args.has_key('fetch'):
            global_dir = args['fetch']
        else:
            global_dir = 'file:///tmp'

        td_input = {}
        td_output = {}
        notes = {}

        if td.IsCleanup():
            # cleanup job has no input/output files required
            return (td_input, td_output, notes)

        for idx, td_tray in td_trays.iteritems():
            args['tray'] = idx

            self.logger.info("GetTray(%s)" % idx)
            icetray = steering.GetTray(idx)

            input_files = icetray.GetInputFiles()
            parsed_input = []

            output_files = icetray.GetOutputFiles()
            parsed_output = []

            if iter is not False:
                iters = [iter]
            else:
                iters = td_tray.GetIters()
            for iter in iters:
                args['iter'] = iter
                for d in steering.GetDependencies():
                    d_file = parser.parse(d)
                    if not td_input.has_key(d_file):
                        location = d_file
                        if not self.IsUrl(location):
                            location = os.path.join(global_dir, location)
                        td_input[os.path.basename(d_file)] = [location]
                for i_file in input_files:
                    name = i_file.GetName()
                    name = parser.parse(name)
                    if not td_input.has_key(name) \
                       and not td_output.has_key(name):
                        if catalog:
                            node = self.FindFile(steering, td, catalog, name)
                        else:
                            node = td.GetName()

                        note = False
                        if node == "global":
                            location = global_dir
                            note = "global"
                        else:
                            location = tmp_dir
                            location = os.path.join(location, str(job.GetDatasetId()))
                            location = os.path.join(location, str(job.GetProcNum()))
                            location = os.path.join(location, node)
                            note = "dontextract"
                        location = os.path.join(location, name)
                        if i_file.IsPhotonicsTable():
                            note = "photonics"
                        if note:
                            notes[name] = note
                        td_input[name] = [location]
                for o_file in output_files:
                    name = o_file.GetName()
                    name = parser.parse(name)
                    if not td_output.has_key(name):
                        location = os.path.join(tmp_dir, str(job.GetDatasetId()))
                        location = os.path.join(location, str(job.GetProcNum()))
                        location = os.path.join(location, str(td.GetName()))
                        location = os.path.join(location, name)
                        td_output[name] = [location]

        return (td_input, td_output, notes)
    def FindFile(self, steering, td, catalog, file):
        parents = td.GetParents()

        # check immediate parents for this file
        for parent in parents:
            if catalog.has_key(parent):
                if catalog[parent][1].has_key(file):
                    return parent

        # check older ancestors
        for parent in parents:
            parent_td = steering.GetTaskDefinition(parent)
            result = self.FindFile(steering, parent_td, catalog, file)
            if result != "global":
                return result

        return "global"
    def WriteFileManifest(self, job, filename, input, output, notes):
        self.logger.debug("Input files: %s" % input)
        in_manifest = open(filename.replace(".pbs", ".input"), 'w')
        job.manifest_in = in_manifest.name
        if len(input):
            # pad the first column to the longest input location
            padding = str(max(map(lambda x: max(map(len, x)), input.values())))
            fmt_str = "%-" + padding + "s %s"
            for i_file, locs in input.items():
                for loc in locs:
                    file = fmt_str % (loc, i_file)
                    if notes.has_key(i_file):
                        file += "\t%s" % notes[i_file]
                    job.Write(in_manifest, file)
        in_manifest.close()
        self.logger.debug("Output files: %s" % output)
        out_manifest = open(filename.replace(".pbs", ".output"), 'w')
        job.manifest_out = out_manifest.name
        if len(output):
            # pad the first column to the longest output file name
            padding = str(max(map(len, output.keys())))
            fmt_str = "%-" + padding + "s %s"
            for o_file, locs in output.items():
                for loc in locs:
                    job.Write(out_manifest, fmt_str % (o_file, loc))
        out_manifest.close()
    def WriteDAGNode(self, job, dagfile, nodename, filename, parents=None, done=False):
        if done: return
        job.Write(dagfile, "%s=`qsub \\" % (nodename))
        for parent in parents:
            job.Write(dagfile, " -W depend=afterok:$%s\\" % (parent), parse=False)
        job.Write(dagfile, " %s`" % (filename))
        job.Write(dagfile, 'if [ -z $ICEPROD_JOB_ID_LIST ]; then ', parse=False)
        job.Write(dagfile, '  ICEPROD_JOB_ID_LIST="${%s}";' % (nodename), parse=False)
        job.Write(dagfile, 'else ')
        job.Write(dagfile, '  ICEPROD_JOB_ID_LIST="${ICEPROD_JOB_ID_LIST}:${%s}";' % (nodename), parse=False)
        job.Write(dagfile, 'fi')
    def FinishDAGNode(self, job, dagfile):
        job.Write(dagfile, 'echo "<job_id>${ICEPROD_JOB_ID_LIST}</job_id>";', parse=False)
        job.Write(dagfile, "exit 0;")
        dagfile.close()
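    # Since enqueue_cmd is /bin/bash, "submission" simply runs the generated
    # driver script, which chains the per-task qsub calls through PBS job
    # dependencies. For a two-task DAG it looks roughly like this
    # (node and file names are illustrative):
    #
    #   #!/bin/bash
    #   generate=`qsub \
    #    job.generate.pbs`
    #   if [ -z $ICEPROD_JOB_ID_LIST ]; then
    #     ICEPROD_JOB_ID_LIST="${generate}";
    #   else
    #     ICEPROD_JOB_ID_LIST="${ICEPROD_JOB_ID_LIST}:${generate}";
    #   fi
    #   filter=`qsub \
    #    -W depend=afterok:$generate\
    #    job.filter.pbs`
    #   ...same bookkeeping for ${filter}...
    #   echo "<job_id>${ICEPROD_JOB_ID_LIST}</job_id>";
    #   exit 0;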
    def get_id(self, submit_status):
        """
        Parse the string echoed by the DAG driver script on submission to
        extract the ids of the jobs in the DAG.
        @param submit_status: string printed by the driver script
        """
        matches = re.findall(r'<job_id>.*</job_id>', submit_status)
        self.logger.debug(submit_status)
        if matches:
            job_ids = map(lambda x: x.split('.')[0],
                          matches[0].strip('<job_id>').strip('</job_id>').split(":"))
            dag_id = job_ids[0]
            cursor = self.localdb.cursor()
            for job_id in job_ids:
                sql = 'INSERT INTO `queue` (parent_id,child_id,status)'
                sql += ' VALUES ("%s","%s","QUEUED") ' % (dag_id, job_id)
                self.logger.info(sql)
                cursor.execute(sql)
            self.localdb.commit()
            self.job_ids.append(dag_id)
            return dag_id
        else:
            self.logger.warn('could not parse job id from "%s"' % submit_status)
            return Pbs.get_id(self, submit_status)
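    # A driver that echoes "<job_id>12345.pbs01:12346.pbs01</job_id>"
    # (server name hypothetical) therefore yields job_ids ['12345', '12346'];
    # the first id stands in for the whole DAG, and every id is recorded in
    # the local queue table.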
    def EnumerateParentNodes(self, steering, td):
        parents = td.GetParents()
        parentnodes = []
        for parent in parents:
            parentobj = steering.GetTaskDefinition(parent)
            if parentobj.ParallelExecutionEnabled():
                for idx, tray in parentobj.GetTrays().items():
                    for iter in tray.GetIters():
                        if not iter == -1:
                            nodename = "%s_%u" % (parent, iter)
                        else:
                            nodename = "%s_ext" % parent
                        parentnodes.append(nodename)
            else:
                parentnodes.append(parent)
        return parentnodes
    def GetArguments(self, job, td, idx=False, iter=False, output="str"):
        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = " --dag --task=%s" % td.GetName()
        if not idx is False:
            argstr += " --tray=%s --iter=%s" % (idx, iter)
        argstr += " " + job.GetMainScript() + " " + " ".join(argopts)
        if output == "dict":
            argstr = self.ArgStrToDict(argstr)
        return argstr
    def ArgStrToDict(self, argstr):
        args = {}
        for arg in argstr.split(" "):
            arg = arg[2:]  # drop the leading '--'
            pieces = arg.split("=")
            if len(pieces) == 1:
                args[arg] = 1
            else:
                args[pieces[0]] = pieces[1]
        return args
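    # Example (illustrative):
    #
    #   ArgStrToDict("--dag --task=filter --tray=0")
    #     ->  {'dag': 1, 'task': 'filter', 'tray': '0'}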
    def getAdditionalOptions(self, job):
        return {}
    def WriteSubmitFile(self, job, filename, td, idx=False, iter=False):
        steering = job.GetSteering()
        args = self.GetArguments(job, td, idx, iter, output="dict")
        parser = ExpParser(args, steering)

        submitfile = open(filename, 'w')
        outfile = filename.replace('.pbs', '.out')
        errfile = filename.replace('.pbs', '.err')

        job.Write(submitfile, "#!/bin/sh")
        job.Write(submitfile, "#PBS -o %s" % outfile)
        job.Write(submitfile, "#PBS -e %s" % errfile)

        # Add general batch options
        queue_list = []
        for key in self.GetParamKeys():
            if not key.startswith("queue"):
                job.Write(submitfile, "#PBS %s" % (self.GetParam(key)))
            else:
                queue_list.append(self.GetParam(key)[2:])

        td_batch_opts = td.GetBatchOpts()
        self.logger.debug(td_batch_opts)
        try:
            td_batch_opts = parser.parse(td_batch_opts)
        except:
            self.logger.warn("could not parse %s" % td_batch_opts)
            td_batch_opts = ""
        if td_batch_opts:
            for opt in map(string.strip, td_batch_opts.split(";")):
                job.Write(submitfile, "#PBS %s" % opt)
                if opt.startswith("-q"): queue_list = []  # overriding default queue

        if queue_list:
            chosen_queue = self._choose_queue(queue_list)
            job.Write(submitfile, "#PBS -q %s" % chosen_queue)

        # Add job specific batch options
        for key, opt in job.GetBatchOpts().items():
            job.Write(submitfile, "#PBS %s " % opt)

        job.Write(submitfile, "")
        job.Write(submitfile, "ARCH=`uname -m | sed -e 's/Power Macintosh/ppc/ ; s/i686/i386/'`")
        job.Write(submitfile, "PVER=`uname -r | awk '{split($1,a,\".\"); print a[2]}'`")
        job.Write(submitfile, "if [ $ARCH == 'i386' ]; then", parse=False)
        job.Write(submitfile, "   if [ $PVER == '4' ]; then", parse=False)
        job.Write(submitfile, "      PLATFORM=Linux-i386")
        job.Write(submitfile, "   elif [ $PVER == '6' ]; then", parse=False)
        job.Write(submitfile, "      PLATFORM=Linux-libstdc++6-i386")
        job.Write(submitfile, "   fi")
        job.Write(submitfile, "elif [ $ARCH == 'x86_64' ]; then", parse=False)
        job.Write(submitfile, "   if [ $PVER == '4' ]; then", parse=False)
        job.Write(submitfile, "      PLATFORM=Linux-x86_64")
        job.Write(submitfile, "   elif [ $PVER == '6' ]; then", parse=False)
        job.Write(submitfile, "      PLATFORM=Linux-libstdc++6-x86_64")
        job.Write(submitfile, "   fi")
        job.Write(submitfile, "fi")

        for var in self.env.keys():
            job.Write(submitfile, "export %s=%s" % (var, self.env[var]), parse=False)
        for var in job.env.keys():
            job.Write(submitfile, "export %s=%s" % (var, job.env[var]), parse=False)
        job.Write(submitfile, "RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(), time.time()), parse=False)
        job.Write(submitfile, "mkdir -p $RUNDIR")

        for file in job.GetInputFiles() + [job.GetExecutable()]:
            job.Write(submitfile, "cp %s $RUNDIR" % file, parse=False)

        # Need manifest files
        job.Write(submitfile, "cp %s $RUNDIR" % job.manifest_in)
        job.Write(submitfile, "cp %s $RUNDIR" % job.manifest_out)

        job.Write(submitfile, "cd $RUNDIR", parse=False)

        job.Write(submitfile, "echo \"running on $HOSTNAME:$RUNDIR\"", parse=False)
        # GetArguments already includes the main script, so it is not
        # prepended again here
        argstr = self.GetArguments(job, td, idx, iter, output="str")
        executable = os.path.basename(job.GetExecutable())
        job.Write(submitfile,
                  "$PYROOT/bin/python -u %s %s 1>%s.tmp 2>%s.tmp" % (executable, argstr, outfile, errfile),
                  parse=False)
        job.Write(submitfile, "RETVAL=$?", parse=False)

        job.Write(submitfile, "cat %s.tmp>&1" % outfile, parse=False)
        job.Write(submitfile, "cat %s.tmp>&2" % errfile, parse=False)

        job.Write(submitfile, "rm -f wgetrc")

        if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
            job.Write(submitfile, "for file in *; do")
            job.Write(submitfile, "   if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
            job.Write(submitfile, "      mv $file %s" % job.GetInitialdir())
            job.Write(submitfile, "   fi; done")

        job.Write(submitfile, "#clean directory")
        job.Write(submitfile, "cd /tmp")
        job.Write(submitfile, "rm -rf $RUNDIR ", parse=False)
        job.Write(submitfile, "exit $RETVAL")
        submitfile.close()
        os.system("chmod +x %s" % submitfile.name)
    def CheckJobStatus(self, jobs):
        """
        Query status of job(s) on the PBS queue.
        """
        self.logger.info("checking jobs in queue")
        if not self.localdb: return 1

        if isinstance(jobs, list):
            job_list = jobs
        else:
            job_list = [jobs]
        for job in job_list:
            job.SetStatus('?')
            if not job.GetJobId(): continue

            cursor = self.localdb.cursor()
            sql = 'SELECT parent_id,child_id,status FROM `queue` WHERE parent_id = "%s" ' % job.GetJobId()
            cursor.execute(sql)

            jobentries = cursor.fetchall()
            if len(jobentries) == 0:  # make tuple object if not found in db
                jobentries = [(job.GetJobId(), job.GetJobId(), '?')]
            for dag_id, job_id, status in jobentries:
                cmd = "qstat -f %s " % job_id
                self.logger.debug(cmd)
                status, output = commands.getstatusoutput(cmd)
                if status:
                    self.logger.error("%s: %s: %s" % (cmd, status, output))
                    if job.GetStatus() == '?':
                        job.SetStatus('FINISHED')
                else:
                    for line in output.split('\n'):
                        line = line.strip()
                        if line.startswith('job_state'):
                            jobstatus = cstatus(line.split('=')[1].strip())
                            if jobstatus == 'PROCESSING' or job.GetStatus() not in ['PROCESSING', 'QUEUED']:
                                job.SetStatus(jobstatus)
                        if line.startswith('exec_host'):
                            host = line.split('=')[1].strip()
                            job.SetHost(host)

            # clean up job db
            if job.GetStatus() == "FINISHED":
                cursor.execute('DELETE FROM queue WHERE parent_id = "%s"' % dag_id)
                self.localdb.commit()
        return 1
    def QRemove(self, job):
        """
        Remove cluster or job from queue.
        """
        retval = 0
        self.logger.info("checking jobs in queue")
        if not self.localdb:
            return Pbs.QRemove(self, job)

        cursor = self.localdb.cursor()
        sql = 'SELECT parent_id,child_id,status FROM `queue` WHERE parent_id = "%s" ' % job.job_id
        cursor.execute(sql)

        jobentries = cursor.fetchall()
        if len(jobentries) == 0:  # make tuple object if not found in db
            jobentries = [(job.job_id, job.job_id, '?')]
        for dag_id, job_id, status in jobentries:
            task = i3Task()
            task.job_id = job_id
            retval = Pbs.QRemove(self, task)

        cursor.execute('DELETE FROM queue WHERE parent_id = "%s"' % job.job_id)
        self.localdb.commit()
        return retval
    def CleanQ(self, jobs=None):
        """
        Clean the queue: add the jobs known to the local database to the
        job list and delegate to Pbs.CleanQ.
        """
        if not self.localdb:
            self.logger.warn("CleanQ: no local database found.")
            return 0
        else:
            if not isinstance(jobs, list): jobs = [jobs]
            cursor = self.localdb.cursor()
            sql = 'SELECT child_id FROM `queue`'
            cursor.execute(sql)
            jobentries = cursor.fetchall()
            for jobid in jobentries:
                job = i3Job()
                job.SetJobId(jobid[0])
                jobs.append(job)
            return Pbs.CleanQ(self, jobs)
    def GetAdditionalOptions(self, job):
        return {}


import dag

class PbsI3Dag(dag.TaskQ, Pbs):
    """
    PBS queue with task-based (DAG) submission via dag.TaskQ.
    """

    def __init__(self):
        Pbs.__init__(self)
        dag.TaskQ.__init__(self)  # inheritance resolution order is important!
        self.logger = logging.getLogger('PbsI3Dag')
        self.suffix = "pbs"

    def Submit(self, cookie):
        for job in self.jobs:
            if isinstance(job, i3Task):
                dag.TaskQ.SubmitTask(self, job)
                self.logger.info("submitting task")
        return Pbs.Submit(self, cookie)