Some Notes on Open-Falcon, Xiaomi's Monitoring System

古得曼_63b6 · 2023-05-06

Open-Falcon is an open-source project originally created by a team at Xiaomi. The team later moved to Didi and eventually dispersed; the original creator has since founded his own company, Flashcat (快猫星云). Open-Falcon is no longer maintained, and the successor open-source monitoring product is Nightingale (夜莺). Official site: Nightingale - 夜莺云原生监控 (n9e.github.io)

I. Single-machine setup. You can follow the official standalone-installation guide to try the whole stack end to end; the documentation is at:

单机安装 (Standalone Installation) · GitBook (open-falcon.org)

II. A few important configurations in the dashboard UI:

1. Monitoring template configuration

(screenshots: monitoring template configuration)

Monitoring templates support inheritance (a child template inherits its parent's strategies and can override them).

2. Nodata configuration. Nodata supplies a stand-in value when metrics stop arriving. For example, suppose you monitor a service's listening port and push a value of 1 to mean the service is listening; if the monitored machine becomes fully loaded and can no longer push anything to the server, the configured nodata default value is filled in on the server side instead, and that default value is what flags the service as abnormal.

(screenshot: nodata configuration)
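For instance, the port-listening check above normally pushes a payload like the following (the values here are illustrative, not taken from the screenshot):

{"metric": "port", "tags": "port=3306", "value": 1, "counterType": "GAUGE", "step": 60}

With a nodata item configured to a default of, say, -1, the server injects value -1 whenever a step passes with no sample, and an alert strategy that matches -1 can then fire.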


3. Adding a Screen (dashboard canvas)

(screenshot: adding a Screen)


Contents of the counters list (each counter is a metric, or metric/tag1=v1,tag2=v2; an example breakdown follows the list):

cpu.user
df.bytes.used.percent/fstype=ext4,mount=/
df.bytes.used.percent/fstype=ext4,mount=/data
disk.io.read_bytes/device=vdb
disk.io.util/device=vdb
disk.io.write_bytes/device=vdb
mem.memused.percent
net.if.in.bytes/iface=eth0
net.if.out.bytes/iface=eth0
net.if.total.bytes/iface=eth0
net.if.total.packets/iface=eth0
online/game=online,project=all
port/port=3306
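For example, df.bytes.used.percent/fstype=ext4,mount=/data reads as: the metric df.bytes.used.percent restricted to the series tagged fstype=ext4 and mount=/data. Counters without a slash, such as cpu.user, carry no tags. (The ext4/vdb/eth0 tag values come from Linux hosts; the Windows script below deliberately pushes the same tag names so one Screen template fits both platforms.)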


III. Open-Falcon can monitor both Linux and Windows hosts, but for Windows there is no ready-made official agent to download. We therefore have to write a custom script that pushes Windows host metrics to the server. Below is such a push script; it can be run as a scheduled task on the Windows host (a sample scheduling command follows the script).

#!/usr/bin/env python
# coding=utf8
# Python 2 script: collects Windows host metrics and pushes them to an
# Open-Falcon agent push API (:1988/v1/push).
import requests, json, random
import psutil
import time
import sys
import os
import platform
import pymssql


def get_cpu_info(hostname, step):
    t = int(time.time())
    # danger threshold (currently unused)
    cpu_c = 90
    # warning threshold (currently unused)
    cpu_w = 70
    # sampling interval in seconds
    cpu_i = 2
    cpu_times1 = psutil.cpu_times()
    time.sleep(cpu_i)
    cpu_times2 = psutil.cpu_times()
    checkos = platform.system()
    if checkos == "Windows":
        cpu_t1 = cpu_times1.user + cpu_times1.system + cpu_times1.idle + cpu_times1.interrupt + cpu_times1.dpc
        cpu_t2 = cpu_times2.user + cpu_times2.system + cpu_times2.idle + cpu_times2.interrupt + cpu_times2.dpc
        cpu_io = 0
        cpu_thread = psutil.cpu_count()
        cpu_speed = 2.39  # hardcoded CPU clock in GHz (not pushed)
        cpu_idle = (cpu_times2.idle - cpu_times1.idle) * 100 / (cpu_t2 - cpu_t1)
        cpu_user = (cpu_times2.user - cpu_times1.user) * 100 / (cpu_t2 - cpu_t1)
        cpu_sys = (cpu_times2.system - cpu_times1.system) * 100 / (cpu_t2 - cpu_t1)
        # overall CPU usage percent (computed for reference; not pushed below)
        cpu_usep = psutil.cpu_percent(interval=cpu_i)
        cpu_data = [{'endpoint': hostname, 'tags': None, 'timestamp': t, 'metric': 'cpu.idle', 'value': cpu_idle,
                     'counterType': 'GAUGE', 'step': step},
                    {'endpoint': hostname, 'tags': None, 'timestamp': t, 'metric': 'cpu.user', 'value': cpu_user,
                     'counterType': 'GAUGE', 'step': step},
                    {'endpoint': hostname, 'tags': None, 'timestamp': t, 'metric': 'cpu.sys', 'value': cpu_sys,
                     'counterType': 'GAUGE', 'step': step},
                    {'endpoint': hostname, 'tags': None, 'timestamp': t, 'metric': 'cpu.system', 'value': cpu_sys,
                     'counterType': 'GAUGE', 'step': step}
                    ]
    else:
        cpu_data = []
    return cpu_data


# Collect memory metrics
def get_mem_info(hostname, step):
    t = int(time.time())
    # danger threshold (currently unused)
    mem_c = 80
    # warning threshold (currently unused)
    mem_w = 60
    mem_info = psutil.virtual_memory()
    swap_info = psutil.swap_memory()
    # memory
    mem_total = mem_info.total
    mem_usep = mem_info.percent
    mem_memfree = 100 - mem_usep
    mem_t = mem_info.total / 1000
    mem_user = mem_info.used / 1000
    # swap
    swap_usep = swap_info.percent
    swap_user = swap_info.used / 1000
    swap_cache = 0
    # memory actually used by the game (placeholders, not pushed)
    memres_usep = mem_usep
    memres_user = mem_user
    memres_game = 0
    mem_data = [
        {'endpoint': hostname, 'tags': None, 'timestamp': t, 'metric': 'mem.swapused.percent', 'value': swap_usep,
         'counterType': 'GAUGE', 'step': step},
        {'endpoint': hostname, 'tags': None, 'timestamp': t, 'metric': 'mem.memused.percent', 'value': mem_usep,
         'counterType': 'GAUGE', 'step': step},
        {'endpoint': hostname, 'tags': None, 'timestamp': t, 'metric': 'mem.memtotal', 'value': mem_total,
         'counterType': 'GAUGE', 'step': step}]
    return mem_data


# Collect disk usage metrics
def get_disk_info(hostname, step):
    t = int(time.time())
    # danger threshold (currently unused)
    disk_c = 85
    # warning threshold (currently unused)
    disk_w = 80
    disk_id = []
    # append each partition's total/used/free/percent to its own list
    disk_total = []
    disk_used = []
    disk_free = []
    disk_percent = []
    disk_all = {}
    for id in psutil.disk_partitions():
        if 'cdrom' in id.opts or id.fstype == '':
            continue
        disk_name = id.device.split(':')
        s = disk_name[0]
        disk_id.append(s)
        disk_info = psutil.disk_usage(id.device)
        disk_total.append(disk_info.total)
        disk_used.append(disk_info.used)
        disk_free.append(disk_info.free)
        disk_percent.append(disk_info.percent)
    #    for i in range(len(disk_id)):
    #        print u'%s drive free percent: %s %%  %s' % (disk_id[i],100 - disk_percent[i],disk_info,)

    # NOTE: assumes the host has exactly two partitions (C: and D:);
    # with a single partition the [1] indexing below raises IndexError.
    disk_c_t = round(disk_total[0], 2)  # C: total size
    disk_c_user = round(disk_used[0], 2)  # C: used
    disk_c_usep = round(disk_used[0] / float(disk_total[0]) * 100, 2)  # C: used percent
    disk_c_idlep = 100 - disk_c_usep  # C: free percent
    disk_d_t = round(disk_total[1], 2)  # D: total size
    disk_d_user = round(disk_used[1], 2)  # D: used
    disk_d_usep = round(disk_used[1] / float(disk_total[1]) * 100, 2)  # D: used percent
    disk_d_idlep = 100 - disk_d_usep  # D: free percent
    #    disk_all = {disk_info_disk_C,disk_info_disk_D}
    disk_data = [{'endpoint': hostname, 'tags': "fstype=ext4,mount=/", 'timestamp': t, 'metric': 'df.bytes.total',
                  'value': disk_c_t, 'counterType': 'GAUGE', 'step': step},
                 {'endpoint': hostname, 'tags': "fstype=ext4,mount=/", 'timestamp': t,
                  'metric': 'df.bytes.free.percent', 'value': disk_c_idlep, 'counterType': 'GAUGE', 'step': step},
                 {'endpoint': hostname, 'tags': "fstype=ext4,mount=/", 'timestamp': t, 'metric': 'df.bytes.used',
                  'value': disk_c_user, 'counterType': 'GAUGE', 'step': step},
                 {'endpoint': hostname, 'tags': "fstype=ext4,mount=/", 'timestamp': t,
                  'metric': 'df.bytes.used.percent', 'value': disk_c_usep, 'counterType': 'GAUGE', 'step': step},
                 {'endpoint': hostname, 'tags': "fstype=ext4,mount=/data", 'timestamp': t, 'metric': 'df.bytes.total',
                  'value': disk_d_t, 'counterType': 'GAUGE', 'step': step},
                 {'endpoint': hostname, 'tags': "fstype=ext4,mount=/data", 'timestamp': t,
                  'metric': 'df.bytes.free.percent', 'value': disk_d_idlep, 'counterType': 'GAUGE', 'step': step},
                 {'endpoint': hostname, 'tags': "fstype=ext4,mount=/data", 'timestamp': t, 'metric': 'df.bytes.used',
                  'value': disk_d_user, 'counterType': 'GAUGE', 'step': step},
                 {'endpoint': hostname, 'tags': "fstype=ext4,mount=/data", 'timestamp': t,
                  'metric': 'df.bytes.used.percent', 'value': disk_d_usep, 'counterType': 'GAUGE', 'step': step},
                 ]

    return disk_data


def get_network_info(hostname, step):
    t = int(time.time())
    checkos = platform.system()
    if checkos == "Windows":
        net_info = psutil.net_io_counters()
        time.sleep(1)
        net_info1 = psutil.net_io_counters()
        net_sent = net_info1.bytes_sent - net_info.bytes_sent
        net_recv = net_info1.bytes_recv - net_info.bytes_recv
        net_psent = net_info1.packets_sent - net_info.packets_sent
        net_precv = net_info1.packets_recv - net_info.packets_recv
        net_speed = net_sent + net_recv
        net_pspeed = net_psent + net_precv
        net_info_dict = {'net.if.in.bytes': [net_recv, "iface=eth0"], 'net.if.out.bytes': [net_sent, "iface=eth0"],
                         'net.if.in.packets': [net_precv, "iface=eth0"],
                         'net.if.out.packets': [net_psent, "iface=eth0"],
                         'net.if.total.bytes': [net_speed, "iface=eth0"],
                         'net.if.total.packets': [net_pspeed, 'iface=eth0'], 'agent.alive': [1, None]}
        net_data = [
            {'endpoint': hostname, 'tags': "iface=eth0", 'timestamp': t, 'metric': 'net.if.in.bytes', 'value': net_recv,
             'counterType': 'GAUGE', 'step': step},
            {'endpoint': hostname, 'tags': "iface=eth0", 'timestamp': t, 'metric': 'net.if.out.bytes',
             'value': net_sent, 'counterType': 'GAUGE', 'step': step},
            {'endpoint': hostname, 'tags': "iface=eth0", 'timestamp': t, 'metric': 'net.if.in.packets',
             'value': net_precv, 'counterType': 'GAUGE', 'step': step},
            {'endpoint': hostname, 'tags': "iface=eth0", 'timestamp': t, 'metric': 'net.if.total.bytes',
             'value': net_speed, 'counterType': 'GAUGE', 'step': step},
            {'endpoint': hostname, 'tags': "iface=eth0", 'timestamp': t, 'metric': 'net.if.total.packets',
             'value': net_pspeed, 'counterType': 'GAUGE', 'step': step},
            ]
    else:
        net_data = []
    return net_data


def get_io_info(hostname, step):
    t = int(time.time())
    checkos = platform.system()
    if checkos == "Windows":
        # C: drive I/O. Requires a grep on PATH (cygwin); wmic prints the
        # requested columns in alphabetical order: DiskReadBytesPerSec,
        # DiskTransfersPerSec, DiskWriteBytesPerSec, Name.
        io_info_c = os.popen(
            "wmic path Win32_PerfFormattedData_PerfDisk_LogicalDisk get Name,DiskTransfersPerSec,DiskReadBytesPerSec,DiskWriteBytesPerSec|grep 'C:'").read().strip()
        io_read_c = io_info_c.split()[0]
        io_p_c = io_info_c.split()[1]
        io_write_c = io_info_c.split()[2]
        io_t_c = int(io_read_c) + int(io_write_c)
        # D: drive I/O (same column-order caveat as above)
        io_info_d = os.popen(
            "wmic path Win32_PerfFormattedData_PerfDisk_LogicalDisk get Name,DiskTransfersPerSec,DiskReadBytesPerSec,DiskWriteBytesPerSec|grep 'D:'").read().strip()
        io_read_d = io_info_d.split()[0]
        io_p_d = io_info_d.split()[1]
        io_write_d = io_info_d.split()[2]
        io_t_d = int(io_read_d) + int(io_write_d)
        io_info_c = {'disk.io.read_bytes': [io_read_c, 'device=vda'], 'disk.io.write_bytes': [io_write_c, 'device=vda'],
                     'disk.io.util': [io_t_c, 'device=vda']}
        io_info_d = {'disk.io.read_bytes': [io_read_d, 'device=vdb'], 'disk.io.write_bytes': [io_write_d, 'device=vdb'],
                     'disk.io.util': [io_t_d, 'device=vdb']}
        io_data = [{'endpoint': hostname, 'tags': "device=vda", 'timestamp': t, 'metric': 'disk.io.read_bytes',
                    'value': io_read_c, 'counterType': 'GAUGE', 'step': step},
                   {'endpoint': hostname, 'tags': "device=vdb", 'timestamp': t, 'metric': 'disk.io.read_bytes',
                    "value": io_read_d, 'counterType': 'GAUGE', 'step': step},
                   {'endpoint': hostname, 'tags': "device=vda", 'timestamp': t, 'metric': 'disk.io.write_bytes',
                    'value': io_write_c, 'counterType': 'GAUGE', 'step': step},
                   {'endpoint': hostname, 'tags': "device=vda", 'timestamp': t, 'metric': 'disk.io.util',
                    'value': io_t_c, 'counterType': 'GAUGE', 'step': step},
                   {'endpoint': hostname, 'tags': "device=vdb", 'timestamp': t, 'metric': 'disk.io.write_bytes',
                    'value': io_write_d, 'counterType': 'GAUGE', 'step': step},
                   {'endpoint': hostname, 'tags': "device=vdb", 'timestamp': t, 'metric': 'disk.io.util',
                    'value': io_t_d, 'counterType': 'GAUGE', 'step': step},
                   ]
    else:
        io_data = []
    return io_data


def tcp_content(hostname, step):
    t = int(time.time())
    tcp_es = os.popen('netstat -ano|find "ESTABLISHED" /c').read().strip().replace('\r', '').replace('\n', '')
    tcp_listen = os.popen('netstat -ano|find "LISTENING" /c').read().strip().replace('\r', '').replace('\n', '')
    tcp_wait = os.popen('netstat -ano|find "TIME_WAIT" /c').read().strip().replace('\r', '').replace('\n', '')
    # print(tcp_es,tcp_listen)
    # NOTE: the original swapped these: the ESTABLISHED count was pushed as
    # tcp.listen and the LISTENING count as tcp.established; fixed here,
    # and the popen string output is cast to int.
    tcp_data = [{'endpoint': hostname, 'tags': "count", 'timestamp': t, 'metric': 'tcp.established',
                 'value': int(tcp_es), 'counterType': 'GAUGE', 'step': step},
                {'endpoint': hostname, 'tags': "count", 'timestamp': t, 'metric': 'tcp.listen',
                 'value': int(tcp_listen), 'counterType': 'GAUGE', 'step': step},
                {'endpoint': hostname, 'tags': "count", 'timestamp': t, 'metric': 'tcp.time.wait',
                 'value': int(tcp_wait), 'counterType': 'GAUGE', 'step': step},
                ]
    # print(tcp_data)
    return tcp_data


def get_hostname(ip):
    t = int(time.time())
    try:
        with open(r'c:\cygwinroot\etc\hostname.txt', 'r') as fp:
            host_name = fp.read().strip()
    except Exception,e:
        print e
        ip = requests.get(r'http://106.14.207.70:888/count/getip').text.strip()
        # print(ip)
        host_name = ip
    return host_name


# def mysql_log(hostname):
# t=time.strftime("%Y-%m-%d %H:%M:%S")
###print(t,type(t))
# month=time.strftime("%Y-%m")
# user="root"
# pwd="ODUwNmQxYzxycq2017."
# log_path="D:\mysql\logs"
# sql1="SELECT id, user, command, time, state, info FROM information_schema.processlist WHERE command != 'sleep';"
# sql2=" SELECT * FROM information_schema.innodb_locks\G "
# sql3=" SELECT * FROM information_schema.innodb_trx\G "
# processlist='mysql -u%s -p%s -e "%s"' %(user,pwd,sql1)
# innodb_locks='mysql -u%s -p%s -e "%s"' %(user,pwd,sql2)
# innodb_trx='mysql -u%s -p%s -e "%s"' %(user,pwd,sql3)
# print(processlist)
# print(innodb_locks)
# log_file=open(r'%s\%s.log' %(log_path,month),'a')
# log_file.write("Start-time-----"+t)
# log_file.write('\n')
# log_file.write("----information_schema.processlist---\n")
# log_file.write("\t"+os.popen(processlist).read())
# log_file.write('\n')
# log_file.write("----information_schema.innodb_locks---\n")
# log_file.write("\t"+os.popen(innodb_locks).read())
# log_file.write('\n')
# log_file.write("----information_schema.innodb_trx---\n")
# log_file.write("\t"+os.popen(innodb_locks).read()+'\n')
# log_file.write("End_time---%s"'\n' %(time.strftime("%Y-%m-%d %H:%M:%S")))
# log_file.close()

def connect(host, user, password, dbname):
    try:
        conn = pymssql.connect(host=host, user=user, password=password, database=dbname, timeout=40)
    except pymssql.OperationalError:  # bad host/credentials, timeout, etc.
        # the original caught pymysql.OperationalError, which pymssql never raises
        print('Connection failed!')
        return None
    cur = conn.cursor()  # cursor
    return cur


def exec_sql(cmd, cur):
    cur.execute(cmd)
    num = cur.fetchone()  # one row holding the online count
    return num


def online_info(hostname, step):
    t = int(time.time())
    host = '127.0.0.1'
    online_data = []  # accumulates one entry per game database, plus a total
    all_oper = []
    try:
        user = os.popen("grep sDBUser /cygdrive/d/game/server/*_s*/Config.json").readline().strip().split('"')[3]
        password = os.popen("grep sDBPsw /cygdrive/d/game/server/*_s*/Config.json").readline().strip().split('"')[-2]
        game_dirs = os.listdir('d:/game/server/')  # some deployments (e.g. 3v3) have no server directory
    except:
        pass
    else:
        db_list = []
        for game_dir in game_dirs:
            if os.path.isdir('d:/game/server/%s' % game_dir):  # skip plain files under server/
                status = os.popen("cat /cygdrive/d/game/server/%s/scripts/run_status" % game_dir).read().strip()
                if int(status) != 2:  # status 2 means the server was merged away; skip it
                    db_name = os.popen("grep sDBName /cygdrive/d/game/server/%s/Config.json" % game_dir).read().strip().split('"')[-2]
                    db_list.append(db_name)
        all_online_num = 0
        for dbname in db_list:
            cur = connect(host, user, password, dbname)  # returns a cursor or None
            if cur:
                cmd = 'select top 1 OnlineCount from Mir_Online  ORDER by UpdateTime DESC '  # latest online count
                print(cmd)
                num = exec_sql(cmd, cur)
                if num:  # skip when the query returned nothing
                    all_online_num = all_online_num + num[0]
                    # NOTE: the original assigned this dict back to `dbname`,
                    # clobbering the db name used in the tags below; use a
                    # separate variable instead.
                    oper = {'online': [num[0], 'game=online,project=%s' % dbname]}
                    all_oper.append(oper)
                    online_data = online_data + [
                        {'endpoint': hostname, 'tags': 'game=online,project=%s' % dbname, 'timestamp': t,
                         'metric': 'online', 'value': num[0], 'counterType': 'GAUGE', 'step': step}]
                    cur.close()
            else:
                return None
        online_data = online_data + [
            {'endpoint': hostname, 'tags': "game=online,project=all", 'timestamp': t, 'metric': 'online',
             'value': all_online_num, 'counterType': 'GAUGE', 'step': step}]
        # all_oper.append(online_data)
    return online_data


def post_data():
    step = 60
    ip = requests.get(r'http://106.14.207.70:888/count/getip').text.strip()
    # print(ip)
    hostname = get_hostname(ip)
    # print(hostname)
    # mysql_log(hostname)
    url_list = ["http://xx.xxx.xxx.xxx:1988/v1/push", "http://xx.xxx.xxx.xx:1988/v1/push",
                "http://xx.xx.xx.xx:1988/v1/push"]
    try:
        cpu_data = get_cpu_info(hostname, step)
        mem_data = get_mem_info(hostname, step)
        disk_data = get_disk_info(hostname, step)
        net_data = get_network_info(hostname, step)
        io_data = get_io_info(hostname, step)
        tcp_data = tcp_content(hostname, step)
        online_data = online_info(hostname, step)
        # print(cpu_data,mem_data,disk_data,net_data,io_data)
        data = cpu_data + mem_data + disk_data + net_data + io_data + tcp_data + online_data
        print(data)
        url = random.choice(url_list)
        req = requests.post(url=url, data=json.dumps(data), timeout=30)
        print(req.text)
    except Exception, e:
        print e


if __name__ == "__main__":
    post_data()
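To run this periodically, register it as a Windows scheduled task, for example (the interpreter and script paths here are hypothetical):

schtasks /create /tn "falcon_push" /tr "C:\Python27\python.exe C:\falcon\win_push.py" /sc minute /mo 1

which fires the push script once a minute, matching the step of 60 seconds used in post_data().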


IV. Open-Falcon can monitor many kinds of services: databases, middleware, domain names, and more. Below are the scripts I use for monitoring Redis and RabbitMQ.

For Redis:

#!/bin/env python
#-*- coding:utf-8 -*-

__author__ = 'iambocai'

import json
import time
import socket
import os
import re
import sys
import commands
import urllib2, base64

class RedisStats:
    # If you compiled and deployed redis yourself, change this to your redis-cli path
    _redis_cli = '/usr/bin/redis-cli'
    _stat_regex = re.compile(ur'(\w+):([0-9]+\.?[0-9]*)\r')

    def __init__(self,  port='6379', passwd=None, host='127.0.0.1'):
        self._cmd = '%s -h %s -p %s info' % (self._redis_cli, host, port)
        if passwd not in ['', None]:
            self._cmd = '%s -h %s -p %s -a %s info' % (self._redis_cli, host, port, passwd)

    def stats(self):
        ' Return a dict containing redis stats '
        info = commands.getoutput(self._cmd)
        return dict(self._stat_regex.findall(info))


def main():
    ip = socket.gethostname()
    timestamp = int(time.time())
    step = 60
    # insts_list holds the redis config files to read port/password from;
    # dynamic discovery is recommended, e.g.:
    # inst_list = [ i for i in commands.getoutput("find  /etc/ -name 'redis*.conf'" ).split('\n') ]
    insts_list = [ '/etc/redis.conf' ]
    p = []
    
    monit_keys = [
        ('connected_clients','GAUGE'), 
        ('blocked_clients','GAUGE'), 
        ('used_memory','GAUGE'),
        ('used_memory_rss','GAUGE'),
        ('mem_fragmentation_ratio','GAUGE'),
        ('total_commands_processed','COUNTER'),
        ('rejected_connections','COUNTER'),
        ('expired_keys','COUNTER'),
        ('evicted_keys','COUNTER'),
        ('keyspace_hits','COUNTER'),
        ('keyspace_misses','COUNTER'),
        ('keyspace_hit_ratio','GAUGE'),
    ]
  
    for inst in insts_list:
        port = commands.getoutput("sed -n 's/^port *\([0-9]\{4,5\}\)/\\1/p' %s" % inst)
        passwd = commands.getoutput("sed -n 's/^requirepass *\([^ ]*\)/\\1/p' %s" % inst)
        metric = "redis"
        endpoint = ip
        tags = 'port=%s' % port

        try:
            conn = RedisStats(port, passwd)
            stats = conn.stats()
        except Exception,e:
            continue

        for key,vtype in monit_keys:
            # older redis versions expose fewer info fields; skip missing keys
            if key not in stats.keys():
                continue
            # compute the keyspace hit ratio
            if key == 'keyspace_hit_ratio':
                try:
                    value = float(stats['keyspace_hits'])/(int(stats['keyspace_hits']) + int(stats['keyspace_misses']))
                except ZeroDivisionError:
                    value = 0
            # the fragmentation ratio is a float
            elif key == 'mem_fragmentation_ratio':
                value = float(stats[key])
            else:
                # everything else is collected as an int
                try:
                    value = int(stats[key])
                except:
                    continue
            
            i = {
                'Metric': '%s.%s' % (metric, key),
                'Endpoint': endpoint,
                'Timestamp': timestamp,
                'Step': step,
                'Value': value,
                'CounterType': vtype,
                'TAGS': tags
            }
            p.append(i)
        

    print json.dumps(p, sort_keys=True,indent=4)
    method = "POST"
    handler = urllib2.HTTPHandler()
    opener = urllib2.build_opener(handler)
    url = 'http://127.0.0.1:1988/v1/push'
    request = urllib2.Request(url, data=json.dumps(p) )
    request.add_header("Content-Type",'application/json')
    request.get_method = lambda: method
    try:
        connection = opener.open(request)
    except urllib2.HTTPError,e:
        connection = e

    # check. Substitute with appropriate HTTP code.
    if connection.code == 200:
        print connection.read()
    else:
        print '{"err":1,"msg":"%s"}' % connection
if __name__ == '__main__':
    # crude concurrency guard: skip this run if several copies are already running
    proc = commands.getoutput(' ps -ef|grep %s|grep -v grep|wc -l ' % os.path.basename(sys.argv[0]))
    sys.stdout.flush()
    if int(proc) < 5:
        main()
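This script is typically driven by cron on the Redis host, e.g. in /etc/crontab format (the install path is hypothetical):

* * * * * root python /usr/local/falcon/redis-monitor.py > /dev/null 2>&1

so a fresh snapshot is pushed to the local agent (127.0.0.1:1988) every minute, matching step = 60.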


For RabbitMQ:

#!/bin/env python
#-*- coding:utf-8 -*-

__author__ = 'pengyang'

import sys, urllib2, base64, json, time,socket


step = 60
ip = socket.gethostname()
ts = int(time.time())
keys = (u'messages_ready', u'messages_unacknowledged')
rates = ('ack', 'deliver', 'deliver_get', 'publish')

request = urllib2.Request("http://%s:15672/api/queues" %ip)
# see #issue4
base64string = base64.b64encode('fx_jiuzhou:boDmZ9IorfMVV5Wu')
request.add_header("Authorization", "Basic %s" % base64string)   
result = urllib2.urlopen(request)
data = json.loads(result.read())
tag = ''
#tag = sys.argv[1].replace('_',',').replace('.','=')

p = []
for queue in data:
    # ready and unacknowledged message counts
    print queue
    msg_total = 0
    for key in keys:
        q = {}
        q["endpoint"] = ip
        q['timestamp'] = ts
        q['step'] = step
        q['counterType'] = "GAUGE"
        q['metric'] = 'rabbitmq.%s' % key
        q['tags'] = 'name=%s,%s' % (queue['name'],tag)
        # queues with no traffic yet may omit these fields; default to 0
        # (the original appended q without any value in that case)
        q['value'] = int(queue[key]) if key in queue else 0
        msg_total += q['value']
        p.append(q)

    # total backlog = ready + unacknowledged
    q = {}
    q["endpoint"] = ip
    q['timestamp'] = ts
    q['step'] = step
    q['counterType'] = "GAUGE"
    q['metric'] = 'rabbitmq.messages_total'
    q['tags'] = 'name=%s,%s' % (queue['name'],tag)
    q['value'] = msg_total
    p.append(q)

    # ack/deliver/deliver_get/publish rates
    for rate in rates:
        q = {}
        q["endpoint"] = ip
        q['timestamp'] = ts
        q['step'] = step
        q['counterType'] = "GAUGE"
        q['metric'] = 'rabbitmq.%s_rate' % rate
        q['tags'] = 'name=%s,%s' % (queue['name'],tag)
        try:
            q['value'] = int(queue['message_stats']["%s_details" % rate]['rate'])
        except:
            q['value'] = 0
        p.append(q)

print json.dumps(p, indent=4)


method = "POST"
handler = urllib2.HTTPHandler()
opener = urllib2.build_opener(handler)
url = 'http://xx.xxx.xxx.xx:1988/v1/push'
request = urllib2.Request(url, data=json.dumps(p) )
request.add_header("Content-Type",'application/json')
request.get_method = lambda: method
try:
    connection = opener.open(request)
except urllib2.HTTPError,e:
    connection = e

# check. Substitute with appropriate HTTP code.
if connection.code == 200:
    print connection.read()
else:
    print '{"err":1,"msg":"%s"}' % connection


V. Alert channels are plentiful: DingTalk, email, SMS, phone calls, and more.

Here is how to wire up SMS alerts. First apply for an SMS template with your cloud vendor; once the template passes review, messages can be sent through the Alibaba Cloud API.

Then wrap that call behind one more layer, an HTTP endpoint such as http://ops.xxx.xxx.com/dns/api/v2/sms/send/ (if you cannot build this layer yourself, your company's ops developers can), so the alert script can call it. When an alert fires, the alarm component POSTs two form fields, tos (recipients) and content (the alert text), to the configured sms URL; the script below, named openfalcon_sms.py, receives those callbacks:

# coding=utf-8
import sys

import time
import hashlib
import urllib
import urllib.request
from flask import Flask, request
import json
import requests

_author_ = 'peng'
'''
SMS gateway: receives Open-Falcon alarm callbacks and forwards them as SMS
'''
app = Flask(__name__)


@app.route('/sms/send', methods=['GET', 'POST'])
def check():
    # default response
    if request.method == 'POST':
        return_dict = {'code': '0', 'message': 'OK'}
        # reject empty requests (request.form carries the POSTed fields;
        # the original checked request.args, which is never None in Flask)
        if not request.form:
            return_dict['code'] = '5004'
            return_dict['message'] = 'empty request parameters'
            return json.dumps(return_dict, ensure_ascii=False)
        # fields posted by the alarm component
        mobile = request.form['tos']  # recipient phone number(s)
        content = request.form['content']  # alert text
        statu = content.split('[]')[0].replace('[', '').replace(']', ' ').strip().split(' ')[1]  # status (PROBLEM/OK)
        host = content.split('[]')[0].replace('[', '').replace(']', ' ').strip().split(' ')[2]  # host
        msg = content.split('[]')[1].replace('[', '').replace(']', ' ').strip().split(' ')[0]  # alert description
        value = content.split('[]')[1].replace('[', '').replace(']', ' ').strip().split(' ')[4].split('>')[0]  # current value
        if statu == "PROBLEM":
            # content = "host %s %s current value %s" % (host, msg, value)
            # mobile = mobile.split(',')  # multiple recipients could be supported
            # for i in mobile:
            send(mobile, content, host)
        return content


def send(mobile, content, host):
    url = 'http://ops.xxx.xxx.com/dns/api/v2/sms/send/'  # change to your production endpoint
    data = {
        "project": "lyzt",
        'supply': "jw",  # supplier short name: jw or sh
        'tpl_code': "SMS_202810705",
        'tpl_params': {},
        "phones": mobile
    }
    tpl_params = {
        "name": host,
        "msg": content
    }
    data["tpl_params"] = json.dumps(tpl_params)
    token = 'kum0qroumm1nueur'

    # sort key/value pairs by key
    sdata = sorted(data.items())
    s_list = []
    for s in sdata:
        s_list.append('%s=%s' % (s[0], s[1]))
    nstr = '&'.join(s_list)

    # sign the request: md5 of the joined string plus the shared token
    m = hashlib.md5()
    m.update((nstr + token).encode())
    sign = m.hexdigest()
    # attach the signature to the request parameters
    data['sign'] = sign
    print(data)
    # send the request
    res = requests.post(url, data=data, timeout=300)
    result = res.json()
    print(result)
    # req = urllib.request.urlopen(
    #    url = 'http://ops.xx.xxxx.com/dns/api/v2/sms/send/',  # SMS-platform endpoint
    #    data = urllib.parse.urlencode(data).encode('utf-8')
    # )
    # content  = req.text
    # print(content)


if __name__ == '__main__':
    app.run(
        debug=True
    )

Then run the script in the background:

screen -R "短信报警"
python3  openfalcon_sms.py
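You can sanity-check the endpoint locally before wiring it into alarm (Flask listens on port 5000 by default; tos and content are the two form fields the script reads, and the content value has to follow your alarm template for the parsing above to succeed):

curl -d "tos=13800000000" -d "content=<alarm text>" http://127.0.0.1:5000/sms/send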

Next, edit the alarm component's configuration file:

(screenshot: alarm cfg.json, sms callback setting)
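The screenshot showed the api section of alarm's cfg.json. From memory of the 0.2 layout (a sketch; addresses are illustrative), it is roughly:

"api": {
    "im": "http://127.0.0.1:10086/wechat",
    "sms": "http://127.0.0.1:5000/sms/send",
    "mail": "http://127.0.0.1:10086/mail",
    "dashboard": "http://127.0.0.1:8081",
    "plus_api": "http://127.0.0.1:8080",
    "plus_api_token": "default-token-used-in-server-side"
}

with "sms" pointed at the Flask service started above.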

Finally, restart the alarm component; the SMS alert channel is now fully configured.


VI. Scaling out Open-Falcon's graph component. As the company's fleet grows, a single-machine deployment cannot carry monitoring for tens of thousands of hosts, so the graph component can be scaled horizontally. The procedure for bringing up a new graph node follows.

Environment setup on the new node: buy a machine with private-network connectivity to the existing ones, then do a basic environment install: wget http://repo.mysql.com/mysql-community-release-el7-5.noarch.rpm && rpm -ivh mysql-community-release-el7-5.noarch.rpm && yum install -y mysql-community-server && systemctl start mysqld && systemctl enable mysqld.service (the release rpm only registers the repo, so the server package still has to be installed; on el7 the service name is mysqld, not mysql). Then copy the pre-built Open-Falcon package to the server, unpack it, and initialize the databases:

mysql -uroot -pNsSwEvf37De6CecB5kt <1_uic-db-schema.sql

mysql -uroot -pNsSwEvf37De6CecB5kt <2_portal-db-schema.sql

mysql -uroot -pNsSwEvf37De6CecB5kt <3_dashboard-db-schema.sql

mysql -uroot -pNsSwEvf37De6CecB5kt <4_graph-db-schema.sql

mysql -uroot -pNsSwEvf37De6CecB5kt <5_alarms-db-schema.sql

The process is straightforward, so no further detail is needed.

How the scale-out works:

Starting from the Transfer change, traffic is hashed under the new rules into both the original cluster and the new nodes. The new nodes see that the migrate switch is on, so instead of rushing received data to disk, they first try to pull the historical data (essentially an rrd file) from the original cluster using the old hash rules. On success, the whole rrd file is written locally; on a pull timeout (1s by default), the data received in that cycle is forwarded to the old cluster, and the process repeats in the next cycle.


Queries likewise follow the new hash rules. When query traffic reaches a new node and the local RRD file already exists, graph answers directly; if not, graph pulls the old data from the original cluster via the old hash rules, merges it with the data in its own cache, and returns the result to the caller.


Technically, the whole process can be called lossless and hot-migratable.


Procedure:

  1. Edit cfg.json on the newly added graph node (a config sketch follows the screenshot placeholder below)
  2. Start graph on the new node

(screenshot: graph cfg.json, migrate section)
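For reference, the migrate block in graph's cfg.json looks roughly like this (a sketch of the 0.2 layout; node names and addresses are illustrative). On the new node, enabled is set to true and cluster lists the original graph instances under the old hash rule:

"migrate": {
    "enabled": true,
    "concurrency": 2,
    "replicas": 500,
    "cluster": {
        "graph-00": "10.0.0.1:6070"
    }
}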


3. Edit the agent configuration on the new node

(screenshots: agent cfg.json)


4. Start agent on the new node. This component is optional, but if your custom push scripts later target the local :1988 endpoint, it does have to be running.

  5. Restart graph on all nodes. Note that when restarting with ./open-falcon restart graph,
     the log (shown in the original screenshots) may indicate graph is still force-flushing
     data to disk while port 6070 is not yet listening. A quick check is to telnet 6070 from
     the main node (possible here because all my nodes share the private network). If it
     stays down, look up the graph pid, force-kill it (e.g. kill -9 13475), run
     ./open-falcon restart graph again, and confirm the startup log looks healthy.
  6. Edit transfer's cfg.json on all nodes (a config sketch follows this list).
  7. Restart transfer on all nodes.
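For step 6, the graph section of transfer's cfg.json has to list every graph instance under the new hash rule; roughly (a sketch; addresses are illustrative):

"graph": {
    "enabled": true,
    "replicas": 500,
    "cluster": {
        "graph-00": "10.0.0.1:6070",
        "graph-01": "10.0.0.2:6070"
    }
}

The replicas value and the node naming must be identical on every transfer instance, otherwise the consistent-hash ring diverges between instances.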

At this point, transfer sends incoming data to the scaled-out set of graph instances, and the graph instances automatically rebalance the data. How long the rebalance takes depends on the number of counters to migrate and on the load and performance of the graph machines.

8. Edit the api component's configuration on the main node and restart the api process

(screenshot: api cfg.json, graphs cluster)
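The graphs cluster in api's cfg.json mirrors the transfer one; roughly (a sketch, with the existing connection-pool and replica settings elided):

"graphs": {
    "cluster": {
        "graph-00": "10.0.0.1:6070",
        "graph-01": "10.0.0.2:6070"
    },
    ...
}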


./open-falcon restart api


9. The scale-out is now essentially complete. How do you confirm the data rebalance has finished?

Currently the only way is to watch graph's internal counters: on every newly added graph instance, poll the stats endpoint http://127.0.0.1:6071/counter/migrate; once none of the counters change any more, the migration is done.
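For example:

curl -s http://127.0.0.1:6071/counter/migrate

run a few times in a row; identical output across runs means no counters are still moving.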


Once migration on the newly added nodes is complete, switch off migrate in the graph config and restart the service.


Summary:

A failed scale-out loses data during the migration window; in short, once the arrow leaves the bow there is no turning back. You can also judge whether the scale-out succeeded by checking in a Screen whether any metric series went missing. Here is what a failed case looked like:

(screenshot: metric gap after a failed scale-out)


Reference: https://book.open-falcon.org/zh_0_2/practice/graph-scaling.html
