bosk.executor.parallel.greedy#

Module Contents#

Classes#

ParallelEngine

Parallel execution engine interface.

JoblibParallelEngine

Joblib-based Parallel Engine.

MultiprocessingParallelEngine

Multiprocessing-based thread pool execution engine.

GreedyParallelExecutor

The recursive executor implementation.

Attributes#

ResultT

Execution result generic typevar.

OutSlotToData

bosk.executor.parallel.greedy.ResultT#

Execution result generic typevar.

Required to constrain the result type of ParallelEngine.Instance.starmap.

bosk.executor.parallel.greedy.OutSlotToData#
class bosk.executor.parallel.greedy.ParallelEngine#

Bases: abc.ABC

Parallel execution engine interface.

Implements the context manager protocol; entering the context returns an execution engine instance.

class Instance#

Bases: abc.ABC

Execution engine instance interface.

Executes the starmap() function in parallel; the parallelization strategy is left to the implementation.

abstract starmap(func, iterable)#
Parameters:
  • func (Callable[Ellipsis, ResultT]) –

  • iterable (Iterable) –

Return type:

List[ResultT]

abstract __enter__()#
Return type:

ParallelEngine

abstract __exit__(_type, _value, _traceback)#
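A minimal sketch of how an engine following this interface might be written. The abstract classes are re-declared locally so the example is self-contained; `EngineSketch`, `SerialEngine` and `SerialInstance` are hypothetical names, not part of bosk. The sketch also assumes that `__enter__` hands back the object exposing `starmap`, which may differ from the actual implementation.

```python
from abc import ABC, abstractmethod
from itertools import starmap as _starmap
from typing import Callable, Iterable, List, TypeVar

ResultT = TypeVar("ResultT")


class EngineSketch(ABC):
    """Local stand-in mirroring the documented interface (not bosk's class)."""

    class Instance(ABC):
        @abstractmethod
        def starmap(self, func: Callable[..., ResultT], iterable: Iterable) -> List[ResultT]:
            ...

    @abstractmethod
    def __enter__(self) -> "EngineSketch.Instance":
        ...

    @abstractmethod
    def __exit__(self, _type, _value, _traceback):
        ...


class SerialEngine(EngineSketch):
    """Hypothetical engine that runs tasks one by one, e.g. for debugging."""

    class SerialInstance(EngineSketch.Instance):
        def starmap(self, func, iterable):
            # Unpack each argument tuple and call func sequentially.
            return list(_starmap(func, iterable))

    def __enter__(self):
        return self.SerialInstance()

    def __exit__(self, _type, _value, _traceback):
        # Nothing to release in the serial case.
        return None


with SerialEngine() as instance:
    results = instance.starmap(pow, [(2, 3), (3, 2)])
```

A serial engine of this kind keeps the executor logic unchanged while removing concurrency, which can make block failures easier to trace.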
class bosk.executor.parallel.greedy.JoblibParallelEngine(n_threads=-1, backend=None, prefer='threads')#

Bases: ParallelEngine

Joblib-based Parallel Engine.

Parameters:
  • n_threads (int) – Number of threads.

  • backend (Optional[str]) – Joblib backend to use.

  • prefer (Optional[str]) – Soft hint for the kind of backend to prefer ('threads' or 'processes').

class JoblibInstance(parallel)#

Bases: ParallelEngine

Execution engine instance interface.

Executes the starmap() function in parallel.

Parameters:

parallel (joblib.Parallel) –

starmap(func, iterable)#
__enter__()#
Return type:

JoblibParallelEngine

__exit__(_type, _value, _traceback)#
class bosk.executor.parallel.greedy.MultiprocessingParallelEngine(n_threads=None)#

Bases: ParallelEngine

Multiprocessing-based thread pool execution engine.

Parameters:

n_threads (Optional[int]) – Number of threads.

class MPInstance(pool)#

Bases: ParallelEngine

Execution engine instance interface.

Executes the starmap() function in parallel.

Parameters:

pool (multiprocessing.pool.ThreadPool) –

starmap(func, iterable)#
__enter__()#
Return type:

MultiprocessingParallelEngine

__exit__(_type, _value, _traceback)#
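For reference, the standard-library `multiprocessing.pool.ThreadPool` that this engine wraps already provides a `starmap` method. The snippet below is a sketch of that underlying call only, not of bosk's wrapper; `scale` is a hypothetical task function.

```python
from multiprocessing.pool import ThreadPool


def scale(value, factor):
    """Hypothetical task: multiply a value by a factor."""
    return value * factor


# Worker threads share memory, so arguments and results are not pickled.
with ThreadPool(processes=2) as pool:
    # starmap unpacks each tuple into positional arguments
    # and returns the results in input order.
    results = pool.starmap(scale, [(1, 10), (2, 10), (3, 10)])
```

Because the workers are threads rather than processes, speed-ups are expected mainly for tasks that release the GIL, such as I/O or native numerical code.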
class bosk.executor.parallel.greedy.GreedyParallelExecutor(pipeline, stage, inputs=None, outputs=None, slot_handler=None, block_executor=None, parallel_engine=MultiprocessingParallelEngine())#

Bases: bosk.executor.base.BaseExecutor

The recursive executor implementation.

Considers only input-output slots information to match slots.

_conn_map#

Pipeline connections, represented as a hash map: the keys are blocks’ input slots, the values are the output slots feeding them. Each input slot corresponds to no more than one output slot, so this representation is correct.

Parameters:
property outputs: Optional[frozenset[str]]#

Getter for the executor’s outputs set. None if there are no restrictions on the pipeline’s outputs.

Return type:

Optional[frozenset[str]]

_conn_map :Mapping[bosk.block.base.BlockInputSlot, bosk.block.base.BlockOutputSlot]#
_prepare_out_to_in_edges()#

Prepare the mapping from output slots to list of input slots.

Returns:

Dictionary with output slots as keys, lists of the corresponding input slots as values.

Return type:

Dict[bosk.block.base.BlockOutputSlot, List[bosk.block.base.BlockInputSlot]]
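The mapping described here is the inverse of an input-to-output connection map. A minimal sketch of that inversion, using plain strings in place of slot objects; `invert_connections` and the slot names are hypothetical and chosen only for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Mapping


def invert_connections(conn_map: Mapping[str, str]) -> Dict[str, List[str]]:
    """Turn an input-slot -> output-slot map into output-slot -> [input slots]."""
    out_to_in = defaultdict(list)
    for in_slot, out_slot in conn_map.items():
        # One output slot may feed several input slots.
        out_to_in[out_slot].append(in_slot)
    return dict(out_to_in)


# Strings stand in for BlockInputSlot / BlockOutputSlot objects.
conn_map = {"b.X": "a.out", "c.X": "a.out", "d.X": "b.out"}
edges = invert_connections(conn_map)
```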

_get_blocks(output_slots)#

Get all blocks that should be executed.

Parameters:

output_slots (Set[bosk.block.base.BlockOutputSlot]) – Set of output slots.

Returns:

Set of the pipeline blocks.

Return type:

Set[bosk.block.base.BaseBlock]

_prepare_inputs_by_block()#

Prepare the mapping from blocks to their inputs.

Returns:

Dictionary with blocks as keys, sets of the corresponding input slots as values.

Return type:

Dict[bosk.block.base.BaseBlock, Set[bosk.block.base.BlockInputSlot]]

_prepare_inputs(block, input_slot_values)#

Prepare the mapping of inputs needed for the block.

Parameters:
Returns:

Mapping from input slots to the corresponding data for the given block.

Return type:

Mapping[bosk.block.base.BlockInputSlot, bosk.data.BaseData]

_compute_all_plain(blocks, computed_values)#

Filter plain blocks and compute them.

It is assumed that plain block execution is computationally cheap.

Parameters:
Returns:

Mapping from BlockOutputSlot to Data.

Return type:

Mapping[bosk.block.base.BlockOutputSlot, bosk.data.BaseData]

_compute_all_parallel(blocks, computed_values, parallel)#

Filter blocks that can be computed in parallel and compute them.

Parameters:
Returns:

Mapping from BlockOutputSlot to Data.

Return type:

Mapping[bosk.block.base.BlockOutputSlot, bosk.data.BaseData]

_compute_all_non_threadsafe(blocks, computed_values)#

Filter blocks that are not plain and cannot be computed in parallel, and compute them.

Parameters:
Returns:

Mapping from BlockOutputSlot to Data.

Return type:

OutSlotToData

_clean_unnecessary_data(computed_values, remaining_blocks)#

Remove the intermediate data (execution results) that will not be required in the future.

Parameters:

Returns:

_find_ready_blocks(computed_values, remaining_blocks)#

Find the blocks for which required inputs are already computed.

Parameters:
Returns:

List of blocks which are ready to be computed.

Return type:

List[bosk.block.base.BaseBlock]
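One way to picture the readiness check: a block is ready once every output slot feeding its inputs has a computed value. The sketch below assumes that model and uses strings for blocks and slots; the extra `inputs_by_block` and `conn_map` arguments are hypothetical stand-ins for state the executor holds internally.

```python
def find_ready_blocks(computed_values, remaining_blocks, inputs_by_block, conn_map):
    """Return the blocks whose required inputs are all available."""
    ready = []
    for block in remaining_blocks:
        # Output slots that feed this block's input slots.
        needed = [conn_map[in_slot] for in_slot in inputs_by_block[block]]
        if all(out_slot in computed_values for out_slot in needed):
            ready.append(block)
    return ready


conn_map = {"b.X": "a.out", "c.X": "b.out", "c.Y": "a.out"}
inputs_by_block = {"b": ["b.X"], "c": ["c.X", "c.Y"]}

# Only "a.out" is known, so "b" is ready while "c" still waits for "b.out".
ready = find_ready_blocks({"a.out": 1}, ["b", "c"], inputs_by_block, conn_map)
```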

__append_outputs(output_values, computed_values, output_slots, new_outputs)#

Append newly computed outputs.

Parameters:
__execute_with_parallel(input_values, parallel)#

Pipeline execution with given parallel engine instance.

Parameters:
Returns:

Dictionary with output slots as keys and computed data as values.

Return type:

Dict[bosk.block.base.BlockOutputSlot, bosk.data.BaseData]

execute(input_values)#

Executes the pipeline on the given BaseData inputs and returns the BaseData output values.

Parameters:

input_values (Mapping[str, bosk.data.BaseData]) – Input data.

Returns:

Calculated output data dictionary that maps output names to the data.

Return type:

Dict[str, bosk.data.BaseData]
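The overall greedy strategy (find ready blocks, compute them in parallel, drop data that is no longer needed, repeat) can be sketched as below. This is a simplified illustration under stated assumptions, not bosk's implementation: blocks are plain callables keyed by name, each block produces a single value, all blocks are treated as safe to run in parallel, and `greedy_execute`, `_call`, `blocks` and `deps` are hypothetical names.

```python
from multiprocessing.pool import ThreadPool


def _call(func, *args):
    """Helper so that starmap can run a different function per task."""
    return func(*args)


def greedy_execute(blocks, deps, inputs, outputs, n_threads=2):
    """blocks: name -> callable; deps: name -> names of the values it consumes."""
    computed = dict(inputs)
    remaining = set(blocks)
    with ThreadPool(n_threads) as pool:
        while remaining:
            # Blocks whose dependencies are all computed can run now.
            ready = sorted(b for b in remaining if all(d in computed for d in deps[b]))
            if not ready:
                raise RuntimeError("no block is ready: missing input or cyclic graph")
            tasks = [(blocks[b], *(computed[d] for d in deps[b])) for b in ready]
            results = pool.starmap(_call, tasks)
            computed.update(zip(ready, results))
            remaining.difference_update(ready)
            # Keep only requested outputs and values that remaining blocks consume.
            needed = set(outputs) | {d for b in remaining for d in deps[b]}
            computed = {k: v for k, v in computed.items() if k in needed}
    return {name: computed[name] for name in outputs}


blocks = {
    "double": lambda x: 2 * x,
    "inc": lambda x: x + 1,
    "sum": lambda a, b: a + b,
}
deps = {"double": ["x"], "inc": ["x"], "sum": ["double", "inc"]}
result = greedy_execute(blocks, deps, inputs={"x": 5}, outputs=["sum"])
```

In this example "double" and "inc" are computed together in the first round, and "sum" follows in the second once both of its inputs exist.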