Package iceprod :: Package server :: Module server
[hide private]
[frames] | no frames]

Source Code for Module iceprod.server.server

   1  #! /usr/bin/env python 
   2  # 
   3  """ 
   4    XML-RPC Server module for running remote batch jobs on IceTray. 
   5   
   6    copyright (c) 2005 the icecube collaboration 
   7   
   8    @version: $Revision: $ 
   9    @date: $Date: $ 
  10    @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
  11    @todo: Add HTTPS support 
   12    @todo: Add XML validation in supporting classes 
  13   
  14    U{Userguide and other documentation <http://www.icecube.wisc.edu/~juancarlos/software/sim-prod>} 
  15   
  16  """ 
  17   
  18  import sys 
  19  import os 
  20  import string 
  21  import re 
  22  import time 
  23  import getopt 
  24  import cPickle 
  25  import logging 
  26  import signal 
  27  import exceptions 
  28  import pwd,grp  
  29  import thread,threading 
  30  from threading import Thread 
  31  from os.path import expandvars,join 
  32  from cPickle import dumps,loads 
  33  from MySQLdb import OperationalError 
  34  from Queue import Queue 
  35   
  36  try: 
  37     import ldap 
  38     ldap_installed = True 
  39  except ImportError: 
  40     ldap_installed = False 
  41   
  42  import iceprod 
  43  from iceprod.server import rpc 
  44  from iceprod.core.dataclasses import * 
  45  from iceprod.core.xmlwriter import IceTrayXMLWriter 
  46  from iceprod.server.db import ConfigDB,MonitorDB 
  47  from iceprod.core.configuration import Config 
  48  from iceprod.server.db import MySQLParamDb,SoapParamDB 
  49  from iceprod.server.queue import i3ProdQueue 
  50  from iceprod.server.job import i3Job 
  51   
  52  import iceprod.core.logger 
  53  logger = logging.getLogger('server') 
  54   
  55  localhost = os.uname()[1].split(".")[0] 
  56   
  57  soapdaemons = ['soaptray','soapmon','soapqueue','soapdh','soaphisto'] 
  58   
  59  ssl_supported = False 
  60   
  61  try: 
  62      from OpenSSL import SSL 
  63      ssl_supported = True 
  64  except Exception,e: 
  65      print >> sys.stderr,'exception: %s : communicaton will not be encrypted' % str(e) 
  66      pass 
  67   
  68   
def fail(msg, exception=None):
    """
    Build the standard 3-tuple error reply returned to XML-RPC clients.

    @param msg: human-readable error message (returned unchanged)
    @param exception: exception object to pickle into the reply; when it is
                      omitted (or None) a fresh, argument-less Exception()
                      is used
    @return: (msg, pickled None, pickled exception)
    """
    # Create the default per call rather than sharing one Exception
    # instance that was evaluated once at function-definition time.
    if exception is None:
        exception = Exception()
    return msg, dumps(None), dumps(exception)
class DummySemaphore:
    """
    Stand-in with the acquire()/release() interface of a semaphore that
    never blocks.

    It only keeps a counter (decremented by acquire, incremented by
    release) and, when `verbose` is truthy, logs the new count.
    """

    def __init__(self, value=1, verbose=None):
        # `value` seeds the counter; `verbose` turns on per-call logging.
        self.verbose = verbose
        self.counter = value

    def _step(self, delta):
        """Shift the counter by `delta`, optionally log it, and return it."""
        self.counter += delta
        if self.verbose:
            logger.info("count: %d" % self.counter)
        return self.counter

    def acquire(self, blocking=1):
        # `blocking` exists only for interface compatibility and is ignored.
        return self._step(-1)

    def release(self):
        return self._step(1)
class QSemaphore(DummySemaphore):
    """
    Counting semaphore backed by a Queue of capacity `value`.

    acquire() puts a token into the queue, blocking while it is full;
    release() takes one out, blocking while it is empty.  Both return None.
    """
    logger = logging.getLogger('QueueSemaphore')

    def __init__(self, value=1, verbose=None):
        # DummySemaphore.__init__ is deliberately not called, so instances
        # have no `counter`; the queue size plays that role instead.
        # (Queue treats a maxsize <= 0 as unbounded.)
        self.queue = Queue(value)
        self.verbose = verbose

    def _report(self):
        """Log the number of held tokens when verbose logging is enabled."""
        if self.verbose:
            self.logger.info("count: %d" % self.queue.qsize())

    def acquire(self, blocking=1):
        # `blocking` is accepted for interface compatibility but ignored:
        # Queue.put() always waits for a free slot.
        self.queue.put(1)
        self._report()

    def release(self):
        self.queue.get()
        self._report()
class SoapTray:
    """
    XML-RPC server for submitting and managing IceProd jobs.

    Remote clients call the methods exposed by register_functions().  The
    privileged methods authenticate the caller (see auth_db) before working
    on a ConfigDB or MonitorDB connection, which they close before returning.
    """

    def __init__(self, cfg):
        """
        Read settings from the configuration and build the XML-RPC server.

        An HTTPS server is built when pyOpenSSL could be imported and SSL
        is not disabled by server.USESSL / security.USESSL; otherwise a
        plain HTTP server is built.  The server is not started here.

        @param cfg: parsed IceProd configuration
        """
        self.cfg = cfg

        self.use_ldap = self.cfg.getboolean('ldap', 'enable') and ldap_installed
        self.logger = logging.getLogger('SoapTray')
        self.dbserver = self.cfg.get('database', 'server')
        self.dbport = self.cfg.getint('database', 'port')
        self.database = self.cfg.get('database', 'database')
        self.username = self.cfg.get('database', 'username')
        self.password = self.cfg.get('database', 'password')
        self.grid_name = self.cfg.get('queue', 'name')
        self.batchsys = self.cfg.get('queue', 'batchsys')
        self.institution = self.cfg.get('info', 'INSTITUTION')
        self.address = self.cfg.get('server', 'server')
        self.port = self.cfg.getint('server', 'port')
        self.rootdir = self.cfg.get('path', 'basedir')
        self.datawarehouse = expandvars(self.cfg.get('path', 'datawarehouse', raw=True))

        # SSL is on by default; either config section can switch it off.
        self.usesecure = True
        if self.cfg.has_option('server', 'USESSL'):
            self.usesecure = self.cfg.getboolean('server', 'USESSL')
        if self.cfg.has_option('security', 'USESSL'):
            self.usesecure = self.usesecure and self.cfg.getboolean('security', 'USESSL')

        if self.use_ldap:
            # Users who, once verified by LDAP, get the service DB account.
            self.ldap_users = [user.strip() for user in self.cfg.get('ldap', 'users').split(',')]

        self.iceprodcert = None
        self.iceprodkey = None
        if self.usesecure:
            self.iceprodcert = expandvars(self.cfg.get('security', 'SSLCERT'))
            self.iceprodkey = expandvars(self.cfg.get('security', 'SSLKEY'))

        self.submithost = os.getenv('HOSTNAME')
        if self.cfg.has_option('queue', 'SUBMITHOST'):
            self.submithost = self.cfg.get('queue', 'SUBMITHOST')

        if ssl_supported and self.usesecure:
            # HTTPS server
            sslctx = SSL.Context(SSL.SSLv23_METHOD)
            if not os.path.exists(self.iceprodcert):
                self.logger.fatal("Cannot find SSL certificate in %s" % self.iceprodcert)
            if not os.path.exists(self.iceprodkey):
                self.logger.fatal("Cannot find SSL key in %s" % self.iceprodkey)
            # Missing files are only logged above; the calls below will fail.
            sslctx.use_privatekey_file(self.iceprodkey)
            sslctx.use_certificate_file(self.iceprodcert)
            self.server = rpc.ThreadedSecureXMLRPCServer(
                (self.address, self.port),
                logRequests=False, ssl_context=sslctx)
            self.logger.info("soaptray server running **encrypted** on addr:%s:%d" % (self.address, self.port))
        else:
            # plain HTTP server
            self.server = rpc.ThreadedXMLRPCServer((self.address, self.port))
            self.logger.info("soaptray server running **un-encrypted** on addr:%s:%d" % (self.address, self.port))

    def _report_exception(self):
        """
        Pass the exception currently being handled to sys.excepthook.

        sys.exc_info() is used because the deprecated sys.exc_type /
        sys.exc_value / sys.exc_traceback globals are not thread-safe.
        """
        sys.excepthook(*sys.exc_info())

    def _jobstr(self, job):
        """Return ".<job>" for a job index, or "" when job < 0."""
        jobstr = ".%d" % job
        if job < 0:
            jobstr = ""
        return jobstr

    def _ldap_bind(self, username, password):
        """
        Verify username/password with a simple LDAP bind.

        @return: True only if the LDAP server accepted the bind
        """
        if not password:
            # A simple bind with an empty password is an "unauthenticated
            # bind" (RFC 4513, 5.1.2) that many servers report as success.
            self.logger.warn("ldap bind with empty password refused for user %s" % username)
            return False
        l = ldap.initialize(self.cfg.get('ldap', 'url'))
        try:
            # NOTE(security): username is substituted into the DN without
            # escaping; consider ldap.dn.escape_dn_chars().
            msgid = l.bind(self.cfg.get('ldap', 'cn', raw=True) % username, password, ldap.AUTH_SIMPLE)
            # bind() is asynchronous: the outcome (e.g. INVALID_CREDENTIALS)
            # is only raised by result().
            self.logger.info(str(l.result(msgid)))
        except Exception:
            self.logger.error(sys.exc_info()[1])
            return False
        return True

    def register_module(self):
        """
        Look up this grid's id in the monitoring database.

        @return: the grid id (also stored in self.grid_id)
        @raise Exception: if the service account cannot authenticate
        """
        i3db = MonitorDB()
        if not i3db.authenticate(self.dbserver, self.username, self.password,
                                 self.database, port=self.dbport, keep_open=True):
            raise Exception('unable to authenticate database user')
        try:
            self.grid_id = i3db.GetGridId(self.grid_name)
        finally:
            i3db.disconnect()
        return self.grid_id

    def serve_forever(self):
        """
        Run the request loop.

        When pyOpenSSL is available, an SSL.Error raised by a request (for
        example plain HTTP sent to the HTTPS port) is logged and serving
        restarts.  A keyboard interrupt exits the process with status 0 and
        a socket error exits with status 1.
        """
        # socket is not imported at module level; without this the
        # `except socket.error` clause would itself raise NameError.
        import socket
        if ssl_supported:
            retry_errors = (SSL.Error,)
        else:
            # An empty tuple matches nothing, so SSL errors are not caught.
            retry_errors = ()
        while True:
            try:
                self.server.serve_forever()
                return
            except KeyboardInterrupt:
                self.logger.info("Received keyboard interrupt")
                self.logger.info("Exiting")
                os._exit(0)
            except retry_errors:
                # Loop rather than recurse so repeated bad requests cannot
                # exhaust the recursion limit.
                self.logger.error("received: " + str(sys.exc_info()[1]))
            except socket.error:
                self.logger.error(str(sys.exc_info()[1]))
                os._exit(1)

    def auth_db(self, db_obj, username, password, keep_open=False):
        """
        Authenticate a caller and connect db_obj accordingly.

        If LDAP is enabled and the LDAP bind succeeds for a user listed in
        ldap.users, db_obj is authenticated with the service database
        account.  Otherwise db_obj is authenticated with the caller's own
        credentials.

        @param db_obj: database instance (ConfigDB/MonitorDB)
        @param username: caller's user name
        @param password: caller's password
        @param keep_open: passed through to db_obj.authenticate
        @return: result of db_obj.authenticate (True if authenticated)
        """
        self.logger.info("ldap is %s" % self.use_ldap)
        if self.cfg.getboolean('ldap', 'enable') and not ldap_installed:
            self.logger.warn("ldap is enabled but not supported.")

        if self.use_ldap and self._ldap_bind(username, password):
            if username in self.ldap_users:
                return db_obj.authenticate(self.dbserver, self.username, self.password,
                                           self.database, port=self.dbport, keep_open=keep_open)

        return db_obj.authenticate(self.dbserver, username, password,
                                   self.database, port=self.dbport, keep_open=keep_open)

    def authenticate(self, username, password):
        """
        Check credentials against the database, then LDAP (if enabled).
        This is an exposed method.

        @return: True if authenticated, False otherwise
        """
        if ConfigDB().authenticate2(self.dbserver, username, password, self.database, port=self.dbport):
            return True
        if self.use_ldap:
            # _ldap_bind waits for the bind result; merely checking that
            # bind() did not raise would accept any password.
            return self._ldap_bind(username, password)
        return False

    def enqueue(self, i3steering, username, password, submitter):
        """
        Queue a production dataset through i3ProdQueue.EnQueue.

        @return: the (status, i3q) pair from EnQueue, or a fail() tuple if
                 either database rejects the credentials
        """
        configdb = ConfigDB()
        configdb.SetSubmitter(submitter)
        configdb.SetInstitution(self.institution)
        datawarehouse = expandvars(self.cfg.get('path', 'datawarehouse', raw=True))
        if self.cfg.has_option('soapdh', 'tempdata'):
            configdb.SetTempStoragePath(expandvars(self.cfg.get('soapdh', 'tempdata', raw=True)))
        else:
            configdb.SetTempStoragePath(datawarehouse)
        configdb.SetGlobalStoragePath(datawarehouse)
        mondb = MonitorDB()

        if not self.auth_db(configdb, username, password, keep_open=True):
            self.logger.info("ConfigDB: Access denied for user %s" % username)
            configdb.disconnect()
            return fail("ConfigDB: Access denied for user %s" % username)
        if not self.auth_db(mondb, username, password, keep_open=True):
            self.logger.info("MonitorDB: Access denied for user %s" % username)
            mondb.disconnect()
            configdb.disconnect()
            return fail("MonitorDB: Access denied for user %s" % username)

        # don't keep connections hanging, even if queueing fails
        try:
            q = i3ProdQueue(self.cfg)
            q.SetConfigDB(configdb)
            q.SetMonitorDB(mondb)
            q.SetRootDir(self.rootdir)
            q.SetSubmitHost(self.submithost)
            q.SetSubmitter(submitter)
            q.SetInstitution(self.institution)
            status, i3q = q.EnQueue(i3steering)
        finally:
            configdb.disconnect()
            mondb.disconnect()
        return status, i3q

    def submit(self, sconfig, username, password, submitter,
               production=False, start=0, end=0, dataset=0):
        """
        Receive a remote request for a job submission.

        @param sconfig: pickled steering configuration
        @param username: needed for connecting to the configuration database
        @param password: needed for connecting to the configuration database
        @param submitter: username of person who submitted dataset
        @param production: boolean flag
        @param start: optional beginning of job sequence (non-prod)
        @param end: optional end of job sequence (non-prod)
        @param dataset: optional (non-prod)
        @return: (status, pickled queue object, pickled None) on success,
                 a fail() tuple otherwise
        """
        i3db = ConfigDB()
        if not self.auth_db(i3db, username, password):
            self.logger.info("Access denied for user %s" % username)
            return fail("Access denied for user %s" % username)

        self.logger.info("Handling submission from user %s" % username)

        try:
            try:
                # NOTE(security): unpickling client-supplied data can run
                # arbitrary code; only authenticated callers reach here.
                i3steering = cPickle.loads(sconfig)
                if production:
                    result = self.enqueue(i3steering, username, password, submitter)
                    if len(result) != 2:
                        # enqueue() already built a fail() reply
                        return result
                    status, i3q = result
                else:
                    q = i3ProdQueue(self.cfg)
                    q.SetRootDir(self.rootdir)
                    q.SetSubmitHost(self.submithost)
                    q.SetSubmitter(submitter)
                    status, i3q = q.Submit(i3steering, first=start, last=end, npid=dataset)
            except Exception:
                e = sys.exc_info()[1]
                self._report_exception()
                self.logger.error('Caught %s: %s' % (e.__class__, str(e)))
                return fail(str(e), e)
        finally:
            i3db.disconnect()
        return status, dumps(i3q), dumps(None)

    def checkjobs(self, i3q_pkl, username, password):
        """
        Receive a remote request for a job status check.

        @param i3q_pkl: a serialized iGrid object
        @return: result of the object's CheckQ()
        """
        i3db = ConfigDB()
        if not self.auth_db(i3db, username, password, keep_open=True):
            self.logger.error("Access denied for user %s" % username)
            return fail("Access denied for user %s" % username)
        try:
            # NOTE(security): unpickling client-supplied data.
            i3q = cPickle.loads(i3q_pkl)
            i3q.batchsys = self.batchsys
            return i3q.CheckQ(i3db)
        finally:
            i3db.disconnect()

    def queue_remove(self, i3q_pkl, username, password):
        """
        Receive a remote request to remove jobs (via the object's QRemove).

        @param i3q_pkl: a serialized iGrid object
        @return: result of QRemove()
        """
        i3db = MonitorDB()
        if not self.auth_db(i3db, username, password, keep_open=True):
            self.logger.info("Access denied for user %s" % username)
            return fail("Access denied for user %s" % username)
        try:
            # NOTE(security): unpickling client-supplied data.
            i3q = cPickle.loads(i3q_pkl)
            i3q.batchsys = self.batchsys
            return i3q.QRemove(i3db)
        finally:
            i3db.disconnect()

    def queue_suspend(self, username, password, dataset, job):
        """Suspend job `job` of `dataset` (job < 0: no job suffix in messages)."""
        jobstr = self._jobstr(job)
        i3db = MonitorDB()
        if not self.auth_db(i3db, username, password, keep_open=True):
            self.logger.warn("suspend %d%s denied for user %s" % (dataset, jobstr, username))
            return fail("suspend %d%s denied for user %s" % (dataset, jobstr, username))
        try:
            i3db.jobsuspend(job, dataset, True)
            self.logger.info("suspend %d%s granted for user %s" % (dataset, jobstr, username))
            i3db.add_history(username, "suspend %d%s" % (dataset, jobstr))
            self.logger.info("updated stats")
            i3db.disconnect()
            return "suspend %d%s granted for user %s" % (dataset, jobstr, username)
        except Exception:
            e = sys.exc_info()[1]
            self._report_exception()
            self.logger.error("suspend %d%s failed:%s" % (dataset, jobstr, str(e)))
            i3db.disconnect()
            return fail("suspend %d%s failed:%s" % (dataset, jobstr, str(e)))

    def queue_resume(self, username, password, dataset, job):
        """
        Resume job `job` of `dataset`.
        On error a plain string (not a fail() tuple) is returned, as before.
        """
        jobstr = self._jobstr(job)
        i3db = MonitorDB()
        if not self.auth_db(i3db, username, password, keep_open=True):
            self.logger.warn("resume %d%s denied for user %s" % (dataset, jobstr, username))
            i3db.disconnect()
            return fail("resume %d%s denied for user %s" % (dataset, jobstr, username))
        try:
            i3db.jobsuspend(job, dataset, False)
            self.logger.info("resume %d%s granted for user %s" % (dataset, jobstr, username))
            i3db.add_history(username, "resume %d%s" % (dataset, jobstr))
            self.logger.info("updated stats")
            i3db.disconnect()
            return "resume %d%s granted for user %s" % (dataset, jobstr, username)
        except Exception:
            e = sys.exc_info()[1]
            self._report_exception()
            self.logger.error("resume %d%s failed:%s" % (dataset, jobstr, e))
            i3db.disconnect()
            return "resume %d%s failed:%s" % (dataset, jobstr, e)

    def queue_reset(self, username, password, dataset, job):
        """
        Reset job `job` of `dataset`.
        On error a plain string (not a fail() tuple) is returned, as before.
        """
        jobstr = self._jobstr(job)
        i3db = MonitorDB()
        if not self.auth_db(i3db, username, password, keep_open=True):
            self.logger.warn("reset %d%s denied for user %s" % (dataset, jobstr, username))
            return fail("reset %d%s denied for user %s" % (dataset, jobstr, username))
        try:
            # NOTE(review): this is the same jobsuspend(job, dataset, False)
            # call queue_resume makes; confirm a dedicated reset was not intended.
            i3db.jobsuspend(job, dataset, False)
            i3db.add_history(username, "reset %d%s" % (dataset, jobstr))
            self.logger.info("reset %d%s granted for user %s" % (dataset, jobstr, username))
            i3db.disconnect()
            return "reset %d%s granted for user %s" % (dataset, jobstr, username)
        except Exception:
            e = sys.exc_info()[1]
            # excepthook prints the traceback itself and returns None, so
            # its result must not be passed to the logger.
            self._report_exception()
            self.logger.error("reset %d%s failed:%s" % (dataset, jobstr, e))
            i3db.disconnect()
            return "reset %d%s failed:%s" % (dataset, jobstr, e)

    def queue_clean(self, username, password, dataset):
        """Update monitoring for `dataset`, clean its jobs and log the action."""
        i3db = MonitorDB()
        if not self.auth_db(i3db, username, password, keep_open=True):
            self.logger.warn("clean %d denied for user %s" % (dataset, username))
            i3db.disconnect()
            return fail("clean %d denied for user %s" % (dataset, username))
        try:
            i3db.update_monitoring(self.grid_id, dataset_id=dataset)
            i3db.jobclean(dataset)
            i3db.add_history(username, "clean %d" % dataset)
            self.logger.info("clean %d granted for user %s" % (dataset, username))
            i3db.disconnect()
            return "clean %d granted for user %s" % (dataset, username)
        except Exception:
            e = sys.exc_info()[1]
            self.logger.error("clean %d failed:%s" % (dataset, str(e)))
            i3db.disconnect()
            return fail("clean %d failed:%s" % (dataset, str(e)))

    def queue_delete(self, username, password, dataset):
        """
        Clean `dataset` without archiving, mark it obsolete and invalid, and
        log the action as "nuke".
        """
        i3db = MonitorDB()
        if not self.auth_db(i3db, username, password, keep_open=True):
            self.logger.warn("delete %d denied for user %s" % (dataset, username))
            i3db.disconnect()
            return fail("delete %d denied for user %s" % (dataset, username))
        try:
            i3db.jobclean(dataset, archive=False)
            i3db.set_metadata_subcat(dataset, sub_cat="obsolete")
            i3db.setDatasetStatus(dataset, 'OBSOLETE')
            i3db.validate(dataset, 'FALSE')
            i3db.add_history(username, "nuke %d" % dataset)
            self.logger.info("delete %d granted for user %s" % (dataset, username))
            i3db.disconnect()
            return "delete %d granted for user %s" % (dataset, username)
        except Exception:
            e = sys.exc_info()[1]
            i3db.disconnect()
            self.logger.error("delete %d failed:%s" % (dataset, str(e)))
            return fail("delete %d failed:%s" % (dataset, str(e)))

    def queue_dataset_finish(self, username, password, dataset):
        """Update monitoring, clean jobs and mark `dataset` COMPLETE."""
        i3db = MonitorDB()
        if not self.auth_db(i3db, username, password, keep_open=True):
            i3db.disconnect()
            self.logger.warn("finish %d denied for user %s" % (dataset, username))
            return fail("finish %d denied for user %s" % (dataset, username))
        try:
            i3db.update_monitoring(self.grid_id, dataset_id=dataset)
            i3db.jobclean(dataset)
            i3db.setDatasetStatus(dataset, 'COMPLETE')
            i3db.add_history(username, "finish %d" % dataset)
            i3db.disconnect()
            self.logger.info("finish %d granted for user %s" % (dataset, username))
            return "finish %d granted for user %s" % (dataset, username)
        except Exception:
            e = sys.exc_info()[1]
            i3db.disconnect()
            self.logger.error("finish %d failed:%s" % (dataset, str(e)))
            return fail("finish %d failed:%s" % (dataset, str(e)))

    def queue_dataset_toggle_debug(self, username, password, dataset):
        """Toggle the debug flag of `dataset`."""
        i3db = MonitorDB()
        if not self.auth_db(i3db, username, password, keep_open=True):
            i3db.disconnect()
            self.logger.warn("toggle debug %d denied for user %s" % (dataset, username))
            return fail("toggle debug %d denied for user %s" % (dataset, username))
        try:
            i3db.ToggleDatasetDebug(dataset)
            i3db.add_history(username, "toggle debug %d" % dataset)
            i3db.disconnect()
            self.logger.info("toggle debug %d granted for user %s" % (dataset, username))
            return "toggle debug %d granted for user %s" % (dataset, username)
        except Exception:
            e = sys.exc_info()[1]
            i3db.disconnect()
            self.logger.error("toggle debug %d failed:%s" % (dataset, str(e)))
            return fail("toggle debug %d failed:%s" % (dataset, str(e)))

    def loaddict(self, odict_pkl, username, password, dataset_id=0):
        """
        Load a pickled dictionary into the configuration database.

        @return: None on success, a fail() tuple otherwise
        """
        i3db = ConfigDB()
        if not self.auth_db(i3db, username, password, keep_open=True):
            i3db.disconnect()
            self.logger.warn("loaddict %d denied for user %s" % (dataset_id, username))
            return fail("loaddict %d denied for user %s" % (dataset_id, username))
        try:
            # NOTE(security): unpickling client-supplied data.
            odict = cPickle.loads(odict_pkl)
            i3db.load_dictionary(odict, dataset_id)
            i3db.disconnect()
        except Exception:
            e = sys.exc_info()[1]
            i3db.disconnect()
            self.logger.warn("failed to load dictionary :\n%s" % str(e))
            return fail("failed to load dictionary :\n%s" % str(e))

    def queue_setstatus(self, username, password, dataset, status):
        """Set the status string of `dataset`."""
        i3db = MonitorDB()
        if not self.auth_db(i3db, username, password, keep_open=True):
            i3db.disconnect()
            self.logger.warn("setstatus %d denied for user %s" % (dataset, username))
            return fail("setstatus %d denied for user %s" % (dataset, username))
        try:
            i3db.setDatasetStatus(dataset, status)
            i3db.add_history(username, "set status %s %d" % (status, dataset))
            i3db.disconnect()
            self.logger.info("setstatus '%s' on dataset %d granted for user %s" % (status, dataset, username))
            return "setstatus '%s' on dataset %d granted for user %s" % (status, dataset, username)
        except Exception:
            e = sys.exc_info()[1]
            i3db.disconnect()
            self.logger.error("setstatus '%s' on dataset %d failed \n%s" % (status, dataset, str(e)))
            return fail("setstatus '%s' on dataset %d failed \n%s" % (status, dataset, str(e)))

    def queue_retire(self, username, password, dataset):
        """
        Mark dataset as obsolete (metadata sub-category and status).
        """
        i3db = ConfigDB()
        i3mon = MonitorDB()
        if not self.auth_db(i3db, username, password, keep_open=True):
            i3db.disconnect()
            self.logger.warn("retire %d denied for user %s" % (dataset, username))
            return fail("retire %d denied for user %s" % (dataset, username))
        if not self.auth_db(i3mon, username, password, keep_open=True):
            i3db.disconnect()
            i3mon.disconnect()
            self.logger.warn("retire %d denied for user %s" % (dataset, username))
            return fail("retire %d denied for user %s" % (dataset, username))
        try:
            i3db.set_metadata_subcat(dataset, sub_cat="obsolete")
            i3mon.setDatasetStatus(dataset, 'OBSOLETE')
            i3mon.add_history(username, "retire s %d" % dataset)
            i3db.disconnect()
            i3mon.disconnect()
            self.logger.info("retire dataset %d granted for user %s" % (dataset, username))
            return "retire dataset %d granted for user %s" % (dataset, username)
        except Exception:
            e = sys.exc_info()[1]
            i3db.disconnect()
            i3mon.disconnect()
            self.logger.error("retire dataset %d failed \n%s" % (dataset, str(e)))
            return fail("retire dataset %d failed \n%s" % (dataset, str(e)))

    def queue_validate(self, username, password, dataset, valid=True):
        """Mark `dataset` valid ('TRUE') or invalid ('FALSE')."""
        i3db = MonitorDB()
        if not self.auth_db(i3db, username, password, keep_open=True):
            i3db.disconnect()
            self.logger.warn("validate %d denied for user %s" % (dataset, username))
            return fail("validate %d denied for user %s" % (dataset, username))
        try:
            if valid:
                i3db.validate(dataset, 'TRUE')
            else:
                i3db.validate(dataset, 'FALSE')
            i3db.add_history(username, "validate %d" % dataset)
            i3db.disconnect()
            self.logger.info("change %d granted for user %s" % (dataset, username))
            return "change %d granted for user %s" % (dataset, username)
        except Exception:
            e = sys.exc_info()[1]
            i3db.disconnect()
            self.logger.error("validate %d failed:%s" % (dataset, str(e)))
            return fail("validate %d failed:\n%s" % (dataset, str(e)))

    def daemon_suspend(self, username, password, grid, daemon='all'):
        """Request suspension of one daemon (or 'all' in soapdaemons) on `grid`."""
        i3db = MonitorDB()
        if not self.auth_db(i3db, username, password, keep_open=True):
            i3db.disconnect()
            self.logger.warn("suspend %s.%s denied for user %s" % (grid, daemon, username))
            return fail("suspend %s.%s denied for user %s" % (grid, daemon, username))
        try:
            if daemon == 'all':
                for d in soapdaemons:
                    i3db.GridRequestSuspend(grid, d)
            elif daemon in soapdaemons:
                i3db.GridRequestSuspend(grid, daemon)
            else:
                i3db.disconnect()
                self.logger.error("%s: invalid daemon %s" % (grid, daemon))
                return fail("%s: invalid daemon %s" % (grid, daemon))

            i3db.add_history(username, "suspend daemon %s.%s" % (grid, daemon))
            i3db.disconnect()
            self.logger.info("suspend %s.%s granted for user %s" % (grid, daemon, username))
            return "suspend %s.%s granted for user %s" % (grid, daemon, username)
        except Exception:
            e = sys.exc_info()[1]
            i3db.disconnect()
            self.logger.error("suspend %s.%s failed:%s" % (grid, daemon, str(e)))
            return fail("suspend %s.%s failed:%s" % (grid, daemon, str(e)))

    def daemon_resume(self, username, password, grid, daemon='all'):
        """Request resumption of one daemon (or 'all' in soapdaemons) on `grid`."""
        i3db = MonitorDB()
        if not self.auth_db(i3db, username, password, keep_open=True):
            i3db.disconnect()
            self.logger.warn("resume %s.%s denied for user %s" % (grid, daemon, username))
            return fail("resume %s.%s denied for user %s" % (grid, daemon, username))
        try:
            if daemon == 'all':
                for d in soapdaemons:
                    i3db.GridRequestResume(grid, d)
            elif daemon in soapdaemons:
                i3db.GridRequestResume(grid, daemon)
            else:
                i3db.disconnect()
                self.logger.error("%s: invalid daemon %s" % (grid, daemon))
                return fail("%s: invalid daemon %s" % (grid, daemon))

            i3db.add_history(username, "resume daemon %s.%s" % (grid, daemon))
            i3db.disconnect()
            self.logger.info("resume %s.%s granted for user %s" % (grid, daemon, username))
            return "resume %s.%s granted for user %s" % (grid, daemon, username)
        except Exception:
            e = sys.exc_info()[1]
            i3db.disconnect()
            self.logger.error("resume %s.%s failed:%s" % (grid, daemon, str(e)))
            return fail("resume %s.%s failed:%s" % (grid, daemon, str(e)))

    def get_simcat_categories(self):
        """
        Get list of valid simcats (pickled), using the service account.
        """
        i3db = ConfigDB()
        if not i3db.authenticate(self.dbserver, self.username, self.password,
                                 self.database, port=self.dbport, keep_open=True):
            i3db.disconnect()
            self.logger.error("Access denied for user %s" % self.username)
            return fail("Access denied for user %s" % self.username)
        try:
            return dumps(i3db.get_simcat_categories())
        finally:
            i3db.disconnect()

    def printsummary(self, days):
        """
        Get usage statistics.

        @param days: number of days (from today) to get data from
        @return: formatted string with usage statistics
        """
        i3db = MonitorDB()
        if not i3db.authenticate(self.dbserver, self.username, self.password,
                                 self.database, port=self.dbport, keep_open=True):
            i3db.disconnect()
            self.logger.warn("printsummary denied for user %s" % self.username)
            return fail("printsummary denied for user %s" % self.username)
        try:
            retval = i3db.printsummary(days)
            self.logger.info("printsummary: serving request")
            i3db.disconnect()
            return retval
        except Exception:
            e = sys.exc_info()[1]
            i3db.disconnect()
            self.logger.error("print summary failed: %s" % e)
            return fail("print summary failed")

    def getstatus(self, dataset, job=-1):
        """
        Return the pickled result of MonitorDB.getstatus(dataset, job).

        @param dataset: dataset id
        @param job: job index; defaults to -1 (presumably "all jobs" --
                    confirm against MonitorDB.getstatus)
        """
        i3db = MonitorDB()
        if not i3db.authenticate(self.dbserver, self.username, self.password,
                                 self.database, port=self.dbport, keep_open=True):
            self.logger.error("Access denied for user %s" % self.username)
            return fail("Access denied for user %s" % self.username)
        try:
            return dumps(i3db.getstatus(dataset, job))
        finally:
            i3db.disconnect()

    def showrunlist(self, search_string=""):
        """
        Get list of datasets in database (pickled).

        @param search_string: a string containing key words to use
        """
        i3db = ConfigDB()
        if not i3db.authenticate(self.dbserver, self.username, self.password,
                                 self.database, port=self.dbport, keep_open=True):
            self.logger.error("Access denied for user %s" % self.username)
            return fail("Access denied for user %s" % self.username)
        try:
            return dumps(i3db.show_dataset_table(search_string))
        finally:
            i3db.disconnect()

    def download_config(self, dataset, defaults=False, descriptions=False):
        """
        Fetch configuration from database (pickled).

        @param dataset: dataset id for configuration
        """
        i3db = ConfigDB()
        if not i3db.authenticate(self.dbserver, self.username, self.password,
                                 self.database, port=self.dbport, keep_open=True):
            self.logger.error("Access denied for user %s" % self.username)
            return fail("Access denied for user %s" % self.username)
        try:
            return dumps(i3db.download_config(dataset, defaults, descriptions))
        finally:
            i3db.disconnect()

    def check_connection(self):
        """
        Return a pickled banner with the current time, the server version
        and the host name, so clients can verify connectivity.
        """
        msg = "%80s\n" % ('*' * 80)
        msg += '\n'
        msg += "%80s\n" % time.asctime()
        msg += '\n'
        msg += "%80s\n" % ("IceProd Server version %s" % iceprod.server.__version__)
        msg += '\n'
        msg += "%80s\n" % ("Running on host %s" % os.uname()[1])
        msg += '\n'
        msg += "%80s\n" % ('*' * 80)
        return dumps(msg)

    def echo(self, msg):
        """Return msg unchanged (round-trip test)."""
        return msg

    def register_functions(self):
        """
        Expose the public methods through the XML-RPC server.
        """
        for method in (self.echo,
                       self.submit,
                       self.authenticate,
                       self.checkjobs,
                       self.queue_remove,
                       self.queue_suspend,
                       self.queue_clean,
                       self.queue_delete,
                       self.queue_dataset_finish,
                       self.queue_dataset_toggle_debug,
                       self.queue_resume,
                       self.queue_validate,
                       self.queue_setstatus,
                       self.getstatus,
                       self.loaddict,
                       self.queue_retire,
                       self.daemon_suspend,
                       self.daemon_resume,
                       self.printsummary,
                       self.showrunlist,
                       self.download_config,
                       self.check_connection,
                       self.get_simcat_categories):
            self.server.register_function(method)

    def startsoap(self):
        """
        Register methods and grid id, expose the parameter database, and
        start serving.
        """
        self.logger.info('Registering module...')
        self.register_functions()
        self.register_module()

        # Now expose some functionality from parameter database
        paramdb = MySQLParamDb()
        paramdb.authenticate(self.dbserver, self.username, self.password,
                             self.database, port=self.dbport, keep_open=True)
        paramdb.connect()
        soapdb = SoapParamDB(paramdb)
        soapdb.Register(self)
        self.serve_forever()
class SoapTrayCGI(SoapTray):
    """
    CGI variant of SoapTray.

    Instead of running its own server it handles a single XML-RPC request
    through rpc.MyCGIXMLRPCRequestHandler, so it can be embedded in an
    existing HTTP server.  Only production submissions are supported.
    """

    def __init__(self, cfg):
        """
        Read settings from the configuration and create the CGI handler.
        Unlike SoapTray.__init__, no SSL options are read here.

        @param cfg: parsed IceProd configuration
        """
        self.cfg = cfg
        # As in SoapTray: LDAP is only usable if the ldap module imported;
        # otherwise auth_db would hit a NameError on `ldap`.
        self.use_ldap = self.cfg.getboolean('ldap', 'enable') and ldap_installed
        self.logger = logging.getLogger('SoapTray')
        self.semaphore = DummySemaphore(verbose=1)
        self.grid_id = 0
        self.submithost = os.getenv('HOSTNAME')

        self.dbserver = self.cfg.get('database', 'server')
        self.dbport = self.cfg.getint('database', 'port')
        self.database = self.cfg.get('database', 'database')
        self.username = self.cfg.get('database', 'username')
        self.password = self.cfg.get('database', 'password')
        self.grid_name = self.cfg.get('queue', 'name')
        self.batchsys = self.cfg.get('queue', 'batchsys')
        self.institution = self.cfg.get('info', 'INSTITUTION')
        self.address = self.cfg.get('server', 'server')
        self.port = self.cfg.getint('server', 'port')
        self.rootdir = self.cfg.get('path', 'basedir')
        self.datawarehouse = expandvars(self.cfg.get('path', 'datawarehouse', raw=True))

        # auth_db consults ldap_users after a successful LDAP bind, so it
        # must exist whenever LDAP is in use.
        self.ldap_users = []
        if self.use_ldap and self.cfg.has_option('ldap', 'users'):
            self.ldap_users = [user.strip() for user in self.cfg.get('ldap', 'users').split(',')]

        if self.cfg.has_option('queue', 'SUBMITHOST'):
            self.submithost = self.cfg.get('queue', 'SUBMITHOST')

        self.xmluri = 'http://www.icecube.wisc.edu/simulation/dtd/icetray.dtd'
        self.server = rpc.MyCGIXMLRPCRequestHandler()

        self.i3db = ConfigDB()
        self.i3mondb = MonitorDB()

    def startsoap(self):
        """
        Register methods and grid id, expose the parameter database, and
        handle one CGI request.
        """
        self.logger.info('Registering module...')
        self.register_functions()
        self.register_module()

        # Now expose some functionality from parameter database
        paramdb = MySQLParamDb()
        paramdb.authenticate(self.dbserver, self.username, self.password,
                             self.database, port=self.dbport, keep_open=True)
        paramdb.connect()
        soapdb = SoapParamDB(paramdb)
        soapdb.Register(self)
        self.server.handle_request()

    def submit(self, sconfig, username, password, submitter, production=False):
        """
        Receive a remote request for a (production) job submission.

        @param sconfig: pickled steering configuration
        @param username: needed for connecting to the configuration database
        @param password: needed for connecting to the configuration database
        @param submitter: username of person who submitted dataset
        @param production: must be true; non-production jobs are rejected
        @return: (status, pickled queue object, pickled None) on success,
                 a fail() tuple otherwise
        """
        i3db = ConfigDB()
        if not self.auth_db(i3db, username, password, keep_open=True):
            self.logger.info("Access denied for user %s" % username)
            i3db.disconnect()
            return fail("Access denied for user %s" % username)
        # The connection was only needed for the authorization check.
        i3db.disconnect()

        self.logger.info("Handling submission from user %s" % username)

        if not production:
            return fail("cgi-soaptray does not support non-production jobs.")

        # NOTE(security): unpickling client-supplied data can run arbitrary
        # code; only authenticated callers reach here.
        i3steering = cPickle.loads(sconfig)
        try:
            result = self.enqueue(i3steering, username, password, submitter)
        except Exception:
            e = sys.exc_info()[1]
            sys.excepthook(*sys.exc_info())
            self.logger.error('Caught %s: %s' % (e.__class__, str(e)))
            return fail(str(e), e)
        if len(result) != 2:
            # enqueue() already built a fail() reply (e.g. access denied)
            return result
        status, i3q = result
        return status, dumps(i3q), dumps(None)
class Monitor:
    """
    XMLRPC server class for monitoring jobs.

    Compute nodes connect to this server to fetch configuration and to make
    job/task status updates. Creates its own (optionally SSL-encrypted)
    threaded HTTP server.

    Every database-backed method follows the same pattern:
      1. Clone a database handle, take self.semaphore and connect.
      2. Do the work.
      3. Disconnect and release the semaphore.
    Step 3 runs in finally clauses, so an unexpected exception cannot leave
    the semaphore held or the connection open.
    """

    #------ Internal helpers ---------------------------------------------------

    def _acquire_db(self, template, autocommit=None):
        """
        Take the semaphore and return a connected handle cloned from template.

        If connecting (or set_auto) fails, the semaphore is released again
        before the exception propagates.

        @param template: database object whose new() method yields a handle
        @param autocommit: if not None, passed to set_auto() after connecting
        @return: connected handle; pair every successful call with _release_db
        """
        db = template.new()
        self.semaphore.acquire()
        try:
            db.connect()
            if autocommit is not None:
                db.set_auto(autocommit)
        except:
            # Deliberately bare: release on *any* failure, then re-raise.
            self.semaphore.release()
            raise
        return db

    def _release_db(self, db):
        """Disconnect db and release the semaphore, even if disconnect fails."""
        try:
            db.disconnect()
        finally:
            self.semaphore.release()

    def _report_exception(self):
        """
        Print the exception currently being handled via sys.excepthook and
        log its value.

        Uses sys.exc_info(), which is per-thread. The deprecated
        sys.exc_type/exc_value/exc_traceback globals can be clobbered by
        other threads of the threaded XML-RPC server.
        """
        exc_type, exc_value, exc_tb = sys.exc_info()
        sys.excepthook(exc_type, exc_value, exc_tb)
        logger.error(exc_value)

    #------ Configuration lookups ----------------------------------------------

    def getvalue(self, key):
        """
        Retrieve a dictionary entry from the configuration database.
        @param key: key string
        @return: value from fetch_dict_value, or a pickled None on error
        """
        i3db = self._acquire_db(self.i3db)
        try:
            logger.debug("fetching entry %s " % key)
            value = cPickle.dumps(None)
            try:
                value = i3db.fetch_dict_value(key)
                logger.debug("fetched entry %s,%s " % (key, value))
            except Exception:
                self._report_exception()
        finally:
            self._release_db(i3db)
        return value

    def getfile(self, key, dataset_id=0):
        """
        Retrieve a file entry from the configuration database.
        @param key: key string passed to fetch_filename
        @param dataset_id: dataset passed to fetch_filename
        @return: fetch_filename result as a tuple, or a pickled None on error
        """
        i3db = self._acquire_db(self.i3db)
        try:
            value = cPickle.dumps(None)
            try:
                value = tuple(i3db.fetch_filename(key, dataset_id))
            except Exception:
                self._report_exception()
        finally:
            self._release_db(i3db)
        return value

    def getconfig(self, dataset):
        """
        Download a dataset's configuration from the database.
        @param dataset: dataset id
        @return: pickled IceTray XML DOM, or a pickled None on error
        """
        i3db = self._acquire_db(self.i3db)
        try:
            value = cPickle.dumps(None)
            try:
                i3config = i3db.download_config(dataset)
                value = cPickle.dumps(IceTrayXMLWriter(i3config, self.xmluri).getDOM())
            except Exception:
                self._report_exception()
        finally:
            self._release_db(i3db)
        return value

    def getmetadata(self, dataset):
        """
        Download a dataset's metadata (DIF-plus) from the database.

        The steering-file entry is set to config.xml under the metadata's
        storage path.

        @param dataset: dataset id
        @return: pickled metadata XML DOM, or a pickled None on error
        """
        logger.info("fetching metadata for dataset %d" % dataset)
        i3db = self._acquire_db(self.i3db)
        try:
            value = cPickle.dumps(None)
            try:
                # NOTE(review): result unused. The call is kept so a dataset
                # whose configuration cannot be downloaded still yields None.
                i3db.download_config(dataset)
                difplus = i3db.download_metadata(dataset)
                difplus.GetPlus().SetSteeringFile(
                    os.path.join(difplus.GetStoragePath(), 'config.xml'))
                value = cPickle.dumps(MetadataWriter(difplus).getDOM())
            except Exception:
                self._report_exception()
        finally:
            self._release_db(i3db)
        return value

    #------ Single-job status updates ------------------------------------------

    def start(self, hostname, dataset=0, job_id=0, key='', grid=0):
        """
        Change the status of a job to indicate it is currently running.
        @param hostname: host the job runs on
        @param dataset: dataset id
        @param job_id: job id within the dataset
        @param key: passkey to prevent processes from overriding entries
        @param grid: grid id; 0 means use this server's grid
        @return: (dataset, jobs, queue_id) from jobstart, or (0, 0, 0) on error
        """
        if not key: key = None
        i3mondb = self._acquire_db(self.i3mondb, autocommit=True)
        try:
            logger.info("job %d.%d starting on %s" % (dataset, job_id, hostname))
            try:
                if not grid: grid = self.grid_id
                dataset, jobs, queue_id = i3mondb.jobstart(hostname, grid, dataset, job_id, key)
                i3mondb.commit()
            except Exception:
                self._report_exception()
                dataset, jobs, queue_id = (0, 0, 0)
        finally:
            self._release_db(i3mondb)
        logger.info("job %d.%d started on %s" % (dataset, job_id, hostname))
        return dataset, jobs, queue_id

    def ping(self, dataset_id, job_id, host, key='', tray=0, iter=0):
        """
        Let server know that job is still running.
        @param dataset_id: runconfig index
        @param job_id: process number within dataset
        @param host: hostname of computing node
        @param key: job passkey ('' means none)
        @param tray: tray currently being processed
        @param iter: iteration currently being processed
        @return: jobping result, or -1 on error
        """
        if not key: key = None
        logger.info("Received ping from %s with job %d.%d " % (host, dataset_id, job_id))
        logger.info("processing tray %d, iter %d " % (tray, iter))
        i3mondb = self._acquire_db(self.i3mondb, autocommit=True)
        try:
            try:
                retval = i3mondb.jobping(dataset_id, job_id, host, key, tray, iter)
                i3mondb.commit()
            except Exception:
                self._report_exception()
                retval = -1
        finally:
            self._release_db(i3mondb)
        return retval

    def finish(self, dataset_id, job_id, stats, key='', mode=0):
        """
        Update monitoring for job and write statistics.
        @param dataset_id: runconfig index
        @param job_id: process number within dataset
        @param stats: pickled dictionary of stat entries
        @param key: security passkey assigned to job
        @param mode: if true, finalize job and set status to COPIED instead of
            setting its status to READYTOCOPY
        @return: jobfinish result, or -1 on a database OperationalError
        """
        if not key: key = None
        logger.info('job %d.%d finished (mode %u)' % (dataset_id, job_id, mode))
        i3mondb = self._acquire_db(self.i3mondb, autocommit=True)
        try:
            try:
                logger.debug("stats: " + str(stats))
                # NOTE(review): stats is unpickled from client-supplied data;
                # cPickle.loads on untrusted input can execute arbitrary code.
                retval = i3mondb.jobfinish(dataset_id, job_id, cPickle.loads(stats), key, mode)
                if mode:
                    i3mondb.jobfinalize(dataset_id, job_id, key, status='COPIED')
                i3mondb.commit()
            except OperationalError:
                self._report_exception()
                logger.error("failed to set finish status for job %d.%d." % (dataset_id, job_id))
                retval = -1
        finally:
            self._release_db(i3mondb)
        logger.debug('finish - done')
        return retval

    def copying(self, dataset_id, job_id, key=''):
        """
        Set a job's status to COPYING while it transfers its output.
        @param dataset_id: runconfig index
        @param job_id: process number within dataset
        @param key: security passkey assigned to job
        @return: jobfinalize result, or -1 on a database OperationalError
        """
        if not key: key = None
        logger.info('job %d.%d is copying data' % (dataset_id, job_id))
        i3mondb = self._acquire_db(self.i3mondb, autocommit=True)
        try:
            try:
                logger.debug("setting job status to 'COPYING for %d.%d' " % (dataset_id, job_id))
                retval = i3mondb.jobfinalize(dataset_id, job_id, key, status='COPYING')
                i3mondb.commit()
            except OperationalError:
                self._report_exception()
                logger.error("failed to set copying status for job %d.%d." % (dataset_id, job_id))
                retval = -1
        finally:
            self._release_db(i3mondb)
        logger.debug('copying - done')
        return retval

    def abort(self, job_id, dataset_id, error, errormessage='', key='', stats=cPickle.dumps({})):
        """
        Record that a job was aborted, with its error and statistics.
        @param job_id: process number within dataset (note: first argument)
        @param dataset_id: runconfig index
        @param error: error code passed to jobabort
        @param errormessage: human-readable error description
        @param key: security passkey assigned to job
        @param stats: pickled dictionary of stat entries
        @return: 1 on success, 0 on a database OperationalError
        """
        if not key: key = None
        logger.warn('aborting job %d.%d - %s ' % (dataset_id, job_id, errormessage))
        i3mondb = self._acquire_db(self.i3mondb, autocommit=True)
        try:
            retval = 1
            try:
                # NOTE(review): stats is client-supplied pickle data.
                i3mondb.jobabort(job_id, dataset_id, error, errormessage, key, cPickle.loads(stats))
                i3mondb.commit()
            except OperationalError:
                self._report_exception()
                logger.error("failed to abort job %d.%d." % (dataset_id, job_id))
                retval = 0
        finally:
            self._release_db(i3mondb)
        return retval

    def AddFileURL(self, dataset_id, job_id, url, md5sum, filesize, transfertime, key=''):
        """
        Add or change the global location of a file.
        @param url: full URL; split into directory (location) and basename
        @return: 1 on success, 0 on any error
        """
        if not key: key = None
        filename = os.path.basename(url)
        location = os.path.dirname(url)
        logger.debug('%06u.%06u: %s' % (dataset_id, job_id, url))

        i3mondb = self._acquire_db(self.i3mondb)
        try:
            try:
                logger.info('SetFileURL: %06u.%06u:%s' % (dataset_id, job_id, url))
                i3mondb.SetFileURL(job_id, dataset_id, location, filename, md5sum,
                                   filesize, transfertime, key)
                i3mondb.commit()
                retval = 1
            except OperationalError:
                self._report_exception()
                logger.error("failed to set url %s for job %d.%d." % (url, dataset_id, job_id))
                retval = 0
            except Exception:
                # Previously left retval unbound (UnboundLocalError on return).
                self._report_exception()
                retval = 0
        finally:
            self._release_db(i3mondb)
        return retval

    #------ Multi-part job (DAG) functions -------------------------------------

    def multipart_job_start(self, dataset_id, queue_id, key=''):
        """
        Change the status of a multi-part job to indicate it is running.

        The transaction is committed only if the database did not report
        TASK_DATASET_ERROR_ID; otherwise it is rolled back.

        @param dataset_id: dataset id
        @param queue_id: job id within the dataset
        @param key: passkey to prevent processes from overriding entries
        @return: (dataset, jobs, queue_id), or (TASK_DATASET_ERROR_ID, 0, 0)
                 on a database OperationalError
        """
        if not key: key = None
        i3mondb = self._acquire_db(self.i3mondb, autocommit=False)
        try:
            logger.info("job %d.%d starting" % (dataset_id, queue_id))
            try:
                dataset, jobs, queue_id = i3mondb.multipart_job_start(dataset_id, queue_id, key)
                # Check the *returned* dataset; the input dataset_id is never
                # the error sentinel, so testing it always committed.
                if dataset != TASK_DATASET_ERROR_ID:
                    i3mondb.commit()
                else:
                    i3mondb.rollback()
            except OperationalError:
                self._report_exception()
                dataset, jobs, queue_id = (TASK_DATASET_ERROR_ID, 0, 0)
        finally:
            self._release_db(i3mondb)
        return dataset, jobs, queue_id

    def multipart_job_finish(self, dataset_id, queue_id, key=''):
        """
        Mark a multi-part job as finished successfully.

        The database result only selects commit vs. rollback.
        @return: always True
        """
        if not key: key = None
        i3mondb = self._acquire_db(self.i3mondb, autocommit=False)
        try:
            logger.info("job %d.%d finished successfully" % (dataset_id, queue_id))
            try:
                if i3mondb.multipart_job_finish(dataset_id, queue_id, key):
                    i3mondb.commit()
                else:
                    i3mondb.rollback()
            except OperationalError:
                self._report_exception()
        finally:
            self._release_db(i3mondb)
        return True

    def task_start(self, dataset_id, queue_id, taskname, tray, iter, hostname, key=''):
        """
        Record the start of a task of a multi-part job.
        @return: new task id, or TASK_ERROR_ID on failure (rolled back)
        """
        if not key: key = None
        i3mondb = self._acquire_db(self.i3mondb, autocommit=False)
        try:
            logger.info("job %d.%d starting task %s, tray %s, iter %s"
                        % (dataset_id, queue_id, taskname, tray, iter))
            try:
                task_id = i3mondb.task_start(dataset_id, queue_id, taskname, tray, iter, hostname, key)
                if task_id != TASK_ERROR_ID:
                    i3mondb.commit()
                else:
                    i3mondb.rollback()
            except OperationalError:
                self._report_exception()
                task_id = TASK_ERROR_ID
        finally:
            self._release_db(i3mondb)
        return task_id

    def task_copying_input(self, task_id, key=''):
        """Set a task's status to COPYINGINPUT."""
        logger.info("task %d copying input" % task_id)
        return self.task_update_status(task_id, 'COPYINGINPUT', key)

    def task_processing(self, task_id, key=''):
        """Set a task's status to PROCESSING."""
        logger.info("task %d processing" % task_id)
        return self.task_update_status(task_id, 'PROCESSING', key)

    def task_copying_output(self, task_id, key=''):
        """Set a task's status to COPYINGOUTPUT."""
        logger.info("task %d copying output" % task_id)
        return self.task_update_status(task_id, 'COPYINGOUTPUT', key)

    def task_update_status(self, task_id, status, key=''):
        """
        Set a task's status; commit on success, roll back otherwise.
        @return: database result, or False on a database OperationalError
        """
        if not key: key = None
        i3mondb = self._acquire_db(self.i3mondb, autocommit=False)
        try:
            try:
                ret = i3mondb.task_update_status(task_id, status, key)
                if ret:
                    i3mondb.commit()
                else:
                    i3mondb.rollback()
            except OperationalError:
                self._report_exception()
                ret = False
        finally:
            self._release_db(i3mondb)
        return ret

    def task_abort(self, task_id, key=''):
        """
        Mark a task as aborted; commit on success, roll back otherwise.
        @return: database result, or False on a database OperationalError
        """
        if not key: key = None
        i3mondb = self._acquire_db(self.i3mondb, autocommit=False)
        try:
            logger.warn("task %d aborted" % task_id)
            try:
                ret = i3mondb.task_abort(task_id, key)
                if ret:
                    i3mondb.commit()
                else:
                    i3mondb.rollback()
            except OperationalError:
                self._report_exception()
                ret = False
        finally:
            self._release_db(i3mondb)
        return ret

    def task_finish(self, task_id, stats, key=''):
        """
        Mark a task as finished and store its statistics.
        @param stats: pickled statistics (client-supplied; see NOTE)
        @return: database result, or False on a database OperationalError
        """
        # NOTE(review): cPickle.loads on client-supplied data can execute code.
        stats = cPickle.loads(stats)
        if not key: key = None
        i3mondb = self._acquire_db(self.i3mondb, autocommit=False)
        try:
            logger.info("task %d finished" % task_id)
            try:
                ret = i3mondb.task_finish(task_id, stats, key)
                if ret:
                    i3mondb.commit()
                else:
                    i3mondb.rollback()
            except OperationalError:
                self._report_exception()
                ret = False
        finally:
            self._release_db(i3mondb)
        return ret

    #------ End of multi-part job (DAG) functions ------------------------------

    def serve_forever(self, server):
        """
        Run server's request loop until it returns or a fatal error occurs.

        For some reason SOAPpy throws an exception if we get an HTTP instead
        of HTTPS request when using SSL. When SSL support is available, such
        SSL.Error exceptions are logged and the loop is restarted; before,
        a recursive call to an undefined global was used here. A keyboard
        interrupt exits the process with status 0; a socket error exits with
        status 1.
        """
        import socket
        if ssl_supported:
            recoverable = (SSL.Error,)
        else:
            recoverable = ()  # an empty tuple matches no exception
        while True:
            try:
                server.serve_forever()
                return
            except KeyboardInterrupt:
                logger.info("Received keyboard interrupt")
                logger.info("Exiting")
                os._exit(0)
            except recoverable:
                logger.error("received: " + str(sys.exc_info()[1]))
            except socket.error:
                self.logger.error(str(sys.exc_info()[1]))
                os._exit(1)

    def echo(self, msg):
        """Return msg unchanged (connectivity check)."""
        return msg

    def register_module(self):
        """
        Look up this server's grid id (by self.grid_name) in the database.
        @return: the grid id, also stored in self.grid_id
        @raise Exception: if the database rejects the credentials
        """
        i3db = MonitorDB()
        if not i3db.authenticate(self.host, self.user, self.passwd,
                                 self.database, port=self.dbport, keep_open=True):
            raise Exception('unable to authenticate database user')
        try:
            self.grid_id = i3db.GetGridId(self.grid_name)
        finally:
            i3db.disconnect()
        return self.grid_id

    def register_functions(self):
        """Expose the monitoring and multi-part job methods via XML-RPC."""
        exposed = (
            self.echo, self.start, self.finish, self.abort, self.ping,
            self.getconfig, self.getmetadata, self.getfile, self.getvalue,
            self.copying, self.AddFileURL,
            # multipart job-related functions
            self.multipart_job_start, self.multipart_job_finish,
            self.task_start, self.task_copying_input, self.task_processing,
            self.task_copying_output, self.task_finish, self.task_abort,
        )
        for func in exposed:
            self.server.register_function(func)

    def monitor(self):
        """Register functions, look up the grid id and serve requests."""
        self.register_functions()
        self.grid_id = self.register_module()
        try:
            self.serve_forever(self.server)
        except Exception:
            logger.warn(sys.exc_info()[1])

    def SetSemaphore(self, semaphore):
        """Replace the semaphore guarding database access."""
        self.semaphore = semaphore

    def __init__(self, cfg):
        """
        Configure the monitoring server from cfg and create its XML-RPC
        server and database handles.

        Encryption is used only when both ('monitoring','USESSL') and
        ('security','USESSL') allow it; each defaults to enabled when the
        option is absent. If SSL setup fails for any reason, a warning is
        logged and an un-encrypted server is created instead.
        """
        self.cfg = cfg
        self.use_ldap = self.cfg.getboolean('ldap', 'enable')
        self.logger = logging.getLogger('SoapMon')
        self.grid_name = self.cfg.get('queue', 'name')
        address = self.cfg.get('monitoring', 'server')
        port = self.cfg.getint('monitoring', 'port')
        self.semaphore = DummySemaphore()
        self.usesecure = True

        try:
            self.xmluri = self.cfg.get('path', 'uri')
        except Exception:
            self.xmluri = 'http://www.icecube.wisc.edu/simulation/dtd/icetray.dtd'

        if self.cfg.has_option('monitoring', 'USESSL'):
            self.usesecure = self.cfg.getboolean('monitoring', 'USESSL')
        if self.cfg.has_option('security', 'USESSL'):
            self.usesecure = self.usesecure and self.cfg.getboolean('security', 'USESSL')
        if self.usesecure:
            try:
                cert = expandvars(self.cfg.get('security', 'SSLCERT'))
                key = expandvars(self.cfg.get('security', 'SSLKEY'))
                if not os.path.exists(cert):
                    self.logger.fatal("Cannot find SSL certificate in %s" % cert)
                if not os.path.exists(key):
                    self.logger.fatal("Cannot find SSL key in %s" % key)

                sslctx = SSL.Context(SSL.SSLv23_METHOD)
                sslctx.use_privatekey_file(key)
                sslctx.use_certificate_file(cert)
                self.server = rpc.ThreadedSecureXMLRPCServer((address, port), ssl_context=sslctx)
                self.logger.info("Monitoring server running **encrypted** on addr:%s:%d" % (address, port))
            except Exception:
                # Also reached when OpenSSL is unavailable (SSL undefined).
                self.logger.warn(sys.exc_info()[1])
                self.server = rpc.ThreadedXMLRPCServer((address, port))
                self.logger.info("Monitoring server running **un-encrypted** on addr:%s:%d" % (address, port))
        else:
            self.server = rpc.ThreadedXMLRPCServer((address, port))
            self.logger.info("Monitoring server running **un-encrypted** on addr:%s:%d" % (address, port))

        self.host = self.cfg.get('database', 'server')
        self.user = self.cfg.get('database', 'username')
        self.passwd = self.cfg.get('database', 'password')
        self.database = self.cfg.get('database', 'database')
        self.dbport = self.cfg.getint('database', 'port')

        self.i3db = ConfigDB()
        self.i3db.authenticate(self.host, self.user, self.passwd,
                               self.database, port=self.dbport, keep_open=False)

        self.i3mondb = MonitorDB()
        self.i3mondb.authenticate(self.host, self.user, self.passwd,
                                  self.database, port=self.dbport, keep_open=False)
class MonitorCGI(Monitor):
    """
    XMLRPC server class for monitoring jobs, run as a CGI script.

    Compute nodes connect to it to make job status updates. Unlike its
    parent, it does not create its own HTTP server: it is embedded in an
    existing one, and each call to monitor() handles a single request.
    """

    def __init__(self, cfg):
        """
        Set up the CGI request handler and authenticated database handles
        from the 'database' section of cfg.
        """
        self.semaphore = DummySemaphore(verbose=1)
        self.cfg = cfg
        self.use_ldap = cfg.getboolean('ldap', 'enable')
        # NOTE(review): the grid id is hard-coded here rather than looked up
        # by grid name; confirm 79 is right for every CGI deployment.
        self.grid_id = 79

        self.xmluri = 'http://www.icecube.wisc.edu/simulation/dtd/icetray.dtd'
        self.server = rpc.MyCGIXMLRPCRequestHandler()

        section = 'database'
        self.host = cfg.get(section, 'server')
        self.user = cfg.get(section, 'username')
        self.passwd = cfg.get(section, 'password')
        self.database = cfg.get(section, 'database')
        self.dbport = cfg.getint(section, 'port')

        # Configuration DB first, then monitoring DB, both with the same
        # credentials and without keeping the connection open.
        for attr, factory in (('i3db', ConfigDB), ('i3mondb', MonitorDB)):
            handle = factory()
            setattr(self, attr, handle)
            handle.authenticate(self.host, self.user, self.passwd,
                                self.database, port=self.dbport, keep_open=False)

    def monitor(self):
        """Expose the XML-RPC functions and serve exactly one CGI request."""
        self.register_functions()
        self.server.handle_request()