"""
Provides an interface with MySQL job monitoring database

Copyright (c) 2008 the IceCube Collaboration

@version: $Revision: $
@date: $Date: $
@author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
"""
import re,sys
import getpass
import string
import random
import iceprod
import logging
import types
import copy
import time
from cPickle import loads,dumps

from iceprod.core import logger
from iceprod.core.dataclasses import *
from iceprod.core.paramdb import *
from iceprod.core.metadata import *
from iceprod.core.constants import *

import MySQLdb
from MySQLdb import OperationalError


class IceProdDB:

    logger = logging.getLogger('IceProdDB')

    def __init__(self):
        """
        Constructor
        """
        self._conn = None
        self._connected = False
        self._auto = False
        self.host_ = None
        self.usr_ = None
        self.passwd_ = None
        self.db_ = None
        self.port_ = 3306
        self.auth_function = lambda x: None
        self.version_regex = re.compile(r'[A-Z][0-9][0-9]-[0-9][0-9]-[0-9][0-9]')
        return

    def nullify(self,value):
        if value:
            return "\'%s\'" % value
        return 'NULL'

    def nonify(self,value):
        if value == 'NULL':
            return None
        return value

    def intify(self,value):
        if value == 'NULL' or not value:
            return 0
        return int(value)

    # the name of this quote-escaping helper is a reconstruction; the
    # original def line was lost in extraction
    def defang(self,txt):
        return txt.replace("\'","\\\'").replace("\"","\\\"")

    def get(self,name,value):
        sys.stdout.write("%s [%s] : " % (name,value))
        line = sys.stdin.readline().strip()
        if line:
            return line
        else:
            return value

83 """
84 Create a copy of this instance
85 """
86 newconn = IceProdDB()
87 newconn.host_ = self.host_
88 newconn.usr_ = self.usr_
89 newconn.passwd_ = self.passwd_
90 newconn.db_ = self.db_
91 newconn._connected = False
92 return newconn

    def ping(self):
        """
        Ping server to reactivate connection
        """
        if not self.isconnected():
            time.sleep(50)
            raise OperationalError,"Not connected to database"
        try:
            self._conn.ping()
        except OperationalError,e:
            self.logger.error('%s: will attempt to reconnect.' % str(e))
            self._conn = MySQLdb.connect(self.host_,self.usr_,self.passwd_,self.db_,self.port_)
            self._connected = True

    def getcursor(self):
        if self.isconnected():
            self.ping()
            return self._conn.cursor(MySQLdb.cursors.DictCursor)
        else:
            self.logger.warn('Not connected to database. Attempting to reconnect..')
            self.logger.info('delaying for 10 sec.')
            time.sleep(10)
            raise OperationalError,"Not connected to database"

    def commit(self):
        self.logger.debug("auto-commit set to %s" % self._auto)
        if not self._auto:
            return self._conn.commit()

    def rollback(self):
        self.logger.debug("rolling back transaction")
        return self._conn.rollback()

    def execute(self,cursor,sql,commit=True):
        for i in range(10):
            try:
                cursor.execute(sql)
                rowcount = self._conn.affected_rows()
                if commit: self.commit()
                return rowcount
            except OperationalError,e:
                self.logger.error(e)
                return 0

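    # Note on execute(): although the body is wrapped in a 10-iteration loop,
    # both the success path and the OperationalError handler return on the
    # first pass, so no retry actually happens; the loop only documents intent.
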
    def insert_id(self):
        return self._conn.insert_id()

    # setter name is a reconstruction; the original def line was lost
    def set_auth_func(self,func):
        self.auth_function = func

    def authenticate(self,host,usr,passwd,db,keep_open=False,port=3306):
        """
        Database authentication
        @param host: ip or name of MySQL host
        @param usr: username
        @param passwd: account password
        @param db: name of database
        @param keep_open: don't close connection after authenticating
        """
        self.host_ = host
        self.usr_ = usr
        self.passwd_ = passwd
        self.db_ = db
        self.port_ = port
        try:
            self.connect()
            if not keep_open:
                self.disconnect()
            return True
        except Exception,e:
            sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
            self.logger.error('%s: %s' % (sys.exc_type,e))
            self.logger.error('Authentication failed: %s@%s' % (usr,host))
            return False

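    # Example (hypothetical credentials): a False return means the connection
    # attempt failed and was logged; no exception propagates to the caller.
    #
    #   ok = db.authenticate('localhost','produser','secret','iceprod_db')
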
171 """
172 Simple database authentication test
173 """
174 try:
175 self._conn = MySQLdb.connect(host,usr,passwd,db,port=port)
176 self._conn.close()
177 return True
178 except Exception,e:
179 return False
180
182 """ Set auto-commit """
183 self._auto = autocommit
184 try:
185 self._conn.autocommit(self._auto)
186 except Exception,e:
187 self.logger.warn(e)
188
190 """ Connect to database """
191 if self.isconnected():
192 try:
193 self.ping()
194 return
195 except Exception ,e:
196 self.logger.error(str(e))
197 self._connected = False
198 if not self.host_ or not self.usr_ or not self.passwd_ or not self.db_:
199 self.auth_function(self)
200 return
201 try:
202 self._conn = MySQLdb.connect(self.host_,self.usr_,self.passwd_,self.db_,self.port_)
203 self._connected = True
204 except Exception,e:
205 self.logger.debug('%s: %s' % (sys.exc_type,e))
206 self.logger.error('Connection failed : %s@%s' % (self.usr_,self.host_))
207 self.auth_function(self)
208 return
209
211 """ Disconnect from database """
212 if self._connected:
213 self._conn.close()
214 self._connected = False
215
217 return self._conn and self._connected
218
219
    def mkkey(self,minsize,maxsize):
        """
        Generate random alphanumeric sequence
        """
        key = ''
        seq = ['a','b','c','d','e','f','g','h','i','j',
               'k','l','m','n','o','p','q','r','s','t',
               'u','v','w','x','y','z']
        seq += map(string.upper,seq)
        seq += range(0,10)  # digits 0-9; the original range(0,9) silently omitted 9
        r = random.Random()
        size = r.randint(minsize,maxsize)
        for i in range(0,size):
            key += str(r.choice(seq))
        return key

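    # Example: mkkey(6,8) might return 'aK29rQx' -- a random key whose length
    # is drawn uniformly from [minsize,maxsize] (illustrative output only).
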

class ConfigDB(IceProdDB):

    logger = logging.getLogger('ConfigDB')

265 """
266 Constructor
267 """
268 IceProdDB.__init__(self)
269 self.submitter = ''
270 self.temporary_storage = None
271 self.global_storage = None
272 self.metaproject_dict = { }
273 self.institution = ''
274 return
275
277 """
278 Create a copy of this instance
279 """
280 newconn = ConfigDB()
281 newconn.host_ = self.host_
282 newconn.usr_ = self.usr_
283 newconn.passwd_ = self.passwd_
284 newconn.db_ = self.db_
285 newconn.port_ = self.port_
286 newconn._connected = False
287 return newconn
288
    # the four setter names below are reconstructions; the original def
    # lines were lost
    def set_submitter(self,submitter):
        self.submitter = submitter

    def set_institution(self,institution):
        self.institution = institution

    def set_temporary_storage(self,path):
        """
        Set the temporary path for dataset.
        @param path: temporary storage path
        """
        self.temporary_storage = path

    def set_global_storage(self,path):
        """
        Set the global path in datawarehouse for dataset.
        @param path: global storage path
        """
        self.global_storage = path

311 """
312 update icetray configuration to database
313 @param dataset_id: primary key in production database
314 @param param_dict: Dictionary of parameters to update
315 """
316 try:
317 sql = """
318 SELECT * from dataset
319 WHERE dataset_id = %d """ % dataset_id
320
321 cursor = self.getcursor()
322 cursor.execute(sql)
323 result_set = cursor.fetchall();
324
325 r = result_set[0];
326 if r["jobs_completed"] == r['jobs_submitted']:
327 param_dict["status"] = 'COMPLETE'
328 self.logger.info("status %d" % param_dict['status'])
329
330 elif r["jobs_failed"] > 0 :
331 param_dict["status"] = 'ERRORS'
332 self.logger.info("status %d" % param_dict['status'])
333
334 except Exception, e:
335 self.logger.error(" %s could not fetch dataset %d" % (e,dataset_id))
336
337 try:
338 sql = """UPDATE dataset SET """
339
340 for key in param_dict.keys():
341 sql += " %s=%s, " % (key,param_dict[key])
342 sql += " enddate=NOW() "
343 sql += " WHERE dataset_id = %d" % dataset_id
344
345 self.logger.debug(sql)
346 cursor = self.getcursor()
347 cursor.execute(sql)
348 self.commit()
349
350 except Exception, e:
351 self.logger.error(str(e) + " rolling back transaction" )
352 self._conn.rollback()
353 raise Exception, e
354

    # method name and signature are reconstructions; the original def line
    # was lost (the body below uses steering, ticket, template and maxjobs)
    def upload_config(self,steering,ticket=0,template=0,maxjobs=0):
        """
        Upload icetray configuration to database
        @param steering: IceTrayConfig object containing configuration
        @param ticket: optional ticket ID to relate dataset to
        @param template: Whether this is a template or not
        @return: primary key for run on config db
        """
        dataset_id = None

        try:
            debug = steering.GetParameter('DEBUG')
            geo = steering.GetParameter('geometry')
            if debug:
                debug = int(debug.GetValue())
            else:
                debug = 0

            simcat_id = self.loadsimcat(steering)
            parent_id = steering.GetParentId()
            status = 'PROCESSING'
            if template:
                status = 'TEMPLATE'

            sql = """INSERT IGNORE INTO dataset
                     (
                      simcat_id,
                      startdate,
                      username,
                      institution,
                      description,
                      status,
                      temporary_storage,
                      global_storage,
                      jobs_submitted,
                      ticket_number,
                      parent_id,
                      debug,
                      dataset_category
                     )
                     VALUES """

            desc = steering.GetDescription()
            sql += """(
                      %d,
                      NOW(),
                      \'%s\',
                      \'%s\',
                      \'%s\',
                      \'%s\',
                      \'%s\',
                      \'%s\',
                      %d,
                      %d,
                      %d,
                      %d,
                      \'%s\')""" % \
                   (
                    simcat_id,
                    self.submitter,
                    self.institution,
                    re.sub('\'','\\\' ',desc),
                    status,
                    self.temporary_storage,
                    self.global_storage,
                    int(maxjobs),
                    ticket,
                    parent_id,
                    debug,
                    steering.GetDatasetType()
                   )

            sql = re.sub('\s+',' ',sql)
            cursor = self.getcursor()
            cursor.execute(sql)
            dataset_id = self.insert_id()

            self.load_steering(dataset_id,steering)
            self.load_steering_dependencies(dataset_id,steering)
            self.load_tasks(dataset_id,steering)
            self.load_batch_options(dataset_id,steering)
            self.load_externals(dataset_id,steering)

            tray_index = 0
            tsql = " INSERT INTO tray "
            tsql += " (dataset_id,tray_index,inputevents,iterations,name) "
            tsql += " VALUES (%s, %s, %s, %s, %s)"
            for i3config in steering.GetTrays():

                params = (dataset_id, tray_index,
                          int(i3config.GetEvents()),
                          int(i3config.GetIterations()),
                          self.nullify(i3config.GetName())
                         )
                cursor.execute(tsql, params)
                tray_id = self._conn.insert_id()

                funcs = {'input': i3config.GetInputFiles,
                         'output': i3config.GetOutputFiles}
                files = []
                for type, func in funcs.iteritems():
                    for file in func():
                        files.append((tray_id, type, file.GetName(), int(file.IsPhotonicsTable())))
                if files:
                    fsql = " INSERT INTO tray_files"
                    fsql += " (tray_id, type, name, photonics)"
                    fsql += " VALUES (%s, %s, %s, %s)"
                    cursor.executemany(fsql, files)

                self.load_projects(dataset_id,i3config,tray_index)
                self.load_pre(dataset_id,i3config,tray_index)
                self.load_services(dataset_id,i3config,tray_index)
                self.load_modules(dataset_id,i3config,tray_index)
                self.load_connections(dataset_id,i3config,tray_index)
                self.load_post(dataset_id,i3config,tray_index)
                tray_index += 1

            if geo:
                geo_unique = []
                for g in geo.GetValue().replace('+',' ').replace('and',' ').replace(',',' ').split():
                    if g and g not in geo_unique:
                        geo_unique.append(g)
                        self.load_geometry(g,dataset_id)

        except Exception, e:
            sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
            self.logger.error(str(e) + ": rolling back transaction")
            if self._conn is not None:
                self._conn.rollback()
            raise Exception, e

        self.commit()
        self.logger.debug("Successfully loaded configuration to database")

        return dataset_id

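    # Sketch of the expected call pattern (the surrounding workflow is an
    # assumption based on this file alone):
    #
    #   steering = ...  # a Steering object filled from an XML config
    #   dataset_id = db.upload_config(steering,ticket=0,template=0,maxjobs=1000)
    #
    # On success every tray, module, service and parameter of the steering
    # tree has been mirrored into the dataset/tray/module_pivot tables.
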
498 """
499 Update status of dataset
500 """
501 cursor = self.getcursor()
502 sql = " INSERT IGNORE INTO urlpath "
503 sql += " (dataset_id, name, path) "
504 sql += " VALUES "
505 cm = ""
506 sql1 = ""
507 for p in steering.GetParameters():
508 if p.GetName().startswith("TARGET::"):
509 sql1 += "%s (%d, '%s', '%s') " % (cm,dataset,p.GetName().replace("TARGET::",""),p.GetValue())
510 cm =","
511 if sql1:
512 cursor.execute(sql+sql1)
513
    # method name is a reconstruction; the original def line was lost
    def get_simcats(self):
        """
        Get list of sim_cat names
        """
        cursor = self.getcursor()
        sql = " SELECT category FROM simcat "
        cursor.execute(sql)
        return map(lambda x: x['category'],cursor.fetchall())

    # method name is a reconstruction; the original def line was lost
    def get_new_datasets(self):
        """
        Get a list of new datasets.
        """
        cursor = self.getcursor()
        sql = " SELECT dataset_id,jobs_submitted FROM dataset "
        sql += " WHERE status='PROCESSING' "
        sql += " AND verified='TRUE' "
        self.logger.debug(sql)
        cursor.execute(sql)
        self.commit()
        return cursor.fetchall()

539 """
540 Load simulation category
541 @return: index of category
542 """
543 self.logger.debug("loading simulation category")
544 cursor = self.getcursor()
545
546 sql = " INSERT INTO dataset_param "
547 sql += " (dataset_id,name,value) "
548 sql += " VALUES "
549 cm = ''
550 for item in paramdict.items():
551 sql += " %s(%d,'%s','%s') " % (cm,dataset,item[0],item[1])
552 cm = ','
553 cursor.execute(sql)
554 self.commit()
555
556 if paramdict.has_key('geometry'):
557 self.logger.debug("setting geometry")
558 sql = " UPDATE dataset SET geometry = '%s' " % paramdict['geometry']
559 sql += " WHERE dataset_id = %d " % dataset
560 cursor.execute(sql)
561 self.commit()
562
    def load_geometry(self,geo,dataset):
        self.logger.debug("loading geometry information")
        cursor = self.getcursor()
        sql = " INSERT IGNORE INTO geometry "
        sql += " (dataset_id,name) "
        sql += " VALUES (%u,'%s') " % (dataset,geo)
        self.logger.info(sql)
        cursor.execute(sql)

574 """
575 Load simulation category
576 @return: index of category
577 """
578 self.logger.debug("loading simulation category")
579 category = steering.GetCategory()
580
581 sql = """
582 SELECT simcat_id from simcat
583 WHERE category = '%s' """ % category
584
585 cursor = self.getcursor()
586 cursor.execute(sql)
587 result_set = cursor.fetchall();
588
589 if len(result_set) > 0:
590 simcat_id = result_set[0]['simcat_id']
591 else:
592
593 sql = """INSERT IGNORE INTO simcat
594 (category) VALUES ('%s')""" % category
595
596 cursor = self.getcursor()
597 cursor.execute(sql)
598 simcat_id = self.insert_id()
599
600 return simcat_id
601
603 """
604 Load external programs to run prior to icetray
605 @param dataset_id: primary key for run on config db
606 """
607 self.logger.debug("loading externals")
608 externs = steering.GetExterns()
609 if not externs:
610 return
611
612 sql = "INSERT IGNORE INTO extern ("
613 sql += " name,command,version,description,arguments, "
614 sql += " extern.infile,extern.outfile, extern.errfile, "
615 sql += " steering,steering_name, dataset_id) "
616 sql += " VALUES "
617
618 cm = ''
619 for e in externs:
620 sql += "%s\n(" % cm
621 sql += "'%s'," % e.GetName()
622 sql += "'%s'," % e.GetExec()
623 sql += "%s," % self.nullify(e.GetVersion())
624 sql += "%s," % self.nullify(e.GetDescription())
625 sql += "%s," % self.nullify(e.GetArgs())
626 sql += "%s," % self.nullify(e.GetInFile())
627 sql += "%s," % self.nullify(e.GetOutFile())
628 sql += "%s," % self.nullify(e.GetErrFile())
629 if e.GetSteering():
630 sql += "'%s'," % e.GetSteering()[0].GetText()
631 sql += "'%s'," % e.GetSteering()[0].GetName()
632 else:
633 sql += "NULL,NULL,"
634 sql += "%d)" % dataset_id
635 cm = ','
636
637 sql = re.sub('\s+',' ',sql)
638 cursor = self.getcursor()
639 self.logger.debug(sql)
640 cursor.execute(sql)
641 return
642
643
645 """
646 Load projects to database
647 @param dataset_id: primary key for run on config db
648 """
649 self.logger.debug("loading projects")
650
651
652 load_index=0
653 for mproj in i3config.GetMetaProjectList():
654 mpid = self.load_metaproject(mproj,dataset_id,tray_index,load_index)
655 load_index += 1
656
658 """
659 Load steering parameters to database
660 @param dataset_id: primary key for run on config db
661 """
662
663 sql = """INSERT IGNORE INTO steering_parameter
664 (type, name, value, description,dataset_id) VALUES """
665
666 cm = ''
667 for p in steering.GetParameters():
668 type = p.GetType()
669 name = p.GetName()
670 value = p.GetValue()
671 desc = p.GetDescription()
672 sql += "%s\n (\'%s\',\'%s\',\'%s\',\'%s\',%d)" % \
673 (cm,type,name,value,desc,dataset_id)
674 cm = ','
675
676 sql = re.sub('\s+',' ',sql)
677 cursor = self.getcursor()
678 cursor.execute(sql)
679
681 """
682 Load file dependencies in steering element to database
683 @param dataset_id: primary key for run on config db
684 """
685 dependencies = steering.GetDependencies()
686 if not dependencies:
687 return
688
689 sql = """INSERT IGNORE INTO steering_dependency
690 (filename, dataset_id) VALUES """
691
692 cm = ''
693 for d in dependencies:
694 sql += "%s\n (\'%s\',%d)" % (cm,d,dataset_id)
695 cm = ','
696
697 sql = re.sub('\s+',' ',sql)
698 cursor = self.getcursor()
699 cursor.execute(sql)
700 return
701
703 """
704 Load tasks in steering element to database, used for
705 multi-part simulation jobs
706 @param dataset_id: primary key for run on config db
707 """
708 tasks = steering.GetTaskDefinitions()
709 if not tasks:
710 return
711
712
713 sql = "INSERT INTO task_def (dataset_id,name,reqs,parallel,photonics) VALUES (%s,%s,%s,%s,%s)"
714 inserts = []
715 for name,task in tasks.items():
716 reqs = task.GetRequirements()
717 parallel = int(task.ParallelExecutionEnabled())
718 photonics = int(task.UsesPhotonics())
719 self.logger.debug(sql % (dataset_id,name,reqs,parallel,photonics))
720 self.logger.debug("task %s added to DB, parallel: %s, photonics: %s, reqs: %s" \
721 % (name,parallel,photonics,reqs))
722 inserts.append((dataset_id,name,reqs,parallel,photonics))
723 self.logger.debug(inserts)
724 cursor = self.getcursor()
725 cursor.executemany(sql, inserts)
726 self.logger.debug("task definitions added")
727
728
729 relationship_sql = "INSERT INTO task_def_rel" \
730 + " (parent_task_def_id,child_task_def_id)" \
731 + " VALUES (%s, %s)"
732 tray_sql = "INSERT INTO task_def_tray (task_def_id,idx,iter)" \
733 + " VALUES (%s,%s,%s)"
734 id_sql = "SELECT task_def_id FROM task_def WHERE dataset_id = %s" \
735 + " AND name = %s"
736 for name,task in tasks.items():
737 cursor.execute(id_sql, (dataset_id,name))
738 row = cursor.fetchone()
739 if not row:
740 self.logger.error("task %s didn't get inserted into DB" % \
741 name)
742 cursor.rollback()
743 return
744 task_id = row['task_def_id']
745
746
747 inserts = []
748 parents = task.GetParents()
749 for parent in parents:
750 self.logger.debug("task %s has parent %s" % (name,parent))
751 cursor.execute(id_sql, (dataset_id,parent))
752 row = cursor.fetchone()
753 if not row:
754 self.logger.error("referenced parent task %s not found in DB" % \
755 parent)
756 cursor.rollback()
757 return
758 parent_id = row['task_def_id']
759 inserts.append((parent_id,task_id))
760 cursor.executemany(relationship_sql, inserts)
761
762
763 inserts = []
764 trays = task.GetTrays()
765 for index,tray in trays.items():
766 for iter in tray.GetIters():
767 self.logger.debug("task %s has tray %s iter %s" \
768 % (name,index,iter))
769 inserts.append((task_id,index,iter))
770 cursor.executemany(tray_sql, inserts)
771 self.logger.debug("added all tasks")
772 self.commit()
773 return
774
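    # Schema note (inferred from the SQL in load_tasks above): task_def holds
    # one row per named task, task_def_rel encodes the parent->child DAG, and
    # task_def_tray maps a task onto (tray index, iteration) pairs; e.g. a
    # 'photonics' task that depends on 'generate' produces one task_def_rel
    # row linking the two task_def ids.
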
776 """
777 Load batch system options from steering to database
778 @param dataset_id: primary key for run on config db
779 """
780 batchopts = steering.GetBatchOpts()
781 if not batchopts:
782 return
783
784 sql = """INSERT IGNORE INTO batch_option
785 (name, type, value, dataset_id) VALUES """
786
787 cm = ''
788 for b in batchopts:
789 name = b.GetName()
790 value = b.GetValue()
791 type = b.GetType()
792 sql += "%s\n (\'%s\',\'%s\',\'%s\',%d)" % \
793 (cm,name,type,value,dataset_id)
794 cm = ','
795
796 sql = re.sub('\s+',' ',sql)
797 cursor = self.getcursor()
798 cursor.execute(sql)
799
800
    def load_project(self,project,dataset_id,metaproject_id,tray_index,load_index):
        """
        Load project to database
        @param project: the Project object to be loaded
        @param dataset_id: primary key for run on config db
        @return: primary key for projects table on config db
        """
        cursor = self.getcursor()
        pid = self.fetch_project_id(project.GetName(), project.GetVersion())
        self.logger.debug("%s.%s pid is %s" % (project.GetName(),project.GetVersion(),pid))
        if not pid:
            ver = project.GetVersion()
            name = project.GetName()
            sql = " INSERT IGNORE INTO project "
            if isinstance(ver,types.StringTypes):
                sql += "(name, versiontxt,major_version,minor_version,patch_version) "
                sql += ' VALUES '

                vt = ('00','00','00')
                legacy_ver = self.version_regex.search(ver)
                if legacy_ver:
                    legacy_ver = legacy_ver.group(0).replace('V','')
                    vt = legacy_ver.split('-')
                sql += "('%s','%s','%s','%s','%s')" % (name,ver,vt[0],vt[1],vt[2])
                self.logger.debug(sql)
            else:
                raise Exception, "incompatible version type: %s" % type(ver)
            cursor.execute(sql.strip())
            pid = self.insert_id()
            self.logger.debug("After insert: %s.%s pid is %s" % (project.GetName(),project.GetVersion(),pid))
            if cursor.rowcount:
                self.logger.debug("inserted id %d" % pid)
            else:
                self.logger.warn("could not load project %s " % name)
        sql = " INSERT IGNORE INTO mp_pivot "
        sql += " (project_id, metaproject_id) VALUES (%d,%d)" % (pid,metaproject_id)
        cursor.execute(sql.strip())

        project.SetId(pid)
        sql = " INSERT INTO project_pivot "
        sql += " (project_id, dataset_id,tray_index,load_index) "
        sql += " VALUES (%d,%d,%d,%d)" % (pid,dataset_id,tray_index,load_index)
        cursor.execute(sql.strip())
        return pid

850 """
851 Retrieve the project_id for a given project
852 @param project_name: name of library
853 @param project_version: version string
854 """
855 sql = " SELECT project_id "
856 sql += " FROM project "
857 sql += " WHERE name = '%s' " % project_name
858 sql += " AND versiontxt = '%s' " % project_version
859
860 cursor = self.getcursor()
861 cursor.execute(sql.strip())
862
863 result_set = cursor.fetchall();
864 if result_set:
865 return result_set[0]['project_id']
866
867
868
870 """
871 Load cross-references between projects (1) and depency projects
872 @param project: Project object
873 """
874
875 sql = """
876 INSERT IGNORE INTO project_depend
877 (project_id,metaproject_id,dependency_id) VALUES
878 """
879 cm = ''
880 pid = project.GetId()
881 for p in project.GetDependencies():
882 self.logger.debug("%s - getting project dependency: %s" % \
883 (project.GetName(),p))
884 if i3config.GetProject(p):
885 did = i3config.GetProject(p).GetId()
886 sql += "%s\n(%s,%s,%s)" % (cm,pid,metaproject_id,did)
887 cm = ','
888
889 if not cm:
890 return
891
892 sql = re.sub('\s+',' ',sql)
893 cursor = self.getcursor()
894 cursor.execute (sql)
895
896 self.logger.debug(self.insert_id())
897 self.logger.debug(
898 "%d project_dependency rows were inserted" % cursor.rowcount)
899 return
900
901
903 """
904 Load metaproject to database
905 @param metaproject: the Project object to be loaded
906 @param dataset_id: primary key for run on config db
907 @return: primary key for projects table on config db
908
909 """
910 name = metaproject.GetName()
911 version = metaproject.GetVersion()
912 mpid = self.fetch_metaproject_id(name, version)
913
914 if not mpid:
915 raise Exception, "metaproject '%s-%s' not found." % (name,str(version))
916
917
918 sql = " INSERT IGNORE INTO metaproject_pivot "
919 sql += " (metaproject_id, dataset_id,tray_index,load_index) "
920 sql += " VALUES (%d,%d,%d,%d)""" % (mpid,dataset_id,tray_index,load_index)
921
922 sql = re.sub('\s+',' ',sql)
923 cursor = self.getcursor()
924 cursor.execute(sql)
925 metaproject.SetId(mpid)
926
927 project_load_index=0
928 for proj in metaproject.GetProjectList():
929 self.load_project(proj,dataset_id,mpid,tray_index,project_load_index)
930 self.logger.debug("%s.%s.GetId() = %s" %(proj.GetName(),proj.GetVersion(),proj.GetId()))
931 project_load_index+=1
932
933 return mpid
934
936 """
937 Load cross-references between modules (1) and depency projects
938 @param module: Module object
939 """
940
941 sql = """
942 INSERT IGNORE INTO module_dependency
943 (module_id,project_id) VALUES
944 """
945 cm = ''
946 for p in module.GetProjectList():
947 project = i3config.GetProject(p.GetName())
948 if project:
949 self.logger.debug("%s - getting module dependency: %s" % \
950 (module.GetName(),project.GetName()))
951 sql += "%s\n(%d,%d)" % (cm,module_id,project.GetId())
952 cm = ','
953 else:
954 self.logger.error("project %s not found" % p.GetName())
955
956 if not cm:
957 return
958
959 self.logger.debug(sql.strip())
960 cursor = self.getcursor()
961 cursor.execute (sql)
962
963 self.logger.debug(self.insert_id())
964 self.logger.debug("%d module_dependency rows were inserted" % cursor.rowcount)
965 return
966
967
969 """
970 Load module connections to database
971 @param dataset_id: primary key for run on config db
972 """
973 for con in i3config.GetConnections():
974 cid = self.load_connection(con,dataset_id,tray_index)
975
977 """
978 Load connection to database
979 @param connection: the Connection object to be loaded
980 @param dataset_id: primary key for run on config db
981 @return: primary key for projects table on config db
982
983 """
984 sql = """INSERT IGNORE INTO `connection`
985 (source, outbox, destination, inbox,
986 dataset_id,tray_index) VALUES """
987
988
989 source = connection.GetOutbox().GetModule()
990 outbox = connection.GetOutbox().GetBoxName()
991 destination = connection.GetInbox().GetModule()
992 inbox = connection.GetInbox().GetBoxName()
993
994 sql += "(\'%s\',\'%s\',\'%s\',\'%s\',%d,%d)" % \
995 (source,outbox,destination,inbox,dataset_id,tray_index)
996 sql = re.sub('\s+',' ',sql)
997
998 cursor = self.getcursor()
999 cursor.execute(sql)
1000 cid = self.insert_id()
1001 if cursor.rowcount:
1002 self.logger.debug("inserted id %d into connections table" % cid)
1003 for mesg in cursor.messages:
1004 self.logger.debug("connections: %s " % mesg)
1005 else:
1006 for mesg in cursor.messages:
1007 self.logger.error("connections: %s " % mesg)
1008 return cid
1009
    def load_pre(self,dataset_id,i3config,tray_index):
        """
        Load IceProd pre modules into the database.
        @param dataset_id: primary key for run on config db
        """
        load_index = 0
        for module in i3config.GetIceProdPres():
            self.load_module(module,dataset_id,load_index,tray_index,i3config,'iceprod','pre')
            self.load_params(module,dataset_id,tray_index)
            load_index += 1

    def load_post(self,dataset_id,i3config,tray_index):
        """
        Load IceProd post modules into the database.
        @param dataset_id: primary key for run on config db
        """
        load_index = 0
        for module in i3config.GetIceProdPosts():
            self.load_module(module,dataset_id,load_index,tray_index,i3config,'iceprod','post')
            self.load_params(module,dataset_id,tray_index)
            load_index += 1

1033 """
1034 Load modules into the database.
1035 @param dataset_id: primary key for run on config db
1036 """
1037 load_index=0
1038 for module in i3config.GetModules():
1039 self.load_module(module,dataset_id,load_index,tray_index,i3config)
1040 self.load_params(module,dataset_id,tray_index)
1041 load_index +=1
1042
1043
1045 """
1046 Load services into the database.
1047 @param dataset_id: primary key for run on config db
1048 """
1049 load_index=0
1050 for service in i3config.GetServices():
1051 self.load_service(service,dataset_id,load_index,tray_index,i3config)
1052 self.load_params(service,dataset_id,tray_index)
1053 load_index +=1
1054
1055
    def load_module(self,module,dataset_id,load_index,tray_index,i3config,type='module',iptype='tray'):
        """
        Load individual module into the database given a run ID.
        @param module: the module to be loaded
        @param dataset_id: primary key for run on config db
        @param load_index: order in which module should be loaded
        @param tray_index: tray instance to add module to
        @param i3config: Steering instance
        @param type: module,service,iceprod
        @param iptype: one of tray,pre,post. Serves to distinguish pre and post modules
        @return: primary key for modules table on config db
        """
        cursor = self.getcursor()
        if type == 'iceprod':
            pname = 'iceprod'
            pver = iceprod.__version__
            pid = self.fetch_project_id(pname,pver)
            if not pid:
                vt = ('00','00','00')
                legacy_ver = self.version_regex.search(pver)
                if legacy_ver:
                    legacy_ver = legacy_ver.group(0).replace('V','')
                    vt = legacy_ver.split('-')

                sql = " INSERT INTO project "
                sql += " (name, versiontxt,major_version,minor_version,patch_version) "
                sql += " VALUES ('%s','%s','%s','%s','%s')" % (pname,pver,vt[0],vt[1],vt[2])
                cursor.execute(sql)
                pid = self.insert_id()

            self.logger.debug("load_module: %s " % pid)
        else:
            if not module.GetProjectList():
                self.logger.error("module %s doesn't have project attribute" % module.GetName())
                raise Exception, "module %s is missing parent project" % module.GetName()
            project = module.GetProjectList()[0]
            project = i3config.GetProject(project.GetName())
            pid = project.GetId()
            self.logger.debug("load_module: %s " % pid)

        self.logger.debug('fetching %s module for project id %s' % (type,pid))
        mid = self.fetch_module_id(module,pid,type)
        self.logger.debug('fetched %s module with id %s' % (type,mid))
        if not mid:
            sql = " INSERT INTO module "
            sql += " (name,class,module_type,project_id) "
            sql += " VALUES (\'%s\',\'%s\',\'%s\',%d) " \
                   % (module.GetName(),module.GetClass(),type,pid)

            self.logger.debug(sql.strip())
            cursor.execute(sql.strip())
            mid = self.insert_id()
            if cursor.rowcount:
                self.logger.debug("inserted %s id %d" % (type,mid))
            else:
                self.logger.debug("failed to insert %s id %d" % (type,mid))
        self.load_dependencies(module,mid,i3config)
        sql = " INSERT INTO module_pivot "
        sql += " (module_id, name, dataset_id,tray_index,load_index,iptype) "
        sql += " VALUES (%d,'%s',%d,%d,%d,'%s') " % (mid,module.GetName(),
                                                     dataset_id,tray_index,load_index,iptype)
        self.logger.debug(sql.strip())
        cursor.execute(sql.strip())
        mpid = self.insert_id()
        module.SetId(mpid)
        return mpid

1127 """
1128 retrive id for module with matching name, and project_id
1129 (there should only be one)
1130 @param module: module to query
1131 @param project_id: primary key of parent project
1132 @param type: ('module'|'service')
1133 """
1134 sql = " SELECT module_id FROM module "
1135 sql += " WHERE class ='%s' " % module.GetClass()
1136 sql += " AND module_type ='%s' " % type
1137 sql += " AND project_id =%d " % project_id
1138
1139 self.logger.debug(sql.strip())
1140 cursor = self.getcursor()
1141 cursor.execute (sql.strip());
1142 result = cursor.fetchone()
1143 self.logger.debug(str(result))
1144 if result:
1145 return int(result['module_id'])
1146 else:
1147 self.logger.warn("module \'%s\' not found" % module.GetClass())
1148 return
1149
1151 """
1152 retrive id for service with matching name, and project_id
1153 (there should only be one)
1154 @param service: service to query
1155 @param project_id: primary key of parent project
1156 """
1157 return self.fetch_module_id(service,project_id,'service')
1158
    def load_service(self,service,dataset_id,load_index,tray_index,i3config):
        """
        Load individual service into the database given a run ID.
        @param service: the Service object to be loaded
        @param dataset_id: primary key for run on config db
        @return: primary key for services table on config db
        """
        return self.load_module(service,dataset_id,load_index,tray_index,i3config,type='service')

1169 """
1170 Add OMKey object
1171 @param omkey: OMKeys
1172 @param pid: configured parameter id or cparameter_id
1173 """
1174 cursor = self.getcursor()
1175 sql = " INSERT INTO carray_element (name,value,cparameter_id) "
1176 sql += " VALUES "
1177 sql += " ('%s','%s',%s), " % ('stringid',omkey.stringid,pid)
1178 sql += " ('%s','%s',%s) " % ('omid',omkey.omid,pid)
1179 cursor.execute (sql.strip())
1180
1182 """
1183 Add array of OMKey objects
1184 @param omkeyvect: list of OMKeys
1185 @param pid: configured parameter id or cparameter_id
1186 """
1187 if not len(omkeyvect) > 0: return
1188 cursor = self.getcursor()
1189 sql = " INSERT INTO carray_element (name,value,cparameter_id) "
1190 sql += " VALUES "
1191 cm = ""
1192 for omkey in omkeyvect:
1193 sql += cm
1194 sql += " ('%s','%s',%s), " % ('stringid',omkey.stringid,pid)
1195 sql += " ('%s','%s',%s) " % ('omid',omkey.omid,pid)
1196 cm = ","
1197 cursor.execute (sql.strip())
1198
1200 """
1201 Add value array
1202 @param values: list of array elements
1203 @param pid: configured parameter id or cparameter_id
1204 """
1205 cursor = self.getcursor()
1206 if not len(values) > 0: return
1207 sql = " INSERT INTO carray_element (value,unit,cparameter_id) "
1208 sformat = lambda x: "('%s',%s,%s)" % (x.value,self.nullify(x.unit),pid)
1209 vals = ",".join(map(sformat,values))
1210 sql += " VALUES " + vals
1211 cursor.execute (sql.strip())
1212
1213
1215 """
1216 Load parameters into the database.
1217 @param module: whose parameters are to be loaded to database
1218 """
1219 cursor = self.getcursor()
1220 sql = " INSERT INTO cparameter "
1221 sql += " (name,type,unit,module_pivot_id,dataset_id,tray_index,value) "
1222 sql += " VALUES "
1223 count = 0
1224
1225 m_id = module.GetId()
1226 self.logger.debug('load_params: mid = %s' % m_id)
1227
1228 if not module.GetParameters():
1229 return
1230 for p in module.GetParameters():
1231 name = p.GetName()
1232 type = p.GetType()
1233 desc = p.GetDescription()
1234
1235 if type == 'OMKey' or type in VectorTypes:
1236 value = 0
1237 unit = 'NULL'
1238 else:
1239 value = p.GetValue().value
1240 unit = self.nullify(p.GetValue().unit)
1241 sql1 = sql + " ('%s','%s',%s,%d,%d,%d,'%s') " % \
1242 (name,type,unit,m_id,dataset_id,tray_index,value)
1243 self.logger.debug(sql1.strip())
1244 cursor.execute (sql1.strip())
1245 pid = self.insert_id()
1246 p.SetId(pid)
1247 count = count + cursor.rowcount
1248
1249 if type == 'OMKey':
1250 self.insert_omkey(p.GetValue(),pid)
1251 elif type == 'OMKeyv':
1252 self.insert_omkey_array(p.GetValue(),pid)
1253 elif type in VectorTypes:
1254 self.insert_array(p.GetValue(),pid)
1255
1256 self.logger.debug("%d cparameter rows were inserted" % count)
1257
1258
1260 """
1261 DOWNLOAD dataset briefs FROM database
1262 @return: resultset from database
1263 """
1264 sql = " SELECT * FROM dataset "
1265 if search_string and len(search_string):
1266 sql += " WHERE username LIKE '%%%s%%' " % search_string
1267 sql += " OR hostname LIKE '%%%s%%' " % search_string
1268 sql += " OR description LIKE '%%%s%%' " % search_string
1269 sql += " OR startdate LIKE '%%%s%%' " % search_string
1270 for token in search_string.split():
1271 try:
1272 sql += " OR dataset_id = %d " % int(token)
1273 except: pass
1274 sql += " ORDER BY dataset_id DESC "
1275
1276 cursor = self.getcursor()
1277 cursor.execute(sql)
1278 result_set = cursor.fetchall();
1279 return result_set
1280
1281
1282
    def download_config(self,dataset_id,include_defaults=False,include_description=False):
        """
        Download icetray configuration from database
        @param dataset_id: ID of the run whose configuration we wish to download
        @return: IceTrayConfig object containing the IceTray configuration
        """
        steering = self.download_steering(dataset_id)
        category = self.getsimcat(dataset_id)
        steering.SetCategory(category)
        self.download_steering_dependencies(dataset_id,steering)
        self.download_tasks(dataset_id,steering)
        self.download_batch_options(dataset_id,steering)
        self.download_externals(dataset_id,steering)

        sql = "SELECT * FROM dataset WHERE dataset_id = %d" % dataset_id
        cursor = self.getcursor()
        cursor.execute(sql)
        result = cursor.fetchone()
        if result:
            steering.SetDescription(result['description'])
            steering.SetParentId(result['dataset_id'])

        sql = "SELECT * FROM tray WHERE dataset_id = %d" % dataset_id
        cursor = self.getcursor()
        cursor.execute(sql)
        trayitems = cursor.fetchall()
        for tray in trayitems:
            i3config = IceTrayConfig()

            tray_id = tray['tray_id']
            tray_index = tray['tray_index']
            i3config.SetEvents(tray['inputevents'])
            i3config.SetIterations(tray['iterations'])
            i3config.SetName(self.nonify(tray['name']))

            funcs = {'input': i3config.AddInputFile,
                     'output': i3config.AddOutputFile}
            fsql = "SELECT type, name, photonics FROM tray_files WHERE tray_id = %s"
            cursor.execute(fsql, (tray_id,))
            files = cursor.fetchall()
            for file in files:
                type = file['type'].lower()
                obj = IceTrayFile(file['name'], file["photonics"])
                func = funcs[type]
                func(obj)

            self.download_metaprojects(dataset_id,tray_index,i3config)
            self.download_projects(dataset_id,tray_index,i3config)
            self.download_pre(dataset_id,tray_index,i3config,include_defaults,include_description)
            self.download_services(dataset_id,tray_index,i3config,include_defaults,include_description)
            self.download_modules(dataset_id,tray_index,i3config, include_defaults=include_defaults,include_description=include_description)
            self.download_connections(dataset_id,tray_index,i3config)
            self.download_post(dataset_id,tray_index,i3config,include_defaults,include_description)
            steering.AddTray(i3config)

        self.commit()
        return steering

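    # Round-trip note: download_config(dataset_id) rebuilds a Steering tree
    # whose trays, modules, services and parameters mirror what was uploaded;
    # with include_defaults=True rows from the static 'parameter' table are
    # merged in as well, so unset options carry their library defaults.
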

    def download_metaprojects(self,dataset_id,tray_index,i3config):
        """
        Download metaprojects from database
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        sql = " SELECT metaproject.* "
        sql += " FROM metaproject,metaproject_pivot "
        sql += " WHERE metaproject_pivot.dataset_id = %d " % dataset_id
        sql += " AND metaproject.metaproject_id = "
        sql += " metaproject_pivot.metaproject_id "
        sql += " AND metaproject_pivot.tray_index = %d " % tray_index
        sql += " ORDER BY load_index "

        cursor = self.getcursor()
        self.logger.debug(sql)
        cursor.execute(sql.strip())
        result_set = cursor.fetchall()
        for mp in result_set:
            mpname = mp['name']
            mpver = mp['versiontxt']
            mp_id = mp['metaproject_id']
            mproject = MetaProject()
            mproject.SetId(mp_id)
            mproject.SetName(mpname)
            mproject.SetVersion(mpver)

            self.logger.debug("downloaded metaproject %s with id %d" % \
                              (mproject.GetName(),mproject.GetId()))

            i3config.AddMetaProject(mproject.GetName(),mproject)
            self.metaproject_dict[mp_id] = mproject

1381 """
1382 Download Load external programs to run prior to icetray
1383 @param dataset_id: primary key for run on config db
1384 """
1385 self.logger.debug("downloading externals")
1386
1387 cursor = self.getcursor()
1388
1389 sql = "SELECT * FROM extern "
1390 sql += "WHERE dataset_id = %d " % dataset_id
1391 cursor.execute(sql.strip())
1392 result_set = cursor.fetchall();
1393 for e in result_set:
1394 extern = Extern()
1395 extern.SetName(self.nonify(e['name']))
1396 extern.SetVersion(self.nonify(e['version']))
1397 extern.SetExec(self.nonify(e['command']))
1398 extern.SetDescription(self.nonify(e['description']))
1399 extern.SetArgs(self.nonify(e['arguments']))
1400 extern.SetInFile(self.nonify(e['infile']))
1401 extern.SetOutFile(self.nonify(e['outfile']))
1402 extern.SetErrFile(self.nonify(e['errfile']))
1403 if self.nonify(e['steering_name']):
1404 es = ExternSteering()
1405 es.SetName(e['steering_name'])
1406 es.SetText(e['steering'])
1407 extern.AddSteering(es)
1408 steering.AddExtern(extern)
1409
1410 return
1411
1413 """
1414 Retrieve value stored in dictionary
1415 @param key: string key to dictionary entry
1416 """
1417 sql = " SELECT value FROM dictionary WHERE "
1418 sql += " keystring = '%s' " % key
1419 cursor = self.getcursor()
1420 cursor.execute(sql.strip())
1421 result_set = cursor.fetchone();
1422 if result_set:
1423 return result_set['value']
1424 else:
1425 return ''
1426
1428 """
1429 Retrieve value stored in dictionary
1430 @param key: string key to dictionary entry
1431 """
1432 sql = " SELECT * FROM file "
1433 sql += " WHERE file_number = %d " % key
1434 if dataset_id:
1435 sql += " AND dataset_id = %d " % dataset_id
1436 cursor = self.getcursor()
1437 cursor.execute(sql.strip())
1438 result_set = cursor.fetchone();
1439 if result_set:
1440 return result_set
1441 else:
1442 return ''
1443
1445 """
1446 Download projects from database
1447 @param dataset_id: ID of the run whose configuration we wish to download
1448 """
1449 projects = []
1450
1451 sql = " SELECT metaproject_pivot.metaproject_id,project.*"
1452 sql += " FROM metaproject_pivot,project,project_pivot,mp_pivot"
1453 sql += " WHERE metaproject_pivot.dataset_id = %d " % dataset_id
1454 sql += " AND project.project_id = mp_pivot.project_id "
1455 sql += " AND project.project_id = project_pivot.project_id "
1456 sql += " AND metaproject_pivot.metaproject_id = mp_pivot.metaproject_id "
1457 sql += " AND project_pivot.dataset_id = metaproject_pivot.dataset_id "
1458 sql += " ORDER BY metaproject_pivot.load_index,project_pivot.load_index "
1459 cursor = self.getcursor()
1460 self.logger.debug(sql.strip())
1461 cursor.execute(sql.strip())
1462 result_set = cursor.fetchall();
1463
1464 for p in result_set:
1465 pname = p['name']
1466 pver = p['versiontxt']
1467 mp_id = p['metaproject_id']
1468 project = Project()
1469 project.SetId(p['project_id'])
1470 self.logger.debug("downloaded project %s with id %d" % (project.GetName(),project.GetId()))
1471 project.SetName(pname)
1472 project.SetVersion(pver)
1473 projects.append(project)
1474 return projects
1475
1477 """
1478 Download projects from database
1479 @param dataset_id: ID of the run whose configuration we wish to download
1480 """
1481
1482 sql = " SELECT metaproject_pivot.metaproject_id,project.*"
1483 sql += " FROM metaproject_pivot,project,project_pivot,mp_pivot"
1484 sql += " WHERE metaproject_pivot.dataset_id = %d " % dataset_id
1485 sql += " AND project.project_id = mp_pivot.project_id "
1486 sql += " AND project.project_id = project_pivot.project_id "
1487 sql += " AND metaproject_pivot.metaproject_id = mp_pivot.metaproject_id "
1488 sql += " AND project_pivot.dataset_id = metaproject_pivot.dataset_id "
1489 sql += " AND metaproject_pivot.tray_index = project_pivot.tray_index "
1490 sql += " AND project_pivot.tray_index = %d " % tray_index
1491 sql += " ORDER BY metaproject_pivot.load_index,project_pivot.load_index "
1492 cursor = self.getcursor()
1493 self.logger.debug(sql.strip())
1494 cursor.execute(sql.strip())
1495 result_set = cursor.fetchall();
1496
1497 for p in result_set:
1498 pname = p['name']
1499 pver = p['versiontxt']
1500 mp_id = p['metaproject_id']
1501 project = Project()
1502 project.SetId(p['project_id'])
1503 self.logger.debug("downloaded project %s with id %d" % (project.GetName(),project.GetId()))
1504
1505 for d in self.fetch_project_dependencies(project.GetId(),mp_id):
1506 self.logger.debug(" %s - adding dependency %s" % (project.GetName(),d.GetName()))
1507 project.AddDependency(d.GetName())
1508 project.SetName(pname)
1509 project.SetVersion(pver)
1510 try:
1511 metaproject = self.metaproject_dict[mp_id]
1512 self.logger.debug("found metaproject %s with id %d" % (metaproject.GetName(),mp_id))
1513 metaproject.AddProject(pname,project)
1514
1515 if not i3config.HasMetaProject(metaproject.GetName()):
1516 i3config.AddMetaProject(metaproject.GetName(),metaproject)
1517 self.logger.debug("adding metaproject - %s" % metaproject.GetName())
1518
1519 except KeyError, k:
1520 self.logger.warn("could not find metaproject with id %d" % mp_id)
1521 self.logger.warn("Adding project to top-level container.")
1522 i3config.AddProject(pname,project)
1523
1524
1526 """
1527 retrive dependencies for project
1528 @param project_id: id of project
1529 @return array of project names
1530 """
1531 dependencies = []
1532
1533 sql = """
1534 SELECT
1535 project_depend.project_depend_id,
1536 project.name,
1537 project.project_id,
1538 project.versiontxt
1539 FROM
1540 project,project_depend
1541 WHERE
1542 project.project_id = project_depend.dependency_id
1543 AND
1544 project_depend.project_id = %d
1545 AND
1546 project_depend.metaproject_id = %d
1547 ORDER BY
1548 project_depend.project_depend_id
1549 """ % (project_id,metaproject_id)
1550
1551 cursor = self.getcursor()
1552 sql = re.sub('\s+',' ',sql);
1553 self.logger.debug(sql);
1554 cursor.execute (sql);
1555 result_set = cursor.fetchall ();
1556
1557 for d in result_set:
1558 dependency = Project()
1559 dependency.SetName(d['name'])
1560 dependency.SetVersion(d['versiontxt'])
1561
1562 dependency.SetId(d['project_depend_id'])
1563 dependencies.append(dependency)
1564
1565 return dependencies
1566
1567
1569 """
1570 retrive dependencies for module
1571 @param module_id: id of module
1572 @return array of project names
1573 """
1574 dependencies = []
1575
1576 sql = """
1577 SELECT
1578 module_dependency.module_dependency_id,
1579 project.name,
1580 project.project_id,
1581 project.versiontxt
1582 FROM
1583 project,module_dependency
1584 WHERE
1585 project.project_id = module_dependency.project_id
1586 AND
1587 module_dependency.module_id = %d
1588 ORDER BY
1589 module_dependency.module_dependency_id """ % module_id
1590
1591 cursor = self.getcursor()
1592 sql = re.sub('\s+',' ',sql);
1593 cursor.execute (sql);
1594 result_set = cursor.fetchall ();
1595
1596 for d in result_set:
1597 dependency = Project()
1598 dependency.SetName(d['name'])
1599 dependency.SetVersion(d['versiontxt'])
1600 dependency.SetId(d['project_id'])
1601 dependencies.append(dependency)
1602
1603 return dependencies
1604
1605
1607 """
1608 Download module connections from database
1609 @param dataset_id: ID of the run whose configuration we whish to download
1610 """
1611 sql = " SELECT * FROM `connection` "
1612 sql += " WHERE dataset_id = %d " % dataset_id
1613 sql += " AND tray_index = %d " % tray_index
1614 cursor = self.getcursor()
1615 cursor.execute(sql)
1616 result_set = cursor.fetchall();
1617 for c in result_set:
1618 csource = c['source']
1619 coutbox = c['outbox']
1620 cdest = c['destination']
1621 cinbox = c['inbox']
1622
1623 conn = Connection()
1624 conn.From(csource,coutbox)
1625 conn.To(cdest,cinbox)
1626 i3config.AddConnection(conn)
1627
1629 """
1630 Get simulation category
1631 @param dataset_id: dataset ID
1632 @return: category
1633 """
1634 self.logger.debug("retrieving simulation category")
1635
1636 sql = """
1637 SELECT simcat.category from simcat,dataset
1638 WHERE simcat.simcat_id = dataset.simcat_id
1639 AND dataset.dataset_id = %d """ % dataset_id
1640
1641 cursor = self.getcursor()
1642 cursor.execute(sql)
1643 result_set = cursor.fetchall();
1644
1645 if len(result_set) > 0:
1646 return result_set[0]['category']
1647
1649 """
1650 Get steering parameters from database
1651 @param dataset_id: ID of the run whose configuration we whish to download
1652 """
1653 steering = Steering()
1654 sql = " SELECT * FROM steering_parameter "
1655 sql += " WHERE dataset_id = '%s'" % dataset_id
1656 sql += " ORDER by name "
1657 cursor = self.getcursor()
1658 cursor.execute(sql)
1659 result_set = cursor.fetchall();
1660
1661 for p in result_set:
1662 param = Parameter()
1663 param.SetType(p['type'])
1664 param.SetName(p['name'])
1665 param.SetValue(p['value'])
1666 steering.AddParameter(param)
1667 return steering
1668
    # method name is a reconstruction; the original def line was lost
    def fetch_steering_parameter(self,dataset_id,param):
        sql = " SELECT value FROM steering_parameter "
        sql += " WHERE name = '%s'" % param
        sql += " AND dataset_id = '%s'" % dataset_id
        cursor = self.getcursor()
        cursor.execute(sql)
        result = cursor.fetchone()
        if result:
            return result['value']

1680 """
1681 Get steering dependencies from database
1682 @param dataset_id: ID of the run whose configuration we whish to download
1683 """
1684 sql = "SELECT * FROM steering_dependency WHERE dataset_id = '%s'" % dataset_id
1685 cursor = self.getcursor()
1686 cursor.execute(sql)
1687 result_set = cursor.fetchall();
1688
1689 for p in result_set:
1690 steering.AddDependency(p['filename'])
1691
1693 """
1694 Get job parts from database
1695 @param dataset_id: ID of the run whose configuration we whish to download
1696 """
1697
1698 sql = "SELECT task_def_id,name,reqs,parallel,photonics FROM task_def" \
1699 + " WHERE dataset_id = %s ORDER BY task_def_id"
1700 cursor = self.getcursor()
1701 cursor.execute(sql, (dataset_id,))
1702 results = cursor.fetchall()
1703
1704 tray_sql = "SELECT idx,CONVERT(GROUP_CONCAT(iter),char) AS iters" \
1705 + " FROM task_def_tray WHERE task_def_id = %s" \
1706 + " GROUP BY idx,task_def_id"
1707 parent_sql = "SELECT name FROM task_def,task_def_rel" \
1708 + " WHERE child_task_def_id = %s" \
1709 + " AND parent_task_def_id = task_def_id"
1710 child_sql = "SELECT name FROM task_def,task_def_rel" \
1711 + " WHERE parent_task_def_id = %s" \
1712 + " AND child_task_def_id = task_def_id"
1713
1714 for row in results:
1715 id = row['task_def_id']
1716 name = row['name']
1717 reqs = row['reqs']
1718 parallel = row['parallel']
1719 photonics = row['photonics']
1720
1721 td = TaskDefinition(name,id)
1722 td.SetRequirements(reqs)
1723 td.SetParallelExecution(parallel)
1724 td.SetUsesPhotonics(photonics)
1725
1726 self.logger.debug(tray_sql % id)
1727 cursor.execute(tray_sql, (id,))
1728 trays = cursor.fetchall()
1729 for tray in trays:
1730 td.AddTray(tray['idx'], tray['iters'])
1731
1732 cursor.execute(parent_sql, (id,))
1733 parents = cursor.fetchall()
1734 for parent in parents:
1735 td.AddParent(parent['name'])
1736
1737 cursor.execute(child_sql, (id,))
1738 children = cursor.fetchall()
1739 for child in children:
1740 td.AddChild(child['name'])
1741
1742 steering.AddTaskDefinition(td)
1743
1745 """
1746 Fetch batch system options from database
1747 @param dataset_id: ID of the run whose configuration we whish to download
1748 """
1749 batchopts = []
1750 sql = "SELECT * FROM batch_option WHERE dataset_id = '%s'" % dataset_id
1751 cursor = self.getcursor()
1752 cursor.execute(sql)
1753 result_set = cursor.fetchall()
1754 for b in result_set:
1755 opt = BatchOpt()
1756 opt.SetName(b['name'])
1757 opt.SetType(b['type'])
1758 opt.SetValue(b['value'])
1759 batchopts.append(opt)
1760 steering.AddBatchOpt(opt)
1761 return batchopts
1762
1764 """
1765 Get batch system options from database
1766 @param dataset_id: ID of the run whose configuration we whish to download
1767 """
1768 sql = "SELECT * FROM batch_option WHERE dataset_id = '%s'" % dataset_id
1769 cursor = self.getcursor()
1770 cursor.execute(sql)
1771 result_set = cursor.fetchall();
1772
1773 for b in result_set:
1774 opt = BatchOpt()
1775 opt.SetName(b['name'])
1776 opt.SetType(b['type'])
1777 opt.SetValue(b['value'])
1778 steering.AddBatchOpt(opt)
1779
1780
    def download_modules(self,dataset_id,
                         tray_index,
                         i3config,
                         type='module',
                         iptype='tray',
                         include_defaults=False,
                         include_description=False):
        """
        Get modules from the database.
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        sql = " SELECT module.class,module.module_id,module_pivot.module_pivot_id,"
        sql += " module_pivot.name,module_pivot.load_index "
        sql += " FROM module,module_pivot "
        sql += " WHERE module_pivot.dataset_id = %d " % dataset_id
        sql += " AND module.module_type = '%s' " % type
        sql += " AND module_pivot.iptype = '%s' " % iptype
        sql += " AND module.module_id = module_pivot.module_id "
        sql += " AND module_pivot.tray_index = %d " % tray_index
        sql += " ORDER BY load_index "

        cursor = self.getcursor()
        self.logger.debug(sql.strip())
        cursor.execute(sql.strip())
        result_set = cursor.fetchall()
        for m in result_set:
            mod = Service()

            mod.SetClass(m['class'])
            mod.SetName(m['name'])
            mod.SetId(m['module_id'])
            module_pivot_id = m['module_pivot_id']
            if type == 'module':
                i3config.AddModule(mod)
            elif type == 'service':
                i3config.AddService(mod)
            elif type == 'iceprod':
                if iptype == 'pre':
                    i3config.AddIceProdPre(mod)
                elif iptype == 'post':
                    i3config.AddIceProdPost(mod)

            if type in ['module','service']:
                for p in self.fetch_module_dependencies(mod.GetId()):
                    project = i3config.GetProject(p.GetName())
                    if project is not None:
                        mod.AddProject(project.GetName(),project)
                    else:
                        self.logger.warn('could not find dependency \'%s\'' % p.GetName())
                        i3config.AddProject(p.GetName(),p)
                        mod.AddProject(p.GetName(),p)

            self.download_params(mod,module_pivot_id,dataset_id,include_defaults,include_description)

    def download_services(self,dataset_id,tray_index,i3config,
                          include_defaults=False,include_description=False):
        """
        Download services from the database.
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        return self.download_modules(dataset_id,
                                     tray_index,
                                     i3config,
                                     type='service',
                                     include_defaults=include_defaults,
                                     include_description=include_description)

    def download_pre(self,dataset_id,tray_index,i3config,
                     include_defaults=False,include_description=False):
        """
        Download IceProdPre modules from the database.
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        return self.download_modules(dataset_id,
                                     tray_index,
                                     i3config,
                                     type='iceprod',
                                     iptype='pre',
                                     include_defaults=include_defaults,
                                     include_description=include_description)

    def download_post(self,dataset_id,tray_index,i3config,
                      include_defaults=False,
                      include_description=False):
        """
        Download IceProdPost modules from the database.
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        return self.download_modules(dataset_id,
                                     tray_index,
                                     i3config,
                                     type='iceprod',
                                     iptype='post',
                                     include_defaults=include_defaults,
                                     include_description=include_description)

    def select_array(self,pid):
        cursor = self.getcursor()
        sql = " SELECT * from carray_element "
        sql += " WHERE cparameter_id = %d " % pid
        cursor.execute(sql.strip())
        result_set = cursor.fetchall()
        vect = []
        for item in result_set:
            vect.append(Value(item['value'],self.nonify(item['unit'])))
        return vect

    def select_omkey(self,pid):
        omkeys = self.select_omkey_array(pid)
        if len(omkeys) < 1:
            raise Exception,'could not find omkey for param %d' % pid
        return omkeys[0]

    def select_omkey_array(self,pid):
        cursor = self.getcursor()
        sql = " SELECT * from carray_element "
        sql += " WHERE cparameter_id = %d order by carray_element_id" % pid
        cursor.execute(sql.strip())
        result_set = cursor.fetchall()
        omkeyvect = []
        for item in result_set:
            if item['name'] == 'stringid':
                omkey = pyOMKey(0,0)
                omkey.stringid = item['value']
            elif item['name'] == 'omid':
                omkey.omid = item['value']
                omkeyvect.append(omkey)
            else:
                raise Exception,'expected omkey but found %s' % item['name']
        return omkeyvect

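    # Storage convention (inferred from insert_omkey above): one OMKey is two
    # consecutive carray_element rows, e.g. ('stringid','21',pid) followed by
    # ('omid','30',pid), which the loop in select_omkey_array re-pairs into a
    # single pyOMKey(21,30)-style object.
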
1913 - def download_params(self,module,mod_id,dataset_id,include_defaults=False, include_description=False):
1914 """
1915 Download module parameters from the database.
1916 @param mod_id: index corresponding to module table
1917 """
1918 paramdict = {}
1919
1920 if include_defaults or include_description:
1921 sql = " SELECT * FROM parameter "
1922 sql += " WHERE module_id = %d" % module.GetId()
1923 sql += " ORDER BY name "
1924 cursor = self.getcursor()
1925 cursor.execute(sql)
1926 result_set = cursor.fetchall();
1927 for p in result_set:
1928 param = Parameter()
1929 param.SetType(p['type'])
1930 param.SetName(p['name'])
1931 pid = p['parameter_id']
1932 if param.GetType() == 'OMKeyv':
1933 param.SetValue(self.select_omkey_array(pid))
1934 elif param.GetType() == 'OMKey':
1935 param.SetValue(self.select_omkey(pid))
1936 elif param.GetType() in VectorTypes:
1937 param.SetValue(self.select_array(pid))
1938 else:
1939 param.SetValue(Value(p['value'],self.nonify(p['unit'])))
1940 param.SetUnit(self.nonify(p['unit']))
1941 param.SetDefault(True)
1942 param.SetDefault(param.GetValue())
1943 paramdict[param.GetName().lower()] = param
1944 if include_description:
1945 param.SetDescription(p['description'])
1946
1947
1948
1949 sql = " SELECT * FROM cparameter "
1950 sql += " WHERE module_pivot_id = '%s'" % mod_id
1951 sql += " AND dataset_id = '%s'" % dataset_id
1952 sql += " ORDER BY name "
1953 cursor = self.getcursor()
1954 cursor.execute(sql)
1955 result_set = cursor.fetchall();
1956
1957 for p in result_set:
1958 param = Parameter()
1959 param.SetType(p['type'])
1960 param.SetName(p['name'])
1961 pid = p['cparameter_id']
1962 if param.GetType() == 'OMKeyv':
1963 param.SetValue(self.select_omkey_array(pid))
1964 elif param.GetType() == 'OMKey':
1965 param.SetValue(self.select_omkey(pid))
1966 elif param.GetType() in VectorTypes:
1967 param.SetValue(self.select_array(pid))
1968 else:
1969 param.SetValue(Value(p['value'],self.nonify(p['unit'])))
1970 param.SetUnit(self.nonify(p['unit']))
1971 if paramdict.has_key(param.GetName().lower()):
1972 param.SetDefault(paramdict[param.GetName().lower()].GetDefault())
1973 paramdict[param.GetName().lower()] = param
1974
1975
1976 for param in paramdict.values():
1977 module.AddParameter(param)
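
# Illustrative sketch (not executed): filling a module with its configured
# parameters; defaults come from the parameter table and per-dataset
# overrides from cparameter. 'db', 'mod' and the id values are hypothetical:
#
#   mod = Module()
#   db.download_params(mod, mod_id=42, dataset_id=1234,
#                      include_defaults=True, include_description=True)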
1978
1979
1980 - def GetGridId(self,grid_name,module_name,on,institution='unknown',batchsys='unknown',url=''):
1981 """
1982 Retrieve the key for grid_name
1983 """
1984 ver = iceprod.__version__
1985 if not self.isconnected(): self.connect()
1986 cursor = self.getcursor()
1987 sql = " SELECT grid_id FROM grid WHERE name='%s' " % grid_name
1988 cursor.execute(sql)
1989 result = cursor.fetchone()
1990 if result:
1991 grid_id = result['grid_id']
1992 else:
1993 sql = " INSERT IGNORE INTO grid (name,institution,batchsys,version) "
1994 sql += " VALUES ('%s','%s','%s','%s') " % (grid_name,institution,batchsys,ver)
1995 cursor.execute(sql)
1996 grid_id = self.insert_id()
1997
1998
1999 vertuple = ver.split('.')
2000 if len(vertuple) < 3:
2001 vertuple= (0,0,0)
2002 sql = " INSERT IGNORE INTO project (name,major_version,minor_version,patch_version,versiontxt) "
2003 sql += " VALUES ('iceprod','%s','%s','%s','%s') " % (vertuple[0],vertuple[1],vertuple[2],ver)
2004 cursor.execute(sql)
2005
2006 sql = " UPDATE grid SET "
2007 sql += " institution = '%s', " % institution
2008 sql += " batchsys = '%s', " % batchsys
2009 sql += " version = '%s', " % ver
2010 if url:
2011 sql += " url = '%s', " % url
2012 if on:
2013 sql += " %s='RUNNING' " % module_name
2014 else:
2015 sql += " %s='STOPPED' " % module_name
2016 sql += " WHERE grid_id=%d " % grid_id
2017 cursor.execute(sql)
2018 self.commit()
2019 return grid_id
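
# Illustrative sketch (not executed): a daemon registering its grid and
# marking one of its server modules as running; all values are hypothetical:
#
#   grid_id = db.GetGridId('npx', 'soapqueue', on=True,
#                          institution='UW-Madison', batchsys='condor')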
2020
2022 """
2023 Check latest version of software and see if we need to upgrade
2024 """
2025 ver = iceprod.__version__
2026 ver = ver.replace('V','').split('-')
2027 if len(ver) < 3: return None
2028
2029 if not self.isconnected(): self.connect()
2030 cursor = self.getcursor()
2031 sql = " SELECT * FROM svn "
2032 sql += " WHERE major='%s' " % ver[0]
2033 sql += " AND minor='%s' " % ver[1]
2034 cursor.execute(sql)
2035 result = cursor.fetchone()
2036 try:
2037 if int(ver[2]) < int(result['patch']):
2038 return result['url']
2039 except: pass
2040 return None
2041
2042
2044 """
2045 Insert grid_statistics entries for grids which should run this
2046 dataset.
2047 @param grids: list of grids or clusters
2048 @param dataset_id: dataset id
2049 """
2050
2051 delim = ""
2052 sql = " SELECT grid_id FROM grid WHERE "
2053 for grid in grids:
2054 sql += "%s name = '%s' " % (delim,grid)
2055 delim = "OR"
2056 cursor = self.getcursor()
2057 cursor.execute(sql)
2058 result_set = cursor.fetchall()
2059
2060 if len(result_set) == 0:
2061 self.logger.error("could not match grid name '%s'" % ":".join(grids))
2062 return
2063
2064 delim = ""
2065 sql = " INSERT INTO grid_statistics "
2066 sql += " (grid_id,dataset_id) VALUES "
2067 for item in result_set:
2068 sql += " %s (%d,%d) " % ( delim, item['grid_id'], dataset_id)
2069 delim = ","
2070 cursor = self.getcursor()
2071 cursor.execute(sql)
2072 self.commit()
2073
2075 """
2076 Create job monitoring entries in database
2077 """
2078 cursor = self.getcursor()
2079
2080 sql = " INSERT INTO job "
2081 sql += " (queue_id,status,dataset_id,priority,status_changed) VALUES "
2082
2083 qstart = start_qid
2084 qend = start_qid + maxjobs
2085 for i in range(qstart,qend,min(stepsize,maxjobs)):
2086 comma = ""
2087 sql1 = sql
2088 for job in range(i,min(i+stepsize,qend)):
2089 sql1 += comma + " (%d,'WAITING',%d,%d,NOW()) " % ( job, dataset_id, priority )
2090 comma = ","
2091 cursor.execute(sql1)
2092 self.commit()
2093
2094
2095 - def validate(self,dataset_id,status='TRUE'):
2096 """
2097 Mark dataset as visible and valid.
2098 """
2099 cursor = self.getcursor()
2100 sql = " UPDATE dataset SET verified = '%s' " % status
2101 sql += " WHERE dataset_id = %d " % dataset_id
2102 self.logger.debug(sql)
2103 cursor.execute(sql)
2104 self.commit()
2105
2106
2108
2109 cursor = self.getcursor()
2110 dp = DIF_Plus()
2111
2112 sql = " SELECT * FROM dif "
2113 sql += " WHERE dataset_id = %d " % dataset_id
2114 cursor.execute(sql)
2115 row = cursor.fetchone();
2116 if not row: return dp
2117 dif = dp.GetDIF()
2118 dif.SetParameters(row['parameters'])
2119 dif.SetEntryTitle(row['entry_title'])
2120 dif.SetSummary(row['summary'])
2121 dif.SetSourceName(row['source_name'])
2122 dif.SetSensorName(row['sensorname'])
2123 td = time.strptime(str(row['dif_creation_date']),"%Y-%m-%d %H:%M:%S")
2124 td = time.strftime("%Y-%m-%d",td)
2125 dif.SetDIFCreationDate(td)
2126
2127 sql = " SELECT * FROM plus "
2128 sql += " WHERE dataset_id = %d " % dataset_id
2129 cursor.execute(sql)
2130 row = cursor.fetchone();
2131 plus = dp.GetPlus()
2132 ts = time.strptime(str(row['start_datetime']),"%Y-%m-%d %H:%M:%S")
2133 ts = time.strftime("%Y-%m-%dT%H:%M:%S",ts)
2134 plus.SetStartDatetime(ts)
2135
2136 te = time.strptime(str(row['end_datetime']),"%Y-%m-%d %H:%M:%S")
2137 te = time.strftime("%Y-%m-%dT%H:%M:%S",te)
2138 plus.SetEndDatetime(te)
2139 plus.SetCategory(row['category'])
2140 plus.SetSubCategory(row['subcategory'])
2141 plus.SetSimDBKey(dataset_id)
2142 plus.SetI3DBKey(row['i3db_key'])
2143 plus.SetSteeringFile(row['steering_file'])
2144 for project in self.fetch_project_list(dataset_id):
2145 plus.AddProject(project)
2146
2147 return dp
2148
2150
2151 cursor = self.getcursor()
2152
2153 dif = difplus.GetDIF()
2154 sql = " INSERT INTO dif "
2155 sql += " (dataset_id,parameters,entry_title,summary, "
2156 sql += " source_name,sensorname,dif_creation_date) "
2157 sql += " VALUES ( %d, " % dataset_id
2158 sql += "'%s'," % str(dif.GetParameters())
2159 sql += "'%s'," % str(dif.GetEntryTitle())
2160 sql += "'%s'," % str(dif.GetSummary())
2161 sql += "'%s'," % str(dif.GetSourceName())
2162 sql += "'%s'," % str(dif.GetSensorName())
2163 sql += "'%s')" % str(dif.GetDIFCreationDate())
2164 self.logger.debug(sql)
2165 cursor.execute(sql)
2166
2167 plus = difplus.GetPlus()
2168 sql = " INSERT INTO plus "
2169 sql += " (dataset_id,start_datetime,end_datetime,"
2170 sql += " category,subcategory,i3db_key,steering_file) "
2171 sql += " VALUES ( %d, " % dataset_id
2172 sql += "'%s'," % str(plus.GetStartDatetime())
2173 sql += "'%s'," % str(plus.GetEndDatetime())
2174 sql += "'%s'," % str(plus.GetCategory())
2175 sql += "'%s'," % str(plus.GetSubCategory())
2176 sql += "%s," % (plus.GetI3DBKey() or 'NULL')
2177 sql += "'%s')" % str(plus.GetSteeringFile())
2178 self.logger.debug(sql)
2179 cursor.execute(sql)
2180
2181 self.commit()
2182
2184 """
2185 Change Plus:subcategory in DIFPlus metadata
2186 """
2187 cursor = self.getcursor()
2188 sql = " UPDATE plus "
2189 sql += " SET subcategory='%s' " % sub_cat
2190 sql += " WHERE dataset_id=%d " % dataset_id
2191 self.logger.debug(sql)
2192 cursor.execute(sql)
2193 self.commit()
2194
2195
2197 """
2198 load a list of files for filtering.
2199 @param dataset_id: the dataset id that dictionary is bound to
2200 @param odict: the dictionary to load
2201 """
2202 cursor = self.getcursor()
2203 index = 0
2204 list = odict.keys()[
2205 min(index,len(odict.keys())):
2206 min(index+100,len(odict.keys()))]
2207 while list:
2208 sql = " INSERT INTO file (file_key,path,subdir,filename,dataset_id) "
2209 sql += " VALUES "
2210 cm = ''
2211 for key in list:
2212 file = odict[key]
2213 sql += " %s ( %d, '%s', '%s', '%s', %d ) " % \
2214 (cm,key,file[0],file[1],file[2],dataset_id)
2215 cm = ','
2216 index = index + 100
2217 list = odict.keys()[
2218 min(index,len(odict.keys())):
2219 min(index+100,len(odict.keys()))]
2220 self.logger.debug(sql)
2221 cursor.execute(sql)
2222 self.commit()
2223
2224
2225
2226 - class MonitorDB(IceProdDB):
2227
2228 logger = logging.getLogger('MonitorDB')
2229
2230 - def __init__(self):
2231 """
2232 Constructor
2233 """
2234 IceProdDB.__init__(self)
2235 self.maxsuspend = 20
2236
2238 """
2239 Create a copy of this instance
2240 """
2241 newconn = MonitorDB()
2242 newconn.host_ = self.host_
2243 newconn.usr_ = self.usr_
2244 newconn.passwd_ = self.passwd_
2245 newconn.db_ = self.db_
2246 newconn._connected = False
2247 return newconn
2248
2249
2250 - def reset_old_jobs(self,
2251 grid_id,
2252 maxidletime,
2253 maxruntime,
2254 maxsubmittime,
2255 maxcopytime,
2256 maxfailures=10,
2257 maxevicttime=10,
2258 keepalive=14400):
2259 """
2260 reset the status of jobs that were queued but whose status
2261 has not changed in more than maxtime minutes
2262
2263 @param grid_id: id of current cluster
2264 @param maxruntime: maximum run time for jobs
2265 @param maxsubmittime: maximum submit time for jobs
2266 @param maxcopytime: maximum time for jobs to be in 'copying' state
2267 @param maxfailures: maximum number of times a job is allowed to fail
2268 @param keepalive: how often the server expects to hear from jobs
2269 """
2270
2271 totalrows = 0
2272 cursor = self.getcursor()
2273 passkey = self.mkkey(6,9)
2274
2275
2276 self.logger.debug("resetting stale queued jobs...")
2277 sql = " UPDATE job SET "
2278 sql += " errormessage=CONCAT( "
2279 sql += " '%s: max. allowed time reached for state ', " % os.uname()[1].split('.')[0]
2280 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), "
2281 sql += " prev_state = status, "
2282 sql += " status='RESET', "
2283 sql += " passkey='%s', " % passkey
2284 sql += " status_changed=NOW() "
2285 sql += " WHERE grid_id=%d " % grid_id
2286 sql += " AND status='QUEUED' "
2287 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % maxidletime
2288 sql += " LIMIT 20 "
2289 self.logger.debug(sql)
2290 cursor.execute(sql)
2291 rowcount = self._conn.affected_rows()
2292 totalrows += rowcount
2293 self.commit()
2294 self.logger.debug('Reset %d queued jobs' % rowcount)
2295
2296
2297 self.logger.debug("resetting stale queuing jobs...")
2298 sql = " UPDATE job SET "
2299 sql += " errormessage=CONCAT( "
2300 sql += " '%s: max. allowed time reached for state ', " % os.uname()[1].split('.')[0]
2301 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), "
2302 sql += " prev_state = status, "
2303 sql += " status='RESET', "
2304 sql += " passkey='%s', " % passkey
2305 sql += " status_changed=NOW() "
2306 sql += " WHERE grid_id=%d " % grid_id
2307 sql += " AND status='QUEUEING' "
2308 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % maxsubmittime
2309 sql += " LIMIT 20 "
2310 cursor.execute(sql)
2311 rowcount = self._conn.affected_rows()
2312 totalrows += rowcount
2313 self.commit()
2314 self.logger.debug('Reset %d queueing jobs' % rowcount)
2315
2316
2317 self.logger.debug("resetting stale cleaning jobs...")
2318 sql = " UPDATE job SET "
2319 sql += " errormessage=CONCAT( "
2320 sql += " '%s: max. allowed time reached for state ', " % os.uname()[1].split('.')[0]
2321 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), "
2322 sql += " status=prev_state, "
2323 sql += " prev_state = 'CLEANING', "
2324 sql += " passkey='%s', " % passkey
2325 sql += " status_changed=NOW() "
2326 sql += " WHERE grid_id=%d " % grid_id
2327 sql += " AND status='CLEANING' "
2328 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % maxsubmittime
2329 sql += " LIMIT 20 "
2330 cursor.execute(sql)
2331 rowcount = self._conn.affected_rows()
2332 totalrows += rowcount
2333 self.commit()
2334 self.logger.debug('Reset %d cleaning jobs' % rowcount)
2335
2336
2337 self.logger.debug("resetting stale processing jobs...")
2338 sql = " UPDATE job SET "
2339 sql += " errormessage=CONCAT( "
2340 sql += " '%s: max. allowed time reached for state ', " % os.uname()[1].split('.')[0]
2341 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), "
2342 sql += " prev_state = status, "
2343 sql += " status='RESET', "
2344 sql += " passkey='%s', " % passkey
2345 sql += " status_changed=NOW() "
2346 sql += " WHERE grid_id=%d " % grid_id
2347 sql += " AND status='PROCESSING' "
2348 timeout = maxruntime
2349 if keepalive > 0:
2350 timeout = min(maxruntime,keepalive)
2351 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % timeout
2352 sql += " LIMIT 20 "
2353 cursor.execute(sql)
2354 rowcount = self._conn.affected_rows()
2355 totalrows += rowcount
2356 self.commit()
2357 self.logger.debug('Reset %d processing jobs' % rowcount)
2358
2359
2360 self.logger.debug("resetting evicted jobs...")
2361 sql = " UPDATE job SET "
2362 sql += " errormessage=CONCAT( "
2363 sql += " '%s: max. allowed time reached for state ', " % os.uname()[1].split('.')[0]
2364 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), "
2365 sql += " prev_state = status, "
2366 sql += " status='RESET', "
2367 sql += " passkey='%s', " % passkey
2368 sql += " status_changed=NOW() "
2369 sql += " WHERE grid_id=%d " % grid_id
2370 sql += " AND status='EVICTED' "
2371 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % maxevicttime
2372 sql += " LIMIT 20 "
2373 cursor.execute(sql)
2374 rowcount = self._conn.affected_rows()
2375 totalrows += rowcount
2376 self.commit()
2377 self.logger.debug('Reset %d evicted jobs' % rowcount)
2378
2379
2380 self.logger.debug("resetting stale copying jobs...")
2381 sql = " UPDATE job SET "
2382 sql += " errormessage=CONCAT( "
2383 sql += " '%s: max. allowed time reached for state ', " % os.uname()[1].split('.')[0]
2384 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), "
2385 sql += " prev_state = status, "
2386 sql += " status='RESET', "
2387 sql += " passkey='%s', " % passkey
2388 sql += " status_changed=NOW() "
2389 sql += " WHERE grid_id=%d " % grid_id
2390 sql += " AND status='COPYING' "
2391 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % maxcopytime
2392 sql += " LIMIT 100 "
2393 cursor.execute(sql)
2394 rowcount = self._conn.affected_rows()
2395 totalrows += rowcount
2396 self.commit()
2397 self.logger.debug('Reset %d copying jobs' % rowcount)
2398 self.logger.info('Reset %d jobs' % totalrows)
2399
2400
2401 self.logger.debug("suspending jobs with too many errors...")
2402 sql = " UPDATE job SET "
2403 sql += " errormessage=CONCAT('too many errors.',errormessage), "
2404 sql += " prev_state = status, "
2405 sql += " status='FAILED', "
2406 sql += " passkey='%s', " % passkey
2407 sql += " status_changed=NOW() "
2408 sql += " WHERE grid_id=%d " % grid_id
2409 sql += " AND status != 'SUSPENDED' "
2410 sql += " AND status != 'FAILED' "
2411 sql += " AND status != 'OK' "
2412 sql += " AND job.failures > %d " % maxfailures
2413 sql += " LIMIT 2000 "
2414 cursor.execute(sql)
2415 rowcount = self._conn.affected_rows()
2416 self.commit()
2417 if rowcount > 0:
2418 self.logger.info('Suspended %d jobs with too many errors' % rowcount)
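
# Illustrative sketch (not executed): a cleanup cycle might call this
# periodically; the limits are in minutes and the values are hypothetical:
#
#   db.reset_old_jobs(grid_id, maxidletime=360, maxruntime=2880,
#                     maxsubmittime=60, maxcopytime=120,
#                     maxfailures=10, maxevicttime=60, keepalive=240)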
2419
2421 """
2422 Aggregate new dataset_statistics_log entries into dataset_statistics_mv.
2423 """
2424 cursor = self.getcursor()
2425 sql = " SELECT SQL_NO_CACHE dataset_id, IFNULL(grid_id, 0) "
2426 sql += " AS grid_id, name, SUM(amount) AS amount "
2427 sql += " FROM dataset_statistics_log "
2428 sql += " GROUP BY dataset_id, grid_id, name "
2429 cursor.execute(sql)
2430 sets = cursor.fetchall();
2431
2432 metaprojects = {};
2433 simcats = {};
2434 for ref in sets:
2435 dataset_id = ref['dataset_id'];
2436 grid_id = ref['grid_id'];
2437 name = ref['name'];
2438 amount = ref['amount'];
2439
2440 if not metaprojects.has_key(dataset_id):
2441 metaprojects[dataset_id] = [];
2442 cursor.execute(
2443 " SELECT DISTINCT metaproject_id"
2444 + " FROM metaproject_pivot"
2445 + " WHERE dataset_id = %u " % dataset_id
2446 + " LIMIT 1");
2447 mp_sth = cursor.fetchall();
2448 for mp_ref in mp_sth:
2449 metaprojects[dataset_id].append(mp_ref['metaproject_id']);
2450
2451 if not simcats.has_key(dataset_id):
2452 simcats[dataset_id] = [];
2453 cursor.execute(
2454 " SELECT simcat_id"
2455 + " FROM dataset"
2456 + " WHERE dataset_id = %u " % dataset_id);
2457 sc_sth = cursor.fetchall();
2458 for sc_ref in sc_sth:
2459 simcats[dataset_id].append(sc_ref['simcat_id']);
2460
2461 if '0.00' != '%.2f' % amount:
2462 for mp_id in metaprojects[dataset_id]:
2463 for sc_id in simcats[dataset_id]:
2464 sql = " INSERT INTO dataset_statistics_mv"
2465 sql += " (dataset_id,metaproject_id,simcat_id,grid_id,name,value)"
2466 sql += " VALUES (%s,%s,%s,%s,%s,%s)"
2467 sql += " ON DUPLICATE KEY UPDATE value=value+%s";
2468 cursor.execute(sql,[dataset_id,mp_id,sc_id,grid_id,name,amount,amount]);
2469
2470 cursor.execute("DELETE FROM dataset_statistics_log");
2471 self.commit()
2472
2473
2474
2476 """
2477 Get the list of datasets currently processing (or the given dataset), with their DIF/Plus metadata.
2478 """
2479 cursor = self.getcursor()
2480 sql = " SELECT dataset.dataset_id,dataset.jobs_submitted, "
2481 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source, "
2482 sql += " dif.sensorname as sensor, "
2483 sql += " YEAR(plus.start_datetime) as year, "
2484 sql += " plus.category, plus.subcategory,dataset.hist "
2485 sql += " FROM dataset,plus,dif "
2486 if grid:
2487 sql += ",grid_statistics "
2488 sql += " WHERE dataset.dataset_id = plus.dataset_id "
2489 sql += " AND dataset.dataset_id=dif.dataset_id "
2490 if grid:
2491 sql += " AND grid_statistics.dataset_id = dataset.dataset_id "
2492 sql += " AND grid_statistics.grid_id = %d " % grid
2493 if dataset > 0:
2494 sql += " AND dataset.dataset_id = %d " % dataset
2495 else:
2496 sql += " AND dataset.status='PROCESSING' "
2497 self.logger.debug(sql)
2498 cursor.execute(sql)
2499 sets = cursor.fetchall();
2500 self.commit()
2501 for set in sets:
2502 set['sensor'] = set['sensor'].replace('ICECUBE','IceCube').replace('ICETOP','IceTop')
2503 return sets
2504
2506 """
2507 Get a list of finished datasets for which no histos have been created.
2508 """
2509 cursor = self.getcursor()
2510 sql = " SELECT dataset.dataset_id,dataset.jobs_submitted, "
2511 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source, "
2512 sql += " dif.sensorname as sensor, "
2513 sql += " YEAR(plus.start_datetime) as year, "
2514 sql += " plus.category, plus.subcategory "
2515 sql += " FROM dataset,plus,dif "
2516 sql += " WHERE dataset.dataset_id = plus.dataset_id "
2517 sql += " AND dataset.dataset_id=dif.dataset_id "
2518 sql += " AND dataset.status='COMPLETE' "
2519 if dataset:
2520 sql += " AND dataset.dataset_id = %d " % dataset
2521 else:
2522 sql += " AND dataset.hist=1 "
2523 self.logger.debug(sql)
2524 cursor.execute(sql)
2525 sets = cursor.fetchall();
2526 self.commit()
2527 for set in sets:
2528 set['sensor'] = set['sensor'].replace('ICECUBE','IceCube').replace('ICETOP','IceTop')
2529 return sets
2530
2532 """
2533 From the given dataset list, get the datasets that are no longer processing.
2534 """
2535 sets = []
2536 if datasetlist:
2537 cursor = self.getcursor()
2538 sql = " SELECT dataset.* FROM dataset "
2539 sql += " WHERE dataset.dataset_id "
2540 sql += " IN (%s) " % ",".join(map(str,datasetlist))
2541 sql += " AND dataset.status != 'PROCESSING' "
2542 self.logger.debug(sql)
2543 cursor.execute(sql)
2544 sets = cursor.fetchall();
2545 self.commit()
2546 return sets
2547
2549 """
2550 Get a completed dataset for which histograms have not yet been created.
2551 """
2552 cursor = self.getcursor()
2553 sql = " SELECT dataset.dataset_id,dataset.jobs_submitted, "
2554 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source, "
2555 sql += " dif.sensorname as sensor, "
2556 sql += " YEAR(plus.start_datetime) as year, "
2557 sql += " plus.category, plus.subcategory "
2558 sql += " FROM dataset,plus,dif "
2559 sql += " WHERE dataset.dataset_id = plus.dataset_id "
2560 sql += " AND dataset.dataset_id=dif.dataset_id "
2561 sql += " AND dataset.status='COMPLETE' AND dataset.hist=0 "
2562 sql += " AND dataset.dataset_id = %d " % dataset_id
2563 self.logger.debug(sql)
2564 cursor.execute(sql)
2565 set = cursor.fetchone();
2566 set['sensor'] = set['sensor'].replace('ICECUBE','IceCube').replace('ICETOP','IceTop')
2567 self.commit()
2568 return set
2569
2571 cursor = self.getcursor()
2572 sql = " UPDATE dataset "
2573 sql += " SET dataset.hist=0 "
2574 sql += " WHERE dataset.dataset_id = %d " % dataset
2575 sql += " AND dataset.hist=1 "
2576 self.logger.debug(sql)
2577 cursor.execute(sql)
2578 self.commit()
2579 return
2580
2582 """
2583 Update statistics for datasets and return all dataset which
2584 have completed
2585 """
2586
2587 finished_sets = []
2588 cursor = self.getcursor()
2589
2590 self.logger.debug("updating monitoring tables")
2591 sql = " SELECT job.dataset_id, "
2592 sql += " job.grid_id, "
2593 sql += " SUM(job.time_real) AS real_time, "
2594 sql += " SUM(job.time_sys) AS sys_time, "
2595 sql += " SUM(job.time_user) AS user_time, "
2596 sql += " SUM(job.mem_heap) AS heap_mem, "
2597 sql += " SUM(job.mem_heap_peak) AS heap_mem_peak, "
2598 sql += " SUM(job.mem_stack_peak) AS stack_mem_peak, "
2599 sql += " SUM(job.nevents) AS events, "
2600 sql += " SUM(job.gevents) AS gevents, "
2601 sql += " SUM(job.evictions) AS sevictions "
2602 sql += " FROM job,dataset "
2603 sql += " WHERE job.status = 'OK' "
2604 sql += " AND dataset.dataset_id = job.dataset_id "
2605 if dataset_id:
2606 sql += " AND dataset.dataset_id = %d " % dataset_id
2607 sql += " GROUP BY job.grid_id "
2608 else:
2609 sql += " AND dataset.status = 'PROCESSING' "
2610 sql += " AND dataset.jobs_completed >= dataset.jobs_submitted "
2611 sql += " GROUP BY job.dataset_id, job.grid_id "
2612 cursor.execute(sql)
2613 result_set = cursor.fetchall();
2614 self.commit()
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
2628
2629
2630
2631 sql = " SELECT job.dataset_id, "
2632 sql += " MAX(job.status_changed) AS enddate, "
2633 sql += " SUM(1) AS jobs_submitted, "
2634 sql += " SUM(job.status='OK' ) AS jobs_completed, "
2635 sql += " SUM(job.status='FAILED') AS jobs_failed, "
2636 sql += " SUM(job.status='SUSPENDED') AS jobs_suspended, "
2637 sql += " SUM(job.time_real) AS real_time, "
2638 sql += " SUM(job.time_sys) AS sys_time, "
2639 sql += " SUM(job.time_user) AS user_time, "
2640 sql += " SUM(job.mem_heap) AS heap_mem, "
2641 sql += " SUM(job.mem_heap_peak) AS heap_mem_peak, "
2642 sql += " SUM(job.mem_stack_peak) AS stack_mem_peak, "
2643 sql += " SUM(job.nevents) AS events, "
2644 sql += " SUM(job.gevents) AS gevents, "
2645 sql += " SUM(job.evictions) AS sevictions, "
2646 sql += " grid_statistics.debug AS debug "
2647 sql += " FROM job, dataset, grid_statistics "
2648 sql += " WHERE dataset.dataset_id = job.dataset_id "
2649 sql += " AND dataset.dataset_id = grid_statistics.dataset_id "
2650 sql += " AND grid_statistics.grid_id = %d " % grid_id
2651 if dataset_id:
2652 sql += " AND dataset.dataset_id = %d " % dataset_id
2653 else:
2654 sql += " AND dataset.status = 'PROCESSING' "
2655 sql += " GROUP by job.dataset_id "
2656 cursor.execute(sql)
2657 result_set = cursor.fetchall();
2658 self.commit()
2659
2660 for entry in result_set:
2661 try:
2662 entry['jobs_completed'] = self.intcast(entry['jobs_completed'])
2663 entry['jobs_failed'] = self.intcast(entry['jobs_failed'])
2664 except Exception,e:
2665 self.logger.error("Could not cast int(%s)" % entry['jobs_completed'] )
2666 entry['jobs_completed'] = 0
2667 continue;
2668
2669 if self.intcast(entry['jobs_completed']) == self.intcast(entry['jobs_submitted']):
2670
2671 finished_sets.append(entry)
2672 sql = " UPDATE dataset "
2673 sql += " SET dataset.jobs_completed = %d, " % entry['jobs_completed']
2674 sql += " dataset.jobs_failed = %d, " % entry['jobs_failed']
2675 sql += " dataset.status = 'READYTOPUBLISH', "
2676 sql += " dataset.enddate = '%s', " % entry['enddate']
2677 sql += " time_real = %g, " % entry['real_time']
2678 sql += " time_sys = %g, " % entry['sys_time']
2679 sql += " time_user = %g, " % entry['user_time']
2680 sql += " mem_heap = %g, " % entry['heap_mem']
2681 sql += " mem_heap_peak = %g, " % entry['heap_mem_peak']
2682 sql += " mem_stack_peak = %g, " % entry['stack_mem_peak']
2683 sql += " events = %d " % self.intcast(entry['events'])
2684 sql += " WHERE dataset.dataset_id = %d " % entry['dataset_id']
2685 sql += " AND dataset.status = 'PROCESSING' "
2686 cursor.execute(sql)
2687 elif (self.intcast(entry['jobs_completed']) + \
2688 self.intcast(entry['jobs_failed'])) == \
2689 self.intcast(entry['jobs_submitted']):
2690 finished_sets.append(entry)
2691 sql = " UPDATE dataset "
2692 sql += " SET dataset.jobs_completed = %d, " % entry['jobs_completed']
2693 sql += " dataset.jobs_failed = %d, " % entry['jobs_failed']
2694 sql += " dataset.status = 'ERRORS', "
2695 sql += " dataset.enddate = '%s', " % entry['enddate']
2696 sql += " time_real = %g, " % entry['real_time']
2697 sql += " time_sys = %g, " % entry['sys_time']
2698 sql += " time_user = %g, " % entry['user_time']
2699 sql += " mem_heap = %g, " % entry['heap_mem']
2700 sql += " mem_heap_peak = %g, " % entry['heap_mem_peak']
2701 sql += " mem_stack_peak = %g, " % entry['stack_mem_peak']
2702 sql += " events = %d " % self.intcast(entry['events'])
2703 sql += " WHERE dataset.dataset_id = %d " % entry['dataset_id']
2704 sql += " AND dataset.status = 'PROCESSING' "
2705 cursor.execute(sql)
2706
2707 if self.intcast(entry['jobs_suspended']) + self.intcast(entry['jobs_failed']) > self.maxsuspend:
2708 sql = " UPDATE grid_statistics "
2709 sql += " SET suspend = 1 "
2710 sql += " WHERE suspend = 0 "
2711 sql += " AND dataset_id = %d " % entry['dataset_id']
2712 sql += " AND grid_id = %d " % grid_id
2713 sql += " AND debug = 1"
2714 cursor.execute(sql)
2715
2716 self.commit()
2717 return finished_sets
2718
2719
2720 - def GetGridId(self,grid_name,institution=None,batchsys=None,url=None):
2721 """
2722 Retrieve the key for grid_name
2723 """
2724 ver = iceprod.__version__
2725 if not self.isconnected(): self.connect()
2726 cursor = self.getcursor()
2727 sql = " SELECT grid_id FROM grid WHERE name='%s' " % grid_name
2728 cursor.execute(sql)
2729 result = cursor.fetchone()
2730 if result:
2731 grid_id = result['grid_id']
2732 else:
2733 sql = " INSERT IGNORE INTO grid (name,institution,batchsys,version) "
2734 sql += " VALUES ('%s','%s','%s','%s') " % (grid_name,institution,batchsys,ver)
2735 cursor.execute(sql)
2736 grid_id = self.insert_id()
2737
2738 if institution and batchsys:
2739 sql = " UPDATE grid SET "
2740 sql += " institution = '%s', " % institution
2741 sql += " batchsys = '%s', " % batchsys
2742 sql += " version = '%s' " % ver
2743 sql += " WHERE grid_id=%d " % grid_id
2744 cursor.execute(sql)
2745 self.commit()
2746
2747 return grid_id
2748
2749 - def RegisterServer(self,
2750 grid_id,
2751 server_name,
2752 server_status,
2753 server_pid):
2754 """
2755 Register a server daemon's status and process id for this grid.
2756 """
2757 if not self.isconnected(): self.connect()
2758 cursor = self.getcursor()
2759
2760 sql = " UPDATE grid SET "
2761 sql += " %s='%s', " % (server_name,server_status)
2762 sql += " %s_pid=%d " % (server_name,server_pid)
2763 sql += " WHERE grid_id=%d " % grid_id
2764 cursor.execute(sql)
2765 self.commit()
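
# Illustrative sketch (not executed): a daemon advertising its state and
# process id after startup; the server name is hypothetical:
#
#   import os
#   db.RegisterServer(grid_id, 'soapmon', 'RUNNING', os.getpid())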
2766
2768 """
2769 Get status changes for daemons
2770 """
2771 if not self.isconnected(): self.connect()
2772 cursor = self.getcursor()
2773
2774 sql = " SELECT * FROM grid "
2775 sql += " WHERE grid_id=%d " % grid_id
2776 cursor.execute(sql)
2777 result = cursor.fetchone()
2778
2779 sql = " UPDATE grid SET lastupdate=NOW() "
2780 sql += " WHERE grid_id=%d " % grid_id
2781 cursor.execute(sql)
2782
2783 self.commit()
2784 return result
2785
2787 """
2788 Change status of daemons
2789 """
2790 cursor = self.getcursor()
2791
2792 sql = " UPDATE grid SET "
2793 sql += " %s = 'STOPREQUEST' " % daemon
2794 sql += " WHERE %s ='RUNNING' " % daemon
2795 if type(grid) is types.IntType:
2796 sql += " AND grid_id=%u " % grid
2797 elif grid not in ('any','*','all'):
2798 sql += " AND name='%s' " % grid
2799 cursor.execute(sql)
2800 self.commit()
2801
2803 """
2804 Change status of daemons
2805 """
2806 cursor = self.getcursor()
2807
2808 sql = " UPDATE grid SET "
2809 sql += " %s = 'STARTREQUEST' " % daemon
2810 sql += " WHERE %s = 'STOPPED' " % daemon
2811 if type(grid) is types.IntType:
2812 sql += " AND grid_id=%u " % grid
2813 else:
2814 sql += " AND name='%s' " % grid
2815 cursor.execute(sql)
2816 self.commit()
2817
2819 """
2820 Get parameters for given dataset
2821 """
2822 cursor = self.getcursor()
2823 sql = " SELECT plus.category, plus.subcategory, "
2824 sql += " YEAR(plus.start_datetime) as year,"
2825 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source "
2826 sql += " FROM plus,dif "
2827 sql += " WHERE plus.dataset_id = %d " % dataset
2828 sql += " AND plus.dataset_id = dif.dataset_id "
2829 self.logger.debug(sql)
2830 cursor.execute(sql)
2831 return cursor.fetchall()
2832
2834 """
2835 Fetch list of jobs that have completed for given grid_id
2836 """
2837
2838 cursor = self.getcursor()
2839 passkey = self.mkkey(6,9)
2840 job_list = []
2841
2842 sql = " SELECT SUM(1) copying FROM `job` "
2843 sql += " WHERE "
2844 sql += " (status='COPYING' OR status='COPIED') "
2845 sql += " AND grid_id = %d " % grid_id
2846 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay
2847 self.logger.debug(sql)
2848 cursor.execute(sql)
2849 currently_copying = self.intcast(cursor.fetchone()['copying'])
2850 if not currently_copying: currently_copying = 0
2851
2852 sql = " UPDATE job SET "
2853 sql += " passkey='%s', " % passkey
2854 sql += " status_changed=NOW() "
2855 sql += " WHERE status='COPIED' "
2856 sql += " AND grid_id = %d " % grid_id
2857 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay
2858 sql += " LIMIT %d " % max_copy
2859 self.logger.debug(sql)
2860 cursor.execute(sql)
2861 self.commit()
2862
2863 sql = " UPDATE job SET "
2864 sql += " prev_state = status, "
2865 sql += " status = 'COPYING', "
2866 sql += " passkey='%s', " % passkey
2867 sql += " status_changed=NOW() "
2868 sql += " WHERE status='READYTOCOPY' "
2869 sql += " AND grid_id = %d " % grid_id
2870 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay
2871 sql += " LIMIT %d " % max(1,max_copy - currently_copying)
2872 self.logger.debug(sql)
2873 cursor.execute(sql)
2874 self.commit()
2875
2876 sql = " SELECT job.*, plus.category, plus.subcategory, "
2877 sql += " YEAR(plus.start_datetime) as year,"
2878 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source "
2879 sql += " FROM job,plus,dif "
2880 sql += " WHERE grid_id = %d " % grid_id
2881 sql += " AND job.dataset_id = plus.dataset_id "
2882 sql += " AND plus.dataset_id = dif.dataset_id "
2883 sql += " AND (job.status='COPYING' OR job.status='COPIED') "
2884 sql += " AND job.passkey='%s' " % passkey
2885 self.logger.debug(sql)
2886 cursor.execute(sql)
2887 result_set = cursor.fetchall()
2888 self.commit()
2889
2890 return result_set
2891
2893 """
2894 Fetch the list of jobs that should be reset or cleaned up for given grid_id
2895 """
2896
2897 cursor = self.getcursor()
2898 passkey = self.mkkey(6,9)
2899
2900
2901 sql = " UPDATE job,dataset SET "
2902 sql += " job.prev_state = job.status, "
2903 sql += " job.status='SUSPENDED', "
2904 sql += " job.status_changed=NOW() "
2905 sql += " WHERE job.status='ERROR' "
2906 sql += " AND job.grid_id = %d " % grid_id
2907 sql += " AND job.dataset_id = dataset.dataset_id "
2908 sql += " AND dataset.debug = 1 "
2909 cursor.execute(sql)
2910 self.commit()
2911
2912
2913 sql = " UPDATE job SET "
2914 sql += " prev_state = status, "
2915 sql += " status='WAITING', "
2916 sql += " status_changed=NOW() "
2917 sql += " WHERE status IN ('ERROR','RESET') "
2918 sql += " AND (grid_id =0 OR grid_id IS NULL) "
2919 cursor.execute(sql)
2920 self.commit()
2921
2922
2923 sql = " UPDATE job SET "
2924 sql += " prev_state = status, "
2925 sql += " passkey='%s' " % passkey
2926 sql += " WHERE status='RESET' "
2927 sql += " AND grid_id = %d " % grid_id
2928 sql += " LIMIT %d " % max_reset
2929 cursor.execute(sql)
2930 self.commit()
2931
2932
2933 sql = " SELECT * FROM job "
2934 sql += " WHERE grid_id = %d " % grid_id
2935 sql += " AND passkey='%s' " % passkey
2936 sql += " AND status='RESET' "
2937 sql += " LIMIT %d " % max_reset
2938 cursor.execute(sql)
2939 result_set = cursor.fetchall()
2940 self.commit()
2941
2942
2943 sql = " UPDATE job SET "
2944 sql += " prev_state = status, "
2945 sql += " status = 'CLEANING', "
2946 sql += " passkey='%s', " % passkey
2947 sql += " status_changed=NOW() "
2948 sql += " WHERE status='RESET' "
2949 sql += " AND grid_id = %d " % grid_id
2950 sql += " AND NOW() > TIMESTAMPADD(MINUTE,5,status_changed) "
2951 sql += " ORDER BY priority DESC, job_id "
2952 sql += " LIMIT %d " % max_reset
2953 cursor.execute(sql)
2954 self.commit()
2955
2956
2957 sql = " UPDATE job SET "
2958 sql += " job.prev_state = job.status, "
2959 sql += " job.status='RESET', "
2960 sql += " job.status_changed=NOW() "
2961 sql += " WHERE job.status='ERROR' "
2962 sql += " AND job.grid_id = %d " % grid_id
2963 sql += " ORDER BY job.priority DESC, job.job_id "
2964 sql += " LIMIT %d " % max_reset
2965 cursor.execute(sql)
2966 self.commit()
2967
2968 return result_set
2969
2971 """
2972 Get list of jobs currently in any active state
2973 """
2974 from iceprod.server.job import i3Job
2975 cursor = self.getcursor()
2976 job_list = []
2977 sql = " SELECT * FROM job "
2978 sql += " WHERE job.grid_id = %d " % grid_id
2979 sql += " AND job.status NOT IN "
2980 sql += "('WAITING','OK','SUSPENDED','FAILED')"
2981 cursor.execute(sql)
2982 for j in cursor.fetchall():
2983 job = i3Job()
2984 job.SetDatasetId(j['dataset_id'])
2985 job.SetProcNum(j['queue_id'])
2986 job.SetPrio(j['priority'])
2987 job.SetJobId(j['grid_queue_id'])
2988 job.AddArgOption("key",j['passkey'])
2989 job_list.append(job)
2990 self.commit()
2991 return job_list
2992
2994 """
2995 Get list of jobs currently in queued status
2996 """
2997 from iceprod.server.job import i3Job
2998 cursor = self.getcursor()
2999 job_list = []
3000 sql = " SELECT * FROM job "
3001 sql += " WHERE job.grid_id = %d " % grid_id
3002 sql += " AND job.status = 'QUEUED' "
3003 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay
3004 cursor.execute(sql)
3005 for j in cursor.fetchall():
3006 job = i3Job()
3007 job.SetDatabaseId(j['job_id'])
3008 job.SetDatasetId(j['dataset_id'])
3009 job.SetProcNum(j['queue_id'])
3010 job.SetPrio(j['priority'])
3011 job.SetJobId(j['grid_queue_id'])
3012 job.AddArgOption("key",j['passkey'])
3013 job_list.append(job)
3014 self.commit()
3015 return job_list
3016
3018 """
3019 Get list of jobs currently in processing status
3020 """
3021 from iceprod.server.job import i3Job
3022 cursor = self.getcursor()
3023 job_list = []
3024 sql = " SELECT * FROM job "
3025 sql += " WHERE job.grid_id = %d " % grid_id
3026 sql += " AND job.status = 'PROCESSING' "
3027 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay
3028 cursor.execute(sql)
3029 for j in cursor.fetchall():
3030 job = i3Job()
3031 job.SetDatasetId(j['dataset_id'])
3032 job.SetProcNum(j['queue_id'])
3033 job.SetPrio(j['priority'])
3034 job.SetJobId(j['grid_queue_id'])
3035 job.AddArgOption("key",j['passkey'])
3036 job_list.append(job)
3037 self.commit()
3038 return job_list
3039
3040
3041 - def QueueJobs(self,maxjobs,grid_id,jobs_at_once=20,fifo=True,debug=0):
3042 """
3043 Reserve at most 'maxjobs' from a given dataset.
3044 Get proc ids and set their status to 'QUEUEING'
3045 """
3046 from iceprod.server.job import i3Job
3047 cursor = self.getcursor()
3048 job_list = {}
3049
3050
3051 sql = " SELECT SUM(1) AS total "
3052 sql += " FROM job "
3053 sql += " WHERE job.grid_id = %d " % grid_id
3054 sql += " AND NOT "
3055 sql += " (status='WAITING' OR status='OK' OR status='SUSPENDED' "
3056 sql += " OR status='RESET' OR status='FAILED' ) "
3057 cursor.execute(sql)
3058 result = cursor.fetchone()
3059 self.commit()
3060 if result['total'] is None:
3061 self.logger.debug('queue total returned None')
3062 total = 0
3063 else:
3064 total = int(result['total'])
3065
3066 maxjobs = maxjobs - min(maxjobs,total)
3067 self.logger.info('%d jobs are currently in the queue' % total)
3068
3069
3070 sql = " SELECT job.*, dataset.temporary_storage, "
3071 sql += " plus.category, plus.subcategory, "
3072 sql += " YEAR(plus.start_datetime) as year, "
3073 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source "
3074 sql += " FROM job,grid_statistics,dif,plus, dataset "
3075 sql += " WHERE job.dataset_id = grid_statistics.dataset_id "
3076 sql += " AND job.dataset_id = plus.dataset_id "
3077 sql += " AND plus.dataset_id = dif.dataset_id "
3078 sql += " AND job.dataset_id = dataset.dataset_id "
3079 sql += " AND grid_statistics.grid_id = %d " % grid_id
3080 sql += " AND grid_statistics.suspend != 1 "
3081
3082 if debug:
3083 sql += " AND grid_statistics.debug = 1 "
3084 sql += " AND job.queue_id = 0 "
3085 else:
3086 sql += " AND grid_statistics.debug != 1 "
3087
3088 sql += " AND job.status='WAITING' "
3089 sql += " ORDER BY priority DESC,"
3090 if fifo:
3091 sql += " dataset_id,"
3092 sql += " queue_id "
3093 sql += " LIMIT %u " % min(jobs_at_once,maxjobs)
3094
3095 self.logger.debug(sql)
3096 cursor.execute(sql)
3097 result_set = cursor.fetchall()
3098 self.commit()
3099
3100 if not len(result_set) > 0:
3101 self.logger.info("no jobs to queue at this time")
3102 return {}
3103 else:
3104 self.logger.info("reserved %d jobs" % len(result_set))
3105
3106 for item in result_set:
3107 proc = item['queue_id']
3108 target_url = item['temporary_storage']
3109 item['subdirectory'] = "%05d-%05d" % ((proc/1000)*1000,((proc+1000)/1000)*1000-1)
3110
3111 if len(job_list) >= maxjobs:
3112 return job_list
3113
3114
3115 passkey = self.mkkey(6,9)
3116 queue_id = item['queue_id']
3117 dataset_id = item['dataset_id']
3118 priority = item['priority']
3119 sql = " UPDATE job SET "
3120 sql += " job.prev_state = job.status, "
3121 sql += " job.status='QUEUEING', "
3122 sql += " job.host=NULL, "
3123 sql += " job.grid_id=%d, " % grid_id
3124 sql += " status_changed=NOW(),"
3125 sql += " passkey='%s' " % passkey
3126 sql += " WHERE job.status='WAITING' "
3127 sql += " AND job.queue_id=%d " % queue_id
3128 sql += " AND job.dataset_id=%d " % dataset_id
3129 try:
3130 cursor.execute(sql)
3131 rowcount = self._conn.affected_rows()
3132 self.commit()
3133 except Exception, e:
3134 self.logger.debug(e)
3135 continue
3136 if rowcount == 1:
3137 self.logger.debug("%d job has been marked as 'QUEUEING' " % rowcount)
3138 job = i3Job()
3139 job.SetDatabaseId(item['job_id'])
3140 job.SetDatasetId(dataset_id)
3141 job.SetProcNum(queue_id)
3142 job.SetPrio(priority)
3143 job.AddArgOption("key",passkey)
3144 if not job_list.has_key(dataset_id):
3145 job_list[dataset_id] = []
3146 job_list[dataset_id].append(job)
3147 elif rowcount == 0:
3148 self.logger.warn("someone beat me to job %d in dataset %d" % \
3149 (queue_id,dataset_id))
3150 else:
3151 raise Exception, "getjob:wrong number of rows affected"
3152
3153 return job_list
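
# Illustrative sketch (not executed): reserving jobs for submission. The
# return value maps dataset_id to a list of i3Job objects; 'submit' is a
# hypothetical helper:
#
#   job_list = db.QueueJobs(maxjobs=200, grid_id=grid_id, jobs_at_once=20)
#   for dataset_id, jobs in job_list.items():
#       for job in jobs:
#           submit(job)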
3154
3155
3157 """
3158 Insert grid_statistics entries for grids which should run this
3159 dataset.
3160 @param grids: list of grids or clusters
3161 @param dataset_id: dataset id
3162 """
3163
3164 delim = ""
3165 sql = " SELECT grid_id FROM grid WHERE "
3166 for grid in grids:
3167 sql += "%s name = '%s' " % (delim,grid)
3168 delim = "OR"
3169 cursor = self.getcursor()
3170 cursor.execute(sql)
3171 result_set = cursor.fetchall()
3172 self.commit()
3173
3174 if len(result_set) == 0:
3175 self.logger.error("could not match grid name '%s'" % ":".join(grids))
3176 return
3177
3178 delim = ""
3179 sql = " INSERT INTO grid_statistics "
3180 sql += " (grid_id,dataset_id) VALUES "
3181 for item in result_set:
3182 sql += " %s (%d,%d) " % ( delim, item['grid_id'], dataset_id)
3183 delim = ","
3184 cursor = self.getcursor()
3185 cursor.execute(sql)
3186 self.commit()
3187
3189 """
3190 Create job monitoring entries in database
3191 """
3192 cursor = self.getcursor()
3193
3194 sql = " INSERT INTO job "
3195 sql += " (queue_id,status,dataset_id,priority,status_changed) VALUES "
3196
3197 for i in range(0,maxjobs,min(stepsize,maxjobs)):
3198 comma = ""
3199 sql1 = sql
3200 jlist = range(i,i+min(stepsize,maxjobs-i))
3201 for job in jlist:
3202 sql1 += comma + " (%d,'WAITING',%d,%d,NOW()) " % ( job, dataset_id, priority )
3203 comma = ","
3204 if jlist:
3205 cursor.execute(sql1)
3206 self.commit()
3207
3208 - def jobstart(self,hostname,grid_id,dataset_id=0,queue_id=0,key=None):
3209 """
3210 Change the status of a job to indicate it is currently running
3211 @param hostname: host where job was queued from
3212 @param grid_id: ID of iceprod queue
3213 @param dataset_id: Optional dataset ID
3214 @param queue_id: Optional job ID (within dataset)
3215 @param key: temporary passkey to avoid job spoofs
3216 @return: dataset_id,nproc,procnum
3217 """
3218
3219 cursor = self.getcursor()
3220
3221 sql = " SELECT jobs_submitted "
3222 sql += " FROM dataset "
3223 sql += " WHERE dataset_id = %d " % dataset_id
3224 cursor.execute(sql)
3225 item = cursor.fetchone()
3226 self.commit()
3227 jobs_submitted = item['jobs_submitted']
3228
3229 sql = " UPDATE job SET "
3230 sql += " job.prev_state = job.status, "
3231 sql += " job.status='PROCESSING', "
3232 sql += " job.grid_id=%d, " % grid_id
3233 sql += " job.host='%s', " % hostname
3234 sql += " job.tray=0, "
3235 sql += " job.iter=0, "
3236 sql += " status_changed=NOW(), "
3237 sql += " keepalive=NOW() "
3238 sql += " WHERE "
3239 sql += " (job.status='QUEUED' OR job.status='PROCESSING' OR job.status='EVICTED')"
3240 sql += " AND job.queue_id=%d " % queue_id
3241 sql += " AND job.dataset_id=%d " % dataset_id
3242 sql += " AND job.passkey='%s' " % key
3243 rowcount = self.execute(cursor,sql)
3244 if rowcount > 0:
3245 return dataset_id,jobs_submitted,queue_id
3246 return -1,0,0
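
# Illustrative sketch (not executed): a job announcing that it has started
# running; a dataset_id of -1 in the return value means the job was not
# matched (bad key or unexpected state):
#
#   import socket
#   dataset_id, nproc, procnum = db.jobstart(socket.gethostname(), grid_id,
#                                            dataset_id, queue_id, key=passkey)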
3247
3248 - def jobreset(self,dataset_id,job_id,reason=None,passkey=None):
3249 """
3250 Update status for job
3251 @param dataset_id: dataset index
3252 @param job_id: process number within dataset
3253 """
3254 cursor = self.getcursor()
3255 sql = " SELECT grid_queue_id FROM job "
3256 sql += " WHERE dataset_id=%d " % dataset_id
3257 sql += " AND queue_id=%d " % job_id
3258 if passkey:
3259 sql += " AND passkey='%s' " % passkey
3260 cursor.execute(sql)
3261 qid = cursor.fetchone()
3262
3263 sql = " UPDATE job SET "
3264 sql += " tray=0, "
3265 sql += " iter=0, "
3266 sql += " prev_state = status, "
3267 if reason:
3268 sql += " errormessage = '%s', " % reason
3269 sql += " status='RESET', "
3270 sql += " status_changed=NOW() "
3271 sql += " WHERE dataset_id=%d " % dataset_id
3272 sql += " AND queue_id=%d " % job_id
3273 if passkey:
3274 sql += " AND passkey='%s' " % passkey
3275 self.execute(cursor,sql)
3276
3277 return qid['grid_queue_id']
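
# Illustrative sketch (not executed): resetting a job and recovering its
# batch-system id so it can also be removed from the queue; 'condor_rm'
# is a hypothetical cleanup helper:
#
#   gqid = db.jobreset(dataset_id, job_id, reason='node blacklisted',
#                      passkey=passkey)
#   condor_rm(gqid)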
3278
3279 - def jobcopying(self,dataset_id,job_id,passkey=None):
3280 """
3281 Update status for job
3282 @param dataset_id: dataset index
3283 @param job_id: process number within dataset
3284 """
3285 cursor = self.getcursor()
3286 sql = " UPDATE job SET "
3287 sql += " prev_state = status, "
3288 sql += " status='COPYING', "
3289 sql += " status_changed=NOW() "
3290 sql += " WHERE dataset_id=%d " % dataset_id
3291 sql += " AND queue_id=%d " % job_id
3292 if passkey:
3293 sql += " AND passkey='%s' " % passkey
3294 self.execute(cursor,sql)
3295 return 1
3296
3297 - def jobfinalize(self,dataset_id,job_id,passkey,status='OK',clear_errors=True):
3298 """
3299 Update status for job
3300 @param dataset_id: dataset index
3301 @param job_id: process number within dataset
3302 """
3303 cursor = self.getcursor()
3304
3305 sql = " SELECT * FROM job "
3306 sql += " WHERE dataset_id=%d " % dataset_id
3307 sql += " AND queue_id=%d " % job_id
3308 cursor.execute(sql)
3309 job = cursor.fetchone()
3310
3311 sql = " UPDATE job SET "
3312 sql += " prev_state = status, "
3313 sql += " status='%s', " % status
3314 if clear_errors:
3315 sql += " errormessage = NULL, "
3316 sql += " status_changed=NOW() "
3317 sql += " WHERE dataset_id=%d " % dataset_id
3318 sql += " AND queue_id=%d " % job_id
3319 sql += " AND passkey='%s' " % passkey
3320 self.execute(cursor,sql)
3321
3322 if status == 'OK':
3323
3324 sql = " UPDATE grid_statistics SET "
3325 sql += " grid_statistics.ok = grid_statistics.ok+1 "
3326 sql += " WHERE grid_statistics.dataset_id= %u " % job['dataset_id']
3327 sql += " AND grid_statistics.grid_id= %u " % job['grid_id']
3328 self.logger.debug(sql)
3329 try:
3330 self.execute(cursor,sql)
3331 except Exception,e:
3332 self.logger.error(e)
3333 self.logger.debug("wrote stats for job %d,%d " % (dataset_id,job_id))
3334
3335 return 0,job['submitdir']
3336
3338 """
3339 Get collected dataset statistics
3340 @param dataset_id: dataset index
3341 """
3342 stats = {}
3343 cursor = self.getcursor()
3344 sql = " SELECT name,value FROM dataset_statistics_mv "
3345 sql += " WHERE dataset_id='%s' " % dataset_id
3346 cursor.execute(sql)
3347 for entry in cursor.fetchall():
3348 stats[entry['name']] = entry['value']
3349 self.commit()
3350 return stats
3351
3352 - def jobfinish(self,dataset_id,job_id,stats,key=None,mode=0):
3353 """
3354 Update monitoring for job and write statistics
3355 @param dataset_id: dataset index
3356 @param job_id: process number within dataset
3357 @param stats: dictionary of stat entries
3358 """
3359 passkey = self.mkkey(6,9)
3360 cursor = self.getcursor()
3361 sql = " UPDATE job SET "
3362 sql += " prev_state = status, "
3363 if mode == 1:
3364 sql += " status = 'COPIED', "
3365 else:
3366 sql += " status = 'READYTOCOPY', "
3367 if stats.has_key('mem_heap'):
3368 sql += " mem_heap = %s, " % stats['mem_heap']
3369 if stats.has_key('mem_heap_peak'):
3370 sql += " mem_heap_peak = %s, " % stats['mem_heap_peak']
3371 if stats.has_key('user_time'):
3372 sql += " time_user = %s, " % stats['user_time']
3373 if stats.has_key('sys_time'):
3374 sql += " time_sys = %s, " % stats['sys_time']
3375 if stats.has_key('real_time'):
3376 sql += " time_real = %s, " % stats['real_time']
3377 if stats.has_key('Triggered Events'):
3378 sql += " nevents = %s, " % stats['Triggered Events']
3379 if stats.has_key('Generated Events'):
3380 sql += " gevents = %s, " % stats['Generated Events']
3381 sql += " job.passkey='%s', " % passkey
3382 sql += " status_changed=NOW() "
3383 sql += " WHERE dataset_id=%d " % dataset_id
3384 sql += " AND queue_id=%d " % job_id
3385 sql += " AND job.passkey='%s' " % key
3386 self.logger.debug(sql)
3387 rowcount = self.execute(cursor,sql)
3388
3389 sql = " SELECT grid_id,host FROM job "
3390 sql += " WHERE queue_id=%d " % job_id
3391 sql += " AND dataset_id=%d " % dataset_id
3392 self.logger.debug(sql)
3393 cursor.execute(sql)
3394 gridinfo = cursor.fetchone()
3395 self.commit()
3396 if not gridinfo:
3397 return (rowcount+1)%2
3398
3399
3400 sql = " INSERT IGNORE INTO node_statistics "
3401 if stats.has_key('host_id'):
3402 sql += " (name,grid_id,host_id) VALUES ('%s',%d,'%s') " % (gridinfo['host'],gridinfo['grid_id'],stats['host_id'])
3403 else:
3404 sql += " (name,grid_id) VALUES ('%s',%d) " % (gridinfo['host'],gridinfo['grid_id'])
3405 self.logger.debug(sql)
3406 cursor.execute(sql)
3407 self.commit()
3408
3409 sql = " UPDATE node_statistics SET "
3410 sql += " completed = completed+1, "
3411 if stats.has_key('platform'):
3412 sql += " node_statistics.platform='%s', " % stats['platform']
3413 if stats.has_key('host_id'):
3414 sql += " node_statistics.host_id='%s', " % stats['host_id']
3415 sql += " mem_heap = mem_heap+%s, " % stats['mem_heap']
3416 sql += " mem_heap_peak = mem_heap_peak+%s, " % stats['mem_heap_peak']
3417 sql += " time_user = time_user+%s, " % stats['user_time']
3418 sql += " time_sys = time_sys+%s, " % stats['sys_time']
3419 sql += " time_real = time_real+%s " % stats['real_time']
3420 sql += " WHERE name='%s' " % gridinfo['host']
3421 sql += " AND grid_id=%d " % gridinfo['grid_id']
3422 self.logger.debug(sql)
3423 cursor.execute(sql)
3424 self.commit()
3425
3426
3427 if len(stats):
3428 cm = ""
3429 sql = " REPLACE INTO job_statistics "
3430 sql += " (queue_id,dataset_id,name,value) VALUES "
3431 for key,value in stats.items():
3432 try:
3433 value = float(value)
3434 sql += "%s (%d,%d,'%s',%f) " % (cm,job_id,dataset_id,key,value)
3435 cm = ","
3436 except: continue
3437 cursor.execute(sql)
3438
3439 sql = " UPDATE grid_statistics SET "
3440 sql += " time_real = time_real + %g, " % stats['real_time']
3441 sql += " time_sys = time_sys + %g, " % stats['sys_time']
3442 sql += " time_user = time_user + %g " % stats['user_time']
3443 sql += " WHERE grid_statistics.dataset_id= %u " % dataset_id
3444 sql += " AND grid_statistics.grid_id= %u " % gridinfo['grid_id']
3445 cursor.execute(sql)
3446
3447 self.commit()
3448
3449 return (rowcount+1)%2
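
# Illustrative sketch (not executed): reporting statistics at job
# completion. The keys shown are the ones this method recognizes; the
# values are made up. The call returns 0 if exactly one job row matched:
#
#   stats = {'real_time': 3600.0, 'user_time': 3500.0, 'sys_time': 42.0,
#            'mem_heap': 512.0, 'mem_heap_peak': 768.0,
#            'Triggered Events': 1000, 'Generated Events': 5000}
#   err = db.jobfinish(dataset_id, job_id, stats, key=passkey)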
3450
3451
3452
3453 - def jobsubmitted(self,dataset_id,job_id,submitdir,grid_queue_id=None):
3454 """
3455 Set the submission path of job so that it can be post processed
3456 on termination.
3457 """
3458 cursor = self.getcursor()
3459 sql = " UPDATE job SET "
3460 sql += " prev_state = status, "
3461 sql += " status='QUEUED', "
3462 if not grid_queue_id == -1:
3463 sql += " grid_queue_id='%s', " % grid_queue_id
3464 sql += " submitdir=\"%s\" " % submitdir
3465 sql += " WHERE queue_id=%d " % job_id
3466 sql += " AND dataset_id=%d " % dataset_id
3467 self.logger.debug(sql)
3468 self.execute(cursor,sql)
3469 return 1
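
# Illustrative sketch (not executed): recording where a job was submitted
# from so that post-processing can find its sandbox; values are hypothetical:
#
#   db.jobsubmitted(dataset_id, job_id, '/home/iceprod/submit/1234/00005',
#                   grid_queue_id='12345.0')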
3470
3471 - def jobping(self,dataset_id,job_id,host,key=None,tray=0,iter=0):
3472 """
3473 Update status_changed time for job
3474 """
3475 cursor = self.getcursor()
3476 sql = " SELECT status from job "
3477 sql += " WHERE queue_id=%d " % job_id
3478 sql += " AND dataset_id=%d " % dataset_id
3479 sql += " AND passkey='%s' " % key
3480 self.logger.debug(sql)
3481 cursor.execute(sql)
3482 row = cursor.fetchone();
3483 if not (row and row['status'] == 'PROCESSING'):
3484 return False
3485
3486 sql = " UPDATE job SET "
3487 sql += " tray=%d, " % tray
3488 sql += " iter=%d, " % iter
3489 sql += " keepalive=NOW() "
3490 sql += " WHERE queue_id=%d " % job_id
3491 sql += " AND dataset_id=%d " % dataset_id
3492 sql += " AND status='PROCESSING' "
3493 sql += " AND passkey='%s' " % key
3494 self.logger.debug(sql)
3495 self.execute(cursor,sql)
3496 return True
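
# Illustrative sketch (not executed): a running job sending keepalives so
# that reset_old_jobs does not reclaim it; the loop condition and interval
# are hypothetical:
#
#   while still_running:
#       db.jobping(dataset_id, queue_id, hostname, key=passkey, tray=0, iter=0)
#       time.sleep(1800)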
3497
3498 - def jobabort(self,job_id,dataset_id,error,errormessage='',key=None,stats={}):
3499 """
3500 Record an eviction or failure for a job and update the
3501 corresponding node and grid statistics.
3502 @param error: 1 for an eviction, 2 for a failure
3503 @todo: update node statistics
3504 """
3505 cursor = self.getcursor()
3506
3507 sql = " SELECT debug FROM dataset WHERE dataset_id = %d " % dataset_id
3508 self.logger.debug(sql)
3509 cursor.execute(sql)
3510 debug = cursor.fetchone()['debug']
3511
3512 sql = " SELECT * FROM job "
3513 sql += " WHERE dataset_id = %u AND queue_id = %u " % (dataset_id,job_id)
3514 cursor.execute(sql)
3515 job = cursor.fetchone()
3516
3517 sql = " UPDATE job SET "
3518 sql += " prev_state = status, "
3519 sql += " errormessage=CONCAT(NOW(),QUOTE('%s')), " % self.defang(errormessage)
3520 sql += " status_changed=NOW(), "
3521
3522 if error == 1:
3523 sql += " evictions=evictions+1, "
3524 sql += " status='EVICTED', "
3525 if error == 2:
3526 sql += " failures=failures+1, "
3527 if debug:
3528 sql += " status='SUSPENDED', "
3529 else:
3530 sql += " status='ERROR', "
3531 sql += " nevents=0 "
3532 sql += " WHERE queue_id=%d " % job['queue_id']
3533 sql += " AND dataset_id=%d " % job['dataset_id']
3534 if key:
3535 sql += " AND passkey='%s' " % key
3536 self.logger.debug(sql)
3537 self.execute(cursor,sql)
3538 self.commit()
3539
3540 sql = " UPDATE node_statistics,job SET "
3541 if stats.has_key('platform'):
3542 sql += " node_statistics.platform='%s', " % stats['platform']
3543 if error == 1:
3544 sql += " node_statistics.evictions=node_statistics.evictions+1 "
3545 else:
3546 sql += " node_statistics.failures = node_statistics.failures+1 "
3547
3548 sql += " WHERE node_statistics.name=job.host "
3549 if stats.has_key('host_id'):
3550 sql += " AND node_statistics.host_id='%s' " % stats['host_id']
3551 sql += " AND node_statistics.grid_id=job.grid_id "
3552 sql += " AND job.queue_id=%d " % job_id
3553 sql += " AND job.dataset_id=%d " % dataset_id
3554 self.logger.debug(sql)
3555 try:
3556 self.execute(cursor,sql)
3557 except Exception,e:
3558 self.logger.error(e)
3559
3560
3561 sql = " UPDATE grid_statistics SET "
3562 if stats:
3563 try:
3564 sql += " time_real = time_real + %g, " % stats['real_time']
3565 sql += " time_sys = time_sys + %g, " % stats['sys_time']
3566 sql += " time_user = time_user + %g, " % stats['user_time']
3567 except: pass
3568
3569 if error == 1:
3570 sql += " grid_statistics.evictions = grid_statistics.evictions+1 "
3571 else:
3572 sql += " grid_statistics.failures = grid_statistics.failures+1 "
3573 sql += " WHERE grid_statistics.dataset_id= %u " % job['dataset_id']
3574 sql += " AND grid_statistics.grid_id= %u " % job['grid_id']
3575 self.logger.debug(sql)
3576 try:
3577 self.execute(cursor,sql)
3578 except Exception,e:
3579 self.logger.error(e)
3580
3581 self.commit()
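
# Illustrative sketch (not executed): error=1 records an eviction and
# error=2 a failure (the job is suspended instead of retried when the
# dataset is in debug mode); values are hypothetical:
#
#   db.jobabort(job_id, dataset_id, error=2,
#               errormessage='segfault in icetray', key=passkey, stats=stats)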
3582
3583
3584 - def jobclean(self,dataset_id,archive=True):
3585 """
3586 Remove jobs from queue
3587 """
3588 cursor = self.getcursor()
3589
3590 sql = " SELECT name, SUM(value) as total "
3591 sql += " FROM job_statistics "
3592 sql += " WHERE dataset_id=%d " % dataset_id
3593 sql += " GROUP BY name "
3594 cursor.execute(sql)
3595 results = cursor.fetchall()
3596 self.commit()
3597 if results:
3598
3599
3600 cm = ""
3601 sql = " REPLACE INTO dataset_statistics "
3602 sql += " (name,value,dataset_id) VALUES "
3603 for entry in results:
3604 sql += "%s ('%s',%f,%d) " % (cm,entry['name'],entry['total'],dataset_id)
3605 cm = ","
3606 cursor.execute(sql)
3607 self.commit()
3608 else:
3609 self.logger.warn("jobclean: no job statistics found for dataset %d" % dataset_id)
3610
3611 if archive:
3612
3613 sql = " INSERT INTO archived_job "
3614 sql += " SELECT job.* FROM job WHERE dataset_id = %u " % dataset_id
3615 self.logger.debug(sql)
3616 cursor.execute(sql)
3617
3618
3619 sql = " INSERT INTO archived_job_statistics "
3620 sql += " SELECT job_statistics.* FROM job_statistics WHERE dataset_id = %u " % dataset_id
3621 self.logger.debug(sql)
3622 cursor.execute(sql)
3623
3624
3625 sql = " DELETE FROM job "
3626 sql += " WHERE dataset_id=%d " % dataset_id
3627 self.logger.debug(sql)
3628 cursor.execute(sql)
3629
3630
3631 sql = " DELETE FROM job_statistics "
3632 sql += " WHERE dataset_id=%d " % dataset_id
3633 try:
3634 cursor.execute(sql)
3635 except Exception,e:
3636 self.logger.error('%s: could not delete job_statistics entry.' % str(e))
3637 self.commit()
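
# Illustrative sketch (not executed): once a dataset is done, its per-job
# statistics are rolled up into dataset_statistics and the job rows are
# archived (or simply deleted with archive=False):
#
#   db.jobclean(dataset_id)
#   db.jobclean(other_dataset_id, archive=False)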
3638
3639
3640 - def jobsuspend(self,job_id,dataset_id,suspend=True):
3641 """
3642 Suspend jobs in the given dataset (or reset them when
3643 suspend=False) so they are either skipped or get
3644 reprocessed.
3645 """
3646 if suspend: status = 'SUSPENDED'
3647 else: status = 'RESET'
3648
3649 passkey = self.mkkey(6,9)
3650 cursor = self.getcursor()
3651
3652 sql = " UPDATE job SET "
3653 sql += " prev_state = status, "
3654 sql += " status='%s', " % status
3655 sql += " passkey='%s', " % passkey
3656 sql += " status_changed=NOW() "
3657 sql += " WHERE dataset_id=%d " % dataset_id
3658 sql += " AND status != 'OK' "
3659 if not job_id < 0:
3660 sql += " AND queue_id=%d " % job_id
3661 self.execute(cursor,sql)
3662
3664 """
3665 Update status of dataset
3666 """
3667 cursor = self.getcursor()
3668 sql = " UPDATE dataset "
3669 sql += " SET debug = !debug "
3670 sql += " WHERE dataset_id=%d " % dataset
3671 cursor.execute(sql)
3672 self.commit()
3673
3675 """
3676 Update status of dataset
3677 """
3678 cursor = self.getcursor()
3679 sql = " UPDATE dataset "
3680 sql += " SET status='%s' " % status
3681 sql += " WHERE dataset_id=%d " % dataset
3682 cursor.execute(sql)
3683 self.commit()
3684
3685 - def validate(self,dataset_id,status='TRUE'):
3686 """
3687 Mark dataset as visible and valid.
3688 """
3689 cursor = self.getcursor()
3690 sql = " UPDATE dataset SET verified = '%s' " % status
3691 sql += " WHERE dataset_id = %d " % dataset_id
3692 self.logger.debug(sql)
3693 cursor.execute(sql)
3694 self.commit()
3695
3697 """
3698 Change the status of a job to indicate it is currently running
3699 @param dataset_id: Dataset ID
3700 @param queue_id: Queue ID (within dataset)
3701 @param key: temporary passkey to avoid job spoofs
3702 @return: dataset_id,nproc,procnum
3703 """
3704
3705 cursor = self.getcursor()
3706
3707 sql = " SELECT jobs_submitted "
3708 sql += " FROM dataset "
3709 sql += " WHERE dataset_id = %s "
3710 cursor.execute(sql,(dataset_id,))
3711 item = cursor.fetchone()
3712
3713 jobs_submitted = item['jobs_submitted']
3714
3715 self.logger.debug("Job %d.%d starting with key %s" % (dataset_id,queue_id,key))
3716
3717
3718 sql = " UPDATE job SET "
3719 sql += " job.prev_state = job.status, "
3720 sql += " job.status='PROCESSING', "
3721
3722 sql += " status_changed=NOW(), "
3723 sql += " keepalive=NOW() "
3724 sql += " WHERE "
3725 sql += " (job.status='QUEUED' OR job.status='PROCESSING' OR job.status='EVICTED')"
3726 sql += " AND job.dataset_id=%s "
3727 sql += " AND job.queue_id=%s "
3728 sql += " AND job.passkey=%s "
3729 cursor.execute(sql,(dataset_id,queue_id,key))
3730
3731
3732 sql = " SELECT EXISTS("
3733 sql += " SELECT *"
3734 sql += " FROM job j"
3735 sql += " WHERE status = 'PROCESSING'"
3736 sql += " AND dataset_id = %s"
3737 sql += " AND queue_id = %s"
3738 sql += " AND passkey = %s"
3739 sql += ") AS found"
3740 cursor.execute(sql,(dataset_id,queue_id,key))
3741 row = cursor.fetchone()
3742 if row and int(row['found']) == 1:
3743 return (dataset_id,jobs_submitted,queue_id)
3744 return (TASK_DATASET_ERROR_ID,0,0)
3745
3747 it = self.get_iterations(dataset_id)
3748 tray,iter = 0,0
3749 if it: tray,iter = it[-1]['tray_index'],it[-1]['iterations']+1
3750 cursor = self.getcursor()
3751 sql = " UPDATE job SET "
3752 sql += " prev_state = status, "
3753 sql += " status = 'OK', "
3754 sql += " tray = %u, "
3755 sql += " iter = %u, "
3756 sql += " errormessage = NULL, "
3757 sql += " status_changed=NOW() "
3758 sql += " WHERE dataset_id=%s "
3759 sql += " AND queue_id=%s "
3760 sql += " AND passkey=%s "
3761 cursor.execute(sql,(tray,iter,dataset_id,queue_id,key))
3762 return self._conn.affected_rows()
3763
3764 - def get_iterations(self,dataset_id):
3765 cursor = self.getcursor()
3766 sql = " SELECT tray.tray_index, tray.iterations "
3767 sql += " FROM tray WHERE dataset_id = %u "
3768 cursor.execute(sql,(dataset_id,))
3769 return cursor.fetchall()
3770
3771 - def task_start(self,dataset_id,queue_id,taskname,tray,iter,hostname,key=''):
3772 cursor = self.getcursor()
3773
3774 self.logger.info("task %s (tray: %d iter: %d) starting for job %d.%d" \
3775 % (taskname,tray,iter,dataset_id,queue_id))
3776
3777
3778 sql = " SELECT job_id"
3779 sql += " FROM job j"
3780 sql += " WHERE dataset_id = %s"
3781 sql += " AND queue_id = %s"
3782 sql += " AND passkey = %s"
3783 cursor.execute(sql,(dataset_id,queue_id,key))
3784 row = cursor.fetchone()
3785 if not row:
3786 msg = "task %s tried to start for job %d.%d with wrong passkey" \
3787 % (taskname,dataset_id,queue_id)
3788 self.logger.warn(msg)
3789 return TASK_ERROR_ID
3790 job_id = row['job_id']
3791
3792
3793 sql = " SELECT task_def_id"
3794 sql += " FROM task_def td"
3795 sql += " WHERE dataset_id = %s"
3796 sql += " AND name = %s"
3797 cursor.execute(sql,(dataset_id,taskname))
3798 row = cursor.fetchone()
3799 if not row:
3800 msg = "task %s not found for job %d.%d" % (taskname,dataset_id,queue_id)
3801 self.logger.error(msg)
3802 return TASK_ERROR_ID
3803 task_def_id = row['task_def_id']
3804
3805
3806 sql = " SELECT task_def_tray_id"
3807 sql += " FROM task_def_tray tdt"
3808 sql += " WHERE task_def_id = %s"
3809 sql += " AND idx = %s"
3810 sql += " AND iter = %s"
3811 cursor.execute(sql,(task_def_id,tray,iter))
3812 row = cursor.fetchone()
3813 if not row:
3814 msg = "tray %d, iter %d not found for task %s in job %d.%d" \
3815 % (tray,iter,taskname,dataset_id,queue_id)
3816 self.logger.error(msg)
3817 return TASK_ERROR_ID
3818 tdt_id = row['task_def_tray_id']
3819
3820 sql = " SELECT task_id"
3821 sql += " FROM task t, task_def_tray tdt"
3822 sql += " WHERE t.task_def_tray_id = tdt.task_def_tray_id"
3823 sql += " AND tdt.task_def_tray_id = %s"
3824 sql += " AND t.job_id = %s"
3825 cursor.execute(sql,(tdt_id,job_id))
3826 row = cursor.fetchone()
3827 if row:
3828
3829 task_id = row['task_id']
3830 sql = " UPDATE task"
3831 sql += " SET last_status = status,"
3832 sql += " status = 'STARTING',"
3833 sql += " status_changed = NOW(),"
3834 sql += " host = %s,"
3835 sql += " start = NOW(),"
3836 sql += " finish = NULL"
3837 sql += " WHERE task_id = %s"
3838 cursor.execute(sql,(hostname,task_id))
3839
3840
3841 sql = " DELETE FROM task_statistics WHERE task_id = %s"
3842 cursor.execute(sql,(task_id,))
3843 else:
3844
3845 sql = "INSERT INTO task (task_def_tray_id, job_id, "
3846 sql += " host, status, status_changed, start)"
3847 sql += " VALUES (%s, %s, %s, 'STARTING', NOW(), NOW())"
3848 cursor.execute(sql,(tdt_id,job_id,hostname))
3849 task_id = self._conn.insert_id()
3850 return task_id
3851
3852 - def task_update_status(self,task_id,status,key,cursor=None): # signature reconstructed from call sites
3853 if not cursor:
3854 cursor = self.getcursor()
3855 sql = " UPDATE task t,job j"
3856 sql += " SET t.last_status = t.status,"
3857 sql += " t.status = %s,"
3858 sql += " t.status_changed = NOW()"
3859 sql += " WHERE t.job_id = j.job_id"
3860 sql += " AND t.task_id = %s"
3861 sql += " AND j.passkey = %s"
3862 self.logger.debug("task status update starting for task %d" % task_id)
3863 cursor.execute(sql,(status,task_id,key))
3864 self.logger.debug("task status update done for task %d" % task_id)
3865 return self._conn.affected_rows()
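# Hedged usage sketch (annotation): a status change only takes effect when the
# caller presents the job's current passkey, which guards against spoofed updates.
#   rows = db.task_update_status(task_id, 'OK', passkey)
#   if not rows:
#       logger.warn('no row updated: unknown task or passkey mismatch')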
3866
3868 cursor = self.getcursor()
3869 sql = " SELECT COUNT(*) AS expected"
3870 sql += " FROM task_def_tray tdt"
3871 sql += " WHERE tdt.task_def_id = %s"
3872 params = [td_id]
3873 add_sql = ""
3874 if tray is not None:
3875 add_sql += " AND tdt.idx = %s"
3876 params.append(tray)
3877 if iter is not None:
3878 add_sql += " AND tdt.iter = %s"
3879 params.append(iter)
3880 if add_sql:
3881 sql += add_sql
3882 cursor.execute(sql,params)
3883 row = cursor.fetchone()
3884 if not row:
3885 return False
3886 expected = int(row['expected'])
3887
3888 params.insert(1, job_id)
3889 sql = " SELECT COUNT(*) AS actual"
3890 sql += " FROM task t, task_def_tray tdt"
3891 sql += " WHERE tdt.task_def_tray_id = t.task_def_tray_id"
3892 sql += " AND tdt.task_def_id = %s"
3893 sql += " AND t.job_id = %s"
3894 sql += " AND t.status = 'OK'"
3895 if add_sql:
3896 sql += add_sql
3897
3898 cursor.execute(sql,params)
3899 row = cursor.fetchone()
3900 if not row:
3901 return False
3902 actual = int(row['actual'])
3903
3904 return expected > 0 and actual == expected
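# Worked example of the completeness check above (annotation): a task_def that
# spans 2 trays x 3 iterations yields expected == 6, so the check only passes
# once 6 matching task rows have status 'OK' (actual == 6). Passing tray/iter
# narrows both the expected and actual counts with the same add_sql filter.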
3905
3907 cursor = self.getcursor()
3908 ret = self.task_update_status(task_id,'ERROR',key,cursor)
3909 if not ret:
3910 self.logger.error("unable to update status for task %d" % task_id)
3911 return ret
3912 sql = " DELETE FROM task_statistics WHERE task_id = %s"
3913 self.logger.debug("aborting task %d" % task_id)
3914 cursor.execute(sql,(task_id,))
3915 return True
3916
3918 cursor = self.getcursor()
3919 ret = self.task_update_status(task_id,'OK',key,cursor)
3920 if not ret:
3921 self.logger.error("unable to update status for task %d" % task_id)
3922 return ret
3923
3924 sql = " UPDATE task SET finish = NOW()"
3925 sql += " WHERE task_id=%s"
3926 ret = cursor.execute(sql,(task_id,))
3927 if not ret:
3928 self.logger.error("unable to set finish time for task %d" % task_id)
3929 return ret
3930
3931 if stats:
3932 inserts = []
3933 self.logger.debug("stats: %s" % stats)
3934 for name,value in stats.iteritems():
3935 inserts.append((task_id,name,value))
3936 sql = " REPLACE INTO task_statistics (task_id,name,value) VALUES (%s,%s,%s)"
3937 cursor.executemany(sql, inserts)
3938
3939 return True
3940
3941
3943 """
3944 Fetch jobs stuck in the COPYING state, optionally only those whose
3945 status has not changed for at least `delay` minutes. This would
3946 typically be run at startup in case the daemon crashed previously.
3947 """
3948 cursor = self.getcursor()
3949 sql = " SELECT * FROM job "
3950 sql += " WHERE grid_id=%d " % grid_id
3951 sql += " AND status='COPYING' "
3952 if delay:
3953 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay
3954 cursor.execute(sql)
3955 results = cursor.fetchall()
3956 self.commit()
3957 return results
3958
3959 - def SetFileURL(self,queue_id,dataset_id,location,filename,md5sum,filesize,transfertime,key):
3960 """
3961 Add or change the global location of a file
3962 """
3963 cursor = self.getcursor()
3964
3965 sql = " SELECT job_id FROM job "
3966 sql += " WHERE dataset_id=%u " % dataset_id
3967 sql += " AND queue_id = %u " % queue_id
3968 sql += " AND passkey='%s' " % key
3969 self.logger.debug(sql)
3970 cursor.execute(sql)
3971 results = cursor.fetchone()
3972 if not results: return 0
3973
3974 sql = " INSERT IGNORE INTO urlpath "
3975 sql += " (dataset_id,queue_id,name,path,md5sum,size,transfertime) VALUES "
3976 sql += " (%u,%u,'%s','%s','%s',%f,%f)" % \
3977 (dataset_id,queue_id,filename,location,md5sum,filesize,transfertime)
3978 self.logger.debug(sql)
3979 cursor.execute(sql)
3980 if not cursor.rowcount:
3981 sql = " UPDATE urlpath SET "
3982 sql += " name ='%s', " % filename
3983 sql += " path ='%s', " % location
3984 sql += " md5sum ='%s', " % md5sum
3985 sql += " size =%f, " % filesize
3986 sql += " transfertime =%f " % transfertime
3987 sql += " WHERE dataset_id =%u " % dataset_id
3988 sql += " AND queue_id =%u " % queue_id
3989 sql += " AND name ='%s' " % filename
3990 self.logger.debug(sql)
3991 cursor.execute(sql)
3992 return 1
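# Annotation: the INSERT IGNORE followed by a conditional UPDATE above emulates
# an upsert keyed on (dataset_id,queue_id,name). Hedged usage sketch with
# illustrative values:
#   db.SetFileURL(17, 1234, 'gsiftp://host/path', 'out.i3.gz',
#                 md5sum, filesize, transfertime, passkey)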
3993
3994
3996 """
3997 @param days: number of days to get summary from
3998 @param groupby: how to group statistics
3999 """
4000 cursor = self.getcursor()
4001 sql = " SELECT "
4002 sql += " SUM(job.status = 'OK') as ok , "
4003 sql += " SUM(job.status = 'ERROR') as error , "
4004 sql += " SUM(job.status = 'SUSPENDED') as suspended, "
4005 if groupby:
4006 sql += " job.%s, " % groupby
4007 sql += " SUM(job.time_sys) as sys_t, "
4008 sql += " SUM(job.time_user) as usr_t, "
4009 sql += " SUM(job.time_real) as real_t "
4010 sql += " FROM job "
4011 sql += " WHERE DATE_SUB(CURDATE(),INTERVAL %d DAY) " % days
4012 sql += " < job.status_changed "
4013 sql += " AND job.grid_id != 0 "
4014 sql += " AND job.grid_id IS NOT NULL "
4015 if groupby:
4016 sql += " GROUP BY %s " % groupby
4017 cursor.execute(sql)
4018 return cursor.fetchall();
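# Hedged usage sketch (annotation): per-grid job counts and CPU totals over the
# last 7 days. Note that groupby is interpolated directly into the SQL, so it
# must be a trusted column name such as 'grid_id', never user input.
#   for row in db.getsummary(7, 'grid_id'):
#       print row['grid_id'], row['ok'], row['error'], row['real_t']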
4019
4021 """
4022 @param days: number of days to get summary from
4023 @param groupby: how to group statistics
4024 """
4025 cursor = self.getcursor()
4026 sql = " SELECT job_statistics.name AS name, "
4027 sql += " SUM(job_statistics.value) AS value, "
4028 if groupby:
4029 sql += " job.%s, " % groupby
4030 sql += " SUM(1) AS completed "
4031 sql += " FROM job,job_statistics "
4032 sql += " WHERE DATE_SUB(CURDATE(),INTERVAL %d DAY) " % days
4033 sql += " < job.status_changed "
4034 sql += " AND job.status = 'OK' "
4035 sql += " AND job.dataset_id = job_statistics.dataset_id "
4036 sql += " AND job.queue_id = job_statistics.queue_id "
4037 sql += " AND job.grid_id IS NOT NULL "
4038 sql += " AND job.grid_id != 0 "
4039 if groupby:
4040 sql += " GROUP BY job_statistics.name,%s " % groupby
4041 else:
4042 sql += " GROUP BY job_statistics.name "
4043 cursor.execute(sql)
4044 return cursor.fetchall();
4045
4046
4047 - def getgrid_ids(self):
4048 grid_ids = {}
4049 cursor = self.getcursor()
4050 sql = " SELECT name,grid_id FROM grid "
4051 cursor.execute(sql)
4052 for item in cursor.fetchall():
4053 grid_ids[item['grid_id']] = item['name']
4054 return grid_ids
4055
4057 """
4058 @param dataset: dataset id
4059 @param job: optional job id
4060 @return: formatted string with dataset/job summary
4061 """
4062 cursor = self.getcursor()
4063 if job >= 0:
4064 sql = " SELECT "
4065 sql += " dataset_id,job_id,status,grid_id,errormessage, "
4066 sql += " ( "
4067 sql += " (job.iter + SUM(IF(tray.tray_index < job.tray, tray.iterations, 0))) "
4068 sql += " /SUM(tray.iterations) "
4069 sql += " ) * 100 AS completion_percent "
4070 sql += " FROM job "
4071 sql += " WHERE dataset_id = %u " % dataset
4072 sql += " AND queue_id = %u " % job
4073 else:
4074 sql = " SELECT "
4075 sql += " SUM(job.status = 'OK') as ok , "
4076 sql += " SUM(job.status = 'ERROR') as error , "
4077 sql += " SUM(job.status = 'SUSPENDED') as suspended, "
4078 sql += " SUM(job.status = 'PROCESSING') as processing, "
4079 sql += " SUM(job.status = 'COPIED') as copied, "
4080 sql += " SUM(job.status = 'READYTOCOPY') as readytocopy, "
4081 sql += " SUM(job.status = 'FAILED') as failed, "
4082 sql += " SUM(job.status = 'QUEUED') as queued, "
4083 sql += " SUM(job.status = 'QUEUING') as queueing "
4084 sql += " FROM job "
4085 sql += " WHERE dataset_id = %u " % dataset
4086 cursor.execute(sql)
4087 return cursor.fetchall()
4088
4089
4091 """
4092 @param days: number of days to gather information from starting from today
4093 @return: formatted string with production summary
4094 """
4095 div = '+'+('-'*9+'+')
4096 s ='iceprod summary for %s in the last %d days' % (self.db_,days)
4097 s += '-'*10
4098 s += '\n\n'
4099 grid_id = self.getgrid_ids()
4100 row = "| %-7s | " % "grid"
4101 gridsum = self.getsummary(days,'grid_id')
4102 for key in gridsum[0].keys():
4103 row += "%-7s | " % key[0:min(7,len(key))]
4104 div += ('-'*9+'+')
4105 s += row + '\n'
4106 s += div + '\n'
4107 for entry in gridsum:
4108 gridname = grid_id[entry['grid_id']]
4109 row = "| %-7s | " % gridname[0:min(7,len(gridname))]
4110 for key in gridsum[0].keys():
4111 row += "%-7.2g | " % entry[key]
4112 s += row + '\n'
4113 s += div + '\n'
4114 totals = self.getsummary(days)
4115 for entry in totals:
4116 row = "| %-7s | " % "Total"
4117 for key in gridsum[0].keys():
4118 if entry.has_key(key):
4119 row += "%-7.2g | " % entry[key]
4120 else:
4121 row += "%-7.2g | " % 0.
4122 s += row + '\n'
4123 s += div + '\n'
4124
4125 column_size = 10
4126 strfmt = "%%-%ds|" % column_size
4127 gfmt = "%%-%d.2g|" % column_size
4128 dictfmt = "%%%%(%%s)-%ds|" % column_size
4129 s += '\n\n'
4130 gridsum = self.getsummary_stats(days,'grid_id')
4131 newgridsum = {}
4132 grids = ["Total"]
4133 keys = {}
4134 totals = {}
4135 for entry in gridsum:
4136 grid = entry["grid_id"]
4137 name = entry["name"]
4138 gridname = grid_id[grid]
4139 gridname = gridname.strip()[0:min(column_size,len(gridname))]
4140 key = re.sub(r'[^a-z0-9_]','',entry["name"].lower())
4141 if not keys.has_key(key):
4142 keys[key] = entry["name"][0:min(column_size,len(entry["name"]))]
4143 if ("time" in key) or ("event" in key):
4144 if not newgridsum.has_key(key):
4145 newgridsum[key] = {}
4146 if gridname not in grids:
4147 grids.append(gridname)
4148 try:
4149 newgridsum[key][gridname] = "%2.2g" % float(entry["value"])
4150 if not newgridsum[key].has_key("Total"):
4151 newgridsum[key]["Total"] = 0.0
4152 newgridsum[key]["Total"] += float(entry["value"])
4153 except Exception,e:
4154 print e,key
4155 newgridsum[key][gridname] = str(entry["value"])
4156 rowhdr = "|" + "".join(map(lambda x: strfmt % x, [" "] + grids)) + "\n"
4157 div = "+" + ('-'*column_size+'+')*(len(grids)+1) + "\n"
4158 s += div + rowhdr
4159 for key in newgridsum.keys():
4160 entry = newgridsum[key]
4161 rowfmt = "|" + strfmt % keys[key] + "".join(map(lambda x: dictfmt % x, grids)) + "\n"
4162 for grid in grids:
4163 if not entry.has_key(grid): entry[grid] = "N/A"
4164 entry["Total"] = "%2.2g" % float(entry["Total"])
4165 row = rowfmt % entry
4166 s += div + row
4167 totals = self.getsummary_stats(days)
4168 s += div
4169 return s
4170
4171
4172 - def add_history(self,user,command):
4173 """
4174 Add a history item
4175 """
4176 cursor = self.getcursor()
4177 sql = " INSERT INTO history (user,cmd,time)"
4178 sql += " VALUES ('%s','%s',NOW())" % (user,command)
4179 cursor.execute(sql)
4180 self.commit()
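# Hedged usage sketch (annotation): values are interpolated straight into the
# SQL, so quotes in `command` should be escaped first (e.g. with the
# quote-escaping helper defined near the top of this module).
#   db.add_history('juancarlos', 'iceprodsh submit 1234')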
4181
4182
4183
4184 class MySQLParamDb(IceProdDB):
4185
4186 """
4187 Class paramdb uploads parsed IceTrayConfig+Metaproject structure
4188 to the parameter database
4189
4190 """
4191 logger = logging.getLogger('MySQLParamDb')
4192
4193 - def __init__(self):
4194
4195 IceProdDB.__init__(self)
4196 self.metaprojects = {}
4197 self.projects = {}
4198 self.modules = {}
4199 self.parameters = {}
4200 self.auth_function = lambda x: \
4201 self.authenticate(
4202 self.get('host',x.host_),
4203 self.get('username',x.usr_),
4204 getpass.getpass(),
4205 self.get('database',x.db_),True)
4206 self.auth_function = lambda x: x
4207 return
4208
4210 """
4211 Create a copy of this instance
4212 """
4213 newconn = MySQLParamDb()
4214 newconn.host_ = self.host_
4215 newconn.usr_ = self.usr_
4216 newconn.passwd_ = self.passwd_
4217 newconn.db_ = self.db_
4218 newconn._connected = False
4219 return newconn
4220
4221 - def load(self,metaproject):
4222 """
4223 load contents of metaproject tree to database
4224 @param metaproject: metaproject object
4225 """
4226 self.connect()
4227 self.load_metaproject(metaproject)
4228 self.load_projects(metaproject)
4229 self.load_mp_pivot(metaproject)
4230 self.load_project_dependencies(metaproject)
4231 self.commit()
4232 return 1
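# Hedged usage sketch (annotation): upload a parsed metaproject tree. Assumes
# the instance was authenticated beforehand; `mp` is a MetaProject produced by
# the XML parser and all names here are illustrative.
#   paramdb = MySQLParamDb()
#   paramdb.authenticate(host, user, passwd, dbname, keep_open=True)
#   paramdb.load(mp)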
4233
4234
4235 - def download(self):
4236 for metaproject in self.GetMetaProjects():
4237 self.metaprojects[metaproject.GetId()]= metaproject
4238 for project in self.GetProjects(metaproject.GetId()):
4239 self.projects[project.GetId()] = project
4240 metaproject.AddProject(project.GetName(),project)
4241
4242 for dependency in self.GetProjectDependencies(
4243 project.GetId(), metaproject.GetId()):
4244 project.AddDependency(dependency)
4245
4246 for service in self.GetServices(project.GetId()):
4247 self.modules[service.GetId()] = service
4248 project.AddService(service)
4249
4250 for param in self.GetParameters(service.GetId()):
4251 service.AddParameter(param)
4252 self.parameters[param.GetId()] = param
4253
4254 for module in self.GetModules(project.GetId()):
4255 self.modules[module.GetId()] = module
4256 project.AddModule(module)
4257
4258 for param in self.GetParameters(module.GetId()):
4259 module.AddParameter(param)
4260 self.parameters[param.GetId()] = param
4261 return self.metaprojects
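# Hedged usage sketch (annotation): walk the tree returned by download(). The
# result maps metaproject_id -> MetaProject with projects, modules, services
# and parameters already attached.
#   for mpid, mp in paramdb.download().iteritems():
#       for proj in mp.GetProjectList():
#           print mp.GetName(), proj.GetName(), proj.GetVersion()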
4262
4272
4345
4346
4347 - def GetMetaProjects(self):
4348 mplist = []
4349 for m in self.fetch_metaproject_list():
4350 metaproject = MetaProject()
4351 metaproject.SetName(m['name'])
4352 metaproject.SetVersion(m['versiontxt'])
4353 metaproject.SetId(m['metaproject_id'])
4354 mplist.append(metaproject)
4355 return mplist
4356
4357 - def GetProjectsSM(self,module,metaproj):
4358 return self.GetProjectsMM(module,metaproj,'service')
4359
4361 sql = " SELECT metaproject.* FROM metaproject "
4362 sql += " WHERE metaproject.name = '%s' " % metaproj.GetName()
4363 sql += " AND metaproject.versiontxt= '%s' " % metaproj.GetVersion()
4364 cursor = self.getcursor()
4365 cursor.execute (sql.strip());
4366 mp = cursor.fetchone ();
4367 metaproj.SetId(mp['metaproject_id'])
4368 return metaproj
4369
4370 - def GetProjectsMM(self,module,metaproj,table='module'): # reconstructed; third parameter inferred from the 'service' argument passed by GetProjectsSM
4371
4372 sql = " SELECT project.* FROM project,module,mp_pivot,metaproject "
4373 sql += " WHERE module.project_id = project.project_id "
4374 sql += " AND mp_pivot.project_id = project.project_id "
4375 sql += " AND mp_pivot.metaproject_id = metaproject.metaproject_id "
4376 sql += " AND metaproject.name = '%s' " % metaproj.GetName()
4377 sql += " AND metaproject.versiontxt = '%s' " % metaproj.GetVersion()
4378 sql += " AND module.class = '%s' " % module.GetClass()
4379 cursor = self.getcursor()
4380 cursor.execute (sql.strip());
4381 p = cursor.fetchone ();
4382 if not p: return []
4383
4384 project = Project()
4385 project.SetName(p['name'])
4386 project.SetVersion(p['versiontxt'])
4387 project.SetId(p['project_id'])
4388 project_list = self.GetProjectDependencies(project.GetId(),metaproj.GetId())
4389 project_list.insert(0,project)
4390 return project_list
4391
4392 - def GetProjects(self,metaproject_id=None):
4393 plist = []
4394 for p in self.fetch_project_list(metaproject_id):
4395 project = Container()
4396 project.SetName(p['name'])
4397 project.SetVersion(p['versiontxt'])
4398 project.SetId(p['project_id'])
4399 plist.append(project)
4400 return plist
4401
4402 - def GetProjectDependencies(self,project_id,metaproject_id):
4403 dlist = []
4404 for d in self.fetch_project_dependencies(project_id,metaproject_id):
4405 dependency = Container()
4406 dependency.SetName(d['name'])
4407 dependency.SetVersion(d['versiontxt'])
4408 dependency.SetId(d['project_id'])
4409 dlist.append(dependency)
4410 return dlist
4411
4412 - def GetServices(self,project_id):
4413 slist = []
4414 for s in self.fetch_modules_from_project_id(project_id,'service'):
4415 service = Service()
4416 service.SetName(s['name'])
4417 service.SetClass(s['class'])
4418 service.SetId(s['module_id'])
4419 slist.append(service)
4420 return slist
4421
4422 - def GetServicesP(self,name,version):
4423 slist = []
4424 for s in self.fetch_services_for_project(name,version):
4425 service = Service()
4426 service.SetName(s['name'])
4427 service.SetClass(s['class'])
4428 service.SetId(s['module_id'])
4429 slist.append(service)
4430 return slist
4431
4432
4433 - def GetModules(self,project_id):
4434 mlist = []
4435 for m in self.fetch_modules_from_project_id(project_id):
4436 module = Module()
4437 module.SetName(m['name'])
4438 module.SetClass(m['class'])
4439 module.SetId(m['module_id'])
4440 mlist.append(module)
4441 return mlist
4442
4443 - def GetModulesP(self,name,version):
4444 mlist = []
4445 for m in self.fetch_modules_for_project(name,version):
4446 module = Module()
4447 module.SetName(m['name'])
4448 module.SetClass(m['class'])
4449 module.SetId(m['module_id'])
4450 mlist.append(module)
4451 return mlist
4452
4453 - def GetIceProdModules(self):
4454 mlist = []
4455 for m in self.fetch_iceprodmodules():
4456 module = IceProdPre()
4457 module.SetName(m['name'])
4458 module.SetClass(m['class'])
4459 module.SetId(m['module_id'])
4460 mlist.append(module)
4461 return mlist
4462
4463
4464 - def GetParameters(self,module_id):
4465 plist = []
4466 for param in self.fetch_service_parameters(module_id):
4467 parameter = Parameter()
4468 parameter.SetName(param['name'])
4469 parameter.SetType(param['type'])
4470 pid = param['parameter_id']
4471 parameter.SetId(pid)
4472
4473 if parameter.GetType() == 'OMKeyv' :
4474 parameter.SetValue(self.select_omkey_array(pid))
4475 elif parameter.GetType() == 'OMKey' :
4476 parameter.SetValue(self.select_omkey(pid))
4477 elif parameter.GetType() in VectorTypes:
4478 parameter.SetValue(self.select_array(pid))
4479 else:
4480 parameter.SetValue(Value(param['value'],param['unit']))
4481 plist.append(parameter)
4482 return plist
4483
4484
4486 """
4487 retrive IDs for metaprojects in the database
4488 """
4489
4490 sql = "SELECT * FROM metaproject"
4491 cursor = self.getcursor()
4492 sql = re.sub('\s+',' ',sql);
4493 cursor.execute (sql);
4494 result_set = cursor.fetchall ();
4495
4496 return map(self.nonify,result_set);
4497
4498
4500 """
4501 retrive IDs for projects in the database
4502 @param metaproject_id: table_id of metaproject
4503 """
4504
4505 sql = """
4506 SELECT
4507 project.project_id,project.name,project.versiontxt
4508 FROM project,mp_pivot """
4509 if metaproject_id:
4510 sql += """ WHERE
4511 project.project_id = mp_pivot.project_id
4512 AND mp_pivot.metaproject_id = %s""" % metaproject_id
4513
4514 cursor = self.getcursor()
4515 sql = re.sub('\s+',' ',sql);
4516 cursor.execute (sql);
4517 result_set = cursor.fetchall ();
4518 return map(self.nonify,result_set);
4519
4521 """
4522 retrieve project with given id
4523 @param id: primary key of project
4524 @param table: table to query (project or metaprojects)
4525 """
4526 sql="""SELECT * FROM %s WHERE %s_id ='%d'
4527 """ % (table,table,id)
4528
4529 sql = re.sub('\s+',' ',sql)
4530 cursor = self.getcursor()
4531 cursor.execute (sql);
4532
4533 return cursor.fetchall ()
4534
4535
4537 """
4538 retrive id for project with matching name, version
4539 (there should only be one)
4540 @param pname: name of project to query
4541 @param pversion: tuple representing the major,minor,patch version
4542 @param table: table to query (project or metaprojects)
4543 """
4544 sql="""SELECT %s_id FROM %s
4545 WHERE name ='%s'
4546 AND versiontxt ='%s'
4547 """ % (table,table,pname,pversion)
4548
4549 sql = re.sub('\s+',' ',sql)
4550 cursor = self.getcursor()
4551 cursor.execute (sql);
4552 result = cursor.fetchall ()
4553 if result:
4554 return int(result[0]['%s_id' % table ])
4555 else:
4556 self.logger.warn("project \'%s\' not found" % pname)
4557 return
4558
4559
4560
4561
4562 - def fetch_service_id(self,service,pid):
4563 return self.fetch_module_id(service,pid,'service')
4564
4566 """
4567 retrive id for module with matching name, and project id
4568 (there should only be one)
4569 @param module: module to query
4570 """
4571 if not pid: return None
4572 cname = module.GetClass()
4573
4574 sql =" SELECT module_id FROM module "
4575 sql +=" WHERE class ='%s' " % cname
4576 sql +=" AND project_id ='%d' """ % pid
4577
4578 sql = re.sub('\s+',' ',sql)
4579 self.logger.debug(sql)
4580 cursor = self.getcursor()
4581 cursor.execute (sql);
4582 result = cursor.fetchone()
4583 self.logger.debug(result)
4584
4585 if result:
4586 return int(result['module_id'])
4587 else:
4588 self.logger.error("%s \'%s\' not found" % (table,cname))
4589 return
4590
4592 """
4593 retrive dependencys for project
4594 @param project_id: id of project
4595 @return array of project names
4596 """
4597 dependencies = []
4598
4599 sql = " SELECT project.* "
4600 sql += " FROM project, project_depend "
4601 sql += " WHERE "
4602 sql += " project.project_id = project_depend.dependency_id "
4603 sql += " AND "
4604 sql += " project_depend.project_id = %d " % project_id
4605 sql += " AND "
4606 sql += " project_depend.metaproject_id = %d " % metaproject_id
4607
4608 cursor = self.getcursor()
4609 cursor.execute (sql.strip());
4610 result_set = cursor.fetchall ();
4611
4612 return result_set
4613
4614
4616 """
4617 retrive modules for with a given pid
4618 @param project_id: id of project
4619 """
4620 sql="""
4621 SELECT * FROM module WHERE project_id = %s
4622 AND module_type='%s' ORDER BY class
4623 """ % (project_id,table)
4624
4625 cursor = self.getcursor()
4626 sql = re.sub('\s+',' ',sql);
4627 cursor.execute (sql);
4628 result_set = cursor.fetchall ();
4629
4630 return map(self.nonify,result_set);
4631
4632
4634 """
4635 retrive modules for with a project given by
4636 name and version
4637 @param name: name of project
4638 @param version: version tuple
4639 """
4640 pid = self.fetch_project_id(name,version)
4641 if pid:
4642 return self.fetch_modules_from_project_id(pid)
4643 else:
4644 return []
4645
4647 """
4648 retrive modules for with a project given by
4649 name and version
4650 @param name: name of project
4651 @param version: version tuple
4652 """
4653 pid = self.fetch_project_id(name,version)
4654 if pid:
4655 return self.fetch_modules_from_project_id(pid,'service')
4656 else:
4657 return []
4658
4660 """
4661 retrive modules for with a project given by
4662 name and version
4663 """
4664 pid = self.fetch_project_id('iceprod',iceprod.__version__)
4665 if pid:
4666 return self.fetch_modules_from_project_id(pid,'iceprod')
4667 else:
4668 return []
4669
4671 """
4672 retrive parameters for with a service/module given by
4673 the service/module id
4674 @param module_id: primary key in modules table on db
4675 """
4676 sql="SELECT * FROM parameter WHERE module_id = %d" % module_id
4677
4678 cursor = self.getcursor()
4679 sql = re.sub('\s+',' ',sql);
4680 cursor.execute (sql);
4681 result_set = cursor.fetchall ();
4682 return map(self.nonify,result_set);
4683
4685 """
4686 retrive parameters for with a service/module given by
4687 the service/module id
4688 @param service_id: primary key in services table on db
4689 """
4690 return self.fetch_module_parameters(service_id)
4691
4693 """
4694 Load cross-references between meta-projects (1) and projects
4695 The purpose of this is provide a way simulatneously for projects to
4696 reference which meta-projects they are members of and for
4697 meta-projects to list which projects belong to them (many-to-many).
4698 @param metaproject: Metaproject object
4699 """
4700
4701 sql = """
4702 INSERT IGNORE INTO mp_pivot
4703 (metaproject_id,project_id) VALUES
4704 """
4705 cm = ''
4706 mid = metaproject.GetId()
4707 for p in metaproject.GetProjectList():
4708 sql += "%s\n(%s,%s)" % (cm,mid,p.GetId())
4709 cm = ','
4710
4711 sql = re.sub('\s+',' ',sql)
4712 self.logger.debug(sql)
4713 cursor = self.getcursor()
4714 cursor.execute (sql)
4715
4716 self.logger.debug(self.insert_id())
4717 self.logger.debug("%d mp_pivot rows were inserted" % cursor.rowcount)
4718 cursor.execute("SHOW WARNINGS")
4719 warn = cursor.fetchone()
4720 if warn: self.logger.warn(warn)
4721 return
4722
4724 """
4725 Load cross-references between projects (1) and depency projects
4726 @param project: Project object
4727 """
4728
4729 sql = " INSERT IGNORE INTO project_depend "
4730 sql += " (project_id,metaproject_id,dependency_id) "
4731 sql += " VALUES "
4732 pid = project.GetId()
4733 mpid = metaproject.GetId()
4734 cursor = self.getcursor()
4735
4736 for p in project.GetDependencies():
4737 if not p: continue
4738 self.logger.info("%s - getting project dependency: %s" % \
4739 (project.GetName(),p.GetName()))
4740 if not metaproject.GetProject(p.GetName()):
4741 self.logger.fatal('failed dependency:%s needs %s' % \
4742 (project.GetName(),p.GetName()) )
4743 os._exit(1)
4744 did = metaproject.GetProject(p.GetName()).GetId()
4745 sql2 = sql + "(%s,%s,%s)" % (pid,mpid,did)
4746 try:
4747 cursor.execute (sql2)
4748 self.logger.info(
4749 "%d project_depend rows were inserted" % cursor.rowcount)
4750 except Exception, e:
4751 self.logger.error(e)
4752 return
4753
4754
4755
4757 """
4758 Load projects to database
4759 @param metaproject: MetaProject object
4760 """
4761 for proj in metaproject.GetProjectList():
4762 self.load_project(proj)
4763 self.load_services(proj)
4764 self.load_modules(proj)
4765 return
4766
4768 """
4769 Load project dependencies to database
4770 @param metaproject: MetaProject object
4771 """
4772 for proj in metaproject.GetProjectList():
4773 self.load_dependencies(proj,metaproject)
4774 return
4775
4776 - def load_metaproject(self,metaproject):
4777 return self.load_project(metaproject,"metaproject")
4778
4779
4781 """
4782 Load project to database
4783 @param project: the project to be loaded
4784 @param table: table to which project should be loaded
4785 """
4786 pid = self.fetch_project_id(project.GetName(),project.GetVersion(),table)
4787 self.logger.debug("%s %s.%s pid is %s" % (table,project.GetName(),project.GetVersion(),pid))
4788 if not pid:
4789 sql = "INSERT INTO %s " % table
4790 sql += "(name, versiontxt,major_version,minor_version,patch_version) "
4791 sql += " VALUES "
4792
4793 ver = project.GetVersion()
4794 name = project.GetName()
4795 vt = ('00','00','00')
4796
4797
4798 legacy_ver = self.version_regex.search(ver)
4799 if legacy_ver:
4800 legacy_ver = legacy_ver.group(0).replace('V','')
4801 vt = legacy_ver.split('-')
4802 sql += "('%s','%s','%s','%s','%s')" % (name,ver,vt[0],vt[1],vt[2])
4803 self.logger.debug(sql)
4804
4805 cursor = self.getcursor()
4806 cursor.execute(sql)
4807 pid = self.insert_id()
4808 if pid:
4809 self.logger.debug("inserted id %d" % pid)
4810 else:
4811 self.logger.error(
4812 "could not load project %s to parameter database" % project.GetName())
4813
4814 project.SetId(pid)
4815 self.logger.debug("%s.GetId(): %s" % (project.GetName(),project.GetId()))
4816 return pid
4817
4818
4820 """
4821 Load modules into the database.
4822 @param project: Project object
4823 """
4824 for module in project.GetModules():
4825 self.load_module(module,project)
4826 self.load_params(module)
4827 return
4828
4830 """
4831 Load services into the database.
4832 @param project: Project object
4833 """
4834 for service in project.GetServices():
4835 self.load_service(service,project)
4836 self.load_params(service)
4837 return
4838
4839
4841 """
4842 Load individual module to database
4843 @param module: object to load
4844 """
4845 pid = project.GetId()
4846 mid = self.fetch_module_id(module,pid,type)
4847 if not mid:
4848 sql = " INSERT INTO module "
4849 sql += " (name,class,project_id,module_type) "
4850 sql += " VALUES "
4851 sql += " (\'%s\',\'%s\',%d,'%s') " % (module.GetName(),module.GetClass(),pid,type)
4852
4853 self.logger.debug(sql.strip())
4854
4855 cursor = self.getcursor()
4856 cursor.execute(sql.strip())
4857 mid = self.insert_id()
4858
4859 cursor.execute("SHOW WARNINGS")
4860 warn = cursor.fetchone()
4861 if warn: self.logger.warn(warn)
4862
4863 cursor.execute("SHOW ERRORS")
4864 err = cursor.fetchone()
4865 if err: self.logger.error(err)
4866
4867 rowcount = cursor.rowcount
4868 if mid:
4869 self.logger.debug("inserted module id %d" % mid)
4870 else:
4871 self.logger.error("could not fetch id for '%s'" % module.GetClass())
4872
4873 module.SetId(mid)
4874 return mid
4875
4877 """
4878 Load individual service to database
4879 @param service: object to load
4880 """
4881 return self.load_module(service,project,type='service')
4882
4884 """
4885 Load individual service to database
4886 @param module: object to load
4887 """
4888 return self.load_module(service,project,type='iceprod')
4889
4890
4892 """
4893 Load parameters for module or service to database
4894 @param module: object whose parameter will be loaded
4895 """
4896 cursor = self.getcursor()
4897 sql = " INSERT IGNORE INTO parameter "
4898 sql += " (name,type,unit,description,module_id,value) "
4899 sql += " VALUES "
4900 sql2 = ''
4901 count = 0
4902
4903 m_id = module.GetId()
4904 self.logger.debug('module_id %d'% m_id)
4905
4906 cm = ''
4907 if not module.GetParameters(): return
4908 for p in module.GetParameters():
4909 name = p.GetName()
4910 type = p.GetType()
4911 desc = p.GetDescription()
4912 desc = re.sub(r'\"','&quot;',desc)
4913 desc = re.sub(r'\'','&quot;',desc)
4914
4915 if type == 'OMKey' or type in VectorTypes:
4916 value = 0
4917 sql1 = sql + " ('%s','%s','%s','%s',%d,'%s') " % \
4918 (name,type,'NULL',desc,m_id,value)
4919 cursor.execute (sql1.strip())
4920 pid = self.insert_id()
4921 cursor.execute ('show warnings')
4922 retval = cursor.fetchall ()
4923 if not pid:
4924 sql1a = " SELECT parameter_id FROM parameter "
4925 sql1a += " WHERE name='%s' " % name
4926 sql1a += " AND module_id=%d " % m_id
4927 self.logger.debug(sql1a.strip())
4928 cursor.execute (sql1a.strip())
4929 pid = cursor.fetchone ()
4930 if pid: pid = pid['parameter_id']
4931 else: raise Exception,"Failed to insert/fetch parameter id"
4932
4933 count = count + cursor.rowcount
4934 if type == 'OMKey':
4935 self.insert_omkey(p.GetValue(),pid)
4936 elif type == 'OMKeyv':
4937 self.insert_omkey_array(p.GetValue(),pid)
4938 else:
4939 self.insert_array(p.GetValue(),pid)
4940
4941 else:
4942 value = p.GetValue().value
4943 unit = self.nullify(p.GetValue().unit)
4944 sql2 += "%s (\'%s\',\'%s\',%s,\"%s\",%d,\'%s\') " % \
4945 (cm,name,type,unit,desc,m_id,value)
4946 cm = ','
4947
4948 if sql2:
4949 sql += sql2
4950 self.logger.debug(sql.strip())
4951 cursor.execute(sql.strip())
4952 cursor.execute ('show warnings')
4953 retval = cursor.fetchall ()
4954 count = count + cursor.rowcount
4955 self.logger.debug("%d parameter rows were inserted " % count)
4956
4957 - def insert_omkey(self,omkey,pid):
4958 cursor = self.getcursor()
4959 sql = " INSERT INTO array_element (name,value,parameter_id) "
4960 sql += " VALUES "
4961 sql += " ('%s','%s',%s), " % ('stringid',omkey.stringid,pid)
4962 sql += " ('%s','%s',%s) " % ('omid',omkey.omid,pid)
4963 self.logger.debug(sql.strip())
4964 cursor.execute (sql.strip())
4965
4966 - def insert_omkey_array(self,omkeyvect,pid):
4967 cursor = self.getcursor()
4968 sql = " INSERT INTO array_element (name,value,parameter_id) "
4969 sql += " VALUES "
4970 cm = ""
4971 if len(omkeyvect) < 1: return
4972 for omkey in omkeyvect:
4973 sql += cm
4974 sql += " ('%s','%s',%s), " % ('stringid',omkey.stringid,pid)
4975 sql += " ('%s','%s',%s) " % ('omid',omkey.omid,pid)
4976 cm = ","
4977 cursor.execute (sql.strip())
4978
4979 - def insert_array(self,values,pid):
4980 cursor = self.getcursor()
4981 sql = " INSERT INTO array_element (value,unit,parameter_id) "
4982 sformat = lambda x: "('%s','%s',%s)" % (x.value,self.nullify(x.unit),pid)
4983 vals = ",".join(map(sformat,values))
4984 if len(vals) > 0:
4985 sql += " VALUES " + vals
4986 cursor.execute (sql.strip())
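# Annotation on the storage scheme: a vector parameter becomes one
# array_element row per entry (value,unit,parameter_id), while an OMKey is
# stored as a pair of rows named 'stringid' and 'omid'; select_omkey_array()
# below reassembles those pairs in array_element_id order.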
4987
4988 - def select_array(self,pid):
4989 cursor = self.getcursor()
4990 sql = " SELECT * from array_element "
4991 sql += " WHERE parameter_id = %d " % pid
4992 cursor.execute (sql.strip())
4993 result_set = cursor.fetchall();
4994 vect = []
4995 for item in result_set:
4996 vect.append(Value(item['value'],self.nonify(item['unit'])))
4997 return vect
4998
4999 - def select_omkey(self,pid):
5000 omkeys = self.select_omkey_array(pid)
5001 if len(omkeys) < 1:
5002 raise Exception,'could not find omkey for param %d' % pid
5003 return omkeys[0]
5004
5005 - def select_omkey_array(self,pid):
5006 cursor = self.getcursor()
5007 sql = " SELECT * from array_element "
5008 sql += " WHERE parameter_id = %d order by array_element_id" % pid
5009 cursor.execute (sql.strip())
5010 result_set = cursor.fetchall();
5011 omkeyvect = []
5012 for item in result_set:
5013 if item['name'] == 'stringid':
5014 omkey = pyOMKey(0,0)
5015 omkey.stringid = item['value']
5016 elif item['name'] == 'omid':
5017 omkey.omid = item['value']
5018 omkeyvect.append(omkey)
5019 else:
5020 raise Exception,'expected omkey but found %s' % item['name']
5021 return omkeyvect
5022
5024
5027
5029
5030 def LoadParams(metaproject,username,passwd):
5031 db = self.paramdb.new()
5032 db.disconnect()
5033 if parent.auth_db(db,username, passwd,keep_open=True):
5034 retval = db.load(loads(metaproject))
5035 db.disconnect()
5036 logger.info("Uploaded metaproject for %s %s %s" % (username,db.db_,db.host_))
5037 return retval
5038 else:
5039 logger.error("Failed to authenticate %s on %s@%s with password" % (username,db.db_,db.host_))
5040 return 0
5041 parent.server.register_function(LoadParams)
5042
5043 def DownloadParams():
5044 return dumps(self.paramdb.download())
5045 parent.server.register_function(DownloadParams)
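# Hedged client-side sketch (annotation): these handlers appear to be exposed
# over XML-RPC, with complex arguments shipped as cPickle strings, so a caller
# mirrors the dumps/loads pairing. The URL and port are illustrative only.
#   import xmlrpclib
#   from cPickle import loads
#   server = xmlrpclib.ServerProxy('http://localhost:9080')
#   metaprojects = loads(server.DownloadParams())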
5046
5047 def SwitchMetaProject(iconfig,id,name,version):
5048 return dumps(self.paramdb.SwitchMetaProject(loads(iconfig),id,name,loads(version)))
5049 parent.server.register_function(SwitchMetaProject)
5050
5051 def GetMetaProjects():
5052 return dumps(self.paramdb.GetMetaProjects())
5053 parent.server.register_function(GetMetaProjects)
5054
5055 def GetProjectsSM(module,metaproj):
5056 return dumps(self.paramdb.GetProjectsSM(loads(module),loads(metaproj)))
5057 parent.server.register_function(GetProjectsSM)
5058
5059 def GetProjectsMM(module,metaproj):
5060 return dumps(self.paramdb.GetProjectsMM(loads(module),loads(metaproj)))
5061 parent.server.register_function(GetProjectsMM)
5062
5063 def GetProjects(metaproject_id):
5064 return dumps(self.paramdb.GetProjects(metaproject_id))
5065 parent.server.register_function(GetProjects)
5066
5067 def GetProjectDependencies(project_id,metaproject_id):
5068 return dumps(self.paramdb.GetProjectDependencies(project_id,metaproject_id))
5069 parent.server.register_function(GetProjectDependencies)
5070
5071 def GetServices(project_id):
5072 return dumps(self.paramdb.GetServices(project_id))
5073 parent.server.register_function(GetServices)
5074
5075 def GetServicesP(name,version):
5076 return dumps(self.paramdb.GetServicesP(name,loads(version)))
5077 parent.server.register_function(GetServicesP)
5078
5079 def GetModules(project_id):
5080 return dumps(self.paramdb.GetModules(project_id))
5081 parent.server.register_function(GetModules)
5082
5083 def GetModulesP(name,version):
5084 return dumps(self.paramdb.GetModulesP(name,loads(version)))
5085 parent.server.register_function(GetModulesP)
5086
5087 def GetIceProdModules():
5088 return dumps(self.paramdb.GetIceProdModules())
5089 parent.server.register_function(GetIceProdModules)
5090
5091 def GetParameters(module_id):
5092 return dumps(self.paramdb.GetParameters(module_id))
5093 parent.server.register_function(GetParameters)
5094
5095 def fetch_metaproject_id(name, version):
5096 return self.paramdb.fetch_metaproject_id(name,loads(version))
5097 parent.server.register_function(fetch_metaproject_id)
5098
5099 def fetch_metaproject_list():
5100 return dumps(self.paramdb.fetch_metaproject_list())
5101 parent.server.register_function(fetch_metaproject_list)
5102
5103 def fetch_project_list(metaproject_id):
5104 return dumps(self.paramdb.fetch_project_list(metaproject_id))
5105 parent.server.register_function(fetch_project_list)
5106
5107 def fetch_project(id):
5108 return dumps(self.paramdb.fetch_project(id))
5109 parent.server.register_function(fetch_project)
5110
5111 def fetch_project_id(pname,pversion):
5112 return dumps(self.paramdb.fetch_project_id(pname,loads(pversion)))
5113 parent.server.register_function(fetch_project_id)
5114
5115 def fetch_service_id(service,pid):
5116 return self.paramdb.fetch_service_id(loads(service),pid)
5117 parent.server.register_function(fetch_service_id)
5118
5119 def fetch_module_id(module,mid):
5120 return self.paramdb.fetch_module_id(loads(module),mid)
5121 parent.server.register_function(fetch_module_id)
5122
5123 def fetch_project_dependencies(project_id,metaproject_id):
5124 return dumps(self.paramdb.fetch_project_dependencies(project_id,metaproject_id))
5125 parent.server.register_function(fetch_project_dependencies)
5126
5127 def fetch_modules_from_project_id(project_id):
5128 return dumps(self.paramdb.fetch_modules_from_project_id(project_id))
5129 parent.server.register_function(fetch_modules_from_project_id)
5130
5131 def fetch_modules_for_project(name,version):
5132 return dumps(self.paramdb.fetch_modules_for_project(name,loads(version)))
5133 parent.server.register_function(fetch_modules_for_project)
5134
5135 def fetch_services_for_project(name,version):
5136 return dumps(self.paramdb.fetch_services_for_project(name,loads(version)))
5137 parent.server.register_function(fetch_services_for_project)
5138
5139 def fetch_module_parameters(module_id):
5140 return dumps(self.paramdb.fetch_module_parameters(module_id))
5141 parent.server.register_function(fetch_module_parameters)
5142
5143 def fetch_service_parameters(module_id):
5144 return dumps(self.paramdb.fetch_service_parameters(module_id))
5145 parent.server.register_function(fetch_service_parameters)
5146
5147
5148 if __name__ == '__main__':
5149
5150 from xmlparser import IceTrayXMLParser
5151 from xmlwriter import IceTrayXMLWriter
5152
5153 if len(sys.argv) < 3:
5154 print 'Usage: python config.py <runid> <xml out>'
5155 sys.exit()
5156
5157
5158 steering = Steering()
5159
5160 i3db = ConfigDB()
5161 passwd = getpass.getpass()
5162 i3db.authenticate('dbs2.icecube.wisc.edu','i3iceprod-uw',passwd,'i3iceprod',True)
5163 runid = int(sys.argv[1])
5164 i3config = i3db.download_config(runid,include_defaults=True,include_description=False)
5165 i3db.disconnect()
5166 writer = IceTrayXMLWriter(i3config)
5167 writer.write_to_file(sys.argv[2])
5168