Package netlogger :: Package analysis :: Package workflow :: Module stampede_statistics :: Class StampedeStatistics
[hide private]
[frames | no frames]

Class StampedeStatistics

source code

modules._base.SQLAlchemyInit --+
                               |
           nllog.DoesLogging --+
                               |
                              StampedeStatistics

Instance Methods [hide private]
 
__init__(self, connString=None, expand_workflow=True) source code
 
initialize(self, root_wf_uuid) source code
 
set_job_filter(self, filter='all') source code
 
get_sub_workflow_ids(self)
Returns info on child workflows only.
source code
 
get_descendant_workflow_ids(self) source code
 
_get_job_filter(self) source code
 
_max_job_seq_subquery(self)
Creates the following subquery that is used in ...
source code
 
_dax_or_dag_cond(self) source code
 
get_total_jobs_status(self)
select...
source code
 
get_total_succeeded_jobs_status(self)
select DISTINCT count(jb.job_id)...
source code
 
get_total_failed_jobs_status(self)
select count(*) from...
source code
 
_query_jobstate_for_instance(self, states)
The states arg is a list of strings.
source code
 
get_total_unknown_jobs_status(self)
select count(*) from job_instance jb_inst , job as jb...
source code
 
get_total_tasks_status(self)
select count(*) from task where wf_id in (...
source code
 
_base_task_status_query(self)
select count(*) from...
source code
 
get_total_succeeded_tasks_status(self) source code
 
get_total_failed_tasks_status(self) source code
 
get_total_jobs_statistics(self)
select count(*) as total_jobs...
source code
 
get_total_succeeded_jobs_statistics(self)
select DISTINCT count(jb.job_id)...
source code
 
get_total_failed_jobs_statistics(self)
select count(*) as job_failure...
source code
 
_base_task_statistics_query(self) source code
 
get_total_tasks_statistics(self)
select count(*) from invocation as invoc where invoc.task_submit_seq >=0 and invoc.wf_id in (...
source code
 
get_total_succeeded_tasks_statistics(self)
select count(*) as succeeded_tasks...
source code
 
get_total_failed_tasks_statistics(self)
select count(*) as failed_tasks...
source code
 
get_workflow_wall_time(self)
select ws.wf_id,...
source code
 
get_workflow_cum_job_wall_time(self)
select sum(remote_duration) from invocation as invoc ...
source code
 
get_submit_side_job_wall_time(self)
select sum(local_duration) from job_instance as jb_inst , job as jb where...
source code
 
get_job_name(self)
select jb.job_id, jb.exec_job_id as job_name...
source code
 
get_job_site(self)
select job_id , group_concat(site) as sites from ( select DISTINCT jb.job_id as job_id , jb_inst.site as site from job as jb, job_instance as jb_inst where jb.wf_id = 3 and jb_inst.job_id = jb.job_id ) group by job_id
source code
 
get_job_kickstart(self)
select jb.job_id, jb.exec_job_id as job_name , sum(remote_duration) as kickstart from job as jb, invocation as invoc, job_instance as jb_inst where jb_inst.job_id = jb.job_id and jb.wf_id = 3 and invoc.wf_id =3 and invoc.task_submit_seq >=0 and invoc.job_instance_id = jb_inst.job_instance_id group by jb.job_id
source code
 
get_job_runtime(self)
select jb.job_id, sum(jb_inst.local_duration) as runtime from job as jb, job_instance as jb_inst where jb_inst.job_id = jb.job_id and jb.wf_id = 3 group by jb.job_id
source code
 
get_job_seqexec(self)
select jb.job_id, sum(jb_inst.cluster_duration) as seqexec from job as jb, job_instance as jb_inst where jb_inst.job_id = jb.job_id and jb.wf_id = 3 group by jb.job_id
source code
 
get_job_seqexec_delay(self)
Seqexec delay is the seqexec time minus the kickstart time (see get_job_seqexec and get_job_kickstart above).
source code
 
get_condor_q_time(self)
select job_id, job_name, sum(cQTime) as condorQTime from...
source code
 
get_resource_delay(self)
select job_id, job_name, sum(rTime) as resourceTime from...
source code
 
get_dagman_delay(self)
select jb.exec_job_id as job_name, ...
source code
 
get_post_time(self)
select job_id, job_name, sum(pTime) as postTime from...
source code
 
get_transformation_statistics(self)
select transformation, count(*), min(remote_duration) , max(remote_duration) , avg(remote_duration) , sum(remote_duration) from invocation as invoc where invoc.wf_id = 3 group by transformation
source code
Method Details [hide private]

__init__(self, connString=None, expand_workflow=True)
(Constructor)

source code 
Overrides: nllog.DoesLogging.__init__

_max_job_seq_subquery(self)

source code 

Creates the following subquery that is used in 
several queries:
and jb_inst.job_submit_seq  = (
    select max(job_submit_seq) from job_instance where job_id = jb_inst.job_id group by job_id
    )

get_total_jobs_status(self)

source code 

select
(    
    select count(*)  from job as jb where jb.wf_id in (1,2,3) and not 
                (jb.type_desc = 'dax' or jb.type_desc = 'dag')
)
+
(
    select count(*) from
    (
        select jb_inst.job_id from job_instance as jb_inst , job as jb  
        where jb.wf_id in (1,2,3)
        and jb_inst.job_id = jb.job_id
        and (jb.type_desc ='dax' or jb.type_desc ='dag' )
        and jb_inst.subwf_id is  null
        and jb_inst.job_submit_seq  = (
            select max(job_submit_seq) from job_instance where job_id = jb_inst.job_id group by job_id
            )
    )
) as total_jobs

get_total_succeeded_jobs_status(self)

source code 

select DISTINCT count(jb.job_id)
    from
    job as jb,
    job_instance as jb_inst,
    jobstate as jb_state
    where  jb.wf_id in(
    1,2,3
    )
    and jb.job_id = jb_inst.job_id
    and not (jb.type_desc ='dax' or jb.type_desc ='dag')
    and jb_inst.job_instance_id = jb_state.job_instance_id
    and jb_state.state ='JOB_SUCCESS'

get_total_failed_jobs_status(self)

source code 

select count(*) from
(
    select jb_inst.job_instance_id
    from job as jb, job_instance as jb_inst , jobstate as jb_state
    where jb_inst.job_submit_seq  = (
        select max(job_submit_seq) from job_instance where job_id = jb_inst.job_id group by job_id
    )
    and jb.wf_id in (1,2,3)    
    and jb.job_id = jb_inst.job_id
    and jb_inst.job_instance_id = jb_state.job_instance_id
    and (
              (not (jb.type_desc ='dax' or jb.type_desc ='dag'))
        or
              ((jb.type_desc ='dax' or jb.type_desc ='dag') and jb_inst.subwf_id is NULL)
             )
    and jb_state.state in ('JOB_FAILURE')
)

_query_jobstate_for_instance(self, states)

source code 

The states arg is a list of strings. Returns an appropriate subquery.

get_total_unknown_jobs_status(self)

source code 

select count(*) from job_instance jb_inst , job as jb
where jb_inst.job_submit_seq  = (
        select max(job_submit_seq) from job_instance where job_id = jb_inst.job_id group by job_id
    )
and jb_inst.job_instance_id in
    (
    select js.job_instance_id from jobstate as js
    where js.job_instance_id = jb_inst.job_instance_id
    and js.state = 'SUBMIT'
    )
and jb_inst.job_instance_id not in
    (
    select js.job_instance_id from jobstate as js
    where js.job_instance_id = jb_inst.job_instance_id
    and js.state in ( 'JOB_SUCCESS', 'JOB_FAILURE')
)
and jb_inst.job_id  = jb.job_id
and jb.wf_id in (
        1,2,3
)
and not (jb.type_desc ='dax' or jb.type_desc ='dag' )

get_total_tasks_status(self)

source code 

select count(*) from task where wf_id in (
    1,2,3
   )

_base_task_status_query(self)

source code 

select count(*) from
task as tk,
job_instance as jb_inst,
job as jb,
invocation as invoc
where invoc.wf_id in (
    1,2,3
 )
and jb_inst.job_submit_seq  = (
        select max(job_submit_seq) from job_instance where job_id = jb_inst.job_id group by job_id
    )
and tk.wf_id in (
     1,2,3
)
and  jb.job_id = jb_inst.job_id
and jb_inst.job_instance_id = invoc.job_instance_id
and tk.abs_task_id = invoc.abs_task_id
and tk.wf_id = invoc.wf_id
and invoc.exitcode = 0

get_total_jobs_statistics(self)

source code 

select count(*)  as total_jobs
from
job_instance as jb_inst ,
job as jb
where
jb_inst.job_id  = jb.job_id
and jb.wf_id in (
    1,2,3
)  
and (
      (not (jb.type_desc ='dax' or jb.type_desc ='dag'))
    or
      ((jb.type_desc ='dax' or jb.type_desc ='dag') and jb_inst.subwf_id is NULL)
   )

get_total_succeeded_jobs_statistics(self)

source code 

select DISTINCT count(jb.job_id)
    from
    job as jb,
    job_instance as jb_inst,
    jobstate as jb_state
    where  jb.wf_id in(
    1,2,3
    )
    and jb.job_id = jb_inst.job_id
    and not (jb.type_desc ='dax' or jb.type_desc ='dag')
    and jb_inst.job_instance_id = jb_state.job_instance_id
    and jb_state.state ='JOB_SUCCESS'

get_total_failed_jobs_statistics(self)

source code 

select count(*) as job_failure
from
jobstate as jb_state ,
job_instance jb_inst,
job as jb
where  jb.wf_id in(
    1,2,3
      )
and jb_state.job_instance_id = jb_inst.job_instance_id
and jb.job_id = jb_inst.job_id
and jb_state.state = 'JOB_FAILURE'
and (
      (not (jb.type_desc ='dax' or jb.type_desc ='dag'))
    or
      ((jb.type_desc ='dax' or jb.type_desc ='dag') and jb_inst.subwf_id is NULL)
   )

get_total_tasks_statistics(self)

source code 

select count(*) from invocation as invoc where invoc.task_submit_seq >=0 and invoc.wf_id in  (
      1,2,3
 )

get_total_succeeded_tasks_statistics(self)

source code 

select count(*) as succeeded_tasks
from
invocation as invoc
where
invoc.wf_id in  (
    1,2,3
)
and invoc.exitcode = 0
and invoc.task_submit_seq >=0

get_total_failed_tasks_statistics(self)

source code 

select count(*) as failed_tasks
from
invocation as invoc
where
invoc.wf_id in  (
    1,2,3
)
and invoc.exitcode <> 0
and invoc.task_submit_seq >=0

get_workflow_wall_time(self)

source code 

select ws.wf_id,
sum(case when (ws.state = 'WORKFLOW_TERMINATED') then ws.timestamp end)
-
sum(case when (ws.state = 'WORKFLOW_STARTED') then ws.timestamp end) as duration
from workflowstate ws
group by ws.wf_id

get_workflow_cum_job_wall_time(self)

source code 

select sum(remote_duration) from invocation as invoc 
   where  invoc.task_submit_seq >=0 and invoc.wf_id in(
      1,2,3
   )

get_submit_side_job_wall_time(self)

source code 

select sum(local_duration) from job_instance as jb_inst , job as jb where
jb_inst.job_id  = jb.job_id
and jb.wf_id in (
    1,2,3
)
and (
      (not (jb.type_desc ='dax' or jb.type_desc ='dag'))
    or
      ((jb.type_desc ='dax' or jb.type_desc ='dag') and jb_inst.subwf_id is NULL)
   )
   

get_job_name(self)

source code 

select jb.job_id, jb.exec_job_id as job_name
 from
 job as jb,
 job_instance as jb_inst
 where
 jb_inst.job_id = jb.job_id
 and jb.wf_id = 3
 group by jb.job_id

get_job_seqexec_delay(self)

source code 

Seqexec delay is the seqexec time minus the kickstart time (see get_job_seqexec and get_job_kickstart above).

select jb.job_id,
(
 (
 select sum(jb_inst.cluster_duration)
 from
 job_instance as jb_inst
 where
 jb_inst.job_id = jb.job_id
 group by jb_inst.job_id
 )
-
 (
 select sum(remote_duration)
 from
 invocation as invoc,
 job_instance as jb_inst
 where
 jb_inst.job_id = jb.job_id
 and invoc.wf_id =jb.wf_id
 and invoc.task_submit_seq >=0
 and invoc.job_instance_id = jb_inst.job_instance_id
 group by jb_inst.job_id
 )
) as seqexec_delay
from
job as jb
where jb.wf_id in (1,2,3)
and jb.clustered <>0

get_condor_q_time(self)

source code 

select job_id, job_name, sum(cQTime) as condorQTime from
(
select jb.exec_job_id as job_name,  jb.job_id as job_id, jb_inst.job_instance_id ,
    (
        (select min(timestamp) from jobstate where job_instance_id = jb_inst.job_instance_id and (state = 'GRID_SUBMIT' or state = 'GLOBUS_SUBMIT' or state = 'EXECUTE'))
        -
        (select timestamp from jobstate where job_instance_id = jb_inst.job_instance_id and state = 'SUBMIT' )
    )   as cQTime
from
job_instance as jb_inst,
job as jb
where jb_inst.job_id =jb.job_id
and jb.wf_id = 2
)   group by job_id

get_resource_delay(self)

source code 

select job_id, job_name, sum(rTime) as resourceTime from
(
select jb.exec_job_id as job_name,  jb.job_id as job_id, jb_inst.job_instance_id ,
    (
             (select timestamp from jobstate where job_instance_id = jb_inst.job_instance_id and state = 'EXECUTE' )  
        -
            (select timestamp from jobstate where job_instance_id = jb_inst.job_instance_id and (state = 'GRID_SUBMIT' or state ='GLOBUS_SUBMIT'))
    )   as rTime
from
job_instance as jb_inst,
job as jb
where jb_inst.job_id =jb.job_id
and jb.wf_id = 2
)   group by job_id

get_dagman_delay(self)

source code 

select jb.exec_job_id as job_name,  
(
  (
   select min(timestamp)
   from jobstate
   where job_instance_id in 
     (select job_instance_id from job_instance as jb_inst where jb_inst.job_id = jb.job_id )  
   and state ='SUBMIT'
  )
  -
  (
   select max(timestamp)
   from jobstate
   where job_instance_id in 
      (select job_instance_id from job_instance as jb_inst where jb_inst.job_id in
        (
          select
          parent.job_id as parent_job_id
          from
          job as parent,
          job as child,
          job_edge as edge
          where
          edge.wf_id = 2
          and parent.wf_id = 2
          and child.wf_id = 2
          and child.job_id = jb.job_id
          and edge.parent_exec_job_id like parent.exec_job_id
          and edge.child_exec_job_id like child.exec_job_id
         )
       )
   and (state = 'POST_SCRIPT_TERMINATED' or state ='JOB_TERMINATED')
  )
)   as dagmanDelay
from
job as jb where
jb.wf_id =2

get_post_time(self)

source code 

select job_id, job_name, sum(pTime) as postTime from
(
select jb.exec_job_id as job_name,  jb.job_id as job_id, jb_inst.job_instance_id ,
    (
             (select timestamp from jobstate where job_instance_id = jb_inst.job_instance_id and state = 'POST_SCRIPT_TERMINATED')
        -
            (select max(timestamp) from jobstate  where job_instance_id = jb_inst.job_instance_id  and (state ='POST_SCRIPT_STARTED' or state ='JOB_TERMINATED'))
    )   as pTime
from
job_instance as jb_inst,
job as jb
where jb_inst.job_id =jb.job_id
and jb.wf_id = 2
) group by job_id