0
点赞
收藏
分享

微信扫一扫

利用Jaeger+Cassandra生成微服务历史执行图并求关键路径

Benchmark:SocialNetwork

一.分析Cassandra存储数据

Jaeger将请求的Span存储到Cassandra数据库中。如果知道数据库中同一Trace内各Span之间的父子关系,就可以为微服务应用生成历史执行图。因此只需在Jaeger采集数据时记录微服务之间的父子关系即可。查询数据库中的数据,发现有如下信息:

在operation_name一列中,有相互对应的client与server记录(操作名分别以client和server结尾,去掉后缀后相同),说明发起调用的client一方与响应的server一方是父子关系,因此可以通过operation_name为应用生成历史执行图。


二.处理Trace数据

需要记录当前微服务名称、操作名称以及执行该微服务所用的时间。trace_id是唯一的,可以按trace_id提取同一请求的全部数据。

提取信息如下(部分):

然后就是对文件的简单读写了。

    def get_trace(self):
        """Read ``trace.csv`` and return its three columns.

        Every non-blank line must have the form ``service,operation,latency``.

        Returns:
            ``[services, operations, latencies]`` -- three parallel lists of
            strings, one entry per data row.

        Raises:
            ValueError: if a non-blank line does not have exactly three
                comma-separated fields.
        """
        with open('trace.csv', 'r') as f:
            text = f.read()
        trace = [[] for _ in range(3)]
        # splitlines() keeps the final row even when the file has no trailing
        # newline (the old split('\n')[:-1] silently dropped it).
        for lineno, line in enumerate(text.splitlines(), 1):
            if not line.strip():
                # A blank line used to push '' into column 0 only, leaving the
                # three columns misaligned for every following row.
                continue
            fields = line.split(',')
            if len(fields) != 3:
                raise ValueError('trace.csv line %d: expected 3 fields, got %d'
                                 % (lineno, len(fields)))
            for column, value in zip(trace, fields):
                column.append(value)
        return trace

 


三.生成历史执行图

利用前面的分析,只需将对应的client与server对应起来,就可以得到微服务之间的父子关系,就是一个简单的微服务名字的映射。

    def get_graph(self):
        """Build, print and return the service call graph.

        A span whose operation ends in ``client`` is matched with every span of
        a *different* service whose operation has the same prefix but ends in
        ``server``; each match adds the edge client-service -> server-service.
        Spans with any other operation name (empty, or e.g. ending in
        ``compose``) never create edges.

        Returns:
            An ``n x n`` 0/1 adjacency matrix (``n`` = number of services in
            ``servicename.txt``); ``G[i][j] == 1`` means service ``i`` calls ``j``.
        """
        services, operations = self.get_trace()[:2]
        name, index = self.get_map()
        n = len(name)
        G = [[0] * n for _ in range(n)]

        # Group server spans by operation prefix so each client span is matched
        # with one dict lookup instead of a scan over every span.
        servers = {}
        for svc, op in zip(services, operations):
            if op.endswith('server'):
                servers.setdefault(op[:-len('server')], set()).add(svc)

        for svc, op in zip(services, operations):
            # Only client -> server pairs are calls.  The old "not server" /
            # "not client" tests also paired operations that end in neither
            # suffix, producing spurious (even cyclic) edges.
            if not op.endswith('client'):
                continue
            for callee in servers.get(op[:-len('client')], ()):
                if callee != svc:
                    G[index[svc]][index[callee]] = 1

        print("历史执行图:")
        for i in range(n):
            for j in range(n):
                if G[i][j] == 1:
                    print(name[i] + '--->' + name[j])
        return G

四.查找所有关键路径

将每个微服务的运行时间作为其权重(一条路径的长度等于路径上各微服务运行时间之和),找出微服务应用运行过程中的关键路径,对异常处理有很大帮助。此处没有采用直接求最长路径的算法,而是先求出历史执行图中的所有执行路径,再计算每条路径的长度,最后得到所有的关键路径(关键路径可能不止一条)。

   def dfs(self, pos, out, stack, vis, cnt):
        """Depth-first enumerate every path from ``pos`` to a sink node.

        Each complete path (one that ends at a node with out-degree 0) is
        stored in ``self.micro[self.top]`` and ``self.top`` is incremented.

        Args:
            pos: index of the current node.
            out: out-degree of every node; 0 marks a sink.
            stack: scratch buffer holding the current path prefix.
            vis: 1 for nodes on the current path (guards against cycles).
            cnt: number of nodes already placed in ``stack``.
        """
        vis[pos] = 1
        stack[cnt] = pos
        cnt += 1
        try:
            if out[pos] == 0:
                # Grow the result buffer instead of failing once it is full.
                if self.top >= len(self.micro):
                    self.micro.append([])
                self.micro[self.top].extend(stack[:cnt])
                self.top += 1
                return
            for nxt, edge in enumerate(self.G[pos]):
                if edge == 1 and vis[nxt] == 0:
                    self.dfs(nxt, out, stack, vis, cnt)
        finally:
            # Un-mark on every exit, including at a sink: the old early return
            # left sinks marked, so all later paths into them were lost.
            vis[pos] = 0


    def get_CP(self):
        """Enumerate all execution paths and print the critical one(s).

        A service's latency is the maximum latency over its spans in
        ``trace.csv``; a path's length is the sum of its services' latencies.
        Prints every path, then -- for each path of maximal length -- the
        services whose latency is at least their value in ``self.slack``.

        Returns:
            The critical paths as lists of service indices (empty when the
            graph has no paths).
        """
        trace = self.get_trace()
        G = self.G
        slack = self.slack
        name, index = self.get_map()
        n = len(G)

        In = [sum(1 for j in range(n) if G[j][i] == 1) for i in range(n)]   # in-degree; 0 => entry
        Out = [sum(1 for j in range(n) if G[i][j] == 1) for i in range(n)]  # out-degree; 0 => sink

        # Start from every entry service that actually calls something, so an
        # isolated service listed earlier cannot hide the real root.
        roots = [i for i in range(n) if In[i] == 0 and Out[i] > 0]
        if not roots:
            # Fall back to the original rule: first node with in-degree 0.
            entries = [i for i in range(n) if In[i] == 0]
            roots = entries[:1] if entries else ([0] if n else [])

        latency = [0] * n
        for svc, lat in zip(trace[0], trace[2]):
            k = index[svc]
            latency[k] = max(latency[k], int(lat))

        # Recompute from scratch so repeated calls do not accumulate paths.
        self.micro = [[] for _ in range(100)]
        self.top = 0
        stack = [0] * n
        vis = [0] * n
        for root in roots:
            self.dfs(root, Out, stack, vis, 0)
        paths = self.micro[:self.top]

        print("打印所有路径:")
        path_latency = []
        for p in paths:
            print('--->'.join(name[k] for k in p))
            path_latency.append(sum(latency[k] for k in p))

        longest = max(path_latency, default=-1)
        critical = [p for p, total in zip(paths, path_latency) if total == longest]

        print("关键微服务:")
        for p in critical:
            for k in p:
                # Index by the service id on the path, not by its position.
                if latency[k] >= int(slack[k]):
                    print(name[k])
        return critical

五.运行结果


六.完整代码

#!/usr/bin/env python
import numpy
import os
import sys

class Tra():
    """Build a microservice execution graph from Jaeger trace data and report
    its critical path(s).

    All input files are read from the current working directory:

    * ``servicename.txt`` -- one service per line; the first comma-separated
      field is the service name and the line order defines its index.
    * ``trace.csv`` -- one span per line: ``service,operation,latency``.
    * ``slack.txt`` -- one integer threshold per line, in the same order as
      ``servicename.txt``.

    Attributes:
        G: adjacency matrix; ``G[i][j] == 1`` means service ``i`` calls ``j``.
        slack: per-service thresholds (strings) read from ``slack.txt``.
        micro: buffer of enumerated paths (lists of service indices); only
            the first ``top`` entries are filled.
        top: number of paths currently stored in ``micro``.
    """

    def __init__(self):
        # Both calls read input files; get_graph also prints the graph.
        self.G = self.get_graph()
        self.slack = self.get_slack()
        self.micro = [[] for _ in range(100)]
        self.top = 0

    def PrintPath(self, i, path):
        """Follow ``path`` links starting at node ``i`` and return the nodes.

        ``path[k]`` is read as the single next node after ``k``; ``-1`` ends
        the chain.  NOTE(review): this expects a flat list of ints, which is
        not the list-of-lists layout ``get_DP`` fills in, and neither method
        is called in this file -- confirm before use.
        """
        nodes = [i]
        while path[i] != -1:
            i = path[i]
            nodes.append(i)
        return nodes

    def get_DP(self, i, dp, G_la, path):
        """Return the longest weighted distance from node ``i`` to a sink.

        Memoised top-down recursion; there is no cycle guard, so ``G_la`` is
        assumed to be acyclic.

        Args:
            i: start node index.
            dp: memo table; ``dp[k] > 0`` means ``k`` is already solved.
            G_la: weighted adjacency matrix; a non-zero entry is an edge.
            path: ``path[k]`` is a list initialised to ``[-1]``; afterwards it
                holds every successor of ``k`` that attains ``dp[k]``.

        Returns:
            ``dp[i]``.
        """
        if dp[i] > 0:
            return dp[i]
        for j, weight in enumerate(G_la[i]):
            if weight == 0:
                continue
            tmp = weight + self.get_DP(j, dp, G_la, path)
            if tmp > dp[i] or (tmp == dp[i] and path[i][0] == -1):
                # New best (or first) successor: replace in place so callers
                # holding path[i] see the update.  The old code never replaced
                # a worse successor and appended the first one twice.
                dp[i] = tmp
                path[i][:] = [j]
            elif tmp == dp[i]:
                # Tie with the current best: keep it as well.
                path[i].append(j)
        return dp[i]

    def dfs(self, pos, out, stack, vis, cnt):
        """Depth-first enumerate every path from ``pos`` to a sink node.

        Each complete path (one that ends at a node with out-degree 0) is
        stored in ``self.micro[self.top]`` and ``self.top`` is incremented.

        Args:
            pos: index of the current node.
            out: out-degree of every node; 0 marks a sink.
            stack: scratch buffer holding the current path prefix.
            vis: 1 for nodes on the current path (guards against cycles).
            cnt: number of nodes already placed in ``stack``.
        """
        vis[pos] = 1
        stack[cnt] = pos
        cnt += 1
        try:
            if out[pos] == 0:
                # Grow the result buffer instead of failing once it is full.
                if self.top >= len(self.micro):
                    self.micro.append([])
                self.micro[self.top].extend(stack[:cnt])
                self.top += 1
                return
            for nxt, edge in enumerate(self.G[pos]):
                if edge == 1 and vis[nxt] == 0:
                    self.dfs(nxt, out, stack, vis, cnt)
        finally:
            # Un-mark on every exit, including at a sink: the old early return
            # left sinks marked, so all later paths into them were lost.
            vis[pos] = 0

    def get_CP(self):
        """Enumerate all execution paths and print the critical one(s).

        A service's latency is the maximum latency over its spans in
        ``trace.csv``; a path's length is the sum of its services' latencies.
        Prints every path, then -- for each path of maximal length -- the
        services whose latency is at least their value in ``self.slack``.

        Returns:
            The critical paths as lists of service indices (empty when the
            graph has no paths).
        """
        trace = self.get_trace()
        G = self.G
        slack = self.slack
        name, index = self.get_map()
        n = len(G)

        In = [sum(1 for j in range(n) if G[j][i] == 1) for i in range(n)]   # in-degree; 0 => entry
        Out = [sum(1 for j in range(n) if G[i][j] == 1) for i in range(n)]  # out-degree; 0 => sink

        # Start from every entry service that actually calls something, so an
        # isolated service listed earlier cannot hide the real root.
        roots = [i for i in range(n) if In[i] == 0 and Out[i] > 0]
        if not roots:
            # Fall back to the original rule: first node with in-degree 0.
            entries = [i for i in range(n) if In[i] == 0]
            roots = entries[:1] if entries else ([0] if n else [])

        latency = [0] * n
        for svc, lat in zip(trace[0], trace[2]):
            k = index[svc]
            latency[k] = max(latency[k], int(lat))

        # Recompute from scratch so repeated calls do not accumulate paths.
        self.micro = [[] for _ in range(100)]
        self.top = 0
        stack = [0] * n
        vis = [0] * n
        for root in roots:
            self.dfs(root, Out, stack, vis, 0)
        paths = self.micro[:self.top]

        print("打印所有路径:")
        path_latency = []
        for p in paths:
            print('--->'.join(name[k] for k in p))
            path_latency.append(sum(latency[k] for k in p))

        longest = max(path_latency, default=-1)
        critical = [p for p, total in zip(paths, path_latency) if total == longest]

        print("关键微服务:")
        for p in critical:
            for k in p:
                # Index by the service id on the path, not by its position.
                if latency[k] >= int(slack[k]):
                    print(name[k])
        return critical

    def get_slack(self):
        """Read ``slack.txt`` and return one threshold string per line."""
        with open('slack.txt') as f:
            # splitlines() keeps the final value even without a trailing
            # newline (split('\n')[:-1] silently dropped it).
            return f.read().splitlines()

    def get_graph(self):
        """Build, print and return the service call graph.

        A span whose operation ends in ``client`` is matched with every span of
        a *different* service whose operation has the same prefix but ends in
        ``server``; each match adds the edge client-service -> server-service.
        Spans with any other operation name (empty, or e.g. ending in
        ``compose``) never create edges.

        Returns:
            An ``n x n`` 0/1 adjacency matrix, ``n`` = number of services.
        """
        services, operations = self.get_trace()[:2]
        name, index = self.get_map()
        n = len(name)
        G = [[0] * n for _ in range(n)]

        # Group server spans by operation prefix so each client span is matched
        # with one dict lookup instead of a scan over every span.
        servers = {}
        for svc, op in zip(services, operations):
            if op.endswith('server'):
                servers.setdefault(op[:-len('server')], set()).add(svc)

        for svc, op in zip(services, operations):
            # Only client -> server pairs are calls.  The old "not server" /
            # "not client" tests also paired operations that end in neither
            # suffix, producing spurious (even cyclic) edges.
            if not op.endswith('client'):
                continue
            for callee in servers.get(op[:-len('client')], ()):
                if callee != svc:
                    G[index[svc]][index[callee]] = 1

        print("历史执行图:")
        for i in range(n):
            for j in range(n):
                if G[i][j] == 1:
                    print(name[i] + '--->' + name[j])
        return G

    def get_map(self):
        """Read ``servicename.txt`` and map service names to indices.

        The first comma-separated field of each non-blank line is a service
        name; the service count comes from the file instead of a hard-coded 12.

        Returns:
            ``(name, index)`` where ``name[i]`` is the i-th service and
            ``index[name[i]] == i`` (a later duplicate name wins).
        """
        with open('servicename.txt', 'r') as f:
            lines = f.read().splitlines()
        name = [line.split(',')[0] for line in lines if line.strip()]
        index = {svc: i for i, svc in enumerate(name)}
        return name, index

    def get_trace(self):
        """Read ``trace.csv`` and return its three columns.

        Every non-blank line must have the form ``service,operation,latency``.

        Returns:
            ``[services, operations, latencies]`` -- three parallel lists of
            strings, one entry per data row.

        Raises:
            ValueError: if a non-blank line does not have exactly three
                comma-separated fields.
        """
        with open('trace.csv', 'r') as f:
            text = f.read()
        trace = [[] for _ in range(3)]
        # splitlines() keeps the final row even without a trailing newline;
        # blank lines are skipped so the three columns stay aligned.
        for lineno, line in enumerate(text.splitlines(), 1):
            if not line.strip():
                continue
            fields = line.split(',')
            if len(fields) != 3:
                raise ValueError('trace.csv line %d: expected 3 fields, got %d'
                                 % (lineno, len(fields)))
            for column, value in zip(trace, fields):
                column.append(value)
        return trace

if __name__ == '__main__':
    # Constructing Tra reads the input files and prints the execution graph;
    # get_CP then prints all paths and the critical microservices.
    Tra().get_CP()

 

举报

相关推荐

0 条评论