4 """
5 A basic submitfile for submitting and monitoring jobs to PBS.
6 This module implements only a small subset of PBS features.
7 It's interface is like that of condor however.
8 (http://www.cs.wisc.edu/condor)
9 Inherits from i3Queue
10
11 copyright (c) 2005 the icecube collaboration
12
13 @version: $Revision: $
14 @date: $Date: $
15 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
16 @todo: implement more functionality of pbs.
17 """

import os
import re
import sys
import math
import random
import dircache
import time
import string
import logging
import os.path
import getpass
import commands
from os import popen2
from iceprod.core import metadata
from iceprod.core.lex import ExpParser
from iceprod.server.grid import iGrid
from iceprod.server.job import i3Job, i3Task

from os.path import expandvars
localdb = None


pbs_status = {'Q':'QUEUED', 'R':'PROCESSING', 'C':'FINISHED'}

def cstatus(istatus):
    # Map a raw PBS state letter to an iceprod status string.
    # (Reconstructed helper: the signature is inferred from the call sites
    # in CleanQ() and CheckJobStatus(); unknown states count as FINISHED.)
    if pbs_status.has_key(istatus):
        return pbs_status[istatus]
    return 'FINISHED'

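# Illustrative sketch (assumed driver flow, not part of the plugin API): a
# server-side driver would write a submit script with WriteConfig(), hand it
# to qsub (the configured enqueue_cmd), and recover the PBS id via get_id().
#
#   q = Pbs()
#   q.WriteConfig(job, "/tmp/iceprod_job.pbs")   # 'job' is a configured i3Job
#   status, out = commands.getstatusoutput("qsub /tmp/iceprod_job.pbs")
#   pbs_id = q.get_id(out)                       # e.g. "12345" from "12345.server"
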
51 """
52 This class represents a job or cluster on a pbs system.
53 """

    def __init__(self):
        iGrid.__init__(self)
        self.proc = 0
        self.sleeptime = 6
        self.enqueue_cmd = "qsub"
        self.checkqueue_cmd = "qstat -u %s " % getpass.getuser()
        self.queue_rm_cmd = "qdel"
        self.suffix = "pbs"
        self.logger = logging.getLogger('PBS')

    def _choose_queue(self, queue_list):
        from random import choice
        weighted_qlist = []
        for q in queue_list:
            if len(q.split()) > 1:
                queue,weight = q.split()
            else:
                queue,weight = q,1
            try:
                weight = int(weight)
            except Exception,e:
                self.logger.error("Exception: " + str(e))
                self.logger.warn("Unable to get queue weight for: " + q)
                weight = 1
            self.logger.debug("%s:%u " % (queue,weight))
            weighted_qlist.extend([queue]*weight)
        return choice(weighted_qlist)

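    # Worked example (sketch): with queue params "short 3" and "long",
    # weighted_qlist becomes ['short', 'short', 'short', 'long'], so 'short'
    # is drawn three times as often as 'long'.
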
85 """
86 Write pbs submit file to a file.
87 @param job: i3Job object
88 @param config_file: path to file were submit file will be written
89 """
90 self.logger.debug('WriteConfig')
91
92 if not job.GetExecutable():
93 raise Exception, "no executable configured"
94
95 submitfile = open("%s" % config_file,'w')
96 outfile = os.path.join(job.GetInitialdir(),job.GetOutputFile())
97 errfile = os.path.join(job.GetInitialdir(),job.GetErrorFile())
98
99 job.Write(submitfile,"#!/bin/sh")
100 job.Write(submitfile,"#PBS -o %s" % outfile )
101 job.Write(submitfile,"#PBS -e %s" % errfile )
102
103
104 queue_list = []
105 for key in self.GetParamKeys():
106 if not key.startswith("queue"):
107 job.Write(submitfile,"#PBS %s" % (self.GetParam(key)))
108 else:
109 queue_list.append(self.GetParam(key)[2:])
110
111 if queue_list:
112 chosen_queue = self._choose_queue(queue_list)
113 job.Write(submitfile,"#PBS -q %s" % chosen_queue)
114
115
116 for key,opt in job.GetBatchOpts().items():
117 job.Write(submitfile,"#PBS %s " % opt)
118
119
120 job.Write(submitfile, "export PBS_O_WORKDIR=%s",job.GetInitialdir())
121 job.Write(submitfile, "ARCH=`uname -m | sed -e 's/Power Macintosh/ppc/ ; s/i686/i386/'`")
122 job.Write(submitfile, "PVER=`uname -r | awk '{split($1,a,\".\"); print a[2]}'`")
123 job.Write(submitfile, "if [ $ARCH == 'i386' ]; then",parse=False)
124 job.Write(submitfile, " if [ $PVER == '4' ]; then",parse=False)
125 job.Write(submitfile, " PLATFORM=Linux-i386")
126 job.Write(submitfile, " elif [ $PVER == '6' ]; then",parse=False)
127 job.Write(submitfile, " PLATFORM=Linux-libstdc++6-i386")
128 job.Write(submitfile, " fi")
129 job.Write(submitfile, "elif [ $ARCH == 'x86_64' ]; then",parse=False)
130 job.Write(submitfile, " if [ $PVER == '4' ]; then",parse=False)
131 job.Write(submitfile, " PLATFORM=Linux-x86_64")
132 job.Write(submitfile, " elif [ $PVER == '6' ]; then",parse=False)
133 job.Write(submitfile, " PLATFORM=Linux-libstdc++6-x86_64")
134 job.Write(submitfile, " fi")
135 job.Write(submitfile, "fi")
136
137
138 for var in self.env.keys():
139 job.Write(submitfile,"export %s=%s" % (var,self.env[var]),parse=False)
140 for var in job.env.keys():
141 job.Write(submitfile,"export %s=%s" % (var,job.env[var]),parse=False)
142 job.Write(submitfile,"export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH",parse=False)
143 job.Write(submitfile,"unset I3SIMPRODPATH")
144
145 job.Write(submitfile,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
146 job.Write(submitfile,"mkdir -p $RUNDIR")
147 job.Write(submitfile,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)
148
149 for file in job.GetInputFiles()+[job.GetExecutable()]:
150 job.Write(submitfile,"cp %s $RUNDIR" % file,parse=False)
151
152 job.Write(submitfile,"cd $RUNDIR",parse=False)
153
154 argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
155 argstr = job.GetMainScript() + " " + " ".join(argopts)
156 executable = os.path.basename(job.GetExecutable())
157 job.Write(submitfile,
158 "$PYROOT/bin/python %s %s 1>%s.tmp 2>%s.tmp" % (executable,argstr,outfile,errfile),
159 parse=False)
160 job.Write(submitfile, "cat %s.tmp>&1" % outfile, parse=False)
161 job.Write(submitfile, "cat %s.tmp>&2" % errfile, parse=False)
162
163 job.Write(submitfile,"rm -f wgetrc" )
164
165 if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
166 job.Write(submitfile,"for file in *; do")
167 job.Write(submitfile," if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
168 job.Write(submitfile," mv $file %s" % job.GetInitialdir())
169 job.Write(submitfile," fi; done")
170
171 job.Write(submitfile,"#clean directory")
172 job.Write(submitfile,"cd /tmp")
173 job.Write(submitfile,"rm -rf $RUNDIR ",parse=False)
174 job.Write(submitfile,"exit 0")
175
176 submitfile.close()
177
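    # The generated script looks roughly like this (sketch; the #PBS lines
    # depend on the configured params, queues and batch options):
    #
    #   #!/bin/sh
    #   #PBS -o <initialdir>/<outfile>
    #   #PBS -e <initialdir>/<errfile>
    #   #PBS -q <chosen queue>
    #   export PBS_O_WORKDIR=<initialdir>
    #   ...platform detection and environment exports...
    #   RUNDIR=$I3SCRATCH/i3sim_<procnum>_<timestamp>
    #   mkdir -p $RUNDIR; cd $RUNDIR
    #   $PYROOT/bin/python <executable> <args> 1><outfile>.tmp 2><errfile>.tmp
    #   ...stage files back to <initialdir>...
    #   rm -rf $RUNDIR
    #   exit 0
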

    def get_id(self,submit_status):
        """
        Parse string returned by PBS on submission to extract the
        id of the job cluster

        @param submit_status: string returned by qsub
        """
        matches = re.findall(r'[0-9]+\.[0-9a-zA-Z_\-]*', submit_status)
        self.logger.debug(submit_status)
        if matches:
            cluster_info = matches[0].split('.')
            job_id = cluster_info[0]

            self.job_ids.append(job_id)
            return job_id
        else:
            self.logger.warn('could not parse job id from "%s"' % submit_status)
            return -1

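    # Example (sketch): qsub typically prints an id of the form
    # "12345.pbs-server.example.edu"; the pattern above matches
    # "12345.pbs-server" and the extracted cluster id is "12345".
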

    def CheckQ(self, db=None):
        """
        Query status of cluster or jobs on the PBS queue.
        """
        # (method header assumed from the i3Queue interface; the body polls
        # qstat for every known job id)
        cmd = self.checkqueue_cmd
        for id in self.job_ids:
            cmd += " %s" % id
        status,output = commands.getstatusoutput(cmd)
        return output


    def CleanQ(self, jobs=None):
        """
        Remove queued iceprod jobs that are not in the given list of active
        jobs (e.g. stray jobs left over from a previous run).
        """
        if not jobs: return 0

        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]

        job_dict = dict()
        for job in job_list:
            for job_id in str(job.GetJobId()).split(":"):
                job_dict[job_id] = job

        cmd = self.checkqueue_cmd + " | grep iceprod"
        status,output = commands.getstatusoutput(cmd)
        if not status:
            for line in output.split('\n'):
                if line.strip() == '':
                    continue
                try:
                    # columns of 'qstat -u <user>' output
                    tok = line.split()
                    jobId = tok[0].split(".")[0]
                    user = tok[1]
                    queue = tok[2]
                    executable = tok[3]
                    sid = tok[4]
                    nds = tok[5]
                    tsk = tok[6]
                    memory = tok[7]
                    runtime = tok[8]
                    jobStatus = tok[9]
                    if executable.startswith("iceprod."):
                        if not job_dict.has_key(jobId):
                            self.logger.warn("removing job %s with status %s. Reason: job not found in list" % \
                                             (jobId,cstatus(jobStatus)))
                            self.logger.debug("job list [%s]" % str(job_dict.keys()))
                            os.system("%s %s" % (self.queue_rm_cmd,jobId))
                except Exception,e:
                    self.logger.error("%s:%s" % (e,line))
                    sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
        else:
            self.logger.error(cmd+": "+output)
        return status

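    # CleanQ() above assumes the column layout of 'qstat -u <user>' output,
    # e.g. (sketch):
    #
    #   Job ID        Username  Queue  Jobname      SessID  NDS  TSK  Memory  Time   S  Elap
    #   12345.server  icecube   long   iceprod.0.x  4321    1    1    2gb     24:00  R  01:23
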
291 """
292 Remove cluster or job from queue
293 """
294 if not jobid: return "Unknown jobid. Cannot remove job."
295 if isinstance(job,i3Job):
296 cmd = "%s %s" % (self.queue_rm_cmd,job.GetJobId())
297 else:
298 cmd = "%s %s" % (self.queue_rm_cmd,job)
299 self.logger.info(cmd)
300
301 handle = os.popen(cmd, 'r')
302 status = string.join(handle.readlines())
303 self.logger.info(status)
304 handle.close()
305 return status

class PbsDag(Pbs):
    """
    This class represents a job that executes in multiple parts using a DAG.
    """

    def __init__(self):
        # (class and method headers reconstructed from usage; the class name
        # is assumed from the 'PbsDAG' logger below)
        Pbs.__init__(self)
        self.enqueue_cmd = "/bin/bash"
        self.localdb = None
        self.logger = logging.getLogger('PbsDAG')
        try:
            import sqlite3
            self.localdb = sqlite3.connect(expandvars("$I3PROD/shared/localqueue.db"))
        except:
            self.logger.error("sqlite3 missing. will try sqlite.")
            try:
                import sqlite
                self.localdb = sqlite.connect(expandvars("$I3PROD/shared/localqueue.db"))
            except:
                self.logger.error("sqlite missing. won't try to maintain queue sanity")
        if self.localdb:
            cursor = self.localdb.cursor()
            try:
                cursor.execute('CREATE TABLE IF NOT EXISTS queue (parent_id VARCHAR(80), child_id VARCHAR(80), status VARCHAR(80))')
            except Exception,e:
                self.logger.error(e)
            else:
                self.localdb.commit()
            cursor.close()

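    # Local bookkeeping sketch: the 'queue' table stores one row per DAG node,
    # keyed by the id of the first node. After submitting a three-node DAG
    # whose driver printed <job_id>12345:12346:12347</job_id> it would hold:
    #
    #   parent_id  child_id  status
    #   12345      12345     QUEUED
    #   12345      12346     QUEUED
    #   12345      12347     QUEUED
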

    def WriteConfig(self, job, config_file):
        """
        Write a PBS DAG driver script to a file.
        @param job: i3Job object
        @param config_file: path to file where the script will be written
        """
        if not job.GetExecutable():
            raise Exception, "no executable configured"

        from iceprod.core.dataclasses import IceTrayConfig

        db = self.GetMonitorDB()

        steering = job.GetSteering()
        task_defs = steering.GetTaskDefinitions()
        self.logger.debug("Task definitions: %s" % task_defs)
        if not len(task_defs):
            # no tasks defined: fall back to an ordinary (non-DAG) submission
            self.logger.debug("No tasks specified in config file; doing regular submit")
            self.enqueue_cmd = "qsub"
            return Pbs.WriteConfig(self, job, config_file)

        dagfile = open(config_file,'w')
        job.Write(dagfile, "#!/bin/bash")
        job_id = job.GetDatabaseId()
        job.dag = dict()

        file_catalog = {}
        for taskname,td in task_defs.items():
            args = self.GetArguments(job,td,output="dict")
            file_catalog[taskname] = self.GetFiles(job,td,args)
            job.dag[taskname] = None

        for taskname,td in task_defs.items():
            filename = config_file.replace('.pbs',".%s.pbs" % taskname)
            td_id = td.GetId()
            self.logger.debug("Got task %s (ID=%u) with filename %s" % (taskname, td_id, filename))
            tmp_parents = self.EnumerateParentNodes(steering,td)
            parents = []

            for parent in tmp_parents:
                parent_td = steering.GetTaskDefinition(parent)
                if not db.task_is_finished(parent_td.GetId(), job.GetDatabaseId()):
                    # parent not yet finished: add a dependency edge
                    self.logger.error("task id %s, '%s' job id %s" % (parent_td.GetId(),parent_td.GetName(),job.GetDatabaseId()))
                    parents.append(parent)
            job.dag[taskname] = parents
            if td.ParallelExecutionEnabled():
                trays = td.GetTrays()
                for idx,tray in trays.items():
                    for iter in tray.GetIters():
                        if not iter == -1:
                            nodename = "%s_%u" % (taskname,iter)
                        else:
                            nodename = "%s_ext" % taskname
                        filename = config_file.replace('.pbs',".%s.pbs" % nodename)
                        done = db.task_is_finished(td_id, job_id, idx, iter)
                        args = self.GetArguments(job,td,idx,iter,output="dict")

                        input,output,notes = self.GetFiles(job,td,args,idx,iter,file_catalog)
                        self.WriteFileManifest(job,filename,input,output,notes)
                        self.WriteSubmitFile(job,filename,td,idx,iter)
                        self.WriteDAGNode(job,dagfile,nodename,filename,parents,done)
            else:
                done = db.task_is_finished(td_id, job_id)
                if taskname == 'trashcan':
                    done = False
                args = self.GetArguments(job,td,output="dict")

                input,output,notes = self.GetFiles(job,td,args,catalog=file_catalog)
                self.WriteFileManifest(job,filename,input,output,notes)
                self.WriteSubmitFile(job,filename,td)
                self.WriteDAGNode(job,dagfile,taskname,filename,parents,done)
            db.commit()

        self.FinishDAGNode(job,dagfile)

    def IsUrl(self, path):
        return bool(re.match("[^/:]+://?.*$", path))

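    # Examples (sketch): IsUrl("gsiftp://host/path") and IsUrl("file:///tmp")
    # are true; a plain path such as "/tmp/photon_tables" is not a URL and is
    # therefore prefixed with global_dir in GetFiles() below.
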
    def GetFiles(self,job,td,args,idx=False,iter=False,catalog=False):
        steering = job.GetSteering()
        parser = ExpParser(args,steering)
        if idx is not False:
            td_trays = {idx: td.GetTray(idx)}
        else:
            td_trays = td.GetTrays()

        if steering.GetSysOpt("dagtemp"):
            tmp_dir = parser.parse(steering.GetSysOpt("dagtemp").GetValue())
        elif args.has_key('dagtemp'):
            tmp_dir = parser.parse(args['dagtemp'])
        else:
            tmp_dir = 'file:///tmp'

        if args.has_key('fetch'):
            global_dir = args['fetch']
        else:
            global_dir = 'file:///tmp'

        td_input = {}
        td_output = {}
        notes = {}

        if td.IsCleanup():
            # cleanup tasks don't need file manifests
            return (td_input, td_output, notes)

        for idx, td_tray in td_trays.iteritems():
            args['tray'] = idx

            self.logger.info("GetTray(%s)" % idx)
            icetray = steering.GetTray(idx)

            input_files = icetray.GetInputFiles()
            parsed_input = []

            output_files = icetray.GetOutputFiles()
            parsed_output = []

            if iter is not False:
                iters = [iter]
            else:
                iters = td_tray.GetIters()
            for iter in iters:
                args['iter'] = iter
                for d in steering.GetDependencies():
                    d_file = parser.parse(d)
                    if not td_input.has_key(d_file):
                        location = d_file
                        if not self.IsUrl(location):
                            location = os.path.join(global_dir, location)
                        td_input[os.path.basename(d_file)] = [location]
                for i_file in input_files:
                    name = i_file.GetName()
                    name = parser.parse(name)
                    if not td_input.has_key(name) \
                       and not td_output.has_key(name):
                        if catalog:
                            node = self.FindFile(steering,td,catalog,name)
                        else:
                            node = td.GetName()

                        note = False
                        if node == "global":
                            location = global_dir
                            note = "global"
                        if node != "global":
                            location = tmp_dir
                            location = os.path.join(location, str(job.GetDatasetId()))
                            location = os.path.join(location, str(job.GetProcNum()))
                            location = os.path.join(location, node)
                            note = "dontextract"
                        location = os.path.join(location, name)
                        if i_file.IsPhotonicsTable():
                            note = "photonics"
                        if note:
                            notes[name] = note
                        td_input[name] = [location]
                for o_file in output_files:
                    name = o_file.GetName()
                    name = parser.parse(name)
                    if not td_output.has_key(name):
                        location = os.path.join(tmp_dir, str(job.GetDatasetId()))
                        location = os.path.join(location, str(job.GetProcNum()))
                        location = os.path.join(location, str(td.GetName()))
                        location = os.path.join(location, name)
                        td_output[name] = [location]

        return (td_input, td_output, notes)

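    # Path resolution sketch: a steering dependency "photon_tables.tar" with
    # the default global_dir becomes
    #   td_input["photon_tables.tar"] = ["file:///tmp/photon_tables.tar"]
    # while an intermediate file produced by parent task "generate" for
    # dataset 7, proc 3 is expected under <tmp_dir>/7/3/generate/<name>
    # and tagged with the "dontextract" note.
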
    def FindFile(self,steering,td,catalog,file):
        parents = td.GetParents()

        # first check whether a direct parent produces the file
        # (catalog[parent][1] is the td_output dict returned by GetFiles)
        for parent in parents:
            if catalog.has_key(parent):
                if catalog[parent][1].has_key(file):
                    return parent

        # otherwise recurse through the ancestors
        for parent in parents:
            parent_td = steering.GetTaskDefinition(parent)
            result = self.FindFile(steering,parent_td,catalog,file)
            if result != "global":
                return result

        return "global"


    def WriteFileManifest(self,job,filename,input,output,notes):
        self.logger.debug("Input files: %s" % input)
        in_manifest = open(filename.replace(".pbs", ".input"), 'w')
        job.manifest_in = in_manifest.name
        if len(input):
            padding = str(max(map(lambda x: max(map(len, x)), input.values())))
            fmt_str = "%-" + padding + "s %s"
            for i_file, locs in input.items():
                for loc in locs:
                    file = fmt_str % (loc, i_file)
                    if notes.has_key(i_file):
                        file += "\t%s" % notes[i_file]
                    job.Write(in_manifest, file)
        in_manifest.close()
        self.logger.debug("Output files: %s" % output)
        out_manifest = open(filename.replace(".pbs", ".output"), 'w')
        job.manifest_out = out_manifest.name
        if len(output):
            padding = str(max(map(lambda x: max(map(len, x)), output.values())))
            fmt_str = "%-" + padding + "s %s"
            for o_file, locs in output.items():
                for loc in locs:
                    job.Write(out_manifest, fmt_str % (o_file, loc))
        out_manifest.close()

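    # Manifest format sketch: the ".input" manifest lists
    #   <location> <name> [note]
    # per input file, and the ".output" manifest lists
    #   <name> <destination>
    # per output file.
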
    def WriteDAGNode(self,job,dagfile,nodename,filename,parents=None,done=False):
        # a node that already finished is skipped entirely on resubmission
        if done: return
        job.Write(dagfile, "%s=`qsub \\" % (nodename))
        for parent in parents:
            job.Write(dagfile, " -W depend=afterok:$%s\\" % (parent),parse=False)
        job.Write(dagfile, " %s`" % (filename))
        job.Write(dagfile, 'if [ -z "$ICEPROD_JOB_ID_LIST" ]; then ',parse=False)
        job.Write(dagfile, '   ICEPROD_JOB_ID_LIST="${%s}";' % (nodename),parse=False)
        job.Write(dagfile, 'else ')
        job.Write(dagfile, '   ICEPROD_JOB_ID_LIST="${ICEPROD_JOB_ID_LIST}:${%s}";' % (nodename),parse=False)
        job.Write(dagfile, 'fi')

    def FinishDAGNode(self,job,dagfile):
        job.Write(dagfile, 'echo "<job_id>${ICEPROD_JOB_ID_LIST}</job_id>";',parse=False)
        job.Write(dagfile, "exit 0;")
        dagfile.close()

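    # The resulting DAG driver is a plain bash script of the form (sketch;
    # the task names are hypothetical):
    #
    #   #!/bin/bash
    #   generate=`qsub \
    #    config.generate.pbs`
    #   filter=`qsub \
    #    -W depend=afterok:$generate\
    #    config.filter.pbs`
    #   ...
    #   echo "<job_id>${ICEPROD_JOB_ID_LIST}</job_id>";
    #   exit 0;
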
    def get_id(self,submit_status):
        """
        Parse the string printed by the DAG driver script on submission to
        extract the ids of the jobs in the DAG.
        @param submit_status: string returned by the submit command
        """
        matches = re.findall(r'<job_id>.*</job_id>', submit_status)
        self.logger.debug(submit_status)
        if matches:
            # strip the surrounding tags (character-set strip; the ids
            # themselves are numeric, so this is safe)
            job_ids = map(lambda x: x.split('.')[0],matches[0].strip('<job_id>').strip('</job_id>').split(":"))
            dag_id = job_ids[0]
            if self.localdb:
                # skip bookkeeping if no local sqlite db is available
                cursor = self.localdb.cursor()
                for job_id in job_ids:
                    sql  = 'INSERT INTO `queue` (parent_id,child_id,status)'
                    sql += ' VALUES ("%s","%s","QUEUED") ' % (dag_id,job_id)
                    self.logger.info(sql)
                    cursor.execute(sql)
                self.localdb.commit()
            self.job_ids.append(dag_id)
            return dag_id
        else:
            self.logger.warn('could not parse job id from "%s"' % submit_status)
            return Pbs.get_id(self,submit_status)

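    # Example (sketch): a driver that printed
    #   <job_id>12345.server:12346.server</job_id>
    # yields job_ids ['12345', '12346']; '12345' becomes the DAG id and one
    # row per id is recorded in the local queue table.
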

    def GetArguments(self,job,td,idx=False,iter=False,output="str"):
        # (head of this method reconstructed as a sketch: the option string is
        # assumed to be assembled as in Pbs.WriteConfig, without the main
        # script, since callers prepend job.GetMainScript() themselves; idx
        # and iter are accepted but unused here)
        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = " ".join(argopts)
        if output != "dict":
            return argstr

        # parse "--key[=value]" options into a dictionary
        args = {}
        for opt in argstr.split(" "):
            opt = opt[2:]
            pieces = opt.split("=")
            if len(pieces) == 1:
                args[opt] = 1
            else:
                args[pieces[0]] = pieces[1]
        return args

    def WriteSubmitFile(self,job,filename,td,idx=False,iter=False):
        steering = job.GetSteering()
        args = self.GetArguments(job,td,idx,iter,output="dict")
        parser = ExpParser(args,steering)

        submitfile = open(filename,'w')
        outfile = filename.replace('.pbs','.out')
        errfile = filename.replace('.pbs','.err')

        job.Write(submitfile,"#!/bin/sh")
        job.Write(submitfile,"#PBS -o %s" % outfile )
        job.Write(submitfile,"#PBS -e %s" % errfile )

        queue_list = []
        for key in self.GetParamKeys():
            if not key.startswith("queue"):
                job.Write(submitfile,"#PBS %s" % (self.GetParam(key)))
            else:
                queue_list.append(self.GetParam(key)[2:])

        td_batch_opts = td.GetBatchOpts()
        self.logger.debug(td_batch_opts)
        try:
            td_batch_opts = parser.parse(td_batch_opts)
        except:
            self.logger.warn("could not parse %s" % td_batch_opts)
            td_batch_opts = ""
        if td_batch_opts:
            for opt in map(string.strip,td_batch_opts.split(";")):
                job.Write(submitfile,"#PBS %s" % opt)
                # a task-level queue choice overrides the configured queue list
                if opt.startswith("-q"): queue_list = []

        if queue_list:
            chosen_queue = self._choose_queue(queue_list)
            job.Write(submitfile,"#PBS -q %s" % chosen_queue)

        for key,opt in job.GetBatchOpts().items():
            job.Write(submitfile,"#PBS %s " % opt)

        job.Write(submitfile, "")
        job.Write(submitfile, "ARCH=`uname -m | sed -e 's/Power Macintosh/ppc/ ; s/i686/i386/'`")
        job.Write(submitfile, "PVER=`uname -r | awk '{split($1,a,\".\"); print a[2]}'`")
        job.Write(submitfile, "if [ $ARCH == 'i386' ]; then",parse=False)
        job.Write(submitfile, "   if [ $PVER == '4' ]; then",parse=False)
        job.Write(submitfile, "      PLATFORM=Linux-i386")
        job.Write(submitfile, "   elif [ $PVER == '6' ]; then",parse=False)
        job.Write(submitfile, "      PLATFORM=Linux-libstdc++6-i386")
        job.Write(submitfile, "   fi")
        job.Write(submitfile, "elif [ $ARCH == 'x86_64' ]; then",parse=False)
        job.Write(submitfile, "   if [ $PVER == '4' ]; then",parse=False)
        job.Write(submitfile, "      PLATFORM=Linux-x86_64")
        job.Write(submitfile, "   elif [ $PVER == '6' ]; then",parse=False)
        job.Write(submitfile, "      PLATFORM=Linux-libstdc++6-x86_64")
        job.Write(submitfile, "   fi")
        job.Write(submitfile, "fi")

        for var in self.env.keys():
            job.Write(submitfile, "export %s=%s" % (var, self.env[var]), parse=False)
        for var in job.env.keys():
            job.Write(submitfile, "export %s=%s" % (var, job.env[var]), parse=False)
        job.Write(submitfile, "RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
        job.Write(submitfile, "mkdir -p $RUNDIR")

        for file in job.GetInputFiles()+[job.GetExecutable()]:
            job.Write(submitfile, "cp %s $RUNDIR" % file,parse=False)

        job.Write(submitfile, "cp %s $RUNDIR" % job.manifest_in)
        job.Write(submitfile, "cp %s $RUNDIR" % job.manifest_out)

        job.Write(submitfile, "cd $RUNDIR",parse=False)

        job.Write(submitfile, "echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)
        argstr = job.GetMainScript() + " " + self.GetArguments(job,td,idx,iter,output="string")
        executable = os.path.basename(job.GetExecutable())
        job.Write(submitfile,
            "$PYROOT/bin/python -u %s %s 1>%s.tmp 2>%s.tmp" % (executable,argstr,outfile,errfile),
            parse=False)
        job.Write(submitfile, "RETVAL=$?", parse=False)

        job.Write(submitfile, "cat %s.tmp>&1" % outfile, parse=False)
        job.Write(submitfile, "cat %s.tmp>&2" % errfile, parse=False)
        job.Write(submitfile,"rm -f wgetrc" )

        if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
            job.Write(submitfile,"for file in *; do")
            job.Write(submitfile,"   if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
            job.Write(submitfile,"      mv $file %s" % job.GetInitialdir())
            job.Write(submitfile,"   fi; done")

        job.Write(submitfile,"#clean directory")
        job.Write(submitfile,"cd /tmp")
        job.Write(submitfile,"rm -rf $RUNDIR ",parse=False)
        job.Write(submitfile,"exit $RETVAL")
        submitfile.close()
        os.system("chmod +x %s" % submitfile.name)

755 """
756 Querie status of job on PBS queue
757 """
758 self.logger.info("checking jobs in queue")
759 if not self.localdb: return 1
760
761 if isinstance(jobs,list):
762 job_list = jobs
763 else:
764 job_list = [jobs]
765 for job in job_list:
766 job.SetStatus('?')
767 if not job.GetJobId(): continue
768
769 cursor = self.localdb.cursor()
770 sql = 'SELECT parent_id,child_id,status FROM `queue` WHERE parent_id = "%s" ' % job.GetJobId()
771 cursor.execute(sql)
772
773 jobentries = cursor.fetchall()
774 if len(jobentries) == 0:
775 jobentries = [(job.GetJobId(),job.GetJobId(),'?')]
776 for dag_id,job_id, status in jobentries:
777 cmd = "qstat -f %s " % job_id
778 self.logger.debug(cmd)
779 status,output = commands.getstatusoutput(cmd)
780 if status:
781 self.logger.error("%s: %s: %s" % (cmd,status,output))
782 if job.GetStatus() == '?':
783 job.SetStatus('FINISHED')
784 else:
785 for line in output.split('\n'):
786 line = line.strip()
787 if line.startswith('job_state'):
788 jobstatus = cstatus(line.split('=')[1].strip())
789 if jobstatus == 'PROCESSING' or job.GetStatus() not in ['PROCESSING','QUEUED']:
790 job.SetStatus(jobstatus)
791 if line.startswith('exec_host'):
792 host = line.split('=')[1].strip()
793 job.SetHost(host)
794
795
796 if job.GetStatus() == "FINISHED":
797 cursor.execute('DELETE FROM queue WHERE parent_id = "%s"' % dag_id)
798 self.localdb.commit()
799 return 1
800
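    # 'qstat -f <id>' output is scanned for lines like (sketch):
    #   job_state = R
    #   exec_host = node042/3
    # A nonzero qstat exit status (job no longer known to PBS) is taken to
    # mean the node has FINISHED.
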
802 """
803 Remove cluster or job from queue
804 """
805 retval = 0
806 self.logger.info("checking jobs in queue")
807 if not self.localdb:
808 return Pbs.QRemove(self,job)
809
810 cursor = self.localdb.cursor()
811 sql = 'SELECT parent_id,child_id,status FROM `queue` WHERE parent_id = "%s" ' % job.job_id
812 cursor.execute(sql)
813
814 jobentries = cursor.fetchall()
815 if len(jobentries) == 0:
816 jobentries = [(job.job_id,job.job_id,'?')]
817 for dag_id,job_id, status in jobentries:
818 task = i3Task()
819 task.job_id = job_id
820 retval = Pbs.QRemove(self,task)
821
822 cursor.execute('DELETE FROM queue WHERE parent_id = "%s"' % job.job_id)
823 self.localdb.commit()
824 return retval
825
826
828 """
829 Not implemeted for DAG yet
830 """
831 if not self.localdb:
832 self.logger.warn("CleanQ: no local database found.")
833 return 0
834 else:
835 if not isinstance(jobs,list): jobs = [jobs]
836 cursor = self.localdb.cursor()
837 sql = 'SELECT child_id FROM `queue`'
838 cursor.execute(sql)
839 jobentries = cursor.fetchall()
840 for jobid in jobentries:
841 job = i3Job()
842 job.SetJobId(jobid[0])
843 jobs.append(job)
844 return Pbs.CleanQ(self,jobs)
845

import dag