Skip to content

Human-in-the-Loop

While this library focuses on automated validation, you can implement human-in-the-loop patterns by using GuardrailContext.deps to inject approval services. This guide shows common patterns for adding human review to your AI agents.

Human-in-the-loop workflows are useful for:

  • High-stakes decisions: Financial transactions, data deletion, external communications
  • Compliance requirements: Audit trails, approval chains
  • Quality assurance: Review of customer-facing content before it is published
  • Learning: Collecting human feedback to improve guardrails

The key insight is that guardrail functions are async and can await external services, including human decisions.

  1. Create an approval service

    class ApprovalService:
    """Interface to your approval system (Slack, email, web UI, etc.)"""
    async def request_approval(
    self,
    content: str,
    reason: str,
    metadata: dict | None = None,
    ) -> bool:
    """
    Send approval request and wait for response.
    Returns True if approved, False if rejected.
    """
    # Implement your approval flow here
    # Could be Slack buttons, email links, web UI, etc.
    ...
  2. Write a guardrail that requests approval

    from pydantic_ai_guardrails import GuardrailContext, GuardrailResult
    async def require_human_approval(
        ctx: GuardrailContext,
        prompt: str,
    ) -> GuardrailResult:
        """Gate sensitive prompts behind a human reviewer's decision.

        Prompts that ``needs_review`` does not flag pass straight through.
        Flagged prompts are sent to ``ctx.deps['approval_service']``; a
        rejection trips the guardrail.
        """
        reviewer = ctx.deps['approval_service']

        # Nothing sensitive detected: let the prompt through.
        if not needs_review(prompt):
            return {'tripwire_triggered': False}

        request_context = {'user_id': ctx.deps.get('user_id')}
        is_approved = await reviewer.request_approval(
            content=prompt,
            reason='Contains sensitive operation',
            metadata=request_context,
        )
        if is_approved:
            return {'tripwire_triggered': False}

        return {
            'tripwire_triggered': True,
            'message': 'Human reviewer rejected this request',
            'severity': 'high',
            'metadata': {'review_required': True},
        }
  3. Use with your agent

    from pydantic_ai_guardrails import GuardedAgent, InputGuardrail
    guarded_agent = GuardedAgent(
    agent,
    input_guardrails=[InputGuardrail(require_human_approval)],
    )
    result = await guarded_agent.run(
    'Delete all user records from database',
    deps={
    'approval_service': ApprovalService(),
    'user_id': 'user_123',
    },
    )

A practical example using Slack for approvals:

import asyncio
from slack_sdk.web.async_client import AsyncWebClient
class SlackApprovalService:
def __init__(self, slack_token: str, channel: str):
self.client = AsyncWebClient(token=slack_token)
self.channel = channel
self.pending_approvals: dict[str, asyncio.Future] = {}
async def request_approval(
self,
content: str,
reason: str,
metadata: dict | None = None,
) -> bool:
"""Send Slack message with approve/reject buttons."""
# Generate unique ID for this request
request_id = str(uuid.uuid4())
# Create future to wait for response
future: asyncio.Future[bool] = asyncio.Future()
self.pending_approvals[request_id] = future
# Send message with interactive buttons
await self.client.chat_postMessage(
channel=self.channel,
text=f"Approval Required: {reason}",
blocks=[
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Approval Required*\n{reason}\n\n```{content[:500]}```"
}
},
{
"type": "actions",
"block_id": request_id,
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "Approve"},
"style": "primary",
"action_id": "approve",
},
{
"type": "button",
"text": {"type": "plain_text", "text": "Reject"},
"style": "danger",
"action_id": "reject",
}
]
}
]
)
# Wait for response (with timeout)
try:
return await asyncio.wait_for(future, timeout=300) # 5 min timeout
except asyncio.TimeoutError:
return False # Default to reject on timeout
finally:
self.pending_approvals.pop(request_id, None)
def handle_button_click(self, request_id: str, approved: bool):
"""Called by your Slack webhook handler."""
if request_id in self.pending_approvals:
self.pending_approvals[request_id].set_result(approved)

Not every request needs human review. Use a risk score to decide which requests are blocked outright, sent for approval, queued for later review, or allowed through:

async def risk_based_approval(
    ctx: GuardrailContext,
    prompt: str,
) -> GuardrailResult:
    """Route to human review based on risk score.

    Tiers, using the score returned by ``assess_risk``:

    * above 0.9 - critical: block and notify ``ctx.deps['security_alerts']``.
    * above 0.6 - high: require approval from ``ctx.deps['approval_service']``.
    * above 0.3 - medium: allow, but add to ``ctx.deps['review_queue']``.
    * otherwise - low: allow without review.
    """
    risk_score = await assess_risk(prompt)

    if risk_score > 0.9:
        # Critical risk: always block, notify security
        await ctx.deps['security_alerts'].send(
            f"Critical risk prompt blocked: {prompt[:100]}..."
        )
        return {
            'tripwire_triggered': True,
            'message': 'Request blocked due to critical risk',
            'severity': 'critical',
        }

    if risk_score > 0.6:
        # High risk: require human approval. The service is looked up only
        # here so the other tiers do not depend on it being configured.
        approval_service = ctx.deps['approval_service']
        approved = await approval_service.request_approval(
            content=prompt,
            reason=f'High risk score: {risk_score:.2f}',
        )
        if not approved:
            return {
                'tripwire_triggered': True,
                'message': 'Human reviewer rejected high-risk request',
                'severity': 'high',
            }
    elif risk_score > 0.3:
        # Medium risk: log for async review, but allow. The call matches
        # AsyncReviewQueue.add(prompt, output, metadata); no output exists
        # yet at input-guardrail time, so it is recorded as empty.
        await ctx.deps['review_queue'].add(
            prompt=prompt,
            output='',
            metadata={'risk_score': risk_score},
        )

    # Low risk (or approved/queued): proceed
    return {'tripwire_triggered': False}
async def assess_risk(prompt: str) -> float:
    """Score the risk of a prompt on a 0.0 to 1.0 scale.

    Placeholder: supply your own risk assessment logic here, for example
    an ML model or keyword matching.
    """
    ...

Require human review of outputs before they are returned to users:

async def review_before_send(
    ctx: GuardrailContext,
    output: str,
) -> GuardrailResult:
    """Hold sensitive categories of output until a human approves them.

    Outputs that ``classify_output`` labels as financial advice, legal
    guidance, or medical information are sent to
    ``ctx.deps['approval_service']``; every other output passes unreviewed.
    """
    output_type = classify_output(output)

    # Only a few categories need a reviewer; everything else is released.
    if output_type not in ['financial_advice', 'legal_guidance', 'medical_info']:
        return {'tripwire_triggered': False}

    reviewer = ctx.deps['approval_service']
    review_context = {
        'original_prompt': ctx.prompt,
        'output_type': output_type,
    }
    is_approved = await reviewer.request_approval(
        content=output,
        reason=f'Review required for {output_type}',
        metadata=review_context,
    )
    if is_approved:
        return {'tripwire_triggered': False}

    return {
        'tripwire_triggered': True,
        'message': f'{output_type} response rejected by reviewer',
        'severity': 'high',
        'suggestion': 'Provide more general guidance without specific recommendations',
    }

For non-blocking review that doesn’t stop the user:

class AsyncReviewQueue:
    """Queue responses for later human review."""

    def __init__(self, database=None):
        """Store the persistence backend.

        Args:
            database: Object exposing an awaitable ``insert(record)``
                method. It must be set before ``add`` is called.
        """
        self.database = database

    async def add(
        self,
        prompt: str,
        output: str,
        metadata: dict,
    ):
        """Add to review queue (non-blocking).

        Inserts one record with status ``'pending'`` and a timezone-aware
        UTC creation timestamp.

        Args:
            prompt: The prompt that produced the output.
            output: The output to be reviewed later.
            metadata: Extra context stored alongside the record.
        """
        from datetime import datetime, timezone  # local: snippet is self-contained

        await self.database.insert({
            'prompt': prompt,
            'output': output,
            'metadata': metadata,
            'status': 'pending',
            # Aware timestamp; datetime.utcnow() is naive and deprecated.
            'created_at': datetime.now(timezone.utc),
        })
async def log_for_review(
    ctx: GuardrailContext,
    output: str,
) -> GuardrailResult:
    """Record an output for later human review without blocking it.

    The output is added to ``ctx.deps['review_queue']`` together with the
    prompt and the user/session IDs found in ``ctx.deps``.
    """
    queue = ctx.deps['review_queue']
    audit_context = {
        'user_id': ctx.deps.get('user_id'),
        'session_id': ctx.deps.get('session_id'),
    }
    await queue.add(prompt=ctx.prompt, output=output, metadata=audit_context)

    # The guardrail never trips: review happens asynchronously.
    return {'tripwire_triggered': False}

For complex approval chains, integrate with workflow systems:

from temporalio.client import Client
class TemporalApprovalService:
    """Approval service that delegates the decision to a Temporal workflow."""

    def __init__(self, client: Client):
        self.client = client

    async def request_approval(
        self,
        content: str,
        reason: str,
        metadata: dict | None = None,
    ) -> bool:
        """Start an approval workflow and wait for its result.

        Args:
            content: The text under review.
            reason: Why approval is needed.
            metadata: Accepted so guardrails that pass ``metadata=`` can use
                this service; it is not forwarded to the workflow.

        Returns:
            The workflow's result.
        """
        import uuid  # local import keeps this snippet self-contained

        # Start approval workflow
        handle = await self.client.start_workflow(
            ApprovalWorkflow.run,
            ApprovalRequest(content=content, reason=reason),
            id=f"approval-{uuid.uuid4()}",
            task_queue="approvals",
        )
        # Wait for workflow result
        return await handle.result()
from prefect import flow, task
@task
async def send_approval_request(content: str, reason: str) -> str:
    """Send the approval notification and return the request ID.

    Placeholder: implement the notification here.
    """
    ...
@task
async def wait_for_approval(request_id: str, timeout: int) -> bool:
    """Poll for the approval status of ``request_id``.

    Placeholder: implement the polling here.
    """
    ...
@flow
async def approval_flow(content: str, reason: str) -> bool:
    """Send an approval request, then wait for the decision (timeout=300)."""
    pending_id = await send_approval_request(content, reason)
    decision = await wait_for_approval(pending_id, timeout=300)
    return decision
# Don't block forever waiting for humans
try:
approved = await asyncio.wait_for(
approval_service.request_approval(...),
timeout=300, # 5 minutes
)
except asyncio.TimeoutError:
# Default to safe behavior
approved = False
await approval_service.request_approval(
content=prompt,
reason='Contains delete operation',
metadata={
'user_id': ctx.deps['user_id'],
'user_role': ctx.deps['user_role'],
'session_history': ctx.messages[-5:], # Last 5 messages
'risk_score': risk_score,
},
)
async def audited_approval(ctx: GuardrailContext, prompt: str) -> GuardrailResult:
    """Run the human-approval guardrail and record its decision.

    Delegates to ``require_human_approval`` and then writes one entry to
    ``ctx.deps['audit_log']``. The guardrail result is returned unchanged.
    """
    from datetime import datetime, timezone  # local: snippet is self-contained

    result = await require_human_approval(ctx, prompt)

    # Log the decision
    await ctx.deps['audit_log'].record({
        'action': 'human_review',
        'prompt': prompt,
        # The tripwire fires on rejection, so approval is its negation.
        'approved': not result['tripwire_triggered'],
        'reviewer': ctx.deps.get('reviewer_id'),
        # Aware UTC timestamp; datetime.utcnow() is naive and deprecated.
        'timestamp': datetime.now(timezone.utc),
    })
    return result
async def approval_with_fallback(ctx: GuardrailContext, prompt: str) -> GuardrailResult:
    """Run the human-approval guardrail, blocking if the service is down.

    If ``require_human_approval`` raises ``ApprovalServiceUnavailable``,
    the request is blocked rather than allowed through unreviewed.
    """
    try:
        outcome = await require_human_approval(ctx, prompt)
    except ApprovalServiceUnavailable:
        # Fallback: fail closed when no reviewer can be reached.
        return {
            'tripwire_triggered': True,
            'message': 'Approval service unavailable, defaulting to block',
            'severity': 'high',
        }
    return outcome