User guide on pipeline executors
Bosk pipelines can be fitted and executed by different managers called executors. This user guide explains the semantics of the executor interface and shows how to customize an executor’s behaviour. The particular class we will use is RecursiveExecutor, a simple executor that recursively computes the data needed for each output and thus traverses the computational graph backwards.
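To make the idea of backward traversal concrete, here is a minimal stand-alone sketch. It is not bosk's implementation: the toy graph format, the `compute` function and the node names are all assumptions made for illustration. Each requested output asks for its dependencies first, so nodes that no output needs are never evaluated.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical toy graph: node name -> (function, names of the nodes it depends on).
Graph = Dict[str, Tuple[Callable[..., float], List[str]]]


def compute(node: str, graph: Graph, inputs: Dict[str, float],
            cache: Dict[str, float]) -> float:
    """Compute `node` by first recursively computing everything it depends on."""
    if node in inputs:      # graph inputs are given, nothing to compute
        return inputs[node]
    if node not in cache:   # memoize so shared ancestors are computed only once
        func, deps = graph[node]
        args = [compute(dep, graph, inputs, cache) for dep in deps]
        cache[node] = func(*args)
    return cache[node]


toy_graph: Graph = {
    'double': (lambda x: 2 * x, ['x']),
    'square': (lambda x: x * x, ['x']),
    'total': (lambda a, b: a + b, ['double', 'square']),
    'unused': (lambda a: a - 1, ['double']),  # not needed by 'total'
}

cache: Dict[str, float] = {}
result = compute('total', toy_graph, {'x': 3.0}, cache)
print(result)         # 15.0
print(sorted(cache))  # ['double', 'square', 'total']
```

The `unused` node never appears in the cache because the traversal starts from the requested output and only walks towards the inputs.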
[1]:
from bosk.executor.recursive import RecursiveExecutor
from bosk.executor.timer import TimerBlockExecutor
from bosk.stages import Stage
from bosk.pipeline.builder.functional import FunctionalPipelineBuilder
from sklearn.datasets import make_moons
Let’s build a simple Deep Forest:
[2]:
n_estimators = 15
b = FunctionalPipelineBuilder()
X, y = b.Input()(), b.TargetInput()()
rf_1 = b.RFC(n_estimators=n_estimators)(X=X, y=y)
et_1 = b.ETC(n_estimators=n_estimators)(X=X, y=y)
concat_1 = b.Concat(['X', 'rf_1', 'et_1'])(X=X, rf_1=rf_1, et_1=et_1)
rf_2 = b.RFC(n_estimators=n_estimators)(X=concat_1, y=y)
et_2 = b.ETC(n_estimators=n_estimators)(X=concat_1, y=y)
stack = b.Stack(['rf_2', 'et_2'], axis=1)(rf_2=rf_2, et_2=et_2)
average = b.Average(axis=1)(X=stack)
argmax = b.Argmax(axis=1)(X=average)
rf_1_roc_auc = b.RocAuc()(gt_y=y, pred_probas=rf_1)
roc_auc = b.RocAuc()(gt_y=y, pred_probas=average)
pipeline = b.build(
    {'X': X, 'y': y},
    {'labels': argmax, 'probas': average, 'rf_1_roc-auc': rf_1_roc_auc, 'roc-auc': roc_auc}
)
Now we can run this pipeline with RecursiveExecutor. As mentioned in “Example of the basic bosk usage”, a new executor instance has to be created for each stage. The simplest way to create an executor is to pass your pipeline and the computational stage.
[3]:
fit_exec = RecursiveExecutor(pipeline, Stage.FIT)
# let's make some data to train our model
X, y = make_moons(noise=0.5)
# now we need to create a dictionary to map the data to the
# pipeline's inputs
full_data = {'X': X, 'y': y}
# to run the executor we need to pass the data
fit_res = fit_exec(full_data)
Keep in mind that to fit a layer we need to pass the data through the previous one. So during the fit stage every block is fitted and then its transform method is called. Therefore the output of the fit stage is the transformed training data.
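The fit-then-transform order can be sketched without bosk at all. The two block classes and the helper functions below are hypothetical and exist only to illustrate the principle: every block is fitted on the output of the previous one, and the fit stage therefore returns the transformed training data.

```python
from typing import List


class MeanShift:
    """Hypothetical block: learns the mean on fit, subtracts it on transform."""

    def fit(self, xs: List[float]) -> 'MeanShift':
        self.mean_ = sum(xs) / len(xs)
        return self

    def transform(self, xs: List[float]) -> List[float]:
        return [x - self.mean_ for x in xs]


class Scale:
    """Hypothetical block: learns the largest absolute value, divides by it."""

    def fit(self, xs: List[float]) -> 'Scale':
        self.scale_ = max(abs(x) for x in xs) or 1.0
        return self

    def transform(self, xs: List[float]) -> List[float]:
        return [x / self.scale_ for x in xs]


def run_fit_stage(blocks, xs: List[float]) -> List[float]:
    # Each block is fitted on the data produced by the previous block,
    # then immediately transforms that data for the next block.
    for block in blocks:
        xs = block.fit(xs).transform(xs)
    return xs


def run_transform_stage(blocks, xs: List[float]) -> List[float]:
    # Blocks are already fitted, so only transform is called.
    for block in blocks:
        xs = block.transform(xs)
    return xs


blocks = [MeanShift(), Scale()]
train = [1.0, 2.0, 6.0]
fit_output = run_fit_stage(blocks, train)
print(fit_output == run_transform_stage(blocks, train))  # True
```

In this toy setting, running the transform stage on the training data reproduces the fit-stage output because both blocks are deterministic.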
[4]:
print(list(fit_res.keys()))
['labels', 'probas', 'rf_1_roc-auc', 'roc-auc']
Let’s make another executor for the transform stage, but this time we will pay attention to the inputs and outputs arguments. They set constraints on the corresponding pipeline attributes. By passing inputs, you set a hard requirement on the input values used to execute the computational graph: if you specify inputs, the input data must contain only those keys. outputs specifies which of the graph’s outputs will be processed.
[5]:
# let's specify inputs and outputs
tf_exec = RecursiveExecutor(pipeline, Stage.TRANSFORM, ['X'], ['probas'])
# let's try to violate the inputs requirement
try:
    tf_exec(full_data)
except AssertionError as e:
    print(e)
Input "y" is not in the executor's inputs set
Note that if we had not specified inputs, no exception would be raised even if the input dictionary contained keys other than X and y.
[6]:
# now let's make the right dictionary
tf_data = {'X': X}
tf_res = tf_exec(tf_data)
print(list(tf_res.keys()))
['probas']
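The behaviour of the inputs and outputs arguments described above can be modelled with a small stand-alone sketch. This is a toy model of the described semantics, not bosk's code: `check_inputs` and `select_outputs` are hypothetical helpers, and the error message is only an imitation of the one printed earlier.

```python
from typing import Dict, Iterable, Optional


def check_inputs(data: Dict[str, object],
                 allowed: Optional[Iterable[str]] = None) -> None:
    """Reject keys outside `allowed`; with allowed=None nothing is restricted."""
    if allowed is None:
        return
    allowed_set = set(allowed)
    for name in data:
        if name not in allowed_set:
            raise AssertionError(
                f'Input "{name}" is not in the allowed inputs set')


def select_outputs(results: Dict[str, object],
                   requested: Optional[Iterable[str]] = None) -> Dict[str, object]:
    """Keep only the requested outputs; with requested=None keep everything."""
    if requested is None:
        return dict(results)
    return {name: results[name] for name in requested}


data = {'X': [0.1, 0.2], 'y': [0, 1]}

check_inputs(data)  # no restriction given, so nothing is raised

try:
    check_inputs(data, allowed=['X'])
except AssertionError as err:
    message = str(err)
print(message)  # Input "y" is not in the allowed inputs set

selected = select_outputs({'labels': 1, 'probas': 0.8}, ['probas'])
print(selected)  # {'probas': 0.8}
```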
Now let’s discuss the slot_handler and block_executor arguments. They let the user customize the executor’s behaviour. As the names imply, slot_handler is responsible for the slot handling policy and block_executor for the block execution policy. Users can implement their own subclasses of BaseSlotHandler and BaseBlockExecutor respectively and thus add extra logic to the pipeline execution. In the example below we measure block execution time using a custom block_executor.
[7]:
timer_block_exec = TimerBlockExecutor()
tf_exec = RecursiveExecutor(pipeline, Stage.TRANSFORM,
                            outputs=['labels'],
                            block_executor=timer_block_exec)
tf_exec(tf_data)
# this block executor stores cpu execution time
# for each executed block
# let's find the most complex block
max_time_block = None
max_time = 0
for block, time in timer_block_exec.blocks_time.items():
    if time > max_time:
        max_time = time
        max_time_block = block
print(f'The most complex block is {max_time_block} (id {hash(max_time_block)}).',
      f'Execution time is {round(max_time, 5)} s.')
The most complex block is RFCBlock (id 8759314183514). Execution time is 0.00369 s.
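The general pattern behind a timing block executor can be sketched in plain Python. The `TimedCalls` class below is a hypothetical helper, not part of bosk and not a subclass of BaseBlockExecutor. It only shows the idea of wrapping each call, recording its CPU time, and then picking the slowest entry with `max` instead of a manual loop.

```python
import time
from typing import Any, Callable, Dict


class TimedCalls:
    """Hypothetical helper that records CPU time spent in each named callable."""

    def __init__(self) -> None:
        self.times: Dict[str, float] = {}

    def run(self, name: str, func: Callable[..., Any], *args: Any) -> Any:
        start = time.process_time()
        result = func(*args)
        elapsed = time.process_time() - start
        # Accumulate, so repeated calls under the same name add up.
        self.times[name] = self.times.get(name, 0.0) + elapsed
        return result


timer = TimedCalls()
small_total = timer.run('sum_small', sum, range(10))
large_total = timer.run('sum_large', sum, range(2_000_000))

# Pick the entry with the largest recorded time.
slowest, seconds = max(timer.times.items(), key=lambda item: item[1])
print(f'The slowest call is {slowest}. Execution time is {round(seconds, 5)} s.')
```

The measured values depend on the machine and on the clock resolution, so no particular output is shown here.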