"""
A basic wrapper for submitting and monitoring jobs on the EGEE grid
(gLite, EDG and CREAM middleware). Inherits from iGrid.

copyright (c) 2005 the icecube collaboration

@version: $Revision: $
@date: $Date: $
@author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
"""

import os
import re
import sys
import math
import dircache
import commands
import time
import string
import shutil
import ConfigParser
import logging
import iceprod
import iceprod.core.exe
from iceprod.core import metadata
from iceprod.core.dataclasses import Steering
from iceprod.server.grid import iGrid,DAG
from iceprod.core.lex import ExpParser
from os.path import basename,dirname
from copy import deepcopy

logger = logging.getLogger('eGee')

egee_status = {'1':'QUEUED', '2':'PROCESSING'}
38
40 return '\"%s\"' % str(x)
41
46

class gLite(iGrid):
    """
    This class represents a job or cluster on an EGEE grid.
    """
    queued_state = [
        'READY',
        'SCHEDULED',
        'WAITING',
        'SUBMITTED',
        'ACCEPTED',
        'PREPARING',
        'QUEUEING',
        'QUEUED',
        'INLRMS:Q',
    ]
    running_state = [
        'RUNNING',
        'EXECUTED',
        'FINISHING',
    ]

    error_state = [
        'FAILED',
        'DONE(EXITCODE!=0)',
        'ABORTED',
    ]

    finished_state = [
        'DONE(SUCCESS)',
        'CLEARED',
        'CANCELLED',
    ]

    undetermined_states = [
        'UNAVAILABLE',
    ]

    def __init__(self):
        iGrid.__init__(self)
        self.cluster_id = -1
        self.post = None
        self.ids = "ids"
        self.enqueue_cmd = "glite-wms-job-submit"
        self.checkqueue_cmd = "glite-wms-job-status"
        self.queue_rm_cmd = "glite-wms-job-cancel"
        self.get_output_cmd = "glite-wms-job-output"
        self.suffix = "jdl"
        self.grid_storage = "%(gridstorage)s/%(dataset_id)u/%(queue_id)u/%(filename)s"
        self.logger = logging.getLogger('gLite')

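    # Resources may be listed as "name weight"; each entry is replicated
    # according to its weight so that random.choice() effectively performs
    # a weighted random selection among computing elements.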
    def _choose_resource(self, resource_list):
        from random import choice
        weighted_rlist = []

        for r in resource_list:
            if len(r.split()) > 1:
                resource,weight = r.split()
            else:
                resource,weight = r,1
            try:
                weight = int(weight)
            except Exception,e:
                self.logger.error("Exception: " + str(e))
                self.logger.warn("Unable to get resource weight for: " + r)
                weight = 1

            self.logger.debug("%s:%u " % (resource,weight))
            weighted_rlist.extend([resource]*weight)

        return choice(weighted_rlist)

    def BundleFiles(self,job,tarballname="inputfiles.tar"):
        """
        Bundle multiple input files into a single tarball so as not to
        exceed the limit on the number of input sandbox files.
        @param job: i3Job object whose input files should be bundled
        @param tarballname: name of the tarball to create
        @return: tarball path
        """
        import tarfile
        tmpdir = os.path.join(job.GetInitialdir(),"inputfiles")
        tarpath = os.path.join(self.GetInitialdir(),tarballname)
        if os.path.isfile(tarpath): return tarpath
        if not os.path.exists(tmpdir):
            os.makedirs(tmpdir)
        cwdir = os.getcwd()
        os.chdir(tmpdir)
        tfile = tarfile.open(tarpath, 'w')
        for file in job.GetInputFiles():
            if file.endswith('.py'):
                os.system('cp -f %s .' % file)
            self.logger.info("adding file %s to tarball" % file)
            tfile.add(os.path.basename(file))
        tfile.close()
        os.chdir(cwdir)
        return tarpath

    def WriteConfig(self,job,config_file):
        """
        Write JDL submit file and shell wrapper to disk.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        submitfile = open(config_file,'w')
        wrapper = open(config_file.replace('jdl','sh'),'w')
        self.ids = config_file.replace('jdl','ids')

        job.Write(wrapper,"#!/bin/sh")
        job.Write(wrapper,'echo "running iceprod on $HOSTNAME";',parse=False)
        job.Write(wrapper,"uname -a;")

        job.Write(wrapper,"I3SCRATCH=/tmp",parse=False)
        for var in self.env.keys():
            job.Write(wrapper, "export %s=%s" % (var, self.env[var]),parse=False)
        for var in job.env.keys():
            job.Write(wrapper, "export %s=%s" % (var, job.env[var]),parse=False)

        job.Write(wrapper,"INIT_DIR=$PWD",parse=False)
        job.Write(wrapper,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
        job.Write(wrapper,"mkdir -p $RUNDIR")
        job.Write(wrapper,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)
        job.Write(wrapper,"cd $RUNDIR",parse=False)
        job.Write(wrapper,"ln -s $INIT_DIR/* $RUNDIR/",parse=False)

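        # Build the argument string and the input/output sandbox lists.
        # All job input files (plus the wrapper and the bundled tarball)
        # go into the JDL InputSandbox; logs and declared outputs go into
        # the OutputSandbox.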
        err = basename(job.GetErrorFile())
        out = basename(job.GetOutputFile())
        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = job.GetMainScript() + " " + " ".join(argopts)
        inputfiles = []
        inputfiles.append(wrapper.name)
        inputfiles.append(job.GetExecutable())
        inputtar = self.BundleFiles(job,tarballname="inputfiles.tar")
        inputfiles.append(inputtar)
        for file in job.GetInputFiles():
            if not file.endswith('.py'):
                inputfiles.append(file)
        inputfiles = map(qoute,inputfiles)

        outputfiles = []
        outputfiles.append('work.tgz')
        outputfiles.append("icetray.%06u.log" % job.GetProcNum())
        outputfiles.append('%s' % err)
        outputfiles.append('%s' % out)
        outputfiles.append(iceprod.core.exe._inventory)
        outputfiles.extend(job.GetOutputFiles())
        outputfiles = map(qoute,outputfiles)

        pyhome = ''
        if self.env.has_key('PYROOT'):
            pyhome = self.env['PYROOT']
        if job.env.has_key('PYROOT'):
            pyhome = job.env['PYROOT']
        if pyhome.startswith('http:'):
            job.Write(wrapper,"wget --quiet %s" % pyhome,parse=False)
            if pyhome.endswith('.tgz'):
                job.env['PYROOT'] = '$PWD/python-2.3'
                job.Write(wrapper,"tar xzf %s" % os.path.basename(pyhome),parse=False)

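        # Emit the core JDL attributes for the WMS submit file.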
        job.Write(submitfile,'Executable = "%s";' % basename(wrapper.name))
        job.Write(submitfile,'StdOutput = "%s";' % out)
        job.Write(submitfile,'StdError = "%s";' % err)
        job.Write(submitfile,'InputSandbox = {%s};' % ','.join(inputfiles))
        job.Write(submitfile,'OutputSandbox = {%s};' % ','.join(outputfiles))
        job.Write(submitfile,'Arguments = "%s";' % argstr,parse=False)

        job.Write(wrapper, 'cmd="$PYROOT/bin/python %s $*"' % basename(job.GetExecutable()),parse=False)
        job.Write(wrapper, "echo $cmd",parse=False)
        job.Write(wrapper, "$cmd",parse=False)
        job.Write(wrapper, "retval=$?",parse=False)

        for key in self.GetParamKeys():
            if not job.batchopts.has_key(key):
                job.AddBatchOpt(key,self.GetParam(key))

        submitdir = dirname(os.path.abspath(out))
        for key,opt in job.GetBatchOpts().items():
            if key.startswith('-'): continue
            elif key.upper().startswith('OUTPUTSANDBOXBASEDESTURI'):
                opt += submitdir
            job.Write(submitfile, '%s = "%s";' % (key, opt))

        submitfile.close()

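        # Stage-out: unless the job handles its own stage-out, append shell
        # commands to the wrapper that register every produced file on the
        # storage element with lcg-cr.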
        read_from_inventory = False
        if not read_from_inventory:

            if not self.GetArgOpt('stageout'):
                job.Write(wrapper,"# copying files to storage element")
                job.Write(wrapper,"for file in *; do")
                job.Write(wrapper," if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
                job.Write(wrapper," lcg-cr --vo icecube \\",parse=False)
                job.Write(wrapper," -d udo-dcache01.grid.uni-dortmund.de \\",parse=False)
                job.Write(wrapper," -l lfn:/grid/icecube/iceprod/%s/%s/$file \\" \
                          %(job.GetDatasetId(),job.GetProcNum()),parse=False)
                job.Write(wrapper," file:$PWD/$file",parse=False)
                job.Write(wrapper," fi; done")

        else:
            inventoryfile = os.path.join(submitdir,iceprod.core.exe._inventory)
            copyok = 0
            if not os.path.exists(inventoryfile):
                self.logger.error("%d,%d: no output inventory found." % (job.GetDatasetId(),job.GetProcNum()))
                return self.CopyStatusEnum['NOTREADY']
            inventory = FileInventory()
            inventorylist = []
            self.logger.debug('reading %s' % inventoryfile)
            inventory.Read(inventoryfile)

            job.Write(wrapper, "export LFC_HOST=`lcg-infosites --vo icecube lfc`",parse=False)
            job.Write(wrapper, "export LCG_GFAL_INFOSYS=lcg-bdii.ifh.de:2170",parse=False)

            copy_cmds = []
            for file in inventory.filelist:
                cmd = 'lcg-cr --vo icecube '
                cmd += '-l %s/%s ' % (file["target"], os.path.split(file["source"])[1])
                cmd += '%s' % (file["source"])

                copy_cmds.append("lfc-mkdir -p %s" % (file["target"][4:]))
                copy_cmds.append(cmd)

            cmd = "\n".join(copy_cmds)
            job.Write(wrapper,"# copying files to storage element")
            job.Write(wrapper,cmd,parse=False)

            del_output = False
            if del_output:
                del_cmds = []
                for file in inventory.filelist:
                    cmd = "/bin/rm %s" % (file["source"][5:])
                    del_cmds.append(cmd)
                cmd = "\n".join(del_cmds)

                job.Write(wrapper,"# deleting copied files")
                job.Write(wrapper,cmd,parse=False)

        job.Write(wrapper,"cd $INIT_DIR",parse=False)
        job.Write(wrapper,"rm -rf $RUNDIR",parse=False)
        job.Write(wrapper,"exit $retval",parse=False)

        wrapper.close()

    def Submit(self):
        """
        Submit job/cluster to grid.
        """
        status = 0
        self.submit_status = ''
        cwdir = os.getcwd()
        for job in self.jobs:
            self.logger.info("submitting job %u.%u" % (job.GetDatasetId(),job.GetProcNum()))
            os.chdir(job.GetInitialdir())

            self.WriteConfig(job,job.config_file)

            submit_args = []

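            # Collect '-resource' batch options into a candidate list and
            # pass any other '-' options straight through to the submit
            # command; one computing element is then picked by weighted
            # random choice.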
            resource_list = []
            for key,opt in job.GetBatchOpts().items():
                if key.startswith('-resource'):
                    if opt.startswith('-r'):
                        resource = opt[2:].strip()
                    else:
                        resource = opt
                    resource_list.append(resource)

                elif key.startswith('-'):
                    submit_args.append(opt)
            if resource_list:
                resource = self._choose_resource(resource_list)
                submit_args.append('-r %s' % resource)

            cmd = "%s %s -o %s %s" % (self.enqueue_cmd," ".join(submit_args),self.ids,job.config_file)
            self.logger.debug(cmd)
            status, self.submit_status = commands.getstatusoutput(cmd)
            if status != 0:
                self.logger.error("Failed to execute command")
                self.logger.error(cmd)
            try:
                id = self.get_id(self.submit_status)
                job.SetJobId(id)
                if id in (-1,None): status = 1
                else: status = 0
            except Exception, e:
                self.logger.error("Exception: " + str(e))
                self.submit_status += "\nException: " + str(e)
                status = 1

            if len(self.job_ids) and not self.cluster_id:
                self.cluster_id = self.job_ids[0]

            job.submit_status = status
            job.submit_msg = self.submit_status

            if self.production:
                if job.submit_status == 0:
                    self.i3monitordb.jobsubmitted(
                        job.GetDatasetId(), job.GetProcNum(),
                        job.GetInitialdir(), job.GetJobId())
                else:
                    logger.error("failed to submit jobs:"+job.submit_msg)
                    self.i3monitordb.jobabort(job.GetProcNum(),job.GetDatasetId(),3,
                        "failed to submit jobs:"+job.submit_msg)
            os.chdir('/tmp')

        os.chdir(cwdir)

        return status,self.submit_status

    def get_id(self,submit_status):
        """
        Parse the string returned on submission to extract the id of the
        job or cluster.

        @param submit_status: string returned by the submit command
        """

        self.logger.debug(submit_status)
        idfile = open(self.ids,'r')
        job_id = None
        for line in idfile.readlines():
            if line.strip().startswith('http'):
                job_id = line.strip()
                self.job_ids.append(job_id)
                break
        idfile.close()
        if not job_id:
            self.logger.warn('could not parse job id from "%s"' % self.ids)
        return job_id

    def QStatus(self):
        """
        Query status of cluster or job on the queue.
        """

        cmd = self.checkqueue_cmd
        for id in self.job_ids:
            cmd += " %s" % id
        self.logger.debug(cmd)
        pout = os.popen(cmd)
        status = string.join(pout.readlines())
        pout.close()
        return status

    def CheckQ(self,db=None):
        """
        Check consistency of the queue and remove jobs which shouldn't be there.
        """

        return 0

    def CheckJobStatus(self,jobs):
        """
        Query the status of job(s) on the gLite queue.
        """
        status_dict = {
            'SUBMITTED': "The job has been submitted by the user but not yet processed by the Network Server",
            'WAITING': "The job has been accepted by the Network Server but not yet processed by the Workload Manager",
            'READY': "The job has been assigned to a Computing Element but not yet transferred to it",
            'SCHEDULED': "The job is waiting in the Computing Element's queue",
            'RUNNING': "The job is running",
            'DONE': "The job has finished",
            'ABORTED': "The job has been aborted by the WMS (e.g. because it was too long, or the proxy certificate expired, etc.)",
            'CANCELLED': "The job has been cancelled by the user",
            'CLEARED': "The Output Sandbox has been transferred to the User Interface",
            '?': "Unknown status"
        }
        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]
        for job in job_list:
            dataset_id = job.GetDatasetId()
            queue_id = job.GetProcNum()
            job_id = job.GetJobId()
            status = "?"

            if not job_id:
                job.SetStatus('?')
                continue

            cmd = self.checkqueue_cmd + ' ' + job_id
            handle = os.popen(cmd, 'r')

            for line in handle.readlines():
                line = line.strip()
                if line.startswith('Current Status:'):
                    status = line[(line.find(':') + 1):].replace(' ','').split()[0]
            handle.close()
            self.logger.info("%s.%s (%s) : %s" % (dataset_id,queue_id,job_id,status.upper()))
            if status_dict.has_key(status.upper()): self.logger.info(status_dict[status.upper()])

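            # Map the raw middleware state onto iceprod's coarse job states.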
            if status.upper() in self.queued_state:
                job.SetStatus('QUEUED')
            elif status.upper() in self.running_state:
                job.SetStatus('PROCESSING')
            elif status.upper() in self.error_state:
                job.SetStatus('FAILED')
            elif status.upper() in self.finished_state:
                job.SetStatus('FINISHED')
            else:
                job.SetStatus('?')

        return 1

    def QRemove(self,jobid=None):
        """
        Remove cluster or job from the gLite queue.
        """

        cmd = self.queue_rm_cmd + " --noint "
        if jobid: cmd += " %s" % jobid
        else:
            for id in self.job_ids: cmd += " %s" % id

        self.logger.debug(cmd)
        handle = os.popen(cmd, 'r')
        status = string.join(handle.readlines())
        handle.close()

        if status.find('Job has already finished') != -1:
            cmd = self.queue_rm_cmd + " --noint "
            if jobid: cmd += " %s" % jobid
            else:
                for id in self.job_ids: cmd += " %s" % id

            self.logger.debug(cmd)
            handle = os.popen(cmd, 'r')
            status = string.join(handle.readlines())
            handle.close()

        return status

    def PostCopy(self,jobdict,target_url,maxtries=4):
        """
        Retrieve the output sandbox of a completed job and fetch the files
        listed in its output inventory from grid storage.
        """
        if not jobdict['grid_queue_id']: return True

        dataset = jobdict['dataset_id']
        proc = jobdict['queue_id']
        job_id = jobdict['grid_queue_id']
        submitdir = jobdict['submitdir']

        cmd = '%s --noint --dir %s %s' % (self.get_output_cmd,submitdir,job_id)
        self.logger.debug(cmd)
        if not os.system(cmd):
            jobdict['grid_queue_id'] = None

        inventoryfile = os.path.join(submitdir,iceprod.core.exe._inventory)
        if not os.path.exists(inventoryfile):
            self.logger.error("%d,%d: no output inventory found." % (dataset,proc))
            return self.CopyStatusEnum['NOTREADY']
        inventory = FileInventory()
        inventorylist = []
        self.logger.debug('reading %s' % inventoryfile)
        inventory.Read(inventoryfile)

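        # For every file recorded in the output inventory, fetch the copy
        # the job registered on the storage element into the submit
        # directory; if the transfer fails, drop the remote replica.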
        for file in inventory.filelist:
            jobdict['filename'] = os.path.basename(file['source'])
            cmd = 'lcg-cp --vo icecube '
            cmd += ' -v %s ' % self.grid_storage
            cmd += ' file:/%(submitdir)s/%(filename)s'

            if os.system(cmd % jobdict):
                cmd = 'lcg-del --vo icecube '
                cmd += ' -a %s ' % self.grid_storage
                os.system(cmd % jobdict)

        return self.CopyStatusEnum['OK']

    def Clean(self,jobdict,force=False):

    def Purge(self,job):
        """
        Purge a job from the queue.
        """

        return self.QRemove(job)


class Edg(gLite):
    """
    This class represents a job or cluster on an EGEE grid, submitted with
    the legacy EDG commands.
    """

    def __init__(self):
        gLite.__init__(self)
        self.enqueue_cmd = "edg-job-submit"
        self.checkqueue_cmd = "edg-job-status"
        self.get_output_cmd = "edg-job-get-output"
        self.queue_rm_cmd = "edg-job-cancel"
        self.logger = logging.getLogger('Edg')

    def PostCopy(self,jobdict,target_url,maxtries=4):
        """
        Retrieve the output sandbox of a completed job with the EDG
        get-output command.
        """

        dataset = jobdict['dataset_id']
        proc = jobdict['queue_id']
        job_id = jobdict['grid_queue_id']
        submitdir = jobdict['submitdir']

        cmd = '%s -dir %s/output %s' % (self.get_output_cmd,submitdir,job_id)
        self.logger.debug(cmd)
        os.system(cmd)
        return True

    def Clean(self,jobdict,force=False):


class Cream(gLite):
    """
    Next generation (CREAM CE) middleware replacement for gLite.
    """
    queued_state = [
        'IDLE',
        'PENDING',
        'REGISTERED',
    ]

    running_state = [
        'RUNNING',
        'REALLY-RUNNING',
    ]

    error_state = [
        'ABORTED',
        'DONE-FAILED',
        'CANCELLED',
        'HELD',
    ]

    finished_state = [
        'DONE-OK',
    ]

    undetermined_states = [
        'UNKNOWN',
    ]

    def __init__(self):
        gLite.__init__(self)
        self.cluster_id = -1
        self.post = None
        self.ids = "ids"
        self.enqueue_cmd = "glite-ce-job-submit"
        self.checkqueue_cmd = "glite-ce-job-status"
        self.list_queue_cmd = "glite-ce-job-list"
        self.queue_rm_cmd = "glite-ce-job-cancel"
        self.get_output_cmd = "glite-ce-job-output"
        self.purge_job_cmd = "glite-ce-job-purge"
        self.suffix = "jdl"
        self.logger = logging.getLogger('Cream')

    def QStatus(self):
        """
        Query status of cluster or job on the queue.
        """

        cmd = self.checkqueue_cmd
        for id in self.job_ids:
            cmd += " %s" % id
        self.logger.debug(cmd)
        pout = os.popen(cmd)
        status = string.join(pout.readlines())
        pout.close()
        return status

    def CheckQ(self,db=None):
        """
        Check consistency of the queue and remove jobs which shouldn't be there.
        """
        cmd = self.list_queue_cmd
        pout = os.popen(cmd)
        status = string.join(pout.readlines())
        pout.close()
        print status

    def Purge(self,jobid=None):
        """
        Purge a job from the CREAM queue.
        """

        cmd = self.purge_job_cmd + " --noint "
        if jobid: cmd += " %s" % jobid
        else:
            for id in self.job_ids: cmd += " %s" % id

        self.logger.debug(cmd)
        status, output = commands.getstatusoutput(cmd)
        if status: self.logger.error(output)
        return status

    def CheckJobStatus(self,jobs):
        """
        Query the status of job(s) on the CREAM queue.
        """
        status_dict = {
            'REGISTERED': "The job has been registered but it has not been started yet",
            'PENDING': "The job has been started, but it has still to be submitted to the LRMS abstraction layer module",
            'IDLE': "The job is idle in the Local Resource Management System (LRMS)",
            'RUNNING': "The job wrapper, which encompasses the user job, is running in the LRMS",
            'REALLY-RUNNING': "The job is running",
            'DONE-OK': "The job has finished",
            'DONE-FAILED': "The job has been executed but some errors occurred",
            'ABORTED': "Errors occurred during the management of the job, e.g. submission to the LRMS",
            'CANCELLED': "The job has been cancelled by the user",
            '?': "Unknown status"
        }
        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]
        for job in job_list:
            dataset_id = job.GetDatasetId()
            queue_id = job.GetProcNum()
            job_id = job.GetJobId()
            status = "?"

            if not job_id:
                job.SetStatus('?')
                continue

            cmd = self.checkqueue_cmd + ' ' + job_id
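            # The status line is expected to look like "Status = [STATE]";
            # extract the state with a regular expression.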
            handle = None
            status_search = None
            try:
                try:
                    handle = os.popen(cmd, 'r')
                    status_search = re.search(r'Status\s*=\s*\[[a-zA-Z\-]+\]',handle.read())
                except Exception,e:
                    self.logger.warning("unable to receive job status: %s" % e)
            finally:
                if handle:
                    handle.close()
            self.logger.info("job status %s" % (status_search))
            if not status_search:
                self.logger.warning("job status for %s.%s is unknown. marking as ?" % (dataset_id,queue_id))
                job.SetStatus('?')
                continue
            status = status_search.group()
            status = status.split('=')[1]
            status = status.strip()
            status = status.strip(']')
            status = status.strip('[')
            print status
            self.logger.info("%s.%s (%s) : %s" % (dataset_id,queue_id,job_id,status.upper()))
            if status_dict.has_key(status.upper()): self.logger.info(status_dict[status.upper()])

            if status.upper() in self.queued_state:
                job.SetStatus('QUEUED')
            elif status.upper() in self.running_state:
                job.SetStatus('PROCESSING')
            elif status.upper() in self.error_state:
                job.SetStatus('FAILED')
            elif status.upper() in self.finished_state:
                job.SetStatus('FINISHED')
            else:
                job.SetStatus('?')

        return 1


class gLiteDag(gLite,DAG):
    """
    This class represents a job that executes in multiple parts using a DAG.
    The DAG is only supported in gLite and not in CREAM.
    """

    def WriteConfig(self,job,config_file):
        """
        Write the JDL submit file for a DAG job.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        from iceprod.core.dataclasses import IceTrayConfig

        db = self.GetMonitorDB()

        steering = job.GetSteering()
        task_defs = steering.GetTaskDefinitions()
        logger.debug("Task definitions: %s" % task_defs)
        if not len(task_defs):
            logger.debug("No tasks specified in config file; doing regular submit")
            return gLite.WriteConfig(self, job, config_file)

        dagfile = open(config_file,'w')
        job_id = job.GetDatabaseId()

        job.Write(dagfile,'[')
        job.Write(dagfile,' Type = "dag";')
        job.Write(dagfile,' max_running_nodes = 25;')

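        # Pre-compute the input/output file catalog for every task so that
        # later nodes can locate files produced by their parents.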
        file_catalog = {}
        for taskname,td in task_defs.items():
            args = self.GetArguments(job,td,output="dict")
            file_catalog[taskname] = self.GetFiles(job,td,args)

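        # Emit one DAG node per task. Tasks with parallel execution enabled
        # expand into one node per tray iteration, each with its own JDL
        # file and input/output manifests.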
        job.Write(dagfile,' nodes = [')
        dag_nodes = []
        for taskname,td in task_defs.items():
            filename = config_file.replace('jdl',"%s.jdl" % taskname)
            td_id = td.GetId()
            logger.debug("Got task %s (ID=%u) with filename %s" % (taskname, td_id, filename))
            parents = self.EnumerateParentNodes(steering,td)
            if td.ParallelExecutionEnabled():
                trays = td.GetTrays()
                for idx,tray in trays.items():
                    for iter in tray.GetIters():
                        cjob = deepcopy(job)

                        if not iter == -1:
                            nodename = "%s_%u" % (taskname,iter)
                        else:
                            nodename = "%s_ext" % taskname
                        filename = config_file.replace('jdl',"%s.jdl" % nodename)
                        done = db.task_is_finished(td_id, job_id, idx, iter)
                        args = self.GetArguments(cjob,td,idx,iter,output="dict")

                        input,output,notes = self.GetFiles(cjob,td,args,idx,iter,file_catalog)

                        self.WriteFileManifest(cjob,filename,input,output,notes)
                        cjob.AddInputFile(filename.replace(".jdl", ".input"))
                        cjob.AddInputFile(filename.replace(".jdl", ".output"))
                        self.WriteSubmitFile(cjob,filename,td,idx,iter)
                        cjob.Write(dagfile,' %s = [ ' % nodename)
                        cjob.Write(dagfile,' file = "%s"; ' % filename)
                        cjob.Write(dagfile,' node_retry_count = 1; ')
                        cjob.Write(dagfile,' ];')
                        if len(parents) == 1:
                            dag_nodes.append("{%s,%s}" % (parents[0],nodename))
                        elif len(parents) > 1:
                            dag_nodes.append("{%s,%s}" % ("{%s}" % ",".join(parents),nodename))
            else:
                cjob = deepcopy(job)
                done = db.task_is_finished(td_id, job_id)
                if taskname == 'trashcan':
                    done = False
                args = self.GetArguments(cjob,td,output="dict")

                input,output,notes = self.GetFiles(job,td,args,catalog=file_catalog)
                self.WriteFileManifest(cjob,filename,input,output,notes)
                self.WriteSubmitFile(cjob,filename,td)
                cjob.Write(dagfile,' %s = [ ' % taskname)
                cjob.Write(dagfile,' file = "%s"; ' % filename)
                cjob.Write(dagfile,' node_retry_count = 1; ')
                cjob.Write(dagfile,' ];')
                if len(parents) == 1:
                    dag_nodes.append("{%s,%s}" % (parents[0],taskname))
                elif len(parents) > 1:
                    dag_nodes.append("{%s,%s}" % ("{%s}" % ",".join(parents),taskname))
        job.Write(dagfile,' ]; ')
        job.Write(dagfile,' dependencies = { %s };' % ",".join(dag_nodes))
        job.Write(dagfile,']; ')
        db.commit()
        dagfile.close()
        self.ids = config_file.replace('jdl','ids')

    def IsUrl(self, path):
        return bool(re.match("[^/:]+://?.*$", path))

    def GetFiles(self,job,td,args,idx=False,iter=False,catalog=False):
        steering = job.GetSteering()
        if idx is not False:
            td_trays = {idx: td.GetTray(idx)}
        else:
            td_trays = td.GetTrays()

        if args.has_key('dagtemp'):
            tmp_dir = args['dagtemp']
        else:
            tmp_dir = 'file:///tmp'

        if args.has_key('fetch'):
            global_dir = args['fetch']
        else:
            global_dir = 'file:///tmp'

        td_input = {}
        td_output = {}
        notes = {}

        if td.IsCleanup():
            return (td_input, td_output, notes)

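        # For every tray and iteration, resolve where each declared input
        # file lives (the global fetch area or a parent node's DAG temp
        # directory) and where each output file should be staged.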
        for idx, td_tray in td_trays.iteritems():
            args['tray'] = idx

            logger.debug("GetTray(%s)" % idx)
            icetray = steering.GetTray(idx)

            input_files = icetray.GetInputFiles()
            parsed_input = []

            output_files = icetray.GetOutputFiles()
            parsed_output = []

            if iter is not False:
                iters = [iter]
            else:
                iters = td_tray.GetIters()
            for iter in iters:
                args['iter'] = iter
                parser = ExpParser(args,steering)
                for d in steering.GetDependencies():
                    d_file = parser.parse(d)
                    if not td_input.has_key(d_file):
                        location = d_file
                        if not self.IsUrl(location):
                            location = os.path.join(global_dir, location)
                        td_input[os.path.basename(d_file)] = [location]
                for i_file in input_files:
                    name = i_file.GetName()
                    name = parser.parse(name)
                    if not td_input.has_key(name) \
                       and not td_output.has_key(name):
                        if catalog:
                            node = self.FindFile(steering,td,catalog,name)
                        else:
                            node = td.GetName()

                        note = False
                        if node == "global":
                            location = global_dir
                            note = "global"
                        if node != "global":
                            location = tmp_dir
                            location = os.path.join(location, str(job.GetDatasetId()))
                            location = os.path.join(location, str(job.GetProcNum()))
                            location = os.path.join(location, node)
                            note = "dontextract"
                        location = os.path.join(location, name)
                        if i_file.IsPhotonicsTable():
                            note = "photonics"
                        if note:
                            notes[name] = note
                        td_input[name] = [location]
                for o_file in output_files:
                    name = o_file.GetName()
                    name = parser.parse(name)
                    if not td_output.has_key(name):
                        location = os.path.join(tmp_dir, str(job.GetDatasetId()))
                        location = os.path.join(location, str(job.GetProcNum()))
                        location = os.path.join(location, str(td.GetName()))
                        location = os.path.join(location, name)
                        td_output[name] = [location]

        return (td_input, td_output, notes)

    def FindFile(self,steering,td,catalog,file):
        parents = td.GetParents()

        # check whether any direct parent produced the file
        for parent in parents:
            if catalog.has_key(parent):
                if catalog[parent][1].has_key(file):
                    return parent

        # otherwise search recursively through the ancestors
        for parent in parents:
            parent_td = steering.GetTaskDefinition(parent)
            result = self.FindFile(steering,parent_td,catalog,file)
            if result != "global":
                return result

        return "global"

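    # The .input manifest lists "<location> <name> [note]" per file and the
    # .output manifest lists "<name> <location>", with the first column
    # padded to a fixed width.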
    def WriteFileManifest(self,job,filename,input,output,notes):
        logger.debug("Input files: %s" % input)
        in_manifest = open(filename.replace(".jdl", ".input"), 'w')
        if len(input):
            padding = str(max(map(lambda x: max(map(len, x)), input.values())))
            fmt_str = "%-" + padding + "s %s"
            for i_file, locs in input.items():
                for loc in locs:
                    file = fmt_str % (loc, i_file)
                    if notes.has_key(i_file):
                        file += "\t%s" % notes[i_file]
                    job.Write(in_manifest, file)
        in_manifest.close()
        logger.debug("Output files: %s" % output)
        out_manifest = open(filename.replace(".jdl", ".output"), 'w')
        if len(output):
            padding = str(max(map(lambda x: max(map(len, x)), output.values())))
            fmt_str = "%-" + padding + "s %s"
            for o_file, locs in output.items():
                for loc in locs:
                    job.Write(out_manifest, fmt_str % (o_file, loc))
        out_manifest.close()

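    # Parent tasks with parallel execution enabled contribute one DAG node
    # per tray iteration (taskname_<iter>, or taskname_ext for iteration -1),
    # so dependencies are enumerated per node rather than per task.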
    def EnumerateParentNodes(self,steering,td):
        parents = td.GetParents()
        parentnodes = []
        for parent in parents:
            parentobj = steering.GetTaskDefinition(parent)
            if parentobj.ParallelExecutionEnabled():
                for idx,tray in parentobj.GetTrays().items():
                    for iter in tray.GetIters():
                        if not iter == -1:
                            nodename = "%s_%u" % (parent,iter)
                        else:
                            nodename = "%s_ext" % parent
                        parentnodes.append(nodename)
            else:
                parentnodes.append(parent)
        return parentnodes

    def GetArguments(self,job,td,idx=False,iter=False,output="str"):

        args = {}
        for arg in argstr.split(" "):
            arg = arg[2:]
            pieces = arg.split("=")
            if len(pieces) == 1:
                args[arg] = 1
            else:
                args[pieces[0]] = pieces[1]
        return args