QQzot-skill/scripts/summarize_zotero_table.py

494 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Summarize Zotero items into a Markdown table using an OpenAI-compatible LLM.

Required environment variables:
    AWESOMEGPT_API_KEY   DeepSeek/OpenAI-compatible API key
    AWESOMEGPT_BASE_URL  Example: https://api.deepseek.com
    AWESOMEGPT_MODEL     Example: deepseek-v4-pro

Optional:
    Variables that are not already set are loaded from <vault>/.env, then from
    the AwesomeGPT preferences in the default Zotero profile.
"""
from __future__ import annotations
import argparse
import importlib.util
import json
import os
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
# Best effort: switch the standard streams to UTF-8 so the Chinese table text
# can be printed. Any failure is deliberately ignored and the streams are left
# as they were.
try:
    sys.stdout.reconfigure(encoding="utf-8")
    sys.stderr.reconfigure(encoding="utf-8")
except Exception:
    pass

# URL prefix that every Zotero request in this script is built on.
LOCAL_ZOTERO = "http://127.0.0.1:23119/api/users/0"

# Default for --vault: the working directory at import time.
DEFAULT_VAULT = Path.cwd()

# Table columns used when --columns is not given.
DEFAULT_COLUMNS = [
    "Zotero Key",
    "标题",
    "类型",
    "年份",
    "期刊/来源",
    "研究对象/材料",
    "关键方法",
    "核心结论",
    "相关启发",
]
def fail(message: str) -> None:
    """Write ``error: <message>`` to stderr and terminate the process.

    Raises:
        SystemExit: always, with exit code 1.
    """
    print("error: " + message, file=sys.stderr)
    sys.exit(1)
def load_dotenv(path: Path) -> None:
    """Load ``KEY=VALUE`` lines from a dotenv-style file into ``os.environ``.

    Variables that already exist in the environment are left untouched.
    Blank lines, ``#`` comments, lines without ``=`` and lines with an empty
    key are skipped. A leading ``export `` on the key and surrounding quotes
    on the value are removed.

    Args:
        path: Location of the dotenv file. Anything that is not a regular
            file (missing path, directory) is ignored.
    """
    if not path.is_file():
        return
    # utf-8-sig drops a leading BOM, which would otherwise become part of the
    # first key.
    for raw_line in path.read_text(encoding="utf-8-sig").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        if key.startswith("export "):
            key = key[len("export "):].strip()
        if not key:
            # "=value" has no usable name; os.environ rejects empty names.
            continue
        os.environ.setdefault(key, value.strip().strip('"').strip("'"))
def zotero_profile_prefs() -> Path | None:
    """Locate ``prefs.js`` of a Zotero profile under the user's home directory.

    ``AppData/Roaming/Zotero/Zotero/profiles.ini`` is consulted first: for
    each ``[ProfileN]`` section containing ``Default=1`` the ``Path=`` entry
    is resolved (relative to the ini's directory when ``IsRelative=1``) and
    its ``prefs.js`` is returned if it exists. Otherwise the first match of
    ``Profiles/*/prefs.js`` next to the ini file is returned.

    Returns:
        The path to ``prefs.js``, or ``None`` when nothing is found or a
        filesystem error occurs.
    """
    ini_path = Path.home() / "AppData/Roaming/Zotero/Zotero/profiles.ini"
    root = ini_path.parent

    try:
        has_ini = ini_path.exists()
    except OSError:
        return None

    if has_ini:
        try:
            ini_text = ini_path.read_text(encoding="utf-8", errors="replace")
        except OSError:
            return None
        for section in re.split(r"\n(?=\[Profile\d+\])", ini_text):
            if "Default=1" not in section:
                continue
            found_path = re.search(r"^Path=(.+)$", section, re.MULTILINE)
            found_relative = re.search(r"^IsRelative=(\d+)$", section, re.MULTILINE)
            if not found_path:
                continue
            candidate_dir = Path(found_path.group(1).strip())
            if found_relative and found_relative.group(1) == "1":
                candidate_dir = root / candidate_dir
            candidate = candidate_dir / "prefs.js"
            try:
                if candidate.exists():
                    return candidate
            except OSError:
                return None

    # No usable default profile: fall back to any profile directory.
    fallback_dir = root / "Profiles"
    try:
        if not fallback_dir.exists():
            return None
    except OSError:
        return None
    try:
        return next(fallback_dir.glob("*/prefs.js"), None)
    except OSError:
        return None
def _set_env_if_unset(name: str, *candidates: Any) -> None:
    """Set ``os.environ[name]`` to the first truthy candidate.

    Nothing happens when the variable already holds a non-empty value or when
    no candidate is truthy, so an empty string is never stored.
    """
    if os.environ.get(name):
        return
    for candidate in candidates:
        if candidate:
            os.environ[name] = str(candidate)
            return


def load_awesomegpt_prefs(path: Path | None = None) -> None:
    """Fill the ``AWESOMEGPT_*`` variables from a Zotero ``prefs.js`` file.

    Only ``user_pref`` entries under ``extensions.zotero.zoterogpt.`` are
    read. For each variable the value of the ``DeepSeek`` provider (matched
    case-insensitively) inside the JSON ``settings`` pref is preferred, then
    the direct ``api`` / ``model`` / ``secretKey`` prefs. Variables that
    already hold a non-empty value are never overwritten, and a missing
    provider field no longer blocks the direct-pref fallback.

    Args:
        path: ``prefs.js`` to read. When ``None`` the file is located with
            ``zotero_profile_prefs()``. Missing or unreadable files are
            ignored.
    """
    if path is None:
        path = zotero_profile_prefs()
    if path is None:
        return
    try:
        if not path.exists():
            return
        text = path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return

    prefix = "extensions.zotero.zoterogpt."
    prefs: dict[str, Any] = {}
    # One pref per line; match greedily up to the LAST ");" on the line so a
    # value that itself contains ");" is not truncated.
    pattern = r'^[ \t]*user_pref\("([^"]+)",\s*(.*)\);[ \t\r]*$'
    for name, raw_value in re.findall(pattern, text, re.MULTILINE):
        if not name.startswith(prefix):
            continue
        try:
            prefs[name] = json.loads(raw_value)
        except json.JSONDecodeError:
            continue

    # The settings pref is a JSON document stored inside a JSON string.
    settings: Any = {}
    settings_raw = prefs.get(prefix + "settings")
    if isinstance(settings_raw, str):
        try:
            settings = json.loads(settings_raw)
        except json.JSONDecodeError:
            settings = {}

    provider: Any = None
    if isinstance(settings, dict):
        provider = settings.get("DeepSeek") or next(
            (value for key, value in settings.items() if key.lower() == "deepseek"),
            None,
        )
    if not isinstance(provider, dict):
        provider = {}

    _set_env_if_unset("AWESOMEGPT_BASE_URL", provider.get("api"), prefs.get(prefix + "api"))
    _set_env_if_unset("AWESOMEGPT_MODEL", provider.get("model"), prefs.get(prefix + "model"))
    _set_env_if_unset("AWESOMEGPT_API_KEY", provider.get("secretKey"), prefs.get(prefix + "secretKey"))
def http_json(
    url: str,
    *,
    method: str = "GET",
    headers: dict[str, str] | None = None,
    payload: Any = None,
    timeout: int = 90,
) -> Any:
    """Send an HTTP request and return the decoded JSON response.

    Args:
        url: Target URL.
        method: HTTP method.
        headers: Extra request headers; the mapping is copied, not mutated.
        payload: When not ``None``, serialized as UTF-8 JSON and sent as the
            body, with ``Content-Type: application/json`` unless the caller
            supplied one.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The parsed JSON value, or ``None`` when the response body is empty.

    Raises:
        SystemExit: via ``fail`` on HTTP errors, connection or socket errors
            and on a body that is not valid JSON.
    """
    body = None
    req_headers = dict(headers or {})
    if payload is not None:
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        req_headers.setdefault("Content-Type", "application/json")
    request = urllib.request.Request(url, data=body, method=method, headers=req_headers)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            text = response.read().decode("utf-8", errors="replace")
        if not text:
            return None
        return json.loads(text)
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        fail(f"{method} {url} failed: HTTP {exc.code}: {detail[:800]}")
    except urllib.error.URLError as exc:
        fail(f"{method} {url} failed: {exc}")
    except OSError as exc:
        # A timeout or reset while reading the body is raised as a bare
        # socket error rather than wrapped in URLError.
        fail(f"{method} {url} failed: {exc}")
    except json.JSONDecodeError as exc:
        fail(f"{method} {url} returned invalid JSON: {exc}")
def zotero_local(path: str) -> Any:
    """GET ``LOCAL_ZOTERO + path`` and return the decoded JSON.

    The request carries ``Zotero-API-Version: 3`` and a 20 second timeout;
    failures are handled by ``http_json``.
    """
    target = LOCAL_ZOTERO + path
    api_headers = {"Zotero-API-Version": "3"}
    return http_json(target, headers=api_headers, timeout=20)
def zotero_local_optional(path: str) -> Any | None:
    """GET ``LOCAL_ZOTERO + path`` and return the JSON, or ``None`` on failure.

    This is the best-effort variant of a local Zotero request: HTTP errors,
    connection errors, socket errors raised while reading the body (for
    example a timeout), invalid JSON and an empty body all yield ``None``
    instead of ending the program.
    """
    url = LOCAL_ZOTERO + path
    request = urllib.request.Request(url, headers={"Zotero-API-Version": "3"})
    try:
        with urllib.request.urlopen(request, timeout=20) as response:
            text = response.read().decode("utf-8", errors="replace")
        return json.loads(text) if text else None
    except (OSError, json.JSONDecodeError):
        # HTTPError and URLError are OSError subclasses, so this also covers
        # them along with bare socket errors.
        return None
def find_item(item_key: str | None, query: str | None) -> dict[str, Any]:
    """Fetch one Zotero item by key, or the first top-level match of a query.

    Args:
        item_key: Item key; takes precedence over ``query`` when given.
        query: Search text used when no key is given.

    Returns:
        The item object returned by the local Zotero API.

    Raises:
        SystemExit: via ``fail`` when neither argument is given, when the key
            lookup does not return an item object, or when the query matches
            nothing.
    """
    if item_key:
        # safe="" also escapes "/", so a key can never alter the URL path.
        encoded_key = urllib.parse.quote(item_key, safe="")
        item = zotero_local(f"/items/{encoded_key}")
        if not isinstance(item, dict):
            fail(f"Zotero returned no item for key: {item_key}")
        return item
    if not query:
        fail("provide --item-key or --query")
    qs = urllib.parse.urlencode({"q": query, "limit": 5})
    matches = zotero_local(f"/items/top?{qs}")
    if not matches:
        fail(f"no Zotero item matched query: {query}")
    return matches[0]
def find_items(keys: list[str], query: str | None, limit: int) -> list[dict[str, Any]]:
    """Collect items for explicit keys plus the top-level matches of a query.

    Explicit keys come first in their given order. A key listed more than
    once is fetched and returned only once. Query matches are appended
    unless their key is already present.

    Args:
        keys: Item keys to fetch individually.
        query: Optional search text.
        limit: Maximum number of query matches requested and, when non-zero,
            the maximum length of the returned list.

    Raises:
        SystemExit: via ``fail`` when nothing was selected.
    """
    # dict.fromkeys removes duplicates while keeping first-seen order.
    items: list[dict[str, Any]] = [find_item(key, None) for key in dict.fromkeys(keys)]
    if query:
        qs = urllib.parse.urlencode({"q": query, "limit": limit})
        matches = zotero_local(f"/items/top?{qs}")
        seen = {item.get("key") for item in items}
        for item in matches or []:
            if item.get("key") not in seen:
                items.append(item)
                seen.add(item.get("key"))
    if not items:
        fail("provide --item-key/--item-keys or --query")
    return items[:limit] if limit else items
def all_top_items(limit: int = 0) -> list[dict[str, Any]]:
    """Page through ``/items/top`` and return the collected items.

    Pages of 100 are requested until an empty or short page arrives. When
    ``limit`` is non-zero, paging stops once that many items are collected
    and the list is cut to ``limit``.
    """
    page_size = 100
    collected: list[dict[str, Any]] = []
    offset = 0
    while True:
        params = urllib.parse.urlencode({"limit": page_size, "start": offset})
        batch = zotero_local(f"/items/top?{params}")
        if not batch:
            return collected
        collected.extend(batch)
        if limit and len(collected) >= limit:
            return collected[:limit]
        if len(batch) < page_size:
            return collected
        offset += page_size
def export_bibtex(item_key: str) -> str:
    """Return the BibTeX export of one item from the local Zotero API.

    The export is supplementary input for the prompt, so a failed request is
    reported as a warning on stderr and an empty string is returned instead
    of aborting the run.

    Args:
        item_key: Key of the item to export.

    Returns:
        The stripped BibTeX text, or ``""`` when the request fails.
    """
    qs = urllib.parse.urlencode({"itemKey": item_key, "format": "bibtex"})
    request = urllib.request.Request(
        f"{LOCAL_ZOTERO}/items?{qs}",
        headers={"Zotero-API-Version": "3"},
    )
    try:
        with urllib.request.urlopen(request, timeout=20) as response:
            return response.read().decode("utf-8", errors="replace").strip()
    except OSError as exc:
        # HTTPError, URLError and socket timeouts are all OSError subclasses.
        print(f"warning: BibTeX export failed for {item_key}: {exc}", file=sys.stderr)
        return ""
def _join_page_text(texts: Any, max_chars: int) -> str:
    """Join page texts with newlines, stopping once ``max_chars`` is reached."""
    chunks: list[str] = []
    total = 0
    for text in texts:
        chunks.append(text)
        total += len(text)
        if total >= max_chars:
            break
    return "\n".join(chunks)[:max_chars]


def _fitz_text(module_name: str, path: Path, max_pages: int, max_chars: int) -> str:
    """Extract text from the leading pages with PyMuPDF (``fitz``)."""
    fitz = importlib.import_module(module_name)
    with fitz.open(str(path)) as doc:
        page_count = min(len(doc), max_pages)
        # Integer indexing avoids relying on slice support of the document.
        pages = (doc[index].get_text("text") for index in range(page_count))
        return _join_page_text(pages, max_chars)


def _pdfreader_text(module_name: str, path: Path, max_pages: int, max_chars: int) -> str:
    """Extract text from the leading pages with a ``PdfReader`` class (pypdf/PyPDF2)."""
    reader = importlib.import_module(module_name).PdfReader(str(path))
    pages = (page.extract_text() or "" for page in reader.pages[:max_pages])
    return _join_page_text(pages, max_chars)


def extract_pdf_text(path: Path, max_chars: int) -> str:
    """Extract up to ``max_chars`` characters from the first 8 pages of a PDF.

    The installed backends are tried in the order ``fitz``, ``pypdf``,
    ``PyPDF2``; the result of the first one that succeeds is returned. A
    backend that raises (corrupt or encrypted file, incompatible library
    version) is reported on stderr and the next one is tried.

    Args:
        path: PDF file to read.
        max_chars: Upper bound on the length of the returned text.

    Returns:
        The extracted text, or ``""`` when the file does not exist, no
        backend is installed, or every backend failed.
    """
    try:
        if not path.exists():
            return ""
    except OSError:
        return ""
    max_pages = 8
    backends = (
        ("fitz", _fitz_text),
        ("pypdf", _pdfreader_text),
        ("PyPDF2", _pdfreader_text),
    )
    for module_name, extract in backends:
        if importlib.util.find_spec(module_name) is None:
            continue
        try:
            return extract(module_name, path, max_pages, max_chars)
        except Exception as exc:
            # PDF libraries raise many unrelated exception types; extraction
            # is best-effort, so report the problem and try the next backend.
            print(f"warning: {module_name} could not read {path}: {exc}", file=sys.stderr)
    return ""
def local_fulltext(parent_key: str, max_chars: int) -> str:
    """Return up to ``max_chars`` characters of text for an item's attachments.

    The ``fulltext`` endpoint of every attachment child is queried first and
    the returned ``content`` values are joined with blank lines. If none of
    them yields content, the first PDF attachment whose file can be read
    with ``extract_pdf_text`` is used instead.
    """
    children = zotero_local(f"/items/{urllib.parse.quote(parent_key)}/children")
    child_items = children or []

    parts: list[str] = []
    collected = 0
    for child in child_items:
        meta = child.get("data") or {}
        if meta.get("itemType") != "attachment":
            continue
        attachment_key = child.get("key")
        if not attachment_key:
            continue
        indexed = zotero_local_optional(f"/items/{urllib.parse.quote(attachment_key)}/fulltext")
        if not indexed:
            continue
        content = indexed.get("content") if isinstance(indexed, dict) else ""
        if content:
            parts.append(content)
            collected += len(content)
        if collected >= max_chars:
            break
    if parts:
        return "\n\n".join(parts)[:max_chars]

    # Nothing indexed: read the PDF file of the first usable attachment.
    for child in child_items:
        meta = child.get("data") or {}
        if not (meta.get("itemType") == "attachment" and meta.get("contentType") == "application/pdf"):
            continue
        pdf_path = meta.get("path")
        if not pdf_path:
            attachment_key = child.get("key")
            if attachment_key:
                detail = zotero_local_optional(f"/items/{urllib.parse.quote(attachment_key)}")
                pdf_path = ((detail or {}).get("data") or {}).get("path")
        if not pdf_path:
            continue
        extracted = extract_pdf_text(Path(pdf_path), max_chars=max_chars)
        if extracted:
            return extracted[:max_chars]
    return ""
def creators_text(creators: list[dict[str, Any]]) -> str:
    """Render creators as a ``"; "``-separated list of display names.

    A creator's ``name`` is used when present; otherwise the non-empty
    ``firstName`` and ``lastName`` are joined with a space. Creators without
    any name are left out.
    """

    def display_name(creator: dict[str, Any]) -> str:
        given_and_family = (creator.get("firstName"), creator.get("lastName"))
        return creator.get("name") or " ".join(filter(None, given_and_family))

    return "; ".join(filter(None, map(display_name, creators)))
def year_from_date(raw: str | None) -> str:
    """Extract a four-digit year from a free-form date string.

    A standalone 19xx/20xx number is preferred. Otherwise any standalone
    four-digit number is used, so a date such as ``"March 3, 1895"`` yields
    ``"1895"`` instead of its first four characters. If neither is found,
    the first four characters of ``raw`` are returned.

    Args:
        raw: Date text, or ``None``.

    Returns:
        The year text, or ``""`` when ``raw`` is empty or ``None``.
    """
    if not raw:
        return ""
    match = re.search(r"\b(?:19|20)\d{2}\b", raw) or re.search(r"\b\d{4}\b", raw)
    return match.group(0) if match else raw[:4]
def build_source_record(item: dict[str, Any], fulltext_chars: int) -> dict[str, Any]:
    """Assemble the per-paper record that is embedded in the LLM prompt.

    Bibliographic fields are copied from the item's ``data``. The BibTeX
    export and the full-text excerpt (at most ``fulltext_chars`` characters)
    are fetched only when the item has a key, and are ``""`` otherwise.
    """
    data = item.get("data") or {}
    key = item.get("key")

    year = year_from_date(data.get("date"))
    authors = creators_text(data.get("creators") or [])
    bibtex = ""
    excerpt = ""
    if key:
        bibtex = export_bibtex(key)
        excerpt = local_fulltext(key, fulltext_chars)

    record: dict[str, Any] = {"zoteroKey": key}
    record["title"] = data.get("title")
    record["itemType"] = data.get("itemType")
    record["year"] = year
    record["publicationTitle"] = data.get("publicationTitle")
    record["authors"] = authors
    record["DOI"] = data.get("DOI")
    record["url"] = data.get("url")
    record["abstractNote"] = data.get("abstractNote")
    record["bibtex"] = bibtex
    record["indexedFullTextExcerpt"] = excerpt
    return record
def build_prompt(records: list[dict[str, Any]], columns: list[str]) -> str:
    """Build the user prompt: fixed instructions followed by the JSON payload.

    The instruction lines and the pretty-printed JSON object
    ``{"columns": ..., "papers": ...}`` are separated by blank lines.
    """
    header = " | ".join(columns)
    sections = [
        "你是材料与化学方向的文献整理助手。",
        "请根据给定文献信息输出一个 Markdown 表格。",
        "只输出表格,不要输出任何解释、标题、项目符号或代码块。",
        f"表头必须严格使用以下列,并保持顺序:{header}",
        "每篇文献占一行,不要漏项。",
        "信息不足时填写 - 。",
        "请使用简体中文概括,保持内容紧凑,单元格内避免超过两句话。",
        "如果文献明显不是材料方向,也照样总结,但保持客观。",
    ]
    sections.append(
        json.dumps({"columns": columns, "papers": records}, ensure_ascii=False, indent=2)
    )
    return "\n\n".join(sections)
def _chat_completions_url(base_url: str) -> str:
    """Derive the chat-completions endpoint from a configured base URL.

    A URL that already ends in ``/chat/completions`` is used as is. ``/v1``
    is appended only when the URL does not already end in a version segment
    such as ``/v1`` or ``/v4``.
    """
    base = base_url.rstrip("/")
    if base.endswith("/chat/completions"):
        return base
    if not re.search(r"/v\d+\w*$", base):
        base += "/v1"
    return base + "/chat/completions"


def call_llm(prompt: str) -> str:
    """Send ``prompt`` to the configured chat-completions API and return the reply.

    Configuration is read from ``AWESOMEGPT_API_KEY``, ``AWESOMEGPT_BASE_URL``
    and ``AWESOMEGPT_MODEL``.

    Args:
        prompt: Content of the user message.

    Returns:
        The stripped text content of the first choice.

    Raises:
        SystemExit: via ``fail`` when configuration is missing, the request
            fails, or the response lacks ``choices[0].message.content`` as
            text.
    """
    api_key = os.environ.get("AWESOMEGPT_API_KEY")
    base_url = (os.environ.get("AWESOMEGPT_BASE_URL") or "").rstrip("/")
    model = os.environ.get("AWESOMEGPT_MODEL")
    if not api_key or not base_url or not model:
        fail("AWESOMEGPT_API_KEY, AWESOMEGPT_BASE_URL, and AWESOMEGPT_MODEL are required")
    payload = {
        "model": model,
        "messages": [
            {
                "role": "system",
                "content": "You summarize scientific papers into compact Simplified Chinese Markdown tables only.",
            },
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.2,
    }
    response = http_json(
        _chat_completions_url(base_url),
        method="POST",
        headers={"Authorization": f"Bearer {api_key}"},
        payload=payload,
        timeout=240,
    )
    try:
        return response["choices"][0]["message"]["content"].strip()
    except (KeyError, IndexError, TypeError, AttributeError) as exc:
        # Covers a missing key, an empty choices list, a None response and
        # non-text content.
        fail(f"unexpected LLM response shape: {exc}; response={response}")
def split_batches(items: list[dict[str, Any]], batch_size: int) -> list[list[dict[str, Any]]]:
    """Split ``items`` into consecutive chunks of at most ``batch_size``.

    A non-positive ``batch_size`` puts everything into one batch.
    """
    if batch_size <= 0:
        return [items]
    batches: list[list[dict[str, Any]]] = []
    for start in range(0, len(items), batch_size):
        stop = start + batch_size
        batches.append(items[start:stop])
    return batches
def _is_separator_row(line: str) -> bool:
    """Return True for a table delimiter row such as ``| --- | :---: |``."""
    cells = line.strip().strip("|").split("|")
    return all(re.fullmatch(r"\s*:?-+:?\s*", cell) for cell in cells)


def normalize_table(markdown: str, keep_header: bool) -> list[str]:
    """Extract the table rows from an LLM reply.

    Only lines whose first non-blank character is ``|`` are kept, with
    surrounding whitespace removed.

    Args:
        markdown: Raw LLM reply.
        keep_header: When true, every table line is returned. When false,
            the header and delimiter rows are dropped, but only if the
            second line really is a delimiter row. Headerless output keeps
            all its rows, and a header-only table yields an empty list.

    Returns:
        The selected table lines.

    Raises:
        SystemExit: via ``fail`` when the reply contains no table line.
    """
    lines = [line.strip() for line in markdown.splitlines() if line.strip().startswith("|")]
    if not lines:
        fail("LLM did not return a Markdown table")
    if keep_header:
        return lines
    if len(lines) >= 2 and _is_separator_row(lines[1]):
        return lines[2:]
    return lines
def write_output(table: str, out_path: Path | None) -> None:
    """Emit the finished table.

    Without ``out_path`` the table is printed to stdout. Otherwise it is
    written as UTF-8 with a trailing newline, parent directories are created
    as needed, and the file path is printed instead.
    """
    if out_path is not None:
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(f"{table}\n", encoding="utf-8")
        print(str(out_path))
    else:
        print(table)
def main() -> None:
    """Command-line entry point.

    Parses the arguments, loads credentials (process environment first, then
    the ``.env`` file, then the Zotero AwesomeGPT preferences), selects the
    Zotero items, summarizes them batch by batch through the LLM and emits
    the merged Markdown table. Only the first batch keeps its table header.

    Raises:
        SystemExit: on invalid arguments (argparse, exit code 2) or through
            ``fail`` on any runtime error (exit code 1).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--item-key", action="append", default=[], help="Zotero top-level item key; can be repeated")
    parser.add_argument("--item-keys", help="Comma/space separated Zotero top-level item keys")
    parser.add_argument("--query", help="Search query; top-level matches are used")
    parser.add_argument("--all", action="store_true", help="Process all top-level Zotero items")
    parser.add_argument("--limit", type=int, default=5, help="Maximum number of items to process; 0 means no limit")
    parser.add_argument("--batch-size", type=int, default=5, help="Number of papers per LLM call")
    parser.add_argument("--fulltext-chars", type=int, default=4000)
    parser.add_argument("--columns", help="Comma-separated Markdown table columns")
    parser.add_argument("--vault", default=str(DEFAULT_VAULT), help="Obsidian vault containing optional .env")
    parser.add_argument("--env-file", help="Optional .env path; defaults to <vault>/.env")
    parser.add_argument("--out", help="Optional output Markdown file path")
    args = parser.parse_args()

    # A negative limit would silently slice items off the END of the list.
    if args.limit < 0:
        parser.error("--limit must be >= 0 (0 means no limit)")
    columns = [col.strip() for col in (args.columns.split(",") if args.columns else DEFAULT_COLUMNS) if col.strip()]
    if not columns:
        parser.error("--columns must name at least one column")

    vault = Path(args.vault).expanduser().resolve()
    load_dotenv(Path(args.env_file).expanduser().resolve() if args.env_file else vault / ".env")
    load_awesomegpt_prefs()

    keys = list(args.item_key)
    if args.item_keys:
        keys.extend([key for key in re.split(r"[\s,]+", args.item_keys.strip()) if key])

    limit = args.limit
    if args.all:
        items = all_top_items(limit)
    else:
        # Without a limit, cap the query at 100 matches.
        query_limit = limit or 100
        items = find_items(keys, args.query, query_limit)
    if not items:
        fail("no Zotero items selected")

    batches = split_batches(items, args.batch_size)
    output_lines: list[str] = []
    for index, batch in enumerate(batches, 1):
        print(f"[{index}/{len(batches)}] summarizing {len(batch)} items", file=sys.stderr)
        records = [build_source_record(item, args.fulltext_chars) for item in batch]
        table = call_llm(build_prompt(records, columns))
        output_lines.extend(normalize_table(table, keep_header=index == 1))
    write_output("\n".join(output_lines), Path(args.out).expanduser().resolve() if args.out else None)
# Run the CLI only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()