1
2
3
4 """
5 A basic submitfile for submitting and monitoring jobs to SGE.
6 This module implements only a small subset of SGE features.
7 It's interface is like that of condor however.
8 Inherits from i3Queue
9
10 copyright (c) 2005 the icecube collaboration
11
12 @version: $Revision: $
13 @date: $Date: $
14 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
15 @todo: implement more functionality of sge.
16 """
17
18 import os
19 import re
20 import sys
21 import math
22 import random
23 import dircache
24 import time
25 import string
26 import logging
27 import os.path
28 import getpass
29 import commands
30 from os import popen2
31 from iceprod.core import metadata
32 from iceprod.core.lex import ExpParser
33 from iceprod.server.grid import iGrid
34 from iceprod.server.job import i3Job, i3Task
35
36 logger = logging.getLogger('SGE')
37
38 sge_status = {'qw':'QUEUED', 'r':'PROCESSING', 'hqw': 'QUEUED'}
39
44
46 """
47 This class represents a job or cluster on a sge system.
48 """
49
51
52 iGrid.__init__(self)
53 self.sleeptime = 30
54 self.enqueue_cmd = "qsub"
55 self.checkqueue_cmd = "qstat -u %s " % getpass.getuser()
56 self.queue_rm_cmd = "qdel"
57 self.suffix = "sge"
58 logger.debug('Made a SGE(iGrid)')
59
60
62 """
63 Write sge submit file to a file.
64 @param job: i3Job object
65 @param config_file: path to file were submit file will be written
66 """
67 logger.debug('WriteConfig')
68
69 if not job.GetExecutable():
70 raise Exception, "no executable configured"
71
72 submitfile = open("%s" % config_file,'w')
73 outfile = os.path.join(job.GetInitialdir(),job.GetOutputFile())
74 errfile = os.path.join(job.GetInitialdir(),job.GetErrorFile())
75
76 job.Write(submitfile,"#!/bin/sh")
77 logger.debug("#!/bin/sh")
78
79 job.Write(submitfile,"#$ -o %s" % outfile)
80 logger.debug("#$ -o %s" % outfile)
81
82 job.Write(submitfile,"#$ -e %s" % errfile )
83 logger.debug("#$ -e %s" % errfile )
84
85
86 for key in self.GetParamKeys():
87 job.Write(submitfile,"#$ %s" % self.GetParam(key),parse=True)
88 logger.debug("#$ %s" % self.GetParam(key))
89
90
91 for key,opt in job.GetBatchOpts().items():
92 job.Write(submitfile,"#$ %s" % opt,parse=True)
93 logger.debug("#$ %s" % opt)
94
95
96 for var in self.env.keys():
97 job.Write(submitfile,"export %s=%s" % (var,self.env[var]),parse=False)
98 for var in job.env.keys():
99 job.Write(submitfile,"export %s=%s" % (var,job.env[var]),parse=False)
100 job.Write(submitfile,"export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH",parse=False)
101 job.Write(submitfile,"unset I3SIMPRODPATH")
102 job.Write(submitfile,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
103 job.Write(submitfile,"mkdir -p $RUNDIR",parse=False)
104 logger.debug("RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()))
105 logger.debug("mkdir -p $RUNDIR")
106
107 job.Write(submitfile,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)
108 logger.debug("echo \"running on $HOSTNAME:$RUNDIR\"")
109
110 logger.debug('%d' %len(job.GetInputFiles()))
111 for file in job.GetInputFiles()+[job.GetExecutable()]:
112 logger.debug('%s' %file)
113 job.Write(submitfile,"cp %s $RUNDIR" % file,parse=False)
114 job.Write(submitfile,"cd $RUNDIR",parse=False)
115
116 argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
117 argstr = job.GetMainScript() + " " + " ".join(argopts)
118 executable = os.path.basename(job.GetExecutable())
119 logger.debug('executable: %s' % job.GetExecutable())
120 logger.debug('main script: %s' % job.GetMainScript())
121 logger.debug('args options: %s' % argopts)
122 logger.debug('arguments: %s' % job.GetArguments())
123 job.Write(submitfile, "$PYROOT/bin/python %s %s" % (executable, argstr), parse=False)
124 job.Write(submitfile, 'echo "job exited with status $?";',parse=False)
125
126 if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
127 job.Write(submitfile,"for file in *; do")
128 job.Write(submitfile," if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
129 job.Write(submitfile," mv $file %s" % job.GetInitialdir())
130 job.Write(submitfile," fi; done")
131
132 job.Write(submitfile,"#clean directory")
133 job.Write(submitfile,"cd /tmp")
134 job.Write(submitfile,"rm -rf $RUNDIR ",parse=False)
135 logger.debug('Submit file written')
136 submitfile.close();
137
138 - def get_id(self,submit_status):
139 """
140 Parse string returned by condor on submission to extract the
141 id of the job cluster
142
143 @param submit_status: string returned by condor_submit
144 """
145 matches = re.findall(r'Your job [0-9]+', submit_status)
146 if matches:
147 cluster_info = matches[0].split()
148 job_id = cluster_info[-1]
149
150 self.job_ids.append(job_id)
151 return job_id
152 else:
153 logger.warn('could not parse job id from "%s"' % submit_status)
154 return -1
155
157 """
158 Querie status of job on condor queue
159 """
160 if isinstance(jobs,list):
161 job_list = jobs
162 else:
163 job_list = [jobs]
164
165 job_dict = {}
166 logger.info("beggining of CheckJobStatus")
167
168 for job in job_list:
169 logger.info("checking job status: job id %s for dataset %d", job.GetJobId(), job.GetDatasetId())
170
171 if job.GetJobId() < 0: continue
172
173 for job_id in job.GetJobId().split(" "):
174 job_dict[job_id] = job
175 job.SetStatus('FINISHED')
176
177 cmd = self.checkqueue_cmd
178 logger.debug(cmd)
179 retval,output = commands.getstatusoutput(cmd)
180
181 if retval:
182 for job in job_list: job.SetStatus('?')
183 return retval
184
185 for line in output.split('\n')[2:]:
186 try:
187 tok = line.split()
188 jobId = tok[0]
189 prio = tok[1]
190 name = tok[2]
191 user = tok[3]
192 jobStatus = tok[4]
193 runtime = tok[5]
194 queue = tok[6]
195 logger.debug("jobid:%s" %jobId)
196 if jobId in job_dict.keys():
197 logger.debug("status for jobid %s is %s" %(jobId,jobStatus))
198 status = cstatus(jobStatus)
199 job_dict[jobId].SetStatus(status)
200 except Exception,e:
201 logger.error("%s:%s" %(e,line))
202 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
203 break
204
205 return 1
206
208 """
209 Querie status of cluster or job on condor queue
210 """
211
212 cmd = self.checkqueue_cmd
213 for id in self.job_ids:
214 cmd += " %s" % id
215 status,output = commands.getstatusoutput(cmd)
216 return output
217
218
220 """
221 Querie status of cluster or job on condor queue
222 """
223
224 logger.info("beggining of CleanQ")
225 if not jobs: return 0
226
227 if isinstance(jobs,list):
228 job_list = jobs
229 else:
230 job_list = [jobs]
231 job_dict = dict()
232
233
234 for job in job_list:
235 logger.info("job id: %s", job.GetJobId())
236
237 logger.info("ja laenge is 0")
238 if job.GetJobId() < 0: continue
239
240 for job_id in job.GetJobId().split(" "):
241 job_dict[job_id] = job
242 job.SetStatus('FINISHED')
243
244 cmd = self.checkqueue_cmd
245 status,output = commands.getstatusoutput(cmd)
246 if not status:
247 for line in output.split('\n')[2:]:
248 if line.strip() == '':
249 continue
250 try:
251 tok = line.split()
252 jobId = tok[0]
253 prio = tok[1]
254 name = tok[2]
255 user = tok[3]
256 jobStatus = tok[4]
257 runtime = tok[5]
258 queue = tok[6]
259 if name.startswith("iceprod.") and not job_dict.has_key(jobId):
260 logger.warn("removing job %s with status %s. Reason: job not found in list" % \
261 (jobId,cstatus(jobStatus)))
262 logger.debug("job list [%s]" % str(job_dict.keys()))
263 os.system("%s %s" % (self.queue_rm_cmd,jobId))
264 except Exception,e:
265 logger.error("%s:%s" %(e,line))
266 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
267
268 return status
269
270
272 """
273 Remove cluster or job from queue
274 """
275 if isinstance(job,i3Job) and job.GetStatus() == "FINISHED":
276 return 0
277
278 cmd = "%s %s" % (self.queue_rm_cmd,job.job_id)
279 status,output = commands.getstatusoutput(cmd)
280 return status
281
282
284 """
285 This class represents a job that executes in multiple parts using a DAG.
286 """
287
294
296 """
297 Submit job/cluster to PBS
298
299 @param job: i3Job object
300 @param config_file: path to file were submit file will be written
301 """
302 self.submit_status = ''
303 status_sum = 0
304 cwdir = os.getcwd()
305
306 for job in self.jobs:
307 logger.info("submitting job %u.%u" %(job.GetDatasetId(),job.GetProcNum()))
308
309 steering = job.GetSteering()
310 task_defs = steering.GetTaskDefinitions()
311 logger.debug("Task definitions: %s" % task_defs)
312
313
314 if not len(task_defs):
315
316 logger.debug("No tasks specified in config file; doing regular submit")
317 SGE.WriteConfig(self, job, job.config_file)
318
319 cmd = "%s %s" % (self.enqueue_cmd,job.config_file)
320 status, self.submit_status = commands.getstatusoutput(cmd)
321 status_sum += status
322 try:
323 id = self.get_id(self.submit_status)
324 job.SetJobId(id)
325 if id < 0: status_sum += 1
326 except Exception, e:
327 logger.error("Exception: " + str(e))
328 self.submit_status += "\nException: " + str(e)
329 status_sum += 1
330
331 job.submit_status = status
332 job.submit_msg = self.submit_status
333
334 logger.info("subtmit status: %s, submit message: %s" %(job.submit_status, job.submit_msg))
335
336 cookie.AddJobId(job.GetJobId())
337 os.chdir(cwdir)
338 else:
339
340
341
342
343 db = self.GetMonitorDB()
344
345 job_id = job.GetDatabaseId()
346
347 file_catalog = {}
348 tasks=dict()
349 args_dict=dict()
350
351 for taskname,td in task_defs.items():
352 tasks[taskname]=td
353 args = self.GetArguments(job,td,output="dict")
354 args_dict[taskname]=args
355 file_catalog[taskname] = self.GetFiles(job,td,args)
356
357 if (len(td.GetParents()) == 0):
358 toplevel_taskname=taskname
359
360 cur_td=tasks[toplevel_taskname]
361 first=True
362 last_id=-1
363
364 dagfile = open(job.config_file,'w')
365
366 while(len(cur_td.GetChildren())>0):
367 taskname=cur_td.GetName()
368 taskfile = job.config_file.replace('.sge',".%s.sge" % taskname)
369 td_id = td.GetId()
370
371 logger.debug("Got task %s (ID=%u) with filename %s" % (taskname, td_id, taskfile))
372 parents = " ".join(self.EnumerateParentNodes(steering,cur_td))
373
374 os.chdir(job.GetInitialdir())
375
376 input, output, notes=self.GetFiles(job, cur_td, args_dict[taskname], catalog=file_catalog)
377
378 self.WriteFileManifest(job, taskfile,input,output,notes)
379 self.WriteTaskConfig(job,taskfile, cur_td, input, output)
380
381 if(first):
382 cmd = "%s %s" % (self.enqueue_cmd, taskfile)
383 logger.info("first in the task chain................")
384 else:
385 cmd = "%s -hold_jid %s %s" % (self.enqueue_cmd, last_id, taskfile)
386 logger.info(cmd)
387 status, self.submit_status = commands.getstatusoutput(cmd)
388 logger.info("subtmit status: %s, submit message: %s" %(status, self.submit_status))
389 status_sum += status
390 id=-1
391 try:
392 id = self.get_id(self.submit_status)
393 logger.info("id to add as subtask: %s", id)
394 if(first):
395 logger.info("FIRST SETTING JOBID to %s", id)
396 job.SetJobId(id)
397 first = False
398 else:
399 temp_id=job.GetJobId()
400 job.SetJobId(temp_id + " " + id)
401
402 if id < 0: status_sum += 1
403 except Exception, e:
404 logger.error("Exception: " + str(e))
405 self.submit_status += "\nException: " + str(e)
406 status_sum += 1
407 last_id=id
408
409 cur_td=tasks[cur_td.GetChildren()[0]]
410
411
412 taskname=cur_td.GetName()
413 taskfile = job.config_file.replace('.sge',".%s.sge" % taskname)
414 td_id = td.GetId()
415
416 logger.debug("Got task %s (ID=%u) with filename %s" % (taskname, td_id, taskfile))
417
418
419 os.chdir(job.GetInitialdir())
420 input, output, notes=self.GetFiles(job, cur_td, args_dict[taskname], catalog=file_catalog)
421
422 self.WriteFileManifest(job, taskfile,input,output,notes)
423 self.WriteTaskConfig(job,taskfile, cur_td, input, output)
424
425 cmd = "%s -hold_jid %s %s" % (self.enqueue_cmd, last_id, taskfile)
426 status, self.submit_status = commands.getstatusoutput(cmd)
427 status_sum += status
428 id=-1
429 try:
430 id = self.get_id(self.submit_status)
431 temp_id=job.GetJobId()
432 job.SetJobId(temp_id + " " + id)
433
434 if id < 0: status_sum += 1
435 except Exception, e:
436 logger.error("Exception: " + str(e))
437 self.submit_status += "\nException: " + str(e)
438 status_sum += 1
439
440 for taskname,td in task_defs.items():
441 parents = " ".join(self.EnumerateParentNodes(steering,td))
442 done = db.task_is_finished(td.GetId(), job_id)
443 if(taskname=="trashcan"):
444 done=False
445 filename= job.config_file.replace('.sge',".%s.sge" % taskname)
446 self.WriteDAGNode(job,dagfile,taskname,filename,parents,done)
447
448 dagfile.close()
449 db.commit()
450
451 job.submit_status = status_sum
452 job.submit_msg = self.submit_status
453
454 if self.production:
455
456 if job.submit_status == 0:
457 self.i3monitordb.jobsubmitted(
458 job.GetDatasetId(), job.GetProcNum(),
459 job.GetInitialdir(), job.GetJobId())
460 else:
461 logger.error("failed to submit jobs:"+job.submit_msg)
462 self.i3monitordb.jobabort(job.GetProcNum(),job.GetDatasetId(),3,
463 "failed to submit jobs:"+job.submit_msg)
464 os.chdir('/tmp')
465 self.CleanDir(job.GetInitialdir())
466
467 cookie.AddJobId(job.GetJobId())
468 os.chdir(cwdir)
469
470 return status_sum,self.submit_status
471
472
474 return bool(re.match("[^/:]+://?.*$", path))
475
476 - def GetFiles(self,job,td,args,idx=False,iter=False,catalog=False):
477 steering = job.GetSteering()
478 if idx is not False:
479 td_trays = {idx: td.GetTray(idx)}
480 else:
481 td_trays = td.GetTrays()
482
483 if args.has_key('dagtemp'):
484 tmp_dir = args['dagtemp']
485 else:
486 tmp_dir = 'file:///tmp'
487
488 if args.has_key('fetch'):
489 global_dir = args['fetch']
490 else:
491 global_dir = 'file:///tmp'
492
493 td_input = {}
494 td_output = {}
495 notes = {}
496
497 if td.IsCleanup():
498
499 return (td_input, td_output, notes)
500
501 for idx, td_tray in td_trays.iteritems():
502 args['tray'] = idx
503
504 logger.info("GetTray(%s)" % idx)
505 icetray = steering.GetTray(idx)
506
507 input_files = icetray.GetInputFiles()
508 parsed_input = []
509
510 output_files = icetray.GetOutputFiles()
511 parsed_output = []
512
513 if iter is not False:
514 iters = [iter]
515 else:
516 iters = td_tray.GetIters()
517 for iter in iters:
518 args['iter'] = iter
519 parser = ExpParser(args,steering)
520 for d in steering.GetDependencies():
521 d_file = parser.parse(d)
522 if not td_input.has_key(d_file):
523 location = d_file
524 if not self.IsUrl(location):
525 location = os.path.join(global_dir, location)
526 td_input[os.path.basename(d_file)] = [location]
527 for i_file in input_files:
528 name = i_file.GetName()
529 name = parser.parse(name)
530 if not td_input.has_key(name) \
531 and not td_output.has_key(name):
532 if catalog:
533 node = self.FindFile(steering,td,catalog,name)
534 else:
535 node = td.GetName()
536
537 note = False
538 if node == "global":
539 location = global_dir
540 note = "global"
541 if node != "global":
542 location = tmp_dir
543 location = os.path.join(location, str(job.GetDatasetId()))
544 location = os.path.join(location, str(job.GetProcNum()))
545 location = os.path.join(location, node)
546 note = "dontextract"
547 location = os.path.join(location, name)
548 if i_file.IsPhotonicsTable():
549 note = "photonics";
550 if note:
551 notes[name] = note
552 td_input[name] = [location]
553 for o_file in output_files:
554 name = o_file.GetName()
555 name = parser.parse(name)
556 if not td_output.has_key(name):
557 location = os.path.join(tmp_dir, str(job.GetDatasetId()))
558 location = os.path.join(location, str(job.GetProcNum()))
559 location = os.path.join(location, str(td.GetName()))
560 location = os.path.join(location, name)
561 td_output[name] = [location]
562
563 return (td_input, td_output, notes)
564
565 - def FindFile(self,steering,td,catalog,file):
566 parents = td.GetParents()
567
568
569 for parent in parents:
570 if catalog.has_key(parent):
571 if catalog[parent][1].has_key(file):
572 return parent
573
574
575 for parent in parents:
576 parent_td = steering.GetTaskDefinition(parent)
577 result = self.FindFile(steering,parent_td,catalog,file)
578 if result != "global":
579 return result
580
581 return "global"
582
584 logger.debug("Input files: %s" % input)
585 in_manifest = open(filename.replace("\.sge", ".input"), 'w')
586 if len(input):
587 padding = str(max(map(lambda x: max(map(len, x)), input.values())))
588 fmt_str = "%-" + padding + "s %s"
589 for i_file, locs in input.items():
590 for loc in locs:
591 file = fmt_str % (loc, i_file)
592
593
594 if notes.has_key(i_file):
595 file += "\t%s" % notes[i_file]
596 job.Write(in_manifest, file)
597 in_manifest.close()
598 logger.debug("Output files: %s" % output)
599 out_manifest = open(filename.replace(".sge", ".output"), 'w')
600 if len(output):
601 padding = str(max(map(lambda x: max(map(len, x)), input.values())))
602 fmt_str = "%-" + padding + "s %s"
603 for o_file, locs in output.items():
604 for loc in locs:
605 file = fmt_str % (o_file, loc)
606
607
608 job.Write(out_manifest, file)
609 out_manifest.close()
610
611 - def WriteDAGNode(self,job,dagfile,nodename,filename,parents=None,done=False):
612 str = "JOB %s %s" % (nodename,filename)
613 if done:
614 str += " DONE"
615 job.Write(dagfile,str)
616 if parents:
617 job.Write(dagfile, "PARENT %s CHILD %s" % (parents,nodename))
618
635
636 - def GetArguments(self,job,td,idx=False,iter=False,output="str"):
645
647 args = {}
648 for str in argstr.split(" "):
649 str = str[2:]
650 pieces = str.split("=")
651 if len(pieces) == 1:
652 args[str] = 1
653 else:
654 args[pieces[0]] = pieces[1]
655 return args
656
657
659 """
660 Write sge submit file to a file.
661 @param job: i3Job object
662 @param config_file: path to file were submit file will be written
663 """
664 logger.debug('WriteConfig')
665 steering = job.GetSteering()
666 args = self.GetArguments(job,td,False,False,output="dict")
667 parser = ExpParser(args,steering)
668
669 tname=td.GetName()
670
671 if not job.GetExecutable():
672 raise Exception, "no executable configured"
673
674 submitfile = open("%s" % config_file,'w')
675 outfile = os.path.join(job.GetInitialdir(),job.GetOutputFile()).replace(".out", "."+tname+".out")
676 errfile = os.path.join(job.GetInitialdir(),job.GetErrorFile()).replace(".err", "."+tname+".err")
677
678 job.Write(submitfile,"#!/bin/sh")
679 logger.debug("#!/bin/sh")
680
681 job.Write(submitfile,"#$ -o %s" % outfile)
682 logger.debug("#$ -o %s" % outfile)
683
684 job.Write(submitfile,"#$ -e %s" % errfile )
685 logger.debug("#$ -e %s" % errfile )
686
687
688 for key in self.GetParamKeys():
689 job.Write(submitfile,"#$ %s" % self.GetParam(key),parse=True)
690 logger.debug("#$ %s" % self.GetParam(key))
691
692
693 for key,opt in job.GetBatchOpts().items():
694 job.Write(submitfile,"#$ %s" % opt,parse=True)
695 logger.debug("#$ %s" % opt)
696
697
698 td_batch_opts = td.GetBatchOpts()
699 logger.debug(td_batch_opts)
700 try:
701 td_batch_opts = parser.parse(td_batch_opts)
702 except Exception, e:
703 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
704 logger.warn("%s: could not parse %s" % (e,td_batch_opts))
705 td_batch_opts = ""
706 if td_batch_opts:
707 for opt in map(string.strip,td_batch_opts.split(";")):
708 logger.debug("#$ %s" % opt)
709 job.Write(submitfile, "#$ %s" % opt, parse=False)
710
711
712 for var in self.env.keys():
713 job.Write(submitfile,"export %s=%s" % (var,self.env[var]),parse=False)
714 for var in job.env.keys():
715 job.Write(submitfile,"export %s=%s" % (var,job.env[var]),parse=False)
716 job.Write(submitfile,"export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH",parse=False)
717 job.Write(submitfile,"unset I3SIMPRODPATH")
718 job.Write(submitfile,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
719 job.Write(submitfile,"mkdir -p $RUNDIR",parse=False)
720 logger.debug("RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()))
721 logger.debug("mkdir -p $RUNDIR")
722
723 job.Write(submitfile,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)
724 logger.debug("echo \"running on $HOSTNAME:$RUNDIR\"")
725
726 logger.debug('%d' %len(job.GetInputFiles()))
727 for file in job.GetInputFiles()+[job.GetExecutable()]:
728 logger.debug("copying general files for task - file %s" % file)
729 job.Write(submitfile,"cp %s $RUNDIR" % file,parse=False)
730
731 manifestfile = os.path.join(job.GetInitialdir(),config_file).replace(".sge", ".input")
732 logger.debug("copy manifest %s" % manifestfile)
733 job.Write(submitfile,"cp %s $RUNDIR" % manifestfile,parse=False)
734
735 manifestfile = os.path.join(job.GetInitialdir(),config_file).replace(".sge", ".output")
736 logger.debug("copy manifest %s" % manifestfile)
737 job.Write(submitfile,"cp %s $RUNDIR" % manifestfile,parse=False)
738
739 dagfile = os.path.join(job.GetInitialdir(),job.GetOutputFile()).replace(".out", ".sge")
740 logger.debug("copy dagfile %s" % dagfile)
741 job.Write(submitfile,"cp %s $RUNDIR" % dagfile,parse=False)
742
743 for key in input_dict.keys():
744 logger.debug("copying specific files for task - file %s" % key)
745 logger.debug("copy %s", input_dict[key][0])
746
747 job.Write(submitfile,"cp %s $RUNDIR" % input_dict[key][0][7:],parse=False)
748
749 job.Write(submitfile,"cd $RUNDIR",parse=False)
750
751 dag_string = " --dag --task=%s " % tname
752
753
754
755
756
757
758 argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
759 argstr = job.GetMainScript() + dag_string + " ".join(argopts)
760 executable = os.path.basename(job.GetExecutable())
761 logger.debug('executable: %s' % job.GetExecutable())
762 logger.debug('main script: %s' % job.GetMainScript())
763 logger.debug('args options: %s' % argopts)
764 logger.debug('arguments: %s' % job.GetArguments())
765 job.Write(submitfile, "$PYROOT/bin/python %s %s" % (executable, argstr), parse=False)
766 job.Write(submitfile, 'echo "job exited with status $?";',parse=False)
767
768 if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
769 logger.debug("writing the for loop...")
770 job.Write(submitfile,"for file in *; do")
771 job.Write(submitfile," if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
772 job.Write(submitfile," mv $file %s" % job.GetInitialdir())
773 job.Write(submitfile," fi; done")
774
775 job.Write(submitfile,"#clean directory")
776 job.Write(submitfile,"cd /tmp")
777 job.Write(submitfile,"rm -rf $RUNDIR ",parse=False)
778 logger.debug('Submit file written')
779 submitfile.close();
780