1   
   2   
   3   
   4  """ 
   5   A basic wrapper for submitting and monitoring jobs on gLite/EGEE grids 
   6   (gLite WMS, EDG and CREAM command-line tools); the classes build on iGrid. 
   7    
   8   copyright  (c) 2005 the icecube collaboration 
   9    
  10   @version: $Revision: $ 
  11   @date: $Date: $ 
  12   @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
  13  """ 
  14   
  15  import os 
  16  import re 
  17  import sys 
  18  import math 
  19  import dircache 
  20  import commands 
  21  import time 
  22  import string 
  23  import shutil 
  24  import ConfigParser 
  25  import logging 
  26  import iceprod 
  27  import iceprod.core.exe 
  28  from iceprod.core import metadata 
  29  from iceprod.core.dataclasses import Steering 
  30  from iceprod.server.grid import iGrid,DAG 
  31  from iceprod.core.lex import ExpParser 
  32  from os.path import basename,dirname 
  33  from copy import deepcopy 
  34   
  35  logger = logging.getLogger('eGee') 
  36   
  37  egee_status = {'1':'QUEUED', '2':'PROCESSING'} 
  38   
  40      return '\"%s\"' % str(x) 
   41   
  46   
  47   
  49      """ 
  50      This class represents a job or cluster on an egee grid. 
  51      """ 
  52      queued_state = [ 
  53         'READY', 
  54         'SCHEDULED', 
  55         'WAITING', 
  56         'SUBMITTED', 
  57         'ACCEPTED', 
  58         'PREPARING', 
  59         'QUEUEING', 
  60         'QUEUED', 
  61         'INLRMS:Q', 
  62      ] 
  63      running_state =    [ 
  64         'RUNNING', 
  65         'EXECUTED', 
  66         'FINISHING', 
  67      ] 
  68   
  69      error_state =    [ 
  70         'FAILED', 
  71         'DONE(EXITCODE!=0)', 
  72         'ABORTED', 
  73      ] 
  74   
  75      finished_state =    [ 
  76         'DONE(SUCCESS)', 
  77         'CLEARED', 
  78         'CANCELLED', 
  79      ] 
  80   
  81      undetermined_states =    [ 
  82         'UNAVAILABLE', 
  83      ] 
  84   
  85       
  87          iGrid.__init__(self) 
  88          self.cluster_id     = -1 
  89          self.post             = None 
  90          self.ids             = "ids" 
  91          self.enqueue_cmd    = "glite-wms-job-submit" 
  92          self.checkqueue_cmd = "glite-wms-job-status" 
  93          self.queue_rm_cmd   = "glite-wms-job-cancel" 
  94          self.get_output_cmd = "glite-wms-job-output" 
  95          self.suffix         = "jdl" 
  96          self.grid_storage   = "%(gridstorage)s/%(dataset_id)u/%(queue_id)u/%(filename)s" 
  97          self.logger       = logging.getLogger('gLite') 
   98   
  99   
 101          from random import choice 
 102          weighted_rlist = [] 
 103   
 104          for r in resource_list: 
 105              if len(r.split()) > 1: 
 106                 resource,weight = r.split() 
 107              else: 
 108                 resource,weight = r,1 
 109              try: 
 110                 weight = int(weight) 
 111              except Exception,e: 
 112                 self.logger.error("Exception: " + str(e)) 
 113                 self.logger.warn("Unable to get resource weight for: " +r) 
 114                 weight = 1 
 115   
 116              self.logger.debug("%s:%u " % (resource,weight)) 
 117              weighted_rlist.extend([resource]*weight) 
 118   
 119          return choice(weighted_rlist) 
  120   
 121 -    def BundleFiles(self,job,tarballname="inputfiles.tar"): 
  122          """ 
 123          Bundle multiple files as a single tarball so as not to exceed 
 124          limit of input files 
 125          @param filelist: list of filenames 
 126          @return: tarball path 
 127          """ 
 128          import tarfile 
 129          tmpdir  = os.path.join(job.GetInitialdir(),"inputfiles") 
 130          tarpath = os.path.join(self.GetInitialdir(),"inputfiles.tar") 
 131          if os.path.isfile(tarpath): return tarpath 
 132          if not os.path.exists(tmpdir): 
 133             os.makedirs(tmpdir) 
 134          cwdir = os.getcwd() 
 135          os.chdir(tmpdir) 
 136          tfile = tarfile.open(tarpath, 'w') 
 137          for file in job.GetInputFiles(): 
 138              if file.endswith('.py'): 
 139                 os.system('cp -f %s .' % file) 
 140                 self.logger.info("adding file %s to tarball" % file) 
 141                 tfile.add(os.path.basename(file)) 
 142          tfile.close() 
 143          os.chdir(cwdir) 
 144          return tarpath 
  145       
 147          """ 
 148          Write JDL to a file. 
 149          @param job: i3Job object 
 150          @param config_file: path to file were submit file will be written 
 151          """ 
 152           
 153          if not job.GetExecutable(): 
 154              raise Exception,  "no executable configured" 
 155           
 156          submitfile = open(config_file,'w') 
 157          wrapper = open(config_file.replace('jdl','sh'),'w') 
 158          self.ids= config_file.replace('jdl','ids') 
 159           
 160          job.Write(wrapper,"#!/bin/sh") 
 161          job.Write(wrapper,'echo "running iceprod on $HOSTNAME";',parse=False) 
 162          job.Write(wrapper,"uname -a;") 
 163   
 164           
 165          job.Write(wrapper,"I3SCRATCH=/tmp",parse=False) 
 166          for var in self.env.keys(): 
 167                  job.Write(wrapper, "export %s=%s" % (var, self.env[var]),parse=False ) 
 168          for var in job.env.keys(): 
 169                  job.Write(wrapper, "export %s=%s" % (var, job.env[var]),parse=False) 
 170   
 171          job.Write(wrapper,"INIT_DIR=$PWD",parse=False) 
 172          job.Write(wrapper,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False) 
 173          job.Write(wrapper,"mkdir -p $RUNDIR") 
 174          job.Write(wrapper,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False) 
 175          job.Write(wrapper,"cd $RUNDIR",parse=False) 
 176          job.Write(wrapper,"ln -s $INIT_DIR/* $RUNDIR/",parse=False) 
 177           
 178          err     = basename(job.GetErrorFile()) 
 179          out     = basename(job.GetOutputFile()) 
 180          argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments() 
 181          argstr  = job.GetMainScript() + " " + " ".join(argopts) 
 182          inputfiles = [] 
 183          inputfiles.append(wrapper.name) 
 184          inputfiles.append(job.GetExecutable()) 
 185          inputtar = self.BundleFiles(job,tarballname="inputfiles.tar") 
 186          inputfiles.append(inputtar) 
 187          for file in job.GetInputFiles(): 
 188              if not file.endswith('.py'): 
 189                 inputfiles.append(file) 
 190          inputfiles = map(qoute,inputfiles) 
 191           
 192          outputfiles = [] 
 193          outputfiles.append('work.tgz') 
 194          outputfiles.append("icetray.%06u.log" % job.GetProcNum()) 
 195          outputfiles.append('%s'%err) 
 196          outputfiles.append('%s'%out) 
 197          outputfiles.append(iceprod.core.exe._inventory)  
 198          outputfiles.extend(job.GetOutputFiles()) 
 199          outputfiles = map(qoute,outputfiles) 
 200           
 201          pyhome = '' 
 202          if self.env.has_key('PYROOT'): 
 203             pyhome = self.env['PYROOT'] 
 204          if job.env.has_key('PYROOT'): 
 205             pyhome = job.env['PYROOT'] 
 206          if pyhome.startswith('http:'): 
 207             job.Write(wrapper,"wget --quiet %s" % pyhome,parse=False) 
 208          if pyhome.endswith('.tgz'): 
 209             job.env['PYROOT'] = '$PWD/python-2.3' 
 210             job.Write(wrapper,"tar xzf %s" % os.path.basename(pyhome),parse=False) 
 211              
 212           
 213          job.Write(submitfile,'Executable          = "%s";' % basename(wrapper.name)) 
 214          job.Write(submitfile,'StdOutput           = "%s";' % out) 
 215          job.Write(submitfile,'StdError            = "%s";' % err) 
 216          job.Write(submitfile,'InputSandbox        = {%s};' % ','.join(inputfiles)) 
 217          job.Write(submitfile,'OutputSandbox       = {%s};' % ','.join(outputfiles)) 
 218          job.Write(submitfile,'Arguments           = "%s";' % argstr,parse=False) 
 219           
 220          job.Write(wrapper, 'cmd="$PYROOT/bin/python %s $*"' % basename(job.GetExecutable()),parse=False) 
 221          job.Write(wrapper, "echo $cmd",parse=False) 
 222          job.Write(wrapper, "$cmd",parse=False) 
 223          job.Write(wrapper, "retval=$?",parse=False) 
 224           
 225           
 226          for key in self.GetParamKeys(): 
 227              if not job.batchopts.has_key(key): 
 228                 job.AddBatchOpt(key,self.GetParam(key)) 
 229           
 230           
 231          submitdir = dirname(os.path.abspath(out)) 
 232          for key,opt in job.GetBatchOpts().items(): 
 233              if key.startswith('-'): continue 
 234              elif key.upper().startswith('OUTPUTSANDBOXBASEDESTURI'): 
 235                 opt += submitdir 
 236              job.Write(submitfile, '%s    = "%s";' % (key, opt)) 
 237           
 238          submitfile.close(); 
 239   
 240          read_from_inventory = False 
 241          if not read_from_inventory:  
 242               
 243              if not self.GetArgOpt('stageout'): 
 244                  job.Write(wrapper,"# copying files to storage element") 
 245                  job.Write(wrapper,"for file in *; do") 
 246                  job.Write(wrapper," if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir()) 
 247                  job.Write(wrapper,"   lcg-cr --vo icecube \\",parse=False) 
 248                  job.Write(wrapper,"          -d udo-dcache01.grid.uni-dortmund.de \\",parse=False) 
 249                  job.Write(wrapper,"          -l lfn:/grid/icecube/iceprod/%s/%s/$file` \\" \ 
 250                                      %(job.GetDatasetId(),job.GetProcNum()),parse=False) 
 251                  job.Write(wrapper,"           file:$PWD/$file",parse=False) 
 252                  job.Write(wrapper," fi; done") 
 253               
 254   
 255          else: 
 256               
 257               
 258               
 259               
 260               
 261               
 262              inventoryfile = os.path.join(submitdir,iceprod.core.exe._inventory) 
 263              copyok  = 0 
 264              if not os.path.exists(inventoryfile): 
 265                 self.logger.error("%d,%d: no output inventory found." % (dataset,proc)) 
 266                 return self.CopyStatusEnum['NOTREADY'] 
 267              inventory = FileInventory() 
 268              inventorylist = [] 
 269              self.logger.debug( 'reading %s' % inventoryfile ) 
 270              inventory.Read(inventoryfile) 
 271               
 272   
 273              job.Write(wrapper, "export LFC_HOST=`lcg-infosites --vo icecube lfc`",parse=False)   
 274              job.Write(wrapper, "export LCG_GFAL_INFOSYS=lcg-bdii.ifh.de:2170",parse=False)       
 275   
 276   
 277       
 278              copy_cmds = [] 
 279              for file in inventory.filelist: 
 280          
 281                  cmd = 'lcg-cr --vo icecube ' 
 282                  cmd += '-l %s/%s ' % (file["target"], os.path.split(file["source"])[1]) 
 283                  cmd += '%s' % (file["source"]) 
 284   
 285                  copy_cmds.append("lfc-mkdir -p %s" % (file["target"][4:])) 
 286                  copy_cmds.append(cmd) 
 287   
 288              cmd = "\n".join(copy_cmds) 
 289              job.Write(wrapper,"# copying files to storage element") 
 290              job.Write(wrapper,cmd,parse=False) 
 291           
 292               
 293              del_output = False 
 294              if del_output:       
 295                  del_cmds = [] 
 296                  for file in inventory.filelist: 
 297                      cmd = "/bin/rm %s" % (file["source"][5:]) 
 298                      del_cmds.append(cmd) 
 299                  cmd = "\n".join(del_cmds) 
 300               
 301                  job.Write(wrapper,"# deleting copied files") 
 302                  job.Write(wrapper,cmd,parse=False) 
 303   
 304          job.Write(wrapper,"cd $INIT_DIR",parse=False) 
 305          job.Write(wrapper,"rm -rf $RUNDIR",parse=False) 
 306          job.Write(wrapper,"exit $retval",parse=False) 
 307           
 308          wrapper.close(); 
  309       
 311          """ 
 312          Submit job/cluster to grid 
 313           
 314          @param job: i3Job object 
 315          @param config_file: path to file were submit file will be written 
 316          """ 
 317          status = 0 
 318          self.submit_status = '' 
 319          cwdir = os.getcwd() 
 320          for job in self.jobs: 
 321             self.logger.info("submitting job %u.%u" %(job.GetDatasetId(),job.GetProcNum())) 
 322             os.chdir(job.GetInitialdir()) 
 323   
 324             self.WriteConfig(job,job.config_file) 
 325           
 326             submit_args = [] 
 327   
 328              
 329             resource_list = [] 
 330             for key,opt in job.GetBatchOpts().items(): 
 331                 if key.startswith('-resource'): 
 332                    if opt.startswith('-r'):  
 333                       resource = opt[2:].strip() 
 334                    else: 
 335                       resource = opt 
 336                    resource_list.append(resource) 
 337      
 338                 elif key.startswith('-'): 
 339                    submit_args.append(opt) 
 340             if resource_list: 
 341                resource = self._choose_resource(resource_list) 
 342                submit_args.append('-r %s' % resource) 
 343              
 344             cmd = "%s %s -o %s %s" % (self.enqueue_cmd," ".join(submit_args),self.ids,job.config_file) 
 345             self.logger.debug(cmd) 
 346             status, self.submit_status = commands.getstatusoutput(cmd) 
 347             if status != 0: 
 348                self.logger.error("Failed to execute command") 
 349                self.logger.error(cmd) 
 350             try: 
 351                 id = self.get_id(self.submit_status) 
 352                 job.SetJobId(id) 
 353                 if id in (-1,None) : status = 1 
 354                 else: status = 0 
 355             except Exception, e: 
 356                 self.logger.error("Exception: " + str(e)) 
 357                 self.submit_status += "\nException: " + str(e) 
 358                 status = 1 
 359           
 360             if len(self.job_ids) and not self.cluster_id: 
 361                 self.cluster_id = self.job_ids[0] 
 362   
 363             job.submit_status = status 
 364             job.submit_msg = self.submit_status 
 365              
 366             if self.production: 
 367                  
 368                 if job.submit_status == 0:  
 369                    self.i3monitordb.jobsubmitted( 
 370                       job.GetDatasetId(), job.GetProcNum(), 
 371                       job.GetInitialdir(), job.GetJobId()) 
 372                 else:  
 373                    logger.error("failed to submit jobs:"+job.submit_msg) 
 374                    self.i3monitordb.jobabort(job.GetProcNum(),job.GetDatasetId(),3, 
 375                                    "failed to submit jobs:"+job.submit_msg) 
 376                    os.chdir('/tmp') 
 377                     
 378              
 379             os.chdir(cwdir) 
 380              
 381           
 382          return status,self.submit_status 
  383   
 384       
 385 -    def get_id(self,submit_status): 
  386          """ 
 387          Parse string returned by on submission to extract the 
 388          id of the job cluster 
 389           
 390          @param submit_status: string returned by submit cdm 
 391          """ 
 392           
 393          self.logger.debug(submit_status) 
 394          idfile = open(self.ids,'r') 
 395          job_id = None 
 396          for line in idfile.readlines(): 
 397              if line.strip().startswith('http'): 
 398                 job_id = line.strip() 
 399                 self.job_ids.append(job_id) 
 400                 break 
 401          if not job_id: 
 402             self.logger.warn('could not parse job id from "%s"' % self.ids) 
 403          return job_id 
  404   
 405       
 407          """ 
 408          Querie status of cluster or job on queue 
 409          """ 
 410           
 411          cmd = self.checkqueue_cmd 
 412          for id in self.job_ids: 
 413              cmd += " %s" % id 
 414          self.logger.debug(cmd) 
 415          pout = os.popen(cmd) 
 416          status = string.join(pout.readlines()) 
 417          pout.close() 
 418          return status 
  419       
 421          """ 
 422          Check consistency of queue and remove jobs which shouldn't be there 
 423          """ 
 424           
 425          return 0 
  426       
 428          """ 
 429          Querie status of job on glite queue 
 430          """ 
 431          status_dict = { 
 432            'SUBMITTED':   "The job has been submitted by the user but not yet processed by the Network Server", 
 433            'WAITING':     "The job has been accepted by the Network Server but not yet processed by the Workload Manager", 
 434            'READY':       "The job has been assigned to a Computing Element but not yet transferred to it", 
 435            'SCHEDULED':   "The job is waiting in the Computing Element's queue", 
 436            'RUNNING':     "The job is running", 
 437            'DONE':        "The job has finished", 
 438            'ABORTED':     "The job has been aborted by the WMS (e.g. because it was too long, or the proxy certificated expired, etc.)", 
 439            'CANCELLED':   "The job has been cancelled by the user", 
 440            'CLEARED':     "The Output Sandbox has been transferred to the User Interface ", 
 441            '?':           "Unknown status" 
 442            } 
 443          if isinstance(jobs,list): 
 444                     job_list = jobs 
 445          else: 
 446                     job_list = [jobs] 
 447          for job in job_list: 
 448                     dataset_id = job.GetDatasetId() 
 449                     queue_id   = job.GetProcNum() 
 450                     job_id     = job.GetJobId() 
 451                     status     = "?" 
 452           
 453                     if not job_id: 
 454                        job.SetStatus('?') 
 455           
 456                     cmd = self.checkqueue_cmd + ' ' + job_id 
 457                     handle = os.popen(cmd, 'r') 
 458   
 459                     for line in handle.readlines(): 
 460                        line = line.strip() 
 461                        if line.startswith('Current Status:'): 
 462                           status = line[(line.find(':') + 1):].replace(' ','').split()[0] 
 463                     handle.close() 
 464                     self.logger.info("%s.%s (%s) : %s" %(dataset_id,queue_id,job_id,status.upper())) 
 465                     if status_dict.has_key(status.upper()): self.logger.info(status_dict[status.upper()]) 
 466           
 467                     if status.upper() in self.queued_state: 
 468                        job.SetStatus('QUEUED') 
 469                     elif status.upper() in self.running_state: 
 470                        job.SetStatus('PROCESSING') 
 471                     elif status.upper() in self.error_state: 
 472                        job.SetStatus('FAILED') 
 473                     elif status.upper() in self.finished_state: 
 474                        job.SetStatus('FINISHED') 
 475                     else: 
 476                        job.SetStatus('?') 
 477           
 478          return 1 
  479   
 480       
 482          """ 
 483          Remove cluster or job from glite queue 
 484          """ 
 485           
 486          cmd = self.queue_rm_cmd  + " --noint " 
 487          if jobid: cmd += " %s" % job.job_id 
 488          else: 
 489             for id in self.job_ids: cmd += " %s" % id 
 490           
 491          self.logger.debug(cmd) 
 492          handle = os.popen(cmd, 'r') 
 493          status = string.join(handle.readlines()) 
 494          handle.close() 
 495           
 496          if status.find('Job has already finished') != -1: 
 497              cmd = self.queue_rm_cmd  + " --noint " 
 498              if job: cmd += " %s" % job.job_id 
 499              else: 
 500                  for id in self.job_ids: cmd += " %s" % id 
 501               
 502              self.logger.debug(cmd) 
 503              handle = os.popen(cmd, 'r') 
 504              status = string.join(handle.readlines()) 
 505              handle.close() 
 506           
 507          return status 
  508   
 509   
 510       
 511 -    def PostCopy(self,jobdict,target_url,maxtries=4): 
  512          """ 
 513          Interface: Remove active job/cluster from queuing system. 
 514          """ 
 515          if not jobdict['grid_queue_id']: return True 
 516           
 517          dataset    = jobdict['dataset_id'] 
 518          proc       = jobdict['queue_id'] 
 519          job_id     = jobdict['grid_queue_id'] 
 520          submitdir  = jobdict['submitdir'] 
 521           
 522          cmd = '%s --noint --dir %s %s' % (self.get_output_cmd,submitdir,job_id) 
 523          self.logger.debug(cmd) 
 524          if not os.system(cmd):  
 525              
 526             jobdict['grid_queue_id'] = None 
 527           
 528           
 529          inventoryfile = os.path.join(submitdir,iceprod.core.exe._inventory) 
 530          if not os.path.exists(inventoryfile): 
 531             self.logger.error("%d,%d: no output inventory found." % (dataset,proc)) 
 532             return self.CopyStatusEnum['NOTREADY'] 
 533          inventory = FileInventory() 
 534          inventorylist = [] 
 535          self.logger.debug( 'reading %s' % inventoryfile ) 
 536          inventory.Read(inventoryfile) 
 537           
 538           
 539          for file in inventory.filelist: 
 540               
 541              jobdict['filename'] = os.path.basename(file['source']) 
 542              cmd  = 'lcg-cp --vo icecube ' 
 543              cmd += ' -v %s ' % self.grid_storage 
 544              cmd += ' file:/%(submitdir)s/%(filename)s' 
 545               
 546               
 547              if os.system(cmd % jobdict): 
 548                 cmd  = 'lcg-del --vo icecube ' 
 549                 cmd += ' -a %s ' % self.grid_storage 
 550                 os.system(cmd % jobdict) 
 551           
 552          return self.CopyStatusEnum['OK'] 
  553   
 554       
 555 -    def Clean(self,jobdict,force=False): 
  586   
 588          """ 
 589          purge job from from queue 
 590          """ 
 591           
 592          return self.QRemove(job) 
   593   
 594   
 596      """ 
 597      This class represents a job or cluster on an egee grid. 
 598      """ 
 599       
 601          gLite.__init__(self) 
 602          self.enqueue_cmd    = "edg-job-submit" 
 603          self.checkqueue_cmd = "edg-job-status" 
 604          self.get_output_cmd = "edj-job-get-output" 
 605          self.queue_rm_cmd   = "edg-job-cancel" 
 606          self.logger         = logging.getLogger('Edg') 
  607   
 608       
 609 -    def PostCopy(self,jobdict,target_url,maxtries=4): 
  610          """ 
 611          Interface: Remove active job/cluster from queuing system. 
 612          """ 
 613           
 614          dataset    = jobdict['dataset_id'] 
 615          proc       = jobdict['queue_id'] 
 616          job_id     = jobdict['grid_queue_id'] 
 617          submitdir  = jobdict['submitdir'] 
 618           
 619          cmd = '%s -dir %s/output %s' % (self.get_output_cmd,submitdir,job_id) 
 620          self.logger.debug(cmd) 
 621          os.system(cmd) 
 622          return True 
  623       
 624 -    def Clean(self,jobdict,force=False): 
   642   
 643   
 644   
 646      """ 
 647      Next generation middleware replacement for gLite 
 648      """ 
 649      queued_state = [ 
 650         'IDLE', 
 651         'PENDING', 
 652         'REGISTERED', 
 653      ] 
 654   
 655      running_state = [ 
 656        'RUNNING', 
 657        'REALLY-RUNNING', 
 658      ] 
 659   
 660      error_state =    [ 
 661        'ABORTED', 
 662        'DONE-FAILED', 
 663        'CANCELLED', 
 664        'HELD', 
 665      ] 
 666   
 667      finished_state =    [ 
 668         'DONE-OK', 
 669      ] 
 670   
 671      undetermined_states =    [ 
 672        'UNKNOWND', 
 673      ] 
 674   
 676          gLite.__init__(self) 
 677          self.cluster_id     = -1 
 678          self.post             = None 
 679          self.ids             = "ids" 
 680          self.enqueue_cmd    = "glite-ce-job-submit" 
 681          self.checkqueue_cmd = "glite-ce-job-status" 
 682          self.list_queue_cmd = "glite-ce-job-list" 
 683          self.queue_rm_cmd   = "glite-ce-job-cancel" 
 684          self.get_output_cmd = "glite-ce-job-output" 
 685          self.purge_job_cmd  = "glite-ce-job-purge" 
 686          self.suffix         = "jdl" 
 687          self.logger         = logging.getLogger('Cream') 
  688   
 690          """ 
 691          Querie status of cluster or job on queue 
 692          """ 
 693           
 694          cmd = self.checkqueue_cmd 
 695          for id in self.job_ids: 
 696              cmd += " %s" % id 
 697          self.logger.debug(cmd) 
 698          pout = os.popen(cmd) 
 699          status = string.join(pout.readlines()) 
 700          pout.close() 
  701   
 703          """ 
 704          Check consistency of queue and remove jobs which shouldn't be there 
 705          """ 
 706          cmd = self.list_queue_cmd 
 707          pout = os.popen(cmd) 
 708          status = string.join(pout.readlines()) 
 709          pout.close() 
 710          print status 
  711   
 713          """ 
 714          purge job from from cream queue 
 715          """ 
 716           
 717          cmd = self.purge_job_cmd + " --noint " 
 718          if jobid: cmd += " %s" % jobid 
 719          else: 
 720             for id in self.job_ids: cmd += " %s" % id 
 721           
 722          self.logger.debug(cmd) 
 723          status, output = commands.getstatusoutput(cmd) 
 724          if status: self.logger.error(output) 
 725          return status 
  726   
 739   
 741          """ 
 742          Querie status of job on glite queue 
 743          """ 
 744          status_dict = { 
 745            'REGISTERED':  "The job has been registered but it has not been started yet", 
 746            'PENDING':     "The job has been started, but it has still to be submitted to the LRMS abstraction layer module", 
 747            'IDLE':        "The job is idle in the Local Resource Management System (LRMS)", 
 748            'RUNNING':     "The job wrapper, which ecompasses the user job, is running in the LRMS", 
 749            'REALLY-RUNNING':     "The job is running", 
 750            'DONE-OK':     "The job has finished", 
 751            'DONE-FAILED':     "The job has been executed but some errors occurred", 
 752            'ABORTED':     "Errors occurred during the management of the job, e.g. submission to the LRMS", 
 753            'CANCELLED':   "The job has been cancelled by the user", 
 754            '?':           "Unknown status" 
 755            } 
 756          if isinstance(jobs,list): 
 757                     job_list = jobs 
 758          else: 
 759                     job_list = [jobs] 
 760          for job in job_list: 
 761                     dataset_id = job.GetDatasetId() 
 762                     queue_id   = job.GetProcNum() 
 763                     job_id     = job.GetJobId() 
 764                     status     = "?" 
 765           
 766                     if not job_id: 
 767                        job.SetStatus('?') 
 768           
 769                     cmd = self.checkqueue_cmd + ' ' + job_id 
 770                     try: 
 771                        try: 
 772                           handle = os.popen(cmd, 'r') 
 773                           status_search = re.search(r'Status\s*=\s*\[[a-zA-Z\-]+\]',handle.read()) 
 774                        except Exception,e: 
 775                           self.logger.warning("unable to receive job status: %s"%e) 
 776                     finally: 
 777                        if handle: 
 778                           handle.close() 
 779                     self.logger.info("job status %s" %(status_search)) 
 780                     if not status_search: 
 781                        self.logger.warning("job status for %s.%s is unknown.  marking as ?"%(dataset_id,queue_id)) 
 782                        job.SetStatus('?') 
 783                        continue 
 784                     status = status_search.group() 
 785                     status = status.split('=')[1] 
 786                     status = status.strip() 
 787                     status = status.strip(']') 
 788                     status = status.strip('[') 
 789                     print status 
 790                     self.logger.info("%s.%s (%s) : %s" %(dataset_id,queue_id,job_id,status.upper())) 
 791                     if status_dict.has_key(status.upper()): self.logger.info(status_dict[status.upper()]) 
 792           
 793                     if status.upper() in self.queued_state: 
 794                        job.SetStatus('QUEUED') 
 795                     elif status.upper() in self.running_state: 
 796                        job.SetStatus('PROCESSING') 
 797                     elif status.upper() in self.error_state: 
 798                        job.SetStatus('FAILED') 
 799                     elif status.upper() in self.finished_state: 
 800                        job.SetStatus('FINISHED') 
 801                     else: 
 802                        job.SetStatus('?') 
 803           
 804          return 1 
   805   
 806   
 808      """ 
 809      This class represents a job that executes in multiple parts using a DAG. 
 810      The DAG is only supported in gLite and not in Cream. 
 811      """ 
 812   
 815   
        """
        Write the JDL description of a DAG job.

        For every task definition in the job's steering a node JDL file is
        written (one per (tray, iteration) for tasks with parallel execution
        enabled), together with its .input/.output file manifests; the
        top-level DAG description (node list and dependencies) is written to
        config_file. When the steering defines no tasks, the plain gLite
        (non-DAG) WriteConfig is used instead.

        @param job: i3Job object
        @param config_file: path to file where the DAG submit file will be written
        @raise Exception: if the job has no executable configured
        """

        if not job.GetExecutable():
            raise Exception,  "no executable configured"

        # NOTE(review): IceTrayConfig is not referenced in the visible body.
        from iceprod.core.dataclasses import IceTrayConfig

        db = self.GetMonitorDB()

        steering  = job.GetSteering()
        task_defs = steering.GetTaskDefinitions()
        logger.debug("Task definitions: %s" % task_defs)
        if not len(task_defs):
            # no tasks: fall back to an ordinary single-job submission
            logger.debug("No tasks specified in config file; doing regular submit")
            return gLite.WriteConfig(self, job, config_file)

        # NOTE(review): dagfile is not closed if anything below raises.
        dagfile   = open(config_file,'w')
        job_id    = job.GetDatabaseId()

        job.Write(dagfile,'[')
        job.Write(dagfile,' Type                = "dag";')
        job.Write(dagfile,' max_running_nodes   = 25;')

        # First pass: collect every task's (input, output, notes) so that
        # GetFiles/FindFile can later tell which ancestor produces a file.
        file_catalog = {}
        for taskname,td in task_defs.items():
            args = self.GetArguments(job,td,output="dict")
            file_catalog[taskname] = self.GetFiles(job,td,args)

        job.Write(dagfile,' nodes = [')
        # dependency edges, "{parent,child}" or "{{p1,p2,...},child}"
        dag_nodes = []
        for taskname,td in task_defs.items():
            # NOTE(review): str.replace rewrites *every* 'jdl' in the path,
            # not just the extension -- confirm submit paths never contain
            # 'jdl' elsewhere.
            filename = config_file.replace('jdl',"%s.jdl" % taskname)
            td_id = td.GetId()
            logger.debug("Got task %s (ID=%u) with filename %s" % (taskname, td_id, filename))
            parents = self.EnumerateParentNodes(steering,td)
            if td.ParallelExecutionEnabled():
                # one DAG node per (tray, iteration) of the task
                trays = td.GetTrays()
                for idx,tray in trays.items():
                    for iter in tray.GetIters():
                        # each node gets its own copy, since input files are
                        # added to it below
                        cjob = deepcopy(job)

                        # NOTE(review): node names use only the iteration,
                        # not the tray index; two trays sharing an iteration
                        # number would yield duplicate node names.
                        if not iter == -1:
                            nodename = "%s_%u" % (taskname,iter)
                        else:
                            nodename = "%s_ext" % taskname
                        filename = config_file.replace('jdl',"%s.jdl" % nodename)
                        # NOTE(review): 'done' is not read anywhere in the visible code
                        done     = db.task_is_finished(td_id, job_id, idx, iter)
                        args     = self.GetArguments(cjob,td,idx,iter,output="dict")

                        input,output,notes = self.GetFiles(cjob,td,args,idx,iter,file_catalog)

                        # write the manifests and ship them with the node
                        self.WriteFileManifest(cjob,filename,input,output,notes)
                        cjob.AddInputFile(filename.replace(".jdl", ".input"))
                        cjob.AddInputFile(filename.replace(".jdl", ".output"))
                        self.WriteSubmitFile(cjob,filename,td,idx,iter)
                        cjob.Write(dagfile,'  %s = [ ' % nodename)
                        cjob.Write(dagfile,'     file = "%s"; ' % filename)
                        cjob.Write(dagfile,'     node_retry_count = 1; ')
                        cjob.Write(dagfile,'  ];')
                        if len(parents) == 1:
                            dag_nodes.append("{%s,%s}" % (parents[0],nodename))
                        elif len(parents) > 1:
                            dag_nodes.append("{%s,%s}" % ("{%s}" % ",".join(parents),nodename))
            else:
                # a single DAG node named after the task
                cjob = deepcopy(job)
                # NOTE(review): 'done' is not read anywhere in the visible code
                done = db.task_is_finished(td_id, job_id)
                if taskname == 'trashcan':
                    done = False
                args = self.GetArguments(cjob,td,output="dict")

                # NOTE(review): unlike the parallel branch, GetFiles gets 'job'
                # (not 'cjob') and the manifests are not added via
                # AddInputFile here -- confirm this is intended.
                input,output,notes = self.GetFiles(job,td,args,catalog=file_catalog)
                self.WriteFileManifest(cjob,filename,input,output,notes)
                self.WriteSubmitFile(cjob,filename,td)
                cjob.Write(dagfile,'  %s = [ ' % taskname)
                cjob.Write(dagfile,'    file = "%s"; ' % filename)
                cjob.Write(dagfile,'    node_retry_count = 1; ')
                cjob.Write(dagfile,'  ];')
                if len(parents) == 1:
                    dag_nodes.append("{%s,%s}" % (parents[0],taskname))
                elif len(parents) > 1:
                    dag_nodes.append("{%s,%s}" % ("{%s}" % ",".join(parents),taskname))
        job.Write(dagfile,'  ];   ')
        job.Write(dagfile,'  dependencies = { %s };' % ",".join(dag_nodes))
        job.Write(dagfile,'];   ')
        db.commit()
        dagfile.close()
        # path of the file that presumably records the submitted job ids
        # (same 'jdl' -> 'ids' substitution caveat as above)
        self.ids= config_file.replace('jdl','ids')
  909   
        # True when path begins with "<scheme>:/" (one or two slashes), e.g.
        # "file:///tmp/x" or "http://host/x". Absolute or relative local paths
        # such as "/data/x" or "dir/x" do not match. Note that a Windows-style
        # "C:/x" would also be treated as a URL.
        return bool(re.match("[^/:]+://?.*$", path))
  912   
 913 -    def GetFiles(self,job,td,args,idx=False,iter=False,catalog=False): 
  914          steering = job.GetSteering() 
 915          if idx is not False: 
 916              td_trays = {idx: td.GetTray(idx)} 
 917          else: 
 918              td_trays = td.GetTrays() 
 919   
 920          if args.has_key('dagtemp'): 
 921              tmp_dir = args['dagtemp'] 
 922          else: 
 923              tmp_dir = 'file:///tmp'  
 924   
 925          if args.has_key('fetch'): 
 926              global_dir = args['fetch'] 
 927          else: 
 928              global_dir = 'file:///tmp' 
 929   
 930          td_input  = {} 
 931          td_output = {} 
 932          notes     = {} 
 933   
 934          if td.IsCleanup(): 
 935               
 936              return (td_input, td_output, notes) 
 937   
 938          for idx, td_tray in td_trays.iteritems(): 
 939              args['tray'] = idx 
 940               
 941              logger.debug("GetTray(%s)" % idx) 
 942              icetray = steering.GetTray(idx) 
 943               
 944              input_files  = icetray.GetInputFiles() 
 945              parsed_input = [] 
 946               
 947              output_files  = icetray.GetOutputFiles() 
 948              parsed_output = [] 
 949               
 950              if iter is not False: 
 951                  iters = [iter] 
 952              else: 
 953                  iters = td_tray.GetIters() 
 954              for iter in iters: 
 955                  args['iter'] = iter 
 956                  parser = ExpParser(args,steering) 
 957                  for d in steering.GetDependencies(): 
 958                      d_file = parser.parse(d) 
 959                      if not td_input.has_key(d_file): 
 960                          location = d_file 
 961                          if not self.IsUrl(location): 
 962                              location = os.path.join(global_dir, location) 
 963                          td_input[os.path.basename(d_file)] = [location] 
 964                  for i_file in input_files: 
 965                      name = i_file.GetName() 
 966                      name = parser.parse(name) 
 967                      if not td_input.has_key(name) \ 
 968                          and not td_output.has_key(name): 
 969                          if catalog: 
 970                              node = self.FindFile(steering,td,catalog,name) 
 971                          else: 
 972                              node = td.GetName() 
 973   
 974                          note = False 
 975                          if node == "global": 
 976                              location = global_dir 
 977                              note = "global" 
 978                          if node != "global": 
 979                              location = tmp_dir 
 980                              location = os.path.join(location, str(job.GetDatasetId())) 
 981                              location = os.path.join(location, str(job.GetProcNum())) 
 982                              location = os.path.join(location, node) 
 983                              note = "dontextract" 
 984                          location = os.path.join(location, name) 
 985                          if i_file.IsPhotonicsTable(): 
 986                              note = "photonics"; 
 987                          if note: 
 988                              notes[name] = note 
 989                          td_input[name] = [location] 
 990                  for o_file in output_files: 
 991                      name = o_file.GetName() 
 992                      name = parser.parse(name) 
 993                      if not td_output.has_key(name): 
 994                          location = os.path.join(tmp_dir, str(job.GetDatasetId())) 
 995                          location = os.path.join(location, str(job.GetProcNum())) 
 996                          location = os.path.join(location, str(td.GetName())) 
 997                          location = os.path.join(location, name) 
 998                          td_output[name] = [location] 
 999   
1000          return (td_input, td_output, notes) 
 1001   
1002 -    def FindFile(self,steering,td,catalog,file): 
 1003          parents = td.GetParents() 
1004   
1005           
1006          for parent in parents: 
1007              if catalog.has_key(parent): 
1008                  if catalog[parent][1].has_key(file): 
1009                      return parent 
1010   
1011           
1012          for parent in parents: 
1013              parent_td = steering.GetTaskDefinition(parent) 
1014              result    = self.FindFile(steering,parent_td,catalog,file) 
1015              if result != "global": 
1016                  return result 
1017   
1018          return "global" 
 1019   
1021          logger.debug("Input files: %s" % input) 
1022          in_manifest = open(filename.replace(".jdl", ".input"), 'w') 
1023          if len(input): 
1024              padding = str(max(map(lambda x: max(map(len, x)), input.values()))) 
1025              fmt_str = "%-" + padding + "s %s" 
1026              for i_file, locs in input.items(): 
1027                  for loc in locs: 
1028                      file = fmt_str % (loc, i_file) 
1029                      if notes.has_key(i_file): 
1030                          file += "\t%s" % notes[i_file] 
1031                      job.Write(in_manifest, file) 
1032          in_manifest.close() 
1033          logger.debug("Output files: %s" % output) 
1034          out_manifest = open(filename.replace(".jdl", ".output"), 'w') 
1035          if len(output): 
1036              padding = str(max(map(lambda x: max(map(len, x)), input.values()))) 
1037              fmt_str = "%-" + padding + "s %s" 
1038              for o_file, locs in output.items(): 
1039                  for loc in locs: 
1040                      job.Write(out_manifest, fmt_str % (o_file, loc)) 
1041          out_manifest.close() 
 1042   
        # Translate td's parent task names into DAG node names. A parent with
        # parallel execution enabled expands into one node per (tray,
        # iteration), using the same "<task>_<iter>" / "<task>_ext" naming
        # that the parallel branch of the DAG writer uses; any other parent
        # is a single node named after the task.
        parents = td.GetParents()
        parentnodes = []
        for parent in parents:
            parentobj = steering.GetTaskDefinition(parent)
            if parentobj.ParallelExecutionEnabled():
                for idx,tray in parentobj.GetTrays().items():
                    for iter in tray.GetIters():
                        # iteration -1 is named "<task>_ext" instead of by number
                        # NOTE(review): the tray index is not part of the name,
                        # so trays sharing an iteration number collide.
                        if not iter == -1:
                            nodename = "%s_%u" % (parent,iter)
                        else:
                            nodename = "%s_ext" % parent
                        parentnodes.append(nodename)
            else:
                parentnodes.append(parent)
        return parentnodes
 1059   
1060 -    def GetArguments(self,job,td,idx=False,iter=False,output="str"): 
 1069   
1071          args = {} 
1072          for str in argstr.split(" "): 
1073              str = str[2:] 
1074              pieces = str.split("=") 
1075              if len(pieces) == 1: 
1076                  args[str] = 1 
1077              else: 
1078                  args[pieces[0]] = pieces[1] 
1079          return args 
 1080   
1083   
1092   
 1096