"""
Interface for configuring and submitting jobs on a computing cluster.
Do not use this class directly. Instead, use one of the implementations
that inherit from this class.

copyright (c) 2005 the IceCube Collaboration

@version: $Revision: $
@date: $Date: $
@author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
@todo: implement more functionality and features
"""

import os
import re
import sys
import math
import dircache
import os.path
import time
import string
import shutil
import glob
import cPickle
import logging
import logging.config
import commands
import copy
import iceprod.core.exe
import iceprod.core.inventory
from iceprod.core import metadata
from iceprod.core.dataclasses import Steering
from iceprod.core.lex import XMLSummaryParser
from iceprod.core.inventory import FileInventory
from iceprod.core import functions
from iceprod.modules.gsiftp import *

logger = logging.getLogger('iGrid')


45 """
46 This class represents a generic job a distributed system.
47 """
48 CopyStatusEnum = {'OK':1,'NOTREADY':0,'FAILED':0}
49

def QueueJobs(self,db,maxjobs,grid_id,jobs_at_once=20,fifo=True,debug=0):
    return db.QueueJobs(maxjobs,grid_id,jobs_at_once,fifo,debug)


    logger.debug('copying %s >>> %s' % (file,url))

    if url.startswith("/"): url = "file:"+url

    if url.startswith("file:") and file.startswith("file:"):
        dest = url.replace("file:","")
        file = file.replace("file:","")
        if not os.path.exists(dest): os.makedirs(dest)
        retval = os.system('mv %s %s' % (file,dest))
        if not os.path.exists(dest):
            raise Exception, "Failed to copy file to target '%s'" % url
    elif url.startswith("ssh:"):
        dest = url.replace("ssh:","")
        file = file.replace("file:","")
        if re.match(r'[a-zA-Z0-9\-\_\.]*@?[a-zA-Z0-9\-\_\.]+:',url):
            retval = os.system('scp %s %s' % (file,dest))
        else:
            raise Exception, "malformed SSH url '%s'" % dest
    else:
        urlcp = self.cpobj
        urlcp.SetParameter('source',file)
        urlcp.SetParameter('destination',url)
        urlcp.SetParameter('emulate',False)
        stats = {}
        retval = urlcp.Execute(stats)

    if not retval == 0:
        raise Exception, "Failed to copy file to target '%s'" % url
    return retval

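    # Annotation (not original code): the dispatch above treats a destination
    # starting with "/" as a local path and rewrites it as "file:<path>"; a
    # "file:" target is handled with a local mv, an "ssh:" target such as
    # "ssh:user@host:/data/out" with scp, and any other scheme (for example a
    # gsiftp:// URL) is delegated to the configured self.cpobj module. A
    # hypothetical call (the method name is not shown in this listing):
    #
    #   retval = grid_instance.CopyFile('file:/tmp/out.root',
    #                                   'gsiftp://host/data/sim/out.root')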

    """
    Add new job to queue
    """
    self.jobs.append(job)

    """
    Remove and return the last job added to the queue
    """
    return self.jobs.pop()

    """
    Return job i from the queue
    """
    return self.jobs[i]

    """
    Get list of jobs
    """
    return self.jobs

    """
    Add an environment variable.
    """
    self.env[var] = val


    """
    Define the directory where jobs will be submitted from.
    @param path: system path to directory
    """
    self.initialdir = path

    """
    Get the directory where jobs will be submitted from.
    @return: path to directory
    """
    return self.initialdir

    """
    Add a reference to the steering configuration
    @param steering: a Steering object
    """
    self.steering = steering

    """
    Get the reference to the steering configuration
    @return: the Steering object
    """
    return self.steering

    """
    Get the cluster AND job id for the submitted jobs.
    @return: a list of jobs with their cluster and job id
             in the condor format, or
             None if no jobs have been submitted or if submission failed.
    """
    return self.job_ids

199 """
200 Get the cluster AND job id for the submitted jobs.
201 @return: a list of jobs with their cluster and job id
202 in the condor format
203 None if no jobs have been submitted or if submission failed.
204 """
205 return self.cluster_id
206

    """
    Get the root directory of installation
    @return: path to directory
    """
    return self.execdir

    """
    Define the root directory of installation
    @param path: system path to directory
    """
    self.execdir = path

246 """
247 Get the name of this object's serialized file
248 @return: path to directory
249 """
250 return os.path.join(self.initialdir,str(self.cluster_id)+'.qobj')
251

    self.i3monitordb = db

    return self.i3monitordb

    self.production = production

    return self.production

    """
    @param nproc: number of jobs to enqueue
    """
    self.nproc = nproc

    """
    @return: number of jobs enqueued
    """
    return self.nproc

278 """
279 Set the unique key for current configuration on production database
280 """
281 self.simdbkey = key
282
284 """
285 Get the unique key for current configuration from production database
286 """
287 return self.simdbkey
288
290 """
291 Set the hostname of the submit node
292 """
293 self.submithost = host
294
295
297 """
298 Get the hostname of the submit node
299 """
300 return self.submithost
301
303 """
304 Set the hostname of the soaptray server that the job was submitted to
305 """
306 self.host = host
307
308
310 """
311 Get the hostname of the soaptray server that the job was submitted to
312 """
313 return self.host
314
315
317 """
318 Set the port number of the submit node
319 """
320 self.port = port
321
323 """
324 Get the port number of the submit node
325 """
326 return self.port
327
329 """
330 Set the hostname of the submit node
331 """
332 self.url = url
333
335 """
336 Get the hostname of the submit node
337 """
338 return self.url
339
341 """
342 (0=init submission,1=runnig,2=completed,3=failed)
343 """
344 self.jobstatus = status
345
346
373
374
375
376
378 """
379 Submit job/cluster
380
381 @param cooke: cookie to store submit info to be return to submitter
382 """
383 self.submit_status = ''
384 status_sum = 0
385 cwdir = os.getcwd()
386
387 for job in self.jobs:
388 logger.info("submitting job %u.%u" %(job.GetDatasetId(),job.GetProcNum()))
389
390 os.chdir(job.GetInitialdir())
391 self.WriteConfig(job,job.config_file)
392 status, submit_status = self._submit(job)
393 status_sum += status
394 cookie.AddJobId(job.GetJobId())
395
396 if self.production:
397
398 if job.submit_status == 0:
399 self.i3monitordb.jobsubmitted(
400 job.GetDatasetId(), job.GetProcNum(),
401 job.GetInitialdir(), job.GetJobId())
402 else:
403 logger.error("failed to submit jobs:"+job.submit_msg)
404 self.i3monitordb.jobabort(job.GetProcNum(),job.GetDatasetId(),3,
405 "failed to submit jobs:"+job.submit_msg)
406 os.chdir('/tmp')
407 self.CleanDir(job.GetInitialdir())
408
409 os.chdir(cwdir)
410
411 return status_sum,self.submit_status
412
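    # Descriptive note (added annotation): the loop above writes a submit
    # configuration for each queued job, hands it to the plugin-specific
    # _submit() implementation, and records the resulting job id in the
    # caller's cookie via cookie.AddJobId(). In production mode the monitoring
    # database is updated through jobsubmitted() on success or jobabort() on
    # failure, and the failed job's submit directory is cleaned up.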

    """
    Interface: Check status of job/cluster in queuing system.
    """
    return """This is just a prototype for a function and must be
              implemented by child classes"""

    """
    Interface: Check status of job/cluster in queuing system.
    """
    return """This is just a prototype for a function and must be
              implemented by child classes"""

    """
    Remove the temporary directory where the current job(s) were
    submitted from.
    """
    if os.path.exists(dir):
        try:
            os.removedirs(dir)
        except OSError,e:
            logger.error(e)

    """
    Query status of job in queue
    """
    if isinstance(jobs,list):
        job_list = jobs
    else:
        job_list = [jobs]
    for job in job_list:
        job.SetStatus('?')
    return 1

    return bool(re.match("[^/:]+://?.*$", path))
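
    # Annotation (added): the pattern above simply tests whether a path string
    # carries a URL-style scheme prefix, e.g.
    #
    #   bool(re.match("[^/:]+://?.*$", "gsiftp://host/data/out.root"))  # True
    #   bool(re.match("[^/:]+://?.*$", "file:/tmp/out.root"))           # True
    #   bool(re.match("[^/:]+://?.*$", "/tmp/out.root"))                # False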

    """
    Interface: Remove active job/cluster from queuing system.
    """
    print """This is just a prototype for a function and must be
             implemented by child classes"""
    return -1

def PostCopy(self,jobdict,target_url,maxtries=4):
    """
    Copy the output files listed in the job's inventory to their target URLs.
    """
    initialdir = jobdict['submitdir']
    dataset = jobdict['dataset_id']
    proc = jobdict['queue_id']
    completeset = True
    inventoryfile = os.path.join(initialdir,iceprod.core.inventory.name)
    copyok = 0
    if not os.path.exists(inventoryfile):
        raise Exception, "%d,%d: no output inventory found." % (dataset,proc)
    inventory = FileInventory()
    inventorylist = []
    logger.debug('reading %s' % inventoryfile)
    inventory.Read(inventoryfile)
    for file in inventory.filelist:

        source = file['source']
        if source.startswith('file:'):
            source = os.path.abspath(os.path.join(initialdir,os.path.basename(source.replace('file:',''))))
        target = file['target']
        if not os.path.exists(source):
            logger.error("%d.%d: missing data in %s" % (dataset,proc,initialdir))
            return self.CopyStatusEnum['NOTREADY']
        source = 'file:' + source
        urlcp = self.cpobj
        urlcp.SetParameter('source',source)
        urlcp.SetParameter('destination',target)
        stats = {}
        if file['track']:
            retval = TrackURLCopy.Execute(urlcp,stats)
        else:
            retval = URLCopy.Execute(urlcp,stats)
        if retval:
            logger.error("failed to copy %s -> %s" % (source,target))
            return self.CopyStatusEnum['FAILED']

    return self.CopyStatusEnum['OK']

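    # Annotation (added): each inventory entry read above is expected to
    # provide at least the keys used in this loop, conceptually something like
    #
    #   {'source': 'file:Level2_IC86.0001.i3.gz',      # relative to submitdir
    #    'target': 'gsiftp://host/data/sim/0001/',     # destination URL
    #    'track' : True}                               # use TrackURLCopy
    #
    # The actual on-disk format is defined by iceprod.core.inventory's
    # FileInventory class; the values shown here are illustrative only.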

def Clean(self,jobdict,force=False):
    """
    Interface: clean submit directory
    """
    dir = "%(submitdir)s" % jobdict
    logger.debug(dir)
    if os.path.exists(dir) and os.path.isdir(dir):
        logger.info('%(dataset_id)u.%(queue_id)u: cleaning dir %(submitdir)s' % jobdict)
        functions.removedirs(dir)

    if jobdict.has_key("dagtemp"):
        dagtemp = "%(dagtemp)s" % jobdict
        if os.path.exists(dagtemp) and os.path.isdir(dagtemp):
            functions.removedirs(dagtemp)
    return True

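# Annotation (added): Clean() and PostCopy() both operate on a plain job
# dictionary. Based on the keys they access, a minimal jobdict looks like
#
#   jobdict = {'dataset_id': 1234,                      # dataset number
#              'queue_id'  : 7,                         # process number
#              'submitdir' : '/scratch/iceprod/1234.7', # hypothetical path
#              'dagtemp'   : '/scratch/dagtemp/1234.7'} # optional, DAG only
#
# Only the keys shown above are implied by this listing; other fields may
# exist in the real job dictionary.
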
def wait(self,job):
    statsfile = "stats.dat"
    statsfile = os.path.join(job.GetInitialdir(),statsfile)
    while not os.path.exists(statsfile):
        time.sleep(240)

def jobfinalize(self,dataset_id,job_id,job,status='OK',clear_errors=True):
    """
    Update status for job
    @param dataset_id: dataset index
    @param job_id: process number within dataset
    """
    return self.i3monitordb.jobfinalize(dataset_id,job_id,job,status,clear_errors)

def reset_old_jobs(self,
                   grid_id,
                   maxidletime,
                   maxruntime,
                   maxsubmittime,
                   maxcopytime,
                   maxfailures=10,
                   maxevicttime=10,
                   keepalive=14400):
    """
    Reset the status of jobs that were queued but whose status
    has not changed in more than the given maximum time.

    @param grid_id: id of current cluster
    @param maxidletime: maximum idle time for jobs
    @param maxruntime: maximum run time for jobs
    @param maxsubmittime: maximum submit time for jobs
    @param maxcopytime: maximum time for jobs to be in 'copying' state
    @param maxfailures: maximum number of times a job is allowed to fail
    @param keepalive: how often the server should expect to hear from jobs
    """
    return self.i3monitordb.reset_old_jobs(grid_id, maxidletime, maxruntime,
                                           maxsubmittime, maxcopytime,
                                           maxfailures, maxevicttime, keepalive)


    """
    Serialize iGrid object and write it to a file
    """
    file = open(self.FileName(), "w")
    cPickle.dump(self,file)
    file.close()

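    # Round-trip sketch (added annotation, assuming the object was written by
    # the method above): the pickled queue object can later be restored from
    # the path returned by FileName(), e.g.
    #
    #   import cPickle
    #   f = open('/path/to/initialdir/1234.qobj')   # hypothetical file name
    #   grid = cPickle.load(f)
    #   f.close()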

    """
    Add a ClassAd parameter to be included in the submit file

    @param name: name of parameter to be set
    @param value: value to be bound to parameter (this can be a ClassAd
                  expression)
    """
    self.params[name] = value

    """
    @param name: name of parameter to retrieve
    @return: the value bound to parameter given by name, None if parameter
             has not been set.
    """
    if self.params.has_key(name):
        return self.params[name]

    """
    @return: a list of parameter names that have been set.
    """
    return self.params.keys()

    """
    Add an option to be passed to the executable.
    Similar to 'AddArgument', but options will precede arguments.
    For example: executable <option(s)> <argument(s)>
    Note: Consecutive calls to this method will append options in the
    order in which they were added.

    @param optname: name of option to be passed to executable at runtime
    @param optval: value of option to be passed to executable at runtime
    """
    self.argopts[optname] = optval

    """
    Get value of option to be passed to executable.
    @param optname: name of option to be passed to executable at runtime
    """
    if optname in self.argopts.keys():
        return self.argopts[optname]
    else:
        return ""

    """
    Suffix of submit script
    """
    return self.suffix


"""
This class represents a generic DAG.
"""

    self.i3monitordb = None


def QueueJobs(self,db,maxjobs,grid_id,jobs_at_once=20,fifo=True,debug=0):
    """
    DAG: Queue individual tasks for a job.
    """
    return db.QueueTasks(maxjobs,grid_id,jobs_at_once,fifo,debug)

"""
This class represents a generic DAG.
"""

    self.joblist = []
    self.dataset = 0
    self.cluster = 10

def Submit(self,job,config_file):
    """
    Submit job/cluster to PBS

    @param job: i3Job object
    @param config_file: path to file where the submit file will be written
    """
    from iceprod.server.job import i3Job
    if job.GetDatasetId() == self.dataset or self.dataset != 0:
        self.joblist.append(job)
        self.dataset = job.GetDatasetId()
        if len(self.joblist) >= self.cluster:
            proc = ",".join(map(lambda x: x.GetArgOption('procnum'),self.joblist))
            job.AddArgOption("procnum",proc)
        else:
            return

    self.submit_status = ''
    self.WriteConfig(job,config_file)

    cmd = "%s %s" % (self.enqueue_cmd,config_file)
    status, self.submit_status = commands.getstatusoutput(cmd)
    try:
        id = self.get_id(self.submit_status)
        job.SetJobId(id)
        status = 0
        if id < 0: status = 1
    except Exception, e:
        logger.error("Exception: " + str(e))
        self.submit_status += "\nException: " + str(e)
        status = 1

    if len(self.job_ids) and not self.cluster_id:
        self.cluster_id = self.job_ids[0]

    job.submit_status = status
    job.submit_msg = self.submit_status

    if self.production:

        if job.submit_status == 0:
            self.i3monitordb.jobsubmitted(
                    job.GetDatasetId(), job.GetProcNum(),
                    job.GetInitialdir(), job.GetJobId())
        else:
            logger.error("failed to submit jobs:"+job.submit_msg)
            self.i3monitordb.jobabort(job.GetProcNum(),job.GetDatasetId(),3,
                    "failed to submit jobs:"+job.submit_msg)
            os.chdir('/tmp')
            self.CleanDir(job.GetInitialdir())

    return status,self.submit_status

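    # Descriptive note (added annotation): this Submit() buffers incoming jobs
    # in self.joblist until self.cluster of them (10 by default above) have
    # accumulated, then collapses their 'procnum' argument options into a
    # single comma-separated list (e.g. "0,1,2,...,9") on the last job and
    # writes one submit file, which is handed to self.enqueue_cmd via
    # commands.getstatusoutput(). Until the batch is full it simply returns
    # without submitting.
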
def QueueJobs(self,db,maxjobs,grid_id,jobs_at_once=20,fifo=True,debug=0):
    """
    DAG: Queue individual tasks for a job.
    """


    """
    print usage/help info

    @param arguments: cmdline args passed to program
    """
    print "Usage: %s <serialized iGrid object file>" % arguments[0]
    print " "