chain
Chain execution service module
Attributes
Classes
ChainService
Bases: BaseService
Eleanor LLM chain execution service layer
Functions
invoke
invoke(
chain_ref: Union[str, RunnableSerializable],
templates: List[Dict[str, Any]],
max_tries: Optional[int] = None,
trace_session_id: Optional[str] = None,
trace_user_id: Optional[str] = None,
trace_metadata: (
Union[ChainTraceMetadata, Dict] | None
) = None,
trace_tags: List[str] | None = None,
trace_id: Optional[str] = None,
) -> InvokeServiceResponse
Invokes a chain by executing a batch of inputs.
Parameters:
-
chain_ref
(Union[str, RunnableSerializable]
) –The chain to invoke, given either as a string reference or as a RunnableSerializable instance.
-
templates
(List[Dict[str, Any]]
) –The list of input templates to be processed by the chain.
-
max_tries
(Optional[int]
, default:None
) –The maximum number of tries for executing the chain. Defaults to None, documented as equivalent to 1 try.
-
trace_session_id
(Optional[str]
, default:None
) –The session ID for tracing purposes. Defaults to None.
-
trace_user_id
(Optional[str]
, default:None
) –The user ID for tracing purposes. Defaults to None.
-
trace_metadata
(Union[ChainTraceMetadata, Dict] | None
, default:None
) –The metadata for tracing purposes. Defaults to None.
-
trace_tags
(List[str] | None
, default:None
) –The list of tags for tracing purposes. Defaults to None, documented as equivalent to an empty list.
-
trace_id
(Optional[str]
, default:None
) –The trace ID for tracing purposes. Defaults to None.
Returns:
-
InvokeServiceResponse
(InvokeServiceResponse
) –The response object containing the results of the chain invocation.
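To make the relationship between templates, max_tries, and the per-input results concrete, here is a minimal sketch of a batch invocation with retries. It does not use the real ChainService: `run_batch`, `Record`, and `flaky_chain` are hypothetical stand-ins, and treating a None max_tries as a single try is an assumption based on the parameter description above.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Record:
    """Hypothetical stand-in for InvokeServiceResponseRecord."""
    status: int = 1          # 0 means success, anything else is an error
    text: str = ""
    error_count: int = 0


def run_batch(
    chain: Callable[[Dict[str, Any]], str],
    templates: List[Dict[str, Any]],
    max_tries: Optional[int] = None,
) -> Dict[int, Record]:
    """Run every template through `chain`, retrying failures up to max_tries."""
    tries = max_tries or 1   # assumption: None falls back to a single try
    results: Dict[int, Record] = {}
    for index, template in enumerate(templates):
        record = Record()
        for _ in range(tries):
            try:
                record.text = chain(template)
                record.status = 0
                break
            except Exception as exc:  # keep the last error message in text
                record.error_count += 1
                record.text = str(exc)
        results[index] = record
    return results


calls = {"n": 0}


def flaky_chain(template: Dict[str, Any]) -> str:
    """Fails on its first call only, to exercise the retry path."""
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("transient failure")
    return f"echo: {template['input']}"


results = run_batch(flaky_chain, [{"input": "a"}, {"input": "b"}], max_tries=2)
```

The returned dictionary is keyed by the position of each input in the original list, which mirrors how results_map is described for InvokeServiceResponse.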
invoke_and_process
invoke_and_process(
chain_ref: Union[str, RunnableSerializable],
templates: List[Dict[str, Any]],
success_callback: (
Callable[[int, InvokeServiceResponseRecord], None]
| None
) = None,
failure_callback: (
Callable[[int, InvokeServiceResponseRecord], None]
| None
) = None,
units_of_work_ref: List[Any] | None = None,
max_tries: int | None = None,
trace_session_id: str | None = None,
trace_user_id: str | None = None,
trace_metadata: (
Union[ChainTraceMetadata, Dict] | None
) = None,
trace_tags: List[str] | None = None,
trace_id: str | None = None,
raise_on_error: bool = False,
log_level: int = WARNING,
) -> InvokeServiceResponse
Wraps the chain_service.invoke method: calls the success or failure callback for each response record and removes failed units of work from units_of_work_ref.
Examples:
import logging
def handle_success(uow_index: int, response: InvokeServiceResponseRecord):
    # Called once per successful record, with the index of its unit of work.
    print(f"Success at index {uow_index}: {response}")
def handle_failure(uow_index: int, response: InvokeServiceResponseRecord):
    # Called once per failed record, with the index of its unit of work.
    print(f"Failure at index {uow_index}: {response}")
units_of_work = [{"input": "data1"}, {"input": "data2"}, {"input": "data3"}]
response = chain_service.invoke_and_process(
    chain_ref="my_chain",
    templates=units_of_work,
    success_callback=handle_success,
    failure_callback=handle_failure,
    units_of_work_ref=units_of_work,
    max_tries=3,
    trace_session_id="session_123",
    trace_user_id="user_456",
    trace_metadata={"dynamic": True},
    trace_tags=["tag1", "tag2"],
    trace_id="trace_789",
    raise_on_error=False,
    log_level=logging.WARNING,
)
Parameters:
-
chain_ref
(Union[str, RunnableSerializable]
) –The reference to the chain.
-
templates
(List[Dict[str, Any]]
) –A list of dictionaries used as templates for invocation.
-
success_callback
(Callable[[int, InvokeServiceResponseRecord], None]
, default:None
) –Callback for successful responses.
-
failure_callback
(Callable[[int, InvokeServiceResponseRecord], None]
, default:None
) –Callback for failed responses.
-
units_of_work_ref
(List[Any] | None
, default:None
) –Reference to a list that will have failed units removed. Defaults to None.
-
max_tries
(int | None
, default:None
) –Maximum number of tries for the invocation. Defaults to None.
-
trace_session_id
(str | None
, default:None
) –Session identifier for the trace. Defaults to None.
-
trace_user_id
(str | None
, default:None
) –User identifier for the trace. Defaults to None.
-
trace_metadata
(Union[ChainTraceMetadata, Dict] | None
, default:None
) –Metadata for tracing. Defaults to None.
-
trace_tags
(List[str] | None
, default:None
) –Tags related to the trace. Defaults to None.
-
trace_id
(str | None
, default:None
) –Trace identifier for the invocation. Defaults to None.
-
raise_on_error
(bool
, default:False
) –If True, raises a ValueError on failure; otherwise, logs the error. Defaults to False.
-
log_level
(int
, default:WARNING
) –The logging level to use when logging errors. Defaults to logging.WARNING.
Returns:
-
InvokeServiceResponse
(InvokeServiceResponse
) –The response object returned by the chain_service.invoke method.
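The post-processing step described above (dispatching callbacks, removing failed units, optionally raising) can be sketched as follows. This is not the library's implementation: `process_results` and `Record` are hypothetical names, logging is left out, and the in-place removal strategy is an assumption about how "failed units removed" could work.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Record:
    """Hypothetical stand-in for InvokeServiceResponseRecord."""
    status: int
    text: str


Callback = Callable[[int, Record], None]


def process_results(
    results_map: Dict[int, Record],
    success_callback: Optional[Callback] = None,
    failure_callback: Optional[Callback] = None,
    units_of_work_ref: Optional[List[Any]] = None,
    raise_on_error: bool = False,
) -> None:
    """Dispatch callbacks per record and drop failed units in place."""
    failed: List[int] = []
    for index, record in results_map.items():
        if record.status == 0:
            if success_callback:
                success_callback(index, record)
        else:
            failed.append(index)
            if failure_callback:
                failure_callback(index, record)
    if units_of_work_ref is not None:
        # Delete from the highest index down so earlier indexes stay valid.
        for index in sorted(failed, reverse=True):
            del units_of_work_ref[index]
    if failed and raise_on_error:
        raise ValueError(f"{len(failed)} invocation(s) failed: {failed}")


units = [{"input": "data1"}, {"input": "data2"}, {"input": "data3"}]
results = {0: Record(0, "ok"), 1: Record(1, "LLM error"), 2: Record(0, "ok")}
succeeded: List[int] = []
process_results(
    results,
    success_callback=lambda i, r: succeeded.append(i),
    units_of_work_ref=units,
)
```

Because the list is mutated in place, a caller that passes the same list as both templates and units_of_work_ref ends up holding only the units that succeeded.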
map_reduce
map_reduce(
doc_producer: Iterable[str],
base_template: Dict[str, Any],
reducer_chain_ref: str,
chain_documents_key: str = FIELD_DOCUMENTS,
chain_documents_formatter: Optional[
Callable[[List[str]], str]
] = None,
reduce_batch_size: int = 200,
max_mr_iterations: int = 10,
trace_session_id: Optional[str] = None,
trace_user_id: Optional[str] = None,
trace_metadata: ChainTraceMetadata | None = None,
trace_tags: List[str] | None = None,
trace_id: Optional[str] = None,
) -> str
Perform a map-reduce operation using a reducer chain.
Parameters:
-
doc_producer
(Iterable[str]
) –An iterable of document strings to be processed.
-
base_template
(Dict[str, Any]
) –The base template used for the map-reduce operation.
-
reducer_chain_ref
(str
) –The reference to the reducer chain to be used.
-
chain_documents_key
(str
, default:FIELD_DOCUMENTS
) –The key used to access the documents in the chain. Defaults to FIELD_DOCUMENTS.
-
chain_documents_formatter
(Optional[Callable[[List[str]], str]]
, default:None
) –A formatter function to format the chain documents. Defaults to None.
-
reduce_batch_size
(int
, default:200
) –The batch size used for the reduce operation. Defaults to 200.
-
max_mr_iterations
(int
, default:10
) –The maximum number of map-reduce iterations. Defaults to 10.
-
trace_session_id
(Optional[str]
, default:None
) –The session ID for tracing purposes. Defaults to None.
-
trace_user_id
(Optional[str]
, default:None
) –The user ID for tracing purposes. Defaults to None.
-
trace_metadata
(ChainTraceMetadata | None
, default:None
) –The metadata for tracing purposes. Defaults to None.
-
trace_tags
(List[str] | None
, default:None
) –The tags for tracing purposes. Defaults to None.
-
trace_id
(Optional[str]
, default:None
) –The ID for tracing purposes. Defaults to None.
Returns:
-
str
(str
) –The result of the map-reduce operation.
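One plausible reading of reduce_batch_size and max_mr_iterations is an iterative reduction: documents are grouped into batches, each batch is reduced to a single string, and the process repeats on the outputs until one string remains. The sketch below illustrates that reading only. The `reducer` callable stands in for the reducer chain, `join_reducer` is a toy, and raising on non-convergence is an assumption, not documented behavior.

```python
from typing import Callable, Iterable, List


def map_reduce(
    doc_producer: Iterable[str],
    reducer: Callable[[List[str]], str],
    reduce_batch_size: int = 200,
    max_mr_iterations: int = 10,
) -> str:
    """Repeatedly reduce batches of documents until one string remains."""
    docs = list(doc_producer)
    if not docs:
        return ""
    for _ in range(max_mr_iterations):
        # Split the current documents into batches and reduce each batch.
        docs = [
            reducer(docs[start:start + reduce_batch_size])
            for start in range(0, len(docs), reduce_batch_size)
        ]
        if len(docs) == 1:
            return docs[0]
    raise RuntimeError("did not converge within max_mr_iterations")


def join_reducer(batch: List[str]) -> str:
    """Toy reducer standing in for an LLM summarisation chain."""
    return "+".join(batch)


result = map_reduce(["a", "b", "c", "d", "e"], join_reducer, reduce_batch_size=2)
```

With a batch size of 2, five documents take three passes (5 to 3 to 2 to 1), which shows why an iteration cap is useful when the batch size is small relative to the input.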
ChainTraceMetadata
Bases: BaseModel
Langfuse chain execution trace metadata object
Attributes
chain
class-attribute
instance-attribute
chat
class-attribute
instance-attribute
InvokeServiceResponse
Bases: BaseModel
Response object for chain invocation
Attributes
completion_tokens
class-attribute
instance-attribute
completion_tokens: Optional[int] = Field(
default=None,
title="Completion tokens",
description="Completion tokens used in chain",
)
execute_seconds
class-attribute
instance-attribute
prompt_tokens
class-attribute
instance-attribute
prompt_tokens: Optional[int] = Field(
default=None,
title="Prompt tokens",
description="Prompt tokens used in chain",
)
result_iterator
property
result_iterator: Iterator[
Tuple[int, InvokeServiceResponseRecord]
]
Returns an iterator over the items in the results_map.
Note that this is a custom property rather than an override, so the behavior of __iter__ in the base Pydantic model is left intact.
Yields:
-
Tuple[int, InvokeServiceResponseRecord]
–A tuple containing an integer key and a record from the results_map.
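The design choice behind a dedicated property can be illustrated without Pydantic. In the sketch below, `FieldModel` is a hypothetical base class whose __iter__ yields (field name, value) pairs, similar in spirit to Pydantic's BaseModel, and `Response` is a stand-in for InvokeServiceResponse with plain strings as records.

```python
from typing import Dict, Iterator, Tuple


class FieldModel:
    """Hypothetical base whose __iter__ yields (field name, value) pairs."""

    def __iter__(self) -> Iterator[Tuple[str, object]]:
        return iter(vars(self).items())


class Response(FieldModel):
    """Hypothetical stand-in for InvokeServiceResponse."""

    def __init__(self, results_map: Dict[int, str]) -> None:
        self.results_map = results_map

    @property
    def result_iterator(self) -> Iterator[Tuple[int, str]]:
        # Separate property, so the inherited __iter__ keeps its meaning.
        return iter(self.results_map.items())


response = Response({0: "first", 1: "second"})
fields = dict(response)                    # inherited field iteration
records = list(response.result_iterator)   # iteration over results
```

Code that relies on iterating the model's fields keeps working, while callers interested in the invocation results use result_iterator explicitly.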
results_map
class-attribute
instance-attribute
results_map: Dict[int, InvokeServiceResponseRecord] = Field(
default_factory=dict,
title="Invocation results",
description="A dictionary of invocation results, where the key is the index of the input in the original input list, and the value is the response record.",
)
total_failure
class-attribute
instance-attribute
total_retries
class-attribute
instance-attribute
total_success
class-attribute
instance-attribute
total_tokens
class-attribute
instance-attribute
total_tokens: Optional[int] = Field(
default=None,
title="Total tokens",
description="Total tokens used in chain",
)
trace_id
class-attribute
instance-attribute
trace_id: str = Field(
default_factory=lambda: gen_uuid("trace_id."),
title="Trace identifier",
)
InvokeServiceResponseRecord
Bases: BaseModel
Response object that contains result details of a chain invocation
Attributes
echo_fields
class-attribute
instance-attribute
echo_fields: Dict[str, Any] = Field(
default_factory=dict,
title="Echo Fields",
description="Contains a copy of the input fields used to render the prompt template.",
)
error_count
class-attribute
instance-attribute
error_count: int = Field(
default=0,
title="Error count",
description="The number of errors that occurred during the invocation, will be a number between 0 and less than max_retries",
)
raw
class-attribute
instance-attribute
raw: Any = Field(
default=None,
title="Raw response object",
description="Raw output from LLM before it is converted to text. The intended use case is for calling the chain invoke service internally where we don't need to response back with plain text to a HTTP client.",
)
status
class-attribute
instance-attribute
status: ResponseCode = Field(
default=LLM_ERROR,
title=__doc__,
description="When 0, the invocation was successful and the results will be in the text field. Otherwise, an error occurred and the text field will contain the error message.",
)