commit d008ff4ca29a336485f2e96c81968ad6de3c1d25 Author: qyh15 Date: Tue May 19 21:49:08 2026 +0800 Initial zotero ai notes skill diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b8c8130 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +__pycache__/ +*.py[cod] +.env +*.log +*.tmp +.zotero-ai-notes-index.json diff --git a/SKILL.md b/SKILL.md new file mode 100644 index 0000000..55ae541 --- /dev/null +++ b/SKILL.md @@ -0,0 +1,93 @@ +--- +name: zotero-ai-notes +description: Generate, resume, audit, and maintain AI-generated Zotero child-note literature notes using Zotero Local API metadata, Zotero Web API writes, and AwesomeGPT/DeepSeek-compatible chat settings. Use when the user asks to batch-generate Zotero literature notes, fill Zotero notes from Obsidian templates, use AwesomeGPT or DeepSeek for Zotero papers, skip existing AI notes, resume interrupted Zotero note generation, audit duplicate Zotero AI notes, or package/upgrade this Zotero note workflow. +--- + +# Zotero AI Notes + +Use this skill to create Zotero child notes from local Zotero items with the user's literature-note templates. + +Core script: + +```powershell +py "$env:USERPROFILE\.codex\skills\zotero-ai-notes\scripts\generate_zotero_ai_note.py" --vault "C:\Users\qyh15\Documents\Obsidian Vault" --item-key SXAIQUJT --skip-existing +``` + +Fast audit script: + +```powershell +py "$env:USERPROFILE\.codex\skills\zotero-ai-notes\scripts\audit_zotero_ai_notes.py" --vault "C:\Users\qyh15\Documents\Obsidian Vault" --keys-only +``` + +## Requirements + +- Zotero Desktop must be open and Local API enabled at `http://127.0.0.1:23119`. +- The vault must contain `00 Templater` with: + - `01 ...md` research-article template + - `02 ...md` review-article template + - `03 ...md` AI prompt +- `ZOTERO_API_KEY` must be available in the process environment for write access. 
+- LLM settings come from either: + - `AWESOMEGPT_API_KEY`, `AWESOMEGPT_BASE_URL`, `AWESOMEGPT_MODEL`, or + - AwesomeGPT preferences in the default Zotero profile. + +Never store API keys in the skill. If keys appear in chat or logs, tell the user to rotate them. + +## Common Workflows + +### Single item + +```powershell +py "$env:USERPROFILE\.codex\skills\zotero-ai-notes\scripts\generate_zotero_ai_note.py" --vault "C:\Users\qyh15\Documents\Obsidian Vault" --item-key SXAIQUJT --skip-existing +``` + +Set `ZOTERO_API_KEY` in the current shell before running live writes, but do not print or paste the value into logs or chat. + +### Multiple item keys + +```powershell +py "$env:USERPROFILE\.codex\skills\zotero-ai-notes\scripts\generate_zotero_ai_note.py" --vault "C:\Users\qyh15\Documents\Obsidian Vault" --item-keys "SXAIQUJT X7GJZ627 ZCZXGRAM" --limit 0 --skip-existing --fulltext-chars 4000 +``` + +### Whole library + +Use only after the user explicitly approves library-wide writes. + +```powershell +py "$env:USERPROFILE\.codex\skills\zotero-ai-notes\scripts\generate_zotero_ai_note.py" --vault "C:\Users\qyh15\Documents\Obsidian Vault" --all --limit 0 --skip-existing --fulltext-chars 4000 +``` + +For long runs, prefer 20-30 item batches. If a run times out, rerun with `--skip-existing`; the script resumes by checking child notes for the source Zotero item link. + +### Find items missing generated notes + +Do this deterministically; do not ask DeepSeek or another LLM to compare library state. + +First build or refresh the cache: + +```powershell +py "$env:USERPROFILE\.codex\skills\zotero-ai-notes\scripts\audit_zotero_ai_notes.py" --vault "C:\Users\qyh15\Documents\Obsidian Vault" --rebuild +``` + +Later fast checks: + +```powershell +py "$env:USERPROFILE\.codex\skills\zotero-ai-notes\scripts\audit_zotero_ai_notes.py" --vault "C:\Users\qyh15\Documents\Obsidian Vault" --keys-only +``` + +The cache lives at `<vault>\00 Templater\.zotero-ai-notes-index.json`. 
Use `--refresh` or `--rebuild` when notes were edited outside this workflow. + +## Operating Rules + +- For live writes, state that Zotero child notes will be created. +- Use `--dry-run` for first-time validation or template changes. +- Use `--fulltext-chars 4000` for cheap, lightweight notes; increase only when the user asks for deeper notes. +- Do not add visible machine markers to note bodies. Use the Zotero item link for duplicate detection. +- Do not delete duplicate notes unless the user explicitly requests cleanup. +- Use `audit_zotero_ai_notes.py` for missing-note checks. Reserve DeepSeek for generating or improving note content. +- If PowerShell/cmd output has Unicode issues, set `PYTHONIOENCODING=utf-8` or rely on script UTF-8 reconfiguration. +- If PDF full text is unavailable, allow the script to fall back to metadata, abstract, BibTeX, and optional local PDF extraction if a Python PDF library exists. + +## Upgrade Points + +Read [references/upgrade-plan.md](references/upgrade-plan.md) before changing behavior, adding cleanup, adding collection targeting, or improving note quality. diff --git a/agents/openai.yaml b/agents/openai.yaml new file mode 100644 index 0000000..14a7e6a --- /dev/null +++ b/agents/openai.yaml @@ -0,0 +1,4 @@ +interface: + display_name: "Zotero AI Notes" + short_description: "Batch AI notes into Zotero child notes" + default_prompt: "Use $zotero-ai-notes to generate Zotero child-note literature notes from my vault templates." diff --git a/references/upgrade-plan.md b/references/upgrade-plan.md new file mode 100644 index 0000000..51ddde5 --- /dev/null +++ b/references/upgrade-plan.md @@ -0,0 +1,32 @@ +# Upgrade Plan + +Keep upgrades backward compatible with the current command line. + +## Near-term additions + +- Add `--collection` to process one Zotero collection by name or key. +- Add `--delete-duplicates` with a dry-run default and explicit confirmation requirement. 
+- Add `--report-out <path>` to save run summaries without dumping full note HTML to terminal. +- Add retry/backoff for Zotero Web API writes and LLM calls. +- Add `--provider <name>` to select AwesomeGPT providers other than DeepSeek. +- Add incremental audit invalidation based on Zotero item/library version headers, so only changed parents are rescanned after the initial cache build. + +## Quality upgrades + +- Install or use `pypdf`/`PyMuPDF` when available for better PDF text extraction. +- Add a template variable layer so the script can fill title/key/link before sending to the model. +- Split prompts for article-type classification and note generation when the user wants stronger consistency. +- For review articles, only request representative-work BibTeX when full references are available. + +## Safety upgrades + +- Never log API keys. +- Do not write keys into `.env` automatically. +- For destructive actions such as duplicate cleanup, require an explicit user request and perform a dry run first. +- Prefer `--skip-existing` for all batch jobs. + +## Known constraints + +- Zotero Local API is read-only; child-note creation uses Zotero Web API. +- AwesomeGPT's in-Zotero connector may require a session secret and should not be treated as a stable external API. +- Some Zotero child-note list responses may omit full note HTML; duplicate detection should fetch full note items when needed. diff --git a/scripts/audit_zotero_ai_notes.py b/scripts/audit_zotero_ai_notes.py new file mode 100644 index 0000000..a554b7c --- /dev/null +++ b/scripts/audit_zotero_ai_notes.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python3 +"""Audit which Zotero top-level items have generated AI child notes. + +This script is deterministic and does not call any LLM. +It can build a local cache for fast repeated missing-note checks. 
from __future__ import annotations

import argparse
import json
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, NoReturn

# Best effort only: some consoles (or captured streams) cannot be reconfigured,
# and failing to switch to UTF-8 must never prevent the audit from running.
try:
    sys.stdout.reconfigure(encoding="utf-8")
    sys.stderr.reconfigure(encoding="utf-8")
except Exception:
    pass

LOCAL_ZOTERO = "http://127.0.0.1:23119/api/users/0"

# Literal markers that identify a generated note. "AI文献笔记" is included so the
# auditor agrees with the generator's own --skip-existing check.
GENERATED_NOTE_MARKERS = ("AI Literature Note", "AI文献笔记")

# A heading whose text contains the memo emoji.
# NOTE(review): the heading level is not pinned to <h1>; any <h1>..<h6> matches.
# Confirm against the note template if a stricter match is wanted.
GENERATED_HEADING_RE = re.compile(r"<h[1-6][^>]*>[^<]*📝")

# Cache-record flag for parents that are no longer top-level library items.
DELETED_FLAG = "deletedOrNotTopLevel"


def fail(message: str) -> NoReturn:
    """Print ``error: <message>`` to stderr and exit with status 1."""
    print(f"error: {message}", file=sys.stderr)
    raise SystemExit(1)


def zotero_get(path: str) -> Any:
    """GET ``path`` from the Zotero Local API and return the decoded JSON.

    Network and decoding problems are reported through :func:`fail` (exit
    status 1) instead of surfacing as a raw traceback.
    """
    req = urllib.request.Request(LOCAL_ZOTERO + path, headers={"Zotero-API-Version": "3"})
    try:
        with urllib.request.urlopen(req, timeout=30) as response:
            raw = response.read()
    except urllib.error.HTTPError as exc:
        fail(f"GET {path} failed: HTTP {exc.code}")
    except OSError as exc:
        # URLError and socket timeouts are both OSError subclasses.
        fail(f"GET {path} failed: {exc} (is Zotero open with the Local API enabled?)")
    try:
        return json.loads(raw.decode("utf-8", errors="replace"))
    except json.JSONDecodeError as exc:
        fail(f"GET {path} returned invalid JSON: {exc}")


def all_top_items() -> list[dict[str, Any]]:
    """Return every top-level library item, fetched in pages of 100."""
    items: list[dict[str, Any]] = []
    start = 0
    limit = 100
    while True:
        query = urllib.parse.urlencode({"limit": limit, "start": start})
        page = zotero_get("/items/top?" + query)
        if not page:
            break
        items.extend(page)
        if len(page) < limit:
            # A short page is the last page.
            break
        start += limit
    return items


def item_summary(item: dict[str, Any]) -> dict[str, Any]:
    """Extract the fields stored in the cache for one top-level item."""
    data = item.get("data") or {}
    return {
        "key": item.get("key"),
        "title": data.get("title"),
        "itemType": data.get("itemType"),
        "version": item.get("version") or data.get("version"),
        "dateModified": data.get("dateModified"),
    }


def note_is_generated_for_parent(note_html: str, parent_key: str) -> bool:
    """Return True when ``note_html`` looks like a generated AI note.

    A note qualifies when it contains the parent's ``items/<key>`` link text,
    one of the literal markers, or a heading containing the memo emoji.
    """
    if f"items/{parent_key}" in note_html:
        return True
    if any(marker in note_html for marker in GENERATED_NOTE_MARKERS):
        return True
    return bool(GENERATED_HEADING_RE.search(note_html))


def scan_parent(parent_key: str) -> dict[str, Any]:
    """Inspect the child notes of ``parent_key`` and report the generated ones.

    Every child note is fetched individually so the check always runs against
    the full note item rather than the children-list entry.
    """
    children = zotero_get(f"/items/{urllib.parse.quote(parent_key)}/children")
    generated_notes: list[str] = []
    child_note_count = 0
    for child in children or []:
        data = child.get("data") or {}
        if data.get("itemType") != "note":
            continue
        child_note_count += 1
        note_key = child.get("key")
        if not note_key:
            continue
        full = zotero_get(f"/items/{urllib.parse.quote(note_key)}")
        full_data = (full.get("data") if isinstance(full, dict) else None) or {}
        note_html = full_data.get("note") or ""
        if note_is_generated_for_parent(note_html, parent_key):
            generated_notes.append(note_key)
    return {
        "hasGeneratedNote": bool(generated_notes),
        "generatedNoteKeys": generated_notes,
        "childNoteCount": child_note_count,
    }


def load_cache(path: Path) -> dict[str, Any]:
    """Load the audit cache, falling back to an empty cache when unusable.

    A missing file, undecodable text, invalid JSON, a non-object top level, or
    a non-object ``items`` value all yield a cache with empty ``items``.
    Records that are not JSON objects are dropped so later updates cannot
    crash on them.
    """
    try:
        cache = json.loads(path.read_text(encoding="utf-8"))
    except (FileNotFoundError, json.JSONDecodeError, UnicodeDecodeError):
        return {"items": {}}
    if not isinstance(cache, dict):
        return {"items": {}}
    items = cache.get("items")
    if not isinstance(items, dict):
        cache["items"] = {}
    else:
        cache["items"] = {
            key: record for key, record in items.items() if isinstance(record, dict)
        }
    return cache


def save_cache(path: Path, cache: dict[str, Any]) -> None:
    """Stamp ``cache`` with ``updatedAt`` and write it to ``path`` as JSON.

    The data is written to a sibling ``.tmp`` file first and then moved into
    place, so an interrupted run cannot leave a truncated cache behind.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    cache["updatedAt"] = datetime.now(timezone.utc).isoformat()
    temporary = path.with_name(path.name + ".tmp")
    temporary.write_text(json.dumps(cache, ensure_ascii=False, indent=2), encoding="utf-8")
    temporary.replace(path)


def reconcile_deleted_flags(cached_items: dict[str, Any], library_keys: set[str]) -> None:
    """Set or clear the deleted flag on every cached record, in place.

    Records whose key is absent from ``library_keys`` are flagged; records
    whose key is present have a stale flag removed, so an item that comes
    back (or was merely outside an earlier ``--limit`` slice) is reported
    again.
    """
    for key, record in cached_items.items():
        if key in library_keys:
            record.pop(DELETED_FLAG, None)
        else:
            record[DELETED_FLAG] = True


def audit(vault: Path, *, refresh: bool, rebuild: bool, limit: int | None) -> dict[str, Any]:
    """Audit top-level items for generated child notes and update the cache.

    Args:
        vault: Obsidian vault path; the cache lives under ``00 Templater``.
        refresh: Rescan every audited parent even when it is cached.
        rebuild: Discard the existing cache before scanning.
        limit: Audit only the first ``limit`` top-level items (falsy = all).

    Returns:
        A summary dict with counts plus the ``missing`` and ``duplicates``
        record lists for the audited items.
    """
    cache_path = vault / "00 Templater" / ".zotero-ai-notes-index.json"
    cache = {"items": {}} if rebuild else load_cache(cache_path)
    cached_items: dict[str, Any] = cache.setdefault("items", {})

    library_items = all_top_items()
    # Deletion is judged against the whole library, not the --limit slice;
    # otherwise a limited run would flag every item outside the slice.
    library_keys = {item.get("key") for item in library_items if item.get("key")}
    top_items = library_items[:limit] if limit and limit > 0 else library_items

    current_keys: set[str] = set()
    scanned = 0
    for item in top_items:
        summary = item_summary(item)
        key = summary.get("key")
        if not key:
            continue
        current_keys.add(key)
        record = cached_items.get(key)
        if rebuild or refresh or not record:
            cached_items[key] = {**summary, **scan_parent(key)}
            scanned += 1
        else:
            record.update(summary)

    reconcile_deleted_flags(cached_items, library_keys)
    save_cache(cache_path, cache)

    active_records = [
        record for key, record in cached_items.items()
        if key in current_keys and not record.get(DELETED_FLAG)
    ]
    missing = [record for record in active_records if not record.get("hasGeneratedNote")]
    duplicates = [
        record for record in active_records
        if len(record.get("generatedNoteKeys") or []) > 1
    ]
    return {
        "cache": str(cache_path),
        "total": len(active_records),
        "withGeneratedNote": len(active_records) - len(missing),
        "missingCount": len(missing),
        "duplicateGeneratedNoteItems": len(duplicates),
        "scannedParentsThisRun": scanned,
        "missing": missing,
        "duplicates": duplicates,
    }


def main() -> None:
    """Command-line entry point: run the audit and print the result."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--vault", default=str(Path.cwd()), help="Obsidian vault path")
    parser.add_argument("--refresh", action="store_true", help="Rescan all current top-level parents")
    parser.add_argument("--rebuild", action="store_true", help="Discard cache and rescan all current top-level parents")
    parser.add_argument("--limit", type=int, help="Only audit the first N top-level items")
    parser.add_argument("--keys-only", action="store_true", help="Print only missing item keys")
    args = parser.parse_args()
    if args.limit is not None and args.limit < 0:
        # A negative slice bound would silently drop items from the end.
        parser.error("--limit must be zero or a positive integer")

    result = audit(
        Path(args.vault).expanduser().resolve(),
        refresh=args.refresh,
        rebuild=args.rebuild,
        limit=args.limit,
    )
    if args.keys_only:
        print(" ".join(record["key"] for record in result["missing"] if record.get("key")))
    else:
        print(json.dumps(result, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
# Generate an AI literature note from Zotero metadata and save it as a Zotero
# child note.
#
# Required environment variables:
#   AWESOMEGPT_API_KEY   DeepSeek/OpenAI-compatible API key
#   AWESOMEGPT_BASE_URL  Example: https://api.deepseek.com
#   AWESOMEGPT_MODEL     Example: deepseek-v4-pro
#   ZOTERO_API_KEY       Zotero Web API key with library write permission
#
# Optional:
#   ZOTERO_USER_ID       If omitted, resolved from /keys/current

from __future__ import annotations

import argparse
import html
import importlib.util
import json
import os
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any, NoReturn

# Best effort only: some consoles (or captured streams) cannot be reconfigured,
# and failing to switch to UTF-8 must never prevent the script from running.
try:
    sys.stdout.reconfigure(encoding="utf-8")
    sys.stderr.reconfigure(encoding="utf-8")
except Exception:
    pass


LOCAL_ZOTERO = "http://127.0.0.1:23119/api/users/0"
ZOTERO_WEB = "https://api.zotero.org"
DEFAULT_VAULT = Path.cwd()

AWESOMEGPT_PREF_PREFIX = "extensions.zotero.zoterogpt."

# One user_pref("name", value); statement per line. The value is captured
# greedily up to the final ");" on the line, so a ");" inside a JSON string
# value does not truncate it.
USER_PREF_RE = re.compile(r'^[ \t]*user_pref\("([^"]+)",\s*(.*)\);[ \t]*$', re.MULTILINE)

# (provider/pref field, environment variable it feeds)
AWESOMEGPT_ENV_FIELDS = (
    ("api", "AWESOMEGPT_BASE_URL"),
    ("model", "AWESOMEGPT_MODEL"),
    ("secretKey", "AWESOMEGPT_API_KEY"),
)


class FatalError(SystemExit):
    """``SystemExit`` that also carries the human-readable failure message.

    The exit status stays 1; ``message`` lets a caller that catches the exit
    (for example a batch loop) record why the step failed.
    """

    def __init__(self, message: str, code: int = 1) -> None:
        super().__init__(code)
        self.message = message


def fail(message: str) -> NoReturn:
    """Print ``error: <message>`` to stderr and exit with status 1."""
    print(f"error: {message}", file=sys.stderr)
    raise FatalError(message)


def load_dotenv(path: Path) -> None:
    """Load ``KEY=VALUE`` lines from ``path`` into the environment.

    Existing environment variables win (``setdefault``). Blank lines,
    ``#`` comments and lines without ``=`` are ignored. A UTF-8 byte-order
    mark and a leading ``export `` are tolerated, and surrounding quotes are
    stripped from values. A missing file is not an error.
    """
    if not path.exists():
        return
    # utf-8-sig drops a BOM that would otherwise become part of the first key.
    for raw_line in path.read_text(encoding="utf-8-sig").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        if key.startswith("export "):
            key = key[len("export "):].strip()
        if not key:
            # "=value" has no variable name; setting it would raise.
            continue
        value = value.strip().strip('"').strip("'")
        os.environ.setdefault(key, value)


def _zotero_profile_roots() -> list[Path]:
    """Return candidate Zotero profile root directories, most specific first."""
    home = Path.home()
    roots: list[Path] = []
    appdata = os.environ.get("APPDATA")
    if appdata:
        roots.append(Path(appdata) / "Zotero" / "Zotero")
    roots.extend(
        [
            home / "AppData/Roaming/Zotero/Zotero",  # Windows
            home / "Library/Application Support/Zotero",  # macOS
            home / ".zotero/zotero",  # Linux
        ]
    )
    # Drop duplicates (APPDATA usually equals the Windows default) in order.
    return list(dict.fromkeys(roots))


def _default_profile_prefs(profiles_ini: Path) -> Path | None:
    """Return ``prefs.js`` of the ``Default=1`` profile in ``profiles_ini``."""
    text = profiles_ini.read_text(encoding="utf-8", errors="replace")
    for block in re.split(r"\n(?=\[Profile\d+\])", text):
        if "Default=1" not in block:
            continue
        path_match = re.search(r"^Path=(.+)$", block, re.MULTILINE)
        if not path_match:
            continue
        profile_path = Path(path_match.group(1).strip())
        relative_match = re.search(r"^IsRelative=(\d+)$", block, re.MULTILINE)
        if relative_match and relative_match.group(1) == "1":
            profile_path = profiles_ini.parent / profile_path
        prefs = profile_path / "prefs.js"
        if prefs.exists():
            return prefs
    return None


def zotero_profile_prefs() -> Path | None:
    """Locate ``prefs.js`` of the default Zotero profile, if any.

    Each candidate root is tried in turn: first the ``Default=1`` profile
    named in ``profiles.ini``, then the first ``prefs.js`` (sorted by path)
    found under the root. Returns ``None`` when nothing is found.
    """
    for root in _zotero_profile_roots():
        profiles_ini = root / "profiles.ini"
        if profiles_ini.exists():
            prefs = _default_profile_prefs(profiles_ini)
            if prefs is not None:
                return prefs
        # "Profiles/*" is the Windows/macOS layout; "*" covers profiles that
        # sit directly under the root.
        for pattern in ("Profiles/*/prefs.js", "*/prefs.js"):
            found = sorted(root.glob(pattern))
            if found:
                return found[0]
    return None


def load_awesomegpt_prefs(path: Path | None = None) -> None:
    """Fill missing ``AWESOMEGPT_*`` variables from AwesomeGPT preferences.

    Args:
        path: ``prefs.js`` to read; defaults to :func:`zotero_profile_prefs`.

    The DeepSeek provider entry of the ``settings`` preference is consulted
    first, then the direct ``api`` / ``model`` / ``secretKey`` preferences.
    Only non-empty values are applied, so an empty provider field no longer
    blocks the direct preference, and variables already present in the
    environment are never overwritten.
    """
    if path is None:
        path = zotero_profile_prefs()
    if path is None or not path.exists():
        return
    text = path.read_text(encoding="utf-8", errors="replace")

    prefs: dict[str, Any] = {}
    for name, raw_value in USER_PREF_RE.findall(text):
        if not name.startswith(AWESOMEGPT_PREF_PREFIX):
            continue
        try:
            prefs[name] = json.loads(raw_value)
        except json.JSONDecodeError:
            # Not a JSON-compatible literal; nothing usable to read.
            continue

    settings: Any = {}
    settings_raw = prefs.get(AWESOMEGPT_PREF_PREFIX + "settings")
    if isinstance(settings_raw, str):
        try:
            settings = json.loads(settings_raw)
        except json.JSONDecodeError:
            settings = {}

    provider = None
    if isinstance(settings, dict):
        provider = settings.get("DeepSeek") or next(
            (value for key, value in settings.items() if key.lower() == "deepseek"),
            None,
        )

    sources: list[dict[str, Any]] = []
    if isinstance(provider, dict):
        sources.append(provider)
    sources.append(
        {field: prefs.get(AWESOMEGPT_PREF_PREFIX + field) for field, _ in AWESOMEGPT_ENV_FIELDS}
    )
    for source in sources:
        for field, env_name in AWESOMEGPT_ENV_FIELDS:
            value = source.get(field)
            if value:
                os.environ.setdefault(env_name, str(value))


def http_json(
    url: str,
    *,
    method: str = "GET",
    headers: dict[str, str] | None = None,
    payload: Any = None,
    timeout: int = 90,
) -> Any:
    """Send an HTTP request and return the decoded JSON response.

    Args:
        url: Absolute request URL.
        method: HTTP method.
        headers: Extra request headers.
        payload: JSON-serialisable body; sets ``Content-Type`` when given.
        timeout: Socket timeout in seconds.

    Returns:
        The parsed JSON value, or ``None`` for an empty response body.

    Raises:
        SystemExit: via :func:`fail` on HTTP errors, connection errors,
            timeouts, or a response body that is not valid JSON.
    """
    body = None
    req_headers = dict(headers or {})
    if payload is not None:
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        req_headers.setdefault("Content-Type", "application/json")
    request = urllib.request.Request(url, data=body, method=method, headers=req_headers)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            text = response.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        fail(f"{method} {url} failed: HTTP {exc.code}: {detail[:800]}")
    except OSError as exc:
        # URLError and read timeouts are both OSError subclasses.
        fail(f"{method} {url} failed: {exc}")
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError as exc:
        fail(f"{method} {url} returned invalid JSON: {exc}")


def zotero_local(path: str) -> Any:
    """GET ``path`` from the Zotero Local API; any error is fatal."""
    url = LOCAL_ZOTERO + path
    return http_json(url, headers={"Zotero-API-Version": "3"}, timeout=20)


def zotero_local_optional(path: str) -> Any | None:
    """GET ``path`` from the Zotero Local API, returning ``None`` on any error.

    Used for lookups that are allowed to be missing, such as attachment
    full text.
    """
    url = LOCAL_ZOTERO + path
    request = urllib.request.Request(url, headers={"Zotero-API-Version": "3"})
    try:
        with urllib.request.urlopen(request, timeout=20) as response:
            text = response.read().decode("utf-8", errors="replace")
        return json.loads(text) if text else None
    except (OSError, json.JSONDecodeError):
        return None


def zotero_web(path: str, *, method: str = "GET", payload: Any = None) -> Any:
    """Call the Zotero Web API at ``path`` using ``ZOTERO_API_KEY``."""
    api_key = os.environ.get("ZOTERO_API_KEY")
    if not api_key:
        fail("ZOTERO_API_KEY is required to write Zotero child notes")
    url = ZOTERO_WEB + path
    return http_json(
        url,
        method=method,
        headers={"Zotero-API-Version": "3", "Zotero-API-Key": api_key},
        payload=payload,
        timeout=60,
    )


def resolve_user_id() -> str:
    """Return ``ZOTERO_USER_ID`` or the ``userID`` reported by ``/keys/current``."""
    explicit = os.environ.get("ZOTERO_USER_ID")
    if explicit:
        return explicit
    current = zotero_web("/keys/current")
    user_id = current.get("userID") if isinstance(current, dict) else None
    if not user_id:
        fail("could not resolve Zotero userID from /keys/current")
    return str(user_id)
# Literal markers that identify a previously generated note.
_GENERATED_NOTE_MARKERS = ("AI文献笔记", "AI Literature Note")

# A heading whose text contains the memo emoji (mirrors the audit script).
# NOTE(review): heading level is not pinned; confirm against the note template.
_GENERATED_HEADING_RE = re.compile(r"<h[1-6][^>]*>[^<]*📝")

_BULLET_RE = re.compile(r"^[-*+]\s+(.*)$")
_ORDERED_RE = re.compile(r"^\d+[.)]\s+(.*)$")
_HEADING_RE = re.compile(r"^(#{1,6})\s+(.+)$")
_LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)\s]+)\)")
_BOLD_RE = re.compile(r"\*\*(.+?)\*\*")
_CODE_RE = re.compile(r"`([^`]+)`")

# Link targets that may become clickable; anything else stays literal text.
_SAFE_LINK_PREFIXES = ("http://", "https://", "zotero://", "mailto:")


def find_item(item_key: str | None, query: str | None) -> dict[str, Any]:
    """Return one Zotero item, by key or as the first match of ``query``."""
    if item_key:
        return zotero_local(f"/items/{urllib.parse.quote(item_key)}")
    if not query:
        fail("provide --item-key or --query")
    qs = urllib.parse.urlencode({"q": query, "limit": 5})
    matches = zotero_local(f"/items/top?{qs}")
    if not matches:
        fail(f"no Zotero item matched query: {query}")
    if len(matches) > 1:
        print(f"warning: {len(matches)} matches; using {matches[0].get('key')}", file=sys.stderr)
    return matches[0]


def find_items(keys: list[str], query: str | None, limit: int) -> list[dict[str, Any]]:
    """Resolve explicit keys plus optional query matches into items.

    Args:
        keys: Item keys to fetch; repeated keys are fetched only once so the
            same parent cannot receive two notes in one run.
        query: Optional search; matches not already selected are appended.
        limit: Maximum number of items returned (0 = no limit).
    """
    items: list[dict[str, Any]] = [find_item(key, None) for key in dict.fromkeys(keys)]
    if query:
        params: dict[str, Any] = {"q": query}
        if limit > 0:
            # NOTE(review): "limit=0" is not sent because it is presumably not
            # a meaningful page size for the API - confirm.
            params["limit"] = limit
        matches = zotero_local(f"/items/top?{urllib.parse.urlencode(params)}")
        seen = {item.get("key") for item in items}
        for item in matches or []:
            if item.get("key") not in seen:
                items.append(item)
                seen.add(item.get("key"))
    if not items:
        fail("provide --item-key/--item-keys or --query")
    return items[:limit] if limit else items


def all_top_items(limit: int = 0) -> list[dict[str, Any]]:
    """Return top-level library items in pages of 100 (``limit`` 0 = all)."""
    items: list[dict[str, Any]] = []
    start = 0
    page_limit = 100
    while True:
        qs = urllib.parse.urlencode({"limit": page_limit, "start": start})
        page = zotero_local(f"/items/top?{qs}")
        if not page:
            break
        items.extend(page)
        if limit and len(items) >= limit:
            return items[:limit]
        if len(page) < page_limit:
            break
        start += page_limit
    return items


def has_existing_ai_note(parent_key: str) -> bool:
    """Return True when ``parent_key`` already has a generated child note.

    A child note counts when it contains a literal marker, a heading with the
    memo emoji, or the parent's ``items/<key>`` link text. When the children
    listing carries no note body, the full note item is fetched.
    """
    children = zotero_local(f"/items/{urllib.parse.quote(parent_key)}/children")
    for child in children or []:
        data = child.get("data") or {}
        if data.get("itemType") != "note":
            continue
        note = data.get("note") or ""
        if not note and child.get("key"):
            full_note = zotero_local_optional(f"/items/{urllib.parse.quote(child['key'])}")
            full_data = (full_note.get("data") if isinstance(full_note, dict) else None) or {}
            note = full_data.get("note") or ""
        if (
            any(marker in note for marker in _GENERATED_NOTE_MARKERS)
            or f"items/{parent_key}" in note
            or _GENERATED_HEADING_RE.search(note)
        ):
            return True
    return False


def export_bibtex(item_key: str) -> str:
    """Return the BibTeX export of ``item_key`` from the Local API."""
    qs = urllib.parse.urlencode({"itemKey": item_key, "format": "bibtex"})
    url = f"{LOCAL_ZOTERO}/items?{qs}"
    request = urllib.request.Request(url, headers={"Zotero-API-Version": "3"})
    with urllib.request.urlopen(request, timeout=20) as response:
        return response.read().decode("utf-8", errors="replace").strip()


def local_fulltext(parent_key: str, max_chars: int) -> str:
    """Return up to ``max_chars`` characters of text for ``parent_key``.

    Indexed full text of the attachments is preferred. When none is
    available, the first PDF attachment with a readable local path is parsed
    with whichever PDF library is installed.
    """
    children = zotero_local(f"/items/{urllib.parse.quote(parent_key)}/children") or []
    attachments = [
        child for child in children
        if (child.get("data") or {}).get("itemType") == "attachment"
    ]

    parts: list[str] = []
    collected = 0
    for child in attachments:
        key = child.get("key")
        if not key:
            continue
        fulltext = zotero_local_optional(f"/items/{urllib.parse.quote(key)}/fulltext")
        content = fulltext.get("content") if isinstance(fulltext, dict) else ""
        if not content:
            continue
        parts.append(content)
        collected += len(content)
        if collected >= max_chars:
            break

    if not parts:
        for child in attachments:
            data = child.get("data") or {}
            if data.get("contentType") != "application/pdf":
                continue
            path = data.get("path")
            if not path and child.get("key"):
                full = zotero_local_optional(f"/items/{urllib.parse.quote(child['key'])}")
                path = ((full.get("data") if isinstance(full, dict) else None) or {}).get("path")
            if not path:
                continue
            extracted = extract_pdf_text(Path(path), max_chars=max_chars)
            if extracted:
                parts.append(extracted)
                break

    return "\n\n".join(parts)[:max_chars]


def template_paths(vault: Path) -> tuple[Path, Path, Path]:
    """Return ``(prompt, research template, review template)`` paths.

    Templates live in ``<vault>/00 Templater`` and are selected by file-name
    prefix: ``03`` (preferring a name containing ``AI``), ``01`` and ``02``.
    Candidates are sorted by name so the choice does not depend on
    directory-listing order.
    """
    templates = vault / "00 Templater"
    if not templates.exists():
        fail(f"template directory not found: {templates}")
    files = sorted(templates.glob("*.md"), key=lambda path: path.name)

    def pick(prefix: str, contains: str) -> Path:
        prefixed = [path for path in files if path.name.startswith(prefix)]
        preferred = [path for path in prefixed if contains in path.name]
        candidates = preferred or prefixed
        if not candidates:
            fail(f"missing template file starting with {prefix} in {templates}")
        return candidates[0]

    return (
        pick("03", "AI"),
        pick("01", ""),
        pick("02", ""),
    )


def _pdf_page_texts(path: Path, max_pages: int):
    """Yield the text of the first ``max_pages`` pages of ``path``.

    Uses PyMuPDF (``fitz``) when installed, otherwise ``pypdf``, otherwise
    ``PyPDF2``; yields nothing when none of them is available.
    """
    if importlib.util.find_spec("fitz"):
        import fitz  # type: ignore

        with fitz.open(str(path)) as doc:
            # Pages are loaded by index instead of slicing the document,
            # which is presumably not supported by every PyMuPDF release.
            for page_number in range(min(max_pages, doc.page_count)):
                yield doc.load_page(page_number).get_text("text")
        return
    for module_name in ("pypdf", "PyPDF2"):
        if importlib.util.find_spec(module_name):
            reader = importlib.import_module(module_name).PdfReader(str(path))
            for page in reader.pages[:max_pages]:
                yield page.extract_text() or ""
            return


def extract_pdf_text(path: Path, max_chars: int) -> str:
    """Return up to ``max_chars`` characters from the first pages of a PDF.

    Returns ``""`` when the file does not exist, no PDF library is installed,
    or the PDF cannot be parsed. Extraction is an optional fallback, so a
    parser failure is reported as a warning instead of aborting the item.
    """
    if not path.exists():
        return ""
    max_pages = 8
    chunks: list[str] = []
    collected = 0
    try:
        for page_text in _pdf_page_texts(path, max_pages):
            chunks.append(page_text)
            collected += len(page_text)
            if collected >= max_chars:
                break
    except Exception as exc:  # third-party parsers raise many error types
        print(f"warning: could not extract text from {path}: {exc}", file=sys.stderr)
        return ""
    return "\n".join(chunks)[:max_chars]


def creators_text(creators: list[dict[str, Any]]) -> str:
    """Join creator names with ``"; "``.

    A creator's ``name`` is used when present, otherwise
    ``firstName lastName``; creators without any name are skipped.
    """
    names = []
    for creator in creators:
        name = creator.get("name")
        if not name:
            name = " ".join(x for x in [creator.get("firstName"), creator.get("lastName")] if x)
        if name:
            names.append(name)
    return "; ".join(names)


def build_prompt(item: dict[str, Any], bibtex: str, fulltext: str, vault: Path) -> str:
    """Assemble the LLM prompt from the vault templates and item material.

    The prompt is the AI prompt file, both note templates with selection
    instructions, and a JSON block holding metadata, BibTeX and the full-text
    excerpt.
    """
    data = item.get("data") or {}
    prompt_path, research_template_path, review_template_path = template_paths(vault)
    prompt = prompt_path.read_text(encoding="utf-8")
    research_template = research_template_path.read_text(encoding="utf-8")
    review_template = review_template_path.read_text(encoding="utf-8")
    metadata = {
        "zoteroKey": item.get("key"),
        "title": data.get("title"),
        "itemType": data.get("itemType"),
        "authors": creators_text(data.get("creators") or []),
        "publicationTitle": data.get("publicationTitle"),
        "date": data.get("date"),
        "DOI": data.get("DOI"),
        "url": data.get("url"),
        "abstractNote": data.get("abstractNote"),
    }
    source = {
        "metadata": metadata,
        "bibtex": bibtex,
        "indexedFullTextExcerpt": fulltext,
    }
    return "\n\n".join(
        [
            prompt,
            "请先判断文献类型:综述型文献或研究型文献。",
            "若为研究型文献,请严格填充下面的研究型模板:",
            research_template,
            "若为综述型文献,请严格填充下面的综述型模板:",
            review_template,
            "请将模板中的 ${topItem.getField('title')} 替换为真实题名,将 ${topItem.key} 替换为 Zotero key。",
            "只输出最终 Markdown 笔记,不要输出解释、判断过程或代码围栏。",
            "文献材料如下:",
            json.dumps(source, ensure_ascii=False, indent=2),
        ]
    )


def call_llm(prompt: str) -> str:
    """Send ``prompt`` to the chat-completions endpoint and return the reply.

    Reads ``AWESOMEGPT_API_KEY``, ``AWESOMEGPT_BASE_URL`` and
    ``AWESOMEGPT_MODEL``; ``/v1`` is appended to the base URL when absent.

    Raises:
        SystemExit: via ``fail`` when settings are missing or the response
            does not contain a message.
    """
    api_key = os.environ.get("AWESOMEGPT_API_KEY")
    base_url = (os.environ.get("AWESOMEGPT_BASE_URL") or "").rstrip("/")
    model = os.environ.get("AWESOMEGPT_MODEL")
    if not api_key or not base_url or not model:
        fail("AWESOMEGPT_API_KEY, AWESOMEGPT_BASE_URL, and AWESOMEGPT_MODEL are required")
    if not base_url.endswith("/v1"):
        base_url = base_url + "/v1"
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": "You are a materials science literature-note assistant. Output Simplified Chinese Markdown only."},
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.3,
    }
    response = http_json(
        base_url + "/chat/completions",
        method="POST",
        headers={"Authorization": f"Bearer {api_key}"},
        payload=payload,
        timeout=180,
    )
    try:
        return response["choices"][0]["message"]["content"].strip()
    except Exception as exc:
        # Keep the diagnostic bounded; a full response can be very large.
        fail(f"unexpected LLM response shape: {exc}; response={str(response)[:800]}")


def _inline_html(text: str) -> str:
    """Escape ``text`` and convert links, bold and inline code to HTML.

    Escaping happens first, so only tags produced here reach the output.
    Links whose target does not start with an allowed scheme stay literal.
    """
    escaped = html.escape(text)

    def link(match: re.Match[str]) -> str:
        label, target = match.group(1), match.group(2)
        if not target.lower().startswith(_SAFE_LINK_PREFIXES):
            return match.group(0)
        return f'<a href="{target}">{label}</a>'

    escaped = _LINK_RE.sub(link, escaped)
    escaped = _BOLD_RE.sub(r"<strong>\1</strong>", escaped)
    return _CODE_RE.sub(r"<code>\1</code>", escaped)


def markdown_to_zotero_html(markdown: str) -> str:
    """Convert the Markdown subset used by the notes into Zotero note HTML.

    Supported: ATX headings, fenced code blocks (``` or ~~~), block quotes,
    bullet and numbered lists (consecutive items share one list element) and
    paragraphs, with links, bold and inline code inside text. Blank lines are
    dropped. All text is HTML-escaped.
    """
    out: list[str] = []
    code_lines: list[str] = []
    in_code = False
    open_list = ""  # "", "ul" or "ol"

    def close_list() -> None:
        nonlocal open_list
        if open_list:
            out.append(f"</{open_list}>")
            open_list = ""

    for line in markdown.strip().splitlines():
        stripped = line.strip()
        if stripped.startswith(("```", "~~~")):
            if in_code:
                out.append("<pre>" + html.escape("\n".join(code_lines)) + "</pre>")
                code_lines = []
            else:
                close_list()
            in_code = not in_code
            continue
        if in_code:
            # Code keeps its original indentation.
            code_lines.append(line)
            continue
        if not stripped:
            continue

        item = _BULLET_RE.match(stripped)
        kind = "ul"
        if item is None:
            item = _ORDERED_RE.match(stripped)
            kind = "ol"
        if item is not None:
            if open_list != kind:
                close_list()
                out.append(f"<{kind}>")
                open_list = kind
            out.append(f"<li>{_inline_html(item.group(1))}</li>")
            continue

        close_list()
        heading = _HEADING_RE.match(stripped)
        if heading:
            level = len(heading.group(1))
            out.append(f"<h{level}>{_inline_html(heading.group(2))}</h{level}>")
        elif stripped.startswith(">"):
            out.append(f"<blockquote>{_inline_html(stripped.lstrip('> ').strip())}</blockquote>")
        else:
            out.append(f"<p>{_inline_html(stripped)}</p>")

    if in_code:
        # Unterminated fence: keep what was collected.
        out.append("<pre>" + html.escape("\n".join(code_lines)) + "</pre>")
    close_list()
    return "\n".join(out)


def create_child_note(user_id: str, parent_key: str, markdown: str, dry_run: bool) -> Any:
    """Create a child note under ``parent_key`` from ``markdown``.

    Returns the request payload wrapped in ``{"dryRun": True, ...}`` when
    ``dry_run`` is set, otherwise the Zotero Web API response.

    Raises:
        SystemExit: via ``fail`` when the API answers successfully but lists
            the note under ``failed``.
    """
    note_html = markdown_to_zotero_html(markdown)
    payload = [{"itemType": "note", "parentItem": parent_key, "note": note_html}]
    if dry_run:
        return {"dryRun": True, "payload": payload}
    result = zotero_web(f"/users/{user_id}/items", method="POST", payload=payload)
    if isinstance(result, dict) and result.get("failed"):
        # A multi-object write can return HTTP 200 with per-object failures.
        fail(f"Zotero rejected the note for {parent_key}: {result['failed']}")
    return result


def main() -> None:
    """Command-line entry point: generate notes for the selected items.

    ``--limit`` defaults to 1 for key/query selection and to no limit with
    ``--all``. With ``--all --skip-existing`` the limit counts generated
    notes, not scanned items, so repeated batch runs keep advancing through
    the library.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--item-key", action="append", default=[], help="Zotero top-level item key; can be repeated")
    parser.add_argument("--item-keys", help="Comma/space separated Zotero top-level item keys")
    parser.add_argument("--query", help="Search query; first top-level match is used")
    parser.add_argument("--all", action="store_true", help="Process all top-level Zotero items")
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Maximum number of items to process (default: 1, or no limit with --all; 0 = no limit)",
    )
    parser.add_argument("--fulltext-chars", type=int, default=12000)
    parser.add_argument("--skip-existing", action="store_true", help="Skip items that already have a generated AI child note")
    parser.add_argument("--dry-run", action="store_true", help="generate but do not write Zotero note")
    parser.add_argument("--vault", default=str(DEFAULT_VAULT), help="Obsidian vault containing 00 Templater")
    parser.add_argument("--env-file", help="Optional .env path; defaults to <vault>/.env")
    args = parser.parse_args()
    if args.limit is not None and args.limit < 0:
        parser.error("--limit must be zero or a positive integer")

    vault = Path(args.vault).expanduser().resolve()
    load_dotenv(Path(args.env_file).expanduser().resolve() if args.env_file else vault / ".env")
    load_awesomegpt_prefs()

    keys = list(args.item_key)
    if args.item_keys:
        keys.extend([key for key in re.split(r"[\s,]+", args.item_keys.strip()) if key])

    if args.all:
        limit = args.limit or 0
        # With --skip-existing the limit is enforced in the loop below, so the
        # whole library must be available as candidates.
        items = all_top_items(0 if args.skip_existing else limit)
    else:
        limit = 1 if args.limit is None else args.limit
        items = find_items(keys, args.query, limit)

    # A dry run writes nothing, so it must not require Zotero Web API access.
    user_id = "" if args.dry_run else resolve_user_id()

    results = []
    attempted = 0
    for index, item in enumerate(items, 1):
        if limit and attempted >= limit:
            break
        key = item.get("key")
        title = (item.get("data") or {}).get("title")
        try:
            if not key:
                results.append({"index": index, "status": "skipped", "reason": "missing key", "title": title})
                continue
            if args.skip_existing and has_existing_ai_note(key):
                results.append({"index": index, "itemKey": key, "title": title, "status": "skipped", "reason": "existing AI note"})
                continue
            attempted += 1
            print(f"[{index}/{len(items)}] generating {key}: {title}", file=sys.stderr)
            bibtex = export_bibtex(key)
            fulltext = local_fulltext(key, args.fulltext_chars)
            prompt = build_prompt(item, bibtex, fulltext, vault)
            markdown = call_llm(prompt)
            result = create_child_note(user_id, key, markdown, args.dry_run)
            results.append({"index": index, "itemKey": key, "title": title, "status": "ok", "result": result})
        except SystemExit as exc:
            # Prefer the failure text when the exit carries one; str(exc) is
            # only the exit status.
            results.append({
                "index": index,
                "itemKey": key,
                "title": title,
                "status": "error",
                "error": getattr(exc, "message", None) or str(exc),
            })
            print(f"[{index}/{len(items)}] error; continuing", file=sys.stderr)
            continue
        except Exception as exc:
            results.append({
                "index": index,
                "itemKey": key,
                "title": title,
                "status": "error",
                "error": repr(exc),
            })
            print(f"[{index}/{len(items)}] error; continuing: {exc}", file=sys.stderr)
            continue
    print(json.dumps(results, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()