
Source Code for Module iceprod.server.job

#!/bin/env python
#
"""
 Interface for configuring and submitting jobs on a computing cluster.
 Do not use this class directly. Instead use one of the implementations
 that inherit from this class.

 copyright  (c) 2005 the icecube collaboration

 @version: $Revision: $
 @date: $Date: $
 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
 @todo: implement more functionality and features
"""

import os
import re
import sys
import math
import dircache
import time
import string
import shutil
import cPickle
import logging
import logging.config
from iceprod.core import metadata
from iceprod.core.dataclasses import Steering
from iceprod.core.lex import ExpParser

logger = logging.getLogger('i3Job')

class i3Job:
    """
    This class represents a generic job on a distributed system.
    """

    def __init__(self):
        self.name = 'iceprod'
        self.executable = None
        self.params = {}
        self.input_files = []
        self.output_files = []
        self.mainscript = ""
        self.batchopts = {}
        self.arguments = []
        self.nproc = 1
        self.dataset_id = 0
        self.proc = 0
        self.host = None
        current_time = time.asctime().replace(" ","_")
        self.logfile = "%s_$(Proc).log" % current_time
        self.outputfile = "%s_$(Proc).output" % current_time
        self.errorfile = "%s_$(Proc).error" % current_time
        self.submit_status = ""
        self.initialdir = None
        self.outputurl = None
        self.job_id = -1
        self.cluster_id = None
        self.simdbkey = None
        self.host = None
        self.port = None
        self.url = None
        self.metadata = None
        self.post_check_script = None
        self.env = {}
        self.jobstatus = 0
        self.argopts = {'nproc':self.nproc,'procnum':self.proc,'dataset':self.dataset_id}
        self.parser = ExpParser(self.argopts,Steering())
        self.steering = None
        self.db_id = None

        # (0=init submission, 1=running, 2=completed, 3=failed)

        # Aggregate CPU times
        self.realtime = 0.0
        self.usertime = 0.0
        self.systime = 0.0

    def Prefix(self):
        return '%s.%d.%d' % (self.name,self.dataset_id,self.proc)

    def GetHost(self):
        """
        Get the host where this job is currently running
        """
        return self.host

    def SetDatasetId(self,dataset):
        self.dataset_id = dataset

    def GetDatasetId(self):
        return self.dataset_id

    def SetHost(self,host):
        """
        Set the host where this job is currently running
        """
        self.host = host

    def GetJobId(self):
        """
        Get the cluster and job id for the submitted jobs.
        @return: a list of jobs with their cluster and job id
                 in the condor format,
                 or None if no jobs have been submitted or if submission failed.
        """
        return self.job_id

    def GetProcNum(self):
        return self.proc

    def SetProcNum(self,proc):
        self.proc = proc

    def SetJobId(self,jobid):
        """
        Set the job id for the submitted job.
        @param jobid: id of the job in the condor format,
                      or None if no jobs have been submitted or if submission failed.
        """
        self.job_id = jobid

    def SetDatabaseId(self,db_id):
        """
        Set the database job id for the submitted job.
        """
        self.db_id = db_id

    def GetDatabaseId(self):
        """
        Return the database job id for the submitted job.
        @return: database ID of job, or None if job is not in database
        """
        return self.db_id

    def SetInitialdir(self,path):
        """
        Define the directory where jobs will be submitted from.
        @param path: system path to directory
        """
        self.initialdir = path

    def GetInitialdir(self):
        """
        Get the directory where jobs will be submitted from.
        @return: path to directory
        """
        return self.initialdir

    def GetRootDir(self):
        """
        Get the root directory of installation
        @return: path to directory
        """
        return self.execdir

    def SetRootDir(self,path):
        """
        Define the root directory of installation
        @param path: system path to directory
        """
        self.execdir = path

    def FileName(self):
        """
        Get the name of this object's serialized file
        @return: path to the serialized job file
        """
        return os.path.join(self.initialdir,'job_%s.qobj' % str(self.job_id))

    def SetOutputURL(self,url):
        """
        Define the url where data will be copied to.
        @param url: target url for data
        """
        self.outputurl = url

    def GetOutputURL(self):
        """
        Get the url where data will be copied to.
        @return: target url for data
        """
        return self.outputurl

    def SetExecutable(self,executable):
        """
        Define the executable file that will be run on condor
        @param executable: path to executable file.
        """
        self.executable = executable

    def GetExecutable(self):
        """
        @return: the path to the executable (if set) that will be run on
                 the cluster.
        """
        return self.executable

    def SetPostCheckScript(self,script):
        """
        Define the executable file that will be run after the job completes to
        check whether the job completed successfully
        """
        self.post_check_script = script

    def GetPostCheckScript(self):
        """
        @return: the path to the post run check executable (if set)
        """
        return self.post_check_script

    def SetMainScript(self,script_path):
        """
        Set the path to the main script to run
        @param script_path: script to run
        """
        self.mainscript = script_path

    def GetMainScript(self):
        """
        Get the path to the main script to run
        @return: script to run
        """
        return self.mainscript

    def AddArgument(self,arg):
        """
        Add a parameter to be passed to executable.
        Note: Consecutive calls to this method will append parameters in the
        order in which they were added.

        @param arg: argument to be passed to executable at runtime
        """
        self.arguments.append(arg)

    def AddEnv(self,var,val):
        """
        Add an environment variable.
        """
        self.env[var] = val

    def GetEnv(self):
        return self.env

    def format_arg(self,tuple):
        """
        format runtime cmdline argument
        @param tuple: tuple in the form (argname,argval)
        """
        if len(tuple[0]) > 1:
            return "--%s=%s" % tuple
        else:
            return "-%s %s" % tuple

    def GetArguments(self):
        """
        @return: a list of arguments that will be passed to the executable
        """
        return self.arguments

    def AddBatchOpt(self,optname,optval):
        """
        Add a batch-system option to use in the submit script

        @param optname: name of option to be passed to executable at runtime
        @param optval: value of option to be passed to executable at runtime
        """
        self.batchopts[optname] = optval

    def GetBatchOpts(self):
        return self.batchopts

    def AddArgOption(self,optname,optval=""):
        """
        Add an option to be passed to the executable.
        Similar to 'AddArgument' but options will precede arguments.
        For example: executable <option(s)> <argument(s)>
        Note: Consecutive calls to this method will append options in the
        order in which they were added.

        @param optname: name of option to be passed to executable at runtime
        @param optval: value of option to be passed to executable at runtime
        """
        self.argopts[optname] = optval

    def GetArgOptions(self):
        """
        @return: a list of options passed to the executable
        """
        return map( self.format_arg, zip(self.argopts.keys(), self.argopts.values()))

    def GetArgOpt(self,optname):
        """
        Get value of option to be passed to executable.
        @param optname: name of argument to be passed to executable at runtime
        """
        if optname in self.argopts.keys():
            return self.argopts[optname]
        else:
            return ""

    def AddInputFile(self,filename):
        """
        Add an input file that Condor is to transfer to a remote
        site to set up the execution environment for the job before it is run.
        These files are placed in the same temporary working directory as the
        job's executable. At this time, directories can not be transferred in
        this way.

        @param filename: relative to initial directory if set,
                         otherwise the submit directory.
        """
        self.input_files.append(filename)

    def AddOutputFile(self,filename):
        """
        Add an output file that the cluster is to transfer from a remote
        site on completion of the run.
        These files are placed in the same temporary working directory as the
        job's executable. At this time, directories can not be transferred in
        this way.

        @param filename: relative to initial directory if set,
                         otherwise the submit directory.
        """
        self.output_files.append(filename)

    def GetInputFiles(self):
        """
        @return: the list of input files that Condor is to transfer to a remote
                 site to set up the execution environment for the job before it is run.
        """
        return self.input_files

    def GetOutputFiles(self):
        """
        @return: the list of output files that the cluster is to transfer from a remote
                 site after execution.
        """
        return self.output_files

    def SetQueue(self,nproc):
        """
        @param nproc: number of jobs to enqueue
        """
        self.nproc = nproc

    def GetQueue(self):
        """
        @return: number of jobs enqueued
        """
        return self.nproc

    def SetLogFile(self,filename):
        """
        @param filename: path to logfile where condor will write the
                         state of a given job in the cluster
        """
        self.logfile = filename

    def SetOutputFile(self,filename):
        """
        @param filename: path to file where condor will write the output
                         generated through stdout by the job
        """
        self.outputfile = filename

    def SetErrorFile(self,filename):
        """
        @param filename: path to file where condor will write the output
                         generated through stderr by the job
        """
        self.errorfile = filename

    def GetLogFile(self):
        """
        @return: path to logfile where condor will write the state of a given
                 job in the cluster
        """
        return self.logfile

    def GetOutputFile(self):
        """
        @return: path to file where condor will write the output generated
                 through stdout by the job
        """
        return self.outputfile

    def GetErrorFile(self):
        """
        @return: path to file where condor will write the output generated
                 through stderr by the job
        """
        return self.errorfile

    def SetSimDBKey(self,key):
        """
        Set the unique key for current configuration on production database
        """
        self.simdbkey = key

    def GetSimDBKey(self):
        """
        Get the unique key for current configuration from production database
        """
        return self.simdbkey

    def SetSubmitHost(self,host):
        """
        Set the hostname of the submit node
        """
        self.submithost = host

    def GetSubmitHost(self):
        """
        Get the hostname of the submit node
        """
        return self.submithost

    def SetStatus(self,status):
        """
        (1=enqueued, 2=running, 3=completed, 4=failed)
        """
        self.jobstatus = status

    def GetStatus(self):
        return self.jobstatus

    def SetRealTime(self,time):
        self.realtime = time

    def GetRealTime(self):
        return self.realtime

    def SetUserTime(self,time):
        self.usertime = time

    def GetUserTime(self):
        return self.usertime

    def SetSysTime(self,time):
        self.systime = time

    def GetSysTime(self):
        return self.systime

    def GetPrio(self):
        """
        Get job priority
        """
        return self.prio

    def SetPrio(self,prio):
        """
        Set job priority
        """
        self.prio = prio

    def Pickle(self,filename=None):
        """
        Serialize this i3Job object and write it to a file
        """
        if not filename:
            filename = self.FileName()
        if not filename.startswith("/"):
            filename = os.path.join(self.GetInitialdir(),filename)
        file = open(filename,"w")
        cPickle.dump(self,file)
        file.close()

    def unPickle(self,filename):
        """
        Deserialize an i3Job object from a file
        @return: the unpickled job
        """
        file = open(filename, "r")
        job = cPickle.load(file)
        file.close()
        return job

    def Write(self,file,txt,parse=True):
        """
        Format and write string to file
        """
        if parse:
            print >> file,self.parse_params(txt)
        else:
            print >> file,txt

    def GetSteering(self):
        return self.steering

    def AddSteering(self,steering):
        self.steering = steering

    def AddParser(self,parser):
        self.parser = parser

    def parse_params(self,src):
        """
        Replace key with value in a string. This can be anything,
        but typically a dollar sign token such as $args(value)
        @param src: string containing one or more token keys to replace
        """
        src = self.parser.parse(src)

        for k in self.params.keys():
            src = src.replace('$(%s)' % k,str(self.params[k]))

        src = src.replace('$(Process)',str(self.proc))
        return src

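As a usage illustration only (not part of the module source), here is a minimal sketch of how the i3Job configuration API above can be exercised. The dataset id, paths, and file names are hypothetical, and in practice a batch-system implementation that inherits from i3Job builds the submit script and performs the actual submission.

# Minimal usage sketch (hypothetical dataset id, paths, and file names).
from iceprod.server.job import i3Job

job = i3Job()
job.SetDatasetId(1234)                  # hypothetical dataset id
job.SetProcNum(0)
job.SetInitialdir('/tmp/iceprod')       # directory jobs are submitted from
job.SetExecutable('/usr/bin/python')
job.SetMainScript('runscript.py')       # hypothetical main script
job.AddArgOption('nproc',10)            # options precede positional arguments
job.AddArgument('--verbose')
job.AddInputFile('config.xml')          # hypothetical input file
job.AddOutputFile('output.i3')          # hypothetical output file

# GetArgOptions() formats each (name,value) pair via format_arg,
# e.g. 'nproc' becomes '--nproc=10'
print job.GetArgOptions()
print job.Prefix()                      # e.g. 'iceprod.1234.0'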