Package netlogger ::
Package analysis ::
Package workflow ::
Module stampede_statistics
|
|
1 """
2 Library to generate statistics from the new Stampede 3.1 backend.
3
4 Usage::
5 stats = StampedeStatistics(connString='sqlite:///montage.db')
6 stats.initialize('unique_wf_uuid')
7 stats.set_job_filter('dax')
8 print stats.get_total_jobs_status()
9 print stats.get_total_jobs_statistics()
10 stats.set_job_filter('dag')
11 print stats.get_total_jobs_status()
12 print stats.get_total_jobs_statistics()
13 etc.
14
15 Constructor and initialize methods:
16
17 The constructor takes a required sqlalchemy connection string
18 as the first argument. The stats class will default to returning
19 data in the "expanded workflow" mode. To change this behavior
20 and only analyze a single workflow, set the optional arg:
21
22 expand_workflow = False
23
24 along with the connection string argument.
25
26 The initialize method is called with a single argument - the wf_uuid
27 of the desired "root workflow" whether returning data in expanded
28 mode or not. The method will return True on success, or False if a
29 query exception is raised, so the programmer can test for success
30 before calling the subsequent query methods. This method is intended
31 to be called once per object.
32
33 Job filtering:
34
35 Jobs can be filtered using any of the strings in the jobtype ENUM,
36 with the addition of the values 'all' and 'nonsub' which will
37 return all jobs and non-subworkflow jobs respectively. If the
38 filter is not explicitly set, it will default to the 'all' mode.
39
40 The desired filter can be set with the set_job_filter() method. After
41 setting this method, all subsequent calls to the query methods will
42 return results according to the filter. This can be set and reset
43 as many times as the user desires. There is an example of re/setting
44 the job filter in the usage section above. The query methods
45 will return different values after the filter is re/set.
46
47 Return values from methods:
48
49 The return value types will vary from method to method. Most of
50 the methods will return a single integer or floating point number.
51
52 Methods which return rows from the DB (rather than just a number)
53 will return a list which can be interacted with in one of two
54 ways - either by array index (list of tuples) or by a named attr
55 (list of objects). The two following methods of interacting with
56 the same query results will both produce the same output:
57
58 Example::
59 for row in s.get_job_kickstart():
60 print row[0], row[1], row[2]
61 print row.job_id, row.job_name, row.kickstart
62
63 Either syntax will work. When using the named attribute method, the
64 attributes are the names of the columns/aliases in the SELECT
65 stanza of the query. If the row returned by the method is printed,
66 it will display as a tuple of results per row.
67
68 Methods::
69 get_sub_workflow_ids
70 get_descendant_workflow_ids
71 get_total_jobs_status
72 get_total_succeeded_jobs_status
73 get_total_failed_jobs_status
74 get_total_unknown_jobs_status
75 get_total_tasks_status
76 get_total_succeeded_tasks_status
77 get_total_failed_tasks_status
78 get_total_jobs_statistics
79 get_total_succeeded_jobs_statistics
80 get_total_failed_jobs_statistics
81 get_total_tasks_statistics
82 get_total_succeeded_tasks_statistics
83 get_total_failed_tasks_statistics
84 get_workflow_wall_time
85 get_workflow_cum_job_wall_time
86 get_submit_side_job_wall_time
87 get_job_name
88 get_job_site
89 get_job_kickstart
90 get_job_runtime
91 get_job_seqexec
92 get_job_seqexec_delay
93 get_condor_q_time
94 get_resource_delay
95 get_dagman_delay
96 get_post_time
97 get_transformation_statistics
98
99 Methods listed in order of query list on wiki.
100
101 https://confluence.pegasus.isi.edu/display/pegasus/Pegasus+statistics+python+version
102 """
103 __rcsid__ = "$Id: stampede_statistics.py 28074 2011-06-09 15:50:35Z mgoode $"
104 __author__ = "Monte Goode"
105
106 import decimal
107
108 from netlogger.analysis.modules._base import SQLAlchemyInit
109 from netlogger.analysis.schema.stampede_schema import *
110 from netlogger.nllog import DoesLogging, get_logger
111
def __init__(self, connString=None, expand_workflow=True):
    """Initialize logging and the database connection.

    :param connString: SQLAlchemy connection string - required.
    :param expand_workflow: when True (the default), queries span the
        root workflow and all of its sub-workflows; when False, only
        the single workflow passed to initialize() is analyzed.
    :raises ValueError: if connString is not supplied.
    """
    if connString is None:
        raise ValueError("connString is required")
    DoesLogging.__init__(self)
    SQLAlchemyInit.__init__(self, connString, initializeToPegasusDB)

    # Expanded mode: include descendant workflows in every query.
    self._expand = expand_workflow

    # Identify the root workflow row; populated by initialize().
    self._root_wf_id = None
    self._root_wf_uuid = None
    # Current job filter mode; set via set_job_filter().
    self._filter_mode = None

    # wf_ids of all workflows the query methods will cover.
    self._wfs = []
127
129 self.log.debug('initialize')
130 self._root_wf_uuid = root_wf_uuid
131 q = self.session.query(Workflow.wf_id).filter(Workflow.wf_uuid == self._root_wf_uuid)
132
133 try:
134 self._root_wf_id = q.one().wf_id
135 except orm.exc.MultipleResultsFound, e:
136 self.log.error('initialize',
137 msg='Multiple results found for wf_uuid: %s' % root_wf_uuid)
138 return False
139 except orm.exc.NoResultFound, e:
140 self.log.error('initialize',
141 msg='No results found for wf_uuid: %s' % root_wf_uuid)
142 return False
143
144 if self._expand:
145 q = self.session.query(Workflow.wf_id).filter(Workflow.root_wf_id == self._root_wf_id)
146 for row in q.all():
147 self._wfs.append(row.wf_id)
148 else:
149 self._wfs.append(self._root_wf_id)
150
151 self.set_job_filter()
152 return True
153
155 modes = ['all', 'nonsub', 'dax', 'dag', 'compute', 'stage-in-tx',
156 'stage-out-tx', 'registration', 'inter-site-tx', 'create-dir',
157 'staged-compute', 'cleanup', 'chmod']
158 try:
159 modes.index(filter)
160 self._filter_mode = filter
161 self.log.debug('set_job_filter', msg='Setting filter to: %s' % filter)
162 except:
163 self._filter_mode = 'all'
164 self.log.error('set_job_filter', msg='Unknown job filter %s - setting to any' % filter)
165
166
167
168
169
171 """
172 Returns info on child workflows only.
173 """
174 q = self.session.query(Workflow.wf_id, Workflow.wf_uuid)
175 q = q.filter(Workflow.parent_wf_id == self._root_wf_id)
176 return q.all()
177
179 q = self.session.query(Workflow.wf_id, Workflow.wf_uuid)
180 q = q.filter(Workflow.root_wf_id == self._root_wf_id)
181 q = q.filter(Workflow.wf_id != self._root_wf_id)
182 return q.all()
183
184
185
186
187
189 filters = {
190 'all': None,
191 'nonsub': not_(self._dax_or_dag_cond()),
192 'dax': Job.type_desc == 'dax',
193 'dag': Job.type_desc == 'dag',
194 'compute': Job.type_desc == 'compute',
195 'stage-in-tx': Job.type_desc == 'stage-in-tx',
196 'stage-out-tx': Job.type_desc == 'stage-out-tx',
197 'registration': Job.type_desc == 'registration',
198 'inter-site-tx': Job.type_desc == 'inter-site-tx',
199 'create-dir': Job.type_desc == 'create-dir',
200 'staged-compute': Job.type_desc == 'staged-compute',
201 'cleanup': Job.type_desc == 'cleanup',
202 'chmod': Job.type_desc == 'chmod',
203 }
204 return filters[self._filter_mode]
205
207 """
208 Creates the following subquery that is used in
209 several queries:
210 and jb_inst.job_submit_seq = (
211 select max(job_submit_seq) from job_instance where job_id = jb_inst.job_id group by job_id
212 )
213 """
214 JobInstanceSub = orm.aliased(JobInstance)
215 sub_q = self.session.query(func.max(JobInstanceSub.job_submit_seq).label('max_id'))
216 sub_q = sub_q.filter(JobInstanceSub.job_id == JobInstance.job_id).correlate(JobInstance)
217 sub_q = sub_q.group_by(JobInstanceSub.job_id).subquery()
218 return sub_q
219
221 return or_(Job.type_desc == 'dax', Job.type_desc == 'dag')
222
224 """
225 select
226 (
227 select count(*) from job as jb where jb.wf_id in (1,2,3) and not
228 (jb.type_desc = 'dax'or jb.type_desc = 'dag')
229 )
230 +
231 (
232 select count(*) from
233 (
234 select jb_inst.job_id from job_instance as jb_inst , job as jb
235 where jb.wf_id in (1,2,3)
236 and jb_inst.job_id = jb.job_id
237 and (jb.type_desc ='dax' or jb.type_desc ='dag' )
238 and jb_inst.subwf_id is null
239 and jb_inst.job_submit_seq = (
240 select max(job_submit_seq) from job_instance where job_id = jb_inst.job_id group by job_id
241 )
242 )
243 ) as total_jobs
244
245 """
246 if not self._expand:
247 q = self.session.query(Job)
248 q = q.filter(Job.wf_id.in_(self._wfs))
249 if self._get_job_filter() is not None:
250 q = q.filter(self._get_job_filter())
251 return q.count()
252 else:
253 q = self.session.query(Job.job_id)
254 q = q.filter(Job.wf_id.in_(self._wfs))
255 q = q.filter(not_(self._dax_or_dag_cond()))
256 job_count = q.count()
257
258 sub_q = self._max_job_seq_subquery()
259
260 q = self.session.query(JobInstance.job_id)
261 q = q.filter(Job.wf_id.in_(self._wfs))
262 q = q.filter(JobInstance.job_id == Job.job_id)
263 q = q.filter(JobInstance.subwf_id == None)
264 q = q.filter(JobInstance.job_submit_seq == sub_q.as_scalar())
265 q = q.filter(self._dax_or_dag_cond())
266 job_instance_count = q.count()
267
268 return job_count + job_instance_count
269
271 """
272 select DISTINCT count(jb.job_id)
273 from
274 job as jb,
275 job_instance as jb_inst,
276 jobstate as jb_state
277 where jb.wf_id in(
278 1,2,3
279 )
280 and jb.job_id = jb_inst.job_id
281 and not (jb.type_desc ='dax' or jb.type_desc ='dag')
282 and jb_inst.job_instance_id = jb_state.job_instance_id
283 and jb_state.state ='JOB_SUCCESS'
284 """
285 q = self.session.query(Job.job_id).distinct()
286 q = q.filter(Job.wf_id.in_(self._wfs))
287 q = q.filter(Job.job_id == JobInstance.job_id)
288 q = q.filter(JobInstance.job_instance_id == Jobstate.job_instance_id)
289 q = q.filter(Jobstate.state == 'JOB_SUCCESS')
290
291 if not self._expand and self._get_job_filter() is not None:
292 q = q.filter(self._get_job_filter())
293 else:
294 q = q.filter(not_(self._dax_or_dag_cond()))
295
296 return q.count()
297
299 """
300 select count(*) from
301 (
302 select jb_inst.job_instance_id
303 from job as jb, job_instance as jb_inst , jobstate as jb_state
304 where jb_inst.job_submit_seq = (
305 select max(job_submit_seq) from job_instance where job_id = jb_inst.job_id group by job_id
306 )
307 and jb.wf_id in (1,2,3)
308 and jb.job_id = jb_inst.job_id
309 and jb_inst.job_instance_id = jb_state.job_instance_id
310 and (
311 (not (jb.type_desc ='dax' or jb.type_desc ='dag'))
312 or
313 ((jb.type_desc ='dax' or jb.type_desc ='dag') and jb_inst.subwf_id is NULL)
314 )
315 and jb_state.state in ('JOB_FAILURE')
316 )
317 """
318 sub_q = self._max_job_seq_subquery()
319
320 q = self.session.query(JobInstance.job_instance_id)
321 q = q.filter(JobInstance.job_submit_seq == sub_q.as_scalar())
322 q = q.filter(Job.wf_id.in_(self._wfs))
323 q = q.filter(Job.job_id == JobInstance.job_id)
324 q = q.filter(JobInstance.job_instance_id == Jobstate.job_instance_id)
325 q = q.filter(Jobstate.state.in_(['JOB_FAILURE']))
326
327 if not self._expand and self._get_job_filter() is not None:
328 q = q.filter(self._get_job_filter())
329 else:
330 d_or_d = self._dax_or_dag_cond()
331 q = q.filter(or_(not_(d_or_d), and_(d_or_d, JobInstance.subwf_id == None)))
332
333 return q.count()
334
336 """
337 The states arg is a list of strings.
338 Returns an appropriate subquery.
339 """
340 q = self.session.query(Jobstate.job_instance_id)
341 q = q.filter(Jobstate.job_instance_id == JobInstance.job_instance_id).correlate(JobInstance)
342 q = q.filter(Jobstate.state.in_(states)).subquery()
343 return q
344
346 """
347 select count(*) from job_instance jb_inst , job as jb
348 where jb_inst.job_submit_seq = (
349 select max(job_submit_seq) from job_instance where job_id = jb_inst.job_id group by job_id
350 )
351 and jb_inst.job_instance_id in
352 (
353 select js.job_instance_id from jobstate as js
354 where js.job_instance_id = jb_inst.job_instance_id
355 and js.state = 'SUBMIT'
356 )
357 and jb_inst.job_instance_id not in
358 (
359 select js.job_instance_id from jobstate as js
360 where js.job_instance_id = jb_inst.job_instance_id
361 and js.state in ( 'JOB_SUCCESS', 'JOB_FAILURE')
362 )
363 and jb_inst.job_id = jb.job_id
364 and jb.wf_id in (
365 1,2,3
366 )
367 and not (jb.type_desc ='dax' or jb.type_desc ='dag' )
368 """
369 subq_1 = self._query_jobstate_for_instance(['SUBMIT'])
370 subq_2 = self._query_jobstate_for_instance(['JOB_SUCCESS', 'JOB_FAILURE'])
371
372 maxsub_q = self._max_job_seq_subquery()
373
374 q = self.session.query(Job.job_id)
375 q = q.filter(JobInstance.job_submit_seq == maxsub_q.as_scalar())
376 q = q.filter(JobInstance.job_instance_id.in_(subq_1))
377 q = q.filter(not_(JobInstance.job_instance_id.in_(subq_2)))
378 q = q.filter(Job.job_id == JobInstance.job_id)
379 q = q.filter(Job.wf_id.in_(self._wfs))
380
381 if not self._expand and self._get_job_filter() is not None:
382 q = q.filter(self._get_job_filter())
383 else:
384 q = q.filter(not_(self._dax_or_dag_cond()))
385
386 return q.count()
387
389 """
390 select count(*) from task where wf_id in (
391 1,2,3
392 )
393 """
394 return self.session.query(Task).filter(Task.wf_id.in_(self._wfs)).count()
395
397 """
398 select count(*) from
399 task as tk,
400 job_instance as jb_inst,
401 job as jb,
402 invocation as invoc
403 where invoc.wf_id in (
404 1,2,3
405 )
406 and jb_inst.job_submit_seq = (
407 select max(job_submit_seq) from job_instance where job_id = jb_inst.job_id group by job_id
408 )
409 and tk.wf_id in (
410 1,2,3
411 )
412 and jb.job_id = jb_inst.job_id
413 and jb_inst.job_instance_id = invoc.job_instance_id
414 and tk.abs_task_id = invoc.abs_task_id
415 and tk.wf_id = invoc.wf_id
416 and invoc.exitcode = 0
417 """
418 sub_q = self._max_job_seq_subquery()
419
420 q = self.session.query(Task.task_id)
421 q = q.filter(Invocation.wf_id.in_(self._wfs))
422 q = q.filter(JobInstance.job_submit_seq == sub_q.as_scalar())
423 q = q.filter(Task.wf_id.in_(self._wfs))
424 q = q.filter(Job.job_id == JobInstance.job_id)
425 q = q.filter(JobInstance.job_instance_id == Invocation.job_instance_id)
426 q = q.filter(Task.abs_task_id == Invocation.abs_task_id)
427 q = q.filter(Task.wf_id == Invocation.wf_id)
428 return q
429
434
439
440
441
442
443
445 """
446 select count(*) as total_jobs
447 from
448 job_instance as jb_inst ,
449 job as jb
450 where
451 jb_inst.job_id = jb.job_id
452 and jb.wf_id in (
453 1,2,3
454 )
455 and (
456 (not (jb.type_desc ='dax' or jb.type_desc ='dag'))
457 or
458 ((jb.type_desc ='dax' or jb.type_desc ='dag') and jb_inst.subwf_id is NULL)
459 )
460 """
461 q = self.session.query(Job, JobInstance)
462 q = q.filter(Job.job_id == JobInstance.job_id)
463 q = q.filter(Job.wf_id.in_(self._wfs))
464
465 if not self._expand and self._get_job_filter() is not None:
466 q = q.filter(self._get_job_filter())
467 else:
468 d_or_d = self._dax_or_dag_cond()
469 q = q.filter(or_(not_(d_or_d), and_(d_or_d, JobInstance.subwf_id == None)))
470
471 return q.count()
472
474 """
475 select DISTINCT count(jb.job_id)
476 from
477 job as jb,
478 job_instance as jb_inst,
479 jobstate as jb_state
480 where jb.wf_id in(
481 1,2,3
482 )
483 and jb.job_id = jb_inst.job_id
484 and not (jb.type_desc ='dax' or jb.type_desc ='dag')
485 and jb_inst.job_instance_id = jb_state.job_instance_id
486 and jb_state.state ='JOB_SUCCESS'
487 """
488 q = self.session.query(Job.job_id).distinct()
489 q = q.filter(Job.wf_id.in_(self._wfs))
490 q = q.filter(Job.job_id == JobInstance.job_id)
491 q = q.filter(JobInstance.job_instance_id == Jobstate.job_instance_id)
492 q = q.filter(Jobstate.state == 'JOB_SUCCESS')
493
494 if not self._expand and self._get_job_filter() is not None:
495 q = q.filter(self._get_job_filter())
496 else:
497 q = q.filter(not_(self._dax_or_dag_cond()))
498
499 return q.count()
500
502 """
503 select count(*) as job_failure
504 from
505 jobstate as jb_state ,
506 job_instance jb_inst,
507 job as jb
508 where jb.wf_id in(
509 1,2,3
510 )
511 and jb_state.job_instance_id = jb_inst.job_instance_id
512 and jb.job_id = jb_inst.job_id
513 and jb_state.state = 'JOB_FAILURE'
514 and (
515 (not (jb.type_desc ='dax' or jb.type_desc ='dag'))
516 or
517 ((jb.type_desc ='dax' or jb.type_desc ='dag') and jb_inst.subwf_id is NULL)
518 )
519 """
520 q = self.session.query(Job, JobInstance, Jobstate)
521 q = q.filter(Job.wf_id.in_(self._wfs))
522 q = q.filter(Jobstate.job_instance_id == JobInstance.job_instance_id)
523 q = q.filter(Job.job_id == JobInstance.job_id)
524 q = q.filter(Jobstate.state == 'JOB_FAILURE')
525
526 if not self._expand and self._get_job_filter() is not None:
527 q = q.filter(self._get_job_filter())
528 else:
529 d_or_d = self._dax_or_dag_cond()
530 q = q.filter(or_(not_(d_or_d), and_(d_or_d, JobInstance.subwf_id == None)))
531
532 return q.count()
533
534
536 q = self.session.query(Invocation)
537 q = q.filter(Invocation.wf_id.in_(self._wfs))
538 q = q.filter(Invocation.task_submit_seq >= 0)
539 return q
540
542 """
543 select count(*) from invocation as invoc where invoc.task_submit_seq >=0 and invoc.wf_id in (
544 1,2,3
545 )
546 """
547 q = self._base_task_statistics_query()
548 return q.count()
549
551 """
552 select count(*) as succeeded_tasks
553 from
554 invocation as invoc
555 where
556 invoc.wf_id in (
557 1,2,3
558 )
559 and invoc.exitcode = 0
560 and invoc.task_submit_seq >=0
561 """
562 q = self._base_task_statistics_query()
563 q = q.filter(Invocation.exitcode == 0)
564 return q.count()
565
567 """
568 select count(*) as failed_tasks
569 from
570 invocation as invoc
571 where
572 invoc.wf_id in (
573 1,2,3
574 )
575 and invoc.exitcode <> 0
576 and invoc.task_submit_seq >=0
577 """
578 q = self._base_task_statistics_query()
579 q = q.filter(Invocation.exitcode != 0)
580 return q.count()
581
582
583
584
585
587 """
588 select ws.wf_id,
589 sum(case when (ws.state == 'WORKFLOW_TERMINATED') then ws.timestamp end)
590 -
591 sum (case when (ws.state == 'WORKFLOW_STARTED') then ws.timestamp end) as duration
592 from workflowstate ws
593 group by ws.wf_id
594 """
595 q = self.session.query(
596 Workflowstate.wf_id,
597 (
598 func.sum(case([(Workflowstate.state == 'WORKFLOW_TERMINATED', Workflowstate.timestamp)]))
599 -
600 func.sum(case([(Workflowstate.state == 'WORKFLOW_STARTED', Workflowstate.timestamp)]))
601 ).label('duration')
602 ).filter(Workflowstate.wf_id.in_(self._wfs)).group_by(Workflowstate.wf_id)
603
604 return q.all()
605
607 """
608 select sum(remote_duration) from invocation as invoc
609 where invoc.task_submit_seq >=0 and invoc.wf_id in(
610 1,2,3
611 )
612 """
613 q = self.session.query(func.sum(Invocation.remote_duration))
614 q = q.filter(Invocation.task_submit_seq >= 0)
615 q = q.filter(Invocation.wf_id.in_(self._wfs))
616 return q.first()[0]
617
619 """
620 select sum(local_duration) from job_instance as jb_inst , job as jb where
621 jb_inst.job_id = jb.job_id
622 and jb.wf_id in (
623 1,2,3
624 )
625 and (
626 (not (jb.type_desc ='dax' or jb.type_desc ='dag'))
627 or
628 ((jb.type_desc ='dax' or jb.type_desc ='dag') and jb_inst.subwf_id is NULL)
629 )
630
631 """
632 q = self.session.query(func.sum(JobInstance.local_duration).label('wall_time'))
633 q = q.filter(JobInstance.job_id == Job.job_id)
634 q = q.filter(Job.wf_id.in_(self._wfs))
635 if self._expand:
636 d_or_d = self._dax_or_dag_cond()
637 q = q.filter(or_(not_(d_or_d), and_(d_or_d, JobInstance.subwf_id == None)))
638
639 return q.first().wall_time
640
641
642
643
644
646 """
647 select jb.job_id, jb.exec_job_id as job_name
648 from
649 job as jb,
650 job_instance as jb_inst
651 where
652 jb_inst.job_id = jb.job_id
653 and jb.wf_id = 3
654 group by jb.job_id
655 """
656 if self._expand:
657 return []
658 q = self.session.query(Job.job_id, Job.exec_job_id.label('job_name'))
659 q = q.filter(Job.job_id == JobInstance.job_id)
660 q = q.filter(Job.wf_id.in_(self._wfs)).group_by(Job.job_id)
661 return q.all()
662
664 """
665 select job_id , group_concat(site) as sites from
666 (
667 select DISTINCT jb.job_id as job_id , jb_inst.site as site
668 from
669 job as jb,
670 job_instance as jb_inst
671 where
672 jb.wf_id = 3
673 and jb_inst.job_id = jb.job_id
674 ) group by job_id
675 """
676 if self._expand:
677 return []
678 q = self.session.query(Job.job_id, func.group_concat(JobInstance.site).label('sites'))
679 q = q.filter(Job.wf_id.in_(self._wfs))
680 q = q.filter(Job.job_id == JobInstance.job_id).group_by(Job.job_id)
681 return q.all()
682
684 """
685 select jb.job_id,
686 jb.exec_job_id as job_name ,
687 sum(remote_duration) as kickstart
688 from
689 job as jb,
690 invocation as invoc,
691 job_instance as jb_inst
692 where
693 jb_inst.job_id = jb.job_id
694 and jb.wf_id = 3
695 and invoc.wf_id =3
696 and invoc.task_submit_seq >=0
697 and invoc.job_instance_id = jb_inst.job_instance_id
698 group by jb.job_id
699 """
700 if self._expand:
701 return []
702 q = self.session.query(Job.job_id, Job.exec_job_id.label('job_name'),
703 func.sum(Invocation.remote_duration).label('kickstart'))
704 q = q.filter(Job.job_id == JobInstance.job_id)
705 q = q.filter(Job.wf_id.in_(self._wfs))
706 q = q.filter(Invocation.wf_id.in_(self._wfs))
707 q = q.filter(Invocation.task_submit_seq >= 0)
708 q = q.filter(Invocation.job_instance_id == JobInstance.job_instance_id)
709 q = q.group_by(Job.job_id, Job.exec_job_id).order_by(Job.job_id)
710
711 return q.all()
712
714 """
715 select jb.job_id,
716 sum(jb_inst.local_duration) as runtime
717 from
718 job as jb,
719 job_instance as jb_inst
720 where
721 jb_inst.job_id = jb.job_id
722 and jb.wf_id = 3
723 group by jb.job_id
724 """
725 if self._expand:
726 return []
727 q = self.session.query(Job.job_id, func.sum(JobInstance.local_duration).label('runtime'))
728 q = q.filter(Job.job_id == JobInstance.job_id)
729 q = q.filter(Job.wf_id.in_(self._wfs))
730 q = q.group_by(Job.job_id).order_by(Job.job_id)
731
732 return q.all()
733
735 """
736 select jb.job_id,
737 sum(jb_inst.cluster_duration) as seqexec
738 from
739 job as jb,
740 job_instance as jb_inst
741 where
742 jb_inst.job_id = jb.job_id
743 and jb.wf_id = 3
744 group by jb.job_id
745 """
746 if self._expand:
747 return []
748 q = self.session.query(Job.job_id, func.sum(JobInstance.cluster_duration).label('seqexec'))
749 q = q.filter(Job.job_id == JobInstance.job_id)
750 q = q.filter(Job.wf_id.in_(self._wfs))
751 q = q.group_by(Job.job_id).order_by(Job.job_id)
752
753 return q.all()
754
756 """
757 Seqexec Delay is Seqexec - Kickstart calculated above.
758
759 select jb.job_id,
760 (
761 (
762 select sum(jb_inst.cluster_duration)
763 from
764 job_instance as jb_inst
765 where
766 jb_inst.job_id = jb.job_id
767 group by jb_inst.job_id
768 )
769 -
770 (
771 select sum(remote_duration)
772 from
773 invocation as invoc,
774 job_instance as jb_inst
775 where
776 jb_inst.job_id = jb.job_id
777 and invoc.wf_id =jb.wf_id
778 and invoc.task_submit_seq >=0
779 and invoc.job_instance_id = jb_inst.job_instance_id
780 group by jb_inst.job_id
781 )
782 ) as seqexec_delay
783 from
784 job as jb
785 where jb.wf_id in (1,2,3)
786 and jb.clustered <>0
787 """
788 if self._expand:
789 return []
790
791 sq_1 = self.session.query(func.sum(JobInstance.cluster_duration))
792 sq_1 = sq_1.filter(JobInstance.job_id == Job.job_id).correlate(Job)
793 sq_1 = sq_1.group_by(JobInstance.job_id).subquery().as_scalar()
794
795 sq_2 = self.session.query(func.sum(Invocation.remote_duration))
796 sq_2 = sq_2.filter(JobInstance.job_id == Job.job_id).correlate(Job)
797 sq_2 = sq_2.filter(Invocation.wf_id == Job.wf_id)
798 sq_2 = sq_2.filter(Invocation.task_submit_seq >= 0)
799 sq_2 = sq_2.filter(Invocation.job_instance_id == JobInstance.job_instance_id)
800 sq_2 = sq_2.group_by(JobInstance.job_id).subquery().as_scalar()
801
802 q = self.session.query(Job.job_id, cast(sq_1 - sq_2, Float))
803 q = q.filter(Job.wf_id.in_(self._wfs))
804 q = q.filter(Job.clustered != 0)
805 q = q.order_by(Job.job_id)
806
807 return q.all()
808
810 """
811 select job_id, job_name, sum(cQTime) as condorQTime from
812 (
813 select jb.exec_job_id as job_name, jb.job_id as job_id, jb_inst.job_instance_id ,
814 (
815 (select min(timestamp) from jobstate where job_instance_id = jb_inst.job_instance_id and (state = 'GRID_SUBMIT' or state = 'GLOBUS_SUBMIT' or state = 'EXECUTE'))
816 -
817 (select timestamp from jobstate where job_instance_id = jb_inst.job_instance_id and state = 'SUBMIT' )
818 ) as cQTime
819 from
820 job_instance as jb_inst,
821 job as jb
822 where jb_inst.job_id =jb.job_id
823 and jb.wf_id = 2
824 ) group by job_id
825 """
826 if self._expand:
827 return []
828 sq_1 = self.session.query(func.min(Jobstate.timestamp).label('ts'))
829 sq_1 = sq_1.filter(Jobstate.job_instance_id == JobInstance.job_instance_id)
830 sq_1 = sq_1.filter(or_(Jobstate.state == 'GRID_SUBMIT',
831 Jobstate.state == 'GLOBUS_SUBMIT', Jobstate.state == 'EXECUTE')).correlate(JobInstance)
832 sq_1 = sq_1.subquery().as_scalar()
833
834 sq_2 = self.session.query(Jobstate.timestamp.label('ts'))
835 sq_2 = sq_2.filter(Jobstate.job_instance_id == JobInstance.job_instance_id)
836 sq_2 = sq_2.filter(Jobstate.state == 'SUBMIT').correlate(JobInstance)
837 sq_2 = sq_2.subquery().as_scalar()
838
839 q = self.session.query(Job.exec_job_id.label('job_name'), Job.job_id, JobInstance.job_instance_id,
840 cast(sq_1 - sq_2, Float).label('cQTime'))
841 q = q.filter(JobInstance.job_id == Job.job_id)
842 q = q.filter(Job.wf_id.in_(self._wfs))
843 q = q.order_by(Job.job_id).subquery()
844
845 main = self.session.query(q.c.job_id, q.c.job_name, func.sum(q.c.cQTime).label('condorQTime'))
846 main = main.group_by(q.c.job_id)
847
848 return main.all()
849
851 """
852 select job_id, job_name, sum(rTime) as resourceTime from
853 (
854 select jb.exec_job_id as job_name, jb.job_id as job_id, jb_inst.job_instance_id ,
855 (
856 (select timestamp from jobstate where job_instance_id = jb_inst.job_instance_id and state = 'EXECUTE' )
857 -
858 (select timestamp from jobstate where job_instance_id = jb_inst.job_instance_id and (state = 'GRID_SUBMIT' or state ='GLOBUS_SUBMIT'))
859 ) as rTime
860 from
861 job_instance as jb_inst,
862 job as jb
863 where jb_inst.job_id =jb.job_id
864 and jb.wf_id = 2
865 ) group by job_id
866 """
867 if self._expand:
868 return []
869 sq_1 = self.session.query(Jobstate.timestamp.label('ts'))
870 sq_1 = sq_1.filter(Jobstate.job_instance_id == JobInstance.job_instance_id)
871 sq_1 = sq_1.filter(Jobstate.state == 'EXECUTE').correlate(JobInstance)
872 sq_1 = sq_1.subquery().as_scalar()
873
874 sq_2 = self.session.query(Jobstate.timestamp.label('ts'))
875 sq_2 = sq_2.filter(Jobstate.job_instance_id == JobInstance.job_instance_id)
876 sq_2 = sq_2.filter(or_(Jobstate.state == 'GRID_SUBMIT',
877 Jobstate.state == 'GLOBUS_SUBMIT')).correlate(JobInstance)
878 sq_2 = sq_2.subquery().as_scalar()
879
880 q = self.session.query(Job.exec_job_id.label('job_name'), Job.job_id, JobInstance.job_instance_id,
881 cast(sq_1 - sq_2, Float).label('rTime'))
882 q = q.filter(JobInstance.job_id == Job.job_id)
883 q = q.filter(Job.wf_id.in_(self._wfs))
884 q = q.order_by(Job.job_id).subquery()
885
886 main = self.session.query(q.c.job_id, q.c.job_name, func.sum(q.c.rTime).label('resourceTime'))
887 main = main.group_by(q.c.job_id)
888
889 return main.all()
890
892 """
893 select jb.exec_job_id as job_name,
894 (
895 (
896 select min(timestamp)
897 from jobstate
898 where job_instance_id in
899 (select job_instance_id from job_instance as jb_inst where jb_inst.job_id = jb.job_id )
900 and state ='SUBMIT'
901 )
902 -
903 (
904 select max(timestamp)
905 from jobstate
906 where job_instance_id in
907 (select job_instance_id from job_instance as jb_inst where jb_inst.job_id in
908 (
909 select
910 parent.job_id as parent_job_id
911 from
912 job as parent,
913 job as child,
914 job_edge as edge
915 where
916 edge.wf_id = 2
917 and parent.wf_id = 2
918 and child.wf_id = 2
919 and child.job_id = jb.job_id
920 and edge.parent_exec_job_id like parent.exec_job_id
921 and edge.child_exec_job_id like child.exec_job_id
922 )
923 )
924 and (state = 'POST_SCRIPT_TERMINATED' or state ='JOB_TERMINATED')
925 )
926 ) as dagmanDelay
927 from
928 job as jb where
929 jb.wf_id =2
930 """
931 if self._expand:
932 return []
933
934 sq_1 = self.session.query(JobInstance.job_instance_id)
935 sq_1 = sq_1.filter(JobInstance.job_id == Job.job_id).correlate(Job)
936 sq_1 = sq_1.subquery()
937
938 sq_2 = self.session.query(func.min(Jobstate.timestamp))
939 sq_2 = sq_2.filter(Jobstate.job_instance_id.in_(sq_1))
940 sq_2 = sq_2.filter(Jobstate.state == 'SUBMIT').subquery().as_scalar()
941
942
943 Parent = orm.aliased(Job)
944 Child = orm.aliased(Job)
945
946 sq_3 = self.session.query(Parent.job_id.label('parent_job_id'))
947 sq_3 = sq_3.filter(JobEdge.wf_id.in_(self._wfs))
948 sq_3 = sq_3.filter(Parent.wf_id.in_(self._wfs))
949 sq_3 = sq_3.filter(Child.wf_id.in_(self._wfs))
950 sq_3 = sq_3.filter(Child.job_id == Job.job_id).correlate(Job)
951 sq_3 = sq_3.filter(JobEdge.parent_exec_job_id.like(Parent.exec_job_id))
952 sq_3 = sq_3.filter(JobEdge.child_exec_job_id.like(Child.exec_job_id))
953 sq_3 = sq_3.subquery()
954
955 sq_4 = self.session.query(JobInstance.job_instance_id)
956 sq_4 = sq_4.filter(JobInstance.job_id.in_(sq_3)).subquery()
957
958 sq_5 = self.session.query(func.max(Jobstate.timestamp))
959 sq_5 = sq_5.filter(Jobstate.job_instance_id.in_(sq_4))
960 sq_5 = sq_5.filter(or_(Jobstate.state == 'POST_SCRIPT_TERMINATED', Jobstate.state == 'JOB_TERMINATED'))
961 sq_5 = sq_5.subquery().as_scalar()
962
963 q = self.session.query(Job.job_id, Job.exec_job_id.label('job_name'),
964 cast(sq_2 - sq_5, Float).label('dagmanDelay'))
965
966 q = q.filter(Job.wf_id.in_(self._wfs))
967
968 return q.all()
969
def get_post_time(self):
    """
    Return (job_id, job_name, postTime) rows for a single workflow:
    the summed postscript duration per job.  Returns [] when the
    object is in expanded-workflow mode.

    Equivalent SQL::

        select job_id, job_name, sum(pTime) as postTime from
        (
            select jb.exec_job_id as job_name, jb.job_id as job_id,
                   jb_inst.job_instance_id,
                   (
                     (select timestamp from jobstate
                      where job_instance_id = jb_inst.job_instance_id
                      and state = 'POST_SCRIPT_TERMINATED')
                     -
                     (select max(timestamp) from jobstate
                      where job_instance_id = jb_inst.job_instance_id
                      and (state ='POST_SCRIPT_STARTED' or state ='JOB_TERMINATED'))
                   ) as pTime
            from job_instance as jb_inst, job as jb
            where jb_inst.job_id = jb.job_id
            and jb.wf_id = 2
        ) group by job_id
    """
    if self._expand:
        # Per-job timing queries are only supported in single-workflow mode.
        return []

    # Correlated scalar subquery: timestamp when the postscript finished.
    term_ts = self.session.query(Jobstate.timestamp.label('ts'))
    term_ts = term_ts.filter(Jobstate.job_instance_id == JobInstance.job_instance_id)
    term_ts = term_ts.filter(Jobstate.state == 'POST_SCRIPT_TERMINATED').correlate(JobInstance)
    term_ts = term_ts.subquery().as_scalar()

    # Correlated scalar subquery: latest point the postscript could have started.
    start_ts = self.session.query(func.max(Jobstate.timestamp).label('ts'))
    start_ts = start_ts.filter(Jobstate.job_instance_id == JobInstance.job_instance_id)
    start_ts = start_ts.filter(or_(Jobstate.state == 'POST_SCRIPT_STARTED',
                                   Jobstate.state == 'JOB_TERMINATED')).correlate(JobInstance)
    start_ts = start_ts.subquery().as_scalar()

    # Inner query: one row per job instance with its postscript delta.
    inner = self.session.query(Job.exec_job_id.label('job_name'), Job.job_id,
                               JobInstance.job_instance_id,
                               cast(term_ts - start_ts, Float).label('pTime'))
    inner = inner.filter(JobInstance.job_id == Job.job_id)
    inner = inner.filter(Job.wf_id.in_(self._wfs))
    inner = inner.order_by(Job.job_id).subquery()

    # Outer query: sum the per-instance deltas for each job.
    outer = self.session.query(inner.c.job_id, inner.c.job_name,
                               func.sum(inner.c.pTime).label('postTime'))
    outer = outer.group_by(inner.c.job_id)

    return outer.all()
1010
1028
if __name__ == '__main__':
    # Library module only - no command-line behavior is provided.
    pass
1031