Overview

The Workflow class provides type-safe, composable task execution with built-in error handling, state management, and observability. Workflows are composed of steps that can be chained, parallelized, or conditionally executed.

Type Parameters

TEngineType
any
default: DefaultEngineType
The execution engine type
TSteps
Step[]
default: Step[]
Array of step types in the workflow
TWorkflowId
string
default: string
The workflow identifier type
TState
any
default: unknown
The workflow state type
TInput
any
default: unknown
The workflow input type
TOutput
any
default: unknown
The workflow output type
TPrevSchema
any
default: TInput
The previous step's output schema type
TRequestContext
Record<string, any>
default: unknown
The request context schema type

Creating Workflows

createWorkflow

createWorkflow<TWorkflowId, TState, TInput, TOutput, TSteps, TRequestContext>(
  params: WorkflowConfig<TWorkflowId, TState, TInput, TOutput, TSteps, TRequestContext>
): Workflow<DefaultEngineType, TSteps, TWorkflowId, TState, TInput, TOutput, TInput, TRequestContext>
Creates a new workflow with the specified configuration.
params
WorkflowConfig
required
Workflow configuration
workflow
Workflow
A new workflow builder instance

Properties

id
string
The unique identifier for the workflow
description
string | undefined
Description of the workflow
inputSchema
SchemaWithValidation<TInput>
Schema for validating input
outputSchema
SchemaWithValidation<TOutput>
Schema for validating output
stateSchema
SchemaWithValidation<TState> | undefined
Schema for workflow state
requestContextSchema
SchemaWithValidation<TRequestContext> | undefined
Schema for request context
steps
Record<string, StepWithComponent>
The steps in the workflow
engineType
WorkflowEngineType
The execution engine type (always 'default')
type
'default' | 'processor'
Type of workflow
committed
boolean
Whether the workflow has been committed (finalized)
retryConfig
{ attempts?: number; delay?: number }
Retry configuration
mastra
Mastra | undefined
The Mastra instance
options
WorkflowOptions
Workflow options including tracing and callbacks

Methods

Building Workflows

then

then<TStep extends Step>(
  step: TStep
): Workflow<...>
Adds a sequential step to the workflow.
step
Step
required
The step to add to the workflow chain
workflow
Workflow
The workflow builder for method chaining
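
The chaining behaviour can be pictured with a small, self-contained model. The sketch below is not Mastra's implementation: `MiniChain` and `StepFn` are hypothetical names, and steps are reduced to plain synchronous functions. It only illustrates how each `then` call can thread the previous step's output type into the next step's input, which is the role `TPrevSchema` plays in the type parameters.

```typescript
// Toy model of sequential chaining. NOT the Mastra implementation:
// `MiniChain` and `StepFn` are hypothetical names used for illustration.
type StepFn<TIn, TOut> = (input: TIn) => TOut;

class MiniChain<TInput, TPrev> {
  private readonly fns: Array<(x: unknown) => unknown>;

  private constructor(fns: Array<(x: unknown) => unknown>) {
    this.fns = fns;
  }

  // Start an empty chain: the "previous output" is the workflow input itself.
  static create<T>(): MiniChain<T, T> {
    return new MiniChain<T, T>([]);
  }

  // Append a step whose input type must equal the previous step's output type.
  then<TNext>(step: StepFn<TPrev, TNext>): MiniChain<TInput, TNext> {
    const erased = (x: unknown): unknown => step(x as TPrev);
    return new MiniChain<TInput, TNext>([...this.fns, erased]);
  }

  // Run the steps in order, feeding each result into the next step.
  run(input: TInput): TPrev {
    return this.fns.reduce<unknown>((acc, fn) => fn(acc), input) as TPrev;
  }
}

const chain = MiniChain.create<{ userId: string }>()
  .then((input) => ({ valid: input.userId.length > 0 }))
  .then((prev) => ({ processed: prev.valid }));

const chainResult = chain.run({ userId: '123' });
```

In this model, passing a step that expects something other than `{ valid: boolean }` as the second `then` argument is rejected at compile time, which is the kind of check the builder's type parameters are meant to provide.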

parallel

parallel<TSteps extends Step[]>(
  ...steps: TSteps
): Workflow<...>
Adds steps that execute in parallel.
steps
Step[]
required
The steps to execute in parallel
workflow
Workflow
The workflow builder for method chaining
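
As a rough illustration of what executing in parallel means here, the following sketch starts every step before awaiting any of them. It is a toy model, not the library's code: `MiniStep` and `runParallel` are hypothetical names, and collecting the outputs into an object keyed by step id is an assumption of this sketch.

```typescript
// Toy model of parallel execution. NOT the Mastra implementation:
// `MiniStep` and `runParallel` are hypothetical names.
interface MiniStep<TIn, TOut> {
  id: string;
  execute: (input: TIn) => Promise<TOut>;
}

async function runParallel<TIn>(
  steps: MiniStep<TIn, unknown>[],
  input: TIn,
): Promise<Record<string, unknown>> {
  // Start every step before awaiting any of them, so they overlap in time.
  const outputs = await Promise.all(steps.map((s) => s.execute(input)));

  // Assumption of this sketch: outputs are collected under each step's id.
  const result: Record<string, unknown> = {};
  steps.forEach((s, i) => {
    result[s.id] = outputs[i];
  });
  return result;
}

const fetchUser: MiniStep<{ userId: string }, unknown> = {
  id: 'fetch-user',
  execute: async ({ userId }) => ({ name: `user-${userId}` }),
};

const fetchOrders: MiniStep<{ userId: string }, unknown> = {
  id: 'fetch-orders',
  execute: async () => ({ count: 2 }),
};

// Both steps receive the same input and run concurrently.
const parallelDone = runParallel([fetchUser, fetchOrders], { userId: '123' });
```

Because `Promise.all` rejects as soon as one promise rejects, a single failing step fails the whole group in this model.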

branch

branch<TSteps extends Step[]>(config: {
  conditions: ConditionFunction[];
  steps: TSteps;
}): Workflow<...>
Adds conditional branching to the workflow.
config
object
required
Branch configuration
workflow
Workflow
The workflow builder for method chaining
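
The relationship between `conditions` and `steps` can be sketched with a minimal model. It makes two assumptions that should be checked against the actual behaviour: `conditions[i]` guards `steps[i]`, and every step whose condition returns true is run. `runBranch`, `Condition`, and `BranchStep` are hypothetical names, and steps are simplified to synchronous functions.

```typescript
// Toy model of conditional branching. NOT the Mastra implementation.
// Assumptions: conditions[i] guards steps[i]; every matching step runs.
type Condition<T> = (input: T) => boolean;

interface BranchStep<T> {
  id: string;
  execute: (input: T) => unknown;
}

function runBranch<T>(
  config: { conditions: Condition<T>[]; steps: BranchStep<T>[] },
  input: T,
): Record<string, unknown> {
  if (config.conditions.length !== config.steps.length) {
    throw new Error('conditions and steps must have the same length');
  }

  const outputs: Record<string, unknown> = {};
  config.steps.forEach((step, i) => {
    const condition = config.conditions[i];
    // Only steps whose paired condition holds contribute an output.
    if (condition !== undefined && condition(input)) {
      outputs[step.id] = step.execute(input);
    }
  });
  return outputs;
}

const branchOut = runBranch(
  {
    conditions: [(total: number) => total >= 100, (total: number) => total < 100],
    steps: [
      { id: 'large-order', execute: (total: number) => `review ${total}` },
      { id: 'small-order', execute: (total: number) => `auto-approve ${total}` },
    ],
  },
  42,
);
```

With an input of `42` only the second condition holds, so only `small-order` produces an output.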

loop

loop<TStep extends Step>(config: {
  step: TStep;
  condition: LoopConditionFunction;
  type: 'dowhile' | 'dountil';
}): Workflow<...>
Adds a loop to the workflow.
config
object
required
Loop configuration
workflow
Workflow
The workflow builder for method chaining
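
The difference between the two loop types is easiest to see in a small model. The sketch below is illustrative only: `runLoop` is a hypothetical helper, the step is a plain synchronous function, and the `maxIterations` cap exists purely to keep the sketch from spinning forever. In both modes the step runs once before the condition is first checked; `'dowhile'` repeats while the condition holds, and `'dountil'` repeats until it holds.

```typescript
// Toy model of 'dowhile' vs 'dountil'. NOT the Mastra implementation.
type LoopType = 'dowhile' | 'dountil';

function runLoop<T>(
  config: { step: (value: T) => T; condition: (value: T) => boolean; type: LoopType },
  initial: T,
  maxIterations = 1000, // safety cap added for this sketch only
): { value: T; iterations: number } {
  let value = initial;
  let iterations = 0;
  let repeat = true;

  while (repeat && iterations < maxIterations) {
    // The body always runs before the condition is evaluated.
    value = config.step(value);
    iterations += 1;

    const met = config.condition(value);
    // 'dowhile' continues while the condition is true,
    // 'dountil' continues while the condition is still false.
    repeat = config.type === 'dowhile' ? met : !met;
  }
  return { value, iterations };
}

const increment = (n: number): number => n + 1;

const whileResult = runLoop(
  { step: increment, condition: (n: number) => n < 3, type: 'dowhile' },
  0,
);
const untilResult = runLoop(
  { step: increment, condition: (n: number) => n >= 3, type: 'dountil' },
  0,
);
// A 'dowhile' loop whose condition is false from the start still runs once.
const runsOnce = runLoop(
  { step: increment, condition: () => false, type: 'dowhile' },
  10,
);
```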

foreach

foreach<TStep extends Step>(config: {
  step: TStep;
  concurrency?: number;
}): Workflow<...>
Adds a foreach loop that iterates over array items.
config
object
required
Foreach configuration
workflow
Workflow
The workflow builder for method chaining
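
To make the `concurrency` option concrete, here is a minimal sketch of running one step per array item with a cap on how many run at once. `forEachWithConcurrency` is a hypothetical helper, not part of the library, and treating an omitted `concurrency` as `1` (fully sequential) is an assumption of this sketch.

```typescript
// Toy model of foreach with a concurrency limit. NOT the Mastra implementation.
async function forEachWithConcurrency<TItem, TOut>(
  items: TItem[],
  step: (item: TItem) => Promise<TOut>,
  concurrency = 1, // assumption: sequential when omitted
): Promise<TOut[]> {
  const results: TOut[] = new Array<TOut>(items.length);
  let next = 0;

  // Each worker repeatedly claims the next unprocessed index.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next;
      next += 1;
      results[index] = await step(items[index] as TItem);
    }
  }

  // Never start more workers than there are items, and always at least one.
  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results; // results keep the order of the input items
}

// Track how many step executions overlap to observe the limit.
let inFlight = 0;
let peakInFlight = 0;

const doubled = forEachWithConcurrency(
  [1, 2, 3, 4, 5],
  async (n: number) => {
    inFlight += 1;
    peakInFlight = Math.max(peakInFlight, inFlight);
    await Promise.resolve(); // stand-in for real asynchronous work
    inFlight -= 1;
    return n * 2;
  },
  2,
);
```

The results array is indexed by input position, so the output order matches the input order even when items finish out of order.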

sleep

sleep(config: {
  id: string;
  duration?: number;
  fn?: ExecuteFunction;
}): Workflow<...>
Adds a sleep/delay step to the workflow.
config
object
required
Sleep configuration
workflow
Workflow
The workflow builder for method chaining

sleepUntil

sleepUntil(config: {
  id: string;
  date?: Date;
  fn?: ExecuteFunction;
}): Workflow<...>
Adds a step that pauses the workflow until a specific date and time.
config
object
required
Sleep until configuration
workflow
Workflow
The workflow builder for method chaining
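
One way to relate `sleepUntil` to `sleep` is to convert the target date into a remaining delay. The helper below is a hypothetical sketch, not library code. It assumes delays are expressed in milliseconds and that a date already in the past means no wait at all.

```typescript
// Hypothetical helper: converts a target date into a non-negative delay.
// Assumptions: milliseconds as the unit; past dates produce a delay of 0.
function delayUntil(target: Date, now: Date = new Date()): number {
  return Math.max(0, target.getTime() - now.getTime());
}

// A fixed reference time keeps the example deterministic.
const referenceNow = new Date('2024-01-01T00:00:00Z');

const oneMinuteAhead = delayUntil(new Date('2024-01-01T00:01:00Z'), referenceNow);
const alreadyPast = delayUntil(new Date('2023-12-31T23:59:00Z'), referenceNow);
```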

commit

commit(): Workflow<...>
Finalizes the workflow and makes it executable.
workflow
Workflow
The finalized workflow ready for execution

Executing Workflows

createRun

createRun(options?: {
  runId?: string;
  resourceId?: string;
  requestContext?: RequestContext;
}): Promise<Run>
Creates a new workflow run instance.
options
object
Run creation options
run
Run
A new workflow run instance

execute

execute(params: {
  inputData: TInput;
  initialState?: TState;
  requestContext?: RequestContext;
  resourceId?: string;
  runId?: string;
  tracingOptions?: TracingOptions;
}): Promise<WorkflowResult<TOutput>>
Executes the workflow with the given input and returns the result.
params
object
required
Execution parameters
result
WorkflowResult<TOutput>
The workflow execution result

stream

stream(params: {
  inputData: TInput;
  initialState?: TState;
  requestContext?: RequestContext;
  resourceId?: string;
  runId?: string;
  tracingOptions?: TracingOptions;
}): Promise<MastraWorkflowStream>
Executes the workflow and streams the results.
params
object
required
Same as execute parameters
stream
MastraWorkflowStream
A stream of workflow execution events

Workflow Management

listWorkflowRuns

listWorkflowRuns(params?: StorageListWorkflowRunsInput): Promise<WorkflowRuns>
Lists workflow runs with optional filtering.
params
StorageListWorkflowRunsInput
Filter parameters
runs
WorkflowRuns
Object containing runs array and total count

getWorkflowRunById

getWorkflowRunById(params: {
  runId: string;
  fields?: WorkflowStateField[];
}): Promise<WorkflowState | null>
Gets a specific workflow run by ID.
params
object
required
Query parameters
state
WorkflowState | null
The workflow run state, or null if not found

deleteWorkflowRun

deleteWorkflowRun(runId: string): Promise<void>
Deletes a workflow run.
runId
string
required
The run ID to delete

Serialization

serialize

serialize(): WorkflowInfo
Serializes the workflow to a JSON-compatible format.
info
WorkflowInfo
Serialized workflow information including steps, schemas, and configuration

Example

import { createWorkflow, createStep } from '@mastra/core/workflows';
import { z } from 'zod';

// Define a workflow
const processDataWorkflow = createWorkflow({
  id: 'process-data',
  description: 'Processes user data through multiple steps',
  inputSchema: z.object({
    userId: z.string(),
    data: z.any()
  }),
  outputSchema: z.object({
    processed: z.boolean(),
    result: z.any()
  })
})
  .then(
    createStep({
      id: 'validate',
      description: 'Validate input data',
      inputSchema: z.object({ userId: z.string(), data: z.any() }),
      outputSchema: z.object({ valid: z.boolean() }),
      execute: async ({ inputData }) => {
        return { valid: true };
      }
    })
  )
  .then(
    createStep({
      id: 'transform',
      description: 'Transform data',
      inputSchema: z.object({ valid: z.boolean() }),
      // The last step's output matches the workflow's outputSchema
      outputSchema: z.object({ processed: z.boolean(), result: z.any() }),
      execute: async ({ inputData }) => {
        return {
          processed: inputData.valid,
          result: { ...inputData, timestamp: Date.now() }
        };
      }
    })
  )
  .commit();

// Execute the workflow
const result = await processDataWorkflow.execute({
  inputData: {
    userId: '123',
    data: { name: 'John' }
  }
});

console.log(result); // WorkflowResult whose output has the shape { processed: true, result: ... }