Since upgrading fluentd and the kafka plugin, the configuration format has changed substantially from "kafka_buffered" to "kafka2".
After converting the configuration, I found that the "message" field of the events fails to parse, and the tags are missing from the kafka output.
Below is my configuration:
<source>
  @type tail
  path /log/request.log
  pos_file /log/request.log.pos
  emit_unmatched_lines true
  tag myapp.api.request
  read_from_head true
  <parse>
    @type none
    localtime true
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </parse>
</source>

<filter myapp.api.**>
  @type record_transformer
  enable_ruby
  <record>
    topic myapp-api-qa
  </record>
</filter>

<match myapp.api.**>
  @type kafka2

  # list of seed brokers
  brokers localhost:6667,localhost:6668

  # buffer settings
  <buffer>
    @type file
    path /fluentd/log/buffer

    # chunk + enqueue
    chunk_limit_size 16MB
    flush_mode interval
    flush_interval 5s

    # flush thread
    flush_thread_count 8

    retry_type exponential_backoff
    retry_timeout 1h
    retry_max_interval 30
    overflow_action drop_oldest_chunk
  </buffer>

  # topic settings
  default_topic myapp-api-qa

  # data type settings
  <format>
    @type json
  </format>

  # producer settings
  max_send_retries 3
  required_acks 0
  ack_timeout 15
</match>
I was using "output_include_tag true" to include the tag (from the source: "tag myapp.api.request") in the kafka output, but after converting to the latest format I could not find an equivalent parameter in the v0.12 to v1.0 migration guide. I tried adding an <inject> section to the output, but it injects an empty tag, meaning the tag value is not passed through:
<inject>
  tag_key tag
</inject>
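For context, this is where I placed the <inject> section: directly inside the <match myapp.api.**> block, next to the <format> section (that placement is my best reading of the fluent-plugin-kafka docs, so it may itself be wrong):

<match myapp.api.**>
  @type kafka2
  ...
  <format>
    @type json
  </format>
  <inject>
    tag_key tag
  </inject>
  ...
</match>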
Another problem: before the upgrade, the logs were parsed into JSON with all fields, including "message", expanded into individual fields. After the upgrade, the "message" field is no longer parsed into fields in the output. Below are Kibana screenshots from before and after the upgrade.
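One thing I noticed while reading the parser docs: the none parser keeps the entire line in a single "message" field, so if the request log lines are themselves JSON, maybe the <parse> section should use the json parser instead. Something like the following (just my guess, keeping the same time settings as before):

<source>
  @type tail
  path /log/request.log
  pos_file /log/request.log.pos
  emit_unmatched_lines true
  tag myapp.api.request
  read_from_head true
  <parse>
    @type json
    localtime true
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </parse>
</source>

I have not verified that this restores the old behavior, so corrections are welcome.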
Thanks in advance for any feedback.