Package iceprod :: Package server :: Module grid
[hide private]
[frames] | [no frames]

Source Code for Module iceprod.server.grid

  1  #!/bin/env python 
  2  # 
  3  """ 
  4   Interface for configuring and submitting jobs on a computing cluster. Do not 
  5   use this class directly. Instead use one of the implementations 
  6   that inherit from this class. 
  7   
  8   copyright  (c) 2005 the icecube collaboration 
  9   
 10   @version: $Revision: $ 
 11   @date: $Date: $ 
 12   @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
 13   @todo: implement more functionality and features 
 14  """ 
 15   
 16  import os 
 17  import re 
 18  import sys 
 19  import math 
 20  import dircache 
 21  import os.path 
 22  import time 
 23  import string 
 24  import shutil 
 25  import glob 
 26  import cPickle 
 27  import logging 
 28  import logging.config 
 29  import commands 
 30  import copy 
 31  import iceprod.core.exe 
 32  import iceprod.core.inventory 
 33  from iceprod.core import metadata 
 34  from iceprod.core.dataclasses import Steering 
 35  from iceprod.core.lex import XMLSummaryParser 
 36  from iceprod.core.inventory import FileInventory 
 37  from iceprod.core import functions  
 38  from iceprod.modules.gsiftp import * 
 39   
 40  logger = logging.getLogger('iGrid') 
 41   
 42   
 43   
class iGrid:
    """
    Base class representing a generic job on a distributed queueing
    system.  Do not use this class directly: concrete implementations
    inherit from it and provide the batch-system specifics
    (C{WriteConfig}, C{get_id}, C{enqueue_cmd}, C{CheckQ}, ...).
    """

    # Maps a stage-out outcome to a success flag: 1 = OK,
    # 0 = not ready yet / failed (both trigger a retry upstream).
    CopyStatusEnum = {'OK': 1, 'NOTREADY': 0, 'FAILED': 0}

    def __init__(self, size=1):
        """
        @param size: cluster size (number of jobs in this submission)
        """
        self.executable = None
        self.size = size
        self.nproc = 1
        self.proc = 0
        self.params = {}
        # Timestamp with underscores so it is safe inside file names.
        current_time = time.asctime().replace(" ", "_")
        self.logfile = "%s_$(Proc).log" % current_time
        self.outputfile = "%s_$(Proc).output" % current_time
        self.errorfile = "%s_$(Proc).error" % current_time
        self.submit_status = ""
        self.job_ids = []
        self.cluster_id = None
        self.jobs_submitted = -1
        self.simdbkey = None
        self.host = None
        self.port = None
        self.url = None
        self.metadata = None
        self.post_check_script = None
        self.initialdir = None
        self.env = {}
        self.steering = None
        self.argopts = {}
        self.jobs = []
        self.cpobj = URLCopy()
        self.suffix = ".submit"
        self.i3monitordb = None
        self.production = True

    def QueueJobs(self, db, maxjobs, grid_id, jobs_at_once=20, fifo=True, debug=0):
        """
        Queue jobs through the monitoring database.

        @param db: monitoring database handle
        @param maxjobs: maximum number of jobs to keep on the queue
        @param grid_id: id of current cluster
        @param jobs_at_once: how many jobs to queue per call
        @param fifo: queue oldest datasets first
        """
        return db.QueueJobs(maxjobs, grid_id, jobs_at_once, fifo, debug)

    def SetCopyClass(self, cpobj):
        """
        Set the URL-copy implementation used for stage-out.
        """
        self.cpobj = cpobj

    def urlcopy(self, file, url):
        """
        Copy C{file} to C{url}, dispatching on the URL scheme
        (local move, scp, or the configured copy object).

        @raise Exception: if the copy fails or the ssh URL is malformed
        @return: 0 on success
        """
        logger.debug('copying %s >>> %s' % (file, url))

        # A bare path is treated as a local file: URL.
        if url.startswith("/"):
            url = "file:" + url

        if url.startswith("file:") and file.startswith("file:"):
            dest = url.replace("file:", "")
            file = file.replace("file:", "")
            if not os.path.exists(dest):
                os.makedirs(dest)
            retval = os.system('mv %s %s' % (file, dest))
            if not os.path.exists(dest):
                raise Exception("Failed to copy file to target '%s'" % url)
        elif url.startswith("ssh:"):
            dest = url.replace("ssh:", "")
            file = file.replace("file:", "")
            # Expect [user@]host: at the start of the remote spec.
            if re.match(r'[a-zA-Z0-9\-\_\.]*@?[a-zA-Z0-9\-\_\.]+:', url):
                retval = os.system('scp %s %s' % (file, dest))
            else:
                # typo fix: was "malformated"
                raise Exception("malformed SSH url '%s'. " % dest)
        else:
            # Delegate any other scheme to the pluggable copy object.
            urlcp = self.cpobj
            urlcp.SetParameter('source', file)
            urlcp.SetParameter('destination', url)
            urlcp.SetParameter('emulate', False)
            stats = {}
            retval = urlcp.Execute(stats)

        if not retval == 0:
            raise Exception("Failed to copy file to target '%s'" % url)
        return retval

    def PushJob(self, job):
        """
        Add new job to queue.
        """
        self.jobs.append(job)

    def PopJob(self):
        """
        Remove and return the last job from the queue.

        @return: the removed job (bug fix: the original dropped the
                 return value)
        """
        return self.jobs.pop()

    def GetJob(self, i):
        """
        Return job C{i} from the queue.

        @return: the job at index i (bug fix: the original dropped the
                 return value)
        """
        return self.jobs[i]

    def GetJobList(self):
        """
        Get list of jobs.
        """
        return self.jobs

    def AddEnv(self, var, val):
        """
        Add an environment variable.
        """
        self.env[var] = val

    def GetEnv(self):
        """
        @return: dict of environment variables set via AddEnv
        """
        return self.env

    def format_arg(self, tuple):
        """
        Format a runtime cmdline argument.

        @param tuple: tuple in the form (argname, argval)
        @return: "--name=val" for long options, "-n val" for short ones
        """
        if len(tuple[0]) > 1:
            return "--%s=%s" % tuple
        else:
            return "-%s %s" % tuple

    def SetInitialdir(self, path):
        """
        Define the directory where jobs will be submitted from.

        @param path: system path to directory
        """
        self.initialdir = path

    def GetInitialdir(self):
        """
        Get the directory where jobs will be submitted from.

        @return: path to directory
        """
        return self.initialdir

    def SetSteering(self, steering):
        """
        Add a reference to the steering configuration.

        @param steering: a Steering object
        """
        self.steering = steering

    def GetSteering(self):
        """
        Get the reference to the steering configuration.

        @return: the Steering object
        """
        return self.steering

    def GetJobIds(self):
        """
        Get the cluster AND job id for the submitted jobs.

        @return: a list of jobs with their cluster and job id in the
                 condor format; empty if nothing has been submitted
        """
        return self.job_ids

    def GetClusterId(self):
        """
        Get the cluster id of the submitted jobs.

        @return: the cluster id, or None if no jobs have been submitted
                 or submission failed
        """
        return self.cluster_id

    def SetMetadataPath(self, path):
        """
        Set the path of the DIF-Plus metadata file.
        """
        self.metadata = path

    def GetMetadataPath(self):
        """
        @return: the path of the DIF-Plus metadata file
        """
        return self.metadata

    def UpdateMetadata(self):
        """
        Update metadata information: stamp the end datetime and DB key
        in the DIF-Plus file, then rewrite it in place.
        """
        if os.path.exists(self.GetMetadataPath()):
            logger.debug("Updating metadata")
            try:
                difPlusReader = metadata.MetadataParser()
                difplus = difPlusReader.ParseFile(self.GetMetadataPath())
                plus = difplus.GetPlus()
                plus.SetEndDatetime()
                plus.SetI3DBKey(0)  # Where is this number coming from?
                metadata.MetadataWriter(difplus).write_to_file(self.GetMetadataPath())
            except Exception as e:
                logger.error("Error updating metadata: %s" % str(e))

    def GetRootDir(self):
        """
        Get the root directory of installation.

        @return: path to directory
        """
        return self.execdir

    def SetRootDir(self, path):
        """
        Define the root directory of installation.

        @param path: system path to directory
        """
        self.execdir = path

    def FileName(self):
        """
        Get the name of this object's serialized file.

        @return: <initialdir>/<cluster_id>.qobj
        """
        return os.path.join(self.initialdir, str(self.cluster_id) + '.qobj')

    def SetMonitorDB(self, db):
        """
        Set the monitoring database handle.
        """
        self.i3monitordb = db

    def GetMonitorDB(self):
        """
        @return: the monitoring database handle
        """
        return self.i3monitordb

    def SetProduction(self, production):
        """
        Enable/disable production mode (database bookkeeping on submit).
        """
        self.production = production

    def GetProduction(self):
        """
        @return: True if production mode is enabled
        """
        return self.production

    def SetQueue(self, nproc):
        """
        @param nproc: number of jobs to enqueue
        """
        self.nproc = nproc

    def GetQueue(self):
        """
        @return: number of jobs enqueued
        """
        return self.nproc

    def SetSimDBKey(self, key):
        """
        Set the unique key for current configuration on production database.
        """
        self.simdbkey = key

    def GetSimDBKey(self):
        """
        Get the unique key for current configuration from production database.
        """
        return self.simdbkey

    def SetSubmitHost(self, host):
        """
        Set the hostname of the submit node.
        """
        self.submithost = host

    def GetSubmitHost(self):
        """
        Get the hostname of the submit node.
        """
        return self.submithost

    def SetHost(self, host):
        """
        Set the hostname of the soaptray server that the job was submitted to.
        """
        self.host = host

    def GetHost(self):
        """
        Get the hostname of the soaptray server that the job was submitted to.
        """
        return self.host

    def SetPort(self, port):
        """
        Set the port number of the submit node.
        """
        self.port = port

    def GetPort(self):
        """
        Get the port number of the submit node.
        """
        return self.port

    def SetURL(self, url):
        """
        Set the URL of the submit node.
        """
        self.url = url

    def GetURL(self):
        """
        Get the URL of the submit node.
        """
        return self.url

    def SetStatus(self, status):
        """
        Set the job status
        (0=init submission, 1=running, 2=completed, 3=failed).
        """
        self.jobstatus = status

    def _submit(self, job):
        """
        Submit a single job by shelling out to the batch-system
        submit command.

        @param job: job whose config_file has already been written
        @return: (status, submit_output); status != 0 on failure
        """
        cmd = "%s %s" % (self.enqueue_cmd, job.config_file)
        logger.debug("executing: " + cmd)
        status, submit_status = commands.getstatusoutput(cmd)
        if status:
            # NOTE(review): on this early return job.submit_status is
            # never set here -- presumably i3Job pre-initializes it;
            # confirm against iceprod.server.job.
            logger.debug("unable to submit job:" + submit_status)
            return status, submit_status
        else:
            try:
                # get_id parses the batch system's submit output.
                id = self.get_id(submit_status)
                job.SetJobId(id)
                if id < 0:
                    status += 1
            except Exception as e:
                logger.error("Exception: " + str(e))
                submit_status += "\nException: " + str(e)
                status += 1

        job.submit_status = status
        job.submit_msg = submit_status

        return status, submit_status

    def Submit(self, cookie):
        """
        Submit job/cluster.

        @param cookie: cookie to store submit info to be returned to
                       the submitter
        @return: (sum of per-job statuses, last submit output)
        """
        self.submit_status = ''
        status_sum = 0
        cwdir = os.getcwd()

        for job in self.jobs:
            logger.info("submitting job %u.%u" % (job.GetDatasetId(), job.GetProcNum()))

            os.chdir(job.GetInitialdir())
            self.WriteConfig(job, job.config_file)
            status, submit_status = self._submit(job)
            status_sum += status
            cookie.AddJobId(job.GetJobId())

            if self.production:
                # update database
                if job.submit_status == 0:
                    self.i3monitordb.jobsubmitted(
                        job.GetDatasetId(), job.GetProcNum(),
                        job.GetInitialdir(), job.GetJobId())
                else:
                    logger.error("failed to submit jobs:" + job.submit_msg)
                    self.i3monitordb.jobabort(job.GetProcNum(), job.GetDatasetId(), 3,
                                              "failed to submit jobs:" + job.submit_msg)
                    # Step out of the directory before removing it.
                    os.chdir('/tmp')
                    self.CleanDir(job.GetInitialdir())

        os.chdir(cwdir)

        return status_sum, self.submit_status

    def CheckQ(self, db=None):
        """
        Interface: Check status of job/cluster in queuing system.
        """
        return """This is just a prototype for a function and must be
        implemented by child classes"""

    def CleanQ(self, jobs=None):
        """
        Interface: Clean up the queuing system.
        """
        return """This is just a prototype for a function and must be
        implemented by child classes"""

    def CleanDir(self, dir):
        """
        Remove temporary directory where the current job(s) was
        submitted from.
        """
        if os.path.exists(dir):
            try:
                os.removedirs(dir)
            except OSError as e:
                logger.error(e)

    def CheckJobStatus(self, jobs):
        """
        Query status of job(s) on the queue.  This base implementation
        marks every job's status as unknown ('?').

        @param jobs: a single job or a list of jobs
        @return: 1
        """
        if isinstance(jobs, list):
            job_list = jobs
        else:
            job_list = [jobs]
        for job in job_list:
            job.SetStatus('?')
        return 1

    def IsUrl(self, path):
        """
        @return: True if C{path} looks like a scheme://... URL
        """
        return bool(re.match("[^/:]+://?.*$", path))

    def QRemove(self, jobid):
        """
        Interface: Remove active job/cluster from queuing system.
        """
        print("""This is just a prototype for a function and must be
        implemented by child classes""")
        return -1

    def PostCopy(self, jobdict, target_url, maxtries=4):
        """
        Copy the files listed in the job's output inventory to their
        targets.

        @param jobdict: dict with 'submitdir', 'dataset_id', 'queue_id'
        @param target_url: base URL for log files (currently unused)
        @param maxtries: unused here; kept for interface compatibility
        @raise Exception: if no output inventory exists
        @return: a CopyStatusEnum value
        """
        initialdir = jobdict['submitdir']
        dataset = jobdict['dataset_id']
        proc = jobdict['queue_id']
        inventoryfile = os.path.join(initialdir, iceprod.core.inventory.name)
        if not os.path.exists(inventoryfile):
            raise Exception("%d,%d: no output inventory found." % (dataset, proc))
        inventory = FileInventory()
        logger.debug('reading %s' % inventoryfile)
        inventory.Read(inventoryfile)
        for file in inventory.filelist:

            # Check to make sure all files have been copied
            source = file['source']
            if source.startswith('file:'):  # replace path with localpath
                # NOTE(review): bare 'basename' presumably comes from the
                # gsiftp star import -- confirm (os.path.basename?)
                source = os.path.abspath(os.path.join(initialdir, basename(source.replace('file:', ''))))
            target = file['target']
            if not os.path.exists(source):
                logger.error("%d.%d: missing data. in %s " % (dataset, proc, initialdir))
                return self.CopyStatusEnum['NOTREADY']
            source = 'file:' + source
            urlcp = self.cpobj
            urlcp.SetParameter('source', source)
            urlcp.SetParameter('destination', target)
            stats = {}
            if file['track']:
                retval = TrackURLCopy.Execute(urlcp, stats)
            else:
                retval = URLCopy.Execute(urlcp, stats)
            if retval:
                logger.error("failed to copy %s -> %s" % (source, target))
                return self.CopyStatusEnum['FAILED']

        # (Removed long-dead commented-out block that copied *.out/*.err/*.log
        # to target_url; also dropped unused locals completeset/copyok/
        # inventorylist.)
        return self.CopyStatusEnum['OK']

    def Clean(self, jobdict, force=False):
        """
        Interface: clean submit directory (and DAG temp dir, if any).

        @param jobdict: dict with 'submitdir', 'dataset_id', 'queue_id'
                        and optionally 'dagtemp'
        @return: True
        """
        dir = "%(submitdir)s" % jobdict
        logger.debug(dir)
        if os.path.exists(dir) and os.path.isdir(dir):
            logger.info('%(dataset_id)u.%(queue_id)u: cleaning dir %(submitdir)s' % jobdict)
            functions.removedirs(dir)

        # has_key() is Python-2-only; 'in' behaves identically.
        if "dagtemp" in jobdict:
            dagtemp = "%(dagtemp)s" % jobdict
            if os.path.exists(dagtemp) and os.path.isdir(dagtemp):
                functions.removedirs(dagtemp)
        return True

    def wait(self, job):
        """
        Block until the job's stats file appears, polling every 240 s.
        """
        statsfile = os.path.join(job.GetInitialdir(), "stats.dat")
        while not os.path.exists(statsfile):
            time.sleep(240)

    def jobfinalize(self, dataset_id, job_id, job, status='OK', clear_errors=True):
        """
        Update status for job.

        @param dataset_id: dataset index
        @param job_id: process number within dataset
        """
        return self.i3monitordb.jobfinalize(dataset_id, job_id, job, status, clear_errors)

    def reset_old_jobs(self,
                       grid_id,
                       maxidletime,
                       maxruntime,
                       maxsubmittime,
                       maxcopytime,
                       maxfailures=10,
                       maxevicttime=10,
                       keepalive=14400):
        """
        Reset status of jobs that were queued but whose status
        has not changed in more than maxtime minutes.

        @param grid_id: id of current cluster
        @param maxidletime: maximum idle time for queued jobs
        @param maxruntime: maximum run time for jobs
        @param maxsubmittime: maximum submit time for jobs
        @param maxcopytime: maximum time for jobs to be in 'copying' state
        @param maxfailures: maximum number of times a job is allowed to fail
        @param maxevicttime: maximum time a job may sit evicted
        @param keepalive: how often should server expect to hear from jobs
        """
        return self.i3monitordb.reset_old_jobs(grid_id, maxidletime, maxruntime,
                                               maxsubmittime, maxcopytime,
                                               maxfailures, maxevicttime, keepalive)

    def GetResetJobs(self, grid_id, max_reset=50):
        """
        @return: jobs flagged for reset on this grid
        """
        return self.i3monitordb.GetResetJobs(grid_id, max_reset)

    def GetFinishedJobs(self, grid_id, max_reset=50):
        """
        @return: finished jobs on this grid
        """
        return self.i3monitordb.GetFinishedJobs(grid_id, max_reset)

    def GetActiveJobs(self, grid_id):
        """
        @return: active jobs on this grid
        """
        return self.i3monitordb.GetActiveJobs(grid_id)

    def GetProcessingJobs(self, grid_id, max_reset=50):
        """
        @return: jobs currently processing on this grid
        """
        return self.i3monitordb.GetProcessingJobs(grid_id, max_reset)

    def GetQueuedJobs(self, grid_id, max_reset=50):
        """
        @return: jobs currently queued on this grid
        """
        return self.i3monitordb.GetQueuedJobs(grid_id, max_reset)

    def SelfPickle(self):
        """
        Serialize iGrid object and write it to a file
        (<initialdir>/<cluster_id>.qobj).
        """
        # with-statement closes the file even if pickling raises.
        with open(self.FileName(), "w") as pfile:
            cPickle.dump(self, pfile)

    def AddParam(self, name, value):
        """
        Add a classadd parameter to be included in submit file.

        @param name: of parameter to be set
        @param value: to be bound to parameter (this can be a classadd
                      expression)
        """
        self.params[name] = value

    def GetParam(self, name):
        """
        @param name: name of parameter to retrieve
        @return: the value bound to parameter given by name, None if
                 parameter has not been set.
        """
        if name in self.params:
            return self.params[name]

    def GetParamKeys(self):
        """
        @return: a list of parameter names that have been set.
        """
        return list(self.params.keys())

    def AddArgOption(self, optname, optval=""):
        """
        Add an option to be passed to the executable.
        Similar to 'AddArgument' but options will precede arguments,
        e.g.: executable <option(s)> <argument(s)>.
        Consecutive calls append options in the order they were added.

        @param optname: name of option passed to executable at runtime
        @param optval: value of option passed to executable at runtime
        """
        self.argopts[optname] = optval

    def GetArgOptions(self):
        """
        @return: a list of formatted options passed to the executable
        """
        return [self.format_arg(item) for item in self.argopts.items()]

    def GetArgOpt(self, optname):
        """
        Get value of option to be passed to executable.

        @param optname: name of argument passed to executable at runtime
        @return: the option value, or "" if unset
        """
        if optname in self.argopts:
            return self.argopts[optname]
        else:
            return ""

    def Suffix(self):
        """
        Suffix of submit script.
        """
        return self.suffix

    def GetLogs(self, job):
        """
        Read the tails of the job's log/stdout/stderr files into
        job.log / job.out / job.err.
        """
        job.log = ""
        job.out = ""
        job.err = ""
        for log in glob.glob(job.GetLogFile().replace(".log", "*.log")):
            job.log += functions.tail(log, 100)
        for log in glob.glob(job.GetOutputFile().replace(".out", "*.out")):
            job.out += functions.tail(log, 100)
        # bug fix: original replaced ".out" here, so the error-file glob
        # never expanded its wildcard.
        for log in glob.glob(job.GetErrorFile().replace(".err", "*.err")):
            job.err += functions.tail(log, 100)
        return
class DAG(iGrid):
    """
    Represents a generic DAG (a job made up of individual tasks).
    """

    def __init__(self):
        # NOTE(review): intentionally does not call iGrid.__init__;
        # only the monitor-db handle is initialized -- confirm that
        # callers set any other attributes they rely on.
        self.i3monitordb = None

    def QueueJobs(self, db, maxjobs, grid_id, jobs_at_once=20, fifo=True, debug=0):
        """
        DAG: Queue individual tasks for a job.
        """
        return db.QueueTasks(maxjobs, grid_id, jobs_at_once, fifo, debug)
class Clustered(iGrid):
    """
    Batches jobs from the same dataset and submits them as one
    clustered job once C{self.cluster} jobs have accumulated.
    (Docstring fixed: the original copy-pasted "generic DAG".)
    """

    def __init__(self):
        # NOTE(review): does not call iGrid.__init__, yet Submit()
        # reads self.job_ids / self.cluster_id / self.production /
        # self.i3monitordb -- confirm who initializes them.
        self.joblist = []
        self.dataset = 0
        # Flush threshold: number of jobs batched per submission.
        self.cluster = 10

    def Submit(self, job, config_file):
        """
        Submit job/cluster to PBS.

        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        @return: (status, submit_output), or None while still batching
        """
        from iceprod.server.job import i3Job
        if job.GetDatasetId() == self.dataset or self.dataset != 0:
            self.joblist.append(job)
            self.dataset = job.GetDatasetId()
            # bug fix: original compared against the bare name 'cluster'
            # (NameError); the threshold lives on self.
            if len(self.joblist) >= self.cluster:  # Flush
                proc = ",".join(map(lambda x: x.GetArgOption('procnum'), self.joblist))
                job.AddArgOption("procnum", proc)
            else:
                return

        self.submit_status = ''
        self.WriteConfig(job, config_file)

        cmd = "%s %s" % (self.enqueue_cmd, config_file)
        status, self.submit_status = commands.getstatusoutput(cmd)
        try:
            id = self.get_id(self.submit_status)
            job.SetJobId(id)
            status = 0
            if id < 0:
                status = 1
        except Exception as e:
            logger.error("Exception: " + str(e))
            self.submit_status += "\nException: " + str(e)
            status = 1

        if len(self.job_ids) and not self.cluster_id:
            self.cluster_id = self.job_ids[0]

        job.submit_status = status
        job.submit_msg = self.submit_status

        if self.production:
            # update database
            if job.submit_status == 0:
                self.i3monitordb.jobsubmitted(
                    job.GetDatasetId(), job.GetProcNum(),
                    job.GetInitialdir(), job.GetJobId())
            else:
                logger.error("failed to submit jobs:" + job.submit_msg)
                self.i3monitordb.jobabort(job.GetProcNum(), job.GetDatasetId(), 3,
                                          "failed to submit jobs:" + job.submit_msg)
                os.chdir('/tmp')
                self.CleanDir(job.GetInitialdir())

        return status, self.submit_status

    def QueueJobs(self, db, maxjobs, grid_id, jobs_at_once=20, fifo=True, debug=0):
        """
        Clustered: Queue individual tasks for a job.
        (Not implemented; returns None.)
        """
def usage(arguments):
    """
    Print usage/help info.

    @param arguments: cmdline args passed to program
    """
    # print() calls work under both Python 2 and 3; the original
    # Python-2-only print statements were a SyntaxError on Python 3.
    print("Usage: %s <serialized iGrid object file>" % arguments[0])
    print(" ")
761