实战!用 Flink 打通 Kafka MySQL 的数据链路,附完整代码

在实时数据处理场景中,“Kafka 接收数据 + Flink 处理数据 + MySQL 存储结果是非常经典的架构。比如用户行为日志实时统计、订单数据实时同步等场景,都离不开这套流程。

今天就带大家从零搭建这套数据链路,包含完整的依赖配置、代码实现和环境准备,新手也能跟着做!

一、先理清核心流程

在写代码前,我们先明确整个数据流转的逻辑:

1. 数据来源Kafka 主题存储原始数据(比如用户信息 JSON 串)

2. 数据处理Flink 作为计算引擎,从 Kafka 消费数据,解析、转换格式

3. 数据落地Flink 将处理后的数据写入 MySQL 表,供后续查询分析

整个流程的核心是 Flink 对上下游的连接能力 —— 通过 Kafka Connector 读数据,通过 JDBC Connector 写数据,中间再做简单的数据转换。

二、环境与依赖准备

工欲善其事,必先利其器。首先要搞定依赖配置和基础环境。

1. 技术版本选型

为了避免版本兼容性问题,这里给出经过验证的版本组合:

• Flink1.17.0(稳定版,社区支持完善)

• Kafka2.8.x 3.x(与 Flink 1.17 兼容性良好)

• MySQL8.0.x(注意驱动类与 5.x 不同)

• JDK1.8Flink 1.17 仍推荐 JDK 8

2. Maven 依赖配置(pom.xml

IDEA 中新建 Maven 项目,添加以下核心依赖(复制即用):

 

<dependencies>

    <!– 1. Flink 核心依赖 –>

    <dependency>

        <groupId>org.apache.flink</groupId>

        <artifactId>flink-java</artifactId>

        <version>1.17.0</version>

    </dependency>

    <dependency>

        <groupId>org.apache.flink</groupId>

        <artifactId>flink-streaming-java</artifactId>

        <version>1.17.0</version>

    </dependency>

    <!– 2. Flink Kafka 连接器(读 Kafka 用) –>

    <dependency>

        <groupId>org.apache.flink</groupId>

        <artifactId>flink-connector-kafka</artifactId>

        <version>1.17.0</version>

    </dependency>

    <!– 3. MySQL 驱动 + Flink JDBC 连接器(写 MySQL 用) –>

    <dependency>

        <groupId>mysql</groupId>

        <artifactId>mysql-connector-java</artifactId>

        <version>8.0.33</version>

    </dependency>

    <dependency>

        <groupId>org.apache.flink</groupId>

        <artifactId>flink-connector-jdbc</artifactId>

        <version>3.1.0-1.17</version>

    </dependency>

</dependencies>

注意:如果是生产环境,建议添加 flink-clients 依赖(用于提交任务),并配置打包插件(避免依赖缺失)。

三、完整代码实现(分步骤讲解)

接下来是核心代码,我会按 环境初始化 Kafka → 数据处理 MySQL” 的顺序拆解,每个步骤都加了详细注释。

1. 定义数据模型类(User

首先定义一个 User 类,对应 Kafka 的原始数据格式和 MySQL 表结构(字段要一一对应):

 

// 数据模型:与 Kafka 消息、MySQL 表字段对齐

class User {

    private int id;         // 主键(MySQL 表主键)

    private String name;    // 用户名

    private int age;        // 年龄

    // 必须有默认构造函数(Flink 序列化需要)

    public User() {}

    // 带参构造函数(用于转换数据)

    public User(int id, String name, int age) {

        this.id = id;

        this.name = name;

        this.age = age;

    }

    // Getter 方法(JDBC 写入时需要获取字段值)

    public int getId() { return id; }

    public String getName() { return name; }

    public int getAge() { return age; }

}

2. 主程序(KafkaToMySQL

主程序是整个链路的核心,包含 4 个关键步骤:

 

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;

import org.apache.flink.connector.jdbc.JdbcSink;

import java.util.Properties;

public class KafkaToMySQL {

    public static void main(String[] args) throws Exception {

        // ==================== 步骤1:初始化 Flink 执行环境 ====================

        // 创建流处理环境(本地测试用这个,集群环境用 getExecutionEnvironment()

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 启用 Checkpoint(可选但推荐,保证数据不丢不重)

        // 5 秒做一次 Checkpoint,实际可根据业务调整

        env.enableCheckpointing(5000);

        // ==================== 步骤2:配置 Kafka 消费者,读取数据 ====================

        // 1. Kafka 连接参数

        Properties kafkaProps = new Properties();

        kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // Kafka 集群地址(多地址用逗号分隔)

        kafkaProps.setProperty("group.id", "flink-kafka-group-01");    // 消费者组 ID(同一组共享 offset

        kafkaProps.setProperty("auto.offset.reset", "earliest");       // offset 时从最早位置消费

        // 2. 定义 Kafka 主题(提前在 Kafka 中创建)

        String kafkaTopic = "user-topic";

        // 3. 创建 Kafka 消费者(读取 JSON 字符串格式的消息)

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(

                kafkaTopic,                  // 要消费的主题

                new SimpleStringSchema(),    // 反序列化器(这里用字符串反序列化)

                kafkaProps                   // 连接参数

        );

        // 4. Kafka 读取数据,得到字符串类型的数据流

        DataStream<String> kafkaDataStream = env.addSource(kafkaConsumer);

        // ==================== 步骤3:数据处理(JSON 解析 转换为 User 对象) ====================

        // 假设 Kafka 中的消息格式是:{"id":1,"name":"张三","age":20}

        DataStream<User> userDataStream = kafkaDataStream

                .map(new MapFunction<String, User>() {

                    @Override

                    public User map(String jsonString) throws Exception {

                        // 这里简化了解析逻辑,生产环境建议用 Jackson/Gson 解析 JSON

                        // 步骤:去掉 JSON {} ",按逗号分割字段

                        String[] fields = jsonString.replaceAll("[{}\"]", "").split(",");

                        // 提取每个字段的值

                        int id = Integer.parseInt(fields[0].split(":")[1]);

                        String name = fields[1].split(":")[1];

                        int age = Integer.parseInt(fields[2].split(":")[1]);

                        // 返回 User 对象

                        return new User(id, name, age);

                    }

                })

                .name("ParseKafkaJsonToUser"); // 给算子起名字,方便在 Flink UI 查看

        // ==================== 步骤4:将 User 数据写入 MySQL ====================

        // 1. MySQL 连接配置

        String mysqlUrl = "jdbc:mysql://localhost:3306/flink_db?useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true";

        String mysqlUsername = "root";       // MySQL 用户名

        String mysqlPassword = "123456";     // MySQL 密码(替换成你的密码)

        // 2. 创建 JDBC Sink,写入 MySQL

        userDataStream.addSink(

                JdbcSink.sink(

                        // SQL 语句:插入或更新(避免主键冲突)

                        "INSERT INTO user_info (id, name, age) " +

                        "VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE name=?, age=?",

                        // SQL ? 赋值(顺序要和 SQL 中的 ? 对应)

                        (statement, user) -> {

                            statement.setInt(1, user.getId());       // 第一个 ?id

                            statement.setString(2, user.getName());  // 第二个 ?name

                            statement.setInt(3, user.getAge());       // 第三个 ?age

                            statement.setString(4, user.getName());  // 第四个 ?:更新的 name

                            statement.setInt(5, user.getAge());       // 第五个 ?:更新的 age

                        },

                        // MySQL 连接选项

                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

                                .withUrl(mysqlUrl)                  // MySQL 地址

                                .withUsername(mysqlUsername)        // 用户名

                                .withPassword(mysqlPassword)        // 密码

                                .withDriverName("com.mysql.cj.jdbc.Driver") // 驱动类(8.0 用这个)

                                .build()

                )

        ).name("WriteUserToMySQL"); // 算子命名,方便监控

        // ==================== 步骤5:启动 Flink 任务 ====================

        env.execute("Flink-Kafka-To-MySQL-Task"); // 任务名称,在 Flink UI 中显示

    }

}

四、前置准备:初始化 Kafka MySQL

代码写好后,不能直接跑,需要先准备好 Kafka MySQL 环境。

1. MySQL 准备(创建数据库和表)

登录 MySQL,执行以下 SQL,创建数据库 flink_db 和表 user_info

 

— 1. 创建数据库(如果不存在)

CREATE DATABASE IF NOT EXISTS flink_db;

— 2. 使用数据库

USE flink_db;

— 3. 创建用户表(id 为主键,避免重复插入)

CREATE TABLE IF NOT EXISTS user_info (

    id INT PRIMARY KEY,

    name VARCHAR(50) NOT NULL,

    age INT NOT NULL

);

2. Kafka 准备(启动集群 + 创建主题)

第一步:启动 Kafka 集群

如果是本地测试,先启动 ZookeeperKafka 依赖 Zookeeper 存储元数据),再启动 Kafka

 

# 启动 ZookeeperKafka 自带,在 bin 目录下)

./zookeeper-server-start.sh ../config/zookeeper.properties

# 启动 Kafka(新窗口执行)

./kafka-server-start.sh ../config/server.properties

第二步:创建 Kafka 主题

执行以下命令,创建名为 user-topic 的主题(和代码中的主题名一致):

 

# 创建主题:1 个分区,1 个副本(本地测试足够)

./kafka-topics.sh –create –topic user-topic –bootstrap-server localhost:9092 –partitions 1 –replication-factor 1

第三步:发送测试数据

为了验证链路,我们可以用 Kafka 自带的生产者工具发送一条测试数据:

 

# 启动生产者(输入数据后按回车发送)

./kafka-console-producer.sh –topic user-topic –bootstrap-server localhost:9092

# 发送以下 JSON 数据(复制粘贴后回车)

{"id":1,"name":"张三","age":20}

五、运行与验证

1. 运行 Flink 程序

IDEA 中直接运行 KafkaToMySQL 类的 main 方法(本地测试用)。如果是集群环境,需要先打包成 JAR,再用 flink run 命令提交。

2. 验证数据是否写入 MySQL

登录 MySQL,执行查询语句,查看是否有数据:

 

USE flink_db;

SELECT * FROM user_info;

如果能看到 id=1, name=张三, age=20 的数据,说明整个链路通了!

六、生产环境优化建议

上面的代码是基础版本,生产环境需要做这些优化:

1.  JSON 解析优化

不要用 replaceAll 这种暴力解析方式,建议用 Jackson Gson 框架,示例(Jackson):

 

// 1. 引入 Jackson 依赖(pom.xml

<dependency>

    <groupId>com.fasterxml.jackson.core</groupId>

    <artifactId>jackson-databind</artifactId>

    <version>2.15.2</version>

</dependency>

// 2. 解析代码

ObjectMapper objectMapper = new ObjectMapper();

User user = objectMapper.readValue(jsonString, User.class);

2. 错误处理

添加侧输出流处理解析失败的数据(比如格式错误的 JSON),避免整个任务失败:

 

// 定义侧输出流标签

OutputTag<String> errorTag = new OutputTag<String>("parse-error"){};

// process 算子替换 map,处理错误

SingleOutputStreamOperator<User> userStream = kafkaDataStream.process(new ProcessFunction<String, User>() {

    @Override

    public void processElement(String value, Context ctx, Collector<User> out) throws Exception {

        try {

            // 正常解析逻辑

            User user = objectMapper.readValue(value, User.class);

            out.collect(user);

        } catch (Exception e) {

            // 解析失败,发送到侧输出流

            ctx.output(errorTag, value);

        }

    }

});

// 打印错误数据(或写入其他存储)

DataStream<String> errorStream = userStream.getSideOutput(errorTag);

errorStream.print("Parse Error: ");

3. 性能优化

• 批量写入:在 MySQL 连接 URL 中添加 rewriteBatchedStatements=true(已在代码中加了),开启批量写入

• 调整并行度:根据 Kafka 分区数和 MySQL 性能,调整 Flink 算子的并行度(比如 setParallelism(2)

• 连接池配置:通过 JdbcExecutionOptions 配置批量大小和重试次数:

 

JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder()

        .withBatchSize(1000)    // 1000 条数据批量写入

        .withMaxRetries(3)      // 失败重试 3

        .build();

// JdbcSink 中添加 executionOptions

JdbcSink.sink(sql, statementSetter, executionOptions, connectionOptions)

4. 数据一致性

开启 Flink Checkpoint + MySQL 的事务,实现Exactly-Once(精确一次)语义:

• Checkpoint 间隔:根据业务容忍度调整(比如 10 秒)

• MySQL 事务:Flink JDBC Sink 会自动在 Checkpoint 时提交事务

七、常见问题排查

1. Kafka 连接失败:检查 bootstrap.servers 地址是否正确,Kafka 集群是否启动,端口是否开放

2. MySQL 连接失败:检查 mysqlUrl 中的 IP、端口、数据库名是否正确,用户名密码是否对,MySQL 是否允许远程连接

3. 数据写入后查不到:检查 SQL 语句是否正确,主键是否冲突(看 ON DUPLICATE KEY UPDATE 是否生效)

4. 任务启动报错 “ClassNotFoundException”:检查依赖是否缺失,尤其是 Kafka MySQL 驱动

总结

今天我们完整实现了 “Kafka → Flink → MySQL” 的实时数据链路,从依赖配置到代码实现,再到环境准备和优化,覆盖了新手入门的所有关键点。

这套架构的灵活性很高,比如要处理复杂业务逻辑,可以在 Flink 中添加 keyBywindow 等算子;要更换存储,只需替换 Sink(比如写 HBaseClickHouse)。

如果在实践中遇到问题,欢迎在评论区留言,我们一起讨论解决!