bosk.executor.parallel.greedy
Module Contents
Classes
- ParallelEngine – Parallel execution engine interface.
- JoblibParallelEngine – Joblib-based parallel engine.
- MultiprocessingParallelEngine – Multiprocessing-based thread pool execution engine.
- GreedyParallelExecutor – The recursive executor implementation.
Attributes
- ResultT – Execution result generic typevar.
- OutSlotToData
- bosk.executor.parallel.greedy.ResultT
Execution result generic typevar.
Required to constrain the result type of ParallelEngine.Instance.starmap().
- bosk.executor.parallel.greedy.OutSlotToData
Type alias for a mapping from output slots (BlockOutputSlot) to data (BaseData).
- class bosk.executor.parallel.greedy.ParallelEngine
Bases:
abc.ABC
Parallel execution engine interface.
Implements the context manager protocol: entering the context returns an execution engine instance.
- class Instance
Bases:
abc.ABC
Execution engine instance interface.
Can execute the starmap() function in parallel.
- abstract starmap(func, iterable)
- Parameters:
func (Callable[Ellipsis, ResultT]) – Function to apply.
iterable (Iterable) – Iterable of argument tuples; each tuple is unpacked into the arguments of func.
- Return type:
List[ResultT]
- abstract __enter__()
- Return type:
ParallelEngine.Instance
- abstract __exit__(_type, _value, _traceback)
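The context-manager contract above (enter the engine, receive an instance, call starmap on it) can be illustrated with a minimal, self-contained sketch. It re-declares the interface locally instead of importing bosk, and SequentialEngine is a hypothetical engine, not part of bosk, whose instance simply runs starmap in the calling thread.

```python
from abc import ABC, abstractmethod
from itertools import starmap
from typing import Callable, Iterable, List, TypeVar

ResultT = TypeVar("ResultT")


class ParallelEngine(ABC):
    """Local re-declaration of the documented interface (not bosk's source)."""

    class Instance(ABC):
        @abstractmethod
        def starmap(self, func: Callable[..., ResultT], iterable: Iterable) -> List[ResultT]:
            ...

    @abstractmethod
    def __enter__(self) -> "ParallelEngine.Instance":
        ...

    @abstractmethod
    def __exit__(self, _type, _value, _traceback) -> None:
        ...


class SequentialEngine(ParallelEngine):
    """Hypothetical engine: runs every call in the current thread."""

    class SequentialInstance(ParallelEngine.Instance):
        def starmap(self, func: Callable[..., ResultT], iterable: Iterable) -> List[ResultT]:
            # Each element of `iterable` is a tuple unpacked into `func`'s arguments.
            return list(starmap(func, iterable))

    def __enter__(self) -> ParallelEngine.Instance:
        return SequentialEngine.SequentialInstance()

    def __exit__(self, _type, _value, _traceback) -> None:
        # Nothing to release for a sequential engine; exceptions propagate.
        return None


with SequentialEngine() as instance:
    print(instance.starmap(pow, [(2, 3), (3, 2)]))  # [8, 9]
```

Keeping resource setup in __enter__/__exit__ lets a real engine create its worker pool once per pipeline run and release it afterwards.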
- class bosk.executor.parallel.greedy.JoblibParallelEngine(n_threads=-1, backend=None, prefer='threads')
Bases:
ParallelEngine
Joblib-based parallel engine.
- Parameters:
n_threads (int) – Number of threads.
backend (Optional[str]) – Joblib backend to use.
prefer (Optional[str]) – Soft hint for which kind of worker to prefer ('threads' or 'processes').
- class JoblibInstance(parallel)
Bases:
ParallelEngine.Instance
Execution engine instance interface.
Can execute the starmap() function in parallel.
- Parameters:
parallel (joblib.Parallel) – The joblib.Parallel object used to run tasks.
- starmap(func, iterable)
- __enter__()
- Return type:
JoblibParallelEngine.JoblibInstance
- __exit__(_type, _value, _traceback)
- class bosk.executor.parallel.greedy.MultiprocessingParallelEngine(n_threads=None)
Bases:
ParallelEngine
Multiprocessing-based thread pool execution engine.
- Parameters:
n_threads (Optional[int]) – Number of threads.
- class MPInstance(pool)
Bases:
ParallelEngine.Instance
Execution engine instance interface.
Can execute the starmap() function in parallel.
- Parameters:
pool (multiprocessing.pool.ThreadPool) – The thread pool used to run tasks.
- starmap(func, iterable)
- __enter__()
- Return type:
MultiprocessingParallelEngine.MPInstance
- __exit__(_type, _value, _traceback)
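A thread-pool engine of this kind can be sketched with the standard library's multiprocessing.pool.ThreadPool, whose starmap method unpacks each argument tuple and returns results in input order. This is an assumption about how such an engine might work, not bosk's implementation; ThreadPoolEngine and PoolInstance are hypothetical names.

```python
from multiprocessing.pool import ThreadPool
from typing import Callable, Iterable, List, Optional, TypeVar

ResultT = TypeVar("ResultT")


class ThreadPoolEngine:
    """Hypothetical thread-pool engine (a sketch, not bosk's MultiprocessingParallelEngine)."""

    class PoolInstance:
        def __init__(self, pool: ThreadPool):
            self.pool = pool

        def starmap(self, func: Callable[..., ResultT], iterable: Iterable) -> List[ResultT]:
            # ThreadPool.starmap unpacks each tuple into func's arguments
            # and returns the results in the order of the inputs.
            return self.pool.starmap(func, iterable)

    def __init__(self, n_threads: Optional[int] = None):
        # With None, ThreadPool uses os.cpu_count() worker threads.
        self.n_threads = n_threads
        self._pool: Optional[ThreadPool] = None

    def __enter__(self) -> "ThreadPoolEngine.PoolInstance":
        self._pool = ThreadPool(self.n_threads)
        return ThreadPoolEngine.PoolInstance(self._pool)

    def __exit__(self, _type, _value, _traceback) -> None:
        # Stop accepting work and wait for the worker threads to finish.
        if self._pool is not None:
            self._pool.close()
            self._pool.join()
            self._pool = None


with ThreadPoolEngine(n_threads=2) as instance:
    print(instance.starmap(lambda a, b: a * b, [(1, 2), (3, 4), (5, 6)]))  # [2, 12, 30]
```

Threads, unlike processes, do not pickle their tasks, so lambdas and closures can be passed to starmap; the trade-off is that pure-Python work remains serialized by the GIL.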
- class bosk.executor.parallel.greedy.GreedyParallelExecutor(pipeline, stage, inputs=None, outputs=None, slot_handler=None, block_executor=None, parallel_engine=MultiprocessingParallelEngine())
Bases:
bosk.executor.base.BaseExecutor
The recursive executor implementation.
Considers only input-output slot information to match slots.
- _conn_map
Pipeline connections, represented as a hash map whose keys are blocks’ input slots and whose values are the output slots connected to them. Each input slot corresponds to at most one output slot, so this representation is correct.
- Parameters:
pipeline (bosk.pipeline.BasePipeline) – Sets BaseExecutor.__pipeline.
stage (bosk.executor.base.Stage) – Sets BaseExecutor.__stage.
inputs (Optional[Sequence[str]]) – Sets BaseExecutor.__inputs.
outputs (Optional[Sequence[str]]) – Sets BaseExecutor.__outputs.
slot_handler (Optional[bosk.executor.base.BaseSlotHandler]) – Sets BaseExecutor.__slot_handler with the _prepare_slot_handler method.
block_executor (Optional[bosk.executor.base.BaseBlockExecutor]) – Sets BaseExecutor.__block_executor with the _prepare_block_executor method.
parallel_engine (ParallelEngine) – Parallel execution engine used to run blocks.
- property outputs: Optional[frozenset[str]]
Getter for the executor’s outputs set. None if there are no restrictions on the pipeline’s outputs.
- Return type:
Optional[frozenset[str]]
- _conn_map: Mapping[bosk.block.base.BlockInputSlot, bosk.block.base.BlockOutputSlot]
- _prepare_out_to_in_edges()
Prepare the mapping from output slots to lists of input slots.
- Returns:
Dictionary with output slots as keys, lists of the corresponding input slots as values.
- Return type:
Dict[bosk.block.base.BlockOutputSlot, List[bosk.block.base.BlockInputSlot]]
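Because each input slot has at most one source while one output slot may feed several input slots, inverting the connection map yields a one-to-many mapping. A minimal sketch, assuming slots are represented as hypothetical (block_name, slot_name) tuples rather than bosk's BlockInputSlot/BlockOutputSlot objects:

```python
from collections import defaultdict
from typing import Dict, List, Mapping, Tuple

# Hypothetical slot representation: (block_name, slot_name) pairs.
Slot = Tuple[str, str]


def prepare_out_to_in_edges(conn_map: Mapping[Slot, Slot]) -> Dict[Slot, List[Slot]]:
    """Invert an input->output connection map into output->[inputs]."""
    out_to_in: Dict[Slot, List[Slot]] = defaultdict(list)
    for in_slot, out_slot in conn_map.items():
        out_to_in[out_slot].append(in_slot)
    return dict(out_to_in)


conn_map = {
    ("scaler", "X"): ("input", "X"),
    ("forest", "X"): ("scaler", "output"),
    ("boosting", "X"): ("scaler", "output"),
}
print(prepare_out_to_in_edges(conn_map))
# {('input', 'X'): [('scaler', 'X')], ('scaler', 'output'): [('forest', 'X'), ('boosting', 'X')]}
```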
- _get_blocks(output_slots)
Get all blocks that should be executed to compute the given output slots.
- Parameters:
output_slots (Set[bosk.block.base.BlockOutputSlot]) – Set of output slots.
- Returns:
Set of the pipeline blocks.
- Return type:
Set[bosk.block.base.BaseBlock]
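One way to collect the blocks needed for a set of outputs is a backward traversal: start from the blocks owning the requested output slots and follow each input slot through the connection map to the block that feeds it. This is a sketch under assumptions, not bosk's code; the (block_name, slot_name) tuples and the inputs_by_block dictionary are hypothetical stand-ins for bosk's slot objects and _prepare_inputs_by_block.

```python
from typing import Mapping, Set, Tuple

Slot = Tuple[str, str]  # hypothetical (block_name, slot_name)


def get_blocks(
    output_slots: Set[Slot],
    conn_map: Mapping[Slot, Slot],
    inputs_by_block: Mapping[str, Set[Slot]],
) -> Set[str]:
    """Collect every block that the requested output slots depend on."""
    required: Set[str] = set()
    # Start from the blocks that own the requested output slots.
    stack = [block for block, _ in output_slots]
    while stack:
        block = stack.pop()
        if block in required:
            continue
        required.add(block)
        # Follow each input slot back to the output slot that feeds it.
        for in_slot in inputs_by_block.get(block, set()):
            source = conn_map.get(in_slot)
            if source is not None:
                stack.append(source[0])
    return required


conn_map = {
    ("scaler", "X"): ("input", "X"),
    ("forest", "X"): ("scaler", "output"),
    ("boosting", "X"): ("scaler", "output"),
}
inputs_by_block = {
    "input": set(),
    "scaler": {("scaler", "X")},
    "forest": {("forest", "X")},
    "boosting": {("boosting", "X")},
}
print(sorted(get_blocks({("forest", "output")}, conn_map, inputs_by_block)))
# ['forest', 'input', 'scaler']
```

The unrequested "boosting" block is skipped, which is the point of computing the block set up front.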
- _prepare_inputs_by_block()
Prepare the mapping from blocks to their inputs.
- Returns:
Dictionary with blocks as keys, sets of the corresponding input slots as values.
- Return type:
Dict[bosk.block.base.BaseBlock, Set[bosk.block.base.BlockInputSlot]]
- _prepare_inputs(block, input_slot_values)
Prepare the mapping of inputs needed for the block.
- Parameters:
block – The block for which input values are needed.
input_slot_values (Mapping[bosk.block.base.BlockInputSlot, bosk.data.BaseData]) – Mapping from input slots to the corresponding data.
- Returns:
Mapping from input slots to the corresponding data for the given block.
- Return type:
Mapping[bosk.block.base.BlockInputSlot, bosk.data.BaseData]
- _compute_all_plain(blocks, computed_values)
Filter plain blocks and compute them.
Plain block execution is assumed to be computationally cheap.
- Parameters:
blocks (Sequence[bosk.block.base.BaseBlock]) – Blocks that potentially can be computed (not necessarily plain).
computed_values (Mapping[bosk.block.base.BlockInputSlot, bosk.data.BaseData]) – Mapping from input slots to the already computed data.
- Returns:
Mapping from BlockOutputSlot to Data.
- Return type:
Mapping[bosk.block.base.BlockOutputSlot, bosk.data.BaseData]
- _compute_all_parallel(blocks, computed_values, parallel)
Filter blocks that can be computed in parallel and compute them.
- Parameters:
blocks (Sequence[bosk.block.base.BaseBlock]) – All blocks that potentially can be computed.
computed_values (Mapping[bosk.block.base.BlockInputSlot, bosk.data.BaseData]) – Mapping from input slots to the already computed data.
parallel (ParallelEngine) – Parallel engine instance.
- Returns:
Mapping from BlockOutputSlot to Data.
- Return type:
Mapping[bosk.block.base.BlockOutputSlot, bosk.data.BaseData]
- _compute_all_non_threadsafe(blocks, computed_values)
Filter blocks that are not plain and cannot be computed in parallel, and compute them.
- Parameters:
blocks (Sequence[bosk.block.base.BaseBlock]) – All blocks that potentially can be computed.
computed_values (Mapping[bosk.block.base.BlockInputSlot, bosk.data.BaseData]) – Mapping from input slots to the already computed data.
- Returns:
Mapping from BlockOutputSlot to Data.
- Return type:
OutSlotToData
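The three _compute_all_* methods each filter the same set of ready blocks, which amounts to a three-way split. A minimal sketch of that split, assuming a hypothetical Block descriptor whose plain and thread_safe flags stand in for however bosk actually marks blocks:

```python
from dataclasses import dataclass
from typing import List, Sequence, Tuple


@dataclass(frozen=True)
class Block:
    """Hypothetical block descriptor; the flag names are assumptions."""
    name: str
    plain: bool = False        # cheap enough to run inline
    thread_safe: bool = True   # may run concurrently with other blocks


def partition(blocks: Sequence[Block]) -> Tuple[List[Block], List[Block], List[Block]]:
    """Split ready blocks into plain, parallel and non-thread-safe groups."""
    plain = [b for b in blocks if b.plain]
    parallel = [b for b in blocks if not b.plain and b.thread_safe]
    sequential = [b for b in blocks if not b.plain and not b.thread_safe]
    return plain, parallel, sequential


ready = [Block("stack", plain=True), Block("forest"), Block("gpu_model", thread_safe=False)]
plain, parallel, sequential = partition(ready)
print([b.name for b in plain], [b.name for b in parallel], [b.name for b in sequential])
# ['stack'] ['forest'] ['gpu_model']
```

Plain blocks skip the engine because dispatch overhead would outweigh their cost; non-thread-safe blocks run one at a time in the calling thread.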
- _clean_unnecessary_data(computed_values, remaining_blocks)
Remove the intermediate data (execution results) that will not be required in the future.
- Parameters:
computed_values (Dict[bosk.block.base.BlockInputSlot, bosk.data.BaseData]) – Dictionary of already computed values.
remaining_blocks (Set[bosk.block.base.BaseBlock]) – Set of blocks that should be computed in the next steps.
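Freeing intermediate results as soon as no remaining block consumes them keeps peak memory low. The sketch below assumes, hypothetically, that slots are (block_name, slot_name) tuples and that the dictionary is modified in place (the documented method declares no return value); it is an illustration, not bosk's implementation.

```python
from typing import Any, Dict, Set, Tuple

Slot = Tuple[str, str]  # hypothetical (block_name, slot_name)


def clean_unnecessary_data(computed_values: Dict[Slot, Any], remaining_blocks: Set[str]) -> None:
    """Drop values whose consuming block is no longer pending (modifies the dict in place)."""
    # Iterate over a snapshot of the keys because entries are deleted during the loop.
    for in_slot in list(computed_values):
        block_name = in_slot[0]
        if block_name not in remaining_blocks:
            del computed_values[in_slot]


values = {("scaler", "X"): [1.0, 2.0], ("forest", "X"): [0.1, 0.2]}
clean_unnecessary_data(values, remaining_blocks={"forest"})
print(values)  # {('forest', 'X'): [0.1, 0.2]}
```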
- _find_ready_blocks(computed_values, remaining_blocks)
Find the blocks for which required inputs are already computed.
- Parameters:
computed_values (Dict[bosk.block.base.BlockInputSlot, bosk.data.BaseData]) – Mapping from input slots to the corresponding computed data.
remaining_blocks (Set[bosk.block.base.BaseBlock]) – Set of blocks which haven’t been computed yet.
- Returns:
List of blocks which are ready to be computed.
- Return type:
List[bosk.block.base.BaseBlock]
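A block is ready once every one of its input slots already has a value. A minimal sketch, again assuming hypothetical (block_name, slot_name) tuples and an inputs_by_block dictionary; the result is sorted only to make the output deterministic, which bosk may not do.

```python
from typing import Any, List, Mapping, Set, Tuple

Slot = Tuple[str, str]  # hypothetical (block_name, slot_name)


def find_ready_blocks(
    computed_values: Mapping[Slot, Any],
    remaining_blocks: Set[str],
    inputs_by_block: Mapping[str, Set[Slot]],
) -> List[str]:
    """Return the remaining blocks whose every input slot already has a value."""
    return sorted(
        block
        for block in remaining_blocks
        if all(slot in computed_values for slot in inputs_by_block.get(block, set()))
    )


inputs_by_block = {
    "scaler": {("scaler", "X")},
    "forest": {("forest", "X")},
    "stack": {("stack", "a"), ("stack", "b")},
}
computed = {("scaler", "X"): [1.0], ("stack", "a"): [2.0]}
print(find_ready_blocks(computed, {"scaler", "forest", "stack"}, inputs_by_block))  # ['scaler']
```

"stack" is not ready because only one of its two inputs is available.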
- __append_outputs(output_values, computed_values, output_slots, new_outputs)
Append newly computed outputs.
- Parameters:
output_values (Dict[bosk.block.base.BlockOutputSlot, bosk.data.BaseData]) – Final output values (will be modified).
computed_values (Dict[bosk.block.base.BlockInputSlot, bosk.data.BaseData]) – Computed values required for following blocks computation (will be modified).
output_slots (Set[bosk.block.base.BlockOutputSlot]) – Set of output slots.
new_outputs (bosk.block.base.BlockOutputData) – Newly computed outputs.
- __execute_with_parallel(input_values, parallel)
Pipeline execution with given parallel engine instance.
- Parameters:
input_values (Mapping[str, bosk.data.BaseData]) – Input values data mapping.
parallel (ParallelEngine) – Parallel engine instance.
- Returns:
Dictionary with output slots as keys and computed data as values.
- Return type:
Dict[bosk.block.base.BlockOutputSlot, bosk.data.BaseData]
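Taken together, the helper methods describe a greedy schedule: repeatedly find the blocks whose inputs are all available, run them concurrently, and propagate their outputs to the input slots they feed. The sketch below is a simplified, hypothetical version (execute_greedy, the tuple slots and the function-per-block representation are assumptions, not bosk's API); it sends every ready block to a ThreadPool and omits the plain/non-thread-safe split and the data-cleaning step.

```python
from multiprocessing.pool import ThreadPool
from typing import Any, Callable, Dict, List, Mapping, Tuple

Slot = Tuple[str, str]  # hypothetical (block_name, slot_name)


def execute_greedy(
    funcs: Mapping[str, Callable[..., Dict[str, Any]]],
    inputs_by_block: Mapping[str, List[Slot]],
    conn_map: Mapping[Slot, Slot],
    input_values: Mapping[Slot, Any],
    n_threads: int = 2,
) -> Dict[Slot, Any]:
    """Run every block as soon as all of its inputs are available."""
    computed: Dict[Slot, Any] = dict(input_values)  # input slot -> data
    outputs: Dict[Slot, Any] = {}                   # output slot -> data
    remaining = set(funcs)
    with ThreadPool(n_threads) as pool:
        while remaining:
            ready = sorted(
                b for b in remaining if all(s in computed for s in inputs_by_block[b])
            )
            if not ready:
                raise RuntimeError("some blocks have inputs that can never be computed")
            # Run all ready blocks concurrently; each receives its inputs in slot order.
            args = [(funcs[b], [computed[s] for s in inputs_by_block[b]]) for b in ready]
            results = pool.starmap(lambda f, xs: f(*xs), args)
            for block, result in zip(ready, results):
                remaining.discard(block)
                for slot_name, value in result.items():
                    out_slot = (block, slot_name)
                    outputs[out_slot] = value
                    # Propagate the new value to every input slot it feeds.
                    for in_slot, src in conn_map.items():
                        if src == out_slot:
                            computed[in_slot] = value
    return outputs


funcs = {
    "double": lambda x: {"y": 2 * x},
    "square": lambda x: {"y": x * x},
    "add": lambda a, b: {"y": a + b},
}
inputs_by_block = {
    "double": [("double", "x")],
    "square": [("square", "x")],
    "add": [("add", "a"), ("add", "b")],
}
conn_map = {("add", "a"): ("double", "y"), ("add", "b"): ("square", "y")}
result = execute_greedy(funcs, inputs_by_block, conn_map, {("double", "x"): 3, ("square", "x"): 4})
print(result[("add", "y")])  # 22
```

In the first round "double" and "square" run side by side; "add" becomes ready only in the second round, once both of its inputs have been propagated.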
- execute(input_values)
Executes the pipeline on the given BaseData inputs and returns the BaseData output values.
- Parameters:
input_values (Mapping[str, bosk.data.BaseData]) – Input data.
- Returns:
Calculated output data dictionary that maps output names to the data.
- Return type:
Dict[str, bosk.data.BaseData]