读取Excel 进行汇总

This commit is contained in:
2026-03-05 22:02:57 +08:00
commit 01b7759921
40 changed files with 3843 additions and 0 deletions

326
main.py Normal file
View File

@@ -0,0 +1,326 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
财务报表数据处理主程序
功能:
1. 自动识别并分类处理资产负债表、利润表和现金流量表
2. 将处理结果合并到一个 Excel 文件的不同 Tab 页
3. 自动清理文件名,去除冗余文字
4. 统一数据单位为亿保留2位小数
5. 通过配置指定公司名称,从 input/公司名 和 config/公司名 读取数据
"""
import pandas as pd
import json
from pathlib import Path
import re
import process_balance_sheet
import process_income_statement
import process_cash_flow
# ==================== 公司配置 ====================
# 方式1: 直接在此处配置公司名称(对应 input 和 config 下的文件夹名)
# 方式2: 在项目根目录的 config.json 中配置 "company": "公司名",会覆盖此处配置
COMPANY_NAME = "泡泡玛特"
def get_company_name(script_dir):
"""
获取当前要处理的公司名称
优先从 config.json 读取,否则使用入口文件中的 COMPANY_NAME
"""
config_file = script_dir / 'config.json'
if config_file.exists():
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
company = config.get('company')
if company:
print(f"从 config.json 读取公司配置: {company}")
return company
except (json.JSONDecodeError, IOError) as e:
print(f"⚠️ 读取 config.json 失败: {e},使用默认配置")
return COMPANY_NAME
def clean_filename(filename):
"""
清理文件名,去除"合并报表_"等内容
"""
# 去除扩展名
name = Path(filename).stem
# 去除"合并报表_"
name = name.replace('合并报表_', '')
name = name.replace('合并报表', '')
# 去除多余的下划线和空格
name = re.sub(r'_+', '_', name) # 多个下划线替换为一个
name = name.strip('_') # 去除首尾下划线
return name
def classify_file(filename):
"""
根据文件名分类文件类型
返回: 'balance_sheet', 'income_statement', 'cash_flow', 或 None
"""
filename_lower = filename.lower()
if '资产负债表' in filename or 'balance' in filename_lower:
return 'balance_sheet'
elif '利润表' in filename or 'income' in filename_lower:
return 'income_statement'
elif '现金流量表' in filename or ('cash' in filename_lower and 'flow' in filename_lower):
return 'cash_flow'
else:
return None
def process_file(file_path, file_type, config_dir):
"""
处理单个文件
返回: pandas.DataFrame
"""
if file_type == 'balance_sheet':
print(f"\n{'='*60}")
print(f"资产负债表处理: {file_path.name}")
print(f"{'='*60}")
config_file = config_dir / 'balance_sheet_categories.json'
# 调用资产负债表处理函数
df_result = process_balance_sheet.process_balance_sheet_return_df(
str(file_path),
str(config_file)
)
return df_result
elif file_type == 'income_statement':
print(f"\n{'='*60}")
print(f"利润表处理: {file_path.name}")
print(f"{'='*60}")
config_file = config_dir / 'income_statement_config.json'
# 调用利润表处理函数
df_result = process_income_statement.process_income_statement_return_df(
str(file_path),
str(config_file)
)
return df_result
elif file_type == 'cash_flow':
print(f"\n{'='*60}")
print(f"现金流量表处理: {file_path.name}")
print(f"{'='*60}")
config_file = config_dir / 'cash_flow_config.json'
# 调用现金流量表处理函数
df_result = process_cash_flow.process_cash_flow_return_df(
str(file_path),
str(config_file)
)
return df_result
return None
def merge_to_excel(balance_sheet_df, income_statement_df, cash_flow_df, output_file, company_name):
"""
将资产负债表、利润表和现金流量表合并到一个 Excel 文件的不同 Tab 页
"""
print(f"\n{'='*60}")
print(f"合并 Excel 文件")
print(f"{'='*60}")
with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
if balance_sheet_df is not None:
balance_sheet_df.to_excel(
writer,
sheet_name='资产负债表',
index=False,
header=False
)
print(f"✓ 已添加【资产负债表】Tab{len(balance_sheet_df)}")
if income_statement_df is not None:
income_statement_df.to_excel(
writer,
sheet_name='利润表',
index=False,
header=False
)
print(f"✓ 已添加【利润表】Tab{len(income_statement_df)}")
if cash_flow_df is not None:
cash_flow_df.to_excel(
writer,
sheet_name='现金流量表',
index=False,
header=True # 现金流量表保留列名
)
print(f"✓ 已添加【现金流量表】Tab{len(cash_flow_df)}")
print(f"\n合并完成!输出文件: {output_file}")
def main():
"""主函数"""
print("="*60)
print("财务报表数据处理工具")
print("="*60)
# 获取脚本所在目录
script_dir = Path(__file__).parent
# 获取公司名称(从 config.json 或入口配置)
company_name = get_company_name(script_dir)
# 按公司设置路径input/公司名、config/公司名、output
input_dir = script_dir / 'input' / company_name
config_dir = script_dir / 'config' / company_name
output_dir = script_dir / 'output'
# 创建输出目录
output_dir.mkdir(exist_ok=True)
# 检查 input 目录
if not input_dir.exists():
print(f"错误: input 目录不存在: {input_dir}")
print(f"请确保 input/{company_name} 文件夹存在,并包含 Excel/CSV 数据文件")
return
# 检查 config 目录
if not config_dir.exists():
print(f"错误: config 目录不存在: {config_dir}")
print(f"请确保 config/{company_name} 文件夹存在,并包含配置文件")
return
# 获取该公司目录下的所有 CSV 和 Excel 文件
input_files = list(input_dir.glob('*.csv')) + \
list(input_dir.glob('*.xlsx')) + \
list(input_dir.glob('*.xls'))
if not input_files:
print(f"错误: 在 {input_dir} 目录下未找到任何文件")
return
print(f"\n当前公司: {company_name}")
print(f"数据目录: {input_dir}")
print(f"配置目录: {config_dir}")
print(f"找到 {len(input_files)} 个文件待处理\n")
# 按文件类型分类(资产负债表、利润表、现金流量表)
files_by_type = {} # {'balance': file, 'income': file, 'cash_flow': file}
for file_path in input_files:
file_type = classify_file(file_path.name)
if file_type is None:
print(f"⚠️ 跳过: {file_path.name} (无法识别文件类型)")
continue
if file_type == 'balance_sheet':
files_by_type['balance'] = file_path
print(f"✓ 识别为【资产负债表】: {file_path.name}")
elif file_type == 'income_statement':
files_by_type['income'] = file_path
print(f"✓ 识别为【利润表】: {file_path.name}")
elif file_type == 'cash_flow':
files_by_type['cash_flow'] = file_path
print(f"✓ 识别为【现金流量表】: {file_path.name}")
# 处理文件
print(f"\n{'='*60}")
print(f"处理公司: {company_name}")
print(f"{'='*60}")
balance_sheet_df = None
income_statement_df = None
cash_flow_df = None
# 处理资产负债表
if 'balance' in files_by_type:
try:
balance_sheet_df = process_file(
files_by_type['balance'],
'balance_sheet',
config_dir
)
except Exception as e:
print(f"❌ 处理资产负债表时出错: {e}")
import traceback
traceback.print_exc()
# 处理利润表
if 'income' in files_by_type:
try:
income_statement_df = process_file(
files_by_type['income'],
'income_statement',
config_dir
)
except Exception as e:
print(f"❌ 处理利润表时出错: {e}")
import traceback
traceback.print_exc()
# 处理现金流量表
if 'cash_flow' in files_by_type:
try:
cash_flow_df = process_file(
files_by_type['cash_flow'],
'cash_flow',
config_dir
)
except Exception as e:
print(f"❌ 处理现金流量表时出错: {e}")
import traceback
traceback.print_exc()
# 生成输出文件名
if 'balance' in files_by_type:
base_filename = files_by_type['balance'].stem
elif 'income' in files_by_type:
base_filename = files_by_type['income'].stem
elif 'cash_flow' in files_by_type:
base_filename = files_by_type['cash_flow'].stem
else:
base_filename = company_name
# 清理文件名:去除"资产负债表"、"利润表"、"现金流量表"、"合并报表"等
clean_name = base_filename
clean_name = re.sub(r'_?(资产负债表|利润表|现金流量表|合并报表)_?', '', clean_name)
clean_name = re.sub(r'_+', '_', clean_name).strip('_')
output_file = output_dir / f"{clean_name}.xlsx"
# 合并到一个 Excel 文件
if balance_sheet_df is not None or income_statement_df is not None or cash_flow_df is not None:
try:
merge_to_excel(
balance_sheet_df,
income_statement_df,
cash_flow_df,
str(output_file),
company_name
)
except Exception as e:
print(f"❌ 合并 Excel 时出错: {e}")
import traceback
traceback.print_exc()
else:
print(f"⚠️ {company_name}: 没有可处理的数据")
print(f"\n{'='*60}")
print("处理完成!")
print(f"结果保存在: {output_dir}")
print(f"{'='*60}")
if __name__ == '__main__':
main()