需求来源

我们的服务中需要将一些日志发送至 Kafka。我们直接使用 log4j2 的 KafkaAppender 来完成这个事情。Kafka Appender 的说明在这里找

因为打印日志的地方在服务中比较关键的位置,对打日志到 Kafka 这件事的要求是一定不能阻塞上游业务线程,不能抛异常出去,性能要尽可能的好,在有问题时甚至可以丢弃一些日志。

Log4j2 异步打日志

异步打日志的文档在这里:https://logging.apache.org/log4j/2.x/manual/async.html

即有两个方法,一个是在启动时候带上 System Property:

-DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector

可以让所有的 Logger 自动变成异步的 Logger。也可以用 log4j2.contextSelector 这个配置,效果相同。启动后 Log4j 会创建一个 Disruptor 队列异步的处理日志,所有业务线程作为 Producer 写日志到队列,Log4j2 内一个 Consumer 去消费日志,写入最终目的地。Consumer 线程名如:Log4j2-TF-1-AsyncLogger[AsyncContext@55f96302]-2,比较纳闷的是我自己的服务启动后看到了两个 Consumer 线程,另一个名为 Log4j2-TF-1-AsyncLogger[AsyncContext@55f96302]-1,也就是说是同一个 AsyncContext 创建出来两个 AsyncLogger。 看了半天代码也没理解第二个 Consumer 是怎么来的,我用的 Log4j 2.12.1。从实际测试看只有一个 Consumer 在实际处理日志。无论有多少个 Logger,所有 Logger 包括 Root Logger 都会使用同一个 Disruptor 队列。

另一个方法是同步和异步 Logger 混合使用,即不配置 log4j2.contextSelector但是配置 Logger 时候用 AsyncLogger 替代 Logger,用 AsyncRoot 代替 Root。启动后 Log4j2 也是会启动一个 Disruptor 队列,所有标记 AsyncLogger 和标记 AsyncRoot 的 Logger 都会使用同一个 Disruptor 队列。可以看到名字类似于:Log4j2-TF-10-AsyncLoggerConfig-2 这样的 Disruptor Consumer 线程。这种方式下可以看到只会有一个 Disruptor Consumer 线程存在。

还可以更进一步,既配置 -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector 开启全局的异步日志,又单独在 Log4j 配置里配置 AsyncLogger。Log4j 启动后会创建两个 Disruptor 队列,上面两种 Consumer 都会存在。打到普通 Logger 的日志只会由全局 Consumer 处理。但打给 AsyncLogger 的日志会首先放入全局的 Disruptor 队列,再被全局的 Consumer 将日志塞入 AsyncLogger 对应的 Disruptor 队列,再被 AsyncLogger 对应的 Consumer 消费后实际写入日志最终目的。

一般来说使用了全局的异步日志后就不用再配置 AsyncLoggerAsyncRoot 了。我们最初使用 Kafka Appender 的时候,将这两种配置方式都用上了,希望提供更多的隔离,不光是要尽可能避免 Kafka Appender 异常时候影响业务线程,还要避免影响全局的日志 Consumer 从而影响别的日志。我们将配置了 AsyncLoggerContextSelector 也给使用 Kafka Appender 的 Logger 配置为 AsyncLogger。我们想的这样以来,即使 Kafka Appender 出现堵塞,也只堵塞 AsyncLogger 对应的 Disruptor Consumer 不会影响别的日志的 Disruptor Consumer。但实际即使在这种场景下混用两种配置用处也不大。因为上面分析过运行原理,即使配置了 AsyncLogger,它也确实会单独使用 Consumer 线程,使用单独的 Disruptor 队列,但它还是依赖于全局 Consumer 的。在我们的例子中,如果 Kafka Appender 真的堵塞了,当日志将 AsyncLogger 使用的 Disruptor 队列占满后,就会开始阻塞全局 Disruptor 队列的 Consumer,最终全局 Disruptor 也会堵满,打日志的业务线程也被阻塞住。这种使用方式其实相当于给 Kafka Appender 配置了一个更长一些的队列,但并不能真的做到隔离,也做不到无论如何都不影响业务线程的目标。我们最终也放弃了这种方式,只配置了全局的异步日志。

异步日志队列满了的时候会有个日志在 STDOUT 里:

2020-02-28 16:35:44,862 DefaultQuartzScheduler_Worker-5 WARN Async queue is full, discarding event with level INFO. This message will only appear once; future events from INFO are silently discarded until queue capacity becomes available.

异步队列相关配置

那我们该怎么做呢?我们最主要目标是 Kafka Appender 无论出什么事情都不能影响业务线程。Log4j 提供了 log4j2.asyncQueueFullPolicy 配置,用于控制在异步队列满了以后决定该怎么做。默认是直接堵塞上游线程,我们可以将其配置为 Discard,在队列堵塞后丢弃新来的日志,从而避免堵塞上游业务线程。我们在进程启动时候加上了配置 -Dlog4j2.asyncQueueFullPolicy=Discard

不过还没完,即使配置了 -Dlog4j2.asyncQueueFullPolicy=Discard,Log4j 也不是在队列满的时候任何新来的日志都丢掉的,它默认只丢弃 INFO 以下(包含)级别的日志,需要主动配置 log4j2.discardThreshold 为更高的日志级别才会丢弃更高级别的日志。这也是个不大不小的坑,要是没注意到这个问题特别坚信日志满了也不会阻塞上游进程就麻烦了。特别是我们经常会发现系统故障的时候很少只有一个表现,Kafka 本身比较稳定,如果 Kafka Appender 能阻塞大量日志搞不好系统还会遇到很多别的异常,需要去打印 WARN、ERROR 日志,如果异步日志队列堵满了,又没调整 log4j2.discardThreshold那业务线程可能阻塞在打印这些日志的地方。在我们的场景下,我们配置了 -Dlog4j2.discardThreshold=ERROR

此外,我们还能配置队列的长度。使用配置 log4j2.asyncLoggerConfigRingBufferSize,默认值在 Log4j 2.12.1254 * 1024。一般应该是够用了,而且需要注意这个队列越长,队列堵满的时候会占用更多的堆内存,带去更多的 GC。并且长队列对 Disruptor 性能也有影响,两个都没有堵塞的 Disruptor 队列,Capacity 短的那个一般性能越高。我们没有改队列长度配置。

上述配置可以参考 Log4j – Log4j 2 Lock-free Asynchronous Loggers for Low-Latency LoggingLog4j – Configuring Log4j 2.

Kafka Appender 配置

异步队列配置完了我们来配置 Kafka Appender。最初我们只配置了:

<Kafka name="Kafka" topic="my-topic" syncSend="true" ignoreExceptions="true">
    <PatternLayout pattern="%message"/>
    <Property name="bootstrap.servers">${sys:kafka.servers}</Property>
</Kafka>

其中 syncSendignoreExceptions 是默认值,我们一开始没有配置,这里只是把默认值写出来,因为下面还会提到这两个配置。kafka.servers 是个 System Property,我们通过这个配置将 Kafka 的 Broker List 传给进程,从而不用把 Broker 地址写死在 Log4j 配置中。例如进程启动的时候带上 -Dkafka.servers=queue1:9092,queue2:9092

上面 Kafka Appender 的问题在于它使用了 Kafka Producer 的默认配置,而默认配置下 Kafka Producer 有个 max.block.ms,我们用的 Kafka Client 版本为 1.1.1,官网对该参数说明为:

The configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block.These methods can be blocked either because the buffer is full or metadata unavailable.Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.

这个参数默认值为 60s,如果 Producer 获取不到 metadata 或没有足够内存 (buffer.memory 下面会说),Kafka Appender 发日志时最长会等待 60s 才抛异常并放弃发送。配置了全局异步日志下,这种等待会阻塞所有日志写出。我们需要改配置为:

<Kafka name="Kafka" topic="my-topic" syncSend="true" ignoreExceptions="true">
    <PatternLayout pattern="%message"/>
    <Property name="bootstrap.servers">${sys:kafka.servers}</Property>
    <Property name="max.block.ms">5000</Property>
</Kafka>

syncSend 看上去很有意思,用于控制 Kafka Appender 是否以异步方式发送日志到 Kafka Broker。如果我们将这个配置改为 false,是不是意味着不改 max.block.ms 也可以。Kafka Appender 发送日志的代码如下,来自这里

final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, null, timestamp, newKey, msg);
if (syncSend) {
    final Future<RecordMetadata> response = producer.send(newRecord);
    response.get(timeoutMillis, TimeUnit.MILLISECONDS);
} else {
    producer.send(newRecord, new Callback() {
        @Override
        public void onCompletion(final RecordMetadata metadata, final Exception e) {
            if (e != null) {
                LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
            }
        }
    });
}

可以看到 syncSend 控制的是是否等待 producer.send() 返回的 response。而无法控制 producer.send() 内获取 metadata 和分配 buffer 的行为,该堵塞的还是会堵塞。所以 syncSend 在这里没有用,还是要设置 max.block.ms

设置 max.block.ms 后还没结束,我们从上面 Log4j2 发送日志的代码中看到,如果我们使用 syncSend=true,线程会卡在 response.get()处,timeoutMillisLog4j 2.12.1 里默认值是 30s,可以在 Kafka 参数中带上 timeout.ms 来控制这个参数。Kafka 里也有个配置能控制请求的超时时间,可惜在我们使用的 Kafka Client 1.1.1 里不是 timeout.ms 而是 request.timeout.mstimeout.ms 在 Kafka Client 0.9.0 开始 Deprecated,0.11.0 开始正式移除了。看上去是 Log4j 还没来得及跟进。

Kafka 对 request.timeout.ms 的说明是:

The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted. This should be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

默认也是 30s。我们也需要设置这个值。以防卡住。因为我们用的是 Kafka Client 1.1.1,所以我们只需要设置 request.timeout.ms 即可,Log4j 里的 timeoutMillis 保持 30s 也没事,Kafka Producer 会按 request.timeout.ms 控制超时,超过后会 cancel 请求,让 Log4j 内执行 response.get() 的 Consumer 线程抛异常。

需要注意的是上面描述里看到,request.timeout.ms 需要大于 Broker 上的 replica.lag.time.max.ms 这个配置,默认是 10s,否则可能因为重试而导致消息重复。对于我们的服务,我们不在乎重复,也不做重试,所以可以将 request.timeout.ms 配置到低于 replica.lag.time.max.ms 的值。

当然还有个办法,是将 syncSend 设置为 false。Log4j 会用异步方式发送日志,这种方式下不会等待每个请求返回结果。Log4j 没有帮我们做速率控制,我们得完全依赖 Kafka Producer 的机制。

Kafka Producer 控制发送速度的方法是:

  1. 通过 max.in.flight.requests.per.connection 参数来控制 inflight 请求数量,当发的速度过快超过这么多请求处于 inflight 状态,Broker 未响应时,Producer 内的 Sender 不再实际发送请求去 Broker。producer.send() 依然能调用但请求会在 Producer 内排队;
  2. buffer.memory 参数会控制 Producer 分配的用于存放 ProducerRecord 的总内存大小。在 producer.send() 调用时 Producer 会申请分配内存,在请求失败或请求完成收到 Broker 的响应后释放。如果 producer.send() 调用的太快,积压的请求占用内存大小超过 buffer.memory 就会强制 producer.send() 等待最长 max.block.ms 时间,超时后会抛异常;

buffer.memory 的默认值是 32M,官方描述如下,基于 Kafka Client 1.1.1:

The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception. This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.

这么看来syncSend 设置为 false 还是挺不错的,能实现我们的目标,还不用配置 request.timeout.ms,因为我们根本不关心 Broker 的 Response。只要我们确认不可能把 Producer 的 buffer 全占满即可。不过如果真占满了会被强制等待最长 max.block.ms 时间,也是挺长的。后面要说,因为我们要使用 Failover Appender,所以我们必须设置 syncSend 为 true,所以我们配置了 request.timeout.ms。对比 syncSend 为 false 和使用 Failover Appender 的方案,Failover Appender 的方案可能更好一些,因为一方面它可以在 Kafka Appender 异常时候还能打印一部分日志,另一方面它可以作为 Circuit Breaker 在 Kafka Appender 异常时候熔断。如果只是异步的发日志去 Kafka 没熔断的话,如果 Kafka 真宕机,Producer 无法发送数据至 Broker,业务还继续写大量日志下来,Producer 可能会持续抛异常,或者间隔的占满 buffer.memory 触发最长 max.block.ms 的阻塞等待。

Kafka Producer 还有很多有用的配置,参考:Apache Kafka。我们还配置了 retrieslinger.msbatch.size 等。

Failover Appender 配置

上面配置好了 Kafka Appender,当它出现异常时业务线程发送的写日志请求会被全部丢弃掉。Log4j 里还提供了一个好用的工具,Failover Appender。它的功能就好像 Circuit Breaker,当下游 Appender 异常时,不再将日志发送给下游 Appender,即 Circuit Breaker Open,断开期间的日志 Failover Appender 会发送给另一个 Appender,并且会按一定周期去探测下游 Appender 是否恢复,即 Circuit Breaker Half-Open。如果恢复了就将新来的日志都发去 Appender,恢复到正常状态,即 Circuit Breaker Closed。

这个东西想正确使用还不太容易,甚至 Log4j 官方文档的示例现在都是错的。他们的错误不记录了,我们配置好后类似如下:

<Kafka name="Kafka" topic="my-topic" syncSend="true" ignoreExceptions="true">
    <PatternLayout pattern="%message"/>
    <Property name="bootstrap.servers">${sys:kafka.servers}</Property>
    <Property name="max.block.ms">5000</Property>
    <Property name="request.timeout.ms">5000</Property>
</Kafka>
<Console name="Out" target="SYSTEM_OUT" ignoreExceptions="false">
      <PatternLayout pattern="%m%n"/>
</Console>
<Failover name="KafkaFailover" primary="Kafka" retryIntervalSeconds="600"> 
    <Failovers> 
        <AppenderRef ref="Out"/> 
    </Failovers> 
</Failover>
<Loggers>
    <Logger additivity="false" name="KafkaLogger" level="info" includeLocation="false">
        <AppenderRef ref="KafkaFailover"/>
    </Logger>
    .... 
</Loggers>

需要注意的点是:

  1. ignoreExceptions 一定得是 true,从而在发送失败的时候会抛异常,Failover Appender 才能知道下游 Appender 有错误,才能真的 Failover;
  2. Kafka Appender 一定要是 syncSend="true",如果是异步发送的,也不会把异常抛出去让 Failover Appender 感知到,只会把 Kafka Appender 异常的信息打入 STDOUT;
  3. 注意 Failover Appender 内 Failovers 下 AppenderRef 的 ref 参数必须是另一个 Appender 的 name。在上面例子中,当 Kafka Appender 异常时,会把日志发送到 STDOUT 去;
  4. 配置 Logger 时,AppenderRef 需要配置为 Failover Appender 的 name;

使用 Failover Appender 的时候可能还会在启动时候在 STDOUT 里看到一个 Error 日志:

ERROR appender Failover has no parameter that matches element Failovers

这个已经在 2.12.x 修复了,需要升级 Log4j2。参考:LOG4J2-1103 Failover appender does not fail over - ASF JIRA

Failover Appender 配置完了以后最好测试一下。我这里测试方法是先把 Kafka 启动,再启动我们的服务,开始使劲打日志,再关掉 Kafka,会在 STDOUT 日志里看到 Kafka Appender 的异常,之后在 Failover 的 Appender 里看到打印出没能发去 Kafka 的日志。