Package iceprod :: Package server :: Module db
[hide private]
[frames | no frames]

Source Code for Module iceprod.server.db

   1  #!/bin/env python 
   2  # 
   3  """ 
   4   Provides an interface with MySQL job monitoring database  
   5   
   6   copyright  (c) 2008 the icecube collaboration 
   7   
   8   @version: $Revision: $ 
   9   @date: $Date: $ 
  10   @author: Juan Carlos Diaz Velez <juancarlos@icecube.wisc.edu> 
  11  """ 
  12  import re,sys 
  13  import getpass 
  14  import string 
  15  import random 
  16  import iceprod 
  17  import logging 
  18  import types 
  19  import copy 
  20  import math 
  21  import time 
  22  from cPickle import loads,dumps 
  23   
  24  from iceprod.core import logger  
  25  from iceprod.core.dataclasses import * 
  26  from iceprod.core.paramdb  import * 
  27  from iceprod.core.metadata    import * 
  28  from iceprod.core.constants import * 
  29   
  30  import MySQLdb 
  31  from MySQLdb import OperationalError 
  32   
  33  import logging 
  34   
  35   
class IceProdDB:
    """
    Base wrapper around a MySQLdb connection to the IceProd database.

    Stores the connection parameters, tracks whether a connection is open
    and provides small helpers for executing statements and formatting
    values for hand-built SQL.
    """

    logger = logging.getLogger('IceProdDB')

    def __init__(self):
        """
        Constructor. The instance starts disconnected, with auto-commit
        off and the default MySQL port (3306).
        """
        self._conn = None
        self._connected = False
        self._auto = False
        self.host_ = None
        self.usr_ = None
        self.passwd_ = None
        self.db_ = None
        self.port_ = 3306
        self.auth_function = lambda x: None
        self.version_regex = re.compile(r'[A-Z][0-9][0-9]-[0-9][0-9]-[0-9][0-9]')

    def nullify(self, value):
        """
        Format a value as a quoted SQL literal (no escaping is done).
        @param value: value to format
        @return: value wrapped in single quotes, or 'NULL' if it is falsy
        """
        if value:
            return "'%s'" % value
        return 'NULL'

    def nonify(self, value):
        """
        Inverse of nullify for the NULL marker.
        @return: None if value is the string 'NULL', otherwise value
        """
        if value == 'NULL':
            return None
        return value

    def intcast(self, value):
        """
        Cast a value to int.
        @return: 0 for 'NULL' or any falsy value, int(value) otherwise
        """
        if value == 'NULL' or not value:
            return 0
        return int(value)

    def defang(self, txt):
        """
        Replace single and double quotes in txt with '&quot;'.
        """
        return txt.replace("'", "&quot;").replace('"', "&quot;")

    def get(self, name, value):
        """
        Prompt on stdout and read one line from stdin.
        @param name: label shown in the prompt
        @param value: default shown in the prompt
        @return: the stripped input, or value if the input was empty
        """
        sys.stdout.write("%s [%s] : " % (name, value))
        answer = sys.stdin.readline().strip()
        if answer:
            return answer
        return value

    def new(self):
        """
        Create a disconnected copy of this instance that shares its
        connection parameters (including the port).
        """
        newconn = IceProdDB()
        newconn.host_ = self.host_
        newconn.usr_ = self.usr_
        newconn.passwd_ = self.passwd_
        newconn.db_ = self.db_
        # The port used to be dropped here, silently resetting copies to 3306.
        newconn.port_ = self.port_
        newconn._connected = False
        return newconn

    def ping(self):
        """
        Ping server to reactivate connection. If the ping fails with an
        OperationalError a new connection is opened.
        @raise OperationalError: if not connected (after a 50 s delay)
        """
        if not self.isconnected():
            time.sleep(50)
            raise OperationalError("Not connected to database")
        try:
            self._conn.ping()
        except OperationalError as e:
            self.logger.error('%s: will attempt to reconnect.' % str(e))
            self._conn = MySQLdb.connect(self.host_, self.usr_, self.passwd_, self.db_, self.port_)
            self._connected = True

    def getcursor(self):
        """
        Return a DictCursor on the current connection (after a ping).
        @raise OperationalError: if not connected (after a 10 s delay)
        """
        if self.isconnected():
            self.ping()
            return self._conn.cursor(MySQLdb.cursors.DictCursor)
        self.logger.warning('Not connected to database. Attempting to reconnect..')
        self.logger.info('delaying for 10 sec.')
        time.sleep(10)
        raise OperationalError("Not connected to database")

    def commit(self):
        """
        Commit the current transaction unless auto-commit is enabled.
        """
        self.logger.debug("auto-commit set to %s" % self._auto)
        if not self._auto:
            return self._conn.commit()

    def rollback(self):
        """
        Roll back the current transaction.
        """
        # Use the class logger like every other method; the bare name
        # `logger` refers to the imported iceprod.core.logger module.
        self.logger.debug("rolling back transaction")
        return self._conn.rollback()

    def execute(self, cursor, sql, commit=True):
        """
        Execute a statement, retrying up to 10 times on OperationalError.
        @param cursor: cursor to execute on
        @param sql: statement text
        @param commit: commit after a successful execute
        @return: number of affected rows, or 0 if every attempt failed
        """
        for _ in range(10):
            try:
                cursor.execute(sql)
                rowcount = self._conn.affected_rows()
                if commit:
                    self.commit()
                return rowcount
            except OperationalError as e:
                self.logger.error(e)
                sys.excepthook(*sys.exc_info())
        return 0

    def insert_id(self):
        """
        Return the connection's last auto-generated insert id.
        """
        return self._conn.insert_id()

    def SetAuthFunc(self, func):
        """
        Store the callable kept in self.auth_function.
        """
        self.auth_function = func

    def authenticate(self, host, usr, passwd, db, keep_open=False, port=3306):
        """
        Database authentication
        @param host: ip or name of MySQL host
        @param usr: username
        @param passwd: account password
        @param db: name of database
        @param keep_open: don't close connection after authenticating
        @param port: MySQL port
        @return: True if a connection could be opened, False otherwise
        """
        requested = (host, usr, passwd, db, port)
        if (self.host_, self.usr_, self.passwd_, self.db_, self.port_) != requested:
            (self.host_, self.usr_, self.passwd_, self.db_, self.port_) = requested
            # Parameters changed: drop any connection made with the old ones.
            self.disconnect()

        try:
            self.connect()
            if not keep_open:
                self.disconnect()
            return True
        except Exception as e:
            exc_type = sys.exc_info()[0]
            sys.excepthook(*sys.exc_info())
            self.logger.error('%s: %s' % (exc_type, e))
            self.logger.error('Authentication failed: %s@%s' % (usr, host))
            return False

    def authenticate2(self, host, usr, passwd, db, port=3306):
        """
        Simple database authentication test: open and immediately close a
        throw-away connection. The instance's own connection is untouched.
        @param host: ip or name of MySQL host
        @param usr: username
        @param passwd: account password
        @param db: name of database
        @param port: MySQL port
        @return: True if the connection could be opened, False otherwise
        """
        try:
            # Local connection: assigning to self._conn left the instance
            # holding a closed connection while _connected could be True.
            probe = MySQLdb.connect(host, usr, passwd, db, port=port)
            probe.close()
            return True
        except Exception as e:
            exc_type = sys.exc_info()[0]
            sys.excepthook(*sys.exc_info())
            self.logger.error('%s: %s' % (exc_type, e))
            self.logger.error('Authentication failed: %s@%s' % (usr, host))
            return False

    def set_auto(self, autocommit=True):
        """
        Set auto-commit. Failures to apply it on the connection are only
        logged as warnings.
        """
        self._auto = autocommit
        try:
            self._conn.autocommit(self._auto)
        except Exception as e:
            self.logger.warning(e)

    def connect(self):
        """
        Connect to database. An already open connection is reused if it
        answers a ping.
        @raise Exception: whatever MySQLdb.connect raised
        """
        if self.isconnected():
            try:
                self.ping()
                return
            except Exception as e:
                self.logger.error(str(e))
                self._connected = False
        try:
            self._conn = MySQLdb.connect(self.host_, self.usr_, self.passwd_, self.db_, self.port_)
            self._connected = True
        except Exception as e:
            self.logger.debug('%s: %s' % (sys.exc_info()[0], e))
            self.logger.error('Connection failed : %s@%s' % (self.usr_, self.host_))
            sys.excepthook(*sys.exc_info())
            raise

    def disconnect(self):
        """
        Disconnect from database
        """
        if self._connected:
            self._conn.close()
            self._connected = False

    def isconnected(self):
        """
        @return: True if a connection object exists and is flagged as open
        """
        return bool(self._conn and self._connected)

    def mkkey(self, minsize, maxsize):
        """
        Generate random alphanumeric sequence
        @param minsize: minimum length of the key
        @param maxsize: maximum length of the key (inclusive)
        @return: string of ASCII letters and digits
        """
        # string.digits covers 0-9; the old range(0,9) never produced a 9.
        # NOTE(review): random.Random is not a cryptographic source; if the
        # key is used as a secret, switch to random.SystemRandom.
        alphabet = string.ascii_letters + string.digits
        r = random.Random()
        size = r.randint(minsize, maxsize)
        return ''.join(r.choice(alphabet) for _ in range(size))

    def fetch_metaproject_id(self, project_name, project_version):
        """
        Retrieve the metaproject_id for a given metaproject
        @param project_name: name of the metaproject
        @param project_version: version string
        @return: metaproject_id, or -1 if no matching row exists
        """
        # Values are passed as parameters so the driver does the quoting.
        sql = " SELECT metaproject_id FROM metaproject "
        sql += " WHERE name = %s AND versiontxt = %s "

        cursor = self.getcursor()
        cursor.execute(sql, (project_name, project_version))

        result_set = cursor.fetchall()
        if result_set:
            return result_set[0]['metaproject_id']
        self.logger.error('could not find an entry for %s %s' % (project_name, str(project_version)))
        return -1
266 -class ConfigDB(IceProdDB):
267 268 logger = logging.getLogger('ConfigDB') 269
def __init__(self):
    """
    Constructor: initialise the base connection state, then the
    submission defaults (empty submitter/institution, no storage paths,
    empty metaproject cache).
    """
    IceProdDB.__init__(self)
    self.metaproject_dict = {}
    self.submitter = ''
    self.institution = ''
    self.temporary_storage = None
    self.global_storage = None
281
def new(self):
    """
    Create a disconnected ConfigDB that shares this instance's host,
    user, password, database and port.
    """
    clone = ConfigDB()
    for attr in ('host_', 'usr_', 'passwd_', 'db_', 'port_'):
        setattr(clone, attr, getattr(self, attr))
    clone._connected = False
    return clone
294
def SetSubmitter(self, submitter):
    """
    Store the submitter name written to new dataset rows.
    @param submitter: value kept in self.submitter
    """
    self.submitter = submitter
297
def SetInstitution(self, institution):
    """
    Store the institution written to new dataset rows.
    @param institution: value kept in self.institution
    """
    self.institution = institution
300
def SetTempStoragePath(self, path):
    """
    Set the temporary storage path recorded for datasets.
    @param path: value kept in self.temporary_storage
    """
    self.temporary_storage = path
307
def SetGlobalStoragePath(self, path):
    """
    Set the global (data warehouse) path recorded for datasets.
    @param path: value kept in self.global_storage
    """
    self.global_storage = path
314 315
def updateConfig(self, dataset_id, param_dict):
    """
    update icetray configuration to database

    The dataset row is read first; if all submitted jobs completed the
    status is set to 'COMPLETE', otherwise if any job failed it is set to
    'ERRORS'. The status is written into param_dict (the caller's dict is
    modified). All entries of param_dict plus enddate=NOW() are then
    written to the dataset row.

    @param dataset_id: primary key in production database
    @param param_dict: Dictionary of column name -> value to update
    @raise Exception: if the UPDATE fails (after a rollback)
    """
    try:
        cursor = self.getcursor()
        cursor.execute("SELECT * from dataset WHERE dataset_id = %s", (dataset_id,))
        r = cursor.fetchall()[0]

        if r["jobs_completed"] == r['jobs_submitted']:
            param_dict["status"] = 'COMPLETE'  # completed
            # status is a string; the old "%d" raised TypeError here.
            self.logger.info("status %s" % param_dict['status'])
        elif r["jobs_failed"] > 0:
            param_dict["status"] = 'ERRORS'  # some jobs failed
            self.logger.info("status %s" % param_dict['status'])
    except Exception as e:
        # Best effort: a failed lookup must not prevent the update below.
        self.logger.error(" %s could not fetch dataset %s" % (e, dataset_id))

    try:
        keys = list(param_dict.keys())
        # Column names cannot be bound as parameters, values can.
        # NOTE(review): values are now bound rather than pasted into the
        # statement; confirm no caller passes pre-quoted strings or SQL
        # expressions as values.
        sql = "UPDATE dataset SET "
        sql += "".join(" %s=%%s, " % key for key in keys)
        sql += " enddate=NOW() "
        sql += " WHERE dataset_id = %s"
        params = [param_dict[key] for key in keys] + [dataset_id]

        self.logger.debug(sql)
        cursor = self.getcursor()
        cursor.execute(sql, params)
        self.commit()
    except Exception as e:
        self.logger.error(str(e) + " rolling back transaction")
        self._conn.rollback()
        raise Exception(e)
360 361
def upload_config(self, steering, ticket=0, template=0, maxjobs=0):
    """
    icetray configuration to database

    Inserts the dataset row, then the steering data (parameters,
    dependencies, tasks, batch options, externals), then one tray row per
    tray with its files, projects, modules, services and connections, and
    finally the geometry names. On any error the transaction is rolled
    back and the error re-raised.

    @param steering: IceTrayConfig object containing configuration
    @param ticket: optional ticket ID to relate dataset to
    @param template: Whether this is a template or not
    @param maxjobs: number of jobs recorded as jobs_submitted
    @return: primary key for run on config db
    @raise Exception: if any insert fails
    """
    dataset_id = None

    try:
        debug = steering.GetParameter('DEBUG')
        geo = steering.GetParameter('geometry')
        if debug:
            debug = int(debug.GetValue())
        else:
            debug = 0

        simcat_id = self.loadsimcat(steering)
        parent_id = steering.GetParentId()
        status = 'PROCESSING'
        if template:
            status = 'TEMPLATE'

        # Values are bound as parameters. The previous hand-rolled escaping
        # turned every ' in the description into \' followed by a space.
        sql = """INSERT IGNORE INTO dataset
                 (
                   simcat_id,
                   startdate,
                   username,
                   institution,
                   description,
                   status,
                   temporary_storage,
                   global_storage,
                   jobs_submitted ,
                   ticket_number,
                   parent_id,
                   debug,
                   dataset_category
                 )
                 VALUES
                 (%s, NOW(), %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
        params = (
            simcat_id,
            self.submitter,
            self.institution,
            steering.GetDescription(),
            status,
            self.temporary_storage,
            self.global_storage,
            int(maxjobs),
            ticket,
            parent_id,
            debug,
            steering.GetDatasetType(),
        )

        sql = re.sub(r'\s+', ' ', sql)
        cursor = self.getcursor()
        cursor.execute(sql, params)
        dataset_id = self.insert_id()

        self.load_steering(dataset_id, steering)
        self.load_steering_dependencies(dataset_id, steering)
        self.load_job_dependencies(dataset_id, steering)
        self.load_tasks(dataset_id, steering)
        self.load_batch_options(dataset_id, steering)
        self.load_externals(dataset_id, steering)

        tsql = " INSERT INTO tray "
        tsql += " (dataset_id,tray_index,inputevents,iterations,name,python) "
        tsql += " VALUES (%s, %s, %s, %s, %s, %s)"
        fsql = " INSERT INTO tray_files"
        fsql += " (tray_id, type, name, photonics)"
        fsql += " VALUES (%s, %s, %s, %s)"
        for tray_index, i3config in enumerate(steering.GetTrays()):
            # do inserts one at a time so we can get insert IDs as we go
            tray_params = (
                dataset_id,
                tray_index,
                int(i3config.GetEvents()),
                int(i3config.GetIterations()),
                i3config.GetName(),
                # the literal string 'NULL' marks "no python" (see nonify)
                i3config.GetPython() or 'NULL',
            )
            cursor.execute(tsql, tray_params)
            tray_id = self._conn.insert_id()

            file_rows = []
            for kind, getter in (('input', i3config.GetInputFiles),
                                 ('output', i3config.GetOutputFiles)):
                for tray_file in getter():
                    file_rows.append((tray_id, kind, tray_file.GetName(),
                                      int(tray_file.IsPhotonicsTable())))
            if file_rows:
                cursor.executemany(fsql, file_rows)

            self.load_projects(dataset_id, i3config, tray_index)
            self.load_pre(dataset_id, i3config, tray_index)
            self.load_services(dataset_id, i3config, tray_index)
            self.load_modules(dataset_id, i3config, tray_index)
            self.load_connections(dataset_id, i3config, tray_index)
            self.load_post(dataset_id, i3config, tray_index)

        if geo:
            # geometry value is a list separated by '+', 'and' or ','
            geo_unique = []
            names = geo.GetValue().replace('+', ' ').replace('and', ' ').replace(',', ' ').split()
            for g in names:
                if g and g not in geo_unique:
                    geo_unique.append(g)
                    self.load_geometry(g, dataset_id)

    except Exception as e:
        sys.excepthook(*sys.exc_info())
        self.logger.error(str(e) + " rolling back transaction")
        if self._conn is not None:
            self._conn.rollback()
        raise Exception(e)

    # Commit changes
    self.commit()
    self.logger.debug("Successful load configuration to database")

    return dataset_id
def SetStorageURL(self, dataset, steering):
    """
    Set output path for job

    Every steering parameter whose name starts with "TARGET::" is stored
    in urlpath under the name with that prefix removed. Nothing is
    committed here.

    @param dataset: dataset id
    @param steering: steering object providing GetParameters()
    """
    cursor = self.getcursor()
    prefix = "TARGET::"
    rows = [(dataset, p.GetName().replace(prefix, ""), p.GetValue())
            for p in steering.GetParameters()
            if p.GetName().startswith(prefix)]
    if not rows:
        return
    # Bound parameters instead of pasting names and paths into the SQL.
    sql = " INSERT IGNORE INTO urlpath "
    sql += " (dataset_id, name, path) "
    sql += " VALUES (%s, %s, %s)"
    cursor.executemany(sql, rows)
520 521 522
def get_simcat_categories(self):
    """
    Return the category column of every simcat row as a list.
    """
    cursor = self.getcursor()
    cursor.execute(" SELECT category FROM simcat ")
    return [row['category'] for row in cursor.fetchall()]
531 532
def getNewSets(self):
    """
    Fetch dataset_id and jobs_submitted for every dataset whose status
    is 'PROCESSING' and which is marked verified.
    @return: all matching rows
    """
    query = "".join((
        " SELECT dataset_id,jobs_submitted FROM dataset ",
        " WHERE status='PROCESSING' ",
        " AND verified='TRUE' ",
    ))
    cursor = self.getcursor()
    self.logger.debug(query)
    cursor.execute(query)
    self.commit()
    return cursor.fetchall()
545
def load_datasetparams(self, paramdict, dataset):
    """
    Store name/value pairs for a dataset in dataset_param. If a
    'geometry' entry is present it is also copied to dataset.geometry.
    Each write is committed.

    @param paramdict: dictionary of parameter name -> value
    @param dataset: dataset id
    """
    self.logger.debug("loading dataset parameters")
    cursor = self.getcursor()

    rows = [(dataset, name, value) for name, value in paramdict.items()]
    # An empty dict used to produce "INSERT ... VALUES" with no rows.
    if rows:
        sql = " INSERT INTO dataset_param "
        sql += " (dataset_id,name,value) "
        sql += " VALUES (%s,%s,%s)"
        cursor.executemany(sql, rows)
        self.commit()

    if 'geometry' in paramdict:
        self.logger.debug("setting geometry")
        sql = " UPDATE dataset SET geometry = %s "
        sql += " WHERE dataset_id = %s "
        cursor.execute(sql, (paramdict['geometry'], dataset))
        self.commit()
def load_geometry(self, geo, dataset):
    """
    Record a geometry name for a dataset (INSERT IGNORE, no commit).
    @param geo: geometry name
    @param dataset: dataset id
    """
    self.logger.debug("loading geometry information")
    cursor = self.getcursor()
    sql = " INSERT IGNORE INTO geometry "
    sql += " (dataset_id,name) "
    sql += " VALUES (%s,%s) "
    self.logger.debug("%s %s" % (sql, (dataset, geo)))
    cursor.execute(sql, (dataset, geo))
def loadsimcat(self, steering):
    """
    Load simulation category: look up the category named by the
    steering object and insert it if it does not exist yet.
    @param steering: object providing GetCategory()
    @return: index of category
    """
    self.logger.debug("loading simulation category")
    category = steering.GetCategory()

    cursor = self.getcursor()
    cursor.execute("SELECT simcat_id from simcat WHERE category = %s", (category,))
    result_set = cursor.fetchall()
    if len(result_set) > 0:
        return result_set[0]['simcat_id']

    cursor = self.getcursor()
    cursor.execute("INSERT IGNORE INTO simcat (category) VALUES (%s)", (category,))
    return self.insert_id()
def load_externals(self, dataset_id, steering):
    """
    Load external programs to run prior to icetray
    @param dataset_id: primary key for run on config db
    @param steering: object providing GetExterns()
    """
    self.logger.debug("loading externals")
    externs = steering.GetExterns()
    if not externs:
        return

    sql = "INSERT IGNORE INTO extern ("
    sql += " name,command,version,description,arguments, "
    sql += " extern.infile,extern.outfile, extern.errfile, "
    sql += " steering,steering_name, dataset_id) "
    sql += " VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"

    rows = []
    for e in externs:
        ext_steering = e.GetSteering()
        if ext_steering:
            steer_text = ext_steering[0].GetText()
            steer_name = ext_steering[0].GetName()
        else:
            steer_text = steer_name = None
        rows.append((
            e.GetName(),
            e.GetExec(),
            # falsy optional fields are stored as NULL (as nullify did)
            e.GetVersion() or None,
            e.GetDescription() or None,
            e.GetArgs() or None,
            e.GetInFile() or None,
            e.GetOutFile() or None,
            e.GetErrFile() or None,
            steer_text,
            steer_name,
            dataset_id,
        ))

    sql = re.sub(r'\s+', ' ', sql)
    cursor = self.getcursor()
    self.logger.debug(sql)
    # Bound parameters: the old statement pasted unescaped text in quotes.
    cursor.executemany(sql, rows)
    return
def load_projects(self, dataset_id, i3config, tray_index):
    """
    Register every metaproject of a tray (and, through
    load_metaproject, its projects) for the dataset.
    @param dataset_id: primary key for run on config db
    @param i3config: tray configuration providing GetMetaProjectList()
    @param tray_index: index of the tray within the dataset
    """
    self.logger.debug("loading projects")

    # metaprojects are numbered in the order the configuration lists them
    for position, metaproject in enumerate(i3config.GetMetaProjectList()):
        self.load_metaproject(metaproject, dataset_id, tray_index, position)
def load_steering(self, dataset_id, steering):
    """
    Load steering parameters to database
    @param dataset_id: primary key for run on config db
    @param steering: object providing GetParameters()
    """
    rows = [(p.GetType(), p.GetName(), p.GetValue(), p.GetDescription(), dataset_id)
            for p in steering.GetParameters()]
    # No parameters used to yield "INSERT ... VALUES" with nothing after it.
    if not rows:
        return

    sql = """INSERT IGNORE INTO steering_parameter
             (type, name, value, description,dataset_id)
             VALUES (%s,%s,%s,%s,%s)"""
    sql = re.sub(r'\s+', ' ', sql)
    cursor = self.getcursor()
    # Bound parameters: values containing quotes no longer break the SQL.
    cursor.executemany(sql, rows)
def load_steering_dependencies(self, dataset_id, steering):
    """
    Load file dependencies in steering element to database
    @param dataset_id: primary key for run on config db
    @param steering: object providing GetDependencies(); each entry is
                     either a plain file name or an object with GetName()
    """
    dependencies = steering.GetDependencies()
    if not dependencies:
        return

    rows = []
    for dep in dependencies:
        # plain strings are used as-is, dependency objects by their name
        filename = dep.GetName() if hasattr(dep, 'GetName') else dep
        rows.append((filename, dataset_id))

    sql = """INSERT IGNORE INTO steering_dependency
             (filename, dataset_id) VALUES (%s,%s)"""
    sql = re.sub(r'\s+', ' ', sql)
    cursor = self.getcursor()
    cursor.executemany(sql, rows)
    return
def load_steering_statistics(self, dataset_id, steering):
    """
    Load statistics to track in monitoring. Each statistic name is
    inserted with an initial value of 0.0.
    @param dataset_id: primary key for run on config db
    @param steering: object providing GetStatistics()
    """
    # NOTE(review): dataset_id is not written to dataset_statistics; the
    # original statement did not include it either - confirm the schema.
    names = steering.GetStatistics()
    if not names:
        return
    cursor = self.getcursor()
    # The placeholder must not be quoted: the driver adds the quotes, and
    # the old '%s' produced doubled quotes around every name.
    sql = "INSERT IGNORE INTO dataset_statistics (name, value) VALUES (%s,0.0) "

    stats = [(name,) for name in names]
    cursor.executemany(sql, stats)
    return
def load_job_dependencies(self, dataset_id, steering):
    """
    Store the job dependencies declared in the steering element: one
    job_dependency row per entry, linking this dataset to the entry's
    input dataset and job.
    @param dataset_id: primary key for run on config db
    @param steering: object providing GetJobDependencies()
    """
    if not steering.GetJobDependencies():
        return

    cursor = self.getcursor()
    sql = 'INSERT IGNORE INTO job_dependency (dataset_id,input_dataset,input_job) VALUES (%s,%s,%s)'

    rows = [(dataset_id, dep.dataset, dep.job)
            for dep in steering.GetJobDependencies()]
    for row in rows:
        self.logger.debug(sql % row)
    cursor.executemany(sql, rows)
    return
def load_tasks(self, dataset_id, steering):
    """
    Load tasks in steering element to database, used for
    multi-part simulation jobs

    All task definitions are inserted first; then, per task, its parent
    relationships and the (tray index, iteration) pairs it runs. If a
    task or a referenced parent cannot be found again the transaction is
    rolled back and the method returns without committing.

    @param dataset_id: primary key for run on config db
    @param steering: object providing GetTaskDefinitions()
    """
    tasks = steering.GetTaskDefinitions()
    if not tasks:
        return

    # insert all the tasks first
    sql = "INSERT INTO task_def (dataset_id,name,reqs,opts,parallel,photonics) VALUES (%s,%s,%s,%s,%s,%s)"
    inserts = []
    for name, task in tasks.items():
        reqs = task.GetRequirements()
        opts = task.GetBatchOpts()
        parallel = int(task.ParallelExecutionEnabled())
        photonics = int(task.UsesPhotonics())
        self.logger.debug(sql % (dataset_id, name, reqs, opts, parallel, photonics))
        self.logger.debug("task %s added to DB, parallel: %s, photonics: %s, reqs: %s"
                          % (name, parallel, photonics, reqs))
        inserts.append((dataset_id, name, reqs, opts, parallel, photonics))
    self.logger.debug(inserts)
    cursor = self.getcursor()
    cursor.executemany(sql, inserts)
    self.logger.debug("task definitions added")

    # now set up the relationships and trays each task runs
    relationship_sql = "INSERT INTO task_def_rel" \
                       + " (parent_task_def_id,child_task_def_id)" \
                       + " VALUES (%s, %s)"
    tray_sql = "INSERT INTO task_def_tray (task_def_id,idx,iter)" \
               + " VALUES (%s,%s,%s)"
    id_sql = "SELECT task_def_id FROM task_def WHERE dataset_id = %s" \
             + " AND name = %s"
    for name, task in tasks.items():
        cursor.execute(id_sql, (dataset_id, name))
        row = cursor.fetchone()
        if not row:
            self.logger.error("task %s didn't get inserted into DB" % name)
            # Cursors have no rollback(); roll back on the connection, as
            # the parent-lookup branch below does.
            self.rollback()
            return
        task_id = row['task_def_id']

        # add parents
        inserts = []
        for parent in task.GetParents():
            self.logger.debug("task %s has parent %s" % (name, parent))
            cursor.execute(id_sql, (dataset_id, parent))
            row = cursor.fetchone()
            if not row:
                self.logger.error("referenced parent task %s not found in DB" % parent)
                self.rollback()
                return
            inserts.append((row['task_def_id'], task_id))
        if inserts:
            cursor.executemany(relationship_sql, inserts)

        # add trays
        inserts = []
        for index, tray in task.GetTrays().items():
            for iteration in tray.GetIters():
                self.logger.debug("task %s has tray %s iter %s"
                                  % (name, index, iteration))
                inserts.append((task_id, index, iteration))
        if inserts:
            cursor.executemany(tray_sql, inserts)
    self.logger.debug("added all tasks")
    self.commit()
    return
824
def load_batch_options(self, dataset_id, steering):
    """
    Load batch system options from steering to database
    @param dataset_id: primary key for run on config db
    @param steering: object providing GetBatchOpts()
    """
    batchopts = steering.GetBatchOpts()
    if not batchopts:
        return

    sql = """INSERT IGNORE INTO batch_option
             (name, type, value, dataset_id) VALUES (%s,%s,%s,%s)"""
    rows = [(b.GetName(), b.GetType(), b.GetValue(), dataset_id)
            for b in batchopts]

    sql = re.sub(r'\s+', ' ', sql)
    cursor = self.getcursor()
    # Bound parameters: option values may contain quotes.
    cursor.executemany(sql, rows)
def load_project(self, project, dataset_id, metaproject_id, tray_index, load_index):
    """
    Load project to database
    @param project: the Project object to be loaded
    @param dataset_id: primary key for run on config db
    @param metaproject_id: id of the metaproject the project belongs to
    @param tray_index: index of the tray within the dataset
    @param load_index: position of the project within its metaproject
    @return: primary key for projects table on config db
    @raise Exception: if the project version is not a string
    """
    cursor = self.getcursor()
    pid = self.fetch_project_id(project.GetName(), project.GetVersion())
    self.logger.debug("%s.%s pid is %s" % (project.GetName(), project.GetVersion(), pid))
    if not pid:
        ver = project.GetVersion()
        name = project.GetName()
        # types.StringTypes only exists on Python 2; fall back to str.
        string_types = getattr(types, 'StringTypes', (str,))
        if not isinstance(ver, string_types):
            # was type(version): an undefined name, i.e. a NameError
            raise Exception("incompatible version type: %s" % type(ver))

        # We still need to fill major,minor,patch for back compatibility
        vt = ('00', '00', '00')
        legacy_ver = self.version_regex.search(ver)
        if legacy_ver:
            vt = legacy_ver.group(0).replace('V', '').split('-')

        sql = " INSERT IGNORE INTO project "
        sql += "(name, versiontxt,major_version,minor_version,patch_version) "
        sql += " VALUES (%s,%s,%s,%s,%s)"
        self.logger.debug(sql)
        cursor.execute(sql.strip(), (name, ver, vt[0], vt[1], vt[2]))
        pid = self.insert_id()
        self.logger.debug("After insert: %s.%s pid is %s" % (project.GetName(), project.GetVersion(), pid))
        if cursor.rowcount:
            self.logger.debug("inserted id %d" % pid)
        else:
            self.logger.warning("could not load project %s " % name)

        # NOTE(review): block structure was reconstructed from a flattened
        # listing; confirm the mp_pivot insert belongs inside this branch.
        sql = " INSERT IGNORE INTO mp_pivot "
        sql += " (project_id, metaproject_id) VALUES (%s,%s)"
        cursor.execute(sql.strip(), (pid, metaproject_id))

    project.SetId(pid)
    sql = " INSERT INTO project_pivot "
    sql += " (project_id, dataset_id,tray_index,load_index) "
    sql += " VALUES (%s,%s,%s,%s)"
    cursor.execute(sql.strip(), (pid, dataset_id, tray_index, load_index))
    return pid
def fetch_project_id(self, project_name, project_version):
    """
    Retrieve the project_id for a given project
    @param project_name: name of library
    @param project_version: version string
    @return: project_id of the first matching row, or None if there is none
    """
    sql = " SELECT project_id "
    sql += " FROM project "
    sql += " WHERE name = %s "
    sql += " AND versiontxt = %s "

    cursor = self.getcursor()
    # Bound parameters instead of pasting the values into the statement.
    cursor.execute(sql.strip(), (project_name, project_version))

    result_set = cursor.fetchall()
    if result_set:
        return result_set[0]['project_id']
    return None
916 917 918
def load_project_dependencies(self, project, metaproject_id, i3config):
    """
    Record which projects a project depends on. Dependencies that the
    configuration does not know (GetProject returns nothing) are skipped;
    if none remain, nothing is written.
    @param project: Project object
    @param metaproject_id: id of the enclosing metaproject
    @param i3config: configuration used to resolve dependency names
    """
    pid = project.GetId()
    value_rows = []
    for dep_name in project.GetDependencies():
        self.logger.debug("%s - getting project dependency: %s" %
                          (project.GetName(), dep_name))
        dep = i3config.GetProject(dep_name)
        if dep:
            value_rows.append("\n(%s,%s,%s)" % (pid, metaproject_id, dep.GetId()))

    if not value_rows:
        return

    sql = """
        INSERT IGNORE INTO project_depend
        (project_id,metaproject_id,dependency_id) VALUES
        """ + ",".join(value_rows)
    sql = re.sub(r'\s+', ' ', sql)
    cursor = self.getcursor()
    cursor.execute(sql)

    self.logger.debug(self.insert_id())
    self.logger.debug(
        "%d project_dependency rows were inserted" % cursor.rowcount)
    return
def load_metaproject(self, metaproject, dataset_id, tray_index, load_index):
    """
    Load metaproject to database: link an existing metaproject to the
    dataset/tray and load each of its projects.
    @param metaproject: the metaproject object to be loaded
    @param dataset_id: primary key for run on config db
    @param tray_index: index of the tray within the dataset
    @param load_index: position of the metaproject within the tray
    @return: primary key of the metaproject
    @raise Exception: if the metaproject is not registered in the database
    """
    name = metaproject.GetName()
    version = metaproject.GetVersion()
    mpid = self.fetch_metaproject_id(name, version)

    # fetch_metaproject_id signals "not found" with -1, which is truthy,
    # so a plain "if not mpid" never caught the missing case.
    if not mpid or mpid < 0:
        raise Exception("metaproject '%s-%s' not found." % (name, str(version)))

    sql = " INSERT IGNORE INTO metaproject_pivot "
    sql += " (metaproject_id, dataset_id,tray_index,load_index) "
    sql += " VALUES (%s,%s,%s,%s)"

    sql = re.sub(r'\s+', ' ', sql)
    cursor = self.getcursor()
    cursor.execute(sql, (mpid, dataset_id, tray_index, load_index))
    metaproject.SetId(mpid)

    for project_load_index, proj in enumerate(metaproject.GetProjectList()):
        self.load_project(proj, dataset_id, mpid, tray_index, project_load_index)
        self.logger.debug("%s.%s.GetId() = %s" % (proj.GetName(), proj.GetVersion(), proj.GetId()))

    return mpid
def load_dependencies(self, module, module_id, i3config):
    """
    Record which projects a module depends on. Projects that the
    configuration cannot resolve are logged as errors and skipped; if
    none resolve, nothing is written.
    @param module: Module object
    @param module_id: id of the module row
    @param i3config: configuration used to resolve project names
    """
    value_rows = []
    for dep in module.GetProjectList():
        project = i3config.GetProject(dep.GetName())
        if not project:
            self.logger.error("project %s not found" % dep.GetName())
            continue
        self.logger.debug("%s - getting module dependency: %s" %
                          (module.GetName(), project.GetName()))
        value_rows.append("\n(%d,%d)" % (module_id, project.GetId()))

    if not value_rows:
        return

    sql = """
        INSERT IGNORE INTO module_dependency
        (module_id,project_id) VALUES
        """ + ",".join(value_rows)

    self.logger.debug(sql.strip())
    cursor = self.getcursor()
    cursor.execute(sql)

    self.logger.debug(self.insert_id())
    self.logger.debug("%d module_dependency rows were inserted" % cursor.rowcount)
    return
def load_connections(self, dataset_id, i3config, tray_index):
    """
    Store every module connection of a tray via load_connection.
    @param dataset_id: primary key for run on config db
    @param i3config: tray configuration providing GetConnections()
    @param tray_index: index of the tray within the dataset
    """
    for connection in i3config.GetConnections():
        self.load_connection(connection, dataset_id, tray_index)
def load_connection(self, connection, dataset_id, tray_index):
    """
    Load connection to database
    @param connection: the Connection object to be loaded
    @param dataset_id: primary key for run on config db
    @param tray_index: index of the tray within the dataset
    @return: insert id reported by the connection after the insert
    """
    sql = """INSERT IGNORE INTO `connection`
             (source, outbox, destination, inbox,
             dataset_id,tray_index) VALUES (%s,%s,%s,%s,%s,%s)"""
    sql = re.sub(r'\s+', ' ', sql)

    outbox = connection.GetOutbox()
    inbox = connection.GetInbox()
    params = (outbox.GetModule(), outbox.GetBoxName(),
              inbox.GetModule(), inbox.GetBoxName(),
              dataset_id, tray_index)

    cursor = self.getcursor()
    # Bound parameters: module and box names are free text.
    cursor.execute(sql, params)
    cid = self.insert_id()
    if cursor.rowcount:
        self.logger.debug("inserted id %d into connections table" % cid)
        for mesg in cursor.messages:
            self.logger.debug("connections: %s " % mesg)
    else:
        # nothing inserted: report driver messages as errors
        for mesg in cursor.messages:
            self.logger.error("connections: %s " % mesg)
    return cid
def load_pre(self, dataset_id, i3config, tray_index):
    """
    Store the IceProd 'pre' modules of a tray, numbered in listing
    order, together with their parameters.
    @param dataset_id: primary key for run on config db
    @param i3config: tray configuration providing GetIceProdPres()
    @param tray_index: index of the tray within the dataset
    """
    for position, pre_module in enumerate(i3config.GetIceProdPres()):
        self.load_module(pre_module, dataset_id, position, tray_index, i3config, 'iceprod', 'pre')
        self.load_params(pre_module, dataset_id, tray_index)
def load_post(self, dataset_id, i3config, tray_index):
    """
    Store the IceProd 'post' modules of a tray, numbered in listing
    order, together with their parameters.
    @param dataset_id: primary key for run on config db
    @param i3config: tray configuration providing GetIceProdPosts()
    @param tray_index: index of the tray within the dataset
    """
    for position, post_module in enumerate(i3config.GetIceProdPosts()):
        self.load_module(post_module, dataset_id, position, tray_index, i3config, 'iceprod', 'post')
        self.load_params(post_module, dataset_id, tray_index)
def load_modules(self, dataset_id, i3config, tray_index):
    """
    Store the modules of a tray, numbered in listing order, together
    with their parameters.
    @param dataset_id: primary key for run on config db
    @param i3config: tray configuration providing GetModules()
    @param tray_index: index of the tray within the dataset
    """
    for position, module in enumerate(i3config.GetModules()):
        self.load_module(module, dataset_id, position, tray_index, i3config)
        self.load_params(module, dataset_id, tray_index)
def load_services(self, dataset_id, i3config, tray_index):
    """
    Store the services of a tray, numbered in listing order, together
    with their parameters.
    @param dataset_id: primary key for run on config db
    @param i3config: tray configuration providing GetServices()
    @param tray_index: index of the tray within the dataset
    """
    for position, service in enumerate(i3config.GetServices()):
        self.load_service(service, dataset_id, position, tray_index, i3config)
        self.load_params(service, dataset_id, tray_index)
def load_module(self,module,dataset_id,load_index,tray_index,i3config,type='module',iptype='tray'):
    """
    Load individual module into the database given a run ID.

    Values are handed to the driver as query parameters instead of being
    pasted into the SQL text, so names containing quotes are stored
    correctly.

    @param module: the module to be loaded
    @param dataset_id: primary key for run on config db
    @param load_index: order in which module should be loaded
    @param tray_index: tray instance to add module to
    @param i3config: Steering instance
    @param type: module,service,iceprod
    @param iptype: one of tray,pre,post. Serves to distinguish pre and post modules
    @return: primary key of the new module_pivot row (also set on the module)
    @raise Exception: if a non-iceprod module has no parent project
    """
    cursor = self.getcursor()
    if type == 'iceprod':
        # IceProd's own modules are filed under a project named 'iceprod'
        pname = 'iceprod'
        pver = iceprod.__version__
        pid = self.fetch_project_id(pname,pver)
        if not pid:
            # Use 00-00-00 unless the version text carries an Xnn-nn-nn tag
            vt = ('00','00','00')
            legacy_ver = self.version_regex.search(pver)
            if legacy_ver:
                vt = legacy_ver.group(0).replace('V','').split('-')

            sql = " INSERT INTO project "
            sql += " (name, versiontxt,major_version,minor_version,patch_version) "
            sql += " VALUES (%s,%s,%s,%s,%s)"
            cursor.execute(sql, (pname,pver,vt[0],vt[1],vt[2]))
            pid = self.insert_id()

        self.logger.debug("load_module: %s " % pid)
    else:
        if not module.GetProjectList():
            self.logger.error("module %s doesn't have project attrbute" % module.GetName())
            raise Exception("module %s is missing parent project"% module.GetName())
        # The first listed project is treated as the parent project
        project = module.GetProjectList()[0]
        project = i3config.GetProject(project.GetName())
        pid = project.GetId()
        self.logger.debug("load_module: %s " % pid)

    self.logger.debug('fectching %s module for project id %s' % (type,pid))
    mid = self.fetch_module_id(module,pid,type)
    self.logger.debug('fectched %s module with id %s' % (type,mid))
    if not mid:
        sql = " INSERT INTO module "
        sql += " (name,class,module_type,project_id) "
        sql += " VALUES (%s,%s,%s,%s) "
        params = (module.GetName(),module.GetClass(),type,int(pid))

        self.logger.debug("%s %s" % (sql.strip(),params))
        cursor.execute(sql.strip(),params)
        mid = self.insert_id()
        if cursor.rowcount:
            self.logger.debug("inserted %s id %d" % (type,mid))
        else:
            self.logger.debug("failed to insert %s id %d" % (type,mid))
        self.load_dependencies(module,mid,i3config)

    # Every call adds a pivot row tying the module to this dataset/tray
    sql = " INSERT INTO module_pivot "
    sql += " (module_id, name, dataset_id,tray_index,load_index,iptype) "
    sql += " VALUES (%s,%s,%s,%s,%s,%s) "
    params = (int(mid),module.GetName(),int(dataset_id),
              int(tray_index),int(load_index),iptype)
    self.logger.debug("%s %s" % (sql.strip(),params))
    cursor.execute(sql.strip(),params)
    mpid = self.insert_id()
    module.SetId(mpid)
    return mpid
def fetch_module_id(self,module,project_id,type='module'):
    """
    Retrieve id of the module row matching the class of the given
    module, the module type and the parent project
    (there should only be one).

    The values are passed to the driver as query parameters.

    @param module: module to query (its GetClass() value is matched)
    @param project_id: primary key of parent project
    @param type: ('module'|'service')
    @return: module_id as int, or None if no row matches
    """
    sql = " SELECT module_id FROM module "
    sql += " WHERE class = %s "
    sql += " AND module_type = %s "
    sql += " AND project_id = %s "
    params = (module.GetClass(),type,project_id)

    self.logger.debug("%s %s" % (sql.strip(),params))
    cursor = self.getcursor()
    cursor.execute(sql.strip(),params)
    result = cursor.fetchone()
    self.logger.debug(str(result))
    if result:
        return int(result['module_id'])
    self.logger.warn("module \'%s\' not found" % module.GetClass())
    return None
def fetch_service_id(self,service,project_id):
    """
    Look up the id of a service; delegates to fetch_module_id with the
    module type fixed to 'service'.

    @param service: service to query
    @param project_id: primary key of parent project
    @return: whatever fetch_module_id returns for the service
    """
    service_id = self.fetch_module_id(service, project_id, 'service')
    return service_id
def load_service(self,service,dataset_id,load_index,tray_index,i3config):
    """
    Load an individual service; delegates to load_module with the
    type fixed to 'service'.

    @param service: the Service object to be loaded
    @param dataset_id: primary key for run on config db
    @param load_index: order in which the service should be loaded
    @param tray_index: tray instance to add the service to
    @param i3config: configuration passed through to load_module
    @return: whatever load_module returns for the service
    """
    pivot_id = self.load_module(service, dataset_id, load_index,
                                tray_index, i3config, type='service')
    return pivot_id
def insert_omkey(self,omkey,pid):
    """
    Store one OMKey as two carray_element rows ('stringid' and 'omid').

    @param omkey: object exposing stringid and omid attributes
    @param pid: configured parameter id or cparameter_id
    """
    cursor = self.getcursor()
    # One row per OMKey component, both tied to the same parameter id
    string_row = " ('%s','%s',%s), " % ('stringid', omkey.stringid, pid)
    om_row = " ('%s','%s',%s) " % ('omid', omkey.omid, pid)
    statement = "".join([
        " INSERT INTO carray_element (name,value,cparameter_id) ",
        " VALUES ",
        string_row,
        om_row,
    ])
    cursor.execute(statement.strip())
def insert_omkey_array(self,omkeyvect,pid):
    """
    Store a list of OMKeys with a single INSERT; each key contributes a
    'stringid' row followed by an 'omid' row. Nothing is done for an
    empty list.

    @param omkeyvect: list of OMKeys
    @param pid: configured parameter id or cparameter_id
    """
    if len(omkeyvect) <= 0:
        return
    cursor = self.getcursor()
    pairs = []
    for key in omkeyvect:
        pairs.append(" ('%s','%s',%s), " % ('stringid', key.stringid, pid)
                     + " ('%s','%s',%s) " % ('omid', key.omid, pid))
    statement = (" INSERT INTO carray_element (name,value,cparameter_id) "
                 + " VALUES "
                 + ",".join(pairs))
    cursor.execute(statement.strip())
def insert_array(self,values,pid):
    """
    Store the elements of a value array with a single INSERT into
    carray_element. Nothing is inserted for an empty list.

    @param values: list of array elements (each exposing value and unit)
    @param pid: configured parameter id or cparameter_id
    """
    cursor = self.getcursor()
    if len(values) <= 0:
        return
    # nullify() turns an empty unit into the SQL keyword NULL
    rows = ["('%s',%s,%s)" % (element.value, self.nullify(element.unit), pid)
            for element in values]
    statement = (" INSERT INTO carray_element (value,unit,cparameter_id) "
                 + " VALUES " + ",".join(rows))
    cursor.execute(statement.strip())
def load_params(self,module,dataset_id,tray_index):
    """
    Load the parameters of a module into the cparameter table.

    One row is inserted per parameter. OMKey and vector parameters get
    a placeholder value of 0 in cparameter and their elements are
    stored through insert_omkey / insert_omkey_array / insert_array.
    Values are handed to the driver as query parameters so that quotes
    in names or values cannot break the statement.

    @param module: whose parameters are to be loaded to database
    @param dataset_id: primary key for run on config db
    @param tray_index: index of the tray the module belongs to
    """
    cursor = self.getcursor()
    sql = " INSERT INTO cparameter "
    sql += " (name,type,unit,module_pivot_id,dataset_id,tray_index,value) "
    sql += " VALUES (%s,%s,%s,%s,%s,%s,%s) "
    sql = sql.strip()
    count = 0

    m_id = module.GetId()
    self.logger.debug('load_params: mid = %s' % m_id)

    parameters = module.GetParameters()
    if not parameters:
        return
    for p in parameters:
        name = p.GetName()
        ptype = p.GetType()

        if ptype == 'OMKey' or ptype in VectorTypes:
            # elements live in carray_element; keep a placeholder here
            value = 0
            unit = None
        else:
            value = p.GetValue().value
            unit = p.GetValue().unit
            # an empty unit is stored as NULL, anything else as text
            unit = "%s" % (unit,) if unit else None
        row = (name, ptype, unit, int(m_id), int(dataset_id),
               int(tray_index), "%s" % (value,))
        self.logger.debug("%s %s" % (sql, row))
        cursor.execute(sql, row)
        pid = self.insert_id()
        p.SetId(pid)
        count = count + cursor.rowcount

        if ptype == 'OMKey':
            self.insert_omkey(p.GetValue(),pid)
        elif ptype == 'OMKeyv':
            self.insert_omkey_array(p.GetValue(),pid)
        elif ptype in VectorTypes:
            self.insert_array(p.GetValue(),pid)

    self.logger.debug("%d cparameter rows were inserted" % count)
def show_dataset_table(self,search_string=""):
    """
    DOWNLOAD dataset briefs FROM database

    Without a search string all datasets are returned. Otherwise rows
    are kept when the string occurs in username, hostname, description
    or startdate, or when one of its whitespace-separated tokens is an
    integer equal to the dataset_id. The search string is passed to the
    driver as a query parameter, never pasted into the SQL text.

    @param search_string: optional free-text filter
    @return: resultset from database, newest dataset_id first
    """
    sql = " SELECT * FROM dataset "
    params = []
    if search_string:
        pattern = "%%%s%%" % search_string
        sql += " WHERE username LIKE %s "
        sql += " OR hostname LIKE %s "
        sql += " OR description LIKE %s "
        sql += " OR startdate LIKE %s "
        params.extend([pattern] * 4)
        for token in search_string.split():
            try:
                dataset_id = int(token)
            except ValueError:
                # not a number: cannot be a dataset id
                continue
            sql += " OR dataset_id = %s "
            params.append(dataset_id)
    sql += " ORDER BY dataset_id DESC "

    cursor = self.getcursor()
    if params:
        cursor.execute(sql, tuple(params))
    else:
        cursor.execute(sql)
    return cursor.fetchall()
def download_config(self,dataset_id, include_defaults=False,include_description=False):
    """
    DOWNLOAD icetray configuration FROM database

    Builds the steering object for a dataset, then one IceTrayConfig
    per row of the tray table, each filled with its files,
    metaprojects, projects, pre modules, services, modules,
    connections and post modules.

    @param dataset_id: ID of the run whose configuration we wish to download
    @param include_defaults: passed on to the module/service downloads
    @param include_description: passed on to the module/service downloads
    @return: steering object containing the downloaded configuration
    """
    steering = self.download_steering(dataset_id)
    steering.SetCategory(self.getsimcat(dataset_id))
    self.download_steering_dependencies(dataset_id, steering)
    self.download_steering_statistics(dataset_id, steering)
    self.download_job_dependencies(dataset_id, steering)
    self.download_tasks(dataset_id, steering)
    self.download_batch_options(dataset_id, steering)
    self.download_externals(dataset_id, steering)

    # Description and parent id come from the dataset row itself
    dataset_sql = "SELECT * FROM dataset WHERE dataset_id = %d" % dataset_id
    cursor = self.getcursor()
    cursor.execute(dataset_sql)
    dataset_row = cursor.fetchone()
    if dataset_row:
        steering.SetDescription(dataset_row['description'])
        steering.SetParentId(dataset_row['dataset_id'])

    # One IceTrayConfig per tray row
    tray_sql = "SELECT * FROM tray WHERE dataset_id = %d" % dataset_id
    cursor = self.getcursor()
    cursor.execute(tray_sql)
    for tray in cursor.fetchall():
        i3config = IceTrayConfig()

        tray_id = tray['tray_id']
        tray_index = tray['tray_index']
        i3config.SetEvents(tray['inputevents'])
        i3config.SetIterations(tray['iterations'])
        i3config.SetName(self.nonify(tray['name']))
        i3config.SetPython(self.nonify(tray['python']))

        # tray_files.type selects whether a file is an input or an output
        adders = {'input': i3config.AddInputFile,
                  'output': i3config.AddOutputFile}
        files_sql = "SELECT type, name, photonics FROM tray_files WHERE tray_id = %s"
        cursor.execute(files_sql, (tray_id,))
        for entry in cursor.fetchall():
            kind = entry['type'].lower()
            trayfile = IceTrayFile(entry['name'], entry["photonics"])
            adders[kind](trayfile)

        self.download_metaprojects(dataset_id, tray_index, i3config)
        self.download_projects(dataset_id, tray_index, i3config)
        self.download_pre(dataset_id, tray_index, i3config,
                          include_defaults, include_description)
        self.download_services(dataset_id, tray_index, i3config,
                               include_defaults, include_description)
        self.download_modules(dataset_id, tray_index, i3config,
                              include_defaults=include_defaults,
                              include_description=include_description)
        self.download_connections(dataset_id, tray_index, i3config)
        self.download_post(dataset_id, tray_index, i3config,
                           include_defaults, include_description)
        steering.AddTray(i3config)

    self.commit()
    return steering
def download_metaprojects(self,dataset_id,tray_index,i3config):
    """
    Download metaprojects from database

    Every metaproject found is added to i3config and remembered in
    self.metaproject_dict under its metaproject_id.

    @param dataset_id: ID of the run whose configuration we wish to download
    @param tray_index: index of the tray whose metaprojects are wanted
    @param i3config: configuration object the metaprojects are added to
    """
    sql = " SELECT metaproject.* "
    sql += " FROM metaproject,metaproject_pivot "
    sql += " WHERE metaproject_pivot.dataset_id = %d " % dataset_id
    sql += " AND metaproject.metaproject_id = "
    sql += " metaproject_pivot.metaproject_id "
    sql += " AND metaproject_pivot.tray_index = %d " % tray_index
    sql += " ORDER BY load_index "

    cursor = self.getcursor()
    self.logger.debug(sql)
    cursor.execute(sql.strip())
    for mp in cursor.fetchall():
        mp_id = mp['metaproject_id']
        mproject = MetaProject()
        mproject.SetId(mp_id)
        # name and version are set before logging so the message is meaningful
        mproject.SetName(mp['name'])
        mproject.SetVersion(mp['versiontxt'])

        self.logger.debug("downloaded metaproject %s with id %d" % \
            (mproject.GetName(),mproject.GetId()))

        #add metaproject to local dictonary for reference
        i3config.AddMetaProject(mproject.GetName(),mproject)
        self.metaproject_dict[mp_id] = mproject
def download_externals(self,dataset_id,steering):
    """
    Download the external programs recorded for a dataset and add
    them to the steering object.

    @param dataset_id: primary key for run on config db
    @param steering: object receiving one AddExtern call per row
    """
    self.logger.debug("downloading externals")

    cursor = self.getcursor()
    query = "SELECT * FROM extern " + "WHERE dataset_id = %d " % dataset_id
    cursor.execute(query.strip())

    # (setter, column) pairs applied in this order for every row
    column_setters = (
        ('SetName', 'name'),
        ('SetVersion', 'version'),
        ('SetExec', 'command'),
        ('SetDescription', 'description'),
        ('SetArgs', 'arguments'),
        ('SetInFile', 'infile'),
        ('SetOutFile', 'outfile'),
        ('SetErrFile', 'errfile'),
    )
    for record in cursor.fetchall():
        extern = Extern()
        for setter, column in column_setters:
            getattr(extern, setter)(self.nonify(record[column]))
        # steering text is attached only when a steering name is present
        if self.nonify(record['steering_name']):
            es = ExternSteering()
            es.SetName(record['steering_name'])
            es.SetText(record['steering'])
            extern.AddSteering(es)
        steering.AddExtern(extern)

    return
def fetch_dict_value(self, key):
    """
    Retrieve value stored in dictionary

    The key is passed to the driver as a query parameter, so keys
    containing quotes are looked up correctly.

    @param key: string key to dictionary entry
    @return: the stored value, or '' when the key is not present
    """
    sql = " SELECT value FROM dictionary WHERE "
    sql += " keystring = %s "
    cursor = self.getcursor()
    cursor.execute(sql.strip(), (key,))
    row = cursor.fetchone()
    if row:
        return row['value']
    return ''
1479
def fetch_filename(self, key,dataset_id=0):
    """
    Retrieve the row of the file table with the given file number,
    optionally restricted to one dataset.

    @param key: integer file_number to look up
    @param dataset_id: when non-zero, also require this dataset_id
    @return: the matching row, or '' when there is none
    """
    clauses = [" SELECT * FROM file ",
               " WHERE file_number = %d " % key]
    if dataset_id:
        clauses.append(" AND dataset_id = %d " % dataset_id)
    cursor = self.getcursor()
    cursor.execute("".join(clauses).strip())
    row = cursor.fetchone()
    if not row:
        return ''
    return row
1496
def fetch_project_list(self,dataset_id):
    """
    Download projects from database

    @param dataset_id: ID of the run whose configuration we wish to download
    @return: list of Project objects with id, name and version set
    """
    projects = []

    sql = " SELECT metaproject_pivot.metaproject_id,project.*"
    sql += " FROM metaproject_pivot,project,project_pivot,mp_pivot"
    sql += " WHERE metaproject_pivot.dataset_id = %d " % dataset_id
    sql += " AND project.project_id = mp_pivot.project_id "
    sql += " AND project.project_id = project_pivot.project_id "
    sql += " AND metaproject_pivot.metaproject_id = mp_pivot.metaproject_id "
    sql += " AND project_pivot.dataset_id = metaproject_pivot.dataset_id "
    sql += " ORDER BY metaproject_pivot.load_index,project_pivot.load_index "
    cursor = self.getcursor()
    self.logger.debug(sql.strip())
    cursor.execute(sql.strip())

    for p in cursor.fetchall():
        project = Project()
        project.SetId(p['project_id'])
        # name and version are set before logging so the message is meaningful
        project.SetName(p['name'])
        project.SetVersion(p['versiontxt'])
        self.logger.debug("downloaded project %s with id %d" % (project.GetName(),project.GetId()))
        projects.append(project)
    return projects
def download_projects(self,dataset_id,tray_index,i3config):
    """
    Download projects from database

    Each project is attached to its metaproject (looked up in
    self.metaproject_dict); when the metaproject is unknown the project
    is added directly to i3config instead.

    @param dataset_id: ID of the run whose configuration we wish to download
    @param tray_index: index of the tray whose projects are wanted
    @param i3config: configuration object the projects are added to
    """
    sql = " SELECT metaproject_pivot.metaproject_id,project.*"
    sql += " FROM metaproject_pivot,project,project_pivot,mp_pivot"
    sql += " WHERE metaproject_pivot.dataset_id = %d " % dataset_id
    sql += " AND project.project_id = mp_pivot.project_id "
    sql += " AND project.project_id = project_pivot.project_id "
    sql += " AND metaproject_pivot.metaproject_id = mp_pivot.metaproject_id "
    sql += " AND project_pivot.dataset_id = metaproject_pivot.dataset_id "
    sql += " AND metaproject_pivot.tray_index = project_pivot.tray_index "
    sql += " AND project_pivot.tray_index = %d " % tray_index
    sql += " ORDER BY metaproject_pivot.load_index,project_pivot.load_index "
    cursor = self.getcursor()
    self.logger.debug(sql.strip())
    cursor.execute(sql.strip())

    for p in cursor.fetchall():
        pname = p['name']
        mp_id = p['metaproject_id']
        project = Project()
        project.SetId(p['project_id'])
        # name and version are set before logging so the messages are meaningful
        project.SetName(pname)
        project.SetVersion(p['versiontxt'])
        self.logger.debug("downloaded project %s with id %d" % (project.GetName(),project.GetId()))

        for d in self.fetch_project_dependencies(project.GetId(),mp_id):
            self.logger.debug(" %s - adding dependency %s" % (project.GetName(),d.GetName()))
            project.AddDependency(d.GetName())

        # Only the dictionary lookup is guarded: a KeyError raised while
        # registering the project must not be mistaken for a missing entry.
        try:
            metaproject = self.metaproject_dict[mp_id]
        except KeyError:
            self.logger.warn("could not find metaproject with id %d" % mp_id)
            self.logger.warn("Adding project to top-level container.")
            i3config.AddProject(pname,project)
            continue

        self.logger.debug("found metaproject %s with id %d" % (metaproject.GetName(),mp_id))
        metaproject.AddProject(pname,project)

        if not i3config.HasMetaProject(metaproject.GetName()):
            i3config.AddMetaProject(metaproject.GetName(),metaproject)
            self.logger.debug("adding metaproject - %s" % metaproject.GetName())
def fetch_project_dependencies(self,project_id,metaproject_id):
    """
    Retrieve the projects a project depends on within a metaproject.

    @param project_id: id of project
    @param metaproject_id: id of the metaproject the dependency rows belong to
    @return: list of Project objects; note that each one carries the
             project_depend_id of its dependency row as its id
    """
    clauses = [
        "SELECT project_depend.project_depend_id, project.name,"
        " project.project_id, project.versiontxt",
        "FROM project,project_depend",
        "WHERE project.project_id = project_depend.dependency_id",
        "AND project_depend.project_id = %d" % project_id,
        "AND project_depend.metaproject_id = %d" % metaproject_id,
        "ORDER BY project_depend.project_depend_id",
    ]
    # single-space separated, with one leading and one trailing blank
    statement = " %s " % " ".join(clauses)

    cursor = self.getcursor()
    self.logger.debug(statement)
    cursor.execute(statement)

    dependencies = []
    for record in cursor.fetchall():
        dependency = Project()
        dependency.SetName(record['name'])
        dependency.SetVersion(record['versiontxt'])
        dependency.SetId(record['project_depend_id'])
        dependencies.append(dependency)
    return dependencies
def fetch_module_dependencies(self,module_id):
    """
    Retrieve the projects a module depends on.

    @param module_id: id of module
    @return: list of Project objects with name, version and project id set
    """
    clauses = [
        "SELECT module_dependency.module_dependency_id, project.name,"
        " project.project_id, project.versiontxt",
        "FROM project,module_dependency",
        "WHERE project.project_id = module_dependency.project_id",
        "AND module_dependency.module_id = %d" % module_id,
        "ORDER BY module_dependency.module_dependency_id",
    ]
    # single-space separated, with one leading and one trailing blank
    statement = " %s " % " ".join(clauses)

    cursor = self.getcursor()
    cursor.execute(statement)

    dependencies = []
    for record in cursor.fetchall():
        dependency = Project()
        dependency.SetName(record['name'])
        dependency.SetVersion(record['versiontxt'])
        dependency.SetId(record['project_id'])
        dependencies.append(dependency)
    return dependencies
def download_connections(self,dataset_id,tray_index,i3config):
    """
    Download module connections from database and add them to the
    tray configuration.

    @param dataset_id: ID of the run whose configuration we wish to download
    @param tray_index: index of the tray whose connections are wanted
    @param i3config: configuration object receiving the connections
    """
    query = "".join([
        " SELECT * FROM `connection` ",
        " WHERE dataset_id = %d " % dataset_id,
        " AND tray_index = %d " % tray_index,
    ])
    cursor = self.getcursor()
    cursor.execute(query)
    for link in cursor.fetchall():
        source, outbox = link['source'], link['outbox']
        destination, inbox = link['destination'], link['inbox']

        conn = Connection()
        conn.From(source, outbox)
        conn.To(destination, inbox)
        i3config.AddConnection(conn)
def getsimcat(self,dataset_id):
    """
    Get simulation category

    @param dataset_id: dataset ID
    @return: category of the first matching row, or None when the
             dataset has no category row
    """
    self.logger.debug("retrieving simulation category")

    query = """
        SELECT simcat.category from simcat,dataset
        WHERE simcat.simcat_id = dataset.simcat_id
        AND dataset.dataset_id = %d """ % dataset_id

    cursor = self.getcursor()
    cursor.execute(query)
    rows = cursor.fetchall()

    if len(rows) <= 0:
        return None
    return rows[0]['category']
def download_steering(self,dataset_id):
    """
    Get steering parameters from database

    @param dataset_id: ID of the run whose configuration we wish to download
    @return: new Steering object holding one Parameter per row,
             in name order
    """
    steering = Steering()
    query = "".join([
        " SELECT * FROM steering_parameter ",
        " WHERE dataset_id = '%s'" % dataset_id,
        " ORDER by name ",
    ])
    cursor = self.getcursor()
    cursor.execute(query)

    for record in cursor.fetchall():
        param = Parameter()
        param.SetType(record['type'])
        param.SetName(record['name'])
        param.SetValue(record['value'])
        steering.AddParameter(param)
    return steering
def get_steering_parameter(self,dataset_id,param):
    """
    Retrieve the value of one steering parameter of a dataset.

    Both values are passed to the driver as query parameters, so
    parameter names containing quotes are looked up correctly.

    @param dataset_id: ID of the dataset
    @param param: name of the steering parameter
    @return: the stored value, or None when there is no such parameter
    """
    sql = " SELECT value FROM steering_parameter "
    sql += " WHERE name = %s"
    sql += " AND dataset_id = %s"
    cursor = self.getcursor()
    cursor.execute(sql, (param, dataset_id))
    result = cursor.fetchone()
    if result:
        return result['value']
    return None
def download_steering_dependencies(self,dataset_id,steering):
    """
    Get steering dependencies from database and add each filename to
    the steering object.

    @param dataset_id: ID of the run whose configuration we wish to download
    @param steering: object receiving one AddDependency call per row
    """
    query = "SELECT * FROM steering_dependency WHERE dataset_id = '%s'" % dataset_id
    cursor = self.getcursor()
    cursor.execute(query)

    for record in cursor.fetchall():
        steering.AddDependency(record['filename'])
def download_steering_statistics(self,dataset_id,steering):
    """
    Get the names of the statistics tracked for a dataset and add
    each one to the steering object.

    @param dataset_id: ID of the run whose configuration we wish to download
    @param steering: object receiving one AddStatistic call per row
    """
    query = "SELECT * FROM dataset_statistics WHERE dataset_id = '%s'" % dataset_id
    cursor = self.getcursor()
    cursor.execute(query)

    for record in cursor.fetchall():
        steering.AddStatistic(record['name'])
def download_job_dependencies(self,dataset_id,steering):
    """
    Get job dependency rules and add each one to the steering object.

    @param dataset_id: ID of the run whose configuration we wish to download
    @param steering: object receiving one AddJobDependency call per row
    """
    query = "SELECT * FROM job_dependency WHERE dataset_id = %s "
    bindings = (dataset_id,)
    self.logger.debug(query % bindings)
    cursor = self.getcursor()
    cursor.execute(query, bindings)
    for rule in cursor.fetchall():
        steering.AddJobDependency(rule['input_dataset'], rule['input_job'])
    return
def download_tasks(self,dataset_id,steering):
    """
    Get task definitions from database and add them to the steering
    object, each with its trays, parent names and child names.

    @param dataset_id: ID of the run whose configuration we wish to download
    @param steering: object receiving one AddTaskDefinition call per row
    """
    task_sql = ("SELECT task_def_id,name,reqs,opts,parallel,photonics,grids FROM task_def"
                " WHERE dataset_id = %s ORDER BY task_def_id")
    cursor = self.getcursor()
    cursor.execute(task_sql, (dataset_id,))
    task_rows = cursor.fetchall()

    tray_sql = ("SELECT idx,CONVERT(GROUP_CONCAT(iter),char) AS iters"
                " FROM task_def_tray WHERE task_def_id = %s"
                " GROUP BY idx,task_def_id")
    parent_sql = ("SELECT name FROM task_def,task_def_rel"
                  " WHERE child_task_def_id = %s"
                  " AND parent_task_def_id = task_def_id")
    child_sql = ("SELECT name FROM task_def,task_def_rel"
                 " WHERE parent_task_def_id = %s"
                 " AND child_task_def_id = task_def_id")

    for task in task_rows:
        task_id = task['task_def_id']
        name = task['name']
        reqs = task['reqs']
        opts = task['opts']
        parallel = task['parallel']
        photonics = task['photonics']
        # read but not applied: the code adding grids is disabled
        grids = task['grids']

        td = TaskDefinition(name, task_id)
        td.SetRequirements(reqs)
        if opts:
            td.SetBatchOpts(opts)
        td.SetParallelExecution(parallel)
        td.SetUsesPhotonics(photonics)

        self.logger.debug(tray_sql % task_id)
        cursor.execute(tray_sql, (task_id,))
        for tray in cursor.fetchall():
            td.AddTray(tray['idx'], tray['iters'])

        cursor.execute(parent_sql, (task_id,))
        for parent in cursor.fetchall():
            td.AddParent(parent['name'])

        cursor.execute(child_sql, (task_id,))
        for child in cursor.fetchall():
            td.AddChild(child['name'])

        steering.AddTaskDefinition(td)
1827
def fetch_batch_options(self,dataset_id,steering):
    """
    Fetch batch system options from database, add each one to the
    steering object and also return them.

    @param dataset_id: ID of the run whose configuration we wish to download
    @param steering: object receiving one AddBatchOpt call per row
    @return: list of the BatchOpt objects that were created
    """
    query = "SELECT * FROM batch_option WHERE dataset_id = '%s'" % dataset_id
    cursor = self.getcursor()
    cursor.execute(query)

    collected = []
    for record in cursor.fetchall():
        option = BatchOpt()
        option.SetName(record['name'])
        option.SetType(record['type'])
        option.SetValue(record['value'])
        collected.append(option)
        steering.AddBatchOpt(option)
    return collected
def download_batch_options(self,dataset_id,steering):
    """
    Get batch system options from database and add each one to the
    steering object.

    @param dataset_id: ID of the run whose configuration we wish to download
    @param steering: object receiving one AddBatchOpt call per row
    """
    query = "SELECT * FROM batch_option WHERE dataset_id = '%s'" % dataset_id
    cursor = self.getcursor()
    cursor.execute(query)

    for record in cursor.fetchall():
        option = BatchOpt()
        option.SetName(record['name'])
        option.SetType(record['type'])
        option.SetValue(record['value'])
        steering.AddBatchOpt(option)
def download_modules(self,dataset_id,
                     tray_index,
                     i3config,
                     type='module',
                     iptype='tray',
                     include_defaults=False,
                     include_description=False):
    """
    Get modules of one kind from the database and add them to the
    tray configuration, including their project dependencies (for
    modules and services) and their parameters.

    @param dataset_id: ID of the run whose configuration we wish to download
    @param tray_index: index of the tray whose modules are wanted
    @param i3config: configuration object the modules are added to
    @param type: module,service,iceprod
    @param iptype: one of tray,pre,post
    @param include_defaults: passed on to download_params
    @param include_description: passed on to download_params
    """
    query = "".join([
        " SELECT module.class,module.module_id,module_pivot.module_pivot_id,",
        " module_pivot.name,module_pivot.load_index ",
        " FROM module,module_pivot ",
        " WHERE module_pivot.dataset_id = %d " % dataset_id,
        " AND module.module_type = '%s' " % type,
        " AND module_pivot.iptype = '%s' " % iptype,
        " AND module.module_id = module_pivot.module_id ",
        " AND module_pivot.tray_index = %d " % tray_index,
        " ORDER BY load_index ",
    ]).strip()

    cursor = self.getcursor()
    self.logger.debug(query)
    cursor.execute(query)

    # Name of the i3config method that registers this kind of module;
    # None when the type/iptype combination has no registration call.
    if type == 'iceprod':
        adder_name = {'pre': 'AddIceProdPre',
                      'post': 'AddIceProdPost'}.get(iptype)
    else:
        adder_name = {'module': 'AddModule',
                      'service': 'AddService'}.get(type)

    for record in cursor.fetchall():
        mod = Service()

        mod.SetClass(record['class'])
        mod.SetName(record['name'])
        mod.SetId(record['module_id'])
        module_pivot_id = record['module_pivot_id']
        if adder_name:
            getattr(i3config, adder_name)(mod)

        if type in ['module','service']:
            for dep in self.fetch_module_dependencies(mod.GetId()):
                known = i3config.GetProject(dep.GetName())
                if not known == None:
                    mod.AddProject(known.GetName(), known)
                    continue
                # unknown to the configuration: register it there too
                self.logger.warn('could not find dependency \'%s\'' % dep.GetName() )
                i3config.AddProject(dep.GetName(), dep)
                mod.AddProject(dep.GetName(), dep)

        self.download_params(mod, module_pivot_id, dataset_id,
                             include_defaults, include_description)
def download_services(self,dataset_id,tray_index,i3config,
                      include_defaults=False,include_description=False):
    """
    Download services from the database; delegates to download_modules
    with the type fixed to 'service'.

    @param dataset_id: ID of the run whose configuration we wish to download
    @param tray_index: index of the tray whose services are wanted
    @param i3config: configuration object the services are added to
    @param include_defaults: passed on to download_modules
    @param include_description: passed on to download_modules
    @return: whatever download_modules returns
    """
    options = {
        'type': 'service',
        'include_defaults': include_defaults,
        'include_description': include_description,
    }
    return self.download_modules(dataset_id, tray_index, i3config, **options)
def download_pre(self,dataset_id,tray_index,i3config,
                 include_defaults=False,include_description=False):
    """
    Download IceProdPre modules from the database; delegates to
    download_modules with type 'iceprod' and iptype 'pre'.

    @param dataset_id: ID of the run whose configuration we wish to download
    @param tray_index: index of the tray whose pre modules are wanted
    @param i3config: configuration object the modules are added to
    @param include_defaults: passed on to download_modules
    @param include_description: passed on to download_modules
    @return: whatever download_modules returns
    """
    options = {
        'type': 'iceprod',
        'iptype': 'pre',
        'include_defaults': include_defaults,
        'include_description': include_description,
    }
    return self.download_modules(dataset_id, tray_index, i3config, **options)
def download_post(self,dataset_id,tray_index,i3config,
                  include_defaults=False,
                  include_description=False):
    """
    Download IceProdPost modules from the database; delegates to
    download_modules with type 'iceprod' and iptype 'post'.

    @param dataset_id: ID of the run whose configuration we wish to download
    @param tray_index: index of the tray whose post modules are wanted
    @param i3config: configuration object the modules are added to
    @param include_defaults: passed on to download_modules
    @param include_description: passed on to download_modules
    @return: whatever download_modules returns
    """
    options = {
        'type': 'iceprod',
        'iptype': 'post',
        'include_defaults': include_defaults,
        'include_description': include_description,
    }
    return self.download_modules(dataset_id, tray_index, i3config, **options)
def select_array(self,pid):
    """
    Fetch the elements stored in carray_element for one parameter.

    @param pid: cparameter_id whose elements are wanted
    @return: list of Value objects built from each row's value and unit
    """
    cursor = self.getcursor()
    query = " SELECT * from carray_element " + " WHERE cparameter_id = %d " % pid
    cursor.execute(query.strip())
    return [Value(element['value'], self.nonify(element['unit']))
            for element in cursor.fetchall()]
def select_omkey(self,pid):
    """
    Fetch the single OMKey stored for a parameter.

    @param pid: cparameter_id whose OMKey is wanted
    @return: the first key returned by select_omkey_array
    @raise Exception: if no key is stored for the parameter
    """
    found = self.select_omkey_array(pid)
    if not len(found) < 1:
        return found[0]
    raise Exception('could not find omkey for param %d' % pid)
def select_omkey_array(self,pid):
    """
    Fetch the OMKeys stored in carray_element for one parameter.

    Rows are read in carray_element_id order and are expected to come
    in pairs: a 'stringid' row followed by the matching 'omid' row.

    @param pid: cparameter_id whose OMKeys are wanted
    @return: list of pyOMKey objects (empty when nothing is stored)
    @raise Exception: if a row has an unexpected name, or an 'omid'
                      row is not preceded by its 'stringid' row
    """
    cursor = self.getcursor()
    sql = " SELECT * from carray_element "
    sql += " WHERE cparameter_id = %d order by carray_element_id" % pid
    cursor.execute (sql.strip())
    omkeyvect = []
    omkey = None  # key under construction, waiting for its omid row
    for item in cursor.fetchall():
        if item['name'] == 'stringid':
            omkey = pyOMKey(0,0)
            omkey.stringid = item['value']
        elif item['name'] == 'omid':
            if omkey is None:
                raise Exception('found omid without preceding stringid for param %d' % pid)
            omkey.omid = item['value']
            omkeyvect.append(omkey)
            omkey = None
        else:
            # report the row that is actually wrong
            raise Exception('expected omkey but found %s' % item['name'])
    return omkeyvect
def download_params(self,module,mod_id,dataset_id,include_defaults=False, include_description=False):
    """
    Download module parameters from the database and add them to the module.

    Parameters are collected in a dictionary keyed by lower-cased name.
    When defaults or descriptions are requested, the rows of the
    parameter table for module.GetId() are loaded first; the rows of
    cparameter for the given pivot id and dataset are loaded afterwards
    and replace entries with the same name.

    @param module: object the parameters are added to
    @param mod_id: value matched against cparameter.module_pivot_id
    @param dataset_id: value matched against cparameter.dataset_id
    @param include_defaults: also load rows of the parameter table
    @param include_description: also load rows of the parameter table
                                and copy their description
    """
    paramdict = {}
    # First include default parameters
    if include_defaults or include_description:
        sql = " SELECT * FROM parameter "
        sql += " WHERE module_id = %d" % module.GetId()
        sql += " ORDER BY name "
        cursor = self.getcursor()
        cursor.execute(sql)
        result_set = cursor.fetchall();
        for p in result_set:
            param = Parameter()
            param.SetType(p['type'])
            param.SetName(p['name'])
            pid = p['parameter_id']
            # NOTE(review): the select_* helpers filter carray_element on
            # cparameter_id, but pid here is a parameter_id - confirm intended.
            if param.GetType() == 'OMKeyv':
                param.SetValue(self.select_omkey_array(pid))
            elif param.GetType() == 'OMKey':
                param.SetValue(self.select_omkey(pid))
            elif param.GetType() in VectorTypes:
                param.SetValue(self.select_array(pid))
            else:
                param.SetValue(Value(p['value'],self.nonify(p['unit'])))
                param.SetUnit(self.nonify(p['unit']))
            # NOTE(review): SetDefault is called twice in a row; the first
            # call looks redundant - confirm before removing.
            param.SetDefault(True)
            param.SetDefault(param.GetValue())
            paramdict[param.GetName().lower()] = param
            if include_description:
                param.SetDescription(p['description'])


    # Then override with configured values
    sql = " SELECT * FROM cparameter "
    sql += " WHERE module_pivot_id = '%s'" % mod_id
    sql += " AND dataset_id = '%s'" % dataset_id
    sql += " ORDER BY name "
    cursor = self.getcursor()
    cursor.execute(sql)
    result_set = cursor.fetchall();

    for p in result_set:
        param = Parameter()
        param.SetType(p['type'])
        param.SetName(p['name'])
        pid = p['cparameter_id']
        if param.GetType() == 'OMKeyv':
            param.SetValue(self.select_omkey_array(pid))
        elif param.GetType() == 'OMKey':
            param.SetValue(self.select_omkey(pid))
        elif param.GetType() in VectorTypes:
            param.SetValue(self.select_array(pid))
        else:
            param.SetValue(Value(p['value'],self.nonify(p['unit'])))
            param.SetUnit(self.nonify(p['unit']))
        # keep the default recorded by the first pass, if there was one
        if paramdict.has_key(param.GetName().lower()):
            param.SetDefault(paramdict[param.GetName().lower()].GetDefault())
        paramdict[param.GetName().lower()] = param

    # now add all parameters to the module
    for param in paramdict.values():
        module.AddParameter(param)
def CheckVersion(self):
    """
    Check latest version of software and see if we need to upgrade.

    iceprod.__version__ is split on '-' after removing 'V' and must yield at
    least three fields (major, minor, patch). The svn table is queried for
    the row matching the running major and minor fields.

    @return: the 'url' column of that row when its patch level is greater
             than the running one; None otherwise (including when the
             version string has fewer than three fields, no row matches,
             or a patch level is not an integer)
    """
    ver = iceprod.__version__.replace('V', '').split('-')
    if len(ver) < 3:
        return None

    if not self.isconnected():
        self.connect()
    cursor = self.getcursor()
    sql = " SELECT * FROM svn "
    sql += " WHERE major='%s' " % ver[0]
    # minor is the second dash-separated field of the version string
    sql += " AND minor='%s' " % ver[1]
    cursor.execute(sql)
    result = cursor.fetchone()
    if not result:
        # no release registered for this major/minor pair
        return None
    try:
        if int(ver[2]) < int(result['patch']):
            return result['url']
    except (KeyError, TypeError, ValueError):
        # unparsable or missing patch level: report "nothing to upgrade to"
        pass
    return None
2085 2086
def validate(self, dataset_id, status='TRUE'):
    """
    Write the 'verified' column of a dataset (marks it visible and valid).

    @param dataset_id: numeric id of the dataset row to update
    @param status: value stored in dataset.verified
    """
    cursor = self.getcursor()
    statement = "".join((
        " UPDATE dataset SET verified = '%s' " % status,
        " WHERE dataset_id = %d " % dataset_id,
    ))
    self.logger.debug(statement)
    cursor.execute(statement)
    self.commit()
2097 2098
def download_metadata(self, dataset_id):
    """
    Build a DIF_Plus object from the 'dif' and 'plus' rows of a dataset.

    @param dataset_id: numeric id of the dataset
    @return: a DIF_Plus; it is returned unfilled when the dataset has no
             'dif' row, and with only the DIF part filled when it has no
             'plus' row
    """
    cursor = self.getcursor()
    dp = DIF_Plus()

    sql = " SELECT * FROM dif "
    sql += " WHERE dataset_id = %d " % dataset_id
    cursor.execute(sql)
    row = cursor.fetchone()
    if not row:
        return dp
    dif = dp.GetDIF()
    dif.SetParameters(row['parameters'])
    dif.SetEntryTitle(row['entry_title'])
    dif.SetSummary(row['summary'])
    dif.SetSourceName(row['source_name'])
    dif.SetSensorName(row['sensorname'])
    # the DIF creation date keeps only the date part of the stored timestamp
    td = time.strptime(str(row['dif_creation_date']), "%Y-%m-%d %H:%M:%S")
    td = time.strftime("%Y-%m-%d", td)
    dif.SetDIFCreationDate(td)

    sql = " SELECT * FROM plus "
    sql += " WHERE dataset_id = %d " % dataset_id
    cursor.execute(sql)
    row = cursor.fetchone()
    if not row:
        # same treatment as a missing dif row: hand back what could be
        # loaded instead of failing on row[...] with a TypeError
        return dp
    plus = dp.GetPlus()
    # start/end are re-rendered with a 'T' between date and time
    ts = time.strptime(str(row['start_datetime']), "%Y-%m-%d %H:%M:%S")
    ts = time.strftime("%Y-%m-%dT%H:%M:%S", ts)
    plus.SetStartDatetime(ts)

    te = time.strptime(str(row['end_datetime']), "%Y-%m-%d %H:%M:%S")
    te = time.strftime("%Y-%m-%dT%H:%M:%S", te)
    plus.SetEndDatetime(te)
    plus.SetCategory(row['category'])
    plus.SetSubCategory(row['subcategory'])
    plus.SetSimDBKey(dataset_id)
    plus.SetI3DBKey(row['i3db_key'])
    plus.SetSteeringFile(row['steering_file'])
    for project in self.fetch_project_list(dataset_id):
        plus.AddProject(project)

    return dp
def upload_metadata(self, dataset_id, difplus):
    """
    Insert the DIF and Plus metadata of a dataset and commit.

    Values are passed to the cursor as query parameters so that free text
    (titles, summaries, file names) containing quotes cannot break or alter
    the statement.

    @param dataset_id: numeric id of the dataset the rows belong to
    @param difplus: object offering GetDIF() and GetPlus()
    """
    cursor = self.getcursor()

    dif = difplus.GetDIF()
    sql = " INSERT INTO dif "
    sql += " (dataset_id,parameters,entry_title,summary, "
    sql += " source_name,sensorname,dif_creation_date) "
    sql += " VALUES (%s,%s,%s,%s,%s,%s,%s) "
    params = (
        dataset_id,
        str(dif.GetParameters()),
        str(dif.GetEntryTitle()),
        str(dif.GetSummary()),
        str(dif.GetSourceName()),
        str(dif.GetSensorName()),
        str(dif.GetDIFCreationDate()),
    )
    self.logger.debug("%s %r" % (sql, params))
    cursor.execute(sql, params)

    plus = difplus.GetPlus()
    sql = " INSERT INTO plus "
    sql += " (dataset_id,start_datetime,end_datetime,"
    sql += " category,subcategory,i3db_key,steering_file) "
    sql += " VALUES (%s,%s,%s,%s,%s,%s,%s) "
    params = (
        dataset_id,
        str(plus.GetStartDatetime()),
        str(plus.GetEndDatetime()),
        str(plus.GetCategory()),
        str(plus.GetSubCategory()),
        # an unset/zero key is stored as SQL NULL
        plus.GetI3DBKey() or None,
        str(plus.GetSteeringFile()),
    )
    self.logger.debug("%s %r" % (sql, params))
    cursor.execute(sql, params)

    self.commit()
def set_metadata_subcat(self, dataset_id, sub_cat):
    """
    Change Plus:subcategory in DIFPlus metadata.

    The new value is passed as a query parameter so that quotes in it are
    escaped by the database driver.

    @param dataset_id: numeric id of the dataset whose 'plus' row is updated
    @param sub_cat: new subcategory text
    """
    cursor = self.getcursor()
    sql = " UPDATE plus "
    sql += " SET subcategory=%s "
    sql += " WHERE dataset_id=%s "
    params = (sub_cat, dataset_id)
    self.logger.debug("%s %r" % (sql, params))
    cursor.execute(sql, params)
    self.commit()
def load_filelist(self, odict, dataset_id=0):
    """
    Load a list of files for filtering into the 'file' table.

    Rows are inserted in batches of 100 with one multi-row INSERT per batch;
    values are passed as query parameters so that paths or file names
    containing quotes are escaped by the driver. A single commit follows
    the last batch.

    @param odict: the dictionary to load; maps file_key to a sequence whose
                  first three items are (path, subdir, filename)
    @param dataset_id: the dataset id that dictionary is bound to
    """
    cursor = self.getcursor()
    batch_size = 100
    row_placeholder = "(%s,%s,%s,%s,%s)"
    # snapshot the keys once so every batch slices the same ordering
    keys = list(odict.keys())

    for start in range(0, len(keys), batch_size):
        batch = keys[start:start + batch_size]
        sql = " INSERT INTO file (file_key,path,subdir,filename,dataset_id) "
        sql += " VALUES " + ",".join([row_placeholder] * len(batch))
        params = []
        for key in batch:
            entry = odict[key]
            params.extend((key, entry[0], entry[1], entry[2], dataset_id))
        self.logger.debug(sql)
        cursor.execute(sql, tuple(params))
    self.commit()
2215
def get_metaproject_tarball(self, metaproject_name, metaproject_version, platform, gccversion, ppc=0):
    """
    Look up the best matching tarball in the file repository.

    Tarball rows joined with their metaproject are filtered by name, version
    text and platform; only rows whose gcc value is <= gccversion qualify and
    they are ordered by gcc, highest first.

    @param metaproject_name: metaproject.name to match
    @param metaproject_version: metaproject.versiontxt to match
    @param platform: metaproject_tarball.platform to match
    @param gccversion: upper bound for metaproject_tarball.gcc
    @param ppc: boolean (get PPC version of tarball); not referenced by the query
    @return: string path to tarball ('relpath' of the first row), or '' if none
    """
    cursor = self.getcursor()

    query = "".join([
        " SELECT * FROM metaproject_tarball mt ",
        " JOIN metaproject m ON m.metaproject_id = mt.metaproject_id ",
        " WHERE m.name = '%s' " % metaproject_name,
        " AND m.versiontxt = '%s' " % metaproject_version,
        " AND mt.platform = '%s' " % platform,
        " AND mt.gcc <= '%s' " % gccversion,
        " ORDER BY mt.gcc DESC ",
    ])

    self.logger.debug(query)
    cursor.execute(query)
    best = cursor.fetchone()
    if not best:
        return ''
    return best['relpath']
2246 -class MonitorDB(IceProdDB):
2247 2248 logger = logging.getLogger('MonitorDB') 2249
def __init__(self):
    """
    Constructor.

    Runs the IceProdDB constructor and then sets the maxsuspend threshold.
    """
    IceProdDB.__init__(self)
    # update_monitoring compares a dataset's suspended + failed job count
    # against this value
    self.maxsuspend = 20
2256
def new(self):
    """
    Create a copy of this instance.

    The copy carries the same connection settings (host, port, user,
    password, database) and is marked as not connected.

    @return: a new MonitorDB
    """
    newconn = MonitorDB()
    newconn.host_ = self.host_
    # carry the port over as well; otherwise the copy silently falls back
    # to the constructor default
    newconn.port_ = self.port_
    newconn.usr_ = self.usr_
    newconn.passwd_ = self.passwd_
    newconn.db_ = self.db_
    newconn._connected = False
    return newconn
2268 2269
def reset_old_jobs(self,
                   grid_id,
                   maxidletime,
                   maxruntime,
                   maxsubmittime,
                   maxcopytime,
                   maxfailures=10,
                   maxevicttime=10,
                   keepalive=14400):
    """
    Reset jobs whose status has not changed for longer than the limit that
    applies to that status, then fail jobs with too many failures.

    All limits are compared, in minutes, against job.status_changed. Every
    statement is committed on its own and touches a bounded number of rows.

    @param grid_id: id of current cluster
    @param maxidletime: limit for jobs in 'QUEUED' state
    @param maxruntime: limit for jobs in 'PROCESSING' state
    @param maxsubmittime: limit for jobs in 'QUEUEING' and 'CLEANING' state
    @param maxcopytime: limit for jobs in 'COPYING' state
    @param maxfailures: jobs with more failures than this are set to 'FAILED'
    @param maxevicttime: limit for jobs in 'EVICTED' state
    @param keepalive: when > 0, the 'PROCESSING' limit is min(maxruntime, keepalive)
    """
    # local import: the module header does not import os explicitly
    import os

    cursor = self.getcursor()
    passkey = self.mkkey(6, 9)
    # short host name, recorded in the error message of every reset job
    hostname = os.uname()[1].split('.')[0]

    def reset_stale(status, minutes, limit):
        # Reset up to `limit` jobs stuck in `status` for more than `minutes`;
        # returns the number of affected rows.
        sql = " UPDATE job SET "
        sql += " errormessage=CONCAT( "
        sql += " '%s: max. allowed time reached for state ', " % hostname
        sql += " job.status,': ', TIMEDIFF(NOW(),status_changed)), "
        if status == 'CLEANING':
            # a stalled cleanup goes back to the state it came from;
            # assignment order matters because MySQL evaluates left to right
            sql += " status=prev_state, "
            sql += " prev_state = 'CLEANING', "
        else:
            sql += " prev_state = status, "
            sql += " status='RESET', "
        sql += " passkey='%s', " % passkey
        sql += " status_changed=NOW() "
        sql += " WHERE grid_id=%d " % grid_id
        sql += " AND status='%s' " % status
        sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % minutes
        sql += " LIMIT %d " % limit
        self.logger.debug(sql)
        cursor.execute(sql)
        affected = self._conn.affected_rows()
        self.commit()
        return affected

    runtime_limit = maxruntime
    if keepalive > 0:
        runtime_limit = min(maxruntime, keepalive)

    # (status, announcement, label used in the result message, minutes, row limit)
    stale_states = [
        ('QUEUED', "reseting stale queued jobs...", 'queued', maxidletime, 20),
        ('QUEUEING', "reseting stale queuing jobs...", 'queueing', maxsubmittime, 20),
        ('CLEANING', "reseting stale cleaning jobs...", 'cleaning', maxsubmittime, 20),
        ('PROCESSING', "reseting stale processing jobs...", 'processing', runtime_limit, 20),
        ('EVICTED', "reseting evicted jobs...", 'evicted', maxevicttime, 20),
        ('COPYING', "reseting stale copying jobs...", 'copying', maxcopytime, 100),
    ]

    totalrows = 0
    for status, announcement, label, minutes, limit in stale_states:
        self.logger.debug(announcement)
        rowcount = reset_stale(status, minutes, limit)
        totalrows += rowcount
        self.logger.debug('Reset %d ' % rowcount + label + ' jobs')
    self.logger.info('Reset %d jobs' % totalrows)

    # suspend jobs with too many errors
    self.logger.debug("suspending jobs with too many errors...")
    sql = " UPDATE job SET "
    sql += " errormessage=CONCAT('too many errors.',errormessage), "
    sql += " prev_state = status, "
    sql += " status='FAILED', "
    sql += " passkey='%s', " % passkey
    sql += " status_changed=NOW() "
    sql += " WHERE grid_id=%d " % grid_id
    sql += " AND status != 'SUSPENDED' "
    sql += " AND status != 'FAILED' "
    sql += " AND status != 'OK' "
    sql += " AND job.failures > %d " % maxfailures
    sql += " LIMIT 2000 "
    cursor.execute(sql)
    rowcount = self._conn.affected_rows()
    self.commit()
    if rowcount > 0:
        self.logger.info('Suspended %d jobs with too many errors' % rowcount)
2439
def download_tasks(self, dataset_id, steering):
    """
    Read the task definitions of a dataset and add them to a steering object.

    For every task_def row (ordered by task_def_id) a TaskDefinition is
    built with its requirements, optional batch options, parallel and
    photonics settings, its trays, and the names of its parent and child
    tasks.

    @param dataset_id: ID of the run whose configuration we wish to download
    @param steering: receives each definition through AddTaskDefinition()
    """
    task_query = ("SELECT task_def_id,name,reqs,opts,parallel,photonics FROM task_def"
                  " WHERE dataset_id = %s ORDER BY task_def_id")
    cursor = self.getcursor()
    cursor.execute(task_query, (dataset_id,))
    task_rows = cursor.fetchall()

    tray_query = ("SELECT idx,CONVERT(GROUP_CONCAT(iter),char) AS iters"
                  " FROM task_def_tray WHERE task_def_id = %s"
                  " GROUP BY idx,task_def_id")
    parent_query = ("SELECT name FROM task_def,task_def_rel"
                    " WHERE child_task_def_id = %s"
                    " AND parent_task_def_id = task_def_id")
    child_query = ("SELECT name FROM task_def,task_def_rel"
                   " WHERE parent_task_def_id = %s"
                   " AND child_task_def_id = task_def_id")

    for task_row in task_rows:
        task_id = task_row['task_def_id']
        task_name = task_row['name']
        requirements = task_row['reqs']
        batch_opts = task_row['opts']
        run_parallel = task_row['parallel']
        uses_photonics = task_row['photonics']

        definition = TaskDefinition(task_name, task_id)
        definition.SetRequirements(requirements)
        if batch_opts:
            definition.SetBatchOpts(batch_opts)
        definition.SetParallelExecution(run_parallel)
        definition.SetUsesPhotonics(uses_photonics)

        self.logger.debug(tray_query % task_id)
        cursor.execute(tray_query, (task_id,))
        for tray_row in cursor.fetchall():
            definition.AddTray(tray_row['idx'], tray_row['iters'])

        cursor.execute(parent_query, (task_id,))
        for parent_row in cursor.fetchall():
            definition.AddParent(parent_row['name'])

        cursor.execute(child_query, (task_id,))
        for child_row in cursor.fetchall():
            definition.AddChild(child_row['name'])

        steering.AddTaskDefinition(definition)
2493 2494 2495 2496
def getNewSets(self, grid=None, dataset=0):
    """
    Get a list of new datasets.

    @param grid: when given (and non-zero), only datasets that have a
                 grid_statistics row for this grid id are returned
    @param dataset: when > 0, select this dataset id regardless of status;
                    otherwise select all datasets in 'PROCESSING' status
    @return: the result rows; each row's 'sensor' value is rewritten from
             the upper-case names ICECUBE/ICETOP to IceCube/IceTop
    """
    cursor = self.getcursor()
    sql = " SELECT dataset.dataset_id,dataset.jobs_submitted, "
    sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source, "
    sql += " dif.sensorname as sensor, "
    sql += " YEAR(plus.start_datetime) as year, "
    sql += " plus.category, plus.subcategory,dataset.hist "
    sql += " FROM dataset,plus,dif "
    if grid:
        sql += ",grid_statistics "
    sql += " WHERE dataset.dataset_id = plus.dataset_id "
    sql += " AND dataset.dataset_id=dif.dataset_id "
    if grid:
        sql += " AND grid_statistics.dataset_id = dataset.dataset_id "
        sql += " AND grid_statistics.grid_id = %d " % grid
    if dataset > 0:
        sql += " AND dataset.dataset_id = %d " % dataset
    else:
        sql += " AND dataset.status='PROCESSING' "
    self.logger.debug(sql)
    cursor.execute(sql)
    sets = cursor.fetchall()
    self.commit()
    for entry in sets:
        # normalise both upper-case sensor names to their mixed-case spelling
        entry['sensor'] = entry['sensor'].replace('ICECUBE', 'IceCube').replace('ICETOP', 'IceTop')
    return sets
2526
def getNewFinishedSets(self, dataset=0):
    """
    Get a list of finished dataset for which no histos have been created.

    Only datasets in 'COMPLETE' status are considered.

    @param dataset: when non-zero, select this dataset id; otherwise select
                    every complete dataset whose hist flag is 1
    @return: the result rows; each row's 'sensor' value is rewritten from
             the upper-case names ICECUBE/ICETOP to IceCube/IceTop
    """
    cursor = self.getcursor()
    sql = " SELECT dataset.dataset_id,dataset.jobs_submitted, "
    sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source, "
    sql += " dif.sensorname as sensor, "
    sql += " YEAR(plus.start_datetime) as year, "
    sql += " plus.category, plus.subcategory "
    sql += " FROM dataset,plus,dif "
    sql += " WHERE dataset.dataset_id = plus.dataset_id "
    sql += " AND dataset.dataset_id=dif.dataset_id "
    sql += " AND dataset.status='COMPLETE' "
    if dataset:
        sql += " AND dataset.dataset_id = %d " % dataset
    else:
        sql += " AND dataset.hist=1 "
    self.logger.debug(sql)
    cursor.execute(sql)
    sets = cursor.fetchall()
    self.commit()
    for entry in sets:
        # normalise both upper-case sensor names to their mixed-case spelling
        entry['sensor'] = entry['sensor'].replace('ICECUBE', 'IceCube').replace('ICETOP', 'IceTop')
    return sets
2552
def getFinishedSets(self, datasetlist=[]):
    """
    Return the dataset rows, among the given ids, that are no longer in
    'PROCESSING' status.

    @param datasetlist: dataset ids to check (never modified here)
    @return: full 'dataset' rows for the matching ids; an empty list when
             datasetlist is empty (no query is issued in that case)
    """
    if not datasetlist:
        return []

    cursor = self.getcursor()
    query = "".join([
        " SELECT dataset.* FROM dataset ",
        " WHERE dataset.dataset_id ",
        " IN (%s) " % ",".join(map(str, datasetlist)),
        " AND dataset.status != 'PROCESSING' ",
    ])
    self.logger.debug(query)
    cursor.execute(query)
    finished = cursor.fetchall()
    self.commit()
    return finished
2569
def getSetsInfo(self, dataset_id):
    """
    Get the summary row of one dataset.

    The dataset is only matched while it is in 'COMPLETE' status with its
    hist flag at 0.

    @param dataset_id: numeric id of the dataset
    @return: the result row with its 'sensor' value rewritten from the
             upper-case names ICECUBE/ICETOP to IceCube/IceTop, or None
             when no row matches
    """
    cursor = self.getcursor()
    sql = " SELECT dataset.dataset_id,dataset.jobs_submitted, "
    sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source, "
    sql += " dif.sensorname as sensor, "
    sql += " YEAR(plus.start_datetime) as year, "
    sql += " plus.category, plus.subcategory "
    sql += " FROM dataset,plus,dif "
    sql += " WHERE dataset.dataset_id = plus.dataset_id "
    sql += " AND dataset.dataset_id=dif.dataset_id "
    sql += " AND dataset.status='COMPLETE' AND dataset.hist=0 "
    sql += " AND dataset.dataset_id = %d " % dataset_id
    self.logger.debug(sql)
    cursor.execute(sql)
    info = cursor.fetchone()
    if info:
        # normalise both upper-case sensor names to their mixed-case spelling
        info['sensor'] = info['sensor'].replace('ICECUBE', 'IceCube').replace('ICETOP', 'IceTop')
    self.commit()
    # fetchone() yields nothing when the dataset does not match; report
    # that as None instead of failing on the subscript
    return info or None
2591
def AddedHisto(self, dataset):
    """
    Clear the hist flag of a dataset (1 -> 0) and commit.

    @param dataset: numeric id of the dataset
    """
    cursor = self.getcursor()
    statement = "".join([
        " UPDATE dataset ",
        " SET dataset.hist=0 ",
        " WHERE dataset.dataset_id = %d " % dataset,
        " AND dataset.hist=1 ",
    ])
    self.logger.debug(statement)
    cursor.execute(statement)
    self.commit()
2602
def update_monitoring(self,grid_id=None,dataset_id=0):
    """
    Update statistics for datasets and return all dataset which
    have completed

    Per-dataset job counts and resource sums are aggregated from the job
    table. A dataset whose OK count equals its job count is moved out of
    'PROCESSING' (to 'TEMPLATE' when its dataset_category is 'TEMPLATE',
    otherwise to 'READYTOPUBLISH'); a dataset whose OK + FAILED count equals
    its job count is moved to 'ERRORS'. In both cases its grid_statistics
    rows are marked suspended.

    @param grid_id: when not None, restrict the aggregation (and the
                    too-many-failures suspension) to this grid
    @param dataset_id: when non-zero, look at this dataset only; otherwise
                       at every dataset in 'PROCESSING' status
    @return: list of the aggregated rows of datasets that finished
    """

    finished_sets = []
    cursor = self.getcursor()

    # NOTE(review): disabled per-grid statistics update, kept as found.
    #self.logger.debug("updating monitoring tables")
    #sql  = " SELECT job.dataset_id, "
    #sql += " job.grid_id, "
    #sql += " SUM(job.time_real) AS real_time, "
    #sql += " SUM(job.time_sys) AS sys_time, "
    #sql += " SUM(job.time_user) AS user_time, "
    #sql += " SUM(job.mem_heap) AS heap_mem, "
    #sql += " SUM(job.mem_heap_peak) AS heap_mem_peak, "
    #sql += " SUM(job.mem_stack_peak) AS stack_mem_peak, "
    #sql += " SUM(job.nevents) AS events, "
    #sql += " SUM(job.gevents) AS gevents, "
    #sql += " SUM(job.evictions) AS sevictions "
    #sql += " FROM job,dataset "
    #sql += " WHERE job.status = 'OK' "
    #sql += " AND dataset.dataset_id = job.dataset_id "
    #if dataset_id:
    #    sql += " AND dataset.dataset_id = %d " % dataset_id
    #    sql += " GROUP BY job.grid_id "
    #else:
    #    sql += " AND dataset.status = 'PROCESSING' "
    #    sql += " AND dataset.jobs_completed >= dataset.jobs_submitted "
    #    sql += " GROUP BY job.dataset_id, job.grid_id "
    #cursor.execute(sql)
    #result_set = cursor.fetchall();
    #self.commit()

    #self.logger.debug('updating stats for %d datasets' % len(result_set))
    #for entry in result_set:
    #    sql  = " UPDATE grid_statistics SET "
    #    sql += " time_real = %g, " % entry['real_time']
    #    sql += " time_sys = %g, " % entry['sys_time']
    #    sql += " time_user = %g, " % entry['user_time']
    #    sql += " mem_heap = %g, " % entry['heap_mem']
    #    sql += " mem_heap_peak = %g, " % entry['heap_mem_peak']
    #    sql += " mem_stack_peak = %g, " % entry['stack_mem_peak']
    #    sql += " nevents = %d " % self.intcast(entry['events'])
    #    sql += " WHERE dataset_id = %d " % entry['dataset_id']
    #    sql += " AND grid_id = %d " % entry['grid_id']
    #    cursor.execute(sql)
    #    self.commit()

    # One aggregated row per dataset. SUM(<comparison>) counts the jobs for
    # which the comparison holds; SUM(1) counts all joined job rows.
    sql = " SELECT job.dataset_id, "
    sql += " MAX(job.status_changed) AS enddate, "
    sql += " SUM(1) AS jobs_submitted, "
    sql += " SUM(job.status='OK' ) AS jobs_completed, "
    sql += " SUM(job.status='FAILED') AS jobs_failed, "
    sql += " SUM(job.status='SUSPENDED') AS jobs_suspended, "
    sql += " SUM(job.time_real) AS real_time, "
    sql += " SUM(job.time_sys) AS sys_time, "
    sql += " SUM(job.time_user) AS user_time, "
    sql += " SUM(job.mem_heap) AS heap_mem, "
    sql += " SUM(job.mem_heap_peak) AS heap_mem_peak, "
    sql += " SUM(job.mem_stack_peak) AS stack_mem_peak, "
    sql += " SUM(job.nevents) AS events, "
    sql += " SUM(job.gevents) AS gevents, "
    sql += " SUM(job.evictions) AS sevictions, "
    sql += " grid_statistics.debug AS debug "
    sql += " FROM job, dataset, grid_statistics "
    sql += " WHERE dataset.dataset_id = job.dataset_id "
    sql += " AND dataset.dataset_id = grid_statistics.dataset_id "
    if grid_id is not None:
        sql += " AND grid_statistics.grid_id = %d " % grid_id
    if dataset_id:
        sql += " AND dataset.dataset_id = %d " % dataset_id
    else:
        sql += " AND dataset.status = 'PROCESSING' "
    sql += " GROUP by job.dataset_id "
    cursor.execute(sql)
    result_set = cursor.fetchall();
    self.commit()

    for entry in result_set:
        # normalise the two counters to plain ints; a row whose counters
        # cannot be converted is logged and skipped
        try:
            entry['jobs_completed'] = self.intcast(entry['jobs_completed'])
            entry['jobs_failed'] = self.intcast(entry['jobs_failed'])
        except Exception,e:
            self.logger.error("Could not cast int(%s)" % entry['jobs_completed'] )
            entry['jobs_completed'] = 0
            continue;

        # every job finished OK
        if self.intcast(entry['jobs_completed']) == self.intcast(entry['jobs_submitted']):

            finished_sets.append(entry)
            # template filled from the entry dict; 'status' and 'and' are
            # set below, once per target status
            sql = " UPDATE dataset "
            sql += " SET dataset.jobs_completed = %(jobs_completed)d, "
            sql += " dataset.jobs_failed = %(jobs_failed)d, "
            sql += " dataset.status = '%(status)s', "
            sql += " dataset.enddate = '%(enddate)s', "
            sql += " time_real = %(real_time)g, "
            sql += " time_sys = %(sys_time)g, "
            sql += " time_user = %(user_time)g, "
            sql += " mem_heap = %(heap_mem)g, "
            sql += " mem_heap_peak = %(heap_mem_peak)g, "
            sql += " mem_stack_peak = %(stack_mem_peak)g, "
            sql += " events = %(events)d "
            sql += " WHERE dataset.dataset_id = %(dataset_id)d "
            sql += " AND dataset.status = 'PROCESSING' "
            sql += " %(and)s "

            # the same UPDATE is run twice with complementary category
            # filters, so only one of the two can match the dataset
            entry['status'] = 'READYTOPUBLISH'
            entry['and'] = "AND dataset.dataset_category != 'TEMPLATE'"
            cursor.execute(sql % entry)

            entry['status'] = 'TEMPLATE'
            entry['and'] = "AND dataset.dataset_category = 'TEMPLATE'"
            cursor.execute(sql % entry)

            # mark the dataset suspended on every grid
            sql = " UPDATE grid_statistics "
            sql += " SET suspend = 1 "
            sql += " WHERE dataset_id = %d " % entry['dataset_id']
            cursor.execute(sql)

        # every job either finished OK or failed
        elif entry['jobs_submitted'] and (self.intcast(entry['jobs_completed']) + \
                self.intcast(entry['jobs_failed'])) == \
                self.intcast(entry['jobs_submitted']):
            finished_sets.append(entry)
            sql = " UPDATE dataset "
            sql += " SET dataset.jobs_completed = %d, " % entry['jobs_completed']
            sql += " dataset.jobs_failed = %d, " % entry['jobs_failed']
            sql += " dataset.status = 'ERRORS', "
            sql += " dataset.enddate = '%s', " % entry['enddate']
            sql += " time_real = %g, " % entry['real_time']
            sql += " time_sys = %g, " % entry['sys_time']
            sql += " time_user = %g, " % entry['user_time']
            sql += " mem_heap = %g, " % entry['heap_mem']
            sql += " mem_heap_peak = %g, " % entry['heap_mem_peak']
            sql += " mem_stack_peak = %g, " % entry['stack_mem_peak']
            sql += " events = %d " % self.intcast(entry['events'])
            sql += " WHERE dataset.dataset_id = %d " % entry['dataset_id']
            sql += " AND dataset.status = 'PROCESSING' "
            cursor.execute(sql)

            sql = " UPDATE grid_statistics "
            sql += " SET suspend = 1 "
            sql += " WHERE dataset_id = %d " % entry['dataset_id']
            cursor.execute(sql)

        # too many suspended + failed jobs: suspend the dataset where it is
        # not suspended yet and its grid_statistics.debug flag is 1
        if self.intcast(entry['jobs_suspended']) + self.intcast(entry['jobs_failed']) > self.maxsuspend:
            sql = " UPDATE grid_statistics "
            sql += " SET suspend = 1 "
            sql += " WHERE suspend = 0 "
            sql += " AND dataset_id = %d " % entry['dataset_id']
            if grid_id is not None:
                sql += " AND grid_id = %d " % grid_id
            sql += " AND debug = 1"
            cursor.execute(sql)

    self.commit()
    return finished_sets
2761 2762
def GetGridId(self, grid_name, institution=None, batchsys=None, url=None):
    """
    Retrieve the key for grid_name, registering the grid if it is unknown.

    When both institution and batchsys are given, the grid row is also
    refreshed with them, the running iceprod version and (if given) url.

    @param grid_name: value of grid.name to look up or insert
    @param institution: stored in grid.institution
    @param batchsys: stored in grid.batchsys
    @param url: stored in grid.url when the row is refreshed
    @return: grid_id of the existing or newly inserted row
    """
    ver = iceprod.__version__
    if not self.isconnected():
        self.connect()
    cursor = self.getcursor()
    sql = " SELECT grid_id FROM grid WHERE name='%s' " % grid_name
    cursor.execute(sql)
    result = cursor.fetchone()
    if result:
        grid_id = result['grid_id']
    else:
        sql = " INSERT IGNORE INTO grid (name,institution,batchsys,version) "
        sql += " VALUES ('%s','%s','%s','%s') " % (grid_name, institution, batchsys, ver)
        cursor.execute(sql)
        grid_id = self.insert_id()

    if institution and batchsys:
        sql = " UPDATE grid SET "
        if url:
            sql += " url = '%s', " % url
        sql += " institution = '%s', " % institution
        sql += " batchsys = '%s', " % batchsys
        sql += " version = '%s' " % ver
        sql += " WHERE grid_id=%d " % grid_id
        cursor.execute(sql)

    # commit on every path so that a freshly inserted grid row is persisted
    # even when no institution/batchsys refresh follows
    self.commit()

    return grid_id
2793
def RegisterServer(self,
                   grid_id,
                   server_name,
                   server_status,
                   server_pid):
    """
    Record the status and process id of a server in the grid table.

    server_name selects the columns that are written: '<server_name>'
    receives the status and '<server_name>_pid' the process id.

    @param grid_id: id of the grid row to update
    @param server_name: column name prefix identifying the server
    @param server_status: status text to store
    @param server_pid: numeric process id to store
    """
    if not self.isconnected():
        self.connect()
    cursor = self.getcursor()

    statement = "".join([
        " UPDATE grid SET ",
        " %s='%s', " % (server_name, server_status),
        " %s_pid=%d " % (server_name, server_pid),
        " WHERE grid_id=%d " % grid_id,
    ])
    cursor.execute(statement)
    self.commit()
2811
def GetGridStatusChanges(self, grid_id):
    """
    Get status changes for daemons.

    Reads the grid row and then stamps its lastupdate column with NOW().

    @param grid_id: id of the grid row
    @return: the grid row as read before the timestamp update
    """
    if not self.isconnected():
        self.connect()
    cursor = self.getcursor()

    select = "".join([" SELECT * FROM grid ", " WHERE grid_id=%d " % grid_id])
    cursor.execute(select)
    grid_row = cursor.fetchone()

    touch = "".join([" UPDATE grid SET lastupdate=NOW() ", " WHERE grid_id=%d " % grid_id])
    cursor.execute(touch)

    self.commit()
    return grid_row
2830
def GridRequestSuspend(self, grid, daemon):
    """
    Ask running daemons to stop.

    Sets the daemon's column to 'STOPREQUEST' wherever it currently reads
    'RUNNING'.

    @param grid: an int selects the grid by grid_id; one of 'any', '*',
                 'all' applies to every grid; any other value selects the
                 grid by name
    @param daemon: name of the grid column holding the daemon status
    """
    cursor = self.getcursor()

    clauses = [
        " UPDATE grid SET ",
        " %s = 'STOPREQUEST' " % daemon,
        " WHERE %s ='RUNNING' " % daemon,
    ]
    if type(grid) is types.IntType:
        clauses.append(" AND grid_id=%u " % grid)
    elif grid not in ('any', '*', 'all'):
        clauses.append(" AND name='%s' " % grid)
    cursor.execute("".join(clauses))
    self.commit()
2846
def GridRequestResume(self, grid, daemon):
    """
    Ask stopped daemons to start.

    Sets the daemon's column to 'STARTREQUEST' wherever it currently reads
    'STOPPED'.

    @param grid: an int selects the grid by grid_id; any other value
                 selects the grid by name
    @param daemon: name of the grid column holding the daemon status
    """
    cursor = self.getcursor()

    clauses = [
        " UPDATE grid SET ",
        " %s = 'STARTREQUEST' " % daemon,
        " WHERE %s = 'STOPPED' " % daemon,
    ]
    if type(grid) is types.IntType:
        selector = " AND grid_id=%u " % grid
    else:
        selector = " AND name='%s' " % grid
    clauses.append(selector)
    cursor.execute("".join(clauses))
    self.commit()
2862
def GetDatasetParams(self, dataset):
    """
    Get parameters for given dataset.

    @param dataset: numeric id of the dataset
    @return: all matching rows with the columns category, subcategory,
             year (of plus.start_datetime) and source (first three
             characters of dif.source_name, lower-cased)
    """
    cursor = self.getcursor()
    query = "".join([
        " SELECT plus.category, plus.subcategory, ",
        " YEAR(plus.start_datetime) as year,",
        " LOWER(SUBSTRING(dif.source_name,1,3)) as source ",
        " FROM plus,dif ",
        " WHERE plus.dataset_id = %d " % dataset,
        " AND plus.dataset_id = dif.dataset_id ",
    ])
    self.logger.debug(query)
    cursor.execute(query)
    return cursor.fetchall()
2877
def GetFinishedJobs(self, grid_id, max_copy=20, delay=5):
    """
    Claim and fetch the jobs of a grid whose output is ready to be copied.

    Jobs are claimed by stamping them with a fresh passkey: 'COPIED' jobs
    (up to max_copy) keep their status, 'READYTOCOPY' jobs are moved to
    'COPYING'. The number of newly started copies is max_copy minus the
    jobs already counted as copying/copied, but never less than 1. Only
    jobs whose status is older than `delay` minutes are touched.

    @param grid_id: id of the grid whose jobs are inspected
    @param max_copy: upper bound used for both claim statements
    @param delay: minimum age, in minutes, of the job's last status change
    @return: the claimed job rows joined with category, subcategory, year
             and source of their dataset
    """
    cursor = self.getcursor()
    passkey = self.mkkey(6, 9)

    # how many jobs are already in the copy stage
    count_query = "".join([
        " SELECT SUM(1) copying FROM `job` ",
        " WHERE ",
        " (status='COPYING' OR status='COPIED') ",
        " AND grid_id = %d " % grid_id,
        " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay,
    ])
    self.logger.debug(count_query)
    cursor.execute(count_query)
    currently_copying = self.intcast(cursor.fetchone()['copying'])

    # claim jobs that are already COPIED
    claim_copied = "".join([
        " UPDATE job SET ",
        " passkey='%s', " % passkey,
        " status_changed=NOW() ",
        " WHERE status='COPIED' ",
        " AND grid_id = %d " % grid_id,
        " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay,
        " LIMIT %d " % max_copy,
    ])
    self.logger.debug(claim_copied)
    cursor.execute(claim_copied)
    self.commit()

    # start copying jobs that are READYTOCOPY, within the remaining budget
    claim_ready = "".join([
        " UPDATE job SET ",
        " prev_state = status, ",
        " status = 'COPYING', ",
        " passkey='%s', " % passkey,
        " status_changed=NOW() ",
        " WHERE status='READYTOCOPY' ",
        " AND grid_id = %d " % grid_id,
        " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay,
        " LIMIT %d " % max(1, max_copy - currently_copying),
    ])
    self.logger.debug(claim_ready)
    cursor.execute(claim_ready)
    self.commit()

    # fetch everything stamped with our passkey
    fetch_claimed = "".join([
        " SELECT job.*, plus.category, plus.subcategory, ",
        " YEAR(plus.start_datetime) as year,",
        " LOWER(SUBSTRING(dif.source_name,1,3)) as source ",
        " FROM job,plus,dif ",
        " WHERE grid_id = %d " % grid_id,
        " AND job.dataset_id = plus.dataset_id ",
        " AND plus.dataset_id = dif.dataset_id ",
        " AND (job.status='COPYING' OR job.status='COPIED') ",
        " AND job.passkey='%s' " % passkey,
    ])
    self.logger.debug(fetch_claimed)
    cursor.execute(fetch_claimed)
    claimed = cursor.fetchall()
    self.commit()

    return claimed
2936
def GetResetJobs(self, grid_id, max_reset=50):
    """
    Fetch the jobs of a grid that are waiting to be reset and advance the
    reset pipeline by one step.

    In order, each step committed on its own:
      1. 'ERROR' jobs of datasets in debug mode become 'SUSPENDED';
      2. 'ERROR'/'RESET' jobs without a grid go back to 'WAITING';
      3. up to max_reset 'RESET' jobs of this grid get a fresh passkey;
      4. the jobs carrying that passkey are selected (the return value);
      5. 'RESET' jobs of this grid older than 5 minutes become 'CLEANING';
      6. up to max_reset 'ERROR' jobs of this grid become 'RESET'.

    @param grid_id: id of the grid whose jobs are handled
    @param max_reset: row limit applied to steps 3 to 6
    @return: the job rows selected in step 4
    """
    cursor = self.getcursor()
    passkey = self.mkkey(6, 9)

    def run(*fragments):
        # execute the statement assembled from the given fragments
        cursor.execute("".join(fragments))

    # suspend error jobs if dataset is in debug mode
    run(" UPDATE job,dataset SET ",
        " job.prev_state = job.status, ",
        " job.status='SUSPENDED', ",
        " job.status_changed=NOW() ",
        " WHERE job.status='ERROR' ",
        " AND job.grid_id = %d " % grid_id,
        " AND job.dataset_id = dataset.dataset_id ",
        " AND dataset.debug = 1 ")
    self.commit()

    # put ERROR jobs with no grid assignment back in queue
    run(" UPDATE job SET ",
        " prev_state = status, ",
        " status='WAITING', ",
        " status_changed=NOW() ",
        " WHERE status IN ('ERROR','RESET') ",
        " AND (grid_id =0 OR grid_id IS NULL) ")
    self.commit()

    # set passkey for jobs in RESET state
    run(" UPDATE job SET ",
        " prev_state = status, ",
        " passkey='%s' " % passkey,
        " WHERE status='RESET' ",
        " AND grid_id = %d " % grid_id,
        " LIMIT %d " % max_reset)
    self.commit()

    # SELECT jobs to be cleaned up
    run(" SELECT * FROM job ",
        " WHERE grid_id = %d " % grid_id,
        " AND passkey='%s' " % passkey,
        " AND status='RESET' ",
        " LIMIT %d " % max_reset)
    to_clean = cursor.fetchall()
    self.commit()

    # and set their status to CLEANING
    run(" UPDATE job SET ",
        " prev_state = status, ",
        " status = 'CLEANING', ",
        " passkey='%s', " % passkey,
        " status_changed=NOW() ",
        " WHERE status='RESET' ",
        " AND grid_id = %d " % grid_id,
        " AND NOW() > TIMESTAMPADD(MINUTE,5,status_changed) ",
        " ORDER BY priority DESC, job_id ",
        " LIMIT %d " % max_reset)
    self.commit()

    # reset jobs with given grid_id
    run(" UPDATE job SET ",
        " job.prev_state = job.status, ",
        " job.status='RESET', ",
        " job.status_changed=NOW() ",
        " WHERE job.status='ERROR' ",
        " AND job.grid_id = %d " % grid_id,
        " ORDER BY job.priority DESC, job.job_id ",
        " LIMIT %d " % max_reset)
    self.commit()

    return to_clean
3014
3015 - def reset_old_tasks(self, 3016 grid_id, 3017 maxidletime, 3018 maxruntime, 3019 maxsubmittime, 3020 maxcopytime, 3021 maxfailures=10, 3022 maxevicttime=10, 3023 keepalive=14400):
3024 """ 3025 reset status of tasks that where queued but who's status 3026 has not changed in more that maxtime minutes 3027 3028 @param grid_id: id of current cluster 3029 @param maxruntime: maximum run time for jobs 3030 @param maxsubmittime: maximum submit time for jobs 3031 @param maxcopytime: maximum time for jobs to be in 'copying' state 3032 @param maxfailures: maximum number of time a job is allowd to fail 3033 @param keepalive: how often should server expect to hear from jobs 3034 """ 3035 3036 totalrows = 0 3037 cursor = self.getcursor() 3038 passkey = self.mkkey(6,9) 3039 3040 # jobs in QUEUED state 3041 self.logger.debug("reseting stale queued tasks...") 3042 sql = " UPDATE task SET " 3043 sql += " last_status = status, " 3044 sql += " status='RESET', " 3045 sql += " passkey='%s', " % passkey 3046 sql += " status_changed=NOW() " 3047 sql += " WHERE grid_id=%d " % grid_id 3048 sql += " AND status=%s " 3049 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%s,status_changed) " 3050 sql += " LIMIT 20 " 3051 3052 self.logger.debug(sql) 3053 3054 timeouts = [] 3055 timeouts.append(('QUEUED',maxidletime)) 3056 timeouts.append(('QUEUEING',maxsubmittime)) 3057 timeouts.append(('CLEANING',maxsubmittime)) 3058 timeouts.append(('EVICTED',maxevicttime)) 3059 timeouts.append(('COPIED',maxcopytime)) 3060 timeouts.append(('COPYINGINPUT',maxcopytime)) 3061 timeouts.append(('COPYINGOUTPUT',maxcopytime)) 3062 timeouts.append(('STARTING',maxcopytime)) 3063 cursor.executemany(sql, timeouts) 3064 3065 rowcount = self._conn.affected_rows() 3066 totalrows += rowcount 3067 self.commit() 3068 self.logger.debug('Reset %d queued jobs' % rowcount)
3069 3070 3071
3072 - def GetResetTasks(self,grid_id,max_reset=50):
3073 """ 3074 Fetch list of jobs that have completed for given grid_id 3075 """ 3076 # Get all jobs assigned to grid which need to be reset. 3077 cursor = self.getcursor() 3078 passkey = self.mkkey(6,9) 3079 3080 # put ERROR jobs with no grid assignment back in queue 3081 sql = " UPDATE task SET " 3082 sql += " last_status = status, " 3083 sql += " status='WAITING', " 3084 sql += " status_changed=NOW() " 3085 sql += " WHERE status IN ('ERROR','RESET') " 3086 sql += " AND (grid_id =0 OR grid_id IS NULL) " 3087 cursor.execute(sql) 3088 self.commit() 3089 3090 # set passkey for jobs in RESET state 3091 sql = " UPDATE task SET " 3092 sql += " last_status = status, " 3093 sql += " passkey='%s' " % passkey 3094 sql += " WHERE status='RESET' " 3095 sql += " AND grid_id = %d " % grid_id 3096 sql += " LIMIT %d " % max_reset 3097 cursor.execute(sql) 3098 self.commit() 3099 3100 # SELECT jobs to be cleaned up 3101 sql = " SELECT * FROM task " 3102 sql += " JOIN job ON task.job_id=job.job_id " 3103 sql += " WHERE task.grid_id = %d " % grid_id 3104 sql += " AND task.passkey='%s' " % passkey 3105 sql += " AND task.status='RESET' " 3106 sql += " LIMIT %d " % max_reset 3107 cursor.execute(sql) 3108 result_set = cursor.fetchall() 3109 self.commit() 3110 3111 # and set their status to CLEANING 3112 sql = " UPDATE task SET " 3113 sql += " last_status = status, " 3114 sql += " status = 'CLEANING', " 3115 sql += " passkey='%s', " % passkey 3116 sql += " status_changed=NOW() " 3117 sql += " WHERE status='RESET' " 3118 sql += " AND grid_id = %d " % grid_id 3119 sql += " AND NOW() > TIMESTAMPADD(MINUTE,5,status_changed) " 3120 sql += " ORDER BY job_id " 3121 sql += " LIMIT %d " % max_reset 3122 cursor.execute(sql) 3123 self.commit() 3124 3125 # reset jobs with given grid_id 3126 sql = " UPDATE task SET " 3127 sql += " task.last_status = task.status, " 3128 sql += " task.status='RESET', " 3129 sql += " task.status_changed=NOW() " 3130 sql += " WHERE task.status='ERROR' " 3131 sql += " AND 
task.grid_id = %d " % grid_id 3132 sql += " ORDER BY task.job_id " 3133 sql += " LIMIT %d " % max_reset 3134 cursor.execute(sql) 3135 self.commit() 3136 3137 return result_set
3138 3139
3140 - def GetActiveJobs(self,grid_id):
3141 """ 3142 Get list of jobs currently in any active state 3143 """ 3144 from iceprod.server.job import i3Job,i3Task 3145 cursor = self.getcursor() 3146 job_list = [] 3147 sql = " SELECT * FROM job " 3148 sql += " WHERE job.grid_id = %d " % grid_id 3149 sql += " AND job.status NOT IN " 3150 sql += "('WAITING','OK','SUSPENDED','FAILED')" 3151 cursor.execute(sql) 3152 for j in cursor.fetchall(): 3153 job = i3Job() 3154 job.SetDatasetId(j['dataset_id']) 3155 job.SetDatabaseId(j['job_id']) 3156 job.SetProcNum(j['queue_id']) 3157 job.SetPrio(j['priority']) 3158 job.SetJobId(j['grid_queue_id']) 3159 job.AddArgOption("key",j['passkey']) 3160 job.SetLogFile( "%s/%s.log" % (j['submitdir'],job.Prefix() )) 3161 job.SetOutputFile( "%s/%s.out" % (j['submitdir'],job.Prefix() )) 3162 job.SetErrorFile( "%s/%s.err" % (j['submitdir'],job.Prefix() )) 3163 3164 job_list.append(job) 3165 self.commit() 3166 return job_list
3167
3168 - def GetQueuedJobs(self,grid_id,delay=5):
3169 """ 3170 Get list of jobs currently in queued status 3171 """ 3172 from iceprod.server.job import i3Job 3173 cursor = self.getcursor() 3174 job_list = [] 3175 sql = " SELECT * FROM job " 3176 sql += " WHERE job.grid_id = %d " % grid_id 3177 sql += " AND job.status = 'QUEUED' " 3178 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay 3179 cursor.execute(sql) 3180 for j in cursor.fetchall(): 3181 job = i3Job() 3182 job.SetDatabaseId(j['job_id']) 3183 job.SetDatasetId(j['dataset_id']) 3184 job.SetProcNum(j['queue_id']) 3185 job.SetPrio(j['priority']) 3186 job.SetJobId(j['grid_queue_id']) 3187 job.AddArgOption("key",j['passkey']) 3188 job.SetLogFile( "%s/%s.log" % (j['submitdir'],job.Prefix() )) 3189 job.SetOutputFile( "%s/%s.out" % (j['submitdir'],job.Prefix() )) 3190 job.SetErrorFile( "%s/%s.err" % (j['submitdir'],job.Prefix() )) 3191 job_list.append(job) 3192 self.commit() 3193 return job_list
3194
3195 - def GetProcessingJobs(self,grid_id,delay=5):
3196 """ 3197 Get list of jobs currently in queued status 3198 """ 3199 from iceprod.server.job import i3Job 3200 cursor = self.getcursor() 3201 job_list = [] 3202 sql = " SELECT * FROM job " 3203 sql += " WHERE job.grid_id = %d " % grid_id 3204 sql += " AND job.status = 'PROCESSING' " 3205 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay 3206 cursor.execute(sql) 3207 for j in cursor.fetchall(): 3208 job = i3Job() 3209 job.SetDatasetId(j['dataset_id']) 3210 job.SetProcNum(j['queue_id']) 3211 job.SetPrio(j['priority']) 3212 job.SetJobId(j['grid_queue_id']) 3213 job.AddArgOption("key",j['passkey']) 3214 job.SetLogFile( "%s/%s.log" % (j['submitdir'],job.Prefix() )) 3215 job.SetOutputFile( "%s/%s.out" % (j['submitdir'],job.Prefix() )) 3216 job.SetErrorFile( "%s/%s.err" % (j['submitdir'],job.Prefix() )) 3217 job_list.append(job) 3218 self.commit() 3219 return job_list
3220
3221 - def GetFinishedTasks(self,grid_id,max_copy=20,delay=5):
3222 """ 3223 Fetch list of jobs that have completed for given grid_id 3224 """ 3225 # Get all jobs assigned to grid which need to be reset. 3226 cursor = self.getcursor() 3227 passkey = self.mkkey(6,9) 3228 job_list = [] 3229 3230 sql = " SELECT SUM(1) copying FROM `task` " 3231 sql += " WHERE " 3232 sql += " (status='COPYING' OR status='COPIED') " 3233 sql += " AND grid_id = %d " % grid_id 3234 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay 3235 self.logger.debug(sql) 3236 cursor.execute(sql) 3237 currently_copying = self.intcast(cursor.fetchone()['copying']) 3238 if not currently_copying: currently_copying = 0 3239 3240 sql = " UPDATE task SET " 3241 sql += " passkey='%s', " % passkey 3242 sql += " status_changed=NOW() " 3243 sql += " WHERE status='COPIED' " 3244 sql += " AND grid_id = %d " % grid_id 3245 sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay 3246 sql += " LIMIT %d " % max_copy 3247 self.logger.debug(sql) 3248 cursor.execute(sql) 3249 self.commit() 3250 3251 sql = " SELECT task.*, plus.category, plus.subcategory, " 3252 sql += " YEAR(plus.start_datetime) as year," 3253 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source " 3254 sql += " FROM task,job,plus,dif " 3255 sql += " WHERE task.grid_id = %d " % grid_id 3256 sql += " AND task.job_id = job.job_id " 3257 sql += " AND job.dataset_id = plus.dataset_id " 3258 sql += " AND plus.dataset_id = dif.dataset_id " 3259 sql += " AND (task.status='COPYING' OR task.status='COPIED') " 3260 sql += " AND task.passkey='%s' " % passkey 3261 self.logger.debug(sql) 3262 cursor.execute(sql) 3263 result_set = cursor.fetchall() 3264 self.commit() 3265 3266 return result_set
3267 3268 3269
3270 - def GetTasks(self,grid_id,status=('QUEUED','QUEUEING','PROCESSING','RESET','ERROR'),delay=0):
3271 """ 3272 Get list of tasks currently in any given status 3273 """ 3274 from iceprod.server.job import i3Task 3275 cursor = self.getcursor() 3276 task_list = [] 3277 sql = " SELECT * FROM task" 3278 sql += " JOIN job " 3279 sql += " ON job.job_id = task.job_id " 3280 sql += " WHERE task.grid_id = %d " % grid_id 3281 if isinstance(status,tuple): 3282 sql += " AND task.status IN " + str(status) 3283 else: 3284 sql += " AND task.status = '%s' " % status 3285 self.logger.debug(sql) 3286 cursor.execute(sql) 3287 for j in cursor.fetchall(): 3288 task = i3Task() 3289 task.SetTaskId(j['task_id']) 3290 task.SetDatasetId(j['dataset_id']) 3291 task.SetDatabaseId(j['job_id']) 3292 task.SetProcNum(j['queue_id']) 3293 task.SetPrio(j['priority']) 3294 task.SetJobId(j['grid_queue_id']) 3295 task.AddArgOption("key",j['passkey']) 3296 task.SetLogFile( "%s/%s.log" % (j['submitdir'],task.Prefix() )) 3297 task.SetOutputFile( "%s/%s.out" % (j['submitdir'],task.Prefix() )) 3298 task.SetErrorFile( "%s/%s.err" % (j['submitdir'],task.Prefix() )) 3299 3300 task_list.append(task) 3301 self.commit() 3302 return task_list
3303 3304
3305 - def GetActiveTasks(self,grid_id,delay=0):
3306 """ 3307 Get list of tasks currently active 3308 """ 3309 return self.GetTasks(grid_id,status=('QUEUED','QUEUEING','PROCESSING','RESET','ERROR'),delay=delay)
3310 3311
3312 - def GetQueuedTasks(self,grid_id,delay=5):
3313 """ 3314 Get list of tasks currently in queue 3315 """ 3316 return self.GetTasks(grid_id,status=('QUEUED'),delay=delay)
3317 3318
3319 - def GetProcessingTasks(self,grid_id,delay=5):
3320 """ 3321 Get list of tasks currently in queue 3322 """ 3323 return self.GetTasks(grid_id,status=('STARTING','COPYINGINPUT','PROCESSING','COPYINGOUTPUT','READYTOCOPY','COPYING','COPIED'),delay=delay)
3324 3325
3326 - def CheckJobDependencies(self,grid_id):
3327 3328 cursor = self.getcursor() 3329 3330 # Get dependency rules for active datasets 3331 sql = " SELECT jd.* FROM job_dependency jd " 3332 sql += " JOIN dataset d ON jd.dataset_id = d.dataset_id " 3333 sql += " JOIN grid_statistics gs ON gs.dataset_id = d.dataset_id " 3334 sql += " WHERE d.status = 'PROCESSING' " 3335 sql += " AND gs.grid_id = %u " % grid_id 3336 self.logger.debug(sql) 3337 3338 try: 3339 cursor.execute(sql) 3340 except: 3341 logger.error(sql) 3342 return 3343 result = cursor.fetchall() 3344 3345 # Create dictionary of rules for datasets 3346 rules = {} 3347 for entry in result: 3348 if not rules.has_key('dataset_id'): 3349 rules[entry['dataset_id']] = [] 3350 rules[entry['dataset_id']].append(entry) 3351 3352 # Get IDLE jobs 3353 sql = " SELECT j.* FROM job j " 3354 sql += " JOIN dataset d ON j.dataset_id = d.dataset_id " 3355 sql += " JOIN grid_statistics gs ON gs.dataset_id = d.dataset_id " 3356 sql += " WHERE d.status = 'PROCESSING' " 3357 sql += " AND gs.grid_id = %u " % grid_id 3358 sql += " AND j.status = 'IDLE' " 3359 sql += " AND d.dataset_id IN ( " 3360 sql += " SELECT d.dataset_id FROM dataset d " 3361 sql += " JOIN job_dependency jd ON jd.dataset_id = d.dataset_id " 3362 sql += " JOIN grid_statistics gs ON gs.dataset_id = d.dataset_id " 3363 sql += " WHERE d.status = 'PROCESSING' " 3364 sql += " AND gs.grid_id = %u " % grid_id 3365 sql += " GROUP BY dataset_id " 3366 sql += " ) " 3367 sql += " LIMIT 500 " 3368 3369 try: 3370 cursor.execute(sql) 3371 except: 3372 logger.error(sql) 3373 return 3374 jobs = cursor.fetchall() 3375 3376 for job in jobs: # Check all rules for given job 3377 dataset = job['dataset_id'] 3378 deps = [] 3379 for rule in rules[dataset]: 3380 for table in ('job','archived_job'): 3381 sql = " SELECT j.* FROM %s j " % table 3382 sql += " WHERE j.dataset_id = %s " % rule['input_dataset'] 3383 sql += " AND j.%s = %s " % (rule['input_job'],job['queue_id']) 3384 try: 3385 cursor.execute(sql) 3386 except Exception,e: 
3387 logger.error(e) 3388 else: 3389 deps += cursor.fetchall() 3390 3391 if reduce(lambda x,y: x and y['status'] in ('OK','COPIED'), deps, True): 3392 sql = " UPDATE job j1 SET status='WAITING' " 3393 sql += " WHERE j1.job_id = %(job_id)u " % job 3394 sql += " AND j1.status = 'IDLE' " 3395 logger.debug(sql) 3396 cursor.execute(sql) 3397 self.commit()
3398 3399
3400 - def QueueJobs(self,maxjobs,grid_id,jobs_at_once=20,fifo=True,debug=0,maxq=1000):
3401 """ 3402 Reserve at most 'maxjobs' from a given dataset. 3403 Get proc ids and set their status to 'QUEUEING' 3404 """ 3405 from iceprod.server.job import i3Job,i3Task 3406 cursor = self.getcursor() 3407 job_list = {} 3408 3409 # Count the number of jobs currently enqueued or running 3410 sql = " SELECT SUM(1) AS total, " 3411 sql += " SUM(IF(status = 'QUEUED', 1, 0)) AS queued, " 3412 sql += " SUM(IF(status = 'QUEUEING', 1, 0)) AS queueing, " 3413 sql += " SUM(IF(status = 'PROCESSING', 1, 0)) AS processing " 3414 sql += " FROM job " 3415 sql += " WHERE job.grid_id = %d " % grid_id 3416 sql += " AND status NOT IN ('COPIED','IDLE','WAITING','OK','SUSPENDED','RESET','FAILED','CLEANING') " 3417 cursor.execute(sql) 3418 result = cursor.fetchone() 3419 self.commit() 3420 if result['total'] == None: 3421 self.logger.debug('queue total returned None') 3422 total = 0 3423 else: 3424 total = int(result['total']) 3425 if result['queued'] == None: 3426 queued = 0 3427 else: 3428 queued = int(result['queued']) 3429 if result['queueing'] == None: 3430 queueing = 0 3431 else: 3432 queueing = int(result['queueing']) 3433 if result['processing'] == None: 3434 processing = 0 3435 else: 3436 processing = int(result['processing']) 3437 3438 self.logger.info('%d jobs are currently in the queue' % total) 3439 self.logger.info('%d jobs are currently queued' % (queued+queueing)) 3440 self.logger.info('%d jobs are currently processing ' % processing) 3441 3442 maxjobs = maxjobs - min(maxjobs,total) 3443 if queued+queueing > maxq: maxjobs = 0 3444 3445 # Get next waiting jobs in queue 3446 sql = " SELECT gs.dataset_id FROM grid_statistics gs " 3447 sql += " JOIN dataset d ON gs.dataset_id = d.dataset_id " 3448 sql += " WHERE gs.grid_id = %d AND gs.suspend = 0 " % grid_id 3449 sql += " AND d.status = 'PROCESSING' " 3450 if debug: # run in debug mode 3451 sql += " AND gs.debug = 1 " 3452 else: 3453 sql += " AND gs.debug != 1 " 3454 3455 self.logger.debug(sql) 3456 cursor.execute(sql) 
3457 dataset_list = [] 3458 result_set = cursor.fetchall() 3459 self.commit() 3460 3461 if not len(result_set) > 0: 3462 self.logger.info("no jobs to queue at this time") 3463 return {} 3464 3465 for item in result_set: 3466 dataset_list.append(item['dataset_id']) 3467 3468 sql = " SELECT job.*, dataset.temporary_storage, " 3469 sql += " plus.category, plus.subcategory, " 3470 sql += " YEAR(plus.start_datetime) as year, " 3471 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source " 3472 sql += " FROM job use index(status_dataset_index) " 3473 sql += " STRAIGHT_JOIN dataset ON job.dataset_id = dataset.dataset_id " 3474 sql += " STRAIGHT_JOIN dif ON job.dataset_id = dif.dataset_id " 3475 sql += " STRAIGHT_JOIN plus ON job.dataset_id = plus.dataset_id " 3476 sql += " WHERE job.status = 'WAITING' and job.dataset_id in ( " 3477 sep = "" 3478 for d in dataset_list: 3479 sql += sep+str(d) 3480 sep = "," 3481 sql += ") " 3482 sql += " ORDER BY job.priority DESC," 3483 if fifo: # datasets with smaller numbers should run first 3484 sql += " job.dataset_id," 3485 sql += " job.queue_id " 3486 sql += " LIMIT %u " % min(jobs_at_once,maxjobs) 3487 3488 3489 self.logger.debug(sql) 3490 cursor.execute(sql) 3491 result_set = cursor.fetchall() 3492 self.commit() 3493 3494 if not len(result_set) > 0: 3495 self.logger.info("no jobs to queue at this time") 3496 return {} 3497 else: 3498 self.logger.info("reserved %d jobs" % len(result_set)) 3499 3500 for item in result_set: 3501 proc = item['queue_id'] 3502 target_url = item['temporary_storage'] 3503 item['subdirectory'] = "%05d-%05d" % ((proc/1000)*1000,((proc+1000)/1000)*1000-1) 3504 # Don't want to keep looping if we have exceeded maxjobs 3505 if len(job_list) >= maxjobs: 3506 return job_list 3507 3508 # Set job status to 'QUEUED' 3509 passkey = self.mkkey(6,9) 3510 queue_id = item['queue_id'] 3511 dataset_id = item['dataset_id'] 3512 priority = item['priority'] 3513 3514 sql = " UPDATE job SET " 3515 sql += " job.prev_state = 
job.status, " 3516 sql += " job.status='QUEUEING', " 3517 sql += " job.host=NULL, " 3518 sql += " job.grid_id=%d, " % grid_id 3519 sql += " status_changed=NOW()," 3520 sql += " passkey='%s' " % passkey 3521 sql += " WHERE job.status='WAITING' " 3522 sql += " AND job.queue_id=%d " % queue_id 3523 sql += " AND job.dataset_id=%d " % dataset_id 3524 try: 3525 cursor.execute(sql) 3526 rowcount = self._conn.affected_rows() 3527 self.commit() 3528 except Exception, e: 3529 self.logger.debug(e) 3530 continue 3531 if rowcount == 1: 3532 self.logger.debug("%d job has been marked as 'QUEUEING' " % rowcount) 3533 3534 sql = " SELECT * FROM run " 3535 sql += " WHERE run.dataset_id = %u " % dataset_id 3536 sql += " AND run.queue_id = %u " % queue_id 3537 cursor.execute(sql) 3538 runinfo = cursor.fetchone() 3539 3540 job = i3Job() 3541 job.SetDatabaseId(item['job_id']) 3542 job.SetDatasetId(dataset_id) 3543 job.SetProcNum(queue_id) 3544 job.SetPrio(priority) 3545 job.passkey = passkey 3546 job.AddArgOption("key",passkey) 3547 if runinfo: 3548 self.logger.info("%(run_id)s, date %(date)s" % runinfo) 3549 job.AddArgOption("run",runinfo["run_id"]) 3550 job.AddArgOption("subrun",runinfo["sub_run"]) 3551 job.AddArgOption("date",str(runinfo['date'])) 3552 3553 if not job_list.has_key(dataset_id): 3554 job_list[dataset_id] = [] 3555 job_list[dataset_id].append(job) 3556 elif rowcount == 0: 3557 self.logger.warn("someone beat me to job %d in dataset %d" % \ 3558 (queue_id,dataset_id)) 3559 else: 3560 raise Exception, "getjob:wrong number of rows affected" 3561 3562 return job_list
3563 3564
3565 - def SetTasks(self,maxjobs,grid_id,jobs_at_once=20,fifo=True,debug=0):
3566 """ 3567 DAG mode 3568 Reserve at most 'maxjobs' from a given dataset. 3569 Get proc ids and set their status to 'QUEUEING' 3570 """ 3571 3572 # Get next waiting jobs in queue 3573 sql = " SELECT job.*, " 3574 sql += " plus.category, plus.subcategory, " 3575 sql += " YEAR(plus.start_datetime) as year, " 3576 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source " 3577 sql += " FROM task " 3578 sql += " JOIN job ON task.job_id = job.job_id " 3579 sql += " JOIN task_def_tray tdt ON task.task_def_tray_id = tdt.task_def_tray_id " 3580 sql += " JOIN grid_statistics gst ON gst.task_def_id = tdt.task_def_id " 3581 sql += " JOIN dif on job.dataset_id = dif.dataset_id " 3582 sql += " JOIN plus on job.dataset_id = plus.dataset_id " 3583 sql += " WHERE gst.grid_id = %d " % grid_id 3584 sql += " AND gst.suspend != 1 " 3585 3586 sql += " AND task.status='WAITING' " 3587 sql += " ORDER BY job.priority DESC," 3588 if fifo: # datasets with smaller numbers should run first 3589 sql += " dataset_id," 3590 sql += " queue_id " 3591 sql += " LIMIT %u " % min(jobs_at_once,maxjobs) 3592 3593 self.logger.debug(sql) 3594 cursor.execute(sql) 3595 result_set = cursor.fetchall() 3596 self.commit() 3597 3598 if not len(result_set) > 0: 3599 self.logger.info("no jobs to queue at this time") 3600 return {} 3601 else: 3602 self.logger.info("reserved %d jobs" % len(result_set))
3603 3604 3605 3606
3607 - def QueueTasks(self,maxjobs,grid_id,jobs_at_once=20,fifo=True,debug=0):
3608 """ 3609 DAG mode 3610 Reserve at most 'maxjobs' from a given dataset. 3611 Get proc ids and set their status to 'QUEUEING' 3612 """ 3613 from iceprod.server.job import i3Job,i3Task 3614 cursor = self.getcursor() 3615 job_list = {} 3616 3617 # Count the number of jobs currently enqueued or running 3618 sql = " SELECT SUM(1) AS total " 3619 sql += " FROM task " 3620 sql += " JOIN task_def_tray tdt ON task.task_def_tray_id = tdt.task_def_tray_id " 3621 sql += " JOIN task_def td ON tdt.task_def_id = td.task_def_id " 3622 sql += " JOIN grid_statistics gst ON gst.dataset_id = td.dataset_id " 3623 sql += " WHERE gst.grid_id = %d " % grid_id 3624 sql += " AND task.status = 'WAITING' " 3625 cursor.execute(sql) 3626 result = cursor.fetchone() 3627 self.commit() 3628 if result['total'] == None: 3629 self.logger.debug('queue total returned None') 3630 total = 0 3631 else: 3632 total = int(result['total']) 3633 3634 #maxjobs = maxjobs - min(maxjobs,total) 3635 maxjobs = min(maxjobs,total) 3636 self.logger.info('%d jobs are currently in the queue' % total) 3637 3638 # Get next waiting jobs in queue 3639 sql = " SELECT tdt.task_def_id, min(tdt.idx) as idx, tdt.task_def_tray_id " 3640 sql += " FROM task_def_tray tdt JOIN task_def td ON tdt.task_def_id = td.task_def_id " 3641 sql += " JOIN grid_statistics gs ON gs.task_def_id = td.task_def_id " 3642 sql += " JOIN dataset d ON d.dataset_id = td.dataset_id " 3643 sql += " WHERE d.status='PROCESSING' AND gs.grid_id = %u " % grid_id 3644 sql += " GROUP BY tdt.task_def_id " 3645 sql += " ORDER BY task_def_tray_id " 3646 self.logger.debug(sql) 3647 cursor.execute(sql) 3648 result_set = cursor.fetchall() 3649 tdts = map(lambda x: "%u" % x['task_def_tray_id'],result_set) 3650 3651 3652 # Get next waiting jobs in queue 3653 sql = " SELECT job.*,task.*, tdt.*, td.*, " 3654 sql += " plus.category, plus.subcategory, " 3655 sql += " YEAR(plus.start_datetime) as year, " 3656 sql += " LOWER(SUBSTRING(dif.source_name,1,3)) as source " 3657 
sql += " FROM task " 3658 sql += " JOIN task_def_tray tdt ON task.task_def_tray_id = tdt.task_def_tray_id " 3659 sql += " JOIN task_def td ON tdt.task_def_id = td.task_def_id " 3660 sql += " JOIN job ON task.job_id = job.job_id " 3661 sql += " JOIN dif on job.dataset_id = dif.dataset_id " 3662 sql += " JOIN plus on job.dataset_id = plus.dataset_id " 3663 sql += " JOIN grid_statistics gst ON gst.dataset_id = td.dataset_id AND gst.task_def_id = td.task_def_id " 3664 sql += " WHERE gst.grid_id = %d " % grid_id 3665 sql += " AND task.status='WAITING' " 3666 sql += " AND job.status IN ('QUEUED','PROCESSING') " 3667 sql += " AND task.task_def_tray_id IN %s " % str(tuple(tdts)) 3668 sql += " GROUP BY tdt.task_def_id " 3669 sql += " ORDER BY job.priority DESC," 3670 if fifo: # datasets with smaller numbers should run first 3671 sql += " job.dataset_id," 3672 sql += " job.queue_id " 3673 sql += " LIMIT %u " % min(jobs_at_once,maxjobs) 3674 3675 self.logger.debug(sql) 3676 cursor.execute(sql) 3677 result_set = cursor.fetchall() 3678 self.commit() 3679 3680 if not len(result_set) > 0: 3681 self.logger.info("no jobs to queue at this time") 3682 return {} 3683 else: 3684 self.logger.info("reserved %d jobs" % len(result_set)) 3685 3686 for item in result_set: 3687 proc = item['queue_id'] 3688 item['subdirectory'] = "%05d-%05d" % ((proc/1000)*1000,((proc+1000)/1000)*1000-1) 3689 # Don't want to keep looping if we have exceeded maxjobs 3690 if len(job_list) >= maxjobs: 3691 return job_list 3692 3693 passkey = item['passkey'] 3694 #passkey = self.mkkey(6,9) 3695 queue_id = item['queue_id'] 3696 dataset_id = item['dataset_id'] 3697 task_id = item['task_id'] 3698 priority = item['priority'] 3699 sql = " UPDATE task,job SET " 3700 sql += " task.last_status = task.status, " 3701 sql += " task.status='QUEUEING', " 3702 sql += " task.host=NULL, " 3703 sql += " task.grid_id=%s, " % grid_id 3704 sql += " task.status_changed=NOW(), " 3705 sql += " task.passkey = '%s' " % passkey 3706 sql += 
" WHERE task.job_id= job.job_id " 3707 sql += " AND task.task_id= %s " % task_id 3708 sql += " AND task.status='WAITING' " 3709 sql += " AND job.status IN ('QUEUED','PROCESSING') " 3710 self.logger.debug(sql) 3711 try: 3712 cursor.execute(sql) 3713 rowcount = self._conn.affected_rows() 3714 self.commit() 3715 except Exception, e: 3716 self.logger.error(e) 3717 continue 3718 if rowcount == 1: 3719 self.logger.debug("%d job has been marked as 'QUEUEING' " % rowcount) 3720 task = i3Task() 3721 task.SetDatabaseId(item['job_id']) 3722 task.idx = item['idx'] 3723 task.iter = item['iter'] 3724 task.SetDatasetId(dataset_id) 3725 task.SetProcNum(queue_id) 3726 task.SetPrio(priority) 3727 task.SetTaskId(task_id) 3728 task.task_name = item['name'] 3729 task.passkey = passkey 3730 task.AddArgOption("key",passkey) 3731 if not job_list.has_key(dataset_id): 3732 job_list[dataset_id] = [] 3733 job_list[dataset_id].append(task) 3734 elif rowcount == 0: 3735 self.logger.warn("someone beat me to task %u" % task_id) 3736 else: 3737 raise Exception, "get task:wrong number of rows affected" 3738 3739 return job_list
3740 3741
3742 - def SuspendGridDataset(self,grid,dataset,suspend=1):
3743 """ 3744 Update grid participation in dataset 3745 dataset. 3746 @param grid: grid or cluster 3747 @param dataset: dataset id 3748 """ 3749 3750 cursor = self.getcursor() 3751 sql = " SELECT g.grid_id FROM grid_statistics gs " 3752 sql += " JOIN grid g on gs.grid_id = g.grid_id " 3753 sql += " WHERE gs.dataset_id = %u " % dataset 3754 if type(grid) is types.IntType: 3755 sql += " AND g.grid_id=%u " % grid 3756 elif grid not in ('any','*','all'): 3757 sql += " AND g.name='%s' " % grid 3758 cursor.execute(sql) 3759 gridmap = cursor.fetchall() 3760 3761 if not gridmap: 3762 sql = " INSERT INTO grid_statistics (grid_id,dataset_id) " 3763 if type(grid) is types.IntType: 3764 sql += " VALUES (%s,%s) " % (grid,dataset) 3765 else: 3766 sql += " SELECT grid_id,%s FROM grid where name = '%s' " % (dataset,grid) 3767 cursor.execute(sql) 3768 3769 sql = " UPDATE grid_statistics gs, grid g SET gs.suspend = %u " % suspend 3770 sql += " WHERE gs.dataset_id = %u " % dataset 3771 if type(grid) is types.IntType: 3772 sql += " AND g.grid_id=%u " % grid 3773 elif grid not in ('any','*','all'): 3774 sql += " AND g.name='%s' " % grid 3775 cursor.execute(sql) 3776 3777 self.commit()
3778 3779
3780 - def InitializeGridStats(self,grids,dataset_id):
3781 """ 3782 Insert grid_statistics entries for grids which should run this 3783 dataset. 3784 @param grids: list of grids or clusters 3785 @param dataset_id: dataset id 3786 """ 3787 # get grid ids if we were given names 3788 delim = "" 3789 sql = " SELECT grid_id FROM grid WHERE " 3790 for grid in grids: 3791 if type(grid) is types.IntType: 3792 sql += " %s grid_id=%u " % (delim,grid) 3793 elif grid not in ('any','*','all'): 3794 sql += " %s name='%s' " % (delim,grid) 3795 delim = "OR" 3796 logger.debug(sql) 3797 cursor = self.getcursor() 3798 cursor.execute(sql) 3799 result_set = cursor.fetchall() 3800 self.commit() 3801 3802 if len(result_set) == 0: 3803 self.logger.error("could not match grid name '%s'" % ":".join(grids)) 3804 return 3805 3806 # find if any of these grids are already added 3807 sql = " SELECT grid_id,dataset_id,suspend FROM grid_statistics WHERE grid_id in (" 3808 delim = "" 3809 for item in result_set: 3810 sql += " %s %d " % (delim, item['grid_id']) 3811 delim = "," 3812 sql += ") and dataset_id = %d" % dataset_id 3813 logger.debug(sql) 3814 cursor = self.getcursor() 3815 cursor.execute(sql) 3816 result_set2 = cursor.fetchall() 3817 3818 insert_grids = [] 3819 unsuspend_grids = [] 3820 for item in result_set: 3821 flag = None 3822 for item2 in result_set2: 3823 if item['grid_id'] == item2['grid_id']: 3824 if item2['suspend'] == 1 or item2['suspend'] == '1': 3825 flag = 'u' 3826 else: 3827 flag = 'f' 3828 break 3829 if flag == 'u': 3830 unsuspend_grids.append(item['grid_id']) 3831 elif flag == None: 3832 insert_grids.append(item['grid_id']) 3833 3834 # add new grid,dataset pairs 3835 if len(insert_grids) > 0: 3836 delim = "" 3837 sql = " INSERT INTO grid_statistics " 3838 sql += " (grid_id,dataset_id) VALUES " 3839 for g in insert_grids: 3840 sql += " %s (%d,%d) " % ( delim, g, dataset_id) 3841 delim = "," 3842 logger.debug(sql) 3843 cursor = self.getcursor() 3844 cursor.execute(sql) 3845 self.commit() 3846 3847 # unsuspend suspended grids 
3848 if len(unsuspend_grids) > 0: 3849 delim = "" 3850 sql = " UPDATE grid_statistics " 3851 sql += " set suspend = 1 WHERE grid_id in ( " 3852 for g in unsuspend_grids: 3853 sql += " %s %d " % ( delim, g) 3854 delim = "," 3855 sql += ") and dataset_id = %d" % dataset_id 3856 logger.debug(sql) 3857 cursor = self.getcursor() 3858 cursor.execute(sql) 3859 self.commit() 3860 3861
3862 - def InitializeGridStatsDAG(self,grids,steering,dataset_id):
3863 """ 3864 Insert grid_statistics entries for grids which should run this 3865 dataset. 3866 @param grids: list of grids or clusters 3867 @param dataset_id: dataset id 3868 """ 3869 cursor = self.getcursor() 3870 3871 # 3872 # See what is already matched (we may want to also remove things that no longer match at some point) 3873 # 3874 sql = " SELECT grid_id,task_def_id FROM grid_statistics WHERE dataset_id = %s" % dataset_id 3875 cursor.execute(sql) 3876 matches = {} 3877 for item in cursor.fetchall(): 3878 matches[ (item['grid_id'],item['task_def_id']) ] = True 3879 3880 # 3881 # See what resources are available on different grids 3882 # 3883 delim = "" 3884 sql = " SELECT grid.name, grid_req.* FROM grid " 3885 sql += " JOIN grid_req ON grid.grid_id = grid_req.grid_id " 3886 sql += " WHERE grid.name in (%s) " % ",".join(map(lambda x: '"'+x+'"', grids)) 3887 grid_ids = filter(lambda x: isinstance(x,int),grids) 3888 if len(grid_ids): 3889 sql += " OR grid.grid_id in (%s) " % ",".join(grid_ids) 3890 self.logger.debug(sql) 3891 cursor.execute(sql) 3892 gridmap = cursor.fetchall() 3893 3894 # 3895 # Get task requirements to match with resources 3896 # 3897 sql = " SELECT task_def.* FROM task_def " 3898 sql += " WHERE dataset_id = %u " % dataset_id 3899 self.logger.debug(sql) 3900 cursor.execute(sql) 3901 result_set = cursor.fetchall() 3902 import string 3903 3904 3905 # 3906 # Create grid,dataset,task pivots for matched grid,requirement pairs 3907 # 3908 inserts = [] 3909 sql = " INSERT IGNORE INTO grid_statistics (grid_id,dataset_id,task_def_id) VALUES (%s,%s,%s) " 3910 for entry in result_set: 3911 tdef = steering.GetTaskDefinition(entry['name']) 3912 for grid in gridmap: 3913 include_grid = True 3914 for req in map(string.strip,tdef.requirements.split("&&")): 3915 include_grid = include_grid and grid['req'].lower() == req.lower() 3916 if include_grid and not matches.has_key( (grid['grid_id'],entry['task_def_id'])) : # don't insert it if already there 3917 
inserts.append((grid['grid_id'],dataset_id,entry['task_def_id'])) 3918 self.logger.info("found match for '%s' reqs on grid %s" % (entry['name'],grid['grid_id'] )) 3919 else: 3920 self.logger.error("no match for '%s' reqs on grid %s" % (entry['name'],grid['grid_id'] )) 3921 cursor.executemany(sql, inserts) 3922 self.commit() 3923 3924
def InitializeJobTable(self, maxjobs, dataset_id, priority=0, stepsize=1000,
                       start_qid=0, status='WAITING'):
    """
    Create job monitoring entries in database.

    One row is inserted into the job table for every queue_id in
    [start_qid, start_qid + maxjobs). Rows are sent in multi-row INSERT
    statements and committed once at the end.

    @param maxjobs: number of job entries to create; nothing is done
                    when it is zero or negative
    @param dataset_id: dataset id stored on every new row
    @param priority: priority stored on every new row
    @param stepsize: maximum number of rows per INSERT statement
                     (values below 1 are treated as 1)
    @param start_qid: first queue_id to create
    @param status: status stored on every new row
    """
    if maxjobs <= 0:
        # A zero step is rejected by range() and an INSERT without any
        # VALUES is not valid SQL, so there is nothing useful to do.
        return

    cursor = self.getcursor()

    header = " INSERT INTO job "
    header += " (queue_id,status,dataset_id,priority,status_changed) VALUES "

    # Never use a batch larger than the number of rows, never below one.
    batch = max(1, min(stepsize, maxjobs))
    qend = start_qid + maxjobs
    for first in range(start_qid, qend, batch):
        rows = [" (%d,'%s',%d,%d,NOW()) " % (qid, status, dataset_id, priority)
                for qid in range(first, min(first + batch, qend))]
        cursor.execute(header + ",".join(rows))
    self.commit()
def GetJob(self, dataset_id=0, queue_id=0):
    """
    Get Job info
    @param queue_id: queue_id
    @param dataset_id: dataset ID
    @return: i3Job object filled from the matching job row, or None when
             no row matches (dataset_id, queue_id)
    """
    cursor = self.getcursor()

    sql = " SELECT * FROM job "
    sql += " WHERE dataset_id = %d " % dataset_id
    sql += " AND queue_id = %d " % queue_id
    cursor.execute(sql)
    row = cursor.fetchone()
    if not row:
        # fetchone() gives None when nothing matched; indexing it below
        # would fail with a TypeError.
        self.logger.warning("GetJob: no job found for %s.%s" % (dataset_id, queue_id))
        return None

    # Function-scope import: only needed once a row has been found.
    from job import i3Job

    job = i3Job()
    job.SetDatasetId(row['dataset_id'])
    job.SetDatabaseId(row['job_id'])
    job.SetProcNum(row['queue_id'])
    job.SetPrio(row['priority'])
    job.SetJobId(row['grid_queue_id'])
    job.AddArgOption("key", row['passkey'])

    # Log, output and error files share the submit directory and prefix.
    submitdir = row['submitdir']
    job.SetLogFile("%s/%s.log" % (submitdir, job.Prefix()))
    job.SetOutputFile("%s/%s.out" % (submitdir, job.Prefix()))
    job.SetErrorFile("%s/%s.err" % (submitdir, job.Prefix()))
    return job
3972 3973
def jobstart(self, hostname, grid_id, dataset_id=0, queue_id=0, key=None):
    """
    Change the status of a job to indicate it is currently running
    @param hostname: host where job was queued from
    @param grid_id: ID of iceprod queue
    @param dataset_id: Optional dataset ID
    @param queue_id: Optional job ID (within dataset)
    @param key: temporary passkey to avoid job spoofs
    @return: dataset_id,nproc,procnum; -1,0,0 when the dataset does not
             exist or no job row matches dataset_id, queue_id and key
    """
    cursor = self.getcursor()

    sql = " SELECT jobs_submitted "
    sql += " FROM dataset "
    sql += " WHERE dataset_id = %d " % dataset_id
    cursor.execute(sql)
    item = cursor.fetchone()
    if not item:
        # Unknown dataset: report it with the same failure tuple used for
        # an unmatched job rather than failing on item['jobs_submitted'].
        self.logger.error("jobstart: dataset %s not found" % dataset_id)
        return -1, 0, 0
    jobs_submitted = item['jobs_submitted']

    # Set job status to 'PROCESSING'
    sql = " UPDATE job SET "
    sql += " job.prev_state = job.status, "
    sql += " job.status='PROCESSING', "
    sql += " job.grid_id=%d, " % grid_id
    sql += " job.host='%s', " % hostname
    sql += " job.tray=0, "
    sql += " job.iter=0, "
    sql += " status_changed=NOW(), "
    sql += " keepalive=NOW() "
    sql += " WHERE job.status IN ('QUEUEING','QUEUED','PROCESSING','EVICTED') "
    sql += " AND job.queue_id=%d " % queue_id
    sql += " AND job.dataset_id=%d " % dataset_id
    sql += " AND job.passkey='%s' " % key
    self.execute(cursor, sql)
    self.commit()

    # Read the row back; the passkey must match for it to be found.
    sql = " SELECT * FROM job "
    sql += " WHERE dataset_id = %d " % dataset_id
    sql += " AND job.queue_id=%d " % queue_id
    sql += " AND job.passkey='%s' " % key
    cursor.execute(sql)
    job = cursor.fetchone()

    if job:
        self.logger.debug("updated job %(dataset_id)s.%(queue_id)s with passkey %(passkey)s" % job)
        return job['dataset_id'], jobs_submitted, job['queue_id']
    return -1, 0, 0
4023
def jobreset(self, dataset_id, job_id, reason=None, passkey=None):
    """
    Set the status of a job to RESET
    @param dataset_id: dataset index
    @param job_id: process number within dataset
    @param reason: optional text stored (defanged) as errormessage
    @param passkey: optional passkey the job row has to match
    @return: grid_queue_id of the job, or None when no job row matches
    """
    cursor = self.getcursor()
    sql = " SELECT grid_queue_id FROM job "
    sql += " WHERE dataset_id=%d " % dataset_id
    sql += " AND queue_id=%d " % job_id
    if passkey:
        sql += " AND passkey='%s' " % passkey
    cursor.execute(sql)
    qid = cursor.fetchone()

    sql = " UPDATE job SET "
    sql += " tray=0, "
    sql += " iter=0, "
    sql += " prev_state = status, "
    if reason:
        sql += " errormessage = '%s', " % self.defang(reason)
    sql += " status='RESET', "
    sql += " status_changed=NOW() "
    sql += " WHERE dataset_id=%d " % dataset_id
    sql += " AND queue_id=%d " % job_id
    if passkey:
        sql += " AND passkey='%s' " % passkey
    self.execute(cursor, sql)

    if not qid:
        # No row matched the SELECT (unknown job or wrong passkey), so
        # there is no grid queue id to hand back.
        return None
    return qid['grid_queue_id']
4054
def jobcopying(self, dataset_id, job_id, passkey=None):
    """
    Set the status of a job to COPYING
    @param dataset_id: dataset index
    @param job_id: process number within dataset
    @param passkey: optional passkey the job row has to match
    @return: always 1
    """
    cursor = self.getcursor()
    fragments = [
        " UPDATE job SET ",
        " prev_state = status, ",
        " status='COPYING', ",
        " status_changed=NOW() ",
        " WHERE dataset_id=%d " % dataset_id,
        " AND queue_id=%d " % job_id,
    ]
    if passkey:
        # Restrict the update to the row carrying this passkey.
        fragments.append(" AND passkey='%s' " % passkey)
    self.execute(cursor, "".join(fragments))
    return 1
4072
def jobfinalize(self, dataset_id, job_id, job, status='OK', clear_errors=True):
    """
    Store the final status of a job
    @param dataset_id: dataset index
    @param job_id: process number within dataset
    @param job: not read; the job row is loaded from the database
    @param status: status written to the job row
    @param clear_errors: when true, errormessage is set to NULL
    @return: tuple (0, submitdir of the job row)
    """
    cursor = self.getcursor()

    # The SELECT and the UPDATE address the same row.
    target = " WHERE dataset_id=%d " % dataset_id
    target += " AND queue_id=%d " % job_id

    cursor.execute(" SELECT * FROM job " + target)
    row = cursor.fetchone()

    assignments = [" prev_state = status, ", " status='%s', " % status]
    if clear_errors:
        assignments.append(" errormessage = NULL, ")
    assignments.append(" status_changed=NOW() ")

    update = " UPDATE job SET " + "".join(assignments) + target
    update += " AND passkey='%s' " % row['passkey']
    self.execute(cursor, update)

    if status != 'OK':
        return 0, row['submitdir']

    # Update grid statistics
    grid_sql = "".join((
        " UPDATE grid_statistics SET ",
        " grid_statistics.ok = grid_statistics.ok+1 ",
        " WHERE grid_statistics.dataset_id= %u " % row['dataset_id'],
        " AND grid_statistics.grid_id= %u " % row['grid_id'],
    ))
    self.logger.debug(grid_sql)
    try:
        self.execute(cursor, grid_sql)
    except Exception as e:
        # A failed statistics update is logged, not raised.
        self.logger.error(e)
    self.logger.debug("wrote stats for job %d,%d " % (dataset_id, job_id))

    return 0, row['submitdir']
4112
def get_stats(self, dataset_id):
    """
    Get collected dataset statistics
    @param dataset_id: dataset index
    @return: dictionary mapping statistic name to value
    """
    cursor = self.getcursor()
    query = "".join((
        " SELECT name,value FROM dataset_statistics ",
        " WHERE dataset_id='%s' " % dataset_id,
    ))
    cursor.execute(query)
    # A later row with the same name replaces an earlier one.
    collected = dict((row['name'], row['value']) for row in cursor.fetchall())
    self.commit()
    return collected
4127
def jobfinish(self, dataset_id, job_id, stats, key=None, mode=0):
    """
    Update monitoring for job and write statistics
    @param dataset_id: dataset index
    @param job_id: process number within dataset
    @param stats: dictionary of stat entries
    @param key: passkey the job row has to match for the status update
    @param mode: 1 sets the status to COPIED, anything else to READYTOCOPY
    @return: (rowcount+1)%2 of the job status update
    """
    # stats key -> job table column
    job_columns = (
        ('mem_heap', 'mem_heap'),
        ('mem_heap_peak', 'mem_heap_peak'),
        ('user_time', 'time_user'),
        ('sys_time', 'time_sys'),
        ('real_time', 'time_real'),
        ('Triggered Events', 'nevents'),
        ('Generated Events', 'gevents'),
    )

    def timing(name):
        # A missing or non-numeric timing contributes nothing.
        try:
            return float(stats[name])
        except (KeyError, TypeError, ValueError):
            return 0.0

    # The job row gets a fresh passkey together with its new status.
    passkey = self.mkkey(6, 9)
    cursor = self.getcursor()
    sql = " UPDATE job SET "
    sql += " prev_state = status, "
    if mode == 1:
        sql += " status = 'COPIED', "
    else:
        sql += " status = 'READYTOCOPY', "
    for name, column in job_columns:
        if name in stats:
            sql += " %s = %s, " % (column, stats[name])
    sql += " job.passkey='%s', " % passkey
    sql += " status_changed=NOW() "
    sql += " WHERE dataset_id=%d " % dataset_id
    sql += " AND queue_id=%d " % job_id
    sql += " AND job.passkey='%s' " % key
    self.logger.debug(sql)
    rowcount = self.execute(cursor, sql)

    sql = " SELECT grid_id,host FROM job "
    sql += " WHERE queue_id=%d " % job_id
    sql += " AND dataset_id=%d " % dataset_id
    self.logger.debug(sql)
    cursor.execute(sql)
    gridinfo = cursor.fetchone()
    self.commit()
    if not gridinfo:  # don't continue
        return (rowcount + 1) % 2

    if len(stats):
        if gridinfo["host"]:
            # update node statistics
            self.update_node_statistics(gridinfo, stats)

        # Only values that convert to float are stored per job.
        rows = []
        for name, value in stats.items():
            try:
                rows.append(" (%d,%d,'%s',%f) " % (job_id, dataset_id, name, float(value)))
            except (TypeError, ValueError):
                continue
        if rows:
            # REPLACE without any VALUES would be a syntax error.
            sql = " REPLACE INTO job_statistics "
            sql += " (queue_id,dataset_id,name,value) VALUES "
            cursor.execute(sql + ",".join(rows))

        sql = " UPDATE grid_statistics SET "
        sql += " time_real = time_real + %g, " % timing('real_time')
        sql += " time_sys = time_sys + %g, " % timing('sys_time')
        sql += " time_user = time_user + %g " % timing('user_time')
        sql += " WHERE grid_statistics.dataset_id= %u " % dataset_id
        sql += " AND grid_statistics.grid_id= %u " % gridinfo['grid_id']
        cursor.execute(sql)

        self.commit()

    return (rowcount + 1) % 2
4205 4206
def update_node_statistics(self, gridinfo, stats, retval=0):
    """
    Create the node_statistics entry for a host if needed and add the
    reported job statistics to it.
    @param gridinfo: row with at least a 'host' entry; a 'hostname' entry
                     is written into it
    @param stats: dictionary of stat entries
    @param retval: 0 counts a completed job, 1 an eviction, 2 a failure;
                   any other value changes no counter
    """
    cursor = self.getcursor()

    # The node name is the reported hostname when there is one, else the
    # host recorded on the row.
    node = gridinfo['host']
    if 'hostname' in stats:
        node = stats['hostname']
    gridinfo['hostname'] = node

    # Create node entry if it does not exist
    # NOTE(review): the insert keys host_id on gridinfo['host'] with a fixed
    # grid_id of 1, while the update below matches host_id against the
    # hostname - confirm that this is intended.
    insert = " INSERT IGNORE INTO node_statistics "
    insert += " (name,grid_id,host_id) VALUES ('%s',1,'%s') " % (node, gridinfo['host'])
    self.logger.debug(insert)
    cursor.execute(insert)
    self.commit()

    counters = (
        (0, " completed = completed+1, "),
        (1, " evictions = evictions+1, "),
        (2, " failures = failures+1, "),
    )
    # stats key -> accumulated column
    accumulated = (
        ('mem_heap', 'mem_heap'),
        ('mem_heap_peak', 'mem_heap_peak'),
        ('user_time', 'time_user'),
        ('sys_time', 'time_sys'),
        ('real_time', 'time_real'),
    )

    update = " UPDATE node_statistics SET "
    for code, fragment in counters:
        if retval == code:
            update += fragment
            break
    if 'platform' in stats:
        update += " node_statistics.platform='%s', " % stats['platform']
    for name, column in accumulated:
        if name in stats:
            update += " %s = %s+%s, " % (column, column, stats[name])
    update += " name = '%s' " % gridinfo['hostname']

    update += " WHERE host_id='%s' " % gridinfo['hostname']
    self.logger.debug(update)
    cursor.execute(update)
    self.commit()
4248 4249
def jobsubmitted(self, dataset_id, job_id, submitdir, grid_queue_id=None):
    """
    Set the submission path of job so that it can be post processed
    on termination.
    @param dataset_id: dataset index
    @param job_id: process number within dataset
    @param submitdir: submission directory stored on the job row
    @param grid_queue_id: id of the job in the grid queue; left unchanged
                          in the database when None or -1
    @return: always 1
    """
    cursor = self.getcursor()
    sql = " UPDATE job SET "
    # None is the default and -1 the explicit "no id" marker; writing
    # either would store a meaningless string in grid_queue_id.
    if grid_queue_id is not None and not grid_queue_id == -1:
        sql += " grid_queue_id='%s', " % grid_queue_id
    sql += " submitdir=\"%s\" " % submitdir
    sql += " WHERE queue_id=%d " % job_id
    sql += " AND dataset_id=%d " % dataset_id
    self.logger.debug(sql)
    self.execute(cursor, sql)

    # Only a job that is still QUEUEING moves on to QUEUED.
    cursor = self.getcursor()
    sql = " UPDATE job SET "
    sql += " prev_state = status, "
    sql += " status='QUEUED' "
    sql += " WHERE queue_id=%d " % job_id
    sql += " AND dataset_id=%d " % dataset_id
    sql += " AND status='QUEUEING' "
    self.logger.debug(sql)
    self.execute(cursor, sql)
    return 1
4275
def jobping(self, dataset_id, job_id, host, key=None, tray=0, iter=0):
    """
    Refresh the keepalive time of a job that is PROCESSING
    @param dataset_id: dataset index
    @param job_id: process number within dataset
    @param host: not used here
    @param key: passkey the job row has to match
    @param tray: tray value stored on the job row
    @param iter: iter value stored on the job row
    @return: True when the job was found in PROCESSING state, else False
    """
    cursor = self.getcursor()

    # Shared by the status lookup and the update.
    row_filter = " WHERE queue_id=%d " % job_id
    row_filter += " AND dataset_id=%d " % dataset_id
    key_filter = " AND passkey='%s' " % key

    lookup = " SELECT status from job " + row_filter + key_filter
    self.logger.debug(lookup)
    cursor.execute(lookup)
    row = cursor.fetchone()
    if not row or row['status'] != 'PROCESSING':
        return False

    update = "".join((
        " UPDATE job SET ",
        " tray=%d, " % tray,
        " iter=%d, " % iter,
        " keepalive=NOW() ",
        row_filter,
        " AND status='PROCESSING' ",
        key_filter,
    ))
    self.logger.debug(update)
    self.execute(cursor, update)
    return True
4302
def jobabort(self, job_id, dataset_id, error, errormessage='', key=None, stats={}):
    """
    Record that a job was evicted or failed and update node and grid
    statistics accordingly.
    @param job_id: process number within dataset
    @param dataset_id: dataset index
    @param error: 1 when the job was evicted, 2 when it failed with an
                  error (status becomes SUSPENDED for datasets in debug
                  mode, ERROR otherwise)
    @param errormessage: text stored (defanged) as errormessage
    @param key: optional passkey the job row has to match
    @param stats: dictionary of stat entries reported by the job (only
                  read here)
    """
    cursor = self.getcursor()

    sql = " SELECT debug FROM dataset WHERE dataset_id = %d " % dataset_id
    self.logger.debug(sql)
    cursor.execute(sql)
    dataset = cursor.fetchone()
    if not dataset:
        self.logger.error("jobabort: dataset %s not found" % dataset_id)
        return
    debug = dataset['debug']

    sql = " SELECT * FROM job "
    sql += " WHERE dataset_id = %u AND queue_id = %u " % (dataset_id, job_id)
    cursor.execute(sql)
    job = cursor.fetchone()
    if not job:
        self.logger.error("jobabort: job %s.%s not found" % (dataset_id, job_id))
        return

    sql = " UPDATE job SET "
    sql += " prev_state = status, "
    sql += " errormessage=CONCAT(NOW(),QUOTE('%s')), " % self.defang(errormessage)
    sql += " status_changed=NOW(), "

    if error == 1:  # job was evicted
        sql += " evictions=evictions+1, "
        sql += " status='EVICTED', "
    if error == 2:  # job failed with error
        sql += " failures=failures+1, "
        if debug:
            sql += " status='SUSPENDED', "
        else:
            sql += " status='ERROR', "
    sql += " nevents=0 "
    sql += " WHERE queue_id=%d " % job['queue_id']
    sql += " AND dataset_id=%d " % job['dataset_id']
    if key:
        sql += " AND passkey='%s' " % key
    self.logger.debug(sql)
    self.execute(cursor, sql)
    self.commit()

    if stats:
        # update node statistics
        sql = " SELECT grid_id,host FROM job "
        sql += " WHERE queue_id=%d " % job['queue_id']
        sql += " AND dataset_id=%d " % job['dataset_id']
        self.logger.debug(sql)
        cursor.execute(sql)
        gridinfo = cursor.fetchone()
        if gridinfo and gridinfo["host"]:
            self.update_node_statistics(gridinfo, stats, error)

    # Update grid statistics
    sql = " UPDATE grid_statistics SET "
    if stats:
        # Add whichever timings were reported as numbers.
        for column, name in (('time_real', 'real_time'),
                             ('time_sys', 'sys_time'),
                             ('time_user', 'user_time')):
            try:
                sql += " %s = %s + %g, " % (column, column, float(stats[name]))
            except (KeyError, TypeError, ValueError):
                pass

    if error == 1:  # job was evicted
        sql += " grid_statistics.evictions = grid_statistics.evictions+1 "
    else:  # job failed with error
        sql += " grid_statistics.failures = grid_statistics.failures+1 "
    sql += " WHERE grid_statistics.dataset_id= %u " % job['dataset_id']
    sql += " AND grid_statistics.grid_id= %u " % job['grid_id']
    self.logger.debug(sql)
    try:
        self.execute(cursor, sql)
    except Exception as e:
        # A failed statistics update is logged, not raised.
        self.logger.error(e)

    self.commit()
4383 4384
def jobclean(self, dataset_id, archive=True):
    """
    Remove the jobs of a dataset from the queue.

    The per-job statistics are first summed into dataset_statistics,
    the job rows are optionally copied to archived_job, and then the job
    and job_statistics rows of the dataset are deleted.
    @param dataset_id: dataset index
    @param archive: when true, copy the job rows to archived_job first
    """
    cursor = self.getcursor()

    totals_sql = "".join((
        " SELECT name, SUM(value) as total ",
        " FROM job_statistics ",
        " WHERE dataset_id=%d " % dataset_id,
        " GROUP BY name ",
    ))
    cursor.execute(totals_sql)
    totals = cursor.fetchall()
    self.commit()

    if not totals:
        self.logger.warning("jobclean: no job statistics found for dataset %d" % dataset_id)
    else:
        # create new entries if they do not exist
        values = [" ('%s',%f,%d) " % (row['name'], row['total'], dataset_id)
                  for row in totals]
        replace = " REPLACE INTO dataset_statistics "
        replace += " (name,value,dataset_id) VALUES "
        cursor.execute(replace + ",".join(values))
        self.commit()

    if archive:
        # archive job entries
        copy_jobs = " INSERT INTO archived_job "
        copy_jobs += " SELECT job.* FROM job WHERE dataset_id = %u " % dataset_id
        self.logger.debug(copy_jobs)
        cursor.execute(copy_jobs)
        self.commit()

        # The job_statistics archive statement is only logged, not run.
        copy_stats = " INSERT INTO archived_job_statistics "
        copy_stats += " SELECT job_statistics.* FROM job_statistics WHERE dataset_id = %u " % dataset_id
        self.logger.debug(copy_stats)

    # delete job entries
    drop_jobs = " DELETE FROM job "
    drop_jobs += " WHERE dataset_id=%d " % dataset_id
    self.logger.debug(drop_jobs)
    cursor.execute(drop_jobs)
    self.commit()

    # delete job_statistics 10000 at a time so we don't lock the db for too long
    drop_stats = " DELETE FROM job_statistics "
    drop_stats += " WHERE dataset_id=%d " % dataset_id
    drop_stats += " limit 10000 "
    try:
        cursor.execute(drop_stats)
        while cursor.rowcount >= 1:
            self.commit()
            time.sleep(10)
            cursor.execute(drop_stats)
    except Exception as e:
        self.logger.error('%s: could not delete job_statistics entry.' % str(e))
    self.commit()
4447 4448
def jobsuspend(self, job_id, dataset_id, suspend=True):
    """
    Suspend or reset the jobs of a dataset that are neither OK nor in a
    status starting with 'IDL', giving each affected row a new passkey.
    @param job_id: process number within dataset; a negative value
                   applies the change to every job of the dataset
    @param dataset_id: dataset index
    @param suspend: true sets status SUSPENDED; false sets status RESET
                    and clears the failure counter
    """
    status = 'RESET'
    if suspend:
        status = 'SUSPENDED'

    passkey = self.mkkey(6, 9)
    cursor = self.getcursor()

    parts = [" UPDATE job SET ", " prev_state = status, "]
    if not suspend:
        parts.append(" failures=0, ")
    parts.append(" status='%s', " % status)
    parts.append(" passkey='%s', " % passkey)
    parts.append(" status_changed=NOW() ")
    parts.append(" WHERE dataset_id=%d " % dataset_id)
    parts.append(" AND status != 'OK' ")
    parts.append(" AND status NOT LIKE 'IDL%' ")
    if job_id >= 0:
        parts.append(" AND queue_id=%d " % job_id)
    self.execute(cursor, "".join(parts))
4474
def jobsetstatus(self, dataset_id, job_id=-1, status="RESET", reason=None, passkey=None):
    """
    Set the status of one job, or of every job of a dataset
    @param dataset_id: dataset index
    @param job_id: process number within dataset; -1 applies the change
                   to every job of the dataset
    @param status: status written to the job row(s)
    @param reason: optional text stored (defanged) as errormessage
    @param passkey: optional passkey the job row(s) have to match
    """
    cursor = self.getcursor()
    sql = " UPDATE job SET "
    sql += " tray=0, "
    sql += " iter=0, "
    sql += " prev_state = status, "
    if reason:
        # Quotes in the message would end the SQL string literal early.
        sql += " errormessage = '%s', " % self.defang(reason)
    sql += " status='%s', " % status
    sql += " status_changed=NOW() "
    sql += " WHERE dataset_id=%d " % dataset_id
    if job_id != -1:
        sql += " AND queue_id=%d " % job_id
    if passkey:
        sql += " AND passkey='%s' " % passkey
    self.execute(cursor, sql)
4491
def ToggleDatasetDebug(self, dataset):
    """
    Invert the debug flag of a dataset and commit the change
    @param dataset: dataset id
    """
    cursor = self.getcursor()
    statement = "".join((
        " UPDATE dataset ",
        " SET debug = !debug ",
        " WHERE dataset_id=%d " % dataset,
    ))
    cursor.execute(statement)
    self.commit()
4502
def getDatasetStatus(self, dataset):
    """
    Get status of dataset
    @param dataset: dataset id
    @return: all rows (dataset_id, status) returned for that id
    """
    cursor = self.getcursor()
    query = " SELECT dataset_id,status from dataset " + " WHERE dataset_id=%d " % dataset
    cursor.execute(query)
    rows = cursor.fetchall()
    return rows
4512
def setDatasetStatus(self, dataset, status):
    """
    Update status of dataset and commit the change
    @param dataset: dataset id
    @param status: status written to the dataset row
    """
    cursor = self.getcursor()
    statement = "".join((
        " UPDATE dataset ",
        " SET status='%s' " % status,
        " WHERE dataset_id=%d " % dataset,
    ))
    cursor.execute(statement)
    self.commit()
4523
def set_metadata_subcat(self, dataset_id, sub_cat):
    """
    Change Plus:subcategory in DIFPlus metadata
    @param dataset_id: dataset index
    @param sub_cat: value written to plus.subcategory
    """
    cursor = self.getcursor()
    statement = "".join((
        " UPDATE plus ",
        " SET subcategory='%s' " % sub_cat,
        " WHERE dataset_id=%d " % dataset_id,
    ))
    self.logger.debug(statement)
    # No commit is issued here.
    cursor.execute(statement)
def validate(self, dataset_id, status='TRUE'):
    """
    Mark dataset as visible and valid.
    @param dataset_id: dataset index
    @param status: value written to dataset.verified
    """
    cursor = self.getcursor()
    statement = " UPDATE dataset SET verified = '%s' " % status
    statement = statement + " WHERE dataset_id = %d " % dataset_id
    self.logger.debug(statement)
    cursor.execute(statement)
    self.commit()
4545
def multipart_job_start(self, dataset_id, queue_id, key=''):
    """
    Change the status of a job to indicate it is currently running
    @param dataset_id: Dataset ID
    @param queue_id: Queue ID (within dataset)
    @param key: temporary passkey to avoid job spoofs
    @return: dataset_id,nproc,procnum; (TASK_DATASET_ERROR_ID,0,0) when
             the dataset does not exist or the job is not found in
             PROCESSING state with this passkey
    """
    cursor = self.getcursor()

    sql = " SELECT jobs_submitted "
    sql += " FROM dataset "
    sql += " WHERE dataset_id = %s "
    cursor.execute(sql, (dataset_id,))
    item = cursor.fetchone()
    if not item:
        # Unknown dataset: use the regular error tuple instead of
        # failing on item['jobs_submitted'].
        self.logger.error("multipart_job_start: dataset %s not found" % dataset_id)
        return (TASK_DATASET_ERROR_ID, 0, 0)

    jobs_submitted = item['jobs_submitted']

    self.logger.debug("Job %d.%d starting with key %s" % (dataset_id, queue_id, key))

    # update the main job
    sql = " UPDATE job SET "
    sql += " job.prev_state = job.status, "
    sql += " job.status='PROCESSING', "
    sql += " status_changed=NOW(), "
    sql += " keepalive=NOW() "
    sql += " WHERE "
    sql += " (job.status='QUEUED' OR job.status='PROCESSING' OR job.status='EVICTED')"
    sql += " AND job.dataset_id=%s "
    sql += " AND job.queue_id=%s "
    sql += " AND job.passkey=%s "
    cursor.execute(sql, (dataset_id, queue_id, key))

    # separate this out because the main job may have been updated already
    sql = " SELECT EXISTS("
    sql += " SELECT *"
    sql += " FROM job j"
    sql += " WHERE status = 'PROCESSING'"
    sql += " AND dataset_id = %s"
    sql += " AND queue_id = %s"
    sql += " AND passkey = %s"
    sql += ") AS found"
    cursor.execute(sql, (dataset_id, queue_id, key))
    row = cursor.fetchone()
    if row and int(row['found']) == 1:
        return (dataset_id, jobs_submitted, queue_id)
    return (TASK_DATASET_ERROR_ID, 0, 0)
4595
def multipart_job_finish(self, dataset_id, queue_id, key=''):
    """
    Set a multipart job to OK and fill job_statistics with the averages
    of its task statistics.
    @param dataset_id: Dataset ID
    @param queue_id: Queue ID (within dataset)
    @param key: passkey the job row has to match
    @return: affected row count of the status update when it is zero,
             otherwise the affected row count of the statistics statement
    """
    trays = self.get_iterations(dataset_id)
    last_tray, last_iter = 0, 0
    if trays:
        # The job row records the last tray and one past its iterations.
        final = trays[-1]
        last_tray, last_iter = final['tray_index'], final['iterations'] + 1

    cursor = self.getcursor()
    update = "".join((
        " UPDATE job SET ",
        " prev_state = status, ",
        " status = 'OK', ",
        " tray = %s, ",
        " iter = %s, ",
        " errormessage = NULL, ",
        " status_changed=NOW() ",
        " WHERE dataset_id=%s ",
        " AND queue_id=%s ",
        " AND passkey=%s ",
    ))
    args = (last_tray, last_iter, dataset_id, queue_id, key)
    self.logger.debug(update % args)
    cursor.execute(update, args)
    rows = self._conn.affected_rows()
    if not rows:
        return rows

    # Now update job statistics
    averages = "".join((
        " REPLACE INTO job_statistics ",
        " ( ",
        " SELECT NULL,j.dataset_id, j.queue_id, ",
        " ts.name, AVG(ts.value) AS value ",
        " FROM task_statistics ts ",
        " JOIN task t ON ts.task_id = t.task_id ",
        " JOIN job j ON j.job_id = t.job_id ",
        " WHERE j.dataset_id = %s AND j.queue_id = %s ",
        " GROUP BY name ",
        " ) ",
    ))
    cursor.execute(averages, (dataset_id, queue_id))

    return self._conn.affected_rows()
4630
def get_iterations(self, dataset_id):
    """
    Get the tray index and iteration count of every tray of a dataset
    @param dataset_id: dataset index
    @return: rows with tray_index and iterations
    """
    cursor = self.getcursor()
    query = (" SELECT tray.tray_index, tray.iterations "
             " FROM tray WHERE dataset_id = %s ")
    cursor.execute(query, (dataset_id,))
    rows = cursor.fetchall()
    return rows
4637
def get_task_id(self, task_def_id, job_id, tray, iter):
    """
    Look up the task of a job for a given task definition, tray and
    iteration.
    @param task_def_id: task definition id
    @param job_id: job id the task belongs to
    @param tray: tray index (task_def_tray.idx)
    @param iter: iteration (task_def_tray.iter)
    @return: task_id, or 0 when the task definition tray or the task
             does not exist
    """
    cursor = self.getcursor()

    # get the task definition tray ID
    sql = " SELECT task_def_tray_id"
    sql += " FROM task_def_tray tdt"
    sql += " WHERE task_def_id = %s"
    sql += " AND idx = %s"
    sql += " AND iter = %s"
    cursor.execute(sql, (task_def_id, tray, iter))
    row = cursor.fetchone()
    if not row:
        # Report the tray index that was searched for.
        self.logger.error("could not find task_def_tray_id %s %s" % (task_def_id, tray))
        return 0
    tdt_id = row['task_def_tray_id']

    sql = " SELECT task_id"
    sql += " FROM task t, task_def_tray tdt"
    sql += " WHERE t.task_def_tray_id = tdt.task_def_tray_id"
    sql += " AND tdt.task_def_tray_id = %s"
    sql += " AND t.job_id = %s"
    cursor.execute(sql, (tdt_id, job_id,))
    row = cursor.fetchone()
    if row:
        return row['task_id']
    return 0
4664 4665
def task_start(self, dataset_id, queue_id, taskname, tray, iter, hostname, key=''):
    """
    Mark a task of a job as STARTING, creating the task row if needed.
    @param dataset_id: dataset index
    @param queue_id: process number within dataset
    @param taskname: name of the task definition
    @param tray: tray index (task_def_tray.idx)
    @param iter: iteration (task_def_tray.iter)
    @param hostname: host stored on the task row
    @param key: passkey the job row has to match
    @return: task_id, or TASK_ERROR_ID when the job (with this passkey),
             the task definition or the tray/iteration is not found
    """
    cursor = self.getcursor()

    def lookup(query, args):
        # Run one lookup and return its first row (None when empty).
        cursor.execute(query, args)
        return cursor.fetchone()

    self.logger.info("task %s (tray: %d iter: %d) starting for job %d.%d" \
                     % (taskname, tray, iter, dataset_id, queue_id))

    # get the job ID
    row = lookup(" SELECT job_id"
                 " FROM job j"
                 " WHERE dataset_id = %s"
                 " AND queue_id = %s"
                 " AND passkey = %s",
                 (dataset_id, queue_id, key))
    if not row:
        msg = "task %s tried to start for job %d.%d with wrong passkey" \
              % (taskname, dataset_id, queue_id)
        self.logger.warning(msg)
        return TASK_ERROR_ID
    job_id = row['job_id']

    # get the task definition ID
    row = lookup(" SELECT task_def_id"
                 " FROM task_def td"
                 " WHERE dataset_id = %s"
                 " AND name = %s",
                 (dataset_id, taskname))
    if not row:
        msg = "task %s not found for job %d.%d" % (taskname, dataset_id, queue_id)
        self.logger.error(msg)
        return TASK_ERROR_ID
    task_def_id = row['task_def_id']

    # get the task definition tray ID
    row = lookup(" SELECT task_def_tray_id"
                 " FROM task_def_tray tdt"
                 " WHERE task_def_id = %s"
                 " AND idx = %s"
                 " AND iter = %s",
                 (task_def_id, tray, iter))
    if not row:
        msg = "tray %d, iter %d not found for task %s in job %d.%d" \
              % (tray, iter, taskname, dataset_id, queue_id)
        self.logger.error(msg)
        return TASK_ERROR_ID
    tdt_id = row['task_def_tray_id']

    row = lookup(" SELECT task_id"
                 " FROM task t, task_def_tray tdt"
                 " WHERE t.task_def_tray_id = tdt.task_def_tray_id"
                 " AND tdt.task_def_tray_id = %s"
                 " AND t.job_id = %s",
                 (tdt_id, job_id))
    if not row:
        # task doesn't exist, insert it
        cursor.execute("INSERT INTO task (task_def_tray_id, job_id, "
                       " host, status, status_changed, start)"
                       " VALUES (%s, %s, %s, 'STARTING', NOW(), NOW())",
                       (tdt_id, job_id, hostname))
        return self._conn.insert_id()

    # task exists, update it
    task_id = row['task_id']
    cursor.execute(" UPDATE task"
                   " SET last_status = status,"
                   " status = 'STARTING',"
                   " status_changed = NOW(),"
                   " host = %s,"
                   " start = NOW(),"
                   " finish = NULL"
                   " WHERE task_id = %s",
                   (hostname, task_id))

    # delete old statistics too
    cursor.execute(" DELETE FROM task_statistics WHERE task_id = %s", (task_id,))
    return task_id
4746
def task_init(self, dataset_id, job_id, tray=None, iter=None):
    """
    Initialize task entry and set status to IDLE
    @param dataset_id: dataset index
    @param job_id: job id the task rows are created for
    @param tray: optional tray index to restrict the task definitions to
    @param iter: optional iteration to restrict the task definitions to
    @return: task id of the first matching task definition tray (as given
             by get_task_id), or 0 when no task definition tray matches
    """
    cursor = self.getcursor()

    # get the task definition tray ID
    query = "".join((
        " SELECT tdt.task_def_tray_id, td.task_def_id, tdt.idx, tdt.iter ",
        " FROM task_def_tray tdt",
        " JOIN task_def td ",
        " ON td.task_def_id = tdt.task_def_id ",
        " WHERE dataset_id = %s" % dataset_id,
    ))
    # NOTE(review): the truth tests below also skip the filter for a value
    # of 0 - confirm whether tray/iter 0 should restrict the selection.
    if tray:
        query += " AND idx = %s" % tray
    if iter:
        query += " AND iter = %s" % iter
    cursor.execute(query)
    rows = cursor.fetchall()
    if not rows:
        return 0

    # One IDLE task per matching task definition tray.
    inserts = [(row['task_def_tray_id'], job_id) for row in rows]
    insert_sql = ("INSERT IGNORE INTO task (task_def_tray_id, job_id, status, status_changed)"
                  " VALUES (%s, %s, 'IDLE', NOW())")
    cursor.executemany(insert_sql, inserts)
    self.commit()

    first = rows[0]
    return self.get_task_id(first["task_def_id"], job_id, first['idx'], first['iter'])
4778
def task_status(self, task_id):
    """
    Get status for task
    @param task_id: id of the task row
    @return: status of the task, or 0 when the task does not exist
    """
    cursor = self.getcursor()

    cursor.execute(" SELECT * FROM task WHERE task_id = %s " % task_id)
    task = cursor.fetchone()
    if task:
        return task["status"]
    return 0
4792 4793
def task_update_status(self, task_id, status, key='', cursor=None, grid_id=0):
    """
    Set the status of a task that is not OK yet and commit the change.
    @param task_id: id of the task row
    @param status: status written to the task row
    @param key: passkey the job of the task has to match
    @param cursor: cursor to use; a new one is requested when not given
    @param grid_id: when non-zero, also stored as grid_id of the task
    @return: value returned by cursor.execute for the update
    """
    if not cursor:
        cursor = self.getcursor()

    pieces = [" UPDATE task t,job j",
              " SET t.last_status = t.status,",
              " t.status = %s,"]
    values = [status]
    if grid_id:
        pieces.append(" t.grid_id = %s,")
        values.append(grid_id)
    pieces.extend([" t.status_changed = NOW()",
                   " WHERE t.job_id = j.job_id",
                   " AND t.task_id = %s",
                   " AND j.passkey = %s",
                   " AND t.status != 'OK'"])
    values.extend([task_id, key])

    sql = "".join(pieces)
    args = tuple(values)

    self.logger.debug("task status update starting for task %d" % task_id)
    rowcount = cursor.execute(sql, args)
    self.logger.debug(sql % args)
    self.logger.debug("task status update done for task %d" % task_id)
    self.commit()
    return rowcount
4817 #return self._conn.affected_rows() 4818
def task_is_finished(self, td_id, job_id, tray=None, iter=None):
    """
    Tell whether every task of a task definition is OK for a job.
    @param td_id: task definition id
    @param job_id: job id
    @param tray: optional tray index to restrict the check to
    @param iter: optional iteration to restrict the check to
    @return: True when at least one task definition tray matches and the
             number of OK tasks of the job equals the number of matching
             task definition trays
    """
    cursor = self.getcursor()

    # Optional restriction shared by both counts.
    restriction = ""
    restriction_args = []
    if tray is not None:
        restriction += " AND tdt.idx = %s"
        restriction_args.append(tray)
    if iter is not None:
        restriction += " AND tdt.iter = %s"
        restriction_args.append(iter)

    expected_sql = (" SELECT COUNT(*) AS expected"
                    " FROM task_def_tray tdt"
                    " WHERE tdt.task_def_id = %s") + restriction
    cursor.execute(expected_sql, [td_id] + restriction_args)
    row = cursor.fetchone()
    if not row:
        return False
    expected = int(row['expected'])

    actual_sql = (" SELECT COUNT(*) AS actual"
                  " FROM task t, task_def_tray tdt"
                  " WHERE tdt.task_def_tray_id = t.task_def_tray_id"
                  " AND tdt.task_def_id = %s"
                  " AND t.job_id = %s"
                  " AND t.status = 'OK'") + restriction
    cursor.execute(actual_sql, [td_id, job_id] + restriction_args)
    row = cursor.fetchone()
    if not row:
        return False
    actual = int(row['actual'])

    return expected > 0 and actual == expected
4857
def task_abort(self, task_id, key='', stats={}):
    """
    Set a task to ERROR, drop its statistics and count a failure for the
    node it ran on.
    @param task_id: id of the task row
    @param key: passkey the job of the task has to match
    @param stats: dictionary of stat entries reported by the task (only
                  read here)
    @return: True on success; the falsy result of task_update_status when
             the status could not be updated
    """
    cursor = self.getcursor()
    ret = self.task_update_status(task_id, 'ERROR', key, cursor)
    if not ret:
        self.logger.error("unable to update status for task %d" % task_id)
        return ret
    sql = " DELETE FROM task_statistics WHERE task_id = %s"
    self.logger.debug("aborting task %d" % task_id)
    cursor.execute(sql, (task_id,))

    if stats:
        # update node statistics
        sql = " SELECT grid_id,host FROM task "
        sql += " WHERE task_id=%d " % task_id
        self.logger.debug(sql)
        cursor.execute(sql)
        gridinfo = cursor.fetchone()
        if gridinfo:
            # 2 makes update_node_statistics count a failure.
            self.update_node_statistics(gridinfo, stats, retval=2)

    return True
4879
def task_finish(self, task_id, stats, key=''):
    """
    Set a task to OK, record its finish time and store its statistics.
    @param task_id: id of the task row
    @param stats: dictionary of stat entries; only float values other
                  than nan are stored in task_statistics
    @param key: passkey the job of the task has to match
    @return: True on success; the falsy result of task_update_status when
             the status could not be updated
    """
    cursor = self.getcursor()
    updated = self.task_update_status(task_id, 'OK', key, cursor)
    if not updated:
        self.logger.error("unable to update status for task %d" % task_id)
        return updated

    finish_sql = " UPDATE task SET finish = NOW()" + " WHERE task_id=%s"
    self.logger.debug(finish_sql % task_id)
    cursor.execute(finish_sql, (task_id,))
    if not self._conn.affected_rows():
        # Logged only; the remaining steps still run.
        self.logger.error("unable to set finish time for task %d" % task_id)

    if not stats:
        return True

    self.logger.debug("stats: %s" % stats)
    inserts = [(task_id, name, value)
               for name, value in stats.items()
               if type(value) == float and str(value) != 'nan']
    cursor.executemany(
        " REPLACE INTO task_statistics (task_id,name,value) VALUES (%s,%s,%s)",
        inserts)

    # update node statistics
    node_sql = " SELECT grid_id,host FROM task "
    node_sql += " WHERE task_id=%d " % task_id
    self.logger.debug(node_sql)
    cursor.execute(node_sql)
    gridinfo = cursor.fetchone()
    if gridinfo:
        self.update_node_statistics(gridinfo, stats)

    return True
4915 4916
def get_finished_jobs(self, grid_id, delay=1):
    """
    Fetch the jobs of a grid that are in the 'COPYING' state.

    @param grid_id: numeric id of the grid whose jobs are selected
    @param delay: when non-zero, only jobs whose status_changed timestamp
                  lies more than this many minutes in the past are returned
    @return: all matching rows of the job table
    """
    cursor = self.getcursor()
    sql = " SELECT * FROM job "
    sql += " WHERE grid_id=%d " % grid_id
    sql += " AND status='COPYING' "
    if delay:
        sql += " AND NOW() > TIMESTAMPADD(MINUTE,%d,status_changed) " % delay
    cursor.execute(sql)
    # The rows used to be bound to a misspelled name, so the return
    # statement raised NameError on every call.
    results = cursor.fetchall()
    self.commit()
    return results
4933
def SetFileURL(self, queue_id, dataset_id, location, filename, md5sum, filesize, transfertime, key):
    """
    Add or change the global location of a file.

    The job identified by dataset_id/queue_id must carry the given
    passkey, otherwise nothing is written. The urlpath row is inserted
    with INSERT IGNORE; when that inserts nothing, the row matching
    dataset, queue and file name is updated instead.

    All values are handed to the driver as query parameters, so quotes in
    the key, path or file name can no longer alter the statements.

    @param queue_id: queue id of the job within its dataset
    @param dataset_id: id of the dataset
    @param location: path/URL under which the file is stored
    @param filename: name of the file
    @param md5sum: checksum of the file
    @param filesize: size of the file (stored as a float)
    @param transfertime: time the transfer took (stored as a float)
    @param key: passkey of the job
    @return: 0 when no job matches ids and passkey, otherwise 1
    """
    cursor = self.getcursor()
    dataset_id = int(dataset_id)
    queue_id = int(queue_id)

    sql = " SELECT job_id FROM job "
    sql += " WHERE dataset_id=%s "
    sql += " AND queue_id = %s "
    sql += " AND passkey=%s "
    self.logger.debug(sql)
    cursor.execute(sql, (dataset_id, queue_id, key))
    if not cursor.fetchone():
        return 0

    filesize = float(filesize)
    transfertime = float(transfertime)

    sql = " INSERT IGNORE INTO urlpath "
    sql += " (dataset_id,queue_id,name,path,md5sum,size,transfertime) VALUES "
    sql += " (%s,%s,%s,%s,%s,%s,%s)"
    self.logger.debug(sql)
    cursor.execute(sql, (dataset_id, queue_id, filename, location,
                         md5sum, filesize, transfertime))
    if not cursor.rowcount:
        # The row was already there: refresh it instead.
        sql = " UPDATE urlpath SET "
        sql += " name =%s, "
        sql += " path =%s, "
        sql += " md5sum =%s, "
        sql += " size =%s, "
        sql += " transfertime =%s "
        sql += " WHERE dataset_id =%s "
        sql += " AND queue_id =%s "
        sql += " AND name =%s "
        self.logger.debug(sql)
        cursor.execute(sql, (filename, location, md5sum, filesize,
                             transfertime, dataset_id, queue_id, filename))
    return 1
4968
def GetStorageURL(self, dataset_id, queue_id, passkey, storage_type='INPUT'):
    """
    Fetch the urlpath rows of one job.

    Only rows whose job carries the given passkey and whose type equals
    storage_type are returned. Passkey and type are handed to the driver
    as query parameters instead of being pasted into the statement.

    @param dataset_id: id of the dataset
    @param queue_id: queue id of the job within the dataset
    @param passkey: passkey of the job
    @param storage_type: value of urlpath.type to select
    @return: all matching urlpath rows
    """
    cursor = self.getcursor()
    sql = " SELECT urlpath.* "
    sql += " FROM urlpath "
    sql += " JOIN job ON urlpath.dataset_id = job.dataset_id AND urlpath.queue_id = job.queue_id "
    sql += " WHERE urlpath.dataset_id = %s "
    sql += " AND urlpath.queue_id = %s "
    sql += " AND job.passkey = %s "
    sql += " AND urlpath.type = %s "
    self.logger.debug(sql)
    cursor.execute(sql, (int(dataset_id), int(queue_id), passkey, storage_type))
    return cursor.fetchall()
4984
def clearStorageURL(self, dataset_id):
    """
    Delete every urlpath row that belongs to a dataset.

    @param dataset_id: numeric id of the dataset
    @return: always 1
    """
    cursor = self.getcursor()
    statement = " DELETE FROM urlpath " + " WHERE urlpath.dataset_id = %u " % dataset_id
    self.logger.debug(statement)
    cursor.execute(statement)
    return 1
4995 4996
def getsummary(self, days, groupby=None):
    """
    Sum up job outcomes and run times over the last few days.

    Only jobs with a non-zero, non-NULL grid_id are counted.

    @param days: number of days to get summary from
    @param groupby: how to group statistics; a job column name that is
                    pasted into the statement verbatim
    @return: all rows of the result
    """
    cursor = self.getcursor()
    pieces = [
        " SELECT ",
        " SUM(job.status = 'OK') as ok , ",
        " SUM(job.status = 'ERROR') as error , ",
        " SUM(job.status = 'SUSPENDED') as suspended, ",
    ]
    if groupby:
        pieces.append(" job.%s, " % groupby)
    pieces.extend([
        " SUM(job.time_sys) as sys_t, ",
        " SUM(job.time_user) as usr_t, ",
        " SUM(job.time_real) as real_t ",
        " FROM job ",
        " WHERE DATE_SUB(CURDATE(),INTERVAL %d DAY) " % days,
        " < job.status_changed ",
        " AND job.grid_id != 0 ",
        " AND job.grid_id IS NOT NULL ",
    ])
    if groupby:
        pieces.append(" GROUP BY %s " % groupby)
    cursor.execute("".join(pieces))
    return cursor.fetchall()
5021
def getsummary_stats(self, days, groupby=None):
    """
    Sum up the job_statistics values of jobs that finished 'OK' in the
    last few days, one result row per statistic name.

    @param days: number of days to get summary from
    @param groupby: how to group statistics; a job column name that is
                    pasted into the statement verbatim
    @return: all rows of the result
    """
    cursor = self.getcursor()
    pieces = [
        " SELECT job_statistics.name AS name, ",
        " SUM(job_statistics.value) AS value, ",
    ]
    if groupby:
        pieces.append(" job.%s, " % groupby)
    pieces.extend([
        " SUM(1) AS completed ",
        " FROM job,job_statistics ",
        " WHERE DATE_SUB(CURDATE(),INTERVAL %d DAY) " % days,
        " < job.status_changed ",
        " AND job.status = 'OK' ",
        " AND job.dataset_id = job_statistics.dataset_id ",
        " AND job.queue_id = job_statistics.queue_id ",
        " AND job.grid_id IS NOT NULL ",
        " AND job.grid_id != 0 ",
    ])
    if not groupby:
        pieces.append(" GROUP BY job_statistics.name ")
    else:
        pieces.append(" GROUP BY job_statistics.name,%s " % groupby)
    cursor.execute("".join(pieces))
    return cursor.fetchall()
5047 5048
def getgrid_ids(self):
    """
    Read the grid table.

    @return: dictionary mapping each grid_id to the grid's name
    """
    cursor = self.getcursor()
    cursor.execute(" SELECT name,grid_id FROM grid ")
    return dict((row['grid_id'], row['name']) for row in cursor.fetchall())
5057
def getstatus(self, dataset, job=-1):
    """
    Query the state of one job or of a whole dataset.

    With job >= 0 the row describes that single job, including a
    completion percentage derived from the tray iterations. Otherwise one
    row with the number of jobs per status is produced.

    @param dataset: dataset id
    @param job: optional job id (queue id within the dataset)
    @return: all rows of the query result
    """
    cursor = self.getcursor()
    if job < 0:
        # per-status counters for the whole dataset
        pieces = (
            " SELECT ",
            " SUM(job.status = 'RESET') as reset , ",
            " SUM(job.status = 'WAITING') as waiting , ",
            " SUM(job.status = 'OK') as ok , ",
            " SUM(job.status = 'ERROR') as error , ",
            " SUM(job.status = 'SUSPENDED') as suspended, ",
            " SUM(job.status = 'PROCESSING') as processing, ",
            " SUM(job.status = 'COPIED') as copied, ",
            " SUM(job.status = 'READYTOCOPY') as readytocopy, ",
            " SUM(job.status = 'FAILED') as failed, ",
            " SUM(job.status = 'QUEUED') as queued, ",
            " SUM(job.status = 'QUEUING') as queueing ",
            " FROM job ",
            " WHERE dataset_id = %u " % dataset,
        )
    else:
        # a single job and its progress through the trays
        pieces = (
            " SELECT ",
            " job.dataset_id,job.job_id,status,job.grid_id,job.errormessage, ",
            " ( ",
            " (job.iter + SUM(IF(tray.tray_index < job.tray, tray.iterations, 0))) ",
            " /SUM(tray.iterations) ",
            " ) * 100 AS completion_percent ",
            " FROM job join tray on job.dataset_id = tray.dataset_id",
            " WHERE job.dataset_id = %u " % dataset,
            " AND job.queue_id = %u " % job,
        )
    cursor.execute("".join(pieces))
    return cursor.fetchall()
5092 5093
def printsummary(self, days):
    """
    Build a plain-text production summary.

    The first table holds one line per grid plus a "Total" line with the
    columns returned by getsummary(). The second table holds one line per
    job statistic whose normalised name contains "time" or "event", with
    a "Total" column followed by one column per grid.

    Compared with the previous version this no longer fails when the
    summary is empty, when a sum is NULL, when a grid id is missing from
    the grid table or when no total could be computed for a statistic.

    @param days: number of days to gather information from starting from today
    @return: formatted string with production summary
    """
    def cell(value):
        # SUM() yields NULL (None) when nothing contributes; show it as 0.
        if value is None:
            value = 0.
        return "%-7.2g | " % value

    div = '+' + ('-' * 9 + '+')
    s = 'iceprod summary for %s in the last %d days' % (self.db_, days)
    s += '-' * 10
    s += '\n\n'
    grid_id = self.getgrid_ids()

    def gridlabel(gid):
        # Fall back to the raw id for grids missing from the grid table.
        return grid_id.get(gid, str(gid))

    row = "| %-7s | " % "grid"
    gridsum = self.getsummary(days, 'grid_id')
    # An empty result used to raise IndexError on gridsum[0].
    columns = list(gridsum[0].keys()) if gridsum else []
    for key in columns:
        row += "%-7s | " % key[:7]
        div += ('-' * 9 + '+')
    s += row + '\n'
    s += div + '\n'
    for entry in gridsum:
        row = "| %-7s | " % gridlabel(entry['grid_id'])[:7]
        for key in columns:
            row += cell(entry[key])
        s += row + '\n'
        s += div + '\n'
    for entry in self.getsummary(days):
        row = "| %-7s | " % "Total"
        for key in columns:
            row += cell(entry.get(key))
        s += row + '\n'
        s += div + '\n'

    column_size = 10
    strfmt = "%%-%ds|" % column_size
    s += '\n\n'
    newgridsum = {}
    grids = ["Total"]
    keys = {}
    for entry in self.getsummary_stats(days, 'grid_id'):
        gridname = gridlabel(entry["grid_id"]).strip()[:column_size]
        key = re.sub(r'[^a-z0-9_]', '', entry["name"].lower())
        if key not in keys:
            keys[key] = entry["name"][:column_size]
        if ("time" in key) or ("event" in key):
            stat = newgridsum.setdefault(key, {})
            if gridname not in grids:
                grids.append(gridname)
            try:
                number = float(entry["value"])
                stat[gridname] = "%2.2g" % number
                stat["Total"] = stat.get("Total", 0.0) + number
            except (TypeError, ValueError) as e:
                # Same console output as before, spelled so that it also
                # parses on Python 3.
                sys.stdout.write("%s %s\n" % (e, key))
                stat[gridname] = str(entry["value"])

    rowhdr = "|" + "".join([strfmt % x for x in [" "] + grids]) + "\n"
    div = "+" + ('-' * column_size + '+') * (len(grids) + 1) + "\n"
    s += div + rowhdr
    for key, entry in newgridsum.items():
        total = entry.get("Total")
        # No numeric value at all leaves the total as "N/A" instead of
        # feeding that placeholder to float().
        entry["Total"] = "N/A" if total is None else "%2.2g" % float(total)
        for grid in grids:
            if grid not in entry:
                entry[grid] = "N/A"
        # Format the cells directly; building a %(name)s template broke on
        # names that contain '%' or ')'.
        row = "|" + strfmt % keys[key] + "".join([strfmt % entry[g] for g in grids]) + "\n"
        s += div + row
    s += div
    return s
5174 5175
def add_history(self, user, command):
    """
    Add a history item stamped with the current time and commit it.

    User and command are handed to the driver as query parameters, so
    quotes inside a command can no longer break or alter the statement.

    @param user: name of the user who issued the command
    @param command: text of the command
    """
    cursor = self.getcursor()
    sql = " INSERT INTO history (user,cmd,time)"
    sql += " VALUES (%s,%s,NOW())"
    cursor.execute(sql, (user, command))
    self.commit()
5185 5186 5187
5188 -class MySQLParamDb(IceProdDB,ParamDb):
5189 5190 """ 5191 Class paramdb uploads parsed IceTrayConfig+Metaproject structure 5192 to the parameter database 5193 5194 """ 5195 logger = logging.getLogger('MySQLParamDb') 5196
def __init__(self):
    """
    Set up an unconnected parameter-database handle.

    The four dictionaries start out empty and are filled by download().
    """
    IceProdDB.__init__(self)
    self.metaprojects = {}
    self.projects = {}
    self.modules = {}
    self.parameters = {}
    # An interactive prompt for host, username, password and database used
    # to be assigned here and was overwritten on the very next line; it
    # also called a bare get() that is not defined at module level. Only
    # the assignment that actually took effect is kept.
    self.auth_function = lambda x: x
    return
5212
def new(self):
    """
    Create a copy of this instance.

    The copy carries the same host, user, password, database and port but
    is not connected.

    @return: a new, unconnected MySQLParamDb
    """
    newconn = MySQLParamDb()
    newconn.host_ = self.host_
    newconn.usr_ = self.usr_
    newconn.passwd_ = self.passwd_
    newconn.db_ = self.db_
    # The port used to be left at its default, so a copy of a handle for
    # a server on another port pointed at the wrong one.
    newconn.port_ = getattr(self, 'port_', newconn.port_)
    newconn._connected = False
    return newconn
5224
def load(self, metaproject):
    """
    Store a complete metaproject tree in the database.

    Connects, loads the metaproject, its projects, the
    metaproject/project cross references and the project dependencies
    (in that order) and commits.

    @param metaproject: metaproject object
    @return: always 1
    """
    self.connect()
    steps = (
        self.load_metaproject,
        self.load_projects,
        self.load_mp_pivot,
        self.load_project_dependencies,
    )
    for step in steps:
        step(metaproject)
    self.commit()
    return 1
def download(self):
    """
    Read the whole metaproject tree from the database.

    Every metaproject, project, service/module and parameter found is
    linked to its parent object and registered by id in
    self.metaprojects, self.projects, self.modules and self.parameters.

    @return: self.metaprojects
    """
    def attach_parameters(component):
        # Hang every stored parameter on the component and index it by id.
        for param in self.GetParameters(component.GetId()):
            component.AddParameter(param)
            self.parameters[param.GetId()] = param

    for metaproject in self.GetMetaProjects():
        mp_id = metaproject.GetId()
        self.metaprojects[mp_id] = metaproject

        for project in self.GetProjects(mp_id):
            project_id = project.GetId()
            self.projects[project_id] = project
            metaproject.AddProject(project.GetName(), project)

            for dependency in self.GetProjectDependencies(project_id, mp_id):
                project.AddDependency(dependency)

            for service in self.GetServices(project_id):
                self.modules[service.GetId()] = service
                project.AddService(service)
                attach_parameters(service)

            for module in self.GetModules(project_id):
                self.modules[module.GetId()] = module
                project.AddModule(module)
                attach_parameters(module)

    return self.metaprojects
def adddependencies(self, project, mp, maxrecursion=3):
    """
    Recursively add project and its dependencies to metaproject

    A dependency that carries the project's own name is skipped. The
    descent stops as soon as maxrecursion has dropped below zero.

    @param project: project whose dependencies are looked up
    @param mp: metaproject the dependencies are resolved in
    @param maxrecursion: remaining recursion depth
    """
    if maxrecursion < 0:
        return
    dependencies = self.GetProjectDependencies(project.GetId(), mp.GetId())
    for dep in dependencies:
        if dep.GetName() == project.GetName():
            continue
        project.AddDependency(dep)
        self.adddependencies(dep, mp, maxrecursion - 1)
5276
def SwitchMetaProject(self, iconfig, id, name, version):
    """
    Add selected metaproject to configuration. Import modules and their
    parameters.

    A new metaproject is filled with the projects (and their
    dependencies) that provide the classes of the modules and services
    configured in iconfig, plus any project the previous metaproject of
    the same name had. Modules and services whose class cannot be found
    get a description saying so. Finally metaproject, modules and
    services are written back into iconfig.

    @param iconfig: configuration to be switched; modified in place
    @param id: database id of the metaproject
    @param name: name of the metaproject
    @param version: version of the metaproject
    @return: the new metaproject
    """

    # Create a new metaproject and set version
    newmp = MetaProject()
    newmp.SetId(id)
    newmp.SetName(name)
    newmp.SetVersion(version)

    # Create a temporary configuration
    new_config = IceTrayConfig()
    new_config.AddMetaProject(newmp.GetName(), newmp)

    # Get all configured modules in previous configuration
    for oldmod in iconfig.GetModules():
        for project in self.GetProjectsMM(oldmod, newmp):
            newmp.AddProject(project.GetName(), project)
            self.adddependencies(project, newmp)

    # Get all configured services in previous configuration
    for oldserv in iconfig.GetServices():
        for project in self.GetProjectsMM(oldserv, newmp):
            newmp.AddProject(project.GetName(), project)
            self.adddependencies(project, newmp)

    # Get missing projects and dependencies
    if iconfig.HasMetaProject(newmp.GetName()):
        oldmp = iconfig.GetMetaProject(newmp.GetName())
        for p in self.GetProjects(newmp.GetId()):
            if oldmp.HasProject(p.GetName()) and not newmp.HasProject(p.GetName()):
                self.logger.debug(p.GetName())
                newmp.AddProject(p.GetName(), p)

    # Loop over modules in old configuration and fetch their projects
    for oldmod in iconfig.GetModules():
        if not new_config.HasModule(oldmod.GetName()):
            depend = self.GetProjectsMM(oldmod, newmp)
            if not depend:
                oldmod.SetDescription('class %s not found in metaproject %s %s' % (
                    oldmod.GetClass(),
                    newmp.GetName(),
                    newmp.GetVersionTxt()))
            else:
                oldmod.AddProject(depend[0].GetName(), depend[0])
            new_config.AddModule(oldmod)

    for oldserv in iconfig.GetServices():
        if not new_config.HasService(oldserv.GetName()):
            depend = self.GetProjectsMM(oldserv, newmp)
            if not depend:
                # This message used to be built from the loop variable of
                # the "missing projects" loop above, which is undefined
                # when that loop never ran and otherwise names an
                # unrelated project. Report the metaproject, as the
                # module branch does.
                oldserv.SetDescription('class %s not found in metaproject %s %s' % (
                    oldserv.GetClass(),
                    newmp.GetName(),
                    newmp.GetVersionTxt()))
            else:
                oldserv.AddProject(depend[0].GetName(), depend[0])
            new_config.AddService(oldserv)

    # Overwrite metaproject
    iconfig.AddMetaProject(newmp.GetName(), newmp)

    # Overwrite modules
    for newmod in new_config.GetModules():
        iconfig.AddModule(newmod)

    # Overwrite services
    for newserv in new_config.GetServices():
        iconfig.AddService(newserv)
    return newmp
5349 5350
def GetMetaProjects(self):
    """
    Build a MetaProject object for every row delivered by
    fetch_metaproject_list().

    @return: list of MetaProject objects with name, version and id set
    """
    def build(row):
        metaproject = MetaProject()
        metaproject.SetName(row['name'])
        metaproject.SetVersion(row['versiontxt'])
        metaproject.SetId(row['metaproject_id'])
        return metaproject

    return [build(row) for row in self.fetch_metaproject_list()]
def GetProjectsSM(self, module, metaproj):
    """
    Same lookup as GetProjectsMM(), called with modtype 'service'.

    @param module: service object whose class is looked up
    @param metaproj: metaproject to search in
    @return: whatever GetProjectsMM() returns
    """
    service_projects = self.GetProjectsMM(module, metaproj, modtype='service')
    return service_projects
def SetMetaProjectId(self, metaproj):
    """
    Look up the database id of a metaproject by name and version and
    store it on the object.

    Name and version are handed to the driver as query parameters instead
    of being pasted into the statement.

    @param metaproj: metaproject object; its id is set in place
    @return: the same metaproject object
    @raise TypeError: when no row matches (the missing row is subscripted)
    """
    sql = " SELECT metaproject.* FROM metaproject "
    sql += " WHERE metaproject.name = %s "
    sql += " AND metaproject.versiontxt= %s "
    cursor = self.getcursor()
    cursor.execute(sql.strip(), (metaproj.GetName(), metaproj.GetVersion()))
    mp = cursor.fetchone()
    metaproj.SetId(mp['metaproject_id'])
    return metaproj
def GetProjectsMM(self, module, metaproj, modtype='module'):
    """
    Find the project that provides a module's class within a metaproject.

    Name, version and class are handed to the driver as query parameters
    instead of being pasted into the statement.

    @param module: object whose GetClass() is looked up
    @param metaproj: metaproject (name and version) to search in
    @param modtype: accepted for interface compatibility; the query does
                    not filter on it
    @return: empty list when nothing matches, otherwise the project
             followed by its dependencies
    """
    sql = " SELECT project.* FROM project,module,mp_pivot,metaproject "
    sql += " WHERE module.project_id = project.project_id "
    sql += " AND mp_pivot.project_id = project.project_id "
    sql += " AND mp_pivot.metaproject_id = metaproject.metaproject_id "
    sql += " AND metaproject.name = %s "
    sql += " AND metaproject.versiontxt = %s "
    sql += " AND module.class = %s "
    cursor = self.getcursor()
    cursor.execute(sql.strip(),
                   (metaproj.GetName(), metaproj.GetVersion(), module.GetClass()))
    p = cursor.fetchone()
    if not p:
        return []

    project = Project()
    project.SetName(p['name'])
    project.SetVersion(p['versiontxt'])
    project.SetId(p['project_id'])
    project_list = self.GetProjectDependencies(project.GetId(), metaproj.GetId())
    project_list.insert(0, project)
    return project_list
def GetProjects(self, metaproject_id):
    """
    Build a Container object for every row delivered by
    fetch_project_list().

    @param metaproject_id: id forwarded to fetch_project_list()
    @return: list of Container objects with name, version and id set
    """
    def build(row):
        project = Container()
        project.SetName(row['name'])
        project.SetVersion(row['versiontxt'])
        project.SetId(row['project_id'])
        return project

    return [build(row) for row in self.fetch_project_list(metaproject_id)]
def GetProjectDependencies(self, project_id, metaproject_id):
    """
    Build a Container object for every row delivered by
    fetch_project_dependencies().

    @param project_id: id of the project
    @param metaproject_id: id of the metaproject
    @return: list of Container objects with name, version and id set
    """
    def build(row):
        dependency = Container()
        dependency.SetName(row['name'])
        dependency.SetVersion(row['versiontxt'])
        dependency.SetId(row['project_id'])
        return dependency

    rows = self.fetch_project_dependencies(project_id, metaproject_id)
    return [build(row) for row in rows]
def GetServices(self, project_id):
    """
    Build a Service object for every 'service' row of a project.

    @param project_id: id of the project
    @return: list of Service objects with name, class and id set
    """
    def build(row):
        service = Service()
        service.SetName(row['name'])
        service.SetClass(row['class'])
        service.SetId(row['module_id'])
        return service

    rows = self.fetch_modules_from_project_id(project_id, 'service')
    return [build(row) for row in rows]
def GetServicesP(self, name, version):
    """
    Build a Service object for every row delivered by
    fetch_services_for_project().

    @param name: name of the project
    @param version: version of the project
    @return: list of Service objects with name, class and id set
    """
    def build(row):
        service = Service()
        service.SetName(row['name'])
        service.SetClass(row['class'])
        service.SetId(row['module_id'])
        return service

    return [build(row) for row in self.fetch_services_for_project(name, version)]
def GetModules(self, project_id):
    """
    Build a Module object for every row delivered by
    fetch_modules_from_project_id().

    @param project_id: id of the project
    @return: list of Module objects with name, class and id set
    """
    def build(row):
        module = Module()
        module.SetName(row['name'])
        module.SetClass(row['class'])
        module.SetId(row['module_id'])
        return module

    return [build(row) for row in self.fetch_modules_from_project_id(project_id)]
def GetModulesP(self, name, version):
    """
    Build a Module object for every row delivered by
    fetch_modules_for_project().

    @param name: name of the project
    @param version: version of the project
    @return: list of Module objects with name, class and id set
    """
    def build(row):
        module = Module()
        module.SetName(row['name'])
        module.SetClass(row['class'])
        module.SetId(row['module_id'])
        return module

    return [build(row) for row in self.fetch_modules_for_project(name, version)]
def GetIceProdModules(self):
    """
    Build an IceProdPre object for every row delivered by
    fetch_iceprodmodules().

    @return: list of IceProdPre objects with name, class and id set
    """
    def build(row):
        module = IceProdPre()
        module.SetName(row['name'])
        module.SetClass(row['class'])
        module.SetId(row['module_id'])
        return module

    return [build(row) for row in self.fetch_iceprodmodules()]
def GetParameters(self, module_id):
    """
    Build the Parameter objects of a module or service.

    The value is read with select_omkey_array() for type 'OMKeyv', with
    select_omkey() for type 'OMKey', with select_array() for any other
    type listed in VectorTypes, and otherwise taken from the row's value
    and unit.

    @param module_id: id of the module or service
    @return: list of Parameter objects
    """
    plist = []
    for row in self.fetch_service_parameters(module_id):
        pid = row['parameter_id']
        parameter = Parameter()
        parameter.SetName(row['name'])
        parameter.SetType(row['type'])
        parameter.SetId(pid)

        ptype = parameter.GetType()
        if ptype == 'OMKeyv':
            value = self.select_omkey_array(pid)
        elif ptype == 'OMKey':
            value = self.select_omkey(pid)
        elif ptype in VectorTypes:
            value = self.select_array(pid)
        else:
            value = Value(row['value'], row['unit'])
        parameter.SetValue(value)
        plist.append(parameter)
    return plist
def fetch_metaproject_list(self):
    """
    Retrieve all rows of the metaproject table.

    @return: list of rows, each passed through nonify()
    """
    cursor = self.getcursor()
    query = re.sub(r'\s+', ' ', "SELECT * FROM metaproject")
    cursor.execute(query)
    return [self.nonify(row) for row in cursor.fetchall()]
def fetch_project_list(self, metaproject_id=None):
    """
    Retrieve id, name and version of projects in the database.

    With a metaproject id only the projects linked to it through mp_pivot
    are returned. Without one, all projects are returned; the previous
    statement still joined mp_pivot without any condition in that case
    and so repeated every project once per mp_pivot row.

    @param metaproject_id: table_id of metaproject
    @return: list of rows, each passed through nonify()
    """
    sql = "SELECT project.project_id,project.name,project.versiontxt FROM project"
    cursor = self.getcursor()
    if metaproject_id:
        sql += ",mp_pivot"
        sql += " WHERE project.project_id = mp_pivot.project_id"
        sql += " AND mp_pivot.metaproject_id = %s"
        # The id is a query parameter rather than part of the text.
        cursor.execute(sql, (metaproject_id,))
    else:
        cursor.execute(sql)
    return [self.nonify(row) for row in cursor.fetchall()]
def fetch_project(self, id, table='project'):
    """
    Retrieve the rows of a table whose <table>_id column equals id.

    @param id: primary key of project
    @param table: table to query; the name is pasted into the statement
    @return: all matching rows
    """
    template = "SELECT * FROM %s WHERE %s_id ='%d' \n"
    query = re.sub(r'\s+', ' ', template % (table, table, id))
    cursor = self.getcursor()
    cursor.execute(query)
    return cursor.fetchall()
5538 5539
def fetch_project_id(self, pname, pversion, table='project'):
    """
    Retrieve id for project with matching name, version
    (there should only be one)

    Name and version are handed to the driver as query parameters; only
    the table name is still pasted into the statement.

    @param pname: name of project to query
    @param pversion: version text of the project
    @param table: table to query (e.g. 'project' or 'metaproject')
    @return: the id as int, or None (after a warning) when nothing matches
    """
    sql = "SELECT %s_id FROM %s" % (table, table)
    sql += " WHERE name = %s AND versiontxt = %s"
    cursor = self.getcursor()
    cursor.execute(sql, (pname, pversion))
    result = cursor.fetchall()
    if result:
        return int(result[0]['%s_id' % table])
    self.logger.warning("project \'%s\' not found" % pname)
    return None
5562 5563 5564 5565
def fetch_service_id(self, service, pid):
    """
    Same lookup as fetch_module_id(), called with table 'service'.

    @param service: service to query
    @param pid: id of the project the service belongs to
    @return: whatever fetch_module_id() returns
    """
    service_id = self.fetch_module_id(service, pid, table='service')
    return service_id
def fetch_module_id(self, module, pid, table='module'):
    """
    Retrieve id for module with matching class and project id
    (there should only be one)

    The class name is handed to the driver as a query parameter instead
    of being pasted into the statement.

    @param module: module to query; its GetClass() is looked up
    @param pid: id of the project the module belongs to
    @param table: only used to label the "not found" log message
    @return: the module id as int; None when pid is false or nothing matches
    """
    if not pid:
        return None
    cname = module.GetClass()

    sql = "SELECT module_id FROM module WHERE class = %s AND project_id = %s"
    self.logger.debug("%s %r" % (sql, (cname, pid)))
    cursor = self.getcursor()
    # int() keeps the numeric check the former '%d' formatting implied.
    cursor.execute(sql, (cname, int(pid)))
    result = cursor.fetchone()
    self.logger.debug(result)

    if result:
        return int(result['module_id'])
    self.logger.error("%s \'%s\' not found" % (table, cname))
    return None
def fetch_project_dependencies(self, project_id, metaproject_id):
    """
    Retrieve the projects a project depends on within a metaproject.

    @param project_id: id of project
    @param metaproject_id: id of metaproject
    @return: the project rows of all dependencies
    """
    query = (" SELECT project.* "
             " FROM project, project_depend "
             " WHERE "
             " project.project_id = project_depend.dependency_id "
             " AND "
             " project_depend.project_id = %d "
             " AND "
             " project_depend.metaproject_id = %d ") % (project_id, metaproject_id)
    cursor = self.getcursor()
    cursor.execute(query.strip())
    return cursor.fetchall()
def fetch_modules_from_project_id(self, project_id, table='module'):
    """
    Retrieve the modules of one type that belong to a project, ordered
    by class.

    Both values are handed to the driver as query parameters instead of
    being pasted into the statement.

    @param project_id: id of project
    @param table: module_type to select (e.g. 'module', 'service', 'iceprod')
    @return: list of rows, each passed through nonify()
    """
    sql = "SELECT * FROM module WHERE project_id = %s"
    sql += " AND module_type = %s ORDER BY class"
    cursor = self.getcursor()
    cursor.execute(sql, (project_id, table))
    return [self.nonify(row) for row in cursor.fetchall()]
def fetch_modules_for_project(self, name, version):
    """
    Retrieve the modules of the project given by name and version.

    @param name: name of project
    @param version: version of the project
    @return: the module rows, or an empty list when the project is unknown
    """
    pid = self.fetch_project_id(name, version)
    if not pid:
        return []
    return self.fetch_modules_from_project_id(pid)
def fetch_services_for_project(self, name, version):
    """
    Retrieve the services of the project given by name and version.

    @param name: name of project
    @param version: version of the project
    @return: the service rows, or an empty list when the project is unknown
    """
    pid = self.fetch_project_id(name, version)
    if not pid:
        return []
    return self.fetch_modules_from_project_id(pid, 'service')
def fetch_iceprodmodules(self):
    """
    Retrieve the 'iceprod' type modules of the project named 'iceprod'
    whose version equals iceprod.__version__.

    @return: the module rows, or an empty list when that project is unknown
    """
    pid = self.fetch_project_id('iceprod', iceprod.__version__)
    if not pid:
        return []
    return self.fetch_modules_from_project_id(pid, 'iceprod')
def fetch_module_parameters(self, module_id):
    """
    Retrieve the parameter rows of a service/module.

    @param module_id: numeric id the parameter.module_id column is
                      compared with
    @return: list of rows, each passed through nonify()
    """
    query = "SELECT * FROM parameter WHERE module_id = %d" % module_id
    cursor = self.getcursor()
    cursor.execute(re.sub(r'\s+', ' ', query))
    return [self.nonify(row) for row in cursor.fetchall()]
def fetch_service_parameters(self, service_id):
    """
    Retrieve the parameter rows of a service; delegates to
    fetch_module_parameters().

    @param service_id: id of the service
    @return: whatever fetch_module_parameters() returns
    """
    rows = self.fetch_module_parameters(service_id)
    return rows
def load_mp_pivot(self, metaproject):
    """
    Load cross-references between meta-projects (1) and projects
    The purpose of this is to provide a way simultaneously for projects to
    reference which meta-projects they are members of and for
    meta-projects to list which projects belong to them (many-to-many).

    A metaproject without projects is skipped; it used to produce an
    INSERT with an empty VALUES list.

    @param metaproject: Metaproject object
    """
    mid = metaproject.GetId()
    pairs = ["(%s,%s)" % (mid, p.GetId()) for p in metaproject.GetProjectList()]
    if not pairs:
        self.logger.debug("no projects to link to metaproject %s" % mid)
        return

    sql = "INSERT IGNORE INTO mp_pivot (metaproject_id,project_id) VALUES "
    sql += ", ".join(pairs)
    self.logger.debug(sql)
    cursor = self.getcursor()
    cursor.execute(sql)

    self.logger.debug(self.insert_id())
    self.logger.debug("%d mp_pivot rows were inserted" % cursor.rowcount)
    cursor.execute("SHOW WARNINGS")
    warn = cursor.fetchone()
    if warn:
        self.logger.warning(warn)
    return
def load_dependencies(self, project, metaproject):
    """
    Load cross-references between projects (1) and dependency projects

    Every dependency must be part of the metaproject; when one is
    missing, the problem is logged as fatal and the process is ended with
    os._exit(1). A failing INSERT is logged and does not stop the
    remaining dependencies from being stored.

    @param project: Project object
    @param metaproject: metaproject the dependencies are resolved in
    """
    sql = " INSERT IGNORE INTO project_depend "
    sql += " (project_id,metaproject_id,dependency_id) "
    sql += " VALUES (%s,%s,%s)"
    pid = project.GetId()
    mpid = metaproject.GetId()
    cursor = self.getcursor()

    for p in project.GetDependencies():
        if not p:
            continue
        self.logger.info("%s - getting project dependency: %s" %
                         (project.GetName(), p.GetName()))
        dependency = metaproject.GetProject(p.GetName())
        if not dependency:
            self.logger.fatal('failed dependency:%s needs %s' %
                              (project.GetName(), p.GetName()))
            # os is not among this module's explicit imports, so bring it
            # into scope here instead of relying on a star import.
            import os
            os._exit(1)
        did = dependency.GetId()
        try:
            cursor.execute(sql, (pid, mpid, did))
            self.logger.info(
                "%d project_depend rows were inserted" % cursor.rowcount)
        except Exception as e:
            self.logger.error(e)
    return
def load_projects(self, metaproject):
    """
    Store every project of a metaproject, each followed by its services
    and then its modules.

    @param metaproject: MetaProject object
    """
    for proj in metaproject.GetProjectList():
        for loader in (self.load_project, self.load_services, self.load_modules):
            loader(proj)
    return
def load_project_dependencies(self, metaproject):
    """
    Store the dependencies of every project of a metaproject.

    @param metaproject: MetaProject object
    """
    projects = metaproject.GetProjectList()
    for member in projects:
        self.load_dependencies(member, metaproject)
    return
def load_metaproject(self, metaproject):
    """
    Store a metaproject by calling load_project() with table "metaproject".

    @param metaproject: the metaproject to be loaded
    @return: whatever load_project() returns
    """
    metaproject_id = self.load_project(metaproject, table="metaproject")
    return metaproject_id
def load_project(self, project, table="project"):
    """
    Load project to database

    A row is inserted only when fetch_project_id() finds none for the
    project's name and version. In both cases the resulting id is stored
    on the project object.

    Name and version are handed to the driver as query parameters; only
    the table name is still pasted into the statement.

    @param project: the project to be loaded
    @param table: table to which project should be loaded
    @return: id of the project row (false when the insert yielded no id)
    """
    name = project.GetName()
    ver = project.GetVersion()
    pid = self.fetch_project_id(name, ver, table)
    self.logger.debug("%s %s.%s pid is %s" % (table, name, ver, pid))
    if not pid:
        # We still need to fill major,minor,patch for backward
        # compatibility
        vt = ('00', '00', '00')
        legacy_ver = self.version_regex.search(ver)
        if legacy_ver:
            # The pattern accepts any capital letter in front of the
            # numbers, so drop that one character rather than only 'V'.
            vt = legacy_ver.group(0)[1:].split('-')

        sql = "INSERT INTO %s " % table
        sql += "(name, versiontxt,major_version,minor_version,patch_version) "
        sql += " VALUES (%s,%s,%s,%s,%s)"
        self.logger.debug(sql)

        cursor = self.getcursor()
        cursor.execute(sql, (name, ver, vt[0], vt[1], vt[2]))
        pid = self.insert_id()
        if pid:
            self.logger.debug("inserted id %d" % pid)
        else:
            self.logger.error(
                "could not load project %s to parameter database" % name)

    project.SetId(pid)
    self.logger.debug("%s.GetId(): %s" % (name, project.GetId()))
    return pid
def load_modules(self, project):
    """
    Store every module of a project, each followed by its parameters.

    @param project: Project object
    """
    modules = project.GetModules()
    for mod in modules:
        self.load_module(mod, project)
        self.load_params(mod)
    return
def load_services(self, project):
    """
    Store every service of a project, each followed by its parameters.

    @param project: Project object
    """
    services = project.GetServices()
    for svc in services:
        self.load_service(svc, project)
        self.load_params(svc)
    return
def load_module(self, module, project, type='module'):
    """
    Load individual module to database

    A row is inserted only when fetch_module_id() finds none for the
    module's class within the project. In both cases the resulting id is
    stored on the module object.

    Name, class and type are handed to the driver as query parameters
    instead of being pasted into the statement.

    @param module: object to load
    @param project: project the module belongs to
    @param type: module_type to store (e.g. 'module', 'service', 'iceprod')
    @return: id of the module row (false when the insert yielded no id)
    """
    pid = project.GetId()
    mid = self.fetch_module_id(module, pid, type)
    if not mid:
        sql = " INSERT INTO module "
        sql += " (name,class,project_id,module_type) "
        sql += " VALUES (%s,%s,%s,%s) "
        self.logger.debug(sql.strip())

        cursor = self.getcursor()
        # int() keeps the numeric check the former '%d' formatting
        # implied, so a missing project id still raises instead of
        # storing NULL.
        cursor.execute(sql.strip(),
                       (module.GetName(), module.GetClass(), int(pid), type))
        mid = self.insert_id()

        cursor.execute("SHOW WARNINGS")
        warn = cursor.fetchone()
        if warn:
            self.logger.warning(warn)

        cursor.execute("SHOW ERRORS")
        err = cursor.fetchone()
        if err:
            self.logger.error(err)

        if mid:
            self.logger.debug("inserted module id %d" % mid)
        else:
            self.logger.error("could not fetch id for '%s'" % module.GetClass())

    module.SetId(mid)
    return mid
def load_service(self, service, project):
    """
    Store a single service by calling load_module() with type 'service'.

    @param service: object to load
    @param project: project the service belongs to
    @return: whatever load_module() returns
    """
    service_id = self.load_module(service, project, 'service')
    return service_id
def load_iceprodmodules(self, module, project):
    """
    Store a single module by calling load_module() with type 'iceprod'.

    @param module: object to load
    @param project: project the module belongs to
    @return: whatever load_module() returns
    """
    # The call used to pass an undefined name instead of the module
    # parameter and so raised NameError whenever it was reached.
    return self.load_module(module, project, type='iceprod')
def load_params(self, module):
    """
    Load parameters for module or service to database

    Parameters of type 'OMKey' or of a type listed in VectorTypes are
    inserted one by one with the text 'NULL' as unit and '0' as value;
    their real content goes to array_element through insert_omkey(),
    insert_omkey_array() or insert_array(). All other parameters are
    collected and written with one multi-row INSERT at the end.

    Values are handed to the driver as query parameters, so quotes inside
    names or values can no longer break the statements. Quotes in
    descriptions are still replaced by &quot; so that newly stored text
    matches what is already in the table.

    @param module: object whose parameter will be loaded
    @raise Exception: when neither the insert nor a lookup yields the id
                      of an array-type parameter
    """
    cursor = self.getcursor()
    insert = " INSERT IGNORE INTO parameter "
    insert += " (name,type,unit,description,module_id,value) "
    insert += " VALUES "
    placeholders = "(%s,%s,%s,%s,%s,%s)"
    count = 0

    m_id = module.GetId()
    self.logger.debug('module_id %d' % m_id)

    parameters = module.GetParameters()
    if not parameters:
        return

    scalar_rows = []
    for p in parameters:
        name = p.GetName()
        ptype = p.GetType()
        # defang quotes, as done for the rows stored so far
        desc = p.GetDescription().replace('"', '&quot;').replace("'", '&quot;')

        if ptype == 'OMKey' or ptype in VectorTypes:
            cursor.execute((insert + placeholders).strip(),
                           (name, ptype, 'NULL', desc, m_id, '0'))
            # Read the row count before 'show warnings' replaces it.
            count += cursor.rowcount
            pid = self.insert_id()
            cursor.execute('show warnings')
            cursor.fetchall()
            if not pid:
                # INSERT IGNORE skipped an existing row: look its id up.
                lookup = " SELECT parameter_id FROM parameter "
                lookup += " WHERE name=%s "
                lookup += " AND module_id=%s "
                self.logger.debug(lookup.strip())
                cursor.execute(lookup.strip(), (name, m_id))
                found = cursor.fetchone()
                if not found:
                    raise Exception("Failed to insert/fetch parameter id")
                pid = found['parameter_id']

            if ptype == 'OMKey':
                self.insert_omkey(p.GetValue(), pid)
            elif ptype == 'OMKeyv':
                self.insert_omkey_array(p.GetValue(), pid)
            else:
                self.insert_array(p.GetValue(), pid)
        else:
            value = p.GetValue()
            # An empty unit becomes a real NULL; the value is stored as
            # text, as before.
            scalar_rows.append((name, ptype, value.unit or None, desc,
                                m_id, "%s" % (value.value,)))

    if scalar_rows:
        statement = insert + ",".join([placeholders] * len(scalar_rows))
        flat = tuple(field for row in scalar_rows for field in row)
        self.logger.debug(statement.strip())
        cursor.execute(statement.strip(), flat)
        count += cursor.rowcount
        cursor.execute('show warnings')
        cursor.fetchall()
    self.logger.debug("%d parameter rows were inserted " % count)
5960
def insert_omkey(self,omkey,pid):
    """
    Store a single OMKey as two array_element rows, one named
    'stringid' and one named 'omid'.

    @param omkey: object whose stringid and omid attributes are stored
    @param pid: parameter_id the two rows are attached to
    """
    db_cursor = self.getcursor()
    pieces = [
        " INSERT INTO array_element (name,value,parameter_id) ",
        " VALUES ",
        " ('%s','%s',%s), " % ('stringid',omkey.stringid,pid),
        " ('%s','%s',%s) " % ('omid',omkey.omid,pid),
    ]
    # The same stripped statement is logged and then executed.
    statement = "".join(pieces).strip()
    self.logger.debug(statement)
    db_cursor.execute(statement)
def insert_omkey_array(self,omkeyvect,pid):
    """
    Store a sequence of OMKeys in array_element with one INSERT.
    Each key contributes a 'stringid' row followed by an 'omid' row.
    Nothing is executed when the sequence is empty.

    @param omkeyvect: sequence of objects with stringid and omid attributes
    @param pid: parameter_id the rows are attached to
    """
    db_cursor = self.getcursor()
    if len(omkeyvect) == 0:
        return
    header = " INSERT INTO array_element (name,value,parameter_id) " + " VALUES "
    # One text fragment per key holding its stringid/omid row pair.
    groups = [
        " ('%s','%s',%s), " % ('stringid',key.stringid,pid) +
        " ('%s','%s',%s) " % ('omid',key.omid,pid)
        for key in omkeyvect
    ]
    statement = header + ",".join(groups)
    db_cursor.execute(statement.strip())
def insert_array(self,values,pid):
    """
    Insert the elements of a vector parameter into array_element
    with a single INSERT statement.

    Nothing is executed when values is empty, since an INSERT without
    a VALUES list is not valid SQL.

    @param values: iterable of objects exposing value and unit attributes
    @param pid: parameter_id the elements are attached to
    """
    cursor = self.getcursor()
    # The unit is quoted exactly once.  The previous code wrapped the
    # already-quoted result of nullify() in a second pair of quotes,
    # which produced ''unit'' (malformed SQL) for any element that
    # carried a unit.  A missing unit is still written as the string
    # 'NULL', which nonify() turns back into None when rows are read.
    rows = ["('%s','%s',%s)" % (item.value, item.unit or 'NULL', pid)
            for item in values]
    if not rows:
        return
    sql  = " INSERT INTO array_element (value,unit,parameter_id) "
    sql += " VALUES " + ",".join(rows)
    cursor.execute(sql.strip())
def select_array(self,pid):
    """
    Fetch the array elements stored for a parameter.

    @param pid: integer parameter_id whose elements are selected
    @return: list of Value objects built from each row's value and
             unit columns (a unit stored as 'NULL' becomes None)
    """
    cursor = self.getcursor()
    sql  = " SELECT * from array_element "
    sql += " WHERE parameter_id = %d " % pid
    # Without an ORDER BY the server may return rows in any order, which
    # would scramble the vector.  Order by array_element_id, the same
    # column select_omkey_array orders by on this table.
    sql += " ORDER BY array_element_id "
    cursor.execute(sql.strip())
    return [Value(row['value'], self.nonify(row['unit']))
            for row in cursor.fetchall()]
def select_omkey(self,pid):
    """
    Return the first OMKey stored for a parameter.

    @param pid: integer parameter_id to look up
    @return: first element of select_omkey_array(pid)
    @raise Exception: if no OMKey is stored for pid
    """
    found = self.select_omkey_array(pid)
    if len(found) >= 1:
        return found[0]
    raise Exception('could not find omkey for param %d' % pid)
def select_omkey_array(self,pid):
    """
    Rebuild the OMKeys stored for a parameter from array_element rows.

    Rows are read in array_element_id order and are expected to come
    in pairs: a 'stringid' row followed by its 'omid' row.

    @param pid: integer parameter_id whose rows are selected
    @return: list of pyOMKey objects, one per stringid/omid pair
    @raise Exception: if a row is named neither 'stringid' nor 'omid',
                      or an 'omid' row is not preceded by a 'stringid' row
    """
    cursor = self.getcursor()
    sql  = " SELECT * from array_element "
    sql += " WHERE parameter_id = %d order by array_element_id" % pid
    cursor.execute(sql.strip())

    omkeyvect = []
    omkey = None   # key under construction; None until a 'stringid' row is seen
    for item in cursor.fetchall():
        name = item['name']
        if name == 'stringid':
            omkey = pyOMKey(0,0)
            omkey.stringid = item['value']
        elif name == 'omid':
            if omkey is None:
                # Previously an unbound-local error (first row) or a
                # duplicate of the previous key (consecutive omid rows).
                raise Exception('expected stringid before omid for param %d' % pid)
            omkey.omid = item['value']
            omkeyvect.append(omkey)
            omkey = None
        else:
            # Report the row that actually failed; the old code indexed
            # result_set[1] regardless of which row was bad.
            raise Exception('expected omkey but found %s' % name)
    return omkeyvect
class SoapParamDB:
    """
    Publishes the functions of a parameter database object on the
    server owned by a parent object.

    Arguments and results that are not plain values travel as pickled
    strings: callers pickle them, the registered functions unpickle
    them with loads() and pickle their results with dumps().
    """

    def __init__(self,pdb):
        """
        @param pdb: parameter database object the registered functions
                    delegate to
        """
        self.paramdb = pdb

    def Register(self,parent):
        """
        Register one function per parameter database operation with
        parent.server.register_function().

        @param parent: object providing server.register_function() and
                       auth_db(db,username,passwd,keep_open=...)
        """
        # SECURITY NOTE: several functions below call loads() on data
        # supplied by the remote caller.  Unpickling untrusted data can
        # execute arbitrary code.  Only LoadParams authenticates the
        # caller before unpickling.

        def LoadParams(metaproject,username,passwd):
            db = self.paramdb.new()
            db.disconnect()
            if parent.auth_db(db,username, passwd,keep_open=True):
                retval = db.load(loads(metaproject))
                db.disconnect()
                logger.info("Uploaded metaproject for %s %s %s" % (username,db.db_,db.host_))
                return retval
            else:
                logger.error("Failed to authenticate %s on %s@%s with password" % (username,db.db_,db.host_))
                return 0
        parent.server.register_function(LoadParams)

        def DownloadParams():
            return dumps(self.paramdb.download())
        parent.server.register_function(DownloadParams)

        def SwitchMetaProject(iconfig,id,name,version):
            return dumps(self.paramdb.SwitchMetaProject(loads(iconfig),id,name,loads(version)))
        parent.server.register_function(SwitchMetaProject)

        def GetMetaProjects():
            return dumps(self.paramdb.GetMetaProjects())
        parent.server.register_function(GetMetaProjects)

        def GetProjectsSM(module,metaproj):
            return dumps(self.paramdb.GetProjectsSM(loads(module),loads(metaproj)))
        parent.server.register_function(GetProjectsSM)

        def GetProjectsMM(module,metaproj):
            return dumps(self.paramdb.GetProjectsMM(loads(module),loads(metaproj)))
        parent.server.register_function(GetProjectsMM)

        def GetProjects(metaproject_id):
            return dumps(self.paramdb.GetProjects(metaproject_id))
        parent.server.register_function(GetProjects)

        def GetProjectDependencies(project_id,metaproject_id):
            return dumps(self.paramdb.GetProjectDependencies(project_id,metaproject_id))
        parent.server.register_function(GetProjectDependencies)

        def GetServices(project_id):
            return dumps(self.paramdb.GetServices(project_id))
        parent.server.register_function(GetServices)

        def GetServicesP(name,version):
            return dumps(self.paramdb.GetServicesP(name,loads(version)))
        parent.server.register_function(GetServicesP)

        def GetModules(project_id):
            return dumps(self.paramdb.GetModules(project_id))
        parent.server.register_function(GetModules)

        def GetModulesP(name,version):
            return dumps(self.paramdb.GetModulesP(name,loads(version)))
        parent.server.register_function(GetModulesP)

        def GetIceProdModules():
            return dumps(self.paramdb.GetIceProdModules())
        parent.server.register_function(GetIceProdModules)

        def GetParameters(module_id):
            return dumps(self.paramdb.GetParameters(module_id))
        parent.server.register_function(GetParameters)

        # The id lookups below (fetch_metaproject_id, fetch_service_id,
        # fetch_module_id) return their result as-is, not pickled.

        def fetch_metaproject_id(name, version):
            return self.paramdb.fetch_metaproject_id(name,loads(version))
        parent.server.register_function(fetch_metaproject_id)

        def fetch_metaproject_list():
            return dumps(self.paramdb.fetch_metaproject_list())
        parent.server.register_function(fetch_metaproject_list)

        def fetch_project_list(metaproject_id):
            return dumps(self.paramdb.fetch_project_list(metaproject_id))
        parent.server.register_function(fetch_project_list)

        def fetch_project(id):
            return dumps(self.paramdb.fetch_project(id))
        parent.server.register_function(fetch_project)

        def fetch_project_id(pname,pversion):
            # Pass the pname argument; the old code referenced an
            # undefined name 'name' and failed with NameError on every call.
            return dumps(self.paramdb.fetch_project_id(pname,loads(pversion)))
        parent.server.register_function(fetch_project_id)

        def fetch_service_id(service,pid):
            return self.paramdb.fetch_service_id(loads(service),pid)
        parent.server.register_function(fetch_service_id)

        def fetch_module_id(module,mid):
            return self.paramdb.fetch_module_id(loads(module),mid)
        parent.server.register_function(fetch_module_id)

        def fetch_project_dependencies(project_id,metaproject_id):
            return dumps(self.paramdb.fetch_project_dependencies(project_id,metaproject_id))
        parent.server.register_function(fetch_project_dependencies)

        def fetch_modules_from_project_id(project_id):
            return dumps(self.paramdb.fetch_modules_from_project_id(project_id))
        parent.server.register_function(fetch_modules_from_project_id)

        def fetch_modules_for_project(name,version):
            return dumps(self.paramdb.fetch_modules_for_project(name,loads(version)))
        parent.server.register_function(fetch_modules_for_project)

        def fetch_services_for_project(name,version):
            return dumps(self.paramdb.fetch_services_for_project(name,loads(version)))
        parent.server.register_function(fetch_services_for_project)

        def fetch_module_parameters(module_id):
            return dumps(self.paramdb.fetch_module_parameters(module_id))
        parent.server.register_function(fetch_module_parameters)

        def fetch_service_parameters(module_id):
            return dumps(self.paramdb.fetch_service_parameters(module_id))
        parent.server.register_function(fetch_service_parameters)


if __name__ == '__main__':

    from xmlparser import IceTrayXMLParser
    from xmlwriter import IceTrayXMLWriter

    # Two arguments are consumed below: sys.argv[1] (integer run id) and
    # sys.argv[2] (output file).  The old check (< 2) let a single
    # argument through and then failed on sys.argv[2].
    if len(sys.argv) < 3:
        print('Usage: python %s <run id> <xml out>' % sys.argv[0])
        sys.exit()

    # NOTE(review): steering is not used below -- confirm it can be dropped
    steering = Steering()

    # Download the configuration for the run and write it out as XML
    i3db = ConfigDB()
    passwd = getpass.getpass()
    i3db.authenticate('dbs2.icecube.wisc.edu','i3iceprod-uw',passwd,'i3iceprod',True)
    runid = int(sys.argv[1])
    i3config = i3db.download_config(runid,include_defaults=True,include_description=False)
    i3db.disconnect()
    writer = IceTrayXMLWriter(i3config)
    writer.write_to_file(sys.argv[2])