微信定时发送倒数日 微信定时发送倒数日是基于Github项目WeChatFerry 实现的 这个项目实现的功能:
检查登录状态
获取登录账号的 wxid
获取消息类型
获取所有联系人
获取所有好友
获取数据库
获取某数据库下的表
获取用户信息
发送文本消息(可 @)
发送图片
发送文件
允许接收消息
停止接收消息
执行 SQL 查询
接受好友申请
添加群成员
删除群成员
解密图片
获取朋友圈消息
保存图片
保存语音
发送卡片消息
拍一拍群友
邀请群成员
图片 OCR
同时支持多个客户端:
对于HTTP客户端:
1 2 3 4 5 6 7 8 9 10 11 12 pip install --upgrade wcfhttp wcfhttp -v wcfhttp -h wcfhttp wcfhttp --cb http://your_host:your_port/callback
访问http://your_host:your_port/docs
就可以查看接口文档
对于Python客户端:
1 2 pip install --upgrade wcfhttp
接口信息 示例程序
实现过程 保存联系人列表 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def save_contacts (wcf ): data = wcf.get_contacts() for i in data: wxid = i['wxid' ] code = i['code' ] name = i['name' ] if i['wxid' ].startswith('wxid_' ): contacts_dict[wxid] = [name, code] elif i['wxid' ].endswith('@chatroom' ): chatroom_dict[wxid] = [name, code] with open ('contacts.csv' , 'w' , encoding='utf-8' ) as f: for i in contacts_dict: f.write(f'{i} ,{contacts_dict[i][0 ]} ,{contacts_dict[i][1 ]} \n' ) with open ('chatroom.csv' , 'w' , encoding='utf-8' ) as f: for i in chatroom_dict: f.write(f'{i} ,{chatroom_dict[i][0 ]} ,{chatroom_dict[i][1 ]} \n' )
有关倒数日的操作 获取倒数日
1 2 3 4 5 6 7 8 9 10 11 def get_countdown (): if not os.path.exists('countdown.csv' ): with open ('countdown.csv' , 'w' , encoding='utf-8' ) as f: pass with open ('countdown.csv' , 'r' , encoding='utf-8' ) as f: for i in f: title, date = i.strip().split(',' ) countdown_day[title] = date print ('[+] 获取倒数日成功' )
保存倒数日
1 2 3 4 5 6 def save_countdown (): with open ('countdown.csv' , 'w' , encoding='utf-8' ) as f: for i in countdown_day: f.write(f'{i} ,{countdown_day[i]} \n' ) print ('[+] 保存倒数日成功' )
处理倒数日
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def process_countdown (wcf ): while True : try : if time.strftime('%H:%M' , time.localtime()) == time.strftime('%H:%M' , remind_time): content = f'今天是{time.strftime("%Y-%m-%d" , time.localtime())} \n\n' for i in countdown_day: days = (datetime.datetime.strptime(countdown_day[i], '%Y-%m-%d' ) - datetime.datetime.strptime(time.strftime('%Y-%m-%d' , time.localtime()), '%Y-%m-%d' )).days content += f'[+] [{i} ]\n' content += f'[{countdown_day[i]} ] [还剩{days} 天]\n\n' if days == 0 : countdown_day.pop(i) for j in send_wxid: wcf.send_text(content, j) print ('[+] 发送倒数日成功' ) time.sleep(60 ) except Exception as e: print ('[-] 处理倒数日错误:' , e)
添加倒数日
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def add_countdown (wcf, msg ): stop_date = msg.content.split(' ' )[2 ] title = msg.content.split(' ' )[1 ] try : time.strptime(stop_date, '%Y-%m-%d' ) if title in countdown_day: wcf.send_text(f'[-] 倒数日 {title} 已存在' , msg.sender) return if (datetime.datetime.strptime(stop_date, '%Y-%m-%d' ) - datetime.datetime.strptime(time.strftime('%Y-%m-%d' , time.localtime()), '%Y-%m-%d' )).days < 0 : wcf.send_text(f'[-] 倒数日 {title} 时间已过' , msg.sender) return except Exception as e: print ('[-] 添加倒数日错误:' , e) wcf.send_text('[-] 添加倒数日错误' , msg.sender) else : countdown_day[title] = stop_date print (f'[+] {msg.sender} 添加倒数日 {title} {stop_date} ' ) wcf.send_text(f'[+] 添加倒数日 {title} 截止日期 {stop_date} ' , msg.sender)
删除倒数日
1 2 3 4 5 6 7 8 9 10 11 def del_countdown (wcf, msg ): title = msg.content.split(' ' )[1 ] try : countdown_day.pop(title) except Exception as e: print ('[-] 删除倒数日错误:' , e) wcf.send_text('[-] 删除倒数日错误' , msg.sender) else : print (f'[+] {msg.sender} 删除倒数日 {title} ' ) wcf.send_text(f'[+] 删除倒数日 {title} ' , msg.sender)
列出倒数日
1 2 3 4 5 6 7 8 9 10 11 12 def list_countdown (wcf, msg ): content = f'今天是{time.strftime("%Y-%m-%d" , time.localtime())} \n\n' for i in countdown_day: days = (datetime.datetime.strptime(countdown_day[i], '%Y-%m-%d' ) - datetime.datetime.strptime(time.strftime('%Y-%m-%d' , time.localtime()), '%Y-%m-%d' )).days content += f'[+] [{i} ]\n' content += f'[{countdown_day[i]} ] [还剩{days} 天]\n\n' content += f'\n提醒时间为每天的 {time.strftime("%H:%M" , remind_time)} \n' content += f'发送倒数日的对象为 {send_wxid} ' print (f'[+] {msg.sender} 列出倒数日' ) wcf.send_text(content, msg.sender)
设置提醒时间
1 2 3 4 5 6 7 8 9 10 11 12 13 def set_remind_time (wcf, msg ): global remind_time try : time_ = msg.content.split(' ' )[1 ] remind_time = time.strptime(time_, '%H:%M' ) except Exception as e: print ('[-] 设置提醒时间错误:' , e) wcf.send_text('[-] 设置提醒时间错误' , msg.sender) else : print (f'[+] {msg.sender} 设置提醒时间为每天的 {time.strftime("%H:%M" , remind_time)} ' ) wcf.send_text(f'[+] 设置提醒时间为每天的 {time.strftime("%H:%M" , remind_time)} ' , msg.sender)
设置发送倒数日的对象
1 2 3 4 5 6 7 8 9 10 11 12 13 def update_send_wxid (wcf, msg ): global send_wxid try : if ',' in msg.content.split(' ' )[1 ]: send_wxid = msg.content.split(' ' )[1 ].split(',' ) else : send_wxid = [msg.content.split(' ' )[1 ]] except Exception as e: print ('[-] 设置发送倒数日的对象错误:' , e) wcf.send_text('[-] 设置发送倒数日的对象错误' , msg.sender) print (f'[+] {msg.sender} 设置发送倒数日的对象为 {send_wxid} ' ) wcf.send_text(f'[+] 设置发送倒数日的对象为 {send_wxid} ' , msg.sender)
处理发送的指令 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def process_msg (wcf ): while wcf.is_receiving_msg(): try : msg = wcf.get_msg() content = msg.content if content.startswith('$add' ): add_countdown(wcf, msg) elif content.startswith('$del' ): del_countdown(wcf, msg) elif content == '$list' : list_countdown(wcf, msg) elif content == '$help' : help_info(wcf, msg) elif content.startswith('$set' ): set_remind_time(wcf, msg) elif content.startswith('$send' ): update_send_wxid(wcf, msg) else : continue except Empty: continue except Exception as e: print ('[-] 处理消息错误:' , e)
主函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def main (): wcf = Wcf() save_contacts(wcf) get_countdown() Thread(target=process_countdown, args=(wcf,)).start() wcf.enable_receiving_msg() Thread(target=process_msg, args=(wcf,), daemon=True ).start() return wcf if __name__ == '__main__' : wcf = None try : wcf = main() wcf.keep_running() except KeyboardInterrupt: print ('[-] 服务停止' ) save_countdown() wcf.cleanup() sys.exit(0 )
实现成果
增加讯飞星火大模型 在上面的基础上,也可以接入一些AI大模型或者一些智能聊天的机器人 我这里使用讯飞星火V3.0模型接口,个人认证和免费体验一年,200万token讯飞星火认知大模型 星火认知大模型Web API文档 参考Web API的文档 实现的结果: