高德地图实时路况数据

您所在的位置:网站首页 高德地图怎么不显示实时路况了 高德地图实时路况数据

高德地图实时路况数据

2023-11-22 18:32| 来源: 网络整理| 查看: 265

一、Request爬取数据

1.获取AK 进入【高德地图开放平台】→注册认证成为开发者→【应用管理】→【我的应用】→【创建新应用】→获得AK 2.所需爬取数据 研究所需数据主要由两部分构成, ①高德地图开放平台交通态势数据(简称交通态势数据),爬取链接:https://restapi.amap.com/v3/traffic/status/road? ②高德地图西安城市交通详情数据(简称交通详情数据),爬取链接: https://report.amap.com/detail.do?city=610100 第一部分数据是通过高德地图提供的lbs服务API获取,第二部分数据是通过requests动态爬取页面获得 3.数据展示 ①交通态势数据: 在这里插入图片描述 ②交通详情数据: 在这里插入图片描述 3.爬虫代码 ①交通态势数据:

import requests import json import time import csv def get_one_page(name): url = 'https://restapi.amap.com/v3/traffic/status/road?parameters' params = { 'key': '你自己的ak', 'adcode': '610100', 'name': name } try: response = requests.get(url,params=params) if response.status_code == 200: return response.json(),name except requests.ConnectionError as e: print('Error', e.args) names = {'东二环路','北二环路','二环南路西段','二环南路东段','西二环路','金花北路','大兴西路'} def write_to_file(content): with open('交通态势.csv','a',newline='') as csvfile: fieldnames = ['name','name_description','localtime','expedite','congested','blocked','unknown','status','description'] writer = csv.DictWriter(csvfile,fieldnames = fieldnames) writer.writerow(content) def parse_page(html,name): item = html.get('trafficinfo') luduan = {} luduan['name'] = name luduan['name_description'] = item.get('description') luduan['localtime'] = time.strftime('%Y-%m-%d,%H:%M:%S',time.localtime(time.time())) luduan['expedite'] = item.get('evaluation').get('expedite') luduan['congested'] = item.get('evaluation').get('congested') luduan['blocked'] = item.get('evaluation').get('blocked') luduan['unknown'] = item.get('evaluation').get('unknown') luduan['status'] = item.get('evaluation').get('status') luduan['description'] = item.get('evaluation').get('description') #write_to_file(luduan) yield luduan while True: for name in names: r, n = get_one_page(name) print(r,n) y = parse_page(r, n) #print(list(y)) print("************************************************************") time.sleep(300)

②交通详情数据:

import requests from urllib.parse import urlencode from pyquery import PyQuery as pq import json import csv import time def get_page(roadType): base_url = 'https://report.amap.com/ajax/roadRank.do?' headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36', 'referer': 'https://report.amap.com/detail.do?city=610100', 'x-requested-with': 'XMLHttpRequest', } params = { 'roadType': roadType, 'timeType': '0', 'cityCode': '610100', } url = base_url + urlencode(params) try: response = requests.get(url, headers=headers) if response.status_code == 200: return response.json() except requests.ConnectionError as e: print('Error', e.args) def write_to_file(content): with open('实时拥堵指数.csv','a',newline='') as csvfile: fieldnames = ['name','localtime','dir','id','length','number','index','speed','travelTime','delayTime'] writer = csv.DictWriter(csvfile,fieldnames = fieldnames) writer.writerow(content) def parse_page(html): items = html.get('tableData') for item in items: luduan = {} luduan['name'] = item.get('name') luduan['localtime'] = time.strftime('%Y-%m-%d,%H:%M:%S',time.localtime(time.time())) luduan['dir'] = item.get('dir') luduan['id'] = item.get('id') luduan['length'] = item.get('length') luduan['number'] = item.get('number') luduan['index'] = item.get('index') luduan['speed'] = item.get('speed') luduan['travelTime'] = item.get('travelTime') luduan['delayTime'] = item.get('delayTime') write_to_file(luduan) yield luduan while True: for i in {0,1}: print(i) roadType = i if __name__ == '__main__': html = get_page(roadType) results = parse_page(html) print(list(results)) print("************************************************************") time.sleep(300) 二、数据预处理

运用pandas、numpy分析库将原始数据整理成最终所需的数据格式,过程不详细赘述 最终的数据格式: 在这里插入图片描述

三、RNN神经网络预测

持续时长作为需要预测的变量,其它特征值作为输入变量 RNN代码如下:

# -*- coding: utf-8 -*- import pandas as pd import numpy as np import matplotlib.pyplot as plt import tensorflow as tf #定义常量 rnn_unit=10 #hidden layer units input_size=7 output_size=1 lr=0.001 #学习率 #——————————————————导入数据—————————————————————— f='拥堵事件-预处理.xlsx' df=pd.read_excel(f) data=df.iloc[:,0:8].values print(data) A = np.zeros((1500, 2)) #获取训练集 def get_train_data(batch_size=20,time_step=10,train_begin=0,train_end=80): batch_index=[] data_train=data[train_begin:train_end] #normalized_train_data=data_train normalized_train_data=(data_train-np.mean(data_train,axis=0))/np.std(data_train,axis=0) #标准化 train_x,train_y=[],[] #训练集 for i in range(len(normalized_train_data)-time_step): if i % batch_size==0: batch_index.append(i) x=normalized_train_data[i:i+time_step,:7] y=normalized_train_data[i:i+time_step,7,np.newaxis] train_x.append(x.tolist()) train_y.append(y.tolist()) batch_index.append((len(normalized_train_data)-time_step)) return batch_index,train_x,train_y #获取测试集 def get_test_data(time_step=10,test_begin=80): data_test=data[test_begin:] mean=np.mean(data_test,axis=0) std=np.std(data_test,axis=0) normalized_test_data=(data_test-mean)/std #标准化 #normalized_test_data=data_test size=(len(normalized_test_data)+time_step-1)//time_step #有size个sample test_x,test_y=[],[] for i in range(size-1): x=normalized_test_data[i*time_step:(i+1)*time_step,:7] y=normalized_test_data[i*time_step:(i+1)*time_step,7] test_x.append(x.tolist()) test_y.extend(y) test_x.append((normalized_test_data[(i+1)*time_step:,:7]).tolist()) test_y.extend((normalized_test_data[(i+1)*time_step:,7]).tolist()) return mean,std,test_x,test_y #——————————————————定义神经网络变量—————————————————— #输入层、输出层权重、偏置 weights={ 'in':tf.Variable(tf.random.normal([input_size,rnn_unit])), 'out':tf.Variable(tf.random.normal([rnn_unit,1])) } biases={ 'in':tf.Variable(tf.constant(0.1,shape=[rnn_unit,])), 'out':tf.Variable(tf.constant(0.1,shape=[1,])) } #——————————————————定义神经网络变量—————————————————— def lstm(X): batch_size=tf.shape(X)[0] time_step=tf.shape(X)[1] w_in=weights['in'] b_in=biases['in'] input=tf.reshape(X,[-1,input_size]) #需要将tensor转成2维进行计算,计算后的结果作为隐藏层的输入 input_rnn=tf.matmul(input,w_in)+b_in input_rnn=tf.reshape(input_rnn,[-1,time_step,rnn_unit]) #将tensor转成3维,作为lstm cell的输入 cell=tf.nn.rnn_cell.BasicLSTMCell(rnn_unit) init_state=cell.zero_state(batch_size,dtype=tf.float32) output_rnn,final_states=tf.nn.dynamic_rnn(cell, input_rnn,initial_state=init_state, dtype=tf.float32) #output_rnn是记录lstm每个输出节点的结果,final_states是最后一个cell的结果 output=tf.reshape(output_rnn,[-1,rnn_unit]) #作为输出层的输入 w_out=weights['out'] b_out=biases['out'] pred=tf.matmul(output,w_out)+b_out return pred,final_states #——————————————————训练模型—————————————————— def train_lstm(batch_size=20,time_step=10,train_begin=0,train_end=80): X=tf.placeholder(tf.float32, shape=[None,time_step,input_size]) Y=tf.placeholder(tf.float32, shape=[None,time_step,output_size]) batch_index,train_x,train_y=get_train_data(batch_size,time_step,train_begin,train_end) pred,_=lstm(X) #损失函数 loss=tf.reduce_mean(tf.square(tf.reshape(pred,[-1])-tf.reshape(Y, [-1]))) train_op=tf.train.AdamOptimizer(lr).minimize(loss) saver=tf.train.Saver(tf.global_variables(),max_to_keep=15) #module_file = tf.train.latest_checkpoint('E:/111/TF_RNN(LSTM)') with tf.Session() as sess: sess.run(tf.global_variables_initializer()) #saver.restore(sess, module_file) #重复训练1000次 #A = np.zeros((2000, 2)) np.set_printoptions(suppress=True) for i in range(1500): for step in range(len(batch_index)-1): _, loss_ = sess.run([train_op,loss],feed_dict={X:train_x[batch_index[step]:batch_index[step+1]],Y:train_y[batch_index[step]:batch_index[step+1]]}) A[i, 0] = i A[i, 1] = loss_ if i % 100 == 0: print("保存模型:",saver.save(sess,'F:/python/RNN-Network/stock2.model',global_step=i)) return A with tf.variable_scope('train',reuse=tf.AUTO_REUSE): train_lstm() #————————————————预测模型———————————————————— def prediction(time_step=10): X=tf.placeholder(tf.float32, shape=[None,time_step,input_size]) Y=tf.placeholder(tf.float32, shape=[None,time_step,output_size]) mean,std,test_x,test_y=get_test_data(time_step) pred,_=lstm(X) saver=tf.train.Saver(tf.global_variables()) with tf.Session() as sess: #参数恢复 module_file = tf.train.latest_checkpoint('F:/python/RNN-Network') saver.restore(sess, module_file) test_predict=[] #B = np.zeros((len(test_x)-1, 2)) for step in range(len(test_x)-1): prob=sess.run(pred,feed_dict={X:[test_x[step]]}) predict=prob.reshape((-1)) test_predict.extend(predict) #test_y=np.array(test_y) #test_predict=np.array(test_predict) test_y=np.array(test_y)*std[7]+mean[7] test_predict=np.array(test_predict)*std[7]+mean[7] #acc=np.average(np.abs(test_predict-test_y[:len(test_predict)])/test_y[:len(test_predict)]) print("test_predict:",test_predict) print("test_y:",test_y) #以折线图表示结果 plt.figure() plt.plot(list(range(len(test_predict))), test_predict, color='b') plt.plot(list(range(len(test_y))), test_y, color='r') plt.show() return test_predict with tf.variable_scope('train',reuse=tf.AUTO_REUSE): prediction() 四、预测结果分析

图1为训练集的预测结果,图2为测试集的训练结果 图一: 在这里插入图片描述 图二:在这里插入图片描述 可以看出训练结果很差,后面又查了很多相关的文献,考虑原因是因为 ①高德地图实时路况数据本身存在的误差 ②单纯用之前的拥堵数据预测后面的拥堵误差时很大的,还需多方面考虑其他因素 最终考虑自己想做的时间跨度比较大的拥堵时长预测准确率是很差的,短时的交通拥堵预测才比较现实。虽然效果不太好,也是自己的一次尝试,后面就转用百度地图的数据作分析了,放弃了预测。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3