Python 实现海康机器人工业相机 MV

您所在的位置:网站首页 工业相机api Python 实现海康机器人工业相机 MV

Python 实现海康机器人工业相机 MV

2024-04-13 07:10| 来源: 网络整理| 查看: 265

Python 实现海康机器人工业相机 MV-CU060-10GM 的实时显示视频流及拍照功能

 

一、背景介绍

1、最近项目中需要给客户对接海康机器人工业相机  MV-CU060-10GM;

2、客户要求通过部署的管理平台,可以在页面上实现如下功能:

    1)相机视频流开始预览;

    2)相机视频流停止预览;

    3)相机拍照功能。

    需求背景:客户需要对生产的产品进行定期抽样质检,其中涉及到外观检测,比如,样品的表面清洁度、外观等指标。所以,需要先通过管理平台点击相机的“预览”按钮,进行预览相机拍摄的实时效果,当客户认为清晰度和角度满足条件时,才会点击“拍照”按钮进行拍摄,从而保证质检的照片是有意义和有实用价值的。

 

二、调研历程

    由于项目团队同事之前没有做过工业相机视频和拍照的相关开发,于是乎,就开启了“漫长”而“煎熬”的调研之路(断断续续持续了1个多月)。

    最终于2022年12月6日,通过 Python “完美”实现了上述的三个功能。

    特地写下此篇博客,供需要的网友参考,避免少走很多弯路。

 

    1、海康工业相机官网

     https://www.hikrobotics.com/cn/machinevision/productdetail?id=8518&pageNumber=13&pageSize=20

 

    2、官网示例

    可以在海康机器人官网提供的客户端工具 MVS,“帮助”--> "Development",点击“Development”会跳转到安装目录,从“Samples”中获取官方提供的一些简单示例。

     如果已经安装了 MVS,直接进入 C:\Program Files (x86)\MVS\Development\Samples 目录即可看到,目前支持 C#、C++、Java、OpenCV、Python、VB等语言。

     本人主要使用Java和Python,所以,本篇博文主要从Java和Python两种语言调研了实现方案。

 

 

     1)Java示例

      SaveImage 有获取图片的示例,但是,没有视频流获取并显示的示例。(参考示例相对较少)

 

    2)Python示例

    GrabImage 目录下有获取图片的示例,Recording 目录下有获取视频流的示例,但是没有将视频流返回前端的示例。(参考示例相对较多)

    

 

    3、网上博客参考     1)RTSP(Runtime Stream Protocol)协议方向

    (很遗憾,此路不通!!!)

    因为我们用的 海康机器人工业相机  MV-CU060-10GM 这款相机,不支持 RTSP 协议。

    如下博客适用于 海康威视摄像头,并不适用于 海康工业相机,如果是使用海康威视摄像头的小伙伴可以参考下。

    参考博客:海康威视摄像头对接SDK实时预览功能和抓拍功能,懒癌福利,可直接CV

 

    2)Java实现方向

    (不能完全满足客户需求,此路不全通)

    使用Java目前参考官网的示例,实现了图片抓取,并上传的功能,但是没有实现视频流的实时获取和显示的功能。

    如果需求只是获取图片,不要求视频流实时显示,可以通过Java就可以实现。

 

    3)Python实现方向

    (目前,网上没有直接能完全满足上述三个需求的,本人通过借鉴、整合,结合Flask框架实现的)

    参考博客如下:

    python语言下使用opencv接口cv2.VideoCapture()接口调用海康机器人工业相机 (此篇博文可以重点看!!!)

    通过python调用海康威视工业摄像头并进行图像存储,同时使用opencv实时图像显示(数据流问题已解决)

    python调用海康工业相机并用opencv显示(整体实现)(此篇博文可以重点看!!!)

    pyQT5 学习使用 笔记 六 pyQt5+opencv 显示海康GIGE相机动态视频流 (该方式虽然实现了视频的实时显示,但是,无法被给前端直接调用)

    web实时显示摄像头图像(python)  (此篇博文可以重点看!!!)

 

    4)Flask 的相关教程和博客 https://www.w3cschool.cn/flask/ https://dormousehole.readthedocs.io/en/latest/ https://blog.csdn.net/weixin_44239541/article/details/89390139 https://zhuanlan.zhihu.com/p/104273184https://blog.csdn.net/tulan_xiaoxin/article/details/79132214

 

三、Python 代码实现     1、Python 环境     1)Python 版本

    个人使用的是 Python 3.8.5 版本(建议使用该版本或者更高版本)

    Python 官网地址:https://www.python.org/

    安装和配置可以参考:https://www.runoob.com/python3/python3-install.html

 

    2)Python 虚拟环境

    个人建议给该工程单独创建一个Python虚拟环境(如:hikrobotEnv),后续现场如果出现无法连接外网的情况下,可以直接Copy虚拟环境部署,比较方便。

    当然,如果不是特殊情况,多个项目工程可以共用一个虚拟环境。

    可以使用 virtualenv、anaconda、PyCharm 等创建虚拟环境。

    以 virtualenv 为例,创建 hikrobotEnv 虚拟环境的命令如下:

virtualenv hikrobotEnv --python=python3.8.5

    如果没有安装 virtualenv,可以通过如下命令安装:

pip install virtualenv

    关于 virtualenv 的相关介绍和使用,可以参考博客:Python虚拟环境Virtualenv详解

 

    2、测试代码     1)抓取图片测试代码

    前提条件:将 C:\Program Files (x86)\MVS\Development\Samples\Python\ 目录下的 MvImport 目录,copy到自己的工程目录下,创建测试文件 TestGrabImage.py

    代码如下:

# -- coding: utf-8 -- import cv2 import sys import copy import msvcrt import numpy as np from ctypes import * sys.path.append("./MvImport") from MvCameraControl_class import * if __name__ == "__main__": deviceList = MV_CC_DEVICE_INFO_LIST() tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE # ch:枚举设备 | en:Enum device ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList) if ret != 0: print ("enum devices fail! ret[0x%x]" % ret) sys.exit() if deviceList.nDeviceNum == 0: print ("find no device!") sys.exit() print ("find %d devices!" % deviceList.nDeviceNum) for i in range(0, deviceList.nDeviceNum): mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE: print ("\ngige device: [%d]" % i) strModeName = "" for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName: strModeName = strModeName + chr(per) print ("device model name: %s" % strModeName) nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24) nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16) nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8) nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff) print ("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4)) elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE: print ("\nu3v device: [%d]" % i) strModeName = "" for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName: if per == 0: break strModeName = strModeName + chr(per) print ("device model name: %s" % strModeName) strSerialNumber = "" for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber: if per == 0: break strSerialNumber = strSerialNumber + chr(per) print ("user serial number: %s" % strSerialNumber) nConnectionNum = 0 if int(nConnectionNum) >= deviceList.nDeviceNum: print ("intput error!") sys.exit() # ch:创建相机实例 | en:Creat Camera Object cam = MvCamera() # ch:选择设备并创建句柄 | en:Select device and create handle stDeviceList = cast(deviceList.pDeviceInfo[int(nConnectionNum)], POINTER(MV_CC_DEVICE_INFO)).contents ret = cam.MV_CC_CreateHandle(stDeviceList) if ret != 0: print ("create handle fail! ret[0x%x]" % ret) sys.exit() # ch:打开设备 | en:Open device ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0) if ret != 0: print ("open device fail! ret[0x%x]" % ret) sys.exit() # ch:探测网络最佳包大小(只对GigE相机有效) | en:Detection network optimal package size(It only works for the GigE camera) if stDeviceList.nTLayerType == MV_GIGE_DEVICE: nPacketSize = cam.MV_CC_GetOptimalPacketSize() if int(nPacketSize) > 0: ret = cam.MV_CC_SetIntValue("GevSCPSPacketSize", nPacketSize) if ret != 0: print ("Warning: Set Packet Size fail! ret[0x%x]" % ret) else: print ("Warning: Get Packet Size fail! ret[0x%x]" % nPacketSize) # ch:设置触发模式为off | en:Set trigger mode as off ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF) if ret != 0: print ("set trigger mode fail! ret[0x%x]" % ret) sys.exit() # ch:获取数据包大小 | en:Get payload size stParam = MVCC_INTVALUE() memset(byref(stParam), 0, sizeof(MVCC_INTVALUE)) ret = cam.MV_CC_GetIntValue("PayloadSize", stParam) if ret != 0: print ("get payload size fail! ret[0x%x]" % ret) sys.exit() nPayloadSize = stParam.nCurValue # ch:开始取流 | en:Start grab image ret = cam.MV_CC_StartGrabbing() if ret != 0: print ("start grabbing fail! ret[0x%x]" % ret) sys.exit() stDeviceList = MV_FRAME_OUT_INFO_EX() memset(byref(stDeviceList), 0, sizeof(stDeviceList)) data_buf = (c_ubyte * nPayloadSize)() ret = cam.MV_CC_GetOneFrameTimeout(byref(data_buf), nPayloadSize, stDeviceList, 1000) if ret == 0: print ("get one frame: Width[%d], Height[%d], nFrameNum[%d]" % (stDeviceList.nWidth, stDeviceList.nHeight, stDeviceList.nFrameNum)) nRGBSize = stDeviceList.nWidth * stDeviceList.nHeight * 3 stConvertParam=MV_SAVE_IMAGE_PARAM_EX() stConvertParam.nWidth = stDeviceList.nWidth stConvertParam.nHeight = stDeviceList.nHeight stConvertParam.pData = data_buf stConvertParam.nDataLen = stDeviceList.nFrameLen stConvertParam.enPixelType = stDeviceList.enPixelType stConvertParam.nImageLen = stConvertParam.nDataLen stConvertParam.nJpgQuality = 70 stConvertParam.enImageType = MV_Image_Jpeg stConvertParam.pImageBuffer = (c_ubyte * nRGBSize)() stConvertParam.nBufferSize = nRGBSize # ret = cam.MV_CC_ConvertPixelType(stConvertParam) print(stConvertParam.nImageLen) ret = cam.MV_CC_SaveImageEx2(stConvertParam) if ret != 0: print ("convert pixel fail ! ret[0x%x]" % ret) del data_buf sys.exit() file_path = "AfterConvert_RGB2.jpg" file_open = open(file_path.encode('ascii'), 'wb+') img_buff = (c_ubyte * stConvertParam.nImageLen)() cdll.msvcrmcpy(byref(img_buff), stConvertParam.pImageBuffer, stConvertParam.nImageLen) file_open.write(img_buff) print ("Save Image succeed!") # ch:停止取流 | en:Stop grab image ret = cam.MV_CC_StopGrabbing() if ret != 0: print ("stop grabbing fail! ret[0x%x]" % ret) del data_buf sys.exit() # ch:关闭设备 | Close device ret = cam.MV_CC_CloseDevice() if ret != 0: print ("close deivce fail! ret[0x%x]" % ret) del data_buf sys.exit() # ch:销毁句柄 | Destroy handle ret = cam.MV_CC_DestroyHandle() if ret != 0: print ("destroy handle fail! ret[0x%x]" % ret) del data_buf sys.exit() del data_buf

    在python 虚拟环境中,执行 python TestGrabImage.py 运行后,会在当前目录下生成一个名为 AfterConvert_RGB2.jpg 的图片文件。

    如果运行过程中提示模块不存在,可以通过 pip install 命令安装相应的模块,建议使用清华源,如 安装 opencv-python,命令如下:

pip install opencv-python==4.1.2.30 -i https://pypi.tuna.tsinghua.edu.cn/simple

 

    2)Python+Qt 实现视频流实时显示测试代码

    前提条件:将 C:\Program Files (x86)\MVS\Development\Samples\Python\ 目录下的 MvImport 目录,copy到自己的工程目录下,创建测试文件 TestVideoStream.py

    代码如下:

# -- coding: utf-8 -- import cv2 from PyQt5 import QtCore, QtGui, QtWidgets from PyQt5.QtCore import pyqtSignal from PyQt5.QtGui import * import numpy as np #from CameraControl_header import MV_CC_DEVICE_INFO_LIST #from mainWindow import Ui_MainWindow # 导入创建的GUI类 import sys import threading import msvcrt from ctypes import * sys.path.append("./MvImport") from MvCameraControl_class import * from Ui_MainWindow import * from CameraParams_header import * class mywindow(QtWidgets.QMainWindow, Ui_MainWindow): sendAddDeviceName = pyqtSignal() #定义一个添加设备列表的信号。 deviceList = MV_CC_DEVICE_INFO_LIST() g_bExit = False # ch:创建相机实例 | en:Creat Camera Object cam = MvCamera() def connect_and_emit_sendAddDeviceName(self): # Connect the sendAddDeviceName signal to a slot. self.sendAddDeviceName.connect(self.SelectDevice) # Emit the signal. self.sendAddDeviceName.emit() def __init__(self): super(mywindow, self).__init__() self.setupUi(self) self.connect_and_emit_sendAddDeviceName() self.butopenCam.clicked.connect(lambda:self.openCam(self.camSelect.currentData())) self.butcloseCam.clicked.connect(self.closeCam) # setting main window geometry desktop_geometry = QtWidgets.QApplication.desktop() # 获取屏幕大小 main_window_width = desktop_geometry.width() # 屏幕的宽 main_window_height = desktop_geometry.height() # 屏幕的高 rect = self.geometry() # 获取窗口界面大小 window_width = rect.width() # 窗口界面的宽 window_height = rect.height() # 窗口界面的高 x = (main_window_width - window_width) // 2 # 计算窗口左上角点横坐标 y = (main_window_height - window_height) // 2 # 计算窗口左上角点纵坐标 self.setGeometry(x, y, window_width, window_height) # 设置窗口界面在屏幕上的位置 # 无边框以及背景透明一般不会在主窗口中用到,一般使用在子窗口中,例如在子窗口中显示gif提示载入信息等等 # self.setWindowFlags(Qt.FramelessWindowHint) # self.setAttribute(Qt.WA_TranslucentBackground) #打开摄像头。 def openCam(self,camid): self.g_bExit = False # ch:选择设备并创建句柄 | en:Select device and create handle stDeviceList = cast(self.deviceList.pDeviceInfo[int(camid)], POINTER(MV_CC_DEVICE_INFO)).contents ret = self.cam.MV_CC_CreateHandle(stDeviceList) if ret != 0: print("create handle fail! ret[0x%x]" % ret) sys.exit() # ch:打开设备 | en:Open device ret = self.cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0) if ret != 0: print("open device fail! ret[0x%x]" % ret) sys.exit() # ch:探测网络最佳包大小(只对GigE相机有效) | en:Detection network optimal package size(It only works for the GigE camera) if stDeviceList.nTLayerType == MV_GIGE_DEVICE: nPacketSize = self.cam.MV_CC_GetOptimalPacketSize() if int(nPacketSize) > 0: ret = self.cam.MV_CC_SetIntValue("GevSCPSPacketSize", nPacketSize) if ret != 0: print("Warning: Set Packet Size fail! ret[0x%x]" % ret) else: print("Warning: Get Packet Size fail! ret[0x%x]" % nPacketSize) # ch:设置触发模式为off | en:Set trigger mode as off ret = self.cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF) if ret != 0: print("set trigger mode fail! ret[0x%x]" % ret) sys.exit() # ch:获取数据包大小 | en:Get payload size stParam = MVCC_INTVALUE() memset(byref(stParam), 0, sizeof(MVCC_INTVALUE)) ret = self.cam.MV_CC_GetIntValue("PayloadSize", stParam) if ret != 0: print("get payload size fail! ret[0x%x]" % ret) sys.exit() nPayloadSize = stParam.nCurValue # ch:开始取流 | en:Start grab image ret = self.cam.MV_CC_StartGrabbing() if ret != 0: print("start grabbing fail! ret[0x%x]" % ret) sys.exit() data_buf = (c_ubyte * nPayloadSize)() try: hThreadHandle = threading.Thread(target=self.work_thread, args=(self.cam, data_buf, nPayloadSize)) hThreadHandle.start() except: print("error: unable to start thread") #关闭相机 def closeCam(self): self.g_bExit=True # ch:停止取流 | en:Stop grab image ret = self.cam.MV_CC_StopGrabbing() if ret != 0: print("stop grabbing fail! ret[0x%x]" % ret) sys.exit() # ch:关闭设备 | Close device ret = self.cam.MV_CC_CloseDevice() if ret != 0: print("close deivce fail! ret[0x%x]" % ret) # ch:销毁句柄 | Destroy handle ret = self.cam.MV_CC_DestroyHandle() if ret != 0: print("destroy handle fail! ret[0x%x]" % ret) def work_thread(self,cam=0, pData=0, nDataSize=0): stFrameInfo = MV_FRAME_OUT_INFO_EX() memset(byref(stFrameInfo), 0, sizeof(stFrameInfo)) while True: QIm = np.asarray(pData) # 将c_ubyte_Array转化成ndarray得到(3686400,) QIm = QIm.reshape((2048, 3072, 1)) # 根据自己分辨率进行转化 # print(temp) # print(temp.shape) QIm = cv2.cvtColor(QIm, cv2.COLOR_BGR2RGB) # 这一步获取到的颜色不对,因为默认是BRG,要转化成RGB,颜色才正常 pyrD1=cv2.pyrDown(QIm) #向下取样 pyrD2 = cv2.pyrDown(pyrD1) # 向下取样 image_height, image_width, image_depth = pyrD2.shape # 读取图像高宽深度 pyrD3 = QImage(pyrD2, image_width, image_height, image_width * image_depth,QImage.Format_RGB888) self.label.setPixmap(QPixmap.fromImage(pyrD3)) #cv2.namedWindow("result", cv2.WINDOW_AUTOSIZE) #cv2.imshow("result", temp) #if cv2.waitKey(1) & 0xFF == ord('q'): # break ret = cam.MV_CC_GetOneFrameTimeout(pData, nDataSize, stFrameInfo, 1000) if ret == 0: print("get one frame: Width[%d], Height[%d], nFrameNum[%d]" % ( stFrameInfo.nWidth, stFrameInfo.nHeight, stFrameInfo.nFrameNum)) else: print("no data[0x%x]" % ret) if self.g_bExit == True: del pData break #获得所有相机的列表存入cmbSelectDevice中 def SelectDevice(self): '''选择所有能用的相机到列表中, gige相机需要配合 sdk 得到。 ''' #得到相机列表 tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE # ch:枚举设备 | en:Enum device ret = MvCamera.MV_CC_EnumDevices(tlayerType, self.deviceList) if ret != 0: print("enum devices fail! ret[0x%x]" % ret) sys.exit() if self.deviceList.nDeviceNum == 0: print("find no device!") sys.exit() print("Find %d devices!" % self.deviceList.nDeviceNum) for i in range(0, self.deviceList.nDeviceNum): mvcc_dev_info = cast(self.deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE: print("\ngige device: [%d]" % i) strModeName = "" for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName: strModeName = strModeName + chr(per) print("device model name: %s" % strModeName) self.camSelect.addItem(strModeName,i) #写入设备列表。 def pushbutton_function(self): #do some things Img=cv2.imread('JP1.JPG') #通过opencv读入一张图片 image_height, image_width, image_depth=Img.shape #读取图像高宽深度 QIm=cv2.cvtColor(Img,cv2.COLOR_BGR2RGB) QIm=QImage(QIm.data, image_width, image_height, image_width * image_depth,QImage.Format_RGB888) self.label.setPixmap(QPixmap.fromImage(QIm)) if __name__ == '__main__': app = QtWidgets.QApplication(sys.argv) window = mywindow() window.show() sys.exit(app.exec_())

 

上述代码中用到的 Ui_MainWindow.py,放到 MvImport 目录下。

代码如下:

# -*- coding: utf-8 -*- # Form implementation generated from reading ui file 'mainWindow.ui' # # Created by: PyQt5 UI code generator 5.15.0 # # WARNING: Any manual changes made to this file will be lost when pyuic5 is # run again. Do not edit this file unless you know what you are doing. from PyQt5 import QtCore, QtGui, QtWidgets class Ui_MainWindow(object): def setupUi(self, MainWindow): MainWindow.setObjectName("MainWindow") MainWindow.resize(589, 530) self.centralwidget = QtWidgets.QWidget(MainWindow) self.centralwidget.setObjectName("centralwidget") self.verticalLayout = QtWidgets.QVBoxLayout(self.centralwidget) self.verticalLayout.setObjectName("verticalLayout") self.camSelect = QtWidgets.QComboBox(self.centralwidget) self.camSelect.setObjectName("camSelect") self.verticalLayout.addWidget(self.camSelect) self.label = QtWidgets.QLabel(self.centralwidget) self.label.setObjectName("label") self.verticalLayout.addWidget(self.label) self.butopenCam = QtWidgets.QPushButton(self.centralwidget) self.butopenCam.setObjectName("butopenCam") self.verticalLayout.addWidget(self.butopenCam) self.butcloseCam = QtWidgets.QPushButton(self.centralwidget) self.butcloseCam.setObjectName("butcloseCam") self.verticalLayout.addWidget(self.butcloseCam) MainWindow.setCentralWidget(self.centralwidget) self.menubar = QtWidgets.QMenuBar(MainWindow) self.menubar.setGeometry(QtCore.QRect(0, 0, 589, 23)) self.menubar.setObjectName("menubar") MainWindow.setMenuBar(self.menubar) self.statusbar = QtWidgets.QStatusBar(MainWindow) self.statusbar.setObjectName("statusbar") MainWindow.setStatusBar(self.statusbar) self.retranslateUi(MainWindow) QtCore.QMetaObject.connectSlotsByName(MainWindow) def retranslateUi(self, MainWindow): _translate = QtCore.QCoreApplication.translate MainWindow.setWindowTitle(_translate("MainWindow", "MainWindow")) self.label.setText(_translate("MainWindow", "TextLabel")) self.butopenCam.setText(_translate("MainWindow", "打开相机")) self.butcloseCam.setText(_translate("MainWindow", "关闭相机"))

 

在python 虚拟环境中,执行 python TestVideoStream.py 运行,可以看到如下视频中的效果:

    如果运行过程中提示模块不存在,可以通过 pip install 命令安装相应的模块,建议使用清华源,如 安装 opencv-python,命令如下:

pip install opencv-python==4.1.2.30 -i https://pypi.tuna.tsinghua.edu.cn/simple

    通过视频中的效果可以看出:虽然实现了视频流的实时获取和显示,但是无法直接对接前端展示,不够友好。 

 

四、最终代码(完整版)

   之前的两个测试代码,仅作为测试参考,并非最终代码!!!

   此段落中的代码,才是最终代码,可以重点参考!!!

    使用 Python + OpenCV + Flask 实现,可以满足视频实时获取,并返回通过GET请求返回给前端进行实时显示,也可以抓取图片,保存上传。

 

    1、添加依赖文件

    将 C:\Program Files (x86)\MVS\Development\Samples\Python\ 目录下的 MvImport 目录,copy到自己的工程目录下。

   

    2、安装插件 DirectShow     1)进入第三方插件 DirectShow 路径 cd C:\Program Files (x86)\MVS\Development\ThirdPartyPlatformAdapter\DirectShow\x64\MvDSS2

 

    2)安装 DirectShow

    使用 管理员权限 运行 InstallDSSvc_x64.bat

    (本人使用的是64位Windows操作系统,运行代码也是基于64位运行,故而使用该版本;32位系统在上一目录中也存在,大家可以根据实际情况进行安装)

 

    3、安装依赖模块     1)安装 opencv-python pip install opencv-python==4.1.2.30 -i https://pypi.tuna.tsinghua.edu.cn/simple

    个人使用的是 Python 3.8.5,opencv-python 对应使用 4.1.2.30 版本即可。

     如果使用的 opencv-python 的版本过高,可能报如下错误:

cv2.error: Unknown C++ exception from OpenCV code.

 

    2)安装 Flask pip install flask -i https://pypi.tuna.tsinghua.edu.cn/simple

    也可以指定 Flask 的版本,如:

pip install flask=2.2.2 -i https://pypi.tuna.tsinghua.edu.cn/simple

 

    4、核心代码

    工程代码结构如下:

  

    0)index.html

    在工程中创建 templates 目录,用于存放 index.hml

拍照预览         1)JsonResponse.py

    用于规范返回给前端的数据类型,代码如下:

# -- coding: utf-8 -- class JsonResponse(object): """ 统一的json返回格式 """ def __init__(self, code, msg, data): self.code = code self.msg = msg self.data = data @classmethod def success(cls, code=0, msg='success', data=None): return cls(code, msg, data) @classmethod def error(cls, code=-1, msg='error', data=None): return cls(code, msg, data) def to_dict(self): return { "code": self.code, "msg": self.msg, "data": self.data }

 

    2)JsonFlask.py

    用于重定义数据返回格式,代码如下:

# -- coding: utf-8 -- from flask import Flask, jsonify from JsonResponse import * class JsonFlask(Flask): def make_response(self, rv): """视图函数可以直接返回: list、dict、None""" if rv is None or isinstance(rv, (list, dict)): rv = JsonResponse.success(rv) if isinstance(rv, JsonResponse): rv = jsonify(rv.to_dict()) return super().make_response(rv)

 

    3)HikRobotCamera.py

    核心代码文件,实现如下功能:

    开始预览视频流、停止预览视频流、获取图片、记录日志等功能。

 

    代码如下:

# -- coding: utf-8 -- import cv2 from flask import Flask, render_template, Response import sys import msvcrt import base64 import datetime import logging sys.path.append("./MvImport") from MvCameraControl_class import * from JsonResponse import * from JsonFlask import * logging.basicConfig(level=logging.DEBUG,#控制台打印的日志级别 filename='hikrobot.log', filemode='a',##模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志 #a是追加模式,默认如果不写的话,就是追加模式 format= '%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s' #日志格式 ) # 这里配置一下 template_folder为当前目录,不然可以找不到 index.html app = JsonFlask(__name__, template_folder='.') # index @app.route('/') def index(): return render_template('./templates/index.html') # 获取码流 def generate(cap): # 捕获异常信息 try: while True: # 如果是关闭相机,先退出取视频流的循环 global open if (not open): break; retgrab = cap.grab() if retgrab == True: logging.debug("Grab true") ret1, frame = cap.retrieve() # print(type(frame)) if frame is None: logging.error("frame is None") continue ret1, jpeg = cv2.imencode('.jpg', frame) jpg_frame = jpeg.tobytes() yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + jpg_frame + b'\r\n') except Exception as e: logging.error("generate error: %s" % str(e)) # 开始预览 @app.route('/startPreview') def startPreview(): logging.info("======================================") logging.info("start to preview video stream, current_time: " + str(datetime.datetime.now())) # 全局变量,用于控制获取视频流的开关状态 global open open = True # 全局变量,获取视频连接 global cap cap = cv2.VideoCapture(1) if False == cap.isOpened(): logging.error("can't open camera") quit() else: logging.info("start to open camera") logging.info("open camera ok") # 分辨率设置 3072*2048(海康机器人工业相机 MV-CU060-10GM) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 3072) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 2048) # 帧率配置 cap.set(cv2.CAP_PROP_FPS, 15) return Response(generate(cap), mimetype='multipart/x-mixed-replace;boundary=frame') # 停止预览 @app.route('/stopPreview') def stopPreview(): logging.info("======================================") logging.info("stop to preview video stream, current_time: " + str(datetime.datetime.now())) logging.info("start to close camera") # 全局变量,用于停止循环 global open open = False logging.info("release resources start") # 全局变量,用于释放相机资源 try: global cap cap.release() cv2.destroyAllWindows() except Exception as e: logging.error("stopPreview error: %s" % str(e)) logging.info("release resources end") logging.info("camera closed successfully, current_time: " + str(datetime.datetime.now())) logging.info("======================================") return "stop to preview" @app.route('/openAndSave') def openAndSave(): logging.info("======================================") logging.info("start to grab image, current_time: " + str(datetime.datetime.now())) code = 100000 msg = "连接相机时发生错误" # img_base64 = None try: deviceList = MV_CC_DEVICE_INFO_LIST() tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE # ch:枚举设备 | en:Enum device ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList) if ret != 0: logging.error("enum devices fail! ret[0x%x]" % ret) sys.exit() if deviceList.nDeviceNum == 0: logging.error("find no device!") sys.exit() logging.info("find %d devices!" % deviceList.nDeviceNum) for i in range(0, deviceList.nDeviceNum): mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE: logging.info("\ngige device: [%d]" % i) strModeName = "" for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName: strModeName = strModeName + chr(per) logging.info("device model name: %s" % strModeName) nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24) nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16) nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8) nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff) logging.info("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4)) elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE: logging.info("\nu3v device: [%d]" % i) strModeName = "" for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName: if per == 0: break strModeName = strModeName + chr(per) logging.info("device model name: %s" % strModeName) strSerialNumber = "" for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber: if per == 0: break strSerialNumber = strSerialNumber + chr(per) logging.info("user serial number: %s" % strSerialNumber) nConnectionNum = 0 if int(nConnectionNum) >= deviceList.nDeviceNum: logging.error("intput error!") sys.exit() # ch:创建相机实例 | en:Creat Camera Object cam = MvCamera() # ch:选择设备并创建句柄 | en:Select device and create handle stDeviceList = cast(deviceList.pDeviceInfo[int(nConnectionNum)], POINTER(MV_CC_DEVICE_INFO)).contents ret = cam.MV_CC_CreateHandle(stDeviceList) if ret != 0: logging.error("create handle fail! ret[0x%x]" % ret) sys.exit() # ch:打开设备 | en:Open device ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0) if ret != 0: logging.error("open device fail! ret[0x%x]" % ret) sys.exit() # ch:探测网络最佳包大小(只对GigE相机有效) | en:Detection network optimal package size(It only works for the GigE camera) if stDeviceList.nTLayerType == MV_GIGE_DEVICE: nPacketSize = cam.MV_CC_GetOptimalPacketSize() if int(nPacketSize) > 0: ret = cam.MV_CC_SetIntValue("GevSCPSPacketSize", nPacketSize) if ret != 0: logging.warn("Warning: Set Packet Size fail! ret[0x%x]" % ret) else: logging.warn("Warning: Get Packet Size fail! ret[0x%x]" % nPacketSize) # ch:设置触发模式为off | en:Set trigger mode as off ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF) if ret != 0: logging.error("set trigger mode fail! ret[0x%x]" % ret) sys.exit() # ch:获取数据包大小 | en:Get payload size stParam = MVCC_INTVALUE() memset(byref(stParam), 0, sizeof(MVCC_INTVALUE)) ret = cam.MV_CC_GetIntValue("PayloadSize", stParam) if ret != 0: logging.error("get payload size fail! ret[0x%x]" % ret) sys.exit() nPayloadSize = stParam.nCurValue # ch:开始取流 | en:Start grab image ret = cam.MV_CC_StartGrabbing() if ret != 0: logging.error("start grabbing fail! ret[0x%x]" % ret) sys.exit() stDeviceList = MV_FRAME_OUT_INFO_EX() memset(byref(stDeviceList), 0, sizeof(stDeviceList)) data_buf = (c_ubyte * nPayloadSize)() ret = cam.MV_CC_GetOneFrameTimeout(byref(data_buf), nPayloadSize, stDeviceList, 1000) if ret == 0: logging.info("get one frame: Width[%d], Height[%d], nFrameNum[%d]" % (stDeviceList.nWidth, stDeviceList.nHeight, stDeviceList.nFrameNum)) nRGBSize = stDeviceList.nWidth * stDeviceList.nHeight * 3 stConvertParam=MV_SAVE_IMAGE_PARAM_EX() stConvertParam.nWidth = stDeviceList.nWidth stConvertParam.nHeight = stDeviceList.nHeight stConvertParam.pData = data_buf stConvertParam.nDataLen = stDeviceList.nFrameLen stConvertParam.enPixelType = stDeviceList.enPixelType stConvertParam.nImageLen = stConvertParam.nDataLen stConvertParam.nJpgQuality = 70 stConvertParam.enImageType = MV_Image_Jpeg stConvertParam.pImageBuffer = (c_ubyte * nRGBSize)() stConvertParam.nBufferSize = nRGBSize # ret = cam.MV_CC_ConvertPixelType(stConvertParam) logging.info("nImageLen: %d" % stConvertParam.nImageLen) ret = cam.MV_CC_SaveImageEx2(stConvertParam) if ret != 0: logging.error("convert pixel fail ! ret[0x%x]" % ret) del data_buf sys.exit() #file_path = "AfterConvert_RGB2.jpg" #file_open = open(file_path, 'wb+') #file_open = open(file_path.encode('utf8'), 'wb') img_buff = (c_ubyte * stConvertParam.nImageLen)() cdll.msvcrmcpy(byref(img_buff), stConvertParam.pImageBuffer, stConvertParam.nImageLen) #file_open.write(img_buff) # 对返回的图片进行 base64 格式转换 img_base64 = "data:image/jpg;base64," + str(base64.b64encode(img_buff)).split("'")[1] code = 200 msg = "success" logging.info("Save Image succeed!") # ch:停止取流 | en:Stop grab image ret = cam.MV_CC_StopGrabbing() if ret != 0: logging.error("stop grabbing fail! ret[0x%x]" % ret) del data_buf sys.exit() # ch:关闭设备 | Close device ret = cam.MV_CC_CloseDevice() if ret != 0: logging.error("close deivce fail! ret[0x%x]" % ret) del data_buf sys.exit() # ch:销毁句柄 | Destroy handle ret = cam.MV_CC_DestroyHandle() if ret != 0: logging.error("destroy handle fail! ret[0x%x]" % ret) del data_buf sys.exit() del data_buf except Exception as e: logging.error("openAndSave error: %s" % str(e)) # print("openAndSave finished, current_time: " + str(datetime.datetime.now())) return JsonResponse(code, msg, img_base64) # 执行web服务, 端口号可自行修订 logging.info("start to run camera app, current_time: " + str(datetime.datetime.now())) app.run(host='0.0.0.0', port=65432, debug=True, threaded=True)

 

    5、运行代码

    进入Python虚拟环境,执行如下命令:

python HikRobotCamera.py

 

    6、功能验证     1)开始预览

    可以将 http://127.0.0.1:65432/startPreview 在前端用 img 标签 显示视频的实时预览效果。

http://127.0.0.1:65432/startPreview

 

    2)停止预览

    由于使用的这款海康机器人工业相机(MV-CU060-10GM)只能创建一个连接,所以,当预览完实时视频,需要调用该接口释放相机资源,避免资源被长期占用。

http://127.0.0.1:65432/stopPreview

 

    3)获取图片

    返回 base64 格式 的图片,前端可以直接接收显示,调用上传接口保存。

http://127.0.0.1:65432/openAndSave

 

    7、运行效果

    由于网速问题和相机没有光圈,相机的拍摄效果有点不清晰。运行效果如下:

 

    8、日志查看

    工程目录下回生产日志文件 hikrobot.log,内容如下:

    

9、彩蛋

HikRobotCamera.py 的改良优化版,对获取图片方法做了简化,更为简洁,代码如下:

# -- coding: utf-8 -- import cv2 from flask import Flask, render_template, Response import sys import msvcrt import base64 import datetime import logging sys.path.append("./MvImport") from MvCameraControl_class import * from JsonResponse import * from JsonFlask import * logging.basicConfig(level=logging.DEBUG,#控制台打印的日志级别 filename='hikrobot.log', filemode='a',##模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志 #a是追加模式,默认如果不写的话,就是追加模式 format= '%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s' #日志格式 ) # 这里配置一下 template_folder为当前目录,不然可以找不到 index.html app = JsonFlask(__name__, template_folder='.') # index @app.route('/') def index(): return render_template('./templates/index.html') # 获取码流 def generate(cap): # 捕获异常信息 try: while True: # 如果是关闭相机,先退出取视频流的循环 global open if (not open): break; retgrab = cap.grab() if retgrab == True: logging.debug("Grab true") ret1, frame = cap.retrieve() # print(type(frame)) if frame is None: logging.error("frame is None") continue ret1, jpeg = cv2.imencode('.jpg', frame) jpg_frame = jpeg.tobytes() yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + jpg_frame + b'\r\n') except Exception as e: logging.error("generate error: %s" % str(e)) # 开始预览 @app.route('/startPreview') def startPreview(): logging.info("================== startPreview start ====================") logging.info("start to preview video stream, current_time: " + str(datetime.datetime.now())) # 全局变量,用于控制获取视频流的开关状态 try: global open open = True # 全局变量,获取视频连接 global cap cap = cv2.VideoCapture(1) if False == cap.isOpened(): logging.error("startPreview -- can't open camera") quit() else: logging.info("startPreview -- start to open camera") logging.info("startPreview -- open camera ok") # 分辨率设置 3072*2048(海康机器人工业相机 MV-CU060-10GM) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 3072) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 2048) # 帧率配置 cap.set(cv2.CAP_PROP_FPS, 15) response = Response(generate(cap), mimetype='multipart/x-mixed-replace;boundary=frame') except Exception as e: logging.error("startPreview error: %s" % str(e)) return response # 停止预览 @app.route('/stopPreview') def stopPreview(): logging.info("================== stopPreview start ====================") logging.info("stop to preview video stream, current_time: " + str(datetime.datetime.now())) logging.info("start to close camera") # 全局变量,用于停止循环 global open open = False logging.info("release resources start") # 全局变量,用于释放相机资源 try: global cap cap.release() cv2.destroyAllWindows() except Exception as e: logging.error("stopPreview error: %s" % str(e)) logging.info("release resources end") logging.info("camera closed successfully, current_time: " + str(datetime.datetime.now())) logging.info("=================== stopPreview end ===================") return "stop to preview" # 获取base64图片 @app.route('/grabImage') def grabImage(): code = 100000 msg = "连接相机时发生错误" # 捕获异常信息 try: print("grabImage -- stopPreview start") stopPreview() print("grabImage -- stopPreview end") # 全局变量,获取视频连接 global cap cap = cv2.VideoCapture(1) if False == cap.isOpened(): logging.error("can't open camera") quit() else: logging.info("grabImage -- start to open camera") logging.info("grabImage -- open camera ok") # 分辨率设置 3072*2048(海康机器人工业相机 MV-CU060-10GM) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 3072) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 2048) # 帧率配置 cap.set(cv2.CAP_PROP_FPS, 15) retgrab1 = cap.grab() if retgrab1 == True: logging.debug("grabImage -- Grab true") ret1, frame = cap.retrieve() ret1, jpeg = cv2.imencode('.jpg', frame) img_buff = jpeg.tobytes() # 对返回的图片进行 base64 格式转换 img_base64 = "data:image/jpg;base64," + str(base64.b64encode(img_buff)).split("'")[1] code = 200 msg = "success" stopPreview() except Exception as e: logging.error("generate error: %s" % str(e)) return JsonResponse(code, msg, img_base64) # 获取base64图片 @app.route('/hik/openAndSave') def openAndSave(): code = 100000 msg = "连接相机时发生错误" # 捕获异常信息 try: logging.info("================= openAndSave start =====================") # 拍照之前,先停止预览 stopPreview() logging.info("start to grab image, current_time: " + str(datetime.datetime.now())) # 全局变量,获取视频连接 global cap cap = cv2.VideoCapture(1) if False == cap.isOpened(): logging.error("openAndSave -- can't open camera") quit() else: logging.info("openAndSave -- start to open camera") logging.info("openAndSave -- open camera ok") # 分辨率设置 3072*2048(海康机器人工业相机 MV-CU060-10GM) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 3072) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 2048) # 帧率配置 cap.set(cv2.CAP_PROP_FPS, 15) retgrab1 = cap.grab() if retgrab1 == True: logging.debug("openAndSave -- Grab true") ret1, frame = cap.retrieve() ret1, jpeg = cv2.imencode('.jpg', frame) img_buff = jpeg.tobytes() # 对返回的图片进行 base64 格式转换 img_base64 = "data:image/jpg;base64," + str(base64.b64encode(img_buff)).split("'")[1] code = 200 msg = "success" stopPreview() logging.info("================= openAndSave end =====================") except Exception as e: logging.error("openAndSave error: %s" % str(e)) return JsonResponse(code, msg, img_base64) @app.route('/hik/openAndSave2') def openAndSave2(): # 拍照之前,先停止预览 stopPreview() logging.info("======================================") logging.info("start to grab image, current_time: " + str(datetime.datetime.now())) code = 100000 msg = "连接相机时发生错误" # img_base64 = None try: deviceList = MV_CC_DEVICE_INFO_LIST() tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE # ch:枚举设备 | en:Enum device ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList) if ret != 0: logging.error("enum devices fail! ret[0x%x]" % ret) sys.exit() if deviceList.nDeviceNum == 0: logging.error("find no device!") sys.exit() logging.info("find %d devices!" % deviceList.nDeviceNum) for i in range(0, deviceList.nDeviceNum): mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE: logging.info("\ngige device: [%d]" % i) strModeName = "" for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName: strModeName = strModeName + chr(per) logging.info("device model name: %s" % strModeName) nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24) nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16) nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8) nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff) logging.info("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4)) elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE: logging.info("\nu3v device: [%d]" % i) strModeName = "" for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName: if per == 0: break strModeName = strModeName + chr(per) logging.info("device model name: %s" % strModeName) strSerialNumber = "" for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber: if per == 0: break strSerialNumber = strSerialNumber + chr(per) logging.info("user serial number: %s" % strSerialNumber) nConnectionNum = 0 if int(nConnectionNum) >= deviceList.nDeviceNum: logging.error("intput error!") sys.exit() # ch:创建相机实例 | en:Creat Camera Object cam = MvCamera() # ch:选择设备并创建句柄 | en:Select device and create handle stDeviceList = cast(deviceList.pDeviceInfo[int(nConnectionNum)], POINTER(MV_CC_DEVICE_INFO)).contents ret = cam.MV_CC_CreateHandle(stDeviceList) if ret != 0: logging.error("create handle fail! ret[0x%x]" % ret) sys.exit() # ch:打开设备 | en:Open device ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0) if ret != 0: logging.error("open device fail! ret[0x%x]" % ret) sys.exit() # ch:探测网络最佳包大小(只对GigE相机有效) | en:Detection network optimal package size(It only works for the GigE camera) if stDeviceList.nTLayerType == MV_GIGE_DEVICE: nPacketSize = cam.MV_CC_GetOptimalPacketSize() if int(nPacketSize) > 0: ret = cam.MV_CC_SetIntValue("GevSCPSPacketSize", nPacketSize) if ret != 0: logging.warn("Warning: Set Packet Size fail! ret[0x%x]" % ret) else: logging.warn("Warning: Get Packet Size fail! ret[0x%x]" % nPacketSize) # ch:设置触发模式为off | en:Set trigger mode as off ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF) if ret != 0: logging.error("set trigger mode fail! ret[0x%x]" % ret) sys.exit() # ch:获取数据包大小 | en:Get payload size stParam = MVCC_INTVALUE() memset(byref(stParam), 0, sizeof(MVCC_INTVALUE)) ret = cam.MV_CC_GetIntValue("PayloadSize", stParam) if ret != 0: logging.error("get payload size fail! ret[0x%x]" % ret) sys.exit() nPayloadSize = stParam.nCurValue # ch:开始取流 | en:Start grab image ret = cam.MV_CC_StartGrabbing() if ret != 0: logging.error("start grabbing fail! ret[0x%x]" % ret) sys.exit() stDeviceList = MV_FRAME_OUT_INFO_EX() memset(byref(stDeviceList), 0, sizeof(stDeviceList)) data_buf = (c_ubyte * nPayloadSize)() ret = cam.MV_CC_GetOneFrameTimeout(byref(data_buf), nPayloadSize, stDeviceList, 1000) if ret == 0: logging.info("get one frame: Width[%d], Height[%d], nFrameNum[%d]" % (stDeviceList.nWidth, stDeviceList.nHeight, stDeviceList.nFrameNum)) nRGBSize = stDeviceList.nWidth * stDeviceList.nHeight * 3 stConvertParam=MV_SAVE_IMAGE_PARAM_EX() stConvertParam.nWidth = stDeviceList.nWidth stConvertParam.nHeight = stDeviceList.nHeight stConvertParam.pData = data_buf stConvertParam.nDataLen = stDeviceList.nFrameLen stConvertParam.enPixelType = stDeviceList.enPixelType stConvertParam.nImageLen = stConvertParam.nDataLen stConvertParam.nJpgQuality = 70 stConvertParam.enImageType = MV_Image_Jpeg stConvertParam.pImageBuffer = (c_ubyte * nRGBSize)() stConvertParam.nBufferSize = nRGBSize # ret = cam.MV_CC_ConvertPixelType(stConvertParam) logging.info("nImageLen: %d" % stConvertParam.nImageLen) ret = cam.MV_CC_SaveImageEx2(stConvertParam) if ret != 0: logging.error("convert pixel fail ! ret[0x%x]" % ret) del data_buf sys.exit() #file_path = "AfterConvert_RGB2.jpg" #file_open = open(file_path, 'wb+') #file_open = open(file_path.encode('utf8'), 'wb') img_buff = (c_ubyte * stConvertParam.nImageLen)() cdll.msvcrmcpy(byref(img_buff), stConvertParam.pImageBuffer, stConvertParam.nImageLen) #file_open.write(img_buff) # 对返回的图片进行 base64 格式转换 img_base64 = "data:image/jpg;base64," + str(base64.b64encode(img_buff)).split("'")[1] code = 200 msg = "success" logging.info("Save Image succeed!") # ch:停止取流 | en:Stop grab image ret = cam.MV_CC_StopGrabbing() if ret != 0: logging.error("stop grabbing fail! ret[0x%x]" % ret) del data_buf sys.exit() # ch:关闭设备 | Close device ret = cam.MV_CC_CloseDevice() if ret != 0: logging.error("close deivce fail! ret[0x%x]" % ret) del data_buf sys.exit() # ch:销毁句柄 | Destroy handle ret = cam.MV_CC_DestroyHandle() if ret != 0: logging.error("destroy handle fail! ret[0x%x]" % ret) del data_buf sys.exit() del data_buf except Exception as e: logging.error("openAndSave error: %s" % str(e)) # print("openAndSave finished, current_time: " + str(datetime.datetime.now())) return JsonResponse(code, msg, img_base64) # 执行web服务, 端口号可自行修订 logging.info("start to run camera app, current_time: " + str(datetime.datetime.now())) if __name__ == '__main__': try: app.run(host='0.0.0.0', port=65432, debug=True, threaded=True) except Exception as e: logging.error("app error: %s" % str(e))

 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3