Package iceprod :: Package server :: Package plugins :: Module sge
[hide private]
[frames] | no frames]

Source Code for Module iceprod.server.plugins.sge

  1  #!/bin/env python 
  2  # 
  3   
  4  """ 
  5   A basic plugin for submitting jobs to SGE and monitoring them.
  6   This module implements only a small subset of SGE features.
  7   Its interface, however, is like that of condor.
  8   Inherits from iGrid.
  9   
 10   copyright  (c) 2005 the icecube collaboration 
 11   
 12   @version: $Revision: $ 
 13   @date: $Date: $ 
 14   @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
 15   @todo: implement more functionality of sge. 
 16  """ 
 17   
 18  import os 
 19  import re 
 20  import sys 
 21  import math 
 22  import random  
 23  import dircache 
 24  import time 
 25  import string 
 26  import logging 
 27  import os.path 
 28  import getpass 
 29  import commands 
 30  from os import popen2 
 31  from iceprod.core import metadata 
 32  from iceprod.server.grid import iGrid 
 33  from iceprod.server.job  import i3Job 
 34   
 35  logger = logging.getLogger('SGE') 
 36   
 37  sge_status = {'qw':'QUEUED', 'r':'PROCESSING'} 
 38   
39 -def cstatus(istatus):
40 if sge_status.has_key(istatus): 41 return sge_status[istatus] 42 return 'FINISHED'
43
class SGE(iGrid):
    """
    This class represents a job or cluster on an SGE system.

    Jobs are submitted with qsub, monitored by parsing the output of
    C{qstat -u <current user>} and removed with qdel.
    """

    def __init__(self):
        """
        Configure the SGE commands used to submit, query and remove jobs.
        """
        iGrid.__init__(self)
        self.sleeptime = 30
        self.enqueue_cmd = "qsub"
        # list only the current user's jobs
        self.checkqueue_cmd = "qstat -u %s " % getpass.getuser()
        self.queue_rm_cmd = "qdel"
        self.suffix = "sge"
        logger.debug('Made a SGE(iGrid)')

    def WriteConfig(self, job, config_file):
        """
        Write an SGE submit script for a job to a file.

        The script sets the stdout/stderr paths, adds the grid-wide and
        job-specific batch options as "#$" directives, exports the
        environment, copies the input files and the executable into a
        scratch run directory under $I3SCRATCH, runs the main script there,
        optionally moves the results back to the job's initial directory and
        finally removes the run directory.

        @param job: i3Job object
        @param config_file: path to file where the submit file will be written
        @raise Exception: if the job has no executable configured
        """
        logger.debug('WriteConfig')

        if not job.GetExecutable():
            raise Exception("no executable configured")

        submitfile = open("%s" % config_file, 'w')
        try:
            initialdir = job.GetInitialdir()
            outfile = os.path.join(initialdir, job.GetOutputFile())
            errfile = os.path.join(initialdir, job.GetErrorFile())

            job.Write(submitfile, "#!/bin/sh")
            logger.debug("#!/bin/sh")

            job.Write(submitfile, "#$ -o %s" % outfile)
            logger.debug("#$ -o %s" % outfile)

            job.Write(submitfile, "#$ -e %s" % errfile)
            logger.debug("#$ -e %s" % errfile)

            # general batch options configured for this grid
            for key in self.GetParamKeys():
                opt = self.GetParam(key)
                job.Write(submitfile, "#$ %s" % opt, parse=True)
                logger.debug("#$ %s" % opt)

            # job-specific batch options
            for opt in job.GetBatchOpts().values():
                job.Write(submitfile, "#$ %s" % opt, parse=True)
                logger.debug("#$ %s" % opt)

            # export environment variable/value pairs: grid-wide first,
            # then job-specific (which can therefore override them)
            for var, value in self.env.items():
                job.Write(submitfile, "export %s=%s" % (var, value), parse=False)
            for var, value in job.env.items():
                job.Write(submitfile, "export %s=%s" % (var, value), parse=False)
            job.Write(submitfile, "export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH", parse=False)
            job.Write(submitfile, "unset I3SIMPRODPATH")

            # scratch directory named after the process number and the time
            # the script is written; build the line once so the logged value
            # matches the one written to the script
            rundir = "RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(), time.time())
            job.Write(submitfile, rundir, parse=False)
            job.Write(submitfile, "mkdir -p $RUNDIR", parse=False)
            logger.debug(rundir)
            logger.debug("mkdir -p $RUNDIR")

            job.Write(submitfile, "echo \"running on $HOSTNAME:$RUNDIR\"", parse=False)
            logger.debug("echo \"running on $HOSTNAME:$RUNDIR\"")

            logger.debug('%d' % len(job.GetInputFiles()))
            for path in job.GetInputFiles() + [job.GetExecutable()]:
                logger.debug('%s' % path)
                job.Write(submitfile, "cp %s $RUNDIR" % path, parse=False)
            job.Write(submitfile, "cd $RUNDIR", parse=False)

            argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
            argstr = job.GetMainScript() + " " + " ".join(argopts)
            # the executable was copied into $RUNDIR, so run it by basename
            executable = os.path.basename(job.GetExecutable())
            logger.debug('executable: %s' % job.GetExecutable())
            logger.debug('main script: %s' % job.GetMainScript())
            logger.debug('args options: %s' % argopts)
            logger.debug('arguments: %s' % job.GetArguments())
            job.Write(submitfile, "$PYROOT/bin/python %s %s" % (executable, argstr), parse=False)
            job.Write(submitfile, 'echo "job exited with status $?";', parse=False)

            # without stage-out (or when debugging) move the regular files
            # left in $RUNDIR to the initial directory, without overwriting
            # files that already exist there
            if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
                job.Write(submitfile, "for file in *; do")
                job.Write(submitfile, " if [ -f $file -a ! -f %s/`basename $file` ]; then " % initialdir)
                job.Write(submitfile, " mv $file %s" % initialdir)
                job.Write(submitfile, " fi; done")

            job.Write(submitfile, "#clean directory")
            job.Write(submitfile, "cd /tmp")
            job.Write(submitfile, "rm -rf $RUNDIR ", parse=False)
            logger.debug('Submit file written')
        finally:
            # close the file even if one of the writes above fails
            submitfile.close()

    def get_id(self, submit_status):
        """
        Parse the string returned by qsub on submission to extract the job id.

        On success the id is also appended to self.job_ids.

        @param submit_status: string returned by qsub
        @return: the job id as a string, or -1 if it could not be parsed
        """
        match = re.search(r'Your job ([0-9]+)', submit_status)
        if not match:
            logger.warning('could not parse job id from "%s"' % submit_status)
            return -1
        job_id = match.group(1)
        self.job_ids.append(job_id)
        return job_id

    def _parse_qstat(self, output):
        """
        Extract (job id, job name, state, raw line) tuples from qstat output.

        The first two lines (column header and separator) are skipped, as
        are lines with fewer than five columns.
        """
        entries = []
        for line in output.split('\n')[2:]:
            tok = line.split()
            if len(tok) < 5:
                logger.debug("skipping qstat line: %s" % line)
                continue
            # columns: job-ID, prior, name, user, state, ...
            entries.append((tok[0], tok[2], tok[4], line))
        return entries

    def CheckJobStatus(self, jobs):
        """
        Query the status of jobs on the SGE queue and update each job.

        Every job with a non-negative id is first marked FINISHED; jobs that
        qstat still lists then get the status returned by cstatus() for their
        state. If qstat fails, every job is marked '?'.

        @param jobs: an i3Job or a list of i3Job objects
        @return: qstat's non-zero exit status on failure, 1 otherwise
        """
        if isinstance(jobs, list):
            job_list = jobs
        else:
            job_list = [jobs]

        job_dict = {}
        for job in job_list:  # initialize status to FINISHED
            if job.GetJobId() < 0:
                continue
            # qstat prints ids as strings, so key the lookup table the same way
            job_dict[str(job.GetJobId())] = job
            job.SetStatus('FINISHED')

        cmd = self.checkqueue_cmd
        logger.debug(cmd)
        retval, output = commands.getstatusoutput(cmd)

        if retval:  # failed to get status from sge
            for job in job_list:
                job.SetStatus('?')
            return retval

        for job_id, _, state, line in self._parse_qstat(output):
            logger.debug("jobid:%s" % job_id)
            if job_id not in job_dict:
                continue
            logger.debug("status for jobid %s is %s" % (job_id, state))
            try:
                job_dict[job_id].SetStatus(cstatus(state))
            except Exception:
                # log and keep going: stopping here would leave every job
                # listed after this line wrongly marked FINISHED
                logger.error("%s:%s" % (sys.exc_info()[1], line))
                sys.excepthook(*sys.exc_info())
        return 1

    def CheckQ(self, db=None):
        """
        Return the raw qstat listing of the current user's jobs.

        @param db: unused here
        @return: the output of qstat (returned even if qstat fails)
        """
        status, output = commands.getstatusoutput(self.checkqueue_cmd)
        return output

    def CleanQ(self, jobs=None):
        """
        Remove iceprod jobs from the SGE queue that are not in C{jobs}.

        Only queue entries whose job name starts with "iceprod." are
        considered. Nothing is removed when C{jobs} is empty or when qstat
        fails.

        @param jobs: an i3Job or list of i3Job objects that should be kept
        @return: 0 if C{jobs} is empty, otherwise qstat's exit status
        """
        if not jobs:
            return 0

        if isinstance(jobs, list):
            job_list = jobs
        else:
            job_list = [jobs]
        job_dict = dict()
        for job in job_list:
            # qstat prints ids as strings, so key the lookup table the same way
            job_dict[str(job.GetJobId())] = job

        status, output = commands.getstatusoutput(self.checkqueue_cmd)
        if status:
            return status

        for job_id, name, state, line in self._parse_qstat(output):
            # NOTE(review): jobs named "iceprod.*" are assumed to be iceprod
            # submissions; confirm this matches the names SGE assigns to the
            # submit scripts written by this plugin.
            if not name.startswith("iceprod.") or job_id in job_dict:
                continue
            try:
                logger.warning("removing job %s with status %s. Reason: job not found in list" %
                               (job_id, cstatus(state)))
                logger.debug("job list [%s]" % str(job_dict.keys()))
                os.system("%s %s" % (self.queue_rm_cmd, job_id))
            except Exception:
                logger.error("%s:%s" % (sys.exc_info()[1], line))
                sys.excepthook(*sys.exc_info())
        return status

    def QRemove(self, job):
        """
        Remove a job from the SGE queue with qdel.

        @param job: an i3Job object or an SGE job id
        @return: 0 if an i3Job is already FINISHED, otherwise qdel's exit status
        """
        if isinstance(job, i3Job):
            if job.GetStatus() == "FINISHED":
                return 0  # no need to remove something that is not there
            # qdel needs the SGE job id, not the string form of the job object
            job_id = job.GetJobId()
        else:
            job_id = job

        cmd = "%s %s" % (self.queue_rm_cmd, job_id)
        status, output = commands.getstatusoutput(cmd)
        return status
260