Fluentd kafka plugin: tags missing and logs not parsed after upgrade

Hi,
After upgrading fluentd and the kafka plugin, the configuration format changed considerably, from “kafka_buffered” to “kafka2”.
After converting the configuration, I found that the “message” field of the events is no longer parsed and that tags are missing from the kafka output.
Below is my configuration:

    <source>
      @type tail
      path /log/request.log
      pos_file /log/request.log.pos
      emit_unmatched_lines true
      tag myapp.api.request
      read_from_head true
      <parse>
        @type none
        localtime true
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>

    <filter myapp.api.**>
      @type record_transformer
      enable_ruby
      <record>
        topic myapp-api-qa
      </record>
    </filter>

    <match myapp.api.**>
      @type kafka2

      # list of seed brokers
      brokers localhost:6667,localhost:6668

      # buffer settings
      <buffer>
        @type file
        path /fluentd/log/buffer
        # chunk + enqueue
        chunk_limit_size 16MB
        flush_mode interval
        flush_interval 5s
        # flush thread
        flush_thread_count 8

        retry_type exponential_backoff
        retry_timeout 1h
        retry_max_interval 30
        overflow_action drop_oldest_chunk
      </buffer>

      # topic settings
      default_topic myapp-api-qa

      # data type settings
      <format>
        @type json
      </format>

      # producer settings
      max_send_retries 3
      required_acks 0
      ack_timeout 15
    </match>

Previously I used “output_include_tag true” to include the tag (from the source: “tag myapp.api.request”) in the kafka output, but after converting to the latest format I could not find the equivalent parameter when moving from v0.12 to v1.0 style. I tried adding the following to the output, but it injects an empty tag, which means the tag value is not being passed through.

<inject>
  tag_key tag
</inject>

Another problem is that before upgrading, the logs were parsed into JSON, with all fields, including “message”, parsed into separate fields. After upgrading, the “message” field is no longer parsed into fields in the output. Below are the Kibana log screenshots before and after upgrading.

After upgrade:

Before upgrade:

Thanks for any feedback in advance.

It seems that this behavior is caused by the “@type none” parser configuration.

<source>
  @type tail
...
  <parse>
    @type json
    ...
  </parse>
</source>

The above change may help.

Thanks kenhys,
I tried changing the parser type in the source to json, but it does not work.
Below is my previous configuration from before the upgrade:

<source>
  @type tail
  path /log/request.log
  pos_file /log/request.log.pos
  time_format %Y-%m-%dT%H:%M:%S.%NZ
  localtime true
  tag myapp.api.request
  read_from_head true
  format none
</source>

Previously I configured it with “format none”, which worked fine.

After adding tag in the buffer section, everything works, including the parsing. I am not sure why…
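
For reference, here is a minimal sketch of what adding the tag to the buffer probably looks like, assuming it means listing tag as a chunk key in the buffer section. The exact change was not posted, so the combination below is an assumption built from the configuration earlier in this thread. A likely explanation for the empty tag is that a buffered output only knows the tag of a chunk when tag is one of its chunk keys, so without it the inject section has no tag value to write.

```
<match myapp.api.**>
  @type kafka2
  brokers localhost:6667,localhost:6668
  default_topic myapp-api-qa

  # Assumption: "tag" is listed as a chunk key so that each buffer
  # chunk carries the event tag in its metadata.
  <buffer tag>
    @type file
    path /fluentd/log/buffer
    flush_mode interval
    flush_interval 5s
  </buffer>

  # Copies the event tag into a record field named "tag".
  <inject>
    tag_key tag
  </inject>

  <format>
    @type json
  </format>
</match>
```

Only the buffer argument and the inject section differ from the original match block; the remaining buffer and producer settings can stay as they were. Why this also resolved the “message” parsing is not explained in the thread. It may depend on how the downstream consumer uses the tag field.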