1   
  2   
  3   
  4  """ 
  5   A basic submitfile for submitting and monitoring jobs to SGE.  
  6   This module implements only a small subset of SGE features. 
  7   Its interface is like that of condor, however. 
  8   Inherits from i3Queue 
  9   
 10   copyright  (c) 2005 the icecube collaboration 
 11   
 12   @version: $Revision: $ 
 13   @date: $Date: $ 
 14   @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
 15   @todo: implement more functionality of sge. 
 16  """ 
 17   
 18  import os 
 19  import re 
 20  import sys 
 21  import math 
 22  import random  
 23  import dircache 
 24  import time 
 25  import string 
 26  import logging 
 27  import os.path 
 28  import getpass 
 29  import commands 
 30  from os import popen2 
 31  from iceprod.core import metadata 
 32  from iceprod.core.lex import ExpParser 
 33  from iceprod.server.grid import iGrid 
 34  from iceprod.server.job import i3Job, i3Task 
 35   
# Module-level logger for this grid plugin.
logger = logging.getLogger('SGE')

# Map SGE qstat state codes to iceprod job states:
# 'qw' = queued/waiting, 'r' = running, 'hqw' = held in queue.
sge_status = {'qw':'QUEUED', 'r':'PROCESSING', 'hqw': 'QUEUED'}
 39   
 44   
 46      """ 
 47      This class represents a job or cluster on a sge system. 
 48      """ 
 49   
 51   
        # Initialize base grid-plugin state, then configure the SGE
        # command-line tools this plugin drives.
        iGrid.__init__(self)
        self.sleeptime      = 30      # seconds between queue polls
        self.enqueue_cmd    = "qsub"  # SGE submission command
        self.checkqueue_cmd = "qstat -u %s " % getpass.getuser()  # only list this user's jobs
        self.queue_rm_cmd   = "qdel"  # SGE job-removal command
        self.suffix         = "sge"   # submit-file extension
        logger.debug('Made a SGE(iGrid)')
  59           
 60   
        """
        Write an SGE submit (shell) script for the given job.

        @param job: i3Job object 
        @param config_file: path to file where submit file will be written
        """
        logger.debug('WriteConfig')

        if not job.GetExecutable():
            raise Exception,  "no executable configured"

        submitfile = open("%s" % config_file,'w')
        # batch stdout/stderr are collected in the job's initial directory
        outfile = os.path.join(job.GetInitialdir(),job.GetOutputFile())
        errfile = os.path.join(job.GetInitialdir(),job.GetErrorFile())

        job.Write(submitfile,"#!/bin/sh")
        logger.debug("#!/bin/sh")

        # '#$' lines are SGE directives embedded in the shell script
        job.Write(submitfile,"#$ -o %s" % outfile)
        logger.debug("#$ -o %s" % outfile)

        job.Write(submitfile,"#$ -e %s" % errfile )
        logger.debug("#$ -e %s" % errfile )

        # grid-level batch parameters
        for key in self.GetParamKeys():
            job.Write(submitfile,"#$ %s" % self.GetParam(key),parse=True)
            logger.debug("#$ %s" % self.GetParam(key))

        # per-job batch options
        for key,opt in job.GetBatchOpts().items():
            job.Write(submitfile,"#$ %s" % opt,parse=True)
            logger.debug("#$ %s" % opt)

        # export grid- and job-level environment variables
        for var in self.env.keys():
            job.Write(submitfile,"export %s=%s" % (var,self.env[var]),parse=False)
        for var in job.env.keys():
            job.Write(submitfile,"export %s=%s" % (var,job.env[var]),parse=False)
        job.Write(submitfile,"export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH",parse=False)
        job.Write(submitfile,"unset I3SIMPRODPATH")
        # each job runs in its own scratch directory (proc number + timestamp)
        job.Write(submitfile,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
        job.Write(submitfile,"mkdir -p $RUNDIR",parse=False)
        logger.debug("RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()))
        logger.debug("mkdir -p $RUNDIR")

        job.Write(submitfile,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)
        logger.debug("echo \"running on $HOSTNAME:$RUNDIR\"")

        # stage input files and the executable into the scratch directory
        logger.debug('%d' %len(job.GetInputFiles()))
        for file in job.GetInputFiles()+[job.GetExecutable()]:
            logger.debug('%s' %file)
            job.Write(submitfile,"cp %s $RUNDIR" % file,parse=False)
        job.Write(submitfile,"cd $RUNDIR",parse=False)

        # run the main script under the $PYROOT python
        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = job.GetMainScript() + " " + " ".join(argopts)
        executable = os.path.basename(job.GetExecutable())
        logger.debug('executable: %s' % job.GetExecutable())
        logger.debug('main script: %s' % job.GetMainScript())
        logger.debug('args options: %s' % argopts)
        logger.debug('arguments: %s' % job.GetArguments())
        job.Write(submitfile, "$PYROOT/bin/python %s %s" % (executable, argstr), parse=False)
        job.Write(submitfile, 'echo "job exited with status $?";',parse=False)

        # unless stageout is handled elsewhere, move produced files back to
        # the initial directory (skipping names that already exist there)
        if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
            job.Write(submitfile,"for file in *; do")
            job.Write(submitfile," if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
            job.Write(submitfile,"   mv $file %s" % job.GetInitialdir())
            job.Write(submitfile," fi; done")

        job.Write(submitfile,"#clean directory")
        job.Write(submitfile,"cd /tmp")
        job.Write(submitfile,"rm -rf $RUNDIR ",parse=False)
        logger.debug('Submit file written')
        submitfile.close();
 137   
138 -    def get_id(self,submit_status): 
 139          """ 
140          Parse string returned by condor on submission to extract the 
141          id of the job cluster 
142   
143          @param submit_status: string returned by condor_submit 
144          """ 
145          matches =  re.findall(r'Your job [0-9]+', submit_status)  
146          if matches: 
147              cluster_info = matches[0].split() 
148              job_id = cluster_info[-1] 
149   
150              self.job_ids.append(job_id) 
151              return job_id 
152          else: 
153              logger.warn('could not parse job id from "%s"' % submit_status) 
154              return -1 
 155   
        """
        Query the status of jobs on the SGE queue via qstat.
        Jobs not present in the qstat output are assumed to be FINISHED.

        @param jobs: an i3Job or a list of i3Job objects
        @return: 1 on success, qstat's non-zero exit status on failure
        """
        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]

        # map each individual SGE job id back to its i3Job object
        job_dict = {}
        logger.info("beggining of CheckJobStatus")

        for job in job_list:
            logger.info("checking job status: job id %s for dataset %d", job.GetJobId(), job.GetDatasetId())

            if job.GetJobId() < 0: continue

            # a DAG job carries several space-separated SGE ids; default each
            # to FINISHED and upgrade below for ids still in the queue
            for job_id in job.GetJobId().split(" "):
                job_dict[job_id] = job
                job.SetStatus('FINISHED')

        cmd = self.checkqueue_cmd
        logger.debug(cmd)
        retval,output = commands.getstatusoutput(cmd)

        # qstat itself failed: status is unknown for everything
        if retval:
            for job in job_list: job.SetStatus('?')
            return retval

        # skip the two qstat header lines
        for line in output.split('\n')[2:]:
            try:
                tok        = line.split()
                jobId      = tok[0]
                prio       = tok[1]
                name       = tok[2]
                user       = tok[3]
                jobStatus  = tok[4]
                runtime    = tok[5]
                queue      = tok[6]
                logger.debug("jobid:%s" %jobId)
                if jobId in job_dict.keys():
                    logger.debug("status for jobid %s is %s" %(jobId,jobStatus))
                    # translate the SGE state code (qw/r/hqw) into an iceprod
                    # state; cstatus is defined elsewhere in this module
                    status = cstatus(jobStatus)
                    job_dict[jobId].SetStatus(status)
            except Exception,e:
                logger.error("%s:%s" %(e,line))
                sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
                break

        return 1
 206   
        """
        Return the raw qstat output for all jobs this instance submitted.
        """
        cmd = self.checkqueue_cmd
        # append every remembered job id to the qstat command line
        for id in self.job_ids:
            cmd += " %s" % id
        status,output = commands.getstatusoutput(cmd)
        return output
 217   
218   
        """
        Remove from the SGE queue any iceprod-named jobs that are not in
        the given list (stale jobs left over from earlier runs).

        @param jobs: an i3Job or a list of i3Job objects expected in the queue
        @return: qstat exit status (0 on success)
        """

        logger.info("beggining of CleanQ")
        if not jobs: return 0

        if isinstance(jobs,list):
           job_list = jobs
        else:
           job_list = [jobs]
        job_dict = dict()


        # collect the SGE ids of jobs we expect to find in the queue
        for job in job_list:
            logger.info("job id: %s", job.GetJobId())

            # stray debug message left from development
            logger.info("ja laenge is 0")
            if job.GetJobId() < 0: continue

            for job_id in job.GetJobId().split(" "):
                job_dict[job_id] = job
                job.SetStatus('FINISHED')

        cmd = self.checkqueue_cmd
        status,output = commands.getstatusoutput(cmd)
        if not status:
            # skip the two qstat header lines
            for line in output.split('\n')[2:]:
                if line.strip() == '':
                    continue
                try:
                    tok        = line.split()
                    jobId      = tok[0]
                    prio       = tok[1]
                    name       = tok[2]
                    user       = tok[3]
                    jobStatus  = tok[4]
                    runtime    = tok[5]
                    queue      = tok[6]
                    # qdel any iceprod job that is queued but not in our list
                    if name.startswith("iceprod.") and not job_dict.has_key(jobId):
                        logger.warn("removing job %s with status %s. Reason: job not found in list" % \
                                    (jobId,cstatus(jobStatus)))
                        logger.debug("job list [%s]" % str(job_dict.keys()))
                        os.system("%s %s" % (self.queue_rm_cmd,jobId))
                except Exception,e:
                    logger.error("%s:%s" %(e,line))
                    sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)

        return status
 269   
270   
        """
        Remove a cluster or job from the queue via qdel.

        @param job: i3Job object (its job_id attribute is passed to qdel)
        @return: qdel exit status, or 0 if the job already finished
        """
        # nothing to remove for jobs that already completed
        if isinstance(job,i3Job) and job.GetStatus() == "FINISHED":
           return 0

        cmd = "%s %s" % (self.queue_rm_cmd,job.job_id)
        status,output = commands.getstatusoutput(cmd)
        return status
  281   
282   
284      """ 
285      This class represents a job that executes in multiple parts using a DAG. 
286      """ 
287   
294   
        """
        Submit job(s) to SGE.  Jobs whose steering defines task
        definitions are submitted as a chain of per-task submit files
        linked with qsub's -hold_jid; otherwise a single regular submit
        is done via SGE.WriteConfig.

        @return: (status_sum, submit_status) - accumulated exit status
                 and the last qsub output
        """
        self.submit_status = ''
        status_sum = 0
        cwdir = os.getcwd()

        for job in self.jobs:
            logger.info("submitting job %u.%u" %(job.GetDatasetId(),job.GetProcNum()))

            steering  = job.GetSteering()
            task_defs = steering.GetTaskDefinitions()
            logger.debug("Task definitions: %s" % task_defs)

            if not len(task_defs):
                # no tasks: plain (non-DAG) submission
                logger.debug("No tasks specified in config file; doing regular submit")
                SGE.WriteConfig(self, job, job.config_file)

                cmd = "%s %s" % (self.enqueue_cmd,job.config_file)
                status, self.submit_status = commands.getstatusoutput(cmd)
                status_sum += status
                try:
                    id = self.get_id(self.submit_status)
                    job.SetJobId(id)
                    if id < 0: status_sum += 1
                except Exception, e:
                    logger.error("Exception: " + str(e))
                    self.submit_status += "\nException: " + str(e)
                    status_sum += 1

                job.submit_status = status
                job.submit_msg = self.submit_status

                logger.info("subtmit status: %s, submit message: %s" %(job.submit_status, job.submit_msg))

                # NOTE(review): 'cookie' is presumably a parameter of this
                # method (signature not visible here) collecting job ids
                cookie.AddJobId(job.GetJobId())
                os.chdir(cwdir)
            else:
                # DAG submission: one submit file per task, chained together
                db = self.GetMonitorDB()

                job_id    = job.GetDatabaseId()

                file_catalog = {}
                tasks=dict()
                args_dict=dict()

                # gather per-task arguments and file manifests; the task
                # with no parents is the top of the chain
                for taskname,td in task_defs.items():
                    tasks[taskname]=td
                    args = self.GetArguments(job,td,output="dict")
                    args_dict[taskname]=args
                    file_catalog[taskname] = self.GetFiles(job,td,args)

                    if (len(td.GetParents()) == 0):
                        toplevel_taskname=taskname

                cur_td=tasks[toplevel_taskname]
                first=True
                last_id=-1

                dagfile   = open(job.config_file,'w')

                # walk the chain; each task is submitted with -hold_jid on
                # the previously submitted task's id
                while(len(cur_td.GetChildren())>0):
                    taskname=cur_td.GetName()
                    taskfile = job.config_file.replace('.sge',".%s.sge" % taskname)
                    # NOTE(review): 'td' here is left over from the loop
                    # above (last task iterated) - cur_td.GetId() was likely
                    # intended; only affects the debug message below
                    td_id = td.GetId()

                    logger.debug("Got task %s (ID=%u) with filename %s" % (taskname, td_id, taskfile))
                    parents = " ".join(self.EnumerateParentNodes(steering,cur_td))

                    os.chdir(job.GetInitialdir())

                    input, output, notes=self.GetFiles(job, cur_td, args_dict[taskname], catalog=file_catalog)

                    self.WriteFileManifest(job, taskfile,input,output,notes)
                    self.WriteTaskConfig(job,taskfile, cur_td, input, output)

                    if(first):
                        cmd = "%s %s" % (self.enqueue_cmd, taskfile)
                        logger.info("first in the task chain................")
                    else:
                        cmd = "%s -hold_jid %s %s" % (self.enqueue_cmd, last_id, taskfile)
                    logger.info(cmd)
                    status, self.submit_status = commands.getstatusoutput(cmd)
                    logger.info("subtmit status: %s, submit message: %s" %(status, self.submit_status))
                    status_sum += status
                    id=-1
                    try:
                        id = self.get_id(self.submit_status)
                        logger.info("id to add as subtask: %s", id)
                        if(first):
                            logger.info("FIRST SETTING JOBID to %s", id)
                            job.SetJobId(id)
                            first = False
                        else:
                            # accumulate all task ids, space separated
                            temp_id=job.GetJobId()
                            job.SetJobId(temp_id + " " + id)

                        if id < 0: status_sum += 1
                    except Exception, e:
                        logger.error("Exception: " + str(e))
                        self.submit_status += "\nException: " + str(e)
                        status_sum += 1
                    last_id=id

                    cur_td=tasks[cur_td.GetChildren()[0]]

                # submit the final (leaf) task of the chain
                taskname=cur_td.GetName()
                taskfile = job.config_file.replace('.sge',".%s.sge" % taskname)
                td_id = td.GetId()

                logger.debug("Got task %s (ID=%u) with filename %s" % (taskname, td_id, taskfile))

                os.chdir(job.GetInitialdir())
                input, output, notes=self.GetFiles(job, cur_td, args_dict[taskname], catalog=file_catalog)

                self.WriteFileManifest(job, taskfile,input,output,notes)
                self.WriteTaskConfig(job,taskfile, cur_td, input, output)

                cmd = "%s -hold_jid %s %s" % (self.enqueue_cmd, last_id, taskfile)
                status, self.submit_status = commands.getstatusoutput(cmd)
                status_sum += status
                id=-1
                try:
                    id = self.get_id(self.submit_status)
                    temp_id=job.GetJobId()
                    job.SetJobId(temp_id + " " + id)

                    if id < 0: status_sum += 1
                except Exception, e:
                    logger.error("Exception: " + str(e))
                    self.submit_status += "\nException: " + str(e)
                    status_sum += 1

                # write the DAG description file; tasks already finished in
                # the monitor DB are marked DONE (except trashcan)
                for taskname,td in task_defs.items():
                    parents = " ".join(self.EnumerateParentNodes(steering,td))
                    done = db.task_is_finished(td.GetId(), job_id)
                    if(taskname=="trashcan"):
                        done=False
                    filename= job.config_file.replace('.sge',".%s.sge" % taskname)
                    self.WriteDAGNode(job,dagfile,taskname,filename,parents,done)

                dagfile.close()
                db.commit()

            job.submit_status = status_sum
            job.submit_msg = self.submit_status

            if self.production:

                if job.submit_status == 0:
                    self.i3monitordb.jobsubmitted(
                        job.GetDatasetId(), job.GetProcNum(),
                        job.GetInitialdir(), job.GetJobId())
                else:
                    # submission failed: abort the job and clean up its dir
                    logger.error("failed to submit jobs:"+job.submit_msg)
                    self.i3monitordb.jobabort(job.GetProcNum(),job.GetDatasetId(),3,
                                    "failed to submit jobs:"+job.submit_msg)
                    os.chdir('/tmp')
                    self.CleanDir(job.GetInitialdir())

            cookie.AddJobId(job.GetJobId())
            os.chdir(cwdir)

        return status_sum,self.submit_status
 471   
472   
        # True if *path* looks like a URL: "<scheme>://..." or "<scheme>:/..."
        # (scheme may not contain '/' or ':').
        return bool(re.match("[^/:]+://?.*$", path))
 475   
    def GetFiles(self,job,td,args,idx=False,iter=False,catalog=False):
        """
        Build the input and output file manifests for a task definition.

        @param job: i3Job object
        @param td: task definition
        @param args: argument dict consumed by ExpParser
        @param idx: optional tray index to restrict to a single tray
        @param iter: optional iteration to restrict to a single iteration
        @param catalog: optional {taskname: (input,output,notes)} catalog
                        used to find which task produces an input file
        @return: (td_input, td_output, notes) dicts keyed by file name;
                 values are lists of locations / transfer notes
        """
        steering = job.GetSteering()
        if idx is not False:
            td_trays = {idx: td.GetTray(idx)}
        else:
            td_trays = td.GetTrays()

        # staging area for intermediate (inter-task) files
        if args.has_key('dagtemp'):
            tmp_dir = args['dagtemp']
        else:
            tmp_dir = 'file:///tmp'

        # location of globally available input files
        if args.has_key('fetch'):
            global_dir = args['fetch']
        else:
            global_dir = 'file:///tmp'

        td_input  = {}
        td_output = {}
        notes     = {}

        # cleanup tasks transfer no files
        if td.IsCleanup():
            return (td_input, td_output, notes)

        for idx, td_tray in td_trays.iteritems():
            args['tray'] = idx

            logger.info("GetTray(%s)" % idx)
            icetray = steering.GetTray(idx)

            input_files  = icetray.GetInputFiles()
            parsed_input = []

            output_files  = icetray.GetOutputFiles()
            parsed_output = []

            if iter is not False:
                iters = [iter]
            else:
                iters = td_tray.GetIters()
            for iter in iters:
                args['iter'] = iter
                parser = ExpParser(args,steering)
                # steering-level dependencies come from the global area
                # unless they are already full URLs
                for d in steering.GetDependencies():
                    d_file = parser.parse(d)
                    if not td_input.has_key(d_file):
                        location = d_file
                        if not self.IsUrl(location):
                            location = os.path.join(global_dir, location)
                        td_input[os.path.basename(d_file)] = [location]
                for i_file in input_files:
                    name = i_file.GetName()
                    name = parser.parse(name)
                    if not td_input.has_key(name) \
                        and not td_output.has_key(name):
                        # find the task that produces this file (or "global")
                        if catalog:
                            node = self.FindFile(steering,td,catalog,name)
                        else:
                            node = td.GetName()

                        note = False
                        if node == "global":
                            location = global_dir
                            note = "global"
                        if node != "global":
                            # produced by another task: fetch from the
                            # per-dataset/per-proc temp area of that task
                            location = tmp_dir
                            location = os.path.join(location, str(job.GetDatasetId()))
                            location = os.path.join(location, str(job.GetProcNum()))
                            location = os.path.join(location, node)
                            note = "dontextract"
                        location = os.path.join(location, name)
                        if i_file.IsPhotonicsTable():
                            note = "photonics";
                        if note:
                            notes[name] = note
                        td_input[name] = [location]
                for o_file in output_files:
                    name = o_file.GetName()
                    name = parser.parse(name)
                    if not td_output.has_key(name):
                        # outputs always land in this task's temp area
                        location = os.path.join(tmp_dir, str(job.GetDatasetId()))
                        location = os.path.join(location, str(job.GetProcNum()))
                        location = os.path.join(location, str(td.GetName()))
                        location = os.path.join(location, name)
                        td_output[name] = [location]

        return (td_input, td_output, notes)
 564   
565 -    def FindFile(self,steering,td,catalog,file): 
 566          parents = td.GetParents() 
567   
568           
569          for parent in parents: 
570              if catalog.has_key(parent): 
571                  if catalog[parent][1].has_key(file): 
572                      return parent 
573   
574           
575          for parent in parents: 
576              parent_td = steering.GetTaskDefinition(parent) 
577              result    = self.FindFile(steering,parent_td,catalog,file) 
578              if result != "global": 
579                  return result 
580   
581          return "global" 
 582   
584          logger.debug("Input files: %s" % input) 
585          in_manifest = open(filename.replace("\.sge", ".input"), 'w') 
586          if len(input): 
587              padding = str(max(map(lambda x: max(map(len, x)), input.values()))) 
588              fmt_str = "%-" + padding + "s %s" 
589              for i_file, locs in input.items(): 
590                  for loc in locs: 
591                      file = fmt_str % (loc, i_file) 
592                       
593                       
594                      if notes.has_key(i_file): 
595                          file += "\t%s" % notes[i_file] 
596                      job.Write(in_manifest, file) 
597          in_manifest.close() 
598          logger.debug("Output files: %s" % output) 
599          out_manifest = open(filename.replace(".sge", ".output"), 'w') 
600          if len(output): 
601              padding = str(max(map(lambda x: max(map(len, x)), input.values()))) 
602              fmt_str = "%-" + padding + "s %s" 
603              for o_file, locs in output.items(): 
604                  for loc in locs: 
605                      file = fmt_str % (o_file, loc) 
606                       
607                       
608                      job.Write(out_manifest, file) 
609          out_manifest.close() 
 610   
611 -    def WriteDAGNode(self,job,dagfile,nodename,filename,parents=None,done=False): 
 612          str = "JOB %s %s" % (nodename,filename) 
613          if done: 
614              str += " DONE" 
615          job.Write(dagfile,str) 
616          if parents: 
617              job.Write(dagfile, "PARENT %s CHILD %s" % (parents,nodename)) 
 618   
635   
    def GetArguments(self,job,td,idx=False,iter=False,output="str"):

        # Convert the option string into a dict: each token is stripped of
        # its leading "--"; flag-style options (no '=') get the value 1.
        # NOTE(review): 'argstr' is built earlier in this method (code not
        # visible in this excerpt) - presumably the space-separated
        # "--key[=value]" option string.
        args = {}
        for str in argstr.split(" "):
            str = str[2:]
            pieces = str.split("=")
            if len(pieces) == 1:
                args[str] = 1
            else:
                args[pieces[0]] = pieces[1]
        return args
 656   
657   
        """
        Write an SGE submit (shell) script for a single task of a DAG job.

        @param job: i3Job object 
        @param config_file: path to file where submit file will be written
        """
        logger.debug('WriteConfig')
        steering = job.GetSteering()
        args   = self.GetArguments(job,td,False,False,output="dict")
        parser = ExpParser(args,steering)

        tname=td.GetName()

        if not job.GetExecutable():
            raise Exception,  "no executable configured"

        submitfile = open("%s" % config_file,'w')
        # per-task stdout/stderr files: insert the task name before the suffix
        outfile = os.path.join(job.GetInitialdir(),job.GetOutputFile()).replace(".out", "."+tname+".out")
        errfile = os.path.join(job.GetInitialdir(),job.GetErrorFile()).replace(".err", "."+tname+".err")

        job.Write(submitfile,"#!/bin/sh")
        logger.debug("#!/bin/sh")

        # '#$' lines are SGE directives embedded in the shell script
        job.Write(submitfile,"#$ -o %s" % outfile)
        logger.debug("#$ -o %s" % outfile)

        job.Write(submitfile,"#$ -e %s" % errfile )
        logger.debug("#$ -e %s" % errfile )

        # grid-level batch parameters
        for key in self.GetParamKeys():
            job.Write(submitfile,"#$ %s" % self.GetParam(key),parse=True)
            logger.debug("#$ %s" % self.GetParam(key))

        # per-job batch options
        for key,opt in job.GetBatchOpts().items():
            job.Write(submitfile,"#$ %s" % opt,parse=True)
            logger.debug("#$ %s" % opt)

        # task-level batch options (';'-separated, expression-parsed);
        # on a parse failure they are dropped with a warning
        td_batch_opts = td.GetBatchOpts()
        logger.debug(td_batch_opts)
        try:
           td_batch_opts = parser.parse(td_batch_opts)
        except Exception, e:
           sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
           logger.warn("%s: could not parse %s" % (e,td_batch_opts))
           td_batch_opts = ""
        if td_batch_opts:
            for opt in map(string.strip,td_batch_opts.split(";")):
                logger.debug("#$ %s" % opt)
                job.Write(submitfile, "#$ %s" % opt, parse=False)

        # export grid- and job-level environment variables
        for var in self.env.keys():
            job.Write(submitfile,"export %s=%s" % (var,self.env[var]),parse=False)
        for var in job.env.keys():
            job.Write(submitfile,"export %s=%s" % (var,job.env[var]),parse=False)
        job.Write(submitfile,"export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH",parse=False)
        job.Write(submitfile,"unset I3SIMPRODPATH")
        # each task runs in its own scratch directory
        job.Write(submitfile,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
        job.Write(submitfile,"mkdir -p $RUNDIR",parse=False)
        logger.debug("RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()))
        logger.debug("mkdir -p $RUNDIR")

        job.Write(submitfile,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)
        logger.debug("echo \"running on $HOSTNAME:$RUNDIR\"")

        # stage shared input files and the executable into scratch
        logger.debug('%d' %len(job.GetInputFiles()))
        for file in job.GetInputFiles()+[job.GetExecutable()]:
            logger.debug("copying general files for task - file %s" % file)
            job.Write(submitfile,"cp %s $RUNDIR" % file,parse=False)

        # stage the input/output manifests for this task
        manifestfile = os.path.join(job.GetInitialdir(),config_file).replace(".sge", ".input")
        logger.debug("copy manifest %s" % manifestfile)
        job.Write(submitfile,"cp %s $RUNDIR" % manifestfile,parse=False)

        manifestfile = os.path.join(job.GetInitialdir(),config_file).replace(".sge", ".output")
        logger.debug("copy manifest %s" % manifestfile)
        job.Write(submitfile,"cp %s $RUNDIR" % manifestfile,parse=False)

        # stage the DAG description file as well
        dagfile = os.path.join(job.GetInitialdir(),job.GetOutputFile()).replace(".out", ".sge")
        logger.debug("copy dagfile %s" % dagfile)
        job.Write(submitfile,"cp %s $RUNDIR" % dagfile,parse=False)

        # stage this task's specific input files; [7:] strips a leading
        # "file://" scheme prefix - presumably all entries are file URLs
        for key in input_dict.keys():
            logger.debug("copying specific files for task - file %s" % key)
            logger.debug("copy %s", input_dict[key][0])

            job.Write(submitfile,"cp %s $RUNDIR" % input_dict[key][0][7:],parse=False)

        job.Write(submitfile,"cd $RUNDIR",parse=False)

        # tell the main script which DAG task it is running
        dag_string  = " --dag --task=%s " % tname

        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = job.GetMainScript() + dag_string + " ".join(argopts)
        executable = os.path.basename(job.GetExecutable())
        logger.debug('executable: %s' % job.GetExecutable())
        logger.debug('main script: %s' % job.GetMainScript())
        logger.debug('args options: %s' % argopts)
        logger.debug('arguments: %s' % job.GetArguments())
        job.Write(submitfile, "$PYROOT/bin/python %s %s" % (executable, argstr), parse=False)
        job.Write(submitfile, 'echo "job exited with status $?";',parse=False)

        # unless stageout is handled elsewhere, move produced files back to
        # the initial directory (skipping names that already exist there)
        if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
            logger.debug("writing the for loop...")
            job.Write(submitfile,"for file in *; do")
            job.Write(submitfile," if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
            job.Write(submitfile,"   mv $file %s" % job.GetInitialdir())
            job.Write(submitfile," fi; done")

        job.Write(submitfile,"#clean directory")
        job.Write(submitfile,"cd /tmp")
        job.Write(submitfile,"rm -rf $RUNDIR ",parse=False)
        logger.debug('Submit file written')
        submitfile.close();
  780