
Source Code for Module iceprod.server.plugins.condor

#!/bin/env python
#

"""
 A basic wrapper for submitting and monitoring jobs to Condor.
 This module implements only a small subset of Condor's many features.
 (http://www.cs.wisc.edu/condor)
 Inherits from i3Queue

 copyright  (c) 2005 the icecube collaboration

 @version: $Revision: $
 @date: $Date: $
 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
 @todo: implement more functionality of condor.
"""

import os
import re
import sys
import math
import dircache
import time
import string
import shutil
import ConfigParser
import logging
import commands
import getpass
from os.path import expandvars
from iceprod.core import metadata
from iceprod.core.dataclasses import Steering
from iceprod.core.lex import ExpParser
from iceprod.server.db import ConfigDB,MonitorDB
from iceprod.server.grid import iGrid

logger = logging.getLogger('Condor')

# Condor JobStatus codes mapped to IceProd status strings.
condor_status = {
     '0':'UNEXPANDED',
     '1':'QUEUED',      # Condor: Idle
     '2':'PROCESSING',  # Condor: Running
     '3':'RESET',       # Condor: Removed
     '4':'FINISHED',    # Condor: Completed
     '5':'ERROR',       # Condor: Held
     '6':'ERROR'        # submission error
     }

def cstatus(istatus):
    """
    Map a Condor JobStatus code to an IceProd status string.
    An empty status is treated as FINISHED; unknown codes map to 'X'.
    """
    if not istatus: istatus = '4'
    if condor_status.has_key(istatus):
        return condor_status[istatus]
    return 'X'

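# Illustrative mappings (using the table above):
#
#   cstatus('2')  ->  'PROCESSING'
#   cstatus('')   ->  'FINISHED'   (empty output defaults to '4')
#   cstatus('9')  ->  'X'          (unknown code)
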
class Condor(iGrid):
    """
    This class represents a job or cluster on a condor system.
    """

    # Supported universes in Condor v6.7
    valid_universes = [
        'STANDARD',
        'PVM',
        'VANILLA',
        'SCHEDULER',
        'MPI',
        'GLOBUS',
        'GRID',
        'JAVA'
    ]

    def __init__(self):
        iGrid.__init__(self)
        self.universe = "vanilla"
        self.cluster_id = -1
        self.post = None
        self.enqueue_cmd = "condor_submit"
        self.checkqueue_cmd = "condor_q"
        self.queue_rm_cmd = "condor_rm"
        self.suffix = "condor"
        self.exe_prefix = "iceprod."

        self.AddParam("Should_Transfer_Files","ALWAYS")
        self.AddParam("When_To_Transfer_Output","ON_EXIT")

    def Submit(self,job,config_file):
        """
        Submit job/cluster to Condor
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """
        self.submit_status = ''
        self.WriteConfig(job,config_file)

        cmd = self.enqueue_cmd
        for key in self.GetParamKeys():
            if key.startswith('-'):
                cmd += " %s %s" % (key, self.GetParam(key))
        cmd += " " + config_file
        status, self.submit_status = commands.getstatusoutput(cmd)
        try:
            id = self.get_id(self.submit_status)
            job.SetJobId(id)
            # only clear the failure flag when a valid id was parsed
            if id < 0:
                status = 1
            else:
                status = 0
        except Exception, e:
            logger.error("Exception: " + str(e))
            self.submit_status += "\nException: " + str(e)
            status = 1

        if len(self.job_ids) and not self.cluster_id:
            self.cluster_id = self.job_ids[0]

        return status,self.submit_status

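    # A minimal usage sketch (hypothetical job setup; the i3Job API comes
    # from iceprod.server.grid):
    #
    #   queue = Condor()
    #   status, output = queue.Submit(job, "/tmp/iceprod.5001.17.condor")
    #   if status:
    #       logger.error("submission failed:\n%s" % output)
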
    def SetUniverse(self,universe):
        """
        Define the condor universe for this job
        @param universe: string containing a valid condor universe
        """
        if universe.upper() in self.valid_universes:
            self.universe = universe
        else:
            raise Exception, 'unsupported universe: %s' % universe

    def GetUniverse(self):
        """
        Get the condor universe for this job
        @return: the currently set condor universe. If none has been set
        the default value of 'vanilla' is returned.
        """
        return self.universe

    def GetJobIds(self):
        """
        Get the cluster AND job id for the submitted jobs.
        @return: a list of jobs with their cluster and job id
        in the condor format
        None if no jobs have been submitted or if submission failed.
        """
        # cluster_id is the string parsed from condor_submit output,
        # so format it with %s rather than %d
        return ['%s.%d' % (self.cluster_id, job_id) \
                for job_id in range(self.jobs_submitted)]

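    # For example, three jobs submitted to cluster 1234 are reported as
    #
    #   ['1234.0', '1234.1', '1234.2']
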
    def WriteConfig(self,job,config_file):
        """
        Write condor submit file to a file.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        submitfile = open(config_file,'w')
        wrapper = open(config_file.replace('condor','sh'),'w')

        job.Write(wrapper,"#!/bin/sh")
        job.Write(submitfile,"Universe = %s" % self.GetUniverse())
        job.Write(submitfile,"Executable = %s" % wrapper.name)
        job.Write(submitfile,"Log = %s" % job.GetLogFile())
        job.Write(submitfile,"Output = %s" % job.GetOutputFile())
        job.Write(submitfile,"Error = %s" % job.GetErrorFile())

        # Detect platform (architecture + gcc runtime) in the shell wrapper
        job.Write(wrapper, "ARCH=`uname -m | sed -e 's/Power Macintosh/ppc/ ; s/i686/i386/'`")
        job.Write(wrapper, "PVER=`uname -r | awk '{split($1,a,\".\"); print a[2]}'`")
        job.Write(wrapper, "if [ $ARCH == 'i386' ]; then",parse=False)
        job.Write(wrapper, "  if [ $PVER == '4' ]; then",parse=False)
        job.Write(wrapper, "     PLATFORM=Linux-i386")
        job.Write(wrapper, "  elif [ $PVER == '6' ]; then",parse=False)
        job.Write(wrapper, "     PLATFORM=Linux-libstdc++6-i386")
        job.Write(wrapper, "  fi")
        job.Write(wrapper, "elif [ $ARCH == 'x86_64' ]; then",parse=False)
        job.Write(wrapper, "  if [ $PVER == '4' ]; then",parse=False)
        job.Write(wrapper, "     PLATFORM=Linux-x86_64")
        job.Write(wrapper, "  elif [ $PVER == '6' ]; then",parse=False)
        job.Write(wrapper, "     PLATFORM=Linux-libstdc++6-x86_64")
        job.Write(wrapper, "  fi")
        job.Write(wrapper, "fi")

        for var in self.env.keys():
            job.Write(wrapper, "export %s=%s" % (var, self.env[var]), parse=False)
        for var in job.env.keys():
            job.Write(wrapper, "export %s=%s" % (var, job.env[var]), parse=False)
        job.Write(wrapper, 'export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH',parse=False)
        job.Write(wrapper, 'cmd="$PYROOT/bin/python %s $*"' % os.path.basename(job.GetExecutable()),parse=False)
        job.Write(wrapper, "echo $cmd",parse=False)
        job.Write(wrapper, "exec $cmd",parse=False)

        # Define Condor submit directory
        if job.GetInitialdir():
            job.Write(submitfile,"Initialdir = %s" % job.GetInitialdir())

        if job.GetInputFiles():
            inputfile_list = ",".join(job.GetInputFiles())
            inputfile_list += ","+job.GetExecutable()
            job.Write(submitfile,"%s = %s" % ("Transfer_Input_Files",inputfile_list))

        if job.GetOutputFiles():
            outputfile_list = ",".join(job.GetOutputFiles())
            job.Write(submitfile,"%s = %s" % ("Transfer_Output_Files",outputfile_list))

        #if self.GetArgOpt('stageout'):
        #    self.AddParam("When_To_Transfer_Output","ON_ERROR")

        # Add general batch options
        for key in self.GetParamKeys():
            if not key.startswith('-'):
                job.Write(submitfile, "%s = %s" % (key, self.GetParam(key)))

        # Add job specific batch options
        for key,opt in job.GetBatchOpts().items():
            if not key.startswith('-'):
                job.Write(submitfile, "%s = %s" % (key, opt))

        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = job.GetMainScript() + " " + " ".join(argopts)
        job.Write(submitfile,"Arguments = %s" % argstr,parse=False)

        job.Write(submitfile, "Queue")
        submitfile.close()
        wrapper.close()
        os.system("chmod +x %s" % wrapper.name)

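    # For reference, the generated submit description looks roughly like
    # this (paths are illustrative):
    #
    #   Universe = vanilla
    #   Executable = /scratch/iceprod.5001.17.sh
    #   Log = /scratch/iceprod.5001.17.log
    #   Output = /scratch/iceprod.5001.17.out
    #   Error = /scratch/iceprod.5001.17.err
    #   Should_Transfer_Files = ALWAYS
    #   When_To_Transfer_Output = ON_EXIT
    #   Arguments = <main script> <options>
    #   Queue
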
    def get_id(self,submit_status):
        """
        Parse string returned by condor on submission to extract the
        id of the job cluster

        @param submit_status: string returned by condor_submit
        """
        matches = re.findall("[0-9]+ job\(s\) submitted to cluster [0-9]+",
                             submit_status)
        logger.debug(submit_status)
        if matches:
            cluster_info = matches[0].split()
            job_id = cluster_info[-1]

            self.job_ids.append(job_id)
            return job_id
        else:
            logger.warn('could not parse job id from "%s"' % submit_status)
            return -1

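    # condor_submit reports success with output of the form
    #
    #   Submitting job(s).
    #   1 job(s) submitted to cluster 1234.
    #
    # from which the pattern above extracts the cluster id "1234".
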
    def CheckQ(self,db=None):
        """
        Query status of cluster or job on condor queue
        """

        cmd = "condor_q " + " ".join(self.job_ids)
        status,output = commands.getstatusoutput(cmd)
        return output

    def CleanQ(self,jobs=None):
        """
        Query status of cluster or job on condor queue
        and remove those which are not active in DB
        """

        if not jobs: return 0

        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]

        jobs_to_remove = []
        job_dict = dict()

        for job in job_list:
            job_dict[job.GetJobId()] = job

        cmd = 'condor_q %s' % getpass.getuser()
        cmd += ' -format "%s " ClusterId'
        cmd += ' -format "%s " Cmd'
        cmd += ' -format "%s\\n" JobStatus'
        status,output = commands.getstatusoutput(cmd)
        if not status:
            for line in output.split('\n'):
                try:
                    tok = line.split()
                    jobId = tok[0]
                    executable = os.path.basename(tok[1])
                    jobStatus = tok[2]
                    if executable.startswith(self.exe_prefix):
                        name,dataset,proc,suffix = executable.split('.')
                        if int(dataset) and not job_dict.has_key(jobId):
                            logger.warn("removing job %s with status %s. Reason: job not found in list" % \
                                        (jobId,cstatus(jobStatus)))
                            logger.debug("job list [%s]" % str(job_dict.keys()))
                            jobs_to_remove.append(jobId)
                except Exception,e:
                    logger.error("%s:%s" % (e,line))
            if len(jobs_to_remove):
                cmd = "condor_rm %s" % " ".join(jobs_to_remove)
                logger.debug(cmd)
                os.system(cmd)

        return status

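    # Each line of the formatted condor_q output above looks like
    #
    #   1234 /path/to/iceprod.5001.17.sh 2
    #
    # i.e. ClusterId, Cmd and JobStatus, where the wrapper name encodes
    # <exe_prefix><dataset>.<proc>.<suffix>.
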
    def CheckJobStatus(self,jobs):
        """
        Query status of job on condor queue
        """
        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]
        for job in job_list:
            job_id = job.GetJobId()
            if job_id < 0: return 0
            cmd = 'condor_q -format "%s" JobStatus ' + str(job_id)
            status,output = commands.getstatusoutput(cmd)
            if status:
                job.SetStatus('?')
            else:
                job.SetStatus(cstatus(output.strip()))
        return 1

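    # For a running job this query returns the bare status code, e.g. "2",
    # which cstatus() maps to 'PROCESSING'.
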
    def QRemove(self,jobs):
        """
        Remove cluster or job from condor queue
        """
        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]

        status = []
        for jobid in job_list:
            if jobid:
                cmd = "condor_rm %s" % jobid
                logger.debug(cmd)
                handle = os.popen(cmd, 'r')
                status.append(string.join(handle.readlines()))
                handle.close()
        return "\n".join(status)

class CondorNFS(Condor):
    """
    This class represents a job or cluster on a condor system where
    input files are staged through a shared (NFS) filesystem rather
    than Condor file transfer.
    """

    def __init__(self):
        Condor.__init__(self)

    def WriteConfig(self,job,config_file):
        """
        Write condor submit file to a file.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """
        Condor.WriteConfig(self,job,config_file)
        os.system("chmod +x %s" % config_file)

        # copy inputs into the shared submit directory by hand
        for infile in job.GetInputFiles():
            os.system("cp %s %s" % (infile,job.GetInitialdir()))
        os.system("cp %s %s" % (job.GetExecutable(),job.GetInitialdir()))

class CondorDAG(Condor):
    """
    This class represents a job that executes in multiple parts using a DAG.
    """

    def __init__(self):
        Condor.__init__(self)
        self.enqueue_cmd = "condor_submit_dag -f"
        self.exe_prefix = "condor_dagman"

    def WriteConfig(self,job,config_file):
        """
        Write condor submit file to a file.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        from iceprod.core.dataclasses import IceTrayConfig

        db = self.GetMonitorDB()

        steering = job.GetSteering()
        task_defs = steering.GetTaskDefinitions()
        logger.debug("Task definitions: %s" % task_defs)
        if not len(task_defs):
            # not a DAG
            logger.debug("No tasks specified in config file; doing regular submit")
            self.enqueue_cmd = "condor_submit"
            return Condor.WriteConfig(self, job, config_file)

        dagfile = open(config_file,'w')
        job_id = job.GetDatabaseId()

        file_catalog = {}
        for taskname,td in task_defs.items():
            args = self.GetArguments(job,td,output="dict")
            file_catalog[taskname] = self.GetFiles(job,td,args)

        for taskname,td in task_defs.items():
            filename = config_file.replace('condor',"%s.condor" % taskname)
            td_id = td.GetId()
            logger.debug("Got task %s (ID=%u) with filename %s" % (taskname, td_id, filename))
            parents = " ".join(self.EnumerateParentNodes(steering,td))
            if td.ParallelExecutionEnabled():
                trays = td.GetTrays()
                for idx,tray in trays.items():
                    for iter in tray.GetIters():
                        if not iter == -1:
                            nodename = "%s_%u" % (taskname,iter)
                        else:
                            nodename = "%s_ext" % taskname
                        filename = config_file.replace('condor',"%s.condor" % nodename)
                        done = db.task_is_finished(td_id, job_id, idx, iter)
                        args = self.GetArguments(job,td,idx,iter,output="dict")

                        input,output,notes = self.GetFiles(job,td,args,idx,iter,file_catalog)
                        self.WriteFileManifest(job,filename,input,output,notes)
                        self.WriteSubmitFile(job,filename,td,idx,iter)
                        self.WriteDAGNode(job,dagfile,nodename,filename,parents,done)
            else:
                done = db.task_is_finished(td_id, job_id)
                args = self.GetArguments(job,td,output="dict")

                input,output,notes = self.GetFiles(job,td,args,catalog=file_catalog)
                self.WriteFileManifest(job,filename,input,output,notes)
                self.WriteSubmitFile(job,filename,td)
                self.WriteDAGNode(job,dagfile,taskname,filename,parents,done)
        db.commit()

        job.Write(dagfile,"DOT dag.dot")
        dagfile.close()

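    # Each task gets its own submit file derived from the DAG config name,
    # e.g. iceprod.5001.17.condor yields iceprod.5001.17.generate.condor
    # for a task named "generate", plus matching .sh/.input/.output files.
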
    def IsUrl(self, path):
        """
        Return True if path looks like a URL (scheme://...).
        """
        return bool(re.match("[^/:]+://?.*$", path))

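    # Illustrative results:
    #
    #   IsUrl("gsiftp://host/path") -> True
    #   IsUrl("file:///tmp")        -> True
    #   IsUrl("/scratch/data")      -> False
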
    def GetFiles(self,job,td,args,idx=False,iter=False,catalog=False):
        steering = job.GetSteering()
        if idx is not False:
            td_trays = {idx: td.GetTray(idx)}
        else:
            td_trays = td.GetTrays()

        if args.has_key('dagtemp'):
            tmp_dir = args['dagtemp']
        else:
            tmp_dir = 'file:///tmp' # TODO: give this a sane default or make it an error

        if args.has_key('fetch'):
            global_dir = args['fetch']
        else:
            global_dir = 'file:///tmp'

        td_input = {}
        td_output = {}
        notes = {}

        if td.IsCleanup():
            # cleanup job has no input/output files required
            return (td_input, td_output, notes)

        for idx, td_tray in td_trays.iteritems():
            args['tray'] = idx
            icetray = steering.GetTray(idx)

            input_files = icetray.GetInputFiles()
            parsed_input = []

            output_files = icetray.GetOutputFiles()
            parsed_output = []

            if iter is not False:
                iters = [iter]
            else:
                iters = td_tray.GetIters()
            for iter in iters:
                args['iter'] = iter
                parser = ExpParser(args,steering)
                for d in steering.GetDependencies():
                    d_file = parser.parse(d)
                    if not td_input.has_key(d_file):
                        location = d_file
                        if not self.IsUrl(location):
                            location = os.path.join(global_dir, location)
                        td_input[os.path.basename(d_file)] = [location]
                for i_file in input_files:
                    name = i_file.GetName()
                    name = parser.parse(name)
                    if not td_input.has_key(name) \
                       and not td_output.has_key(name):
                        if catalog:
                            node = self.FindFile(steering,td,catalog,name)
                        else:
                            node = td.GetName()

                        note = False
                        if node == "global":
                            location = global_dir
                            note = "global"
                        else:
                            location = tmp_dir
                            location = os.path.join(location, str(job.GetDatasetId()))
                            location = os.path.join(location, str(job.GetProcNum()))
                            location = os.path.join(location, node)
                            note = "dontextract"
                        location = os.path.join(location, name)
                        if i_file.IsPhotonicsTable():
                            note = "photonics"
                        if note:
                            notes[name] = note
                        td_input[name] = [location]
                for o_file in output_files:
                    name = o_file.GetName()
                    name = parser.parse(name)
                    if not td_output.has_key(name):
                        location = os.path.join(tmp_dir, str(job.GetDatasetId()))
                        location = os.path.join(location, str(job.GetProcNum()))
                        location = os.path.join(location, str(td.GetName()))
                        location = os.path.join(location, name)
                        td_output[name] = [location]

        return (td_input, td_output, notes)

    def FindFile(self,steering,td,catalog,file):
        parents = td.GetParents()

        # check immediate parents for this file
        for parent in parents:
            if catalog.has_key(parent):
                if catalog[parent][1].has_key(file):
                    return parent

        # check older ancestors
        for parent in parents:
            parent_td = steering.GetTaskDefinition(parent)
            result = self.FindFile(steering,parent_td,catalog,file)
            if result != "global":
                return result

        return "global"

    def WriteFileManifest(self,job,filename,input,output,notes):
        logger.debug("Input files: %s" % input)
        in_manifest = open(filename.replace(".condor", ".input"), 'w')
        if len(input):
            padding = str(max(map(lambda x: max(map(len, x)), input.values())))
            fmt_str = "%-" + padding + "s %s"
            for i_file, locs in input.items():
                for loc in locs:
                    file = fmt_str % (loc, i_file)
                    if notes.has_key(i_file):
                        file += "\t%s" % notes[i_file]
                    job.Write(in_manifest, file)
        in_manifest.close()
        logger.debug("Output files: %s" % output)
        out_manifest = open(filename.replace(".condor", ".output"), 'w')
        if len(output):
            padding = str(max(map(lambda x: max(map(len, x)), output.values())))
            fmt_str = "%-" + padding + "s %s"
            for o_file, locs in output.items():
                for loc in locs:
                    job.Write(out_manifest, fmt_str % (o_file, loc))
        out_manifest.close()

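    # The .input manifest lists "location name [note]" per file, e.g.
    # (hypothetical paths):
    #
    #   file:///tmp/5001/17/generate/hits.i3 hits.i3	dontextract
    #
    # while the .output manifest reverses the order to "name location".
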
    def WriteDAGNode(self,job,dagfile,nodename,filename,parents=None,done=False):
        line = "JOB %s %s" % (nodename,filename)
        if done:
            line += " DONE"
        job.Write(dagfile,line)
        if parents:
            job.Write(dagfile, "PARENT %s CHILD %s" % (parents,nodename))

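    # The resulting DAG file consists of DAGMan statements such as
    # (file names illustrative):
    #
    #   JOB generate iceprod.5001.17.generate.condor
    #   JOB filter iceprod.5001.17.filter.condor DONE
    #   PARENT generate CHILD filter
    #   DOT dag.dot
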
    def EnumerateParentNodes(self,steering,td):
        parents = td.GetParents()
        parentnodes = []
        for parent in parents:
            parentobj = steering.GetTaskDefinition(parent)
            if parentobj.ParallelExecutionEnabled():
                for idx,tray in parentobj.GetTrays().items():
                    for iter in tray.GetIters():
                        if not iter == -1:
                            nodename = "%s_%u" % (parent,iter)
                        else:
                            nodename = "%s_ext" % parent
                        parentnodes.append(nodename)
            else:
                parentnodes.append(parent)
        return parentnodes

    def GetArguments(self,job,td,idx=False,iter=False,output="str"):
        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = " --dag --task=%s" % td.GetName()
        if not idx is False:
            argstr += " --tray=%s --iter=%s" % (idx,iter)
        argstr += " " + job.GetMainScript() + " " + " ".join(argopts)
        if output == "dict":
            argstr = self.ArgStrToDict(argstr)
        return argstr

    def ArgStrToDict(self,argstr):
        args = {}
        for tok in argstr.split(" "):
            tok = tok[2:]
            pieces = tok.split("=")
            if len(pieces) == 1:
                args[tok] = 1
            else:
                args[pieces[0]] = pieces[1]
        return args

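    # Example: "--dag --task=generate --tray=0" parses to
    #
    #   {'dag': 1, 'task': 'generate', 'tray': '0'}
    #
    # (bare flags map to 1; "--" prefixes are stripped).
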
    def getAdditionalOptions(self,job):
        # note: WriteSubmitFile calls GetAdditionalOptions (capital G) below
        return {}

    def WriteSubmitFile(self,job,filename,td,idx=False,iter=False):
        wrapper = open(filename.replace('condor','sh'),'w')

        job.Write(wrapper,"#!/bin/sh")
        job.Write(wrapper, "ARCH=`uname -m | sed -e 's/Power Macintosh/ppc/ ; s/i686/i386/'`")
        job.Write(wrapper, "PVER=`uname -r | awk '{split($1,a,\".\"); print a[2]}'`")
        job.Write(wrapper, "if [ $ARCH == 'i386' ]; then",parse=False)
        job.Write(wrapper, "  if [ $PVER == '4' ]; then",parse=False)
        job.Write(wrapper, "     PLATFORM=Linux-i386")
        job.Write(wrapper, "  elif [ $PVER == '6' ]; then",parse=False)
        job.Write(wrapper, "     PLATFORM=Linux-libstdc++6-i386")
        job.Write(wrapper, "  fi")
        job.Write(wrapper, "elif [ $ARCH == 'x86_64' ]; then",parse=False)
        job.Write(wrapper, "  if [ $PVER == '4' ]; then",parse=False)
        job.Write(wrapper, "     PLATFORM=Linux-x86_64")
        job.Write(wrapper, "  elif [ $PVER == '6' ]; then",parse=False)
        job.Write(wrapper, "     PLATFORM=Linux-libstdc++6-x86_64")
        job.Write(wrapper, "  fi")
        job.Write(wrapper, "fi")

        for var in self.env.keys():
            job.Write(wrapper, "export %s=%s" % (var, self.env[var]), parse=False)
        for var in job.env.keys():
            job.Write(wrapper, "export %s=%s" % (var, job.env[var]), parse=False)
        job.Write(wrapper, 'export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH', parse=False)
        job.Write(wrapper, 'cmd="$PYROOT/bin/python -u %s $*"' % os.path.basename(job.GetExecutable()), parse=False)
        job.Write(wrapper, "echo $cmd", parse=False)
        job.Write(wrapper, "exec $cmd", parse=False)
        wrapper.close()

        submitfile = open(filename,'w')
        job.Write(submitfile,"Universe = %s" % self.GetUniverse())
        job.Write(submitfile,"Executable = %s" % wrapper.name)
        job.Write(submitfile,"Log = %s" % job.GetLogFile())
        job.Write(submitfile,"Output = %s" % filename.replace('condor', 'out'))
        job.Write(submitfile,"Error = %s" % filename.replace('condor', 'err'))

        # Define Condor submit directory
        if job.GetInitialdir():
            job.Write(submitfile,"Initialdir = %s" % job.GetInitialdir())

        if job.GetInputFiles():
            inputfile_list = ",".join(job.GetInputFiles())
            inputfile_list += ","+job.GetExecutable()
            inputfile_list += ","+filename.replace(".condor", ".input")
            inputfile_list += ","+filename.replace(".condor", ".output")
            job.Write(submitfile,"%s = %s" % ("Transfer_Input_Files",inputfile_list))

        if job.GetOutputFiles():
            outputfile_list = ",".join(job.GetOutputFiles())
            job.Write(submitfile,"%s = %s" % ("Transfer_Output_Files",outputfile_list))

        # Add general batch options
        for key in self.GetParamKeys():
            if key.lower() == "requirements" and td.GetRequirements():
                val = td.GetRequirements()
            else:
                val = self.GetParam(key)
            if not key.startswith('-'):
                job.Write(submitfile, "%s = %s" % (key, val))

        # Add job specific batch options
        for opt in job.GetSteering().GetBatchOpts():
            if not opt.GetName().startswith('-'):
                job.Write(submitfile, "%s = %s" % (opt.GetName(), opt.GetValue()))

        for name, value in self.GetAdditionalOptions(job).items():
            job.Write(submitfile, "%s = %s" % (name, value))

        argstr = self.GetArguments(job,td,idx,iter)
        job.Write(submitfile,"Arguments = %s" % argstr,parse=False)

        job.Write(submitfile, "Queue")
        submitfile.close()

    def GetAdditionalOptions(self,job):
        return {'x509userproxy': expandvars(job.GetSteering().GetSysOpt("globus_proxy").GetValue())}

class CondorG(CondorDAG):
    """
    CondorDAG variant that submits through the grid universe (Condor-G).
    """

    def __init__(self):
        CondorDAG.__init__(self)
        self.SetUniverse("grid")

    def GetAdditionalOptions(self,job):
        return {'x509userproxy': expandvars(job.GetSteering().GetSysOpt("globus_proxy").GetValue())}