Package iceprod :: Package server :: Module grid
[hide private]
[frames] | no frames]

Source Code for Module iceprod.server.grid

  1  #!/bin/env python 
  2  # 
  3  """ 
  4   Interface for configuring and submitting jobs on a computing cluster.  
  5   Do not use this class directly. Instead use one of the implementations 
  6   that inherit from this class. 
  7   
  8   copyright  (c) 2005 the icecube collaboration 
  9   
 10   @version: $Revision: $ 
 11   @date: $Date: $ 
 12   @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
 13   @todo: implement more functionality and features 
 14  """ 
 15   
 16  import os 
 17  import re 
 18  import sys 
 19  import math 
 20  import dircache 
 21  import os.path 
 22  import time 
 23  import string 
 24  import shutil 
 25  import glob 
 26  import cPickle 
 27  import logging 
 28  import logging.config 
 29  import commands 
 30  import copy 
 31  import iceprod.core.exe 
 32  import iceprod.core.inventory 
 33  from iceprod.core import metadata 
 34  from iceprod.core.dataclasses import Steering 
 35  from iceprod.core.lex import XMLSummaryParser 
 36  from iceprod.core.inventory import FileInventory 
 37  from iceprod.core import functions  
 38  from iceprod.modules.gsiftp import * 
 39   
 40  logger = logging.getLogger('iGrid') 
 41   
 42   
 43   
class iGrid:
    """
    This class represents a generic job on a distributed system.

    It is a base class: queue-specific subclasses are expected to provide
    C{enqueue_cmd}, C{WriteConfig(job, config_file)} and
    C{get_id(submit_output)} (all used by Submit), and to override the
    interface methods CheckQ, CleanQ and QRemove.
    """

    # Return codes of PostCopy.
    # NOTE(review): 'NOTREADY' and 'FAILED' share the value 0, so callers
    # cannot distinguish them from the return value alone; kept unchanged
    # for compatibility with existing callers.
    CopyStatusEnum = {'OK': 1, 'NOTREADY': 0, 'FAILED': 0}

    def __init__(self):
        self.executable = None
        self.nproc = 1
        self.proc = 0
        self.params = {}
        # Default per-process file names are prefixed with a timestamp.
        # "$(Proc)" is left literally in the name; presumably it is expanded
        # by the queuing system (Condor-style macro) -- confirm per backend.
        current_time = time.asctime().replace(" ", "_")
        self.logfile = "%s_$(Proc).log" % current_time
        self.outputfile = "%s_$(Proc).output" % current_time
        self.errorfile = "%s_$(Proc).error" % current_time
        self.submit_status = ""
        self.job_ids = []
        self.cluster_id = None
        self.jobs_submitted = -1
        self.simdbkey = None
        self.host = None
        self.port = None
        self.url = None
        self.metadata = None
        self.post_check_script = None
        self.initialdir = None
        self.env = {}
        self.steering = None
        self.argopts = {}
        self.jobs = []
        self.cpobj = URLCopy()
        self.suffix = ".submit"
        self.i3monitordb = None

    @staticmethod
    def _strip_prefix(text, prefix):
        """
        Return text without a single leading prefix (unchanged otherwise).

        Unlike str.replace, occurrences of prefix later in the string are
        preserved.
        """
        if text.startswith(prefix):
            return text[len(prefix):]
        return text

    def SetCopyClass(self, cpobj):
        """
        Set the object used by urlcopy for non-file/non-ssh transfers.
        A deep copy of it is configured and executed for each transfer.
        """
        self.cpobj = cpobj

    def urlcopy(self, file, url):
        """
        Transfer a file to a destination URL.

        Three cases are handled:
          - file and url both 'file:': the file is moved with 'mv' into the
            local path named by url, which is created if missing;
          - url 'ssh:': the file is copied with 'scp'; the part of url after
            'ssh:' must look like [user@]host:path;
          - otherwise the transfer is delegated to a deep copy of the copy
            object (see SetCopyClass).

        @param file: source path or URL
        @param url: destination URL
        @return: 0 on success
        @raise Exception: if the ssh URL is malformed or the transfer
            reports a non-zero status
        """
        logger.debug('copying %s >>> %s' % (file, url))

        if url.startswith("file:") and file.startswith("file:"):
            dest = self._strip_prefix(url, "file:")
            src = self._strip_prefix(file, "file:")
            if not os.path.exists(dest):
                os.makedirs(dest)
            retval = os.system('mv %s %s' % (src, dest))
        elif url.startswith("ssh:"):
            dest = self._strip_prefix(url, "ssh:")
            src = self._strip_prefix(file, "file:")
            # Validate the destination itself: matching against the full url
            # always succeeded because its "ssh:" prefix fits the pattern.
            if not re.match(r'[a-zA-Z0-9\-\_\.]*@?[a-zA-Z0-9\-\_\.]+:', dest):
                raise Exception("malformated SSH url '%s'. " % dest)
            retval = os.system('scp %s %s' % (src, dest))
        else:
            urlcp = copy.deepcopy(self.cpobj)
            urlcp.SetParameter('source', file)
            urlcp.SetParameter('destination', url)
            urlcp.SetParameter('emulate', False)
            stats = {}
            retval = urlcp.Execute(stats)

        if retval != 0:
            raise Exception("Failed to copy file to target '%s'" % url)
        return retval

    def PushJob(self, job):
        """
        Add new job to queue
        """
        self.jobs.append(job)

    def PopJob(self):
        """
        Remove and return the last job added to the queue.
        @raise IndexError: if the queue is empty
        """
        return self.jobs.pop()

    def GetJob(self, i):
        """
        Return the job at index i of the queue (without removing it).
        @raise IndexError: if i is out of range
        """
        return self.jobs[i]

    def GetJobList(self):
        """
        Get list of jobs
        """
        return self.jobs

    def AddEnv(self, var, val):
        """
        Add an environment variable.
        """
        self.env[var] = val

    def GetEnv(self):
        """
        @return: the dict of environment variables added with AddEnv
        """
        return self.env

    def format_arg(self, tuple):
        """
        Format a runtime command-line argument.

        Names longer than one character become '--name=value'; single
        character names become '-n value'.

        @param tuple: tuple in the form (argname, argval)
        """
        if len(tuple[0]) > 1:
            return "--%s=%s" % tuple
        else:
            return "-%s %s" % tuple

    def SetInitialdir(self, path):
        """
        Define the directory where jobs will be submitted from.
        @param path: system path to directory
        """
        self.initialdir = path

    def GetInitialdir(self):
        """
        Get the directory where jobs will be submitted from.
        @return: path to directory
        """
        return self.initialdir

    def SetSteering(self, steering):
        """
        Add a reference to the steering configuration
        @param steering: a Steering object
        """
        self.steering = steering

    def GetSteering(self):
        """
        Get the reference to the steering configuration
        @return: the Steering object
        """
        return self.steering

    def GetJobIds(self):
        """
        Get the cluster AND job id for the submitted jobs.
        @return: the list of job ids (in the queue's format); empty if none
            have been recorded
        """
        return self.job_ids

    def GetClusterId(self):
        """
        Get the cluster id of the submitted jobs.
        @return: the cluster id (Submit sets it to the first entry of
            job_ids once that list is non-empty), or None if not yet known
        """
        return self.cluster_id

    def SetMetadataPath(self, path):
        """
        Set the path of the metadata file updated by UpdateMetadata.
        """
        self.metadata = path

    def GetMetadataPath(self):
        """
        @return: the metadata file path, or None if not set
        """
        return self.metadata

    def UpdateMetadata(self):
        """
        Update metadata information.

        Parses the metadata file, sets its end date-time and I3DB key, and
        writes it back in place. Does nothing when no metadata path is set
        or the file does not exist. Errors during parsing/writing are
        logged, not raised.
        """
        path = self.GetMetadataPath()
        # os.path.exists(None) raises TypeError, so guard the unset case.
        if not path or not os.path.exists(path):
            return
        logger.debug("Updating metadata")
        try:
            difPlusReader = metadata.MetadataParser()
            difplus = difPlusReader.ParseFile(path)
            plus = difplus.GetPlus()
            plus.SetEndDatetime()
            plus.SetI3DBKey(0)  # TODO: where does this number come from?
            metadata.MetadataWriter(difplus).write_to_file(path)
        except Exception:
            # sys.exc_info() keeps this valid on both old Python 2 and 3.
            err = sys.exc_info()[1]
            logger.error("Error updating metadata: %s" % str(err))

    def GetRootDir(self):
        """
        Get the root directory of installation
        @return: path to directory, or None if SetRootDir was never called
        """
        return getattr(self, 'execdir', None)

    def SetRootDir(self, path):
        """
        Define the root directory of installation
        @param path: system path to directory
        """
        self.execdir = path

    def FileName(self):
        """
        Get the name of this object's serialized file
        @return: path '<initialdir>/<cluster_id>.qobj'
        """
        return os.path.join(self.initialdir, str(self.cluster_id) + '.qobj')

    def SetMonitorDB(self, db):
        """
        Store a reference to the monitoring database object.
        """
        self.i3monitordb = db

    def GetMonitorDB(self):
        """
        @return: the monitoring database object set by SetMonitorDB
        """
        return self.i3monitordb

    def SetQueue(self, nproc):
        """
        @param nproc: number of jobs to enqueue
        """
        self.nproc = nproc

    def GetQueue(self):
        """
        @return: number of jobs enqueued
        """
        return self.nproc

    def SetSimDBKey(self, key):
        """
        Set the unique key for current configuration on production database
        """
        self.simdbkey = key

    def GetSimDBKey(self):
        """
        Get the unique key for current configuration from production database
        """
        return self.simdbkey

    def SetSubmitHost(self, host):
        """
        Set the hostname of the submit node
        """
        self.submithost = host

    def GetSubmitHost(self):
        """
        Get the hostname of the submit node (None if never set)
        """
        return getattr(self, 'submithost', None)

    def SetHost(self, host):
        """
        Set the hostname of the soaptray server that the job was submitted to
        """
        self.host = host

    def GetHost(self):
        """
        Get the hostname of the soaptray server that the job was submitted to
        """
        return self.host

    def SetPort(self, port):
        """
        Set the port number of the submit node
        """
        self.port = port

    def GetPort(self):
        """
        Get the port number of the submit node
        """
        return self.port

    def SetURL(self, url):
        """
        Set the URL associated with this job's submission
        """
        self.url = url

    def GetURL(self):
        """
        Get the URL associated with this job's submission
        """
        return self.url

    def SetStatus(self, status):
        """
        Record the job status
        (0=init submission, 1=running, 2=completed, 3=failed)
        """
        self.jobstatus = status

    def Submit(self, job, config_file):
        """
        Submit job/cluster to the queuing system.

        Writes the submit file with WriteConfig, runs
        '<enqueue_cmd> <config_file>', and parses the job id from its
        output with get_id (all three supplied by subclasses).

        @param job: i3Job object
        @param config_file: path to file were submit file will be written
        @return: (status, output) where status is 0 on success and 1 if no
            valid (non-negative) job id could be obtained
        """
        self.submit_status = ''
        self.WriteConfig(job, config_file)

        cmd = "%s %s" % (self.enqueue_cmd, config_file)
        # The command's exit status is deliberately superseded below:
        # success is judged by whether a job id can be extracted.
        status, self.submit_status = commands.getstatusoutput(cmd)
        try:
            job_id = self.get_id(self.submit_status)
            job.SetJobId(job_id)
            status = 0
            if job_id < 0:
                status = 1
        except Exception:
            err = sys.exc_info()[1]
            logger.error("Exception: " + str(err))
            self.submit_status += "\nException: " + str(err)
            status = 1

        if self.job_ids and not self.cluster_id:
            self.cluster_id = self.job_ids[0]

        return status, self.submit_status

    def CheckQ(self, db=None):
        """
        Interface: Check status of job/cluster in queuing system.
        """
        return """This is just a prototype for a function and must be
        implemented by child classes"""

    def CleanQ(self, jobs=None):
        """
        Interface: Clean up job/cluster entries in queuing system.
        """
        return """This is just a prototype for a function and must be
        implemented by child classes"""

    def CheckJobStatus(self, jobs):
        """
        Query status of job(s) on queue.

        This base implementation marks every job with status '?'.

        @param jobs: a single job or a list of jobs
        @return: 1
        """
        if isinstance(jobs, list):
            job_list = jobs
        else:
            job_list = [jobs]
        for job in job_list:
            job.SetStatus('?')
        return 1

    def QRemove(self, jobid):
        """
        Interface: Remove active job/cluster from queuing system.
        """
        print("""This is just a prototype for a function and must be
        implemented by child classes""")
        return -1

    def PostCopy(self, jobdict, target_url, maxtries=4):
        """
        Copy a finished job's output files to their targets.

        Reads the output inventory in the job's submit directory and copies
        every listed file to its target. If target_url is given, the
        inventoried files plus any *.out, *.err and *.log files in the
        submit directory are additionally copied there.

        @param jobdict: mapping with at least 'submitdir', 'dataset_id'
            and 'queue_id'
        @param target_url: optional extra destination (skipped when false)
        @param maxtries: currently unused
        @return: CopyStatusEnum['OK'] when everything was copied, or
            CopyStatusEnum['NOTREADY'] when a listed source is missing
        @raise Exception: if no inventory file exists, or (from urlcopy)
            if a transfer fails
        """
        initialdir = jobdict['submitdir']
        dataset = jobdict['dataset_id']
        proc = jobdict['queue_id']
        inventoryfile = os.path.join(initialdir, iceprod.core.inventory.name)
        if not os.path.exists(inventoryfile):
            raise Exception("%d,%d: no output inventory found." % (dataset, proc))
        inventory = FileInventory()
        inventorylist = []
        logger.debug('reading %s' % inventoryfile)
        inventory.Read(inventoryfile)
        for entry in inventory.filelist:

            # Check to make sure all files have been copied
            source = entry['source']
            if source.startswith('file:'):
                # Local outputs are looked up by basename in the submit dir.
                local_name = os.path.basename(self._strip_prefix(source, 'file:'))
                source = os.path.abspath(os.path.join(initialdir, local_name))
            target = entry['target']
            if not os.path.exists(source):
                logger.error("%d.%d: missing data. in %s " % (dataset, proc, initialdir))
                return self.CopyStatusEnum['NOTREADY']
            inventorylist.append(source)
            source = 'file:' + source
            # urlcopy raises on failure; a non-zero return is handled too.
            if self.urlcopy(source, target):
                logger.error("failed to copy %s -> %s" % (source, target))
                return self.CopyStatusEnum['FAILED']

        # copy log files to local target
        if target_url:
            for pattern in ('*.out', '*.err', '*.log'):
                inventorylist.extend(glob.glob(os.path.join(initialdir, pattern)))
            for path in inventorylist:
                logger.debug("copying %s %s" % (path, target_url))
                if self.urlcopy(path, target_url):
                    logger.error("failed to copy %s locally" % path)
                    return self.CopyStatusEnum['FAILED']

        return self.CopyStatusEnum['OK']

    def Clean(self, jobdict):
        """
        Interface: clean submit directory.

        Removes jobdict['submitdir'] with functions.removedirs if it is an
        existing directory.
        """
        logger.info('%(dataset_id)u.%(queue_id)u: cleaning dir %(submitdir)s' % jobdict)
        submitdir = "%(submitdir)s" % jobdict
        logger.debug(submitdir)
        if os.path.isdir(submitdir):
            functions.removedirs(submitdir)

    def wait(self, job, poll_interval=240):
        """
        Block until 'stats.dat' appears in the job's initial directory.

        There is no timeout.

        @param job: object providing GetInitialdir()
        @param poll_interval: seconds to sleep between checks
        """
        statsfile = os.path.join(job.GetInitialdir(), "stats.dat")
        while not os.path.exists(statsfile):
            time.sleep(poll_interval)

    def SelfPickle(self):
        """
        Serialize iGrid object and write it to the file given by FileName().
        """
        handle = open(self.FileName(), "w")
        try:
            cPickle.dump(self, handle)
        finally:
            handle.close()

    def AddParam(self, name, value):
        """
        Add a classadd parameter to be included in submit file

        @param name: of parameter to be set
        @param value: to be bound to parameter (this can be a classadd
            expression)
        """
        self.params[name] = value

    def GetParam(self, name):
        """
        @param name: name of parameter to retrieve
        @return: the value bound to parameter given by name, None if parameter
            has not been set.
        """
        return self.params.get(name)

    def GetParamKeys(self):
        """
        @return: a list of parameter names that have been set.
        """
        return list(self.params.keys())

    def AddArgOption(self, optname, optval=""):
        """
        Add an option to be passed to the executable.

        Similar to 'AddArgument' but options precede arguments, e.g.
        executable <option(s)> <argument(s)>.
        Options are stored in a dict keyed by optname: adding the same
        optname again replaces its value, and the order produced by
        GetArgOptions follows dict iteration order, which is not
        guaranteed to be insertion order on Python 2.

        @param optname: name of option be passed to executable at runtime
        @param optval: value of option be passed to executable at runtime
        """
        self.argopts[optname] = optval

    def GetArgOptions(self):
        """
        @return: a list of formatted options (see format_arg) to pass to
            the executable
        """
        return [self.format_arg(item) for item in self.argopts.items()]

    def GetArgOpt(self, optname):
        """
        Get value of option to be passed to executable.
        @param optname: name of argument to be passed to executable at runtime
        @return: the option's value, or "" if it has not been set
        """
        return self.argopts.get(optname, "")

    def Suffix(self):
        """
        Suffix of submit script
        """
        return self.suffix
531
def usage(arguments):
    """
    Write a short help message to standard output.

    @param arguments: the command-line argument vector; only its first
        element (the program name) is used
    """
    program = arguments[0]
    banner = "Usage: %s <serialized iGrid object file>" % program
    print(banner)
    print(" ")
540