engine

EleanorAI Task Execution Framework

This module implements a lightweight task execution framework that supports checkpointing, rollbacks, and retries for long-running operations.

Concepts:

  • Overview of the TaskEngines and stages
  • Pluggable TaskIO interface to support object storage in the future
  • Persistent state management
  • TaskEngine and stage binding

Attributes

LOCK_EXTENSION module-attribute

LOCK_EXTENSION = 'lock'

STAGE module-attribute

STAGE = TypeVar('STAGE', bound='BaseStage')

STAGE_STATE module-attribute

STAGE_STATE = TypeVar('STAGE_STATE', bound=BaseStageState)

Classes

BaseStage

Bases: ABC, BaseModel, Generic[STAGE_STATE]

Base task stage implementation

This class is thread-safe.

Note

STAGE_INPUT and STAGE_OUTPUT must be JSON-serializable Pydantic models in which all attributes are optional; they are managed by the task execution engine.

Attributes

attempts property
attempts: int

Return the number of attempts for the current stage

description class-attribute instance-attribute
description: str = Field(
    default="",
    title="Stage Description",
    description="Human readable stage description intended for short, inline documentation",
)
engine property

Return the task reference

model_config class-attribute instance-attribute
model_config = ConfigDict(
    extra="forbid",
    strict=False,
    arbitrary_types_allowed=True,
)
obj_name property
obj_name: str

Return the object name for the current stage.

The stage must be bound to a task before this property can be used.

Returns:

  • str ( str ) –

    The object name for the current stage.

stage_name class-attribute instance-attribute
stage_name: str = Field(
    ...,
    min_length=1,
    max_length=STAGE_NAME_MAX_LENGTH,
    title="Stage Name",
    description="Unique stage name / identifier. This field will be used to generate the state metadata file/object and persisted during execution",
)
state class-attribute instance-attribute
state: STAGE_STATE = Field(
    ..., title="Stage State", description="Stage state data"
)
status property
status: StageStatus

Return the current status of the stage

timezone class-attribute instance-attribute
timezone: str = Field(
    default="UTC",
    title="Timezone",
    description="Timezone to use in history entries",
)
url property
url: str

Generate and return the URL for the current stage.

Returns:

  • str ( str ) –

    The URL for the current stage.

volatile_engine_ref class-attribute instance-attribute
volatile_engine_ref: Optional[CheckMateEngine] = Field(
    default=None,
    title="Task",
    description="Task object reference (volatile), this is set by the Task execution engine",
    exclude=True,
)

Functions

bind_to_engine
bind_to_engine(
    task_engine: CheckMateEngine,
) -> BaseStage[STAGE_STATE]

Bind the stage to a TaskEngine. If the stage state exists, it will be loaded from persistent storage.

Parameters:

  • task_engine (TaskEngine) –

    The task engine to bind the stage to.

Returns:

Raises:

cancel
cancel() -> None

When a task is canceled, the cancel method on the last incomplete stage is triggered to perform cleanup.

execute_stage abstractmethod
execute_stage() -> None

Executes the task stage

Warning

Returns nothing on success and raises an exception on failure. Do not update the state status inside this method; all state updates and retries are handled by the TaskExecution process.

Warning

Do not modify previous_state_copy as it may contain important state information needed for rollback operations.

Note

The previous_state_copy parameter is provided to allow the stage to access inputs and outputs from previous stages. It is the responsibility of classes that implement BaseStage to decide exactly what is passed via state as each stage is executed. On the first stage, the previous_state will be a reference to stage.state.

Note

When referring to self.state or previous_state_copy in the implementation, Pylint will raise warnings like:

Assigning to attribute 'SOME_ATTRIBUTE' not defined in class slots

This can be safely ignored and muted via a # pylint: disable=assigning-non-slot comment.

gen_obj_name
gen_obj_name(
    engine: Optional[CheckMateEngine] = None,
) -> str

Generate an object name for the current stage.

Parameters:

  • engine (Optional[TaskEngine], default: None ) –

    An optional TaskEngine instance. If not provided, the instance’s engine will be used.

Returns:

  • str ( str ) –

    The generated object name.

inc_attempts
inc_attempts() -> int

Increment the number of attempts for the current stage.

This method increments the attempts attribute of the state object by 1 and returns the updated number of attempts.

Returns:

  • int ( int ) –

    The updated number of attempts.

persistence_file_exists
persistence_file_exists() -> bool

Check if the stage persistence state exists in the task’s input/output system.

Returns:

  • bool ( bool ) –

    True if the persistence file exists, False otherwise.

rollback_stage
rollback_stage() -> None

When a processing error occurs, attempt to rollback to a stable state

Rollbacks are triggered when the stage execution encounters an error. Subclasses that need a rollback operation can implement this method.
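
As a minimal sketch of how execute_stage and rollback_stage might fit together, the following uses hypothetical stand-ins (ToyRollbackStage and run_with_rollback are not part of this module). It assumes the driver calls rollback_stage whenever execute_stage raises; the real engine decides when that happens.

```python
import os
import tempfile


class ToyRollbackStage:
    """Hypothetical stand-in for a BaseStage subclass (not the real class)."""

    def __init__(self, workdir: str, fail: bool = False) -> None:
        self.path = os.path.join(workdir, "partial-output.txt")
        self.fail = fail

    def execute_stage(self) -> None:
        # Write an artifact, then optionally fail part-way through.
        with open(self.path, "w", encoding="utf-8") as fh:
            fh.write("half-written")
        if self.fail:
            raise RuntimeError("simulated processing error")

    def rollback_stage(self) -> None:
        # Return to a stable state by removing the partial artifact.
        if os.path.exists(self.path):
            os.remove(self.path)


def run_with_rollback(stage: ToyRollbackStage) -> bool:
    """Assumed driver: run the stage, roll back on error, report success."""
    try:
        stage.execute_stage()
        return True
    except Exception:
        stage.rollback_stage()
        return False


failed_stage = ToyRollbackStage(tempfile.mkdtemp(), fail=True)
succeeded = run_with_rollback(failed_stage)

ok_stage = ToyRollbackStage(tempfile.mkdtemp())
ok_succeeded = run_with_rollback(ok_stage)
```

After the failed run the partial file is gone, while the successful run leaves its output in place.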

save
save(trigger_update_hook: bool = True) -> None

Persist the current state to durable storage

update_status
update_status(
    message: str,
    status: StageStatus,
    begin_ts: float,
    end_ts: float | None = None,
) -> None

Updates the status of the stage and records the change in history.

This method derives the correct new state based on the change that is taking place and writes it to persistent storage.

Parameters:

  • message (str) –

    A message describing the status update.

  • status (StageStatus) –

    The new status to be set for the stage.

  • begin_ts (float) –

    The timestamp when the status update begins.

  • end_ts (float, default: None ) –

    The timestamp when the status update ends.

Returns:

  • None

    None
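
The following sketch illustrates the general idea of setting a status and appending a timestamped history entry. The HistoryEntry and ToyStageState types are hypothetical, the real history schema is not shown on this page, and the sketch always formats timestamps in UTC rather than honoring the stage's timezone field. It also skips the persistence step.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional


@dataclass
class HistoryEntry:
    """Hypothetical history record; field names are assumptions."""
    message: str
    status: str
    begin: str
    end: Optional[str]


@dataclass
class ToyStageState:
    status: str = "CREATED"
    history: List[HistoryEntry] = field(default_factory=list)


def _iso(ts: Optional[float]) -> Optional[str]:
    # Convert an epoch timestamp to an ISO-8601 string in UTC.
    if ts is None:
        return None
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()


def update_status_sketch(
    state: ToyStageState,
    message: str,
    status: str,
    begin_ts: float,
    end_ts: Optional[float] = None,
) -> None:
    # Set the new status and record the change in the history list.
    state.status = status
    state.history.append(HistoryEntry(message, status, _iso(begin_ts), _iso(end_ts)))


toy_state = ToyStageState()
update_status_sketch(toy_state, "stage started", "RUNNING", begin_ts=0.0)
update_status_sketch(toy_state, "stage finished", "COMPLETE", begin_ts=0.0, end_ts=60.0)
```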

validate_timezone classmethod
validate_timezone(timezone_str: str) -> str

BindLock

BindLock(
    task_io: TaskIO,
    obj_name: str,
    acquire_timeout: float | None = None,
)

A lock handler that uses the TaskIO interface to lock persistent task state when it is bound to a task engine.

Attributes:

  • task_io (TaskIO) –

    The storage interface for locking operations.

  • obj_name (str) –

    The object name used as the lock file identifier.

  • acquire_timeout (float | None) –

    Maximum time (in seconds) to wait for the lock.

  • _is_locked (bool) –

    Tracks the lock status internally.

  • _thread_lock (Lock) –

    Ensures thread-safe access to acquire and release methods.

Initializes a lock file handler.

Parameters:

  • task_io (TaskIO) –

    The storage interface to use.

  • obj_name (str) –

    The object name used as the lock file identifier.

  • acquire_timeout (float | None, default: None ) –

    The maximum time (in seconds) to wait for the lock. If None, attempts to acquire the lock immediately.

Attributes

acquire_timeout instance-attribute
acquire_timeout = acquire_timeout
is_locked property
is_locked: bool

Returns the current lock status.

obj_name instance-attribute
obj_name = obj_name
task_io instance-attribute
task_io = task_io

Functions

__enter__
__enter__() -> BindLock

Acquires the lock for use within a with statement.

Raises:

Returns:

  • BindLock ( BindLock ) –

    The instance itself for use in a with statement.

__exit__
__exit__(
    exc_type: Type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None

Releases the lock upon exiting a with statement.

Parameters:

  • exc_type (Type[BaseException] | None) –

    The exception type, if any.

  • exc_value (BaseException | None) –

    The exception value, if any.

  • traceback (TracebackType | None) –

    The traceback, if any.

acquire
acquire() -> None

Attempts to acquire the lock by writing to the specified task I/O object. Waits up to a specified timeout if the lock is already in use.

This method is thread-safe.

Raises:

  • BindException

    If the lock cannot be acquired within the timeout.

release
release() -> None

Releases the lock by removing the lock file if it was acquired.

This method is thread-safe.
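
The acquire/release behavior described above can be sketched with a plain lock file on the local filesystem. ToyFileLock is a hypothetical stand-in: BindLock goes through the TaskIO interface and raises BindException, whereas this sketch uses os.open directly and raises TimeoutError. The 10 ms polling interval is also an assumption.

```python
import os
import tempfile
import threading
import time
from typing import Optional


class ToyFileLock:
    """Hypothetical file-based lock illustrating the acquire/release pattern."""

    def __init__(self, path: str, acquire_timeout: Optional[float] = None) -> None:
        self.path = path
        self.acquire_timeout = acquire_timeout
        self._is_locked = False
        self._thread_lock = threading.Lock()

    def acquire(self) -> None:
        with self._thread_lock:
            # A timeout of None means a single immediate attempt.
            deadline = time.monotonic() + (self.acquire_timeout or 0.0)
            while True:
                try:
                    # O_EXCL makes creation fail if the lock file already exists.
                    fd = os.open(self.path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                    os.close(fd)
                    self._is_locked = True
                    return
                except FileExistsError:
                    if time.monotonic() >= deadline:
                        raise TimeoutError(f"could not acquire {self.path}")
                    time.sleep(0.01)

    def release(self) -> None:
        with self._thread_lock:
            # Only remove the lock file if this instance created it.
            if self._is_locked:
                os.remove(self.path)
                self._is_locked = False

    def __enter__(self) -> "ToyFileLock":
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.release()


lock_path = os.path.join(tempfile.mkdtemp(), "task.demo.lock")
with ToyFileLock(lock_path):
    exists_while_held = os.path.exists(lock_path)
    try:
        # A second holder cannot take the lock while the first holds it.
        ToyFileLock(lock_path, acquire_timeout=0.05).acquire()
        second_acquired = True
    except TimeoutError:
        second_acquired = False
exists_after_release = os.path.exists(lock_path)
```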

CheckMateEngine

CheckMateEngine(
    io: TaskIO,
    task_id: str | None = None,
    stage_templates: List[STAGE] | None = None,
    retry_policy: RetryPolicy = NONE,
    max_attempts: int = 1,
    bind_lock_timeout: float | None = None,
    auto_cleanup: bool = False,
    no_lock: bool = False,
    force_create: bool = False,
)

Bases: Generic[STAGE, STAGE_RESULT]

Multi-stage task execution framework

The scope of the task engine is to manage the execution of a series of stages. Each stage is executed sequentially until it is completed, canceled by the user, exhausts retries, or encounters an unrecoverable error.

The intended use case is to provide a lightweight application-level framework that can execute a series of steps with builtin checkpoint and resume capabilities. Ideally, the TaskEngine is integrated into application logic and invoked via an external orchestration and resource management framework such as Apache Airflow.
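
To illustrate the checkpoint and resume idea in isolation, here is a minimal sketch that does not use the framework's classes. The run_stages function, the JSON checkpoint layout, and the stage functions are all hypothetical; the real engine persists per-stage state through TaskIO.

```python
import json
import os
import tempfile
from typing import Callable, Dict, List, Tuple

StageFn = Tuple[str, Callable[[Dict], None]]


def run_stages(stages: List[StageFn], checkpoint_path: str) -> List[str]:
    """Run stages in order, skipping those already recorded as complete.

    Returns the names of the stages executed during this call.
    """
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path, encoding="utf-8") as fh:
            checkpoint = json.load(fh)
    else:
        checkpoint = {"completed": [], "data": {}}

    executed = []
    for name, func in stages:
        if name in checkpoint["completed"]:
            continue  # finished in an earlier run
        func(checkpoint["data"])  # may raise; earlier progress stays on disk
        checkpoint["completed"].append(name)
        executed.append(name)
        with open(checkpoint_path, "w", encoding="utf-8") as fh:
            json.dump(checkpoint, fh)
    return executed


calls = {"transform": 0}


def extract(data: Dict) -> None:
    data["rows"] = [1, 2, 3]


def transform(data: Dict) -> None:
    # Fails on the first call to simulate a transient error.
    calls["transform"] += 1
    if calls["transform"] == 1:
        raise RuntimeError("transient failure")
    data["total"] = sum(data["rows"])


stages = [("extract", extract), ("transform", transform)]
checkpoint_path = os.path.join(tempfile.mkdtemp(), "checkpoint.json")

try:
    run_stages(stages, checkpoint_path)
except RuntimeError:
    pass  # first run stops at "transform"

second_run = run_stages(stages, checkpoint_path)  # resumes after "extract"
with open(checkpoint_path, encoding="utf-8") as fh:
    final_checkpoint = json.load(fh)
```

The second call skips the already completed stage and only re-runs the one that failed.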

Warning

The TaskEngine class is not thread-safe and should not be shared between threads. A future enhancement may add thread safety to support parallel stage execution; however, this arguably should be the responsibility of the orchestration framework.

Initializes a TaskEngine instance.

Parameters:

  • stage_templates (List[BaseStage], default: None ) –

    A list of stage templates to be used in the task.

  • io (TaskIO) –

    The input/output handler for the task.

  • task_id (str | None, default: None ) –

    The unique identifier for the task. Defaults to None.

  • retry_policy (RetryPolicy, default: NONE ) –

    The policy for retrying the task. Defaults to RetryPolicy.NONE.

  • max_attempts (int, default: 1 ) –

    The maximum number of attempts for the task. Defaults to 1.

  • bind_lock_timeout (float | None, default: None ) –

    The timeout for acquiring the bind lock. Defaults to None.

  • auto_cleanup (bool, default: False ) –

    Whether to automatically clean up after the task. Note that this will force a cleanup operation to happen on unbind even if the task did not complete successfully. Defaults to False.

  • no_lock (bool, default: False ) –

    Whether to disable the bind lock. Defaults to False.

  • force_create (bool, default: False ) –

    Whether to force the creation of a new task. This allows an external process to generate the task_id and pass it in. The TaskEngine will use this flag to assume that this is always a new task. Defaults to False.
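
As a rough illustration of what a max_attempts limit means, the sketch below retries a callable until it succeeds or the limit is reached. The run_with_attempts helper is hypothetical and retries immediately; how the engine's RetryPolicy values schedule retries is not documented on this page.

```python
from typing import Callable


def run_with_attempts(func: Callable[[], None], max_attempts: int = 1) -> int:
    """Call func until it succeeds or max_attempts is exhausted.

    Returns the number of attempts used; re-raises the last error on exhaustion.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempts = 0
    while True:
        attempts += 1
        try:
            func()
            return attempts
        except Exception:
            if attempts >= max_attempts:
                raise


failures_left = {"count": 2}


def flaky() -> None:
    # Fails twice, then succeeds.
    if failures_left["count"] > 0:
        failures_left["count"] -= 1
        raise RuntimeError("transient failure")


attempts_used = run_with_attempts(flaky, max_attempts=3)
```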

Raises:

Attributes

auto_cleanup instance-attribute
auto_cleanup = auto_cleanup
bind_lock_timeout instance-attribute
bind_lock_timeout = bind_lock_timeout
cumulative_attempts property
cumulative_attempts: int

Return the cumulative number of attempts for all stages

Returns:

  • int ( int ) –

    The cumulative number of attempts for all stages

has_lockfile property
has_lockfile: bool

Checks for the existence of a lock file, even when the CheckMate engine is not bound. Note that you should not interpret the presence of a lockfile to mean that there is still a running process.

io instance-attribute
io = io
is_bound property
is_bound: bool

Returns whether the task engine is bound

lock instance-attribute
lock = Lock()
lock_obj_name property
lock_obj_name: str

Generates a lock object name for the task.

The lock object name is generated using the task ID, a maximum length for the stage name, and a specific file extension for lock files.

Returns:

  • str ( str ) –

    The generated lock object name.

max_attempts instance-attribute
max_attempts = max_attempts
no_lock instance-attribute
no_lock = no_lock
retry_policy instance-attribute
retry_policy = retry_policy
stages instance-attribute
stages: List[BaseStage] = []
status property
status: StageStatus

Return the current status of the overall task by inspecting the status of each stage.

This method validates the state transitions between consecutive stages using the _StageStateMachine to ensure they are valid. It then determines the overall status of the task based on the statuses of all stages.

Returns:

  • StageStatus ( StageStatus ) –

    The overall status of the task, which can be one of the following:

      • StageStatus.COMPLETE: All stages are complete.
      • StageStatus.RUNNING: At least one stage is running.
      • StageStatus.RETRY: At least one stage is in retry state.
      • StageStatus.ERROR: At least one stage has encountered an error.
      • StageStatus.CANCELED: At least one stage has been canceled.
      • StageStatus.UNBOUND: At least one stage is unbound.
      • StageStatus.CREATED: All stages are in the created state or no other status applies.

Raises:

  • TaskEngineException

    If an invalid state transition is detected between any two consecutive stages.
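
The aggregation rules above can be sketched as a small function. ToyStatus is a hypothetical stand-in for StageStatus, and the precedence among the "at least one stage" cases is an assumption made for this example, since the page does not state which status wins when several apply. Transition validation is omitted.

```python
from enum import Enum
from typing import List


class ToyStatus(str, Enum):
    """Hypothetical stand-in for StageStatus."""
    CREATED = "created"
    UNBOUND = "unbound"
    RUNNING = "running"
    RETRY = "retry"
    ERROR = "error"
    CANCELED = "canceled"
    COMPLETE = "complete"


# Assumed precedence for the "at least one stage" checks.
_PRECEDENCE = [
    ToyStatus.ERROR,
    ToyStatus.CANCELED,
    ToyStatus.RETRY,
    ToyStatus.RUNNING,
    ToyStatus.UNBOUND,
]


def overall_status(statuses: List[ToyStatus]) -> ToyStatus:
    # COMPLETE requires every stage to be complete.
    if statuses and all(s is ToyStatus.COMPLETE for s in statuses):
        return ToyStatus.COMPLETE
    # Otherwise report the first matching status in the assumed precedence.
    for candidate in _PRECEDENCE:
        if candidate in statuses:
            return candidate
    # Fallback when no other status applies.
    return ToyStatus.CREATED
```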

task_id instance-attribute
task_id = (
    gen_uuid(prefix="task.") if not task_id else task_id
)
task_result property
task_result: STAGE_RESULT | None

Retrieves the result state of the last completed task.

This method iterates through the stages and returns the result of the most recently completed stage. If no stages are completed, it returns None.

Returns:

  • STAGE_RESULT ( STAGE_RESULT | None ) –

    The result state of the last completed task, or None if no task is completed.
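
A minimal sketch of the lookup described above, using a hypothetical StageRecord type and string statuses in place of the real stage and StageStatus objects:

```python
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class StageRecord:
    """Hypothetical stage summary; not the framework's BaseStage."""
    name: str
    status: str
    result: Optional[Any] = None


def last_completed_result(stages: List[StageRecord]) -> Optional[Any]:
    # Walk the stages in order and remember the latest completed result.
    latest = None
    for stage in stages:
        if stage.status == "COMPLETE":
            latest = stage.result
    return latest


pipeline = [
    StageRecord("extract", "COMPLETE", {"rows": 3}),
    StageRecord("transform", "COMPLETE", {"total": 6}),
    StageRecord("load", "CREATED"),
]
latest_result = last_completed_result(pipeline)
```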

task_state instance-attribute
task_state: TaskEngineState[STAGE] = TaskEngineState(
    stage_template_classes=[
        string_from_callable(x) for x in stage_templates
    ],
    stage_templates=[
        x.model_copy(deep=True) for x in stage_templates
    ],
)

Functions

__enter__
__enter__() -> CheckMateEngine

Enter the runtime context related to a TaskEngine instance.

Returns:

  • TaskEngine ( CheckMateEngine ) –

    A reference to the current instance of the TaskEngine.

__exit__
__exit__(
    exc_type: Type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> Literal[False]

Releases the bind lock upon exiting a with statement.

Parameters:

  • exc_type (Type[BaseException] | None) –

    The exception type, if an exception was raised.

  • exc_value (BaseException | None) –

    The exception instance, if an exception was raised.

  • traceback (TracebackType | None) –

    The traceback object, if an exception was raised.

Returns:

  • Literal[False]

    Literal[False]: Always returns False to indicate that exceptions should propagate.
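
Returning False from __exit__ is standard Python context-manager behavior: cleanup runs, and any exception raised inside the with block still reaches the caller. The ToyContext class below is a hypothetical stand-in that shows this.

```python
class ToyContext:
    """Hypothetical context manager that releases a resource and never suppresses errors."""

    def __init__(self) -> None:
        self.released = False

    def __enter__(self) -> "ToyContext":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        self.released = True  # stands in for releasing the bind lock
        return False  # do not swallow the exception


ctx = ToyContext()
propagated = None
try:
    with ctx:
        raise ValueError("stage failed")
except ValueError as err:
    propagated = str(err)
```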

bind
bind() -> None

Binds the task engine to the task, acquiring the bind lock.

This method performs the following actions:

  • Checks if the task engine is already bound to a task and raises an exception if it is.
  • Verifies that all stages have unique names and raises an exception if they do not.
  • Acquires the bind lock for the task in persistent storage.
  • Binds the stages to the task engine, initializing all stages and reading from persistent storage if necessary.

Raises:

  • TaskEngineException

    If the task engine is already bound to a task or if stage names are not unique.
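
The two validation steps of bind can be sketched as follows. ToyBindEngine is hypothetical and raises built-in exceptions instead of TaskEngineException; lock acquisition and state loading are reduced to a comment.

```python
from collections import Counter
from typing import List


class ToyBindEngine:
    """Hypothetical engine stand-in showing the pre-bind checks."""

    def __init__(self, stage_names: List[str]) -> None:
        self.stage_names = list(stage_names)
        self.is_bound = False

    def bind(self) -> None:
        if self.is_bound:
            raise RuntimeError("engine is already bound")
        duplicates = sorted(
            name for name, count in Counter(self.stage_names).items() if count > 1
        )
        if duplicates:
            raise ValueError(f"duplicate stage names: {duplicates}")
        # Lock acquisition and loading persisted stage state would happen here.
        self.is_bound = True


def try_bind(engine: ToyBindEngine) -> str:
    try:
        engine.bind()
        return "bound"
    except (RuntimeError, ValueError) as err:
        return str(err)


good_engine = ToyBindEngine(["extract", "transform"])
first_bind = try_bind(good_engine)
second_bind = try_bind(good_engine)
duplicate_bind = try_bind(ToyBindEngine(["extract", "extract"]))
```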

cancel
cancel() -> None

Cancels the task by setting all stages to a CANCELED state.

This method finds the starting stage index and updates the status of the corresponding stage to CANCELED. It also sets a cancellation message and the current timestamp.

Raises:

Returns:

  • None

    None

check_sync
check_sync() -> bool

Verifies whether the in-memory state and persistence storage state are in sync

Returns:

  • bool ( bool ) –

    True if the stages from memory and persistence are the same, False otherwise.
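
One way to picture a sync check is to compare the in-memory state with what was last written to storage. The sketch below assumes a single JSON file and a hypothetical states_in_sync helper; how the engine actually compares stages is not documented here.

```python
import json
import os
import tempfile
from typing import Any, Dict


def states_in_sync(memory_state: Dict[str, Any], path: str) -> bool:
    """Return True when the persisted JSON equals the in-memory state."""
    if not os.path.exists(path):
        return False  # nothing persisted yet, so the two cannot match
    with open(path, encoding="utf-8") as fh:
        persisted = json.load(fh)
    return persisted == memory_state


state_path = os.path.join(tempfile.mkdtemp(), "stage-state.json")
memory = {"stage_name": "extract", "status": "RUNNING", "attempts": 1}

with open(state_path, "w", encoding="utf-8") as fh:
    json.dump(memory, fh)
in_sync_after_save = states_in_sync(memory, state_path)

memory["status"] = "COMPLETE"  # changed in memory but not saved
in_sync_after_change = states_in_sync(memory, state_path)
```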

dump_stages
dump_stages(
    source: Literal["memory", "persistence"] = "memory",
    color=LIGHTWHITE_EX,
    basenames_only: bool = False,
    source_in_heading: bool = False,
) -> str

Generates a Markdown-formatted summary of all stages, their state data, and persistent storage objects.

The intended use case for this method is unit testing and debugging.

Parameters:

  • color

    The color to use for the structured log output.

  • source (Literal['memory', 'persistence'], default: 'memory' ) –

    The source to use for stage state data. If “memory”, the current state data is used. If “persistence”, the state data is loaded from persistent storage.

  • basenames_only (bool, default: False ) –

    If True, only the basenames of the files will be displayed.

  • source_in_heading (bool, default: False ) –

    If True, the source will be included in the heading. Set this to False when you want to compare memory and persistence source strings.

Raises:

Returns:

  • str ( str ) –

    The structured log entries for all stages.

execute
execute(strict_recovery: bool = False) -> None

Reads state data (if any) and executes the task beginning from the last successful stage

Parameters:

  • strict_recovery (bool, default: False ) –

    When execute runs and a stage is already marked as RUNNING, this typically means that the previous execute call did not successfully update the persistence store, so the stage is effectively in an unknown state. When this is set to True, the CheckMateEngine treats these as warnings and attempts to re-execute; otherwise, an exception is raised. Defaults to False.

log_stages
log_stages(
    source: Literal["memory", "persistence"] = "memory",
    color=LIGHTWHITE_EX,
    basenames_only: bool = False,
) -> None

Logs a Markdown-formatted summary of all stages, their state data, and persistent storage objects.

The intended use case for this method is unit testing and debugging.

Parameters:

  • color

    The color to use for the structured log output.

  • source (Literal['memory', 'persistence'], default: 'memory' ) –

    The source to use for stage state data. If “memory”, the current state data is used. If “persistence”, the state data is loaded from persistent storage.

  • basenames_only (bool, default: False ) –

    If True, only the basenames of the files will be displayed.

Raises:

Returns:

  • None

    None; the summary is written to the log rather than returned.

state_update_hook
state_update_hook() -> None

Hook that is called when a stage updates the task state.

Subclasses should override this method to perform additional actions when state is updated.
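
A minimal sketch of the override pattern, using hypothetical classes. It assumes that saving a stage calls the hook unless trigger_update_hook is False, mirroring the save(trigger_update_hook=True) signature on BaseStage; the real call path may differ.

```python
from typing import Dict, List


class ToyHookEngine:
    """Hypothetical engine stand-in with a no-op update hook."""

    def __init__(self) -> None:
        self.saved: Dict[str, str] = {}

    def state_update_hook(self) -> None:
        # Default behavior: do nothing.
        pass

    def save_stage(
        self, stage_name: str, status: str, trigger_update_hook: bool = True
    ) -> None:
        self.saved[stage_name] = status  # stands in for a write to persistent storage
        if trigger_update_hook:
            self.state_update_hook()


class AuditedEngine(ToyHookEngine):
    """Subclass that records an event each time the hook fires."""

    def __init__(self) -> None:
        super().__init__()
        self.events: List[str] = []

    def state_update_hook(self) -> None:
        self.events.append(f"{len(self.saved)} stage(s) persisted")


audited = AuditedEngine()
audited.save_stage("extract", "COMPLETE")
audited.save_stage("transform", "RUNNING", trigger_update_hook=False)
```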

unbind
unbind(clean: bool = False) -> None

Unbinds the task engine from the task, releasing the bind lock.

This method restores the original stage templates and releases the bind lock. It raises a TaskEngineException if the task engine is not currently bound.

Parameters:

  • clean (bool, default: False ) –

    If True, state data for all stages will be removed from persistent storage.

Raises:

TaskEngineState

Bases: BaseModel, Generic[STAGE]

Attributes

model_config class-attribute instance-attribute
model_config = ConfigDict(
    extra="forbid",
    strict=False,
    arbitrary_types_allowed=False,
)
stage_template_classes class-attribute instance-attribute
stage_template_classes: List[str] = Field(
    ...,
    title="Stage Template Classes",
    description="List of stage template classes",
)
stage_templates class-attribute instance-attribute
stage_templates: List[STAGE] = Field(
    ...,
    title="Stage Templates",
    description="List of stage templates",
)

Functions

build_lockfile_name

build_lockfile_name(task_id: str) -> str

Generates a lock file name for the task.

Parameters:

  • task_id (str) –

    The unique identifier for the task.

Returns:

  • str ( str ) –

    The generated lock file name.
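
For illustration only, a lock file name could be derived by joining the task ID and the LOCK_EXTENSION attribute. The '<task_id>.<extension>' layout in this sketch is an assumption; the actual format produced by build_lockfile_name is not shown on this page.

```python
LOCK_EXTENSION = "lock"  # same value as the module attribute documented above


def build_lockfile_name_sketch(task_id: str) -> str:
    """Assumed format: '<task_id>.<LOCK_EXTENSION>'; the real layout may differ."""
    if not task_id:
        raise ValueError("task_id must be a non-empty string")
    return f"{task_id}.{LOCK_EXTENSION}"


example_name = build_lockfile_name_sketch("task.1234")
```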