Node module for PocketFlow
Provides the core node types for building execution graphs. Nodes can be chained together to create complex workflows with retry logic, fallback handling, and parallel execution.
Types
ActionLink = object node*: BaseNode action*: string
BatchExecItemCallback = proc (ctx: PfContext; params: JsonNode; item: JsonNode): Future[ JsonNode] {....gcsafe.}
BatchItemFallbackCallback = proc (ctx: PfContext; params: JsonNode; item: JsonNode; err: ref Exception): Future[ JsonNode] {....gcsafe.}
BatchNode = ref object of BaseNode maxRetries*: int waitMs*: int curRetry*: int prepFunc*: BatchPrepCallback execItemFunc*: BatchExecItemCallback postFunc*: BatchPostCallback itemFallbackFunc*: BatchItemFallbackCallback
BatchPostCallback = proc (ctx: PfContext; params: JsonNode; prepRes: JsonNode; execRes: JsonNode): Future[string] {....gcsafe.}
BatchPrepCallback = proc (ctx: PfContext; params: JsonNode): Future[JsonNode] {. ...gcsafe.}
ExecCallback = proc (ctx: PfContext; params: JsonNode; prepRes: JsonNode): Future[ JsonNode] {....gcsafe.}
FallbackCallback = proc (ctx: PfContext; params: JsonNode; prepRes: JsonNode; err: ref Exception): Future[JsonNode] {....gcsafe.}
Node = ref object of BaseNode maxRetries*: int waitMs*: int curRetry*: int prepFunc*: PrepCallback execFunc*: ExecCallback postFunc*: PostCallback fallbackFunc*: FallbackCallback
ParallelBatchNode = ref object of BatchNode maxConcurrency*: int
PostCallback = proc (ctx: PfContext; params: JsonNode; prepRes: JsonNode; execRes: JsonNode): Future[string] {....gcsafe.}
PrepCallback = proc (ctx: PfContext; params: JsonNode): Future[JsonNode] {. ...gcsafe.}
Consts
DefaultAction = "default"
- The default action returned when post callbacks return an empty string
Procs
proc `-`(node: BaseNode; action: string): ActionLink {....raises: [], tags: [], forbids: [].}
proc `>>`(link: ActionLink; nextNode: BaseNode): BaseNode {.discardable, ...raises: [], tags: [], forbids: [].}
proc addSuccessor(node: BaseNode; action: string; nextNode: BaseNode): BaseNode {. discardable, ...raises: [], tags: [], forbids: [].}
proc getNextNode(node: BaseNode; action: string): BaseNode {....raises: [KeyError], tags: [], forbids: [].}
proc initBaseNode(node: BaseNode) {....raises: [], tags: [], forbids: [].}
proc newBatchNode(prep: BatchPrepCallback = nil; execItem: BatchExecItemCallback = nil; post: BatchPostCallback = nil; itemFallback: BatchItemFallbackCallback = nil; maxRetries: int = 1; waitMs: int = 0): BatchNode {....raises: [], tags: [], forbids: [].}
proc newNode(prep: PrepCallback = nil; exec: ExecCallback = nil; post: PostCallback = nil; fallback: FallbackCallback = nil; maxRetries: int = 1; waitMs: int = 0): Node {....raises: [], tags: [], forbids: [].}
proc newParallelBatchNode(prep: BatchPrepCallback = nil; execItem: BatchExecItemCallback = nil; post: BatchPostCallback = nil; itemFallback: BatchItemFallbackCallback = nil; maxRetries: int = 1; waitMs: int = 0; maxConcurrency: int = 0): ParallelBatchNode {. ...raises: [], tags: [], forbids: [].}
Methods
method internalRun(node: BaseNode; ctx: PfContext): Future[string] {.base, ...gcsafe, stackTrace: false, raises: [Exception, ValueError], tags: [RootEffect], forbids: [].}
- Base implementation - should be overridden by derived types
method internalRun(node: BatchNode; ctx: PfContext): Future[string] {....gcsafe, stackTrace: false, raises: [Exception, ValueError], tags: [RootEffect, TimeEffect], forbids: [].}
method internalRun(node: Node; ctx: PfContext): Future[string] {....gcsafe, stackTrace: false, raises: [Exception, ValueError], tags: [RootEffect, TimeEffect], forbids: [].}
method internalRun(node: ParallelBatchNode; ctx: PfContext): Future[string] {. ...gcsafe, stackTrace: false, raises: [Exception, ValueError], tags: [RootEffect, TimeEffect], forbids: [].}