You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
570 lines
21 KiB
570 lines
21 KiB
"""
|
|
论文分析蓝图 - 使用 Deepseek AI 分析研究论文的 Flask 蓝图
|
|
"""
|
|
|
|
from flask import Blueprint, render_template, request, jsonify
|
|
import requests
|
|
import json
|
|
import os
|
|
import tempfile
|
|
from werkzeug.utils import secure_filename
|
|
|
|
# 创建蓝图
|
|
paper_analysis_bp = Blueprint('paper_analysis', __name__, url_prefix='/paper-analysis')
|
|
|
|
@paper_analysis_bp.route('/')
|
|
def paper_analysis():
|
|
"""论文分析页面"""
|
|
return render_template('paper_analysis.html')
|
|
|
|
@paper_analysis_bp.route('/api/test-deepseek', methods=['POST'])
|
|
def test_deepseek():
|
|
"""测试 Deepseek API 连接"""
|
|
try:
|
|
api_key = request.headers.get('X-API-Key')
|
|
if not api_key:
|
|
return jsonify({'error': '需要 API 密钥'}), 401
|
|
|
|
# 使用简单提示测试 API
|
|
headers = {
|
|
'Authorization': f'Bearer {api_key}',
|
|
'Content-Type': 'application/json'
|
|
}
|
|
|
|
test_response = requests.post(
|
|
'https://api.deepseek.com/v1/chat/completions', # 使用聊天补全端点
|
|
headers=headers,
|
|
json={
|
|
'model': 'deepseek-chat',
|
|
'messages': [
|
|
{'role': 'user', 'content': '打个招呼'}
|
|
],
|
|
'max_tokens': 10
|
|
},
|
|
timeout=10
|
|
)
|
|
|
|
if test_response.ok:
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Deepseek API 连接成功',
|
|
'response': test_response.json()
|
|
})
|
|
else:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'API 错误: {test_response.status_code}',
|
|
'details': test_response.text
|
|
}), test_response.status_code
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': '连接错误',
|
|
'details': str(e)
|
|
}), 500
|
|
except Exception as e:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': '意外错误',
|
|
'details': str(e)
|
|
}), 500
|
|
|
|
@paper_analysis_bp.route('/api/analyze-paper', methods=['POST'])
|
|
def analyze_paper():
|
|
"""使用 Deepseek API 分析论文"""
|
|
try:
|
|
# 从请求头获取 API 密钥
|
|
api_key = request.headers.get('X-API-Key')
|
|
if not api_key:
|
|
return jsonify({'error': '需要 API 密钥'}), 401
|
|
|
|
# 获取分析选项
|
|
extract_keywords = request.form.get('extract_keywords') == 'true'
|
|
generate_summary = request.form.get('generate_summary') == 'true'
|
|
find_related = request.form.get('find_related') == 'true'
|
|
|
|
# 获取上传的文件
|
|
if 'file' not in request.files:
|
|
return jsonify({'error': '没有上传文件'}), 400
|
|
|
|
file = request.files['file']
|
|
if file.filename == '':
|
|
return jsonify({'error': '没有选择文件'}), 400
|
|
|
|
# 检查文件大小(限制为 50MB)
|
|
file.seek(0, 2) # 定位到文件末尾
|
|
file_size = file.tell()
|
|
file.seek(0) # 重置到开头
|
|
|
|
if file_size > 50 * 1024 * 1024: # 50MB 限制
|
|
return jsonify({'error': '文件过大。最大大小为 50MB'}), 400
|
|
|
|
if file_size == 0:
|
|
return jsonify({'error': '文件为空'}), 400
|
|
|
|
# 临时保存文件
|
|
filename = secure_filename(file.filename)
|
|
print(f"正在处理文件: {filename} (大小: {file_size} 字节)")
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(filename)[1]) as temp_file:
|
|
file.save(temp_file.name)
|
|
file_path = temp_file.name
|
|
|
|
try:
|
|
# 初始化结果字典
|
|
results = {
|
|
'keywords': [],
|
|
'summary': '',
|
|
'related_works': []
|
|
}
|
|
|
|
# 从文件提取文本
|
|
text = extract_text_from_file(file_path, filename)
|
|
|
|
if not text or not text.strip():
|
|
return jsonify({'error': '无法从文件中提取文本。文件可能为空或格式不受支持。'}), 400
|
|
|
|
# 清理文本
|
|
text = text.strip()
|
|
print(f"截断前的最终文本长度: {len(text)}")
|
|
|
|
# 检查是否有足够的文本
|
|
if len(text) < 100:
|
|
return jsonify({
|
|
'error': '提取的文本过短。这可能是没有 OCR 文本的扫描 PDF。请确保您的 PDF 包含可选择的文本,而不仅仅是图像。'
|
|
}), 400
|
|
|
|
# 处理大型文档
|
|
text = process_large_document(text)
|
|
|
|
# 为每个请求的分析调用 Deepseek API
|
|
headers = {
|
|
'Authorization': f'Bearer {api_key}',
|
|
'Content-Type': 'application/json'
|
|
}
|
|
|
|
if extract_keywords:
|
|
results['keywords'] = extract_keywords_with_deepseek(text, headers)
|
|
|
|
if generate_summary:
|
|
results['summary'] = generate_summary_with_deepseek(text, headers)
|
|
|
|
if find_related:
|
|
results['related_works'] = extract_formulas_with_deepseek(text, headers)
|
|
|
|
return jsonify(results)
|
|
|
|
finally:
|
|
# 清理临时文件
|
|
try:
|
|
os.unlink(file_path)
|
|
except Exception as e:
|
|
print(f"删除临时文件时出错: {str(e)}")
|
|
|
|
except Exception as e:
|
|
print(f"分析论文时出错: {str(e)}")
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
def extract_text_from_file(file_path, filename):
|
|
"""从各种文件格式中提取文本"""
|
|
text = ''
|
|
file_ext = os.path.splitext(filename)[1].lower()
|
|
|
|
try:
|
|
if file_ext == '.pdf':
|
|
text = extract_pdf_text(file_path)
|
|
elif file_ext in ['.docx', '.doc']:
|
|
text = extract_word_text(file_path, file_ext)
|
|
elif file_ext == '.rtf':
|
|
text = extract_rtf_text(file_path)
|
|
elif file_ext in ['.tex', '.latex']:
|
|
text = extract_latex_text(file_path)
|
|
elif file_ext in ['.md', '.markdown']:
|
|
text = extract_markdown_text(file_path)
|
|
else:
|
|
text = extract_plain_text(file_path)
|
|
|
|
except Exception as e:
|
|
print(f"文件处理错误: {str(e)}")
|
|
raise Exception(f"读取文件时出错: {str(e)}")
|
|
|
|
return text
|
|
|
|
def extract_pdf_text(file_path):
|
|
"""从 PDF 文件提取文本"""
|
|
try:
|
|
from PyPDF2 import PdfReader
|
|
reader = PdfReader(file_path)
|
|
text = ''
|
|
num_pages = len(reader.pages)
|
|
print(f"PDF 有 {num_pages} 页")
|
|
|
|
for i, page in enumerate(reader.pages):
|
|
try:
|
|
page_text = page.extract_text()
|
|
if page_text:
|
|
text += page_text + '\n'
|
|
print(f"已提取第 {i+1}/{num_pages} 页")
|
|
except Exception as page_error:
|
|
print(f"提取第 {i+1} 页时出错: {str(page_error)}")
|
|
continue
|
|
|
|
print(f"提取的总文本长度: {len(text)}")
|
|
return text
|
|
|
|
except ImportError:
|
|
# 尝试替代的 PDF 库
|
|
try:
|
|
import pdfplumber
|
|
text = ''
|
|
with pdfplumber.open(file_path) as pdf:
|
|
for page in pdf.pages:
|
|
page_text = page.extract_text()
|
|
if page_text:
|
|
text += page_text + '\n'
|
|
return text
|
|
except ImportError:
|
|
raise Exception('PDF 处理不可用。请安装 PyPDF2 或 pdfplumber。')
|
|
except Exception as pdf_error:
|
|
print(f"PDF 提取错误: {str(pdf_error)}")
|
|
# 尝试使用 PyMuPDF 作为后备方案
|
|
try:
|
|
import fitz # PyMuPDF
|
|
pdf_document = fitz.open(file_path)
|
|
text = ''
|
|
for page_num in range(pdf_document.page_count):
|
|
page = pdf_document[page_num]
|
|
text += page.get_text() + '\n'
|
|
pdf_document.close()
|
|
return text
|
|
except ImportError:
|
|
raise Exception(f'无法从 PDF 提取文本: {str(pdf_error)}。请尝试安装 PyMuPDF。')
|
|
except Exception as mupdf_error:
|
|
raise Exception(f'PDF 提取失败: {str(pdf_error)}')
|
|
|
|
def extract_word_text(file_path, file_ext):
|
|
"""从 Word 文档提取文本"""
|
|
try:
|
|
if file_ext == '.docx':
|
|
from docx import Document
|
|
doc = Document(file_path)
|
|
return '\n'.join([paragraph.text for paragraph in doc.paragraphs])
|
|
else:
|
|
# .doc 文件
|
|
try:
|
|
import docx2txt
|
|
return docx2txt.process(file_path)
|
|
except ImportError:
|
|
raise Exception('DOC 文件支持需要 docx2txt。请使用以下命令安装: pip install docx2txt')
|
|
except ImportError:
|
|
raise Exception('Word 文档支持需要 python-docx。请使用以下命令安装: pip install python-docx')
|
|
except Exception as docx_error:
|
|
raise Exception(f'读取 Word 文档时出错: {str(docx_error)}')
|
|
|
|
def extract_rtf_text(file_path):
|
|
"""从 RTF 文件提取文本"""
|
|
try:
|
|
import striprtf
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
rtf_content = f.read()
|
|
return striprtf.rtf_to_text(rtf_content)
|
|
except ImportError:
|
|
raise Exception('RTF 支持需要 striprtf。请使用以下命令安装: pip install striprtf')
|
|
except Exception as rtf_error:
|
|
raise Exception(f'读取 RTF 文件时出错: {str(rtf_error)}')
|
|
|
|
def extract_latex_text(file_path):
|
|
"""从 LaTeX 文件提取文本"""
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
tex_content = f.read()
|
|
# 基础 LaTeX 清理 - 移除常见命令
|
|
import re
|
|
text = tex_content
|
|
# 移除注释
|
|
text = re.sub(r'%.*$', '', text, flags=re.MULTILINE)
|
|
# 移除常见 LaTeX 命令但保留内容
|
|
text = re.sub(r'\\(begin|end)\{[^}]+\}', '', text)
|
|
text = re.sub(r'\\[a-zA-Z]+\*?\{([^}]+)\}', r'\1', text)
|
|
text = re.sub(r'\\[a-zA-Z]+\*?', '', text)
|
|
return text
|
|
except Exception as tex_error:
|
|
raise Exception(f'读取 LaTeX 文件时出错: {str(tex_error)}')
|
|
|
|
def extract_markdown_text(file_path):
|
|
"""从 Markdown 文件提取文本"""
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
text = f.read()
|
|
# 清理 markdown 语法
|
|
import re
|
|
# 移除图片链接
|
|
text = re.sub(r'!\[[^\]]*\]\([^)]+\)', '', text)
|
|
# 将链接转换为纯文本
|
|
text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', text)
|
|
return text
|
|
except Exception as md_error:
|
|
raise Exception(f'读取 Markdown 文件时出错: {str(md_error)}')
|
|
|
|
def extract_plain_text(file_path):
|
|
"""从纯文本文件提取文本"""
|
|
encodings = ['utf-8', 'utf-16', 'gbk', 'gb2312', 'big5', 'latin-1']
|
|
text = None
|
|
|
|
for encoding in encodings:
|
|
try:
|
|
with open(file_path, 'r', encoding=encoding) as f:
|
|
text = f.read()
|
|
print(f"使用 {encoding} 编码成功读取文件")
|
|
break
|
|
except UnicodeDecodeError:
|
|
continue
|
|
except Exception as e:
|
|
print(f"使用 {encoding} 读取时出错: {str(e)}")
|
|
continue
|
|
|
|
if text is None:
|
|
# 尝试以二进制方式读取并解码
|
|
with open(file_path, 'rb') as f:
|
|
binary_content = f.read()
|
|
try:
|
|
text = binary_content.decode('utf-8', errors='ignore')
|
|
except:
|
|
text = str(binary_content)
|
|
|
|
return text
|
|
|
|
def process_large_document(text):
|
|
"""通过优先处理公式提取来处理大型文档"""
|
|
if len(text) > 98000:
|
|
print("检测到大型文档,优先处理内容以进行公式提取")
|
|
# 尝试找到包含公式的章节(常见模式)
|
|
import re
|
|
# 查找数学内容指示符
|
|
math_sections = []
|
|
lines = text.split('\n')
|
|
for i, line in enumerate(lines):
|
|
if re.search(r'[=+\-*/∫∑∏√∂∇∆λμσπ]|equation|formula|theorem|lemma|proof', line, re.IGNORECASE):
|
|
# 包含周围上下文
|
|
start = max(0, i-5)
|
|
end = min(len(lines), i+6)
|
|
math_sections.extend(lines[start:end])
|
|
|
|
if math_sections:
|
|
# 使用数学内容丰富的章节以更好地提取公式
|
|
math_text = '\n'.join(math_sections)
|
|
if len(math_text) > 50000: # 仍然太长
|
|
text = math_text[:98000]
|
|
else:
|
|
# 将数学章节与文档开头结合
|
|
remaining_space = 98000 - len(math_text)
|
|
text = text[:remaining_space] + '\n\n[数学内容章节:]\n' + math_text
|
|
else:
|
|
# 未找到数学指示符,使用第一部分
|
|
text = text[:98000]
|
|
|
|
return text
|
|
|
|
def extract_keywords_with_deepseek(text, headers):
|
|
"""使用 Deepseek API 提取关键词"""
|
|
try:
|
|
keyword_messages = [
|
|
{
|
|
'role': 'system',
|
|
'content': '你是一个有助于从学术论文中提取关键词的助手。始终以有效的 JSON 格式响应。'
|
|
},
|
|
{
|
|
'role': 'user',
|
|
'content': f"""分析以下学术论文并提取关键术语和概念。
|
|
对于每个关键词,提供一个介于 0 到 1 之间的相关性分数。
|
|
仅返回具有 'text' 和 'score' 属性的有效 JSON 对象数组。
|
|
示例格式: [{{"text": "machine learning", "score": 0.95}}, {{"text": "neural networks", "score": 0.85}}]
|
|
|
|
论文文本:
|
|
{text}"""
|
|
}
|
|
]
|
|
|
|
keyword_response = requests.post(
|
|
'https://api.deepseek.com/v1/chat/completions',
|
|
headers=headers,
|
|
json={
|
|
'model': 'deepseek-chat',
|
|
'messages': keyword_messages,
|
|
'temperature': 0.3,
|
|
'max_tokens': 4000
|
|
},
|
|
timeout=60
|
|
)
|
|
|
|
if keyword_response.ok:
|
|
response_content = keyword_response.json()['choices'][0]['message']['content']
|
|
try:
|
|
# 尝试从响应中提取 JSON
|
|
import re
|
|
json_match = re.search(r'\[.*\]', response_content, re.DOTALL)
|
|
if json_match:
|
|
return json.loads(json_match.group())
|
|
else:
|
|
return json.loads(response_content)
|
|
except json.JSONDecodeError:
|
|
print(f"来自关键词 API 的无效 JSON: {response_content}")
|
|
return []
|
|
else:
|
|
print(f"关键词 API 错误: {keyword_response.text}")
|
|
return []
|
|
|
|
except Exception as e:
|
|
print(f"关键词提取错误: {str(e)}")
|
|
return []
|
|
|
|
def generate_summary_with_deepseek(text, headers):
|
|
"""使用 Deepseek API 生成摘要"""
|
|
try:
|
|
summary_messages = [
|
|
{
|
|
'role': 'system',
|
|
'content': '你是一个有助于总结学术论文的助手。'
|
|
},
|
|
{
|
|
'role': 'user',
|
|
'content': f"""提供以下学术论文的全面摘要。
|
|
重点关注主要贡献、方法和关键发现。
|
|
保持回应简洁且结构良好。
|
|
|
|
论文文本:
|
|
{text}"""
|
|
}
|
|
]
|
|
|
|
summary_response = requests.post(
|
|
'https://api.deepseek.com/v1/chat/completions',
|
|
headers=headers,
|
|
json={
|
|
'model': 'deepseek-chat',
|
|
'messages': summary_messages,
|
|
'temperature': 0.3,
|
|
'max_tokens': 4000
|
|
},
|
|
timeout=60
|
|
)
|
|
|
|
if summary_response.ok:
|
|
return summary_response.json()['choices'][0]['message']['content']
|
|
else:
|
|
print(f"摘要 API 错误: {summary_response.text}")
|
|
return "生成摘要时出错"
|
|
|
|
except Exception as e:
|
|
print(f"摘要生成错误: {str(e)}")
|
|
return "生成摘要时出错"
|
|
|
|
def extract_formulas_with_deepseek(text, headers):
|
|
"""使用 Deepseek API 提取公式"""
|
|
try:
|
|
related_messages = [
|
|
{
|
|
'role': 'system',
|
|
'content': '''你是一位专业的数学家和 AI 助手,专门从学术论文中提取数学公式。
|
|
你的任务是识别并提取给定文本中所有的数学公式、方程式和数学表达式,尽可能多地提取。
|
|
|
|
重要说明:
|
|
1. 提取你找到的每一个数学公式、方程式或表达式
|
|
2. 包括内联公式、显示方程和数学定义
|
|
3. 尽可能保留原始符号
|
|
4. 对于每个公式,提供其表示内容的上下文
|
|
5. 始终以有效的 JSON 格式响应
|
|
|
|
你必须彻底并提取所有公式,而不仅仅是主要的公式。'''
|
|
},
|
|
{
|
|
'role': 'user',
|
|
'content': f"""从以下论文文本中提取所有的数学公式和方程式。
|
|
|
|
对于找到的每个公式,请提供:
|
|
- 公式本身(尽可能使用 LaTeX 表示法)
|
|
- 详细描述,解释公式表示的内容以及每个变量的含义
|
|
- 出现位置的上下文或章节
|
|
- 它是定义、定理、引理还是一般方程
|
|
- 解释公式目的的中文描述
|
|
|
|
返回一个 JSON 数组,其中每个元素具有以下属性:
|
|
- "formula": 数学表达式(使用 LaTeX 表示法)
|
|
- "description": 公式表示或计算的内容
|
|
- "variables": 对公式中每个变量/符号含义的详细解释
|
|
- "variables_chinese": 变量解释的中文翻译(与 variables 结构相同)
|
|
- "type": 其中之一 ["definition", "theorem", "lemma", "equation", "inequality", "identity", "other"]
|
|
- "context": 关于其使用位置/方式的简要上下文
|
|
- "chinese_description": 关于公式及其目的的综合中文描述
|
|
|
|
示例格式:
|
|
[
|
|
{{
|
|
"formula": "E = mc^2",
|
|
"description": "爱因斯坦质能等价关系",
|
|
"variables": {{"E": "energy (joules)", "m": "mass (kilograms)", "c": "speed of light in vacuum (≈3×10^8 m/s)"}},
|
|
"variables_chinese": {{"E": "能量 (焦耳)", "m": "质量 (千克)", "c": "真空中的光速 (≈3×10^8 m/s)"}},
|
|
"type": "equation",
|
|
"context": "狭义相对论基本方程",
|
|
"chinese_description": "爱因斯坦质能等价公式,表示质量和能量之间的等价关系"
|
|
}},
|
|
{{
|
|
"formula": "F = ma",
|
|
"description": "牛顿第二运动定律",
|
|
"variables": {{"F": "net force (newtons)", "m": "mass (kilograms)", "a": "acceleration (m/s²)"}},
|
|
"variables_chinese": {{"F": "净力 (牛顿)", "m": "质量 (千克)", "a": "加速度 (m/s²)"}},
|
|
"type": "equation",
|
|
"context": "经典力学基本定律",
|
|
"chinese_description": "牛顿第二定律,描述物体受力与加速度的关系"
|
|
}}
|
|
]
|
|
|
|
论文文本:
|
|
{text}
|
|
|
|
重要说明:
|
|
1. 提取每一个公式,即使是简单的如 "x + y = z" 或 "f(x) = ax + b"
|
|
2. 对于公式中的每个变量或符号,解释其代表什么
|
|
3. 相关时包括测量单位
|
|
4. 提供解释公式重要性的全面中文描述
|
|
5. 在变量解释中要彻底且详细"""
|
|
}
|
|
]
|
|
|
|
related_response = requests.post(
|
|
'https://api.deepseek.com/v1/chat/completions',
|
|
headers=headers,
|
|
json={
|
|
'model': 'deepseek-chat',
|
|
'messages': related_messages,
|
|
'temperature': 0.1, # 较低的温度以获得更一致的提取
|
|
'max_tokens': 4000 # 增加令牌限制以获取更多公式
|
|
},
|
|
timeout=120 # 增加超时时间以处理大型文档
|
|
)
|
|
|
|
if related_response.ok:
|
|
response_content = related_response.json()['choices'][0]['message']['content']
|
|
try:
|
|
# 尝试从响应中提取 JSON
|
|
import re
|
|
# 在响应中查找 JSON 数组
|
|
json_match = re.search(r'\[[\s\S]*\]', response_content)
|
|
if json_match:
|
|
formulas = json.loads(json_match.group())
|
|
return formulas
|
|
else:
|
|
# 尝试直接 JSON 解析
|
|
return json.loads(response_content)
|
|
except json.JSONDecodeError as e:
|
|
print(f"来自公式 API 的无效 JSON: {response_content}")
|
|
print(f"JSON 错误: {str(e)}")
|
|
return []
|
|
else:
|
|
print(f"公式 API 错误: {related_response.text}")
|
|
return []
|
|
|
|
except Exception as e:
|
|
print(f"公式提取错误: {str(e)}")
|
|
return [] |