"""
A basic interface for submitting and monitoring jobs on SGE.
This module implements only a small subset of SGE features.
Its interface, however, mirrors that of the Condor plugin.
Inherits from i3Queue.

copyright (c) 2005 the icecube collaboration

@version: $Revision: $
@date: $Date: $
@author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
@todo: implement more functionality of sge.
"""

import os
import re
import sys
import math
import random
import dircache
import time
import string
import logging
import os.path
import getpass
import commands
from os import popen2
from iceprod.core import metadata
from iceprod.core.lex import ExpParser
from iceprod.server.grid import iGrid
from iceprod.server.job import i3Job, i3Task

from os.path import expandvars
localdb = None

sge_status = {'qw':'QUEUED', 'r':'PROCESSING', 'hqw': 'QUEUED'}
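
# `cstatus` is used below to translate raw SGE job states into iceprod job
# states, but its definition is not included in this excerpt.  A minimal
# sketch (an assumption, not the original implementation) based on the
# `sge_status` map above might look like:
#
#     def cstatus(sge_state):
#         # unrecognized states (e.g. 'Eqw', 't') fall back to '?'
#         return sge_status.get(sge_state.strip(), '?')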


class SGE(iGrid):
    """
    This class represents a job or cluster on a sge system.
    """

    def __init__(self):
        iGrid.__init__(self)
        self.proc = 0
        self.sleeptime = 30
        self.enqueue_cmd = "qsub"
        self.checkqueue_cmd = "qstat -u %s " % getpass.getuser()
        self.queue_rm_cmd = "qdel"
        self.suffix = "sge"
        self.logger = logging.getLogger('SGEDAG')
        self.logger.debug('Made a SGE(iGrid)')

    def _choose_queue(self,queue_list):
        from random import choice
        weighted_qlist = []
        for q in queue_list:
            if len(q.split()) > 1:
                queue,weight = q.split()
            else:
                queue,weight = q,1
            try:
                weight = int(weight)
            except Exception,e:
                self.logger.error("Exception: " + str(e))
                self.logger.warn("Unable to get queue weight for: " +q)
                weight = 1
            self.logger.debug("%s:%u " % (queue,weight))
            weighted_qlist.extend([queue]*weight)
        return choice(weighted_qlist)
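
    # Example (illustrative queue names): a configured queue list such as
    #   ["long.q 3", "short.q"]
    # expands to ['long.q','long.q','long.q','short.q'], so 'long.q' is
    # picked with probability 3/4; entries without an explicit weight
    # default to a weight of 1.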

    def WriteConfig(self,job,config_file):
        """
        Write SGE submit file to a file.
        @param job: i3Job object
        @param config_file: path to file where the submit file will be written
        """
        self.logger.debug('WriteConfig')

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        submitfile = open("%s" % config_file,'w')
        outfile = os.path.join(job.GetInitialdir(),job.GetOutputFile())
        errfile = os.path.join(job.GetInitialdir(),job.GetErrorFile())

        job.Write(submitfile,"#!/bin/sh")
        self.logger.debug("#!/bin/sh")

        job.Write(submitfile,"#$ -o %s" % outfile)
        self.logger.debug("#$ -o %s" % outfile)

        job.Write(submitfile,"#$ -e %s" % errfile )
        self.logger.debug("#$ -e %s" % errfile )

        queue_list = []
        for key in self.GetParamKeys():
            if not key.startswith("queue"):
                job.Write(submitfile,"#$ %s" % self.GetParam(key),parse=True)
            else:
                queue_list.append(self.GetParam(key)[2:])

        if queue_list:
            chosen_queue = self._choose_queue(queue_list)
            job.Write(submitfile,"#$ -q %s" % chosen_queue)

        for key,opt in job.GetBatchOpts().items():
            job.Write(submitfile,"#$ %s" % opt,parse=True)
            self.logger.debug("#$ %s" % opt)

        for var in self.env.keys():
            job.Write(submitfile,"export %s=%s" % (var,self.env[var]),parse=False)
        for var in job.env.keys():
            job.Write(submitfile,"export %s=%s" % (var,job.env[var]),parse=False)
        job.Write(submitfile,"export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH",parse=False)
        job.Write(submitfile,"unset I3SIMPRODPATH")
        job.Write(submitfile,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
        job.Write(submitfile,"mkdir -p $RUNDIR",parse=False)
        self.logger.debug("RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()))
        self.logger.debug("mkdir -p $RUNDIR")

        job.Write(submitfile,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)
        self.logger.debug("echo \"running on $HOSTNAME:$RUNDIR\"")

        self.logger.debug('%d' %len(job.GetInputFiles()))
        for file in job.GetInputFiles()+[job.GetExecutable()]:
            self.logger.debug('%s' %file)
            job.Write(submitfile,"cp %s $RUNDIR" % file,parse=False)
        job.Write(submitfile,"cd $RUNDIR",parse=False)

        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = job.GetMainScript() + " " + " ".join(argopts)
        executable = os.path.basename(job.GetExecutable())
        self.logger.debug('executable: %s' % job.GetExecutable())
        self.logger.debug('main script: %s' % job.GetMainScript())
        self.logger.debug('args options: %s' % argopts)
        self.logger.debug('arguments: %s' % job.GetArguments())
        job.Write(submitfile, "$PYROOT/bin/python %s %s" % (executable, argstr), parse=False)
        job.Write(submitfile, 'echo "job exited with status $?";',parse=False)
        job.Write(submitfile, "cat %s.tmp>&1" % outfile, parse=False)
        job.Write(submitfile, "cat %s.tmp>&2" % errfile, parse=False)
        job.Write(submitfile,"rm -f wgetrc" )

        if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
            job.Write(submitfile,"for file in *; do")
            job.Write(submitfile," if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
            job.Write(submitfile," mv $file %s" % job.GetInitialdir())
            job.Write(submitfile," fi; done")

        job.Write(submitfile,"#clean directory")
        job.Write(submitfile,"cd /tmp")
        job.Write(submitfile,"rm -rf $RUNDIR ",parse=False)
        job.Write(submitfile,"exit 0")
        self.logger.debug('Submit file written')
        submitfile.close()
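
    # For illustration, a script produced by WriteConfig has roughly this
    # shape (paths, environment variables and the queue name depend on the
    # job and on the configured parameters):
    #
    #   #!/bin/sh
    #   #$ -o /path/to/initialdir/out.log
    #   #$ -e /path/to/initialdir/err.log
    #   #$ -q some.queue
    #   export KEY=VALUE
    #   RUNDIR=$I3SCRATCH/i3sim_<procnum>_<timestamp>
    #   mkdir -p $RUNDIR
    #   cp <input files and executable> $RUNDIR
    #   cd $RUNDIR
    #   $PYROOT/bin/python <executable> <main script> <args>
    #   ...
    #   rm -rf $RUNDIR
    #   exit 0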

    def get_id(self,submit_status):
        """
        Parse string returned by SGE on submission to extract the
        id of the job cluster.

        @param submit_status: string returned by qsub
        """
        matches = re.findall(r'Your job [0-9]+', submit_status)
        self.logger.debug(submit_status)
        if matches:
            cluster_info = matches[0].split()
            job_id = cluster_info[-1]

            self.job_ids.append(job_id)
            return job_id
        else:
            self.logger.warn('could not parse job id from "%s"' % submit_status)
            return -1
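
    # Example (assumed, typical qsub output): for a submit_status string like
    #   'Your job 12345 ("iceprod.0.0") has been submitted'
    # the pattern above matches 'Your job 12345' and job_id becomes '12345'.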

    def CheckJobStatus(self,jobs):
        """
        Query the status of jobs in the SGE queue.
        """
        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]

        job_dict = {}
        self.logger.info("beginning of CheckJobStatus")

        for job in job_list:
            self.logger.info("checking job status: job id %s for dataset %d", job.GetJobId(), job.GetDatasetId())

            if job.GetJobId() < 0: continue

            for job_id in job.GetJobId().split(" "):
                job_dict[job_id] = job
            job.SetStatus('FINISHED')

        cmd = self.checkqueue_cmd
        self.logger.debug(cmd)
        retval,output = commands.getstatusoutput(cmd)

        if retval:
            for job in job_list: job.SetStatus('?')
            return retval

        for line in output.split('\n')[2:]:
            try:
                tok = line.split()
                jobId = tok[0]
                prio = tok[1]
                name = tok[2]
                user = tok[3]
                jobStatus = tok[4]
                runtime = tok[5]
                queue = tok[6]
                self.logger.debug("jobid:%s" %jobId)
                if jobId in job_dict.keys():
                    self.logger.debug("status for jobid %s is %s" %(jobId,jobStatus))
                    status = cstatus(jobStatus)
                    job_dict[jobId].SetStatus(status)
            except Exception,e:
                self.logger.error("%s:%s" %(e,line))
                sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
                break

        return 1
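
    # For reference, the loop above expects `qstat -u <user>` output of the
    # usual form (exact columns can vary between SGE versions), skipping the
    # two header lines:
    #
    #   job-ID  prior    name       user     state submit/start at     queue
    #   -----------------------------------------------------------------------
    #   12345   0.55500  iceprod.0  icecube  r     01/02/2010 10:00:00 all.q@node01
    #
    # Only tok[0] (the job id) and tok[4] (the state code) are actually used;
    # the state code is mapped to an iceprod status via cstatus().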

    def CheckQ(self,db=None):
        """
        Query the status of the cluster or job in the SGE queue.
        """
        cmd = self.checkqueue_cmd
        for id in self.job_ids:
            cmd += " %s" % id
        status,output = commands.getstatusoutput(cmd)
        return output


    def CleanQ(self,jobs=None):
        """
        Remove any iceprod jobs from the SGE queue that are not in the
        given job list.
        """
        self.logger.info("beginning of CleanQ")
        if not jobs: return 0

        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]
        job_dict = dict()

        for job in job_list:
            self.logger.info("job id: %s", job.GetJobId())
            if job.GetJobId() < 0: continue

            for job_id in job.GetJobId().split(" "):
                job_dict[job_id] = job

        cmd = self.checkqueue_cmd
        status,output = commands.getstatusoutput(cmd)
        if not status:
            for line in output.split('\n')[2:]:
                if line.strip() == '':
                    continue
                try:
                    tok = line.split()
                    jobId = tok[0]
                    prio = tok[1]
                    name = tok[2]
                    user = tok[3]
                    jobStatus = tok[4]
                    runtime = tok[5]
                    queue = tok[6]
                    if name.startswith("iceprod.") and not job_dict.has_key(jobId):
                        self.logger.warn("removing job %s with status %s. Reason: job not found in list" % \
                                         (jobId,cstatus(jobStatus)))
                        self.logger.debug("job list [%s]" % str(job_dict.keys()))
                        os.system("%s %s" % (self.queue_rm_cmd,jobId))
                except Exception,e:
                    self.logger.error("%s:%s" %(e,line))
                    sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)

        return status


    def QRemove(self,job):
        """
        Remove cluster or job from queue.
        """
        if isinstance(job,i3Job) and job.GetStatus() == "FINISHED":
            return 0

        cmd = "%s %s" % (self.queue_rm_cmd,job.job_id)
        status,output = commands.getstatusoutput(cmd)
        return status


class SGEDag(SGE):
    """
    This class represents a job that executes in multiple parts using a DAG.
    """

    def __init__(self):
        SGE.__init__(self)
        self.enqueue_cmd = "/bin/bash"
        self.localdb = None
        self.logger = logging.getLogger('SGEDAG')
        try:
            import sqlite3
            self.localdb = sqlite3.connect(expandvars("$I3PROD/shared/localqueue.db"))
        except:
            self.logger.error("sqlite3 missing. will try sqlite.")
            try:
                import sqlite
                self.localdb = sqlite.connect(expandvars("$I3PROD/shared/localqueue.db"))
            except:
                self.logger.error("sqlite missing. won't try to maintain queue sanity")
        if self.localdb:
            cursor = self.localdb.cursor()
            try:
                cursor.execute('CREATE TABLE IF NOT EXISTS queue (parent_id VARCHAR(80), child_id VARCHAR(80), status VARCHAR(80))')
            except Exception,e:
                self.logger.error(e)
            else:
                self.localdb.commit()
            cursor.close()
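
    # Note on the local bookkeeping table (inferred from its use in get_id
    # and CheckJobStatus below): each row maps the first job id reported by
    # the DAG driver script (parent_id, used as the handle for the whole DAG)
    # to the id of one submitted task job (child_id), plus a coarse status
    # string.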

    def WriteConfig(self,job,config_file):
        """
        Write SGE submit file to a file.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        from iceprod.core.dataclasses import IceTrayConfig

        db = self.GetMonitorDB()

        steering = job.GetSteering()
        task_defs = steering.GetTaskDefinitions()
        self.logger.debug("Task definitions: %s" % task_defs)
        if not len(task_defs):
            self.logger.debug("No tasks specified in config file; doing regular submit")
            self.enqueue_cmd = "qsub"
            return SGE.WriteConfig(self, job, config_file)

        dagfile = open(config_file,'w')
        job.Write(dagfile, "#!/bin/bash")
        job_id = job.GetDatabaseId()
        job.dag = dict()

        file_catalog = {}
        for taskname,td in task_defs.items():
            args = self.GetArguments(job,td,output="dict")
            file_catalog[taskname] = self.GetFiles(job,td,args)
            job.dag[taskname] = None

        for taskname,td in task_defs.items():
            filename = config_file.replace('.sge',".%s.sge" % taskname)
            td_id = td.GetId()
            self.logger.debug("Got task %s (ID=%u) with filename %s" % (taskname, td_id, filename))
            tmp_parents = self.EnumerateParentNodes(steering,td)
            parents = []

            for parent in tmp_parents:
                parent_td = steering.GetTaskDefinition(parent)
                if not db.task_is_finished(parent_td.GetId(), job.GetDatabaseId()):
                    self.logger.error("task id %s, '%s' job id %s" % (parent_td.GetId(),parent_td.GetName(),job.GetDatabaseId()))
                    parents.append(parent)
            job.dag[taskname] = parents
            if td.ParallelExecutionEnabled():
                trays = td.GetTrays()
                for idx,tray in trays.items():
                    for iter in tray.GetIters():
                        if not iter == -1:
                            nodename = "%s_%u" % (taskname,iter)
                        else:
                            nodename = "%s_ext" % taskname
                        filename = config_file.replace('.sge',".%s.sge" % nodename)
                        done = db.task_is_finished(td_id, job_id, idx, iter)
                        args = self.GetArguments(job,td,idx,iter,output="dict")

                        input,output,notes = self.GetFiles(job,td,args,idx,iter,file_catalog)
                        self.WriteFileManifest(job,filename,input,output,notes)
                        self.WriteSubmitFile(job,filename,td,idx,iter)
                        self.WriteDAGNode(job,dagfile,nodename,filename,parents,done)
            else:
                done = db.task_is_finished(td_id, job_id)
                if taskname == 'trashcan':
                    done = False
                args = self.GetArguments(job,td,output="dict")

                input,output,notes = self.GetFiles(job,td,args,catalog=file_catalog)
                self.WriteFileManifest(job,filename,input,output,notes)
                self.WriteSubmitFile(job,filename,td)
                self.WriteDAGNode(job,dagfile,taskname,filename,parents,done)
            db.commit()

        self.FinishDAGNode(job,dagfile)

    def IsUrl(self,path):
        return bool(re.match("[^/:]+://?.*$", path))

    def GetFiles(self,job,td,args,idx=False,iter=False,catalog=False):
        steering = job.GetSteering()
        parser = ExpParser(args,steering)
        if idx is not False:
            td_trays = {idx: td.GetTray(idx)}
        else:
            td_trays = td.GetTrays()

        if steering.GetSysOpt("dagtemp"):
            tmp_dir = parser.parse(steering.GetSysOpt("dagtemp").GetValue())
        elif args.has_key('dagtemp'):
            tmp_dir = parser.parse(args['dagtemp'])
        else:
            tmp_dir = 'file:///tmp'

        if args.has_key('fetch'):
            global_dir = args['fetch']
        else:
            global_dir = 'file:///tmp'

        td_input = {}
        td_output = {}
        notes = {}

        if td.IsCleanup():
            return (td_input, td_output, notes)

        for idx, td_tray in td_trays.iteritems():
            args['tray'] = idx

            self.logger.info("GetTray(%s)" % idx)
            icetray = steering.GetTray(idx)

            input_files = icetray.GetInputFiles()
            parsed_input = []

            output_files = icetray.GetOutputFiles()
            parsed_output = []

            if iter is not False:
                iters = [iter]
            else:
                iters = td_tray.GetIters()
            for iter in iters:
                args['iter'] = iter
                for d in steering.GetDependencies():
                    d_file = parser.parse(d)
                    if not td_input.has_key(d_file):
                        location = d_file
                        if not self.IsUrl(location):
                            location = os.path.join(global_dir, location)
                        td_input[os.path.basename(d_file)] = [location]
                for i_file in input_files:
                    name = i_file.GetName()
                    name = parser.parse(name)
                    if not td_input.has_key(name) \
                       and not td_output.has_key(name):
                        if catalog:
                            node = self.FindFile(steering,td,catalog,name)
                        else:
                            node = td.GetName()

                        note = False
                        if node == "global":
                            location = global_dir
                            note = "global"
                        if node != "global":
                            location = tmp_dir
                            location = os.path.join(location, str(job.GetDatasetId()))
                            location = os.path.join(location, str(job.GetProcNum()))
                            location = os.path.join(location, node)
                            note = "dontextract"
                        location = os.path.join(location, name)
                        if i_file.IsPhotonicsTable():
                            note = "photonics";
                        if note:
                            notes[name] = note
                        td_input[name] = [location]
                for o_file in output_files:
                    name = o_file.GetName()
                    name = parser.parse(name)
                    if not td_output.has_key(name):
                        location = os.path.join(tmp_dir, str(job.GetDatasetId()))
                        location = os.path.join(location, str(job.GetProcNum()))
                        location = os.path.join(location, str(td.GetName()))
                        location = os.path.join(location, name)
                        td_output[name] = [location]

        return (td_input, td_output, notes)
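
    # Shape of the return value (inferred from the code above): td_input and
    # td_output map a bare file name to a list containing a single URL-style
    # location, and notes maps a file name to an optional hint string, e.g.
    #
    #   td_input  = {'dep.tar.gz': ['file:///tmp/dep.tar.gz'],
    #                'step1.i3':   ['file:///tmp/1234/7/task1/step1.i3']}
    #   td_output = {'step2.i3':   ['file:///tmp/1234/7/task2/step2.i3']}
    #   notes     = {'step1.i3': 'dontextract'}
    #
    # where the dataset id (1234), proc number (7), task names and file
    # names are made up for illustration.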

    def FindFile(self,steering,td,catalog,file):
        parents = td.GetParents()

        for parent in parents:
            if catalog.has_key(parent):
                if catalog[parent][1].has_key(file):
                    return parent

        for parent in parents:
            parent_td = steering.GetTaskDefinition(parent)
            result = self.FindFile(steering,parent_td,catalog,file)
            if result != "global":
                return result

        return "global"
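
    # How this is used (see WriteConfig and GetFiles above): `catalog` maps a
    # task name to the (input, output, notes) tuple returned by GetFiles, so
    # catalog[parent][1] is that parent's output-file dict.  FindFile thus
    # returns the nearest ancestor task that produces `file`, walking parents
    # recursively, or "global" if no ancestor does.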

    def WriteFileManifest(self,job,filename,input,output,notes):
        self.logger.debug("Input files: %s" % input)
        in_manifest = open(filename.replace(".sge", ".input"), 'w')
        job.manifest_in = in_manifest.name
        if len(input):
            padding = str(max(map(lambda x: max(map(len, x)), input.values())))
            fmt_str = "%-" + padding + "s %s"
            for i_file, locs in input.items():
                for loc in locs:
                    file = fmt_str % (loc, i_file)
                    if notes.has_key(i_file):
                        file += "\t%s" % notes[i_file]
                    job.Write(in_manifest, file)
        in_manifest.close()
        self.logger.debug("Output files: %s" % output)
        out_manifest = open(filename.replace(".sge", ".output"), 'w')
        job.manifest_out = out_manifest.name
        if len(output):
            padding = str(max(map(lambda x: max(map(len, x)), output.values())))
            fmt_str = "%-" + padding + "s %s"
            for o_file, locs in output.items():
                for loc in locs:
                    job.Write(out_manifest, fmt_str % (o_file, loc))
        out_manifest.close()
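
    # Resulting manifest format (derived from the code above): the ".input"
    # manifest lists "<location> <name> [<note>]" per line, while the
    # ".output" manifest lists "<name> <location>" per line, e.g.
    #
    #   file:///tmp/1234/7/task1/step1.i3 step1.i3    dontextract
    #   step2.i3 file:///tmp/1234/7/task2/step2.i3
    #
    # (the first line is from a .input manifest, the second from a .output
    # manifest; names and paths are illustrative only).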

    def WriteDAGNode(self,job,dagfile,nodename,filename,parents=None,done=False):

        if done: return
        job.Write(dagfile, "%s=`qsub \\" % (nodename))
        for parent in parents:
            job.Write(dagfile, " -hold_jid $%s\\" % (parent),parse=False)
        job.Write(dagfile, " %s| grep -o -e 'Your job [0-9]*'| awk '{ print $3}'`" % (filename))
        job.Write(dagfile, 'if [ -z $ICEPROD_JOB_ID_LIST ]; then ',parse=False)
        job.Write(dagfile, ' ICEPROD_JOB_ID_LIST="${%s}";' % (nodename),parse=False)
        job.Write(dagfile, 'else ')
        job.Write(dagfile, ' ICEPROD_JOB_ID_LIST="${ICEPROD_JOB_ID_LIST}:${%s}";' % (nodename),parse=False)
        job.Write(dagfile, 'fi')

    def FinishDAGNode(self,job,dagfile):
        job.Write(dagfile, 'echo "<job_id>${ICEPROD_JOB_ID_LIST}</job_id>";',parse=False)
        job.Write(dagfile, "exit 0;")
        dagfile.close()
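
    # For illustration, the driver script assembled by WriteConfig,
    # WriteDAGNode and FinishDAGNode for a node "task1" that depends on
    # "task0" looks roughly like this (node and file names are made up):
    #
    #   #!/bin/bash
    #   task1=`qsub \
    #    -hold_jid $task0\
    #    config.task1.sge| grep -o -e 'Your job [0-9]*'| awk '{ print $3}'`
    #   if [ -z $ICEPROD_JOB_ID_LIST ]; then
    #    ICEPROD_JOB_ID_LIST="${task1}";
    #   else
    #    ICEPROD_JOB_ID_LIST="${ICEPROD_JOB_ID_LIST}:${task1}";
    #   fi
    #   echo "<job_id>${ICEPROD_JOB_ID_LIST}</job_id>";
    #   exit 0;
    #
    # Each node submits its task with qsub, held on its parents' job ids,
    # and the final echo reports all submitted ids so that get_id() below
    # can parse them.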

    def get_id(self,submit_status):
        """
        Parse string returned by SGE on submission to extract the
        id of the job cluster.
        @param submit_status: string echoed by the submitted DAG driver script
        """
        matches = re.findall(r'<job_id>.*</job_id>', submit_status)
        self.logger.debug(submit_status)
        if matches:
            job_ids = map(lambda x: x.split('.')[0],matches[0].strip('<job_id>').strip('</job_id>').split(":"))
            dag_id = job_ids[0]
            cursor = self.localdb.cursor()
            for job_id in job_ids:
                sql = 'INSERT INTO `queue` (parent_id,child_id,status)'
                sql += ' VALUES ("%s","%s","QUEUED") ' % (dag_id,job_id)
                self.logger.info(sql)
                cursor.execute(sql)
            self.localdb.commit()
            self.job_ids.append(dag_id)
            return dag_id
        else:
            self.logger.warn('could not parse job id from "%s"' % submit_status)
            return SGE.get_id(self,submit_status)
        return -1
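
    # Example (illustrative ids): for a driver that prints
    #   <job_id>12345:12346:12347</job_id>
    # job_ids becomes ['12345','12346','12347'], dag_id is '12345', and a
    # (parent_id=12345, child_id=<each id>, status=QUEUED) row is inserted
    # into the local queue table for each of the three ids.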

    # (EnumerateParentNodes, referenced from WriteConfig above, is defined
    # at this point in the full source but is not included in this excerpt.)

    def GetArguments(self,job,td,idx=False,iter=False,output="str"):
        # (the beginning of this method, which defines `argstr`, is not
        # included in this excerpt)

        args = {}
        for str in argstr.split(" "):
            str = str[2:]
            pieces = str.split("=")
            if len(pieces) == 1:
                args[str] = 1
            else:
                args[pieces[0]] = pieces[1]
        return args

    def WriteSubmitFile(self,job,filename,td,idx=False,iter=False):
        steering = job.GetSteering()
        args = self.GetArguments(job,td,idx,iter,output="dict")
        parser = ExpParser(args,steering)

        submitfile = open(filename,'w')
        outfile = filename.replace('.sge','.out')
        errfile = filename.replace('.sge','.err')

        job.Write(submitfile,"#!/bin/sh")
        self.logger.debug("#!/bin/sh")

        job.Write(submitfile,"#$ -o %s" % outfile)
        self.logger.debug("#$ -o %s" % outfile)

        job.Write(submitfile,"#$ -e %s" % errfile )
        self.logger.debug("#$ -e %s" % errfile )

        queue_list = []
        for key in self.GetParamKeys():
            if not key.startswith("queue"):
                job.Write(submitfile,"#$ %s" % (self.GetParam(key)))
            else:
                queue_list.append(self.GetParam(key)[2:])

        td_batch_opts = td.GetBatchOpts()
        self.logger.debug(td_batch_opts)
        try:
            td_batch_opts = parser.parse(td_batch_opts)
        except:
            self.logger.warn("could not parse %s" % td_batch_opts)
            td_batch_opts = ""
        if td_batch_opts:
            for opt in map(string.strip,td_batch_opts.split(";")):
                job.Write(submitfile,"#$ %s" % opt)
                if opt.startswith("-q"): queue_list = []

        if queue_list:
            chosen_queue = self._choose_queue(queue_list)
            job.Write(submitfile,"#$ -q %s" % chosen_queue)

        for key,opt in job.GetBatchOpts().items():
            job.Write(submitfile,"#$ %s " % opt)

        for var in self.env.keys():
            job.Write(submitfile, "export %s=%s" % (var, self.env[var]), parse=False)
        for var in job.env.keys():
            job.Write(submitfile, "export %s=%s" % (var, job.env[var]), parse=False)
        job.Write(submitfile, "RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
        job.Write(submitfile, "mkdir -p $RUNDIR")

        for file in job.GetInputFiles()+[job.GetExecutable()]:
            job.Write(submitfile, "cp %s $RUNDIR" % file,parse=False)

        job.Write(submitfile, "cp %s $RUNDIR" % job.manifest_in)
        job.Write(submitfile, "cp %s $RUNDIR" % job.manifest_out)

        job.Write(submitfile, "cd $RUNDIR",parse=False)

        job.Write(submitfile, "echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)
        argstr = job.GetMainScript() + " " + self.GetArguments(job,td,idx,iter,output="string")
        executable = os.path.basename(job.GetExecutable())
        job.Write(submitfile,
                  "$PYROOT/bin/python -u %s %s 1>%s.tmp 2>%s.tmp" % (executable,argstr,outfile,errfile),
                  parse=False)
        job.Write(submitfile, "RETVAL=$?", parse=False)

        job.Write(submitfile, "cat %s.tmp>&1" % outfile, parse=False)
        job.Write(submitfile, "cat %s.tmp>&2" % errfile, parse=False)
        job.Write(submitfile,"rm -f wgetrc" )

        if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
            job.Write(submitfile,"for file in *; do")
            job.Write(submitfile," if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
            job.Write(submitfile," mv $file %s" % job.GetInitialdir())
            job.Write(submitfile," fi; done")

        job.Write(submitfile,"#clean directory")
        job.Write(submitfile,"cd /tmp")
        job.Write(submitfile,"rm -rf $RUNDIR ",parse=False)
        job.Write(submitfile,"exit $RETVAL")
        submitfile.close()
        os.system("chmod +x %s" % submitfile.name)


    def CheckJobStatus(self,jobs):
        """
        Query the status of jobs on the SGE queue.
        """
        self.logger.info("checking jobs in queue")
        if not self.localdb: return 1

        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]
        for job in job_list:
            job.SetStatus('?')
            if not job.GetJobId(): continue

            cursor = self.localdb.cursor()
            sql = 'SELECT parent_id,child_id,status FROM `queue` WHERE parent_id = "%s" ' % job.GetJobId()
            cursor.execute(sql)

            jobentries = cursor.fetchall()
            if len(jobentries) == 0:
                jobentries = [(job.GetJobId(),job.GetJobId(),'?')]
            for dag_id,job_id, status in jobentries:
                cmd = "qstat -f %s " % job_id
                self.logger.debug(cmd)
                status,output = commands.getstatusoutput(cmd)
                if status:
                    self.logger.error("%s: %s: %s" % (cmd,status,output))
                    if job.GetStatus() == '?':
                        job.SetStatus('FINISHED')
                else:
                    for line in output.split('\n'):
                        line = line.strip()
                        if line.startswith('job_state'):
                            jobstatus = cstatus(line.split('=')[1].strip())
                            if jobstatus == 'PROCESSING' or job.GetStatus() not in ['PROCESSING','QUEUED']:
                                job.SetStatus(jobstatus)
                        if line.startswith('exec_host'):
                            host = line.split('=')[1].strip()
                            job.SetHost(host)

            if job.GetStatus() == "FINISHED":
                cursor.execute('DELETE FROM queue WHERE parent_id = "%s"' % dag_id)
            self.localdb.commit()
        return 1

    def QRemove(self,job):
        """
        Remove cluster or job from queue.
        """
        retval = 0
        self.logger.info("checking jobs in queue")
        if not self.localdb:
            return SGE.QRemove(self,job)

        cursor = self.localdb.cursor()
        sql = 'SELECT parent_id,child_id,status FROM `queue` WHERE parent_id = "%s" ' % job.job_id
        cursor.execute(sql)

        jobentries = cursor.fetchall()
        if len(jobentries) == 0:
            jobentries = [(job.job_id,job.job_id,'?')]
        for dag_id,job_id, status in jobentries:
            task = i3Task()
            task.job_id = job_id
            retval = SGE.QRemove(self,task)

        cursor.execute('DELETE FROM queue WHERE parent_id = "%s"' % job.job_id)
        self.localdb.commit()
        return retval

    def CleanQ(self,jobs=None):
        """
        Not implemented for DAG yet.
        """
        if not self.localdb:
            self.logger.warn("CleanQ: no local database found.")
            return 0
        else:
            if not isinstance(jobs,list): jobs = [jobs]
            cursor = self.localdb.cursor()
            sql = 'SELECT child_id FROM `queue`'
            cursor.execute(sql)
            jobentries = cursor.fetchall()
            for jobid in jobentries:
                job = i3Job()
                job.SetJobId(jobid[0])
                jobs.append(job)
            return SGE.CleanQ(self,jobs)


import dag