1
2
3 """
4 Interface for configuring and submitting jobs on a computing cluster.
5 Do not use this class directly. Instead, use one of the implementations
6 that inherit from this class.
7
8 copyright (c) 2005 the icecube collaboration
9
10 @version: $Revision: $
11 @date: $Date: $
12 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
13 @todo: implement more functionality and features
14 """
15
16 import os
17 import re
18 import sys
19 import math
20 import dircache
21 import os.path
22 import time
23 import string
24 import shutil
25 import glob
26 import cPickle
27 import logging
28 import logging.config
29 import commands
30 import copy
31 import iceprod.core.exe
32 import iceprod.core.inventory
33 from iceprod.core import metadata
34 from iceprod.core.dataclasses import Steering
35 from iceprod.core.lex import XMLSummaryParser
36 from iceprod.core.inventory import FileInventory
37 from iceprod.core import functions
38 from iceprod.modules.gsiftp import *
39
40 logger = logging.getLogger('iGrid')
41
42
43
45 """
46 This class represents a generic job on a distributed system.
47 """
48 CopyStatusEnum = {'OK':1,'NOTREADY':0,'FAILED':0}
49
51 self.executable = None
52 self.nproc = 1
53 self.proc = 0
54 self.params = {}
55 current_time = time.asctime().replace(" ","_")
56 self.logfile = "%s_$(Proc).log" % current_time
57 self.outputfile = "%s_$(Proc).output" % current_time
58 self.errorfile = "%s_$(Proc).error" % current_time
59 self.submit_status = ""
60 self.job_ids = []
61 self.cluster_id = None
62 self.jobs_submitted = -1
63 self.simdbkey = None
64 self.host = None
65 self.port = None
66 self.url = None
67 self.metadata = None
68 self.post_check_script = None
69 self.initialdir = None
70 self.env = {}
71 self.steering = None
72 self.argopts = {}
73 self.jobs = []
74 self.cpobj = URLCopy()
75 self.suffix = ".submit"
76 self.i3monitordb = None
77
80
82 logger.debug('copying %s >>> %s' % (file,url))
83
84 if url.startswith("file:") and file.startswith("file:"):
85 dest = url.replace("file:","")
86 file = file.replace("file:","")
87 if not os.path.exists(dest): os.makedirs(dest)
88 retval = os.system('mv %s %s' % (file,dest) )
89 if not os.path.exists(dest):
90 raise Exception, "Failed to copy file to target '%s'" % url
91 elif url.startswith("ssh:"):
92 dest = url.replace("ssh:","")
93 file = file.replace("file:","")
94 if re.match(r'[a-zA-Z0-9\-\_\.]*@?[a-zA-Z0-9\-\_\.]+:',url):
95 retval = os.system('scp %s %s' % (file,dest) )
96 else:
97 raise Exception, "malformated SSH url '%s'. " % dest
98 else:
99 urlcp = copy.deepcopy(self.cpobj)
100 urlcp.SetParameter('source',file)
101 urlcp.SetParameter('destination',url)
102 urlcp.SetParameter('emulate',False)
103 stats = {}
104 retval = urlcp.Execute(stats)
105
106 if not retval == 0:
107 raise Exception, "Failed to copy file to target '%s'" % url
108 return retval
109
110
112 """
113 Add new job to queue
114 """
115 self.jobs.append(job)
116
118 """
119 remove and return new job from queue
120 """
121 self.jobs.pop()
122
124 """
125 return job from queue
126 """
127 self.jobs[i]
128
130 """
131 Get list of jobs
132 """
133 return self.jobs
134
136 """
137 Add an environment variable.
138 """
139 self.env[var] = val
140
143
153
155 """
156 Define the directory where jobs will be submitted from.
157 @param path: system path to directory
158 """
159 self.initialdir = path
160
162 """
163 Get the directory where jobs will be submitted from.
164 @return: path to directory
165 """
166 return self.initialdir
167
169 """
170 Add a reference to the steering configuration
171 @param steering: a Steering object
172 """
173 self.steering = steering
174
176 """
177 Get the reference to the steering configuration
178 @return: the Steering object
179 """
180 return self.steering
181
183 """
184 Get the cluster AND job id for the submitted jobs.
185 @return: a list of jobs with their cluster and job id
186 in the condor format
187 None if no jobs have been submitted or if submission failed.
188 """
189 return self.job_ids
190
192 """
193 Get the cluster AND job id for the submitted jobs.
194 @return: a list of jobs with their cluster and job id
195 in the condor format
196 None if no jobs have been submitted or if submission failed.
197 """
198 return self.cluster_id
199
202
205
221
222
224 """
225 Get the root directory of installation
226 @return: path to directory
227 """
228 return self.execdir
229
231 """
232 Define the root directory of installation
233 @param path: system path to directory
234 """
235 self.execdir = path
236
237
239 """
240 Get the name of this object's serialized file
241 @return: path to directory
242 """
243 return os.path.join(self.initialdir,str(self.cluster_id)+'.qobj')
244
245
247 self.i3monitordb = db
249 return self.i3monitordb
250
252 """
253 @param nproc: number of jobs to enqueue
254 """
255 self.nproc = nproc
256
258 """
259 @return: number of jobs enqueued
260 """
261 return self.nproc
262
263
265 """
266 Set the unique key for current configuration on production database
267 """
268 self.simdbkey = key
269
271 """
272 Get the unique key for current configuration from production database
273 """
274 return self.simdbkey
275
277 """
278 Set the hostname of the submit node
279 """
280 self.submithost = host
281
282
284 """
285 Get the hostname of the submit node
286 """
287 return self.submithost
288
290 """
291 Set the hostname of the soaptray server that the job was submitted to
292 """
293 self.host = host
294
295
297 """
298 Get the hostname of the soaptray server that the job was submitted to
299 """
300 return self.host
301
302
304 """
305 Set the port number of the submit node
306 """
307 self.port = port
308
310 """
311 Get the port number of the submit node
312 """
313 return self.port
314
316 """
317 Set the hostname of the submit node
318 """
319 self.url = url
320
322 """
323 Get the hostname of the submit node
324 """
325 return self.url
326
328 """
329 (0=init submission,1=running,2=completed,3=failed)
330 """
331 self.jobstatus = status
332
333
def Submit(self, job, config_file):
    """
    Write the submit file for a job/cluster and hand it to the queuing system.

    The submit file is produced by self.WriteConfig() and passed to the
    command stored in self.enqueue_cmd.  The command output is parsed by
    self.get_id() to obtain the job id.  WriteConfig, enqueue_cmd and get_id
    are not defined in this base class; presumably each grid-specific
    subclass supplies them.

    @param job: i3Job object
    @param config_file: path to the file where the submit file will be written
    @return: tuple (status, submit_status) where status is 0 on success and
             1 on failure, and submit_status is the captured output of the
             submit command (with any exception text appended)
    """
    self.submit_status = ''
    self.WriteConfig(job, config_file)

    cmd = "%s %s" % (self.enqueue_cmd, config_file)
    exit_code, self.submit_status = commands.getstatusoutput(cmd)
    if exit_code != 0:
        # Success is decided below by whether a valid job id can be parsed
        # from the output.  A non-zero exit is only logged, so that a job
        # which did get queued is not reported as failed (and resubmitted).
        logger.warning("submit command '%s' exited with status %s" % (cmd, exit_code))

    try:
        job_id = self.get_id(self.submit_status)
        job.SetJobId(job_id)
        if job_id < 0:
            status = 1
        else:
            status = 0
    except Exception:
        # sys.exc_info() keeps this valid on every Python 2.x and on 3.x.
        err = sys.exc_info()[1]
        logger.error("Exception: " + str(err))
        self.submit_status += "\nException: " + str(err)
        status = 1

    # Remember the first recorded job id as the cluster id.  This presumably
    # relies on get_id() populating self.job_ids; verify in subclasses.
    if len(self.job_ids) and not self.cluster_id:
        self.cluster_id = self.job_ids[0]

    return status, self.submit_status
360
361
363 """
364 Interface: Check status of job/cluster in queuing system.
365 """
366 return """This is just a prototype for a function and must be
367 implemented by child classes"""
368
370 """
371 Interface: Check status of job/cluster in queuing system.
372 """
373 return """This is just a prototype for a function and must be
374 implemented by child classes"""
375
377 """
378 Query status of job on queue
379 """
380 if isinstance(jobs,list):
381 job_list = jobs
382 else:
383 job_list = [jobs]
384 for job in job_list:
385 job.SetStatus('?')
386 return 1
387
389 """
390 Interface: Remove active job/cluster from queuing system.
391 """
392 print """This is just a prototype for a function and must be
393 implemented by child classes"""
394 return -1
395
def PostCopy(self, jobdict, target_url, maxtries=4):
    """
    Copy the output files of a finished job to their destinations.

    Reads the file inventory stored in the job's submit directory and copies
    each listed source to its listed target.  If target_url is set, the
    inventoried files plus any *.out, *.err and *.log files found in the
    submit directory are then also copied to target_url.

    @param jobdict: job record; the keys 'submitdir', 'dataset_id' and
                    'queue_id' are read
    @param target_url: optional URL that also receives the files and logs;
                       skipped when empty/None
    @param maxtries: currently unused; kept for interface compatibility
    @return: self.CopyStatusEnum['OK'] when every copy succeeded,
             self.CopyStatusEnum['NOTREADY'] when an inventoried source file
             is missing, self.CopyStatusEnum['FAILED'] when self.urlcopy()
             returns a non-zero value
    @raise Exception: if no inventory file exists in the submit directory.
             Exceptions raised by self.urlcopy() are not caught here.
             NOTE(review): urlcopy() appears to raise on failure rather than
             return non-zero.  If so, the FAILED returns are reached only
             through an override, and copy errors propagate to the caller.
    """
    initialdir = jobdict['submitdir']
    dataset = jobdict['dataset_id']
    proc = jobdict['queue_id']

    inventoryfile = os.path.join(initialdir, iceprod.core.inventory.name)
    if not os.path.exists(inventoryfile):
        raise Exception("%d,%d: no output inventory found." % (dataset, proc))

    inventory = FileInventory()
    logger.debug('reading %s' % inventoryfile)
    inventory.Read(inventoryfile)

    local_files = []
    for entry in inventory.filelist:
        source = entry['source']
        if source.startswith('file:'):
            # Local outputs are looked up by file name inside the submit
            # directory, whatever directory the inventory recorded.
            source = os.path.abspath(os.path.join(
                initialdir, os.path.basename(source.replace('file:', ''))))
        target = entry['target']
        if not os.path.exists(source):
            logger.error("%d.%d: missing data. in %s " % (dataset, proc, initialdir))
            return self.CopyStatusEnum['NOTREADY']
        local_files.append(source)
        source = 'file:' + source
        if self.urlcopy(source, target):
            logger.error("failed to copy %s -> %s" % (source, target))
            return self.CopyStatusEnum['FAILED']

    if target_url:
        for pattern in ('*.out', '*.err', '*.log'):
            local_files.extend(glob.glob(os.path.join(initialdir, pattern)))
        for path in local_files:
            logger.debug("copying %s %s" % (path, target_url))
            if self.urlcopy(path, target_url):
                logger.error("failed to copy %s locally" % path)
                return self.CopyStatusEnum['FAILED']

    return self.CopyStatusEnum['OK']
440
441
def Clean(self, jobdict):
    """
    Delete a job's submit directory if it is present.

    @param jobdict: job record; 'dataset_id', 'queue_id' and 'submitdir'
                    are used in the log message, and 'submitdir' names the
                    directory to remove
    """
    logger.info('%(dataset_id)u.%(queue_id)u: cleaning dir %(submitdir)s' % jobdict)
    submitdir = '%s' % (jobdict['submitdir'],)
    logger.debug(submitdir)
    # isdir() is False for paths that do not exist, so it also performs the
    # existence check.
    if os.path.isdir(submitdir):
        functions.removedirs(submitdir)
451
def wait(self, job, interval=240, timeout=None):
    """
    Block until the job's "stats.dat" file appears in its initial directory.

    @param job: object providing GetInitialdir()
    @param interval: seconds to sleep between checks (default 240, the
                     previously hard-coded value)
    @param timeout: maximum number of seconds to wait; None (the default)
                    waits indefinitely, as before
    @return: True once the stats file exists, False if timeout ran out first
    """
    statsfile = os.path.join(job.GetInitialdir(), "stats.dat")
    deadline = None
    if timeout is not None:
        deadline = time.time() + timeout

    while not os.path.exists(statsfile):
        if deadline is None:
            time.sleep(interval)
            continue
        remaining = deadline - time.time()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(interval, remaining))
    return True
457
458
459
460
461
463 """
464 Serialize iGrid object and write it to a file
465 """
466 file = open(self.FileName(), "w")
467 cPickle.dump(self,file)
468 file.close()
469
470
472 """
473 Add a classadd parameter to be included in submit file
474
475 @param name: of parameter to be set
476 @param value: to be bound to parameter (this can be a classadd
477 expression)
478 """
479 self.params[name] = value
480
482 """
483 @param name: name of parameter to retrieve
484 @return: the value bound to parameter given by name, None if parameter
485 has not been set.
486 """
487 if self.params.has_key(name):
488 return self.params[name]
489
491 """
492 @return: a list of parameter names that have been set.
493 """
494 return self.params.keys()
495
496
498 """
499 Add an option to be passed to the executable.
500 Similar to 'AddArgument' but options will precede arguments
501 For example: executable <option(s)> <argument(s)>
502 Note: Consecutive calls to this method will append options in the
503 order in which they were added.
504
505 @param optname: name of option be passed to executable at runtime
506 @param optval: value of option be passed to executable at runtime
507 """
508 self.argopts[optname] = optval
509
511 """
512 @return: a list of options passed to the executable
513 """
514 return map( self.format_arg, zip(self.argopts.keys(), self.argopts.values()))
515
517 """
518 Get value of option to be passed to executable.
519 @param optname: name of argument to be passed to executable at runtime
520 """
521 if optname in self.argopts.keys():
522 return self.argopts[optname]
523 else:
524 return ""
525
527 """
528 Suffix of submit script
529 """
530 return self.suffix
531
533 """
534 print usage/help info
535
536 @param arguments: cmdline args passed to program
537 """
538 print "Usage: %s <serialized iGrid object file>" % arguments[0]
539 print " "
540