QQnote-skill/scripts/generate_zotero_ai_note.py

653 lines
26 KiB
Python

#!/usr/bin/env python3
"""
Generate an AI literature note from Zotero metadata and save it as a Zotero child note.

Required environment variables:
    AWESOMEGPT_API_KEY   DeepSeek/OpenAI-compatible API key
    AWESOMEGPT_BASE_URL  Example: https://api.deepseek.com
    AWESOMEGPT_MODEL     Example: deepseek-v4-pro
    ZOTERO_API_KEY       Zotero Web API key with library write permission

Optional:
    ZOTERO_USER_ID       If omitted, resolved from /keys/current

Private local config:
    <skill>/config/config.local.json, created by scripts/init_private_config.py
"""
from __future__ import annotations
import argparse
import html
import importlib.util
import json
import os
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
# Switch the standard streams to UTF-8 so non-ASCII note text and JSON output
# can be printed. Best-effort: any failure to reconfigure is ignored.
try:
    sys.stdout.reconfigure(encoding="utf-8")
    sys.stderr.reconfigure(encoding="utf-8")
except Exception:
    pass
# Base URL of the local Zotero HTTP API used for all read requests.
LOCAL_ZOTERO = "http://127.0.0.1:23119/api/users/0"
# Base URL of the Zotero Web API used for key lookup and note writes.
ZOTERO_WEB = "https://api.zotero.org"
# Default vault is the directory the script is started from.
DEFAULT_VAULT = Path.cwd()
# The skill directory is the parent of the directory containing this script.
SKILL_DIR = Path(__file__).resolve().parents[1]
DEFAULT_PRIVATE_CONFIG = SKILL_DIR / "config" / "config.local.json"
# Optional directory of extra Python packages; it is put first on sys.path
# when present (presumably to provide the optional PDF libraries -- confirm).
WORKSPACE_PYDEPS = Path.home() / "Documents" / "Obsidian Vault" / ".codex_tmp" / "pydeps"
if WORKSPACE_PYDEPS.exists():
    sys.path.insert(0, str(WORKSPACE_PYDEPS))
def fail(message: str) -> None:
    """Print ``error: <message>`` to stderr and terminate with exit status 1.

    This function never returns normally; it always raises ``SystemExit(1)``.
    """
    print(f"error: {message}", file=sys.stderr)
    sys.exit(1)
def load_dotenv(path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from a dotenv file into ``os.environ``.

    Blank lines, ``#`` comments and lines without ``=`` are ignored. A leading
    ``export`` keyword is accepted, surrounding quote characters are stripped
    from the value, and variables that are already set are left untouched.
    A missing file is silently ignored.

    Args:
        path: Location of the dotenv file.
    """
    if not path.is_file():
        return
    # utf-8-sig also reads plain UTF-8 but drops a leading BOM, which would
    # otherwise become part of the first variable name.
    for raw_line in path.read_text(encoding="utf-8-sig").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        key, separator, value = line.partition("=")
        if not separator:
            continue
        key = key.strip()
        words = key.split(None, 1)
        if len(words) == 2 and words[0] == "export":
            key = words[1].strip()
        if not key:
            # "=value" has no name; an empty name is rejected by os.environ.
            continue
        value = value.strip().strip('"').strip("'")
        os.environ.setdefault(key, value)
def load_private_config(path: Path) -> None:
    """Copy credentials from the private JSON config into ``os.environ``.

    The file is expected to hold a JSON object with optional ``zotero``
    (``api_key``, ``user_id``) and ``awesomegpt`` (``api_key``, ``base_url``,
    ``model``) objects. Only non-empty values are used, and variables that are
    already set are never overwritten. A missing file is silently ignored.

    Args:
        path: Location of the config file.

    Raises:
        SystemExit: via ``fail`` when the file is not valid JSON or its top
            level is not an object.
    """
    if not path.exists():
        return
    try:
        # utf-8-sig tolerates the BOM that some Windows editors prepend.
        config = json.loads(path.read_text(encoding="utf-8-sig"))
    except json.JSONDecodeError as exc:
        fail(f"invalid private config JSON at {path}: {exc}")
    if not isinstance(config, dict):
        fail(f"invalid private config at {path}: top level must be a JSON object")

    def section(name: str) -> dict[str, Any]:
        # A missing or wrongly typed section behaves like an empty one.
        value = config.get(name)
        return value if isinstance(value, dict) else {}

    zotero = section("zotero")
    awesomegpt = section("awesomegpt")
    mappings = {
        "ZOTERO_API_KEY": zotero.get("api_key"),
        "ZOTERO_USER_ID": zotero.get("user_id"),
        "AWESOMEGPT_API_KEY": awesomegpt.get("api_key"),
        "AWESOMEGPT_BASE_URL": awesomegpt.get("base_url"),
        "AWESOMEGPT_MODEL": awesomegpt.get("model"),
    }
    for key, value in mappings.items():
        if value:
            os.environ.setdefault(key, str(value))
def zotero_profile_prefs() -> Path | None:
    """Locate the ``prefs.js`` file of a Zotero profile under the user's home.

    The profile marked ``Default=1`` in
    ``AppData/Roaming/Zotero/Zotero/profiles.ini`` is tried first. If that
    yields no existing ``prefs.js``, the first ``Profiles/*/prefs.js`` found
    is used instead.

    Returns:
        The path to ``prefs.js``, or ``None`` when none is found.
    """
    root = Path.home() / "AppData/Roaming/Zotero/Zotero"
    ini_file = root / "profiles.ini"
    if ini_file.exists():
        ini_text = ini_file.read_text(encoding="utf-8", errors="replace")
        # Split so that each chunk starts at a [ProfileN] header.
        for section in re.split(r"\n(?=\[Profile\d+\])", ini_text):
            if "Default=1" not in section:
                continue
            location = re.search(r"^Path=(.+)$", section, re.MULTILINE)
            if location is None:
                continue
            profile_dir = Path(location.group(1).strip())
            relative_flag = re.search(r"^IsRelative=(\d+)$", section, re.MULTILINE)
            if relative_flag is not None and relative_flag.group(1) == "1":
                profile_dir = root / profile_dir
            prefs_file = profile_dir / "prefs.js"
            if prefs_file.exists():
                return prefs_file
    fallback_dir = root / "Profiles"
    if fallback_dir.exists():
        return next(fallback_dir.glob("*/prefs.js"), None)
    return None
def load_awesomegpt_prefs(path: Path | None = None) -> None:
    """Fill the ``AWESOMEGPT_*`` variables from a Zotero ``prefs.js`` file.

    Only ``extensions.zotero.zoterogpt.*`` preferences are read. For each of
    the base URL, model and API key, the ``DeepSeek`` entry of the JSON
    ``settings`` preference is preferred and the direct ``api`` / ``model`` /
    ``secretKey`` preference is the fallback. Variables that are already set
    are never overwritten, and empty values are never written.

    Args:
        path: ``prefs.js`` to read; when omitted it is looked up with
            ``zotero_profile_prefs()``. A missing file is silently ignored.
    """
    if path is None:
        path = zotero_profile_prefs()
    if path is None or not path.exists():
        return
    prefix = "extensions.zotero.zoterogpt."
    text = path.read_text(encoding="utf-8", errors="replace")
    prefs: dict[str, Any] = {}
    for name, raw_value in re.findall(r'user_pref\("([^"]+)",\s*(.*?)\);', text):
        if not name.startswith(prefix):
            continue
        try:
            prefs[name] = json.loads(raw_value)
        except json.JSONDecodeError:
            continue

    # The settings preference is a JSON document stored inside a string pref.
    settings: Any = {}
    settings_raw = prefs.get(prefix + "settings")
    if isinstance(settings_raw, str):
        try:
            settings = json.loads(settings_raw)
        except json.JSONDecodeError:
            settings = {}

    provider: Any = None
    if isinstance(settings, dict):
        provider = settings.get("DeepSeek") or next(
            (value for key, value in settings.items() if key.lower() == "deepseek"),
            None,
        )
    if not isinstance(provider, dict):
        provider = {}

    targets = (
        ("AWESOMEGPT_BASE_URL", "api"),
        ("AWESOMEGPT_MODEL", "model"),
        ("AWESOMEGPT_API_KEY", "secretKey"),
    )
    for env_name, field in targets:
        # Provider value first, direct preference second. Empty candidates are
        # skipped so that a missing provider field cannot pin "" and block
        # the fallback.
        for candidate in (provider.get(field), prefs.get(prefix + field)):
            if candidate:
                os.environ.setdefault(env_name, str(candidate))
def http_json(
    url: str,
    *,
    method: str = "GET",
    headers: dict[str, str] | None = None,
    payload: Any = None,
    timeout: int = 90,
) -> Any:
    """Send one HTTP request and return the decoded JSON response.

    Args:
        url: Target URL.
        method: HTTP method.
        headers: Extra request headers; the mapping is copied, not modified.
        payload: When not ``None`` it is sent as a UTF-8 JSON body, and a
            JSON ``Content-Type`` is added unless the caller supplied one.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The parsed JSON value, or ``None`` for an empty response body.

    Raises:
        SystemExit: via ``fail`` on ``HTTPError`` or ``URLError``.
    """
    outgoing = {} if not headers else dict(headers)
    encoded = None
    if payload is not None:
        encoded = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        if "Content-Type" not in outgoing:
            outgoing["Content-Type"] = "application/json"
    request = urllib.request.Request(url, data=encoded, method=method, headers=outgoing)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            text = response.read().decode("utf-8", errors="replace")
            return json.loads(text) if text else None
    except urllib.error.HTTPError as exc:
        # Include the start of the error body; it usually explains the status.
        detail = exc.read().decode("utf-8", errors="replace")
        fail(f"{method} {url} failed: HTTP {exc.code}: {detail[:800]}")
    except urllib.error.URLError as exc:
        fail(f"{method} {url} failed: {exc}")
def zotero_local(path: str) -> Any:
    """GET *path* from the local Zotero API and return the parsed JSON.

    Errors are handled by ``http_json``.
    """
    api_headers = {"Zotero-API-Version": "3"}
    return http_json("".join((LOCAL_ZOTERO, path)), headers=api_headers, timeout=20)
def zotero_local_optional(path: str) -> Any | None:
    """GET *path* from the local Zotero API, returning ``None`` on any failure.

    Unlike ``zotero_local`` this never aborts the program: HTTP errors,
    connection problems, timeouts and unparsable bodies all yield ``None``.

    Args:
        path: API path appended to ``LOCAL_ZOTERO``.

    Returns:
        The parsed JSON value, or ``None`` when the request failed or the
        body was empty or not valid JSON.
    """
    request = urllib.request.Request(LOCAL_ZOTERO + path, headers={"Zotero-API-Version": "3"})
    try:
        with urllib.request.urlopen(request, timeout=20) as response:
            text = response.read().decode("utf-8", errors="replace")
    except OSError:
        # HTTPError and URLError are OSError subclasses. Catching the base
        # class also covers timeouts and connection resets raised while the
        # body is being read, which are not wrapped in URLError.
        return None
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return None
def zotero_web(path: str, *, method: str = "GET", payload: Any = None) -> Any:
    """Call the Zotero Web API at *path*, authenticated with ``ZOTERO_API_KEY``.

    Args:
        path: API path appended to ``ZOTERO_WEB``.
        method: HTTP method.
        payload: Optional JSON body.

    Returns:
        The parsed JSON response from ``http_json``.

    Raises:
        SystemExit: via ``fail`` when ``ZOTERO_API_KEY`` is not set.
    """
    token = os.environ.get("ZOTERO_API_KEY")
    if not token:
        fail("ZOTERO_API_KEY is required to write Zotero child notes")
    auth_headers = {"Zotero-API-Version": "3", "Zotero-API-Key": token}
    return http_json(
        "".join((ZOTERO_WEB, path)),
        method=method,
        headers=auth_headers,
        payload=payload,
        timeout=60,
    )
def resolve_user_id() -> str:
    """Return the Zotero user ID used for Web API writes.

    ``ZOTERO_USER_ID`` wins when set; otherwise the ID is read from the
    ``userID`` field of the ``/keys/current`` response.

    Raises:
        SystemExit: via ``fail`` when no user ID can be determined.
    """
    configured = os.environ.get("ZOTERO_USER_ID")
    if configured:
        return configured
    key_info = zotero_web("/keys/current")
    if isinstance(key_info, dict):
        resolved = key_info.get("userID")
    else:
        resolved = None
    if not resolved:
        fail("could not resolve Zotero userID from /keys/current")
    return str(resolved)
def find_item(item_key: str | None, query: str | None) -> dict[str, Any]:
    """Fetch one item from the local Zotero API by key or by search query.

    Args:
        item_key: Item key; when given it is fetched directly and *query* is
            ignored.
        query: Search text; the first of at most five top-level matches is
            returned, with a warning on stderr if there were several.

    Raises:
        SystemExit: via ``fail`` when neither argument is given or the query
            matches nothing.
    """
    if item_key:
        return zotero_local(f"/items/{urllib.parse.quote(item_key)}")
    if not query:
        fail("provide --item-key or --query")
    search = urllib.parse.urlencode({"q": query, "limit": 5})
    candidates = zotero_local(f"/items/top?{search}")
    if not candidates:
        fail(f"no Zotero item matched query: {query}")
    if len(candidates) > 1:
        print(f"warning: {len(candidates)} matches; using {candidates[0].get('key')}", file=sys.stderr)
    return candidates[0]
def find_items(keys: list[str], query: str | None, limit: int) -> list[dict[str, Any]]:
    """Collect items for explicit keys followed by search matches.

    Args:
        keys: Item keys, each fetched individually and kept in order.
        query: Optional search text; matches whose key is not already
            collected are appended.
        limit: Result limit sent with the search and applied to the combined
            list; a falsy value disables the final truncation.

    Raises:
        SystemExit: via ``fail`` when nothing was collected.
    """
    collected: list[dict[str, Any]] = [find_item(key, None) for key in keys]
    if query:
        search = urllib.parse.urlencode({"q": query, "limit": limit})
        found = zotero_local(f"/items/top?{search}")
        known = {entry.get("key") for entry in collected}
        for candidate in found or []:
            candidate_key = candidate.get("key")
            if candidate_key in known:
                continue
            collected.append(candidate)
            known.add(candidate_key)
    if not collected:
        fail("provide --item-key/--item-keys or --query")
    if limit:
        return collected[:limit]
    return collected
def all_top_items(limit: int = 0) -> list[dict[str, Any]]:
    """Page through every top-level item of the local Zotero library.

    Items are requested 100 at a time until an empty or short page arrives.

    Args:
        limit: Stop once this many items are collected; ``0`` means no limit.
    """
    page_size = 100
    collected: list[dict[str, Any]] = []
    offset = 0
    while True:
        params = urllib.parse.urlencode({"limit": page_size, "start": offset})
        batch = zotero_local(f"/items/top?{params}")
        if not batch:
            break
        collected += batch
        if limit and len(collected) >= limit:
            return collected[:limit]
        if len(batch) < page_size:
            # A short page means there is nothing further to fetch.
            break
        offset += page_size
    return collected
def existing_ai_notes(parent_key: str) -> list[dict[str, Any]]:
    """Return the child notes of *parent_key* that look like generated AI notes.

    A note qualifies when its body contains one of the two title markers or
    the text ``items/<parent_key>``. If the children listing carries no note
    body, the note is fetched individually on a best-effort basis.
    """
    markers = ("AI文献笔记", "AI Literature Note", f"items/{parent_key}")
    found: list[dict[str, Any]] = []
    listing = zotero_local(f"/items/{urllib.parse.quote(parent_key)}/children")
    for child in listing or []:
        data = child.get("data") or {}
        if data.get("itemType") != "note":
            continue
        body = data.get("note") or ""
        if not body and child.get("key"):
            detail = zotero_local_optional(f"/items/{urllib.parse.quote(child['key'])}")
            body = ((detail or {}).get("data") or {}).get("note") or ""
        if any(marker in body for marker in markers):
            found.append(child)
    return found
def has_existing_ai_note(parent_key: str) -> bool:
    """Tell whether *parent_key* already has at least one generated AI note."""
    return len(existing_ai_notes(parent_key)) > 0
def export_bibtex(item_key: str) -> str:
    """Export *item_key* from the local Zotero API as BibTeX text.

    Returns:
        The response body decoded as UTF-8, with surrounding whitespace
        removed.
    """
    params = urllib.parse.urlencode({"itemKey": item_key, "format": "bibtex"})
    request = urllib.request.Request(
        f"{LOCAL_ZOTERO}/items?{params}",
        headers={"Zotero-API-Version": "3"},
    )
    with urllib.request.urlopen(request, timeout=20) as response:
        raw = response.read()
    return raw.decode("utf-8", errors="replace").strip()
def local_fulltext(parent_key: str, max_chars: int) -> str:
    """Gather up to *max_chars* characters of full text for an item.

    The ``fulltext`` endpoint of each attachment is tried first. Only when
    that yields nothing are the PDF attachments read from disk, stopping at
    the first one that produces text.

    Args:
        parent_key: Key of the parent item.
        max_chars: Maximum length of the returned text.

    Returns:
        The collected text joined by blank lines and truncated to
        *max_chars*; empty when nothing could be read.
    """
    children = zotero_local(f"/items/{urllib.parse.quote(parent_key)}/children") or []
    pieces: list[str] = []

    # Pass 1: text served by the fulltext endpoint.
    gathered = 0
    for child in children:
        meta = child.get("data") or {}
        if meta.get("itemType") != "attachment":
            continue
        attachment_key = child.get("key")
        if not attachment_key:
            continue
        indexed = zotero_local_optional(f"/items/{urllib.parse.quote(attachment_key)}/fulltext")
        if not indexed:
            continue
        content = indexed.get("content") if isinstance(indexed, dict) else ""
        if content:
            pieces.append(content)
            gathered += len(content)
        if gathered >= max_chars:
            break

    # Pass 2: read PDF attachments from disk when pass 1 found nothing.
    if not pieces:
        for child in children:
            meta = child.get("data") or {}
            is_pdf = meta.get("itemType") == "attachment" and meta.get("contentType") == "application/pdf"
            if not is_pdf:
                continue
            file_path = meta.get("path")
            if not file_path:
                attachment_key = child.get("key")
                if attachment_key:
                    detail = zotero_local_optional(f"/items/{urllib.parse.quote(attachment_key)}")
                    file_path = ((detail or {}).get("data") or {}).get("path")
            pdf_text = extract_pdf_text(resolve_attachment_path(child, file_path), max_chars=max_chars)
            if pdf_text:
                pieces.append(pdf_text)
                break

    return "\n\n".join(pieces)[:max_chars]
def resolve_attachment_path(child: dict[str, Any], path: str | None) -> Path:
    """Work out where an attachment's file is on disk.

    Args:
        child: Attachment item; its ``key`` names the storage folder.
        path: Path recorded for the attachment, if any.

    Returns:
        *path* when it exists; otherwise the first ``*.pdf`` found in
        ``~/Zotero/storage/<key>``; otherwise ``Path(path or "")`` even though
        it may not exist.
    """
    if path:
        direct = Path(path)
        if direct.exists():
            return direct
    attachment_key = child.get("key")
    if attachment_key:
        folder = Path.home() / "Zotero" / "storage" / str(attachment_key)
        if folder.exists():
            first_pdf = next(iter(folder.glob("*.pdf")), None)
            if first_pdf is not None:
                return first_pdf
    return Path(path or "")
def template_paths(vault: Path) -> tuple[Path, Path, Path]:
    """Locate the three Markdown templates inside ``<vault>/00 Templater``.

    Args:
        vault: Root directory of the vault.

    Returns:
        ``(prompt, research_template, review_template)``: the ``03`` file
        (preferring one whose name contains ``AI``), the ``01`` file and the
        ``02`` file. When several files qualify, the alphabetically first
        name is used.

    Raises:
        SystemExit: via ``fail`` when the directory or a template is missing.
    """
    templates = vault / "00 Templater"
    if not templates.exists():
        fail(f"template directory not found: {templates}")
    # Sorted because glob order depends on the filesystem; without it the
    # chosen template could differ between machines.
    names = sorted(path.name for path in templates.glob("*.md"))

    def match(prefix: str, contains: str) -> Path:
        prefixed = [name for name in names if name.startswith(prefix)]
        preferred = [name for name in prefixed if contains in name]
        chosen = preferred or prefixed
        if not chosen:
            fail(f"missing template file starting with {prefix} in {templates}")
        return templates / chosen[0]

    return (
        match("03", "AI"),
        match("01", ""),
        match("02", ""),
    )
def _join_page_text(page_texts: Any, max_chars: int) -> str:
    """Join page texts with newlines, stopping once *max_chars* is reached."""
    chunks: list[str] = []
    total = 0
    for text in page_texts:
        chunks.append(text)
        total += len(text)
        if total >= max_chars:
            break
    return "\n".join(chunks)[:max_chars]


def extract_pdf_text(path: Path, max_chars: int) -> str:
    """Extract up to *max_chars* characters of text from a PDF file.

    The first available library among ``fitz`` (PyMuPDF), ``pypdf`` and
    ``PyPDF2`` is used. At most 80 pages are read when *max_chars* is at
    least 50000, otherwise 12.

    Args:
        path: PDF file to read.
        max_chars: Maximum length of the returned text.

    Returns:
        The extracted text, or ``""`` when *path* is not a regular file or
        none of the libraries is installed.
    """
    # is_file() rather than exists(): an empty path resolves to the current
    # directory, which exists but cannot be opened as a PDF.
    if not path.is_file():
        return ""
    max_pages = 80 if max_chars >= 50000 else 12
    if importlib.util.find_spec("fitz"):
        import fitz  # type: ignore

        with fitz.open(str(path)) as doc:
            page_count = min(max_pages, len(doc))
            # Pages are loaded one by one, so reading stops as soon as
            # enough text has been gathered.
            return _join_page_text(
                (doc[number].get_text("text") for number in range(page_count)),
                max_chars,
            )
    if importlib.util.find_spec("pypdf"):
        from pypdf import PdfReader  # type: ignore
    elif importlib.util.find_spec("PyPDF2"):
        from PyPDF2 import PdfReader  # type: ignore
    else:
        return ""
    reader = PdfReader(str(path))
    return _join_page_text(
        (page.extract_text() or "" for page in reader.pages[:max_pages]),
        max_chars,
    )
def creators_text(creators: list[dict[str, Any]]) -> str:
    """Format Zotero creator records as one ``"; "``-separated string.

    Each creator contributes its ``name`` if present, otherwise its
    ``firstName`` and ``lastName`` joined by a space. Creators with no usable
    name are left out.
    """

    def display_name(creator: dict[str, Any]) -> Any:
        single = creator.get("name")
        if single:
            return single
        parts = (creator.get("firstName"), creator.get("lastName"))
        return " ".join(part for part in parts if part)

    return "; ".join(label for label in map(display_name, creators) if label)
def build_prompt(item: dict[str, Any], bibtex: str, fulltext: str, vault: Path, mode: str) -> str:
    """Assemble the complete LLM prompt for one Zotero item.

    The prompt is the vault's prompt template, the research and review note
    templates with fixed instructions around them, extra instructions when
    *mode* is ``"deep"``, and finally a JSON dump of the item's metadata,
    BibTeX and full-text excerpt.

    Args:
        item: Zotero item whose ``key`` and ``data`` fields are read.
        bibtex: BibTeX export of the item.
        fulltext: Full-text excerpt of the item.
        vault: Vault containing the ``00 Templater`` directory.
        mode: ``"deep"`` adds the detailed-reading instructions.

    Returns:
        All sections joined by blank lines.
    """
    data = item.get("data") or {}
    prompt_path, research_template_path, review_template_path = template_paths(vault)
    prompt_text, research_text, review_text = (
        template.read_text(encoding="utf-8")
        for template in (prompt_path, research_template_path, review_template_path)
    )

    # Keys are inserted in the order they should appear in the JSON dump.
    metadata: dict[str, Any] = {"zoteroKey": item.get("key")}
    for field in ("title", "itemType"):
        metadata[field] = data.get(field)
    metadata["authors"] = creators_text(data.get("creators") or [])
    for field in ("publicationTitle", "date", "DOI", "url", "abstractNote"):
        metadata[field] = data.get(field)
    source = {
        "metadata": metadata,
        "bibtex": bibtex,
        "indexedFullTextExcerpt": fulltext,
    }

    sections = [
        prompt_text,
        "请先判断文献类型:综述型文献或研究型文献。",
        "若为研究型文献,请严格填充下面的研究型模板:",
        research_text,
        "若为综述型文献,请严格填充下面的综述型模板:",
        review_text,
        "请将模板中的 ${topItem.getField('title')} 替换为真实题名,将 ${topItem.key} 替换为 Zotero key。",
    ]
    if mode == "deep":
        sections += [
            "这是满血精读模式。请优先追求完整、具体、可复用的文献阅读笔记,而不是短摘要。",
            "每个模板栏目都要充分展开;如果原文提供了材料配比、制备参数、测试条件、器件结构、性能数据、机理解释或对照实验,必须具体写出。",
            "核心数据必须尽量保留数值和单位;不要用'显著提高''性能优异'这类空泛表达替代具体结果。",
            "机制部分要写清因果链条:材料/结构设计如何影响微观结构、界面、导电网络、离子/电子传输、力学响应或器件输出。",
            "局限与启发部分要给出可用于后续课题设计的具体启发,而不是泛泛评价。",
            "如果全文摘录中缺少某项信息,请明确写'原文摘录中未提供',不要编造。",
        ]
    sections += [
        "只输出最终 Markdown 笔记,不要输出解释、判断过程或代码围栏。",
        "文献材料如下:",
        json.dumps(source, ensure_ascii=False, indent=2),
    ]
    return "\n\n".join(sections)
def call_llm(prompt: str, max_tokens: int | None = None) -> str:
    """Send *prompt* to the configured chat-completions endpoint.

    Connection settings come from ``AWESOMEGPT_API_KEY``,
    ``AWESOMEGPT_BASE_URL`` and ``AWESOMEGPT_MODEL``. ``/v1`` is appended to
    the base URL unless it already ends with it.

    Args:
        prompt: User message content.
        max_tokens: Output token limit; left out of the request when falsy.

    Returns:
        The stripped content of the first choice in the response.

    Raises:
        SystemExit: via ``fail`` when a setting is missing or the response
            does not have the expected shape.
    """
    api_key = os.environ.get("AWESOMEGPT_API_KEY")
    model = os.environ.get("AWESOMEGPT_MODEL")
    base_url = (os.environ.get("AWESOMEGPT_BASE_URL") or "").rstrip("/")
    if not (api_key and base_url and model):
        fail("AWESOMEGPT_API_KEY, AWESOMEGPT_BASE_URL, and AWESOMEGPT_MODEL are required")
    api_root = base_url if base_url.endswith("/v1") else base_url + "/v1"
    system_message = {
        "role": "system",
        "content": "You are a materials science literature-note assistant. Output Simplified Chinese Markdown only.",
    }
    payload = {
        "model": model,
        "messages": [system_message, {"role": "user", "content": prompt}],
        "temperature": 0.3,
    }
    if max_tokens:
        payload["max_tokens"] = max_tokens
    response = http_json(
        api_root + "/chat/completions",
        method="POST",
        headers={"Authorization": f"Bearer {api_key}"},
        payload=payload,
        timeout=180,
    )
    try:
        return response["choices"][0]["message"]["content"].strip()
    except Exception as exc:
        fail(f"unexpected LLM response shape: {exc}; response={response}")
_MD_HEADING_RE = re.compile(r"^(#{1,6})\s+(.+)$")
_MD_BULLET_RE = re.compile(r"^[-*]\s+(.*)$")
_MD_BOLD_RE = re.compile(r"\*\*(.+?)\*\*")
_MD_CODE_RE = re.compile(r"`([^`]+)`")


def _inline_html(text: str) -> str:
    """Escape *text* for HTML, then render ``**bold**`` and inline code."""
    escaped = html.escape(text)
    escaped = _MD_BOLD_RE.sub(r"<strong>\1</strong>", escaped)
    return _MD_CODE_RE.sub(r"<code>\1</code>", escaped)


def markdown_to_zotero_html(markdown: str) -> str:
    """Convert a small Markdown subset into HTML for a Zotero note.

    Handled constructs: ATX headings (``#`` to ``######``), fenced code blocks
    (three backticks or ``~~~``), single-line block quotes, ``-`` / ``*``
    bullet lists, ``**bold**`` and inline code. Every other non-blank line
    becomes a paragraph. All text is HTML-escaped.

    Args:
        markdown: Markdown source.

    Returns:
        HTML with one element per line.
    """
    out: list[str] = []
    pre_lines: list[str] = []
    in_pre = False
    in_list = False

    def close_list() -> None:
        nonlocal in_list
        if in_list:
            out.append("</ul>")
            in_list = False

    for line in markdown.strip().splitlines():
        stripped = line.strip()
        if stripped.startswith(("```", "~~~")):
            if in_pre:
                out.append("<pre>" + html.escape("\n".join(pre_lines)) + "</pre>")
                pre_lines = []
            else:
                close_list()
            in_pre = not in_pre
            continue
        if in_pre:
            # Keep code lines verbatim, indentation included.
            pre_lines.append(line)
            continue
        if not stripped:
            # Blank lines separate blocks but do not end a list, so items
            # separated by blank lines stay in one <ul>.
            continue
        bullet = _MD_BULLET_RE.match(stripped)
        if bullet:
            if not in_list:
                out.append("<ul>")
                in_list = True
            out.append(f"<li>{_inline_html(bullet.group(1))}</li>")
            continue
        close_list()
        heading = _MD_HEADING_RE.match(stripped)
        if heading:
            level = len(heading.group(1))
            out.append(f"<h{level}>{_inline_html(heading.group(2))}</h{level}>")
        elif stripped.startswith(">"):
            quoted = stripped.lstrip("> ").strip()
            out.append(f"<blockquote>{_inline_html(quoted)}</blockquote>")
        else:
            out.append(f"<p>{_inline_html(stripped)}</p>")
    close_list()
    if in_pre:
        # An unterminated fence still emits what was collected.
        out.append("<pre>" + html.escape("\n".join(pre_lines)) + "</pre>")
    return "\n".join(out)
def create_child_note(user_id: str, parent_key: str, markdown: str, dry_run: bool) -> Any:
    """Create a child note under *parent_key* from Markdown text.

    Args:
        user_id: Zotero user ID owning the library.
        parent_key: Key of the item that receives the note.
        markdown: Note content; converted to HTML before sending.
        dry_run: When true nothing is sent and the payload is returned.

    Returns:
        The Web API response, or ``{"dryRun": True, "payload": ...}``.
    """
    note_item = {
        "itemType": "note",
        "parentItem": parent_key,
        "note": markdown_to_zotero_html(markdown),
    }
    payload = [note_item]
    if not dry_run:
        return zotero_web(f"/users/{user_id}/items", method="POST", payload=payload)
    return {"dryRun": True, "payload": payload}
def update_child_note(user_id: str, note_key: str, markdown: str, dry_run: bool) -> Any:
    """Replace the body of an existing note with freshly rendered Markdown.

    The current note is fetched from the Web API first, also in a dry run,
    and its ``data`` object is sent back with only ``note`` changed.

    Args:
        user_id: Zotero user ID owning the library.
        note_key: Key of the note to update.
        markdown: New note content; converted to HTML before sending.
        dry_run: When true the PUT is skipped and the payload is returned.

    Raises:
        SystemExit: via ``fail`` when the existing note cannot be loaded.
    """
    item_path = f"/users/{user_id}/items/{urllib.parse.quote(note_key)}"
    current = zotero_web(item_path)
    data = current.get("data") if isinstance(current, dict) else None
    if not isinstance(data, dict):
        fail(f"could not load existing note {note_key}")
    data["note"] = markdown_to_zotero_html(markdown)
    if dry_run:
        return {"dryRun": True, "itemKey": note_key, "payload": data}
    return zotero_web(item_path, method="PUT", payload=data)
def _error_entry(index: int, item: dict[str, Any], error: str) -> dict[str, Any]:
    """Build the result record for an item whose processing failed."""
    return {
        "index": index,
        "itemKey": item.get("key"),
        "title": (item.get("data") or {}).get("title"),
        "status": "error",
        "error": error,
    }


def main() -> None:
    """Command-line entry point: generate AI notes for the selected items.

    Loads configuration (dotenv file, private config, Zotero preferences),
    selects items by key, query or ``--all``, generates a note for each with
    the LLM and creates or updates the Zotero child note. A failure on one
    item is recorded and the run continues. A JSON list with one result per
    item is printed to stdout.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--item-key", action="append", default=[], help="Zotero top-level item key; can be repeated")
    parser.add_argument("--item-keys", help="Comma/space separated Zotero top-level item keys")
    parser.add_argument("--query", help="Search query; first top-level match is used")
    parser.add_argument("--all", action="store_true", help="Process all top-level Zotero items")
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Maximum number of items to process (default: every given key plus the first query match; unlimited with --all)",
    )
    parser.add_argument(
        "--fulltext-chars",
        type=int,
        default=None,
        help="Maximum full-text characters sent to the model (default: 12000, or 80000 in deep mode)",
    )
    parser.add_argument("--mode", choices=["quick", "deep"], default="quick", help="quick for batch notes; deep for full-detail notes")
    parser.add_argument("--max-tokens", type=int, default=0, help="Optional LLM output token limit; deep mode defaults to 12000")
    parser.add_argument("--skip-existing", action="store_true", help="Skip items that already have a generated AI child note")
    parser.add_argument("--replace-existing", action="store_true", help="Update the first existing AI child note instead of creating a duplicate")
    parser.add_argument("--dry-run", action="store_true", help="generate but do not write Zotero note")
    parser.add_argument("--vault", default=str(DEFAULT_VAULT), help="Obsidian vault containing 00 Templater")
    parser.add_argument("--env-file", help="Optional .env path; defaults to <vault>/.env")
    parser.add_argument("--config", default=str(DEFAULT_PRIVATE_CONFIG), help="Private local config JSON")
    args = parser.parse_args()

    vault = Path(args.vault).expanduser().resolve()
    load_dotenv(Path(args.env_file).expanduser().resolve() if args.env_file else vault / ".env")
    load_private_config(Path(args.config).expanduser().resolve())
    load_awesomegpt_prefs()

    keys = list(args.item_key)
    if args.item_keys:
        keys.extend(key for key in re.split(r"[\s,]+", args.item_keys.strip()) if key)

    # Defaults are None so an omitted option can be told apart from an
    # explicit value: "--all --limit 1" must process one item, and several
    # explicit keys must not be cut down to the first one.
    if args.limit is not None:
        limit = args.limit
    elif args.all:
        limit = 0
    else:
        limit = max(1, len(keys) + (1 if args.query else 0))
    if args.all:
        items = all_top_items(limit)
    else:
        items = find_items(keys, args.query, limit)

    deep = args.mode == "deep"
    if args.fulltext_chars is not None:
        fulltext_chars = args.fulltext_chars
    else:
        fulltext_chars = 80000 if deep else 12000
    max_tokens = args.max_tokens or (12000 if deep else 0)
    # Existing notes only matter when one of these flags is set; otherwise
    # the per-item lookup is skipped.
    check_existing = args.skip_existing or args.replace_existing

    user_id = resolve_user_id()
    results = []
    for index, item in enumerate(items, 1):
        try:
            key = item.get("key")
            title = (item.get("data") or {}).get("title")
            if not key:
                results.append({"index": index, "status": "skipped", "reason": "missing key", "title": title})
                continue
            existing_notes = existing_ai_notes(key) if check_existing else []
            if args.skip_existing and existing_notes:
                results.append({"index": index, "itemKey": key, "title": title, "status": "skipped", "reason": "existing AI note"})
                continue
            print(f"[{index}/{len(items)}] generating {key}: {title}", file=sys.stderr)
            bibtex = export_bibtex(key)
            fulltext = local_fulltext(key, fulltext_chars)
            prompt = build_prompt(item, bibtex, fulltext, vault, args.mode)
            markdown = call_llm(prompt, max_tokens or None)
            if args.replace_existing and existing_notes:
                note_key = existing_notes[0].get("key")
                result = update_child_note(user_id, str(note_key), markdown, args.dry_run)
                results.append({"index": index, "itemKey": key, "title": title, "status": "ok", "action": "updated", "noteKey": note_key, "result": result})
            else:
                result = create_child_note(user_id, key, markdown, args.dry_run)
                results.append({"index": index, "itemKey": key, "title": title, "status": "ok", "action": "created", "result": result})
        except SystemExit as exc:
            # fail() has already printed the reason; keep going with the rest.
            results.append(_error_entry(index, item, str(exc)))
            print(f"[{index}/{len(items)}] error; continuing", file=sys.stderr)
            continue
        except Exception as exc:
            results.append(_error_entry(index, item, repr(exc)))
            print(f"[{index}/{len(items)}] error; continuing: {exc}", file=sys.stderr)
            continue
    print(json.dumps(results, ensure_ascii=False, indent=2))
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()