Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.secapi.ai/llms.txt

Use this file to discover all available pages before exploring further.

Build a Compliance Screening Agent

Compliance teams need to stay on top of regulatory actions and corporate governance events across their client universe. This tutorial builds an agent that monitors enforcement actions, financial restatements, auditor changes, and officer changes using the OMNI Datastream REST API and webhooks, then delivers structured alerts.

What you will build

  • A webhook-based compliance monitoring system
  • Screening for enforcement actions, restatements, auditor changes, and officer changes
  • Structured alert payloads with severity classification
  • A webhook handler that routes alerts by type and severity
  • A polling fallback for environments where webhooks are not available

Prerequisites

  • An Omni Datastream API key (set as OMNI_DATASTREAM_API_KEY)
  • Python 3.9+ (for the webhook handler)
  • A publicly accessible HTTPS endpoint (use ngrok for local development)
  • (Optional) Node.js 18+ for the JavaScript handler variant

Step 1 — Define your client universe

Create a configuration file that defines the companies you are monitoring and the alert routing rules.
mkdir -p compliance-screening-agent
cd compliance-screening-agent
Create config.json:
{
  "clients": [
    {
      "name": "Portfolio A",
      "tickers": ["AAPL", "MSFT", "GOOGL", "AMZN", "NVDA"],
      "alert_email": "compliance-team-a@yourfirm.com"
    },
    {
      "name": "Portfolio B",
      "tickers": ["JPM", "GS", "MS", "BAC", "WFC"],
      "alert_email": "compliance-team-b@yourfirm.com"
    }
  ],
  "severity_rules": {
    "enforcement": "critical",
    "restatement": "high",
    "auditor_change": "high",
    "officer_change": "medium"
  },
  "slack_webhook_url": "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
}

Step 2 — Register webhook endpoints

Set up webhook endpoints with the OMNI Datastream API to receive push notifications for compliance events.
# Register a webhook endpoint for compliance events
curl -X POST \
  -H "x-api-key: $OMNI_DATASTREAM_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "url": "https://your-server.com/webhooks/compliance",
    "description": "Compliance screening agent",
    "events": [
      "enforcement.new",
      "restatement.new",
      "auditor_change.new",
      "officer_change.new"
    ]
  }' \
  "https://api.secapi.ai/v1/webhook_endpoints"
Save the returned id and signing_secret for use in your handler.

Step 3 — Build the webhook handler

Create handler.py with a Flask server that receives and processes compliance events.
"""Compliance Screening Agent — webhook handler for regulatory events."""

import hashlib
import hmac
import json
import os
from datetime import datetime

import requests
from flask import Flask, request, jsonify

app = Flask(__name__)

# Configuration
SIGNING_SECRET = os.environ.get("WEBHOOK_SIGNING_SECRET", "whsec_...")
OMNI_API_KEY = os.environ["OMNI_DATASTREAM_API_KEY"]
API_BASE = "https://api.secapi.ai"

with open("config.json") as f:
    CONFIG = json.load(f)

# Build a reverse lookup: ticker -> client info
TICKER_TO_CLIENT = {}
for client in CONFIG["clients"]:
    for ticker in client["tickers"]:
        TICKER_TO_CLIENT[ticker.upper()] = client


def verify_signature(payload: bytes, signature: str) -> bool:
    """Verify the webhook signature using HMAC-SHA256."""
    expected = hmac.new(
        SIGNING_SECRET.encode(), payload, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(signature, expected)


def classify_severity(event_type: str, data: dict) -> str:
    """Classify the alert severity based on event type and content."""
    base_severity = CONFIG["severity_rules"].get(event_type, "medium")

    # Upgrade severity for specific conditions
    if event_type == "enforcement" and data.get("penalty_amount", 0) > 1_000_000:
        return "critical"
    if event_type == "restatement" and data.get("restatement_type") == "material":
        return "critical"
    if event_type == "auditor_change" and data.get("reason") == "disagreement":
        return "critical"
    if event_type == "officer_change" and data.get("role") in ("CEO", "CFO"):
        return "high"

    return base_severity


def format_alert(event_type: str, data: dict, severity: str) -> dict:
    """Format a compliance alert for Slack delivery."""
    severity_emoji = {
        "critical": "🔴",
        "high": "🟠",
        "medium": "🟡",
        "low": "🟢",
    }
    emoji = severity_emoji.get(severity, "⚪")

    ticker = data.get("ticker", "UNKNOWN")
    company = data.get("company_name", ticker)

    templates = {
        "enforcement": (
            f"{emoji} *ENFORCEMENT ACTION* — {company} ({ticker})\n"
            f"*Violation:* {data.get('violation_type', 'N/A')}\n"
            f"*Respondent:* {data.get('respondent_name', 'N/A')}\n"
            f"*Penalty:* ${data.get('penalty_amount', 0):,.0f}\n"
            f"*Date:* {data.get('date', 'N/A')}"
        ),
        "restatement": (
            f"{emoji} *FINANCIAL RESTATEMENT* — {company} ({ticker})\n"
            f"*Type:* {data.get('restatement_type', 'N/A')}\n"
            f"*Periods affected:* {data.get('periods_affected', 'N/A')}\n"
            f"*Filed:* {data.get('filed_at', 'N/A')}"
        ),
        "auditor_change": (
            f"{emoji} *AUDITOR CHANGE* — {company} ({ticker})\n"
            f"*Previous:* {data.get('previous_auditor', 'N/A')}\n"
            f"*New:* {data.get('new_auditor', 'N/A')}\n"
            f"*Reason:* {data.get('reason', 'N/A')}\n"
            f"*Effective:* {data.get('effective_date', 'N/A')}"
        ),
        "officer_change": (
            f"{emoji} *OFFICER CHANGE* — {company} ({ticker})\n"
            f"*Officer:* {data.get('officer_name', 'N/A')}\n"
            f"*Role:* {data.get('role', 'N/A')}\n"
            f"*Change:* {data.get('change_type', 'N/A')}\n"
            f"*Effective:* {data.get('effective_date', 'N/A')}"
        ),
    }

    return {
        "text": templates.get(event_type, f"{emoji} Unknown event: {event_type}"),
        "severity": severity,
        "ticker": ticker,
        "event_type": event_type,
        "timestamp": datetime.utcnow().isoformat(),
    }


def send_slack_alert(alert: dict):
    """Send a formatted alert to the Slack webhook."""
    payload = {
        "blocks": [
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": alert["text"]},
            },
            {
                "type": "context",
                "elements": [
                    {
                        "type": "mrkdwn",
                        "text": f"Severity: *{alert['severity'].upper()}* | {alert['timestamp']}",
                    }
                ],
            },
        ]
    }

    resp = requests.post(CONFIG["slack_webhook_url"], json=payload)
    resp.raise_for_status()


@app.route("/webhooks/compliance", methods=["POST"])
def handle_compliance_webhook():
    """Handle incoming compliance event webhooks."""
    # Verify signature
    signature = request.headers.get("X-Webhook-Signature", "")
    if not verify_signature(request.data, signature):
        return jsonify({"error": "Invalid signature"}), 401

    payload = request.json
    event_type = payload.get("event", "").replace(".new", "")
    data = payload.get("data", {})
    ticker = data.get("ticker", "").upper()

    # Check if this ticker is in our monitored universe
    client = TICKER_TO_CLIENT.get(ticker)
    if not client:
        return jsonify({"status": "ignored", "reason": "not in watchlist"}), 200

    # Classify and format
    severity = classify_severity(event_type, data)
    alert = format_alert(event_type, data, severity)

    # Send alert
    send_slack_alert(alert)

    # Log for audit trail
    log_entry = {
        "timestamp": datetime.utcnow().isoformat(),
        "event_type": event_type,
        "ticker": ticker,
        "severity": severity,
        "client": client["name"],
        "data": data,
    }
    with open("audit_log.jsonl", "a") as f:
        f.write(json.dumps(log_entry) + "\n")

    return jsonify({"status": "processed", "severity": severity}), 200


@app.route("/health", methods=["GET"])
def health():
    return jsonify({"status": "ok"}), 200


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

Step 4 — Build the polling fallback

For environments where webhooks are not feasible, create poller.py that checks for new events on a schedule.
"""Compliance Screening Agent — polling fallback for environments without webhooks."""

import json
import os
from datetime import datetime, timedelta

import requests

OMNI_API_KEY = os.environ["OMNI_DATASTREAM_API_KEY"]
API_BASE = "https://api.secapi.ai"
HEADERS = {"x-api-key": OMNI_API_KEY}

with open("config.json") as f:
    CONFIG = json.load(f)

ALL_TICKERS = []
for client in CONFIG["clients"]:
    ALL_TICKERS.extend(client["tickers"])
ALL_TICKERS = list(set(ALL_TICKERS))


def load_state():
    try:
        with open("poller_state.json") as f:
            return json.load(f)
    except FileNotFoundError:
        return {"last_checked": None}


def save_state(state):
    with open("poller_state.json", "w") as f:
        json.dump(state, f, indent=2)


def check_enforcement_actions(since: str) -> list[dict]:
    """Check for new enforcement actions since the given date."""
    events = []
    for ticker in ALL_TICKERS:
        resp = requests.get(
            f"{API_BASE}/v1/events/enforcement",
            headers=HEADERS,
            params={"ticker": ticker, "start_date": since, "limit": 50},
        )
        if resp.ok:
            data = resp.json().get("data", [])
            for item in data:
                item["_event_type"] = "enforcement"
            events.extend(data)
    return events


def check_restatements(since: str) -> list[dict]:
    """Check for new financial restatements since the given date."""
    events = []
    for ticker in ALL_TICKERS:
        resp = requests.get(
            f"{API_BASE}/v1/events/restatements",
            headers=HEADERS,
            params={"ticker": ticker, "start_date": since, "limit": 50},
        )
        if resp.ok:
            data = resp.json().get("data", [])
            for item in data:
                item["_event_type"] = "restatement"
            events.extend(data)
    return events


def check_auditor_changes(since: str) -> list[dict]:
    """Check for auditor changes since the given date."""
    events = []
    for ticker in ALL_TICKERS:
        resp = requests.get(
            f"{API_BASE}/v1/events/auditor-changes",
            headers=HEADERS,
            params={"ticker": ticker, "start_date": since, "limit": 50},
        )
        if resp.ok:
            data = resp.json().get("data", [])
            for item in data:
                item["_event_type"] = "auditor_change"
            events.extend(data)
    return events


def check_officer_changes(since: str) -> list[dict]:
    """Check for officer changes since the given date."""
    events = []
    for ticker in ALL_TICKERS:
        resp = requests.get(
            f"{API_BASE}/v1/events/officer-changes",
            headers=HEADERS,
            params={"ticker": ticker, "start_date": since, "limit": 50},
        )
        if resp.ok:
            data = resp.json().get("data", [])
            for item in data:
                item["_event_type"] = "officer_change"
            events.extend(data)
    return events


def run_poll():
    """Run a single polling cycle."""
    state = load_state()
    since = state.get("last_checked") or (
        datetime.utcnow() - timedelta(days=1)
    ).strftime("%Y-%m-%d")

    print(f"Polling for compliance events since {since}...")

    all_events = []
    all_events.extend(check_enforcement_actions(since))
    all_events.extend(check_restatements(since))
    all_events.extend(check_auditor_changes(since))
    all_events.extend(check_officer_changes(since))

    print(f"Found {len(all_events)} event(s).")

    for event in all_events:
        event_type = event.pop("_event_type")
        severity = CONFIG["severity_rules"].get(event_type, "medium")
        ticker = event.get("ticker", "UNKNOWN")
        print(f"  [{severity.upper()}] {event_type}: {ticker}{event.get('date', 'N/A')}")

    # Update state
    state["last_checked"] = datetime.utcnow().strftime("%Y-%m-%d")
    save_state(state)

    return all_events


if __name__ == "__main__":
    run_poll()

Step 5 — Deploy the webhook handler

Run the handler locally for testing with ngrok, then deploy to your server.

Local testing with ngrok

# Terminal 1: Start the handler
pip install flask requests
python handler.py

# Terminal 2: Expose with ngrok
ngrok http 5000
Copy the ngrok URL and update your webhook endpoint:
curl -X PATCH \
  -H "x-api-key: $OMNI_DATASTREAM_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"url": "https://your-ngrok-url.ngrok.io/webhooks/compliance"}' \
  "https://api.secapi.ai/v1/webhook_endpoints/we_abc123"

Production deployment (Docker)

Create a Dockerfile:
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 5000
CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:5000", "handler:app"]
Create requirements.txt:
flask>=3.0.0
requests>=2.31.0
gunicorn>=21.2.0
Build and run:
docker build -t compliance-agent .
docker run -d -p 5000:5000 \
  -e OMNI_DATASTREAM_API_KEY="your-api-key" \
  -e WEBHOOK_SIGNING_SECRET="whsec_..." \
  compliance-agent

Step 6 — Test with sample events

Verify the handler processes events correctly by sending a test payload.
curl -X POST \
  -H "Content-Type: application/json" \
  -H "X-Webhook-Signature: test" \
  -d '{
    "event": "enforcement.new",
    "timestamp": "2025-04-11T14:30:00Z",
    "data": {
      "ticker": "AAPL",
      "company_name": "Apple Inc",
      "violation_type": "Disclosure Violation",
      "respondent_name": "Apple Inc",
      "penalty_amount": 500000,
      "date": "2025-04-11"
    }
  }' \
  "http://localhost:5000/webhooks/compliance"

Expected output

The handler will return:
{
  "status": "processed",
  "severity": "high"
}
And your Slack channel will receive a formatted alert with the enforcement action details.

Step 7 — Set up the polling schedule

For the polling fallback, schedule it to run every 4 hours during business days.
# Crontab: every 4 hours on weekdays
0 8,12,16,20 * * 1-5 cd /path/to/compliance-screening-agent && python poller.py >> /var/log/compliance-poller.log 2>&1

Next steps

  • Add email alerts: Extend the handler to send email notifications using the alert_email from the config for critical severity events.
  • Build a compliance dashboard: Store events in a database and build a web UI that shows the compliance status for each portfolio.
  • Regulatory filing analysis: Use the /v1/intelligence/query endpoint to analyze filing content and surface potential compliance issues.
  • Integration with GRC platforms: Forward alerts to ServiceNow, Archer, or other governance, risk, and compliance platforms via their APIs.
See the Search SEC Enforcement Actions tutorial for more on the enforcement data model.