Package iceprod :: Package server :: Module server
[hide private]
[frames] | no frames]

Source Code for Module iceprod.server.server

   1  #! /usr/bin/env python 
   2  # 
   3  """ 
   4    XML-RPC Server module for running remote batch jobs on IceTray. 
   5   
   6    copyright (c) 2005 the icecube collaboration 
   7   
   8    @version: $Revision: $ 
   9    @date: $Date: $ 
  10    @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
  11    @todo: Add HTTPS support 
  12    @todo: Add XML validation in supporting classes 
  13   
  14    U{Userguide and other documentation <http://www.icecube.wisc.edu/~juancarlos/software/sim-prod>} 
  15   
  16  """ 
  17   
  18  import sys 
  19  import os 
  20  import string 
  21  import re 
  22  import time 
  23  import getopt 
  24  import cPickle 
  25  import logging 
  26  import signal 
  27  import exceptions 
  28  import pwd,grp  
  29  import thread,threading 
  30  from threading import Thread 
  31  from os.path import expandvars,join 
  32  from cPickle import dumps,loads 
  33  from MySQLdb import OperationalError 
  34  from Queue import Queue 
  35   
  36  try: 
  37     import ldap 
  38     ldap_installed = True 
  39  except ImportError: 
  40     ldap_installed = False 
  41   
  42  import iceprod 
  43  from iceprod.server import rpc 
  44  from iceprod.core.dataclasses import * 
  45  from iceprod.core.xmlwriter import IceTrayXMLWriter 
  46  from iceprod.server.db import ConfigDB,MonitorDB 
  47  from iceprod.core.configuration import Config 
  48  from iceprod.server.db import MySQLParamDb,SoapParamDB 
  49  from iceprod.server.queue import i3ProdQueue 
  50  from iceprod.server.job import i3Job 
  51   
  52  import iceprod.core.logger 
# Module-level logger named 'server'
logger = logging.getLogger('server')

# First dot-separated component of the node name reported by os.uname()
localhost = os.uname()[1].split(".")[0]

# Daemon names accepted by the daemon suspend/resume requests
soapdaemons = ['soaptray','soapmon','soapqueue','soapdh','soaphisto']

# Set to True below only when the OpenSSL bindings can be imported
ssl_supported = False

try:
    from OpenSSL import SSL
    ssl_supported = True
except Exception,e:
    # Best effort: report the problem on stderr and keep going without SSL
    print >> sys.stderr,'exception: %s : communicaton will not be encrypted' % str(e)
    pass
  67   
  68   
def fail(msg,exception=Exception()):
    """
    Build the three-element error reply used by the request handlers.

    @param msg: text describing what went wrong
    @param exception: exception object to send along (pickled)
    @return: tuple (msg, pickled None, pickled exception)
    """
    empty_payload = dumps(None)
    pickled_error = dumps(exception)
    return msg, empty_payload, pickled_error
71 72
class DummySemaphore:
    """
    Stand-in with the acquire/release interface of a semaphore.
    It only keeps a counter and never blocks.
    """

    def __init__(self, value=1, verbose=None):
        """
        @param value: initial counter value
        @param verbose: when true, every acquire/release logs the counter
        """
        self.verbose = verbose
        self.counter = value

    def acquire(self, blocking=1):
        """
        Decrement the counter (the blocking argument is not used).
        @return: counter value after the decrement
        """
        self.counter = self.counter - 1
        if self.verbose:
            logger.info("count: %d" % self.counter)
        return self.counter

    def release(self):
        """
        Increment the counter.
        @return: counter value after the increment
        """
        self.counter = self.counter + 1
        if self.verbose:
            logger.info("count: %d" % self.counter)
        return self.counter
91
class QSemaphore(DummySemaphore):
    """
    Semaphore look-alike that stores one token per holder in a bounded Queue.
    """
    logger = logging.getLogger('QueueSemaphore')

    def __init__(self, value=1, verbose=None):
        """
        @param value: maximum size of the underlying Queue
        @param verbose: when true, every acquire/release logs the queue size
        """
        self.queue = Queue(value)
        self.verbose = verbose

    def acquire(self, blocking=1):
        """
        Put one token on the queue (the blocking argument is not used).
        """
        self.queue.put(1)
        if not self.verbose:
            return
        self.logger.info("count: %d" % self.queue.qsize())

    def release(self):
        """
        Take one token off the queue.
        """
        self.queue.get()
        if not self.verbose:
            return
        self.logger.info("count: %d" % self.queue.qsize())
106 107
class SoapTray:

    """
    XMLRPC server class for submitting jobs to IceProd
    """

    def __init__(self,cfg):
        """
        Read the server settings from the configuration and create the
        XML-RPC server object (encrypted when SSL is available and enabled).

        @param cfg: configuration object providing get/getint/getboolean/has_option
        """
        self.cfg = cfg
        conf = self.cfg

        self.use_ldap = conf.getboolean('ldap','enable') and ldap_installed
        self.logger = logging.getLogger('SoapTray')

        # database and queue settings
        self.dbserver    = conf.get('database','server')
        self.dbport      = conf.getint('database','port')
        self.database    = conf.get('database','database')
        self.username    = conf.get('database','username')
        self.password    = conf.get('database','password')
        self.grid_name   = conf.get('queue','name')
        self.batchsys    = conf.get('queue','batchsys')
        self.institution = conf.get('info','INSTITUTION')

        # network and path settings
        self.address       = conf.get('server','server')
        self.port          = conf.getint('server','port')
        self.rootdir       = conf.get('path','basedir')
        self.datawarehouse = expandvars(conf.get('path','datawarehouse',raw=True))

        # SSL is on unless one of the two USESSL options switches it off
        self.usesecure = True
        if conf.has_option('server','USESSL'):
            self.usesecure = conf.getboolean('server','USESSL')
        if conf.has_option('security','USESSL'):
            self.usesecure = self.usesecure and conf.getboolean('security','USESSL')

        if self.use_ldap:
            raw_users = conf.get('ldap','users').split(',')
            self.ldap_users = [name.strip() for name in raw_users]

        self.iceprodcert = None
        self.iceprodkey = None
        if self.usesecure:
            self.iceprodcert = expandvars(conf.get('security','SSLCERT'))
            self.iceprodkey = expandvars(conf.get('security','SSLKEY'))

        if conf.has_option('queue','SUBMITHOST'):
            self.submithost = conf.get('queue','SUBMITHOST')
        else:
            self.submithost = os.getenv('HOSTNAME')

        endpoint = (self.address, self.port)

        # Plain http server when SSL is unavailable or disabled
        if not (ssl_supported and self.usesecure):
            self.server = rpc.ThreadedXMLRPCServer(endpoint)
            self.logger.info("soaptray server running **un-encrypted** on addr:%s:%d" % (self.address,self.port))
            return

        # Otherwise set up an https server (SSL)
        sslctx = SSL.Context(SSL.SSLv23_METHOD)
        if not os.path.exists(self.iceprodcert):
            self.logger.fatal("Cannot find SSL certificate in %s" % self.iceprodcert)
        if not os.path.exists(self.iceprodkey):
            self.logger.fatal("Cannot find SSL key in %s" % self.iceprodkey)

        sslctx.use_privatekey_file(self.iceprodkey)
        sslctx.use_certificate_file(self.iceprodcert)
        self.server = rpc.ThreadedSecureXMLRPCServer(
                         endpoint,
                         logRequests=False, ssl_context = sslctx)
        self.logger.info("soaptray server running **encrypted** on addr:%s:%d" % (self.address,self.port))
170 171
172 - def register_module(self):
173 """ 174 Register daemon with database and update status 175 """ 176 i3db = MonitorDB() 177 if not i3db.authenticate(self.dbserver,self.username, self.password, 178 self.database, port=self.dbport,keep_open=True): 179 raise Exception, 'unable to authenticate database user' 180 self.grid_id = i3db.GetGridId(self.grid_name) 181 i3db.disconnect() 182 return self.grid_id
183
184 - def serve_forever(self):
185 """ 186 For some reason SOAPpy throws an exception if we get an 187 HTTP instead of HTTPS request when using SSL 188 """ 189 if ssl_supported: 190 try: 191 self.server.serve_forever() 192 except KeyboardInterrupt: 193 self.logger.info("Received keyboard interrupt") 194 self.logger.info("Exiting") 195 os._exit(0) 196 197 except SSL.Error,e: 198 self.logger.error("received: " + str(e)) 199 self.serve_forever() 200 except socket.error,e: 201 self.logger.error(str(e)) 202 os._exit(1) 203 else: # don't catch the SSL exception 204 try: 205 self.server.serve_forever() 206 except KeyboardInterrupt: 207 self.logger.info("Received keyboard interrupt") 208 self.logger.info("Exiting") 209 os._exit(0) 210 except socket.error,e: 211 self.logger.error(str(e)) 212 os._exit(1)
213
214 - def auth_db(self,db_obj,username,password,keep_open=False):
215 """ 216 Authenticate remotely against database 217 This is the exposed method 218 @param db_obj: database instance 219 @param username: 220 @param password: 221 @param : keep_open 222 @return: True if authenticated False otherwise 223 """ 224 if not password: 225 return False 226 227 self.logger.info("ldap is %s" % self.use_ldap) 228 if self.cfg.getboolean('ldap','enable') and not ldap_installed: 229 self.logger.warn("ldap is enabled but not supported.") 230 231 if self.use_ldap: 232 l=ldap.initialize(self.cfg.get('ldap','url')) 233 try: 234 result = l.bind(self.cfg.get('ldap','cn',raw=True)%username, password,ldap.AUTH_SIMPLE) 235 self.logger.info(str(l.result(result))) 236 except ldap.INVALID_CREDENTIALS, error: 237 logger.error(error) 238 except ldap.LDAPError, error: 239 self.logger.error(error) 240 except Exception, error: 241 self.logger.error(error) 242 else: 243 if username in self.ldap_users: 244 logger.info("found %s in ldap users" % username) 245 return db_obj.authenticate(self.dbserver,self.username,self.password,self.database,port=self.dbport,keep_open=keep_open) 246 247 logger.info("Assuming %s is a database account " % username) 248 return db_obj.authenticate(self.dbserver,username,password,self.database,port=self.dbport,keep_open=keep_open)
249 250 251
252 - def authenticate(self,username,password):
253 """ 254 Authenticate remotely against database 255 This is the exposed method 256 @param username: 257 @param password: 258 @return: True if authenticated False otherwise 259 """ 260 if ConfigDB().authenticate2(self.dbserver,username,password,self.database,port=self.dbport): 261 return True 262 263 if self.use_ldap: 264 l=ldap.initialize(self.cfg.get('ldap','url')) 265 try: 266 result = l.bind(self.cfg.get('ldap','cn',raw=True)%username, password,ldap.AUTH_SIMPLE) 267 except ldap.INVALID_CREDENTIALS, error: 268 logger.error(error) 269 return False 270 else: 271 return True 272 else: 273 return False
274 275
def enqueue(self,i3steering,username,password,submitter):
    """
    Queue a production dataset described by a steering object.

    @param i3steering: steering configuration to enqueue
    @param username: account used to open both database connections
    @param password: password for username
    @param submitter: name recorded as the submitter
    @return: (status, queue object) from i3ProdQueue.EnQueue, or a
             fail() tuple when database access is denied
    """
    configdb = ConfigDB()
    configdb.SetSubmitter(submitter)
    configdb.SetInstitution(self.institution)
    if self.cfg.has_option('soapdh','tempdata'):
        configdb.SetTempStoragePath(expandvars(self.cfg.get('soapdh','tempdata',raw=True)))
    else:
        configdb.SetTempStoragePath(expandvars(self.cfg.get('path','datawarehouse',raw=True)))
    configdb.SetGlobalStoragePath(expandvars(self.cfg.get('path','datawarehouse',raw=True)))
    mondb = MonitorDB()

    if not self.auth_db(configdb,username, password,keep_open=True):
        self.logger.info("ConfigDB: Access denied for user %s" % username)
        configdb.disconnect()
        return fail("ConfigDB: Access denied for user %s" % username)
    if not self.auth_db(mondb,username, password,keep_open=True):
        self.logger.info("MonitorDB: Access denied for user %s" % username)
        mondb.disconnect()
        # configdb was opened successfully above, close it too
        configdb.disconnect()
        return fail("MonitorDB: Access denied for user %s" % username)

    try:
        q = i3ProdQueue(self.cfg)
        q.SetConfigDB(configdb)
        q.SetMonitorDB(mondb)
        q.SetRootDir(self.rootdir)
        q.SetSubmitHost(self.submithost)
        q.SetSubmitter(submitter)
        q.SetInstitution(self.institution)
        status,i3q = q.EnQueue(i3steering)
    finally:
        # don't keep connections hanging, even when queueing raises
        configdb.disconnect()
        mondb.disconnect()
    del q
    return status,i3q
310 311
312 - def submit(self, 313 sconfig, 314 username, 315 password, 316 submitter, 317 production=False, 318 start=0, 319 end=0, 320 dataset=0):
321 """ 322 Receive a remote request for a job submission 323 @param sconfig: pickled steering configuration 324 @param username: (needed for connecting to the configuration database 325 @param password: (needed for connecting to the configuration database 326 @param submitter: username of person who submitted dataset 327 @param production: boolean flag 328 @param start: optional begining of job sequence (non-prod) 329 @param end: optional end of job sequence (non-prod) 330 @param dataset: optional (non-prod) 331 @return: Output generated by queue 332 """ 333 334 i3db = ConfigDB() 335 if not self.auth_db(i3db,username, password): 336 self.logger.info("Access denied for user %s" % username) 337 return fail("Access denied for user %s" % username) 338 339 self.logger.info("Handling submission from user %s" % username) 340 341 i3steering = cPickle.loads(sconfig) 342 i3q = None 343 if production: 344 try: 345 status,i3q = self.enqueue(i3steering,username,password,submitter) 346 except Exception, e: 347 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback) 348 self.logger.error('Caught %s: %s' % (sys.exc_type, str(e))) 349 i3db.disconnect() 350 return fail(str(e),e) 351 else: 352 q = i3ProdQueue(self.cfg) 353 q.SetRootDir(self.rootdir) 354 q.SetSubmitHost(self.submithost) 355 q.SetSubmitter(submitter) 356 357 try: 358 status,i3q = q.Submit(i3steering,first=start,last=end,npid=dataset) 359 except Exception, e: 360 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback) 361 self.logger.error('Caught %s: %s' % (sys.exc_type, str(e))) 362 i3db.disconnect() 363 del q 364 return fail(str(e),e) 365 del q 366 i3db.disconnect() 367 del i3steering 368 return status,dumps(i3q),dumps(None)
369 370
def checkjobs(self,i3q_pkl,username,password):
    """
    Receive a remote request for a job status check
    @param i3q_pkl: a serialized iGrid object
    @param username: account used to open the database connection
    @param password: password for username
    @return: result of the queue object's CheckQ call, or a fail() tuple
             when access is denied
    """

    i3db = ConfigDB()
    if not self.auth_db(i3db,username, password,keep_open=True):
        self.logger.error("Access denied for user %s" % username)
        i3db.disconnect()
        return fail("Access denied for user %s" % username)

    try:
        # NOTE(review): unpickling data sent by a client can run arbitrary
        # code; this is only reached after the user has authenticated
        i3q = cPickle.loads(i3q_pkl)
        i3q.batchsys = self.batchsys
        return i3q.CheckQ(i3db)
    finally:
        # close the connection on success and on error alike
        i3db.disconnect()
387
def queue_remove(self,i3q_pkl,username,password):
    """
    Receive a remote request to remove jobs from the queue
    @param i3q_pkl: a serialized iGrid object
    @param username: account used to open the database connection
    @param password: password for username
    @return: result of the queue object's QRemove call, or a fail() tuple
             when access is denied
    """

    i3db = MonitorDB()
    if not self.auth_db(i3db,username, password,keep_open=True):
        self.logger.info("Access denied for user %s" % username)
        i3db.disconnect()
        return fail("Access denied for user %s" % username)

    try:
        # NOTE(review): unpickling data sent by a client can run arbitrary
        # code; this is only reached after the user has authenticated
        i3q = cPickle.loads(i3q_pkl)
        i3q.batchsys = self.batchsys
        return i3q.QRemove(i3db)
    finally:
        # close the connection on success and on error alike
        i3db.disconnect()
404
405 - def queue_suspend(self,username,password,dataset,job):
406 jobstr = ".%d" % job 407 if job < 0: jobstr = "" 408 i3db = MonitorDB() 409 if not self.auth_db(i3db,username, password,keep_open=True): 410 self.logger.warn("suspend %d%s denied for user %s" % (dataset,jobstr,username)) 411 return fail("suspend %d%s denied for user %s" % (dataset,jobstr,username)) 412 try: 413 i3db.jobsuspend(job,dataset,True) 414 self.logger.info("suspend %d%s granted for user %s" % (dataset,jobstr,username)) 415 i3db.add_history(username,"suspend %d%s" % (dataset,jobstr)) 416 self.logger.info("updated stats") 417 i3db.disconnect() 418 return "suspend %d%s granted for user %s" % (dataset,jobstr,username) 419 except Exception,e: 420 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback) 421 self.logger.error("suspend %d%s failed:%s" % (dataset,jobstr,str(e))) 422 i3db.disconnect() 423 return fail("suspend %d%s failed:%s" % (dataset,jobstr,str(e)))
424
425 - def queue_resume(self,username,password,dataset,job):
426 jobstr = ".%d" % job 427 if job < 0: jobstr = "" 428 i3db = MonitorDB() 429 if not self.auth_db(i3db,username, password,keep_open=True): 430 self.logger.warn("resume %d%s denied for user %s" % (dataset,jobstr,username)) 431 i3db.disconnect() 432 return fail("resume %d%s denied for user %s" % (dataset,jobstr,username)) 433 try: 434 i3db.jobsuspend(job,dataset,False) 435 self.logger.info("resume %d%s granted for user %s" % (dataset,jobstr,username)) 436 i3db.add_history(username,"resume %d%s" % (dataset,jobstr)) 437 self.logger.info("updated stats") 438 i3db.disconnect() 439 return "resume %d%s granted for user %s" % (dataset,jobstr,username) 440 except Exception, e: 441 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback) 442 self.logger.error("resume %d%s failed:%s" % (dataset,jobstr,e)) 443 i3db.disconnect() 444 return "resume %d%s failed:%s" % (dataset,jobstr,e)
445
446 - def queue_reset(self,username,password,dataset,job):
447 jobstr = ".%d" % job 448 if job < 0: jobstr = "" 449 i3db = MonitorDB() 450 if not self.auth_db(i3db,username, password,keep_open=True): 451 self.logger.warn("reset %d%s denied for user %s" % (dataset,jobstr,username)) 452 return fail("reset %d%s denied for user %s" % (dataset,jobstr,username)) 453 try: 454 i3db.jobsuspend(job,dataset,False) 455 i3db.add_history(username,"reset %d%s" % (dataset,jobstr)) 456 self.logger.info("reset %d%s granted for user %s" % (dataset,jobstr,username)) 457 i3db.disconnect() 458 return "reset %d%s granted for user %s" % (dataset,jobstr,username) 459 except Exception, e: 460 self.logger.error(sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback) ) 461 self.logger.error("reset %d%s failed:%s" % (dataset,jobstr,e)) 462 i3db.disconnect() 463 return "reset %d%s failed:%s" % (dataset,jobstr,e)
464 465
466 - def queue_clean(self,username,password,dataset):
467 i3db = MonitorDB() 468 if not self.auth_db(i3db,username, password,keep_open=True): 469 self.logger.warn("clean %d denied for user %s" % (dataset,username)) 470 i3db.disconnect() 471 return fail("clean %d denied for user %s" % (dataset,username)) 472 try: 473 i3db.update_monitoring(self.grid_id,dataset_id=dataset) 474 i3db.jobclean(dataset) 475 i3db.add_history(username,"clean %d" % dataset) 476 self.logger.info("clean %d granted for user %s" % (dataset,username)) 477 i3db.disconnect() 478 return "clean %d granted for user %s" % (dataset,username) 479 except Exception,e: 480 self.logger.error("clean %d failed:%s" % (dataset,str(e))) 481 i3db.disconnect() 482 return fail("clean %d failed:%s" % (dataset,str(e)))
483
484 - def queue_delete(self,username,password,dataset):
485 i3db = ConfigDB() 486 i3mon = MonitorDB() 487 if not self.auth_db(i3db,username, password,keep_open=True): 488 self.logger.warn("delete %d denied for user %s" % (dataset,username)) 489 i3db.disconnect() 490 return fail("delete %d denied for user %s" % (dataset,username)) 491 self.auth_db(i3mon,username, password,keep_open=True) 492 try: 493 i3mon.update_monitoring(grid_id=None,dataset_id=dataset) 494 i3mon.jobclean(dataset,archive=False) 495 i3db.set_metadata_subcat(dataset,sub_cat="obsolete") 496 i3mon.setDatasetStatus(dataset,'OBSOLETE') 497 i3mon.validate(dataset,'FALSE') 498 i3mon.add_history(username,"nuke %d" % dataset) 499 i3mon.clearStorageURL(dataset) 500 self.logger.info("delete %d granted for user %s" % (dataset,username)) 501 i3db.disconnect() 502 i3mon.disconnect() 503 return "delete %d granted for user %s" % (dataset,username) 504 except Exception,e: 505 i3db.disconnect() 506 i3mon.disconnect() 507 self.logger.error("delete %d failed:%s" % (dataset,str(e))) 508 return fail("delete %d failed:%s" % (dataset,str(e)))
509
510 - def queue_dataset_finish(self,username,password,dataset):
511 i3db = MonitorDB() 512 if not self.auth_db(i3db,username, password,keep_open=True): 513 i3db.disconnect() 514 self.logger.warn("clean %d denied for user %s" % (dataset,username)) 515 return fail("clean %d denied for user %s" % (dataset,username)) 516 try: 517 i3db.update_monitoring(self.grid_id,dataset_id=dataset) 518 i3db.jobclean(dataset) 519 i3db.setDatasetStatus(dataset,'COMPLETE') 520 i3db.SuspendGridDataset('any',dataset) 521 i3db.add_history(username,"finish %d" % dataset) 522 i3db.disconnect() 523 self.logger.info("finish %d granted for user %s" % (dataset,username)) 524 return "finish %d granted for user %s" % (dataset,username) 525 except Exception,e: 526 i3db.disconnect() 527 self.logger.error("finish %d failed:%s" % (dataset,str(e))) 528 return fail("finish %d failed:%s" % (dataset,str(e)))
529
530 - def queue_dataset_toggle_debug(self,username,password,dataset):
531 i3db = MonitorDB() 532 if not self.auth_db(i3db,username, password,keep_open=True): 533 i3db.disconnect() 534 self.logger.warn("toggle debug %d denied for user %s" % (dataset,username)) 535 return fail("toggle debug %d denied for user %s" % (dataset,username)) 536 try: 537 i3db.ToggleDatasetDebug(dataset) 538 i3db.add_history(username,"toggle debug %d" % dataset) 539 i3db.disconnect() 540 self.logger.info("toggle debug %d granted for user %s" % (dataset,username)) 541 return "toggle debug %d granted for user %s" % (dataset,username) 542 except Exception,e: 543 i3db.disconnect() 544 self.logger.error("toggle debug %d failed:%s" % (dataset,str(e))) 545 return fail("toggle debug %d failed:%s" % (dataset,str(e)))
546 547
548 - def loaddict(self,odict_pkl,username,password,dataset_id=0):
549 i3db = ConfigDB() 550 if not self.auth_db(i3db,username, password,keep_open=True): 551 i3db.disconnect() 552 self.logger.warn("validate %d denied for user %s" % (dataset,username)) 553 return fail("validate %d denied for user %s" % (dataset,username)) 554 try: 555 odict = cPickle.loads(odict_pkl) 556 i3db.load_dictionary(odict,dataset_id) 557 i3db.disconnect() 558 except Exception,e: 559 i3db.disconnect() 560 self.logger.warn("failed to load dictionary :\n%s" % str(e)) 561 return fail("failed to load dictionary :\n%s" % str(e))
562
563 - def queue_dataset_setstatus(self,username,password,dataset,status):
564 i3db = MonitorDB() 565 if not self.auth_db(i3db,username, password,keep_open=True): 566 i3db.disconnect() 567 self.logger.warn("validate %d denied for user %s" % (dataset,username)) 568 return fail("validate %d denied for user %s" % (dataset,username)) 569 try: 570 i3db.setDatasetStatus(dataset,status) 571 if status == 'PROCESSING': 572 i3db.SuspendGridDataset('all',dataset,0) 573 i3db.add_history(username,"dataset set status %s %d" % (status,dataset)) 574 i3db.disconnect() 575 self.logger.info("dataset_setstatus '%s' on dataset %d granted for user %s" % (status,dataset,username)) 576 return "setstatus '%s' on dataset %d granted for user %s" % (status,dataset,username) 577 except Exception,e: 578 i3db.disconnect() 579 self.logger.error("setstatus '%s' on dataset %d failed \n%s" % (status,dataset,str(e))) 580 return fail("setstatus '%s' on dataset %d failed \n%s" % (status,dataset,str(e)))
581
582 - def queue_setstatus(self,username,password,dataset,job,status):
583 i3db = MonitorDB() 584 if not self.auth_db(i3db,username, password,keep_open=True): 585 i3db.disconnect() 586 self.logger.warn("validate %d denied for user %s" % (dataset,username)) 587 return fail("validate %d denied for user %s" % (dataset,username)) 588 try: 589 i3db.jobsetstatus(job,dataset,status) 590 i3db.add_history(username,"set status %s %d.%d" % (status,dataset,job)) 591 i3db.disconnect() 592 self.logger.info("setstatus '%s' on %d.%d granted for user %s" % (status,dataset,job,username)) 593 return "setstatus '%s' on %d.%d granted for user %s" % (status,dataset,job,username) 594 except Exception,e: 595 i3db.disconnect() 596 self.logger.error("setstatus '%s' on %d.%d failed \n%s" % (status,dataset,job,str(e))) 597 return fail("setstatus '%s' on %d.%d failed \n%s" % (status,dataset,job,str(e)))
598
599 - def queue_retire(self,username,password,dataset):
600 """ 601 Mark dataset as obsolete 602 """ 603 i3db = ConfigDB() 604 i3mon = MonitorDB() 605 if not self.auth_db(i3db,username, password,keep_open=True): 606 i3db.disconnect() 607 self.logger.warn("retire %d denied for user %s" % (dataset,username)) 608 return fail("retire %d denied for user %s" % (dataset,username)) 609 self.auth_db(i3mon,username, password,keep_open=True) 610 try: 611 i3mon.update_monitoring(grid_id=None,dataset_id=dataset) 612 i3db.set_metadata_subcat(dataset,sub_cat="obsolete") 613 i3mon.setDatasetStatus(dataset,'OBSOLETE') 614 i3mon.add_history(username,"retire s %d" % dataset) 615 i3db.disconnect() 616 i3mon.disconnect() 617 self.logger.info("retire dataset %d granted for user %s" % (dataset,username)) 618 return "retire dataset %d granted for user %s" % (dataset,username) 619 except Exception,e: 620 i3db.disconnect() 621 i3mon.disconnect() 622 self.logger.error("retire dataset %d failed \n%s" % (dataset,str(e))) 623 return fail("retire dataset %d failed \n%s" % (dataset,str(e)))
624 625
626 - def queue_validate(self,username,password,dataset,valid=True):
627 i3db = MonitorDB() 628 if not self.auth_db(i3db,username, password,keep_open=True): 629 i3db.disconnect() 630 self.logger.warn("validate %d denied for user %s" % (dataset,username)) 631 return fail("validate %d denied for user %s" % (dataset,username)) 632 try: 633 if valid: 634 i3db.validate(dataset,'TRUE') 635 else: 636 i3db.validate(dataset,'FALSE') 637 i3db.add_history(username,"validate %d" % dataset) 638 i3db.disconnect() 639 self.logger.info("change %d granted for user %s" % (dataset,username)) 640 return "change %d granted for user %s" % (dataset,username) 641 except Exception,e: 642 i3db.disconnect() 643 self.logger.error("validate %d failed:%s" % (dataset,str(e))) 644 return fail("validate %d failed:\n%s" % (dataset,str(e)))
645 646
647 - def daemon_suspend(self,username,password,grid,daemon='all'):
648 i3db = MonitorDB() 649 if not self.auth_db(i3db,username, password,keep_open=True): 650 i3db.disconnect() 651 self.logger.warn("suspend %s.%s denied for user %s" % (grid,daemon,username)) 652 return fail("suspend %s.%s denied for user %s" % (grid,daemon,username)) 653 try: 654 if daemon=='all': 655 for d in soapdaemons: 656 i3db.GridRequestSuspend(grid,d) 657 elif daemon in soapdaemons: 658 i3db.GridRequestSuspend(grid,daemon) 659 else: 660 self.logger.error("%s: invalid daemon %s" % (grid,daemon)) 661 return fail("%s: invalid daemon %s" % (grid,daemon)) 662 663 i3db.add_history(username,"suspend daemon %s.%s" % (grid,daemon) ) 664 i3db.disconnect() 665 self.logger.info("suspend %s.%s granted for user %s" % (grid,daemon,username)) 666 return "suspend %s.%s granted for user %s" % (grid,daemon,username) 667 except Exception,e: 668 i3db.disconnect() 669 self.logger.error("suspend %s.%s failed:%s" % (grid,daemon,username)) 670 return fail("suspend %s.%s failed:%s" % (grid,daemon,username))
671
672 - def daemon_resume(self,username,password,grid,daemon='all'):
673 i3db = MonitorDB() 674 if not self.auth_db(i3db,username, password,keep_open=True): 675 i3db.disconnect() 676 self.logger.warn("resume %s.%s denied for user %s" % (grid,daemon,username)) 677 return fail("resume %s.%s denied for user %s" % (grid,daemon,username)) 678 try: 679 if daemon=='all': 680 for d in soapdaemons: 681 i3db.GridRequestResume(grid,d) 682 elif daemon in soapdaemons: 683 i3db.GridRequestResume(grid,daemon) 684 else: 685 i3db.disconnect() 686 self.logger.error("%s: invalid daemon %s" % (grid,daemon)) 687 return fail("%s: invalid daemon %s" % (grid,daemon)) 688 689 i3db.add_history(username,"resume daemon %s.%s" % (grid,daemon) ) 690 i3db.disconnect() 691 self.logger.info("resume %s.%s granted for user %s" % (grid,daemon,username)) 692 return "resume %s.%s granted for user %s" % (grid,daemon,username) 693 except Exception,e: 694 i3db.disconnect() 695 self.logger.error("resume %s.%s failed:%s" % (grid,daemon,username)) 696 return fail("resume %s.%s failed:%s" % (grid,daemon,username))
697
698 - def grid_add(self,username,password,grid,dataset):
699 i3db = MonitorDB() 700 if not self.auth_db(i3db,username, password,keep_open=True): 701 i3db.disconnect() 702 self.logger.warn("grid_add %s %s denied for user %s" % (grid,dataset,username)) 703 return fail("grid_add %s %s denied for user %s" % (grid,dataset,username)) 704 try: 705 i3db.InitializeGridStats([grid],dataset) 706 i3db.add_history(username,"Grid %s added to dataset %s" % (grid,dataset) ) 707 i3db.disconnect() 708 self.logger.info("grid_add %s %s granted for user %s" % (grid,dataset,username)) 709 return "grid_add %s %s granted for user %s" % (grid,dataset,username) 710 except Exception,e: 711 i3db.disconnect() 712 self.logger.error("grid_add %s %s failed:%s" % (grid,dataset,username))
713
714 - def grid_suspend_dataset(self,username,password,grid,dataset,suspend=1):
715 i3db = MonitorDB() 716 if not self.auth_db(i3db,username, password,keep_open=True): 717 i3db.disconnect() 718 self.logger.warn("grid_suspend_dataset %s %s denied for user %s" % (grid,dataset,username)) 719 return "grid_suspend_dataset %s %s denied for user %s" % (grid,dataset,username) 720 try: 721 i3db.SuspendGridDataset(grid,dataset,suspend) 722 i3db.add_history(username,"Grid %s suspend set to %s for dataset %s" % (grid,suspend,dataset) ) 723 i3db.disconnect() 724 self.logger.info("grid_suspend_dataset %s %s granted for user %s" % (grid,dataset,username)) 725 return "grid_suspend_dataset %s %s granted for user %s" % (grid,dataset,username) 726 except Exception,e: 727 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback) 728 i3db.disconnect() 729 self.logger.error("grid_suspend_dataset %s %s failed:%s" % (grid,dataset,username)) 730 return "grid_suspend_dataset %s %s failed %s" % (grid,dataset,e)
731 732
def get_simcat_categories(self):
    """
    Get list of valid simcats

    @return: pickled result of ConfigDB.get_simcat_categories, or a
             fail() tuple when the server account is denied
    """
    i3db = ConfigDB()
    if not i3db.authenticate(self.dbserver,self.username, self.password,
                             self.database,port=self.dbport,keep_open=True):
        # the server's own account is used here; the old code formatted an
        # undefined local "username" and raised NameError
        self.logger.error("Access denied for user %s" % self.username)
        retval = fail("Access denied for user %s" % self.username)
    else:
        retval = dumps(i3db.get_simcat_categories())
    i3db.disconnect()
    return retval
747
748 - def printsummary(self,days):
749 """ 750 get usage statistics 751 @param days number of days (from today) to get data from 752 @return formated string with usage statistics 753 """ 754 i3db = MonitorDB() 755 if not i3db.authenticate(self.dbserver,self.username, self.password, 756 self.database,port=self.dbport,keep_open=True): 757 i3db.disconnect() 758 self.logger.warn("printsummary %s.%s denied for user %s" % (grid,daemon,username)) 759 return fail("printsummary %s.%s denied for user %s" % (grid,daemon,username)) 760 try: 761 retval = i3db.printsummary(days) 762 self.logger.info("printsummary: serving request") 763 i3db.disconnect() 764 return retval 765 except Exception,e: 766 i3db.disconnect() 767 self.logger.error("print summary failed: %s" % e) 768 return fail("print summary failed")
769
def getstatus(self,dataset,job=-1):
    """
    Get the status information stored for a dataset or one of its jobs
    @param dataset: dataset id
    @param job: job id handed to MonitorDB.getstatus (default -1)
    @return: pickled result of MonitorDB.getstatus, or a fail() tuple
             when the server account is denied
    """
    i3db = MonitorDB()
    if not i3db.authenticate(self.dbserver,self.username, self.password,
                             self.database,port=self.dbport,keep_open=True):
        # the server's own account is used here; the old code formatted an
        # undefined local "username" and raised NameError
        self.logger.error("Access denied for user %s" % self.username)
        retval = fail("Access denied for user %s" % self.username)
    else:
        retval = dumps(i3db.getstatus(dataset,job))
    i3db.disconnect()
    return retval
784
def getdatasetstatus(self,dataset):
    """
    Get the status of a dataset
    @param dataset: dataset id
    @return: pickled result of MonitorDB.getDatasetStatus, or a fail()
             tuple when the server account is denied
    """
    i3db = MonitorDB()
    if not i3db.authenticate(self.dbserver,self.username, self.password,
                             self.database,port=self.dbport,keep_open=True):
        # the server's own account is used here; the old code formatted an
        # undefined local "username" and raised NameError
        self.logger.error("Access denied for user %s" % self.username)
        retval = fail("Access denied for user %s" % self.username)
    else:
        retval = dumps(i3db.getDatasetStatus(dataset))
    i3db.disconnect()
    return retval
799 800
def showrunlist(self,search_string=""):
    """
    Get list of datasets in database
    @param search_string: a string containing key words to use
    @return: pickled result of ConfigDB.show_dataset_table, or a fail()
             tuple when the server account is denied
    """
    i3db = ConfigDB()
    if not i3db.authenticate(self.dbserver,self.username, self.password,
                             self.database,port=self.dbport,keep_open=True):
        # the server's own account is used here; the old code formatted an
        # undefined local "username" and raised NameError
        self.logger.error("Access denied for user %s" % self.username)
        retval = fail("Access denied for user %s" % self.username)
    else:
        retval = dumps(i3db.show_dataset_table(search_string))
    i3db.disconnect()
    return retval
815
def download_config(self,dataset,defaults=False,descriptions=False):
    """
    Fetch configuration from database
    @param dataset: dataset id for configuration
    @param defaults: passed through to ConfigDB.download_config
    @param descriptions: passed through to ConfigDB.download_config
    @return: pickled configuration, or a fail() tuple when the server
             account is denied
    """
    i3db = ConfigDB()
    if not i3db.authenticate(self.dbserver,self.username, self.password,
                             self.database,port=self.dbport,keep_open=True):
        # the server's own account is used here; the old code formatted an
        # undefined local "username" and raised NameError
        self.logger.error("Access denied for user %s" % self.username)
        retval = fail("Access denied for user %s" % self.username)
    else:
        retval = dumps(i3db.download_config(dataset,defaults,descriptions))
    i3db.disconnect()
    return retval
830
def check_connection(self):
    """
    Build a banner with the current time, the server version and the
    host name.

    @return: pickled banner string
    """
    border = '*'*80
    rows = [
        border,
        time.asctime(),
        "IceProd Server version %s" % iceprod.server.__version__,
        "Running on host %s" % os.uname()[1],
        border,
    ]
    # every row is right-aligned to 80 columns and rows are separated by
    # one empty line
    msg = '\n'.join(["%80s\n" % row for row in rows])
    return dumps(msg)
846
def echo(self, msg):
    """
    Return the received message unchanged.

    @param msg: any value
    @return: msg
    """
    return msg
849 850
def register_functions(self):
    """
    Expose the methods of this object through self.server.

    Methods are registered one by one, in the order listed below.
    """
    exposed = (
        'echo',
        'submit',
        'authenticate',
        'checkjobs',
        'queue_remove',
        'queue_suspend',
        'queue_clean',
        'queue_delete',
        'queue_dataset_finish',
        'queue_dataset_toggle_debug',
        'queue_resume',
        'queue_reset',
        'queue_validate',
        'queue_setstatus',
        'queue_dataset_setstatus',
        'getstatus',
        'getdatasetstatus',
        'loaddict',
        'queue_retire',
        'daemon_suspend',
        'daemon_resume',
        'printsummary',
        'showrunlist',
        'download_config',
        'check_connection',
        'grid_add',
        'grid_suspend_dataset',
        'get_simcat_categories',
    )
    for method_name in exposed:
        self.server.register_function(getattr(self, method_name))
883
def startsoap(self):
    """
    Register the exposed functions and this module, publish the parameter
    database through SoapParamDB, then enter self.serve_forever().
    """
    self.logger.info('Registering module...')
    self.register_functions()
    self.register_module()

    # Now expose some functionality from parameter database
    param_store = MySQLParamDb()
    param_store.authenticate(self.dbserver, self.username, self.password,
                             self.database, port=self.dbport, keep_open=True)
    param_store.connect()
    SoapParamDB(param_store).Register(self)
    self.serve_forever()
897 898
class SoapTrayCGI(SoapTray):
    """
    XMLRPC server class for submitting jobs to IceProd.
    Jobs connect to the server from compute nodes and make status updates.
    Similar to the SoapTray class but runs as CGI embedded in an existing
    HTTP server.
    """

    def __init__(self, cfg):
        """
        @param cfg: configuration object; read through get(), getint(),
                    getboolean() and has_option()
        """
        SoapTray.__init__(self, cfg)
        self.cfg = cfg
        self.use_ldap = self.cfg.getboolean('ldap', 'enable')
        self.logger = logging.getLogger('SoapTray')
        self.semaphore = DummySemaphore(verbose=1)
        self.grid_id = 0
        self.submithost = os.getenv('HOSTNAME')

        self.dbserver = self.cfg.get('database', 'server')
        self.dbport = self.cfg.getint('database', 'port')
        self.database = self.cfg.get('database', 'database')
        self.username = self.cfg.get('database', 'username')
        self.password = self.cfg.get('database', 'password')
        self.grid_name = self.cfg.get('queue', 'name')
        self.batchsys = self.cfg.get('queue', 'batchsys')
        self.institution = self.cfg.get('info', 'INSTITUTION')
        self.address = self.cfg.get('server', 'server')
        self.port = self.cfg.getint('server', 'port')
        self.rootdir = self.cfg.get('path', 'basedir')
        self.datawarehouse = expandvars(self.cfg.get('path', 'datawarehouse', raw=True))

        # An explicit SUBMITHOST option overrides the HOSTNAME environment value.
        if self.cfg.has_option('queue', 'SUBMITHOST'):
            self.submithost = self.cfg.get('queue', 'SUBMITHOST')

        self.xmluri = 'http://www.icecube.wisc.edu/simulation/dtd/iceprod.v2.dtd'
        self.server = rpc.MyCGIXMLRPCRequestHandler()

        self.i3db = ConfigDB()
        self.i3mondb = MonitorDB()

    def startsoap(self):
        """
        Register the exposed functions and this module, publish the
        parameter database, then handle a single CGI request.
        """
        self.logger.info('Registering module...')
        self.register_functions()
        self.register_module()

        # Now expose some functionality from parameter database
        paramdb = MySQLParamDb()
        paramdb.authenticate(self.dbserver, self.username, self.password,
                             self.database, port=self.dbport, keep_open=True)
        paramdb.connect()
        soapdb = SoapParamDB(paramdb)
        soapdb.Register(self)
        self.server.handle_request()

    def submit(self, sconfig, username, password, submitter, production=False):
        """
        Receive a remote request for a job submission.

        @param sconfig: pickled steering configuration
        @param username: needed for connecting to the configuration database
        @param password: needed for connecting to the configuration database
        @param submitter: forwarded unchanged to self.enqueue()
        @param production: only production submissions are supported here
        @return: (status, pickled queue object, pickled None) on success,
                 otherwise the value of fail()
        """
        i3db = ConfigDB()
        if not self.auth_db(i3db, username, password, keep_open=True):
            self.logger.info("Access denied for user %s" % username)
            return fail("Access denied for user %s" % username)

        self.logger.info("Handling submission from user %s" % username)

        # SECURITY NOTE(review): sconfig comes from a remote caller and
        # unpickling can execute arbitrary code; here it is only reached
        # after auth_db() has accepted the caller's credentials.
        i3steering = cPickle.loads(sconfig)
        if not production:
            return fail("cgi-soaptray does not support non-production jobs.")

        try:
            status, i3q = self.enqueue(i3steering, username, password, submitter)
        except Exception as e:
            # sys.exc_info() is per-thread; the deprecated sys.exc_type /
            # exc_value / exc_traceback globals are not.
            exc_type = sys.exc_info()[0]
            sys.excepthook(*sys.exc_info())
            self.logger.error('Caught %s: %s' % (exc_type, str(e)))
            return fail(str(e), e)
        return status, dumps(i3q), dumps(None)
983 984 985
986 -class Monitor:
987 """ 988 XMLRPC server class for monitoring jobs 989 job connect to server from compute nodes and make status updates 990 Creates own HTTP server 991 """ 992
def getvalue(self, key):
    """
    Retrieve a dictionary entry.

    @param key: key string
    @return: the value returned by fetch_dict_value(), or a pickled None
             when the lookup raised an exception
    """
    i3db = self.i3db.new()
    self.semaphore.acquire()
    try:
        i3db.connect()
        logger.debug("fetching entry %s " % key)
        value = cPickle.dumps(None)
        try:
            value = i3db.fetch_dict_value(key)
            logger.debug("fetched entry %s,%s " % (key, value))
        except Exception as e:
            # sys.exc_info() is per-thread; the deprecated sys.exc_* globals
            # are shared between threads.
            sys.excepthook(*sys.exc_info())
            logger.error(e)
        i3db.disconnect()
    finally:
        # Release even when connect()/disconnect() raises, so that one
        # failed request cannot block every later one.
        self.semaphore.release()
    return value
1013 1014
def getfile(self, key, dataset_id=0):
    """
    Look up a file entry by key.

    @param key: key string
    @param dataset_id: forwarded unchanged to fetch_filename()
    @return: the result of fetch_filename() converted to a tuple, or a
             pickled None when the lookup raised an exception
    """
    i3db = self.i3db.new()
    self.semaphore.acquire()
    try:
        i3db.connect()
        value = cPickle.dumps(None)
        try:
            value = tuple(i3db.fetch_filename(key, dataset_id))
        except Exception as e:
            # sys.exc_info() is per-thread; the deprecated sys.exc_* globals
            # are shared between threads.
            sys.excepthook(*sys.exc_info())
            logger.error(e)
        i3db.disconnect()
    finally:
        # Release even when connect()/disconnect() raises.
        self.semaphore.release()
    return value
1034
def get_tarball(self, metaproject_name, metaproject_version, platform, gccversion, ppc=0):
    """
    Get best matching tarball in file repository.

    @param metaproject_name: forwarded to get_metaproject_tarball()
    @param metaproject_version: forwarded to get_metaproject_tarball()
    @param platform: forwarded to get_metaproject_tarball()
    @param gccversion: forwarded to get_metaproject_tarball()
    @param ppc: accepted for interface compatibility; not used here
    @return: string path to tarball, or '' when the lookup raised an exception
    """
    i3db = self.i3db.new()
    i3db.connect()
    path = ''
    try:
        path = i3db.get_metaproject_tarball(metaproject_name, metaproject_version,
                                            platform, gccversion)
    except Exception as e:
        # sys.exc_info() is per-thread; the deprecated sys.exc_* globals are
        # shared between threads.
        sys.excepthook(*sys.exc_info())
        logger.error(e)
    i3db.disconnect()
    return path
1055 1056 1057
def getconfig(self, dataset):
    """
    Download a configuration from the database.

    @param dataset: dataset id passed to download_config()
    @return: pickled DOM produced by IceTrayXMLWriter, or a pickled None
             when the download or the conversion raised an exception
    """
    i3db = self.i3db.new()
    self.semaphore.acquire()
    try:
        i3db.connect()
        value = cPickle.dumps(None)
        try:
            i3config = i3db.download_config(dataset)
            value = cPickle.dumps(IceTrayXMLWriter(i3config, self.xmluri).getDOM())
        except Exception as e:
            # sys.exc_info() is per-thread; the deprecated sys.exc_* globals
            # are shared between threads.
            sys.excepthook(*sys.exc_info())
            logger.error(e)
        i3db.disconnect()
    finally:
        # Release even when connect()/disconnect() raises.
        self.semaphore.release()
    return value
1075 1076
def getmetadata(self, dataset):
    """
    Download the metadata of a dataset from the database.

    @param dataset: numeric dataset id
    @return: pickled DOM produced by MetadataWriter, or a pickled None when
             the download or the conversion raised an exception
    """
    logger.info("fetching metadata for dataset %d" % dataset)
    i3db = self.i3db.new()
    self.semaphore.acquire()
    try:
        i3db.connect()
        value = cPickle.dumps(None)
        try:
            # The result is not used; the call is kept because it ran before
            # the metadata download in the original code.
            # NOTE(review): presumably a check that the dataset exists - confirm.
            i3db.download_config(dataset)
            difplus = i3db.download_metadata(dataset)
            difplus.GetPlus().SetSteeringFile(
                os.path.join(difplus.GetStoragePath(), 'config.xml'))
            value = cPickle.dumps(MetadataWriter(difplus).getDOM())
        except Exception as e:
            # sys.exc_info() is per-thread; the deprecated sys.exc_* globals
            # are shared between threads.
            sys.excepthook(*sys.exc_info())
            logger.error(e)
        i3db.disconnect()
    finally:
        # Release even when connect()/disconnect() raises.
        self.semaphore.release()
    return value
1097
def start(self, hostname, dataset=0, job_id=0, key='', grid=0):
    """
    Change the status of a job to indicate it is currently running.

    @param hostname: host the job is starting on
    @param dataset: dataset simdb_id
    @param job_id: job id
    @param key: a passkey to prevent processes from overriding entries
    @param grid: grid id; self.grid_id is used when this is 0
    @return: the (dataset, jobs, queue_id) tuple from jobstart(), or
             (0, 0, 0) when jobstart() or the commit raised an exception
    """
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        i3mondb.set_auto(True)
        logger.info("job %d.%d starting on %s" % (dataset, job_id, hostname))
        if not key:
            key = None
        try:
            if not grid:
                grid = self.grid_id
            dataset, jobs, queue_id = i3mondb.jobstart(hostname, grid, dataset, job_id, key)
            i3mondb.commit()
        except Exception as e:
            # sys.exc_info() is per-thread; the deprecated sys.exc_* globals
            # are shared between threads.
            sys.excepthook(*sys.exc_info())
            logger.error(e)
            dataset, jobs, queue_id = (0, 0, 0)
        i3mondb.disconnect()
        logger.info("job %d.%d started on %s" % (dataset, job_id, hostname))
    finally:
        # Release even when connect()/set_auto()/disconnect() raises.
        self.semaphore.release()
    return dataset, jobs, queue_id
1125
def ping(self, dataset_id, job_id, host, key='', tray=0, iter=0):
    """
    Let server know that job is still running.

    @param dataset_id: runconfig index
    @param job_id: process number within dataset
    @param host: hostname of computing node
    @param key: security passkey assigned to job
    @param tray: tray currently being processed
    @param iter: iteration currently being processed
    @return: the result of jobping(), or -1 when it raised an exception
    """
    if not key:
        key = None
    logger.info("Received ping from %s with job %d.%d " % (host, dataset_id, job_id))
    logger.info("processing tray %d, iter %d " % (tray, iter))
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        i3mondb.set_auto(True)
        try:
            retval = i3mondb.jobping(dataset_id, job_id, host, key, tray, iter)
            i3mondb.commit()
        except Exception as e:
            # sys.exc_info() is per-thread; the deprecated sys.exc_* globals
            # are shared between threads.
            sys.excepthook(*sys.exc_info())
            logger.error(e)
            retval = -1
        i3mondb.disconnect()
    finally:
        # Release even when connect()/set_auto()/disconnect() raises.
        self.semaphore.release()
    return retval
1150
def finish(self, dataset_id, job_id, stats, key='', mode=0):
    """
    Update monitoring for job and write statistics.

    @param dataset_id: runconfig index
    @param job_id: process number within dataset
    @param stats: pickled dictionary of stat entries
    @param key: security passkey assigned to job
    @param mode: if true, finalize job and set status to COPIED instead of
                 setting its status to READYTOCOPY
    @return: the result of jobfinish(), or -1 when the update raised
    """
    if not key:
        key = None
    logger.info('job %d.%d finished (mode %u)' % (dataset_id, job_id, mode))
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        i3mondb.set_auto(True)
        try:
            logger.debug("stats: " + str(stats))
            # SECURITY NOTE(review): stats is supplied by the remote job and
            # unpickling untrusted data can execute arbitrary code.
            retval = i3mondb.jobfinish(dataset_id, job_id, cPickle.loads(stats), key, mode)
            if mode:
                i3mondb.jobfinalize(dataset_id, job_id, key, status='COPIED')
            i3mondb.commit()
        except Exception as e:
            # sys.exc_info() is per-thread; the deprecated sys.exc_* globals
            # are shared between threads.
            sys.excepthook(*sys.exc_info())
            logger.error(e)
            logger.error("failed to set finish status for job %d.%d." % (dataset_id, job_id))
            retval = -1
        i3mondb.disconnect()
    finally:
        # Release even when connect()/set_auto()/disconnect() raises.
        self.semaphore.release()
    logger.debug('finish - done')
    return retval
1183
def copying(self, dataset_id, job_id, key=''):
    """
    Set the status of a job to COPYING.

    @param dataset_id: runconfig index
    @param job_id: process number within dataset
    @param key: security passkey assigned to job
    @return: the result of jobfinalize(), or -1 when the update raised
    """
    if not key:
        key = None
    logger.info('job %d.%d is copying data' % (dataset_id, job_id))
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        i3mondb.set_auto(True)
        try:
            logger.debug("setting job status to 'COPYING for %d.%d' " % (dataset_id, job_id))
            retval = i3mondb.jobfinalize(dataset_id, job_id, key, status='COPYING')
            i3mondb.commit()
        except Exception as e:
            # sys.exc_info() is per-thread; the deprecated sys.exc_* globals
            # are shared between threads.
            sys.excepthook(*sys.exc_info())
            logger.error(e)
            logger.error("failed to set copying status for job %d.%d." % (dataset_id, job_id))
            retval = -1
        i3mondb.disconnect()
    finally:
        # Release even when connect()/set_auto()/disconnect() raises.
        self.semaphore.release()
    logger.debug('copying - done')
    return retval
1211 1212
def abort(self, job_id, dataset_id, error, errormessage='', key='', stats=cPickle.dumps({})):
    """
    Record that a job was aborted.

    Note that job_id comes BEFORE dataset_id here, unlike in the other
    job-related methods.

    @param job_id: process number within dataset
    @param dataset_id: runconfig index
    @param error: forwarded unchanged to jobabort()
    @param errormessage: text included in the log and forwarded to jobabort()
    @param key: security passkey assigned to job
    @param stats: pickled dictionary of stat entries
    @return: 1 on success, 0 when the update raised an exception
    """
    if not key:
        key = None
    logger.warn('aborting job %d.%d - %s ' % (dataset_id, job_id, errormessage))
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        i3mondb.set_auto(True)
        retval = 1
        try:
            # SECURITY NOTE(review): stats is supplied by the remote job and
            # unpickling untrusted data can execute arbitrary code.
            i3mondb.jobabort(job_id, dataset_id, error, errormessage, key, cPickle.loads(stats))
            i3mondb.commit()
        except Exception as e:
            # sys.exc_info() is per-thread; the deprecated sys.exc_* globals
            # are shared between threads.
            sys.excepthook(*sys.exc_info())
            logger.error(e)
            logger.error("failed to abort job %d.%d." % (dataset_id, job_id))
            retval = 0
        i3mondb.disconnect()
    finally:
        # Release even when connect()/set_auto()/disconnect() raises.
        self.semaphore.release()
    return retval
1237
def AddFileURL(self, dataset_id, job_id, url, md5sum, filesize, transfertime, key=''):
    """
    Add or change the global location of a file.

    The url is split into its directory part (location) and its last
    component (filename) before it is stored.

    @param dataset_id: runconfig index
    @param job_id: process number within dataset
    @param url: full url of the file
    @param md5sum: forwarded unchanged to SetFileURL()
    @param filesize: forwarded unchanged to SetFileURL()
    @param transfertime: forwarded unchanged to SetFileURL()
    @param key: security passkey assigned to job
    @return: 1 on success, 0 when the update raised an exception
    """
    if not key:
        key = None
    filename = os.path.basename(url)
    location = os.path.dirname(url)
    logger.debug('%06u.%06u: %s' % (dataset_id, job_id, url))

    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        try:
            logger.info('SetFileURL: %06u.%06u:%s' % (dataset_id, job_id, url))
            i3mondb.SetFileURL(job_id, dataset_id, location, filename,
                               md5sum, filesize, transfertime, key)
            i3mondb.commit()
            retval = 1
        except Exception as e:
            # A second, identical `except Exception` clause used to follow
            # this one; it could never be reached and has been removed.
            sys.excepthook(*sys.exc_info())
            logger.error(e)
            logger.error("failed to set url %s for job %d.%d." % (url, dataset_id, job_id))
            retval = 0
        i3mondb.disconnect()
    finally:
        # Release even when connect()/disconnect() raises.
        self.semaphore.release()
    return retval
1267
def get_storage_url(self, dataset_id, queue_id, passkey='', storage_type='INPUT'):
    """
    Fetch the storage urls of a job.

    @param dataset_id: runconfig index
    @param queue_id: process number within dataset
    @param passkey: forwarded unchanged to GetStorageURL()
    @param storage_type: forwarded unchanged to GetStorageURL()
    @return: pickled result of GetStorageURL()
    @raise Exception: any exception raised by the lookup is logged and
                      re-raised
    """
    i3mondb = self.i3mondb.new()
    i3mondb.connect()
    try:
        # Use the module-level logger like the sibling methods do: subclasses
        # that skip Monitor.__init__ may not define self.logger.
        logger.info("storage url for %u.%u" % (dataset_id, queue_id))
        return cPickle.dumps(
            i3mondb.GetStorageURL(dataset_id, queue_id, passkey, storage_type))
    except Exception as e:
        # sys.exc_info() is per-thread; the deprecated sys.exc_* globals are
        # shared between threads.
        sys.excepthook(*sys.exc_info())
        logger.error(e)
        raise
    finally:
        i3mondb.disconnect()
1281 1282 #------ Multi-part job (DAG) functions --------------------------------------- 1283
def multipart_job_start(self, dataset_id, queue_id, key=''):
    """
    Change the status of a multi-part job to indicate it is currently running.

    @param dataset_id: dataset simdb_id
    @param queue_id: job id
    @param key: a passkey to prevent processes from overriding entries
    @return: the (dataset, jobs, queue_id) tuple from the database layer;
             (TASK_DATASET_ERROR_ID, 0, 0) when the call raised an exception
    """
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        i3mondb.set_auto(False)
        logger.info("job %d.%d starting" % (dataset_id, queue_id))
        if not key:
            key = None
        try:
            dataset, jobs, queue_id = i3mondb.multipart_job_start(dataset_id, queue_id, key)
            # Decide on the RETURNED dataset id: the except branch below uses
            # TASK_DATASET_ERROR_ID in that position to report failure, so
            # testing the caller-supplied dataset_id could never detect one.
            # NOTE(review): confirm the database layer reports errors this way.
            if dataset != TASK_DATASET_ERROR_ID:
                i3mondb.commit()
            else:
                i3mondb.rollback()
                logger.info("job %d.%d starting error" % (dataset_id, queue_id))
        except Exception as e:
            # sys.exc_info() is per-thread; the deprecated sys.exc_* globals
            # are shared between threads.
            sys.excepthook(*sys.exc_info())
            logger.error(e)
            dataset, jobs, queue_id = (TASK_DATASET_ERROR_ID, 0, 0)
        i3mondb.disconnect()
    finally:
        # Release even when connect()/set_auto()/disconnect() raises.
        self.semaphore.release()
    return dataset, jobs, queue_id
1312
def multipart_job_finish(self, dataset_id, queue_id, key=''):
    """
    Mark a multi-part job as finished.

    @param dataset_id: dataset simdb_id
    @param queue_id: job id
    @param key: a passkey to prevent processes from overriding entries
    @return: always True; a failed update is rolled back and only logged
    """
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        i3mondb.set_auto(False)
        if not key:
            key = None
        try:
            i3mondb.multipart_job_finish(dataset_id, queue_id, key)
            i3mondb.commit()
            logger.info("job %s.%s finished successfully" % (dataset_id, queue_id))
        except Exception as e:
            # Capture the failure first: rollback() may itself raise.
            failure = sys.exc_info()
            i3mondb.rollback()
            logger.error("job %s.%s failed to set finish status" % (dataset_id, queue_id))
            sys.excepthook(*failure)
            logger.error(e)
        i3mondb.disconnect()
    finally:
        # Release even when connect()/rollback()/disconnect() raises.
        self.semaphore.release()
    return True
1331
def task_start(self, dataset_id, queue_id, taskname, tray, iter, hostname, key=''):
    """
    Start a task of a multi-part job once all of its parent tasks are done.

    @param dataset_id: dataset simdb_id
    @param queue_id: job id
    @param taskname: name of the task definition to start
    @param tray: forwarded unchanged to the database layer
    @param iter: forwarded unchanged to the database layer
    @param hostname: forwarded unchanged to the database layer
    @param key: a passkey to prevent processes from overriding entries
    @return: the task id from the database layer, or TASK_ERROR_ID when a
             parent task has not finished or the update raised an exception
    """
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        try:
            i3mondb.set_auto(False)
            logger.info("job %d.%d starting task %s, tray %s, iter %s"
                        % (dataset_id, queue_id, taskname, tray, iter))
            logger.info("Checking that task parents have completed..")
            steering = Steering()
            i3mondb.download_tasks(dataset_id, steering)
            job = i3mondb.GetJob(dataset_id, queue_id)
            td = steering.GetTaskDefinition(taskname)
            for parent in td.GetParents():  # check dependencies
                parent_td = steering.GetTaskDefinition(parent)
                logger.debug(parent_td.GetName())
                if not i3mondb.task_is_finished(parent_td.GetId(), job.GetDatabaseId()):
                    # Use the same accessor as the check above; the log line
                    # previously called job.DatabaseId().
                    logger.error("task id %s, '%s' job id %s"
                                 % (parent_td.GetId(), parent_td.GetName(), job.GetDatabaseId()))
                    logger.error("Parent tasks for task %s for %u.%u have not completed"
                                 % (taskname, dataset_id, queue_id))
                    return TASK_ERROR_ID
            logger.info("..OK")

            if not key:
                key = None
            try:
                task_id = i3mondb.task_start(dataset_id, queue_id, taskname,
                                             tray, iter, hostname, key)
                if task_id != TASK_ERROR_ID:
                    i3mondb.commit()
                else:
                    i3mondb.rollback()
            except Exception as e:
                # sys.exc_info() is per-thread; the deprecated sys.exc_*
                # globals are shared between threads.
                sys.excepthook(*sys.exc_info())
                logger.error(e)
                task_id = TASK_ERROR_ID
            return task_id
        finally:
            # Runs on every exit, including the early return above and an
            # exception raised by the dependency check.
            i3mondb.disconnect()
    finally:
        self.semaphore.release()
1369
def task_copying_input(self, task_id, key=''):
    """
    Set the status of a task to COPYINGINPUT.

    @param task_id: numeric task id
    @param key: passkey forwarded to task_update_status()
    @return: the result of task_update_status()
    """
    logger.info("task %d copying input" % task_id)
    new_status = 'COPYINGINPUT'
    return self.task_update_status(task_id, new_status, key)
1373
def task_processing(self, task_id, key=''):
    """
    Set the status of a task to PROCESSING.

    @param task_id: numeric task id
    @param key: passkey forwarded to task_update_status()
    @return: the result of task_update_status()
    """
    logger.info("task %d processing" % task_id)
    new_status = 'PROCESSING'
    return self.task_update_status(task_id, new_status, key)
1377
def task_copying_output(self, task_id, key=''):
    """
    Set the status of a task to COPYINGOUTPUT.

    @param task_id: numeric task id
    @param key: passkey forwarded to task_update_status()
    @return: the result of task_update_status()
    """
    logger.info("task %d copying output" % task_id)
    new_status = 'COPYINGOUTPUT'
    return self.task_update_status(task_id, new_status, key)
1381
def task_update_status(self, task_id, status, key=''):
    """
    Change the status of a task.

    The change is committed when the database layer returns a true value
    and rolled back otherwise.

    @param task_id: task id
    @param status: new status string
    @param key: a passkey to prevent processes from overriding entries
    @return: the result of the database call, or False when it raised
    """
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        i3mondb.set_auto(False)
        if not key:
            key = None
        try:
            ret = i3mondb.task_update_status(task_id, status, key)
            if ret:
                i3mondb.commit()
            else:
                i3mondb.rollback()
        except Exception as e:
            # sys.exc_info() is per-thread; the deprecated sys.exc_* globals
            # are shared between threads.
            sys.excepthook(*sys.exc_info())
            logger.error(e)
            ret = False
        i3mondb.disconnect()
    finally:
        # Release even when connect()/set_auto()/disconnect() raises.
        self.semaphore.release()
    return ret
1401
def task_abort(self, task_id, key=''):
    """
    Record that a task was aborted.

    The change is committed when the database layer returns a true value
    and rolled back otherwise.

    @param task_id: numeric task id
    @param key: a passkey to prevent processes from overriding entries
    @return: the result of the database call, or False when it raised
    """
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        i3mondb.set_auto(False)
        logger.warn("task %d aborted" % task_id)
        if not key:
            key = None
        try:
            ret = i3mondb.task_abort(task_id, key)
            if ret:
                i3mondb.commit()
            else:
                i3mondb.rollback()
        except Exception as e:
            # sys.exc_info() is per-thread; the deprecated sys.exc_* globals
            # are shared between threads.
            sys.excepthook(*sys.exc_info())
            logger.error(e)
            ret = False
        i3mondb.disconnect()
    finally:
        # Release even when connect()/set_auto()/disconnect() raises.
        self.semaphore.release()
    return ret
1422
def task_finish(self, task_id, stats, key=''):
    """
    Record that a task finished and store its statistics.

    The change is committed when the database layer returns a true value
    and rolled back otherwise.

    @param task_id: numeric task id
    @param stats: pickled statistics
    @param key: a passkey to prevent processes from overriding entries
    @return: the result of the database call, or False when it raised
    """
    # SECURITY NOTE(review): stats is supplied by the remote job and
    # unpickling untrusted data can execute arbitrary code.
    stats = cPickle.loads(stats)
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        i3mondb.set_auto(False)
        logger.info("task %d finished" % task_id)
        if not key:
            key = None
        try:
            ret = i3mondb.task_finish(task_id, stats, key)
            if ret:
                i3mondb.commit()
            else:
                i3mondb.rollback()
        except Exception as e:
            # sys.exc_info() is per-thread; the deprecated sys.exc_* globals
            # are shared between threads.
            sys.excepthook(*sys.exc_info())
            logger.error(e)
            ret = False
        i3mondb.disconnect()
    finally:
        # Release even when connect()/set_auto()/disconnect() raises.
        self.semaphore.release()
    return ret
1444 1445 #------ End of multi-part job (DAG) functions -------------------------------- 1446
def serve_forever(self, server):
    """
    Run server.serve_forever() until it returns or the process exits.

    For some reason SOAPpy throws an exception if we get an HTTP instead of
    HTTPS request when using SSL. When SSL support is available such an
    SSL.Error is logged and serving is resumed; without SSL support the SSL
    exception is not caught.

    A keyboard interrupt exits the process with status 0, a socket error
    with status 1.

    @param server: object providing serve_forever()
    """
    # Imported here because the `except` clause below needs the name and the
    # module's visible import block does not provide it.
    import socket

    # Only name SSL.Error when the SSL module was actually imported.
    if ssl_supported:
        resumable = (SSL.Error,)
    else:
        resumable = ()

    # Loop instead of recursing: each SSL error previously re-entered this
    # function through a bare `serve_forever` name that does not exist at
    # module level.
    while True:
        try:
            server.serve_forever()
            return
        except KeyboardInterrupt:
            logger.info("Received keyboard interrupt")
            logger.info("Exiting")
            os._exit(0)
        except resumable as e:
            logger.error("received: " + str(e))
        except socket.error as e:
            self.logger.error(str(e))
            os._exit(1)
1476
def echo(self, msg):
    """
    Return the received message unchanged.

    @param msg: any value
    @return: msg
    """
    return msg
1479 1480
def register_module(self):
    """
    Look up the id of this grid in the monitoring database.

    The id found for self.grid_name is stored in self.grid_id.

    @return: the grid id
    @raise Exception: when the database rejects the credentials
    """
    mondb = MonitorDB()
    authenticated = mondb.authenticate(self.host, self.user, self.passwd,
                                       self.database, port=self.dbport,
                                       keep_open=True)
    if not authenticated:
        raise Exception('unable to authenticate database user')
    self.grid_id = mondb.GetGridId(self.grid_name)
    mondb.disconnect()
    return self.grid_id
1492
def register_functions(self):
    """
    Expose the methods of this object through self.server, in the order
    listed below, then enable the introspection functions.
    """
    exposed = (
        'echo',
        'start',
        'finish',
        'abort',
        'ping',
        'getconfig',
        'getmetadata',
        'getfile',
        'get_tarball',
        'getvalue',
        'copying',
        'AddFileURL',
        # multipart job-related functions
        'multipart_job_start',
        'multipart_job_finish',
        'task_start',
        'task_copying_input',
        'task_processing',
        'task_copying_output',
        'task_finish',
        'task_abort',
        'get_storage_url',
    )
    for method_name in exposed:
        self.server.register_function(getattr(self, method_name))
    self.server.register_introspection_functions()
1518
def monitor(self):
    """
    Register the exposed functions, store the grid id returned by
    register_module() and serve requests.

    An exception escaping serve_forever() is logged as a warning and
    swallowed.
    """
    self.register_functions()
    grid_id = self.register_module()
    self.grid_id = grid_id
    try:
        self.serve_forever(self.server)
    except Exception as e:
        logger.warn(e)
1526
def SetSemaphore(self, semaphore):
    """
    Replace the semaphore used by the request handlers.

    @param semaphore: object stored as self.semaphore
    """
    self.semaphore = semaphore
1529
def __init__(self, cfg):
    """
    Read the configuration, create the XML-RPC server and prepare the
    database handles.

    An encrypted server is created when SSL is enabled in the configuration
    and can be set up; any failure while doing so is logged as a warning and
    an un-encrypted server is created instead.

    @param cfg: configuration object; read through get(), getint(),
                getboolean() and has_option()
    """
    self.cfg = cfg
    self.use_ldap = self.cfg.getboolean('ldap', 'enable')
    self.logger = logging.getLogger('SoapMon')
    self.grid_name = self.cfg.get('queue', 'name')
    address = self.cfg.get('monitoring', 'server')
    port = self.cfg.getint('monitoring', 'port')
    self.semaphore = DummySemaphore()
    self.usesecure = True
    # start() reads self.grid_id; give it a value before monitor() /
    # register_module() stores the real one.
    self.grid_id = 0

    try:
        self.xmluri = self.cfg.get('path', 'uri')
    except Exception:
        # Not a bare `except:` - that would also swallow KeyboardInterrupt
        # and SystemExit.
        self.xmluri = 'http://www.icecube.wisc.edu/simulation/dtd/iceprod.v2.dtd'

    # SSL is used only if neither of the two options disables it.
    if self.cfg.has_option('monitoring', 'USESSL'):
        self.usesecure = self.cfg.getboolean('monitoring', 'USESSL')
    if self.cfg.has_option('security', 'USESSL'):
        self.usesecure = self.usesecure and self.cfg.getboolean('security', 'USESSL')

    if self.usesecure:
        try:
            cert = expandvars(self.cfg.get('security', 'SSLCERT'))
            key = expandvars(self.cfg.get('security', 'SSLKEY'))
            if not os.path.exists(cert):
                self.logger.fatal("Cannot find SSL certificate in %s" % cert)
            if not os.path.exists(key):
                self.logger.fatal("Cannot find SSL key in %s" % key)

            sslctx = SSL.Context(SSL.SSLv23_METHOD)
            sslctx.use_privatekey_file(key)
            sslctx.use_certificate_file(cert)
            self.server = rpc.ThreadedSecureXMLRPCServer((address, port), ssl_context=sslctx)
            self.logger.info("Monitoring server running **encrypted** on addr:%s:%d" % (address, port))
        except Exception as e:
            self.logger.warn(e)
            self.server = rpc.ThreadedXMLRPCServer((address, port))
            self.logger.info("Monitoring server running **un-encrypted** on addr:%s:%d" % (address, port))
    else:
        self.server = rpc.ThreadedXMLRPCServer((address, port))
        self.logger.info("Monitoring server running **un-encrypted** on addr:%s:%d" % (address, port))

    self.host = self.cfg.get('database', 'server')
    self.user = self.cfg.get('database', 'username')
    self.passwd = self.cfg.get('database', 'password')
    self.database = self.cfg.get('database', 'database')
    self.dbport = self.cfg.getint('database', 'port')

    self.i3db = ConfigDB()
    self.i3db.authenticate(self.host, self.user, self.passwd,
                           self.database, port=self.dbport, keep_open=False)

    self.i3mondb = MonitorDB()
    self.i3mondb.authenticate(self.host, self.user, self.passwd,
                              self.database, port=self.dbport, keep_open=False)
1585 1586 1587
class MonitorCGI(Monitor):
    """
    XMLRPC server class for monitoring jobs.
    Jobs connect to the server from compute nodes and make status updates.
    Similar to the Monitor class but runs as CGI embedded in an existing
    HTTP server.
    """

    def __init__(self, cfg):
        """
        Set up the CGI request handler and the database handles.

        Monitor.__init__ is not called, so every attribute the inherited
        methods read has to be set here.

        @param cfg: configuration object; read through get(), getint() and
                    getboolean()
        """
        self.semaphore = DummySemaphore(verbose=1)
        self.cfg = cfg
        self.use_ldap = self.cfg.getboolean('ldap', 'enable')
        # Inherited methods log through self.logger; without this attribute
        # they fail with AttributeError.
        self.logger = logging.getLogger('SoapMon')
        # NOTE(review): hard-coded grid id; register_module() is not called
        # by monitor() below, so this value is never replaced - confirm.
        self.grid_id = 79

        self.xmluri = 'http://www.icecube.wisc.edu/simulation/dtd/iceprod.v2.dtd'
        self.server = rpc.MyCGIXMLRPCRequestHandler()

        self.host = self.cfg.get('database', 'server')
        self.user = self.cfg.get('database', 'username')
        self.passwd = self.cfg.get('database', 'password')
        self.database = self.cfg.get('database', 'database')
        self.dbport = self.cfg.getint('database', 'port')

        self.i3db = ConfigDB()
        self.i3db.authenticate(self.host, self.user, self.passwd,
                               self.database, port=self.dbport, keep_open=False)

        self.i3mondb = MonitorDB()
        self.i3mondb.authenticate(self.host, self.user, self.passwd,
                                  self.database, port=self.dbport, keep_open=False)

    def monitor(self):
        """
        Register the exposed functions and handle a single CGI request.
        """
        self.register_functions()
        self.server.handle_request()
1623
1624 -class MonitorCGIProxy(MonitorCGI):
1625 """ 1626 XMLRPC server class for monitoring jobs 1627 job connect to server from compute nodes and make status updates 1628 Similar to Monitor class but runs CGI embeded in existing HTTP server 1629 1630 """ 1631
def __init__(self, cfg):
    """
    Set up the CGI request handler and the XML-RPC client that requests
    are forwarded to.

    The target url is read from option 'soapmon' in section 'proxy'.

    @param cfg: configuration object; read through get()
    """
    self.semaphore = DummySemaphore(verbose=1)
    self.cfg = cfg

    self.xmluri = 'http://www.icecube.wisc.edu/simulation/dtd/iceprod.v2.dtd'
    self.server = rpc.MyCGIXMLRPCRequestHandler()

    target_url = self.cfg.get('proxy', 'soapmon')
    self.url = target_url
    import xmlrpclib
    self.client = xmlrpclib.ServerProxy(self.url)
    self.logger = logging.getLogger('SoapMonProxy')
1644
def monitor(self):
    """
    Log the start of the proxy, register the exposed functions and handle
    a single CGI request.
    """
    self.logger.info("starting soapmon proxy")
    self.register_functions()
    request_handler = self.server
    request_handler.handle_request()
1649 1650
def getvalue(self, key):
    """
    Forward getvalue() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    forward = self.client.getvalue
    return forward(key)
1653
def getfile(self, key, dataset_id=0):
    """
    Forward getfile() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    forward = self.client.getfile
    return forward(key, dataset_id)
1656
def get_tarball(self, metaproject_name, metaproject_version, platform, gccversion, ppc=0):
    """
    Log the request and forward get_tarball() to the remote monitoring
    server.

    @return: whatever the remote call returns
    """
    request = (metaproject_name, metaproject_version, platform, gccversion)
    self.logger.info("metaproject %s %s %s %s" % request)
    forward = self.client.get_tarball
    return forward(metaproject_name, metaproject_version, platform, gccversion, ppc)
1660
def getconfig(self, dataset):
    """
    Forward getconfig() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    forward = self.client.getconfig
    return forward(dataset)
1663
def getmetadata(self, dataset):
    """
    Forward getmetadata() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    forward = self.client.getmetadata
    return forward(dataset)
1666
def start(self, hostname, dataset=0, job_id=0, key='', grid=0):
    """
    Log the request and forward start() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    self.logger.info("starting job %s.%s" % (dataset, job_id))
    forward = self.client.start
    return forward(hostname, dataset, job_id, key, grid)
1670
def ping(self, dataset_id, job_id, host, key='', tray=0, iter=0):
    """
    Log the request and forward ping() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    self.logger.info("ping from job %s.%s" % (dataset_id, job_id))
    forward = self.client.ping
    return forward(dataset_id, job_id, host, key, tray, iter)
1674
def finish(self, dataset_id, job_id, stats, key='', mode=0):
    """
    Log the request and forward finish() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    self.logger.info("finishing job %s.%s" % (dataset_id, job_id))
    forward = self.client.finish
    return forward(dataset_id, job_id, stats, key, mode)
1678
def copying(self, dataset_id, job_id, key=''):
    """
    Forward copying() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    forward = self.client.copying
    return forward(dataset_id, job_id, key)
1681
def abort(self, job_id, dataset_id, error, errormessage='', key='', stats=cPickle.dumps({})):
    """
    Log the request and forward abort() to the remote monitoring server.

    Note that job_id comes before dataset_id in this signature.

    @return: whatever the remote call returns
    """
    self.logger.info("abort job %s.%s %s" % (dataset_id, job_id, errormessage))
    forward = self.client.abort
    return forward(job_id, dataset_id, error, errormessage, key, stats)
1685
def AddFileURL(self, dataset_id, job_id, url, md5sum, filesize, transfertime, key=''):
    """
    Forward AddFileURL() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    forward = self.client.AddFileURL
    return forward(dataset_id, job_id, url, md5sum, filesize, transfertime, key)
1688
def get_storage_url(self, dataset_id, queue_id, passkey='', storage_type='INPUT'):
    """
    Forward get_storage_url() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    forward = self.client.get_storage_url
    return forward(dataset_id, queue_id, passkey, storage_type)
1691
def multipart_job_start(self, dataset_id, queue_id, key=''):
    """
    Forward multipart_job_start() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    forward = self.client.multipart_job_start
    return forward(dataset_id, queue_id, key)
1694
def multipart_job_finish(self, dataset_id, queue_id, key=''):
    """
    Forward multipart_job_finish() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    forward = self.client.multipart_job_finish
    return forward(dataset_id, queue_id, key)
1697
def task_start(self, dataset_id, queue_id, taskname, tray, iter, hostname, key=''):
    """
    Forward task_start() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    forward = self.client.task_start
    return forward(dataset_id, queue_id, taskname, tray, iter, hostname, key)
1700
def task_copying_input(self, task_id, key=''):
    """
    Forward task_copying_input() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    forward = self.client.task_copying_input
    return forward(task_id, key)
1703
def task_processing(self, task_id, key=''):
    """
    Forward task_processing() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    forward = self.client.task_processing
    return forward(task_id, key)
1706
def task_copying_output(self, task_id, key=''):
    """
    Forward task_copying_output() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    forward = self.client.task_copying_output
    return forward(task_id, key)
1709
def task_update_status(self, task_id, status, key=''):
    """
    Forward task_update_status() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    forward = self.client.task_update_status
    return forward(task_id, status, key)
1712
def task_abort(self, task_id, key=''):
    """
    Forward task_abort() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    forward = self.client.task_abort
    return forward(task_id, key)
1715
def task_finish(self, task_id, stats, key=''):
    """
    Forward task_finish() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    forward = self.client.task_finish
    return forward(task_id, stats, key)
1718
def echo(self, msg):
    """
    Forward echo() to the remote monitoring server.

    @return: whatever the remote call returns
    """
    forward = self.client.echo
    return forward(msg)
1721