src/pocketflow/persistence

State persistence module for PocketFlow

Provides serialization and recovery of flow state for checkpointing and resumption of long-running workflows.

Types

FlowState = object
  flowId*: string
  timestamp*: Time
  contextData*: JsonNode
  currentNodeIndex*: int
  completedNodes*: seq[string]
  metadata*: JsonNode
Serializable state of a flow execution
StateStore = ref object
  storageDir*: string
Storage backend for flow states

Vars

globalStateStore = newStateStore(".pocketflow_state")

Procs

proc captureState(ctx: PfContext; flowId: string;
                  metadata: JsonNode = newJObject()): FlowState {....raises: [],
    tags: [TimeEffect], forbids: [].}
Captures the current state of a flow
proc deleteState(store: StateStore; flowId: string) {....raises: [OSError],
    tags: [ReadDirEffect, WriteDirEffect], forbids: [].}
Deletes all saved states for a flow
proc listStates(store: StateStore): seq[tuple[flowId: string, timestamp: Time]] {.
    ...raises: [], tags: [ReadDirEffect], forbids: [].}
Lists all saved flow states
proc loadState(store: StateStore; flowId: string): FlowState {....raises: [IOError,
    OSError, JsonParsingError, ValueError, KeyError, TimeParseError],
    tags: [ReadDirEffect, ReadIOEffect, WriteIOEffect, TimeEffect], forbids: [].}
Loads the most recent state for a flow
proc newStateStore(storageDir: string = ".pocketflow_state"): StateStore {.
    ...raises: [OSError, IOError], tags: [ReadDirEffect, WriteDirEffect],
    forbids: [].}
Creates a new state store
proc restoreContext(ctx: PfContext; state: FlowState) {....raises: [], tags: [],
    forbids: [].}
Restores context from saved state
proc saveState(store: StateStore; state: FlowState) {....raises: [IOError],
    tags: [WriteIOEffect], forbids: [].}
Saves a flow state to disk