OpenAI AgentKit Complete Guide Part 2: Production Deployment and Advanced Patterns

OpenAI AgentKit Complete Guide Part 2: Production Deployment and Advanced Patterns

From enterprise-grade multi-agent system design to production deployment, master AgentKit implementation in real-world scenarios

Series: Master OpenAI AgentKit (2/2)

  1. OpenAI AgentKit Complete Guide Part 1: Core Concepts and Getting Started
  2. OpenAI AgentKit Complete Guide Part 2: Production Deployment and Advanced Patterns ← Current Article

In Part 1, we learned AgentKit’s basic concepts and foundational tutorials. In Part 2, we’ll explore how to build enterprise-grade systems in actual production environments.

1. Enterprise Architecture Patterns

1.1 Hierarchical Manager-Worker Pattern

In large-scale projects, a single agent cannot handle all tasks. A hierarchical structure where a manager agent formulates overall strategy and delegates work to specialized worker agents is effective.

Architecture Diagram:

graph TD
    User[User] -->|Request| Manager[Manager Agent<br/>Project Orchestrator]
    Manager -->|Delegate Task| Frontend[Frontend<br/>Specialist]
    Manager -->|Delegate Task| Backend[Backend<br/>Specialist]
    Manager -->|Delegate Task| DevOps[DevOps<br/>Specialist]

    Frontend -->|Report| Manager
    Backend -->|Report| Manager
    DevOps -->|Report| Manager

    Manager -->|Integrated Result| User

    style Manager fill:#10A37F,color:#fff
    style Frontend fill:#4A90E2,color:#fff
    style Backend fill:#E94B3C,color:#fff
    style DevOps fill:#F5A623,color:#fff

Implementation Example:

from openai_agents import Agent, Runner

# Manager Agent
manager = Agent(
    name="Project Manager",
    instructions="""
    You are a technical project manager for building web applications.

    Your responsibilities:
    1. Analyze user requirements
    2. Break down into tasks
    3. Delegate to specialist agents
    4. Coordinate between teams
    5. Ensure quality standards
    6. Report progress
    """,
    handoffs=["Frontend Specialist", "Backend Specialist", "DevOps Specialist"]
)

# Frontend Specialist
frontend = Agent(
    name="Frontend Specialist",
    instructions="""
    You are a React/Next.js expert.

    Specialization:
    - Component design
    - State management (Redux/Zustand)
    - Responsive UI/UX
    - Performance optimization
    - Accessibility (WCAG 2.1 AA)

    Hand back to manager when complete.
    """,
    handoffs=["Project Manager"]
)

# Backend Specialist
backend = Agent(
    name="Backend Specialist",
    instructions="""
    You are a Node.js/Python/Go backend expert.

    Specialization:
    - RESTful/GraphQL API design
    - Database optimization (PostgreSQL/MongoDB)
    - Authentication/Authorization (OAuth 2.0/JWT)
    - Microservices architecture
    - API security (OWASP Top 10)

    Hand back to manager when complete.
    """,
    handoffs=["Project Manager"]
)

# DevOps Specialist
devops = Agent(
    name="DevOps Specialist",
    instructions="""
    You are a Kubernetes/Docker/CI/CD expert.

    Specialization:
    - Container orchestration
    - GitHub Actions/GitLab CI configuration
    - Infrastructure as Code (Terraform)
    - Monitoring (Prometheus/Grafana)
    - Zero-downtime deployment

    Hand back to manager when complete.
    """,
    handoffs=["Project Manager"]
)

# Execute agents
runner = Runner(agents=[manager, frontend, backend, devops])
result = runner.run(
    agent=manager,
    messages=[{
        "role": "user",
        "content": "Build an e-commerce platform with user authentication, product catalog, and payment system."
    }]
)

Execution Flow:

  1. Manager Analysis: Breaks requirement into 3 subtasks

    • Frontend: Product listing UI, shopping cart UI
    • Backend: Product API, payment processing logic
    • DevOps: Kubernetes deployment configuration
  2. Parallel Execution: 3 specialists work simultaneously

  3. Integration: Manager consolidates all deliverables into final report

1.2 Event-Driven Orchestration

For real-time systems, a pattern that reacts to events and activates appropriate agents is effective.

Architecture Diagram:

graph LR
    Event[Event Bus<br/>RabbitMQ/Kafka] --> Router[Router Agent<br/>Event Classification]

    Router -->|payment_received| Payment[Payment<br/>Processor]
    Router -->|order_placed| Order[Order<br/>Handler]
    Router -->|support_ticket| Support[Customer<br/>Support]
    Router -->|fraud_alert| Fraud[Fraud<br/>Detection]

    Payment --> DB[(Database)]
    Order --> DB
    Support --> DB
    Fraud --> DB

    style Event fill:#FF6B6B,color:#fff
    style Router fill:#10A37F,color:#fff

Implementation Example:

from openai_agents import Agent, Runner
import asyncio
from typing import Dict, Any

class EventDrivenOrchestrator:
    def __init__(self):
        # Router Agent
        self.router = Agent(
            name="Event Router",
            instructions="""
            Analyze event type and route to appropriate specialist agent.

            Routing logic:
            - payment_received → Payment Processor
            - order_placed → Order Handler
            - support_ticket → Customer Support
            - fraud_alert → Fraud Detection
            """
        )

        # Specialist Agents
        self.payment_processor = Agent(
            name="Payment Processor",
            instructions="Process payments and record transactions."
        )

        self.order_handler = Agent(
            name="Order Handler",
            instructions="Process orders, update inventory, and initiate shipping."
        )

        self.support_agent = Agent(
            name="Customer Support",
            instructions="Analyze support tickets and auto-respond or escalate to humans."
        )

        self.fraud_detector = Agent(
            name="Fraud Detection",
            instructions="Analyze fraudulent transaction patterns and flag/block accounts."
        )

        self.runner = Runner(agents=[
            self.router,
            self.payment_processor,
            self.order_handler,
            self.support_agent,
            self.fraud_detector
        ])

    async def handle_event(self, event: Dict[str, Any]):
        """Route event to appropriate agent"""
        event_type = event.get("type")

        # Select agent
        agent_map = {
            "payment_received": self.payment_processor,
            "order_placed": self.order_handler,
            "support_ticket": self.support_agent,
            "fraud_alert": self.fraud_detector
        }

        target_agent = agent_map.get(event_type)
        if not target_agent:
            print(f"Unknown event type: {event_type}")
            return

        # Execute agent
        result = self.runner.run(
            agent=target_agent,
            messages=[{
                "role": "user",
                "content": f"Process event: {event}"
            }]
        )

        return result

# Usage example
orchestrator = EventDrivenOrchestrator()

# Simulated events
events = [
    {"type": "payment_received", "amount": 99.99, "user_id": "user123"},
    {"type": "order_placed", "order_id": "ORD-456", "items": [{"id": "ITEM-1", "qty": 2}]},
    {"type": "support_ticket", "ticket_id": "TKT-789", "issue": "Delivery delay"},
    {"type": "fraud_alert", "user_id": "user999", "reason": "Abnormal login pattern"}
]

# Process events
async def main():
    tasks = [orchestrator.handle_event(event) for event in events]
    results = await asyncio.gather(*tasks)
    print(f"Processed {len(results)} events")

asyncio.run(main())

1.3 Graph-Based Workflow (LangGraph Integration)

For complex decision flows, a graph structure with conditional branching and dynamic routing is suitable.

Workflow Diagram:

graph TD
    Start([User Input]) --> Classifier[Classifier<br/>Intent Analysis]

    Classifier -->|Technical Question| TechSupport[Technical Support<br/>Agent]
    Classifier -->|Billing Issue| Billing[Billing<br/>Agent]
    Classifier -->|Unclear| Clarify[Clarification<br/>Agent]

    TechSupport -->|Resolved| End([Complete])
    TechSupport -->|Unresolved| Human[Human<br/>Escalation]

    Billing -->|Approval Needed| Approval[Approval<br/>Process]
    Billing -->|Auto-Process| End

    Clarify --> Classifier

    Approval -->|Approved| End
    Approval -->|Rejected| Human
    Human --> End

    style Start fill:#10A37F,color:#fff
    style Classifier fill:#4A90E2,color:#fff
    style End fill:#10A37F,color:#fff

Implementation Example (using LangGraph):

from langgraph.graph import Graph, StateGraph
from openai_agents import Agent, Runner
from typing import TypedDict, Literal

class WorkflowState(TypedDict):
    user_input: str
    intent: str
    resolved: bool
    escalated: bool
    response: str

# Agent definitions
classifier = Agent(
    name="Intent Classifier",
    instructions="Classify user intent into 'technical', 'billing', or 'unclear'."
)

tech_support = Agent(
    name="Tech Support",
    instructions="Troubleshoot technical issues. Be honest if you cannot resolve."
)

billing_agent = Agent(
    name="Billing Agent",
    instructions="Handle billing inquiries. Refunds over $100 require approval."
)

clarify_agent = Agent(
    name="Clarifier",
    instructions="Ask questions to clarify unclear user queries."
)

# Build workflow graph
workflow = StateGraph(WorkflowState)

def classify_intent(state: WorkflowState) -> WorkflowState:
    """Classify intent"""
    runner = Runner(agents=[classifier])
    result = runner.run(
        agent=classifier,
        messages=[{"role": "user", "content": state["user_input"]}]
    )
    state["intent"] = result.output  # 'technical', 'billing', 'unclear'
    return state

def handle_technical(state: WorkflowState) -> WorkflowState:
    """Handle technical support"""
    runner = Runner(agents=[tech_support])
    result = runner.run(
        agent=tech_support,
        messages=[{"role": "user", "content": state["user_input"]}]
    )

    # Escalate if contains "unknown" or "cannot resolve"
    if "unknown" in result.output.lower() or "cannot resolve" in result.output.lower():
        state["escalated"] = True
    else:
        state["resolved"] = True

    state["response"] = result.output
    return state

def handle_billing(state: WorkflowState) -> WorkflowState:
    """Handle billing"""
    runner = Runner(agents=[billing_agent])
    result = runner.run(
        agent=billing_agent,
        messages=[{"role": "user", "content": state["user_input"]}]
    )

    # Refund requests over $100 require approval
    if "refund" in state["user_input"].lower() and any(
        str(amt) in state["user_input"] for amt in range(101, 10000)
    ):
        state["response"] = "Awaiting approval"
    else:
        state["resolved"] = True
        state["response"] = result.output

    return state

def handle_unclear(state: WorkflowState) -> WorkflowState:
    """Clarify unclear input"""
    runner = Runner(agents=[clarify_agent])
    result = runner.run(
        agent=clarify_agent,
        messages=[{"role": "user", "content": state["user_input"]}]
    )
    state["response"] = result.output
    # Loop back for reclassification
    return state

def route_based_on_intent(state: WorkflowState) -> Literal["technical", "billing", "unclear"]:
    """Route based on intent"""
    return state["intent"]

def check_if_done(state: WorkflowState) -> Literal["end", "escalate"]:
    """Check if done or needs escalation"""
    if state.get("escalated"):
        return "escalate"
    elif state.get("resolved"):
        return "end"
    else:
        return "end"  # Default

# Add graph nodes
workflow.add_node("classify", classify_intent)
workflow.add_node("technical", handle_technical)
workflow.add_node("billing", handle_billing)
workflow.add_node("unclear", handle_unclear)

# Define edges
workflow.set_entry_point("classify")
workflow.add_conditional_edges(
    "classify",
    route_based_on_intent,
    {
        "technical": "technical",
        "billing": "billing",
        "unclear": "unclear"
    }
)
workflow.add_conditional_edges("technical", check_if_done)
workflow.add_conditional_edges("billing", check_if_done)
workflow.add_edge("unclear", "classify")  # Loop back

# Compile graph
app = workflow.compile()

# Execute
result = app.invoke({
    "user_input": "I can't log in. I reset my password but still can't access my account.",
    "intent": "",
    "resolved": False,
    "escalated": False,
    "response": ""
})

print(f"Final result: {result['response']}")
print(f"Resolved: {result['resolved']}, Escalated: {result['escalated']}")

2. Custom MCP Server Development

AgentKit’s true power lies in building custom tools through the MCP protocol. Let’s see how to integrate with internal enterprise systems (Slack, JIRA, internal databases, etc.).

2.1 Slack Integration MCP Server

Use Case: Enable agents to send Slack messages, read channel history, and add reactions.

Implementation:

from mcp import Server, Tool, Context
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
import os
from typing import Dict, Any, List

class SlackMCPServer(Server):
    def __init__(self):
        super().__init__(
            name="slack",
            version="1.0.0",
            description="Slack workspace operations"
        )
        self.client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])

    @Tool(
        name="send_message",
        description="Send a message to a Slack channel",
        parameters={
            "channel": {
                "type": "string",
                "description": "Channel ID or name (e.g., #general)"
            },
            "text": {
                "type": "string",
                "description": "Message text to send"
            },
            "thread_ts": {
                "type": "string",
                "description": "Optional: timestamp for thread reply",
                "optional": True
            }
        }
    )
    async def send_message(
        self,
        channel: str,
        text: str,
        thread_ts: str = None,
        context: Context = None
    ) -> Dict[str, Any]:
        """Send message to Slack"""
        try:
            response = self.client.chat_postMessage(
                channel=channel,
                text=text,
                thread_ts=thread_ts
            )
            return {
                "success": True,
                "ts": response["ts"],
                "channel": response["channel"]
            }
        except SlackApiError as e:
            return {
                "success": False,
                "error": str(e)
            }

    @Tool(
        name="get_channel_history",
        description="Get recent message history from a channel",
        parameters={
            "channel": {
                "type": "string",
                "description": "Channel ID or name"
            },
            "limit": {
                "type": "integer",
                "description": "Number of messages to retrieve (default: 10)",
                "optional": True
            }
        }
    )
    async def get_channel_history(
        self,
        channel: str,
        limit: int = 10,
        context: Context = None
    ) -> List[Dict[str, Any]]:
        """Get channel history"""
        try:
            response = self.client.conversations_history(
                channel=channel,
                limit=limit
            )
            messages = response["messages"]

            # Resolve usernames
            parsed_messages = []
            for msg in messages:
                user_id = msg.get("user")
                user_info = self.client.users_info(user=user_id) if user_id else None
                parsed_messages.append({
                    "text": msg.get("text"),
                    "user": user_info["user"]["real_name"] if user_info else "Unknown",
                    "timestamp": msg.get("ts")
                })

            return parsed_messages
        except SlackApiError as e:
            return [{"error": str(e)}]

    @Tool(
        name="add_reaction",
        description="Add emoji reaction to a message",
        parameters={
            "channel": {"type": "string", "description": "Channel ID"},
            "timestamp": {"type": "string", "description": "Message timestamp"},
            "emoji": {"type": "string", "description": "Emoji name (e.g., thumbsup)"}
        }
    )
    async def add_reaction(
        self,
        channel: str,
        timestamp: str,
        emoji: str,
        context: Context = None
    ) -> Dict[str, bool]:
        """Add reaction to message"""
        try:
            self.client.reactions_add(
                channel=channel,
                timestamp=timestamp,
                name=emoji
            )
            return {"success": True}
        except SlackApiError as e:
            return {"success": False, "error": str(e)}

# Launch MCP server
if __name__ == "__main__":
    server = SlackMCPServer()
    server.run(transport="stdio")  # stdio or websocket

Using in Agent:

from openai_agents import Agent, Runner
import subprocess
import json

# Start MCP server process
mcp_process = subprocess.Popen(
    ["python", "slack_mcp_server.py"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE
)

# Agent with Slack tools
slack_agent = Agent(
    name="Slack Assistant",
    instructions="""
    You are a Slack admin assistant.

    Capabilities:
    - Send messages
    - Check channel history
    - Add emoji reactions

    Analyze user requests and execute appropriate Slack operations.
    """,
    mcp_servers=["slack"]  # Connect to MCP server
)

# Execute
runner = Runner(agents=[slack_agent])
result = runner.run(
    agent=slack_agent,
    messages=[{
        "role": "user",
        "content": "Post 'Production deployment complete ✅' to #engineering channel"
    }]
)

print(result.output)

2.2 MCP Server Best Practices

  1. Error Handling: Wrap all tool calls with try-except
  2. Authentication: Manage API keys via environment variables (.env file)
  3. Rate Limiting: Consider API rate limits (implement backoff strategy)
  4. Logging: Detailed logs for debugging
  5. Type Safety: Validate parameters with Pydantic
  6. Documentation: Clear descriptions for each tool

3. Production Monitoring (Using Evals)

AgentKit’s Evals feature continuously monitors and optimizes agent performance.

3.1 A/B Testing for Prompt Optimization

Scenario: Compare two prompt versions for a customer support agent

from openai_agents import Agent, Runner
from openai_agents.evals import ABTest, Metric
import random

# Version A: Formal
agent_a = Agent(
    name="Support Agent A",
    instructions="""
    You are a professional customer support representative.

    Guidelines:
    - Formal and polite language
    - Detailed explanations
    - Adhere to company policies
    """
)

# Version B: Casual
agent_b = Agent(
    name="Support Agent B",
    instructions="""
    You are a friendly customer support representative.

    Guidelines:
    - Casual and approachable language
    - Concise responses
    - Show empathy
    """
)

# A/B test configuration
ab_test = ABTest(
    name="Support Tone Experiment",
    variants={
        "formal": agent_a,
        "casual": agent_b
    },
    traffic_split={"formal": 0.5, "casual": 0.5},  # 50/50 split
    metrics=[
        Metric(name="response_time", type="latency"),
        Metric(name="user_satisfaction", type="rating"),  # 1-5 scale
        Metric(name="resolution_rate", type="binary")     # Resolved/Unresolved
    ]
)

# Simulate user requests
test_queries = [
    "My order hasn't arrived",
    "I want to request a refund",
    "How do I cancel my account?",
    # ... 100 test cases
]

runner = Runner(agents=[agent_a, agent_b])

for query in test_queries:
    # Randomly select variant (50/50)
    variant = random.choice(["formal", "casual"])
    agent = agent_a if variant == "formal" else agent_b

    result = runner.run(
        agent=agent,
        messages=[{"role": "user", "content": query}]
    )

    # Record metrics
    ab_test.record_result(
        variant=variant,
        query=query,
        metrics={
            "response_time": result.latency,
            "user_satisfaction": 4.2,  # Actually from user feedback
            "resolution_rate": 1 if "resolved" in result.output.lower() else 0
        }
    )

# Analyze results
report = ab_test.analyze()
print(f"""
A/B Test Results:

Formal Version:
- Avg Response Time: {report['formal']['response_time']}s
- User Satisfaction: {report['formal']['user_satisfaction']}/5
- Resolution Rate: {report['formal']['resolution_rate']}%

Casual Version:
- Avg Response Time: {report['casual']['response_time']}s
- User Satisfaction: {report['casual']['user_satisfaction']}/5
- Resolution Rate: {report['casual']['resolution_rate']}%

Winner: {report['winner']} (Confidence: {report['confidence']}%)
""")

3.2 Automatic Prompt Optimization

Evals can automatically improve prompts using machine learning:

from openai_agents.evals import PromptOptimizer

optimizer = PromptOptimizer(
    agent=agent_a,
    objective="user_satisfaction",  # Metric to optimize
    baseline_score=3.8,              # Current baseline
    improvement_threshold=0.2        # Minimum improvement
)

# Optimize over 100 iterations
best_prompt = optimizer.optimize(
    test_dataset=test_queries,
    iterations=100,
    techniques=["few-shot", "chain-of-thought", "self-critique"]
)

print(f"Optimized prompt:\n{best_prompt}")
print(f"Improvement: +{optimizer.improvement} (satisfaction 3.8 → 4.0)")

4. Enterprise Security and Compliance

4.1 Multi-Tenancy Implementation

Scenario: SaaS platform where multiple customers share agents, but data is strictly isolated

from openai_agents import Agent, Runner, Session
from typing import Dict, Any
import hashlib

class TenantIsolatedAgent:
    def __init__(self):
        self.base_agent = Agent(
            name="Multi-Tenant Assistant",
            instructions="""
            You are a multi-tenant SaaS assistant.

            Critical security rules:
            1. tenant_id is immutable
            2. Never access other tenants' data
            3. Always add tenant_id filter to queries
            """
        )
        self.runner = Runner(agents=[self.base_agent])

    def create_tenant_session(self, tenant_id: str) -> Session:
        """Create tenant-specific session"""
        # Namespace session ID with tenant ID
        session_id = hashlib.sha256(f"{tenant_id}:session".encode()).hexdigest()

        session = Session(
            id=session_id,
            metadata={
                "tenant_id": tenant_id,
                "data_scope": f"WHERE tenant_id = '{tenant_id}'",  # SQL injection prevention
                "access_level": "tenant_isolated"
            }
        )
        return session

    def query_with_isolation(
        self,
        tenant_id: str,
        user_query: str
    ) -> Dict[str, Any]:
        """Execute tenant-isolated query"""
        session = self.create_tenant_session(tenant_id)

        # Inject tenant context into system message
        result = self.runner.run(
            agent=self.base_agent,
            session=session,
            messages=[
                {
                    "role": "system",
                    "content": f"[SECURITY] You are currently operating in context of tenant_id='{tenant_id}'. Never access other tenant data."
                },
                {
                    "role": "user",
                    "content": user_query
                }
            ]
        )

        return result

# Usage example
assistant = TenantIsolatedAgent()

# Tenant A
result_a = assistant.query_with_isolation(
    tenant_id="tenant_abc123",
    user_query="Show last month's sales report"
)

# Tenant B (completely isolated)
result_b = assistant.query_with_isolation(
    tenant_id="tenant_xyz789",
    user_query="Show last month's sales report"
)

# result_a and result_b return different data

4.2 GDPR/CCPA Compliance

Data Deletion Request Handling:

from openai_agents import Agent, Runner
from datetime import datetime
import json

class GDPRCompliantAgent:
    def __init__(self):
        self.agent = Agent(
            name="GDPR Agent",
            instructions="""
            You are a GDPR compliance manager.

            User rights:
            1. Right to Access
            2. Right to Erasure
            3. Right to Data Portability
            4. Right to Restriction of Processing
            """
        )
        self.audit_log = []

    def handle_deletion_request(self, user_id: str) -> Dict[str, Any]:
        """Handle user data deletion request"""
        runner = Runner(agents=[self.agent])

        result = runner.run(
            agent=self.agent,
            messages=[{
                "role": "user",
                "content": f"Delete all personal data for User ID {user_id}. Under GDPR Article 17."
            }]
        )

        # Record audit log
        self.audit_log.append({
            "timestamp": datetime.now().isoformat(),
            "action": "data_deletion",
            "user_id": user_id,
            "status": "completed",
            "regulation": "GDPR Article 17"
        })

        return {
            "success": True,
            "deleted_records": result.output,
            "audit_id": len(self.audit_log)
        }

    def export_user_data(self, user_id: str) -> str:
        """Export user data in JSON format"""
        runner = Runner(agents=[self.agent])

        result = runner.run(
            agent=self.agent,
            messages=[{
                "role": "user",
                "content": f"Export all personal data for User ID {user_id} in JSON format. Under GDPR Article 20."
            }]
        )

        # Record audit log
        self.audit_log.append({
            "timestamp": datetime.now().isoformat(),
            "action": "data_export",
            "user_id": user_id,
            "regulation": "GDPR Article 20"
        })

        return result.output  # JSON string

    def get_audit_trail(self) -> str:
        """Get audit trail"""
        return json.dumps(self.audit_log, indent=2)

# Usage example
gdpr_agent = GDPRCompliantAgent()

# User requests data deletion
deletion_result = gdpr_agent.handle_deletion_request(user_id="user_12345")
print(f"Deletion complete: {deletion_result['deleted_records']} records")

# User requests data export
exported_data = gdpr_agent.export_user_data(user_id="user_67890")
print(f"Exported data:\n{exported_data}")

# Check audit trail
audit_trail = gdpr_agent.get_audit_trail()
print(f"Audit log:\n{audit_trail}")

5. Real-World Case Studies

Case Study 1: SaaS Onboarding Automation

Company: ProjectManage Pro (fictional project management SaaS)

Challenge: Slow new user onboarding process (average 7 days), high churn rate (40% leave within 30 days)

AgentKit Implementation:

from openai_agents import Agent, Runner

# Onboarding Manager
onboarding_manager = Agent(
    name="Onboarding Manager",
    instructions="""
    Onboard new users in 7 days.

    Day 1: Account setup, first project creation
    Day 2: Invite team members, configure permissions
    Day 3: Create tasks, use Kanban board
    Day 4: Integration setup (Slack/GitHub/Jira)
    Day 5: Reporting features, dashboard
    Day 6: Create automation workflows
    Day 7: Best practices, pro tips

    Hand off to next step after each completion.
    """,
    handoffs=["Setup Specialist", "Integration Specialist", "Automation Coach"]
)

# Specialist agents
setup_specialist = Agent(
    name="Setup Specialist",
    instructions="Guide account setup and project initialization."
)

integration_specialist = Agent(
    name="Integration Specialist",
    instructions="Help set up Slack, GitHub, Jira integrations."
)

automation_coach = Agent(
    name="Automation Coach",
    instructions="Teach workflow automation best practices."
)

# Execute
runner = Runner(agents=[
    onboarding_manager,
    setup_specialist,
    integration_specialist,
    automation_coach
])

result = runner.run(
    agent=onboarding_manager,
    messages=[{
        "role": "user",
        "content": "Start onboarding for new user John Doe."
    }]
)

Results:

  • Onboarding Time: 7 days → 2 days (70% reduction)
  • 30-Day Churn Rate: 40% → 12% (28-point improvement)
  • Activation Rate: 45% → 78% (73% increase)
  • Customer Support Tickets: 50% reduction

Case Study 2: Data Pipeline Auto-Recovery

Company: DataFlow Analytics (big data analytics platform)

Challenge: Frequent ETL pipeline failures (5-10 times/week), manual recovery takes average 2 hours

AgentKit Implementation:

from openai_agents import Agent, Runner
import asyncio

# Incident Responder
incident_responder = Agent(
    name="Incident Responder",
    instructions="""
    Auto-diagnose and recover data pipeline failures.

    Diagnostic steps:
    1. Analyze error logs
    2. Identify root cause
    3. Determine recovery strategy
    4. Attempt auto-fix
    5. Confirm success or escalate

    Common issues:
    - API rate limit → Backoff retry
    - Data schema change → Auto-adjust schema
    - Out of memory → Switch to chunk processing
    - Timeout → Increase timeout settings
    """,
    handoffs=["Database Specialist", "API Specialist"]
)

database_specialist = Agent(
    name="Database Specialist",
    instructions="Handle database issues (connection, query optimization, schema)."
)

api_specialist = Agent(
    name="API Specialist",
    instructions="Handle external API integration issues (auth, rate limits, response format)."
)

# Monitoring loop
async def monitor_pipeline():
    runner = Runner(agents=[incident_responder, database_specialist, api_specialist])

    while True:
        # Pipeline health check (every 5 minutes)
        await asyncio.sleep(300)

        # Simulate failure detection
        pipeline_status = check_pipeline_health()  # Fictional function

        if pipeline_status["status"] == "failed":
            print(f"Failure detected: {pipeline_status['error']}")

            # Auto-trigger agent
            result = runner.run(
                agent=incident_responder,
                messages=[{
                    "role": "user",
                    "content": f"Fix pipeline failure: {pipeline_status['error']}"
                }]
            )

            if "Fixed" in result.output:
                print(f"Auto-recovery successful: {result.output}")
            else:
                print(f"Escalate to human: {result.output}")

# Execute
asyncio.run(monitor_pipeline())

Results:

  • MTTR (Mean Time To Recovery): 2 hours → 22 minutes (82% reduction)
  • Auto-Recovery Success Rate: 0% → 73%
  • On-Call Engineer Load: 40 hours/week → 8 hours/week (80% reduction)
  • Pipeline Uptime: 92% → 99.2%

Case Study 3: DevOps Workflow Automation

Company: MicroServices Corp (microservices architecture company)

Challenge: 50+ daily deployments, manual review is bottleneck (average 3-hour deployment wait)

AgentKit Implementation:

from openai_agents import Agent, Runner

# Deploy Manager
deploy_manager = Agent(
    name="Deploy Manager",
    instructions="""
    Auto-review pull requests and deploy.

    Review criteria:
    1. All tests pass
    2. Code coverage 80%+
    3. Security scan clean
    4. No performance regression
    5. No breaking changes

    Approve → Staging → Canary deploy → Production
    Detect issues → Auto-rollback
    """,
    handoffs=["Test Analyzer", "Security Scanner", "Performance Monitor"]
)

test_analyzer = Agent(
    name="Test Analyzer",
    instructions="Analyze test results, check coverage and pass rate."
)

security_scanner = Agent(
    name="Security Scanner",
    instructions="Scan for OWASP Top 10, dependency vulnerabilities."
)

performance_monitor = Agent(
    name="Performance Monitor",
    instructions="Monitor response time, throughput, error rate."
)

# Triggered by GitHub webhook
def on_pull_request_opened(pr_data):
    runner = Runner(agents=[
        deploy_manager,
        test_analyzer,
        security_scanner,
        performance_monitor
    ])

    result = runner.run(
        agent=deploy_manager,
        messages=[{
            "role": "user",
            "content": f"Review and deploy PR #{pr_data['number']}."
        }]
    )

    if "Approved" in result.output:
        print(f"Auto-deploy starting: {result.output}")
    else:
        print(f"Deploy blocked: {result.output}")

Results:

  • Deployment Wait Time: 3 hours → 8 minutes (96% reduction)
  • Manual Review Burden: 8 hours/day → 30 minutes/day (94% reduction)
  • Deployment Frequency: 50/day → 200/day (300% increase)
  • Production Incidents: 3/week → 1/month (92% reduction)

6. Performance Optimization

6.1 Parallel Processing

Execute multiple independent tasks in parallel to reduce latency:

from openai_agents import Agent, Runner
import asyncio

agents = [
    Agent(name="Agent 1", instructions="Process task 1"),
    Agent(name="Agent 2", instructions="Process task 2"),
    Agent(name="Agent 3", instructions="Process task 3")
]

runner = Runner(agents=agents)

# Parallel execution
async def run_parallel():
    tasks = [
        runner.run_async(agent=agents[0], messages=[{"role": "user", "content": "Task 1"}]),
        runner.run_async(agent=agents[1], messages=[{"role": "user", "content": "Task 2"}]),
        runner.run_async(agent=agents[2], messages=[{"role": "user", "content": "Task 3"}])
    ]
    results = await asyncio.gather(*tasks)
    return results

results = asyncio.run(run_parallel())
# 3 seconds → 1 second (67% faster)

6.2 Streaming Responses

Use streaming for long responses to improve First Byte Time (FBT):

from openai_agents import Agent, Runner

agent = Agent(
    name="Streaming Agent",
    instructions="Provide detailed responses via streaming."
)

runner = Runner(agents=[agent])

# Enable streaming
for chunk in runner.run_stream(
    agent=agent,
    messages=[{"role": "user", "content": "Provide detailed project analysis"}]
):
    print(chunk, end="", flush=True)  # Real-time display

6.3 Batch Processing

Batch similar requests to reduce API costs:

from openai_agents import Agent, Runner

agent = Agent(name="Batch Processor", instructions="Process requests in batches.")

runner = Runner(agents=[agent])

# Batch process 100 requests
requests = [f"Request {i}" for i in range(100)]

results = runner.run_batch(
    agent=agent,
    messages=[{"role": "user", "content": req} for req in requests],
    batch_size=10  # Process 10 at a time
)

# 100 API calls → 10 API calls (90% reduction)

Conclusion

In Part 2, we explored AgentKit’s advanced features:

  1. Enterprise Architecture Patterns: Hierarchical, event-driven, and graph-based patterns
  2. Custom MCP Servers: How to develop custom tools like Slack integration
  3. Production Monitoring: A/B testing and auto-prompt optimization with Evals
  4. Security and Compliance: Multi-tenancy, GDPR compliance
  5. Real-World Case Studies: SaaS onboarding 70% time reduction, data pipeline auto-recovery 82% MTTR reduction, DevOps manual review 94% reduction
  6. Performance Optimization: Parallel processing, streaming, batch processing

AgentKit is a powerful platform for building AI agent systems at any scale, from theory to practice.

Next Steps:

Start building production-grade agents today!

Read in Other Languages

Was this helpful?

Your support helps me create better content. Buy me a coffee! ☕

About the Author

JK

Kim Jangwook

Full-Stack Developer specializing in AI/LLM

Building AI agent systems, LLM applications, and automation solutions with 10+ years of web development experience. Sharing practical insights on Claude Code, MCP, and RAG systems.