微信定时发送倒数日

微信定时发送倒数日是基于Github项目WeChatFerry实现的
这个项目实现的功能:

  • 检查登录状态
  • 获取登录账号的 wxid
  • 获取消息类型
  • 获取所有联系人
  • 获取所有好友
  • 获取数据库
  • 获取某数据库下的表
  • 获取用户信息
  • 发送文本消息(可 @)
  • 发送图片
  • 发送文件
  • 允许接收消息
  • 停止接收消息
  • 执行 SQL 查询
  • 接受好友申请
  • 添加群成员
  • 删除群成员
  • 解密图片
  • 获取朋友圈消息
  • 保存图片
  • 保存语音
  • 发送卡片消息
  • 拍一拍群友
  • 邀请群成员
  • 图片 OCR

同时支持多个客户端:

  • Go
  • HTTP
  • Java
  • Python
  • Rust

对于HTTP客户端:

1
2
3
4
5
6
7
8
9
10
11
12
# 安装
pip install --upgrade wcfhttp

# 运行
# 查看版本
wcfhttp -v
# 查看帮助
wcfhttp -h
# 忽略新消息运行
wcfhttp
# 新消息转发到指定地址
wcfhttp --cb http://your_host:your_port/callback

访问http://your_host:your_port/docs就可以查看接口文档

对于Python客户端:

1
2
# 安装Python库
pip install --upgrade wcfhttp

接口信息
示例程序

实现过程

保存联系人列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def save_contacts(wcf):
# 保存联系人列表
data = wcf.get_contacts()
for i in data:
wxid = i['wxid']
code = i['code']
name = i['name']
if i['wxid'].startswith('wxid_'):
contacts_dict[wxid] = [name, code]
elif i['wxid'].endswith('@chatroom'):
chatroom_dict[wxid] = [name, code]
with open('contacts.csv', 'w', encoding='utf-8') as f:
for i in contacts_dict:
f.write(f'{i},{contacts_dict[i][0]},{contacts_dict[i][1]}\n')
with open('chatroom.csv', 'w', encoding='utf-8') as f:
for i in chatroom_dict:
f.write(f'{i},{chatroom_dict[i][0]},{chatroom_dict[i][1]}\n')

有关倒数日的操作

获取倒数日

1
2
3
4
5
6
7
8
9
10
11
def get_countdown():
# 获取倒数日
if not os.path.exists('countdown.csv'):
# 创建一个新的文件
with open('countdown.csv', 'w', encoding='utf-8') as f:
pass
with open('countdown.csv', 'r', encoding='utf-8') as f:
for i in f:
title, date = i.strip().split(',')
countdown_day[title] = date
print('[+] 获取倒数日成功')

保存倒数日

1
2
3
4
5
6
def save_countdown():
# 保存倒数日
with open('countdown.csv', 'w', encoding='utf-8') as f:
for i in countdown_day:
f.write(f'{i},{countdown_day[i]}\n')
print('[+] 保存倒数日成功')

处理倒数日

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def process_countdown(wcf):
# 处理倒数日
while True:
try:
if time.strftime('%H:%M', time.localtime()) == time.strftime('%H:%M', remind_time):
content = f'今天是{time.strftime("%Y-%m-%d", time.localtime())}\n\n'
for i in countdown_day:
# 计算还剩多少天
days = (datetime.datetime.strptime(countdown_day[i], '%Y-%m-%d') - datetime.datetime.strptime(time.strftime('%Y-%m-%d', time.localtime()), '%Y-%m-%d')).days
content += f'[+] [{i}]\n'
content += f'[{countdown_day[i]}] [还剩{days}天]\n\n'
# 如果是最后一天 则删除倒数日
if days == 0:
countdown_day.pop(i)
for j in send_wxid:
wcf.send_text(content, j)
print('[+] 发送倒数日成功')
time.sleep(60)
except Exception as e:
print('[-] 处理倒数日错误:', e)

添加倒数日

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def add_countdown(wcf, msg):
# 添加倒数日
stop_date = msg.content.split(' ')[2]
title = msg.content.split(' ')[1]
# 检查时间格式
try:
time.strptime(stop_date, '%Y-%m-%d')
# 如果倒数日已存在
if title in countdown_day:
wcf.send_text(f'[-] 倒数日 {title} 已存在', msg.sender)
return
# 如果时间已过
if (datetime.datetime.strptime(stop_date, '%Y-%m-%d') - datetime.datetime.strptime(time.strftime('%Y-%m-%d', time.localtime()), '%Y-%m-%d')).days < 0:
wcf.send_text(f'[-] 倒数日 {title} 时间已过', msg.sender)
return
except Exception as e:
print('[-] 添加倒数日错误:', e)
wcf.send_text('[-] 添加倒数日错误', msg.sender)
else:
countdown_day[title] = stop_date
print(f'[+] {msg.sender} 添加倒数日 {title} {stop_date}')
wcf.send_text(f'[+] 添加倒数日 {title} 截止日期 {stop_date}', msg.sender)

删除倒数日

1
2
3
4
5
6
7
8
9
10
11
def del_countdown(wcf, msg):
# 删除倒数日
title = msg.content.split(' ')[1]
try:
countdown_day.pop(title)
except Exception as e:
print('[-] 删除倒数日错误:', e)
wcf.send_text('[-] 删除倒数日错误', msg.sender)
else:
print(f'[+] {msg.sender} 删除倒数日 {title}')
wcf.send_text(f'[+] 删除倒数日 {title}', msg.sender)

列出倒数日

1
2
3
4
5
6
7
8
9
10
11
12
def list_countdown(wcf, msg):
# 列出倒数日
content = f'今天是{time.strftime("%Y-%m-%d", time.localtime())}\n\n'
for i in countdown_day:
# 计算还剩多少天
days = (datetime.datetime.strptime(countdown_day[i], '%Y-%m-%d') - datetime.datetime.strptime(time.strftime('%Y-%m-%d', time.localtime()), '%Y-%m-%d')).days
content += f'[+] [{i}]\n'
content += f'[{countdown_day[i]}] [还剩{days}天]\n\n'
content += f'\n提醒时间为每天的 {time.strftime("%H:%M", remind_time)}\n'
content += f'发送倒数日的对象为 {send_wxid}'
print(f'[+] {msg.sender} 列出倒数日')
wcf.send_text(content, msg.sender)

设置提醒时间

1
2
3
4
5
6
7
8
9
10
11
12
13
def set_remind_time(wcf, msg):
# 设置提醒时间
global remind_time
try:
time_ = msg.content.split(' ')[1]
# 检查时间格式
remind_time = time.strptime(time_, '%H:%M')
except Exception as e:
print('[-] 设置提醒时间错误:', e)
wcf.send_text('[-] 设置提醒时间错误', msg.sender)
else:
print(f'[+] {msg.sender} 设置提醒时间为每天的 {time.strftime("%H:%M", remind_time)}')
wcf.send_text(f'[+] 设置提醒时间为每天的 {time.strftime("%H:%M", remind_time)}', msg.sender)

设置发送倒数日的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
def update_send_wxid(wcf, msg):
# 设置发送倒数日的对象
global send_wxid
try:
if ',' in msg.content.split(' ')[1]:
send_wxid = msg.content.split(' ')[1].split(',')
else:
send_wxid = [msg.content.split(' ')[1]]
except Exception as e:
print('[-] 设置发送倒数日的对象错误:', e)
wcf.send_text('[-] 设置发送倒数日的对象错误', msg.sender)
print(f'[+] {msg.sender} 设置发送倒数日的对象为 {send_wxid}')
wcf.send_text(f'[+] 设置发送倒数日的对象为 {send_wxid}', msg.sender)

处理发送的指令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def process_msg(wcf):
while wcf.is_receiving_msg():
try:
msg = wcf.get_msg()
content = msg.content
if content.startswith('$add'): # 添加倒数日
add_countdown(wcf, msg)
elif content.startswith('$del'): # 删除倒数日
del_countdown(wcf, msg)
elif content == '$list': # 列出倒数日
list_countdown(wcf, msg)
elif content == '$help': # 帮助信息
help_info(wcf, msg)
elif content.startswith('$set'): # 设置提醒时间
set_remind_time(wcf, msg)
elif content.startswith('$send'): # 设置发送倒数日的对象
update_send_wxid(wcf, msg)
else:
continue
except Empty:
continue
except Exception as e:
print('[-] 处理消息错误:', e)

主函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def main():
wcf = Wcf()
# 保存联系人列表
save_contacts(wcf)
# 获取倒数日
get_countdown()
# 处理倒数日
Thread(target=process_countdown, args=(wcf,)).start()
# 处理消息
wcf.enable_receiving_msg()
Thread(target=process_msg, args=(wcf,), daemon=True).start()
return wcf

if __name__ == '__main__':
wcf = None
try:
wcf = main()
wcf.keep_running()
except KeyboardInterrupt:
print('[-] 服务停止')
save_countdown()
wcf.cleanup()
sys.exit(0)

实现成果

$help命令
$list命令
$add命令
$del命令
$send命令
$set命令
定时提醒效果

增加讯飞星火大模型

在上面的基础上,也可以接入一些AI大模型或者一些智能聊天的机器人
我这里使用讯飞星火V3.0模型接口,个人认证和免费体验一年,200万token
讯飞星火认知大模型
星火认知大模型Web API文档
参考Web API的文档
实现的结果:
讯飞星火模型