1
2
3
4 """
5 A basic wrapper for submitting and monitoring jobs to Condor.
6 Inherits from i3Queue
7
8 copyright (c) 2005 the icecube collaboration
9
10 @version: $Revision: $
11 @date: $Date: $
12 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
13 """
14
15 import os
16 import re
17 import sys
18 import math
19 import dircache
20 import commands
21 import time
22 import string
23 import shutil
24 import ConfigParser
25 import logging
26 import iceprod
27 import iceprod.core.exe
28 from iceprod.core import metadata
29 from iceprod.core.dataclasses import Steering
30 from iceprod.server.grid import iGrid
31 from os.path import basename
32
# Module-wide logger; every message from this module goes to the 'eGee' channel.
logger = logging.getLogger('eGee')

# Lookup from the status codes '1' and '2' to status names.
egee_status = {
    '1': 'QUEUED',
    '2': 'PROCESSING',
}

# Grid status strings treated as "still waiting in the queue".
queued_state = [
    'READY', 'SCHEDULED', 'WAITING', 'SUBMITTED', 'ACCEPTED',
    'PREPARING', 'QUEUEING', 'QUEUED', 'INLRMS:Q',
]

# Grid status strings treated as "currently running".
running_state = ['RUNNING', 'EXECUTED', 'FINISHING']

# Grid status strings treated as failures.
error_state = ['FAILED', 'DONE(EXITCODE!=0)', 'ABORTED']

# Grid status strings treated as finished.
finished_state = ['DONE(SUCCESS)', 'CLEARED', 'CANCELLED']

# Grid status strings from which no job state can be determined.
undetermined_states = ['UNAVAILABLE']
68
70 return '\"%s\"' % str(x)
71
76
77
79 """
80 This class represents a job or cluster on an egee grid.
81 """
82
83
85 iGrid.__init__(self)
86 self.cluster_id = -1
87 self.post = None
88 self.ids = "ids"
89 self.enqueue_cmd = "glite-wms-job-submit"
90 self.checkqueue_cmd = "glite-wms-job-status"
91 self.queue_rm_cmd = "glite-wms-job-cancel"
92 self.get_output_cmd = "glite-wms-job-output"
93 self.suffix = "jdl"
94 self.grid_storage = "%(gridstorage)s/%(dataset_id)u/%(queue_id)u/%(filename)s"
95
96
98 from random import choice
99 weighted_rlist = []
100
101 for r in resource_list:
102 if len(r.split()) > 1:
103 resource,weight = r.split()
104 else:
105 resource,weight = r,1
106 try:
107 weight = int(weight)
108 except Exception,e:
109 logger.error("Exception: " + str(e))
110 logger.warn("Unable to get resource weight for: " +r)
111 weight = 1
112
113 logger.debug("%s:%u " % (resource,weight))
114 weighted_rlist.extend([resource]*weight)
115
116 return choice(weighted_rlist)
117
119 """
120 Write JDL to a file.
121 @param job: i3Job object
122 @param config_file: path to file were submit file will be written
123 """
124
125 if not job.GetExecutable():
126 raise Exception, "no executable configured"
127
128 submitfile = open(config_file,'w')
129 wrapper = open(config_file.replace('jdl','sh'),'w')
130 self.ids= config_file.replace('jdl','ids')
131
132 job.Write(wrapper,"#!/bin/sh")
133 job.Write(wrapper,'echo "running iceprod on $HOSTNAME";',parse=False)
134 job.Write(wrapper,"uname -a;")
135
136
137 job.Write(wrapper,"I3SCRATCH=/tmp",parse=False)
138 for var in self.env.keys():
139 job.Write(wrapper, "export %s=%s" % (var, self.env[var]),parse=False )
140 for var in job.env.keys():
141 job.Write(wrapper, "export %s=%s" % (var, job.env[var]),parse=False)
142
143 job.Write(wrapper,"INIT_DIR=$PWD",parse=False)
144 job.Write(wrapper,"RUNDIR=$I3SCRATCH/i3sim_%s_%s" % (job.GetProcNum(),time.time()),parse=False)
145 job.Write(wrapper,"mkdir -p $RUNDIR")
146 job.Write(wrapper,"echo \"running on $HOSTNAME:$RUNDIR\"",parse=False)
147 job.Write(wrapper,"cd $RUNDIR",parse=False)
148 job.Write(wrapper,"ln -s $INIT_DIR/* $RUNDIR/",parse=False)
149
150 err = basename(job.GetErrorFile())
151 out = basename(job.GetOutputFile())
152 argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
153 argstr = job.GetMainScript() + " " + " ".join(argopts)
154 inputfiles = []
155 inputfiles.append(wrapper.name)
156 inputfiles.append(job.GetExecutable())
157 inputfiles.extend(job.GetInputFiles())
158 inputfiles = map(qoute,inputfiles)
159
160 outputfiles = []
161 outputfiles.append('work.tgz')
162 outputfiles.append("icetray.%06u.log" % job.GetProcNum())
163 outputfiles.append('%s'%err)
164 outputfiles.append('%s'%out)
165 outputfiles.append(iceprod.core.exe._inventory)
166 outputfiles.extend(job.GetOutputFiles())
167 outputfiles = map(qoute,outputfiles)
168
169 pyhome = ''
170 if self.env.has_key('PYROOT'):
171 pyhome = self.env['PYROOT']
172 if job.env.has_key('PYROOT'):
173 pyhome = job.env['PYROOT']
174 if pyhome.startswith('http:'):
175 job.Write(wrapper,"wget --quiet %s" % pyhome,parse=False)
176 if pyhome.endswith('.tgz'):
177 job.env['PYROOT'] = '$PWD/python-2.3'
178 job.Write(wrapper,"tar xzf %s" % os.path.basename(pyhome),parse=False)
179
180
181 job.Write(submitfile,'Executable = "%s";' % basename(wrapper.name))
182 job.Write(submitfile,'StdOutput = "%s";' % out)
183 job.Write(submitfile,'StdError = "%s";' % err)
184 job.Write(submitfile,'InputSandbox = {%s};' % ','.join(inputfiles))
185 job.Write(submitfile,'OutputSandbox = {%s};' % ','.join(outputfiles))
186 job.Write(submitfile,'Arguments = "%s";' % argstr,parse=False)
187
188 job.Write(wrapper, 'cmd="$PYROOT/bin/python %s $*"' % basename(job.GetExecutable()),parse=False)
189 job.Write(wrapper, "echo $cmd",parse=False)
190 job.Write(wrapper, "$cmd",parse=False)
191 job.Write(wrapper, "retval=$?",parse=False)
192
193
194 for key in self.GetParamKeys():
195 if not job.batchopts.has_key(key):
196 job.AddBatchOpt(key,self.GetParam(key))
197
198
199 for key,opt in job.GetBatchOpts().items():
200 if key.startswith('-'): continue
201 job.Write(submitfile, "%s = %s;" % (key, opt))
202
203 submitfile.close();
204
205 read_from_inventory = False
206 if not read_from_inventory:
207
208 if not self.GetArgOpt('stageout'):
209 job.Write(wrapper,"# copying files to storage element")
210 job.Write(wrapper,"for file in *; do")
211 job.Write(wrapper," if [ -f $file -a ! -f %s/`basename $file` ]; then " % job.GetInitialdir())
212 job.Write(wrapper," lcg-cr --vo icecube \\",parse=False)
213 job.Write(wrapper," -d udo-dcache01.grid.uni-dortmund.de \\",parse=False)
214 job.Write(wrapper," -l lfn:/grid/icecube/iceprod/%s/%s/$file` \\" \
215 %(job.GetDatasetId(),job.GetProcNum()),parse=False)
216 job.Write(wrapper," file:$PWD/$file",parse=False)
217 job.Write(wrapper," fi; done")
218
219
220 else:
221
222
223
224
225
226
227 inventoryfile = os.path.join(submitdir,iceprod.core.exe._inventory)
228 copyok = 0
229 if not os.path.exists(inventoryfile):
230 logger.error("%d,%d: no output inventory found." % (dataset,proc))
231 return self.CopyStatusEnum['NOTREADY']
232 inventory = FileInventory()
233 inventorylist = []
234 logger.debug( 'reading %s' % inventoryfile )
235 inventory.Read(inventoryfile)
236
237
238 job.Write(wrapper, "export LFC_HOST=`lcg-infosites --vo icecube lfc`",parse=False)
239 job.Write(wrapper, "export LCG_GFAL_INFOSYS=lcg-bdii.ifh.de:2170",parse=False)
240
241
242
243 copy_cmds = []
244 for file in inventory.filelist:
245
246 cmd = 'lcg-cr --vo icecube '
247 cmd += '-l %s/%s ' % (file["target"], os.path.split(file["source"])[1])
248 cmd += '%s' % (file["source"])
249
250 copy_cmds.append("lfc-mkdir -p %s" % (file["target"][4:]))
251 copy_cmds.append(cmd)
252
253 cmd = "\n".join(copy_cmds)
254 job.Write(wrapper,"# copying files to storage element")
255 job.Write(wrapper,cmd,parse=False)
256
257
258 del_output = False
259 if del_output:
260 del_cmds = []
261 for file in inventory.filelist:
262 cmd = "/bin/rm %s" % (file["source"][5:])
263 del_cmds.append(cmd)
264 cmd = "\n".join(del_cmds)
265
266 job.Write(wrapper,"# deleting copied files")
267 job.Write(wrapper,cmd,parse=False)
268
269 job.Write(wrapper,"cd $INIT_DIR",parse=False)
270 job.Write(wrapper,"rm -rf $RUNDIR",parse=False)
271 job.Write(wrapper,"exit $retval",parse=False)
272
273 wrapper.close();
274
def Submit(self,job,config_file):
    """
    Submit job/cluster to grid

    Writes the submit file through self.WriteConfig, runs the submit
    command and records the job id found in the ids file.

    @param job: i3Job object
    @param config_file: path to file where the submit file will be written
    @return: tuple (status, submit_status); status is 0 when a job id was
             obtained and 1 otherwise, submit_status is the text printed
             by the submit command (plus any exception message)
    """
    self.submit_status = ''
    self.WriteConfig(job,config_file)

    # Batch options whose key starts with '-' are command-line arguments
    # for the submit command rather than entries of the submit file.
    submit_args = []
    resource_list = []
    for key,opt in job.GetBatchOpts().items():
        if key.startswith('-resource'):
            # A resource may be given with or without the leading '-r'.
            if opt.startswith('-r'):
                resource = opt[2:].strip()
            else:
                resource = opt
            resource_list.append(resource)
        elif key.startswith('-'):
            submit_args.append(opt)

    if resource_list:
        # Only one of the configured resources is passed to the command.
        submit_args.append('-r %s' % self._choose_resource(resource_list))

    cmd = "%s %s -o %s %s" % (self.enqueue_cmd," ".join(submit_args),self.ids,config_file)
    logger.debug(cmd)
    status, self.submit_status = commands.getstatusoutput(cmd)
    if status != 0:
        logger.error("Failed to execute command")
        logger.error(cmd)

    # The final status is decided by whether a job id can be read back.
    try:
        job_id = self.get_id(self.submit_status)
        job.SetJobId(job_id)
        if job_id in (-1,None):
            status = 1
        else:
            status = 0
    except Exception:
        # sys.exc_info() keeps this handler valid for both the old
        # "except X, e" and the newer "except X as e" Python syntaxes.
        e = sys.exc_info()[1]
        logger.error("Exception: " + str(e))
        self.submit_status += "\nException: " + str(e)
        status = 1

    # cluster_id starts out as -1, which is truthy: a plain
    # "not self.cluster_id" test would never record the first job id.
    if self.job_ids and (not self.cluster_id or self.cluster_id == -1):
        self.cluster_id = self.job_ids[0]

    return status,self.submit_status
323
324
def get_id(self,submit_status):
    """
    Read the id of the submitted job from the ids file (self.ids).

    The first line of that file starting with 'http' is taken as the
    job id and appended to self.job_ids.

    @param submit_status: string returned by the submit command; it is
                          only logged, the id itself comes from the file
    @return: the job id string, or None when no such line was found
    """
    logger.debug(submit_status)
    job_id = None
    idfile = open(self.ids,'r')
    try:
        for line in idfile:
            line = line.strip()
            if line.startswith('http'):
                job_id = line
                self.job_ids.append(job_id)
                break
    finally:
        # Release the file handle even if reading fails.
        idfile.close()

    if not job_id:
        logger.warning('could not parse job id from "%s"' % self.ids)
    return job_id
344
345
347 """
348 Querie status of cluster or job on queue
349 """
350
351 cmd = self.checkqueue_cmd
352 for id in self.job_ids:
353 cmd += " %s" % id
354 logger.debug(cmd)
355 pout = os.popen(cmd)
356 status = string.join(pout.readlines())
357 pout.close()
358 return status
359
361 """
362 Check consistency of queue and remove jobs which shouldn't be there
363 """
364
365 return 0
366
368 """
369 Querie status of job on glite queue
370 """
371 if isinstance(jobs,list):
372 job_list = jobs
373 else:
374 job_list = [jobs]
375 for job in job_list:
376 dataset_id = job.GetDatasetId()
377 queue_id = job.GetProcNum()
378 job_id = job.GetJobId()
379 status = "?"
380
381 if not job_id:
382 job.SetStatus('?')
383
384 cmd = self.checkqueue_cmd + ' ' + job_id
385 handle = os.popen(cmd, 'r')
386
387 for line in handle.readlines():
388 line = line.strip()
389 if line.startswith('Current Status:'):
390 status = line[(line.find(':') + 1):].replace(' ','').split()[0]
391 handle.close()
392 logger.info("%s.%s (%s) : %s" %(dataset_id,queue_id,job_id,status.upper()))
393
394 if status.upper() in queued_state:
395 job.SetStatus('QUEUED')
396 elif status.upper() in running_state:
397 job.SetStatus('PROCESSING')
398 elif status.upper() in error_state:
399 job.SetStatus('FAILED')
400 elif status.upper() in finished_state:
401 job.SetStatus('FINISHED')
402 else:
403 job.SetStatus('?')
404
405 return 1
406
407
409 """
410 Remove cluster or job from glite queue
411 """
412
413 cmd = self.queue_rm_cmd + " --noint "
414 if jobid: cmd += " %s" % jobid
415 else:
416 for id in self.job_ids: cmd += " %s" % id
417
418 logger.debug(cmd)
419 handle = os.popen(cmd, 'r')
420 status = string.join(handle.readlines())
421 handle.close()
422
423 if status.find('Job has already finished') != -1:
424 cmd = self.queue_rm_cmd + " --noint "
425 if jobid: cmd += " %s" % jobid
426 else:
427 for id in self.job_ids: cmd += " %s" % id
428
429 logger.debug(cmd)
430 handle = os.popen(cmd, 'r')
431 status = string.join(handle.readlines())
432 handle.close()
433
434 return status
435
436
def PostCopy(self,jobdict,target_url,maxtries=4):
    """
    Fetch the output of a job and copy the files listed in its output
    inventory from grid storage into the submit directory.

    @param jobdict: dictionary describing the job; the keys
                    'grid_queue_id', 'dataset_id', 'queue_id' and
                    'submitdir' are read, 'filename' is written and
                    'grid_queue_id' is reset to None once the output
                    has been retrieved
    @param target_url: not used by this implementation
    @param maxtries: not used by this implementation
    @return: True when the job has no grid id, CopyStatusEnum['NOTREADY']
             when no inventory file exists, CopyStatusEnum['OK'] otherwise
    """
    grid_id = jobdict['grid_queue_id']
    if not grid_id:
        return True

    dataset = jobdict['dataset_id']
    proc = jobdict['queue_id']
    submitdir = jobdict['submitdir']

    fetch_cmd = '%s --noint --dir %s %s' % (self.get_output_cmd,submitdir,grid_id)
    logger.debug(fetch_cmd)
    if os.system(fetch_cmd) == 0:
        # Output retrieved: the job no longer has an id on the grid.
        jobdict['grid_queue_id'] = None

    inventory_path = os.path.join(submitdir,iceprod.core.exe._inventory)
    if not os.path.exists(inventory_path):
        logger.error("%d,%d: no output inventory found." % (dataset,proc))
        return self.CopyStatusEnum['NOTREADY']

    # NOTE(review): FileInventory is not among the imports visible in this
    # file - confirm where it comes from.
    inventory = FileInventory()
    logger.debug( 'reading %s' % inventory_path )
    inventory.Read(inventory_path)

    # Both templates are filled in from jobdict for every inventory entry.
    storage_url = self.grid_storage
    copy_template = 'lcg-cp --vo icecube ' + (' -v %s ' % storage_url) \
                    + ' file:/%(submitdir)s/%(filename)s'
    delete_template = 'lcg-del --vo icecube ' + (' -a %s ' % storage_url)

    for entry in inventory.filelist:
        jobdict['filename'] = os.path.basename(entry['source'])
        # NOTE(review): the delete runs when the copy command returns a
        # non-zero exit code - confirm this is the intended condition.
        if os.system(copy_template % jobdict) != 0:
            os.system(delete_template % jobdict)

    return self.CopyStatusEnum['OK']
479
480
def Clean(self,jobdict):
    """
    Remove the job from the grid queue, retrieve its output into the
    submit directory and run the cleanup of the base class.

    @param jobdict: dictionary describing the job; the keys
                    'grid_queue_id' and 'submitdir' are read
    @return: True when the job has no grid id (nothing is done in that
             case, including the base-class cleanup), None otherwise
    """
    job_id = jobdict['grid_queue_id']
    if not job_id:
        return True

    submitdir = jobdict['submitdir']

    logger.info('removing job: %s' % job_id)
    self.QRemove(job_id)

    cmd = '%s --noint --dir %s %s' % (self.get_output_cmd,submitdir,job_id)
    logger.debug(cmd)
    os.system(cmd)

    iGrid.Clean(self,jobdict)
501
502
504 """
505 This class represents a job or cluster on an egee grid.
506 """
507
509 gLite.__init__(self)
510 self.enqueue_cmd = "edg-job-submit"
511 self.checkqueue_cmd = "edg-job-status"
512 self.get_output_cmd = "edj-job-get-output"
513 self.queue_rm_cmd = "edg-job-cancel"
514
515
def PostCopy(self,jobdict,target_url,maxtries=4):
    """
    Retrieve the output of a job into the 'output' subdirectory of its
    submit directory.

    @param jobdict: dictionary describing the job; the keys
                    'grid_queue_id' and 'submitdir' are read
    @param target_url: not used by this implementation
    @param maxtries: not used by this implementation
    @return: always True
    """
    job_id = jobdict['grid_queue_id']
    if not job_id:
        # Without a grid id there is nothing to fetch; running the
        # command would only pass the literal text "None" to it.
        return True

    cmd = '%s -dir %s/output %s' % (self.get_output_cmd,jobdict['submitdir'],job_id)
    logger.debug(cmd)
    # The exit code of the command is not examined.
    os.system(cmd)
    return True
530
def Clean(self,jobdict):
    """
    Remove the job from the grid queue, retrieve its output into the
    submit directory and run the cleanup of the base class.

    @param jobdict: dictionary describing the job; the keys
                    'grid_queue_id' and 'submitdir' are read
    """
    job_id = jobdict['grid_queue_id']
    submitdir = jobdict['submitdir']

    if job_id:
        logger.info('removing job: %s' % job_id)
        self.QRemove(job_id)

        # Only meaningful for a job that actually has a grid id.
        cmd = '%s -dir %s %s' % (self.get_output_cmd,submitdir,job_id)
        logger.debug(cmd)
        os.system(cmd)

    # The base-class cleanup runs whether or not the job has a grid id.
    iGrid.Clean(self,jobdict)
548