Package iceprod :: Package modules :: Module gsiftp
[hide private]
[frames] | no frames]

Source Code for Module iceprod.modules.gsiftp

  1  #!/bin/env python 
  2  # 
  3  """ 
  4   Interface for configuring pre/post icetray scripts 
  5   
  6   copyright  (c) 2005 the icecube collaboration 
  7   
  8   @version: $Revision: $ 
  9   @date: $Date: $ 
 10   @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
 11  """ 
 12   
 13  import os,sys 
 14  import time 
 15  import string 
 16  import glob 
 17  import commands 
 18  from os.path import expandvars,basename 
 19  from ipmodule import IPBaseClass 
 20  from iceprod.core.inventory import FileInventory 
 21  from iceprod.core import functions 
 22  import xmlrpclib 
 23  import logging 
 24   
 25   
class URLCopy(IPBaseClass):
    """
    IceProd module that copies one source URL to one destination URL by
    shelling out to a gridFTP client (``globus-url-copy`` by default).

    URLs starting with ``lfn:`` are delegated to ``lfn.LFN_CR_Copy``.
    """

    def __init__(self):
        """
        Register the module parameters and their default values.
        """
        IPBaseClass.__init__(self)
        self.executable = 'globus-url-copy'

        # (name, description, default) -- registered in this exact order.
        parameter_table = (
            ('source', 'source URL to copy', ''),
            ('destination', 'detination URL to copy to', ''),
            ('certdir', 'certificate directory', ''),
            ('proxyfile', 'File path to globus proxy', '$X509_USER_PROXY'),
            ('ldpath', 'library path to globus', 'globus/lib'),
            ('path', 'path to globus bin directory', 'globus/bin'),
            ('opts', 'globus-url-copy options',
             ['-rst', '-cd', '-r', '-nodcau',
              '-rst-retries 5', '-rst-interval 60']),
            ('executable', 'name of gridFTP executable', 'globus-url-copy'),
            ('StorageElement', 'LFN SE', ''),
            ('lfn-opts', 'LFN Options', ''),
            ('inventory', 'File with source dest mappings',
             '$I3_TOPDIR/inventory.xml'),
            ('emulate',
             "Don't actually transfer files. Just write inventory", False),
        )
        for name, description, default in parameter_table:
            self.AddParameter(name, description, default)

        self.logger = logging.getLogger('iceprod::URLCopy')

    def Execute(self, stats):
        """
        Copy ``source`` to ``destination``.

        In emulate mode no transfer is attempted; the source/destination
        pair is added to the inventory file instead.

        @param stats: passed through to IPBaseClass.Execute (and to the
                      LFN module when the copy is delegated)
        @return: 0, or whatever the LFN module returns for lfn: URLs
        @raise Exception: if the copy command exits with non-zero status
        """
        if not IPBaseClass.Execute(self, stats):
            return 0

        fetch = self.GetParameter
        src = fetch('source')
        dest = fetch('destination')
        proxy = fetch('proxyfile')
        opts = fetch('opts')
        ldpath = fetch('ldpath')
        path = fetch('path')
        inventory = fetch('inventory')
        emulate = fetch('emulate')
        exe = fetch('executable')
        se = fetch('StorageElement')
        lfnopts = fetch('lfn-opts')

        # Logical file names are handled by a different module altogether.
        if src.startswith('lfn:') or dest.startswith('lfn:'):
            self.logger.info("detected LFN URL. Hading control to lfn module")
            from lfn import LFN_CR_Copy
            delegate = LFN_CR_Copy()
            delegate.SetParameter('source', src)
            delegate.SetParameter('destinationPATH', dest)
            if se:
                delegate.SetParameter('destination', se)
            if lfnopts:
                delegate.SetParameter('opts', lfnopts)
            return delegate.Execute(stats)

        # Only export the certificate directory if it really exists.
        certdir = fetch('certdir')
        if certdir:
            certdir_path = expandvars(certdir)
            if os.path.exists(certdir_path):
                os.putenv('X509_CERT_DIR', certdir_path)

        inventory = expandvars(inventory)
        oi = FileInventory()
        if os.path.exists(inventory):
            oi.Read(inventory)

        # Environment seen by the child process started below.
        proxy_path = expandvars(proxy)
        os.putenv('X509_USER_PROXY', proxy_path)
        # Mode 0600 (owner read/write only); int('600', 8) spells the octal
        # value in a way every Python version accepts.
        os.chmod(proxy_path, int('600', 8))
        os.putenv('LD_LIBRARY_PATH',
                  expandvars("%s:$LD_LIBRARY_PATH" % ldpath))
        os.putenv('PATH', expandvars("%s:$PATH" % path))

        command = " ".join([exe] + list(opts) + [src, dest])

        if emulate:
            oi.AddFile(src, dest)
            oi.Write(inventory)
            return 0

        status, output = commands.getstatusoutput(command)
        if status:
            failure = "Failed to execute command '%s',%s" % (command, output)
            self.logger.error(failure)
            raise Exception(failure)
        self.logger.info(output)
        return 0


class URLGlobCopy(URLCopy):
    """
    Variant of URLCopy whose ``source`` may be a local ``file:`` glob
    pattern; every matching file is copied to ``destination`` with its own
    invocation of the gridFTP executable.

    A ``source`` that does not start with ``file:`` is copied as a single
    URL without any pattern expansion.
    """

    def __init__(self):
        """
        Register the URLCopy parameters and use a class-specific logger.
        """
        URLCopy.__init__(self)
        self.logger = logging.getLogger('iceprod::URLGlobCopy')

    def _run_transfer(self, exe, opts, src_url, dest, emulate):
        """
        Run a single ``exe opts src_url dest`` transfer unless emulating.

        @raise Exception: if the command exits with non-zero status
        """
        if emulate:
            return
        cmd = " ".join([exe] + list(opts) + [src_url, dest])
        self.logger.info(cmd)
        status, output = commands.getstatusoutput(cmd)
        self.logger.info(output)
        if status:
            message = "Failed to execute command '%s',%s" % (cmd, output)
            self.logger.error(message)
            raise Exception(message)

    def Execute(self, stats):
        """
        Copy every file matching ``source`` to ``destination``.

        Each source/destination pair is added to the in-memory inventory;
        the inventory file is only written in emulate mode, in which case
        no transfer is attempted.

        @param stats: passed through to IPBaseClass.Execute
        @return: 0
        @raise Exception: if any copy command exits with non-zero status
        """
        if not IPBaseClass.Execute(self, stats):
            return 0
        src = self.GetParameter('source')
        dest = self.GetParameter('destination')
        proxy = self.GetParameter('proxyfile')
        opts = self.GetParameter('opts')
        ldpath = self.GetParameter('ldpath')
        path = self.GetParameter('path')
        inventory = self.GetParameter('inventory')
        emulate = self.GetParameter('emulate')
        exe = self.GetParameter('executable')

        certdir = self.GetParameter('certdir')
        if certdir:
            os.putenv('X509_CERT_DIR', expandvars(certdir))

        # Environment seen by the child processes started below.
        os.putenv('X509_USER_PROXY', expandvars(proxy))
        # Mode 0600 (owner read/write only); int('600', 8) spells the octal
        # value in a way every Python version accepts.
        os.chmod(expandvars(proxy), int('600', 8))
        os.putenv('LD_LIBRARY_PATH',
                  expandvars("%s:$LD_LIBRARY_PATH" % ldpath))
        os.putenv('PATH', expandvars("%s:$PATH" % path))

        oi = FileInventory()
        inventory = expandvars(inventory)
        if os.path.exists(inventory):
            oi.Read(inventory)

        if src.startswith('file:'):
            pattern = expandvars(src.replace('file:', ''))
            matches = glob.glob(pattern)
            if not matches:
                # Previously a pattern matching nothing returned success
                # without leaving any trace in the log.
                self.logger.warning(
                    "no local files match '%s'; nothing to copy" % pattern)
            for localfile in matches:
                oi.AddFile(localfile, dest)
                file_url = 'file://' + os.path.abspath(
                    os.path.normpath(localfile))
                self._run_transfer(exe, opts, file_url, dest, emulate)
        else:
            oi.AddFile(src, dest)
            self._run_transfer(exe, opts, src, dest, emulate)

        if emulate:
            oi.Write(inventory)
        return 0


class URLMultiCopy(URLCopy):
    """
    Variant of URLCopy that copies every URL in ``sourcelist`` to the same
    ``destination``, one invocation of the gridFTP executable per URL.
    """

    def __init__(self):
        """
        Register the URLCopy parameters plus ``sourcelist``.
        """
        URLCopy.__init__(self)
        self.logger = logging.getLogger('iceprod::URLMultiCopy')
        self.AddParameter('sourcelist',
                          'list of source URLs (files) to copy', '')

    def _run_transfer(self, exe, opts, src_url, dest, emulate):
        """
        Run a single ``exe opts src_url dest`` transfer unless emulating.

        @raise Exception: if the command exits with non-zero status
        """
        if emulate:
            return
        cmd = " ".join([exe] + list(opts) + [src_url, dest])
        self.logger.info(cmd)
        status, output = commands.getstatusoutput(cmd)
        self.logger.info(output)
        if status:
            message = "Failed to execute command '%s',%s" % (cmd, output)
            self.logger.error(message)
            raise Exception(message)

    def Execute(self, stats):
        """
        Copy each entry of ``sourcelist`` to ``destination``.

        ``sourcelist`` may be a sequence of URLs or a single string; a
        string is split on whitespace so that it is never iterated
        character by character.  Each source/destination pair is added to
        the in-memory inventory; the inventory file is only written in
        emulate mode, in which case no transfer is attempted.

        @param stats: passed through to IPBaseClass.Execute
        @return: 0
        @raise Exception: if any copy command exits with non-zero status
        """
        if not IPBaseClass.Execute(self, stats):
            return 0
        sources = self.GetParameter('sourcelist')
        dest = self.GetParameter('destination')
        proxy = self.GetParameter('proxyfile')
        opts = self.GetParameter('opts')
        ldpath = self.GetParameter('ldpath')
        path = self.GetParameter('path')
        inventory = self.GetParameter('inventory')
        emulate = self.GetParameter('emulate')
        exe = self.GetParameter('executable')

        # The default value is a string.  Iterating a string would launch
        # one copy per character, so turn it into a list of URLs first.
        # `basestring` is fine here: this module is Python 2 only (it
        # relies on the `commands` module).
        # NOTE(review): whitespace is assumed to be the separator when
        # several URLs are given in one string -- confirm against callers.
        if isinstance(sources, basestring):
            sources = sources.split()

        certdir = self.GetParameter('certdir')
        if certdir:
            os.putenv('X509_CERT_DIR', expandvars(certdir))

        # Environment seen by the child processes started below.
        os.putenv('X509_USER_PROXY', expandvars(proxy))
        # Mode 0600 (owner read/write only); int('600', 8) spells the octal
        # value in a way every Python version accepts.
        os.chmod(expandvars(proxy), int('600', 8))
        os.putenv('LD_LIBRARY_PATH',
                  expandvars("%s:$LD_LIBRARY_PATH" % ldpath))
        os.putenv('PATH', expandvars("%s:$PATH" % path))

        oi = FileInventory()
        inventory = expandvars(inventory)
        if os.path.exists(inventory):
            oi.Read(inventory)

        for source_url in sources:
            oi.AddFile(source_url, dest)
            self._run_transfer(exe, opts, expandvars(source_url), dest,
                               emulate)

        if emulate:
            oi.Write(inventory)
        return 0

class TrackURLCopy(URLCopy):
    """
    URLCopy that additionally reports the copied file to the monitoring
    database through the soapmon server (XML-RPC call ``AddFileURL``).
    """

    def __init__(self):
        """
        Register the URLCopy parameters plus the monitoring parameters.
        """
        URLCopy.__init__(self)
        self.logger = logging.getLogger('iceprod::TrackURLCopy')
        self.AddParameter(
            'monitorURL',
            'soapmon url',
            'http://x2100.icecube.wisc.edu/cgi-bin/simulation/mon/soapmon-cgi')
        self.AddParameter('dataset', 'dataset ID', 0)
        self.AddParameter('job', 'job ID', 0)
        self.AddParameter('key', 'Temporary password for soapmon', '')

    def Execute(self, stats):
        """
        Copy the file, then register its destination URL with soapmon.

        The md5 checksum and size are only computed for ``file:`` sources;
        otherwise an empty checksum and a size of 0 are reported.

        @param stats: passed through to URLCopy.Execute
        @return: 0 when the copy returned 0 and the URL was registered,
                 1 when URLCopy.Execute returned a non-zero value
        @raise Exception: if the server does not accept the URL
        """
        url = self.GetParameter('monitorURL')
        src = self.GetParameter('source')
        dest = self.GetParameter('destination')
        datasetid = int(self.GetParameter('dataset'))
        jobid = int(self.GetParameter('job'))
        passcode = self.GetParameter('key')

        starttime = time.time()
        if URLCopy.Execute(self, stats):
            # Non-zero status from the copy: nothing to register.
            return 1
        transfertime = time.time() - starttime

        md5sum = ''
        filesize = 0.
        if src.startswith('file:'):
            localpath = expandvars(src.replace('file:', ''))
            md5sum = functions.md5sum(localpath)
            filesize = float(os.path.getsize(localpath))

        server = xmlrpclib.ServerProxy(url)
        # A destination ending in '/' is a directory: append the file name.
        if dest.endswith('/'):
            dest += basename(src)
        self.logger.info('%s %s' % (dest, md5sum))

        accepted = server.AddFileURL(
            datasetid, jobid, dest, md5sum, filesize, transfertime, passcode)
        if not accepted:
            raise Exception(
                "Failed to set URL for for %s -> %s" % (src, dest))
        return 0

class AltSourceURLCopy(URLCopy):
    """
    URLCopy with up to three backup sources: ``source`` is tried first,
    then ``source1``, ``source2`` and ``source3`` until one copy succeeds.
    """

    def __init__(self):
        """
        Register the URLCopy parameters plus the three backup sources.
        """
        URLCopy.__init__(self)
        self.logger = logging.getLogger('iceprod::AltSourceURLCopy')
        for name in ('source1', 'source2', 'source3'):
            self.AddParameter(name, 'backup source URL to copy', '')

    def Execute(self, stats):
        """
        Try each configured source in turn until URLCopy.Execute returns 0.

        The ``source`` parameter is overwritten with the candidate being
        tried.  Exceptions raised by a candidate are logged and the next
        candidate is tried.

        @param stats: passed through to URLCopy.Execute
        @return: 0 as soon as one candidate is copied
        @raise Exception: if no candidate could be copied
        """
        primary = self.GetParameter('source')
        candidates = [primary]
        for name in ('source1', 'source2', 'source3'):
            candidates.append(self.GetParameter(name))
        dest = self.GetParameter('destination')

        for candidate in candidates:
            if not candidate:
                continue
            self.SetParameter('source', candidate)
            try:
                status = URLCopy.Execute(self, stats)
            except Exception:
                # sys.exc_info() keeps this valid on every Python version.
                self.logger.error(sys.exc_info()[1])
                continue
            if not status:
                return 0

        raise Exception("Failed to copy %s to %s" % (primary, dest))