#!/usr/bin/env python3
"""
Generate an AI literature note from Zotero metadata and save it as a Zotero child note.

Required environment variables:
  AWESOMEGPT_API_KEY      DeepSeek/OpenAI-compatible API key
  AWESOMEGPT_BASE_URL     Example: https://api.deepseek.com
  AWESOMEGPT_MODEL        Example: deepseek-v4-pro
  ZOTERO_API_KEY          Zotero Web API key with library write permission
                          (not needed for --dry-run)

Optional:
  ZOTERO_USER_ID          If omitted, resolved from /keys/current

Private local config:
  <skill dir>/config/config.local.json, created by scripts/init_private_config.py

Every loader only fills variables that are still unset, so the effective
precedence is: process environment, then the .env file, then the private
config JSON, then the Zotero GPT plugin preferences.
"""

from __future__ import annotations

import argparse
import html
import importlib.util
import json
import os
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from collections.abc import Iterable
from pathlib import Path
from typing import Any, NoReturn

# Best effort: some replaced/redirected streams do not support reconfigure().
try:
    sys.stdout.reconfigure(encoding="utf-8")
    sys.stderr.reconfigure(encoding="utf-8")
except Exception:
    pass

LOCAL_ZOTERO = "http://127.0.0.1:23119/api/users/0"
ZOTERO_WEB = "https://api.zotero.org"
DEFAULT_VAULT = Path.cwd()
SKILL_DIR = Path(__file__).resolve().parents[1]
DEFAULT_PRIVATE_CONFIG = SKILL_DIR / "config" / "config.local.json"
WORKSPACE_PYDEPS = Path.home() / "Documents" / "Obsidian Vault" / ".codex_tmp" / "pydeps"

# Make optional PDF libraries installed into the workspace folder importable.
if WORKSPACE_PYDEPS.exists():
    sys.path.insert(0, str(WORKSPACE_PYDEPS))


def fail(message: str) -> NoReturn:
    """Print ``error: <message>`` to stderr and abort with exit status 1."""
    print(f"error: {message}", file=sys.stderr)
    raise SystemExit(1)


def _set_env_default(name: str, value: Any) -> None:
    """Set environment variable *name* to ``str(value)`` if unset and *value* is truthy."""
    if value:
        os.environ.setdefault(name, str(value))


def load_dotenv(path: Path) -> None:
    """Load ``KEY=VALUE`` lines from *path* into the environment.

    Blank lines, ``#`` comments and lines without ``=`` are ignored.
    Surrounding quotes are stripped from values. Variables that are already
    set are left untouched. A missing file is not an error.
    """
    if not path.exists():
        return
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        os.environ.setdefault(key.strip(), value.strip().strip('"').strip("'"))


def load_private_config(path: Path) -> None:
    """Load API settings from the private JSON config at *path*.

    Reads the ``zotero`` and ``awesomegpt`` objects and exports their values
    as environment variables that are not already set. A missing file is
    ignored; malformed JSON or a non-object top level aborts via ``fail``.
    """
    if not path.exists():
        return
    try:
        config = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        fail(f"invalid private config JSON at {path}: {exc}")
    if not isinstance(config, dict):
        # Without this guard a top-level list/string crashed on ``config.get``.
        fail(f"invalid private config JSON at {path}: top-level value must be an object")

    def section(name: str) -> dict[str, Any]:
        value = config.get(name)
        return value if isinstance(value, dict) else {}

    zotero = section("zotero")
    awesomegpt = section("awesomegpt")
    mappings = {
        "ZOTERO_API_KEY": zotero.get("api_key"),
        "ZOTERO_USER_ID": zotero.get("user_id"),
        "AWESOMEGPT_API_KEY": awesomegpt.get("api_key"),
        "AWESOMEGPT_BASE_URL": awesomegpt.get("base_url"),
        "AWESOMEGPT_MODEL": awesomegpt.get("model"),
    }
    for key, value in mappings.items():
        _set_env_default(key, value)


def zotero_profile_prefs() -> Path | None:
    """Locate ``prefs.js`` of the Zotero profile under ``AppData/Roaming``.

    The profile marked ``Default=1`` in ``profiles.ini`` is preferred. If it
    cannot be resolved, the first ``Profiles/*/prefs.js`` (sorted by path) is
    used. Returns ``None`` when nothing is found.
    """
    profiles_ini = Path.home() / "AppData/Roaming/Zotero/Zotero/profiles.ini"
    profiles_root = profiles_ini.parent
    if profiles_ini.exists():
        text = profiles_ini.read_text(encoding="utf-8", errors="replace")
        for block in re.split(r"\n(?=\[Profile\d+\])", text):
            if "Default=1" not in block:
                continue
            path_match = re.search(r"^Path=(.+)$", block, re.MULTILINE)
            if not path_match:
                continue
            relative_match = re.search(r"^IsRelative=(\d+)$", block, re.MULTILINE)
            profile_path = Path(path_match.group(1).strip())
            if relative_match and relative_match.group(1) == "1":
                profile_path = profiles_root / profile_path
            prefs = profile_path / "prefs.js"
            if prefs.exists():
                return prefs
    profiles_dir = profiles_root / "Profiles"
    if profiles_dir.exists():
        # Sorted so the pick is deterministic when several profiles exist.
        return next(iter(sorted(profiles_dir.glob("*/prefs.js"))), None)
    return None


def load_awesomegpt_prefs(path: Path | None = None) -> None:
    """Import LLM settings from the Zotero GPT plugin preferences.

    Parses ``user_pref("extensions.zotero.zoterogpt.*", ...)`` entries from
    *path* (default: the detected Zotero profile ``prefs.js``). The
    ``DeepSeek`` provider entry in the ``settings`` preference is tried first,
    then the direct ``api`` / ``model`` / ``secretKey`` preferences.
    Environment variables that are already set are never overwritten.
    """
    if path is None:
        path = zotero_profile_prefs()
    if path is None or not path.exists():
        return
    text = path.read_text(encoding="utf-8", errors="replace")
    prefix = "extensions.zotero.zoterogpt."
    prefs: dict[str, Any] = {}
    for name, raw_value in re.findall(r'user_pref\("([^"]+)",\s*(.*?)\);', text):
        if not name.startswith(prefix):
            continue
        try:
            prefs[name] = json.loads(raw_value)
        except json.JSONDecodeError:
            continue  # not a JSON-compatible literal; ignore this preference

    settings: Any = {}
    settings_raw = prefs.get(prefix + "settings")
    if isinstance(settings_raw, str):
        try:
            settings = json.loads(settings_raw)
        except json.JSONDecodeError:
            settings = {}

    provider = None
    if isinstance(settings, dict):
        provider = settings.get("DeepSeek") or next(
            (value for key, value in settings.items() if key.lower() == "deepseek"),
            None,
        )
    if not isinstance(provider, dict):
        provider = {}

    # Missing provider fields must NOT be exported as "": an empty value would
    # still count as "set" and block the direct-preference fallback.
    for env_name, pref_name in (
        ("AWESOMEGPT_BASE_URL", "api"),
        ("AWESOMEGPT_MODEL", "model"),
        ("AWESOMEGPT_API_KEY", "secretKey"),
    ):
        _set_env_default(env_name, provider.get(pref_name))
        _set_env_default(env_name, prefs.get(prefix + pref_name))


def http_json(
    url: str,
    *,
    method: str = "GET",
    headers: dict[str, str] | None = None,
    payload: Any = None,
    timeout: int = 90,
) -> Any:
    """Perform an HTTP request and return the decoded JSON response.

    *payload*, when given, is sent as a UTF-8 JSON body. Returns ``None`` for
    an empty response body. HTTP errors, connection errors and invalid JSON
    abort via ``fail``.
    """
    body = None
    req_headers = dict(headers or {})
    if payload is not None:
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        req_headers.setdefault("Content-Type", "application/json")
    request = urllib.request.Request(url, data=body, method=method, headers=req_headers)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            text = response.read().decode("utf-8", errors="replace")
        return json.loads(text) if text else None
    except urllib.error.HTTPError as exc:  # must precede URLError (subclass)
        detail = exc.read().decode("utf-8", errors="replace")
        fail(f"{method} {url} failed: HTTP {exc.code}: {detail[:800]}")
    except urllib.error.URLError as exc:
        fail(f"{method} {url} failed: {exc}")
    except json.JSONDecodeError as exc:
        fail(f"{method} {url} returned invalid JSON: {exc}")


def zotero_local(path: str) -> Any:
    """GET *path* from the local Zotero API; abort via ``fail`` on error."""
    return http_json(LOCAL_ZOTERO + path, headers={"Zotero-API-Version": "3"}, timeout=20)


def zotero_local_optional(path: str) -> Any | None:
    """GET *path* from the local Zotero API, returning ``None`` on any failure."""
    request = urllib.request.Request(LOCAL_ZOTERO + path, headers={"Zotero-API-Version": "3"})
    try:
        with urllib.request.urlopen(request, timeout=20) as response:
            text = response.read().decode("utf-8", errors="replace")
        return json.loads(text) if text else None
    except (urllib.error.HTTPError, urllib.error.URLError, json.JSONDecodeError):
        return None


def zotero_web(path: str, *, method: str = "GET", payload: Any = None) -> Any:
    """Call the Zotero Web API at *path* using ``ZOTERO_API_KEY``."""
    api_key = os.environ.get("ZOTERO_API_KEY")
    if not api_key:
        fail("ZOTERO_API_KEY is required to write Zotero child notes")
    return http_json(
        ZOTERO_WEB + path,
        method=method,
        headers={"Zotero-API-Version": "3", "Zotero-API-Key": api_key},
        payload=payload,
        timeout=60,
    )


def resolve_user_id() -> str:
    """Return ``ZOTERO_USER_ID`` or look the user ID up via ``/keys/current``."""
    explicit = os.environ.get("ZOTERO_USER_ID")
    if explicit:
        return explicit
    current = zotero_web("/keys/current")
    user_id = current.get("userID") if isinstance(current, dict) else None
    if not user_id:
        fail("could not resolve Zotero userID from /keys/current")
    return str(user_id)


def find_item(item_key: str | None, query: str | None) -> dict[str, Any]:
    """Fetch one item by key, or the first top-level match for *query*."""
    if item_key:
        return zotero_local(f"/items/{urllib.parse.quote(item_key)}")
    if not query:
        fail("provide --item-key or --query")
    qs = urllib.parse.urlencode({"q": query, "limit": 5})
    matches = zotero_local(f"/items/top?{qs}")
    if not matches:
        fail(f"no Zotero item matched query: {query}")
    if len(matches) > 1:
        print(f"warning: {len(matches)} matches; using {matches[0].get('key')}", file=sys.stderr)
    return matches[0]


def find_items(keys: list[str], query: str | None, limit: int) -> list[dict[str, Any]]:
    """Collect items for explicit *keys* plus search matches for *query*.

    Search matches whose key is already present are skipped. The result is
    truncated to *limit* items when *limit* is non-zero. Aborts via ``fail``
    when nothing was selected.
    """
    items: list[dict[str, Any]] = [find_item(key, None) for key in keys]
    if query:
        qs = urllib.parse.urlencode({"q": query, "limit": limit})
        matches = zotero_local(f"/items/top?{qs}")
        seen = {item.get("key") for item in items}
        for item in matches or []:
            if item.get("key") not in seen:
                items.append(item)
                seen.add(item.get("key"))
    if not items:
        fail("provide --item-key/--item-keys or --query")
    return items[:limit] if limit else items


def all_top_items(limit: int = 0) -> list[dict[str, Any]]:
    """Page through all top-level items; stop early at *limit* (0 = no limit)."""
    items: list[dict[str, Any]] = []
    start = 0
    page_limit = 100
    while True:
        qs = urllib.parse.urlencode({"limit": page_limit, "start": start})
        page = zotero_local(f"/items/top?{qs}")
        if not page:
            break
        items.extend(page)
        if limit and len(items) >= limit:
            return items[:limit]
        if len(page) < page_limit:
            break  # short page: nothing left to fetch
        start += page_limit
    return items


def has_existing_ai_note(parent_key: str) -> bool:
    """Return True if *parent_key* already has a child note that looks generated.

    A note counts when it contains one of the AI-note markers or a link
    fragment ``items/<parent_key>``.
    """
    children = zotero_local(f"/items/{urllib.parse.quote(parent_key)}/children")
    for child in children or []:
        data = child.get("data") or {}
        if data.get("itemType") != "note":
            continue
        note = data.get("note") or ""
        if not note and child.get("key"):
            # The child listing carried no note body; fetch the note item itself.
            full_note = zotero_local_optional(f"/items/{urllib.parse.quote(child['key'])}")
            note = ((full_note or {}).get("data") or {}).get("note") or ""
        if "AI文献笔记" in note or "AI Literature Note" in note or f"items/{parent_key}" in note:
            return True
    return False


def export_bibtex(item_key: str) -> str:
    """Return the BibTeX export of *item_key* from the local Zotero API.

    Network/HTTP errors propagate to the caller as ``urllib`` exceptions.
    """
    qs = urllib.parse.urlencode({"itemKey": item_key, "format": "bibtex"})
    request = urllib.request.Request(
        f"{LOCAL_ZOTERO}/items?{qs}", headers={"Zotero-API-Version": "3"}
    )
    with urllib.request.urlopen(request, timeout=20) as response:
        return response.read().decode("utf-8", errors="replace").strip()


def local_fulltext(parent_key: str, max_chars: int) -> str:
    """Return up to *max_chars* characters of full text for *parent_key*.

    The indexed full text of the item's attachments is preferred. If none is
    available, text is extracted from the first PDF attachment that yields any.
    """
    children = zotero_local(f"/items/{urllib.parse.quote(parent_key)}/children") or []
    attachments = [
        child for child in children if (child.get("data") or {}).get("itemType") == "attachment"
    ]

    parts: list[str] = []
    total = 0
    for child in attachments:
        key = child.get("key")
        if not key:
            continue
        fulltext = zotero_local_optional(f"/items/{urllib.parse.quote(key)}/fulltext")
        content = fulltext.get("content") if isinstance(fulltext, dict) else ""
        if content:
            parts.append(content)
            total += len(content)
        if total >= max_chars:
            break

    if not parts:
        for child in attachments:
            data = child.get("data") or {}
            if data.get("contentType") != "application/pdf":
                continue
            path = data.get("path")
            key = child.get("key")
            if not path and key:
                # The child listing carried no path; fetch the attachment item itself.
                full = zotero_local_optional(f"/items/{urllib.parse.quote(key)}")
                path = ((full or {}).get("data") or {}).get("path")
            extracted = extract_pdf_text(resolve_attachment_path(child, path), max_chars=max_chars)
            if extracted:
                parts.append(extracted)
                break

    return "\n\n".join(parts)[:max_chars]


def resolve_attachment_path(child: dict[str, Any], path: str | None) -> Path:
    """Resolve the on-disk file of an attachment.

    Uses *path* when it exists; otherwise the first ``*.pdf`` (sorted) in
    ``~/Zotero/storage/<child key>``. Falls back to ``Path(path or "")``,
    which may not point at a file.
    """
    if path:
        candidate = Path(path)
        if candidate.exists():
            return candidate
    key = child.get("key")
    if key:
        storage_dir = Path.home() / "Zotero" / "storage" / str(key)
        if storage_dir.exists():
            pdfs = sorted(storage_dir.glob("*.pdf"))
            if pdfs:
                return pdfs[0]
    return Path(path or "")


def template_paths(vault: Path) -> tuple[Path, Path, Path]:
    """Return the (AI prompt, research template, review template) paths.

    Templates live in ``<vault>/00 Templater`` and are selected by file-name
    prefix ``03`` (preferring a name containing ``AI``), ``01`` and ``02``.
    Aborts via ``fail`` when the directory or a template is missing.
    """
    templates = vault / "00 Templater"
    if not templates.exists():
        fail(f"template directory not found: {templates}")
    # Sorted so the pick is deterministic when several files share a prefix.
    files = {path.name: path for path in sorted(templates.glob("*.md"))}

    def pick(prefix: str, contains: str) -> Path:
        prefixed = [path for name, path in files.items() if name.startswith(prefix)]
        preferred = [path for path in prefixed if contains in path.name]
        candidates = preferred or prefixed
        if not candidates:
            fail(f"missing template file starting with {prefix} in {templates}")
        return candidates[0]

    return (
        pick("03", "AI"),
        pick("01", ""),
        pick("02", ""),
    )


def _join_until(texts: Iterable[str], max_chars: int) -> str:
    """Join *texts* with newlines, consuming only until *max_chars* is reached."""
    chunks: list[str] = []
    total = 0
    for text in texts:
        chunks.append(text)
        total += len(text)
        if total >= max_chars:
            break
    return "\n".join(chunks)[:max_chars]


def extract_pdf_text(path: Path, max_chars: int) -> str:
    """Extract up to *max_chars* characters of text from the PDF at *path*.

    Uses the first available of PyMuPDF (``fitz``), ``pypdf`` and ``PyPDF2``.
    Returns ``""`` when *path* is not a regular file or no library is installed.
    """
    # is_file(), not exists(): an unresolved attachment yields Path(""), i.e.
    # the current directory, which exists but must not be opened as a PDF.
    if not path.is_file():
        return ""
    max_pages = 80 if max_chars >= 50000 else 12
    if importlib.util.find_spec("fitz"):
        import fitz  # type: ignore

        with fitz.open(str(path)) as doc:
            return _join_until((page.get_text("text") for page in doc[:max_pages]), max_chars)
    for module_name in ("pypdf", "PyPDF2"):
        if importlib.util.find_spec(module_name):
            reader = importlib.import_module(module_name).PdfReader(str(path))
            return _join_until(
                (page.extract_text() or "" for page in reader.pages[:max_pages]), max_chars
            )
    return ""


def creators_text(creators: list[dict[str, Any]]) -> str:
    """Format Zotero creators as ``"Name; First Last; ..."``; nameless entries are skipped."""
    names = []
    for creator in creators:
        name = creator.get("name")
        if not name:
            name = " ".join(x for x in [creator.get("firstName"), creator.get("lastName")] if x)
        if name:
            names.append(name)
    return "; ".join(names)


def build_prompt(item: dict[str, Any], bibtex: str, fulltext: str, vault: Path, mode: str) -> str:
    """Assemble the LLM prompt from the vault templates and the item material.

    *mode* ``"deep"`` adds the instructions for an exhaustive close-reading
    note; any other value produces the standard prompt.
    """
    data = item.get("data") or {}
    prompt_path, research_template_path, review_template_path = template_paths(vault)
    prompt = prompt_path.read_text(encoding="utf-8")
    research_template = research_template_path.read_text(encoding="utf-8")
    review_template = review_template_path.read_text(encoding="utf-8")
    metadata = {
        "zoteroKey": item.get("key"),
        "title": data.get("title"),
        "itemType": data.get("itemType"),
        "authors": creators_text(data.get("creators") or []),
        "publicationTitle": data.get("publicationTitle"),
        "date": data.get("date"),
        "DOI": data.get("DOI"),
        "url": data.get("url"),
        "abstractNote": data.get("abstractNote"),
    }
    source = {
        "metadata": metadata,
        "bibtex": bibtex,
        "indexedFullTextExcerpt": fulltext,
    }
    instructions = [
        prompt,
        "请先判断文献类型:综述型文献或研究型文献。",
        "若为研究型文献,请严格填充下面的研究型模板:",
        research_template,
        "若为综述型文献,请严格填充下面的综述型模板:",
        review_template,
        "请将模板中的 ${topItem.getField('title')} 替换为真实题名,将 ${topItem.key} 替换为 Zotero key。",
    ]
    if mode == "deep":
        instructions.extend(
            [
                "这是满血精读模式。请优先追求完整、具体、可复用的文献阅读笔记,而不是短摘要。",
                "每个模板栏目都要充分展开;如果原文提供了材料配比、制备参数、测试条件、器件结构、性能数据、机理解释或对照实验,必须具体写出。",
                "核心数据必须尽量保留数值和单位;不要用'显著提高'、'性能优异'这类空泛表达替代具体结果。",
                "机制部分要写清因果链条:材料/结构设计如何影响微观结构、界面、导电网络、离子/电子传输、力学响应或器件输出。",
                "局限与启发部分要给出可用于后续课题设计的具体启发,而不是泛泛评价。",
                "如果全文摘录中缺少某项信息,请明确写'原文摘录中未提供',不要编造。",
            ]
        )
    instructions.extend(
        [
            "只输出最终 Markdown 笔记,不要输出解释、判断过程或代码围栏。",
            "文献材料如下:",
            json.dumps(source, ensure_ascii=False, indent=2),
        ]
    )
    return "\n\n".join(instructions)


def call_llm(prompt: str, max_tokens: int | None = None) -> str:
    """Send *prompt* to the OpenAI-compatible chat endpoint and return the reply text.

    ``/v1`` is appended to the base URL when it is missing. Aborts via
    ``fail`` when settings are missing or the response has an unexpected shape.
    """
    api_key = os.environ.get("AWESOMEGPT_API_KEY")
    base_url = (os.environ.get("AWESOMEGPT_BASE_URL") or "").rstrip("/")
    model = os.environ.get("AWESOMEGPT_MODEL")
    if not api_key or not base_url or not model:
        fail("AWESOMEGPT_API_KEY, AWESOMEGPT_BASE_URL, and AWESOMEGPT_MODEL are required")
    if not base_url.endswith("/v1"):
        base_url = base_url + "/v1"
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": "You are a materials science literature-note assistant. Output Simplified Chinese Markdown only."},
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.3,
    }
    if max_tokens:
        payload["max_tokens"] = max_tokens
    response = http_json(
        base_url + "/chat/completions",
        method="POST",
        headers={"Authorization": f"Bearer {api_key}"},
        payload=payload,
        timeout=180,
    )
    try:
        return response["choices"][0]["message"]["content"].strip()
    except (KeyError, IndexError, TypeError, AttributeError) as exc:
        fail(f"unexpected LLM response shape: {exc}; response={response}")


def _element(tag: str, inner_html: str) -> str:
    """Wrap already-escaped *inner_html* in an HTML element named *tag*."""
    return f"<{tag}>{inner_html}</{tag}>"


def markdown_to_zotero_html(markdown: str) -> str:
    """Convert a Markdown note into the simple HTML stored in Zotero notes.

    Block-level handling only: ATX headings become ``h1``-``h6``, ``>`` lines
    become ``blockquote``, runs of ``-``/``*`` bullets become a single ``ul``,
    fenced code (backtick or tilde fences) becomes ``pre`` and every other
    non-blank line becomes a ``p``. All text is HTML-escaped; inline Markdown
    is left as-is.
    """
    out: list[str] = []
    pre_lines: list[str] = []
    bullets: list[str] = []
    in_pre = False

    def flush_bullets() -> None:
        # Emit the pending bullets as one list so consecutive items stay together.
        if bullets:
            out.append(_element("ul", "".join(_element("li", item) for item in bullets)))
            bullets.clear()

    for line in markdown.strip().splitlines():
        stripped = line.strip()
        if stripped.startswith(("```", "~~~")):
            flush_bullets()
            if in_pre:
                out.append(_element("pre", html.escape("\n".join(pre_lines))))
                pre_lines = []
            in_pre = not in_pre
            continue
        if in_pre:
            pre_lines.append(line)  # keep original indentation inside code blocks
            continue
        if not stripped:
            continue

        heading = re.match(r"^(#{1,6})\s+(.+)$", stripped)
        if heading:
            flush_bullets()
            out.append(_element(f"h{len(heading.group(1))}", html.escape(heading.group(2))))
            continue
        if stripped.startswith(">"):
            flush_bullets()
            out.append(_element("blockquote", html.escape(stripped.lstrip("> ").strip())))
            continue
        bullet = re.match(r"^[-*]\s+(.+)$", stripped)
        if bullet:
            bullets.append(html.escape(bullet.group(1)))
            continue
        flush_bullets()
        out.append(_element("p", html.escape(stripped)))

    flush_bullets()
    if in_pre:
        # Unterminated fence: still emit what was collected.
        out.append(_element("pre", html.escape("\n".join(pre_lines))))
    return "\n".join(out)


def create_child_note(user_id: str, parent_key: str, markdown: str, dry_run: bool) -> Any:
    """Create a child note under *parent_key* from *markdown*.

    With *dry_run* the request payload is returned instead of being sent and
    *user_id* is not used.
    """
    note_html = markdown_to_zotero_html(markdown)
    payload = [{"itemType": "note", "parentItem": parent_key, "note": note_html}]
    if dry_run:
        return {"dryRun": True, "payload": payload}
    return zotero_web(f"/users/{user_id}/items", method="POST", payload=payload)


def _error_record(index: int, item: dict[str, Any], error: str) -> dict[str, Any]:
    """Build the result entry for an item whose processing failed."""
    return {
        "index": index,
        "itemKey": item.get("key"),
        "title": (item.get("data") or {}).get("title"),
        "status": "error",
        "error": error,
    }


def main() -> None:
    """Parse CLI arguments, generate a note per selected item and print a JSON report.

    A failure on one item is recorded in the report and processing continues
    with the next item.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--item-key", action="append", default=[], help="Zotero top-level item key; can be repeated")
    parser.add_argument("--item-keys", help="Comma/space separated Zotero top-level item keys")
    parser.add_argument("--query", help="Search query; first top-level match is used")
    parser.add_argument("--all", action="store_true", help="Process all top-level Zotero items")
    parser.add_argument("--limit", type=int, default=1, help="Maximum number of items to process")
    parser.add_argument("--fulltext-chars", type=int, default=12000)
    parser.add_argument("--mode", choices=["quick", "deep"], default="quick", help="quick for batch notes; deep for full-detail notes")
    parser.add_argument("--max-tokens", type=int, default=0, help="Optional LLM output token limit; deep mode defaults to 12000")
    parser.add_argument("--skip-existing", action="store_true", help="Skip items that already have a generated AI child note")
    parser.add_argument("--dry-run", action="store_true", help="generate but do not write Zotero note")
    parser.add_argument("--vault", default=str(DEFAULT_VAULT), help="Obsidian vault containing 00 Templater")
    parser.add_argument("--env-file", help="Optional .env path; defaults to <vault>/.env")
    parser.add_argument("--config", default=str(DEFAULT_PRIVATE_CONFIG), help="Private local config JSON")
    args = parser.parse_args()

    vault = Path(args.vault).expanduser().resolve()
    load_dotenv(Path(args.env_file).expanduser().resolve() if args.env_file else vault / ".env")
    load_private_config(Path(args.config).expanduser().resolve())
    load_awesomegpt_prefs()

    keys = list(args.item_key)
    if args.item_keys:
        keys.extend(key for key in re.split(r"[\s,]+", args.item_keys.strip()) if key)

    if args.all:
        # --limit left at its default of 1 means "no limit" for --all.
        items = all_top_items(args.limit if args.limit != 1 else 0)
    else:
        items = find_items(keys, args.query, args.limit)

    # A dry run never calls the Web API, so it must not require Zotero credentials.
    user_id = "" if args.dry_run else resolve_user_id()

    # Deep mode raises the defaults unless the user overrode them.
    fulltext_chars = args.fulltext_chars
    if args.mode == "deep" and fulltext_chars == 12000:
        fulltext_chars = 80000
    max_tokens = args.max_tokens
    if args.mode == "deep" and not max_tokens:
        max_tokens = 12000

    results = []
    for index, item in enumerate(items, 1):
        try:
            key = item.get("key")
            title = (item.get("data") or {}).get("title")
            if not key:
                results.append({"index": index, "status": "skipped", "reason": "missing key", "title": title})
                continue
            if args.skip_existing and has_existing_ai_note(key):
                results.append({"index": index, "itemKey": key, "title": title, "status": "skipped", "reason": "existing AI note"})
                continue
            print(f"[{index}/{len(items)}] generating {key}: {title}", file=sys.stderr)
            bibtex = export_bibtex(key)
            fulltext = local_fulltext(key, fulltext_chars)
            prompt = build_prompt(item, bibtex, fulltext, vault, args.mode)
            markdown = call_llm(prompt, max_tokens or None)
            result = create_child_note(user_id, key, markdown, args.dry_run)
            results.append({"index": index, "itemKey": key, "title": title, "status": "ok", "result": result})
        except SystemExit as exc:
            # fail() already printed the reason to stderr; keep the batch going.
            results.append(_error_record(index, item, str(exc)))
            print(f"[{index}/{len(items)}] error; continuing", file=sys.stderr)
        except Exception as exc:
            results.append(_error_record(index, item, repr(exc)))
            print(f"[{index}/{len(items)}] error; continuing: {exc}", file=sys.stderr)

    print(json.dumps(results, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()