1
2
3
4 """
A basic PBS backend: writes PBS submit files and submits and monitors jobs on PBS.
This module implements only a small subset of PBS features.
Its interface, however, is like that of Condor
(http://www.cs.wisc.edu/condor).
Inherits from iGrid.
10
Copyright (c) 2005 the IceCube Collaboration
12
13 @version: $Revision: $
14 @date: $Date: $
15 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
16 @todo: implement more functionality of pbs.
17 """
18
19 import os
20 import re
21 import sys
22 import math
23 import random
24 import dircache
25 import time
26 import string
27 import logging
28 import os.path
29 import getpass
30 import commands
31 from os import popen2
32 from iceprod.core import metadata
33 from iceprod.server.grid import iGrid
34 from iceprod.server.job import i3Job
35
# Single module-wide logger; all messages from this backend use the "PBS" channel.
logger = logging.getLogger('PBS')

# Status names for the two PBS job-state letters handled here ('Q' and 'R');
# any other letter has no entry in this table.
# NOTE(review): presumably consulted by the cstatus() helper used when parsing
# qstat output -- confirm against its definition.
pbs_status = dict(Q='QUEUED', R='PROCESSING')
39
44
46 """
47 This class represents a job or cluster on a pbs system.
48 """
49
51
52 iGrid.__init__(self)
53 self.proc = 0
54 self.sleeptime = 6
55 self.enqueue_cmd = "qsub"
56 self.checkqueue_cmd = "qstat -u %s " % getpass.getuser()
57 self.queue_rm_cmd = "qdel"
58 self.suffix = "pbs"
59
61 from random import choice
62 weighted_qlist = []
63 for q in queue_list:
64 if len(q.split()) > 1:
65 queue,weight = q.split()
66 else:
67 queue,weight = q,1
68 try:
69 weight = int(weight)
70 except Exception,e:
71 logger.error("Exception: " + str(e))
72 logger.warn("Unable to get queue weight for: " +q)
73 weight = 1
74 logger.debug("%s:%u " % (queue,weight))
75 weighted_qlist.extend([queue]*weight)
76 return choice(weighted_qlist)
77
79 """
80 Write pbs submit file to a file.
81 @param job: i3Job object
82 @param config_file: path to file were submit file will be written
83 """
84 logger.debug('WriteConfig')
85
86 if not job.GetExecutable():
87 raise Exception, "no executable configured"
88
89 submitfile = open("%s" % config_file,'w')
90 outfile = os.path.join(job.GetInitialdir(),job.GetOutputFile())
91 errfile = os.path.join(job.GetInitialdir(),job.GetErrorFile())
92
93 job.Write(submitfile,"#!/bin/sh")
94 job.Write(submitfile,"#PBS -o %s" % outfile )
95
96 job.Write(submitfile,"#PBS -e %s" % errfile )
97
98
99 queue_list = []
100 for key in self.GetParamKeys():
101 if not key.startswith("queue"):
102 job.Write(submitfile,"#PBS %s" % (self.GetParam(key)))
103 else:
104 queue_list.append(self.GetParam(key)[2:])
105 if queue_list:
106 chosen_queue = self._choose_queue(queue_list)
107 job.Write(submitfile,"#PBS -q %s" % chosen_queue)
108
109
110 for key,opt in job.GetBatchOpts().items():
111 job.Write(submitfile,"#PBS %s " % opt)
112
113
114 job.Write(submitfile, "export PBS_O_WORKDIR=%s",job.GetInitialdir())
115 job.Write(submitfile, "ARCH=`uname -m | sed -e 's/Power Macintosh/ppc/ ; s/i686/i386/'`")
116 job.Write(submitfile, "PVER=`uname -r | awk '{split($1,a,\".\"); print a[2]}'`")
117 job.Write(submitfile, "if [ $ARCH == 'i386' ]; then",parse=False)
118 job.Write(submitfile, " if [ $PVER == '4' ]; then",parse=False)
119 job.Write(submitfile, " PLATFORM=Linux-i386")
120 job.Write(submitfile, " elif [ $PVER == '6' ]; then",parse=False)
121 job.Write(submitfile, " PLATFORM=Linux-libstdc++6-i386")
122 job.Write(submitfile, " fi")
123 job.Write(submitfile, "elif [ $ARCH == 'x86_64' ]; then",parse=False)
124 job.Write(submitfile, " if [ $PVER == '4' ]; then",parse=False)
125 job.Write(submitfile, " PLATFORM=Linux-x86_64")
126 job.Write(submitfile, " elif [ $PVER == '6' ]; then",parse=False)
127 job.Write(submitfile, " PLATFORM=Linux-libstdc++6-x86_64")
128 job.Write(submitfile, " fi")
129 job.Write(submitfile, "fi")
130
131
132 for var in self.env.keys():
133 job.Write(submitfile,"export %s=%s" % (var,self.env[var]),parse=False)
134 for var in job.env.keys():
135 job.Write(submitfile,"export %s=%s" % (var,job.env[var]),parse=False)
136 job.Write(submitfile,"export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH",parse=False)
137 job.Write(submitfile,"unset I3SIMPRODPATH")
138
139 job.Write(submitfile,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
140 job.Write(submitfile,"mkdir -p $RUNDIR")
141 job.Write(submitfile,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)
142
143 for file in job.GetInputFiles()+[job.GetExecutable()]:
144 job.Write(submitfile,"cp %s $RUNDIR" % file,parse=False)
145
146 job.Write(submitfile,"cd $RUNDIR",parse=False)
147
148 argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
149 argstr = job.GetMainScript() + " " + " ".join(argopts)
150 executable = os.path.basename(job.GetExecutable())
151 job.Write(submitfile,
152 "$PYROOT/bin/python %s %s 1>%s.tmp 2>%s.tmp" % (executable,argstr,outfile,errfile),
153 parse=False)
154 job.Write(submitfile, "cat %s.tmp>&1" % outfile, parse=False)
155 job.Write(submitfile, "cat %s.tmp>&2" % errfile, parse=False)
156
157 job.Write(submitfile,"rm -f wgetrc" )
158
159 if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
160 job.Write(submitfile,"for file in *; do")
161 job.Write(submitfile," if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
162 job.Write(submitfile," mv $file %s" % job.GetInitialdir())
163 job.Write(submitfile," fi; done")
164
165 job.Write(submitfile,"#clean directory")
166 job.Write(submitfile,"cd /tmp")
167 job.Write(submitfile,"rm -rf $RUNDIR ",parse=False)
168
169 submitfile.close()
170
171
def get_id(self, submit_status):
    """
    Extract the job id from the text returned when a job is submitted.

    NOTE(review): submit_status is presumably the output of the submit
    command (qsub) -- confirm with the caller.

    The id is the first substring matching "<digits>.<suffix>", where the
    suffix may contain letters, digits, '_' and '-' but not '.'.  For
    example, "1234.host.domain" yields "1234.host".

    On success the id is also appended to self.job_ids.

    @param submit_status: text returned on submission
    @return: the job id (a string), or -1 if no id could be found
    """
    # The pattern has no groups and cannot match whitespace, so the whole
    # first match is the job id.
    match = re.search(r'[0-9]+\.[0-9a-zA-Z_\-]*', submit_status)
    logger.debug(submit_status)
    if match is None:
        # Callers receive -1 when no id can be parsed.
        logger.warning('could not parse job id from "%s"' % submit_status)
        return -1

    job_id = match.group(0)
    self.job_ids.append(job_id)
    return job_id
190
192 """
193 Querie status of job on condor queue
194 """
195 if isinstance(jobs,list):
196 job_list = jobs
197 else:
198 job_list = [jobs]
199 for job in job_list:
200 job.SetStatus('?')
201 if job.GetJobId() < 0: continue
202 job_id = job.GetJobId()
203 cmd = "qstat -f %s " % job_id
204 status,output = commands.getstatusoutput(cmd)
205 if status in [153,39168]:
206 job.SetStatus('FINISHED')
207 elif status:
208 job.SetStatus('?')
209 logger.error("%s: %s: %s" % (cmd,status,output))
210 else:
211 for line in output.split('\n'):
212 line = line.strip()
213 if line.startswith('job_state'):
214 status = cstatus(line.split('=')[1].strip())
215 job.SetStatus(status)
216 if line.startswith('exec_host'):
217 host = line.split('=')[1].strip()
218 job.SetHost(host)
219 return 1
220
221
222
224 """
225 Querie status of cluster or job on condor queue
226 """
227
228 cmd = self.checkqueue_cmd
229 for id in self.job_ids:
230 cmd += " %s" % id
231 status,output = commands.getstatusoutput(cmd)
232 return output
233
234
236 """
237 Querie status of cluster or job on condor queue
238 """
239
240 if not jobs: return 0
241
242 if isinstance(jobs,list):
243 job_list = jobs
244 else:
245 job_list = [jobs]
246
247 job_dict = dict()
248 for job in job_list:
249 job_dict[job.GetJobId()] = job
250
251 cmd = self.checkqueue_cmd
252 status,output = commands.getstatusoutput(cmd)
253 if not status:
254 for line in output.split('\n')[2:]:
255 try:
256 tok = line.split()
257 jobId = tok[0]
258 user = tok[1]
259 queue = tok[2]
260 executable = tok[3]
261 sid = tok[4]
262 nds = tok[5]
263 tsk = tok[6]
264 memory = tok[7]
265 runtime = tok[8]
266 jobStatus = tok[9]
267 if executable.startswith("iceprod."):
268 if not job_dict.has_key(jobId):
269 logger.warn("removing job %s with status %s. Reason: job not found in list" % \
270 (jobId,cstatus(jobStatus)))
271 logger.debug("job list [%s]" % str(job_dict.keys()))
272 os.system("%s %s" % (self.queue_rm_cmd,jobId))
273 except Exception,e:
274 logger.error("%s:%s" %(e,line))
275 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
276 else:
277 logger.error(output)
278 return status
279
281 """
282 Remove cluster or job from queue
283 """
284 if not jobid: return "Unknown jobid. Cannot remove job."
285 cmd = "%s %s" % (self.queue_rm_cmd,jobid)
286 logger.info(cmd)
287
288 handle = os.popen(cmd, 'r')
289 status = string.join(handle.readlines())
290 logger.info(status)
291 handle.close()
292 return status
293