Write steps, not plumbing
Define your workflow as a sequence of named tasks in YAML. No SDK, no orchestration scaffolding, no boilerplate. Your workflow file is the entire implementation.
Durability out of the box
Every workflow runs on Temporal. Retries, crash recovery and execution history are handled for you, without any extra configuration.
Catch errors early
Zigflow validates your workflow file before execution starts. Invalid constructs and unsupported fields are rejected with clear, actionable error messages.
See how it works
Two steps. Fetch a user profile, then send a welcome email. If either step fails, Temporal retries it automatically using the retry policy defined in the YAML.
Your workflow is a single file. Validate it, run it and share it.
Try your first workflowdocument:
dsl: 1.0.0
taskQueue: acme
workflowType: onboard-user
version: 1.0.0
do:
- fetchProfile:
call: http
with:
method: get
endpoint: ${ "https://api.acme.com/users/" + ($input.userId | tostring) }
output:
as:
profile: ${ . }
- sendWelcome:
call: http
metadata:
activityOptions:
retryPolicy:
maximumAttempts: 3
with:
method: post
endpoint: https://api.acme.com/emails
body:
to: ${ .profile.email }
template: welcome
Every Zigflow workflow runs on Temporal, a battle-tested engine for durable execution. You get automatic retries, crash recovery and full execution history without writing SDK code.
Example workflows
A selection of real workflow patterns, each defined in YAML and ready to run. Full examples are also available in the GitHub repo.
- Agentic Workflow
- Child Workflows
- Custom Search Attributes
- Debugging
- External Calls
- For
- Heartbeat
- Query Listeners
- Scheduling
- Signal Listeners
- Update Listeners
- Wait
Agentic Workflow
A bounded plan/act/observe loop driven by an AI planner and a tool-backed lookup activity.
document:
dsl: 1.0.0
taskQueue: zigflow
workflowType: agentic-workflow
version: 0.0.1
title: Agentic Workflow
summary: A bounded plan/act/observe loop driven by an AI planner and a tool-backed lookup activity.
metadata:
tags:
- agent
- for-loop
- switch
- activity
activityOptions:
startToCloseTimeout:
minutes: 2
input:
schema:
format: json
document:
type: object
required:
- question
properties:
question:
type: string
description: The user question the agent is trying to answer.
maxIterations:
type: integer
description: Upper bound on planner/tool iterations before falling back.
default: 5
minimum: 1
do:
# Primary workflow. Drives the agent loop and produces the final result.
- agentic-workflow:
do:
# Seed $context with the canonical agent state. All later expressions
# read from $context so the loop has a single, stable carrier for
# inter-iteration state.
- initialise:
export:
as: ${ . }
set:
question: ${ $input.question }
maxIterations: ${ $input.maxIterations // 5 }
iteration: 0
observations: []
finalAnswer: null
done: false
# Bounded plan/act loop.
#
# The for task iterates up to maxIterations times. The while condition
# is checked before each iteration; once a tool branch sets
# $context.done == true, the next iteration short-circuits and the
# loop returns. The export pulls forward the last iteration's output
# as the new $context so downstream tasks see the up-to-date state.
- agentLoop:
export:
as: ${ .[-1] // $context }
for:
in: ${ $context.maxIterations }
while: ${ ($context.done // false) != true }
do:
- plan:
export:
as: '${ $context + { plan: . } }'
call: activity
with:
taskQueue: agent-worker
name: agent.PlanNextStep
arguments:
- question: ${ $context.question }
observations: ${ $context.observations }
iteration: ${ $context.iteration }
# Route on the planner result. Each branch is a named workflow
# below that returns the full updated agent state. The switch
# task's export merges that returned state into $context so the
# next iteration sees the new observations, iteration counter
# and done flag.
- routeTool:
export:
as: '${ $context + . }'
switch:
- lookup:
when: ${ $context.plan.tool == "lookup" }
then: runLookup
- finalAnswer:
when: ${ $context.plan.tool == "final_answer" }
then: markAnswered
- default:
then: markUnsupported
# If the loop exhausted maxIterations without a final answer, ask the
# summarisation activity to produce a best-effort response. The export
# parks the activity output under $context.fallbackAnswer so the next
# task can pick it up.
- fallbackAnswer:
export:
as: '${ $context + { fallbackAnswer: . } }'
switch:
- needsSummary:
when: ${ $context.done != true }
then: summarisePartialResult
- default:
then: continue
# Shape the workflow result. answer prefers the planner's final
# answer and falls back to the summariser's best-effort answer.
- finalOutput:
output:
as: ${ . }
set:
answer: ${ $context.finalAnswer // $context.fallbackAnswer.answer }
iterations: ${ $context.iteration }
observations: ${ $context.observations }
# Lookup branch. Calls the AI-backed lookup activity and records the
# observation. Returns the full updated agent state so the parent's switch
# export can merge it into $context.
- runLookup:
do:
- runLookup:
call: activity
with:
taskQueue: agent-worker
name: agent.Lookup
arguments:
- query: ${ $context.plan.arguments.query }
- storeLookupObservation:
output:
as: ${ . }
set:
question: ${ $context.question }
maxIterations: ${ $context.maxIterations }
iteration: ${ $context.iteration + 1 }
observations: '${ $context.observations + [{ tool: "lookup", input: $context.plan.arguments, output: $data.runLookup }] }'
finalAnswer: ${ $context.finalAnswer }
done: false
# Final-answer branch. Captures the planner's answer and signals done.
# iteration is not incremented here so the result reflects the number of
# tool calls rather than the number of planning rounds.
- markAnswered:
do:
- markAnswered:
output:
as: ${ . }
set:
question: ${ $context.question }
maxIterations: ${ $context.maxIterations }
iteration: ${ $context.iteration }
observations: ${ $context.observations }
finalAnswer: ${ $context.plan.arguments.answer }
done: true
# Unsupported-tool branch. The planner returned a tool we don't handle, so
# we close the loop cleanly with an explanatory answer rather than failing
# the workflow. The done flag stops the loop on the next while check.
- markUnsupported:
do:
- markUnsupported:
output:
as: ${ . }
set:
question: ${ $context.question }
maxIterations: ${ $context.maxIterations }
iteration: ${ $context.iteration }
observations: ${ $context.observations }
finalAnswer: '${ "Planner returned an unsupported tool: " + ($context.plan.tool // "<none>") }'
done: true
# Best-effort summarisation when the loop ends without a final answer.
- summarisePartialResult:
do:
- summarisePartialResult:
output:
as: ${ . }
call: activity
with:
taskQueue: agent-worker
name: agent.SummarisePartialResult
arguments:
- question: ${ $context.question }
observations: ${ $context.observations }
Child Workflows
Define multiple workflows and call a child workflow from a parent
document:
dsl: 1.0.0
taskQueue: zigflow
workflowType: childWorkflow
version: 0.0.1
title: Child Workflows
summary: Define multiple workflows and call a child workflow from a parent
metadata:
tags:
- child-workflow
activityOptions:
startToCloseTimeout:
minutes: 1
do:
- parentWorkflow:
do:
- wait:
wait:
seconds: 5
# Call child workflow as a single workflow - this will be run synchronously
- callChildWorkflow1:
run:
workflow:
type: child-workflow1
# Do a fan-out child workflow - this will be run asynchronously
- fanOut:
fork:
compete: false
branches:
- callWorkflow1:
run:
workflow:
type: child-workflow1
- callWorkflow2:
run:
workflow:
type: child-workflow2
- wait:
output:
as:
completed: true
wait:
seconds: 5
- child-workflow1:
do:
- wait:
wait:
seconds: 10
- child-workflow2:
do:
- wait:
wait:
seconds: 3
Custom Search Attributes
How to add custom search attribute data into your Temporal workflows
document:
dsl: 1.0.0
taskQueue: zigflow
workflowType: searchAttributes # Workflow name
version: 0.0.1
title: Custom Search Attributes
summary: How to add custom search attribute data into your Temporal workflows
metadata:
tags:
- search-attributes
activityOptions:
startToCloseTimeout:
minutes: 1
input:
schema:
format: json
document:
type: object
required:
- userId
properties:
userId:
type: number
do:
- wait:
metadata:
searchAttributes:
waitTime:
type: int
value: 5
wait:
seconds: 5
- getUser:
call: http
metadata:
# The search attributes must be configured in your Temporal service - with great power comes great responsibility
searchAttributes:
hello:
type: text
value: world
call:
type: text
value: ${ $data.task.name }
userId:
type: int
value: ${ $input.userId }
with:
method: get
endpoint: ${ "https://jsonplaceholder.typicode.com/users/" + ($input.userId | tostring) }
- wait:
metadata:
searchAttributes:
waitTime:
type: int
value: 3
wait:
seconds: 3
Debugging
An example of how to use CloudEvents for debugging workflows
document:
dsl: 1.0.0
taskQueue: zigflow
workflowType: cloudevents
version: 0.0.1
title: Debugging
summary: An example of how to use CloudEvents for debugging workflows
metadata:
tags:
- cloudevents
- debugging
activityOptions:
startToCloseTimeout:
minutes: 1
# Optionally validate the input received
input:
schema:
format: json
document:
type: object
required:
- userId
properties:
userId:
type: number
do:
- baseData:
export:
as:
baseData: ${ . }
set:
# Set a variable from an envvar
envvar: ${ $env.EXAMPLE_ENVVAR }
uuid: ${ uuid }
object:
hello: world
uuid: ${ uuid }
inputUserId: ${ $input.userId }
now: ${ now }
now_formatted: ${ now | strftime("%Y-%m-%d %H:%M:%S") }
timestamp: ${ timestamp }
timestamp_formatted: ${ timestamp | strftime("%Y-%m-%d %H:%M:%S") }
now_iso8601: ${ timestamp_iso8601 }
array:
- ${ uuid }
- hello: world
- wait:
wait:
seconds: 5
- getUser:
call: http
export:
as: '${ $context + { getUser: . } }'
metadata:
# Configure activity options for this task only
activityOptions:
retryPolicy:
maximumAttempts: 5
with:
method: get
endpoint: ${ "https://jsonplaceholder.typicode.com/users/" + ($input.userId | tostring) }
- raiseAlarm:
# A fork is a series of child workflows running in parallel
output:
as: '${ $context + del(.multiStep) }'
fork:
# If not competing, all tasks will run to the finish - this is the default behaviour
compete: false
branches:
# A single step is passed in by the Serverless Workflow task
- callNurse:
call: http
with:
method: get
endpoint: https://jsonplaceholder.typicode.com/users/2
# Multiple steps can be passed in by the Serverless Workflow do task
- multiStep:
do:
- wait1:
wait:
seconds: 3
- wait2:
wait:
seconds: 2
# Another single step child workflow
- callDoctor:
call: http
with:
method: get
endpoint: ${ "https://jsonplaceholder.typicode.com/users/" + ($input.userId | tostring) }
External Calls
An example of how to use Zigflow to make external gRPC and HTTP calls
document:
dsl: 1.0.0
taskQueue: zigflow
workflowType: external-calls
version: 0.0.1
title: External Calls
summary: An example of how to use Zigflow to make external gRPC and HTTP calls
metadata:
tags:
- http
- grpc
do:
- fork:
fork:
compete: false
branches:
- grpc:
call: grpc
with:
proto:
endpoint: file:///go/app/examples/external-calls/grpc/basic/proto/basic/v1/basic.proto
service:
name: providers.v1.BasicService
host: grpc
port: 3000
method: Command1
arguments:
input: ${ $env.GRPC_INPUT }
- http:
call: http
with:
method: get
endpoint: https://jsonplaceholder.typicode.com/users/3
For
How to use the for loop task
document:
dsl: 1.0.0
taskQueue: zigflow
workflowType: for-loop
version: 0.0.1
title: For
summary: How to use the for loop task
metadata:
tags:
- for-loop
input:
schema:
format: json
document:
type: object
required:
- map
- data
properties:
map:
type: object
required:
- key1
- key2
- key3
properties:
key1:
type: string
key2:
type: number
key3:
type: boolean
data:
type: array
items:
type: object
required:
- userId
properties:
userId:
type: number
do:
# Iterate over the map object
- forTaskMap:
export:
as: '${ $context + { forTaskMap: . } }'
for:
in: ${ $input.map }
do:
- setData:
export:
as: ${ . }
set:
key: "${ \"hello: \" + $data.index }"
value: ${ $data.item }
- wait:
output:
as: ${ $context }
wait:
seconds: 2
# Iterate over the data array
- forTaskArray:
export:
as: '${ $context + { forTaskArray: . } }'
for:
each: item
in: ${ $input.data }
at: index
# while: ${ $data.item.userId != 4 } # If this returns false, it will cut the iteration
do:
# Each iteration will run these tasks in order
- setData:
export:
as: ${ . }
set:
userId: ${ $data.item.userId } # Get the userId for this iteration
id: ${ $data.index } # Get the key
processed: true
- wait:
output:
as: ${ $context }
wait:
seconds: 1
- forTaskNumber:
export:
as: '${ $context + { forTaskNumber: . } }'
for:
in: ${ 5 }
do:
- setData:
export:
as: ${ . }
set:
number: ${ $data.item }
- wait:
output:
as: ${ $context }
wait:
seconds: 1
- forTaskStateCarryOver:
output:
as: '${ $context + { forTaskStateCarryOver: . } }'
for:
in: ${ 5 }
while: '${ ($output.pageNumber // 0) < 4 }'
do:
- incrementPage:
output:
as: '${ { pageNumber: ($context.pageNumber + 1), iteration: $data.index } }'
export:
as: '${ $context + { pageNumber: ($context.pageNumber + 1) } }'
set:
previousPageNumber: ${ $context.pageNumber }
nextPageNumber: ${ $context.pageNumber + 1 }
iteration: ${ $data.index }
Heartbeat
Set activity heartbeat. Useful on long-running activities.
document:
dsl: 1.0.0
taskQueue: zigflow
workflowType: heartbeat
version: 0.0.1
title: Heartbeat
summary: Set [activity heartbeat](https://docs.temporal.io/encyclopedia/detecting-activity-failures#activity-heartbeat). Useful on long-running activities.
metadata:
tags:
- heartbeat
- activity
do:
- longRunningActivity:
metadata:
# Trigger a heartbeat every 8 seconds - this MUST be less than your heartbeatTimeout
heartbeat:
seconds: 8
activityOptions:
# Set a heartbeat timeout of 10 seconds
heartbeatTimeout:
seconds: 10
startToCloseTimeout:
# Increase from 15s default to more than the sleep
minutes: 1
run:
shell:
# Sleep for 30s - this will timeout if heartbeat doesn't run
command: sleep
arguments:
- "30"
Query Listeners
Listen for Temporal query events
document:
dsl: 1.0.0
taskQueue: zigflow
workflowType: query
version: 0.0.1
title: Query Listeners
summary: Listen for Temporal query events
metadata:
tags:
- query
# Acts as override to test out continue-as-new
# @link https://docs.temporal.io/develop/go/continue-as-new
canMaxHistoryLength: 5
do:
- queryState:
listen:
to:
one:
with:
# ID maps to the query name in Temporal
id: get_state
# Temporal query - used to make a non-blocking read request
type: query
# This data will be returned as-is
data:
id: ${ $data.id }
progressPercentage: ${ $data.progressPercentage }
status: ${ $data.status }
- createState:
output:
as:
data: ${ . }
set:
# Items created at top-level are persisted
id: ${ uuid }
status: not started
progressPercentage: 0
- wait:
wait:
seconds: 5
- updateState:
set:
progressPercentage: 33
status: running
- wait:
wait:
seconds: 5
- updateState:
set:
progressPercentage: 66
# status remains as "running" if it remains as top-level item
- wait:
wait:
seconds: 5
- stateComplete:
set:
progressPercentage: 100
status: finished
- wait:
wait:
seconds: 5
Scheduling
Schedule the tasks to be triggered automatically
document:
dsl: 1.0.0
taskQueue: zigflow
workflowType: schedule
version: 0.0.1
title: Scheduling
summary: Schedule the tasks to be triggered automatically
metadata:
tags:
- schedule
# Set the workflow name to trigger - this will either be the document.workflowType or the Do task (see multiple-workflows example)
scheduleWorkflowName: schedule
# Optionally set the schedule ID name
scheduleId: some-schedule
# Optionally set any input for the workflow when triggered - this can receive envvars
scheduleInput:
- msg:
- hello
- world
envvars: ${ $env.EXAMPLE_ENVVAR }
activityOptions:
startToCloseTimeout:
minutes: 1
schedule:
# Every is supported as a Temporal interval - https://pkg.go.dev/go.temporal.io/sdk/client#ScheduleIntervalSpec
every:
minutes: 3
# Cron is supported as a Temporal cronjob - timezone is UTC
cron: "0 0 * * *"
do:
- wait:
wait:
seconds: 5
Signal Listeners
Listen for Temporal signal events
document:
dsl: 1.0.0
taskQueue: zigflow
workflowType: signal
version: 0.0.1
title: Signal Listeners
summary: Listen for Temporal signal events
metadata:
tags:
- signal
do:
- approveListener:
metadata:
timeout: 10s # Controls the AwaitWithTimeout timeout - defaults to 60s
listen:
to:
one:
with:
# ID maps to the signal name in Temporal - this blocks until received
id: approve
# Temporal signal - used to make write request
type: signal
acceptIf: ${ $data.approveListener }
- outputSignal:
export:
as: '${ $context + { response: . } }'
set:
# Get the data from the approveListener signal
signal: ${ $data.approveListener }
- wait:
output:
as: ${ $context }
wait:
seconds: 5
Update Listeners
Listen for Temporal update events
document:
dsl: 1.0.0
taskQueue: zigflow
workflowType: updates
version: 0.0.1
title: Update Listeners
summary: Listen for Temporal update events
metadata:
tags:
- update
do:
- callDoctor:
metadata:
timeout: 10s # Controls the AwaitWithTimeout timeout - defaults to 60s
listen:
to:
# Only progress after every update received
all:
- with:
# ID maps to the update name in Temporal
id: temperature
# Temporal update - used to make read/write request
type: update
acceptIf: ${ $data.temperature > 38 }
- with:
id: bpm
type: update
acceptIf: ${ $data.bpm < 60 or $data.bpm > 100 }
- wait:
output:
as:
temperature: ${ $data.temperature }
bpm: ${ $data.bpm }
wait:
seconds: 10
Wait
Pause a workflow on a Temporal durable timer with until and expression durations.
document:
dsl: 1.0.0
taskQueue: zigflow
workflowType: wait
version: 0.0.1
title: Wait
summary: Pause a workflow on a Temporal durable timer with until and expression durations.
input:
schema:
format: json
document:
type: object
required:
- sendAt
- cancelGraceSeconds
- recipient
- body
properties:
sendAt:
type: string
cancelGraceSeconds:
type: number
recipient:
type: string
body:
type: string
do:
# Hard-coded literal wait. This is the Serverless Workflow spec form
# parsed by the upstream SDK as *model.WaitTask.
- prewarmDelay:
wait:
seconds: 1
# Short cancellation grace window. The length is set per request so a
# caller can opt into "send immediately" with 0 or a longer cooldown.
# This exercises the expression-aware duration form: the value comes
# from workflow input at runtime.
- cancelGrace:
wait:
seconds: ${ $input.cancelGraceSeconds }
# Pause on Temporal's durable timer until the scheduled send moment.
# This exercises the absolute-time until form. A past timestamp is a
# debug-logged no-op and the workflow continues immediately.
- waitForSendTime:
wait:
until: ${ $input.sendAt }
# Record the delivery so the workflow result confirms both waits fired
# and the input was carried through. A production version of this
# workflow would replace this with `call: activity` against a real
# messaging worker; the demo is intentionally self-contained.
- recordSent:
output:
as:
delivered: ${ . }
set:
recipient: ${ $input.recipient }
body: ${ $input.body }