1
2
3 """
4 XML-RPC Server module for running remote batch jobs on IceTray.
5
6 copyright (c) 2005 the icecube collaboration
7
8 @version: $Revision: $
9 @date: $Date: $
10 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
11 @todo: Add HTTPS support
12 @todo: Add XML validation in supporting classes
13
14 U{Userguide and other documentation <http://www.icecube.wisc.edu/~juancarlos/software/sim-prod>}
15
16 """
17
18 import sys
19 import os
20 import string
21 import re
22 import time
23 import getopt
24 import cPickle
25 import logging
26 import signal
27 import exceptions
28 import pwd,grp
29 import thread,threading
30 from threading import Thread
31 from os.path import expandvars,join
32 from cPickle import dumps,loads
33 from MySQLdb import OperationalError
34 from Queue import Queue
35
36 try:
37 import ldap
38 ldap_installed = True
39 except ImportError:
40 ldap_installed = False
41
42 import iceprod
43 from iceprod.server import rpc
44 from iceprod.core.dataclasses import *
45 from iceprod.core.xmlwriter import IceTrayXMLWriter
46 from iceprod.server.db import ConfigDB,MonitorDB
47 from iceprod.core.configuration import Config
48 from iceprod.server.db import MySQLParamDb,SoapParamDB
49 from iceprod.server.queue import i3ProdQueue
50 from iceprod.server.job import i3Job
51
52 import iceprod.core.logger
# Module-wide logger; the server classes create their own named loggers too.
logger = logging.getLogger('server')

# Host name of this machine with any domain part stripped.
localhost = os.uname()[1].split(".")[0]

# Daemon names recognised by the daemon suspend/resume requests.
soapdaemons = ['soaptray', 'soapmon', 'soapqueue', 'soapdh', 'soaphisto']

# Encrypted transport needs pyOpenSSL.  Any failure while importing it is
# reported on stderr and the server falls back to plain connections.
try:
    from OpenSSL import SSL
except Exception as e:
    ssl_supported = False
    sys.stderr.write(
        'exception: %s : communication will not be encrypted\n' % str(e))
else:
    ssl_supported = True
67
68
def fail(msg, exception=None):
    """
    Build the failure reply used by the exposed RPC methods.

    @param msg: human-readable description of what went wrong
    @param exception: exception to ship back to the caller; a plain
        Exception is used when none is given
    @return: tuple (msg, pickled None, pickled exception), the same
        three-slot shape that submit() returns on success
    """
    if exception is None:
        # Create a fresh instance per call rather than sharing one object
        # built at import time through a default argument.
        exception = Exception()
    return msg, dumps(None), dumps(exception)
71
72
74 """
75 Dummy class that shares interface with Semaphore class but does nothing.
76 """
77
def __init__(self, value=1, verbose=None):
    """
    Store the starting count and the verbosity flag.

    @param value: initial value of the counter
    @param verbose: kept on the instance as self.verbose
    """
    self.verbose = verbose
    self.counter = value
81
83 self.counter -= 1
84 if self.verbose: logger.info("count: %d" % self.counter)
85 return self.counter
86
88 self.counter += 1
89 if self.verbose: logger.info("count: %d" % self.counter)
90 return self.counter
91
93 logger = logging.getLogger('QueueSemaphore')
94
def __init__(self, value=1, verbose=None):
    """
    Create the bounded queue that backs the semaphore.

    @param value: capacity handed to Queue()
    @param verbose: kept on the instance as self.verbose
    """
    slots = Queue(value)
    self.verbose = verbose
    self.queue = slots
98
100 self.queue.put(1)
101 if self.verbose: self.logger.info("count: %d" % self.queue.qsize())
102
106
107
109
110 """
111 XMLRPC server class for submitting jobs to IceProd
112 """
113
115 self.cfg = cfg
116
117 self.use_ldap = self.cfg.getboolean('ldap','enable') and ldap_installed
118 self.logger = logging.getLogger('SoapTray')
119 self.dbserver = self.cfg.get('database','server')
120 self.dbport = self.cfg.getint('database','port')
121 self.database = self.cfg.get('database','database')
122 self.username = self.cfg.get('database','username')
123 self.password = self.cfg.get('database','password')
124 self.grid_name = self.cfg.get('queue','name')
125 self.batchsys = self.cfg.get('queue','batchsys')
126 self.institution = self.cfg.get('info','INSTITUTION')
127 self.address = self.cfg.get('server','server')
128 self.port = self.cfg.getint('server','port')
129 self.rootdir = self.cfg.get('path','basedir')
130 self.datawarehouse = expandvars(self.cfg.get('path','datawarehouse',raw=True))
131 self.usesecure = True
132 if self.cfg.has_option('server','USESSL'):
133 self.usesecure = self.cfg.getboolean('server','USESSL')
134 if self.cfg.has_option('security','USESSL'):
135 self.usesecure = self.usesecure and self.cfg.getboolean('security','USESSL')
136
137 if self.use_ldap:
138 self.ldap_users = self.cfg.get('ldap','users').split(',')
139 self.ldap_users = map(string.strip,self.ldap_users)
140
141 self.iceprodcert = None
142 self.iceprodkey = None
143 if self.usesecure:
144 self.iceprodcert = expandvars(self.cfg.get('security','SSLCERT'))
145 self.iceprodkey = expandvars(self.cfg.get('security','SSLKEY'))
146
147 self.submithost = os.getenv('HOSTNAME')
148 if self.cfg.has_option('queue','SUBMITHOST'):
149 self.submithost = self.cfg.get('queue','SUBMITHOST')
150
151
152 if ssl_supported and self.usesecure:
153 sslctx = SSL.Context(SSL.SSLv23_METHOD)
154 if not os.path.exists(self.iceprodcert):
155 self.logger.fatal("Cannot find SSL certificate in %s" % self.iceprodcert)
156 if not os.path.exists(self.iceprodkey):
157 self.logger.fatal("Cannot find SSL key in %s" % self.iceprodkey)
158
159 sslctx.use_privatekey_file (self.iceprodkey)
160 sslctx.use_certificate_file(self.iceprodcert)
161 self.server = rpc.ThreadedSecureXMLRPCServer(
162 (self.address, self.port),
163 logRequests=False, ssl_context = sslctx)
164 self.logger.info("soaptray server running **encrypted** on addr:%s:%d" % (self.address,self.port))
165
166
167 else:
168 self.server = rpc.ThreadedXMLRPCServer((self.address, self.port))
169 self.logger.info("soaptray server running **un-encrypted** on addr:%s:%d" % (self.address,self.port))
170
171
173 """
174 Register daemon with database and update status
175 """
176 i3db = MonitorDB()
177 if not i3db.authenticate(self.dbserver,self.username, self.password,
178 self.database, port=self.dbport,keep_open=True):
179 raise Exception, 'unable to authenticate database user'
180 self.grid_id = i3db.GetGridId(self.grid_name)
181 i3db.disconnect()
182 return self.grid_id
183
185 """
186 For some reason SOAPpy throws an exception if we get an
187 HTTP instead of HTTPS request when using SSL
188 """
189 if ssl_supported:
190 try:
191 self.server.serve_forever()
192 except KeyboardInterrupt:
193 self.logger.info("Received keyboard interrupt")
194 self.logger.info("Exiting")
195 os._exit(0)
196
197 except SSL.Error,e:
198 self.logger.error("received: " + str(e))
199 self.serve_forever()
200 except socket.error,e:
201 self.logger.error(str(e))
202 os._exit(1)
203 else:
204 try:
205 self.server.serve_forever()
206 except KeyboardInterrupt:
207 self.logger.info("Received keyboard interrupt")
208 self.logger.info("Exiting")
209 os._exit(0)
210 except socket.error,e:
211 self.logger.error(str(e))
212 os._exit(1)
213
def auth_db(self, db_obj, username, password, keep_open=False):
    """
    Authenticate a remote user and connect db_obj to the database.

    When LDAP is in use and the bind succeeds for a user listed in
    self.ldap_users, db_obj is connected with the server's own database
    account.  In every other case (LDAP off, bind failed, user not
    listed) the supplied credentials are tried directly as a database
    account.

    @param db_obj: database instance providing authenticate()
    @param username: name supplied by the remote caller
    @param password: password supplied by the remote caller; an empty
        value is rejected before LDAP or the database is contacted
    @param keep_open: forwarded unchanged to db_obj.authenticate()
    @return: False for an empty password, otherwise the value returned
        by db_obj.authenticate()
    """
    if not password:
        return False

    self.logger.info("ldap is %s" % self.use_ldap)
    if self.cfg.getboolean('ldap', 'enable') and not ldap_installed:
        self.logger.warning("ldap is enabled but not supported.")

    if self.use_ldap:
        conn = ldap.initialize(self.cfg.get('ldap', 'url'))
        try:
            msgid = conn.bind(
                self.cfg.get('ldap', 'cn', raw=True) % username,
                password,
                ldap.AUTH_SIMPLE)
            self.logger.info(str(conn.result(msgid)))
        except Exception as error:
            # Bad credentials, other LDAP errors and anything unexpected
            # are all handled alike: log and fall through to the direct
            # database attempt below.
            self.logger.error(error)
        else:
            if username in self.ldap_users:
                self.logger.info("found %s in ldap users" % username)
                # LDAP vouched for the user, so connect with the
                # server's configured database account.
                return db_obj.authenticate(
                    self.dbserver, self.username, self.password,
                    self.database, port=self.dbport, keep_open=keep_open)

    self.logger.info("Assuming %s is a database account " % username)
    return db_obj.authenticate(
        self.dbserver, username, password,
        self.database, port=self.dbport, keep_open=keep_open)
249
250
251
253 """
254 Authenticate remotely against database
255 This is the exposed method
256 @param username:
257 @param password:
258 @return: True if authenticated False otherwise
259 """
260 if ConfigDB().authenticate2(self.dbserver,username,password,self.database,port=self.dbport):
261 return True
262
263 if self.use_ldap:
264 l=ldap.initialize(self.cfg.get('ldap','url'))
265 try:
266 result = l.bind(self.cfg.get('ldap','cn',raw=True)%username, password,ldap.AUTH_SIMPLE)
267 except ldap.INVALID_CREDENTIALS, error:
268 logger.error(error)
269 return False
270 else:
271 return True
272 else:
273 return False
274
275
def enqueue(self, i3steering, username, password, submitter):
    """
    Queue a production dataset described by a steering object.

    @param i3steering: steering configuration passed to
        i3ProdQueue.EnQueue()
    @param username: account used to authenticate both database handles
    @param password: password for username
    @param submitter: name recorded as the submitter of the dataset
    @return: the (status, i3q) pair produced by EnQueue(); when either
        authentication is refused, the three-element tuple built by
        fail() is returned instead
    """
    warehouse = expandvars(self.cfg.get('path', 'datawarehouse', raw=True))

    configdb = ConfigDB()
    configdb.SetSubmitter(submitter)
    configdb.SetInstitution(self.institution)
    if self.cfg.has_option('soapdh', 'tempdata'):
        configdb.SetTempStoragePath(
            expandvars(self.cfg.get('soapdh', 'tempdata', raw=True)))
    else:
        configdb.SetTempStoragePath(warehouse)
    configdb.SetGlobalStoragePath(warehouse)
    mondb = MonitorDB()

    # NOTE(review): the refusal replies below have three elements while
    # submit() unpacks the result of enqueue() into two names, so a
    # refusal reaches the client as an unpacking error -- confirm whether
    # raising here would be the better contract.
    if not self.auth_db(configdb, username, password, keep_open=True):
        self.logger.info("ConfigDB: Access denied for user %s" % username)
        configdb.disconnect()
        return fail("ConfigDB: Access denied for user %s" % username)

    try:
        if not self.auth_db(mondb, username, password, keep_open=True):
            self.logger.info(
                "MonitorDB: Access denied for user %s" % username)
            mondb.disconnect()
            return fail("MonitorDB: Access denied for user %s" % username)

        try:
            q = i3ProdQueue(self.cfg)
            q.SetConfigDB(configdb)
            q.SetMonitorDB(mondb)
            q.SetRootDir(self.rootdir)
            q.SetSubmitHost(self.submithost)
            q.SetSubmitter(submitter)
            q.SetInstitution(self.institution)
            status, i3q = q.EnQueue(i3steering)
            return status, i3q
        finally:
            mondb.disconnect()
    finally:
        # Runs on success, on a monitor refusal and on any exception, so
        # the config connection opened above is always released.
        configdb.disconnect()
310
311
def submit(self,
           sconfig,
           username,
           password,
           submitter,
           production=False,
           start=0,
           end=0,
           dataset=0):
    """
    Receive a remote request for a job submission.

    @param sconfig: pickled steering configuration
    @param username: account used to connect to the configuration database
    @param password: password for username
    @param submitter: username of the person who submitted the dataset
    @param production: when true the request goes through enqueue(),
        otherwise it is handed directly to i3ProdQueue.Submit()
    @param start: optional beginning of job sequence (non-production)
    @param end: optional end of job sequence (non-production)
    @param dataset: optional dataset id (non-production)
    @return: (status, pickled queue object, pickled None) on success, or
        the tuple built by fail() when access is denied or queueing raises
    """
    i3db = ConfigDB()
    if not self.auth_db(i3db, username, password):
        self.logger.info("Access denied for user %s" % username)
        return fail("Access denied for user %s" % username)

    self.logger.info("Handling submission from user %s" % username)

    try:
        # SECURITY: unpickling client-supplied bytes can execute arbitrary
        # code.  It happens only after authentication, but the payload is
        # still caller-controlled.
        i3steering = cPickle.loads(sconfig)

        if production:
            do_submit = lambda: self.enqueue(
                i3steering, username, password, submitter)
        else:
            q = i3ProdQueue(self.cfg)
            q.SetRootDir(self.rootdir)
            q.SetSubmitHost(self.submithost)
            q.SetSubmitter(submitter)
            do_submit = lambda: q.Submit(
                i3steering, first=start, last=end, npid=dataset)

        try:
            status, i3q = do_submit()
        except Exception as e:
            # sys.exc_info() is per-thread; the old sys.exc_type family is
            # process-global and unreliable in a threaded server.
            sys.excepthook(*sys.exc_info())
            self.logger.error(
                'Caught %s: %s' % (sys.exc_info()[0], str(e)))
            return fail(str(e), e)

        return status, dumps(i3q), dumps(None)
    finally:
        i3db.disconnect()
369
370
def checkjobs(self, i3q_pkl, username, password):
    """
    Receive a remote request for a job status check.

    @param i3q_pkl: pickled queue object; it must provide CheckQ()
    @param username: account used to connect to the configuration database
    @param password: password for username
    @return: whatever CheckQ() returns, or the tuple built by fail() when
        access is denied
    """
    i3db = ConfigDB()
    if not self.auth_db(i3db, username, password, keep_open=True):
        self.logger.error("Access denied for user %s" % username)
        # Same cleanup as the sibling handlers perform on refusal.
        i3db.disconnect()
        return fail("Access denied for user %s" % username)

    try:
        # SECURITY: unpickling client-supplied bytes can execute arbitrary
        # code; this is reached only after authentication.
        i3q = cPickle.loads(i3q_pkl)
        i3q.batchsys = self.batchsys
        return i3q.CheckQ(i3db)
    finally:
        # The connection was opened with keep_open=True; release it even
        # when unpickling or CheckQ() raises.
        i3db.disconnect()
387
389 """
390 Receive a remote request for a job status check
391 @param i3q_pkl: a serialized iGrid object
392 """
393
394 i3db = MonitorDB()
395 if not self.auth_db(i3db,username, password,keep_open=True):
396 self.logger.info("Access denied for user %s" % username)
397 return fail("Access denied for user %s" % username)
398
399 i3q = cPickle.loads(i3q_pkl)
400 i3q.batchsys = self.batchsys
401 status = i3q.QRemove(i3db)
402 i3db.disconnect()
403 return status
404
406 jobstr = ".%d" % job
407 if job < 0: jobstr = ""
408 i3db = MonitorDB()
409 if not self.auth_db(i3db,username, password,keep_open=True):
410 self.logger.warn("suspend %d%s denied for user %s" % (dataset,jobstr,username))
411 return fail("suspend %d%s denied for user %s" % (dataset,jobstr,username))
412 try:
413 i3db.jobsuspend(job,dataset,True)
414 self.logger.info("suspend %d%s granted for user %s" % (dataset,jobstr,username))
415 i3db.add_history(username,"suspend %d%s" % (dataset,jobstr))
416 self.logger.info("updated stats")
417 i3db.disconnect()
418 return "suspend %d%s granted for user %s" % (dataset,jobstr,username)
419 except Exception,e:
420 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
421 self.logger.error("suspend %d%s failed:%s" % (dataset,jobstr,str(e)))
422 i3db.disconnect()
423 return fail("suspend %d%s failed:%s" % (dataset,jobstr,str(e)))
424
426 jobstr = ".%d" % job
427 if job < 0: jobstr = ""
428 i3db = MonitorDB()
429 if not self.auth_db(i3db,username, password,keep_open=True):
430 self.logger.warn("resume %d%s denied for user %s" % (dataset,jobstr,username))
431 i3db.disconnect()
432 return fail("resume %d%s denied for user %s" % (dataset,jobstr,username))
433 try:
434 i3db.jobsuspend(job,dataset,False)
435 self.logger.info("resume %d%s granted for user %s" % (dataset,jobstr,username))
436 i3db.add_history(username,"resume %d%s" % (dataset,jobstr))
437 self.logger.info("updated stats")
438 i3db.disconnect()
439 return "resume %d%s granted for user %s" % (dataset,jobstr,username)
440 except Exception, e:
441 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
442 self.logger.error("resume %d%s failed:%s" % (dataset,jobstr,e))
443 i3db.disconnect()
444 return "resume %d%s failed:%s" % (dataset,jobstr,e)
445
447 jobstr = ".%d" % job
448 if job < 0: jobstr = ""
449 i3db = MonitorDB()
450 if not self.auth_db(i3db,username, password,keep_open=True):
451 self.logger.warn("reset %d%s denied for user %s" % (dataset,jobstr,username))
452 return fail("reset %d%s denied for user %s" % (dataset,jobstr,username))
453 try:
454 i3db.jobsuspend(job,dataset,False)
455 i3db.add_history(username,"reset %d%s" % (dataset,jobstr))
456 self.logger.info("reset %d%s granted for user %s" % (dataset,jobstr,username))
457 i3db.disconnect()
458 return "reset %d%s granted for user %s" % (dataset,jobstr,username)
459 except Exception, e:
460 self.logger.error(sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback) )
461 self.logger.error("reset %d%s failed:%s" % (dataset,jobstr,e))
462 i3db.disconnect()
463 return "reset %d%s failed:%s" % (dataset,jobstr,e)
464
465
467 i3db = MonitorDB()
468 if not self.auth_db(i3db,username, password,keep_open=True):
469 self.logger.warn("clean %d denied for user %s" % (dataset,username))
470 i3db.disconnect()
471 return fail("clean %d denied for user %s" % (dataset,username))
472 try:
473 i3db.update_monitoring(self.grid_id,dataset_id=dataset)
474 i3db.jobclean(dataset)
475 i3db.add_history(username,"clean %d" % dataset)
476 self.logger.info("clean %d granted for user %s" % (dataset,username))
477 i3db.disconnect()
478 return "clean %d granted for user %s" % (dataset,username)
479 except Exception,e:
480 self.logger.error("clean %d failed:%s" % (dataset,str(e)))
481 i3db.disconnect()
482 return fail("clean %d failed:%s" % (dataset,str(e)))
483
485 i3db = ConfigDB()
486 i3mon = MonitorDB()
487 if not self.auth_db(i3db,username, password,keep_open=True):
488 self.logger.warn("delete %d denied for user %s" % (dataset,username))
489 i3db.disconnect()
490 return fail("delete %d denied for user %s" % (dataset,username))
491 self.auth_db(i3mon,username, password,keep_open=True)
492 try:
493 i3mon.update_monitoring(grid_id=None,dataset_id=dataset)
494 i3mon.jobclean(dataset,archive=False)
495 i3db.set_metadata_subcat(dataset,sub_cat="obsolete")
496 i3mon.setDatasetStatus(dataset,'OBSOLETE')
497 i3mon.validate(dataset,'FALSE')
498 i3mon.add_history(username,"nuke %d" % dataset)
499 i3mon.clearStorageURL(dataset)
500 self.logger.info("delete %d granted for user %s" % (dataset,username))
501 i3db.disconnect()
502 i3mon.disconnect()
503 return "delete %d granted for user %s" % (dataset,username)
504 except Exception,e:
505 i3db.disconnect()
506 i3mon.disconnect()
507 self.logger.error("delete %d failed:%s" % (dataset,str(e)))
508 return fail("delete %d failed:%s" % (dataset,str(e)))
509
511 i3db = MonitorDB()
512 if not self.auth_db(i3db,username, password,keep_open=True):
513 i3db.disconnect()
514 self.logger.warn("clean %d denied for user %s" % (dataset,username))
515 return fail("clean %d denied for user %s" % (dataset,username))
516 try:
517 i3db.update_monitoring(self.grid_id,dataset_id=dataset)
518 i3db.jobclean(dataset)
519 i3db.setDatasetStatus(dataset,'COMPLETE')
520 i3db.SuspendGridDataset('any',dataset)
521 i3db.add_history(username,"finish %d" % dataset)
522 i3db.disconnect()
523 self.logger.info("finish %d granted for user %s" % (dataset,username))
524 return "finish %d granted for user %s" % (dataset,username)
525 except Exception,e:
526 i3db.disconnect()
527 self.logger.error("finish %d failed:%s" % (dataset,str(e)))
528 return fail("finish %d failed:%s" % (dataset,str(e)))
529
531 i3db = MonitorDB()
532 if not self.auth_db(i3db,username, password,keep_open=True):
533 i3db.disconnect()
534 self.logger.warn("toggle debug %d denied for user %s" % (dataset,username))
535 return fail("toggle debug %d denied for user %s" % (dataset,username))
536 try:
537 i3db.ToggleDatasetDebug(dataset)
538 i3db.add_history(username,"toggle debug %d" % dataset)
539 i3db.disconnect()
540 self.logger.info("toggle debug %d granted for user %s" % (dataset,username))
541 return "toggle debug %d granted for user %s" % (dataset,username)
542 except Exception,e:
543 i3db.disconnect()
544 self.logger.error("toggle debug %d failed:%s" % (dataset,str(e)))
545 return fail("toggle debug %d failed:%s" % (dataset,str(e)))
546
547
def loaddict(self, odict_pkl, username, password, dataset_id=0):
    """
    Load a pickled dictionary into the configuration database.

    @param odict_pkl: pickled dictionary passed to load_dictionary()
    @param username: account used to connect to the configuration database
    @param password: password for username
    @param dataset_id: dataset the dictionary belongs to
    @return: None on success; the tuple built by fail() when access is
        denied or loading raises
    """
    i3db = ConfigDB()
    if not self.auth_db(i3db, username, password, keep_open=True):
        i3db.disconnect()
        # The original message referenced an undefined name ("dataset")
        # and so raised NameError instead of reporting the refusal.
        denied = "loaddict %s denied for user %s" % (dataset_id, username)
        self.logger.warning(denied)
        return fail(denied)

    try:
        # SECURITY: unpickling client-supplied bytes can execute arbitrary
        # code; this is reached only after authentication.
        odict = cPickle.loads(odict_pkl)
        i3db.load_dictionary(odict, dataset_id)
    except Exception as e:
        self.logger.warning("failed to load dictionary :\n%s" % str(e))
        return fail("failed to load dictionary :\n%s" % str(e))
    finally:
        i3db.disconnect()
    # NOTE(review): success yields None, unlike the sibling handlers that
    # return a "granted" message -- confirm the RPC layer can marshal it.
562
564 i3db = MonitorDB()
565 if not self.auth_db(i3db,username, password,keep_open=True):
566 i3db.disconnect()
567 self.logger.warn("validate %d denied for user %s" % (dataset,username))
568 return fail("validate %d denied for user %s" % (dataset,username))
569 try:
570 i3db.setDatasetStatus(dataset,status)
571 if status == 'PROCESSING':
572 i3db.SuspendGridDataset('all',dataset,0)
573 i3db.add_history(username,"dataset set status %s %d" % (status,dataset))
574 i3db.disconnect()
575 self.logger.info("dataset_setstatus '%s' on dataset %d granted for user %s" % (status,dataset,username))
576 return "setstatus '%s' on dataset %d granted for user %s" % (status,dataset,username)
577 except Exception,e:
578 i3db.disconnect()
579 self.logger.error("setstatus '%s' on dataset %d failed \n%s" % (status,dataset,str(e)))
580 return fail("setstatus '%s' on dataset %d failed \n%s" % (status,dataset,str(e)))
581
583 i3db = MonitorDB()
584 if not self.auth_db(i3db,username, password,keep_open=True):
585 i3db.disconnect()
586 self.logger.warn("validate %d denied for user %s" % (dataset,username))
587 return fail("validate %d denied for user %s" % (dataset,username))
588 try:
589 i3db.jobsetstatus(job,dataset,status)
590 i3db.add_history(username,"set status %s %d.%d" % (status,dataset,job))
591 i3db.disconnect()
592 self.logger.info("setstatus '%s' on %d.%d granted for user %s" % (status,dataset,job,username))
593 return "setstatus '%s' on %d.%d granted for user %s" % (status,dataset,job,username)
594 except Exception,e:
595 i3db.disconnect()
596 self.logger.error("setstatus '%s' on %d.%d failed \n%s" % (status,dataset,job,str(e)))
597 return fail("setstatus '%s' on %d.%d failed \n%s" % (status,dataset,job,str(e)))
598
600 """
601 Mark dataset as obsolete
602 """
603 i3db = ConfigDB()
604 i3mon = MonitorDB()
605 if not self.auth_db(i3db,username, password,keep_open=True):
606 i3db.disconnect()
607 self.logger.warn("retire %d denied for user %s" % (dataset,username))
608 return fail("retire %d denied for user %s" % (dataset,username))
609 self.auth_db(i3mon,username, password,keep_open=True)
610 try:
611 i3mon.update_monitoring(grid_id=None,dataset_id=dataset)
612 i3db.set_metadata_subcat(dataset,sub_cat="obsolete")
613 i3mon.setDatasetStatus(dataset,'OBSOLETE')
614 i3mon.add_history(username,"retire s %d" % dataset)
615 i3db.disconnect()
616 i3mon.disconnect()
617 self.logger.info("retire dataset %d granted for user %s" % (dataset,username))
618 return "retire dataset %d granted for user %s" % (dataset,username)
619 except Exception,e:
620 i3db.disconnect()
621 i3mon.disconnect()
622 self.logger.error("retire dataset %d failed \n%s" % (dataset,str(e)))
623 return fail("retire dataset %d failed \n%s" % (dataset,str(e)))
624
625
627 i3db = MonitorDB()
628 if not self.auth_db(i3db,username, password,keep_open=True):
629 i3db.disconnect()
630 self.logger.warn("validate %d denied for user %s" % (dataset,username))
631 return fail("validate %d denied for user %s" % (dataset,username))
632 try:
633 if valid:
634 i3db.validate(dataset,'TRUE')
635 else:
636 i3db.validate(dataset,'FALSE')
637 i3db.add_history(username,"validate %d" % dataset)
638 i3db.disconnect()
639 self.logger.info("change %d granted for user %s" % (dataset,username))
640 return "change %d granted for user %s" % (dataset,username)
641 except Exception,e:
642 i3db.disconnect()
643 self.logger.error("validate %d failed:%s" % (dataset,str(e)))
644 return fail("validate %d failed:\n%s" % (dataset,str(e)))
645
646
648 i3db = MonitorDB()
649 if not self.auth_db(i3db,username, password,keep_open=True):
650 i3db.disconnect()
651 self.logger.warn("suspend %s.%s denied for user %s" % (grid,daemon,username))
652 return fail("suspend %s.%s denied for user %s" % (grid,daemon,username))
653 try:
654 if daemon=='all':
655 for d in soapdaemons:
656 i3db.GridRequestSuspend(grid,d)
657 elif daemon in soapdaemons:
658 i3db.GridRequestSuspend(grid,daemon)
659 else:
660 self.logger.error("%s: invalid daemon %s" % (grid,daemon))
661 return fail("%s: invalid daemon %s" % (grid,daemon))
662
663 i3db.add_history(username,"suspend daemon %s.%s" % (grid,daemon) )
664 i3db.disconnect()
665 self.logger.info("suspend %s.%s granted for user %s" % (grid,daemon,username))
666 return "suspend %s.%s granted for user %s" % (grid,daemon,username)
667 except Exception,e:
668 i3db.disconnect()
669 self.logger.error("suspend %s.%s failed:%s" % (grid,daemon,username))
670 return fail("suspend %s.%s failed:%s" % (grid,daemon,username))
671
673 i3db = MonitorDB()
674 if not self.auth_db(i3db,username, password,keep_open=True):
675 i3db.disconnect()
676 self.logger.warn("resume %s.%s denied for user %s" % (grid,daemon,username))
677 return fail("resume %s.%s denied for user %s" % (grid,daemon,username))
678 try:
679 if daemon=='all':
680 for d in soapdaemons:
681 i3db.GridRequestResume(grid,d)
682 elif daemon in soapdaemons:
683 i3db.GridRequestResume(grid,daemon)
684 else:
685 i3db.disconnect()
686 self.logger.error("%s: invalid daemon %s" % (grid,daemon))
687 return fail("%s: invalid daemon %s" % (grid,daemon))
688
689 i3db.add_history(username,"resume daemon %s.%s" % (grid,daemon) )
690 i3db.disconnect()
691 self.logger.info("resume %s.%s granted for user %s" % (grid,daemon,username))
692 return "resume %s.%s granted for user %s" % (grid,daemon,username)
693 except Exception,e:
694 i3db.disconnect()
695 self.logger.error("resume %s.%s failed:%s" % (grid,daemon,username))
696 return fail("resume %s.%s failed:%s" % (grid,daemon,username))
697
def grid_add(self, username, password, grid, dataset):
    """
    Add a grid to a dataset.

    @param username: account used to connect to the monitoring database
    @param password: password for username
    @param grid: grid to add; passed to InitializeGridStats() as a
        one-element list
    @param dataset: dataset the grid is added to
    @return: confirmation string on success; the tuple built by fail()
        when access is denied or the update raises
    """
    i3db = MonitorDB()
    if not self.auth_db(i3db, username, password, keep_open=True):
        i3db.disconnect()
        denied = "grid_add %s %s denied for user %s" % (
            grid, dataset, username)
        self.logger.warning(denied)
        return fail(denied)

    try:
        i3db.InitializeGridStats([grid], dataset)
        i3db.add_history(
            username, "Grid %s added to dataset %s" % (grid, dataset))
    except Exception as e:
        # Report the actual error (the original logged the username here)
        # and return a failure reply as the sibling handlers do.
        failed = "grid_add %s %s failed:%s" % (grid, dataset, str(e))
        self.logger.error(failed)
        return fail(failed)
    finally:
        i3db.disconnect()

    granted = "grid_add %s %s granted for user %s" % (
        grid, dataset, username)
    self.logger.info(granted)
    return granted
713
715 i3db = MonitorDB()
716 if not self.auth_db(i3db,username, password,keep_open=True):
717 i3db.disconnect()
718 self.logger.warn("grid_suspend_dataset %s %s denied for user %s" % (grid,dataset,username))
719 return "grid_suspend_dataset %s %s denied for user %s" % (grid,dataset,username)
720 try:
721 i3db.SuspendGridDataset(grid,dataset,suspend)
722 i3db.add_history(username,"Grid %s suspend set to %s for dataset %s" % (grid,suspend,dataset) )
723 i3db.disconnect()
724 self.logger.info("grid_suspend_dataset %s %s granted for user %s" % (grid,dataset,username))
725 return "grid_suspend_dataset %s %s granted for user %s" % (grid,dataset,username)
726 except Exception,e:
727 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
728 i3db.disconnect()
729 self.logger.error("grid_suspend_dataset %s %s failed:%s" % (grid,dataset,username))
730 return "grid_suspend_dataset %s %s failed %s" % (grid,dataset,e)
731
732
734 """
735 Get list of valid simcats
736 """
737 i3db = ConfigDB()
738 if not i3db.authenticate(self.dbserver,self.username, self.password,
739 self.database,port=self.dbport,keep_open=True):
740 i3db.disconnect()
741 self.logger.error("Access denied for user %s" % username)
742 retval = fail("Access denied for user %s" % username)
743 else:
744 retval = dumps(i3db.get_simcat_categories())
745 i3db.disconnect()
746 return retval
747
749 """
750 get usage statistics
751 @param days number of days (from today) to get data from
752 @return formated string with usage statistics
753 """
754 i3db = MonitorDB()
755 if not i3db.authenticate(self.dbserver,self.username, self.password,
756 self.database,port=self.dbport,keep_open=True):
757 i3db.disconnect()
758 self.logger.warn("printsummary %s.%s denied for user %s" % (grid,daemon,username))
759 return fail("printsummary %s.%s denied for user %s" % (grid,daemon,username))
760 try:
761 retval = i3db.printsummary(days)
762 self.logger.info("printsummary: serving request")
763 i3db.disconnect()
764 return retval
765 except Exception,e:
766 i3db.disconnect()
767 self.logger.error("print summary failed: %s" % e)
768 return fail("print summary failed")
769
771 """
772 Get list of datasets in database
773 @param search_string: a string containing key words to use
774 """
775 i3db = MonitorDB()
776 if not i3db.authenticate(self.dbserver,self.username, self.password,
777 self.database,port=self.dbport,keep_open=True):
778 self.logger.error("Access denied for user %s" % username)
779 retval = fail("Access denied for user %s" % username)
780 else:
781 retval = dumps(i3db.getstatus(dataset,job))
782 i3db.disconnect()
783 return retval
784
786 """
787 Get list of datasets in database
788 @param search_string: a string containing key words to use
789 """
790 i3db = MonitorDB()
791 if not i3db.authenticate(self.dbserver,self.username, self.password,
792 self.database,port=self.dbport,keep_open=True):
793 self.logger.error("Access denied for user %s" % username)
794 retval = fail("Access denied for user %s" % username)
795 else:
796 retval = dumps(i3db.getDatasetStatus(dataset))
797 i3db.disconnect()
798 return retval
799
800
802 """
803 Get list of datasets in database
804 @param search_string: a string containing key words to use
805 """
806 i3db = ConfigDB()
807 if not i3db.authenticate(self.dbserver,self.username, self.password,
808 self.database,port=self.dbport,keep_open=True):
809 self.logger.error("Access denied for user %s" % username)
810 retval = fail("Access denied for user %s" % username)
811 else:
812 retval = dumps(i3db.show_dataset_table(search_string))
813 i3db.disconnect()
814 return retval
815
817 """
818 Fetch configuration from database
819 @param datasest: dataset id for configuration
820 """
821 i3db = ConfigDB()
822 if not i3db.authenticate(self.dbserver,self.username, self.password,
823 self.database,port=self.dbport,keep_open=True):
824 self.logger.error("Access denied for user %s" % username)
825 retval = fail("Access denied for user %s" % username)
826 else:
827 retval = dumps(i3db.download_config(dataset,defaults,descriptions))
828 i3db.disconnect()
829 return retval
830
832 """
833 Fetch configuration from database
834 @param datasest: dataset id for configuration
835 """
836 msg = "%80s\n" % ('*'*80)
837 msg += '\n'
838 msg += "%80s\n" % time.asctime()
839 msg += '\n'
840 msg += "%80s\n" % ("IceProd Server version %s" % iceprod.server.__version__)
841 msg += '\n'
842 msg += "%80s\n" % ("Running on host %s" % os.uname()[1])
843 msg += '\n'
844 msg += "%80s\n" % ('*'*80)
845 return dumps(msg)
846
847 - def echo(self,msg):
849
850
852 """
853 expose the methods
854 """
855 self.server.register_function(self.echo)
856 self.server.register_function(self.submit)
857 self.server.register_function(self.authenticate)
858 self.server.register_function(self.checkjobs)
859 self.server.register_function(self.queue_remove)
860 self.server.register_function(self.queue_suspend)
861 self.server.register_function(self.queue_clean)
862 self.server.register_function(self.queue_delete)
863 self.server.register_function(self.queue_dataset_finish)
864 self.server.register_function(self.queue_dataset_toggle_debug)
865 self.server.register_function(self.queue_resume)
866 self.server.register_function(self.queue_reset)
867 self.server.register_function(self.queue_validate)
868 self.server.register_function(self.queue_setstatus)
869 self.server.register_function(self.queue_dataset_setstatus)
870 self.server.register_function(self.getstatus)
871 self.server.register_function(self.getdatasetstatus)
872 self.server.register_function(self.loaddict)
873 self.server.register_function(self.queue_retire)
874 self.server.register_function(self.daemon_suspend)
875 self.server.register_function(self.daemon_resume)
876 self.server.register_function(self.printsummary)
877 self.server.register_function(self.showrunlist)
878 self.server.register_function(self.download_config)
879 self.server.register_function(self.check_connection)
880 self.server.register_function(self.grid_add)
881 self.server.register_function(self.grid_suspend_dataset)
882 self.server.register_function(self.get_simcat_categories)
883
897
898
900 """
901 XMLRPC server class for submitting jobs to IceProd
902 job connect to server from compute nodes and make status updates
903 Similar to SoapTray class but runs CGI embeded in existing HTTP server
904 """
905
907
908 SoapTray.__init__(self,cfg)
909 self.cfg = cfg
910 self.use_ldap = self.cfg.getboolean('ldap','enable')
911 self.logger = logging.getLogger('SoapTray')
912 self.semaphore = DummySemaphore(verbose=1)
913 self.grid_id = 0
914 self.submithost = os.getenv('HOSTNAME')
915
916 self.use_ldap = self.cfg.getboolean('ldap','enable')
917 self.dbserver = self.cfg.get('database','server')
918 self.dbport = self.cfg.getint('database','port')
919 self.database = self.cfg.get('database','database')
920 self.username = self.cfg.get('database','username')
921 self.password = self.cfg.get('database','password')
922 self.grid_name = self.cfg.get('queue','name')
923 self.batchsys = self.cfg.get('queue','batchsys')
924 self.institution = self.cfg.get('info','INSTITUTION')
925 self.address = self.cfg.get('server','server')
926 self.port = self.cfg.getint('server','port')
927 self.rootdir = self.cfg.get('path','basedir')
928 self.datawarehouse = expandvars(self.cfg.get('path','datawarehouse',raw=True))
929
930 if self.cfg.has_option('queue','SUBMITHOST'):
931 self.submithost = self.cfg.get('queue','SUBMITHOST')
932
933 self.xmluri = 'http://www.icecube.wisc.edu/simulation/dtd/iceprod.v2.dtd'
934 self.server = rpc.MyCGIXMLRPCRequestHandler()
935
936 self.i3db = ConfigDB()
937 self.i3mondb = MonitorDB()
938
953
def submit(self, sconfig, username, password, submitter, production=False):
    """
    Receive a remote request for a job submission.

    @param sconfig: pickled steering configuration
    @param username: user name checked against the configuration database
    @param password: password checked against the configuration database
    @param submitter: forwarded unchanged to self.enqueue
    @param production: must be true; this CGI front end rejects
                       non-production submissions
    @return: tuple (status, pickled queue object, pickled None) on success,
             otherwise the tuple built by fail()
    """
    i3db = ConfigDB()
    # NOTE(review): the handle is authenticated with keep_open=True but is
    # not used again in this method -- confirm whether it should be closed.
    if not self.auth_db(i3db, username, password, keep_open=True):
        denied = "Access denied for user %s" % username
        self.logger.info(denied)
        return fail(denied)

    self.logger.info("Handling submission from user %s" % username)

    # Reject unsupported requests before touching the payload at all.
    if not production:
        return fail("cgi-soaptray does not support non-production jobs.")

    # NOTE(review): cPickle.loads runs on client-supplied data; it is only
    # reached after authentication, but unpickling can execute code.
    i3steering = cPickle.loads(sconfig)
    try:
        status, i3q = self.enqueue(i3steering, username, password, submitter)
    except Exception:
        # sys.exc_info() is per-thread; the sys.exc_type/exc_value globals
        # are deprecated and unsafe in a threaded server.
        exc_type, exc_value = sys.exc_info()[:2]
        sys.excepthook(*sys.exc_info())
        self.logger.error('Caught %s: %s' % (exc_type, str(exc_value)))
        return fail(str(exc_value), exc_value)
    return status, dumps(i3q), dumps(None)
983
984
985
987 """
988 XMLRPC server class for monitoring jobs
989 job connect to server from compute nodes and make status updates
990 Creates own HTTP server
991 """
992
994 """
995 Retrieve dictionary entry
996 @param: key string
997 @return: string value
998 """
999 i3db = self.i3db.new()
1000 self.semaphore.acquire()
1001 i3db.connect()
1002 logger.debug("fetching entry %s " % key )
1003 value = cPickle.dumps(None)
1004 try:
1005 value = i3db.fetch_dict_value(key)
1006 logger.debug("fetched entry %s,%s " % (key,value) )
1007 except Exception,e:
1008 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
1009 logger.error(e)
1010 i3db.disconnect()
1011 self.semaphore.release()
1012 return value
1013
1014
def getfile(self, key, dataset_id=0):
    """
    Fetch the file entry stored for a key within a dataset.

    @param key: lookup key passed to fetch_filename
    @param dataset_id: dataset the key belongs to (default 0)
    @return: the fetched entry converted to a tuple, or a pickled None
             if the lookup or the conversion fails
    """
    i3db = self.i3db.new()
    self.semaphore.acquire()
    try:
        # connect() sits inside the try/finally so that a failed connection
        # can no longer leave the semaphore held forever.
        i3db.connect()
        try:
            value = tuple(i3db.fetch_filename(key, dataset_id))
        except Exception:
            # sys.exc_info() is per-thread; sys.exc_type & co. are deprecated.
            sys.excepthook(*sys.exc_info())
            logger.error(sys.exc_info()[1])
            # Fall back to the pickled None for every failure, including a
            # fetch result that cannot be turned into a tuple.
            value = cPickle.dumps(None)
        i3db.disconnect()
    finally:
        self.semaphore.release()
    return value
1034
def get_tarball(self, metaproject_name, metaproject_version, platform, gccversion, ppc=0):
    """
    Get best matching tarball in file repository.

    @param metaproject_name: name of the metaproject
    @param metaproject_version: version of the metaproject
    @param platform: platform string passed to the database lookup
    @param gccversion: compiler version passed to the database lookup
    @param ppc: accepted for interface compatibility; not used here
    @return: string path to tarball, or '' if the lookup fails
    """
    i3db = self.i3db.new()
    i3db.connect()
    path = ''
    try:
        path = i3db.get_metaproject_tarball(
            metaproject_name, metaproject_version, platform, gccversion)
    except Exception:
        # sys.exc_info() is per-thread; the sys.exc_type/exc_value globals
        # are deprecated and unsafe in a threaded server.
        sys.excepthook(*sys.exc_info())
        logger.error(sys.exc_info()[1])
    i3db.disconnect()
    return path
1055
1056
1057
1059 """
1060 Download configuration from database
1061 """
1062 i3db = self.i3db.new()
1063 self.semaphore.acquire()
1064 i3db.connect()
1065 value = cPickle.dumps(None)
1066 try:
1067 i3config = i3db.download_config(dataset)
1068 value = cPickle.dumps(IceTrayXMLWriter(i3config,self.xmluri).getDOM())
1069 except Exception,e:
1070 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
1071 logger.error(e)
1072 i3db.disconnect()
1073 self.semaphore.release()
1074 return value
1075
1076
1097
def start(self, hostname, dataset=0, job_id=0, key='', grid=0):
    """
    Change the status of a job to indicate it is currently running.

    @param hostname: host the job is starting on
    @param dataset: dataset id
    @param job_id: job id within the dataset
    @param key: a passkey to prevent processes from overriding entries;
                an empty key is passed to the database as None
    @param grid: grid id; a false value selects self.grid_id
    @return: tuple (dataset, jobs, queue_id) from jobstart, or (0, 0, 0)
             if the database call fails
    """
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        # Everything after acquire() runs under try/finally so a failing
        # connect()/set_auto() cannot leave the semaphore held.
        i3mondb.connect()
        i3mondb.set_auto(True)
        logger.info("job %d.%d starting on %s" % (dataset, job_id, hostname))
        if not key:
            key = None
        try:
            if not grid:
                grid = self.grid_id
            dataset, jobs, queue_id = i3mondb.jobstart(hostname, grid, dataset, job_id, key)
            i3mondb.commit()
        except Exception:
            # sys.exc_info() is per-thread; sys.exc_type & co. are deprecated.
            sys.excepthook(*sys.exc_info())
            logger.error(sys.exc_info()[1])
            dataset, jobs, queue_id = (0, 0, 0)
        i3mondb.disconnect()
        logger.info("job %d.%d started on %s" % (dataset, job_id, hostname))
    finally:
        self.semaphore.release()
    return dataset, jobs, queue_id
1125
def ping(self, dataset_id, job_id, host, key='', tray=0, iter=0):
    """
    Let server know that job is still running.

    @param dataset_id: runconfig index
    @param job_id: process number within dataset
    @param host: hostname of computing node
    @param key: passkey assigned to the job; an empty key is passed on as None
    @param tray: tray currently being processed
    @param iter: iteration currently being processed
    @return: value returned by jobping, or -1 if the database call fails
    """
    if not key:
        key = None
    logger.info("Received ping from %s with job %d.%d " % (host, dataset_id, job_id))
    logger.info("processing tray %d, iter %d " % (tray, iter))
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        # try/finally guarantees the semaphore is released even when
        # connect()/set_auto() raise.
        i3mondb.connect()
        i3mondb.set_auto(True)
        try:
            retval = i3mondb.jobping(dataset_id, job_id, host, key, tray, iter)
            i3mondb.commit()
        except Exception:
            # sys.exc_info() is per-thread; sys.exc_type & co. are deprecated.
            sys.excepthook(*sys.exc_info())
            logger.error(sys.exc_info()[1])
            retval = -1
        i3mondb.disconnect()
    finally:
        self.semaphore.release()
    return retval
1150
def finish(self, dataset_id, job_id, stats, key='', mode=0):
    """
    Update monitoring for job and write statistics.

    @param dataset_id: runconfig index
    @param job_id: process number within dataset
    @param stats: pickled dictionary of stat entries
    @param key: security passkey assigned to job; empty is passed on as None
    @param mode: if true, finalize job and set status to COPIED instead of
                 setting its status to READYTOCOPY
    @return: value returned by jobfinish, or -1 if the update fails
    """
    if not key:
        key = None
    logger.info('job %d.%d finished (mode %u)' % (dataset_id, job_id, mode))
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        # try/finally guarantees the semaphore is released even when
        # connect()/set_auto() raise.
        i3mondb.connect()
        i3mondb.set_auto(True)
        try:
            logger.debug("stats: " + str(stats))
            # NOTE(review): stats is unpickled from client-supplied data.
            retval = i3mondb.jobfinish(dataset_id, job_id, cPickle.loads(stats), key, mode)
            if mode:
                i3mondb.jobfinalize(dataset_id, job_id, key, status='COPIED')
            i3mondb.commit()
        except Exception:
            # sys.exc_info() is per-thread; sys.exc_type & co. are deprecated.
            sys.excepthook(*sys.exc_info())
            logger.error(sys.exc_info()[1])
            logger.error("failed to set finish status for job %d.%d." % (dataset_id, job_id))
            retval = -1
        i3mondb.disconnect()
    finally:
        self.semaphore.release()
    logger.debug('finish - done')
    return retval
1183
def copying(self, dataset_id, job_id, key=''):
    """
    Mark a job as currently copying its data (status COPYING).

    @param dataset_id: runconfig index
    @param job_id: process number within dataset
    @param key: passkey assigned to the job; an empty key is passed on as None
    @return: value returned by jobfinalize, or -1 if the update fails
    """
    if not key:
        key = None
    logger.info('job %d.%d is copying data' % (dataset_id, job_id))
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        # try/finally guarantees the semaphore is released even when
        # connect()/set_auto() raise.
        i3mondb.connect()
        i3mondb.set_auto(True)
        try:
            logger.debug("setting job status to 'COPYING for %d.%d' " % (dataset_id, job_id))
            retval = i3mondb.jobfinalize(dataset_id, job_id, key, status='COPYING')
            i3mondb.commit()
        except Exception:
            # sys.exc_info() is per-thread; sys.exc_type & co. are deprecated.
            sys.excepthook(*sys.exc_info())
            logger.error(sys.exc_info()[1])
            logger.error("failed to set copying status for job %d.%d." % (dataset_id, job_id))
            retval = -1
        i3mondb.disconnect()
    finally:
        self.semaphore.release()
    logger.debug('copying - done')
    return retval
1211
1212
def abort(self, job_id, dataset_id, error, errormessage='', key='', stats=cPickle.dumps({})):
    """
    Record that a job was aborted.

    Note the argument order: job_id comes before dataset_id here, unlike
    the other job-status methods.

    @param job_id: process number within dataset
    @param dataset_id: runconfig index
    @param error: error value passed through to jobabort
    @param errormessage: text logged and stored with the abort
    @param key: passkey assigned to the job; an empty key is passed on as None
    @param stats: pickled dictionary of stat entries (default: empty dict)
    @return: 1 on success, 0 if the database update fails
    """
    if not key:
        key = None
    logger.warn('aborting job %d.%d - %s ' % (dataset_id, job_id, errormessage))
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        # try/finally guarantees the semaphore is released even when
        # connect()/set_auto() raise.
        i3mondb.connect()
        i3mondb.set_auto(True)
        retval = 1
        try:
            # NOTE(review): stats is unpickled from client-supplied data.
            i3mondb.jobabort(job_id, dataset_id, error, errormessage, key, cPickle.loads(stats))
            i3mondb.commit()
        except Exception:
            # sys.exc_info() is per-thread; sys.exc_type & co. are deprecated.
            sys.excepthook(*sys.exc_info())
            logger.error(sys.exc_info()[1])
            logger.error("failed to abort job %d.%d." % (dataset_id, job_id))
            retval = 0
        i3mondb.disconnect()
    finally:
        self.semaphore.release()
    return retval
1237
def AddFileURL(self, dataset_id, job_id, url, md5sum, filesize, transfertime, key=''):
    """
    Add or change the global location of a file.

    The url is split into directory (location) and base name (filename)
    before being stored.

    @param dataset_id: runconfig index
    @param job_id: process number within dataset
    @param url: full URL of the file
    @param md5sum: checksum stored with the entry
    @param filesize: size stored with the entry
    @param transfertime: transfer time stored with the entry
    @param key: passkey assigned to the job; an empty key is passed on as None
    @return: 1 on success, 0 if the database update fails
    """
    if not key:
        key = None
    filename = os.path.basename(url)
    location = os.path.dirname(url)
    logger.debug('%06u.%06u: %s' % (dataset_id, job_id, url))

    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        # try/finally guarantees the semaphore is released even when
        # connect() raises.
        i3mondb.connect()
        try:
            logger.info('SetFileURL: %06u.%06u:%s' % (dataset_id, job_id, url))
            i3mondb.SetFileURL(job_id, dataset_id, location, filename,
                               md5sum, filesize, transfertime, key)
            i3mondb.commit()
            retval = 1
        except Exception:
            # A single handler: the original second, identical except
            # clause could never be reached.
            sys.excepthook(*sys.exc_info())
            logger.error(sys.exc_info()[1])
            logger.error("failed to set url %s for job %d.%d." % (url, dataset_id, job_id))
            retval = 0
        i3mondb.disconnect()
    finally:
        self.semaphore.release()
    return retval
1267
def get_storage_url(self, dataset_id, queue_id, passkey='', storage_type='INPUT'):
    """
    Look up the storage URLs registered for a job.

    @param dataset_id: dataset id
    @param queue_id: job id within the dataset
    @param passkey: passkey forwarded to GetStorageURL
    @param storage_type: storage category forwarded to GetStorageURL
                         (default 'INPUT')
    @return: pickled result of GetStorageURL
    @raise Exception: any error from the lookup is logged and re-raised
    """
    i3mondb = self.i3mondb.new()
    i3mondb.connect()
    try:
        try:
            # Module-level logger, as in the sibling methods and in the
            # error handler below; self.logger is not set by every
            # constructor visible in this file.
            logger.info("storage url for %u.%u" % (dataset_id, queue_id))
            return cPickle.dumps(
                i3mondb.GetStorageURL(dataset_id, queue_id, passkey, storage_type))
        except Exception:
            # sys.exc_info() is per-thread; sys.exc_type & co. are deprecated.
            sys.excepthook(*sys.exc_info())
            logger.error(sys.exc_info()[1])
            raise
    finally:
        # One disconnect for both the success and the error path.
        i3mondb.disconnect()
1281
1282
1283
1285 """
1286 Change the status of a job to indicate it is currently running
1287 @param : dataset simdb_id
1288 @param : job_id
1289 @param : key a passkey to prevent processes from overriding entries
1290 @return: dataset_id,nproc,procnum
1291 """
1292 i3mondb = self.i3mondb.new()
1293 self.semaphore.acquire()
1294 i3mondb.connect()
1295 i3mondb.set_auto(False)
1296 logger.info("job %d.%d starting" % (dataset_id,queue_id))
1297 if not len(key): key = None
1298 try:
1299 dataset,jobs,queue_id = i3mondb.multipart_job_start(dataset_id,queue_id,key)
1300 if dataset_id != TASK_DATASET_ERROR_ID:
1301 i3mondb.commit()
1302 else:
1303 i3mondb.rollback()
1304 logger.info("job %d.%d starting error" % (dataset_id,queue_id))
1305 except Exception,e:
1306 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
1307 logger.error(e)
1308 dataset,jobs,queue_id = (TASK_DATASET_ERROR_ID,0,0)
1309 i3mondb.disconnect()
1310 self.semaphore.release()
1311 return dataset,jobs,queue_id
1312
1331
def task_start(self, dataset_id, queue_id, taskname, tray, iter, hostname, key=''):
    """
    Start a task of a multipart job once all of its parent tasks finished.

    @param dataset_id: dataset id
    @param queue_id: job id within the dataset
    @param taskname: name of the task definition to start
    @param tray: tray index passed to the database
    @param iter: iteration index passed to the database
    @param hostname: host the task is starting on
    @param key: passkey assigned to the job; an empty key is passed on as None
    @return: task id from the database, or TASK_ERROR_ID if a parent task
             has not finished or the database call fails
    """
    i3mondb = self.i3mondb.new()
    self.semaphore.acquire()
    try:
        i3mondb.connect()
        try:
            i3mondb.set_auto(False)
            logger.info("job %d.%d starting task %s, tray %s, iter %s" \
                        % (dataset_id, queue_id, taskname, tray, iter))
            logger.info("Checking that task parents have completed..")
            steering = Steering()
            i3mondb.download_tasks(dataset_id, steering)
            job = i3mondb.GetJob(dataset_id, queue_id)
            td = steering.GetTaskDefinition(taskname)
            for parent in td.GetParents():
                parent_td = steering.GetTaskDefinition(parent)
                logger.debug(parent_td.GetName())
                if not i3mondb.task_is_finished(parent_td.GetId(), job.GetDatabaseId()):
                    # Same accessor as the check above; the original error
                    # path called job.DatabaseId() instead.
                    logger.error("task id %s, '%s' job id %s"
                                 % (parent_td.GetId(), parent_td.GetName(), job.GetDatabaseId()))
                    logger.error("Parent tasks for task %s for %u.%u have not completed"
                                 % (taskname, dataset_id, queue_id))
                    return TASK_ERROR_ID
            logger.info("..OK")

            if not key:
                key = None
            try:
                task_id = i3mondb.task_start(dataset_id, queue_id, taskname, tray, iter, hostname, key)
                if task_id != TASK_ERROR_ID:
                    i3mondb.commit()
                else:
                    i3mondb.rollback()
            except Exception:
                # sys.exc_info() is per-thread; sys.exc_type & co. are deprecated.
                sys.excepthook(*sys.exc_info())
                logger.error(sys.exc_info()[1])
                task_id = TASK_ERROR_ID
            return task_id
        finally:
            # Runs on every exit, including exceptions raised by the
            # parent check, which previously skipped the cleanup.
            i3mondb.disconnect()
    finally:
        self.semaphore.release()
1369
1373
1377
1381
1383 i3mondb = self.i3mondb.new()
1384 self.semaphore.acquire()
1385 i3mondb.connect()
1386 i3mondb.set_auto(False)
1387 if not len(key): key = None
1388 try:
1389 ret = i3mondb.task_update_status(task_id,status,key)
1390 if ret:
1391 i3mondb.commit()
1392 else:
1393 i3mondb.rollback()
1394 except Exception,e:
1395 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
1396 logger.error(e)
1397 ret = False
1398 i3mondb.disconnect()
1399 self.semaphore.release()
1400 return ret
1401
1403 i3mondb = self.i3mondb.new()
1404 self.semaphore.acquire()
1405 i3mondb.connect()
1406 i3mondb.set_auto(False)
1407 logger.warn("task %d aborted" % task_id)
1408 if not len(key): key = None
1409 try:
1410 ret = i3mondb.task_abort(task_id,key)
1411 if ret:
1412 i3mondb.commit()
1413 else:
1414 i3mondb.rollback()
1415 except Exception,e:
1416 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
1417 logger.error(e)
1418 ret = False
1419 i3mondb.disconnect()
1420 self.semaphore.release()
1421 return ret
1422
1424 stats = cPickle.loads(stats)
1425 i3mondb = self.i3mondb.new()
1426 self.semaphore.acquire()
1427 i3mondb.connect()
1428 i3mondb.set_auto(False)
1429 logger.info("task %d finished" % task_id)
1430 if not len(key): key = None
1431 try:
1432 ret = i3mondb.task_finish(task_id,stats,key)
1433 if ret:
1434 i3mondb.commit()
1435 else:
1436 i3mondb.rollback()
1437 except Exception,e:
1438 sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
1439 logger.error(e)
1440 ret = False
1441 i3mondb.disconnect()
1442 self.semaphore.release()
1443 return ret
1444
1445
1446
1448 """
1449 For some reason SOAPpy throws an exception if we get an
1450 HTTP instead of HTTPS request when using SSL
1451 """
1452 if ssl_supported:
1453 try:
1454 server.serve_forever()
1455 except KeyboardInterrupt:
1456 logger.info("Received keyboard interrupt")
1457 logger.info("Exiting")
1458 os._exit(0)
1459
1460 except SSL.Error,e:
1461 logger.error("received: " + str(e))
1462 serve_forever(server)
1463 except socket.error,e:
1464 self.logger.error(str(e))
1465 os._exit(1)
1466 else:
1467 try:
1468 server.serve_forever()
1469 except KeyboardInterrupt:
1470 logger.info("Received keyboard interrupt")
1471 logger.info("Exiting")
1472 os._exit(0)
1473 except socket.error,e:
1474 self.logger.error(str(e))
1475 os._exit(1)
1476
1477 - def echo(self,msg):
1479
1480
1482 """
1483 Register daemon with database and update status
1484 """
1485 i3db = MonitorDB()
1486 if not i3db.authenticate(self.host,self.user,self.passwd,
1487 self.database,port=self.dbport,keep_open=True):
1488 raise Exception, 'unable to authenticate database user'
1489 self.grid_id = i3db.GetGridId(self.grid_name)
1490 i3db.disconnect()
1491 return self.grid_id
1492
1518
1526
1528 self.semaphore = semaphore
1529
1531
1532 self.cfg = cfg
1533 self.use_ldap = self.cfg.getboolean('ldap','enable')
1534 self.logger = logging.getLogger('SoapMon')
1535 self.grid_name = self.cfg.get('queue','name')
1536 address = self.cfg.get('monitoring','server')
1537 port = self.cfg.getint('monitoring','port')
1538 self.semaphore = DummySemaphore()
1539 self.usesecure = True
1540
1541 try:
1542 self.xmluri = self.cfg.get('path','uri')
1543 except:
1544 self.xmluri = 'http://www.icecube.wisc.edu/simulation/dtd/iceprod.v2.dtd'
1545
1546 if self.cfg.has_option('monitoring','USESSL'):
1547 self.usesecure = self.cfg.getboolean('monitoring','USESSL')
1548 if self.cfg.has_option('security','USESSL'):
1549 self.usesecure = self.usesecure and self.cfg.getboolean('security','USESSL')
1550 if self.usesecure:
1551 try:
1552 cert = expandvars(self.cfg.get('security','SSLCERT'))
1553 key = expandvars(self.cfg.get('security','SSLKEY'))
1554 if not os.path.exists(cert):
1555 self.logger.fatal("Cannot find SSL certificate in %s" % cert)
1556 if not os.path.exists(key):
1557 self.logger.fatal("Cannot find SSL key in %s" % key)
1558
1559 sslctx = SSL.Context(SSL.SSLv23_METHOD)
1560 sslctx.use_privatekey_file (key)
1561 sslctx.use_certificate_file(cert)
1562 self.server = rpc.ThreadedSecureXMLRPCServer((address, port), ssl_context = sslctx)
1563 self.logger.info("Monitoring server running **encrypted** on addr:%s:%d" % (address,port))
1564 except Exception,e:
1565 self.logger.warn(e)
1566 self.server = rpc.ThreadedXMLRPCServer((address, port))
1567 self.logger.info("Monitoring server running **un-encrypted** on addr:%s:%d" % (address,port))
1568 else:
1569 self.server = rpc.ThreadedXMLRPCServer((address, port))
1570 self.logger.info("Monitoring server running **un-encrypted** on addr:%s:%d" % (address,port))
1571
1572 self.host = self.cfg.get('database','server')
1573 self.user = self.cfg.get('database','username')
1574 self.passwd = self.cfg.get('database','password')
1575 self.database = self.cfg.get('database','database')
1576 self.dbport = self.cfg.getint('database','port')
1577
1578 self.i3db = ConfigDB()
1579 self.i3db.authenticate(self.host,self.user,self.passwd,
1580 self.database,port=self.dbport,keep_open=False)
1581
1582 self.i3mondb = MonitorDB()
1583 self.i3mondb.authenticate(self.host,self.user,self.passwd,
1584 self.database,port=self.dbport,keep_open=False)
1585
1586
1587
1589 """
1590 XMLRPC server class for monitoring jobs
1591 job connect to server from compute nodes and make status updates
1592 Similar to Monitor class but runs as CGI embedded in an existing HTTP server
1593
1594 """
1595
1597
1598 self.semaphore = DummySemaphore(verbose=1)
1599 self.cfg = cfg
1600 self.use_ldap = self.cfg.getboolean('ldap','enable')
1601 self.grid_id = 79
1602
1603 self.xmluri = 'http://www.icecube.wisc.edu/simulation/dtd/iceprod.v2.dtd'
1604 self.server = rpc.MyCGIXMLRPCRequestHandler()
1605
1606 self.host = self.cfg.get('database','server')
1607 self.user = self.cfg.get('database','username')
1608 self.passwd = self.cfg.get('database','password')
1609 self.database = self.cfg.get('database','database')
1610 self.dbport = self.cfg.getint('database','port')
1611
1612 self.i3db = ConfigDB()
1613 self.i3db.authenticate(self.host,self.user,self.passwd,
1614 self.database,port=self.dbport,keep_open=False)
1615
1616 self.i3mondb = MonitorDB()
1617 self.i3mondb.authenticate(self.host,self.user,self.passwd,
1618 self.database,port=self.dbport,keep_open=False)
1619
1623
1625 """
1626 XMLRPC server class for monitoring jobs
1627 job connect to server from compute nodes and make status updates
1628 Similar to Monitor class but runs as CGI embedded in an existing HTTP server
1629
1630 """
1631
1633
1634 self.semaphore = DummySemaphore(verbose=1)
1635 self.cfg = cfg
1636
1637 self.xmluri = 'http://www.icecube.wisc.edu/simulation/dtd/iceprod.v2.dtd'
1638 self.server = rpc.MyCGIXMLRPCRequestHandler()
1639
1640 self.url = self.cfg.get('proxy','soapmon')
1641 import xmlrpclib
1642 self.client = xmlrpclib.ServerProxy(self.url)
1643 self.logger = logging.getLogger('SoapMonProxy')
1644
1649
1650
1653
1654 - def getfile(self,key,dataset_id=0):
1656
def get_tarball(self, metaproject_name, metaproject_version, platform, gccversion, ppc=0):
    """
    Log the requested metaproject and forward the call to self.client.

    @return: whatever the remote get_tarball call returns
    """
    request = (metaproject_name, metaproject_version, platform, gccversion)
    self.logger.info("metaproject %s %s %s %s" % request)
    forwarded = request + (ppc,)
    return self.client.get_tarball(*forwarded)
1660
1663
1666
1667 - def start(self,hostname,dataset=0,job_id=0,key='',grid=0):
1670
1671 - def ping(self,dataset_id,job_id,host,key='',tray=0,iter=0):
1674
1675 - def finish(self,dataset_id,job_id,stats,key='',mode=0):
1678
1679 - def copying(self,dataset_id,job_id,key=''):
1681
1682 - def abort(self,job_id,dataset_id,error,errormessage='',key='',stats=cPickle.dumps({})):
1685
1686 - def AddFileURL(self,dataset_id,job_id,url,md5sum,filesize,transfertime,key=''):
1688
1689 - def get_storage_url(self,dataset_id,queue_id,passkey='',storage_type='INPUT'):
1691
1694
1697
1698 - def task_start(self,dataset_id,queue_id,taskname,tray,iter,hostname,key=''):
1700
1703
1706
1709
1712
1715
1718
1719 - def echo(self,msg):
1721