"""
A basic wrapper for submitting and monitoring jobs to Condor.
This module implements only a small subset of Condor's many features.
(http://www.cs.wisc.edu/condor)
Inherits from i3Queue

copyright (c) 2005 the icecube collaboration

@version: $Revision: $
@date: $Date: $
@author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
@todo: implement more functionality of condor.
"""

import os
import re
import sys
import math
import dircache
import time
import string
import shutil
import ConfigParser
import logging
import commands
import getpass
from os.path import expandvars
from iceprod.core import metadata
from iceprod.core.dataclasses import Steering
from iceprod.core.lex import ExpParser
from iceprod.server.db import ConfigDB,MonitorDB
from iceprod.server.grid import iGrid

logger = logging.getLogger('Condor')

condor_status = {
    '0':'UNEXPANDED',
    '1':'QUEUED',
    '2':'PROCESSING',
    '3':'RESET',
    '4':'FINISHED',
    '5':'ERROR',
    '6':'ERROR'
}

def cstatus(istatus):
    """
    Map a numeric condor job status string to a readable status name.
    (The original helper body was elided in this listing; this minimal
    version matches how cstatus() is called in CleanQ and CheckJobStatus.)
    """
    if condor_status.has_key(istatus):
        return condor_status[istatus]
    return 'UNKNOWN'
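
# Illustrative mapping, using the condor_status table above:
#
#   cstatus('2')  -> 'PROCESSING'
#   cstatus('42') -> 'UNKNOWN'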


class Condor(iGrid):
    """
    This class represents a job or cluster on a condor system.
    """

    valid_universes = [
        'STANDARD',
        'PVM',
        'VANILLA',
        'SCHEDULER',
        'MPI',
        'GLOBUS',
        'GRID',
        'JAVA'
    ]

    def __init__(self):
        iGrid.__init__(self)
        self.universe = "vanilla"
        self.cluster_id = -1
        self.post = None
        self.enqueue_cmd = "condor_submit"
        self.checkqueue_cmd = "condor_q"
        self.queue_rm_cmd = "condor_rm"
        self.suffix = "condor"
        self.exe_prefix = "iceprod."

        self.AddParam("Should_Transfer_Files","ALWAYS")
        self.AddParam("When_To_Transfer_Output","ON_EXIT")

    def Submit(self,job,config_file):
        """
        Submit job/cluster to Condor
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """
        self.submit_status = ''
        self.WriteConfig(job,config_file)

        cmd = self.enqueue_cmd
        for key in self.GetParamKeys():
            if key.startswith('-'):
                cmd += " %s %s" % (key, self.GetParam(key))
        cmd += " " + config_file
        status, self.submit_status = commands.getstatusoutput(cmd)
        try:
            id = self.get_id(self.submit_status)
            job.SetJobId(id)
            if id < 0:
                status = 1
            else:
                status = 0
        except Exception, e:
            logger.error("Exception: " + str(e))
            self.submit_status += "\nException: " + str(e)
            status = 1

        if len(self.job_ids) and self.cluster_id < 0:
            self.cluster_id = self.job_ids[0]

        return status,self.submit_status
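
    # Illustrative use (the i3Job accessors follow the calls made elsewhere
    # in this module; treat this as a sketch, not documented API):
    #
    #   queue = Condor()
    #   status, output = queue.Submit(job, "/tmp/iceprod.1.0.condor")
    #   if status != 0:
    #       logger.error(output)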

    def SetUniverse(self,universe):
        """
        Define the condor universe for this job
        @param universe: string containing a valid condor universe
        """
        if universe.upper() in self.valid_universes:
            self.universe = universe
        else:
            raise Exception, 'unsupported universe: %s' % universe

    def GetUniverse(self):
        """
        Get the condor universe for this job
        @return: the currently set condor universe. If none has been set,
                 the default value of 'vanilla' is returned.
        """
        return self.universe

    def GetJobIds(self):
        """
        Get the cluster AND job id for the submitted jobs.
        @return: a list of jobs with their cluster and job id
                 in the condor format, or
                 None if no jobs have been submitted or if submission failed.
        """
        return ['%d.%d' % (self.cluster_id, job_id) \
                for job_id in range(self.jobs_submitted)]
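
    # e.g. with cluster_id 1042 and jobs_submitted == 3 this yields
    # ['1042.0', '1042.1', '1042.2'], i.e. condor's ClusterId.ProcId form.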

    def WriteConfig(self,job,config_file):
        """
        Write condor submit file to a file.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        submitfile = open(config_file,'w')
        wrapper = open(config_file.replace('condor','sh'),'w')

        job.Write(wrapper,"#!/bin/sh")
        job.Write(submitfile,"Universe = %s" % self.GetUniverse())
        job.Write(submitfile,"Executable = %s" % wrapper.name)
        job.Write(submitfile,"Log = %s" % job.GetLogFile())
        job.Write(submitfile,"Output = %s" % job.GetOutputFile())
        job.Write(submitfile,"Error = %s" % job.GetErrorFile())

        job.Write(wrapper, "ARCH=`uname -m | sed -e 's/Power Macintosh/ppc/ ; s/i686/i386/'`")
        job.Write(wrapper, "PVER=`uname -r | awk '{split($1,a,\".\"); print a[2]}'`")
        job.Write(wrapper, "if [ $ARCH == 'i386' ]; then",parse=False)
        job.Write(wrapper, "   if [ $PVER == '4' ]; then",parse=False)
        job.Write(wrapper, "      PLATFORM=Linux-i386")
        job.Write(wrapper, "   elif [ $PVER == '6' ]; then",parse=False)
        job.Write(wrapper, "      PLATFORM=Linux-libstdc++6-i386")
        job.Write(wrapper, "   fi")
        job.Write(wrapper, "elif [ $ARCH == 'x86_64' ]; then",parse=False)
        job.Write(wrapper, "   if [ $PVER == '4' ]; then",parse=False)
        job.Write(wrapper, "      PLATFORM=Linux-x86_64")
        job.Write(wrapper, "   elif [ $PVER == '6' ]; then",parse=False)
        job.Write(wrapper, "      PLATFORM=Linux-libstdc++6-x86_64")
        job.Write(wrapper, "   fi")
        job.Write(wrapper, "fi")

        for var in self.env.keys():
            job.Write(wrapper, "export %s=%s" % (var, self.env[var]), parse=False)
        for var in job.env.keys():
            job.Write(wrapper, "export %s=%s" % (var, job.env[var]), parse=False)
        job.Write(wrapper, 'export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH',parse=False)
        job.Write(wrapper, 'cmd="$PYROOT/bin/python %s $*"' % os.path.basename(job.GetExecutable()),parse=False)
        job.Write(wrapper, "echo $cmd",parse=False)
        job.Write(wrapper, "exec $cmd",parse=False)

        if job.GetInitialdir():
            job.Write(submitfile,"Initialdir = %s" % job.GetInitialdir())

        if job.GetInputFiles():
            inputfile_list = ",".join(job.GetInputFiles())
            inputfile_list += ","+job.GetExecutable()
            job.Write(submitfile,"%s = %s" % ("Transfer_Input_Files",inputfile_list))

        if job.GetOutputFiles():
            outputfile_list = ",".join(job.GetOutputFiles())
            job.Write(submitfile,"%s = %s" % ("Transfer_Output_Files",outputfile_list))

        for key in self.GetParamKeys():
            if not key.startswith('-'):
                job.Write(submitfile, "%s = %s" % (key, self.GetParam(key)))

        for key,opt in job.GetBatchOpts().items():
            if not key.startswith('-'):
                job.Write(submitfile, "%s = %s" % (key, opt))

        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = job.GetMainScript() + " " + " ".join(argopts)
        job.Write(submitfile,"Arguments = %s" % argstr,parse=False)

        job.Write(submitfile, "Queue")
        submitfile.close()
        wrapper.close()
        os.system("chmod +x %s" % wrapper.name)
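
    # The submit file produced above ends up looking roughly like this
    # (paths and arguments illustrative):
    #
    #   Universe = vanilla
    #   Executable = /scratch/iceprod.1.0.sh
    #   Log = /scratch/iceprod.1.0.log
    #   Output = /scratch/iceprod.1.0.out
    #   Error = /scratch/iceprod.1.0.err
    #   Should_Transfer_Files = ALWAYS
    #   When_To_Transfer_Output = ON_EXIT
    #   Arguments = main.py --dataset=1 ...
    #   Queue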

    def get_id(self,submit_status):
        """
        Parse string returned by condor on submission to extract the
        id of the job cluster

        @param submit_status: string returned by condor_submit
        """
        matches = re.findall(r"[0-9]+ job\(s\) submitted to cluster [0-9]+",
                             submit_status)
        logger.debug(submit_status)
        if matches:
            cluster_info = matches[0].split()
            job_id = int(cluster_info[-1])

            self.job_ids.append(job_id)
            return job_id
        else:
            logger.warn('could not parse job id from "%s"' % submit_status)
            return -1
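
    # condor_submit normally reports success with a line such as
    #   "1 job(s) submitted to cluster 1042"
    # from which get_id() extracts 1042; anything else yields -1.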

    def CheckQ(self,db=None):
        """
        Query the status of cluster or job on the condor queue
        """

        cmd = "condor_q " + " ".join(map(str,self.job_ids))
        status,output = commands.getstatusoutput(cmd)
        return output

    def CleanQ(self,jobs=None):
        """
        Query the status of cluster or job on the condor queue
        and remove those which are not active in the DB
        """

        if not jobs: return 0

        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]

        jobs_to_remove = []
        job_dict = dict()

        for job in job_list:
            job_dict[job.GetJobId()] = job

        cmd = 'condor_q %s' % getpass.getuser()
        cmd += ' -format "%s " ClusterId'
        cmd += ' -format "%s " Cmd'
        cmd += ' -format "%s\\n" JobStatus'
        status,output = commands.getstatusoutput(cmd)
        if not status:
            for line in output.split('\n'):
                try:
                    tok = line.split()
                    jobId = tok[0]
                    executable = os.path.basename(tok[1])
                    jobStatus = tok[2]
                    if executable.startswith(self.exe_prefix):
                        name,dataset,proc,suffix = executable.split('.')
                        if int(dataset) and not job_dict.has_key(jobId):
                            logger.warn("removing job %s with status %s. Reason: job not found in list" % \
                                        (jobId,cstatus(jobStatus)))
                            logger.debug("job list [%s]" % str(job_dict.keys()))
                            jobs_to_remove.append(jobId)
                except Exception,e:
                    logger.error("%s:%s" % (e,line))
            if len(jobs_to_remove):
                cmd = "condor_rm %s" % " ".join(jobs_to_remove)
                logger.debug(cmd)
                os.system(cmd)

        return status
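
    # The formatted condor_q query yields one line per job, e.g.
    #   1042 /scratch/iceprod.1.0.sh 2
    # i.e. ClusterId, Cmd and JobStatus; the Cmd basename is then split
    # as <prefix>.<dataset>.<proc>.<suffix>.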

    def CheckJobStatus(self,jobs):
        """
        Query the status of job on the condor queue
        """
        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]
        for job in job_list:
            job_id = job.GetJobId()
            if job_id < 0: return 0
            cmd = 'condor_q -format "%s" JobStatus ' + str(job_id)
            status,output = commands.getstatusoutput(cmd)
            if status:
                job.SetStatus('?')
            else:
                job.SetStatus(cstatus(output.strip()))
        return 1

    def QRemove(self,jobs):
        """
        Remove cluster or job from condor queue
        """
        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]

        status = []
        for jobid in job_list:
            if jobid:
                cmd = "condor_rm %s" % jobid
                logger.debug(cmd)
                handle = os.popen(cmd, 'r')
                status.append(string.join(handle.readlines()))
                handle.close()
        return "\n".join(status)

class CondorDAG(Condor):
    """
    This class represents a job that executes in multiple parts using a DAG.
    """

    def __init__(self):
        Condor.__init__(self)
        self.enqueue_cmd = "condor_submit_dag -f"
        self.exe_prefix = "condor_dagman"

    def WriteConfig(self,job,config_file):
        """
        Write condor submit file to a file.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        from iceprod.core.dataclasses import IceTrayConfig

        db = self.GetMonitorDB()

        steering = job.GetSteering()
        task_defs = steering.GetTaskDefinitions()
        logger.debug("Task definitions: %s" % task_defs)
        if not len(task_defs):
            logger.debug("No tasks specified in config file; doing regular submit")
            self.enqueue_cmd = "condor_submit"
            return Condor.WriteConfig(self, job, config_file)

        dagfile = open(config_file,'w')
        job_id = job.GetDatabaseId()

        file_catalog = {}
        for taskname,td in task_defs.items():
            args = self.GetArguments(job,td,output="dict")
            file_catalog[taskname] = self.GetFiles(job,td,args)

        for taskname,td in task_defs.items():
            filename = config_file.replace('condor',"%s.condor" % taskname)
            td_id = td.GetId()
            logger.debug("Got task %s (ID=%u) with filename %s" % (taskname, td_id, filename))
            parents = " ".join(self.EnumerateParentNodes(steering,td))
            if td.ParallelExecutionEnabled():
                trays = td.GetTrays()
                for idx,tray in trays.items():
                    for iter in tray.GetIters():
                        if not iter == -1:
                            nodename = "%s_%u" % (taskname,iter)
                        else:
                            nodename = "%s_ext" % taskname
                        filename = config_file.replace('condor',"%s.condor" % nodename)
                        done = db.task_is_finished(td_id, job_id, idx, iter)
                        args = self.GetArguments(job,td,idx,iter,output="dict")

                        input,output,notes = self.GetFiles(job,td,args,idx,iter,file_catalog)
                        self.WriteFileManifest(job,filename,input,output,notes)
                        self.WriteSubmitFile(job,filename,td,idx,iter)
                        self.WriteDAGNode(job,dagfile,nodename,filename,parents,done)
            else:
                done = db.task_is_finished(td_id, job_id)
                args = self.GetArguments(job,td,output="dict")

                input,output,notes = self.GetFiles(job,td,args,catalog=file_catalog)
                self.WriteFileManifest(job,filename,input,output,notes)
                self.WriteSubmitFile(job,filename,td)
                self.WriteDAGNode(job,dagfile,taskname,filename,parents,done)
        db.commit()

        job.Write(dagfile,"DOT dag.dot")
        dagfile.close()
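
    # The resulting .dag file holds one JOB line per task node plus
    # PARENT/CHILD edges, e.g. (node and file names illustrative):
    #
    #   JOB generate /scratch/1.0.generate.condor
    #   JOB filter_0 /scratch/1.0.filter_0.condor DONE
    #   PARENT generate CHILD filter_0
    #   DOT dag.dot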

    def IsUrl(self,path):
        return bool(re.match("[^/:]+://?.*$", path))
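
    # e.g. IsUrl('gsiftp://host/path') and IsUrl('file:/tmp') are true,
    # while a bare path such as '/tmp/file' is not.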

    def GetFiles(self,job,td,args,idx=False,iter=False,catalog=False):
        steering = job.GetSteering()
        if idx is not False:
            td_trays = {idx: td.GetTray(idx)}
        else:
            td_trays = td.GetTrays()

        if args.has_key('dagtemp'):
            tmp_dir = args['dagtemp']
        else:
            tmp_dir = 'file:///tmp'

        if args.has_key('fetch'):
            global_dir = args['fetch']
        else:
            global_dir = 'file:///tmp'

        td_input = {}
        td_output = {}
        notes = {}

        if td.IsCleanup():
            return (td_input, td_output, notes)

        for idx, td_tray in td_trays.iteritems():
            args['tray'] = idx
            icetray = steering.GetTray(idx)

            input_files = icetray.GetInputFiles()
            parsed_input = []

            output_files = icetray.GetOutputFiles()
            parsed_output = []

            if iter is not False:
                iters = [iter]
            else:
                iters = td_tray.GetIters()
            for iter in iters:
                args['iter'] = iter
                parser = ExpParser(args,steering)
                for d in steering.GetDependencies():
                    d_file = parser.parse(d)
                    if not td_input.has_key(d_file):
                        location = d_file
                        if not self.IsUrl(location):
                            location = os.path.join(global_dir, location)
                        td_input[os.path.basename(d_file)] = [location]
                for i_file in input_files:
                    name = i_file.GetName()
                    name = parser.parse(name)
                    if not td_input.has_key(name) \
                       and not td_output.has_key(name):
                        if catalog:
                            node = self.FindFile(steering,td,catalog,name)
                        else:
                            node = td.GetName()

                        note = False
                        if node == "global":
                            location = global_dir
                            note = "global"
                        if node != "global":
                            location = tmp_dir
                            location = os.path.join(location, str(job.GetDatasetId()))
                            location = os.path.join(location, str(job.GetProcNum()))
                            location = os.path.join(location, node)
                            note = "dontextract"
                        location = os.path.join(location, name)
                        if i_file.IsPhotonicsTable():
                            note = "photonics"
                        if note:
                            notes[name] = note
                        td_input[name] = [location]
                for o_file in output_files:
                    name = o_file.GetName()
                    name = parser.parse(name)
                    if not td_output.has_key(name):
                        location = os.path.join(tmp_dir, str(job.GetDatasetId()))
                        location = os.path.join(location, str(job.GetProcNum()))
                        location = os.path.join(location, str(td.GetName()))
                        location = os.path.join(location, name)
                        td_output[name] = [location]

        return (td_input, td_output, notes)
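
    # GetFiles() returns a tuple of
    #   ({name: [input_url, ...]}, {name: [output_url, ...]}, {name: note})
    # where note is one of "global", "dontextract" or "photonics".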

    def FindFile(self,steering,td,catalog,file):
        parents = td.GetParents()

        # first look for the file among the immediate parents' outputs
        for parent in parents:
            if catalog.has_key(parent):
                if catalog[parent][1].has_key(file):
                    return parent

        # then search recursively up the ancestry
        for parent in parents:
            parent_td = steering.GetTaskDefinition(parent)
            result = self.FindFile(steering,parent_td,catalog,file)
            if result != "global":
                return result

        return "global"

    def WriteFileManifest(self,job,filename,input,output,notes):
        logger.debug("Input files: %s" % input)
        in_manifest = open(filename.replace(".condor", ".input"), 'w')
        if len(input):
            padding = str(max(map(lambda x: max(map(len, x)), input.values())))
            fmt_str = "%-" + padding + "s %s"
            for i_file, locs in input.items():
                for loc in locs:
                    file = fmt_str % (loc, i_file)
                    if notes.has_key(i_file):
                        file += "\t%s" % notes[i_file]
                    job.Write(in_manifest, file)
        in_manifest.close()
        logger.debug("Output files: %s" % output)
        out_manifest = open(filename.replace(".condor", ".output"), 'w')
        if len(output):
            padding = str(max(map(lambda x: max(map(len, x)), output.values())))
            fmt_str = "%-" + padding + "s %s"
            for o_file, locs in output.items():
                for loc in locs:
                    job.Write(out_manifest, fmt_str % (o_file, loc))
        out_manifest.close()
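
    # Manifest lines pair a transfer location with a local name; e.g. a line
    # in the .input manifest (the third column is the optional note):
    #
    #   gsiftp://host/1/0/generate/out.i3 out.i3	dontextract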

    def WriteDAGNode(self,job,dagfile,nodename,filename,parents=None,done=False):
        line = "JOB %s %s" % (nodename,filename)
        if done:
            line += " DONE"
        job.Write(dagfile,line)
        if parents:
            job.Write(dagfile, "PARENT %s CHILD %s" % (parents,nodename))

    def EnumerateParentNodes(self,steering,td):
        parents = td.GetParents()
        parentnodes = []
        for parent in parents:
            parentobj = steering.GetTaskDefinition(parent)
            if parentobj.ParallelExecutionEnabled():
                for idx,tray in parentobj.GetTrays().items():
                    for iter in tray.GetIters():
                        if not iter == -1:
                            nodename = "%s_%u" % (parent,iter)
                        else:
                            nodename = "%s_ext" % parent
                        parentnodes.append(nodename)
            else:
                parentnodes.append(parent)
        return parentnodes

    def GetArguments(self,job,td,idx=False,iter=False,output="str"):
        """
        Assemble the command-line arguments for a task node.
        (The lines that built argstr were elided in the original listing;
        this minimal reconstruction follows the pattern of Condor.WriteConfig.)
        """
        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = job.GetMainScript() + " " + " ".join(argopts)
        if output == "str":
            return argstr

        # parse "--key=value" / "--flag" options into a dictionary
        args = {}
        for arg in argopts:
            arg = arg[2:]
            pieces = arg.split("=")
            if len(pieces) == 1:
                args[arg] = 1
            else:
                args[pieces[0]] = pieces[1]
        return args
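
    # e.g. argopts of ["--dataset=1042", "--nproc=10", "--dag"] parse to
    #   {'dataset': '1042', 'nproc': '10', 'dag': 1}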

    def WriteSubmitFile(self,job,filename,td,idx=False,iter=False):
        wrapper = open(filename.replace('condor','sh'),'w')

        job.Write(wrapper,"#!/bin/sh")
        job.Write(wrapper, "ARCH=`uname -m | sed -e 's/Power Macintosh/ppc/ ; s/i686/i386/'`")
        job.Write(wrapper, "PVER=`uname -r | awk '{split($1,a,\".\"); print a[2]}'`")
        job.Write(wrapper, "if [ $ARCH == 'i386' ]; then",parse=False)
        job.Write(wrapper, "   if [ $PVER == '4' ]; then",parse=False)
        job.Write(wrapper, "      PLATFORM=Linux-i386")
        job.Write(wrapper, "   elif [ $PVER == '6' ]; then",parse=False)
        job.Write(wrapper, "      PLATFORM=Linux-libstdc++6-i386")
        job.Write(wrapper, "   fi")
        job.Write(wrapper, "elif [ $ARCH == 'x86_64' ]; then",parse=False)
        job.Write(wrapper, "   if [ $PVER == '4' ]; then",parse=False)
        job.Write(wrapper, "      PLATFORM=Linux-x86_64")
        job.Write(wrapper, "   elif [ $PVER == '6' ]; then",parse=False)
        job.Write(wrapper, "      PLATFORM=Linux-libstdc++6-x86_64")
        job.Write(wrapper, "   fi")
        job.Write(wrapper, "fi")

        for var in self.env.keys():
            job.Write(wrapper, "export %s=%s" % (var, self.env[var]), parse=False)
        for var in job.env.keys():
            job.Write(wrapper, "export %s=%s" % (var, job.env[var]), parse=False)
        job.Write(wrapper, 'export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH', parse=False)
        job.Write(wrapper, 'cmd="$PYROOT/bin/python -u %s $*"' % os.path.basename(job.GetExecutable()), parse=False)
        job.Write(wrapper, "echo $cmd", parse=False)
        job.Write(wrapper, "exec $cmd", parse=False)
        wrapper.close()

        submitfile = open(filename,'w')
        job.Write(submitfile,"Universe = %s" % self.GetUniverse())
        job.Write(submitfile,"Executable = %s" % wrapper.name)
        job.Write(submitfile,"Log = %s" % job.GetLogFile())
        job.Write(submitfile,"Output = %s" % filename.replace('condor', 'out'))
        job.Write(submitfile,"Error = %s" % filename.replace('condor', 'err'))

        if job.GetInitialdir():
            job.Write(submitfile,"Initialdir = %s" % job.GetInitialdir())

        if job.GetInputFiles():
            inputfile_list = ",".join(job.GetInputFiles())
            inputfile_list += ","+job.GetExecutable()
            inputfile_list += ","+filename.replace(".condor", ".input")
            inputfile_list += ","+filename.replace(".condor", ".output")
            job.Write(submitfile,"%s = %s" % ("Transfer_Input_Files",inputfile_list))

        if job.GetOutputFiles():
            outputfile_list = ",".join(job.GetOutputFiles())
            job.Write(submitfile,"%s = %s" % ("Transfer_Output_Files",outputfile_list))

        for key in self.GetParamKeys():
            if key.lower() == "requirements" and td.GetRequirements():
                val = td.GetRequirements()
            else:
                val = self.GetParam(key)
            if not key.startswith('-'):
                job.Write(submitfile, "%s = %s" % (key, val))

        for opt in job.GetSteering().GetBatchOpts():
            if not opt.GetName().startswith('-'):
                job.Write(submitfile, "%s = %s" % (opt.GetName(), opt.GetValue()))

        for name, value in self.GetAdditionalOptions(job).items():
            job.Write(submitfile, "%s = %s" % (name, value))

        argstr = self.GetArguments(job,td,idx,iter)
        job.Write(submitfile,"Arguments = %s" % argstr,parse=False)

        job.Write(submitfile, "Queue")
        submitfile.close()