0
点赞
收藏
分享

微信扫一扫

【分布式监控系统】第3章——监控客户端开发


客户端程序启动入口

#_*_coding:utf-8_*_
__author__ = 'Alex Li'

from core import client

class command_handler(object):
    """Command-line dispatcher for the monitor client.

    ``sys_args[1]`` names the command to run. Only the names listed in
    ``_COMMANDS`` are dispatched; anything else prints an error and exits
    with the usage text.
    """

    # Whitelist of user-invocable commands. Dispatching on arbitrary
    # attribute names would let input such as "sys_args", "__init__" or
    # "command_allowcator" reach getattr() and fail with a TypeError or
    # unbounded recursion instead of showing the usage text.
    _COMMANDS = ('start', 'stop')

    def __init__(self, sys_args):
        """Store the argument list and dispatch the requested command.

        :param sys_args: argument list; element 1 is the command name
        :raises SystemExit: with the usage text when no command is given
            or the command is unknown
        """
        self.sys_args = sys_args
        if len(self.sys_args) < 2:
            self.help_msg()

        self.command_allowcator()

    def command_allowcator(self):
        """Route the user-supplied command to the method of the same name.

        :return: whatever the selected command method returns
        :raises SystemExit: with the usage text for an unknown command
        """
        command = self.sys_args[1]
        print(command)

        if command in self._COMMANDS:
            return getattr(self, command)()

        print("command does not exist!")
        self.help_msg()

    def help_msg(self):
        """Terminate the program, using the usage text as the exit message.

        :raises SystemExit: always
        """
        valid_commands = '''
start start monitor client
stop stop monitor client

'''
        # Raise SystemExit directly rather than calling exit(): exit() is a
        # helper injected by the ``site`` module for interactive use.
        raise SystemExit(valid_commands)

    def start(self):
        """Create the monitor client and run its main loop."""
        print("going to start the monitor client")

        monitor_client = client.ClientHandle()
        monitor_client.forever_run()

    def stop(self):
        """Announce the stop request (nothing is actually stopped here)."""
        print("stopping the monitor client")

配置信息

#_*_coding:utf-8_*_
__author__ = 'Alex Li'


# Settings read by the monitor client.
configs = {
    'HostID': 1,
    'Server': '192.168.16.56',
    'ServerPort': 8000,
    # Each entry is [path relative to the server root, HTTP method].
    'urls': {
        # Fetch the monitoring configuration (all services to be monitored).
        'get_configs': ['api/client/config', 'get'],
        # Send collected monitoring data to the server.
        'service_report': ['api/client/service/report/', 'post'],
    },
    'RequestTimeout': 30,
    # How often to re-fetch the configuration from the server so the
    # monitored services follow the latest settings (5 minutes by default).
    'ConfigUpdateInterval': 300,
}

客户端发送监控到的数据

客户端根据本地配置的主机 ID(HostID)向服务端请求自己的监控配置,再按配置定时采集各服务的监控数据并上报给服务端

服务端返回的配置信息数据结构如下(services 下以服务名为键,值为 [插件名, 监控间隔(秒)] 的列表,例如 {"services": {"LinuxCPU": ["LinuxCpuPlugin", 60]}}):

【分布式监控系统】第3章——监控客户端开发_redis

#_*_coding:utf-8_*_
__author__ = 'Alex Li'

import time
from conf import settings
import urllib
import urllib2
import json
import threading
from plugins import plugin_api

class ClientHandle(object):
    """Monitor client.

    Pulls this host's service list from the monitor server, runs the
    matching plugin for each service when its interval has elapsed, and
    posts every plugin result back to the server.
    """

    def __init__(self):
        # Latest configuration received from the server. The main loop
        # reads monitored_services['services'], a mapping of
        # service_name -> [plugin_name, monitor_interval, last_invoke_time];
        # the third element is appended on the client side.
        self.monitored_services = {}

    def load_latest_configs(self):
        """Fetch the latest monitor configuration and merge it in.

        :return: None
        """
        request_type = settings.configs['urls']['get_configs'][1]
        url = "%s/%s" % (settings.configs['urls']['get_configs'][0],
                         settings.configs['HostID'])
        latest_configs = json.loads(self.url_request(request_type, url))
        self._merge_configs(latest_configs)

    def _merge_configs(self, latest_configs):
        """Merge a freshly fetched config while keeping last-run times.

        A plain ``dict.update`` swaps in new ``[plugin, interval]`` lists,
        which would make every service look like it had never run and fire
        again on each reload regardless of its interval. The previous
        last-invoke time is therefore carried over for services that are
        still present.

        :param latest_configs: decoded configuration dict from the server
        """
        previous = self.monitored_services.get('services') or {}
        self.monitored_services.update(latest_configs)
        current = self.monitored_services.get('services') or {}
        if current is previous:
            # The reply carried no 'services' key; nothing was replaced.
            return
        for service_name, val in current.items():
            old_val = previous.get(service_name)
            if old_val is not None and len(old_val) > 2 and len(val) == 2:
                val.append(old_val[2])

    def forever_run(self):
        """Run the client main loop; does not return.

        Reloads the configuration whenever ``ConfigUpdateInterval`` seconds
        have passed, starts every due plugin, then sleeps one second.
        """
        config_last_update_time = 0  # 0 forces a load on the first pass

        while True:
            if time.time() - config_last_update_time > settings.configs['ConfigUpdateInterval']:
                self.load_latest_configs()
                print("Loaded latest config:", self.monitored_services)
                config_last_update_time = time.time()

            self._run_due_services()
            time.sleep(1)

    def _run_due_services(self):
        """Start a plugin thread for every service whose interval elapsed."""
        services = self.monitored_services.get('services') or {}
        for service_name, val in services.items():
            if len(val) == 2:
                # First time this service is seen: a last-invoke time of 0
                # guarantees the check below triggers immediately.
                val.append(0)
            monitor_interval = val[1]
            last_invoke_time = val[2]
            now = time.time()
            if now - last_invoke_time > monitor_interval:
                print(last_invoke_time, now)
                val[2] = now  # record when this service was last started
                # Each plugin runs in its own thread.
                t = threading.Thread(target=self.invoke_plugin,
                                     args=(service_name, val))
                t.start()
                print("Going to monitor [%s]" % service_name)
            else:
                print("Going to monitor [%s] in [%s] secs" % (
                    service_name,
                    monitor_interval - (now - last_invoke_time)))

    def invoke_plugin(self, service_name, val):
        """Run one monitor plugin and report its result to the server.

        :param service_name: name of the service being monitored
        :param val: [plugin_name, monitor_interval, last_run_time]
        :return: None
        """
        plugin_name = val[0]
        if not hasattr(plugin_api, plugin_name):
            print("\033[31;1mCannot find service [%s]'s plugin name [%s] in plugin_api\033[0m" % (service_name, plugin_name))
            print('--plugin:', val)
            return

        plugin_callback = getattr(plugin_api, plugin_name)()

        report_data = {
            'client_id': settings.configs['HostID'],
            'service_name': service_name,
            'data': json.dumps(plugin_callback)
        }

        request_action = settings.configs['urls']['service_report'][1]
        request_url = settings.configs['urls']['service_report'][0]

        print('---report data:', report_data)
        self.url_request(request_action, request_url, params=report_data)
        print('--plugin:', val)

    def url_request(self, action, url, **extra_data):
        """Send one HTTP request to the monitor server.

        :param action: "get" or "post" (case-insensitive)
        :param url: path on the monitor server, relative to its root
        :param extra_data: for "post", ``params`` holds the form fields
        :return: raw response body for "get"; decoded JSON for "post"
        :raises ValueError: if ``action`` is neither "get" nor "post"
        :raises SystemExit: if the request fails
        """
        method = str(action).lower()
        if method not in ('get', 'post'):
            # Fail loudly instead of silently returning None.
            raise ValueError("unsupported request action: %r" % (action,))

        abs_url = "http://%s:%s/%s" % (settings.configs['Server'],
                                       settings.configs["ServerPort"],
                                       url)
        timeout = settings.configs['RequestTimeout']

        if method == 'get':
            print(abs_url, extra_data)
            try:
                req = urllib2.Request(abs_url)
                req_data = urllib2.urlopen(req, timeout=timeout)
                try:
                    return req_data.read()
                finally:
                    req_data.close()  # release the connection
            except urllib2.URLError as e:
                raise SystemExit("\033[31;1m%s\033[0m" % e)

        try:
            data_encode = urllib.urlencode(extra_data['params'])
            req = urllib2.Request(url=abs_url, data=data_encode)
            res_data = urllib2.urlopen(req, timeout=timeout)
            try:
                callback = res_data.read()
            finally:
                res_data.close()  # release the connection
            callback = json.loads(callback)
            print("\033[31;1m[%s]:[%s]\033[0m response:\n%s" % (action, abs_url, callback))
            return callback
        except Exception as e:
            print('---exec', e)
            raise SystemExit("\033[31;1m%s\033[0m" % e)

通过反射(hasattr/getattr)按配置中的插件名在 plugin_api 模块里找到同名函数并执行,从而完成服务与插件的映射

#_*_coding:utf-8_*_
__author__ = 'Alex Li'

from linux import sysinfo,load,cpu_mac,cpu,memory,network,host_alive



def LinuxCpuPlugin():
    """Run the ``cpu`` plugin module and return whatever it reports."""
    cpu_status = cpu.monitor()
    return cpu_status

def host_alive_check():
    """Run the ``host_alive`` plugin module and return whatever it reports."""
    alive_status = host_alive.monitor()
    return alive_status

def GetMacCPU():
    """Run the ``cpu_mac`` plugin module and return whatever it reports."""
    mac_cpu_status = cpu_mac.monitor()
    return mac_cpu_status

def LinuxNetworkPlugin():
    """Run the ``network`` plugin module and return whatever it reports."""
    network_status = network.monitor()
    return network_status

def LinuxMemoryPlugin():
    """Run the ``memory`` plugin module and return whatever it reports."""
    memory_status = memory.monitor()
    return memory_status


def get_linux_load():
    """Run the ``load`` plugin module and return whatever it reports."""
    load_status = load.monitor()
    return load_status

插件(以linux系统上的cpu为例)

#!/usr/bin/env python
#coding:utf-8

#apt-get install sysstat

import commands

def _parse_sar_average(result):
    """Pull the CPU percentages out of sar's ``Average:`` summary line.

    Two layouts are accepted:

    * ``Average: <user> <nice> <system> <idle>`` (four value columns)
    * ``Average: all <user> <nice> <system> <iowait> <steal> <idle>``
      (sysstat layout: a CPU column first, idle as the last column)

    :param result: the ``Average:`` line as text
    :return: dict with 'user', 'nice', 'system' and 'idle' as strings
    :raises ValueError: if the line has fewer than four value columns
    """
    fields = result.split()[1:]  # drop the "Average:" label
    if fields and not fields[0][0].isdigit():
        fields = fields[1:]      # drop the CPU column ("all")
    if len(fields) < 4:
        raise ValueError("unexpected sar output: %r" % (result,))
    user, nice, system = fields[:3]
    return {
        'user': user,
        'nice': nice,
        'system': system,
        'idle': fields[-1],  # idle is the last column in both layouts
    }


def monitor(frist_invoke=1):
    """Sample CPU usage with ``sar 1 3`` and return the averaged figures.

    :param frist_invoke: not used by this plugin; kept for compatibility
    :return: ``{'status': <non-zero>}`` when the command fails, otherwise
        a dict with 'user', 'nice', 'system', 'idle' (strings) and
        'status' (0)
    :raises ValueError: if the ``Average:`` line cannot be parsed
    """
    shell_command = 'sar 1 3| grep "^Average:"'
    status, result = commands.getstatusoutput(shell_command)
    if status != 0:
        return {'status': status}

    value_dic = _parse_sar_average(result)
    value_dic['status'] = status
    return value_dic

if __name__ == '__main__':
    # Manual check: run the plugin once and show its result. The call form
    # of print matches the rest of the code and is also valid on Python 3.
    print(monitor())

服务端API接口编写

(r'client/service/report/$',api_views.service_report),

# Created once at import time from the project settings; presumably a Redis
# connection (confirm against redis_conn). service_report hands it to the
# data store and to the trigger handler.
REDIS_OBJ = redis_conn.redis_conn(settings)

@csrf_exempt  # exempt this endpoint from CSRF validation
def service_report(request):
    """Accept one monitoring report from a client.

    For a POST request the reported data is handed to
    ``data_optimization.DataStore`` and every trigger of the reporting host
    is evaluated. The same success payload is returned for every request,
    including non-POST requests and a caught ``IndexError``.

    :param request: the incoming HTTP request; for POST it carries
        ``client_id``, ``service_name`` and JSON-encoded ``data``
    :return: HttpResponse containing the JSON string "---report success---"
    """
    print("client data:", request.POST)

    success_body = json.dumps("---report success---")
    if request.method != 'POST':
        return HttpResponse(success_body)

    try:
        client_id = request.POST.get('client_id')
        service_name = request.POST.get('service_name')
        print('host=%s, service=%s' % (client_id, service_name))
        payload = json.loads(request.POST['data'])

        # Store the reported data; the instance itself is not used afterwards.
        store = data_optimization.DataStore(client_id, service_name, payload, REDIS_OBJ)

        # Run the trigger checks for the reporting host.
        print("-------触发trigger检查---------")
        host_obj = models.Host.objects.get(id=client_id)
        service_triggers = get_host_triggers(host_obj)

        trigger_handler = data_processing.DataHandler(settings, connect_redis=False)
        for trigger in service_triggers:
            trigger_handler.load_service_data_and_calulating(host_obj, trigger, REDIS_OBJ)
        print("service trigger::", service_triggers)
    except IndexError as e:
        print('----->err:', e)

    return HttpResponse(success_body)


举报

相关推荐

0 条评论