中国期货市场监控中心爬虫

您所在的位置:网站首页 中国期货市场监控中心cfmmc 中国期货市场监控中心爬虫

中国期货市场监控中心爬虫

2024-07-12 06:17| 来源: 网络整理| 查看: 265

实现功能 验证码自动识别模拟登陆多用户数据下载excel处理数据库操作 梗概

炒期货的朋友是不是也有这样的体验,打开中国期货市场监控中心网站,手动登陆到每个帐户,然后在帐户上进行下载数据(逐日盯市),再通过EXCEL宏等统计制作成树状图的形式。

这样操作有许多缺点和不足的地方,比如:

工作量比较大;不能统计一定时间内的数据;不能统一管理和数据分析;多个账号无法对比分析。

因此,写了个爬虫和后台,爬虫每天自动登录中国期货市场监控中心,自动下载逐日盯市的表格数据,然后队列处理数据,写入到自己的后台,就可以分析每个账号一段时间操作的净值、盈亏比等了。

分析

在写爬虫前,我们先手动操作一遍,分析一下流程。

打开登录页,登录页面有验证码,在实现的时候需要自动识别

手动登录后,进入的第一个页面是今天的逐笔对冲,

我们要的是逐日盯市,而且不一定是今天的数据,打开F12,切换到逐日盯市页面,

从 Network 中可以看到如下信息;

Request URL: https://investorservice.cfmmc.com/customer/setParameter.doRequest Method: POSTorg.apache.struts.taglib.html.TOKEN: 08424ad1b28fabecccd7bf5161108b13tradeDate: 2020-10-09byType: date

从上面可以看到接口和参数,org.apache.struts.taglib.html.TOKEN:是登录页面中的一个 input,经测试,不传似乎没什么影响,tradeDate:查询的日期,byType:trade为默认值,trade时是逐笔对冲,date是逐日盯市。

另外,返回的并不是接口形式的数据,而是一个页面,

有了接口和参数,我们就可以抓取指定日期的逐日盯市了。

思路其实很简单,获取用户账号密码,模拟登陆,切换到逐日盯市,下载数据,然后处理excel文件,写到数据库,后台对数据进行分析。

技术点 自动识别验证码模拟登陆excel处理数据库写入

整个流程下来,技术点基本就上面几个。

这个验证码看似很普通,但Google的开源工具Tesseract-OCR识别率太低了,基本识别不了,于是找了BAT大厂的识别接口,依次封装成接口,方便调用:

Tesseract-OCR腾讯文字识别API百度文字识别API百度文字识别SDK

由于阿里的文档只看到python2的,所以就忽略了。识别率自上而下越来越高,腾讯的有点坑,识别率不算高,每天免费次数少,超过还扣费,然后一直给你发欠费邮件,百度SDK识别率是最好的,一般都是5次以内,而且每月免费5000次好像。

代码

后台我是用laravel-wjfcms写的,这里只展示python爬虫部分的代码,代码微调一下就可以跑起来了。

目录结构如下:

config.yaml是数据库配置文件,格式如下:

mysql_config: host: 'xxx' port: 3306 user: 'root' password: 'sss' db: 'qihuo' charset: 'utf8'

登录用户表大概有以下几个字段,根据自己的情况去修改:

CREATE TABLE `wjf_transaction_users` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '交易用户表', `admin_id` tinyint(4) NOT NULL DEFAULT '0' COMMENT '交易账号对应的管理账号', `username` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '用户名', `password` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '密码', `status` tinyint(4) NOT NULL DEFAULT '2' COMMENT '状态,1:已验证,2:待验证,3:验证失败', `created_at` timestamp NULL DEFAULT NULL, `updated_at` timestamp NULL DEFAULT NULL, `deleted_at` timestamp NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE, KEY `wjf_transaction_users_admin_id_index` (`admin_id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ROW_FORMAT=COMPACT;

识别接口代码

百度API地址:https://ai.baidu.com/ai-doc/OCR/3k3h7yeqabaiduBce.py

# -*- coding:utf8 -*- """ Author: gallopingvijay Email: [email protected] Website: https://www.choudalao.com """ from aip import AipOcr """ 你的 APPID AK SK """ APP_ID = 'xxx' API_KEY = 'xxxx' SECRET_KEY = 'xxxx' # 需要安装扩展 # pip install baidu-aip class BaiduBce(): def __init__(self): self.options = {} self.setOptions() self.client = AipOcr(APP_ID, API_KEY, SECRET_KEY) def dealImg(self, image, type=1, is_local=False): ''' 识别 :param image:图片地址或者url :param type:识别接口类型 :param is_local:是否为本地,False:远程,True,本地 :return: ''' if type == 1: # 通用文字识别 if is_local is False: res = self.client.basicGeneralUrl(image) else: res = self.client.basicGeneral(self.get_file_content(image), self.options) elif type == 2: # 通用文字识别(高精度版) if is_local is True: res = self.client.basicAccurate(self.get_file_content(image)) elif type == 3: # 网络图片文字识别 if is_local is False: res = self.client.webImageUrl(image) else: res = self.client.webImage(self.get_file_content(image)) else: if is_local is False: res = self.client.basicGeneralUrl(image) else: res = self.client.basicGeneral(self.get_file_content(image), self.options) return res['words_result'][0]['words'] def get_file_content(self, filePath): ''' 读取图片 ''' with open(filePath, 'rb') as fp: return fp.read() def setOptions(self, language_type='CHN_ENG', detect_direction='false', detect_language='false', probability='false'): ''' 如果有可选参数 :param language_type: :param detect_direction: :param detect_language: :param probability: :return: ''' options = {} options["language_type"] = language_type options["detect_direction"] = detect_direction options["detect_language"] = detect_language options["probability"] = probability self.options = options

baiduImg.py

# encoding:utf-8 """ Author: gallopingvijay Email: [email protected] Website: https://www.choudalao.com """ import requests import base64 CLIENT_ID = 'xxxx' CLIENT_SECRET = 'xxxx' class BaiduImg(): def __init__(self): self.base_url = 'https://aip.baidubce.com' self.token = '' def dealImg(self, path, type=1): self.getToken() if type == 1: # 通用文字识别 request_url = self.base_url + "/rest/2.0/ocr/v1/general_basic" elif type == 2: # 位置信息版 request_url = self.base_url + "/rest/2.0/ocr/v1/general" elif type == 3: request_url = self.base_url + "/rest/2.0/ocr/v1/accurate_basic" elif type == 4: request_url = self.base_url + "/rest/2.0/ocr/v1/accurate" else: request_url = self.base_url + "/rest/2.0/ocr/v1/general_basic" if isinstance(path, bytes): s = path else: # 二进制方式打开图片文件 f = open(path, 'rb') s = f.read() img = base64.b64encode(s) params = {"image": img} access_token = self.token request_url = request_url + "?access_token=" + access_token headers = {'content-type': 'application/x-www-form-urlencoded'} response = requests.post(request_url, data=params, headers=headers) if response: res = response.json() if 'error_msg' in res: return res['error_msg'] else: return res['words_result'][0]['words'] def getToken(self): # client_id 为官网获取的AK, client_secret 为官网获取的SK host = self.base_url + '/oauth/2.0/token?grant_type=client_credentials&client_id=' + str( CLIENT_ID) + '&client_secret=' + str(CLIENT_SECRET) response = requests.get(host) if response: data = response.json() self.token = data['access_token'] print(data['access_token']) # return data['access_token'] def test(self): print(11) # if __name__ == '__main__': # path = '../code_img/veriCode (1).do' # baidu = BaiduImg() # res = baidu.dealImg(path, 3) # print(res)

tx.py

""" Author: gallopingvijay Email: [email protected] Website: https://www.choudalao.com """ from tencentcloud.common import credential from tencentcloud.common.profile.client_profile import ClientProfile from tencentcloud.common.profile.http_profile import HttpProfile from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException from tencentcloud.ocr.v20181119 import ocr_client, models import json def get_code(path): try: cred = credential.Credential( "xxxx", "xxxx") httpProfile = HttpProfile() httpProfile.endpoint = "ocr.tencentcloudapi.com" clientProfile = ClientProfile() clientProfile.httpProfile = httpProfile client = ocr_client.OcrClient(cred, "ap-guangzhou", clientProfile) req = models.GeneralBasicOCRRequest() params = '{\"ImageUrl\":\"' + path + '\"}' req.from_json_string(params) resp = client.GeneralBasicOCR(req) res = resp.to_json_string() res_json = json.loads(res) if res_json['TextDetections'][0]['DetectedText'] is not None: code = res_json['TextDetections'][0]['DetectedText'] # print(res_json['TextDetections'][0]['DetectedText']) return code except TencentCloudSDKException as err: print(err)

cfmmc.py

# -*- coding:utf8 -*- """ Author: gallopingvijay Email: [email protected] Website: https://www.choudalao.com """ # python3 # from shibie import baiduImg # from shibie import tx from datetime import datetime import time import pytesseract from shibie import baiduBce import os import pymysql # from io import BytesIO from requests import session from PIL import Image import PIL.ImageOps import yaml # 跟域名 BASE_URL = 'https://investorservice.cfmmc.com' class Cfmmc(object): def __init__(self, username=None, max_gap=2, code_num=10): ''' 初始化 :param username:用户名,为None时,查询所有用户 :param max_gap:从什么时候开始抓取数据,2表示从两天前开始 :param code_num:验证码识别次数 :return: ''' # 配置 self.configs = self.get_config() # 数据库 self.conn = pymysql.connect( host=self.configs['mysql_config']['host'], port=self.configs['mysql_config']['port'], user=self.configs['mysql_config']['user'], password=self.configs['mysql_config']['password'], db=self.configs['mysql_config']['db'], charset=self.configs['mysql_config']['charset'] ) # 创建一个游标 self.cursor = self.conn.cursor() # 账号 self.users = () # 指定的用户名 self.username = username # 查询区间,从今天往前数的天数 self.max_gap = max_gap # 验证码执行次数 self.code_num = code_num # 消息通知 self.msg = '' # 客户账号状态 self.user_status = 0 # 运行时间 self.run_time = time.strftime( '%Y-%m-%d %H:%M:%S', time.localtime(time.time())) def pymysql_action(self, sql): # 执行 SQL 语句 self.cursor.execute(sql) # 提交 self.conn.commit() def get_config(self): ''' 获取config.yaml配置 :return:configs ''' # 获取当前文件的Realpath fileNamePath = os.path.split(os.path.realpath(__file__))[0] # 读取文件 yamlPath = os.path.join(fileNamePath, 'config.yaml') # 加上 ,encoding='utf-8',处理配置文件中含中文出现乱码的情况。 file = open(yamlPath, 'r', encoding='utf-8') # 读取文件 cont = file.read() # 返回配置 return yaml.safe_load(cont) def file_path(self, path, file): ''' 返回文件路径 :param path:路径 :param file:文件名 :return: ''' # 是否存在目录 if not os.path.exists(path): os.makedirs(path) return path+'/' + file def reg_img(self, file_obj): ''' OCR识别图片 :param file_obj:文件 :return: ''' pytesseract.pytesseract.tesseract_cmd = r"D:\Program Files\Tesseract-OCR\tesseract.exe" im = Image.open(file_obj) im = im.convert('L') binary_image = im.point([0 if i < 210 else 1 for i in range(256)], '1') im1 = binary_image.convert('L') im2 = PIL.ImageOps.invert(im1) im3 = im2.convert('1') im4 = im3.convert('L') res = pytesseract.image_to_string(im4) # print(res) # im4.show() return res def get_download_url(self, content, key='下载'): ''' 获取下载链接 :param content:内容 :param key:key :return: ''' a = content.split(key)[0] b = a.split('')[1] c = b.split('')[0] return BASE_URL + d def flag_filter(self, content, flag): ''' 匹配内容 :param content:内容 :param key:key :return: ''' if len(content.split(flag)) < 2: return '' result = content.split(flag)[1].split('"')[0] return result def get_day(self, gap=1): ''' 返回年月日时间,并过滤周末周六 :param gap:间隔数 :return: ''' if gap >= self.max_gap: return False # 时间戳 day = time.time() - 86400 * (gap - 1) # 输出年月日 ymd = time.strftime('%Y-%m-%d', time.localtime(day)) # 获取星期几 数字1-7代表周一到周日 week = datetime.strptime(ymd, '%Y-%m-%d').isoweekday() if week > 5: print('...' + ymd + '是周' + str(week)) return True return ymd def get_dingshi_data(self, ss, user_id, header, token=''): ''' 处理逐日盯市页面 :param ss:request.session :param user_id:用户id :param header:header :param token:token :return: ''' for i in range(1, self.max_gap): # 获取时间 ymd = self.get_day(i) if ymd is False: # 说明循环完了 return True elif ymd is True: # 说明是周末跳过 continue post_data = { "org.apache.struts.taglib.html.TOKEN": token, "tradeDate": ymd, "byType": 'data', } dingshi_url = BASE_URL+'/customer/setParameter.do' dingshi_html = ss.post(dingshi_url, data=post_data, headers=header, timeout=5) dingshi_html_code = dingshi_html.content.decode() # #看看html内容对不对 # with open('./other/yemian.html', 'w+', # encoding='utf-8') as files: # files.write(dingshi_html_code) if '(逐日盯市)' in dingshi_html_code: # 找到下载按钮 if '下载' in dingshi_html_code: download_url = self.get_download_url( dingshi_html_code, '下载') xls_res = ss.get(download_url, headers=header) # 下载文件 file_name = self.file_path( './exel', user_id + '_' + ymd + '.xls') # 判断文件是否存在 if os.path.exists(file_name) is True: text = '...' + file_name + '已存在,' print(text) self.msg += text continue print('...准备下载' + file_name + '的数据') with open(file_name, "wb") as xls_file: xls_file.write(xls_res.content) else: text = ymd+'...不包含下载按钮,' print(text) self.msg += text continue else: print('...不是追日盯市页面') return True def main(self, user_id, passwd): ''' 处理逐日盯市页面 :param user_id:用户id :param passwd:passwd :return: ''' header = { 'Connection': 'keep-alive', 'User-Agent': "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36", } url = BASE_URL+"/login.do" token_flag = 'name="org.apache.struts.taglib.html.TOKEN" value="' veri_code_flag = 'src="/veriCode.do?t=' ss = session() res = ss.get(url, headers=header) content = res.content.decode() token = self.flag_filter(content, token_flag) veri_code_url = BASE_URL+'/veriCode.do?t=' + self.flag_filter( content, veri_code_flag) for i in range(self.code_num): print(f'...第{i + 1}次尝试') try: # OCR识别 # tmp_file = BytesIO() # tmp_file.write(ss.get(veri_code_url).content) # veri_code = reg_img(tmp_file) # 百度API # veri_code = baiduImg.BaiduImg().dealImg(path, type) # 腾讯API # veri_code = tx.get_code(veri_code_url) # 百度接口 tmp_file = self.file_path('./code_img', 'logo_code.jpg') req_res = ss.get(veri_code_url).content open(tmp_file, 'wb').write(req_res) veri_code = baiduBce.BaiduBce().dealImg(tmp_file, 2, True) if veri_code and len(veri_code) >= 5: veri_code = ''.join(filter(str.isalnum, veri_code)) print('...识别得到:' + veri_code) post_data = { "org.apache.struts.taglib.html.TOKEN": token, "showSaveCookies": '', "userID": user_id, "password": passwd, "vericode": veri_code, } content2 = ss.post(url, data=post_data, headers=header, timeout=5) res = content2.content.decode() if "验证码错误" in res: # 验证码验证失败 print('...验证码不匹配') elif ("用户名或密码错误" in res) or ("您错误尝试超过3次" in res): # 账号密码不通过 text = '...账号密码错误,数据库修改为验证不通过,' print(text) self.msg += text print(text) # 账号密码错误,数据库修改为验证不通过 update_sql = "UPDATE `wjf_transaction_users` SET status=3,updated_at='{0}' WHERE username='{1}'".format( self.run_time, user_id) self.pymysql_action(update_sql) break elif "登录超时,请重新登录" in res: text = '...登录超时,重新登录,' print(text) self.msg += text print(text) break # elif "资金安全特别提示": # 如有这这几个字表示登录失败 # text = '...登录失败,' # print(text) # self.msg += text # print(text) # break else: # 如果客户账号非正常状态,更新为正常 if self.user_status != 1: update_sql = "UPDATE `wjf_transaction_users` SET status=1,updated_at='{0}' WHERE username='{1}'".format( self.run_time, user_id) self.pymysql_action(update_sql) # 获取页面cookie cookie_dict = dict(ss.cookies) cookie = 'WMONID=' + cookie_dict[ 'WMONID'] + ';JSESSIONID=' + cookie_dict['JSESSIONID'] header['Cookie'] = cookie # 进入逐日盯市页面 res = self.get_dingshi_data( ss, user_id, header, token) if res is True: break time.sleep(1) veri_code_url = BASE_URL+"/veriCode.do?t=" + str( int(time.time() * 1000)) except Exception as e: print(e) text = '...' + user_id + '处理结束。' print(text) self.msg += text def get_users(self): ''' 获取用户 :param username:当指定用户是,查询指定的用户 ''' if self.username is None: users_sql = "SELECT username,password,status FROM `wjf_transaction_users` WHERE `deleted_at` is NULL" else: users_sql = "SELECT username,password,status FROM `wjf_transaction_users` WHERE `deleted_at` is NULL AND `username`="+self.username # password 其实有加密的,这里忽略 self.pymysql_action(users_sql) self.users = self.cursor.fetchall() return self.users def run(self): ''' 入口文件,支持多用户 ''' self.users = self.get_users() default_max_gap = self.max_gap for user in self.users: self.msg = '' # 账号密码必须 if user[0] is None or user[0] == '': text = '...账号为空,' print(text) continue if user[1] is None or user[1] == '': text = f'...{user[0]}的密码为空,' print(text) self.msg += text continue # 获取客户账号状态 self.user_status = user[2] if default_max_gap == 2: # 如果是默认天数,那就按照数据库状态来,如果status为1:表示已验证,只抓取2天的数据,否则就抓取180天的,max_gap if self.user_status != 1: self.max_gap = 180 text = f'账号{user[0]}开始执行,' print(text) self.msg = text self.main(user[0], user[1]) def __del__(self): # 关闭游标 self.cursor.close() # 关闭数据库连接 self.conn.close() if __name__ == '__main__': ''' 模拟登陆 中国期货市场监控中心,抓取逐日盯市数据 ''' obj = Cfmmc() res = obj.run()

excel.py(数据库操作部分,根据实际情况修改)

# -*- coding:utf8 -*- """ Author: gallopingvijay Email: [email protected] Website: https://www.choudalao.com """ import xlrd import pymysql import time from itertools import groupby import os import yaml class Excel(object): def __init__(self): # 配置 self.configs = self.get_config() self.conn = pymysql.connect( host=self.configs['mysql_config']['host'], port=self.configs['mysql_config']['port'], user=self.configs['mysql_config']['user'], password=self.configs['mysql_config']['password'], db=self.configs['mysql_config']['db'], charset=self.configs['mysql_config']['charset'] ) # 创建一个游标 self.cursor = self.conn.cursor() # 存储数据的字典 self.dict = {} def get_config(self): ''' 获取config.yaml配置 :return: ''' # 获取当前文件的Realpath fileNamePath = os.path.split(os.path.realpath(__file__))[0] # 读取文件 yamlPath = os.path.join(fileNamePath, 'config.yaml') # 加上 ,encoding='utf-8',处理配置文件中含中文出现乱码的情况。 file = open(yamlPath, 'r', encoding='utf-8') # 读取文件 cont = file.read() # 返回配置 return yaml.safe_load(cont) def pymysql_action(self, sql): # 创建一个游标 # 执行 SQL 语句 self.cursor.execute(sql) # 提交 self.conn.commit() def del_null(self, list): ''' 清理空数据 :param list: :return: ''' list = [i for i in list if i != ''] return list def filter_field(self, field, typeof='string'): ''' 处理每一个写入数据库的值 :param field: 字段值 :param typeof: 类型:string,decimal(四舍五入保留两位),num :return: ''' if field == '--' or field == '': field = '0' elif typeof == 'decimal': field = round(float(field), 2) elif typeof == 'num': field = int(field) else: field = str(field) field = field.lstrip().rstrip() return field def deal_excel(self, path): workbook = xlrd.open_workbook(path) # 获取所有sheet sheet_name = workbook.sheet_names()[0] # 根据sheet索引或者名称获取sheet内容 sheet = workbook.sheet_by_index(0) # sheet索引从0开始 # sheet的名称,行数,列数 nrows = sheet.nrows # ncols = sheet.ncols # 获取整行和整列的值(数组) # rows = sheet.row_values(2) # 获取第2行内容 # cols = sheet.col_values(3) # 获取第3列内容 if sheet_name != '客户交易结算日报': print('...不是需要的数据') exit() ding_shi = False # 如果是逐日盯市,才写入数据库 cny_i = 0 # cny 期货期权账户出入金明细(单位:人民币)开始 usd_i = 0 # 期货期权账户出入金明细(单位:美元) deal_row_i = 0 # 期货成交汇总 开始的行号 position_row_i = 0 # 期货持仓汇总 开始的行号 for i in range(nrows): try: rows = sheet.row_values(i) # 获取第i行内容 deal_rows = self.del_null(rows) if len(deal_rows) < 1: # print(f'第{i + 1}行没有数据') continue if '客户交易结算日报(逐日盯市)' in deal_rows: ding_shi = True if i > 4 and ding_shi is False: # 表头不是逐日盯市,不处理 print('...不是逐日盯市表格,不处理') break # 客户期货期权内部资金账户 if '客户期货期权内部资金账户' in deal_rows: self.dict['fund_account'] = deal_rows[1] self.dict['deal_time'] = deal_rows[3] self.dict['created_at'] = time.strftime( '%Y-%m-%d %H:%M:%S', time.localtime(time.time())) timeArray = time.strptime( self.dict['deal_time'], "%Y-%m-%d") self.dict['deal_time_stamp'] = int(time.mktime(timeArray)) self.dict['out_money_cny'] = self.dict['enter_money_cny'] = self.dict['out_money_usd'] = self.dict[ 'enter_money_usd'] = 0 self.dict['exchange_name'] = '' self.dict['transaction_fees'] = '0.00' self.dict['reporting_fee'] = '0.00' self.dict['position_buy_position'] = '0.00' self.dict['position_sell_position'] = '0.00' self.dict['position_profit_loss'] = '0.00' self.dict['position_trading_margin'] = '0.00' self.dict['deal_num'] = 0 self.dict['deal_turnover'] = '0.00' self.dict['deal_fee'] = '0.00' self.dict['deal_profit_loss'] = '0.00' if '客户名称' in deal_rows: self.dict['admin_name'] = deal_rows[1].strip() self.dict['query_time'] = deal_rows[3] # 获取用户的后台id admin_sql = "SELECT admin_id FROM `wjf_transaction_users` WHERE `admin_name` = '{0}' LIMIT 1".format( self.dict['admin_name']) self.cursor.execute(admin_sql) first_data = self.cursor.fetchone() if first_data is None: print('没有客户名称为' + str(self.dict['admin_name']) + '的数据') break self.dict['admin_id'] = first_data[0] # 如果数据库存在该用户的该天数据,就跳过 summary_sql = "SELECT id FROM `wjf_summary_datas` WHERE `admin_id` = '{0}' and `deal_time_stamp` = '{1}'LIMIT 1".format( self.dict['admin_id'], self.dict['deal_time_stamp']) self.cursor.execute(summary_sql) summary_data = self.cursor.fetchone() if summary_data is not None: print( '客户:' + str(self.dict['admin_name']) + str(self.dict['deal_time']) + '的数据已存在') break if '期货公司名称' in deal_rows: self.dict['futures_company'] = deal_rows[1] if '上日结存' in deal_rows: self.dict['previous_day_balance'] = deal_rows[1] self.dict['customer_rights'] = deal_rows[3] if '当日存取合计' in deal_rows: self.dict['total_access_for_the_day'] = deal_rows[1] self.dict['actual_monetary_funds'] = deal_rows[3] if '当日盈亏' in deal_rows: self.dict['profit_loss_day'] = deal_rows[1] self.dict['not_currency_credit_amount'] = deal_rows[3] if '当日总权利金' in deal_rows: self.dict['total_royalties_of_the_day'] = deal_rows[1] self.dict['currency_credit_amount'] = deal_rows[3] if '当日手续费' in deal_rows: self.dict['day_handling_fee'] = deal_rows[1] self.dict['frozen_funds'] = deal_rows[3] if '当日结存' in deal_rows: self.dict['day_balance'] = deal_rows[1] self.dict['margin_occupation'] = deal_rows[3] if '可用资金' in deal_rows: self.dict['available_funds'] = deal_rows[1] if '风险度' in deal_rows: self.dict['risk'] = deal_rows[1] if '追加保证金' in deal_rows: self.dict['margin_call'] = deal_rows[1] # 期货期权账户出入金明细(单位:人民币 开始 if '期货期权账户出入金明细(单位:人民币)' in deal_rows: cny_i = i if '合计' in deal_rows and (cny_i > 0): self.dict['out_money_cny'] = self.filter_field( deal_rows[1], 'decimal') self.dict['enter_money_cny'] = self.filter_field( deal_rows[2], 'decimal') cny_i = 0 # 期货期权账户出入金明细(单位:美元) if '期货期权账户出入金明细(单位:美元)' in deal_rows: usd_i = i if '合计' in deal_rows and (usd_i > 0): usd_i = 0 self.dict['out_money_usd'] = self.filter_field( deal_rows[1], 'decimal') self.dict['enter_money_usd'] = self.filter_field( deal_rows[2], 'decimal') # 期货成交汇总 开始 if '期货成交汇总' in deal_rows: # 期货成交汇总 开始 deal_row_i = i if '合约' in deal_rows: continue if ('合计' in deal_rows) and (deal_row_i > 0) and (position_row_i deal_row_i and deal_row_i > 0: # 成交汇总 deal_rows.append(self.dict['admin_name']) deal_rows.append(self.dict['query_time']) deal_rows.append(self.dict['deal_time']) deal_rows.append(self.dict['created_at']) # 合约中提取交易所code contract = deal_rows[0] contract_list = [''.join(list(g)) for k, g in groupby( contract, key=lambda x: x.isdigit())] exchange_code = contract_list[0] exchange_num = contract_list[1] deal_rows.append(exchange_code) deal_rows.append(exchange_num) # 字符串转数字,以免数据库保存报错 deal_rows[0] = self.filter_field(deal_rows[0]) deal_rows[1] = self.filter_field(deal_rows[1]) deal_rows[2] = self.filter_field(deal_rows[2]) deal_rows[3] = self.filter_field(deal_rows[3], 'decimal') deal_rows[4] = self.filter_field(deal_rows[4], 'num') deal_rows[5] = self.filter_field(deal_rows[5], 'decimal') deal_rows[8] = self.filter_field(deal_rows[8], 'decimal') deal_rows[6] = self.filter_field(deal_rows[6]) deal_rows[7] = self.filter_field(deal_rows[7]) deal_rows[9] = self.filter_field(deal_rows[9]) deal_rows[10] = self.filter_field(deal_rows[10]) deal_rows[11] = self.filter_field(deal_rows[11]) deal_rows[12] = self.filter_field(deal_rows[12]) deal_rows[13] = self.filter_field(deal_rows[13]) deal_rows[14] = self.filter_field(deal_rows[14]) sql = "INSERT INTO wjf_deal_datas(contract,buy_sell,speculation,final_price,num,turnover,open_flat,fee,profit_loss,admin_name,query_time,deal_time,created_at,exchange_code,exchange_num,admin_id,deal_time_stamp)VALUES('{0}','{1}','{2}','{3}','{4}','{5}','{6}','{7}','{8}','{9}','{10}','{11}','{12}','{13}','{14}','{15}','{16}');".format( deal_rows[0], deal_rows[1], deal_rows[2], deal_rows[3], deal_rows[4], deal_rows[5], deal_rows[6], deal_rows[7], deal_rows[8], deal_rows[9], deal_rows[10], deal_rows[11], deal_rows[12], deal_rows[13], deal_rows[14], self.dict['admin_id'], self.dict['deal_time_stamp']) self.pymysql_action(sql) # print("..." + str(i + 1) + "行期货成交写入完成") if '期货持仓汇总' in deal_rows: # 期货持仓汇总 开始 position_row_i = i deal_row_i = 0 if ('合计' in deal_rows) and (deal_row_i == 0) and (position_row_i > 0): # 期货持仓汇总 结束 position_row_i = 0 self.dict['position_buy_position'] = self.filter_field( deal_rows[1], 'decimal') self.dict['position_sell_position'] = self.filter_field( deal_rows[2], 'decimal') self.dict['position_profit_loss'] = self.filter_field( deal_rows[3], 'decimal') self.dict['position_trading_margin'] = self.filter_field( deal_rows[4], 'decimal') # 写入汇总数据 summary_sql = "INSERT INTO wjf_summary_datas(admin_id,admin_name,query_time,deal_time,deal_time_stamp,fund_account,futures_company,previous_day_balance,customer_rights,total_access_for_the_day,actual_monetary_funds,profit_loss_day,not_currency_credit_amount,total_royalties_of_the_day,currency_credit_amount,day_handling_fee,frozen_funds,day_balance,margin_occupation,available_funds,risk,margin_call,out_money_cny,enter_money_cny,out_money_usd,enter_money_usd,exchange_name,transaction_fees,reporting_fee,deal_num,deal_turnover,deal_fee,deal_profit_loss,position_buy_position,position_sell_position,position_profit_loss,position_trading_margin,created_at)VALUES('{0}','{1}','{2}','{3}','{4}','{5}','{6}','{7}','{8}','{9}','{10}','{11}',{12},'{13}','{14}','{15}','{16}','{17}','{18}','{19}','{20}','{21}','{22}','{23}','{24}','{25}','{26}','{27}','{28}','{29}','{30}','{31}','{32}','{33}','{34}','{35}','{36}','{37}');".format( self.dict['admin_id'], self.dict['admin_name'], self.dict['query_time'], self.dict['deal_time'], self.dict['deal_time_stamp'], self.dict['fund_account'], self.dict['futures_company'], self.dict['previous_day_balance'], self.dict['customer_rights'], self.dict['total_access_for_the_day'], self.dict['actual_monetary_funds'], self.dict['profit_loss_day'], self.dict['not_currency_credit_amount'], self.dict['total_royalties_of_the_day'], self.dict['currency_credit_amount'], self.dict['day_handling_fee'], self.dict['frozen_funds'], self.dict['day_balance'], self.dict['margin_occupation'], self.dict['available_funds'], self.dict['risk'], self.dict['margin_call'], self.dict['out_money_cny'], self.dict['enter_money_cny'], self.dict['out_money_usd'], self.dict['enter_money_usd'], self.dict['exchange_name'], self.dict['transaction_fees'], self.dict['reporting_fee'], self.dict['deal_num'], self.dict['deal_turnover'], self.dict['deal_fee'], self.dict['deal_profit_loss'], self.dict['position_buy_position'], self.dict['position_sell_position'], self.dict['position_profit_loss'], self.dict['position_trading_margin'], self.dict['created_at']) self.pymysql_action(summary_sql) print("..." + "汇总合计写入完成") break if (i > position_row_i) and (position_row_i > 0): # 期货持仓汇总 rows.append(self.dict['admin_name']) rows.append(self.dict['query_time']) rows.append(self.dict['deal_time']) rows.append(self.dict['created_at']) # 合约中提取交易所code contract = rows[0] contract_list = [''.join(list(g)) for k, g in groupby( contract, key=lambda x: x.isdigit())] exchange_code = contract_list[0] exchange_num = contract_list[1] rows.append(exchange_code) rows.append(exchange_num) # 字符串转数字,以免数据库保存报错 rows[1] = self.filter_field(rows[1], 'int') rows[2] = self.filter_field(rows[2], 'decimal') rows[3] = self.filter_field(rows[3], 'int') rows[4] = self.filter_field(rows[4], 'decimal') rows[5] = self.filter_field(rows[5], 'decimal') rows[6] = self.filter_field(rows[6], 'decimal') rows[7] = self.filter_field(rows[7], 'decimal') rows[8] = self.filter_field(rows[8], 'decimal') rows[0] = self.filter_field(rows[0]) rows[9] = self.filter_field(rows[9]) rows[10] = self.filter_field(rows[10]) rows[11] = self.filter_field(rows[11]) rows[12] = self.filter_field(rows[12]) rows[13] = self.filter_field(rows[13]) rows[14] = self.filter_field(rows[14]) rows[15] = self.filter_field(rows[15]) sql = "INSERT INTO wjf_position_datas(contract,buy_position,buy_average_price,sell_position,sell_average_price,settlement_price_yesterday,settlement_price_today,profit_loss,trading_margin,speculation,admin_name,query_time,deal_time,created_at,exchange_code,exchange_num,admin_id,deal_time_stamp)VALUES('{0}','{1}','{2}','{3}','{4}','{5}','{6}','{7}','{8}','{9}','{10}','{11}','{12}','{13}','{14}','{15}','{16}','{17}');".format( rows[0], rows[1], rows[2], rows[3], rows[4], rows[5], rows[6], rows[7], rows[8], rows[9], rows[10], rows[11], rows[12], rows[13], rows[14], rows[15], self.dict['admin_id'], self.dict['deal_time_stamp']) self.pymysql_action(sql) # print("..." + str(i + 1) + "行期货持仓写入完成") # 当没有期货持仓汇总数据的时候,最后额数据在这里写入,如果有期货持仓汇总,不会执行到这里 if nrows-1 == i: print('...没有期货持仓汇总,在最后一行写入汇总数据') # 写入汇总数据 summary_sql = "INSERT INTO wjf_summary_datas(admin_id,admin_name,query_time,deal_time,deal_time_stamp,fund_account,futures_company,previous_day_balance,customer_rights,total_access_for_the_day,actual_monetary_funds,profit_loss_day,not_currency_credit_amount,total_royalties_of_the_day,currency_credit_amount,day_handling_fee,frozen_funds,day_balance,margin_occupation,available_funds,risk,margin_call,out_money_cny,enter_money_cny,out_money_usd,enter_money_usd,exchange_name,transaction_fees,reporting_fee,deal_num,deal_turnover,deal_fee,deal_profit_loss,position_buy_position,position_sell_position,position_profit_loss,position_trading_margin,created_at)VALUES('{0}','{1}','{2}','{3}','{4}','{5}','{6}','{7}','{8}','{9}','{10}','{11}',{12},'{13}','{14}','{15}','{16}','{17}','{18}','{19}','{20}','{21}','{22}','{23}','{24}','{25}','{26}','{27}','{28}','{29}','{30}','{31}','{32}','{33}','{34}','{35}','{36}','{37}');".format( self.dict['admin_id'], self.dict['admin_name'], self.dict['query_time'], self.dict['deal_time'], self.dict['deal_time_stamp'], self.dict['fund_account'], self.dict['futures_company'], self.dict['previous_day_balance'], self.dict['customer_rights'], self.dict['total_access_for_the_day'], self.dict['actual_monetary_funds'], self.dict['profit_loss_day'], self.dict['not_currency_credit_amount'], self.dict['total_royalties_of_the_day'], self.dict['currency_credit_amount'], self.dict['day_handling_fee'], self.dict['frozen_funds'], self.dict['day_balance'], self.dict['margin_occupation'], self.dict['available_funds'], self.dict['risk'], self.dict['margin_call'], self.dict['out_money_cny'], self.dict['enter_money_cny'], self.dict['out_money_usd'], self.dict['enter_money_usd'], self.dict['exchange_name'], self.dict['transaction_fees'], self.dict['reporting_fee'], self.dict['deal_num'], self.dict['deal_turnover'], self.dict['deal_fee'], self.dict['deal_profit_loss'], self.dict['position_buy_position'], self.dict['position_sell_position'], self.dict['position_profit_loss'], self.dict['position_trading_margin'], self.dict['created_at']) self.pymysql_action(summary_sql) print("..." + "汇总合计写入完成") except Exception as e: print('......' + str(i + 1) + "行有异常抛出:") print(e) def get_files(self, path): ''' 获取所有excel文件 :param path:路径 :return: ''' files = os.listdir(path) if len(files) < 1: print('没有文件') else: for file in files: file_path = path + file # 判断后缀 if '.xls' in file: print('开始读取文件:' + file_path) self.deal_excel(file_path) # 删除文件 try: print('删除文件:' + file_path) os.remove(file_path) except Exception as e: print(e) def test(self): for i in range(5): try: if i == 3: continue print(i) except Exception as e: print(e) def __del__(self): # 关闭游标 self.cursor.close() # 关闭数据库连接 self.conn.close() if __name__ == '__main__': path = './exel/' obj = Excel() obj.get_files(path) 总结

cfmmc.py模拟登录获取数据,excel.py文件处理表格数据,结合定时任务及队列,可以实现自动化处理数据。

如果您需要一个可视化的界面,管理和分析爬取到的数据,让数据更有价值。可以查看期货市场监控后台管理系统。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3