engine
EleanorAI Task Execution Framework
This module implements a lightweight task execution framework that supports checkpointing, rollbacks, and retries for long-running operations.
Concepts:
- Overview of the TaskEngines and stages
- Pluggable TaskIO interface to support object storage in the future
- Persistent state management
- TaskEngine and stage binding
Attributes
Classes
BaseStage
Bases: ABC, BaseModel, Generic[STAGE_STATE]
Base task stage implementation
This class is thread-safe.
Note
STAGE_INPUT and STAGE_OUTPUT must be JSON-serializable Pydantic models in which all attributes are optional; these will be managed by the task execution engine.
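As a rough illustration of the "JSON-serializable, all attributes optional" requirement in the note above, the sketch below uses a standard-library dataclass as a stand-in for a Pydantic model. The class name `ExampleStageState` and its fields are hypothetical and do not come from this module.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class ExampleStageState:
    """Hypothetical stage state: every attribute is optional and JSON-friendly."""

    input_path: Optional[str] = None  # could be filled in before the stage runs
    row_count: Optional[int] = None   # could be filled in by the stage on success


def to_json(state: ExampleStageState) -> str:
    """Serialize the state to a JSON string."""
    return json.dumps(asdict(state))


def from_json(payload: str) -> ExampleStageState:
    """Rebuild the state; missing keys fall back to None."""
    return ExampleStageState(**json.loads(payload))
```

Because every field has a default of None, an empty JSON object is still a valid state, which is what lets an engine create the state first and fill it in as stages run.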
Attributes
description
class-attribute
instance-attribute
description: str = Field(
default="",
title="Stage Description",
description="Human readable stage description intended for short, inline documentation",
)
model_config
class-attribute
instance-attribute
obj_name
property
Return the object name for the current stage.
The stage must be bound to a task before this property can be used.
Returns:
- str – The object name for the current stage.
stage_name
class-attribute
instance-attribute
stage_name: str = Field(
...,
min_length=1,
max_length=STAGE_NAME_MAX_LENGTH,
title="Stage Name",
description="Unique stage name / identifier. This field will be used to generate the state metadata file/object and is persisted during execution",
)
state
class-attribute
instance-attribute
state: STAGE_STATE = Field(
..., title="Stage State", description="Stage state data"
)
timezone
class-attribute
instance-attribute
timezone: str = Field(
default="UTC",
title="Timezone",
description="Timezone to use in history entries",
)
url
property
Generate and return the URL for the current stage.
Returns:
- str – The URL for the current stage.
volatile_engine_ref
class-attribute
instance-attribute
volatile_engine_ref: Optional[CheckMateEngine] = Field(
default=None,
title="Task",
description="Task object reference (volatile), this is set by the Task execution engine",
exclude=True,
)
Functions
bind_to_engine
bind_to_engine(
task_engine: CheckMateEngine,
) -> BaseStage[STAGE_STATE]
Bind the stage to a TaskEngine. If the stage state exists, it will be loaded from persistent storage.
Parameters:
- task_engine (CheckMateEngine) – The task engine to bind the stage to.
Returns:
- BaseStage[STAGE_STATE] – The bound stage instance.
Raises:
- TaskEngineException – If the stage is already bound to a TaskEngine.
cancel
When a task is canceled, the cancel method on the last incomplete stage is triggered to perform cleanup.
execute_stage
abstractmethod
Executes the task stage
Warning
Returns nothing on success and raises an exception on failure. Do not update state status inside this method; all state updates and retries are handled by the TaskExecution process.
Warning
Do not modify previous_state_copy, as it may contain important state information needed for rollback operations.
Note
The previous_state_copy parameter is provided to allow the stage to access inputs and outputs from previous stages. It is the responsibility of classes that implement BaseStage to decide exactly what is passed via state as each stage is executed. On the first stage, previous_state_copy will be a reference to stage.state.
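The contract described above (return nothing on success, raise on failure, leave the previous state untouched) can be sketched with a stand-in base class. The exact signature of execute_stage is not shown on this page, so the single `previous_state_copy` argument, the dict-based state, and the names `SketchStage` and `CountRows` are assumptions made for illustration.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class SketchStage(ABC):
    """Stand-in for BaseStage; only the execute contract is modeled."""

    def __init__(self) -> None:
        self.state: Dict[str, Any] = {}

    @abstractmethod
    def execute_stage(self, previous_state_copy: Dict[str, Any]) -> None:
        """Return None on success; raise an exception on failure."""


class CountRows(SketchStage):
    """Hypothetical stage that counts rows produced by an earlier stage."""

    def execute_stage(self, previous_state_copy: Dict[str, Any]) -> None:
        rows = previous_state_copy.get("rows")
        if rows is None:
            # Failure is signaled by raising, never by a return value.
            raise ValueError("previous stage produced no rows")
        # Results go into this stage's own state; the copy is only read.
        self.state["row_count"] = len(rows)
```

Note that the stage never touches a status field: in the framework described here, status changes and retries belong to the engine.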
gen_obj_name
gen_obj_name(
engine: Optional[CheckMateEngine] = None,
) -> str
Generate an object name for the current stage.
Parameters:
- engine (Optional[CheckMateEngine], default: None) – An optional TaskEngine instance. If not provided, the instance’s engine will be used.
Returns:
- str – The generated object name.
inc_attempts
Increment the number of attempts for the current stage.
This method increments the attempts attribute of the state object by 1 and returns the updated number of attempts.
Returns:
- int – The updated number of attempts.
persistence_file_exists
Check if the stage persistence state exists in the task’s input/output system.
Returns:
- bool – True if the persistence file exists, False otherwise.
rollback_stage
When a processing error occurs, attempt to rollback to a stable state
Rollbacks are triggered when the stage execution encounters an error. Subclasses that need a rollback operation can implement this method.
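One way to picture the execute-then-rollback flow is the sketch below. It is not the engine's actual control flow: `SketchWriteStage`, `run_stage`, and the argument-free method signatures are hypothetical, and the "storage" is just a Python list.

```python
from typing import List


class SketchWriteStage:
    """Stand-in stage that appends output and can undo its own writes."""

    def __init__(self, sink: List[str], fail: bool = False) -> None:
        self.sink = sink
        self.fail = fail
        self._written = 0  # number of items this attempt has appended

    def execute_stage(self) -> None:
        self.sink.append("partial")
        self._written += 1
        if self.fail:
            raise RuntimeError("simulated failure after a partial write")
        self.sink.append("final")
        self._written += 1

    def rollback_stage(self) -> None:
        # Remove only what this attempt wrote, restoring the prior state.
        for _ in range(self._written):
            self.sink.pop()
        self._written = 0


def run_stage(stage: SketchWriteStage) -> bool:
    """Execute a stage; on error, roll back and report failure."""
    try:
        stage.execute_stage()
    except Exception:
        stage.rollback_stage()
        return False
    return True
```

The key property is that a failed attempt leaves the sink exactly as it found it, so a later retry starts from a stable state.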
update_status
update_status(
message: str,
status: StageStatus,
begin_ts: float,
end_ts: float | None = None,
) -> None
Updates the status of the stage and records the change in history.
This method derives the correct new state based on the change that is taking place and writes it to persistent storage.
Parameters:
- message (str) – A message describing the status update.
- status (StageStatus) – The new status to be set for the stage.
- begin_ts (float) – The timestamp when the status update begins.
- end_ts (float, default: None) – The timestamp when the status update ends.
Returns:
- None
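The history format is not documented on this page, so the following is only a guess at what "records the change in history" could look like. The `record_status` helper, the entry keys, and the plain-string status are assumptions; timestamps are rendered in UTC, matching the default of the timezone attribute.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


def record_status(
    history: List[Dict[str, Any]],
    message: str,
    status: str,
    begin_ts: float,
    end_ts: Optional[float] = None,
) -> Dict[str, Any]:
    """Append one history entry with ISO-8601 timestamps in UTC."""

    def iso(ts: Optional[float]) -> Optional[str]:
        if ts is None:
            return None
        return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()

    entry = {
        "status": status,
        "message": message,
        "begin": iso(begin_ts),
        "end": iso(end_ts),  # stays None while the update is still open
    }
    history.append(entry)
    return entry
```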
BindLock
BindLock(
task_io: TaskIO,
obj_name: str,
acquire_timeout: float | None = None,
)
A lock handler that uses the TaskIO interface to lock persistent task state when it is bound to a task engine.
Attributes:
- task_io (TaskIO) – The storage interface for locking operations.
- obj_name (str) – The object name used as the lock file identifier.
- acquire_timeout (float | None) – Maximum time (in seconds) to wait for the lock.
- _is_locked (bool) – Tracks the lock status internally.
- _thread_lock (Lock) – Ensures thread-safe access to acquire and release methods.
Initializes a lock file handler.
Parameters:
- task_io (TaskIO) – The storage interface to use.
- obj_name (str) – The object name used as the lock file identifier.
- acquire_timeout (float | None, default: None) – The maximum time (in seconds) to wait for the lock. If None, attempts to acquire the lock immediately.
Attributes
Functions
__enter__
__enter__() -> BindLock
Acquires the lock for use within a with statement.
Raises:
- TaskEngineException – If the lock file cannot be acquired within the timeout.
Returns:
- BindLock – The instance itself for use in a with statement.
__exit__
__exit__(
exc_type: Type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None
Releases the lock upon exiting a with statement.
Parameters:
- exc_type (Type[BaseException] | None) – The exception type, if any.
- exc_value (BaseException | None) – The exception value, if any.
- traceback (TracebackType | None) – The traceback, if any.
acquire
Attempts to acquire the lock by writing to the specified task I/O object. Waits up to a specified timeout if the lock is already in use.
This method is thread-safe.
Raises:
- BindException – If the lock cannot be acquired within the timeout.
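To make the acquire-with-timeout behavior more concrete, here is a minimal lock-file sketch that works on the local filesystem instead of going through TaskIO. It is not the library's BindLock: the class `SketchFileLock`, the helper `new_lock_path`, the `release` method, the polling interval, and the use of TimeoutError are all assumptions.

```python
import os
import tempfile
import threading
import time
from typing import Optional


def new_lock_path() -> str:
    """Return a lock-file path inside a fresh temporary directory."""
    return os.path.join(tempfile.mkdtemp(), "task.lock")


class SketchFileLock:
    """Illustrative lock-file handler with an optional acquire timeout."""

    def __init__(self, path: str, acquire_timeout: Optional[float] = None) -> None:
        self.path = path
        self.acquire_timeout = acquire_timeout
        self._is_locked = False
        self._thread_lock = threading.Lock()

    def acquire(self) -> None:
        # With no timeout the deadline is "now", so only one attempt is made.
        deadline = time.monotonic() + (self.acquire_timeout or 0.0)
        with self._thread_lock:
            while True:
                try:
                    # O_EXCL makes creation fail if the lock file already exists.
                    fd = os.open(self.path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                    os.close(fd)
                    self._is_locked = True
                    return
                except FileExistsError:
                    if time.monotonic() >= deadline:
                        raise TimeoutError(f"could not acquire {self.path}")
                    time.sleep(0.01)

    def release(self) -> None:
        with self._thread_lock:
            if self._is_locked:
                os.remove(self.path)
                self._is_locked = False

    def __enter__(self) -> "SketchFileLock":
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.release()
```

Atomic exclusive creation is what makes the file usable as a lock; a leftover file after a crash would still block new holders, which is consistent with the caution elsewhere on this page that a lock file does not prove a process is still running.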
CheckMateEngine
CheckMateEngine(
io: TaskIO,
task_id: str | None = None,
stage_templates: List[STAGE] | None = None,
retry_policy: RetryPolicy = NONE,
max_attempts: int = 1,
bind_lock_timeout: float | None = None,
auto_cleanup: bool = False,
no_lock: bool = False,
force_create: bool = False,
)
Bases: Generic[STAGE, STAGE_RESULT]
Multi-stage task execution framework
The scope of the task engine is to manage the execution of a series of stages. Each stage is executed sequentially until it is completed, canceled by the user, exhausts retries, or encounters an unrecoverable error.
The intended use case is to provide a lightweight application-level framework that can execute a series of steps with builtin checkpoint and resume capabilities. Ideally, the TaskEngine is integrated into application logic and invoked via an external orchestration and resource management framework such as Apache Airflow.
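The checkpoint-and-resume idea can be illustrated with a few lines of standard-library Python. This is a simplified sketch, not the engine's implementation: the function `run_stages`, the helper `new_checkpoint_path`, the single JSON checkpoint file, and the string status values are all hypothetical.

```python
import json
import os
import tempfile
from typing import Callable, Dict, List, Tuple


def new_checkpoint_path() -> str:
    """Return a checkpoint-file path inside a fresh temporary directory."""
    return os.path.join(tempfile.mkdtemp(), "checkpoint.json")


def run_stages(
    stages: List[Tuple[str, Callable[[], None]]],
    checkpoint_path: str,
) -> List[str]:
    """Run stages in order, skipping any already recorded as complete.

    Returns the names of the stages executed during this call.
    """
    done: Dict[str, str] = {}
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as fh:
            done = json.load(fh)

    executed: List[str] = []
    for name, fn in stages:
        if done.get(name) == "COMPLETE":
            continue  # checkpointed by an earlier run
        fn()  # an exception here leaves earlier checkpoints in place
        done[name] = "COMPLETE"
        with open(checkpoint_path, "w") as fh:
            json.dump(done, fh)  # persist progress after every stage
        executed.append(name)
    return executed
```

If a stage raises, a second call with the same checkpoint path picks up at the failed stage instead of repeating completed work.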
Warning
The TaskEngine class is not thread-safe and should not be shared between threads. A future enhancement may add thread safety to support parallel stage execution; however, this arguably should be the responsibility of the orchestration framework.
Initializes a TaskEngine instance.
Parameters:
- stage_templates (List[BaseStage], default: None) – A list of stage templates to be used in the task.
- io (TaskIO) – The input/output handler for the task.
- task_id (str | None, default: None) – The unique identifier for the task. Defaults to None.
- retry_policy (RetryPolicy, default: NONE) – The policy for retrying the task. Defaults to RetryPolicy.NONE.
- max_attempts (int, default: 1) – The maximum number of attempts for the task. Defaults to 1.
- bind_lock_timeout (float | None, default: None) – The timeout for acquiring the bind lock. Defaults to None.
- auto_cleanup (bool, default: False) – Whether to automatically clean up after the task. Note that this will force a cleanup operation to happen on unbind even if the task did not complete successfully. Defaults to False.
- no_lock (bool, default: False) – Whether to disable the bind lock. Defaults to False.
- force_create (bool, default: False) – Whether to force the creation of a new task. This allows an external process to generate the task_id and pass it in. The TaskEngine will use this flag to assume that this is always a new task. Defaults to False.
Raises:
- TaskEngineException – If no stage templates are provided.
- TaskEngineException – If the retry policy is NONE and max_attempts is not 1.
- TaskEngineException – If max_attempts is less than 1.
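The three documented error conditions can be restated as a small validation function. The sketch below is an approximation only: `SketchRetryPolicy`, its `FIXED` member, the function name, and the use of ValueError in place of TaskEngineException are all hypothetical, and the order in which the real constructor performs these checks is not stated on this page.

```python
from enum import Enum
from typing import Optional, Sequence


class SketchRetryPolicy(Enum):
    NONE = "none"
    FIXED = "fixed"  # hypothetical member, included only for illustration


def validate_engine_args(
    stage_templates: Optional[Sequence[object]],
    retry_policy: SketchRetryPolicy,
    max_attempts: int,
) -> None:
    """Raise ValueError for any of the three documented invalid setups."""
    if not stage_templates:
        raise ValueError("at least one stage template is required")
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    if retry_policy is SketchRetryPolicy.NONE and max_attempts != 1:
        raise ValueError("max_attempts must be 1 when retry_policy is NONE")
```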
Attributes
cumulative_attempts
property
Return the cumulative number of attempts for all stages
Returns:
- int – The cumulative number of attempts for all stages.
has_lockfile
property
Checks for the existence of a lock file, even when the CheckMate engine is not bound. Note that you should not interpret the presence of a lockfile to mean that there is still a running process.
lock_obj_name
property
Generates a lock object name for the task.
The lock object name is generated using the task ID, a maximum length for the stage name, and a specific file extension for lock files.
Returns:
- str – The generated lock object name.
status
property
status: StageStatus
Return the current status of the overall task by inspecting the status of each stage.
This method validates the state transitions between consecutive stages using the _StageStateMachine to ensure they are valid. It then determines the overall status of the task based on the statuses of all stages.
Returns:
- StageStatus – The overall status of the task, which can be one of the following:
  - StageStatus.COMPLETE: All stages are complete.
  - StageStatus.RUNNING: At least one stage is running.
  - StageStatus.RETRY: At least one stage is in retry state.
  - StageStatus.ERROR: At least one stage has encountered an error.
  - StageStatus.CANCELED: At least one stage has been canceled.
  - StageStatus.UNBOUND: At least one stage is unbound.
  - StageStatus.CREATED: All stages are in the created state or no other status applies.
Raises:
- TaskEngineException – If an invalid state transition is detected between any two consecutive stages.
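A possible reading of the aggregation rules is sketched below. The precedence among RUNNING, RETRY, ERROR, CANCELED, and UNBOUND is an assumption taken from the order in which they are listed, since the page does not say which wins when several apply. `SketchStatus` and `overall_status` are hypothetical names, and the state-transition validation step is not modeled.

```python
from enum import Enum
from typing import Sequence


class SketchStatus(Enum):
    CREATED = "created"
    UNBOUND = "unbound"
    RUNNING = "running"
    RETRY = "retry"
    ERROR = "error"
    CANCELED = "canceled"
    COMPLETE = "complete"


# Assumed precedence: the first status found in this order wins.
_PRECEDENCE = (
    SketchStatus.RUNNING,
    SketchStatus.RETRY,
    SketchStatus.ERROR,
    SketchStatus.CANCELED,
    SketchStatus.UNBOUND,
)


def overall_status(stage_statuses: Sequence[SketchStatus]) -> SketchStatus:
    """Collapse per-stage statuses into a single task-level status."""
    if stage_statuses and all(s is SketchStatus.COMPLETE for s in stage_statuses):
        return SketchStatus.COMPLETE
    for candidate in _PRECEDENCE:
        if candidate in stage_statuses:
            return candidate
    # Fallback: all stages created, or no other status applies.
    return SketchStatus.CREATED
```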
task_result
property
task_result: STAGE_RESULT | None
Retrieves the result state of the last completed task.
This method iterates through the stages and returns the result of the most recently completed stage. If no stages are completed, it returns None.
Returns:
- STAGE_RESULT | None – The result state of the last completed task, or None if no task is completed.
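The lookup described here amounts to a single pass over the stages. In the sketch below, each stage is reduced to a hypothetical `(status, result)` tuple and the status is a plain string; neither representation comes from this module.

```python
from typing import Any, List, Optional, Tuple


def last_completed_result(stages: List[Tuple[str, Any]]) -> Optional[Any]:
    """Return the result of the last stage marked COMPLETE, or None."""
    result = None
    for status, stage_result in stages:
        if status == "COMPLETE":
            # Later completed stages overwrite earlier ones.
            result = stage_result
    return result
```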
task_state
instance-attribute
task_state: TaskEngineState[STAGE] = TaskEngineState(
stage_template_classes=[
string_from_callable(x) for x in stage_templates
],
stage_templates=[
x.model_copy(deep=True) for x in stage_templates
],
)
Functions
__enter__
__enter__() -> CheckMateEngine
Enter the runtime context related to a TaskEngine instance.
Returns:
- CheckMateEngine – A reference to the current instance of the TaskEngine.
__exit__
__exit__(
exc_type: Type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> Literal[False]
Releases the bind lock upon exiting a with statement.
Parameters:
- exc_type (Type[BaseException] | None) – The exception type, if an exception was raised.
- exc_value (BaseException | None) – The exception instance, if an exception was raised.
- traceback (TracebackType | None) – The traceback object, if an exception was raised.
Returns:
- Literal[False] – Always returns False to indicate that exceptions should propagate.
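Returning False from `__exit__` is standard Python context-manager behavior: the resource is released, and any exception raised inside the with block continues to propagate to the caller. The stand-in class below shows that pattern; `SketchBoundContext` and its `lock_held` flag are hypothetical and only mimic the bind lock.

```python
class SketchBoundContext:
    """Stand-in context manager that releases a flag and never swallows errors."""

    def __init__(self) -> None:
        self.lock_held = False

    def __enter__(self) -> "SketchBoundContext":
        self.lock_held = True
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        self.lock_held = False  # always release, even when the block failed
        return False  # False tells Python not to suppress the exception
```

A caller therefore still needs its own try/except around the with block if it wants to handle stage failures.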
bind
Binds the task engine to the task, acquiring the bind lock.
This method performs the following actions:
- Checks if the task engine is already bound to a task and raises an exception if it is.
- Verifies that all stages have unique names and raises an exception if they do not.
- Acquires the bind lock for the task in persistent storage.
- Binds the stages to the task engine, initializing all stages and reading from persistent storage if necessary.
Raises:
- TaskEngineException – If the task engine is already bound to a task or if stage names are not unique.
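The uniqueness check on stage names could be approximated as follows. The helper names are hypothetical, and ValueError stands in for TaskEngineException; how the real engine reports duplicates is not described on this page.

```python
from collections import Counter
from typing import Iterable, List


def find_duplicate_stage_names(stage_names: Iterable[str]) -> List[str]:
    """Return the stage names that occur more than once, sorted."""
    counts = Counter(stage_names)
    return sorted(name for name, count in counts.items() if count > 1)


def check_unique_stage_names(stage_names: Iterable[str]) -> None:
    """Raise ValueError if any stage name is repeated."""
    duplicates = find_duplicate_stage_names(stage_names)
    if duplicates:
        raise ValueError(f"duplicate stage names: {duplicates}")
```

Unique names matter because each stage name is used to derive the object under which that stage's state is persisted, so a duplicate would make two stages share one state object.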
cancel
Cancels the task by setting all stages to a CANCELED state.
This method finds the starting stage index and updates the status of the corresponding stage to CANCELED. It also sets a cancellation message and the current timestamp.
Raises:
- TaskEngineException – If the task is in a state that cannot be canceled.
Returns:
- None
check_sync
Verifies whether the in-memory state and persistence storage state are in sync
Returns:
- bool – True if the stages from memory and persistence are the same, False otherwise.
dump_stages
dump_stages(
source: Literal["memory", "persistence"] = "memory",
color=LIGHTWHITE_EX,
basenames_only: bool = False,
source_in_heading: bool = False,
) -> str
Generates a Markdown-formatted summary of all stages, their state data, and persistent storage objects.
The intended use case for this method is unit testing and debugging.
Parameters:
- color – The color to use for the structured log output.
- source (Literal['memory', 'persistence'], default: 'memory') – The source to use for stage state data. If “memory”, the current state data is used. If “persistence”, the state data is loaded from persistent storage.
- basenames_only (bool, default: False) – If True, only the basenames of the files will be displayed.
- source_in_heading (bool, default: False) – If True, the source will be included in the heading. Set this to False when you want to compare memory and persistence source strings.
Raises:
- TaskEngineException – If an invalid source is specified.
Returns:
- str – The structured log entries for all stages.
execute
Reads state data (if any) and executes the task beginning from the last successful stage
Parameters:
- strict_recovery (bool, default: False) – When running execute and a stage is already marked as RUNNING, this typically means that the previous execute call to persistence store was not successful and the stage is effectively in an unknown state. When this is set to True, the CheckMateEngine treats these as warnings and will attempt to re-execute; otherwise, an exception will be raised. Defaults to False.
log_stages
log_stages(
source: Literal["memory", "persistence"] = "memory",
color=LIGHTWHITE_EX,
basenames_only: bool = False,
) -> None
Logs a Markdown-formatted summary of all stages, their state data, and persistent storage objects.
The intended use case for this method is unit testing and debugging.
Parameters:
- color – The color to use for the structured log output.
- source (Literal['memory', 'persistence'], default: 'memory') – The source to use for stage state data. If “memory”, the current state data is used. If “persistence”, the state data is loaded from persistent storage.
- basenames_only (bool, default: False) – If True, only the basenames of the files will be displayed.
Raises:
- TaskEngineException – If an invalid source is specified.
Returns:
- None – The structured log entries for all stages are written to the log rather than returned.
state_update_hook
Hook that is called when a stage updates the task state.
Subclasses should override this method to perform additional actions when state is updated.
unbind
Unbinds the task engine from the task, releasing the bind lock.
This method restores the original stage templates and releases the bind lock. It raises a TaskEngineException if the task engine is not currently bound.
Parameters:
- clean (bool, default: False) – If True, state data for all stages will be removed from persistent storage.
Raises:
- TaskEngineException – If the task engine is not bound.
TaskEngineState
Bases: BaseModel, Generic[STAGE]
Attributes
model_config
class-attribute
instance-attribute
stage_template_classes
class-attribute
instance-attribute
stage_template_classes: List[str] = Field(
...,
title="Stage Template Classes",
description="List of stage template classes",
)
stage_templates
class-attribute
instance-attribute
stage_templates: List[STAGE] = Field(
...,
title="Stage Templates",
description="List of stage templates",
)