Benchmark:SocialNetwork
一.分析Cassandra存储数据
Jaeger将请求的Span存储到Cassandra数据库中,如果知道数据库中同一Trace各Span之间的父子关系,就可以对微服务应用生成历史执行图。那么只需要在Jaeger采集数据的时候,记录微服务间的父子关系就ok。查找数据库中的信息,发现有如下信息:
在operation_name一列中,有成对出现的client与server记录(操作名前缀相同,后缀分别为client和server),说明两者是父子关系,因此可以通过operation_name为应用生成历史执行图。
二.处理Trace数据
需要记录当前微服务名称,操作名称,以及执行微服务所使用的时间。Trace_id是唯一的,可通过trace_id来进行提取。
提取信息如下(部分):
然后就是对文件的简单读写了。
def get_trace(self):
    """Load ``trace.csv`` and return its first three columns as parallel lists.

    Every non-blank line is split on ``,``; the first three fields are the
    service name, the operation name and the duration of one span.  Rows
    with fewer than three fields are padded with ``''`` and extra fields
    are ignored, so the three returned lists always have the same length.
    The last row is kept even when the file has no trailing newline.

    Returns:
        list[list[str]]: ``[service_names, operation_names, durations]``;
        all values are the raw strings from the file.

    Raises:
        OSError: if ``trace.csv`` cannot be opened.
    """
    with open('trace.csv', 'r') as f:
        lines = f.read().splitlines()

    trace = [[] for _ in range(3)]
    for line in lines:
        if not line.strip():
            # A blank line carries no span; keeping it would misalign columns.
            continue
        fields = line.split(',')
        for col in range(3):
            trace[col].append(fields[col] if col < len(fields) else '')
    return trace
三.生成历史执行图
利用前面的分析,只需将对应的client与server对应起来,就可以得到微服务之间的父子关系,就是一个简单的微服务名字的映射。
def get_graph(self):
    """Build and print the adjacency matrix of the historical execution graph.

    Spans come from ``self.get_trace()`` and the service-name -> index
    mapping from ``self.get_map()``.  Spans with an empty operation name
    are ignored.  A span whose operation name does not end in ``server``
    or ``compose`` is treated as the calling side; one that does not end
    in ``client`` or ``compose`` is treated as the called side.  The last
    six characters of the operation name are stripped on both sides, and
    when the remaining prefixes are equal and the two spans belong to
    different services an edge caller -> callee is recorded.

    Returns:
        list[list[int]]: square 0/1 matrix, one row/column per service
        returned by ``get_map()``; ``G[a][b] == 1`` means ``a`` calls ``b``.

    Raises:
        KeyError: if a service that takes part in an edge is missing from
            the mapping returned by ``get_map()``.
    """
    trace = self.get_trace()
    name, index_of = self.get_map()
    size = len(name)
    G = [[0] * size for _ in range(size)]
    services, operations = trace[0], trace[1]

    # Index the called side once by operation prefix so that each calling
    # span is matched with a dictionary lookup instead of a full rescan.
    callees_by_prefix = {}
    for service, op in zip(services, operations):
        if op == '' or op.endswith(('client', 'compose')):
            continue
        callees_by_prefix.setdefault(op[:-6], set()).add(service)

    for service, op in zip(services, operations):
        if op == '' or op.endswith(('server', 'compose')):
            continue
        for callee in callees_by_prefix.get(op[:-6], ()):
            if callee != service:
                G[index_of[service]][index_of[callee]] = 1

    print("历史执行图:")
    for i in range(size):
        for j in range(size):
            if G[i][j] == 1:
                print(name[i]+'--->'+name[j])
    return G
四.查找所有关键路径
将微服务的运行时间作为边长,找到微服务应用运行过程中的关键路径,对异常处理有巨大的帮助。此处没有采用求最长路径的算法,而是先求出历史执行图中的所有执行路径,再计算每条路径的长度,最后得到所有的关键路径(关键路径可能不止一条)。
def dfs(self, pos, out, stack, vis, cnt):
    """Enumerate every simple path from ``pos`` to a node with out-degree 0.

    Each completed path is stored as a list of node indices in
    ``self.micro[self.top]`` and ``self.top`` is then incremented.

    Args:
        pos: index of the node being visited.
        out: out-degree of every node; ``out[v] == 0`` marks an end node.
        stack: scratch list holding the current path; must have room for
            one entry per node.
        vis: 0/1 flags for the nodes on the current path.
        cnt: number of entries of ``stack`` currently in use.
    """
    vis[pos] = 1
    stack[cnt] = pos
    cnt += 1
    if out[pos] == 0:
        # Grow the result store on demand instead of overflowing it.
        while self.top >= len(self.micro):
            self.micro.append([])
        self.micro[self.top].extend(stack[:cnt])
        self.top += 1
    else:
        for nxt in range(len(self.G)):
            if self.G[pos][nxt] == 1 and vis[nxt] == 0:
                self.dfs(nxt, out, stack, vis, cnt)
    # Always release the node, end nodes included; otherwise a shared end
    # node stays marked and other paths leading to it are never reported.
    vis[pos] = 0
def get_CP(self):
    """Print all execution paths and the critical services on the longest ones.

    The latency of a service is the largest duration among its spans in
    ``self.get_trace()``.  The length of a path is the sum of the
    latencies of its services.  Every path from an entry node (in-degree
    0) to an end node (out-degree 0) of ``self.G`` is enumerated with
    ``self.dfs``; the paths of maximal length are the critical paths.  A
    service on a critical path is printed when its latency is greater
    than or equal to its entry in ``self.slack``; each service is printed
    at most once.

    ``self.micro`` and ``self.top`` are reset on every call, so repeated
    calls do not accumulate paths.

    Raises:
        KeyError: if a span names a service missing from ``get_map()``.
        ValueError: if a duration or slack value is not an integer.
        IndexError: if ``self.slack`` has no entry for a reported service.
    """
    trace = self.get_trace()
    G = self.G
    slack = self.slack
    name, index_of = self.get_map()
    size = len(G)

    # In-degree 0 marks an entry point, out-degree 0 marks an end point.
    in_deg = [sum(1 for src in range(size) if G[src][v] == 1) for v in range(size)]
    out_deg = [sum(1 for dst in range(size) if G[v][dst] == 1) for v in range(size)]

    # Start from every entry point that actually calls something; an
    # isolated service would only yield a meaningless one-node path.
    sources = [v for v in range(size) if in_deg[v] == 0 and out_deg[v] > 0]
    if not sources and size:
        # Fall back to the first in-degree-0 node, or node 0 if there is none.
        sources = [next((v for v in range(size) if in_deg[v] == 0), 0)]

    latency = [0] * size
    for service, duration in zip(trace[0], trace[2]):
        node = index_of[service]
        latency[node] = max(latency[node], int(duration))

    self.micro = [[] for _ in range(100)]
    self.top = 0
    for start in sources:
        self.dfs(start, out_deg, [0] * size, [0] * size, 0)
    paths = [path for path in self.micro if path]

    print("打印所有路径:")
    totals = []
    for path in paths:
        print('--->'.join(name[node] for node in path))
        totals.append(sum(latency[node] for node in path))

    print("关键微服务:")
    if not totals:
        return
    longest = max(totals)
    reported = set()
    for path, total in zip(paths, totals):
        if total != longest:
            continue
        for node in path:
            if node in reported:
                continue
            reported.add(node)
            # Compare by service index, not by position inside the path.
            if latency[node] >= int(slack[node]):
                print(name[node])
五.运行结果
六.完整代码
#!/usr/bin/env python
import numpy
import os
import sys
class Tra():
    """Build a service call graph from exported spans and report critical paths.

    All inputs are read from the current working directory:

    * ``trace.csv`` - one span per line: service name, operation name,
      duration.
    * ``servicename.txt`` - one service per line (text before the first
      comma); the line number is the node index of the service.
    * ``slack.txt`` - one latency threshold per line, indexed by the same
      node index.
    """

    def __init__(self):
        # Adjacency matrix: G[a][b] == 1 means service a calls service b.
        self.G = self.get_graph()
        self.slack = self.get_slack()
        # Paths found by dfs(); self.top counts the slots already filled.
        self.micro = [[] for _ in range(100)]
        self.top = 0

    def PrintPath(self, i, path):
        """Follow ``path`` links from node ``i`` until the ``-1`` terminator.

        Args:
            i: starting node index.
            path: sequence where ``path[v]`` is the node after ``v`` or
                ``-1`` when ``v`` is the last node.

        Returns:
            list: the visited node indices, starting with ``i``.
        """
        nodes = [i]
        while path[i] != -1:
            i = path[i]
            nodes.append(i)
        return nodes

    def get_DP(self, i, dp, G_la, path):
        """Return the length of the longest path starting at node ``i``.

        Args:
            i: node index.
            dp: memo table, updated in place; entries greater than 0 are
                treated as already computed.
            G_la: square weight matrix; ``0`` means "no edge".
            path: per-node successor lists, updated in place.  ``path[i]``
                must start out as ``[-1]`` and ends up holding every
                successor that achieves ``dp[i]``.

        Returns:
            The value stored in ``dp[i]``.
        """
        if dp[i] > 0:
            return dp[i]
        for j in range(len(G_la)):
            if G_la[i][j] == 0:
                continue
            candidate = G_la[i][j] + self.get_DP(j, dp, G_la, path)
            if path[i][0] == -1 or candidate > dp[i]:
                # First or strictly better successor: restart the list.
                dp[i] = candidate
                path[i][:] = [j]
            elif candidate == dp[i]:
                # Tie: remember the alternative successor as well.
                path[i].append(j)
        return dp[i]

    def dfs(self, pos, out, stack, vis, cnt):
        """Enumerate every simple path from ``pos`` to a node with out-degree 0.

        Each completed path is stored as a list of node indices in
        ``self.micro[self.top]`` and ``self.top`` is then incremented.

        Args:
            pos: index of the node being visited.
            out: out-degree of every node; ``out[v] == 0`` marks an end node.
            stack: scratch list holding the current path; must have room
                for one entry per node.
            vis: 0/1 flags for the nodes on the current path.
            cnt: number of entries of ``stack`` currently in use.
        """
        vis[pos] = 1
        stack[cnt] = pos
        cnt += 1
        if out[pos] == 0:
            # Grow the result store on demand instead of overflowing it.
            while self.top >= len(self.micro):
                self.micro.append([])
            self.micro[self.top].extend(stack[:cnt])
            self.top += 1
        else:
            for nxt in range(len(self.G)):
                if self.G[pos][nxt] == 1 and vis[nxt] == 0:
                    self.dfs(nxt, out, stack, vis, cnt)
        # Always release the node, end nodes included; otherwise a shared
        # end node stays marked and other paths to it are never reported.
        vis[pos] = 0

    def get_CP(self):
        """Print all execution paths and the critical services on the longest ones.

        The latency of a service is the largest duration among its spans.
        The length of a path is the sum of the latencies of its services.
        Every path from an entry node (in-degree 0) to an end node
        (out-degree 0) of ``self.G`` is enumerated; the paths of maximal
        length are the critical paths.  A service on a critical path is
        printed when its latency is greater than or equal to its entry in
        ``self.slack``; each service is printed at most once.

        ``self.micro`` and ``self.top`` are reset on every call, so
        repeated calls do not accumulate paths.

        Raises:
            KeyError: if a span names a service missing from ``get_map()``.
            ValueError: if a duration or slack value is not an integer.
            IndexError: if ``self.slack`` has no entry for a reported service.
        """
        trace = self.get_trace()
        G = self.G
        slack = self.slack
        name, index_of = self.get_map()
        size = len(G)

        # In-degree 0 marks an entry point, out-degree 0 marks an end point.
        in_deg = [sum(1 for src in range(size) if G[src][v] == 1) for v in range(size)]
        out_deg = [sum(1 for dst in range(size) if G[v][dst] == 1) for v in range(size)]

        # Start from every entry point that actually calls something; an
        # isolated service would only yield a meaningless one-node path.
        sources = [v for v in range(size) if in_deg[v] == 0 and out_deg[v] > 0]
        if not sources and size:
            # Fall back to the first in-degree-0 node, or node 0 if none.
            sources = [next((v for v in range(size) if in_deg[v] == 0), 0)]

        latency = [0] * size
        for service, duration in zip(trace[0], trace[2]):
            node = index_of[service]
            latency[node] = max(latency[node], int(duration))

        self.micro = [[] for _ in range(100)]
        self.top = 0
        for start in sources:
            self.dfs(start, out_deg, [0] * size, [0] * size, 0)
        paths = [path for path in self.micro if path]

        print("打印所有路径:")
        totals = []
        for path in paths:
            print('--->'.join(name[node] for node in path))
            totals.append(sum(latency[node] for node in path))

        print("关键微服务:")
        if not totals:
            return
        longest = max(totals)
        reported = set()
        for path, total in zip(paths, totals):
            if total != longest:
                continue
            for node in path:
                if node in reported:
                    continue
                reported.add(node)
                # Compare by service index, not by position inside the path.
                if latency[node] >= int(slack[node]):
                    print(name[node])

    def get_slack(self):
        """Read ``slack.txt`` and return its lines as strings.

        The last line is kept even when the file has no trailing newline.
        Lines are not converted; callers apply ``int()`` themselves.

        Returns:
            list[str]: one entry per line of the file.
        """
        with open('slack.txt') as f:
            return f.read().splitlines()

    def get_graph(self):
        """Build and print the adjacency matrix of the historical execution graph.

        Spans with an empty operation name are ignored.  A span whose
        operation name does not end in ``server`` or ``compose`` is
        treated as the calling side; one that does not end in ``client``
        or ``compose`` is treated as the called side.  The last six
        characters of the operation name are stripped on both sides, and
        when the remaining prefixes are equal and the two spans belong to
        different services an edge caller -> callee is recorded.

        Returns:
            list[list[int]]: square 0/1 matrix, one row/column per service
            returned by ``get_map()``; ``G[a][b] == 1`` means ``a`` calls
            ``b``.

        Raises:
            KeyError: if a service that takes part in an edge is missing
                from ``servicename.txt``.
        """
        trace = self.get_trace()
        name, index_of = self.get_map()
        size = len(name)
        G = [[0] * size for _ in range(size)]
        services, operations = trace[0], trace[1]

        # Index the called side once by operation prefix so that each
        # calling span is matched with a lookup instead of a full rescan.
        callees_by_prefix = {}
        for service, op in zip(services, operations):
            if op == '' or op.endswith(('client', 'compose')):
                continue
            callees_by_prefix.setdefault(op[:-6], set()).add(service)

        for service, op in zip(services, operations):
            if op == '' or op.endswith(('server', 'compose')):
                continue
            for callee in callees_by_prefix.get(op[:-6], ()):
                if callee != service:
                    G[index_of[service]][index_of[callee]] = 1

        print("历史执行图:")
        for i in range(size):
            for j in range(size):
                if G[i][j] == 1:
                    print(name[i]+'--->'+name[j])
        return G

    def get_map(self):
        """Read ``servicename.txt`` and map every service name to its line index.

        The service name is the text before the first comma of a line.
        Trailing blank lines are ignored; any number of services is
        accepted.

        Returns:
            tuple[list[str], dict[str, int]]: ``(name, index_of)`` where
            ``name[i]`` is the service on line ``i`` and
            ``index_of[name[i]] == i``.
        """
        with open('servicename.txt', 'r') as f:
            lines = f.read().splitlines()
        while lines and not lines[-1].strip():
            lines.pop()

        index_of = {}
        name = []
        for idx, line in enumerate(lines):
            service = line.split(',')[0]
            index_of[service] = idx
            name.append(service)
        return name, index_of

    def get_trace(self):
        """Load ``trace.csv`` and return its first three columns as parallel lists.

        Every non-blank line is split on ``,``; the first three fields are
        the service name, the operation name and the duration of one span.
        Rows with fewer than three fields are padded with ``''`` and extra
        fields are ignored, so the three returned lists always have the
        same length.  The last row is kept even when the file has no
        trailing newline.

        Returns:
            list[list[str]]: ``[service_names, operation_names, durations]``;
            all values are the raw strings from the file.
        """
        with open('trace.csv', 'r') as f:
            lines = f.read().splitlines()

        trace = [[] for _ in range(3)]
        for line in lines:
            if not line.strip():
                # A blank line carries no span; keeping it would misalign
                # the columns.
                continue
            fields = line.split(',')
            for col in range(3):
                trace[col].append(fields[col] if col < len(fields) else '')
        return trace
if __name__ == '__main__':
    # Building the analyzer reads the input files and prints the call
    # graph; get_CP() then prints all paths and the critical services.
    analyzer = Tra()
    analyzer.get_CP()