
eleanor.backend.checkmate

Resumable sequential task execution.

Attributes

__all__ module-attribute

__all__ = [
    "BaseStage",
    "BaseStageState",
    "BindException",
    "CheckMate",
    "MissingStateException",
    "PersistenceDataNotFoundException",
    "StageStatus",
    "CheckMateEngine",
    "TaskEngineException",
    "TaskIOFile",
]

Classes

BaseStage

Bases: ABC, BaseModel, Generic[STAGE_STATE]

Base task stage implementation

This class is thread-safe.

Note

STAGE_INPUT and STAGE_OUTPUT must be JSON-serializable Pydantic models in which all attributes are optional; they are managed by the task execution engine.

Attributes

attempts property
attempts: int

Return the number of attempts for the current stage

description class-attribute instance-attribute
description: str = Field(
    default="",
    title="Stage Description",
    description="Human readable stage description intended for short, inline documentation",
)
engine property

Return the task reference

model_config class-attribute instance-attribute
model_config = ConfigDict(
    extra="forbid",
    strict=False,
    arbitrary_types_allowed=True,
)
obj_name property
obj_name: str

Return the object name for the current stage.

The stage must be bound to a task before this property can be used.

Returns:

  • str ( str ) –

    The object name for the current stage.

stage_name class-attribute instance-attribute
stage_name: str = Field(
    ...,
    min_length=1,
    max_length=STAGE_NAME_MAX_LENGTH,
    title="Stage Name",
    description="Unique stage name / identifier. This field will be used to generate the state metadata file/object and persisted during execution",
)
state class-attribute instance-attribute
state: STAGE_STATE = Field(
    ..., title="Stage State", description="Stage state data"
)
status property
status: StageStatus

Return the current status of the stage

timezone class-attribute instance-attribute
timezone: str = Field(
    default="UTC",
    title="Timezone",
    description="Timezone to use in history entries",
)
url property
url: str

Generate and return the URL for the current stage.

Returns:

  • str ( str ) –

    The URL for the current stage.

volatile_engine_ref class-attribute instance-attribute
volatile_engine_ref: Optional[CheckMateEngine] = Field(
    default=None,
    title="Task",
    description="Task object reference (volatile), this is set by the Task execution engine",
    exclude=True,
)

Functions

bind_to_engine
bind_to_engine(
    task_engine: CheckMateEngine,
) -> BaseStage[STAGE_STATE]

Bind the stage to a CheckMateEngine. If the stage state exists, it is loaded from persistent storage.

Parameters:

  • task_engine (CheckMateEngine) –

    The task engine to bind the stage to.

cancel
cancel() -> None

When a task is canceled, the cancel method on the last incomplete stage is triggered to perform cleanup.

execute_stage abstractmethod
execute_stage() -> None

Executes the task stage

Warning

Returns nothing on success and raises an exception on failure. Do not update the state status inside this method; all state updates and retries are handled by the task execution engine.

Warning

Do not modify previous_state_copy as it may contain important state information needed for rollback operations.

Note

The previous_state_copy parameter is provided to allow the stage to access inputs and outputs from previous stages. It is the responsibility of classes that implement BaseStage to decide exactly what is passed via state as each stage is executed. On the first stage, previous_state_copy will be a reference to stage.state.

Note

When referring to self.state or previous_state_copy in the implementation, Pylint will raise warnings like:

Assigning to attribute 'SOME_ATTRIBUTE' not defined in class slots

This can be safely ignored and muted via a # pylint: disable=assigning-non-slot comment.
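The execute_stage contract above can be illustrated with a dependency-free sketch. Under that contract, a stage returns on success, raises on failure, and leaves status changes to the engine. SplitStage, ToyState and drive are hypothetical stand-ins, not CheckMate classes. The stage only reads earlier output from the shared state and writes its own. The engine-side drive function is the only code that touches status and attempts.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ToyState:
    # Hypothetical shared state; every field is optional, as the Note requires.
    raw: Optional[str] = None
    words: Optional[List[str]] = None
    status: str = "CREATED"
    attempts: int = 0


class SplitStage:
    """Hypothetical stage: reads earlier output from state and writes its own."""

    def __init__(self, state: ToyState) -> None:
        self.state = state

    def execute_stage(self) -> None:
        if self.state.raw is None:
            # Signal failure by raising; never set self.state.status here.
            raise ValueError("raw input missing")
        self.state.words = self.state.raw.split()


def drive(stage: SplitStage) -> str:
    """Engine-side wrapper: the only place status and attempts change."""
    stage.state.attempts += 1
    stage.state.status = "RUNNING"
    try:
        stage.execute_stage()
    except Exception:
        stage.state.status = "ERROR"
    else:
        stage.state.status = "COMPLETE"
    return stage.state.status
```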

gen_obj_name
gen_obj_name(
    engine: Optional[CheckMateEngine] = None,
) -> str

Generate an object name for the current stage.

Parameters:

  • engine (Optional[CheckMateEngine], default: None ) –

    An optional CheckMateEngine instance. If not provided, the engine the stage is bound to is used.

Returns:

  • str ( str ) –

    The generated object name.

inc_attempts
inc_attempts() -> int

Increment the number of attempts for the current stage.

This method increments the attempts attribute of the state object by 1 and returns the updated number of attempts.

Returns:

  • int ( int ) –

    The updated number of attempts.

persistence_file_exists
persistence_file_exists() -> bool

Check if the stage persistence state exists in the task’s input/output system.

Returns:

  • bool ( bool ) –

    True if the persistence file exists, False otherwise.

rollback_stage
rollback_stage() -> None

When a processing error occurs, attempt to roll back to a stable state.

Rollbacks are triggered when the stage execution encounters an error. Subclasses that need a rollback operation can override this method.
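The sketch below shows when rollback_stage is useful, assuming a stage that can leave partial output behind. ExportStage and run_once are hypothetical. In CheckMate, the engine decides when to call rollback_stage, not user code.

```python
import os


class ExportStage:
    """Hypothetical stage that writes an output file and can undo it."""

    def __init__(self, out_path: str, fail: bool = False) -> None:
        self.out_path = out_path
        self.fail = fail

    def execute_stage(self) -> None:
        with open(self.out_path, "w", encoding="utf-8") as fh:
            fh.write("partial")
        if self.fail:
            raise OSError("upload failed after partial write")

    def rollback_stage(self) -> None:
        # Remove the partial output so a retry starts from a clean slate.
        if os.path.exists(self.out_path):
            os.remove(self.out_path)


def run_once(stage: ExportStage) -> bool:
    """Engine-side sketch: roll back on failure and report success."""
    try:
        stage.execute_stage()
        return True
    except Exception:
        stage.rollback_stage()
        return False
```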

save
save(trigger_update_hook: bool = True) -> None

Persist the current state to durable storage

update_status
update_status(
    message: str,
    status: StageStatus,
    begin_ts: float,
    end_ts: float | None = None,
) -> None

Updates the status of the stage and records the change in history.

This method derives the correct new state based on the change that is taking place and writes it to persistent storage.

Parameters:

  • message (str) –

    A message describing the status update.

  • status (StageStatus) –

    The new status to be set for the stage.

  • begin_ts (float) –

    The timestamp when the status update begins.

  • end_ts (float | None, default: None ) –

    The timestamp when the status update ends.

Returns:

  • None
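The update_status bookkeeping can be sketched as follows: set the status, append a history entry, then persist. HistoryEntry, ToyStageState and the save callback are illustrative assumptions. The real method also derives the new state from the transition and formats timestamps in the stage's configured timezone. This sketch does neither and always uses UTC.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Callable, List, Optional


@dataclass
class HistoryEntry:
    message: str
    status: str
    begin: Optional[str]
    end: Optional[str]


@dataclass
class ToyStageState:
    status: str = "CREATED"
    history: List[HistoryEntry] = field(default_factory=list)


def _iso(ts: Optional[float]) -> Optional[str]:
    # Assumption: UTC only; the real stage uses its `timezone` field.
    if ts is None:
        return None
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()


def update_status(
    state: ToyStageState,
    message: str,
    status: str,
    begin_ts: float,
    end_ts: Optional[float] = None,
    save: Callable[[ToyStageState], None] = lambda s: None,
) -> None:
    """Set the new status, record it in history, then persist via `save`."""
    state.status = status
    state.history.append(HistoryEntry(message, status, _iso(begin_ts), _iso(end_ts)))
    save(state)
```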

validate_timezone classmethod
validate_timezone(timezone_str: str) -> str

BaseStageState

Bases: BaseModel, Generic[STAGE_RESULT]

Base class for stage state data. This class should be subclassed to provide specific fields for managing state data.

Note that all stages in a task share the same state class. This is a deliberate design decision to ensure the currently executing stage has access to all relevant state details of every stage that came before it.
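Every stage shares one state class whose fields are all optional. As a result, a later stage can read what earlier stages wrote, and the whole object must survive a JSON round trip. The real classes are Pydantic models. The hypothetical PipelineState below uses a stdlib dataclass instead, so the sketch runs without dependencies.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class PipelineState:
    """Hypothetical task-wide state: one class shared by every stage."""

    source_url: Optional[str] = None   # written by the first stage
    row_count: Optional[int] = None    # written by the second stage
    report_path: Optional[str] = None  # written by the third stage


def to_json(state: PipelineState) -> str:
    return json.dumps(asdict(state), sort_keys=True)


def from_json(data: str) -> PipelineState:
    # Missing keys fall back to None, so partially filled state loads cleanly.
    return PipelineState(**json.loads(data))
```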

Attributes

attempts class-attribute instance-attribute
attempts: int = Field(
    default=0,
    ge=0,
    title="Run attempts",
    description="Number of times the stage has been run; a value of 0 indicates it has not been run yet. Primarily used to track retries.",
)
history class-attribute instance-attribute
history: List[StageStateHistoryEntry] = Field(
    default=[],
    title="History",
    description="Stage state history",
)
model_config class-attribute instance-attribute
model_config = ConfigDict(
    extra="forbid",
    strict=False,
    arbitrary_types_allowed=False,
)
result class-attribute instance-attribute
result: STAGE_RESULT | None = Field(
    default=None,
    title="Result",
    description="Contains the stage execution result. The intended use case is to store the output of the final stage such that external systems can inspect the final stage to determine the overall result of the task. A value of None indicates that the stage has not been completed successfully",
)
stage_name class-attribute instance-attribute
stage_name: str = Field(
    "",
    max_length=STAGE_NAME_MAX_LENGTH,
    title="Stage Name",
    description="Unique stage name / identifier. This field will be used to generate the state metadata file/object and persisted during execution",
)
status class-attribute instance-attribute
status: StageStatus = Field(
    default=UNBOUND,
    title="Stage Status",
    description="Stage execution status, used by the task execution engine to determine whether this stage needs to be executed or skipped",
)
task_id class-attribute instance-attribute
task_id: str = Field(
    default="", title="Task ID", description="Task ID"
)

BindException

Bases: Exception

Raised when a CheckMateEngine bind operation fails because another bind already exists.

CheckMate

CheckMate(
    checkmate_factory: Callable[..., CheckMate],
    stage_templates: List[STAGE] | None = None,
    task_id: str | None = None,
    description: str | None = None,
    no_lock: bool = False,
    retry_policy: RetryPolicy | None = None,
    max_attempts: int | None = None,
    bind_lock_timeout: int | None = None,
    auto_cleanup: bool | None = None,
    force_create: bool = False,
    queue: str = DEFAULT_QUEUE_NAME,
    tags: List[str] | None = None,
)

Bases: CheckMateEngine[STAGE, STAGE_RESULT]

CheckMate extends CheckMateEngine to add RDBMS persistence via bind/unbind hooks.

This class provides additional functionality to bind and unbind tasks with a relational database management system (RDBMS). It ensures that task states are persisted in the database during the bind and unbind operations.

Initializes a CheckMate instance.

This constructor sets up a CheckMate task with optional stage templates, task ID, description, and various configuration options. It ensures that the task is properly configured for RDBMS persistence and task engine operations.

Parameters:

  • stage_templates (List[STAGE] | None, default: None ) –

    Optional list of stage templates.

  • task_id (str | None, default: None ) –

    Optional unique identifier for the task.

  • description (str | None, default: None ) –

    Optional description of the task.

  • no_lock (bool, default: False ) –

    Boolean flag to indicate if locking should be disabled.

  • retry_policy (RetryPolicy | None, default: None ) –

    Optional retry policy for the task.

  • max_attempts (int | None, default: None ) –

    Optional maximum number of attempts for the task.

  • bind_lock_timeout (int | None, default: None ) –

    Optional timeout for bind lock.

  • auto_cleanup (bool | None, default: None ) –

    Optional flag to enable automatic cleanup.

  • force_create (bool, default: False ) –

    Boolean flag to indicate if the task should be created if it doesn’t exist.

  • queue (str, default: DEFAULT_QUEUE_NAME ) –

    Queue name for the task.

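Example
# NOPStage and RetryPolicy are not in this module's __all__; import them
# from wherever your installation defines them.
with CheckMate(
    # Assumption: the class itself satisfies Callable[..., CheckMate].
    checkmate_factory=CheckMate,
    stage_templates=[
        NOPStage(
            stage_name="stage1",
            description="No-op stage for testing",
            state=BaseStageState(),
        ),
        NOPStage(
            stage_name="stage2",
            description="No-op stage for testing",
            state=BaseStageState(),
        ),
    ],
    description="Sample task",
    no_lock=True,
    retry_policy=RetryPolicy.RETRY,
    max_attempts=3,
    bind_lock_timeout=5,
    auto_cleanup=False,
) as task:
    # Perform task operations here, for example:
    task.execute()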

Attributes

checkmate_factory instance-attribute
description instance-attribute
description = description
queue instance-attribute
queue = queue
tags instance-attribute
tags = tags

Functions

bind
bind() -> None

Bind the task to the current process.

This method ensures that the task is persisted in the RDBMS. If the task does not exist in the database, it will be created.

state_update_hook
state_update_hook() -> None
unbind
unbind(clean: bool = False) -> None

Unbind the task from the current process.

This method ensures that the task state is updated in the RDBMS. If the task does not exist in the database, a warning is logged.

Parameters:

  • clean (bool, default: False ) –

    Boolean flag to indicate if the task should be marked as cleaned.

CheckMateEngine

CheckMateEngine(
    io: TaskIO,
    task_id: str | None = None,
    stage_templates: List[STAGE] | None = None,
    retry_policy: RetryPolicy = NONE,
    max_attempts: int = 1,
    bind_lock_timeout: float | None = None,
    auto_cleanup: bool = False,
    no_lock: bool = False,
    force_create: bool = False,
)

Bases: Generic[STAGE, STAGE_RESULT]

Multi-stage task execution framework

The scope of the task engine is to manage the execution of a series of stages. Each stage is executed sequentially until it is completed, canceled by the user, exhausts retries, or encounters an unrecoverable error.

The intended use case is to provide a lightweight application-level framework that can execute a series of steps with built-in checkpoint and resume capabilities. Ideally, the CheckMateEngine is integrated into application logic and invoked via an external orchestration and resource management framework such as Apache Airflow.
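The checkpoint-and-resume idea can be sketched in a few lines under two simplifying assumptions. First, each stage is a plain function. Second, a completed stage is marked by a JSON snapshot file. run_resumable is hypothetical and much simpler than CheckMateEngine, which persists a full state object (status, attempts, history) per stage through a TaskIO backend.

```python
import json
from pathlib import Path
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical stage: a unique name plus a function that mutates shared state.
Stage = Tuple[str, Callable[[Dict[str, Any]], None]]


def run_resumable(stages: List[Stage], checkpoint_dir: Path, state: Dict[str, Any]) -> List[str]:
    """Run stages in order, skipping any whose checkpoint file already exists.

    Returns the names of the stages actually executed in this call.
    """
    checkpoint_dir.mkdir(parents=True, exist_ok=True)
    executed = []
    for name, fn in stages:
        ckpt = checkpoint_dir / f"{name}.json"
        if ckpt.exists():
            # Completed in an earlier run: restore its snapshot and move on.
            state.update(json.loads(ckpt.read_text()))
            continue
        fn(state)  # raises on failure, so later stages are not attempted
        ckpt.write_text(json.dumps(state))
        executed.append(name)
    return executed
```

Running it twice against the same directory resumes at the stage that failed, rather than starting over.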

Warning

The CheckMateEngine class is not thread-safe and should not be shared between threads. A future enhancement may add thread safety to support parallel stage execution; however, this arguably should be the responsibility of the orchestration framework.

Initializes a CheckMateEngine instance.

Parameters:

  • stage_templates (List[STAGE] | None, default: None ) –

    A list of stage templates to be used in the task.

  • io (TaskIO) –

    The input/output handler for the task.

  • task_id (str | None, default: None ) –

    The unique identifier for the task. Defaults to None.

  • retry_policy (RetryPolicy, default: NONE ) –

    The policy for retrying the task. Defaults to RetryPolicy.NONE.

  • max_attempts (int, default: 1 ) –

    The maximum number of attempts for the task. Defaults to 1.

  • bind_lock_timeout (float | None, default: None ) –

    The timeout for acquiring the bind lock. Defaults to None.

  • auto_cleanup (bool, default: False ) –

    Whether to automatically clean up after the task. Note that this will force a cleanup operation to happen on unbind even if the task did not complete successfully. Defaults to False.

  • no_lock (bool, default: False ) –

    Whether to disable the bind lock. Defaults to False.

  • force_create (bool, default: False ) –

    Whether to force the creation of a new task. This allows an external process to generate the task_id and pass it in. The engine uses this flag to assume that this is always a new task. Defaults to False.
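The following sketch shows how retry_policy and max_attempts might interact. The RetryPolicy enum here is a local stand-in containing only the two members that appear in these docs (NONE and RETRY). run_with_retries is hypothetical. The sketch assumes that NONE means a single attempt and that RETRY allows up to max_attempts.

```python
from enum import Enum
from typing import Callable


class RetryPolicy(Enum):
    # Local stand-in: only NONE and RETRY are mentioned in the CheckMate docs.
    NONE = "NONE"
    RETRY = "RETRY"


def run_with_retries(action: Callable[[], None], retry_policy: RetryPolicy, max_attempts: int = 1) -> int:
    """Call `action` until it succeeds and return the number of attempts used.

    Re-raises the last exception once the attempt limit is reached.
    """
    limit = max_attempts if retry_policy is RetryPolicy.RETRY else 1
    attempts = 0
    while True:
        attempts += 1
        try:
            action()
            return attempts
        except Exception:
            if attempts >= limit:
                raise
```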

Attributes

auto_cleanup instance-attribute
auto_cleanup = auto_cleanup
bind_lock_timeout instance-attribute
bind_lock_timeout = bind_lock_timeout
cumulative_attempts property
cumulative_attempts: int

Return the cumulative number of attempts for all stages

Returns:

  • int ( int ) –

    The cumulative number of attempts for all stages

has_lockfile property
has_lockfile: bool

Checks for the existence of a lock file, even when the CheckMate engine is not bound. Note that you should not interpret the presence of a lockfile to mean that there is still a running process.
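The has_lockfile warning follows from how lock files work in general. The sketch below uses an atomic O_CREAT | O_EXCL create on a local filesystem. It adds a polling loop similar in spirit to bind_lock_timeout. This is an assumption about the general technique, not CheckMate's actual locking code. If the holder crashes before release_lock runs, the file stays behind even though no process is alive.

```python
import os
import time


def acquire_lock(path: str) -> bool:
    """Atomically create `path`; return False if it already exists."""
    try:
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    with os.fdopen(fd, "w") as fh:
        fh.write(str(os.getpid()))  # informational only; not proof of liveness
    return True


def acquire_lock_with_timeout(path: str, timeout: float, poll: float = 0.05) -> bool:
    """Retry acquire_lock until it succeeds or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while not acquire_lock(path):
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll)
    return True


def release_lock(path: str) -> None:
    os.remove(path)
```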

io instance-attribute
io = io
is_bound property
is_bound: bool

Returns whether the task engine is bound

lock instance-attribute
lock = Lock()
lock_obj_name property
lock_obj_name: str

Generates a lock object name for the task.

The lock object name is generated using the task ID, a maximum length for the stage name, and a specific file extension for lock files.

Returns:

  • str ( str ) –

    The generated lock object name.

max_attempts instance-attribute
max_attempts = max_attempts
no_lock instance-attribute
no_lock = no_lock
retry_policy instance-attribute
retry_policy = retry_policy
stages instance-attribute
stages: List[BaseStage] = []
status property
status: StageStatus

Return the current status of the overall task by inspecting the status of each stage.

This method validates the state transitions between consecutive stages using the _StageStateMachine to ensure they are valid. It then determines the overall status of the task based on the statuses of all stages.

Returns:

  • StageStatus ( StageStatus ) –

    The overall status of the task, which can be one of the following:

    • StageStatus.COMPLETE: All stages are complete.
    • StageStatus.RUNNING: At least one stage is running.
    • StageStatus.RETRY: At least one stage is in retry state.
    • StageStatus.ERROR: At least one stage has encountered an error.
    • StageStatus.CANCELED: At least one stage has been canceled.
    • StageStatus.UNBOUND: At least one stage is unbound.
    • StageStatus.CREATED: All stages are in the created state or no other status applies.

Raises:

  • TaskEngineException

    If an invalid state transition is detected between any two consecutive stages.
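The aggregation rules listed above can be sketched as follows. The sketch assumes the "at least one" checks are applied in the order they are documented: RUNNING, RETRY, ERROR, CANCELED, UNBOUND. overall_status and the local StageStatus copy are hypothetical. The sketch also skips the transition validation that the real property performs with _StageStateMachine.

```python
from enum import Enum
from typing import Iterable


class StageStatus(Enum):
    # Local copy of the documented members.
    UNBOUND = "UNBOUND"
    CREATED = "CREATED"
    RUNNING = "RUNNING"
    RETRY = "RETRY"
    ERROR = "ERROR"
    COMPLETE = "COMPLETE"
    CANCELED = "CANCELED"


# Assumed precedence: the order in which the "at least one" cases are listed.
_PRECEDENCE = (
    StageStatus.RUNNING,
    StageStatus.RETRY,
    StageStatus.ERROR,
    StageStatus.CANCELED,
    StageStatus.UNBOUND,
)


def overall_status(stage_statuses: Iterable[StageStatus]) -> StageStatus:
    statuses = list(stage_statuses)
    if statuses and all(s is StageStatus.COMPLETE for s in statuses):
        return StageStatus.COMPLETE
    for candidate in _PRECEDENCE:
        if candidate in statuses:
            return candidate
    # Nothing else applies, e.g. a mix of COMPLETE and CREATED stages.
    return StageStatus.CREATED
```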

task_id instance-attribute
task_id = (
    gen_uuid(prefix="task.") if not task_id else task_id
)
task_result property
task_result: STAGE_RESULT | None

Retrieves the result of the last completed stage.

This property iterates through the stages and returns the result of the most recently completed stage. If no stage has completed, it returns None.

Returns:

  • STAGE_RESULT | None –

    The result of the last completed stage, or None if no stage has completed.
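Stages run sequentially, so the most recently completed stage is the last completed one in list order. The following is a hypothetical sketch of task_result under that assumption, with a simple ToyStage in place of BaseStage.

```python
from dataclasses import dataclass
from typing import Generic, List, Optional, TypeVar

R = TypeVar("R")


@dataclass
class ToyStage(Generic[R]):
    name: str
    status: str  # e.g. "COMPLETE" or "CREATED"
    result: Optional[R] = None


def task_result(stages: List[ToyStage[R]]) -> Optional[R]:
    """Return the result of the last COMPLETE stage in list order, or None."""
    for stage in reversed(stages):
        if stage.status == "COMPLETE":
            return stage.result
    return None
```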

task_state instance-attribute
task_state: TaskEngineState[STAGE] = TaskEngineState(
    stage_template_classes=[
        string_from_callable(x) for x in stage_templates
    ],
    stage_templates=[
        x.model_copy(deep=True) for x in stage_templates
    ],
)

Functions

__enter__
__enter__() -> CheckMateEngine

Enter the runtime context related to a CheckMateEngine instance.

Returns:

  • CheckMateEngine ( CheckMateEngine ) –

    A reference to the current instance of the CheckMateEngine.

__exit__
__exit__(
    exc_type: Type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> Literal[False]

Releases the bind lock upon exiting a with statement.

Parameters:

  • exc_type (Type[BaseException] | None) –

    The exception type, if an exception was raised.

  • exc_value (BaseException | None) –

    The exception instance, if an exception was raised.

  • traceback (TracebackType | None) –

    The traceback object, if an exception was raised.

Returns:

  • Literal[False] –

    Always returns False to indicate that exceptions should propagate.
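The context-manager behavior can be sketched under one assumption: __enter__ binds and __exit__ unbinds. ToyEngine is a hypothetical stand-in. The point is that returning False from __exit__ releases resources without suppressing the exception.

```python
from types import TracebackType
from typing import List, Literal, Optional, Type


class ToyEngine:
    """Hypothetical stand-in for the bind-on-enter / unbind-on-exit pattern."""

    def __init__(self) -> None:
        self.events: List[str] = []
        self.is_bound = False

    def bind(self) -> None:
        self.is_bound = True
        self.events.append("bind")

    def unbind(self) -> None:
        self.is_bound = False
        self.events.append("unbind")

    def __enter__(self) -> "ToyEngine":
        self.bind()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> Literal[False]:
        self.unbind()  # always release the lock, even when an error occurred
        return False   # never swallow the exception
```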

bind
bind() -> None

Binds the task engine to the task, acquiring the bind lock.

This method performs the following actions:

  • Checks if the task engine is already bound to a task and raises an exception if it is.
  • Verifies that all stages have unique names and raises an exception if they do not.
  • Acquires the bind lock for the task in persistent storage.
  • Binds the stages to the task engine, initializing all stages and reading from persistent storage if necessary.

Raises:

  • TaskEngineException

    If the task engine is already bound to a task or if stage names are not unique.
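The first two bind() checks can be sketched as follows. ToyBinder and the local TaskEngineException are hypothetical stand-ins. Lock acquisition and stage loading are left as a comment.

```python
from collections import Counter
from typing import List


class TaskEngineException(Exception):
    """Local stand-in for the exception of the same name."""


class ToyBinder:
    def __init__(self, stage_names: List[str]) -> None:
        self.stage_names = stage_names
        self.is_bound = False

    def bind(self) -> None:
        if self.is_bound:
            raise TaskEngineException("task engine is already bound")
        duplicates = sorted(n for n, c in Counter(self.stage_names).items() if c > 1)
        if duplicates:
            raise TaskEngineException(f"stage names are not unique: {duplicates}")
        # The real engine would now acquire the bind lock and bind each stage,
        # loading any persisted state.
        self.is_bound = True
```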

cancel
cancel() -> None

Cancels the task by setting all stages to a CANCELED state.

This method finds the starting stage index and updates the status of the corresponding stage to CANCELED. It also sets a cancellation message and the current timestamp.

Returns:

  • None

check_sync
check_sync() -> bool

Verifies whether the in-memory state and persistence storage state are in sync

Returns:

  • bool ( bool ) –

    True if the stages from memory and persistence are the same, False otherwise.

dump_stages
dump_stages(
    source: Literal["memory", "persistence"] = "memory",
    color=LIGHTWHITE_EX,
    basenames_only: bool = False,
    source_in_heading: bool = False,
) -> str

Generates a Markdown-formatted summary of all stages, their state data, and persistent storage objects.

The intended use case for this method is unit testing and debugging.

Parameters:

  • color

    The color to use for the structured log output.

  • source (Literal['memory', 'persistence'], default: 'memory' ) –

    The source to use for stage state data. If “memory”, the current state data is used. If “persistence”, the state data is loaded from persistent storage.

  • basenames_only (bool, default: False ) –

    If True, only the basenames of the files will be displayed.

  • source_in_heading (bool, default: False ) –

    If True, the source will be included in the heading. Set this to False when you want to compare memory and persistence source strings.

Returns:

  • str ( str ) –

    The structured log entries for all stages.

execute
execute(strict_recovery: bool = False) -> None

Reads state data (if any) and executes the task beginning from the last successful stage

Parameters:

  • strict_recovery (bool, default: False ) –

    When execute runs and finds a stage already marked as RUNNING, this typically means that a previous execute call did not successfully write its final state to the persistence store, so the stage is effectively in an unknown state. When this is set to True, the CheckMateEngine treats such stages as warnings and attempts to re-execute them; otherwise, an exception is raised. Defaults to False.
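The following simplified sketch shows how execute() might pick its starting stage under the documented strict_recovery semantics. first_stage_to_run is hypothetical. It treats stage statuses as plain strings and ignores RETRY and ERROR handling.

```python
import logging
from typing import List, Optional


class TaskEngineException(Exception):
    """Local stand-in for the exception of the same name."""


def first_stage_to_run(statuses: List[str], strict_recovery: bool = False) -> Optional[int]:
    """Return the index to resume from, or None if every stage is COMPLETE."""
    for index, status in enumerate(statuses):
        if status == "COMPLETE":
            continue  # finished in an earlier run; skip it
        if status == "RUNNING":
            # A previous run stopped mid-stage, so its outcome is unknown.
            if not strict_recovery:
                raise TaskEngineException(f"stage {index} was left RUNNING")
            logging.warning("stage %d was left RUNNING; re-executing", index)
        return index
    return None
```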

log_stages
log_stages(
    source: Literal["memory", "persistence"] = "memory",
    color=LIGHTWHITE_EX,
    basenames_only: bool = False,
) -> None

Logs a Markdown-formatted summary of all stages, their state data, and persistent storage objects.

The intended use case for this method is unit testing and debugging.

Parameters:

  • color

    The color to use for the structured log output.

  • source (Literal['memory', 'persistence'], default: 'memory' ) –

    The source to use for stage state data. If “memory”, the current state data is used. If “persistence”, the state data is loaded from persistent storage.

  • basenames_only (bool, default: False ) –

    If True, only the basenames of the files will be displayed.

Returns:

  • None –

    Nothing is returned; the summary is written to the log.

state_update_hook
state_update_hook() -> None

Hook that is called when a stage updates the task state.

Subclasses should override this method to perform additional actions when state is updated.

unbind
unbind(clean: bool = False) -> None

Unbinds the task engine from the task, releasing the bind lock.

This method restores the original stage templates and releases the bind lock. It raises a TaskEngineException if the task engine is not currently bound.

Parameters:

  • clean (bool, default: False ) –

    If True, state data for all stages will be removed from persistent storage.

MissingStateException

Bases: Exception

Raised when required data is not set in the stage state object

PersistenceDataNotFoundException

Bases: Exception

Raised when there was an error loading task data from the persistence store.

StageStatus

Bases: Enum

Task status enumeration.

Attributes:

  • UNBOUND (str) –

    Task has not been bound to a task engine.

  • CREATED (str) –

    Task has been created but not yet started.

  • RUNNING (str) –

    Task is currently running.

  • RETRY (str) –

    Task encountered an error and is eligible to be retried.

  • ERROR (str) –

    Task has encountered an error and cannot be retried.

  • COMPLETE (str) –

    Task has completed successfully.

  • CANCELED (str) –

    Task has been canceled.

Attributes

CANCELED class-attribute instance-attribute
CANCELED = 'CANCELED'
COMPLETE class-attribute instance-attribute
COMPLETE = 'COMPLETE'
CREATED class-attribute instance-attribute
CREATED = 'CREATED'
ERROR class-attribute instance-attribute
ERROR = 'ERROR'
RETRY class-attribute instance-attribute
RETRY = 'RETRY'
RUNNING class-attribute instance-attribute
RUNNING = 'RUNNING'
UNBOUND class-attribute instance-attribute
UNBOUND = 'UNBOUND'

TaskEngineException

Bases: Exception

Raised during task engine execution when it is determined that a task cannot be automatically recovered.

TaskIOFile

TaskIOFile(base_url: str, encoding: str = 'utf-8')

Bases: TaskIO

Task I/O implementation for file-based storage

Attributes

base_url property
base_url: str

Returns the base URL.

Returns:

  • str ( str ) –

    The base URL.

Functions

build_url
build_url(obj_name: str) -> str

Builds a URL for a given object name by appending it to the base URL.

Parameters:

  • obj_name (str) –

    The object name to convert.

Returns:

  • str ( str ) –

    The corresponding filesystem path.

exists
exists(obj_name: str) -> bool

Checks if a state data file exists.

Parameters:

  • obj_name (str) –

    The object name to be appended to the base URL.

Returns:

  • bool ( bool ) –

    True if the file exists, False otherwise.

list
list(prefix: str) -> List[str]

Lists objects in the persistent storage that match the specified prefix; typically, the prefix is the task ID.

read
read(obj_name: str) -> str

Reads state data from a file.

Parameters:

  • obj_name (str) –

    The object name to be appended to the base URL.

Returns:

  • str ( str ) –

    The data read from the file.

remove
remove(obj_name: str) -> None

Removes a state data file.

Parameters:

  • obj_name (str) –

    The object name to be appended to the base URL.

write
write(obj_name: str, data: str) -> None

Writes state data to a file.

Parameters:

  • obj_name (str) –

    The object name to be appended to the base URL.

  • data (str) –

    The data to write to the file.
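The following is a hypothetical file-backed store with the same method names as TaskIOFile. It is useful for seeing how object names map to paths under base_url. ToyTaskIOFile is not the real class. In particular, writing to a temporary file and renaming it with os.replace is a design choice of this sketch. The rename ensures that a crash mid-write cannot leave a truncated state file.

```python
import os
from pathlib import Path
from typing import List


class ToyTaskIOFile:
    """Hypothetical file-backed store mirroring the TaskIOFile method names."""

    def __init__(self, base_url: str, encoding: str = "utf-8") -> None:
        self._base = Path(base_url)
        self._encoding = encoding

    @property
    def base_url(self) -> str:
        return str(self._base)

    def build_url(self, obj_name: str) -> str:
        return str(self._base / obj_name)

    def exists(self, obj_name: str) -> bool:
        return Path(self.build_url(obj_name)).is_file()

    def read(self, obj_name: str) -> str:
        return Path(self.build_url(obj_name)).read_text(encoding=self._encoding)

    def write(self, obj_name: str, data: str) -> None:
        path = Path(self.build_url(obj_name))
        path.parent.mkdir(parents=True, exist_ok=True)
        # Write a temporary sibling first, then atomically replace the target.
        tmp = path.with_name(path.name + ".tmp")
        tmp.write_text(data, encoding=self._encoding)
        os.replace(tmp, path)

    def remove(self, obj_name: str) -> None:
        Path(self.build_url(obj_name)).unlink()

    def list(self, prefix: str) -> List[str]:
        if not self._base.is_dir():
            return []
        return sorted(
            p.name
            for p in self._base.iterdir()
            if p.is_file() and p.name.startswith(prefix) and not p.name.endswith(".tmp")
        )
```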