
Source Code for Module iceprod.server.plugins.glite

#!/bin/env python
#

"""
 A basic wrapper for submitting and monitoring jobs on the gLite (EGEE) grid
 middleware. Inherits from iGrid.

 copyright  (c) 2005 the icecube collaboration

 @version: $Revision: $
 @date: $Date: $
 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
"""

import os
import re
import sys
import math
import dircache
import commands
import time
import string
import shutil
import ConfigParser
import logging
import iceprod
import iceprod.core.exe
from iceprod.core import metadata
from iceprod.core.dataclasses import Steering
# FileInventory is used below but was never imported in the original module;
# the module path given here is an assumption.
from iceprod.core.inventory import FileInventory
from iceprod.server.grid import iGrid,DAG
from iceprod.core.lex import ExpParser
from os.path import basename,dirname
from copy import deepcopy

logger = logging.getLogger('eGee')

egee_status = {'1':'QUEUED', '2':'PROCESSING'}

def quote(x):
    return '"%s"' % str(x)


def cstatus(istatus):
    if egee_status.has_key(istatus):
        return egee_status[istatus]
    return 'X'

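# Illustrative example (not part of the original source): cstatus() maps the
# numeric iceprod status codes in egee_status onto queue states and falls back
# to 'X' for anything it does not know about.
#   >>> cstatus('1')
#   'QUEUED'
#   >>> cstatus('7')
#   'X'
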

class gLite(iGrid):
    """
    This class represents a job or cluster on an EGEE grid.
    """
    queued_state = [
        'READY',
        'SCHEDULED',
        'WAITING',
        'SUBMITTED',
        'ACCEPTED',
        'PREPARING',
        'QUEUEING',
        'QUEUED',
        'INLRMS:Q',
    ]
    running_state = [
        'RUNNING',
        'EXECUTED',
        'FINISHING',
    ]

    error_state = [
        'FAILED',
        'DONE(EXITCODE!=0)',
        'ABORTED',
    ]

    finished_state = [
        'DONE(SUCCESS)',
        'CLEARED',
        'CANCELLED',
    ]

    undetermined_states = [
        'UNAVAILABLE',
    ]

    def __init__(self):
        iGrid.__init__(self)
        self.cluster_id = -1
        self.post = None
        self.ids = "ids"
        self.enqueue_cmd = "glite-wms-job-submit"
        self.checkqueue_cmd = "glite-wms-job-status"
        self.queue_rm_cmd = "glite-wms-job-cancel"
        self.get_output_cmd = "glite-wms-job-output"
        self.suffix = "jdl"
        self.grid_storage = "%(gridstorage)s/%(dataset_id)u/%(queue_id)u/%(filename)s"
        self.logger = logging.getLogger('gLite')

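    # Illustrative sketch (values invented): grid_storage is a template that
    # PostCopy() later fills in with the job dictionary, which is assumed to
    # also carry a 'gridstorage' entry. For example,
    #   jobdict = {'gridstorage': 'srm://se.example.org/icecube/iceprod',
    #              'dataset_id': 1234, 'queue_id': 7, 'filename': 'output.tgz'}
    # expands the template to
    #   srm://se.example.org/icecube/iceprod/1234/7/output.tgz
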
    def _choose_resource(self,resource_list):
        from random import choice
        weighted_rlist = []

        for r in resource_list:
            if len(r.split()) > 1:
                resource,weight = r.split()
            else:
                resource,weight = r,1
            try:
                weight = int(weight)
            except Exception,e:
                self.logger.error("Exception: " + str(e))
                self.logger.warn("Unable to get resource weight for: " + r)
                weight = 1

            self.logger.debug("%s:%u " % (resource,weight))
            weighted_rlist.extend([resource]*weight)

        return choice(weighted_rlist)

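    # Usage sketch (hostnames invented): an entry may carry an integer weight
    # after the resource name; the list below makes ce01 three times as likely
    # to be chosen as ce02.
    #   >>> g = gLite()
    #   >>> g._choose_resource(['ce01.example.org/queue 3', 'ce02.example.org/queue'])
    #   'ce01.example.org/queue'
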
    def BundleFiles(self,job,tarballname="inputfiles.tar"):
        """
        Bundle multiple files into a single tarball so as not to exceed
        the limit on the number of input files.
        @param job: i3Job whose python input files should be bundled
        @param tarballname: name of the tarball to create
        @return: tarball path
        """
        import tarfile
        tmpdir = os.path.join(job.GetInitialdir(),"inputfiles")
        tarpath = os.path.join(job.GetInitialdir(),tarballname)
        if os.path.isfile(tarpath): return tarpath
        if not os.path.exists(tmpdir):
            os.makedirs(tmpdir)
        cwdir = os.getcwd()
        os.chdir(tmpdir)
        tfile = tarfile.open(tarpath, 'w')
        for file in job.GetInputFiles():
            if file.endswith('.py'):
                os.system('cp -f %s .' % file)
                self.logger.info("adding file %s to tarball" % file)
                tfile.add(os.path.basename(file))
        tfile.close()
        os.chdir(cwdir)
        return tarpath

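    # Behaviour note with an invented file list: only the python scripts among
    # the job's input files end up in the tarball; everything else is listed
    # individually in the InputSandbox by WriteConfig() below.
    #   job.GetInputFiles() == ['gen.py', 'filt.py', 'geometry.i3.gz']
    #   -> inputfiles.tar contains gen.py and filt.py; geometry.i3.gz ships as-is.
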
    def WriteConfig(self,job,config_file):
        """
        Write JDL to a file.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        submitfile = open(config_file,'w')
        wrapper = open(config_file.replace('jdl','sh'),'w')
        self.ids = config_file.replace('jdl','ids')

        job.Write(wrapper,"#!/bin/sh")
        job.Write(wrapper,'echo "running iceprod on $HOSTNAME";',parse=False)
        job.Write(wrapper,"uname -a;")

        # set default in case I3SCRATCH is not defined
        job.Write(wrapper,"I3SCRATCH=/tmp",parse=False)
        for var in self.env.keys():
            job.Write(wrapper, "export %s=%s" % (var, self.env[var]),parse=False)
        for var in job.env.keys():
            job.Write(wrapper, "export %s=%s" % (var, job.env[var]),parse=False)

        job.Write(wrapper,"INIT_DIR=$PWD",parse=False)
        job.Write(wrapper,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
        job.Write(wrapper,"mkdir -p $RUNDIR")
        job.Write(wrapper,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)
        job.Write(wrapper,"cd $RUNDIR",parse=False)
        job.Write(wrapper,"ln -s $INIT_DIR/* $RUNDIR/",parse=False)

        err = basename(job.GetErrorFile())
        out = basename(job.GetOutputFile())
        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = job.GetMainScript() + " " + " ".join(argopts)
        inputfiles = []
        inputfiles.append(wrapper.name)
        inputfiles.append(job.GetExecutable())
        inputtar = self.BundleFiles(job,tarballname="inputfiles.tar")
        inputfiles.append(inputtar)
        for file in job.GetInputFiles():
            if not file.endswith('.py'):
                inputfiles.append(file)
        inputfiles = map(quote,inputfiles)

        outputfiles = []
        outputfiles.append('work.tgz')
        outputfiles.append("icetray.%06u.log" % job.GetProcNum())
        outputfiles.append('%s' % err)
        outputfiles.append('%s' % out)
        outputfiles.append(iceprod.core.exe._inventory) # inventory
        outputfiles.extend(job.GetOutputFiles())
        outputfiles = map(quote,outputfiles)

        pyhome = ''
        if self.env.has_key('PYROOT'):
            pyhome = self.env['PYROOT']
        if job.env.has_key('PYROOT'):
            pyhome = job.env['PYROOT']
        if pyhome.startswith('http:'):
            job.Write(wrapper,"wget --quiet %s" % pyhome,parse=False)
            if pyhome.endswith('.tgz'):
                job.env['PYROOT'] = '$PWD/python-2.3'
                job.Write(wrapper,"tar xzf %s" % os.path.basename(pyhome),parse=False)
            #inputfiles.append(pyhome)

        job.Write(submitfile,'Executable = "%s";' % basename(wrapper.name))
        job.Write(submitfile,'StdOutput = "%s";' % out)
        job.Write(submitfile,'StdError = "%s";' % err)
        job.Write(submitfile,'InputSandbox = {%s};' % ','.join(inputfiles))
        job.Write(submitfile,'OutputSandbox = {%s};' % ','.join(outputfiles))
        job.Write(submitfile,'Arguments = "%s";' % argstr,parse=False)

        job.Write(wrapper, 'cmd="$PYROOT/bin/python %s $*"' % basename(job.GetExecutable()),parse=False)
        job.Write(wrapper, "echo $cmd",parse=False)
        job.Write(wrapper, "$cmd",parse=False)
        job.Write(wrapper, "retval=$?",parse=False)

        # Add general batch options
        for key in self.GetParamKeys():
            if not job.batchopts.has_key(key):
                job.AddBatchOpt(key,self.GetParam(key))

        # Add job specific batch options
        submitdir = dirname(os.path.abspath(out))
        for key,opt in job.GetBatchOpts().items():
            if key.startswith('-'):
                continue
            elif key.upper().startswith('OUTPUTSANDBOXBASEDESTURI'):
                opt += submitdir
            job.Write(submitfile, '%s = "%s";' % (key, opt))

        submitfile.close()

        read_from_inventory = False
        if not read_from_inventory:
            ################## old stuff #########################
            if not self.GetArgOpt('stageout'):
                job.Write(wrapper,"# copying files to storage element")
                job.Write(wrapper,"for file in *; do")
                job.Write(wrapper,"  if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
                job.Write(wrapper,"    lcg-cr --vo icecube \\",parse=False)
                job.Write(wrapper,"    -d udo-dcache01.grid.uni-dortmund.de \\",parse=False)
                job.Write(wrapper,"    -l lfn:/grid/icecube/iceprod/%s/%s/$file \\" \
                          % (job.GetDatasetId(),job.GetProcNum()),parse=False)
                job.Write(wrapper,"    file:$PWD/$file",parse=False)
                job.Write(wrapper,"  fi; done")
            #######################################################

        else:
            ##################
            # read inventory file
            # copied from PostCopy(), doesn't seem to work
            # target and source entries in the inventory.xml should look like this:
            # <source>file:$RUNDIR/output.tgz</source>
            # <target>lfn:/grid/icecube/some/folders/for/organization</target>
            inventoryfile = os.path.join(submitdir,iceprod.core.exe._inventory)
            if not os.path.exists(inventoryfile):
                self.logger.error("%d,%d: no output inventory found." %
                                  (job.GetDatasetId(),job.GetProcNum()))
                return self.CopyStatusEnum['NOTREADY']
            inventory = FileInventory()
            self.logger.debug('reading %s' % inventoryfile)
            inventory.Read(inventoryfile)
            ###################

            job.Write(wrapper, "export LFC_HOST=`lcg-infosites --vo icecube lfc`",parse=False) # needed for lfc-mkdir
            job.Write(wrapper, "export LCG_GFAL_INFOSYS=lcg-bdii.ifh.de:2170",parse=False) # should already be set on the WN; exported just to be sure

            # Copy output to the storage element
            copy_cmds = []
            for file in inventory.filelist:
                # lcg-cr --vo icecube -l lfn:/grid/icecube/fabian/tests/$FILENAME file:/$RUNDIR/$FILENAME
                cmd = 'lcg-cr --vo icecube '
                cmd += '-l %s/%s ' % (file["target"], os.path.split(file["source"])[1])
                cmd += '%s' % (file["source"])

                copy_cmds.append("lfc-mkdir -p %s" % (file["target"][4:]))
                copy_cmds.append(cmd)

            cmd = "\n".join(copy_cmds)
            job.Write(wrapper,"# copying files to storage element")
            job.Write(wrapper,cmd,parse=False)

            # delete files from the worker node
            del_output = False
            if del_output: # should not normally be necessary, but avoids problems with globus-copy
                del_cmds = []
                for file in inventory.filelist:
                    cmd = "/bin/rm %s" % (file["source"][5:])
                    del_cmds.append(cmd)
                cmd = "\n".join(del_cmds)

                job.Write(wrapper,"# deleting copied files")
                job.Write(wrapper,cmd,parse=False)

        job.Write(wrapper,"cd $INIT_DIR",parse=False)
        job.Write(wrapper,"rm -rf $RUNDIR",parse=False)
        job.Write(wrapper,"exit $retval",parse=False)

        wrapper.close()

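    # Sketch of the JDL that WriteConfig() emits (attribute names are taken
    # from the Write() calls above; file names are invented for the example):
    #   Executable = "job_0.sh";
    #   StdOutput = "job_0.out";
    #   StdError = "job_0.err";
    #   InputSandbox = {"job_0.sh","iceprod_main.py","inputfiles.tar"};
    #   OutputSandbox = {"work.tgz","icetray.000000.log","job_0.err","job_0.out"};
    #   Arguments = "<main script> <parsed options>";
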
    def Submit(self,cookie):
        """
        Submit job/cluster to the grid.

        @param cookie: cookie object used by the caller to collect submission info
        """
        status = 0
        self.submit_status = ''
        cwdir = os.getcwd()
        for job in self.jobs:
            self.logger.info("submitting job %u.%u" % (job.GetDatasetId(),job.GetProcNum()))
            os.chdir(job.GetInitialdir())

            self.WriteConfig(job,job.config_file)

            submit_args = []

            # Add job specific batch options
            resource_list = []
            for key,opt in job.GetBatchOpts().items():
                if key.startswith('-resource'):
                    if opt.startswith('-r'):
                        resource = opt[2:].strip()
                    else:
                        resource = opt
                    resource_list.append(resource)

                elif key.startswith('-'):
                    submit_args.append(opt)
            if resource_list:
                resource = self._choose_resource(resource_list)
                submit_args.append('-r %s' % resource)

            cmd = "%s %s -o %s %s" % (self.enqueue_cmd," ".join(submit_args),self.ids,job.config_file)
            self.logger.debug(cmd)
            status, self.submit_status = commands.getstatusoutput(cmd)
            if status != 0:
                self.logger.error("Failed to execute command")
                self.logger.error(cmd)
            try:
                id = self.get_id(self.submit_status)
                job.SetJobId(id)
                if id in (-1,None):
                    status = 1
                else:
                    status = 0
            except Exception, e:
                self.logger.error("Exception: " + str(e))
                self.submit_status += "\nException: " + str(e)
                status = 1

            if len(self.job_ids) and not self.cluster_id:
                self.cluster_id = self.job_ids[0]

            job.submit_status = status
            job.submit_msg = self.submit_status

            if self.production:
                # update database
                if job.submit_status == 0:
                    self.i3monitordb.jobsubmitted(
                        job.GetDatasetId(), job.GetProcNum(),
                        job.GetInitialdir(), job.GetJobId())
                else:
                    logger.error("failed to submit jobs:"+job.submit_msg)
                    self.i3monitordb.jobabort(job.GetProcNum(),job.GetDatasetId(),3,
                                              "failed to submit jobs:"+job.submit_msg)
                    os.chdir('/tmp')
                    #self.CleanDir(job.GetInitialdir())

            os.chdir(cwdir)
            # done submitting this job, go to the next one

        return status,self.submit_status

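    # The submit command assembled above looks roughly like this (paths and
    # resource name invented for the example):
    #   glite-wms-job-submit -r ce01.example.org/queue -o /path/to/job_0.ids /path/to/job_0.jdl
    # The ids file written via -o is what get_id() parses afterwards.
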
    def get_id(self,submit_status):
        """
        Parse the output of the submission to extract the id of the
        job cluster.

        @param submit_status: string returned by the submit command
        """

        self.logger.debug(submit_status)
        idfile = open(self.ids,'r')
        job_id = None
        for line in idfile.readlines():
            if line.strip().startswith('http'):
                job_id = line.strip()
                self.job_ids.append(job_id)
                break
        idfile.close()
        if not job_id:
            self.logger.warn('could not parse job id from "%s"' % self.ids)
        return job_id

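    # The ids file produced by the submit command contains the WMS job URL on a
    # line of its own; get_id() takes the first line that starts with 'http',
    # e.g. (identifier invented):
    #   https://wms01.example.org:9000/AbCdEfGh1234567890
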
    def CheckQ(self,db=None):
        """
        Query the status of the cluster or job on the queue.
        """

        cmd = self.checkqueue_cmd
        for id in self.job_ids:
            cmd += " %s" % id
        self.logger.debug(cmd)
        pout = os.popen(cmd)
        status = string.join(pout.readlines())
        pout.close()
        return status

    def CleanQ(self,jobs=None):
        """
        Check consistency of queue and remove jobs which shouldn't be there.
        """

        return 0

    def CheckJobStatus(self,jobs):
        """
        Query the status of a job on the gLite queue.
        """
        status_dict = {
            'SUBMITTED': "The job has been submitted by the user but not yet processed by the Network Server",
            'WAITING': "The job has been accepted by the Network Server but not yet processed by the Workload Manager",
            'READY': "The job has been assigned to a Computing Element but not yet transferred to it",
            'SCHEDULED': "The job is waiting in the Computing Element's queue",
            'RUNNING': "The job is running",
            'DONE': "The job has finished",
            'ABORTED': "The job has been aborted by the WMS (e.g. because it was too long, or the proxy certificate expired, etc.)",
            'CANCELLED': "The job has been cancelled by the user",
            'CLEARED': "The Output Sandbox has been transferred to the User Interface",
            '?': "Unknown status"
        }
        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]
        for job in job_list:
            dataset_id = job.GetDatasetId()
            queue_id = job.GetProcNum()
            job_id = job.GetJobId()
            status = "?"

            if not job_id:
                job.SetStatus('?')
                continue

            cmd = self.checkqueue_cmd + ' ' + job_id
            handle = os.popen(cmd, 'r')

            for line in handle.readlines():
                line = line.strip()
                if line.startswith('Current Status:'):
                    status = line[(line.find(':') + 1):].replace(' ','').split()[0]
            handle.close()
            self.logger.info("%s.%s (%s) : %s" % (dataset_id,queue_id,job_id,status.upper()))
            if status_dict.has_key(status.upper()):
                self.logger.info(status_dict[status.upper()])

            if status.upper() in self.queued_state:
                job.SetStatus('QUEUED')
            elif status.upper() in self.running_state:
                job.SetStatus('PROCESSING')
            elif status.upper() in self.error_state:
                job.SetStatus('FAILED')
            elif status.upper() in self.finished_state:
                job.SetStatus('FINISHED')
            else:
                job.SetStatus('?')

        return 1

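    # The parser above keys on lines of the form (spacing is a mock-up):
    #   Current Status:     Running
    # which yields status == 'Running' and, via running_state, the iceprod
    # status 'PROCESSING'.
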
    def QRemove(self,job=None):
        """
        Remove cluster or job from the gLite queue.
        """

        cmd = self.queue_rm_cmd + " --noint "
        if job: cmd += " %s" % job.job_id
        else:
            for id in self.job_ids: cmd += " %s" % id

        self.logger.debug(cmd)
        handle = os.popen(cmd, 'r')
        status = string.join(handle.readlines())
        handle.close()

        if status.find('Job has already finished') != -1:
            cmd = self.queue_rm_cmd + " --noint "
            if job: cmd += " %s" % job.job_id
            else:
                for id in self.job_ids: cmd += " %s" % id

            self.logger.debug(cmd)
            handle = os.popen(cmd, 'r')
            status = string.join(handle.readlines())
            handle.close()

        return status

    def PostCopy(self,jobdict,target_url,maxtries=4):
        """
        Retrieve the output sandbox of the finished job and copy the files
        listed in the output inventory back from grid storage.
        """
        if not jobdict['grid_queue_id']: return True

        dataset = jobdict['dataset_id']
        proc = jobdict['queue_id']
        job_id = jobdict['grid_queue_id']
        submitdir = jobdict['submitdir']

        cmd = '%s --noint --dir %s %s' % (self.get_output_cmd,submitdir,job_id)
        self.logger.debug(cmd)
        if not os.system(cmd):
            # job finished correctly. no need to remove from queue
            jobdict['grid_queue_id'] = None

        ### Need to add copy of lfn: files
        inventoryfile = os.path.join(submitdir,iceprod.core.exe._inventory)
        if not os.path.exists(inventoryfile):
            self.logger.error("%d,%d: no output inventory found." % (dataset,proc))
            return self.CopyStatusEnum['NOTREADY']
        inventory = FileInventory()
        self.logger.debug('reading %s' % inventoryfile)
        inventory.Read(inventoryfile)

        # copy data back to submit dir
        for file in inventory.filelist:

            jobdict['filename'] = os.path.basename(file['source'])
            cmd = 'lcg-cp --vo icecube '
            cmd += ' -v %s ' % self.grid_storage
            cmd += ' file:/%(submitdir)s/%(filename)s'

            # delete files from temporary storage
            if os.system(cmd % jobdict):
                cmd = 'lcg-del --vo icecube '
                cmd += ' -a %s ' % self.grid_storage
                os.system(cmd % jobdict)

        return self.CopyStatusEnum['OK']

    def Clean(self,jobdict,force=False):
        """
        Remove job from queue.
        """
        if not jobdict['grid_queue_id']: return True

        dataset = jobdict['dataset_id']
        proc = jobdict['queue_id']
        job_id = jobdict['grid_queue_id']
        submitdir = jobdict['submitdir']

        from iceprod.server.job import i3Job
        job = i3Job()
        job.SetDatasetId(dataset)
        job.SetJobId(job_id)
        job.SetProcNum(proc)
        job.SetRootDir(submitdir)
        # This should be done through Remove
        if job_id:
            self.CheckJobStatus([job])
            if job.GetStatus() in self.running_state and not force:
                return False
            elif job.GetStatus() in self.error_state:
                self.QPurge(job)
            elif job.GetStatus() in self.finished_state:
                self.QPurge(job)
            else:
                self.logger.info('removing job: %s' % job_id)
                self.QRemove(job)
                return False

        return iGrid.Clean(self,jobdict,force)

    def QPurge(self,job=None):
        """
        Purge job from the queue.
        """
        # for glite, assume they meant to remove the job
        return self.QRemove(job)


class Edg(gLite):
    """
    This class represents a job or cluster on an EGEE grid, using the
    older EDG command-line tools.
    """

    def __init__(self):
        gLite.__init__(self)
        self.enqueue_cmd = "edg-job-submit"
        self.checkqueue_cmd = "edg-job-status"
        self.get_output_cmd = "edg-job-get-output"
        self.queue_rm_cmd = "edg-job-cancel"
        self.logger = logging.getLogger('Edg')

    def PostCopy(self,jobdict,target_url,maxtries=4):
        """
        Retrieve the output sandbox of the finished job into the submit directory.
        """

        dataset = jobdict['dataset_id']
        proc = jobdict['queue_id']
        job_id = jobdict['grid_queue_id']
        submitdir = jobdict['submitdir']

        cmd = '%s -dir %s/output %s' % (self.get_output_cmd,submitdir,job_id)
        self.logger.debug(cmd)
        os.system(cmd)
        return True

    def Clean(self,jobdict,force=False):
        """
        Remove job from queue.
        """
        from iceprod.server.job import i3Job
        job = i3Job()
        job.dataset_id = jobdict['dataset_id']
        job.proc = jobdict['queue_id']
        job.job_id = jobdict['grid_queue_id']
        job.submitdir = jobdict['submitdir']

        if job.job_id:
            self.logger.info('removing job: %s' % job.job_id)
            self.QRemove(job)

        cmd = '%s -dir %s %s' % (self.get_output_cmd,job.submitdir,job.job_id)
        self.logger.debug(cmd)
        os.system(cmd)
        return iGrid.Clean(self,jobdict,force)


class Cream(gLite):
    """
    Next generation middleware replacement for gLite.
    """
    queued_state = [
        'IDLE',
        'PENDING',
        'REGISTERED',
    ]

    running_state = [
        'RUNNING',
        'REALLY-RUNNING',
    ]

    error_state = [
        'ABORTED',
        'DONE-FAILED',
        'CANCELLED',
        'HELD',
    ]

    finished_state = [
        'DONE-OK',
    ]

    undetermined_states = [
        'UNKNOWN',
    ]

    def __init__(self):
        gLite.__init__(self)
        self.cluster_id = -1
        self.post = None
        self.ids = "ids"
        self.enqueue_cmd = "glite-ce-job-submit"
        self.checkqueue_cmd = "glite-ce-job-status"
        self.list_queue_cmd = "glite-ce-job-list"
        self.queue_rm_cmd = "glite-ce-job-cancel"
        self.get_output_cmd = "glite-ce-job-output"
        self.purge_job_cmd = "glite-ce-job-purge"
        self.suffix = "jdl"
        self.logger = logging.getLogger('Cream')

    def CheckQ(self,db=None):
        """
        Query the status of the cluster or job on the queue.
        """

        cmd = self.checkqueue_cmd
        for id in self.job_ids:
            cmd += " %s" % id
        self.logger.debug(cmd)
        pout = os.popen(cmd)
        status = string.join(pout.readlines())
        pout.close()
        return status

    def CleanQ(self,jobs=None):
        """
        Check consistency of queue and remove jobs which shouldn't be there.
        """
        cmd = self.list_queue_cmd
        pout = os.popen(cmd)
        status = string.join(pout.readlines())
        pout.close()
        self.logger.debug(status)
        return 0

    def QPurge(self,jobid=None):
        """
        Purge job from the CREAM queue.
        """

        cmd = self.purge_job_cmd + " --noint "
        if jobid: cmd += " %s" % jobid
        else:
            for id in self.job_ids: cmd += " %s" % id

        self.logger.debug(cmd)
        status, output = commands.getstatusoutput(cmd)
        if status: self.logger.error(output)
        return status

    def WriteConfig(self,job,config_file):
        gLite.WriteConfig(self,job,config_file)
        submitfile_orig = open(config_file,'r')
        slines = submitfile_orig.readlines()
        submitfile_orig.close()

        submitfile = open(config_file,'w')
        job.Write(submitfile,'Type = "Job";')
        job.Write(submitfile,'JobType = "Normal";')
        for line in slines:
            job.Write(submitfile,line)
        submitfile.close()

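    # The rewritten JDL starts with the two CREAM-specific attributes and then
    # repeats the gLite attributes verbatim (sketch, values illustrative):
    #   Type = "Job";
    #   JobType = "Normal";
    #   Executable = "job_0.sh";
    #   ...
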
    def CheckJobStatus(self,jobs):
        """
        Query the status of a job on the CREAM queue.
        """
        status_dict = {
            'REGISTERED': "The job has been registered but it has not been started yet",
            'PENDING': "The job has been started, but it has still to be submitted to the LRMS abstraction layer module",
            'IDLE': "The job is idle in the Local Resource Management System (LRMS)",
            'RUNNING': "The job wrapper, which encompasses the user job, is running in the LRMS",
            'REALLY-RUNNING': "The job is running",
            'DONE-OK': "The job has finished",
            'DONE-FAILED': "The job has been executed but some errors occurred",
            'ABORTED': "Errors occurred during the management of the job, e.g. submission to the LRMS",
            'CANCELLED': "The job has been cancelled by the user",
            '?': "Unknown status"
        }
        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]
        for job in job_list:
            dataset_id = job.GetDatasetId()
            queue_id = job.GetProcNum()
            job_id = job.GetJobId()
            status = "?"

            if not job_id:
                job.SetStatus('?')
                continue

            cmd = self.checkqueue_cmd + ' ' + job_id
            handle = None
            status_search = None
            try:
                try:
                    handle = os.popen(cmd, 'r')
                    status_search = re.search(r'Status\s*=\s*\[[a-zA-Z\-]+\]',handle.read())
                except Exception,e:
                    self.logger.warning("unable to retrieve job status: %s" % e)
            finally:
                if handle:
                    handle.close()
            self.logger.info("job status %s" % (status_search))
            if not status_search:
                self.logger.warning("job status for %s.%s is unknown. marking as ?" % (dataset_id,queue_id))
                job.SetStatus('?')
                continue
            status = status_search.group()
            status = status.split('=')[1]
            status = status.strip()
            status = status.strip(']')
            status = status.strip('[')
            self.logger.debug(status)
            self.logger.info("%s.%s (%s) : %s" % (dataset_id,queue_id,job_id,status.upper()))
            if status_dict.has_key(status.upper()):
                self.logger.info(status_dict[status.upper()])

            if status.upper() in self.queued_state:
                job.SetStatus('QUEUED')
            elif status.upper() in self.running_state:
                job.SetStatus('PROCESSING')
            elif status.upper() in self.error_state:
                job.SetStatus('FAILED')
            elif status.upper() in self.finished_state:
                job.SetStatus('FINISHED')
            else:
                job.SetStatus('?')

        return 1

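    # The regular expression above matches status lines of the form
    #   Status        = [REALLY-RUNNING]
    # (spacing is a mock-up); the '=' and brackets are then stripped off before
    # the state lists at the top of the class are consulted.
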

class gLiteDAG(gLite):
    """
    This class represents a job that executes in multiple parts using a DAG.
    The DAG is only supported in gLite and not in CREAM.
    """

    def __init__(self):
        gLite.__init__(self)

    def WriteConfig(self,job,config_file):
        """
        Write JDL submit file.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        from iceprod.core.dataclasses import IceTrayConfig

        db = self.GetMonitorDB()

        steering = job.GetSteering()
        task_defs = steering.GetTaskDefinitions()
        logger.debug("Task definitions: %s" % task_defs)
        if not len(task_defs):
            # not a DAG
            logger.debug("No tasks specified in config file; doing regular submit")
            return gLite.WriteConfig(self, job, config_file)

        dagfile = open(config_file,'w')
        job_id = job.GetDatabaseId()

        job.Write(dagfile,'[')
        job.Write(dagfile,'  Type = "dag";')
        job.Write(dagfile,'  max_running_nodes = 25;')

        file_catalog = {}
        for taskname,td in task_defs.items():
            args = self.GetArguments(job,td,output="dict")
            file_catalog[taskname] = self.GetFiles(job,td,args)

        job.Write(dagfile,'  nodes = [')
        dag_nodes = []
        for taskname,td in task_defs.items():
            filename = config_file.replace('jdl',"%s.jdl" % taskname)
            td_id = td.GetId()
            logger.debug("Got task %s (ID=%u) with filename %s" % (taskname, td_id, filename))
            parents = self.EnumerateParentNodes(steering,td)
            if td.ParallelExecutionEnabled():
                trays = td.GetTrays()
                for idx,tray in trays.items():
                    for iter in tray.GetIters():
                        cjob = deepcopy(job) # make a deep copy of the job

                        if not iter == -1:
                            nodename = "%s_%u" % (taskname,iter)
                        else:
                            nodename = "%s_ext" % taskname
                        filename = config_file.replace('jdl',"%s.jdl" % nodename)
                        done = db.task_is_finished(td_id, job_id, idx, iter)
                        args = self.GetArguments(cjob,td,idx,iter,output="dict")

                        input,output,notes = self.GetFiles(cjob,td,args,idx,iter,file_catalog)

                        self.WriteFileManifest(cjob,filename,input,output,notes)
                        cjob.AddInputFile(filename.replace(".jdl", ".input"))
                        cjob.AddInputFile(filename.replace(".jdl", ".output"))
                        self.WriteSubmitFile(cjob,filename,td,idx,iter)
                        cjob.Write(dagfile,'    %s = [ ' % nodename)
                        cjob.Write(dagfile,'      file = "%s"; ' % filename)
                        cjob.Write(dagfile,'      node_retry_count = 1; ')
                        cjob.Write(dagfile,'    ];')
                        if len(parents) == 1:
                            dag_nodes.append("{%s,%s}" % (parents[0],nodename))
                        elif len(parents) > 1:
                            dag_nodes.append("{%s,%s}" % ("{%s}" % ",".join(parents),nodename))
            else:
                cjob = deepcopy(job) # make a deep copy of the job
                done = db.task_is_finished(td_id, job_id)
                if taskname == 'trashcan':
                    done = False
                args = self.GetArguments(cjob,td,output="dict")

                input,output,notes = self.GetFiles(job,td,args,catalog=file_catalog)
                self.WriteFileManifest(cjob,filename,input,output,notes)
                self.WriteSubmitFile(cjob,filename,td)
                cjob.Write(dagfile,'    %s = [ ' % taskname)
                cjob.Write(dagfile,'      file = "%s"; ' % filename)
                cjob.Write(dagfile,'      node_retry_count = 1; ')
                cjob.Write(dagfile,'    ];')
                if len(parents) == 1:
                    dag_nodes.append("{%s,%s}" % (parents[0],taskname))
                elif len(parents) > 1:
                    dag_nodes.append("{%s,%s}" % ("{%s}" % ",".join(parents),taskname))
        job.Write(dagfile,'  ]; ')
        job.Write(dagfile,'  dependencies = { %s };' % ",".join(dag_nodes))
        job.Write(dagfile,']; ')
        db.commit()
        dagfile.close()
        self.ids = config_file.replace('jdl','ids')

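    # Shape of the DAG JDL written above (node and file names invented):
    #   [
    #     Type = "dag";
    #     max_running_nodes = 25;
    #     nodes = [
    #       generate = [ file = "job_0.generate.jdl"; node_retry_count = 1; ];
    #       filter_0 = [ file = "job_0.filter_0.jdl"; node_retry_count = 1; ];
    #     ];
    #     dependencies = { {generate,filter_0} };
    #   ];
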
    def IsUrl(self, path):
        return bool(re.match("[^/:]+://?.*$", path))

    def GetFiles(self,job,td,args,idx=False,iter=False,catalog=False):
        steering = job.GetSteering()
        if idx is not False:
            td_trays = {idx: td.GetTray(idx)}
        else:
            td_trays = td.GetTrays()

        if args.has_key('dagtemp'):
            tmp_dir = args['dagtemp']
        else:
            tmp_dir = 'file:///tmp' # TODO: give this a sane default or make it an error

        if args.has_key('fetch'):
            global_dir = args['fetch']
        else:
            global_dir = 'file:///tmp'

        td_input = {}
        td_output = {}
        notes = {}

        if td.IsCleanup():
            # cleanup job has no input/output files required
            return (td_input, td_output, notes)

        for idx, td_tray in td_trays.iteritems():
            args['tray'] = idx

            logger.debug("GetTray(%s)" % idx)
            icetray = steering.GetTray(idx)

            input_files = icetray.GetInputFiles()
            parsed_input = []

            output_files = icetray.GetOutputFiles()
            parsed_output = []

            if iter is not False:
                iters = [iter]
            else:
                iters = td_tray.GetIters()
            for iter in iters:
                args['iter'] = iter
                parser = ExpParser(args,steering)
                for d in steering.GetDependencies():
                    d_file = parser.parse(d)
                    if not td_input.has_key(d_file):
                        location = d_file
                        if not self.IsUrl(location):
                            location = os.path.join(global_dir, location)
                        td_input[os.path.basename(d_file)] = [location]
                for i_file in input_files:
                    name = i_file.GetName()
                    name = parser.parse(name)
                    if not td_input.has_key(name) \
                       and not td_output.has_key(name):
                        if catalog:
                            node = self.FindFile(steering,td,catalog,name)
                        else:
                            node = td.GetName()

                        note = False
                        if node == "global":
                            location = global_dir
                            note = "global"
                        if node != "global":
                            location = tmp_dir
                            location = os.path.join(location, str(job.GetDatasetId()))
                            location = os.path.join(location, str(job.GetProcNum()))
                            location = os.path.join(location, node)
                            note = "dontextract"
                        location = os.path.join(location, name)
                        if i_file.IsPhotonicsTable():
                            note = "photonics"
                        if note:
                            notes[name] = note
                        td_input[name] = [location]
                for o_file in output_files:
                    name = o_file.GetName()
                    name = parser.parse(name)
                    if not td_output.has_key(name):
                        location = os.path.join(tmp_dir, str(job.GetDatasetId()))
                        location = os.path.join(location, str(job.GetProcNum()))
                        location = os.path.join(location, str(td.GetName()))
                        location = os.path.join(location, name)
                        td_output[name] = [location]

        return (td_input, td_output, notes)

    def FindFile(self,steering,td,catalog,file):
        parents = td.GetParents()

        # check immediate parents for this file
        for parent in parents:
            if catalog.has_key(parent):
                if catalog[parent][1].has_key(file):
                    return parent

        # check older ancestors
        for parent in parents:
            parent_td = steering.GetTaskDefinition(parent)
            result = self.FindFile(steering,parent_td,catalog,file)
            if result != "global":
                return result

        return "global"

    def WriteFileManifest(self,job,filename,input,output,notes):
        logger.debug("Input files: %s" % input)
        in_manifest = open(filename.replace(".jdl", ".input"), 'w')
        if len(input):
            padding = str(max(map(lambda x: max(map(len, x)), input.values())))
            fmt_str = "%-" + padding + "s %s"
            for i_file, locs in input.items():
                for loc in locs:
                    file = fmt_str % (loc, i_file)
                    if notes.has_key(i_file):
                        file += "\t%s" % notes[i_file]
                    job.Write(in_manifest, file)
        in_manifest.close()
        logger.debug("Output files: %s" % output)
        out_manifest = open(filename.replace(".jdl", ".output"), 'w')
        if len(output):
            padding = str(max(map(lambda x: max(map(len, x)), output.values())))
            fmt_str = "%-" + padding + "s %s"
            for o_file, locs in output.items():
                for loc in locs:
                    job.Write(out_manifest, fmt_str % (o_file, loc))
        out_manifest.close()

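    # Manifest format sketch (paths invented): each .input line is
    #   <location, left-padded>  <filename>[<tab><note>]
    # e.g.
    #   file:///tmp/1234/7/generate/hits.i3   hits.i3	dontextract
    # and each .output line is "<filename, left-padded>  <location>".
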
    def EnumerateParentNodes(self,steering,td):
        parents = td.GetParents()
        parentnodes = []
        for parent in parents:
            parentobj = steering.GetTaskDefinition(parent)
            if parentobj.ParallelExecutionEnabled():
                for idx,tray in parentobj.GetTrays().items():
                    for iter in tray.GetIters():
                        if not iter == -1:
                            nodename = "%s_%u" % (parent,iter)
                        else:
                            nodename = "%s_ext" % parent
                        parentnodes.append(nodename)
            else:
                parentnodes.append(parent)
        return parentnodes

    def GetArguments(self,job,td,idx=False,iter=False,output="str"):
        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = " --dag --task=%s" % td.GetName()
        if not idx is False:
            argstr += " --tray=%s --iter=%s" % (idx,iter)
        argstr += " " + job.GetMainScript() + " " + " ".join(argopts)
        if output == "dict":
            argstr = self.ArgStrToDict(argstr)
        return argstr

    def ArgStrToDict(self,argstr):
        args = {}
        for arg in argstr.split(" "):
            arg = arg[2:]
            pieces = arg.split("=")
            if len(pieces) == 1:
                args[arg] = 1
            else:
                args[pieces[0]] = pieces[1]
        return args

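    # Example of the string-to-dict conversion above (argument values invented;
    # shown as a sketch, not a runnable doctest):
    #   >>> d = gLiteDAG()
    #   >>> d.ArgStrToDict("--dag --task=level2 --tray=0 --iter=1")
    #   {'dag': 1, 'task': 'level2', 'tray': '0', 'iter': '1'}
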
    def getAdditionalOptions(self,job):
        return {}

    def WriteSubmitFile(self,job,filename,td,idx=False,iter=False):

        job.arguments = [] # otherwise we duplicate the arguments from the next line
        job.argopts = self.GetArguments(job,td,idx,iter,'dict')
        job.errorfile = filename.replace(".jdl",".err")
        job.outputfile = filename.replace(".jdl",".out")
        job.logfile = filename.replace(".jdl",".log")
        gLite.WriteConfig(self,job,filename)

    def GetAdditionalOptions(self,job):
        #return {'x509userproxy': expandvars(job.GetSteering().GetSysOpt("globus_proxy").GetValue())}
        return {}