299 lines
9.5 KiB
Python
299 lines
9.5 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
现金流量表数据处理脚本
|
||
功能:
|
||
1. 根据配置保留指定的列
|
||
2. 删除其他列
|
||
3. 将数字转换为以亿为单位(保留两位小数)
|
||
4. 日期字段在第一列(竖轴)
|
||
5. 输出为 Excel 格式
|
||
"""
|
||
|
||
import pandas as pd
|
||
import json
|
||
import os
|
||
from pathlib import Path
|
||
|
||
|
||
def load_config(config_path):
|
||
"""加载配置文件"""
|
||
with open(config_path, 'r', encoding='utf-8') as f:
|
||
config = json.load(f)
|
||
return config
|
||
|
||
|
||
def convert_to_yi(value):
|
||
"""将数字转换为以亿为单位,保留两位小数"""
|
||
if pd.isna(value) or value == '' or value is None:
|
||
return value
|
||
|
||
# 统一转换为浮点数进行处理
|
||
num = None
|
||
|
||
# 如果已经是数值类型,直接使用
|
||
if isinstance(value, (int, float)):
|
||
num = float(value)
|
||
else:
|
||
# 处理字符串格式
|
||
try:
|
||
# 转换为字符串并清理
|
||
str_value = str(value).strip()
|
||
|
||
# 去除等号(Excel公式格式,如 "=557395000000")
|
||
if str_value.startswith('='):
|
||
str_value = str_value[1:]
|
||
|
||
# 去除常见的数字格式字符(逗号、空格等)
|
||
str_value = str_value.replace(',', '').replace(' ', '').replace(',', '')
|
||
|
||
# 如果为空,返回原值
|
||
if str_value == '' or str_value.lower() == 'nan':
|
||
return value
|
||
|
||
# 转换为浮点数(支持科学计数法,如 "2.07327E+12")
|
||
num = float(str_value)
|
||
except (ValueError, TypeError):
|
||
# 如果无法转换,返回原值
|
||
return value
|
||
|
||
# 如果成功转换为数字
|
||
if num is not None:
|
||
# 如果是小数(比率),返回去除等号后的值
|
||
if abs(num) < 1:
|
||
# 如果原值是字符串且以等号开头,返回去除等号后的值
|
||
if isinstance(value, str) and value.strip().startswith('='):
|
||
return str_value
|
||
return value
|
||
|
||
# 转换为亿,保留两位小数
|
||
yi_value = num / 100000000
|
||
return round(yi_value, 2)
|
||
|
||
# 如果无法处理,返回原值
|
||
return value
|
||
|
||
|
||
def find_date_column(df):
|
||
"""
|
||
查找日期列(通常在列名中包含'日期',或者第一列)
|
||
返回日期列的索引,如果找不到返回 0(第一列)
|
||
"""
|
||
for col_idx, col_name in enumerate(df.columns):
|
||
col_str = str(col_name).strip()
|
||
if '日期' in col_str:
|
||
return col_idx
|
||
# 如果找不到,返回第一列(通常日期在第一列)
|
||
return 0
|
||
|
||
|
||
def process_cash_flow_return_df(input_file, config_file):
|
||
"""
|
||
处理现金流量表文件并返回 DataFrame
|
||
|
||
参数:
|
||
input_file: 输入的CSV/Excel文件路径
|
||
config_file: 配置文件路径
|
||
|
||
返回:
|
||
pandas.DataFrame: 处理后的数据框
|
||
"""
|
||
print(f"正在读取文件: {input_file}")
|
||
|
||
# 读取数据文件
|
||
if input_file.endswith('.csv'):
|
||
df = pd.read_csv(input_file, encoding='utf-8')
|
||
else:
|
||
df = pd.read_excel(input_file)
|
||
|
||
print(f"文件读取成功,共 {len(df)} 行,{len(df.columns)} 列")
|
||
|
||
# 加载配置文件
|
||
config = load_config(config_file)
|
||
columns_to_keep = config.get('columns_to_keep', [])
|
||
print(f"配置文件加载成功,需要保留 {len(columns_to_keep)} 列")
|
||
|
||
# 查找日期列(通常在第一列)
|
||
date_col_idx = find_date_column(df)
|
||
date_col_name = df.columns[date_col_idx]
|
||
print(f"日期列: {date_col_name} (第 {date_col_idx + 1} 列)")
|
||
|
||
# 查找需要保留的列在原始数据中的位置
|
||
columns_to_keep_indices = {}
|
||
for col_name in columns_to_keep:
|
||
found = False
|
||
for orig_col_idx, orig_col_name in enumerate(df.columns):
|
||
if str(orig_col_name).strip() == str(col_name).strip():
|
||
columns_to_keep_indices[col_name] = orig_col_idx
|
||
found = True
|
||
print(f" ✓ 找到列: {col_name}")
|
||
break
|
||
if not found:
|
||
print(f" ⚠️ 未找到列: {col_name}")
|
||
columns_to_keep_indices[col_name] = None
|
||
|
||
# 创建结果数据框
|
||
result_rows = []
|
||
|
||
# 处理每一行
|
||
print("\n开始处理数据...")
|
||
kept_count = 0
|
||
deleted_count = 0
|
||
|
||
for idx in range(len(df)):
|
||
row = df.iloc[idx]
|
||
|
||
# 提取日期(第一列)
|
||
date_value = row.iloc[date_col_idx]
|
||
|
||
# 如果日期为空,跳过该行
|
||
if pd.isna(date_value) or str(date_value).strip() == '':
|
||
deleted_count += 1
|
||
continue
|
||
|
||
# 创建结果行:日期 + 保留的列
|
||
result_row = [date_value]
|
||
|
||
# 添加需要保留的列(按配置顺序)
|
||
for col_name in columns_to_keep:
|
||
col_idx = columns_to_keep_indices.get(col_name)
|
||
if col_idx is not None:
|
||
# 找到匹配的列,转换数字并添加
|
||
value = row.iloc[col_idx]
|
||
converted_value = convert_to_yi(value)
|
||
result_row.append(converted_value)
|
||
else:
|
||
# 如果找不到该列,添加空值
|
||
result_row.append('')
|
||
|
||
result_rows.append(result_row)
|
||
kept_count += 1
|
||
|
||
print(f"共保留 {kept_count} 行,删除 {deleted_count} 行")
|
||
|
||
# 创建结果数据框
|
||
# 列名:日期 + 保留的列名
|
||
column_names = ['日期'] + columns_to_keep
|
||
result_df = pd.DataFrame(result_rows, columns=column_names)
|
||
|
||
print("\n处理完成!")
|
||
print(f"总行数: {len(result_df)}")
|
||
print(f"总列数: {len(result_df.columns)}")
|
||
print(f"保留的列: {', '.join(columns_to_keep[:3])}... (共 {len(columns_to_keep)} 列)")
|
||
|
||
return result_df
|
||
|
||
|
||
def process_cash_flow(input_file, config_file, output_file):
|
||
"""
|
||
处理现金流量表文件并保存
|
||
|
||
参数:
|
||
input_file: 输入的CSV/Excel文件路径
|
||
config_file: 配置文件路径
|
||
output_file: 输出文件路径
|
||
"""
|
||
# 调用处理函数获取 DataFrame
|
||
result_df = process_cash_flow_return_df(input_file, config_file)
|
||
|
||
# 保存结果
|
||
print(f"\n正在保存结果到: {output_file}")
|
||
|
||
# 强制使用 xlsx 格式
|
||
if not output_file.endswith('.xlsx'):
|
||
output_file = output_file.rsplit('.', 1)[0] + '.xlsx'
|
||
|
||
# 保存时不写入列名和索引(因为第一行就是日期)
|
||
result_df.to_excel(output_file, index=False, header=True, engine='openpyxl')
|
||
|
||
print(f"输出文件: {output_file}")
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
# 获取脚本所在目录
|
||
script_dir = Path(__file__).parent
|
||
|
||
# 设置路径
|
||
input_dir = script_dir / 'input'
|
||
config_dir = script_dir / 'config'
|
||
output_dir = script_dir / 'output'
|
||
|
||
# 创建输出目录
|
||
output_dir.mkdir(exist_ok=True)
|
||
|
||
# 配置文件路径
|
||
config_file = config_dir / 'cash_flow_config.json'
|
||
|
||
if not config_file.exists():
|
||
print(f"错误: 配置文件不存在: {config_file}")
|
||
return
|
||
|
||
# 查找input目录下的文件
|
||
if not input_dir.exists():
|
||
print(f"错误: input目录不存在: {input_dir}")
|
||
return
|
||
|
||
# 获取所有CSV和Excel文件(可以根据文件名筛选现金流量表)
|
||
input_files = []
|
||
for pattern in ['*现金流量表*.csv', '*现金流量表*.xlsx', '*现金流量表*.xls',
|
||
'*cash*.csv', '*cash*.xlsx', '*cash*.xls']:
|
||
input_files.extend(list(input_dir.glob(pattern)))
|
||
|
||
# 去重
|
||
input_files = list(set(input_files))
|
||
|
||
if not input_files:
|
||
print(f"警告: 在 {input_dir} 目录下未找到现金流量表文件")
|
||
print("提示: 文件名应包含'现金流量表'或'cash'")
|
||
print("\n如需处理所有文件,请修改脚本中的文件匹配规则")
|
||
|
||
# 备选:处理所有文件
|
||
all_files = list(input_dir.glob('*.csv')) + \
|
||
list(input_dir.glob('*.xlsx')) + \
|
||
list(input_dir.glob('*.xls'))
|
||
|
||
if all_files:
|
||
print(f"\n发现 {len(all_files)} 个文件,是否全部处理?")
|
||
input_files = all_files
|
||
|
||
if not input_files:
|
||
print("未找到任何文件")
|
||
return
|
||
|
||
print(f"找到 {len(input_files)} 个文件待处理\n")
|
||
|
||
# 处理每个文件
|
||
for input_file in input_files:
|
||
print(f"\n{'='*60}")
|
||
print(f"处理文件: {input_file.name}")
|
||
print(f"{'='*60}")
|
||
|
||
# 生成输出文件名(统一使用 .xlsx 扩展名,去除"合并报表_")
|
||
base_name = input_file.stem # 不带扩展名的文件名
|
||
# 清理文件名
|
||
import re
|
||
base_name = base_name.replace('合并报表_', '').replace('合并报表', '')
|
||
base_name = re.sub(r'_+', '_', base_name).strip('_')
|
||
output_file = output_dir / f"{base_name}.xlsx"
|
||
|
||
try:
|
||
process_cash_flow(
|
||
str(input_file),
|
||
str(config_file),
|
||
str(output_file)
|
||
)
|
||
except Exception as e:
|
||
print(f"处理文件时出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
print(f"\n{'='*60}")
|
||
print("所有文件处理完成!")
|
||
print(f"结果保存在: {output_dir}")
|
||
print(f"{'='*60}")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|