import os
import aiohttp
import markdown2
import json
import re
from playwright.async_api import async_playwright
from ncatbot.plugin_system import NcatBotPlugin, command_registry, group_filter
from ncatbot.core.event import BaseMessageEvent
from ncatbot.utils import get_log
# 初始化日志
LOG = get_log("ArticlePlugin")
class ArticlePlugin(NcatBotPlugin):
name = "ArticlePlugin"
version = "1.7.0"
dependencies = {}
# 设置 API 基础路径
API_BASE = "https://api.luogu.me"
# 采用 MathJax v3 与 GitHub Markdown CSS
HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/github-markdown-css/5.2.0/github-markdown.min.css">
<script>
window.MathJax = {{
tex: {{
inlineMath: [['$', '$'], ['\\\\(', '\\\\)']],
displayMath: [['$$', '$$'], ['\\\\[', '\\\\]']],
processEscapes: true
}},
svg: {{ fontCache: 'global' }}
}};
</script>
<script src="https://cdn.jsdelivr.net/npm/mathjax@3/es5/tex-svg.js"></script>
<style>
.markdown-body {{
box-sizing: border-box;
min-width: 200px;
max-width: 850px;
margin: 0 auto;
padding: 45px;
background: white;
}}
.article-header {{
margin-bottom: 30px;
padding-bottom: 20px;
border-bottom: 1px solid #eaecef;
}}
.meta-info {{
color: #57606a;
font-size: 14px;
margin-top: 10px;
}}
.footer {{
margin-top: 50px;
padding-top: 20px;
border-top: 1px dashed #ddd;
text-align: center;
color: #888;
font-size: 12px;
}}
</style>
</head>
<body class="markdown-body">
<div class="article-header">
<h1>{title}</h1>
<div class="meta-info">
文章ID: {id} | 作者: {authorId} | 最后更新: {updatedAt} | 点赞量: {upvote}
</div>
</div>
<div class="content">
{content_html}
</div>
<div class="footer">
Luogu Saver Bot v1.7 | 机器人由 NapCat&NcatBOT 强力驱动 | 拉群请联系QQ 1712076972<br>
</div>
</body>
</html>
"""
async def on_load(self):
LOG.info("文章查询插件已加载喵~")
# ID 提取
def _extract_id(self, link: str) -> str:
"""
从链接或文本中智能提取文章 ID
支持多种链接格式及纯 ID 输入
"""
# 正则匹配不同域名下的 article/XXXXXX 部分
match = re.search(r"article/([a-zA-Z0-9]+)", link.strip())
if match:
return match.group(1)
# 如果不是链接,则直接返回原始输入的字符串(视为纯 ID)
return link.strip()
async def render_to_image(self, data, output_path):
"""调用指定路径的 Edge 浏览器进行 MathJax 渲染截图"""
# 1. 将 Markdown 转换为 HTML
content_html = markdown2.markdown(
data.get('content', ''),
extras=["fenced-code-blocks", "tables", "break-on-newline"]
)
# 2. 注入数据到模板 (处理 CSS/JS 中的双大括号)
full_html = self.HTML_TEMPLATE.format(
title=data.get('title', '无标题'),
id=data.get('id', 'N/A'),
authorId=data.get('authorId', '未知'),
updatedAt=data.get('updatedAt', '未知'),
upvote=data.get('upvote', 0),
content_html=content_html
)
async with async_playwright() as p:
# 使用您提供的 Edge 浏览器精确路径
edge_path = r"C:\Program Files (x86)\Microsoft\Edge\Application\msedge.exe"
browser = await p.chromium.launch(executable_path=edge_path)
page = await browser.new_page()
await page.set_content(full_html)
# 3. 等待 MathJax 渲染完成 (由于是 SVG 渲染,建议预留足够时间)
await page.wait_for_timeout(2000)
# 对 body 区域进行截图
element = await page.query_selector("body")
await element.screenshot(path=output_path)
await browser.close()
@group_filter
@command_registry.command("查看文章")
async def query_article(self, event: BaseMessageEvent, article_id: str):
url = f"{self.API_BASE}/article/query/{article_id}"
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
res_json = await response.json()
if res_json.get("code") == 200 or res_json.get("success"):
data = res_json.get("data", {})
# 生成临时图片存放路径
temp_img = os.path.join(os.getcwd(), f"article_{article_id}.png")
id = data.get("id")
title = data.get("title")
authorId = data.get("authorId")
upvote = data.get("upvote")
updatedAt = data.get("updatedAt")
# 发送中间状态提醒
await event.reply(
f"\n获取成功!\n文章ID {id} | 文章标题 {title} | 作者ID {authorId} \n图片渲染要等一会儿去了你别急..."
)
# 渲染并发送图片
await self.render_to_image(data, temp_img)
await event.reply(image=temp_img)
# 清理临时文件
if os.path.exists(temp_img):
os.remove(temp_img)
else:
await event.reply(f"查询失败:{res_json.get('message', '未知错误')} \n LGS-NG和现行保存站数据暂不同步,如果出现Article Not Found请先使用 /保存文章 保存后再查看哦喵~" )
else:
await event.reply(f"网络请求失败,状态码:{response.status}")
except Exception as e:
LOG.error(f"渲染文章出错: {e}")
await event.reply(f"渲染过程中出错了喵:{str(e)}")
# 保存文章
@group_filter # 限制仅在群聊中触发
@command_registry.command("保存文章") # 注册保存命令
async def save_article(self, event: BaseMessageEvent, link: str):
"""
当接收到 "保存文章 {link}" 时触发
link 参数将自动获取命令后的内容
"""
article_id = self._extract_id(link)
save_url = f"{self.API_BASE}/task/create"
# 构造 article 类保存任务的请求载荷
request = {
"type": "save",
"payload": {
"target": "article",
"targetId": f"{article_id}",
"metadata": {},
}
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(save_url, json=request) as response:
if response.status == 200 or response.status == 201:
res_json = await response.json()
# 检查业务返回码
if res_json.get("code") == 200 or res_json.get("success"):
await event.reply(f"任务创建成功!\n文章ID:{article_id}\n任务类型:文章保存 \n 内容保存可能需要10秒或者更久,稍后你可以通过查看命令来查看保存的内容。")
else:
await event.reply(f"任务创建失败:{res_json.get('message', '接口返回异常')}")
else:
await event.reply(f"网络请求失败:{response.status}")
except Exception as e:
LOG.error(f"创建保存任务出错: {e}")
await event.reply(f"发生错误:{str(e)}")
__all__ = ["ArticlePlugin"]
没有回复内容