Background

Flink supports several deployment modes, and the on-YARN mode is currently the most mature and the most widely used in production. In Flink on YARN mode, however, a job's tasks run on different NodeManager nodes, so the execution logs end up scattered across multiple machines. The logs can be viewed through the Flink web UI and the YARN history page, but once a job has been running for a long time the log volume becomes huge, the pages turn sluggish or even crash, and the logs cannot be searched by time, which makes troubleshooting inconvenient.

The most mature log collection solution at the moment is the Elastic ELK stack, which is also what our company currently uses for log collection, storage, and search, so we want to feed the Flink logs into ELK.

Flink uses Log4j as its logging framework, and Log4j provides a KafkaAppender that can write log events directly to a Kafka topic. All we need to do is have Logstash consume the Kafka topic and write the logs into Elasticsearch.

Log4j KafkaAppender

https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender

The KafkaAppender logs events to an Apache Kafka topic. Each log event is sent as a Kafka record.

Parameters:

topic (String): The Kafka topic to use. Required.
key (String): The key that will be sent to Kafka with every message. Optional value defaulting to null. Any of the Lookups can be included.
filter (Filter): A Filter to determine if the event should be handled by this Appender. More than one Filter may be used by using a CompositeFilter.
layout (Layout): The Layout to use to format the LogEvent. Required, there is no default. New since 2.9; in previous versions SerializedLayout was the default.
name (String): The name of the Appender. Required.
ignoreExceptions (boolean): The default is true, causing exceptions encountered while appending events to be internally logged and then ignored. When set to false exceptions will be propagated to the caller, instead. You must set this to false when wrapping this Appender in a FailoverAppender.
syncSend (boolean): The default is true, causing sends to block until the record has been acknowledged by the Kafka server. When set to false sends return immediately, allowing for lower latency and significantly higher throughput. New since 2.8. Be aware that this is a new addition, and it has not been extensively tested. Any failure sending to Kafka will be reported as an error to the StatusLogger and the log event will be dropped (the ignoreExceptions parameter will not be effective). Log events may arrive out of order at the Kafka server.
properties (Property[]): You can set properties in Kafka producer properties. You need to set the bootstrap.servers property; there are sensible default values for the others. Do not set the value.serializer nor key.serializer properties.
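
For reference, the same parameters in an XML-style configuration would look roughly like the following minimal sketch (topic name, broker address, and layout are placeholders, not the values used later in this post):

<Appenders>
  <Kafka name="Kafka" topic="log-test">
    <JsonLayout compact="true"/>
    <Property name="bootstrap.servers">localhost:9092</Property>
  </Kafka>
</Appenders>

The properties-format equivalent actually used with Flink is shown in the Log4j section below.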

Flink Configuration

Add the following to flink-conf.yaml:

env.java.opts.taskmanager: -DyarnContainerId=$CONTAINER_ID
env.java.opts.jobmanager: -DyarnContainerId=$CONTAINER_ID

This exposes the YARN container ID to the JVM: YARN sets the CONTAINER_ID environment variable inside each container, the $CONTAINER_ID reference is expanded when the container's launch script starts the JVM, and the value ends up as the yarnContainerId system property, which Log4j can read via ${sys:yarnContainerId}.
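
To double-check that the option actually reaches the container JVMs, you can log on to a NodeManager host running one of the containers and look at the process arguments, for example (assuming a standard Linux environment):

ps -ef | grep flink | grep yarnContainerId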

Log4j Configuration

My Flink version here is 1.13. Flink >= 1.11 uses Log4j 2, while earlier versions use Log4j 1; the configuration formats differ, so refer to the official documentation if you need to adapt this for an older version.

Add the following to log4j.properties:

# kafka appender config
rootLogger.appenderRef.kafka.ref = Kafka
appender.kafka.type=Kafka
appender.kafka.name=Kafka
appender.kafka.syncSend=true
appender.kafka.ignoreExceptions=false
appender.kafka.topic=flink-log
appender.kafka.property.type=Property
appender.kafka.property.name=bootstrap.servers
appender.kafka.property.value=kafka01:9092,kafka02:9092,kafka03:9092
appender.kafka.layout.type=JSONLayout
appender.kafka.layout.compact=true
appender.kafka.layout.complete=false
# add custom fields to each JSON record
appender.kafka.layout.additionalField1.type=KeyValuePair
appender.kafka.layout.additionalField1.key=flinkJobName
appender.kafka.layout.additionalField1.value=${sys:flinkJobName}
appender.kafka.layout.additionalField2.type=KeyValuePair
appender.kafka.layout.additionalField2.key=yarnContainerId
appender.kafka.layout.additionalField2.value=${sys:yarnContainerId}

Adjust the Kafka settings (topic name and bootstrap.servers) to match your own environment.

For the custom fields I added flinkJobName and yarnContainerId, which carry the Flink job name and the YARN container ID.
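
As for the rootLogger.appenderRef.kafka.ref = Kafka line at the top of the snippet, it simply registers the Kafka appender alongside the appenders Flink configures by default; for context, the surrounding default lines look roughly like this (they may differ slightly between Flink versions):

rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender
rootLogger.appenderRef.kafka.ref = Kafka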

Submitting the Flink Job

./bin/flink run-application \
-t yarn-application \
-Dyarn.application.name=test-service \
-Dmetrics.reporter.promgateway.jobName=test-service \
-Denv.java.opts="-DflinkJobName=test-service" \
/root/fileUpload/flink-test/test-service.jar 

Note the option -Denv.java.opts="-DflinkJobName=test-service": it passes the Flink job name in as the flinkJobName system property. Without it, the ${sys:flinkJobName} lookup used in appender.kafka.layout.additionalField1.value would resolve to nothing.

Testing by Consuming the Topic

After the job has been submitted successfully, try consuming the topic to verify that log records are arriving.
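
One quick way to check, assuming the standard Kafka console tools are available on a broker host (the path is illustrative):

./bin/kafka-console-consumer.sh --bootstrap-server kafka01:9092 --topic flink-log

Each record is a JSON object along these lines: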

{
    "instant": {
        "epochSecond": 1640758870,
        "nanoOfSecond": 933000000
    },
    "thread": "Legacy Source Thread - Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks -> Timestamps/Watermarks -> (Filter, Filter, Filter, Filter, Filter) (1/1)#0",
    "level": "WARN",
    "loggerName": "org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor",
    "message": "Timestamp monotony violated: 1640758878000 < 1640758879000",
    "endOfBatch": false,
    "loggerFqcn": "org.apache.logging.slf4j.Log4jLogger",
    "threadId": 119,
    "threadPriority": 5,
    "flinkJobName": "test-service",
    "yarnContainerId": "container_1624068958527_0190_01_000002"
}

Here message is the log line itself, flinkJobName is the job name, and yarnContainerId is the YARN container ID.

Configuring Logstash

The Logstash configuration is simple: if the data does not need any processing, a single input and a single output are enough.

Edit logstash-sample.conf:

input {
  kafka {
    bootstrap_servers => "kafka01:9092,kafka02:9092,kafka03:9092"
    group_id => "logstash-group"
    topics => ["flink-log"]
    consumer_threads => 6
    type => "flink-log"
    codec => "json"
    auto_offset_reset => "latest"
  }
}

output {
  if [type] == "flink-log" {
    elasticsearch {
      hosts => ["es06:9200","es07:9200","es08:9200"]
      index => "flink-log_%{+YYYY-MM-dd}"
      user => ""
      password => ""
    }
  }
}
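
If you would rather have @timestamp reflect the time the log event was produced instead of the time Logstash ingested it, a filter block along these lines could be added between the input and the output (a sketch based on the instant field in the sample record shown earlier):

filter {
  if [type] == "flink-log" {
    # copy the epoch seconds out of the nested "instant" object emitted by JSONLayout
    mutate { add_field => { "log_epoch" => "%{[instant][epochSecond]}" } }
    # parse it as a UNIX timestamp into @timestamp, then drop the helper field
    date {
      match => ["log_epoch", "UNIX"]
      remove_field => ["log_epoch"]
    }
  }
}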

Starting Logstash

nohup ./bin/logstash -f ./config/logstash-sample.conf  --path.data=/data/  &

Watch the Logstash logs; as long as no errors show up, it is working.
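
For example, you can follow Logstash's own log file; with default settings it lives under the installation directory (package installs may put it under /var/log/logstash instead):

tail -f logs/logstash-plain.log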

At this point, the Flink logs are flowing into Elasticsearch.
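
A quick way to confirm that the daily indices are being created, assuming the cluster is reachable directly (add credentials if your cluster requires them):

curl "http://es06:9200/_cat/indices/flink-log_*?v"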


Finally, create a matching index pattern in Kibana (for example flink-log_*) and the logs become searchable there.
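
Once the index pattern exists, the custom fields make it easy to narrow a search down to a single job or container, for example with a KQL query such as (values taken from the sample record above):

flinkJobName : "test-service" and yarnContainerId : "container_1624068958527_0190_01_000002"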