1
2
3 """
4 Interface for configuring pre/post icetray scripts
5
6 copyright (c) 2005 the icecube collaboration
7
8 @version: $Revision: $
9 @date: $Date: $
10 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
11 """
12
13 import os,sys
14 import time
15 import string
16 import glob
17 import commands
18 from os.path import expandvars,basename,join
19 from ipmodule import IPBaseClass
20 from iceprod.core.inventory import FileInventory
21 from iceprod.core import functions
22 import xmlrpclib
23 import logging
24
def boolcast(s):
    """
    Interpret a configuration value as a boolean.

    The strings "False", "0", "" and "[]" are treated as false and every
    other string is treated as true (the comparison is case-sensitive).
    Non-string values fall back to their ordinary truth value, so False,
    0, None and empty containers are false as well.

    @param s: value to interpret; in this file it is applied to the result
              of self.parser.parse(...)
    @return: True or False
    """
    # The membership test keeps the string semantics unchanged.  bool(s)
    # additionally rejects falsy non-string values, which the membership
    # test alone would report as true because e.g. False != "False".
    return s not in ("False", "0", "", "[]") and bool(s)
26
27
29 """
30 This class provides an interface for preprocessing files in iceprod
31 """
32
34 IPBaseClass.__init__(self)
35 self.executable = 'globus-url-copy'
36 self.AddParameter('source','source URL to copy','')
37 self.AddParameter('destination','detination URL to copy to','')
38 self.AddParameter('certdir','certificate directory','')
39 self.AddParameter('proxyfile','File path to globus proxy','$X509_USER_PROXY')
40 self.AddParameter('ldpath','library path to globus','globus/lib')
41 self.AddParameter('path','path to globus bin directory','globus/bin')
42 self.AddParameter('opts','globus-url-copy options',
43 ['-rst','-cd','-r','-nodcau','-rst-retries 5','-rst-interval 60']
44 )
45 self.AddParameter('executable','name of gridFTP executable','globus-url-copy')
46 self.AddParameter('StorageElement','LFN SE','')
47 self.AddParameter('lfn-opts','LFN Options','')
48 self.AddParameter('inventory','File with source dest mappings','$I3_TOPDIR/inventory.xml')
49 self.AddParameter('emulate',"Don't actually transfer files. Just write inventory",False)
50 self.logger = logging.getLogger('iceprod::URLCopy')
51
53 """
54 remove file at destination if it exists
55 """
56 try:
57 filename = basename(src)
58 fulldest = dest
59 if basename(dest) != filename:
60 fulldest = join(dest,filename)
61
62 self.logger.info("filename: %s" % filename)
63 self.logger.info("fulldest: %s" % fulldest)
64
65
66 parse_path=fulldest.rsplit('/')
67 hostname=parse_path[2]
68 file_path=fulldest.rsplit('/')[3:]
69 file_name_path='/'+'/'.join(file_path)
70
71 self.logger.info("hostname: %s" % hostname)
72 self.logger.info("file_name_path: %s" % file_name_path)
73
74
75 self.logger.info("checking if file exists at target: %s" % fulldest)
76
77
78 status, output = commands.getstatusoutput('uberftp '+hostname+' " dir '+file_name_path+' " ')
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 output_from_uberftp=output.split('\r\n')
97 if len(output_from_uberftp)==3:
98 status=0
99 if len(output_from_uberftp)!=3:
100 status=1
101
102 self.logger.info("after checking if file exists at target: %s" % fulldest)
103 self.logger.info("status: %s" % status)
104 self.logger.info("output: %s" % output)
105
106
107 self.logger.info("try to removing file at target test: %s" % fulldest)
108
109 status, output = commands.getstatusoutput('uberftp '+hostname+' " rm '+file_name_path+' " ')
110 self.logger.info("status: %s" % status)
111 self.logger.info("output: %s" % output)
112
113
114
115 if not int(status):
116 self.logger.info("removing file at target: %s" % fulldest)
117
118 status, output = commands.getstatusoutput('uberftp '+hostname+' " rm '+file_name_path+' " ')
119
120 self.logger.info(" file removed at target: %s" % fulldest)
121 self.logger.info("status: %s" % status)
122 self.logger.info("output: %s" % output)
123
124 if status:
125 self.logger.error("could not delete existing file1: %s" % output)
126 self.logger.debug(output)
127
128
129
130
131
132
133
134 return status
135 except Exception,e:
136 self.logger.error(e)
137 self.logger.error("could not delete existing file2")
138 return 1
139
140
141
142
144 if not IPBaseClass.Execute(self,stats): return 0
145
146 if not stats.has_key('data-out'): stats['data-out'] = 0
147 if not stats.has_key('data-in'): stats['data-in'] = 0
148
149 src = self.GetParameter('source')
150 dest = self.GetParameter('destination')
151 proxy = self.GetParameter('proxyfile')
152 opts = self.GetParameter('opts')
153 ldpath = self.GetParameter('ldpath')
154 path = self.GetParameter('path')
155 inventory= self.GetParameter('inventory')
156 emulate = self.GetParameter('emulate')
157 exe = self.GetParameter('executable')
158 se = self.GetParameter('StorageElement')
159 lfnopts = self.GetParameter('lfn-opts')
160
161 try:
162 ignore_errors = boolcast(self.parser.parse("$system(ignore_gftp_errors)"))
163 except:
164 ignore_errors = False
165
166
167 if src.startswith('lfn:') or dest.startswith('lfn:'):
168 self.logger.info("detected LFN URL. Hading control to lfn module")
169 from lfn import LFN_CR_Copy
170 ipmod = LFN_CR_Copy()
171 ipmod.SetParameter('source',src)
172 ipmod.SetParameter('destinationPATH',dest)
173 if se:
174 ipmod.SetParameter('destination',se)
175 if lfnopts:
176 ipmod.SetParameter('opts',lfnopts)
177 return ipmod.Execute(stats)
178
179
180 certdir = self.GetParameter('certdir')
181 if certdir and os.path.exists(expandvars(certdir)):
182 os.putenv('X509_CERT_DIR',expandvars(certdir))
183
184 inventory = expandvars(inventory)
185 oi = FileInventory()
186 if os.path.exists(inventory):
187 oi.Read(inventory)
188
189 proxyfile=expandvars(proxy)
190 if os.path.exists(proxyfile):
191 os.putenv('X509_USER_PROXY',proxyfile)
192 os.chmod(proxyfile,0600)
193 os.putenv('LD_LIBRARY_PATH',expandvars("%s:$LD_LIBRARY_PATH" % ldpath))
194 os.putenv('PATH',expandvars("%s:$PATH" % path))
195
196 cmd = []
197 cmd.append(exe)
198 cmd.extend(opts)
199 cmd.append(src)
200 cmd.append(dest)
201 cmd = " ".join(cmd)
202
203 if not emulate:
204 if dest.startswith("gsiftp:"):
205
206 status = self.CleanDestination(src,dest)
207
208
209
210 status, output = commands.getstatusoutput(cmd)
211 if ignore_errors: status = 0
212
213 if status:
214 self.logger.error("Failed to execute command '%s',%s" % (cmd,output))
215 raise Exception, "Failed to execute command '%s',%s" % (cmd,output)
216 self.logger.info(output)
217 try:
218 if dest.startswith('gsiftp:') and src.startswith('file:'):
219 stats['data-out'] += float(os.path.getsize(expandvars(src.replace('file:',''))))
220 elif src.startswith('gsiftp:') and dest.startswith('file:'):
221 stats['data-in'] += float( os.path.getsize(
222 os.path.join(expandvars(dest.replace('file:','')),basename(src))
223 ))
224 except: pass
225 else:
226 oi.AddFile(src,dest)
227 oi.Write(inventory)
228 return 0
229
230
232 """
233 This class provides an interface for preprocessing files in iceprod
234 """
235
239
240
242 if not IPBaseClass.Execute(self,stats): return 0
243
244 if not stats.has_key('data-out'): stats['data-out'] = 0
245 if not stats.has_key('data-in'): stats['data-in'] = 0
246
247 src = self.GetParameter('source')
248 dest = self.GetParameter('destination')
249 proxy = self.GetParameter('proxyfile')
250 opts = self.GetParameter('opts')
251 ldpath = self.GetParameter('ldpath')
252 path = self.GetParameter('path')
253 inventory= self.GetParameter('inventory')
254 emulate = self.GetParameter('emulate')
255 exe = self.GetParameter('executable')
256
257
258 certdir = self.GetParameter('certdir')
259 if certdir:
260 os.putenv('X509_CERT_DIR',expandvars(certdir))
261
262 try:
263 ignore_errors = boolcast(self.parser.parse("$system(ignore_gftp_errors)"))
264 except:
265 ignore_errors = False
266
267 proxyfile=expandvars(proxy)
268 if os.path.exists(proxyfile):
269 os.putenv('X509_USER_PROXY',proxyfile)
270 os.chmod(proxyfile,0600)
271 os.putenv('LD_LIBRARY_PATH',os.path.expandvars("%s:$LD_LIBRARY_PATH" % ldpath))
272 os.putenv('PATH',os.path.expandvars("%s:$PATH" % path))
273
274 oi = FileInventory()
275 inventory = expandvars(inventory)
276 if os.path.exists(inventory):
277 oi.Read(inventory)
278
279 retval = 0
280 if src.startswith('file:'):
281 for file in glob.glob(expandvars(src.replace('file:',''))):
282
283 oi.AddFile(file,dest)
284 cmd = []
285 cmd.append(exe)
286 cmd.extend(opts)
287 cmd.append('file://'+os.path.abspath(os.path.normpath(file)))
288 cmd.append(dest)
289 cmd = " ".join(cmd)
290 if not emulate:
291 self.logger.info(cmd)
292 status, output = commands.getstatusoutput(cmd)
293 self.logger.info(output)
294 if status:
295 self.logger.error("Failed to execute command '%s',%s" % (cmd,output))
296 raise Exception, "Failed to execute command '%s',%s" % (cmd,output)
297 else:
298 oi.AddFile(src,dest)
299 cmd = []
300 cmd.append(exe)
301 cmd.extend(opts)
302 cmd.append(src)
303 cmd.append(dest)
304 cmd = " ".join(cmd)
305 if not emulate:
306 self.logger.info(cmd)
307 status, output = commands.getstatusoutput(cmd)
308 self.logger.info(output)
309 if ignore_errors: status = 0
310
311 if status:
312 self.logger.error("Failed to execute command '%s',%s" % (cmd,output))
313 raise Exception, "Failed to execute command '%s',%s" % (cmd,output)
314
315 if emulate:
316 oi.Write(inventory)
317 return 0
318
319
321 """
322 This class provides an interface for preprocessing files in iceprod
323 """
324
329
330
332 if not IPBaseClass.Execute(self,stats): return 0
333
334 if not stats.has_key('data-out'): stats['data-out'] = 0
335 if not stats.has_key('data-in'): stats['data-in'] = 0
336
337 src = self.GetParameter('sourcelist')
338 dest = self.GetParameter('destination')
339 proxy = self.GetParameter('proxyfile')
340 opts = self.GetParameter('opts')
341 ldpath = self.GetParameter('ldpath')
342 path = self.GetParameter('path')
343 inventory= self.GetParameter('inventory')
344 emulate = self.GetParameter('emulate')
345 exe = self.GetParameter('executable')
346
347
348 certdir = self.GetParameter('certdir')
349 if certdir:
350 os.putenv('X509_CERT_DIR',expandvars(certdir))
351
352 try:
353 ignore_errors = boolcast(self.parser.parse("$system(ignore_gftp_errors)"))
354 except:
355 ignore_errors = False
356
357 proxyfile=expandvars(proxy)
358 if os.path.exists(proxyfile):
359 os.putenv('X509_USER_PROXY',proxyfile)
360 os.chmod(proxyfile,0600)
361 os.putenv('LD_LIBRARY_PATH',os.path.expandvars("%s:$LD_LIBRARY_PATH" % ldpath))
362 os.putenv('PATH',os.path.expandvars("%s:$PATH" % path))
363
364 oi = FileInventory()
365 inventory = expandvars(inventory)
366 if os.path.exists(inventory):
367 oi.Read(inventory)
368
369 retval = 0
370 for file in src:
371 oi.AddFile(file,dest)
372 cmd = []
373 cmd.append(exe)
374 cmd.extend(opts)
375 cmd.append(expandvars(file))
376 cmd.append(dest)
377 cmd = " ".join(cmd)
378
379 if not emulate:
380 self.logger.info(cmd)
381 status, output = commands.getstatusoutput(cmd)
382 self.logger.info(output)
383 if ignore_errors: status = 0
384
385 if status:
386 self.logger.error("Failed to execute command '%s',%s" % (cmd,output))
387 raise Exception, "Failed to execute command '%s',%s" % (cmd,output)
388 try:
389 if dest.startswith('gsiftp:') and src.startswith('file:'):
390 stats['data-out'] += float(os.path.getsize(expandvars(src.replace('file:',''))))
391 elif src.startswith('gsiftp:') and dest.startswith('file:'):
392 stats['data-in'] += float( os.path.getsize(
393 os.path.join(expandvars(dest.replace('file:','')),basename(src))
394 ))
395 except: pass
396 if emulate:
397 oi.Write(inventory)
398 return 0
399
401 """
402 This class provides an interface for preprocessing files in iceprod
403 It also tracks the destination of files in the monitoring database
404 through the soapmon server.
405 """
406
408 URLCopy.__init__(self)
409 self.logger = logging.getLogger('iceprod::TrackURLCopy')
410 self.AddParameter(
411 'monitorURL',
412 'soapmon url',
413 'http://x2100.icecube.wisc.edu/cgi-bin/simulation/mon/soapmon-cgi')
414 self.AddParameter('dataset','dataset ID',0)
415 self.AddParameter('job','job ID',0)
416 self.AddParameter('key','Temporary password for soapmon','')
417
419 if not IPBaseClass.Execute(self,stats): return 0
420
421 if not stats.has_key('data-out'): stats['data-out'] = 0
422 if not stats.has_key('data-in'): stats['data-in'] = 0
423
424 url = self.GetParameter('monitorURL')
425 src = self.GetParameter('source')
426 dest = self.GetParameter('destination')
427 datasetid = int(self.GetParameter('dataset'))
428 jobid = int(self.GetParameter('job'))
429 passcode = self.GetParameter('key')
430 starttime = time.time()
431 if not URLCopy.Execute(self,stats):
432 md5sum = ''
433 filesize = 0.
434 transfertime = time.time() - starttime
435
436 if src.startswith('file:'):
437 md5sum = functions.md5sum(expandvars(src.replace('file:','')))
438 filesize = float(os.path.getsize(expandvars(src.replace('file:',''))))
439 server = xmlrpclib.ServerProxy(url)
440 if dest.endswith('/'):
441 dest += basename(src)
442 self.logger.info('%s %s' % (dest,md5sum))
443 if not server.AddFileURL(datasetid,jobid,dest,md5sum,filesize,transfertime,passcode):
444 raise Exception, "Failed to set URL for for %s -> %s" % (src,dest)
445 return 0
446 return 1
447
449 """
450 This class provides an interface for preprocessing files in iceprod
451 It also tracks the destination of files in the monitoring database
452 through the soapmon server.
453 """
454
461
477