
Source Code for Module iceprod.server.plugins.condor

#!/bin/env python
#

"""
 A basic wrapper for submitting and monitoring jobs to Condor.
 This module implements only a small subset of Condor's many features.
 (http://www.cs.wisc.edu/condor)
 Inherits from i3Queue

 copyright (c) 2005 the icecube collaboration

 @version: $Revision: $
 @date: $Date: $
 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
 @todo: implement more functionality of condor.
"""

import os
import re
import sys
import math
import dircache
import time
import string
import shutil
import ConfigParser
import logging
import commands
import getpass
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO
from os.path import expandvars
from iceprod.core.odict import OrderedDict
from iceprod.core import metadata
from iceprod.core.dataclasses import Steering
from iceprod.core.lex import ExpParser
from iceprod.server.db import ConfigDB,MonitorDB
from iceprod.server.grid import iGrid,DAG
from iceprod.server.job import i3Job, i3Task
import dag


logger = logging.getLogger('Condor')

condor_status = {
    '0': 'UNEXPANDED',
    '1': 'QUEUED',
    '2': 'PROCESSING',
    '3': 'RESET',
    '4': 'FINISHED',
    '5': 'ERROR',
    '6': 'ERROR'  # submission error
    }

def cstatus(istatus):
    if not istatus: istatus = '4'
    if condor_status.has_key(istatus):
        return condor_status[istatus]
    return 'X'

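# Illustrative only: cstatus() maps the numeric JobStatus strings printed by
# condor_q onto the labels in condor_status above. The values below follow
# directly from that table, not from a live queue:
#
#   >>> cstatus('2')
#   'PROCESSING'
#   >>> cstatus('')     # empty output is treated as a finished job
#   'FINISHED'
#   >>> cstatus('9')    # anything unknown falls through to 'X'
#   'X'
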
class Condor(iGrid):
    """
    This class represents a job or cluster on a condor system.
    """

    # Supported universes in Condor V6.7
    valid_universes = [
        'STANDARD',
        'PVM',
        'VANILLA',
        'SCHEDULER',
        'MPI',
        'GLOBUS',
        'GRID',
        'JAVA'
        ]

    def __init__(self):
        iGrid.__init__(self)
        self.universe = "vanilla"
        self.cluster_id = -1
        self.post = None
        self.enqueue_cmd = "condor_submit"
        self.checkqueue_cmd = "condor_q"
        self.queue_rm_cmd = "condor_rm"
        self.suffix = "condor"
        self.exe_prefix = "iceprod."

        self.AddParam("Should_Transfer_Files","ALWAYS")
        self.AddParam("When_To_Transfer_Output","ON_EXIT")

    def Submit(self,cookie):
        """
        Submit job/cluster to Condor
        @param cookie: cookie object used to collect the ids of submitted jobs
        """
        self.submit_status = ''
        status_sum = 0
        cwdir = os.getcwd()

        for job in self.jobs:
            os.chdir(job.GetInitialdir())
            self.WriteConfig(job,job.config_file)

            cmd = self.enqueue_cmd
            for key in self.GetParamKeys():
                if key.startswith('-'):
                    cmd += " %s %s" % (key, self.GetParam(key))
            cmd += " " + job.config_file
            status, self.submit_status = commands.getstatusoutput(cmd)
            status_sum += status
            try:
                id = self.get_id(self.submit_status)
                job.SetJobId(id)
                if id < 0: status_sum += 1
            except Exception, e:
                logger.error("Exception: " + str(e))
                self.submit_status += "\nException: " + str(e)
                status_sum += 1
            job.submit_status = status
            job.submit_msg = self.submit_status
            cookie.AddJobId(job.GetJobId())

            if self.production:
                # update database
                if job.submit_status == 0:
                    self.i3monitordb.jobsubmitted(
                        job.GetDatasetId(), job.GetProcNum(),
                        job.GetInitialdir(), job.GetJobId())
                else:
                    logger.error("failed to submit jobs:"+job.submit_msg)
                    self.i3monitordb.jobabort(job.GetProcNum(),job.GetDatasetId(),3,
                        "failed to submit jobs:"+job.submit_msg)
                    os.chdir('/tmp')
                    self.CleanDir(job.GetInitialdir())

        os.chdir(cwdir)

        return status_sum,self.submit_status

    def SetUniverse(self,universe):
        """
        Define the condor universe for this job
        @param universe: string containing a valid condor universe
        """
        if universe.upper() in self.valid_universes:
            self.universe = universe
        else:
            raise Exception, 'unsupported universe: %s' % universe

    def GetUniverse(self):
        """
        Get the condor universe for this job
        @return: the currently set condor universe. If none has been set
                 the default value of 'vanilla' is returned.
        """
        return self.universe

    def GetJobIds(self):
        """
        Get the cluster AND job id for the submitted jobs.
        @return: a list of jobs with their cluster and job id in the condor
                 format. An empty list is returned if no jobs have been
                 submitted.
        """
        return ['%d.%d' % (self.cluster_id, job_id) \
                for job_id in range(self.jobs_submitted)]

    def WriteConfig(self,job,config_file):
        """
        Write condor submit file to a file.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        submitfile = open(config_file,'w')
        wrapper = open(config_file.replace('condor','sh'),'w')

        job.Write(wrapper,"#!/bin/sh")
        job.Write(submitfile,"Universe = %s" % self.GetUniverse())
        job.Write(submitfile,"Executable = %s" % wrapper.name)
        job.Write(submitfile,"Log = %s" % job.GetLogFile())
        job.Write(submitfile,"Output = %s" % job.GetOutputFile())
        job.Write(submitfile,"Error = %s" % job.GetErrorFile())

        job.Write(wrapper, "ARCH=`uname -m | sed -e 's/Power Macintosh/ppc/ ; s/i686/i386/'`")
        job.Write(wrapper, "PVER=`uname -r | awk '{split($1,a,\".\"); print a[2]}'`")
        job.Write(wrapper, "if [ $ARCH == 'i386' ]; then",parse=False)
        job.Write(wrapper, "  if [ $PVER == '4' ]; then",parse=False)
        job.Write(wrapper, "     PLATFORM=Linux-i386")
        job.Write(wrapper, "  elif [ $PVER == '6' ]; then",parse=False)
        job.Write(wrapper, "     PLATFORM=Linux-libstdc++6-i386")
        job.Write(wrapper, "  fi")
        job.Write(wrapper, "elif [ $ARCH == 'x86_64' ]; then",parse=False)
        job.Write(wrapper, "  if [ $PVER == '4' ]; then",parse=False)
        job.Write(wrapper, "     PLATFORM=Linux-x86_64")
        job.Write(wrapper, "  elif [ $PVER == '6' ]; then",parse=False)
        job.Write(wrapper, "     PLATFORM=Linux-libstdc++6-x86_64")
        job.Write(wrapper, "  fi")
        job.Write(wrapper, "fi")

        for var in self.env.keys():
            job.Write(wrapper, "export %s=%s" % (var, self.env[var]), parse=False)
        for var in job.env.keys():
            job.Write(wrapper, "export %s=%s" % (var, job.env[var]), parse=False)
        job.Write(wrapper, 'export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH',parse=False)
        job.Write(wrapper, 'export LD_LIBRARY_PATH=$PYROOT/lib:$LD_LIBRARY_PATH',parse=False)
        job.Write(wrapper, 'cmd="$PYROOT/bin/python %s $*"' % os.path.basename(job.GetExecutable()),parse=False)
        job.Write(wrapper, "echo $cmd",parse=False)
        job.Write(wrapper, "exec $cmd",parse=False)

        # Define Condor submit directory
        if job.GetInitialdir():
            job.Write(submitfile,"Initialdir = %s" % job.GetInitialdir())

        if job.GetInputFiles():
            inputfile_list = ",".join(job.GetInputFiles())
            inputfile_list += ","+job.GetExecutable()
            job.Write(submitfile,"%s = %s" % ("Transfer_Input_Files",inputfile_list))

        if job.GetOutputFiles():
            outputfile_list = ",".join(job.GetOutputFiles())
            job.Write(submitfile,"%s = %s" % ("Transfer_Output_Files",outputfile_list))

        #if self.GetArgOpt('stageout'):
        #    self.AddParam("When_To_Transfer_Output","ON_ERROR")

        # Add general batch options
        for key in self.GetParamKeys():
            if not key.startswith('-'):
                job.Write(submitfile, "%s = %s" % (key, self.GetParam(key)))

        # Add job specific batch options
        for key,opt in job.GetBatchOpts().items():
            if key.lower() == "requirements" and self.GetParam(key.lower()):
                opt += " && " + self.GetParam(key.lower())
            if not key.startswith('-'):
                job.Write(submitfile, "%s = %s" % (key, opt))

        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = job.GetMainScript() + " " + " ".join(argopts)
        job.Write(submitfile,"Arguments = %s" % argstr,parse=False)

        job.Write(submitfile, "Queue")
        submitfile.close()
        wrapper.close()
        os.system("chmod +x %s" % wrapper.name)

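    # Sketch of the kind of submit file WriteConfig() produces; the paths,
    # dataset and process numbers below are made-up examples, not output from
    # a real submission:
    #
    #   Universe = vanilla
    #   Executable = /scratch/iceprod/1234/0/iceprod.1234.0.sh
    #   Log = /scratch/iceprod/1234/0/iceprod.1234.0.log
    #   Output = /scratch/iceprod/1234/0/iceprod.1234.0.out
    #   Error = /scratch/iceprod/1234/0/iceprod.1234.0.err
    #   Initialdir = /scratch/iceprod/1234/0
    #   Transfer_Input_Files = config.xml,/path/to/iceprod.1234.0.py
    #   Should_Transfer_Files = ALWAYS
    #   When_To_Transfer_Output = ON_EXIT
    #   Arguments = iceprod.1234.0.py --dataset=1234 --procnum=0
    #   Queue
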
    def get_id(self,submit_status):
        """
        Parse string returned by condor on submission to extract the
        id of the job cluster

        @param submit_status: string returned by condor_submit
        """
        matches = re.findall("[0-9]+ job\(s\) submitted to cluster [0-9]+",
                             submit_status)
        logger.debug(submit_status)
        if matches:
            cluster_info = matches[0].split()
            job_id = cluster_info[-1]

            self.job_ids.append(job_id)
            return job_id
        else:
            logger.warn('could not parse job id from "%s"' % submit_status)
            return -1

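    # Illustrative only: condor_submit normally ends its output with a line of
    # the form matched above, e.g.
    #
    #   1 job(s) submitted to cluster 154321.
    #
    # for which get_id() would return the cluster id '154321' (as a string).
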
    def CheckQ(self,db=None):
        """
        Query status of cluster or job on condor queue
        """

        cmd = "condor_q " + " ".join(self.job_ids)
        status,output = commands.getstatusoutput(cmd)
        return output

    def CleanQ(self,jobs=None):
        """
        Query status of cluster or job on condor queue
        and remove those which are not active in DB
        """

        if not jobs: return 0

        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]

        jobs_to_remove = []
        job_dict = dict()

        for job in job_list:
            job_dict[job.GetJobId()] = job

        cmd = 'condor_q %s' % getpass.getuser()
        cmd += ' -format "%s " ClusterId'
        cmd += ' -format "%s " Cmd'
        cmd += ' -format "%s " JobStatus'
        cmd += ' -format "%s\\n" DAGManJobId'
        status,output = commands.getstatusoutput(cmd)
        if not status:
            for line in output.split('\n'):
                if line.strip() == '':
                    continue
                try:
                    tok = line.split()
                    jobId = tok[0]
                    executable = os.path.basename(tok[1])
                    jobStatus = tok[2]
                    if len(tok) > 3: jobId = tok[3]
                    if executable.startswith('condor_dagman'): continue
                    if executable.startswith(self.exe_prefix):
                        extok = executable.split('.')
                        name = extok[0]
                        dataset = extok[1]
                        proc = extok[2]
                        suffix = ".".join(extok[3:])
                        if int(dataset) and not job_dict.has_key(jobId):
                            logger.warn("removing job %s with status %s. Reason: job not found in list" % \
                                        (jobId,cstatus(jobStatus)))
                            logger.debug("job list [%s]" % str(job_dict.keys()))
                            jobs_to_remove.append(jobId)
                except Exception,e:
                    logger.error("%s:%s" % (e,line))
                    apply(sys.excepthook,sys.exc_info())
        if len(jobs_to_remove):
            cmd = "condor_rm %s" % " ".join(jobs_to_remove)
            logger.debug(cmd)
            os.system(cmd)

        return status

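    # Rough sketch of a line the condor_q -format query above can produce
    # (ClusterId, Cmd, JobStatus and, for DAG children, DAGManJobId); the
    # values here are invented:
    #
    #   154321 /scratch/iceprod/1234/0/iceprod.1234.0.sh 2 154300
    #
    # A job whose executable starts with self.exe_prefix but whose id is not
    # found in job_dict gets queued for condor_rm.
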
    def CheckJobStatus(self,jobs):
        """
        Query status of job on condor queue
        """
        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]
        for job in job_list:
            job_id = job.GetJobId()
            if job_id < 0: return 0
            cmd = 'condor_q -format "%s" JobStatus ' + str(job_id)
            logger.debug(cmd)
            status,output = commands.getstatusoutput(cmd)
            if status:
                job.SetStatus('?')
                logger.warn(output)
            else:
                job.SetStatus(cstatus(output.strip()))
                logger.debug(cstatus(output.strip()))
        return 1

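    # Illustrative only: for a job with id 154321.0 the query above amounts to
    #
    #   condor_q -format "%s" JobStatus 154321.0
    #
    # which prints a bare status digit (e.g. "2"), translated here via
    # cstatus() to 'PROCESSING'.
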
    def QRemove(self,jobs):
        """
        Remove cluster or job from condor queue
        """
        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]

        status = []
        for job in job_list:
            if job:
                cmd = "condor_rm %s" % job.GetJobId()
                logger.debug(cmd)
                handle = os.popen(cmd, 'r')
                status.append( string.join(handle.readlines()) )
                handle.close()
        return "\n".join(status)

class CondorNFS(Condor):
    """
    This class represents a job or cluster on a condor system with a shared
    (NFS) filesystem: input files and the executable are copied into the
    job's initial directory instead of relying on Condor file transfer.
    """

    def __init__(self):
        Condor.__init__(self)

    def WriteConfig(self,job,config_file):
        """
        Write condor submit file to a file.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """
        Condor.WriteConfig(self,job,config_file)
        os.system("chmod +x %s" % config_file)

        for infile in job.GetInputFiles():
            os.system("cp %s %s" % (infile,job.GetInitialdir()))
        os.system("cp %s %s" % (job.GetExecutable(),job.GetInitialdir()))

class CondorDAG(Condor):
    """
    This class represents a job that executes in multiple parts using a DAG.
    """

    def __init__(self):
        Condor.__init__(self)
        self.enqueue_cmd = "condor_submit_dag -f -notification never"
        self.exe_prefix = "condor_dagman"

    def WriteConfig(self,job,config_file):
        """
        Write condor submit file to a file.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        from iceprod.core.dataclasses import IceTrayConfig

        db = self.GetMonitorDB()

        steering = job.GetSteering()
        task_defs = steering.GetTaskDefinitions()
        logger.debug("Task definitions: %s" % task_defs)
        if not len(task_defs):
            # not a DAG
            logger.debug("No tasks specified in config file; doing regular submit")
            self.enqueue_cmd = "condor_submit"
            return Condor.WriteConfig(self, job, config_file)

        dagfile = StringIO()
        job_id = job.GetDatabaseId()

        file_catalog = {}
        for taskname,td in task_defs.items():
            args = self.GetArguments(job,td,output="dict")
            file_catalog[taskname] = self.GetFiles(job,td,args)

        for taskname,td in task_defs.items():
            filename = config_file.replace('condor',"%s.condor" % taskname)
            td_id = td.GetId()
            logger.debug("Got task %s (ID=%u) with filename %s" % (taskname, td_id, filename))
            parents = " ".join(self.EnumerateParentNodes(steering,td))
            if td.ParallelExecutionEnabled():
                trays = td.GetTrays()
                for idx,tray in trays.items():
                    for iter in tray.GetIters():
                        if not iter == -1:
                            nodename = "%s_%u" % (taskname,iter)
                        else:
                            nodename = "%s_ext" % taskname
                        filename = config_file.replace('condor',"%s.condor" % nodename)
                        done = db.task_is_finished(td_id, job_id, idx, iter)
                        args = self.GetArguments(job,td,idx,iter,output="dict")

                        input,output,notes = self.GetFiles(job,td,args,idx,iter,file_catalog)
                        self.WriteFileManifest(job,filename,input,output,notes)
                        self.WriteSubmitFile(job,filename,td,idx,iter)
                        self.WriteDAGNode(job,dagfile,nodename,filename,parents,done)
            else:
                done = db.task_is_finished(td_id, job_id)
                if taskname == 'trashcan':
                    done = False
                args = self.GetArguments(job,td,output="dict")

                input,output,notes = self.GetFiles(job,td,args,catalog=file_catalog)
                self.WriteFileManifest(job,filename,input,output,notes)
                self.WriteSubmitFile(job,filename,td)
                self.WriteDAGNode(job,dagfile,taskname,filename,parents,done)
            db.commit()

        # reorder DAG nodes so definitions come before relationships
        dagfile.seek(0)
        job_def = []
        other = []
        for line in dagfile:
            if line.startswith('JOB'):
                job_def.append(line)
            else:
                other.append(line)
        dagfile.close()
        dagfile = open(config_file,'w')
        for line in job_def:
            dagfile.write(line)
        for line in other:
            dagfile.write(line)

        # write DOT
        job.Write(dagfile,"DOT dag.dot")
        dagfile.close()

    def IsUrl(self, path):
        return bool(re.match("[^/:]+://?.*$", path))

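    # Illustrative only -- the pattern accepts anything that looks like
    # scheme:// (or scheme:/) and rejects bare filesystem paths:
    #
    #   IsUrl('gsiftp://data.example.org/file.i3.gz')  -> True
    #   IsUrl('file:///tmp')                           -> True
    #   IsUrl('/data/sim/file.i3.gz')                  -> False
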
    def GetFiles(self,job,td,args,idx=False,iter=False,catalog=False):
        steering = job.GetSteering()
        parser = ExpParser(args,steering)
        if idx is not False:
            td_trays = {idx: td.GetTray(idx)}
        else:
            td_trays = td.GetTrays()

        if steering.GetSysOpt("dagtemp"):
            tmp_dir = parser.parse(steering.GetSysOpt("dagtemp").GetValue())
        elif args.has_key('dagtemp'):
            tmp_dir = parser.parse(args['dagtemp'])
        else:
            tmp_dir = 'file:///tmp'  # TODO: give this a sane default or make it an error

        if args.has_key('fetch'):
            global_dir = args['fetch']
        else:
            global_dir = 'file:///tmp'

        td_input = {}
        td_output = {}
        notes = {}

        if td.IsCleanup():
            # cleanup job has no input/output files required
            return (td_input, td_output, notes)

        for idx, td_tray in td_trays.iteritems():
            args['tray'] = idx

            logger.debug("GetTray(%s)" % idx)
            icetray = steering.GetTray(idx)

            input_files = icetray.GetInputFiles()
            parsed_input = []

            output_files = icetray.GetOutputFiles()
            parsed_output = []

            if iter is not False:
                iters = [iter]
            else:
                iters = td_tray.GetIters()
            for iter in iters:
                args['iter'] = iter
                parser = ExpParser(args,steering)
                for d in steering.GetDependencies():
                    d_file = parser.parse(d)
                    if not td_input.has_key(d_file):
                        location = d_file
                        if not self.IsUrl(location):
                            location = os.path.join(global_dir, location)
                        td_input[os.path.basename(d_file)] = [location]
                for i_file in input_files:
                    name = i_file.GetName()
                    name = parser.parse(name)
                    if not td_input.has_key(name) \
                       and not td_output.has_key(name):
                        if catalog:
                            node = self.FindFile(steering,td,catalog,name)
                        else:
                            node = td.GetName()

                        note = False
                        if node == "global":
                            location = global_dir
                            note = "global"
                        if node != "global":
                            location = tmp_dir
                            location = os.path.join(location, str(job.GetDatasetId()))
                            location = os.path.join(location, str(job.GetProcNum()))
                            location = os.path.join(location, node)
                            note = "dontextract"
                        location = os.path.join(location, name)
                        if i_file.IsPhotonicsTable():
                            note = "photonics"
                        if note:
                            notes[name] = note
                        td_input[name] = [location]
                for o_file in output_files:
                    name = o_file.GetName()
                    name = parser.parse(name)
                    if not td_output.has_key(name):
                        location = os.path.join(tmp_dir, str(job.GetDatasetId()))
                        location = os.path.join(location, str(job.GetProcNum()))
                        location = os.path.join(location, str(td.GetName()))
                        location = os.path.join(location, name)
                        td_output[name] = [location]

        return (td_input, td_output, notes)

    def FindFile(self,steering,td,catalog,file):
        parents = td.GetParents()

        # check immediate parents for this file
        for parent in parents:
            if catalog.has_key(parent):
                if catalog[parent][1].has_key(file):
                    return parent

        # check older ancestors
        for parent in parents:
            parent_td = steering.GetTaskDefinition(parent)
            result = self.FindFile(steering,parent_td,catalog,file)
            if result != "global":
                return result

        return "global"

    def WriteFileManifest(self,job,filename,input,output,notes):
        logger.debug("Input files: %s" % input)
        in_manifest = open(filename.replace(".condor", ".input"), 'w')
        if len(input):
            padding = str(max(map(lambda x: max(map(len, x)), input.values())))
            fmt_str = "%-" + padding + "s %s"
            for i_file, locs in input.items():
                for loc in locs:
                    file = fmt_str % (loc, i_file)
                    if notes.has_key(i_file):
                        file += "\t%s" % notes[i_file]
                    job.Write(in_manifest, file)
        in_manifest.close()
        logger.debug("Output files: %s" % output)
        out_manifest = open(filename.replace(".condor", ".output"), 'w')
        if len(output):
            padding = str(max(map(lambda x: max(map(len, x)), output.values())))
            fmt_str = "%-" + padding + "s %s"
            for o_file, locs in output.items():
                for loc in locs:
                    job.Write(out_manifest, fmt_str % (o_file, loc))
        out_manifest.close()

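    # Sketch of the manifest lines written above (all values invented). The
    # input manifest holds "<location> <name> [note]" and the output manifest
    # holds "<name> <location>":
    #
    #   file:///tmp/1234/0/generate/hits.i3.gz      hits.i3.gz   dontextract
    #   file:///tmp/photon_tables/level1_table      level1_table photonics
    #
    #   level2.i3.gz  file:///tmp/1234/0/level2/level2.i3.gz
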
    def WriteDAGNode(self,job,dagfile,nodename,filename,parents=None,done=False):
        str = "JOB %s %s" % (nodename,filename)
        if done:
            str += " DONE"
        job.Write(dagfile,str)
        if parents:
            job.Write(dagfile, "PARENT %s CHILD %s" % (parents,nodename))

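    # Illustrative only: for a task "level2" depending on "generate", the DAG
    # fragment written here would look like
    #
    #   JOB level2 /scratch/iceprod/1234/0/iceprod.1234.0.level2.condor
    #   PARENT generate CHILD level2
    #
    # with " DONE" appended to the JOB line when the task already finished.
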
    def EnumerateParentNodes(self,steering,td):
        parents = td.GetParents()
        parentnodes = []
        for parent in parents:
            parentobj = steering.GetTaskDefinition(parent)
            if parentobj.ParallelExecutionEnabled():
                for idx,tray in parentobj.GetTrays().items():
                    for iter in tray.GetIters():
                        if not iter == -1:
                            nodename = "%s_%u" % (parent,iter)
                        else:
                            nodename = "%s_ext" % parent
                        parentnodes.append(nodename)
            else:
                parentnodes.append(parent)
        return parentnodes

    def GetArguments(self,job,td,idx=False,iter=False,output="str"):
        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = " --dag --task=%s" % td.GetName()
        if not idx is False:
            argstr += " --tray=%s --iter=%s" % (idx,iter)
        argstr += job.GetMainScript() + " " + " ".join(argopts)
        if output == "dict":
            argstr = self.ArgStrToDict(argstr)
        return argstr

    def ArgStrToDict(self,argstr):
        args = {}
        for str in argstr.split(" "):
            str = str[2:]
            pieces = str.split("=")
            if len(pieces) == 1:
                args[str] = 1
            else:
                args[pieces[0]] = pieces[1]
        return args

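    # Illustrative only: every whitespace-separated token is assumed to start
    # with "--", which is stripped before splitting on "=":
    #
    #   ArgStrToDict("--dag --task=level2 --tray=0")
    #   -> {'dag': 1, 'task': 'level2', 'tray': '0'}
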
    def getAdditionalOptions(self,job):
        return {}

    def WriteSubmitFile(self,job,filename,td,idx=False,iter=False):
        wrapper = open(filename.replace('condor','sh'),'w')
        args = self.GetArguments(job,td,idx,iter,output="dict")
        steering = job.GetSteering()
        parser = ExpParser(args,steering)

        job.Write(wrapper,"#!/bin/sh")
        job.Write(wrapper, "ARCH=`uname -m | sed -e 's/Power Macintosh/ppc/ ; s/i686/i386/'`")
        job.Write(wrapper, "PVER=`uname -r | awk '{split($1,a,\".\"); print a[2]}'`")
        job.Write(wrapper, "if [ $ARCH == 'i386' ]; then",parse=False)
        job.Write(wrapper, "  if [ $PVER == '4' ]; then",parse=False)
        job.Write(wrapper, "     PLATFORM=Linux-i386")
        job.Write(wrapper, "  elif [ $PVER == '6' ]; then",parse=False)
        job.Write(wrapper, "     PLATFORM=Linux-libstdc++6-i386")
        job.Write(wrapper, "  fi")
        job.Write(wrapper, "elif [ $ARCH == 'x86_64' ]; then",parse=False)
        job.Write(wrapper, "  if [ $PVER == '4' ]; then",parse=False)
        job.Write(wrapper, "     PLATFORM=Linux-x86_64")
        job.Write(wrapper, "  elif [ $PVER == '6' ]; then",parse=False)
        job.Write(wrapper, "     PLATFORM=Linux-libstdc++6-x86_64")
        job.Write(wrapper, "  fi")
        job.Write(wrapper, "fi")

        for var in self.env.keys():
            job.Write(wrapper, "export %s=%s" % (var, self.env[var]), parse=False)
        for var in job.env.keys():
            job.Write(wrapper, "export %s=%s" % (var, job.env[var]), parse=False)
        job.Write(wrapper, 'export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH', parse=False)
        job.Write(wrapper, 'cmd="$PYROOT/bin/python -u %s $*"' % os.path.basename(job.GetExecutable()), parse=False)
        job.Write(wrapper, "echo $cmd", parse=False)
        job.Write(wrapper, "exec $cmd", parse=False)
        wrapper.close()
        os.system("chmod +x %s" % wrapper.name)

        submitfile = open(filename,'w')
        job.Write(submitfile,"Universe = %s" % self.GetUniverse())
        job.Write(submitfile,"Executable = %s" % wrapper.name)
        if td.GetName() == "trashcan":
            job.Write(submitfile,"Priority = 2")
        job.Write(submitfile,"Log = %s" % job.GetLogFile())
        job.Write(submitfile,"Output = %s" % filename.replace(self.suffix, 'out'))
        job.Write(submitfile,"Error = %s" % filename.replace(self.suffix, 'err'))
        job.Write(submitfile,"Notification = Never")

        # Define Condor submit directory
        if job.GetInitialdir():
            job.Write(submitfile,"Initialdir = %s" % job.GetInitialdir())

        if job.GetInputFiles():
            inputfile_list = ",".join(job.GetInputFiles())
            inputfile_list += ","+job.GetExecutable()
            inputfile_list += ","+filename.replace(".condor", ".input")
            inputfile_list += ","+filename.replace(".condor", ".output")
            job.Write(submitfile,"%s = %s" % ("Transfer_Input_Files",inputfile_list))

        if job.GetOutputFiles():
            outputfile_list = ",".join(job.GetOutputFiles())
            job.Write(submitfile,"%s = %s" % ("Transfer_Output_Files",outputfile_list))

        # Collect requirements and add task-specific batch options
        req = []
        if td.GetRequirements():
            req.append(parser.parse(td.GetRequirements()))
        if td.GetBatchOpts():
            for opt in parser.parse(td.GetBatchOpts()).split(";"):
                if not opt.lower().startswith("requirements"):
                    job.Write(submitfile, opt)

        # Add general batch options
        for key in self.GetParamKeys():
            if key.lower() == "requirements":
                req.append(self.GetParam(key))
            else:
                val = self.GetParam(key)
                if not key.startswith('-'):
                    job.Write(submitfile, "%s = %s" % (key, val))

        # Add job specific batch options
        for opt in job.GetSteering().GetBatchOpts():
            if opt.GetName().lower() == "requirements":
                req.append(opt.GetValue())
            else:
                if not opt.GetName().startswith('-'):
                    job.Write(submitfile, "%s = %s" % (opt.GetName(), opt.GetValue()))
        if req:
            job.Write(submitfile, "Requirements = (%s) " % " && ".join(req))

        for name, value in self.GetAdditionalOptions(job).items():
            job.Write(submitfile, "%s = %s" % (name, value))

        argstr = self.GetArguments(job,td,idx,iter)
        job.Write(submitfile,"Arguments = %s" % argstr,parse=False)

        job.Write(submitfile, "Queue")
        submitfile.close()

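    # Sketch of the wrapper shell script generated above (platform detection
    # plus environment setup); $PYROOT comes from the exported environment and
    # the script name is a made-up example:
    #
    #   #!/bin/sh
    #   ARCH=`uname -m | sed -e 's/Power Macintosh/ppc/ ; s/i686/i386/'`
    #   PVER=`uname -r | awk '{split($1,a,"."); print a[2]}'`
    #   ...
    #   export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH
    #   cmd="$PYROOT/bin/python -u iceprod.1234.0.py $*"
    #   echo $cmd
    #   exec $cmd
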
    def GetAdditionalOptions(self,job):
        #return {'x509userproxy': expandvars(job.GetSteering().GetSysOpt("globus_proxy").GetValue())}
        return {}

class CondorG(CondorDAG):

    def __init__(self):
        CondorDAG.__init__(self)
        self.SetUniverse("grid")

    def GetAdditionalOptions(self,job):
        return {'x509userproxy': expandvars(job.GetSteering().GetSysOpt("globus_proxy").GetValue())}

class CondorI3DAG(dag.TaskQ,Condor):

    def __init__(self):
        Condor.__init__(self)
        dag.TaskQ.__init__(self)  # inheritance resolution order is important!!!!
        self.logger = logging.getLogger('CondorI3DAG')
        self.suffix = "condor"
        self.AddParam("Should_Transfer_Files","ALWAYS")
        self.AddParam("When_To_Transfer_Output","ON_EXIT")

    def Submit(self,cookie):

        status_sum = 0
        submit_status = ""
        for job in self.jobs:
            if isinstance(job,i3Task):
                self.logger.info("submitting task")
                status, stdout = dag.TaskQ.SubmitTask(self,job)
                cookie.AddJobId(job.GetJobId())
                status_sum += status
                submit_status += stdout
        return status_sum, submit_status