Files
organize_excel_data/process_income_statement.py
2026-03-05 22:02:57 +08:00

321 lines
9.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
利润表数据处理脚本
功能:
1. 将数字转换为以亿为单位(保留两位小数)
2. 删除配置文件中指定的行
3. 第一行为日期
4. 输出为 Excel 格式
"""
import pandas as pd
import json
import os
from pathlib import Path
def load_config(config_path):
"""加载配置文件"""
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
return config
def is_decimal_number(value):
"""判断是否为小数绝对值小于1的数字"""
if pd.isna(value) or value == '' or value is None:
return False
try:
num = float(value)
# 如果绝对值小于1认为是小数/比率
return abs(num) < 1
except (ValueError, TypeError):
return False
def is_large_number(value):
"""判断是否为需要转换的大数字(绝对值>=1"""
if pd.isna(value) or value == '' or value is None:
return False
try:
num = float(value)
# 如果绝对值>=1认为是需要转换的大数字
return abs(num) >= 1
except (ValueError, TypeError):
return False
def convert_to_yi(value):
"""将数字转换为以亿为单位,保留两位小数"""
if pd.isna(value) or value == '' or value is None:
return value
# 统一转换为浮点数进行处理
num = None
# 如果已经是数值类型,直接使用
if isinstance(value, (int, float)):
num = float(value)
else:
# 处理字符串格式
try:
# 转换为字符串并清理
str_value = str(value).strip()
# 去除等号Excel公式格式如 "=557395000000"
if str_value.startswith('='):
str_value = str_value[1:]
# 去除常见的数字格式字符(逗号、空格等)
str_value = str_value.replace(',', '').replace(' ', '').replace('', '')
# 如果为空,返回原值
if str_value == '' or str_value.lower() == 'nan':
return value
# 转换为浮点数(支持科学计数法,如 "2.07327E+12"
num = float(str_value)
except (ValueError, TypeError):
# 如果无法转换,返回原值
return value
# 如果成功转换为数字
if num is not None:
# 如果是小数(比率),返回去除等号后的值
if abs(num) < 1:
# 如果原值是字符串且以等号开头,返回去除等号后的值
if isinstance(value, str) and value.strip().startswith('='):
return str_value
return value
# 转换为亿,保留两位小数
yi_value = num / 100000000
return round(yi_value, 2)
# 如果无法处理,返回原值
return value
def find_row_by_name(df, row_name):
"""在数据框中查找指定名称的行"""
first_col = df.iloc[:, 0]
# 尝试精确匹配
mask = first_col == row_name
if mask.any():
return df[mask]
# 尝试模糊匹配(去除空格)
mask = first_col.str.strip() == row_name.strip()
if mask.any():
return df[mask]
return None
def process_income_statement_return_df(input_file, config_file):
"""
处理利润表文件并返回 DataFrame
参数:
input_file: 输入的CSV/Excel文件路径
config_file: 配置文件路径
返回:
pandas.DataFrame: 处理后的数据框
"""
print(f"正在读取文件: {input_file}")
# 读取数据文件
if input_file.endswith('.csv'):
df = pd.read_csv(input_file, encoding='utf-8')
else:
df = pd.read_excel(input_file)
print(f"文件读取成功,共 {len(df)}")
# 加载配置文件
config = load_config(config_file)
rows_to_delete = config.get('rows_to_delete', [])
print(f"配置文件加载成功,需要删除 {len(rows_to_delete)} 种行")
# 记录要删除的行索引
indices_to_delete = set()
# 查找并标记要删除的行
for row_name in rows_to_delete:
row_data = find_row_by_name(df, row_name)
if row_data is not None and len(row_data) > 0:
for idx in row_data.index:
indices_to_delete.add(idx)
print(f" 标记删除: {row_name}")
print(f"\n共标记 {len(indices_to_delete)} 行待删除")
# 创建结果数据框
result_rows = []
# 首先添加日期行
date_row = find_row_by_name(df, '日期')
date_idx = None
if date_row is not None and len(date_row) > 0:
date_idx = date_row.index[0]
result_rows.append(df.iloc[date_idx].tolist())
indices_to_delete.add(date_idx)
print("已添加日期行作为第一行")
else:
print("警告: 未找到日期行")
# 处理其他行
print("\n开始处理数据...")
processed_count = 0
for idx in range(len(df)):
# 跳过要删除的行
if idx in indices_to_delete:
continue
row = df.iloc[idx].tolist()
processed_row = []
# 处理每一列
for col_idx, value in enumerate(row):
if col_idx == 0:
# 第一列是名称,不处理
processed_row.append(value)
else:
# 其他列,转换大数字为亿
converted_value = convert_to_yi(value)
processed_row.append(converted_value)
result_rows.append(processed_row)
processed_count += 1
print(f"共处理 {processed_count} 行数据")
# 创建结果数据框(不使用列名)
result_df = pd.DataFrame(result_rows)
print("\n处理完成!")
print(f"总行数: {len(result_df)}")
return result_df
def process_income_statement(input_file, config_file, output_file):
"""
处理利润表文件并保存
参数:
input_file: 输入的CSV/Excel文件路径
config_file: 配置文件路径
output_file: 输出文件路径
"""
# 调用处理函数获取 DataFrame
result_df = process_income_statement_return_df(input_file, config_file)
# 保存结果
print(f"\n正在保存结果到: {output_file}")
# 强制使用 xlsx 格式
if not output_file.endswith('.xlsx'):
output_file = output_file.rsplit('.', 1)[0] + '.xlsx'
# 保存时不写入列名和索引
result_df.to_excel(output_file, index=False, header=False, engine='openpyxl')
print(f"输出文件: {output_file}")
def main():
"""主函数"""
# 获取脚本所在目录
script_dir = Path(__file__).parent
# 设置路径
input_dir = script_dir / 'input'
config_dir = script_dir / 'config'
output_dir = script_dir / 'output'
# 创建输出目录
output_dir.mkdir(exist_ok=True)
# 配置文件路径
config_file = config_dir / 'income_statement_config.json'
if not config_file.exists():
print(f"错误: 配置文件不存在: {config_file}")
return
# 查找input目录下的文件
if not input_dir.exists():
print(f"错误: input目录不存在: {input_dir}")
return
# 获取所有CSV和Excel文件可以根据文件名筛选利润表
input_files = []
for pattern in ['*利润表*.csv', '*利润表*.xlsx', '*利润表*.xls',
'*income*.csv', '*income*.xlsx', '*income*.xls']:
input_files.extend(list(input_dir.glob(pattern)))
# 去重
input_files = list(set(input_files))
if not input_files:
print(f"警告: 在 {input_dir} 目录下未找到利润表文件")
print("提示: 文件名应包含'利润表''income'")
print("\n如需处理所有文件,请修改脚本中的文件匹配规则")
# 备选:处理所有文件
all_files = list(input_dir.glob('*.csv')) + \
list(input_dir.glob('*.xlsx')) + \
list(input_dir.glob('*.xls'))
if all_files:
print(f"\n发现 {len(all_files)} 个文件,是否全部处理?")
input_files = all_files
if not input_files:
print("未找到任何文件")
return
print(f"找到 {len(input_files)} 个文件待处理\n")
# 处理每个文件
for input_file in input_files:
print(f"\n{'='*60}")
print(f"处理文件: {input_file.name}")
print(f"{'='*60}")
# 生成输出文件名(统一使用 .xlsx 扩展名,去除"合并报表_"
base_name = input_file.stem # 不带扩展名的文件名
# 清理文件名
import re
base_name = base_name.replace('合并报表_', '').replace('合并报表', '')
base_name = re.sub(r'_+', '_', base_name).strip('_')
output_file = output_dir / f"{base_name}.xlsx"
try:
process_income_statement(
str(input_file),
str(config_file),
str(output_file)
)
except Exception as e:
print(f"处理文件时出错: {e}")
import traceback
traceback.print_exc()
print(f"\n{'='*60}")
print("所有文件处理完成!")
print(f"结果保存在: {output_dir}")
print(f"{'='*60}")
if __name__ == '__main__':
main()