Files
organize_excel_data/process_balance_sheet.py
2026-03-05 22:02:57 +08:00

376 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
资产负债表数据处理脚本
功能:按照配置的分类重新组织资产负债表数据
"""
import pandas as pd
import json
import os
from pathlib import Path
def load_config(config_path):
"""加载分类配置文件"""
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
return config['categories']
def find_row_by_item_name(df, item_name):
"""在数据框中查找指定项目名称的行"""
# 获取第一列的数据
first_col = df.iloc[:, 0]
# 尝试精确匹配
mask = first_col == item_name
if mask.any():
return df[mask]
# 尝试模糊匹配(去除空格)
mask = first_col.str.strip() == item_name.strip()
if mask.any():
return df[mask]
return None
def safe_float_convert(value):
"""安全地将值转换为浮点数,处理各种格式"""
if pd.isna(value) or value == '' or value is None:
return 0.0
try:
# 如果已经是数值类型
if isinstance(value, (int, float)):
return float(value)
# 转换为字符串并清理
str_value = str(value).strip()
if str_value == '' or str_value.lower() == 'nan':
return 0.0
# 去除等号Excel公式格式如 "=557395000000"
if str_value.startswith('='):
str_value = str_value[1:]
# 去除常见的数字格式字符(逗号、空格等)
str_value = str_value.replace(',', '').replace(' ', '').replace('', '')
if str_value == '' or str_value.lower() == 'nan':
return 0.0
# 尝试转换(支持科学计数法,如 "2.07327E+12"
return float(str_value)
except (ValueError, TypeError):
return 0.0
def convert_to_yi(value):
"""将数字转换为以亿为单位,保留两位小数"""
if pd.isna(value) or value == '' or value is None:
return value
# 统一转换为浮点数进行处理
num = None
# 如果已经是数值类型,直接使用
if isinstance(value, (int, float)):
num = float(value)
else:
# 处理字符串格式
try:
# 转换为字符串并清理
str_value = str(value).strip()
# 去除等号Excel公式格式如 "=557395000000"
if str_value.startswith('='):
str_value = str_value[1:]
# 去除常见的数字格式字符(逗号、空格等)
str_value = str_value.replace(',', '').replace(' ', '').replace('', '')
# 如果为空,返回原值
if str_value == '' or str_value.lower() == 'nan':
return value
# 转换为浮点数(支持科学计数法,如 "2.07327E+12"
num = float(str_value)
except (ValueError, TypeError):
# 如果无法转换,返回原值
return value
# 如果成功转换为数字
if num is not None:
# 如果是小数(比率),返回去除等号后的值
if abs(num) < 1:
# 如果原值是字符串且以等号开头,返回去除等号后的值
if isinstance(value, str) and value.strip().startswith('='):
return str_value
return value
# 转换为亿,保留两位小数
yi_value = num / 100000000
return round(yi_value, 2)
# 如果无法处理,返回原值
return value
def process_balance_sheet_return_df(input_file, config_file):
"""
处理资产负债表文件并返回 DataFrame
参数:
input_file: 输入的CSV/Excel文件路径
config_file: 分类配置文件路径
返回:
pandas.DataFrame: 处理后的数据框
"""
print(f"正在读取文件: {input_file}")
# 读取数据文件
if input_file.endswith('.csv'):
df = pd.read_csv(input_file, encoding='utf-8')
else:
df = pd.read_excel(input_file)
print(f"文件读取成功,共 {len(df)}")
# 加载分类配置
categories = load_config(config_file)
print(f"配置文件加载成功,共 {len(categories)} 个分类")
# 创建结果数据框
result_rows = []
processed_indices = set()
# 提取日期行作为第一行
date_row = find_row_by_item_name(df, '日期')
if date_row is not None and len(date_row) > 0:
date_idx = date_row.index[0]
result_rows.append(df.iloc[date_idx].tolist())
processed_indices.add(date_idx)
print("已添加日期行作为第一行")
else:
print("警告: 未找到日期行")
# 定义特殊分类(只有一项,直接显示数据,不需要额外的分类标题行)
special_categories = ['资产总计', '所有者权益合计', '负债合计']
# 按分类处理
for category in categories:
category_name = category['name']
items = category['items']
print(f"\n处理分类: {category_name}")
# 查找该分类下的所有项目
found_rows = []
found_count = 0
for item_name in items:
row_data = find_row_by_item_name(df, item_name)
if row_data is not None and len(row_data) > 0:
for idx in row_data.index:
if idx not in processed_indices:
found_rows.append(df.iloc[idx].tolist())
processed_indices.add(idx)
found_count += 1
print(f" 找到: {item_name}")
if found_count == 0:
print(f" 警告: 该分类下未找到任何项目")
continue
# 处理特殊分类
if category_name in special_categories:
# 特殊分类:直接添加数据行,但将第一列的名称替换为配置的 name并转换数字为亿
for row in found_rows:
modified_row = row[:] # 创建列表的副本
modified_row[0] = category_name # 使用配置中的 name 替换原始名称
# 转换数字为亿从第2列开始
for col_idx in range(1, len(modified_row)):
original_value = modified_row[col_idx]
converted_value = convert_to_yi(original_value)
modified_row[col_idx] = converted_value
# 调试:打印前几个转换示例
if col_idx <= 2 and not pd.isna(original_value) and original_value != '':
print(f" 转换示例: {original_value} -> {converted_value}")
result_rows.append(modified_row)
else:
# 普通分类:添加分类标题行(包含汇总数据)+ 明细行
# 计算每列的汇总从第2列开始第1列是名称
category_header = [category_name]
# 获取列数(使用第一个找到的行的长度)
num_cols = len(found_rows[0]) if found_rows else len(df.columns)
for col_idx in range(1, num_cols):
col_sum = 0.0
for row in found_rows:
if col_idx < len(row):
col_sum += safe_float_convert(row[col_idx])
# 汇总后转换为亿
if col_sum == 0.0:
category_header.append('')
else:
category_header.append(convert_to_yi(col_sum))
result_rows.append(category_header)
# 添加明细行(转换数字为亿)
for row_idx, row in enumerate(found_rows):
converted_row = row[:] # 创建副本
# 转换数字为亿从第2列开始
for col_idx in range(1, len(converted_row)):
original_value = converted_row[col_idx]
converted_value = convert_to_yi(original_value)
converted_row[col_idx] = converted_value
# 调试:打印前几个转换示例
if row_idx == 0 and col_idx <= 2 and not pd.isna(original_value) and original_value != '':
print(f" 转换示例: {original_value} -> {converted_value}")
result_rows.append(converted_row)
# 添加3个空行
# 使用第一行(日期行)的长度来确定列数
num_cols = len(result_rows[0]) if result_rows else len(df.columns)
empty_row = [''] * num_cols
for _ in range(3):
result_rows.append(empty_row)
# 添加未分类的行
print(f"\n处理未分类项目...")
uncategorized_header = ['未分类项目'] + [''] * (num_cols - 1)
result_rows.append(uncategorized_header)
uncategorized_count = 0
for idx in range(len(df)):
if idx not in processed_indices:
row = df.iloc[idx].tolist()
# 转换数字为亿从第2列开始
converted_row = [row[0]] # 保留第一列(名称)
for col_idx in range(1, len(row)):
converted_row.append(convert_to_yi(row[col_idx]))
result_rows.append(converted_row)
uncategorized_count += 1
# 打印未分类项目的名称
item_name = df.iloc[idx, 0]
if pd.notna(item_name) and str(item_name).strip():
print(f" 未分类: {item_name}")
print(f"\n共有 {uncategorized_count} 个未分类项目")
# 创建结果数据框(不使用列名,因为第一行就是日期行)
result_df = pd.DataFrame(result_rows)
print("\n处理完成!")
print(f"总行数: {len(result_df)}")
print(f"已分类项目: {len(processed_indices)}")
print(f"未分类项目: {uncategorized_count}")
return result_df
def process_balance_sheet(input_file, config_file, output_file):
"""
处理资产负债表文件并保存
参数:
input_file: 输入的CSV/Excel文件路径
config_file: 分类配置文件路径
output_file: 输出文件路径
"""
# 调用处理函数获取 DataFrame
result_df = process_balance_sheet_return_df(input_file, config_file)
# 保存结果(默认保存为 xlsx 格式)
print(f"\n正在保存结果到: {output_file}")
# 强制使用 xlsx 格式
if not output_file.endswith('.xlsx'):
output_file = output_file.rsplit('.', 1)[0] + '.xlsx'
# 保存时不写入列名和索引
result_df.to_excel(output_file, index=False, header=False, engine='openpyxl')
print(f"输出文件: {output_file}")
def main():
"""主函数"""
# 获取脚本所在目录
script_dir = Path(__file__).parent
# 设置路径
input_dir = script_dir / 'input'
config_dir = script_dir / 'config'
output_dir = script_dir / 'output'
# 创建输出目录
output_dir.mkdir(exist_ok=True)
# 配置文件路径
config_file = config_dir / 'balance_sheet_categories.json'
if not config_file.exists():
print(f"错误: 配置文件不存在: {config_file}")
return
# 查找input目录下的文件
if not input_dir.exists():
print(f"错误: input目录不存在: {input_dir}")
return
# 获取所有CSV和Excel文件
input_files = list(input_dir.glob('*.csv')) + \
list(input_dir.glob('*.xlsx')) + \
list(input_dir.glob('*.xls'))
if not input_files:
print(f"错误: 在 {input_dir} 目录下未找到任何CSV或Excel文件")
return
print(f"找到 {len(input_files)} 个文件待处理\n")
# 处理每个文件
for input_file in input_files:
print(f"\n{'='*60}")
print(f"处理文件: {input_file.name}")
print(f"{'='*60}")
# 生成输出文件名(统一使用 .xlsx 扩展名,去除"合并报表_"
base_name = input_file.stem # 不带扩展名的文件名
# 清理文件名
import re
base_name = base_name.replace('合并报表_', '').replace('合并报表', '')
base_name = re.sub(r'_+', '_', base_name).strip('_')
output_file = output_dir / f"{base_name}.xlsx"
try:
process_balance_sheet(
str(input_file),
str(config_file),
str(output_file)
)
except Exception as e:
print(f"处理文件时出错: {e}")
import traceback
traceback.print_exc()
print(f"\n{'='*60}")
print("所有文件处理完成!")
print(f"结果保存在: {output_dir}")
print(f"{'='*60}")
if __name__ == '__main__':
main()