1. Background
In some projects, especially inventory/purchase-and-sale SaaS software, caching is often not considered at first in the rush to get the product out the door. Yet this kind of software carries complex business logic, so retrofitting a Redis cache into the existing code would mean a very large change surface and a great deal of testing. Sometimes the situation is even worse: a project may be written on JDK 1.6, and wiring Redis caching into such a framework becomes very difficult.
This is where an idea comes to mind: just like MySQL master-slave replication, can we listen to the MySQL binlog and update the cache from it? Flink CDC is exactly the tool for the job.
2. MySQL environment setup
Note that flink-cdc currently supports MySQL only up to 8.0; 8.4 is not supported at all.
Since my locally installed MySQL is 8.4, for convenience we will run MySQL 8.0 in Docker.
2.1 docker-compose.yml
services:
  master:
    image: mysql:8.0.41
    container_name: mysql-8
    restart: always
    #mem_limit: 512M
    environment:
      MYSQL_ROOT_PASSWORD: study@2025
      TZ: Asia/Shanghai
    ports:
      - "3307:3306"
    volumes:
      - ./cfg/my.cnf:/etc/my.cnf
      - ./data:/var/lib/mysql
      - ./initdb:/docker-entrypoint-initdb.d
      - ./dump:/var/dump
      - ./log:/var/log
    networks:
      - mysql-cluster
networks:
  mysql-cluster:
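Flink CDC needs row-based binlog. MySQL 8.0 enables it by default, so no extra my.cnf work is strictly required, but it is worth verifying once the container is up (the comments show the expected defaults):
SHOW VARIABLES LIKE 'log_bin';          -- expect ON
SHOW VARIABLES LIKE 'binlog_format';    -- expect ROW
SHOW VARIABLES LIKE 'binlog_row_image'; -- expect FULL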
2.2 Initialization SQL
-- Create the replication user
CREATE ROLE role_app;
GRANT SELECT,UPDATE,INSERT,DELETE ON *.* TO role_app;
GRANT REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO role_app;
CREATE USER 'app'@'%' IDENTIFIED WITH caching_sha2_password BY 'study@2025' DEFAULT ROLE role_app COMMENT 'app user';
FLUSH PRIVILEGES;
-- Create the test database
CREATE SCHEMA `shop-center`;
FLUSH TABLES WITH READ LOCK;
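The demo in section 6 captures changes on shop-center.item. The table's DDL is not shown in the original; a hypothetical definition matching the GenItemEntity fields from section 3.2.2 would be:
CREATE TABLE `shop-center`.`item` (
  `id` BIGINT PRIMARY KEY AUTO_INCREMENT,
  `name` VARCHAR(255),
  `price` BIGINT,
  `brand` VARCHAR(255),
  `specification` VARCHAR(255),
  `version` INT
);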
2.3 Caveats
First comment out the volume line - ./cfg/my.cnf:/etc/my.cnf and start the service (docker compose up -d).
Then copy the config file out of the container. Note that this must be docker cp, not docker exec cp: docker exec would run cp inside the container, so the file would never land on the host.
docker cp <container-id>:/etc/my.cnf ./cfg/my.cnf
Then uncomment the volume line and restart.
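For reference, these are the binlog-related settings worth confirming in the copied my.cnf. This is a sketch of a typical CDC-friendly configuration, not the exact file from this setup:
[mysqld]
server-id = 1                         # any non-zero id, unique among replicas
log_bin = mysql-bin                   # on by default in MySQL 8.0
binlog_format = ROW                   # required by Flink CDC (the 8.0 default)
binlog_row_image = FULL               # keep full before/after row images
binlog_expire_logs_seconds = 604800   # retain one week of binlog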
3. Project setup and pom dependencies
3.1 Main module pom dependencies
A Flink program does not need the Spring framework; a plain main method is enough to run it.
But we still want to use the interfaces we are familiar with to operate Redis and Elasticsearch.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>3.3.0</version>
</dependency>
3.2 The common-data module
Entity classes shared across modules are best extracted into a common module.
I also wire up and wrap the Redis and Elasticsearch operations in this module.
3.2.1 pom dependencies
<dependencies>
    <dependency>
        <groupId>org.yaml</groupId>
        <artifactId>snakeyaml</artifactId>
        <version>2.3</version>
    </dependency>
    <dependency>
        <groupId>co.elastic.clients</groupId>
        <artifactId>elasticsearch-java</artifactId>
        <version>8.17.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-x-content</artifactId>
        <version>8.17.0</version>
    </dependency>
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-core</artifactId>
        <version>5.8.32</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-redis</artifactId>
        <version>3.4.2</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2-extension-spring6</artifactId>
        <version>2.0.54</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>2.12.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.54</version>
    </dependency>
    <dependency>
        <groupId>io.lettuce</groupId>
        <artifactId>lettuce-core</artifactId>
        <version>6.4.2.RELEASE</version>
    </dependency>
    <!-- Flink Redis Connector (the old Bahir connector, kept for reference) -->
    <!--
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.12</artifactId>
        <version>1.1.0</version>
    </dependency>
    -->
</dependencies>
3.2.2 Some basic entity classes
@Data
public class GenItemEntity {
    Long id;
    String name;
    Long price;
    String brand;
    String specification;
    Integer version;
}
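The main function in section 6 parses Debezium's JSON change events into a BinlogInfo<T>, a class the original post never shows. A minimal sketch, assuming it simply mirrors the Debezium envelope fields (op/before/after/source) that the later code reads:
@Data
public class BinlogInfo<T> {
    // Debezium op codes: "c" = create, "u" = update, "d" = delete, "r" = snapshot read
    String op;
    T before;          // row image before the change (null for inserts)
    T after;           // row image after the change (null for deletes)
    SourceInfo source; // event metadata, e.g. which database/table it came from

    @Data
    public static class SourceInfo {
        String db;
        String table;
    }
}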
4. Wrapping the Redis and Elasticsearch operations
4.1 Wrapping the Redis operations
With spring-data-redis added to the pom, we can use the familiar RedisTemplate to operate Redis.
public class RedisConfig {

    public RedisConfig() {
        init();
    }

    protected FastJsonConfig redisFastJson() {
        FastJsonConfig config = new FastJsonConfig();
        config.setWriterFeatures(
                JSONWriter.Feature.WriteNullListAsEmpty,
                // write class names into the JSON so values deserialize back to their original types
                JSONWriter.Feature.WriteClassName,
                // serialize null Booleans as false
                JSONWriter.Feature.WriteNullBooleanAsFalse,
                JSONWriter.Feature.WriteEnumsUsingName);
        config.setReaderFeatures(
                JSONReader.Feature.SupportClassForName,
                // support autoType so the class names written above can be resolved
                JSONReader.Feature.SupportAutoType);
        return config;
    }

    protected FastJsonRedisSerializer fastJsonRedisSerializer(FastJsonConfig pFastJsonConfig) {
        FastJsonRedisSerializer fastJsonRedisSerializer = new FastJsonRedisSerializer(Object.class);
        fastJsonRedisSerializer.setFastJsonConfig(pFastJsonConfig);
        return fastJsonRedisSerializer;
    }

    protected RedisConnectionFactory redisConnectionFactory() {
        // these values should really come from a config file; hard-coded here for brevity
        RedisStandaloneConfiguration redisConfiguration = new RedisStandaloneConfiguration("192.168.0.64", 6379);
        redisConfiguration.setPassword("study@2025");
        GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(2);                      // max connections
        poolConfig.setMaxIdle(2);                       // max idle connections
        poolConfig.setMinIdle(2);                       // min idle connections
        poolConfig.setMaxWait(Duration.ofMillis(3000)); // max wait for a connection
        ClientResources clientResources = DefaultClientResources.create();
        // the original built a second configuration without clientResources and commandTimeout
        // and accidentally used that one; build the fully-configured one only and use it
        LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
                .clientResources(clientResources)
                .commandTimeout(Duration.ofSeconds(5))
                .poolConfig(poolConfig)
                .build();
        LettuceConnectionFactory redisConnectionFactory = new LettuceConnectionFactory(redisConfiguration, clientConfig);
        redisConnectionFactory.afterPropertiesSet(); // initialize the connection factory
        return redisConnectionFactory;
    }

    protected RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory, FastJsonRedisSerializer pFastJsonRedisSerializer) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(factory);
        redisTemplate.setEnableTransactionSupport(true);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(pFastJsonRedisSerializer);
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(pFastJsonRedisSerializer);
        return redisTemplate;
    }

    protected void init() {
        mFastJsonConfig = redisFastJson();
        mFastJsonRedisSerializer = fastJsonRedisSerializer(mFastJsonConfig);
        mRedisConnectionFactory = redisConnectionFactory();
        mRedisTemplate = redisTemplate(mRedisConnectionFactory, mFastJsonRedisSerializer);
        mRedisTemplate.afterPropertiesSet();
    }

    private FastJsonConfig mFastJsonConfig;
    private FastJsonRedisSerializer mFastJsonRedisSerializer;
    private RedisConnectionFactory mRedisConnectionFactory;
    private RedisTemplate<String, Object> mRedisTemplate;

    public static RedisTemplate<String, Object> redisTemplate() {
        return Holder.INSTANCE.mRedisTemplate;
    }

    public static <T> String serialize(T entity) {
        return JSON.toJSONString(entity, Holder.INSTANCE.mFastJsonConfig.getWriterFeatures());
    }

    // initialization-on-demand holder: lazy, thread-safe singleton without Spring
    private static class Holder {
        private static final RedisConfig INSTANCE = new RedisConfig();
    }
}
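With the holder-based singleton above, any Flink operator can obtain the shared template without Spring wiring. A minimal usage sketch (the key name and TTL are illustrative):
// fetch the lazily-initialized singleton and write a value with a 300-second TTL
RedisTemplate<String, Object> template = RedisConfig.redisTemplate();
template.opsForValue().set("item:1", new GenItemEntity(), 300, TimeUnit.SECONDS);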
4.2 Wrapping the Elasticsearch operations
Since the Elasticsearch client needs an API key and HTTPS, it is best to keep those settings in a yml file and to place http_ca.crt into this module's resources directory.
Under IDEA, resources in a sub-module are sometimes not found; when the main module imports the sub-module, configuring the dependency like this fixes it:
<dependency>
    <groupId>indi.zhifa.study2025</groupId>
    <artifactId>common-data</artifactId>
    <version>${project.version}</version>
    <scope>compile</scope>
</dependency>
Note: the key part is <scope>compile</scope>.
public class EsClientConfig {
    @Setter @Getter
    private String host;
    @Setter @Getter
    private Integer port;
    @Setter @Getter
    private String apiKey;
}
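SnakeYAML maps the top-level keys of el-config.yml directly onto this bean. A minimal sketch of the file (host and port are placeholders for your environment; never commit a real API key):
host: 192.168.0.64
port: 9200
apiKey: <your-api-key>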
public class ElasticSearchClientProvider {

    private EsClientConfig esClientConfig;
    private RestClientBuilder builder;

    public ElasticSearchClientProvider() {
        try {
            init();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void init() throws IOException {
        Yaml yaml = new Yaml();
        try (InputStream inputStream = FileUtil.class.getClassLoader().getResourceAsStream("el-config.yml")) {
            if (inputStream == null) {
                throw new IllegalArgumentException("File not found: el-config.yml");
            }
            esClientConfig = yaml.loadAs(inputStream, EsClientConfig.class);
        } catch (Exception e) {
            throw new RuntimeException("Failed to load YAML file", e);
        }
        SSLContext sslContext;
        try (InputStream inputStream = FileUtil.class.getClassLoader().getResourceAsStream("http_ca.crt")) {
            sslContext = TransportUtils.sslContextFromHttpCaCrt(inputStream);
        } catch (Exception e) {
            throw new RuntimeException("Failed to load http_ca.crt", e);
        }
        builder = RestClient.builder(
                        new HttpHost(esClientConfig.getHost(), esClientConfig.getPort(), "https") // replace with your Elasticsearch address
                ).setDefaultHeaders(new Header[]{
                        new BasicHeader("Authorization", "ApiKey " + esClientConfig.getApiKey())
                })
                .setFailureListener(new RestClient.FailureListener() {
                    @Override
                    public void onFailure(Node node) {
                        // hook for logging node failures
                        super.onFailure(node);
                    }
                }).setHttpClientConfigCallback(hc ->
                        hc.setSSLContext(sslContext)
                );
    }

    public ElasticsearchClient get() {
        // note: each call builds a new RestClient, so callers should reuse the returned client
        RestClient restClient = builder.build();
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());
        return new ElasticsearchClient(transport);
    }

    public static ElasticSearchClientProvider getInstance() {
        return Holder.INSTANCE;
    }

    private static class Holder {
        private static final ElasticSearchClientProvider INSTANCE = new ElasticSearchClientProvider();
    }
}
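A quick way to verify the wiring: info() round-trips to the cluster and fails fast if the TLS certificate or API key is wrong.
ElasticsearchClient es = ElasticSearchClientProvider.getInstance().get();
System.out.println(es.info().clusterName()); // prints the cluster name on success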
5. Writing custom sinks for Redis and Elasticsearch
5.1 The Redis sink
We want the data arriving at Redis to be fully processed already; the Redis sink carries no business logic and only sets or deletes cache entries.
5.1.1 RedisSinkCommand
public class RedisSinkCommand<T> {
    @Setter @Getter
    protected ERedisCommand command;
    @Setter @Getter
    protected long dua; // TTL; the sink below treats this as seconds
    @Setter @Getter
    protected String key;
    @Setter @Getter
    protected T value;

    public void initSet(String pKey, T pValue) {
        command = ERedisCommand.SET;
        dua = 300;
        key = pKey;
        value = pValue;
    }

    public void initDel(String pKey) {
        command = ERedisCommand.DEL;
        key = pKey;
    }
}

public enum ERedisCommand {
    SET,
    DEL
}
5.1.2 SpringDataRedisSink
@Slf4j
public class SpringDataRedisSink<T> implements Sink<RedisSinkCommand<T>> {

    // deprecated overload in Flink 1.20; delegate instead of returning null
    @Override
    public SinkWriter<RedisSinkCommand<T>> createWriter(InitContext context) throws IOException {
        return new LettuceRedisSinkWriter();
    }

    @Override
    public SinkWriter<RedisSinkCommand<T>> createWriter(WriterInitContext context) {
        return new LettuceRedisSinkWriter();
    }

    class LettuceRedisSinkWriter implements SinkWriter<RedisSinkCommand<T>> {
        @Override
        public void write(RedisSinkCommand<T> pCmd, Context context) throws IOException, InterruptedException {
            RedisTemplate<String, Object> redisTemplate = RedisConfig.redisTemplate();
            switch (pCmd.getCommand()) {
                case SET -> {
                    // pass the TTL unit explicitly: set(key, value, long) without a TimeUnit
                    // resolves to the SETRANGE-offset overload, which is not what we want
                    redisTemplate.opsForValue().set(pCmd.getKey(), pCmd.getValue(), pCmd.getDua(), TimeUnit.SECONDS);
                }
                case DEL -> {
                    redisTemplate.delete(pCmd.getKey());
                }
            }
        }

        @Override
        public void flush(boolean endOfInput) throws IOException, InterruptedException {
        }

        @Override
        public void close() throws Exception {
        }
    }
}
5.2 The Elasticsearch sink
The Elasticsearch sink has the same contract as the Redis one: no business logic in the sink.
5.2.1 ElCommand
@Data
public class ElCommand<T> {
    protected EElCommand command;
    protected String index;
    protected T entity;
    protected String id;
}

public enum EElCommand {
    CREATE, UPDATE, DELETE
}
5.2.2 ElSearchSink
public class ElSearchSink<T> implements Sink<ElCommand<T>> {

    // deprecated overload in Flink 1.20; delegate instead of returning null
    @Override
    public SinkWriter<ElCommand<T>> createWriter(InitContext context) throws IOException {
        return new ElSearchSinkWriter();
    }

    @Override
    public SinkWriter<ElCommand<T>> createWriter(WriterInitContext context) {
        return new ElSearchSinkWriter();
    }

    class ElSearchSinkWriter implements SinkWriter<ElCommand<T>> {
        // build the client once per writer: ElasticSearchClientProvider.get() creates a
        // new RestClient on every call, so calling it per record would leak connections
        private final ElasticsearchClient elClient = ElasticSearchClientProvider.getInstance().get();

        @Override
        public void write(ElCommand<T> pCmd, Context context) throws IOException, InterruptedException {
            String index = pCmd.getIndex();
            String id = pCmd.getId();
            T entity = pCmd.getEntity();
            switch (pCmd.getCommand()) {
                case CREATE, UPDATE -> {
                    // index() creates or overwrites the document, so it covers both ops
                    elClient.index(i -> i.index(index).id(id).document(entity));
                }
                case DELETE -> {
                    elClient.delete(d -> d.index(index).id(id));
                }
            }
        }

        @Override
        public void flush(boolean endOfInput) throws IOException, InterruptedException {
        }

        @Override
        public void close() throws Exception {
            // closing the transport also closes the underlying RestClient
            elClient._transport().close();
        }
    }
}
6. The main function
public class FlinkMain {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.0.64")
                .port(3307)
                .databaseList("shop-center") // set captured database
                .tableList("shop-center.item") // set captured table
                .username("app")
                .password("study@2025")
                .serverTimeZone("Asia/Shanghai")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .startupOptions(StartupOptions.latest())
                .includeSchemaChanges(true)
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<RedisSinkCommand<GenItemEntity>> newMysqlStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "Mysql source to redis")
                .map(str -> JSONObject.parseObject(str, new TypeReference<BinlogInfo<GenItemEntity>>() {
                }), TypeInformation.of(new TypeHint<BinlogInfo<GenItemEntity>>() {}))
                .filter(bi -> bi.getSource().getTable().equals("item") && (bi.getOp().equals("c") || bi.getOp().equals("u") || bi.getOp().equals("d")))
                .map(bi -> {
                    String op = bi.getOp();
                    // for deletes the Debezium "after" image is null; use "before" to build the key
                    GenItemEntity itemEntity = "d".equals(op) ? bi.getBefore() : bi.getAfter();
                    String key = "item:" + itemEntity.getId();
                    RedisSinkCommand<GenItemEntity> redisSinkCommand = new RedisSinkCommand<>();
                    switch (op) {
                        case "c", "u" -> redisSinkCommand.initSet(key, itemEntity);
                        default -> redisSinkCommand.initDel(key);
                    }
                    return redisSinkCommand;
                }, TypeInformation.of(new TypeHint<RedisSinkCommand<GenItemEntity>>() {}));
        newMysqlStream.sinkTo(new SpringDataRedisSink<>());

        // note: a second fromSource() spawns an independent CDC reader over the same binlog
        DataStream<ElCommand<GenItemEntity>> mySqlToElStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "Mysql source to el")
                .map(str -> JSONObject.parseObject(str, new TypeReference<BinlogInfo<GenItemEntity>>() {})
                        , TypeInformation.of(new TypeHint<BinlogInfo<GenItemEntity>>() {}))
                .filter(bi -> bi.getSource().getTable().equals("item") && (bi.getOp().equals("c") || bi.getOp().equals("u") || bi.getOp().equals("d")))
                .map(bi -> {
                    String op = bi.getOp();
                    // same caveat as above: deletes only carry a "before" image
                    GenItemEntity itemEntity = "d".equals(op) ? bi.getBefore() : bi.getAfter();
                    ElCommand<GenItemEntity> elCommand = new ElCommand<>();
                    elCommand.setId(itemEntity.getId().toString());
                    elCommand.setEntity(itemEntity);
                    elCommand.setIndex("item_npc");
                    switch (op) {
                        case "c" -> elCommand.setCommand(EElCommand.CREATE);
                        case "u" -> elCommand.setCommand(EElCommand.UPDATE);
                        case "d" -> elCommand.setCommand(EElCommand.DELETE);
                    }
                    return elCommand;
                }, TypeInformation.of(new TypeHint<ElCommand<GenItemEntity>>() {}));
        mySqlToElStream.sinkTo(new ElSearchSink<>());

        env.execute();
    }
}
7. Code
Fellow Daoists, please head over to Gitee for the full code.
8. Practical considerations
8.1 Redis
The code here is for learning only. In a real project, the Redis cache is usually populated on the read path: when a lookup misses the cache, query MySQL and write the result into Redis. When a change or delete on the table is observed, simply delete the corresponding cache entry and let the next read repopulate it. Of course, under concurrent access to the same data this can lead to the cache being rebuilt redundantly, a phenomenon known as cache breakdown (a cache stampede). Taking a Redisson lock before rebuilding the entry prevents threads from repeatedly reading MySQL and updating Redis.
public class CacheService {
    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private DataRepository dataRepository;

    public Object getData(String key) {
        // first cache check
        Object value = redisTemplate.opsForValue().get(key);
        if (value != null) {
            return value;
        }
        RLock lock = redissonClient.getLock(key + ":LOCK");
        try {
            // try to acquire the lock, with a lease time to avoid deadlock
            if (lock.tryLock(5, 30, TimeUnit.SECONDS)) {
                try {
                    // double-check the cache
                    value = redisTemplate.opsForValue().get(key);
                    if (value != null) {
                        return value;
                    }
                    // query the database
                    Object dbData = dataRepository.findById(key);
                    // update the cache with a reasonable expiry
                    redisTemplate.opsForValue().set(key, dbData, 1, TimeUnit.HOURS);
                    return dbData;
                } finally {
                    lock.unlock();
                }
            } else {
                // lock not acquired: wait briefly, then re-read the cache
                Thread.sleep(100);
                return redisTemplate.opsForValue().get(key);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to acquire lock", e);
        }
    }
}
8.2 Elasticsearch
For Elasticsearch, this style of update is actually not recommended. ES documents need a denormalized design, so the data served by an ES query can rarely be built from a single table.
For product search in an e-commerce system, we can instead update ES when a product is listed, forbid edits while the product is listed, and delete the ES document when it is delisted. To modify a product, delist it, edit it, then relist it.