浅谈分布式项目日志监控

浅谈分布式项目日志监控

目前公司项目采用dubbo服务化升级之后,原先大而全的几个主要应用,拆散重构成多个分布式服务。
这个公司业务架构和系统架构实现一次升级,并发和业务开发效率得到提升。
但是事情是两面的,引入dubbo服务化之后,导致业务链路过长,日志分散。不能在使用原来的日志处理方式了。
分布式情况下,每个日志分散到各自服务所在机器,
日志的收集和分析使用原来古老的模式,肯定是过时了,集群和服务规模小还好,数量一大,我想不管是运维人员还是开发人员都会头疼。
目前处理这个需求最为火热的中间套件,自然首选是ELK,ELK是java技术栈的。
也符合目前公司需求。ELK的安装就不讲述了,感兴趣的可以查看官网或者自行百度,资料还是挺多的。
确定了日志收集和分析的中间件,剩下一个就是日志埋点和怎么把日志串起来了。
以前单个应用的时代,系统级别的日志可以通过aop解决。在分布式情况下对每一个独立服务而言,自身的日志系统还是通过aop解决,唯一需要的就是怎么把分散到各自不同应用的日志串起来。
这个有个高大上的说法叫做业务链监控。
目前国内开源的产品有大众点评的cat,是整套业务链监控解决方案。
对于我公司目前来说太重了,我这边日志已经有elk,就没必要在额外引入cat。那如何自己实现呢。
既然是链路,那自然有入口有出口。我们需要做的就是在入口出生成一个全局唯一的traceId,然后把这个traceId按照业务链路传递到各个服务中去。
traceId就是一根线,把各个服务的日志串起来。注意一点,服务的时间要同步,因为是根据来时间排序的。
traceId的生成,简单方案可以采用uuid,其次推荐使用twiiter的snowflake算法。
traceId的传递,需要根据rpc框架来实现了。dubbo框架采用dubbo的fiter来实现,参考代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

// 调用过程拦截
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
//异步获取serviceId,没获取到不进行采样
String serviceId = tracer.getServiceId(RpcContext.getContext().getUrl().getServiceInterface());
if (serviceId == null) {
Tracer.startTraceWork();
return invoker.invoke(invocation);
}
long start = System.currentTimeMillis();
RpcContext context = RpcContext.getContext();
boolean isConsumerSide = context.isConsumerSide();
Span span = null;
Endpoint endpoint = null;
try {
endpoint = tracer.newEndPoint();
endpoint.setServiceName(serviceId);
endpoint.setIp(context.getLocalAddressString());
endpoint.setPort(context.getLocalPort());
if (context.isConsumerSide()) { //是否是消费者
Span span1 = tracer.getParentSpan();
if (span1 == null) { //为rootSpan
span = tracer.newSpan(context.getMethodName(), endpoint, serviceId);//生成root Span
} else {
span = tracer.genSpan(span1.getTraceId(), span1.getId(), tracer.genSpanId(), context.getMethodName(), span1.isSample(), null);
}
} else if (context.isProviderSide()) {
Long traceId, parentId, spanId;
traceId = TracerUtils.getAttachmentLong(invocation.getAttachment(TracerUtils.TID));
parentId = TracerUtils.getAttachmentLong(invocation.getAttachment(TracerUtils.PID));
spanId = TracerUtils.getAttachmentLong(invocation.getAttachment(TracerUtils.SID));
boolean isSample = (traceId != null);
span = tracer.genSpan(traceId, parentId, spanId, context.getMethodName(), isSample, serviceId);
}
invokerBefore(invocation, span, endpoint, start);//记录annotation
RpcInvocation invocation1 = (RpcInvocation) invocation;
setAttachment(span, invocation1);//设置需要向下游传递的参数
Result result = invoker.invoke(invocation);
if (result.getException() != null){
catchException(result.getException(), endpoint);
}
return result;
}catch (RpcException e) {
if (e.getCause() != null && e.getCause() instanceof TimeoutException){
catchTimeoutException(e, endpoint);
}else {
catchException(e, endpoint);
}
throw e;
}finally {
if (span != null) {
long end = System.currentTimeMillis();
invokerAfter(invocation, endpoint, span, end, isConsumerSide);//调用后记录annotation
}
}
}

dubbo通过invocation.setAttachmen来在消费者和调用者之间传递traceId。
如果是http接口调用实现的rpc建议采用在request的head里面传递traceId。
在本地服务里面通过threadlocal变量来传递traceId。
如果想打印sql语句,通过orm框架的拦截器机制实现,以下是mybatis的参考代码

schema.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88

@Intercepts({ @Signature(type = Executor.class, method = "update", args = { MappedStatement.class, Object.class }),
@Signature(type = Executor.class, method = "query", args = { MappedStatement.class, Object.class,
RowBounds.class, ResultHandler.class }) })
public class MidaiLogMybatisPlugn implements Interceptor {
@Override
public Object intercept(Invocation invocation) throws Throwable {
Object result = null;
//从当前线程获取trace
MidaiLogTrace trace = MidaiLogTraceService.getMidaiLogTrace();
if(trace !=null){
Object[] arguments = invocation.getArgs();
MidaiLogTraceService.traceSqlLog(trace.getTraceId(), getSqlStatement(arguments));
}
try {
result = invocation.proceed();
} catch (Exception e) {
throw e;
}
return result;
}
@Override
public Object plugin(Object target) {
return Plugin.wrap(target, this); // mybatis提供的包装工具类
}
@Override
public void setProperties(Properties properties) {
}
private String getSqlStatement(Object[] arguments) {
MappedStatement mappedStatement = (MappedStatement) arguments[0];
Object parameter = null;
if (arguments.length > 1) {
parameter = arguments[1];
}
String sqlId = mappedStatement.getId();
BoundSql boundSql = mappedStatement.getBoundSql(parameter);
Configuration configuration = mappedStatement.getConfiguration();
String sql = showSql(configuration, boundSql);
StringBuilder str = new StringBuilder(100);
str.append(sqlId);
str.append(":");
str.append(sql);
str.append(":");
return str.toString();
}
public String showSql(Configuration configuration, BoundSql boundSql) {
Object parameterObject = boundSql.getParameterObject();
List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
String sql = boundSql.getSql().replaceAll("[\\s]+", " ");
if (parameterMappings.size() > 0 && parameterObject != null) {
TypeHandlerRegistry typeHandlerRegistry = configuration.getTypeHandlerRegistry();
if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
sql = sql.replaceFirst("\\?", getParameterValue(parameterObject));
} else {
MetaObject metaObject = configuration.newMetaObject(parameterObject);
for (ParameterMapping parameterMapping : parameterMappings) {
String propertyName = parameterMapping.getProperty();
if (metaObject.hasGetter(propertyName)) {
Object obj = metaObject.getValue(propertyName);
sql = sql.replaceFirst("\\?", getParameterValue(obj));
} else if (boundSql.hasAdditionalParameter(propertyName)) {
Object obj = boundSql.getAdditionalParameter(propertyName);
sql = sql.replaceFirst("\\?", getParameterValue(obj));
}
}
}
}
return sql;
}
private static String getParameterValue(Object obj) {
String value = null;
if (obj instanceof String) {
value = "‘" + obj.toString() + "‘";
} else if (obj instanceof Date) {
DateFormat formatter = DateFormat.getDateTimeInstance(DateFormat.DEFAULT, DateFormat.DEFAULT, Locale.CHINA);
value = "‘" + formatter.format(new Date()) + "‘";
} else {
if (obj != null) {
value = obj.toString();
} else {
value = "";
}

}
return value;
}
}

当系统并发达到一定数量级,log4j日志打印本身会成为瓶颈,这个时候需要mq来解耦了,
不在打印日志,而是发送mq消息,由mq消费端处理。因为目前公司项目并发数量还不足以导致该问题,因此尚未采用。
elk收集日志之后,通过kibana可以提供搜索。

剩下最后的工作量就是提供一个web界面来更好的分析和展示数据。