"""Compliance Screening Agent — webhook handler for regulatory events."""
import hashlib
import hmac
import json
import os
from datetime import datetime
import requests
from flask import Flask, request, jsonify
app = Flask(__name__)
# Configuration
SIGNING_SECRET = os.environ.get("WEBHOOK_SIGNING_SECRET", "whsec_...")
SECAPI_API_KEY = os.environ["SECAPI_API_KEY"]
API_BASE = "https://api.secapi.ai"
with open("config.json") as f:
CONFIG = json.load(f)
# Build a reverse lookup: ticker -> client info
TICKER_TO_CLIENT = {}
for client in CONFIG["clients"]:
for ticker in client["tickers"]:
TICKER_TO_CLIENT[ticker.upper()] = client
def verify_signature(payload: bytes, signature: str) -> bool:
"""Verify the webhook signature using HMAC-SHA256."""
expected = hmac.new(
SIGNING_SECRET.encode(), payload, hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected)
def classify_severity(event_type: str, data: dict) -> str:
"""Classify the alert severity based on event type and content."""
base_severity = CONFIG["severity_rules"].get(event_type, "medium")
# Upgrade severity for specific conditions
if event_type == "enforcement" and data.get("penalty_amount", 0) > 1_000_000:
return "critical"
if event_type == "restatement" and data.get("restatement_type") == "material":
return "critical"
if event_type == "auditor_change" and data.get("reason") == "disagreement":
return "critical"
if event_type == "officer_change" and data.get("role") in ("CEO", "CFO"):
return "high"
return base_severity
def format_alert(event_type: str, data: dict, severity: str) -> dict:
"""Format a compliance alert for Slack delivery."""
severity_emoji = {
"critical": "🔴",
"high": "🟠",
"medium": "🟡",
"low": "🟢",
}
emoji = severity_emoji.get(severity, "⚪")
ticker = data.get("ticker", "UNKNOWN")
company = data.get("company_name", ticker)
templates = {
"enforcement": (
f"{emoji} *ENFORCEMENT ACTION* — {company} ({ticker})\n"
f"*Violation:* {data.get('violation_type', 'N/A')}\n"
f"*Respondent:* {data.get('respondent_name', 'N/A')}\n"
f"*Penalty:* ${data.get('penalty_amount', 0):,.0f}\n"
f"*Date:* {data.get('date', 'N/A')}"
),
"restatement": (
f"{emoji} *FINANCIAL RESTATEMENT* — {company} ({ticker})\n"
f"*Type:* {data.get('restatement_type', 'N/A')}\n"
f"*Periods affected:* {data.get('periods_affected', 'N/A')}\n"
f"*Filed:* {data.get('filed_at', 'N/A')}"
),
"auditor_change": (
f"{emoji} *AUDITOR CHANGE* — {company} ({ticker})\n"
f"*Previous:* {data.get('previous_auditor', 'N/A')}\n"
f"*New:* {data.get('new_auditor', 'N/A')}\n"
f"*Reason:* {data.get('reason', 'N/A')}\n"
f"*Effective:* {data.get('effective_date', 'N/A')}"
),
"officer_change": (
f"{emoji} *OFFICER CHANGE* — {company} ({ticker})\n"
f"*Officer:* {data.get('officer_name', 'N/A')}\n"
f"*Role:* {data.get('role', 'N/A')}\n"
f"*Change:* {data.get('change_type', 'N/A')}\n"
f"*Effective:* {data.get('effective_date', 'N/A')}"
),
}
return {
"text": templates.get(event_type, f"{emoji} Unknown event: {event_type}"),
"severity": severity,
"ticker": ticker,
"event_type": event_type,
"timestamp": datetime.utcnow().isoformat(),
}
def send_slack_alert(alert: dict):
"""Send a formatted alert to the Slack webhook."""
payload = {
"blocks": [
{
"type": "section",
"text": {"type": "mrkdwn", "text": alert["text"]},
},
{
"type": "context",
"elements": [
{
"type": "mrkdwn",
"text": f"Severity: *{alert['severity'].upper()}* | {alert['timestamp']}",
}
],
},
]
}
resp = requests.post(CONFIG["slack_webhook_url"], json=payload)
resp.raise_for_status()
@app.route("/webhooks/compliance", methods=["POST"])
def handle_compliance_webhook():
"""Handle incoming compliance event webhooks."""
# Verify signature
signature = request.headers.get("X-Webhook-Signature", "")
if not verify_signature(request.data, signature):
return jsonify({"error": "Invalid signature"}), 401
payload = request.json
event_type = payload.get("event", "").replace(".new", "")
data = payload.get("data", {})
ticker = data.get("ticker", "").upper()
# Check if this ticker is in our monitored universe
client = TICKER_TO_CLIENT.get(ticker)
if not client:
return jsonify({"status": "ignored", "reason": "not in watchlist"}), 200
# Classify and format
severity = classify_severity(event_type, data)
alert = format_alert(event_type, data, severity)
# Send alert
send_slack_alert(alert)
# Log for audit trail
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"event_type": event_type,
"ticker": ticker,
"severity": severity,
"client": client["name"],
"data": data,
}
with open("audit_log.jsonl", "a") as f:
f.write(json.dumps(log_entry) + "\n")
return jsonify({"status": "processed", "severity": severity}), 200
@app.route("/health", methods=["GET"])
def health():
return jsonify({"status": "ok"}), 200
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)