



package com.demo.flink.sink;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

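/**
 * Kafka deserialization schema that turns raw byte records into records whose key and value
 * are decoded as UTF-8 strings.
 *
 * @author jiajingsi
 * Date 2019-04-09 14:24
 * Version 1.0
 */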
public class CustomKafkaSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {

    private final static String ENCODING = "UTF8";

     * Method to decide whether the element signals the end of the stream. If
     * true is returned the element won't be emitted.
     * @param nextElement The element to test for the end-of-stream signal.
     * @return True, if the element signals end of stream, false otherwise.
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;

     * Deserializes the Kafka record.
     * @param record Kafka record to be deserialized.
     * @return The deserialized message as an object (null if the message cannot be deserialized).
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return ConsumerRecord(record.topic(),
                new String(record.key(), ENCODING),
                new String(record.value(), ENCODING));

    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return null;

当我们需要将处理完的数据写入 Kafka 时，FlinkKafkaProducer011 需要传入两个必要参数：String topic 和 xxSerializationSchema serializationSchema，其中第二个参数决定了写入消息的格式。可以通过实现以下两个接口之一来达到目的：SerializationSchema 或 KeyedSerializationSchema。后者在 1.10 版本时已经过时，所以新代码可以通过实现前者来完成（下文示例仍使用 KeyedSerializationSchema）。两者本质上的区别在于：后者分别序列化 key 和 value，同时还允许重写目标 topic，从而满足将数据动态写入不同 topic 的需求。FlinkKafkaProducer011 继承了 TwoPhaseCommitSinkFunction，借助两阶段提交来保证数据不丢失。使用 KeyedSerializationSchema 时，同样可以在构造 FlinkKafkaProducer011 时传入 Semantic.EXACTLY_ONCE 来实现精确一次（exactly-once）语义。具体可以自行查看 FlinkKafkaProducer011 的源码。

package com.demo.flink.sink;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

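/**
 * Keyed serialization schema for {@code ObjectNode} records: the Kafka key is the
 * {@code after.id} field and the Kafka value is the whole JSON document.
 *
 * @author jiajingsi
 * Date 2019-04-09 14:24
 * Version 1.0
 */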
public class CustomKeyedSerializationSchema implements KeyedSerializationSchema<ObjectNode> {

    private final static String ENCODING = "UTF8";

     * Serializes the key of the incoming element to a byte array
     * This method might return null if no key is available.
     * @param record The incoming element to be serialized
     * @return the key of the element as a byte array
    public byte[] serializeKey(ObjectNode record) {
        return record.get("after").get("id").toString().getBytes();

     * Serializes the value of the incoming element to a byte array.
     * @param record The incoming element to be serialized
     * @return the value of the element as a byte array
    public byte[] serializeValue(ObjectNode record) {
        return record.toString().getBytes();

     * Optional method to determine the target topic for the element.
     * @param record Incoming element to determine the target topic from
     * @return null or the target topic
    public String getTargetTopic(ObjectNode record) {
        return "mysql-31.ods.demo";

关于分区：FlinkKafkaProducer011 默认使用的分区器是 FlinkFixedPartitioner，即 Flink 的每个并行子任务固定写入同一个 Kafka 分区。



自定义Flink消费和生产Kafka消息(消费时Schema、生产时Key&Value&分区) 的相关文章

  • aspose操作文档

    操作 Aspose 版本：Aspose.Words 21.4，官网下载地址：https://releases.aspose.com/words/java 问题 1：每次操作文档的时候都要重新保存，不然文件损坏，打开的文件就是乱码的 记录一些简单的功
  • RobotFramework环境配置七:多浏览器兼容性测试(1)

    多浏览器兼容性测试 1 RIDE已经支持多浏览器兼容性测试 例如 firefox ie chrome safari 但是 项目要求支持360极速和360安全浏览器 所以 我们需要增加代码让RIDE识别 其他浏览器类似 本地浏览器 说明 基于
