"""
A basic wrapper for submitting and monitoring jobs to SweGrid.
Inherits from iGrid

copyright (c) 2005 the icecube collaboration

@version: $Revision: $
@date: $Date: $
@author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>,
         Henrik Johansson <henjoh@physto.se>
"""

import os
import re
import sys
import math
import dircache
import commands
import time
import string
import shutil
import ConfigParser
import logging
from iceprod.core import metadata
from iceprod.core.dataclasses import Steering
from iceprod.server.grid import iGrid

logger = logging.getLogger('SweGrid')


class SweGrid(iGrid):
    """
    This class represents a job on swegrid.
    """

    def __init__(self):
        iGrid.__init__(self)
        self.cluster_id = -1
        self.post = None
        self.vo = "icecube"
        self.proc = 0
        self.sleeptime = 60*5
        self.enqueue_cmd = "ngsub"
        self.checkqueue_cmd = "ngstat"
        self.queue_rm_cmd = "ngkill"
        self.queue_clean_cmd = "ngclean"
        self.clusterfilepath = "/home/simprod/icetray/iceprod/resources/clusters"
        self.WriteClusterFile("")
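
    # A rough usage sketch (assumption, not part of the original module): the
    # iceprod server is expected to drive this plugin by writing an xRSL job
    # description with WriteConfig(), submitting it with the ngsub command
    # configured above, and recording the returned id via get_id(), roughly:
    #
    #   queue = SweGrid()
    #   queue.WriteConfig(job, 'submit.xrsl')   # 'submit.xrsl' is an example name
    #   # shell: ngsub -C <clusterfile> -f submit.xrsl
    #   queue.get_id(ngsub_output)              # keeps the gsiftp:// job id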


    # NOTE: the name of the method below was lost in the original listing;
    # 'get_job_ids' is a reconstruction.
    def get_job_ids(self):
        """
        Get the cluster AND job id for the submitted jobs.
        @return: a list of jobs with their cluster and job id
                 in the condor format
                 None if no jobs have been submitted or if submission failed.
        """
        return ['%d.%d' % (self.cluster_id, job_id) \
                for job_id in range(self.jobs_submitted)]


    def WriteConfig(self, job, config_file):
        """
        Write submit file to a file.
        @param config_file: path to file where submit file will be written
        """
        if not job.GetExecutable():
            raise Exception("no executable configured")

        cluster = ''
        jobdesc = 'iceprod'
        count = 0
        memory = 0
        runtimeenvironment = []
        for key,opt in job.GetBatchOpts().items():
            if key == 'cluster':
                cluster = str(opt)
            elif key == 'jobdesc':
                jobdesc = str(opt)
            elif key == 'count':
                count = int(opt)
                if count > 8:
                    count = 8
            elif key == 'memory':
                memory = int(opt)
                if memory > 15000:
                    memory = 15000
            elif key == 'runtimeenvironment':
                runtimeenvironment = str(opt).split(',')

        if cluster == '':
            self.enqueue_cmd = 'ngsub -C %s -f ' % self.clusterfilepath
        else:
            self.enqueue_cmd = 'ngsub -c %s -f ' % cluster

        err = os.path.basename(job.GetErrorFile())
        out = os.path.basename(job.GetOutputFile())
        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = '"%s"' % ('" "'.join(argopts))

        submitfile = open(config_file, 'w')

        job.Write(submitfile, '&(executable="%s")' % os.path.basename(job.GetExecutable()))
        job.Write(submitfile, ' (arguments=%s)' % argstr, parse=False)

        job.Write(submitfile, ' (runTimeEnvironment="DATA/PHYSICS/ICECUBE/PHOTONTABLES-AHA07V2")')
        job.Write(submitfile, ' (runTimeEnvironment="DATA/PHYSICS/ICECUBE/PHOTORECTABLES-AHA07V2")')
        job.Write(submitfile, ' (runTimeEnvironment="DATA/PHYSICS/ICECUBE/PHOTORECTABLES-MU-AHA07V2H2-CSCD-AHAV1ICE")')

        job.Write(submitfile, ' (|(runTimeEnvironment="ENV/JAVA/SDK-1.5.0.6")')
        job.Write(submitfile, ' (runTimeEnvironment="ENV/JAVA/SDK-1.6.0.6")')
        job.Write(submitfile, ' (runTimeEnvironment="ENV/JAVA/SDK-1.6.0")')
        job.Write(submitfile, ' (runTimeEnvironment="ENV/JAVA/SDK-1.5.0")')
        job.Write(submitfile, ' (runTimeEnvironment="ENV/JAVA/SDK-1.6.0.4")')
        job.Write(submitfile, ' )')

        job.Write(submitfile, ' (runTimeEnvironment>="ENV/LOCALDISK")')

        # use 'rte' so the loop variable does not shadow the imported 're' module
        for rte in runtimeenvironment:
            job.Write(submitfile, ' (runTimeEnvironment="%s")' % rte)

        if count > 0:
            job.Write(submitfile, ' (count="%d")' % count)

        if memory > 0:
            job.Write(submitfile, ' (memory>="%d")' % memory)

        job.Write(submitfile, ' (inputfiles=("%s" "%s")' % (os.path.basename(job.GetExecutable()), job.GetExecutable()))
        for file in job.GetInputFiles():
            job.Write(submitfile, ' ("%s" "%s")' % (os.path.basename(file), os.path.join(job.GetInitialdir(), file)))
        job.Write(submitfile, ' )')

        job.Write(submitfile, ' (stdout="' + out + '")')
        job.Write(submitfile, ' (stderr="' + err + '")')
        job.Write(submitfile, ' (gmlog="log")')

        jobname = '%s_%d_%d' % (jobdesc, job.dataset_id, job.proc)

        job.Write(submitfile, ' (jobName="%s")' % jobname)
        job.Write(submitfile, ' (cpuTime="1200 minutes")')

        job.Write(submitfile, ' (environment=')
        for var in self.env.keys():
            if var not in ['PHOTON_TABLES_DIR', 'PHOTONTABLES', 'PYROOT']:
                job.Write(submitfile, ' (%s "%s")' % (var, self.env[var]), parse=False)

        job.Write(submitfile, ' )')

        for key in self.GetParamKeys():
            job.Write(submitfile, ' (%s=%s)' % (key, self.GetParam(key)))

        submitfile.close()
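
        # Illustration only (hedged example; the exact file depends on the job):
        # the xRSL written above looks roughly like
        #
        #   &(executable="iceprod.sh")        <- example values, not from this module
        #    (arguments="--dataset" "1234")
        #    (runTimeEnvironment="DATA/PHYSICS/ICECUBE/PHOTONTABLES-AHA07V2")
        #    (inputfiles=("iceprod.sh" "/path/to/iceprod.sh"))
        #    (stdout="job.out") (stderr="job.err") (gmlog="log")
        #    (jobName="iceprod_1234_0") (cpuTime="1200 minutes")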


    def WriteClusterFile(self, submit_status):
        clusterfile = open(self.clusterfilepath, 'w')

        grad = 'grad.uppmax.uu.se'
        svea = 'svea.c3se.chalmers.se'
        siri = 'siri.lunarc.lu.se'
        smokerings = 'arc-ce.smokerings.nsc.liu.se'
        ritsem = 'jeannedarc.hpc2n.umu.se'
        ruth = 'arc-ce01.pdc.kth.se'

        clusters = [grad, smokerings, ruth, ritsem]

        for cluster in clusters:
            if submit_status.find(cluster) == -1:
                clusterfile.write("%s\n" % cluster)

        clusterfile.close()


    def get_id(self, submit_status):
        """
        Parse string returned by ngsub on submission to extract the
        id of the job cluster

        @param submit_status: string returned by ngsub
        """

        logger.info(submit_status)
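
        # On success ngsub reports the new ARC job id as a gsiftp:// URL,
        # typically a line like (illustrative; wording varies with ARC version):
        #   Job submitted with jobid: gsiftp://grad.uppmax.uu.se:2811/jobs/...
        # so everything from 'gsiftp://' onward is kept as the job id below.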

        if submit_status.find('gsiftp://') != -1:
            job_id = submit_status[submit_status.find('gsiftp://'):].strip()
            self.job_ids.append(job_id)
            return job_id

        else:
            logger.warn('could not parse job id from "%s"' % submit_status)
            return -1


    def CheckQ(self, db=None):
        """
        Query status of cluster or job on queue
        """
        cmd = self.checkqueue_cmd + " " + " ".join(self.job_ids)
        logger.info(cmd)
        handle = os.popen(cmd, 'r')
        status = string.join(handle.readlines())
        handle.close()

        return status

    def CleanQ(self, jobs=None):
        """
        Query status of cluster on queue and clean jobs
        """
        return 0


    # (signature reconstructed from usage; only 'jobs' is used below)
    def CheckJobStatus(self, db, jobs):
        """
        Query status of job on swegrid queue
        """
        if isinstance(jobs, list):
            job_list = jobs
        else:
            job_list = [jobs]

        for job in job_list:
            job.SetStatus('?')

            job_id = job.GetJobId()
            if job_id < 0: return 0
            cmd = self.checkqueue_cmd + ' ' + job_id
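
            # ngstat prints one block per job; only the state line matters here,
            # e.g. (illustrative; format may vary with ARC version):
            #   Status: INLRMS:R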

            status = ''
            handle = os.popen(cmd, 'r')
            for line in handle.readlines():
                line = line.strip()
                if line.startswith('Status:'):
                    status = line[(line.find(':') + 1):].replace(' ', '')
            handle.close()

            msg = '%s: %s' % (job.GetJobId(), status)
            logger.info(msg)

            if status != '':

                # map NorduGrid/ARC job states onto iceprod states
                if status in ('ACCEPTING', 'ACCEPTED', 'SUBMITTING', 'SUBMITTED', 'PREPARING', 'PREPARED', 'QUEUEING', 'QUEUED', 'INLRMS:Q'):
                    job.SetStatus('QUEUED')
                elif status in ('INLRMS:R', 'INLRMS:E', 'INLRMS:O', 'EXECUTED', 'FINISHING'):
                    job.SetStatus('PROCESSING')
                elif status in ('FINISHED',):
                    job.SetStatus('FINISHED')
                elif status in ('FAILED', 'CANCELING', 'KILLING', 'KILLED'):
                    job.SetStatus('FAILED')
                else:
                    job.SetStatus('?')

        return 1


    def QRemove(self, jobid=None):
        """
        Remove cluster or job from queue
        """

        cmd = self.queue_rm_cmd
        if jobid: cmd += " %s" % jobid
        else: cmd += " " + " ".join(self.job_ids)

        logger.info(cmd)
        status = ''
        handle = os.popen(cmd, 'r')
        status = string.join(handle.readlines())
        handle.close()

        if status.find('Job has already finished') != -1:
            cmd = self.queue_clean_cmd
            if jobid: cmd += " %s" % jobid
            else: cmd += " " + " ".join(self.job_ids)

            logger.info(cmd)
            handle = os.popen(cmd, 'r')
            status = string.join(handle.readlines())
            handle.close()

        return status


    def Clean(self, jobdict):
        """
        remove job from queue
        """

        job_id = jobdict['grid_queue_id']
        logger.info('removing job: %s' % job_id)

        self.QRemove(job_id)
303