Package iceprod :: Package server :: Package plugins :: Module pbs
[hide private]
[frames] | [no frames]

Source Code for Module iceprod.server.plugins.pbs

  1  #!/bin/env python 
  2  # 
  3   
  4  """ 
  5   A basic submitfile for submitting and monitoring jobs to PBS.  
  6   This module implements only a small subset of PBS features. 
  7   Its interface is like that of condor, however. 
  8   (http://www.cs.wisc.edu/condor)  
  9   Inherits from i3Queue 
 10   
 11   copyright  (c) 2005 the icecube collaboration 
 12   
 13   @version: $Revision: $ 
 14   @date: $Date: $ 
 15   @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
 16   @todo: implement more functionality of pbs. 
 17  """ 
 18   
 19  import os 
 20  import re 
 21  import sys 
 22  import math 
 23  import random  
 24  import dircache 
 25  import time 
 26  import string 
 27  import logging 
 28  import os.path 
 29  import getpass 
 30  import commands 
 31  from os import popen2 
 32  from iceprod.core import metadata 
 33  from iceprod.server.grid import iGrid 
 34  from iceprod.server.job  import i3Job 
 35   
 36  logger = logging.getLogger('PBS') 
 37   
 38  pbs_status = {'Q':'QUEUED', 'R':'PROCESSING'} 
 39   
40 -def cstatus(istatus):
41 if pbs_status.has_key(istatus): 42 return pbs_status[istatus] 43 return 'FINISHED'
44
class Pbs(iGrid):
    """
    This class represents a job or cluster on a pbs system.

    Jobs are described by a /bin/sh script carrying #PBS directives (see
    WriteConfig); the queue is inspected with qstat and jobs are removed with
    qdel.
    """

    def __init__(self):
        iGrid.__init__(self)
        self.proc = 0
        self.sleeptime = 6
        self.enqueue_cmd = "qsub"
        # restrict queue listings to jobs owned by the user running this code
        self.checkqueue_cmd = "qstat -u %s " % getpass.getuser()
        self.queue_rm_cmd = "qdel"
        self.suffix = "pbs"

    def _choose_queue(self, queue_list):
        """
        Choose one queue at random, weighted by per-queue integer weights.

        @param queue_list: strings of the form "name" or "name weight"; a
            missing or non-integer weight counts as 1
        @return: the chosen queue name, or None when no entry has a positive
            weight (the caller then does not set a queue)
        """
        weighted_qlist = []
        for q in queue_list:
            tokens = q.split()
            if not tokens:
                logger.warning("Ignoring empty queue specification")
                continue
            queue = tokens[0]
            weight = 1
            if len(tokens) > 1:
                if len(tokens) > 2:
                    # previously an uncaught ValueError from tuple unpacking
                    logger.warning("Ignoring extra tokens in queue specification: " + q)
                try:
                    weight = int(tokens[1])
                except ValueError:
                    logger.error("Exception: " + str(sys.exc_info()[1]))
                    logger.warning("Unable to get queue weight for: " + q)
                    weight = 1
            logger.debug("%s:%u " % (queue, weight))
            # a weight <= 0 contributes no entries
            weighted_qlist.extend([queue] * weight)
        if not weighted_qlist:
            # random.choice([]) would raise IndexError
            logger.warning("No queue with a positive weight in: %s" % queue_list)
            return None
        return random.choice(weighted_qlist)

    def _write_platform_detection(self, job, submitfile):
        """
        Write the shell snippet that sets $PLATFORM from `uname` output.

        @param job: i3Job object whose Write method emits the lines
        @param submitfile: open file object the lines are written to
        """
        job.Write(submitfile, "ARCH=`uname -m | sed -e 's/Power Macintosh/ppc/ ; s/i686/i386/'`")
        # second '.'-separated field of `uname -r`
        job.Write(submitfile, "PVER=`uname -r | awk '{split($1,a,\".\"); print a[2]}'`")
        # '=' (instead of the bash-only '==') and quoted operands keep these
        # tests valid under a POSIX /bin/sh and when a variable is empty
        job.Write(submitfile, "if [ \"$ARCH\" = 'i386' ]; then", parse=False)
        job.Write(submitfile, " if [ \"$PVER\" = '4' ]; then", parse=False)
        job.Write(submitfile, " PLATFORM=Linux-i386")
        job.Write(submitfile, " elif [ \"$PVER\" = '6' ]; then", parse=False)
        job.Write(submitfile, " PLATFORM=Linux-libstdc++6-i386")
        job.Write(submitfile, " fi")
        job.Write(submitfile, "elif [ \"$ARCH\" = 'x86_64' ]; then", parse=False)
        job.Write(submitfile, " if [ \"$PVER\" = '4' ]; then", parse=False)
        job.Write(submitfile, " PLATFORM=Linux-x86_64")
        job.Write(submitfile, " elif [ \"$PVER\" = '6' ]; then", parse=False)
        job.Write(submitfile, " PLATFORM=Linux-libstdc++6-x86_64")
        job.Write(submitfile, " fi")
        job.Write(submitfile, "fi")

    def WriteConfig(self, job, config_file):
        """
        Write a pbs submit script for job.

        @param job: i3Job object
        @param config_file: path of the file the submit script is written to
        @raise Exception: if the job has no executable configured
        """
        logger.debug('WriteConfig')

        if not job.GetExecutable():
            raise Exception("no executable configured")

        initialdir = job.GetInitialdir()
        outfile = os.path.join(initialdir, job.GetOutputFile())
        errfile = os.path.join(initialdir, job.GetErrorFile())

        submitfile = open("%s" % config_file, 'w')
        try:
            job.Write(submitfile, "#!/bin/sh")
            job.Write(submitfile, "#PBS -o %s" % outfile)
            job.Write(submitfile, "#PBS -e %s" % errfile)

            # General batch options; "queue*" parameters are collected and a
            # single queue is chosen among them.
            queue_list = []
            for key in self.GetParamKeys():
                if key.startswith("queue"):
                    # drop the first two characters (presumably the "-q" flag)
                    queue_list.append(self.GetParam(key)[2:])
                else:
                    job.Write(submitfile, "#PBS %s" % self.GetParam(key))
            if queue_list:
                chosen_queue = self._choose_queue(queue_list)
                if chosen_queue:
                    job.Write(submitfile, "#PBS -q %s" % chosen_queue)

            # Job specific batch options
            for opt in job.GetBatchOpts().values():
                job.Write(submitfile, "#PBS %s " % opt)

            # Interpolate the directory here: it used to be passed to Write as
            # an extra positional argument (presumably the `parse` flag), so
            # the script received a literal "%s".
            job.Write(submitfile, "export PBS_O_WORKDIR=%s" % initialdir)
            self._write_platform_detection(job, submitfile)

            # Export environment variable,value pairs
            for var in self.env.keys():
                job.Write(submitfile, "export %s=%s" % (var, self.env[var]), parse=False)
            for var in job.env.keys():
                job.Write(submitfile, "export %s=%s" % (var, job.env[var]), parse=False)
            job.Write(submitfile, "export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH", parse=False)
            job.Write(submitfile, "unset I3SIMPRODPATH")

            # Run inside a per-job scratch directory
            job.Write(submitfile, "RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(), time.time()), parse=False)
            job.Write(submitfile, "mkdir -p $RUNDIR")
            job.Write(submitfile, "echo \"running on $HOSTNAME:$RUNDIR\"", parse=False)

            for input_file in job.GetInputFiles() + [job.GetExecutable()]:
                job.Write(submitfile, "cp %s $RUNDIR" % input_file, parse=False)

            job.Write(submitfile, "cd $RUNDIR", parse=False)

            argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
            argstr = job.GetMainScript() + " " + " ".join(argopts)
            executable = os.path.basename(job.GetExecutable())
            job.Write(submitfile,
                      "$PYROOT/bin/python %s %s 1>%s.tmp 2>%s.tmp" % (executable, argstr, outfile, errfile),
                      parse=False)
            job.Write(submitfile, "cat %s.tmp>&1" % outfile, parse=False)
            job.Write(submitfile, "cat %s.tmp>&2" % errfile, parse=False)

            job.Write(submitfile, "rm -f wgetrc")

            if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
                # move regular files back to initialdir unless a file with the
                # same name is already there
                job.Write(submitfile, "for file in *; do")
                job.Write(submitfile, " if [ -f $file -a ! -f %s/`basename $file` ]; then " % initialdir)
                job.Write(submitfile, " mv $file %s" % initialdir)
                job.Write(submitfile, " fi; done")

            job.Write(submitfile, "#clean directory")
            job.Write(submitfile, "cd /tmp")
            job.Write(submitfile, "rm -rf $RUNDIR ", parse=False)
        finally:
            # close the submit file even if writing fails part way through
            submitfile.close()

    def get_id(self, submit_status):
        """
        Parse the string printed by qsub on submission to extract the job id.

        @param submit_status: string returned by qsub
        @return: the job id (also appended to self.job_ids), or -1 if no id
            could be found
        """
        logger.debug(submit_status)
        # digits, a dot, then one name component (stops at the next '.')
        matches = re.findall(r'[0-9]+\.[0-9a-zA-Z_\-]*', submit_status)
        if not matches:
            logger.warning('could not parse job id from "%s"' % submit_status)
            return -1
        # the pattern cannot match whitespace, so the match is the id itself
        job_id = matches[0]
        self.job_ids.append(job_id)
        return job_id

    def CheckJobStatus(self, jobs):
        """
        Query the status of jobs with "qstat -f" and record it on each job.

        @param jobs: an i3Job or a list of i3Jobs
        @return: 1
        """
        if isinstance(jobs, list):
            job_list = jobs
        else:
            job_list = [jobs]
        for job in job_list:
            job.SetStatus('?')
            job_id = job.GetJobId()
            if job_id < 0:
                # get_id returns -1 when submission output could not be parsed
                continue
            cmd = "qstat -f %s " % job_id
            status, output = commands.getstatusoutput(cmd)
            if status in [153, 39168]:
                # 39168 == 153 << 8, i.e. exit code 153 in the raw wait status;
                # treated as "job no longer in the queue"
                job.SetStatus('FINISHED')
            elif status:
                job.SetStatus('?')
                logger.error("%s: %s: %s" % (cmd, status, output))
            else:
                for line in output.split('\n'):
                    line = line.strip()
                    # split only on the first '=' so values keep any '='
                    if line.startswith('job_state'):
                        job.SetStatus(cstatus(line.split('=', 1)[1].strip()))
                    elif line.startswith('exec_host'):
                        job.SetHost(line.split('=', 1)[1].strip())
        return 1

    def CheckQ(self, db=None):
        """
        Return the qstat listing for this user.

        The ids recorded by get_id are appended to the qstat command line.

        @param db: unused
        @return: output of the qstat command
        """
        cmd = self.checkqueue_cmd
        for job_id in self.job_ids:
            cmd += " %s" % job_id
        status, output = commands.getstatusoutput(cmd)
        return output

    def CleanQ(self, jobs=None):
        """
        Remove stray iceprod jobs from the PBS queue.

        Lists this user's queue with self.checkqueue_cmd and runs
        self.queue_rm_cmd on every job whose name starts with "iceprod." but
        which does not belong to jobs.

        @param jobs: an i3Job or list of i3Jobs that must be kept
        @return: exit status of the qstat command, or 0 if jobs is empty
        """
        if not jobs:
            return 0

        if isinstance(jobs, list):
            job_list = jobs
        else:
            job_list = [jobs]

        job_dict = dict()
        # Sequence numbers (text before the first '.') of the known ids.
        # get_id keeps only one server-name component and the Job ID column
        # of "qstat -u" may be truncated, so an exact string match alone
        # could mistake one of our own jobs for a stray and delete it.
        known_seqnums = dict()
        for job in job_list:
            job_id = job.GetJobId()
            job_dict[job_id] = job
            known_seqnums[str(job_id).split('.')[0]] = True

        status, output = commands.getstatusoutput(self.checkqueue_cmd)
        if status:
            logger.error(output)
            return status

        for line in output.split('\n')[2:]:  # skip first two lines
            tok = line.split()
            # blank and short header lines lack the columns of a job row
            if len(tok) < 10:
                continue
            jobId = tok[0]
            executable = tok[3]
            jobStatus = tok[9]
            if not executable.startswith("iceprod."):
                continue
            if jobId in job_dict or jobId.split('.')[0] in known_seqnums:
                continue
            logger.warning("removing job %s with status %s. Reason: job not found in list" % \
                (jobId, cstatus(jobStatus)))
            logger.debug("job list [%s]" % str(job_dict.keys()))
            os.system("%s %s" % (self.queue_rm_cmd, jobId))
        return status

    def QRemove(self, jobid):
        """
        Remove a job from the queue.

        @param jobid: id of the job to remove
        @return: the command output, or an error message if jobid is empty
        """
        if not jobid:
            return "Unknown jobid. Cannot remove job."
        cmd = "%s %s" % (self.queue_rm_cmd, jobid)
        logger.info(cmd)

        handle = os.popen(cmd, 'r')
        try:
            # same as the former string.join(lines): join with single spaces
            status = " ".join(handle.readlines())
        finally:
            handle.close()
        logger.info(status)
        return status