Package iceprod :: Package server :: Package plugins :: Module sge_thorsten
[hide private]
[frames] | [no frames]

Source Code for Module iceprod.server.plugins.sge_thorsten

  1  #!/bin/env python 
  2  # 
  3   
  4  """ 
  5   A basic submitfile for submitting and monitoring jobs to SGE.  
  6   This module implements only a small subset of SGE features. 
 7   Its interface is like that of condor, however. 
  8   Inherits from i3Queue 
  9   
 10   copyright  (c) 2005 the icecube collaboration 
 11   
 12   @version: $Revision: $ 
 13   @date: $Date: $ 
 14   @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
 15   @todo: implement more functionality of sge. 
 16  """ 
 17   
 18  import os 
 19  import re 
 20  import sys 
 21  import math 
 22  import random  
 23  import dircache 
 24  import time 
 25  import string 
 26  import logging 
 27  import os.path 
 28  import getpass 
 29  import commands 
 30  from os import popen2 
 31  from iceprod.core import metadata 
 32  from iceprod.core.lex import ExpParser 
 33  from iceprod.server.grid import iGrid 
 34  from iceprod.server.job import i3Job, i3Task 
 35   
 36  logger = logging.getLogger('SGE') 
 37   
 38  sge_status = {'qw':'QUEUED', 'r':'PROCESSING', 'hqw': 'QUEUED'} 
 39   
40 -def cstatus(istatus):
41 if sge_status.has_key(istatus): 42 return sge_status[istatus] 43 return 'FINISHED'
44
class SGE(iGrid):
    """
    This class represents a job or cluster on a sge system.
    """

    def __init__(self):
        """
        Configure the SGE command-line front-ends on top of the generic
        iGrid queueing interface.
        """
        iGrid.__init__(self)
        # SGE command-line tools used by this plugin
        self.enqueue_cmd = "qsub"
        self.queue_rm_cmd = "qdel"
        self.checkqueue_cmd = "qstat -u %s " % getpass.getuser()
        # submit-file suffix and polling interval
        self.suffix = "sge"
        self.sleeptime = 30
        logger.debug('Made a SGE(iGrid)')
    def WriteConfig(self,job,config_file):
        """
        Write an SGE submit (shell) script for a plain, single-part job.

        The script sets '#$' batch directives, exports the configured
        environment, stages input files into a scratch RUNDIR, runs the
        main script under $PYROOT's python, optionally stages results
        back, and finally removes the RUNDIR.

        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        @raise Exception: if the job has no executable configured
        """
        logger.debug('WriteConfig')

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        submitfile = open("%s" % config_file,'w')
        outfile = os.path.join(job.GetInitialdir(),job.GetOutputFile())
        errfile = os.path.join(job.GetInitialdir(),job.GetErrorFile())

        job.Write(submitfile,"#!/bin/sh")
        logger.debug("#!/bin/sh")

        # stdout/stderr destinations via SGE '#$' directives
        job.Write(submitfile,"#$ -o %s" % outfile)
        logger.debug("#$ -o %s" % outfile)

        job.Write(submitfile,"#$ -e %s" % errfile )
        logger.debug("#$ -e %s" % errfile )

        # Add general batch options
        for key in self.GetParamKeys():
            job.Write(submitfile,"#$ %s" % self.GetParam(key),parse=True)
            logger.debug("#$ %s" % self.GetParam(key))

        # Add job specific batch options
        for key,opt in job.GetBatchOpts().items():
            job.Write(submitfile,"#$ %s" % opt,parse=True)
            logger.debug("#$ %s" % opt)

        #Export environment variable,value pairs
        for var in self.env.keys():
            job.Write(submitfile,"export %s=%s" % (var,self.env[var]),parse=False)
        for var in job.env.keys():
            job.Write(submitfile,"export %s=%s" % (var,job.env[var]),parse=False)
        job.Write(submitfile,"export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH",parse=False)
        job.Write(submitfile,"unset I3SIMPRODPATH")
        # per-job scratch directory, unique via proc number + timestamp
        job.Write(submitfile,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
        job.Write(submitfile,"mkdir -p $RUNDIR",parse=False)
        logger.debug("RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()))
        logger.debug("mkdir -p $RUNDIR")

        job.Write(submitfile,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)
        logger.debug("echo \"running on $HOSTNAME:$RUNDIR\"")

        # stage input files plus the executable itself into the scratch dir
        logger.debug('%d' %len(job.GetInputFiles()))
        for file in job.GetInputFiles()+[job.GetExecutable()]:
            logger.debug('%s' %file)
            job.Write(submitfile,"cp %s $RUNDIR" % file,parse=False)
        job.Write(submitfile,"cd $RUNDIR",parse=False)

        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = job.GetMainScript() + " " + " ".join(argopts)
        executable = os.path.basename(job.GetExecutable())
        logger.debug('executable: %s' % job.GetExecutable())
        logger.debug('main script: %s' % job.GetMainScript())
        logger.debug('args options: %s' % argopts)
        logger.debug('arguments: %s' % job.GetArguments())
        job.Write(submitfile, "$PYROOT/bin/python %s %s" % (executable, argstr), parse=False)
        job.Write(submitfile, 'echo "job exited with status $?";',parse=False)

        # without a stageout URL (or in debug mode) copy any new files
        # back to the initial directory before cleanup
        if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
            job.Write(submitfile,"for file in *; do")
            job.Write(submitfile," if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
            job.Write(submitfile," mv $file %s" % job.GetInitialdir())
            job.Write(submitfile," fi; done")

        job.Write(submitfile,"#clean directory")
        job.Write(submitfile,"cd /tmp")
        job.Write(submitfile,"rm -rf $RUNDIR ",parse=False)
        logger.debug('Submit file written')
        submitfile.close();
138 - def get_id(self,submit_status):
139 """ 140 Parse string returned by condor on submission to extract the 141 id of the job cluster 142 143 @param submit_status: string returned by condor_submit 144 """ 145 matches = re.findall(r'Your job [0-9]+', submit_status) 146 if matches: 147 cluster_info = matches[0].split() 148 job_id = cluster_info[-1] 149 150 self.job_ids.append(job_id) 151 return job_id 152 else: 153 logger.warn('could not parse job id from "%s"' % submit_status) 154 return -1
155
    def CheckJobStatus(self,jobs):
        """
        Query the status of the given jobs in the SGE queue (via qstat)
        and update each job's status in place.

        Jobs are first marked FINISHED; any job id still present in the
        qstat output is then upgraded to the mapped queue state.

        @param jobs: a single i3Job or a list of i3Job objects
        @return: the qstat exit status on failure, otherwise 1
        """
        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]

        # maps each individual SGE task id back to its i3Job
        job_dict = {}
        logger.info("beggining of CheckJobStatus")

        for job in job_list: # initialize status to FINISHED
            logger.info("checking job status: job id %s for dataset %d", job.GetJobId(), job.GetDatasetId())

            # skip jobs that never got a valid id
            # NOTE(review): GetJobId() appears to return a string of
            # space-separated ids (see split below); confirm the < 0
            # comparison behaves as intended for unsubmitted jobs.
            if job.GetJobId() < 0: continue

            # a DAG job carries several space-separated SGE task ids
            for job_id in job.GetJobId().split(" "):
                job_dict[job_id] = job
            job.SetStatus('FINISHED')

        cmd = self.checkqueue_cmd
        logger.debug(cmd)
        retval,output = commands.getstatusoutput(cmd)

        if retval: # failed to get status from sge
            for job in job_list: job.SetStatus('?')
            return retval

        for line in output.split('\n')[2:]: # skip first two lines (qstat header)
            try:
                # qstat columns: id, priority, name, user, state, time, queue
                tok = line.split()
                jobId = tok[0]
                prio = tok[1]
                name = tok[2]
                user = tok[3]
                jobStatus = tok[4]
                runtime = tok[5]
                queue = tok[6]
                logger.debug("jobid:%s" %jobId)
                if jobId in job_dict.keys():
                    logger.debug("status for jobid %s is %s" %(jobId,jobStatus))
                    status = cstatus(jobStatus)
                    job_dict[jobId].SetStatus(status)
            except Exception,e:
                # a malformed line aborts the scan of the remaining output
                logger.error("%s:%s" %(e,line))
                sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
                break

        return 1
206
207 - def CheckQ(self,db=None):
208 """ 209 Querie status of cluster or job on condor queue 210 """ 211 212 cmd = self.checkqueue_cmd 213 for id in self.job_ids: 214 cmd += " %s" % id 215 status,output = commands.getstatusoutput(cmd) 216 return output
217 218
219 - def CleanQ(self,jobs=None):
220 """ 221 Querie status of cluster or job on condor queue 222 """ 223 224 logger.info("beggining of CleanQ") 225 if not jobs: return 0 226 227 if isinstance(jobs,list): 228 job_list = jobs 229 else: 230 job_list = [jobs] 231 job_dict = dict() 232 233 234 for job in job_list: 235 logger.info("job id: %s", job.GetJobId()) 236 237 logger.info("ja laenge is 0") 238 if job.GetJobId() < 0: continue 239 240 for job_id in job.GetJobId().split(" "): 241 job_dict[job_id] = job 242 job.SetStatus('FINISHED') 243 244 cmd = self.checkqueue_cmd 245 status,output = commands.getstatusoutput(cmd) 246 if not status: 247 for line in output.split('\n')[2:]: # skip first two lines 248 if line.strip() == '': 249 continue 250 try: 251 tok = line.split() 252 jobId = tok[0] 253 prio = tok[1] 254 name = tok[2] 255 user = tok[3] 256 jobStatus = tok[4] 257 runtime = tok[5] 258 queue = tok[6] 259 if name.startswith("iceprod.") and not job_dict.has_key(jobId): 260 logger.warn("removing job %s with status %s. Reason: job not found in list" % \ 261 (jobId,cstatus(jobStatus))) 262 logger.debug("job list [%s]" % str(job_dict.keys())) 263 os.system("%s %s" % (self.queue_rm_cmd,jobId)) 264 except Exception,e: 265 logger.error("%s:%s" %(e,line)) 266 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback) 267 268 return status
269 270
271 - def QRemove(self,job):
272 """ 273 Remove cluster or job from queue 274 """ 275 if isinstance(job,i3Job) and job.GetStatus() == "FINISHED": 276 return 0 # no need to remove something that is not there 277 278 cmd = "%s %s" % (self.queue_rm_cmd,job.job_id) 279 status,output = commands.getstatusoutput(cmd) 280 return status
281 282
class SgeDAG(SGE):
    """
    This class represents a job that executes in multiple parts using a DAG.
    """

    def __init__(self):
        """
        Initialize the DAG-capable SGE plugin.  Submission commands are
        inherited unchanged from SGE; task dependencies are expressed at
        submit time with 'qsub -hold_jid' (see Submit).
        """
        SGE.__init__(self)
        # leftover condor DAGMan settings kept for reference only
        #self.enqueue_cmd = "condor_submit_dag -f -notification never"
        #self.exe_prefix = "condor_dagman"
        logger.info("enque cmd %s" % self.enqueue_cmd)
        logger.info("infit sgedag")
294
    def Submit(self, cookie):
        """
        Submit all queued jobs to SGE.

        Jobs without task definitions are submitted as plain SGE jobs.
        Jobs with task definitions are submitted as a linear chain of
        task jobs, where each child is held on its parent's SGE id via
        'qsub -hold_jid'; a DAG description file is also written for
        bookkeeping/visualization.

        @param cookie: object collecting the ids of submitted jobs
        @return: tuple (status_sum, submit_status); status_sum is the
                 accumulated qsub exit status (0 means all succeeded)
        """
        self.submit_status = ''
        status_sum = 0
        cwdir = os.getcwd()

        for job in self.jobs:
            logger.info("submitting job %u.%u" %(job.GetDatasetId(),job.GetProcNum()))

            steering = job.GetSteering()
            task_defs = steering.GetTaskDefinitions()
            logger.debug("Task definitions: %s" % task_defs)

            ## we do have a single job.. no cluster like for gpu execution .. go with standard sge
            if not len(task_defs):
                # not a DAG
                logger.debug("No tasks specified in config file; doing regular submit")
                SGE.WriteConfig(self, job, job.config_file)

                cmd = "%s %s" % (self.enqueue_cmd,job.config_file)
                status, self.submit_status = commands.getstatusoutput(cmd)
                status_sum += status
                try:
                    id = self.get_id(self.submit_status)
                    job.SetJobId(id)
                    # get_id returns -1 when the qsub output could not be parsed
                    if id < 0: status_sum += 1
                except Exception, e:
                    logger.error("Exception: " + str(e))
                    self.submit_status += "\nException: " + str(e)
                    status_sum += 1

                job.submit_status = status
                job.submit_msg = self.submit_status

                logger.info("subtmit status: %s, submit message: %s" %(job.submit_status, job.submit_msg))

                cookie.AddJobId(job.GetJobId())
                os.chdir(cwdir)
            else:
                ## WE HAVE MULTIPLE TASKS per JOB // DAG is active - go through from the toplevel and always
                ## check the cluster id. The cluster id is given to the child job together with -hold_jid==parent_id
                ## so it waits till the parent is executed
                ## also write out a dagfile which visualizes the child/parent relationship

                db = self.GetMonitorDB()

                job_id = job.GetDatabaseId()

                file_catalog = {}
                tasks=dict()
                args_dict=dict()

                # index tasks by name and pre-compute their args/files
                for taskname,td in task_defs.items():
                    tasks[taskname]=td
                    args = self.GetArguments(job,td,output="dict")
                    args_dict[taskname]=args
                    file_catalog[taskname] = self.GetFiles(job,td,args)

                    # a task with no parents is the root of the chain
                    if (len(td.GetParents()) == 0):
                        toplevel_taskname=taskname

                cur_td=tasks[toplevel_taskname]
                first=True
                last_id=-1

                dagfile = open(job.config_file,'w')

                # walk the chain, submitting each task held on its parent's id
                while(len(cur_td.GetChildren())>0):
                    taskname=cur_td.GetName()
                    taskfile = job.config_file.replace('.sge',".%s.sge" % taskname)
                    # NOTE(review): 'td' is the leftover variable from the
                    # task_defs loop above; this looks like it should be
                    # cur_td.GetId().  Only used in the debug message below.
                    td_id = td.GetId()

                    logger.debug("Got task %s (ID=%u) with filename %s" % (taskname, td_id, taskfile))
                    parents = " ".join(self.EnumerateParentNodes(steering,cur_td))

                    os.chdir(job.GetInitialdir())

                    input, output, notes=self.GetFiles(job, cur_td, args_dict[taskname], catalog=file_catalog)

                    self.WriteFileManifest(job, taskfile,input,output,notes)
                    self.WriteTaskConfig(job,taskfile, cur_td, input, output)

                    if(first):
                        cmd = "%s %s" % (self.enqueue_cmd, taskfile)
                        logger.info("first in the task chain................")
                    else:
                        # -hold_jid makes this task wait for its parent
                        cmd = "%s -hold_jid %s %s" % (self.enqueue_cmd, last_id, taskfile)
                    logger.info(cmd)
                    status, self.submit_status = commands.getstatusoutput(cmd)
                    logger.info("subtmit status: %s, submit message: %s" %(status, self.submit_status))
                    status_sum += status
                    id=-1
                    try:
                        id = self.get_id(self.submit_status)
                        logger.info("id to add as subtask: %s", id)
                        if(first):
                            logger.info("FIRST SETTING JOBID to %s", id)
                            job.SetJobId(id)
                            first = False
                        else:
                            # the job id grows into a space-separated id list
                            temp_id=job.GetJobId()
                            job.SetJobId(temp_id + " " + id)

                        if id < 0: status_sum += 1
                    except Exception, e:
                        logger.error("Exception: " + str(e))
                        self.submit_status += "\nException: " + str(e)
                        status_sum += 1
                    last_id=id

                    # chains are linear here: always follow the first child
                    cur_td=tasks[cur_td.GetChildren()[0]]

                ## last task (no children) still has to be submitted
                taskname=cur_td.GetName()
                taskfile = job.config_file.replace('.sge',".%s.sge" % taskname)
                td_id = td.GetId()

                logger.debug("Got task %s (ID=%u) with filename %s" % (taskname, td_id, taskfile))
                #parents = " ".join(self.EnumerateParentNodes(steering,cur_td))

                os.chdir(job.GetInitialdir())
                input, output, notes=self.GetFiles(job, cur_td, args_dict[taskname], catalog=file_catalog)

                self.WriteFileManifest(job, taskfile,input,output,notes)
                self.WriteTaskConfig(job,taskfile, cur_td, input, output)

                cmd = "%s -hold_jid %s %s" % (self.enqueue_cmd, last_id, taskfile)
                status, self.submit_status = commands.getstatusoutput(cmd)
                status_sum += status
                id=-1
                try:
                    id = self.get_id(self.submit_status)
                    temp_id=job.GetJobId()
                    job.SetJobId(temp_id + " " + id)

                    if id < 0: status_sum += 1
                except Exception, e:
                    logger.error("Exception: " + str(e))
                    self.submit_status += "\nException: " + str(e)
                    status_sum += 1

                # write the DAG description file for bookkeeping
                for taskname,td in task_defs.items():
                    parents = " ".join(self.EnumerateParentNodes(steering,td))
                    done = db.task_is_finished(td.GetId(), job_id)
                    # the trashcan (cleanup) node is never marked DONE
                    if(taskname=="trashcan"):
                        done=False
                    filename= job.config_file.replace('.sge',".%s.sge" % taskname)
                    self.WriteDAGNode(job,dagfile,taskname,filename,parents,done)

                dagfile.close()
                db.commit()

                job.submit_status = status_sum
                job.submit_msg = self.submit_status

                if self.production:
                    # update database
                    if job.submit_status == 0:
                        self.i3monitordb.jobsubmitted(
                            job.GetDatasetId(), job.GetProcNum(),
                            job.GetInitialdir(), job.GetJobId())
                    else:
                        logger.error("failed to submit jobs:"+job.submit_msg)
                        self.i3monitordb.jobabort(job.GetProcNum(),job.GetDatasetId(),3,
                                   "failed to submit jobs:"+job.submit_msg)
                        os.chdir('/tmp')
                        self.CleanDir(job.GetInitialdir())

                cookie.AddJobId(job.GetJobId())
                os.chdir(cwdir)

        return status_sum,self.submit_status
471 472
473 - def IsUrl(self, path):
474 return bool(re.match("[^/:]+://?.*$", path))
475
    def GetFiles(self,job,td,args,idx=False,iter=False,catalog=False):
        """
        Collect the input and output file locations for a task.

        @param job: i3Job object
        @param td: task definition to collect files for
        @param args: argument dict (from GetArguments(..., output="dict"))
        @param idx: optional tray index; False means all trays
        @param iter: optional iteration; False means all iterations
        @param catalog: optional dict of task name -> (input,output,notes)
               used to locate files produced by ancestor tasks
        @return: tuple (td_input, td_output, notes) of dicts keyed by
                 file name; location values are lists of URL strings
        """
        steering = job.GetSteering()
        if idx is not False:
            td_trays = {idx: td.GetTray(idx)}
        else:
            td_trays = td.GetTrays()

        if args.has_key('dagtemp'):
            tmp_dir = args['dagtemp']
        else:
            tmp_dir = 'file:///tmp' # TODO: give this a sane default or make it an error

        if args.has_key('fetch'):
            global_dir = args['fetch']
        else:
            global_dir = 'file:///tmp'

        td_input = {}
        td_output = {}
        notes = {}

        if td.IsCleanup():
            # cleanup job has no input/output files required
            return (td_input, td_output, notes)

        for idx, td_tray in td_trays.iteritems():
            args['tray'] = idx

            logger.info("GetTray(%s)" % idx)
            icetray = steering.GetTray(idx)

            input_files = icetray.GetInputFiles()
            parsed_input = []

            output_files = icetray.GetOutputFiles()
            parsed_output = []

            if iter is not False:
                iters = [iter]
            else:
                iters = td_tray.GetIters()
            for iter in iters:
                args['iter'] = iter
                parser = ExpParser(args,steering)
                # steering-level dependencies are fetched from the global dir
                for d in steering.GetDependencies():
                    d_file = parser.parse(d)
                    if not td_input.has_key(d_file):
                        location = d_file
                        if not self.IsUrl(location):
                            location = os.path.join(global_dir, location)
                        td_input[os.path.basename(d_file)] = [location]
                for i_file in input_files:
                    name = i_file.GetName()
                    name = parser.parse(name)
                    # only record a file once, and never as both input and output
                    if not td_input.has_key(name) \
                       and not td_output.has_key(name):
                        # determine which task (node) produces this file
                        if catalog:
                            node = self.FindFile(steering,td,catalog,name)
                        else:
                            node = td.GetName()

                        note = False
                        if node == "global":
                            location = global_dir
                            note = "global"
                        if node != "global":
                            # file produced by an ancestor task: look in the
                            # per-dataset/per-proc temp area under its node name
                            location = tmp_dir
                            location = os.path.join(location, str(job.GetDatasetId()))
                            location = os.path.join(location, str(job.GetProcNum()))
                            location = os.path.join(location, node)
                            note = "dontextract"
                        location = os.path.join(location, name)
                        if i_file.IsPhotonicsTable():
                            note = "photonics";
                        if note:
                            notes[name] = note
                        td_input[name] = [location]
                for o_file in output_files:
                    name = o_file.GetName()
                    name = parser.parse(name)
                    if not td_output.has_key(name):
                        # outputs always land in the per-task temp area
                        location = os.path.join(tmp_dir, str(job.GetDatasetId()))
                        location = os.path.join(location, str(job.GetProcNum()))
                        location = os.path.join(location, str(td.GetName()))
                        location = os.path.join(location, name)
                        td_output[name] = [location]

        return (td_input, td_output, notes)
564
565 - def FindFile(self,steering,td,catalog,file):
566 parents = td.GetParents() 567 568 # check immediate parents for this file 569 for parent in parents: 570 if catalog.has_key(parent): 571 if catalog[parent][1].has_key(file): 572 return parent 573 574 # check older ancestors 575 for parent in parents: 576 parent_td = steering.GetTaskDefinition(parent) 577 result = self.FindFile(steering,parent_td,catalog,file) 578 if result != "global": 579 return result 580 581 return "global"
582
583 - def WriteFileManifest(self,job,filename,input,output,notes):
584 logger.debug("Input files: %s" % input) 585 in_manifest = open(filename.replace("\.sge", ".input"), 'w') 586 if len(input): 587 padding = str(max(map(lambda x: max(map(len, x)), input.values()))) 588 fmt_str = "%-" + padding + "s %s" 589 for i_file, locs in input.items(): 590 for loc in locs: 591 file = fmt_str % (loc, i_file) 592 #if("///" in file): 593 # file=file.replace("///", "/") 594 if notes.has_key(i_file): 595 file += "\t%s" % notes[i_file] 596 job.Write(in_manifest, file) 597 in_manifest.close() 598 logger.debug("Output files: %s" % output) 599 out_manifest = open(filename.replace(".sge", ".output"), 'w') 600 if len(output): 601 padding = str(max(map(lambda x: max(map(len, x)), input.values()))) 602 fmt_str = "%-" + padding + "s %s" 603 for o_file, locs in output.items(): 604 for loc in locs: 605 file = fmt_str % (o_file, loc) 606 #if("///" in file): 607 # file=file.replace("///", "/") 608 job.Write(out_manifest, file) 609 out_manifest.close()
610
611 - def WriteDAGNode(self,job,dagfile,nodename,filename,parents=None,done=False):
612 str = "JOB %s %s" % (nodename,filename) 613 if done: 614 str += " DONE" 615 job.Write(dagfile,str) 616 if parents: 617 job.Write(dagfile, "PARENT %s CHILD %s" % (parents,nodename))
618
619 - def EnumerateParentNodes(self,steering,td):
620 parents = td.GetParents() 621 parentnodes = [] 622 for parent in parents: 623 parentobj = steering.GetTaskDefinition(parent) 624 if parentobj.ParallelExecutionEnabled(): 625 for idx,tray in parentobj.GetTrays().items(): 626 for iter in tray.GetIters(): 627 if not iter == -1: 628 nodename = "%s_%u" % (parent,iter) 629 else: 630 nodename = "%s_ext" % parent 631 parentnodes.append(nodename) 632 else: 633 parentnodes.append(parent) 634 return parentnodes
635
    def GetArguments(self,job,td,idx=False,iter=False,output="str"):
        """
        Build the command-line argument string (or dict) for a task.

        @param job: i3Job object
        @param td: task definition the arguments are for
        @param idx: optional tray index to restrict to
        @param iter: optional iteration to restrict to
        @param output: "str" for a raw string, "dict" for a parsed dict
        @return: argument string, or dict when output == "dict"
        """
        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = " --dag --task=%s" % td.GetName()
        if not idx is False:
            argstr += " --tray=%s --iter=%s" % (idx,iter)
        # NOTE(review): the main script is appended *after* the --dag
        # options with no separating space before it; compare with
        # WriteTaskConfig, which puts the script first. Confirm intended.
        argstr += job.GetMainScript() + " " + " ".join(argopts)
        if output == "dict":
            argstr = self.ArgStrToDict(argstr)
        return argstr
645
646 - def ArgStrToDict(self,argstr):
647 args = {} 648 for str in argstr.split(" "): 649 str = str[2:] 650 pieces = str.split("=") 651 if len(pieces) == 1: 652 args[str] = 1 653 else: 654 args[pieces[0]] = pieces[1] 655 return args
656 657
    def WriteTaskConfig(self,job,config_file, td, input_dict, output_dict):
        """
        Write an SGE submit (shell) script for one task of a DAG job.

        Like SGE.WriteConfig, but additionally applies task-level batch
        options, names stdout/stderr per task, stages the task's
        .input/.output manifests plus the dag file, and passes --dag
        --task=<name> to the main script.

        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        @param td: task definition this submit file is for
        @param input_dict: dict of input file name -> list of locations
        @param output_dict: dict of output file name -> list of locations
        @raise Exception: if the job has no executable configured
        """
        logger.debug('WriteConfig')
        steering = job.GetSteering()
        args = self.GetArguments(job,td,False,False,output="dict")
        parser = ExpParser(args,steering)

        tname=td.GetName()

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        submitfile = open("%s" % config_file,'w')
        # per-task stdout/stderr files: job.out -> job.<task>.out etc.
        outfile = os.path.join(job.GetInitialdir(),job.GetOutputFile()).replace(".out", "."+tname+".out")
        errfile = os.path.join(job.GetInitialdir(),job.GetErrorFile()).replace(".err", "."+tname+".err")

        job.Write(submitfile,"#!/bin/sh")
        logger.debug("#!/bin/sh")

        job.Write(submitfile,"#$ -o %s" % outfile)
        logger.debug("#$ -o %s" % outfile)

        job.Write(submitfile,"#$ -e %s" % errfile )
        logger.debug("#$ -e %s" % errfile )

        # Add general batch options
        for key in self.GetParamKeys():
            job.Write(submitfile,"#$ %s" % self.GetParam(key),parse=True)
            logger.debug("#$ %s" % self.GetParam(key))

        # Add job specific batch options
        for key,opt in job.GetBatchOpts().items():
            job.Write(submitfile,"#$ %s" % opt,parse=True)
            logger.debug("#$ %s" % opt)

        ## add task batch options (';'-separated, expression-parsed)
        td_batch_opts = td.GetBatchOpts()
        logger.debug(td_batch_opts)
        try:
            td_batch_opts = parser.parse(td_batch_opts)
        except Exception, e:
            # unparsable task options are dropped, not fatal
            sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
            logger.warn("%s: could not parse %s" % (e,td_batch_opts))
            td_batch_opts = ""
        if td_batch_opts:
            for opt in map(string.strip,td_batch_opts.split(";")):
                logger.debug("#$ %s" % opt)
                job.Write(submitfile, "#$ %s" % opt, parse=False)

        #Export environment variable,value pairs
        for var in self.env.keys():
            job.Write(submitfile,"export %s=%s" % (var,self.env[var]),parse=False)
        for var in job.env.keys():
            job.Write(submitfile,"export %s=%s" % (var,job.env[var]),parse=False)
        job.Write(submitfile,"export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH",parse=False)
        job.Write(submitfile,"unset I3SIMPRODPATH")
        # per-task scratch directory, unique via proc number + timestamp
        job.Write(submitfile,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
        job.Write(submitfile,"mkdir -p $RUNDIR",parse=False)
        logger.debug("RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()))
        logger.debug("mkdir -p $RUNDIR")

        job.Write(submitfile,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)
        logger.debug("echo \"running on $HOSTNAME:$RUNDIR\"")

        logger.debug('%d' %len(job.GetInputFiles()))
        for file in job.GetInputFiles()+[job.GetExecutable()]:
            logger.debug("copying general files for task - file %s" % file)
            job.Write(submitfile,"cp %s $RUNDIR" % file,parse=False)

        # stage the task's input/output manifests into the scratch dir
        manifestfile = os.path.join(job.GetInitialdir(),config_file).replace(".sge", ".input")
        logger.debug("copy manifest %s" % manifestfile)
        job.Write(submitfile,"cp %s $RUNDIR" % manifestfile,parse=False)

        manifestfile = os.path.join(job.GetInitialdir(),config_file).replace(".sge", ".output")
        logger.debug("copy manifest %s" % manifestfile)
        job.Write(submitfile,"cp %s $RUNDIR" % manifestfile,parse=False)

        # stage the DAG description file as well
        dagfile = os.path.join(job.GetInitialdir(),job.GetOutputFile()).replace(".out", ".sge")
        logger.debug("copy dagfile %s" % dagfile)
        job.Write(submitfile,"cp %s $RUNDIR" % dagfile,parse=False)

        for key in input_dict.keys():
            logger.debug("copying specific files for task - file %s" % key)
            logger.debug("copy %s", input_dict[key][0])

            # [7:] strips the 7-char 'file://' scheme prefix
            # NOTE(review): assumes every location is a file:// URL — confirm
            job.Write(submitfile,"cp %s $RUNDIR" % input_dict[key][0][7:],parse=False)

        job.Write(submitfile,"cd $RUNDIR",parse=False)

        dag_string = " --dag --task=%s " % tname

        ## no idx > 0 ??? no itereated trays?
        #if idx is True:
        #    argstr += " --tray=%s --iter=%s" % (idx,iter)

        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = job.GetMainScript() + dag_string + " ".join(argopts)
        executable = os.path.basename(job.GetExecutable())
        logger.debug('executable: %s' % job.GetExecutable())
        logger.debug('main script: %s' % job.GetMainScript())
        logger.debug('args options: %s' % argopts)
        logger.debug('arguments: %s' % job.GetArguments())
        job.Write(submitfile, "$PYROOT/bin/python %s %s" % (executable, argstr), parse=False)
        job.Write(submitfile, 'echo "job exited with status $?";',parse=False)

        # without a stageout URL (or in debug mode) copy any new files
        # back to the initial directory before cleanup
        if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
            logger.debug("writing the for loop...")
            job.Write(submitfile,"for file in *; do")
            job.Write(submitfile," if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
            job.Write(submitfile," mv $file %s" % job.GetInitialdir())
            job.Write(submitfile," fi; done")

        job.Write(submitfile,"#clean directory")
        job.Write(submitfile,"cd /tmp")
        job.Write(submitfile,"rm -rf $RUNDIR ",parse=False)
        logger.debug('Submit file written')
        submitfile.close();
780