"""
A basic wrapper for submitting jobs to Condor and monitoring them.
This module implements only a small subset of Condor's many features.
(http://www.cs.wisc.edu/condor)
Inherits from i3Queue

copyright (c) 2005 the icecube collaboration

@version: $Revision: $
@date: $Date: $
@author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
@todo: implement more functionality of condor.
"""
17
18 import os
19 import re
20 import sys
21 import math
22 import dircache
23 import time
24 import string
25 import shutil
26 import ConfigParser
27 from iceprod.server.grid import iGrid
28 import logging
29 from iceprod.core import metadata
30 from iceprod.core.dataclasses import Steering
31 from iceprod.core.lex import ExpParser
32 from iceprod.core import functions
33 from iceprod.server.db import ConfigDB
34 from iceprod.server.job import i3Job, i3Task
35
36 logger = logging.getLogger('IceProdDAG')
37


class IceProdDAG(iGrid):
    """
    This class represents a job that executes in multiple parts using a DAG.
    """

    def __init__(self):
        iGrid.__init__(self)
        self.grids = dict()
        self.grids["all"] = []
        self.logger = logging.getLogger('IceProdDAG')

        """
        Write the submit file to disk.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """
        from iceprod.core.dataclasses import IceTrayConfig

        db = self.GetMonitorDB()

        steering = job.GetSteering()

        task_defs = steering.GetTaskDefinitions()
        logger.debug("Task definitions: %s" % task_defs)
        if not len(task_defs):
            logger.warn("No tasks specified in config file; doing regular submit")
            return None

        job_id = job.GetDatabaseId()
        dataset_id = job.GetDatasetId()
        queue_id = job.GetProcNum()
        grids = map(string.strip,steering.GetSysOpt("sub_grids").GetValue().split(","))

        logger.info("initializing grid_statistics (DAG)")
        db.InitializeGridStatsDAG(grids,steering,dataset_id)

        for taskname,td in task_defs.items():
            td_id = td.GetId()
            if td.ParallelExecutionEnabled():
                trays = td.GetTrays()
                for idx,tray in trays.items():
                    for iter in tray.GetIters():
                        parser = ExpParser({'tray':idx,'iter':iter,'procnum':queue_id,'nproc':queue_id+1},steering)
                        if db.task_is_finished(td_id, job_id, idx, iter):
                            continue
                        tid = db.task_init(job.GetDatasetId(),job.GetDatabaseId(),idx,iter)
                        db.task_update_status(tid,'IDLE',key=job.GetArgOpt("key"),grid_id=0)
            else:
                if db.task_is_finished(td_id, job_id):
                    continue
                if taskname == 'trashcan':
                    done = False
                tid = db.task_init(job.GetDatasetId(),job.GetDatabaseId())
                parser = ExpParser({'tray':0,'iter':0,'procnum':queue_id,'nproc':queue_id+1},steering)
                db.task_update_status(tid,'IDLE',key=job.GetArgOpt("key"),grid_id=0)

        db.commit()
        self.CleanQ([job])
        db.jobsubmitted(
            job.GetDatasetId(), job.GetProcNum(),
            job.GetInitialdir(), job.GetJobId())
        return 0,"submission complete"
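    # Task lifecycle as implied by this module (roughly): task records are
    # created 'IDLE' above, promoted to 'WAITING' once all of a task's
    # parent tasks have finished (see the parent-finished check further
    # down), marked 'QUEUED' when the task-level plugin below actually
    # submits them, and finally reported back as 'OK' or 'ERROR' through
    # the monitoring database.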

        """
        Remove active job/cluster from queuing system.
        """
        db = self.GetMonitorDB()

        steering = Steering()
        db.download_tasks(job.dataset_id,steering)
        task_defs = steering.GetTaskDefinitions()
        for taskname,td in task_defs.items():
            td_id = td.GetId()
            trays = td.GetTrays()
            for idx,tray in trays.items():
                for iter in tray.GetIters():
                    tid = db.get_task_id(td_id,job.job_id, idx, iter)
                    if tid and db.task_status(tid) not in ('OK','IDLE','WAITING'):
                        db.task_update_status(tid,'IDLE',key=job.passkey)
        db.commit()
        return -1
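    # Any task still in an active state (anything other than 'OK', 'IDLE'
    # or 'WAITING') is pushed back to 'IDLE' here so it can be picked up
    # and queued again from scratch.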


        if not jobs: return 0
        db = self.GetMonitorDB()

        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]

        datasets = {}
        for job in job_list:
            if not datasets.has_key(job.GetDatasetId()):
                datasets[job.GetDatasetId()] = []
            datasets[job.GetDatasetId()].append(job)

        for dataset_id in datasets.keys():
            steering = Steering()
            db.download_tasks(dataset_id,steering)
            task_defs = steering.GetTaskDefinitions()
            for job in datasets[dataset_id]:
                for taskname,td in task_defs.items():
                    parents_finished = True
                    for parent in td.GetParents():
                        parent_td = steering.GetTaskDefinition(parent)
                        if not db.task_is_finished(parent_td.GetId(), job.GetDatabaseId()):
                            parents_finished = False
                            break

                    if parents_finished:
                        td_id = td.GetId()
                        trays = td.GetTrays()
                        for idx,tray in trays.items():
                            for iter in tray.GetIters():
                                tid = db.get_task_id(td_id,job.GetDatabaseId(), idx, iter)
                                if tid and db.task_status(tid) == 'IDLE':
                                    logger.info("Resetting task %s" % taskname)
                                    queue_id = job.GetProcNum()
                                    parser = ExpParser({'tray':idx,'iter':iter,'procnum':queue_id,'nproc':queue_id+1},steering)
                                    db.task_update_status(tid,'WAITING',key=job.GetArgOpt("key"))
        db.commit()

    """
    This class represents a job that executes in multiple parts using a DAG.
    """

        """
        Write the submit file to disk.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """
        from iceprod.core.dataclasses import IceTrayConfig
        from copy import deepcopy

        db = self.GetMonitorDB()

        steering = job.GetSteering()
        task_defs = steering.GetTaskDefinitions()
        logger.debug("Task definitions: %s" % task_defs)
        status_sum = 0

        if not len(task_defs):
            logger.warn("No tasks specified in config file; ignoring task")
            return None

        job_id = job.GetDatabaseId()

        file_catalog = {}
        for taskname,td in task_defs.items():
            args = self.GetArguments(job,td,output="dict")
            file_catalog[taskname] = self.GetFiles(job,td,args)

        td = task_defs[job.task_name]
        td_id = td.GetId()

        if td.ParallelExecutionEnabled():
            trays = td.GetTrays()
            for idx,tray in trays.items():
                for iter in tray.GetIters():
                    newjob = deepcopy(job)
                    newjob.task_id = self.i3monitordb.get_task_id(td_id,newjob.GetJobId(), idx, iter)
                    if not iter == -1:
                        nodename = "%s_%u" % (job.task_name,iter)
                    else:
                        nodename = "%s_ext" % job.task_name
                    filename = job.config_file.replace(self.suffix,"%s.%s" % (nodename,self.suffix))
                    self.logger.debug("Got task %s (ID=%u) with filename %s" % (job.task_name, job.task_id, filename))
                    newjob.config_file = filename
                    args = self.GetArguments(newjob,td,idx,iter,output="dict")

                    input,output,notes = self.GetFiles(newjob,td,args,idx,iter,file_catalog)
                    newjob.SetOutputFile(filename.replace(self.suffix,'out'))
                    newjob.SetErrorFile(filename.replace(self.suffix,'err'))
                    self.WriteFileManifest(newjob,filename,input,output,notes)
                    self.SetArguments(newjob,td,idx,iter)
                    self.WriteConfig(newjob,filename)
                    status, submit_status = self._submit(newjob)
                    status_sum += status
                    if not status:
                        self.localdb.AddTaskInfo(newjob.task_id,newjob.initialdir,newjob.job_id,'QUEUED')
                        self.i3monitordb.task_update_status(newjob.task_id,'QUEUED',newjob.passkey)
        else:
            args = self.GetArguments(job,td,output="dict")
            filename = job.config_file.replace(self.suffix,"%s.%s" % (job.task_name,self.suffix))

            input,output,notes = self.GetFiles(job,td,args,catalog=file_catalog)
            job.SetOutputFile(filename.replace(self.suffix,'out'))
            job.SetErrorFile(filename.replace(self.suffix,'err'))

            self.WriteFileManifest(job,filename,input,output,notes)
            self.SetArguments(job,td)
            job.config_file = filename
            self.WriteConfig(job,filename)
            status, submit_status = self._submit(job)
            status_sum += status
            if status:
                self.logger.error("failed to submit jobs:"+job.submit_msg)
                self.localdb.AddTaskInfo(job.task_id,job.initialdir,job.job_id,'ERROR')
                self.i3monitordb.task_update_status(job.task_id,'ERROR',job.passkey)
            else:
                self.logger.info("submitted job %s.%s.%s:"%(job.dataset_id,job.proc,job.task_name))
                self.localdb.AddTaskInfo(job.task_id,job.initialdir,job.job_id,'QUEUED')
                self.i3monitordb.task_update_status(job.task_id,'QUEUED',job.passkey)
        return status_sum,self.submit_status
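    # A sketch of what the parallel branch above produces, with hypothetical
    # values (say job.config_file = "job.condor", self.suffix = "condor" and
    # job.task_name = "level2"): iteration 0 becomes DAG node "level2_0"
    # with config file "job.level2_0.condor" and stdout/stderr files
    # "job.level2_0.out" / "job.level2_0.err"; an external step (iter == -1)
    # becomes node "level2_ext" instead.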


    def GetFiles(self,job,td,args,idx=False,iter=False,catalog=False):
        steering = job.GetSteering()
        if idx is not False:
            td_trays = {idx: td.GetTray(idx)}
        else:
            td_trays = td.GetTrays()

        if args.has_key('dagtemp'):
            tmp_dir = args['dagtemp']
        else:
            tmp_dir = 'file:///tmp'

        if args.has_key('fetch'):
            global_dir = args['fetch']
        else:
            global_dir = 'file:///tmp'

        td_input = {}
        td_output = {}
        notes = {}

        if td.IsCleanup():
            return (td_input, td_output, notes)

        for idx, td_tray in td_trays.iteritems():
            args['tray'] = idx

            logger.debug("GetTray(%s)" % idx)
            icetray = steering.GetTray(idx)

            input_files = icetray.GetInputFiles()
            parsed_input = []

            output_files = icetray.GetOutputFiles()
            parsed_output = []

            if iter is not False:
                iters = [iter]
            else:
                iters = td_tray.GetIters()
            for iter in iters:
                args['iter'] = iter
                parser = ExpParser(args,steering)
                for d in steering.GetDependencies():
                    d_file = parser.parse(d)
                    if not td_input.has_key(d_file):
                        location = d_file
                        if not self.IsUrl(location):
                            location = os.path.join(global_dir, location)
                        td_input[os.path.basename(d_file)] = [location]
                for i_file in input_files:
                    name = i_file.GetName()
                    name = parser.parse(name)
                    if not td_input.has_key(name) \
                       and not td_output.has_key(name):
                        if catalog:
                            node = self.FindFile(steering,td,catalog,name)
                        else:
                            node = td.GetName()

                        note = False
                        if node == "global":
                            location = global_dir
                            note = "global"
                        if node != "global":
                            location = tmp_dir
                            location = os.path.join(location, str(job.GetDatasetId()))
                            location = os.path.join(location, str(job.GetProcNum()))
                            location = os.path.join(location, node)
                            note = "dontextract"
                        location = os.path.join(location, name)
                        if i_file.IsPhotonicsTable():
                            note = "photonics"
                        if note:
                            notes[name] = note
                        td_input[name] = [location]
                for o_file in output_files:
                    name = o_file.GetName()
                    name = parser.parse(name)
                    if not td_output.has_key(name):
                        location = os.path.join(tmp_dir, str(job.GetDatasetId()))
                        location = os.path.join(location, str(job.GetProcNum()))
                        location = os.path.join(location, str(td.GetName()))
                        location = os.path.join(location, name)
                        td_output[name] = [location]

        return (td_input, td_output, notes)
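    # A hypothetical example of the mappings returned above (dataset 1234,
    # proc 7, default file:///tmp areas, made-up file names): an input taken
    # from a parent node "generate" and an output of the current task
    # "filter" would come back as
    #   td_input  = {'physics.i3.gz': ['file:///tmp/1234/7/generate/physics.i3.gz']}
    #   td_output = {'filtered.i3.gz': ['file:///tmp/1234/7/filter/filtered.i3.gz']}
    #   notes     = {'physics.i3.gz': 'dontextract'}
    # Files resolved to "global" are taken from the 'fetch' URL instead of
    # the DAG temporary area.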

    def FindFile(self,steering,td,catalog,file):
        parents = td.GetParents()

        for parent in parents:
            if catalog.has_key(parent):
                if catalog[parent][1].has_key(file):
                    return parent

        for parent in parents:
            parent_td = steering.GetTaskDefinition(parent)
            result = self.FindFile(steering,parent_td,catalog,file)
            if result != "global":
                return result

        return "global"
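    # FindFile walks the task graph upward: it first checks whether any
    # direct parent lists the file among its outputs in the catalog, then
    # recurses through the parents' own parents, and falls back to "global"
    # when no upstream task produces the file.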

    def WriteFileManifest(self,job,filename,input,output,notes):
        logger.debug("Input files: %s" % input)
        suffix = filename.split('.')[-1]
        in_manifest = open(filename.replace('.'+suffix, ".input"), 'w')
        if len(input):
            padding = str(max(map(lambda x: max(map(len, x)), input.values())))
            fmt_str = "%-" + padding + "s %s"
            for i_file, locs in input.items():
                for loc in locs:
                    file = fmt_str % (loc, i_file)
                    if notes.has_key(i_file):
                        file += "\t%s" % notes[i_file]
                    job.Write(in_manifest, file)
        in_manifest.close()
        logger.debug("Output files: %s" % output)
        out_manifest = open(filename.replace('.'+suffix, ".output"), 'w')
        if len(output):
            padding = str(max(map(lambda x: max(map(len, x)), output.values())))
            fmt_str = "%-" + padding + "s %s"
            for o_file, locs in output.items():
                for loc in locs:
                    job.Write(out_manifest, fmt_str % (o_file, loc))
        out_manifest.close()
        job.AddInputFile(in_manifest.name)
        job.AddInputFile(out_manifest.name)
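    # The manifests written above are plain text with one entry per line.
    # A hypothetical .input line (location padded to the longest entry,
    # then the local file name, then an optional tab-separated note):
    #   file:///tmp/1234/7/generate/physics.i3.gz physics.i3.gz  dontextract
    # .output lines are written the other way around, "<name> <location>".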

    def QueueJobs(self,db,maxjobs,grid_id,jobs_at_once=20,fifo=True,debug=0):
        return db.QueueTasks(maxjobs,grid_id,jobs_at_once,fifo,debug)
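    # For DAG datasets it is individual tasks rather than whole jobs that
    # get queued; the selection of which tasks to hand out is delegated to
    # the monitoring database here.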

    def jobfinalize(self,dataset_id,job_id,job,status='OK',clear_errors=True):

    def reset_old_jobs(self, grid_id, maxidletime, maxruntime, maxsubmittime, maxcopytime, maxfailures, maxevicttime, keepalive):
        """
        Reset the status of jobs that were queued but whose status
        has not changed in more than maxtime minutes.

        @param grid_id: id of current cluster
        @param maxidletime: maximum time for jobs to sit idle
        @param maxruntime: maximum run time for jobs
        @param maxsubmittime: maximum submit time for jobs
        @param maxcopytime: maximum time for jobs to be in 'copying' state
        @param maxfailures: maximum number of times a job is allowed to fail
        @param maxevicttime: maximum time for jobs to be in 'evicted' state
        @param keepalive: how often the server should expect to hear from jobs
        """
        return self.i3monitordb.reset_old_tasks(grid_id, maxidletime, maxruntime, maxsubmittime, maxcopytime, maxfailures, maxevicttime, keepalive)


    def GetArguments(self,job,td,idx=False,iter=False,output="str"):


        args = {}
        for str in argstr.split(" "):
            str = str[2:]
            pieces = str.split("=")
            if len(pieces) == 1:
                args[str] = 1
            else:
                args[pieces[0]] = pieces[1]
        return args
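    # A hypothetical example of the option parsing above: an argument string
    # like "--nproc=10 --dagtemp=gsiftp://host/scratch --debug" yields
    #   {'nproc': '10', 'dagtemp': 'gsiftp://host/scratch', 'debug': 1}
    # i.e. "--key=value" options keep their string value and bare "--flag"
    # options are set to 1.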


        if not jobs: return 0

        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]

        datasets = {}
        job_list = self.localdb.FillTaskInfo( job_list )
        for job in job_list:
            if datasets.has_key(job.GetDatasetId()):
                datasets[job.GetDatasetId()].append(job)
            else:
                datasets[job.GetDatasetId()] = [job]

        for dataset_id in datasets.keys():
            steering = Steering()
            db = self.GetMonitorDB()
            db.download_tasks(dataset_id,steering)
            task_defs = steering.GetTaskDefinitions()
            for job in datasets[dataset_id]:
                for taskname,td in task_defs.items():
                    parents_finished = True
                    for parent in td.GetParents():
                        parent_td = steering.GetTaskDefinition(parent)
                        if not self.i3monitordb.task_is_finished(parent_td.GetId(), job.GetJobId()):
                            parents_finished = False
                            break

                    if parents_finished:
                        td_id = td.GetId()
                        trays = td.GetTrays()
                        for idx,tray in trays.items():
                            for iter in tray.GetIters():
                                tid = self.i3monitordb.get_task_id(td_id,job.GetJobId(), idx, iter)
                                if tid and self.i3monitordb.task_status(tid) == 'IDLE':
                                    logger.info("Resetting task %s" % taskname)
                                    self.i3monitordb.task_update_status(tid,'WAITING',job.GetArgOpt("key"))
                                    self.localdb.AddTaskInfo(job.task_id,job.initialdir,job.job_id,'WAITING')

    def Clean(self,jobdict,force=False):
        """
        Interface: clean submit directory
        """
        dir = "%(submitdir)s" % jobdict
        logger.debug(dir)
        if os.path.exists(dir) and os.path.isdir(dir):
            logger.info('%(dataset_id)u.%(queue_id)u: cleaning dir %(submitdir)s' % jobdict)
            functions.removedirs(dir)


        self.localdb = None
        self.logger = logging.getLogger('LocalSQLiteDB')
        try:
            import sqlite3
            self.localdb = sqlite3.connect(os.path.expandvars("$I3PROD/shared/localqueue.db"))
        except ImportError:
            self.logger.error("sqlite3 missing. will try sqlite.")
            try:
                import sqlite
                self.localdb = sqlite.connect(os.path.expandvars("$I3PROD/shared/localqueue.db"))
            except ImportError:
                self.logger.error("sqlite missing. won't try to maintain queue sanity")
        if self.localdb:
            cursor = self.localdb.cursor()
            try:
                cursor.execute('CREATE TABLE IF NOT EXISTS local_task (task_id int, submitdir VARCHAR(200), grid_queue_id VARCHAR(80), status VARCHAR(80))')
            except Exception,e:
                self.logger.error(e)
            else:
                self.localdb.commit()
            cursor.close()
606
607
608
609 - def AddTaskInfo(self,task_id,submitdir,grid_queue_id,status):
610 sql = 'SELECT * FROM `local_task` WHERE task_id = %s' % task_id
611 self.logger.debug(sql)
612 cursor = self.localdb.cursor()
613 cursor.execute(sql)
614 task = cursor.fetchone()
615 if task:
616 sql = ' UPDATE `local_task`'
617 sql += ' SET submitdir = "%s", ' % submitdir
618 sql += ' grid_queue_id = "%s", ' % grid_queue_id
619 sql += ' status = "%s" ' % status
620 sql += ' WHERE task_id = %s ' % task_id
621 else:
622 sql = 'INSERT INTO `local_task` (task_id,submitdir,grid_queue_id,status) '
623 sql += 'VALUES (%s,"%s",%s,"%s") ' % (task_id,submitdir,grid_queue_id,status)
624 cursor.execute(sql)
625 self.localdb.commit()
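    # AddTaskInfo behaves like an upsert keyed on task_id: the first call
    # for a task INSERTs a row and later calls UPDATE it in place. With
    # hypothetical values task_id=42, submitdir='/home/prod/sub',
    # grid_queue_id='1234.0', status='QUEUED', the first call issues
    #   INSERT INTO `local_task` (task_id,submitdir,grid_queue_id,status)
    #   VALUES (42,"/home/prod/sub","1234.0","QUEUED")
    # and subsequent calls rewrite those columns via UPDATE.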

        sql = 'SELECT * FROM `local_task` WHERE task_id = %s' % task_id
        self.logger.debug(sql)
        cursor = self.localdb.cursor()
        cursor.execute(sql)
        entry = cursor.fetchone()
        results = dict()
        if entry:
            results['task_id'] = entry[0]
            results['submitdir'] = entry[1]
            results['grid_queue_id'] = entry[2]

        return results