1
2
3 """
4 Interface for configuring and submitting jobs on a computing cluster.
Do not use this class directly. Instead use one of the implementations
6 that inherit from this class.
7
8 copyright (c) 2005 the icecube collaboration
9
10 @version: $Revision: $
11 @date: $Date: $
12 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
13 @todo: implement more functionality and features
14 """
15
16 import os
17 import re
18 import sys
19 import math
20 import dircache
21 import time
22 import string
23 import shutil
24 import cPickle
25 import logging
26 import logging.config
27 from iceprod.core import metadata
28 from iceprod.core.dataclasses import Steering
29 from iceprod.core.lex import ExpParser
30
31 logger = logging.getLogger('i3Job')
32
33
35 """
This class represents a generic job on a distributed system.
37 """
38
# Default attribute values for a job.
# NOTE(review): if these are class-level attributes, the mutable defaults
# below (dicts and lists) are shared by every instance unless __init__
# (not visible here) rebinds them -- confirm.

# -- identity / bookkeeping --------------------------------------------
name = 'iceprod'
dataset_id = 0
nproc = 1            # number of jobs to enqueue
proc = 0             # substituted for $(Process) by parse_params
job_id = -1          # id of the submitted job
cluster_id = None
db_id = None         # database id of the job
simdbkey = None      # key of the current configuration in the production db
passkey = None
prio = 0             # job priority
jobstatus = 0        # 1=enqueued, 2=running, 3=completed, 4=failed
steering = None
metadata = None

# -- what gets executed ------------------------------------------------
executable = None         # executable file to run
mainscript = ""           # main script to run
post_check_script = None  # run after the job completes to check success
arguments = []            # arguments passed to the executable
batchopts = {}            # batch-system options used in the submit script
env = {}                  # environment variables
params = {}               # $(key) replacements applied by parse_params

# -- files and locations -----------------------------------------------
config_file = "config.xml"  # XML config file
input_files = []
output_files = []         # files to transfer back after execution
initialdir = None         # directory jobs are submitted from
outputurl = None          # url where data will be copied to
logfile = None
log = ""
outputfile = None
out = ""
errorfile = None
err = ""

# -- submission state --------------------------------------------------
submit_status = ""
submit_msg = ""
host = None               # host where this job is currently running
port = None
url = None

# Timestamp with spaces and colons replaced; evaluated once, when this
# statement is executed.
current_time = time.asctime().replace(" ", "_").replace(":", ".")

# Options passed to the executable, seeded from the defaults above; the
# same dict is handed to the expression parser.
argopts = {'nproc': nproc, 'procnum': proc, 'dataset': dataset_id}
parser = ExpParser(argopts, Steering())

# -- timing ------------------------------------------------------------
realtime = 0.0
usertime = 0.0
systime = 0.0
86
135
137 values = dict()
138 """
139
140 job.SetDatasetId(j['dataset_id'])
141 job.SetDatabaseId(j['job_id'])
142 job.jobstatus = j['status']
143 job.proc = j['queue_id']
144 job.prio = j['priority']
145 job.job_id = j['grid_queue_id']
146 job.passkey = j['passkey']
147 job.initialdir = j['submitdir']
148 job.AddArgOption("key",j['passkey'])
149 job.SetLogFile( "%s/%s.log" % (j['submitdir'],job.Prefix() ))
150 job.SetOutputFile( "%s/%s.out" % (j['submitdir'],job.Prefix() ))
151 job.SetErrorFile( "%s/%s.err" % (j['submitdir'],job.Prefix() ))
152 """
153
156
158 """
159 Get the host where this job is currently running
160 """
161 return self.host
162
165
168
170 """
171 Set the host where this job is currently running
172 """
173 self.host = host
174
176 """
177 Get the cluster AND job id for the submitted jobs.
178 @return: a list of jobs with their cluster and job id
179 in the condor format
180 None if no jobs have been submitted or if submission failed.
181 """
182 return self.job_id
183
186
189
191 """
192 Set the job id for the submitted job.
193 @return: id of job
194 in the condor format
195 None if no jobs have been submitted or if submission failed.
196 """
197 self.job_id = jobid
198
200 """
201 Set the database job id for the submitted job.
202 """
203 self.db_id = db_id
204
206 """
207 Return the database job id for the submitted job.
208 @return: database ID of job
209 None if job is not in database
210 """
211 return self.db_id
212
214 """
215 Define the directory where jobs will be submitted from.
216 @param path: system path to directory
217 """
218 self.initialdir = path
219
221 """
222 Get the directory where jobs will be submitted from.
223 @return: path to directory
224 """
225 return self.initialdir
226
228 """
229 Define the XML config file
230 @param path: system path to file
231 """
232 self.config_file = file
233
235 """
236 Get the XML config file
237 """
238 return self.config_file
239
240
242 """
243 Get the root directory of installation
244 @return: path to directory
245 """
246 return self.execdir
247
249 """
250 Define the root directory of installation
251 @param path: system path to directory
252 """
253 self.execdir = path
254
255
257 """
258 Get the name of this object's serialized file
@return: path to the serialized file
260 """
261 return os.path.join(self.initialdir,'job_%s.qobj' % str(self.job_id))
262
264 """
265 Define the url where data will be copied to.
266 @param url: target url for data
267 """
268 self.outputurl = url
269
271 """
272 Get the directory where data will be copied to.
273 @return: path to directory
274 """
275 return self.outputurl
276
278 """
279 Define the executable file that will be run on condor
280 @param executable: path to executable file.
281 """
282 self.executable = executable
283
285 """
286 @return: the path to the executable (if set) that will be run on
287 cluster.
288 """
289 return self.executable
290
def SetPostCheckScript(self, script):
    """
    Register the executable that is run after the job completes in
    order to check whether the job finished successfully.

    @param script: path to the post-run check executable
    """
    self.post_check_script = script
297
299 """
300 @return: the path to the post run check executable (if set)
301 """
302 return self.post_check_script
303
304
def SetMainScript(self, script_path):
    """
    Store the path of the main script that is to be run.

    @param script_path: path of the script to run
    """
    self.mainscript = script_path
311
312
def GetMainScript(self):
    """
    Report the path of the main script that is to be run.

    @return: path of the script to run
    """
    return self.mainscript
319
320
322 """
323 Add a parameter to be passed to executable.
324 Note: Consecutive calls to this method will append parameters in the
325 order in which they were added.
326
327 @param arg: argument to be passed to executable at runtime
328 """
329 self.arguments.append(arg)
330
331
333 """
334 Add an environment variable.
335 """
336 self.env[var] = val
337
340
355
357 """
358 @return: a list of arguments that will be passed to the executable
359 """
360 return self.arguments
361
363 """
364 Add a batchsystem option to use in submit script
365
366 @param optname: name of option be passed to executable at runtime
367 @param optval: value of option be passed to executable at runtime
368 """
369 self.batchopts[optname] = optval
370
373
374
376 """
Add an option to be passed to executable.
Similar to 'AddArgument' but options will precede arguments
379 For example: executable <option(s)> <argument(s)>
380 Note: Consecutive calls to this method will append options in the
381 order in which they were added.
382
383 @param optname: name of option be passed to executable at runtime
384 @param optval: value of option be passed to executable at runtime
385 """
386 self.argopts[optname] = optval
387
393
395 """
396 Get value of option to be passed to executable.
397 @param optname: name of argument to be passed to executable at runtime
398 """
399 if optname in self.argopts.keys():
400 return self.argopts[optname]
401 else:
402 return ""
403
416
418 """
419 Add an output file that the cluster is to transfer from a remote
420 site on completion of run.
421 These files are placed in the same temporary working directory as the
422 job's executable. At this time, directories can not be transferred in
423 this way.
424
425 @param filename: relative to initial directory if set,
otherwise submit directory.
427 """
428 self.output_files.append(filename)
429
436
438 """
439 @return: the list of output files that the cluster is to transfer from a remote
440 site after execution.
441 """
442 return self.output_files
443
445 """
446 @param nproc: number of jobs to enqueue
447 """
448 self.nproc = nproc
449
451 """
@return: number of jobs enqueued
453 """
454 return self.nproc
455
457 """
458 @param filename: path to logfile where condor will write the
459 state of a given job in the cluster
460 """
461 self.logfile = filename
462
464 """
465 @param filename: path to file where condor will write the output
466 generated through stdout by the job
467 """
468 self.outputfile = filename
469
471 """
472 @param filename: path to file where condor will write the output
473 generated through stderr by the job
474 """
475 self.errorfile = filename
476
478 """
479 @return: path to logfile where condor will write the state of a given
480 job in the cluster
481 """
482 if not self.logfile:
483 self.SetLogFile( "%s.log" % self.Prefix() )
484 return self.logfile
485
487 """
488 @return: path to file where condor will write the output generated
489 through stdout by the job
490 """
491 if not self.outputfile:
492 self.SetOutputFile( "%s.out" % self.Prefix() )
493 return self.outputfile
494
496 """
497 @return: path to file where condor will write the output generated
498 through stderr by the job
499 """
500 if not self.errorfile:
501 self.SetErrorFile( "%s.err" % self.Prefix() )
502 return self.errorfile
503
505 """
506 Set the unique key for current configuration on production database
507 """
508 self.simdbkey = key
509
511 """
512 Get the unique key for current configuration from production database
513 """
514 return self.simdbkey
515
517 """
518 Set the hostname of the submit node
519 """
520 self.submithost = host
521
522
524 """
525 Get the hostname of the submit node
526 """
527 return self.submithost
528
529
531 """
(1=enqueued,2=running,3=completed,4=failed)
533 """
534 self.jobstatus = status
535
538
541
544
547
550
551
554
557
559 """
560 Get job priority
561 """
562 return self.prio
563
565 """
566 Set job priority
567 """
568 self.prio = prio
569
def Pickle(self, filename=None):
    """
    Serialize this object with cPickle and write it to a file.

    @param filename: destination path. When empty or None, the value of
        self.Filename() is used. A path that does not start with "/" is
        joined onto self.GetInitialdir().
    """
    if not filename:
        filename = self.Filename()
    if not filename.startswith("/"):
        filename = os.path.join(self.GetInitialdir(), filename)

    # Mode "w" is kept on purpose: the default pickle protocol is
    # ASCII-based and the reader opens the file in text mode too, so
    # the two sides stay consistent.
    outfile = open(filename, "w")
    try:
        cPickle.dump(self, outfile)
    finally:
        # Always release the handle, even when dump() raises (e.g. an
        # attribute of the job cannot be pickled).
        outfile.close()
581
583 """
Deserialize iGrid object by reading it from a file
585 """
586 file = open(filename, "r")
587 job = cPickle.load(file)
588 file.close()
589 return job
590
def Write(self, file, txt, parse=True):
    """
    Write a string, followed by a newline, to an open file object.

    @param file: file object that the text is printed to
    @param txt: text to write
    @param parse: when true, txt is first passed through
        self.parse_params; otherwise it is written unchanged
    """
    line = txt
    if parse:
        line = self.parse_params(txt)
    print >> file, line
599
600
603
606
609
610
612 """
Replace key with value on a string. This can be anything
but typically a dollar sign token such as $args(value)
@param src: string containing one or more token keys to replace
616 """
617 src = self.parser.parse(src)
618
619 for k in self.params.keys():
620 src= src.replace('$(%s)' % k,str(self.params[k]))
621
622 src = src.replace('$(Process)',str(self.proc))
623 return src
624
627 i3Job.__init__(self)
628 self.name = 'iceprod.task'
629 self.task_id = 0
630 self.task_name = 'task'
631 self.idx = 0
632 self.iter = -1
633
635 """
636 Set the id for the submitted task.
637 @return: id of task
638 """
639 self.task_id = tid
640
642 """
643 Get the ID for the submitted tasks.
644 @return: ID of task
645 """
646 return self.task_id
647