Информация по использованию исполнителей#

Конвейеры bosk можно обучать и выполнять с помощью различных менеджеров, называемых исполнителями. В этом руководстве пользователя мы объясним семантику интерфейса исполнителя и покажем, как Вы можете настроить его поведение. Конкретный класс, который мы будем использовать, это RecursiveExecutor. RecursiveExecutor — это простой исполнитель, который для каждого вывода рекурсивно вычисляет данные и, таким образом, проходит вычислительный граф в обратном направлении.

[1]:
from bosk.executor.recursive import RecursiveExecutor
from bosk.executor.timer import TimerBlockExecutor
from bosk.stages import Stage
from bosk.pipeline.builder.functional import FunctionalPipelineBuilder
from sklearn.datasets import make_moons

Определим простой Глубокий лес

[2]:
n_estimators = 15
b = FunctionalPipelineBuilder()
X, y = b.Input()(), b.TargetInput()()
rf_1 = b.RFC(n_estimators=n_estimators)(X=X, y=y)
et_1 = b.ETC(n_estimators=n_estimators)(X=X, y=y)
concat_1 = b.Concat(['X', 'rf_1', 'et_1'])(X=X, rf_1=rf_1, et_1=et_1)
rf_2 = b.RFC(n_estimators=n_estimators)(X=concat_1, y=y)
et_2 = b.ETC(n_estimators=n_estimators)(X=concat_1, y=y)
stack = b.Stack(['rf_2', 'et_2'], axis=1)(rf_2=rf_2, et_2=et_2)
average = b.Average(axis=1)(X=stack)
argmax = b.Argmax(axis=1)(X=average)
rf_1_roc_auc = b.RocAuc()(gt_y=y, pred_probas=rf_1)
roc_auc = b.RocAuc()(gt_y=y, pred_probas=average)
pipeline = b.build(
    {'X': X, 'y': y},
    {'labels': argmax, 'probas': average, 'rf_1_roc-auc': rf_1_roc_auc, 'roc-auc': roc_auc}
)

Теперь мы можем запустить этот конвейер с помощью RecursiveExecutor. Как было сказано в “Базовом примере использования bosk”, для каждого этапа мы должны создать новый экземпляр исполнителя. Самый простой способ создать исполнитель — передать конвейер и вычислительную стадию.

[3]:
fit_exec = RecursiveExecutor(pipeline, Stage.FIT)
# сгенерируем данные для обучения модели
X, y = make_moons(noise=0.5)
# теперь нужно создать словарь для
# сопоставления данных и входов
full_data = {'X': X, 'y': y}
# для запуска исполнителя просто передаем данные
fit_res = fit_exec(full_data)

Хорошо понимать, что для обучения слоя нам нужно передать ему данные от предыдущего. Таким образом, на этапе обучения обучается каждый блок, после чего вызывается метод преобразования. Поэтому на этапе обучения у нас есть преобразование обучающих данных в качестве вывода исполнителя.

[4]:
print(list(fit_res.keys()))
['labels', 'probas', 'rf_1_roc-auc', 'roc-auc']

Сделаем еще один исполнитель для этапа преобразования, но теперь обратим внимание на аргументы inputs и outputs. Они устанавливают ограничение на соответствующие атрибуты конвейера. Передавая входные данные, вы устанавливаете жесткие требования к входным значениям для выполнения вычислительного графа. Это означает, что если вы укажете inputs, входные данные должны содержать только эти ключи. outputs определяют выходы графа, которые будут обработаны.

[5]:
# давайте уточним входы и выходы
tf_exec = RecursiveExecutor(pipeline, Stage.TRANSFORM, ['X'], ['probas'])
# попробуем нарушить требование к входам
try:
    tf_exec(full_data)
except AssertionError as e:
    print(e)
Input "y" is not in the executor's inputs set

Нужно сказать, что если бы мы не указали входные данные, то исключение не было бы возбуждено, даже если бы у нас были какие-то ключи кроме X и y во входном словаре.

[6]:
# теперь сделаем правильный словарь
tf_data = {'X': X}
tf_res = tf_exec(tf_data)
print(list(tf_res.keys()))
['probas']

Теперь нам нужно обсудить аргументы slot_handler и block_executor. Они необходимы для выполнения пользовательской настройки поведения исполнителя. Как видно из названий, slot_handler отвечает за политику обработки слотов, а block_executor – за блоки. Пользователь может реализовать свои собственные наследники BaseSlotHandler и BaseBlockExecutor соответственно и, таким образом, реализовать некоторую дополнительную логику во время выполнения конвейера. В приведенном ниже примере мы будем измерять время выполнения блоков с помощью пользовательского block_executor.

[7]:
timer_block_exec = TimerBlockExecutor()
tf_exec = RecursiveExecutor(pipeline, Stage.TRANSFORM,
                            outputs=['labels'],
                            block_executor=timer_block_exec)
tf_exec(tf_data)
# this block executor stores cpu execution time
# for each executed block
# let's find the most complex block
max_time_block = None
max_time = 0
for block, time in timer_block_exec.blocks_time.items():
    if time > max_time:
        max_time = time
        max_time_block = block
print(f'The most complex block is {max_time_block} (id { hash(max_time_block)}).',
      f'Execution time is {round(max_time, 5)} s.')
The most complex block is RFCBlock (id 8759314183514). Execution time is 0.00369 s.