#!/usr/bin/env python3
"""Audit which Zotero top-level items have generated AI child notes.
This script is deterministic and does not call any LLM.
It can build a local cache for fast repeated missing-note checks.
"""
from __future__ import annotations
import argparse
import json
import re
import sys
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Best effort: switch both standard streams to UTF-8. Any exception raised
# while doing so is deliberately ignored so the script can still run.
try:
    for _stream in (sys.stdout, sys.stderr):
        _stream.reconfigure(encoding="utf-8")
except Exception:
    pass

# Base URL that zotero_get() prepends to every request path.
LOCAL_ZOTERO = "http://127.0.0.1:23119/api/users/0"
def fail(message: str) -> None:
    """Print ``error: <message>`` to stderr and exit with status 1.

    Args:
        message: Human-readable description of the problem.

    Raises:
        SystemExit: Always, with exit code 1.
    """
    text = f"error: {message}"
    print(text, file=sys.stderr)
    sys.exit(1)
def zotero_get(path: str) -> Any:
    """Fetch ``LOCAL_ZOTERO + path`` and return the parsed JSON body.

    The request carries the ``Zotero-API-Version: 3`` header and uses a
    30 second timeout. The response bytes are decoded as UTF-8 with
    undecodable bytes replaced before being parsed as JSON.

    Args:
        path: Request path (including any query string) appended to
            ``LOCAL_ZOTERO``.

    Returns:
        Whatever ``json.loads`` produces for the response body.

    Raises:
        Nothing is caught here: errors from ``urlopen`` or ``json.loads``
        propagate to the caller.
    """
    url = LOCAL_ZOTERO + path
    request = urllib.request.Request(url, headers={"Zotero-API-Version": "3"})
    with urllib.request.urlopen(request, timeout=30) as reply:
        raw = reply.read()
    body = raw.decode("utf-8", errors="replace")
    return json.loads(body)
def all_top_items() -> list[dict[str, Any]]:
    """Collect every item returned by ``/items/top``, page by page.

    Pages of 100 items are requested with increasing ``start`` offsets.
    Paging stops at the first empty page or the first page shorter than
    the page size.

    Returns:
        All items from all fetched pages, in the order they were returned.
    """
    page_size = 100
    collected: list[dict[str, Any]] = []
    offset = 0
    while True:
        query = urllib.parse.urlencode({"limit": page_size, "start": offset})
        batch = zotero_get("/items/top?" + query)
        if batch:
            collected.extend(batch)
        # An empty or short page means there is nothing further to fetch.
        if not batch or len(batch) < page_size:
            return collected
        offset += page_size
def item_summary(item: dict[str, Any]) -> dict[str, Any]:
    """Reduce an item dict to the fields stored in the cache.

    Args:
        item: Item dict; ``key`` and ``version`` are read from the top level,
            the remaining fields from its ``data`` mapping (a missing or
            falsy ``data`` is treated as empty).

    Returns:
        A dict with the keys ``key``, ``title``, ``itemType``, ``version``
        and ``dateModified``; absent values are ``None``. ``version`` falls
        back to ``data["version"]`` when the top-level value is falsy.
    """
    payload = item.get("data") or {}

    version = item.get("version")
    if not version:
        version = payload.get("version")

    summary: dict[str, Any] = {"key": item.get("key")}
    for field in ("title", "itemType"):
        summary[field] = payload.get(field)
    summary["version"] = version
    summary["dateModified"] = payload.get("dateModified")
    return summary
def note_is_generated_for_parent(note_html: str, parent_key: str) -> bool:
    """Decide whether a note's HTML looks like a generated AI note.

    A note matches when any one of these holds:

    * it contains the text ``items/<parent_key>``;
    * it contains the literal text ``AI Literature Note``;
    * it contains a heading whose text includes the 📝 emoji.

    Args:
        note_html: HTML content of the note.
        parent_key: Key of the parent item the note hangs under.

    Returns:
        True if the note matches at least one marker, otherwise False.
    """
    if f"items/{parent_key}" in note_html:
        return True
    if "AI Literature Note" in note_html:
        return True
    # NOTE(review): the pattern literal was split across two lines (an
    # unterminated string) and the text before ``[^<]*`` was missing; it
    # looks like an HTML tag was dropped. ``<h1>`` is a reconstruction --
    # confirm it against the heading the note generator actually emits.
    return bool(re.search(r"<h1>[^<]*📝", note_html))
def scan_parent(parent_key: str) -> dict[str, Any]:
    """Inspect the child notes of one parent item.

    The parent's children are listed, and every child whose ``itemType`` is
    ``note`` is counted. Each counted note that has a key is fetched again
    individually, and its ``note`` HTML is checked with
    ``note_is_generated_for_parent``.

    Args:
        parent_key: Key of the parent item; URL-quoted before use.

    Returns:
        A dict with ``hasGeneratedNote`` (bool), ``generatedNoteKeys``
        (keys of matching notes, in listing order) and ``childNoteCount``
        (number of note children, including any without a key).
    """
    listing = zotero_get(f"/items/{urllib.parse.quote(parent_key)}/children")

    matched_keys: list[str] = []
    note_total = 0
    for entry in listing or []:
        entry_type = (entry.get("data") or {}).get("itemType")
        if entry_type == "note":
            note_total += 1
            entry_key = entry.get("key")
            if entry_key:
                detail = zotero_get(f"/items/{urllib.parse.quote(entry_key)}")
                html = (detail.get("data") or {}).get("note") or ""
                if note_is_generated_for_parent(html, parent_key):
                    matched_keys.append(entry_key)

    return {
        "hasGeneratedNote": len(matched_keys) > 0,
        "generatedNoteKeys": matched_keys,
        "childNoteCount": note_total,
    }
def load_cache(path: Path) -> dict[str, Any]:
    """Load the cache file, falling back to an empty cache when unusable.

    An empty cache (``{"items": {}}``) is returned when the file does not
    exist, is not valid UTF-8, is not valid JSON, or does not contain a JSON
    object at the top level. If the object's ``items`` entry is missing or
    is not itself an object, it is replaced with an empty dict so callers
    can always treat ``cache["items"]`` as a mapping.

    Args:
        path: Location of the JSON cache file.

    Returns:
        The decoded cache dict, always containing a dict under ``items``.

    Raises:
        OSError: For read failures other than a missing file (for example
            a permission error); these are not swallowed.
    """
    # Read first and handle the missing file afterwards: a separate
    # exists() check could go stale before the read happens.
    try:
        loaded = json.loads(path.read_text(encoding="utf-8"))
    except (FileNotFoundError, json.JSONDecodeError, UnicodeDecodeError):
        return {"items": {}}

    # Valid JSON is not necessarily an object (e.g. ``[]`` or ``null``).
    if not isinstance(loaded, dict):
        return {"items": {}}
    if not isinstance(loaded.get("items"), dict):
        loaded["items"] = {}
    return loaded
def save_cache(path: Path, cache: dict[str, Any]) -> None:
    """Stamp the cache with the current UTC time and write it as JSON.

    Missing parent directories of ``path`` are created first. The ``cache``
    dict is modified in place: its ``updatedAt`` entry is set to an ISO 8601
    timestamp. The file is written as UTF-8, indented by two spaces, with
    non-ASCII characters left unescaped.

    Args:
        path: Destination file.
        cache: Cache dict to persist; gains/overwrites ``updatedAt``.
    """
    folder = path.parent
    folder.mkdir(parents=True, exist_ok=True)

    now_utc = datetime.now(timezone.utc)
    cache["updatedAt"] = now_utc.isoformat()

    serialized = json.dumps(cache, ensure_ascii=False, indent=2)
    path.write_text(serialized, encoding="utf-8")
def audit(vault: Path, *, refresh: bool, rebuild: bool, limit: int | None) -> dict[str, Any]:
    """Audit top-level items for generated child notes, using a JSON cache.

    The cache lives at ``<vault>/00 Templater/.zotero-ai-notes-index.json``.
    Every listed top-level item with a key is either rescanned via
    ``scan_parent`` or, when a usable cached record exists, only has its
    summary fields refreshed. The cache is saved before the report is built.

    Args:
        vault: Directory that contains the ``00 Templater`` folder.
        refresh: Rescan every listed item even if it is cached.
        rebuild: Start from an empty cache (implies rescanning everything).
        limit: If truthy, only the first ``limit`` listed items are audited;
            ``None`` or ``0`` means no limit.

    Returns:
        A report dict with the keys ``cache`` (cache file path as a string),
        ``total``, ``withGeneratedNote``, ``missingCount``,
        ``duplicateGeneratedNoteItems``, ``scannedParentsThisRun``,
        ``missing`` (records without a generated note) and ``duplicates``
        (records with more than one generated note).
    """
    cache_path = vault / "00 Templater" / ".zotero-ai-notes-index.json"
    cache = {"items": {}} if rebuild else load_cache(cache_path)
    cached_items: dict[str, Any] = cache.setdefault("items", {})

    top_items = all_top_items()
    # With a limit only a prefix of the library is examined, so a key that
    # is absent from this run is not evidence that the item is gone.
    partial_view = bool(limit)
    if partial_view:
        top_items = top_items[:limit]

    current_keys: set[str] = set()
    scanned = 0
    for item in top_items:
        summary = item_summary(item)
        key = summary.get("key")
        if not key:
            continue
        current_keys.add(key)
        record = cached_items.get(key)
        # A record flagged as deleted that is listed again is stale: rescan
        # it, which also drops the flag. Previously the flag was never
        # cleared, so the item stayed out of every later report.
        stale = not record or bool(record.get("deletedOrNotTopLevel"))
        if rebuild or refresh or stale:
            cached_items[key] = {**summary, **scan_parent(key)}
            scanned += 1
        else:
            record.update(summary)

    # Only a complete listing can show that a cached item has disappeared.
    if not partial_view:
        for key, record in cached_items.items():
            if key not in current_keys:
                record["deletedOrNotTopLevel"] = True

    save_cache(cache_path, cache)

    # Records for listed keys never carry the deleted flag at this point,
    # because flagged ones were rescanned in the loop above.
    active_records = [
        record for key, record in cached_items.items()
        if key in current_keys
    ]
    missing = [record for record in active_records if not record.get("hasGeneratedNote")]
    duplicates = [
        record for record in active_records
        if len(record.get("generatedNoteKeys") or []) > 1
    ]
    return {
        "cache": str(cache_path),
        "total": len(active_records),
        "withGeneratedNote": len(active_records) - len(missing),
        "missingCount": len(missing),
        "duplicateGeneratedNoteItems": len(duplicates),
        "scannedParentsThisRun": scanned,
        "missing": missing,
        "duplicates": duplicates,
    }
def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with all supported options."""
    cli = argparse.ArgumentParser()
    cli.add_argument("--vault", default=str(Path.cwd()), help="Obsidian vault path")
    cli.add_argument("--refresh", action="store_true", help="Rescan all current top-level parents")
    cli.add_argument("--rebuild", action="store_true", help="Discard cache and rescan all current top-level parents")
    cli.add_argument("--limit", type=int, help="Only audit the first N top-level items")
    cli.add_argument("--keys-only", action="store_true", help="Print only missing item keys")
    return cli


def main() -> None:
    """Parse command-line options, run the audit and print the result.

    With ``--keys-only`` the keys of the missing records are printed on one
    line separated by spaces; otherwise the whole report is printed as
    indented JSON.
    """
    options = _build_parser().parse_args()
    vault_dir = Path(options.vault).expanduser().resolve()
    report = audit(
        vault_dir,
        refresh=options.refresh,
        rebuild=options.rebuild,
        limit=options.limit,
    )

    if not options.keys_only:
        print(json.dumps(report, ensure_ascii=False, indent=2))
        return

    missing_keys = [entry["key"] for entry in report["missing"] if entry.get("key")]
    print(" ".join(missing_keys))
# Run the audit only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()