1
2
3 """
4 Interface for configuring and submitting jobs on a computing cluster.
5 Do not use this class directly. Instead use one of the implementations
6 that inherit from this class.
7
8 copyright (c) 2005 the icecube collaboration
9
10 @version: $Revision: $
11 @date: $Date: $
12 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
13 @todo: implement more functionality and features
14 """
15
16 import os
17 import re
18 import sys
19 import math
20 import dircache
21 import time
22 import string
23 import shutil
24 import cPickle
25 import logging
26 import logging.config
27 from iceprod.core import metadata
28 from iceprod.core.dataclasses import Steering
29 from iceprod.core.lex import ExpParser
30
31 logger = logging.getLogger('i3Job')
32
33
35 """
36 This class represents a generic job a distributed system.
37 """
38
40 self.name = 'iceprod'
41 self.executable = None
42 self.params = {}
43 self.input_files = []
44 self.output_files = []
45 self.mainscript = ""
46 self.batchopts = {}
47 self.arguments = []
48 self.nproc = 1
49 self.dataset_id = 0
50 self.proc = 0
51 self.host = None
52 current_time = time.asctime().replace(" ","_")
53 self.logfile = "%s_$(Proc).log" % current_time
54 self.outputfile = "%s_$(Proc).output" % current_time
55 self.errorfile = "%s_$(Proc).error" % current_time
56 self.submit_status = ""
57 self.initialdir = None
58 self.outputurl = None
59 self.job_id = -1
60 self.cluster_id = None
61 self.simdbkey = None
62 self.host = None
63 self.port = None
64 self.url = None
65 self.metadata = None
66 self.post_check_script = None
67 self.env = {}
68 self.jobstatus = 0
69 self.argopts = {'nproc':self.nproc,'procnum':self.proc,'dataset':self.dataset_id}
70 self.parser = ExpParser(self.argopts,Steering())
71 self.steering = None
72 self.db_id = None
73
74
75
76
77 self.realtime = 0.0
78 self.usertime = 0.0
79 self.systime = 0.0
80
82 return '%s.%d.%d' % (self.name,self.dataset_id,self.proc)
83
85 """
86 Get the host where this job is currently running
87 """
88 return self.host
89
91 self.dataset_id = dataset
92
94 return self.dataset_id
95
97 """
98 Set the host where this job is currently running
99 """
100 self.host = host
101
103 """
104 Get the cluster AND job id for the submitted jobs.
105 @return: a list of jobs with their cluster and job id
106 in the condor format
107 None if no jobs have been submitted or if submission failed.
108 """
109 return self.job_id
110
113
116
118 """
119 Set the job id for the submitted job.
120 @return: id of job
121 in the condor format
122 None if no jobs have been submitted or if submission failed.
123 """
124 self.job_id = jobid
125
127 """
128 Set the database job id for the submitted job.
129 """
130 self.db_id = db_id
131
133 """
134 Return the database job id for the submitted job.
135 @return: database ID of job
136 None if job is not in database
137 """
138 return self.db_id
139
141 """
142 Define the directory where jobs will be submitted from.
143 @param path: system path to directory
144 """
145 self.initialdir = path
146
148 """
149 Get the directory where jobs will be submitted from.
150 @return: path to directory
151 """
152 return self.initialdir
153
155 """
156 Get the root directory of installation
157 @return: path to directory
158 """
159 return self.execdir
160
162 """
163 Define the root directory of installation
164 @param path: system path to directory
165 """
166 self.execdir = path
167
168
170 """
171 Get the name of this object's serialized file
172 @return: path to directory
173 """
174 return os.path.join(self.initialdir,'job_%s.qobj' % str(self.job_id))
175
177 """
178 Define the url where data will be copied to.
179 @param url: target url for data
180 """
181 self.outputurl = url
182
184 """
185 Get the directory where data will be copied to.
186 @return: path to directory
187 """
188 return self.outputurl
189
191 """
192 Define the executable file that will be run on condor
193 @param executable: path to executable file.
194 """
195 self.executable = executable
196
198 """
199 @return: the path to the executable (if set) that will be run on
200 cluster.
201 """
202 return self.executable
203
def SetPostCheckScript(self,script):
    """
    Set the executable that runs after the job completes in order to
    verify that it finished successfully.

    @param script: path to the post-run check executable
    """
    self.post_check_script = script
210
212 """
213 @return: the path to the post run check executable (if set)
214 """
215 return self.post_check_script
216
217
def SetMainScript(self,script_path):
    """
    Define the main script this job will execute.

    @param script_path: path of the script to run
    """
    self.mainscript = script_path
224
225
def GetMainScript(self):
    """
    Retrieve the main script this job will execute.

    @return: the script path previously set via SetMainScript
    """
    return self.mainscript
232
233
235 """
236 Add a parameter to be passed to executable.
237 Note: Consecutive calls to this method will append parameters in the
238 order in which they were added.
239
240 @param arg: argument to be passed to executable at runtime
241 """
242 self.arguments.append(arg)
243
244
246 """
247 Add an environment variable.
248 """
249 self.env[var] = val
250
253
263
265 """
266 @return: a list of arguments that will be passed to the executable
267 """
268 return self.arguments
269
271 """
272 Add a batchsystem option to use in submit script
273
274 @param optname: name of option be passed to executable at runtime
275 @param optval: value of option be passed to executable at runtime
276 """
277 self.batchopts[optname] = optval
278
280 return self.batchopts
281
282
284 """
285 Add a options to be passed to executable.
286 Similar to 'AddArgument' but options will precede arguments
287 For example: executable <option(s)> <argument(s)>
288 Note: Consecutive calls to this method will append options in the
289 order in which they were added.
290
291 @param optname: name of option be passed to executable at runtime
292 @param optval: value of option be passed to executable at runtime
293 """
294 self.argopts[optname] = optval
295
297 """
298 @return: a list of options passed to the executable
299 """
300 return map( self.format_arg, zip(self.argopts.keys(), self.argopts.values()))
301
303 """
304 Get value of option to be passed to executable.
305 @param optname: name of argument to be passed to executable at runtime
306 """
307 if optname in self.argopts.keys():
308 return self.argopts[optname]
309 else:
310 return ""
311
324
326 """
327 Add an output file that the cluster is to transfer from a remote
328 site on completion of run.
329 These files are placed in the same temporary working directory as the
330 job's executable. At this time, directories can not be transferred in
331 this way.
332
333 @param filename: relative to initial directory if set,
334 otherwise subtmit directory.
335 """
336 self.output_files.append(filename)
337
344
346 """
347 @return: the list of output files that the cluster is to transfer from a remote
348 site after execution.
349 """
350 return self.output_files
351
353 """
354 @param nproc: number of jobs to enqueue
355 """
356 self.nproc = nproc
357
359 """
360 @return: number of jobs enqueueued
361 """
362 return self.nproc
363
365 """
366 @param filename: path to logfile where condor will write the
367 state of a given job in the cluster
368 """
369 self.logfile = filename
370
372 """
373 @param filename: path to file where condor will write the output
374 generated through stdout by the job
375 """
376 self.outputfile = filename
377
379 """
380 @param filename: path to file where condor will write the output
381 generated through stderr by the job
382 """
383 self.errorfile = filename
384
386 """
387 @return: path to logfile where condor will write the state of a given
388 job in the cluster
389 """
390 return self.logfile
391
393 """
394 @return: path to file where condor will write the output generated
395 through stdout by the job
396 """
397 return self.outputfile
398
400 """
401 @return: path to file where condor will write the output generated
402 through stderr by the job
403 """
404 return self.errorfile
405
407 """
408 Set the unique key for current configuration on production database
409 """
410 self.simdbkey = key
411
413 """
414 Get the unique key for current configuration from production database
415 """
416 return self.simdbkey
417
419 """
420 Set the hostname of the submit node
421 """
422 self.submithost = host
423
424
426 """
427 Get the hostname of the submit node
428 """
429 return self.submithost
430
431
433 """
434 (1=enqueued,2=running,3=completed,4=failed)
435 """
436 self.jobstatus = status
437
439 return self.jobstatus
440
443
446
449
452
453
456
459
461 """
462 Get job priority
463 """
464 return self.prio
465
467 """
468 Set job priority
469 """
470 self.prio = prio
471
def Pickle(self,filename=None):
    """
    Serialize this job object and write it to a file.

    @param filename: destination path. Defaults to self.Filename();
        a relative path is resolved against the initial directory.
    """
    if not filename:
        filename = self.Filename()
    if not filename.startswith("/"):
        filename = os.path.join(self.GetInitialdir(),filename)
    # open in binary mode: pickle streams are byte data, not text
    # (also avoid shadowing the builtin 'file')
    pickle_file = open(filename,"wb")
    try:
        cPickle.dump(self,pickle_file)
    finally:
        # close even if dump raises, so the handle is never leaked
        pickle_file.close()
483
485 """
486 unSerialize iGrid object and write it to a file
487 """
488 file = open(filename, "r")
489 job = cPickle.load(file)
490 file.close()
491 return job
492
def Write(self,file,txt,parse=True):
    """
    Write one line of text to an open file object.

    @param file: open, writable file object
    @param txt: text to write
    @param parse: if True, run txt through parse_params for
        parameter substitution before writing
    """
    if parse:
        txt = self.parse_params(txt)
    file.write(str(txt) + "\n")
501
502
505
507 self.steering = steering
508
511
512
514 """
515 Replace key with value on a string. This can be anything
516 but typically a dollar-sign token such as $args(value)
517 @param src: string containing one or more token keys to replace
518 """
519 src = self.parser.parse(src)
520
521 for k in self.params.keys():
522 src= src.replace('$(%s)' % k,str(self.params[k]))
523
524 src = src.replace('$(Process)',str(self.proc))
525 return src
526