import requests
import json
from datetime import datetime
import pytz
from requests.auth import HTTPBasicAuth
# CDH REST API endpoint (API v31) that lists Impala queries for the cluster
# named "cluster"; it is fetched with HTTP basic auth by
# get_running_impala_queries().
cdh_api_url = "http://192.168.1.1:7180/api/v31/clusters/cluster/services/impala/impalaQueries"
# Webhook (qyapi.weixin.qq.com) that alert messages are POSTed to by
# send_alert_via_webhook(); the `key` query parameter appears masked here.
# NOTE(review): host, credentials and key are hard-coded — consider loading
# them from configuration or the environment.
webhook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=*********************"
def get_running_impala_queries():
    """Fetch the Impala query listing from the CDH REST API.

    Sends a GET request to ``cdh_api_url`` using HTTP basic auth and decodes
    the JSON body.

    Returns:
        The decoded JSON payload, or ``None`` when the request cannot be
        completed, the server answers with a non-200 status, the body is
        empty, or the body is not valid JSON. A short diagnostic is printed
        for every failure; no exception is propagated to the caller.
    """
    try:
        # A timeout keeps an unreachable server from hanging the script.
        response = requests.get(
            cdh_api_url,
            auth=HTTPBasicAuth('admin', '*******'),
            timeout=30,
        )
    except requests.RequestException as exc:
        # Connection refused, DNS failure, timeout, ...
        print(f"Error: Failed to fetch running Impala queries ({exc})")
        return None

    if response.status_code != 200:
        print("Error: Failed to fetch running Impala queries")
        return None

    if not response.text:
        print("Error: Empty response received")
        return None

    try:
        return response.json()
    except ValueError:
        # Every JSON decode error requests can raise (json, simplejson or
        # requests' own JSONDecodeError) derives from ValueError.
        print("Error: Unable to decode JSON response")
        return None
def send_alert_via_webhook(alert_message):
    """POST a plain-text alert to the webhook at ``webhook_url``.

    Args:
        alert_message: Text placed in the ``text.content`` field of the
            JSON payload.

    Returns:
        None. The outcome is printed; network and decoding failures are
        reported rather than raised, so one failed alert does not abort the
        caller's loop.
    """
    payload = {"text": {"content": alert_message}, "msgtype": "text"}
    headers = {"Content-Type": "application/json"}

    try:
        # Bounded wait so a stalled webhook cannot block the monitor.
        webhook_response = requests.post(
            webhook_url,
            data=json.dumps(payload),
            headers=headers,
            timeout=10,
        )
    except requests.RequestException as exc:
        print(f"Failed to send alert via Webhook ({exc})")
        return

    if webhook_response.status_code != 200:
        print("Failed to send alert via Webhook")
        return

    try:
        result = webhook_response.json()
    except ValueError:
        # HTTP 200 with a non-JSON body: delivery cannot be confirmed.
        print("Failed to send alert via Webhook")
        return

    print(result)
    # The webhook reports application-level failures in the body
    # ("errcode" != 0) while still answering HTTP 200, so the status code
    # alone is not proof of delivery. A missing errcode is treated as success.
    errcode = result.get("errcode", 0) if isinstance(result, dict) else 0
    if errcode != 0:
        print("Failed to send alert via Webhook")
    else:
        print("Alert sent successfully")
# Unit conversion for the amount of data read
def bytes_to_gb(bytes):
    """Convert a byte count to gigabytes using a 1024**3 divisor.

    Args:
        bytes: Byte count as an int, or any value ``int()`` accepts (for
            example a numeric string). May be ``None``. The parameter name
            shadows the builtin but is kept for caller compatibility.

    Returns:
        The size in GB as a float, or ``None`` when the input is ``None`` or
        cannot be converted to an integer.
    """
    if bytes is None:
        return None
    try:
        return int(bytes) / (1024 * 1024 * 1024)
    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported type such as list or dict.
        # ValueError: non-numeric string or NaN.
        # OverflowError: infinite float.
        return None
def monitor_impala_queries():
    """Alert on long-running Impala ``CREATE TABLE`` queries.

    Fetches the query listing via ``get_running_impala_queries()`` and, for
    every entry whose ``queryState`` is ``"RUNNING"``, whose statement starts
    with ``create table`` (case-insensitive) and which has been running for
    at least 300 seconds, prints an alert and sends it through
    ``send_alert_via_webhook()``.

    Entries with a missing statement, a missing or unparsable ``startTime``,
    or no ``attributes`` mapping are tolerated: they are skipped, or reported
    with an unknown HDFS read volume, instead of raising.

    Returns:
        None.
    """
    running_queries = get_running_impala_queries()
    if not running_queries or not isinstance(running_queries, dict):
        return

    # Loop-invariant: resolve the display timezone once.
    china_timezone = pytz.timezone('Asia/Shanghai')

    for query in running_queries.get('queries') or []:
        # A missing statement becomes "" so the prefix test cannot raise.
        query_statement = query.get("statement") or ""
        if query.get("queryState") != "RUNNING":
            continue
        if not query_statement.lower().startswith("create table"):
            continue

        query_startime = query.get("startTime")
        if not isinstance(query_startime, str) or not query_startime:
            continue
        try:
            # fromisoformat() on older Pythons rejects a trailing "Z".
            start_time = datetime.fromisoformat(query_startime.replace("Z", "+00:00"))
        except ValueError:
            print(f"Error: Unable to parse query start time: {query_startime}")
            continue

        # Treat a naive timestamp as UTC; keep an explicit offset as parsed
        # rather than overwriting it.
        if start_time.tzinfo is None:
            start_time = start_time.replace(tzinfo=pytz.utc)
        # Convert to China time for display.
        start_time = start_time.astimezone(china_timezone)

        # Elapsed run time; alert only from 5 minutes (300 seconds) onwards.
        execution_time = datetime.now(china_timezone) - start_time
        if execution_time.total_seconds() < 300:
            continue

        query_user = query.get("user")
        attributes = query.get('attributes') or {}
        query_hdfs_red_gb = bytes_to_gb(attributes.get("hdfs_bytes_read"))
        formatted_start_time = start_time.strftime('%Y-%m-%d %H:%M:%S')

        # Build the alert text; the HDFS read volume may be unavailable.
        if query_hdfs_red_gb is not None:
            alert_message = f"【Impala告警】运行查询超5分钟\n【通知类型】CREATE_TABLE\n【用户名称】 {query_user}\n【查询时间】 {formatted_start_time}\n【HDFS读取】 {query_hdfs_red_gb:.2f}GB\n【运行SQL】: {query_statement}"
        else:
            alert_message = f"Impala超过5分钟正在查询\n用户: {query_user}\n查询的时间: {formatted_start_time}\nHDFS读取量: 未知\n正在运行的sql: {query_statement}"

        print(alert_message)
        send_alert_via_webhook(alert_message)
# Run a single monitoring pass at module load. There is no
# `if __name__ == "__main__":` guard, so importing this file also triggers it.
monitor_impala_queries()