1
2
3
4 """
5 A basic submitfile for submitting and monitoring jobs to PBS.
6 This module implements only a small subset of PBS features.
7 Its interface, however, is like that of Condor.
8 (http://www.cs.wisc.edu/condor)
9 Inherits from iGrid.
10
11 copyright (c) 2005 the icecube collaboration
12
13 @version: $Revision: $
14 @date: $Date: $
15 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
16 @todo: implement more functionality of pbs and condor.
17 """
18
19 import os
20 import re
21 import sys
22 import math
23 import random
24 import dircache
25 import time
26 import string
27 import os.path
28 import getpass
29 import commands
30 from iceprod.core import metadata
31 from iceprod.server.grid import iGrid
32 import logging
33
# Logger used throughout this module.
logger = logging.getLogger('Shell')

# Lookup table: single-letter status code -> job state name.
pbs_status = dict(
    S='PROCESSING',
    Q='QUEUED',
    R='PROCESSING',
)
37
42
44 """
45 This class represents a job or cluster on a pbs system.
46 """
47
def get_id(self, submit_status):
    """
    Parse the text printed on submission to extract the job id.

    The submission output is expected to contain a line of the form
    ``Job submitted with id <N>``; the first such id found is appended to
    ``self.job_ids`` and returned.

    @param submit_status: output text produced when the job was submitted
    @return: the job id as a string, or the int -1 if no id could be parsed
    """
    match = re.search(r"Job submitted with id ([0-9]+)", submit_status)
    logger.debug(submit_status)
    if match is None:
        # Lazy %-args: the message is only formatted if WARNING is enabled.
        logger.warning('could not parse job id from "%s"', submit_status)
        return -1

    job_id = match.group(1)
    self.job_ids.append(job_id)
    return job_id
66
67
69
70 iGrid.__init__(self)
71 self.proc = 0
72 self.sleeptime = 6
73 self.enqueue_cmd = "shell.py"
74 self.checkqueue_cmd = "ps -f"
75 self.queue_rm_cmd = "kill -QUIT"
76 self.suffix = "sh"
77
78
80 """
81 Write pbs submit file to a file.
82 @param job: i3Job object
83 @param config_file: path to file were submit file will be written
84 """
85
86 if not job.GetExecutable():
87 raise Exception, "no executable configured"
88
89 submitfile = open("%s" % config_file,'w')
90 outfile = os.path.join(job.GetInitialdir(),job.GetOutputFile())
91 errfile = os.path.join(job.GetInitialdir(),job.GetErrorFile())
92 job.Write(submitfile,"#!/bin/sh")
93 job.Write(submitfile,"#PBS -o %s" % outfile )
94 job.Write(submitfile,"#PBS -e %s" % errfile )
95
96
97 for key in self.GetParamKeys():
98 job.Write(submitfile,"#%s" % (self.GetParam(key)))
99
100
101 for key,opt in job.GetBatchOpts().items():
102 job.Write(submitfile,"#%s " % opt)
103
104 job.Write(submitfile, "ARCH=`uname -m | sed -e 's/Power Macintosh/ppc/ ; s/i686/i386/'`")
105 job.Write(submitfile, "PVER=`uname -r | awk '{split($1,a,\".\"); print a[2]}'`")
106 job.Write(submitfile, "if [ $ARCH == 'i386' ]; then",parse=False)
107 job.Write(submitfile, " if [ $PVER == '4' ]; then",parse=False)
108 job.Write(submitfile, " PLATFORM=Linux-i386")
109 job.Write(submitfile, " elif [ $PVER == '6' ]; then",parse=False)
110 job.Write(submitfile, " PLATFORM=Linux-libstdc++6-i386")
111 job.Write(submitfile, " fi")
112 job.Write(submitfile, "elif [ $ARCH == 'x86_64' ]; then",parse=False)
113 job.Write(submitfile, " if [ $PVER == '4' ]; then",parse=False)
114 job.Write(submitfile, " PLATFORM=Linux-x86_64")
115 job.Write(submitfile, " elif [ $PVER == '6' ]; then",parse=False)
116 job.Write(submitfile, " PLATFORM=Linux-libstdc++6-x86_64")
117 job.Write(submitfile, " fi")
118 job.Write(submitfile, "fi")
119
120
121 for var in self.env.keys():
122 job.Write(submitfile,"export %s=%s" % (var,self.env[var]),parse=False)
123 for var in job.env.keys():
124 job.Write(submitfile,"export %s=%s" % (var,job.env[var]),parse=False)
125 job.Write(submitfile,"export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH",parse=False)
126 job.Write(submitfile,"unset I3SIMPRODPATH")
127
128 job.Write(submitfile,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
129 job.Write(submitfile,"mkdir -p $RUNDIR")
130 job.Write(submitfile,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)
131
132 for file in job.GetInputFiles()+[job.GetExecutable()]:
133 job.Write(submitfile,"cp %s $RUNDIR" % file,parse=False)
134
135 job.Write(submitfile,"cd $RUNDIR",parse=False)
136
137 argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
138 argstr = job.GetMainScript() + " " + " ".join(argopts)
139 executable = os.path.basename(job.GetExecutable())
140 job.Write(submitfile,
141 "$PYROOT/bin/python %s %s 1>%s.tmp 2>%s.tmp" % (executable,argstr,outfile,errfile),
142 parse=False)
143 job.Write(submitfile, "cat %s.tmp>&1" % outfile, parse=False)
144 job.Write(submitfile, "cat %s.tmp>&2" % errfile, parse=False)
145
146 job.Write(submitfile,"rm -f wgetrc" )
147
148 if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
149 job.Write(submitfile,"for file in *; do")
150 job.Write(submitfile," if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
151 job.Write(submitfile," mv $file %s" % job.GetInitialdir())
152 job.Write(submitfile," fi; done")
153
154 job.Write(submitfile,"#clean directory")
155 job.Write(submitfile,"cd /tmp")
156 job.Write(submitfile,"rm -rf $RUNDIR ",parse=False)
157
158 submitfile.close()
159
160
161
163 """
164 Querie status of cluster or job on condor queue
165 """
166
167 cmd = self.checkqueue_cmd
168 for id in self.job_ids:
169 cmd += " %s" % id
170 status,output = commands.getstatusoutput(cmd)
171 return output
172
174 """
175 Querie status of cluster or job on condor queue
176 """
177
178 if not jobs: return 0
179
180 job_dict = dict()
181 for job in jobs:
182 job_dict[job.GetJobId()] = job
183
184 cmd = self.checkqueue_cmd
185 status,output = commands.getstatusoutput(cmd)
186 if not status:
187 for line in output.split('\n')[2:]:
188 try:
189 tok = line.split()
190 jobId = tok[0]
191 tty = tok[1]
192 jobStatus = tok[2]
193 time = tok[3]
194 executable = tok[4]
195 if executable.startswith("iceprod."):
196 if job_dict.has_key(jobId):
197 logger.warn("removing job %s with status %s. Reason: job not found in list" % \
198 (jobId,cstatus(jobStatus)))
199 logger.debug("job list [%s]" % str(job_dict.keys()))
200 os.system("%s %s" % (self.queue_rm_cmd,jobId))
201 except Exception,e:
202 logger.error("%s:%s" %(e,line))
203 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
204 return status
205
206
208 """
209 Querie status of job on condor queue
210 """
211 if isinstance(jobs,list):
212 job_list = jobs
213 else:
214 job_list = [jobs]
215
216 for job in job_list:
217 job.SetStatus('FINISHED')
218 job_id = job.GetJobId()
219 if job_id < 0: return 0
220 cmd = "%s %s " % (self.checkqueue_cmd,job_id)
221 status,output = commands.getstatusoutput(cmd)
222 if status:
223 job.SetStatus('?')
224 for line in output.split('\n')[2:]:
225 try:
226 tok = line.split()
227 jobId = tok[0]
228 tty = tok[1]
229 jobStatus = tok[2]
230 time = tok[3]
231 command = tok[4]
232 logger.debug("jobid:%s" %jobId)
233 if jobId == job.GetJobId():
234 logger.debug("status for jobid %s is %s" %(jobId,jobStatus))
235 status = cstatus(jobStatus)
236 job.SetStatus(status)
237 break
238 except Exception,e:
239 logger.error("%s:%s" %(e,line))
240 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
241 return 1
242
244 """
245 Remove cluster or job from queue
246 """
247 if not jobid: return "Unknown jobid. Cannot remove job."
248 cmd = "%s %s" % (self.queue_rm_cmd,jobid)
249 logger.info(cmd)
250
251 handle = os.popen(cmd, 'r')
252 status = string.join(handle.readlines())
253 if status.strip():
254 logger.info(status.strip())
255 handle.close()
256 return status
257
258
def main(args):
    """
    Launch a job script as a detached background process.

    ``args`` is an argv-style list: ``args[0]`` is the program name and
    ``args[1]`` is the path of the job script, which is run with /bin/sh.
    Lines in the script beginning with ``#? -o`` / ``#? -e`` name the files
    the job's stdout / stderr are written to; otherwise ``<script>.out`` and
    ``<script>.err`` are used.

    The process forks: the child starts a new session, redirects its
    standard streams and execs the script, while the parent prints
    ``Job submitted with id <pid>`` and exits immediately.

    @param args: argument vector, normally sys.argv
    @raise SystemExit: with status 1 when no script path is given
    """
    if len(args) < 2:
        # args[0] is the program name, so the script path must be args[1].
        sys.stderr.write('no inputfile specified\n')
        sys.exit(1)

    infile = args[1]
    outfile = infile + '.out'
    errfile = infile + '.err'

    # Scan the script for output/error file directives.
    fdin = open(infile, 'r')
    try:
        for line in fdin:
            if line.startswith('#? -o'):
                outfile = line.replace('#? -o', '').strip()
            elif line.startswith('#? -e'):
                errfile = line.replace('#? -e', '').strip()
    finally:
        fdin.close()

    chpid = os.fork()
    if chpid == 0:
        # Child: start a new session so the job is detached from the
        # submitter's process group.
        os.setsid()

        try:
            import iceprod.procname
            iceprod.procname.setprocname(os.path.basename(infile))
        except ImportError:
            sys.stderr.write("Could not import procname module. \n")
            sys.stderr.write("Will not be able to set process name for job\n")

        # Point stdin at /dev/null and stdout/stderr at the job's log files.
        # The files are opened while fds 0-2 are normally still in use, so
        # they land on higher descriptors; dup2 then installs them as 0/1/2.
        devnull = open(os.devnull, 'r')
        outf = open(outfile, 'w+')
        errf = open(errfile, 'w+')
        os.dup2(devnull.fileno(), 0)
        os.dup2(outf.fileno(), 1)
        os.dup2(errf.fileno(), 2)

        # Close every other inherited descriptor (including the originals
        # opened just above) so the exec'd job does not inherit them.
        import resource
        maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
        if maxfd == resource.RLIM_INFINITY:
            maxfd = 1024  # no finite hard limit: use a fixed upper bound
        for fd in range(3, maxfd):
            try:
                os.close(fd)
            except OSError:
                pass

        os.execvp('/bin/sh', ['/bin/sh', infile])
    else:
        # Parent: report the child's pid in the "Job submitted with id"
        # form, flush, then exit without running Python cleanup handlers.
        sys.stdout.write("Job submitted with id %d\n" % chpid)
        sys.stdout.flush()
        sys.stderr.flush()
        os._exit(0)
305
306
307
# Script entry point: forward the complete argument vector (argv[0] included).
if '__main__' == __name__:
    main(sys.argv)
310