
Source Code for Module iceprod.server.plugins.shell

#!/home/juancarlos/python-2.3/bin/python
#

"""
 A basic submitfile for submitting and monitoring jobs to PBS.
 This module implements only a small subset of PBS features.
 Its interface is like that of condor, however.
 (http://www.cs.wisc.edu/condor)
 Inherits from i3Queue

 copyright (c) 2005 the icecube collaboration

 @version: $Revision: $
 @date: $Date: $
 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
 @todo: implement more functionality of pbs and condor.
"""

import os
import re
import sys
import math
import random
import dircache
import time
import string
import os.path
import getpass
import commands
from iceprod.core import metadata
from iceprod.server.grid import iGrid
import logging

logger = logging.getLogger('Shell')

pbs_status = {'S':'PROCESSING','Q':'QUEUED', 'R':'PROCESSING'}

def cstatus(istatus):
    if pbs_status.has_key(istatus):
        return pbs_status[istatus]
    return '?'

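# Illustrative only (not part of the module): given the pbs_status table
# above, cstatus() is expected to behave as follows.
#
#   cstatus('Q')  -> 'QUEUED'        # queued
#   cstatus('R')  -> 'PROCESSING'    # running
#   cstatus('X')  -> '?'             # any unrecognized code
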
class Shell(iGrid):
    """
    This class represents a job or cluster on a pbs system.
    """

    def get_id(self,submit_status):
        """
        Parse the string returned by the submit command to extract the
        id of the job

        @param submit_status: output of the submit command
        """
        matches = re.findall("Job submitted with id [0-9]+", submit_status)
        logger.debug(submit_status)
        if matches:
            cluster_info = matches[0].split()
            job_id = cluster_info[-1]

            self.job_ids.append(job_id)
            return job_id
        else:
            logger.warn('could not parse job id from "%s"' % submit_status)
            return -1

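    # --- illustrative sketch (not part of the module) ------------------------
    # get_id() pulls the trailing number out of the line printed by main()
    # below. Assumes iGrid initializes the job_ids list used for bookkeeping.
    #
    #   q = Shell()
    #   q.get_id("Job submitted with id 4242")   # -> '4242', appended to q.job_ids
    #   q.get_id("submission failed")            # -> -1, logs a warning
    # --------------------------------------------------------------------------
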
    def __init__(self):

        iGrid.__init__(self)
        self.proc = 0
        self.sleeptime = 6
        # Jobs are "submitted" by running this module's own script (see main()
        # below), monitored with ps, and removed with a QUIT signal.
        self.enqueue_cmd = "shell.py"
        self.checkqueue_cmd = "ps -f"
        self.queue_rm_cmd = "kill -QUIT"
        self.suffix = "sh"

    def WriteConfig(self,job,config_file):
        """
        Write pbs submit file to a file.
        @param job: i3Job object
        @param config_file: path to file where submit file will be written
        """

        if not job.GetExecutable():
            raise Exception, "no executable configured"

        submitfile = open("%s" % config_file,'w')
        outfile = os.path.join(job.GetInitialdir(),job.GetOutputFile())
        errfile = os.path.join(job.GetInitialdir(),job.GetErrorFile())
        job.Write(submitfile,"#!/bin/sh")
        job.Write(submitfile,"#PBS -o %s" % outfile )
        job.Write(submitfile,"#PBS -e %s" % errfile )

        # Add general batch options
        for key in self.GetParamKeys():
            job.Write(submitfile,"#%s" % (self.GetParam(key)))

        # Add job specific batch options
        for key,opt in job.GetBatchOpts().items():
            job.Write(submitfile,"#%s " % opt)

        # Detect architecture and libstdc++ version to pick a platform string
        job.Write(submitfile, "ARCH=`uname -m | sed -e 's/Power Macintosh/ppc/ ; s/i686/i386/'`")
        job.Write(submitfile, "PVER=`uname -r | awk '{split($1,a,\".\"); print a[2]}'`")
        job.Write(submitfile, "if [ $ARCH == 'i386' ]; then",parse=False)
        job.Write(submitfile, "   if [ $PVER == '4' ]; then",parse=False)
        job.Write(submitfile, "      PLATFORM=Linux-i386")
        job.Write(submitfile, "   elif [ $PVER == '6' ]; then",parse=False)
        job.Write(submitfile, "      PLATFORM=Linux-libstdc++6-i386")
        job.Write(submitfile, "   fi")
        job.Write(submitfile, "elif [ $ARCH == 'x86_64' ]; then",parse=False)
        job.Write(submitfile, "   if [ $PVER == '4' ]; then",parse=False)
        job.Write(submitfile, "      PLATFORM=Linux-x86_64")
        job.Write(submitfile, "   elif [ $PVER == '6' ]; then",parse=False)
        job.Write(submitfile, "      PLATFORM=Linux-libstdc++6-x86_64")
        job.Write(submitfile, "   fi")
        job.Write(submitfile, "fi")

        # Export environment variable,value pairs
        for var in self.env.keys():
            job.Write(submitfile,"export %s=%s" % (var,self.env[var]),parse=False)
        for var in job.env.keys():
            job.Write(submitfile,"export %s=%s" % (var,job.env[var]),parse=False)
        job.Write(submitfile,"export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH",parse=False)
        job.Write(submitfile,"unset I3SIMPRODPATH")

        job.Write(submitfile,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
        job.Write(submitfile,"mkdir -p $RUNDIR")
        job.Write(submitfile,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)

        for file in job.GetInputFiles()+[job.GetExecutable()]:
            job.Write(submitfile,"cp %s $RUNDIR" % file,parse=False)

        job.Write(submitfile,"cd $RUNDIR",parse=False)

        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = job.GetMainScript() + " " + " ".join(argopts)
        executable = os.path.basename(job.GetExecutable())
        job.Write(submitfile,
            "$PYROOT/bin/python %s %s 1>%s.tmp 2>%s.tmp" % (executable,argstr,outfile,errfile),
            parse=False)
        job.Write(submitfile, "cat %s.tmp>&1" % outfile, parse=False)
        job.Write(submitfile, "cat %s.tmp>&2" % errfile, parse=False)

        job.Write(submitfile,"rm -f wgetrc" )

        if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
            job.Write(submitfile,"for file in *; do")
            job.Write(submitfile,"   if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
            job.Write(submitfile,"      mv $file %s" % job.GetInitialdir())
            job.Write(submitfile,"   fi; done")

        job.Write(submitfile,"#clean directory")
        job.Write(submitfile,"cd /tmp")
        job.Write(submitfile,"rm -rf $RUNDIR ",parse=False)

        submitfile.close()

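    # --- illustrative sketch (not part of the module) ------------------------
    # Despite the #PBS directives, WriteConfig() emits a plain shell wrapper.
    # For a hypothetical job whose initialdir is /home/user/run1, the generated
    # script looks roughly like this (paths and values are made up, abridged):
    #
    #   #!/bin/sh
    #   #PBS -o /home/user/run1/job.out
    #   #PBS -e /home/user/run1/job.err
    #   export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH
    #   RUNDIR=$I3SCRATCH/i3sim_0_1234567890.0
    #   mkdir -p $RUNDIR
    #   cp /home/user/run1/runner.py $RUNDIR
    #   cd $RUNDIR
    #   $PYROOT/bin/python runner.py ... 1>/home/user/run1/job.out.tmp 2>/home/user/run1/job.err.tmp
    #   cd /tmp
    #   rm -rf $RUNDIR
    # --------------------------------------------------------------------------
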
    def CheckQ(self,db=None):
        """
        Query the status of the cluster or jobs in the queue
        """

        cmd = self.checkqueue_cmd
        for id in self.job_ids:
            cmd += " %s" % id
        status,output = commands.getstatusoutput(cmd)
        return output

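    # Illustration (not part of the module): with job_ids ['1234','5678'] the
    # command issued above would be "ps -f 1234 5678".
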
    def CleanQ(self,jobs=None):
        """
        Remove any queued iceprod jobs that are not in the given list of
        active jobs.
        """

        if not jobs: return 0

        job_dict = dict()
        for job in jobs:
            job_dict[job.GetJobId()] = job

        cmd = self.checkqueue_cmd
        status,output = commands.getstatusoutput(cmd)
        if not status:
            for line in output.split('\n')[2:]: # skip header lines
                try:
                    tok = line.split()
                    jobId = tok[0]
                    tty = tok[1]
                    jobStatus = tok[2]
                    time = tok[3]
                    executable = tok[4]
                    if executable.startswith("iceprod."):
                        if not job_dict.has_key(jobId):
                            logger.warn("removing job %s with status %s. Reason: job not found in list" % \
                                        (jobId,cstatus(jobStatus)))
                            logger.debug("job list [%s]" % str(job_dict.keys()))
                            os.system("%s %s" % (self.queue_rm_cmd,jobId))
                except Exception,e:
                    logger.error("%s:%s" %(e,line))
                    sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
        return status

    def CheckJobStatus(self,jobs):
        """
        Query the status of the given job(s) in the queue
        """
        if isinstance(jobs,list):
            job_list = jobs
        else:
            job_list = [jobs]

        for job in job_list:
            job.SetStatus('FINISHED')
            job_id = job.GetJobId()
            if job_id < 0: return 0
            cmd = "%s %s " % (self.checkqueue_cmd,job_id)
            status,output = commands.getstatusoutput(cmd)
            if status:
                job.SetStatus('?')
            for line in output.split('\n')[2:]: # skip header lines
                try:
                    tok = line.split()
                    jobId = tok[0]
                    tty = tok[1]
                    jobStatus = tok[2]
                    time = tok[3]
                    command = tok[4]
                    logger.debug("jobid:%s" %jobId)
                    if jobId == job.GetJobId():
                        logger.debug("status for jobid %s is %s" %(jobId,jobStatus))
                        status = cstatus(jobStatus)
                        job.SetStatus(status)
                        break
                except Exception,e:
                    logger.error("%s:%s" %(e,line))
                    sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
        return 1

    def QRemove(self,jobid):
        """
        Remove cluster or job from queue
        """
        if not jobid: return "Unknown jobid. Cannot remove job."
        cmd = "%s %s" % (self.queue_rm_cmd,jobid)
        logger.info(cmd)

        handle = os.popen(cmd, 'r')
        status = string.join(handle.readlines())
        if status.strip():
            logger.info(status.strip())
        handle.close()
        return status

def main(args):
    usage = "usage: %prog [options]"

    if len(args) < 2:
        print >> sys.stderr,'no inputfile specified'
        return 1

    infile = args[1]
    fdin = open(infile,'r')
    outfile = infile + '.out'
    errfile = infile + '.err'
    for line in fdin.readlines():
        if line.startswith('#? -o'):
            outfile = line.replace('#? -o','').strip()
        elif line.startswith('#? -e'):
            errfile = line.replace('#? -e','').strip()
    fdin.close()

    chpid = os.fork()
    if (chpid == 0):
        # Child: detach from the controlling terminal and run the job
        os.setsid()
        # Change name of process for ps
        try:
            import iceprod.procname
            iceprod.procname.setprocname(os.path.basename(infile))
        except ImportError,e:
            print >> sys.stderr,"Could not import procname module. "
            print >> sys.stderr,"Will not be able to set process name for job"

        import resource # Resource usage information.
        maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
        if (maxfd == resource.RLIM_INFINITY): maxfd = 1024 # fall back to a sane default
        for fd in range(0, maxfd):
            try: os.close(fd)
            except OSError: pass

        outf = open(outfile, 'w+')
        errf = open(errfile, 'w+')
        os.dup2(errf.fileno(), 2) # standard error (2)
        os.dup2(outf.fileno(), 1) # standard output (1)

        os.execvp('/bin/sh', ['/bin/sh',infile])
    else:
        # Parent: report the child's pid so get_id() can pick it up
        print "Job submitted with id %d" % chpid
        sys.stdout.flush()
        sys.stderr.flush()
        os._exit(0)



if __name__ == '__main__':
    main(sys.argv)
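
# --- illustrative sketch (not part of the module) ----------------------------
# Rough outline of how a submission round-trips through this plugin, assuming
# iGrid invokes enqueue_cmd on the file produced by WriteConfig(). All names
# below (myjob, /tmp/job_0.sh) are hypothetical.
#
#   q = Shell()
#   q.WriteConfig(myjob, "/tmp/job_0.sh")               # write the wrapper script
#   out = commands.getoutput("shell.py /tmp/job_0.sh")  # runs main() above
#   jid = q.get_id(out)               # parses "Job submitted with id <pid>"
#   q.CheckJobStatus([myjob])         # polls "ps -f <pid>" via cstatus()
#   q.QRemove(jid)                    # sends "kill -QUIT <pid>" if needed
# ------------------------------------------------------------------------------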