0
点赞
收藏
分享

微信扫一扫

PYTHON 连接钉钉传输工作数据监控


先导

工作中运维工作经常会遇到一些数据汇报,数据监控, 作为一个新人真心感觉这些数据没有什么意思(当然也许是我菜),有句话怎么说,懒惰是人类进步的阶梯.这里使用 python 连接数据把数据 传到钉钉, 这样可以进行数据监控 ,看看是哪家小可爱又 搞事情了 ╭(╯^╰)╮

钉钉接口

钉钉提供了群机器人接口等很多很多接口,网上 demo 一大堆.这里只制作简单的表述

钉钉官网:

[​​developers.dingtalk.com/document/ap…​​]

可以给群里增加一个机器人,通过 @固定人 ,或者所有人,广播等方式发送信息

创建钉钉机器人

首先你得建立个群

PYTHON 连接钉钉传输工作数据监控_触发器

增加群机器人

完成必要的安全设置,勾选我已阅读并同意《自定义机器人服务及免责条款》,然后单击完成

目前有 3 种安全设置方式,请根据需要选择一种:

  • 自定义关键词:最多可以设置 10 个关键词,消息中至少包含其中 1 个关键词才可以发送成功。
    例如添加了一个自定义关键词:监控报警,则这个机器人所发送的消息,必须包含监控报警这个词,才能发送成功。
  • 加签
    把​​timestamp+"\n"+​​密钥当做签名字符串,使用 HmacSHA256 算法计算签名,然后进行 Base64 encode,最后再把签名参数再进行 urlEncode,得到最终的签名(需要使用 UTF-8 字符集)。

PYTHON 连接钉钉传输工作数据监控_触发器_02

https://oapi.dingtalk.com/robot/send?access_token=XXXXXX×tamp=XXX&sign=XXX

PYTHON 连接钉钉传输工作数据监控_数据_03

PYTHON 连接钉钉传输工作数据监控_后端_04

PYTHON 连接钉钉传输工作数据监控_后端_05

测试机器人

python 代码版本

这里直接使用 加签版本的,因为这种时间判定的才是最常用的

import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse

timestamp = str(round(time.time() * 1000)) #毫秒级时间戳
secret = '你的secret'
secret_enc = secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
#往post接口里面仍数据

def dingmessage():

# 请求的URL,WebHook地址

webhook = "https://oapi.dingtalk.com/robot/send?access_token=你的token×tamp="+str(timestamp)+"&sign="+str(sign)
print(webhook)
#构建请求头部
header = {
"Content-Type": "application/json",
"Charset": "UTF-8"
}
#构建请求数据
tex = "能力越大,责任越大。"
message ={

"msgtype": "text",
"text": {
"content": tex
},
"at": {

"isAtAll": False
}

}
#对请求的数据进行json封装
message_json = json.dumps(message)
#发送请求
info = requests.post(url=webhook,data=message_json,headers=header)
#打印返回的结果
print(info.text)

if __name__=="__main__":
dingmessage()

打印结果

PYTHON 连接钉钉传输工作数据监控_json_06

如果显示 ok 那么就是说数据传输成功了

这时候接入钉钉即可 查看数据

PYTHON 连接钉钉传输工作数据监控_json_07

好了现在 py 已经能够给钉钉发送信息了,那么怎么做监控呢

创建时间监控 实时发送信息

我这里以 apscheduler 框架 进行定时巡回

一、安装 APScheduler

pip install apscheduler

复制代码

二、基本概念

APScheduler 有四大组件:

1、触发器 triggers :触发器包含调度逻辑。每个作业都有自己的触发器,用于确定下一个任务何时运行。除了初始配置之外,触发器是完全无状态的。有三种内建的 trigger:

(1)date: 特定的时间点触发

(2)interval: 固定时间间隔触发

(3)cron: 在特定时间周期性地触发

2、任务储存器 job stores:用于存放任务,把任务存放在内存(为默认 MemoryJobStore)或数据库中。3、执行器 executors: 执行器是将任务提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。

4、调度器 schedulers: 把上方三个组件作为参数,通过创建调度器实例来运行根据开发需求选择相应的组件,下面是不同的调度器组件:BlockingScheduler 阻塞式调度器:适用于只跑调度器的程序。BackgroundScheduler 后台调度器:适用于非阻塞的情况,调度器会在后台独立运行。AsyncIOScheduler AsyncIO 调度器,适用于应用使用 AsnycIO 的情况。GeventScheduler Gevent 调度器,适用于应用通过 Gevent 的情况。TornadoScheduler Tornado 调度器,适用于构建 Tornado 应用。TwistedScheduler Twisted 调度器,适用于构建 Twisted 应用。QtScheduler Qt 调度器,适用于构建 Qt 应用。

三、使用步骤

1、新建一个调度器 schedulers

2、添加调度任务

3、运行调度任务

PYTHON 连接钉钉传输工作数据监控_后端_08

PYTHON 连接钉钉传输工作数据监控_json_09

PYTHON 连接钉钉传输工作数据监控_json_10

比如写 3 就是说 冯 3 触发

*/3 就是说 每隔 3 单位触发

假设触发条件

我们这里判定如果当前时间 4 个数据加起来大于 12 那么 就发送消息

每分钟发送一次

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
# 输出时间
import pymssql
import copy
import time
import requests
import json
import hmac
import hashlib
import base64
import urllib.parse

timestamp = str(round(time.time() * 1000))
secret = '你的secret'
secret_enc = secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))

# now = time.time() #返回float数据
#
#
# #毫秒级时间戳
# print(int(round(now * 1000)))
# now1=int(round(now * 1000))




def dingmessage(num):

# 请求的URL,WebHook地址

webhook = "https://oapi.dingtalk.com/robot/send?access_token=你的token×tamp="+str(timestamp)+"&sign="+str(sign)
print(webhook)
#构建请求头部
header = {
"Content-Type": "application/json",
"Charset": "UTF-8"
}
#构建请求数据
tex = "能力越大,责任越大。"+str(num)
message ={

"msgtype": "text",
"text": {
"content": tex
},
"at": {

"isAtAll": False
}

}
#对请求的数据进行json封装
message_json = json.dumps(message)
#发送请求
info = requests.post(url=webhook,data=message_json,headers=header)
#打印返回的结果
print(info.text)


def job():
hour = datetime.datetime.now().hour
min = datetime.datetime.now().minute
f1 = str(hour)[0]
f2 = str(hour)[1]
f3 = str(min)[0]
f4 = str(min)[1]
print(f1)
print(f2)
print(f3)
print(f4)
print(int(f1) + int(f2) + int(f3) + int(f4))
if int(f1) + int(f2) + int(f3) + int(f4) > 12:
dingmessage(int(f1) + int(f2) + int(f3) + int(f4) )


# BlockingScheduler
scheduler = BlockingScheduler()
#1-23小时内,每一分钟触发一次, 后面那个意思是说如果3600 秒触发 那么讲重新执行该任务
scheduler.add_job(job, 'cron', hour='1-23', minute='*/1',misfire_grace_time=3600)#1
scheduler.start()

结果

运行结果

PYTHON 连接钉钉传输工作数据监控_后端_11

钉钉结果

如图 23.18 分 2+3+1+8=14 >12 触发条件 就给钉钉 发送信息了

PYTHON 连接钉钉传输工作数据监控_数据_12

致此一个见得 发送文字功能实现,有问题即可发送钉钉

还有一点就是钉钉一定要调成前台允许显示, 然后在录音个对应的提示. 懒人必备,

举报

相关推荐

0 条评论