#!/usr/bin/env python3
"""Audit which Zotero top-level items have generated AI child notes.

This script is deterministic and does not call any LLM.
It can build a local cache for fast repeated missing-note checks.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
import urllib.parse
|
|
import urllib.request
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# Best-effort switch of stdout and stderr to UTF-8 output. Any failure is
# deliberately ignored and the streams keep whatever encoding they already had.
# Because both calls share one ``try``, stderr is only attempted when the
# stdout call succeeded.
try:
    sys.stdout.reconfigure(encoding="utf-8")
    sys.stderr.reconfigure(encoding="utf-8")
except Exception:
    pass

# Base URL that zotero_get() prepends to every request path.
# NOTE(review): presumably the local HTTP API of a running Zotero desktop
# client on its default port — confirm against the deployment.
LOCAL_ZOTERO = "http://127.0.0.1:23119/api/users/0"
|
|
|
|
|
|
def fail(message: str) -> None:
    """Print ``error: <message>`` to stderr and terminate with exit status 1.

    This function never returns normally; it always raises ``SystemExit(1)``.
    """
    report = f"error: {message}"
    print(report, file=sys.stderr)
    sys.exit(1)
|
|
|
|
|
|
def zotero_get(path: str) -> Any:
    """Perform a GET request against ``LOCAL_ZOTERO + path`` and return the parsed JSON.

    The request carries the ``Zotero-API-Version: 3`` header and uses a
    30-second timeout. The response body is decoded as UTF-8 (undecodable
    bytes are replaced) before being parsed as JSON. Network and JSON errors
    are not caught here and propagate to the caller.
    """
    url = LOCAL_ZOTERO + path
    request = urllib.request.Request(url, headers={"Zotero-API-Version": "3"})
    with urllib.request.urlopen(request, timeout=30) as response:
        raw = response.read()
    text = raw.decode("utf-8", errors="replace")
    return json.loads(text)
|
|
|
|
|
|
def all_top_items() -> list[dict[str, Any]]:
    """Fetch every item from the ``/items/top`` endpoint, page by page.

    Pages of 100 items are requested with increasing ``start`` offsets until
    an empty page or a page shorter than the page size is returned.

    Returns:
        All fetched items, in the order the API returned them.
    """
    page_size = 100
    collected: list[dict[str, Any]] = []
    offset = 0
    while True:
        query = urllib.parse.urlencode({"limit": page_size, "start": offset})
        batch = zotero_get("/items/top?" + query)
        if batch:
            collected.extend(batch)
        # An empty or short page means there is nothing left to fetch.
        if not batch or len(batch) < page_size:
            return collected
        offset += page_size
|
|
|
|
|
|
def item_summary(item: dict[str, Any]) -> dict[str, Any]:
    """Extract the small set of fields the audit stores for an item.

    Args:
        item: An item object; its ``data`` member may be missing or ``None``.

    Returns:
        A dict with the keys ``key``, ``title``, ``itemType``, ``version`` and
        ``dateModified`` (in that order). Missing fields are ``None``. The
        version comes from the item itself and falls back to ``data.version``
        when the item-level value is missing or falsy.
    """
    data = item.get("data") or {}
    summary: dict[str, Any] = {"key": item.get("key")}
    for field in ("title", "itemType"):
        summary[field] = data.get(field)
    summary["version"] = item.get("version") or data.get("version")
    summary["dateModified"] = data.get("dateModified")
    return summary
|
|
|
|
|
|
def note_is_generated_for_parent(note_html: str, parent_key: str) -> bool:
    """Decide whether a note's HTML looks like a generated AI note.

    The note counts as generated when any one of these markers is present:

    * the substring ``items/<parent_key>``,
    * the literal text ``AI Literature Note``,
    * an ``<h1>`` tag whose directly following text (before any other tag)
      contains the memo emoji.

    Only the first marker depends on ``parent_key``.
    """
    if f"items/{parent_key}" in note_html:
        return True
    if "AI Literature Note" in note_html:
        return True
    return re.search(r"<h1>[^<]*📝", note_html) is not None
|
|
|
|
|
|
def scan_parent(parent_key: str) -> dict[str, Any]:
    """Inspect the child notes of one parent item for generated AI notes.

    The parent's children are listed, and every child of type ``note`` that
    has a key is fetched individually so its HTML can be checked with
    ``note_is_generated_for_parent``.

    Returns:
        A dict with:
        ``hasGeneratedNote`` - whether at least one child note matched,
        ``generatedNoteKeys`` - keys of the matching notes, in listing order,
        ``childNoteCount`` - number of note children, including ones without
        a key.
    """
    children = zotero_get(f"/items/{urllib.parse.quote(parent_key)}/children") or []
    note_children = [
        child for child in children
        if (child.get("data") or {}).get("itemType") == "note"
    ]

    matched_keys: list[str] = []
    for note in note_children:
        note_key = note.get("key")
        if note_key:
            fetched = zotero_get(f"/items/{urllib.parse.quote(note_key)}")
            fetched_data = fetched.get("data") or {}
            html = fetched_data.get("note") or ""
            if note_is_generated_for_parent(html, parent_key):
                matched_keys.append(note_key)

    return {
        "hasGeneratedNote": bool(matched_keys),
        "generatedNoteKeys": matched_keys,
        "childNoteCount": len(note_children),
    }
|
|
|
|
|
|
def load_cache(path: Path) -> dict[str, Any]:
    """Load the audit cache from *path*, tolerating missing or damaged files.

    Args:
        path: Location of the JSON cache file.

    Returns:
        The cache dict, guaranteed to contain an ``items`` dict whose values
        are all dicts. An empty cache (``{"items": {}}``) is returned when the
        file does not exist, is not valid UTF-8 JSON, or does not hold a JSON
        object at the top level.

    Raises:
        OSError: If the file exists but cannot be read.
    """
    if not path.exists():
        return {"items": {}}
    try:
        cache = json.loads(path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError):
        # A corrupt cache is not fatal: everything is simply rescanned.
        return {"items": {}}

    # Valid JSON of the wrong shape (list, null, number, ...) would otherwise
    # crash the caller on its first dict operation.
    if not isinstance(cache, dict):
        return {"items": {}}

    items = cache.get("items")
    if isinstance(items, dict):
        # Drop malformed records; the caller rescans any key it cannot find.
        cache["items"] = {
            key: record for key, record in items.items() if isinstance(record, dict)
        }
    else:
        cache["items"] = {}
    return cache
|
|
|
|
|
|
def save_cache(path: Path, cache: dict[str, Any]) -> None:
    """Write *cache* to *path* as pretty-printed UTF-8 JSON.

    The parent directory is created if needed, and ``cache["updatedAt"]`` is
    set (in place) to the current UTC time in ISO 8601 format before writing.

    The data is first written to a sibling ``<name>.tmp`` file and then renamed
    over *path*, so an interrupted run cannot leave a truncated cache behind.

    Args:
        path: Destination of the cache file.
        cache: Cache dict to persist; mutated to carry the new timestamp.

    Raises:
        OSError: If the directory or file cannot be written.
        TypeError: If *cache* contains values that are not JSON serialisable.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    cache["updatedAt"] = datetime.now(timezone.utc).isoformat()
    # Serialise before touching the disk so a serialisation error leaves the
    # existing cache file untouched.
    payload = json.dumps(cache, ensure_ascii=False, indent=2)

    staging = path.with_name(path.name + ".tmp")
    try:
        staging.write_text(payload, encoding="utf-8")
        staging.replace(path)
    except OSError:
        # Best-effort cleanup of the partial staging file, then re-raise.
        try:
            staging.unlink()
        except OSError:
            pass
        raise
|
|
|
|
|
|
def audit(vault: Path, *, refresh: bool, rebuild: bool, limit: int | None) -> dict[str, Any]:
    """Audit top-level Zotero items for generated AI child notes.

    The cache at ``<vault>/00 Templater/.zotero-ai-notes-index.json`` is
    loaded (or discarded when *rebuild* is set), brought up to date against
    the current list of top-level items, saved again, and summarised.

    Args:
        vault: Root directory of the vault that holds the cache file.
        refresh: Rescan the children of every audited item even if cached.
        rebuild: Start from an empty cache and rescan every audited item.
        limit: Audit only the first *limit* top-level items. ``None`` or ``0``
            means no limit.

    Returns:
        A dict with the cache path (``cache``), counts (``total``,
        ``withGeneratedNote``, ``missingCount``,
        ``duplicateGeneratedNoteItems``, ``scannedParentsThisRun``) and the
        record lists ``missing`` (no generated note) and ``duplicates`` (more
        than one generated note).
    """
    cache_path = vault / "00 Templater" / ".zotero-ai-notes-index.json"
    cache = {"items": {}} if rebuild else load_cache(cache_path)
    cached_items: dict[str, Any] = cache.setdefault("items", {})

    library_items = all_top_items()
    # Keys of everything that is currently top-level, independent of `limit`.
    # Deletion status must be judged against the whole library: judging it
    # against the truncated list would flag every item beyond the limit.
    library_keys = {item.get("key") for item in library_items if item.get("key")}
    top_items = library_items[:limit] if limit else library_items

    current_keys: set[str] = set()
    scanned = 0
    for item in top_items:
        summary = item_summary(item)
        key = summary.get("key")
        if not key:
            continue
        current_keys.add(key)
        record = cached_items.get(key)
        if rebuild or refresh or not record:
            # Children are only fetched for new items or on explicit request.
            record = {**summary, **scan_parent(key)}
            cached_items[key] = record
            scanned += 1
        else:
            record.update(summary)

    # Recompute the deletion flag on every run so that an item which is
    # top-level again (or was flagged by an earlier limited run) is not
    # excluded forever by a stale flag.
    for key, record in cached_items.items():
        if key in library_keys:
            record.pop("deletedOrNotTopLevel", None)
        else:
            record["deletedOrNotTopLevel"] = True

    save_cache(cache_path, cache)

    # Every audited key is in the library, so none of these records is flagged.
    active_records = [
        record for key, record in cached_items.items() if key in current_keys
    ]
    missing = [record for record in active_records if not record.get("hasGeneratedNote")]
    duplicates = [
        record for record in active_records
        if len(record.get("generatedNoteKeys") or []) > 1
    ]
    return {
        "cache": str(cache_path),
        "total": len(active_records),
        "withGeneratedNote": len(active_records) - len(missing),
        "missingCount": len(missing),
        "duplicateGeneratedNoteItems": len(duplicates),
        "scannedParentsThisRun": scanned,
        "missing": missing,
        "duplicates": duplicates,
    }
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: parse arguments, run the audit, print the result.

    With ``--keys-only`` the keys of the items missing a generated note are
    printed on one line, separated by spaces. Otherwise the full audit result
    is printed as indented JSON.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--vault", default=str(Path.cwd()), help="Obsidian vault path")
    for flag, description in (
        ("--refresh", "Rescan all current top-level parents"),
        ("--rebuild", "Discard cache and rescan all current top-level parents"),
    ):
        parser.add_argument(flag, action="store_true", help=description)
    parser.add_argument("--limit", type=int, help="Only audit the first N top-level items")
    parser.add_argument("--keys-only", action="store_true", help="Print only missing item keys")
    args = parser.parse_args()

    vault = Path(args.vault).expanduser().resolve()
    result = audit(vault, refresh=args.refresh, rebuild=args.rebuild, limit=args.limit)

    if not args.keys_only:
        print(json.dumps(result, ensure_ascii=False, indent=2))
        return
    missing_keys = [record["key"] for record in result["missing"] if record.get("key")]
    print(" ".join(missing_keys))
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|