327 lines
11 KiB
Python
327 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
财务报表数据处理主程序
|
||
功能:
|
||
1. 自动识别并分类处理资产负债表、利润表和现金流量表
|
||
2. 将处理结果合并到一个 Excel 文件的不同 Tab 页
|
||
3. 自动清理文件名,去除冗余文字
|
||
4. 统一数据单位为亿(保留2位小数)
|
||
5. 通过配置指定公司名称,从 input/公司名 和 config/公司名 读取数据
|
||
"""
|
||
|
||
import pandas as pd
|
||
import json
|
||
from pathlib import Path
|
||
import re
|
||
import process_balance_sheet
|
||
import process_income_statement
|
||
import process_cash_flow
|
||
|
||
# ==================== 公司配置 ====================
|
||
# 方式1: 直接在此处配置公司名称(对应 input 和 config 下的文件夹名)
|
||
# 方式2: 在项目根目录的 config.json 中配置 "company": "公司名",会覆盖此处配置
|
||
COMPANY_NAME = "泡泡玛特"
|
||
|
||
|
||
def get_company_name(script_dir):
|
||
"""
|
||
获取当前要处理的公司名称
|
||
优先从 config.json 读取,否则使用入口文件中的 COMPANY_NAME
|
||
"""
|
||
config_file = script_dir / 'config.json'
|
||
if config_file.exists():
|
||
try:
|
||
with open(config_file, 'r', encoding='utf-8') as f:
|
||
config = json.load(f)
|
||
company = config.get('company')
|
||
if company:
|
||
print(f"从 config.json 读取公司配置: {company}")
|
||
return company
|
||
except (json.JSONDecodeError, IOError) as e:
|
||
print(f"⚠️ 读取 config.json 失败: {e},使用默认配置")
|
||
return COMPANY_NAME
|
||
|
||
|
||
def clean_filename(filename):
|
||
"""
|
||
清理文件名,去除"合并报表_"等内容
|
||
"""
|
||
# 去除扩展名
|
||
name = Path(filename).stem
|
||
|
||
# 去除"合并报表_"
|
||
name = name.replace('合并报表_', '')
|
||
name = name.replace('合并报表', '')
|
||
|
||
# 去除多余的下划线和空格
|
||
name = re.sub(r'_+', '_', name) # 多个下划线替换为一个
|
||
name = name.strip('_') # 去除首尾下划线
|
||
|
||
return name
|
||
|
||
|
||
def classify_file(filename):
|
||
"""
|
||
根据文件名分类文件类型
|
||
返回: 'balance_sheet', 'income_statement', 'cash_flow', 或 None
|
||
"""
|
||
filename_lower = filename.lower()
|
||
|
||
if '资产负债表' in filename or 'balance' in filename_lower:
|
||
return 'balance_sheet'
|
||
elif '利润表' in filename or 'income' in filename_lower:
|
||
return 'income_statement'
|
||
elif '现金流量表' in filename or ('cash' in filename_lower and 'flow' in filename_lower):
|
||
return 'cash_flow'
|
||
else:
|
||
return None
|
||
|
||
|
||
def process_file(file_path, file_type, config_dir):
|
||
"""
|
||
处理单个文件
|
||
返回: pandas.DataFrame
|
||
"""
|
||
if file_type == 'balance_sheet':
|
||
print(f"\n{'='*60}")
|
||
print(f"资产负债表处理: {file_path.name}")
|
||
print(f"{'='*60}")
|
||
|
||
config_file = config_dir / 'balance_sheet_categories.json'
|
||
|
||
# 调用资产负债表处理函数
|
||
df_result = process_balance_sheet.process_balance_sheet_return_df(
|
||
str(file_path),
|
||
str(config_file)
|
||
)
|
||
return df_result
|
||
|
||
elif file_type == 'income_statement':
|
||
print(f"\n{'='*60}")
|
||
print(f"利润表处理: {file_path.name}")
|
||
print(f"{'='*60}")
|
||
|
||
config_file = config_dir / 'income_statement_config.json'
|
||
|
||
# 调用利润表处理函数
|
||
df_result = process_income_statement.process_income_statement_return_df(
|
||
str(file_path),
|
||
str(config_file)
|
||
)
|
||
return df_result
|
||
|
||
elif file_type == 'cash_flow':
|
||
print(f"\n{'='*60}")
|
||
print(f"现金流量表处理: {file_path.name}")
|
||
print(f"{'='*60}")
|
||
|
||
config_file = config_dir / 'cash_flow_config.json'
|
||
|
||
# 调用现金流量表处理函数
|
||
df_result = process_cash_flow.process_cash_flow_return_df(
|
||
str(file_path),
|
||
str(config_file)
|
||
)
|
||
return df_result
|
||
|
||
return None
|
||
|
||
|
||
def merge_to_excel(balance_sheet_df, income_statement_df, cash_flow_df, output_file, company_name):
|
||
"""
|
||
将资产负债表、利润表和现金流量表合并到一个 Excel 文件的不同 Tab 页
|
||
"""
|
||
print(f"\n{'='*60}")
|
||
print(f"合并 Excel 文件")
|
||
print(f"{'='*60}")
|
||
|
||
with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
|
||
if balance_sheet_df is not None:
|
||
balance_sheet_df.to_excel(
|
||
writer,
|
||
sheet_name='资产负债表',
|
||
index=False,
|
||
header=False
|
||
)
|
||
print(f"✓ 已添加【资产负债表】Tab,共 {len(balance_sheet_df)} 行")
|
||
|
||
if income_statement_df is not None:
|
||
income_statement_df.to_excel(
|
||
writer,
|
||
sheet_name='利润表',
|
||
index=False,
|
||
header=False
|
||
)
|
||
print(f"✓ 已添加【利润表】Tab,共 {len(income_statement_df)} 行")
|
||
|
||
if cash_flow_df is not None:
|
||
cash_flow_df.to_excel(
|
||
writer,
|
||
sheet_name='现金流量表',
|
||
index=False,
|
||
header=True # 现金流量表保留列名
|
||
)
|
||
print(f"✓ 已添加【现金流量表】Tab,共 {len(cash_flow_df)} 行")
|
||
|
||
print(f"\n合并完成!输出文件: {output_file}")
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
print("="*60)
|
||
print("财务报表数据处理工具")
|
||
print("="*60)
|
||
|
||
# 获取脚本所在目录
|
||
script_dir = Path(__file__).parent
|
||
|
||
# 获取公司名称(从 config.json 或入口配置)
|
||
company_name = get_company_name(script_dir)
|
||
|
||
# 按公司设置路径:input/公司名、config/公司名、output
|
||
input_dir = script_dir / 'input' / company_name
|
||
config_dir = script_dir / 'config' / company_name
|
||
output_dir = script_dir / 'output'
|
||
|
||
# 创建输出目录
|
||
output_dir.mkdir(exist_ok=True)
|
||
|
||
# 检查 input 目录
|
||
if not input_dir.exists():
|
||
print(f"错误: input 目录不存在: {input_dir}")
|
||
print(f"请确保 input/{company_name} 文件夹存在,并包含 Excel/CSV 数据文件")
|
||
return
|
||
|
||
# 检查 config 目录
|
||
if not config_dir.exists():
|
||
print(f"错误: config 目录不存在: {config_dir}")
|
||
print(f"请确保 config/{company_name} 文件夹存在,并包含配置文件")
|
||
return
|
||
|
||
# 获取该公司目录下的所有 CSV 和 Excel 文件
|
||
input_files = list(input_dir.glob('*.csv')) + \
|
||
list(input_dir.glob('*.xlsx')) + \
|
||
list(input_dir.glob('*.xls'))
|
||
|
||
if not input_files:
|
||
print(f"错误: 在 {input_dir} 目录下未找到任何文件")
|
||
return
|
||
|
||
print(f"\n当前公司: {company_name}")
|
||
print(f"数据目录: {input_dir}")
|
||
print(f"配置目录: {config_dir}")
|
||
print(f"找到 {len(input_files)} 个文件待处理\n")
|
||
|
||
# 按文件类型分类(资产负债表、利润表、现金流量表)
|
||
files_by_type = {} # {'balance': file, 'income': file, 'cash_flow': file}
|
||
|
||
for file_path in input_files:
|
||
file_type = classify_file(file_path.name)
|
||
|
||
if file_type is None:
|
||
print(f"⚠️ 跳过: {file_path.name} (无法识别文件类型)")
|
||
continue
|
||
|
||
if file_type == 'balance_sheet':
|
||
files_by_type['balance'] = file_path
|
||
print(f"✓ 识别为【资产负债表】: {file_path.name}")
|
||
elif file_type == 'income_statement':
|
||
files_by_type['income'] = file_path
|
||
print(f"✓ 识别为【利润表】: {file_path.name}")
|
||
elif file_type == 'cash_flow':
|
||
files_by_type['cash_flow'] = file_path
|
||
print(f"✓ 识别为【现金流量表】: {file_path.name}")
|
||
|
||
# 处理文件
|
||
print(f"\n{'='*60}")
|
||
print(f"处理公司: {company_name}")
|
||
print(f"{'='*60}")
|
||
|
||
balance_sheet_df = None
|
||
income_statement_df = None
|
||
cash_flow_df = None
|
||
|
||
# 处理资产负债表
|
||
if 'balance' in files_by_type:
|
||
try:
|
||
balance_sheet_df = process_file(
|
||
files_by_type['balance'],
|
||
'balance_sheet',
|
||
config_dir
|
||
)
|
||
except Exception as e:
|
||
print(f"❌ 处理资产负债表时出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
# 处理利润表
|
||
if 'income' in files_by_type:
|
||
try:
|
||
income_statement_df = process_file(
|
||
files_by_type['income'],
|
||
'income_statement',
|
||
config_dir
|
||
)
|
||
except Exception as e:
|
||
print(f"❌ 处理利润表时出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
# 处理现金流量表
|
||
if 'cash_flow' in files_by_type:
|
||
try:
|
||
cash_flow_df = process_file(
|
||
files_by_type['cash_flow'],
|
||
'cash_flow',
|
||
config_dir
|
||
)
|
||
except Exception as e:
|
||
print(f"❌ 处理现金流量表时出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
# 生成输出文件名
|
||
if 'balance' in files_by_type:
|
||
base_filename = files_by_type['balance'].stem
|
||
elif 'income' in files_by_type:
|
||
base_filename = files_by_type['income'].stem
|
||
elif 'cash_flow' in files_by_type:
|
||
base_filename = files_by_type['cash_flow'].stem
|
||
else:
|
||
base_filename = company_name
|
||
|
||
# 清理文件名:去除"资产负债表"、"利润表"、"现金流量表"、"合并报表"等
|
||
clean_name = base_filename
|
||
clean_name = re.sub(r'_?(资产负债表|利润表|现金流量表|合并报表)_?', '', clean_name)
|
||
clean_name = re.sub(r'_+', '_', clean_name).strip('_')
|
||
|
||
output_file = output_dir / f"{clean_name}.xlsx"
|
||
|
||
# 合并到一个 Excel 文件
|
||
if balance_sheet_df is not None or income_statement_df is not None or cash_flow_df is not None:
|
||
try:
|
||
merge_to_excel(
|
||
balance_sheet_df,
|
||
income_statement_df,
|
||
cash_flow_df,
|
||
str(output_file),
|
||
company_name
|
||
)
|
||
except Exception as e:
|
||
print(f"❌ 合并 Excel 时出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
else:
|
||
print(f"⚠️ {company_name}: 没有可处理的数据")
|
||
|
||
print(f"\n{'='*60}")
|
||
print("处理完成!")
|
||
print(f"结果保存在: {output_dir}")
|
||
print(f"{'='*60}")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|
||
|