Source code for evalquiz_pipeline_server.pipeline_execution.pipeline

from evalquiz_proto.shared.exceptions import (
    PipelineModuleCompositionNotValidException,
)
from evalquiz_pipeline_server.pipeline_execution.internal_pipeline_module import (
    InternalPipelineModule,
)


[docs]class Pipeline: def __init__(self, reference: str, pipeline_modules: list[InternalPipelineModule]): """Constructor of Pipeline. Args: reference (str): Reference of pipeline. pipeline_modules (list[InternalPipelineModule]): Implementations of pipeline modules describing the pipeline module execution order. Raises: PipelineModuleCompositionNotValidException """ self.reference = reference self.pipeline_modules = pipeline_modules if ( not self.validate_io_composition() or not self.validate_split_merge_composition() ): raise PipelineModuleCompositionNotValidException()
[docs] def validate_io_composition(self) -> bool: """Validates IO compatibility types of successive InternalPipelineModules. Returns: bool: True, if IO is compatible. """ for first, second in zip(self.pipeline_modules, self.pipeline_modules[1:]): if first.output_datatype != second.input_datatype: return False return True
[docs] def validate_split_merge_composition(self) -> bool: """Validates split-merge compatibility of Pipeline. Returns: bool: True, if split-merge flags are compatible. """ split = False merge = False for pipeline_module_implementation in self.pipeline_modules: if pipeline_module_implementation.split: if split: return False split = True merge = False if pipeline_module_implementation.merge: if not split or merge: return False merge = True split = False if split: return False return True