1
2
3
4 """
5 Interface for configuring and running python filtering scripts
6
7 copyright (c) 2005 the icecube collaboration
8
9 @version: $Revision: $
10 @date: $Date: $
11 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
12 """
13
14 import os
15 import re
16 import sys
17 import string
18 import os.path
19 from os.path import expandvars
20 import iceprod.modules
21 from ipmodule import IPBaseClass
22 from gsiftp import URLCopy,TrackURLCopy
23 from iceprod.core import odict,functions
24 import logging
25
27 """untar a file and return the archive list"""
28 import tarfile
29 tar = tarfile.open(file)
30 outfiles = []
31 for tarinfo in tar:
32 outfiles.append(tarinfo.name)
33 tar.extract(tarinfo.name)
34 tar.close()
35 return outfiles
36
38 """
39 This class provides an interface for preprocessing files in iceprod
40 """
41
43 IPBaseClass.__init__(self)
44 self.AddParameter('IPModuleURL','SVN URL of python script', '')
45 self.AddParameter('IPModuleDependencies','SVN python dependencies', [])
46 self.AddParameter('IPModuleRevision','SVN revision of python script',0)
47 self.AddParameter('IPModuleClass','class to load','')
48 self.AddParameter('IPModuleCache','should cache downloads',True)
49 self.logger = logging.getLogger('iceprod::IPIceTray')
50
51 self.child_parameters = odict.OrderedDict()
52
54 """
55 Overload SetParameter in order to allow for undefined parameters
56 """
57 if param.lower() in self.parameters.keys():
58 IPBaseClass.SetParameter(self,param,value)
59 else:
60 self.child_parameters[param] = value
61
62
92
94 """
95 This class provides an interface for preprocessing files in iceprod
96 """
97
99
100 import copy
101 IceTray.__init__(self)
102 TrackURLCopy.__init__(self)
103
104 self.AddParameter('NameOfInputFileList',
105 'Name of input file list to pass to child module',
106 'InputFileList')
107 self.AddParameter('OutFilePattern','Name or pattern of ouput files to transfer on completion','')
108 self.logger = logging.getLogger('iceprod-modules::i3.Processing')
109
111 """
112 Overload SetParameter in order to allow for undefined parameters
113 """
114 if param.lower() in self.parameters.keys():
115 IPBaseClass.SetParameter(self,param,value)
116 else:
117 self.child_parameters[param] = value
118
119
121 if not IPBaseClass.Execute(self,stats): return 0
122 import xmlrpclib
123 import cPickle
124 import time
125 import glob
126
127
128 sys.path.insert(0,os.getcwd())
129
130 module_url = self.GetParameter('IPModuleURL')
131 classname = self.GetParameter('IPModuleClass')
132 cache_src = self.GetParameter('IPModuleCache')
133 filt = os.path.basename(module_url)
134
135
136 monitor_url = self.GetParameter('monitorURL')
137 src = self.GetParameter('source')
138 dest = self.GetParameter('destination')
139 datasetid = int(self.GetParameter('dataset'))
140 jobid = int(self.GetParameter('job'))
141 passcode = self.GetParameter('key')
142
143 outfile_pat = self.GetParameter('OutFilePattern')
144
145 starttime = time.time()
146
147 print "module_url", module_url
148 if functions.wget(module_url,cache=cache_src):
149 raise Exception, "Failed to retrieve i3filter from '%s'" % module_url
150
151 for dep in self.GetParameter("IPModuleDependencies"):
152 depurl = dep
153 if not functions.isurl(depurl):
154 depurl = os.path.join(os.path.dirname(url),dep)
155 if functions.wget(depurl,cache=cache_src):
156 raise Exception, "Failed to retrieve i3filter from '%s'" % depurl
157
158
159 mod = iceprod.modules.get_plugin(classname)()
160 mod.SetParser(self.parser)
161
162 server = xmlrpclib.ServerProxy(monitor_url)
163 files = cPickle.loads(server.get_storage_url(datasetid,jobid,passcode,'INPUT'))
164
165 input_file_list = []
166 self.SetParameter('destination',expandvars("file:$PWD/"))
167 for file in files:
168 self.SetParameter('source',os.path.join(file['path'],file['name']))
169
170 URLCopy.Execute(self,stats)
171 md5sum = functions.md5sum(file['name'])
172 filesize = float(os.path.getsize(file['name']))
173
174 if md5sum != file['md5sum'].replace(':',''):
175 self.logger.error('md5sum mismatch %s : %s - for %s' % (file['md5sum'],md5sum,file['name']))
176 raise Exception, 'md5sum mismatch %s : %s - for %s' % (file['md5sum'],md5sum,file['name'])
177
178 input_file_list.append(file['name'])
179
180
181 for name,value in self.child_parameters.items():
182 mod.SetParameter(name,value)
183
184 listname = self.GetParameter('NameOfInputFileList')
185 mod.SetParameter(listname,input_file_list)
186
187 retval = mod.Execute(stats)
188
189 self.SetParameter('destination',dest)
190
191
192 for outfile in glob.glob(outfile_pat):
193 if not outfile in input_file_list:
194 self.SetParameter('source',expandvars(os.path.join("file:$PWD",outfile)))
195 TrackURLCopy.Execute(self,stats)
196
197
198 self.SetParameter('source',expandvars("file:$ICEPROD_STDOUT"))
199 URLCopy.Execute(self,stats)
200 self.SetParameter('source',expandvars("file:$ICEPROD_STDERR"))
201 URLCopy.Execute(self,stats)
202 icetraylog = expandvars("$I3_TOPDIR/icetray.%06u.log" % jobid)
203 if os.path.exists(icetraylog):
204 self.SetParameter('source',expandvars("file:%s" % icetraylog))
205 URLCopy.Execute(self,stats)
206
207 return retval
208
209
211 """
212 This class provides an interface for L2 processing in iceprod
213 """
214
216 IceTray.__init__(self)
217 TrackURLCopy.__init__(self)
218
219 self.AddParameter('mc','Simulation?',False)
220 self.AddParameter('outfile','Name to use for outfile',
221 'Level2_IC86.2012_data_Run0_Part0')
222 self.AddParameter('compression','Set compression of i3 files','bz2')
223 self.AddParameter('writeIceTop','Write IceTop output?',False)
224 self.AddParameter('writeEHE','Write EHE output?',False)
225 self.AddParameter('writeSLOP','Write SLOP output?',False)
226 self.AddParameter('writeRoot','Write Root output?',False)
227 self.AddParameter('writeDST','Write DST output?',False)
228 self.AddParameter('writeGaps','Write gaps output?',False)
229 self.AddParameter('photonicsdir','Set photonics directory',False)
230
231 self.logger = logging.getLogger('iceprod-modules::i3.L2Processing')
232
233
234 SetParameter = IPBaseClass.SetParameter
235
237 if not IPBaseClass.Execute(self,stats): return 0
238 import xmlrpclib
239 import cPickle
240 import time
241 import glob
242
243
244 sys.path.insert(0,os.getcwd())
245
246 module_url = self.GetParameter('IPModuleURL')
247 classname = self.GetParameter('IPModuleClass')
248 cache_src = self.GetParameter('IPModuleCache')
249
250
251 monitor_url = self.GetParameter('monitorURL')
252 src = self.GetParameter('source')
253 dest = self.GetParameter('destination')
254 datasetid = int(self.GetParameter('dataset'))
255 jobid = int(self.GetParameter('job'))
256 passcode = self.GetParameter('key')
257
258
259 print "module_url", module_url
260 if functions.wget(module_url,cache=cache_src):
261 raise Exception, "Failed to retrieve i3filter from '%s'" % module_url
262
263 for dep in self.GetParameter("IPModuleDependencies"):
264 depurl = dep
265 if not functions.isurl(depurl):
266 depurl = os.path.join(os.path.dirname(url),dep)
267 if functions.wget(depurl,cache=cache_src):
268 raise Exception, "Failed to retrieve i3filter from '%s'" % depurl
269
270 mod = iceprod.modules.get_plugin(classname)
271 mod.SetParser(self.parser)
272
273
274 server = xmlrpclib.ServerProxy(monitor_url)
275 files = cPickle.loads(server.get_storage_url(datasetid,jobid,passcode,'INPUT'))
276
277
278 gcd_file = ''
279 input_files = []
280 self.SetParameter('destination',expandvars("file:$PWD/"))
281 for file in files:
282 self.SetParameter('source',os.path.join(file['path'],file['name']))
283 URLCopy.Execute(self,stats)
284 md5sum = functions.md5sum(file['name'])
285 filesize = float(os.path.getsize(file['name']))
286 if md5sum != file['md5sum'].replace(':',''):
287 self.logger.error('md5sum mismatch %s : %s - for %s' % (file['md5sum'],md5sum,file['name']))
288 raise Exception, 'md5sum mismatch %s : %s - for %s' % (file['md5sum'],md5sum,file['name'])
289 if '.tar' in file['name']:
290
291 file_list = untar(file['name'])
292 else:
293 file_list = [file['name']]
294
295
296 for f in file_list:
297 if 'GCD' in f or 'gcd' in f:
298 gcd_file = f
299 elif '.i3' in f:
300 input_files.append(f)
301
302
303
304 compression = self.GetParameter('compression')
305 if compression[0] == '.':
306 compression = compression[1:]
307 if compression not in ('gz','bz2','xz','lzma'):
308 raise Exception, 'compression type %s not supported'%compression
309
310
311 output_file = self.GetParameter('outfile')
312 output_file_main = output_file+'.i3.'+compression
313 icetopoutput = output_file+'_IT'+'.i3.'+compression
314 eheoutput = output_file+'_EHE'+'.i3.'+compression
315 slopoutput = output_file+'_SLOP'+'.i3.'+compression
316 rootoutput = output_file+'.root'
317 dstoutput = output_file+'_DST.root'
318 gapsoutput = output_file+'_gaps.txt'
319 outfile_list = []
320
321
322 mod.SetParameter('gcdfile',gcd_file)
323 mod.SetParameter('infile',input_files)
324 mod.SetParameter('outfile',output_file_main)
325 outfile_list.append(output_file_main)
326
327 if self.GetParameter('writeIceTop'):
328 mod.SetParameter('icetopoutput',icetopoutput)
329 outfile_list.append(icetopoutput)
330 if self.GetParameter('writeEHE'):
331 mod.SetParameter('eheoutput',eheoutput)
332 outfile_list.append(eheoutput)
333 if self.GetParameter('writeSLOP'):
334 mod.SetParameter('slopoutput',slopoutput)
335 outfile_list.append(slopoutput)
336 if self.GetParameter('writeRoot'):
337 mod.SetParameter('rootoutput',rootoutput)
338 outfile_list.append(rootoutput)
339 if self.GetParameter('writeDST'):
340 mod.SetParameter('dstfile',dstoutput)
341 outfile_list.append(dstoutput)
342 if self.GetParameter('writeGaps'):
343 mod.SetParameter('gapsfile',gapsoutput)
344 outfile_list.append(gapsoutput)
345
346 photonics = self.GetParameter('photonicsdir')
347 if photonics:
348 mod.SetParameter('photonicsdir',photonics)
349
350
351 retval = mod.Execute(stats)
352
353
354 self.SetParameter('destination',dest)
355 for outfile in outfile_list:
356 self.SetParameter('source',expandvars(os.path.join("file:$PWD",outfile)))
357 TrackURLCopy.Execute(self,stats)
358
359
360 sys.stdout.flush()
361 sys.stderr.flush()
362 self.SetParameter('source',expandvars("file:$ICEPROD_STDOUT"))
363 self.SetParameter('destination',os.path.join(dest,output_file+'.logout'))
364 URLCopy.Execute(self,stats)
365 self.SetParameter('source',expandvars("file:$ICEPROD_STDERR"))
366 self.SetParameter('destination',os.path.join(dest,output_file+'.logerr'))
367 URLCopy.Execute(self,stats)
368 icetraylog = expandvars("$I3_TOPDIR/icetray.%06u.log" % jobid)
369 if os.path.exists(icetraylog):
370 self.SetParameter('source',expandvars("file:%s" % icetraylog))
371 self.SetParameter('destination',os.path.join(dest,output_file+'.logicetray'))
372 URLCopy.Execute(self,stats)
373
374 return retval
375