Human-in-the-Loop
While this library focuses on automated validation, you can implement human-in-the-loop patterns by using GuardrailContext.deps to inject approval services. This guide shows common patterns for adding human review to your AI agents.
Overview
Human-in-the-loop workflows are useful for:
- High-stakes decisions: Financial transactions, data deletion, external communications
- Compliance requirements: Audit trails, approval chains
- Quality assurance: Review before publishing, customer-facing content
- Learning: Collecting human feedback to improve guardrails
The key insight is that guardrail functions are async and can await external services, including human decisions.
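To make the "await a human decision" idea concrete, here is a minimal sketch that uses only `asyncio`. The names `wait_for_human` and `demo` are hypothetical and not part of the library. The returned dict only mirrors the `tripwire_triggered` shape used in the examples in this guide.

```python
import asyncio


async def wait_for_human(decision: asyncio.Future) -> dict:
    """Guardrail-style coroutine: suspend until a reviewer decides."""
    # While this coroutine is suspended, the event loop keeps serving other tasks.
    approved = await decision
    return {'tripwire_triggered': not approved}


async def demo(reviewer_approves: bool) -> dict:
    loop = asyncio.get_running_loop()
    decision = loop.create_future()
    # Simulate a reviewer responding shortly afterwards (e.g. a button click).
    loop.call_later(0.01, decision.set_result, reviewer_approves)
    return await wait_for_human(decision)


if __name__ == '__main__':
    print(asyncio.run(demo(True)))
    print(asyncio.run(demo(False)))
```

In a real deployment the future would be resolved by whatever receives the reviewer's response, such as a webhook handler, rather than by a timer.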
Basic Pattern
- Create an approval service

    class ApprovalService:
        """Interface to your approval system (Slack, email, web UI, etc.)"""

        async def request_approval(
            self,
            content: str,
            reason: str,
            metadata: dict | None = None,
        ) -> bool:
            """Send approval request and wait for response.

            Returns True if approved, False if rejected.
            """
            # Implement your approval flow here
            # Could be Slack buttons, email links, web UI, etc.
            ...

- Write a guardrail that requests approval

    from pydantic_ai_guardrails import GuardrailContext, GuardrailResult

    async def require_human_approval(
        ctx: GuardrailContext,
        prompt: str,
    ) -> GuardrailResult:
        """Request human approval for sensitive operations."""
        approval_service = ctx.deps['approval_service']

        # Check if this prompt needs review
        # (needs_review is your own predicate; it is not provided by the library)
        if needs_review(prompt):
            approved = await approval_service.request_approval(
                content=prompt,
                reason='Contains sensitive operation',
                metadata={'user_id': ctx.deps.get('user_id')},
            )
            if not approved:
                return {
                    'tripwire_triggered': True,
                    'message': 'Human reviewer rejected this request',
                    'severity': 'high',
                    'metadata': {'review_required': True},
                }

        return {'tripwire_triggered': False}

- Use with your agent

    from pydantic_ai_guardrails import GuardedAgent, InputGuardrail

    # `agent` is your existing agent instance
    guarded_agent = GuardedAgent(
        agent,
        input_guardrails=[InputGuardrail(require_human_approval)],
    )

    # Run inside an async function
    result = await guarded_agent.run(
        'Delete all user records from database',
        deps={
            'approval_service': ApprovalService(),
            'user_id': 'user_123',
        },
    )
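The guardrail in the steps above calls `needs_review(prompt)`, which this guide leaves to you. A minimal keyword-based sketch follows. The pattern list is a hypothetical placeholder, and a production system would likely use a classifier or policy engine instead.

```python
import re

# Hypothetical patterns for illustration only; tune these for your own domain.
SENSITIVE_PATTERNS = [
    r'\bdelete\b',
    r'\bdrop\s+table\b',
    r'\btransfer\b',
    r'\brefund\b',
]
_SENSITIVE_RE = re.compile('|'.join(SENSITIVE_PATTERNS), re.IGNORECASE)


def needs_review(prompt: str) -> bool:
    """Return True when the prompt matches any sensitive pattern."""
    return _SENSITIVE_RE.search(prompt) is not None


if __name__ == '__main__':
    print(needs_review('Delete all user records from database'))
    print(needs_review('What is the weather today?'))
```

Word boundaries keep the match from firing on substrings such as "undeleted", but keyword matching will still miss paraphrases, so treat it as a first filter rather than a complete policy.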
Slack Integration Example
A practical example using Slack for approvals:

    import asyncio
    import uuid

    from slack_sdk.web.async_client import AsyncWebClient

    class SlackApprovalService:
        def __init__(self, slack_token: str, channel: str):
            self.client = AsyncWebClient(token=slack_token)
            self.channel = channel
            self.pending_approvals: dict[str, asyncio.Future] = {}

        async def request_approval(
            self,
            content: str,
            reason: str,
            metadata: dict | None = None,
        ) -> bool:
            """Send Slack message with approve/reject buttons."""

            # Generate unique ID for this request
            request_id = str(uuid.uuid4())

            # Create future (bound to the running loop) to wait for response
            future: asyncio.Future[bool] = asyncio.get_running_loop().create_future()
            self.pending_approvals[request_id] = future

            try:
                # Send message with interactive buttons
                await self.client.chat_postMessage(
                    channel=self.channel,
                    text=f"Approval Required: {reason}",
                    blocks=[
                        {
                            "type": "section",
                            "text": {
                                "type": "mrkdwn",
                                "text": f"*Approval Required*\n{reason}\n\n```{content[:500]}```"
                            }
                        },
                        {
                            "type": "actions",
                            "block_id": request_id,
                            "elements": [
                                {
                                    "type": "button",
                                    "text": {"type": "plain_text", "text": "Approve"},
                                    "style": "primary",
                                    "action_id": "approve",
                                },
                                {
                                    "type": "button",
                                    "text": {"type": "plain_text", "text": "Reject"},
                                    "style": "danger",
                                    "action_id": "reject",
                                }
                            ]
                        }
                    ]
                )

                # Wait for response (with timeout)
                return await asyncio.wait_for(future, timeout=300)  # 5 min timeout
            except asyncio.TimeoutError:
                return False  # Default to reject on timeout
            finally:
                # Clean up even if sending the message failed
                self.pending_approvals.pop(request_id, None)

        def handle_button_click(self, request_id: str, approved: bool):
            """Called by your Slack webhook handler (on the event loop thread)."""
            future = self.pending_approvals.get(request_id)
            # Ignore unknown, timed-out, or already-answered requests
            if future is not None and not future.done():
                future.set_result(approved)

Risk-Based Routing
Not every request needs human review. Use risk assessment to route:

    async def risk_based_approval(
        ctx: GuardrailContext,
        prompt: str,
    ) -> GuardrailResult:
        """Route to human review based on risk score."""

        risk_score = await assess_risk(prompt)
        approval_service = ctx.deps['approval_service']

        if risk_score > 0.9:
            # Critical risk: always block, notify security
            await ctx.deps['security_alerts'].send(
                f"Critical risk prompt blocked: {prompt[:100]}..."
            )
            return {
                'tripwire_triggered': True,
                'message': 'Request blocked due to critical risk',
                'severity': 'critical',
            }

        elif risk_score > 0.6:
            # High risk: require human approval
            approved = await approval_service.request_approval(
                content=prompt,
                reason=f'High risk score: {risk_score:.2f}',
            )
            if not approved:
                return {
                    'tripwire_triggered': True,
                    'message': 'Human reviewer rejected high-risk request',
                    'severity': 'high',
                }

        elif risk_score > 0.3:
            # Medium risk: log for async review, but allow
            await ctx.deps['review_queue'].add(prompt, risk_score)

        # Low risk: proceed without review
        return {'tripwire_triggered': False}

    async def assess_risk(prompt: str) -> float:
        """Assess risk of a prompt (0.0 to 1.0)."""
        # Your risk assessment logic here
        # Could use ML model, keyword matching, etc.
        ...

Output Review Pattern
Require human review of outputs before they are returned to users:

    async def review_before_send(
        ctx: GuardrailContext,
        output: str,
    ) -> GuardrailResult:
        """Require human approval before sending response."""

        # Check if output type requires review
        # (classify_output is your own classifier; it is not provided by the library)
        output_type = classify_output(output)

        if output_type in ['financial_advice', 'legal_guidance', 'medical_info']:
            approval_service = ctx.deps['approval_service']

            approved = await approval_service.request_approval(
                content=output,
                reason=f'Review required for {output_type}',
                metadata={
                    'original_prompt': ctx.prompt,
                    'output_type': output_type,
                },
            )

            if not approved:
                return {
                    'tripwire_triggered': True,
                    'message': f'{output_type} response rejected by reviewer',
                    'severity': 'high',
                    'suggestion': 'Provide more general guidance without specific recommendations',
                }

        return {'tripwire_triggered': False}

Async Review Queue
For non-blocking review that doesn’t stop the user:

    from datetime import datetime, timezone

    class AsyncReviewQueue:
        """Queue responses for later human review."""

        def __init__(self, database):
            # `database` is your own async storage client exposing insert()
            self.database = database

        async def add(
            self,
            prompt: str,
            output: str,
            metadata: dict,
        ):
            """Add to review queue (non-blocking)."""
            await self.database.insert({
                'prompt': prompt,
                'output': output,
                'metadata': metadata,
                'status': 'pending',
                'created_at': datetime.now(timezone.utc),
            })

    async def log_for_review(
        ctx: GuardrailContext,
        output: str,
    ) -> GuardrailResult:
        """Log output for async review (doesn't block)."""

        review_queue = ctx.deps['review_queue']

        await review_queue.add(
            prompt=ctx.prompt,
            output=output,
            metadata={
                'user_id': ctx.deps.get('user_id'),
                'session_id': ctx.deps.get('session_id'),
            },
        )

        # Always pass - review happens async
        return {'tripwire_triggered': False}

Integration with Workflow Systems
For complex approval chains, integrate with a workflow system such as Temporal or Prefect.
Temporal
    import uuid

    from temporalio.client import Client

    # ApprovalWorkflow and ApprovalRequest are your own workflow and
    # input types, defined elsewhere in your Temporal worker code.

    class TemporalApprovalService:
        def __init__(self, client: Client):
            self.client = client

        async def request_approval(
            self,
            content: str,
            reason: str,
            metadata: dict | None = None,  # accepted for interface parity; unused here
        ) -> bool:
            # Start approval workflow
            handle = await self.client.start_workflow(
                ApprovalWorkflow.run,
                ApprovalRequest(content=content, reason=reason),
                id=f"approval-{uuid.uuid4()}",
                task_queue="approvals",
            )
            # Wait for workflow result
            return await handle.result()

Prefect
    from prefect import flow, task

    @task
    async def send_approval_request(content: str, reason: str) -> str:
        # Send notification, return request ID
        ...

    @task
    async def wait_for_approval(request_id: str, timeout: int) -> bool:
        # Poll for approval status
        ...

    @flow
    async def approval_flow(content: str, reason: str) -> bool:
        request_id = await send_approval_request(content, reason)
        return await wait_for_approval(request_id, timeout=300)

Best Practices
1. Set Reasonable Timeouts

    # Don't block forever waiting for humans
    try:
        approved = await asyncio.wait_for(
            approval_service.request_approval(...),
            timeout=300,  # 5 minutes
        )
    except asyncio.TimeoutError:
        # Default to safe behavior
        approved = False

2. Provide Context to Reviewers
    await approval_service.request_approval(
        content=prompt,
        reason='Contains delete operation',
        metadata={
            'user_id': ctx.deps['user_id'],
            'user_role': ctx.deps['user_role'],
            'session_history': ctx.messages[-5:],  # Last 5 messages
            'risk_score': risk_score,
        },
    )

3. Audit Trail
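The audit example in this section calls `ctx.deps['audit_log'].record(...)` without saying what the audit log is. A minimal in-memory sketch is shown here. `InMemoryAuditLog` is a hypothetical class, and a real audit trail would write to durable, append-only storage.

```python
import asyncio
import json
from datetime import datetime, timezone


class InMemoryAuditLog:
    """Hypothetical append-only audit log exposing an async `record` method."""

    def __init__(self) -> None:
        self.entries: list[str] = []

    async def record(self, event: dict) -> None:
        # Add a UTC timestamp unless the caller supplied one, then serialize
        # so that later mutation of `event` cannot change the stored entry.
        stamped = {'timestamp': datetime.now(timezone.utc), **event}
        self.entries.append(json.dumps(stamped, default=str, sort_keys=True))


if __name__ == '__main__':
    log = InMemoryAuditLog()
    asyncio.run(log.record({'action': 'human_review', 'approved': True}))
    print(log.entries[0])
```

Storing serialized strings rather than live dicts is a small design choice that keeps recorded decisions immutable once written.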
    from datetime import datetime, timezone

    async def audited_approval(ctx: GuardrailContext, prompt: str) -> GuardrailResult:
        result = await require_human_approval(ctx, prompt)

        # Log the decision
        await ctx.deps['audit_log'].record({
            'action': 'human_review',
            'prompt': prompt,
            'approved': not result['tripwire_triggered'],
            'reviewer': ctx.deps.get('reviewer_id'),
            'timestamp': datetime.now(timezone.utc),  # timezone-aware UTC
        })

        return result

4. Graceful Degradation
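The fallback example in this section catches `ApprovalServiceUnavailable`, which this guide does not define. One way to provide it, sketched under the assumption that your approval service wraps its delivery call, is to translate transport failures into a single exception type. `call_approval_backend` and its `send` parameter are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable


class ApprovalServiceUnavailable(Exception):
    """Hypothetical error raised when the approval backend cannot be reached."""


async def call_approval_backend(
    send: Callable[[], Awaitable[bool]],
    *,
    timeout: float = 5.0,
) -> bool:
    """Run `send()` and convert transport failures into one exception type.

    The timeout here covers reaching the backend, not how long a reviewer
    takes to decide.
    """
    try:
        return await asyncio.wait_for(send(), timeout=timeout)
    except (asyncio.TimeoutError, ConnectionError, OSError) as exc:
        raise ApprovalServiceUnavailable(str(exc) or type(exc).__name__) from exc
```

With this in place, a guardrail only needs to handle one exception type to decide whether to block by default.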
    async def approval_with_fallback(ctx: GuardrailContext, prompt: str) -> GuardrailResult:
        try:
            return await require_human_approval(ctx, prompt)
        except ApprovalServiceUnavailable:  # your own exception type
            # Fallback: block if approval service is down
            return {
                'tripwire_triggered': True,
                'message': 'Approval service unavailable, defaulting to block',
                'severity': 'high',
            }

Next Steps
- Custom Guardrails - Write complex approval logic
- Error Handling - Handle approval failures gracefully
- Logfire Integration - Trace approval workflows