""" 论文分析蓝图 - 使用 Deepseek AI 分析研究论文的 Flask 蓝图 """ from flask import Blueprint, render_template, request, jsonify import requests import json import os import tempfile from werkzeug.utils import secure_filename # 创建蓝图 paper_analysis_bp = Blueprint('paper_analysis', __name__, url_prefix='/paper-analysis') @paper_analysis_bp.route('/') def paper_analysis(): """论文分析页面""" return render_template('paper_analysis.html') @paper_analysis_bp.route('/api/test-deepseek', methods=['POST']) def test_deepseek(): """测试 Deepseek API 连接""" try: api_key = request.headers.get('X-API-Key') if not api_key: return jsonify({'error': '需要 API 密钥'}), 401 # 使用简单提示测试 API headers = { 'Authorization': f'Bearer {api_key}', 'Content-Type': 'application/json' } test_response = requests.post( 'https://api.deepseek.com/v1/chat/completions', # 使用聊天补全端点 headers=headers, json={ 'model': 'deepseek-chat', 'messages': [ {'role': 'user', 'content': '打个招呼'} ], 'max_tokens': 10 }, timeout=10 ) if test_response.ok: return jsonify({ 'success': True, 'message': 'Deepseek API 连接成功', 'response': test_response.json() }) else: return jsonify({ 'success': False, 'error': f'API 错误: {test_response.status_code}', 'details': test_response.text }), test_response.status_code except requests.exceptions.RequestException as e: return jsonify({ 'success': False, 'error': '连接错误', 'details': str(e) }), 500 except Exception as e: return jsonify({ 'success': False, 'error': '意外错误', 'details': str(e) }), 500 @paper_analysis_bp.route('/api/analyze-paper', methods=['POST']) def analyze_paper(): """使用 Deepseek API 分析论文""" try: # 从请求头获取 API 密钥 api_key = request.headers.get('X-API-Key') if not api_key: return jsonify({'error': '需要 API 密钥'}), 401 # 获取分析选项 extract_keywords = request.form.get('extract_keywords') == 'true' generate_summary = request.form.get('generate_summary') == 'true' find_related = request.form.get('find_related') == 'true' # 获取上传的文件 if 'file' not in request.files: return jsonify({'error': '没有上传文件'}), 400 file = request.files['file'] if file.filename == '': return jsonify({'error': '没有选择文件'}), 400 # 检查文件大小(限制为 50MB) file.seek(0, 2) # 定位到文件末尾 file_size = file.tell() file.seek(0) # 重置到开头 if file_size > 50 * 1024 * 1024: # 50MB 限制 return jsonify({'error': '文件过大。最大大小为 50MB'}), 400 if file_size == 0: return jsonify({'error': '文件为空'}), 400 # 临时保存文件 filename = secure_filename(file.filename) print(f"正在处理文件: {filename} (大小: {file_size} 字节)") with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(filename)[1]) as temp_file: file.save(temp_file.name) file_path = temp_file.name try: # 初始化结果字典 results = { 'keywords': [], 'summary': '', 'related_works': [] } # 从文件提取文本 text = extract_text_from_file(file_path, filename) if not text or not text.strip(): return jsonify({'error': '无法从文件中提取文本。文件可能为空或格式不受支持。'}), 400 # 清理文本 text = text.strip() print(f"截断前的最终文本长度: {len(text)}") # 检查是否有足够的文本 if len(text) < 100: return jsonify({ 'error': '提取的文本过短。这可能是没有 OCR 文本的扫描 PDF。请确保您的 PDF 包含可选择的文本,而不仅仅是图像。' }), 400 # 处理大型文档 text = process_large_document(text) # 为每个请求的分析调用 Deepseek API headers = { 'Authorization': f'Bearer {api_key}', 'Content-Type': 'application/json' } if extract_keywords: results['keywords'] = extract_keywords_with_deepseek(text, headers) if generate_summary: results['summary'] = generate_summary_with_deepseek(text, headers) if find_related: results['related_works'] = extract_formulas_with_deepseek(text, headers) return jsonify(results) finally: # 清理临时文件 try: os.unlink(file_path) except Exception as e: print(f"删除临时文件时出错: {str(e)}") except Exception as e: print(f"分析论文时出错: {str(e)}") return jsonify({'error': str(e)}), 500 def extract_text_from_file(file_path, filename): """从各种文件格式中提取文本""" text = '' file_ext = os.path.splitext(filename)[1].lower() try: if file_ext == '.pdf': text = extract_pdf_text(file_path) elif file_ext in ['.docx', '.doc']: text = extract_word_text(file_path, file_ext) elif file_ext == '.rtf': text = extract_rtf_text(file_path) elif file_ext in ['.tex', '.latex']: text = extract_latex_text(file_path) elif file_ext in ['.md', '.markdown']: text = extract_markdown_text(file_path) else: text = extract_plain_text(file_path) except Exception as e: print(f"文件处理错误: {str(e)}") raise Exception(f"读取文件时出错: {str(e)}") return text def extract_pdf_text(file_path): """从 PDF 文件提取文本""" try: from PyPDF2 import PdfReader reader = PdfReader(file_path) text = '' num_pages = len(reader.pages) print(f"PDF 有 {num_pages} 页") for i, page in enumerate(reader.pages): try: page_text = page.extract_text() if page_text: text += page_text + '\n' print(f"已提取第 {i+1}/{num_pages} 页") except Exception as page_error: print(f"提取第 {i+1} 页时出错: {str(page_error)}") continue print(f"提取的总文本长度: {len(text)}") return text except ImportError: # 尝试替代的 PDF 库 try: import pdfplumber text = '' with pdfplumber.open(file_path) as pdf: for page in pdf.pages: page_text = page.extract_text() if page_text: text += page_text + '\n' return text except ImportError: raise Exception('PDF 处理不可用。请安装 PyPDF2 或 pdfplumber。') except Exception as pdf_error: print(f"PDF 提取错误: {str(pdf_error)}") # 尝试使用 PyMuPDF 作为后备方案 try: import fitz # PyMuPDF pdf_document = fitz.open(file_path) text = '' for page_num in range(pdf_document.page_count): page = pdf_document[page_num] text += page.get_text() + '\n' pdf_document.close() return text except ImportError: raise Exception(f'无法从 PDF 提取文本: {str(pdf_error)}。请尝试安装 PyMuPDF。') except Exception as mupdf_error: raise Exception(f'PDF 提取失败: {str(pdf_error)}') def extract_word_text(file_path, file_ext): """从 Word 文档提取文本""" try: if file_ext == '.docx': from docx import Document doc = Document(file_path) return '\n'.join([paragraph.text for paragraph in doc.paragraphs]) else: # .doc 文件 try: import docx2txt return docx2txt.process(file_path) except ImportError: raise Exception('DOC 文件支持需要 docx2txt。请使用以下命令安装: pip install docx2txt') except ImportError: raise Exception('Word 文档支持需要 python-docx。请使用以下命令安装: pip install python-docx') except Exception as docx_error: raise Exception(f'读取 Word 文档时出错: {str(docx_error)}') def extract_rtf_text(file_path): """从 RTF 文件提取文本""" try: import striprtf with open(file_path, 'r', encoding='utf-8') as f: rtf_content = f.read() return striprtf.rtf_to_text(rtf_content) except ImportError: raise Exception('RTF 支持需要 striprtf。请使用以下命令安装: pip install striprtf') except Exception as rtf_error: raise Exception(f'读取 RTF 文件时出错: {str(rtf_error)}') def extract_latex_text(file_path): """从 LaTeX 文件提取文本""" try: with open(file_path, 'r', encoding='utf-8') as f: tex_content = f.read() # 基础 LaTeX 清理 - 移除常见命令 import re text = tex_content # 移除注释 text = re.sub(r'%.*$', '', text, flags=re.MULTILINE) # 移除常见 LaTeX 命令但保留内容 text = re.sub(r'\\(begin|end)\{[^}]+\}', '', text) text = re.sub(r'\\[a-zA-Z]+\*?\{([^}]+)\}', r'\1', text) text = re.sub(r'\\[a-zA-Z]+\*?', '', text) return text except Exception as tex_error: raise Exception(f'读取 LaTeX 文件时出错: {str(tex_error)}') def extract_markdown_text(file_path): """从 Markdown 文件提取文本""" try: with open(file_path, 'r', encoding='utf-8') as f: text = f.read() # 清理 markdown 语法 import re # 移除图片链接 text = re.sub(r'!\[[^\]]*\]\([^)]+\)', '', text) # 将链接转换为纯文本 text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', text) return text except Exception as md_error: raise Exception(f'读取 Markdown 文件时出错: {str(md_error)}') def extract_plain_text(file_path): """从纯文本文件提取文本""" encodings = ['utf-8', 'utf-16', 'gbk', 'gb2312', 'big5', 'latin-1'] text = None for encoding in encodings: try: with open(file_path, 'r', encoding=encoding) as f: text = f.read() print(f"使用 {encoding} 编码成功读取文件") break except UnicodeDecodeError: continue except Exception as e: print(f"使用 {encoding} 读取时出错: {str(e)}") continue if text is None: # 尝试以二进制方式读取并解码 with open(file_path, 'rb') as f: binary_content = f.read() try: text = binary_content.decode('utf-8', errors='ignore') except: text = str(binary_content) return text def process_large_document(text): """通过优先处理公式提取来处理大型文档""" if len(text) > 98000: print("检测到大型文档,优先处理内容以进行公式提取") # 尝试找到包含公式的章节(常见模式) import re # 查找数学内容指示符 math_sections = [] lines = text.split('\n') for i, line in enumerate(lines): if re.search(r'[=+\-*/∫∑∏√∂∇∆λμσπ]|equation|formula|theorem|lemma|proof', line, re.IGNORECASE): # 包含周围上下文 start = max(0, i-5) end = min(len(lines), i+6) math_sections.extend(lines[start:end]) if math_sections: # 使用数学内容丰富的章节以更好地提取公式 math_text = '\n'.join(math_sections) if len(math_text) > 50000: # 仍然太长 text = math_text[:98000] else: # 将数学章节与文档开头结合 remaining_space = 98000 - len(math_text) text = text[:remaining_space] + '\n\n[数学内容章节:]\n' + math_text else: # 未找到数学指示符,使用第一部分 text = text[:98000] return text def extract_keywords_with_deepseek(text, headers): """使用 Deepseek API 提取关键词""" try: keyword_messages = [ { 'role': 'system', 'content': '你是一个有助于从学术论文中提取关键词的助手。始终以有效的 JSON 格式响应。' }, { 'role': 'user', 'content': f"""分析以下学术论文并提取关键术语和概念。 对于每个关键词,提供一个介于 0 到 1 之间的相关性分数。 仅返回具有 'text' 和 'score' 属性的有效 JSON 对象数组。 示例格式: [{{"text": "machine learning", "score": 0.95}}, {{"text": "neural networks", "score": 0.85}}] 论文文本: {text}""" } ] keyword_response = requests.post( 'https://api.deepseek.com/v1/chat/completions', headers=headers, json={ 'model': 'deepseek-chat', 'messages': keyword_messages, 'temperature': 0.3, 'max_tokens': 4000 }, timeout=60 ) if keyword_response.ok: response_content = keyword_response.json()['choices'][0]['message']['content'] try: # 尝试从响应中提取 JSON import re json_match = re.search(r'\[.*\]', response_content, re.DOTALL) if json_match: return json.loads(json_match.group()) else: return json.loads(response_content) except json.JSONDecodeError: print(f"来自关键词 API 的无效 JSON: {response_content}") return [] else: print(f"关键词 API 错误: {keyword_response.text}") return [] except Exception as e: print(f"关键词提取错误: {str(e)}") return [] def generate_summary_with_deepseek(text, headers): """使用 Deepseek API 生成摘要""" try: summary_messages = [ { 'role': 'system', 'content': '你是一个有助于总结学术论文的助手。' }, { 'role': 'user', 'content': f"""提供以下学术论文的全面摘要。 重点关注主要贡献、方法和关键发现。 保持回应简洁且结构良好。 论文文本: {text}""" } ] summary_response = requests.post( 'https://api.deepseek.com/v1/chat/completions', headers=headers, json={ 'model': 'deepseek-chat', 'messages': summary_messages, 'temperature': 0.3, 'max_tokens': 4000 }, timeout=60 ) if summary_response.ok: return summary_response.json()['choices'][0]['message']['content'] else: print(f"摘要 API 错误: {summary_response.text}") return "生成摘要时出错" except Exception as e: print(f"摘要生成错误: {str(e)}") return "生成摘要时出错" def extract_formulas_with_deepseek(text, headers): """使用 Deepseek API 提取公式""" try: related_messages = [ { 'role': 'system', 'content': '''你是一位专业的数学家和 AI 助手,专门从学术论文中提取数学公式。 你的任务是识别并提取给定文本中所有的数学公式、方程式和数学表达式,尽可能多地提取。 重要说明: 1. 提取你找到的每一个数学公式、方程式或表达式 2. 包括内联公式、显示方程和数学定义 3. 尽可能保留原始符号 4. 对于每个公式,提供其表示内容的上下文 5. 始终以有效的 JSON 格式响应 你必须彻底并提取所有公式,而不仅仅是主要的公式。''' }, { 'role': 'user', 'content': f"""从以下论文文本中提取所有的数学公式和方程式。 对于找到的每个公式,请提供: - 公式本身(尽可能使用 LaTeX 表示法) - 详细描述,解释公式表示的内容以及每个变量的含义 - 出现位置的上下文或章节 - 它是定义、定理、引理还是一般方程 - 解释公式目的的中文描述 返回一个 JSON 数组,其中每个元素具有以下属性: - "formula": 数学表达式(使用 LaTeX 表示法) - "description": 公式表示或计算的内容 - "variables": 对公式中每个变量/符号含义的详细解释 - "variables_chinese": 变量解释的中文翻译(与 variables 结构相同) - "type": 其中之一 ["definition", "theorem", "lemma", "equation", "inequality", "identity", "other"] - "context": 关于其使用位置/方式的简要上下文 - "chinese_description": 关于公式及其目的的综合中文描述 示例格式: [ {{ "formula": "E = mc^2", "description": "爱因斯坦质能等价关系", "variables": {{"E": "energy (joules)", "m": "mass (kilograms)", "c": "speed of light in vacuum (≈3×10^8 m/s)"}}, "variables_chinese": {{"E": "能量 (焦耳)", "m": "质量 (千克)", "c": "真空中的光速 (≈3×10^8 m/s)"}}, "type": "equation", "context": "狭义相对论基本方程", "chinese_description": "爱因斯坦质能等价公式,表示质量和能量之间的等价关系" }}, {{ "formula": "F = ma", "description": "牛顿第二运动定律", "variables": {{"F": "net force (newtons)", "m": "mass (kilograms)", "a": "acceleration (m/s²)"}}, "variables_chinese": {{"F": "净力 (牛顿)", "m": "质量 (千克)", "a": "加速度 (m/s²)"}}, "type": "equation", "context": "经典力学基本定律", "chinese_description": "牛顿第二定律,描述物体受力与加速度的关系" }} ] 论文文本: {text} 重要说明: 1. 提取每一个公式,即使是简单的如 "x + y = z" 或 "f(x) = ax + b" 2. 对于公式中的每个变量或符号,解释其代表什么 3. 相关时包括测量单位 4. 提供解释公式重要性的全面中文描述 5. 在变量解释中要彻底且详细""" } ] related_response = requests.post( 'https://api.deepseek.com/v1/chat/completions', headers=headers, json={ 'model': 'deepseek-chat', 'messages': related_messages, 'temperature': 0.1, # 较低的温度以获得更一致的提取 'max_tokens': 4000 # 增加令牌限制以获取更多公式 }, timeout=120 # 增加超时时间以处理大型文档 ) if related_response.ok: response_content = related_response.json()['choices'][0]['message']['content'] try: # 尝试从响应中提取 JSON import re # 在响应中查找 JSON 数组 json_match = re.search(r'\[[\s\S]*\]', response_content) if json_match: formulas = json.loads(json_match.group()) return formulas else: # 尝试直接 JSON 解析 return json.loads(response_content) except json.JSONDecodeError as e: print(f"来自公式 API 的无效 JSON: {response_content}") print(f"JSON 错误: {str(e)}") return [] else: print(f"公式 API 错误: {related_response.text}") return [] except Exception as e: print(f"公式提取错误: {str(e)}") return []