def split_into_risks(text: str) -> list[dict]:
    """Break a risk-factor section into ``{"title", "body"}`` records.

    The text is scanned line by line; blank lines are ignored. A line counts
    as a heading when it is shorter than 200 characters, has fewer than 30
    whitespace-separated words, and is either entirely upper case or ends
    with a period. Every other line is body text for the most recent heading.

    Args:
        text: Raw text of the risk factor section.

    Returns:
        One dict per detected risk, in document order. ``title`` is the
        heading line (empty when body text precedes any heading) and ``body``
        is the body lines joined with single spaces. When several headings
        follow each other with no body between them, only the last one is
        kept as the title. Empty input yields an empty list.
    """
    if not text:
        return []

    def looks_like_heading(candidate: str) -> bool:
        # Size limits first, then the "all caps or sentence-like" test.
        if len(candidate) >= 200 or len(candidate.split()) >= 30:
            return False
        return candidate.isupper() or candidate.endswith(".")

    items = []
    heading = ""
    paragraphs = []

    for raw_line in text.split("\n"):
        content = raw_line.strip()
        if not content:
            continue

        if not looks_like_heading(content):
            paragraphs.append(content)
            continue

        # A new heading closes the risk collected so far, but only if that
        # risk already has body text; otherwise the heading is just replaced.
        if paragraphs:
            items.append({
                "title": heading,
                "body": " ".join(paragraphs).strip(),
            })
            paragraphs = []
        heading = content

    # Emit whatever is still pending once the input is exhausted.
    if heading or paragraphs:
        items.append({
            "title": heading,
            "body": " ".join(paragraphs).strip(),
        })

    return items
def _text_similarity(left: str, right: str) -> float:
    """Return a similarity ratio between 0.0 and 1.0 for two strings.

    Identical strings short-circuit to 1.0. That avoids an expensive
    comparison for unchanged text and guarantees a perfect score even when
    SequenceMatcher's automatic junk heuristic (applied to long sequences)
    would report a lower ratio for identical, highly repetitive input.
    """
    if left == right:
        return 1.0
    return SequenceMatcher(None, left, right).ratio()


def match_risks(
    current_risks: list[dict], previous_risks: list[dict], threshold: float = 0.6
) -> dict:
    """Match risk factors between two years using text similarity.

    Current risks are processed in order, and each one is paired with the
    best-scoring previous risk that has not been paired yet (greedy,
    one-to-one). The pairing score combines case-insensitive title similarity
    (weight 0.4) with similarity of the first 500 characters of the body
    (weight 0.6).

    Args:
        current_risks: Risks from the newer filing; each dict must provide
            ``"title"`` and ``"body"`` strings.
        previous_risks: Risks from the older filing, same shape.
        threshold: Minimum pairing score for two risks to be treated as the
            same risk.

    Returns:
        A dict with four lists:
        ``new_risks`` - current risks with no acceptable counterpart;
        ``removed_risks`` - previous risks that were never paired;
        ``unchanged_risks`` - paired current risks whose full body is at
        least 95% similar to the previous body;
        ``modified_risks`` - dicts with ``current``, ``previous`` and
        ``similarity`` (full-body ratio) for the remaining pairs.
    """
    result = {
        "new_risks": [],
        "removed_risks": [],
        "modified_risks": [],
        "unchanged_risks": [],
    }
    matched_prev = set()

    for curr in current_risks:
        # These depend only on the current risk, so compute them once.
        curr_title = curr["title"].lower()
        curr_body_head = curr["body"][:500].lower()

        # Start below any achievable score so the threshold alone decides
        # whether the best candidate is accepted.
        best_score = -1.0
        best_match = None
        best_idx = -1
        for idx, prev in enumerate(previous_risks):
            if idx in matched_prev:
                continue
            title_sim = _text_similarity(curr_title, prev["title"].lower())
            body_sim = _text_similarity(
                curr_body_head, prev["body"][:500].lower()
            )
            # Weighted score: the body prefix carries more weight (0.6) than
            # the title (0.4).
            score = title_sim * 0.4 + body_sim * 0.6
            if score > best_score:
                best_score = score
                best_match = prev
                best_idx = idx

        # No candidate left (or none good enough): this is a new risk. The
        # explicit None check prevents a crash when threshold <= 0 and there
        # was nothing to compare against.
        if best_match is None or best_score < threshold:
            result["new_risks"].append(curr)
            continue

        matched_prev.add(best_idx)
        # Check if the content actually changed, using the full bodies.
        body_sim = _text_similarity(curr["body"], best_match["body"])
        if body_sim >= 0.95:
            result["unchanged_risks"].append(curr)
        else:
            result["modified_risks"].append({
                "current": curr,
                "previous": best_match,
                "similarity": body_sim,
            })

    # Any unmatched previous risks were removed.
    result["removed_risks"].extend(
        prev for idx, prev in enumerate(previous_risks) if idx not in matched_prev
    )
    return result