实战!用 Flink 打通 Kafka 到 MySQL 的数据链路,附完整代码
在实时数据处理场景中,“Kafka 接收数据 + Flink 处理数据 + MySQL 存储结果” 是非常经典的架构。比如用户行为日志实时统计、订单数据实时同步等场景,都离不开这套流程。
今天就带大家从零搭建这套数据链路,包含完整的依赖配置、代码实现和环境准备,新手也能跟着做!
一、先理清核心流程
在写代码前,我们先明确整个数据流转的逻辑:
1. 数据来源:Kafka 主题存储原始数据(比如用户信息 JSON 串)
2. 数据处理:Flink 作为计算引擎,从 Kafka 消费数据,解析、转换格式
3. 数据落地:Flink 将处理后的数据写入 MySQL 表,供后续查询分析
整个流程的核心是 Flink 对上下游的连接能力 —— 通过 Kafka Connector 读数据,通过 JDBC Connector 写数据,中间再做简单的数据转换。
二、环境与依赖准备
工欲善其事,必先利其器。首先要搞定依赖配置和基础环境。
1. 技术版本选型
为了避免版本兼容性问题,这里给出经过验证的版本组合:
• Flink:1.17.0(稳定版,社区支持完善)
• Kafka:2.8.x 或 3.x(与 Flink 1.17 兼容性良好)
• MySQL:8.0.x(注意驱动类与 5.x 不同)
• JDK:1.8(Flink 1.17 仍推荐 JDK 8)
2. Maven 依赖配置(pom.xml)
在 IDEA 中新建 Maven 项目,添加以下核心依赖(复制即用):
<dependencies> <!– 1. Flink 核心依赖 –> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.17.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.17.0</version> </dependency> <!– 2. Flink Kafka 连接器(读 Kafka 用) –> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.17.0</version> </dependency> <!– 3. MySQL 驱动 + Flink JDBC 连接器(写 MySQL 用) –> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.33</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc</artifactId> <version>3.1.0-1.17</version> </dependency> </dependencies> |
注意:如果是生产环境,建议添加 flink-clients 依赖(用于提交任务),并配置打包插件(避免依赖缺失)。
三、完整代码实现(分步骤讲解)
接下来是核心代码,我会按 “环境初始化 → 读 Kafka → 数据处理 → 写 MySQL” 的顺序拆解,每个步骤都加了详细注释。
1. 定义数据模型类(User)
首先定义一个 User 类,对应 Kafka 的原始数据格式和 MySQL 表结构(字段要一一对应):
// 数据模型:与 Kafka 消息、MySQL 表字段对齐 class User { private int id; // 主键(MySQL 表主键) private String name; // 用户名 private int age; // 年龄 // 必须有默认构造函数(Flink 序列化需要) public User() {} // 带参构造函数(用于转换数据) public User(int id, String name, int age) { this.id = id; this.name = name; this.age = age; } // Getter 方法(JDBC 写入时需要获取字段值) public int getId() { return id; } public String getName() { return name; } public int getAge() { return age; } } |
2. 主程序(KafkaToMySQL)
主程序是整个链路的核心,包含 4 个关键步骤:
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import java.util.Properties; public class KafkaToMySQL { public static void main(String[] args) throws Exception { // ==================== 步骤1:初始化 Flink 执行环境 ==================== // 创建流处理环境(本地测试用这个,集群环境用 getExecutionEnvironment()) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 启用 Checkpoint(可选但推荐,保证数据不丢不重) // 每 5 秒做一次 Checkpoint,实际可根据业务调整 env.enableCheckpointing(5000); // ==================== 步骤2:配置 Kafka 消费者,读取数据 ==================== // 1. Kafka 连接参数 Properties kafkaProps = new Properties(); kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // Kafka 集群地址(多地址用逗号分隔) kafkaProps.setProperty("group.id", "flink-kafka-group-01"); // 消费者组 ID(同一组共享 offset) kafkaProps.setProperty("auto.offset.reset", "earliest"); // 无 offset 时从最早位置消费 // 2. 定义 Kafka 主题(提前在 Kafka 中创建) String kafkaTopic = "user-topic"; // 3. 创建 Kafka 消费者(读取 JSON 字符串格式的消息) FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>( kafkaTopic, // 要消费的主题 new SimpleStringSchema(), // 反序列化器(这里用字符串反序列化) kafkaProps // 连接参数 ); // 4. 从 Kafka 读取数据,得到字符串类型的数据流 DataStream<String> kafkaDataStream = env.addSource(kafkaConsumer); // ==================== 步骤3:数据处理(JSON 解析 → 转换为 User 对象) ==================== // 假设 Kafka 中的消息格式是:{"id":1,"name":"张三","age":20} DataStream<User> userDataStream = kafkaDataStream .map(new MapFunction<String, User>() { @Override public User map(String jsonString) throws Exception { // 这里简化了解析逻辑,生产环境建议用 Jackson/Gson 解析 JSON // 步骤:去掉 JSON 的 {} 和 ",按逗号分割字段 String[] fields = jsonString.replaceAll("[{}\"]", "").split(","); // 提取每个字段的值 int id = Integer.parseInt(fields[0].split(":")[1]); String name = fields[1].split(":")[1]; int age = Integer.parseInt(fields[2].split(":")[1]); // 返回 User 对象 return new User(id, name, age); } }) .name("ParseKafkaJsonToUser"); // 给算子起名字,方便在 Flink UI 查看 // ==================== 步骤4:将 User 数据写入 MySQL ==================== // 1. MySQL 连接配置 String mysqlUrl = "jdbc:mysql://localhost:3306/flink_db?useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"; String mysqlUsername = "root"; // MySQL 用户名 String mysqlPassword = "123456"; // MySQL 密码(替换成你的密码) // 2. 创建 JDBC Sink,写入 MySQL userDataStream.addSink( JdbcSink.sink( // SQL 语句:插入或更新(避免主键冲突) "INSERT INTO user_info (id, name, age) " + "VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE name=?, age=?", // 给 SQL 的 ? 赋值(顺序要和 SQL 中的 ? 对应) (statement, user) -> { statement.setInt(1, user.getId()); // 第一个 ?:id statement.setString(2, user.getName()); // 第二个 ?:name statement.setInt(3, user.getAge()); // 第三个 ?:age statement.setString(4, user.getName()); // 第四个 ?:更新的 name statement.setInt(5, user.getAge()); // 第五个 ?:更新的 age }, // MySQL 连接选项 new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl(mysqlUrl) // MySQL 地址 .withUsername(mysqlUsername) // 用户名 .withPassword(mysqlPassword) // 密码 .withDriverName("com.mysql.cj.jdbc.Driver") // 驱动类(8.0 用这个) .build() ) ).name("WriteUserToMySQL"); // 算子命名,方便监控 // ==================== 步骤5:启动 Flink 任务 ==================== env.execute("Flink-Kafka-To-MySQL-Task"); // 任务名称,在 Flink UI 中显示 } } |
四、前置准备:初始化 Kafka 和 MySQL
代码写好后,不能直接跑,需要先准备好 Kafka 和 MySQL 环境。
1. MySQL 准备(创建数据库和表)
登录 MySQL,执行以下 SQL,创建数据库 flink_db 和表 user_info:
— 1. 创建数据库(如果不存在) CREATE DATABASE IF NOT EXISTS flink_db; — 2. 使用数据库 USE flink_db; — 3. 创建用户表(id 为主键,避免重复插入) CREATE TABLE IF NOT EXISTS user_info ( id INT PRIMARY KEY, name VARCHAR(50) NOT NULL, age INT NOT NULL ); |
2. Kafka 准备(启动集群 + 创建主题)
第一步:启动 Kafka 集群
如果是本地测试,先启动 Zookeeper(Kafka 依赖 Zookeeper 存储元数据),再启动 Kafka:
# 启动 Zookeeper(Kafka 自带,在 bin 目录下) ./zookeeper-server-start.sh ../config/zookeeper.properties # 启动 Kafka(新窗口执行) ./kafka-server-start.sh ../config/server.properties |
第二步:创建 Kafka 主题
执行以下命令,创建名为 user-topic 的主题(和代码中的主题名一致):
# 创建主题:1 个分区,1 个副本(本地测试足够) ./kafka-topics.sh –create –topic user-topic –bootstrap-server localhost:9092 –partitions 1 –replication-factor 1 |
第三步:发送测试数据
为了验证链路,我们可以用 Kafka 自带的生产者工具发送一条测试数据:
# 启动生产者(输入数据后按回车发送) ./kafka-console-producer.sh –topic user-topic –bootstrap-server localhost:9092 # 发送以下 JSON 数据(复制粘贴后回车) {"id":1,"name":"张三","age":20} |
五、运行与验证
1. 运行 Flink 程序
在 IDEA 中直接运行 KafkaToMySQL 类的 main 方法(本地测试用)。如果是集群环境,需要先打包成 JAR,再用 flink run 命令提交。
2. 验证数据是否写入 MySQL
登录 MySQL,执行查询语句,查看是否有数据:
USE flink_db; SELECT * FROM user_info; |
如果能看到 id=1, name=张三, age=20 的数据,说明整个链路通了!
六、生产环境优化建议
上面的代码是基础版本,生产环境需要做这些优化:
1. JSON 解析优化
不要用 replaceAll 这种暴力解析方式,建议用 Jackson 或 Gson 框架,示例(Jackson):
// 1. 引入 Jackson 依赖(pom.xml) <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.15.2</version> </dependency> // 2. 解析代码 ObjectMapper objectMapper = new ObjectMapper(); User user = objectMapper.readValue(jsonString, User.class); |
2. 错误处理
添加侧输出流处理解析失败的数据(比如格式错误的 JSON),避免整个任务失败:
// 定义侧输出流标签 OutputTag<String> errorTag = new OutputTag<String>("parse-error"){}; // 用 process 算子替换 map,处理错误 SingleOutputStreamOperator<User> userStream = kafkaDataStream.process(new ProcessFunction<String, User>() { @Override public void processElement(String value, Context ctx, Collector<User> out) throws Exception { try { // 正常解析逻辑 User user = objectMapper.readValue(value, User.class); out.collect(user); } catch (Exception e) { // 解析失败,发送到侧输出流 ctx.output(errorTag, value); } } }); // 打印错误数据(或写入其他存储) DataStream<String> errorStream = userStream.getSideOutput(errorTag); errorStream.print("Parse Error: "); |
3. 性能优化
• 批量写入:在 MySQL 连接 URL 中添加 rewriteBatchedStatements=true(已在代码中加了),开启批量写入
• 调整并行度:根据 Kafka 分区数和 MySQL 性能,调整 Flink 算子的并行度(比如 setParallelism(2))
• 连接池配置:通过 JdbcExecutionOptions 配置批量大小和重试次数:
JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder() .withBatchSize(1000) // 每 1000 条数据批量写入 .withMaxRetries(3) // 失败重试 3 次 .build(); // 在 JdbcSink 中添加 executionOptions JdbcSink.sink(sql, statementSetter, executionOptions, connectionOptions) |
4. 数据一致性
开启 Flink 的 Checkpoint + MySQL 的事务,实现Exactly-Once(精确一次)语义:
• Checkpoint 间隔:根据业务容忍度调整(比如 10 秒)
• MySQL 事务:Flink JDBC Sink 会自动在 Checkpoint 时提交事务
七、常见问题排查
1. Kafka 连接失败:检查 bootstrap.servers 地址是否正确,Kafka 集群是否启动,端口是否开放
2. MySQL 连接失败:检查 mysqlUrl 中的 IP、端口、数据库名是否正确,用户名密码是否对,MySQL 是否允许远程连接
3. 数据写入后查不到:检查 SQL 语句是否正确,主键是否冲突(看 ON DUPLICATE KEY UPDATE 是否生效)
4. 任务启动报错 “ClassNotFoundException”:检查依赖是否缺失,尤其是 Kafka 和 MySQL 驱动
总结
今天我们完整实现了 “Kafka → Flink → MySQL” 的实时数据链路,从依赖配置到代码实现,再到环境准备和优化,覆盖了新手入门的所有关键点。
这套架构的灵活性很高,比如要处理复杂业务逻辑,可以在 Flink 中添加 keyBy、window 等算子;要更换存储,只需替换 Sink(比如写 HBase、ClickHouse)。
如果在实践中遇到问题,欢迎在评论区留言,我们一起讨论解决!