614 lines
24 KiB
Python
614 lines
24 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Generate an AI literature note from Zotero metadata and save it as a Zotero child note.
|
|
|
|
Required environment variables:
|
|
AWESOMEGPT_API_KEY DeepSeek/OpenAI-compatible API key
|
|
AWESOMEGPT_BASE_URL Example: https://api.deepseek.com
|
|
AWESOMEGPT_MODEL Example: deepseek-v4-pro
|
|
ZOTERO_API_KEY Zotero Web API key with library write permission
|
|
|
|
Optional:
|
|
ZOTERO_USER_ID If omitted, resolved from /keys/current
|
|
|
|
Private local config:
|
|
<skill>\config\config.local.json, created by scripts\init_private_config.py
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import html
|
|
import importlib.util
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# Best effort: switch both standard streams to UTF-8 (presumably so non-ASCII
# note text prints on consoles with a legacy encoding). The first failure stops
# the attempt and is deliberately ignored.
try:
    for _stream in (sys.stdout, sys.stderr):
        _stream.reconfigure(encoding="utf-8")
except Exception:
    pass
|
|
|
|
|
|
# Base URL of the local Zotero HTTP API; this script only issues GET requests to it.
LOCAL_ZOTERO = "http://127.0.0.1:23119/api/users/0"
# Base URL of the Zotero Web API, used for authenticated requests (see zotero_web).
ZOTERO_WEB = "https://api.zotero.org"
# Default vault: the working directory at import time (default of --vault).
DEFAULT_VAULT = Path.cwd()
# Parent of the directory that contains this file.
SKILL_DIR = Path(__file__).resolve().parents[1]
# Default location of the private JSON config (default of --config).
DEFAULT_PRIVATE_CONFIG = SKILL_DIR / "config" / "config.local.json"
|
|
|
|
|
|
def fail(message: str) -> None:
    """Print ``error: <message>`` to stderr and terminate with exit status 1.

    This function never returns normally; it always raises ``SystemExit``.
    """
    report = f"error: {message}"
    print(report, file=sys.stderr)
    sys.exit(1)
|
|
|
|
|
|
def load_dotenv(path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from a dotenv-style file into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored, as are lines whose key is empty.  Keys and values are stripped of
    surrounding whitespace; values additionally lose leading/trailing ``"``
    and ``'`` characters.  Variables already present in the environment are
    never overwritten.  A missing file is silently skipped.

    Args:
        path: Location of the dotenv file.
    """
    if not path.exists():
        return
    # "utf-8-sig" reads plain UTF-8 too, but drops a leading BOM that would
    # otherwise become part of the first variable name.
    for raw_line in path.read_text(encoding="utf-8-sig").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        if not key:
            # A line such as "=value" has no name; the OS rejects an empty
            # variable name, so skip it instead of aborting the whole load.
            continue
        value = value.strip().strip('"').strip("'")
        os.environ.setdefault(key, value)
|
|
|
|
|
|
def load_private_config(path: Path) -> None:
    """Export credentials from the private JSON config into ``os.environ``.

    The file is expected to hold a JSON object with optional ``zotero``
    (``api_key``, ``user_id``) and ``awesomegpt`` (``api_key``, ``base_url``,
    ``model``) objects.  Truthy values are stringified and exported with
    ``setdefault``, so variables that are already set win.  A missing file is
    silently skipped; sections that are not objects are treated as empty.

    Args:
        path: Location of the private config file.

    Raises:
        SystemExit: via ``fail`` when the file is not valid JSON or its
            top-level value is not an object.
    """
    if not path.exists():
        return
    try:
        # "utf-8-sig" tolerates a BOM, which json.loads would reject.
        config = json.loads(path.read_text(encoding="utf-8-sig"))
    except json.JSONDecodeError as exc:
        fail(f"invalid private config JSON at {path}: {exc}")
    if not isinstance(config, dict):
        # e.g. a top-level list or string: report it instead of crashing on .get().
        fail(f"invalid private config JSON at {path}: top-level value must be an object")

    def section(name: str) -> dict[str, Any]:
        value = config.get(name)
        return value if isinstance(value, dict) else {}

    zotero = section("zotero")
    awesomegpt = section("awesomegpt")
    mappings = {
        "ZOTERO_API_KEY": zotero.get("api_key"),
        "ZOTERO_USER_ID": zotero.get("user_id"),
        "AWESOMEGPT_API_KEY": awesomegpt.get("api_key"),
        "AWESOMEGPT_BASE_URL": awesomegpt.get("base_url"),
        "AWESOMEGPT_MODEL": awesomegpt.get("model"),
    }
    for key, value in mappings.items():
        if value:
            os.environ.setdefault(key, str(value))
|
|
|
|
|
|
def zotero_profile_prefs() -> Path | None:
    """Locate a Zotero profile's ``prefs.js`` under ``~/AppData/Roaming/Zotero/Zotero``.

    Profile sections of ``profiles.ini`` that contain ``Default=1`` are tried
    first; their ``Path`` is resolved against the ini's directory when
    ``IsRelative=1``.  If none yields an existing ``prefs.js``, the first
    ``Profiles/*/prefs.js`` found is returned instead.

    Returns:
        The path to ``prefs.js``, or ``None`` when nothing is found.
    """
    ini_file = Path.home() / "AppData/Roaming/Zotero/Zotero/profiles.ini"
    base_dir = ini_file.parent

    if ini_file.exists():
        ini_text = ini_file.read_text(encoding="utf-8", errors="replace")
        for section in re.split(r"\n(?=\[Profile\d+\])", ini_text):
            if "Default=1" not in section:
                continue
            location = re.search(r"^Path=(.+)$", section, re.MULTILINE)
            if location is None:
                continue
            candidate_dir = Path(location.group(1).strip())
            relative = re.search(r"^IsRelative=(\d+)$", section, re.MULTILINE)
            if relative is not None and relative.group(1) == "1":
                candidate_dir = base_dir / candidate_dir
            candidate = candidate_dir / "prefs.js"
            if candidate.exists():
                return candidate

    fallback_dir = base_dir / "Profiles"
    if not fallback_dir.exists():
        return None
    # Take whichever profile the glob yields first.
    return next(iter(fallback_dir.glob("*/prefs.js")), None)
|
|
|
|
|
|
def load_awesomegpt_prefs(path: Path | None = None) -> None:
    """Export LLM settings found in a Zotero ``prefs.js`` into ``os.environ``.

    Only ``user_pref`` entries under ``extensions.zotero.zoterogpt.`` whose
    value parses as JSON are considered.  For each of base URL, model and API
    key, the value from the ``DeepSeek`` entry (matched case-insensitively) of
    the JSON-encoded ``settings`` preference is preferred, falling back to the
    direct ``api`` / ``model`` / ``secretKey`` preferences.  Existing
    environment variables are never overwritten and empty values are never
    exported.

    Args:
        path: ``prefs.js`` to read; when ``None`` it is located with
            ``zotero_profile_prefs``.  A missing file is silently skipped.
    """
    if path is None:
        path = zotero_profile_prefs()
    if path is None or not path.exists():
        return

    prefix = "extensions.zotero.zoterogpt."
    text = path.read_text(encoding="utf-8", errors="replace")
    prefs: dict[str, Any] = {}
    for name, raw_value in re.findall(r'user_pref\("([^"]+)",\s*(.*?)\);', text):
        if not name.startswith(prefix):
            continue
        try:
            prefs[name] = json.loads(raw_value)
        except json.JSONDecodeError:
            continue

    # The "settings" preference is itself a JSON document stored as a string.
    settings: Any = {}
    settings_raw = prefs.get(prefix + "settings")
    if isinstance(settings_raw, str):
        try:
            settings = json.loads(settings_raw)
        except json.JSONDecodeError:
            settings = {}

    provider: Any = None
    if isinstance(settings, dict):
        provider = settings.get("DeepSeek") or next(
            (value for key, value in settings.items() if key.lower() == "deepseek"),
            None,
        )
    if not isinstance(provider, dict):
        provider = {}

    candidates = (
        ("AWESOMEGPT_BASE_URL", provider.get("api"), prefs.get(prefix + "api")),
        ("AWESOMEGPT_MODEL", provider.get("model"), prefs.get(prefix + "model")),
        ("AWESOMEGPT_API_KEY", provider.get("secretKey"), prefs.get(prefix + "secretKey")),
    )
    for env_name, provider_value, direct_value in candidates:
        # Never export an empty string: it would count as "set" and block the
        # fallback to the direct preference (and every later default).
        value = provider_value or direct_value
        if value:
            os.environ.setdefault(env_name, str(value))
|
|
|
|
|
|
def http_json(
    url: str,
    *,
    method: str = "GET",
    headers: dict[str, str] | None = None,
    payload: Any = None,
    timeout: int = 90,
) -> Any:
    """Send an HTTP request and return the decoded JSON response body.

    Args:
        url: Absolute URL to request.
        method: HTTP method.
        headers: Extra request headers; the mapping is copied, not mutated.
        payload: When not ``None``, serialized as UTF-8 JSON and sent as the
            body with a default ``Content-Type: application/json``.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The parsed JSON value, or ``None`` when the response body is empty.

    Raises:
        SystemExit: via ``fail`` on HTTP errors, transport errors, or a body
            that is not valid JSON.
    """
    body = None
    req_headers = dict(headers or {})
    if payload is not None:
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        req_headers.setdefault("Content-Type", "application/json")
    request = urllib.request.Request(url, data=body, method=method, headers=req_headers)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            text = response.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        fail(f"{method} {url} failed: HTTP {exc.code}: {detail[:800]}")
    except OSError as exc:
        # URLError is an OSError subclass; catching OSError also covers
        # timeouts and connection resets raised while reading the body.
        fail(f"{method} {url} failed: {exc}")
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError as exc:
        # e.g. an HTML error page served with a 200 status.
        fail(f"{method} {url} failed: response is not valid JSON ({exc}): {text[:800]}")
|
|
|
|
|
|
def zotero_local(path: str) -> Any:
    """GET *path* from the local Zotero API and return the parsed JSON.

    Errors are handled by ``http_json``; the request uses a 20 second timeout.
    """
    version_header = {"Zotero-API-Version": "3"}
    return http_json(f"{LOCAL_ZOTERO}{path}", headers=version_header, timeout=20)
|
|
|
|
|
|
def zotero_local_optional(path: str) -> Any | None:
    """GET *path* from the local Zotero API, returning ``None`` on failure.

    HTTP errors, URL errors, an empty body and invalid JSON all yield ``None``
    instead of terminating the program.
    """
    request = urllib.request.Request(
        LOCAL_ZOTERO + path,
        headers={"Zotero-API-Version": "3"},
    )
    try:
        with urllib.request.urlopen(request, timeout=20) as response:
            raw = response.read()
        decoded = raw.decode("utf-8", errors="replace")
        if not decoded:
            return None
        return json.loads(decoded)
    except (urllib.error.HTTPError, urllib.error.URLError, json.JSONDecodeError):
        return None
|
|
|
|
|
|
def zotero_web(path: str, *, method: str = "GET", payload: Any = None) -> Any:
    """Call the Zotero Web API at *path* using the key in ``ZOTERO_API_KEY``.

    Args:
        path: Path appended to the Web API base URL.
        method: HTTP method.
        payload: Optional JSON-serializable request body.

    Returns:
        Whatever ``http_json`` returns for the request (60 second timeout).

    Raises:
        SystemExit: via ``fail`` when ``ZOTERO_API_KEY`` is unset or empty.
    """
    api_key = os.environ.get("ZOTERO_API_KEY")
    if not api_key:
        fail("ZOTERO_API_KEY is required to write Zotero child notes")
    auth_headers = {"Zotero-API-Version": "3", "Zotero-API-Key": api_key}
    return http_json(
        ZOTERO_WEB + path,
        method=method,
        headers=auth_headers,
        payload=payload,
        timeout=60,
    )
|
|
|
|
|
|
def resolve_user_id() -> str:
    """Return the Zotero user id as a string.

    ``ZOTERO_USER_ID`` is used when set; otherwise the id is read from the
    ``userID`` field of the Web API's ``/keys/current`` response.

    Raises:
        SystemExit: via ``fail`` when no user id can be determined.
    """
    configured = os.environ.get("ZOTERO_USER_ID")
    if configured:
        return configured
    key_info = zotero_web("/keys/current")
    user_id = None
    if isinstance(key_info, dict):
        user_id = key_info.get("userID")
    if not user_id:
        fail("could not resolve Zotero userID from /keys/current")
    return str(user_id)
|
|
|
|
|
|
def find_item(item_key: str | None, query: str | None) -> dict[str, Any]:
    """Fetch one item from the local Zotero API by key or by search query.

    A truthy *item_key* wins and is fetched directly.  Otherwise *query* is
    searched among top-level items (at most 5 results) and the first match is
    returned, with a warning on stderr when there are several.

    Raises:
        SystemExit: via ``fail`` when neither argument is given or the query
            matches nothing.
    """
    if item_key:
        return zotero_local(f"/items/{urllib.parse.quote(item_key)}")
    if not query:
        fail("provide --item-key or --query")
    search = urllib.parse.urlencode({"q": query, "limit": 5})
    candidates = zotero_local(f"/items/top?{search}")
    if not candidates:
        fail(f"no Zotero item matched query: {query}")
    best = candidates[0]
    if len(candidates) > 1:
        print(f"warning: {len(candidates)} matches; using {best.get('key')}", file=sys.stderr)
    return best
|
|
|
|
|
|
def find_items(keys: list[str], query: str | None, limit: int) -> list[dict[str, Any]]:
    """Collect items for explicit *keys* plus any matches for *query*.

    Keyed items come first, in the given order; query matches whose key is not
    already present are appended.  A truthy *limit* caps both the search and
    the final list (explicitly keyed items included).

    Raises:
        SystemExit: via ``fail`` when nothing was selected.
    """
    items: list[dict[str, Any]] = [find_item(key, None) for key in keys]
    if query:
        search = urllib.parse.urlencode({"q": query, "limit": limit})
        found = zotero_local(f"/items/top?{search}")
        known = {entry.get("key") for entry in items}
        for candidate in found or []:
            candidate_key = candidate.get("key")
            if candidate_key in known:
                continue
            items.append(candidate)
            known.add(candidate_key)
    if not items:
        fail("provide --item-key/--item-keys or --query")
    if limit:
        return items[:limit]
    return items
|
|
|
|
|
|
def all_top_items(limit: int = 0) -> list[dict[str, Any]]:
    """Page through every top-level item of the local Zotero library.

    Items are requested 100 at a time until an empty or short page is
    returned.  A truthy *limit* stops early and truncates the result to that
    many items; ``0`` means no limit.
    """
    page_size = 100
    collected: list[dict[str, Any]] = []
    offset = 0
    while True:
        params = urllib.parse.urlencode({"limit": page_size, "start": offset})
        batch = zotero_local(f"/items/top?{params}")
        if not batch:
            return collected
        collected.extend(batch)
        if limit and len(collected) >= limit:
            return collected[:limit]
        if len(batch) < page_size:
            # A short page means the library is exhausted.
            return collected
        offset += page_size
|
|
|
|
|
|
def has_existing_ai_note(parent_key: str) -> bool:
    """Tell whether the item already has a child note that looks AI-generated.

    A child note counts when its body contains one of the marker strings or a
    reference of the form ``items/<parent_key>``.  If the children listing
    carries no note body, the note is fetched individually (best effort).
    """
    markers = ("AI文献笔记", "AI Literature Note", f"items/{parent_key}")
    children = zotero_local(f"/items/{urllib.parse.quote(parent_key)}/children")
    for child in children or []:
        fields = child.get("data") or {}
        if fields.get("itemType") != "note":
            continue
        body = fields.get("note") or ""
        if not body and child.get("key"):
            detail = zotero_local_optional(f"/items/{urllib.parse.quote(child['key'])}")
            body = ((detail or {}).get("data") or {}).get("note") or ""
        if any(marker in body for marker in markers):
            return True
    return False
|
|
|
|
|
|
def export_bibtex(item_key: str) -> str:
    """Return the item's BibTeX export from the local Zotero API, stripped.

    Network and HTTP errors from ``urlopen`` are not handled here and
    propagate to the caller.
    """
    params = urllib.parse.urlencode({"itemKey": item_key, "format": "bibtex"})
    request = urllib.request.Request(
        f"{LOCAL_ZOTERO}/items?{params}",
        headers={"Zotero-API-Version": "3"},
    )
    with urllib.request.urlopen(request, timeout=20) as response:
        raw = response.read()
    return raw.decode("utf-8", errors="replace").strip()
|
|
|
|
|
|
def local_fulltext(parent_key: str, max_chars: int) -> str:
    """Return up to *max_chars* characters of text from the item's attachments.

    The indexed full text of each attachment is requested from the local
    Zotero API first.  Only when that yields nothing does the function fall
    back to ``extract_pdf_text`` on PDF attachments, stopping at the first one
    that produces text.  Parts are joined with blank lines and truncated.
    """
    children = zotero_local(f"/items/{urllib.parse.quote(parent_key)}/children") or []
    pieces: list[str] = []
    gathered = 0

    # Pass 1: full text already indexed by Zotero.
    for child in children:
        info = child.get("data") or {}
        if info.get("itemType") != "attachment":
            continue
        attachment_key = child.get("key")
        if not attachment_key:
            continue
        indexed = zotero_local_optional(f"/items/{urllib.parse.quote(attachment_key)}/fulltext")
        if not indexed:
            continue
        content = indexed.get("content") if isinstance(indexed, dict) else ""
        if content:
            pieces.append(content)
            gathered += len(content)
        if gathered >= max_chars:
            break

    # Pass 2: read a PDF from disk when nothing was indexed.
    if not pieces:
        for child in children:
            info = child.get("data") or {}
            is_pdf = (
                info.get("itemType") == "attachment"
                and info.get("contentType") == "application/pdf"
            )
            if not is_pdf:
                continue
            pdf_path = info.get("path")
            if not pdf_path and child.get("key"):
                # The children listing may omit the path; ask for the full item.
                detail = zotero_local_optional(f"/items/{urllib.parse.quote(child['key'])}")
                pdf_path = ((detail or {}).get("data") or {}).get("path")
            if not pdf_path:
                continue
            extracted = extract_pdf_text(Path(pdf_path), max_chars=max_chars)
            if extracted:
                pieces.append(extracted)
                break

    return "\n\n".join(pieces)[:max_chars]
|
|
|
|
|
|
def template_paths(vault: Path) -> tuple[Path, Path, Path]:
    """Locate the prompt and the two note templates in ``<vault>/00 Templater``.

    Returns:
        ``(prompt, research_template, review_template)``: the ``*.md`` files
        whose names start with ``03`` (preferring one containing ``AI``),
        ``01`` and ``02`` respectively.  When several files qualify, the one
        that sorts first by path is chosen so the result is deterministic.

    Raises:
        SystemExit: via ``fail`` when the directory or a template is missing.
    """
    templates = vault / "00 Templater"
    if not templates.exists():
        fail(f"template directory not found: {templates}")
    # glob() order depends on the filesystem; sort so ties resolve the same
    # way on every run and every machine.
    files = sorted(templates.glob("*.md"))

    def match(prefix: str, contains: str) -> Path:
        by_prefix = [path for path in files if path.name.startswith(prefix)]
        preferred = [path for path in by_prefix if contains in path.name]
        candidates = preferred or by_prefix
        if not candidates:
            fail(f"missing template file starting with {prefix} in {templates}")
        return candidates[0]

    return (
        match("03", "AI"),
        match("01", ""),
        match("02", ""),
    )
|
|
|
|
|
|
def _join_page_texts(page_texts, max_chars: int) -> str:
    """Join page texts with newlines, stopping once *max_chars* is reached."""
    chunks = []
    total = 0
    for text in page_texts:
        chunks.append(text)
        total += len(text)
        if total >= max_chars:
            break
    return "\n".join(chunks)[:max_chars]


def extract_pdf_text(path: Path, max_chars: int) -> str:
    """Extract text from the first pages of a PDF with whichever library exists.

    Tries PyMuPDF (``fitz``), then ``pypdf``, then ``PyPDF2``; the first one
    that is installed is used.  At most 8 pages are read and the result is
    truncated to *max_chars* characters.

    Returns:
        The extracted text, or ``""`` when the file does not exist or none of
        the libraries is installed.  Errors raised by the PDF library itself
        propagate to the caller.
    """
    if not path.exists():
        return ""
    max_pages = 8
    if importlib.util.find_spec("fitz"):
        import fitz  # type: ignore

        with fitz.open(str(path)) as doc:
            # Index pages one by one instead of slicing the document, so this
            # does not depend on slice support in the installed PyMuPDF.
            page_count = min(max_pages, len(doc))
            pages = (doc[index].get_text("text") for index in range(page_count))
            return _join_page_texts(pages, max_chars)
    # pypdf and its predecessor PyPDF2 are used through the same PdfReader API.
    for module_name in ("pypdf", "PyPDF2"):
        if importlib.util.find_spec(module_name):
            reader = importlib.import_module(module_name).PdfReader(str(path))
            pages = (page.extract_text() or "" for page in reader.pages[:max_pages])
            return _join_page_texts(pages, max_chars)
    return ""
|
|
|
|
|
|
def creators_text(creators: list[dict[str, Any]]) -> str:
    """Format Zotero creator records as a ``"; "``-separated list of names.

    A creator's ``name`` is used when present; otherwise ``firstName`` and
    ``lastName`` are joined with a space.  Creators without any name are
    omitted.
    """

    def display_name(creator: dict[str, Any]) -> Any:
        single = creator.get("name")
        if single:
            return single
        parts = (creator.get("firstName"), creator.get("lastName"))
        return " ".join(part for part in parts if part)

    return "; ".join(name for name in map(display_name, creators) if name)
|
|
|
|
|
|
def build_prompt(item: dict[str, Any], bibtex: str, fulltext: str, vault: Path, mode: str) -> str:
    """Assemble the full LLM prompt for one Zotero item.

    The prompt consists of the vault's prompt file, both note templates with
    instructions on which to fill, extra instructions when *mode* is
    ``"deep"``, and finally the source material (selected metadata, BibTeX and
    the full-text excerpt) as pretty-printed JSON.  Sections are separated by
    blank lines.
    """
    data = item.get("data") or {}
    prompt_path, research_template_path, review_template_path = template_paths(vault)
    prompt_text, research_text, review_text = (
        template.read_text(encoding="utf-8")
        for template in (prompt_path, research_template_path, review_template_path)
    )

    source = {
        "metadata": {
            "zoteroKey": item.get("key"),
            "title": data.get("title"),
            "itemType": data.get("itemType"),
            "authors": creators_text(data.get("creators") or []),
            "publicationTitle": data.get("publicationTitle"),
            "date": data.get("date"),
            "DOI": data.get("DOI"),
            "url": data.get("url"),
            "abstractNote": data.get("abstractNote"),
        },
        "bibtex": bibtex,
        "indexedFullTextExcerpt": fulltext,
    }

    sections = [
        prompt_text,
        "请先判断文献类型:综述型文献或研究型文献。",
        "若为研究型文献,请严格填充下面的研究型模板:",
        research_text,
        "若为综述型文献,请严格填充下面的综述型模板:",
        review_text,
        "请将模板中的 ${topItem.getField('title')} 替换为真实题名,将 ${topItem.key} 替换为 Zotero key。",
    ]
    if mode == "deep":
        # Extra guidance that asks for an exhaustive, data-rich note.
        sections += [
            "这是满血精读模式。请优先追求完整、具体、可复用的文献阅读笔记,而不是短摘要。",
            "每个模板栏目都要充分展开;如果原文提供了材料配比、制备参数、测试条件、器件结构、性能数据、机理解释或对照实验,必须具体写出。",
            "核心数据必须尽量保留数值和单位;不要用'显著提高'、'性能优异'这类空泛表达替代具体结果。",
            "机制部分要写清因果链条:材料/结构设计如何影响微观结构、界面、导电网络、离子/电子传输、力学响应或器件输出。",
            "局限与启发部分要给出可用于后续课题设计的具体启发,而不是泛泛评价。",
            "如果全文摘录中缺少某项信息,请明确写'原文摘录中未提供',不要编造。",
        ]
    sections += [
        "只输出最终 Markdown 笔记,不要输出解释、判断过程或代码围栏。",
        "文献材料如下:",
        json.dumps(source, ensure_ascii=False, indent=2),
    ]
    return "\n\n".join(sections)
|
|
|
|
|
|
def call_llm(prompt: str, max_tokens: int | None = None) -> str:
    """Send *prompt* to the configured chat-completions endpoint.

    Reads ``AWESOMEGPT_API_KEY``, ``AWESOMEGPT_BASE_URL`` and
    ``AWESOMEGPT_MODEL`` from the environment.  ``/v1`` is appended to the
    base URL unless it already ends with it.  ``max_tokens`` is only sent when
    truthy.

    Returns:
        The stripped content of the first choice's message.

    Raises:
        SystemExit: via ``fail`` when configuration is incomplete, the request
            fails, or the response lacks the expected structure.
    """
    api_key = os.environ.get("AWESOMEGPT_API_KEY")
    model = os.environ.get("AWESOMEGPT_MODEL")
    base_url = (os.environ.get("AWESOMEGPT_BASE_URL") or "").rstrip("/")
    if not (api_key and base_url and model):
        fail("AWESOMEGPT_API_KEY, AWESOMEGPT_BASE_URL, and AWESOMEGPT_MODEL are required")
    api_root = base_url if base_url.endswith("/v1") else base_url + "/v1"

    system_message = {
        "role": "system",
        "content": "You are a materials science literature-note assistant. Output Simplified Chinese Markdown only.",
    }
    payload = {
        "model": model,
        "messages": [system_message, {"role": "user", "content": prompt}],
        "temperature": 0.3,
    }
    if max_tokens:
        payload["max_tokens"] = max_tokens

    response = http_json(
        api_root + "/chat/completions",
        method="POST",
        headers={"Authorization": f"Bearer {api_key}"},
        payload=payload,
        timeout=180,
    )
    try:
        first_choice = response["choices"][0]
        return first_choice["message"]["content"].strip()
    except Exception as exc:
        fail(f"unexpected LLM response shape: {exc}; response={response}")
|
|
|
|
|
|
def markdown_to_zotero_html(markdown: str) -> str:
    """Convert a small Markdown subset to HTML for a Zotero note.

    Handled line by line: fenced code (``` or ~~~) becomes ``<pre>``, ATX
    headings become ``<h1>``..``<h6>``, lines starting with ``>`` become
    ``<blockquote>``, and every other non-blank line (list items included)
    becomes a ``<p>``.  All text is HTML-escaped; inline Markdown is left
    as-is.  An unterminated fence is closed at the end of the input.
    """
    blocks: list[str] = []
    fenced = None  # lines of the currently open code fence, or None

    def emit_pre(code_lines: list[str]) -> None:
        blocks.append("<pre>" + html.escape("\n".join(code_lines)) + "</pre>")

    for raw in markdown.strip().splitlines():
        text = raw.strip()
        if text.startswith(("```", "~~~")):
            if fenced is None:
                fenced = []
            else:
                emit_pre(fenced)
                fenced = None
            continue
        if fenced is not None:
            # Keep code lines verbatim, indentation included.
            fenced.append(raw)
            continue
        if not text:
            continue
        heading = re.match(r"^(#{1,6})\s+(.+)$", text)
        if heading:
            hashes, title = heading.groups()
            level = len(hashes)
            blocks.append(f"<h{level}>{html.escape(title)}</h{level}>")
        elif text.startswith(">"):
            quoted = text.lstrip("> ").strip()
            blocks.append(f"<blockquote>{html.escape(quoted)}</blockquote>")
        else:
            # List items keep their "-"/"*" marker and render as paragraphs.
            blocks.append(f"<p>{html.escape(text)}</p>")

    if fenced is not None:
        emit_pre(fenced)
    return "\n".join(blocks)
|
|
|
|
|
|
def create_child_note(user_id: str, parent_key: str, markdown: str, dry_run: bool) -> Any:
    """Create a child note under *parent_key* from Markdown text.

    Args:
        user_id: Zotero user id owning the library (unused on a dry run).
        parent_key: Key of the parent item.
        markdown: Note content; converted with ``markdown_to_zotero_html``.
        dry_run: When true, nothing is sent and the would-be payload is
            returned as ``{"dryRun": True, "payload": [...]}``.

    Returns:
        The Zotero Web API response, or the dry-run description.

    Raises:
        RuntimeError: when the API response reports the note under ``failed``.
    """
    note_html = markdown_to_zotero_html(markdown)
    payload = [{"itemType": "note", "parentItem": parent_key, "note": note_html}]
    if dry_run:
        return {"dryRun": True, "payload": payload}
    response = zotero_web(f"/users/{user_id}/items", method="POST", payload=payload)
    # Multi-object writes answer 200 even when an object is rejected; rejected
    # objects are listed under "failed", so check it rather than report success.
    failed = response.get("failed") if isinstance(response, dict) else None
    if failed:
        raise RuntimeError(f"Zotero rejected the note for {parent_key}: {failed}")
    return response
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: generate AI notes for the selected items.

    Configuration is loaded with ``setdefault`` semantics in this order:
    existing environment, the ``.env`` file, the private config JSON, then the
    Zotero ``prefs.js``.  Items are selected with ``--item-key`` /
    ``--item-keys`` / ``--query`` or ``--all``.  Each item is processed
    independently: a failure is recorded and the run continues.  A JSON list
    with one result per item is printed to stdout at the end.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--item-key", action="append", default=[], help="Zotero top-level item key; can be repeated")
    parser.add_argument("--item-keys", help="Comma/space separated Zotero top-level item keys")
    parser.add_argument("--query", help="Search query; first top-level match is used")
    parser.add_argument("--all", action="store_true", help="Process all top-level Zotero items")
    parser.add_argument("--limit", type=int, default=1, help="Maximum number of items to process")
    parser.add_argument("--fulltext-chars", type=int, default=12000)
    parser.add_argument("--mode", choices=["quick", "deep"], default="quick", help="quick for batch notes; deep for full-detail notes")
    parser.add_argument("--max-tokens", type=int, default=0, help="Optional LLM output token limit; deep mode defaults to 12000")
    parser.add_argument("--skip-existing", action="store_true", help="Skip items that already have a generated AI child note")
    parser.add_argument("--dry-run", action="store_true", help="generate but do not write Zotero note")
    parser.add_argument("--vault", default=str(DEFAULT_VAULT), help="Obsidian vault containing 00 Templater")
    parser.add_argument("--env-file", help="Optional .env path; defaults to <vault>/.env")
    parser.add_argument("--config", default=str(DEFAULT_PRIVATE_CONFIG), help="Private local config JSON")
    args = parser.parse_args()

    vault = Path(args.vault).expanduser().resolve()
    load_dotenv(Path(args.env_file).expanduser().resolve() if args.env_file else vault / ".env")
    load_private_config(Path(args.config).expanduser().resolve())
    load_awesomegpt_prefs()

    keys = list(args.item_key)
    if args.item_keys:
        keys.extend([key for key in re.split(r"[\s,]+", args.item_keys.strip()) if key])
    if args.all:
        # --limit defaults to 1, so with --all a value of 1 means "no limit".
        items = all_top_items(args.limit if args.limit != 1 else 0)
    else:
        items = find_items(keys, args.query, args.limit)

    # A dry run never writes to Zotero, so it must not require Zotero Web API
    # credentials or a network round-trip just to learn the user id.
    user_id = "" if args.dry_run else resolve_user_id()

    # Per-run settings: deep mode raises the defaults unless overridden.
    deep = args.mode == "deep"
    fulltext_chars = args.fulltext_chars
    if deep and fulltext_chars == 12000:
        fulltext_chars = 80000
    max_tokens = args.max_tokens
    if deep and not max_tokens:
        max_tokens = 12000

    results = []
    total = len(items)
    for index, item in enumerate(items, 1):
        try:
            key = item.get("key")
            title = (item.get("data") or {}).get("title")
            if not key:
                results.append({"index": index, "status": "skipped", "reason": "missing key", "title": title})
                continue
            if args.skip_existing and has_existing_ai_note(key):
                results.append({"index": index, "itemKey": key, "title": title, "status": "skipped", "reason": "existing AI note"})
                continue
            print(f"[{index}/{total}] generating {key}: {title}", file=sys.stderr)
            bibtex = export_bibtex(key)
            fulltext = local_fulltext(key, fulltext_chars)
            prompt = build_prompt(item, bibtex, fulltext, vault, args.mode)
            markdown = call_llm(prompt, max_tokens or None)
            result = create_child_note(user_id, key, markdown, args.dry_run)
            results.append({"index": index, "itemKey": key, "title": title, "status": "ok", "result": result})
        except SystemExit as exc:
            # Raised by fail(); the message itself was already printed to
            # stderr, so str(exc) only carries the exit code.
            results.append({
                "index": index,
                "itemKey": item.get("key"),
                "title": (item.get("data") or {}).get("title"),
                "status": "error",
                "error": str(exc),
            })
            print(f"[{index}/{total}] error; continuing", file=sys.stderr)
            continue
        except Exception as exc:
            results.append({
                "index": index,
                "itemKey": item.get("key"),
                "title": (item.get("data") or {}).get("title"),
                "status": "error",
                "error": repr(exc),
            })
            print(f"[{index}/{total}] error; continuing: {exc}", file=sys.stderr)
            continue
    print(json.dumps(results, ensure_ascii=False, indent=2))
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|