chain

Chain execution service module

Attributes

Classes

ChainService

ChainService(
    sa_session: SASession, neo4j_session: NJSession
)

Bases: BaseService

Eleanor LLM chain execution service layer

Functions

invoke
invoke(
    chain_ref: Union[str, RunnableSerializable],
    templates: List[Dict[str, Any]],
    max_tries: Optional[int] = None,
    trace_session_id: Optional[str] = None,
    trace_user_id: Optional[str] = None,
    trace_metadata: (
        Union[ChainTraceMetadata, Dict] | None
    ) = None,
    trace_tags: List[str] | None = None,
    trace_id: Optional[str] = None,
) -> InvokeServiceResponse

Invokes a chain by executing a batch of inputs.

Parameters:

  • chain_ref (Union[str, RunnableSerializable]) –

    The chain to invoke, given either as a chain reference string or as a RunnableSerializable instance.

  • templates (List[Dict[str, Any]]) –

    The list of input templates to be processed by the chain.

  • max_tries (int, default: None ) –

    The maximum number of tries for executing the chain. Defaults to None, which is treated as 1.

  • trace_session_id (str, default: None ) –

    The session ID for tracing purposes. Defaults to None.

  • trace_user_id (str, default: None ) –

    The user ID for tracing purposes. Defaults to None.

  • trace_metadata (Union[ChainTraceMetadata, Dict] | None, default: None ) –

    The metadata for tracing purposes. Defaults to None.

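  • trace_tags (List[str], default: None ) –

    The list of tags for tracing purposes. Defaults to None, which is treated as an empty list.

  • trace_id (str, default: None ) –

    The trace ID for tracing purposes. Defaults to None.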

Returns:

  • InvokeServiceResponse ( InvokeServiceResponse ) –

    The response object containing the results of the chain invocation.

invoke_and_process
invoke_and_process(
    chain_ref: Union[str, RunnableSerializable],
    templates: List[Dict[str, Any]],
    success_callback: (
        Callable[[int, InvokeServiceResponseRecord], None]
        | None
    ) = None,
    failure_callback: (
        Callable[[int, InvokeServiceResponseRecord], None]
        | None
    ) = None,
    units_of_work_ref: List[Any] | None = None,
    max_tries: int | None = None,
    trace_session_id: str | None = None,
    trace_user_id: str | None = None,
    trace_metadata: (
        Union[ChainTraceMetadata, Dict] | None
    ) = None,
    trace_tags: List[str] | None = None,
    trace_id: str | None = None,
    raise_on_error: bool = False,
    log_level: int = WARNING,
) -> InvokeServiceResponse

Wrapper around the chain_service.invoke method that calls the success or failure callback for each result and removes failed entries from units_of_work_ref.

Examples:

import logging

def handle_success(uow_index: int, response: InvokeServiceResponseRecord):
    print(f"Success at index {uow_index}: {response}")

def handle_failure(uow_index: int, response: InvokeServiceResponseRecord):
    print(f"Failure at index {uow_index}: {response}")

units_of_work = [{"input": "data1"}, {"input": "data2"}, {"input": "data3"}]

response = chain_service.invoke_and_process(
    chain_ref="my_chain",
    templates=units_of_work,
    success_callback=handle_success,
    failure_callback=handle_failure,
    units_of_work_ref=units_of_work,  # failed units are removed from this list
    max_tries=3,
    trace_session_id="session_123",
    trace_user_id="user_456",
    trace_metadata={"dynamic": True},
    trace_tags=["tag1", "tag2"],
    trace_id="trace_789",
    raise_on_error=False,
    log_level=logging.WARNING,
)

Parameters:

  • chain_ref (Union[str, RunnableSerializable]) –

    The reference to the chain.

  • templates (List[Dict[str, Any]]) –

    A list of dictionaries used as templates for invocation.

  • success_callback (Callable[[int, InvokeServiceResponseRecord], None], default: None ) –

    Callback for successful responses.

  • failure_callback (Callable[[int, InvokeServiceResponseRecord], None], default: None ) –

    Callback for failed responses.

  • units_of_work_ref (List[Any] | None, default: None ) –

    Reference to a list that will have failed units removed. Defaults to None.

  • max_tries (int | None, default: None ) –

    Maximum number of tries for the invocation. Defaults to None.

  • trace_session_id (str | None, default: None ) –

    Session identifier for the trace. Defaults to None.

  • trace_user_id (str | None, default: None ) –

    User identifier for the trace. Defaults to None.

  • trace_metadata (Union[ChainTraceMetadata, Dict] | None, default: None ) –

    Metadata for tracing. Defaults to None.

  • trace_tags (List[str] | None, default: None ) –

    Tags related to the trace. Defaults to None.

  • trace_id (str | None, default: None ) –

    Trace identifier for the invocation. Defaults to None.

  • raise_on_error (bool, default: False ) –

    If True, raises a ValueError on failure; otherwise, logs the error. Defaults to False.

  • log_level (int, default: WARNING ) –

    The logging level to use when logging errors. Defaults to logging.WARNING.

Returns:

  • InvokeServiceResponse ( InvokeServiceResponse ) –

    The response object returned by the chain_service.invoke method.
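
The callback dispatch and unit filtering described above can be sketched roughly as follows. This is a minimal illustration, not the service's actual code: RecordSketch, ResponseSketch, and process_response are hypothetical stand-ins, and the sketch assumes that a status of 0 means success and that keys in results_map line up with positions in units_of_work_ref. Whether the real method raises on the first failure or only after all records are processed is not stated in this reference; the sketch raises immediately.

```python
import logging
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

logger = logging.getLogger("chain_sketch")


@dataclass
class RecordSketch:
    """Hypothetical stand-in for InvokeServiceResponseRecord."""
    status: int = 1  # assumption: 0 means success, anything else is an error
    text: str = ""


@dataclass
class ResponseSketch:
    """Hypothetical stand-in for InvokeServiceResponse."""
    results_map: Dict[int, RecordSketch] = field(default_factory=dict)


Callback = Callable[[int, RecordSketch], None]


def process_response(
    response: ResponseSketch,
    success_callback: Optional[Callback] = None,
    failure_callback: Optional[Callback] = None,
    units_of_work_ref: Optional[List[Any]] = None,
    raise_on_error: bool = False,
    log_level: int = logging.WARNING,
) -> ResponseSketch:
    failed: List[int] = []
    for index, record in sorted(response.results_map.items()):
        if record.status == 0:
            if success_callback:
                success_callback(index, record)
            continue
        failed.append(index)
        if failure_callback:
            failure_callback(index, record)
        if raise_on_error:
            raise ValueError(f"Chain invocation failed at index {index}: {record.text}")
        logger.log(log_level, "Chain invocation failed at index %d: %s", index, record.text)
    if units_of_work_ref is not None:
        # Delete from the end so that earlier indices stay valid; mutates in place.
        for index in reversed(failed):
            del units_of_work_ref[index]
    return response


units_of_work = [{"input": "data1"}, {"input": "data2"}, {"input": "data3"}]
response = ResponseSketch(results_map={
    0: RecordSketch(status=0, text="ok"),
    1: RecordSketch(status=1, text="model timeout"),
    2: RecordSketch(status=0, text="ok"),
})
events: List[str] = []
process_response(
    response,
    success_callback=lambda i, rec: events.append(f"success:{i}"),
    failure_callback=lambda i, rec: events.append(f"failure:{i}"),
    units_of_work_ref=units_of_work,
)
```

Because the list passed as units_of_work_ref is mutated in place, the caller's own list ends up holding only the units that succeeded, which is presumably why the documented example passes the same list as both templates and units_of_work_ref.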

map_reduce
map_reduce(
    doc_producer: Iterable[str],
    base_template: Dict[str, Any],
    reducer_chain_ref: str,
    chain_documents_key: str = FIELD_DOCUMENTS,
    chain_documents_formatter: Optional[
        Callable[[List[str]], str]
    ] = None,
    reduce_batch_size: int = 200,
    max_mr_iterations: int = 10,
    trace_session_id: Optional[str] = None,
    trace_user_id: Optional[str] = None,
    trace_metadata: ChainTraceMetadata | None = None,
    trace_tags: List[str] | None = None,
    trace_id: Optional[str] = None,
) -> str

Perform a map-reduce operation using a reducer chain.

Parameters:

  • doc_producer (Iterable[str]) –

    An iterable of document strings to be processed.

  • base_template (Dict[str, Any]) –

    The base template used for the map-reduce operation.

  • reducer_chain_ref (str) –

    The reference to the reducer chain to be used.

  • chain_documents_key (str, default: FIELD_DOCUMENTS ) –

    The key used to access the documents in the chain. Defaults to FIELD_DOCUMENTS.

  • chain_documents_formatter (Optional[Callable[[List[str]], str]], default: None ) –

    A formatter function to format the chain documents. Defaults to None.

  • reduce_batch_size (int, default: 200 ) –

    The batch size used for the reduce operation. Defaults to 200.

  • max_mr_iterations (int, default: 10 ) –

    The maximum number of map-reduce iterations. Defaults to 10.

  • trace_session_id (Optional[str], default: None ) –

    The session ID for tracing purposes. Defaults to None.

  • trace_user_id (Optional[str], default: None ) –

    The user ID for tracing purposes. Defaults to None.

  • trace_metadata (ChainTraceMetadata | None, default: None ) –

    The metadata for tracing purposes. Defaults to None.

  • trace_tags (List[str] | None, default: None ) –

    The tags for tracing purposes. Defaults to None.

  • trace_id (Optional[str], default: None ) –

    The ID for tracing purposes. Defaults to None.

Returns:

  • str ( str ) –

    The result of the map-reduce operation.
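
To make the roles of reduce_batch_size and max_mr_iterations concrete, here is a minimal sketch of an iterative batch-and-reduce loop. It is an assumption about how such a method could work, not the documented implementation: the reducer chain is replaced by a plain callable named reducer, the default key "documents" and the newline-joining default formatter are placeholders (the real value of FIELD_DOCUMENTS and the real default formatter are not given here), and raising RuntimeError when the iteration limit is reached is a guess.

```python
from itertools import islice
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional


def batched(items: Iterable[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive lists of at most `size` items."""
    iterator = iter(items)
    while batch := list(islice(iterator, size)):
        yield batch


def map_reduce_sketch(
    doc_producer: Iterable[str],
    base_template: Dict[str, Any],
    reducer: Callable[[Dict[str, Any]], str],  # hypothetical stand-in for the reducer chain
    chain_documents_key: str = "documents",    # placeholder for FIELD_DOCUMENTS
    chain_documents_formatter: Optional[Callable[[List[str]], str]] = None,
    reduce_batch_size: int = 200,
    max_mr_iterations: int = 10,
) -> str:
    formatter = chain_documents_formatter or "\n".join  # assumed default
    docs: Iterable[str] = doc_producer
    for _ in range(max_mr_iterations):
        reduced: List[str] = []
        for batch in batched(docs, reduce_batch_size):
            # Each batch is formatted and merged into a copy of the base template.
            template = {**base_template, chain_documents_key: formatter(batch)}
            reduced.append(reducer(template))
        if len(reduced) <= 1:
            return reduced[0] if reduced else ""
        docs = reduced  # reduce the intermediate outputs again
    raise RuntimeError("map-reduce did not converge within max_mr_iterations")


calls: List[Dict[str, Any]] = []


def echo_reducer(template: Dict[str, Any]) -> str:
    calls.append(template)
    return template["documents"]


result = map_reduce_sketch(
    doc_producer=["a", "b", "c", "d", "e"],
    base_template={"topic": "demo"},
    reducer=echo_reducer,
    chain_documents_formatter="|".join,
    reduce_batch_size=2,
)
```

With five documents and a batch size of 2, the loop runs three passes (3, 2, then 1 reducer calls) before a single output remains.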

ChainTraceMetadata

Bases: BaseModel

Langfuse chain execution trace metadata object

Attributes

chain class-attribute instance-attribute
chain: Dict = Field(
    default_factory=dict, description="Chain metadata"
)
chat class-attribute instance-attribute
chat: Dict = Field(
    default_factory=dict, description="Chat context"
)
memories class-attribute instance-attribute
memories: Any = Field(
    default_factory=dict, description="Recalled memories"
)

InvokeServiceResponse

Bases: BaseModel

Response object for chain invocation

Attributes

completion_tokens class-attribute instance-attribute
completion_tokens: Optional[int] = Field(
    default=None,
    title="Completion tokens",
    description="Completion tokens used in chain",
)
execute_seconds class-attribute instance-attribute
execute_seconds: float = Field(
    default=0.0, title="Execution time in seconds"
)
prompt_tokens class-attribute instance-attribute
prompt_tokens: Optional[int] = Field(
    default=None,
    title="Prompt tokens",
    description="Prompt tokens used in chain",
)
result_iterator property
result_iterator: Iterator[
    Tuple[int, InvokeServiceResponseRecord]
]

Returns an iterator over the items in the results_map.

Note that we use our own custom property because we don’t want to break compatibility with __iter__ in the base Pydantic model.

Yields:

  • Tuple[int, InvokeServiceResponseRecord]

    A tuple containing an integer key and a record from the results_map.
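
The reason for a separate property can be illustrated with a small sketch. Pydantic's BaseModel already defines __iter__ to yield (field_name, value) pairs, so overriding it would change what iterating over the model means. The classes below are hypothetical dataclass stand-ins that mimic that behavior without depending on Pydantic; they are not the real models.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Dict, Iterator, Tuple


@dataclass
class RecordSketch:
    """Hypothetical stand-in for InvokeServiceResponseRecord."""
    status: int = 0
    text: str = ""


@dataclass
class ResponseSketch:
    """Hypothetical stand-in for InvokeServiceResponse (not a Pydantic model)."""
    results_map: Dict[int, RecordSketch] = field(default_factory=dict)
    total_success: int = 0

    def __iter__(self) -> Iterator[Tuple[str, Any]]:
        # Mimics Pydantic's BaseModel.__iter__, which yields (field_name, value).
        for f in fields(self):
            yield f.name, getattr(self, f.name)

    @property
    def result_iterator(self) -> Iterator[Tuple[int, RecordSketch]]:
        # A separate property leaves the field-wise __iter__ untouched.
        return iter(self.results_map.items())


response = ResponseSketch(
    results_map={0: RecordSketch(text="a"), 1: RecordSketch(text="b")},
    total_success=2,
)
field_names = [name for name, _ in response]
indexed_texts = [(index, record.text) for index, record in response.result_iterator]
```

Iterating over the object itself still walks its fields, while result_iterator walks the per-input records.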

results_map class-attribute instance-attribute
results_map: Dict[int, InvokeServiceResponseRecord] = Field(
    default_factory=dict,
    title="Invocation results",
    description="A dictionary of invocation results, where the key is the index of the input in the original input list, and the value is the response record.",
)
total_failure class-attribute instance-attribute
total_failure: int = Field(
    default=0, title="Number of failed invocations"
)
total_retries class-attribute instance-attribute
total_retries: int = Field(
    default=0, title="Total number of retries"
)
total_success class-attribute instance-attribute
total_success: int = Field(
    default=0, title="Number of successful invocations"
)
total_tokens class-attribute instance-attribute
total_tokens: Optional[int] = Field(
    default=None,
    title="Total tokens",
    description="Total tokens used in chain",
)
trace_id class-attribute instance-attribute
trace_id: str = Field(
    default_factory=lambda: gen_uuid("trace_id."),
    title="Trace identifier",
)

InvokeServiceResponseRecord

Bases: BaseModel

Response object that contains result details of a chain invocation

Attributes

echo_fields class-attribute instance-attribute
echo_fields: Dict[str, Any] = Field(
    default_factory=dict,
    title="Echo Fields",
    description="Contains a copy of the input fields used to render the prompt template.",
)
error_count class-attribute instance-attribute
error_count: int = Field(
    default=0,
    title="Error count",
    description="The number of errors that occurred during the invocation: at least 0 and less than max_retries",
)
raw class-attribute instance-attribute
raw: Any = Field(
    default=None,
    title="Raw response object",
    description="Raw output from LLM before it is converted to text. The intended use case is for calling the chain invoke service internally where we don't need to respond with plain text to an HTTP client.",
)
status class-attribute instance-attribute
status: ResponseCode = Field(
    default=LLM_ERROR,
    title=__doc__,
    description="When 0, the invocation was successful and the results will be in the text field. Otherwise, an error occurred and the text field will contain the error message.",
)
text class-attribute instance-attribute
text: str = Field(
    default="",
    title="Language model response",
    description="",
)
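
Since the text field holds either the model output or an error message depending on status, callers usually need to split records by status before using them. The helper below is a minimal sketch of that pattern. split_by_status is a hypothetical name, the records are simulated with SimpleNamespace, and comparing status to 0 assumes ResponseCode compares equal to plain integers.

```python
from types import SimpleNamespace
from typing import Any, Dict, Iterable, Tuple


def split_by_status(
    results: Iterable[Tuple[int, Any]],
) -> Tuple[Dict[int, str], Dict[int, str]]:
    """Separate (index, record) pairs into outputs and error messages."""
    outputs: Dict[int, str] = {}
    errors: Dict[int, str] = {}
    for index, record in results:
        # Assumption: status 0 means success; text then holds the model output.
        target = outputs if record.status == 0 else errors
        target[index] = record.text
    return outputs, errors


# Simulated records; in real use these would come from response.result_iterator.
simulated = [
    (0, SimpleNamespace(status=0, text="summary of doc 0")),
    (1, SimpleNamespace(status=2, text="rate limit exceeded")),
    (2, SimpleNamespace(status=0, text="summary of doc 2")),
]
outputs, errors = split_by_status(simulated)
```

Keeping the original input index as the key makes it straightforward to match each output or error back to the template that produced it.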

Functions