Python 使用 WeChatFerry 搭建部署微信机器人详细教程(更新中)

您所在的位置:网站首页 微信机器人是怎么弄的 Python 使用 WeChatFerry 搭建部署微信机器人详细教程(更新中)

Python 使用 WeChatFerry 搭建部署微信机器人详细教程(更新中)

2024-06-09 15:09| 来源: 网络整理| 查看: 265

下载安装 wcferry 库

通过 pip 快速安装 wcferry

pip install wcferry

免责声明:仅供学习和技术研究使用,不得用于任何商业或非法行为,否则后果自负。

基本原理

当微信收到消息时,抢在微信处理(显示到页面)前,先让工具处理,处理完之后再交还给原来的处理模块。需要发送消息时,模拟微信发送消息,组装好消息体,调用微信发送消息的模块。获取联系人,则是遍历一块特定的内存空间。通过好友验证,则是组装好验证信息,调用微信的验证模块。数据库相关功能,则是通过获取到数据库句柄,基于 sqlite3 的接口来执行。

在这里插入图片描述 博客地址:微信机器人 DIY 从 0 到 1

Wcf 函数说明

WeChatFerry:一个玩微信的工具(函数说明)

函数名称描述返回类型cleanup关闭连接 回收资源Nonekeep_running阻塞进程(让 RPC 一直维持连接)is_receiving_msg是否已启动接收消息功能boolget_qrcode获取登录二维码(已经登录则返回空字符串)stris_login检查登录状态boolget_self_wxid获取登录账号的 wxidstrget_msg_types获取所有消息类型Dictget_contacts获取所有联系人List[Dict]get_friends获取所有好友List[Dict]get_dbs获取数据库List[str]get_tables获取某数据库下的表List[Dict]get_user_info获取登录账号个人信息Dictget_audio_msg取语音消息并转成 MP3strsend_text发送文本消息(可 @)int_download_file下载文件str_process_path处理路径(如果是网络路径则下载文件)strsend_image发送图片(非线程安全)intsend_file发送文件(非线程安全)intsend_xml发送 XMLintsend_emotion发送表情intsend_rich_text发送富文本消息intsend_pat_msg拍一拍群友intforward_msg转发消息intget_msg从消息队列中获取消息WxMsgenable_receiving_msg允许接收消息boolenable_recv_msg允许接收消息(旧接口)booldisable_recv_msg停止接收消息boolquery_sql执行 SQL 查询List[Dict]accept_new_friend接受好友申请intrefresh_pyq刷新朋友圈intdownload_attach下载附件intget_info_by_wxid通过 wxid 查询微信号昵称等信息dictrevoke_msg撤回消息intdecrypt_image解密图片strget_ocr_result获取 OCR 结果strdownload_image下载图片stradd_chatroom_members添加群成员intdel_chatroom_members删除群成员intinvite_chatroom_members邀请群成员intget_chatroom_members获取群成员Dictget_alias_in_chatroom获取群名片strreceive_transfer接收转账int 电脑端检测登录微信 from wcferry import Wcf wcf = Wcf() 检测微信登陆状态

检查当前 PC 端微信登陆状态?

from wcferry import Wcf wcf = Wcf() print(wcf.is_login()) 获取登录账号信息

获取当前 PC 端微信账号信息?

from wcferry import Wcf wcf = Wcf() print(wcf.get_user_info())

运行结果

{'wxid': 'wxid_***', 'name': '字里行间', 'mobile': '195********', 'home': 'C:\\Users\\Administrator\\Documents\\WeChat Files\\'} 开辟线程监听群消息

开启线程监听消息:判断是否是群消息?

from queue import Empty from threading import Thread from wcferry import Wcf, WxMsg wcf = Wcf() def processMsg(msg: WxMsg): if msg.from_group(): print(msg.content) def enableReceivingMsg(): def innerWcFerryProcessMsg(): while wcf.is_receiving_msg(): try: msg = wcf.get_msg() processMsg(msg) except Empty: continue except Exception as e: print(f"ERROR: {e}") wcf.enable_receiving_msg() Thread(target=innerWcFerryProcessMsg, name="ListenMessageThread", daemon=True).start() enableReceivingMsg() wcf.keep_running() 微信消息属性说明 class WxMsg() 微信消息属性说明

属性说明

字段名类型描述typeint消息类型 可通过 get_msg_types 获取idstr消息 idxmlstr消息 xml 部分senderstr消息发送人roomidstr(仅群消息有)群 idcontentstr消息内容thumbstr视频或图片消息的缩略图路径extrastr视频或图片消息的路径

消息类型

from wcferry import Wcf wcf = Wcf() print(wcf.get_msg_types()) 消息类型编号消息类型描述属性0朋友圈消息int1文字int3图片int34语音int37好友确认int40POSSIBLEFRIEND_MSGint42名片int43视频int47石头剪刀布表情图片48位置int49共享实时位置、文件、转账、链接int50VOIPMSGint51微信初始化int52VOIPNOTIFYint53VOIPINVITEint62小视频int66微信红包int9999SYSNOTICEint10000红包、系统消息int10002撤回消息int1048625搜狗表情int16777265链接int436207665微信红包int536936497红包封面int754974769视频号视频int771751985视频号名片int822083633引用消息int922746929拍一拍int973078577视频号直播int974127153商品链接int975175729视频号直播int1040187441音乐链接int1090519089文件int 根据群名称查询群 wxid

特别注意:Wcf 没有提供根据群名称查询群 wxid 功能。我们可以先获取全部联系人数据(微信好友、微信群等等),基于 wxid 进行区分,因为微信群 wxid 后缀都是 “chatroom” 结尾。

from wcferry import Wcf, WxMsg wcf = Wcf() wcf_rooms = [] for contact in wcf.get_contacts(): if contact['wxid'].endswith("chatroom"): wcf_rooms.append(contact) def get_chatroom_roomid(wcf_rooms: list, room_name: str): for room in wcf_rooms: if room['name'] == room_name: return room['wxid'] return None room_id = get_chatroom_roomid(wcf_rooms=wcf_rooms, room_name="测试群") 定时发送群文件

如何进行定时发送群文件?通过 aspschedule 第三方库实现定时任务,然后调用 wcf.send_file 函数执行发送文件的消息。

from apscheduler.schedulers.background import BackgroundScheduler from datetime import datetime from wcferry import Wcf wcf = Wcf() wcf_rooms = [] for contact in wcf.get_contacts(): if contact['wxid'].endswith("chatroom"): wcf_rooms.append(contact) def get_chatroom_roomid(wcf_rooms: list, room_name: str): for room in wcf_rooms: if room['name'] == room_name: return room['wxid'] return None def schedule_task_job(room_id: str, wcf: Wcf): wcf.send_file(path="test.txt", receiver=room_id) customize_time = "2024-05-09 09:10:10" customize_room = "唤醒手腕测试群" run_date = datetime.strptime(customize_time, "%Y-%m-%d %H:%M:%S") room_id = get_chatroom_roomid(wcf_rooms, customize_room) scheduler = BackgroundScheduler() scheduler.add_job(schedule_task_job, args=(room_id, wcf), run_date=run_date) scheduler.start() wcf.keep_running()

运行结果

在这里插入图片描述

监听保存语音消息 from queue import Empty from threading import Thread from wcferry import Wcf, WxMsg wcf = Wcf() def processMsg(msg: WxMsg): if msg.from_group(): response = wcf.get_audio_msg(id=msg.id, dir=f"audio") print("语音地址:" + response) def enableReceivingMsg(): def innerWcFerryProcessMsg(): while wcf.is_receiving_msg(): try: msg = wcf.get_msg() processMsg(msg) except Empty: continue except Exception as e: print(f"ERROR: {e}") wcf.enable_receiving_msg() Thread(target=innerWcFerryProcessMsg, name="ListenMessageThread", daemon=True).start() enableReceivingMsg() wcf.keep_running() 更新中······


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3