
Source Code for Module iceprod.server.plugins.dag

#!/bin/env python
#

"""
 A basic wrapper for submitting and monitoring jobs to Condor.
 This module implements only a small subset of Condor's many features.
 (http://www.cs.wisc.edu/condor)
 Inherits from i3Queue

 copyright  (c) 2005 the icecube collaboration

 @version: $Revision: $
 @date: $Date: $
 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
 @todo: implement more functionality of condor.
"""

import os
import re
import sys
import math
import dircache
import time
import string
import shutil
import ConfigParser
from iceprod.server.grid import iGrid
import logging
from iceprod.core import metadata
from iceprod.core.dataclasses import Steering
from iceprod.core.lex import ExpParser
from iceprod.core import functions
from iceprod.server.db import ConfigDB
from iceprod.server.job import i3Job, i3Task

logger = logging.getLogger('IceProdDAG')
 38   
class IceProdDAG(iGrid):
    """
    This class represents a job that executes in multiple parts using a DAG.
    """

    def __init__(self):
        iGrid.__init__(self)
        self.grids = dict()
        self.grids["all"] = []
        self.logger = logging.getLogger('IceProdDAG')

    def ConfigureGrids(self,key,val):
        self.grids[key] = val.split(",")
        all = reduce(lambda y1,y2: y1|y2, map(lambda x:set(self.grids[x]),self.grids.keys()))
        self.grids["all"] = list(all)

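    # A minimal sketch of what ConfigureGrids builds, using hypothetical grid
    # names that are not taken from the module: each call stores the list for
    # its key and recomputes 'all' as the set union of every configured list.
    #
    #   dag = IceProdDAG()
    #   dag.ConfigureGrids('simulation', 'npx,glow')
    #   dag.ConfigureGrids('filtering', 'glow,gridftp')
    #   sorted(dag.grids['all'])   # ['glow', 'gridftp', 'npx']
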
    def Submit(self,cookie):
        self.logger.debug("Submitting DAG")
        for job in self.jobs:
            self.SubmitDAG(job)

    def SubmitDAG(self,job):
        """
        Initialize the DAG tasks for a job in the monitoring database.
        @param job: i3Job object
        """

        from iceprod.core.dataclasses import IceTrayConfig

        db = self.GetMonitorDB()

        steering = job.GetSteering()

        task_defs = steering.GetTaskDefinitions()
        logger.debug("Task definitions: %s" % task_defs)
        if not len(task_defs):
            # not a DAG
            logger.warn("No tasks specified in config file; doing regular submit")
            return None

        job_id = job.GetDatabaseId()
        dataset_id = job.GetDatasetId()
        queue_id = job.GetProcNum()
        grids = map(string.strip,steering.GetSysOpt("sub_grids").GetValue().split(","))

        logger.info("initializing grid_statistics (DAG)")
        db.InitializeGridStatsDAG(grids,steering,dataset_id)

        for taskname,td in task_defs.items():
            td_id = td.GetId()
            if td.ParallelExecutionEnabled():
                trays = td.GetTrays()
                for idx,tray in trays.items():
                    for iter in tray.GetIters():
                        parser = ExpParser({'tray':idx,'iter':iter,'procnum':queue_id,'nproc':queue_id+1},steering)
                        if db.task_is_finished(td_id, job_id, idx, iter):
                            continue

                        tid = db.task_init(job.GetDatasetId(),job.GetDatabaseId(),idx,iter)
                        db.task_update_status(tid,'IDLE',key=job.GetArgOpt("key"),grid_id=0)

            else:
                if db.task_is_finished(td_id, job_id):
                    continue
                if taskname == 'trashcan':
                    done = False
                tid = db.task_init(job.GetDatasetId(),job.GetDatabaseId())

                parser = ExpParser({'tray':0,'iter':0,'procnum':queue_id,'nproc':queue_id+1},steering)
                db.task_update_status(tid,'IDLE',key=job.GetArgOpt("key"),grid_id=0)

        db.commit()
        self.CleanQ([job])
        db.jobsubmitted(
            job.GetDatasetId(), job.GetProcNum(),
            job.GetInitialdir(), job.GetJobId())
        return 0,"submission complete"


    def QRemove(self,job):
        """
        Remove active job/cluster from queuing system.
        """
        db = self.GetMonitorDB()

        steering = Steering()
        db.download_tasks(job.dataset_id,steering)
        task_defs = steering.GetTaskDefinitions()
        for taskname,td in task_defs.items():
            td_id = td.GetId()
            trays = td.GetTrays()
            for idx,tray in trays.items():
                for iter in tray.GetIters():
                    tid = db.get_task_id(td_id,job.job_id, idx, iter)
                    if tid and db.task_status(tid) not in ('OK','IDLE','WAITING'):
                        db.task_update_status(tid,'IDLE',key=job.passkey)
        db.commit()
        return -1


    def CleanQ(self,jobs=None):
        if not jobs: return 0
        db = self.GetMonitorDB()

        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]

        datasets = {}
        for job in job_list:
            if not datasets.has_key(job.GetDatasetId()):
                datasets[job.GetDatasetId()] = []
            datasets[job.GetDatasetId()].append(job)

        for dataset_id in datasets.keys():
            steering = Steering()
            db.download_tasks(dataset_id,steering)
            task_defs = steering.GetTaskDefinitions()
            for job in datasets[dataset_id]:
                for taskname,td in task_defs.items():

                    parents_finished = True
                    for parent in td.GetParents(): # check dependencies
                        parent_td = steering.GetTaskDefinition(parent)
                        if not db.task_is_finished(parent_td.GetId(), job.GetDatabaseId()):
                            parents_finished = False
                            break

                    if parents_finished:
                        td_id = td.GetId()
                        trays = td.GetTrays()
                        for idx,tray in trays.items():
                            for iter in tray.GetIters():
                                tid = db.get_task_id(td_id,job.GetDatabaseId(), idx, iter)
                                if tid and db.task_status(tid) == 'IDLE':
                                    logger.info("Resetting task %s" % taskname )
                                    queue_id = job.GetProcNum()
                                    parser = ExpParser({'tray':idx,'iter':iter,'procnum':queue_id,'nproc':queue_id+1},steering)
                                    db.task_update_status(tid,'WAITING',key=job.GetArgOpt("key"))
        db.commit()


class TaskQ(iGrid):
    """
    This class represents a job that executes in multiple parts using a DAG.
    """

    def __init__(self):
        iGrid.__init__(self)
        self.suffix = 'dag'
        self.logger = logging.getLogger('TaskQ')
        self.localdb = LocalDB()


    def SubmitTask(self,job):
        """
        Write the submit and manifest files for a single DAG task and submit it.
        @param job: i3Job object
        """

        from iceprod.core.dataclasses import IceTrayConfig
        from copy import deepcopy

        db = self.GetMonitorDB()

        steering = job.GetSteering()
        task_defs = steering.GetTaskDefinitions()
        logger.debug("Task definitions: %s" % task_defs)
        status_sum = 0

        if not len(task_defs):
            # not a DAG
            logger.warn("No tasks specified in config file; ignoring task")
            return None

        job_id = job.GetDatabaseId()

        file_catalog = {}
        for taskname,td in task_defs.items():
            args = self.GetArguments(job,td,output="dict")
            file_catalog[taskname] = self.GetFiles(job,td,args)

        td = task_defs[job.task_name]
        td_id = td.GetId()

        if td.ParallelExecutionEnabled():
            trays = td.GetTrays()
            for idx,tray in trays.items():
                for iter in tray.GetIters():
                    newjob = deepcopy(job)
                    newjob.task_id = self.i3monitordb.get_task_id(td_id,newjob.GetJobId(), idx, iter)
                    if not iter == -1:
                        nodename = "%s_%u" % (job.task_name,iter)
                    else:
                        nodename = "%s_ext" % job.task_name
                    filename = job.config_file.replace(self.suffix,"%s.%s" % (nodename,self.suffix))
                    self.logger.debug("Got task %s (ID=%u) with filename %s" % (job.task_name, job.task_id, filename))
                    newjob.config_file = filename
                    args = self.GetArguments(newjob,td,idx,iter,output="dict")

                    input,output,notes = self.GetFiles(newjob,td,args,idx,iter,file_catalog)
                    newjob.SetOutputFile(filename.replace(self.suffix,'out'))
                    newjob.SetErrorFile(filename.replace(self.suffix,'err'))
                    self.WriteFileManifest(newjob,filename,input,output,notes)
                    self.SetArguments(newjob,td,idx,iter)
                    self.WriteConfig(newjob,filename)
                    status, submit_status = self._submit(newjob)
                    status_sum += status
                    if not status:
                        self.localdb.AddTaskInfo(newjob.task_id,newjob.initialdir,newjob.job_id,'QUEUED')
                        self.i3monitordb.task_update_status(newjob.task_id,'QUEUED',newjob.passkey)
        else:
            #job.task_id = self.i3monitordb.get_task_id(td_id,job.GetJobId(), idx, iter)
            args = self.GetArguments(job,td,output="dict")
            filename = job.config_file.replace(self.suffix,"%s.%s" % (job.task_name,self.suffix))

            input,output,notes = self.GetFiles(job,td,args,catalog=file_catalog)
            job.SetOutputFile(filename.replace(self.suffix,'out'))
            job.SetErrorFile(filename.replace(self.suffix,'err'))

            self.WriteFileManifest(job,filename,input,output,notes)
            self.SetArguments(job,td)
            job.config_file = filename
            self.WriteConfig(job,filename)
            status, submit_status = self._submit(job)
            status_sum += status
            if status:
                self.logger.error("failed to submit jobs:"+job.submit_msg)
                self.localdb.AddTaskInfo(job.task_id,job.initialdir,job.job_id,'ERROR')
                self.i3monitordb.task_update_status(job.task_id,'ERROR',job.passkey)
            else:
                self.logger.info("submitted job %s.%s.%s:"%(job.dataset_id,job.proc,job.task_name))
                self.localdb.AddTaskInfo(job.task_id,job.initialdir,job.job_id,'QUEUED')
                self.i3monitordb.task_update_status(job.task_id,'QUEUED',job.passkey)
        return status_sum,self.submit_status


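    # Illustration of the per-task file naming used in SubmitTask, assuming a
    # hypothetical config file name and task (neither comes from the module).
    # With self.suffix = 'dag', job.config_file = 'iceprod.1234.0.dag' and
    # job.task_name = 'level2':
    #
    #   iter  0  -> nodename 'level2_0'   -> submit file 'iceprod.1234.0.level2_0.dag'
    #   iter -1  -> nodename 'level2_ext' -> submit file 'iceprod.1234.0.level2_ext.dag'
    #
    # The corresponding stdout/stderr files replace the suffix, e.g.
    # 'iceprod.1234.0.level2_0.out' and 'iceprod.1234.0.level2_0.err'.
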
    def GetFiles(self,job,td,args,idx=False,iter=False,catalog=False):
        steering = job.GetSteering()
        if idx is not False:
            td_trays = {idx: td.GetTray(idx)}
        else:
            td_trays = td.GetTrays()

        if args.has_key('dagtemp'):
            tmp_dir = args['dagtemp']
        else:
            tmp_dir = 'file:///tmp' # TODO: give this a sane default or make it an error

        if args.has_key('fetch'):
            global_dir = args['fetch']
        else:
            global_dir = 'file:///tmp'

        td_input = {}
        td_output = {}
        notes = {}

        if td.IsCleanup():
            # cleanup job has no input/output files required
            return (td_input, td_output, notes)

        for idx, td_tray in td_trays.iteritems():
            args['tray'] = idx

            logger.debug("GetTray(%s)" % idx)
            icetray = steering.GetTray(idx)

            input_files = icetray.GetInputFiles()
            parsed_input = []

            output_files = icetray.GetOutputFiles()
            parsed_output = []

            if iter is not False:
                iters = [iter]
            else:
                iters = td_tray.GetIters()
            for iter in iters:
                args['iter'] = iter
                parser = ExpParser(args,steering)
                for d in steering.GetDependencies():
                    d_file = parser.parse(d)
                    if not td_input.has_key(d_file):
                        location = d_file
                        if not self.IsUrl(location):
                            location = os.path.join(global_dir, location)
                        td_input[os.path.basename(d_file)] = [location]
                for i_file in input_files:
                    name = i_file.GetName()
                    name = parser.parse(name)
                    if not td_input.has_key(name) \
                       and not td_output.has_key(name):
                        if catalog:
                            node = self.FindFile(steering,td,catalog,name)
                        else:
                            node = td.GetName()

                        note = False
                        if node == "global":
                            location = global_dir
                            note = "global"
                        if node != "global":
                            location = tmp_dir
                            location = os.path.join(location, str(job.GetDatasetId()))
                            location = os.path.join(location, str(job.GetProcNum()))
                            location = os.path.join(location, node)
                            note = "dontextract"
                        location = os.path.join(location, name)
                        if i_file.IsPhotonicsTable():
                            note = "photonics"
                        if note:
                            notes[name] = note
                        td_input[name] = [location]
                for o_file in output_files:
                    name = o_file.GetName()
                    name = parser.parse(name)
                    if not td_output.has_key(name):
                        location = os.path.join(tmp_dir, str(job.GetDatasetId()))
                        location = os.path.join(location, str(job.GetProcNum()))
                        location = os.path.join(location, str(td.GetName()))
                        location = os.path.join(location, name)
                        td_output[name] = [location]

        return (td_input, td_output, notes)

    def FindFile(self,steering,td,catalog,file):
        parents = td.GetParents()

        # check immediate parents for this file
        for parent in parents:
            if catalog.has_key(parent):
                if catalog[parent][1].has_key(file):
                    return parent

        # check older ancestors
        for parent in parents:
            parent_td = steering.GetTaskDefinition(parent)
            result = self.FindFile(steering,parent_td,catalog,file)
            if result != "global":
                return result

        return "global"

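    # Sketch of the catalog structure FindFile walks, with hypothetical task
    # and file names: each catalog entry is the (input, output, notes) tuple
    # returned by GetFiles, so catalog[parent][1] above is the parent's
    # output-file map.  Assuming 'level3' lists 'level2' as a parent and
    # 'level2' lists 'level1':
    #
    #   catalog = {'level1': ({}, {'hits.i3': ['...']}, {}),
    #              'level2': ({'hits.i3': ['...']}, {'reco.i3': ['...']}, {})}
    #   self.FindFile(steering, level3_td, catalog, 'reco.i3')  # 'level2'
    #   self.FindFile(steering, level3_td, catalog, 'gcd.i3')   # 'global'
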
    def WriteFileManifest(self,job,filename,input,output,notes):
        logger.debug("Input files: %s" % input)
        suffix = filename.split('.')[-1]
        in_manifest = open(filename.replace('.'+suffix, ".input"), 'w')
        if len(input):
            padding = str(max(map(lambda x: max(map(len, x)), input.values())))
            fmt_str = "%-" + padding + "s %s"
            for i_file, locs in input.items():
                for loc in locs:
                    file = fmt_str % (loc, i_file)
                    if notes.has_key(i_file):
                        file += "\t%s" % notes[i_file]
                    job.Write(in_manifest, file)
        in_manifest.close()
        logger.debug("Output files: %s" % output)
        out_manifest = open(filename.replace('.'+suffix, ".output"), 'w')
        if len(output):
            # pad by the output locations (was input.values(), which fails
            # when a task has outputs but no inputs)
            padding = str(max(map(lambda x: max(map(len, x)), output.values())))
            fmt_str = "%-" + padding + "s %s"
            for o_file, locs in output.items():
                for loc in locs:
                    job.Write(out_manifest, fmt_str % (o_file, loc))
        out_manifest.close()
        job.AddInputFile(in_manifest.name)
        job.AddInputFile(out_manifest.name)

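    # Example of the manifests WriteFileManifest produces, with hypothetical
    # file names and locations.  Each ".input" line is a location,
    # left-justified to the width of the longest location, followed by the
    # file name and, when present, a tab-separated note; each ".output" line
    # is the file name followed by its destination:
    #
    #   gsiftp://store/9999/0/level1/hits.i3   hits.i3    dontextract
    #   http://data/tables/photorec.tar.gz     photorec.tar.gz    photonics
    #
    #   reco.i3   gsiftp://store/9999/0/level2/reco.i3
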
    def QueueJobs(self,db,maxjobs,grid_id,jobs_at_once=20,fifo=True,debug=0):
        return db.QueueTasks(maxjobs,grid_id,jobs_at_once,fifo,debug)

    def jobfinalize(self,dataset_id,job_id,job,status='OK',clear_errors=True):
        """
        Update status for job
        @param dataset_id: dataset index
        @param job_id: process number within dataset
        """
        #return self.i3monitordb.jobfinalize(self,dataset_id,job_id,passkey,status,clear_errors)
        job = self.localdb.FillTaskInfo( [ job ] )[0]
        return self.i3monitordb.task_update_status(job['task_id'],status,job['job.passkey'])

    def reset_old_jobs(self, grid_id, maxidletime, maxruntime, maxsubmittime, maxcopytime, maxfailures, maxevicttime, keepalive):
        """
        Reset the status of jobs that were queued but whose status
        has not changed in more than the configured maximum time.

        @param grid_id: id of current cluster
        @param maxidletime: maximum idle time for jobs
        @param maxruntime: maximum run time for jobs
        @param maxsubmittime: maximum submit time for jobs
        @param maxcopytime: maximum time for jobs to be in 'copying' state
        @param maxfailures: maximum number of times a job is allowed to fail
        @param maxevicttime: maximum time for jobs to be in 'evicted' state
        @param keepalive: how often the server should expect to hear from jobs
        """
        return self.i3monitordb.reset_old_tasks(grid_id, maxidletime, maxruntime, maxsubmittime, maxcopytime, maxfailures, maxevicttime, keepalive)


    def GetResetJobs(self,grid_id,max_reset=50):
        task_list = self.i3monitordb.GetResetTasks(grid_id,max_reset)
        return self.localdb.FillTaskInfo(task_list)

    def GetFinishedJobs(self,grid_id,max_reset=50):
        task_list = self.i3monitordb.GetFinishedTasks(grid_id,max_reset)
        return self.localdb.FillTaskInfo(task_list)

    def GetActiveJobs(self,grid_id):
        task_list = self.i3monitordb.GetActiveTasks(grid_id)
        return self.localdb.FillTaskInfo(task_list)

    def GetProcessingJobs(self,grid_id,max_reset=50):
        task_list = self.i3monitordb.GetProcessingTasks(grid_id,max_reset)
        return self.localdb.FillTaskInfo(task_list)

    def GetQueuedJobs(self,grid_id,max_reset=50):
        task_list = self.i3monitordb.GetQueuedTasks(grid_id,max_reset)
        return self.localdb.FillTaskInfo(task_list)

    def EnumerateParentNodes(self,steering,td):
        parents = td.GetParents()
        parentnodes = []
        for parent in parents:
            parentobj = steering.GetTaskDefinition(parent)
            if parentobj.ParallelExecutionEnabled():
                for idx,tray in parentobj.GetTrays().items():
                    for iter in tray.GetIters():
                        if not iter == -1:
                            nodename = "%s_%u" % (parent,iter)
                        else:
                            nodename = "%s_ext" % parent
                        parentnodes.append(nodename)
            else:
                parentnodes.append(parent)
        return parentnodes

    def GetArguments(self,job,td,idx=False,iter=False,output="str"):
        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = " --dag --task=%s" % td.GetName()
        if not idx is False:
            argstr += " --tray=%s --iter=%s" % (idx,iter)
        argstr += job.GetMainScript() + " " + " ".join(argopts)
        if output == "dict":
            argstr = self.ArgStrToDict(argstr)
        return argstr


    def SetArguments(self,job,td,idx=False,iter=False):
        """
        Add DAG arguments for job
        """
        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        job.AddArgOption('dag')
        job.AddArgOption('task',td.GetName())
        job.AddArgOption('abort-on-fail','0')

        if not idx is False:
            job.AddArgOption('tray',job.idx)
            job.AddArgOption('iter',job.iter)

        return

    def ArgStrToDict(self,argstr):
        args = {}
        for tok in argstr.split(" "):
            tok = tok[2:]
            pieces = tok.split("=")
            if len(pieces) == 1:
                args[tok] = 1
            else:
                args[pieces[0]] = pieces[1]
        return args

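    # Example of the conversion ArgStrToDict performs (option names here are
    # illustrative only): "--key=value" tokens become dict entries and bare
    # "--flag" tokens map to 1.  A leading space, as produced by GetArguments,
    # additionally yields an empty-string key.
    #
    #   self.ArgStrToDict("--dag --task=level2 --nproc=10")
    #   # {'dag': 1, 'task': 'level2', 'nproc': '10'}
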
    def getAdditionalOptions(self,job):
        return {}


    def CleanQ(self,jobs=None):
        if not jobs: return 0

        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]

        datasets = {}
        job_list = self.localdb.FillTaskInfo( job_list )
        for job in job_list:
            # group jobs by dataset; create the list on first use and always
            # append (previously the first job of a new dataset was dropped)
            if not datasets.has_key(job.GetDatasetId()):
                datasets[job.GetDatasetId()] = []
            datasets[job.GetDatasetId()].append(job)

        for dataset_id in datasets.keys():
            steering = Steering()
            db = self.GetMonitorDB()
            db.download_tasks(dataset_id,steering)
            task_defs = steering.GetTaskDefinitions()
            for job in datasets[dataset_id]:
                for taskname,td in task_defs.items():

                    parents_finished = True
                    for parent in td.GetParents(): # check dependencies
                        parent_td = steering.GetTaskDefinition(parent)
                        if not self.i3monitordb.task_is_finished(parent_td.GetId(), job.GetJobId()):
                            parents_finished = False
                            break

                    if parents_finished:
                        td_id = td.GetId()
                        trays = td.GetTrays()
                        for idx,tray in trays.items():
                            for iter in tray.GetIters():
                                tid = self.i3monitordb.get_task_id(td_id,job.GetJobId(), idx, iter)
                                if tid and self.i3monitordb.task_status(tid) == 'IDLE':
                                    logger.info("Resetting task %s" % taskname )
                                    self.i3monitordb.task_update_status(tid,'WAITING',job.GetArgOpt("key"))
                                    self.localdb.AddTaskInfo(job.task_id,job.initialdir,job.job_id,'WAITING')

    def Clean(self,jobdict,force=False):
        """
        Interface: clean submit directory
        """
        dir = "%(submitdir)s"%jobdict
        logger.debug(dir)
        if os.path.exists(dir) and os.path.isdir(dir):
            logger.info('%(dataset_id)u.%(queue_id)u: cleaning dir %(submitdir)s' % jobdict)
            functions.removedirs(dir)


class LocalDB:

    def __init__(self):
        self.localdb = None
        self.logger = logging.getLogger('LocalSQLiteDB')
        try:
            import sqlite3
            self.localdb = sqlite3.connect(os.path.expandvars("$I3PROD/shared/localqueue.db"))
        except ImportError:
            self.logger.error("sqlite3 missing. will try sqlite.")
            try:
                import sqlite
                self.localdb = sqlite.connect(os.path.expandvars("$I3PROD/shared/localqueue.db"))
            except ImportError:
                self.logger.error("sqlite missing. won't try to maintain queue sanity")
        if self.localdb:
            cursor = self.localdb.cursor()
            try:
                cursor.execute('CREATE TABLE IF NOT EXISTS local_task (task_id int, submitdir VARCHAR(200), grid_queue_id VARCHAR(80), status VARCHAR(80))')
            except Exception,e:
                self.logger.error(e)
            else:
                self.localdb.commit()
            cursor.close()


    def AddTaskInfo(self,task_id,submitdir,grid_queue_id,status):
        sql = 'SELECT * FROM `local_task` WHERE task_id = %s' % task_id
        self.logger.debug(sql)
        cursor = self.localdb.cursor()
        cursor.execute(sql)
        task = cursor.fetchone()
        if task:
            sql = ' UPDATE `local_task`'
            sql += ' SET submitdir = "%s", ' % submitdir
            sql += ' grid_queue_id = "%s", ' % grid_queue_id
            sql += ' status = "%s" ' % status
            sql += ' WHERE task_id = %s ' % task_id
        else:
            sql = 'INSERT INTO `local_task` (task_id,submitdir,grid_queue_id,status) '
            sql += 'VALUES (%s,"%s",%s,"%s") ' % (task_id,submitdir,grid_queue_id,status)
        cursor.execute(sql)
        self.localdb.commit()

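    # The same insert-or-update could be expressed with sqlite3 parameter
    # substitution, which avoids hand-quoting the values; a sketch only, not
    # how the module currently builds its SQL:
    #
    #   cursor.execute('SELECT 1 FROM local_task WHERE task_id = ?', (task_id,))
    #   if cursor.fetchone():
    #       cursor.execute('UPDATE local_task SET submitdir=?, grid_queue_id=?, status=? '
    #                      'WHERE task_id=?', (submitdir, grid_queue_id, status, task_id))
    #   else:
    #       cursor.execute('INSERT INTO local_task (task_id,submitdir,grid_queue_id,status) '
    #                      'VALUES (?,?,?,?)', (task_id, submitdir, grid_queue_id, status))
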
    def GetTaskInfo(self,task_id):
        sql = 'SELECT * FROM `local_task` WHERE task_id = %s' % task_id
        self.logger.debug(sql)
        cursor = self.localdb.cursor()
        cursor.execute(sql)
        entry = cursor.fetchone()
        results = dict()
        if entry:
            results['task_id'] = entry[0]
            results['submitdir'] = entry[1]
            results['grid_queue_id'] = entry[2]
            #results['status'] = entry[3]
        return results

    def FillTaskInfo(self,task_list):
        for task in task_list:
            if isinstance(task,i3Task):
                j = self.GetTaskInfo( task.GetTaskId() )
                if not j: continue
                task.SetJobId(j['grid_queue_id'])
                task.SetInitialdir(j['submitdir'])
                task.SetLogFile( "%s/%s.log" % (j['submitdir'],task.Prefix() ))
                task.SetOutputFile( "%s/%s.out" % (j['submitdir'],task.Prefix() ))
                task.SetErrorFile( "%s/%s.err" % (j['submitdir'],task.Prefix() ))
            else:
                j = self.GetTaskInfo( task["task_id"] )
                if not j: continue
                for key in j.keys():
                    task[key] = j[key]
        return task_list

    def __del__(self):
        if self.localdb:
            self.localdb.close()