Documentation Index
Fetch the complete documentation index at: https://docs.secapi.ai/llms.txt
Use this file to discover all available pages before exploring further.
Build a Risk Factor Analysis Agent
Risk factors in 10-K filings reveal what a company’s management believes are the most significant threats to the business. When risk factors change year over year, it signals shifting concerns. This tutorial builds an agent that uses OMNI Datastream’s semantic search and filing section endpoints to compare risk factors across annual reports and identify what has changed.
What you will build
- A Python script that fetches risk factor sections from consecutive 10-K filings
- Semantic search to find risk factors matching specific themes
- A diffing engine that identifies new risks, removed risks, and modified language
- A structured markdown report of risk factor changes
Prerequisites
- An OMNI Datastream API key (set as OMNI_DATASTREAM_API_KEY)
- Python 3.10+
- Basic familiarity with SEC 10-K filings
Step 1 — Set up the project
Create the project and install dependencies.
mkdir -p risk-factor-analysis-agent
cd risk-factor-analysis-agent
Create requirements.txt:
requests>=2.31.0
python-dotenv>=1.0.0
Note: difflib, used later for text comparison, is part of the Python standard library and should not be listed in requirements.txt or installed separately.
Install dependencies:
pip install -r requirements.txt
Create a .env file:
OMNI_DATASTREAM_API_KEY=your-api-key
Step 2 — Fetch risk factor sections from 10-K filings
Create risk_analyzer.py with functions to retrieve risk factor text from annual reports.
"""Risk Factor Analysis Agent — compare risk factors across annual reports."""
import os
import sys
from datetime import datetime
from difflib import SequenceMatcher, unified_diff
import requests
from dotenv import load_dotenv
load_dotenv()
API_BASE = "https://api.secapi.ai"
API_KEY = os.environ["OMNI_DATASTREAM_API_KEY"]
HEADERS = {"x-api-key": API_KEY}
def fetch_annual_filings(ticker: str, limit: int = 5) -> list[dict]:
"""Fetch the most recent 10-K filings for a company."""
resp = requests.get(
f"{API_BASE}/v1/filings",
headers=HEADERS,
params={
"ticker": ticker,
"form_type": "10-K",
"limit": limit,
"sort": "filed_at:desc",
},
)
resp.raise_for_status()
return resp.json().get("data", [])
def fetch_risk_factors(accession_number: str) -> str:
"""Fetch the Item 1A (Risk Factors) section from a filing."""
resp = requests.get(
f"{API_BASE}/v1/filings/{accession_number}/sections/1A",
headers=HEADERS,
)
if resp.status_code == 404:
return ""
resp.raise_for_status()
data = resp.json()
return data.get("data", {}).get("text", "")
def fetch_risk_categories(ticker: str) -> list[dict]:
"""Fetch AI-classified risk categories from the latest 10-K."""
resp = requests.get(
f"{API_BASE}/v1/filings/latest/risk-categories",
headers=HEADERS,
params={"ticker": ticker},
)
if resp.status_code == 404:
return []
resp.raise_for_status()
return resp.json().get("data", [])
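The helpers above assume the API wraps results in a {"data": ...} envelope, and unwrap it with .get() so a missing or empty envelope degrades to an empty result rather than a KeyError. A minimal offline sketch of that pattern (the accession number and field names are placeholder assumptions, not real API output):

```python
# Offline sketch of the response envelope the fetch helpers unwrap.
# The API is assumed to return {"data": [...]}; .get() supplies a safe default.
sample_response = {
    "data": [
        {"accession_number": "0000000000-24-000000", "filed_at": "2024-11-01"},
    ]
}
filings = sample_response.get("data", [])
missing = {}.get("data", [])  # an empty or malformed envelope yields []

print(len(filings), missing)
```

This is why callers like main() can check len(filings) directly without first testing whether the key exists.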
Step 3 — Use semantic search to find themed risks
Add a function that searches across filings for risk factors matching a specific theme.
def search_risk_factors(query: str, ticker: str | None = None, limit: int = 10) -> list[dict]:
"""Use semantic search to find risk factor passages matching a theme.
Args:
query: Natural language description of the risk theme.
ticker: Optional ticker to scope the search.
limit: Maximum number of results.
"""
payload = {
"query": query,
"section": "1A",
"limit": limit,
}
if ticker:
payload["ticker"] = ticker
resp = requests.post(
f"{API_BASE}/v1/sections/search",
headers={**HEADERS, "Content-Type": "application/json"},
json=payload,
)
resp.raise_for_status()
return resp.json().get("data", [])
Step 4 — Build the risk factor diff engine
Add logic that splits risk factor sections into individual risk items and compares them across years.
def split_into_risks(text: str) -> list[dict]:
"""Split a risk factor section into individual risk items.
Most 10-K risk factor sections use bold headings or specific patterns
to delineate individual risks. This function handles common formats.
"""
if not text:
return []
risks = []
lines = text.split("\n")
current_title = ""
current_body = []
for line in lines:
stripped = line.strip()
if not stripped:
continue
# Detect risk headings (typically short, all-caps, or ending with a period)
is_heading = (
len(stripped) < 200
and (stripped.isupper() or stripped.endswith("."))
and len(stripped.split()) < 30
)
if is_heading and current_body:
risks.append({
"title": current_title,
"body": " ".join(current_body).strip(),
})
current_title = stripped
current_body = []
elif is_heading and not current_body:
current_title = stripped
else:
current_body.append(stripped)
# Add the last risk
if current_title or current_body:
risks.append({
"title": current_title,
"body": " ".join(current_body).strip(),
})
return risks
def match_risks(
current_risks: list[dict], previous_risks: list[dict], threshold: float = 0.6
) -> dict:
"""Match risk factors between two years using text similarity.
Returns a dict with: new_risks, removed_risks, modified_risks, unchanged_risks.
"""
result = {
"new_risks": [],
"removed_risks": [],
"modified_risks": [],
"unchanged_risks": [],
}
matched_prev = set()
for curr in current_risks:
best_score = 0
best_match = None
best_idx = -1
for idx, prev in enumerate(previous_risks):
if idx in matched_prev:
continue
# Compare titles first, then body
title_sim = SequenceMatcher(
None, curr["title"].lower(), prev["title"].lower()
).ratio()
body_sim = SequenceMatcher(
None, curr["body"][:500].lower(), prev["body"][:500].lower()
).ratio()
# Weighted score: title matters more for matching
score = title_sim * 0.4 + body_sim * 0.6
if score > best_score:
best_score = score
best_match = prev
best_idx = idx
if best_score >= threshold:
matched_prev.add(best_idx)
# Check if the content actually changed
body_sim = SequenceMatcher(
None, curr["body"], best_match["body"]
).ratio()
if body_sim >= 0.95:
result["unchanged_risks"].append(curr)
else:
result["modified_risks"].append({
"current": curr,
"previous": best_match,
"similarity": body_sim,
})
else:
result["new_risks"].append(curr)
# Any unmatched previous risks were removed
for idx, prev in enumerate(previous_risks):
if idx not in matched_prev:
result["removed_risks"].append(prev)
return result
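Before running against real filings, the parsing and matching heuristics can be sanity-checked on small invented strings. This sketch restates the heading predicate and the 0.4/0.6 weighted score inline (the sample sentences are made up for illustration):

```python
from difflib import SequenceMatcher

# The heading heuristic from split_into_risks, restated for a quick check.
def looks_like_heading(stripped: str) -> bool:
    return (
        len(stripped) < 200
        and (stripped.isupper() or stripped.endswith("."))
        and len(stripped.split()) < 30
    )

assert looks_like_heading("Our business depends on a limited number of suppliers.")
assert not looks_like_heading("which could adversely affect our results of operations")

# The weighted match score from match_risks: titles weigh 0.4, bodies 0.6.
title_sim = SequenceMatcher(
    None,
    "risks related to international trade",
    "risks related to international trade policy",
).ratio()
body_sim = SequenceMatcher(
    None,
    "tariffs and export controls could disrupt our supply chain.",
    "tariffs could disrupt our supply chain.",
).ratio()
score = title_sim * 0.4 + body_sim * 0.6
print(f"{score:.2f}")  # comfortably above the 0.6 default threshold for these samples
```

Tuning the 0.6 threshold trades precision for recall: a lower value matches more aggressively (fewer "new"/"removed" risks), while a higher value treats reworded risks as new.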
Step 5 — Generate the analysis report
Add the report generator that presents the findings in markdown.
def generate_diff_snippet(current: str, previous: str, context: int = 3) -> str:
"""Generate a unified diff snippet between two risk texts."""
curr_lines = current.split(". ")
prev_lines = previous.split(". ")
diff = list(unified_diff(
prev_lines, curr_lines,
fromfile="Previous Year", tofile="Current Year",
lineterm="",
n=context,
))
if diff:
return "```diff\n" + "\n".join(diff[:20]) + "\n```"
return "_Minor wording changes only._"
def generate_report(
ticker: str,
current_period: str,
previous_period: str,
risk_changes: dict,
categories: list[dict],
theme_results: list[dict],
) -> str:
"""Generate a markdown risk factor analysis report."""
lines = []
now = datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC")
lines.append(f"# Risk Factor Analysis: {ticker}")
lines.append("")
lines.append(f"**Current filing:** {current_period} ")
lines.append(f"**Previous filing:** {previous_period} ")
lines.append(f"**Generated:** {now}")
lines.append("")
# Summary
lines.append("## Summary")
lines.append("")
lines.append("| Category | Count |")
lines.append("|----------|-------|")
lines.append(f"| New risks | {len(risk_changes['new_risks'])} |")
lines.append(f"| Removed risks | {len(risk_changes['removed_risks'])} |")
lines.append(f"| Modified risks | {len(risk_changes['modified_risks'])} |")
lines.append(f"| Unchanged risks | {len(risk_changes['unchanged_risks'])} |")
lines.append("")
# AI-classified risk categories
if categories:
lines.append("## Risk Categories (AI-Classified)")
lines.append("")
for cat in categories:
name = cat.get("category", "Unknown")
count = cat.get("count", 0)
lines.append(f"- **{name}**: {count} risk factors")
lines.append("")
# New risks
if risk_changes["new_risks"]:
lines.append("## New Risks (not in previous filing)")
lines.append("")
for risk in risk_changes["new_risks"]:
lines.append(f"### {risk['title']}")
lines.append("")
lines.append(risk["body"][:500] + ("..." if len(risk["body"]) > 500 else ""))
lines.append("")
# Removed risks
if risk_changes["removed_risks"]:
lines.append("## Removed Risks (no longer in current filing)")
lines.append("")
for risk in risk_changes["removed_risks"]:
lines.append(f"### ~~{risk['title']}~~")
lines.append("")
lines.append(risk["body"][:300] + ("..." if len(risk["body"]) > 300 else ""))
lines.append("")
# Modified risks
if risk_changes["modified_risks"]:
lines.append("## Modified Risks")
lines.append("")
for mod in risk_changes["modified_risks"][:10]:
sim_pct = mod["similarity"] * 100
lines.append(f"### {mod['current']['title']}")
lines.append(f"*Similarity: {sim_pct:.0f}%*")
lines.append("")
lines.append(generate_diff_snippet(mod["current"]["body"], mod["previous"]["body"]))
lines.append("")
# Theme search results
if theme_results:
lines.append("## Thematic Risk Search Results")
lines.append("")
for result in theme_results:
company = result.get("company_name", "Unknown")
form = result.get("form", "10-K")
filed = result.get("filed_at", "N/A")
score = result.get("score", 0)
text = result.get("text", "")[:300]
lines.append(f"**{company}** ({form}, {filed}) — relevance: {score:.3f}")
lines.append(f"> {text}...")
lines.append("")
lines.append("---")
lines.append(
"*Data sourced from SEC EDGAR via OMNI Datastream API. "
"Risk factor analysis is automated and should be reviewed by a human analyst.*"
)
return "\n".join(lines)
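unified_diff operates on lists of lines, which is why generate_diff_snippet splits each body on sentence boundaries first: each sentence becomes one diff line. A small offline example with invented sentences:

```python
from difflib import unified_diff

# Split on sentence boundaries so each sentence diffs as one line.
previous = "We face competition. Our margins may decline.".split(". ")
current = "We face intense competition. Our margins may decline.".split(". ")

diff = list(unified_diff(
    previous, current,
    fromfile="Previous Year", tofile="Current Year",
    lineterm="",
))
print("\n".join(diff))
```

Each changed sentence shows up as a -/+ pair, which renders cleanly inside the diff fence the report emits.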
Step 6 — Wire up the main function
Add the entry point that orchestrates the full analysis.
def main():
ticker = sys.argv[1] if len(sys.argv) > 1 else "AAPL"
theme = sys.argv[2] if len(sys.argv) > 2 else None
print(f"Analyzing risk factors for {ticker}...")
# Step 1: Get recent 10-K filings
filings = fetch_annual_filings(ticker, limit=3)
if len(filings) < 2:
print(f"Need at least 2 annual filings for {ticker}. Only found {len(filings)}.")
sys.exit(1)
current_filing = filings[0]
previous_filing = filings[1]
print(f"Comparing {current_filing['filed_at']} vs {previous_filing['filed_at']}")
# Step 2: Fetch risk factor sections
print("Fetching risk factor sections...")
current_text = fetch_risk_factors(current_filing["accession_number"])
previous_text = fetch_risk_factors(previous_filing["accession_number"])
if not current_text:
print("Could not retrieve current risk factors. Exiting.")
sys.exit(1)
# Step 3: Split into individual risks
current_risks = split_into_risks(current_text)
previous_risks = split_into_risks(previous_text)
print(f"Current: {len(current_risks)} risks | Previous: {len(previous_risks)} risks")
# Step 4: Compare
risk_changes = match_risks(current_risks, previous_risks)
print(
f"New: {len(risk_changes['new_risks'])} | "
f"Removed: {len(risk_changes['removed_risks'])} | "
f"Modified: {len(risk_changes['modified_risks'])} | "
f"Unchanged: {len(risk_changes['unchanged_risks'])}"
)
# Step 5: Get AI risk categories
categories = fetch_risk_categories(ticker)
# Step 6: Run thematic search if a theme was provided
theme_results = []
if theme:
print(f"Searching for risks related to: {theme}")
theme_results = search_risk_factors(theme, ticker=ticker, limit=5)
# Step 7: Generate report
report = generate_report(
ticker,
current_filing["filed_at"],
previous_filing["filed_at"],
risk_changes,
categories,
theme_results,
)
filename = f"risk-analysis-{ticker.lower()}-{current_filing['filed_at']}.md"
with open(filename, "w") as f:
f.write(report)
print(f"\nReport saved to {filename}")
if __name__ == "__main__":
main()
Step 7 — Run the analyzer
Execute the script to analyze risk factors for any company.
# Basic analysis — compare latest two 10-K filings
python risk_analyzer.py AAPL
# With thematic search — also find risks related to a specific topic
python risk_analyzer.py NVDA "artificial intelligence regulation"
python risk_analyzer.py JPM "climate change and ESG"
Expected output
Analyzing risk factors for AAPL...
Comparing 2024-10-31 vs 2023-10-28
Fetching risk factor sections...
Current: 32 risks | Previous: 30 risks
New: 3 | Removed: 1 | Modified: 8 | Unchanged: 21
Report saved to risk-analysis-aapl-2024-10-31.md
The generated report will include sections like:
## New Risks (not in previous filing)
### Risks related to artificial intelligence
The Company is increasingly incorporating artificial intelligence
technologies into its products and services. The development and use of AI
involves significant technical, legal, and regulatory challenges...
## Removed Risks (no longer in current filing)
### ~~Risks related to COVID-19 pandemic~~
Previously disclosed risks related to the ongoing impact of COVID-19
on global supply chains and consumer demand...
## Modified Risks
### Risks related to China and international operations
*Similarity: 72%*
```diff
- The Company's operations in China face risks related to trade tensions
+ The Company's operations in China face risks related to trade tensions and
+ new regulatory requirements for data localization
```
Next steps
- Track risk evolution over time: Run the analyzer across 5+ years of filings to see how risk disclosures evolve and create a timeline visualization.
- Cross-company comparison: Compare risk factors across competitors in the same industry to identify shared risks and unique vulnerabilities.
- Automated alerts: Combine with the compliance screening agent to get notified when a company’s risk factors change significantly in a new filing.
- Quantify risk sentiment: Use the OMNI intelligence query endpoint to score the severity and likelihood of each risk factor.
See the Find Risk Factors with Semantic Search tutorial for more on the semantic search capabilities.