"""
Provides an interface to the MySQL job monitoring database.

copyright (c) 2008 the icecube collaboration

@version: $Revision: $
@date: $Date: $
@author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
"""
import re,sys
import getpass
import string
import random
import iceprod
import logging
import types
import copy
import math
import time
from cPickle import loads,dumps

from iceprod.core import logger
from iceprod.core.dataclasses import *
from iceprod.core.paramdb import *
from iceprod.core.metadata import *
from iceprod.core.constants import *

import MySQLdb
from MySQLdb import OperationalError


class IceProdDB:

    logger = logging.getLogger('IceProdDB')

    def __init__(self):
        """
        Constructor
        """
        self._conn = None
        self._connected = False
        self._auto = False
        self.host_ = None
        self.usr_ = None
        self.passwd_ = None
        self.db_ = None
        self.port_ = 3306
        self.auth_function = lambda x: None
        self.version_regex = re.compile(r'[A-Z][0-9][0-9]-[0-9][0-9]-[0-9][0-9]')
        return

    def nullify(self,value):
        """ Quote a value for SQL, mapping empty values to NULL. """
        if value:
            return "\'%s\'" % value
        return 'NULL'

    def nonify(self,value):
        """ Map the string 'NULL' back to None. """
        if value == 'NULL':
            return None
        return value

    def intify(self,value):
        """ Map NULL or empty values to 0, anything else to int. """
        if value == 'NULL' or not value:
            return 0
        return int(value)

    def defang(self,txt):
        """ Escape quotes in text destined for SQL statements. """
        return txt.replace("\'","&apos;").replace("\"","&quot;")
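
    # Usage sketch for the SQL helpers above (hypothetical values, not part
    # of the original module):
    #
    #   db = IceProdDB()
    #   db.nullify('foo')   # -> "'foo'"  (quoted for SQL)
    #   db.nullify(None)    # -> 'NULL'
    #   db.nonify('NULL')   # -> None
    #   db.intify('NULL')   # -> 0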

    def get(self,name,value):
        """ Prompt for a value on stdin, keeping the default on empty input. """
        sys.stdout.write("%s [%s] : " % (name,value))
        answer = sys.stdin.readline().strip()
        if answer:
            return answer
        else:
            return value

    def clone(self):
        """
        Create a copy of this instance
        """
        newconn = IceProdDB()
        newconn.host_ = self.host_
        newconn.usr_ = self.usr_
        newconn.passwd_ = self.passwd_
        newconn.db_ = self.db_
        newconn._connected = False
        return newconn

    def ping(self):
        """
        Ping server to reactivate connection
        """
        if not self.isconnected():
            time.sleep(50)
            raise OperationalError,"Not connected to database"
        try:
            self._conn.ping()
        except OperationalError,e:
            self.logger.error('%s: will attempt to reconnect.' % str(e))
            self._conn = MySQLdb.connect(self.host_,self.usr_,self.passwd_,self.db_,self.port_)
            self._connected = True

    def getcursor(self):
        if self.isconnected():
            self.ping()
            return self._conn.cursor(MySQLdb.cursors.DictCursor)
        else:
            self.logger.warn('Not connected to database. Attempting to reconnect..')
            self.logger.info('delaying for 10 sec.')
            time.sleep(10)
            raise OperationalError,"Not connected to database"

    def commit(self):
        self.logger.debug("auto-commit set to %s" % self._auto)
        if not self._auto:
            return self._conn.commit()

    def rollback(self):
        self.logger.debug("rolling back transaction")
        return self._conn.rollback()

    def execute(self,cursor,sql,commit=True):
        for i in range(10):
            try:
                cursor.execute(sql)
                rowcount = self._conn.affected_rows()
                if commit: self.commit()
                return rowcount
            except OperationalError,e:
                self.logger.error(e)
                sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
        return 0

    def insert_id(self):
        return self._conn.insert_id()

    def set_auth_func(self,func):
        self.auth_function = func

    def authenticate(self,host,usr,passwd,db,keep_open=False,port=3306):
        """
        Database authentication
        @param host: ip or name of MySQL host
        @param usr: username
        @param passwd: account password
        @param db: name of database
        @param keep_open: don't close connection after authenticating
        """
        if (self.host_,self.usr_,self.passwd_,self.db_,self.port_) != (host,usr,passwd,db,port):
            (self.host_,self.usr_,self.passwd_,self.db_,self.port_) = (host,usr,passwd,db,port)
            self.disconnect()

        try:
            self.connect()
            if not keep_open:
                self.disconnect()
            return True
        except Exception,e:
            sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
            self.logger.error('%s: %s' % (sys.exc_type,e))
            self.logger.error('Authentication failed: %s@%s' % (usr,host))
            return False
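
    # Usage sketch with hypothetical credentials; authenticate() returns a
    # boolean rather than raising, so callers can branch on it:
    #
    #   db = IceProdDB()
    #   if db.authenticate('dbhost.example.com','iceprod','secret','iceprod_db',
    #                      keep_open=True):
    #       cursor = db.getcursor()   # DictCursor: rows are returned as dicts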

    def authtest(self,host,usr,passwd,db,port=3306):
        """
        Simple database authentication test
        @param host: ip or name of MySQL host
        @param usr: username
        @param passwd: account password
        @param db: name of database
        """
        try:
            self._conn = MySQLdb.connect(host,usr,passwd,db,port=port)
            self._conn.close()
            return True
        except Exception,e:
            sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
            self.logger.error('%s: %s' % (sys.exc_type,e))
            self.logger.error('Authentication failed: %s@%s' % (usr,host))
            return False

    def set_auto(self,autocommit=True):
        """ Set auto-commit """
        self._auto = autocommit
        try:
            self._conn.autocommit(self._auto)
        except Exception,e:
            self.logger.warn(e)

    def connect(self):
        """ Connect to database """
        if self.isconnected():
            try:
                self.ping()
                return
            except Exception,e:
                self.logger.error(str(e))
                self._connected = False
        try:
            self._conn = MySQLdb.connect(self.host_,self.usr_,self.passwd_,self.db_,self.port_)
            self._connected = True
        except Exception,e:
            self.logger.debug('%s: %s' % (sys.exc_type,e))
            self.logger.error('Connection failed : %s@%s' % (self.usr_,self.host_))
            sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
            raise sys.exc_type, e

    def disconnect(self):
        """ Disconnect from database """
        if self._connected:
            self._conn.close()
            self._connected = False

    def isconnected(self):
        return self._conn and self._connected

    def mkkey(self,minsize,maxsize):
        """
        Generate random alphanumeric sequence
        """
        key = ''
        seq = ['a','b','c','d','e','f','g','h','i','j',
               'k','l','m','n','o','p','q','r','s','t',
               'u','v','w','x','y','z']
        seq += map(string.upper,seq)
        seq += range(0,10)  # digits 0-9
        r = random.Random()
        size = r.randint(minsize,maxsize)
        for i in range(0,size):
            key += str(r.choice(seq))
        return key
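
    # Example: mkkey(6,10) returns a random alphanumeric string whose length
    # is drawn uniformly between 6 and 10, e.g. 'a7Qk2x' (hypothetical output).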


class ConfigDB(IceProdDB):

    logger = logging.getLogger('ConfigDB')

    def __init__(self):
        """
        Constructor
        """
        IceProdDB.__init__(self)
        self.submitter = ''
        self.temporary_storage = None
        self.global_storage = None
        self.metaproject_dict = { }
        self.institution = ''
        return

    def clone(self):
        """
        Create a copy of this instance
        """
        newconn = ConfigDB()
        newconn.host_ = self.host_
        newconn.usr_ = self.usr_
        newconn.passwd_ = self.passwd_
        newconn.db_ = self.db_
        newconn.port_ = self.port_
        newconn._connected = False
        return newconn

    def set_submitter(self,submitter):
        self.submitter = submitter

    def set_institution(self,institution):
        self.institution = institution

    def set_temporary_storage(self,path):
        """
        Set the temporary storage path for the dataset.
        @param path: temporary storage path
        """
        self.temporary_storage = path

    def set_global_storage(self,path):
        """
        Set the global storage path in the data warehouse for the dataset.
        @param path: global storage path
        """
        self.global_storage = path

    def update_config(self,dataset_id,param_dict):
        """
        Update a dataset entry in the database
        @param dataset_id: primary key in production database
        @param param_dict: Dictionary of parameters to update
        """
        try:
            sql = """
                 SELECT * from dataset
                 WHERE dataset_id = %d """ % dataset_id

            cursor = self.getcursor()
            cursor.execute(sql)
            result_set = cursor.fetchall()

            r = result_set[0]
            if r["jobs_completed"] == r['jobs_submitted']:
                param_dict["status"] = "'COMPLETE'"
                self.logger.info("status %s" % param_dict['status'])
            elif r["jobs_failed"] > 0:
                param_dict["status"] = "'ERRORS'"
                self.logger.info("status %s" % param_dict['status'])

        except Exception, e:
            self.logger.error("%s could not fetch dataset %d" % (e,dataset_id))

        try:
            sql = """UPDATE dataset SET """
            for key in param_dict.keys():
                sql += " %s=%s, " % (key,param_dict[key])
            sql += " enddate=NOW() "
            sql += " WHERE dataset_id = %d" % dataset_id

            self.logger.debug(sql)
            cursor = self.getcursor()
            cursor.execute(sql)
            self.commit()

        except Exception, e:
            self.logger.error(str(e) + " rolling back transaction")
            self._conn.rollback()
            raise Exception, e
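
    # Usage sketch (hypothetical values). Values in param_dict are spliced
    # verbatim into the UPDATE statement, so strings must already be quoted
    # as SQL literals:
    #
    #   db.update_config(1234, {'jobs_failed': 0, 'hostname': "'submit.example.com'"})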

    def upload_config(self,steering,ticket=0,template=False,maxjobs=1):
        """
        Upload icetray configuration to database
        @param steering: Steering object containing configuration
        @param ticket: optional ticket ID to relate dataset to
        @param template: Whether this is a template or not
        @param maxjobs: number of jobs to submit (stored as jobs_submitted)
        @return: primary key for run on config db
        """

        dataset_id = None

        try:
            debug = steering.GetParameter('DEBUG')
            geo = steering.GetParameter('geometry')
            if debug:
                debug = int(debug.GetValue())
            else:
                debug = 0

            simcat_id = self.loadsimcat(steering)
            parent_id = steering.GetParentId()
            status = 'PROCESSING'
            if template:
                status = 'TEMPLATE'

            sql = """INSERT IGNORE INTO dataset
                     (
                      simcat_id,
                      startdate,
                      username,
                      institution,
                      description,
                      status,
                      temporary_storage,
                      global_storage,
                      jobs_submitted,
                      ticket_number,
                      parent_id,
                      debug,
                      dataset_category
                     )
                     VALUES """

            desc = steering.GetDescription()
            sql += """(
                      %d,
                      NOW(),
                      \'%s\',
                      \'%s\',
                      \'%s\',
                      \'%s\',
                      \'%s\',
                      \'%s\',
                      %d,
                      %d,
                      %d,
                      %d,
                      \'%s\')""" % \
                   (
                    simcat_id,
                    self.submitter,
                    self.institution,
                    re.sub('\'','\\\' ',desc),
                    status,
                    self.temporary_storage,
                    self.global_storage,
                    int(maxjobs),
                    ticket,
                    parent_id,
                    debug,
                    steering.GetDatasetType()
                   )

            sql = re.sub('\s+',' ',sql)
            cursor = self.getcursor()
            cursor.execute(sql)
            dataset_id = self.insert_id()

            self.load_steering(dataset_id,steering)
            self.load_steering_dependencies(dataset_id,steering)
            self.load_job_dependencies(dataset_id,steering)
            self.load_tasks(dataset_id,steering)
            self.load_batch_options(dataset_id,steering)
            self.load_externals(dataset_id,steering)

            tray_index = 0
            tsql = " INSERT INTO tray "
            tsql += " (dataset_id,tray_index,inputevents,iterations,name,python) "
            tsql += " VALUES (%s, %s, %s, %s, %s, %s)"
            for i3config in steering.GetTrays():
                params = (dataset_id, tray_index,
                          int(i3config.GetEvents()),
                          int(i3config.GetIterations()),
                          i3config.GetName(),
                          i3config.GetPython() or 'NULL',
                         )
                cursor.execute(tsql, params)
                tray_id = self._conn.insert_id()

                funcs = {'input': i3config.GetInputFiles,
                         'output': i3config.GetOutputFiles}
                files = []
                for type, func in funcs.iteritems():
                    for file in func():
                        files.append((tray_id, type, file.GetName(), int(file.IsPhotonicsTable())))
                if files:
                    fsql = " INSERT INTO tray_files"
                    fsql += " (tray_id, type, name, photonics)"
                    fsql += " VALUES (%s, %s, %s, %s)"
                    cursor.executemany(fsql, files)

                self.load_projects(dataset_id,i3config,tray_index)
                self.load_pre(dataset_id,i3config,tray_index)
                self.load_services(dataset_id,i3config,tray_index)
                self.load_modules(dataset_id,i3config,tray_index)
                self.load_connections(dataset_id,i3config,tray_index)
                self.load_post(dataset_id,i3config,tray_index)
                tray_index += 1

            if geo:
                geo_unique = []
                for g in geo.GetValue().replace('+',' ').replace('and',' ').replace(',',' ').split():
                    if g and g not in geo_unique:
                        geo_unique.append(g)
                        self.load_geometry(g,dataset_id)

        except Exception, e:
            sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
            self.logger.error(str(e) + " rolling back transaction")
            if not self._conn == None:
                self._conn.rollback()
            raise Exception, e

        self.commit()
        self.logger.debug("Successfully loaded configuration to database")

        return dataset_id
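
    # Round-trip sketch, assuming an authenticated ConfigDB instance `db` and
    # a populated Steering object (upload_config's signature is reconstructed
    # above, so treat the keyword names as assumptions):
    #
    #   dataset_id = db.upload_config(steering, ticket=0, template=False, maxjobs=100)
    #   steering2  = db.download_config(dataset_id)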
503
505 """
506 Set output path for job
507 """
508 cursor = self.getcursor()
509 sql = " INSERT IGNORE INTO urlpath "
510 sql += " (dataset_id, name, path) "
511 sql += " VALUES "
512 cm = ""
513 sql1 = ""
514 for p in steering.GetParameters():
515 if p.GetName().startswith("TARGET::"):
516 sql1 += "%s (%d, '%s', '%s') " % (cm,dataset,p.GetName().replace("TARGET::",""),p.GetValue())
517 cm =","
518 if sql1:
519 cursor.execute(sql+sql1)
520
521
522

    def get_simcat_categories(self):
        """
        Get list of sim_cat names
        """
        cursor = self.getcursor()
        sql = " SELECT category FROM simcat "
        cursor.execute(sql)
        return map(lambda x: x['category'],cursor.fetchall())

    def get_datasets(self):
        """
        Get a list of datasets currently in processing.
        """
        cursor = self.getcursor()
        sql = " SELECT dataset_id,jobs_submitted FROM dataset "
        sql += " WHERE status='PROCESSING' "
        sql += " AND verified='TRUE' "
        self.logger.debug(sql)
        cursor.execute(sql)
        self.commit()
        return cursor.fetchall()

    def load_dataset_params(self,dataset,paramdict):
        """
        Load dataset parameters (name/value pairs) for a dataset
        @param dataset: dataset ID
        @param paramdict: dictionary of parameter names and values
        """
        self.logger.debug("loading dataset parameters")
        cursor = self.getcursor()

        sql = " INSERT INTO dataset_param "
        sql += " (dataset_id,name,value) "
        sql += " VALUES "
        cm = ''
        for item in paramdict.items():
            sql += " %s(%d,'%s','%s') " % (cm,dataset,item[0],item[1])
            cm = ','
        cursor.execute(sql)
        self.commit()

        if paramdict.has_key('geometry'):
            self.logger.debug("setting geometry")
            sql = " UPDATE dataset SET geometry = '%s' " % paramdict['geometry']
            sql += " WHERE dataset_id = %d " % dataset
            cursor.execute(sql)
            self.commit()

    def load_geometry(self,geo,dataset):
        self.logger.debug("loading geometry information")
        cursor = self.getcursor()
        sql = " INSERT IGNORE INTO geometry "
        sql += " (dataset_id,name) "
        sql += " VALUES (%d,'%s') " % (dataset,geo)
        self.logger.debug(sql)
        cursor.execute(sql)

    def loadsimcat(self,steering):
        """
        Load simulation category
        @return: index of category
        """
        self.logger.debug("loading simulation category")
        category = steering.GetCategory()

        sql = """
             SELECT simcat_id from simcat
             WHERE category = '%s' """ % category

        cursor = self.getcursor()
        cursor.execute(sql)
        result_set = cursor.fetchall()

        if len(result_set) > 0:
            simcat_id = result_set[0]['simcat_id']
        else:
            sql = """INSERT IGNORE INTO simcat
                     (category) VALUES ('%s')""" % category
            cursor = self.getcursor()
            cursor.execute(sql)
            simcat_id = self.insert_id()

        return simcat_id
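
    # loadsimcat() is a get-or-create lookup: the category row is inserted
    # only if missing, and the existing simcat_id is returned otherwise.
    # Calling it twice with the same steering category therefore yields the
    # same primary key.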

    def load_externals(self,dataset_id,steering):
        """
        Load external programs to run prior to icetray
        @param dataset_id: primary key for run on config db
        """
        self.logger.debug("loading externals")
        externs = steering.GetExterns()
        if not externs:
            return

        sql = "INSERT IGNORE INTO extern ("
        sql += " name,command,version,description,arguments, "
        sql += " extern.infile,extern.outfile, extern.errfile, "
        sql += " steering,steering_name, dataset_id) "
        sql += " VALUES "

        cm = ''
        for e in externs:
            sql += "%s\n(" % cm
            sql += "'%s'," % e.GetName()
            sql += "'%s'," % e.GetExec()
            sql += "%s," % self.nullify(e.GetVersion())
            sql += "%s," % self.nullify(e.GetDescription())
            sql += "%s," % self.nullify(e.GetArgs())
            sql += "%s," % self.nullify(e.GetInFile())
            sql += "%s," % self.nullify(e.GetOutFile())
            sql += "%s," % self.nullify(e.GetErrFile())
            if e.GetSteering():
                sql += "'%s'," % e.GetSteering()[0].GetText()
                sql += "'%s'," % e.GetSteering()[0].GetName()
            else:
                sql += "NULL,NULL,"
            sql += "%d)" % dataset_id
            cm = ','

        sql = re.sub('\s+',' ',sql)
        cursor = self.getcursor()
        self.logger.debug(sql)
        cursor.execute(sql)
        return

    def load_projects(self,dataset_id,i3config,tray_index):
        """
        Load projects to database
        @param dataset_id: primary key for run on config db
        """
        self.logger.debug("loading projects")

        load_index = 0
        for mproj in i3config.GetMetaProjectList():
            mpid = self.load_metaproject(mproj,dataset_id,tray_index,load_index)
            load_index += 1

    def load_steering(self,dataset_id,steering):
        """
        Load steering parameters to database
        @param dataset_id: primary key for run on config db
        """
        sql = """INSERT IGNORE INTO steering_parameter
                 (type, name, value, description, dataset_id) VALUES """

        cm = ''
        for p in steering.GetParameters():
            type = p.GetType()
            name = p.GetName()
            value = p.GetValue()
            desc = p.GetDescription()
            sql += "%s\n (\'%s\',\'%s\',\'%s\',\'%s\',%d)" % \
                   (cm,type,name,value,desc,dataset_id)
            cm = ','

        sql = re.sub('\s+',' ',sql)
        cursor = self.getcursor()
        cursor.execute(sql)

    def load_steering_dependencies(self,dataset_id,steering):
        """
        Load file dependencies in steering element to database
        @param dataset_id: primary key for run on config db
        """
        dependencies = steering.GetDependencies()
        if not dependencies:
            return

        sql = """INSERT IGNORE INTO steering_dependency
                 (filename, dataset_id) VALUES """

        cm = ''
        for dep in dependencies:
            if type(dep) in types.StringTypes:
                d = dep
            else:
                d = dep.GetName()
            sql += "%s\n (\'%s\',%d)" % (cm,d,dataset_id)
            cm = ','

        sql = re.sub('\s+',' ',sql)
        cursor = self.getcursor()
        cursor.execute(sql)
        return

    def load_steering_statistics(self,dataset_id,steering):
        """
        Load statistics to track in monitoring
        @param dataset_id: primary key for run on config db
        """
        if not steering.GetStatistics():
            return
        cursor = self.getcursor()
        sql = "INSERT IGNORE INTO dataset_statistics (dataset_id, name, value) VALUES (%s, %s, 0.0)"
        stats = map(lambda x: (dataset_id, x), steering.GetStatistics())
        cursor.executemany(sql,stats)
        return

    def load_job_dependencies(self,dataset_id,steering):
        """
        Load job dependencies in steering element to database
        @param dataset_id: primary key for run on config db
        """
        if not steering.GetJobDependencies(): return

        cursor = self.getcursor()
        sql = 'INSERT IGNORE INTO job_dependency (dataset_id,input_dataset,input_job) VALUES (%s,%s,%s)'

        inserts = []
        for d in steering.GetJobDependencies():
            insert = (dataset_id,d.dataset,d.job)
            self.logger.debug(sql % insert)
            inserts.append(insert)
        cursor.executemany(sql, inserts)
        return

    def load_tasks(self,dataset_id,steering):
        """
        Load tasks in steering element to database, used for
        multi-part simulation jobs
        @param dataset_id: primary key for run on config db
        """
        tasks = steering.GetTaskDefinitions()
        if not tasks:
            return

        sql = "INSERT INTO task_def (dataset_id,name,reqs,opts,parallel,photonics) VALUES (%s,%s,%s,%s,%s,%s)"
        inserts = []
        for name,task in tasks.items():
            reqs = task.GetRequirements()
            opts = task.GetBatchOpts()
            parallel = int(task.ParallelExecutionEnabled())
            photonics = int(task.UsesPhotonics())
            self.logger.debug(sql % (dataset_id,name,reqs,opts,parallel,photonics))
            self.logger.debug("task %s added to DB, parallel: %s, photonics: %s, reqs: %s" \
                              % (name,parallel,photonics,reqs))
            inserts.append((dataset_id,name,reqs,opts,parallel,photonics))
        self.logger.debug(inserts)
        cursor = self.getcursor()
        cursor.executemany(sql, inserts)
        self.logger.debug("task definitions added")

        relationship_sql = "INSERT INTO task_def_rel" \
                           + " (parent_task_def_id,child_task_def_id)" \
                           + " VALUES (%s, %s)"
        tray_sql = "INSERT INTO task_def_tray (task_def_id,idx,iter)" \
                   + " VALUES (%s,%s,%s)"
        id_sql = "SELECT task_def_id FROM task_def WHERE dataset_id = %s" \
                 + " AND name = %s"

        for name,task in tasks.items():
            cursor.execute(id_sql, (dataset_id,name))
            row = cursor.fetchone()
            if not row:
                self.logger.error("task %s didn't get inserted into DB" % name)
                self.rollback()
                return
            task_id = row['task_def_id']

            inserts = []
            parents = task.GetParents()
            for parent in parents:
                self.logger.debug("task %s has parent %s" % (name,parent))
                cursor.execute(id_sql, (dataset_id,parent))
                row = cursor.fetchone()
                if not row:
                    self.logger.error("referenced parent task %s not found in DB" % parent)
                    self.rollback()
                    return
                parent_id = row['task_def_id']
                inserts.append((parent_id,task_id))
            cursor.executemany(relationship_sql, inserts)

            inserts = []
            trays = task.GetTrays()
            for index,tray in trays.items():
                for iter in tray.GetIters():
                    self.logger.debug("task %s has tray %s iter %s" \
                                      % (name,index,iter))
                    inserts.append((task_id,index,iter))
            cursor.executemany(tray_sql, inserts)
        self.logger.debug("added all tasks")
        self.commit()
        return
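
    # A task pipeline is persisted across three tables: task_def (one row per
    # task), task_def_rel (parent/child edges) and task_def_tray (tray index /
    # iteration assignments). A hypothetical 'generate' -> 'detector' chain
    # would produce:
    #
    #   task_def:      (dataset_id,'generate',...), (dataset_id,'detector',...)
    #   task_def_rel:  (generate_id, detector_id)
    #   task_def_tray: (generate_id, 0, 0), (detector_id, 0, 1)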

    def load_batch_options(self,dataset_id,steering):
        """
        Load batch system options from steering to database
        @param dataset_id: primary key for run on config db
        """
        batchopts = steering.GetBatchOpts()
        if not batchopts:
            return

        sql = """INSERT IGNORE INTO batch_option
                 (name, type, value, dataset_id) VALUES """

        cm = ''
        for b in batchopts:
            name = b.GetName()
            value = b.GetValue()
            type = b.GetType()
            sql += "%s\n (\'%s\',\'%s\',\'%s\',%d)" % \
                   (cm,name,type,value,dataset_id)
            cm = ','

        sql = re.sub('\s+',' ',sql)
        cursor = self.getcursor()
        cursor.execute(sql)

    def load_project(self,project,dataset_id,metaproject_id,tray_index,load_index):
        """
        Load project to database
        @param project: the Project object to be loaded
        @param dataset_id: primary key for run on config db
        @return: primary key for projects table on config db
        """
        cursor = self.getcursor()
        pid = self.fetch_project_id(project.GetName(), project.GetVersion())
        self.logger.debug("%s.%s pid is %s" % (project.GetName(),project.GetVersion(),pid))
        if not pid:
            ver = project.GetVersion()
            name = project.GetName()
            sql = " INSERT IGNORE INTO project "
            if isinstance(ver,types.StringTypes):
                sql += "(name, versiontxt,major_version,minor_version,patch_version) "
                sql += ' VALUES '

                vt = ('00','00','00')

                # parse legacy version strings of the form Vxx-yy-zz
                legacy_ver = self.version_regex.search(ver)
                if legacy_ver:
                    legacy_ver = legacy_ver.group(0).replace('V','')
                    vt = legacy_ver.split('-')
                sql += "('%s','%s','%s','%s','%s')" % (name,ver,vt[0],vt[1],vt[2])
                self.logger.debug(sql)
            else:
                raise Exception, "incompatible version type: %s" % type(ver)
            cursor.execute(sql.strip())
            pid = self.insert_id()
            self.logger.debug("After insert: %s.%s pid is %s" % (project.GetName(),project.GetVersion(),pid))
            if cursor.rowcount:
                self.logger.debug("inserted id %d" % pid)
            else:
                self.logger.warn("could not load project %s " % name)
        sql = " INSERT IGNORE INTO mp_pivot "
        sql += " (project_id, metaproject_id) VALUES (%d,%d)" % (pid,metaproject_id)
        cursor.execute(sql.strip())

        project.SetId(pid)
        sql = " INSERT INTO project_pivot "
        sql += " (project_id, dataset_id,tray_index,load_index) "
        sql += " VALUES (%d,%d,%d,%d)" % (pid,dataset_id,tray_index,load_index)
        cursor.execute(sql.strip())
        return pid
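
    # The version_regex compiled in IceProdDB.__init__ recognizes legacy
    # 'Vxx-yy-zz' release tags. For example (hypothetical tag):
    #
    #   m = self.version_regex.search('V02-01-05')
    #   m.group(0).replace('V','').split('-')   # -> ['02','01','05']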

    def fetch_project_id(self,project_name,project_version):
        """
        Retrieve the project_id for a given project
        @param project_name: name of library
        @param project_version: version string
        """
        sql = " SELECT project_id "
        sql += " FROM project "
        sql += " WHERE name = '%s' " % project_name
        sql += " AND versiontxt = '%s' " % project_version

        cursor = self.getcursor()
        cursor.execute(sql.strip())

        result_set = cursor.fetchall()
        if result_set:
            return result_set[0]['project_id']

    def load_project_dependencies(self,project,metaproject_id,i3config):
        """
        Load cross-references between projects and their dependency projects
        @param project: Project object
        """
        sql = """
             INSERT IGNORE INTO project_depend
             (project_id,metaproject_id,dependency_id) VALUES
             """
        cm = ''
        pid = project.GetId()
        for p in project.GetDependencies():
            self.logger.debug("%s - getting project dependency: %s" % \
                              (project.GetName(),p))
            if i3config.GetProject(p):
                did = i3config.GetProject(p).GetId()
                sql += "%s\n(%s,%s,%s)" % (cm,pid,metaproject_id,did)
                cm = ','

        if not cm:
            return

        sql = re.sub('\s+',' ',sql)
        cursor = self.getcursor()
        cursor.execute(sql)

        self.logger.debug(self.insert_id())
        self.logger.debug(
            "%d project_dependency rows were inserted" % cursor.rowcount)
        return

    def load_metaproject(self,metaproject,dataset_id,tray_index,load_index):
        """
        Load metaproject to database
        @param metaproject: the MetaProject object to be loaded
        @param dataset_id: primary key for run on config db
        @return: primary key for metaproject table on config db
        """
        name = metaproject.GetName()
        version = metaproject.GetVersion()
        mpid = self.fetch_metaproject_id(name, version)

        if not mpid:
            raise Exception, "metaproject '%s-%s' not found." % (name,str(version))

        sql = " INSERT IGNORE INTO metaproject_pivot "
        sql += " (metaproject_id, dataset_id,tray_index,load_index) "
        sql += " VALUES (%d,%d,%d,%d)" % (mpid,dataset_id,tray_index,load_index)

        sql = re.sub('\s+',' ',sql)
        cursor = self.getcursor()
        cursor.execute(sql)
        metaproject.SetId(mpid)

        project_load_index = 0
        for proj in metaproject.GetProjectList():
            self.load_project(proj,dataset_id,mpid,tray_index,project_load_index)
            self.logger.debug("%s.%s.GetId() = %s" % (proj.GetName(),proj.GetVersion(),proj.GetId()))
            project_load_index += 1

        return mpid

    def load_dependencies(self,module,module_id,i3config):
        """
        Load cross-references between modules and their dependency projects
        @param module: Module object
        """
        sql = """
             INSERT IGNORE INTO module_dependency
             (module_id,project_id) VALUES
             """
        cm = ''
        for p in module.GetProjectList():
            project = i3config.GetProject(p.GetName())
            if project:
                self.logger.debug("%s - getting module dependency: %s" % \
                                  (module.GetName(),project.GetName()))
                sql += "%s\n(%d,%d)" % (cm,module_id,project.GetId())
                cm = ','
            else:
                self.logger.error("project %s not found" % p.GetName())

        if not cm:
            return

        self.logger.debug(sql.strip())
        cursor = self.getcursor()
        cursor.execute(sql)

        self.logger.debug(self.insert_id())
        self.logger.debug("%d module_dependency rows were inserted" % cursor.rowcount)
        return

    def load_connections(self,dataset_id,i3config,tray_index):
        """
        Load module connections to database
        @param dataset_id: primary key for run on config db
        """
        for con in i3config.GetConnections():
            cid = self.load_connection(con,dataset_id,tray_index)

    def load_connection(self,connection,dataset_id,tray_index):
        """
        Load connection to database
        @param connection: the Connection object to be loaded
        @param dataset_id: primary key for run on config db
        @return: primary key for connection table on config db
        """
        sql = """INSERT IGNORE INTO `connection`
                 (source, outbox, destination, inbox,
                  dataset_id,tray_index) VALUES """

        source = connection.GetOutbox().GetModule()
        outbox = connection.GetOutbox().GetBoxName()
        destination = connection.GetInbox().GetModule()
        inbox = connection.GetInbox().GetBoxName()

        sql += "(\'%s\',\'%s\',\'%s\',\'%s\',%d,%d)" % \
               (source,outbox,destination,inbox,dataset_id,tray_index)
        sql = re.sub('\s+',' ',sql)

        cursor = self.getcursor()
        cursor.execute(sql)
        cid = self.insert_id()
        if cursor.rowcount:
            self.logger.debug("inserted id %d into connections table" % cid)
            for mesg in cursor.messages:
                self.logger.debug("connections: %s " % mesg)
        else:
            for mesg in cursor.messages:
                self.logger.error("connections: %s " % mesg)
        return cid
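
    # Each `connection` row stores one edge of the module graph. A
    # hypothetical reader module feeding a muon propagator would be stored as:
    #
    #   source='reader', outbox='OutBox', destination='propagator', inbox='InBox'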

    def load_pre(self,dataset_id,i3config,tray_index):
        """
        Load IceProd pre modules into the database.
        @param dataset_id: primary key for run on config db
        """
        load_index = 0
        for module in i3config.GetIceProdPres():
            self.load_module(module,dataset_id,load_index,tray_index,i3config,'iceprod','pre')
            self.load_params(module,dataset_id,tray_index)
            load_index += 1

    def load_post(self,dataset_id,i3config,tray_index):
        """
        Load IceProd post modules into the database.
        @param dataset_id: primary key for run on config db
        """
        load_index = 0
        for module in i3config.GetIceProdPosts():
            self.load_module(module,dataset_id,load_index,tray_index,i3config,'iceprod','post')
            self.load_params(module,dataset_id,tray_index)
            load_index += 1

    def load_modules(self,dataset_id,i3config,tray_index):
        """
        Load modules into the database.
        @param dataset_id: primary key for run on config db
        """
        load_index = 0
        for module in i3config.GetModules():
            self.load_module(module,dataset_id,load_index,tray_index,i3config)
            self.load_params(module,dataset_id,tray_index)
            load_index += 1

    def load_services(self,dataset_id,i3config,tray_index):
        """
        Load services into the database.
        @param dataset_id: primary key for run on config db
        """
        load_index = 0
        for service in i3config.GetServices():
            self.load_service(service,dataset_id,load_index,tray_index,i3config)
            self.load_params(service,dataset_id,tray_index)
            load_index += 1

    def load_module(self,module,dataset_id,load_index,tray_index,i3config,type='module',iptype='tray'):
        """
        Load individual module into the database given a run ID.
        @param module: the module to be loaded
        @param dataset_id: primary key for run on config db
        @param load_index: order in which module should be loaded
        @param tray_index: tray instance to add module to
        @param i3config: Steering instance
        @param type: module,service,iceprod
        @param iptype: one of tray,pre,post. Serves to distinguish pre and post modules
        @return: primary key for modules table on config db
        """
        cursor = self.getcursor()
        if type == 'iceprod':
            pname = 'iceprod'
            pver = iceprod.__version__
            pid = self.fetch_project_id(pname,pver)
            if not pid:
                vt = ('00','00','00')
                legacy_ver = self.version_regex.search(pver)
                if legacy_ver:
                    legacy_ver = legacy_ver.group(0).replace('V','')
                    vt = legacy_ver.split('-')

                sql = " INSERT INTO project "
                sql += " (name, versiontxt,major_version,minor_version,patch_version) "
                sql += " VALUES ('%s','%s','%s','%s','%s')" % (pname,pver,vt[0],vt[1],vt[2])
                cursor.execute(sql)
                pid = self.insert_id()

            self.logger.debug("load_module: %s " % pid)
        else:
            if not module.GetProjectList():
                self.logger.error("module %s doesn't have project attribute" % module.GetName())
                raise Exception, "module %s is missing parent project" % module.GetName()
            project = module.GetProjectList()[0]
            project = i3config.GetProject(project.GetName())
            pid = project.GetId()
            self.logger.debug("load_module: %s " % pid)

        self.logger.debug('fetching %s module for project id %s' % (type,pid))
        mid = self.fetch_module_id(module,pid,type)
        self.logger.debug('fetched %s module with id %s' % (type,mid))
        if not mid:
            sql = " INSERT INTO module "
            sql += " (name,class,module_type,project_id) "
            sql += " VALUES (\'%s\',\'%s\',\'%s\',%d) " \
                   % (module.GetName(),module.GetClass(),type,pid)

            self.logger.debug(sql.strip())
            cursor.execute(sql.strip())
            mid = self.insert_id()
            if cursor.rowcount:
                self.logger.debug("inserted %s id %d" % (type,mid))
            else:
                self.logger.debug("failed to insert %s id %d" % (type,mid))
        self.load_dependencies(module,mid,i3config)
        sql = " INSERT INTO module_pivot "
        sql += " (module_id, name, dataset_id,tray_index,load_index,iptype) "
        sql += " VALUES (%d,'%s',%d,%d,%d,'%s') " % (mid,module.GetName(),
                dataset_id,tray_index,load_index,iptype)
        self.logger.debug(sql.strip())
        cursor.execute(sql.strip())
        mpid = self.insert_id()
        module.SetId(mpid)
        return mpid

    def fetch_module_id(self,module,project_id,type='module'):
        """
        Retrieve id for module with matching name and project_id
        (there should only be one)
        @param module: module to query
        @param project_id: primary key of parent project
        @param type: ('module'|'service')
        """
        sql = " SELECT module_id FROM module "
        sql += " WHERE class ='%s' " % module.GetClass()
        sql += " AND module_type ='%s' " % type
        sql += " AND project_id =%d " % project_id

        self.logger.debug(sql.strip())
        cursor = self.getcursor()
        cursor.execute(sql.strip())
        result = cursor.fetchone()
        self.logger.debug(str(result))
        if result:
            return int(result['module_id'])
        else:
            self.logger.warn("module \'%s\' not found" % module.GetClass())
            return

    def fetch_service_id(self,service,project_id):
        """
        Retrieve id for service with matching name and project_id
        (there should only be one)
        @param service: service to query
        @param project_id: primary key of parent project
        """
        return self.fetch_module_id(service,project_id,'service')

    def load_service(self,service,dataset_id,load_index,tray_index,i3config):
        """
        Load individual service into the database given a run ID.
        @param service: the Service object to be loaded
        @param dataset_id: primary key for run on config db
        @return: primary key for services table on config db
        """
        return self.load_module(service,dataset_id,load_index,tray_index,i3config,type='service')

    def insert_omkey(self,omkey,pid):
        """
        Add OMKey object
        @param omkey: OMKey
        @param pid: configured parameter id or cparameter_id
        """
        cursor = self.getcursor()
        sql = " INSERT INTO carray_element (name,value,cparameter_id) "
        sql += " VALUES "
        sql += " ('%s','%s',%s), " % ('stringid',omkey.stringid,pid)
        sql += " ('%s','%s',%s) " % ('omid',omkey.omid,pid)
        cursor.execute(sql.strip())

    def insert_omkey_array(self,omkeyvect,pid):
        """
        Add array of OMKey objects
        @param omkeyvect: list of OMKeys
        @param pid: configured parameter id or cparameter_id
        """
        if not len(omkeyvect) > 0: return
        cursor = self.getcursor()
        sql = " INSERT INTO carray_element (name,value,cparameter_id) "
        sql += " VALUES "
        cm = ""
        for omkey in omkeyvect:
            sql += cm
            sql += " ('%s','%s',%s), " % ('stringid',omkey.stringid,pid)
            sql += " ('%s','%s',%s) " % ('omid',omkey.omid,pid)
            cm = ","
        cursor.execute(sql.strip())

    def insert_array(self,values,pid):
        """
        Add value array
        @param values: list of array elements
        @param pid: configured parameter id or cparameter_id
        """
        cursor = self.getcursor()
        if not len(values) > 0: return
        sql = " INSERT INTO carray_element (value,unit,cparameter_id) "
        sformat = lambda x: "('%s',%s,%s)" % (x.value,self.nullify(x.unit),pid)
        vals = ",".join(map(sformat,values))
        sql += " VALUES " + vals
        cursor.execute(sql.strip())
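
    # Vector-valued parameters are flattened into carray_element rows keyed
    # by cparameter_id. An OMKey(21,30), for instance, becomes two rows:
    #
    #   ('stringid','21',pid)
    #   ('omid','30',pid)
    #
    # select_omkey_array() further below reassembles them in insertion order.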

    def load_params(self,module,dataset_id,tray_index):
        """
        Load parameters into the database.
        @param module: module whose parameters are to be loaded to database
        """
        cursor = self.getcursor()
        sql = " INSERT INTO cparameter "
        sql += " (name,type,unit,module_pivot_id,dataset_id,tray_index,value) "
        sql += " VALUES "
        count = 0

        m_id = module.GetId()
        self.logger.debug('load_params: mid = %s' % m_id)

        if not module.GetParameters():
            return
        for p in module.GetParameters():
            name = p.GetName()
            type = p.GetType()
            desc = p.GetDescription()

            if type == 'OMKey' or type in VectorTypes:
                value = 0
                unit = 'NULL'
            else:
                value = p.GetValue().value
                unit = self.nullify(p.GetValue().unit)
            sql1 = sql + " ('%s','%s',%s,%d,%d,%d,'%s') " % \
                   (name,type,unit,m_id,dataset_id,tray_index,value)
            self.logger.debug(sql1.strip())
            cursor.execute(sql1.strip())
            pid = self.insert_id()
            p.SetId(pid)
            count = count + cursor.rowcount

            if type == 'OMKey':
                self.insert_omkey(p.GetValue(),pid)
            elif type == 'OMKeyv':
                self.insert_omkey_array(p.GetValue(),pid)
            elif type in VectorTypes:
                self.insert_array(p.GetValue(),pid)

        self.logger.debug("%d cparameter rows were inserted" % count)

    def download_datasets(self,search_string=None):
        """
        Download dataset briefs from database
        @return: resultset from database
        """
        sql = " SELECT * FROM dataset "
        if search_string and len(search_string):
            sql += " WHERE username LIKE '%%%s%%' " % search_string
            sql += " OR hostname LIKE '%%%s%%' " % search_string
            sql += " OR description LIKE '%%%s%%' " % search_string
            sql += " OR startdate LIKE '%%%s%%' " % search_string
            for token in search_string.split():
                try:
                    sql += " OR dataset_id = %d " % int(token)
                except: pass
        sql += " ORDER BY dataset_id DESC "

        cursor = self.getcursor()
        cursor.execute(sql)
        result_set = cursor.fetchall()
        return result_set

    def download_config(self,dataset_id,include_defaults=False,include_description=False):
        """
        Download icetray configuration from database
        @param dataset_id: ID of the run whose configuration we wish to download
        @return: Steering object containing the IceTray configuration
        """
        steering = self.download_steering(dataset_id)
        category = self.getsimcat(dataset_id)
        steering.SetCategory(category)
        self.download_steering_dependencies(dataset_id,steering)
        self.download_steering_statistics(dataset_id,steering)
        self.download_job_dependencies(dataset_id,steering)
        self.download_tasks(dataset_id,steering)
        self.download_batch_options(dataset_id,steering)
        self.download_externals(dataset_id,steering)

        sql = "SELECT * FROM dataset WHERE dataset_id = %d" % dataset_id
        cursor = self.getcursor()
        cursor.execute(sql)
        result = cursor.fetchone()
        if result:
            steering.SetDescription(result['description'])
            steering.SetParentId(result['dataset_id'])

        sql = "SELECT * FROM tray WHERE dataset_id = %d" % dataset_id
        cursor = self.getcursor()
        cursor.execute(sql)
        trayitems = cursor.fetchall()
        for tray in trayitems:
            i3config = IceTrayConfig()

            tray_id = tray['tray_id']
            tray_index = tray['tray_index']
            i3config.SetEvents(tray['inputevents'])
            i3config.SetIterations(tray['iterations'])
            i3config.SetName(self.nonify(tray['name']))
            i3config.SetPython(self.nonify(tray['python']))

            funcs = {'input': i3config.AddInputFile,
                     'output': i3config.AddOutputFile}
            fsql = "SELECT type, name, photonics FROM tray_files WHERE tray_id = %s"
            cursor.execute(fsql, (tray_id,))
            files = cursor.fetchall()
            for file in files:
                type = file['type'].lower()
                obj = IceTrayFile(file['name'], file["photonics"])
                func = funcs[type]
                func(obj)

            self.download_metaprojects(dataset_id,tray_index,i3config)
            self.download_projects(dataset_id,tray_index,i3config)
            self.download_pre(dataset_id,tray_index,i3config,include_defaults,include_description)
            self.download_services(dataset_id,tray_index,i3config,include_defaults,include_description)
            self.download_modules(dataset_id,tray_index,i3config,include_defaults=include_defaults,include_description=include_description)
            self.download_connections(dataset_id,tray_index,i3config)
            self.download_post(dataset_id,tray_index,i3config,include_defaults,include_description)
            steering.AddTray(i3config)

        self.commit()
        return steering
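
    # Usage sketch, assuming an authenticated ConfigDB instance `db`:
    #
    #   steering = db.download_config(1234, include_defaults=True)
    #   for tray in steering.GetTrays():
    #       print tray.GetName()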

    def download_metaprojects(self,dataset_id,tray_index,i3config):
        """
        Download metaprojects from database
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        sql = " SELECT metaproject.* "
        sql += " FROM metaproject,metaproject_pivot "
        sql += " WHERE metaproject_pivot.dataset_id = %d " % dataset_id
        sql += " AND metaproject.metaproject_id = "
        sql += " metaproject_pivot.metaproject_id "
        sql += " AND metaproject_pivot.tray_index = %d " % tray_index
        sql += " ORDER BY load_index "

        cursor = self.getcursor()
        self.logger.debug(sql)
        cursor.execute(sql.strip())
        result_set = cursor.fetchall()
        for mp in result_set:
            mpname = mp['name']
            mpver = mp['versiontxt']
            mp_id = mp['metaproject_id']
            mproject = MetaProject()
            mproject.SetId(mp_id)
            mproject.SetName(mpname)
            mproject.SetVersion(mpver)

            self.logger.debug("downloaded metaproject %s with id %d" % \
                              (mproject.GetName(),mproject.GetId()))

            i3config.AddMetaProject(mproject.GetName(),mproject)
            self.metaproject_dict[mp_id] = mproject

    def download_externals(self,dataset_id,steering):
        """
        Download external programs to run prior to icetray
        @param dataset_id: primary key for run on config db
        """
        self.logger.debug("downloading externals")

        cursor = self.getcursor()

        sql = "SELECT * FROM extern "
        sql += "WHERE dataset_id = %d " % dataset_id
        cursor.execute(sql.strip())
        result_set = cursor.fetchall()
        for e in result_set:
            extern = Extern()
            extern.SetName(self.nonify(e['name']))
            extern.SetVersion(self.nonify(e['version']))
            extern.SetExec(self.nonify(e['command']))
            extern.SetDescription(self.nonify(e['description']))
            extern.SetArgs(self.nonify(e['arguments']))
            extern.SetInFile(self.nonify(e['infile']))
            extern.SetOutFile(self.nonify(e['outfile']))
            extern.SetErrFile(self.nonify(e['errfile']))
            if self.nonify(e['steering_name']):
                es = ExternSteering()
                es.SetName(e['steering_name'])
                es.SetText(e['steering'])
                extern.AddSteering(es)
            steering.AddExtern(extern)

        return

    def fetch_dict_value(self,key):
        """
        Retrieve value stored in dictionary
        @param key: string key to dictionary entry
        """
        sql = " SELECT value FROM dictionary WHERE "
        sql += " keystring = '%s' " % key
        cursor = self.getcursor()
        cursor.execute(sql.strip())
        result_set = cursor.fetchone()
        if result_set:
            return result_set['value']
        else:
            return ''

    def fetch_file(self,key,dataset_id=None):
        """
        Retrieve a file entry by file number
        @param key: file number
        @param dataset_id: optional dataset to restrict the lookup to
        """
        sql = " SELECT * FROM file "
        sql += " WHERE file_number = %d " % key
        if dataset_id:
            sql += " AND dataset_id = %d " % dataset_id
        cursor = self.getcursor()
        cursor.execute(sql.strip())
        result_set = cursor.fetchone()
        if result_set:
            return result_set
        else:
            return ''

    def fetch_projects(self,dataset_id):
        """
        Download projects from database
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        projects = []

        sql = " SELECT metaproject_pivot.metaproject_id,project.*"
        sql += " FROM metaproject_pivot,project,project_pivot,mp_pivot"
        sql += " WHERE metaproject_pivot.dataset_id = %d " % dataset_id
        sql += " AND project.project_id = mp_pivot.project_id "
        sql += " AND project.project_id = project_pivot.project_id "
        sql += " AND metaproject_pivot.metaproject_id = mp_pivot.metaproject_id "
        sql += " AND project_pivot.dataset_id = metaproject_pivot.dataset_id "
        sql += " ORDER BY metaproject_pivot.load_index,project_pivot.load_index "
        cursor = self.getcursor()
        self.logger.debug(sql.strip())
        cursor.execute(sql.strip())
        result_set = cursor.fetchall()

        for p in result_set:
            pname = p['name']
            pver = p['versiontxt']
            mp_id = p['metaproject_id']
            project = Project()
            project.SetId(p['project_id'])
            project.SetName(pname)
            project.SetVersion(pver)
            self.logger.debug("downloaded project %s with id %d" % (project.GetName(),project.GetId()))
            projects.append(project)
        return projects

    def download_projects(self,dataset_id,tray_index,i3config):
        """
        Download projects from database
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        sql = " SELECT metaproject_pivot.metaproject_id,project.*"
        sql += " FROM metaproject_pivot,project,project_pivot,mp_pivot"
        sql += " WHERE metaproject_pivot.dataset_id = %d " % dataset_id
        sql += " AND project.project_id = mp_pivot.project_id "
        sql += " AND project.project_id = project_pivot.project_id "
        sql += " AND metaproject_pivot.metaproject_id = mp_pivot.metaproject_id "
        sql += " AND project_pivot.dataset_id = metaproject_pivot.dataset_id "
        sql += " AND metaproject_pivot.tray_index = project_pivot.tray_index "
        sql += " AND project_pivot.tray_index = %d " % tray_index
        sql += " ORDER BY metaproject_pivot.load_index,project_pivot.load_index "
        cursor = self.getcursor()
        self.logger.debug(sql.strip())
        cursor.execute(sql.strip())
        result_set = cursor.fetchall()

        for p in result_set:
            pname = p['name']
            pver = p['versiontxt']
            mp_id = p['metaproject_id']
            project = Project()
            project.SetId(p['project_id'])
            project.SetName(pname)
            project.SetVersion(pver)
            self.logger.debug("downloaded project %s with id %d" % (project.GetName(),project.GetId()))

            for d in self.fetch_project_dependencies(project.GetId(),mp_id):
                self.logger.debug(" %s - adding dependency %s" % (project.GetName(),d.GetName()))
                project.AddDependency(d.GetName())
            try:
                metaproject = self.metaproject_dict[mp_id]
                self.logger.debug("found metaproject %s with id %d" % (metaproject.GetName(),mp_id))
                metaproject.AddProject(pname,project)

                if not i3config.HasMetaProject(metaproject.GetName()):
                    i3config.AddMetaProject(metaproject.GetName(),metaproject)
                    self.logger.debug("adding metaproject - %s" % metaproject.GetName())

            except KeyError, k:
                self.logger.warn("could not find metaproject with id %d" % mp_id)
                self.logger.warn("Adding project to top-level container.")
                i3config.AddProject(pname,project)

    def fetch_project_dependencies(self,project_id,metaproject_id):
        """
        Retrieve dependencies for project
        @param project_id: id of project
        @return: array of Project objects
        """
        dependencies = []

        sql = """
             SELECT
                 project_depend.project_depend_id,
                 project.name,
                 project.project_id,
                 project.versiontxt
             FROM
                 project,project_depend
             WHERE
                 project.project_id = project_depend.dependency_id
             AND
                 project_depend.project_id = %d
             AND
                 project_depend.metaproject_id = %d
             ORDER BY
                 project_depend.project_depend_id
             """ % (project_id,metaproject_id)

        cursor = self.getcursor()
        sql = re.sub('\s+',' ',sql)
        self.logger.debug(sql)
        cursor.execute(sql)
        result_set = cursor.fetchall()

        for d in result_set:
            dependency = Project()
            dependency.SetName(d['name'])
            dependency.SetVersion(d['versiontxt'])
            dependency.SetId(d['project_depend_id'])
            dependencies.append(dependency)

        return dependencies

    def fetch_module_dependencies(self,module_id):
        """
        Retrieve dependencies for module
        @param module_id: id of module
        @return: array of Project objects
        """
        dependencies = []

        sql = """
             SELECT
                 module_dependency.module_dependency_id,
                 project.name,
                 project.project_id,
                 project.versiontxt
             FROM
                 project,module_dependency
             WHERE
                 project.project_id = module_dependency.project_id
             AND
                 module_dependency.module_id = %d
             ORDER BY
                 module_dependency.module_dependency_id """ % module_id

        cursor = self.getcursor()
        sql = re.sub('\s+',' ',sql)
        cursor.execute(sql)
        result_set = cursor.fetchall()

        for d in result_set:
            dependency = Project()
            dependency.SetName(d['name'])
            dependency.SetVersion(d['versiontxt'])
            dependency.SetId(d['project_id'])
            dependencies.append(dependency)

        return dependencies

    def download_connections(self,dataset_id,tray_index,i3config):
        """
        Download module connections from database
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        sql = " SELECT * FROM `connection` "
        sql += " WHERE dataset_id = %d " % dataset_id
        sql += " AND tray_index = %d " % tray_index
        cursor = self.getcursor()
        cursor.execute(sql)
        result_set = cursor.fetchall()
        for c in result_set:
            csource = c['source']
            coutbox = c['outbox']
            cdest = c['destination']
            cinbox = c['inbox']

            conn = Connection()
            conn.From(csource,coutbox)
            conn.To(cdest,cinbox)
            i3config.AddConnection(conn)

    def getsimcat(self,dataset_id):
        """
        Get simulation category
        @param dataset_id: dataset ID
        @return: category
        """
        self.logger.debug("retrieving simulation category")

        sql = """
             SELECT simcat.category from simcat,dataset
             WHERE simcat.simcat_id = dataset.simcat_id
             AND dataset.dataset_id = %d """ % dataset_id

        cursor = self.getcursor()
        cursor.execute(sql)
        result_set = cursor.fetchall()

        if len(result_set) > 0:
            return result_set[0]['category']

    def download_steering(self,dataset_id):
        """
        Get steering parameters from database
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        steering = Steering()
        sql = " SELECT * FROM steering_parameter "
        sql += " WHERE dataset_id = '%s'" % dataset_id
        sql += " ORDER by name "
        cursor = self.getcursor()
        cursor.execute(sql)
        result_set = cursor.fetchall()

        for p in result_set:
            param = Parameter()
            param.SetType(p['type'])
            param.SetName(p['name'])
            param.SetValue(p['value'])
            steering.AddParameter(param)
        return steering

    def fetch_steering_parameter(self,dataset_id,param):
        sql = " SELECT value FROM steering_parameter "
        sql += " WHERE name = '%s'" % param
        sql += " AND dataset_id = '%s'" % dataset_id
        cursor = self.getcursor()
        cursor.execute(sql)
        result = cursor.fetchone()
        if result:
            return result['value']
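
    # Example lookup of a single steering parameter (the method name above is
    # reconstructed, so treat it as an assumption):
    #
    #   nevents = db.fetch_steering_parameter(1234, 'nevents')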

    def download_steering_dependencies(self,dataset_id,steering):
        """
        Get steering dependencies from database
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        sql = "SELECT * FROM steering_dependency WHERE dataset_id = '%s'" % dataset_id
        cursor = self.getcursor()
        cursor.execute(sql)
        result_set = cursor.fetchall()

        for p in result_set:
            steering.AddDependency(p['filename'])

    def download_steering_statistics(self,dataset_id,steering):
        """
        Get statistics that are to be tracked in database
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        sql = "SELECT * FROM dataset_statistics WHERE dataset_id = '%s'" % dataset_id
        cursor = self.getcursor()
        cursor.execute(sql)
        result_set = cursor.fetchall()

        for p in result_set:
            steering.AddStatistic(p['name'])

    def download_job_dependencies(self,dataset_id,steering):
        """
        Get job dependency rules
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        sql = "SELECT * FROM job_dependency WHERE dataset_id = %s "
        self.logger.debug(sql % (dataset_id,))
        cursor = self.getcursor()
        cursor.execute(sql, (dataset_id,))
        results = cursor.fetchall()
        for row in results:
            steering.AddJobDependency(row['input_dataset'],row['input_job'])
        return

    def download_tasks(self,dataset_id,steering):
        """
        Get task definitions (job parts) from database
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        sql = "SELECT task_def_id,name,reqs,opts,parallel,photonics,grids FROM task_def" \
              + " WHERE dataset_id = %s ORDER BY task_def_id"
        cursor = self.getcursor()
        cursor.execute(sql, (dataset_id,))
        results = cursor.fetchall()

        tray_sql = "SELECT idx,CONVERT(GROUP_CONCAT(iter),char) AS iters" \
                   + " FROM task_def_tray WHERE task_def_id = %s" \
                   + " GROUP BY idx,task_def_id"
        parent_sql = "SELECT name FROM task_def,task_def_rel" \
                     + " WHERE child_task_def_id = %s" \
                     + " AND parent_task_def_id = task_def_id"
        child_sql = "SELECT name FROM task_def,task_def_rel" \
                    + " WHERE parent_task_def_id = %s" \
                    + " AND child_task_def_id = task_def_id"

        for row in results:
            id = row['task_def_id']
            name = row['name']
            reqs = row['reqs']
            opts = row['opts']
            parallel = row['parallel']
            photonics = row['photonics']
            grids = row['grids']

            td = TaskDefinition(name,id)
            td.SetRequirements(reqs)
            if opts: td.SetBatchOpts(opts)
            td.SetParallelExecution(parallel)
            td.SetUsesPhotonics(photonics)

            self.logger.debug(tray_sql % id)
            cursor.execute(tray_sql, (id,))
            trays = cursor.fetchall()
            for tray in trays:
                td.AddTray(tray['idx'], tray['iters'])

            cursor.execute(parent_sql, (id,))
            parents = cursor.fetchall()
            for parent in parents:
                td.AddParent(parent['name'])

            cursor.execute(child_sql, (id,))
            children = cursor.fetchall()
            for child in children:
                td.AddChild(child['name'])

            steering.AddTaskDefinition(td)

    def fetch_batch_options(self,dataset_id,steering):
        """
        Fetch batch system options from database
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        batchopts = []
        sql = "SELECT * FROM batch_option WHERE dataset_id = '%s'" % dataset_id
        cursor = self.getcursor()
        cursor.execute(sql)
        result_set = cursor.fetchall()
        for b in result_set:
            opt = BatchOpt()
            opt.SetName(b['name'])
            opt.SetType(b['type'])
            opt.SetValue(b['value'])
            batchopts.append(opt)
            steering.AddBatchOpt(opt)
        return batchopts

    def download_batch_options(self,dataset_id,steering):
        """
        Get batch system options from database
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        sql = "SELECT * FROM batch_option WHERE dataset_id = '%s'" % dataset_id
        cursor = self.getcursor()
        cursor.execute(sql)
        result_set = cursor.fetchall()

        for b in result_set:
            opt = BatchOpt()
            opt.SetName(b['name'])
            opt.SetType(b['type'])
            opt.SetValue(b['value'])
            steering.AddBatchOpt(opt)

    def download_modules(self,dataset_id,
                         tray_index,
                         i3config,
                         type='module',
                         iptype='tray',
                         include_defaults=False,
                         include_description=False):
        """
        Get modules from the database.
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        sql = " SELECT module.class,module.module_id,module_pivot.module_pivot_id,"
        sql += " module_pivot.name,module_pivot.load_index "
        sql += " FROM module,module_pivot "
        sql += " WHERE module_pivot.dataset_id = %d " % dataset_id
        sql += " AND module.module_type = '%s' " % type
        sql += " AND module_pivot.iptype = '%s' " % iptype
        sql += " AND module.module_id = module_pivot.module_id "
        sql += " AND module_pivot.tray_index = %d " % tray_index
        sql += " ORDER BY load_index "

        cursor = self.getcursor()
        self.logger.debug(sql.strip())
        cursor.execute(sql.strip())
        result_set = cursor.fetchall()
        for m in result_set:
            mod = Service()

            mod.SetClass(m['class'])
            mod.SetName(m['name'])
            mod.SetId(m['module_id'])
            module_pivot_id = m['module_pivot_id']
            if type == 'module':
                i3config.AddModule(mod)
            elif type == 'service':
                i3config.AddService(mod)
            elif type == 'iceprod':
                if iptype == 'pre':
                    i3config.AddIceProdPre(mod)
                elif iptype == 'post':
                    i3config.AddIceProdPost(mod)

            if type in ['module','service']:
                for p in self.fetch_module_dependencies(mod.GetId()):
                    project = i3config.GetProject(p.GetName())
                    if not project == None:
                        mod.AddProject(project.GetName(),project)
                    else:
                        self.logger.warn('could not find dependency \'%s\'' % p.GetName())
                        i3config.AddProject(p.GetName(),p)
                        mod.AddProject(p.GetName(),p)

            self.download_params(mod,module_pivot_id,dataset_id,include_defaults,include_description)

    def download_services(self,dataset_id,tray_index,i3config,
                          include_defaults=False,include_description=False):
        """
        Download services from the database.
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        return self.download_modules(dataset_id,
                                     tray_index,
                                     i3config,
                                     type='service',
                                     include_defaults=include_defaults,
                                     include_description=include_description)

    def download_pre(self,dataset_id,tray_index,i3config,
                     include_defaults=False,include_description=False):
        """
        Download IceProdPre modules from the database.
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        return self.download_modules(dataset_id,
                                     tray_index,
                                     i3config,
                                     type='iceprod',
                                     iptype='pre',
                                     include_defaults=include_defaults,
                                     include_description=include_description)

    def download_post(self,dataset_id,tray_index,i3config,
                      include_defaults=False,include_description=False):
        """
        Download IceProdPost modules from the database.
        @param dataset_id: ID of the run whose configuration we wish to download
        """
        return self.download_modules(dataset_id,
                                     tray_index,
                                     i3config,
                                     type='iceprod',
                                     iptype='post',
                                     include_defaults=include_defaults,
                                     include_description=include_description)
1963 cursor = self.getcursor()
1964 sql = " SELECT * from carray_element "
1965 sql += " WHERE cparameter_id = %d " % pid
1966 cursor.execute (sql.strip())
1967 result_set = cursor.fetchall();
1968 vect = []
1969 for item in result_set:
1970 vect.append(Value(item['value'],self.nonify(item['unit'])))
1971 return vect
1972
1973 - def select_omkey(self,pid):
1974 omkeys = self.select_omkey_array(pid)
1975 if len(omkeys) < 1:
1976 raise Exception,'could not find omkey for param %d' % pid
1977 return omkeys[0]
1978
1979 - def select_omkey_array(self,pid):
1980 cursor = self.getcursor()
1981 sql = " SELECT * from carray_element "
1982 sql += " WHERE cparameter_id = %d order by carray_element_id" % pid
1983 cursor.execute (sql.strip())
1984 result_set = cursor.fetchall();
1985 omkeyvect = []
1986 for item in result_set:
1987 if item['name'] == 'stringid':
1988 omkey = pyOMKey(0,0)
1989 omkey.stringid = item['value']
1990 elif item['name'] == 'omid':
1991 omkey.omid = item['value']
1992 omkeyvect.append(omkey)
1993 else:
1994 raise Exception,'expected omkey but found %s' % item['name']
1995 return omkeyvect
1996
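# Storage note (annotation): an OMKey vector is flattened into carray_element
# rows as alternating name/value pairs, ordered by carray_element_id, e.g.
#
#   cparameter_id | name     | value
#   --------------+----------+------
#            4711 | stringid | 21
#            4711 | omid     | 30
#            4711 | stringid | 21
#            4711 | omid     | 31
#
# select_omkey_array() opens a new pyOMKey on each 'stringid' row and appends
# it to the result once the matching 'omid' row arrives. (cparameter_id 4711
# is a made-up example value.)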
1997 - def download_params(self,module,mod_id,dataset_id,include_defaults=False, include_description=False):
1998 """
1999 Download module parameters from the database.
2000 @param mod_id: index corresponding to module table
2001 """
2002 paramdict = {}
2003
2004 if include_defaults or include_description:
2005 sql = " SELECT * FROM parameter "
2006 sql += " WHERE module_id = %d" % module.GetId()
2007 sql += " ORDER BY name "
2008 cursor = self.getcursor()
2009 cursor.execute(sql)
2010 result_set = cursor.fetchall();
2011 for p in result_set:
2012 param = Parameter()
2013 param.SetType(p['type'])
2014 param.SetName(p['name'])
2015 pid = p['parameter_id']
2016 if param.GetType() == 'OMKeyv':
2017 param.SetValue(self.select_omkey_array(pid))
2018 elif param.GetType() == 'OMKey':
2019 param.SetValue(self.select_omkey(pid))
2020 elif param.GetType() in VectorTypes:
2021 param.SetValue(self.select_array(pid))
2022 else:
2023 param.SetValue(Value(p['value'],self.nonify(p['unit'])))
2024 param.SetUnit(self.nonify(p['unit']))
2025 param.SetDefault(True)
2026 param.SetDefault(param.GetValue())
2027 paramdict[param.GetName().lower()] = param
2028 if include_description:
2029 param.SetDescription(p['description'])
2030
2031
2032
2033 sql = " SELECT * FROM cparameter "
2034 sql += " WHERE module_pivot_id = '%s'" % mod_id
2035 sql += " AND dataset_id = '%s'" % dataset_id
2036 sql += " ORDER BY name "
2037 cursor = self.getcursor()
2038 cursor.execute(sql)
2039 result_set = cursor.fetchall();
2040
2041 for p in result_set:
2042 param = Parameter()
2043 param.SetType(p['type'])
2044 param.SetName(p['name'])
2045 pid = p['cparameter_id']
2046 if param.GetType() == 'OMKeyv':
2047 param.SetValue(self.select_omkey_array(pid))
2048 elif param.GetType() == 'OMKey':
2049 param.SetValue(self.select_omkey(pid))
2050 elif param.GetType() in VectorTypes:
2051 param.SetValue(self.select_array(pid))
2052 else:
2053 param.SetValue(Value(p['value'],self.nonify(p['unit'])))
2054 param.SetUnit(self.nonify(p['unit']))
2055 if paramdict.has_key(param.GetName().lower()):
2056 param.SetDefault(paramdict[param.GetName().lower()].GetDefault())
2057 paramdict[param.GetName().lower()] = param
2058
2059
2060 for param in paramdict.values():
2061 module.AddParameter(param)
2062
2063
2064
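# Merge semantics (annotation): download_params() builds one dictionary in
# two passes. Defaults from `parameter` are loaded first (only when
# include_defaults or include_description is set), then dataset-specific
# values from `cparameter` overwrite them, keyed by lower-cased name:
#
#   paramdict[param.GetName().lower()] = param   # pass 1: class defaults
#   paramdict[param.GetName().lower()] = param   # pass 2: configured override
#
# so a configured value always wins, while its recorded default (if any) is
# carried over via SetDefault().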
2066 """
2067 Check latest version of software and see if we need to upgrade
2068 """
2069 ver = iceprod.__version__
2070 ver = ver.replace('V','').split('-')
2071 if len(ver) < 3: return None
2072
2073 if not self.isconnected(): self.connect()
2074 cursor = self.getcursor()
2075 sql = " SELECT * FROM svn "
2076 sql += " WHERE major='%s' " % ver[0]
2077 sql += " AND minor='%s' " % ver[1]
2078 cursor.execute(sql)
2079 result = cursor.fetchone()
2080 try:
2081 if int(ver[2]) < int(result['patch']):
2082 return result['url']
2083 except: pass
2084 return None
2085
2086
2087 - def validate(self,dataset_id,status='TRUE'):
2088 """
2089 Mark dataset as visible and valid.
2090 """
2091 cursor = self.getcursor()
2092 sql = " UPDATE dataset SET verified = '%s' " % status
2093 sql += " WHERE dataset_id = %d " % dataset_id
2094 self.logger.debug(sql)
2095 cursor.execute(sql)
2096 self.commit()
2097
2098
2100
2101 cursor = self.getcursor()
2102 dp = DIF_Plus()
2103
2104 sql = " SELECT * FROM dif "
2105 sql += " WHERE dataset_id = %d " % dataset_id
2106 cursor.execute(sql)
2107 row = cursor.fetchone();
2108 if not row: return dp
2109 dif = dp.GetDIF()
2110 dif.SetParameters(row['parameters'])
2111 dif.SetEntryTitle(row['entry_title'])
2112 dif.SetSummary(row['summary'])
2113 dif.SetSourceName(row['source_name'])
2114 dif.SetSensorName(row['sensorname'])
2115 td = time.strptime(str(row['dif_creation_date']),"%Y-%m-%d %H:%M:%S")
2116 td = time.strftime("%Y-%m-%d",td)
2117 dif.SetDIFCreationDate(td)
2118
2119 sql = " SELECT * FROM plus "
2120 sql += " WHERE dataset_id = %d " % dataset_id
2121 cursor.execute(sql)
2122 row = cursor.fetchone();
2123 plus = dp.GetPlus()
2124 ts = time.strptime(str(row['start_datetime']),"%Y-%m-%d %H:%M:%S")
2125 ts = time.strftime("%Y-%m-%dT%H:%M:%S",ts)
2126 plus.SetStartDatetime(ts)
2127
2128 te = time.strptime(str(row['end_datetime']),"%Y-%m-%d %H:%M:%S")
2129 te = time.strftime("%Y-%m-%dT%H:%M:%S",te)
2130 plus.SetEndDatetime(te)
2131 plus.SetCategory(row['category'])
2132 plus.SetSubCategory(row['subcategory'])
2133 plus.SetSimDBKey(dataset_id)
2134 plus.SetI3DBKey(row['i3db_key'])
2135 plus.SetSteeringFile(row['steering_file'])
2136 for project in self.fetch_project_list(dataset_id):
2137 plus.AddProject(project)
2138
2139 return dp
2140
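# Datetime handling (annotation): MySQL DATETIME columns come back as
# 'YYYY-MM-DD HH:MM:SS'; the Plus block wants an ISO-8601 'T' separator,
# hence the strptime/strftime round trips above. In isolation:
#
#   import time
#   ts = time.strptime('2008-06-01 12:00:00', "%Y-%m-%d %H:%M:%S")
#   time.strftime("%Y-%m-%dT%H:%M:%S", ts)    # -> '2008-06-01T12:00:00'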
2142
2143 cursor = self.getcursor()
2144
2145 dif = difplus.GetDIF()
2146 sql = " INSERT INTO dif "
2147 sql += " (dataset_id,parameters,entry_title,summary, "
2148 sql += " source_name,sensorname,dif_creation_date) "
2149 sql += " VALUES ( %d, " % dataset_id
2150 sql += "'%s'," % str(dif.GetParameters())
2151 sql += "'%s'," % str(dif.GetEntryTitle())
2152 sql += "'%s'," % str(dif.GetSummary())
2153 sql += "'%s'," % str(dif.GetSourceName())
2154 sql += "'%s'," % str(dif.GetSensorName())
2155 sql += "'%s')" % str(dif.GetDIFCreationDate())
2156 self.logger.debug(sql)
2157 cursor.execute(sql)
2158
2159 plus = difplus.GetPlus()
2160 sql = " INSERT INTO plus "
2161 sql += " (dataset_id,start_datetime,end_datetime,"
2162 sql += " category,subcategory,i3db_key,steering_file) "
2163 sql += " VALUES ( %d, " % dataset_id
2164 sql += "'%s'," % str(plus.GetStartDatetime())
2165 sql += "'%s'," % str(plus.GetEndDatetime())
2166 sql += "'%s'," % str(plus.GetCategory())
2167 sql += "'%s'," % str(plus.GetSubCategory())
2168 sql += "%s," % (plus.GetI3DBKey() or 'NULL')
2169 sql += "'%s')" % str(plus.GetSteeringFile())
2170 self.logger.debug(sql)
2171 cursor.execute(sql)
2172
2173 self.commit()
2174
2176 """
2177 Change Plus:subcategory in DIFPlus metadata
2178 """
2179 cursor = self.getcursor()
2180 sql = " UPDATE plus "
2181 sql += " SET subcategory='%s' " % sub_cat
2182 sql += " WHERE dataset_id=%d " % dataset_id
2183 self.logger.debug(sql)
2184 cursor.execute(sql)
2185 self.commit()
2186
2187
2189 """
2190 load a list of files for filtering.
2191 @param dataset_id: the dataset id that the dictionary is bound to
2192 @param odict: the dictionary to load
2193 """
2194 cursor = self.getcursor()
2195 index = 0
2196 list = odict.keys()[
2197 min(index,len(odict.keys())):
2198 min(index+100,len(odict.keys()))]
2199 while list:
2200 sql = " INSERT INTO file (file_key,path,subdir,filename,dataset_id) "
2201 sql += " VALUES "
2202 cm = ''
2203 for key in list:
2204 file = odict[key]
2205 sql += " %s ( %d, '%s', '%s', '%s', %d ) " % \
2206 (cm,key,file[0],file[1],file[2],dataset_id)
2207 cm = ','
2208 index = index + 100
2209 list = odict.keys()[
2210 min(index,len(odict.keys())):
2211 min(index+100,len(odict.keys()))]
2212 self.logger.debug(sql)
2213 cursor.execute(sql)
2214 self.commit()
2215
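# Chunking sketch (annotation): the loop above walks odict 100 keys at a
# time and emits one multi-row INSERT per slice to keep statement size
# bounded. The same pattern in isolation:
#
#   keys = odict.keys()
#   for start in range(0, len(keys), 100):
#       batch = keys[start:start+100]
#       # build "INSERT ... VALUES (...),(...),..." from batch,
#       # then execute and commit once per batch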
2217 """
2218 Get best matching tarball in file repository
2219 @param metaproject_name
2220 @param metaproject_version
2221 @param platform
2222 @param gccversion
2223 @param ppc: boolean (get PPC version of tarball)
2224 @return: string path to tarball
2225 """
2226 cursor = self.getcursor()
2227
2228 sql = " SELECT * FROM metaproject_tarball mt "
2229 sql += " JOIN metaproject m ON m.metaproject_id = mt.metaproject_id "
2230 sql += " WHERE m.name = '%s' " % metaproject_name
2231 sql += " AND m.versiontxt = '%s' " % metaproject_version
2232 sql += " AND mt.platform = '%s' " % platform
2233 sql += " AND mt.gcc <= '%s' " % gccversion
2234 sql += " ORDER BY mt.gcc DESC "
2235
2236 self.logger.debug(sql)
2237 cursor.execute(sql)
2238 row = cursor.fetchone();
2239 if row:
2240 return row['relpath']
2241 else:
2242 return ''
2243
2244
2245
2246 - class MonitorDB(IceProdDB):
2247
2248 logger = logging.getLogger('MonitorDB')
2249
2250 - def __init__(self):
2251 """
2252 Constructor
2253 """
2254 IceProdDB.__init__(self)
2255 self.maxsuspend = 20
2256
2257 - def new(self):
2258 """
2259 Create a copy of this instance
2260 """
2261 newconn = MonitorDB()
2262 newconn.host_ = self.host_
2263 newconn.usr_ = self.usr_
2264 newconn.passwd_ = self.passwd_
2265 newconn.db_ = self.db_
2266 newconn._connected = False
2267 return newconn
2268
2269
2270 - def reset_old_jobs(self,
2271 grid_id,
2272 maxidletime,
2273 maxruntime,
2274 maxsubmittime,
2275 maxcopytime,
2276 maxfailures=10,
2277 maxevicttime=10,
2278 keepalive=14400):
2279 """
2280 reset status of jobs that were queued but whose status
2281 has not changed in more than maxtime minutes
2282
2283 @param grid_id: id of current cluster
2284 @param maxruntime: maximum run time for jobs
2285 @param maxsubmittime: maximum submit time for jobs
2286 @param maxcopytime: maximum time for jobs to be in 'copying' state
2287 @param maxfailures: maximum number of times a job is allowed to fail
2288 @param keepalive: how often should server expect to hear from jobs
2289 """
2290
2291 totalrows = 0
2292 cursor = self.getcursor()
2293 passkey = self.mkkey(6,9)
2294
2295
2296 self.logger.debug("resetting stale queued jobs...")
2297 sql = " UPDATE job SET "
2298 sql += " errormessage=CONCAT( "
2299 sql += " '%s: max. allowed time reached for state ', " % os.uname()[1].split('.')[0]
2300 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), "
2301 sql += " prev_state = status, "
2302 sql += " status='RESET', "
2303 sql += " passkey='%s', " % passkey
2304 sql += " status_changed=NOW() "
2305 sql += " WHERE grid_id=%d " % grid_id
2306 sql += " AND status='QUEUED' "
2307 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % maxidletime
2308 sql += " LIMIT 20 "
2309 self.logger.debug(sql)
2310 cursor.execute(sql)
2311 rowcount = self._conn.affected_rows()
2312 totalrows += rowcount
2313 self.commit()
2314 self.logger.debug('Reset %d queued jobs' % rowcount)
2315
2316
2317 self.logger.debug("resetting stale queueing jobs...")
2318 sql = " UPDATE job SET "
2319 sql += " errormessage=CONCAT( "
2320 sql += " '%s: max. allowed time reached for state ', " % os.uname()[1].split('.')[0]
2321 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), "
2322 sql += " prev_state = status, "
2323 sql += " status='RESET', "
2324 sql += " passkey='%s', " % passkey
2325 sql += " status_changed=NOW() "
2326 sql += " WHERE grid_id=%d " % grid_id
2327 sql += " AND status='QUEUEING' "
2328 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % maxsubmittime
2329 sql += " LIMIT 20 "
2330 cursor.execute(sql)
2331 rowcount = self._conn.affected_rows()
2332 totalrows += rowcount
2333 self.commit()
2334 self.logger.debug('Reset %d queueing jobs' % rowcount)
2335
2336
2337 self.logger.debug("resetting stale cleaning jobs...")
2338 sql = " UPDATE job SET "
2339 sql += " errormessage=CONCAT( "
2340 sql += " '%s: max. allowed time reached for state ', " % os.uname()[1].split('.')[0]
2341 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), "
2342 sql += " status=prev_state, "
2343 sql += " prev_state = 'CLEANING', "
2344 sql += " passkey='%s', " % passkey
2345 sql += " status_changed=NOW() "
2346 sql += " WHERE grid_id=%d " % grid_id
2347 sql += " AND status='CLEANING' "
2348 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % maxsubmittime
2349 sql += " LIMIT 20 "
2350 cursor.execute(sql)
2351 rowcount = self._conn.affected_rows()
2352 totalrows += rowcount
2353 self.commit()
2354 self.logger.debug('Reset %d cleaning jobs' % rowcount)
2355
2356
2357 self.logger.debug("resetting stale processing jobs...")
2358 sql = " UPDATE job SET "
2359 sql += " errormessage=CONCAT( "
2360 sql += " '%s: max. allowed time reached for state ', " % os.uname()[1].split('.')[0]
2361 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), "
2362 sql += " prev_state = status, "
2363 sql += " status='RESET', "
2364 sql += " passkey='%s', " % passkey
2365 sql += " status_changed=NOW() "
2366 sql += " WHERE grid_id=%d " % grid_id
2367 sql += " AND status='PROCESSING' "
2368 timeout = maxruntime
2369 if keepalive > 0:
2370 timeout = min(maxruntime,keepalive)
2371 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % timeout
2372 sql += " LIMIT 20 "
2373 cursor.execute(sql)
2374 rowcount = self._conn.affected_rows()
2375 totalrows += rowcount
2376 self.commit()
2377 self.logger.debug('Reset %d processing jobs' % rowcount)
2378
2379
2380 self.logger.debug("resetting evicted jobs...")
2381 sql = " UPDATE job SET "
2382 sql += " errormessage=CONCAT( "
2383 sql += " '%s: max. allowed time reached for state ', " % os.uname()[1].split('.')[0]
2384 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), "
2385 sql += " prev_state = status, "
2386 sql += " status='RESET', "
2387 sql += " passkey='%s', " % passkey
2388 sql += " status_changed=NOW() "
2389 sql += " WHERE grid_id=%d " % grid_id
2390 sql += " AND status='EVICTED' "
2391 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % maxevicttime
2392 sql += " LIMIT 20 "
2393 cursor.execute(sql)
2394 rowcount = self._conn.affected_rows()
2395 totalrows += rowcount
2396 self.commit()
2397 self.logger.debug('Reset %d evicted jobs' % rowcount)
2398
2399
2400 self.logger.debug("resetting stale copying jobs...")
2401 sql = " UPDATE job SET "
2402 sql += " errormessage=CONCAT( "
2403 sql += " '%s: max. allowed time reached for state ', " % os.uname()[1].split('.')[0]
2404 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), "
2405 sql += " prev_state = status, "
2406 sql += " status='RESET', "
2407 sql += " passkey='%s', " % passkey
2408 sql += " status_changed=NOW() "
2409 sql += " WHERE grid_id=%d " % grid_id
2410 sql += " AND status='COPYING' "
2411 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % maxcopytime
2412 sql += " LIMIT 100 "
2413 cursor.execute(sql)
2414 rowcount = self._conn.affected_rows()
2415 totalrows += rowcount
2416 self.commit()
2417 self.logger.debug('Reset %d copying jobs' % rowcount)
2418 self.logger.info('Reset %d jobs' % totalrows)
2419
2420
2421 self.logger.debug("suspending jobs with too many errors...")
2422 sql = " UPDATE job SET "
2423 sql += " errormessage=CONCAT('too many errors.',errormessage), "
2424 sql += " prev_state = status, "
2425 sql += " status='FAILED', "
2426 sql += " passkey='%s', " % passkey
2427 sql += " status_changed=NOW() "
2428 sql += " WHERE grid_id=%d " % grid_id
2429 sql += " AND status != 'SUSPENDED' "
2430 sql += " AND status != 'FAILED' "
2431 sql += " AND status != 'OK' "
2432 sql += " AND job.failures > %d " % maxfailures
2433 sql += " LIMIT 2000 "
2434 cursor.execute(sql)
2435 rowcount = self._conn.affected_rows()
2436 self.commit()
2437 if rowcount > 0:
2438 self.logger.info('Suspended %d jobs with too many errors' % rowcount)
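# State summary (annotation, derived from the UPDATEs above):
#
#   QUEUED/QUEUEING/PROCESSING/EVICTED/COPYING --(timeout)--> RESET
#   CLEANING --(timeout)--> prev_state
#   anything not SUSPENDED/FAILED/OK with failures > maxfailures --> FAILED
#
# "timeout" means NOW() > TIMESTAMPADD(MINUTE, <limit>, status_changed);
# every UPDATE carries a LIMIT so one pass cannot flood the queue, and
# prev_state preserves the state a job was reset from.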
2439
2441 """
2442 Get job parts from database
2443 @param dataset_id: ID of the run whose configuration we wish to download
2444 """
2445
2446 sql = "SELECT task_def_id,name,reqs,opts,parallel,photonics FROM task_def" \
2447 + " WHERE dataset_id = %s ORDER BY task_def_id"
2448 cursor = self.getcursor()
2449 cursor.execute(sql, (dataset_id,))
2450 results = cursor.fetchall()
2451
2452 tray_sql = "SELECT idx,CONVERT(GROUP_CONCAT(iter),char) AS iters" \
2453 + " FROM task_def_tray WHERE task_def_id = %s" \
2454 + " GROUP BY idx,task_def_id"
2455 parent_sql = "SELECT name FROM task_def,task_def_rel" \
2456 + " WHERE child_task_def_id = %s" \
2457 + " AND parent_task_def_id = task_def_id"
2458 child_sql = "SELECT name FROM task_def,task_def_rel" \
2459 + " WHERE parent_task_def_id = %s" \
2460 + " AND child_task_def_id = task_def_id"
2461
2462 for row in results:
2463 id = row['task_def_id']
2464 name = row['name']
2465 reqs = row['reqs']
2466 opts = row['opts']
2467 parallel = row['parallel']
2468 photonics = row['photonics']
2469
2470 td = TaskDefinition(name,id)
2471 td.SetRequirements(reqs)
2472 if opts: td.SetBatchOpts(opts)
2473 td.SetParallelExecution(parallel)
2474 td.SetUsesPhotonics(photonics)
2475
2476 self.logger.debug(tray_sql % id)
2477 cursor.execute(tray_sql, (id,))
2478 trays = cursor.fetchall()
2479 for tray in trays:
2480 td.AddTray(tray['idx'], tray['iters'])
2481
2482 cursor.execute(parent_sql, (id,))
2483 parents = cursor.fetchall()
2484 for parent in parents:
2485 td.AddParent(parent['name'])
2486
2487 cursor.execute(child_sql, (id,))
2488 children = cursor.fetchall()
2489 for child in children:
2490 td.AddChild(child['name'])
2491
2492 steering.AddTaskDefinition(td)
2493
2494
2495
2496
2498 """
2499 Get a list of new datasets.
2500 """
2501 cursor = self.getcursor()
2502 sql = " SELECT dataset.dataset_id,dataset.jobs_submitted, "
2503 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source, "
2504 sql += " dif.sensorname as sensor, "
2505 sql += " YEAR(plus.start_datetime) as year, "
2506 sql += " plus.category, plus.subcategory,dataset.hist "
2507 sql += " FROM dataset,plus,dif "
2508 if grid:
2509 sql += ",grid_statistics "
2510 sql += " WHERE dataset.dataset_id = plus.dataset_id "
2511 sql += " AND dataset.dataset_id=dif.dataset_id "
2512 if grid:
2513 sql += " AND grid_statistics.dataset_id = dataset.dataset_id "
2514 sql += " AND grid_statistics.grid_id = %d " % grid
2515 if dataset > 0:
2516 sql += " AND dataset.dataset_id = %d " % dataset
2517 else:
2518 sql += " AND dataset.status='PROCESSING' "
2519 self.logger.debug(sql)
2520 cursor.execute(sql)
2521 sets = cursor.fetchall();
2522 self.commit()
2523 for set in sets:
2524 set['sensor'] = set['sensor'].replace('ICECUBE','IceCube').replace('ICETOP','IceTop')
2525 return sets
2526
2528 """
2529 Get a list of finished datasets for which no histos have been created.
2530 """
2531 cursor = self.getcursor()
2532 sql = " SELECT dataset.dataset_id,dataset.jobs_submitted, "
2533 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source, "
2534 sql += " dif.sensorname as sensor, "
2535 sql += " YEAR(plus.start_datetime) as year, "
2536 sql += " plus.category, plus.subcategory "
2537 sql += " FROM dataset,plus,dif "
2538 sql += " WHERE dataset.dataset_id = plus.dataset_id "
2539 sql += " AND dataset.dataset_id=dif.dataset_id "
2540 sql += " AND dataset.status='COMPLETE' "
2541 if dataset:
2542 sql += " AND dataset.dataset_id = %d " % dataset
2543 else:
2544 sql += " AND dataset.hist=1 "
2545 self.logger.debug(sql)
2546 cursor.execute(sql)
2547 sets = cursor.fetchall();
2548 self.commit()
2549 for set in sets:
2550 set['sensor'] = set['sensor'].replace('ICECUBE','IceCube').replace('ICETOP','IceTop')
2551 return sets
2552
2554 """
2555 Get the subset of the given datasets that are no longer processing.
2556 """
2557 sets = []
2558 if datasetlist:
2559 cursor = self.getcursor()
2560 sql = " SELECT dataset.* FROM dataset "
2561 sql += " WHERE dataset.dataset_id "
2562 sql += " IN (%s) " % ",".join(map(str,datasetlist))
2563 sql += " AND dataset.status != 'PROCESSING' "
2564 self.logger.debug(sql)
2565 cursor.execute(sql)
2566 sets = cursor.fetchall();
2567 self.commit()
2568 return sets
2569
2571 """
2572 Get a completed dataset for which histograms have not yet been generated.
2573 """
2574 cursor = self.getcursor()
2575 sql = " SELECT dataset.dataset_id,dataset.jobs_submitted, "
2576 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source, "
2577 sql += " dif.sensorname as sensor, "
2578 sql += " YEAR(plus.start_datetime) as year, "
2579 sql += " plus.category, plus.subcategory "
2580 sql += " FROM dataset,plus,dif "
2581 sql += " WHERE dataset.dataset_id = plus.dataset_id "
2582 sql += " AND dataset.dataset_id=dif.dataset_id "
2583 sql += " AND dataset.status='COMPLETE' AND dataset.hist=0 "
2584 sql += " AND dataset.dataset_id = %d " % dataset_id
2585 self.logger.debug(sql)
2586 cursor.execute(sql)
2587 set = cursor.fetchone();
2588 set['sensor'] = set['sensor'].replace('ICECUBE','IceCube').replace('ICETOP','IceTop')
2589 self.commit()
2590 return set
2591
2593 cursor = self.getcursor()
2594 sql = " UPDATE dataset "
2595 sql += " SET dataset.hist=0 "
2596 sql += " WHERE dataset.dataset_id = %d " % dataset
2597 sql += " AND dataset.hist=1 "
2598 self.logger.debug(sql)
2599 cursor.execute(sql)
2600 self.commit()
2601 return
2602
2604 """
2605 Update statistics for datasets and return all datasets which
2606 have completed.
2607 """
2608
2609 finished_sets = []
2610 cursor = self.getcursor()
2611
2612
2613
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
2628
2629
2630
2631
2632
2633
2634
2635
2636
2637
2638
2639
2640
2641
2642
2643
2644
2645
2646
2647
2648
2649
2650
2651
2652
2653 sql = " SELECT job.dataset_id, "
2654 sql += " MAX(job.status_changed) AS enddate, "
2655 sql += " SUM(1) AS jobs_submitted, "
2656 sql += " SUM(job.status='OK' ) AS jobs_completed, "
2657 sql += " SUM(job.status='FAILED') AS jobs_failed, "
2658 sql += " SUM(job.status='SUSPENDED') AS jobs_suspended, "
2659 sql += " SUM(job.time_real) AS real_time, "
2660 sql += " SUM(job.time_sys) AS sys_time, "
2661 sql += " SUM(job.time_user) AS user_time, "
2662 sql += " SUM(job.mem_heap) AS heap_mem, "
2663 sql += " SUM(job.mem_heap_peak) AS heap_mem_peak, "
2664 sql += " SUM(job.mem_stack_peak) AS stack_mem_peak, "
2665 sql += " SUM(job.nevents) AS events, "
2666 sql += " SUM(job.gevents) AS gevents, "
2667 sql += " SUM(job.evictions) AS sevictions, "
2668 sql += " grid_statistics.debug AS debug "
2669 sql += " FROM job, dataset, grid_statistics "
2670 sql += " WHERE dataset.dataset_id = job.dataset_id "
2671 sql += " AND dataset.dataset_id = grid_statistics.dataset_id "
2672 if grid_id is not None:
2673 sql += " AND grid_statistics.grid_id = %d " % grid_id
2674 if dataset_id:
2675 sql += " AND dataset.dataset_id = %d " % dataset_id
2676 else:
2677 sql += " AND dataset.status = 'PROCESSING' "
2678 sql += " GROUP by job.dataset_id "
2679 cursor.execute(sql)
2680 result_set = cursor.fetchall();
2681 self.commit()
2682
2683 for entry in result_set:
2684 try:
2685 entry['jobs_completed'] = self.intcast(entry['jobs_completed'])
2686 entry['jobs_failed'] = self.intcast(entry['jobs_failed'])
2687 except Exception,e:
2688 self.logger.error("Could not cast int(%s)" % entry['jobs_completed'] )
2689 entry['jobs_completed'] = 0
2690 continue;
2691
2692 if self.intcast(entry['jobs_completed']) == self.intcast(entry['jobs_submitted']):
2693
2694 finished_sets.append(entry)
2695 sql = " UPDATE dataset "
2696 sql += " SET dataset.jobs_completed = %(jobs_completed)d, "
2697 sql += " dataset.jobs_failed = %(jobs_failed)d, "
2698 sql += " dataset.status = '%(status)s', "
2699 sql += " dataset.enddate = '%(enddate)s', "
2700 sql += " time_real = %(real_time)g, "
2701 sql += " time_sys = %(sys_time)g, "
2702 sql += " time_user = %(user_time)g, "
2703 sql += " mem_heap = %(heap_mem)g, "
2704 sql += " mem_heap_peak = %(heap_mem_peak)g, "
2705 sql += " mem_stack_peak = %(stack_mem_peak)g, "
2706 sql += " events = %(events)d "
2707 sql += " WHERE dataset.dataset_id = %(dataset_id)d "
2708 sql += " AND dataset.status = 'PROCESSING' "
2709 sql += " %(and)s "
2710
2711 entry['status'] = 'READYTOPUBLISH'
2712 entry['and'] = "AND dataset.dataset_category != 'TEMPLATE'"
2713 cursor.execute(sql % entry)
2714
2715 entry['status'] = 'TEMPLATE'
2716 entry['and'] = "AND dataset.dataset_category = 'TEMPLATE'"
2717 cursor.execute(sql % entry)
2718
2719 sql = " UPDATE grid_statistics "
2720 sql += " SET suspend = 1 "
2721 sql += " WHERE dataset_id = %d " % entry['dataset_id']
2722 cursor.execute(sql)
2723
2724 elif entry['jobs_submitted'] and (self.intcast(entry['jobs_completed']) + \
2725 self.intcast(entry['jobs_failed'])) == \
2726 self.intcast(entry['jobs_submitted']):
2727 finished_sets.append(entry)
2728 sql = " UPDATE dataset "
2729 sql += " SET dataset.jobs_completed = %d, " % entry['jobs_completed']
2730 sql += " dataset.jobs_failed = %d, " % entry['jobs_failed']
2731 sql += " dataset.status = 'ERRORS', "
2732 sql += " dataset.enddate = '%s', " % entry['enddate']
2733 sql += " time_real = %g, " % entry['real_time']
2734 sql += " time_sys = %g, " % entry['sys_time']
2735 sql += " time_user = %g, " % entry['user_time']
2736 sql += " mem_heap = %g, " % entry['heap_mem']
2737 sql += " mem_heap_peak = %g, " % entry['heap_mem_peak']
2738 sql += " mem_stack_peak = %g, " % entry['stack_mem_peak']
2739 sql += " events = %d " % self.intcast(entry['events'])
2740 sql += " WHERE dataset.dataset_id = %d " % entry['dataset_id']
2741 sql += " AND dataset.status = 'PROCESSING' "
2742 cursor.execute(sql)
2743
2744 sql = " UPDATE grid_statistics "
2745 sql += " SET suspend = 1 "
2746 sql += " WHERE dataset_id = %d " % entry['dataset_id']
2747 cursor.execute(sql)
2748
2749 if self.intcast(entry['jobs_suspended']) + self.intcast(entry['jobs_failed']) > self.maxsuspend:
2750 sql = " UPDATE grid_statistics "
2751 sql += " SET suspend = 1 "
2752 sql += " WHERE suspend = 0 "
2753 sql += " AND dataset_id = %d " % entry['dataset_id']
2754 if grid_id is not None:
2755 sql += " AND grid_id = %d " % grid_id
2756 sql += " AND debug = 1"
2757 cursor.execute(sql)
2758
2759 self.commit()
2760 return finished_sets
2761
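# Completion accounting (annotation, the rules applied above):
#
#   completed == submitted               -> READYTOPUBLISH
#                                           (TEMPLATE datasets -> TEMPLATE)
#   completed + failed == submitted      -> ERRORS
#   suspended + failed > self.maxsuspend -> suspend queueing on debug grids
#
# In both terminal cases grid_statistics.suspend is set so no grid keeps
# queueing jobs for the finished dataset.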
2762
2763 - def GetGridId(self,grid_name,institution=None,batchsys=None,url=None):
2764 """
2765 Retrieve the key for grid_name
2766 """
2767 ver = iceprod.__version__
2768 if not self.isconnected(): self.connect()
2769 cursor = self.getcursor()
2770 sql = " SELECT grid_id FROM grid WHERE name='%s' " % grid_name
2771 cursor.execute(sql)
2772 result = cursor.fetchone()
2773 if result:
2774 grid_id = result['grid_id']
2775 else:
2776 sql = " INSERT IGNORE INTO grid (name,institution,batchsys,version) "
2777 sql += " VALUES ('%s','%s','%s','%s') " % (grid_name,institution,batchsys,ver)
2778 cursor.execute(sql)
2779 grid_id = self.insert_id()
2780
2781 if institution and batchsys:
2782 sql = " UPDATE grid SET "
2783 if url:
2784 sql += " url = '%s', " % url
2785 sql += " institution = '%s', " % institution
2786 sql += " batchsys = '%s', " % batchsys
2787 sql += " version = '%s' " % ver
2788 sql += " WHERE grid_id=%d " % grid_id
2789 cursor.execute(sql)
2790 self.commit()
2791
2792 return grid_id
2793
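# Usage sketch (annotation): GetGridId() is get-or-create, so it is safe to
# call unconditionally at daemon startup; RegisterServer() below then records
# the daemon's state. The daemon name 'soapqueue' is only an example:
#
#   import os
#   grid_id = db.GetGridId('npx', institution='UW', batchsys='condor')
#   db.RegisterServer(grid_id, 'soapqueue', 'RUNNING', os.getpid())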
2794 - def RegisterServer(self,
2795 grid_id,
2796 server_name,
2797 server_status,
2798 server_pid):
2799 """
2800 Record the status and process ID of a server daemon for the given grid.
2801 """
2802 if not self.isconnected(): self.connect()
2803 cursor = self.getcursor()
2804
2805 sql = " UPDATE grid SET "
2806 sql += " %s='%s', " % (server_name,server_status)
2807 sql += " %s_pid=%d " % (server_name,server_pid)
2808 sql += " WHERE grid_id=%d " % grid_id
2809 cursor.execute(sql)
2810 self.commit()
2811
2813 """
2814 Get status changes for daemons
2815 """
2816 if not self.isconnected(): self.connect()
2817 cursor = self.getcursor()
2818
2819 sql = " SELECT * FROM grid "
2820 sql += " WHERE grid_id=%d " % grid_id
2821 cursor.execute(sql)
2822 result = cursor.fetchone()
2823
2824 sql = " UPDATE grid SET lastupdate=NOW() "
2825 sql += " WHERE grid_id=%d " % grid_id
2826 cursor.execute(sql)
2827
2828 self.commit()
2829 return result
2830
2832 """
2833 Change status of daemons
2834 """
2835 cursor = self.getcursor()
2836
2837 sql = " UPDATE grid SET "
2838 sql += " %s = 'STOPREQUEST' " % daemon
2839 sql += " WHERE %s ='RUNNING' " % daemon
2840 if type(grid) is types.IntType:
2841 sql += " AND grid_id=%u " % grid
2842 elif grid not in ('any','*','all'):
2843 sql += " AND name='%s' " % grid
2844 cursor.execute(sql)
2845 self.commit()
2846
2848 """
2849 Change status of daemons
2850 """
2851 cursor = self.getcursor()
2852
2853 sql = " UPDATE grid SET "
2854 sql += " %s = 'STARTREQUEST' " % daemon
2855 sql += " WHERE %s = 'STOPPED' " % daemon
2856 if type(grid) is types.IntType:
2857 sql += " AND grid_id=%u " % grid
2858 else:
2859 sql += " AND name='%s' " % grid
2860 cursor.execute(sql)
2861 self.commit()
2862
2864 """
2865 Get parameters for given dataset
2866 """
2867 cursor = self.getcursor()
2868 sql = " SELECT plus.category, plus.subcategory, "
2869 sql += " YEAR(plus.start_datetime) as year,"
2870 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source "
2871 sql += " FROM plus,dif "
2872 sql += " WHERE plus.dataset_id = %d " % dataset
2873 sql += " AND plus.dataset_id = dif.dataset_id "
2874 self.logger.debug(sql)
2875 cursor.execute(sql)
2876 return cursor.fetchall()
2877
2879 """
2880 Fetch list of jobs that have completed for given grid_id
2881 """
2882
2883 cursor = self.getcursor()
2884 passkey = self.mkkey(6,9)
2885 job_list = []
2886
2887 sql = " SELECT SUM(1) copying FROM `job` "
2888 sql += " WHERE "
2889 sql += " (status='COPYING' OR status='COPIED') "
2890 sql += " AND grid_id = %d " % grid_id
2891 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay
2892 self.logger.debug(sql)
2893 cursor.execute(sql)
2894 currently_copying = self.intcast(cursor.fetchone()['copying'])
2895 if not currently_copying: currently_copying = 0
2896
2897 sql = " UPDATE job SET "
2898 sql += " passkey='%s', " % passkey
2899 sql += " status_changed=NOW() "
2900 sql += " WHERE status='COPIED' "
2901 sql += " AND grid_id = %d " % grid_id
2902 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay
2903 sql += " LIMIT %d " % max_copy
2904 self.logger.debug(sql)
2905 cursor.execute(sql)
2906 self.commit()
2907
2908 sql = " UPDATE job SET "
2909 sql += " prev_state = status, "
2910 sql += " status = 'COPYING', "
2911 sql += " passkey='%s', " % passkey
2912 sql += " status_changed=NOW() "
2913 sql += " WHERE status='READYTOCOPY' "
2914 sql += " AND grid_id = %d " % grid_id
2915 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay
2916 sql += " LIMIT %d " % max(1,max_copy - currently_copying)
2917 self.logger.debug(sql)
2918 cursor.execute(sql)
2919 self.commit()
2920
2921 sql = " SELECT job.*, plus.category, plus.subcategory, "
2922 sql += " YEAR(plus.start_datetime) as year,"
2923 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source "
2924 sql += " FROM job,plus,dif "
2925 sql += " WHERE grid_id = %d " % grid_id
2926 sql += " AND job.dataset_id = plus.dataset_id "
2927 sql += " AND plus.dataset_id = dif.dataset_id "
2928 sql += " AND (job.status='COPYING' OR job.status='COPIED') "
2929 sql += " AND job.passkey='%s' " % passkey
2930 self.logger.debug(sql)
2931 cursor.execute(sql)
2932 result_set = cursor.fetchall()
2933 self.commit()
2934
2935 return result_set
2936
2938 """
2939 Reset jobs in ERROR or RESET state for given grid_id and return the reset jobs.
2940 """
2941
2942 cursor = self.getcursor()
2943 passkey = self.mkkey(6,9)
2944
2945
2946 sql = " UPDATE job,dataset SET "
2947 sql += " job.prev_state = job.status, "
2948 sql += " job.status='SUSPENDED', "
2949 sql += " job.status_changed=NOW() "
2950 sql += " WHERE job.status='ERROR' "
2951 sql += " AND job.grid_id = %d " % grid_id
2952 sql += " AND job.dataset_id = dataset.dataset_id "
2953 sql += " AND dataset.debug = 1 "
2954 cursor.execute(sql)
2955 self.commit()
2956
2957
2958 sql = " UPDATE job SET "
2959 sql += " prev_state = status, "
2960 sql += " status='WAITING', "
2961 sql += " status_changed=NOW() "
2962 sql += " WHERE status IN ('ERROR','RESET') "
2963 sql += " AND (grid_id =0 OR grid_id IS NULL) "
2964 cursor.execute(sql)
2965 self.commit()
2966
2967
2968 sql = " UPDATE job SET "
2969 sql += " prev_state = status, "
2970 sql += " passkey='%s' " % passkey
2971 sql += " WHERE status='RESET' "
2972 sql += " AND grid_id = %d " % grid_id
2973 sql += " LIMIT %d " % max_reset
2974 cursor.execute(sql)
2975 self.commit()
2976
2977
2978 sql = " SELECT * FROM job "
2979 sql += " WHERE grid_id = %d " % grid_id
2980 sql += " AND passkey='%s' " % passkey
2981 sql += " AND status='RESET' "
2982 sql += " LIMIT %d " % max_reset
2983 cursor.execute(sql)
2984 result_set = cursor.fetchall()
2985 self.commit()
2986
2987
2988 sql = " UPDATE job SET "
2989 sql += " prev_state = status, "
2990 sql += " status = 'CLEANING', "
2991 sql += " passkey='%s', " % passkey
2992 sql += " status_changed=NOW() "
2993 sql += " WHERE status='RESET' "
2994 sql += " AND grid_id = %d " % grid_id
2995 sql += " AND NOW() > TIMESTAMPADD(MINUTE,5,status_changed) "
2996 sql += " ORDER BY priority DESC, job_id "
2997 sql += " LIMIT %d " % max_reset
2998 cursor.execute(sql)
2999 self.commit()
3000
3001
3002 sql = " UPDATE job SET "
3003 sql += " job.prev_state = job.status, "
3004 sql += " job.status='RESET', "
3005 sql += " job.status_changed=NOW() "
3006 sql += " WHERE job.status='ERROR' "
3007 sql += " AND job.grid_id = %d " % grid_id
3008 sql += " ORDER BY job.priority DESC, job.job_id "
3009 sql += " LIMIT %d " % max_reset
3010 cursor.execute(sql)
3011 self.commit()
3012
3013 return result_set
3014
3015 - def reset_old_tasks(self,
3016 grid_id,
3017 maxidletime,
3018 maxruntime,
3019 maxsubmittime,
3020 maxcopytime,
3021 maxfailures=10,
3022 maxevicttime=10,
3023 keepalive=14400):
3024 """
3025 reset status of tasks that were queued but whose status
3026 has not changed in more than maxtime minutes
3027
3028 @param grid_id: id of current cluster
3029 @param maxruntime: maximum run time for jobs
3030 @param maxsubmittime: maximum submit time for jobs
3031 @param maxcopytime: maximum time for jobs to be in 'copying' state
3032 @param maxfailures: maximum number of times a job is allowed to fail
3033 @param keepalive: how often should server expect to hear from jobs
3034 """
3035
3036 totalrows = 0
3037 cursor = self.getcursor()
3038 passkey = self.mkkey(6,9)
3039
3040
3041 self.logger.debug("resetting stale queued tasks...")
3042 sql = " UPDATE task SET "
3043 sql += " last_status = status, "
3044 sql += " status='RESET', "
3045 sql += " passkey='%s', " % passkey
3046 sql += " status_changed=NOW() "
3047 sql += " WHERE grid_id=%d " % grid_id
3048 sql += " AND status=%s "
3049 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%s,status_changed) "
3050 sql += " LIMIT 20 "
3051
3052 self.logger.debug(sql)
3053
3054 timeouts = []
3055 timeouts.append(('QUEUED',maxidletime))
3056 timeouts.append(('QUEUEING',maxsubmittime))
3057 timeouts.append(('CLEANING',maxsubmittime))
3058 timeouts.append(('EVICTED',maxevicttime))
3059 timeouts.append(('COPIED',maxcopytime))
3060 timeouts.append(('COPYINGINPUT',maxcopytime))
3061 timeouts.append(('COPYINGOUTPUT',maxcopytime))
3062 timeouts.append(('STARTING',maxcopytime))
3063 cursor.executemany(sql, timeouts)
3064
3065 rowcount = self._conn.affected_rows()
3066 totalrows += rowcount
3067 self.commit()
3068 self.logger.debug('Reset %d stale tasks' % rowcount)
3069
3070
3071
3073 """
3074 Reset tasks in ERROR or RESET state for given grid_id and return the reset tasks.
3075 """
3076
3077 cursor = self.getcursor()
3078 passkey = self.mkkey(6,9)
3079
3080
3081 sql = " UPDATE task SET "
3082 sql += " last_status = status, "
3083 sql += " status='WAITING', "
3084 sql += " status_changed=NOW() "
3085 sql += " WHERE status IN ('ERROR','RESET') "
3086 sql += " AND (grid_id =0 OR grid_id IS NULL) "
3087 cursor.execute(sql)
3088 self.commit()
3089
3090
3091 sql = " UPDATE task SET "
3092 sql += " last_status = status, "
3093 sql += " passkey='%s' " % passkey
3094 sql += " WHERE status='RESET' "
3095 sql += " AND grid_id = %d " % grid_id
3096 sql += " LIMIT %d " % max_reset
3097 cursor.execute(sql)
3098 self.commit()
3099
3100
3101 sql = " SELECT * FROM task "
3102 sql += " JOIN job ON task.job_id=job.job_id "
3103 sql += " WHERE task.grid_id = %d " % grid_id
3104 sql += " AND task.passkey='%s' " % passkey
3105 sql += " AND task.status='RESET' "
3106 sql += " LIMIT %d " % max_reset
3107 cursor.execute(sql)
3108 result_set = cursor.fetchall()
3109 self.commit()
3110
3111
3112 sql = " UPDATE task SET "
3113 sql += " last_status = status, "
3114 sql += " status = 'CLEANING', "
3115 sql += " passkey='%s', " % passkey
3116 sql += " status_changed=NOW() "
3117 sql += " WHERE status='RESET' "
3118 sql += " AND grid_id = %d " % grid_id
3119 sql += " AND NOW() > TIMESTAMPADD(MINUTE,5,status_changed) "
3120 sql += " ORDER BY job_id "
3121 sql += " LIMIT %d " % max_reset
3122 cursor.execute(sql)
3123 self.commit()
3124
3125
3126 sql = " UPDATE task SET "
3127 sql += " task.last_status = task.status, "
3128 sql += " task.status='RESET', "
3129 sql += " task.status_changed=NOW() "
3130 sql += " WHERE task.status='ERROR' "
3131 sql += " AND task.grid_id = %d " % grid_id
3132 sql += " ORDER BY task.job_id "
3133 sql += " LIMIT %d " % max_reset
3134 cursor.execute(sql)
3135 self.commit()
3136
3137 return result_set
3138
3139
3167
3194
3220
3222 """
3223 Fetch list of tasks that have finished copying for given grid_id
3224 """
3225
3226 cursor = self.getcursor()
3227 passkey = self.mkkey(6,9)
3228 job_list = []
3229
3230 sql = " SELECT SUM(1) copying FROM `task` "
3231 sql += " WHERE "
3232 sql += " (status='COPYING' OR status='COPIED') "
3233 sql += " AND grid_id = %d " % grid_id
3234 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay
3235 self.logger.debug(sql)
3236 cursor.execute(sql)
3237 currently_copying = self.intcast(cursor.fetchone()['copying'])
3238 if not currently_copying: currently_copying = 0
3239
3240 sql = " UPDATE task SET "
3241 sql += " passkey='%s', " % passkey
3242 sql += " status_changed=NOW() "
3243 sql += " WHERE status='COPIED' "
3244 sql += " AND grid_id = %d " % grid_id
3245 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay
3246 sql += " LIMIT %d " % max_copy
3247 self.logger.debug(sql)
3248 cursor.execute(sql)
3249 self.commit()
3250
3251 sql = " SELECT task.*, plus.category, plus.subcategory, "
3252 sql += " YEAR(plus.start_datetime) as year,"
3253 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source "
3254 sql += " FROM task,job,plus,dif "
3255 sql += " WHERE task.grid_id = %d " % grid_id
3256 sql += " AND task.job_id = job.job_id "
3257 sql += " AND job.dataset_id = plus.dataset_id "
3258 sql += " AND plus.dataset_id = dif.dataset_id "
3259 sql += " AND (task.status='COPYING' OR task.status='COPIED') "
3260 sql += " AND task.passkey='%s' " % passkey
3261 self.logger.debug(sql)
3262 cursor.execute(sql)
3263 result_set = cursor.fetchall()
3264 self.commit()
3265
3266 return result_set
3267
3268
3269
3270 - def GetTasks(self,grid_id,status=('QUEUED','QUEUEING','PROCESSING','RESET','ERROR'),delay=0):
3271 """
3272 Get list of tasks currently in any given status
3273 """
3274 from iceprod.server.job import i3Task
3275 cursor = self.getcursor()
3276 task_list = []
3277 sql = " SELECT * FROM task"
3278 sql += " JOIN job "
3279 sql += " ON job.job_id = task.job_id "
3280 sql += " WHERE task.grid_id = %d " % grid_id
3281 if isinstance(status,tuple):
3282 sql += " AND task.status IN " + str(status)
3283 else:
3284 sql += " AND task.status = '%s' " % status
3285 self.logger.debug(sql)
3286 cursor.execute(sql)
3287 for j in cursor.fetchall():
3288 task = i3Task()
3289 task.SetTaskId(j['task_id'])
3290 task.SetDatasetId(j['dataset_id'])
3291 task.SetDatabaseId(j['job_id'])
3292 task.SetProcNum(j['queue_id'])
3293 task.SetPrio(j['priority'])
3294 task.SetJobId(j['grid_queue_id'])
3295 task.AddArgOption("key",j['passkey'])
3296 task.SetLogFile( "%s/%s.log" % (j['submitdir'],task.Prefix() ))
3297 task.SetOutputFile( "%s/%s.out" % (j['submitdir'],task.Prefix() ))
3298 task.SetErrorFile( "%s/%s.err" % (j['submitdir'],task.Prefix() ))
3299
3300 task_list.append(task)
3301 self.commit()
3302 return task_list
3303
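# Usage sketch (annotation): GetTasks() takes either a tuple of states or a
# single state string; the wrappers below just pass canned tuples. E.g.:
#
#   active = db.GetTasks(grid_id, status=('QUEUED','PROCESSING'))
#   errors = db.GetTasks(grid_id, status='ERROR')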
3304
3306 """
3307 Get list of tasks currently active
3308 """
3309 return self.GetTasks(grid_id,status=('QUEUED','QUEUEING','PROCESSING','RESET','ERROR'),delay=delay)
3310
3311
3313 """
3314 Get list of tasks currently in queue
3315 """
3316 return self.GetTasks(grid_id,status=('QUEUED'),delay=delay)
3317
3318
3320 """
3321 Get list of tasks currently running
3322 """
3323 return self.GetTasks(grid_id,status=('STARTING','COPYINGINPUT','PROCESSING','COPYINGOUTPUT','READYTOCOPY','COPYING','COPIED'),delay=delay)
3324
3325
3327
3328 cursor = self.getcursor()
3329
3330
3331 sql = " SELECT jd.* FROM job_dependency jd "
3332 sql += " JOIN dataset d ON jd.dataset_id = d.dataset_id "
3333 sql += " JOIN grid_statistics gs ON gs.dataset_id = d.dataset_id "
3334 sql += " WHERE d.status = 'PROCESSING' "
3335 sql += " AND gs.grid_id = %u " % grid_id
3336 self.logger.debug(sql)
3337
3338 try:
3339 cursor.execute(sql)
3340 except:
3341 logger.error(sql)
3342 return
3343 result = cursor.fetchall()
3344
3345
3346 rules = {}
3347 for entry in result:
3348 if not rules.has_key(entry['dataset_id']):
3349 rules[entry['dataset_id']] = []
3350 rules[entry['dataset_id']].append(entry)
3351
3352
3353 sql = " SELECT j.* FROM job j "
3354 sql += " JOIN dataset d ON j.dataset_id = d.dataset_id "
3355 sql += " JOIN grid_statistics gs ON gs.dataset_id = d.dataset_id "
3356 sql += " WHERE d.status = 'PROCESSING' "
3357 sql += " AND gs.grid_id = %u " % grid_id
3358 sql += " AND j.status = 'IDLE' "
3359 sql += " AND d.dataset_id IN ( "
3360 sql += " SELECT d.dataset_id FROM dataset d "
3361 sql += " JOIN job_dependency jd ON jd.dataset_id = d.dataset_id "
3362 sql += " JOIN grid_statistics gs ON gs.dataset_id = d.dataset_id "
3363 sql += " WHERE d.status = 'PROCESSING' "
3364 sql += " AND gs.grid_id = %u " % grid_id
3365 sql += " GROUP BY dataset_id "
3366 sql += " ) "
3367 sql += " LIMIT 500 "
3368
3369 try:
3370 cursor.execute(sql)
3371 except:
3372 logger.error(sql)
3373 return
3374 jobs = cursor.fetchall()
3375
3376 for job in jobs:
3377 dataset = job['dataset_id']
3378 deps = []
3379 for rule in rules[dataset]:
3380 for table in ('job','archived_job'):
3381 sql = " SELECT j.* FROM %s j " % table
3382 sql += " WHERE j.dataset_id = %s " % rule['input_dataset']
3383 sql += " AND j.%s = %s " % (rule['input_job'],job['queue_id'])
3384 try:
3385 cursor.execute(sql)
3386 except Exception,e:
3387 logger.error(e)
3388 else:
3389 deps += cursor.fetchall()
3390
3391 if reduce(lambda x,y: x and y['status'] in ('OK','COPIED'), deps, True):
3392 sql = " UPDATE job j1 SET status='WAITING' "
3393 sql += " WHERE j1.job_id = %(job_id)u " % job
3394 sql += " AND j1.status = 'IDLE' "
3395 logger.debug(sql)
3396 cursor.execute(sql)
3397 self.commit()
3398
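# The reduce() above is True only when *every* dependency row is in state
# OK or COPIED (and trivially True for an empty deps list). Spelled out:
#
#   ready = True
#   for d in deps:
#       ready = ready and d['status'] in ('OK','COPIED')
#
# Only then is the IDLE job promoted to WAITING so it becomes queueable.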
3399
3400 - def QueueJobs(self,maxjobs,grid_id,jobs_at_once=20,fifo=True,debug=0,maxq=1000):
3401 """
3402 Reserve at most 'maxjobs' from a given dataset.
3403 Get proc ids and set their status to 'QUEUEING'
3404 """
3405 from iceprod.server.job import i3Job,i3Task
3406 cursor = self.getcursor()
3407 job_list = {}
3408
3409
3410 sql = " SELECT SUM(1) AS total, "
3411 sql += " SUM(IF(status = 'QUEUED', 1, 0)) AS queued, "
3412 sql += " SUM(IF(status = 'QUEUEING', 1, 0)) AS queueing, "
3413 sql += " SUM(IF(status = 'PROCESSING', 1, 0)) AS processing "
3414 sql += " FROM job "
3415 sql += " WHERE job.grid_id = %d " % grid_id
3416 sql += " AND status NOT IN ('COPIED','IDLE','WAITING','OK','SUSPENDED','RESET','FAILED','CLEANING') "
3417 cursor.execute(sql)
3418 result = cursor.fetchone()
3419 self.commit()
3420 if result['total'] == None:
3421 self.logger.debug('queue total returned None')
3422 total = 0
3423 else:
3424 total = int(result['total'])
3425 if result['queued'] == None:
3426 queued = 0
3427 else:
3428 queued = int(result['queued'])
3429 if result['queueing'] == None:
3430 queueing = 0
3431 else:
3432 queueing = int(result['queueing'])
3433 if result['processing'] == None:
3434 processing = 0
3435 else:
3436 processing = int(result['processing'])
3437
3438 self.logger.info('%d jobs are currently in the queue' % total)
3439 self.logger.info('%d jobs are currently queued' % (queued+queueing))
3440 self.logger.info('%d jobs are currently processing ' % processing)
3441
3442 maxjobs = maxjobs - min(maxjobs,total)
3443 if queued+queueing > maxq: maxjobs = 0
3444
3445
3446 sql = " SELECT gs.dataset_id FROM grid_statistics gs "
3447 sql += " JOIN dataset d ON gs.dataset_id = d.dataset_id "
3448 sql += " WHERE gs.grid_id = %d AND gs.suspend = 0 " % grid_id
3449 sql += " AND d.status = 'PROCESSING' "
3450 if debug:
3451 sql += " AND gs.debug = 1 "
3452 else:
3453 sql += " AND gs.debug != 1 "
3454
3455 self.logger.debug(sql)
3456 cursor.execute(sql)
3457 dataset_list = []
3458 result_set = cursor.fetchall()
3459 self.commit()
3460
3461 if not len(result_set) > 0:
3462 self.logger.info("no jobs to queue at this time")
3463 return {}
3464
3465 for item in result_set:
3466 dataset_list.append(item['dataset_id'])
3467
3468 sql = " SELECT job.*, dataset.temporary_storage, "
3469 sql += " plus.category, plus.subcategory, "
3470 sql += " YEAR(plus.start_datetime) as year, "
3471 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source "
3472 sql += " FROM job use index(status_dataset_index) "
3473 sql += " STRAIGHT_JOIN dataset ON job.dataset_id = dataset.dataset_id "
3474 sql += " STRAIGHT_JOIN dif ON job.dataset_id = dif.dataset_id "
3475 sql += " STRAIGHT_JOIN plus ON job.dataset_id = plus.dataset_id "
3476 sql += " WHERE job.status = 'WAITING' and job.dataset_id in ( "
3477 sep = ""
3478 for d in dataset_list:
3479 sql += sep+str(d)
3480 sep = ","
3481 sql += ") "
3482 sql += " ORDER BY job.priority DESC,"
3483 if fifo:
3484 sql += " job.dataset_id,"
3485 sql += " job.queue_id "
3486 sql += " LIMIT %u " % min(jobs_at_once,maxjobs)
3487
3488
3489 self.logger.debug(sql)
3490 cursor.execute(sql)
3491 result_set = cursor.fetchall()
3492 self.commit()
3493
3494 if not len(result_set) > 0:
3495 self.logger.info("no jobs to queue at this time")
3496 return {}
3497 else:
3498 self.logger.info("reserved %d jobs" % len(result_set))
3499
3500 for item in result_set:
3501 proc = item['queue_id']
3502 target_url = item['temporary_storage']
3503 item['subdirectory'] = "%05d-%05d" % ((proc/1000)*1000,((proc+1000)/1000)*1000-1)
3504
3505 if len(job_list) >= maxjobs:
3506 return job_list
3507
3508
3509 passkey = self.mkkey(6,9)
3510 queue_id = item['queue_id']
3511 dataset_id = item['dataset_id']
3512 priority = item['priority']
3513
3514 sql = " UPDATE job SET "
3515 sql += " job.prev_state = job.status, "
3516 sql += " job.status='QUEUEING', "
3517 sql += " job.host=NULL, "
3518 sql += " job.grid_id=%d, " % grid_id
3519 sql += " status_changed=NOW(),"
3520 sql += " passkey='%s' " % passkey
3521 sql += " WHERE job.status='WAITING' "
3522 sql += " AND job.queue_id=%d " % queue_id
3523 sql += " AND job.dataset_id=%d " % dataset_id
3524 try:
3525 cursor.execute(sql)
3526 rowcount = self._conn.affected_rows()
3527 self.commit()
3528 except Exception, e:
3529 self.logger.debug(e)
3530 continue
3531 if rowcount == 1:
3532 self.logger.debug("%d job has been marked as 'QUEUEING' " % rowcount)
3533
3534 sql = " SELECT * FROM run "
3535 sql += " WHERE run.dataset_id = %u " % dataset_id
3536 sql += " AND run.queue_id = %u " % queue_id
3537 cursor.execute(sql)
3538 runinfo = cursor.fetchone()
3539
3540 job = i3Job()
3541 job.SetDatabaseId(item['job_id'])
3542 job.SetDatasetId(dataset_id)
3543 job.SetProcNum(queue_id)
3544 job.SetPrio(priority)
3545 job.passkey = passkey
3546 job.AddArgOption("key",passkey)
3547 if runinfo:
3548 self.logger.info("%(run_id)s, date %(date)s" % runinfo)
3549 job.AddArgOption("run",runinfo["run_id"])
3550 job.AddArgOption("subrun",runinfo["sub_run"])
3551 job.AddArgOption("date",str(runinfo['date']))
3552
3553 if not job_list.has_key(dataset_id):
3554 job_list[dataset_id] = []
3555 job_list[dataset_id].append(job)
3556 elif rowcount == 0:
3557 self.logger.warn("someone beat me to job %d in dataset %d" % \
3558 (queue_id,dataset_id))
3559 else:
3560 raise Exception, "getjob:wrong number of rows affected"
3561
3562 return job_list
3563
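# Queueing flow (annotation, summary of QueueJobs above):
#   1. count jobs already held by this grid and shrink maxjobs accordingly;
#      bail out when more than maxq are already queued or queueing,
#   2. pick non-suspended PROCESSING datasets for this grid (honoring debug),
#   3. reserve WAITING jobs ordered by priority (FIFO by dataset/queue_id),
#   4. flip each job WAITING -> QUEUEING with a fresh passkey; an affected
#      row count of 0 means another daemon won the race and the job is
#      skipped, any other count raises.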
3564
3565 - def SetTasks(self,maxjobs,grid_id,jobs_at_once=20,fifo=True,debug=0):
3566 """
3567 DAG mode
3568 Reserve at most 'maxjobs' from a given dataset.
3569 Get proc ids and set their status to 'QUEUEING'
3570 """
3571 cursor = self.getcursor()
3572
3573 sql = " SELECT job.*, "
3574 sql += " plus.category, plus.subcategory, "
3575 sql += " YEAR(plus.start_datetime) as year, "
3576 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source "
3577 sql += " FROM task "
3578 sql += " JOIN job ON task.job_id = job.job_id "
3579 sql += " JOIN task_def_tray tdt ON task.task_def_tray_id = tdt.task_def_tray_id "
3580 sql += " JOIN grid_statistics gst ON gst.task_def_id = tdt.task_def_id "
3581 sql += " JOIN dif on job.dataset_id = dif.dataset_id "
3582 sql += " JOIN plus on job.dataset_id = plus.dataset_id "
3583 sql += " WHERE gst.grid_id = %d " % grid_id
3584 sql += " AND gst.suspend != 1 "
3585
3586 sql += " AND task.status='WAITING' "
3587 sql += " ORDER BY job.priority DESC,"
3588 if fifo:
3589 sql += " dataset_id,"
3590 sql += " queue_id "
3591 sql += " LIMIT %u " % min(jobs_at_once,maxjobs)
3592
3593 self.logger.debug(sql)
3594 cursor.execute(sql)
3595 result_set = cursor.fetchall()
3596 self.commit()
3597
3598 if not len(result_set) > 0:
3599 self.logger.info("no jobs to queue at this time")
3600 return {}
3601 else:
3602 self.logger.info("reserved %d jobs" % len(result_set))
3603
3604
3605
3606
3607 - def QueueTasks(self,maxjobs,grid_id,jobs_at_once=20,fifo=True,debug=0):
3608 """
3609 DAG mode
3610 Reserve at most 'maxjobs' from a given dataset.
3611 Get proc ids and set their status to 'QUEUEING'
3612 """
3613 from iceprod.server.job import i3Job,i3Task
3614 cursor = self.getcursor()
3615 job_list = {}
3616
3617
3618 sql = " SELECT SUM(1) AS total "
3619 sql += " FROM task "
3620 sql += " JOIN task_def_tray tdt ON task.task_def_tray_id = tdt.task_def_tray_id "
3621 sql += " JOIN task_def td ON tdt.task_def_id = td.task_def_id "
3622 sql += " JOIN grid_statistics gst ON gst.dataset_id = td.dataset_id "
3623 sql += " WHERE gst.grid_id = %d " % grid_id
3624 sql += " AND task.status = 'WAITING' "
3625 cursor.execute(sql)
3626 result = cursor.fetchone()
3627 self.commit()
3628 if result['total'] == None:
3629 self.logger.debug('queue total returned None')
3630 total = 0
3631 else:
3632 total = int(result['total'])
3633
3634
3635 maxjobs = min(maxjobs,total)
3636 self.logger.info('%d jobs are currently in the queue' % total)
3637
3638
3639 sql = " SELECT tdt.task_def_id, min(tdt.idx) as idx, tdt.task_def_tray_id "
3640 sql += " FROM task_def_tray tdt JOIN task_def td ON tdt.task_def_id = td.task_def_id "
3641 sql += " JOIN grid_statistics gs ON gs.task_def_id = td.task_def_id "
3642 sql += " JOIN dataset d ON d.dataset_id = td.dataset_id "
3643 sql += " WHERE d.status='PROCESSING' AND gs.grid_id = %u " % grid_id
3644 sql += " GROUP BY tdt.task_def_id "
3645 sql += " ORDER BY task_def_tray_id "
3646 self.logger.debug(sql)
3647 cursor.execute(sql)
3648 result_set = cursor.fetchall()
3649 tdts = map(lambda x: "%u" % x['task_def_tray_id'],result_set)
3650
3651
3652
3653 sql = " SELECT job.*,task.*, tdt.*, td.*, "
3654 sql += " plus.category, plus.subcategory, "
3655 sql += " YEAR(plus.start_datetime) as year, "
3656 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source "
3657 sql += " FROM task "
3658 sql += " JOIN task_def_tray tdt ON task.task_def_tray_id = tdt.task_def_tray_id "
3659 sql += " JOIN task_def td ON tdt.task_def_id = td.task_def_id "
3660 sql += " JOIN job ON task.job_id = job.job_id "
3661 sql += " JOIN dif on job.dataset_id = dif.dataset_id "
3662 sql += " JOIN plus on job.dataset_id = plus.dataset_id "
3663 sql += " JOIN grid_statistics gst ON gst.dataset_id = td.dataset_id AND gst.task_def_id = td.task_def_id "
3664 sql += " WHERE gst.grid_id = %d " % grid_id
3665 sql += " AND task.status='WAITING' "
3666 sql += " AND job.status IN ('QUEUED','PROCESSING') "
3667 sql += " AND task.task_def_tray_id IN (%s) " % ",".join(tdts)
3668 sql += " GROUP BY tdt.task_def_id "
3669 sql += " ORDER BY job.priority DESC,"
3670 if fifo:
3671 sql += " job.dataset_id,"
3672 sql += " job.queue_id "
3673 sql += " LIMIT %u " % min(jobs_at_once,maxjobs)
3674
3675 self.logger.debug(sql)
3676 cursor.execute(sql)
3677 result_set = cursor.fetchall()
3678 self.commit()
3679
3680 if not len(result_set) > 0:
3681 self.logger.info("no jobs to queue at this time")
3682 return {}
3683 else:
3684 self.logger.info("reserved %d jobs" % len(result_set))
3685
3686 for item in result_set:
3687 proc = item['queue_id']
3688 item['subdirectory'] = "%05d-%05d" % ((proc/1000)*1000,((proc+1000)/1000)*1000-1)
3689
3690 if len(job_list) >= maxjobs:
3691 return job_list
3692
3693 passkey = item['passkey']
3694
3695 queue_id = item['queue_id']
3696 dataset_id = item['dataset_id']
3697 task_id = item['task_id']
3698 priority = item['priority']
3699 sql = " UPDATE task,job SET "
3700 sql += " task.last_status = task.status, "
3701 sql += " task.status='QUEUEING', "
3702 sql += " task.host=NULL, "
3703 sql += " task.grid_id=%s, " % grid_id
3704 sql += " task.status_changed=NOW(), "
3705 sql += " task.passkey = '%s' " % passkey
3706 sql += " WHERE task.job_id= job.job_id "
3707 sql += " AND task.task_id= %s " % task_id
3708 sql += " AND task.status='WAITING' "
3709 sql += " AND job.status IN ('QUEUED','PROCESSING') "
3710 self.logger.debug(sql)
3711 try:
3712 cursor.execute(sql)
3713 rowcount = self._conn.affected_rows()
3714 self.commit()
3715 except Exception, e:
3716 self.logger.error(e)
3717 continue
3718 if rowcount == 1:
3719 self.logger.debug("%d job has been marked as 'QUEUEING' " % rowcount)
3720 task = i3Task()
3721 task.SetDatabaseId(item['job_id'])
3722 task.idx = item['idx']
3723 task.iter = item['iter']
3724 task.SetDatasetId(dataset_id)
3725 task.SetProcNum(queue_id)
3726 task.SetPrio(priority)
3727 task.SetTaskId(task_id)
3728 task.task_name = item['name']
3729 task.passkey = passkey
3730 task.AddArgOption("key",passkey)
3731 if not job_list.has_key(dataset_id):
3732 job_list[dataset_id] = []
3733 job_list[dataset_id].append(task)
3734 elif rowcount == 0:
3735 self.logger.warn("someone beat me to task %u" % task_id)
3736 else:
3737 raise Exception, "get task:wrong number of rows affected"
3738
3739 return job_list
3740
3741
3743 """
3744 Update grid participation in dataset
3745 dataset.
3746 @param grid: grid or cluster
3747 @param dataset: dataset id
3748 """
3749
3750 cursor = self.getcursor()
3751 sql = " SELECT g.grid_id FROM grid_statistics gs "
3752 sql += " JOIN grid g on gs.grid_id = g.grid_id "
3753 sql += " WHERE gs.dataset_id = %u " % dataset
3754 if type(grid) is types.IntType:
3755 sql += " AND g.grid_id=%u " % grid
3756 elif grid not in ('any','*','all'):
3757 sql += " AND g.name='%s' " % grid
3758 cursor.execute(sql)
3759 gridmap = cursor.fetchall()
3760
3761 if not gridmap:
3762 sql = " INSERT INTO grid_statistics (grid_id,dataset_id) "
3763 if type(grid) is types.IntType:
3764 sql += " VALUES (%s,%s) " % (grid,dataset)
3765 else:
3766 sql += " SELECT grid_id,%s FROM grid where name = '%s' " % (dataset,grid)
3767 cursor.execute(sql)
3768
3769 sql = " UPDATE grid_statistics gs, grid g SET gs.suspend = %u " % suspend
3770 sql += " WHERE gs.dataset_id = %u AND gs.grid_id = g.grid_id " % dataset
3771 if type(grid) is types.IntType:
3772 sql += " AND g.grid_id=%u " % grid
3773 elif grid not in ('any','*','all'):
3774 sql += " AND g.name='%s' " % grid
3775 cursor.execute(sql)
3776
3777 self.commit()
3778
3779
3781 """
3782 Insert grid_statistics entries for grids which should run this
3783 dataset.
3784 @param grids: list of grids or clusters
3785 @param dataset_id: dataset id
3786 """
3787
3788 delim = ""
3789 sql = " SELECT grid_id FROM grid WHERE "
3790 for grid in grids:
3791 if type(grid) is types.IntType:
3792 sql += " %s grid_id=%u " % (delim,grid)
3793 elif grid not in ('any','*','all'):
3794 sql += " %s name='%s' " % (delim,grid)
3795 delim = "OR"
3796 logger.debug(sql)
3797 cursor = self.getcursor()
3798 cursor.execute(sql)
3799 result_set = cursor.fetchall()
3800 self.commit()
3801
3802 if len(result_set) == 0:
3803 self.logger.error("could not match grid name '%s'" % ":".join(grids))
3804 return
3805
3806
3807 sql = " SELECT grid_id,dataset_id,suspend FROM grid_statistics WHERE grid_id in ("
3808 delim = ""
3809 for item in result_set:
3810 sql += " %s %d " % (delim, item['grid_id'])
3811 delim = ","
3812 sql += ") and dataset_id = %d" % dataset_id
3813 logger.debug(sql)
3814 cursor = self.getcursor()
3815 cursor.execute(sql)
3816 result_set2 = cursor.fetchall()
3817
3818 insert_grids = []
3819 unsuspend_grids = []
3820 for item in result_set:
3821 flag = None
3822 for item2 in result_set2:
3823 if item['grid_id'] == item2['grid_id']:
3824 if item2['suspend'] == 1 or item2['suspend'] == '1':
3825 flag = 'u'
3826 else:
3827 flag = 'f'
3828 break
3829 if flag == 'u':
3830 unsuspend_grids.append(item['grid_id'])
3831 elif flag is None:
3832 insert_grids.append(item['grid_id'])
3833
3834
3835 if len(insert_grids) > 0:
3836 delim = ""
3837 sql = " INSERT INTO grid_statistics "
3838 sql += " (grid_id,dataset_id) VALUES "
3839 for g in insert_grids:
3840 sql += " %s (%d,%d) " % ( delim, g, dataset_id)
3841 delim = ","
3842 logger.debug(sql)
3843 cursor = self.getcursor()
3844 cursor.execute(sql)
3845 self.commit()
3846
3847
3848 if len(unsuspend_grids) > 0:
3849 delim = ""
3850 sql = " UPDATE grid_statistics "
3851 sql += " SET suspend = 0 WHERE grid_id IN ( "
3852 for g in unsuspend_grids:
3853 sql += " %s %d " % ( delim, g)
3854 delim = ","
3855 sql += ") and dataset_id = %d" % dataset_id
3856 logger.debug(sql)
3857 cursor = self.getcursor()
3858 cursor.execute(sql)
3859 self.commit()
3860
3861
3863 """
3864 Insert per-task_def grid_statistics entries for grids
3865 which should run tasks of this dataset.
3866 @param grids: list of grids or clusters
3867 @param dataset_id: dataset id
3868 """
3869 cursor = self.getcursor()
3870
3871
3872
3873
3874 sql = " SELECT grid_id,task_def_id FROM grid_statistics WHERE dataset_id = %s" % dataset_id
3875 cursor.execute(sql)
3876 matches = {}
3877 for item in cursor.fetchall():
3878 matches[ (item['grid_id'],item['task_def_id']) ] = True
3879
3880
3881
3882
3883 delim = ""
3884 sql = " SELECT grid.name, grid_req.* FROM grid "
3885 sql += " JOIN grid_req ON grid.grid_id = grid_req.grid_id "
3886 sql += " WHERE grid.name in (%s) " % ",".join(map(lambda x: '"'+x+'"', grids))
3887 grid_ids = filter(lambda x: isinstance(x,int),grids)
3888 if len(grid_ids):
3889 sql += " OR grid.grid_id in (%s) " % ",".join(map(str,grid_ids))
3890 self.logger.debug(sql)
3891 cursor.execute(sql)
3892 gridmap = cursor.fetchall()
3893
3894
3895
3896
3897 sql = " SELECT task_def.* FROM task_def "
3898 sql += " WHERE dataset_id = %u " % dataset_id
3899 self.logger.debug(sql)
3900 cursor.execute(sql)
3901 result_set = cursor.fetchall()
3902 import string
3903
3904
3905
3906
3907
3908 inserts = []
3909 sql = " INSERT IGNORE INTO grid_statistics (grid_id,dataset_id,task_def_id) VALUES (%s,%s,%s) "
3910 for entry in result_set:
3911 tdef = steering.GetTaskDefinition(entry['name'])
3912 for grid in gridmap:
3913 include_grid = True
3914 for req in map(string.strip,tdef.requirements.split("&&")):
3915 include_grid = include_grid and grid['req'].lower() == req.lower()
3916 if include_grid and not matches.has_key( (grid['grid_id'],entry['task_def_id'])) :
3917 inserts.append((grid['grid_id'],dataset_id,entry['task_def_id']))
3918 self.logger.info("found match for '%s' reqs on grid %s" % (entry['name'],grid['grid_id'] ))
3919 else:
3920 self.logger.error("no match for '%s' reqs on grid %s" % (entry['name'],grid['grid_id'] ))
3921 cursor.executemany(sql, inserts)
3922 self.commit()
3923
3924
3925 - def InitializeJobTable(self,maxjobs,dataset_id,priority=0,stepsize=1000, start_qid=0, status='WAITING'):
3926 """
3927 Create job monitoring entries in database
3928 """
3929 cursor = self.getcursor()
3930
3931 sql = " INSERT INTO job "
3932 sql += " (queue_id,status,dataset_id,priority,status_changed) VALUES "
3933
3934 qstart = start_qid
3935 qend = start_qid + maxjobs
3936 for i in range(qstart,qend,min(stepsize,maxjobs)):
3937 comma = ""
3938 sql1 = sql
3939 for job in range(i,min(i+stepsize,qend)):
3940 sql1 += comma + " (%d,'%s',%d,%d,NOW()) " % ( job, status, dataset_id, priority )
3941 comma = ","
3942 cursor.execute(sql1)
3943 self.commit()
3944
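# Example of the batching above (illustrative numbers): with maxjobs=2500,
# start_qid=0 and stepsize=1000, three bulk INSERTs are issued covering
# queue_ids 0-999, 1000-1999 and 2000-2499:
#
#   db.InitializeJobTable(2500, dataset_id, stepsize=1000)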
3945
3946 - def GetJob(self,dataset_id=0,queue_id=0):
3972
3973
3974 - def jobstart(self,hostname,grid_id,dataset_id=0,queue_id=0,key=None):
3975 """
3976 Change the status of a job to indicate it is currently running
3977 @param hostname: host where job was queued from
3978 @param grid_id: ID of iceprod queue
3979 @param dataset_id: Optional dataset ID
3980 @param queue_id: Optional job ID (within dataset)
3981 @param key: temporary passkey to avoid job spoofs
3982 @return: dataset_id,nproc,procnum
3983 """
3984
3985 cursor = self.getcursor()
3986
3987 sql = " SELECT jobs_submitted "
3988 sql += " FROM dataset "
3989 sql += " WHERE dataset_id = %d " % dataset_id
3990 cursor.execute(sql)
3991 item = cursor.fetchone()
3992
3993 jobs_submitted = item['jobs_submitted']
3994
3995 sql = " UPDATE job SET "
3996 sql += " job.prev_state = job.status, "
3997 sql += " job.status='PROCESSING', "
3998 sql += " job.grid_id=%d, " % grid_id
3999 sql += " job.host='%s', " % hostname
4000 sql += " job.tray=0, "
4001 sql += " job.iter=0, "
4002 sql += " status_changed=NOW(), "
4003 sql += " keepalive=NOW() "
4004 sql += " WHERE job.status IN ('QUEUEING','QUEUED','PROCESSING','EVICTED') "
4005 sql += " AND job.queue_id=%d " % queue_id
4006 sql += " AND job.dataset_id=%d " % dataset_id
4007 sql += " AND job.passkey='%s' " % key
4008 rowcount = self.execute(cursor,sql)
4009 self.commit()
4010
4011 sql = " SELECT * FROM job "
4012 sql += " WHERE dataset_id = %d " % dataset_id
4013 sql += " AND job.queue_id=%d " % queue_id
4014 sql += " AND job.passkey='%s' " % key
4015 cursor.execute(sql)
4016 job = cursor.fetchone()
4017
4018
4019 if job:
4020 self.logger.debug("updated job %(dataset_id)s.%(queue_id)s with passkey %(passkey)s" % job )
4021 return job['dataset_id'],jobs_submitted,job['queue_id']
4022 return -1,0,0
4023
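# Usage sketch (hypothetical caller): a job wrapper on the worker node
# reports in with the passkey it was handed at submission time:
#
#   dataset, nproc, procnum = db.jobstart(hostname, grid_id,
#                                         dataset_id, queue_id, key=passkey)
#   if dataset < 0:
#       sys.exit("job no longer queued or passkey mismatch")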
4024 - def jobreset(self,dataset_id,job_id,reason=None,passkey=None):
4025 """
4026 Update status for job
4027 @param dataset_id: dataset index
4028 @param job_id: process number within dataset
4029 """
4030 cursor = self.getcursor()
4031 sql = " SELECT grid_queue_id FROM job "
4032 sql += " WHERE dataset_id=%d " % dataset_id
4033 sql += " AND queue_id=%d " % job_id
4034 if passkey:
4035 sql += " AND passkey='%s' " % passkey
4036 cursor.execute(sql)
4037 qid = cursor.fetchone()
4038
4039 sql = " UPDATE job SET "
4040 sql += " tray=0, "
4041 sql += " iter=0, "
4042 sql += " prev_state = status, "
4043 if reason:
4044 sql += " errormessage = '%s', " % self.defang(reason)
4045 sql += " status='RESET', "
4046 sql += " status_changed=NOW() "
4047 sql += " WHERE dataset_id=%d " % dataset_id
4048 sql += " AND queue_id=%d " % job_id
4049 if passkey:
4050 sql += " AND passkey='%s' " % passkey
4051 self.execute(cursor,sql)
4052
4053 return qid['grid_queue_id'] if qid else None
4054
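# Sketch: the grid_queue_id returned above is what a daemon would hand
# back to the batch system to remove the queued job (helper hypothetical):
#
#   gqid = db.jobreset(dataset_id, job_id, reason='stalled')
#   if gqid: batchsys.remove(gqid)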
4055 - def jobcopying(self,dataset_id,job_id,passkey=None):
4056 """
4057 Update status for job
4058 @param dataset_id: dataset index
4059 @param job_id: process number within dataset
4060 """
4061 cursor = self.getcursor()
4062 sql = " UPDATE job SET "
4063 sql += " prev_state = status, "
4064 sql += " status='COPYING', "
4065 sql += " status_changed=NOW() "
4066 sql += " WHERE dataset_id=%d " % dataset_id
4067 sql += " AND queue_id=%d " % job_id
4068 if passkey:
4069 sql += " AND passkey='%s' " % passkey
4070 self.execute(cursor,sql)
4071 return 1
4072
4073 - def jobfinalize(self,dataset_id,job_id,job,status='OK',clear_errors=True):
4074 """
4075 Update status for job
4076 @param dataset_id: dataset index
4077 @param job_id: process number within dataset
4078 """
4079 cursor = self.getcursor()
4080
4081 sql = " SELECT * FROM job "
4082 sql += " WHERE dataset_id=%d " % dataset_id
4083 sql += " AND queue_id=%d " % job_id
4084 cursor.execute(sql)
4085 job = cursor.fetchone()
4086
4087 sql = " UPDATE job SET "
4088 sql += " prev_state = status, "
4089 sql += " status='%s', " % status
4090 if clear_errors:
4091 sql += " errormessage = NULL, "
4092 sql += " status_changed=NOW() "
4093 sql += " WHERE dataset_id=%d " % dataset_id
4094 sql += " AND queue_id=%d " % job_id
4095 sql += " AND passkey='%s' " % job['passkey']
4096 self.execute(cursor,sql)
4097
4098 if status == 'OK':
4099
4100 sql = " UPDATE grid_statistics SET "
4101 sql += " grid_statistics.ok = grid_statistics.ok+1 "
4102 sql += " WHERE grid_statistics.dataset_id= %u " % job['dataset_id']
4103 sql += " AND grid_statistics.grid_id= %u " % job['grid_id']
4104 self.logger.debug(sql)
4105 try:
4106 self.execute(cursor,sql)
4107 except Exception,e:
4108 self.logger.error(e)
4109 self.logger.debug("wrote stats for job %d,%d " % (dataset_id,job_id))
4110
4111 return 0,job['submitdir']
4112
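# Sketch: a post-processing daemon could use the returned submitdir to
# clean up the submission area (shutil usage illustrative):
#
#   rc, submitdir = db.jobfinalize(dataset_id, job_id, job, status='OK')
#   if rc == 0: shutil.rmtree(submitdir)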
4114 """
4115 Get collected dataset statistics
4116 @param dataset_id: dataset index
4117 """
4118 stats = {}
4119 cursor = self.getcursor()
4120 sql = " SELECT name,value FROM dataset_statistics "
4121 sql += " WHERE dataset_id='%s' " % dataset_id
4122 cursor.execute(sql)
4123 for entry in cursor.fetchall():
4124 stats[entry['name']] = entry['value']
4125 self.commit()
4126 return stats
4127
4128 - def jobfinish(self,dataset_id,job_id,stats,key=None,mode=0):
4129 """
4130 Update monitoring for job and write statistics
4131 @param dataset_id: dataset index
4132 @param job_id: process number within dataset
4133 @param stats: dictionary of stat entries
4134 """
4135 passkey = self.mkkey(6,9)
4136 cursor = self.getcursor()
4137 sql = " UPDATE job SET "
4138 sql += " prev_state = status, "
4139 if mode == 1:
4140 sql += " status = 'COPIED', "
4141
4142 else:
4143 sql += " status = 'READYTOCOPY', "
4144 if stats.has_key('mem_heap'):
4145 sql += " mem_heap = %s, " % stats['mem_heap']
4146 if stats.has_key('mem_heap_peak'):
4147 sql += " mem_heap_peak = %s, " % stats['mem_heap_peak']
4148 if stats.has_key('user_time'):
4149 sql += " time_user = %s, " % stats['user_time']
4150 if stats.has_key('sys_time'):
4151 sql += " time_sys = %s, " % stats['sys_time']
4152 if stats.has_key('real_time'):
4153 sql += " time_real = %s, " % stats['real_time']
4154 if stats.has_key('Triggered Events'):
4155 sql += " nevents = %s, " % stats['Triggered Events']
4156 if stats.has_key('Generated Events'):
4157 sql += " gevents = %s, " % stats['Generated Events']
4158 sql += " job.passkey='%s', " % passkey
4159 sql += " status_changed=NOW() "
4160 sql += " WHERE dataset_id=%d " % dataset_id
4161 sql += " AND queue_id=%d " % job_id
4162 sql += " AND job.passkey='%s' " % key
4163 self.logger.debug(sql)
4164 rowcount = self.execute(cursor,sql)
4165
4166 sql = " SELECT grid_id,host FROM job "
4167 sql += " WHERE queue_id=%d " % job_id
4168 sql += " AND dataset_id=%d " % dataset_id
4169 self.logger.debug(sql)
4170 cursor.execute(sql)
4171 gridinfo = cursor.fetchone()
4172 self.commit()
4173 if not gridinfo:
4174 return (rowcount+1)%2
4175
4176
4177 if len(stats):
4178
4179 if gridinfo("host"):
4180
4181 self.update_node_statistics(gridinfo,stats)
4182
4183 cm = ""
4184 sql = " REPLACE INTO job_statistics "
4185 sql += " (queue_id,dataset_id,name,value) VALUES "
4186 for key,value in stats.items():
4187 try:
4188 value = float(value)
4189 sql += "%s (%d,%d,'%s',%f) " % (cm,job_id,dataset_id,key,value)
4190 cm = ","
4191 except: continue
4192 cursor.execute(sql)
4193
4194 sql = " UPDATE grid_statistics SET "
4195 sql += " time_real = time_real + %g, " % stats['real_time']
4196 sql += " time_sys = time_sys + %g, " % stats['sys_time']
4197 sql += " time_user = time_user + %g " % stats['user_time']
4198 sql += " WHERE grid_statistics.dataset_id= %u " % dataset_id
4199 sql += " AND grid_statistics.grid_id= %u " % gridinfo['grid_id']
4200 cursor.execute(sql)
4201
4202 self.commit()
4203
4204 return (rowcount+1)%2
4205
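# Note on the return convention above (sketch): execute() returns the
# number of rows updated, so (rowcount+1)%2 maps one updated row to 0
# (success) and zero rows to 1 (failure):
#
#   if db.jobfinish(dataset_id, job_id, stats, key=passkey):
#       logger.error("jobfinish failed: passkey mismatch?")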
4206
4208
4209 cursor = self.getcursor()
4210
4211
4212 gridinfo['hostname'] = gridinfo['host']
4213 if stats.has_key('hostname'):
4214 gridinfo['hostname'] = stats['hostname']
4215
4216
4217 sql = " INSERT IGNORE INTO node_statistics "
4218 sql += " (name,grid_id,host_id) VALUES ('%s',%u,'%s') " % (gridinfo['hostname'],gridinfo['grid_id'],gridinfo['host'])
4219 self.logger.debug(sql)
4220 cursor.execute(sql)
4221 self.commit()
4222
4223 sql = " UPDATE node_statistics SET "
4224 if error == 0:
4225 sql += " completed = completed+1, "
4226 elif error == 1:
4227 sql += " evictions = evictions+1, "
4228 elif error == 2:
4229 sql += " failures = failures+1, "
4230 if stats.has_key('platform'):
4231 sql += " node_statistics.platform='%s', " % stats['platform']
4232 if stats.has_key('mem_heap'):
4233 sql += " mem_heap = mem_heap+%s, " % stats['mem_heap']
4234 if stats.has_key('mem_heap_peak'):
4235 sql += " mem_heap_peak = mem_heap_peak+%s, " % stats['mem_heap_peak']
4236 if stats.has_key('user_time'):
4237 sql += " time_user = time_user+%s, " % stats['user_time']
4238 if stats.has_key('sys_time'):
4239 sql += " time_sys = time_sys+%s, " % stats['sys_time']
4240 if stats.has_key('real_time'):
4241 sql += " time_real = time_real+%s, " % stats['real_time']
4242 sql += " name = '%s' " % gridinfo['hostname']
4243
4244 sql += " WHERE host_id='%s' " % gridinfo['host']
4245 self.logger.debug(sql)
4246 cursor.execute(sql)
4247 self.commit()
4248
4249
4250 - def jobsubmitted(self,dataset_id,job_id,submitdir,grid_queue_id=None):
4251 """
4252 Set the submission path of job so that it can be post processed
4253 on termination.
4254 """
4255 cursor = self.getcursor()
4256 sql = " UPDATE job SET "
4257 if grid_queue_id is not None and grid_queue_id != -1:
4258 sql += " grid_queue_id='%s', " % grid_queue_id
4259 sql += " submitdir=\"%s\" " % submitdir
4260 sql += " WHERE queue_id=%d " % job_id
4261 sql += " AND dataset_id=%d " % dataset_id
4262 self.logger.debug(sql)
4263 self.execute(cursor,sql)
4264
4265 cursor = self.getcursor()
4266 sql = " UPDATE job SET "
4267 sql += " prev_state = status, "
4268 sql += " status='QUEUED' "
4269 sql += " WHERE queue_id=%d " % job_id
4270 sql += " AND dataset_id=%d " % dataset_id
4271 sql += " AND status='QUEUEING' "
4272 self.logger.debug(sql)
4273 self.execute(cursor,sql)
4274 return 1
4275
4276 - def jobping(self,dataset_id,job_id,host,key=None,tray=0,iter=0):
4277 """
4278 Update status_changed time for job
4279 """
4280 cursor = self.getcursor()
4281 sql = " SELECT status from job "
4282 sql += " WHERE queue_id=%d " % job_id
4283 sql += " AND dataset_id=%d " % dataset_id
4284 sql += " AND passkey='%s' " % key
4285 self.logger.debug(sql)
4286 cursor.execute(sql)
4287 row = cursor.fetchone();
4288 if not (row and row['status'] == 'PROCESSING'):
4289 return False
4290
4291 sql = " UPDATE job SET "
4292 sql += " tray=%d, " % tray
4293 sql += " iter=%d, " % iter
4294 sql += " keepalive=NOW() "
4295 sql += " WHERE queue_id=%d " % job_id
4296 sql += " AND dataset_id=%d " % dataset_id
4297 sql += " AND status='PROCESSING' "
4298 sql += " AND passkey='%s' " % key
4299 self.logger.debug(sql)
4300 self.execute(cursor,sql)
4301 return True
4302
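# Sketch: the running job calls this periodically as a heartbeat so the
# server-side watchdog does not treat it as stalled (interval illustrative):
#
#   while running:
#       db.jobping(dataset_id, job_id, host, key=passkey, tray=t, iter=i)
#       time.sleep(600)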
4303 - def jobabort(self,job_id,dataset_id,error,errormessage='',key=None,stats={}):
4304 """
4305 Mark a job as evicted (error=1) or failed (error=2),
4306 recording the error message and updating grid and
4307 node statistics accordingly.
4308 @todo: update node statistics
4309 """
4310 cursor = self.getcursor()
4311
4312 sql = " SELECT debug FROM dataset WHERE dataset_id = %d " % dataset_id
4313 self.logger.debug(sql)
4314 cursor.execute(sql)
4315 debug = cursor.fetchone()['debug']
4316
4317 sql = " SELECT * FROM job "
4318 sql += " WHERE dataset_id = %u AND queue_id = %u " % (dataset_id,job_id)
4319 cursor.execute(sql)
4320 job = cursor.fetchone()
4321
4322 sql = " UPDATE job SET "
4323 sql += " prev_state = status, "
4324 sql += " errormessage=CONCAT(NOW(),QUOTE('%s')), " % self.defang(errormessage)
4325 sql += " status_changed=NOW(), "
4326
4327 if error == 1:
4328 sql += " evictions=evictions+1, "
4329 sql += " status='EVICTED', "
4330 if error == 2:
4331 sql += " failures=failures+1, "
4332 if debug:
4333 sql += " status='SUSPENDED', "
4334 else:
4335 sql += " status='ERROR', "
4336 sql += " nevents=0 "
4337 sql += " WHERE queue_id=%d " % job['queue_id']
4338 sql += " AND dataset_id=%d " % job['dataset_id']
4339 if key:
4340 sql += " AND passkey='%s' " % key
4341 self.logger.debug(sql)
4342 self.execute(cursor,sql)
4343 self.commit()
4344
4345 if stats:
4346
4347 sql = " SELECT grid_id,host FROM job "
4348 sql += " WHERE queue_id=%d " % job['queue_id']
4349 sql += " AND dataset_id=%d " % job['dataset_id']
4350 self.logger.debug(sql)
4351 cursor.execute(sql)
4352 gridinfo = cursor.fetchone()
4353 if gridinfo and gridinfo["host"]:
4354 self.update_node_statistics(gridinfo,stats,error)
4355
4356 try:
4357 self.execute(cursor,sql)
4358 except Exception,e:
4359 self.logger.error(e)
4360
4361
4362 sql = " UPDATE grid_statistics SET "
4363 if stats:
4364 try:
4365 sql += " time_real = time_real + %g, " % stats['real_time']
4366 sql += " time_sys = time_sys + %g, " % stats['sys_time']
4367 sql += " time_user = time_user + %g, " % stats['user_time']
4368 except: pass
4369
4370 if error == 1:
4371 sql += " grid_statistics.evictions = grid_statistics.evictions+1 "
4372 else:
4373 sql += " grid_statistics.failures = grid_statistics.failures+1 "
4374 sql += " WHERE grid_statistics.dataset_id= %u " % job['dataset_id']
4375 sql += " AND grid_statistics.grid_id= %u " % job['grid_id']
4376 self.logger.debug(sql)
4377 try:
4378 self.execute(cursor,sql)
4379 except Exception,e:
4380 self.logger.error(e)
4381
4382 self.commit()
4383
4384
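# Sketch: error=1 counts an eviction, error=2 a failure; a job wrapper
# trapping a crash might report it as
#
#   db.jobabort(job_id, dataset_id, 2,
#               errormessage=traceback.format_exc(), key=passkey)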
4385 - def jobclean(self,dataset_id,archive=True):
4386 """
4387 Remove jobs from the queue, archiving them and their statistics
4388 """
4389 cursor = self.getcursor()
4390
4391 sql = " SELECT name, SUM(value) as total "
4392 sql += " FROM job_statistics "
4393 sql += " WHERE dataset_id=%d " % dataset_id
4394 sql += " GROUP BY name "
4395 cursor.execute(sql)
4396 results = cursor.fetchall()
4397 self.commit()
4398 if results:
4399
4400
4401 cm = ""
4402 sql = " REPLACE INTO dataset_statistics "
4403 sql += " (name,value,dataset_id) VALUES "
4404 for entry in results:
4405 sql += "%s ('%s',%f,%d) " % (cm,entry['name'],entry['total'],dataset_id)
4406 cm = ","
4407 cursor.execute(sql)
4408 self.commit()
4409 else:
4410 self.logger.warn("jobclean: no job statistics found for dataset %d" % dataset_id)
4411
4412 if archive:
4413
4414 sql = " INSERT INTO archived_job "
4415 sql += " SELECT job.* FROM job WHERE dataset_id = %u " % dataset_id
4416 self.logger.debug(sql)
4417 cursor.execute(sql)
4418 self.commit()
4419
4420
4421 sql = " INSERT INTO archived_job_statistics "
4422 sql += " SELECT job_statistics.* FROM job_statistics WHERE dataset_id = %u " % dataset_id
4423 self.logger.debug(sql)
4424 cursor.execute(sql)
4425 self.commit()
4426
4427 sql = " DELETE FROM job "
4428 sql += " WHERE dataset_id=%d " % dataset_id
4429 self.logger.debug(sql)
4430 cursor.execute(sql)
4431 self.commit()
4432
4433
4434 sql = " DELETE FROM job_statistics "
4435 sql += " WHERE dataset_id=%d " % dataset_id
4436 sql += " limit 10000 "
4437 try:
4438 while True:
4439 cursor.execute(sql)
4440 if cursor.rowcount < 1:
4441 break
4442 self.commit()
4443 time.sleep(10)
4444 except Exception,e:
4445 self.logger.error('%s: could not delete job_statistics entry.' % str(e))
4446 self.commit()
4447
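# Sketch of the archival flow above: per-job statistics are summed into
# dataset_statistics, job rows are copied to archived_job, and the
# job_statistics rows are deleted in chunks of 10000 to keep each
# transaction small:
#
#   db.jobclean(dataset_id)                 # archive, then purge
#   db.jobclean(dataset_id, archive=False)  # purge only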
4448
4449 - def jobsuspend(self,job_id,dataset_id,suspend=True):
4450 """
4451 Suspend a job, or reset it so it gets reprocessed.
4452 A negative job_id applies the change to all
4453 non-completed jobs in the dataset.
4454 """
4455 if suspend: status = 'SUSPENDED'
4456 else: status = 'RESET'
4457
4458 passkey = self.mkkey(6,9)
4459 cursor = self.getcursor()
4460
4461 sql = " UPDATE job SET "
4462 sql += " prev_state = status, "
4463 if status == 'RESET':
4464 sql += " failures=0, "
4465 sql += " status='%s', " % status
4466 sql += " passkey='%s', " % passkey
4467 sql += " status_changed=NOW() "
4468 sql += " WHERE dataset_id=%d " % dataset_id
4469 sql += " AND status != 'OK' "
4470 sql += " AND status NOT LIKE 'IDL%' "
4471 if not job_id < 0:
4472 sql += " AND queue_id=%d " % job_id
4473 self.execute(cursor,sql)
4474
4475 - def jobsetstatus(self,dataset_id,job_id=-1,status="RESET",reason=None,passkey=None):
4476 cursor = self.getcursor()
4477 sql = " UPDATE job SET "
4478 sql += " tray=0, "
4479 sql += " iter=0, "
4480 sql += " prev_state = status, "
4481 if reason:
4482 sql += " errormessage = '%s', " % reason
4483 sql += " status='%s', " % status
4484 sql += " status_changed=NOW() "
4485 sql += " WHERE dataset_id=%d " % dataset_id
4486 if job_id != -1:
4487 sql += " AND queue_id=%d " % job_id
4488 if passkey:
4489 sql += " AND passkey='%s' " % passkey
4490 self.execute(cursor,sql)
4491
4493 """
4494 Toggle the debug flag of a dataset
4495 """
4496 cursor = self.getcursor()
4497 sql = " UPDATE dataset "
4498 sql += " SET debug = !debug "
4499 sql += " WHERE dataset_id=%d " % dataset
4500 cursor.execute(sql)
4501 self.commit()
4502
4504 """
4505 Get status of dataset
4506 """
4507 cursor = self.getcursor()
4508 sql = " SELECT dataset_id,status from dataset "
4509 sql += " WHERE dataset_id=%d " % dataset
4510 cursor.execute(sql)
4511 return cursor.fetchall()
4512
4514 """
4515 Update status of dataset
4516 """
4517 cursor = self.getcursor()
4518 sql = " UPDATE dataset "
4519 sql += " SET status='%s' " % status
4520 sql += " WHERE dataset_id=%d " % dataset
4521 cursor.execute(sql)
4522 self.commit()
4523
4525 """
4526 Change Plus:subcategory in DIFPlus metadata
4527 """
4528 cursor = self.getcursor()
4529 sql = " UPDATE plus "
4530 sql += " SET subcategory='%s' " % sub_cat
4531 sql += " WHERE dataset_id=%d " % dataset_id
4532 self.logger.debug(sql)
4533 cursor.execute(sql)
4534
4535 - def validate(self,dataset_id,status='TRUE'):
4536 """
4537 Mark dataset as visible and valid.
4538 """
4539 cursor = self.getcursor()
4540 sql = " UPDATE dataset SET verified = '%s' " % status
4541 sql += " WHERE dataset_id = %d " % dataset_id
4542 self.logger.debug(sql)
4543 cursor.execute(sql)
4544 self.commit()
4545
4547 """
4548 Change the status of a job to indicate it is currently running
4549 @param dataset_id: Dataset ID
4550 @param queue_id: Queue ID (within dataset)
4551 @param key: temporary passkey to avoid job spoofs
4552 @return: dataset_id,nproc,procnum
4553 """
4554
4555 cursor = self.getcursor()
4556
4557 sql = " SELECT jobs_submitted "
4558 sql += " FROM dataset "
4559 sql += " WHERE dataset_id = %s "
4560 cursor.execute(sql,(dataset_id,))
4561 item = cursor.fetchone()
4562
4563 jobs_submitted = item['jobs_submitted']
4564
4565 self.logger.debug("Job %d.%d starting with key %s" % (dataset_id,queue_id,key))
4566
4567
4568 sql = " UPDATE job SET "
4569 sql += " job.prev_state = job.status, "
4570 sql += " job.status='PROCESSING', "
4571
4572 sql += " status_changed=NOW(), "
4573 sql += " keepalive=NOW() "
4574 sql += " WHERE "
4575 sql += " (job.status='QUEUED' OR job.status='PROCESSING' OR job.status='EVICTED')"
4576 sql += " AND job.dataset_id=%s "
4577 sql += " AND job.queue_id=%s "
4578 sql += " AND job.passkey=%s "
4579 cursor.execute(sql,(dataset_id,queue_id,key))
4580
4581
4582 sql = " SELECT EXISTS("
4583 sql += " SELECT *"
4584 sql += " FROM job j"
4585 sql += " WHERE status = 'PROCESSING'"
4586 sql += " AND dataset_id = %s"
4587 sql += " AND queue_id = %s"
4588 sql += " AND passkey = %s"
4589 sql += ") AS found"
4590 cursor.execute(sql,(dataset_id,queue_id,key))
4591 row = cursor.fetchone()
4592 if row and int(row['found']) == 1:
4593 return (dataset_id,jobs_submitted,queue_id)
4594 return (TASK_DATASET_ERROR_ID,0,0)
4595
4597 it = self.get_iterations(dataset_id)
4598 tray,iter = 0,0
4599 if it: tray,iter = it[-1]['tray_index'],it[-1]['iterations']+1
4600 cursor = self.getcursor()
4601 sql = " UPDATE job SET "
4602 sql += " prev_state = status, "
4603 sql += " status = 'OK', "
4604 sql += " tray = %s, "
4605 sql += " iter = %s, "
4606 sql += " errormessage = NULL, "
4607 sql += " status_changed=NOW() "
4608 sql += " WHERE dataset_id=%s "
4609 sql += " AND queue_id=%s "
4610 sql += " AND passkey=%s "
4611 self.logger.debug(sql % (tray,iter,dataset_id,queue_id,key))
4612 cursor.execute(sql,(tray,iter,dataset_id,queue_id,key))
4613 rows = self._conn.affected_rows()
4614 if not rows: return rows
4615
4616
4617 sql = " REPLACE INTO job_statistics "
4618 sql += " ( "
4619 sql += " SELECT NULL,j.dataset_id, j.queue_id, "
4620 sql += " ts.name, AVG(ts.value) AS value "
4621 sql += " FROM task_statistics ts "
4622 sql += " JOIN task t ON ts.task_id = t.task_id "
4623 sql += " JOIN job j ON j.job_id = t.job_id "
4624 sql += " WHERE j.dataset_id = %s AND j.queue_id = %s "
4625 sql += " GROUP BY name "
4626 sql += " ) "
4627 cursor.execute(sql,(dataset_id,queue_id))
4628
4629 return self._conn.affected_rows()
4630
4632 cursor = self.getcursor()
4633 sql = " SELECT tray.tray_index, tray.iterations "
4634 sql += " FROM tray WHERE dataset_id = %s "
4635 cursor.execute(sql,(dataset_id,))
4636 return cursor.fetchall()
4637
4639 cursor = self.getcursor()
4640
4641
4642 sql = " SELECT task_def_tray_id"
4643 sql += " FROM task_def_tray tdt"
4644 sql += " WHERE task_def_id = %s"
4645 sql += " AND idx = %s"
4646 sql += " AND iter = %s"
4647 cursor.execute(sql,(task_def_id,tray,iter))
4648 row = cursor.fetchone()
4649 if not row:
4650 self.logger.error("could not find task_def_tray_id %s %s" % (task_def_id,idx))
4651 return 0
4652 tdt_id = row['task_def_tray_id']
4653
4654 sql = " SELECT task_id"
4655 sql += " FROM task t, task_def_tray tdt"
4656 sql += " WHERE t.task_def_tray_id = tdt.task_def_tray_id"
4657 sql += " AND tdt.task_def_tray_id = %s"
4658 sql += " AND t.job_id = %s"
4659 cursor.execute(sql,(tdt_id,job_id,))
4660 row = cursor.fetchone()
4661 if row:
4662 return row['task_id']
4663 return 0
4664
4665
4666 - def task_start(self,dataset_id,queue_id,taskname,tray,iter,hostname,key=''):
4667 cursor = self.getcursor()
4668
4669 self.logger.info("task %s (tray: %d iter: %d) starting for job %d.%d" \
4670 % (taskname,tray,iter,dataset_id,queue_id))
4671
4672
4673 sql = " SELECT job_id"
4674 sql += " FROM job j"
4675 sql += " WHERE dataset_id = %s"
4676 sql += " AND queue_id = %s"
4677 sql += " AND passkey = %s"
4678 cursor.execute(sql,(dataset_id,queue_id,key))
4679 row = cursor.fetchone()
4680 if not row:
4681 msg = "task %s tried to start for job %d.%d with wrong passkey" \
4682 % (taskname,dataset_id,queue_id)
4683 self.logger.warn(msg)
4684 return TASK_ERROR_ID
4685 job_id = row['job_id']
4686
4687
4688 sql = " SELECT task_def_id"
4689 sql += " FROM task_def td"
4690 sql += " WHERE dataset_id = %s"
4691 sql += " AND name = %s"
4692 cursor.execute(sql,(dataset_id,taskname))
4693 row = cursor.fetchone()
4694 if not row:
4695 msg = "task %s not found for job %d.%d" % (taskname,dataset_id,queue_id)
4696 self.logger.error(msg)
4697 return TASK_ERROR_ID
4698 task_def_id = row['task_def_id']
4699
4700
4701 sql = " SELECT task_def_tray_id"
4702 sql += " FROM task_def_tray tdt"
4703 sql += " WHERE task_def_id = %s"
4704 sql += " AND idx = %s"
4705 sql += " AND iter = %s"
4706 cursor.execute(sql,(task_def_id,tray,iter))
4707 row = cursor.fetchone()
4708 if not row:
4709 msg = "tray %d, iter %d not found for task %s in job %d.%d" \
4710 % (tray,iter,taskname,dataset_id,queue_id)
4711 self.logger.error(msg)
4712 return TASK_ERROR_ID
4713 tdt_id = row['task_def_tray_id']
4714
4715 sql = " SELECT task_id"
4716 sql += " FROM task t, task_def_tray tdt"
4717 sql += " WHERE t.task_def_tray_id = tdt.task_def_tray_id"
4718 sql += " AND tdt.task_def_tray_id = %s"
4719 sql += " AND t.job_id = %s"
4720 cursor.execute(sql,(tdt_id,job_id))
4721 row = cursor.fetchone()
4722 if row:
4723
4724 task_id = row['task_id']
4725 sql = " UPDATE task"
4726 sql += " SET last_status = status,"
4727 sql += " status = 'STARTING',"
4728 sql += " status_changed = NOW(),"
4729 sql += " host = %s,"
4730 sql += " start = NOW(),"
4731 sql += " finish = NULL"
4732 sql += " WHERE task_id = %s"
4733 cursor.execute(sql,(hostname,task_id))
4734
4735
4736 sql = " DELETE FROM task_statistics WHERE task_id = %s"
4737 cursor.execute(sql,(task_id,))
4738 else:
4739
4740 sql = "INSERT INTO task (task_def_tray_id, job_id, "
4741 sql += " host, status, status_changed, start)"
4742 sql += " VALUES (%s, %s, %s, 'STARTING', NOW(), NOW())"
4743 cursor.execute(sql,(tdt_id,job_id,hostname))
4744 task_id = self._conn.insert_id()
4745 return task_id
4746
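# Sketch: a task wrapper obtains its task_id at startup and treats
# TASK_ERROR_ID (from iceprod.core.constants) as a refusal (taskname
# illustrative):
#
#   tid = db.task_start(dataset_id, queue_id, 'generate', tray, iter,
#                       hostname, key=passkey)
#   if tid == TASK_ERROR_ID: sys.exit(1)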
4747 - def task_init(self,dataset_id,job_id,tray=None,iter=None):
4748 """
4749 Initialize task entry and set status to IDLE
4750 @param dataset_id: dataset index
4751 @param job_id: job table id
4752 @param tray: optional tray index (None = all trays)
4753 @param iter: optional iteration index (None = all
4754 iterations)
4755 """
4756 cursor = self.getcursor()
4757
4758
4759 sql = " SELECT tdt.task_def_tray_id, td.task_def_id, tdt.idx, tdt.iter "
4760 sql += " FROM task_def_tray tdt"
4761 sql += " JOIN task_def td "
4762 sql += " ON td.task_def_id = tdt.task_def_id "
4763 sql += " WHERE dataset_id = %s" % dataset_id
4764 if tray is not None:
4765 sql += " AND idx = %s" % tray
4766 if iter is not None:
4767 sql += " AND iter = %s" % iter
4768 cursor.execute(sql)
4769 rows = cursor.fetchall()
4770 if not rows: return 0
4771
4772 inserts = map(lambda x: (x['task_def_tray_id'],job_id), rows)
4773 sql = "INSERT IGNORE INTO task (task_def_tray_id, job_id, status, status_changed)"
4774 sql += " VALUES (%s, %s, 'IDLE', NOW())"
4775 cursor.executemany(sql, inserts)
4776 self.commit()
4777 return self.get_task_id(rows[0]["task_def_id"],job_id, rows[0]['idx'], rows[0]['iter'])
4778
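# Sketch: called once per job to pre-create IDLE task rows; passing
# tray/iter restricts initialization to a single tray iteration:
#
#   db.task_init(dataset_id, job_id)        # all trays and iterations
#   db.task_init(dataset_id, job_id, 0, 0)  # tray 0, iteration 0 only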
4780 """
4781 Get status for task
4782 @param: task_id
4783 """
4784 cursor = self.getcursor()
4785
4786
4787 sql = " SELECT * FROM task WHERE task_id = %s " % task_id
4788 cursor.execute(sql)
4789 row = cursor.fetchone()
4790 if not row: return 0
4791 return row["status"]
4792
4793
4795 if not cursor:
4796 cursor = self.getcursor()
4797 sql = " UPDATE task t,job j"
4798 sql += " SET t.last_status = t.status,"
4799 sql += " t.status = %s,"
4800 if grid_id:
4801 sql += " t.grid_id = %s,"
4802 sql += " t.status_changed = NOW()"
4803 sql += " WHERE t.job_id = j.job_id"
4804 sql += " AND t.task_id = %s"
4805 sql += " AND j.passkey = %s"
4806 sql += " AND t.status != 'OK'"
4807 self.logger.debug("task status update starting for task %d" % task_id)
4808 if grid_id:
4809 rowcount = cursor.execute(sql,(status,grid_id,task_id,key))
4810 self.logger.debug(sql % (status,grid_id,task_id,key))
4811 else:
4812 rowcount = cursor.execute(sql,(status,task_id,key))
4813 self.logger.debug(sql % (status,task_id,key))
4814 self.logger.debug("task status update done for task %d" % task_id)
4815 self.commit()
4816 return rowcount
4817
4818
4820 cursor = self.getcursor()
4821 sql = " SELECT COUNT(*) AS expected"
4822 sql += " FROM task_def_tray tdt"
4823 sql += " WHERE tdt.task_def_id = %s"
4824 params = [td_id]
4825 add_sql = ""
4826 if tray is not None:
4827 add_sql += " AND tdt.idx = %s"
4828 params.append(tray)
4829 if iter is not None:
4830 add_sql += " AND tdt.iter = %s"
4831 params.append(iter)
4832 if add_sql:
4833 sql += add_sql
4834 cursor.execute(sql,params)
4835 row = cursor.fetchone()
4836 if not row:
4837 return False
4838 expected = int(row['expected'])
4839
4840 params.insert(1, job_id)
4841 sql = " SELECT COUNT(*) AS actual"
4842 sql += " FROM task t, task_def_tray tdt"
4843 sql += " WHERE tdt.task_def_tray_id = t.task_def_tray_id"
4844 sql += " AND tdt.task_def_id = %s"
4845 sql += " AND t.job_id = %s"
4846 sql += " AND t.status = 'OK'"
4847 if add_sql:
4848 sql += add_sql
4849
4850 cursor.execute(sql,params)
4851 row = cursor.fetchone()
4852 if not row:
4853 return False
4854 actual = int(row['actual'])
4855
4856 return expected > 0 and actual == expected
4857
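# Sketch: this expected-vs-actual comparison is how the server decides
# whether every task of a task_def (optionally one tray/iter) finished
# OK; the def line is elided in this listing, so the method name below
# is assumed:
#
#   if db.tasks_done(td_id, job_id):
#       finalize_job(dataset_id, queue_id)   # hypothetical follow-up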
4859 cursor = self.getcursor()
4860 ret = self.task_update_status(task_id,'ERROR',key,cursor)
4861 if not ret:
4862 self.logger.error("unable to update status for task %d" % task_id)
4863 return ret
4864 sql = " DELETE FROM task_statistics WHERE task_id = %s"
4865 self.logger.debug("aborting task %d" % task_id)
4866 cursor.execute(sql,(task_id,))
4867
4868 if stats:
4869
4870 sql = " SELECT grid_id,host FROM task "
4871 sql += " WHERE task_id=%d " % task_id
4872 self.logger.debug(sql)
4873 cursor.execute(sql)
4874 gridinfo = cursor.fetchone()
4875 if gridinfo:
4876 self.update_node_statistics(gridinfo,stats,error=2)
4877
4878 return True
4879
4881 cursor = self.getcursor()
4882 ret = self.task_update_status(task_id,'OK',key,cursor)
4883 if not ret:
4884 self.logger.error("unable to update status for task %d" % task_id)
4885 return ret
4886
4887 sql = " UPDATE task SET finish = NOW()"
4888 sql += " WHERE task_id=%s"
4889 self.logger.debug(sql % task_id)
4890 cursor.execute(sql,(task_id,))
4891 ret = self._conn.affected_rows()
4892 if not ret:
4893 self.logger.error("unable to set finish time for task %d" % task_id)
4894
4895
4896 if stats:
4897 inserts = []
4898 self.logger.debug("stats: %s" % stats)
4899 for name,value in stats.iteritems():
4900 if isinstance(value,float) and str(value) != 'nan':
4901 inserts.append((task_id,name,value))
4902 sql = " REPLACE INTO task_statistics (task_id,name,value) VALUES (%s,%s,%s)"
4903 cursor.executemany(sql, inserts)
4904
4905
4906 sql = " SELECT grid_id,host FROM task "
4907 sql += " WHERE task_id=%d " % task_id
4908 self.logger.debug(sql)
4909 cursor.execute(sql)
4910 gridinfo = cursor.fetchone()
4911 if gridinfo:
4912 self.update_node_statistics(gridinfo,stats)
4913
4914 return True
4915
4916
4918 """
4919 Fetch jobs stuck in the COPYING state on this grid,
4920 optionally only those unchanged for more than
4921 'delay' minutes.
4922 """
4923 cursor = self.getcursor()
4924 sql = " SELECT * FROM job "
4925 sql += " WHERE grid_id=%d " % grid_id
4926 sql += " AND status='COPYING' "
4927 if delay:
4928 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay
4929 cursor.execute(sql)
4930 results = cursor.fetchall();
4931 self.commit()
4932 return results
4933
4934 - def SetFileURL(self,queue_id,dataset_id,location,filename,md5sum,filesize,transfertime,key):
4935 """
4936 Add or change the global location of a file
4937 """
4938 cursor = self.getcursor()
4939
4940 sql = " SELECT job_id FROM job "
4941 sql += " WHERE dataset_id=%u " % dataset_id
4942 sql += " AND queue_id = %u " % queue_id
4943 sql += " AND passkey='%s' " % key
4944 self.logger.debug(sql)
4945 cursor.execute(sql)
4946 results = cursor.fetchone()
4947 if not results: return 0
4948
4949 sql = " INSERT IGNORE INTO urlpath "
4950 sql += " (dataset_id,queue_id,name,path,md5sum,size,transfertime) VALUES "
4951 sql += " (%u,%u,'%s','%s','%s',%f,%f)" % \
4952 (dataset_id,queue_id,filename,location,md5sum,filesize,transfertime)
4953 self.logger.debug(sql)
4954 cursor.execute(sql)
4955 if not cursor.rowcount:
4956 sql = " UPDATE urlpath SET "
4957 sql += " name ='%s', " % filename
4958 sql += " path ='%s', " % location
4959 sql += " md5sum ='%s', " % md5sum
4960 sql += " size =%f, " % filesize
4961 sql += " transfertime =%f " % transfertime
4962 sql += " WHERE dataset_id =%u " % dataset_id
4963 sql += " AND queue_id =%u " % queue_id
4964 sql += " AND name ='%s' " % filename
4965 self.logger.debug(sql)
4966 cursor.execute(sql)
4967 return 1
4968
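# Sketch: a copy daemon registers each transferred file, and the URL can
# then be looked up through GetStorageURL with the same passkey (URL and
# variable names illustrative):
#
#   db.SetFileURL(queue_id, dataset_id, 'gsiftp://host/path/out.i3.gz',
#                 'out.i3.gz', md5sum, size, seconds, key=passkey)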
4969 - def GetStorageURL(self,dataset_id,queue_id,passkey,storage_type='INPUT'):
4970 """
4971 Get registered storage URLs for a job
4972 """
4973 cursor = self.getcursor()
4974 sql = " SELECT urlpath.* "
4975 sql += " FROM urlpath "
4976 sql += " JOIN job ON urlpath.dataset_id = job.dataset_id AND urlpath.queue_id = job.queue_id "
4977 sql += " WHERE urlpath.dataset_id = %u " % dataset_id
4978 sql += " AND urlpath.queue_id = %u " % queue_id
4979 sql += " AND job.passkey = '%s' " % passkey
4980 sql += " AND urlpath.type = '%s' " % storage_type
4981 self.logger.debug(sql)
4982 cursor.execute(sql)
4983 return cursor.fetchall();
4984
4986 """
4987 Get status of dataset
4988 """
4989 cursor = self.getcursor()
4990 sql = " DELETE FROM urlpath "
4991 sql += " WHERE urlpath.dataset_id = %u " % dataset_id
4992 self.logger.debug(sql)
4993 cursor.execute(sql)
4994 return 1
4995
4996
4998 """
4999 @param days: number of days to get summary from
5000 @param groupby: how to group statistics
5001 """
5002 cursor = self.getcursor()
5003 sql = " SELECT "
5004 sql += " SUM(job.status = 'OK') as ok , "
5005 sql += " SUM(job.status = 'ERROR') as error , "
5006 sql += " SUM(job.status = 'SUSPENDED') as suspended, "
5007 if groupby:
5008 sql += " job.%s, " % groupby
5009 sql += " SUM(job.time_sys) as sys_t, "
5010 sql += " SUM(job.time_user) as usr_t, "
5011 sql += " SUM(job.time_real) as real_t "
5012 sql += " FROM job "
5013 sql += " WHERE DATE_SUB(CURDATE(),INTERVAL %d DAY) " % days
5014 sql += " < job.status_changed "
5015 sql += " AND job.grid_id != 0 "
5016 sql += " AND job.grid_id IS NOT NULL "
5017 if groupby:
5018 sql += " GROUP BY %s " % groupby
5019 cursor.execute(sql)
5020 return cursor.fetchall();
5021
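# Sketch: grouping by grid_id yields one summary row per grid; the keys
# mirror the SELECT aliases above:
#
#   for row in db.getsummary(days=7, groupby='grid_id'):
#       print row['grid_id'], row['ok'], row['error'], row['real_t']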
5023 """
5024 @param days: number of days to get summary from
5025 @param groupby: how to group statistics
5026 """
5027 cursor = self.getcursor()
5028 sql = " SELECT job_statistics.name AS name, "
5029 sql += " SUM(job_statistics.value) AS value, "
5030 if groupby:
5031 sql += " job.%s, " % groupby
5032 sql += " SUM(1) AS completed "
5033 sql += " FROM job,job_statistics "
5034 sql += " WHERE DATE_SUB(CURDATE(),INTERVAL %d DAY) " % days
5035 sql += " < job.status_changed "
5036 sql += " AND job.status = 'OK' "
5037 sql += " AND job.dataset_id = job_statistics.dataset_id "
5038 sql += " AND job.queue_id = job_statistics.queue_id "
5039 sql += " AND job.grid_id IS NOT NULL "
5040 sql += " AND job.grid_id != 0 "
5041 if groupby:
5042 sql += " GROUP BY job_statistics.name,%s " % groupby
5043 else:
5044 sql += " GROUP BY job_statistics.name "
5045 cursor.execute(sql)
5046 return cursor.fetchall();
5047
5048
5050 grid_ids = {}
5051 cursor = self.getcursor()
5052 sql = " SELECT name,grid_id FROM grid "
5053 cursor.execute(sql)
5054 for item in cursor.fetchall():
5055 grid_ids[item['grid_id']] = item['name']
5056 return grid_ids
5057
5059 """
5060 @param dataset: dataset id
5061 @param job: optional job id
5062 @return: dataset/job summary rows
5063 """
5064 cursor = self.getcursor()
5065 if job >= 0:
5066 sql = " SELECT "
5067 sql += " job.dataset_id,job.job_id,status,job.grid_id,job.errormessage, "
5068 sql += " ( "
5069 sql += " (job.iter + SUM(IF(tray.tray_index < job.tray, tray.iterations, 0))) "
5070 sql += " /SUM(tray.iterations) "
5071 sql += " ) * 100 AS completion_percent "
5072 sql += " FROM job join tray on job.dataset_id = tray.dataset_id"
5073 sql += " WHERE job.dataset_id = %u " % dataset
5074 sql += " AND job.queue_id = %u " % job
5075 else:
5076 sql = " SELECT "
5077 sql += " SUM(job.status = 'RESET') as reset , "
5078 sql += " SUM(job.status = 'WAITING') as waiting , "
5079 sql += " SUM(job.status = 'OK') as ok , "
5080 sql += " SUM(job.status = 'ERROR') as error , "
5081 sql += " SUM(job.status = 'SUSPENDED') as suspended, "
5082 sql += " SUM(job.status = 'PROCESSING') as processing, "
5083 sql += " SUM(job.status = 'COPIED') as copied, "
5084 sql += " SUM(job.status = 'READYTOCOPY') as readytocopy, "
5085 sql += " SUM(job.status = 'FAILED') as failed, "
5086 sql += " SUM(job.status = 'QUEUED') as queued, "
5087 sql += " SUM(job.status = 'QUEUING') as queueing "
5088 sql += " FROM job "
5089 sql += " WHERE dataset_id = %u " % dataset
5090 cursor.execute(sql)
5091 return cursor.fetchall()
5092
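# The per-job completion percentage above is computed entirely in SQL:
#
#   (iter + sum(iterations of trays with index < current tray))
#       / sum(iterations of all trays) * 100
#
# i.e. the job's current (tray, iter) position as a fraction of the
# total iterations defined for the dataset.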
5093
5095 """
5096 @param days: number of days to gather information from starting from today
5097 @return: formatted string with production summary
5098 """
5099 div = '+'+('-'*9+'+')
5100 s ='iceprod summary for %s in the last %d days' % (self.db_,days)
5101 s += '-'*10
5102 s += '\n\n'
5103 grid_id = self.getgrid_ids()
5104 row = "| %-7s | " % "grid"
5105 gridsum = self.getsummary(days,'grid_id')
5106 for key in gridsum[0].keys():
5107 row += "%-7s | " % key[0:min(7,len(key))]
5108 div += ('-'*9+'+')
5109 s += row + '\n'
5110 s += div + '\n'
5111 for entry in gridsum:
5112 gridname = grid_id[entry['grid_id']]
5113 row = "| %-7s | " % gridname[0:min(7,len(gridname))]
5114 for key in gridsum[0].keys():
5115 row += "%-7.2g | " % entry[key]
5116 s += row + '\n'
5117 s += div + '\n'
5118 totals = self.getsummary(days)
5119 for entry in totals:
5120 row = "| %-7s | " % "Total"
5121 for key in gridsum[0].keys():
5122 if entry.has_key(key):
5123 row += "%-7.2g | " % entry[key]
5124 else:
5125 row += "%-7.2g | " % 0.
5126 s += row + '\n'
5127 s += div + '\n'
5128
5129 column_size = 10
5130 strfmt = "%%-%ds|" % column_size
5131 gfmt = "%%-%d.2g|" % column_size
5132 dictfmt = "%%%%(%%s)-%ds|" % column_size
5133 s += '\n\n'
5134 gridsum = self.getsummary_stats(days,'grid_id')
5135 newgridsum = {}
5136 grids = ["Total"]
5137 keys = {}
5138 totals = {}
5139 for entry in gridsum:
5140 grid = entry["grid_id"]
5141 name = entry["name"]
5142 gridname = grid_id[grid]
5143 gridname = gridname.strip()[0:min(column_size,len(gridname))]
5144 key = re.sub(r'[^a-z0-9_]','',entry["name"].lower())
5145 if not keys.has_key(key):
5146 keys[key] = entry["name"][0:min(column_size,len(entry["name"]))]
5147 if ("time" in key) or ("event" in key):
5148 if not newgridsum.has_key(key):
5149 newgridsum[key] = {}
5150 if gridname not in grids:
5151 grids.append(gridname)
5152 try:
5153 newgridsum[key][gridname] = "%2.2g" % float(entry["value"])
5154 if not newgridsum[key].has_key("Total"):
5155 newgridsum[key]["Total"] = 0.0
5156 newgridsum[key]["Total"] += float(entry["value"])
5157 except Exception,e:
5158 self.logger.error("%s %s" % (e,key))
5159 newgridsum[key][gridname] = str(entry["value"])
5160 rowhdr = "|" + "".join(map(lambda x: strfmt % x, [" "] + grids)) + "\n"
5161 div = "+" + ('-'*column_size+'+')*(len(grids)+1) + "\n"
5162 s += div + rowhdr
5163 for key in newgridsum.keys():
5164 entry = newgridsum[key]
5165 rowfmt = "|" + strfmt % keys[key] + "".join(map(lambda x: dictfmt % x, grids)) + "\n"
5166 for grid in grids:
5167 if not entry.has_key(grid): entry[grid] = "N/A"
5168 entry["Total"] = "%2.2g" % float(entry["Total"])
5169 row = rowfmt % entry
5170 s += div + row
5171 totals = self.getsummary_stats(days)
5172 s += div
5173 return s
5174
5175
5176 - def add_history(self,user,command):
5177 """
5178 Add a history item
5179 """
5180 cursor = self.getcursor()
5181 sql = " INSERT INTO history (user,cmd,time)"
5182 sql += " VALUES ('%s','%s',NOW())" % (user,command)
5183 cursor.execute(sql)
5184 self.commit()
5185
5186
5187
5189
5190 """
5191 Class paramdb uploads parsed IceTrayConfig+Metaproject structure
5192 to the parameter database
5193
5194 """
5195 logger = logging.getLogger('MySQLParamDb')
5196
5198
5199 IceProdDB.__init__(self)
5200 self.metaprojects = {}
5201 self.projects = {}
5202 self.modules = {}
5203 self.parameters = {}
5204 self.auth_function = lambda x: \
5205 self.authenticate(
5206 self.get('host',x.host_),
5207 self.get('username',x.usr_),
5208 getpass.getpass(),
5209 self.get('database',x.db_),True)
5210 self.auth_function = lambda x: x
5211 return
5212
5214 """
5215 Create a copy of this instance
5216 """
5217 newconn = MySQLParamDb()
5218 newconn.host_ = self.host_
5219 newconn.usr_ = self.usr_
5220 newconn.passwd_ = self.passwd_
5221 newconn.db_ = self.db_
5222 newconn._connected = False
5223 return newconn
5224
5225 - def load(self,metaproject):
5226 """
5227 load contents of metaproject tree to database
5228 @param metaproject: metaproject object
5229 """
5230 self.connect()
5231 self.load_metaproject(metaproject)
5232 self.load_projects(metaproject)
5233 self.load_mp_pivot(metaproject)
5234 self.load_project_dependencies(metaproject)
5235 self.commit()
5236 return 1
5237
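# Sketch of a typical upload (authentication values are placeholders):
#
#   pdb = MySQLParamDb()
#   pdb.authenticate(host, user, passwd, dbname, keep_open=True)
#   pdb.load(metaproject)  # projects, pivots and dependencies in one call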
5238
5240 for metaproject in self.GetMetaProjects():
5241 self.metaprojects[metaproject.GetId()]= metaproject
5242 for project in self.GetProjects(metaproject.GetId()):
5243 self.projects[project.GetId()] = project
5244 metaproject.AddProject(project.GetName(),project)
5245
5246 for dependency in self.GetProjectDependencies(
5247 project.GetId(), metaproject.GetId()):
5248 project.AddDependency(dependency)
5249
5250 for service in self.GetServices(project.GetId()):
5251 self.modules[service.GetId()] = service
5252 project.AddService(service)
5253
5254 for param in self.GetParameters(service.GetId()):
5255 service.AddParameter(param)
5256 self.parameters[param.GetId()] = param
5257
5258 for module in self.GetModules(project.GetId()):
5259 self.modules[module.GetId()] = module
5260 project.AddModule(module)
5261
5262 for param in self.GetParameters(module.GetId()):
5263 module.AddParameter(param)
5264 self.parameters[param.GetId()] = param
5265 return self.metaprojects
5266
5276
5349
5350
5352 mplist = []
5353 for m in self.fetch_metaproject_list():
5354 metaproject = MetaProject()
5355 metaproject.SetName(m['name'])
5356 metaproject.SetVersion(m['versiontxt'])
5357 metaproject.SetId(m['metaproject_id'])
5358 mplist.append(metaproject)
5359 return mplist
5360
5362 return self.GetProjectsMM(module,metaproj,'service')
5363
5365 sql = " SELECT metaproject.* FROM metaproject "
5366 sql += " WHERE metaproject.name = '%s' " % metaproj.GetName()
5367 sql += " AND metaproject.versiontxt= '%s' " % metaproj.GetVersion()
5368 cursor = self.getcursor()
5369 cursor.execute (sql.strip());
5370 mp = cursor.fetchone ();
5371 if mp: metaproj.SetId(mp['metaproject_id'])
5372 return metaproj
5373
5375
5376 sql = " SELECT project.* FROM project,module,mp_pivot,metaproject "
5377 sql += " WHERE module.project_id = project.project_id "
5378 sql += " AND mp_pivot.project_id = project.project_id "
5379 sql += " AND mp_pivot.metaproject_id = metaproject.metaproject_id "
5380 sql += " AND metaproject.name = '%s' " % metaproj.GetName()
5381 sql += " AND metaproject.versiontxt = '%s' " % metaproj.GetVersion()
5382 sql += " AND module.class = '%s' " % module.GetClass()
5383 cursor = self.getcursor()
5384 cursor.execute (sql.strip());
5385 p = cursor.fetchone ();
5386 if not p: return []
5387
5388 project = Project()
5389 project.SetName(p['name'])
5390 project.SetVersion(p['versiontxt'])
5391 project.SetId(p['project_id'])
5392 project_list = self.GetProjectDependencies(project.GetId(),metaproj.GetId())
5393 project_list.insert(0,project)
5394 return project_list
5395
5397 plist = []
5398 for p in self.fetch_project_list(metaproject_id):
5399 project = Container()
5400 project.SetName(p['name'])
5401 project.SetVersion(p['versiontxt'])
5402 project.SetId(p['project_id'])
5403 plist.append(project)
5404 return plist
5405
5407 dlist = []
5408 for d in self.fetch_project_dependencies(project_id,metaproject_id):
5409 dependency = Container()
5410 dependency.SetName(d['name'])
5411 dependency.SetVersion(d['versiontxt'])
5412 dependency.SetId(d['project_id'])
5413 dlist.append(dependency)
5414 return dlist
5415
5417 slist = []
5418 for s in self.fetch_modules_from_project_id(project_id,'service'):
5419 service = Service()
5420 service.SetName(s['name'])
5421 service.SetClass(s['class'])
5422 service.SetId(s['module_id'])
5423 slist.append(service)
5424 return slist
5425
5427 slist = []
5428 for s in self.fetch_services_for_project(name,version):
5429 service = Service()
5430 service.SetName(s['name'])
5431 service.SetClass(s['class'])
5432 service.SetId(s['module_id'])
5433 slist.append(service)
5434 return slist
5435
5436
5438 mlist = []
5439 for m in self.fetch_modules_from_project_id(project_id):
5440 module = Module()
5441 module.SetName(m['name'])
5442 module.SetClass(m['class'])
5443 module.SetId(m['module_id'])
5444 mlist.append(module)
5445 return mlist
5446
5448 mlist = []
5449 for m in self.fetch_modules_for_project(name,version):
5450 module = Module()
5451 module.SetName(m['name'])
5452 module.SetClass(m['class'])
5453 module.SetId(m['module_id'])
5454 mlist.append(module)
5455 return mlist
5456
5458 mlist = []
5459 for m in self.fetch_iceprodmodules():
5460 module = IceProdPre()
5461 module.SetName(m['name'])
5462 module.SetClass(m['class'])
5463 module.SetId(m['module_id'])
5464 mlist.append(module)
5465 return mlist
5466
5467
5469 plist = []
5470 for param in self.fetch_service_parameters(module_id):
5471 parameter = Parameter()
5472 parameter.SetName(param['name'])
5473 parameter.SetType(param['type'])
5474 pid = param['parameter_id']
5475 parameter.SetId(pid)
5476
5477 if parameter.GetType() == 'OMKeyv' :
5478 parameter.SetValue(self.select_omkey_array(pid))
5479 elif parameter.GetType() == 'OMKey' :
5480 parameter.SetValue(self.select_omkey(pid))
5481 elif parameter.GetType() in VectorTypes:
5482 parameter.SetValue(self.select_array(pid))
5483 else:
5484 parameter.SetValue(Value(param['value'],param['unit']))
5485 plist.append(parameter)
5486 return plist
5487
5488
5490 """
5491 retrieve IDs for metaprojects in the database
5492 """
5493
5494 sql = "SELECT * FROM metaproject"
5495 cursor = self.getcursor()
5496 sql = re.sub('\s+',' ',sql);
5497 cursor.execute (sql);
5498 result_set = cursor.fetchall ();
5499
5500 return map(self.nonify,result_set);
5501
5502
5504 """
5505 retrieve IDs for projects in the database
5506 @param metaproject_id: table_id of metaproject
5507 """
5508
5509 sql = """
5510 SELECT
5511 project.project_id,project.name,project.versiontxt
5512 FROM project,mp_pivot """
5513 if metaproject_id:
5514 sql += """ WHERE
5515 project.project_id = mp_pivot.project_id
5516 AND mp_pivot.metaproject_id = %s""" % metaproject_id
5517
5518 cursor = self.getcursor()
5519 sql = re.sub('\s+',' ',sql);
5520 cursor.execute (sql);
5521 result_set = cursor.fetchall ();
5522 return map(self.nonify,result_set);
5523
5525 """
5526 retrieve project with given id
5527 @param id: primary key of project
5528 @param table: table to query (project or metaprojects)
5529 """
5530 sql="""SELECT * FROM %s WHERE %s_id ='%d'
5531 """ % (table,table,id)
5532
5533 sql = re.sub('\s+',' ',sql)
5534 cursor = self.getcursor()
5535 cursor.execute (sql);
5536
5537 return cursor.fetchall ()
5538
5539
5541 """
5542 retrieve id for project with matching name and version
5543 (there should only be one)
5544 @param pname: name of project to query
5545 @param pversion: version string of the project
5546 @param table: table to query (project or metaprojects)
5547 """
5548 sql="""SELECT %s_id FROM %s
5549 WHERE name ='%s'
5550 AND versiontxt ='%s'
5551 """ % (table,table,pname,pversion)
5552
5553 sql = re.sub('\s+',' ',sql)
5554 cursor = self.getcursor()
5555 cursor.execute (sql);
5556 result = cursor.fetchall ()
5557 if result:
5558 return int(result[0]['%s_id' % table ])
5559 else:
5560 self.logger.warn("project \'%s\' not found" % pname)
5561 return
5562
5563
5564
5565
5567 return self.fetch_module_id(service,pid,'service')
5568
5570 """
5571 retrieve id for module with matching name and project id
5572 (there should only be one)
5573 @param module: module to query
5574 """
5575 if not pid: return None
5576 cname = module.GetClass()
5577
5578 sql =" SELECT module_id FROM module "
5579 sql +=" WHERE class ='%s' " % cname
5580 sql +=" AND project_id ='%d' " % pid
5581
5582 sql = re.sub('\s+',' ',sql)
5583 self.logger.debug(sql)
5584 cursor = self.getcursor()
5585 cursor.execute (sql);
5586 result = cursor.fetchone()
5587 self.logger.debug(result)
5588
5589 if result:
5590 return int(result['module_id'])
5591 else:
5592 self.logger.error("module \'%s\' not found" % cname)
5593 return
5594
5596 """
5597 retrieve dependencies for a project
5598 @param project_id: id of project
5599 @return: list of dependency project rows
5600 """
5601 dependencies = []
5602
5603 sql = " SELECT project.* "
5604 sql += " FROM project, project_depend "
5605 sql += " WHERE "
5606 sql += " project.project_id = project_depend.dependency_id "
5607 sql += " AND "
5608 sql += " project_depend.project_id = %d " % project_id
5609 sql += " AND "
5610 sql += " project_depend.metaproject_id = %d " % metaproject_id
5611
5612 cursor = self.getcursor()
5613 cursor.execute (sql.strip());
5614 result_set = cursor.fetchall ();
5615
5616 return result_set
5617
5618
5620 """
5621 retrieve modules for a given project id
5622 @param project_id: id of project
5623 """
5624 sql="""
5625 SELECT * FROM module WHERE project_id = %s
5626 AND module_type='%s' ORDER BY class
5627 """ % (project_id,table)
5628
5629 cursor = self.getcursor()
5630 sql = re.sub('\s+',' ',sql);
5631 cursor.execute (sql);
5632 result_set = cursor.fetchall ();
5633
5634 return map(self.nonify,result_set);
5635
5636
5638 """
5639 retrieve modules for a project given by
5640 its name and version
5641 @param name: name of project
5642 @param version: version tuple
5643 """
5644 pid = self.fetch_project_id(name,version)
5645 if pid:
5646 return self.fetch_modules_from_project_id(pid)
5647 else:
5648 return []
5649
5651 """
5652 retrieve services for a project given by
5653 its name and version
5654 @param name: name of project
5655 @param version: version tuple
5656 """
5657 pid = self.fetch_project_id(name,version)
5658 if pid:
5659 return self.fetch_modules_from_project_id(pid,'service')
5660 else:
5661 return []
5662
5664 """
5665 retrieve iceprod modules for the running
5666 iceprod version
5667 """
5668 pid = self.fetch_project_id('iceprod',iceprod.__version__)
5669 if pid:
5670 return self.fetch_modules_from_project_id(pid,'iceprod')
5671 else:
5672 return []
5673
5675 """
5676 retrieve parameters for a service/module given by
5677 the service/module id
5678 @param module_id: primary key in modules table on db
5679 """
5680 sql="SELECT * FROM parameter WHERE module_id = %d" % module_id
5681
5682 cursor = self.getcursor()
5683 sql = re.sub('\s+',' ',sql);
5684 cursor.execute (sql);
5685 result_set = cursor.fetchall ();
5686 return map(self.nonify,result_set);
5687
5689 """
5690 retrieve parameters for a service/module given by
5691 the service/module id
5692 @param service_id: primary key in services table on db
5693 """
5694 return self.fetch_module_parameters(service_id)
5695
5697 """
5698 Load cross-references between meta-projects and projects.
5699 The purpose of this is to provide a way for projects to
5700 reference which meta-projects they are members of and for
5701 meta-projects to list which projects belong to them (many-to-many).
5702 @param metaproject: Metaproject object
5703 """
5704
5705 sql = """
5706 INSERT IGNORE INTO mp_pivot
5707 (metaproject_id,project_id) VALUES
5708 """
5709 cm = ''
5710 mid = metaproject.GetId()
5711 for p in metaproject.GetProjectList():
5712 sql += "%s\n(%s,%s)" % (cm,mid,p.GetId())
5713 cm = ','
5714
5715 sql = re.sub('\s+',' ',sql)
5716 self.logger.debug(sql)
5717 cursor = self.getcursor()
5718 cursor.execute (sql)
5719
5720 self.logger.debug(self.insert_id())
5721 self.logger.debug("%d mp_pivot rows were inserted" % cursor.rowcount)
5722 cursor.execute("SHOW WARNINGS")
5723 warn = cursor.fetchone()
5724 if warn: self.logger.warn(warn)
5725 return
5726
5728 """
5729 Load cross-references between projects and their dependency projects
5730 @param project: Project object
5731 """
5732
5733 sql = " INSERT IGNORE INTO project_depend "
5734 sql += " (project_id,metaproject_id,dependency_id) "
5735 sql += " VALUES "
5736 pid = project.GetId()
5737 mpid = metaproject.GetId()
5738 cursor = self.getcursor()
5739
5740 for p in project.GetDependencies():
5741 if not p: continue
5742 self.logger.info("%s - getting project dependency: %s" % \
5743 (project.GetName(),p.GetName()))
5744 if not metaproject.GetProject(p.GetName()):
5745 self.logger.fatal('failed dependency:%s needs %s' % \
5746 (project.GetName(),p.GetName()) )
5747 sys.exit(1)
5748 did = metaproject.GetProject(p.GetName()).GetId()
5749 sql2 = sql + "(%s,%s,%s)" % (pid,mpid,did)
5750 try:
5751 cursor.execute (sql2)
5752 self.logger.info(
5753 "%d project_depend rows were inserted" % cursor.rowcount)
5754 except Exception, e:
5755 self.logger.error(e)
5756 return
5757
5758
5759
5761 """
5762 Load projects to database
5763 @param metaproject: MetaProject object
5764 """
5765 for proj in metaproject.GetProjectList():
5766 self.load_project(proj)
5767 self.load_services(proj)
5768 self.load_modules(proj)
5769 return
5770
5772 """
5773 Load project dependencies to database
5774 @param metaproject: MetaProject object
5775 """
5776 for proj in metaproject.GetProjectList():
5777 self.load_dependencies(proj,metaproject)
5778 return
5779
5781 return self.load_project(metaproject,"metaproject")
5782
5783
5785 """
5786 Load project to database
5787 @param project: the project to be loaded
5788 @param table: table to which project should be loaded
5789 """
5790 pid = self.fetch_project_id(project.GetName(),project.GetVersion(),table)
5791 self.logger.debug("%s %s.%s pid is %s" % (table,project.GetName(),project.GetVersion(),pid))
5792 if not pid:
5793 sql = "INSERT INTO %s " % table
5794 sql += "(name, versiontxt,major_version,minor_version,patch_version) "
5795 sql += " VALUES "
5796
5797 ver = project.GetVersion()
5798 name = project.GetName()
5799 vt = ('00','00','00')
5800
5801
5802 legacy_ver = self.version_regex.search(ver)
5803 if legacy_ver:
5804 legacy_ver = legacy_ver.group(0).replace('V','')
5805 vt = legacy_ver.split('-')
5806 sql += "('%s','%s','%s','%s','%s')" % (name,ver,vt[0],vt[1],vt[2])
5807 self.logger.debug(sql)
5808
5809 cursor = self.getcursor()
5810 cursor.execute(sql)
5811 pid = self.insert_id()
5812 if pid:
5813 self.logger.debug("inserted id %d" % pid)
5814 else:
5815 self.logger.error(
5816 "could not load project %s to parameter database" % project.GetName())
5817
5818 project.SetId(pid)
5819 self.logger.debug("%s.GetId(): %s" % (project.GetName(),project.GetId()))
5820 return pid
5821
5822
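    # Hedged sketch (not part of the original module): what version_regex from
    # the constructor extracts for load_project above. 'V02-01-03' is just an
    # illustrative tag.
    def _example_parse_legacy_version(self,ver='V02-01-03'):
        m = self.version_regex.search(ver)
        if not m:
            return ('00','00','00')                      # default used above
        return tuple(m.group(0).replace('V','').split('-'))  # ('02','01','03')
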
5824 """
5825 Load modules into the database.
5826 @param project: Project object
5827 """
5828 for module in project.GetModules():
5829 self.load_module(module,project)
5830 self.load_params(module)
5831 return
5832
5834 """
5835 Load services into the database.
5836 @param project: Project object
5837 """
5838 for service in project.GetServices():
5839 self.load_service(service,project)
5840 self.load_params(service)
5841 return
5842
5843
5845 """
5846 Load individual module to database
5847 @param module: object to load
5848 """
5849 pid = project.GetId()
5850 mid = self.fetch_module_id(module,pid,type)
5851 if not mid:
5852 sql = " INSERT INTO module "
5853 sql += " (name,class,project_id,module_type) "
5854 sql += " VALUES "
5855 sql += " (\'%s\',\'%s\',%d,'%s') " % (module.GetName(),module.GetClass(),pid,type)
5856
5857 self.logger.debug(sql.strip())
5858
5859 cursor = self.getcursor()
5860 cursor.execute(sql.strip())
5861 mid = self.insert_id()
5862
5863 cursor.execute("SHOW WARNINGS")
5864 warn = cursor.fetchone()
5865 if warn: self.logger.warn(warn)
5866
5867 cursor.execute("SHOW ERRORS")
5868 err = cursor.fetchone()
5869 if err: self.logger.error(err)
5870
5871 rowcount = cursor.rowcount
5872 if mid:
5873 self.logger.debug("inserted module id %d" % mid)
5874 else:
5875 self.logger.error("could not fetch id for '%s'" % module.GetClass())
5876
5877 module.SetId(mid)
5878 return mid
5879
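    # Hedged helper sketch (not part of the original module): the execute /
    # SHOW WARNINGS / SHOW ERRORS sequence above recurs in several loaders;
    # a wrapper like this would factor it out.
    def _example_execute_logged(self,sql):
        cursor = self.getcursor()
        cursor.execute(sql.strip())
        cursor.execute("SHOW WARNINGS")
        warn = cursor.fetchone()
        if warn: self.logger.warn(warn)
        cursor.execute("SHOW ERRORS")
        err = cursor.fetchone()
        if err: self.logger.error(err)
        return self.insert_id()
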
5881 """
5882 Load individual service to database
5883 @param service: object to load
5884 """
5885 return self.load_module(service,project,type='service')
5886
5888 """
5889 Load individual service to database
5890 @param module: object to load
5891 """
5892 return self.load_module(service,project,type='iceprod')
5893
5894
5896 """
5897 Load parameters for module or service to database
5898 @param module: object whose parameter will be loaded
5899 """
5900 cursor = self.getcursor()
5901 sql = " INSERT IGNORE INTO parameter "
5902 sql += " (name,type,unit,description,module_id,value) "
5903 sql += " VALUES "
5904 sql2 = ''
5905 count = 0
5906
5907 m_id = module.GetId()
5908 self.logger.debug('module_id %d'% m_id)
5909
5910 cm = ''
5911 if not module.GetParameters(): return
5912 for p in module.GetParameters():
5913 name = p.GetName()
5914 type = p.GetType()
5915 desc = p.GetDescription()
5916 desc = re.sub(r'\"','"',desc)
5917 desc = re.sub(r'\'','"',desc)
5918
5919 if type == 'OMKey' or type in VectorTypes:
5920 value = 0
5921 sql1 = sql + " ('%s','%s','%s','%s',%d,'%s') " % \
5922 (name,type,'NULL',desc,m_id,value)
5923 cursor.execute (sql1.strip())
5924 pid = self.insert_id()
5925 cursor.execute ('show warnings')
5926 retval = cursor.fetchall ()
5927 if not pid:
5928 sql1a = " SELECT parameter_id FROM parameter "
5929 sql1a += " WHERE name='%s' " % name
5930 sql1a += " AND module_id=%d " % m_id
5931 self.logger.debug(sql1a.strip())
5932 cursor.execute (sql1a.strip())
5933 pid = cursor.fetchone ()
5934 if pid: pid = pid['parameter_id']
5935 else: raise Exception,"Failed to insert/fetch parameter id"
5936
5937 count = count + cursor.rowcount
5938 if type == 'OMKey':
5939 self.insert_omkey(p.GetValue(),pid)
5940 elif type == 'OMKeyv':
5941 self.insert_omkey_array(p.GetValue(),pid)
5942 else:
5943 self.insert_array(p.GetValue(),pid)
5944
5945 else:
5946 value = p.GetValue().value
5947 unit = self.nullify(p.GetValue().unit)
5948 sql2 += "%s (\'%s\',\'%s\',%s,\"%s\",%d,\'%s\') " % \
5949 (cm,name,type,unit,desc,m_id,value)
5950 cm = ','
5951
5952 if sql2:
5953 sql += sql2
5954 self.logger.debug(sql.strip())
5955 cursor.execute(sql.strip())
5956 cursor.execute ('show warnings')
5957 retval = cursor.fetchall ()
5958 count = count + cursor.rowcount
5959 self.logger.debug("%d parameter rows were inserted " % count)
5960
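    # Hedged alternative sketch (not part of the original module): MySQLdb can
    # bind values itself via cursor.execute(sql,args), which would avoid the
    # manual quote-escaping above. Shown for illustration only.
    def _example_insert_parameter(self,name,type,unit,desc,module_id,value):
        cursor = self.getcursor()
        sql = ("INSERT IGNORE INTO parameter "
               "(name,type,unit,description,module_id,value) "
               "VALUES (%s,%s,%s,%s,%s,%s)")
        cursor.execute(sql,(name,type,unit,desc,module_id,value))
        return self.insert_id()
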
    def insert_omkey(self,omkey,pid):
        cursor = self.getcursor()
        sql = " INSERT INTO array_element (name,value,parameter_id) "
        sql += " VALUES "
        sql += " ('%s','%s',%s), " % ('stringid',omkey.stringid,pid)
        sql += " ('%s','%s',%s) " % ('omid',omkey.omid,pid)
        self.logger.debug(sql.strip())
        cursor.execute(sql.strip())

    def insert_omkey_array(self,omkeyvect,pid):
        cursor = self.getcursor()
        sql = " INSERT INTO array_element (name,value,parameter_id) "
        sql += " VALUES "
        cm = ""
        if len(omkeyvect) < 1: return
        for omkey in omkeyvect:
            sql += cm
            sql += " ('%s','%s',%s), " % ('stringid',omkey.stringid,pid)
            sql += " ('%s','%s',%s) " % ('omid',omkey.omid,pid)
            cm = ","
        cursor.execute(sql.strip())

    def insert_array(self,values,pid):
        cursor = self.getcursor()
        sql = " INSERT INTO array_element (value,unit,parameter_id) "
        # nullify() already quotes the unit (or yields NULL), so the unit
        # placeholder is left unquoted, as in load_params above
        sformat = lambda x: "('%s',%s,%s)" % (x.value,self.nullify(x.unit),pid)
        vals = ",".join(map(sformat,values))
        if len(vals) > 0:
            sql += " VALUES " + vals
            cursor.execute(sql.strip())

    def select_array(self,pid):
        cursor = self.getcursor()
        sql = " SELECT * FROM array_element "
        sql += " WHERE parameter_id = %d " % pid
        cursor.execute(sql.strip())
        result_set = cursor.fetchall()
        vect = []
        for item in result_set:
            vect.append(Value(item['value'],self.nonify(item['unit'])))
        return vect

    def select_omkey(self,pid):
        omkeys = self.select_omkey_array(pid)
        if len(omkeys) < 1:
            raise Exception,'could not find omkey for param %d' % pid
        return omkeys[0]

    def select_omkey_array(self,pid):
        cursor = self.getcursor()
        sql = " SELECT * FROM array_element "
        sql += " WHERE parameter_id = %d ORDER BY array_element_id " % pid
        cursor.execute(sql.strip())
        result_set = cursor.fetchall()
        omkeyvect = []
        for item in result_set:
            # rows come in (stringid,omid) pairs in insertion order
            if item['name'] == 'stringid':
                omkey = pyOMKey(0,0)
                omkey.stringid = item['value']
            elif item['name'] == 'omid':
                omkey.omid = item['value']
                omkeyvect.append(omkey)
            else:
                raise Exception,'expected omkey but found %s' % item['name']
        return omkeyvect

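    # Hedged round-trip sketch (not part of the original module): writing and
    # reading an OMKey vector through array_element with the methods above.
    # The (string,om) constructor argument order for pyOMKey is an assumption.
    def _example_omkey_roundtrip(self,pid):
        self.insert_omkey_array([pyOMKey(21,7),pyOMKey(21,8)],pid)
        return self.select_omkey_array(pid)   # -> list of pyOMKey
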

    # The enclosing class and method definition were lost in extraction; the
    # wrapper below is an assumption. These handlers need 'self' (for
    # self.paramdb) and 'parent' (the XML-RPC server they register with).
    def register_rpc_functions(self,parent):

        def LoadParams(metaproject,username,passwd):
            db = self.paramdb.new()
            db.disconnect()
            if parent.auth_db(db,username,passwd,keep_open=True):
                retval = db.load(loads(metaproject))
                db.disconnect()
                logger.info("Uploaded metaproject for %s %s %s" % (username,db.db_,db.host_))
                return retval
            else:
                logger.error("Failed to authenticate %s on %s@%s with password" % (username,db.db_,db.host_))
                return 0
        parent.server.register_function(LoadParams)

        def DownloadParams():
            return dumps(self.paramdb.download())
        parent.server.register_function(DownloadParams)

        def SwitchMetaProject(iconfig,id,name,version):
            return dumps(self.paramdb.SwitchMetaProject(loads(iconfig),id,name,loads(version)))
        parent.server.register_function(SwitchMetaProject)

        def GetMetaProjects():
            return dumps(self.paramdb.GetMetaProjects())
        parent.server.register_function(GetMetaProjects)

        def GetProjectsSM(module,metaproj):
            return dumps(self.paramdb.GetProjectsSM(loads(module),loads(metaproj)))
        parent.server.register_function(GetProjectsSM)

        def GetProjectsMM(module,metaproj):
            return dumps(self.paramdb.GetProjectsMM(loads(module),loads(metaproj)))
        parent.server.register_function(GetProjectsMM)

        def GetProjects(metaproject_id):
            return dumps(self.paramdb.GetProjects(metaproject_id))
        parent.server.register_function(GetProjects)

        def GetProjectDependencies(project_id,metaproject_id):
            return dumps(self.paramdb.GetProjectDependencies(project_id,metaproject_id))
        parent.server.register_function(GetProjectDependencies)

        def GetServices(project_id):
            return dumps(self.paramdb.GetServices(project_id))
        parent.server.register_function(GetServices)

        def GetServicesP(name,version):
            return dumps(self.paramdb.GetServicesP(name,loads(version)))
        parent.server.register_function(GetServicesP)

        def GetModules(project_id):
            return dumps(self.paramdb.GetModules(project_id))
        parent.server.register_function(GetModules)

        def GetModulesP(name,version):
            return dumps(self.paramdb.GetModulesP(name,loads(version)))
        parent.server.register_function(GetModulesP)

        def GetIceProdModules():
            return dumps(self.paramdb.GetIceProdModules())
        parent.server.register_function(GetIceProdModules)

        def GetParameters(module_id):
            return dumps(self.paramdb.GetParameters(module_id))
        parent.server.register_function(GetParameters)

        def fetch_metaproject_id(name,version):
            return self.paramdb.fetch_metaproject_id(name,loads(version))
        parent.server.register_function(fetch_metaproject_id)

        def fetch_metaproject_list():
            return dumps(self.paramdb.fetch_metaproject_list())
        parent.server.register_function(fetch_metaproject_list)

        def fetch_project_list(metaproject_id):
            return dumps(self.paramdb.fetch_project_list(metaproject_id))
        parent.server.register_function(fetch_project_list)

        def fetch_project(id):
            return dumps(self.paramdb.fetch_project(id))
        parent.server.register_function(fetch_project)

        def fetch_project_id(pname,pversion):
            return dumps(self.paramdb.fetch_project_id(pname,loads(pversion)))
        parent.server.register_function(fetch_project_id)

        def fetch_service_id(service,pid):
            return self.paramdb.fetch_service_id(loads(service),pid)
        parent.server.register_function(fetch_service_id)

        def fetch_module_id(module,mid):
            return self.paramdb.fetch_module_id(loads(module),mid)
        parent.server.register_function(fetch_module_id)

        def fetch_project_dependencies(project_id,metaproject_id):
            return dumps(self.paramdb.fetch_project_dependencies(project_id,metaproject_id))
        parent.server.register_function(fetch_project_dependencies)

        def fetch_modules_from_project_id(project_id):
            return dumps(self.paramdb.fetch_modules_from_project_id(project_id))
        parent.server.register_function(fetch_modules_from_project_id)

        def fetch_modules_for_project(name,version):
            return dumps(self.paramdb.fetch_modules_for_project(name,loads(version)))
        parent.server.register_function(fetch_modules_for_project)

        def fetch_services_for_project(name,version):
            return dumps(self.paramdb.fetch_services_for_project(name,loads(version)))
        parent.server.register_function(fetch_services_for_project)

        def fetch_module_parameters(module_id):
            return dumps(self.paramdb.fetch_module_parameters(module_id))
        parent.server.register_function(fetch_module_parameters)

        def fetch_service_parameters(module_id):
            return dumps(self.paramdb.fetch_service_parameters(module_id))
        parent.server.register_function(fetch_service_parameters)

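# Hedged client-side sketch (not part of the original module): the handlers
# above exchange cPickle payloads over XML-RPC, so a caller unpickles the
# results it receives. The server URL is a placeholder.
def _example_list_metaprojects(url='http://localhost:9080'):
    import xmlrpclib
    server = xmlrpclib.ServerProxy(url)
    return loads(server.GetMetaProjects())   # server side returns dumps(...)
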
if __name__ == '__main__':

    from xmlparser import IceTrayXMLParser
    from xmlwriter import IceTrayXMLWriter

    if len(sys.argv) < 3:
        print 'Usage: python config.py <runid> <xml out>'
        sys.exit()

    steering = Steering()

    i3db = ConfigDB()
    passwd = getpass.getpass()
    i3db.authenticate('dbs2.icecube.wisc.edu','i3iceprod-uw',passwd,'i3iceprod',True)
    runid = int(sys.argv[1])
    i3config = i3db.download_config(runid,include_defaults=True,include_description=False)
    i3db.disconnect()
    writer = IceTrayXMLWriter(i3config)
    writer.write_to_file(sys.argv[2])