Building Complex Workflows in Astreus

Build sophisticated multi-agent workflows with advanced orchestration patterns. Learn dependency management, parallel execution, and production-grade pipeline design.

Multi-agent workflows orchestrate specialized agents to handle complex tasks. Instead of a single agent doing everything, each agent focuses on one responsibility while the pipeline manages coordination. This tutorial shows you how to build production-grade workflows using Astreus.

## Getting Started

You can clone the complete example or install just the package:

```bash
# Clone the full example
git clone https://github.com/astreus-ai/complex-workflows
cd complex-workflows
npm install

# Or install the package only
npm install @astreus-ai/astreus
```

Configure your environment variables in `.env`:

```bash
OPENAI_API_KEY=your-api-key-here
DB_URL=sqlite://./astreus.db
```

The database URL can point to any supported database. SQLite works well for local development.

## Core Workflow Pattern

Every Astreus workflow follows three fundamental steps: create specialized agents, initialize a graph pipeline, and add task nodes with dependencies.

Here's the basic structure:

```typescript
import { Agent, Graph } from '@astreus-ai/astreus';

// Step 1: Create specialized agents
const researchAgent = await Agent.create({
  name: 'Research Agent',
  model: 'gpt-4',
  systemPrompt: 'You are a research specialist. Gather and analyze information.'
});

const writerAgent = await Agent.create({
  name: 'Writer Agent',
  model: 'gpt-4',
  systemPrompt: 'You are a professional writer. Create clear, engaging content.'
});

const editorAgent = await Agent.create({
  name: 'Editor Agent',
  model: 'gpt-4',
  systemPrompt: 'You are an editor. Review and improve content quality.'
});
```

Each agent has a specific role defined by its system prompt. Specialized agents produce better results than generalists.

## Building the Pipeline

Initialize a Graph with configuration and a root agent:

```typescript
const pipeline = new Graph({
  name: 'Content Creation Pipeline'
}, researchAgent);
```

The root agent handles coordination. It receives the initial input and can orchestrate the overall workflow.

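
Agent creation and pipeline setup both depend on the environment variables from Getting Started, so it can be worth failing early when one is missing. The following is a minimal sketch, not part of Astreus: `missingEnvVars` and `assertEnv` are hypothetical helpers, and the sketch assumes the `.env` values have already been loaded into the process environment by your runtime or a loader of your choice.

```typescript
// Hypothetical helpers for illustration; not part of the Astreus API.
type Env = Record<string, string | undefined>;

// Variable names taken from the `.env` example in Getting Started.
const REQUIRED_VARS = ['OPENAI_API_KEY', 'DB_URL'];

// Returns the names of required variables that are missing or blank.
function missingEnvVars(env: Env, required: string[]): string[] {
  return required.filter((name) => {
    const value = env[name];
    return value === undefined || value.trim() === '';
  });
}

// Throws a readable error if any required variable is missing.
function assertEnv(env: Env): void {
  const missing = missingEnvVars(env, REQUIRED_VARS);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`);
  }
}
```

In a Node.js script you might call `assertEnv(process.env)` once, before the first `Agent.create()` call.
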
## Adding Task Nodes

Task nodes define individual workflow steps. Use `addTaskNode()` to specify what each agent should do:

```typescript
pipeline.addTaskNode({
  name: 'research',
  prompt: 'Research the topic: {topic}. Find key facts and insights.',
  agentId: researchAgent.id
});

pipeline.addTaskNode({
  name: 'write',
  prompt: 'Write an article based on this research: {research}',
  agentId: writerAgent.id,
  dependencies: ['research']
});

pipeline.addTaskNode({
  name: 'edit',
  prompt: 'Edit and improve this article: {write}',
  agentId: editorAgent.id,
  dependencies: ['write']
});
```

The `dependencies` array defines execution order. The write task waits for research to complete. The edit task waits for writing to finish.

## Executing the Workflow

Run the pipeline with initial inputs:

```typescript
const result = await pipeline.run({
  topic: 'The future of artificial intelligence'
});

if (result.success) {
  console.log('Research:', result.results.research);
  console.log('Article:', result.results.write);
  console.log('Final:', result.results.edit);
} else {
  console.error('Pipeline failed:', result.error);
}
```

The `run()` method returns results containing success status and outputs from each task node. Access individual task results by their ID.

## Parallel Execution

Tasks without dependencies run in parallel. This reduces total execution time by utilizing all available agents simultaneously.

```typescript
const analysisAgent = await Agent.create({
  name: 'Analysis Agent',
  model: 'gpt-4',
  systemPrompt: 'You analyze data and extract insights.'
});

const sentimentAgent = await Agent.create({
  name: 'Sentiment Agent',
  model: 'gpt-4',
  systemPrompt: 'You analyze emotional tone and sentiment.'
});

const summaryAgent = await Agent.create({
  name: 'Summary Agent',
  model: 'gpt-4',
  systemPrompt: 'You create concise summaries.'
});

const reportAgent = await Agent.create({
  name: 'Report Agent',
  model: 'gpt-4',
  systemPrompt: 'You compile information into comprehensive reports.'
});

const reportPipeline = new Graph({
  name: 'Analysis Report Pipeline'
}, analysisAgent);

// These three tasks run in parallel
reportPipeline.addTaskNode({
  name: 'analysis',
  prompt: 'Analyze this data: {data}',
  agentId: analysisAgent.id
});

reportPipeline.addTaskNode({
  name: 'sentiment',
  prompt: 'Determine sentiment of: {data}',
  agentId: sentimentAgent.id
});

reportPipeline.addTaskNode({
  name: 'summary',
  prompt: 'Summarize this data: {data}',
  agentId: summaryAgent.id
});

// This task waits for all three to complete
reportPipeline.addTaskNode({
  name: 'report',
  prompt: 'Create report combining: {analysis}, {sentiment}, {summary}',
  agentId: reportAgent.id,
  dependencies: ['analysis', 'sentiment', 'summary']
});
```

The analysis, sentiment, and summary tasks run concurrently since they have no dependencies. The report task combines their outputs once all three finish.

## Complex Dependency Chains

Real workflows often have intricate dependency relationships. Some tasks depend on multiple predecessors, creating branching and merging patterns.

```typescript
const dataAgent = await Agent.create({
  name: 'Data Agent',
  model: 'gpt-4',
  systemPrompt: 'You validate and clean data.'
});

const processingAgent = await Agent.create({
  name: 'Processing Agent',
  model: 'gpt-4',
  systemPrompt: 'You process and transform data.'
});

const validationAgent = await Agent.create({
  name: 'Validation Agent',
  model: 'gpt-4',
  systemPrompt: 'You validate results against requirements.'
});

const dataPipeline = new Graph({
  name: 'Data Processing Pipeline'
}, dataAgent);

// Initial validation
dataPipeline.addTaskNode({
  name: 'validateInput',
  prompt: 'Validate this input data: {rawData}',
  agentId: validationAgent.id
});

// Parallel processing paths
dataPipeline.addTaskNode({
  name: 'processTypeA',
  prompt: 'Process type A records: {validateInput}',
  agentId: processingAgent.id,
  dependencies: ['validateInput']
});

dataPipeline.addTaskNode({
  name: 'processTypeB',
  prompt: 'Process type B records: {validateInput}',
  agentId: processingAgent.id,
  dependencies: ['validateInput']
});

// Merge results
dataPipeline.addTaskNode({
  name: 'merge',
  prompt: 'Merge processed data: {processTypeA}, {processTypeB}',
  agentId: dataAgent.id,
  dependencies: ['processTypeA', 'processTypeB']
});

// Final validation
dataPipeline.addTaskNode({
  name: 'validateOutput',
  prompt: 'Validate merged output: {merge}',
  agentId: validationAgent.id,
  dependencies: ['merge']
});
```

This pipeline validates input once, processes two data types in parallel, merges the results, then validates the output. The dependency chain ensures correct execution order.

## Accessing Task Results

Prompts reference earlier task outputs using curly-brace placeholders. When one task depends on another, it can access the predecessor's output in its prompt.

```typescript
// Run the data pipeline defined in the previous section
const result = await dataPipeline.run({ rawData: inputData });

if (result.success) {
  // Access individual task outputs
  const validated = result.results.validateInput;
  const processedA = result.results.processTypeA;
  const processedB = result.results.processTypeB;
  const merged = result.results.merge;
  const final = result.results.validateOutput;

  // Parse JSON responses if needed
  const parsedOutput = JSON.parse(final);
  console.log('Processing complete:', parsedOutput);
}
```

The result object contains outputs from all executed tasks. You can access them by task ID and process the data as needed.

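
`JSON.parse` throws on malformed input, and model output is not guaranteed to be valid JSON. The following is a minimal sketch of a more defensive parser. `parseTaskOutput` is a hypothetical helper (not part of Astreus), and it assumes task outputs are strings, as the example above implies. It also strips a surrounding Markdown code fence, which some models add around JSON.

```typescript
// Hypothetical helper for illustration; not part of the Astreus API.
type ParseResult<T> =
  | { ok: true; value: T }
  | { ok: false; error: string };

// Three backticks, built at runtime so the marker never appears literally here.
const FENCE = '`'.repeat(3);

// Matches an entire string wrapped in a Markdown code fence, optionally tagged "json".
const FENCED_BLOCK = new RegExp(`^${FENCE}(?:json)?\\s*([\\s\\S]*?)\\s*${FENCE}$`);

// Parses a task output as JSON and reports failure as a value instead of throwing.
function parseTaskOutput<T = unknown>(raw: string): ParseResult<T> {
  const trimmed = raw.trim();
  const fenced = trimmed.match(FENCED_BLOCK);
  const candidate = fenced ? fenced[1] : trimmed;
  try {
    return { ok: true, value: JSON.parse(candidate) as T };
  } catch (err) {
    return { ok: false, error: err instanceof Error ? err.message : String(err) };
  }
}
```

A caller can then branch on `ok` and decide whether to retry the task, fall back to the raw text, or surface the error.
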
## Production Considerations

Production workflows need error handling, monitoring, and optimization. Consider these patterns when building real systems.

Add error handling to catch task failures:

```typescript
try {
  const result = await pipeline.run(inputs);

  if (!result.success) {
    console.error('Pipeline failed:', result.error);
    // Handle failure - retry, alert, fallback
  }
} catch (error) {
  console.error('Pipeline error:', error);
  // Handle unexpected errors
}
```

Monitor execution time to identify bottlenecks:

```typescript
const startTime = Date.now();
const result = await pipeline.run(inputs);
const duration = Date.now() - startTime;

console.log(`Pipeline completed in ${duration}ms`);

if (duration > 30000) {
  console.warn('Pipeline execution exceeded 30 seconds');
}
```

Optimize agent selection based on task complexity. Use faster models for simple tasks and powerful models for complex reasoning:

```typescript
const simpleAgent = await Agent.create({
  name: 'Simple Tasks',
  model: 'gpt-3.5-turbo',
  systemPrompt: 'Handle straightforward tasks efficiently.'
});

const complexAgent = await Agent.create({
  name: 'Complex Tasks',
  model: 'gpt-4',
  systemPrompt: 'Handle complex reasoning and analysis.'
});
```

Match agent capabilities to task requirements. This reduces costs while maintaining quality.

## Real-World Example: Support Ticket System

Here's a complete workflow for handling customer support tickets:

```typescript
import { Agent, Graph } from '@astreus-ai/astreus';

// Create specialized agents
const classifierAgent = await Agent.create({
  name: 'Classifier',
  model: 'gpt-3.5-turbo',
  systemPrompt: 'Classify support tickets into categories: technical, billing, or general.'
});

const technicalAgent = await Agent.create({
  name: 'Technical Support',
  model: 'gpt-4',
  systemPrompt: 'Provide detailed technical support and troubleshooting.'
});

const billingAgent = await Agent.create({
  name: 'Billing Support',
  model: 'gpt-3.5-turbo',
  systemPrompt: 'Handle billing inquiries and account issues.'
});

const generalAgent = await Agent.create({
  name: 'General Support',
  model: 'gpt-3.5-turbo',
  systemPrompt: 'Answer general questions and provide information.'
});

const qualityAgent = await Agent.create({
  name: 'Quality Reviewer',
  model: 'gpt-4',
  systemPrompt: 'Review support responses for accuracy and helpfulness.'
});

// Build the support pipeline
const supportPipeline = new Graph({
  name: 'Support Ticket Pipeline',
  description: 'Classify and respond to customer support tickets'
}, classifierAgent);

// Step 1: Classify the ticket
supportPipeline.addTaskNode({
  name: 'classify',
  prompt: 'Classify this support ticket and return JSON with a "category" field: {ticket}',
  agentId: classifierAgent.id
});

// Step 2: Handle based on category (all three run, but only relevant one matters)
supportPipeline.addTaskNode({
  name: 'handleTechnical',
  prompt: 'Provide technical support for: {ticket}',
  agentId: technicalAgent.id,
  dependencies: ['classify']
});

supportPipeline.addTaskNode({
  name: 'handleBilling',
  prompt: 'Handle billing issue: {ticket}',
  agentId: billingAgent.id,
  dependencies: ['classify']
});

supportPipeline.addTaskNode({
  name: 'handleGeneral',
  prompt: 'Answer general inquiry: {ticket}',
  agentId: generalAgent.id,
  dependencies: ['classify']
});

// Step 3: Review quality
// The prompt asks for the JSON fields that the result handling below reads.
supportPipeline.addTaskNode({
  name: 'review',
  prompt: 'Review these responses: Technical: {handleTechnical}, Billing: {handleBilling}, General: {handleGeneral}. Select the best response based on classification: {classify}. Return JSON with "selectedResponse" and "confidence" fields.',
  agentId: qualityAgent.id,
  dependencies: ['handleTechnical', 'handleBilling', 'handleGeneral']
});

// Execute the pipeline
const ticket = {
  id: 'TICKET-123',
  subject: 'Cannot login to my account',
  message: 'I keep getting an error when trying to login. It says invalid credentials but I know my password is correct.',
  user: {
    email: '[email protected]',
    accountType: 'premium'
  }
};

const result = await supportPipeline.run({
  ticket: JSON.stringify(ticket)
});

if (result.success) {
  const classification = JSON.parse(result.results.classify);
  const response = JSON.parse(result.results.review);

  console.log('Category:', classification.category);
  console.log('Response:', response.selectedResponse);
  console.log('Confidence:', response.confidence);
}
```

This workflow classifies tickets, generates responses from multiple specialized agents, and selects the best response through quality review. The pipeline handles the orchestration automatically.

## Key Concepts

Successful workflow design follows several principles.

First, create specialized agents with clear responsibilities. Each agent should excel at one type of task rather than trying to do everything.

Second, minimize dependencies where possible. Independent tasks run in parallel, reducing total execution time. Only add dependencies when one task truly needs another's output.

Third, design prompts that reference other task outputs explicitly. Use `{taskId}` syntax to pull results from dependencies. This makes data flow clear and maintainable.

Fourth, handle errors gracefully at the pipeline level. Check `result.success` before accessing task outputs. Plan for failures and implement appropriate recovery strategies.

## Advanced Patterns

Complex workflows can implement sophisticated patterns.

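
Minimizing dependencies is easier when you can see which tasks a given set of dependency lists allows to run together. The following is a minimal planning sketch, not a description of how Astreus schedules work internally: `TaskSpec` and `planStages` are hypothetical names, and the task list mirrors the `name` and `dependencies` fields of the support ticket pipeline.

```typescript
// Hypothetical shape for illustration; mirrors the fields passed to addTaskNode().
interface TaskSpec {
  name: string;
  dependencies?: string[];
}

// Groups tasks into stages. Every task in a stage depends only on tasks
// from earlier stages, so tasks within one stage could run concurrently.
function planStages(tasks: TaskSpec[]): string[][] {
  const names = new Set(tasks.map((t) => t.name));
  for (const task of tasks) {
    for (const dep of task.dependencies ?? []) {
      if (!names.has(dep)) {
        throw new Error(`Task "${task.name}" depends on unknown task "${dep}"`);
      }
    }
  }

  const done = new Set<string>();
  const stages: string[][] = [];
  let remaining = [...tasks];

  while (remaining.length > 0) {
    // A task is ready once all of its dependencies have been placed in a stage.
    const ready = remaining.filter((t) =>
      (t.dependencies ?? []).every((dep) => done.has(dep))
    );
    if (ready.length === 0) {
      throw new Error('Dependency cycle detected');
    }
    stages.push(ready.map((t) => t.name));
    ready.forEach((t) => done.add(t.name));
    remaining = remaining.filter((t) => !done.has(t.name));
  }
  return stages;
}

const stages = planStages([
  { name: 'classify' },
  { name: 'handleTechnical', dependencies: ['classify'] },
  { name: 'handleBilling', dependencies: ['classify'] },
  { name: 'handleGeneral', dependencies: ['classify'] },
  { name: 'review', dependencies: ['handleTechnical', 'handleBilling', 'handleGeneral'] }
]);

console.log(stages);
// [['classify'], ['handleTechnical', 'handleBilling', 'handleGeneral'], ['review']]
```

Fewer stages for the same number of tasks means more work can overlap. The check also surfaces misspelled dependency names and cycles before any model call is made.
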
Implement map-reduce by running the same agent on multiple inputs in parallel, then combining results:

```typescript
// Reuses `summaryAgent` from the Parallel Execution section
const documents = ['doc1.txt', 'doc2.txt', 'doc3.txt', 'doc4.txt'];

const summaryPipeline = new Graph({
  name: 'Document Summary'
}, summaryAgent);

// Add a task for each document (runs in parallel)
// Each {document_N} placeholder must be supplied as an input to run()
documents.forEach((doc, index) => {
  summaryPipeline.addTaskNode({
    name: `summarize_${index}`,
    prompt: `Summarize this document: {document_${index}}`,
    agentId: summaryAgent.id
  });
});

// Combine all summaries
summaryPipeline.addTaskNode({
  name: 'combine',
  prompt: `Combine these summaries into one: ${documents.map((_, i) => `{summarize_${i}}`).join(', ')}`,
  agentId: summaryAgent.id,
  dependencies: documents.map((_, i) => `summarize_${i}`)
});
```

Implement validation chains where each stage validates different aspects:

```typescript
// Assumes `validationPipeline` (a Graph) and `validatorAgent` (an Agent)
// were created in the same way as the pipelines and agents above
validationPipeline.addTaskNode({
  name: 'syntaxCheck',
  prompt: 'Validate syntax: {input}',
  agentId: validatorAgent.id
});

validationPipeline.addTaskNode({
  name: 'semanticCheck',
  prompt: 'Validate semantics: {input}',
  agentId: validatorAgent.id,
  dependencies: ['syntaxCheck']
});

validationPipeline.addTaskNode({
  name: 'businessRules',
  prompt: 'Validate business rules: {input}',
  agentId: validatorAgent.id,
  dependencies: ['semanticCheck']
});
```

Each validation stage only runs if the previous stage passes. This creates fail-fast behavior that saves resources.

## Conclusion

Astreus workflows coordinate multiple specialized agents through dependency-based execution. The core pattern is simple: create agents with `Agent.create()`, initialize a pipeline with `new Graph()`, add tasks with `addTaskNode()`, and execute with `run()`. Parallel execution happens automatically when tasks lack dependencies.

Complex workflows emerge from combining simple patterns. Start with linear pipelines, then add parallel branches as needed. Always design for failure by checking result status and handling errors appropriately.

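
One common way to design for failure is to retry a run with a growing delay between attempts. The following is a minimal sketch under stated assumptions: `runWithRetry` is a hypothetical helper (not part of Astreus), and `RunResult` only models the `success` and `error` fields used in this tutorial's examples.

```typescript
// Hypothetical result shape, modeled on the fields used in this tutorial.
interface RunResult {
  success: boolean;
  error?: unknown;
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Calls `run` until it reports success or `maxAttempts` is reached.
// Thrown errors and `success: false` results both count as failed attempts.
async function runWithRetry<T extends RunResult>(
  run: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 500
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const result = await run();
      if (result.success) return result;
      lastError = result.error;
    } catch (error) {
      lastError = error;
    }
    if (attempt < maxAttempts) {
      // Exponential backoff: baseDelayMs, then 2x, then 4x, and so on.
      await sleep(baseDelayMs * 2 ** (attempt - 1));
    }
  }
  throw new Error(`Run failed after ${maxAttempts} attempts: ${String(lastError)}`);
}
```

Assuming `pipeline.run()` resolves to an object of this shape, a call could look like `await runWithRetry(() => pipeline.run(inputs))`. Retrying re-executes the whole pipeline, so it suits transient failures better than deterministic ones.
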
The key to effective workflows is matching agent capabilities to task requirements and minimizing unnecessary dependencies. This maximizes parallelism while maintaining correct execution order.

This tutorial is written for Astreus v0.5.37. Please ensure you are using a compatible version.