1
2
3
4 """
5 A basic submitfile for submitting and monitoring jobs to SGE.
6 This module implements only a small subset of SGE features.
7 Its interface is like that of condor, however.
8 Inherits from i3Queue
9
10 copyright (c) 2005 the icecube collaboration
11
12 @version: $Revision: $
13 @date: $Date: $
14 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
15 @todo: implement more functionality of sge.
16 """
17
18 import os
19 import re
20 import sys
21 import math
22 import random
23 import dircache
24 import time
25 import string
26 import logging
27 import os.path
28 import getpass
29 import commands
30 from os import popen2
31 from iceprod.core import metadata
32 from iceprod.server.grid import iGrid
33 from iceprod.server.job import i3Job
34
# Module-wide logger; every message from this file is emitted under the name 'SGE'.
35 logger = logging.getLogger('SGE')
36
# Translation table from the state codes 'qw' and 'r' to the status strings
# 'QUEUED' and 'PROCESSING'.
# NOTE(review): the code that reads this table is not visible here
# (presumably the cstatus() helper called further down) -- confirm.
37 sge_status = {'qw':'QUEUED', 'r':'PROCESSING'}
38
43
45 """
46 This class represents a job or cluster on a sge system.
47 """
48
50
# Run the iGrid base initialiser first, then set the SGE-specific values below.
51 iGrid.__init__(self)
# NOTE(review): unit and consumer of sleeptime are not visible here
# (presumably seconds between queue polls) -- confirm against iGrid.
52 self.sleeptime = 30
# Command used to submit a job script.
53 self.enqueue_cmd = "qsub"
# Queue listing restricted to the account running this process; the user name
# is resolved once, when this object is constructed.
54 self.checkqueue_cmd = "qstat -u %s " % getpass.getuser()
# Command used to delete a job from the queue.
55 self.queue_rm_cmd = "qdel"
# NOTE(review): presumably the extension given to generated submit files --
# the code that reads self.suffix is not visible here.
56 self.suffix = "sge"
57 logger.debug('Made a SGE(iGrid)')
58
59
61 """
62 Write sge submit file to a file.
63 @param job: i3Job object
64 @param config_file: path to file were submit file will be written
65 """
# Overview: builds a /bin/sh script line by line through job.Write(). The
# script carries "#$" option lines, environment exports, a scratch RUNDIR,
# copies of the inputs, the python invocation, an optional move of the results
# back to the job's initial directory, and removal of RUNDIR.
# Raises Exception when the job has no executable configured.
66 logger.debug('WriteConfig')
67
# Validate before opening the output file so a failure leaves nothing behind.
68 if not job.GetExecutable():
69 raise Exception, "no executable configured"
70
# NOTE(review): the handle is closed only by the close() call at the very end;
# if any job.Write() below raises, it stays open until garbage collection.
71 submitfile = open("%s" % config_file,'w')
# stdout/stderr of the batch job are placed inside the job's initial directory.
72 outfile = os.path.join(job.GetInitialdir(),job.GetOutputFile())
73 errfile = os.path.join(job.GetInitialdir(),job.GetErrorFile())
74
75 job.Write(submitfile,"#!/bin/sh")
76 logger.debug("#!/bin/sh")
77
# Lines starting with "#$" are option lines embedded in the script for qsub
# (see the qsub manual, default directive prefix); -o / -e name the output and
# error files.
78 job.Write(submitfile,"#$ -o %s" % outfile)
79 logger.debug("#$ -o %s" % outfile)
80
81 job.Write(submitfile,"#$ -e %s" % errfile )
82 logger.debug("#$ -e %s" % errfile )
83
84
# Options configured on this grid object, one "#$" line each.
# NOTE(review): the effect of job.Write's parse=True/False flag is defined by
# i3Job and not visible here (presumably variable substitution) -- confirm.
85 for key in self.GetParamKeys():
86 job.Write(submitfile,"#$ %s" % self.GetParam(key),parse=True)
87 logger.debug("#$ %s" % self.GetParam(key))
88
89
# Per-job batch options; only the values are written, the keys are unused.
90 for key,opt in job.GetBatchOpts().items():
91 job.Write(submitfile,"#$ %s" % opt,parse=True)
92 logger.debug("#$ %s" % opt)
93
94
# Environment: grid-level variables are exported first, then job-level ones,
# so a job-level value for the same name appears later in the script.
95 for var in self.env.keys():
96 job.Write(submitfile,"export %s=%s" % (var,self.env[var]),parse=False)
97 for var in job.env.keys():
98 job.Write(submitfile,"export %s=%s" % (var,job.env[var]),parse=False)
# The site-packages path is hard-coded to the python2.3 layout under $PYROOT.
99 job.Write(submitfile,"export PYTHONPATH=$PYROOT/lib/python2.3/site-packages:$PYTHONPATH",parse=False)
100 job.Write(submitfile,"unset I3SIMPRODPATH")
# Scratch directory name combines the job's process number with the current
# timestamp taken when the script is generated.
101 job.Write(submitfile,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
102 job.Write(submitfile,"mkdir -p $RUNDIR",parse=False)
# NOTE: time.time() is evaluated a second time for this log message, so the
# logged RUNDIR is not exactly the value written to the submit file.
103 logger.debug("RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()))
104 logger.debug("mkdir -p $RUNDIR")
105
106 job.Write(submitfile,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)
107 logger.debug("echo \"running on $HOSTNAME:$RUNDIR\"")
108
# Copy every input file, plus the executable itself, into the scratch
# directory, then make it the working directory of the script.
109 logger.debug('%d' %len(job.GetInputFiles()))
110 for file in job.GetInputFiles()+[job.GetExecutable()]:
111 logger.debug('%s' %file)
112 job.Write(submitfile,"cp %s $RUNDIR" % file,parse=False)
113 job.Write(submitfile,"cd $RUNDIR",parse=False)
114
# Command line: grid-level option arguments, then job-level option arguments,
# then the job's positional arguments, all placed after the main script name.
115 argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
116 argstr = job.GetMainScript() + " " + " ".join(argopts)
# Only the base name is used because the executable was copied into $RUNDIR.
117 executable = os.path.basename(job.GetExecutable())
118 logger.debug('executable: %s' % job.GetExecutable())
119 logger.debug('main script: %s' % job.GetMainScript())
120 logger.debug('args options: %s' % argopts)
121 logger.debug('arguments: %s' % job.GetArguments())
122 job.Write(submitfile, "$PYROOT/bin/python %s %s" % (executable, argstr), parse=False)
# "$?" on the next script line is the exit status of the python command above.
123 job.Write(submitfile, 'echo "job exited with status $?";',parse=False)
124
# The move-back loop is emitted when the grid-level 'stageout' option is falsy
# OR the job-level 'debug' option is truthy ("not" binds tighter than "or").
# The generated loop moves each regular file that does not already exist in
# the initial directory.
125 if not self.GetArgOpt('stageout') or job.GetArgOpt('debug'):
126 job.Write(submitfile,"for file in *; do")
127 job.Write(submitfile," if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
128 job.Write(submitfile," mv $file %s" % job.GetInitialdir())
129 job.Write(submitfile," fi; done")
130
# The script leaves RUNDIR before deleting it.
131 job.Write(submitfile,"#clean directory")
132 job.Write(submitfile,"cd /tmp")
133 job.Write(submitfile,"rm -rf $RUNDIR ",parse=False)
134 logger.debug('Submit file written')
135 submitfile.close();
136
def get_id(self, submit_status):
    """
    Parse the text printed by qsub on submission and extract the job id.

    qsub reports 'Your job <id> ("<name>") has been submitted' for a plain
    job and 'Your job-array <id>.<tasks> ("<name>") has been submitted'
    for an array job; both forms are recognised and the first id found
    in the text is used.

    @param submit_status: string returned by the submit command
    @return: the job id as a string (it is also appended to
             self.job_ids), or the integer -1 when no id could be found
    """
    match = re.search(r'Your job(?:-array)? ([0-9]+)', submit_status)
    if match is None:
        logger.warning('could not parse job id from "%s"' % submit_status)
        # Callers test ids against 0, so the failure value stays an
        # integer rather than becoming an exception or None.
        return -1

    job_id = match.group(1)
    self.job_ids.append(job_id)
    return job_id
154
156 """
157 Querie status of job on condor queue
158 """
# NOTE(review): the docstring mentions condor, but the command actually run is
# self.checkqueue_cmd; what it queries depends on how that attribute was set.
# Returns the command's non-zero exit status on failure, 1 otherwise.
# Accept either a single job object or a list of them.
159 if isinstance(jobs,list):
160 job_list = jobs
161 else:
162 job_list = [jobs]
163
# Index the jobs by id. Jobs with a negative id are skipped entirely. Every
# indexed job is pre-marked 'FINISHED'; the mark is overwritten below only if
# the job shows up in the queue listing, so "absent from the listing" is
# reported as finished.
164 job_dict = {}
165 for job in job_list:
166 if job.GetJobId() < 0: continue
167 job_dict[job.GetJobId()] = job
168 job.SetStatus('FINISHED')
169
170 cmd = self.checkqueue_cmd
171 logger.debug(cmd)
172 retval,output = commands.getstatusoutput(cmd)
173
# Non-zero exit: the state of every job (including those skipped above) is
# unknown, so all are marked '?' and the exit status is returned.
174 if retval:
175 for job in job_list: job.SetStatus('?')
176 return retval
177
# The first two lines of the listing are skipped (presumably the column header
# and its separator -- confirm against the queue command's output format).
178 for line in output.split('\n')[2:]:
179 try:
# Whitespace-separated columns; prio, name, user, runtime and queue are
# extracted but not used afterwards. A line with fewer than seven fields
# raises IndexError here.
180 tok = line.split()
181 jobId = tok[0]
182 prio = tok[1]
183 name = tok[2]
184 user = tok[3]
185 jobStatus = tok[4]
186 runtime = tok[5]
187 queue = tok[6]
188 logger.debug("jobid:%s" %jobId)
# jobId is a string here, so a match requires GetJobId() to return the id as a
# string too.
189 if jobId in job_dict.keys():
190 logger.debug("status for jobid %s is %s" %(jobId,jobStatus))
# cstatus() is defined outside this routine; it presumably maps the raw state
# code to a status string -- confirm.
191 status = cstatus(jobStatus)
192 job_dict[jobId].SetStatus(status)
# NOTE(review): any exception stops the scan at the offending line, so jobs
# listed on later lines keep the 'FINISHED' mark set above even though they
# may still be in the queue.
193 except Exception,e:
194 logger.error("%s:%s" %(e,line))
195 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
196 break
# NOTE(review): the success value 1 is truthy, like the non-zero failure status
# returned above; callers cannot tell the two apart by truthiness alone.
197 return 1
198
199
201 """
202 Querie status of cluster or job on condor queue
203 """
# NOTE(review): the docstring mentions condor; the command run is whatever
# self.checkqueue_cmd holds.
204
# Runs the queue-listing command and returns its raw text output unchanged.
# The exit status is captured but ignored, so a failed command returns its
# error text as if it were a listing.
205 cmd = self.checkqueue_cmd
206 status,output = commands.getstatusoutput(cmd)
207 return output
208
209
211 """
212 Querie status of cluster or job on condor queue
213 """
# NOTE(review): the docstring does not match the body. The body scans the
# queue listing and issues a removal command for entries whose id is not among
# the given jobs. Returns 0 when no jobs are given, otherwise the exit status
# of the queue-listing command.
214
215 if not jobs: return 0
216
# Accept either a single job object or a list, and index the known jobs by id.
217 if isinstance(jobs,list):
218 job_list = jobs
219 else:
220 job_list = [jobs]
221 job_dict = dict()
222 for job in job_list:
223 job_dict[job.GetJobId()] = job
224
225 cmd = self.checkqueue_cmd
226 status,output = commands.getstatusoutput(cmd)
# The listing is parsed only when the command exited with status 0.
227 if not status:
# The first two lines are skipped (presumably header and separator -- confirm).
228 for line in output.split('\n')[2:]:
229 try:
# Whitespace-separated columns; prio, user, runtime and queue are unused.
230 tok = line.split()
231 jobId = tok[0]
232 prio = tok[1]
233 name = tok[2]
234 user = tok[3]
235 jobStatus = tok[4]
236 runtime = tok[5]
237 queue = tok[6]
# NOTE(review): `executable` is never assigned anywhere in this body. Unless it
# is a parameter or a module-level name, this line raises NameError for every
# listing line; the broad except below logs it, and the removal branch is
# never reached. The `name` column read above may have been intended --
# confirm before changing.
238 if executable.startswith("iceprod."):
# An entry is removed only when its id is missing from the supplied jobs.
239 if not job_dict.has_key(jobId):
240 logger.warn("removing job %s with status %s. Reason: job not found in list" % \
241 (jobId,cstatus(jobStatus)))
242 logger.debug("job list [%s]" % str(job_dict.keys()))
# The removal command's own exit status is not checked.
243 os.system("%s %s" % (self.queue_rm_cmd,jobId))
# Unlike a hard failure, an error on one line is logged and the scan continues
# with the next line.
244 except Exception,e:
245 logger.error("%s:%s" %(e,line))
246 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
247 return status
248
249
251 """
252 Remove cluster or job from queue
253 """
# Nothing to remove for an i3Job already marked FINISHED; 0 is returned without
# running any command.
254 if isinstance(job,i3Job) and job.GetStatus() == "FINISHED":
255 return 0
256
# NOTE(review): `job` itself is formatted into the command line. That yields a
# usable id when callers pass the id directly; for an i3Job instance it relies
# on the object's string conversion producing the id -- confirm against i3Job
# and the callers.
257 cmd = "%s %s" % (self.queue_rm_cmd,job)
# Only the exit status of the removal command is returned; its output is
# discarded.
258 status,output = commands.getstatusoutput(cmd)
259 return status
260