Package iceprod :: Package server :: Package plugins :: Module swegrid
[hide private]
[frames] | [no frames]

Source Code for Module iceprod.server.plugins.swegrid

  1  #!/bin/env python 
  2  # 
  3   
  4  """ 
  5   A basic wrapper for submitting and monitoring jobs to SweGrid.  
  6   Inherits from i3Queue 
  7   
  8   copyright  (c) 2005 the icecube collaboration 
  9   
 10   @version: $Revision: $ 
 11   @date: $Date: $ 
 12   @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>, 
 13   Henrik Johansson <henjoh@physto.se> 
 14  """ 
 15   
 16  import os 
 17  import re 
 18  import sys 
 19  import math 
 20  import dircache 
 21  import commands 
 22  import time 
 23  import string 
 24  import shutil 
 25  import ConfigParser 
 26  import logging 
 27  from iceprod.core import metadata 
 28  from iceprod.core.dataclasses import Steering 
 29  from iceprod.server.grid import iGrid 
 30  from iceprod.server.job import i3Job 
 31   
 32  logger = logging.getLogger('SweGrid') 
 33   
 34   
class SweGrid(iGrid):
    """
    This class represents a job on SweGrid (NorduGrid/ARC middleware).

    Submission and monitoring are delegated to the ARC command-line
    clients (ngsub/ngstat/ngkill/ngclean) via os.popen.
    """

    def __init__(self):
        iGrid.__init__(self)
        self.cluster_id = -1
        self.post = None
        self.vo = "icecube"
        self.proc = 0
        self.sleeptime = 60 * 5                 # polling interval in seconds
        self.enqueue_cmd = "ngsub"
        self.checkqueue_cmd = "ngstat"
        self.queue_rm_cmd = "ngkill"
        self.queue_clean_cmd = "ngclean"
        self.clusterfilepath = "/home/simprod/icetray/iceprod/resources/clusters"
        # Seed the cluster file with the full default cluster list.
        self.WriteClusterFile("")

    def GetJobIds(self):
        """
        Get the cluster AND job id for the submitted jobs.

        @return: a list of jobs with their cluster and job id
                 in the condor format.
                 Empty if no jobs have been submitted.
        """
        return ['%d.%d' % (self.cluster_id, job_id)
                for job_id in range(self.jobs_submitted)]

    def WriteConfig(self, job, config_file):
        """
        Write an xRSL submit description for a job to a file.

        @param job: i3Job instance to be submitted
        @param config_file: path to file where submit file will be written
        @raise Exception: if the job has no executable configured
        """
        if not job.GetExecutable():
            raise Exception("no executable configured")

        # Defaults, overridable through batch options.
        cluster = ''
        jobdesc = 'iceprod'
        count = 0
        memory = 2000
        runtimeenvironment = []
        icemodel = 'spice1'
        outputfiles = []
        storage = ''
        localpath = ''
        for key, opt in job.GetBatchOpts().items():
            if key == 'cluster':
                cluster = str(opt)
            elif key == 'jobdesc':
                jobdesc = str(opt)
            elif key == 'count':
                count = min(int(opt), 8)        # cap requested slots at 8
            elif key == 'memory':
                memory = min(int(opt), 15000)   # cap memory request (MB) at 15000
            elif key == 'runtimeenvironment':
                runtimeenvironment = str(opt).split(',')
            elif key == 'icemodel':
                icemodel = str(opt)
            elif key == 'outputfiles':
                outputfiles = str(opt).split(',')
            elif key == 'storage':
                storage = str(opt)
            elif key == 'localpath':
                localpath = str(opt)

        # No explicit cluster: let ngsub pick from the cluster file (-C);
        # otherwise pin submission to the requested cluster (-c).
        if cluster == '':
            self.enqueue_cmd = 'ngsub -C %s -f ' % self.clusterfilepath
        else:
            self.enqueue_cmd = 'ngsub -c %s -f ' % cluster

        err = os.path.basename(job.GetErrorFile())
        out = os.path.basename(job.GetOutputFile())
        argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments()
        argstr = '"%s"' % ('" "'.join(argopts))

        submitfile = open(config_file, 'w')
        try:
            job.Write(submitfile, '&(executable="%s")' % os.path.basename(job.GetExecutable()))
            job.Write(submitfile, ' (arguments=%s)' % argstr, parse=False)

            # Photon tables runtime environment depends on the ice model.
            if icemodel == 'aha07v2':
                job.Write(submitfile, ' (runTimeEnvironment="DATA/PHYSICS/ICECUBE/PHOTONTABLES-AHA07V2")')
            else:
                job.Write(submitfile, ' (runTimeEnvironment="DATA/PHYSICS/ICECUBE/PHOTONTABLES-SPICE1")')

            job.Write(submitfile, ' (runTimeEnvironment="DATA/PHYSICS/ICECUBE/PHOTORECTABLES-AHA07V2")')
            job.Write(submitfile, ' (runTimeEnvironment="DATA/PHYSICS/ICECUBE/PHOTORECTABLES-MU-AHA07V2H2-CSCD-AHAV1ICE")')

            # Accept any of these Java SDK versions (xRSL disjunction).
            job.Write(submitfile, ' (|(runTimeEnvironment="ENV/JAVA/SDK-1.5.0.6")')
            job.Write(submitfile, ' (runTimeEnvironment="ENV/JAVA/SDK-1.6.0.6")')
            job.Write(submitfile, ' (runTimeEnvironment="ENV/JAVA/SDK-1.6.0")')
            job.Write(submitfile, ' (runTimeEnvironment="ENV/JAVA/SDK-1.5.0")')
            job.Write(submitfile, ' (runTimeEnvironment="ENV/JAVA/SDK-1.6.0.4")')
            job.Write(submitfile, ' )')

            job.Write(submitfile, ' (runTimeEnvironment>="ENV/LOCALDISK")')

            # NOTE: loop variable renamed from 're' — it shadowed the re module.
            for rte in runtimeenvironment:
                job.Write(submitfile, ' (runTimeEnvironment="%s")' % rte)

            if count > 0:
                job.Write(submitfile, ' (count="%d")' % count)

            if memory > 0:
                job.Write(submitfile, ' (memory>="%d")' % memory)

            job.Write(submitfile, ' (inputfiles=("%s" "%s")' % (os.path.basename(job.GetExecutable()), job.GetExecutable()))
            for f in job.GetInputFiles():
                job.Write(submitfile, ' ("%s" "%s")' % (os.path.basename(f), os.path.join(job.GetInitialdir(), f)))
            job.Write(submitfile, ' )')

            if outputfiles != [] and storage != '':
                # Expand (dataset)/(procnum) placeholders in storage and file names.
                storage = storage.replace('(dataset)', '%d' % job.dataset_id)
                job.Write(submitfile, ' (outputfiles=')
                for f in outputfiles:
                    f = f.replace('(dataset)', '%06d' % job.dataset_id)
                    f = f.replace('(procnum)', '%06d' % job.proc)
                    flocal = f
                    if localpath != '':
                        flocal = os.path.join(localpath, flocal)
                    fremote = os.path.join(storage, f)
                    job.Write(submitfile, ' ("%s" "%s")' % (flocal, fremote), parse=False)
                job.Write(submitfile, ' )')

            job.Write(submitfile, ' (stdout="' + out + '")')
            job.Write(submitfile, ' (stderr="' + err + '")')
            job.Write(submitfile, ' (gmlog="log")')

            jobname = '%s_%d_%d' % (jobdesc, job.dataset_id, job.proc)

            job.Write(submitfile, ' (jobName="%s")' % jobname)
            job.Write(submitfile, ' (cpuTime="1200 minutes")')
            job.Write(submitfile, ' (wallTime="1200 minutes")')

            # Add environment variables (a few are provided by the runtime
            # environments above and must not be forwarded).
            job.Write(submitfile, ' (environment=')
            for var in self.env.keys():
                if var not in ['PHOTON_TABLES_DIR', 'PHOTONTABLES', 'PYROOT']:
                    job.Write(submitfile, ' (%s "%s")' % (var, self.env[var]), parse=False)
            job.Write(submitfile, ' )')

            # Add general batch options
            for key in self.GetParamKeys():
                job.Write(submitfile, ' (%s=%s)' % (key, self.GetParam(key)))
        finally:
            submitfile.close()

    def WriteClusterFile(self, submit_status):
        """
        Write the list of candidate clusters to self.clusterfilepath,
        omitting any cluster already mentioned in submit_status.

        @param submit_status: output of a previous submission attempt
        """
        grad = 'grad.uppmax.uu.se'
        svea = 'svea.c3se.chalmers.se'   # small output files
        siri = 'siri.lunarc.lu.se'       # small output files
        smokerings = 'arc-ce.smokerings.nsc.liu.se'
        ritsem = 'jeannedarc.hpc2n.umu.se'
        ruth = 'arc-ce01.pdc.kth.se'

        clusters = [ruth, smokerings, grad, ritsem]

        clusterfile = open(self.clusterfilepath, 'w')
        try:
            for cluster in clusters:
                if submit_status.find(cluster) == -1:
                    clusterfile.write("%s\n" % cluster)
        finally:
            clusterfile.close()

    def get_id(self, submit_status):
        """
        Parse string returned by ngsub on submission to extract the
        id of the job.

        @param submit_status: string returned by the submit command
        @return: the gsiftp job id, or -1 if it could not be parsed
        """
        logger.info(submit_status)
        if submit_status.find('gsiftp://') != -1:
            job_id = submit_status[submit_status.find('gsiftp://'):].strip()
            self.job_ids.append(job_id)
            return job_id
        else:
            logger.warn('could not parse job id from "%s"' % submit_status)
            return -1

    def CheckQ(self, db=None):
        """
        Query status of all submitted jobs on the queue.

        @return: combined ngstat output as a single string
        """
        cmd = self.checkqueue_cmd + " " + " ".join(self.job_ids)
        logger.info(cmd)
        handle = os.popen(cmd, 'r')
        try:
            status = ' '.join(handle.readlines())
        finally:
            handle.close()
        return status

    def CleanQ(self, jobs=None):
        """
        Query status of cluster on queue and clean jobs (no-op on SweGrid).
        """
        return 0

    def CheckJobStatus(self, jobs):
        """
        Query status of job(s) on the SweGrid queue and update each job's
        status to one of QUEUED / PROCESSING / FINISHED / FAILED / '?'.

        @param jobs: a single job or a list of jobs
        @return: 1 on success, 0 if a job has no valid id
        """
        if isinstance(jobs, list):
            job_list = jobs
        else:
            job_list = [jobs]

        for job in job_list:
            job.SetStatus('?')

            job_id = job.GetJobId()
            # A failed submission stores -1; a valid id is a gsiftp URL string.
            if job_id < 0:
                return 0
            cmd = self.checkqueue_cmd + ' ' + job_id

            status = ''
            handle = os.popen(cmd, 'r')
            try:
                for line in handle.readlines():
                    line = line.strip()
                    if line.startswith('Status:'):
                        status = line[(line.find(':') + 1):].replace(' ', '')
            finally:
                handle.close()

            msg = '%s: %s' % (job.GetJobId(), status)
            logger.info(msg)

            if status != '':
                # For status states see http://www.nordugrid.org/documents/ui.pdf
                if status in ('ACCEPTING', 'ACCEPTED', 'SUBMITTING', 'SUBMITTED', 'PREPARING', 'PREPARED', 'QUEUEING', 'QUEUED', 'INLRMS:Q'):
                    job.SetStatus('QUEUED')
                elif status in ('INLRMS:R', 'INLRMS:E', 'INLRMS:O', 'EXECUTED', 'FINISHING'):
                    job.SetStatus('PROCESSING')
                # BUG FIX: was "status in ('FINISHED')" — a substring test
                # against the string 'FINISHED' that also matched e.g. 'FIN'.
                elif status == 'FINISHED':
                    job.SetStatus('FINISHED')
                elif status in ('FAILED', 'CANCELING', 'KILLING', 'KILLED'):
                    job.SetStatus('FAILED')
                else:
                    job.SetStatus('?')

        return 1

    def QRemove(self, job=None):
        """
        Remove a job (or all tracked jobs) from the queue.

        @param job: optional i3Job; if omitted, all of self.job_ids are removed
        @return: output of the kill (or subsequent clean) command
        """
        cmd = self.queue_rm_cmd
        if job:
            cmd += " %s" % job.job_id
        else:
            cmd += " " + " ".join(self.job_ids)

        logger.info(cmd)
        handle = os.popen(cmd, 'r')
        try:
            status = ' '.join(handle.readlines())
        finally:
            handle.close()

        if status.find('Job has already finished') != -1:
            # Finished jobs cannot be killed; clean up their session dirs instead.
            cmd = self.queue_clean_cmd
            # BUG FIX: was job["grid_queue_id"] — i3Job is not subscriptable;
            # use the same attribute the kill branch uses.
            if job:
                cmd += " %s" % job.job_id
            else:
                cmd += " " + " ".join(self.job_ids)

            logger.info(cmd)
            handle = os.popen(cmd, 'r')
            try:
                status = ' '.join(handle.readlines())
            finally:
                handle.close()

        return status

    def Clean(self, jobdict, force=False):
        """
        Remove the job described by jobdict from the queue.

        @param jobdict: mapping with a 'grid_queue_id' entry
        @param force: unused, kept for interface compatibility
        """
        job = i3Job()
        job.job_id = jobdict['grid_queue_id']
        logger.info('removing job: %s' % job.job_id)

        self.QRemove(job)