1
2
3 """
4 XML-RPC Server module for running remote batch jobs on IceTray.
5
6 copyright (c) 2005 the icecube collaboration
7
8 @version: $Revision: $
9 @date: $Date: $
10 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
11 @todo: Add HTTPS support
12 @todo: Add XML validation in supporting classes
13
14 U{Userguide and other documentation <http://www.icecube.wisc.edu/~juancarlos/software/sim-prod>}
15
16 """
17
18 import sys
19 import os
20 import string
21 import re
22 import time
23 import getopt
24 import cPickle
25 import logging
26 import signal
27 import exceptions
28 import pwd,grp
29 import thread,threading
30 from threading import Thread
31 from os.path import expandvars,join
32 from cPickle import dumps,loads
33 from MySQLdb import OperationalError
34 from Queue import Queue
35
36 try:
37 import ldap
38 ldap_installed = True
39 except ImportError:
40 ldap_installed = False
41
42 import iceprod
43 from iceprod.server import rpc
44 from iceprod.core.dataclasses import *
45 from iceprod.core.xmlwriter import IceTrayXMLWriter
46 from iceprod.server.db import ConfigDB,MonitorDB
47 from iceprod.core.configuration import Config
48 from iceprod.server.db import MySQLParamDb,SoapParamDB
49 from iceprod.server.queue import i3ProdQueue
50 from iceprod.server.job import i3Job
51
52 import iceprod.core.logger
53 logger = logging.getLogger('server')
54
55 localhost = os.uname()[1].split(".")[0]
56
57 soapdaemons = ['soaptray','soapmon','soapqueue','soapdh','soaphisto']
58
59 ssl_supported = False
60
61 try:
62 from OpenSSL import SSL
63 ssl_supported = True
64 except Exception,e:
65 print >> sys.stderr,'exception: %s : communicaton will not be encrypted' % str(e)
66 pass
67
68
def fail(msg, exception=None):
    """
    Build the failure reply sent back to clients.

    @param msg: human readable description of the failure
    @param exception: exception to report; a plain Exception is used
                      when none is given
    @return: tuple (msg, pickled None, pickled exception) -- the same
             three-slot layout that submit() returns on success
    """
    # Build the default per call: a default written as Exception() in the
    # signature is created once and then shared by every failure reply.
    if exception is None:
        exception = Exception()
    return msg, dumps(None), dumps(exception)
71
72
74 """
75 Dummy class that shares interface with Semaphore class but does nothing.
76 """
77
78 - def __init__(self, value=1, verbose=None):
79 self.counter = value
80 self.verbose = verbose
81
83 self.counter -= 1
84 if self.verbose: logger.info("count: %d" % self.counter)
85 return self.counter
86
88 self.counter += 1
89 if self.verbose: logger.info("count: %d" % self.counter)
90 return self.counter
91
93 logger = logging.getLogger('QueueSemaphore')
94
95 - def __init__(self, value=1, verbose=None):
96 self.verbose = verbose
97 self.queue = Queue(value)
98
100 self.queue.put(1)
101 if self.verbose: self.logger.info("count: %d" % self.queue.qsize())
102
106
107
109
110 """
111 XMLRPC server class for submitting jobs to IceProd
112 """
113
115 self.cfg = cfg
116
117 self.use_ldap = self.cfg.getboolean('ldap','enable') and ldap_installed
118 self.logger = logging.getLogger('SoapTray')
119 self.dbserver = self.cfg.get('database','server')
120 self.dbport = self.cfg.getint('database','port')
121 self.database = self.cfg.get('database','database')
122 self.username = self.cfg.get('database','username')
123 self.password = self.cfg.get('database','password')
124 self.grid_name = self.cfg.get('queue','name')
125 self.batchsys = self.cfg.get('queue','batchsys')
126 self.institution = self.cfg.get('info','INSTITUTION')
127 self.address = self.cfg.get('server','server')
128 self.port = self.cfg.getint('server','port')
129 self.rootdir = self.cfg.get('path','basedir')
130 self.datawarehouse = expandvars(self.cfg.get('path','datawarehouse',raw=True))
131 self.usesecure = True
132 if self.cfg.has_option('server','USESSL'):
133 self.usesecure = self.cfg.getboolean('server','USESSL')
134 if self.cfg.has_option('security','USESSL'):
135 self.usesecure = self.usesecure and self.cfg.getboolean('security','USESSL')
136
137 if self.use_ldap:
138 self.ldap_users = self.cfg.get('ldap','users').split(',')
139 self.ldap_users = map(string.strip,self.ldap_users)
140
141 self.iceprodcert = None
142 self.iceprodkey = None
143 if self.usesecure:
144 self.iceprodcert = expandvars(self.cfg.get('security','SSLCERT'))
145 self.iceprodkey = expandvars(self.cfg.get('security','SSLKEY'))
146
147 self.submithost = os.getenv('HOSTNAME')
148 if self.cfg.has_option('queue','SUBMITHOST'):
149 self.submithost = self.cfg.get('queue','SUBMITHOST')
150
151
152 if ssl_supported and self.usesecure:
153 sslctx = SSL.Context(SSL.SSLv23_METHOD)
154 if not os.path.exists(self.iceprodcert):
155 self.logger.fatal("Cannot find SSL certificate in %s" % self.iceprodcert)
156 if not os.path.exists(self.iceprodkey):
157 self.logger.fatal("Cannot find SSL key in %s" % self.iceprodkey)
158
159 sslctx.use_privatekey_file (self.iceprodkey)
160 sslctx.use_certificate_file(self.iceprodcert)
161 self.server = rpc.ThreadedSecureXMLRPCServer(
162 (self.address, self.port),
163 logRequests=False, ssl_context = sslctx)
164 self.logger.info("soaptray server running **encrypted** on addr:%s:%d" % (self.address,self.port))
165
166
167 else:
168 self.server = rpc.ThreadedXMLRPCServer((self.address, self.port))
169 self.logger.info("soaptray server running **un-encrypted** on addr:%s:%d" % (self.address,self.port))
170
171
173 """
174 Register daemon with database and update status
175 """
176 i3db = MonitorDB()
177 if not i3db.authenticate(self.dbserver,self.username, self.password,
178 self.database, port=self.dbport,keep_open=True):
179 raise Exception, 'unable to authenticate database user'
180 self.grid_id = i3db.GetGridId(self.grid_name)
181 i3db.disconnect()
182 return self.grid_id
183
185 """
186 For some reason SOAPpy throws an exception if we get an
187 HTTP instead of HTTPS request when using SSL
188 """
189 if ssl_supported:
190 try:
191 self.server.serve_forever()
192 except KeyboardInterrupt:
193 self.logger.info("Received keyboard interrupt")
194 self.logger.info("Exiting")
195 os._exit(0)
196
197 except SSL.Error,e:
198 self.logger.error("received: " + str(e))
199 self.serve_forever()
200 except socket.error,e:
201 self.logger.error(str(e))
202 os._exit(1)
203 else:
204 try:
205 self.server.serve_forever()
206 except KeyboardInterrupt:
207 self.logger.info("Received keyboard interrupt")
208 self.logger.info("Exiting")
209 os._exit(0)
210 except socket.error,e:
211 self.logger.error(str(e))
212 os._exit(1)
213
214 - def auth_db(self,db_obj,username,password,keep_open=False):
215 """
216 Authenticate remotely against database
217 This is the exposed method
218 @param db_obj: database instance
219 @param username:
220 @param password:
221 @param : keep_open
222 @return: True if authenticated False otherwise
223 """
224
225 self.logger.info("ldap is %s" % self.use_ldap)
226 if self.cfg.getboolean('ldap','enable') and not ldap_installed:
227 self.logger.warn("ldap is enabled but not supported.")
228
229 if self.use_ldap:
230 l=ldap.initialize(self.cfg.get('ldap','url'))
231 try:
232 result = l.bind(self.cfg.get('ldap','cn',raw=True)%username, password,ldap.AUTH_SIMPLE)
233 self.logger.info(str(l.result(result)))
234 except ldap.INVALID_CREDENTIALS, error:
235 logger.error(error)
236 except ldap.LDAPError, error:
237 self.logger.error(error)
238 except Exception, error:
239 self.logger.error(error)
240 else:
241 if username in self.ldap_users:
242 return db_obj.authenticate(self.dbserver,self.username,self.password,self.database,port=self.dbport,keep_open=keep_open)
243
244 return db_obj.authenticate(self.dbserver,username,password,self.database,port=self.dbport,keep_open=keep_open)
245
246
248 """
249 Authenticate remotely against database
250 This is the exposed method
251 @param username:
252 @param password:
253 @return: True if authenticated False otherwise
254 """
255 if ConfigDB().authenticate2(self.dbserver,username,password,self.database,port=self.dbport):
256 return True
257
258 if self.use_ldap:
259 l=ldap.initialize(self.cfg.get('ldap','url'))
260 try:
261 result = l.bind(self.cfg.get('ldap','cn',raw=True)%username, password,ldap.AUTH_SIMPLE)
262 except ldap.INVALID_CREDENTIALS, error:
263 logger.error(error)
264 return False
265 else:
266 return True
267 else:
268 return False
269
270
def enqueue(self,i3steering,username,password,submitter):
    """
    Hand a steering configuration to the production queue.

    Opens a ConfigDB and a MonitorDB connection on behalf of the user,
    wires both into an i3ProdQueue and calls its EnQueue method.  Both
    connections are closed again on every path out of this method.

    @param i3steering: steering configuration to enqueue
    @param username: user name checked through auth_db
    @param password: password checked through auth_db
    @param submitter: username of person who submitted dataset
    @return: (status, i3q) as produced by EnQueue; when access to either
             database is denied, the three-element tuple built by fail()
    """
    configdb = ConfigDB()
    configdb.SetSubmitter(submitter)
    configdb.SetInstitution(self.institution)
    if self.cfg.has_option('soapdh','tempdata'):
        configdb.SetTempStoragePath(expandvars(self.cfg.get('soapdh','tempdata',raw=True)))
    else:
        configdb.SetTempStoragePath(expandvars(self.cfg.get('path','datawarehouse',raw=True)))
    configdb.SetGlobalStoragePath(expandvars(self.cfg.get('path','datawarehouse',raw=True)))
    mondb = MonitorDB()

    if not self.auth_db(configdb,username, password,keep_open=True):
        self.logger.info("ConfigDB: Access denied for user %s" % username)
        configdb.disconnect()
        return fail("ConfigDB: Access denied for user %s" % username)

    # From here on configdb is open: close it however we leave, including
    # when the MonitorDB check fails or EnQueue raises.
    try:
        if not self.auth_db(mondb,username, password,keep_open=True):
            self.logger.info("MonitorDB: Access denied for user %s" % username)
            mondb.disconnect()
            return fail("MonitorDB: Access denied for user %s" % username)

        try:
            q = i3ProdQueue(self.cfg)
            q.SetConfigDB(configdb)
            q.SetMonitorDB(mondb)
            q.SetRootDir(self.rootdir)
            q.SetSubmitHost(self.submithost)
            q.SetSubmitter(submitter)
            q.SetInstitution(self.institution)
            status,i3q = q.EnQueue(i3steering)
            return status,i3q
        finally:
            mondb.disconnect()
    finally:
        configdb.disconnect()
305
306
307 - def submit(self,
308 sconfig,
309 username,
310 password,
311 submitter,
312 production=False,
313 start=0,
314 end=0,
315 dataset=0):
316 """
317 Receive a remote request for a job submission
318 @param sconfig: pickled steering configuration
319 @param username: (needed for connecting to the configuration database
320 @param password: (needed for connecting to the configuration database
321 @param submitter: username of person who submitted dataset
322 @param production: boolean flag
323 @param start: optional begining of job sequence (non-prod)
324 @param end: optional end of job sequence (non-prod)
325 @param dataset: optional (non-prod)
326 @return: Output generated by queue
327 """
328
329 i3db = ConfigDB()
330 if not self.auth_db(i3db,username, password):
331 self.logger.info("Access denied for user %s" % username)
332 return fail("Access denied for user %s" % username)
333
334 self.logger.info("Handling submission from user %s" % username)
335
336 i3steering = cPickle.loads(sconfig)
337 i3q = None
338 if production:
339 try:
340 status,i3q = self.enqueue(i3steering,username,password,submitter)
341 except Exception, e:
342 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
343 self.logger.error('Caught %s: %s' % (sys.exc_type, str(e)))
344 i3db.disconnect()
345 return fail(str(e),e)
346 else:
347 q = i3ProdQueue(self.cfg)
348 q.SetRootDir(self.rootdir)
349 q.SetSubmitHost(self.submithost)
350 q.SetSubmitter(submitter)
351
352 try:
353 status,i3q = q.Submit(i3steering,first=start,last=end,npid=dataset)
354 except Exception, e:
355 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
356 self.logger.error('Caught %s: %s' % (sys.exc_type, str(e)))
357 i3db.disconnect()
358 del q
359 return fail(str(e),e)
360 del q
361 i3db.disconnect()
362 del i3steering
363 return status,dumps(i3q),dumps(None)
364
365
def checkjobs(self,i3q_pkl,username,password):
    """
    Receive a remote request for a job status check
    @param i3q_pkl: a serialized iGrid object
    @param username: user name checked through auth_db
    @param password: password checked through auth_db
    @return: result of the queue object's CheckQ call, or the tuple
             built by fail() when access is denied
    """

    i3db = ConfigDB()
    if not self.auth_db(i3db,username, password,keep_open=True):
        self.logger.error("Access denied for user %s" % username)
        # Same clean-up the other handlers in this class do on denial.
        i3db.disconnect()
        return fail("Access denied for user %s" % username)

    try:
        # SECURITY NOTE(review): i3q_pkl comes from the remote client and
        # unpickling can execute arbitrary code.
        i3q = cPickle.loads(i3q_pkl)
        i3q.batchsys = self.batchsys
        return i3q.CheckQ(i3db)
    finally:
        # Close the connection even when unpickling or CheckQ raises.
        i3db.disconnect()
382
384 """
385 Receive a remote request for a job status check
386 @param i3q_pkl: a serialized iGrid object
387 """
388
389 i3db = MonitorDB()
390 if not self.auth_db(i3db,username, password,keep_open=True):
391 self.logger.info("Access denied for user %s" % username)
392 return fail("Access denied for user %s" % username)
393
394 i3q = cPickle.loads(i3q_pkl)
395 i3q.batchsys = self.batchsys
396 status = i3q.QRemove(i3db)
397 i3db.disconnect()
398 return status
399
401 jobstr = ".%d" % job
402 if job < 0: jobstr = ""
403 i3db = MonitorDB()
404 if not self.auth_db(i3db,username, password,keep_open=True):
405 self.logger.warn("suspend %d%s denied for user %s" % (dataset,jobstr,username))
406 return fail("suspend %d%s denied for user %s" % (dataset,jobstr,username))
407 try:
408 i3db.jobsuspend(job,dataset,True)
409 self.logger.info("suspend %d%s granted for user %s" % (dataset,jobstr,username))
410 i3db.add_history(username,"suspend %d%s" % (dataset,jobstr))
411 self.logger.info("updated stats")
412 i3db.disconnect()
413 return "suspend %d%s granted for user %s" % (dataset,jobstr,username)
414 except Exception,e:
415 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
416 self.logger.error("suspend %d%s failed:%s" % (dataset,jobstr,str(e)))
417 i3db.disconnect()
418 return fail("suspend %d%s failed:%s" % (dataset,jobstr,str(e)))
419
421 jobstr = ".%d" % job
422 if job < 0: jobstr = ""
423 i3db = MonitorDB()
424 if not self.auth_db(i3db,username, password,keep_open=True):
425 self.logger.warn("resume %d%s denied for user %s" % (dataset,jobstr,username))
426 i3db.disconnect()
427 return fail("resume %d%s denied for user %s" % (dataset,jobstr,username))
428 try:
429 i3db.jobsuspend(job,dataset,False)
430 self.logger.info("resume %d%s granted for user %s" % (dataset,jobstr,username))
431 i3db.add_history(username,"resume %d%s" % (dataset,jobstr))
432 self.logger.info("updated stats")
433 i3db.disconnect()
434 return "resume %d%s granted for user %s" % (dataset,jobstr,username)
435 except Exception, e:
436 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
437 self.logger.error("resume %d%s failed:%s" % (dataset,jobstr,e))
438 i3db.disconnect()
439 return "resume %d%s failed:%s" % (dataset,jobstr,e)
440
442 jobstr = ".%d" % job
443 if job < 0: jobstr = ""
444 i3db = MonitorDB()
445 if not self.auth_db(i3db,username, password,keep_open=True):
446 self.logger.warn("reset %d%s denied for user %s" % (dataset,jobstr,username))
447 return fail("reset %d%s denied for user %s" % (dataset,jobstr,username))
448 try:
449 i3db.jobsuspend(job,dataset,False)
450 i3db.add_history(username,"reset %d%s" % (dataset,jobstr))
451 self.logger.info("reset %d%s granted for user %s" % (dataset,jobstr,username))
452 i3db.disconnect()
453 return "reset %d%s granted for user %s" % (dataset,jobstr,username)
454 except Exception, e:
455 self.logger.error(sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback) )
456 self.logger.error("reset %d%s failed:%s" % (dataset,jobstr,e))
457 i3db.disconnect()
458 return "reset %d%s failed:%s" % (dataset,jobstr,e)
459
460
462 i3db = MonitorDB()
463 if not self.auth_db(i3db,username, password,keep_open=True):
464 self.logger.warn("clean %d denied for user %s" % (dataset,username))
465 i3db.disconnect()
466 return fail("clean %d denied for user %s" % (dataset,username))
467 try:
468 i3db.update_monitoring(self.grid_id,dataset_id=dataset)
469 i3db.jobclean(dataset)
470 i3db.add_history(username,"clean %d" % dataset)
471 self.logger.info("clean %d granted for user %s" % (dataset,username))
472 i3db.disconnect()
473 return "clean %d granted for user %s" % (dataset,username)
474 except Exception,e:
475 self.logger.error("clean %d failed:%s" % (dataset,str(e)))
476 i3db.disconnect()
477 return fail("clean %d failed:%s" % (dataset,str(e)))
478
480 i3db = MonitorDB()
481 if not self.auth_db(i3db,username, password,keep_open=True):
482 self.logger.warn("clean %d denied for user %s" % (dataset,username))
483 i3db.disconnect()
484 return fail("clean %d denied for user %s" % (dataset,username))
485 try:
486 i3db.jobclean(dataset,archive=False)
487 i3db.set_metadata_subcat(dataset,sub_cat="obsolete")
488 i3db.setDatasetStatus(dataset,'OBSOLETE')
489 i3db.validate(dataset,'FALSE')
490 i3db.add_history(username,"nuke %d" % dataset)
491 self.logger.info("delete %d granted for user %s" % (dataset,username))
492 i3db.disconnect()
493 return "delete %d granted for user %s" % (dataset,username)
494 except Exception,e:
495 i3db.disconnect()
496 self.logger.error("delete %d failed:%s" % (dataset,str(e)))
497 return fail("clean %d failed:%s" % (dataset,str(e)))
498
500 i3db = MonitorDB()
501 if not self.auth_db(i3db,username, password,keep_open=True):
502 i3db.disconnect()
503 self.logger.warn("clean %d denied for user %s" % (dataset,username))
504 return fail("clean %d denied for user %s" % (dataset,username))
505 try:
506 i3db.update_monitoring(self.grid_id,dataset_id=dataset)
507 i3db.jobclean(dataset)
508 i3db.setDatasetStatus(dataset,'COMPLETE')
509 i3db.add_history(username,"finish %d" % dataset)
510 i3db.disconnect()
511 self.logger.info("finish %d granted for user %s" % (dataset,username))
512 return "finish %d granted for user %s" % (dataset,username)
513 except Exception,e:
514 i3db.disconnect()
515 self.logger.error("finish %d failed:%s" % (dataset,str(e)))
516 return fail("finish %d failed:%s" % (dataset,str(e)))
517
519 i3db = MonitorDB()
520 if not self.auth_db(i3db,username, password,keep_open=True):
521 i3db.disconnect()
522 self.logger.warn("toggle debug %d denied for user %s" % (dataset,username))
523 return fail("toggle debug %d denied for user %s" % (dataset,username))
524 try:
525 i3db.ToggleDatasetDebug(dataset)
526 i3db.add_history(username,"toggle debug %d" % dataset)
527 i3db.disconnect()
528 self.logger.info("toggle debug %d granted for user %s" % (dataset,username))
529 return "toggle debug %d granted for user %s" % (dataset,username)
530 except Exception,e:
531 i3db.disconnect()
532 self.logger.error("toggle debug %d failed:%s" % (dataset,str(e)))
533 return fail("toggle debug %d failed:%s" % (dataset,str(e)))
534
535
536 - def loaddict(self,odict_pkl,username,password,dataset_id=0):
537 i3db = ConfigDB()
538 if not self.auth_db(i3db,username, password,keep_open=True):
539 i3db.disconnect()
540 self.logger.warn("validate %d denied for user %s" % (dataset,username))
541 return fail("validate %d denied for user %s" % (dataset,username))
542 try:
543 odict = cPickle.loads(odict_pkl)
544 i3db.load_dictionary(odict,dataset_id)
545 i3db.disconnect()
546 except Exception,e:
547 i3db.disconnect()
548 self.logger.warn("failed to load dictionary :\n%s" % str(e))
549 return fail("failed to load dictionary :\n%s" % str(e))
550
552 i3db = MonitorDB()
553 if not self.auth_db(i3db,username, password,keep_open=True):
554 i3db.disconnect()
555 self.logger.warn("validate %d denied for user %s" % (dataset,username))
556 return fail("validate %d denied for user %s" % (dataset,username))
557 try:
558 i3db.setDatasetStatus(dataset,status)
559 i3db.add_history(username,"set status %s %d" % (status,dataset))
560 i3db.disconnect()
561 self.logger.info("setstatus '%s' on dataset %d granted for user %s" % (status,dataset,username))
562 return "setstatus '%s' on dataset %d granted for user %s" % (status,dataset,username)
563 except Exception,e:
564 i3db.disconnect()
565 self.logger.error("setstatus '%s' on dataset %d failed \n%s" % (status,dataset,str(e)))
566 return fail("setstatus '%s' on dataset %d failed \n%s" % (status,dataset,str(e)))
567
569 """
570 Mark dataset as obsolete
571 """
572 i3db = ConfigDB()
573 i3mon = MonitorDB()
574 if not self.auth_db(i3db,username, password,keep_open=True):
575 i3db.disconnect()
576 self.logger.warn("validate %d denied for user %s" % (dataset,username))
577 return fail("validate %d denied for user %s" % (dataset,username))
578 self.auth_db(i3mon,username, password,keep_open=True)
579 try:
580 i3db.set_metadata_subcat(dataset,sub_cat="obsolete")
581 i3mon.setDatasetStatus(dataset,'OBSOLETE')
582 i3mon.add_history(username,"retire s %d" % dataset)
583 i3db.disconnect()
584 i3mon.disconnect()
585 self.logger.info("retire dataset %d granted for user %s" % (dataset,username))
586 return "retire dataset %d granted for user %s" % (dataset,username)
587 except Exception,e:
588 i3db.disconnect()
589 i3mon.disconnect()
590 self.logger.error("retire dataset %d failed \n%s" % (dataset,str(e)))
591 return fail("retire dataset %d failed \n%s" % (dataset,str(e)))
592
593
595 i3db = MonitorDB()
596 if not self.auth_db(i3db,username, password,keep_open=True):
597 i3db.disconnect()
598 self.logger.warn("validate %d denied for user %s" % (dataset,username))
599 return fail("validate %d denied for user %s" % (dataset,username))
600 try:
601 if valid:
602 i3db.validate(dataset,'TRUE')
603 else:
604 i3db.validate(dataset,'FALSE')
605 i3db.add_history(username,"validate %d" % dataset)
606 i3db.disconnect()
607 self.logger.info("change %d granted for user %s" % (dataset,username))
608 return "change %d granted for user %s" % (dataset,username)
609 except Exception,e:
610 i3db.disconnect()
611 self.logger.error("validate %d failed:%s" % (dataset,str(e)))
612 return fail("validate %d failed:\n%s" % (dataset,str(e)))
613
614
# Body of the daemon suspend handler (its def line is not visible here).
# Requests suspension of one daemon, or of every entry in soapdaemons
# when daemon is 'all', for the given grid and records it in the history.
616 i3db = MonitorDB()
617 if not self.auth_db(i3db,username, password,keep_open=True):
618 i3db.disconnect()
619 self.logger.warn("suspend %s.%s denied for user %s" % (grid,daemon,username))
620 return fail("suspend %s.%s denied for user %s" % (grid,daemon,username))
621 try:
622 if daemon=='all':
623 for d in soapdaemons:
624 i3db.GridRequestSuspend(grid,d)
625 elif daemon in soapdaemons:
626 i3db.GridRequestSuspend(grid,daemon)
627 else:
# NOTE(review): this branch returns without i3db.disconnect(); the matching
# branch of the resume handler disconnects first.
628 self.logger.error("%s: invalid daemon %s" % (grid,daemon))
629 return fail("%s: invalid daemon %s" % (grid,daemon))
630
631 i3db.add_history(username,"suspend daemon %s.%s" % (grid,daemon) )
632 i3db.disconnect()
633 self.logger.info("suspend %s.%s granted for user %s" % (grid,daemon,username))
634 return "suspend %s.%s granted for user %s" % (grid,daemon,username)
635 except Exception,e:
636 i3db.disconnect()
# NOTE(review): the "failed:%s" slot is filled with username, not with the
# caught exception e, so the cause of the failure is never reported.
637 self.logger.error("suspend %s.%s failed:%s" % (grid,daemon,username))
638 return fail("suspend %s.%s failed:%s" % (grid,daemon,username))
639
641 i3db = MonitorDB()
642 if not self.auth_db(i3db,username, password,keep_open=True):
643 i3db.disconnect()
644 self.logger.warn("resume %s.%s denied for user %s" % (grid,daemon,username))
645 return fail("resume %s.%s denied for user %s" % (grid,daemon,username))
646 try:
647 if daemon=='all':
648 for d in soapdaemons:
649 i3db.GridRequestResume(grid,d)
650 elif daemon in soapdaemons:
651 i3db.GridRequestResume(grid,daemon)
652 else:
653 i3db.disconnect()
654 self.logger.error("%s: invalid daemon %s" % (grid,daemon))
655 return fail("%s: invalid daemon %s" % (grid,daemon))
656
657 i3db.add_history(username,"resume daemon %s.%s" % (grid,daemon) )
658 i3db.disconnect()
659 self.logger.info("resume %s.%s granted for user %s" % (grid,daemon,username))
660 return "resume %s.%s granted for user %s" % (grid,daemon,username)
661 except Exception,e:
662 i3db.disconnect()
663 self.logger.error("resume %s.%s failed:%s" % (grid,daemon,username))
664 return fail("resume %s.%s failed:%s" % (grid,daemon,username))
665
667 """
668 Get list of valid simcats
669 """
670 i3db = ConfigDB()
671 if not i3db.authenticate(self.dbserver,self.username, self.password,
672 self.database,port=self.dbport,keep_open=True):
673 i3db.disconnect()
674 self.logger.error("Access denied for user %s" % username)
675 retval = fail("Access denied for user %s" % username)
676 else:
677 retval = dumps(i3db.get_simcat_categories())
678 i3db.disconnect()
679 return retval
680
682 """
683 get usage statistics
684 @param days number of days (from today) to get data from
685 @return formatted string with usage statistics
686 """
# Connects with the server's own database account (self.username /
# self.password), not with caller-supplied credentials.
687 i3db = MonitorDB()
688 if not i3db.authenticate(self.dbserver,self.username, self.password,
689 self.database,port=self.dbport,keep_open=True):
690 i3db.disconnect()
# NOTE(review): grid, daemon and username are not among the documented
# parameters (only days is).  If they are not in the signature -- the def
# line is not visible here -- this branch raises NameError instead of
# replying.  The "resume" wording also looks copied from the daemon resume
# handler.
691 self.logger.warn("resume %s.%s denied for user %s" % (grid,daemon,username))
692 return fail("resume %s.%s denied for user %s" % (grid,daemon,username))
693 try:
694 retval = i3db.printsummary(days)
695 self.logger.info("printsummary: serving request")
696 i3db.disconnect()
697 return retval
698 except Exception,e:
699 i3db.disconnect()
700 self.logger.error("print summary failed: %s" % e)
701 return fail("print summary failed")
702
704 """
705 Get list of datasets in database
706 @param search_string: a string containing key words to use
707 """
708 i3db = MonitorDB()
709 if not i3db.authenticate(self.dbserver,self.username, self.password,
710 self.database,port=self.dbport,keep_open=True):
711 self.logger.error("Access denied for user %s" % username)
712 retval = fail("Access denied for user %s" % username)
713 else:
714 retval = dumps(i3db.getstatus(dataset,job))
715 i3db.disconnect()
716 return retval
717
718
720 """
721 Get list of datasets in database
722 @param search_string: a string containing key words to use
723 """
724 i3db = ConfigDB()
725 if not i3db.authenticate(self.dbserver,self.username, self.password,
726 self.database,port=self.dbport,keep_open=True):
727 self.logger.error("Access denied for user %s" % username)
728 retval = fail("Access denied for user %s" % username)
729 else:
730 retval = dumps(i3db.show_dataset_table(search_string))
731 i3db.disconnect()
732 return retval
733
735 """
736 Fetch configuration from database
737 @param datasest: dataset id for configuration
738 """
739 i3db = ConfigDB()
740 if not i3db.authenticate(self.dbserver,self.username, self.password,
741 self.database,port=self.dbport,keep_open=True):
742 self.logger.error("Access denied for user %s" % username)
743 retval = fail("Access denied for user %s" % username)
744 else:
745 retval = dumps(i3db.download_config(dataset,defaults,descriptions))
746 i3db.disconnect()
747 return retval
748
750 """
751 Fetch configuration from database
752 @param datasest: dataset id for configuration
753 """
754 msg = "%80s\n" % ('*'*80)
755 msg += '\n'
756 msg += "%80s\n" % time.asctime()
757 msg += '\n'
758 msg += "%80s\n" % ("IceProd Server version %s" % iceprod.server.__version__)
759 msg += '\n'
760 msg += "%80s\n" % ("Running on host %s" % os.uname()[1])
761 msg += '\n'
762 msg += "%80s\n" % ('*'*80)
763 return dumps(msg)
764
765 - def echo(self,msg):
767
768
770 """
771 expose the methods
772 """
773 self.server.register_function(self.echo)
774 self.server.register_function(self.submit)
775 self.server.register_function(self.authenticate)
776 self.server.register_function(self.checkjobs)
777 self.server.register_function(self.queue_remove)
778 self.server.register_function(self.queue_suspend)
779 self.server.register_function(self.queue_clean)
780 self.server.register_function(self.queue_delete)
781 self.server.register_function(self.queue_dataset_finish)
782 self.server.register_function(self.queue_dataset_toggle_debug)
783 self.server.register_function(self.queue_resume)
784 self.server.register_function(self.queue_validate)
785 self.server.register_function(self.queue_setstatus)
786 self.server.register_function(self.getstatus)
787 self.server.register_function(self.loaddict)
788 self.server.register_function(self.queue_retire)
789 self.server.register_function(self.daemon_suspend)
790 self.server.register_function(self.daemon_resume)
791 self.server.register_function(self.printsummary)
792 self.server.register_function(self.showrunlist)
793 self.server.register_function(self.download_config)
794 self.server.register_function(self.check_connection)
795 self.server.register_function(self.get_simcat_categories)
796
810
811
813 """
814 XMLRPC server class for submitting jobs to IceProd
815 job connect to server from compute nodes and make status updates
816 Similar to SoapTray class but runs CGI embeded in existing HTTP server
817 """
818
820
821 self.cfg = cfg
822 self.use_ldap = self.cfg.getboolean('ldap','enable')
823 self.logger = logging.getLogger('SoapTray')
824 self.semaphore = DummySemaphore(verbose=1)
825 self.grid_id = 0
826 self.submithost = os.getenv('HOSTNAME')
827
828 self.use_ldap = self.cfg.getboolean('ldap','enable')
829 self.dbserver = self.cfg.get('database','server')
830 self.dbport = self.cfg.getint('database','port')
831 self.database = self.cfg.get('database','database')
832 self.username = self.cfg.get('database','username')
833 self.password = self.cfg.get('database','password')
834 self.grid_name = self.cfg.get('queue','name')
835 self.batchsys = self.cfg.get('queue','batchsys')
836 self.institution = self.cfg.get('info','INSTITUTION')
837 self.address = self.cfg.get('server','server')
838 self.port = self.cfg.getint('server','port')
839 self.rootdir = self.cfg.get('path','basedir')
840 self.datawarehouse = expandvars(self.cfg.get('path','datawarehouse',raw=True))
841
842 if self.cfg.has_option('queue','SUBMITHOST'):
843 self.submithost = self.cfg.get('queue','SUBMITHOST')
844
845 self.xmluri = 'http://www.icecube.wisc.edu/simulation/dtd/icetray.dtd'
846 self.server = rpc.MyCGIXMLRPCRequestHandler()
847
848 self.i3db = ConfigDB()
849 self.i3mondb = MonitorDB()
850
865
866 - def submit(self,sconfig,username,password,submitter,production=False):
867 """
868 Receive a remote request for a job submission
869 @param sconfig: pickled steering configuration
870 @param username: (needed for connecting to the configuration database
871 @param password: (needed for connecting to the configuration database
872 @return: Output generated by queue
873 """
874
875 i3db = ConfigDB()
876 if not self.auth_db(i3db,username, password,keep_open=True):
877 self.logger.info("Access denied for user %s" % username)
878 return fail("Access denied for user %s" % username)
879
880 self.logger.info("Handling submission from user %s" % username)
881
882 i3steering = cPickle.loads(sconfig)
883 i3q = None
884 if production:
885 try:
886 status,i3q = self.enqueue(i3steering,username,password,submitter)
887 except Exception, e:
888 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
889 self.logger.error('Caught %s: %s' % (sys.exc_type, str(e)))
890 return fail(str(e),e)
891 else:
892 return fail("cgi-soaptray does not support non-production jobs.")
893 del i3steering
894 return status,dumps(i3q),dumps(None)
895
896
897
899 """
900 XMLRPC server class for monitoring jobs
901 job connect to server from compute nodes and make status updates
902 Creates own HTTP server
903 """
904
906 """
907 Retrive dictionary entry
908 @param: key string
909 @return: string value
910 """
911 i3db = self.i3db.new()
912 self.semaphore.acquire()
913 i3db.connect()
914 logger.debug("fetching entry %s " % key )
915 value = cPickle.dumps(None)
916 try:
917 value = i3db.fetch_dict_value(key)
918 logger.debug("fetched entry %s,%s " % (key,value) )
919 except Exception,e:
920 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
921 logger.error(e)
922 i3db.disconnect()
923 self.semaphore.release()
924 return value
925
926
927 - def getfile(self,key,dataset_id=0):
928 """
929 Retrive dictionary entry
930 @param: key string
931 @return: string value
932 """
933 i3db = self.i3db.new()
934 self.semaphore.acquire()
935 i3db.connect()
936 value = cPickle.dumps(None)
937 try:
938 value = i3db.fetch_filename(key,dataset_id)
939 value = tuple(value)
940 except Exception,e:
941 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
942 logger.error(e)
943 i3db.disconnect()
944 self.semaphore.release()
945 return value
946
947
949 """
950 Download configuration from database
951 """
952 i3db = self.i3db.new()
953 self.semaphore.acquire()
954 i3db.connect()
955 value = cPickle.dumps(None)
956 try:
957 i3config = i3db.download_config(dataset)
958 value = cPickle.dumps(IceTrayXMLWriter(i3config,self.xmluri).getDOM())
959 except Exception,e:
960 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
961 logger.error(e)
962 i3db.disconnect()
963 self.semaphore.release()
964 return value
965
966
987
def start(self,hostname,dataset=0,job_id=0,key='',grid=0):
    """
    Change the status of a job to indicate it is currently running.

    @param hostname: name of the host the job is starting on
    @param dataset: dataset id
    @param job_id: job id
    @param key: a passkey to prevent processes from overriding entries;
                an empty key is forwarded as None
    @param grid: grid id; when false, self.grid_id is used instead
    @return: tuple (dataset, jobs, queue_id) as returned by jobstart,
             or (0,0,0) if the update failed
    """
    # Truthiness test also tolerates key=None, where len(key) would raise.
    if not key:
        key = None
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        i3mondb.set_auto(True)
        logger.info("job %d.%d starting on %s" % (dataset,job_id,hostname))
        try:
            if not grid:
                grid = self.grid_id
            dataset,jobs,queue_id = i3mondb.jobstart(hostname,grid,dataset,job_id,key)
            i3mondb.commit()
        except Exception:
            # sys.exc_info() is per-thread; the sys.exc_type family is
            # process-global and may describe another thread's exception.
            exc_info = sys.exc_info()
            sys.excepthook(*exc_info)
            logger.error(exc_info[1])
            dataset,jobs,queue_id = (0,0,0)
        finally:
            i3mondb.disconnect()
        logger.info("job %d.%d started on %s" % (dataset,job_id,hostname))
    finally:
        # Always give the semaphore back, even if connect() or logging failed.
        self.semaphore.release()
    return dataset,jobs,queue_id
1015
def ping(self,dataset_id,job_id,host,key='',tray=0,iter=0):
    """
    Let server know that job is still running.

    @param dataset_id: runconfig index
    @param job_id: process number within dataset
    @param host: hostname of computing node
    @param key: passkey assigned to the job; an empty key is forwarded
                as None
    @param tray: tray number forwarded to jobping
    @param iter: iteration number forwarded to jobping
    @return: value returned by jobping, or -1 if the update failed
    """
    # Truthiness test also tolerates key=None, where len(key) would raise.
    if not key:
        key = None
    logger.info("Received ping from %s with job %d.%d " % (host,dataset_id,job_id))
    logger.info("processing tray %d, iter %d " % (tray,iter))
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        i3mondb.set_auto(True)
        try:
            retval = i3mondb.jobping(dataset_id,job_id,host,key,tray,iter)
            i3mondb.commit()
        except Exception:
            # sys.exc_info() is per-thread; the sys.exc_type family is
            # process-global and may describe another thread's exception.
            exc_info = sys.exc_info()
            sys.excepthook(*exc_info)
            logger.error(exc_info[1])
            retval = -1
        finally:
            i3mondb.disconnect()
    finally:
        # Always give the semaphore back, even if connect() itself failed.
        self.semaphore.release()
    return retval
1040
def finish(self,dataset_id,job_id,stats,key='',mode=0):
    """
    Update monitoring for job and write statistics.

    @param dataset_id: runconfig index
    @param job_id: process number within dataset
    @param stats: pickled statistics; unpickled and handed to jobfinish
    @param key: security passkey assigned to job; an empty key is
                forwarded as None
    @param mode: if true, jobfinalize is also called with status 'COPIED'
    @return: value returned by jobfinish, or -1 on a database
             OperationalError; other exceptions propagate to the caller
    """
    # Truthiness test also tolerates key=None, where len(key) would raise.
    if not key:
        key = None
    logger.info('job %d.%d finished (mode %u)' % (dataset_id,job_id,mode))
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        i3mondb.set_auto(True)
        try:
            logger.debug("stats: "+str(stats))
            # NOTE(review): stats is supplied by the remote caller and
            # unpickling untrusted data can execute arbitrary code --
            # confirm callers are trusted or move to a safe format.
            retval = i3mondb.jobfinish(dataset_id,job_id,cPickle.loads(stats),key,mode)
            if mode:
                i3mondb.jobfinalize(dataset_id,job_id,key,status='COPIED')
            i3mondb.commit()
        except OperationalError:
            # sys.exc_info() is per-thread; the sys.exc_type family is
            # process-global and may describe another thread's exception.
            exc_info = sys.exc_info()
            sys.excepthook(*exc_info)
            logger.error(exc_info[1])
            logger.error("failed to set finish status for job %d.%d." % (dataset_id,job_id))
            retval = -1
        finally:
            # Runs for unexpected errors too (e.g. a bad pickle), so the
            # connection is never left open.
            i3mondb.disconnect()
    finally:
        self.semaphore.release()
    logger.debug('finish - done')
    return retval
1073
def copying(self,dataset_id,job_id,key=''):
    """
    Mark a job as currently copying its data (status 'COPYING').

    @param dataset_id: runconfig index
    @param job_id: process number within dataset
    @param key: passkey assigned to the job; an empty key is forwarded
                as None
    @return: value returned by jobfinalize, or -1 on a database
             OperationalError; other exceptions propagate to the caller
    """
    # Truthiness test also tolerates key=None, where len(key) would raise.
    if not key:
        key = None
    logger.info('job %d.%d is copying data' % (dataset_id,job_id))
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        i3mondb.set_auto(True)
        try:
            logger.debug("setting job status to 'COPYING for %d.%d' " % (dataset_id,job_id))
            retval = i3mondb.jobfinalize(dataset_id,job_id,key,status='COPYING')
            i3mondb.commit()
        except OperationalError:
            # sys.exc_info() is per-thread; the sys.exc_type family is
            # process-global and may describe another thread's exception.
            exc_info = sys.exc_info()
            sys.excepthook(*exc_info)
            logger.error(exc_info[1])
            logger.error("failed to set copying status for job %d.%d." % (dataset_id,job_id))
            retval = -1
        finally:
            # Runs for unexpected errors too, so the connection is never
            # left open.
            i3mondb.disconnect()
    finally:
        self.semaphore.release()
    logger.debug('copying - done')
    return retval
1101
1102
def abort(self,job_id,dataset_id,error,errormessage='',key='',stats=cPickle.dumps({})):
    """
    Record that a job was aborted, together with its error and statistics.

    Note the argument order: job_id comes before dataset_id here.

    @param job_id: process number within dataset
    @param dataset_id: runconfig index
    @param error: error value forwarded to jobabort
    @param errormessage: text forwarded to jobabort and written to the log
    @param key: passkey assigned to the job; an empty key is forwarded
                as None
    @param stats: pickled statistics (default: a pickled empty dict);
                  unpickled and handed to jobabort
    @return: 1 on success, 0 on a database OperationalError; other
             exceptions propagate to the caller
    """
    # Truthiness test also tolerates key=None, where len(key) would raise.
    if not key:
        key = None
    logger.warn('aborting job %d.%d - %s ' % (dataset_id,job_id,errormessage))
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        i3mondb.set_auto(True)
        retval = 1
        try:
            # NOTE(review): stats is supplied by the remote caller and
            # unpickling untrusted data can execute arbitrary code --
            # confirm callers are trusted or move to a safe format.
            i3mondb.jobabort(job_id,dataset_id,error,errormessage,key,cPickle.loads(stats))
            i3mondb.commit()
        except OperationalError:
            # sys.exc_info() is per-thread; the sys.exc_type family is
            # process-global and may describe another thread's exception.
            exc_info = sys.exc_info()
            sys.excepthook(*exc_info)
            logger.error(exc_info[1])
            logger.error("failed to abort job %d.%d." % (dataset_id,job_id))
            retval = 0
        finally:
            # Runs for unexpected errors too (e.g. a bad pickle), so the
            # connection is never left open.
            i3mondb.disconnect()
    finally:
        self.semaphore.release()
    return retval
1127
def AddFileURL(self,dataset_id,job_id,url,md5sum,filesize,transfertime,key=''):
    """
    Add or change the global location of a file.

    The url is split into its directory part (location) and its base name
    (filename) before being handed to SetFileURL.

    @param dataset_id: runconfig index
    @param job_id: process number within dataset
    @param url: full URL of the file
    @param md5sum: checksum forwarded to SetFileURL
    @param filesize: size forwarded to SetFileURL
    @param transfertime: transfer time forwarded to SetFileURL
    @param key: passkey assigned to the job; an empty key is forwarded
                as None
    @return: 1 if the entry was stored and committed, 0 otherwise
    """
    # Truthiness test also tolerates key=None, where len(key) would raise.
    if not key:
        key = None
    filename = os.path.basename(url)
    location = os.path.dirname(url)
    logger.debug('%06u.%06u: %s' % (dataset_id,job_id,url))

    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        # Default to failure so every exit path returns a defined value;
        # previously the generic handler left retval unbound and the
        # return statement raised UnboundLocalError.
        retval = 0
        try:
            logger.info('SetFileURL: %06u.%06u:%s' % (dataset_id,job_id,url))
            i3mondb.SetFileURL(job_id,dataset_id,location,filename,md5sum,filesize,transfertime,key)
            i3mondb.commit()
            retval = 1
        except OperationalError:
            # sys.exc_info() is per-thread; the sys.exc_type family is
            # process-global and may describe another thread's exception.
            exc_info = sys.exc_info()
            sys.excepthook(*exc_info)
            logger.error(exc_info[1])
            logger.error("failed to set url %s for job %d.%d." % (url,dataset_id,job_id))
        except Exception:
            exc_info = sys.exc_info()
            sys.excepthook(*exc_info)
            logger.error(exc_info[1])
        finally:
            i3mondb.disconnect()
    finally:
        # Always give the semaphore back, even if connect() itself failed.
        self.semaphore.release()
    return retval
1157
1158
1159
1161 """
1162 Change the status of a job to indicate it is currently running
1163 @param : dataset simdb_id
1164 @param : job_id
1165 @param : key a passkey to prevent processes from overriding entries
1166 @return: dataset_id,nproc,procnum
1167 """
1168 i3mondb = self.i3mondb.new()
1169 self.semaphore.acquire()
1170 i3mondb.connect()
1171 i3mondb.set_auto(False)
1172 logger.info("job %d.%d starting" % (dataset_id,queue_id))
1173 if not len(key): key = None
1174 try:
1175 dataset,jobs,queue_id = i3mondb.multipart_job_start(dataset_id,queue_id,key)
1176 if dataset_id != TASK_DATASET_ERROR_ID:
1177 i3mondb.commit()
1178 else:
1179 i3mondb.rollback()
1180 except OperationalError,e:
1181 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
1182 logger.error(e)
1183 dataset,jobs,queue_id = (TASK_DATASET_ERROR_ID,0,0)
1184 i3mondb.disconnect()
1185 self.semaphore.release()
1186 return dataset,jobs,queue_id
1187
1189 i3mondb = self.i3mondb.new()
1190 self.semaphore.acquire()
1191 i3mondb.connect()
1192 i3mondb.set_auto(False)
1193 logger.info("job %d.%d finished successfully" % (dataset_id,queue_id))
1194 if not len(key): key = None
1195 try:
1196 ret = i3mondb.multipart_job_finish(dataset_id,queue_id,key)
1197 if ret:
1198 i3mondb.commit()
1199 else:
1200 i3mondb.rollback()
1201 except OperationalError,e:
1202 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
1203 logger.error(e)
1204 i3mondb.disconnect()
1205 self.semaphore.release()
1206 return True
1207
def task_start(self,dataset_id,queue_id,taskname,tray,iter,hostname,key=''):
    """
    Register the start of a task belonging to a job.

    The database update runs with auto-commit switched off: it is
    committed when a valid task id comes back and rolled back when
    TASK_ERROR_ID is returned.

    @param dataset_id: dataset id
    @param queue_id: queue id of the job
    @param taskname: name of the task being started
    @param tray: tray number forwarded to the database layer
    @param iter: iteration number forwarded to the database layer
    @param hostname: host the task runs on
    @param key: passkey assigned to the job; an empty key is forwarded
                as None
    @return: task id from the database layer, or TASK_ERROR_ID on a
             database OperationalError; other exceptions propagate
    """
    # Truthiness test also tolerates key=None, where len(key) would raise.
    if not key:
        key = None
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        i3mondb.set_auto(False)
        logger.info("job %d.%d starting task %s, tray %s, iter %s"
                    % (dataset_id,queue_id,taskname,tray,iter))
        try:
            task_id = i3mondb.task_start(dataset_id,queue_id,taskname,tray,iter,hostname,key)
            if task_id != TASK_ERROR_ID:
                i3mondb.commit()
            else:
                i3mondb.rollback()
        except OperationalError:
            # sys.exc_info() is per-thread; the sys.exc_type family is
            # process-global and may describe another thread's exception.
            exc_info = sys.exc_info()
            sys.excepthook(*exc_info)
            logger.error(exc_info[1])
            task_id = TASK_ERROR_ID
        finally:
            # Runs for unexpected errors too, so the connection is never
            # left open.
            i3mondb.disconnect()
    finally:
        self.semaphore.release()
    return task_id
1229
1233
1237
1241
1243 i3mondb = self.i3mondb.new()
1244 self.semaphore.acquire()
1245 i3mondb.connect()
1246 i3mondb.set_auto(False)
1247 if not len(key): key = None
1248 try:
1249 ret = i3mondb.task_update_status(task_id,status,key)
1250 if ret:
1251 i3mondb.commit()
1252 else:
1253 i3mondb.rollback()
1254 except OperationalError,e:
1255 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
1256 logger.error(e)
1257 ret = False
1258 i3mondb.disconnect()
1259 self.semaphore.release()
1260 return ret
1261
1263 i3mondb = self.i3mondb.new()
1264 self.semaphore.acquire()
1265 i3mondb.connect()
1266 i3mondb.set_auto(False)
1267 logger.warn("task %d aborted" % task_id)
1268 if not len(key): key = None
1269 try:
1270 ret = i3mondb.task_abort(task_id,key)
1271 if ret:
1272 i3mondb.commit()
1273 else:
1274 i3mondb.rollback()
1275 except OperationalError,e:
1276 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
1277 logger.error(e)
1278 ret = False
1279 i3mondb.disconnect()
1280 self.semaphore.release()
1281 return ret
1282
1284 stats = cPickle.loads(stats)
1285 i3mondb = self.i3mondb.new()
1286 self.semaphore.acquire()
1287 i3mondb.connect()
1288 i3mondb.set_auto(False)
1289 logger.info("task %d finished" % task_id)
1290 if not len(key): key = None
1291 try:
1292 ret = i3mondb.task_finish(task_id,stats,key)
1293 if ret:
1294 i3mondb.commit()
1295 else:
1296 i3mondb.rollback()
1297 except OperationalError,e:
1298 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
1299 logger.error(e)
1300 ret = False
1301 i3mondb.disconnect()
1302 self.semaphore.release()
1303 return ret
1304
1305
1306
1308 """
1309 For some reason SOAPpy throws an exception if we get an
1310 HTTP instead of HTTPS request when using SSL
1311 """
1312 if ssl_supported:
1313 try:
1314 server.serve_forever()
1315 except KeyboardInterrupt:
1316 logger.info("Received keyboard interrupt")
1317 logger.info("Exiting")
1318 os._exit(0)
1319
1320 except SSL.Error,e:
1321 logger.error("received: " + str(e))
1322 serve_forever(server)
1323 except socket.error,e:
1324 self.logger.error(str(e))
1325 os._exit(1)
1326 else:
1327 try:
1328 server.serve_forever()
1329 except KeyboardInterrupt:
1330 logger.info("Received keyboard interrupt")
1331 logger.info("Exiting")
1332 os._exit(0)
1333 except socket.error,e:
1334 self.logger.error(str(e))
1335 os._exit(1)
1336
1337 - def echo(self,msg):
1339
1340
1342 """
1343 Register daemon with database and update status
1344 """
1345 i3db = MonitorDB()
1346 if not i3db.authenticate(self.host,self.user,self.passwd,
1347 self.database,port=self.dbport,keep_open=True):
1348 raise Exception, 'unable to authenticate database user'
1349 self.grid_id = i3db.GetGridId(self.grid_name)
1350 i3db.disconnect()
1351 return self.grid_id
1352
1375
1383
1385 self.semaphore = semaphore
1386
1388
1389 self.cfg = cfg
1390 self.use_ldap = self.cfg.getboolean('ldap','enable')
1391 self.logger = logging.getLogger('SoapMon')
1392 self.grid_name = self.cfg.get('queue','name')
1393 address = self.cfg.get('monitoring','server')
1394 port = self.cfg.getint('monitoring','port')
1395 self.semaphore = DummySemaphore()
1396 self.usesecure = True
1397
1398 try:
1399 self.xmluri = self.cfg.get('path','uri')
1400 except:
1401 self.xmluri = 'http://www.icecube.wisc.edu/simulation/dtd/icetray.dtd'
1402
1403 if self.cfg.has_option('monitoring','USESSL'):
1404 self.usesecure = self.cfg.getboolean('monitoring','USESSL')
1405 if self.cfg.has_option('security','USESSL'):
1406 self.usesecure = self.usesecure and self.cfg.getboolean('security','USESSL')
1407 if self.usesecure:
1408 try:
1409 cert = expandvars(self.cfg.get('security','SSLCERT'))
1410 key = expandvars(self.cfg.get('security','SSLKEY'))
1411 if not os.path.exists(cert):
1412 self.logger.fatal("Cannot find SSL certificate in %s" % cert)
1413 if not os.path.exists(key):
1414 self.logger.fatal("Cannot find SSL key in %s" % key)
1415
1416 sslctx = SSL.Context(SSL.SSLv23_METHOD)
1417 sslctx.use_privatekey_file (key)
1418 sslctx.use_certificate_file(cert)
1419 self.server = rpc.ThreadedSecureXMLRPCServer((address, port), ssl_context = sslctx)
1420 self.logger.info("Monitoring server running **encrypted** on addr:%s:%d" % (address,port))
1421 except Exception,e:
1422 self.logger.warn(e)
1423 self.server = rpc.ThreadedXMLRPCServer((address, port))
1424 self.logger.info("Monitoring server running **un-encrypted** on addr:%s:%d" % (address,port))
1425 else:
1426 self.server = rpc.ThreadedXMLRPCServer((address, port))
1427 self.logger.info("Monitoring server running **un-encrypted** on addr:%s:%d" % (address,port))
1428
1429 self.host = self.cfg.get('database','server')
1430 self.user = self.cfg.get('database','username')
1431 self.passwd = self.cfg.get('database','password')
1432 self.database = self.cfg.get('database','database')
1433 self.dbport = self.cfg.getint('database','port')
1434
1435 self.i3db = ConfigDB()
1436 self.i3db.authenticate(self.host,self.user,self.passwd,
1437 self.database,port=self.dbport,keep_open=False)
1438
1439 self.i3mondb = MonitorDB()
1440 self.i3mondb.authenticate(self.host,self.user,self.passwd,
1441 self.database,port=self.dbport,keep_open=False)
1442
1443
1444
1446 """
1447 XMLRPC server class for monitoring jobs
1448 Jobs connect to the server from compute nodes and make status updates.
1449 Similar to the Monitor class, but runs as CGI embedded in an existing HTTP server.
1450
1451 """
1452
1454
1455 self.semaphore = DummySemaphore(verbose=1)
1456 self.cfg = cfg
1457 self.use_ldap = self.cfg.getboolean('ldap','enable')
1458 self.grid_id = 79
1459
1460 self.xmluri = 'http://www.icecube.wisc.edu/simulation/dtd/icetray.dtd'
1461 self.server = rpc.MyCGIXMLRPCRequestHandler()
1462
1463 self.host = self.cfg.get('database','server')
1464 self.user = self.cfg.get('database','username')
1465 self.passwd = self.cfg.get('database','password')
1466 self.database = self.cfg.get('database','database')
1467 self.dbport = self.cfg.getint('database','port')
1468
1469 self.i3db = ConfigDB()
1470 self.i3db.authenticate(self.host,self.user,self.passwd,
1471 self.database,port=self.dbport,keep_open=False)
1472
1473 self.i3mondb = MonitorDB()
1474 self.i3mondb.authenticate(self.host,self.user,self.passwd,
1475 self.database,port=self.dbport,keep_open=False)
1476
1480