Package iceprod :: Package modules :: Module mapred
[hide private]
[frames] | no frames]

Source Code for Module iceprod.modules.mapred

  1   
  2  #!/bin/env python 
  3  # 
  4  """ 
  5   Interface for configuring pre/post icetray scripts 
  6   
  7   copyright  (c) 2012 the icecube collaboration 
  8   
  9   @version: $Revision: $ 
 10   @date: $Date: $ 
 11   @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
 12  """ 
 13   
 14  import os 
 15  import re 
 16  import sys 
 17  import math 
 18  import dircache 
 19  import time 
 20  import string 
 21  import shutil 
 22  import cPickle 
 23  from iceprod.modules.ipmodule import IPBaseClass 
 24  import logging 
 25   
def sanitize(w):
    """
    Strip leading and trailing non-alphanumeric characters from a token.

    Characters inside the token are left untouched, e.g.
    C{sanitize("--don't!")} returns C{"don't"}.

    @param w: the token to clean
    @return: C{w} without leading/trailing non-alphanumeric characters
        (the empty string if C{w} contains no alphanumeric character)
    """
    # Locate the first and last alphanumeric positions with index scans and
    # slice once; re-slicing one character at a time copies the remainder of
    # the string on every step.
    start = 0
    end = len(w)
    # Strip punctuation from the front
    while start < end and not w[start].isalnum():
        start += 1
    # Strip punctuation from the back
    while end > start and not w[end - 1].isalnum():
        end -= 1
    return w[start:end]
37 38
class MapRed(IPBaseClass):
    """
    IceProd module that runs a small in-process map/reduce over a text file.

    With the default ProcessEvent() and Combine() implementations the
    pipeline counts how often each title-cased token (e.g. "Python")
    appears in the input.  Subclasses customise it by overriding
    ProcessEvent() (what to emit per token) and Combine() (how to merge
    the values emitted for one key).

    Parameters:
      - infile:  text file to read (C{input} is accepted as an alias)
      - outfile: file that receives one "key value" line per key
    """

    def __init__(self):
        IPBaseClass.__init__(self)
        self.AddParameter('infile', 'Name of the text file to map-reduce', '')
        # 'input' is the name this module originally registered even though
        # Execute() reads 'infile'; keep it as an alias so configurations
        # that set it continue to work.
        self.AddParameter('input', 'Deprecated alias for infile', '')
        self.AddParameter('outfile', 'Name of output file', '')
        # Use this module's own name for its logger (it was previously
        # 'iceprod::CompressFile').
        self.logger = logging.getLogger('iceprod::MapRed')

    def Execute(self, stats):
        """
        Map-reduce the tokens of C{infile} and write the result to C{outfile}.

        The input is split on whitespace.  Every token goes through Map(),
        Partition() groups the emitted pairs, and Reduce() collapses each
        group.  One "key value" line is written per key, in sorted key order.

        NOTE(review): the previous body ran C{os.system(cmd)} with C{cmd}
        never defined, so every call died with a NameError.  Driving the
        class's own Map/Partition/Reduce methods is the assumed intent --
        confirm with the module owner.

        @param stats: passed through unchanged to IPBaseClass.Execute
        @return: 0 (also when IPBaseClass.Execute declines to run)
        @raise Exception: if the input file cannot be read or the output
            file cannot be written
        """
        if not IPBaseClass.Execute(self, stats):
            return 0
        infile = self.GetParameter('infile') or self.GetParameter('input')
        outfile = self.GetParameter('outfile')
        # Plain try/finally and sys.exc_info() are used instead of ``with`` /
        # ``except ... as`` so that no syntax newer than the rest of this
        # Python 2 module is introduced.
        try:
            source = open(infile)
            try:
                tokens = source.read().split()
            finally:
                source.close()

            groups = self.Partition([self.Map(tokens)])
            reduced = [self.Reduce((key, groups[key])) for key in sorted(groups)]

            sink = open(outfile, 'w')
            try:
                for key, value in reduced:
                    sink.write("%s %s\n" % (key, value))
            finally:
                sink.close()
        except (IOError, OSError):
            err = sys.exc_info()[1]
            msg = "Map-reduce of '%s' into '%s' failed: %s" % (infile, outfile, err)
            self.logger.error(msg)
            raise Exception(msg)
        return 0

    def ProcessEvent(self, event):
        """
        Map a single token to a (key, value) pair, or None to drop it.

        Leading/trailing punctuation is stripped first.  The token is kept
        only if it is title-cased (e.g. "Python", not "python" or "PYTHON"),
        in which case C{(token, 1)} is returned.

        Override this to emit different keys or values.
        """
        if not event.isalnum():
            event = sanitize(event)
        # str.istitle() is False for the empty string, so tokens that were
        # pure punctuation are dropped here.
        if event.istitle():
            return (event, 1)
        return None

    def Map(self, input_list):
        """
        Apply ProcessEvent() to every item and collect the non-empty results.

        @param input_list: iterable of tokens
        @return: list of (key, value) tuples, in input order
        """
        emitted = (self.ProcessEvent(event) for event in input_list)
        # Rebuild a 2-tuple so overridden ProcessEvent() implementations may
        # return any indexable pair.
        return [(pair[0], pair[1]) for pair in emitted if pair]

    def Partition(self, L):
        """
        Group mapped pairs by key.

        @param L: iterable of Map() outputs (each a sequence of
            (key, value) tuples)
        @return: dict mapping each key to the list of its (key, value)
            tuples, in encounter order
        """
        tf = {}
        for sublist in L:
            for pair in sublist:
                tf.setdefault(pair[0], []).append(pair)
        return tf

    def Reduce(self, Mapping):
        """
        Collapse one partition into a single (key, combined value) pair.

        Should not need to be overloaded; customise Combine() instead.

        @param Mapping: C{(key, pairs)} where C{pairs} is the list stored
            under C{key} by Partition()
        @return: C{(key, Combine(values))}
        """
        return (Mapping[0], self.Combine(pair[1] for pair in Mapping[1]))

    def Combine(self, inputs):
        """
        Merge the values emitted for one key.

        Default behaviour is to sum. This should be overloaded for other
        purposes.

        @param inputs: iterable of values emitted for a single key
        @return: the sum of C{inputs}
        """
        return sum(inputs)
104