#!/usr/bin/env python3 """ Generate an AI literature note from Zotero metadata and save it as a Zotero child note. Required environment variables: AWESOMEGPT_API_KEY DeepSeek/OpenAI-compatible API key AWESOMEGPT_BASE_URL Example: https://api.deepseek.com AWESOMEGPT_MODEL Example: deepseek-v4-pro ZOTERO_API_KEY Zotero Web API key with library write permission Optional: ZOTERO_USER_ID If omitted, resolved from /keys/current Private local config: \config\config.local.json, created by scripts\init_private_config.py """ from __future__ import annotations import argparse import html import importlib.util import json import os import re import sys import urllib.error import urllib.parse import urllib.request from pathlib import Path from typing import Any try: sys.stdout.reconfigure(encoding="utf-8") sys.stderr.reconfigure(encoding="utf-8") except Exception: pass LOCAL_ZOTERO = "http://127.0.0.1:23119/api/users/0" ZOTERO_WEB = "https://api.zotero.org" DEFAULT_VAULT = Path.cwd() SKILL_DIR = Path(__file__).resolve().parents[1] DEFAULT_PRIVATE_CONFIG = SKILL_DIR / "config" / "config.local.json" def fail(message: str) -> None: print(f"error: {message}", file=sys.stderr) raise SystemExit(1) def load_dotenv(path: Path) -> None: if not path.exists(): return for raw_line in path.read_text(encoding="utf-8").splitlines(): line = raw_line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) key = key.strip() value = value.strip().strip('"').strip("'") os.environ.setdefault(key, value) def load_private_config(path: Path) -> None: if not path.exists(): return try: config = json.loads(path.read_text(encoding="utf-8")) except json.JSONDecodeError as exc: fail(f"invalid private config JSON at {path}: {exc}") zotero = config.get("zotero", {}) if isinstance(config.get("zotero"), dict) else {} awesomegpt = config.get("awesomegpt", {}) if isinstance(config.get("awesomegpt"), dict) else {} mappings = { "ZOTERO_API_KEY": zotero.get("api_key"), "ZOTERO_USER_ID": zotero.get("user_id"), "AWESOMEGPT_API_KEY": awesomegpt.get("api_key"), "AWESOMEGPT_BASE_URL": awesomegpt.get("base_url"), "AWESOMEGPT_MODEL": awesomegpt.get("model"), } for key, value in mappings.items(): if value: os.environ.setdefault(key, str(value)) def zotero_profile_prefs() -> Path | None: profiles_ini = Path.home() / "AppData/Roaming/Zotero/Zotero/profiles.ini" profiles_root = profiles_ini.parent if profiles_ini.exists(): text = profiles_ini.read_text(encoding="utf-8", errors="replace") blocks = re.split(r"\n(?=\[Profile\d+\])", text) for block in blocks: if "Default=1" not in block: continue path_match = re.search(r"^Path=(.+)$", block, re.MULTILINE) relative_match = re.search(r"^IsRelative=(\d+)$", block, re.MULTILINE) if path_match: profile_path = Path(path_match.group(1).strip()) if relative_match and relative_match.group(1) == "1": profile_path = profiles_root / profile_path prefs = profile_path / "prefs.js" if prefs.exists(): return prefs profiles_dir = profiles_root / "Profiles" if profiles_dir.exists(): for prefs in profiles_dir.glob("*/prefs.js"): return prefs return None def load_awesomegpt_prefs(path: Path | None = None) -> None: if path is None: path = zotero_profile_prefs() if path is None: return if not path.exists(): return text = path.read_text(encoding="utf-8", errors="replace") prefs: dict[str, Any] = {} for name, raw_value in re.findall(r'user_pref\("([^"]+)",\s*(.*?)\);', text): if not name.startswith("extensions.zotero.zoterogpt."): continue try: prefs[name] = json.loads(raw_value) except json.JSONDecodeError: continue settings_raw = prefs.get("extensions.zotero.zoterogpt.settings") if isinstance(settings_raw, str): try: settings = json.loads(settings_raw) except json.JSONDecodeError: settings = {} else: settings = {} direct_api = prefs.get("extensions.zotero.zoterogpt.api") direct_model = prefs.get("extensions.zotero.zoterogpt.model") direct_key = prefs.get("extensions.zotero.zoterogpt.secretKey") provider = None if isinstance(settings, dict): provider = settings.get("DeepSeek") or next( (value for key, value in settings.items() if key.lower() == "deepseek"), None, ) if isinstance(provider, dict): os.environ.setdefault("AWESOMEGPT_BASE_URL", provider.get("api") or "") os.environ.setdefault("AWESOMEGPT_MODEL", provider.get("model") or "") os.environ.setdefault("AWESOMEGPT_API_KEY", provider.get("secretKey") or "") if direct_api: os.environ.setdefault("AWESOMEGPT_BASE_URL", str(direct_api)) if direct_model: os.environ.setdefault("AWESOMEGPT_MODEL", str(direct_model)) if direct_key: os.environ.setdefault("AWESOMEGPT_API_KEY", str(direct_key)) def http_json( url: str, *, method: str = "GET", headers: dict[str, str] | None = None, payload: Any = None, timeout: int = 90, ) -> Any: body = None req_headers = dict(headers or {}) if payload is not None: body = json.dumps(payload, ensure_ascii=False).encode("utf-8") req_headers.setdefault("Content-Type", "application/json") request = urllib.request.Request(url, data=body, method=method, headers=req_headers) try: with urllib.request.urlopen(request, timeout=timeout) as response: text = response.read().decode("utf-8", errors="replace") if not text: return None return json.loads(text) except urllib.error.HTTPError as exc: detail = exc.read().decode("utf-8", errors="replace") fail(f"{method} {url} failed: HTTP {exc.code}: {detail[:800]}") except urllib.error.URLError as exc: fail(f"{method} {url} failed: {exc}") def zotero_local(path: str) -> Any: url = LOCAL_ZOTERO + path return http_json(url, headers={"Zotero-API-Version": "3"}, timeout=20) def zotero_local_optional(path: str) -> Any | None: url = LOCAL_ZOTERO + path request = urllib.request.Request(url, headers={"Zotero-API-Version": "3"}) try: with urllib.request.urlopen(request, timeout=20) as response: text = response.read().decode("utf-8", errors="replace") return json.loads(text) if text else None except (urllib.error.HTTPError, urllib.error.URLError, json.JSONDecodeError): return None def zotero_web(path: str, *, method: str = "GET", payload: Any = None) -> Any: api_key = os.environ.get("ZOTERO_API_KEY") if not api_key: fail("ZOTERO_API_KEY is required to write Zotero child notes") url = ZOTERO_WEB + path return http_json( url, method=method, headers={"Zotero-API-Version": "3", "Zotero-API-Key": api_key}, payload=payload, timeout=60, ) def resolve_user_id() -> str: explicit = os.environ.get("ZOTERO_USER_ID") if explicit: return explicit current = zotero_web("/keys/current") user_id = current.get("userID") if isinstance(current, dict) else None if not user_id: fail("could not resolve Zotero userID from /keys/current") return str(user_id) def find_item(item_key: str | None, query: str | None) -> dict[str, Any]: if item_key: return zotero_local(f"/items/{urllib.parse.quote(item_key)}") if not query: fail("provide --item-key or --query") qs = urllib.parse.urlencode({"q": query, "limit": 5}) matches = zotero_local(f"/items/top?{qs}") if not matches: fail(f"no Zotero item matched query: {query}") if len(matches) > 1: print(f"warning: {len(matches)} matches; using {matches[0].get('key')}", file=sys.stderr) return matches[0] def find_items(keys: list[str], query: str | None, limit: int) -> list[dict[str, Any]]: items: list[dict[str, Any]] = [] for key in keys: items.append(find_item(key, None)) if query: qs = urllib.parse.urlencode({"q": query, "limit": limit}) matches = zotero_local(f"/items/top?{qs}") seen = {item.get("key") for item in items} for item in matches or []: if item.get("key") not in seen: items.append(item) seen.add(item.get("key")) if not items: fail("provide --item-key/--item-keys or --query") return items[:limit] if limit else items def all_top_items(limit: int = 0) -> list[dict[str, Any]]: items: list[dict[str, Any]] = [] start = 0 page_limit = 100 while True: qs = urllib.parse.urlencode({"limit": page_limit, "start": start}) page = zotero_local(f"/items/top?{qs}") if not page: break items.extend(page) if limit and len(items) >= limit: return items[:limit] if len(page) < page_limit: break start += page_limit return items def has_existing_ai_note(parent_key: str) -> bool: children = zotero_local(f"/items/{urllib.parse.quote(parent_key)}/children") for child in children or []: data = child.get("data") or {} if data.get("itemType") != "note": continue note = data.get("note") or "" if not note and child.get("key"): full_note = zotero_local_optional(f"/items/{urllib.parse.quote(child['key'])}") note = ((full_note or {}).get("data") or {}).get("note") or "" if "AI文献笔记" in note or "AI Literature Note" in note or f"items/{parent_key}" in note: return True return False def export_bibtex(item_key: str) -> str: qs = urllib.parse.urlencode({"itemKey": item_key, "format": "bibtex"}) url = f"{LOCAL_ZOTERO}/items?{qs}" request = urllib.request.Request(url, headers={"Zotero-API-Version": "3"}) with urllib.request.urlopen(request, timeout=20) as response: return response.read().decode("utf-8", errors="replace").strip() def local_fulltext(parent_key: str, max_chars: int) -> str: children = zotero_local(f"/items/{urllib.parse.quote(parent_key)}/children") parts: list[str] = [] for child in children or []: data = child.get("data") or {} if data.get("itemType") != "attachment": continue key = child.get("key") if not key: continue fulltext = zotero_local_optional(f"/items/{urllib.parse.quote(key)}/fulltext") if not fulltext: continue content = fulltext.get("content") if isinstance(fulltext, dict) else "" if content: parts.append(content) if sum(len(p) for p in parts) >= max_chars: break if not parts: for child in children or []: data = child.get("data") or {} if data.get("itemType") != "attachment" or data.get("contentType") != "application/pdf": continue path = data.get("path") if not path: key = child.get("key") if key: full = zotero_local_optional(f"/items/{urllib.parse.quote(key)}") path = ((full or {}).get("data") or {}).get("path") if not path: continue extracted = extract_pdf_text(Path(path), max_chars=max_chars) if extracted: parts.append(extracted) break text = "\n\n".join(parts) return text[:max_chars] def template_paths(vault: Path) -> tuple[Path, Path, Path]: templates = vault / "00 Templater" if not templates.exists(): fail(f"template directory not found: {templates}") files = {path.name: path for path in templates.glob("*.md")} def match(prefix: str, contains: str) -> Path: candidates = [ path for name, path in files.items() if name.startswith(prefix) and contains in name ] if not candidates: candidates = [path for name, path in files.items() if name.startswith(prefix)] if not candidates: fail(f"missing template file starting with {prefix} in {templates}") return candidates[0] return ( match("03", "AI"), match("01", ""), match("02", ""), ) def extract_pdf_text(path: Path, max_chars: int) -> str: if not path.exists(): return "" max_pages = 8 if importlib.util.find_spec("fitz"): import fitz # type: ignore chunks = [] with fitz.open(str(path)) as doc: for page in doc[:max_pages]: chunks.append(page.get_text("text")) if sum(len(chunk) for chunk in chunks) >= max_chars: break return "\n".join(chunks)[:max_chars] if importlib.util.find_spec("pypdf"): from pypdf import PdfReader # type: ignore reader = PdfReader(str(path)) chunks = [] for page in reader.pages[:max_pages]: chunks.append(page.extract_text() or "") if sum(len(chunk) for chunk in chunks) >= max_chars: break return "\n".join(chunks)[:max_chars] if importlib.util.find_spec("PyPDF2"): from PyPDF2 import PdfReader # type: ignore reader = PdfReader(str(path)) chunks = [] for page in reader.pages[:max_pages]: chunks.append(page.extract_text() or "") if sum(len(chunk) for chunk in chunks) >= max_chars: break return "\n".join(chunks)[:max_chars] return "" def creators_text(creators: list[dict[str, Any]]) -> str: names = [] for creator in creators: name = creator.get("name") if not name: name = " ".join(x for x in [creator.get("firstName"), creator.get("lastName")] if x) if name: names.append(name) return "; ".join(names) def build_prompt(item: dict[str, Any], bibtex: str, fulltext: str, vault: Path) -> str: data = item.get("data") or {} prompt_path, research_template_path, review_template_path = template_paths(vault) prompt = prompt_path.read_text(encoding="utf-8") research_template = research_template_path.read_text(encoding="utf-8") review_template = review_template_path.read_text(encoding="utf-8") metadata = { "zoteroKey": item.get("key"), "title": data.get("title"), "itemType": data.get("itemType"), "authors": creators_text(data.get("creators") or []), "publicationTitle": data.get("publicationTitle"), "date": data.get("date"), "DOI": data.get("DOI"), "url": data.get("url"), "abstractNote": data.get("abstractNote"), } source = { "metadata": metadata, "bibtex": bibtex, "indexedFullTextExcerpt": fulltext, } return "\n\n".join( [ prompt, "请先判断文献类型:综述型文献或研究型文献。", "若为研究型文献,请严格填充下面的研究型模板:", research_template, "若为综述型文献,请严格填充下面的综述型模板:", review_template, "请将模板中的 ${topItem.getField('title')} 替换为真实题名,将 ${topItem.key} 替换为 Zotero key。", "只输出最终 Markdown 笔记,不要输出解释、判断过程或代码围栏。", "文献材料如下:", json.dumps(source, ensure_ascii=False, indent=2), ] ) def call_llm(prompt: str) -> str: api_key = os.environ.get("AWESOMEGPT_API_KEY") base_url = (os.environ.get("AWESOMEGPT_BASE_URL") or "").rstrip("/") model = os.environ.get("AWESOMEGPT_MODEL") if not api_key or not base_url or not model: fail("AWESOMEGPT_API_KEY, AWESOMEGPT_BASE_URL, and AWESOMEGPT_MODEL are required") if not base_url.endswith("/v1"): base_url = base_url + "/v1" payload = { "model": model, "messages": [ {"role": "system", "content": "You are a materials science literature-note assistant. Output Simplified Chinese Markdown only."}, {"role": "user", "content": prompt}, ], "temperature": 0.3, } response = http_json( base_url + "/chat/completions", method="POST", headers={"Authorization": f"Bearer {api_key}"}, payload=payload, timeout=180, ) try: return response["choices"][0]["message"]["content"].strip() except Exception as exc: fail(f"unexpected LLM response shape: {exc}; response={response}") def markdown_to_zotero_html(markdown: str) -> str: lines = markdown.strip().splitlines() out: list[str] = [] in_pre = False pre_lines: list[str] = [] for line in lines: stripped = line.strip() if stripped.startswith("```") or stripped.startswith("~~~"): if in_pre: out.append("
" + html.escape("\n".join(pre_lines)) + "
") pre_lines = [] in_pre = False else: in_pre = True continue if in_pre: pre_lines.append(line) continue if not stripped: continue heading = re.match(r"^(#{1,6})\s+(.+)$", stripped) if heading: level = min(len(heading.group(1)), 6) out.append(f"{html.escape(heading.group(2))}") elif stripped.startswith(">"): out.append(f"
{html.escape(stripped.lstrip('> ').strip())}
") elif re.match(r"^[-*]\s+", stripped): out.append(f"

{html.escape(stripped)}

") else: out.append(f"

{html.escape(stripped)}

") if in_pre: out.append("
" + html.escape("\n".join(pre_lines)) + "
") return "\n".join(out) def create_child_note(user_id: str, parent_key: str, markdown: str, dry_run: bool) -> Any: note_html = markdown_to_zotero_html(markdown) payload = [{"itemType": "note", "parentItem": parent_key, "note": note_html}] if dry_run: return {"dryRun": True, "payload": payload} return zotero_web(f"/users/{user_id}/items", method="POST", payload=payload) def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--item-key", action="append", default=[], help="Zotero top-level item key; can be repeated") parser.add_argument("--item-keys", help="Comma/space separated Zotero top-level item keys") parser.add_argument("--query", help="Search query; first top-level match is used") parser.add_argument("--all", action="store_true", help="Process all top-level Zotero items") parser.add_argument("--limit", type=int, default=1, help="Maximum number of items to process") parser.add_argument("--fulltext-chars", type=int, default=12000) parser.add_argument("--skip-existing", action="store_true", help="Skip items that already have a generated AI child note") parser.add_argument("--dry-run", action="store_true", help="generate but do not write Zotero note") parser.add_argument("--vault", default=str(DEFAULT_VAULT), help="Obsidian vault containing 00 Templater") parser.add_argument("--env-file", help="Optional .env path; defaults to /.env") parser.add_argument("--config", default=str(DEFAULT_PRIVATE_CONFIG), help="Private local config JSON") args = parser.parse_args() vault = Path(args.vault).expanduser().resolve() load_dotenv(Path(args.env_file).expanduser().resolve() if args.env_file else vault / ".env") load_private_config(Path(args.config).expanduser().resolve()) load_awesomegpt_prefs() keys = list(args.item_key) if args.item_keys: keys.extend([key for key in re.split(r"[\s,]+", args.item_keys.strip()) if key]) if args.all: items = all_top_items(args.limit if args.limit != 1 else 0) else: items = find_items(keys, args.query, args.limit) user_id = resolve_user_id() results = [] for index, item in enumerate(items, 1): try: key = item.get("key") title = (item.get("data") or {}).get("title") if not key: results.append({"index": index, "status": "skipped", "reason": "missing key", "title": title}) continue if args.skip_existing and has_existing_ai_note(key): results.append({"index": index, "itemKey": key, "title": title, "status": "skipped", "reason": "existing AI note"}) continue print(f"[{index}/{len(items)}] generating {key}: {title}", file=sys.stderr) bibtex = export_bibtex(key) fulltext = local_fulltext(key, args.fulltext_chars) prompt = build_prompt(item, bibtex, fulltext, vault) markdown = call_llm(prompt) result = create_child_note(user_id, key, markdown, args.dry_run) results.append({"index": index, "itemKey": key, "title": title, "status": "ok", "result": result}) except SystemExit as exc: results.append({ "index": index, "itemKey": item.get("key"), "title": (item.get("data") or {}).get("title"), "status": "error", "error": str(exc), }) print(f"[{index}/{len(items)}] error; continuing", file=sys.stderr) continue except Exception as exc: results.append({ "index": index, "itemKey": item.get("key"), "title": (item.get("data") or {}).get("title"), "status": "error", "error": repr(exc), }) print(f"[{index}/{len(items)}] error; continuing: {exc}", file=sys.stderr) continue print(json.dumps(results, ensure_ascii=False, indent=2)) if __name__ == "__main__": main()