#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 利润表数据处理脚本 功能: 1. 将数字转换为以亿为单位(保留两位小数) 2. 删除配置文件中指定的行 3. 第一行为日期 4. 输出为 Excel 格式 """ import pandas as pd import json import os from pathlib import Path def load_config(config_path): """加载配置文件""" with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) return config def is_decimal_number(value): """判断是否为小数(绝对值小于1的数字)""" if pd.isna(value) or value == '' or value is None: return False try: num = float(value) # 如果绝对值小于1,认为是小数/比率 return abs(num) < 1 except (ValueError, TypeError): return False def is_large_number(value): """判断是否为需要转换的大数字(绝对值>=1)""" if pd.isna(value) or value == '' or value is None: return False try: num = float(value) # 如果绝对值>=1,认为是需要转换的大数字 return abs(num) >= 1 except (ValueError, TypeError): return False def convert_to_yi(value): """将数字转换为以亿为单位,保留两位小数""" if pd.isna(value) or value == '' or value is None: return value # 统一转换为浮点数进行处理 num = None # 如果已经是数值类型,直接使用 if isinstance(value, (int, float)): num = float(value) else: # 处理字符串格式 try: # 转换为字符串并清理 str_value = str(value).strip() # 去除等号(Excel公式格式,如 "=557395000000") if str_value.startswith('='): str_value = str_value[1:] # 去除常见的数字格式字符(逗号、空格等) str_value = str_value.replace(',', '').replace(' ', '').replace(',', '') # 如果为空,返回原值 if str_value == '' or str_value.lower() == 'nan': return value # 转换为浮点数(支持科学计数法,如 "2.07327E+12") num = float(str_value) except (ValueError, TypeError): # 如果无法转换,返回原值 return value # 如果成功转换为数字 if num is not None: # 如果是小数(比率),返回去除等号后的值 if abs(num) < 1: # 如果原值是字符串且以等号开头,返回去除等号后的值 if isinstance(value, str) and value.strip().startswith('='): return str_value return value # 转换为亿,保留两位小数 yi_value = num / 100000000 return round(yi_value, 2) # 如果无法处理,返回原值 return value def find_row_by_name(df, row_name): """在数据框中查找指定名称的行""" first_col = df.iloc[:, 0] # 尝试精确匹配 mask = first_col == row_name if mask.any(): return df[mask] # 尝试模糊匹配(去除空格) mask = first_col.str.strip() == row_name.strip() if mask.any(): return df[mask] return None def _parse_cell_number(cell): """将单元格解析为数字(支持 =123 格式),无法解析返回 None""" if pd.isna(cell) or cell == '' or cell is None: return None s = str(cell).strip() if s.startswith('='): s = s[1:] s = s.replace(',', '').replace(' ', '').replace(',', '') if not s or s.lower() == 'nan': return None try: return float(s) except (ValueError, TypeError): return None def _parse_date_cell(cell): """解析日期单元格,返回 'YYYY-MM-DD' 或 None""" if pd.isna(cell) or cell == '' or cell is None: return None s = str(cell).strip() if s.startswith('='): s = s[1:].strip('"\'') # 兼容 2025-12-31 或 2025/12/31 s = s.replace('/', '-') if len(s) == 10 and s[4] == '-' and s[7] == '-': return s return None def _date_to_quarter(date_str): """将日期转为季度显示:2024-03-31 -> 2024Q1, 2024-06-30 -> 2024Q2 等""" if not date_str or len(date_str) < 10: return date_str or '' year = date_str[:4] end = date_str[5:] # MM-DD if end == '03-31': return f'{year}Q1' if end == '06-30': return f'{year}Q2' if end == '09-30': return f'{year}Q3' if end == '12-31': return f'{year}Q4' # 其他日期按月份推断 try: m = int(date_str[5:7]) q = (m - 1) // 3 + 1 return f'{year}Q{q}' except (ValueError, TypeError): return date_str def build_profitability_df(input_file): """ 从利润表原始文件构建「盈利能力」Tab 数据。 提取:营业收入、毛利(营业收入-营业成本)、毛利率、归母净利润。 当存在季度/半年日期(非12-31)时,计算单季数据:Q1=3-31,Q2=6-30-3-31,Q3=9-30-6-30,Q4=12-31-9-30。 累计与单季之间空两列,时间竖排,金额转为亿。 返回:pd.DataFrame,含表头,可直接写入 Excel。 """ if input_file.endswith('.csv'): df = pd.read_csv(input_file, encoding='utf-8') else: df = pd.read_excel(input_file) # 日期行:第一列名为「日期」 date_row_idx = None for i in range(min(5, len(df))): if str(df.iloc[i, 0]).strip() == '日期': date_row_idx = i break if date_row_idx is None: print("盈利能力: 未找到日期行,跳过") return None # 数据列从第 4 列开始(跳过 财报类型/日期/股票代码/上市公司 等前几列) data_start_col = 3 row_names = df.iloc[:, 0].astype(str).str.strip() def get_row_values(row_name): idx = row_names[row_names == row_name].index if len(idx) == 0: return None row = df.iloc[idx[0]] return [_parse_cell_number(row.iloc[c]) for c in range(data_start_col, len(row))] def get_dates(): row = df.iloc[date_row_idx] return [_parse_date_cell(row.iloc[c]) for c in range(data_start_col, len(row))] dates = get_dates() rev = get_row_values('营业收入') cost = get_row_values('营业成本') # 毛利率优先用报表里的「毛利率(GM)」,没有则用 毛利/营业收入 算 gross_margin = get_row_values('毛利率(GM)') net_parent = get_row_values('归属于母公司股东及其他权益持有者的净利润') if net_parent is None: net_parent = get_row_values('归属于母公司普通股股东的净利润') if rev is None or cost is None or net_parent is None: print("盈利能力: 缺少营业收入/营业成本/归母净利润行,跳过") return None n = len(dates) # 毛利 = 营业收入 - 营业成本 gross_profit = [] for i in range(n): r, c = rev[i], cost[i] if r is not None and c is not None: gross_profit.append(r - c) else: gross_profit.append(None) # 若没有毛利率行,用 毛利/营业收入 if gross_margin is None: gross_margin = [] for i in range(n): if gross_profit[i] is not None and rev[i] is not None and rev[i] != 0: gross_margin.append(gross_profit[i] / rev[i]) else: gross_margin.append(None) else: gross_margin = list(gross_margin) # 按日期过滤有效行,并排序(时间竖排:从旧到新) YI = 100000000 # 亿 rows_cum = [] for i in range(n): if dates[i] is None: continue rows_cum.append({ 'date': dates[i], 'rev': rev[i], 'gross': gross_profit[i], 'margin': gross_margin[i], 'net_parent': net_parent[i], }) if not rows_cum: return None # 按日期降序(最新在前) rows_cum.sort(key=lambda x: x['date'], reverse=True) # 判断是否为季度/半年数据(存在非 12-31 的期末日) has_quarterly = any( (r['date'] or '').endswith('-03-31') or (r['date'] or '').endswith('-06-30') or (r['date'] or '').endswith('-09-30') for r in rows_cum ) # 累计转亿 def to_yi(v): if v is None: return None return round(v / YI, 2) result_rows = [] # 表头 if has_quarterly: header = ['时间', '营业收入(亿)', '毛利(亿)', '毛利率', '归母净利润(亿)', '', '', '营业收入(亿)', '毛利(亿)', '毛利率', '归母净利润(亿)'] sub_header = ['', '累计', '累计', '累计', '累计', '', '', '单季', '单季', '单季', '单季'] else: header = ['时间', '营业收入(亿)', '毛利(亿)', '毛利率', '归母净利润(亿)'] sub_header = None result_rows.append(header) if sub_header is not None: result_rows.append(sub_header) if not has_quarterly: for r in rows_cum: result_rows.append([ _date_to_quarter(r['date']), to_yi(r['rev']), to_yi(r['gross']), round(r['margin'], 4) if r['margin'] is not None else None, to_yi(r['net_parent']), ]) return pd.DataFrame(result_rows) # 按年分组,计算单季 from collections import defaultdict by_year = defaultdict(list) for r in rows_cum: y = (r['date'] or '')[:4] if y: by_year[y].append(r) # 单季规则:Q1=3-31, Q2=6-30-3-31, Q3=9-30-6-30, Q4=12-31-9-30;同年内按 3-31,6-30,9-30,12-31 排序 quarter_order = ['-03-31', '-06-30', '-09-30', '-12-31'] single_quarter = [] # 与 rows_cum 对齐:每个期末日对应一行,单季数据按期末日填 date_to_single = {} # date -> { rev, gross, margin, net_parent } for year in sorted(by_year.keys()): items = by_year[year] items_sorted = sorted(items, key=lambda x: (x['date'] or '')) prev_rev = prev_gross = prev_net = None for item in items_sorted: d = item['date'] rev_val, gross_val, net_val = item['rev'], item['gross'], item['net_parent'] if d.endswith('-03-31'): sq_rev = rev_val sq_gross = gross_val sq_net = net_val else: if prev_rev is None: sq_rev = sq_gross = sq_net = None else: sq_rev = rev_val - prev_rev if rev_val is not None and prev_rev is not None else None sq_gross = gross_val - prev_gross if gross_val is not None and prev_gross is not None else None sq_net = net_val - prev_net if net_val is not None and prev_net is not None else None prev_rev, prev_gross, prev_net = rev_val, gross_val, net_val sq_margin = (sq_gross / sq_rev) if (sq_rev and sq_rev != 0 and sq_gross is not None) else None date_to_single[d] = {'rev': sq_rev, 'gross': sq_gross, 'margin': sq_margin, 'net_parent': sq_net} for r in rows_cum: d = r['date'] sq = date_to_single.get(d, {}) result_rows.append([ _date_to_quarter(d), to_yi(r['rev']), to_yi(r['gross']), round(r['margin'], 4) if r['margin'] is not None else None, to_yi(r['net_parent']), '', '', to_yi(sq.get('rev')), to_yi(sq.get('gross')), round(sq['margin'], 4) if sq.get('margin') is not None else None, to_yi(sq.get('net_parent')), ]) return pd.DataFrame(result_rows) def process_income_statement_return_df(input_file, config_file): """ 处理利润表文件并返回 DataFrame 参数: input_file: 输入的CSV/Excel文件路径 config_file: 配置文件路径 返回: pandas.DataFrame: 处理后的数据框 """ print(f"正在读取文件: {input_file}") # 读取数据文件 if input_file.endswith('.csv'): df = pd.read_csv(input_file, encoding='utf-8') else: df = pd.read_excel(input_file) print(f"文件读取成功,共 {len(df)} 行") # 加载配置文件 config = load_config(config_file) rows_to_delete = config.get('rows_to_delete', []) print(f"配置文件加载成功,需要删除 {len(rows_to_delete)} 种行") # 记录要删除的行索引 indices_to_delete = set() # 查找并标记要删除的行 for row_name in rows_to_delete: row_data = find_row_by_name(df, row_name) if row_data is not None and len(row_data) > 0: for idx in row_data.index: indices_to_delete.add(idx) print(f" 标记删除: {row_name}") print(f"\n共标记 {len(indices_to_delete)} 行待删除") # 创建结果数据框 result_rows = [] # 首先添加日期行 date_row = find_row_by_name(df, '日期') date_idx = None if date_row is not None and len(date_row) > 0: date_idx = date_row.index[0] result_rows.append(df.iloc[date_idx].tolist()) indices_to_delete.add(date_idx) print("已添加日期行作为第一行") else: print("警告: 未找到日期行") # 处理其他行 print("\n开始处理数据...") processed_count = 0 for idx in range(len(df)): # 跳过要删除的行 if idx in indices_to_delete: continue row = df.iloc[idx].tolist() processed_row = [] # 处理每一列 for col_idx, value in enumerate(row): if col_idx == 0: # 第一列是名称,不处理 processed_row.append(value) else: # 其他列,转换大数字为亿 converted_value = convert_to_yi(value) processed_row.append(converted_value) result_rows.append(processed_row) processed_count += 1 print(f"共处理 {processed_count} 行数据") # 创建结果数据框(不使用列名) result_df = pd.DataFrame(result_rows) print("\n处理完成!") print(f"总行数: {len(result_df)}") return result_df def process_income_statement(input_file, config_file, output_file): """ 处理利润表文件并保存 参数: input_file: 输入的CSV/Excel文件路径 config_file: 配置文件路径 output_file: 输出文件路径 """ # 调用处理函数获取 DataFrame result_df = process_income_statement_return_df(input_file, config_file) # 保存结果 print(f"\n正在保存结果到: {output_file}") # 强制使用 xlsx 格式 if not output_file.endswith('.xlsx'): output_file = output_file.rsplit('.', 1)[0] + '.xlsx' # 保存时不写入列名和索引 result_df.to_excel(output_file, index=False, header=False, engine='openpyxl') print(f"输出文件: {output_file}") def main(): """主函数""" # 获取脚本所在目录 script_dir = Path(__file__).parent # 设置路径 input_dir = script_dir / 'input' config_dir = script_dir / 'config' output_dir = script_dir / 'output' # 创建输出目录 output_dir.mkdir(exist_ok=True) # 配置文件路径 config_file = config_dir / 'income_statement_config.json' if not config_file.exists(): print(f"错误: 配置文件不存在: {config_file}") return # 查找input目录下的文件 if not input_dir.exists(): print(f"错误: input目录不存在: {input_dir}") return # 获取所有CSV和Excel文件(可以根据文件名筛选利润表) input_files = [] for pattern in ['*利润表*.csv', '*利润表*.xlsx', '*利润表*.xls', '*income*.csv', '*income*.xlsx', '*income*.xls']: input_files.extend(list(input_dir.glob(pattern))) # 去重 input_files = list(set(input_files)) if not input_files: print(f"警告: 在 {input_dir} 目录下未找到利润表文件") print("提示: 文件名应包含'利润表'或'income'") print("\n如需处理所有文件,请修改脚本中的文件匹配规则") # 备选:处理所有文件 all_files = list(input_dir.glob('*.csv')) + \ list(input_dir.glob('*.xlsx')) + \ list(input_dir.glob('*.xls')) if all_files: print(f"\n发现 {len(all_files)} 个文件,是否全部处理?") input_files = all_files if not input_files: print("未找到任何文件") return print(f"找到 {len(input_files)} 个文件待处理\n") # 处理每个文件 for input_file in input_files: print(f"\n{'='*60}") print(f"处理文件: {input_file.name}") print(f"{'='*60}") # 生成输出文件名(统一使用 .xlsx 扩展名,去除"合并报表_") base_name = input_file.stem # 不带扩展名的文件名 # 清理文件名 import re base_name = base_name.replace('合并报表_', '').replace('合并报表', '') base_name = re.sub(r'_+', '_', base_name).strip('_') output_file = output_dir / f"{base_name}.xlsx" try: process_income_statement( str(input_file), str(config_file), str(output_file) ) except Exception as e: print(f"处理文件时出错: {e}") import traceback traceback.print_exc() print(f"\n{'='*60}") print("所有文件处理完成!") print(f"结果保存在: {output_dir}") print(f"{'='*60}") if __name__ == '__main__': main()