Package iceprod :: Package server :: Module job
[hide private]
[frames] | [no frames]

Source Code for Module iceprod.server.job

  1  #!/bin/env python 
  2  # 
  3  """ 
  4   Interface for configuring and submitting jobs on a computing cluster. Do not 
  5   use this class directly. Instead use one of the implementations 
  6   that inherit from this class. 
  7   
  8   copyright  (c) 2005 the icecube collaboration 
  9   
 10   @version: $Revision: $ 
 11   @date: $Date: $ 
 12   @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
 13   @todo: implement more functionality and features 
 14  """ 
 15   
 16  import os 
 17  import re 
 18  import sys 
 19  import math 
 20  import dircache 
 21  import time 
 22  import string 
 23  import shutil 
 24  import cPickle 
 25  import logging 
 26  import logging.config 
 27  from iceprod.core import metadata 
 28  from iceprod.core.dataclasses import Steering 
 29  from iceprod.core.lex import ExpParser 
 30   
 31  logger = logging.getLogger('i3Job') 
 32   
 33   
class i3Job:
    """
    This class represents a generic job on a distributed system.

    Do not use this class directly; use one of the implementations that
    inherit from it.

    Job status codes: (0=init submission,1=running,2=completed,3=failed)
    """

    # Class-level defaults kept for backward compatibility with code that
    # reads attributes off the class itself.  NOTE(review): the mutable
    # defaults (dicts/lists) are shared across instances until __init__
    # rebinds them -- __init__ always does, so only direct class access
    # sees the shared objects.
    name = 'iceprod'
    executable = None
    params = {}
    input_files = []
    output_files = []
    mainscript = ""
    batchopts = {}
    arguments = []
    nproc = 1
    dataset_id = 0
    proc = 0
    # fix: the original declared 'host = None' twice; duplicate removed
    host = None
    current_time = time.asctime().replace(" ","_").replace(":",".")
    config_file = "config.xml"
    logfile = None
    log = ""
    outputfile = None
    out = ""
    errorfile = None
    err = ""
    submit_status = ""
    submit_msg = ""
    initialdir = None
    outputurl = None
    job_id = -1
    cluster_id = None
    simdbkey = None
    port = None
    url = None
    metadata = None
    post_check_script = None
    env = {}
    # (0=init submission,1=running,2=completed,3=failed)
    jobstatus = 0
    argopts = {'nproc':nproc,'procnum':proc,'dataset':dataset_id}
    parser = ExpParser(argopts,Steering())
    steering = None
    db_id = None
    passkey = None
    prio = 0

    # Aggregate CPU times
    realtime = 0.0
    usertime = 0.0
    systime = 0.0

    def __init__(self):
        self.name = 'iceprod'
        self.executable = None
        self.params = {}
        self.input_files = []
        self.output_files = []
        self.mainscript = ""
        self.batchopts = {}
        self.arguments = []
        self.nproc = 1
        self.dataset_id = 0
        self.proc = 0
        self.host = None
        # BUG fix: the original bound a *local* 'current_time', so the
        # instance silently kept the import-time class value.
        self.current_time = time.asctime().replace(" ","_").replace(":",".")
        self.config_file = "config.xml"
        self.logfile = None
        self.log = ""
        self.outputfile = None
        self.out = ""
        self.errorfile = None
        self.err = ""
        self.submit_status = ""
        self.submit_msg = ""
        self.initialdir = None
        self.outputurl = None
        self.job_id = -1
        self.cluster_id = None
        self.simdbkey = None
        self.port = None
        self.url = None
        self.metadata = None
        self.post_check_script = None
        self.env = {}
        # (0=init submission,1=running,2=completed,3=failed)
        self.jobstatus = 0
        self.argopts = {'nproc':self.nproc,'procnum':self.proc,'dataset':self.dataset_id}
        self.parser = ExpParser(self.argopts,Steering())
        self.steering = None
        self.db_id = None
        self.passkey = None
        self.prio = 0
        # Robustness fix: 'submithost' was only ever created by
        # SetSubmitHost(), so GetSubmitHost() could raise AttributeError.
        self.submithost = None

        # Aggregate CPU times
        self.realtime = 0.0
        self.usertime = 0.0
        self.systime = 0.0

    def dict(self):
        """
        Unfinished stub: apparently intended to export/import this job as a
        dictionary (see the sketch below).  Currently builds an empty dict
        and returns None.
        @todo: implement or remove.
        """
        values = dict()
        # Sketch of the intended mapping, kept from the original:
        #   job.SetDatasetId(j['dataset_id'])
        #   job.SetDatabaseId(j['job_id'])
        #   job.jobstatus = j['status']
        #   job.proc = j['queue_id']
        #   job.prio = j['priority']
        #   job.job_id = j['grid_queue_id']
        #   job.passkey = j['passkey']
        #   job.initialdir = j['submitdir']
        #   job.AddArgOption("key",j['passkey'])
        #   job.SetLogFile( "%s/%s.log" % (j['submitdir'],job.Prefix() ))
        #   job.SetOutputFile( "%s/%s.out" % (j['submitdir'],job.Prefix() ))
        #   job.SetErrorFile( "%s/%s.err" % (j['submitdir'],job.Prefix() ))

    def Prefix(self):
        """
        Build the '<name>.<dataset>.<proc>' prefix used for log/out/err files.
        """
        return '%s.%d.%d' % (self.name,self.dataset_id,self.proc)

    def GetHost(self):
        """
        Get the host where this job is currently running
        """
        return self.host

    def SetHost(self,host):
        """
        Set the host where this job is currently running
        """
        self.host = host

    def SetDatasetId(self,dataset):
        """
        Set the dataset this job belongs to.
        """
        self.dataset_id = dataset

    def GetDatasetId(self):
        """
        Get the dataset this job belongs to.
        """
        return self.dataset_id

    def GetJobId(self):
        """
        Get the cluster AND job id for the submitted jobs.
        @return: a list of jobs with their cluster and job id
                 in the condor format
                 None if no jobs have been submitted or if submission failed.
        """
        return self.job_id

    def GetProcNum(self):
        """
        Get the process (queue) number of this job.
        """
        return self.proc

    def SetProcNum(self,proc):
        """
        Set the process (queue) number of this job.
        """
        self.proc = proc

    def SetJobId(self,jobid):
        """
        Set the job id for the submitted job.
        @return: id of job
                 in the condor format
                 None if no jobs have been submitted or if submission failed.
        """
        self.job_id = jobid

    def SetDatabaseId(self,db_id):
        """
        Set the database job id for the submitted job.
        """
        self.db_id = db_id

    def GetDatabaseId(self):
        """
        Return the database job id for the submitted job.
        @return: database ID of job
                 None if job is not in database
        """
        return self.db_id

    def SetInitialdir(self,path):
        """
        Define the directory where jobs will be submitted from.
        @param path: system path to directory
        """
        self.initialdir = path

    def GetInitialdir(self):
        """
        Get the directory where jobs will be submitted from.
        @return: path to directory
        """
        return self.initialdir

    def SetConfigFile(self,file):
        """
        Define the XML config file
        @param file: system path to file
        """
        self.config_file = file

    def GetConfigFile(self):
        """
        Get the XML config file
        """
        return self.config_file

    def GetRootDir(self):
        """
        Get the root directory of installation
        @return: path to directory
        """
        return self.execdir

    def SetRootDir(self,path):
        """
        Define the root directory of installation
        @param path: system path to directory
        """
        self.execdir = path

    def FileName(self):
        """
        Get the name of this object's serialized file
        @return: path to the pickled job file under initialdir
        """
        return os.path.join(self.initialdir,'job_%s.qobj' % str(self.job_id))

    def SetOutputURL(self,url):
        """
        Define the url where data will be copied to.
        @param url: target url for data
        """
        self.outputurl = url

    def GetOutputURL(self):
        """
        Get the directory where data will be copied to.
        @return: path to directory
        """
        return self.outputurl

    def SetExecutable(self,executable):
        """
        Define the executable file that will be run on condor
        @param executable: path to executable file.
        """
        self.executable = executable

    def GetExecutable(self):
        """
        @return: the path to the executable (if set) that will be run on
                 cluster.
        """
        return self.executable

    def SetPostCheckScript(self,script):
        """
        Define the executable file that will be run after the job completes
        to check if it completed successfully
        """
        self.post_check_script = script

    def GetPostCheckScript(self):
        """
        @return: the path to the post run check executable (if set)
        """
        return self.post_check_script

    def SetMainScript(self,script_path):
        """
        Set the path to the main script to run
        @param script_path: script to run
        """
        self.mainscript = script_path

    def GetMainScript(self):
        """
        Get the path to the main script to run
        @return: script to run
        """
        return self.mainscript

    def AddArgument(self,arg):
        """
        Add a parameter to be passed to executable.
        Note: Consecutive calls to this method will append parameters in the
        order in which they were added.

        @param arg: argument to be passed to executable at runtime
        """
        self.arguments.append(arg)

    def AddEnv(self,var,val):
        """
        Add an environment variable.
        """
        self.env[var] = val

    def GetEnv(self):
        """
        @return: the dict of environment variables added via AddEnv
        """
        return self.env

    def format_arg(self,tuple):
        """
        format runtime cmdline argument
        @param tuple: tuple in the form (argname,argval)
        """
        # NOTE(review): the parameter shadows the builtin 'tuple'; name kept
        # to preserve the public signature.
        if len(tuple[0]) > 1:
            key = "--%s" % tuple[0]     # long option
        else:
            key = "-%s" % tuple[0]      # single-char option

        if len(str(tuple[1])):
            return "%s=%s" % (key,tuple[1])
        else:
            return key                  # value-less flag

    def GetArguments(self):
        """
        @return: a list of arguments that will be passed to the executable
        """
        return self.arguments

    def AddBatchOpt(self,optname,optval):
        """
        Add a batchsystem option to use in submit script

        @param optname: name of option be passed to executable at runtime
        @param optval: value of option be passed to executable at runtime
        """
        self.batchopts[optname] = optval

    def GetBatchOpts(self):
        """
        @return: the dict of batchsystem options
        """
        return self.batchopts

    def AddArgOption(self,optname,optval=""):
        """
        Add an option to be passed to executable.
        Similar to 'AddArgument' but options will precede arguments
        For example: executable <option(s)> <argument(s)>
        Note: Consecutive calls to this method will append options in the
        order in which they were added.

        @param optname: name of option be passed to executable at runtime
        @param optval: value of option be passed to executable at runtime
        """
        self.argopts[optname] = optval

    def GetArgOptions(self):
        """
        @return: a list of options passed to the executable
        """
        # items() yields the same (key,value) pairs the original built by
        # zipping keys() with values()
        return map( self.format_arg, self.argopts.items())

    def GetArgOpt(self,optname):
        """
        Get value of option to be passed to executable.
        @param optname: name of argument to be passed to executable at runtime
        """
        if optname in self.argopts:
            return self.argopts[optname]
        else:
            return ""

    def AddInputFile(self,filename):
        """
        Add an input file that Condor is to transfer to a remote
        site to set up the execution environment for the job before it is run.
        These files are placed in the same temporary working directory as the
        job's executable. At this time, directories can not be transferred in
        this way.

        @param filename: relative to initial directory if set,
                         otherwise submit directory.
        """
        self.input_files.append(filename)

    def AddOutputFile(self,filename):
        """
        Add an output file that the cluster is to transfer from a remote
        site on completion of run.
        These files are placed in the same temporary working directory as the
        job's executable. At this time, directories can not be transferred in
        this way.

        @param filename: relative to initial directory if set,
                         otherwise submit directory.
        """
        self.output_files.append(filename)

    def GetInputFiles(self):
        """
        @return: the list of input files that Condor is to transfer to a remote
        site to set up the execution environment for the job before it is run.
        """
        return self.input_files

    def GetOutputFiles(self):
        """
        @return: the list of output files that the cluster is to transfer from
        a remote site after execution.
        """
        return self.output_files

    def SetQueue(self,nproc):
        """
        @param nproc: number of jobs to enqueue
        """
        self.nproc = nproc

    def GetQueue(self):
        """
        @return: number of jobs enqueued
        """
        return self.nproc

    def SetLogFile(self,filename):
        """
        @param filename: path to logfile where condor will write the
                         state of a given job in the cluster
        """
        self.logfile = filename

    def SetOutputFile(self,filename):
        """
        @param filename: path to file where condor will write the output
                         generated through stdout by the job
        """
        self.outputfile = filename

    def SetErrorFile(self,filename):
        """
        @param filename: path to file where condor will write the output
                         generated through stderr by the job
        """
        self.errorfile = filename

    def GetLogFile(self):
        """
        @return: path to logfile where condor will write the state of a given
                 job in the cluster (defaults to '<prefix>.log')
        """
        if not self.logfile:
            self.SetLogFile( "%s.log" % self.Prefix() )
        return self.logfile

    def GetOutputFile(self):
        """
        @return: path to file where condor will write the output generated
                 through stdout by the job (defaults to '<prefix>.out')
        """
        if not self.outputfile:
            self.SetOutputFile( "%s.out" % self.Prefix() )
        return self.outputfile

    def GetErrorFile(self):
        """
        @return: path to file where condor will write the output generated
                 through stderr by the job (defaults to '<prefix>.err')
        """
        if not self.errorfile:
            self.SetErrorFile( "%s.err" % self.Prefix() )
        return self.errorfile

    def SetSimDBKey(self,key):
        """
        Set the unique key for current configuration on production database
        """
        self.simdbkey = key

    def GetSimDBKey(self):
        """
        Get the unique key for current configuration from production database
        """
        return self.simdbkey

    def SetSubmitHost(self,host):
        """
        Set the hostname of the submit node
        """
        self.submithost = host

    def GetSubmitHost(self):
        """
        Get the hostname of the submit node
        """
        return self.submithost

    def SetStatus(self,status):
        """
        (1=enqueued,2=running,3=completed,4=failed)
        NOTE(review): this numbering disagrees with the 0-based legend on the
        class -- confirm which one callers rely on.
        """
        self.jobstatus = status

    def GetStatus(self):
        """
        @return: the current job status code
        """
        return self.jobstatus

    def SetRealTime(self,time):
        """
        Set aggregate wall-clock time (shadows the 'time' module locally).
        """
        self.realtime = time

    def GetRealTime(self):
        """
        @return: aggregate wall-clock time
        """
        return self.realtime

    def SetUserTime(self,time):
        """
        Set aggregate user CPU time.
        """
        self.usertime = time

    def GetUserTime(self):
        """
        @return: aggregate user CPU time
        """
        return self.usertime

    def SetSysTime(self,time):
        """
        Set aggregate system CPU time.
        """
        self.systime = time

    def GetSysTime(self):
        """
        @return: aggregate system CPU time
        """
        return self.systime

    def GetPrio(self):
        """
        Get job priority
        """
        return self.prio

    def SetPrio(self,prio):
        """
        Set job priority
        """
        self.prio = prio

    def Pickle(self,filename=None):
        """
        Serialize this object and write it to a file
        @param filename: target path; defaults to FileName(), and is taken
                         relative to initialdir if not absolute
        """
        if not filename:
            # BUG fix: original called self.Filename(), which does not
            # exist -- the method is FileName().
            filename = self.FileName()
        if not filename.startswith("/"):
            filename = os.path.join(self.GetInitialdir(),filename)
        fobj = open(filename,"w")
        try:
            cPickle.dump(self,fobj)
        finally:
            # ensure the handle is closed even if pickling fails
            fobj.close()

    def unPickle(self,filename):
        """
        unSerialize an object previously written by Pickle()
        @return: the deserialized job object
        """
        fobj = open(filename, "r")
        try:
            job = cPickle.load(fobj)
        finally:
            fobj.close()
        return job

    def Write(self,file,txt,parse=True):
        """
        Format and write string to file
        @param file: open file object to write to
        @param txt: text to write
        @param parse: if True, run txt through parse_params first
        """
        if parse:
            print >> file,self.parse_params(txt)
        else:
            print >> file,txt

    def GetSteering(self):
        """
        @return: the Steering object attached to this job (or None)
        """
        return self.steering

    def AddSteering(self,steering):
        """
        Attach a Steering object to this job.
        """
        self.steering = steering

    def AddParser(self,parser):
        """
        Replace the expression parser used by parse_params.
        """
        self.parser = parser

    def parse_params(self,src):
        """
        Replace key with value on a string. This can be anything
        but typically a dollar sign token such as $args(value)
        @param src: string containing one or more token keys to replace
        """
        src = self.parser.parse(src)

        for k in self.params.keys():
            src= src.replace('$(%s)' % k,str(self.params[k]))

        src = src.replace('$(Process)',str(self.proc))
        return src
class i3Task(i3Job):
    """
    A single task belonging to an iceprod job.
    """

    def __init__(self):
        i3Job.__init__(self)
        self.name = 'iceprod.task'
        self.task_name = 'task'
        # task identity and iteration bookkeeping
        self.task_id, self.idx, self.iter = 0, 0, -1

    def SetTaskId(self,tid):
        """
        Set the id for the submitted task.
        @return: id of task
        """
        self.task_id = tid

    def GetTaskId(self):
        """
        Get the ID for the submitted tasks.
        @return: ID of task
        """
        return self.task_id