Package iceprod :: Package modules :: Module i3
[hide private]
[frames] | [no frames]

Source Code for Module iceprod.modules.i3

  1   
  2  #!/bin/env python 
  3  # 
  4  """ 
  5   Interface for configuring and running python filtering scripts  
  6   
  7   copyright  (c) 2005 the icecube collaboration 
  8   
  9   @version: $Revision: $ 
 10   @date: $Date: $ 
 11   @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
 12  """ 
 13   
 14  import os 
 15  import re 
 16  import sys 
 17  import string 
 18  import os.path 
 19  from os.path import expandvars 
 20  import iceprod.modules 
 21  from ipmodule import IPBaseClass 
 22  from gsiftp import URLCopy,TrackURLCopy 
 23  from iceprod.core import odict,functions 
 24  import logging 
 25   
def untar(file):
    """
    Extract every member of a tar archive into the current directory.

    The archive is opened with tarfile.open's default read mode, which
    auto-detects compression. Each member is extracted under its stored
    name, relative to the current working directory.

    @param file: path of the tar archive to extract
    @return: list of member names, in archive order
    """
    import tarfile
    tar = tarfile.open(file)
    # Close the archive even when a member fails to extract, so the
    # underlying file handle is not leaked to an exception handler upstream.
    try:
        outfiles = []
        for tarinfo in tar:
            outfiles.append(tarinfo.name)
            # NOTE(review): member names are used unvalidated; an archive
            # containing absolute paths or '..' components could write
            # outside the working directory. Only use on trusted archives.
            tar.extract(tarinfo.name)
    finally:
        tar.close()
    return outfiles

class IceTray(IPBaseClass):
    """
    Download a python IPModule script (plus its dependencies) and run the
    IPModule class named by 'IPModuleClass', forwarding any parameters that
    are not declared on this wrapper to that child module.
    """

    def __init__(self):
        IPBaseClass.__init__(self)
        self.AddParameter('IPModuleURL','SVN URL of python script', '')
        self.AddParameter('IPModuleDependencies','SVN python dependencies', [])
        self.AddParameter('IPModuleRevision','SVN revision of python script',0)
        self.AddParameter('IPModuleClass','class to load','')
        self.AddParameter('IPModuleCache','should cache downloads',True)
        self.logger = logging.getLogger('iceprod::IPIceTray')

        # Parameters not declared above; collected by SetParameter and passed
        # to the child module in Execute(), in insertion order.
        self.child_parameters = odict.OrderedDict()

    def SetParameter(self, param, value):
        """
        Overload SetParameter in order to allow for undefined parameters.

        If param.lower() is a key of self.parameters, the value is set
        through IPBaseClass.SetParameter. Otherwise it is stored in
        self.child_parameters and later forwarded to the child module.
        (Presumably IPBaseClass stores parameter names lowercased -- confirm.)
        """
        if param.lower() in self.parameters.keys():
            IPBaseClass.SetParameter(self,param,value)
        else:
            self.child_parameters[param] = value

    def Execute(self,stats):
        """
        Fetch the module script and its dependencies, then run the requested
        class with the collected child parameters.

        @param stats: stats object, passed through to the child module
        @return: 0 if IPBaseClass.Execute returns a false value, otherwise
                 the child module's Execute() return value
        @raise Exception: if the script or a dependency cannot be retrieved
        """
        if not IPBaseClass.Execute(self,stats):
            return 0

        # Make modules in the working directory importable (presumably where
        # functions.wget saves the downloaded scripts).
        sys.path.insert(0,os.getcwd())

        url = self.GetParameter('IPModuleURL')
        classname = self.GetParameter('IPModuleClass')
        cache_src = self.GetParameter('IPModuleCache')

        # A true return value from functions.wget is treated as a failure.
        if functions.wget(url,cache=cache_src):
            raise Exception("Failed to retrieve i3filter from '%s'" % url)

        for dep in self.GetParameter("IPModuleDependencies"):
            depurl = dep
            if not functions.isurl(depurl):
                # Non-URL dependencies are resolved relative to the script URL.
                depurl = os.path.join(os.path.dirname(url),dep)
            if functions.wget(depurl,cache=cache_src):
                raise Exception("Failed to retrieve i3filter from '%s'" % depurl)

        mod = iceprod.modules.get_plugin(classname)()
        mod.SetParser(self.parser)

        # Pass parameters to module
        for name,value in self.child_parameters.items():
            mod.SetParameter(name,value)

        return mod.Execute(stats)

class Processing(IceTray,TrackURLCopy):
    """
    Run an IPModule on the input files registered for this job.

    The input files are fetched from the storage URLs reported by the
    monitoring server and are md5-verified. After the child module runs,
    every file matching 'OutFilePattern' is uploaded, followed by the job's
    stdout/stderr logs and the icetray log when present.
    """

    def __init__(self):
        IceTray.__init__(self)
        TrackURLCopy.__init__(self)

        self.AddParameter('NameOfInputFileList',
                          'Name of input file list to pass to child module',
                          'InputFileList')
        self.AddParameter('OutFilePattern','Name or pattern of ouput files to transfer on completion','')
        self.logger = logging.getLogger('iceprod-modules::i3.Processing')

    def SetParameter(self, param, value):
        """
        Overload SetParameter in order to allow for undefined parameters.

        If param.lower() is a key of self.parameters, the value is set
        through IPBaseClass.SetParameter. Otherwise it is stored in
        self.child_parameters for the child module.
        """
        if param.lower() in self.parameters.keys():
            IPBaseClass.SetParameter(self,param,value)
        else:
            self.child_parameters[param] = value

    def Execute(self,stats):
        """
        Fetch the module, copy and verify the inputs, run the module, and
        upload the outputs and log files.

        The local input file names are passed to the child module under the
        parameter named by 'NameOfInputFileList'.

        @param stats: stats object, passed through to the copy steps and
                      the child module
        @return: 0 if IPBaseClass.Execute returns a false value, otherwise
                 the child module's Execute() return value
        @raise Exception: if a download fails or an input md5sum mismatches
        """
        if not IPBaseClass.Execute(self,stats):
            return 0
        import xmlrpclib
        import cPickle
        import glob

        # Make modules in the working directory importable.
        sys.path.insert(0,os.getcwd())

        module_url = self.GetParameter('IPModuleURL')
        classname = self.GetParameter('IPModuleClass')
        cache_src = self.GetParameter('IPModuleCache')

        # SoapMon configuration
        monitor_url = self.GetParameter('monitorURL')
        dest = self.GetParameter('destination')
        datasetid = int(self.GetParameter('dataset'))
        jobid = int(self.GetParameter('job'))
        passcode = self.GetParameter('key')

        outfile_pat = self.GetParameter('OutFilePattern')

        sys.stdout.write("module_url %s\n" % (module_url,))
        # A true return value from functions.wget is treated as a failure.
        if functions.wget(module_url,cache=cache_src):
            raise Exception("Failed to retrieve i3filter from '%s'" % module_url)

        for dep in self.GetParameter("IPModuleDependencies"):
            depurl = dep
            if not functions.isurl(depurl):
                # Relative dependencies are resolved against the module URL.
                # (This used the undefined name 'url', raising NameError.)
                depurl = os.path.join(os.path.dirname(module_url),dep)
            if functions.wget(depurl,cache=cache_src):
                raise Exception("Failed to retrieve i3filter from '%s'" % depurl)

        mod = iceprod.modules.get_plugin(classname)()
        mod.SetParser(self.parser)

        server = xmlrpclib.ServerProxy(monitor_url)
        # NOTE(review): cPickle.loads on data received over the network can
        # execute arbitrary code; this fully trusts the monitoring server.
        files = cPickle.loads(server.get_storage_url(datasetid,jobid,passcode,'INPUT'))

        # Copy every input into the working directory and verify its md5sum.
        input_file_list = []
        self.SetParameter('destination',expandvars("file:$PWD/"))
        for infile in files:
            name = infile['name']
            self.SetParameter('source',os.path.join(infile['path'],name))
            URLCopy.Execute(self,stats)

            md5sum = functions.md5sum(name)
            # Stored checksums may contain ':' separators.
            if md5sum != infile['md5sum'].replace(':',''):
                msg = 'md5sum mismatch %s : %s - for %s' % (infile['md5sum'],md5sum,name)
                self.logger.error(msg)
                raise Exception(msg)

            input_file_list.append(name)

        # Pass parameters to module
        for name,value in self.child_parameters.items():
            mod.SetParameter(name,value)

        listname = self.GetParameter('NameOfInputFileList')
        mod.SetParameter(listname,input_file_list)

        retval = mod.Execute(stats)

        self.SetParameter('destination',dest) # set original destination

        # Upload output files; skip inputs that also match the pattern.
        for outfile in glob.glob(outfile_pat):
            if outfile not in input_file_list:
                self.SetParameter('source',expandvars(os.path.join("file:$PWD",outfile)))
                TrackURLCopy.Execute(self,stats)

        # Finally upload log files. Flush first so buffered output is written
        # to the files being uploaded (as L2Processing does).
        sys.stdout.flush()
        sys.stderr.flush()
        self.SetParameter('source',expandvars("file:$ICEPROD_STDOUT"))
        URLCopy.Execute(self,stats)
        self.SetParameter('source',expandvars("file:$ICEPROD_STDERR"))
        URLCopy.Execute(self,stats)
        icetraylog = expandvars("$I3_TOPDIR/icetray.%06u.log" % jobid)
        if os.path.exists(icetraylog):
            self.SetParameter('source',expandvars("file:%s" % icetraylog))
            URLCopy.Execute(self,stats)

        return retval

class L2Processing(IceTray,TrackURLCopy):
    """
    This class provides an interface for L2 processing in iceprod.

    Downloads the L2 module script and fetches the job's GCD and input files
    (untarring archives). It runs the module with explicit gcdfile/infile/
    outfile parameters, then uploads the selected outputs and the job's log
    files.
    """

    def __init__(self):
        IceTray.__init__(self)
        TrackURLCopy.__init__(self)

        self.AddParameter('mc','Simulation?',False)
        self.AddParameter('outfile','Name to use for outfile',
                          'Level2_IC86.2012_data_Run0_Part0')
        self.AddParameter('compression','Set compression of i3 files','bz2')
        self.AddParameter('writeIceTop','Write IceTop output?',False)
        self.AddParameter('writeEHE','Write EHE output?',False)
        self.AddParameter('writeSLOP','Write SLOP output?',False)
        self.AddParameter('writeRoot','Write Root output?',False)
        self.AddParameter('writeDST','Write DST output?',False)
        self.AddParameter('writeGaps','Write gaps output?',False)
        self.AddParameter('photonicsdir','Set photonics directory',False)

        self.logger = logging.getLogger('iceprod-modules::i3.L2Processing')

    # restore original SetParameter to raise an error on undefined parameters
    # (this bypasses IceTray.SetParameter, so undeclared names are never
    # collected into self.child_parameters)
    SetParameter = IPBaseClass.SetParameter

    def Execute(self,stats):
        """
        Run L2 processing for this job.

        @param stats: stats object, passed through to the copy steps and
                      the child module
        @return: 0 if IPBaseClass.Execute returns a false value, otherwise
                 the child module's Execute() return value
        @raise Exception: if a download fails, an input md5sum mismatches,
                          or the compression type is unsupported
        """
        if not IPBaseClass.Execute(self,stats):
            return 0
        import xmlrpclib
        import cPickle

        # Make modules in the working directory importable.
        sys.path.insert(0,os.getcwd())

        module_url = self.GetParameter('IPModuleURL')
        classname = self.GetParameter('IPModuleClass')
        cache_src = self.GetParameter('IPModuleCache')

        # SoapMon configuration
        monitor_url = self.GetParameter('monitorURL')
        dest = self.GetParameter('destination')
        datasetid = int(self.GetParameter('dataset'))
        jobid = int(self.GetParameter('job'))
        passcode = self.GetParameter('key')

        # get scripts; a true return value from functions.wget means failure
        sys.stdout.write("module_url %s\n" % (module_url,))
        if functions.wget(module_url,cache=cache_src):
            raise Exception("Failed to retrieve i3filter from '%s'" % module_url)

        for dep in self.GetParameter("IPModuleDependencies"):
            depurl = dep
            if not functions.isurl(depurl):
                # Relative dependencies are resolved against the module URL.
                # (This used the undefined name 'url', raising NameError.)
                depurl = os.path.join(os.path.dirname(module_url),dep)
            if functions.wget(depurl,cache=cache_src):
                raise Exception("Failed to retrieve i3filter from '%s'" % depurl)

        # Instantiate the plugin, as IceTray and Processing do. Without the
        # call, SetParser/SetParameter would be invoked on the class itself.
        mod = iceprod.modules.get_plugin(classname)()
        mod.SetParser(self.parser)

        # get GCD and input files from the DB urlpath table
        server = xmlrpclib.ServerProxy(monitor_url)
        # NOTE(review): cPickle.loads on data received over the network can
        # execute arbitrary code; this fully trusts the monitoring server.
        files = cPickle.loads(server.get_storage_url(datasetid,jobid,passcode,'INPUT'))

        # copy the GCD and input files to the local directory
        gcd_file = ''
        input_files = []
        self.SetParameter('destination',expandvars("file:$PWD/"))
        for infile in files:
            name = infile['name']
            self.SetParameter('source',os.path.join(infile['path'],name))
            URLCopy.Execute(self,stats)
            md5sum = functions.md5sum(name)
            # Stored checksums may contain ':' separators.
            if md5sum != infile['md5sum'].replace(':',''):
                msg = 'md5sum mismatch %s : %s - for %s' % (infile['md5sum'],md5sum,name)
                self.logger.error(msg)
                raise Exception(msg)
            if '.tar' in name:
                # untar the file to make it usable
                file_list = untar(name)
            else:
                file_list = [name]

            # Classify by name: the last name containing GCD/gcd becomes the
            # GCD file; other .i3 names are inputs; the rest are ignored
            # (probably meta information).
            for f in file_list:
                if 'GCD' in f or 'gcd' in f:
                    gcd_file = f
                elif '.i3' in f:
                    input_files.append(f)

        # get compression; accept both 'bz2' and '.bz2'. startswith avoids
        # the IndexError that compression[0] raised on an empty string.
        compression = self.GetParameter('compression')
        if compression.startswith('.'):
            compression = compression[1:]
        if compression not in ('gz','bz2','xz','lzma'):
            raise Exception('compression type %s not supported'%compression)

        # make the output filenames
        output_file = self.GetParameter('outfile')
        output_file_main = output_file+'.i3.'+compression
        icetopoutput = output_file+'_IT'+'.i3.'+compression
        eheoutput = output_file+'_EHE'+'.i3.'+compression
        slopoutput = output_file+'_SLOP'+'.i3.'+compression
        rootoutput = output_file+'.root'
        dstoutput = output_file+'_DST.root'
        gapsoutput = output_file+'_gaps.txt'
        outfile_list = [] # list for transfer

        # make the module parameters
        mod.SetParameter('gcdfile',gcd_file)
        mod.SetParameter('infile',input_files)
        mod.SetParameter('outfile',output_file_main)
        outfile_list.append(output_file_main)
        # check if we want to make special outputs
        if self.GetParameter('writeIceTop'):
            mod.SetParameter('icetopoutput',icetopoutput)
            outfile_list.append(icetopoutput)
        if self.GetParameter('writeEHE'):
            mod.SetParameter('eheoutput',eheoutput)
            outfile_list.append(eheoutput)
        if self.GetParameter('writeSLOP'):
            mod.SetParameter('slopoutput',slopoutput)
            outfile_list.append(slopoutput)
        if self.GetParameter('writeRoot'):
            mod.SetParameter('rootoutput',rootoutput)
            outfile_list.append(rootoutput)
        if self.GetParameter('writeDST'):
            mod.SetParameter('dstfile',dstoutput)
            outfile_list.append(dstoutput)
        if self.GetParameter('writeGaps'):
            mod.SetParameter('gapsfile',gapsoutput)
            outfile_list.append(gapsoutput)
        # set photonics dir (only when one was configured)
        photonics = self.GetParameter('photonicsdir')
        if photonics:
            mod.SetParameter('photonicsdir',photonics)

        # execute L2 processing
        retval = mod.Execute(stats)

        # Upload output files
        self.SetParameter('destination',dest) # set original destination
        for outfile in outfile_list:
            self.SetParameter('source',expandvars(os.path.join("file:$PWD",outfile)))
            TrackURLCopy.Execute(self,stats)

        # Finally upload log files, renamed after the main output. Flush
        # first so buffered output is written to the files being uploaded.
        sys.stdout.flush()
        sys.stderr.flush()
        self.SetParameter('source',expandvars("file:$ICEPROD_STDOUT"))
        self.SetParameter('destination',os.path.join(dest,output_file+'.logout'))
        URLCopy.Execute(self,stats)
        self.SetParameter('source',expandvars("file:$ICEPROD_STDERR"))
        self.SetParameter('destination',os.path.join(dest,output_file+'.logerr'))
        URLCopy.Execute(self,stats)
        icetraylog = expandvars("$I3_TOPDIR/icetray.%06u.log" % jobid)
        if os.path.exists(icetraylog):
            self.SetParameter('source',expandvars("file:%s" % icetraylog))
            self.SetParameter('destination',os.path.join(dest,output_file+'.logicetray'))
            URLCopy.Execute(self,stats)

        return retval
