1
2
3
4 """
5 A basic submitfile for submitting and monitoring jobs to PBS.
6 This module implements only a small subset of PBS features.
7 Its interface is like that of condor, however.
8 (http://www.cs.wisc.edu/condor)
9 Inherits from i3Queue
10
11 copyright (c) 2005 the icecube collaboration
12
13 @version: $Revision: $
14 @date: $Date: $
15 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
16 @todo: implement more functionality of pbs and condor.
17 """
18
19 import os
20 import re
21 import sys
22 import math
23 import random
24 import dircache
25 import time
26 import string
27 import os.path
28 import getpass
29 import commands
30 import thread,threading
31 from threading import Thread
32 from threading import Semaphore,BoundedSemaphore
33 from iceprod.core import metadata
34 from iceprod.server.grid import iGrid
35 import logging
36
37
39 """
40 This class represents a job or cluster on a pbs system.
41 """
42
44
45 iGrid.__init__(self,size)
46 self.logger = logging.getLogger('Shell')
47 self.proc = 0
48 self.sleeptime = 6
49 self.enqueue_cmd = "/bin/bash"
50 self.checkqueue_cmd = "ps -f"
51 self.queue_rm_cmd = "kill -QUIT"
52 self.suffix = "sh"
53 self.semaphore = Semaphore()
54 self.size = size
55
56 - def Run(self,gpu=0):
90
92 """
93 Submit job/cluster to Shell
94
95 @param job: i3Job object
96 @param config_file: path to the file where the submit file will be written
97 Submit job/cluster to Shell
98 """
99 for job in self.jobs: job.SetStatus(1)
100
101 hilos = []
102 for i in range(self.size):
103 try:
104 t = Thread(target=self.Run,args=(i,))
105 t.start()
106 hilos.append(t)
107 status = 0
108 except Exception,e:
109 self.logger.error("failed to start thread:%s",e)
110 apply(sys.excepthook,sys.exc_info())
111 status = 1
112
113 for t in hilos:
114 t.join()
115
116 return status,"started"
117
118
119
121 """
122 Write pbs submit file to a file.
123 @param job: i3Job object
124 @param config_file: path to the file where the submit file will be written
125 """
126
127 if not job.GetExecutable():
128 raise Exception, "no executable configured"
129
130 submitfile = open("%s" % config_file,'w')
131 outfile = os.path.join(job.GetInitialdir(),job.GetOutputFile())
132 errfile = os.path.join(job.GetInitialdir(),job.GetErrorFile())
133 job.Write(submitfile,"#!/bin/sh")
134
135
136 for key in self.GetParamKeys():
137 job.Write(submitfile,"#%s" % (self.GetParam(key)))
138
139
140 for key,opt in job.GetBatchOpts().items():
141 job.Write(submitfile,"#%s " % opt)
142
143
144 for var in self.env.keys():
145 job.Write(submitfile,"export %s=%s" % (var,self.env[var]),parse=False)
146 for var in job.env.keys():
147 job.Write(submitfile,"export %s=%s" % (var,job.env[var]),parse=False)
148 job.Write(submitfile,"export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH",parse=False)
149 job.Write(submitfile,"unset I3SIMPRODPATH")
150
151 job.Write(submitfile,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
152 job.Write(submitfile,"mkdir -p $RUNDIR")
153 job.Write(submitfile,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)
154
155 for file in job.GetInputFiles()+[job.GetExecutable()]:
156 job.Write(submitfile,"cp %s $RUNDIR" % file,parse=False)
157
158 job.Write(submitfile,"cd $RUNDIR",parse=False)
159
160 argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
161 argstr = job.GetMainScript() + " " + " ".join(argopts)
162 executable = os.path.basename(job.GetExecutable())
163 job.Write(submitfile,
164 "$PYROOT/bin/python %s %s 1>%s.tmp 2>%s.tmp" % (executable,argstr,outfile,errfile),
165 parse=False)
166 job.Write(submitfile, "cat %s.tmp>&1" % outfile, parse=False)
167 job.Write(submitfile, "cat %s.tmp>&2" % errfile, parse=False)
168
169 job.Write(submitfile,"rm -f wgetrc" )
170
171 if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
172 job.Write(submitfile,"for file in *; do")
173 job.Write(submitfile," if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
174 job.Write(submitfile," mv $file %s" % job.GetInitialdir())
175 job.Write(submitfile," fi; done")
176
177 job.Write(submitfile,"#clean directory")
178 job.Write(submitfile,"cd /tmp")
179 job.Write(submitfile,"rm -rf $RUNDIR ",parse=False)
180
181 submitfile.close()
182
183
184
186 """
187 Query the status of a cluster or job on the condor queue
188 """
189
190 cmd = self.checkqueue_cmd
191 for id in self.job_ids:
192 for t in threading.enumerate():
193 if t.getName() == id and t.isAlive():
194 return 'job %s is running' % t.getName()
195 return ''
196
198 """
199 Query the status of a cluster or job on the condor queue
200 """
201 for t in threading.enumerate():
202 if not t.isAlive():
203 self.logger("job %s is not running" % t.getName())
204
205
207 """
208 Query the status of a job on the condor queue
209 """
210 if isinstance(jobs,list):
211 job_list = jobs
212 else:
213 job_list = [jobs]
214
215 for job in job_list:
216 job.SetStatus('FINISHED')
217 job_id = job.GetJobId()
218 for t in threading.enumerate():
219 if t.getName() == job_id and t.isAlive():
220 job.SetStatus('PROCESSING')
221 break
222 return 1
223
225 """
226 Remove cluster or job from queue
227 """
228 if not job: return "Unknown jobid. Cannot remove job."
229 cmd = "%s %s" % (self.queue_rm_cmd,job.job_id)
230 self.logger.info(cmd)
231
232 handle = os.popen(cmd, 'r')
233 status = string.join(handle.readlines())
234 if status.strip():
235 self.logger.info(status.strip())
236 handle.close()
237 return status
238
239
241 usage = "usage: %prog [options]"
242
243 if len(args) < 1:
244 print >> sys.stderr,'no inputfile specified'
245
246 infile = args[1]
247 fdin = open(infile,'r')
248 outfile = infile + '.out'
249 errfile = infile + '.err'
250 for line in fdin.readlines():
251 if line.startswith('#? -o'):
252 outfile = line.replace('#? -o','').strip()
253 elif line.startswith('#? -e'):
254 errfile = line.replace('#? -e','').strip()
255 fdin.close()
256
257 chpid = os.fork()
258 if (chpid == 0):
259 os.setsid()
260
261 try:
262 import iceprod.procname
263 iceprod.procname.setprocname(os.path.basename(infile))
264 except ImportError,e:
265 print >> sys.stderr,"Could not import procname module. "
266 print >> sys.stderr,"Will not be able to set process name for job"
267
268 import resource
269 maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
270 if (maxfd == resource.RLIM_INFINITY): maxfd = MAXFD
271 for fd in range(0, maxfd):
272 try: os.close(fd)
273 except OSError: pass
274
275 outf = open(outfile, 'w+')
276 errf = open(errfile, 'w+')
277 os.dup2(errf.fileno(), 2)
278 os.dup2(outf.fileno(), 1)
279
280 os.execvp('/bin/sh', ['/bin/sh',infile])
281 else:
282 print "Job submitted with id %d" % chpid
283 sys.stdout.flush()
284 sys.stderr.flush()
285 os._exit(0)
286
287
288
289 if __name__ == '__main__':
290 main(sys.argv)
291