
Source Code for Module iceprod.server.plugins.sge

#!/usr/bin/env python
#

"""
 A basic submit-file generator for submitting and monitoring jobs on SGE.
 This module implements only a small subset of SGE features.
 Its interface is like that of condor, however.
 Inherits from i3Queue

 copyright  (c) 2005 the icecube collaboration

 @version: $Revision: $
 @date: $Date: $
 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
 @todo: implement more functionality of sge.
"""
import os
import re
import sys
import math
import random
import dircache
import time
import string
import logging
import os.path
import getpass
import commands
from os import popen2
from iceprod.core import metadata
from iceprod.core.lex import ExpParser
from iceprod.server.grid import iGrid
from iceprod.server.job import i3Job, i3Task
from os.path import expandvars

localdb = None

sge_status = {'qw': 'QUEUED', 'r': 'PROCESSING', 'hqw': 'QUEUED'}

def cstatus(istatus):
    if sge_status.has_key(istatus):
        return sge_status[istatus]
    return 'FINISHED'

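# Illustrative use of cstatus (the values follow directly from the sge_status
# map above; any state code not in the map, e.g. an error hold, falls through
# to 'FINISHED'):
#
#   cstatus('qw')  -> 'QUEUED'
#   cstatus('r')   -> 'PROCESSING'
#   cstatus('Eqw') -> 'FINISHED'
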
class SGE(iGrid):
    """
    This class represents a job or cluster on an SGE system.
    """

    def __init__(self):

        iGrid.__init__(self)
        self.proc = 0
        self.sleeptime = 30
        self.enqueue_cmd = "qsub"
        self.checkqueue_cmd = "qstat -u %s " % getpass.getuser()
        self.queue_rm_cmd = "qdel"
        self.suffix = "sge"
        self.logger = logging.getLogger('SGEDAG')
        self.logger.debug('Made a SGE(iGrid)')

    def _choose_queue(self, queue_list):
        from random import choice
        weighted_qlist = []
        for q in queue_list:
            if len(q.split()) > 1:
                queue, weight = q.split()
            else:
                queue, weight = q, 1
            try:
                weight = int(weight)
            except Exception, e:
                self.logger.error("Exception: " + str(e))
                self.logger.warn("Unable to get queue weight for: " + q)
                weight = 1
            self.logger.debug("%s:%u " % (queue, weight))
            weighted_qlist.extend([queue] * weight)
        return choice(weighted_qlist)

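    # Illustrative behavior of _choose_queue: each entry may carry an optional
    # integer weight, and the choice is uniform over the expanded list, so
    # weights act as relative selection probabilities. For example
    # (hypothetical queue names):
    #
    #   self._choose_queue(["long.q 3", "short.q"])
    #   # expands to ["long.q", "long.q", "long.q", "short.q"]
    #   # -> "long.q" is picked roughly three times as often as "short.q"
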
    def WriteConfig(self, job, config_file):
        """
        Write sge submit file to a file.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """
        self.logger.debug('WriteConfig')

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        submitfile = open("%s" % config_file, 'w')
        outfile = os.path.join(job.GetInitialdir(), job.GetOutputFile())
        errfile = os.path.join(job.GetInitialdir(), job.GetErrorFile())

        job.Write(submitfile, "#!/bin/sh")
        self.logger.debug("#!/bin/sh")

        job.Write(submitfile, "#$ -o %s" % outfile)
        self.logger.debug("#$ -o %s" % outfile)

        job.Write(submitfile, "#$ -e %s" % errfile)
        self.logger.debug("#$ -e %s" % errfile)

        # Add general batch options
        queue_list = []
        for key in self.GetParamKeys():
            if not key.startswith("queue"):
                job.Write(submitfile, "#$ %s" % self.GetParam(key), parse=True)
            else:
                queue_list.append(self.GetParam(key)[2:])

        if queue_list:
            chosen_queue = self._choose_queue(queue_list)
            job.Write(submitfile, "#$ -q %s" % chosen_queue)

        # Add job-specific batch options
        for key, opt in job.GetBatchOpts().items():
            job.Write(submitfile, "#$ %s" % opt, parse=True)
            self.logger.debug("#$ %s" % opt)

        # Export environment variable,value pairs
        for var in self.env.keys():
            job.Write(submitfile, "export %s=%s" % (var, self.env[var]), parse=False)
        for var in job.env.keys():
            job.Write(submitfile, "export %s=%s" % (var, job.env[var]), parse=False)
        job.Write(submitfile, "export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH", parse=False)
        job.Write(submitfile, "unset I3SIMPRODPATH")
        job.Write(submitfile, "RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(), time.time()), parse=False)
        job.Write(submitfile, "mkdir -p $RUNDIR", parse=False)
        self.logger.debug("RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(), time.time()))
        self.logger.debug("mkdir -p $RUNDIR")

        job.Write(submitfile, "echo \"running on $HOSTNAME:$RUNDIR\"", parse=False)
        self.logger.debug("echo \"running on $HOSTNAME:$RUNDIR\"")

        self.logger.debug('%d' % len(job.GetInputFiles()))
        for file in job.GetInputFiles() + [job.GetExecutable()]:
            self.logger.debug('%s' % file)
            job.Write(submitfile, "cp %s $RUNDIR" % file, parse=False)
        job.Write(submitfile, "cd $RUNDIR", parse=False)

        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = job.GetMainScript() + " " + " ".join(argopts)
        executable = os.path.basename(job.GetExecutable())
        self.logger.debug('executable: %s' % job.GetExecutable())
        self.logger.debug('main script: %s' % job.GetMainScript())
        self.logger.debug('args options: %s' % argopts)
        self.logger.debug('arguments: %s' % job.GetArguments())
        job.Write(submitfile, "$PYROOT/bin/python %s %s" % (executable, argstr), parse=False)
        job.Write(submitfile, 'echo "job exited with status $?";', parse=False)
        job.Write(submitfile, "cat %s.tmp>&1" % outfile, parse=False)
        job.Write(submitfile, "cat %s.tmp>&2" % errfile, parse=False)
        job.Write(submitfile, "rm -f wgetrc")

        if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
            job.Write(submitfile, "for file in *; do")
            job.Write(submitfile, "  if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
            job.Write(submitfile, "     mv $file %s" % job.GetInitialdir())
            job.Write(submitfile, "  fi; done")

        job.Write(submitfile, "#clean directory")
        job.Write(submitfile, "cd /tmp")
        job.Write(submitfile, "rm -rf $RUNDIR ", parse=False)
        job.Write(submitfile, "exit 0")
        self.logger.debug('Submit file written')
        submitfile.close()

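    # Sketch of the kind of submit script WriteConfig produces (paths, queue
    # name, and arguments are hypothetical; the real content depends on the
    # i3Job and the configured batch options):
    #
    #   #!/bin/sh
    #   #$ -o /data/sim/00001/iceprod.0.out
    #   #$ -e /data/sim/00001/iceprod.0.err
    #   #$ -q long.q
    #   export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH
    #   RUNDIR=$I3SCRATCH/i3sim_0_1234567890.0
    #   mkdir -p $RUNDIR
    #   cp runner.py $RUNDIR
    #   cd $RUNDIR
    #   $PYROOT/bin/python runner.py main.py --dataset=1
    #   ...
    #   exit 0
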
    def get_id(self, submit_status):
        """
        Parse the string returned by SGE on submission to extract the
        id of the job cluster.

        @param submit_status: string returned by qsub
        """
        matches = re.findall(r'Your job [0-9]+', submit_status)
        self.logger.debug(submit_status)
        if matches:
            cluster_info = matches[0].split()
            job_id = cluster_info[-1]

            self.job_ids.append(job_id)
            return job_id
        else:
            self.logger.warn('could not parse job id from "%s"' % submit_status)
            return -1

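    # Illustrative parse: qsub typically responds with a line like
    #   Your job 123456 ("iceprod.0") has been submitted
    # from which get_id extracts and returns '123456'. (The id and job name
    # shown here are hypothetical.)
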
    def CheckJobStatus(self, jobs):
        """
        Query status of job(s) on the SGE queue.
        """
        if isinstance(jobs, list):
            job_list = jobs
        else:
            job_list = [jobs]

        job_dict = {}
        self.logger.info("beginning of CheckJobStatus")

        for job in job_list:  # initialize status to FINISHED
            self.logger.info("checking job status: job id %s for dataset %d", job.GetJobId(), job.GetDatasetId())

            if job.GetJobId() < 0: continue

            for job_id in job.GetJobId().split(" "):
                job_dict[job_id] = job
            job.SetStatus('FINISHED')

        cmd = self.checkqueue_cmd
        self.logger.debug(cmd)
        retval, output = commands.getstatusoutput(cmd)

        if retval:  # failed to get status from sge
            for job in job_list: job.SetStatus('?')
            return retval

        for line in output.split('\n')[2:]:  # skip the two header lines
            try:
                tok = line.split()
                jobId = tok[0]
                prio = tok[1]
                name = tok[2]
                user = tok[3]
                jobStatus = tok[4]
                runtime = tok[5]
                queue = tok[6]
                self.logger.debug("jobid:%s" % jobId)
                if jobId in job_dict.keys():
                    self.logger.debug("status for jobid %s is %s" % (jobId, jobStatus))
                    status = cstatus(jobStatus)
                    job_dict[jobId].SetStatus(status)
            except Exception, e:
                self.logger.error("%s:%s" % (e, line))
                sys.excepthook(sys.exc_type, sys.exc_value, sys.exc_traceback)
                break

        return 1

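    # CheckJobStatus assumes the `qstat -u <user>` layout: two header lines
    # followed by at least seven whitespace-separated columns, e.g. (values
    # hypothetical):
    #
    #   job-ID  prior    name       user     state submit/start at      queue
    #   -----------------------------------------------------------------------
    #   123456  0.55500  iceprod.0  icecube  r     01/01/2010 12:00:00  long.q@node1
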
    def CheckQ(self, db=None):
        """
        Query status of the cluster or job on the SGE queue.
        """

        cmd = self.checkqueue_cmd
        for id in self.job_ids:
            cmd += " %s" % id
        status, output = commands.getstatusoutput(cmd)
        return output

    def CleanQ(self, jobs=None):
        """
        Remove iceprod jobs from the queue that are not in the given
        list of active jobs.
        """

        self.logger.info("beginning of CleanQ")
        if not jobs: return 0

        if isinstance(jobs, list):
            job_list = jobs
        else:
            job_list = [jobs]
        job_dict = dict()

        for job in job_list:
            self.logger.info("job id: %s", job.GetJobId())
            if job.GetJobId() < 0: continue

            for job_id in job.GetJobId().split(" "):
                job_dict[job_id] = job

        # cmd = self.checkqueue_cmd + " | grep iceprod"
        cmd = self.checkqueue_cmd
        status, output = commands.getstatusoutput(cmd)
        if not status:
            for line in output.split('\n')[2:]:  # skip the two header lines
                if line.strip() == '':
                    continue
                try:
                    tok = line.split()
                    jobId = tok[0]
                    prio = tok[1]
                    name = tok[2]
                    user = tok[3]
                    jobStatus = tok[4]
                    runtime = tok[5]
                    queue = tok[6]
                    if name.startswith("iceprod.") and not job_dict.has_key(jobId):
                        self.logger.warn("removing job %s with status %s. Reason: job not found in list" % \
                                         (jobId, cstatus(jobStatus)))
                        self.logger.debug("job list [%s]" % str(job_dict.keys()))
                        os.system("%s %s" % (self.queue_rm_cmd, jobId))
                except Exception, e:
                    self.logger.error("%s:%s" % (e, line))
                    sys.excepthook(sys.exc_type, sys.exc_value, sys.exc_traceback)

        return status

    def QRemove(self, job):
        """
        Remove cluster or job from queue
        """
        if isinstance(job, i3Job) and job.GetStatus() == "FINISHED":
            return 0  # no need to remove something that is not there

        cmd = "%s %s" % (self.queue_rm_cmd, job.job_id)
        status, output = commands.getstatusoutput(cmd)
        return status

class SgeDAG(SGE):
    """
    This class represents a job that executes in multiple parts using a DAG.
    """

    def __init__(self):
        SGE.__init__(self)
        self.enqueue_cmd = "/bin/bash"
        self.localdb = None
        self.logger = logging.getLogger('SGEDAG')
        try:
            import sqlite3
            self.localdb = sqlite3.connect(expandvars("$I3PROD/shared/localqueue.db"))
        except:
            self.logger.error("sqlite3 missing. will try sqlite.")
            try:
                import sqlite
                self.localdb = sqlite.connect(expandvars("$I3PROD/shared/localqueue.db"))
            except:
                self.logger.error("sqlite missing. won't try to maintain queue sanity")
        if self.localdb:
            cursor = self.localdb.cursor()
            try:
                cursor.execute('CREATE TABLE IF NOT EXISTS queue (parent_id VARCHAR(80), child_id VARCHAR(80), status VARCHAR(80))')
            except Exception, e:
                self.logger.error(e)
            else:
                self.localdb.commit()
            cursor.close()

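    # The local bookkeeping table created above maps a DAG to its cluster
    # jobs, one row per child, with the DAG's first job id acting as the
    # parent key. E.g. (ids hypothetical):
    #
    #   parent_id | child_id | status
    #   ----------+----------+--------
    #   123456    | 123456   | QUEUED
    #   123456    | 123457   | QUEUED
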
    def __del__(self):
        if self.localdb:
            self.localdb.close()

    def GetLogs(self, job):
        """
        Read output logs from job
        """
        status, output = commands.getstatusoutput("tail -n 100 %s" % job.GetLogFile().replace(".log", "*.log"))
        job.log = output
        status, output = commands.getstatusoutput("tail -n 100 %s" % job.GetLogFile().replace(".err", "*.err"))
        job.err = output
        status, output = commands.getstatusoutput("tail -n 100 %s" % job.GetLogFile().replace(".out", "*.out"))
        job.out = output

    def WriteConfig(self, job, config_file):
        """
        Write a DAG wrapper script and per-task submit files for the job.
        Falls back to a plain SGE submit when the steering defines no tasks.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        from iceprod.core.dataclasses import IceTrayConfig

        db = self.GetMonitorDB()

        steering = job.GetSteering()
        task_defs = steering.GetTaskDefinitions()
        self.logger.debug("Task definitions: %s" % task_defs)
        if not len(task_defs):
            # not a DAG
            self.logger.debug("No tasks specified in config file; doing regular submit")
            self.enqueue_cmd = "qsub"
            return SGE.WriteConfig(self, job, config_file)

        dagfile = open(config_file, 'w')
        job.Write(dagfile, "#!/bin/bash")
        job_id = job.GetDatabaseId()
        job.dag = dict()  # create dependency tree

        file_catalog = {}
        for taskname, td in task_defs.items():
            args = self.GetArguments(job, td, output="dict")
            file_catalog[taskname] = self.GetFiles(job, td, args)
            job.dag[taskname] = None

        for taskname, td in task_defs.items():
            filename = config_file.replace('.sge', ".%s.sge" % taskname)
            td_id = td.GetId()
            self.logger.debug("Got task %s (ID=%u) with filename %s" % (taskname, td_id, filename))
            tmp_parents = self.EnumerateParentNodes(steering, td)
            parents = []
            # only add a parent if it is not yet completed
            for parent in tmp_parents:
                parent_td = steering.GetTaskDefinition(parent)
                if not db.task_is_finished(parent_td.GetId(), job.GetDatabaseId()):
                    self.logger.error("task id %s, '%s' job id %s" % (parent_td.GetId(), parent_td.GetName(), job.GetDatabaseId()))
                    parents.append(parent)
            job.dag[taskname] = parents
            if td.ParallelExecutionEnabled():
                trays = td.GetTrays()
                for idx, tray in trays.items():
                    for iter in tray.GetIters():
                        if not iter == -1:
                            nodename = "%s_%u" % (taskname, iter)
                        else:
                            nodename = "%s_ext" % taskname
                        filename = config_file.replace('.sge', ".%s.sge" % nodename)
                        done = db.task_is_finished(td_id, job_id, idx, iter)
                        args = self.GetArguments(job, td, idx, iter, output="dict")

                        input, output, notes = self.GetFiles(job, td, args, idx, iter, file_catalog)
                        self.WriteFileManifest(job, filename, input, output, notes)
                        self.WriteSubmitFile(job, filename, td, idx, iter)
                        self.WriteDAGNode(job, dagfile, nodename, filename, parents, done)
            else:
                done = db.task_is_finished(td_id, job_id)
                if taskname == 'trashcan':
                    done = False
                args = self.GetArguments(job, td, output="dict")

                input, output, notes = self.GetFiles(job, td, args, catalog=file_catalog)
                self.WriteFileManifest(job, filename, input, output, notes)
                self.WriteSubmitFile(job, filename, td)
                self.WriteDAGNode(job, dagfile, taskname, filename, parents, done)
            db.commit()

        self.FinishDAGNode(job, dagfile)

    def IsUrl(self, path):
        return bool(re.match("[^/:]+://?.*$", path))

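    # Illustrative matches for IsUrl's pattern "[^/:]+://?.*$" (paths are
    # hypothetical):
    #
    #   self.IsUrl("gsiftp://gridftp.example.org/path")  # -> True
    #   self.IsUrl("file:///tmp")                        # -> True
    #   self.IsUrl("/data/sim/input.i3")                 # -> False (plain path)
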
    def GetFiles(self, job, td, args, idx=False, iter=False, catalog=False):
        steering = job.GetSteering()
        parser = ExpParser(args, steering)
        if idx is not False:
            td_trays = {idx: td.GetTray(idx)}
        else:
            td_trays = td.GetTrays()

        if steering.GetSysOpt("dagtemp"):
            tmp_dir = parser.parse(steering.GetSysOpt("dagtemp").GetValue())
        elif args.has_key('dagtemp'):
            tmp_dir = parser.parse(args['dagtemp'])
        else:
            tmp_dir = 'file:///tmp'  # TODO: give this a sane default or make it an error

        if args.has_key('fetch'):
            global_dir = args['fetch']
        else:
            global_dir = 'file:///tmp'

        td_input = {}
        td_output = {}
        notes = {}

        if td.IsCleanup():
            # cleanup job has no input/output files required
            return (td_input, td_output, notes)

        for idx, td_tray in td_trays.iteritems():
            args['tray'] = idx

            self.logger.info("GetTray(%s)" % idx)
            icetray = steering.GetTray(idx)

            input_files = icetray.GetInputFiles()
            parsed_input = []

            output_files = icetray.GetOutputFiles()
            parsed_output = []

            if iter is not False:
                iters = [iter]
            else:
                iters = td_tray.GetIters()
            for iter in iters:
                args['iter'] = iter
                for d in steering.GetDependencies():
                    d_file = parser.parse(d)
                    if not td_input.has_key(d_file):
                        location = d_file
                        if not self.IsUrl(location):
                            location = os.path.join(global_dir, location)
                        td_input[os.path.basename(d_file)] = [location]
                for i_file in input_files:
                    name = i_file.GetName()
                    name = parser.parse(name)
                    if not td_input.has_key(name) \
                       and not td_output.has_key(name):
                        if catalog:
                            node = self.FindFile(steering, td, catalog, name)
                        else:
                            node = td.GetName()

                        note = False
                        if node == "global":
                            location = global_dir
                            note = "global"
                        if node != "global":
                            location = tmp_dir
                            location = os.path.join(location, str(job.GetDatasetId()))
                            location = os.path.join(location, str(job.GetProcNum()))
                            location = os.path.join(location, node)
                            note = "dontextract"
                        location = os.path.join(location, name)
                        if i_file.IsPhotonicsTable():
                            note = "photonics"
                        if note:
                            notes[name] = note
                        td_input[name] = [location]
                for o_file in output_files:
                    name = o_file.GetName()
                    name = parser.parse(name)
                    if not td_output.has_key(name):
                        location = os.path.join(tmp_dir, str(job.GetDatasetId()))
                        location = os.path.join(location, str(job.GetProcNum()))
                        location = os.path.join(location, str(td.GetName()))
                        location = os.path.join(location, name)
                        td_output[name] = [location]

        return (td_input, td_output, notes)

    def FindFile(self, steering, td, catalog, file):
        parents = td.GetParents()

        # check immediate parents for this file
        for parent in parents:
            if catalog.has_key(parent):
                if catalog[parent][1].has_key(file):
                    return parent

        # check older ancestors
        for parent in parents:
            parent_td = steering.GetTaskDefinition(parent)
            result = self.FindFile(steering, parent_td, catalog, file)
            if result != "global":
                return result

        return "global"

    def WriteFileManifest(self, job, filename, input, output, notes):
        self.logger.debug("Input files: %s" % input)
        in_manifest = open(filename.replace(".sge", ".input"), 'w')
        job.manifest_in = in_manifest.name
        if len(input):
            padding = str(max(map(lambda x: max(map(len, x)), input.values())))
            fmt_str = "%-" + padding + "s %s"
            for i_file, locs in input.items():
                for loc in locs:
                    file = fmt_str % (loc, i_file)
                    if notes.has_key(i_file):
                        file += "\t%s" % notes[i_file]
                    job.Write(in_manifest, file)
        in_manifest.close()
        self.logger.debug("Output files: %s" % output)
        out_manifest = open(filename.replace(".sge", ".output"), 'w')
        job.manifest_out = out_manifest.name
        if len(output):
            # pad on the file names, which form the first column here
            padding = str(max(map(len, output.keys())))
            fmt_str = "%-" + padding + "s %s"
            for o_file, locs in output.items():
                for loc in locs:
                    job.Write(out_manifest, fmt_str % (o_file, loc))
        out_manifest.close()

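    # Sketch of the resulting manifest files (locations hypothetical). Input
    # lines pair a location with a file name, left-padded for alignment, with
    # an optional note appended; output lines pair a file name with its
    # destination:
    #
    #   *.input:   file:///tmp/1/0/generate/hits.i3  hits.i3	dontextract
    #   *.output:  hits.i3 file:///tmp/1/0/filter/hits.i3
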
    def WriteDAGNode(self, job, dagfile, nodename, filename, parents=None, done=False):
        #str = "echo JOB %s %s" % (nodename,filename)
        if done: return
        job.Write(dagfile, "%s=`qsub \\" % (nodename))
        for parent in parents:
            job.Write(dagfile, "   -hold_jid $%s\\" % (parent), parse=False)
        job.Write(dagfile, "   %s| grep -o -e 'Your job [0-9]*'| awk '{ print $3}'`" % (filename))
        job.Write(dagfile, 'if [ -z $ICEPROD_JOB_ID_LIST ]; then ', parse=False)
        job.Write(dagfile, '   ICEPROD_JOB_ID_LIST="${%s}";' % (nodename), parse=False)
        job.Write(dagfile, 'else ')
        job.Write(dagfile, '   ICEPROD_JOB_ID_LIST="${ICEPROD_JOB_ID_LIST}:${%s}";' % (nodename), parse=False)
        job.Write(dagfile, 'fi')

    def FinishDAGNode(self, job, dagfile):
        job.Write(dagfile, 'echo "<job_id>${ICEPROD_JOB_ID_LIST}</job_id>";', parse=False)
        job.Write(dagfile, "exit 0;")
        dagfile.close()

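    # Sketch of the bash "DAG" that WriteDAGNode/FinishDAGNode emit. Each node
    # is a qsub call whose job id is captured; children are held on their
    # parents via -hold_jid, and all ids are collected for get_id (task and
    # file names are hypothetical):
    #
    #   #!/bin/bash
    #   generate=`qsub \
    #      job.generate.sge| grep -o -e 'Your job [0-9]*'| awk '{ print $3}'`
    #   ...
    #   filter=`qsub \
    #      -hold_jid $generate\
    #      job.filter.sge| grep -o -e 'Your job [0-9]*'| awk '{ print $3}'`
    #   ...
    #   echo "<job_id>${ICEPROD_JOB_ID_LIST}</job_id>";
    #   exit 0;
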
    def get_id(self, submit_status):
        """
        Parse the string printed by the DAG wrapper script on submission to
        extract the ids of the cluster jobs.
        @param submit_status: string returned by the submit command
        """
        matches = re.findall(r'<job_id>.*</job_id>', submit_status)
        self.logger.debug(submit_status)
        if matches:
            job_ids = map(lambda x: x.split('.')[0], matches[0].strip('<job_id>').strip('</job_id>').split(":"))
            dag_id = job_ids[0]
            cursor = self.localdb.cursor()
            for job_id in job_ids:
                sql = 'INSERT INTO `queue` (parent_id,child_id,status)'
                sql += ' VALUES ("%s","%s","QUEUED") ' % (dag_id, job_id)
                self.logger.info(sql)
                cursor.execute(sql)
            self.localdb.commit()
            self.job_ids.append(dag_id)
            return dag_id
        else:
            self.logger.warn('could not parse job id from "%s"' % submit_status)
            return SGE.get_id(self, submit_status)

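    # Illustrative parse: the wrapper script written by FinishDAGNode echoes
    # a line such as (ids hypothetical)
    #   <job_id>123456:123457:123458</job_id>
    # get_id records each id under the first one (the DAG id, here '123456')
    # in the local queue table and returns that DAG id.
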
    def EnumerateParentNodes(self, steering, td):
        parents = td.GetParents()
        parentnodes = []
        for parent in parents:
            parentobj = steering.GetTaskDefinition(parent)
            if parentobj.ParallelExecutionEnabled():
                for idx, tray in parentobj.GetTrays().items():
                    for iter in tray.GetIters():
                        if not iter == -1:
                            nodename = "%s_%u" % (parent, iter)
                        else:
                            nodename = "%s_ext" % parent
                        parentnodes.append(nodename)
            else:
                parentnodes.append(parent)
        return parentnodes

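    # Illustrative node naming: a parallel parent task "filter" with a tray
    # whose iterations are [0, 1, -1] contributes the parent nodes
    # ["filter_0", "filter_1", "filter_ext"]; a non-parallel parent
    # "generate" contributes just ["generate"]. (Task names hypothetical.)
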
    def GetArguments(self, job, td, idx=False, iter=False, output="str"):
        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = " --dag --task=%s" % td.GetName()
        if not idx is False:
            argstr += " --tray=%s --iter=%s" % (idx, iter)
        argstr += job.GetMainScript() + " " + " ".join(argopts)
        if output == "dict":
            argstr = self.ArgStrToDict(argstr)
        return argstr

    def ArgStrToDict(self, argstr):
        args = {}
        for str in argstr.split(" "):
            str = str[2:]
            pieces = str.split("=")
            if len(pieces) == 1:
                args[str] = 1
            else:
                args[pieces[0]] = pieces[1]
        return args

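    # Illustrative conversion: every token is assumed to start with "--", and
    # flags without "=" map to 1 (values hypothetical):
    #
    #   self.ArgStrToDict("--dag --task=generate --tray=0")
    #   # -> {'dag': 1, 'task': 'generate', 'tray': '0'}
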
    def getAdditionalOptions(self, job):
        return {}

    def WriteSubmitFile(self, job, filename, td, idx=False, iter=False):
        steering = job.GetSteering()
        args = self.GetArguments(job, td, idx, iter, output="dict")
        parser = ExpParser(args, steering)

        submitfile = open(filename, 'w')
        outfile = filename.replace('.sge', '.out')
        errfile = filename.replace('.sge', '.err')

        job.Write(submitfile, "#!/bin/sh")
        self.logger.debug("#!/bin/sh")

        job.Write(submitfile, "#$ -o %s" % outfile)
        self.logger.debug("#$ -o %s" % outfile)

        job.Write(submitfile, "#$ -e %s" % errfile)
        self.logger.debug("#$ -e %s" % errfile)

        # Add general batch options
        queue_list = []
        for key in self.GetParamKeys():
            if not key.startswith("queue"):
                job.Write(submitfile, "#$ %s" % (self.GetParam(key)))
            else:
                queue_list.append(self.GetParam(key)[2:])

        td_batch_opts = td.GetBatchOpts()
        self.logger.debug(td_batch_opts)
        try:
            td_batch_opts = parser.parse(td_batch_opts)
        except:
            self.logger.warn("could not parse %s" % td_batch_opts)
            td_batch_opts = ""
        if td_batch_opts:
            for opt in map(string.strip, td_batch_opts.split(";")):
                job.Write(submitfile, "#$ %s" % opt)
                if opt.startswith("-q"): queue_list = []  # overriding default queue

        if queue_list:
            chosen_queue = self._choose_queue(queue_list)
            job.Write(submitfile, "#$ -q %s" % chosen_queue)

        # Add job-specific batch options
        for key, opt in job.GetBatchOpts().items():
            job.Write(submitfile, "#$ %s " % opt)

        for var in self.env.keys():
            job.Write(submitfile, "export %s=%s" % (var, self.env[var]), parse=False)
        for var in job.env.keys():
            job.Write(submitfile, "export %s=%s" % (var, job.env[var]), parse=False)
        job.Write(submitfile, "RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(), time.time()), parse=False)
        job.Write(submitfile, "mkdir -p $RUNDIR")

        for file in job.GetInputFiles() + [job.GetExecutable()]:
            job.Write(submitfile, "cp %s $RUNDIR" % file, parse=False)

        # Need manifest files
        job.Write(submitfile, "cp %s $RUNDIR" % job.manifest_in)
        job.Write(submitfile, "cp %s $RUNDIR" % job.manifest_out)

        job.Write(submitfile, "cd $RUNDIR", parse=False)

        job.Write(submitfile, "echo \"running on $HOSTNAME:$RUNDIR\"", parse=False)
        argstr = job.GetMainScript() + " " + self.GetArguments(job, td, idx, iter, output="string")
        executable = os.path.basename(job.GetExecutable())
        job.Write(submitfile,
                  "$PYROOT/bin/python -u %s %s 1>%s.tmp 2>%s.tmp" % (executable, argstr, outfile, errfile),
                  parse=False)
        job.Write(submitfile, "RETVAL=$?", parse=False)

        job.Write(submitfile, "cat %s.tmp>&1" % outfile, parse=False)
        job.Write(submitfile, "cat %s.tmp>&2" % errfile, parse=False)
        job.Write(submitfile, "rm -f wgetrc")

        if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
            job.Write(submitfile, "for file in *; do")
            job.Write(submitfile, "  if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
            job.Write(submitfile, "     mv $file %s" % job.GetInitialdir())
            job.Write(submitfile, "  fi; done")

        job.Write(submitfile, "#clean directory")
        job.Write(submitfile, "cd /tmp")
        job.Write(submitfile, "rm -rf $RUNDIR ", parse=False)
        job.Write(submitfile, "exit $RETVAL")
        submitfile.close()
        os.system("chmod +x %s" % submitfile.name)

    def CheckJobStatus(self, jobs):
        """
        Query status of job(s) on the SGE queue.
        """
        self.logger.info("checking jobs in queue")
        if not self.localdb: return 1

        if isinstance(jobs, list):
            job_list = jobs
        else:
            job_list = [jobs]
        for job in job_list:
            job.SetStatus('?')
            if not job.GetJobId(): continue

            cursor = self.localdb.cursor()
            sql = 'SELECT parent_id,child_id,status FROM `queue` WHERE parent_id = "%s" ' % job.GetJobId()
            cursor.execute(sql)

            jobentries = cursor.fetchall()
            if len(jobentries) == 0:  # make tuple object if not found in db
                jobentries = [(job.GetJobId(), job.GetJobId(), '?')]
            for dag_id, job_id, status in jobentries:
                cmd = "qstat -f %s " % job_id
                self.logger.debug(cmd)
                status, output = commands.getstatusoutput(cmd)
                if status:
                    self.logger.error("%s: %s: %s" % (cmd, status, output))
                    if job.GetStatus() == '?':
                        job.SetStatus('FINISHED')
                else:
                    for line in output.split('\n'):
                        line = line.strip()
                        if line.startswith('job_state'):
                            jobstatus = cstatus(line.split('=')[1].strip())
                            if jobstatus == 'PROCESSING' or job.GetStatus() not in ['PROCESSING', 'QUEUED']:
                                job.SetStatus(jobstatus)
                        if line.startswith('exec_host'):
                            host = line.split('=')[1].strip()
                            job.SetHost(host)

            # clean up job db
            if job.GetStatus() == "FINISHED":
                cursor.execute('DELETE FROM queue WHERE parent_id = "%s"' % dag_id)
                self.localdb.commit()
        return 1

    def QRemove(self, job):
        """
        Remove cluster or job from queue
        """
        retval = 0
        self.logger.info("checking jobs in queue")
        if not self.localdb:
            return SGE.QRemove(self, job)

        cursor = self.localdb.cursor()
        sql = 'SELECT parent_id,child_id,status FROM `queue` WHERE parent_id = "%s" ' % job.job_id
        cursor.execute(sql)

        jobentries = cursor.fetchall()
        if len(jobentries) == 0:  # make tuple object if not found in db
            jobentries = [(job.job_id, job.job_id, '?')]
        for dag_id, job_id, status in jobentries:
            task = i3Task()
            task.job_id = job_id
            retval = SGE.QRemove(self, task)

        cursor.execute('DELETE FROM queue WHERE parent_id = "%s"' % job.job_id)
        self.localdb.commit()
        return retval

    def CleanQ(self, jobs=None):
        """
        Clean the queue, including any DAG child jobs recorded in the
        local database.
        """
        if not self.localdb:
            self.logger.warn("CleanQ: no local database found.")
            return 0
        else:
            if jobs is None: jobs = []
            if not isinstance(jobs, list): jobs = [jobs]
            cursor = self.localdb.cursor()
            sql = 'SELECT child_id FROM `queue`'
            cursor.execute(sql)
            jobentries = cursor.fetchall()
            for jobid in jobentries:
                job = i3Job()
                job.SetJobId(jobid[0])
                jobs.append(job)
            return SGE.CleanQ(self, jobs)

    def GetAdditionalOptions(self, job):
        return {}


import dag

class SgeI3Dag(dag.TaskQ, SGE):

    def __init__(self):
        SGE.__init__(self)
        dag.TaskQ.__init__(self)  # inheritance resolution order is important!
        self.logger = logging.getLogger('SgeI3Dag')
        self.suffix = "sge"

    def Submit(self, cookie):

        for job in self.jobs:
            if isinstance(job, i3Task):
                dag.TaskQ.SubmitTask(self, job)
                self.logger.info("submitting task")
        return SGE.Submit(self, cookie)