src/pocketflow/node

Search:
Group by:

Node module for PocketFlow

Provides the core node types for building execution graphs. Nodes can be chained together to create complex workflows with retry logic, fallback handling, and parallel execution.

Types

BaseNode = ref object of RootObj
  params*: JsonNode
  successors*: TableRef[string, BaseNode]
BatchExecItemCallback = proc (ctx: PfContext; params: JsonNode; item: JsonNode): Future[
    JsonNode] {....gcsafe.}
BatchItemFallbackCallback = proc (ctx: PfContext; params: JsonNode;
                                  item: JsonNode; err: ref Exception): Future[
    JsonNode] {....gcsafe.}
BatchNode = ref object of BaseNode
  maxRetries*: int
  waitMs*: int
  curRetry*: int
  prepFunc*: BatchPrepCallback
  execItemFunc*: BatchExecItemCallback
  postFunc*: BatchPostCallback
  itemFallbackFunc*: BatchItemFallbackCallback
BatchPostCallback = proc (ctx: PfContext; params: JsonNode; prepRes: JsonNode;
                          execRes: JsonNode): Future[string] {....gcsafe.}
BatchPrepCallback = proc (ctx: PfContext; params: JsonNode): Future[JsonNode] {.
    ...gcsafe.}
ExecCallback = proc (ctx: PfContext; params: JsonNode; prepRes: JsonNode): Future[
    JsonNode] {....gcsafe.}
FallbackCallback = proc (ctx: PfContext; params: JsonNode; prepRes: JsonNode;
                         err: ref Exception): Future[JsonNode] {....gcsafe.}
Node = ref object of BaseNode
  maxRetries*: int
  waitMs*: int
  curRetry*: int
  prepFunc*: PrepCallback
  execFunc*: ExecCallback
  postFunc*: PostCallback
  fallbackFunc*: FallbackCallback
ParallelBatchNode = ref object of BatchNode
  maxConcurrency*: int
PostCallback = proc (ctx: PfContext; params: JsonNode; prepRes: JsonNode;
                     execRes: JsonNode): Future[string] {....gcsafe.}
PrepCallback = proc (ctx: PfContext; params: JsonNode): Future[JsonNode] {.
    ...gcsafe.}

Consts

DefaultAction = "default"
The default action returned when post callbacks return an empty string

Procs

proc `-`(node: BaseNode; action: string): ActionLink {....raises: [], tags: [],
    forbids: [].}
proc `>>`(link: ActionLink; nextNode: BaseNode): BaseNode {.discardable,
    ...raises: [], tags: [], forbids: [].}
proc `>>`(node: BaseNode; nextNode: BaseNode): BaseNode {.discardable,
    ...raises: [], tags: [], forbids: [].}
proc addSuccessor(node: BaseNode; action: string; nextNode: BaseNode): BaseNode {.
    discardable, ...raises: [], tags: [], forbids: [].}
proc getNextNode(node: BaseNode; action: string): BaseNode {....raises: [KeyError],
    tags: [], forbids: [].}
proc initBaseNode(node: BaseNode) {....raises: [], tags: [], forbids: [].}
proc newBatchNode(prep: BatchPrepCallback = nil;
                  execItem: BatchExecItemCallback = nil;
                  post: BatchPostCallback = nil;
                  itemFallback: BatchItemFallbackCallback = nil;
                  maxRetries: int = 1; waitMs: int = 0): BatchNode {....raises: [],
    tags: [], forbids: [].}
proc newNode(prep: PrepCallback = nil; exec: ExecCallback = nil;
             post: PostCallback = nil; fallback: FallbackCallback = nil;
             maxRetries: int = 1; waitMs: int = 0): Node {....raises: [], tags: [],
    forbids: [].}
proc newParallelBatchNode(prep: BatchPrepCallback = nil;
                          execItem: BatchExecItemCallback = nil;
                          post: BatchPostCallback = nil;
                          itemFallback: BatchItemFallbackCallback = nil;
                          maxRetries: int = 1; waitMs: int = 0;
                          maxConcurrency: int = 0): ParallelBatchNode {.
    ...raises: [], tags: [], forbids: [].}
proc next(node: BaseNode; action: string; nextNode: BaseNode): BaseNode {.
    discardable, ...raises: [], tags: [], forbids: [].}
proc next(node: BaseNode; nextNode: BaseNode): BaseNode {.discardable,
    ...raises: [], tags: [], forbids: [].}
proc setParams(node: BaseNode; params: JsonNode): BaseNode {.discardable,
    ...raises: [], tags: [], forbids: [].}

Methods

method internalRun(node: BaseNode; ctx: PfContext): Future[string] {.base,
    ...gcsafe, stackTrace: false, raises: [Exception, ValueError],
    tags: [RootEffect], forbids: [].}
Base implementation - should be overridden by derived types
method internalRun(node: BatchNode; ctx: PfContext): Future[string] {....gcsafe,
    stackTrace: false, raises: [Exception, ValueError],
    tags: [RootEffect, TimeEffect], forbids: [].}
method internalRun(node: Node; ctx: PfContext): Future[string] {....gcsafe,
    stackTrace: false, raises: [Exception, ValueError],
    tags: [RootEffect, TimeEffect], forbids: [].}
method internalRun(node: ParallelBatchNode; ctx: PfContext): Future[string] {.
    ...gcsafe, stackTrace: false, raises: [Exception, ValueError],
    tags: [RootEffect, TimeEffect], forbids: [].}
method run(node: BaseNode; ctx: PfContext): Future[string] {.base,
    ...stackTrace: false, raises: [Exception, ValueError], tags: [RootEffect],
    forbids: [].}