Skip to main content

Overview

MastraCompositeStore is the interface that all storage providers must implement. It provides standardized methods for persisting threads, messages, workflow runs, agents, and other data.

Type Definition

interface MastraCompositeStore {
  // Thread operations
  getThreadById(threadId: string): Promise<StorageThreadType | null>;
  listThreads(args: StorageListThreadsInput): Promise<StorageListThreadsOutput>;
  saveThread(thread: StorageThreadType): Promise<StorageThreadType>;
  deleteThread(threadId: string): Promise<void>;
  cloneThread?(input: StorageCloneThreadInput): Promise<StorageCloneThreadOutput>;
  
  // Message operations
  listMessages(args: StorageListMessagesInput): Promise<StorageListMessagesOutput>;
  listMessagesByResourceId?(args: StorageListMessagesByResourceIdInput): Promise<StorageListMessagesOutput>;
  saveMessages(messages: MastraDBMessage[]): Promise<MastraDBMessage[]>;
  deleteMessage?(input: MessageDeleteInput): Promise<void>;
  
  // Workflow operations
  saveWorkflowRun(run: StorageWorkflowRun): Promise<void>;
  getWorkflowRunById(runId: string, fields?: WorkflowStateField[]): Promise<WorkflowState | null>;
  listWorkflowRuns(input?: StorageListWorkflowRunsInput): Promise<WorkflowRuns>;
  deleteWorkflowRun(runId: string): Promise<void>;
  
  // Agent operations (for stored agents)
  createAgent?(input: StorageCreateAgentInput): Promise<StorageResolvedAgentType>;
  getAgentByIdResolved?(id: string): Promise<StorageResolvedAgentType | null>;
  updateAgent?(input: StorageUpdateAgentInput): Promise<StorageResolvedAgentType>;
  deleteAgent?(id: string): Promise<void>;
  listAgentsResolved?(input?: StorageListAgentsInput): Promise<StorageListAgentsOutput>;
  
  // Initialization
  init?(): Promise<void>;
}

Thread Methods

getThreadById

getThreadById(threadId: string): Promise<StorageThreadType | null>
Retrieves a specific thread by its ID.
threadId
string
required
The unique identifier of the thread
thread
StorageThreadType | null
The thread or null if not found

listThreads

listThreads(args: StorageListThreadsInput): Promise<StorageListThreadsOutput>
Lists threads with optional filtering.
args
StorageListThreadsInput
required
Query parameters including filters, pagination, and ordering
result
StorageListThreadsOutput
Paginated thread results

saveThread

saveThread(thread: StorageThreadType): Promise<StorageThreadType>
Saves or updates a thread.
thread
StorageThreadType
required
The thread data to save
thread
StorageThreadType
The saved thread

deleteThread

deleteThread(threadId: string): Promise<void>
Deletes a thread and all its messages.
threadId
string
required
The ID of the thread to delete

cloneThread (optional)

cloneThread?(input: StorageCloneThreadInput): Promise<StorageCloneThreadOutput>
Clones a thread with optional message filtering.
input
StorageCloneThreadInput
required
Clone configuration including source thread ID, filters, and options
result
StorageCloneThreadOutput
The cloned thread and messages

Message Methods

listMessages

listMessages(args: StorageListMessagesInput): Promise<StorageListMessagesOutput>
Lists messages for specific thread(s).
args
StorageListMessagesInput
required
Query parameters including thread ID(s), pagination, and filters
result
StorageListMessagesOutput
Paginated message results

listMessagesByResourceId (optional)

listMessagesByResourceId?(args: StorageListMessagesByResourceIdInput): Promise<StorageListMessagesOutput>
Lists all messages for a resource ID across all threads.
args
StorageListMessagesByResourceIdInput
required
Query parameters including resource ID, pagination, and filters
result
StorageListMessagesOutput
Paginated message results

saveMessages

saveMessages(messages: MastraDBMessage[]): Promise<MastraDBMessage[]>
Saves an array of messages.
messages
MastraDBMessage[]
required
Array of messages to save
messages
MastraDBMessage[]
The saved messages

deleteMessage (optional)

deleteMessage?(input: MessageDeleteInput): Promise<void>
Deletes a specific message.
input
MessageDeleteInput
required
Message ID and thread ID

Workflow Methods

saveWorkflowRun

saveWorkflowRun(run: StorageWorkflowRun): Promise<void>
Saves a workflow run snapshot.
run
StorageWorkflowRun
required
The workflow run data to save

getWorkflowRunById

getWorkflowRunById(runId: string, fields?: WorkflowStateField[]): Promise<WorkflowState | null>
Retrieves a workflow run by ID.
runId
string
required
The workflow run ID
fields
WorkflowStateField[]
Optional fields to include: ‘result’, ‘error’, ‘payload’, ‘steps’, ‘activeStepsPath’, ‘serializedStepGraph’
state
WorkflowState | null
The workflow run state or null if not found

listWorkflowRuns

listWorkflowRuns(input?: StorageListWorkflowRunsInput): Promise<WorkflowRuns>
Lists workflow runs with optional filtering.
input
StorageListWorkflowRunsInput
Query parameters including workflow name, date range, status, and pagination
runs
WorkflowRuns
Object containing runs array and total count

deleteWorkflowRun

deleteWorkflowRun(runId: string): Promise<void>
Deletes a workflow run.
runId
string
required
The workflow run ID to delete

Agent Methods (Optional)

These methods are optional and only needed for storage providers that support stored agents.

createAgent

createAgent?(input: StorageCreateAgentInput): Promise<StorageResolvedAgentType>
Creates a new stored agent.
input
StorageCreateAgentInput
required
Agent configuration including id, name, instructions, model, tools, etc.
agent
StorageResolvedAgentType
The created agent with resolved configuration

getAgentByIdResolved

getAgentByIdResolved?(id: string): Promise<StorageResolvedAgentType | null>
Retrieves a stored agent by ID with resolved configuration.
id
string
required
The agent ID
agent
StorageResolvedAgentType | null
The agent or null if not found

updateAgent

updateAgent?(input: StorageUpdateAgentInput): Promise<StorageResolvedAgentType>
Updates a stored agent.
input
StorageUpdateAgentInput
required
Agent update data including id and fields to update
agent
StorageResolvedAgentType
The updated agent with resolved configuration

deleteAgent

deleteAgent?(id: string): Promise<void>
Deletes a stored agent.
id
string
required
The agent ID to delete

listAgentsResolved

listAgentsResolved?(input?: StorageListAgentsInput): Promise<StorageListAgentsOutput>
Lists stored agents with optional filtering.
input
StorageListAgentsInput
Query parameters including filters and pagination
result
StorageListAgentsOutput
Paginated agent results

Initialization

init

init?(): Promise<void>
Optional initialization method called when storage is first used. Use this to set up database schemas, create tables, or establish connections.

Available Storage Providers

Mastra provides several storage providers out of the box:
  • @mastra/libsql - LibSQL/Turso database storage
  • @mastra/postgres - PostgreSQL storage
  • @mastra/upstash - Upstash Redis storage
  • @mastra/memory - In-memory storage (development only)

Example Usage

import { LibSQLStore } from '@mastra/libsql';
import { Mastra } from '@mastra/core';
import { Memory } from '@mastra/memory';

// Create storage instance
const storage = new LibSQLStore({
  id: 'mastra-storage',
  url: process.env.DATABASE_URL || ':memory:'
});

// Initialize storage (if needed)
await storage.init?.();

// Use with Mastra
const mastra = new Mastra({
  storage,
  agents: { /* ... */ },
  workflows: { /* ... */ }
});

// Use with Memory
const memory = new Memory({
  name: 'conversation-memory',
  storage
});

// Thread operations
const thread = await storage.saveThread({
  id: 'thread-123',
  title: 'Support Conversation',
  resourceId: 'user-456',
  createdAt: new Date(),
  updatedAt: new Date()
});

// Message operations
const messages = await storage.listMessages({
  threadId: 'thread-123',
  perPage: 50
});

// Workflow operations
const runs = await storage.listWorkflowRuns({
  workflowName: 'data-processor',
  status: 'success'
});