0
点赞
收藏
分享

微信扫一扫

Impala 执行语句告警：长时间运行的 CREATE TABLE 查询

小云晓云 03-12 19:45 阅读 2

import requests
import json
from datetime import datetime
import pytz
from requests.auth import HTTPBasicAuth
# Cloudera Manager (CDH) REST API endpoint (API v31) used to fetch the Impala
# queries of the "impala" service on cluster "cluster".
cdh_api_url = "http://192.168.1.1:7180/api/v31/clusters/cluster/services/impala/impalaQueries"
# WeChat Work group-robot webhook that receives the alerts (key redacted here).
# NOTE(review): the webhook key is a secret; consider loading it from an
# environment variable or config file instead of hard-coding it.
webhook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=*********************"

def get_running_impala_queries(timeout=30):
    """Fetch the Impala query list from the Cloudera Manager API.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up, so an
            unresponsive Cloudera Manager cannot block the monitor forever.

    Returns:
        The decoded JSON response (callers read its ``"queries"`` key), or
        ``None`` if the request failed, returned a non-200 status, had an
        empty body, or was not valid JSON. Failures are reported via print.
    """
    # NOTE(review): credentials are hard-coded (and redacted here); consider
    # reading them from the environment or a config file.
    try:
        response = requests.get(
            cdh_api_url,
            auth=HTTPBasicAuth('admin', '*******'),
            timeout=timeout,
        )
    except requests.RequestException as exc:
        # Connection refused, DNS failure, timeout, ...: report like the other
        # failure modes instead of crashing the script.
        print(f"Error: Failed to fetch running Impala queries: {exc}")
        return None

    if response.status_code != 200:
        print(f"Error: Failed to fetch running Impala queries (HTTP {response.status_code})")
        return None

    if not response.text:
        print("Error: Empty response received")
        return None

    try:
        return response.json()
    except ValueError:
        # Every JSON decode error raised by Response.json() subclasses
        # ValueError, including the simplejson variant older requests versions
        # raise. Catching only json.decoder.JSONDecodeError misses that variant.
        print("Error: Unable to decode JSON response")
        return None

def send_alert_via_webhook(alert_message, timeout=10):
    """Post a plain-text alert to the WeChat Work group-robot webhook.

    Args:
        alert_message: Text sent as the message content.
        timeout: Seconds to wait for the webhook before giving up.

    Returns:
        True if the webhook accepted the message, False otherwise. The
        outcome is also printed.
    """
    payload = {"text": {"content": alert_message}, "msgtype": "text"}
    headers = {"Content-Type": "application/json"}
    try:
        webhook_response = requests.post(
            webhook_url, data=json.dumps(payload), headers=headers, timeout=timeout
        )
    except requests.RequestException as exc:
        print(f"Failed to send alert via Webhook: {exc}")
        return False

    if webhook_response.status_code != 200:
        print(f"Failed to send alert via Webhook (HTTP {webhook_response.status_code})")
        return False

    try:
        result = webhook_response.json()
    except ValueError:
        print("Failed to send alert via Webhook: response body is not JSON")
        return False
    print(result)

    # Per the WeChat Work robot API docs, a rejected message (bad key,
    # oversized content, ...) still comes back as HTTP 200. The real outcome
    # is the JSON ``errcode`` field, where 0 means success.
    if isinstance(result, dict) and result.get("errcode") == 0:
        print("Alert sent successfully")
        return True
    print("Failed to send alert via Webhook")
    return False
# Unit conversion for the HDFS read size reported with each query.
def bytes_to_gb(bytes):
    """Convert a byte count to gibibytes (``value / 1024**3``).

    Args:
        bytes: Byte count as an int, float, or numeric string (presumably the
            raw ``hdfs_bytes_read`` attribute value). The name shadows the
            builtin but is kept so keyword callers keep working.

    Returns:
        The size in GiB as a float, or ``None`` if the value is ``None``,
        not numeric, or not finite.
    """
    import math

    if bytes is None:
        return None
    try:
        # int() first keeps full precision for large integer byte counts.
        byte_count = int(bytes)
    except (TypeError, ValueError, OverflowError):
        # Fall back to float() for strings such as "1.5E9". Non-numeric types
        # (lists, dicts, ...) end up here too instead of crashing the caller.
        try:
            byte_count = float(bytes)
        except (TypeError, ValueError):
            return None
        if not math.isfinite(byte_count):
            return None
    return byte_count / (1024 * 1024 * 1024)


def _parse_start_time(raw_start_time):
    """Parse an ISO-8601 ``startTime`` string into an aware UTC datetime.

    A trailing ``Z`` is rewritten to ``+00:00`` because
    ``datetime.fromisoformat`` only accepts ``Z`` from Python 3.11 on.
    Returns ``None`` when the value is missing or cannot be parsed.
    """
    if not isinstance(raw_start_time, str):
        return None
    try:
        parsed = datetime.fromisoformat(raw_start_time.replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # NOTE(review): naive timestamps are assumed to be UTC, as before.
        return parsed.replace(tzinfo=pytz.utc)
    # Convert rather than relabel, so a non-UTC offset is not silently dropped.
    return parsed.astimezone(pytz.utc)


def _is_create_table(statement):
    """Return True if *statement* begins with ``CREATE TABLE`` (case-insensitive).

    Whitespace is normalised first, so leading blanks or newlines and extra
    spaces between the keywords do not hide a match.
    """
    if not isinstance(statement, str):
        return False
    return " ".join(statement.split()).lower().startswith("create table")


def _format_alert(query_user, formatted_start_time, hdfs_read_gb, query_statement, threshold_seconds):
    """Build the (Chinese) alert text for one long-running CREATE TABLE query."""
    hdfs_text = "未知" if hdfs_read_gb is None else f"{hdfs_read_gb:.2f}GB"
    # ``:g`` renders the default 300 s as "5" (minutes).
    return (
        f"【Impala告警】运行查询超{threshold_seconds / 60:g}分钟\n"
        "【通知类型】CREATE_TABLE\n"
        f"【用户名称】 {query_user}\n"
        f"【查询时间】 {formatted_start_time}\n"
        f"【HDFS读取】 {hdfs_text}\n"
        f"【运行SQL】: {query_statement}"
    )


def monitor_impala_queries(threshold_seconds=300):
    """Alert on RUNNING ``CREATE TABLE`` queries that exceed a run-time threshold.

    Fetches the Impala query list. For every query whose ``queryState`` is
    ``RUNNING``, whose statement starts with ``CREATE TABLE``, and whose
    elapsed time since ``startTime`` is at least *threshold_seconds*, it
    prints an alert and sends it to the webhook. The start time is shown in
    Asia/Shanghai time.

    Args:
        threshold_seconds: Minimum elapsed run time that triggers an alert
            (default 300 s = 5 minutes).
    """
    running_queries = get_running_impala_queries()
    if not isinstance(running_queries, dict):
        # None means the fetch failed; get_running_impala_queries already printed why.
        return

    china_timezone = pytz.timezone('Asia/Shanghai')
    now_utc = datetime.now(pytz.utc)

    for query in running_queries.get("queries") or []:
        if query.get("queryState") != "RUNNING":
            continue
        query_statement = query.get("statement")
        if not _is_create_table(query_statement):
            continue

        start_time = _parse_start_time(query.get("startTime"))
        if start_time is None:
            # Skip just this query instead of aborting the whole pass.
            print(f"Warning: skipping query {query.get('queryId')}: missing or unparsable startTime")
            continue
        if (now_utc - start_time).total_seconds() < threshold_seconds:
            continue

        # ``attributes`` may be absent, so avoid chaining .get() on None.
        attributes = query.get("attributes") or {}
        hdfs_read_gb = bytes_to_gb(attributes.get("hdfs_bytes_read"))
        formatted_start_time = start_time.astimezone(china_timezone).strftime('%Y-%m-%d %H:%M:%S')

        alert_message = _format_alert(
            query.get("user"), formatted_start_time, hdfs_read_gb, query_statement, threshold_seconds
        )
        print(alert_message)
        send_alert_via_webhook(alert_message)

# Run one monitoring pass only when executed as a script, so importing this
# module does not trigger API or webhook calls.
if __name__ == "__main__":
    monitor_impala_queries()

举报

相关推荐

0 条评论