Useful Resources¶
Quick Reference¶
Basic steps to create a workflow¶
Data Types¶
Task
: Function or program; atomic unit of executionTaskArray
: List of one or more Tasks, which will be executed in a TemplateInputTypes
: Types for inputs of a Task.InputValues
: Values for inputs of a Task.InputArray
: One or more InputValues, which will be inputs to a TaskArray in a Templatesequence
: List of tasks placed in series; output of one is the input of the nextparallel
: List of tasks processing their inputs in parallelsplit
: “split” task feeding inputs to a set of parallel tasksmerge
: Collect output of parallel tasksFunctions¶
Note
Name is an optional parameter in all below functions.
(1) Create inputs for tasks¶
Description | Functions and examples |
---|---|
Create InputTypes | input_type_1 = InputTypes("Types1", [int, str]) |
Create InputValue | input_value_1 = InputValues("Values1", [1, "hello"]) |
Create InputArray | input_array_12 = InputArray("Array12", [input_value_1, input_value_2]) |
(2) Create tasks and task arrays¶
Description | Functions and examples |
---|---|
Create a Task | task_f1 = Task("A", FUNCTION, "myfunc", input_type_1) task_x1 = Task("X", EXECUTABLE, "./bin/x", input_type_1) |
Create a TaskArray | task_array_xyz = TaskArray("xyz", [task_f1, task_x1]) task_array_12 = TaskArray("tasks12", [task_f1, task_f2]) |
(3) Create templates¶
Description | Functions and examples |
---|---|
Sequence | output = sequence("my_sequence", task_array_12, input_array_12) |
Data parallel | output = parallel("abc", task_array_12, input_array_12) |
Split | output = split("split", task_x1, input_value_1, spl_taskarray, spl_input_array) |
Merge | output = merge("sync", syn_ta, syn_ia, task_x1, input_value_1) |
Constants and Special Notations¶
Constant | Description |
---|---|
FUNCTION | Function in current program |
EXECUTABLE | Different program to execute |
PREVIOUS.task_name.i[output_no] | Output number of previous task task_name |
Program Skeleton¶
Below is a skeleton to help kickstart writing your Tigres code. This will allow you to specify
different Execution plugins.
You can download the Tigres skeleton program here
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | """
My Tigres Program
"""
import os
import sys
from tigres import *
from tigres.utils import Execution, TigresException, State
def main(execution):
# Setup up the Tigres Program using __file__ to build the log file name
start(log_dest=os.path.splitext(__file__)[0] + '.log', execution=execution)
# Set the logging level
set_log_level(Level.INFO)
# Heart of Tigres Program here
# Create DOT (plain text graph) file
dot_execution()
# End the Tigres Program
end()
if __name__ == "__main__":
# Simple Usage Here
if len(sys.argv) <= 1:
print("Usage: {} ({})>".format(sys.argv[0], "|".join(Execution.LOOKUP.keys())))
exit()
try:
main(Execution.get(sys.argv[1]))
except TigresException as e:
print(e)
task_check = check('task', state=State.FAIL)
for task_record in task_check:
print(".. State of {} = {} - {}".format(task_record.name, task_record.state, task_record.errmsg))
|
This program allows you to execute your workflow with any of the available execution plugins. If you execute this python code, you will see all the available execution plugins:
$ python hello_world_parallel.py
Usage: hello_world_parallel.py (EXECUTION_SLURM|EXECUTION_DISTRIBUTE_PROCESS|EXECUTION_LOCAL_THREAD|EXECUTION_LOCAL_PROCESS|EXECUTION_SGE)>
Execute the script with EXECUTION_LOCAL_THREAD
:
$ hello_world_parallel.py EXECUTION_LOCAL_THREAD
There is no output to the screen but you will see a log file in the current directory:
$ ls
hello_world_parallel.log hello_world_parallel.py
Load Tigres API¶
In lines 7-8 the Tigres API is loaded into the python program namespace.
Line 7 imports all functions and classes from tigres
while line 8
imports the tigres.utils.Execution
, tigres.utils.TigresException
and tigres.utils.State
.
from tigres import *
from tigres.utils import Execution, TigresException, State
The main Function¶
The main function has some boiler plate code that is run before and after any templates are executed.
def main(execution):
# Setup up the Tigres Program using __file__ to build the log file name
start(log_dest=os.path.splitext(__file__)[0] + '.log', execution=execution)
# Set the logging level
set_log_level(Level.INFO)
# Heart of Tigres Program here
# Create DOT (plain text graph) file
dot_execution()
# End the Tigres Program
end()
Program initialization¶
Before any templates are executed, the Tigres program is initialized with a call
to start()
. This function configures and initializes the Tigres program. In the above code,
the program specifies the log file name (log_dest
) as well as the execution
mechanism (e.g. execution='EXECUTION_LOCAL_THREADS'
).
The function start()
will end a previously started Tigres program. The
verbosity of the logs is set using set_log_level()
. After this call,
all messages at a numerically higher level (e.g. DEBUG if level is INFO) will be skipped
(See Level
).
Graph visualization¶
After the last template is executed, the execution can be visualized using
dot_execution()
. This function creates a DOT file (plain text graph)
for a visual representation of the completed workflow. The function
dot_execution()
writes a graph file
of the current
Tigres execution workflow that can be visualized
using the
visualization tools of your choice
such as GraphViz. Lastly, the Tigres program is finalized
with a call to end()
.
The __main__ Driver¶
The driver program at the bottom of the file, does some argument checking,
executes main()
and prints any raised errors. If a tigres.utils.TigresException
is raised, the message is printed and then the monitoring information is checked for more
detailed error messages using tigres.check()
. Line 26, searches the monitoring information
for the currently running program for all Tigres tasks that have a state of State.FAIL
.
if __name__ == "__main__":
# Simple Usage Here
if len(sys.argv) <= 1:
print("Usage: {} ({})>".format(sys.argv[0], "|".join(Execution.LOOKUP.keys())))
exit()
try:
main(Execution.get(sys.argv[1]))
except TigresException as e:
print(e)
task_check = check('task', state=State.FAIL)
for task_record in task_check:
print(".. State of {} = {} - {}".format(task_record.name, task_record.state, task_record.errmsg))