利用Python调用海康威视综合管理平台openAPI接口

您所在的位置:网站首页 海康威视综合安防平台服务器地址 利用Python调用海康威视综合管理平台openAPI接口

利用Python调用海康威视综合管理平台openAPI接口

2024-07-12 19:53| 来源: 网络整理| 查看: 265

前言:对海康威视综合管理平台iSecureCenter进行二次开发,主要想获取直播流地址,需要进行二次开发。但是官网给出的Demo没有Python版本,这里给出调用示例。

核心问题:获取到指定接口的签名(CA认证),然后才能正常调用各个接口,自己也走了一些弯路。主要问题在:

1、生成签名拼接的字符串,拼接的不对导致出了验证失败;

2、得到了签名不知道怎么用,主要问题还是POST请求的各参数不明白真正含义;

3、平台对应的各端口需要开通;

4、多利用官网给出的小工具进行接口测试。

参考链接:1、https://open.hikvision.com/download/5c67f1e2f05948198c909700?type=10

2、https://open.hikvision.com/docs/3784e07072ee4f09a573d61f20782267

3、https://open.hikvision.com/docs/8530061f19534a9993e2afeb70e7c96a

# -*- coding: utf-8 -*- # _author_ = # Email: """ 协议:仅支持 HTTPS 传输 url格式:https://{hostname}:{port}/artemis/{uri} # AK\SK摘要认证 调用 API 时,如果API需要安全认证,首先需要获取API的授权,得到AppKey和AppSecret; 其次,拼接签名字符串,将计算后的签名放在请求的 Header 传入,网关会通过对称计算签名来验证请求者的身份。 """ import os import base64 import json import time import uuid import hmac # hex-based message authentication code 哈希消息认证码 import hashlib # 提供了很多加密的算法 import requests base_url = "https://ip:4432" # 可以正常访问的IP地址 # 注意增加/artemis api_get_address_url = "/artemis/api/video/v2/cameras/previewURLs" appKey = "2*****" appSecret = "HjjuaNJGsv*******" http_method = "POST" def sign(key, value): temp = hmac.new(key.encode(), value.encode(), digestmod=hashlib.sha256) return base64.b64encode(temp.digest()).decode() x_ca_nonce = str(uuid.uuid4()) x_ca_timestamp = str(int(round(time.time()) * 1000)) # sign_str 的拼接很关键,不然得不到正确的签名 sign_str = "POST\n*/*\napplication/json" + "\nx-ca-key:" + appKey + "\nx-ca-nonce:" + \ x_ca_nonce + "\nx-ca-timestamp:" + x_ca_timestamp + "\n" + \ api_get_address_url signature = sign(appSecret, sign_str) print("[INFO] 获取到的签名值为:", signature) headers = { "Accept": "*/*", "Content-Type": "application/json", "x-ca-key": appKey, # appKey,即 AK "x-ca-signature-headers": "x-ca-key,x-ca-nonce,x-ca-timestamp", "x-ca-signature": signature, # 需要计算得到的签名,此处通过后台得到 "x-ca-timestamp": x_ca_timestamp, # 时间戳 "x-ca-nonce": x_ca_nonce # UUID,结合时间戳防重复 } # 获取监控点的在线流样例 body = { "cameraIndexCode": "36e0421391f7****", "streamType": 0, "protocol": "rtsp", } url = base_url + api_get_address_url results = requests.post(url, data=json.dumps(body), headers=headers, verify=False) print(results) print(results.json() # {'code': '0', 'msg': 'success', 'data': {'url': 'rtsp://ip:554/openUrl/Kp0VqXS'}}

根据以上编码改变的自己业务:

base_url = "https://*****:443" # 可以正常访问的IP地址 appKey = "2****5" appSecret = "1****7" http_method = "POST" """ 签名方法(公共) """ def sign(key, value): temp = hmac.new(key.encode(), value.encode(), digestmod=hashlib.sha256) return base64.b64encode(temp.digest()).decode() """ 获取海康威视摄像头列表 :return: """ @app.route("/get_hikvision_cameras", methods=["POST"]) def get_hikvision_cameras(): try: body = { "pageNo": 1, "pageSize": 100 } #api地址 api_get_address_url = "/artemis/api/resource/v1/cameras" x_ca_nonce = str(uuid.uuid4()) x_ca_timestamp = str(int(round(time.time()) * 1000)) # sign_str 的拼接很关键,不然得不到正确的签名 sign_str = "POST\n*/*\napplication/json" + "\nx-ca-key:" + appKey + "\nx-ca-nonce:" + \ x_ca_nonce + "\nx-ca-timestamp:" + x_ca_timestamp + "\n" + \ api_get_address_url signature = sign(appSecret, sign_str) headers = { "Accept": "*/*", "Content-Type": "application/json", "x-ca-key": appKey, # appKey,即 AK "x-ca-signature-headers": "x-ca-key,x-ca-nonce,x-ca-timestamp", "x-ca-signature": signature, # 需要计算得到的签名,此处通过后台得到 "x-ca-timestamp": x_ca_timestamp, # 时间戳 "x-ca-nonce": x_ca_nonce # UUID,结合时间戳防重复 } url = base_url + api_get_address_url response = requests.post(url, data=json.dumps(body), headers=headers, verify=False) list = response.json()['data']['list'] url_list = [] for camera in list: url = { 'cameraName': camera['cameraName'], 'cameraIndexCode': camera['cameraIndexCode'], 'cameraTypeName': camera['cameraTypeName'] } url_list.append(url) resultJson = { "code": 200, "data": url_list, "message": "success" } return resultJson except Exception as e: print(e) """ 获取当摄像头预览视频地址 :return: """ @app.route("/get_hikvision_cameras_url", methods=["POST"]) def get_hikvision_cameras_url(): try: # 请求参数 cameraIndexCode = request.json.get("cameraIndexCode") #新地址+签名 api_get_address_url = "/artemis/api/video/v2/cameras/previewURLs" x_ca_nonce = str(uuid.uuid4()) x_ca_timestamp = str(int(round(time.time()) * 1000)) sign_str = "POST\n*/*\napplication/json" + "\nx-ca-key:" + appKey + "\nx-ca-nonce:" + \ x_ca_nonce + "\nx-ca-timestamp:" + x_ca_timestamp + "\n" + \ api_get_address_url signature = sign(appSecret, sign_str) headers = { "Accept": "*/*", "Content-Type": "application/json", "x-ca-key": appKey, # appKey,即 AK "x-ca-signature-headers": "x-ca-key,x-ca-nonce,x-ca-timestamp", "x-ca-signature": signature, # 需要计算得到的签名,此处通过后台得到 "x-ca-timestamp": x_ca_timestamp, # 时间戳 "x-ca-nonce": x_ca_nonce # UUID,结合时间戳防重复 } body = { "cameraIndexCode": cameraIndexCode, "streamType": 0, "protocol": "ws", "transmode": 1, "expand": "transcode=0", "streamform": "ps" } url = base_url + api_get_address_url response = requests.post(url, data=json.dumps(body), headers=headers, verify=False) data = response.json()['data'] resultJson = { "code": 200, "data": data, "message": "success" } return resultJson except Exception as e: print(e) """ 获取当前摄像头对讲地址 :return: """ @app.route("/get_hikvision_cameras_talkurl", methods=["POST"]) def get_hikvision_cameras_talkurl(): try: # 请求参数 cameraIndexCode = request.json.get("cameraIndexCode") #新地址+签名 api_get_address_url = "/artemis/api/video/v1/cameras/talkURLs" x_ca_nonce = str(uuid.uuid4()) x_ca_timestamp = str(int(round(time.time()) * 1000)) sign_str = "POST\n*/*\napplication/json" + "\nx-ca-key:" + appKey + "\nx-ca-nonce:" + \ x_ca_nonce + "\nx-ca-timestamp:" + x_ca_timestamp + "\n" + \ api_get_address_url signature = sign(appSecret, sign_str) headers = { "Accept": "*/*", "Content-Type": "application/json", "x-ca-key": appKey, # appKey,即 AK "x-ca-signature-headers": "x-ca-key,x-ca-nonce,x-ca-timestamp", "x-ca-signature": signature, # 需要计算得到的签名,此处通过后台得到 "x-ca-timestamp": x_ca_timestamp, # 时间戳 "x-ca-nonce": x_ca_nonce # UUID,结合时间戳防重复 } body = { "cameraIndexCode": cameraIndexCode, "transmode": 1 } url = base_url + api_get_address_url response = requests.post(url, data=json.dumps(body), headers=headers, verify=False) print(response) data = response.json()['data'] resultJson = { "code": 200, "data": data, "message": "success" } return resultJson except Exception as e: print(e) """ 获取当前摄像头回放地址 :return: """ @app.route("/get_hikvision_cameras_playbackURLs", methods=["POST"]) def get_hikvision_cameras_playbackURLs(): try: # 请求参数 cameraIndexCode = request.json.get("cameraIndexCode") # 开始时间 beginTime = request.json.get("beginTime") # 结束时间 endTime = request.json.get("endTime") #新地址+签名 api_get_address_url = "/artemis/api/video/v2/cameras/playbackURLs" x_ca_nonce = str(uuid.uuid4()) x_ca_timestamp = str(int(round(time.time()) * 1000)) sign_str = "POST\n*/*\napplication/json" + "\nx-ca-key:" + appKey + "\nx-ca-nonce:" + \ x_ca_nonce + "\nx-ca-timestamp:" + x_ca_timestamp + "\n" + \ api_get_address_url signature = sign(appSecret, sign_str) headers = { "Accept": "*/*", "Content-Type": "application/json", "x-ca-key": appKey, # appKey,即 AK "x-ca-signature-headers": "x-ca-key,x-ca-nonce,x-ca-timestamp", "x-ca-signature": signature, # 需要计算得到的签名,此处通过后台得到 "x-ca-timestamp": x_ca_timestamp, # 时间戳 "x-ca-nonce": x_ca_nonce # UUID,结合时间戳防重复 } body = { "cameraIndexCode": cameraIndexCode, "recordLocation": "1", "protocol": "wss", "transmode": 1, "beginTime":beginTime+".000+08:00" , "endTime": endTime+".000+08:00", "expand": "streamform=rtp", "streamform": "ps", "lockType": 0 } print(body) url = base_url + api_get_address_url response = requests.post(url, data=json.dumps(body), headers=headers, verify=False) data = response.json()['data']['url'] resultJson = { "code": 200, "data": data, "message": "success" } return resultJson except Exception as e: print(e)

 

 

 

参考地址:https://blog.csdn.net/weixin_41713230/article/details/112411978



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3