
Source Code for Module iceprod.server.db

#!/bin/env python
#
"""
 Provides an interface with the MySQL job monitoring database

 copyright  (c) 2008 the icecube collaboration

 @version: $Revision: $
 @date: $Date: $
 @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu>
"""
import re,sys
import getpass
import string
import random
import iceprod
import logging
import types
import copy
import time
from cPickle import loads,dumps

from iceprod.core import logger
from iceprod.core.dataclasses import *
from iceprod.core.paramdb import *
from iceprod.core.metadata import *
from iceprod.core.constants import *

import MySQLdb
from MySQLdb import OperationalError
  34   
class IceProdDB:

    logger = logging.getLogger('IceProdDB')

    def __init__(self):
        """
        Constructor
        """
        self._conn = None
        self._connected = False
        self._auto = False
        self.host_ = None
        self.usr_ = None
        self.passwd_ = None
        self.db_ = None
        self.port_ = 3306
        self.auth_function = lambda x: None
        self.version_regex = re.compile(r'[A-Z][0-9][0-9]-[0-9][0-9]-[0-9][0-9]')
        return

    def nullify(self,value):
        if value:
            return "\'%s\'" % value
        return 'NULL'

    def nonify(self,value):
        if value == 'NULL':
            return None
        return value

    def intcast(self,value):
        if value == 'NULL' or not value:
            return 0
        return int(value)

    def defang(self,txt):
        return txt.replace("\'","\\\'").replace("\"","\\\"")

    def get(self,name,value):
        sys.stdout.write("%s [%s] : " % (name,value))
        answer = sys.stdin.readline().strip()
        if answer:
            return answer
        else:
            return value

    def new(self):
        """
        Create a copy of this instance
        """
        newconn = IceProdDB()
        newconn.host_ = self.host_
        newconn.usr_ = self.usr_
        newconn.passwd_ = self.passwd_
        newconn.db_ = self.db_
        newconn._connected = False
        return newconn

    def ping(self):
        """
        Ping server to reactivate connection
        """
        if not self.isconnected():
            time.sleep(50)
            raise OperationalError, "Not connected to database"
        try:
            self._conn.ping()
        except OperationalError,e:
            self.logger.error('%s: will attempt to reconnect.' % str(e))
            self._conn = MySQLdb.connect(self.host_,self.usr_,self.passwd_,self.db_,self.port_)
            self._connected = True

    def getcursor(self):
        if self.isconnected():
            self.ping()
            return self._conn.cursor(MySQLdb.cursors.DictCursor)
        else:
            self.logger.warn('Not connected to database. Attempting to reconnect..')
            self.logger.info('delaying for 10 sec.')
            time.sleep(10)
            raise OperationalError, "Not connected to database"

    def commit(self):
        self.logger.debug("auto-commit set to %s" % self._auto)
        if not self._auto:
            return self._conn.commit()

    def rollback(self):
        self.logger.debug("rolling back transaction")
        return self._conn.rollback()

    def execute(self,cursor,sql,commit=True):
        # retry a few times on transient OperationalErrors
        for i in range(10):
            try:
                cursor.execute(sql)
                rowcount = self._conn.affected_rows()
                if commit: self.commit()
                return rowcount
            except OperationalError,e:
                self.logger.error(e)
        return 0

    def insert_id(self):
        return self._conn.insert_id()

    def SetAuthFunc(self,func):
        self.auth_function = func

    def authenticate(self,host,usr,passwd,db,keep_open=False,port=3306):
        """
        Database authentication
        @param host: ip or name of MySQL host
        @param usr: username
        @param passwd: account password
        @param db: name of database
        @param keep_open: don't close connection after authenticating
        """
        self.host_ = host
        self.usr_ = usr
        self.passwd_ = passwd
        self.db_ = db
        self.port_ = port
        try:
            self.connect()
            if not keep_open:
                self.disconnect()
            return True
        except Exception,e:
            sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
            self.logger.error('%s: %s' % (sys.exc_type,e))
            self.logger.error('Authentication failed: %s@%s' % (usr,host))
            return False

    def authenticate2(self,host,usr,passwd,db,port=3306):
        """
        Simple database authentication test
        """
        try:
            self._conn = MySQLdb.connect(host,usr,passwd,db,port=port)
            self._conn.close()
            return True
        except Exception,e:
            return False

    def set_auto(self,autocommit=True):
        """ Set auto-commit """
        self._auto = autocommit
        try:
            self._conn.autocommit(self._auto)
        except Exception,e:
            self.logger.warn(e)

    def connect(self):
        """ Connect to database """
        if self.isconnected():
            try:
                self.ping()
                return
            except Exception,e:
                self.logger.error(str(e))
                self._connected = False
        if not self.host_ or not self.usr_ or not self.passwd_ or not self.db_:
            self.auth_function(self)
            return
        try:
            self._conn = MySQLdb.connect(self.host_,self.usr_,self.passwd_,self.db_,self.port_)
            self._connected = True
        except Exception,e:
            self.logger.debug('%s: %s' % (sys.exc_type,e))
            self.logger.error('Connection failed : %s@%s' % (self.usr_,self.host_))
            self.auth_function(self)
        return

    def disconnect(self):
        """ Disconnect from database """
        if self._connected:
            self._conn.close()
            self._connected = False

    def isconnected(self):
        return self._conn and self._connected

    def mkkey(self,minsize,maxsize):
        """
        Generate random alphanumeric sequence
        """
        key = ''
        seq = ['a','b','c','d','e','f','g','h','i','j',
               'k','l','m','n','o','p','q','r','s','t',
               'u','v','w','x','y','z']
        seq += map(string.upper,seq)
        seq += range(0,10)   # digits 0-9 (range(0,9) would drop the '9')
        r = random.Random()
        size = r.randint(minsize,maxsize)
        for i in range(0,size):
            key += str(r.choice(seq))
        return key

    def fetch_metaproject_id(self, project_name, project_version):
        """
        Retrieve the metaproject_id for a given metaproject
        @param project_name: name of metaproject
        @param project_version: version string
        """
        sql = """
              SELECT metaproject_id
              FROM metaproject
              WHERE name = '%s' AND versiontxt = '%s'
              """ % ( project_name, project_version )

        cursor = self.getcursor()
        cursor.execute(sql)

        result_set = cursor.fetchall()
        if result_set:
            return result_set[0]['metaproject_id']
        else:
            self.logger.error('could not find an entry for %s %s' % (project_name,str(project_version)))
            return -1
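
# A minimal usage sketch for IceProdDB (not part of the original module; the
# host, credentials and database name below are placeholders):
#
#   db = IceProdDB()
#   if db.authenticate('dbhost.example.edu','iceprod','secret','iceprod_db',keep_open=True):
#       cursor = db.getcursor()           # DictCursor: rows come back as dicts
#       db.execute(cursor,"SELECT 1")     # returns affected row count, 0 on failure
#       db.disconnect()
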
class ConfigDB(IceProdDB):

    logger = logging.getLogger('ConfigDB')

    def __init__(self):
        """
        Constructor
        """
        IceProdDB.__init__(self)
        self.submitter = ''
        self.temporary_storage = None
        self.global_storage = None
        self.metaproject_dict = { }
        self.institution = ''
        return

    def new(self):
        """
        Create a copy of this instance
        """
        newconn = ConfigDB()
        newconn.host_ = self.host_
        newconn.usr_ = self.usr_
        newconn.passwd_ = self.passwd_
        newconn.db_ = self.db_
        newconn.port_ = self.port_
        newconn._connected = False
        return newconn

    def SetSubmitter(self,submitter):
        self.submitter = submitter

    def SetInstitution(self,institution):
        self.institution = institution

    def SetTempStoragePath(self,path):
        """
        Set the temporary storage path for the dataset.
        @param path: temporary storage path
        """
        self.temporary_storage = path

    def SetGlobalStoragePath(self,path):
        """
        Set the global path in the datawarehouse for the dataset.
        @param path: global storage path
        """
        self.global_storage = path

    def updateConfig(self,dataset_id,param_dict):
        """
        Update icetray configuration in the database
        @param dataset_id: primary key in production database
        @param param_dict: dictionary of parameters to update
        """
        try:
            sql = """
                  SELECT * from dataset
                  WHERE dataset_id = %d """ % dataset_id

            cursor = self.getcursor()
            cursor.execute(sql)
            result_set = cursor.fetchall()

            r = result_set[0]
            if r["jobs_completed"] == r['jobs_submitted']:
                param_dict["status"] = 'COMPLETE'   # completed
                self.logger.info("status %s" % param_dict['status'])
            elif r["jobs_failed"] > 0:
                param_dict["status"] = 'ERRORS'     # some jobs failed
                self.logger.info("status %s" % param_dict['status'])

        except Exception, e:
            self.logger.error(" %s could not fetch dataset %d" % (e,dataset_id))

        try:
            sql = """UPDATE dataset SET """
            for key in param_dict.keys():
                sql += " %s=%s, " % (key,param_dict[key])
            sql += " enddate=NOW() "
            sql += " WHERE dataset_id = %d" % dataset_id

            self.logger.debug(sql)
            cursor = self.getcursor()
            cursor.execute(sql)
            self.commit()

        except Exception, e:
            self.logger.error(str(e) + " rolling back transaction")
            self._conn.rollback()
            raise Exception, e

    def upload_config(self,steering,ticket=0,template=0,maxjobs=0):
        """
        Upload icetray configuration to database
        @param steering: IceTrayConfig object containing configuration
        @param ticket: optional ticket ID to relate dataset to
        @param template: whether this is a template or not
        @return: primary key for run on config db
        """

        # Do this as a transaction in case one of the table inserts fails
        #self._conn.begin()
        dataset_id = None

        try:
            #maxjobs = steering.GetParameter('MAXJOBS').GetValue()
            debug = steering.GetParameter('DEBUG')
            geo = steering.GetParameter('geometry')
            if debug:
                debug = int(debug.GetValue())
            else:
                debug = 0

            simcat_id = self.loadsimcat(steering)
            parent_id = steering.GetParentId()
            status = 'PROCESSING'
            if template:
                status = 'TEMPLATE'

            sql = """INSERT IGNORE INTO dataset
                     (
                       simcat_id,
                       startdate,
                       username,
                       institution,
                       description,
                       status,
                       temporary_storage,
                       global_storage,
                       jobs_submitted,
                       ticket_number,
                       parent_id,
                       debug,
                       dataset_category
                     )
                     VALUES """

            desc = steering.GetDescription()
            sql += """(
                       %d,
                       NOW(),
                       \'%s\',
                       \'%s\',
                       \'%s\',
                       \'%s\',
                       \'%s\',
                       \'%s\',
                       %d,
                       %d,
                       %d,
                       %d,
                       \'%s\')""" % \
                   (
                     simcat_id,
                     self.submitter,
                     self.institution,
                     re.sub('\'','\\\' ',desc),
                     status,
                     self.temporary_storage,
                     self.global_storage,
                     int(maxjobs),
                     ticket,
                     parent_id,
                     debug,
                     steering.GetDatasetType()
                   )

            sql = re.sub('\s+',' ',sql)
            cursor = self.getcursor()
            cursor.execute(sql)
            dataset_id = self.insert_id()

            self.load_steering(dataset_id,steering)
            self.load_steering_dependencies(dataset_id,steering)
            self.load_tasks(dataset_id,steering)
            self.load_batch_options(dataset_id,steering)
            self.load_externals(dataset_id,steering)

            tray_index = 0
            tsql = " INSERT INTO tray "
            tsql += " (dataset_id,tray_index,inputevents,iterations,name) "
            tsql += " VALUES (%s, %s, %s, %s, %s)"
            for i3config in steering.GetTrays():
                # do inserts one at a time so we can get insert IDs as we go
                params = (dataset_id, tray_index,
                          int(i3config.GetEvents()),
                          int(i3config.GetIterations()),
                          self.nullify(i3config.GetName()))
                cursor.execute(tsql, params)
                tray_id = self._conn.insert_id()

                funcs = {'input': i3config.GetInputFiles,
                         'output': i3config.GetOutputFiles}
                files = []
                for type, func in funcs.iteritems():
                    for file in func():
                        files.append((tray_id, type, file.GetName(), int(file.IsPhotonicsTable())))
                if files:
                    fsql = " INSERT INTO tray_files"
                    fsql += " (tray_id, type, name, photonics)"
                    fsql += " VALUES (%s, %s, %s, %s)"
                    cursor.executemany(fsql, files)

                self.load_projects(dataset_id,i3config,tray_index)
                self.load_pre(dataset_id,i3config,tray_index)
                self.load_services(dataset_id,i3config,tray_index)
                self.load_modules(dataset_id,i3config,tray_index)
                self.load_connections(dataset_id,i3config,tray_index)
                self.load_post(dataset_id,i3config,tray_index)
                tray_index += 1

            if geo:
                geo_unique = []
                for g in geo.GetValue().replace('+',' ').replace('and',' ').replace(',',' ').split():
                    if g and g not in geo_unique:
                        geo_unique.append(g)
                        self.load_geometry(g,dataset_id)

        except Exception, e:
            sys.excepthook(sys.exc_type,sys.exc_value,sys.exc_traceback)
            self.logger.error(str(e) + " rolling back transaction")
            if self._conn is not None:
                self._conn.rollback()
            raise Exception, e

        # Commit changes
        self.commit()
        self.logger.debug("Successfully loaded configuration to database")

        return dataset_id
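
    # A hedged usage sketch for upload_config (illustrative only; the steering
    # object would normally come from the iceprod configuration parser, and
    # the credentials and paths below are placeholders):
    #
    #   cdb = ConfigDB()
    #   cdb.authenticate('dbhost.example.edu','iceprod','secret','iceprod_db',keep_open=True)
    #   cdb.SetSubmitter(getpass.getuser())
    #   cdb.SetInstitution('example-institution')
    #   cdb.SetTempStoragePath('/data/tmp')
    #   cdb.SetGlobalStoragePath('/data/warehouse')
    #   dataset_id = cdb.upload_config(steering,maxjobs=100)
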
    def SetStorageURL(self,dataset,steering):
        """
        Store target storage URLs for dataset
        """
        cursor = self.getcursor()
        sql = " INSERT IGNORE INTO urlpath "
        sql += " (dataset_id, name, path) "
        sql += " VALUES "
        cm = ""
        sql1 = ""
        for p in steering.GetParameters():
            if p.GetName().startswith("TARGET::"):
                sql1 += "%s (%d, '%s', '%s') " % (cm,dataset,p.GetName().replace("TARGET::",""),p.GetValue())
                cm = ","
        if sql1:
            cursor.execute(sql+sql1)

    def get_simcat_categories(self):
        """
        Get list of sim_cat names
        """
        cursor = self.getcursor()
        sql = " SELECT category FROM simcat "
        cursor.execute(sql)
        return map(lambda x: x['category'],cursor.fetchall())

    def getNewSets(self):
        """
        Get a list of new datasets.
        """
        cursor = self.getcursor()
        sql = " SELECT dataset_id,jobs_submitted FROM dataset "
        sql += " WHERE status='PROCESSING' "
        sql += " AND verified='TRUE' "
        self.logger.debug(sql)
        cursor.execute(sql)
        self.commit()
        return cursor.fetchall()

    def load_datasetparams(self,paramdict,dataset):
        """
        Load dataset parameters
        @param paramdict: dictionary of parameter name/value pairs
        @param dataset: dataset ID
        """
        self.logger.debug("loading dataset parameters")
        cursor = self.getcursor()

        sql = " INSERT INTO dataset_param "
        sql += " (dataset_id,name,value) "
        sql += " VALUES "
        cm = ''
        for item in paramdict.items():
            sql += " %s(%d,'%s','%s') " % (cm,dataset,item[0],item[1])
            cm = ','
        cursor.execute(sql)
        self.commit()

        if paramdict.has_key('geometry'):
            self.logger.debug("setting geometry")
            sql = " UPDATE dataset SET geometry = '%s' " % paramdict['geometry']
            sql += " WHERE dataset_id = %d " % dataset
            cursor.execute(sql)
            self.commit()

    def load_geometry(self,geo,dataset):
        self.logger.debug("loading geometry information")
        cursor = self.getcursor()
        sql = " INSERT IGNORE INTO geometry "
        sql += " (dataset_id,name) "
        sql += " VALUES (%u,'%s') " % (dataset,geo)
        self.logger.info(sql)
        cursor.execute(sql)

    def loadsimcat(self,steering):
        """
        Load simulation category
        @return: index of category
        """
        self.logger.debug("loading simulation category")
        category = steering.GetCategory()

        sql = """
              SELECT simcat_id from simcat
              WHERE category = '%s' """ % category

        cursor = self.getcursor()
        cursor.execute(sql)
        result_set = cursor.fetchall()

        if len(result_set) > 0:
            simcat_id = result_set[0]['simcat_id']
        else:
            sql = """INSERT IGNORE INTO simcat
                     (category) VALUES ('%s')""" % category

            cursor = self.getcursor()
            cursor.execute(sql)
            simcat_id = self.insert_id()

        return simcat_id

    def load_externals(self,dataset_id,steering):
        """
        Load external programs to run prior to icetray
        @param dataset_id: primary key for run on config db
        """
        self.logger.debug("loading externals")
        externs = steering.GetExterns()
        if not externs:
            return

        sql = "INSERT IGNORE INTO extern ("
        sql += " name,command,version,description,arguments, "
        sql += " extern.infile,extern.outfile, extern.errfile, "
        sql += " steering,steering_name, dataset_id) "
        sql += " VALUES "

        cm = ''
        for e in externs:
            sql += "%s\n(" % cm
            sql += "'%s'," % e.GetName()
            sql += "'%s'," % e.GetExec()
            sql += "%s," % self.nullify(e.GetVersion())
            sql += "%s," % self.nullify(e.GetDescription())
            sql += "%s," % self.nullify(e.GetArgs())
            sql += "%s," % self.nullify(e.GetInFile())
            sql += "%s," % self.nullify(e.GetOutFile())
            sql += "%s," % self.nullify(e.GetErrFile())
            if e.GetSteering():
                sql += "'%s'," % e.GetSteering()[0].GetText()
                sql += "'%s'," % e.GetSteering()[0].GetName()
            else:
                sql += "NULL,NULL,"
            sql += "%d)" % dataset_id
            cm = ','

        sql = re.sub('\s+',' ',sql)
        cursor = self.getcursor()
        self.logger.debug(sql)
        cursor.execute(sql)
        return

    def load_projects(self,dataset_id,i3config,tray_index):
        """
        Load projects to database
        @param dataset_id: primary key for run on config db
        """
        self.logger.debug("loading projects")

        # First load metaprojects and their projects
        load_index = 0
        for mproj in i3config.GetMetaProjectList():
            mpid = self.load_metaproject(mproj,dataset_id,tray_index,load_index)
            load_index += 1

    def load_steering(self,dataset_id,steering):
        """
        Load steering parameters to database
        @param dataset_id: primary key for run on config db
        """
        sql = """INSERT IGNORE INTO steering_parameter
                 (type, name, value, description,dataset_id) VALUES """

        cm = ''
        for p in steering.GetParameters():
            type = p.GetType()
            name = p.GetName()
            value = p.GetValue()
            desc = p.GetDescription()
            sql += "%s\n (\'%s\',\'%s\',\'%s\',\'%s\',%d)" % \
                   (cm,type,name,value,desc,dataset_id)
            cm = ','

        sql = re.sub('\s+',' ',sql)
        cursor = self.getcursor()
        cursor.execute(sql)

    def load_steering_dependencies(self,dataset_id,steering):
        """
        Load file dependencies in steering element to database
        @param dataset_id: primary key for run on config db
        """
        dependencies = steering.GetDependencies()
        if not dependencies:
            return

        sql = """INSERT IGNORE INTO steering_dependency
                 (filename, dataset_id) VALUES """

        cm = ''
        for d in dependencies:
            sql += "%s\n (\'%s\',%d)" % (cm,d,dataset_id)
            cm = ','

        sql = re.sub('\s+',' ',sql)
        cursor = self.getcursor()
        cursor.execute(sql)
        return

    def load_tasks(self,dataset_id,steering):
        """
        Load tasks in steering element to database, used for
        multi-part simulation jobs
        @param dataset_id: primary key for run on config db
        """
        tasks = steering.GetTaskDefinitions()
        if not tasks:
            return

        # insert all the tasks first
        sql = "INSERT INTO task_def (dataset_id,name,reqs,parallel,photonics) VALUES (%s,%s,%s,%s,%s)"
        inserts = []
        for name,task in tasks.items():
            reqs = task.GetRequirements()
            parallel = int(task.ParallelExecutionEnabled())
            photonics = int(task.UsesPhotonics())
            self.logger.debug(sql % (dataset_id,name,reqs,parallel,photonics))
            self.logger.debug("task %s added to DB, parallel: %s, photonics: %s, reqs: %s" \
                              % (name,parallel,photonics,reqs))
            inserts.append((dataset_id,name,reqs,parallel,photonics))
        self.logger.debug(inserts)
        cursor = self.getcursor()
        cursor.executemany(sql, inserts)
        self.logger.debug("task definitions added")

        # now set up the relationships and trays each task runs
        relationship_sql = "INSERT INTO task_def_rel" \
                           + " (parent_task_def_id,child_task_def_id)" \
                           + " VALUES (%s, %s)"
        tray_sql = "INSERT INTO task_def_tray (task_def_id,idx,iter)" \
                   + " VALUES (%s,%s,%s)"
        id_sql = "SELECT task_def_id FROM task_def WHERE dataset_id = %s" \
                 + " AND name = %s"
        for name,task in tasks.items():
            cursor.execute(id_sql, (dataset_id,name))
            row = cursor.fetchone()
            if not row:
                self.logger.error("task %s didn't get inserted into DB" % name)
                self.rollback()
                return
            task_id = row['task_def_id']

            # add parents
            inserts = []
            parents = task.GetParents()
            for parent in parents:
                self.logger.debug("task %s has parent %s" % (name,parent))
                cursor.execute(id_sql, (dataset_id,parent))
                row = cursor.fetchone()
                if not row:
                    self.logger.error("referenced parent task %s not found in DB" % parent)
                    self.rollback()
                    return
                parent_id = row['task_def_id']
                inserts.append((parent_id,task_id))
            cursor.executemany(relationship_sql, inserts)

            # add trays
            inserts = []
            trays = task.GetTrays()
            for index,tray in trays.items():
                for iter in tray.GetIters():
                    self.logger.debug("task %s has tray %s iter %s" % (name,index,iter))
                    inserts.append((task_id,index,iter))
            cursor.executemany(tray_sql, inserts)
        self.logger.debug("added all tasks")
        self.commit()
        return

    def load_batch_options(self,dataset_id,steering):
        """
        Load batch system options from steering to database
        @param dataset_id: primary key for run on config db
        """
        batchopts = steering.GetBatchOpts()
        if not batchopts:
            return

        sql = """INSERT IGNORE INTO batch_option
                 (name, type, value, dataset_id) VALUES """

        cm = ''
        for b in batchopts:
            name = b.GetName()
            value = b.GetValue()
            type = b.GetType()
            sql += "%s\n (\'%s\',\'%s\',\'%s\',%d)" % \
                   (cm,name,type,value,dataset_id)
            cm = ','

        sql = re.sub('\s+',' ',sql)
        cursor = self.getcursor()
        cursor.execute(sql)

    def load_project(self,project,dataset_id,metaproject_id,tray_index,load_index):
        """
        Load project to database
        @param project: the Project object to be loaded
        @param dataset_id: primary key for run on config db
        @return: primary key for projects table on config db
        """
        cursor = self.getcursor()
        pid = self.fetch_project_id(project.GetName(), project.GetVersion())
        self.logger.debug("%s.%s pid is %s" % (project.GetName(),project.GetVersion(),pid))
        if not pid:
            ver = project.GetVersion()
            name = project.GetName()
            sql = " INSERT IGNORE INTO project "
            if isinstance(ver,types.StringTypes):
                sql += "(name, versiontxt,major_version,minor_version,patch_version) "
                sql += ' VALUES '

                vt = ('00','00','00')
                # We still need to fill major,minor,patch for backwards
                # compatibility
                legacy_ver = self.version_regex.search(ver)
                if legacy_ver:
                    legacy_ver = legacy_ver.group(0).replace('V','')
                    vt = legacy_ver.split('-')
                sql += "('%s','%s','%s','%s','%s')" % (name,ver,vt[0],vt[1],vt[2])
                self.logger.debug(sql)
            else:
                raise Exception, "incompatible version type: %s" % type(ver)
            cursor.execute(sql.strip())
            pid = self.insert_id()
            self.logger.debug("After insert: %s.%s pid is %s" % (project.GetName(),project.GetVersion(),pid))
            if cursor.rowcount:
                self.logger.debug("inserted id %d" % pid)
            else:
                self.logger.warn("could not load project %s " % name)
        sql = " INSERT IGNORE INTO mp_pivot "
        sql += " (project_id, metaproject_id) VALUES (%d,%d)" % (pid,metaproject_id)
        cursor.execute(sql.strip())

        project.SetId(pid)
        sql = " INSERT INTO project_pivot "
        sql += " (project_id, dataset_id,tray_index,load_index) "
        sql += " VALUES (%d,%d,%d,%d)" % (pid,dataset_id,tray_index,load_index)
        cursor.execute(sql.strip())
        return pid

    def fetch_project_id(self, project_name,project_version):
        """
        Retrieve the project_id for a given project
        @param project_name: name of library
        @param project_version: version string
        """
        sql = " SELECT project_id "
        sql += " FROM project "
        sql += " WHERE name = '%s' " % project_name
        sql += " AND versiontxt = '%s' " % project_version

        cursor = self.getcursor()
        cursor.execute(sql.strip())

        result_set = cursor.fetchall()
        if result_set:
            return result_set[0]['project_id']

    def load_project_dependencies(self,project,metaproject_id,i3config):
        """
        Load cross-references between projects and dependency projects
        @param project: Project object
        """
        sql = """
              INSERT IGNORE INTO project_depend
              (project_id,metaproject_id,dependency_id) VALUES
              """
        cm = ''
        pid = project.GetId()
        for p in project.GetDependencies():
            self.logger.debug("%s - getting project dependency: %s" % \
                              (project.GetName(),p))
            if i3config.GetProject(p):
                did = i3config.GetProject(p).GetId()
                sql += "%s\n(%s,%s,%s)" % (cm,pid,metaproject_id,did)
                cm = ','

        if not cm:
            return

        sql = re.sub('\s+',' ',sql)
        cursor = self.getcursor()
        cursor.execute(sql)

        self.logger.debug(self.insert_id())
        self.logger.debug(
            "%d project_dependency rows were inserted" % cursor.rowcount)
        return

    def load_metaproject(self,metaproject,dataset_id,tray_index,load_index):
        """
        Load metaproject to database
        @param metaproject: the MetaProject object to be loaded
        @param dataset_id: primary key for run on config db
        @return: primary key for metaproject table on config db
        """
        name = metaproject.GetName()
        version = metaproject.GetVersion()
        mpid = self.fetch_metaproject_id(name, version)

        # fetch_metaproject_id returns -1 when no entry is found
        if not mpid or mpid < 0:
            raise Exception, "metaproject '%s-%s' not found." % (name,str(version))

        sql = " INSERT IGNORE INTO metaproject_pivot "
        sql += " (metaproject_id, dataset_id,tray_index,load_index) "
        sql += " VALUES (%d,%d,%d,%d)" % (mpid,dataset_id,tray_index,load_index)

        sql = re.sub('\s+',' ',sql)
        cursor = self.getcursor()
        cursor.execute(sql)
        metaproject.SetId(mpid)

        project_load_index = 0
        for proj in metaproject.GetProjectList():
            self.load_project(proj,dataset_id,mpid,tray_index,project_load_index)
            self.logger.debug("%s.%s.GetId() = %s" % (proj.GetName(),proj.GetVersion(),proj.GetId()))
            project_load_index += 1

        return mpid

    def load_dependencies(self,module,module_id,i3config):
        """
        Load cross-references between modules and dependency projects
        @param module: Module object
        """
        sql = """
              INSERT IGNORE INTO module_dependency
              (module_id,project_id) VALUES
              """
        cm = ''
        for p in module.GetProjectList():
            project = i3config.GetProject(p.GetName())
            if project:
                self.logger.debug("%s - getting module dependency: %s" % \
                                  (module.GetName(),project.GetName()))
                sql += "%s\n(%d,%d)" % (cm,module_id,project.GetId())
                cm = ','
            else:
                self.logger.error("project %s not found" % p.GetName())

        if not cm:
            return

        self.logger.debug(sql.strip())
        cursor = self.getcursor()
        cursor.execute(sql)

        self.logger.debug(self.insert_id())
        self.logger.debug("%d module_dependency rows were inserted" % cursor.rowcount)
        return

    def load_connections(self,dataset_id,i3config,tray_index):
        """
        Load module connections to database
        @param dataset_id: primary key for run on config db
        """
        for con in i3config.GetConnections():
            cid = self.load_connection(con,dataset_id,tray_index)

    def load_connection(self,connection,dataset_id,tray_index):
        """
        Load connection to database
        @param connection: the Connection object to be loaded
        @param dataset_id: primary key for run on config db
        @return: primary key for connection table on config db
        """
        sql = """INSERT IGNORE INTO `connection`
                 (source, outbox, destination, inbox,
                  dataset_id,tray_index) VALUES """

        source = connection.GetOutbox().GetModule()
        outbox = connection.GetOutbox().GetBoxName()
        destination = connection.GetInbox().GetModule()
        inbox = connection.GetInbox().GetBoxName()

        sql += "(\'%s\',\'%s\',\'%s\',\'%s\',%d,%d)" % \
               (source,outbox,destination,inbox,dataset_id,tray_index)
        sql = re.sub('\s+',' ',sql)

        cursor = self.getcursor()
        cursor.execute(sql)
        cid = self.insert_id()
        if cursor.rowcount:
            self.logger.debug("inserted id %d into connections table" % cid)
            for mesg in cursor.messages:
                self.logger.debug("connections: %s " % mesg)
        else:
            for mesg in cursor.messages:
                self.logger.error("connections: %s " % mesg)
        return cid

    def load_pre(self,dataset_id,i3config,tray_index):
        """
        Load IceProd pre modules into the database.
        @param dataset_id: primary key for run on config db
        """
        load_index = 0
        for module in i3config.GetIceProdPres():
            self.load_module(module,dataset_id,load_index,tray_index,i3config,'iceprod','pre')
            self.load_params(module,dataset_id,tray_index)
            load_index += 1

    def load_post(self,dataset_id,i3config,tray_index):
        """
        Load IceProd post modules into the database.
        @param dataset_id: primary key for run on config db
        """
        load_index = 0
        for module in i3config.GetIceProdPosts():
            self.load_module(module,dataset_id,load_index,tray_index,i3config,'iceprod','post')
            self.load_params(module,dataset_id,tray_index)
            load_index += 1

    def load_modules(self,dataset_id,i3config,tray_index):
        """
        Load modules into the database.
        @param dataset_id: primary key for run on config db
        """
        load_index = 0
        for module in i3config.GetModules():
            self.load_module(module,dataset_id,load_index,tray_index,i3config)
            self.load_params(module,dataset_id,tray_index)
            load_index += 1

    def load_services(self,dataset_id,i3config,tray_index):
        """
        Load services into the database.
        @param dataset_id: primary key for run on config db
        """
        load_index = 0
        for service in i3config.GetServices():
            self.load_service(service,dataset_id,load_index,tray_index,i3config)
            self.load_params(service,dataset_id,tray_index)
            load_index += 1

    def load_module(self,module,dataset_id,load_index,tray_index,i3config,type='module',iptype='tray'):
        """
        Load individual module into the database given a run ID.
        @param module: the module to be loaded
        @param dataset_id: primary key for run on config db
        @param load_index: order in which module should be loaded
        @param tray_index: tray instance to add module to
        @param i3config: Steering instance
        @param type: module, service, or iceprod
        @param iptype: one of tray,pre,post. Serves to distinguish pre and post modules
        @return: primary key for modules table on config db
        """
        cursor = self.getcursor()
        if type == 'iceprod':
            pname = 'iceprod'
            pver = iceprod.__version__
            pid = self.fetch_project_id(pname,pver)
            if not pid:
                vt = ('00','00','00')
                legacy_ver = self.version_regex.search(pver)
                if legacy_ver:
                    legacy_ver = legacy_ver.group(0).replace('V','')
                    vt = legacy_ver.split('-')

                sql = " INSERT INTO project "
                sql += " (name, versiontxt,major_version,minor_version,patch_version) "
                sql += " VALUES ('%s','%s','%s','%s','%s')" % (pname,pver,vt[0],vt[1],vt[2])
                cursor.execute(sql)
                pid = self.insert_id()

            self.logger.debug("load_module: %s " % pid)
        else:
            if not module.GetProjectList():
                self.logger.error("module %s doesn't have project attribute" % module.GetName())
                raise Exception, "module %s is missing parent project" % module.GetName()
            project = module.GetProjectList()[0]
            project = i3config.GetProject(project.GetName())
            pid = project.GetId()
            self.logger.debug("load_module: %s " % pid)

        self.logger.debug('fetching %s module for project id %s' % (type,pid))
        mid = self.fetch_module_id(module,pid,type)
        self.logger.debug('fetched %s module with id %s' % (type,mid))
        if not mid:
            sql = " INSERT INTO module "
            sql += " (name,class,module_type,project_id) "
            sql += " VALUES (\'%s\',\'%s\',\'%s\',%d) " \
                   % (module.GetName(),module.GetClass(),type,pid)

            self.logger.debug(sql.strip())
            cursor.execute(sql.strip())
            mid = self.insert_id()
            if cursor.rowcount:
                self.logger.debug("inserted %s id %d" % (type,mid))
            else:
                self.logger.debug("failed to insert %s id %d" % (type,mid))
        self.load_dependencies(module,mid,i3config)
        sql = " INSERT INTO module_pivot "
        sql += " (module_id, name, dataset_id,tray_index,load_index,iptype) "
        sql += " VALUES (%d,'%s',%d,%d,%d,'%s') " % (mid,module.GetName(),
                                                     dataset_id,tray_index,load_index,iptype)
        self.logger.debug(sql.strip())
        cursor.execute(sql.strip())
        mpid = self.insert_id()
        module.SetId(mpid)
        return mpid

    def fetch_module_id(self,module,project_id,type='module'):
        """
        Retrieve id for module with matching name and project_id
        (there should only be one)
        @param module: module to query
        @param project_id: primary key of parent project
        @param type: ('module'|'service')
        """
        sql = " SELECT module_id FROM module "
        sql += " WHERE class ='%s' " % module.GetClass()
        sql += " AND module_type ='%s' " % type
        sql += " AND project_id =%d " % project_id

        self.logger.debug(sql.strip())
        cursor = self.getcursor()
        cursor.execute(sql.strip())
        result = cursor.fetchone()
        self.logger.debug(str(result))
        if result:
            return int(result['module_id'])
        else:
            self.logger.warn("module \'%s\' not found" % module.GetClass())
            return

    def fetch_service_id(self,service,project_id):
        """
        Retrieve id for service with matching name and project_id
        (there should only be one)
        @param service: service to query
        @param project_id: primary key of parent project
        """
        return self.fetch_module_id(service,project_id,'service')

    def load_service(self,service,dataset_id,load_index,tray_index,i3config):
        """
        Load individual service into the database given a run ID.
        @param service: the Service object to be loaded
        @param dataset_id: primary key for run on config db
        @return: primary key for services table on config db
        """
        return self.load_module(service,dataset_id,load_index,tray_index,i3config,type='service')

    def insert_omkey(self,omkey,pid):
        """
        Add OMKey object
        @param omkey: OMKey
        @param pid: configured parameter id or cparameter_id
        """
        cursor = self.getcursor()
        sql = " INSERT INTO carray_element (name,value,cparameter_id) "
        sql += " VALUES "
        sql += " ('%s','%s',%s), " % ('stringid',omkey.stringid,pid)
        sql += " ('%s','%s',%s) " % ('omid',omkey.omid,pid)
        cursor.execute(sql.strip())

    def insert_omkey_array(self,omkeyvect,pid):
        """
        Add array of OMKey objects
        @param omkeyvect: list of OMKeys
        @param pid: configured parameter id or cparameter_id
        """
        if not len(omkeyvect) > 0: return
        cursor = self.getcursor()
        sql = " INSERT INTO carray_element (name,value,cparameter_id) "
        sql += " VALUES "
        cm = ""
        for omkey in omkeyvect:
            sql += cm
            sql += " ('%s','%s',%s), " % ('stringid',omkey.stringid,pid)
            sql += " ('%s','%s',%s) " % ('omid',omkey.omid,pid)
            cm = ","
        cursor.execute(sql.strip())

    def insert_array(self,values,pid):
        """
        Add value array
        @param values: list of array elements
        @param pid: configured parameter id or cparameter_id
        """
        cursor = self.getcursor()
        if not len(values) > 0: return
        sql = " INSERT INTO carray_element (value,unit,cparameter_id) "
        sformat = lambda x: "('%s',%s,%s)" % (x.value,self.nullify(x.unit),pid)
        vals = ",".join(map(sformat,values))
        sql += " VALUES " + vals
        cursor.execute(sql.strip())

    def load_params(self,module,dataset_id,tray_index):
        """
        Load parameters into the database.
        @param module: module whose parameters are to be loaded to database
        """
        cursor = self.getcursor()
        sql = " INSERT INTO cparameter "
        sql += " (name,type,unit,module_pivot_id,dataset_id,tray_index,value) "
        sql += " VALUES "
        count = 0

        m_id = module.GetId()
        self.logger.debug('load_params: mid = %s' % m_id)

        if not module.GetParameters():
            return
        for p in module.GetParameters():
            name = p.GetName()
            type = p.GetType()
            desc = p.GetDescription()

            if type == 'OMKey' or type in VectorTypes:
                value = 0
                unit = 'NULL'
            else:
                value = p.GetValue().value
                unit = self.nullify(p.GetValue().unit)
            sql1 = sql + " ('%s','%s',%s,%d,%d,%d,'%s') " % \
                   (name,type,unit,m_id,dataset_id,tray_index,value)
            self.logger.debug(sql1.strip())
            cursor.execute(sql1.strip())
            pid = self.insert_id()
            p.SetId(pid)
            count = count + cursor.rowcount

            if type == 'OMKey':
                self.insert_omkey(p.GetValue(),pid)
            elif type == 'OMKeyv':
                self.insert_omkey_array(p.GetValue(),pid)
            elif type in VectorTypes:
                self.insert_array(p.GetValue(),pid)

        self.logger.debug("%d cparameter rows were inserted" % count)

    def show_dataset_table(self,search_string=""):
        """
        Download dataset briefs from database
        @return: resultset from database
        """
        sql = " SELECT * FROM dataset "
        if search_string and len(search_string):
            sql += " WHERE username LIKE '%%%s%%' " % search_string
            sql += " OR hostname LIKE '%%%s%%' " % search_string
            sql += " OR description LIKE '%%%s%%' " % search_string
            sql += " OR startdate LIKE '%%%s%%' " % search_string
            for token in search_string.split():
                try:
                    sql += " OR dataset_id = %d " % int(token)
                except: pass
        sql += " ORDER BY dataset_id DESC "

        cursor = self.getcursor()
        cursor.execute(sql)
        result_set = cursor.fetchall()
        return result_set

    def download_config(self,dataset_id, include_defaults=False,include_description=False):
        """
        Download icetray configuration from database
        @param dataset_id: ID of the run whose configuration we wish to download
        @return: Steering object containing the dataset configuration
        """
        steering = self.download_steering(dataset_id)
        category = self.getsimcat(dataset_id)
        steering.SetCategory(category)
        self.download_steering_dependencies(dataset_id,steering)
        self.download_tasks(dataset_id,steering)
        self.download_batch_options(dataset_id,steering)
        self.download_externals(dataset_id,steering)

        # Set the description field
        sql = "SELECT * FROM dataset WHERE dataset_id = %d" % dataset_id
        cursor = self.getcursor()
        cursor.execute(sql)
        result = cursor.fetchone()
        if result:
            steering.SetDescription(result['description'])
            steering.SetParentId(result['dataset_id'])

        # Get Trays
        sql = "SELECT * FROM tray WHERE dataset_id = %d" % dataset_id
        cursor = self.getcursor()
        cursor.execute(sql)
        trayitems = cursor.fetchall()
        for tray in trayitems:
            i3config = IceTrayConfig()

            tray_id = tray['tray_id']
            tray_index = tray['tray_index']
            i3config.SetEvents(tray['inputevents'])
            i3config.SetIterations(tray['iterations'])
            i3config.SetName(self.nonify(tray['name']))

            funcs = {'input': i3config.AddInputFile,
                     'output': i3config.AddOutputFile}
            fsql = "SELECT type, name, photonics FROM tray_files WHERE tray_id = %s"
            cursor.execute(fsql, (tray_id,))
            files = cursor.fetchall()
            for file in files:
                type = file['type'].lower()
                obj = IceTrayFile(file['name'], file["photonics"])
                func = funcs[type]
                func(obj)

            self.download_metaprojects(dataset_id,tray_index,i3config)
            self.download_projects(dataset_id,tray_index,i3config)
            self.download_pre(dataset_id,tray_index,i3config,include_defaults,include_description)
            self.download_services(dataset_id,tray_index,i3config,include_defaults,include_description)
            self.download_modules(dataset_id,tray_index,i3config,
                                  include_defaults=include_defaults,include_description=include_description)
            self.download_connections(dataset_id,tray_index,i3config)
            self.download_post(dataset_id,tray_index,i3config,include_defaults,include_description)
            steering.AddTray(i3config)

        self.commit()
        return steering
1345 - def download_metaprojects(self,dataset_id,tray_index,i3config):
1346 """ 1347 Download metaprojects from database 1348 Download projects from database 1349 @param dataset_id: ID of the run whose configuration we wish to download 1350 """ 1351 sql = " SELECT metaproject.* " 1352 sql += " FROM metaproject,metaproject_pivot " 1353 sql += " WHERE metaproject_pivot.dataset_id = %d " % dataset_id 1354 sql += " AND metaproject.metaproject_id = " 1355 sql += " metaproject_pivot.metaproject_id " 1356 sql += " AND metaproject_pivot.tray_index = %d " % tray_index 1357 sql += " ORDER BY load_index " 1358 1359 cursor = self.getcursor() 1360 self.logger.debug(sql) 1361 cursor.execute(sql.strip()) 1362 result_set = cursor.fetchall(); 1363 for mp in result_set: 1364 mpname = mp['name'] 1365 mpver = mp['versiontxt'] 1366 mp_id = mp['metaproject_id'] 1367 mproject = MetaProject() 1368 mproject.SetId(mp_id) 1369 1370 self.logger.debug("downloaded metaproject %s with id %d" % \ 1371 (mproject.GetName(),mproject.GetId())) 1372 1373 mproject.SetName(mpname) 1374 mproject.SetVersion(mpver) 1375 1376 #add metaproject to local dictonary for reference 1377 i3config.AddMetaProject(mproject.GetName(),mproject) 1378 self.metaproject_dict[mp_id] = mproject 1379
1380 - def download_externals(self,dataset_id,steering):
1381 """ 1382 Download Load external programs to run prior to icetray 1383 @param dataset_id: primary key for run on config db 1384 """ 1385 self.logger.debug("downloading externals") 1386 1387 cursor = self.getcursor() 1388 1389 sql = "SELECT * FROM extern " 1390 sql += "WHERE dataset_id = %d " % dataset_id 1391 cursor.execute(sql.strip()) 1392 result_set = cursor.fetchall(); 1393 for e in result_set: 1394 extern = Extern() 1395 extern.SetName(self.nonify(e['name'])) 1396 extern.SetVersion(self.nonify(e['version'])) 1397 extern.SetExec(self.nonify(e['command'])) 1398 extern.SetDescription(self.nonify(e['description'])) 1399 extern.SetArgs(self.nonify(e['arguments'])) 1400 extern.SetInFile(self.nonify(e['infile'])) 1401 extern.SetOutFile(self.nonify(e['outfile'])) 1402 extern.SetErrFile(self.nonify(e['errfile'])) 1403 if self.nonify(e['steering_name']): 1404 es = ExternSteering() 1405 es.SetName(e['steering_name']) 1406 es.SetText(e['steering']) 1407 extern.AddSteering(es) 1408 steering.AddExtern(extern) 1409 1410 return 1411
1412 - def fetch_dict_value(self, key):
1413 """ 1414 Retrieve value stored in dictionary 1415 @param key: string key to dictionary entry 1416 """ 1417 sql = " SELECT value FROM dictionary WHERE " 1418 sql += " keystring = '%s' " % key 1419 cursor = self.getcursor() 1420 cursor.execute(sql.strip()) 1421 result_set = cursor.fetchone(); 1422 if result_set: 1423 return result_set['value'] 1424 else: 1425 return ''
1426
1427 - def fetch_filename(self, key,dataset_id=0):
1428 """ 1429 Retrieve value stored in dictionary 1430 @param key: string key to dictionary entry 1431 """ 1432 sql = " SELECT * FROM file " 1433 sql += " WHERE file_number = %d " % key 1434 if dataset_id: 1435 sql += " AND dataset_id = %d " % dataset_id 1436 cursor = self.getcursor() 1437 cursor.execute(sql.strip()) 1438 result_set = cursor.fetchone(); 1439 if result_set: 1440 return result_set 1441 else: 1442 return ''
1443
1444 - def fetch_project_list(self,dataset_id):
1445 """ 1446 Download projects from database 1447 @param dataset_id: ID of the run whose configuration we wish to download 1448 """ 1449 projects = [] 1450 1451 sql = " SELECT metaproject_pivot.metaproject_id,project.*" 1452 sql += " FROM metaproject_pivot,project,project_pivot,mp_pivot" 1453 sql += " WHERE metaproject_pivot.dataset_id = %d " % dataset_id 1454 sql += " AND project.project_id = mp_pivot.project_id " 1455 sql += " AND project.project_id = project_pivot.project_id " 1456 sql += " AND metaproject_pivot.metaproject_id = mp_pivot.metaproject_id " 1457 sql += " AND project_pivot.dataset_id = metaproject_pivot.dataset_id " 1458 sql += " ORDER BY metaproject_pivot.load_index,project_pivot.load_index " 1459 cursor = self.getcursor() 1460 self.logger.debug(sql.strip()) 1461 cursor.execute(sql.strip()) 1462 result_set = cursor.fetchall(); 1463 1464 for p in result_set: 1465 pname = p['name'] 1466 pver = p['versiontxt'] 1467 mp_id = p['metaproject_id'] 1468 project = Project() 1469 project.SetId(p['project_id']) 1470 self.logger.debug("downloaded project %s with id %d" % (project.GetName(),project.GetId())) 1471 project.SetName(pname) 1472 project.SetVersion(pver) 1473 projects.append(project) 1474 return projects 1475
1476 - def download_projects(self,dataset_id,tray_index,i3config):
1477 """ 1478 Download projects from database 1479 @param dataset_id: ID of the run whose configuration we wish to download 1480 """ 1481 1482 sql = " SELECT metaproject_pivot.metaproject_id,project.*" 1483 sql += " FROM metaproject_pivot,project,project_pivot,mp_pivot" 1484 sql += " WHERE metaproject_pivot.dataset_id = %d " % dataset_id 1485 sql += " AND project.project_id = mp_pivot.project_id " 1486 sql += " AND project.project_id = project_pivot.project_id " 1487 sql += " AND metaproject_pivot.metaproject_id = mp_pivot.metaproject_id " 1488 sql += " AND project_pivot.dataset_id = metaproject_pivot.dataset_id " 1489 sql += " AND metaproject_pivot.tray_index = project_pivot.tray_index " 1490 sql += " AND project_pivot.tray_index = %d " % tray_index 1491 sql += " ORDER BY metaproject_pivot.load_index,project_pivot.load_index " 1492 cursor = self.getcursor() 1493 self.logger.debug(sql.strip()) 1494 cursor.execute(sql.strip()) 1495 result_set = cursor.fetchall(); 1496 1497 for p in result_set: 1498 pname = p['name'] 1499 pver = p['versiontxt'] 1500 mp_id = p['metaproject_id'] 1501 project = Project() 1502 project.SetId(p['project_id']) 1503 self.logger.debug("downloaded project %s with id %d" % (project.GetName(),project.GetId())) 1504 1505 for d in self.fetch_project_dependencies(project.GetId(),mp_id): 1506 self.logger.debug(" %s - adding dependency %s" % (project.GetName(),d.GetName())) 1507 project.AddDependency(d.GetName()) 1508 project.SetName(pname) 1509 project.SetVersion(pver) 1510 try: 1511 metaproject = self.metaproject_dict[mp_id] 1512 self.logger.debug("found metaproject %s with id %d" % (metaproject.GetName(),mp_id)) 1513 metaproject.AddProject(pname,project) 1514 1515 if not i3config.HasMetaProject(metaproject.GetName()): 1516 i3config.AddMetaProject(metaproject.GetName(),metaproject) 1517 self.logger.debug("adding metaproject - %s" % metaproject.GetName()) 1518 1519 except KeyError, k: 1520 self.logger.warn("could not find metaproject with id %d" % mp_id) 1521 self.logger.warn("Adding project to top-level container.") 1522 i3config.AddProject(pname,project) 1523 1524
1525 - def fetch_project_dependencies(self,project_id,metaproject_id):
1526 """ 1527 retrive dependencies for project 1528 @param project_id: id of project 1529 @return array of project names 1530 """ 1531 dependencies = [] 1532 1533 sql = """ 1534 SELECT 1535 project_depend.project_depend_id, 1536 project.name, 1537 project.project_id, 1538 project.versiontxt 1539 FROM 1540 project,project_depend 1541 WHERE 1542 project.project_id = project_depend.dependency_id 1543 AND 1544 project_depend.project_id = %d 1545 AND 1546 project_depend.metaproject_id = %d 1547 ORDER BY 1548 project_depend.project_depend_id 1549 """ % (project_id,metaproject_id) 1550 1551 cursor = self.getcursor() 1552 sql = re.sub('\s+',' ',sql); 1553 self.logger.debug(sql); 1554 cursor.execute (sql); 1555 result_set = cursor.fetchall (); 1556 1557 for d in result_set: 1558 dependency = Project() 1559 dependency.SetName(d['name']) 1560 dependency.SetVersion(d['versiontxt']) 1561 1562 dependency.SetId(d['project_depend_id']) 1563 dependencies.append(dependency) 1564 1565 return dependencies 1566 1567
1568 - def fetch_module_dependencies(self,module_id):
1569 """ 1570 retrive dependencies for module 1571 @param module_id: id of module 1572 @return array of project names 1573 """ 1574 dependencies = [] 1575 1576 sql = """ 1577 SELECT 1578 module_dependency.module_dependency_id, 1579 project.name, 1580 project.project_id, 1581 project.versiontxt 1582 FROM 1583 project,module_dependency 1584 WHERE 1585 project.project_id = module_dependency.project_id 1586 AND 1587 module_dependency.module_id = %d 1588 ORDER BY 1589 module_dependency.module_dependency_id """ % module_id 1590 1591 cursor = self.getcursor() 1592 sql = re.sub('\s+',' ',sql); 1593 cursor.execute (sql); 1594 result_set = cursor.fetchall (); 1595 1596 for d in result_set: 1597 dependency = Project() 1598 dependency.SetName(d['name']) 1599 dependency.SetVersion(d['versiontxt']) 1600 dependency.SetId(d['project_id']) 1601 dependencies.append(dependency) 1602 1603 return dependencies 1604 1605
1606 - def download_connections(self,dataset_id,tray_index,i3config):
1607 """ 1608 Download module connections from database 1609 @param dataset_id: ID of the run whose configuration we whish to download 1610 """ 1611 sql = " SELECT * FROM `connection` " 1612 sql += " WHERE dataset_id = %d " % dataset_id 1613 sql += " AND tray_index = %d " % tray_index 1614 cursor = self.getcursor() 1615 cursor.execute(sql) 1616 result_set = cursor.fetchall(); 1617 for c in result_set: 1618 csource = c['source'] 1619 coutbox = c['outbox'] 1620 cdest = c['destination'] 1621 cinbox = c['inbox'] 1622 1623 conn = Connection() 1624 conn.From(csource,coutbox) 1625 conn.To(cdest,cinbox) 1626 i3config.AddConnection(conn) 1627
1628 - def getsimcat(self,dataset_id):
1629 """ 1630 Get simulation category 1631 @param dataset_id: dataset ID 1632 @return: category 1633 """ 1634 self.logger.debug("retrieving simulation category") 1635 1636 sql = """ 1637 SELECT simcat.category from simcat,dataset 1638 WHERE simcat.simcat_id = dataset.simcat_id 1639 AND dataset.dataset_id = %d """ % dataset_id 1640 1641 cursor = self.getcursor() 1642 cursor.execute(sql) 1643 result_set = cursor.fetchall(); 1644 1645 if len(result_set) > 0: 1646 return result_set[0]['category'] 1647
1648 - def download_steering(self,dataset_id):
1649 """ 1650 Get steering parameters from database 1651 @param dataset_id: ID of the run whose configuration we whish to download 1652 """ 1653 steering = Steering() 1654 sql = " SELECT * FROM steering_parameter " 1655 sql += " WHERE dataset_id = '%s'" % dataset_id 1656 sql += " ORDER by name " 1657 cursor = self.getcursor() 1658 cursor.execute(sql) 1659 result_set = cursor.fetchall(); 1660 1661 for p in result_set: 1662 param = Parameter() 1663 param.SetType(p['type']) 1664 param.SetName(p['name']) 1665 param.SetValue(p['value']) 1666 steering.AddParameter(param) 1667 return steering 1668
1669 - def get_steering_parameter(self,dataset_id,param):
1670 sql = " SELECT value FROM steering_parameter " 1671 sql += " WHERE name = '%s'" % param 1672 sql += " AND dataset_id = '%s'" % dataset_id 1673 cursor = self.getcursor() 1674 cursor.execute(sql) 1675 result = cursor.fetchone(); 1676 if result: 1677 return result['value'] 1678
1679 - def download_steering_dependencies(self,dataset_id,steering):
1680 """ 1681 Get steering dependencies from database 1682 @param dataset_id: ID of the run whose configuration we whish to download 1683 """ 1684 sql = "SELECT * FROM steering_dependency WHERE dataset_id = '%s'" % dataset_id 1685 cursor = self.getcursor() 1686 cursor.execute(sql) 1687 result_set = cursor.fetchall(); 1688 1689 for p in result_set: 1690 steering.AddDependency(p['filename']) 1691
1692 - def download_tasks(self,dataset_id,steering):
1693 """ 1694 Get job parts from database 1695 @param dataset_id: ID of the run whose configuration we whish to download 1696 """ 1697 1698 sql = "SELECT task_def_id,name,reqs,parallel,photonics FROM task_def" \ 1699 + " WHERE dataset_id = %s ORDER BY task_def_id" 1700 cursor = self.getcursor() 1701 cursor.execute(sql, (dataset_id,)) 1702 results = cursor.fetchall() 1703 1704 tray_sql = "SELECT idx,CONVERT(GROUP_CONCAT(iter),char) AS iters" \ 1705 + " FROM task_def_tray WHERE task_def_id = %s" \ 1706 + " GROUP BY idx,task_def_id" 1707 parent_sql = "SELECT name FROM task_def,task_def_rel" \ 1708 + " WHERE child_task_def_id = %s" \ 1709 + " AND parent_task_def_id = task_def_id" 1710 child_sql = "SELECT name FROM task_def,task_def_rel" \ 1711 + " WHERE parent_task_def_id = %s" \ 1712 + " AND child_task_def_id = task_def_id" 1713 1714 for row in results: 1715 id = row['task_def_id'] 1716 name = row['name'] 1717 reqs = row['reqs'] 1718 parallel = row['parallel'] 1719 photonics = row['photonics'] 1720 1721 td = TaskDefinition(name,id) 1722 td.SetRequirements(reqs) 1723 td.SetParallelExecution(parallel) 1724 td.SetUsesPhotonics(photonics) 1725 1726 self.logger.debug(tray_sql % id) 1727 cursor.execute(tray_sql, (id,)) 1728 trays = cursor.fetchall() 1729 for tray in trays: 1730 td.AddTray(tray['idx'], tray['iters']) 1731 1732 cursor.execute(parent_sql, (id,)) 1733 parents = cursor.fetchall() 1734 for parent in parents: 1735 td.AddParent(parent['name']) 1736 1737 cursor.execute(child_sql, (id,)) 1738 children = cursor.fetchall() 1739 for child in children: 1740 td.AddChild(child['name']) 1741 1742 steering.AddTaskDefinition(td)
1743
1744 - def fetch_batch_options(self,dataset_id,steering):
1745 """ 1746 Fetch batch system options from database 1747 @param dataset_id: ID of the run whose configuration we whish to download 1748 """ 1749 batchopts = [] 1750 sql = "SELECT * FROM batch_option WHERE dataset_id = '%s'" % dataset_id 1751 cursor = self.getcursor() 1752 cursor.execute(sql) 1753 result_set = cursor.fetchall() 1754 for b in result_set: 1755 opt = BatchOpt() 1756 opt.SetName(b['name']) 1757 opt.SetType(b['type']) 1758 opt.SetValue(b['value']) 1759 batchopts.append(opt) 1760 steering.AddBatchOpt(opt) 1761 return batchopts 1762
1763 - def download_batch_options(self,dataset_id,steering):
1764 """ 1765 Get batch system options from database 1766 @param dataset_id: ID of the run whose configuration we whish to download 1767 """ 1768 sql = "SELECT * FROM batch_option WHERE dataset_id = '%s'" % dataset_id 1769 cursor = self.getcursor() 1770 cursor.execute(sql) 1771 result_set = cursor.fetchall(); 1772 1773 for b in result_set: 1774 opt = BatchOpt() 1775 opt.SetName(b['name']) 1776 opt.SetType(b['type']) 1777 opt.SetValue(b['value']) 1778 steering.AddBatchOpt(opt) 1779 1780
1781 - def download_modules(self,dataset_id, 1782 tray_index, 1783 i3config, 1784 type='module', 1785 iptype='tray', 1786 include_defaults=False, 1787 include_description=False):
1788 """ 1789 Get modules from the database. 1790 @param dataset_id: ID of the run whose configuration we whish to download 1791 """ 1792 sql = " SELECT module.class,module.module_id,module_pivot.module_pivot_id," 1793 sql += " module_pivot.name,module_pivot.load_index " 1794 sql += " FROM module,module_pivot " 1795 sql += " WHERE module_pivot.dataset_id = %d " % dataset_id 1796 sql += " AND module.module_type = '%s' " % type 1797 sql += " AND module_pivot.iptype = '%s' " % iptype 1798 sql += " AND module.module_id = module_pivot.module_id " 1799 sql += " AND module_pivot.tray_index = %d " % tray_index 1800 sql += " ORDER BY load_index " 1801 1802 cursor = self.getcursor() 1803 self.logger.debug(sql.strip()) 1804 cursor.execute(sql.strip()) 1805 result_set = cursor.fetchall(); 1806 for m in result_set: 1807 mod = Service() 1808 1809 mod.SetClass(m['class']) 1810 mod.SetName(m['name']) 1811 mod.SetId(m['module_id']) 1812 module_pivot_id = m['module_pivot_id'] 1813 if type == 'module': 1814 i3config.AddModule(mod) 1815 elif type == 'service': 1816 i3config.AddService(mod) 1817 elif type == 'iceprod': 1818 if iptype == 'pre': 1819 i3config.AddIceProdPre(mod) 1820 elif iptype == 'post': 1821 i3config.AddIceProdPost(mod) 1822 1823 if type in ['module','service']: 1824 for p in self.fetch_module_dependencies(mod.GetId()): 1825 project = i3config.GetProject(p.GetName()) 1826 if not project == None: 1827 mod.AddProject(project.GetName(),project) 1828 else: 1829 self.logger.warn('could not find dependency \'%s\'' % p.GetName() ) 1830 i3config.AddProject(p.GetName(),p) 1831 mod.AddProject(p.GetName(),p) 1832 1833 self.download_params(mod,module_pivot_id,dataset_id,include_defaults,include_description) 1834 1835
1836 - def download_services(self,dataset_id,tray_index,i3config, 1837 include_defaults=False,include_description=False):
1838 """ 1839 Download services from the database. 1840 @param dataset_id: ID of the run whose configuration we whish to download 1841 """ 1842 return self.download_modules(dataset_id, 1843 tray_index, 1844 i3config, 1845 type='service', 1846 include_defaults=include_defaults, 1847 include_description=include_description) 1848
1849 - def download_pre(self,dataset_id,tray_index,i3config, 1850 include_defaults=False,include_description=False):
1851 """ 1852 Download IceProdPre modules from the database. 1853 @param dataset_id: ID of the run whose configuration we whish to download 1854 """ 1855 return self.download_modules(dataset_id, 1856 tray_index, 1857 i3config, 1858 type='iceprod', 1859 iptype='pre', 1860 include_defaults=include_defaults, 1861 include_description=include_description) 1862
1863 - def download_post(self,dataset_id,tray_index,i3config, 1864 include_defaults=False, 1865 include_description=False):
1866 """ 1867 Download IceProdPost modules from the database. 1868 @param dataset_id: ID of the run whose configuration we whish to download 1869 """ 1870 return self.download_modules(dataset_id, 1871 tray_index, 1872 i3config, 1873 type='iceprod', 1874 iptype='post', 1875 include_defaults=include_defaults, 1876 include_description=include_description) 1877
1878 - def select_array(self,pid):
1879 cursor = self.getcursor() 1880 sql = " SELECT * from carray_element " 1881 sql += " WHERE cparameter_id = %d " % pid 1882 cursor.execute (sql.strip()) 1883 result_set = cursor.fetchall(); 1884 vect = [] 1885 for item in result_set: 1886 vect.append(Value(item['value'],self.nonify(item['unit']))) 1887 return vect 1888
1889 - def select_omkey(self,pid):
1890 omkeys = self.select_omkey_array(pid) 1891 if len(omkeys) < 1: 1892 raise Exception,'could not find omkey for param %d' % pid 1893 return omkeys[0] 1894
1895 - def select_omkey_array(self,pid):
1896        cursor = self.getcursor()
1897        sql  = " SELECT * from carray_element "
1898        sql += " WHERE cparameter_id = %d order by carray_element_id" % pid
1899        cursor.execute(sql.strip())
1900        result_set = cursor.fetchall()
1901        omkeyvect = []
1902        for item in result_set:
1903            if item['name'] == 'stringid':
1904                omkey = pyOMKey(0,0)
1905                omkey.stringid = item['value']
1906            elif item['name'] == 'omid':
1907                omkey.omid = item['value']
1908                omkeyvect.append(omkey)
1909            else:
1910                raise Exception,'expected omkey but found %s' % item['name']
1911        return omkeyvect
1912
1913 - def download_params(self,module,mod_id,dataset_id,include_defaults=False, include_description=False):
1914 """ 1915 Download module parameters from the database. 1916 @param mod_id: index corresponding to module table 1917 """ 1918 paramdict = {} 1919 # First include default parameters 1920 if include_defaults or include_description: 1921 sql = " SELECT * FROM parameter " 1922 sql += " WHERE module_id = %d" % module.GetId() 1923 sql += " ORDER BY name " 1924 cursor = self.getcursor() 1925 cursor.execute(sql) 1926 result_set = cursor.fetchall(); 1927 for p in result_set: 1928 param = Parameter() 1929 param.SetType(p['type']) 1930 param.SetName(p['name']) 1931 pid = p['parameter_id'] 1932 if param.GetType() == 'OMKeyv': 1933 param.SetValue(self.select_omkey_array(pid)) 1934 elif param.GetType() == 'OMKey': 1935 param.SetValue(self.select_omkey(pid)) 1936 elif param.GetType() in VectorTypes: 1937 param.SetValue(self.select_array(pid)) 1938 else: 1939 param.SetValue(Value(p['value'],self.nonify(p['unit']))) 1940 param.SetUnit(self.nonify(p['unit'])) 1941 param.SetDefault(True) 1942 param.SetDefault(param.GetValue()) 1943 paramdict[param.GetName().lower()] = param 1944 if include_description: 1945 param.SetDescription(p['description']) 1946 1947 1948 # Then override with configured values 1949 sql = " SELECT * FROM cparameter " 1950 sql += " WHERE module_pivot_id = '%s'" % mod_id 1951 sql += " AND dataset_id = '%s'" % dataset_id 1952 sql += " ORDER BY name " 1953 cursor = self.getcursor() 1954 cursor.execute(sql) 1955 result_set = cursor.fetchall(); 1956 1957 for p in result_set: 1958 param = Parameter() 1959 param.SetType(p['type']) 1960 param.SetName(p['name']) 1961 pid = p['cparameter_id'] 1962 if param.GetType() == 'OMKeyv': 1963 param.SetValue(self.select_omkey_array(pid)) 1964 elif param.GetType() == 'OMKey': 1965 param.SetValue(self.select_omkey(pid)) 1966 elif param.GetType() in VectorTypes: 1967 param.SetValue(self.select_array(pid)) 1968 else: 1969 param.SetValue(Value(p['value'],self.nonify(p['unit']))) 1970 param.SetUnit(self.nonify(p['unit'])) 1971 if paramdict.has_key(param.GetName().lower()): 1972 param.SetDefault(paramdict[param.GetName().lower()].GetDefault()) 1973 paramdict[param.GetName().lower()] = param 1974 1975 # now add all parameters to the module 1976 for param in paramdict.values(): 1977 module.AddParameter(param) 1978 1979
1980 - def GetGridId(self,grid_name,module_name,on,institution='unknown',batchsys='unknown',url=''):
1981 """ 1982 Retrieve the key for grid_name 1983 """ 1984 ver = iceprod.__version__ 1985 if not self.isconnected(): self.connect() 1986 cursor = self.getcursor() 1987 sql = " SELECT grid_id FROM grid WHERE name='%s' " % grid_name 1988 cursor.execute(sql) 1989 result = cursor.fetchone() 1990 if result: 1991 grid_id = result['grid_id'] 1992 else: 1993 sql = " INSERT IGNORE INTO grid (name,institution,batchsys,version) " 1994 sql += " VALUES ('%s','%s','%s','%s') " % (grid_name,institution,batchsys,ver) 1995 cursor.execute(sql) 1996 grid_id = self.insert_id() 1997 1998 # Make sure that iceprod version has entry in projects table 1999 vertuple = ver.split('.') 2000 if len(vertuple) < 3: 2001 vertuple= (0,0,0) 2002 sql = " INSERT IGNORE INTO project (name,major_version,minor_version,patch_version,versiontxt) " 2003 sql += " VALUES ('iceprod','%s','%s','%s','%s') " % (vertuple[0],vertuple[1],vertuple[2],ver) 2004 cursor.execute(sql) 2005 2006 sql = " UPDATE grid SET " 2007 sql += " institution = '%s', " % institution 2008 sql += " batchsys = '%s', " % batchsys 2009 sql += " version = '%s', " % ver 2010 if url: 2011 sql += " url = '%s', " % url 2012 if on: 2013 sql += " %s='RUNNING' " % module_name 2014 else: 2015 sql += " %s='STOPPED' " % module_name 2016 sql += " WHERE grid_id=%d " % grid_id 2017 cursor.execute(sql) 2018 self.commit() 2019 return grid_id
2020
2021 - def CheckVersion(self):
2022 """ 2023 Check latest version of software and see if we need to upgrade 2024 """ 2025 ver = iceprod.__version__ 2026 ver = ver.replace('V','').split('-') 2027 if len(ver) < 3: return None 2028 2029 if not self.isconnected(): self.connect() 2030 cursor = self.getcursor() 2031 sql = " SELECT * FROM svn " 2032 sql += " WHERE major='%s' " % ver[0] 2033 sql += " AND minor='%s' " % ver[0] 2034 cursor.execute(sql) 2035 result = cursor.fetchone() 2036 try: 2037 if int(ver[2]) < int(result['patch']): 2038 return result['url'] 2039 except: pass 2040 return None
2041 2042
2043 - def InitializeGridStats(self,grids,dataset_id):
2044 """ 2045 Insert grid_statistics entries for grids which should run this 2046 dataset. 2047 @param grids: list of grids or clusters 2048 @param dataset_id: dataset id 2049 """ 2050 2051 delim = "" 2052 sql = " SELECT grid_id FROM grid WHERE " 2053 for grid in grids: 2054 sql += "%s name = '%s' " % (delim,grid) 2055 delim = "OR" 2056 cursor = self.getcursor() 2057 cursor.execute(sql) 2058 result_set = cursor.fetchall() 2059 2060 if len(result_set) == 0: 2061 self.logger.error("could not match grid name '%s'" % ":".join(grids)) 2062 return 2063 2064 delim = "" 2065 sql = " INSERT INTO grid_statistics " 2066 sql += " (grid_id,dataset_id) VALUES " 2067 for item in result_set: 2068 sql += " %s (%d,%d) " % ( delim, item['grid_id'], dataset_id) 2069 delim = "," 2070 cursor = self.getcursor() 2071 cursor.execute(sql) 2072 self.commit() 2073
2074 - def InitializeJobTable(self,maxjobs,dataset_id,priority=0,stepsize=1000, start_qid=0):
2075 """ 2076 Create job monitoring entries in database 2077 """ 2078 cursor = self.getcursor() 2079 2080 sql = " INSERT INTO job " 2081 sql += " (queue_id,status,dataset_id,priority,status_changed) VALUES " 2082 2083 qstart = start_qid 2084 qend = start_qid + maxjobs 2085 for i in range(qstart,qend,min(stepsize,maxjobs)): 2086 comma = "" 2087 sql1 = sql 2088 for job in range(i,min(i+stepsize,qend)): 2089 sql1 += comma + " (%d,'WAITING',%d,%d,NOW()) " % ( job, dataset_id, priority ) 2090 comma = "," 2091 cursor.execute(sql1) 2092 self.commit() 2093 2094
2095 - def validate(self,dataset_id,status='TRUE'):
2096 """ 2097 Mark dataset as visible and valid. 2098 """ 2099 cursor = self.getcursor() 2100 sql = " UPDATE dataset SET verified = '%s' " % status 2101 sql += " WHERE dataset_id = %d " % dataset_id 2102 self.logger.debug(sql) 2103 cursor.execute(sql) 2104 self.commit()
2105 2106
2107 - def download_metadata(self,dataset_id):
2108 2109 cursor = self.getcursor() 2110 dp = DIF_Plus() 2111 2112 sql = " SELECT * FROM dif " 2113 sql += " WHERE dataset_id = %d " % dataset_id 2114 cursor.execute(sql) 2115 row = cursor.fetchone(); 2116 if not row: return dp 2117 dif = dp.GetDIF() 2118 dif.SetParameters(row['parameters']) 2119 dif.SetEntryTitle(row['entry_title']) 2120 dif.SetSummary(row['summary']) 2121 dif.SetSourceName(row['source_name']) 2122 dif.SetSensorName(row['sensorname']) 2123 td = time.strptime(str(row['dif_creation_date']),"%Y-%m-%d %H:%M:%S") 2124 td = time.strftime("%Y-%m-%d",td) 2125 dif.SetDIFCreationDate(td) 2126 2127 sql = " SELECT * FROM plus " 2128 sql += " WHERE dataset_id = %d " % dataset_id 2129 cursor.execute(sql) 2130 row = cursor.fetchone(); 2131 plus = dp.GetPlus() 2132 ts = time.strptime(str(row['start_datetime']),"%Y-%m-%d %H:%M:%S") 2133 ts = time.strftime("%Y-%m-%dT%H:%M:%S",ts) 2134 plus.SetStartDatetime(ts) 2135 2136 te = time.strptime(str(row['end_datetime']),"%Y-%m-%d %H:%M:%S") 2137 te = time.strftime("%Y-%m-%dT%H:%M:%S",te) 2138 plus.SetEndDatetime(te) 2139 plus.SetCategory(row['category']) 2140 plus.SetSubCategory(row['subcategory']) 2141 plus.SetSimDBKey(dataset_id) 2142 plus.SetI3DBKey(row['i3db_key']) 2143 plus.SetSteeringFile(row['steering_file']) 2144 for project in self.fetch_project_list(dataset_id): 2145 plus.AddProject(project) 2146 2147 return dp 2148
2149 - def upload_metadata(self,dataset_id,difplus):
2150 2151 cursor = self.getcursor() 2152 2153 dif = difplus.GetDIF() 2154 sql = " INSERT INTO dif " 2155 sql += " (dataset_id,parameters,entry_title,summary, " 2156 sql += " source_name,sensorname,dif_creation_date) " 2157 sql += " VALUES ( %d, " % dataset_id 2158 sql += "'%s'," % str(dif.GetParameters()) 2159 sql += "'%s'," % str(dif.GetEntryTitle()) 2160 sql += "'%s'," % str(dif.GetSummary()) 2161 sql += "'%s'," % str(dif.GetSourceName()) 2162 sql += "'%s'," % str(dif.GetSensorName()) 2163 sql += "'%s')" % str(dif.GetDIFCreationDate()) 2164 self.logger.debug(sql) 2165 cursor.execute(sql) 2166 2167 plus = difplus.GetPlus() 2168 sql = " INSERT INTO plus " 2169 sql += " (dataset_id,start_datetime,end_datetime," 2170 sql += " category,subcategory,i3db_key,steering_file) " 2171 sql += " VALUES ( %d, " % dataset_id 2172 sql += "'%s'," % str(plus.GetStartDatetime()) 2173 sql += "'%s'," % str(plus.GetEndDatetime()) 2174 sql += "'%s'," % str(plus.GetCategory()) 2175 sql += "'%s'," % str(plus.GetSubCategory()) 2176 sql += "%s," % (plus.GetI3DBKey() or 'NULL') 2177 sql += "'%s')" % str(plus.GetSteeringFile()) 2178 self.logger.debug(sql) 2179 cursor.execute(sql) 2180 2181 self.commit() 2182
2183 - def set_metadata_subcat(self,dataset_id,sub_cat):
2184 """ 2185 Change Plus:subcategory in DIFPlus metadata 2186 """ 2187 cursor = self.getcursor() 2188 sql = " UPDATE plus " 2189 sql += " SET subcategory='%s' " % sub_cat 2190 sql += " WHERE dataset_id=%d " % dataset_id 2191 self.logger.debug(sql) 2192 cursor.execute(sql) 2193 self.commit() 2194 2195
2196 - def load_filelist(self,odict,dataset_id=0):
2197 """ 2198 load a list of files for filtering. 2199 @param dataset_id: the dataset id that dictonary is bound to 2200 @param odict: the dictionary to load 2201 """ 2202 cursor = self.getcursor() 2203 index = 0 2204 list = odict.keys()[ 2205 min(index,len(odict.keys())): 2206 min(index+100,len(odict.keys()))] 2207 while list: 2208 sql = " INSERT INTO file (file_key,path,subdir,filename,dataset_id) " 2209 sql += " VALUES " 2210 cm = '' 2211 for key in list: 2212 file = odict[key] 2213 sql += " %s ( %d, '%s', '%s', '%s', %d ) " % \ 2214 (cm,key,file[0],file[1],file[2],dataset_id) 2215 cm = ',' 2216 index = index + 100 2217 list = odict.keys()[ 2218 min(index,len(odict.keys())): 2219 min(index+100,len(odict.keys()))] 2220 self.logger.debug(sql) 2221 cursor.execute(sql) 2222 self.commit()
2223 2224 2225
2226 -class MonitorDB(IceProdDB):
2227 2228 logger = logging.getLogger('MonitorDB') 2229
2230 - def __init__(self):
2231 """ 2232 Constructor 2233 """ 2234 IceProdDB.__init__(self) 2235 self.maxsuspend = 20
2236
2237 - def new(self):
2238 """ 2239 Create a copy of this instance 2240 """ 2241 newconn = MonitorDB() 2242 newconn.host_ = self.host_ 2243 newconn.usr_ = self.usr_ 2244 newconn.passwd_ = self.passwd_ 2245 newconn.db_ = self.db_ 2246 newconn._connected = False 2247 return newconn
2248 2249
2250 - def reset_old_jobs(self, 2251 grid_id, 2252 maxidletime, 2253 maxruntime, 2254 maxsubmittime, 2255 maxcopytime, 2256 maxfailures=10, 2257 maxevicttime=10, 2258 keepalive=14400):
2259 """ 2260 reset status of jobs that where queued but who's status 2261 has not changed in more that maxtime minutes 2262 2263 @param grid_id: id of current cluster 2264 @param maxruntime: maximum run time for jobs 2265 @param maxsubmittime: maximum submit time for jobs 2266 @param maxcopytime: maximum time for jobs to be in 'copying' state 2267 @param maxfailures: maximum number of time a job is allowd to fail 2268 @param keepalive: how often should server expect to hear from jobs 2269 """ 2270 2271 totalrows = 0 2272 cursor = self.getcursor() 2273 passkey = self.mkkey(6,9) 2274 2275 # jobs in QUEUED state 2276 self.logger.debug("reseting stale queued jobs...") 2277 sql = " UPDATE job SET " 2278 sql += " errormessage=CONCAT( " 2279 sql += " '%s: max. allowed time reached for state ', " % os.uname()[1].split('.')[0] 2280 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), " 2281 sql += " prev_state = status, " 2282 sql += " status='RESET', " 2283 sql += " passkey='%s', " % passkey 2284 sql += " status_changed=NOW() " 2285 sql += " WHERE grid_id=%d " % grid_id 2286 sql += " AND status='QUEUED' " 2287 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % maxidletime 2288 sql += " LIMIT 20 " 2289 self.logger.debug(sql) 2290 cursor.execute(sql) 2291 rowcount = self._conn.affected_rows() 2292 totalrows += rowcount 2293 self.commit() 2294 self.logger.debug('Reset %d queued jobs' % rowcount) 2295 2296 # jobs in QUEUING state 2297 self.logger.debug("reseting stale queuing jobs...") 2298 sql = " UPDATE job SET " 2299 sql += " errormessage=CONCAT( " 2300 sql += " '%s: max. allowed time reached for state ', " % os.uname()[1].split('.')[0] 2301 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), " 2302 sql += " prev_state = status, " 2303 sql += " status='RESET', " 2304 sql += " passkey='%s', " % passkey 2305 sql += " status_changed=NOW() " 2306 sql += " WHERE grid_id=%d " % grid_id 2307 sql += " AND status='QUEUEING' " 2308 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % maxsubmittime 2309 sql += " LIMIT 20 " 2310 cursor.execute(sql) 2311 rowcount = self._conn.affected_rows() 2312 totalrows += rowcount 2313 self.commit() 2314 self.logger.debug('Reset %d queueing jobs' % rowcount) 2315 2316 # jobs in CLEANING state 2317 self.logger.debug("reseting stale cleaning jobs...") 2318 sql = " UPDATE job SET " 2319 sql += " errormessage=CONCAT( " 2320 sql += " '%s: max. allowed time reached for state ', " % os.uname()[1].split('.')[0] 2321 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), " 2322 sql += " status=prev_state, " 2323 sql += " prev_state = 'CLEANING', " 2324 sql += " passkey='%s', " % passkey 2325 sql += " status_changed=NOW() " 2326 sql += " WHERE grid_id=%d " % grid_id 2327 sql += " AND status='CLEANING' " 2328 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % maxsubmittime 2329 sql += " LIMIT 20 " 2330 cursor.execute(sql) 2331 rowcount = self._conn.affected_rows() 2332 totalrows += rowcount 2333 self.commit() 2334 self.logger.debug('Reset %d cleaning jobs' % rowcount) 2335 2336 # jobs in PROCESSING state 2337 self.logger.debug("reseting stale processing jobs...") 2338 sql = " UPDATE job SET " 2339 sql += " errormessage=CONCAT( " 2340 sql += " '%s: max. 
allowed time reached for state ', " % os.uname()[1].split('.')[0] 2341 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), " 2342 sql += " prev_state = status, " 2343 sql += " status='RESET', " 2344 sql += " passkey='%s', " % passkey 2345 sql += " status_changed=NOW() " 2346 sql += " WHERE grid_id=%d " % grid_id 2347 sql += " AND status='PROCESSING' " 2348 timeout = maxruntime 2349 if keepalive > 0: 2350 timeout = min(maxruntime,keepalive) 2351 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % timeout 2352 sql += " LIMIT 20 " 2353 cursor.execute(sql) 2354 rowcount = self._conn.affected_rows() 2355 totalrows += rowcount 2356 self.commit() 2357 self.logger.debug('Reset %d processing jobs' % rowcount) 2358 2359 # jobs in EVICTED state 2360 self.logger.debug("reseting evicted jobs...") 2361 sql = " UPDATE job SET " 2362 sql += " errormessage=CONCAT( " 2363 sql += " '%s: max. allowed time reached for state ', " % os.uname()[1].split('.')[0] 2364 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), " 2365 sql += " prev_state = status, " 2366 sql += " status='RESET', " 2367 sql += " passkey='%s', " % passkey 2368 sql += " status_changed=NOW() " 2369 sql += " WHERE grid_id=%d " % grid_id 2370 sql += " AND status='EVICTED' " 2371 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % maxevicttime 2372 sql += " LIMIT 20 " 2373 cursor.execute(sql) 2374 rowcount = self._conn.affected_rows() 2375 totalrows += rowcount 2376 self.commit() 2377 self.logger.debug('Reset %d evicted jobs' % rowcount) 2378 2379 # jobs in COPYING state 2380 self.logger.debug("reseting stale copying jobs...") 2381 sql = " UPDATE job SET " 2382 sql += " errormessage=CONCAT( " 2383 sql += " '%s: max. allowed time reached for state ', " % os.uname()[1].split('.')[0] 2384 sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), " 2385 sql += " prev_state = status, " 2386 sql += " status='RESET', " 2387 sql += " passkey='%s', " % passkey 2388 sql += " status_changed=NOW() " 2389 sql += " WHERE grid_id=%d " % grid_id 2390 sql += " AND status='COPYING' " 2391 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % maxcopytime 2392 sql += " LIMIT 100 " 2393 cursor.execute(sql) 2394 rowcount = self._conn.affected_rows() 2395 totalrows += rowcount 2396 self.commit() 2397 self.logger.debug('Reset %d copying jobs' % rowcount) 2398 self.logger.info('Reset %d jobs' % totalrows) 2399 2400 # suspend jobs with too many errors 2401 self.logger.debug("suspending jobs with too many errors...") 2402 sql = " UPDATE job SET " 2403 sql += " errormessage=CONCAT('too many errors.',errormessage), " 2404 sql += " prev_state = status, " 2405 sql += " status='FAILED', " 2406 sql += " passkey='%s', " % passkey 2407 sql += " status_changed=NOW() " 2408 sql += " WHERE grid_id=%d " % grid_id 2409 sql += " AND status != 'SUSPENDED' " 2410 sql += " AND status != 'FAILED' " 2411 sql += " AND status != 'OK' " 2412 sql += " AND job.failures > %d " % maxfailures 2413 sql += " LIMIT 2000 " 2414 cursor.execute(sql) 2415 rowcount = self._conn.affected_rows() 2416 self.commit() 2417 if rowcount > 0: 2418 self.logger.info('Suspended %d jobs with too many errors' % rowcount)
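
# Illustrative sketch (added for clarity; not part of the original module):
# reset_old_jobs repeats the same UPDATE once per state, varying only the
# state name and its timeout in minutes. The schedule it implements, as
# data (parameter names from the method above; limit values invented):

maxidletime = maxsubmittime = maxcopytime = 60
maxruntime, maxevicttime, keepalive = 2880, 10, 14400
timeouts = [
    ('QUEUED',     maxidletime),
    ('QUEUEING',   maxsubmittime),
    ('CLEANING',   maxsubmittime),
    ('PROCESSING', min(maxruntime, keepalive) if keepalive > 0 else maxruntime),
    ('EVICTED',    maxevicttime),
    ('COPYING',    maxcopytime),
]
# each pair becomes: UPDATE job ... WHERE status=<state>
#   AND NOW() > TIMESTAMPADD(MINUTE, <minutes>, status_changed)
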
2419
2420 - def UpdateStats(self):
2421 """ 2422 Get a list of new datasets. 2423 """ 2424 cursor = self.getcursor() 2425 sql = " SELECT SQL_NO_CACHE dataset_id, IFNULL(grid_id, 0) " 2426 sql += " AS grid_id, name, SUM(amount) AS amount " 2427 sql += " FROM dataset_statistics_log " 2428 sql += " GROUP BY dataset_id, grid_id, name " 2429 cursor.execute(sql) 2430 sets = cursor.fetchall(); 2431 2432 metaprojects = {}; 2433 simcats = {}; 2434 for ref in sets: 2435 dataset_id = ref['dataset_id']; 2436 grid_id = ref['grid_id']; 2437 name = ref['name']; 2438 amount = ref['amount']; 2439 2440 if not metaprojects.has_key(dataset_id): 2441 metaprojects[dataset_id] = []; 2442 cursor.execute( 2443 " SELECT DISTINCT metaproject_id" 2444 + " FROM metaproject_pivot" 2445 + " WHERE dataset_id = %u " % dataset_id 2446 + " LIMIT 1"); 2447 mp_sth = cursor.fetchall(); 2448 for mp_ref in mp_sth: 2449 metaprojects[dataset_id].append(mp_ref['metaproject_id']); 2450 2451 if not simcats.has_key(dataset_id): 2452 simcats[dataset_id] = []; 2453 cursor.execute( 2454 " SELECT simcat_id" 2455 + " FROM dataset" 2456 + " WHERE dataset_id = %u " % dataset_id); 2457 sc_sth = cursor.fetchall(); 2458 for sc_ref in sc_sth: 2459 simcats[dataset_id].append(sc_ref['simcat_id']); 2460 2461 if '0.00' != '%.2f' % amount: 2462 for mp_id in metaprojects[dataset_id]: 2463 for sc_id in simcats[dataset_id]: 2464 sql = " INSERT INTO dataset_statistics_mv" 2465 sql += " (dataset_id,metaproject_id,simcat_id,grid_id,name,value)" 2466 sql += " VALUES (%s,%s,%s,%s,%s,%s)" 2467 sql += " ON DUPLICATE KEY UPDATE value=value+%s"; 2468 cursor.execute(sql,[dataset_id,mp_id,sc_id,grid_id,name,amount,amount]); 2469 2470 cursor.execute("DELETE FROM dataset_statistics_log"); 2471 self.commit()
2472 2473 2474
2475 - def getNewSets(self,grid=None,dataset=0):
2476 """ 2477 Get a list of new datasets. 2478 """ 2479 cursor = self.getcursor() 2480 sql = " SELECT dataset.dataset_id,dataset.jobs_submitted, " 2481 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source, " 2482 sql += " dif.sensorname as sensor, " 2483 sql += " YEAR(plus.start_datetime) as year, " 2484 sql += " plus.category, plus.subcategory,dataset.hist " 2485 sql += " FROM dataset,plus,dif " 2486 if grid: 2487 sql += ",grid_statistics " 2488 sql += " WHERE dataset.dataset_id = plus.dataset_id " 2489 sql += " AND dataset.dataset_id=dif.dataset_id " 2490 if grid: 2491 sql += " AND grid_statistics.dataset_id = dataset.dataset_id " 2492 sql += " AND grid_statistics.grid_id = %d " % grid 2493 if dataset > 0: 2494 sql += " AND dataset.dataset_id = %d " % dataset 2495 else: 2496 sql += " AND dataset.status='PROCESSING' " 2497 self.logger.debug(sql) 2498 cursor.execute(sql) 2499 sets = cursor.fetchall(); 2500 self.commit() 2501 for set in sets: 2502 set['sensor'] = set['sensor'].replace('ICECUBE','IceCube').replace('IceTop','IceTop') 2503 return sets
2504
2505 - def getNewFinishedSets(self,dataset=0):
2506 """ 2507 Get a list of finished dataset for which no histos have been created. 2508 """ 2509 cursor = self.getcursor() 2510 sql = " SELECT dataset.dataset_id,dataset.jobs_submitted, " 2511 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source, " 2512 sql += " dif.sensorname as sensor, " 2513 sql += " YEAR(plus.start_datetime) as year, " 2514 sql += " plus.category, plus.subcategory " 2515 sql += " FROM dataset,plus,dif " 2516 sql += " WHERE dataset.dataset_id = plus.dataset_id " 2517 sql += " AND dataset.dataset_id=dif.dataset_id " 2518 sql += " AND dataset.status='COMPLETE' " 2519 if dataset: 2520 sql += " AND dataset.dataset_id = %d " % dataset 2521 else: 2522 sql += " AND dataset.hist=1 " 2523 self.logger.debug(sql) 2524 cursor.execute(sql) 2525 sets = cursor.fetchall(); 2526 self.commit() 2527 for set in sets: 2528 set['sensor'] = set['sensor'].replace('ICECUBE','IceCube').replace('IceTop','IceTop') 2529 return sets
2530
2531 - def getFinishedSets(self,datasetlist=[]):
2532 """ 2533 Get a list of finished dataset for which no histos have been created. 2534 """ 2535 sets = [] 2536 if datasetlist: 2537 cursor = self.getcursor() 2538 sql = " SELECT dataset.* FROM dataset " 2539 sql += " WHERE dataset.dataset_id " 2540 sql += " IN (%s) " % ",".join(map(str,datasetlist)) 2541 sql += " AND dataset.status != 'PROCESSING' " 2542 self.logger.debug(sql) 2543 cursor.execute(sql) 2544 sets = cursor.fetchall(); 2545 self.commit() 2546 return sets
2547
2548 - def getSetsInfo(self,dataset_id):
2549 """ 2550 Get a list of new datasets. 2551 """ 2552 cursor = self.getcursor() 2553 sql = " SELECT dataset.dataset_id,dataset.jobs_submitted, " 2554 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source, " 2555 sql += " dif.sensorname as sensor, " 2556 sql += " YEAR(plus.start_datetime) as year, " 2557 sql += " plus.category, plus.subcategory " 2558 sql += " FROM dataset,plus,dif " 2559 sql += " WHERE dataset.dataset_id = plus.dataset_id " 2560 sql += " AND dataset.dataset_id=dif.dataset_id " 2561 sql += " AND dataset.status='COMPLETE' AND dataset.hist=0 " 2562 sql += " AND dataset.dataset_id = %d " % dataset_id 2563 self.logger.debug(sql) 2564 cursor.execute(sql) 2565 set = cursor.fetchone(); 2566 set['sensor'] = set['sensor'].replace('ICECUBE','IceCube').replace('IceTop','IceTop') 2567 self.commit() 2568 return set
2569
2570 - def AddedHisto(self,dataset):
2571 cursor = self.getcursor() 2572 sql = " UPDATE dataset " 2573 sql += " SET dataset.hist=0 " 2574 sql += " WHERE dataset.dataset_id = %d " % dataset 2575 sql += " AND dataset.hist=1 " 2576 self.logger.debug(sql) 2577 cursor.execute(sql) 2578 self.commit() 2579 return
2580
2581 - def update_monitoring(self,grid_id,dataset_id=0):
2582 """ 2583 Update statistics for datasets and return all dataset which 2584 have completed 2585 """ 2586 2587 finished_sets = [] 2588 cursor = self.getcursor() 2589 2590 self.logger.debug("updating monitoring tables") 2591 sql = " SELECT job.dataset_id, " 2592 sql += " job.grid_id, " 2593 sql += " SUM(job.time_real) AS real_time, " 2594 sql += " SUM(job.time_sys) AS sys_time, " 2595 sql += " SUM(job.time_user) AS user_time, " 2596 sql += " SUM(job.mem_heap) AS heap_mem, " 2597 sql += " SUM(job.mem_heap_peak) AS heap_mem_peak, " 2598 sql += " SUM(job.mem_stack_peak) AS stack_mem_peak, " 2599 sql += " SUM(job.nevents) AS events, " 2600 sql += " SUM(job.gevents) AS gevents, " 2601 sql += " SUM(job.evictions) AS sevictions " 2602 sql += " FROM job,dataset " 2603 sql += " WHERE job.status = 'OK' " 2604 sql += " AND dataset.dataset_id = job.dataset_id " 2605 if dataset_id: 2606 sql += " AND dataset.dataset_id = %d " % dataset_id 2607 sql += " GROUP BY job.grid_id " 2608 else: 2609 sql += " AND dataset.status = 'PROCESSING' " 2610 sql += " AND dataset.jobs_completed >= dataset.jobs_submitted " 2611 sql += " GROUP BY job.dataset_id, job.grid_id " 2612 cursor.execute(sql) 2613 result_set = cursor.fetchall(); 2614 self.commit() 2615 2616 #self.logger.debug('updating stats for %d datasets' % len(result_set)) 2617 #for entry in result_set: 2618 # sql = " UPDATE grid_statistics SET " 2619 # sql += " time_real = %g, " % entry['real_time'] 2620 # sql += " time_sys = %g, " % entry['sys_time'] 2621 # sql += " time_user = %g, " % entry['user_time'] 2622 # sql += " mem_heap = %g, " % entry['heap_mem'] 2623 # sql += " mem_heap_peak = %g, " % entry['heap_mem_peak'] 2624 # sql += " mem_stack_peak = %g, " % entry['stack_mem_peak'] 2625 # sql += " nevents = %d " % self.intcast(entry['events']) 2626 # sql += " WHERE dataset_id = %d " % entry['dataset_id'] 2627 # sql += " AND grid_id = %d " % entry['grid_id'] 2628 # cursor.execute(sql) 2629 # self.commit() 2630 2631 sql = " SELECT job.dataset_id, " 2632 sql += " MAX(job.status_changed) AS enddate, " 2633 sql += " SUM(1) AS jobs_submitted, " 2634 sql += " SUM(job.status='OK' ) AS jobs_completed, " 2635 sql += " SUM(job.status='FAILED') AS jobs_failed, " 2636 sql += " SUM(job.status='SUSPENDED') AS jobs_suspended, " 2637 sql += " SUM(job.time_real) AS real_time, " 2638 sql += " SUM(job.time_sys) AS sys_time, " 2639 sql += " SUM(job.time_user) AS user_time, " 2640 sql += " SUM(job.mem_heap) AS heap_mem, " 2641 sql += " SUM(job.mem_heap_peak) AS heap_mem_peak, " 2642 sql += " SUM(job.mem_stack_peak) AS stack_mem_peak, " 2643 sql += " SUM(job.nevents) AS events, " 2644 sql += " SUM(job.gevents) AS gevents, " 2645 sql += " SUM(job.evictions) AS sevictions, " 2646 sql += " grid_statistics.debug AS debug " 2647 sql += " FROM job, dataset, grid_statistics " 2648 sql += " WHERE dataset.dataset_id = job.dataset_id " 2649 sql += " AND dataset.dataset_id = grid_statistics.dataset_id " 2650 sql += " AND grid_statistics.grid_id = %d " % grid_id 2651 if dataset_id: 2652 sql += " AND dataset.dataset_id = %d " % dataset_id 2653 else: 2654 sql += " AND dataset.status = 'PROCESSING' " 2655 sql += " GROUP by job.dataset_id " 2656 cursor.execute(sql) 2657 result_set = cursor.fetchall(); 2658 self.commit() 2659 2660 for entry in result_set: 2661 try: 2662 entry['jobs_completed'] = self.intcast(entry['jobs_completed']) 2663 entry['jobs_failed'] = self.intcast(entry['jobs_failed']) 2664 except Exception,e: 2665 self.logger.error("Could not cast int(%s)" % entry['jobs_completed'] ) 2666 
entry['jobs_completed'] = 0 2667 continue; 2668 2669 if self.intcast(entry['jobs_completed']) == self.intcast(entry['jobs_submitted']): 2670 2671 finished_sets.append(entry) 2672 sql = " UPDATE dataset " 2673 sql += " SET dataset.jobs_completed = %d, " % entry['jobs_completed'] 2674 sql += " dataset.jobs_failed = %d, " % entry['jobs_failed'] 2675 sql += " dataset.status = 'READYTOPUBLISH', " 2676 sql += " dataset.enddate = '%s', " % entry['enddate'] 2677 sql += " time_real = %g, " % entry['real_time'] 2678 sql += " time_sys = %g, " % entry['sys_time'] 2679 sql += " time_user = %g, " % entry['user_time'] 2680 sql += " mem_heap = %g, " % entry['heap_mem'] 2681 sql += " mem_heap_peak = %g, " % entry['heap_mem_peak'] 2682 sql += " mem_stack_peak = %g, " % entry['stack_mem_peak'] 2683 sql += " events = %d " % self.intcast(entry['events']) 2684 sql += " WHERE dataset.dataset_id = %d " % entry['dataset_id'] 2685 sql += " AND dataset.status = 'PROCESSING' " 2686 cursor.execute(sql) 2687 elif (self.intcast(entry['jobs_completed']) + \ 2688 self.intcast(entry['jobs_failed'])) == \ 2689 self.intcast(entry['jobs_submitted']): 2690 finished_sets.append(entry) 2691 sql = " UPDATE dataset " 2692 sql += " SET dataset.jobs_completed = %d, " % entry['jobs_completed'] 2693 sql += " dataset.jobs_failed = %d, " % entry['jobs_failed'] 2694 sql += " dataset.status = 'ERRORS', " 2695 sql += " dataset.enddate = '%s', " % entry['enddate'] 2696 sql += " time_real = %g, " % entry['real_time'] 2697 sql += " time_sys = %g, " % entry['sys_time'] 2698 sql += " time_user = %g, " % entry['user_time'] 2699 sql += " mem_heap = %g, " % entry['heap_mem'] 2700 sql += " mem_heap_peak = %g, " % entry['heap_mem_peak'] 2701 sql += " mem_stack_peak = %g, " % entry['stack_mem_peak'] 2702 sql += " events = %d " % self.intcast(entry['events']) 2703 sql += " WHERE dataset.dataset_id = %d " % entry['dataset_id'] 2704 sql += " AND dataset.status = 'PROCESSING' " 2705 cursor.execute(sql) 2706 2707 if self.intcast(entry['jobs_suspended']) + self.intcast(entry['jobs_failed']) > self.maxsuspend: 2708 sql = " UPDATE grid_statistics " 2709 sql += " SET suspend = 1 " 2710 sql += " WHERE suspend = 0 " 2711 sql += " AND dataset_id = %d " % entry['dataset_id'] 2712 sql += " AND grid_id = %d " % grid_id 2713 sql += " AND debug = 1" 2714 cursor.execute(sql) 2715 2716 self.commit() 2717 return finished_sets
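
# Illustrative sketch (added for clarity; not part of the original module):
# the two branches above reduce to a single decision on the per-dataset
# job counts:

def _dataset_final_status(completed, failed, submitted):
    if completed == submitted:
        return 'READYTOPUBLISH'       # every job ended OK
    if completed + failed == submitted:
        return 'ERRORS'               # done, but some jobs failed for good
    return None                       # jobs still outstanding
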
2718 2719
2720 - def GetGridId(self,grid_name,institution=None,batchsys=None,url=None):
2721 """ 2722 Retrieve the key for grid_name 2723 """ 2724 ver = iceprod.__version__ 2725 if not self.isconnected(): self.connect() 2726 cursor = self.getcursor() 2727 sql = " SELECT grid_id FROM grid WHERE name='%s' " % grid_name 2728 cursor.execute(sql) 2729 result = cursor.fetchone() 2730 if result: 2731 grid_id = result['grid_id'] 2732 else: 2733 sql = " INSERT IGNORE INTO grid (name,institution,batchsys,version) " 2734 sql += " VALUES ('%s','%s','%s','%s') " % (grid_name,institution,batchsys,ver) 2735 cursor.execute(sql) 2736 grid_id = self.insert_id() 2737 2738 if institution and batchsys: 2739 sql = " UPDATE grid SET " 2740 sql += " institution = '%s', " % institution 2741 sql += " batchsys = '%s', " % batchsys 2742 sql += " version = '%s' " % ver 2743 sql += " WHERE grid_id=%d " % grid_id 2744 cursor.execute(sql) 2745 self.commit() 2746 2747 return grid_id
2748
2749 - def RegisterServer(self, 2750 grid_id, 2751 server_name, 2752 server_status, 2753 server_pid):
2754 """ 2755 Retrieve the key for grid_name 2756 """ 2757 if not self.isconnected(): self.connect() 2758 cursor = self.getcursor() 2759 2760 sql = " UPDATE grid SET " 2761 sql += " %s='%s', " % (server_name,server_status) 2762 sql += " %s_pid=%d " % (server_name,server_pid) 2763 sql += " WHERE grid_id=%d " % grid_id 2764 cursor.execute(sql) 2765 self.commit()
2766
2767 - def GetGridStatusChanges(self, grid_id):
2768 """ 2769 Get status changes for daemons 2770 """ 2771 if not self.isconnected(): self.connect() 2772 cursor = self.getcursor() 2773 2774 sql = " SELECT * FROM grid " 2775 sql += " WHERE grid_id=%d " % grid_id 2776 cursor.execute(sql) 2777 result = cursor.fetchone() 2778 2779 sql = " UPDATE grid SET lastupdate=NOW() " 2780 sql += " WHERE grid_id=%d " % grid_id 2781 cursor.execute(sql) 2782 2783 self.commit() 2784 return result
2785
2786 - def GridRequestSuspend(self, grid,daemon):
2787 """ 2788 Change status of daemons 2789 """ 2790 cursor = self.getcursor() 2791 2792 sql = " UPDATE grid SET " 2793 sql += " %s = 'STOPREQUEST' " % daemon 2794 sql += " WHERE %s ='RUNNING' " % daemon 2795 if type(grid) is types.IntType: 2796 sql += " AND grid_id=%u " % grid 2797 elif grid not in ('any','*','all'): 2798 sql += " AND name='%s' " % grid 2799 cursor.execute(sql) 2800 self.commit()
2801
2802 - def GridRequestResume(self, grid,daemon):
2803 """ 2804 Change status of daemons 2805 """ 2806 cursor = self.getcursor() 2807 2808 sql = " UPDATE grid SET " 2809 sql += " %s = 'STARTREQUEST' " % daemon 2810 sql += " WHERE %s = 'STOPPED' " % daemon 2811 if type(grid) is types.IntType: 2812 sql += " AND grid_id=%u " % grid 2813 else: 2814 sql += " AND name='%s' " % grid 2815 cursor.execute(sql) 2816 self.commit()
2817
2818 - def GetDatasetParams(self,dataset):
2819 """ 2820 Get parameters for given dataset 2821 """ 2822 cursor = self.getcursor() 2823 sql = " SELECT plus.category, plus.subcategory, " 2824 sql += " YEAR(plus.start_datetime) as year," 2825 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source " 2826 sql += " FROM plus,dif " 2827 sql += " WHERE plus.dataset_id = %d " % dataset 2828 sql += " AND plus.dataset_id = dif.dataset_id " 2829 self.logger.debug(sql) 2830 cursor.execute(sql) 2831 return cursor.fetchall()
2832
2833 - def GetFinishedJobs(self,grid_id,max_copy=20,delay=5):
2834 """ 2835 Fetch list of jobs that have completed for given grid_id 2836 """ 2837 # Get all jobs assigned to grid which need to be reset. 2838 cursor = self.getcursor() 2839 passkey = self.mkkey(6,9) 2840 job_list = [] 2841 2842 sql = " SELECT SUM(1) copying FROM `job` " 2843 sql += " WHERE " 2844 sql += " (status='COPYING' OR status='COPIED') " 2845 sql += " AND grid_id = %d " % grid_id 2846 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay 2847 self.logger.debug(sql) 2848 cursor.execute(sql) 2849 currently_copying = self.intcast(cursor.fetchone()['copying']) 2850 if not currently_copying: currently_copying = 0 2851 2852 sql = " UPDATE job SET " 2853 sql += " passkey='%s', " % passkey 2854 sql += " status_changed=NOW() " 2855 sql += " WHERE status='COPIED' " 2856 sql += " AND grid_id = %d " % grid_id 2857 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay 2858 sql += " LIMIT %d " % max_copy 2859 self.logger.debug(sql) 2860 cursor.execute(sql) 2861 self.commit() 2862 2863 sql = " UPDATE job SET " 2864 sql += " prev_state = status, " 2865 sql += " status = 'COPYING', " 2866 sql += " passkey='%s', " % passkey 2867 sql += " status_changed=NOW() " 2868 sql += " WHERE status='READYTOCOPY' " 2869 sql += " AND grid_id = %d " % grid_id 2870 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay 2871 sql += " LIMIT %d " % max(1,max_copy - currently_copying) 2872 self.logger.debug(sql) 2873 cursor.execute(sql) 2874 self.commit() 2875 2876 sql = " SELECT job.*, plus.category, plus.subcategory, " 2877 sql += " YEAR(plus.start_datetime) as year," 2878 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source " 2879 sql += " FROM job,plus,dif " 2880 sql += " WHERE grid_id = %d " % grid_id 2881 sql += " AND job.dataset_id = plus.dataset_id " 2882 sql += " AND plus.dataset_id = dif.dataset_id " 2883 sql += " AND (job.status='COPYING' OR job.status='COPIED') " 2884 sql += " AND job.passkey='%s' " % passkey 2885 self.logger.debug(sql) 2886 cursor.execute(sql) 2887 result_set = cursor.fetchall() 2888 self.commit() 2889 2890 return result_set
2891
2892 - def GetResetJobs(self,grid_id,max_reset=50):
2893 """ 2894 Fetch list of jobs that have completed for given grid_id 2895 """ 2896 # Get all jobs assigned to grid which need to be reset. 2897 cursor = self.getcursor() 2898 passkey = self.mkkey(6,9) 2899 2900 # suspend error jobs if dataset is in debug mode 2901 sql = " UPDATE job,dataset SET " 2902 sql += " job.prev_state = job.status, " 2903 sql += " job.status='SUSPENDED', " 2904 sql += " job.status_changed=NOW() " 2905 sql += " WHERE job.status='ERROR' " 2906 sql += " AND job.grid_id = %d " % grid_id 2907 sql += " AND job.dataset_id = dataset.dataset_id " 2908 sql += " AND dataset.debug = 1 " 2909 cursor.execute(sql) 2910 self.commit() 2911 2912 # put ERROR jobs with no grid assignment back in queue 2913 sql = " UPDATE job SET " 2914 sql += " prev_state = status, " 2915 sql += " status='WAITING', " 2916 sql += " status_changed=NOW() " 2917 sql += " WHERE status IN ('ERROR','RESET') " 2918 sql += " AND (grid_id =0 OR grid_id IS NULL) " 2919 cursor.execute(sql) 2920 self.commit() 2921 2922 # set passkey for jobs in RESET state 2923 sql = " UPDATE job SET " 2924 sql += " prev_state = status, " 2925 sql += " passkey='%s' " % passkey 2926 sql += " WHERE status='RESET' " 2927 sql += " AND grid_id = %d " % grid_id 2928 sql += " LIMIT %d " % max_reset 2929 cursor.execute(sql) 2930 self.commit() 2931 2932 # SELECT jobs to be cleaned up 2933 sql = " SELECT * FROM job " 2934 sql += " WHERE grid_id = %d " % grid_id 2935 sql += " AND passkey='%s' " % passkey 2936 sql += " AND status='RESET' " 2937 sql += " LIMIT %d " % max_reset 2938 cursor.execute(sql) 2939 result_set = cursor.fetchall() 2940 self.commit() 2941 2942 # and set their status to CLEANING 2943 sql = " UPDATE job SET " 2944 sql += " prev_state = status, " 2945 sql += " status = 'CLEANING', " 2946 sql += " passkey='%s', " % passkey 2947 sql += " status_changed=NOW() " 2948 sql += " WHERE status='RESET' " 2949 sql += " AND grid_id = %d " % grid_id 2950 sql += " AND NOW() > TIMESTAMPADD(MINUTE,5,status_changed) " 2951 sql += " ORDER BY priority DESC, job_id " 2952 sql += " LIMIT %d " % max_reset 2953 cursor.execute(sql) 2954 self.commit() 2955 2956 # reset jobs with given grid_id 2957 sql = " UPDATE job SET " 2958 sql += " job.prev_state = job.status, " 2959 sql += " job.status='RESET', " 2960 sql += " job.status_changed=NOW() " 2961 sql += " WHERE job.status='ERROR' " 2962 sql += " AND job.grid_id = %d " % grid_id 2963 sql += " ORDER BY job.priority DESC, job.job_id " 2964 sql += " LIMIT %d " % max_reset 2965 cursor.execute(sql) 2966 self.commit() 2967 2968 return result_set
2969
2970 - def GetActiveJobs(self,grid_id):
2971 """ 2972 Get list of jobs currently in any active state 2973 """ 2974 from iceprod.server.job import i3Job 2975 cursor = self.getcursor() 2976 job_list = [] 2977 sql = " SELECT * FROM job " 2978 sql += " WHERE job.grid_id = %d " % grid_id 2979 sql += " AND job.status NOT IN " 2980 sql += "('WAITING','OK','SUSPENDED','FAILED')" 2981 cursor.execute(sql) 2982 for j in cursor.fetchall(): 2983 job = i3Job() 2984 job.SetDatasetId(j['dataset_id']) 2985 job.SetProcNum(j['queue_id']) 2986 job.SetPrio(j['priority']) 2987 job.SetJobId(j['grid_queue_id']) 2988 job.AddArgOption("key",j['passkey']) 2989 job_list.append(job) 2990 self.commit() 2991 return job_list
2992
2993 - def GetQueuedJobs(self,grid_id,delay=5):
2994 """ 2995 Get list of jobs currently in queued status 2996 """ 2997 from iceprod.server.job import i3Job 2998 cursor = self.getcursor() 2999 job_list = [] 3000 sql = " SELECT * FROM job " 3001 sql += " WHERE job.grid_id = %d " % grid_id 3002 sql += " AND job.status = 'QUEUED' " 3003 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay 3004 cursor.execute(sql) 3005 for j in cursor.fetchall(): 3006 job = i3Job() 3007 job.SetDatabaseId(j['job_id']) 3008 job.SetDatasetId(j['dataset_id']) 3009 job.SetProcNum(j['queue_id']) 3010 job.SetPrio(j['priority']) 3011 job.SetJobId(j['grid_queue_id']) 3012 job.AddArgOption("key",j['passkey']) 3013 job_list.append(job) 3014 self.commit() 3015 return job_list
3016
3017 - def GetProcessingJobs(self,grid_id,delay=5):
3018 """ 3019 Get list of jobs currently in queued status 3020 """ 3021 from iceprod.server.job import i3Job 3022 cursor = self.getcursor() 3023 job_list = [] 3024 sql = " SELECT * FROM job " 3025 sql += " WHERE job.grid_id = %d " % grid_id 3026 sql += " AND job.status = 'PROCESSING' " 3027 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay 3028 cursor.execute(sql) 3029 for j in cursor.fetchall(): 3030 job = i3Job() 3031 job.SetDatasetId(j['dataset_id']) 3032 job.SetProcNum(j['queue_id']) 3033 job.SetPrio(j['priority']) 3034 job.SetJobId(j['grid_queue_id']) 3035 job.AddArgOption("key",j['passkey']) 3036 job_list.append(job) 3037 self.commit() 3038 return job_list
3039 3040
3041 - def QueueJobs(self,maxjobs,grid_id,jobs_at_once=20,fifo=True,debug=0):
3042 """ 3043 Reserve at most 'maxjobs' from a given dataset. 3044 Get proc ids and set their status to 'QUEUEING' 3045 """ 3046 from iceprod.server.job import i3Job 3047 cursor = self.getcursor() 3048 job_list = {} 3049 3050 # Count the number of jobs currently enqueued or running 3051 sql = " SELECT SUM(1) AS total " 3052 sql += " FROM job " 3053 sql += " WHERE job.grid_id = %d " % grid_id 3054 sql += " AND NOT " 3055 sql += " (status='WAITING' OR status='OK' OR status='SUSPENDED' " 3056 sql += " OR status='RESET' OR status='FAILED' ) " 3057 cursor.execute(sql) 3058 result = cursor.fetchone() 3059 self.commit() 3060 if result['total'] == None: 3061 self.logger.debug('queue total returned None') 3062 total = 0 3063 else: 3064 total = int(result['total']) 3065 3066 maxjobs = maxjobs - min(maxjobs,total) 3067 self.logger.info('%d jobs are currently in the queue' % total) 3068 3069 # Get next waiting jobs in queue 3070 sql = " SELECT job.*, dataset.temporary_storage, " 3071 sql += " plus.category, plus.subcategory, " 3072 sql += " YEAR(plus.start_datetime) as year, " 3073 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source " 3074 sql += " FROM job,grid_statistics,dif,plus, dataset " 3075 sql += " WHERE job.dataset_id = grid_statistics.dataset_id " 3076 sql += " AND job.dataset_id = plus.dataset_id " 3077 sql += " AND plus.dataset_id = dif.dataset_id " 3078 sql += " AND job.dataset_id = dataset.dataset_id " 3079 sql += " AND grid_statistics.grid_id = %d " % grid_id 3080 sql += " AND grid_statistics.suspend != 1 " 3081 3082 if debug: # run in debug mode only submitting one job/per dataset 3083 sql += " AND grid_statistics.debug = 1 " 3084 sql += " AND job.queue_id = 0 " 3085 else: # datasets with smaller numbers should run first 3086 sql += " AND grid_statistics.debug != 1 " 3087 3088 sql += " AND job.status='WAITING' " 3089 sql += " ORDER BY priority DESC," 3090 if fifo: # datasets with smaller numbers should run first 3091 sql += " dataset_id," 3092 sql += " queue_id " 3093 sql += " LIMIT %u " % min(jobs_at_once,maxjobs) 3094 3095 self.logger.debug(sql) 3096 cursor.execute(sql) 3097 result_set = cursor.fetchall() 3098 self.commit() 3099 3100 if not len(result_set) > 0: 3101 self.logger.info("no jobs to queue at this time") 3102 return {} 3103 else: 3104 self.logger.info("reserved %d jobs" % len(result_set)) 3105 3106 for item in result_set: 3107 proc = item['queue_id'] 3108 target_url = item['temporary_storage'] 3109 item['subdirectory'] = "%05d-%05d" % ((proc/1000)*1000,((proc+1000)/1000)*1000-1) 3110 # Don't want to keep looping if we have exceeded maxjobs 3111 if len(job_list) >= maxjobs: 3112 return job_list 3113 3114 # Set job status to 'QUEUED' 3115 passkey = self.mkkey(6,9) 3116 queue_id = item['queue_id'] 3117 dataset_id = item['dataset_id'] 3118 priority = item['priority'] 3119 sql = " UPDATE job SET " 3120 sql += " job.prev_state = job.status, " 3121 sql += " job.status='QUEUEING', " 3122 sql += " job.host=NULL, " 3123 sql += " job.grid_id=%d, " % grid_id 3124 sql += " status_changed=NOW()," 3125 sql += " passkey='%s' " % passkey 3126 sql += " WHERE job.status='WAITING' " 3127 sql += " AND job.queue_id=%d " % queue_id 3128 sql += " AND job.dataset_id=%d " % dataset_id 3129 try: 3130 cursor.execute(sql) 3131 rowcount = self._conn.affected_rows() 3132 self.commit() 3133 except Exception, e: 3134 self.logger.debug(e) 3135 continue 3136 if rowcount == 1: 3137 self.logger.debug("%d job has been marked as 'QUEUEING' " % rowcount) 3138 job = i3Job() 3139 
job.SetDatabaseId(item['job_id']) 3140 job.SetDatasetId(dataset_id) 3141 job.SetProcNum(queue_id) 3142 job.SetPrio(priority) 3143 job.AddArgOption("key",passkey) 3144 if not job_list.has_key(dataset_id): 3145 job_list[dataset_id] = [] 3146 job_list[dataset_id].append(job) 3147 elif rowcount == 0: 3148 self.logger.warn("someone beat me to job %d in dataset %d" % \ 3149 (queue_id,dataset_id)) 3150 else: 3151 raise Exception, "getjob:wrong number of rows affected" 3152 3153 return job_list
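
# Illustrative sketch (added for clarity; not part of the original module):
# QueueJobs buckets jobs into thousand-wide output subdirectories. The
# arithmetic (Python 2 integer division, as in the code above), with a
# worked example:

proc = 1234
subdir = "%05d-%05d" % ((proc / 1000) * 1000, ((proc + 1000) / 1000) * 1000 - 1)
# proc 1234 -> '01000-01999'; proc 999 -> '00000-00999'
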
3154 3155
3156 - def InitializeGridStats(self,grids,dataset_id):
3157 """ 3158 Insert grid_statistics entries for grids which should run this 3159 dataset. 3160 @param grids: list of grids or clusters 3161 @param dataset_id: dataset id 3162 """ 3163 3164 delim = "" 3165 sql = " SELECT grid_id FROM grid WHERE " 3166 for grid in grids: 3167 sql += "%s name = '%s' " % (delim,grid) 3168 delim = "OR" 3169 cursor = self.getcursor() 3170 cursor.execute(sql) 3171 result_set = cursor.fetchall() 3172 self.commit() 3173 3174 if len(result_set) == 0: 3175 self.logger.error("could not match grid name '%s'" % ":".join(grids)) 3176 return 3177 3178 delim = "" 3179 sql = " INSERT INTO grid_statistics " 3180 sql += " (grid_id,dataset_id) VALUES " 3181 for item in result_set: 3182 sql += " %s (%d,%d) " % ( delim, item['grid_id'], dataset_id) 3183 delim = "," 3184 cursor = self.getcursor() 3185 cursor.execute(sql) 3186 self.commit()
3187
3188 - def InitializeJobTable(self,maxjobs,dataset_id,priority=0,stepsize=1000):
3189 """ 3190 Create job monitoring entries in database 3191 """ 3192 cursor = self.getcursor() 3193 3194 sql = " INSERT INTO job " 3195 sql += " (queue_id,status,dataset_id,priority,status_changed) VALUES " 3196 3197 for i in range(0,maxjobs,min(stepsize,maxjobs)): 3198 comma = "" 3199 sql1 = sql 3200 jlist = range(i,i+min(stepsize,maxjobs-i)) 3201 for job in jlist: 3202 sql1 += comma + " (%d,'WAITING',%d,%d,NOW()) " % ( job, dataset_id, priority ) 3203 comma = "," 3204 if jlist: 3205 cursor.execute(sql1) 3206 self.commit() 3207
3208 - def jobstart(self,hostname,grid_id,dataset_id=0,queue_id=0,key=None):
3209 """ 3210 Change the status of a job to indicate it is currently running 3211 @param hostname: host where job was queued from 3212 @param grid_id: ID of iceprod queue 3213 @param dataset_id: Optional dataset ID 3214 @param queue_id: Optional job ID (within dataset) 3215 @param key: temporary passkey to avoid job spoofs 3216 @return: dataset_id,nproc,procnum 3217 """ 3218 3219 cursor = self.getcursor() 3220 3221 sql = " SELECT jobs_submitted " 3222 sql += " FROM dataset " 3223 sql += " WHERE dataset_id = %d " % dataset_id 3224 cursor.execute(sql) 3225 item = cursor.fetchone() 3226 self.commit() 3227 jobs_submitted = item['jobs_submitted'] 3228 # Set job status to 'PROCESSING' 3229 sql = " UPDATE job SET " 3230 sql += " job.prev_state = job.status, " 3231 sql += " job.status='PROCESSING', " 3232 sql += " job.grid_id=%d, " % grid_id 3233 sql += " job.host='%s', " % hostname 3234 sql += " job.tray=0, " 3235 sql += " job.iter=0, " 3236 sql += " status_changed=NOW(), " 3237 sql += " keepalive=NOW() " 3238 sql += " WHERE " 3239 sql += " (job.status='QUEUED' OR job.status='PROCESSING' OR job.status='EVICTED')" 3240 sql += " AND job.queue_id=%d " % queue_id 3241 sql += " AND job.dataset_id=%d " % dataset_id 3242 sql += " AND job.passkey='%s' " % key 3243 rowcount = self.execute(cursor,sql) 3244 if rowcount > 0: 3245 return dataset_id,jobs_submitted,queue_id 3246 return -1,0,0
3247
3248 - def jobreset(self,dataset_id,job_id,reason=None,passkey=None):
3249 """ 3250 Update status for job 3251 @param dataset_id: dataset index 3252 @param job_id: process number within dataset 3253 """ 3254 cursor = self.getcursor() 3255 sql = " SELECT grid_queue_id FROM job " 3256 sql += " WHERE dataset_id=%d " % dataset_id 3257 sql += " AND queue_id=%d " % job_id 3258 if passkey: 3259 sql += " AND passkey='%s' " % passkey 3260 cursor.execute(sql) 3261 qid = cursor.fetchone() 3262 3263 sql = " UPDATE job SET " 3264 sql += " tray=0, " 3265 sql += " iter=0, " 3266 sql += " prev_state = status, " 3267 if reason: 3268 sql += " errormessage = '%s', " % reason 3269 sql += " status='RESET', " 3270 sql += " status_changed=NOW() " 3271 sql += " WHERE dataset_id=%d " % dataset_id 3272 sql += " AND queue_id=%d " % job_id 3273 if passkey: 3274 sql += " AND passkey='%s' " % passkey 3275 self.execute(cursor,sql) 3276 3277 return qid['grid_queue_id']
3278
3279 - def jobcopying(self,dataset_id,job_id,passkey=None):
3280 """ 3281 Update status for job 3282 @param dataset_id: dataset index 3283 @param job_id: process number within dataset 3284 """ 3285 cursor = self.getcursor() 3286 sql = " UPDATE job SET " 3287 sql += " prev_state = status, " 3288 sql += " status='COPYING', " 3289 sql += " status_changed=NOW() " 3290 sql += " WHERE dataset_id=%d " % dataset_id 3291 sql += " AND queue_id=%d " % job_id 3292 if passkey: 3293 sql += " AND passkey='%s' " % passkey 3294 self.execute(cursor,sql) 3295 return 1
3296
3297 - def jobfinalize(self,dataset_id,job_id,passkey,status='OK',clear_errors=True):
3298 """ 3299 Update status for job 3300 @param dataset_id: dataset index 3301 @param job_id: process number within dataset 3302 """ 3303 cursor = self.getcursor() 3304 3305 sql = " SELECT * FROM job " 3306 sql += " WHERE dataset_id=%d " % dataset_id 3307 sql += " AND queue_id=%d " % job_id 3308 cursor.execute(sql) 3309 job = cursor.fetchone() 3310 3311 sql = " UPDATE job SET " 3312 sql += " prev_state = status, " 3313 sql += " status='%s', " % status 3314 if clear_errors: 3315 sql += " errormessage = NULL, " 3316 sql += " status_changed=NOW() " 3317 sql += " WHERE dataset_id=%d " % dataset_id 3318 sql += " AND queue_id=%d " % job_id 3319 sql += " AND passkey='%s' " % passkey 3320 self.execute(cursor,sql) 3321 3322 if status == 'OK': 3323 # Update grid statistics 3324 sql = " UPDATE grid_statistics SET " 3325 sql += " grid_statistics.ok = grid_statistics.ok+1 " 3326 sql += " WHERE grid_statistics.dataset_id= %u " % job['dataset_id'] 3327 sql += " AND grid_statistics.grid_id= %u " % job['grid_id'] 3328 self.logger.debug(sql) 3329 try: 3330 self.execute(cursor,sql) 3331 except Exception,e: 3332 self.logger.error(e) 3333 self.logger.debug("wrote stats for job %d,%d " % (dataset_id,job_id)) 3334 3335 return 0,job['submitdir']
3336
3337 - def get_stats(self,dataset_id):
3338 """ 3339 Get collected dataset statistics 3340 @param dataset_id: dataset index 3341 """ 3342 stats = {} 3343 cursor = self.getcursor() 3344 sql = " SELECT name,value FROM dataset_statistics_mv " 3345 sql += " WHERE dataset_id='%s' " % dataset_id 3346 cursor.execute(sql) 3347 for entry in cursor.fetchall(): 3348 stats[entry['name']] = entry['value'] 3349 self.commit() 3350 return stats
3351
3352 - def jobfinish(self,dataset_id,job_id,stats,key=None,mode=0):
3353 """ 3354 Update monitoring for job and write statistics 3355 @param dataset_id: dataset index 3356 @param job_id: process number within dataset 3357 @param stats: dictonary of stat entries 3358 """ 3359 passkey = self.mkkey(6,9) 3360 cursor = self.getcursor() 3361 sql = " UPDATE job SET " 3362 sql += " prev_state = status, " 3363 if mode == 1: 3364 sql += " status = 'COPIED', " 3365 else: 3366 sql += " status = 'READYTOCOPY', " 3367 if stats.has_key('mem_heap'): 3368 sql += " mem_heap = %s, " % stats['mem_heap'] 3369 if stats.has_key('mem_heap_peak'): 3370 sql += " mem_heap_peak = %s, " % stats['mem_heap_peak'] 3371 if stats.has_key('user_time'): 3372 sql += " time_user = %s, " % stats['user_time'] 3373 if stats.has_key('sys_time'): 3374 sql += " time_sys = %s, " % stats['sys_time'] 3375 if stats.has_key('real_time'): 3376 sql += " time_real = %s, " % stats['real_time'] 3377 if stats.has_key('Triggered Events'): 3378 sql += " nevents = %s, " % stats['Triggered Events'] 3379 if stats.has_key('Generated Events'): 3380 sql += " gevents = %s, " % stats['Generated Events'] 3381 sql += " job.passkey='%s', " % passkey 3382 sql += " status_changed=NOW() " 3383 sql += " WHERE dataset_id=%d " % dataset_id 3384 sql += " AND queue_id=%d " % job_id 3385 sql += " AND job.passkey='%s' " % key 3386 self.logger.debug(sql) 3387 rowcount = self.execute(cursor,sql) 3388 3389 sql = " SELECT grid_id,host FROM job " 3390 sql += " WHERE queue_id=%d " % job_id 3391 sql += " AND dataset_id=%d " % dataset_id 3392 self.logger.debug(sql) 3393 cursor.execute(sql) 3394 gridinfo = cursor.fetchone() 3395 self.commit() 3396 if not gridinfo: # don't continue 3397 return (rowcount+1)%2 3398 3399 # Create node entry if it does not exist 3400 sql = " INSERT IGNORE INTO node_statistics " 3401 if stats.has_key('host_id'): 3402 sql += " (name,grid_id,host_id) VALUES ('%s',%d,'%s') " % (gridinfo['host'],gridinfo['grid_id'],stats['host_id']) 3403 else: 3404 sql += " (name,grid_id) VALUES ('%s',%d) " % (gridinfo['host'],gridinfo['grid_id']) 3405 self.logger.debug(sql) 3406 cursor.execute(sql) 3407 self.commit() 3408 3409 sql = " UPDATE node_statistics SET " 3410 sql += " completed = completed+1, " 3411 if stats.has_key('platform'): 3412 sql += " node_statistics.platform='%s', " % stats['platform'] 3413 if stats.has_key('host_id'): 3414 sql += " node_statistics.host_id='%s', " % stats['host_id'] 3415 sql += " mem_heap = mem_heap+%s, " % stats['mem_heap'] 3416 sql += " mem_heap_peak = mem_heap_peak+%s, " % stats['mem_heap_peak'] 3417 sql += " time_user = time_user+%s, " % stats['user_time'] 3418 sql += " time_sys = time_sys+%s, " % stats['sys_time'] 3419 sql += " time_real = time_real+%s " % stats['real_time'] 3420 sql += " WHERE name='%s' " % gridinfo['host'] 3421 sql += " AND grid_id=%d " % gridinfo['grid_id'] 3422 self.logger.debug(sql) 3423 cursor.execute(sql) 3424 self.commit() 3425 3426 # create new statistics entries if they do not exist 3427 if len(stats): 3428 cm = "" 3429 sql = " REPLACE INTO job_statistics " 3430 sql += " (queue_id,dataset_id,name,value) VALUES " 3431 for key,value in stats.items(): 3432 try: # check that value is numeric 3433 value = float(value) 3434 sql += "%s (%d,%d,'%s',%f) " % (cm,job_id,dataset_id,key,value) 3435 cm = "," 3436 except: continue 3437 cursor.execute(sql) 3438 3439 sql = " UPDATE grid_statistics SET " 3440 sql += " time_real = time_real + %g, " % stats['real_time'] 3441 sql += " time_sys = time_sys + %g, " % stats['sys_time'] 3442 sql += " time_user = time_user + %g " % 
stats['user_time'] 3443 sql += " WHERE grid_statistics.dataset_id= %u " % dataset_id 3444 sql += " AND grid_statistics.grid_id= %u " % gridinfo['grid_id'] 3445 cursor.execute(sql) 3446 3447 self.commit() 3448 3449 return (rowcount+1)%2
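
# Illustrative sketch (added for clarity; not part of the original module):
# jobfinish reads `stats` by the key names tested above; any other numeric
# entry still lands in job_statistics. A minimal report a worker might send
# (values invented; `db`, the ids and `passkey` are assumed):

stats = {
    'mem_heap': 212.5,   'mem_heap_peak': 256.0,
    'user_time': 3540.2, 'sys_time': 12.7, 'real_time': 3610.9,
    'Triggered Events': 25000, 'Generated Events': 100000,
}
ok = db.jobfinish(dataset_id, job_id, stats, key=passkey)   # returns 0 on success
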
3450 3451 3452
3453 - def jobsubmitted(self,dataset_id,job_id,submitdir,grid_queue_id=None):
3454 """ 3455 Set the submission path of job so that it can be post processed 3456 on termination. 3457 """ 3458 cursor = self.getcursor() 3459 sql = " UPDATE job SET " 3460 sql += " prev_state = status, " 3461 sql += " status='QUEUED', " 3462 if not grid_queue_id == -1: 3463 sql += " grid_queue_id='%s', " % grid_queue_id 3464 sql += " submitdir=\"%s\" " % submitdir 3465 sql += " WHERE queue_id=%d " % job_id 3466 sql += " AND dataset_id=%d " % dataset_id 3467 self.logger.debug(sql) 3468 self.execute(cursor,sql) 3469 return 1
3470
3471 - def jobping(self,dataset_id,job_id,host,key=None,tray=0,iter=0):
3472 """ 3473 Update status_changed time for job 3474 """ 3475 cursor = self.getcursor() 3476 sql = " SELECT status from job " 3477 sql += " WHERE queue_id=%d " % job_id 3478 sql += " AND dataset_id=%d " % dataset_id 3479 sql += " AND passkey='%s' " % key 3480 self.logger.debug(sql) 3481 cursor.execute(sql) 3482 row = cursor.fetchone(); 3483 if not (row and row['status'] == 'PROCESSING'): 3484 return False 3485 3486 sql = " UPDATE job SET " 3487 sql += " tray=%d, " % tray 3488 sql += " iter=%d, " % iter 3489 sql += " keepalive=NOW() " 3490 sql += " WHERE queue_id=%d " % job_id 3491 sql += " AND dataset_id=%d " % dataset_id 3492 sql += " AND status='PROCESSING' " 3493 sql += " AND passkey='%s' " % key 3494 self.logger.debug(sql) 3495 self.execute(cursor,sql) 3496 return True
3497
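jobping only refreshes the keepalive timestamp while the job is still in the PROCESSING state, so a worker can use its return value to detect a suspension. A hedged sketch, assuming an authenticated IceProdDB instance db and a hypothetical passkey:

    import time

    def keepalive_loop(db, dataset_id, job_id, host, passkey):
        # refresh job.keepalive every five minutes; stop as soon as the
        # server reports the job is no longer PROCESSING
        while db.jobping(dataset_id, job_id, host, key=passkey):
            time.sleep(300)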
3498 - def jobabort(self,job_id,dataset_id,error,errormessage='',key=None,stats={}):
3499 """ 3500 Reset any pending jobs to they get reprocesses. 3501 This would typically be run at startup in case the daemon 3502 crashed previously. 3503 @todo: update node statistics 3504 """ 3505 cursor = self.getcursor() 3506 3507 sql = " SELECT debug FROM dataset WHERE dataset_id = %d " % dataset_id 3508 self.logger.debug(sql) 3509 cursor.execute(sql) 3510 debug = cursor.fetchone()['debug'] 3511 3512 sql = " SELECT * FROM job " 3513 sql += " WHERE dataset_id = %u AND queue_id = %u " % (dataset_id,job_id) 3514 cursor.execute(sql) 3515 job = cursor.fetchone() 3516 3517 sql = " UPDATE job SET " 3518 sql += " prev_state = status, " 3519 sql += " errormessage=CONCAT(NOW(),QUOTE('%s')), " % self.defang(errormessage) 3520 sql += " status_changed=NOW(), " 3521 3522 if error == 1: # job was evicted 3523 sql += " evictions=evictions+1, " 3524 sql += " status='EVICTED', " 3525 if error == 2: # job failed with error 3526 sql += " failures=failures+1, " 3527 if debug: 3528 sql += " status='SUSPENDED', " 3529 else: 3530 sql += " status='ERROR', " 3531 sql += " nevents=0 " 3532 sql += " WHERE queue_id=%d " % job['queue_id'] 3533 sql += " AND dataset_id=%d " % job['dataset_id'] 3534 if key: 3535 sql += " AND passkey='%s' " % key 3536 self.logger.debug(sql) 3537 self.execute(cursor,sql) 3538 self.commit() 3539 3540 sql = " UPDATE node_statistics,job SET " 3541 if stats.has_key('platform'): 3542 sql += " node_statistics.platform='%s', " % stats['platform'] 3543 if error == 1: # job was evicted 3544 sql += " node_statistics.evictions=node_statistics.evictions+1 " 3545 else: # job failed with error 3546 sql += " node_statistics.failures = node_statistics.failures+1 " 3547 3548 sql += " WHERE node_statistics.name=job.host " 3549 if stats.has_key('host_id'): 3550 sql += " AND node_statistics.host_id='%s' " % stats['host_id'] 3551 sql += " AND node_statistics.grid_id=job.grid_id " 3552 sql += " AND job.queue_id=%d " % job_id 3553 sql += " AND job.dataset_id=%d " % dataset_id 3554 self.logger.debug(sql) 3555 try: 3556 self.execute(cursor,sql) 3557 except Exception,e: 3558 self.logger.error(e) 3559 3560 # Update grid statistics 3561 sql = " UPDATE grid_statistics SET " 3562 if stats: 3563 try: # update stats if given 3564 sql += " time_real = time_real + %g, " % stats['real_time'] 3565 sql += " time_sys = time_sys + %g, " % stats['sys_time'] 3566 sql += " time_user = time_user + %g, " % stats['user_time'] 3567 except: pass 3568 3569 if error == 1: # job was evicted 3570 sql += " grid_statistics.evictions = grid_statistics.evictions+1 " 3571 else: # job failed with error 3572 sql += " grid_statistics.failures = grid_statistics.failures+1 " 3573 sql += " WHERE grid_statistics.dataset_id= %u " % job['dataset_id'] 3574 sql += " AND grid_statistics.grid_id= %u " % job['grid_id'] 3575 self.logger.debug(sql) 3576 try: 3577 self.execute(cursor,sql) 3578 except Exception,e: 3579 self.logger.error(e) 3580 3581 self.commit()
3582 3583
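The error argument encodes why the job ended: 1 records an eviction, 2 records a failure (which suspends the job instead when the dataset has its debug flag set). A hedged usage sketch with hypothetical ids and passkey:

    # eviction: increments job.evictions and sets status='EVICTED'
    db.jobabort(job_id, dataset_id, 1, 'evicted by the local scheduler', key=passkey)

    # failure: increments job.failures and sets status='ERROR'
    # (or 'SUSPENDED' when dataset.debug is set)
    db.jobabort(job_id, dataset_id, 2, 'segfault in tray 0',
                key=passkey,
                stats={'real_time': 120.0, 'sys_time': 1.2, 'user_time': 100.3})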
3584 - def jobclean(self,dataset_id,archive=True):
3585 """ 3586 Remove jobs from queueue 3587 """ 3588 cursor = self.getcursor() 3589 3590 sql = " SELECT name, SUM(value) as total " 3591 sql += " FROM job_statistics " 3592 sql += " WHERE dataset_id=%d " % dataset_id 3593 sql += " GROUP BY name " 3594 cursor.execute(sql) 3595 results = cursor.fetchall() 3596 self.commit() 3597 if results: 3598 3599 # create new entries if they do not exist 3600 cm = "" 3601 sql = " REPLACE INTO dataset_statistics " 3602 sql += " (name,value,dataset_id) VALUES " 3603 for entry in results: 3604 sql += "%s ('%s',%f,%d) " % (cm,entry['name'],entry['total'],dataset_id) 3605 cm = "," 3606 cursor.execute(sql) 3607 self.commit() 3608 else: 3609 self.logger.warn("jobclean: no job statistics found for dataset %d" % dataset_id) 3610 3611 if archive: 3612 # archive job entries 3613 sql = " INSERT INTO archived_job " 3614 sql += " SELECT job.* FROM job WHERE dataset_id = %u " % dataset_id 3615 self.logger.debug(sql) 3616 cursor.execute(sql) 3617 3618 # archive job_statistics 3619 sql = " INSERT INTO archived_job_statistics " 3620 sql += " SELECT job_statistics.* FROM job_statistics WHERE dataset_id = %u " % dataset_id 3621 self.logger.debug(sql) 3622 cursor.execute(sql) 3623 3624 # delete job entries 3625 sql = " DELETE FROM job " 3626 sql += " WHERE dataset_id=%d " % dataset_id 3627 self.logger.debug(sql) 3628 cursor.execute(sql) 3629 3630 # delete job_statistics 3631 sql = " DELETE FROM job_statistics " 3632 sql += " WHERE dataset_id=%d " % dataset_id 3633 try: 3634 cursor.execute(sql) 3635 except Exception,e: 3636 self.logger.error('%s: could not delete job_statistics entry.' % str(e)) 3637 self.commit()
3638 3639
3640 - def jobsuspend(self,job_id,dataset_id,suspend=True):
3641 """ 3642 Reset any pending jobs to they get reprocesses. 3643 This would typically be run at startup in case the daemon 3644 crashed previously. 3645 """ 3646 if suspend: status = 'SUSPENDED' 3647 else: status = 'RESET' 3648 3649 passkey = self.mkkey(6,9) 3650 cursor = self.getcursor() 3651 3652 sql = " UPDATE job SET " 3653 sql += " prev_state = status, " 3654 sql += " status='%s', " % status 3655 sql += " passkey='%s', " % passkey 3656 sql += " status_changed=NOW() " 3657 sql += " WHERE dataset_id=%d " % dataset_id 3658 sql += " AND status != 'OK' " 3659 if not job_id < 0: 3660 sql += " AND queue_id=%d " % job_id 3661 self.execute(cursor,sql)
3662
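A short sketch of the two ways jobsuspend is typically called; the ids are hypothetical:

    db.jobsuspend(1337, 21, suspend=True)   # suspend job 1337 of dataset 21
    db.jobsuspend(-1, 21, suspend=False)    # reset all non-OK jobs in dataset 21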
3663 - def ToggleDatasetDebug(self,dataset):
3664 """ 3665 Update status of dataset 3666 """ 3667 cursor = self.getcursor() 3668 sql = " UPDATE dataset " 3669 sql += " SET debug = !debug " 3670 sql += " WHERE dataset_id=%d " % dataset 3671 cursor.execute(sql) 3672 self.commit()
3673
3674 - def setDatasetStatus(self,dataset,status):
3675 """ 3676 Update status of dataset 3677 """ 3678 cursor = self.getcursor() 3679 sql = " UPDATE dataset " 3680 sql += " SET status='%s' " % status 3681 sql += " WHERE dataset_id=%d " % dataset 3682 cursor.execute(sql) 3683 self.commit()
3684
3685 - def validate(self,dataset_id,status='TRUE'):
3686 """ 3687 Mark dataset as visible and valid. 3688 """ 3689 cursor = self.getcursor() 3690 sql = " UPDATE dataset SET verified = '%s' " % status 3691 sql += " WHERE dataset_id = %d " % dataset_id 3692 self.logger.debug(sql) 3693 cursor.execute(sql) 3694 self.commit()
3695
3696 - def multipart_job_start(self,dataset_id,queue_id,key=''):
3697 """ 3698 Change the status of a job to indicate it is currently running 3699 @param dataset_id: Dataset ID 3700 @param queue_id: Queue ID (within dataset) 3701 @param key: temporary passkey to avoid job spoofs 3702 @return: dataset_id,nproc,procnum 3703 """ 3704 3705 cursor = self.getcursor() 3706 3707 sql = " SELECT jobs_submitted " 3708 sql += " FROM dataset " 3709 sql += " WHERE dataset_id = %s " 3710 cursor.execute(sql,(dataset_id,)) 3711 item = cursor.fetchone() 3712 3713 jobs_submitted = item['jobs_submitted'] 3714 3715 self.logger.debug("Job %d.%d starting with key %s" % (dataset_id,queue_id,key)) 3716 3717 # update the main job 3718 sql = " UPDATE job SET " 3719 sql += " job.prev_state = job.status, " 3720 sql += " job.status='PROCESSING', " 3721 #sql += " errormessage = NULL, " 3722 sql += " status_changed=NOW(), " 3723 sql += " keepalive=NOW() " 3724 sql += " WHERE " 3725 sql += " (job.status='QUEUED' OR job.status='PROCESSING' OR job.status='EVICTED')" 3726 sql += " AND job.dataset_id=%s " 3727 sql += " AND job.queue_id=%s " 3728 sql += " AND job.passkey=%s " 3729 cursor.execute(sql,(dataset_id,queue_id,key)) 3730 3731 # separate this out because the main job may have been updated already 3732 sql = " SELECT EXISTS(" 3733 sql += " SELECT *" 3734 sql += " FROM job j" 3735 sql += " WHERE status = 'PROCESSING'" 3736 sql += " AND dataset_id = %s" 3737 sql += " AND queue_id = %s" 3738 sql += " AND passkey = %s" 3739 sql += ") AS found" 3740 cursor.execute(sql,(dataset_id,queue_id,key)) 3741 row = cursor.fetchone() 3742 if row and int(row['found']) == 1: 3743 return (dataset_id,jobs_submitted,queue_id) 3744 return (TASK_DATASET_ERROR_ID,0,0)
3745
3746 - def multipart_job_finish(self,dataset_id,queue_id,key=''):
3747 it = self.get_iterations(dataset_id)
3748 tray,iter = 0,0
3749 if it: tray,iter = it[-1]['tray_index'],it[-1]['iterations']+1
3750 cursor = self.getcursor()
3751 sql = " UPDATE job SET "
3752 sql += " prev_state = status, "
3753 sql += " status = 'OK', "
3754 sql += " tray = %s, " # MySQLdb parameter placeholders must be %s, not %u
3755 sql += " iter = %s, "
3756 sql += " errormessage = NULL, "
3757 sql += " status_changed=NOW() "
3758 sql += " WHERE dataset_id=%s "
3759 sql += " AND queue_id=%s "
3760 sql += " AND passkey=%s "
3761 cursor.execute(sql,(tray,iter,dataset_id,queue_id,key))
3762 return self._conn.affected_rows()
3763
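Together, multipart_job_start and multipart_job_finish bracket one pass of a multi-part job: the start call verifies the passkey and flips the job to PROCESSING, the finish call records the final tray/iteration and marks it OK. A hedged sketch of the round trip (TASK_DATASET_ERROR_ID comes from iceprod.core.constants, imported at the top of this module; ids and passkey are hypothetical):

    dataset_id, jobs_submitted, queue_id = db.multipart_job_start(21, 1337, key=passkey)
    if dataset_id == TASK_DATASET_ERROR_ID:
        raise Exception, "job 21.1337 could not be started (bad passkey or state)"

    # ... run the trays/iterations, reporting tasks as they complete ...

    if not db.multipart_job_finish(21, 1337, key=passkey):
        raise Exception, "job 21.1337 was not marked OK"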
3764 - def get_iterations(self,dataset_id):
3765 cursor = self.getcursor() 3766 sql = " SELECT tray.tray_index, tray.iterations " 3767 sql += " FROM tray WHERE dataset_id = %u " 3768 cursor.execute(sql,(dataset_id,)) 3769 return cursor.fetchall()
3770
3771 - def task_start(self,dataset_id,queue_id,taskname,tray,iter,hostname,key=''):
3772 cursor = self.getcursor() 3773 3774 self.logger.info("task %s (tray: %d iter: %d) starting for job %d.%d" \ 3775 % (taskname,tray,iter,dataset_id,queue_id)) 3776 3777 # get the job ID 3778 sql = " SELECT job_id" 3779 sql += " FROM job j" 3780 sql += " WHERE dataset_id = %s" 3781 sql += " AND queue_id = %s" 3782 sql += " AND passkey = %s" 3783 cursor.execute(sql,(dataset_id,queue_id,key)) 3784 row = cursor.fetchone() 3785 if not row: 3786 msg = "task %s tried to start for job %d.%d with wrong passkey" \ 3787 % (taskname,dataset_id,queue_id) 3788 self.logger.warn(msg) 3789 return TASK_ERROR_ID 3790 job_id = row['job_id'] 3791 3792 # get the task definition ID 3793 sql = " SELECT task_def_id" 3794 sql += " FROM task_def td" 3795 sql += " WHERE dataset_id = %s" 3796 sql += " AND name = %s" 3797 cursor.execute(sql,(dataset_id,taskname)) 3798 row = cursor.fetchone() 3799 if not row: 3800 msg = "task %s not found for job %d.%d" % (taskname,dataset_id,queue_id) 3801 self.logger.error(msg) 3802 return TASK_ERROR_ID 3803 task_def_id = row['task_def_id'] 3804 3805 # get the task definition tray ID 3806 sql = " SELECT task_def_tray_id" 3807 sql += " FROM task_def_tray tdt" 3808 sql += " WHERE task_def_id = %s" 3809 sql += " AND idx = %s" 3810 sql += " AND iter = %s" 3811 cursor.execute(sql,(task_def_id,tray,iter)) 3812 row = cursor.fetchone() 3813 if not row: 3814 msg = "tray %d, iter %d not found for task %s in job %d.%d" \ 3815 % (tray,iter,taskname,dataset_id,queue_id) 3816 self.logger.error(msg) 3817 return TASK_ERROR_ID 3818 tdt_id = row['task_def_tray_id'] 3819 3820 sql = " SELECT task_id" 3821 sql += " FROM task t, task_def_tray tdt" 3822 sql += " WHERE t.task_def_tray_id = tdt.task_def_tray_id" 3823 sql += " AND tdt.task_def_tray_id = %s" 3824 sql += " AND t.job_id = %s" 3825 cursor.execute(sql,(tdt_id,job_id)) 3826 row = cursor.fetchone() 3827 if row: 3828 # task exists, update it 3829 task_id = row['task_id'] 3830 sql = " UPDATE task" 3831 sql += " SET last_status = status," 3832 sql += " status = 'STARTING'," 3833 sql += " status_changed = NOW()," 3834 sql += " host = %s," 3835 sql += " start = NOW()," 3836 sql += " finish = NULL" 3837 sql += " WHERE task_id = %s" 3838 cursor.execute(sql,(hostname,task_id)) 3839 3840 # delete old statistics too 3841 sql = " DELETE FROM task_statistics WHERE task_id = %s" 3842 cursor.execute(sql,(task_id,)) 3843 else: 3844 # task doesn't exist, insert it 3845 sql = "INSERT INTO task (task_def_tray_id, job_id, " 3846 sql += " host, status, status_changed, start)" 3847 sql += " VALUES (%s, %s, %s, 'STARTING', NOW(), NOW())" 3848 cursor.execute(sql,(tdt_id,job_id,hostname)) 3849 task_id = self._conn.insert_id() 3850 return task_id
3851
3852 - def task_update_status(self,task_id,status,key='',cursor=None):
3853 if not cursor: 3854 cursor = self.getcursor() 3855 sql = " UPDATE task t,job j" 3856 sql += " SET t.last_status = t.status," 3857 sql += " t.status = %s," 3858 sql += " t.status_changed = NOW()" 3859 sql += " WHERE t.job_id = j.job_id" 3860 sql += " AND t.task_id = %s" 3861 sql += " AND j.passkey = %s" 3862 self.logger.debug("task status update starting for task %d" % task_id) 3863 cursor.execute(sql,(status,task_id,key)) 3864 self.logger.debug("task status update done for task %d" % task_id) 3865 return self._conn.affected_rows()
3866
3867 - def task_is_finished(self,td_id,job_id,tray=None,iter=None):
3868 cursor = self.getcursor() 3869 sql = " SELECT COUNT(*) AS expected" 3870 sql += " FROM task_def_tray tdt" 3871 sql += " WHERE tdt.task_def_id = %s" 3872 params = [td_id] 3873 add_sql = "" 3874 if tray is not None: 3875 add_sql += " AND tdt.idx = %s" 3876 params.append(tray) 3877 if iter is not None: 3878 add_sql += " AND tdt.iter = %s" 3879 params.append(iter) 3880 if add_sql: 3881 sql += add_sql 3882 cursor.execute(sql,params) 3883 row = cursor.fetchone() 3884 if not row: 3885 return False 3886 expected = int(row['expected']) 3887 3888 params.insert(1, job_id) 3889 sql = " SELECT COUNT(*) AS actual" 3890 sql += " FROM task t, task_def_tray tdt" 3891 sql += " WHERE tdt.task_def_tray_id = t.task_def_tray_id" 3892 sql += " AND tdt.task_def_id = %s" 3893 sql += " AND t.job_id = %s" 3894 sql += " AND t.status = 'OK'" 3895 if add_sql: 3896 sql += add_sql 3897 3898 cursor.execute(sql,params) 3899 row = cursor.fetchone() 3900 if not row: 3901 return False 3902 actual = int(row['actual']) 3903 3904 return expected > 0 and actual == expected
3905
3906 - def task_abort(self,task_id,key=''):
3907 cursor = self.getcursor() 3908 ret = self.task_update_status(task_id,'ERROR',key,cursor) 3909 if not ret: 3910 self.logger.error("unable to update status for task %d" % task_id) 3911 return ret 3912 sql = " DELETE FROM task_statistics WHERE task_id = %s" 3913 self.logger.debug("aborting task %d" % task_id) 3914 cursor.execute(sql,(task_id,)) 3915 return True
3916
3917 - def task_finish(self,task_id,stats,key=''):
3918 cursor = self.getcursor() 3919 ret = self.task_update_status(task_id,'OK',key,cursor) 3920 if not ret: 3921 self.logger.error("unable to update status for task %d" % task_id) 3922 return ret 3923 3924 sql = " UPDATE task SET finish = NOW()" 3925 sql += " WHERE task_id=%s" 3926 ret = cursor.execute(sql,(task_id,)) 3927 if not ret: 3928 self.logger.error("unable to set finish time for task %d" % task_id) 3929 return ret 3930 3931 if stats: 3932 inserts = [] 3933 self.logger.debug("stats: %s" % stats) 3934 for name,value in stats.iteritems(): 3935 inserts.append((task_id,name,value)) 3936 sql = " REPLACE INTO task_statistics (task_id,name,value) VALUES (%s,%s,%s)" 3937 cursor.executemany(sql, inserts) 3938 3939 return True
3940 3941
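The task_* methods implement a small state machine over the task table. A hedged sketch of one task's lifecycle (TASK_ERROR_ID comes from iceprod.core.constants; run_the_tray is a hypothetical payload; ids and passkey are hypothetical):

    task_id = db.task_start(21, 1337, 'generate', 0, 0, 'node001', key=passkey)
    if task_id == TASK_ERROR_ID:
        raise Exception, "task could not be registered"

    db.task_update_status(task_id, 'PROCESSING', key=passkey)
    try:
        stats = run_the_tray()                        # hypothetical payload
        db.task_finish(task_id, stats, key=passkey)   # marks OK, stores stats
    except Exception:
        db.task_abort(task_id, key=passkey)           # marks ERROR, drops stats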
3942 - def get_finished_jobs(self,grid_id,delay=1):
3943 """ 3944 Reset any pending jobs to they get reprocesses. 3945 This would typically be run at startup in case the daemon 3946 crashed previously. 3947 """ 3948 cursor = self.getcursor() 3949 sql = " SELECT * FROM job " 3950 sql += " WHERE grid_id=%d " % grid_id 3951 sql += " AND status='COPYING' " 3952 if delay: 3953 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay 3954 cursor.execute(sql) 3955 resuslts = cursor.fetchall(); 3956 self.commit() 3957 return results
3958
3959 - def SetFileURL(self,queue_id,dataset_id,location,filename,md5sum,filesize,transfertime,key):
3960 """ 3961 Add or change the global location of a file 3962 """ 3963 cursor = self.getcursor() 3964 3965 sql = " SELECT job_id FROM job " 3966 sql += " WHERE dataset_id=%u " % dataset_id 3967 sql += " AND queue_id = %u " % queue_id 3968 sql += " AND passkey='%s' " % key 3969 self.logger.debug(sql) 3970 cursor.execute(sql) 3971 results = cursor.fetchone() 3972 if not results: return 0 3973 3974 sql = " INSERT IGNORE INTO urlpath " 3975 sql += " (dataset_id,queue_id,name,path,md5sum,size,transfertime) VALUES " 3976 sql += " (%u,%u,'%s','%s','%s',%f,%f)" % \ 3977 (dataset_id,queue_id,filename,location,md5sum,filesize,transfertime) 3978 self.logger.debug(sql) 3979 cursor.execute(sql) 3980 if not cursor.rowcount: 3981 sql = " UPDATE urlpath SET " 3982 sql += " name ='%s', " % filename 3983 sql += " path ='%s', " % location 3984 sql += " md5sum ='%s', " % md5sum 3985 sql += " size =%f, " % filesize 3986 sql += " transfertime =%f " % transfertime 3987 sql += " WHERE dataset_id =%u " % dataset_id 3988 sql += " AND queue_id =%u " % queue_id 3989 sql += " AND name ='%s' " % filename 3990 self.logger.debug(sql) 3991 cursor.execute(sql) 3992 return 1
3993 3994
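SetFileURL behaves like an upsert on urlpath keyed on (dataset_id,queue_id,name), guarded by the job's passkey; it returns 0 when the passkey does not match. A hedged sketch with hypothetical values (note the argument order: queue_id first, then dataset_id):

    ok = db.SetFileURL(1337, 21,                              # queue_id, dataset_id
                       'gsiftp://gridftp.example.edu/data',   # location
                       'Level2.000021.001337.i3.gz',          # filename
                       'd41d8cd98f00b204e9800998ecf8427e',    # md5sum
                       1.2e9,                                 # size in bytes
                       350.0,                                 # transfer time (s)
                       passkey)
    if not ok:
        raise Exception, "passkey rejected; file location not recorded"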
3995 - def getsummary(self,days,groupby=None):
3996 """ 3997 @param days: number of days to get summary from 3998 @param groupby: how to group statistics 3999 """ 4000 cursor = self.getcursor() 4001 sql = " SELECT " 4002 sql += " SUM(job.status = 'OK') as ok , " 4003 sql += " SUM(job.status = 'ERROR') as error , " 4004 sql += " SUM(job.status = 'SUSPENDED') as suspended, " 4005 if groupby: 4006 sql += " job.%s, " % groupby 4007 sql += " SUM(job.time_sys) as sys_t, " 4008 sql += " SUM(job.time_user) as usr_t, " 4009 sql += " SUM(job.time_real) as real_t " 4010 sql += " FROM job " 4011 sql += " WHERE DATE_SUB(CURDATE(),INTERVAL %d DAY) " % days 4012 sql += " < job.status_changed " 4013 sql += " AND job.grid_id != 0 " 4014 sql += " AND job.grid_id IS NOT NULL " 4015 if groupby: 4016 sql += " GROUP BY %s " % groupby 4017 cursor.execute(sql) 4018 return cursor.fetchall();
4019
4020 - def getsummary_stats(self,days,groupby=None):
4021 """ 4022 @param days: number of days to get summary from 4023 @param groupby: how to group statistics 4024 """ 4025 cursor = self.getcursor() 4026 sql = " SELECT job_statistics.name AS name, " 4027 sql += " SUM(job_statistics.value) AS value, " 4028 if groupby: 4029 sql += " job.%s, " % groupby 4030 sql += " SUM(1) AS completed " 4031 sql += " FROM job,job_statistics " 4032 sql += " WHERE DATE_SUB(CURDATE(),INTERVAL %d DAY) " % days 4033 sql += " < job.status_changed " 4034 sql += " AND job.status = 'OK' " 4035 sql += " AND job.dataset_id = job_statistics.dataset_id " 4036 sql += " AND job.queue_id = job_statistics.queue_id " 4037 sql += " AND job.grid_id IS NOT NULL " 4038 sql += " AND job.grid_id != 0 " 4039 if groupby: 4040 sql += " GROUP BY job_statistics.name,%s " % groupby 4041 else: 4042 sql += " GROUP BY job_statistics.name " 4043 cursor.execute(sql) 4044 return cursor.fetchall();
4045 4046
4047 - def getgrid_ids(self):
4048 grid_ids = {} 4049 cursor = self.getcursor() 4050 sql = " SELECT name,grid_id FROM grid " 4051 cursor.execute(sql) 4052 for item in cursor.fetchall(): 4053 grid_ids[item['grid_id']] = item['name'] 4054 return grid_ids
4055
4056 - def getstatus(self,dataset,job=-1):
4057 """ 4058 @param dataset: dataset id 4059 @param job: optional job id 4060 @return: formated string with dataset/job summary 4061 """ 4062 cursor = self.getcursor() 4063 if job >= 0: 4064 sql = " SELECT " 4065 sql += " dataset_id,job_id,status,grid_id,errormessage, " 4066 sql += " ( " 4067 sql += " (job.iter + SUM(IF(tray.tray_index < job.tray, tray.iterations, 0))) " 4068 sql += " /SUM(tray.iterations) " 4069 sql += " ) * 100 AS completion_percent " 4070 sql += " FROM job " 4071 sql += " WHERE dataset_id = %u " % dataset 4072 sql += " AND queue_id = %u " % job 4073 else: 4074 sql = " SELECT " 4075 sql += " SUM(job.status = 'OK') as ok , " 4076 sql += " SUM(job.status = 'ERROR') as error , " 4077 sql += " SUM(job.status = 'SUSPENDED') as suspended, " 4078 sql += " SUM(job.status = 'PROCESSING') as processing, " 4079 sql += " SUM(job.status = 'COPIED') as copied, " 4080 sql += " SUM(job.status = 'READYTOCOPY') as readytocopy, " 4081 sql += " SUM(job.status = 'FAILED') as failed, " 4082 sql += " SUM(job.status = 'QUEUED') as queued, " 4083 sql += " SUM(job.status = 'QUEUING') as queueing " 4084 sql += " FROM job " 4085 sql += " WHERE dataset_id = %u " % dataset 4086 cursor.execute(sql) 4087 return cursor.fetchall()
4088 4089
4090 - def printsummary(self,days):
4091 """ 4092 @param days: number of days to gather information from starting from today 4093 @return: formated string with production summary 4094 """ 4095 div = '+'+('-'*9+'+') 4096 s ='iceprod summary for %s in the last %d days' % (self.db_,days) 4097 s += '-'*10 4098 s += '\n\n' 4099 grid_id = self.getgrid_ids() 4100 row = "| %-7s | " % "grid" 4101 gridsum = self.getsummary(days,'grid_id') 4102 for key in gridsum[0].keys(): 4103 row += "%-7s | " % key[0:min(7,len(key))] 4104 div += ('-'*9+'+') 4105 s += row + '\n' 4106 s += div + '\n' 4107 for entry in gridsum: 4108 gridname = grid_id[entry['grid_id']] 4109 row = "| %-7s | " % gridname[0:min(7,len(gridname))] 4110 for key in gridsum[0].keys(): 4111 row += "%-7.2g | " % entry[key] 4112 s += row + '\n' 4113 s += div + '\n' 4114 totals = self.getsummary(days) 4115 for entry in totals: 4116 row = "| %-7s | " % "Total" 4117 for key in gridsum[0].keys(): 4118 if entry.has_key(key): 4119 row += "%-7.2g | " % entry[key] 4120 else: 4121 row += "%-7.2g | " % 0. 4122 s += row + '\n' 4123 s += div + '\n' 4124 4125 column_size = 10 4126 strfmt = "%%-%ds|" % column_size 4127 gfmt = "%%-%d.2g|" % column_size 4128 dictfmt = "%%%%(%%s)-%ds|" % column_size 4129 s += '\n\n' 4130 gridsum = self.getsummary_stats(days,'grid_id') 4131 newgridsum = {} 4132 grids = ["Total"] 4133 keys = {} 4134 totals = {} 4135 for entry in gridsum: 4136 grid = entry["grid_id"] 4137 name = entry["name"] 4138 gridname = grid_id[grid] 4139 gridname = gridname.strip()[0:min(column_size,len(gridname))] 4140 key = re.sub(r'[^a-z0-9_]','',entry["name"].lower()) 4141 if not keys.has_key(key): 4142 keys[key] = entry["name"][0:min(column_size,len(entry["name"]))] 4143 if ("time" in key) or ("event" in key): 4144 if not newgridsum.has_key(key): 4145 newgridsum[key] = {} 4146 if gridname not in grids: 4147 grids.append(gridname) 4148 try: 4149 newgridsum[key][gridname] = "%2.2g" % float(entry["value"]) 4150 if not newgridsum[key].has_key("Total"): 4151 newgridsum[key]["Total"] = 0.0 4152 newgridsum[key]["Total"] += float(entry["value"]) 4153 except Exception,e: 4154 print e,key 4155 newgridsum[key][gridname] = str(entry["value"]) 4156 rowhdr = "|" + "".join(map(lambda x: strfmt % x, [" "] + grids)) + "\n" 4157 div = "+" + ('-'*column_size+'+')*(len(grids)+1) + "\n" 4158 s += div + rowhdr 4159 for key in newgridsum.keys(): 4160 entry = newgridsum[key] 4161 rowfmt = "|" + strfmt % keys[key] + "".join(map(lambda x: dictfmt % x, grids)) + "\n" 4162 for grid in grids: 4163 if not entry.has_key(grid): entry[grid] = "N/A" 4164 entry["Total"] = "%2.2g" % float(entry["Total"]) 4165 row = rowfmt % entry 4166 s += div + row 4167 totals = self.getsummary_stats(days) 4168 s += div 4169 return s
4170 4171
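A short usage sketch of the summary helpers above; printsummary simply renders the same queries as an ASCII table (db is an authenticated IceProdDB instance):

    for row in db.getsummary(7, groupby='grid_id'):
        print row['grid_id'], row['ok'], row['error'], row['suspended']

    print db.printsummary(7)   # per-grid table for the last 7 days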
4172 - def add_history(self,user,command):
4173 """ 4174 Add a history item 4175 """ 4176 cursor = self.getcursor() 4177 sql = " INSERT INTO history (user,cmd,time)" 4178 sql += " VALUES ('%s','%s',NOW())" % (user,command) 4179 cursor.execute(sql) 4180 self.commit()
4181 4182 4183
4184 -class MySQLParamDb(IceProdDB,ParamDb):
4185 4186 """ 4187 Class paramdb uploads parsed IceTrayConfig+Metaproject structure 4188 to the parameter database 4189 4190 """ 4191 logger = logging.getLogger('MySQLParamDb') 4192
4193 - def __init__(self):
4194
4195 IceProdDB.__init__(self)
4196 self.metaprojects = {}
4197 self.projects = {}
4198 self.modules = {}
4199 self.parameters = {}
4200 self.auth_function = lambda x: \
4201 self.authenticate(
4202 self.get('host',x.host_),
4203 self.get('username',x.usr_),
4204 getpass.getpass(),
4205 self.get('database',x.db_),True)
4206 self.auth_function = lambda x: x # overrides the prompt above: interactive auth is disabled
4207 return
4208
4209 - def new(self):
4210 """ 4211 Create a copy of this instance 4212 """ 4213 newconn = MySQLParamDb() 4214 newconn.host_ = self.host_ 4215 newconn.usr_ = self.usr_ 4216 newconn.passwd_ = self.passwd_ 4217 newconn.db_ = self.db_ 4218 newconn._connected = False 4219 return newconn
4220
4221 - def load(self,metaproject):
4222 """ 4223 load contents of metaproject tree to database 4224 @param metaproject: metaproject object 4225 """ 4226 self.connect() 4227 self.load_metaproject(metaproject) 4228 self.load_projects(metaproject) 4229 self.load_mp_pivot(metaproject) 4230 self.load_project_dependencies(metaproject) 4231 self.commit() 4232 return 1 4233 4234
4235 - def download(self):
4236 for metaproject in self.GetMetaProjects(): 4237 self.metaprojects[metaproject.GetId()]= metaproject 4238 for project in self.GetProjects(metaproject.GetId()): 4239 self.projects[project.GetId()] = project 4240 metaproject.AddProject(project.GetName(),project) 4241 4242 for dependency in self.GetProjectDependencies( 4243 project.GetId(), metaproject.GetId()): 4244 project.AddDependency(dependency) 4245 4246 for service in self.GetServices(project.GetId()): 4247 self.modules[service.GetId()] = service 4248 project.AddService(service) 4249 4250 for param in self.GetParameters(service.GetId()): 4251 service.AddParameter(param) 4252 self.parameters[param.GetId()] = param 4253 4254 for module in self.GetModules(project.GetId()): 4255 self.modules[module.GetId()] = module 4256 project.AddModule(module) 4257 4258 for param in self.GetParameters(module.GetId()): 4259 module.AddParameter(param) 4260 self.parameters[param.GetId()] = param 4261 return self.metaprojects 4262
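download returns the metaproject tree indexed by database id, with projects, services, modules and parameters already attached. A hedged traversal sketch, assuming an authenticated MySQLParamDb instance db:

    for mp_id, mp in db.download().iteritems():
        print mp.GetName(), mp.GetVersion()
        for project in mp.GetProjectList():
            print '   ', project.GetName(), project.GetVersion()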
4263 - def adddependencies(self, project, mp,maxrecursion=3):
4264 """ 4265 Recursively add project and its dependencies to metaproject 4266 """ 4267 if maxrecursion < 0: return 4268 for dep in self.GetProjectDependencies(project.GetId(),mp.GetId()): 4269 if dep.GetName() != project.GetName(): 4270 project.AddDependency(dep) 4271 self.adddependencies(dep,mp,maxrecursion-1)
4272
4273 - def SwitchMetaProject(self, iconfig, id, name, version):
4274 """ 4275 Add selected metaproject to configuration. Import modules and their 4276 parameters. 4277 """ 4278 4279 # Create a new metaproject and set version 4280 newmp = MetaProject() 4281 newmp.SetId(id) 4282 newmp.SetName(name) 4283 newmp.SetVersion(version) 4284 4285 # Create a temporary configuration 4286 new_config = IceTrayConfig() 4287 new_config.AddMetaProject(newmp.GetName(),newmp) 4288 4289 # Get all configured modules in previous configuration 4290 for oldmod in iconfig.GetModules(): 4291 for project in self.GetProjectsMM(oldmod,newmp): 4292 newmp.AddProject(project.GetName(),project) 4293 self.adddependencies(project, newmp) 4294 4295 # Get all configured services in previous configuration 4296 for oldserv in iconfig.GetServices(): 4297 for project in self.GetProjectsMM(oldserv,newmp): 4298 newmp.AddProject(project.GetName(),project) 4299 self.adddependencies(project, newmp) 4300 4301 # Get missing projects and dependencies 4302 if iconfig.HasMetaProject(newmp.GetName()): 4303 oldmp = iconfig.GetMetaProject(newmp.GetName()) 4304 for p in self.GetProjects(newmp.GetId()): 4305 if oldmp.HasProject(p.GetName()) and not newmp.HasProject(p.GetName()): 4306 self.logger.debug(p.GetName()) 4307 newmp.AddProject(p.GetName(),p) 4308 4309 # Loop over modules in old configuration and fetch their projects 4310 for oldmod in iconfig.GetModules(): 4311 if not new_config.HasModule(oldmod.GetName()): 4312 depend = self.GetProjectsMM(oldmod,newmp) 4313 if not depend: 4314 oldmod.SetDescription( 'class %s not found in metaproject %s %s' %( 4315 oldmod.GetClass(), 4316 newmp.GetName(), 4317 newmp.GetVersionTxt())) 4318 else: 4319 oldmod.AddProject(depend[0].GetName(),depend[0]) 4320 new_config.AddModule(oldmod) 4321 4322 for oldserv in iconfig.GetServices(): 4323 if not new_config.HasService(oldserv.GetName()): 4324 depend = self.GetProjectsMM(oldserv,newmp) 4325 if not depend: 4326 oldserv.SetDescription('class %s not found for project %s %s' %( 4327 oldserv.GetClass(), 4328 p.GetName(), 4329 p.GetVersionTxt())) 4330 else: 4331 oldserv.AddProject(depend[0].GetName(),depend[0]) 4332 new_config.AddService(oldserv) 4333 4334 # Overwrite metaproject 4335 iconfig.AddMetaProject(newmp.GetName(),newmp) 4336 4337 # Overwrite modules 4338 for newmod in new_config.GetModules(): 4339 iconfig.AddModule(newmod) 4340 4341 # Overwrite services 4342 for newserv in new_config.GetServices(): 4343 iconfig.AddService(newserv) 4344 return newmp
4345 4346
4347 - def GetMetaProjects(self):
4348 mplist = [] 4349 for m in self.fetch_metaproject_list(): 4350 metaproject = MetaProject() 4351 metaproject.SetName(m['name']) 4352 metaproject.SetVersion(m['versiontxt']) 4353 metaproject.SetId(m['metaproject_id']) 4354 mplist.append(metaproject) 4355 return mplist 4356
4357 - def GetProjectsSM(self,module,metaproj):
4358 return self.GetProjectsMM(module,metaproj,'service') 4359
4360 - def SetMetaProjectId(self,metaproj):
4361 sql = " SELECT metaproject.* FROM metaproject " 4362 sql += " WHERE metaproject.name = '%s' " % metaproj.GetName() 4363 sql += " AND metaproject.versiontxt= '%s' " % metaproj.GetVersion() 4364 cursor = self.getcursor() 4365 cursor.execute (sql.strip()); 4366 mp = cursor.fetchone (); 4367 metaproj.SetId(mp['metaproject_id']) 4368 return metaproj 4369
4370 - def GetProjectsMM(self,module,metaproj,modtype='module'):
4371 4372 sql = " SELECT project.* FROM project,module,mp_pivot,metaproject " 4373 sql += " WHERE module.project_id = project.project_id " 4374 sql += " AND mp_pivot.project_id = project.project_id " 4375 sql += " AND mp_pivot.metaproject_id = metaproject.metaproject_id " 4376 sql += " AND metaproject.name = '%s' " % metaproj.GetName() 4377 sql += " AND metaproject.versiontxt = '%s' " % metaproj.GetVersion() 4378 sql += " AND module.class = '%s' " % module.GetClass() 4379 cursor = self.getcursor() 4380 cursor.execute (sql.strip()); 4381 p = cursor.fetchone (); 4382 if not p: return [] 4383 4384 project = Project() 4385 project.SetName(p['name']) 4386 project.SetVersion(p['versiontxt']) 4387 project.SetId(p['project_id']) 4388 project_list = self.GetProjectDependencies(project.GetId(),metaproj.GetId()) 4389 project_list.insert(0,project) 4390 return project_list 4391
4392 - def GetProjects(self,metaproject_id):
4393 plist = [] 4394 for p in self.fetch_project_list(metaproject_id): 4395 project = Container() 4396 project.SetName(p['name']) 4397 project.SetVersion(p['versiontxt']) 4398 project.SetId(p['project_id']) 4399 plist.append(project) 4400 return plist 4401
4402 - def GetProjectDependencies(self,project_id,metaproject_id):
4403 dlist = [] 4404 for d in self.fetch_project_dependencies(project_id,metaproject_id): 4405 dependency = Container() 4406 dependency.SetName(d['name']) 4407 dependency.SetVersion(d['versiontxt']) 4408 dependency.SetId(d['project_id']) 4409 dlist.append(dependency) 4410 return dlist 4411
4412 - def GetServices(self,project_id):
4413 slist = [] 4414 for s in self.fetch_modules_from_project_id(project_id,'service'): 4415 service = Service() 4416 service.SetName(s['name']) 4417 service.SetClass(s['class']) 4418 service.SetId(s['module_id']) 4419 slist.append(service) 4420 return slist 4421
4422 - def GetServicesP(self,name,version):
4423 slist = [] 4424 for s in self.fetch_services_for_project(name,version): 4425 service = Service() 4426 service.SetName(s['name']) 4427 service.SetClass(s['class']) 4428 service.SetId(s['module_id']) 4429 slist.append(service) 4430 return slist 4431 4432
4433 - def GetModules(self,project_id):
4434 mlist = [] 4435 for m in self.fetch_modules_from_project_id(project_id): 4436 module = Module() 4437 module.SetName(m['name']) 4438 module.SetClass(m['class']) 4439 module.SetId(m['module_id']) 4440 mlist.append(module) 4441 return mlist 4442
4443 - def GetModulesP(self,name,version):
4444 mlist = [] 4445 for m in self.fetch_modules_for_project(name,version): 4446 module = Module() 4447 module.SetName(m['name']) 4448 module.SetClass(m['class']) 4449 module.SetId(m['module_id']) 4450 mlist.append(module) 4451 return mlist 4452
4453 - def GetIceProdModules(self):
4454 mlist = [] 4455 for m in self.fetch_iceprodmodules(): 4456 module = IceProdPre() 4457 module.SetName(m['name']) 4458 module.SetClass(m['class']) 4459 module.SetId(m['module_id']) 4460 mlist.append(module) 4461 return mlist 4462 4463
4464 - def GetParameters(self,module_id):
4465 plist = [] 4466 for param in self.fetch_service_parameters(module_id): 4467 parameter = Parameter() 4468 parameter.SetName(param['name']) 4469 parameter.SetType(param['type']) 4470 pid = param['parameter_id'] 4471 parameter.SetId(pid) 4472 4473 if parameter.GetType() == 'OMKeyv' : 4474 parameter.SetValue(self.select_omkey_array(pid)) 4475 elif parameter.GetType() == 'OMKey' : 4476 parameter.SetValue(self.select_omkey(pid)) 4477 elif parameter.GetType() in VectorTypes: 4478 parameter.SetValue(self.select_array(pid)) 4479 else: 4480 parameter.SetValue(Value(param['value'],param['unit'])) 4481 plist.append(parameter) 4482 return plist 4483 4484
4485 - def fetch_metaproject_list(self):
4486 """ 4487 retrive IDs for metaprojects in the database 4488 """ 4489 4490 sql = "SELECT * FROM metaproject" 4491 cursor = self.getcursor() 4492 sql = re.sub('\s+',' ',sql); 4493 cursor.execute (sql); 4494 result_set = cursor.fetchall (); 4495 4496 return map(self.nonify,result_set); 4497 4498
4499 - def fetch_project_list(self,metaproject_id=None):
4500 """ 4501 retrive IDs for projects in the database 4502 @param metaproject_id: table_id of metaproject 4503 """ 4504 4505 sql = """ 4506 SELECT 4507 project.project_id,project.name,project.versiontxt 4508 FROM project,mp_pivot """ 4509 if metaproject_id: 4510 sql += """ WHERE 4511 project.project_id = mp_pivot.project_id 4512 AND mp_pivot.metaproject_id = %s""" % metaproject_id 4513 4514 cursor = self.getcursor() 4515 sql = re.sub('\s+',' ',sql); 4516 cursor.execute (sql); 4517 result_set = cursor.fetchall (); 4518 return map(self.nonify,result_set); 4519
4520 - def fetch_project(self,id,table='project'):
4521 """ 4522 retrieve project with given id 4523 @param id: primary key of project 4524 @param table: table to query (project or metaprojects) 4525 """ 4526 sql="""SELECT * FROM %s WHERE %s_id ='%d' 4527 """ % (table,table,id) 4528 4529 sql = re.sub('\s+',' ',sql) 4530 cursor = self.getcursor() 4531 cursor.execute (sql); 4532 4533 return cursor.fetchall ()
4534 4535
4536 - def fetch_project_id(self,pname,pversion,table='project'):
4537 """ 4538 retrive id for project with matching name, version 4539 (there should only be one) 4540 @param pname: name of project to query 4541 @param pversion: tuple representing the major,minor,patch version 4542 @param table: table to query (project or metaprojects) 4543 """ 4544 sql="""SELECT %s_id FROM %s 4545 WHERE name ='%s' 4546 AND versiontxt ='%s' 4547 """ % (table,table,pname,pversion) 4548 4549 sql = re.sub('\s+',' ',sql) 4550 cursor = self.getcursor() 4551 cursor.execute (sql); 4552 result = cursor.fetchall () 4553 if result: 4554 return int(result[0]['%s_id' % table ]) 4555 else: 4556 self.logger.warn("project \'%s\' not found" % pname) 4557 return
4558 4559 4560 4561
4562 - def fetch_service_id(self,service,pid):
4563 return self.fetch_module_id(service,pid,'service') 4564
4565 - def fetch_module_id(self,module,pid,table='module'):
4566 """ 4567 retrive id for module with matching name, and project id 4568 (there should only be one) 4569 @param module: module to query 4570 """ 4571 if not pid: return None 4572 cname = module.GetClass() 4573 4574 sql =" SELECT module_id FROM module " 4575 sql +=" WHERE class ='%s' " % cname 4576 sql +=" AND project_id ='%d' """ % pid 4577 4578 sql = re.sub('\s+',' ',sql) 4579 self.logger.debug(sql) 4580 cursor = self.getcursor() 4581 cursor.execute (sql); 4582 result = cursor.fetchone() 4583 self.logger.debug(result) 4584 4585 if result: 4586 return int(result['module_id']) 4587 else: 4588 self.logger.error("%s \'%s\' not found" % (table,cname)) 4589 return 4590
4591 - def fetch_project_dependencies(self,project_id,metaproject_id):
4592 """ 4593 retrive dependencys for project 4594 @param project_id: id of project 4595 @return array of project names 4596 """ 4597 dependencies = [] 4598 4599 sql = " SELECT project.* " 4600 sql += " FROM project, project_depend " 4601 sql += " WHERE " 4602 sql += " project.project_id = project_depend.dependency_id " 4603 sql += " AND " 4604 sql += " project_depend.project_id = %d " % project_id 4605 sql += " AND " 4606 sql += " project_depend.metaproject_id = %d " % metaproject_id 4607 4608 cursor = self.getcursor() 4609 cursor.execute (sql.strip()); 4610 result_set = cursor.fetchall (); 4611 4612 return result_set 4613 4614
4615 - def fetch_modules_from_project_id(self,project_id,table='module'):
4616 """ 4617 retrive modules for with a given pid 4618 @param project_id: id of project 4619 """ 4620 sql=""" 4621 SELECT * FROM module WHERE project_id = %s 4622 AND module_type='%s' ORDER BY class 4623 """ % (project_id,table) 4624 4625 cursor = self.getcursor() 4626 sql = re.sub('\s+',' ',sql); 4627 cursor.execute (sql); 4628 result_set = cursor.fetchall (); 4629 4630 return map(self.nonify,result_set); 4631 4632
4633 - def fetch_modules_for_project(self,name,version):
4634 """ 4635 retrive modules for with a project given by 4636 name and version 4637 @param name: name of project 4638 @param version: version tuple 4639 """ 4640 pid = self.fetch_project_id(name,version) 4641 if pid: 4642 return self.fetch_modules_from_project_id(pid) 4643 else: 4644 return [] 4645
4646 - def fetch_services_for_project(self,name,version):
4647 """ 4648 retrive modules for with a project given by 4649 name and version 4650 @param name: name of project 4651 @param version: version tuple 4652 """ 4653 pid = self.fetch_project_id(name,version) 4654 if pid: 4655 return self.fetch_modules_from_project_id(pid,'service') 4656 else: 4657 return [] 4658
4659 - def fetch_iceprodmodules(self):
4660 """ 4661 retrive modules for with a project given by 4662 name and version 4663 """ 4664 pid = self.fetch_project_id('iceprod',iceprod.__version__) 4665 if pid: 4666 return self.fetch_modules_from_project_id(pid,'iceprod') 4667 else: 4668 return [] 4669
4670 - def fetch_module_parameters(self,module_id):
4671 """ 4672 retrive parameters for with a service/module given by 4673 the service/module id 4674 @param module_id: primary key in modules table on db 4675 """ 4676 sql="SELECT * FROM parameter WHERE module_id = %d" % module_id 4677 4678 cursor = self.getcursor() 4679 sql = re.sub('\s+',' ',sql); 4680 cursor.execute (sql); 4681 result_set = cursor.fetchall (); 4682 return map(self.nonify,result_set); 4683
4684 - def fetch_service_parameters(self,service_id):
4685 """ 4686 retrive parameters for with a service/module given by 4687 the service/module id 4688 @param service_id: primary key in services table on db 4689 """ 4690 return self.fetch_module_parameters(service_id) 4691
4692 - def load_mp_pivot(self,metaproject):
4693 """ 4694 Load cross-references between meta-projects (1) and projects 4695 The purpose of this is provide a way simulatneously for projects to 4696 reference which meta-projects they are members of and for 4697 meta-projects to list which projects belong to them (many-to-many). 4698 @param metaproject: Metaproject object 4699 """ 4700 4701 sql = """ 4702 INSERT IGNORE INTO mp_pivot 4703 (metaproject_id,project_id) VALUES 4704 """ 4705 cm = '' 4706 mid = metaproject.GetId() 4707 for p in metaproject.GetProjectList(): 4708 sql += "%s\n(%s,%s)" % (cm,mid,p.GetId()) 4709 cm = ',' 4710 4711 sql = re.sub('\s+',' ',sql) 4712 self.logger.debug(sql) 4713 cursor = self.getcursor() 4714 cursor.execute (sql) 4715 4716 self.logger.debug(self.insert_id()) 4717 self.logger.debug("%d mp_pivot rows were inserted" % cursor.rowcount) 4718 cursor.execute("SHOW WARNINGS") 4719 warn = cursor.fetchone() 4720 if warn: self.logger.warn(warn) 4721 return 4722
4723 - def load_dependencies(self,project,metaproject):
4724 """ 4725 Load cross-references between projects (1) and depency projects 4726 @param project: Project object 4727 """ 4728 4729 sql = " INSERT IGNORE INTO project_depend " 4730 sql += " (project_id,metaproject_id,dependency_id) " 4731 sql += " VALUES " 4732 pid = project.GetId() 4733 mpid = metaproject.GetId() 4734 cursor = self.getcursor() 4735 4736 for p in project.GetDependencies(): 4737 if not p: continue 4738 self.logger.info("%s - getting project dependency: %s" % \ 4739 (project.GetName(),p.GetName())) 4740 if not metaproject.GetProject(p.GetName()): 4741 self.logger.fatal('failed dependency:%s needs %s' % \ 4742 (project.GetName(),p.GetName()) ) 4743 os._exit(1) 4744 did = metaproject.GetProject(p.GetName()).GetId() 4745 sql2 = sql + "(%s,%s,%s)" % (pid,mpid,did) 4746 try: 4747 cursor.execute (sql2) 4748 self.logger.info( 4749 "%d project_depend rows were inserted" % cursor.rowcount) 4750 except Exception, e: 4751 self.logger.error(e) 4752 return 4753 4754 4755
4756 - def load_projects(self,metaproject):
4757 """ 4758 Load projects to database 4759 @param metaproject: MetaProject object 4760 """ 4761 for proj in metaproject.GetProjectList(): 4762 self.load_project(proj) 4763 self.load_services(proj) 4764 self.load_modules(proj) 4765 return 4766
4767 - def load_project_dependencies(self,metaproject):
4768 """ 4769 Load project dependencies to database 4770 @param metaproject: MetaProject object 4771 """ 4772 for proj in metaproject.GetProjectList(): 4773 self.load_dependencies(proj,metaproject) 4774 return 4775
4776 - def load_metaproject(self,metaproject):
4777 return self.load_project(metaproject,"metaproject") 4778 4779
4780 - def load_project(self,project,table="project"):
4781 """ 4782 Load project to database 4783 @param project: the project to be loaded 4784 @param table: table to which project should be loaded 4785 """ 4786 pid = self.fetch_project_id(project.GetName(),project.GetVersion(),table) 4787 self.logger.debug("%s %s.%s pid is %s" % (table,project.GetName(),project.GetVersion(),pid)) 4788 if not pid: 4789 sql = "INSERT INTO %s " % table 4790 sql += "(name, versiontxt,major_version,minor_version,patch_version) " 4791 sql += " VALUES " 4792 4793 ver = project.GetVersion() 4794 name = project.GetName() 4795 vt = ('00','00','00') 4796 # We still need to fill major,minor,path for back 4797 # compatibility 4798 legacy_ver = self.version_regex.search(ver) 4799 if legacy_ver: 4800 legacy_ver = legacy_ver.group(0).replace('V','') 4801 vt = legacy_ver.split('-') 4802 sql += "('%s','%s','%s','%s','%s')" % (name,ver,vt[0],vt[1],vt[2]) 4803 self.logger.debug(sql) 4804 4805 cursor = self.getcursor() 4806 cursor.execute(sql) 4807 pid = self.insert_id() 4808 if pid: 4809 self.logger.debug("inserted id %d" % pid) 4810 else: 4811 self.logger.error( 4812 "could not load project %s to parameter database" % project.GetName()) 4813 4814 project.SetId(pid) 4815 self.logger.debug("%s.GetId(): %s" % (project.GetName(),project.GetId())) 4816 return pid 4817 4818
4819 - def load_modules(self,project):
4820 """ 4821 Load modules into the database. 4822 @param project: Project object 4823 """ 4824 for module in project.GetModules(): 4825 self.load_module(module,project) 4826 self.load_params(module) 4827 return 4828
4829 - def load_services(self,project):
4830 """ 4831 Load services into the database. 4832 @param project: Project object 4833 """ 4834 for service in project.GetServices(): 4835 self.load_service(service,project) 4836 self.load_params(service) 4837 return 4838 4839
4840 - def load_module(self,module,project,type='module'):
4841 """ 4842 Load individual module to database 4843 @param module: object to load 4844 """ 4845 pid = project.GetId() 4846 mid = self.fetch_module_id(module,pid,type) 4847 if not mid: 4848 sql = " INSERT INTO module " 4849 sql += " (name,class,project_id,module_type) " 4850 sql += " VALUES " 4851 sql += " (\'%s\',\'%s\',%d,'%s') " % (module.GetName(),module.GetClass(),pid,type) 4852 4853 self.logger.debug(sql.strip()) 4854 4855 cursor = self.getcursor() 4856 cursor.execute(sql.strip()) 4857 mid = self.insert_id() 4858 4859 cursor.execute("SHOW WARNINGS") 4860 warn = cursor.fetchone() 4861 if warn: self.logger.warn(warn) 4862 4863 cursor.execute("SHOW ERRORS") 4864 err = cursor.fetchone() 4865 if err: self.logger.error(err) 4866 4867 rowcount = cursor.rowcount 4868 if mid: 4869 self.logger.debug("inserted module id %d" % mid) 4870 else: 4871 self.logger.error("could not fetch id for '%s'" % module.GetClass()) 4872 4873 module.SetId(mid) 4874 return mid 4875
4876 - def load_service(self,service,project):
4877 """ 4878 Load individual service to database 4879 @param service: object to load 4880 """ 4881 return self.load_module(service,project,type='service') 4882
4883 - def load_iceprodmodules(self,module,project):
4884 """ 4885 Load individual service to database 4886 @param module: object to load 4887 """ 4888 return self.load_module(service,project,type='iceprod') 4889 4890
4891 - def load_params(self,module):
4892 """ 4893 Load parameters for module or service to database 4894 @param module: object whose parameter will be loaded 4895 """ 4896 cursor = self.getcursor() 4897 sql = " INSERT IGNORE INTO parameter " 4898 sql += " (name,type,unit,description,module_id,value) " 4899 sql += " VALUES " 4900 sql2 = '' 4901 count = 0 4902 4903 m_id = module.GetId() 4904 self.logger.debug('module_id %d'% m_id) 4905 4906 cm = '' 4907 if not module.GetParameters(): return 4908 for p in module.GetParameters(): 4909 name = p.GetName() 4910 type = p.GetType() 4911 desc = p.GetDescription() 4912 desc = re.sub(r'\"','&quote;',desc) # defang quotes for sql 4913 desc = re.sub(r'\'','&quote;',desc) # defang quotes for sql 4914 4915 if type == 'OMKey' or type in VectorTypes: 4916 value = 0 4917 sql1 = sql + " ('%s','%s','%s','%s',%d,'%s') " % \ 4918 (name,type,'NULL',desc,m_id,value) 4919 cursor.execute (sql1.strip()) 4920 pid = self.insert_id() 4921 cursor.execute ('show warnings') 4922 retval = cursor.fetchall () 4923 if not pid: 4924 sql1a = " SELECT parameter_id FROM parameter " 4925 sql1a += " WHERE name='%s' " % name 4926 sql1a += " AND module_id=%d " % m_id 4927 self.logger.debug(sql1a.strip()) 4928 cursor.execute (sql1a.strip()) 4929 pid = cursor.fetchone () 4930 if pid: pid = pid['parameter_id'] 4931 else: raise Exception,"Failed to insert/fetch parameter id" 4932 4933 count = count + cursor.rowcount 4934 if type == 'OMKey': 4935 self.insert_omkey(p.GetValue(),pid) 4936 elif type == 'OMKeyv': 4937 self.insert_omkey_array(p.GetValue(),pid) 4938 else: 4939 self.insert_array(p.GetValue(),pid) 4940 4941 else: 4942 value = p.GetValue().value 4943 unit = self.nullify(p.GetValue().unit) 4944 sql2 += "%s (\'%s\',\'%s\',%s,\"%s\",%d,\'%s\') " % \ 4945 (cm,name,type,unit,desc,m_id,value) 4946 cm = ',' 4947 4948 if sql2: 4949 sql += sql2 4950 self.logger.debug(sql.strip()) 4951 cursor.execute(sql.strip()) 4952 cursor.execute ('show warnings') 4953 retval = cursor.fetchall () 4954 count = count + cursor.rowcount 4955 self.logger.debug("%d parameter rows were inserted " % count)
4956
4957 - def insert_omkey(self,omkey,pid):
4958 cursor = self.getcursor() 4959 sql = " INSERT INTO array_element (name,value,parameter_id) " 4960 sql += " VALUES " 4961 sql += " ('%s','%s',%s), " % ('stringid',omkey.stringid,pid) 4962 sql += " ('%s','%s',%s) " % ('omid',omkey.omid,pid) 4963 self.logger.debug(sql.strip()) 4964 cursor.execute (sql.strip()) 4965
4966 - def insert_omkey_array(self,omkeyvect,pid):
4967 cursor = self.getcursor() 4968 sql = " INSERT INTO array_element (name,value,parameter_id) " 4969 sql += " VALUES " 4970 cm = "" 4971 if len(omkeyvect) < 1: return 4972 for omkey in omkeyvect: 4973 sql += cm 4974 sql += " ('%s','%s',%s), " % ('stringid',omkey.stringid,pid) 4975 sql += " ('%s','%s',%s) " % ('omid',omkey.omid,pid) 4976 cm = "," 4977 cursor.execute (sql.strip()) 4978
4979 - def insert_array(self,values,pid):
4980 cursor = self.getcursor() 4981 sql = " INSERT INTO array_element (value,unit,parameter_id) " 4982 sformat = lambda x: "('%s','%s',%s)" % (x.value,self.nullify(x.unit),pid) 4983 vals = ",".join(map(sformat,values)) 4984 if len(vals) > 0: 4985 sql += " VALUES " + vals 4986 cursor.execute (sql.strip()) 4987
4988 - def select_array(self,pid):
4989 cursor = self.getcursor() 4990 sql = " SELECT * from array_element " 4991 sql += " WHERE parameter_id = %d " % pid 4992 cursor.execute (sql.strip()) 4993 result_set = cursor.fetchall(); 4994 vect = [] 4995 for item in result_set: 4996 vect.append(Value(item['value'],self.nonify(item['unit']))) 4997 return vect 4998
4999 - def select_omkey(self,pid):
5000 omkeys = self.select_omkey_array(pid) 5001 if len(omkeys) < 1: 5002 raise Exception,'could not find omkey for param %d' % pid 5003 return omkeys[0] 5004
5005 - def select_omkey_array(self,pid):
5006 cursor = self.getcursor()
5007 sql = " SELECT * from array_element "
5008 sql += " WHERE parameter_id = %d order by array_element_id" % pid
5009 cursor.execute (sql.strip())
5010 result_set = cursor.fetchall();
5011 omkeyvect = []
5012 for item in result_set:
5013 if item['name'] == 'stringid':
5014 omkey = pyOMKey(0,0)
5015 omkey.stringid = item['value']
5016 elif item['name'] == 'omid':
5017 omkey.omid = item['value']
5018 omkeyvect.append(omkey)
5019 else:
5020 raise Exception,'expected omkey but found %s' % item['name']
5021 return omkeyvect
5023 -class SoapParamDB:
5024
5025 - def __init__(self,pdb):
5026 self.paramdb = pdb
5027
5028 - def Register(self,parent):
5029 5030 def LoadParams(metaproject,username,passwd): 5031 db = self.paramdb.new() 5032 db.disconnect() 5033 if parent.auth_db(db,username, passwd,keep_open=True): 5034 retval = db.load(loads(metaproject)) 5035 db.disconnect() 5036 logger.info("Uploaded metaproject for %s %s %s" % (username,db.db_,db.host_)) 5037 return retval 5038 else: 5039 logger.error("Failed to authenticate %s on %s@%s with password" % (username,db.db_,db.host_)) 5040 return 0
5041 parent.server.register_function(LoadParams) 5042 5043 def DownloadParams(): 5044 return dumps(self.paramdb.download())
5045 parent.server.register_function(DownloadParams)
5046
5047 def SwitchMetaProject(iconfig,id,name,version):
5048 return dumps(self.paramdb.SwitchMetaProject(loads(iconfig),id,name,loads(version)))
5049 parent.server.register_function(SwitchMetaProject)
5050
5051 def GetMetaProjects():
5052 return dumps(self.paramdb.GetMetaProjects())
5053 parent.server.register_function(GetMetaProjects)
5054
5055 def GetProjectsSM(module,metaproj):
5056 return dumps(self.paramdb.GetProjectsSM(loads(module),loads(metaproj)))
5057 parent.server.register_function(GetProjectsSM)
5058
5059 def GetProjectsMM(module,metaproj):
5060 return dumps(self.paramdb.GetProjectsMM(loads(module),loads(metaproj)))
5061 parent.server.register_function(GetProjectsMM)
5062
5063 def GetProjects(metaproject_id):
5064 return dumps(self.paramdb.GetProjects(metaproject_id))
5065 parent.server.register_function(GetProjects)
5066
5067 def GetProjectDependencies(project_id,metaproject_id):
5068 return dumps(self.paramdb.GetProjectDependencies(project_id,metaproject_id))
5069 parent.server.register_function(GetProjectDependencies)
5070
5071 def GetServices(project_id):
5072 return dumps(self.paramdb.GetServices(project_id))
5073 parent.server.register_function(GetServices)
5074
5075 def GetServicesP(name,version):
5076 return dumps(self.paramdb.GetServicesP(name,loads(version)))
5077 parent.server.register_function(GetServicesP)
5078
5079 def GetModules(project_id):
5080 return dumps(self.paramdb.GetModules(project_id))
5081 parent.server.register_function(GetModules)
5082
5083 def GetModulesP(name,version):
5084 return dumps(self.paramdb.GetModulesP(name,loads(version)))
5085 parent.server.register_function(GetModulesP)
5086
5087 def GetIceProdModules():
5088 return dumps(self.paramdb.GetIceProdModules())
5089 parent.server.register_function(GetIceProdModules)
5090
5091 def GetParameters(module_id):
5092 return dumps(self.paramdb.GetParameters(module_id))
5093 parent.server.register_function(GetParameters)
5094
5095 def fetch_metaproject_id(name, version):
5096 return self.paramdb.fetch_metaproject_id(name,loads(version))
5097 parent.server.register_function(fetch_metaproject_id)
5098
5099 def fetch_metaproject_list():
5100 return dumps(self.paramdb.fetch_metaproject_list())
5101 parent.server.register_function(fetch_metaproject_list)
5102
5103 def fetch_project_list(metaproject_id):
5104 return dumps(self.paramdb.fetch_project_list(metaproject_id))
5105 parent.server.register_function(fetch_project_list)
5106
5107 def fetch_project(id):
5108 return dumps(self.paramdb.fetch_project(id))
5109 parent.server.register_function(fetch_project)
5110
5111 def fetch_project_id(pname,pversion):
5112 return dumps(self.paramdb.fetch_project_id(pname,loads(pversion))) # was 'name', undefined in this scope
5113 parent.server.register_function(fetch_project_id)
5114
5115 def fetch_service_id(service,pid):
5116 return self.paramdb.fetch_service_id(loads(service),pid)
5117 parent.server.register_function(fetch_service_id)
5118
5119 def fetch_module_id(module,mid):
5120 return self.paramdb.fetch_module_id(loads(module),mid)
5121 parent.server.register_function(fetch_module_id)
5122
5123 def fetch_project_dependencies(project_id,metaproject_id):
5124 return dumps(self.paramdb.fetch_project_dependencies(project_id,metaproject_id))
5125 parent.server.register_function(fetch_project_dependencies)
5126
5127 def fetch_modules_from_project_id(project_id):
5128 return dumps(self.paramdb.fetch_modules_from_project_id(project_id))
5129 parent.server.register_function(fetch_modules_from_project_id)
5130
5131 def fetch_modules_for_project(name,version):
5132 return dumps(self.paramdb.fetch_modules_for_project(name,loads(version)))
5133 parent.server.register_function(fetch_modules_for_project)
5134
5135 def fetch_services_for_project(name,version):
5136 return dumps(self.paramdb.fetch_services_for_project(name,loads(version)))
5137 parent.server.register_function(fetch_services_for_project)
5138
5139 def fetch_module_parameters(module_id):
5140 return dumps(self.paramdb.fetch_module_parameters(module_id))
5141 parent.server.register_function(fetch_module_parameters)
5142
5143 def fetch_service_parameters(module_id):
5144 return dumps(self.paramdb.fetch_service_parameters(module_id))
5145 parent.server.register_function(fetch_service_parameters)
5146
5147
5148 if __name__ == '__main__':
5149
5150 from xmlparser import IceTrayXMLParser
5151 from xmlwriter import IceTrayXMLWriter
5152
5153 if len(sys.argv) < 3:
5154 print 'Usage: python db.py <runid> <xml out>'
5155 sys.exit()
5156
5157 # Instantiate the IceTrayConfig
5158 steering = Steering()
5159
5160 i3db = ConfigDB()
5161 passwd = getpass.getpass()
5162 i3db.authenticate('dbs2.icecube.wisc.edu','i3iceprod-uw',passwd,'i3iceprod',True)
5163 runid = int(sys.argv[1])
5164 i3config = i3db.download_config(runid,include_defaults=True,include_description=False)
5165 i3db.disconnect()
5166 writer = IceTrayXMLWriter(i3config)
5167 writer.write_to_file(sys.argv[2])
5168