0
点赞
收藏
分享

微信扫一扫

Python 利用Selenium截图上传腾讯云COS并发送至钉钉群

程序小小黑 2022-02-10 阅读 71


钉钉配置机器人

/*钉钉群聊添加机器人*/

Python 利用Selenium截图上传腾讯云COS并发送至钉钉群_python


Python 利用Selenium截图上传腾讯云COS并发送至钉钉群_cos_02


Python 利用Selenium截图上传腾讯云COS并发送至钉钉群_cos_03


Python 利用Selenium截图上传腾讯云COS并发送至钉钉群_钉钉通知_04


Python 利用Selenium截图上传腾讯云COS并发送至钉钉群_钉钉通知_05


Python 利用Selenium截图上传腾讯云COS并发送至钉钉群_cos_06

代码实现

/*截取百度主页的截图，上传至腾讯云 COS，并发送到钉钉群聊*/

from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.common.by import By
import os
import json
import hmac
import hashlib
import base64
import urllib
import requests
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
import time
from selenium.webdriver.chrome.options import Options
# Name of the local screenshot file, stamped with the local time (to the second)
# at which the module is loaded.
filename = f"monitor_{time.strftime('%Y-%m-%d-%H_%M_%S', time.localtime())}.png"


class Notification:
    """Screenshot a web page, upload it to Tencent Cloud COS and post it to DingTalk.

    Workflow (see ``get_screen``): open the page in headless Chrome, wait for a
    known element, save a screenshot to the module-level ``filename``, upload
    that file to a COS bucket and send a markdown message containing the image
    link to a DingTalk group robot.

    NOTE(review): the robot secret, webhook token and COS credentials are
    hard-coded placeholders in this class; real values should come from
    configuration or the environment rather than source code.
    """

    def __init__(self):
        """Start the headless Chrome session used for taking screenshots."""
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('window-size=1920x1080')
        chrome_options.add_argument('--start-maximized')
        # NOTE(review): ``executable_path`` is deprecated in Selenium 4 in favour
        # of a Service object - confirm against the installed Selenium version.
        self.driver = webdriver.Chrome(
            options=chrome_options,
            executable_path='C:\\Users\\Administrator\\AppData\\Local\\Google\\Chrome\\Application\\chromedriver.exe',
        )
        self.driver.maximize_window()

    def get_screen(self):
        """Open the Baidu home page, screenshot it and send it to DingTalk.

        Any exception is reported on stdout instead of being propagated. The
        browser is always closed afterwards, whether or not the run succeeded.
        """
        try:
            self.driver.get("https://www.baidu.com/")
            # Only take the screenshot once the search button is present.
            if self.find_element('xpath', '//input[@id="su"]'):
                self.driver.save_screenshot(filename)
                time.sleep(2)
                self.send_dingding()
        except Exception as ex:
            print("获取主页异常 {}".format(ex))
        finally:
            # Release the browser process on both the success and failure path.
            self.driver.quit()

    def find_element(self, *args):
        """Wait up to 30 seconds for an element to be present on the page.

        :param args: ``(loc, value)`` where ``loc`` is one of ``'id'``,
            ``'xpath'`` or ``'css'`` and ``value`` is the locator expression.
        :return: ``True`` if the element was found, ``False`` otherwise. On
            failure a diagnostic screenshot is saved in the working directory.
        """
        loc, value = args[0], args[1]
        locators = {
            'id': By.ID,
            'xpath': By.XPATH,
            'css': By.CSS_SELECTOR,
        }
        try:
            if loc not in locators:
                # Route unknown strategies through the regular failure path
                # instead of tripping over an unbound local variable.
                raise ValueError('unsupported locator type: {}'.format(loc))
            by = locators[loc]
            WebDriverWait(self.driver, 30).until(
                lambda driver: driver.find_element(by, value) is not None)
            print('{}存在,断言成功'.format(value))
            return True
        except Exception:
            print('{}不存在,断言失败'.format(value))
            # No ':' or spaces in the name: ':' is not allowed in Windows filenames.
            self.driver.save_screenshot(
                'element_not_exist_' + time.strftime('%m-%d_%H_%M', time.localtime()) + '.png')
            return False

    def get_url(self):
        """Build the signature parameters for the DingTalk robot webhook.

        The signature is the URL-quoted, base64-encoded HMAC-SHA256 of
        ``"<timestamp>\\n<secret>"`` keyed with the secret.

        :return: ``(timestamp, sign)`` - the millisecond timestamp as a string
            and the signature computed from that same timestamp.
        """
        # Local import: the file only has ``import urllib``, which does not
        # guarantee that the ``urllib.parse`` submodule is loaded.
        import urllib.parse

        timestamp = str(round(time.time() * 1000))
        secret = 'SEC*********'  # secret generated by the "sign" security option when adding the robot
        string_to_sign = '{}\n{}'.format(timestamp, secret)
        digest = hmac.new(
            secret.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            digestmod=hashlib.sha256,
        ).digest()
        sign = urllib.parse.quote_plus(base64.b64encode(digest))
        return timestamp, sign

    def send_dingding(self):
        """Upload the screenshot to COS and post it to the DingTalk group.

        :raises Exception: any upload or HTTP error is printed and re-raised.
        """
        try:
            image_link = self.upload_cos(os.path.join(os.getcwd(), filename))
            # Call get_url() exactly once so that the timestamp sent in the URL
            # is the one the signature was computed from.
            timestamp, sign = self.get_url()
            # The access_token value comes from the webhook link generated when
            # the robot was added.
            url = ('https://oapi.dingtalk.com/robot/send?access_token=**********'
                   '&timestamp=' + timestamp +
                   '&sign=' + sign)
            header = {
                'content-type': 'application/json',
                'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:22.0) Gecko/20100101 Firefox/22.0',
            }
            sent_at = time.strftime('%m-%d %H:%M', time.localtime())
            param = json.dumps(
                {
                    "msgtype": "markdown",
                    "markdown": {
                        "title": "ElasticSearch 状态概览",
                        "text": "#### ElasticSearch 状态概览 ({}) \n> > ![screenshot]({})\n> \n".format(
                            sent_at, image_link),
                    },
                }
            )
            # A timeout keeps a stalled connection from hanging the script forever.
            response = requests.post(url, data=param, headers=header, timeout=30)
            response.raise_for_status()
        except Exception as ex:
            print('发送钉钉群异常{}'.format(ex))
            raise

    def upload_cos(self, path):
        """Upload an image to Tencent Cloud COS.

        :param path: path of the local file to upload.
        :return: URL built from the bucket's visit URL and the object key.
        """
        secret_id = 'AKID**********'  # cos secretId
        secret_key = 'EC**********'  # cos secretKey
        region = 'ap-guangzhou'  # cos region
        config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key)
        client = CosS3Client(config)
        # The object key only has minute resolution, so two uploads within the
        # same minute use the same key.
        file_name = 'es_monitor_' + time.strftime('%Y-%m-%d-%H_%M', time.localtime()) + '.png'
        client.upload_file(
            Bucket='**-*****',  # cos bucket
            LocalFilePath=path,  # path of the local file
            Key=file_name,  # object name inside the bucket
        )
        visit = 'https://**-*****.cos.ap-guangzhou.myqcloud.com/'  # cos visitUrl
        image_link = visit + file_name
        print('image link:{}'.format(image_link))
        return image_link


if __name__ == '__main__':
    # Script entry point: build the notifier and run one screenshot-and-notify pass.
    Notification().get_screen()



举报

相关推荐

0 条评论