org.apache.skywalking.apm.agent.core.boot.BootService是SkyWalking Agent中所有服务的顶层接口,这个接口定义了一个服务的生命周期,分为prepare、boot、onComplete、shutdown四个阶段

public interface BootService {
    /**
     * Preparation phase.
     *
     * @throws Throwable
     */
    void prepare() throws Throwable;

    /**
     * Boot (start-up) phase.
     *
     * @throws Throwable
     */
    void boot() throws Throwable;

    /**
     * Phase that runs once start-up has completed.
     *
     * @throws Throwable
     */
    void onComplete() throws Throwable;

    /**
     * Shutdown phase.
     *
     * @throws Throwable
     */
    void shutdown() throws Throwable;

    /**
     * Specifies the priority of the service.
     * {@code BootService}s with higher priorities will be started earlier, and shut down later than those {@code BootService}s with lower priorities.
     *
     * @return the priority of this {@code BootService}.
     */
    default int priority() {
        return 0;
    }
}


1)、GRPCChannel
public class GRPCChannel {
    /**
     * origin channel
     */
    private final ManagedChannel originChannel;
    /**
     * The channel carrying the extra features added by the decorators; both fields refer to the same connection.
     */
    private final Channel channelWithDecorators;

    /**
     * Builds the underlying channel and then wraps it with the given decorators.
     *
     * @param host            server host
     * @param port            server port
     * @param channelBuilders builders applied, in order, to the channel builder before it is built
     * @param decorators      decorators applied, in order, on top of the built channel
     * @throws Exception propagated from any {@link ChannelBuilder#build}
     */
    private GRPCChannel(String host, int port, List<ChannelBuilder> channelBuilders,
                        List<ChannelDecorator> decorators) throws Exception {
        ManagedChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port);

        NameResolverRegistry.getDefaultRegistry().register(new DnsNameResolverProvider());

        // Let every ChannelBuilder customise the builder in turn.
        for (ChannelBuilder builder : channelBuilders) {
            channelBuilder = builder.build(channelBuilder);
        }

        this.originChannel = channelBuilder.build();

        // Wrap originChannel with every ChannelDecorator in turn.
        Channel channel = originChannel;
        for (ChannelDecorator decorator : decorators) {
            channel = decorator.build(channel);
        }

        channelWithDecorators = channel;
    }
    // ... remaining members are omitted in this excerpt



public interface ChannelBuilder { B build(B managedChannelBuilder) throws Exception; }



public class StandardChannelBuilder implements ChannelBuilder { // 50M private final static int MAX_INBOUND_MESSAGE_SIZE = 1024 * 1024 * 50; @Override public ManagedChannelBuilder build(ManagedChannelBuilder managedChannelBuilder) { // 设置接收数据的最大容量,明文传输 return managedChannelBuilder.maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE) .usePlaintext(); } }


public class TLSChannelBuilder implements ChannelBuilder { private static String CA_FILE_NAME = "ca" + Constants.PATH_SEPARATOR + "ca.crt"; @Override public NettyChannelBuilder build( NettyChannelBuilder managedChannelBuilder) throws AgentPackageNotFoundException, SSLException { // 在Agent目录下找到ca文件做认证,设置加密传输 File caFile = new File(AgentPackagePath.getPath(), CA_FILE_NAME); boolean isCAFileExist = caFile.exists() && caFile.isFile(); if (Config.Agent.FORCE_TLS || isCAFileExist) { SslContextBuilder builder = GrpcSslContexts.forClient(); if (isCAFileExist) { builder.trustManager(caFile); } managedChannelBuilder = managedChannelBuilder.negotiationType(NegotiationType.TLS) .sslContext(builder.build()); } return managedChannelBuilder; } }


/**
 * Wraps a {@code Channel} and returns the channel that should be used in its place.
 */
public interface ChannelDecorator {
    /**
     * @param channel the channel to decorate
     * @return the channel to use from now on
     */
    Channel build(Channel channel);
}



/**
 * Decorates a channel so that every call carries the agent version header.
 */
public class AgentIDDecorator implements ChannelDecorator {
    private static final ILog LOGGER = LogManager.getLogger(AgentIDDecorator.class);
    private static final Metadata.Key<String> AGENT_VERSION_HEAD_HEADER_NAME = Metadata.Key.of("Agent-Version", Metadata.ASCII_STRING_MARSHALLER);
    private String version = "UNKNOWN";

    @Override
    public Channel build(Channel channel) {
        return ClientInterceptors.intercept(channel, new ClientInterceptor() {
            @Override
            public <REQ, RESP> ClientCall<REQ, RESP> interceptCall(MethodDescriptor<REQ, RESP> method,
                                                                   CallOptions options, Channel channel) {
                return new ForwardingClientCall.SimpleForwardingClientCall<REQ, RESP>(channel.newCall(method, options)) {
                    @Override
                    public void start(Listener<RESP> responseListener, Metadata headers) {
                        // Attach the agent version to the request headers when sending data to the OAP.
                        headers.put(AGENT_VERSION_HEAD_HEADER_NAME, version);

                        super.start(responseListener, headers);
                    }
                };
            }
        });
    }
    // ... remaining members are omitted in this excerpt


public class AuthenticationDecorator implements ChannelDecorator { private static final Metadata.Key AUTH_HEAD_HEADER_NAME = Metadata.Key.of("Authentication", Metadata.ASCII_STRING_MARSHALLER); @Override public Channel build(Channel channel) { if (StringUtil.isEmpty(Config.Agent.AUTHENTICATION)) { return channel; } return ClientInterceptors.intercept(channel, new ClientInterceptor() { @Override public ClientCall interceptCall(MethodDescriptor method, CallOptions options, Channel channel) { return new ForwardingClientCall.SimpleForwardingClientCall(channel.newCall(method, options)) { @Override public void start(Listener responseListener, Metadata headers) { // 向OAP发送数据的时候,请求头中带上的Authentication(token) headers.put(AUTH_HEAD_HEADER_NAME, Config.Agent.AUTHENTICATION); super.start(responseListener, headers); } }; } }); } } 2)、GRPCChannelManager /** * Agent到OAP的网络连接 */ @DefaultImplementor public class GRPCChannelManager implements BootService, Runnable { private volatile ScheduledFuture connectCheckFuture; // 网络连接状态定时检查任务调度器 private volatile List grpcServers; // OAP地址列表 @Override public void boot() { // 检查OAP地址 if (Config.Collector.BACKEND_SERVICE.trim().length() == 0) { LOGGER.error("Collector server addresses are not set."); LOGGER.error("Agent will not uplink any data."); return; } grpcServers = Arrays.asList(Config.Collector.BACKEND_SERVICE.split(",")); connectCheckFuture = Executors.newSingleThreadScheduledExecutor( new DefaultNamedThreadFactory("GRPCChannelManager") ).scheduleAtFixedRate( // Runnable传入this new RunnableWithExceptionProtection( this, t -> LOGGER.error("unexpected exception.", t) ), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS ); }



/**
 * Thread factory producing daemon threads named
 * {@code SkywalkingAgent-<factorySeq>-<name>-<threadSeq>}.
 */
public class DefaultNamedThreadFactory implements ThreadFactory {
    // Sequence number of this factory among all factories created so far.
    private static final AtomicInteger BOOT_SERVICE_SEQ = new AtomicInteger(0);
    // Sequence number of the next thread created by this factory.
    private final AtomicInteger threadSeq = new AtomicInteger(0);
    private final String namePrefix;

    /**
     * @param name logical name embedded in every thread name created by this factory
     */
    public DefaultNamedThreadFactory(String name) {
        namePrefix = "SkywalkingAgent-" + BOOT_SERVICE_SEQ.incrementAndGet() + "-" + name + "-";
    }

    /**
     * @param r the task the new thread will run
     * @return a new, not yet started, daemon thread
     */
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, namePrefix + threadSeq.getAndIncrement());
        t.setDaemon(true);
        return t;
    }
}


@DefaultImplementor public class GRPCChannelManager implements BootService, Runnable { private volatile GRPCChannel managedChannel = null; // 网络连接 private volatile boolean reconnect = true; // 当前网络连接是否需要重连 private final Random random = new Random(); private final List listeners = Collections.synchronizedList(new LinkedList()); private volatile List grpcServers; // OAP地址列表 private volatile int selectedIdx = -1; // 上次选择的OAP地址下标索引 private volatile int reconnectCount = 0; // 网络重连次数 @Override public void run() { LOGGER.debug("Selected collector grpc service running, reconnect:{}.", reconnect); // 如果需要重连和刷新dns if (IS_RESOLVE_DNS_PERIODICALLY && reconnect) { // 拿到配置中的第一个后端服务地址 String backendService = Config.Collector.BACKEND_SERVICE.split(",")[0]; try { // 拆分成域名和端口的形式 String[] domainAndPort = backendService.split(":"); List newGrpcServers = Arrays // 找到域名对应的所有IP .stream(InetAddress.getAllByName(domainAndPort[0])) .map(InetAddress::getHostAddress) .map(ip -> String.format("%s:%s", ip, domainAndPort[1])) .collect(Collectors.toList()); grpcServers = newGrpcServers; } catch (Throwable t) { LOGGER.error(t, "Failed to resolve {} of backend service.", backendService); } } // 如果需要重连网络连接 if (reconnect) { if (grpcServers.size() > 0) { String server = ""; try { int index = Math.abs(random.nextInt()) % grpcServers.size(); // 如果本次选择的不是上次选择的OAP地址 if (index != selectedIdx) { selectedIdx = index; server = grpcServers.get(index); String[] ipAndPort = server.split(":"); // 关闭上次出问题的网络连接 if (managedChannel != null) { managedChannel.shutdownNow(); } // 重新构建网络连接 managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1])) .addManagedChannelBuilder(new StandardChannelBuilder()) .addManagedChannelBuilder(new TLSChannelBuilder()) .addChannelDecorator(new AgentIDDecorator()) .addChannelDecorator(new AuthenticationDecorator()) .build(); // 通知所有使用该网络连接的其他BootService当前网络状态已经连上了 notify(GRPCChannelStatus.CONNECTED); reconnectCount = 0; reconnect = false; } // 
重连次数小于设置的grpc强制重连周期,尝试重连;否则尝试创建新的连接 // 判断managedChannel是否连接上 else if (managedChannel.isConnected(++reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD)) { // grpc自动重连到同一个Server // Reconnect to the same server is automatically done by GRPC, // therefore we are responsible to check the connectivity and // set the state and notify listeners reconnectCount = 0; notify(GRPCChannelStatus.CONNECTED); reconnect = false; } return; } catch (Throwable t) { LOGGER.error(t, "Create channel to {} fail.", server); } } LOGGER.debug( "Selected collector grpc service is not available. Wait {} seconds to retry", Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL ); } }








/**
 * Connection states that are reported to {@code GRPCChannelListener}s.
 */
public enum GRPCChannelStatus {
    CONNECTED, DISCONNECT
}


@DefaultImplementor
public class GRPCChannelManager implements BootService, Runnable {
    /**
     * Passes the given status to every registered listener. A listener that throws is logged and
     * skipped so that the remaining listeners are still notified.
     *
     * @param status the status to broadcast
     */
    private void notify(GRPCChannelStatus status) {
        // NOTE(review): if listeners is a Collections.synchronizedList, iterating it without holding
        // its lock is not safe against concurrent registration - confirm listeners are only added at start-up.
        for (GRPCChannelListener listener : listeners) {
            try {
                listener.statusChanged(status);
            } catch (Throwable t) {
                // NOTE(review): the message says "connected" even when status is DISCONNECT.
                LOGGER.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
            }
        }
    }
    // ... remaining members are omitted in this excerpt


/**
 * Callback for components that want to be told about channel status changes.
 */
public interface GRPCChannelListener {
    /**
     * @param status the new channel status
     */
    void statusChanged(GRPCChannelStatus status);
}


@DefaultImplementor
public class GRPCChannelManager implements BootService, Runnable {
    /**
     * Called when using the connection failed: sets the reconnect flag and notifies the listeners.
     * If the given exception is triggered by network problem, connect in background.
     *
     * @param throwable the failure observed by the caller
     */
    public void reportError(Throwable throwable) {
        // Only network errors trigger a reconnect.
        if (isNetworkError(throwable)) {
            // Mark the connection for reconnecting.
            reconnect = true;
            notify(GRPCChannelStatus.DISCONNECT);
        }
    }
    // ... remaining members are omitted in this excerpt





/**
 * 1. Reports the basic information of the current Agent client to the OAP.
 * 2. Keeps the heartbeat with the OAP.
 */
@DefaultImplementor
public class ServiceManagementClient implements BootService, Runnable, GRPCChannelListener {
    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT; // current connection status
    private volatile ManagementServiceGrpc.ManagementServiceBlockingStub managementServiceBlockingStub; // gRPC service stub

    /**
     * Rebuilds the stub on CONNECTED, drops it otherwise, and records the new status.
     *
     * @param status the new channel status
     */
    @Override
    public void statusChanged(GRPCChannelStatus status) {
        if (GRPCChannelStatus.CONNECTED.equals(status)) {
            // Look up the GRPCChannelManager service and take its channel.
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
            // A gRPC stub can be thought of as the XxxService defined in protobuf.
            managementServiceBlockingStub = ManagementServiceGrpc.newBlockingStub(channel);
        } else {
            managementServiceBlockingStub = null;
        }
        this.status = status;
    }
    // ... remaining members are omitted in this excerpt


@DefaultImplementor
public class ServiceManagementClient implements BootService, Runnable, GRPCChannelListener {
    private static List<KeyStringValuePair> SERVICE_INSTANCE_PROPERTIES; // Agent client properties

    /**
     * Registers this service as a channel listener and caches the configured instance properties.
     */
    @Override
    public void prepare() {
        // Register this object as a listener on GRPCChannelManager.
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);

        SERVICE_INSTANCE_PROPERTIES = new ArrayList<>();

        // Copy the Agent client properties from the configuration into the list, ready to be sent.
        for (String key : Config.Agent.INSTANCE_PROPERTIES.keySet()) {
            SERVICE_INSTANCE_PROPERTIES.add(KeyStringValuePair.newBuilder()
                                                              .setKey(key)
                                                              .setValue(Config.Agent.INSTANCE_PROPERTIES.get(key))
                                                              .build());
        }
    }
    // ... remaining members are omitted in this excerpt

prepare阶段时,先向GRPCChannelManager注册自己为监听器,然后把配置文件中的Agent Client信息放入集合,等待发送


@DefaultImplementor
public class ServiceManagementClient implements BootService, Runnable, GRPCChannelListener {
    private volatile ScheduledFuture<?> heartbeatFuture; // heartbeat scheduled task

    /**
     * Schedules this object to run at a fixed rate of {@code Config.Collector.HEARTBEAT_PERIOD} seconds.
     */
    @Override
    public void boot() {
        heartbeatFuture = Executors.newSingleThreadScheduledExecutor(
            new DefaultNamedThreadFactory("ServiceManagementClient")
        ).scheduleAtFixedRate(
            new RunnableWithExceptionProtection(
                this,
                t -> LOGGER.error("unexpected exception.", t)
            ), 0, Config.Collector.HEARTBEAT_PERIOD,
            TimeUnit.SECONDS
        );
    }
    // ... remaining members are omitted in this excerpt


@DefaultImplementor
public class ServiceManagementClient implements BootService, Runnable, GRPCChannelListener {
    private volatile ManagementServiceGrpc.ManagementServiceBlockingStub managementServiceBlockingStub; // gRPC service stub
    private volatile AtomicInteger sendPropertiesCounter = new AtomicInteger(0); // counts the heartbeat rounds

    /**
     * One heartbeat round: every PROPERTIES_REPORT_PERIOD_FACTOR-th round reports the instance
     * properties, every other round sends a keep-alive and forwards the returned commands.
     */
    @Override
    public void run() {
        LOGGER.debug("ServiceManagementClient running, status:{}.", status);

        // Only talk to the OAP while the channel is connected.
        if (GRPCChannelStatus.CONNECTED.equals(status)) {
            try {
                if (managementServiceBlockingStub != null) {
                    // E.g. heartbeat period = 30s and report factor = 10 => properties are reported every 5 minutes.
                    // Round1 counter=0 0%10=0
                    // Round2 counter=1 1%10=1
                    if (Math.abs(sendPropertiesCounter.getAndAdd(1)) % Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR == 0) {

                        managementServiceBlockingStub
                            // Request deadline.
                            .withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                            .reportInstanceProperties(InstanceProperties.newBuilder()
                                                                        .setService(Config.Agent.SERVICE_NAME)
                                                                        .setServiceInstance(Config.Agent.INSTANCE_NAME)
                                                                        // Operating-system information.
                                                                        .addAllProperties(OSUtil.buildOSInfo(
                                                                            Config.OsInfo.IPV4_LIST_SIZE))
                                                                        .addAllProperties(SERVICE_INSTANCE_PROPERTIES)
                                                                        // JVM information.
                                                                        .addAllProperties(LoadedLibraryCollector.buildJVMInfo())
                                                                        .build());
                    } else {
                        // The response returned by the server is handed to CommandService for processing.
                        final Commands commands = managementServiceBlockingStub.withDeadlineAfter(
                            GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
                        ).keepAlive(InstancePingPkg.newBuilder()
                                                   .setService(Config.Agent.SERVICE_NAME)
                                                   .setServiceInstance(Config.Agent.INSTANCE_NAME)
                                                   .build());

                        ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                    }
                }
            } catch (Throwable t) {
                LOGGER.error(t, "ServiceManagementClient execute fail.");
                // Let the channel manager decide whether this failure requires a reconnect.
                ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
            }
        }
    }
    // ... remaining members are omitted in this excerpt


心跳周期为30s,信息汇报频率因子为10,所以每5分钟向OAP汇报一次Agent Client Properties


1)如果本次是信息上报,上报Agent信息,包括:服务名、实例名、Agent Client的信息、当前操作系统的信息、JVM信息



@Getter
public class ServiceInstanceGenerator implements BootService {
    /**
     * Generates {@code Config.Agent.INSTANCE_NAME} as {@code <uuid-without-dashes>@<ipv4>} unless a
     * name is already configured.
     */
    @Override
    public void prepare() throws Throwable {
        // Keep an explicitly configured instance name.
        if (!isEmpty(Config.Agent.INSTANCE_NAME)) {
            return;
        }

        // Generate the Agent instance name.
        Config.Agent.INSTANCE_NAME = UUID.randomUUID().toString().replaceAll("-", "") + "@" + OSUtil.getIPV4();
    }
    // ... remaining members are omitted in this excerpt


16、服务-CommandService
1)、CommandService
/**
 * Command Scheduler: collects the Commands returned by the OAP and dispatches them to the
 * matching handlers.
 */
@DefaultImplementor
public class CommandService implements BootService, Runnable {
    private ExecutorService executorService = Executors.newSingleThreadExecutor(
        new DefaultNamedThreadFactory("CommandService")
    );

    /**
     * Submits this object to the single-thread executor.
     */
    @Override
    public void boot() throws Throwable {
        executorService.submit(
            new RunnableWithExceptionProtection(this, t -> LOGGER.error(t, "CommandService failed to execute commands"))
        );
    }
    // ... remaining members are omitted in this excerpt


@DefaultImplementor
public class CommandService implements BootService, Runnable {
    private volatile boolean isRunning = true; // whether the command processing loop is running
    private LinkedBlockingQueue<BaseCommand> commands = new LinkedBlockingQueue<>(64); // pending commands
    private CommandSerialNumberCache serialNumberCache = new CommandSerialNumberCache(); // serial numbers of executed commands

    /**
     * Keeps taking commands from the queue and hands them to the executor service.
     */
    @Override
    public void run() {
        final CommandExecutorService commandExecutorService = ServiceManager.INSTANCE.findService(CommandExecutorService.class);

        while (isRunning) {
            try {
                BaseCommand command = commands.take();

                // Do not execute the same command twice.
                if (isCommandExecuted(command)) {
                    continue;
                }

                // Execute the command, then remember its serial number.
                commandExecutorService.execute(command);
                serialNumberCache.add(command.getSerialNumber());
            } catch (InterruptedException e) {
                LOGGER.error(e, "Failed to take commands.");
            } catch (CommandExecutionException e) {
                LOGGER.error(e, "Failed to execute command[{}].", e.command().getCommand());
            } catch (Throwable e) {
                LOGGER.error(e, "There is unexpected exception");
            }
        }
    }

    /**
     * @return true when the serial number of the command is already in the cache
     */
    private boolean isCommandExecuted(BaseCommand command) {
        return serialNumberCache.contain(command.getSerialNumber());
    }
    // ... remaining members are omitted in this excerpt



/**
 * Cache of command serial numbers. The serial numbers are kept in a queue with a bounded
 * capacity; when the cache is full the oldest entry is dropped.
 */
public class CommandSerialNumberCache {
    private static final int DEFAULT_MAX_CAPACITY = 64;
    private final Deque<String> queue;
    private final int maxCapacity;

    /** Creates a cache with the default capacity. */
    public CommandSerialNumberCache() {
        this(DEFAULT_MAX_CAPACITY);
    }

    /**
     * @param maxCapacity maximum number of serial numbers kept at the same time
     */
    public CommandSerialNumberCache(int maxCapacity) {
        queue = new LinkedBlockingDeque<>(maxCapacity);
        this.maxCapacity = maxCapacity;
    }

    /**
     * Adds a serial number, evicting the oldest one first when the cache is full.
     *
     * @param number the serial number to remember
     */
    public void add(String number) {
        if (queue.size() >= maxCapacity) {
            queue.pollFirst();
        }

        queue.add(number);
    }

    /**
     * @param command the serial number to look up
     * @return true when the serial number is currently cached
     */
    public boolean contain(String command) {
        return queue.contains(command);
    }
}


// 服务端给到的响应交给CommandService去处理 final Commands commands = managementServiceBlockingStub.withDeadlineAfter( GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS ).keepAlive(InstancePingPkg.newBuilder() .setService(Config.Agent.SERVICE_NAME) .setServiceInstance(Config.Agent.INSTANCE_NAME) .build()); ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);


@DefaultImplementor
public class CommandService implements BootService, Runnable {
    private LinkedBlockingQueue<BaseCommand> commands = new LinkedBlockingQueue<>(64); // pending commands

    /**
     * Deserializes the received commands and queues those that have not been executed yet.
     * Commands are dropped (with a warning) when the queue is full or the command is unsupported.
     *
     * @param commands the commands returned by the OAP
     */
    public void receiveCommand(Commands commands) {
        for (Command command : commands.getCommandsList()) {
            try {
                // Deserialize.
                BaseCommand baseCommand = CommandDeserializer.deserialize(command);

                if (isCommandExecuted(baseCommand)) {
                    LOGGER.warn("Command[{}] is executed, ignored", baseCommand.getCommand());
                    continue;
                }

                // Append to the pending queue; offer() returns false instead of blocking when it is full.
                boolean success = this.commands.offer(baseCommand);

                if (!success && LOGGER.isWarnEnable()) {
                    LOGGER.warn(
                        "Command[{}, {}] cannot add to command list. because the command list is full.",
                        baseCommand.getCommand(), baseCommand.getSerialNumber()
                    );
                }
            } catch (UnsupportedCommandException e) {
                if (LOGGER.isWarnEnable()) {
                    LOGGER.warn("Received unsupported command[{}].", e.getCommand().getCommand());
                }
            }
        }
    }
    // ... remaining members are omitted in this excerpt


public class CommandDeserializer { public static BaseCommand deserialize(final Command command) { final String commandName = command.getCommand(); if (ProfileTaskCommand.NAME.equals(commandName)) { // 性能追踪 return ProfileTaskCommand.DESERIALIZER.deserialize(command); } else if (ConfigurationDiscoveryCommand.NAME.equals(commandName)) { // 配置更改 return ConfigurationDiscoveryCommand.DESERIALIZER.deserialize(command); } throw new UnsupportedCommandException(command); } }


ProfileTaskCommand:在SkyWalking UI的性能剖析功能中新建任务后,OAP会把性能追踪任务下发给Agent。

ConfigurationDiscoveryCommand:当前版本的SkyWalking Agent支持在运行时动态调整配置。


public class ConfigurationDiscoveryCommand extends BaseCommand implements Serializable, Deserializable { public static final String NAME = "ConfigurationDiscoveryCommand"; public static final String UUID_CONST_NAME = "UUID"; public static final String SERIAL_NUMBER_CONST_NAME = "SerialNumber"; /* * 如果配置没有变,那么OAP返回的UUID就是一样的 * If config is unchanged, then could response the same uuid, and config is not required. */ private String uuid; /* * The configuration of service. */ private List config; public ConfigurationDiscoveryCommand(String serialNumber, String uuid, List config) { super(NAME, serialNumber); this.uuid = uuid; this.config = config; } @Override public ConfigurationDiscoveryCommand deserialize(Command command) { String serialNumber = null; String uuid = null; List config = new ArrayList(); for (final KeyStringValuePair pair : command.getArgsList()) { if (SERIAL_NUMBER_CONST_NAME.equals(pair.getKey())) { serialNumber = pair.getValue(); } else if (UUID_CONST_NAME.equals(pair.getKey())) { uuid = pair.getValue(); } else { config.add(pair); } } return new ConfigurationDiscoveryCommand(serialNumber, uuid, config); } 2)、执行command @DefaultImplementor public class CommandService implements BootService, Runnable { @Override public void run() { final CommandExecutorService commandExecutorService = ServiceManager.INSTANCE.findService(CommandExecutorService.class); while (isRunning) { try { BaseCommand command = commands.take(); // 同一个命令不要重复执行 if (isCommandExecuted(command)) { continue; } // 执行command commandExecutorService.execute(command); serialNumberCache.add(command.getSerialNumber()); } catch (InterruptedException e) { LOGGER.error(e, "Failed to take commands."); } catch (CommandExecutionException e) { LOGGER.error(e, "Failed to execute command[{}].", e.command().getCommand()); } catch (Throwable e) { LOGGER.error(e, "There is unexpected exception"); } } }


@DefaultImplementor public class CommandExecutorService implements BootService, CommandExecutor { /** * key: 命令的名字 value:对应的命令执行器 */ private Map commandExecutorMap; @Override public void prepare() throws Throwable { commandExecutorMap = new HashMap(); // 性能追踪命令执行器 // Profile task executor commandExecutorMap.put(ProfileTaskCommand.NAME, new ProfileTaskCommandExecutor()); // 配置变更命令执行器 //Get ConfigurationDiscoveryCommand executor. commandExecutorMap.put(ConfigurationDiscoveryCommand.NAME, new ConfigurationDiscoveryCommandExecutor()); } @Override public void boot() throws Throwable { } @Override public void onComplete() throws Throwable { } @Override public void shutdown() throws Throwable { } @Override public void execute(final BaseCommand command) throws CommandExecutionException { executorForCommand(command).execute(command); } private CommandExecutor executorForCommand(final BaseCommand command) { // 根据command类型找到对应的命令执行器 final CommandExecutor executor = commandExecutorMap.get(command.getCommand()); if (executor != null) { return executor; } return NoopCommandExecutor.INSTANCE; } }



public class ConfigurationDiscoveryCommandExecutor implements CommandExecutor { private static final ILog LOGGER = LogManager.getLogger(ConfigurationDiscoveryCommandExecutor.class); @Override public void execute(BaseCommand command) throws CommandExecutionException { try { ConfigurationDiscoveryCommand agentDynamicConfigurationCommand = (ConfigurationDiscoveryCommand) command; ServiceManager.INSTANCE.findService(ConfigurationDiscoveryService.class) .handleConfigurationDiscoveryCommand(agentDynamicConfigurationCommand); } catch (Exception e) { LOGGER.error(e, "Handle ConfigurationDiscoveryCommand error, command:{}", command.toString()); } } }


3)、ConfigurationDiscoveryService
@DefaultImplementor
public class ConfigurationDiscoveryService implements BootService, GRPCChannelListener {

    private final Register register = new Register();

    /**
     * Register dynamic configuration watcher.
     *
     * @param watcher dynamic configuration watcher
     * @throws IllegalStateException when a watcher with the same property key is already registered
     */
    public void registerAgentConfigChangeWatcher(AgentConfigChangeWatcher watcher) {
        WatcherHolder holder = new WatcherHolder(watcher);
        if (register.containsKey(holder.getKey())) {
            throw new IllegalStateException("Duplicate register, watcher=" + watcher);
        }
        register.put(holder.getKey(), holder);
    }

    /**
     * Local dynamic configuration center.
     */
    public static class Register {
        // property key -> holder of the watcher registered for it
        private final Map<String, WatcherHolder> register = new HashMap<>();

        private boolean containsKey(String key) {
            return register.containsKey(key);
        }

        private void put(String key, WatcherHolder holder) {
            register.put(key, holder);
        }

        public WatcherHolder get(String name) {
            return register.get(name);
        }

        public Set<String> keys() {
            return register.keySet();
        }

        /**
         * @return one entry per registered key with the watcher's current value, joined as {@code [a,b,...]}
         */
        @Override
        public String toString() {
            ArrayList<String> registerTableDescription = new ArrayList<>(register.size());
            register.forEach((key, holder) -> {
                AgentConfigChangeWatcher watcher = holder.getWatcher();
                registerTableDescription.add(new StringBuilder().append("key:")
                                                                .append(key)
                                                                .append("value(current):")
                                                                .append(watcher.value()).toString());
            });
            return registerTableDescription.stream().collect(Collectors.joining(",", "[", "]"));
        }
    }

    /**
     * Pairs a watcher with the property key it was registered under.
     */
    @Getter
    private static class WatcherHolder {
        private final AgentConfigChangeWatcher watcher;
        private final String key;

        public WatcherHolder(AgentConfigChangeWatcher watcher) {
            this.watcher = watcher;
            this.key = watcher.getPropertyKey();
        }
    }
    // ... remaining members are omitted in this excerpt


AgentConfigChangeWatcher用于监听SkyWalking Agent的某项配置的值的变化,代码如下:

/**
 * Watches the value of one agent configuration item for changes.
 */
@Getter
public abstract class AgentConfigChangeWatcher {
    // Config key, should match KEY in the Table of Agent Configuration Properties.
    // The key comes from agent.config, i.e. only keys that are valid in the agent configuration file can be used here.
    private final String propertyKey;

    public AgentConfigChangeWatcher(String propertyKey) {
        this.propertyKey = propertyKey;
    }

    /**
     * Notify the watcher, the new value received.
     *
     * @param value of new.
     */
    public abstract void notify(ConfigChangeEvent value);

    /**
     * @return current value of current config.
     */
    public abstract String value();

    /**
     * Configuration change event.
     */
    @Getter
    @RequiredArgsConstructor
    public static class ConfigChangeEvent {
        // The new configuration value.
        private final String newValue;
        // The event type.
        private final EventType eventType;
    }

    public enum EventType {
        ADD, MODIFY, DELETE
    }
    // ... remaining members are omitted in this excerpt





@DefaultImplementor
public class ConfigurationDiscoveryService implements BootService, GRPCChannelListener {

    private volatile ScheduledFuture<?> getDynamicConfigurationFuture; // scheduled task that fetches the dynamic configuration

    /**
     * Schedules {@code getAgentDynamicConfig} at a fixed rate; both the initial delay and the period
     * are {@code Config.Collector.GET_AGENT_DYNAMIC_CONFIG_INTERVAL} seconds.
     */
    @Override
    public void boot() throws Throwable {
        getDynamicConfigurationFuture = Executors.newSingleThreadScheduledExecutor(
            new DefaultNamedThreadFactory("ConfigurationDiscoveryService")
        ).scheduleAtFixedRate(
            new RunnableWithExceptionProtection(
                this::getAgentDynamicConfig,
                t -> LOGGER.error("Sync config from OAP error.", t)
            ),
            Config.Collector.GET_AGENT_DYNAMIC_CONFIG_INTERVAL,
            Config.Collector.GET_AGENT_DYNAMIC_CONFIG_INTERVAL,
            TimeUnit.SECONDS
        );
    }
    // ... remaining members are omitted in this excerpt


@DefaultImplementor
public class ConfigurationDiscoveryService implements BootService, GRPCChannelListener {

    /**
     * UUID of the last return value.
     */
    private String uuid;
    private final Register register = new Register();

    private volatile int lastRegisterWatcherSize; // number of watchers seen in the previous round
    private volatile ConfigurationDiscoveryServiceGrpc.ConfigurationDiscoveryServiceBlockingStub configurationDiscoveryServiceBlockingStub;

    /**
     * get agent dynamic config through gRPC.
     */
    private void getAgentDynamicConfig() {
        LOGGER.debug("ConfigurationDiscoveryService running, status:{}.", status);

        if (GRPCChannelStatus.CONNECTED.equals(status)) {
            try {
                ConfigurationSyncRequest.Builder builder = ConfigurationSyncRequest.newBuilder();
                builder.setService(Config.Agent.SERVICE_NAME);

                // Some plugin will register watcher later.
                final int size = register.keys().size();
                // The number of watchers changed since the previous round, i.e. there are new items to watch.
                if (lastRegisterWatcherSize != size) {
                    // reset uuid, avoid the same uuid causing the configuration not to be updated.
                    uuid = null;
                    lastRegisterWatcherSize = size;
                }

                if (null != uuid) {
                    builder.setUuid(uuid);
                }

                if (configurationDiscoveryServiceBlockingStub != null) {
                    // Fetch the configuration; the result is handed to CommandService.
                    final Commands commands = configurationDiscoveryServiceBlockingStub.withDeadlineAfter(
                        GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
                    ).fetchConfigurations(builder.build());
                    ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                }
            } catch (Throwable t) {
                LOGGER.error(t, "ConfigurationDiscoveryService execute fail.");
                // Let the channel manager decide whether this failure requires a reconnect.
                ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
            }
        }
    }
    // ... remaining members are omitted in this excerpt




@DefaultImplementor
public class ConfigurationDiscoveryService implements BootService, GRPCChannelListener {

    /**
     * Process ConfigurationDiscoveryCommand and notify each configuration watcher.
     *
     * @param configurationDiscoveryCommand Describe dynamic configuration information
     */
    public void handleConfigurationDiscoveryCommand(ConfigurationDiscoveryCommand configurationDiscoveryCommand) {
        final String responseUuid = configurationDiscoveryCommand.getUuid();

        // The same uuid means the configuration has not changed.
        if (responseUuid != null && Objects.equals(this.uuid, responseUuid)) {
            return;
        }

        // The command may carry more configuration items than there are registered watchers;
        // only items that have a watcher are acted upon below.
        List<KeyStringValuePair> config = readConfig(configurationDiscoveryCommand);

        config.forEach(property -> {
            String propertyKey = property.getKey();
            WatcherHolder holder = register.get(propertyKey);
            if (holder != null) {
                AgentConfigChangeWatcher watcher = holder.getWatcher();
                String newPropertyValue = property.getValue();
                if (StringUtil.isBlank(newPropertyValue)) {
                    if (watcher.value() != null) {
                        // Notify watcher, the new value is null with delete event type.
                        watcher.notify(
                            new AgentConfigChangeWatcher.ConfigChangeEvent(
                                null, AgentConfigChangeWatcher.EventType.DELETE
                            ));
                    } else {
                        // Don't need to notify, stay in null.
                    }
                } else {
                    if (!newPropertyValue.equals(watcher.value())) {
                        // Notify the watcher that the configuration changed.
                        watcher.notify(new AgentConfigChangeWatcher.ConfigChangeEvent(
                            newPropertyValue, AgentConfigChangeWatcher.EventType.MODIFY
                        ));
                    } else {
                        // Don't need to notify, stay in the same config value.
                    }
                }
            } else {
                LOGGER.warn("Config {} from OAP, doesn't match any watcher, ignore.", propertyKey);
            }
        });
        // Remember the uuid of the configuration that was just applied.
        this.uuid = responseUuid;

        LOGGER.trace("Current configurations after the sync, configurations:{}", register.toString());
    }
    // ... remaining members are omitted in this excerpt






SamplingService是来控制是否要采样该链路。每条链路都是被追踪到的,但是考虑到序列化/反序列化的CPU消耗以及网络带宽,如果开启采样,SkyWalking Agent并不会把所有的链路都发送给OAP。默认采样是开启的,可以通过修改agent.config中的agent.sample_n_per_3_secs配置项控制每3秒最多采样多少条链路

/**
 * The SamplingService take charge of how to sample the {@link TraceSegment}. Every {@link TraceSegment}s
 * have been traced, but, considering CPU cost of serialization/deserialization, and network bandwidth, the agent do NOT
 * send all of them to collector, if SAMPLING is on.
 * <p>
 * By default, SAMPLING is on, and {@link Config.Agent#SAMPLE_N_PER_3_SECS }
 * SAMPLE_N_PER_3_SECS: the maximum number of traces sampled every 3 seconds; a negative value or 0 turns
 * the sampling mechanism off (i.e. every trace is collected and sent).
 */
@DefaultImplementor
public class SamplingService implements BootService {

    private SamplingRateWatcher samplingRateWatcher;

    /**
     * Creates the sampling-rate watcher, registers it for dynamic configuration changes and applies
     * the current rate.
     */
    @Override
    public void boot() {
        samplingRateWatcher = new SamplingRateWatcher("agent.sample_n_per_3_secs", this);
        // Register the configuration change watcher.
        ServiceManager.INSTANCE.findService(ConfigurationDiscoveryService.class)
                               .registerAgentConfigChangeWatcher(samplingRateWatcher);

        handleSamplingRateChanged();
    }
    // ... remaining members are omitted in this excerpt



/**
 * Watcher for the sampling-rate configuration item.
 */
public class SamplingRateWatcher extends AgentConfigChangeWatcher {
    private static final ILog LOGGER = LogManager.getLogger(SamplingRateWatcher.class);

    private final AtomicInteger samplingRate; // maximum number of traces that may be sampled every 3 seconds
    private final SamplingService samplingService;

    public SamplingRateWatcher(final String propertyKey, SamplingService samplingService) {
        super(propertyKey);
        this.samplingRate = new AtomicInteger(getDefaultValue());
        this.samplingService = samplingService;
    }

    /**
     * Parses and stores the new rate, then tells the sampling service. An unparsable value is
     * logged and leaves the current rate untouched.
     */
    private void activeSetting(String config) {
        if (LOGGER.isDebugEnable()) {
            LOGGER.debug("Updating using new static config: {}", config);
        }
        try {
            this.samplingRate.set(Integer.parseInt(config));

            /*
             * We need to notify samplingService the samplingRate changed.
             */
            samplingService.handleSamplingRateChanged();
        } catch (NumberFormatException ex) {
            LOGGER.error(ex, "Cannot load {} from: {}", getPropertyKey(), config);
        }
    }

    @Override
    public void notify(final ConfigChangeEvent value) {
        if (EventType.DELETE.equals(value.getEventType())) {
            // When the dynamic configuration is deleted, fall back to the value from agent.config.
            activeSetting(String.valueOf(getDefaultValue()));
        } else {
            activeSetting(value.getNewValue());
        }
    }

    private int getDefaultValue() {
        return Config.Agent.SAMPLE_N_PER_3_SECS;
    }
    // ... remaining members are omitted in this excerpt


@DefaultImplementor
public class SamplingService implements BootService {
    private static final ILog LOGGER = LogManager.getLogger(SamplingService.class);

    private volatile boolean on = false;
    private volatile AtomicInteger samplingFactorHolder; // counts the traces sampled within the current 3 seconds
    private volatile ScheduledFuture<?> scheduledFuture; // resets samplingFactorHolder every 3 seconds

    /**
     * Handle the samplingRate changed.
     */
    public void handleSamplingRateChanged() {
        if (samplingRateWatcher.getSamplingRate() > 0) {
            // A positive rate switches sampling on (only once).
            if (!on) {
                on = true;
                this.resetSamplingFactor();
                // NOTE(review): a new executor is created on every off -> on transition and only its
                // future is cancelled when switching off - confirm the executor itself is released elsewhere.
                ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
                    new DefaultNamedThreadFactory("SamplingService"));
                scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(
                    this::resetSamplingFactor, t -> LOGGER.error("unexpected exception.", t)), 0, 3, TimeUnit.SECONDS);
                LOGGER.debug(
                    "Agent sampling mechanism started. Sample {} traces in 3 seconds.",
                    samplingRateWatcher.getSamplingRate()
                );
            }
        } else {
            // Zero or negative rate switches sampling off.
            if (on) {
                if (scheduledFuture != null) {
                    scheduledFuture.cancel(true);
                }
                on = false;
            }
        }
    }

    /** Starts a new counting window by replacing the counter with a fresh one. */
    private void resetSamplingFactor() {
        samplingFactorHolder = new AtomicInteger(0);
    }
    // ... remaining members are omitted in this excerpt



@DefaultImplementor public class SamplingService implements BootService { private volatile AtomicInteger samplingFactorHolder; // 用于累加3秒内已经采样的次数 /** * 如果采样机制没有开启,即on=false,那么就表示每一条采集到的链路都会上报给OAP * @param operationName The first operation name of the new tracing context. * @return true, if sampling mechanism is on, and getDefault the sampling factor successfully. */ public boolean trySampling(String operationName) { if (on) { int factor = samplingFactorHolder.get(); // 3秒内已经采样的次数 return false; } } return true; }


@DefaultImplementor public class SamplingService implements BootService { /** * Increase the sampling factor by force, to avoid sampling too many traces. If many distributed traces require * sampled, the trace beginning at local, has less chance to be sampled. */ public void forceSampled() { if (on) { samplingFactorHolder.incrementAndGet(); } }




/** * JVMService是一个定时器,来收集jvm的cpu、内存、内存池、gc、线程和类的信息, * 然后将收集到的信息通过GRPCChannelManager提供的网络连接发送给OAP * The JVMService represents a timer, which collectors JVM cpu, memory, memorypool, gc, thread and class info, * and send the collected info to Collector through the channel provided by {@link GRPCChannelManager} */ @DefaultImplementor public class JVMService implements BootService, Runnable { private static final ILog LOGGER = LogManager.getLogger(JVMService.class); private volatile ScheduledFuture collectMetricFuture; // 收集JVM信息的定时任务 private volatile ScheduledFuture sendMetricFuture; // 发送JVM信息的定时任务 private JVMMetricsSender sender; // JVM信息的发送工具 @Override public void prepare() throws Throwable { sender = ServiceManager.INSTANCE.findService(JVMMetricsSender.class); } @Override public void boot() throws Throwable { collectMetricFuture = Executors.newSingleThreadScheduledExecutor( new DefaultNamedThreadFactory("JVMService-produce")) .scheduleAtFixedRate(new RunnableWithExceptionProtection( this, new RunnableWithExceptionProtection.CallbackWhenException() { @Override public void handle(Throwable t) { LOGGER.error("JVMService produces metrics failure.", t); } } ), 0, 1, TimeUnit.SECONDS); sendMetricFuture = Executors.newSingleThreadScheduledExecutor( new DefaultNamedThreadFactory("JVMService-consume")) .scheduleAtFixedRate(new RunnableWithExceptionProtection( sender, new RunnableWithExceptionProtection.CallbackWhenException() { @Override public void handle(Throwable t) { LOGGER.error("JVMService consumes and upload failure.", t); } } ), 0, 1, TimeUnit.SECONDS); }



@DefaultImplementor public class JVMService implements BootService, Runnable { private JVMMetricsSender sender; // JVM信息的发送工具 /** * 构建jvm信息交给sender,由sender负责发送给OAP */ @Override public void run() { long currentTimeMillis = System.currentTimeMillis(); try { JVMMetric.Builder jvmBuilder = JVMMetric.newBuilder(); jvmBuilder.setTime(currentTimeMillis); jvmBuilder.setCpu(CPUProvider.INSTANCE.getCpuMetric()); jvmBuilder.addAllMemory(MemoryProvider.INSTANCE.getMemoryMetricList()); jvmBuilder.addAllMemoryPool(MemoryPoolProvider.INSTANCE.getMemoryPoolMetricsList()); jvmBuilder.addAllGc(GCProvider.INSTANCE.getGCList()); jvmBuilder.setThread(ThreadProvider.INSTANCE.getThreadMetrics()); jvmBuilder.setClazz(ClassProvider.INSTANCE.getClassMetrics()); sender.offer(jvmBuilder.build()); } catch (Exception e) { LOGGER.error(e, "Collect JVM info fail."); } }


@DefaultImplementor public class JVMMetricsSender implements BootService, Runnable, GRPCChannelListener { private static final ILog LOGGER = LogManager.getLogger(JVMMetricsSender.class); private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT; private volatile JVMMetricReportServiceGrpc.JVMMetricReportServiceBlockingStub stub = null; private LinkedBlockingQueue queue; @Override public void prepare() { // 初始化queue,默认最多存储600个数据 queue = new LinkedBlockingQueue(Config.Jvm.BUFFER_SIZE); ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this); } @Override public void boot() { } public void offer(JVMMetric metric) { // drop last message and re-deliver if (!queue.offer(metric)) { queue.poll(); queue.offer(metric); } } @Override public void run() { if (status == GRPCChannelStatus.CONNECTED) { try { JVMMetricCollection.Builder builder = JVMMetricCollection.newBuilder(); LinkedList buffer = new LinkedList(); // 将queue中的数据移到buffer中 queue.drainTo(buffer); if (buffer.size() > 0) { builder.addAllMetrics(buffer); builder.setService(Config.Agent.SERVICE_NAME); builder.setServiceInstance(Config.Agent.INSTANCE_NAME); // 发送到OAP,OAP返回的Command交给CommandService去处理 Commands commands = stub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS) .collect(builder.build()); ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); } } catch (Throwable t) { LOGGER.error(t, "send JVM metrics to Collector fail."); ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t); } } }




SkyWalking Agent上报数据有两种模式:gRPC模式和Kafka模式。



KafkaXxxService是对应服务的Kafka实现。例如:GRPCChannelManager负责Agent到OAP的网络连接,与之对应的KafkaProducerManager负责Agent到OAP之间的Kafka连接;KafkaJVMMetricsSender负责发送JVM信息,对应gRPC模式下的JVMMetricsSender;KafkaServiceManagementServiceClient负责上报Agent Client信息,对应gRPC模式下的ServiceManagementClient。




/** * The StatusCheckService determines whether the span should be tagged in error status if an exception * captured in the scope. * 用来判断哪些异常不算异常 */ @DefaultImplementor public class StatusCheckService implements BootService { @Getter private String[] ignoredExceptionNames; private StatusChecker statusChecker; @Override public void prepare() throws Throwable { // 一条链路如果某个环节出现了异常,默认情况会把异常信息发送给OAP,在SkyWalking UI中看到链路中那个地方出现了异常,方便排查问题 // 但是在一些场景中,异常是用来控制业务流程的,而不应该把它当做错误 // 这个配置就是需要忽略的异常 ignoredExceptionNames = Arrays.stream(Config.StatusCheck.IGNORED_EXCEPTIONS.split(",")) .filter(StringUtil::isNotEmpty) .toArray(String[]::new); // 检查异常时的最大递归程度 // AException // BException // CException // 如果IGNORED_EXCEPTIONS配置的是AException,此时抛出的是CException需要递归找一下是否属于AException的子类 statusChecker = Config.StatusCheck.MAX_RECURSIVE_DEPTH > 0 ? HIERARCHY_MATCH : OFF; }



