Package iceprod :: Package server :: Package plugins :: Module glite
[hide private]
[frames] | [no frames]

Source Code for Module iceprod.server.plugins.glite

  1  #!/bin/env python 
  2  # 
  3   
  4  """ 
  5   A basic wrapper for submitting and monitoring jobs on gLite (EGEE) grids. 
  6   Inherits from iGrid 
  7    
  8   copyright  (c) 2005 the icecube collaboration 
  9    
 10   @version: $Revision: $ 
 11   @date: $Date: $ 
 12   @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
 13  """ 
 14   
 15  import os 
 16  import re 
 17  import sys 
 18  import math 
 19  import dircache 
 20  import commands 
 21  import time 
 22  import string 
 23  import shutil 
 24  import ConfigParser 
 25  import logging 
 26  import iceprod 
 27  import iceprod.core.exe 
 28  from iceprod.core import metadata 
 29  from iceprod.core.dataclasses import Steering 
 30  from iceprod.server.grid import iGrid 
 31  from os.path import basename 
 32   
 33  logger = logging.getLogger('eGee') 
 34   
 35  egee_status = {'1':'QUEUED', '2':'PROCESSING'} 
 36  queued_state = [ 
 37     'READY', 
 38     'SCHEDULED', 
 39     'WAITING', 
 40     'SUBMITTED', 
 41     'ACCEPTED', 
 42     'PREPARING', 
 43     'QUEUEING', 
 44     'QUEUED', 
 45     'INLRMS:Q', 
 46     ] 
 47  running_state =    [ 
 48     'RUNNING', 
 49     'EXECUTED', 
 50     'FINISHING', 
 51     ] 
 52   
 53  error_state =    [ 
 54     'FAILED', 
 55     'DONE(EXITCODE!=0)', 
 56     'ABORTED', 
 57     ] 
 58   
 59  finished_state =    [ 
 60     'DONE(SUCCESS)', 
 61     'CLEARED', 
 62     'CANCELLED', 
 63     ] 
 64   
 65  undetermined_states =    [ 
 66     'UNAVAILABLE', 
 67     ] 
 68   
def qoute(x):
    """Return str(x) wrapped in double quotes (used for JDL sandbox entries)."""
    return '"' + str(x) + '"'
def cstatus(istatus):
    """
    Translate an EGEE numeric status code into an iceprod state name.

    @param istatus: status code key as used in egee_status (e.g. '1')
    @return: the mapped state from egee_status, or 'X' for unknown codes
    """
    # dict.get replaces the has_key()/index pair (has_key is gone in Python 3)
    return egee_status.get(istatus, 'X')
class gLite(iGrid):
    """
    This class represents a job or cluster on an egee grid.

    Jobs are described by a JDL file plus a generated /bin/sh wrapper and
    are driven through the glite-wms-job-* command line tools whose names
    are stored in the *_cmd attributes (subclasses may swap them).
    """

    def __init__(self):
        iGrid.__init__(self)
        # -1 means "not set yet"; Submit fills it with the first job id
        self.cluster_id = -1
        self.post = None
        # file the submit command writes job ids to (set per job by WriteConfig)
        self.ids = "ids"
        self.enqueue_cmd = "glite-wms-job-submit"
        self.checkqueue_cmd = "glite-wms-job-status"
        self.queue_rm_cmd = "glite-wms-job-cancel"
        self.get_output_cmd = "glite-wms-job-output"
        self.suffix = "jdl"
        # storage URL template, filled from the job dictionary in PostCopy
        self.grid_storage = "%(gridstorage)s/%(dataset_id)u/%(queue_id)u/%(filename)s"

    def _sibling_file(self, config_file, extension):
        """
        Return config_file with its '.<self.suffix>' extension replaced by
        '.<extension>'; '.<extension>' is appended if the suffix is missing.

        Only the trailing extension is touched, so directory names that
        contain the suffix text are left alone.
        """
        root, ext = os.path.splitext(config_file)
        if ext == '.' + self.suffix:
            return root + '.' + extension
        return config_file + '.' + extension

    def _choose_resource(self, resource_list):
        """
        Pick one resource at random, honouring optional integer weights.

        Each entry is "<resource>" (weight 1) or "<resource> <weight>".
        A weight that is not an integer falls back to 1, entries with a
        weight of 0 or less are never chosen, and blank entries are skipped.

        @param resource_list: list of resource specification strings
        @return: the chosen resource, or None if no entry is selectable
        """
        from random import choice
        weighted_rlist = []
        for r in resource_list:
            # split off at most one token so "ce 2 extra" is reported as a bad
            # weight instead of failing to unpack
            fields = r.split(None, 1)
            if not fields:
                continue
            resource, weight = fields[0], 1
            if len(fields) > 1:
                try:
                    weight = int(fields[1])
                except ValueError:
                    logger.error("Exception: " + str(sys.exc_info()[1]))
                    logger.warning("Unable to get resource weight for: " + r)
                    weight = 1
            logger.debug("%s:%u " % (resource, weight))
            weighted_rlist.extend([resource] * weight)

        if not weighted_rlist:
            logger.warning("no selectable resource in: %s" % resource_list)
            return None
        return choice(weighted_rlist)

    def _write_wrapper_prologue(self, job, wrapper):
        """Write the wrapper's shebang, environment exports and run-directory setup."""
        job.Write(wrapper, "#!/bin/sh")
        job.Write(wrapper, 'echo "running iceprod on $HOSTNAME";', parse=False)
        job.Write(wrapper, "uname -a;")

        # default only: keep a site-provided I3SCRATCH, fall back to /tmp
        job.Write(wrapper, "I3SCRATCH=${I3SCRATCH:-/tmp}", parse=False)
        # plugin-wide environment first, then job environment (job wins)
        for env in (self.env, job.env):
            for var, value in env.items():
                job.Write(wrapper, "export %s=%s" % (var, value), parse=False)

        job.Write(wrapper, "INIT_DIR=$PWD", parse=False)
        job.Write(wrapper, "RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(), time.time()), parse=False)
        job.Write(wrapper, "mkdir -p $RUNDIR")
        job.Write(wrapper, "echo \"running on $HOSTNAME:$RUNDIR\"", parse=False)
        job.Write(wrapper, "cd $RUNDIR", parse=False)
        job.Write(wrapper, "ln -s $INIT_DIR/* $RUNDIR/", parse=False)

    def _write_python_setup(self, job, wrapper):
        """Write commands that fetch/unpack the Python tree when PYROOT is a URL or tarball."""
        pyhome = ''
        if 'PYROOT' in self.env:
            pyhome = self.env['PYROOT']
        if 'PYROOT' in job.env:
            pyhome = job.env['PYROOT']
        if pyhome.startswith('http:'):
            job.Write(wrapper, "wget --quiet %s" % pyhome, parse=False)
        if pyhome.endswith('.tgz'):
            job.env['PYROOT'] = '$PWD/python-2.3'
            job.Write(wrapper, "tar xzf %s" % os.path.basename(pyhome), parse=False)
            # PYROOT was already exported with the tarball location by the
            # prologue; re-export it so "$PYROOT/bin/python" finds the
            # unpacked tree.
            job.Write(wrapper, "export PYROOT=$PWD/python-2.3", parse=False)

    def _write_stageout(self, job, wrapper):
        """Append a loop that registers each regular file of the run directory with lcg-cr."""
        job.Write(wrapper, "# copying files to storage element")
        job.Write(wrapper, "for file in *; do")
        job.Write(wrapper, " if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
        job.Write(wrapper, " lcg-cr --vo icecube \\", parse=False)
        job.Write(wrapper, " -d udo-dcache01.grid.uni-dortmund.de \\", parse=False)
        # the line must end in the continuation backslash only: any unpaired
        # backtick here would leave an unterminated command substitution
        job.Write(wrapper, " -l lfn:/grid/icecube/iceprod/%s/%s/$file \\"
                  % (job.GetDatasetId(), job.GetProcNum()), parse=False)
        job.Write(wrapper, " file:$PWD/$file", parse=False)
        job.Write(wrapper, " fi; done")

    def WriteConfig(self, job, config_file):
        """
        Write the JDL submit file and its shell wrapper.

        The wrapper (config_file with an '.sh' extension) exports the
        environment, runs the job executable with $PYROOT/bin/python inside
        $I3SCRATCH/i3sim_<proc>_<time> and, unless the 'stageout' argument
        option is set, registers every regular file it produced on the
        storage element with lcg-cr.  self.ids is set to config_file with an
        '.ids' extension; Submit passes it to the submit command via -o.

        @param job: i3Job object
        @param config_file: path to file where the JDL will be written
        @raise Exception: if the job has no executable configured
        """
        if not job.GetExecutable():
            raise Exception("no executable configured")

        submitfile = open(config_file, 'w')
        try:
            wrapper = open(self._sibling_file(config_file, 'sh'), 'w')
            try:
                self.ids = self._sibling_file(config_file, 'ids')
                self._write_wrapper_prologue(job, wrapper)

                err = basename(job.GetErrorFile())
                out = basename(job.GetOutputFile())
                argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
                argstr = job.GetMainScript() + " " + " ".join(argopts)

                inputfiles = [wrapper.name, job.GetExecutable()]
                inputfiles.extend(job.GetInputFiles())
                inputfiles = [qoute(f) for f in inputfiles]

                outputfiles = [
                    'work.tgz',
                    "icetray.%06u.log" % job.GetProcNum(),
                    err,
                    out,
                    iceprod.core.exe._inventory,  # inventory
                ]
                outputfiles.extend(job.GetOutputFiles())
                outputfiles = [qoute(f) for f in outputfiles]

                self._write_python_setup(job, wrapper)

                job.Write(submitfile, 'Executable = "%s";' % basename(wrapper.name))
                job.Write(submitfile, 'StdOutput = "%s";' % out)
                job.Write(submitfile, 'StdError = "%s";' % err)
                job.Write(submitfile, 'InputSandbox = {%s};' % ','.join(inputfiles))
                job.Write(submitfile, 'OutputSandbox = {%s};' % ','.join(outputfiles))
                job.Write(submitfile, 'Arguments = "%s";' % argstr, parse=False)

                job.Write(wrapper, 'cmd="$PYROOT/bin/python %s $*"' % basename(job.GetExecutable()), parse=False)
                job.Write(wrapper, "echo $cmd", parse=False)
                job.Write(wrapper, "$cmd", parse=False)
                job.Write(wrapper, "retval=$?", parse=False)

                # Add general batch options the job does not override
                for key in self.GetParamKeys():
                    if key not in job.batchopts:
                        job.AddBatchOpt(key, self.GetParam(key))

                # Job specific batch options become JDL attributes; keys
                # starting with '-' are submit command arguments (see Submit)
                for key, opt in job.GetBatchOpts().items():
                    if not key.startswith('-'):
                        job.Write(submitfile, "%s = %s;" % (key, opt))

                # Every regular file in the run directory is registered; the
                # output inventory is not consulted here.
                if not self.GetArgOpt('stageout'):
                    self._write_stageout(job, wrapper)

                job.Write(wrapper, "cd $INIT_DIR", parse=False)
                job.Write(wrapper, "rm -rf $RUNDIR", parse=False)
                job.Write(wrapper, "exit $retval", parse=False)
            finally:
                wrapper.close()
        finally:
            submitfile.close()

    def Submit(self, job, config_file):
        """
        Submit job/cluster to grid

        Batch options whose key starts with '-resource' name candidate
        computing elements ("-r <ce>" or "<ce>", optionally followed by a
        weight); one is chosen per submission.  Other '-' prefixed options
        are passed to the submit command verbatim.

        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        @return: (status, submit command output); status is 0 on success
        """
        self.submit_status = ''
        self.WriteConfig(job, config_file)

        submit_args = []
        resource_list = []
        for key, opt in job.GetBatchOpts().items():
            if key.startswith('-resource'):
                if opt.startswith('-r'):
                    resource_list.append(opt[2:].strip())
                else:
                    resource_list.append(opt)
            elif key.startswith('-'):
                submit_args.append(opt)
        if resource_list:
            resource = self._choose_resource(resource_list)
            if resource:
                submit_args.append('-r %s' % resource)

        cmd = "%s %s -o %s %s" % (self.enqueue_cmd, " ".join(submit_args), self.ids, config_file)
        logger.debug(cmd)
        status, self.submit_status = commands.getstatusoutput(cmd)
        if status != 0:
            logger.error("Failed to execute command")
            logger.error(cmd)
        try:
            job_id = self.get_id(self.submit_status)
            job.SetJobId(job_id)
            if job_id in (-1, None):
                status = 1
            else:
                status = 0
        except Exception:
            # boundary: any failure reading/recording the id marks the submit failed
            e = sys.exc_info()[1]
            logger.error("Exception: " + str(e))
            self.submit_status += "\nException: " + str(e)
            status = 1

        # cluster_id starts out as -1, so test for that sentinel explicitly
        if self.job_ids and (not self.cluster_id or self.cluster_id == -1):
            self.cluster_id = self.job_ids[0]

        return status, self.submit_status

    def get_id(self, submit_status):
        """
        Read the grid job id written by the submit command to self.ids.

        The last line starting with 'http' is used: the submit tools may
        append to an existing id file, in which case earlier lines belong to
        previous submissions.  A found id is appended to self.job_ids.

        @param submit_status: output of the submit command (only logged)
        @return: the job id, or None if none was found
        """
        logger.debug(submit_status)
        job_id = None
        idfile = open(self.ids, 'r')
        try:
            for line in idfile.readlines():
                line = line.strip()
                if line.startswith('http'):
                    job_id = line
        finally:
            idfile.close()
        if job_id:
            self.job_ids.append(job_id)
        else:
            logger.warning('could not parse job id from "%s"' % self.ids)
        return job_id

    def _run_joined(self, cmd):
        """Log and run cmd via os.popen; return its output lines joined by a space."""
        logger.debug(cmd)
        handle = os.popen(cmd, 'r')
        try:
            return " ".join(handle.readlines())
        finally:
            handle.close()

    def CheckQ(self, db=None):
        """
        Query the status of every job submitted through this instance.

        @param db: unused
        @return: status command output, lines joined by a space
        """
        cmd = self.checkqueue_cmd + "".join([" %s" % job_id for job_id in self.job_ids])
        return self._run_joined(cmd)

    def CleanQ(self, jobs=None):
        """
        Check consistency of queue and remove jobs which shouldn't be there.

        Not implemented for this plugin; always returns 0.
        """
        return 0

    def _query_status(self, job_id):
        """Return the normalised 'Current Status:' token for job_id, or '?' if none is reported."""
        status = "?"
        handle = os.popen(self.checkqueue_cmd + ' ' + job_id, 'r')
        try:
            for line in handle.readlines():
                line = line.strip()
                if line.startswith('Current Status:'):
                    # "Done (Success)" -> "Done(Success)"
                    fields = line[line.find(':') + 1:].replace(' ', '').split()
                    if fields:
                        status = fields[0]
        finally:
            handle.close()
        return status

    def CheckJobStatus(self, jobs):
        """
        Query the status of one or more jobs on the glite queue and store the
        mapped iceprod state ('QUEUED', 'PROCESSING', 'FAILED', 'FINISHED'
        or '?') on each job with SetStatus.

        @param jobs: a job object or a list/tuple of them
        @return: 1
        """
        if isinstance(jobs, (list, tuple)):
            job_list = jobs
        else:
            job_list = [jobs]
        for job in job_list:
            dataset_id = job.GetDatasetId()
            queue_id = job.GetProcNum()
            job_id = job.GetJobId()

            if not job_id:
                # nothing to query without a grid id
                job.SetStatus('?')
                continue

            status = self._query_status(job_id).upper()
            logger.info("%s.%s (%s) : %s" % (dataset_id, queue_id, job_id, status))

            if status in queued_state:
                job.SetStatus('QUEUED')
            elif status in running_state:
                job.SetStatus('PROCESSING')
            elif status in error_state:
                job.SetStatus('FAILED')
            elif status in finished_state:
                job.SetStatus('FINISHED')
            else:
                job.SetStatus('?')

        return 1

    def QRemove(self, jobid=None):
        """
        Cancel a job, or every job submitted through this instance.

        @param jobid: grid job id to cancel; when empty, all ids in
            self.job_ids are cancelled
        @return: cancel command output, lines joined by a space
        """
        if jobid:
            targets = [jobid]
        else:
            targets = self.job_ids
        cmd = self.queue_rm_cmd + " --noint " + "".join([" %s" % t for t in targets])
        status = self._run_joined(cmd)

        if status.find('Job has already finished') != -1:
            # NOTE(review): this re-runs the identical cancel command; a
            # different follow-up action may have been intended - confirm.
            status = self._run_joined(cmd)

        return status

    def PostCopy(self, jobdict, target_url, maxtries=4):
        """
        Retrieve the job's output and copy its output files back.

        Runs the get-output command into jobdict['submitdir']; if it
        succeeds the job no longer needs removing, so
        jobdict['grid_queue_id'] is cleared.  Each file listed in the output
        inventory is then copied from grid storage (self.grid_storage) into
        submitdir with lcg-cp and deleted from grid storage with lcg-del
        once that copy succeeded.

        @param jobdict: job dictionary with dataset_id, queue_id,
            grid_queue_id, submitdir and the keys self.grid_storage uses
        @param target_url: unused here
        @param maxtries: unused here
        @return: True if the job has no grid id,
            CopyStatusEnum['NOTREADY'] if no inventory was retrieved,
            CopyStatusEnum['OK'] otherwise
        """
        if not jobdict['grid_queue_id']: return True

        dataset = jobdict['dataset_id']
        proc = jobdict['queue_id']
        job_id = jobdict['grid_queue_id']
        submitdir = jobdict['submitdir']

        cmd = '%s --noint --dir %s %s' % (self.get_output_cmd, submitdir, job_id)
        logger.debug(cmd)
        if not os.system(cmd):
            # job finished correctly. no need to remove from queue
            jobdict['grid_queue_id'] = None

        inventoryfile = os.path.join(submitdir, iceprod.core.exe._inventory)
        if not os.path.exists(inventoryfile):
            logger.error("%s,%s: no output inventory found." % (dataset, proc))
            return self.CopyStatusEnum['NOTREADY']
        # NOTE(review): FileInventory is not imported at the top of this
        # module; confirm where it should come from (NameError otherwise).
        inventory = FileInventory()
        logger.debug('reading %s' % inventoryfile)
        inventory.Read(inventoryfile)

        # command templates are filled per file from a copy of jobdict so the
        # caller's dictionary does not gain a 'filename' key
        copy_cmd = 'lcg-cp --vo icecube ' + ' -v %s ' % self.grid_storage + ' file:/%(submitdir)s/%(filename)s'
        del_cmd = 'lcg-del --vo icecube ' + ' -a %s ' % self.grid_storage
        params = dict(jobdict)
        for entry in inventory.filelist:
            params['filename'] = os.path.basename(entry['source'])
            if os.system(copy_cmd % params) == 0:
                # delete files from temporary storage only after a good copy
                os.system(del_cmd % params)
            else:
                logger.error("%s,%s: failed to copy %s from grid storage"
                             % (dataset, proc, params['filename']))

        return self.CopyStatusEnum['OK']

    def Clean(self, jobdict):
        """
        Cancel the job, fetch its output into jobdict['submitdir'], then
        call iGrid.Clean.  Returns True without doing anything when the job
        has no grid id.
        """
        if not jobdict['grid_queue_id']: return True

        job_id = jobdict['grid_queue_id']
        submitdir = jobdict['submitdir']

        logger.info('removing job: %s' % job_id)
        self.QRemove(job_id)

        cmd = '%s --noint --dir %s %s' % (self.get_output_cmd, submitdir, job_id)
        logger.debug(cmd)
        os.system(cmd)

        iGrid.Clean(self, jobdict)
class Edg(gLite):
    """
    This class represents a job or cluster on an egee grid, driven through
    the edg-job-* command line tools instead of glite-wms-job-*.
    """

    def __init__(self):
        gLite.__init__(self)
        self.enqueue_cmd = "edg-job-submit"
        self.checkqueue_cmd = "edg-job-status"
        # was misspelled "edj-job-get-output"
        self.get_output_cmd = "edg-job-get-output"
        self.queue_rm_cmd = "edg-job-cancel"

    def PostCopy(self, jobdict, target_url, maxtries=4):
        """
        Retrieve the job's output sandbox into <submitdir>/output.

        @param jobdict: job dictionary with grid_queue_id and submitdir
        @param target_url: unused here
        @param maxtries: unused here
        @return: True (also when the job has no grid id and nothing is run)
        """
        job_id = jobdict['grid_queue_id']
        if not job_id:
            return True
        submitdir = jobdict['submitdir']

        cmd = '%s -dir %s/output %s' % (self.get_output_cmd, submitdir, job_id)
        logger.debug(cmd)
        os.system(cmd)
        return True

    def Clean(self, jobdict):
        """
        Cancel the job and fetch its output into jobdict['submitdir'] when it
        has a grid id, then call iGrid.Clean.
        """
        job_id = jobdict['grid_queue_id']
        submitdir = jobdict['submitdir']

        if job_id:
            logger.info('removing job: %s' % job_id)
            self.QRemove(job_id)
            # only meaningful with a real id; avoids running "... None"
            cmd = '%s -dir %s %s' % (self.get_output_cmd, submitdir, job_id)
            logger.debug(cmd)
            os.system(cmd)

        iGrid.Clean(self, jobdict)