Example of the autoblock usage#

Computational graphs in bosk requires their nodes to be a BaseBlock instances. To make user models including easier, the auto_block decorator was developed. Using it, you can easily convert your custom model to the BaseBlock. To get examples of converting scikit-learn interface models, see the bosk.block.zoo.models.classification subpackage code.

This exmaple requires the Pytorch to be installed. Make sure you have the torch package installed or run the following cell:

[ ]:
!pip install torch
[1]:
from bosk.block.auto import auto_block
from bosk.block.meta import BlockExecutionProperties
from bosk.executor.parallel.greedy import GreedyParallelExecutor
from bosk.painter.graphviz import GraphvizPainter
from bosk.pipeline.builder.functional import FunctionalPipelineBuilder
from bosk.stages import Stage
import torch
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
from sklearn.datasets import make_moons
from sklearn.model_selection import train_test_split
from IPython.display import Image

Consider we have the following model:

[2]:
@auto_block(
    execution_props=BlockExecutionProperties(cpu=False, gpu=True, plain=False, threadsafe=True),
    random_state_field=None,
)
class NeuralNetwork(torch.nn.Module):
    def __init__(self, dim: int) -> None:
        super().__init__()
        self.device = torch.device('cuda')
        self.nn = torch.nn.Sequential(
            torch.nn.Linear(dim, dim * 10),
            torch.nn.ReLU(),
            torch.nn.Linear(dim * 10, dim),
            torch.nn.Tanh(),
            torch.nn.Linear(dim, 1),
            torch.nn.Sigmoid()
        ).to(self.device)
        self.optimizer = torch.optim.Adam(self.nn.parameters(), 0.0005)
        self.batch_size = 64
        self.epochs_num = 500
        self.loss_fn = torch.nn.CrossEntropyLoss()

    def _get_tensor(self, x: np.ndarray) -> torch.Tensor:
        return torch.from_numpy(x.astype(np.float32)).to(self.device)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        proba = self.nn(x)
        return torch.concat((1 - proba, proba), dim=-1)

    def transform(self, x: np.ndarray) -> np.ndarray:
        with torch.no_grad():
            x = self._get_tensor(x)
            return self(x).cpu().numpy()

    def fit(self, x: np.ndarray, y: np.ndarray) -> 'NeuralNetwork':
        x_tens = self._get_tensor(x)
        y_tens = self._get_tensor(y).long()
        dataset = TensorDataset(x_tens, y_tens)
        data_loader = DataLoader(dataset, self.batch_size, True)
        self.train()
        for _ in range(self.epochs_num):
            for x, y in data_loader:
                self.optimizer.zero_grad()
                pred = self(x)
                loss = self.loss_fn(pred, y)
                loss.backward()
                self.optimizer.step()
        return self

Let’s describe what is happening here. We’ve got a Pytorch neural network model which we want to include in some bosk pipeline and train it on a GPU. To turn our model into a bosk BaseBlock we need to use auto_block decorator.

The decorator works with the classes having fit and transform methods. It uses the methods’ arguments names to create slots metainformation. Each argument name creates its own slot. If an argument is presented in the fit method, than the slot will be marked as used on the FIT stage. The same with the transform method and the TRANSFORM stage.

We can use auto_block without any arguments, and it will create a default CPU block with the default behaviour, which you can learn from the documentation. In our case we want this block to be marked to be runnable on the GPU (cpu=False, gpu=True), able to be parallelized (thread_safe=True), and as we didn’t implement neural network weights initialization according to the given random seed, we marked random_state_field=None to do nothing in the block.set_random_state method.

Now let’s make a pipeline with our model. Notice that we need to register our custom block in the functional builder. Also, it is needed to say that currently in bosk GPU data is stored using jax arrays, which are incompatible with the Pytorch tensors. This is why we didn’t put any MoveToBlock in the pipeline and implemented all the data transfer logic inside of the NeuralNetwork model.

[3]:
dim = 2

b = FunctionalPipelineBuilder()
X, y = b.Input()(), b.TargetInput()()
rf = b.RFC()(X=X, y=y)
rf_roc_auc = b.RocAuc()(gt_y=y, pred_probas=rf)

# block registration
nn_func_block_wrapper = b.new(NeuralNetwork, dim=dim)
# adding our block in the pipeline
nn = nn_func_block_wrapper(x=X, y=y)

nn_roc_auc = b.RocAuc()(gt_y=y, pred_probas=nn)
stack = b.Stack(['rf', 'nn'], axis=1)(rf=rf, nn=nn)
average = b.Average(axis=1)(X=stack)
roc_auc = b.RocAuc()(gt_y=y, pred_probas=average)
pipeline = b.build(
    {'X': X, 'y': y},
    {'probas': average, 'rf roc-auc': rf_roc_auc, 'total roc-auc': roc_auc, 'nn roc-auc': nn_roc_auc}
)
GraphvizPainter(figure_dpi=100).from_pipeline(pipeline).render('pipeline.jpeg')
display(Image('pipeline.jpeg'))
../_images/notebooks_autoblock_7_0.jpg

Now let’s fit the pipeline in the parallel mode and see if our ensemble model brought some improvement to the basic models.

[4]:
x, y = make_moons(200, noise=0.5)
train_x, test_x, train_y, test_y = train_test_split(x, y, test_size=0.5)
train_data = {'X': train_x, 'y': train_y}
test_data = {'X': test_x, 'y': test_y}

fit_exec = GreedyParallelExecutor(pipeline, Stage.FIT, outputs=['rf roc-auc', 'nn roc-auc', 'total roc-auc'])
fit_res = fit_exec(train_data).numpy()
for key, val in fit_res.items():
    print(f'{key}:', round(val, 3))
rf roc-auc: 1.0
total roc-auc: 0.985
nn roc-auc: 0.923
[5]:
test_exec = GreedyParallelExecutor(pipeline, Stage.TRANSFORM, outputs=['rf roc-auc', 'nn roc-auc', 'total roc-auc'])
test_res = test_exec(test_data).numpy()
for key, val in test_res.items():
    print(f'{key}:', round(val, 3))
rf roc-auc: 0.838
total roc-auc: 0.859
nn roc-auc: 0.891