# Building Complex Workflows in Astreus
Build sophisticated multi-agent workflows with advanced orchestration patterns. Learn dependency management, parallel execution, and production-grade pipeline design.
Multi-agent workflows orchestrate specialized agents to handle complex tasks. Instead of a single agent doing everything, each agent focuses on one responsibility while the pipeline manages coordination. This tutorial shows you how to build production-grade workflows using Astreus.
## Getting Started
You can clone the complete example or install just the package:
```bash
# Clone the full example
git clone https://github.com/astreus-ai/complex-workflows
cd complex-workflows
npm install
# Or install the package only
npm install @astreus-ai/astreus
```
Configure your environment variables in `.env`:
```bash
OPENAI_API_KEY=your-api-key-here
DB_URL=sqlite://./astreus.db
```
The database URL can point to any supported database. SQLite works well for local development.
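If configuration is missing, it is better to fail at startup than partway through a pipeline run. The sketch below uses a hypothetical `requireEnv` helper, which is not part of Astreus, to report every missing variable at once. It assumes the variables are already in the environment, for example exported in your shell or loaded from `.env` by a loader of your choice.

```typescript
// Hypothetical startup check (not an Astreus API): verify that required
// settings exist and are non-empty before any agent is created.
function requireEnv(
  names: string[],
  env: Record<string, string | undefined>
): Record<string, string> {
  // Collect every missing or blank variable so the error lists them all at once.
  const missing = names.filter((name) => (env[name] ?? '').trim() === '');
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(', ')}`);
  }
  // At this point every name is known to be present and non-empty.
  return Object.fromEntries(names.map((name) => [name, env[name] as string]));
}

// Usage in Node.js, before calling Agent.create():
// const config = requireEnv(['OPENAI_API_KEY', 'DB_URL'], process.env);
```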
## Core Workflow Pattern
Every Astreus workflow follows three fundamental steps: create specialized agents, initialize a graph pipeline, and add task nodes with dependencies.
Here's the basic structure:
```typescript
import { Agent, Graph } from '@astreus-ai/astreus';

// Step 1: Create specialized agents
const researchAgent = await Agent.create({
  name: 'Research Agent',
  model: 'gpt-4',
  systemPrompt: 'You are a research specialist. Gather and analyze information.'
});

const writerAgent = await Agent.create({
  name: 'Writer Agent',
  model: 'gpt-4',
  systemPrompt: 'You are a professional writer. Create clear, engaging content.'
});

const editorAgent = await Agent.create({
  name: 'Editor Agent',
  model: 'gpt-4',
  systemPrompt: 'You are an editor. Review and improve content quality.'
});
```
Each agent has a specific role defined by its system prompt. A narrow, well-defined role tends to produce more focused and consistent output than a single general-purpose agent that has to juggle every step.
## Building the Pipeline
Initialize a Graph with configuration and a root agent:
```typescript
const pipeline = new Graph({
  name: 'Content Creation Pipeline'
}, researchAgent);
```
The root agent handles coordination. It receives the initial input and can orchestrate the overall workflow.
## Adding Task Nodes
Task nodes define individual workflow steps. Use `addTaskNode()` to specify what each agent should do:
```typescript
pipeline.addTaskNode({
  name: 'research',
  prompt: 'Research the topic: {topic}. Find key facts and insights.',
  agentId: researchAgent.id
});

pipeline.addTaskNode({
  name: 'write',
  prompt: 'Write an article based on this research: {research}',
  agentId: writerAgent.id,
  dependencies: ['research']
});

pipeline.addTaskNode({
  name: 'edit',
  prompt: 'Edit and improve this article: {write}',
  agentId: editorAgent.id,
  dependencies: ['write']
});
```
The `dependencies` array defines execution order. The write task waits for research to complete. The edit task waits for writing to finish.
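Because execution order comes entirely from these `dependencies` arrays, a typo in a task name or an accidental cycle can leave a pipeline unable to run. The sketch below is a hypothetical pre-flight check, not Astreus's own validation. It checks that every dependency names an existing task, then runs a depth-first search to detect cycles. `TaskSpec` and `validateDependencies` are illustrative names.

```typescript
// Hypothetical pre-flight check for task specs shaped like the
// addTaskNode() calls above (only name and dependencies matter here).
interface TaskSpec {
  name: string;
  dependencies?: string[];
}

function validateDependencies(tasks: TaskSpec[]): void {
  const byName = new Map(tasks.map((t) => [t.name, t]));

  // 1. Every dependency must refer to a task that exists.
  for (const task of tasks) {
    for (const dep of task.dependencies ?? []) {
      if (!byName.has(dep)) {
        throw new Error(`Task "${task.name}" depends on unknown task "${dep}"`);
      }
    }
  }

  // 2. No cycles: depth-first search with "visiting" and "done" marks.
  const state = new Map<string, 'visiting' | 'done'>();
  const visit = (name: string, path: string[]): void => {
    const mark = state.get(name);
    if (mark === 'done') return;
    if (mark === 'visiting') {
      throw new Error(`Dependency cycle: ${[...path, name].join(' -> ')}`);
    }
    state.set(name, 'visiting');
    for (const dep of byName.get(name)!.dependencies ?? []) {
      visit(dep, [...path, name]);
    }
    state.set(name, 'done');
  };
  for (const task of tasks) visit(task.name, []);
}

// The research -> write -> edit chain from this section passes without error.
validateDependencies([
  { name: 'research' },
  { name: 'write', dependencies: ['research'] },
  { name: 'edit', dependencies: ['write'] },
]);
```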
## Executing the Workflow
Run the pipeline with initial inputs:
```typescript
const result = await pipeline.run({
  topic: 'The future of artificial intelligence'
});

if (result.success) {
  console.log('Research:', result.results.research);
  console.log('Article:', result.results.write);
  console.log('Final:', result.results.edit);
} else {
  console.error('Pipeline failed:', result.error);
}
```
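The `run()` method returns an object with a `success` flag and a `results` map containing each task node's output. Access an individual result by the task's `name`, as in `result.results.research`. On failure, `result.error` describes what went wrong.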
## Parallel Execution
Tasks that do not depend on one another run in parallel. As a result, a group of independent tasks takes roughly as long as its slowest member, rather than the sum of all of them.
```typescript
const analysisAgent = await Agent.create({
  name: 'Analysis Agent',
  model: 'gpt-4',
  systemPrompt: 'You analyze data and extract insights.'
});

const sentimentAgent = await Agent.create({
  name: 'Sentiment Agent',
  model: 'gpt-4',
  systemPrompt: 'You analyze emotional tone and sentiment.'
});

const summaryAgent = await Agent.create({
  name: 'Summary Agent',
  model: 'gpt-4',
  systemPrompt: 'You create concise summaries.'
});

const reportAgent = await Agent.create({
  name: 'Report Agent',
  model: 'gpt-4',
  systemPrompt: 'You compile information into comprehensive reports.'
});

const reportPipeline = new Graph({
  name: 'Analysis Report Pipeline'
}, analysisAgent);

// These three tasks run in parallel
reportPipeline.addTaskNode({
  name: 'analysis',
  prompt: 'Analyze this data: {data}',
  agentId: analysisAgent.id
});

reportPipeline.addTaskNode({
  name: 'sentiment',
  prompt: 'Determine sentiment of: {data}',
  agentId: sentimentAgent.id
});

reportPipeline.addTaskNode({
  name: 'summary',
  prompt: 'Summarize this data: {data}',
  agentId: summaryAgent.id
});

// This task waits for all three to complete
reportPipeline.addTaskNode({
  name: 'report',
  prompt: 'Create report combining: {analysis}, {sentiment}, {summary}',
  agentId: reportAgent.id,
  dependencies: ['analysis', 'sentiment', 'summary']
});
```
The analysis, sentiment, and summary tasks run concurrently since they have no dependencies. The report task combines their outputs once all three finish.
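To see why the three middle tasks overlap, here is a toy scheduler. It starts each task as soon as all of its dependencies have resolved, with timers standing in for model calls. `runConcurrently` and `SimTask` are hypothetical names for illustration, not Astreus internals. The sketch assumes an acyclic graph whose dependencies all exist.

```typescript
// Toy dependency-driven scheduler: each task starts once all of its
// dependencies have finished. Timers simulate the latency of a model call.
interface SimTask {
  name: string;
  dependencies?: string[];
  durationMs: number;
}

async function runConcurrently(tasks: SimTask[], log: string[] = []): Promise<string[]> {
  const byName = new Map(tasks.map((t) => [t.name, t]));
  const started = new Map<string, Promise<void>>();

  // Memoized: each task's promise is created once and shared by all dependents.
  const start = (name: string): Promise<void> => {
    let p = started.get(name);
    if (!p) {
      const task = byName.get(name)!;
      p = Promise.all((task.dependencies ?? []).map(start)).then(async () => {
        log.push(`start:${name}`);
        await new Promise<void>((resolve) => setTimeout(resolve, task.durationMs));
        log.push(`end:${name}`);
      });
      started.set(name, p);
    }
    return p;
  };

  await Promise.all(tasks.map((t) => start(t.name)));
  return log;
}
```

Applied to the report pipeline's shape (three independent tasks feeding one `report` task), the log records all three starts before any of them ends. `start:report` appears only after the last of the three finishes.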
## Complex Dependency Chains
Real workflows often have intricate dependency relationships. Some tasks depend on multiple predecessors, creating branching and merging patterns.
```typescript
const dataAgent = await Agent.create({
  name: 'Data Agent',
  model: 'gpt-4',
  systemPrompt: 'You validate and clean data.'
});

const processingAgent = await Agent.create({
  name: 'Processing Agent',
  model: 'gpt-4',
  systemPrompt: 'You process and transform data.'
});

const validationAgent = await Agent.create({
  name: 'Validation Agent',
  model: 'gpt-4',
  systemPrompt: 'You validate results against requirements.'
});

const dataPipeline = new Graph({
  name: 'Data Processing Pipeline'
}, dataAgent);

// Initial validation
dataPipeline.addTaskNode({
  name: 'validateInput',
  prompt: 'Validate this input data: {rawData}',
  agentId: validationAgent.id
});

// Parallel processing paths
dataPipeline.addTaskNode({
  name: 'processTypeA',
  prompt: 'Process type A records: {validateInput}',
  agentId: processingAgent.id,
  dependencies: ['validateInput']
});

dataPipeline.addTaskNode({
  name: 'processTypeB',
  prompt: 'Process type B records: {validateInput}',
  agentId: processingAgent.id,
  dependencies: ['validateInput']
});

// Merge results
dataPipeline.addTaskNode({
  name: 'merge',
  prompt: 'Merge processed data: {processTypeA}, {processTypeB}',
  agentId: dataAgent.id,
  dependencies: ['processTypeA', 'processTypeB']
});

// Final validation
dataPipeline.addTaskNode({
  name: 'validateOutput',
  prompt: 'Validate merged output: {merge}',
  agentId: validationAgent.id,
  dependencies: ['merge']
});
```
This pipeline validates input once, processes two data types in parallel, merges the results, then validates the output. The dependency chain ensures correct execution order.
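One way to reason about a graph like this is to group its tasks into "waves". Every task in a wave depends only on tasks in earlier waves, so tasks within the same wave are free to run concurrently. The sketch below uses Kahn's algorithm. `executionWaves` is a hypothetical helper that illustrates the ordering, not a description of how Astreus schedules nodes.

```typescript
// Group tasks into waves with Kahn's algorithm: wave N holds every task whose
// dependencies were all satisfied by waves 0..N-1.
interface TaskDeps {
  name: string;
  dependencies?: string[];
}

function executionWaves(tasks: TaskDeps[]): string[][] {
  const remaining = new Map<string, number>(); // task -> unmet dependency count
  const dependents = new Map<string, string[]>(); // task -> tasks waiting on it
  for (const t of tasks) {
    remaining.set(t.name, (t.dependencies ?? []).length);
    for (const dep of t.dependencies ?? []) {
      dependents.set(dep, [...(dependents.get(dep) ?? []), t.name]);
    }
  }

  const waves: string[][] = [];
  let current = tasks.filter((t) => remaining.get(t.name) === 0).map((t) => t.name);
  let scheduled = 0;
  while (current.length > 0) {
    waves.push(current);
    scheduled += current.length;
    const next: string[] = [];
    for (const done of current) {
      for (const child of dependents.get(done) ?? []) {
        const left = remaining.get(child)! - 1;
        remaining.set(child, left);
        if (left === 0) next.push(child);
      }
    }
    current = next;
  }
  // Tasks stuck with unmet dependencies indicate a cycle or an unknown name.
  if (scheduled !== tasks.length) {
    throw new Error('Dependency cycle or unknown dependency detected');
  }
  return waves;
}

// The data processing pipeline from this section:
const dataWaves = executionWaves([
  { name: 'validateInput' },
  { name: 'processTypeA', dependencies: ['validateInput'] },
  { name: 'processTypeB', dependencies: ['validateInput'] },
  { name: 'merge', dependencies: ['processTypeA', 'processTypeB'] },
  { name: 'validateOutput', dependencies: ['merge'] },
]);
console.log(JSON.stringify(dataWaves));
// [["validateInput"],["processTypeA","processTypeB"],["merge"],["validateOutput"]]
```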
## Accessing Task Results
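Prompts reference other tasks' outputs with curly-brace placeholders. For example, `{write}` is replaced by the output of the `write` task. The same syntax pulls in values passed to `run()`, such as `{topic}` or `{rawData}`. List every task a prompt references in its `dependencies`, so that the output exists before the task runs.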
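The placeholder substitution described above can be sketched as a plain string function. This `fillTemplate` helper is hypothetical and only illustrates the idea. Astreus's own rules, for example how it treats a placeholder with no value, may differ. The sketch assumes placeholders are bare identifiers, so JSON-like text such as `{"a": 1}` is left untouched.

```typescript
// Hypothetical {placeholder} substitution: replaces {name} with values[name]
// and fails loudly when a referenced value is missing.
function fillTemplate(template: string, values: Record<string, string>): string {
  return template.replace(/\{([A-Za-z_][A-Za-z0-9_]*)\}/g, (_match, key: string) => {
    // hasOwnProperty ignores inherited keys such as "toString".
    if (!Object.prototype.hasOwnProperty.call(values, key)) {
      throw new Error(`No value for placeholder {${key}}`);
    }
    return values[key];
  });
}
```

For example, `fillTemplate('Edit and improve this article: {write}', { write: 'Draft text' })` yields `'Edit and improve this article: Draft text'`. With a replacer function, `$` sequences in the inserted output are not treated specially.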
```typescript
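// Example input (hypothetical records); replace with your own data
const inputData = JSON.stringify([{ id: 1, type: 'A' }, { id: 2, type: 'B' }]);
const result = await dataPipeline.run({ rawData: inputData });

if (result.success) {
  // Access individual task outputs by task name
  const validated = result.results.validateInput;
  const processedA = result.results.processTypeA;
  const processedB = result.results.processTypeB;
  const merged = result.results.merge;
  const final = result.results.validateOutput;
  // Outputs are text; JSON.parse throws unless the model returned valid JSON
  try {
    const parsedOutput = JSON.parse(final);
    console.log('Processing complete:', parsedOutput);
  } catch {
    console.log('Processing complete (plain text):', final);
  }
} else {
  console.error('Pipeline failed:', result.error);
}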
```
The `results` object holds the output of every executed task, keyed by task name. Treat outputs as text: parse them as JSON only when the prompt explicitly asks for JSON, and guard the parse.
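Even when a prompt asks for JSON, models sometimes wrap the answer in a Markdown code fence. The hypothetical `parseModelJson` helper below strips one fence if present, tries `JSON.parse`, and returns `null` instead of throwing. It does not attempt to extract JSON from surrounding prose.

```typescript
// Hypothetical helper: parse model output as JSON, tolerating one Markdown
// code fence (optionally tagged "json"). Returns null when parsing fails.
function parseModelJson<T = unknown>(text: string): T | null {
  // `{3} matches three backticks; the lazy group captures the fenced body.
  const fenced = text.match(/`{3}(?:json)?\s*([\s\S]*?)`{3}/);
  const candidate = (fenced ? fenced[1] : text).trim();
  try {
    return JSON.parse(candidate) as T;
  } catch {
    return null;
  }
}

// Usage with the data pipeline above:
// const parsed = parseModelJson<Record<string, unknown>>(result.results.validateOutput);
// if (parsed === null) { /* fall back to treating the output as plain text */ }
```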
## Production Considerations
Production workflows need error handling, monitoring, and optimization. Consider these patterns when building real systems.
Add error handling to catch task failures:
```typescript
try {
  const result = await pipeline.run(inputs);
  if (!result.success) {
    console.error('Pipeline failed:', result.error);
    // Handle failure: retry, alert, or fall back
  }
} catch (error) {
  console.error('Pipeline error:', error);
  // Handle unexpected errors
}
```
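For the "retry" branch, a common approach is a generic helper with exponential backoff. The `withRetry` helper below is a hypothetical sketch. The attempt count and delays are illustrative defaults, not values recommended by Astreus. Keep in mind that re-running a pipeline repeats every task, so each retry costs about as much as a full run.

```typescript
// Hypothetical retry helper: re-runs an async operation with exponential
// backoff and rethrows the last error if every attempt fails.
async function withRetry<T>(
  operation: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 500
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (attempt < attempts) {
        // Waits baseDelayMs, then 2x, 4x, ... between attempts.
        const delay = baseDelayMs * 2 ** (attempt - 1);
        await new Promise<void>((resolve) => setTimeout(resolve, delay));
      }
    }
  }
  throw lastError;
}

// Usage with a pipeline from this tutorial, turning an unsuccessful result
// into a thrown error so it is retried:
// const result = await withRetry(async () => {
//   const r = await pipeline.run(inputs);
//   if (!r.success) throw new Error(String(r.error));
//   return r;
// });
```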
Measure end-to-end execution time to catch slow runs:
```typescript
const startTime = Date.now();
const result = await pipeline.run(inputs);
const duration = Date.now() - startTime;

console.log(`Pipeline completed in ${duration}ms`);
if (duration > 30000) {
  console.warn('Pipeline execution exceeded 30 seconds');
}
```
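Logging a slow run after the fact does not limit how long callers wait. A hypothetical `withTimeout` helper can race the run against a timer instead. Note that `Promise.race` only stops the waiting, not the work: the pipeline keeps running, and consuming tokens, in the background.

```typescript
// Hypothetical helper: reject if the promise has not settled within `ms`.
// The timer is always cleared so it does not keep the process alive.
async function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms);
  });
  try {
    return await Promise.race([promise, timeout]);
  } finally {
    clearTimeout(timer);
  }
}

// Usage with a pipeline from this tutorial:
// const result = await withTimeout(pipeline.run(inputs), 30_000);
```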
Optimize agent selection based on task complexity. Use faster models for simple tasks and powerful models for complex reasoning:
```typescript
const simpleAgent = await Agent.create({
  name: 'Simple Tasks',
  model: 'gpt-3.5-turbo',
  systemPrompt: 'Handle straightforward tasks efficiently.'
});

const complexAgent = await Agent.create({
  name: 'Complex Tasks',
  model: 'gpt-4',
  systemPrompt: 'Handle complex reasoning and analysis.'
});
```
Match each agent's model to the difficulty of its task. This can cut cost and latency without hurting quality on steps that do not need the larger model.
## Real-World Example: Support Ticket System
Here's a complete workflow for handling customer support tickets:
```typescript
import { Agent, Graph } from '@astreus-ai/astreus';

// Create specialized agents
const classifierAgent = await Agent.create({
  name: 'Classifier',
  model: 'gpt-3.5-turbo',
  systemPrompt: 'Classify support tickets into categories: technical, billing, or general.'
});

const technicalAgent = await Agent.create({
  name: 'Technical Support',
  model: 'gpt-4',
  systemPrompt: 'Provide detailed technical support and troubleshooting.'
});

const billingAgent = await Agent.create({
  name: 'Billing Support',
  model: 'gpt-3.5-turbo',
  systemPrompt: 'Handle billing inquiries and account issues.'
});

const generalAgent = await Agent.create({
  name: 'General Support',
  model: 'gpt-3.5-turbo',
  systemPrompt: 'Answer general questions and provide information.'
});

const qualityAgent = await Agent.create({
  name: 'Quality Reviewer',
  model: 'gpt-4',
  systemPrompt: 'Review support responses for accuracy and helpfulness.'
});

// Build the support pipeline
const supportPipeline = new Graph({
  name: 'Support Ticket Pipeline',
  description: 'Classify and respond to customer support tickets'
}, classifierAgent);

// Step 1: Classify the ticket. The JSON shape is described in words, because
// literal braces in a prompt could be mistaken for {placeholders}.
supportPipeline.addTaskNode({
  name: 'classify',
  prompt: 'Classify this support ticket. Reply with only a JSON object with one field, "category", set to technical, billing, or general. Ticket: {ticket}',
  agentId: classifierAgent.id
});

// Step 2: Draft a response per category. All three handlers run in parallel;
// the reviewer decides which one applies.
supportPipeline.addTaskNode({
  name: 'handleTechnical',
  prompt: 'Provide technical support for: {ticket}',
  agentId: technicalAgent.id,
  dependencies: ['classify']
});

supportPipeline.addTaskNode({
  name: 'handleBilling',
  prompt: 'Handle billing issue: {ticket}',
  agentId: billingAgent.id,
  dependencies: ['classify']
});

supportPipeline.addTaskNode({
  name: 'handleGeneral',
  prompt: 'Answer general inquiry: {ticket}',
  agentId: generalAgent.id,
  dependencies: ['classify']
});

// Step 3: Review quality. 'classify' is listed explicitly because the prompt uses {classify}.
supportPipeline.addTaskNode({
  name: 'review',
  prompt: 'Classification: {classify}. Candidate responses: Technical: {handleTechnical}, Billing: {handleBilling}, General: {handleGeneral}. Select the response that matches the classification. Reply with only a JSON object with the fields "selectedResponse" (string) and "confidence" (number from 0 to 1).',
  agentId: qualityAgent.id,
  dependencies: ['classify', 'handleTechnical', 'handleBilling', 'handleGeneral']
});

// Execute the pipeline
const ticket = {
  id: 'TICKET-123',
  subject: 'Cannot login to my account',
  message: 'I keep getting an error when trying to login. It says invalid credentials but I know my password is correct.',
  user: { email: 'user@example.com', accountType: 'premium' }
};

const result = await supportPipeline.run({ ticket: JSON.stringify(ticket) });

if (result.success) {
  try {
    // Both prompts ask for JSON, but model output can still be malformed
    const classification = JSON.parse(result.results.classify);
    const response = JSON.parse(result.results.review);
    console.log('Category:', classification.category);
    console.log('Response:', response.selectedResponse);
    console.log('Confidence:', response.confidence);
  } catch (error) {
    console.error('Could not parse model output as JSON:', error);
  }
} else {
  console.error('Pipeline failed:', result.error);
}
```
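This workflow classifies the ticket, drafts a response from each specialized agent, and has a reviewer pick the response that matches the classification. The pipeline handles the orchestration, but the design trades cost for simplicity: every ticket pays for three handler calls even though only one response is used.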
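If cost matters more than a static graph, one alternative is to run the classification first and then run only the matching handler, for example as a second, smaller pipeline. The routing step is ordinary application code. `routeTicket` and `HANDLER_FOR` below are hypothetical names; the handler names match the task names in the pipeline above.

```typescript
// Hypothetical router: map the classifier's JSON output to one handler task,
// defaulting to the general handler for unknown or malformed output.
type Category = 'technical' | 'billing' | 'general';

const HANDLER_FOR: Record<Category, string> = {
  technical: 'handleTechnical',
  billing: 'handleBilling',
  general: 'handleGeneral',
};

function routeTicket(classifierOutput: string): string {
  let category = '';
  try {
    const parsed = JSON.parse(classifierOutput) as { category?: unknown } | null;
    if (parsed && typeof parsed.category === 'string') {
      category = parsed.category.trim().toLowerCase();
    }
  } catch {
    // Malformed JSON falls through to the default route.
  }
  // hasOwnProperty avoids matching inherited keys such as "constructor".
  if (Object.prototype.hasOwnProperty.call(HANDLER_FOR, category)) {
    return HANDLER_FOR[category as Category];
  }
  return HANDLER_FOR.general;
}
```

Defaulting to the general handler keeps malformed classifier output from blocking a reply. A stricter system might send such tickets to a human instead.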
## Key Concepts
Successful workflow design follows several principles. First, create specialized agents with clear responsibilities. Each agent should excel at one type of task rather than trying to do everything.
Second, minimize dependencies where possible. Independent tasks run in parallel, reducing total execution time. Only add dependencies when one task truly needs another's output.
Third, design prompts that reference other task outputs explicitly. Use `{taskName}` placeholders to pull in results from dependencies, and list every referenced task in `dependencies`. This keeps the data flow clear and maintainable.
Fourth, handle errors gracefully at the pipeline level. Check `result.success` before accessing task outputs. Plan for failures and implement appropriate recovery strategies.
## Advanced Patterns
These building blocks combine into larger patterns. Map-reduce, for example, runs the same agent on multiple inputs in parallel and then combines the results:
```typescript
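import { readFileSync } from 'node:fs';

const documents = ['doc1.txt', 'doc2.txt', 'doc3.txt', 'doc4.txt'];

const summaryPipeline = new Graph({
  name: 'Document Summary'
}, summaryAgent);

// Map: one summarize task per document (no dependencies, so they run in parallel)
documents.forEach((_, index) => {
  summaryPipeline.addTaskNode({
    name: `summarize_${index}`,
    prompt: `Summarize this document: {document_${index}}`,
    agentId: summaryAgent.id
  });
});

// Reduce: combine all summaries once every map task has finished
summaryPipeline.addTaskNode({
  name: 'combine',
  prompt: `Combine these summaries into one: ${documents.map((_, i) => `{summarize_${i}}`).join(', ')}`,
  agentId: summaryAgent.id,
  dependencies: documents.map((_, i) => `summarize_${i}`)
});

// Supply each {document_N} placeholder with the file's contents
const documentInputs = Object.fromEntries(
  documents.map((doc, i) => [`document_${i}`, readFileSync(doc, 'utf8')])
);
const summaryResult = await summaryPipeline.run(documentInputs);
if (summaryResult.success) {
  console.log(summaryResult.results.combine);
}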
```
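With many documents, a single `combine` prompt can outgrow the model's context window. A common workaround is a tree-shaped reduce: combine summaries in groups of `fanIn`, then combine those results, until one node remains. The hypothetical `planTreeReduce` helper below only plans node names and dependencies. Each planned entry could then be passed to `addTaskNode()` along with a prompt and `agentId`.

```typescript
// Hypothetical planner for a tree-shaped reduce. Each level combines up to
// `fanIn` nodes from the level below; the last planned node is the final result.
interface PlannedNode {
  name: string;
  dependencies: string[];
}

function planTreeReduce(leaves: string[], fanIn = 4): PlannedNode[] {
  if (fanIn < 2) throw new Error('fanIn must be at least 2');
  const plan: PlannedNode[] = [];
  let level = leaves;
  let depth = 0;
  while (level.length > 1) {
    const next: string[] = [];
    for (let i = 0; i < level.length; i += fanIn) {
      const name = `combine_${depth}_${i / fanIn}`;
      plan.push({ name, dependencies: level.slice(i, i + fanIn) });
      next.push(name);
    }
    level = next;
    depth++;
  }
  return plan;
}
```

For 10 summaries with `fanIn = 4`, this plans three first-level combine nodes (4, 4, and 2 inputs) and one final node, `combine_1_0`, that merges them.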
Implement validation chains where each stage validates different aspects:
```typescript
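const validatorAgent = await Agent.create({
  name: 'Validator Agent',
  model: 'gpt-4',
  systemPrompt: 'You check input against one set of criteria and report any problems you find.'
});

const validationPipeline = new Graph({
  name: 'Validation Chain'
}, validatorAgent);

validationPipeline.addTaskNode({
  name: 'syntaxCheck',
  prompt: 'Validate syntax: {input}',
  agentId: validatorAgent.id
});

// Each later stage also sees the previous stage's findings
validationPipeline.addTaskNode({
  name: 'semanticCheck',
  prompt: 'Validate semantics: {input}. Syntax check findings: {syntaxCheck}',
  agentId: validatorAgent.id,
  dependencies: ['syntaxCheck']
});

validationPipeline.addTaskNode({
  name: 'businessRules',
  prompt: 'Validate business rules: {input}. Semantic check findings: {semanticCheck}',
  agentId: validatorAgent.id,
  dependencies: ['semanticCheck']
});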
```
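Each stage starts only after the previous one completes, so the checks run in a fixed order. A dependency waits for completion, though, not for a passing verdict: if `syntaxCheck` reports a problem in its output, `semanticCheck` still runs. To stop at the first failed check and save the remaining model calls, inspect each verdict yourself before running the next stage.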
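Dependencies order the checks, but a check that completes and reports a problem is still a completed task. For strict fail-fast behavior, one option is to run each check separately and stop at the first failure. `Check`, `CheckResult`, and `runChecks` are hypothetical names. In practice, each `run` function might wrap a small single-node pipeline and interpret its output.

```typescript
// Hypothetical fail-fast runner: checks run in order, and the first failing
// check stops the chain so later checks never execute.
interface CheckResult {
  passed: boolean;
  reason?: string;
}

interface Check {
  name: string;
  run: (input: string) => Promise<CheckResult>;
}

async function runChecks(
  input: string,
  checks: Check[]
): Promise<{ passed: boolean; failedAt?: string; reason?: string }> {
  for (const check of checks) {
    const result = await check.run(input);
    if (!result.passed) {
      // Remaining checks are skipped, so no model calls are spent on them.
      return { passed: false, failedAt: check.name, reason: result.reason };
    }
  }
  return { passed: true };
}
```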
## Conclusion
Astreus workflows coordinate multiple specialized agents through dependency-based execution. The core pattern is simple: create agents with `Agent.create()`, initialize a pipeline with `new Graph()`, add tasks with `addTaskNode()`, and execute with `run()`.
Parallel execution happens automatically when tasks lack dependencies. Complex workflows emerge from combining simple patterns. Start with linear pipelines, then add parallel branches as needed. Always design for failure by checking result status and handling errors appropriately.
The key to effective workflows is matching agent capabilities to task requirements and minimizing unnecessary dependencies. This maximizes parallelism while maintaining correct execution order.
This tutorial is written for Astreus v0.5.37. Please ensure you are using a compatible version.