eleanor.backend.checkmate
Resumable sequential task execution.
Attributes
__all__
module-attribute
__all__ = [
"BaseStage",
"BaseStageState",
"BindException",
"CheckMate",
"MissingStateException",
"PersistenceDataNotFoundException",
"StageStatus",
"CheckMateEngine",
"TaskEngineException",
"TaskIOFile",
]
Classes
BaseStage
Bases: ABC, BaseModel, Generic[STAGE_STATE]
Base task stage implementation.
This class is thread-safe.
Note
STAGE_INPUT and STAGE_OUTPUT must be JSON-serializable Pydantic models in which all attributes are optional; these are managed by the task execution engine.
Attributes
description
class-attribute
instance-attribute
description: str = Field(
default="",
title="Stage Description",
description="Human readable stage description intended for short, inline documentation",
)
model_config
class-attribute
instance-attribute
obj_name
property
Return the object name for the current stage.
The stage must be bound to a task before this property can be used.
Returns:
- str: The object name for the current stage.
stage_name
class-attribute
instance-attribute
stage_name: str = Field(
...,
min_length=1,
max_length=STAGE_NAME_MAX_LENGTH,
title="Stage Name",
description="Unique stage name / identifier. This field is used to generate the state metadata file/object that is persisted during execution",
)
state
class-attribute
instance-attribute
state: STAGE_STATE = Field(
..., title="Stage State", description="Stage state data"
)
timezone
class-attribute
instance-attribute
timezone: str = Field(
default="UTC",
title="Timezone",
description="Timezone to use in history entries",
)
url
property
Generate and return the URL for the current stage.
Returns:
- str: The URL for the current stage.
volatile_engine_ref
class-attribute
instance-attribute
volatile_engine_ref: Optional[CheckMateEngine] = Field(
default=None,
title="Task",
description="Task object reference (volatile), this is set by the Task execution engine",
exclude=True,
)
Functions
bind_to_engine
bind_to_engine(
task_engine: CheckMateEngine,
) -> BaseStage[STAGE_STATE]
Bind the stage to a TaskEngine. If the stage state exists, it will be loaded from persistent storage.
Parameters:
- task_engine (CheckMateEngine): The task engine to bind the stage to.
Returns:
- BaseStage[STAGE_STATE]: The bound stage instance.
Raises:
- TaskEngineException: If the stage is already bound to a TaskEngine.
cancel
When a task is canceled, the cancel method on the last incomplete stage is triggered to perform cleanup.
execute_stage
abstractmethod
Executes the task stage.
Warning
Returns nothing on success and raises an exception on failure. Do not update the state status inside this method; all state updates and retries are handled by the task execution process.
Warning
Do not modify previous_state_copy, as it may contain state information needed for rollback operations.
Note
The previous_state_copy parameter is provided to allow the stage to access inputs and outputs from previous stages. Classes that implement BaseStage are responsible for deciding exactly what is passed via state as each stage is executed. On the first stage, previous_state will be a reference to stage.state.
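The execute_stage contract above (return nothing on success, raise on failure, leave status alone, treat previous_state_copy as read-only) can be illustrated with a stdlib-only stand-in. This is a sketch under assumptions, not the library's class: SketchStage and DownloadStage are hypothetical names, plain dicts replace the Pydantic state models, the source_url key is invented, and the execute_stage(previous_state_copy) signature is inferred from the Note rather than shown in this reference.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class SketchStage(ABC):
    """Hypothetical stdlib stand-in for BaseStage (not the library class)."""

    def __init__(self, stage_name: str) -> None:
        self.stage_name = stage_name
        self.state: Dict[str, Any] = {}

    @abstractmethod
    def execute_stage(self, previous_state_copy: Dict[str, Any]) -> None:
        """Return None on success and raise on failure; never set status here."""

    def rollback_stage(self) -> None:
        """Optional hook: undo partial work after execute_stage raises."""

    def cancel(self) -> None:
        """Optional hook: clean up when the task is canceled."""


class DownloadStage(SketchStage):
    """Hypothetical concrete stage used only for illustration."""

    def execute_stage(self, previous_state_copy: Dict[str, Any]) -> None:
        url = previous_state_copy.get("source_url")  # read-only access
        if url is None:
            # Signal failure by raising; the engine decides whether to retry.
            raise ValueError("source_url missing from previous state")
        # Write outputs to this stage's own state, never to previous_state_copy.
        self.state["downloaded_from"] = url

    def rollback_stage(self) -> None:
        # Remove any partial output so a retry starts from a clean state.
        self.state.pop("downloaded_from", None)
```

Keeping status changes out of execute_stage means a single place (the engine) decides between retry, error, and completion. The stage only reports success or failure.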
gen_obj_name
gen_obj_name(
engine: Optional[CheckMateEngine] = None,
) -> str
Generate an object name for the current stage.
Parameters:
- engine (Optional[CheckMateEngine], default: None): An optional engine instance. If not provided, the instance's bound engine is used.
Returns:
- str: The generated object name.
inc_attempts
Increment the number of attempts for the current stage.
This method increments the attempts attribute of the state object by 1 and returns the updated number of attempts.
Returns:
- int: The updated number of attempts.
persistence_file_exists
Check if the stage persistence state exists in the task’s input/output system.
Returns:
- bool: True if the persistence file exists, False otherwise.
rollback_stage
When a processing error occurs, attempt to roll back to a stable state.
Rollbacks are triggered when stage execution encounters an error. Subclasses can implement this method when a rollback operation is needed.
update_status
update_status(
message: str,
status: StageStatus,
begin_ts: float,
end_ts: float | None = None,
) -> None
Updates the status of the stage and records the change in history.
This method derives the correct new state based on the change that is taking place and writes it to persistent storage.
Parameters:
- message (str): A message describing the status update.
- status (StageStatus): The new status to set for the stage.
- begin_ts (float): The timestamp when the status update begins.
- end_ts (float | None, default: None): The timestamp when the status update ends.
Returns:
- None
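The following stdlib sketch shows how a status change can be recorded as a history entry. Several details are assumptions. begin_ts and end_ts are taken to be POSIX seconds, as returned by time.time(). Timestamps are rendered in UTC, matching the default timezone field. HistoryEntry and the reduced Status enum are hypothetical stand-ins for StageStateHistoryEntry and StageStatus. The real method's "derive the correct new state" logic and its persistence write are not modeled.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import List, Optional


class Status(Enum):  # hypothetical subset of StageStatus; values are placeholders
    CREATED = "created"
    RUNNING = "running"
    COMPLETE = "complete"


@dataclass
class HistoryEntry:  # hypothetical stand-in for StageStateHistoryEntry
    message: str
    status: Status
    begin: datetime
    end: Optional[datetime]


@dataclass
class StageState:
    status: Status = Status.CREATED
    history: List[HistoryEntry] = field(default_factory=list)


def _to_utc(ts: float) -> datetime:
    # Convert POSIX seconds to an aware UTC datetime.
    return datetime.fromtimestamp(ts, tz=timezone.utc)


def update_status(state: StageState, message: str, status: Status,
                  begin_ts: float, end_ts: Optional[float] = None) -> None:
    """Set the new status and append a history entry (persistence omitted)."""
    state.status = status
    state.history.append(
        HistoryEntry(
            message=message,
            status=status,
            begin=_to_utc(begin_ts),
            end=None if end_ts is None else _to_utc(end_ts),
        )
    )
```

Appending instead of overwriting keeps a full audit trail, which is what makes the history field useful when diagnosing retries.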
BaseStageState
Bases: BaseModel, Generic[STAGE_RESULT]
Base class for stage state data. This class should be subclassed to provide specific fields for managing state data.
Note that all stages in a task share the same state class. This is a deliberate design decision to ensure that the currently executing stage has access to all relevant state details of every stage that came before it.
Attributes
attempts
class-attribute
instance-attribute
attempts: int = Field(
default=0,
ge=0,
title="Run attempts",
description="Number of times the stage has been run; a value of 0 indicates it has not been run yet. Primarily used to track retries.",
)
history
class-attribute
instance-attribute
history: List[StageStateHistoryEntry] = Field(
default=[],
title="History",
description="Stage state history",
)
model_config
class-attribute
instance-attribute
result
class-attribute
instance-attribute
result: STAGE_RESULT | None = Field(
default=None,
title="Result",
description="Contains the stage execution result. The intended use case is to store the output of the final stage such that external systems can inspect the final stage to determine the overall result of the task. A value of None indicates that the stage has not been completed successfully",
)
stage_name
class-attribute
instance-attribute
stage_name: str = Field(
"",
max_length=STAGE_NAME_MAX_LENGTH,
title="Stage Name",
description="Unique stage name / identifier. This field is used to generate the state metadata file/object that is persisted during execution",
)
status
class-attribute
instance-attribute
status: StageStatus = Field(
default=UNBOUND,
title="Stage Status",
description="Stage execution status, used by the task execution engine to determine whether this stage needs to be executed or skipped",
)
BindException
Bases: Exception
Raised when a TaskEngine bind operation fails due to another existing bind
CheckMate
CheckMate(
checkmate_factory: Callable[..., CheckMate],
stage_templates: List[STAGE] | None = None,
task_id: str | None = None,
description: str | None = None,
no_lock: bool = False,
retry_policy: RetryPolicy | None = None,
max_attempts: int | None = None,
bind_lock_timeout: int | None = None,
auto_cleanup: bool | None = None,
force_create: bool = False,
queue: str = DEFAULT_QUEUE_NAME,
tags: List[str] | None = None,
)
Bases: CheckMateEngine[STAGE, STAGE_RESULT]
ManagedTask extends TaskEngine to add RDBMS persistence via bind/unbind hooks.
This class provides additional functionality to bind and unbind tasks with a relational database management system (RDBMS). It ensures that task states are persisted in the database during the bind and unbind operations.
Attributes:
- description: An optional description of the task.
Initializes a ManagedTask instance.
This constructor sets up a ManagedTask with optional stage templates, task ID, description, and various configuration options. It ensures that the task is properly configured for RDBMS persistence and task engine operations.
Parameters:
- stage_templates (List[STAGE] | None, default: None): Optional list of stage templates.
- task_id (str | None, default: None): Optional unique identifier for the task.
- description (str | None, default: None): Optional description of the task.
- no_lock (bool, default: False): Boolean flag to indicate if locking should be disabled.
- retry_policy (RetryPolicy | None, default: None): Optional retry policy for the task.
- max_attempts (int | None, default: None): Optional maximum number of attempts for the task.
- bind_lock_timeout (int | None, default: None): Optional timeout for the bind lock.
- auto_cleanup (bool | None, default: None): Optional flag to enable automatic cleanup.
- force_create (bool, default: False): Boolean flag to indicate if the task should be created if it doesn't exist.
- queue (str, default: DEFAULT_QUEUE_NAME): Queue name for the task.
Example
with ManagedTask(
    stage_templates=[
        NOPStage(
            stage_name="stage1",
            description="No-op stage for testing",
            state=BaseStageState(),
        ),
        NOPStage(
            stage_name="stage2",
            description="No-op stage for testing",
            state=BaseStageState(),
        ),
    ],
    description="Sample task",
    no_lock=True,
    retry_policy=RetryPolicy.RETRY,
    max_attempts=3,
    bind_lock_timeout=5,
    auto_cleanup=False,
) as task:
    # Perform task operations here, for example:
    task.execute()
Functions
bind
Bind the task to the current process.
This method ensures that the task is persisted in the RDBMS. If the task does not exist in the database, it will be created.
unbind
Unbind the task from the current process.
This method ensures that the task state is updated in the RDBMS. If the task does not exist in the database, a warning is logged.
Parameters:
- clean (bool, default: False): Boolean flag to indicate if the task should be marked as cleaned.
CheckMateEngine
CheckMateEngine(
io: TaskIO,
task_id: str | None = None,
stage_templates: List[STAGE] | None = None,
retry_policy: RetryPolicy = NONE,
max_attempts: int = 1,
bind_lock_timeout: float | None = None,
auto_cleanup: bool = False,
no_lock: bool = False,
force_create: bool = False,
)
Bases: Generic[STAGE, STAGE_RESULT]
Multi-stage task execution framework
The scope of the task engine is to manage the execution of a series of stages. Each stage is executed sequentially until it is completed, canceled by the user, exhausts retries, or encounters an unrecoverable error.
The intended use case is to provide a lightweight application-level framework that can execute a series of steps with builtin checkpoint and resume capabilities. Ideally, the TaskEngine is integrated into application logic and invoked via an external orchestration and resource management framework such as Apache Airflow.
Warning
The TaskEngine class is not thread-safe and should not be shared between threads. A future enhancement may add thread safety to support parallel stage execution; however, this arguably should be the responsibility of the orchestration framework.
Initializes a TaskEngine instance.
Parameters:
- stage_templates (List[STAGE] | None, default: None): A list of stage templates to be used in the task.
- io (TaskIO): The input/output handler for the task.
- task_id (str | None, default: None): The unique identifier for the task.
- retry_policy (RetryPolicy, default: NONE): The policy for retrying the task.
- max_attempts (int, default: 1): The maximum number of attempts for the task.
- bind_lock_timeout (float | None, default: None): The timeout for acquiring the bind lock.
- auto_cleanup (bool, default: False): Whether to automatically clean up after the task. Note that this forces a cleanup operation on unbind even if the task did not complete successfully.
- no_lock (bool, default: False): Whether to disable the bind lock.
- force_create (bool, default: False): Whether to force the creation of a new task. This allows an external process to generate the task_id and pass it in; the TaskEngine uses this flag to assume that this is always a new task.
Raises:
- TaskEngineException: If no stage templates are provided.
- TaskEngineException: If the retry policy is NONE and max_attempts is not 1.
- TaskEngineException: If max_attempts is less than 1.
Attributes
cumulative_attempts
property
Return the cumulative number of attempts for all stages.
Returns:
- int: The cumulative number of attempts for all stages.
has_lockfile
property
Checks for the existence of a lock file, even when the CheckMate engine is not bound. Note that you should not interpret the presence of a lockfile to mean that there is still a running process.
lock_obj_name
property
Generates a lock object name for the task.
The lock object name is generated using the task ID, a maximum length for the stage name, and a specific file extension for lock files.
Returns:
- str: The generated lock object name.
status
property
status: StageStatus
Return the current status of the overall task by inspecting the status of each stage.
This method validates the state transitions between consecutive stages using the _StageStateMachine to ensure they are valid. It then determines the overall status of the task based on the statuses of all stages.
Returns:
- StageStatus: The overall status of the task, which can be one of the following:
  - StageStatus.COMPLETE: All stages are complete.
  - StageStatus.RUNNING: At least one stage is running.
  - StageStatus.RETRY: At least one stage is in the retry state.
  - StageStatus.ERROR: At least one stage has encountered an error.
  - StageStatus.CANCELED: At least one stage has been canceled.
  - StageStatus.UNBOUND: At least one stage is unbound.
  - StageStatus.CREATED: All stages are in the created state, or no other status applies.
Raises:
- TaskEngineException: If an invalid state transition is detected between any two consecutive stages.
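The aggregation rules above can be sketched as a small function. This reference does not say which rule wins when a task mixes statuses (for example, one stage RUNNING and another in ERROR). The sketch therefore assumes the precedence follows the order in which the rules are listed. The enum values are placeholders, and the _StageStateMachine transition validation is omitted entirely.

```python
from enum import Enum
from typing import Iterable


class StageStatus(Enum):  # member names from this reference; values are placeholders
    UNBOUND = "unbound"
    CREATED = "created"
    RUNNING = "running"
    RETRY = "retry"
    ERROR = "error"
    COMPLETE = "complete"
    CANCELED = "canceled"


# Assumption: the first matching status in this tuple wins.
_PRECEDENCE = (
    StageStatus.RUNNING,
    StageStatus.RETRY,
    StageStatus.ERROR,
    StageStatus.CANCELED,
    StageStatus.UNBOUND,
)


def overall_status(stage_statuses: Iterable[StageStatus]) -> StageStatus:
    statuses = list(stage_statuses)
    # COMPLETE only when every stage is complete.
    if statuses and all(s is StageStatus.COMPLETE for s in statuses):
        return StageStatus.COMPLETE
    for candidate in _PRECEDENCE:
        if candidate in statuses:
            return candidate
    # Only CREATED (or a CREATED/COMPLETE mix) remains: "no other status applies".
    return StageStatus.CREATED
```

A partially finished task that is not currently running (some stages COMPLETE, the rest CREATED) falls through to CREATED under this reading.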
task_result
property
task_result: STAGE_RESULT | None
Retrieves the result of the last completed stage.
This method iterates through the stages and returns the result of the most recently completed stage. If no stages are completed, it returns None.
Returns:
- STAGE_RESULT | None: The result of the most recently completed stage, or None if no stage has completed.
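Because stages run sequentially, one reasonable reading of "most recently completed" is "the last COMPLETE stage in stage order". The sketch below makes that assumption. StageSnapshot and the COMPLETE string are hypothetical stand-ins for the stage state and StageStatus.COMPLETE.

```python
from dataclasses import dataclass
from typing import Any, List, Optional

COMPLETE = "complete"  # placeholder for StageStatus.COMPLETE


@dataclass
class StageSnapshot:  # hypothetical: only the fields task_result needs
    status: str
    result: Optional[Any] = None


def task_result(stages: List[StageSnapshot]) -> Optional[Any]:
    """Return the result of the last completed stage in order, else None."""
    # Walk backwards so the first COMPLETE stage found is the latest one.
    for stage in reversed(stages):
        if stage.status == COMPLETE:
            return stage.result
    return None
```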
task_state
instance-attribute
task_state: TaskEngineState[STAGE] = TaskEngineState(
stage_template_classes=[
string_from_callable(x) for x in stage_templates
],
stage_templates=[
x.model_copy(deep=True) for x in stage_templates
],
)
Functions
__enter__
__enter__() -> CheckMateEngine
Enter the runtime context related to a TaskEngine instance.
Returns:
- CheckMateEngine: A reference to the current TaskEngine instance.
__exit__
__exit__(
exc_type: Type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> Literal[False]
Releases the bind lock upon exiting a with statement.
Parameters:
- exc_type (Type[BaseException] | None): The exception type, if an exception was raised.
- exc_value (BaseException | None): The exception instance, if an exception was raised.
- traceback (TracebackType | None): The traceback object, if an exception was raised.
Returns:
- Literal[False]: Always returns False so that exceptions propagate.
bind
Binds the task engine to the task, acquiring the bind lock.
This method performs the following actions:
- Checks whether the task engine is already bound to a task and raises an exception if it is.
- Verifies that all stages have unique names and raises an exception if they do not.
- Acquires the bind lock for the task in persistent storage.
- Binds the stages to the task engine, initializing all stages and reading from persistent storage if necessary.
Raises:
- TaskEngineException: If the task engine is already bound to a task or if stage names are not unique.
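The bind/unbind lifecycle and the with-statement behavior described above can be illustrated with an exclusive lock file. The illustration makes several assumptions. It uses a local lock file created atomically with os.O_CREAT | os.O_EXCL, whereas the real engine stores its lock object through TaskIO. It also assumes that __enter__ binds. bind_lock_timeout, no_lock, and stage binding are not modeled. BindLockSketch is a hypothetical name.

```python
import os
from pathlib import Path
from typing import Literal


class BindLockSketch:
    """Hypothetical illustration of bind/unbind around an exclusive lock file."""

    def __init__(self, lock_path: Path) -> None:
        self.lock_path = lock_path
        self.bound = False

    def bind(self) -> "BindLockSketch":
        if self.bound:
            raise RuntimeError("already bound")
        # O_CREAT | O_EXCL fails atomically (FileExistsError) if the lock exists.
        fd = os.open(self.lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        with os.fdopen(fd, "w") as fh:
            fh.write(str(os.getpid()))  # record the owner for debugging only
        self.bound = True
        return self

    def unbind(self) -> None:
        if not self.bound:
            raise RuntimeError("not bound")
        self.lock_path.unlink()
        self.bound = False

    def __enter__(self) -> "BindLockSketch":
        return self.bind()

    def __exit__(self, exc_type, exc_value, traceback) -> Literal[False]:
        self.unbind()
        return False  # never swallow the exception, matching __exit__ above
```

As the has_lockfile note warns, a leftover lock file (for example, after a crash) does not prove another process is still running. This sketch does no stale-lock detection.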
cancel
Cancels the task by marking the starting stage as CANCELED.
This method finds the starting stage index and updates the status of the corresponding stage to CANCELED, recording a cancellation message and the current timestamp.
Raises:
- TaskEngineException: If the task is in a state that cannot be canceled.
Returns:
- None
check_sync
Verifies whether the in-memory state and the persistent storage state are in sync.
Returns:
- bool: True if the stages from memory and persistence are the same, False otherwise.
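One way to compare in-memory and persisted stage state is to normalize both sides through JSON before comparing them. The sketch below assumes each persisted stage is a JSON document, which fits the JSON-serializable requirement noted for BaseStage. Plain dicts stand in for the Pydantic state models, and stages_in_sync is a hypothetical name.

```python
import json
from typing import Any, Dict, List


def stages_in_sync(memory: List[Dict[str, Any]], persisted: List[str]) -> bool:
    """Return True if every in-memory stage state matches its persisted JSON."""
    if len(memory) != len(persisted):
        return False
    # Round-trip the in-memory side through JSON so both sides use the same
    # types (e.g. tuples become lists); dict equality ignores key order.
    return all(
        json.loads(json.dumps(state)) == json.loads(raw)
        for state, raw in zip(memory, persisted)
    )
```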
dump_stages
dump_stages(
source: Literal["memory", "persistence"] = "memory",
color=LIGHTWHITE_EX,
basenames_only: bool = False,
source_in_heading: bool = False,
) -> str
Generates a Markdown-formatted summary of all stages, their state data, and persistent storage objects.
The intended use case for this method is unit testing and debugging.
Parameters:
- color (default: LIGHTWHITE_EX): The color to use for the structured log output.
- source (Literal['memory', 'persistence'], default: 'memory'): The source to use for stage state data. If "memory", the current state data is used. If "persistence", the state data is loaded from persistent storage.
- basenames_only (bool, default: False): If True, only the basenames of the files are displayed.
- source_in_heading (bool, default: False): If True, the source is included in the heading. Set this to False when you want to compare memory and persistence source strings.
Raises:
- TaskEngineException: If an invalid source is specified.
Returns:
- str: The structured log entries for all stages.
execute
Reads state data (if any) and executes the task, resuming where the last successful stage left off.
Parameters:
- strict_recovery (bool, default: False): If a stage is already marked as RUNNING when execute is called, the previous execute call most likely did not finish updating the persistence store, and the stage is effectively in an unknown state. When this is set to True, the CheckMateEngine treats these stages as warnings and attempts to re-execute them; otherwise, an exception is raised.
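The checkpoint-and-resume behavior of execute can be sketched as a loop that skips completed stages and stops at the first failure. This is an in-memory illustration under assumptions. Each failed attempt is taken to end the current call, leaving the stage in RETRY while attempts remain and in ERROR otherwise, so a later call resumes from that stage. Persistence writes, rollback_stage, cancellation, and strict_recovery handling are left out. Stage, execute, and the status strings are hypothetical stand-ins.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

COMPLETE, RETRY, ERROR, CREATED = "COMPLETE", "RETRY", "ERROR", "CREATED"


@dataclass
class Stage:  # hypothetical: a name, a callable, and the checkpointed fields
    name: str
    run: Callable[[Dict], None]
    status: str = CREATED
    attempts: int = 0


def execute(stages: List[Stage], shared_state: Dict, max_attempts: int = 1) -> str:
    """Run stages in order, skipping completed ones; stop at the first failure."""
    for stage in stages:
        if stage.status == COMPLETE:
            continue  # checkpointed by an earlier call: resume after it
        if stage.status == ERROR:
            return ERROR  # retries exhausted earlier; nothing more to do
        stage.attempts += 1
        try:
            stage.run(shared_state)
        except Exception:
            # Leave the stage retryable while attempts remain, else give up.
            stage.status = RETRY if stage.attempts < max_attempts else ERROR
            return stage.status
        stage.status = COMPLETE
    return COMPLETE
```

In a real deployment, each status change would be persisted before moving on. That is what lets a fresh process, for example one launched by an orchestrator such as Airflow, pick up at the failed stage.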
log_stages
log_stages(
source: Literal["memory", "persistence"] = "memory",
color=LIGHTWHITE_EX,
basenames_only: bool = False,
) -> None
Logs a Markdown-formatted summary of all stages, their state data, and persistent storage objects.
The intended use case for this method is unit testing and debugging.
Parameters:
- color (default: LIGHTWHITE_EX): The color to use for the structured log output.
- source (Literal['memory', 'persistence'], default: 'memory'): The source to use for stage state data. If "memory", the current state data is used. If "persistence", the state data is loaded from persistent storage.
- basenames_only (bool, default: False): If True, only the basenames of the files are displayed.
Raises:
- TaskEngineException: If an invalid source is specified.
Returns:
- None: The summary is written to the log rather than returned.
state_update_hook
Hook that is called when a stage updates the task state.
Subclasses should override this method to perform additional actions when state is updated.
unbind
Unbinds the task engine from the task, releasing the bind lock.
This method restores the original stage templates and releases the bind lock. It raises a TaskEngineException if the task engine is not currently bound.
Parameters:
- clean (bool, default: False): If True, state data for all stages is removed from persistent storage.
Raises:
- TaskEngineException: If the task engine is not bound.
MissingStateException
Bases: Exception
Raised when required data is not set in the stage state object
PersistenceDataNotFoundException
Bases: Exception
Raised when there was an error loading task data from the persistence store.
StageStatus
Bases: Enum
Task status enumeration.
Attributes:
- UNBOUND (str): Task has not been bound to a task engine.
- CREATED (str): Task has been created but not yet started.
- RUNNING (str): Task is currently running.
- RETRY (str): Task encountered an error and is eligible to be retried.
- ERROR (str): Task has encountered an error and cannot be retried.
- COMPLETE (str): Task has completed successfully.
- CANCELED (str): Task has been canceled.
TaskEngineException
Bases: Exception
Raised during task engine execution when it is determined that a task can be auto-recovered
TaskIOFile
Bases: TaskIO
Task I/O implementation for file-based storage
Functions
build_url
Builds a URL for a given object name by appending it to the base URL.
Parameters:
- obj_name (str): The object name to convert.
Returns:
- str: The corresponding filesystem path.
exists
Checks if a state data file exists.
Parameters:
- obj_name (str): The object name to be appended to the base URL.
Returns:
- bool: True if the file exists, False otherwise.
list
Lists objects in persistent storage that match the specified prefix; the prefix is typically the task ID.
read
Reads state data from a file.
Parameters:
- obj_name (str): The object name to be appended to the base URL.
Returns:
- str: The data read from the file.
remove
Removes a state data file.
Parameters:
- obj_name (str): The object name to be appended to the base URL.
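A file-backed store with the same method names as TaskIOFile can be sketched with pathlib. The sketch makes several assumptions. FileStoreSketch is a hypothetical class, object names are treated as paths relative to a base directory, and build_url returns a plain filesystem path instead of a URL. The write method is also hypothetical, since no write method appears in this reference, but the example needs one to create objects.

```python
from pathlib import Path
from typing import List


class FileStoreSketch:
    """Hypothetical file-backed store mirroring the TaskIOFile methods above."""

    def __init__(self, base_dir: str) -> None:
        self.base = Path(base_dir)
        self.base.mkdir(parents=True, exist_ok=True)

    def build_url(self, obj_name: str) -> str:
        # Object names are treated as paths relative to the base directory.
        return str(self.base / obj_name)

    def exists(self, obj_name: str) -> bool:
        return Path(self.build_url(obj_name)).is_file()

    def write(self, obj_name: str, data: str) -> None:  # hypothetical helper
        path = Path(self.build_url(obj_name))
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(data, encoding="utf-8")

    def read(self, obj_name: str) -> str:
        return Path(self.build_url(obj_name)).read_text(encoding="utf-8")

    def remove(self, obj_name: str) -> None:
        Path(self.build_url(obj_name)).unlink()

    def list(self, prefix: str) -> List[str]:
        # Return object names (relative to the base) that start with prefix.
        names = (
            p.relative_to(self.base).as_posix()
            for p in self.base.rglob("*")
            if p.is_file()
        )
        return sorted(n for n in names if n.startswith(prefix))
```

Because list matches on a plain string prefix, a prefix such as "task-1" would also match "task-10". Using a separator-terminated prefix (for example "task-1/") avoids that.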