Package iceprod :: Package server :: Package plugins :: Module shell

Source code for module iceprod.server.plugins.shell

  1  #!/home/juancarlos/python-2.3/bin/python 
  2  # 
  3   
  4  """ 
  5   A basic submitfile for submitting and monitoring jobs to PBS.  
  6   This module implements only a small subset of PBS features. 
  7   Its interface is like that of condor, however. 
  8   (http://www.cs.wisc.edu/condor)  
  9   Inherits from i3Queue 
 10   
 11   copyright  (c) 2005 the icecube collaboration 
 12   
 13   @version: $Revision: $ 
 14   @date: $Date: $ 
 15   @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
 16   @todo: implement more functionality of pbs and condor. 
 17  """ 
 18   
 19  import os 
 20  import re 
 21  import sys 
 22  import math 
 23  import random  
 24  import dircache 
 25  import time 
 26  import string 
 27  import os.path 
 28  import getpass 
 29  import commands 
 30  import thread,threading 
 31  from threading import Thread 
 32  from threading import Semaphore,BoundedSemaphore 
 33  from iceprod.core import metadata 
 34  from iceprod.server.grid import iGrid 
 35  import logging 
 36   
 37   
38 -class Shell(iGrid):
39 """ 40 This class represents a job or cluster on a pbs system. 41 """ 42
43 - def __init__(self,size=3):
44 45 iGrid.__init__(self,size) 46 self.logger = logging.getLogger('Shell') 47 self.proc = 0 48 self.sleeptime = 6 49 self.enqueue_cmd = "/bin/bash" 50 self.checkqueue_cmd = "ps -f" 51 self.queue_rm_cmd = "kill -QUIT" 52 self.suffix = "sh" 53 self.semaphore = Semaphore() 54 self.size = size
55
56 - def Run(self,gpu=0):
57 58 for job in self.jobs: 59 60 skip = True 61 self.semaphore.acquire() 62 if job.GetStatus() == 1: 63 job.SetStatus(2) 64 skip = False 65 self.semaphore.release() 66 if skip: 67 logger.info("skipping job %u.%u" %(job.GetDatasetId(),job.GetProcNum())) 68 continue 69 70 job.SetJobId("%06.2f" % time.time()) 71 job.AddArgOpt('gpu',gpu) 72 73 logger.info("processing job %u.%u" %(job.GetDatasetId(),job.GetProcNum())) 74 self.submit_status = '' 75 self.WriteConfig(job,job.config_file) 76 77 cmd = "%s %s" % (self.enqueue_cmd,job.config_file) 78 self.logger.info(cmd) 79 status,output = commands.getstatusoutput(cmd) 80 self.logger.info("Job exited with code %u" % status) 81 self.logger.info(output) 82 83 self.semaphore.acquire() 84 job.SetStatus(3) 85 self.semaphore.release() 86 87 thread.exit() 88 89 return status
90
91 - def Submit(self,cookie):
92 """ 93 Submit job/cluster to Shell 94 95 @param job: i3Job object 96 @param config_file: path to file were submit file will be written 97 Submit job/cluster to Shell 98 """ 99 for job in self.jobs: job.SetStatus(1) # queued 100 101 hilos = [] 102 for i in range(self.size): 103 try: 104 t = Thread(target=self.Run,args=(i,)) 105 t.start() 106 hilos.append(t) 107 status = 0 108 except Exception,e: 109 self.logger.error("failed to start thread:%s",e) 110 apply(sys.excepthook,sys.exc_info()) 111 status = 1 112 113 for t in hilos: 114 t.join() 115 116 return status,"started"
117 118 119
120 - def WriteConfig(self,job,config_file):
121 """ 122 Write pbs submit file to a file. 123 @param job: i3Job object 124 @param config_file: path to file were submit file will be written 125 """ 126 127 if not job.GetExecutable(): 128 raise Exception, "no executable configured" 129 130 submitfile = open("%s" % config_file,'w') 131 outfile = os.path.join(job.GetInitialdir(),job.GetOutputFile()) 132 errfile = os.path.join(job.GetInitialdir(),job.GetErrorFile()) 133 job.Write(submitfile,"#!/bin/sh") 134 135 # Add general batch options 136 for key in self.GetParamKeys(): 137 job.Write(submitfile,"#%s" % (self.GetParam(key))) 138 139 # Add job specific batch options 140 for key,opt in job.GetBatchOpts().items(): 141 job.Write(submitfile,"#%s " % opt) 142 143 #Export environment variable,value pairs 144 for var in self.env.keys(): 145 job.Write(submitfile,"export %s=%s" % (var,self.env[var]),parse=False) 146 for var in job.env.keys(): 147 job.Write(submitfile,"export %s=%s" % (var,job.env[var]),parse=False) 148 job.Write(submitfile,"export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH",parse=False) 149 job.Write(submitfile,"unset I3SIMPRODPATH") 150 151 job.Write(submitfile,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False) 152 job.Write(submitfile,"mkdir -p $RUNDIR") 153 job.Write(submitfile,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False) 154 155 for file in job.GetInputFiles()+[job.GetExecutable()]: 156 job.Write(submitfile,"cp %s $RUNDIR" % file,parse=False) 157 158 job.Write(submitfile,"cd $RUNDIR",parse=False) 159 160 argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments() 161 argstr = job.GetMainScript() + " " + " ".join(argopts) 162 executable = os.path.basename(job.GetExecutable()) 163 job.Write(submitfile, 164 "$PYROOT/bin/python %s %s 1>%s.tmp 2>%s.tmp" % (executable,argstr,outfile,errfile), 165 parse=False) 166 job.Write(submitfile, "cat %s.tmp>&1" % outfile, parse=False) 167 job.Write(submitfile, "cat %s.tmp>&2" % errfile, parse=False) 168 
169 job.Write(submitfile,"rm -f wgetrc" ) 170 171 if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'): 172 job.Write(submitfile,"for file in *; do") 173 job.Write(submitfile," if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir()) 174 job.Write(submitfile," mv $file %s" % job.GetInitialdir()) 175 job.Write(submitfile," fi; done") 176 177 job.Write(submitfile,"#clean directory") 178 job.Write(submitfile,"cd /tmp") 179 job.Write(submitfile,"rm -rf $RUNDIR ",parse=False) 180 181 submitfile.close()
182 183 184
185 - def CheckQ(self,db=None):
186 """ 187 Querie status of cluster or job on condor queue 188 """ 189 190 cmd = self.checkqueue_cmd 191 for id in self.job_ids: 192 for t in threading.enumerate(): 193 if t.getName() == id and t.isAlive(): 194 return 'job %s is running' % t.getName() 195 return ''
196
197 - def CleanQ(self,jobs=None):
198 """ 199 Querie status of cluster or job on condor queue 200 """ 201 for t in threading.enumerate(): 202 if not t.isAlive(): 203 self.logger("job %s is not running" % t.getName())
204 205
206 - def CheckJobStatus(self,jobs):
207 """ 208 Querie status of job on condor queue 209 """ 210 if isinstance(jobs,list): 211 job_list = jobs 212 else: 213 job_list = [jobs] 214 215 for job in job_list: 216 job.SetStatus('FINISHED') 217 job_id = job.GetJobId() 218 for t in threading.enumerate(): 219 if t.getName() == job_id and t.isAlive(): 220 job.SetStatus('PROCESSING') 221 break 222 return 1
223
224 - def QRemove(self,job):
225 """ 226 Remove cluster or job from queue 227 """ 228 if not job: return "Unknown jobid. Cannot remove job." 229 cmd = "%s %s" % (self.queue_rm_cmd,job.job_id) 230 self.logger.info(cmd) 231 232 handle = os.popen(cmd, 'r') 233 status = string.join(handle.readlines()) 234 if status.strip(): 235 self.logger.info(status.strip()) 236 handle.close() 237 return status
238 239
240 -def main(args):
241 usage = "usage: %prog [options]" 242 243 if len(args) < 1: 244 print >> sys.stderr,'no inputfile specified' 245 246 infile = args[1] 247 fdin = open(infile,'r') 248 outfile = infile + '.out' 249 errfile = infile + '.err' 250 for line in fdin.readlines(): 251 if line.startswith('#? -o'): 252 outfile = line.replace('#? -o','').strip() 253 elif line.startswith('#? -e'): 254 errfile = line.replace('#? -e','').strip() 255 fdin.close() 256 257 chpid = os.fork() 258 if (chpid == 0): 259 os.setsid() 260 # Change name of process for ps 261 try: 262 import iceprod.procname 263 iceprod.procname.setprocname(os.path.basename(infile)) 264 except ImportError,e: 265 print >> sys.stderr,"Could not import procname module. " 266 print >> sys.stderr,"Will not be able to set process name for job" 267 268 import resource # Resource usage information. 269 maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] 270 if (maxfd == resource.RLIM_INFINITY): maxfd = MAXFD 271 for fd in range(0, maxfd): 272 try: os.close(fd) 273 except OSError: pass 274 275 outf = open(outfile, 'w+') 276 errf = open(errfile, 'w+') 277 os.dup2(errf.fileno(), 2) # standard error (2) 278 os.dup2(outf.fileno(), 1) # standard output (1) 279 280 os.execvp('/bin/sh', ['/bin/sh',infile]) 281 else: 282 print "Job submitted with id %d" % chpid 283 sys.stdout.flush() 284 sys.stderr.flush() 285 os._exit(0)
# Script entry point: run the submit script named on the command line.
if __name__ == '__main__':
    main(sys.argv)