Source code for evalquiz_pipeline_server.pipeline_execution.pipeline_executor

from collections import defaultdict
from typing import Any, AsyncIterator, Optional

from evalquiz_pipeline_server.pipeline_execution.pipeline import Pipeline
from evalquiz_pipeline_server.pipeline_execution.pipeline_execution import (
    PipelineExecution,
)
from evalquiz_pipeline_server.pipeline_execution.internal_pipeline_module import (
    InternalPipelineModule,
)
from evalquiz_proto.shared.generated import PipelineStatus


class PipelineExecutor:
    """Keeps a registry of pipelines and starts executions on them.

    Pipelines are stored in ``self.pipelines`` under a string reference and
    can be added, deleted and run through the methods of this class.
    """

    def __init__(
        self,
        pipelines: Optional[defaultdict[str, Pipeline]] = None,
    ) -> None:
        """Constructor of PipelineExecutor.

        Args:
            pipelines (defaultdict[str, Pipeline], optional): Available
                pipelines to execute. Defaults to None, in which case a new,
                empty defaultdict is created for this instance.
        """
        # A `defaultdict()` written directly in the signature is evaluated
        # only once, so every executor constructed without arguments would
        # share (and mutate) the very same registry. Build it per instance.
        # `is None` (not truthiness) so an empty mapping passed by the caller
        # is still used as-is.
        self.pipelines: defaultdict[str, Pipeline] = (
            defaultdict() if pipelines is None else pipelines
        )

    def add_pipeline(
        self, reference: str, pipeline_modules: list[InternalPipelineModule]
    ) -> None:
        """Adds a pipeline to self.pipelines

        An existing pipeline stored under the same reference is replaced.

        Args:
            reference (str): Reference that the pipeline can be accessed under.
            pipeline_modules (list[InternalPipelineModule]): Implementations of
                pipeline modules describing the pipeline module execution order.
        """
        self.pipelines[reference] = Pipeline(reference, pipeline_modules)

    def delete_pipeline(self, reference: str) -> None:
        """Deletes a pipeline from self.pipelines

        Does nothing if no pipeline is stored under the given reference.

        Args:
            reference (str): Reference of pipeline that should be deleted.
        """
        # pop() with a default removes the entry when present and stays silent
        # otherwise, without a separate membership test.
        self.pipelines.pop(reference, None)

    def run_pipeline(self, reference: str, input: Any) -> AsyncIterator[PipelineStatus]:
        """Creates and runs a new PipelineExecution on specified pipeline.

        Args:
            reference (str): Reference of pipeline that should be executed.
            input (Any): Input of PipelineExecution.

        Returns:
            AsyncIterator[PipelineStatus]: Iterator with PipelineStatus of the
                current execution.

        Raises:
            KeyError: If no pipeline is stored under the reference and
                self.pipelines has no default factory (as is the case for the
                registry created by the constructor).
        """
        pipeline = self.pipelines[reference]
        pipeline_execution = PipelineExecution(input, pipeline)
        return pipeline_execution.run()