一、前期准备
1.注册微信公众号,获取AppID和AppSecret
2.微信公众号接口文档(文档链接),参照文档进行接口调用
3.后台上传素材到微信公众号,也可以通过接口上传
4.获取公网IP地址,微信公众号设置IP白名单
获取公网IP地址的代码如下:
import requests
def get_pubic_ip():
"""
获取公网ip地址
:return:
"""
res = requests.get('http://httpbin.org/ip')
ip = res.json()['origin']
return ip
if __name__ == '__main__':
print(get_pubic_ip())
二、access_token获取
调用微信公众号接口时都需要使用到access_token,故提前获取。
注意:只有添加到白名单的IP才能通过AppID和AppSecret获取到access_token。
import requests
def get_token(
app_id='your AppID', # 微信公众号AppID
app_secret='your AppSecret' # 微信公众号AppSecret
):
"""
获取token
:return:
"""
url = f'https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={app_id}&secret={app_secret}'
res = requests.get(url=url)
result = res.json()
if result.get('access_token'):
token = result['access_token']
print(f"获取token成功:{token[:14]}****")
return token
else:
print(f"获取token失败--{result}")
if __name__ == '__main__':
app_token = get_token()
运行结果:
三、access_token刷新
由于access_token的有效期只有2小时,故需要定时刷新。
这里使用app_token.yaml来保存获取到的token以及时间;token在时效内返回保存的token,超过时效会获取新的token
import os
import time
import yaml
def refresh_token():
"""
token刷新机制
:return:
"""
app_token_path = os.path.dirname(os.path.abspath(__file__)) + os.sep + 'app_token.yaml'
try:
# 读取时间和token
if not os.path.exists(app_token_path):
with open(app_token_path, 'w+') as f:
f.write('')
cfg_token = yaml_read(app_token_path)
t = cfg_token['time']
record_token = cfg_token['token']
cur_time = time.time()
# token时间在7200s之内,返回token
if 0 < cur_time - t < 7200:
print(f"token时效内")
return record_token
# token过期,刷新时间和token
else:
print('token已过期')
token = get_token()
if token:
data = {'time': time.time(), 'token': token}
yaml_clear(app_token_path)
yaml_write(data, app_token_path)
return token
except TypeError:
# 获取初始时间和token
print('获取初始token')
token = get_token()
data = {'time': time.time(), 'token': token}
yaml_write(data, app_token_path)
return token
def yaml_read(file):
"""
yaml文件读取
:param file:
:return:
"""
with open(file=file, mode="r", encoding="utf-8") as f:
data = yaml.safe_load(f.read())
return data
def yaml_write(data, file):
"""
yaml文件写入
:param data:
:param file:
:return:
"""
with open(file, 'a', encoding='utf-8') as f:
yaml.dump(
data,
stream=f,
allow_unicode=True, # 避免unicode编码问题
sort_keys=False # 不自动排序
)
def yaml_clear(file):
"""
yaml文件清空
:param file:
:return:
"""
with open(file, 'w', encoding='utf-8') as f:
f.truncate()
if __name__ == '__main__':
app_token = refresh_token()
print(app_token)
运行结果:
四、获取素材列表
传入前面获取到的access_token来调用接口
import json
import requests
def get_material_list(count=20, offset=0, material_type='image'):
"""
获取永久素材列表
:param count: 返回素材的数量,取值在1到20之间
:param offset: 0表示从第一个素材返回
:param material_type: 素材的类型,图片(image)、视频(video)、语音 (voice)、图文(news
:return:
"""
url = f"https://api.weixin.qq.com/cgi-bin/material/batchget_material?access_token={refresh_token()}"
page_data = [] # 用于存储所有数据
while True:
data = {"type": material_type, "offset": offset, "count": count} # 构造分页参数
res = requests.post(url=url, json=data)
if res.status_code != 200 or res.json().get('errcode'): # 请求失败时中断
print(f'获取永久素材列表失败--{res.text}')
break
result = json.loads(res.content) # 使用result=res.json(),响应结果中文乱码
total_count = result.get('total_count')
page_data.extend(result['item'])
if total_count is not None and offset >= total_count: # 没有更多页时中断
break
offset += count
print(f'获取素材列表成功')
return page_data
if __name__ == '__main__':
m_li = get_material_list()
print(m_li)
运行结果:
五、新建草稿
传入文章标题、作者、内容、摘要、随机的封面图media_id以及access_token
import json
import random
import requests
def add_draft(title, author, content, thumb_media_id, digest):
"""
新建草稿--图文
:param title:
:param author:
:param content: 文章内容
:param thumb_media_id: # 封面素材id
:param digest: 摘要--限制54个字符
:return:
"""
url = f'https://api.weixin.qq.com/cgi-bin/draft/add?access_token={refresh_token()}'
data = {
"articles":
[
{
"title": title,
"author": author,
"content": str(content),
"thumb_media_id": thumb_media_id, 'digest': digest
}
]
}
res = requests.post(url=url, data=json.dumps(data, ensure_ascii=False).encode('utf-8'))
if res.status_code == 200:
result = json.loads(res.content)
if result.get('media_id'):
print(f'新建草稿成功--{result}')
return result
else:
print(f'新建草稿失败--{result}')
else:
print(f'新建草稿失败--{res.text}')
if __name__ == '__main__':
m_li = get_material_list()
m_id_li = [i.get('media_id') for i in m_li]
random_m_id = random.choice(m_id_li) # 随机选一个素材id
t = 'Python自动新建文章草稿'
a = 'DriverWon'
c = '使用Python自动新建的微信公众号草稿'
d = '摘要内容'
add_draft(t, a, c, random_m_id, d)
运行结果: