源码学习
Category: SkyWalking | 芋道源码 —— 纯源码解析博客
告警组件
初始化Kafka消费者
java
/**
 * Module provider for the Kafka fetcher.
 *
 * <p>It receives its {@link KafkaFetcherConfig} through the config creator, builds a
 * {@link KafkaFetcherHandlerRegister} in {@link #prepare()}, and in {@link #start()} registers one
 * handler per kind of reported data before starting the register.
 */
@Slf4j
public class KafkaFetcherProvider extends ModuleProvider {
    private KafkaFetcherHandlerRegister handlerRegister;
    private KafkaFetcherConfig config;

    /** @return the provider name, always {@code "default"}. */
    @Override
    public String name() {
        return "default";
    }

    /** @return the module definition this provider implements. */
    @Override
    public Class<? extends ModuleDefine> module() {
        return KafkaFetcherModule.class;
    }

    /**
     * Creates the callback that declares the config type and stores the initialized config
     * instance on this provider.
     */
    @Override
    public ConfigCreator newConfigCreator() {
        return new ConfigCreator<KafkaFetcherConfig>() {
            @Override
            public Class type() {
                return KafkaFetcherConfig.class;
            }

            @Override
            public void onInitialized(final KafkaFetcherConfig loaded) {
                config = loaded;
            }
        };
    }

    /** Builds the handler register from the config captured by the config creator. */
    @Override
    public void prepare() throws ServiceNotProvidedException {
        handlerRegister = new KafkaFetcherHandlerRegister(config);
    }

    /**
     * Registers every handler (the always-on ones first, then the log handlers that are switched
     * on in the config) and finally starts the register.
     */
    @Override
    public void start() throws ServiceNotProvidedException, ModuleStartException {
        registerAlwaysOnHandlers();
        registerConfiguredLogHandlers();
        // Start the Kafka consumer.
        handlerRegister.start();
    }

    /** Registers the handlers that are installed regardless of configuration, JVM metrics first. */
    private void registerAlwaysOnHandlers() throws ServiceNotProvidedException, ModuleStartException {
        // JVM metrics
        handlerRegister.register(new JVMMetricsHandler(getManager(), config));
        handlerRegister.register(new ServiceManagementHandler(getManager(), config));
        handlerRegister.register(new TraceSegmentHandler(getManager(), config));
        handlerRegister.register(new ProfileTaskHandler(getManager(), config));
        handlerRegister.register(new MeterServiceHandler(getManager(), config));
    }

    /** Registers the protobuf and/or JSON log handlers when the matching config flag is set. */
    private void registerConfiguredLogHandlers() throws ServiceNotProvidedException, ModuleStartException {
        if (config.isEnableNativeProtoLog()) {
            handlerRegister.register(new LogHandler(getManager(), config));
        }
        if (config.isEnableNativeJsonLog()) {
            handlerRegister.register(new JsonLogHandler(getManager(), config));
        }
    }

    /** Nothing to do once all modules have completed start-up. */
    @Override
    public void notifyAfterCompleted() throws ServiceNotProvidedException {
    }

    /** @return the names of the modules this provider depends on. */
    @Override
    public String[] requiredModules() {
        return new String[] {TelemetryModule.NAME, AnalyzerModule.NAME, LogAnalyzerModule.NAME, CoreModule.NAME};
    }
}
JVM指标
java
/**
 * Kafka handler for JVM metrics records.
 *
 * <p>Each record value is parsed as a protobuf {@code JVMMetricCollection}; the service and
 * instance names are passed through {@link NamingControl}, and every contained metric is handed
 * to {@link JVMSourceDispatcher#sendMetric}.
 */
@Slf4j
public class JVMMetricsHandler extends AbstractKafkaHandler {
    private final NamingControl namingLengthControl;
    private final JVMSourceDispatcher jvmSourceDispatcher;

    /**
     * @param manager module manager used to build the dispatcher and to look up
     *                {@link NamingControl} from the core module
     * @param config  Kafka fetcher configuration, forwarded to the parent handler
     */
    public JVMMetricsHandler(ModuleManager manager, KafkaFetcherConfig config) {
        super(manager, config);
        this.jvmSourceDispatcher = new JVMSourceDispatcher(manager);
        this.namingLengthControl = manager.find(CoreModule.NAME).provider().getService(NamingControl.class);
    }

    /**
     * Parses one Kafka record and dispatches every JVM metric in it.
     *
     * <p>Failures are logged, never rethrown: an error on a single metric does not stop the
     * remaining metrics of the same record, and an error on the record as a whole (for example a
     * parse failure) only drops that record.
     *
     * @param record Kafka record whose value holds a serialized {@code JVMMetricCollection}
     */
    @Override
    public void handle(final ConsumerRecord<String, Bytes> record) {
        try {
            // The payload is protobuf; per the original notes it carries CPU, memory,
            // memory pool, thread and GC data.
            final JVMMetricCollection received = JVMMetricCollection.parseFrom(record.value().get());
            if (log.isDebugEnabled()) {
                log.debug(
                    "Fetched JVM metrics from service[{}] instance[{}] reported.",
                    received.getService(),
                    received.getServiceInstance()
                );
            }

            // Rewrite both names through the naming control before dispatching.
            final JVMMetricCollection.Builder normalized = received.toBuilder();
            normalized.setService(namingLengthControl.formatServiceName(normalized.getService()));
            normalized.setServiceInstance(
                namingLengthControl.formatInstanceName(normalized.getServiceInstance()));

            final String service = normalized.getService();
            final String serviceInstance = normalized.getServiceInstance();
            normalized.getMetricsList().forEach(jvmMetric -> {
                try {
                    // Process the metric data.
                    jvmSourceDispatcher.sendMetric(service, serviceInstance, jvmMetric);
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
            });
        } catch (Exception e) {
            log.error("handle record failed", e);
        }
    }

    /** @return the metrics topic name taken from the fetcher configuration. */
    @Override
    protected String getPlainTopic() {
        return config.getTopicNameOfMetrics();
    }
}
java
/**
 * Fans one reported JVM metric out to the per-category processing methods.
 *
 * <p>The minute time bucket is computed from the metric's own timestamp, the service and
 * instance ids are built from the given names, and then the CPU, memory, memory-pool, GC, thread
 * and class parts are forwarded one after another.
 *
 * @param service         service name
 * @param serviceInstance service instance name
 * @param metrics         the JVM metric sample to split up
 */
public void sendMetric(String service, String serviceInstance, JVMMetric metrics) {
    // Minute-level time bucket of this sample.
    final long bucket = TimeBucket.getMinuteTimeBucket(metrics.getTime());

    // Ids derived from the names; the instance id is built on top of the service id.
    final String serviceId = IDManager.ServiceID.buildId(service, true);
    final String serviceInstanceId = IDManager.ServiceInstanceID.buildId(serviceId, serviceInstance);

    // One call per JVM metric category: CPU, memory, memory pool, GC, thread, class.
    this.sendToCpuMetricProcess(
        service, serviceId, serviceInstance, serviceInstanceId, bucket,
        metrics.getCpu());
    this.sendToMemoryMetricProcess(
        service, serviceId, serviceInstance, serviceInstanceId, bucket,
        metrics.getMemoryList());
    this.sendToMemoryPoolMetricProcess(
        service, serviceId, serviceInstance, serviceInstanceId, bucket,
        metrics.getMemoryPoolList());
    this.sendToGCMetricProcess(
        service, serviceId, serviceInstance, serviceInstanceId, bucket,
        metrics.getGcList());
    this.sendToThreadMetricProcess(
        service, serviceId, serviceInstance, serviceInstanceId, bucket,
        metrics.getThread());
    this.sendToClassMetricProcess(
        service, serviceId, serviceInstance, serviceInstanceId, bucket,
        metrics.getClazz());
}
java
/**
 * Builds a {@code ServiceInstanceJVMCPU} source from the reported CPU data and passes it to the
 * source receiver.
 *
 * @param service           service name
 * @param serviceId         id of the service
 * @param serviceInstance   service instance name
 * @param serviceInstanceId id of the service instance
 * @param timeBucket        time bucket the sample belongs to
 * @param cpu               reported CPU data
 */
private void sendToCpuMetricProcess(String service,
                                    String serviceId,
                                    String serviceInstance,
                                    String serviceInstanceId,
                                    long timeBucket,
                                    CPU cpu) {
    // A usage percent below 1 is reported as 1.
    final double reportedUsage = cpu.getUsagePercent();
    final double flooredUsage = reportedUsage < 1.0 ? 1.0 : reportedUsage;

    final ServiceInstanceJVMCPU cpuSource = new ServiceInstanceJVMCPU();
    cpuSource.setId(serviceInstanceId);
    cpuSource.setName(serviceInstance);
    cpuSource.setServiceId(serviceId);
    cpuSource.setServiceName(service);
    cpuSource.setUsePercent(flooredUsage);
    cpuSource.setTimeBucket(timeBucket);

    sourceReceiver.receive(cpuSource);
}
聚合逻辑
L1聚合-MetricsAggregateWorker
MetricsAggregateWorker 提供内存中的指标合并功能。这种聚合称为 L1 聚合,在接收器完成分析之后立即合并数据。对于属于同一实体、同一指标类型和同一时间桶(time bucket)的指标,L1 聚合会将它们合并为一个指标对象,以减少不必要的内存占用和网络负载。
指标放到 channel 里面,然后由consumer去消费。(consumer在启动时候就开启消费逻辑)
消费数据经过流转之后,开始执行处理逻辑。
- 包含聚合逻辑。
java
/**
 * Handles one batch taken from the queue. According to {@link IConsumer#consume(List)}, this is a
 * serial operation for every work instance.
 *
 * <p>Each metric bumps the aggregation counter and is fed into the merge cache; once the whole
 * batch has been processed, {@code flush()} is called.
 *
 * @param metricsList from the queue.
 */
private void onWork(List<Metrics> metricsList) {
    for (final Metrics metrics : metricsList) {
        aggregationCounter.inc();
        // Hand the metric to the cache, which performs the aggregation.
        mergeDataCache.accept(metrics);
    }
    flush();
}
/**
 * Adds a metric to the buffer, merging it into the entry already stored under the same id.
 *
 * @param data the incoming metric; stored as-is when its id is not buffered yet, otherwise
 *             combined into the buffered metric
 */
@Override
public void accept(final METRICS data) {
    // Id under which this metric is buffered.
    final StorageID id = data.id();
    final METRICS existed = buffer.get(id);
    if (existed != null) {
        // Same id already buffered: run the metric-specific combine logic.
        existed.combine(data);
        return;
    }
    // First metric seen for this id: keep it as the merge target.
    buffer.put(id, data);
}
不同类型指标聚合逻辑不一样。
java
/**
 * Merges another average metric into this one by adding up its summation and count.
 *
 * @param metrics the metric to merge in; cast to {@code AvgFunction}
 * @return always {@code true}
 */
@Override
public final boolean combine(Metrics metrics) {
    final AvgFunction other = (AvgFunction) metrics;
    this.summation += other.summation;
    this.count += other.count;
    return true;
}
java
/**
 * Merges another Apdex metric into this one by adding up its three counters
 * ({@code tNum}, {@code sNum} and {@code totalNum}).
 *
 * @param metrics the metric to merge in; cast to {@code ApdexMetrics}
 * @return always {@code true}
 */
@Override
public final boolean combine(Metrics metrics) {
    final ApdexMetrics other = (ApdexMetrics) metrics;
    tNum += other.tNum;
    sNum += other.sNum;
    totalNum += other.totalNum;
    return true;
}