Build a Risk Factor Analysis Agent

Risk factors in 10-K filings reveal what a company’s management believes are the most significant threats to the business. When risk factors change year over year, it signals shifting concerns. This tutorial builds an agent that uses OMNI Datastream’s semantic search and filing section endpoints to compare risk factors across annual reports and identify what has changed.

What you will build

  • A Python script that fetches risk factor sections from consecutive 10-K filings
  • Semantic search to find risk factors matching specific themes
  • A diffing engine that identifies new risks, removed risks, and modified language
  • A structured markdown report of risk factor changes

Prerequisites

  • An OMNI Datastream API key (set as OMNI_DATASTREAM_API_KEY)
  • Python 3.9+
  • Basic familiarity with SEC 10-K filings

Step 1 — Set up the project

Create the project and install dependencies.
mkdir -p risk-factor-analysis-agent
cd risk-factor-analysis-agent
Create requirements.txt:
requests>=2.31.0
python-dotenv>=1.0.0
Note: the script also uses difflib, which ships with the Python standard library; it should not be listed in requirements.txt and cannot be installed with pip. Install dependencies:
pip install -r requirements.txt
Create a .env file:
OMNI_DATASTREAM_API_KEY=your-api-key

Step 2 — Fetch risk factor sections from 10-K filings

Create risk_analyzer.py with functions to retrieve risk factor text from annual reports.
"""Risk Factor Analysis Agent — compare risk factors across annual reports."""

import os
import sys
from datetime import datetime, timezone
from difflib import SequenceMatcher, unified_diff
from typing import Optional

import requests
from dotenv import load_dotenv

load_dotenv()

API_BASE = "https://api.secapi.ai"
API_KEY = os.environ["OMNI_DATASTREAM_API_KEY"]
HEADERS = {"x-api-key": API_KEY}


def fetch_annual_filings(ticker: str, limit: int = 5) -> list[dict]:
    """Fetch the most recent 10-K filings for a company."""
    resp = requests.get(
        f"{API_BASE}/v1/filings",
        headers=HEADERS,
        params={
            "ticker": ticker,
            "form_type": "10-K",
            "limit": limit,
            "sort": "filed_at:desc",
        },
    )
    resp.raise_for_status()
    return resp.json().get("data", [])


def fetch_risk_factors(accession_number: str) -> str:
    """Fetch the Item 1A (Risk Factors) section from a filing."""
    resp = requests.get(
        f"{API_BASE}/v1/filings/{accession_number}/sections/1A",
        headers=HEADERS,
    )
    if resp.status_code == 404:
        return ""
    resp.raise_for_status()
    data = resp.json()
    return data.get("data", {}).get("text", "")


def fetch_risk_categories(ticker: str) -> list[dict]:
    """Fetch AI-classified risk categories from the latest 10-K."""
    resp = requests.get(
        f"{API_BASE}/v1/filings/latest/risk-categories",
        headers=HEADERS,
        params={"ticker": ticker},
    )
    if resp.status_code == 404:
        return []
    resp.raise_for_status()
    return resp.json().get("data", [])
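
To verify the setup before moving on, you can call the fetch helper from a REPL or scratch file. A quick check, assuming your .env holds a valid key (AAPL is just an example ticker):

```python
# Scratch check: list the two most recent 10-K filings for a ticker.
from risk_analyzer import fetch_annual_filings

for filing in fetch_annual_filings("AAPL", limit=2):
    print(filing["filed_at"], filing["accession_number"])
```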

Step 3 — Use semantic search to find themed risks

Add a function that searches across filings for risk factors matching a specific theme.
def search_risk_factors(query: str, ticker: Optional[str] = None, limit: int = 10) -> list[dict]:
    """Use semantic search to find risk factor passages matching a theme.

    Args:
        query: Natural language description of the risk theme.
        ticker: Optional ticker to scope the search.
        limit: Maximum number of results.
    """
    payload = {
        "query": query,
        "section": "1A",
        "limit": limit,
    }
    if ticker:
        payload["ticker"] = ticker

    resp = requests.post(
        f"{API_BASE}/v1/sections/search",
        headers={**HEADERS, "Content-Type": "application/json"},
        json=payload,
    )
    resp.raise_for_status()
    return resp.json().get("data", [])
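
The search endpoint can also be exercised on its own. A minimal sketch, assuming the function above is saved in risk_analyzer.py (the query text and ticker are arbitrary examples; the result fields match those consumed by the report generator in Step 5):

```python
# Scratch check: run a themed semantic search and print the top hits.
from risk_analyzer import search_risk_factors

hits = search_risk_factors("cybersecurity breaches and data privacy", ticker="MSFT", limit=3)
for hit in hits:
    print(hit.get("company_name"), hit.get("filed_at"), round(hit.get("score", 0), 3))
```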

Step 4 — Build the risk factor diff engine

Add logic that splits risk factor sections into individual risk items and compares them across years.
def split_into_risks(text: str) -> list[dict]:
    """Split a risk factor section into individual risk items.

    Most 10-K risk factor sections use bold headings or specific patterns
    to delineate individual risks. This function handles common formats.
    """
    if not text:
        return []

    risks = []
    lines = text.split("\n")
    current_title = ""
    current_body = []

    for line in lines:
        stripped = line.strip()
        if not stripped:
            continue

        # Detect risk headings (typically short lines that are all-caps or end with a period)
        is_heading = (
            len(stripped) < 200
            and (stripped.isupper() or stripped.endswith("."))
            and len(stripped.split()) < 30
        )

        if is_heading and current_body:
            risks.append({
                "title": current_title,
                "body": " ".join(current_body).strip(),
            })
            current_title = stripped
            current_body = []
        elif is_heading and not current_body:
            current_title = stripped
        else:
            current_body.append(stripped)

    # Add the last risk
    if current_title or current_body:
        risks.append({
            "title": current_title,
            "body": " ".join(current_body).strip(),
        })

    return risks


def match_risks(
    current_risks: list[dict], previous_risks: list[dict], threshold: float = 0.6
) -> dict:
    """Match risk factors between two years using text similarity.

    Returns a dict with: new_risks, removed_risks, modified_risks, unchanged_risks.
    """
    result = {
        "new_risks": [],
        "removed_risks": [],
        "modified_risks": [],
        "unchanged_risks": [],
    }

    matched_prev = set()

    for curr in current_risks:
        best_score = 0
        best_match = None
        best_idx = -1

        for idx, prev in enumerate(previous_risks):
            if idx in matched_prev:
                continue

            # Compare titles first, then body
            title_sim = SequenceMatcher(
                None, curr["title"].lower(), prev["title"].lower()
            ).ratio()
            body_sim = SequenceMatcher(
                None, curr["body"][:500].lower(), prev["body"][:500].lower()
            ).ratio()

            # Weighted score: title matters more for matching
            score = title_sim * 0.4 + body_sim * 0.6

            if score > best_score:
                best_score = score
                best_match = prev
                best_idx = idx

        if best_score >= threshold:
            matched_prev.add(best_idx)
            # Check if the content actually changed
            body_sim = SequenceMatcher(
                None, curr["body"], best_match["body"]
            ).ratio()
            if body_sim >= 0.95:
                result["unchanged_risks"].append(curr)
            else:
                result["modified_risks"].append({
                    "current": curr,
                    "previous": best_match,
                    "similarity": body_sim,
                })
        else:
            result["new_risks"].append(curr)

    # Any unmatched previous risks were removed
    for idx, prev in enumerate(previous_risks):
        if idx not in matched_prev:
            result["removed_risks"].append(prev)

    return result
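
Before wiring the matcher to live filings, you can sanity-check it on hand-written toy risks. A minimal sketch (the sample titles and bodies are invented for illustration):

```python
# Toy data: one modified risk, one removed risk, one new risk.
from risk_analyzer import match_risks

previous = [
    {"title": "Supply chain disruption.", "body": "Our suppliers are concentrated in one region."},
    {"title": "Pandemic impact.", "body": "COVID-19 may reduce consumer demand."},
]
current = [
    {"title": "Supply chain disruption.", "body": "Our suppliers are concentrated in one region and face new export controls."},
    {"title": "AI regulation.", "body": "New AI rules may restrict how we build and sell our products."},
]

changes = match_risks(current, previous)
print(len(changes["new_risks"]), "new |", len(changes["removed_risks"]), "removed |",
      len(changes["modified_risks"]), "modified")
# Expected: 1 new | 1 removed | 1 modified
```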

Step 5 — Generate the analysis report

Add the report generator that presents the findings in markdown.
def generate_diff_snippet(current: str, previous: str, context: int = 3) -> str:
    """Generate a unified diff snippet between two risk texts."""
    curr_lines = current.split(". ")
    prev_lines = previous.split(". ")
    diff = list(unified_diff(
        prev_lines, curr_lines,
        fromfile="Previous Year", tofile="Current Year",
        lineterm="",
        n=context,
    ))
    if diff:
        return "```diff\n" + "\n".join(diff[:20]) + "\n```"
    return "_Minor wording changes only._"


def generate_report(
    ticker: str,
    current_period: str,
    previous_period: str,
    risk_changes: dict,
    categories: list[dict],
    theme_results: list[dict],
) -> str:
    """Generate a markdown risk factor analysis report."""
    lines = []
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")

    lines.append(f"# Risk Factor Analysis: {ticker}")
    lines.append("")
    lines.append(f"**Current filing:** {current_period}  ")
    lines.append(f"**Previous filing:** {previous_period}  ")
    lines.append(f"**Generated:** {now}")
    lines.append("")

    # Summary
    lines.append("## Summary")
    lines.append("")
    lines.append("| Category | Count |")
    lines.append("|----------|-------|")
    lines.append(f"| New risks | {len(risk_changes['new_risks'])} |")
    lines.append(f"| Removed risks | {len(risk_changes['removed_risks'])} |")
    lines.append(f"| Modified risks | {len(risk_changes['modified_risks'])} |")
    lines.append(f"| Unchanged risks | {len(risk_changes['unchanged_risks'])} |")
    lines.append("")

    # AI-classified risk categories
    if categories:
        lines.append("## Risk Categories (AI-Classified)")
        lines.append("")
        for cat in categories:
            name = cat.get("category", "Unknown")
            count = cat.get("count", 0)
            lines.append(f"- **{name}**: {count} risk factors")
        lines.append("")

    # New risks
    if risk_changes["new_risks"]:
        lines.append("## New Risks (not in previous filing)")
        lines.append("")
        for risk in risk_changes["new_risks"]:
            lines.append(f"### {risk['title']}")
            lines.append("")
            lines.append(risk["body"][:500] + ("..." if len(risk["body"]) > 500 else ""))
            lines.append("")

    # Removed risks
    if risk_changes["removed_risks"]:
        lines.append("## Removed Risks (no longer in current filing)")
        lines.append("")
        for risk in risk_changes["removed_risks"]:
            lines.append(f"### ~~{risk['title']}~~")
            lines.append("")
            lines.append(risk["body"][:300] + ("..." if len(risk["body"]) > 300 else ""))
            lines.append("")

    # Modified risks
    if risk_changes["modified_risks"]:
        lines.append("## Modified Risks")
        lines.append("")
        for mod in risk_changes["modified_risks"][:10]:
            sim_pct = mod["similarity"] * 100
            lines.append(f"### {mod['current']['title']}")
            lines.append(f"*Similarity: {sim_pct:.0f}%*")
            lines.append("")
            lines.append(generate_diff_snippet(mod["current"]["body"], mod["previous"]["body"]))
            lines.append("")

    # Theme search results
    if theme_results:
        lines.append("## Thematic Risk Search Results")
        lines.append("")
        for result in theme_results:
            company = result.get("company_name", "Unknown")
            form = result.get("form", "10-K")
            filed = result.get("filed_at", "N/A")
            score = result.get("score", 0)
            text = result.get("text", "")[:300]
            lines.append(f"**{company}** ({form}, {filed}) — relevance: {score:.3f}")
            lines.append(f"> {text}...")
            lines.append("")

    lines.append("---")
    lines.append(
        "*Data sourced from SEC EDGAR via OMNI Datastream API. "
        "Risk factor analysis is automated and should be reviewed by a human analyst.*"
    )

    return "\n".join(lines)

Step 6 — Wire up the main function

Add the entry point that orchestrates the full analysis.
def main():
    ticker = sys.argv[1] if len(sys.argv) > 1 else "AAPL"
    theme = sys.argv[2] if len(sys.argv) > 2 else None

    print(f"Analyzing risk factors for {ticker}...")

    # Step 1: Get recent 10-K filings
    filings = fetch_annual_filings(ticker, limit=3)
    if len(filings) < 2:
        print(f"Need at least 2 annual filings for {ticker}. Only found {len(filings)}.")
        sys.exit(1)

    current_filing = filings[0]
    previous_filing = filings[1]
    print(f"Comparing {current_filing['filed_at']} vs {previous_filing['filed_at']}")

    # Step 2: Fetch risk factor sections
    print("Fetching risk factor sections...")
    current_text = fetch_risk_factors(current_filing["accession_number"])
    previous_text = fetch_risk_factors(previous_filing["accession_number"])

    if not current_text:
        print("Could not retrieve current risk factors. Exiting.")
        sys.exit(1)

    # Step 3: Split into individual risks
    current_risks = split_into_risks(current_text)
    previous_risks = split_into_risks(previous_text)
    print(f"Current: {len(current_risks)} risks | Previous: {len(previous_risks)} risks")

    # Step 4: Compare
    risk_changes = match_risks(current_risks, previous_risks)
    print(
        f"New: {len(risk_changes['new_risks'])} | "
        f"Removed: {len(risk_changes['removed_risks'])} | "
        f"Modified: {len(risk_changes['modified_risks'])} | "
        f"Unchanged: {len(risk_changes['unchanged_risks'])}"
    )

    # Step 5: Get AI risk categories
    categories = fetch_risk_categories(ticker)

    # Step 6: Run thematic search if a theme was provided
    theme_results = []
    if theme:
        print(f"Searching for risks related to: {theme}")
        theme_results = search_risk_factors(theme, ticker=ticker, limit=5)

    # Step 7: Generate report
    report = generate_report(
        ticker,
        current_filing["filed_at"],
        previous_filing["filed_at"],
        risk_changes,
        categories,
        theme_results,
    )

    filename = f"risk-analysis-{ticker.lower()}-{current_filing['filed_at']}.md"
    with open(filename, "w") as f:
        f.write(report)
    print(f"\nReport saved to {filename}")


if __name__ == "__main__":
    main()

Step 7 — Run the analyzer

Execute the script to analyze risk factors for any company.
# Basic analysis — compare latest two 10-K filings
python risk_analyzer.py AAPL

# With thematic search — also find risks related to a specific topic
python risk_analyzer.py NVDA "artificial intelligence regulation"
python risk_analyzer.py JPM "climate change and ESG"

Expected output

Analyzing risk factors for AAPL...
Comparing 2024-10-31 vs 2023-10-28
Fetching risk factor sections...
Current: 32 risks | Previous: 30 risks
New: 3 | Removed: 1 | Modified: 8 | Unchanged: 21

Report saved to risk-analysis-aapl-2024-10-31.md
The generated report will include sections like:
## New Risks (not in previous filing)

### Risks related to artificial intelligence

The Company is increasingly incorporating artificial intelligence
technologies into its products and services. The development and use of AI
involves significant technical, legal, and regulatory challenges...

## Removed Risks (no longer in current filing)

### ~~Risks related to COVID-19 pandemic~~

Previously disclosed risks related to the ongoing impact of COVID-19
on global supply chains and consumer demand...

## Modified Risks

### Risks related to China and international operations
*Similarity: 72%*

```diff
- The Company's operations in China face risks related to trade tensions
+ The Company's operations in China face risks related to trade tensions and
+ new regulatory requirements for data localization
```

Next steps

  • Track risk evolution over time: Run the analyzer across 5+ years of filings to see how risk disclosures evolve and create a timeline visualization (a minimal sketch follows this list).
  • Cross-company comparison: Compare risk factors across competitors in the same industry to identify shared risks and unique vulnerabilities.
  • Automated alerts: Combine with the compliance screening agent to get notified when a company’s risk factors change significantly in a new filing.
  • Quantify risk sentiment: Use the OMNI intelligence query endpoint to score the severity and likelihood of each risk factor.
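
As a starting point for the timeline idea, here is a minimal sketch that reuses the functions from risk_analyzer.py to diff each consecutive pair of 10-K filings (the summary fields are illustrative, not part of the API):

```python
# Sketch: summarize risk-factor churn across consecutive 10-K filings.
from risk_analyzer import (
    fetch_annual_filings,
    fetch_risk_factors,
    match_risks,
    split_into_risks,
)

def risk_timeline(ticker: str, years: int = 5) -> list[dict]:
    """Diff each consecutive pair of 10-K filings and count the changes."""
    filings = fetch_annual_filings(ticker, limit=years)
    timeline = []
    for newer, older in zip(filings, filings[1:]):
        current = split_into_risks(fetch_risk_factors(newer["accession_number"]))
        previous = split_into_risks(fetch_risk_factors(older["accession_number"]))
        changes = match_risks(current, previous)
        timeline.append({
            "period": newer["filed_at"],
            "new": len(changes["new_risks"]),
            "removed": len(changes["removed_risks"]),
            "modified": len(changes["modified_risks"]),
        })
    return timeline
```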
See the Find Risk Factors with Semantic Search tutorial for more on the semantic search capabilities.