Package iceprod :: Package modules :: Module gsiftp
[hide private]
[frames | no frames]

Source Code for Module iceprod.modules.gsiftp

  1  #!/bin/env python 
  2  # 
  3  """ 
  4   Interface for configuring pre/post icetray scripts 
  5   
  6   copyright  (c) 2005 the icecube collaboration 
  7   
  8   @version: $Revision: $ 
  9   @date: $Date: $ 
 10   @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
 11  """ 
 12   
 13  import os,sys 
 14  import time 
 15  import string 
 16  import glob 
 17  import commands 
 18  from os.path import expandvars,basename,join 
 19  from ipmodule import IPBaseClass 
 20  from iceprod.core.inventory import FileInventory 
 21  from iceprod.core import functions 
 22  import xmlrpclib 
 23  import logging 
 24   
def boolcast(s):
    """
    Interpret a configuration value as a boolean.

    The strings "False", "0", "" and "[]" are false; any other string is
    true.  Values that are not strings (for example a real ``False``, ``0``
    or ``None``) are judged by their ordinary truthiness instead of always
    being reported as true.

    :param s: value to interpret, normally a string produced by the parser
    :return: True or False
    """
    if s in ("False", "0", "", "[]"):
        return False
    return bool(s)
26 27
class URLCopy(IPBaseClass):
    """
    IceProd module that copies one file between two URLs by running a
    GridFTP client (globus-url-copy by default) through the shell.

    Sources or destinations starting with 'lfn:' are handed over to the lfn
    module.  When the 'emulate' parameter is set nothing is transferred; the
    source/destination pair is written to the inventory file instead.
    """

    def __init__(self):
        """Declare the module parameters and create the module logger."""
        IPBaseClass.__init__(self)
        self.executable = 'globus-url-copy'
        self.AddParameter('source', 'source URL to copy', '')
        self.AddParameter('destination', 'detination URL to copy to', '')
        self.AddParameter('certdir', 'certificate directory', '')
        self.AddParameter('proxyfile', 'File path to globus proxy', '$X509_USER_PROXY')
        self.AddParameter('ldpath', 'library path to globus', 'globus/lib')
        self.AddParameter('path', 'path to globus bin directory', 'globus/bin')
        self.AddParameter('opts', 'globus-url-copy options',
                          ['-rst', '-cd', '-r', '-nodcau', '-rst-retries 5', '-rst-interval 60']
                          )
        self.AddParameter('executable', 'name of gridFTP executable', 'globus-url-copy')
        self.AddParameter('StorageElement', 'LFN SE', '')
        self.AddParameter('lfn-opts', 'LFN Options', '')
        self.AddParameter('inventory', 'File with source dest mappings', '$I3_TOPDIR/inventory.xml')
        self.AddParameter('emulate', "Don't actually transfer files. Just write inventory", False)
        self.logger = logging.getLogger('iceprod::URLCopy')

    def CleanDestination(self, src, dest):
        """
        Remove the file at the destination if it already exists.

        The check and the removal are both done with the uberftp command
        line client.

        :param src: source URL; its basename is appended to dest when dest
                    does not already end with that file name
        :param dest: destination URL, expected to look like
                     gsiftp://hostname/path/...
        :return: 0 when there was nothing to remove or the removal
                 succeeded, the non-zero uberftp status when the removal
                 failed, 1 when an unexpected error occurred
        """
        try:
            filename = basename(src)
            fulldest = dest
            if basename(dest) != filename:
                fulldest = join(dest, filename)

            self.logger.info("filename: %s" % filename)
            self.logger.info("fulldest: %s" % fulldest)

            # 'gsiftp://host/a/b' splits into ['gsiftp:', '', 'host', 'a', 'b']:
            # element 2 is the host, everything after it is the remote path.
            parse_path = fulldest.split('/')
            hostname = parse_path[2]
            file_name_path = '/' + '/'.join(parse_path[3:])

            self.logger.info("hostname: %s" % hostname)
            self.logger.info("file_name_path: %s" % file_name_path)

            self.logger.info("checking if file exists at target: %s" % fulldest)
            status, output = commands.getstatusoutput(
                'uberftp ' + hostname + ' " dir ' + file_name_path + ' " ')

            # The exit status of uberftp is 0 even when the file is missing,
            # so existence is derived from the number of output lines:
            # two greeting lines plus exactly one directory entry means the
            # file is there.  This is a crude heuristic kept from the
            # original implementation.
            if len(output.split('\r\n')) == 3:
                status = 0
            else:
                status = 1

            self.logger.info("after checking if file exists at target: %s" % fulldest)
            self.logger.info("status: %s" % status)
            self.logger.info("output: %s" % output)

            if status:
                # Nothing to clean up.  The previous code issued 'rm' anyway
                # (and a second time when the first one succeeded), which
                # made the existence check above pointless.
                return 0

            self.logger.info("removing file at target: %s" % fulldest)
            status, output = commands.getstatusoutput(
                'uberftp ' + hostname + ' " rm ' + file_name_path + ' " ')
            self.logger.info("status: %s" % status)
            self.logger.info("output: %s" % output)

            if status:
                self.logger.error("could not delete existing file1: %s" % output)
            else:
                self.logger.info(" file removed at target: %s" % fulldest)
            return status
        except Exception:
            # sys.exc_info() instead of an 'except ..., e' / 'except ... as e'
            # clause keeps this parseable by old and new interpreters alike.
            self.logger.error(sys.exc_info()[1])
            self.logger.error("could not delete existing file2")
            return 1

    def Execute(self, stats):
        """
        Copy 'source' to 'destination'.

        :param stats: dictionary of job statistics; the 'data-in' and
                      'data-out' entries are created when missing and
                      increased by the size in bytes of the file moved
        :return: 0; returned immediately when IPBaseClass.Execute yields a
                 false value.  For 'lfn:' URLs the result of the lfn
                 module's Execute is returned instead.
        :raises Exception: when the copy command exits with a non-zero
                 status and $system(ignore_gftp_errors) is not set
        """
        # Local import: the module header does not import stat.
        import stat

        if not IPBaseClass.Execute(self, stats):
            return 0

        if 'data-out' not in stats:
            stats['data-out'] = 0
        if 'data-in' not in stats:
            stats['data-in'] = 0

        src = self.GetParameter('source')
        dest = self.GetParameter('destination')
        proxy = self.GetParameter('proxyfile')
        opts = self.GetParameter('opts')
        ldpath = self.GetParameter('ldpath')
        path = self.GetParameter('path')
        inventory = self.GetParameter('inventory')
        emulate = self.GetParameter('emulate')
        exe = self.GetParameter('executable')
        se = self.GetParameter('StorageElement')
        lfnopts = self.GetParameter('lfn-opts')

        # The switch is optional: any failure to resolve it means "do not
        # ignore errors".
        try:
            ignore_errors = boolcast(self.parser.parse("$system(ignore_gftp_errors)"))
        except Exception:
            ignore_errors = False

        if src.startswith('lfn:') or dest.startswith('lfn:'):
            self.logger.info("detected LFN URL. Hading control to lfn module")
            from lfn import LFN_CR_Copy
            ipmod = LFN_CR_Copy()
            ipmod.SetParameter('source', src)
            ipmod.SetParameter('destinationPATH', dest)
            if se:
                ipmod.SetParameter('destination', se)
            if lfnopts:
                ipmod.SetParameter('opts', lfnopts)
            return ipmod.Execute(stats)

        certdir = self.GetParameter('certdir')
        if certdir and os.path.exists(expandvars(certdir)):
            os.putenv('X509_CERT_DIR', expandvars(certdir))

        inventory = expandvars(inventory)
        oi = FileInventory()
        if os.path.exists(inventory):
            oi.Read(inventory)

        proxyfile = expandvars(proxy)
        if os.path.exists(proxyfile):
            os.putenv('X509_USER_PROXY', proxyfile)
            # owner read/write only (mode 0600)
            os.chmod(proxyfile, stat.S_IRUSR | stat.S_IWUSR)
        # os.putenv is used on purpose: os.environ is left untouched, so
        # $LD_LIBRARY_PATH and $PATH below always expand to the original
        # values and repeated calls do not pile up prefixes.
        os.putenv('LD_LIBRARY_PATH', expandvars("%s:$LD_LIBRARY_PATH" % ldpath))
        os.putenv('PATH', expandvars("%s:$PATH" % path))

        cmd = " ".join([exe] + list(opts) + [src, dest])

        if emulate:
            oi.AddFile(src, dest)
            oi.Write(inventory)
            return 0

        if dest.startswith("gsiftp:"):
            # Best effort only: the result is deliberately not checked, the
            # copy is attempted regardless.
            self.CleanDestination(src, dest)

        status, output = commands.getstatusoutput(cmd)
        if ignore_errors:
            status = 0

        if status:
            self.logger.error("Failed to execute command '%s',%s" % (cmd, output))
            raise Exception("Failed to execute command '%s',%s" % (cmd, output))
        self.logger.info(output)

        # Transfer statistics are a nicety; failing to obtain them must not
        # fail a copy that already succeeded.
        try:
            if dest.startswith('gsiftp:') and src.startswith('file:'):
                stats['data-out'] += float(os.path.getsize(expandvars(src.replace('file:', ''))))
            elif src.startswith('gsiftp:') and dest.startswith('file:'):
                local = expandvars(dest.replace('file:', ''))
                # dest may name a directory or the target file itself
                if os.path.isdir(local):
                    local = os.path.join(local, basename(src))
                stats['data-in'] += float(os.path.getsize(local))
        except Exception:
            self.logger.debug("could not determine transfer size: %s" % sys.exc_info()[1])
        return 0
229 230
class URLGlobCopy(URLCopy):
    """
    Variant of URLCopy whose 'source' may be a shell-style wildcard.

    A source starting with 'file:' is expanded with glob and every matching
    file is copied to 'destination' in turn; any other source is copied as
    a single URL.
    """

    def __init__(self):
        """Reuse the URLCopy parameters and install a dedicated logger."""
        URLCopy.__init__(self)
        self.logger = logging.getLogger('iceprod::URLGlobCopy')

    def Execute(self, stats):
        """
        Copy every file matching 'source' to 'destination'.

        :param stats: dictionary of job statistics; the 'data-in' and
                      'data-out' entries are created when missing
        :return: 0; returned immediately when IPBaseClass.Execute yields a
                 false value
        :raises Exception: when a copy command exits with a non-zero status
                 and $system(ignore_gftp_errors) is not set
        """
        # Local import: the module header does not import stat.
        import stat

        if not IPBaseClass.Execute(self, stats):
            return 0

        if 'data-out' not in stats:
            stats['data-out'] = 0
        if 'data-in' not in stats:
            stats['data-in'] = 0

        src = self.GetParameter('source')
        dest = self.GetParameter('destination')
        proxy = self.GetParameter('proxyfile')
        opts = self.GetParameter('opts')
        ldpath = self.GetParameter('ldpath')
        path = self.GetParameter('path')
        inventory = self.GetParameter('inventory')
        emulate = self.GetParameter('emulate')
        exe = self.GetParameter('executable')

        # Only export a certificate directory that actually exists, as
        # URLCopy.Execute does.
        certdir = self.GetParameter('certdir')
        if certdir and os.path.exists(expandvars(certdir)):
            os.putenv('X509_CERT_DIR', expandvars(certdir))

        # The switch is optional: any failure to resolve it means "do not
        # ignore errors".
        try:
            ignore_errors = boolcast(self.parser.parse("$system(ignore_gftp_errors)"))
        except Exception:
            ignore_errors = False

        proxyfile = expandvars(proxy)
        if os.path.exists(proxyfile):
            os.putenv('X509_USER_PROXY', proxyfile)
            # owner read/write only (mode 0600)
            os.chmod(proxyfile, stat.S_IRUSR | stat.S_IWUSR)
        os.putenv('LD_LIBRARY_PATH', os.path.expandvars("%s:$LD_LIBRARY_PATH" % ldpath))
        os.putenv('PATH', os.path.expandvars("%s:$PATH" % path))

        oi = FileInventory()
        inventory = expandvars(inventory)
        if os.path.exists(inventory):
            oi.Read(inventory)

        # Each entry is (name recorded in the inventory, URL given to the
        # copy command).
        if src.startswith('file:'):
            transfers = []
            for match in glob.glob(expandvars(src.replace('file:', ''))):
                url = 'file://' + os.path.abspath(os.path.normpath(match))
                transfers.append((match, url))
            if not transfers:
                self.logger.warning("no files matched %s" % src)
        else:
            transfers = [(src, src)]

        for recorded, url in transfers:
            oi.AddFile(recorded, dest)
            cmd = " ".join([exe] + list(opts) + [url, dest])
            if emulate:
                continue

            self.logger.info(cmd)
            status, output = commands.getstatusoutput(cmd)
            self.logger.info(output)
            # Previously the switch was honoured for plain URLs only; it now
            # applies to globbed local files as well.
            if ignore_errors:
                status = 0

            if status:
                self.logger.error("Failed to execute command '%s',%s" % (cmd, output))
                raise Exception("Failed to execute command '%s',%s" % (cmd, output))

        if emulate:
            oi.Write(inventory)
        return 0
318 319
class URLMultiCopy(URLCopy):
    """
    Variant of URLCopy that copies several sources to one destination.

    The sources are taken from the 'sourcelist' parameter, which is
    iterated item by item (so it is expected to be a list of URLs), and
    each one is copied to 'destination' with its own copy command.
    """

    def __init__(self):
        """Add the 'sourcelist' parameter to the URLCopy parameters."""
        URLCopy.__init__(self)
        self.logger = logging.getLogger('iceprod::URLMultiCopy')
        self.AddParameter('sourcelist', 'list of source URLs (files) to copy', '')

    def Execute(self, stats):
        """
        Copy every URL in 'sourcelist' to 'destination'.

        :param stats: dictionary of job statistics; the 'data-in' and
                      'data-out' entries are created when missing and
                      increased by the size in bytes of each file moved
        :return: 0; returned immediately when IPBaseClass.Execute yields a
                 false value
        :raises Exception: when a copy command exits with a non-zero status
                 and $system(ignore_gftp_errors) is not set
        """
        # Local import: the module header does not import stat.
        import stat

        if not IPBaseClass.Execute(self, stats):
            return 0

        if 'data-out' not in stats:
            stats['data-out'] = 0
        if 'data-in' not in stats:
            stats['data-in'] = 0

        src = self.GetParameter('sourcelist')
        dest = self.GetParameter('destination')
        proxy = self.GetParameter('proxyfile')
        opts = self.GetParameter('opts')
        ldpath = self.GetParameter('ldpath')
        path = self.GetParameter('path')
        inventory = self.GetParameter('inventory')
        emulate = self.GetParameter('emulate')
        exe = self.GetParameter('executable')

        # Only export a certificate directory that actually exists, as
        # URLCopy.Execute does.
        certdir = self.GetParameter('certdir')
        if certdir and os.path.exists(expandvars(certdir)):
            os.putenv('X509_CERT_DIR', expandvars(certdir))

        # The switch is optional: any failure to resolve it means "do not
        # ignore errors".
        try:
            ignore_errors = boolcast(self.parser.parse("$system(ignore_gftp_errors)"))
        except Exception:
            ignore_errors = False

        proxyfile = expandvars(proxy)
        if os.path.exists(proxyfile):
            os.putenv('X509_USER_PROXY', proxyfile)
            # owner read/write only (mode 0600)
            os.chmod(proxyfile, stat.S_IRUSR | stat.S_IWUSR)
        os.putenv('LD_LIBRARY_PATH', os.path.expandvars("%s:$LD_LIBRARY_PATH" % ldpath))
        os.putenv('PATH', os.path.expandvars("%s:$PATH" % path))

        oi = FileInventory()
        inventory = expandvars(inventory)
        if os.path.exists(inventory):
            oi.Read(inventory)

        for item in src:
            oi.AddFile(item, dest)
            url = expandvars(item)
            cmd = " ".join([exe] + list(opts) + [url, dest])
            if emulate:
                continue

            self.logger.info(cmd)
            status, output = commands.getstatusoutput(cmd)
            self.logger.info(output)
            if ignore_errors:
                status = 0

            if status:
                self.logger.error("Failed to execute command '%s',%s" % (cmd, output))
                raise Exception("Failed to execute command '%s',%s" % (cmd, output))

            # The statistics are computed from the URL just copied.  The
            # previous code queried the whole source list here, which always
            # raised and was silently swallowed, so sizes were never
            # recorded.  Failing to obtain a size must still not fail a copy
            # that already succeeded.
            try:
                if dest.startswith('gsiftp:') and url.startswith('file:'):
                    stats['data-out'] += float(os.path.getsize(url.replace('file:', '')))
                elif url.startswith('gsiftp:') and dest.startswith('file:'):
                    local = expandvars(dest.replace('file:', ''))
                    # dest may name a directory or the target file itself
                    if os.path.isdir(local):
                        local = os.path.join(local, basename(url))
                    stats['data-in'] += float(os.path.getsize(local))
            except Exception:
                self.logger.debug("could not determine transfer size: %s" % sys.exc_info()[1])

        if emulate:
            oi.Write(inventory)
        return 0
399
class TrackURLCopy(URLCopy):
    """
    URLCopy that additionally reports the copied file to a monitoring
    server.

    After a successful copy the destination URL, md5 checksum, file size
    and transfer time are sent to the XML-RPC server at 'monitorURL' by
    calling its AddFileURL method.
    """

    def __init__(self):
        """Add the monitoring parameters to the URLCopy parameters."""
        URLCopy.__init__(self)
        self.logger = logging.getLogger('iceprod::TrackURLCopy')
        extra_parameters = (
            ('monitorURL',
             'soapmon url',
             'http://x2100.icecube.wisc.edu/cgi-bin/simulation/mon/soapmon-cgi'),
            ('dataset', 'dataset ID', 0),
            ('job', 'job ID', 0),
            ('key', 'Temporary password for soapmon', ''),
        )
        for name, description, default in extra_parameters:
            self.AddParameter(name, description, default)

    def Execute(self, stats):
        """
        Copy the file with URLCopy.Execute and register it with the monitor.

        :param stats: dictionary of job statistics; the 'data-in' and
                      'data-out' entries are created when missing
        :return: 0 when IPBaseClass.Execute yields a false value or when
                 the copy and the registration both succeed, 1 when
                 URLCopy.Execute returns a true value
        :raises Exception: when the server's AddFileURL call returns a
                 false value
        """
        if not IPBaseClass.Execute(self, stats):
            return 0

        for counter in ('data-out', 'data-in'):
            if counter not in stats:
                stats[counter] = 0

        monitor_url = self.GetParameter('monitorURL')
        src = self.GetParameter('source')
        dest = self.GetParameter('destination')
        datasetid = int(self.GetParameter('dataset'))
        jobid = int(self.GetParameter('job'))
        passcode = self.GetParameter('key')

        started = time.time()
        if URLCopy.Execute(self, stats):
            return 1
        transfertime = time.time() - started

        # Checksum and size can only be computed for a local source file.
        md5sum = ''
        filesize = 0.
        if src.startswith('file:'):
            local_path = expandvars(src.replace('file:', ''))
            md5sum = functions.md5sum(local_path)
            filesize = float(os.path.getsize(local_path))

        server = xmlrpclib.ServerProxy(monitor_url)
        # A destination ending in '/' gets the source file name appended.
        if dest.endswith('/'):
            dest = dest + basename(src)
        self.logger.info('%s %s' % (dest, md5sum))

        registered = server.AddFileURL(
            datasetid, jobid, dest, md5sum, filesize, transfertime, passcode)
        if not registered:
            raise Exception("Failed to set URL for for %s -> %s" % (src, dest))
        return 0
447
class AltSourceURLCopy(URLCopy):
    """
    URLCopy with up to three fallback sources.

    The sources 'source', 'source1', 'source2' and 'source3' are tried in
    that order until one copy succeeds.
    """

    def __init__(self):
        """Add the three backup source parameters to the URLCopy parameters."""
        URLCopy.__init__(self)
        self.logger = logging.getLogger('iceprod::AltSourceURLCopy')
        for backup in ('source1', 'source2', 'source3'):
            self.AddParameter(backup, 'backup source URL to copy', '')

    def Execute(self, stats):
        """
        Try each configured source in turn until one copy succeeds.

        Empty sources are skipped.  Every candidate is written to the
        'source' parameter before URLCopy.Execute is called; an exception
        raised by a candidate is logged and the next one is tried.

        :param stats: dictionary of job statistics, passed on to
                      URLCopy.Execute
        :return: 0 when IPBaseClass.Execute yields a false value or as soon
                 as one copy returns a false value
        :raises Exception: when no source could be copied
        """
        if not IPBaseClass.Execute(self, stats):
            return 0

        candidates = [
            self.GetParameter(name)
            for name in ('source', 'source1', 'source2', 'source3')
        ]
        src = candidates[0]
        dest = self.GetParameter('destination')

        for candidate in candidates:
            if candidate:
                self.SetParameter('source', candidate)
                try:
                    failed = URLCopy.Execute(self, stats)
                except Exception:
                    # sys.exc_info() instead of an 'except ..., e' clause
                    # keeps this parseable by old and new interpreters.
                    self.logger.error(sys.exc_info()[1])
                else:
                    if not failed:
                        return 0

        raise Exception("Failed to copy %s to %s" % (src, dest))
477