Python pandas大批量处理多个excel,并进行处理、统计和改进思路

您所在的位置:网站首页 查找重复项怎么操作excel大数据分析报告 Python pandas大批量处理多个excel,并进行处理、统计和改进思路

Python pandas大批量处理多个excel,并进行处理、统计和改进思路

2024-07-05 22:46| 来源: 网络整理| 查看: 265

处理目标:读取800多个excel中存储的各个城市一段时间的企业信息(每个城市都至少有一个excel的数据),统计每个城市2012-2023年每年各个二级制造业的企业数量

数据大小:800多个excel,共计45GB大小,单个excel大小在1MB-250MB之间

需求分析:由于需要二级制造业和年份两个维度,加上excel中的行和列,不难联想到pandas中的Dataframe;除此之外还需要考虑到大量数据下,普通性能的笔记本要如何简化处理流程,缩短程序的运行时间,字符串的处理和输入、处理、输出的细节;最后代码编写成功后需要先对单个excel进行测试,再对多个excel进行测试,最后加上一些输出信息(为了监控程序的运行进度),再对整体数据跑,最终得出结果

程序设计:为了缩短程序的运行时间,所以先将所有需要的数据在内存中处理好,最后一次性输出到excel中;按照信息专家的设计模式来说,年份和制造子行业的信息都是城市所拥有的信息,所以需要城市类;其次还需要一些筛选符合条件的数据的函数;以及处理单个excel的函数和持久化存储的函数

先看处理单个excel的函数,读入excel之前就要确定excel的列有两种情况,所以先用

pd.read_excel(file_path, engine='openpyxl', nrows=1).columns.tolist()

读取出列并存为列表,将分成两中处理方式,在将excel中的数据读入时,有个很有效提高效率的方式,就是只读取需要的列,read_excel函数的usercols参数就起到这个作用,只有它含有的列才会被读入存储为dataframe;filtered_df生成的方式是原df符合要求的数据才会被保留,.str[:4]指只取前4位,.fillna('0') 指的是空置添0 ,.astype(int)指的是转换成int型变量,.between(2012,2023) 当前列在这个范围中的数据才会被保留,&连接下一个条件,当条件被写成函数时,要用df[column].apply(function_name)来应用函数,数据会被作为参数传入

再按照行业分类和年份作为列和行创建元素全为0的Dataframe,对其进行初始化;对filtered_df逐行遍历按照其中的一列数据为行,一列数据为列的原则对应在result_df上计数,最后返回result_df,这就是处理单个excel的逻辑

# 对每一个excel进行处理 def data_analyse(file_path): print(file_path) cols = pd.read_excel(file_path, engine='openpyxl', nrows=1).columns.tolist() # 存在二级行业分类的excel if '二级行业分类' in cols: df = pd.read_excel(file_path, engine='openpyxl', usecols=['成立日期', '二级行业分类']) # 只取两列 # 只要12-23年和31个制造子行业的数据 filtered_df = df[ df['成立日期'].str[:4].fillna('0').astype(int).between(2012, 2023) & df['二级行业分类'].apply( is_manufacture_industry)] # 计算企业总数量 count = len(filtered_df) # 建立行业为col和年份为index且初始元素都为0的dataframe index_years = list(range(2012, 2024)) columns_names = manufacturing_industries result_df = pd.DataFrame(0, index=index_years, columns=columns_names) # 遍历符合要求的每行数据并用计数df进行计数 for index, row in filtered_df.iterrows(): year = int(row['成立日期'][:4]) sub_industry = row['二级行业分类'] result_df.at[year, sub_industry] += 1 # 对result_df的每一行求和,计算当前城市的年度制造业总量 result_df['年度制造业总量'] = result_df.sum(axis=1) return result_df, count # 没有二级行业分类只有所属行业的excel else: df = pd.read_excel(file_path, engine='openpyxl', usecols=['成立日期', '所属行业']) # 只取两列 # 只要12-23年和31个制造子行业的数据 filtered_df = df[ df['成立日期'].str[:4].fillna('0').astype(int).between(2012, 2023) & df['所属行业'].apply( is_manufacture_industry)] # 计算企业总数量 count = len(filtered_df) # 建立行业为col和年份为index且初始元素都为0的dataframe index_years = list(range(2012, 2024)) columns_names = manufacturing_industries result_df = pd.DataFrame(0, index=index_years, columns=columns_names) # 遍历符合要求的每行数据并用计数df进行计数 for index, row in filtered_df.iterrows(): year = int(row['成立日期'][:4]) sub_industry = row['所属行业'] result_df.at[year, sub_industry] += 1 # 对result_df的每一行求和,计算当前城市的年度制造业总量 result_df['年度制造业总量'] = result_df.sum(axis=1) return result_df, count

再看主程序

city_flag是一个开关变量,其作用是判断相邻的两个excel是否是同一个城市的数据,判断的函数是check_cities,其中对excel文件名进行判断,使用全局变量last_city记录上一个excel的数据所属城市,若城市相同则对两个excel的result_df的对应元素相加,使用city_merge函数实现;若不同则创建新的城市对象加入城市列表中,最后完成之后进行持久化存储

if __name__ == '__main__': # 用以区别是否为一同城市的不同数据表 city_flag = False # 默认是不同的城市 last_city = '' city_list = [] # 所有城市对象的容器 directory_path = r'G:\中国工商注册企业全量信息(2023.9更新)\全国数据' all_files = os.listdir(directory_path) # 开始循环处理目录中的文件 for file in all_files: # 检查城市是否已经有存在的数据了,并调整city_flag last_city = check_cities(file) # 文件绝对路径 file_path = os.path.join(directory_path, file) # 数量矩阵 result_df, count = data_analyse(file_path) if city_flag: # 不同的excel,相同的城市,进行df对应元素相加 city_merge(result_df, count) else: # 新的城市,创建对象,并加入城市列表 temp_city = City(last_city, result_df, count) city_list.append(temp_city) # 进行excel结果输出 save_as_excel() print('done!')

check_cities函数,对城市名称进行检查,并更新last_city;提取城市名时使用了正则表达式,表达式会匹配所有的汉字,返回值为更新后的last_city

# 城市名称检查,返回值为该次的城市名,将作为下一次检查的lastcity def check_cities(file): global city_flag # 声明全局变量 global last_city # 提取城市名 simplified_name = ''.join(re.findall(r'[\u4e00-\u9fa5]', file)) if last_city != simplified_name: city_flag = False last_city = simplified_name else: city_flag = True return last_city

对相同的城市数据进行合并,若有相同的城市,说明城市列表中最后一个元素与当前城市相同,所以对city_list的最后一个元素的result_df进行修改,加上当前的result_df,就完成了合并

# 相同城市,对应元素合并 def city_merge(result_df, count): global city_list city_list[-1].result_df = city_list[-1].result_df + result_df # 最新加入的城市的dataframe和其后续的excel表的df对应元素相加 city_list[-1].count = city_list[-1].count + count

持久化存存储结果

最后的内存中的结果是一个city_list,主要是一些格式上的要求:dataframe的前一行要有city_name,且只有第一个dataframe需要列名(因为excel中只有第一行有列名);主要用到两种写法:1.ws.cell,直接对单元格进行写入  2.dataframe.to_excel函数可以整体的将dataframe写入excel

# 存储到excel中 def save_as_excel(): with pd.ExcelWriter('result.xlsx', engine='openpyxl') as writer: startrow = 0 for idx, city in enumerate(city_list): # 将city_name写入到当前的startrow位置 ws = writer.sheets['Sheet1'] if 'Sheet1' in writer.sheets else None if not ws: ws = writer.book.create_sheet('Sheet1') ws.cell(row=startrow + 1, column=1, value=city.city_name) # 对每个城市的12-23年11年制造业总量求和 # ws.cell(row=startrow + 1, column=2, value='2012-2023年制造业总量') # ws.cell(row=startrow + 1, column=3, value=city.count) # 如果是第一个DataFrame,写入header if idx == 0: header = True else: header = False # 更新startrow为city_name之后的行,并将DataFrame写入到这个位置 startrow += 1 city.result_df.to_excel(writer, sheet_name='Sheet1', startrow=startrow, index=True, header=header) # 更新下一个dataframe的起始行,包括DataFrame本身的长度和一个额外的行为下一个city_name预留的位置 startrow += city.result_df.shape[0] + 1

所有源码:

import os import re import pandas as pd # 制造业二级分类 manufacturing_industries = ['农副食品加工业', '食品制造业', '酒、饮料和精制茶制造业', '烟草制品业', '纺织业', '纺织服装、服饰业', '皮革、毛皮、羽毛及其制品和制鞋业', '木材加工和木、竹、藤、棕、草制品业', '家具制造业', '造纸和纸制品业', '印刷和记录媒介复制业', '文教、工美、体育和娱乐用品制造业', '石油、煤炭及其他燃料加工业', '化学原料和化学制品制造业', '医药制造业', '化学纤维制造业', '橡胶和塑料制品业', '非金属矿物制品业', '黑色金属冶炼和压延加工业', '有色金属冶炼和压延加工业', '金属制品业', '通用设备制造业', '专用设备制造业', '汽车制造业', '铁路、船舶、航空航天和其他运输设备制造业', '电气机械和器材制造业', '计算机、通信和其他电子设备制造业', '仪器仪表制造业', '其他制造业', '废弃资源综合利用业', '金属制品、机械和设备修理业'] # 城市类 class City: def __init__(self, city_name, result_df, count): self.city_name = city_name self.result_df = result_df self.result_df.index.name = city_name # 给这个城市的df标注城市名称 self.count = count # 12-23年该城市的制造业企业总数 # 判断时候为制造业,返回值为布尔值 def is_manufacture_industry(value): # 31个制造子行业 manufacturing_industries = ['农副食品加工业', '食品制造业', '酒、饮料和精制茶制造业', '烟草制品业', '纺织业', '纺织服装、服饰业', '皮革、毛皮、羽毛及其制品和制鞋业', '木材加工和木、竹、藤、棕、草制品业', '家具制造业', '造纸和纸制品业', '印刷和记录媒介复制业', '文教、工美、体育和娱乐用品制造业', '石油、煤炭及其他燃料加工业', '化学原料和化学制品制造业', '医药制造业', '化学纤维制造业', '橡胶和塑料制品业', '非金属矿物制品业', '黑色金属冶炼和压延加工业', '有色金属冶炼和压延加工业', '金属制品业', '通用设备制造业', '专用设备制造业', '汽车制造业', '铁路、船舶、航空航天和其他运输设备制造业', '电气机械和器材制造业', '计算机、通信和其他电子设备制造业', '仪器仪表制造业', '其他制造业', '废弃资源综合利用业', '金属制品、机械和设备修理业'] return value in manufacturing_industries # 对每一个excel进行处理 def data_analyse(file_path): print(file_path) cols = pd.read_excel(file_path, engine='openpyxl', nrows=1).columns.tolist() # 存在二级行业分类的excel if '二级行业分类' in cols: df = pd.read_excel(file_path, engine='openpyxl', usecols=['成立日期', '二级行业分类']) # 只取两列 # 只要12-23年和31个制造子行业的数据 filtered_df = df[ df['成立日期'].str[:4].fillna('0').astype(int).between(2012, 2023) & df['二级行业分类'].apply( is_manufacture_industry)] # 计算企业总数量 count = len(filtered_df) # 建立行业为col和年份为index且初始元素都为0的dataframe index_years = list(range(2012, 2024)) columns_names = manufacturing_industries result_df = pd.DataFrame(0, index=index_years, columns=columns_names) # 遍历符合要求的每行数据并用计数df进行计数 for index, row in filtered_df.iterrows(): year = int(row['成立日期'][:4]) sub_industry = row['二级行业分类'] result_df.at[year, sub_industry] += 1 # 对result_df的每一行求和,计算当前城市的年度制造业总量 result_df['年度制造业总量'] = result_df.sum(axis=1) return result_df, count # 没有二级行业分类只有所属行业的excel else: df = pd.read_excel(file_path, engine='openpyxl', usecols=['成立日期', '所属行业']) # 只取两列 # 只要12-23年和31个制造子行业的数据 filtered_df = df[ df['成立日期'].str[:4].fillna('0').astype(int).between(2012, 2023) & df['所属行业'].apply( is_manufacture_industry)] # 计算企业总数量 count = len(filtered_df) # 建立行业为col和年份为index且初始元素都为0的dataframe index_years = list(range(2012, 2024)) columns_names = manufacturing_industries result_df = pd.DataFrame(0, index=index_years, columns=columns_names) # 遍历符合要求的每行数据并用计数df进行计数 for index, row in filtered_df.iterrows(): year = int(row['成立日期'][:4]) sub_industry = row['所属行业'] result_df.at[year, sub_industry] += 1 # 对result_df的每一行求和,计算当前城市的年度制造业总量 result_df['年度制造业总量'] = result_df.sum(axis=1) return result_df, count # 城市名称检查,返回值为该次的城市名,将作为下一次检查的lastcity def check_cities(file): global city_flag # 声明全局变量 global last_city # 提取城市名 simplified_name = ''.join(re.findall(r'[\u4e00-\u9fa5]', file)) if last_city != simplified_name: city_flag = False last_city = simplified_name else: city_flag = True return last_city # 相同城市,对应元素合并 def city_merge(result_df, count): global city_list city_list[-1].result_df = city_list[-1].result_df + result_df # 最新加入的城市的dataframe和其后续的excel表的df对应元素相加 city_list[-1].count = city_list[-1].count + count # 存储到excel中 def save_as_excel(): with pd.ExcelWriter('result.xlsx', engine='openpyxl') as writer: startrow = 0 for idx, city in enumerate(city_list): # 将city_name写入到当前的startrow位置 ws = writer.sheets['Sheet1'] if 'Sheet1' in writer.sheets else None if not ws: ws = writer.book.create_sheet('Sheet1') ws.cell(row=startrow + 1, column=1, value=city.city_name) # 对每个城市的12-23年11年制造业总量求和 # ws.cell(row=startrow + 1, column=2, value='2012-2023年制造业总量') # ws.cell(row=startrow + 1, column=3, value=city.count) # 如果是第一个DataFrame,写入header if idx == 0: header = True else: header = False # 更新startrow为city_name之后的行,并将DataFrame写入到这个位置 startrow += 1 city.result_df.to_excel(writer, sheet_name='Sheet1', startrow=startrow, index=True, header=header) # 更新下一个dataframe的起始行,包括DataFrame本身的长度和一个额外的行为下一个city_name预留的位置 startrow += city.result_df.shape[0] + 1 if __name__ == '__main__': # 用以区别是否为一同城市的不同数据表 city_flag = False # 默认是不同的城市 last_city = '' city_list = [] # 所有城市对象的容器 directory_path = r'G:\中国工商注册企业全量信息(2023.9更新)\全国数据' all_files = os.listdir(directory_path) # 开始循环处理目录中的文件 for file in all_files: # 检查城市是否已经有存在的数据了,并调整city_flag last_city = check_cities(file) # 文件绝对路径 file_path = os.path.join(directory_path, file) # 数量矩阵 result_df, count = data_analyse(file_path) if city_flag: # 不同的excel,相同的城市,进行df对应元素相加 city_merge(result_df, count) else: # 新的城市,创建对象,并加入城市列表 temp_city = City(last_city, result_df, count) city_list.append(temp_city) # 进行excel结果输出 save_as_excel() print('done!') # 进行 时间优化,然后开始运行

改进思路:

对于excel的统计尤其是数据量比较大的excel,实际上是CPU密集型的;本程序单核运行,最后完成这45个g的数据的执行时间是20小时左右;CPU密集型的操作适合使用多进程编程来充分利用计算机的性能

用一个简单的多进程例子展示使用python的multiprocessing库来完成多进程编程

在进程函数中,由于使用了共享数据结构,所以在将结果添加到共享数据结构时,要使用.get_lock()方法加锁,来保证当某一个进程向共享数据结构中添加元素时,其他进程不能同时对其进行操作

import multiprocessing # 步骤 2:定义一个函数,接受参数并执行任务 def process_function(arg1, shared_list): # 在这里执行任务,可以使用参数 arg1 result = arg1 * 2 # 将结果添加到共享列表 with shared_list.get_lock(): shared_list.append(result) if __name__ == "__main__": # 步骤 1:导入 multiprocessing 模块 # 步骤 3:创建一个共享列表 manager = multiprocessing.Manager() shared_list = manager.list() # 步骤 4:创建一个进程池 num_processes = multiprocessing.cpu_count() pool = multiprocessing.Pool(processes=num_processes) # 步骤 5:使用进程池提交任务,传递参数和共享列表 arg1_values = [1, 2, 3, 4, 5] # 使用 partial 函数创建一个包装函数,传递共享列表作为参数 from functools import partial process_function_with_shared_list = partial(process_function, shared_list=shared_list) # 使用 map 方法执行任务 pool.map(process_function_with_shared_list, arg1_values) # 步骤 6:等待所有任务完成,关闭进程池 pool.close() pool.join() # 打印共享列表的内容 print("Shared List:", shared_list)



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3