Skip to content

源码学习

Category: SkyWalking | 芋道源码 —— 纯源码解析博客

SkyWalking8.7源码解析

告警组件

初始化Kafka消费者

java
@Slf4j
public class KafkaFetcherProvider extends ModuleProvider {
    private KafkaFetcherHandlerRegister handlerRegister;
    private KafkaFetcherConfig config;

    @Override
    public String name() {
        return "default";
    }

    @Override
    public Class<? extends ModuleDefine> module() {
        return KafkaFetcherModule.class;
    }

    @Override
    public ConfigCreator newConfigCreator() {
        return new ConfigCreator<KafkaFetcherConfig>() {
            @Override
            public Class type() {
                return KafkaFetcherConfig.class;
            }

            @Override
            public void onInitialized(final KafkaFetcherConfig initialized) {
                config = initialized;
            }
        };
    }

    @Override
    public void prepare() throws ServiceNotProvidedException {
        handlerRegister = new KafkaFetcherHandlerRegister(config);
    }

    @Override
    public void start() throws ServiceNotProvidedException, ModuleStartException {
        //注册不同指标
        //JVM指标告警
        handlerRegister.register(new JVMMetricsHandler(getManager(), config));
        handlerRegister.register(new ServiceManagementHandler(getManager(), config));
        handlerRegister.register(new TraceSegmentHandler(getManager(), config));
        handlerRegister.register(new ProfileTaskHandler(getManager(), config));
        handlerRegister.register(new MeterServiceHandler(getManager(), config));

        if (config.isEnableNativeProtoLog()) {
            handlerRegister.register(new LogHandler(getManager(), config));
        }
        if (config.isEnableNativeJsonLog()) {
            handlerRegister.register(new JsonLogHandler(getManager(), config));
        }
        //启动kafka消费者
        handlerRegister.start();
    }

    @Override
    public void notifyAfterCompleted() throws ServiceNotProvidedException {
    }

    @Override
    public String[] requiredModules() {
        return new String[] {
            TelemetryModule.NAME,
            AnalyzerModule.NAME,
            LogAnalyzerModule.NAME,
            CoreModule.NAME
        };
    }

}

JVM指标

java
@Slf4j
public class JVMMetricsHandler extends AbstractKafkaHandler {

    private final NamingControl namingLengthControl;
    private final JVMSourceDispatcher jvmSourceDispatcher;

    public JVMMetricsHandler(ModuleManager manager, KafkaFetcherConfig config) {
        super(manager, config);
        this.jvmSourceDispatcher = new JVMSourceDispatcher(manager);
        this.namingLengthControl = manager.find(CoreModule.NAME)
                                          .provider()
                                          .getService(NamingControl.class);
    }

    @Override
    public void handle(final ConsumerRecord<String, Bytes> record) {
        try {
            //kafka获取jvm指标
            //protobuf格式
            //cpu、内存、内存池、线程、GC
            JVMMetricCollection metrics = JVMMetricCollection.parseFrom(record.value().get());

            if (log.isDebugEnabled()) {
                log.debug(
                    "Fetched JVM metrics from service[{}] instance[{}] reported.",
                    metrics.getService(),
                    metrics.getServiceInstance()
                );
            }
            JVMMetricCollection.Builder builder = metrics.toBuilder();
            builder.setService(namingLengthControl.formatServiceName(builder.getService()));
            builder.setServiceInstance(namingLengthControl.formatInstanceName(builder.getServiceInstance()));

            builder.getMetricsList().forEach(jvmMetric -> {
                try {
                    //处理指标数据
                    jvmSourceDispatcher.sendMetric(builder.getService(), builder.getServiceInstance(), jvmMetric);
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
            });
        } catch (Exception e) {
            log.error("handle record failed", e);
        }
    }

    @Override
    protected String getPlainTopic() {
        return config.getTopicNameOfMetrics();
    }
}
java
    public void sendMetric(String service, String serviceInstance, JVMMetric metrics) {
        //获取分钟级别的bucket
        long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime());

        //serviceId
        final String serviceId = IDManager.ServiceID.buildId(service, true);
        final String serviceInstanceId = IDManager.ServiceInstanceID.buildId(serviceId, serviceInstance);

        //分别处理JVM指标
        //CPU、内存、内存池、GC、线程、类信息
        this.sendToCpuMetricProcess(
            service, serviceId, serviceInstance, serviceInstanceId, minuteTimeBucket, metrics.getCpu());
        this.sendToMemoryMetricProcess(
            service, serviceId, serviceInstance, serviceInstanceId, minuteTimeBucket, metrics.getMemoryList());
        this.sendToMemoryPoolMetricProcess(
            service, serviceId, serviceInstance, serviceInstanceId, minuteTimeBucket, metrics.getMemoryPoolList());
        this.sendToGCMetricProcess(
            service, serviceId, serviceInstance, serviceInstanceId, minuteTimeBucket, metrics.getGcList());
        this.sendToThreadMetricProcess(
            service, serviceId, serviceInstance, serviceInstanceId, minuteTimeBucket, metrics.getThread());
        this.sendToClassMetricProcess(
                service, serviceId, serviceInstance, serviceInstanceId, minuteTimeBucket, metrics.getClazz());
    }
java
    private void sendToCpuMetricProcess(String service,
                                        String serviceId,
                                        String serviceInstance,
                                        String serviceInstanceId,
                                        long timeBucket,
                                        CPU cpu) {
        ServiceInstanceJVMCPU serviceInstanceJVMCPU = new ServiceInstanceJVMCPU();
        serviceInstanceJVMCPU.setId(serviceInstanceId);
        serviceInstanceJVMCPU.setName(serviceInstance);
        serviceInstanceJVMCPU.setServiceId(serviceId);
        serviceInstanceJVMCPU.setServiceName(service);
        // If the cpu usage percent is less than 1, will set to 1
        double adjustedCpuUsagePercent = Math.max(cpu.getUsagePercent(), 1.0);
        serviceInstanceJVMCPU.setUsePercent(adjustedCpuUsagePercent);
        serviceInstanceJVMCPU.setTimeBucket(timeBucket);
        sourceReceiver.receive(serviceInstanceJVMCPU);
    }

聚合逻辑

L1聚合-MetricsAggregateWorker

MetricsGenerateWorker提供内存中的度量合并功能。这种聚合称为L1聚合,在接收器分析之后合并数据。属于同一实体、指标类型和时间的指标bucket,L1聚合将它们合并到一个度量对象中,以减少不必要的内存和网络有效载荷。

指标放到 channel 里面,然后由consumer去消费。(consumer在启动时候就开启消费逻辑)

消费数据经过流转之后,开始执行处理逻辑。

  • 包含聚合逻辑。
java
    /**
     * Dequeue consuming. According to {@link IConsumer#consume(List)}, this is a serial operation for every work
     * instance.
     *
     * @param metricsList from the queue.
     */
    private void onWork(List<Metrics> metricsList) {
        metricsList.forEach(metrics -> {
            aggregationCounter.inc();
            //处理数据,执行聚合逻辑
            mergeDataCache.accept(metrics);
        });

        flush();
    }

    @Override
    public void accept(final METRICS data) {
        //策略Id
        final StorageID id = data.id();
        final METRICS existed = buffer.get(id);
        if (existed == null) {
            //存放当前指标数据
            buffer.put(id, data);
        } else {
            //如果存在相同类型数据,执行聚合逻辑
            existed.combine(data);
        }
    }

不同类型指标聚合逻辑不一样。

java
    @Override
    public final boolean combine(Metrics metrics) {
        AvgFunction longAvgMetrics = (AvgFunction) metrics;
        this.summation += longAvgMetrics.summation;
        this.count += longAvgMetrics.count;
        return true;
    }
java
    @Override
    public final boolean combine(Metrics metrics) {
        tNum += ((ApdexMetrics) metrics).tNum;
        sNum += ((ApdexMetrics) metrics).sNum;
        totalNum += ((ApdexMetrics) metrics).totalNum;
        return true;
    }