#!/usr/bin/env python3
"""
Summarize Zotero items into a Markdown table using an OpenAI-compatible LLM.

Required environment variables:
  AWESOMEGPT_API_KEY   DeepSeek/OpenAI-compatible API key
  AWESOMEGPT_BASE_URL  Example: https://api.deepseek.com
  AWESOMEGPT_MODEL     Example: deepseek-v4-pro

Optional:
  Values missing from the environment are loaded from the .env file inside
  the vault directory (see --vault / --env-file) and then from the AwesomeGPT
  preferences stored in the default Zotero profile.
"""

from __future__ import annotations

import argparse
import importlib.util
import json
import os
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any, Iterable, NoReturn

# Best effort: force UTF-8 on the standard streams so the Chinese table prints
# correctly. Streams that cannot be reconfigured (replaced, closed, missing)
# are deliberately left as they are.
try:
    sys.stdout.reconfigure(encoding="utf-8")
    sys.stderr.reconfigure(encoding="utf-8")
except Exception:
    pass

LOCAL_ZOTERO = "http://127.0.0.1:23119/api/users/0"
DEFAULT_VAULT = Path.cwd()
DEFAULT_COLUMNS = [
    "Zotero Key",
    "标题",
    "类型",
    "年份",
    "期刊/来源",
    "研究对象/材料",
    "关键方法",
    "核心结论",
    "相关启发",
]

# Page size used for every listing/search request against the local API.
_PAGE_SIZE = 100
# Common prefix of the AwesomeGPT preference names read from prefs.js.
_PREF_PREFIX = "extensions.zotero.zoterogpt."
# (environment variable, preference/provider field) pairs, in lookup order.
_ENV_SOURCES = (
    ("AWESOMEGPT_BASE_URL", "api"),
    ("AWESOMEGPT_MODEL", "model"),
    ("AWESOMEGPT_API_KEY", "secretKey"),
)


def fail(message: str) -> NoReturn:
    """Print ``error: MESSAGE`` to stderr and exit with status 1."""
    print(f"error: {message}", file=sys.stderr)
    raise SystemExit(1)


def load_dotenv(path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from *path* into ``os.environ``.

    Variables that already exist in the environment are not overwritten.
    Blank lines, ``#`` comments, lines without ``=`` and lines with an empty
    key are ignored. Surrounding quotes are stripped from values. A missing
    file is not an error.
    """
    if not path.exists():
        return
    # utf-8-sig transparently drops a leading BOM, which would otherwise
    # become part of the first variable name.
    for raw_line in path.read_text(encoding="utf-8-sig").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        if not key:
            # "=value" has no usable name; os.environ rejects empty names.
            continue
        os.environ.setdefault(key, value.strip().strip('"').strip("'"))


def _zotero_profile_roots() -> list[Path]:
    """Return the candidate directories that may hold Zotero's profiles.ini."""
    home = Path.home()
    return [
        home / "AppData/Roaming/Zotero/Zotero",  # Windows
        home / "Library/Application Support/Zotero",  # macOS
        home / ".zotero/zotero",  # Linux
    ]


def _prefs_in_profile_root(profiles_root: Path) -> Path | None:
    """Locate prefs.js below one profile root, or return None."""
    profiles_ini = profiles_root / "profiles.ini"
    try:
        if profiles_ini.exists():
            text = profiles_ini.read_text(encoding="utf-8", errors="replace")
            for block in re.split(r"\n(?=\[Profile\d+\])", text):
                if "Default=1" not in block:
                    continue
                path_match = re.search(r"^Path=(.+)$", block, re.MULTILINE)
                if not path_match:
                    continue
                relative_match = re.search(r"^IsRelative=(\d+)$", block, re.MULTILINE)
                profile_path = Path(path_match.group(1).strip())
                if relative_match and relative_match.group(1) == "1":
                    profile_path = profiles_root / profile_path
                prefs = profile_path / "prefs.js"
                if prefs.exists():
                    return prefs
        # No usable default profile: fall back to any profile directory.
        profiles_dir = profiles_root / "Profiles"
        if profiles_dir.exists():
            return next(profiles_dir.glob("*/prefs.js"), None)
    except OSError:
        return None
    return None


def zotero_profile_prefs() -> Path | None:
    """Return the prefs.js of the default Zotero profile, if one can be found.

    Each known per-platform profile root is tried in turn. Inside a root the
    profile marked ``Default=1`` in profiles.ini wins; otherwise the first
    ``Profiles/*/prefs.js`` found is used. Filesystem errors are treated as
    "not found".
    """
    for root in _zotero_profile_roots():
        prefs = _prefs_in_profile_root(root)
        if prefs is not None:
            return prefs
    return None


def load_awesomegpt_prefs(path: Path | None = None) -> None:
    """Fill missing AWESOMEGPT_* variables from AwesomeGPT's Zotero preferences.

    *path* is a prefs.js file; when omitted it is discovered with
    ``zotero_profile_prefs``. For each variable the DeepSeek provider entry of
    the ``settings`` preference is preferred, then the direct ``api`` /
    ``model`` / ``secretKey`` preference. Variables that already hold a
    non-empty value are left untouched. Unreadable or malformed preferences
    are silently ignored.
    """
    if path is None:
        path = zotero_profile_prefs()
    if path is None:
        return
    try:
        if not path.exists():
            return
        text = path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return

    prefs: dict[str, Any] = {}
    for name, raw_value in re.findall(r'user_pref\("([^"]+)",\s*(.*?)\);', text):
        if not name.startswith(_PREF_PREFIX):
            continue
        try:
            prefs[name] = json.loads(raw_value)
        except json.JSONDecodeError:
            continue

    # The settings preference is itself a JSON document stored as a string.
    settings: Any = {}
    settings_raw = prefs.get(_PREF_PREFIX + "settings")
    if isinstance(settings_raw, str):
        try:
            settings = json.loads(settings_raw)
        except json.JSONDecodeError:
            settings = {}

    provider: Any = None
    if isinstance(settings, dict):
        provider = settings.get("DeepSeek") or next(
            (value for key, value in settings.items() if key.lower() == "deepseek"),
            None,
        )
    if not isinstance(provider, dict):
        provider = {}

    for env_name, field in _ENV_SOURCES:
        if os.environ.get(env_name):
            continue
        # Only a non-empty value may be exported: writing "" would mask the
        # direct-preference fallback and still count as missing in call_llm.
        for candidate in (provider.get(field), prefs.get(_PREF_PREFIX + field)):
            if candidate:
                os.environ[env_name] = str(candidate)
                break


def http_json(
    url: str,
    *,
    method: str = "GET",
    headers: dict[str, str] | None = None,
    payload: Any = None,
    timeout: int = 90,
) -> Any:
    """Send an HTTP request and return the decoded JSON response.

    *payload*, when given, is sent as a UTF-8 JSON body. Returns None for an
    empty response body. Any HTTP, network, timeout or JSON decoding error
    ends the program through ``fail``.
    """
    body = None
    req_headers = dict(headers or {})
    if payload is not None:
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        req_headers.setdefault("Content-Type", "application/json")
    request = urllib.request.Request(url, data=body, method=method, headers=req_headers)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            text = response.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        fail(f"{method} {url} failed: HTTP {exc.code}: {detail[:800]}")
    except OSError as exc:
        # URLError is an OSError; this also covers timeouts raised mid-read.
        fail(f"{method} {url} failed: {exc}")
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError as exc:
        fail(f"{method} {url} returned invalid JSON: {exc}; body={text[:800]}")


def zotero_local(path: str) -> Any:
    """GET *path* from the local Zotero API; errors end the program."""
    return http_json(LOCAL_ZOTERO + path, headers={"Zotero-API-Version": "3"}, timeout=20)


def zotero_local_optional(path: str) -> Any | None:
    """GET *path* from the local Zotero API, returning None on any failure."""
    url = LOCAL_ZOTERO + path
    request = urllib.request.Request(url, headers={"Zotero-API-Version": "3"})
    try:
        with urllib.request.urlopen(request, timeout=20) as response:
            text = response.read().decode("utf-8", errors="replace")
        return json.loads(text) if text else None
    except (OSError, json.JSONDecodeError):
        # OSError covers HTTPError, URLError and socket timeouts.
        return None


def find_item(item_key: str | None, query: str | None) -> dict[str, Any]:
    """Return one item, by key when *item_key* is given, else the first match of *query*."""
    if item_key:
        return zotero_local(f"/items/{urllib.parse.quote(item_key)}")
    if not query:
        fail("provide --item-key or --query")
    qs = urllib.parse.urlencode({"q": query, "limit": 5})
    matches = zotero_local(f"/items/top?{qs}")
    if not matches:
        fail(f"no Zotero item matched query: {query}")
    return matches[0]


def find_items(keys: list[str], query: str | None, limit: int) -> list[dict[str, Any]]:
    """Collect items for explicit *keys* followed by the matches of *query*.

    Each key is fetched once even if it is repeated, and query matches that
    duplicate an already selected item are skipped. A positive *limit* caps
    the result; 0 (or less) returns everything that was collected.
    """
    items: list[dict[str, Any]] = [find_item(key, None) for key in dict.fromkeys(keys)]
    if query:
        # Without an explicit cap, request one full page of matches.
        query_limit = limit if limit > 0 else _PAGE_SIZE
        qs = urllib.parse.urlencode({"q": query, "limit": query_limit})
        matches = zotero_local(f"/items/top?{qs}")
        seen = {item.get("key") for item in items}
        for item in matches or []:
            item_key = item.get("key")
            if item_key not in seen:
                items.append(item)
                seen.add(item_key)
    if not items:
        if query:
            fail(f"no Zotero item matched query: {query}")
        fail("provide --item-key/--item-keys or --query")
    return items[:limit] if limit > 0 else items


def all_top_items(limit: int = 0) -> list[dict[str, Any]]:
    """Return every top-level item, paging through the local API.

    A positive *limit* stops paging early and truncates the result.
    """
    items: list[dict[str, Any]] = []
    start = 0
    while True:
        qs = urllib.parse.urlencode({"limit": _PAGE_SIZE, "start": start})
        page = zotero_local(f"/items/top?{qs}")
        if not page:
            break
        items.extend(page)
        if limit > 0 and len(items) >= limit:
            return items[:limit]
        if len(page) < _PAGE_SIZE:
            # A short page is the last one.
            break
        start += _PAGE_SIZE
    return items


def export_bibtex(item_key: str) -> str:
    """Return the BibTeX export of *item_key*, or "" when the export fails.

    BibTeX is supplementary context for the prompt, so a failed export only
    produces a warning on stderr instead of aborting the whole run.
    """
    qs = urllib.parse.urlencode({"itemKey": item_key, "format": "bibtex"})
    request = urllib.request.Request(
        f"{LOCAL_ZOTERO}/items?{qs}",
        headers={"Zotero-API-Version": "3"},
    )
    try:
        with urllib.request.urlopen(request, timeout=20) as response:
            return response.read().decode("utf-8", errors="replace").strip()
    except OSError as exc:
        print(f"warning: BibTeX export failed for {item_key}: {exc}", file=sys.stderr)
        return ""


def _collect_page_text(pages: Iterable[str], max_chars: int) -> str:
    """Join page texts with newlines, stopping once *max_chars* is reached."""
    chunks: list[str] = []
    total = 0
    for text in pages:
        chunks.append(text)
        total += len(text)
        if total >= max_chars:
            break
    return "\n".join(chunks)[:max_chars]


def extract_pdf_text(path: Path, max_chars: int) -> str:
    """Extract up to *max_chars* characters from the first pages of a PDF.

    Uses the first available backend among ``fitz``, ``pypdf`` and ``PyPDF2``.
    Returns "" when the file is missing, no backend is installed, or the
    backend cannot read the file.
    """
    try:
        if not path.exists():
            return ""
    except OSError:
        return ""

    max_pages = 8
    try:
        if importlib.util.find_spec("fitz"):
            import fitz  # type: ignore

            with fitz.open(str(path)) as doc:
                return _collect_page_text(
                    (page.get_text("text") for page in doc[:max_pages]),
                    max_chars,
                )
        for module_name in ("pypdf", "PyPDF2"):
            if importlib.util.find_spec(module_name):
                reader = importlib.import_module(module_name).PdfReader(str(path))
                return _collect_page_text(
                    ((page.extract_text() or "") for page in reader.pages[:max_pages]),
                    max_chars,
                )
    except Exception as exc:
        # Deliberately broad: PDF backends raise library-specific errors for
        # damaged or encrypted files, and this text is only optional context.
        print(f"warning: could not extract text from {path}: {exc}", file=sys.stderr)
    return ""


def _attachment_children(children: Any) -> list[tuple[dict[str, Any], dict[str, Any]]]:
    """Return (child, data) pairs for the children that are attachments."""
    pairs = []
    for child in children or []:
        data = child.get("data") or {}
        if data.get("itemType") == "attachment":
            pairs.append((child, data))
    return pairs


def local_fulltext(parent_key: str, max_chars: int) -> str:
    """Return up to *max_chars* characters of full text for an item.

    The indexed full text of the item's attachments is used when the local
    API provides it. Otherwise text is extracted from the first PDF
    attachment that yields any.
    """
    children = zotero_local(f"/items/{urllib.parse.quote(parent_key)}/children")
    attachments = _attachment_children(children)

    parts: list[str] = []
    for child, _data in attachments:
        key = child.get("key")
        if not key:
            continue
        fulltext = zotero_local_optional(f"/items/{urllib.parse.quote(key)}/fulltext")
        if not fulltext:
            continue
        content = fulltext.get("content") if isinstance(fulltext, dict) else ""
        if content:
            parts.append(content)
            if sum(len(part) for part in parts) >= max_chars:
                break

    if not parts:
        for child, data in attachments:
            if data.get("contentType") != "application/pdf":
                continue
            path = data.get("path")
            if not path:
                # The child listing may omit the path; ask for the full item.
                key = child.get("key")
                if key:
                    full = zotero_local_optional(f"/items/{urllib.parse.quote(key)}")
                    path = ((full or {}).get("data") or {}).get("path")
            if not path:
                continue
            extracted = extract_pdf_text(Path(path), max_chars=max_chars)
            if extracted:
                parts.append(extracted)
                break
    return "\n\n".join(parts)[:max_chars]


def creators_text(creators: list[dict[str, Any]]) -> str:
    """Format creators as ``Name; Name``.

    A creator's ``name`` field is used when present, otherwise its first and
    last names joined by a space. Creators without any name are skipped.
    """
    names = []
    for creator in creators:
        name = creator.get("name")
        if not name:
            name_parts = (creator.get("firstName"), creator.get("lastName"))
            name = " ".join(part for part in name_parts if part)
        if name:
            names.append(name)
    return "; ".join(names)


def year_from_date(raw: str | None) -> str:
    """Return the first 19xx/20xx year in *raw*, else its first four characters."""
    if not raw:
        return ""
    match = re.search(r"\b(19|20)\d{2}\b", raw)
    return match.group(0) if match else raw[:4]


def build_source_record(item: dict[str, Any], fulltext_chars: int) -> dict[str, Any]:
    """Build the per-paper dictionary that is embedded in the LLM prompt."""
    data = item.get("data") or {}
    key = item.get("key")
    return {
        "zoteroKey": key,
        "title": data.get("title"),
        "itemType": data.get("itemType"),
        "year": year_from_date(data.get("date")),
        "publicationTitle": data.get("publicationTitle"),
        "authors": creators_text(data.get("creators") or []),
        "DOI": data.get("DOI"),
        "url": data.get("url"),
        "abstractNote": data.get("abstractNote"),
        "bibtex": export_bibtex(key) if key else "",
        "indexedFullTextExcerpt": local_fulltext(key, fulltext_chars) if key else "",
    }


def build_prompt(records: list[dict[str, Any]], columns: list[str]) -> str:
    """Return the user prompt: fixed instructions followed by the JSON payload."""
    instructions = [
        "你是材料与化学方向的文献整理助手。",
        "请根据给定文献信息输出一个 Markdown 表格。",
        "只输出表格,不要输出任何解释、标题、项目符号或代码块。",
        f"表头必须严格使用以下列,并保持顺序:{' | '.join(columns)}。",
        "每篇文献占一行,不要漏项。",
        "信息不足时填写 - 。",
        "请使用简体中文概括,保持内容紧凑,单元格内避免超过两句话。",
        "如果文献明显不是材料方向,也照样总结,但保持客观。",
    ]
    payload = {"columns": columns, "papers": records}
    return "\n\n".join(instructions + [json.dumps(payload, ensure_ascii=False, indent=2)])


def call_llm(prompt: str) -> str:
    """Send *prompt* to the configured chat-completions endpoint.

    Reads AWESOMEGPT_API_KEY, AWESOMEGPT_BASE_URL and AWESOMEGPT_MODEL from
    the environment and returns the stripped text of the first choice. A
    missing setting or an unexpected response shape ends the program.
    """
    api_key = os.environ.get("AWESOMEGPT_API_KEY")
    base_url = (os.environ.get("AWESOMEGPT_BASE_URL") or "").rstrip("/")
    model = os.environ.get("AWESOMEGPT_MODEL")
    if not api_key or not base_url or not model:
        fail("AWESOMEGPT_API_KEY, AWESOMEGPT_BASE_URL, and AWESOMEGPT_MODEL are required")
    if not base_url.endswith("/v1"):
        base_url = base_url + "/v1"

    payload = {
        "model": model,
        "messages": [
            {
                "role": "system",
                "content": "You summarize scientific papers into compact Simplified Chinese Markdown tables only.",
            },
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.2,
    }
    response = http_json(
        base_url + "/chat/completions",
        method="POST",
        headers={"Authorization": f"Bearer {api_key}"},
        payload=payload,
        timeout=240,
    )
    try:
        return response["choices"][0]["message"]["content"].strip()
    except (KeyError, IndexError, TypeError, AttributeError) as exc:
        fail(f"unexpected LLM response shape: {exc}; response={response}")


def split_batches(items: list[dict[str, Any]], batch_size: int) -> list[list[dict[str, Any]]]:
    """Split *items* into consecutive batches of at most *batch_size* items.

    A non-positive *batch_size* yields a single batch holding every item.
    """
    if batch_size <= 0:
        return [items]
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


def _is_separator_row(line: str) -> bool:
    """Return True when *line* is a table delimiter row such as ``|---|:-:|``."""
    cells = line.strip().strip("|").split("|")
    return all(re.fullmatch(r"\s*:?-+:?\s*", cell) for cell in cells)


def normalize_table(markdown: str, keep_header: bool) -> list[str]:
    """Return the table lines found in an LLM reply.

    Only lines that start with ``|`` are kept. With *keep_header* false the
    header and delimiter rows are removed so the rows can be appended to an
    existing table; they are removed only when a delimiter row is actually
    present, so a reply without a header loses no data rows. Ends the
    program when the reply contains no table line at all.
    """
    lines = [line.rstrip() for line in markdown.splitlines() if line.strip().startswith("|")]
    if not lines:
        fail("LLM did not return a Markdown table")
    if keep_header:
        return lines
    if len(lines) >= 2 and _is_separator_row(lines[1]):
        return lines[2:]
    if _is_separator_row(lines[0]):
        # Delimiter row without a header line above it.
        return lines[1:]
    return lines


def write_output(table: str, out_path: Path | None) -> None:
    """Print *table*, or write it to *out_path* and print that path instead."""
    if out_path is None:
        print(table)
        return
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(table + "\n", encoding="utf-8")
    print(str(out_path))


def main() -> None:
    """Command-line entry point: select items, summarize in batches, emit the table."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--item-key", action="append", default=[], help="Zotero top-level item key; can be repeated")
    parser.add_argument("--item-keys", help="Comma/space separated Zotero top-level item keys")
    parser.add_argument("--query", help="Search query; top-level matches are used")
    parser.add_argument("--all", action="store_true", help="Process all top-level Zotero items")
    parser.add_argument("--limit", type=int, default=5, help="Maximum number of items to process; 0 means no limit")
    parser.add_argument("--batch-size", type=int, default=5, help="Number of papers per LLM call")
    parser.add_argument("--fulltext-chars", type=int, default=4000)
    parser.add_argument("--columns", help="Comma-separated Markdown table columns")
    parser.add_argument("--vault", default=str(DEFAULT_VAULT), help="Obsidian vault containing optional .env")
    parser.add_argument("--env-file", help="Optional .env path; defaults to .env inside --vault")
    parser.add_argument("--out", help="Optional output Markdown file path")
    args = parser.parse_args()

    # Negative values would silently slice items or text from the wrong end.
    if args.limit < 0:
        fail("--limit must be 0 or a positive integer")
    if args.fulltext_chars < 0:
        fail("--fulltext-chars must be 0 or a positive integer")

    vault = Path(args.vault).expanduser().resolve()
    env_file = Path(args.env_file).expanduser().resolve() if args.env_file else vault / ".env"
    load_dotenv(env_file)
    load_awesomegpt_prefs()

    keys = list(args.item_key)
    if args.item_keys:
        keys.extend(key for key in re.split(r"[\s,]+", args.item_keys.strip()) if key)

    if args.all:
        items = all_top_items(args.limit)
    else:
        items = find_items(keys, args.query, args.limit)
    if not items:
        fail("no Zotero items selected")

    raw_columns = args.columns.split(",") if args.columns else DEFAULT_COLUMNS
    columns = [col.strip() for col in raw_columns if col.strip()]
    if not columns:
        fail("--columns must name at least one column")

    batches = split_batches(items, args.batch_size)
    output_lines: list[str] = []
    for index, batch in enumerate(batches, 1):
        print(f"[{index}/{len(batches)}] summarizing {len(batch)} items", file=sys.stderr)
        records = [build_source_record(item, args.fulltext_chars) for item in batch]
        table = call_llm(build_prompt(records, columns))
        # Only the first batch contributes the header rows.
        output_lines.extend(normalize_table(table, keep_header=index == 1))

    out_path = Path(args.out).expanduser().resolve() if args.out else None
    write_output("\n".join(output_lines), out_path)


if __name__ == "__main__":
    main()