1
2
3
4 """
5 A basic wrapper for submitting and monitoring jobs to SweGrid.
6 Inherits from iGrid
7
8 copyright (c) 2005 the icecube collaboration
9
10 @version: $Revision: $
11 @date: $Date: $
12 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>,
13 Henrik Johansson <henjoh@physto.se>
14 """
15
16 import os
17 import re
18 import sys
19 import math
20 import dircache
21 import commands
22 import time
23 import string
24 import shutil
25 import ConfigParser
26 import logging
27 from iceprod.core import metadata
28 from iceprod.core.dataclasses import Steering
29 from iceprod.server.grid import iGrid
30 from iceprod.server.job import i3Job
31
# Module-level logger on the 'SweGrid' channel; the queue-handling code below
# logs the shell commands it runs and the job states it parses through it.
32 logger = logging.getLogger('SweGrid')
33
34
36 """
37 This class represents a job on swegrid.
38 """
39
41
# NOTE(review): the `def` header for this body is missing from this copy of
# the file. The explicit iGrid.__init__(self) call suggests this is the
# constructor of an iGrid subclass -- confirm against the original source.
42 iGrid.__init__(self)
# -1 until a real cluster id is assigned (nothing visible here assigns one).
43 self.cluster_id = -1
44 self.post = None
45 self.vo = "icecube"
46 self.proc = 0
# 60*5 -- presumably a polling interval in seconds (five minutes); TODO confirm unit.
47 self.sleeptime = 60*5
# Bare command names for submit / status / kill / clean -- presumably the
# NorduGrid ARC client tools. The submit-file writer further down overwrites
# enqueue_cmd with a full 'ngsub -C ... -f ' or 'ngsub -c ... -f ' string.
48 self.enqueue_cmd = "ngsub"
49 self.checkqueue_cmd = "ngstat"
50 self.queue_rm_cmd = "ngkill"
51 self.queue_clean_cmd = "ngclean"
# Hard-coded absolute path of the file that lists candidate clusters.
52 self.clusterfilepath = "/home/simprod/icetray/iceprod/resources/clusters"
# Side effect at construction time: (re)writes the cluster file. Assuming the
# body further down that writes self.clusterfilepath is WriteClusterFile, an
# empty argument excludes nothing, so every candidate cluster is written.
53 self.WriteClusterFile("")
54
55
# NOTE(review): the `def` header for this body is missing from this copy of
# the file; only the docstring and the return statement are visible.
57 """
58 Get the cluster AND job id for the submitted jobs.
59 @return: a list of jobs with their cluster and job id
60 in the condor format
61 None if no jobs have been submitted or if submission failed.
62 """
# Builds one "<cluster_id>.<n>" string for every n in range(self.jobs_submitted).
# NOTE(review): the docstring mentions returning None, but this expression
# always yields a list (an empty one when self.jobs_submitted is 0).
63 return ['%d.%d' % (self.cluster_id, job_id) \
64 for job_id in range(self.jobs_submitted)]
65
66
# NOTE(review): the `def` header for this body is missing from this copy of
# the file. The body reads two names it does not define itself, `job` and
# `config_file`, so they are presumably the parameters (besides self).
#
# Overview: collects batch options from the job, picks the submit command,
# then writes a parenthesised attribute list (presumably xRSL, the job
# description format accepted by ngsub -- TODO confirm) to `config_file`,
# one job.Write() call per line.
68 """
69 Write submit file to a file.
70 @param config_file: path to file were submit file will be written
71 """
# Refuse to describe a job that has no executable configured.
72 if not job.GetExecutable():
# NOTE(review): `raise Exception, "..."` is Python 2-only syntax.
73 raise Exception, "no executable configured"
74
# Defaults; each one can be overridden by the same-named key returned by
# job.GetBatchOpts().
75 cluster = ''
76 jobdesc = 'iceprod'
77 count = 0
78 memory = 2000
79 runtimeenvironment = []
80 icemodel = 'spice1'
81 outputfiles = []
82 storage = ''
83 localpath = ''
84 for key,opt in job.GetBatchOpts().items():
85 if key == 'cluster':
86 cluster = str(opt)
87 elif key == 'jobdesc':
88 jobdesc = str(opt)
89 elif key == 'count':
90 count = int(opt)
# Requested count is capped at 8.
91 if count > 8:
92 count = 8
93 elif key == 'memory':
94 memory = int(opt)
# Requested memory is capped at 15000 (unit not shown here; TODO confirm).
95 if memory > 15000:
96 memory = 15000
97 elif key == 'runtimeenvironment':
# Comma-separated list of extra runtime environments.
98 runtimeenvironment = str(opt).split(',')
99 elif key == 'icemodel':
100 icemodel = str(opt)
101 elif key == 'outputfiles':
# Comma-separated list of output file name patterns.
102 outputfiles = str(opt).split(',')
103 elif key == 'storage':
104 storage = str(opt)
105 elif key == 'localpath':
106 localpath = str(opt)
107
# Without an explicit cluster, submit with -C and the cluster list file;
# otherwise submit with -c and the named cluster.
# NOTE(review): this overwrites self.enqueue_cmd, i.e. instance state that
# persists after this call.
108 if cluster == '':
109 self.enqueue_cmd = 'ngsub -C %s -f ' % self.clusterfilepath
110 else:
111 self.enqueue_cmd = 'ngsub -c %s -f ' % cluster
112
113
# Only the base names of the stdout/stderr files go into the description.
114 err = os.path.basename(job.GetErrorFile())
115 out = os.path.basename(job.GetOutputFile())
# Grid-level options, then job options, then job arguments; every item is
# wrapped in double quotes and the items are separated by single spaces.
116 argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
117 argstr = '"%s"' % ('" "'.join(argopts))
118
# NOTE(review): the file is closed only by the close() call at the very end;
# an exception raised in between leaves the handle open (no try/finally).
119 submitfile = open(config_file, 'w')
120
# Some lines are written with parse=False; the meaning of that flag is
# defined by job.Write, which is not visible here.
121 job.Write(submitfile, '&(executable="%s")' % os.path.basename(job.GetExecutable()))
122 job.Write(submitfile, ' (arguments=%s)' % argstr, parse=False)
123
# Photon tables: 'aha07v2' selects the AHA07V2 tables; any other icemodel
# value (including the default 'spice1') selects SPICE1.
124 if icemodel == 'aha07v2':
125 job.Write(submitfile, ' (runTimeEnvironment="DATA/PHYSICS/ICECUBE/PHOTONTABLES-AHA07V2")')
126 else:
127 job.Write(submitfile, ' (runTimeEnvironment="DATA/PHYSICS/ICECUBE/PHOTONTABLES-SPICE1")')
128
# These two photorec table environments are always requested.
129 job.Write(submitfile, ' (runTimeEnvironment="DATA/PHYSICS/ICECUBE/PHOTORECTABLES-AHA07V2")')
130 job.Write(submitfile, ' (runTimeEnvironment="DATA/PHYSICS/ICECUBE/PHOTORECTABLES-MU-AHA07V2H2-CSCD-AHAV1ICE")')
131
# '(|' opens a group that is closed by the bare ')' below -- presumably an
# "any one of these" (OR) group of acceptable Java SDK environments.
132 job.Write(submitfile, ' (|(runTimeEnvironment="ENV/JAVA/SDK-1.5.0.6")')
133 job.Write(submitfile, ' (runTimeEnvironment="ENV/JAVA/SDK-1.6.0.6")')
134 job.Write(submitfile, ' (runTimeEnvironment="ENV/JAVA/SDK-1.6.0")')
135 job.Write(submitfile, ' (runTimeEnvironment="ENV/JAVA/SDK-1.5.0")')
136 job.Write(submitfile, ' (runTimeEnvironment="ENV/JAVA/SDK-1.6.0.4")')
137 job.Write(submitfile, ' )')
138
139 job.Write(submitfile, ' (runTimeEnvironment>="ENV/LOCALDISK")')
140
# Extra runtime environments requested through the batch options.
# NOTE(review): the loop variable `re` shadows the `re` module imported at
# the top of the file for the rest of this function.
141 for re in runtimeenvironment:
142 job.Write(submitfile, ' (runTimeEnvironment="%s")' % re)
143
# count defaults to 0, so this line is emitted only when the option is set
# to a positive value.
144 if count > 0:
145 job.Write(submitfile, ' (count="%d")' % count)
146
# memory defaults to 2000, so this line is emitted unless the option
# overrides it with a value <= 0.
147 if memory > 0:
148 job.Write(submitfile, ' (memory>="%d")' % memory)
149
# Input files as ("name" "source") pairs: the executable under its base name,
# then every job input file, with its source joined onto job.GetInitialdir().
150 job.Write(submitfile, ' (inputfiles=("%s" "%s")' % (os.path.basename(job.GetExecutable()), job.GetExecutable()))
151 for f in job.GetInputFiles():
152 job.Write(submitfile, ' ("%s" "%s")' % (os.path.basename(f), os.path.join(job.GetInitialdir(), f)))
153 job.Write(submitfile, ' )')
154
# Output staging is written only when both a file list and a storage
# location were supplied.
155 if outputfiles != [] and storage != '':
# NOTE(review): '(dataset)' is replaced with the unpadded id ('%d') in the
# storage path but with a zero-padded id ('%06d') in the file names below --
# confirm this difference is intended.
156 storage = storage.replace('(dataset)', '%d' % job.dataset_id)
157 job.Write(submitfile, ' (outputfiles=')
158 for f in outputfiles:
159 f = f.replace('(dataset)', '%06d' % job.dataset_id)
160 f = f.replace('(procnum)', '%06d' % job.proc)
# Local side: the expanded name, optionally under `localpath`.
161 flocal = f
162 if localpath != '':
163 flocal = os.path.join(localpath, flocal)
# Remote side: the expanded name joined onto the storage location.
164 fremote = os.path.join(storage, f)
165 job.Write(submitfile, ' ("%s" "%s")' % (flocal, fremote), parse=False)
166 job.Write(submitfile, ' )')
167
168 job.Write(submitfile, ' (stdout="' + out + '")')
169 job.Write(submitfile, ' (stderr="' + err + '")')
170 job.Write(submitfile, ' (gmlog="log")')
171
# Job name: "<jobdesc>_<dataset_id>_<proc>".
172 jobname = '%s_%d_%d' % (jobdesc, job.dataset_id, job.proc)
173
174 job.Write(submitfile, ' (jobName="%s")' % jobname)
# CPU-time and wall-time limits are hard-coded to 1200 minutes.
175 job.Write(submitfile, ' (cpuTime="1200 minutes")')
176 job.Write(submitfile, ' (wallTime="1200 minutes")')
177
178
# Forward every entry of self.env except the three names listed below.
179 job.Write(submitfile, ' (environment=')
180 for var in self.env.keys():
181 if var not in ['PHOTON_TABLES_DIR', 'PHOTONTABLES', 'PYROOT']:
182 job.Write(submitfile, ' (%s "%s")' % (var, self.env[var]), parse=False)
183
184
185 job.Write(submitfile, ' )')
186
187
# Any additional parameters held by this object are appended verbatim as
# (key=value) attributes.
188 for key in self.GetParamKeys():
189 job.Write(submitfile, ' (%s=%s)' % (key, self.GetParam(key)))
190
191 submitfile.close()
192
193
195
# NOTE(review): the `def` header for this body is missing from this copy of
# the file. The body reads `submit_status` without defining it, so that is
# presumably the parameter; it is probably the WriteClusterFile method that
# the constructor calls -- confirm against the original source.
#
# Rewrites the cluster list file from scratch ('w' truncates it).
# NOTE(review): the handle is closed only by the final close(); an exception
# raised while writing leaves it open.
196 clusterfile = open(self.clusterfilepath, 'w')
197
# Known cluster front-end host names.
198 grad = 'grad.uppmax.uu.se'
199 svea = 'svea.c3se.chalmers.se'
200 siri = 'siri.lunarc.lu.se'
201 smokerings = 'arc-ce.smokerings.nsc.liu.se'
202 ritsem = 'jeannedarc.hpc2n.umu.se'
203 ruth = 'arc-ce01.pdc.kth.se'
204
# Only these four are candidates; `svea` and `siri` are defined above but
# never used in this body.
205 clusters = [ruth, smokerings, grad, ritsem]
206
# Write one host per line, skipping any host whose name occurs anywhere in
# submit_status. With an empty submit_status nothing is skipped.
207 for cluster in clusters:
208 if submit_status.find(cluster) == -1:
209 clusterfile.write("%s\n" % cluster)
210
211 clusterfile.close()
212
213
214 - def get_id(self,submit_status):
232
233
# NOTE(review): the `def` header for this body is missing from this copy of
# the file, so its parameter list cannot be seen.
235 """
236 Querie status of cluster or job on queue
237 """
# Status command followed by every id in self.job_ids, space-separated.
238 cmd = self.checkqueue_cmd + " " + " ".join(self.job_ids)
239 logger.info(cmd)
# The command string is handed to os.popen as-is (i.e. run through a shell).
240 handle = os.popen(cmd, 'r')
# NOTE(review): string.join() with no separator argument joins on a single
# space, so a space is inserted between the (newline-terminated) output
# lines. string.join exists only in Python 2.
241 status = string.join(handle.readlines())
242 handle.close()
243
# Returns the command's collected standard output as one string.
244 return status
245
# NOTE(review): the `def` header for this body is missing from this copy of
# the file. The docstring describes querying and cleaning, but the visible
# body is a stub that does no work and always returns 0.
247 """
248 Querie status of cluster on queue and clean jobs
249 """
250 return 0
251
252
# NOTE(review): the `def` header for this body is missing from this copy of
# the file, and indentation has been lost, so the exact nesting of the
# statements below cannot be confirmed. The body reads `jobs` without
# defining it, so that is presumably the parameter.
254 """
255 Querie status of job on swegrid queue
256 """
# Accept either a single job or a list of jobs.
257 if isinstance(jobs,list):
258 job_list = jobs
259 else:
260 job_list = [jobs]
261
262 for job in job_list:
# Start from "unknown"; overwritten below only if a status line is parsed.
263 job.SetStatus('?')
264
265 job_id = job.GetJobId()
# NOTE(review): job_id is concatenated to a str on the next line, so it is
# presumably a string; in CPython 2 a str never compares less than an int,
# which would make this guard a no-op -- confirm what GetJobId() returns.
# Also, `return 0` leaves the whole method, so any remaining jobs in
# job_list would not be checked.
266 if job_id < 0: return 0
267 cmd = self.checkqueue_cmd + ' ' + job_id
268
269 status = ''
270 handle = os.popen(cmd, 'r')
# Scan the command output for 'Status:' lines; the text after the first ':'
# is kept with all spaces removed. If several lines match, the last one wins.
271 for line in handle.readlines():
272 line = line.strip()
273 if line.startswith('Status:'):
274 status = line[(line.find(':') + 1):].replace(' ', '')
275 handle.close()
276
277 msg='%s: %s' % (job.GetJobId(), status)
278 logger.info(msg)
279
# An empty status (no 'Status:' line found) leaves the job at '?'.
280 if status != '':
281
282
# Map the reported state onto QUEUED / PROCESSING / FINISHED / FAILED;
# anything unrecognised becomes '?'.
283 if status in ('ACCEPTING', 'ACCEPTED', 'SUBMITTING', 'SUBMITTED', 'PREPARING', 'PREPARED', 'QUEUEING', 'QUEUED', 'INLRMS:Q'):
284 job.SetStatus('QUEUED')
285 elif status in ('INLRMS:R', 'INLRMS:E', 'INLRMS:O', 'EXECUTED', 'FINISHING'):
286 job.SetStatus('PROCESSING')
# NOTE(review): ('FINISHED') is a plain str, not a one-element tuple (no
# trailing comma), so this is a substring test: any non-empty substring of
# 'FINISHED' (e.g. 'FINISH') also matches. ('FINISHED',) was probably meant.
287 elif status in ('FINISHED'):
288 job.SetStatus('FINISHED')
289 elif status in ('FAILED', 'CANCELING', 'KILLING', 'KILLED'):
290 job.SetStatus('FAILED')
291 else:
292 job.SetStatus('?')
293
294 return 1
295
296
# NOTE(review): the `def` header for this body is missing from this copy of
# the file. The body reads `job` without defining it, so that is presumably
# the parameter; a false value for it selects all of self.job_ids.
298 """
299 Remove cluster or job from queue
300 """
301
# Kill either the given job or every id in self.job_ids.
302 cmd = self.queue_rm_cmd
303 if job: cmd += " %s" % job.job_id
304 else: cmd += " " + " ".join(self.job_ids)
305
306 logger.info(cmd)
307 status = ''
308 handle = os.popen(cmd, 'r')
# string.join() with no separator joins the output lines on a single space.
309 status = string.join(handle.readlines())
310 handle.close()
311
# If the kill command reports that the job already finished, run the clean
# command for the same target instead.
312 if status.find('Job has already finished') != -1:
313 cmd = self.queue_clean_cmd
# NOTE(review): the kill branch above reads the attribute job.job_id, while
# this branch subscripts job["grid_queue_id"] -- confirm that the job type
# supports both and that they name the same id.
314 if job: cmd += " %s" % job["grid_queue_id"]
315 else: cmd += " " + " ".join(self.job_ids)
316
317 logger.info(cmd)
318 handle = os.popen(cmd, 'r')
319 status = string.join(handle.readlines())
320 handle.close()
321
# Returns the output of the last command that was run (kill, or clean when
# the clean branch was taken).
322 return status
323
324
325 - def Clean(self,jobdict,force=False):
335