Source Code for Module netlogger.analysis.workflow.stampede_statistics

"""
Library to generate statistics from the new Stampede 3.1 backend.

Usage::
 stats = StampedeStatistics(connString='sqlite:///montage.db')
 stats.initialize('unique_wf_uuid')
 stats.set_job_filter('dax')
 print stats.get_total_jobs_status()
 print stats.get_total_jobs_statistics()
 stats.set_job_filter('dag')
 print stats.get_total_jobs_status()
 print stats.get_total_jobs_statistics()
 etc.

Constructor and initialize methods:

The constructor takes a required SQLAlchemy connection string
as the first argument.  The stats class defaults to returning
data in "expanded workflow" mode.  To change this behavior and
only analyze a single workflow, set the optional arg:

expand_workflow = False

along with the connection string argument.

The initialize method is called with a single argument - the wf_uuid
of the desired "root workflow", whether returning data in expanded
mode or not.  The method returns True on success and False if a query
exception is raised, so the programmer can test for success before
calling the subsequent query methods.  This method is intended
to be called once per object.

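For example, a minimal sketch of single-workflow (non-expanded) use with the
return value checked (the connection string and wf_uuid are placeholders)::

 from netlogger.analysis.workflow.stampede_statistics import StampedeStatistics
 stats = StampedeStatistics(connString='sqlite:///montage.db', expand_workflow=False)
 if not stats.initialize('unique_wf_uuid'):
     print 'initialize failed - check the wf_uuid and the database'
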
Job filtering:

Jobs can be filtered using any of the strings in the jobtype ENUM,
with the addition of the values 'all' and 'nonsub', which return
all jobs and non-subworkflow jobs respectively.  If the filter is
not explicitly set, it defaults to 'all'.

The desired filter can be set with the set_job_filter() method.  After
this method is called, all subsequent calls to the query methods will
return results according to the filter.  The filter can be set and reset
as many times as the user desires.  There is an example of re/setting
the job filter in the usage section above, and another sketch below.
The query methods will return different values after the filter is re/set.

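For example, a per-type summary can be built by re-setting the filter in a
loop (an illustrative sketch; it assumes the object was constructed with
expand_workflow=False so that the type filters apply)::

 for f in ('compute', 'cleanup', 'stage-in-tx', 'stage-out-tx'):
     stats.set_job_filter(f)
     print f, stats.get_total_jobs_status()
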
Return values from methods:

The return value types vary from method to method.  Most of the
methods return a single integer or floating point number.

Methods which return rows from the DB (rather than just a number)
return a list that can be used in one of two ways - either by array
index (list of tuples) or by named attribute (list of objects).  The
two following ways of reading the same query results produce the
same output:

Example::
 for row in s.get_job_kickstart():
     print row[0], row[1], row[2]
     print row.job_id, row.job_name, row.kickstart

Either syntax will work.  When using the named attribute form, the
attributes are the names of the columns/aliases in the SELECT
stanza of the query.  If the row returned by the method is printed,
it will display as a tuple of results per row.

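For example, the named attributes make it easy to build a dictionary keyed on
a column (a sketch using the columns shown above)::

 kickstart_by_job = {}
 for row in s.get_job_kickstart():
     kickstart_by_job[row.job_name] = row.kickstart
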
Methods::
 get_sub_workflow_ids
 get_descendant_workflow_ids
 get_total_jobs_status
 get_total_succeeded_jobs_status
 get_total_failed_jobs_status
 get_total_unknown_jobs_status
 get_total_tasks_status
 get_total_succeeded_tasks_status
 get_total_failed_tasks_status
 get_total_jobs_statistics
 get_total_succeeded_jobs_statistics
 get_total_failed_jobs_statistics
 get_total_tasks_statistics
 get_total_succeeded_tasks_statistics
 get_total_failed_tasks_statistics
 get_workflow_wall_time
 get_workflow_cum_job_wall_time
 get_submit_side_job_wall_time
 get_job_name
 get_job_site
 get_job_kickstart
 get_job_runtime
 get_job_seqexec
 get_job_seqexec_delay
 get_condor_q_time
 get_resource_delay
 get_dagman_delay
 get_post_time
 get_transformation_statistics

Methods listed in order of query list on wiki.

https://confluence.pegasus.isi.edu/display/pegasus/Pegasus+statistics+python+version
"""
__rcsid__ = "$Id: stampede_statistics.py 28074 2011-06-09 15:50:35Z mgoode $"
__author__ = "Monte Goode"

import decimal

from netlogger.analysis.modules._base import SQLAlchemyInit
from netlogger.analysis.schema.stampede_schema import *
from netlogger.nllog import DoesLogging, get_logger

class StampedeStatistics(SQLAlchemyInit, DoesLogging):
    def __init__(self, connString=None, expand_workflow=True):
        if connString is None:
            raise ValueError("connString is required")
        DoesLogging.__init__(self)
        SQLAlchemyInit.__init__(self, connString, initializeToPegasusDB)

        self._expand = expand_workflow

        self._root_wf_id = None
        self._root_wf_uuid = None
        self._filter_mode = None

        self._wfs = []
        pass

    def initialize(self, root_wf_uuid):
        self.log.debug('initialize')
        self._root_wf_uuid = root_wf_uuid
        q = self.session.query(Workflow.wf_id).filter(Workflow.wf_uuid == self._root_wf_uuid)

        try:
            self._root_wf_id = q.one().wf_id
        except orm.exc.MultipleResultsFound, e:
            self.log.error('initialize',
                msg='Multiple results found for wf_uuid: %s' % root_wf_uuid)
            return False
        except orm.exc.NoResultFound, e:
            self.log.error('initialize',
                msg='No results found for wf_uuid: %s' % root_wf_uuid)
            return False

        if self._expand:
            q = self.session.query(Workflow.wf_id).filter(Workflow.root_wf_id == self._root_wf_id)
            for row in q.all():
                self._wfs.append(row.wf_id)
        else:
            self._wfs.append(self._root_wf_id)
        # Initialize filter with default value
        self.set_job_filter()
        return True

    def set_job_filter(self, filter='all'):
        modes = ['all', 'nonsub', 'dax', 'dag', 'compute', 'stage-in-tx',
                 'stage-out-tx', 'registration', 'inter-site-tx', 'create-dir',
                 'staged-compute', 'cleanup', 'chmod']
        try:
            modes.index(filter)
            self._filter_mode = filter
            self.log.debug('set_job_filter', msg='Setting filter to: %s' % filter)
        except ValueError:
            self._filter_mode = 'all'
            self.log.error('set_job_filter', msg='Unknown job filter %s - setting to all' % filter)
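
    # Example (illustrative, not part of the original queries): switching the
    # filter between calls on an object built with expand_workflow=False; an
    # unrecognized value is logged as an error and the filter falls back to 'all'.
    #
    #   stats.set_job_filter('compute')
    #   print stats.get_total_jobs_status()
    #   stats.set_job_filter('bogus')   # filter becomes 'all'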

    #
    # Pulls information about sub workflows
    #

    def get_sub_workflow_ids(self):
        """
        Returns info on child workflows only.
        """
        q = self.session.query(Workflow.wf_id, Workflow.wf_uuid)
        q = q.filter(Workflow.parent_wf_id == self._root_wf_id)
        return q.all()

    def get_descendant_workflow_ids(self):
        q = self.session.query(Workflow.wf_id, Workflow.wf_uuid)
        q = q.filter(Workflow.root_wf_id == self._root_wf_id)
        q = q.filter(Workflow.wf_id != self._root_wf_id)
        return q.all()

    #
    # Status of initially planned wf components.
    #

    def _get_job_filter(self):
        filters = {
            'all': None,
            'nonsub': not_(self._dax_or_dag_cond()),
            'dax': Job.type_desc == 'dax',
            'dag': Job.type_desc == 'dag',
            'compute': Job.type_desc == 'compute',
            'stage-in-tx': Job.type_desc == 'stage-in-tx',
            'stage-out-tx': Job.type_desc == 'stage-out-tx',
            'registration': Job.type_desc == 'registration',
            'inter-site-tx': Job.type_desc == 'inter-site-tx',
            'create-dir': Job.type_desc == 'create-dir',
            'staged-compute': Job.type_desc == 'staged-compute',
            'cleanup': Job.type_desc == 'cleanup',
            'chmod': Job.type_desc == 'chmod',
        }
        return filters[self._filter_mode]

    def _max_job_seq_subquery(self):
        """
        Creates the following subquery that is used in
        several queries:
        and jb_inst.job_submit_seq = (
            select max(job_submit_seq) from job_instance where job_id = jb_inst.job_id group by job_id
        )
        """
        JobInstanceSub = orm.aliased(JobInstance)
        sub_q = self.session.query(func.max(JobInstanceSub.job_submit_seq).label('max_id'))
        sub_q = sub_q.filter(JobInstanceSub.job_id == JobInstance.job_id).correlate(JobInstance)
        sub_q = sub_q.group_by(JobInstanceSub.job_id).subquery()
        return sub_q

    def _dax_or_dag_cond(self):
        return or_(Job.type_desc == 'dax', Job.type_desc == 'dag')

    def get_total_jobs_status(self):
        """
        select
        (
            select count(*) from job as jb where jb.wf_id in (1,2,3) and not
            (jb.type_desc = 'dax' or jb.type_desc = 'dag')
        )
        +
        (
            select count(*) from
            (
                select jb_inst.job_id from job_instance as jb_inst , job as jb
                where jb.wf_id in (1,2,3)
                and jb_inst.job_id = jb.job_id
                and (jb.type_desc ='dax' or jb.type_desc ='dag' )
                and jb_inst.subwf_id is null
                and jb_inst.job_submit_seq = (
                    select max(job_submit_seq) from job_instance where job_id = jb_inst.job_id group by job_id
                )
            )
        ) as total_jobs
        """
        if not self._expand:
            q = self.session.query(Job)
            q = q.filter(Job.wf_id.in_(self._wfs))
            if self._get_job_filter() is not None:
                q = q.filter(self._get_job_filter())
            return q.count()
        else:
            q = self.session.query(Job.job_id)
            q = q.filter(Job.wf_id.in_(self._wfs))
            q = q.filter(not_(self._dax_or_dag_cond()))
            job_count = q.count()

            sub_q = self._max_job_seq_subquery()

            q = self.session.query(JobInstance.job_id)
            q = q.filter(Job.wf_id.in_(self._wfs))
            q = q.filter(JobInstance.job_id == Job.job_id)
            q = q.filter(JobInstance.subwf_id == None)
            q = q.filter(JobInstance.job_submit_seq == sub_q.as_scalar())
            q = q.filter(self._dax_or_dag_cond())
            job_instance_count = q.count()

            return job_count + job_instance_count

    def get_total_succeeded_jobs_status(self):
        """
        select DISTINCT count(jb.job_id)
        from
            job as jb,
            job_instance as jb_inst,
            jobstate as jb_state
        where jb.wf_id in (
            1,2,3
        )
        and jb.job_id = jb_inst.job_id
        and not (jb.type_desc ='dax' or jb.type_desc ='dag')
        and jb_inst.job_instance_id = jb_state.job_instance_id
        and jb_state.state ='JOB_SUCCESS'
        """
        q = self.session.query(Job.job_id).distinct()
        q = q.filter(Job.wf_id.in_(self._wfs))
        q = q.filter(Job.job_id == JobInstance.job_id)
        q = q.filter(JobInstance.job_instance_id == Jobstate.job_instance_id)
        q = q.filter(Jobstate.state == 'JOB_SUCCESS')
        # jobtype filtering
        if not self._expand and self._get_job_filter() is not None:
            q = q.filter(self._get_job_filter())
        else:
            q = q.filter(not_(self._dax_or_dag_cond()))

        return q.count()

    def get_total_failed_jobs_status(self):
        """
        select count(*) from
        (
            select jb_inst.job_instance_id
            from job as jb, job_instance as jb_inst , jobstate as jb_state
            where jb_inst.job_submit_seq = (
                select max(job_submit_seq) from job_instance where job_id = jb_inst.job_id group by job_id
            )
            and jb.wf_id in (1,2,3)
            and jb.job_id = jb_inst.job_id
            and jb_inst.job_instance_id = jb_state.job_instance_id
            and (
                (not (jb.type_desc ='dax' or jb.type_desc ='dag'))
                or
                ((jb.type_desc ='dax' or jb.type_desc ='dag') and jb_inst.subwf_id is NULL)
            )
            and jb_state.state in ('JOB_FAILURE')
        )
        """
        sub_q = self._max_job_seq_subquery()

        q = self.session.query(JobInstance.job_instance_id)
        q = q.filter(JobInstance.job_submit_seq == sub_q.as_scalar())
        q = q.filter(Job.wf_id.in_(self._wfs))
        q = q.filter(Job.job_id == JobInstance.job_id)
        q = q.filter(JobInstance.job_instance_id == Jobstate.job_instance_id)
        q = q.filter(Jobstate.state.in_(['JOB_FAILURE']))  # why in and not == ?
        # jobtype filtering
        if not self._expand and self._get_job_filter() is not None:
            q = q.filter(self._get_job_filter())
        else:
            d_or_d = self._dax_or_dag_cond()
            q = q.filter(or_(not_(d_or_d), and_(d_or_d, JobInstance.subwf_id == None)))

        return q.count()

    def _query_jobstate_for_instance(self, states):
        """
        The states arg is a list of strings.
        Returns an appropriate subquery.
        """
        q = self.session.query(Jobstate.job_instance_id)
        q = q.filter(Jobstate.job_instance_id == JobInstance.job_instance_id).correlate(JobInstance)
        q = q.filter(Jobstate.state.in_(states)).subquery()
        return q

    def get_total_unknown_jobs_status(self):
        """
        select count(*) from job_instance jb_inst , job as jb
        where jb_inst.job_submit_seq = (
            select max(job_submit_seq) from job_instance where job_id = jb_inst.job_id group by job_id
        )
        and jb_inst.job_instance_id in
        (
            select js.job_instance_id from jobstate as js
            where js.job_instance_id = jb_inst.job_instance_id
            and js.state = 'SUBMIT'
        )
        and jb_inst.job_instance_id not in
        (
            select js.job_instance_id from jobstate as js
            where js.job_instance_id = jb_inst.job_instance_id
            and js.state in ( 'JOB_SUCCESS', 'JOB_FAILURE')
        )
        and jb_inst.job_id = jb.job_id
        and jb.wf_id in (
            1,2,3
        )
        and not (jb.type_desc ='dax' or jb.type_desc ='dag' )
        """
        subq_1 = self._query_jobstate_for_instance(['SUBMIT'])
        subq_2 = self._query_jobstate_for_instance(['JOB_SUCCESS', 'JOB_FAILURE'])

        maxsub_q = self._max_job_seq_subquery()

        q = self.session.query(Job.job_id)
        q = q.filter(JobInstance.job_submit_seq == maxsub_q.as_scalar())
        q = q.filter(JobInstance.job_instance_id.in_(subq_1))
        q = q.filter(not_(JobInstance.job_instance_id.in_(subq_2)))
        q = q.filter(Job.job_id == JobInstance.job_id)
        q = q.filter(Job.wf_id.in_(self._wfs))
        # jobtype filtering
        if not self._expand and self._get_job_filter() is not None:
            q = q.filter(self._get_job_filter())
        else:
            q = q.filter(not_(self._dax_or_dag_cond()))

        return q.count()

    def get_total_tasks_status(self):
        """
        select count(*) from task where wf_id in (
            1,2,3
        )
        """
        return self.session.query(Task).filter(Task.wf_id.in_(self._wfs)).count()

    def _base_task_status_query(self):
        """
        select count(*) from
            task as tk,
            job_instance as jb_inst,
            job as jb,
            invocation as invoc
        where invoc.wf_id in (
            1,2,3
        )
        and jb_inst.job_submit_seq = (
            select max(job_submit_seq) from job_instance where job_id = jb_inst.job_id group by job_id
        )
        and tk.wf_id in (
            1,2,3
        )
        and jb.job_id = jb_inst.job_id
        and jb_inst.job_instance_id = invoc.job_instance_id
        and tk.abs_task_id = invoc.abs_task_id
        and tk.wf_id = invoc.wf_id
        and invoc.exitcode = 0
        """
        sub_q = self._max_job_seq_subquery()

        q = self.session.query(Task.task_id)
        q = q.filter(Invocation.wf_id.in_(self._wfs))
        q = q.filter(JobInstance.job_submit_seq == sub_q.as_scalar())
        q = q.filter(Task.wf_id.in_(self._wfs))
        q = q.filter(Job.job_id == JobInstance.job_id)
        q = q.filter(JobInstance.job_instance_id == Invocation.job_instance_id)
        q = q.filter(Task.abs_task_id == Invocation.abs_task_id)
        q = q.filter(Task.wf_id == Invocation.wf_id)
        return q

    def get_total_succeeded_tasks_status(self):
        q = self._base_task_status_query()
        q = q.filter(Invocation.exitcode == 0)
        return q.count()

    def get_total_failed_tasks_status(self):
        q = self._base_task_status_query()
        q = q.filter(Invocation.exitcode != 0)
        return q.count()

    #
    # Statistics of actually run wf components.
    #

    def get_total_jobs_statistics(self):
        """
        select count(*) as total_jobs
        from
            job_instance as jb_inst ,
            job as jb
        where
            jb_inst.job_id = jb.job_id
            and jb.wf_id in (
                1,2,3
            )
            and (
                (not (jb.type_desc ='dax' or jb.type_desc ='dag'))
                or
                ((jb.type_desc ='dax' or jb.type_desc ='dag') and jb_inst.subwf_id is NULL)
            )
        """
        q = self.session.query(Job, JobInstance)
        q = q.filter(Job.job_id == JobInstance.job_id)
        q = q.filter(Job.wf_id.in_(self._wfs))
        # jobtype filtering
        if not self._expand and self._get_job_filter() is not None:
            q = q.filter(self._get_job_filter())
        else:
            d_or_d = self._dax_or_dag_cond()
            q = q.filter(or_(not_(d_or_d), and_(d_or_d, JobInstance.subwf_id == None)))

        return q.count()

    def get_total_succeeded_jobs_statistics(self):
        """
        select DISTINCT count(jb.job_id)
        from
            job as jb,
            job_instance as jb_inst,
            jobstate as jb_state
        where jb.wf_id in (
            1,2,3
        )
        and jb.job_id = jb_inst.job_id
        and not (jb.type_desc ='dax' or jb.type_desc ='dag')
        and jb_inst.job_instance_id = jb_state.job_instance_id
        and jb_state.state ='JOB_SUCCESS'
        """
        q = self.session.query(Job.job_id).distinct()
        q = q.filter(Job.wf_id.in_(self._wfs))
        q = q.filter(Job.job_id == JobInstance.job_id)
        q = q.filter(JobInstance.job_instance_id == Jobstate.job_instance_id)
        q = q.filter(Jobstate.state == 'JOB_SUCCESS')
        # jobtype filtering
        if not self._expand and self._get_job_filter() is not None:
            q = q.filter(self._get_job_filter())
        else:
            q = q.filter(not_(self._dax_or_dag_cond()))

        return q.count()

    def get_total_failed_jobs_statistics(self):
        """
        select count(*) as job_failure
        from
            jobstate as jb_state ,
            job_instance jb_inst,
            job as jb
        where jb.wf_id in (
            1,2,3
        )
        and jb_state.job_instance_id = jb_inst.job_instance_id
        and jb.job_id = jb_inst.job_id
        and jb_state.state = 'JOB_FAILURE'
        and (
            (not (jb.type_desc ='dax' or jb.type_desc ='dag'))
            or
            ((jb.type_desc ='dax' or jb.type_desc ='dag') and jb_inst.subwf_id is NULL)
        )
        """
        q = self.session.query(Job, JobInstance, Jobstate)
        q = q.filter(Job.wf_id.in_(self._wfs))
        q = q.filter(Jobstate.job_instance_id == JobInstance.job_instance_id)
        q = q.filter(Job.job_id == JobInstance.job_id)
        q = q.filter(Jobstate.state == 'JOB_FAILURE')
        # jobtype filtering
        if not self._expand and self._get_job_filter() is not None:
            q = q.filter(self._get_job_filter())
        else:
            d_or_d = self._dax_or_dag_cond()
            q = q.filter(or_(not_(d_or_d), and_(d_or_d, JobInstance.subwf_id == None)))

        return q.count()

    def _base_task_statistics_query(self):
        q = self.session.query(Invocation)
        q = q.filter(Invocation.wf_id.in_(self._wfs))
        q = q.filter(Invocation.task_submit_seq >= 0)
        return q

    def get_total_tasks_statistics(self):
        """
        select count(*) from invocation as invoc where invoc.task_submit_seq >= 0 and invoc.wf_id in (
            1,2,3
        )
        """
        q = self._base_task_statistics_query()
        return q.count()

    def get_total_succeeded_tasks_statistics(self):
        """
        select count(*) as succeeded_tasks
        from
            invocation as invoc
        where
            invoc.wf_id in (
                1,2,3
            )
            and invoc.exitcode = 0
            and invoc.task_submit_seq >= 0
        """
        q = self._base_task_statistics_query()
        q = q.filter(Invocation.exitcode == 0)
        return q.count()

    def get_total_failed_tasks_statistics(self):
        """
        select count(*) as failed_tasks
        from
            invocation as invoc
        where
            invoc.wf_id in (
                1,2,3
            )
            and invoc.exitcode <> 0
            and invoc.task_submit_seq >= 0
        """
        q = self._base_task_statistics_query()
        q = q.filter(Invocation.exitcode != 0)
        return q.count()

    #
    # Run statistics
    #

    def get_workflow_wall_time(self):
        """
        select ws.wf_id,
            sum(case when (ws.state == 'WORKFLOW_TERMINATED') then ws.timestamp end)
            -
            sum(case when (ws.state == 'WORKFLOW_STARTED') then ws.timestamp end) as duration
        from workflowstate ws
        group by ws.wf_id
        """
        q = self.session.query(
            Workflowstate.wf_id,
            (
                func.sum(case([(Workflowstate.state == 'WORKFLOW_TERMINATED', Workflowstate.timestamp)]))
                -
                func.sum(case([(Workflowstate.state == 'WORKFLOW_STARTED', Workflowstate.timestamp)]))
            ).label('duration')
        ).filter(Workflowstate.wf_id.in_(self._wfs)).group_by(Workflowstate.wf_id)

        return q.all()

    def get_workflow_cum_job_wall_time(self):
        """
        select sum(remote_duration) from invocation as invoc
        where invoc.task_submit_seq >= 0 and invoc.wf_id in (
            1,2,3
        )
        """
        q = self.session.query(func.sum(Invocation.remote_duration))
        q = q.filter(Invocation.task_submit_seq >= 0)
        q = q.filter(Invocation.wf_id.in_(self._wfs))
        return q.first()[0]

    def get_submit_side_job_wall_time(self):
        """
        select sum(local_duration) from job_instance as jb_inst , job as jb where
        jb_inst.job_id = jb.job_id
        and jb.wf_id in (
            1,2,3
        )
        and (
            (not (jb.type_desc ='dax' or jb.type_desc ='dag'))
            or
            ((jb.type_desc ='dax' or jb.type_desc ='dag') and jb_inst.subwf_id is NULL)
        )
        """
        q = self.session.query(func.sum(JobInstance.local_duration).label('wall_time'))
        q = q.filter(JobInstance.job_id == Job.job_id)
        q = q.filter(Job.wf_id.in_(self._wfs))
        if self._expand:
            d_or_d = self._dax_or_dag_cond()
            q = q.filter(or_(not_(d_or_d), and_(d_or_d, JobInstance.subwf_id == None)))

        return q.first().wall_time

    #
    # Job Statistics
    #

    def get_job_name(self):
        """
        select jb.job_id, jb.exec_job_id as job_name
        from
            job as jb,
            job_instance as jb_inst
        where
            jb_inst.job_id = jb.job_id
            and jb.wf_id = 3
        group by jb.job_id
        """
        if self._expand:
            return []
        q = self.session.query(Job.job_id, Job.exec_job_id.label('job_name'))
        q = q.filter(Job.job_id == JobInstance.job_id)
        q = q.filter(Job.wf_id.in_(self._wfs)).group_by(Job.job_id)
        return q.all()

    def get_job_site(self):
        """
        select job_id , group_concat(site) as sites from
        (
            select DISTINCT jb.job_id as job_id , jb_inst.site as site
            from
                job as jb,
                job_instance as jb_inst
            where
                jb.wf_id = 3
                and jb_inst.job_id = jb.job_id
        ) group by job_id
        """
        if self._expand:
            return []
        q = self.session.query(Job.job_id, func.group_concat(JobInstance.site).label('sites'))
        q = q.filter(Job.wf_id.in_(self._wfs))
        q = q.filter(Job.job_id == JobInstance.job_id).group_by(Job.job_id)
        return q.all()

    def get_job_kickstart(self):
        """
        select jb.job_id,
            jb.exec_job_id as job_name ,
            sum(remote_duration) as kickstart
        from
            job as jb,
            invocation as invoc,
            job_instance as jb_inst
        where
            jb_inst.job_id = jb.job_id
            and jb.wf_id = 3
            and invoc.wf_id = 3
            and invoc.task_submit_seq >= 0
            and invoc.job_instance_id = jb_inst.job_instance_id
        group by jb.job_id
        """
        if self._expand:
            return []
        q = self.session.query(Job.job_id, Job.exec_job_id.label('job_name'),
                func.sum(Invocation.remote_duration).label('kickstart'))
        q = q.filter(Job.job_id == JobInstance.job_id)
        q = q.filter(Job.wf_id.in_(self._wfs))
        q = q.filter(Invocation.wf_id.in_(self._wfs))
        q = q.filter(Invocation.task_submit_seq >= 0)
        q = q.filter(Invocation.job_instance_id == JobInstance.job_instance_id)
        q = q.group_by(Job.job_id, Job.exec_job_id).order_by(Job.job_id)

        return q.all()

    def get_job_runtime(self):
        """
        select jb.job_id,
            sum(jb_inst.local_duration) as runtime
        from
            job as jb,
            job_instance as jb_inst
        where
            jb_inst.job_id = jb.job_id
            and jb.wf_id = 3
        group by jb.job_id
        """
        if self._expand:
            return []
        q = self.session.query(Job.job_id, func.sum(JobInstance.local_duration).label('runtime'))
        q = q.filter(Job.job_id == JobInstance.job_id)
        q = q.filter(Job.wf_id.in_(self._wfs))
        q = q.group_by(Job.job_id).order_by(Job.job_id)

        return q.all()

    def get_job_seqexec(self):
        """
        select jb.job_id,
            sum(jb_inst.cluster_duration) as seqexec
        from
            job as jb,
            job_instance as jb_inst
        where
            jb_inst.job_id = jb.job_id
            and jb.wf_id = 3
        group by jb.job_id
        """
        if self._expand:
            return []
        q = self.session.query(Job.job_id, func.sum(JobInstance.cluster_duration).label('seqexec'))
        q = q.filter(Job.job_id == JobInstance.job_id)
        q = q.filter(Job.wf_id.in_(self._wfs))
        q = q.group_by(Job.job_id).order_by(Job.job_id)

        return q.all()

    def get_job_seqexec_delay(self):
        """
        Seqexec Delay is Seqexec - Kickstart calculated above.

        select jb.job_id,
        (
            (
                select sum(jb_inst.cluster_duration)
                from
                    job_instance as jb_inst
                where
                    jb_inst.job_id = jb.job_id
                group by jb_inst.job_id
            )
            -
            (
                select sum(remote_duration)
                from
                    invocation as invoc,
                    job_instance as jb_inst
                where
                    jb_inst.job_id = jb.job_id
                    and invoc.wf_id = jb.wf_id
                    and invoc.task_submit_seq >= 0
                    and invoc.job_instance_id = jb_inst.job_instance_id
                group by jb_inst.job_id
            )
        ) as seqexec_delay
        from
            job as jb
        where jb.wf_id in (1,2,3)
        and jb.clustered <> 0
        """
        if self._expand:
            return []

        sq_1 = self.session.query(func.sum(JobInstance.cluster_duration))
        sq_1 = sq_1.filter(JobInstance.job_id == Job.job_id).correlate(Job)
        sq_1 = sq_1.group_by(JobInstance.job_id).subquery().as_scalar()

        sq_2 = self.session.query(func.sum(Invocation.remote_duration))
        sq_2 = sq_2.filter(JobInstance.job_id == Job.job_id).correlate(Job)
        sq_2 = sq_2.filter(Invocation.wf_id == Job.wf_id)
        sq_2 = sq_2.filter(Invocation.task_submit_seq >= 0)
        sq_2 = sq_2.filter(Invocation.job_instance_id == JobInstance.job_instance_id)
        sq_2 = sq_2.group_by(JobInstance.job_id).subquery().as_scalar()

        q = self.session.query(Job.job_id, cast(sq_1 - sq_2, Float))
        q = q.filter(Job.wf_id.in_(self._wfs))
        q = q.filter(Job.clustered != 0)
        q = q.order_by(Job.job_id)

        return q.all()

    def get_condor_q_time(self):
        """
        select job_id, job_name, sum(cQTime) as condorQTime from
        (
            select jb.exec_job_id as job_name, jb.job_id as job_id, jb_inst.job_instance_id ,
            (
                (select min(timestamp) from jobstate where job_instance_id = jb_inst.job_instance_id and (state = 'GRID_SUBMIT' or state = 'GLOBUS_SUBMIT' or state = 'EXECUTE'))
                -
                (select timestamp from jobstate where job_instance_id = jb_inst.job_instance_id and state = 'SUBMIT' )
            ) as cQTime
            from
                job_instance as jb_inst,
                job as jb
            where jb_inst.job_id = jb.job_id
            and jb.wf_id = 2
        ) group by job_id
        """
        if self._expand:
            return []
        sq_1 = self.session.query(func.min(Jobstate.timestamp).label('ts'))
        sq_1 = sq_1.filter(Jobstate.job_instance_id == JobInstance.job_instance_id)
        sq_1 = sq_1.filter(or_(Jobstate.state == 'GRID_SUBMIT',
                Jobstate.state == 'GLOBUS_SUBMIT', Jobstate.state == 'EXECUTE')).correlate(JobInstance)
        sq_1 = sq_1.subquery().as_scalar()

        sq_2 = self.session.query(Jobstate.timestamp.label('ts'))
        sq_2 = sq_2.filter(Jobstate.job_instance_id == JobInstance.job_instance_id)
        sq_2 = sq_2.filter(Jobstate.state == 'SUBMIT').correlate(JobInstance)
        sq_2 = sq_2.subquery().as_scalar()

        q = self.session.query(Job.exec_job_id.label('job_name'), Job.job_id, JobInstance.job_instance_id,
                cast(sq_1 - sq_2, Float).label('cQTime'))
        q = q.filter(JobInstance.job_id == Job.job_id)
        q = q.filter(Job.wf_id.in_(self._wfs))
        q = q.order_by(Job.job_id).subquery()

        main = self.session.query(q.c.job_id, q.c.job_name, func.sum(q.c.cQTime).label('condorQTime'))
        main = main.group_by(q.c.job_id)

        return main.all()

    def get_resource_delay(self):
        """
        select job_id, job_name, sum(rTime) as resourceTime from
        (
            select jb.exec_job_id as job_name, jb.job_id as job_id, jb_inst.job_instance_id ,
            (
                (select timestamp from jobstate where job_instance_id = jb_inst.job_instance_id and state = 'EXECUTE' )
                -
                (select timestamp from jobstate where job_instance_id = jb_inst.job_instance_id and (state = 'GRID_SUBMIT' or state = 'GLOBUS_SUBMIT'))
            ) as rTime
            from
                job_instance as jb_inst,
                job as jb
            where jb_inst.job_id = jb.job_id
            and jb.wf_id = 2
        ) group by job_id
        """
        if self._expand:
            return []
        sq_1 = self.session.query(Jobstate.timestamp.label('ts'))
        sq_1 = sq_1.filter(Jobstate.job_instance_id == JobInstance.job_instance_id)
        sq_1 = sq_1.filter(Jobstate.state == 'EXECUTE').correlate(JobInstance)
        sq_1 = sq_1.subquery().as_scalar()

        sq_2 = self.session.query(Jobstate.timestamp.label('ts'))
        sq_2 = sq_2.filter(Jobstate.job_instance_id == JobInstance.job_instance_id)
        sq_2 = sq_2.filter(or_(Jobstate.state == 'GRID_SUBMIT',
                Jobstate.state == 'GLOBUS_SUBMIT')).correlate(JobInstance)
        sq_2 = sq_2.subquery().as_scalar()

        q = self.session.query(Job.exec_job_id.label('job_name'), Job.job_id, JobInstance.job_instance_id,
                cast(sq_1 - sq_2, Float).label('rTime'))
        q = q.filter(JobInstance.job_id == Job.job_id)
        q = q.filter(Job.wf_id.in_(self._wfs))
        q = q.order_by(Job.job_id).subquery()

        main = self.session.query(q.c.job_id, q.c.job_name, func.sum(q.c.rTime).label('resourceTime'))
        main = main.group_by(q.c.job_id)

        return main.all()

    def get_dagman_delay(self):
        """
        select jb.exec_job_id as job_name,
        (
            (
                select min(timestamp)
                from jobstate
                where job_instance_id in
                    (select job_instance_id from job_instance as jb_inst where jb_inst.job_id = jb.job_id )
                and state = 'SUBMIT'
            )
            -
            (
                select max(timestamp)
                from jobstate
                where job_instance_id in
                    (select job_instance_id from job_instance as jb_inst where jb_inst.job_id in
                        (
                            select
                                parent.job_id as parent_job_id
                            from
                                job as parent,
                                job as child,
                                job_edge as edge
                            where
                                edge.wf_id = 2
                                and parent.wf_id = 2
                                and child.wf_id = 2
                                and child.job_id = jb.job_id
                                and edge.parent_exec_job_id like parent.exec_job_id
                                and edge.child_exec_job_id like child.exec_job_id
                        )
                    )
                and (state = 'POST_SCRIPT_TERMINATED' or state = 'JOB_TERMINATED')
            )
        ) as dagmanDelay
        from
            job as jb
        where
            jb.wf_id = 2
        """
        if self._expand:
            return []
        # topmost nested queries
        sq_1 = self.session.query(JobInstance.job_instance_id)
        sq_1 = sq_1.filter(JobInstance.job_id == Job.job_id).correlate(Job)
        sq_1 = sq_1.subquery()

        sq_2 = self.session.query(func.min(Jobstate.timestamp))
        sq_2 = sq_2.filter(Jobstate.job_instance_id.in_(sq_1))
        sq_2 = sq_2.filter(Jobstate.state == 'SUBMIT').subquery().as_scalar()

        # lower nested queries
        Parent = orm.aliased(Job)
        Child = orm.aliased(Job)

        sq_3 = self.session.query(Parent.job_id.label('parent_job_id'))
        sq_3 = sq_3.filter(JobEdge.wf_id.in_(self._wfs))
        sq_3 = sq_3.filter(Parent.wf_id.in_(self._wfs))
        sq_3 = sq_3.filter(Child.wf_id.in_(self._wfs))
        sq_3 = sq_3.filter(Child.job_id == Job.job_id).correlate(Job)
        sq_3 = sq_3.filter(JobEdge.parent_exec_job_id.like(Parent.exec_job_id))
        sq_3 = sq_3.filter(JobEdge.child_exec_job_id.like(Child.exec_job_id))
        sq_3 = sq_3.subquery()

        sq_4 = self.session.query(JobInstance.job_instance_id)
        sq_4 = sq_4.filter(JobInstance.job_id.in_(sq_3)).subquery()

        sq_5 = self.session.query(func.max(Jobstate.timestamp))
        sq_5 = sq_5.filter(Jobstate.job_instance_id.in_(sq_4))
        sq_5 = sq_5.filter(or_(Jobstate.state == 'POST_SCRIPT_TERMINATED', Jobstate.state == 'JOB_TERMINATED'))
        sq_5 = sq_5.subquery().as_scalar()

        q = self.session.query(Job.job_id, Job.exec_job_id.label('job_name'),
                cast(sq_2 - sq_5, Float).label('dagmanDelay'))

        q = q.filter(Job.wf_id.in_(self._wfs))

        return q.all()

    def get_post_time(self):
        """
        select job_id, job_name, sum(pTime) as postTime from
        (
            select jb.exec_job_id as job_name, jb.job_id as job_id, jb_inst.job_instance_id ,
            (
                (select timestamp from jobstate where job_instance_id = jb_inst.job_instance_id and state = 'POST_SCRIPT_TERMINATED')
                -
                (select max(timestamp) from jobstate where job_instance_id = jb_inst.job_instance_id and (state = 'POST_SCRIPT_STARTED' or state = 'JOB_TERMINATED'))
            ) as pTime
            from
                job_instance as jb_inst,
                job as jb
            where jb_inst.job_id = jb.job_id
            and jb.wf_id = 2
        ) group by job_id
        """
        if self._expand:
            return []
        sq_1 = self.session.query(Jobstate.timestamp.label('ts'))
        sq_1 = sq_1.filter(Jobstate.job_instance_id == JobInstance.job_instance_id)
        sq_1 = sq_1.filter(Jobstate.state == 'POST_SCRIPT_TERMINATED').correlate(JobInstance)
        sq_1 = sq_1.subquery().as_scalar()

        sq_2 = self.session.query(func.max(Jobstate.timestamp).label('ts'))
        sq_2 = sq_2.filter(Jobstate.job_instance_id == JobInstance.job_instance_id)
        sq_2 = sq_2.filter(or_(Jobstate.state == 'POST_SCRIPT_STARTED',
                Jobstate.state == 'JOB_TERMINATED')).correlate(JobInstance)
        sq_2 = sq_2.subquery().as_scalar()

        q = self.session.query(Job.exec_job_id.label('job_name'), Job.job_id, JobInstance.job_instance_id,
                cast(sq_1 - sq_2, Float).label('pTime'))
        q = q.filter(JobInstance.job_id == Job.job_id)
        q = q.filter(Job.wf_id.in_(self._wfs))
        q = q.order_by(Job.job_id).subquery()

        main = self.session.query(q.c.job_id, q.c.job_name, func.sum(q.c.pTime).label('postTime'))
        main = main.group_by(q.c.job_id)

        return main.all()

    def get_transformation_statistics(self):
        """
        select transformation, count(*),
            min(remote_duration) , max(remote_duration) ,
            avg(remote_duration) , sum(remote_duration)
        from invocation as invoc where invoc.wf_id = 3 group by transformation
        """
        q = self.session.query(Invocation.transformation,
                func.count(Invocation.invocation_id).label('count'),
                func.min(Invocation.remote_duration).label('min'),
                func.max(Invocation.remote_duration).label('max'),
                func.avg(Invocation.remote_duration).label('avg'),
                func.sum(Invocation.remote_duration).label('sum'))
        q = q.filter(Invocation.wf_id.in_(self._wfs))
        q = q.group_by(Invocation.transformation)

        return q.all()

if __name__ == '__main__':
    pass
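
# Example driver (a sketch, not part of the original module): connect to a
# stampede SQLite database and print a per-transformation summary.  The
# connection string and wf_uuid below are placeholders.
#
#   s = StampedeStatistics(connString='sqlite:///montage.db')
#   if s.initialize('unique_wf_uuid'):
#       for row in s.get_transformation_statistics():
#           print row.transformation, row.count, row.min, row.max, row.avg, row.sum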