Package iceprod :: Package server :: Package plugins :: Module swegrid
[hide private]
[frames] | no frames]

Source Code for Module iceprod.server.plugins.swegrid

  1  #!/bin/env python 
  2  # 
  3   
  4  """ 
  5   A basic wrapper for submitting and monitoring jobs to SweGrid.  
  6   Inherits from i3Queue 
  7   
  8   copyright  (c) 2005 the icecube collaboration 
  9   
 10   @version: $Revision: $ 
 11   @date: $Date: $ 
 12   @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>, 
 13   Henrik Johansson <henjoh@physto.se> 
 14  """ 
 15   
 16  import os 
 17  import re 
 18  import sys 
 19  import math 
 20  import dircache 
 21  import commands 
 22  import time 
 23  import string 
 24  import shutil 
 25  import ConfigParser 
 26  import logging 
 27  from iceprod.core import metadata 
 28  from iceprod.core.dataclasses import Steering 
 29  from iceprod.server.grid import iGrid 
 30   
 31  logger = logging.getLogger('SweGrid') 
 32   
 33   
34 -class SweGrid(iGrid):
35 """ 36 This class represents a job on swegrid. 37 """ 38
39 - def __init__(self):
40 41 iGrid.__init__(self) 42 self.cluster_id = -1 43 self.post = None 44 self.vo = "icecube" 45 self.proc = 0 46 self.sleeptime = 60*5 47 self.enqueue_cmd = "ngsub" 48 self.checkqueue_cmd = "ngstat" 49 self.queue_rm_cmd = "ngkill" 50 self.queue_clean_cmd = "ngclean" 51 self.clusterfilepath = "/home/simprod/icetray/iceprod/resources/clusters" 52 self.WriteClusterFile("")
53 54
55 - def GetJobIds(self):
56 """ 57 Get the cluster AND job id for the submitted jobs. 58 @return: a list of jobs with their cluster and job id 59 in the condor format 60 None if no jobs have been submitted or if submission failed. 61 """ 62 return ['%d.%d' % (self.cluster_id, job_id) \ 63 for job_id in range(self.jobs_submitted)]
64 65
66 - def WriteConfig(self,job,config_file):
67 """ 68 Write submit file to a file. 69 @param config_file: path to file were submit file will be written 70 """ 71 if not job.GetExecutable(): 72 raise Exception, "no executable configured" 73 74 cluster = '' 75 jobdesc = 'iceprod' 76 count = 0 77 memory = 0 78 runtimeenvironment = [] 79 for key,opt in job.GetBatchOpts().items(): 80 if key == 'cluster': 81 cluster = str(opt) 82 elif key == 'jobdesc': 83 jobdesc = str(opt) 84 elif key == 'count': 85 count = int(opt) 86 if count > 8: 87 count = 8 88 elif key == 'memory': 89 memory = int(opt) 90 if memory > 15000: 91 memory = 15000 92 elif key == 'runtimeenvironment': 93 runtimeenvironment = str(opt).split(',') 94 95 if cluster == '': 96 self.enqueue_cmd = 'ngsub -C %s -f ' % self.clusterfilepath 97 else: 98 self.enqueue_cmd = 'ngsub -c %s -f ' % cluster 99 100 101 err = os.path.basename(job.GetErrorFile()) 102 out = os.path.basename(job.GetOutputFile()) 103 argopts = self.GetArgOptions() + job.GetArgOptions() + job.GetArguments() 104 argstr = '"%s"' % ('" "'.join(argopts)) 105 106 submitfile = open(config_file, 'w') 107 108 job.Write(submitfile, '&(executable="%s")' % os.path.basename(job.GetExecutable())) 109 job.Write(submitfile, ' (arguments=%s)' % argstr, parse=False) 110 111 job.Write(submitfile, ' (runTimeEnvironment="DATA/PHYSICS/ICECUBE/PHOTONTABLES-AHA07V2")') 112 job.Write(submitfile, ' (runTimeEnvironment="DATA/PHYSICS/ICECUBE/PHOTORECTABLES-AHA07V2")') 113 job.Write(submitfile, ' (runTimeEnvironment="DATA/PHYSICS/ICECUBE/PHOTORECTABLES-MU-AHA07V2H2-CSCD-AHAV1ICE")') 114 115 job.Write(submitfile, ' (|(runTimeEnvironment="ENV/JAVA/SDK-1.5.0.6")') 116 job.Write(submitfile, ' (runTimeEnvironment="ENV/JAVA/SDK-1.6.0.6")') 117 job.Write(submitfile, ' (runTimeEnvironment="ENV/JAVA/SDK-1.6.0")') 118 job.Write(submitfile, ' (runTimeEnvironment="ENV/JAVA/SDK-1.5.0")') 119 job.Write(submitfile, ' (runTimeEnvironment="ENV/JAVA/SDK-1.6.0.4")') 120 job.Write(submitfile, ' )') 121 122 job.Write(submitfile, ' (runTimeEnvironment>="ENV/LOCALDISK")') 123 124 for re in runtimeenvironment: 125 job.Write(submitfile, ' (runTimeEnvironment="%s")' % re) 126 127 if count > 0: 128 job.Write(submitfile, ' (count="%d")' % count) 129 130 if memory > 0: 131 job.Write(submitfile, ' (memory>="%d")' % memory) 132 133 job.Write(submitfile, ' (inputfiles=("%s" "%s")' % (os.path.basename(job.GetExecutable()), job.GetExecutable())) 134 for file in job.GetInputFiles(): 135 job.Write(submitfile, ' ("%s" "%s")' % (os.path.basename(file), os.path.join(job.GetInitialdir(), file))) 136 job.Write(submitfile, ' )') 137 138 job.Write(submitfile, ' (stdout="' + out + '")') 139 job.Write(submitfile, ' (stderr="' + err + '")') 140 job.Write(submitfile, ' (gmlog="log")') 141 142 jobname = '%s_%d_%d' % (jobdesc, job.dataset_id, job.proc) 143 144 job.Write(submitfile, ' (jobName="%s")' % jobname) 145 job.Write(submitfile, ' (cpuTime="1200 minutes")') 146 147 # Add environment variables 148 job.Write(submitfile, ' (environment=') 149 for var in self.env.keys(): 150 if var not in ['PHOTON_TABLES_DIR', 'PHOTONTABLES', 'PYROOT']: 151 job.Write(submitfile, ' (%s "%s")' % (var, self.env[var]), parse=False) 152 #for var in job.env.keys(): 153 # job.Write(submitfile, ' (%s %s)' % (var, job.env[var]) ) 154 job.Write(submitfile, ' )') 155 156 # Add general batch options 157 for key in self.GetParamKeys(): 158 job.Write(submitfile, ' (%s=%s)' % (key, self.GetParam(key))) 159 160 submitfile.close()
161 162
163 - def WriteClusterFile(self,submit_status):
164 165 clusterfile = open(self.clusterfilepath, 'w') 166 167 grad = 'grad.uppmax.uu.se' 168 svea = 'svea.c3se.chalmers.se' # small output files 169 siri = 'siri.lunarc.lu.se' # small output files 170 smokerings = 'arc-ce.smokerings.nsc.liu.se' 171 ritsem = 'jeannedarc.hpc2n.umu.se' 172 ruth = 'arc-ce01.pdc.kth.se' 173 174 clusters = [grad, smokerings, ruth, ritsem] 175 176 for cluster in clusters: 177 if submit_status.find(cluster) == -1: 178 clusterfile.write("%s\n" % cluster) 179 180 clusterfile.close()
181 182
183 - def get_id(self,submit_status):
184 """ 185 Parse string returned by qsub on submission to extract the 186 id of the job cluster 187 188 @param submit_status: string returned by condor_submit 189 """ 190 191 logger.info(submit_status) 192 #self.WriteClusterFile(submit_status) 193 if submit_status.find('gsiftp://') != -1: 194 job_id = submit_status[submit_status.find('gsiftp://'):].strip() 195 self.job_ids.append(job_id) 196 return job_id 197 198 else: 199 logger.warn('could not parse job id from "%s"' % submit_status) 200 return -1
201 202
203 - def CheckQ(self,db=None):
204 """ 205 Querie status of cluster or job on queue 206 """ 207 cmd = self.checkqueue_cmd + " " + " ".join(self.job_ids) 208 logger.info(cmd) 209 handle = os.popen(cmd, 'r') 210 status = string.join(handle.readlines()) 211 handle.close() 212 213 return status
214
215 - def CleanQ(self,jobs=None):
216 """ 217 Querie status of cluster on queue and clean jobs 218 """ 219 return 0
220 221
222 - def CheckJobStatus(self,jobs):
223 """ 224 Querie status of job on swegrid queue 225 """ 226 if isinstance(jobs,list): 227 job_list = jobs 228 else: 229 job_list = [jobs] 230 231 for job in job_list: 232 job.SetStatus('?') 233 234 job_id = job.GetJobId() 235 if job_id < 0: return 0 236 cmd = self.checkqueue_cmd + ' ' + job_id 237 238 status = '' 239 handle = os.popen(cmd, 'r') 240 for line in handle.readlines(): 241 line = line.strip() 242 if line.startswith('Status:'): 243 status = line[(line.find(':') + 1):].replace(' ', '') 244 handle.close() 245 246 msg='%s: %s' % (job.GetJobId(), status) 247 logger.info(msg) 248 249 if status != '': 250 251 # For status states see http://www.nordugrid.org/documents/ui.pdf 252 if status in ('ACCEPTING', 'ACCEPTED', 'SUBMITTING', 'SUBMITTED', 'PREPARING', 'PREPARED', 'QUEUEING', 'QUEUED', 'INLRMS:Q'): 253 job.SetStatus('QUEUED') 254 elif status in ('INLRMS:R', 'INLRMS:E', 'INLRMS:O', 'EXECUTED', 'FINISHING'): 255 job.SetStatus('PROCESSING') 256 elif status in ('FINISHED'): 257 job.SetStatus('FINISHED') 258 elif status in ('FAILED', 'CANCELING', 'KILLING', 'KILLED'): 259 job.SetStatus('FAILED') 260 else: 261 job.SetStatus('?') 262 263 return 1
264 265
266 - def QRemove(self,jobid=None):
267 """ 268 Remove cluster or job from queue 269 """ 270 271 cmd = self.queue_rm_cmd 272 if jobid: cmd += " %s" % jobid 273 else: cmd += " " + " ".join(self.job_ids) 274 275 logger.info(cmd) 276 status = '' 277 handle = os.popen(cmd, 'r') 278 status = string.join(handle.readlines()) 279 handle.close() 280 281 if status.find('Job has already finished') != -1: 282 cmd = self.queue_clean_cmd 283 if jobid: cmd += " %s" % jobid 284 else: cmd += " " + " ".join(self.job_ids) 285 286 logger.info(cmd) 287 handle = os.popen(cmd, 'r') 288 status = string.join(handle.readlines()) 289 handle.close() 290 291 return status
292 293
294 - def Clean(self,jobdict):
295 """ 296 remove job from queue 297 """ 298 299 job_id = jobdict['grid_queue_id'] 300 logger.info('removing job: %s' % job_id) 301 302 self.QRemove(job_id)
303