OpenAI AgentKit Complete Guide Part 2: Production Deployment and Advanced Patterns
From enterprise-grade multi-agent system design to production deployment, master AgentKit implementation in real-world scenarios
Series: Master OpenAI AgentKit (2/2)
- OpenAI AgentKit Complete Guide Part 1: Core Concepts and Getting Started
- OpenAI AgentKit Complete Guide Part 2: Production Deployment and Advanced Patterns ← Current Article
In Part 1, we learned AgentKit’s basic concepts and foundational tutorials. In Part 2, we’ll explore how to build enterprise-grade systems in actual production environments.
1. Enterprise Architecture Patterns
1.1 Hierarchical Manager-Worker Pattern
In large-scale projects, a single agent cannot handle all tasks. A hierarchical structure where a manager agent formulates overall strategy and delegates work to specialized worker agents is effective.
Architecture Diagram:
graph TD
User[User] -->|Request| Manager[Manager Agent<br/>Project Orchestrator]
Manager -->|Delegate Task| Frontend[Frontend<br/>Specialist]
Manager -->|Delegate Task| Backend[Backend<br/>Specialist]
Manager -->|Delegate Task| DevOps[DevOps<br/>Specialist]
Frontend -->|Report| Manager
Backend -->|Report| Manager
DevOps -->|Report| Manager
Manager -->|Integrated Result| User
style Manager fill:#10A37F,color:#fff
style Frontend fill:#4A90E2,color:#fff
style Backend fill:#E94B3C,color:#fff
style DevOps fill:#F5A623,color:#fff
Implementation Example:
from openai_agents import Agent, Runner
# Manager Agent
manager = Agent(
name="Project Manager",
instructions="""
You are a technical project manager for building web applications.
Your responsibilities:
1. Analyze user requirements
2. Break down into tasks
3. Delegate to specialist agents
4. Coordinate between teams
5. Ensure quality standards
6. Report progress
""",
handoffs=["Frontend Specialist", "Backend Specialist", "DevOps Specialist"]
)
# Frontend Specialist
frontend = Agent(
name="Frontend Specialist",
instructions="""
You are a React/Next.js expert.
Specialization:
- Component design
- State management (Redux/Zustand)
- Responsive UI/UX
- Performance optimization
- Accessibility (WCAG 2.1 AA)
Hand back to manager when complete.
""",
handoffs=["Project Manager"]
)
# Backend Specialist
backend = Agent(
name="Backend Specialist",
instructions="""
You are a Node.js/Python/Go backend expert.
Specialization:
- RESTful/GraphQL API design
- Database optimization (PostgreSQL/MongoDB)
- Authentication/Authorization (OAuth 2.0/JWT)
- Microservices architecture
- API security (OWASP Top 10)
Hand back to manager when complete.
""",
handoffs=["Project Manager"]
)
# DevOps Specialist
devops = Agent(
name="DevOps Specialist",
instructions="""
You are a Kubernetes/Docker/CI/CD expert.
Specialization:
- Container orchestration
- GitHub Actions/GitLab CI configuration
- Infrastructure as Code (Terraform)
- Monitoring (Prometheus/Grafana)
- Zero-downtime deployment
Hand back to manager when complete.
""",
handoffs=["Project Manager"]
)
# Execute agents
runner = Runner(agents=[manager, frontend, backend, devops])
result = runner.run(
agent=manager,
messages=[{
"role": "user",
"content": "Build an e-commerce platform with user authentication, product catalog, and payment system."
}]
)
Execution Flow:
-
Manager Analysis: Breaks requirement into 3 subtasks
- Frontend: Product listing UI, shopping cart UI
- Backend: Product API, payment processing logic
- DevOps: Kubernetes deployment configuration
-
Parallel Execution: 3 specialists work simultaneously
-
Integration: Manager consolidates all deliverables into final report
1.2 Event-Driven Orchestration
For real-time systems, a pattern that reacts to events and activates appropriate agents is effective.
Architecture Diagram:
graph LR
Event[Event Bus<br/>RabbitMQ/Kafka] --> Router[Router Agent<br/>Event Classification]
Router -->|payment_received| Payment[Payment<br/>Processor]
Router -->|order_placed| Order[Order<br/>Handler]
Router -->|support_ticket| Support[Customer<br/>Support]
Router -->|fraud_alert| Fraud[Fraud<br/>Detection]
Payment --> DB[(Database)]
Order --> DB
Support --> DB
Fraud --> DB
style Event fill:#FF6B6B,color:#fff
style Router fill:#10A37F,color:#fff
Implementation Example:
from openai_agents import Agent, Runner
import asyncio
from typing import Dict, Any
class EventDrivenOrchestrator:
def __init__(self):
# Router Agent
self.router = Agent(
name="Event Router",
instructions="""
Analyze event type and route to appropriate specialist agent.
Routing logic:
- payment_received → Payment Processor
- order_placed → Order Handler
- support_ticket → Customer Support
- fraud_alert → Fraud Detection
"""
)
# Specialist Agents
self.payment_processor = Agent(
name="Payment Processor",
instructions="Process payments and record transactions."
)
self.order_handler = Agent(
name="Order Handler",
instructions="Process orders, update inventory, and initiate shipping."
)
self.support_agent = Agent(
name="Customer Support",
instructions="Analyze support tickets and auto-respond or escalate to humans."
)
self.fraud_detector = Agent(
name="Fraud Detection",
instructions="Analyze fraudulent transaction patterns and flag/block accounts."
)
self.runner = Runner(agents=[
self.router,
self.payment_processor,
self.order_handler,
self.support_agent,
self.fraud_detector
])
async def handle_event(self, event: Dict[str, Any]):
"""Route event to appropriate agent"""
event_type = event.get("type")
# Select agent
agent_map = {
"payment_received": self.payment_processor,
"order_placed": self.order_handler,
"support_ticket": self.support_agent,
"fraud_alert": self.fraud_detector
}
target_agent = agent_map.get(event_type)
if not target_agent:
print(f"Unknown event type: {event_type}")
return
# Execute agent
result = self.runner.run(
agent=target_agent,
messages=[{
"role": "user",
"content": f"Process event: {event}"
}]
)
return result
# Usage example
orchestrator = EventDrivenOrchestrator()
# Simulated events
events = [
{"type": "payment_received", "amount": 99.99, "user_id": "user123"},
{"type": "order_placed", "order_id": "ORD-456", "items": [{"id": "ITEM-1", "qty": 2}]},
{"type": "support_ticket", "ticket_id": "TKT-789", "issue": "Delivery delay"},
{"type": "fraud_alert", "user_id": "user999", "reason": "Abnormal login pattern"}
]
# Process events
async def main():
tasks = [orchestrator.handle_event(event) for event in events]
results = await asyncio.gather(*tasks)
print(f"Processed {len(results)} events")
asyncio.run(main())
1.3 Graph-Based Workflow (LangGraph Integration)
For complex decision flows, a graph structure with conditional branching and dynamic routing is suitable.
Workflow Diagram:
graph TD
Start([User Input]) --> Classifier[Classifier<br/>Intent Analysis]
Classifier -->|Technical Question| TechSupport[Technical Support<br/>Agent]
Classifier -->|Billing Issue| Billing[Billing<br/>Agent]
Classifier -->|Unclear| Clarify[Clarification<br/>Agent]
TechSupport -->|Resolved| End([Complete])
TechSupport -->|Unresolved| Human[Human<br/>Escalation]
Billing -->|Approval Needed| Approval[Approval<br/>Process]
Billing -->|Auto-Process| End
Clarify --> Classifier
Approval -->|Approved| End
Approval -->|Rejected| Human
Human --> End
style Start fill:#10A37F,color:#fff
style Classifier fill:#4A90E2,color:#fff
style End fill:#10A37F,color:#fff
Implementation Example (using LangGraph):
from langgraph.graph import Graph, StateGraph
from openai_agents import Agent, Runner
from typing import TypedDict, Literal
class WorkflowState(TypedDict):
user_input: str
intent: str
resolved: bool
escalated: bool
response: str
# Agent definitions
classifier = Agent(
name="Intent Classifier",
instructions="Classify user intent into 'technical', 'billing', or 'unclear'."
)
tech_support = Agent(
name="Tech Support",
instructions="Troubleshoot technical issues. Be honest if you cannot resolve."
)
billing_agent = Agent(
name="Billing Agent",
instructions="Handle billing inquiries. Refunds over $100 require approval."
)
clarify_agent = Agent(
name="Clarifier",
instructions="Ask questions to clarify unclear user queries."
)
# Build workflow graph
workflow = StateGraph(WorkflowState)
def classify_intent(state: WorkflowState) -> WorkflowState:
"""Classify intent"""
runner = Runner(agents=[classifier])
result = runner.run(
agent=classifier,
messages=[{"role": "user", "content": state["user_input"]}]
)
state["intent"] = result.output # 'technical', 'billing', 'unclear'
return state
def handle_technical(state: WorkflowState) -> WorkflowState:
"""Handle technical support"""
runner = Runner(agents=[tech_support])
result = runner.run(
agent=tech_support,
messages=[{"role": "user", "content": state["user_input"]}]
)
# Escalate if contains "unknown" or "cannot resolve"
if "unknown" in result.output.lower() or "cannot resolve" in result.output.lower():
state["escalated"] = True
else:
state["resolved"] = True
state["response"] = result.output
return state
def handle_billing(state: WorkflowState) -> WorkflowState:
"""Handle billing"""
runner = Runner(agents=[billing_agent])
result = runner.run(
agent=billing_agent,
messages=[{"role": "user", "content": state["user_input"]}]
)
# Refund requests over $100 require approval
if "refund" in state["user_input"].lower() and any(
str(amt) in state["user_input"] for amt in range(101, 10000)
):
state["response"] = "Awaiting approval"
else:
state["resolved"] = True
state["response"] = result.output
return state
def handle_unclear(state: WorkflowState) -> WorkflowState:
"""Clarify unclear input"""
runner = Runner(agents=[clarify_agent])
result = runner.run(
agent=clarify_agent,
messages=[{"role": "user", "content": state["user_input"]}]
)
state["response"] = result.output
# Loop back for reclassification
return state
def route_based_on_intent(state: WorkflowState) -> Literal["technical", "billing", "unclear"]:
"""Route based on intent"""
return state["intent"]
def check_if_done(state: WorkflowState) -> Literal["end", "escalate"]:
"""Check if done or needs escalation"""
if state.get("escalated"):
return "escalate"
elif state.get("resolved"):
return "end"
else:
return "end" # Default
# Add graph nodes
workflow.add_node("classify", classify_intent)
workflow.add_node("technical", handle_technical)
workflow.add_node("billing", handle_billing)
workflow.add_node("unclear", handle_unclear)
# Define edges
workflow.set_entry_point("classify")
workflow.add_conditional_edges(
"classify",
route_based_on_intent,
{
"technical": "technical",
"billing": "billing",
"unclear": "unclear"
}
)
workflow.add_conditional_edges("technical", check_if_done)
workflow.add_conditional_edges("billing", check_if_done)
workflow.add_edge("unclear", "classify") # Loop back
# Compile graph
app = workflow.compile()
# Execute
result = app.invoke({
"user_input": "I can't log in. I reset my password but still can't access my account.",
"intent": "",
"resolved": False,
"escalated": False,
"response": ""
})
print(f"Final result: {result['response']}")
print(f"Resolved: {result['resolved']}, Escalated: {result['escalated']}")
2. Custom MCP Server Development
AgentKit’s true power lies in building custom tools through the MCP protocol. Let’s see how to integrate with internal enterprise systems (Slack, JIRA, internal databases, etc.).
2.1 Slack Integration MCP Server
Use Case: Enable agents to send Slack messages, read channel history, and add reactions.
Implementation:
from mcp import Server, Tool, Context
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
import os
from typing import Dict, Any, List
class SlackMCPServer(Server):
def __init__(self):
super().__init__(
name="slack",
version="1.0.0",
description="Slack workspace operations"
)
self.client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
@Tool(
name="send_message",
description="Send a message to a Slack channel",
parameters={
"channel": {
"type": "string",
"description": "Channel ID or name (e.g., #general)"
},
"text": {
"type": "string",
"description": "Message text to send"
},
"thread_ts": {
"type": "string",
"description": "Optional: timestamp for thread reply",
"optional": True
}
}
)
async def send_message(
self,
channel: str,
text: str,
thread_ts: str = None,
context: Context = None
) -> Dict[str, Any]:
"""Send message to Slack"""
try:
response = self.client.chat_postMessage(
channel=channel,
text=text,
thread_ts=thread_ts
)
return {
"success": True,
"ts": response["ts"],
"channel": response["channel"]
}
except SlackApiError as e:
return {
"success": False,
"error": str(e)
}
@Tool(
name="get_channel_history",
description="Get recent message history from a channel",
parameters={
"channel": {
"type": "string",
"description": "Channel ID or name"
},
"limit": {
"type": "integer",
"description": "Number of messages to retrieve (default: 10)",
"optional": True
}
}
)
async def get_channel_history(
self,
channel: str,
limit: int = 10,
context: Context = None
) -> List[Dict[str, Any]]:
"""Get channel history"""
try:
response = self.client.conversations_history(
channel=channel,
limit=limit
)
messages = response["messages"]
# Resolve usernames
parsed_messages = []
for msg in messages:
user_id = msg.get("user")
user_info = self.client.users_info(user=user_id) if user_id else None
parsed_messages.append({
"text": msg.get("text"),
"user": user_info["user"]["real_name"] if user_info else "Unknown",
"timestamp": msg.get("ts")
})
return parsed_messages
except SlackApiError as e:
return [{"error": str(e)}]
@Tool(
name="add_reaction",
description="Add emoji reaction to a message",
parameters={
"channel": {"type": "string", "description": "Channel ID"},
"timestamp": {"type": "string", "description": "Message timestamp"},
"emoji": {"type": "string", "description": "Emoji name (e.g., thumbsup)"}
}
)
async def add_reaction(
self,
channel: str,
timestamp: str,
emoji: str,
context: Context = None
) -> Dict[str, bool]:
"""Add reaction to message"""
try:
self.client.reactions_add(
channel=channel,
timestamp=timestamp,
name=emoji
)
return {"success": True}
except SlackApiError as e:
return {"success": False, "error": str(e)}
# Launch MCP server
if __name__ == "__main__":
server = SlackMCPServer()
server.run(transport="stdio") # stdio or websocket
Using in Agent:
from openai_agents import Agent, Runner
import subprocess
import json
# Start MCP server process
mcp_process = subprocess.Popen(
["python", "slack_mcp_server.py"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# Agent with Slack tools
slack_agent = Agent(
name="Slack Assistant",
instructions="""
You are a Slack admin assistant.
Capabilities:
- Send messages
- Check channel history
- Add emoji reactions
Analyze user requests and execute appropriate Slack operations.
""",
mcp_servers=["slack"] # Connect to MCP server
)
# Execute
runner = Runner(agents=[slack_agent])
result = runner.run(
agent=slack_agent,
messages=[{
"role": "user",
"content": "Post 'Production deployment complete ✅' to #engineering channel"
}]
)
print(result.output)
2.2 MCP Server Best Practices
- Error Handling: Wrap all tool calls with
try-except - Authentication: Manage API keys via environment variables (
.envfile) - Rate Limiting: Consider API rate limits (implement backoff strategy)
- Logging: Detailed logs for debugging
- Type Safety: Validate parameters with Pydantic
- Documentation: Clear descriptions for each tool
3. Production Monitoring (Using Evals)
AgentKit’s Evals feature continuously monitors and optimizes agent performance.
3.1 A/B Testing for Prompt Optimization
Scenario: Compare two prompt versions for a customer support agent
from openai_agents import Agent, Runner
from openai_agents.evals import ABTest, Metric
import random
# Version A: Formal
agent_a = Agent(
name="Support Agent A",
instructions="""
You are a professional customer support representative.
Guidelines:
- Formal and polite language
- Detailed explanations
- Adhere to company policies
"""
)
# Version B: Casual
agent_b = Agent(
name="Support Agent B",
instructions="""
You are a friendly customer support representative.
Guidelines:
- Casual and approachable language
- Concise responses
- Show empathy
"""
)
# A/B test configuration
ab_test = ABTest(
name="Support Tone Experiment",
variants={
"formal": agent_a,
"casual": agent_b
},
traffic_split={"formal": 0.5, "casual": 0.5}, # 50/50 split
metrics=[
Metric(name="response_time", type="latency"),
Metric(name="user_satisfaction", type="rating"), # 1-5 scale
Metric(name="resolution_rate", type="binary") # Resolved/Unresolved
]
)
# Simulate user requests
test_queries = [
"My order hasn't arrived",
"I want to request a refund",
"How do I cancel my account?",
# ... 100 test cases
]
runner = Runner(agents=[agent_a, agent_b])
for query in test_queries:
# Randomly select variant (50/50)
variant = random.choice(["formal", "casual"])
agent = agent_a if variant == "formal" else agent_b
result = runner.run(
agent=agent,
messages=[{"role": "user", "content": query}]
)
# Record metrics
ab_test.record_result(
variant=variant,
query=query,
metrics={
"response_time": result.latency,
"user_satisfaction": 4.2, # Actually from user feedback
"resolution_rate": 1 if "resolved" in result.output.lower() else 0
}
)
# Analyze results
report = ab_test.analyze()
print(f"""
A/B Test Results:
Formal Version:
- Avg Response Time: {report['formal']['response_time']}s
- User Satisfaction: {report['formal']['user_satisfaction']}/5
- Resolution Rate: {report['formal']['resolution_rate']}%
Casual Version:
- Avg Response Time: {report['casual']['response_time']}s
- User Satisfaction: {report['casual']['user_satisfaction']}/5
- Resolution Rate: {report['casual']['resolution_rate']}%
Winner: {report['winner']} (Confidence: {report['confidence']}%)
""")
3.2 Automatic Prompt Optimization
Evals can automatically improve prompts using machine learning:
from openai_agents.evals import PromptOptimizer
optimizer = PromptOptimizer(
agent=agent_a,
objective="user_satisfaction", # Metric to optimize
baseline_score=3.8, # Current baseline
improvement_threshold=0.2 # Minimum improvement
)
# Optimize over 100 iterations
best_prompt = optimizer.optimize(
test_dataset=test_queries,
iterations=100,
techniques=["few-shot", "chain-of-thought", "self-critique"]
)
print(f"Optimized prompt:\n{best_prompt}")
print(f"Improvement: +{optimizer.improvement} (satisfaction 3.8 → 4.0)")
4. Enterprise Security and Compliance
4.1 Multi-Tenancy Implementation
Scenario: SaaS platform where multiple customers share agents, but data is strictly isolated
from openai_agents import Agent, Runner, Session
from typing import Dict, Any
import hashlib
class TenantIsolatedAgent:
def __init__(self):
self.base_agent = Agent(
name="Multi-Tenant Assistant",
instructions="""
You are a multi-tenant SaaS assistant.
Critical security rules:
1. tenant_id is immutable
2. Never access other tenants' data
3. Always add tenant_id filter to queries
"""
)
self.runner = Runner(agents=[self.base_agent])
def create_tenant_session(self, tenant_id: str) -> Session:
"""Create tenant-specific session"""
# Namespace session ID with tenant ID
session_id = hashlib.sha256(f"{tenant_id}:session".encode()).hexdigest()
session = Session(
id=session_id,
metadata={
"tenant_id": tenant_id,
"data_scope": f"WHERE tenant_id = '{tenant_id}'", # SQL injection prevention
"access_level": "tenant_isolated"
}
)
return session
def query_with_isolation(
self,
tenant_id: str,
user_query: str
) -> Dict[str, Any]:
"""Execute tenant-isolated query"""
session = self.create_tenant_session(tenant_id)
# Inject tenant context into system message
result = self.runner.run(
agent=self.base_agent,
session=session,
messages=[
{
"role": "system",
"content": f"[SECURITY] You are currently operating in context of tenant_id='{tenant_id}'. Never access other tenant data."
},
{
"role": "user",
"content": user_query
}
]
)
return result
# Usage example
assistant = TenantIsolatedAgent()
# Tenant A
result_a = assistant.query_with_isolation(
tenant_id="tenant_abc123",
user_query="Show last month's sales report"
)
# Tenant B (completely isolated)
result_b = assistant.query_with_isolation(
tenant_id="tenant_xyz789",
user_query="Show last month's sales report"
)
# result_a and result_b return different data
4.2 GDPR/CCPA Compliance
Data Deletion Request Handling:
from openai_agents import Agent, Runner
from datetime import datetime
import json
class GDPRCompliantAgent:
def __init__(self):
self.agent = Agent(
name="GDPR Agent",
instructions="""
You are a GDPR compliance manager.
User rights:
1. Right to Access
2. Right to Erasure
3. Right to Data Portability
4. Right to Restriction of Processing
"""
)
self.audit_log = []
def handle_deletion_request(self, user_id: str) -> Dict[str, Any]:
"""Handle user data deletion request"""
runner = Runner(agents=[self.agent])
result = runner.run(
agent=self.agent,
messages=[{
"role": "user",
"content": f"Delete all personal data for User ID {user_id}. Under GDPR Article 17."
}]
)
# Record audit log
self.audit_log.append({
"timestamp": datetime.now().isoformat(),
"action": "data_deletion",
"user_id": user_id,
"status": "completed",
"regulation": "GDPR Article 17"
})
return {
"success": True,
"deleted_records": result.output,
"audit_id": len(self.audit_log)
}
def export_user_data(self, user_id: str) -> str:
"""Export user data in JSON format"""
runner = Runner(agents=[self.agent])
result = runner.run(
agent=self.agent,
messages=[{
"role": "user",
"content": f"Export all personal data for User ID {user_id} in JSON format. Under GDPR Article 20."
}]
)
# Record audit log
self.audit_log.append({
"timestamp": datetime.now().isoformat(),
"action": "data_export",
"user_id": user_id,
"regulation": "GDPR Article 20"
})
return result.output # JSON string
def get_audit_trail(self) -> str:
"""Get audit trail"""
return json.dumps(self.audit_log, indent=2)
# Usage example
gdpr_agent = GDPRCompliantAgent()
# User requests data deletion
deletion_result = gdpr_agent.handle_deletion_request(user_id="user_12345")
print(f"Deletion complete: {deletion_result['deleted_records']} records")
# User requests data export
exported_data = gdpr_agent.export_user_data(user_id="user_67890")
print(f"Exported data:\n{exported_data}")
# Check audit trail
audit_trail = gdpr_agent.get_audit_trail()
print(f"Audit log:\n{audit_trail}")
5. Real-World Case Studies
Case Study 1: SaaS Onboarding Automation
Company: ProjectManage Pro (fictional project management SaaS)
Challenge: Slow new user onboarding process (average 7 days), high churn rate (40% leave within 30 days)
AgentKit Implementation:
from openai_agents import Agent, Runner
# Onboarding Manager
onboarding_manager = Agent(
name="Onboarding Manager",
instructions="""
Onboard new users in 7 days.
Day 1: Account setup, first project creation
Day 2: Invite team members, configure permissions
Day 3: Create tasks, use Kanban board
Day 4: Integration setup (Slack/GitHub/Jira)
Day 5: Reporting features, dashboard
Day 6: Create automation workflows
Day 7: Best practices, pro tips
Hand off to next step after each completion.
""",
handoffs=["Setup Specialist", "Integration Specialist", "Automation Coach"]
)
# Specialist agents
setup_specialist = Agent(
name="Setup Specialist",
instructions="Guide account setup and project initialization."
)
integration_specialist = Agent(
name="Integration Specialist",
instructions="Help set up Slack, GitHub, Jira integrations."
)
automation_coach = Agent(
name="Automation Coach",
instructions="Teach workflow automation best practices."
)
# Execute
runner = Runner(agents=[
onboarding_manager,
setup_specialist,
integration_specialist,
automation_coach
])
result = runner.run(
agent=onboarding_manager,
messages=[{
"role": "user",
"content": "Start onboarding for new user John Doe."
}]
)
Results:
- Onboarding Time: 7 days → 2 days (70% reduction)
- 30-Day Churn Rate: 40% → 12% (28-point improvement)
- Activation Rate: 45% → 78% (73% increase)
- Customer Support Tickets: 50% reduction
Case Study 2: Data Pipeline Auto-Recovery
Company: DataFlow Analytics (big data analytics platform)
Challenge: Frequent ETL pipeline failures (5-10 times/week), manual recovery takes average 2 hours
AgentKit Implementation:
from openai_agents import Agent, Runner
import asyncio
# Incident Responder
incident_responder = Agent(
name="Incident Responder",
instructions="""
Auto-diagnose and recover data pipeline failures.
Diagnostic steps:
1. Analyze error logs
2. Identify root cause
3. Determine recovery strategy
4. Attempt auto-fix
5. Confirm success or escalate
Common issues:
- API rate limit → Backoff retry
- Data schema change → Auto-adjust schema
- Out of memory → Switch to chunk processing
- Timeout → Increase timeout settings
""",
handoffs=["Database Specialist", "API Specialist"]
)
database_specialist = Agent(
name="Database Specialist",
instructions="Handle database issues (connection, query optimization, schema)."
)
api_specialist = Agent(
name="API Specialist",
instructions="Handle external API integration issues (auth, rate limits, response format)."
)
# Monitoring loop
async def monitor_pipeline():
runner = Runner(agents=[incident_responder, database_specialist, api_specialist])
while True:
# Pipeline health check (every 5 minutes)
await asyncio.sleep(300)
# Simulate failure detection
pipeline_status = check_pipeline_health() # Fictional function
if pipeline_status["status"] == "failed":
print(f"Failure detected: {pipeline_status['error']}")
# Auto-trigger agent
result = runner.run(
agent=incident_responder,
messages=[{
"role": "user",
"content": f"Fix pipeline failure: {pipeline_status['error']}"
}]
)
if "Fixed" in result.output:
print(f"Auto-recovery successful: {result.output}")
else:
print(f"Escalate to human: {result.output}")
# Execute
asyncio.run(monitor_pipeline())
Results:
- MTTR (Mean Time To Recovery): 2 hours → 22 minutes (82% reduction)
- Auto-Recovery Success Rate: 0% → 73%
- On-Call Engineer Load: 40 hours/week → 8 hours/week (80% reduction)
- Pipeline Uptime: 92% → 99.2%
Case Study 3: DevOps Workflow Automation
Company: MicroServices Corp (microservices architecture company)
Challenge: 50+ daily deployments, manual review is bottleneck (average 3-hour deployment wait)
AgentKit Implementation:
from openai_agents import Agent, Runner
# Deploy Manager
deploy_manager = Agent(
name="Deploy Manager",
instructions="""
Auto-review pull requests and deploy.
Review criteria:
1. All tests pass
2. Code coverage 80%+
3. Security scan clean
4. No performance regression
5. No breaking changes
Approve → Staging → Canary deploy → Production
Detect issues → Auto-rollback
""",
handoffs=["Test Analyzer", "Security Scanner", "Performance Monitor"]
)
test_analyzer = Agent(
name="Test Analyzer",
instructions="Analyze test results, check coverage and pass rate."
)
security_scanner = Agent(
name="Security Scanner",
instructions="Scan for OWASP Top 10, dependency vulnerabilities."
)
performance_monitor = Agent(
name="Performance Monitor",
instructions="Monitor response time, throughput, error rate."
)
# Triggered by GitHub webhook
def on_pull_request_opened(pr_data):
runner = Runner(agents=[
deploy_manager,
test_analyzer,
security_scanner,
performance_monitor
])
result = runner.run(
agent=deploy_manager,
messages=[{
"role": "user",
"content": f"Review and deploy PR #{pr_data['number']}."
}]
)
if "Approved" in result.output:
print(f"Auto-deploy starting: {result.output}")
else:
print(f"Deploy blocked: {result.output}")
Results:
- Deployment Wait Time: 3 hours → 8 minutes (96% reduction)
- Manual Review Burden: 8 hours/day → 30 minutes/day (94% reduction)
- Deployment Frequency: 50/day → 200/day (300% increase)
- Production Incidents: 3/week → 1/month (92% reduction)
6. Performance Optimization
6.1 Parallel Processing
Execute multiple independent tasks in parallel to reduce latency:
from openai_agents import Agent, Runner
import asyncio
agents = [
Agent(name="Agent 1", instructions="Process task 1"),
Agent(name="Agent 2", instructions="Process task 2"),
Agent(name="Agent 3", instructions="Process task 3")
]
runner = Runner(agents=agents)
# Parallel execution
async def run_parallel():
tasks = [
runner.run_async(agent=agents[0], messages=[{"role": "user", "content": "Task 1"}]),
runner.run_async(agent=agents[1], messages=[{"role": "user", "content": "Task 2"}]),
runner.run_async(agent=agents[2], messages=[{"role": "user", "content": "Task 3"}])
]
results = await asyncio.gather(*tasks)
return results
results = asyncio.run(run_parallel())
# 3 seconds → 1 second (67% faster)
6.2 Streaming Responses
Use streaming for long responses to improve First Byte Time (FBT):
from openai_agents import Agent, Runner
agent = Agent(
name="Streaming Agent",
instructions="Provide detailed responses via streaming."
)
runner = Runner(agents=[agent])
# Enable streaming
for chunk in runner.run_stream(
agent=agent,
messages=[{"role": "user", "content": "Provide detailed project analysis"}]
):
print(chunk, end="", flush=True) # Real-time display
6.3 Batch Processing
Batch similar requests to reduce API costs:
from openai_agents import Agent, Runner
agent = Agent(name="Batch Processor", instructions="Process requests in batches.")
runner = Runner(agents=[agent])
# Batch process 100 requests
requests = [f"Request {i}" for i in range(100)]
results = runner.run_batch(
agent=agent,
messages=[{"role": "user", "content": req} for req in requests],
batch_size=10 # Process 10 at a time
)
# 100 API calls → 10 API calls (90% reduction)
Conclusion
In Part 2, we explored AgentKit’s advanced features:
- Enterprise Architecture Patterns: Hierarchical, event-driven, and graph-based patterns
- Custom MCP Servers: How to develop custom tools like Slack integration
- Production Monitoring: A/B testing and auto-prompt optimization with Evals
- Security and Compliance: Multi-tenancy, GDPR compliance
- Real-World Case Studies: SaaS onboarding 70% time reduction, data pipeline auto-recovery 82% MTTR reduction, DevOps manual review 94% reduction
- Performance Optimization: Parallel processing, streaming, batch processing
AgentKit is a powerful platform for building AI agent systems at any scale, from theory to practice.
Next Steps:
- OpenAI AgentKit Official Docs
- MCP Protocol Specification
- Agent Builder Beta Access
- Community Examples
Start building production-grade agents today!
Was this helpful?
Your support helps me create better content. Buy me a coffee! ☕