Flink SQL读写Redis

0、前言

最近有个需求,需要使用Flink SQL读写Redis。由于官方并没有提供Redis的connector,在网上找了很久,开源的几个connector又没法满足要求,所以就自己动手实现了一个。目前已经适配了Flink 1.12到Flink 1.15的各个版本。
简单介绍一下功能吧:

  • 将Redis作为流表时支持BLPOP、BRPOP、LPOP、RPOP、SPOP等命令;使用Lua脚本封装批量弹出,以提高消费性能
  • 将Redis作为维表时支持GET、HGET等命令;支持lookup缓存
  • 将Redis作为sink表时支持LPUSH、RPUSH、SADD、SET、HSET等命令;支持为key指定TTL(过期时间)
  • 支持Flink常见的序列化/反序列化格式,如JSON、CSV等,具体参见Flink官网:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/overview/
1、Redis作为流表

1.1、数据准备
@Before
public void init() {
    // Enable test mode: in this mode the job stops once the source table data has been
    // fully consumed, which is convenient for tests. The flag defaults to false.
    RedisOptions.IS_TEST = true;
    RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33", 6379, "123456", 0);

    // Build 1000 student records as JSON strings:
    // school = "学校" + (i % 3 + 1), class_id = i % 10 + 1.
    List<String> students = new ArrayList<>(1000);
    for (int i = 0; i < 1000; i++) {
        students.add("{\n"
                + "\"number\": " + i + ",\n"
                + "\"name\": \"学生" + i + "\",\n"
                + "\"school\": \"学校" + ((i % 3) + 1) + "\",\n"
                + "\"class_id\": " + ((i % 10) + 1) + "\n"
                + "}");
    }
    // Seed student data: append all records to the "students" list with a single RPUSH.
    redisOperator.rpush("students", students);

    // Seed class data: string keys "1".."10" mapped to the class name.
    for (int classId = 1; classId <= 10; classId++) {
        redisOperator.set(String.valueOf(classId), "银河" + classId + "班");
    }

    // Seed per-school class data: one hash per school key,
    // field = class id "1".."10", value = class name.
    for (int school = 1; school <= 3; school++) {
        for (int classId = 1; classId <= 10; classId++) {
            redisOperator.hset("学校" + school, String.valueOf(classId), "银河" + classId + "班");
        }
    }
}

1.2、使用BLPOP、BRPOP、LPOP、RPOP、SPOP消费指定的key的list或者set的数据

@Test
public void testBlpopSQL() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // The Blink planner is the default planner since Flink 1.11 and useBlinkPlanner() is
    // deprecated, so only the streaming mode is selected explicitly.
    EnvironmentSettings environmentSettings =
            EnvironmentSettings.newInstance().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

    // Source table: consumes the JSON records stored in the Redis list "students" via BLPOP.
    // 'batch-fetch-rows' presumably sets the batch-pop size — confirm against the connector source.
    String source = "CREATE TABLE students\n"
            + "(\n"
            + "  number   BIGINT,\n"
            + "  name     STRING,\n"
            + "  school   STRING,\n"
            + "  class_id BIGINT\n"
            + ")\n"
            + "WITH (\n"
            + "  'connector' = 'redis',\n"
            + "  'host' = '10.201.0.33',\n"
            + "  'port' = '6379',\n"
            + "  'redis-mode' = 'single',\n"
            + "  'password' = '123456',\n"
            + "  'database' = '0',\n"
            + "  'key' = 'students',\n"
            + "  'format' = 'json',\n"
            + "  'batch-fetch-rows' = '1000',\n"
            + "  'json.fail-on-missing-field' = 'false',\n"
            + "  'json.ignore-parse-errors' = 'true',\n"
            + "  'command' = 'BLPOP'\n"
            + ")";
    String sink = "CREATE TABLE sink_students\n"
            + "(\n"
            + "  number   BIGINT,\n"
            + "  name     STRING,\n"
            + "  school   STRING,\n"
            + "  class_id BIGINT\n"
            + ")\n"
            + "WITH (\n"
            + "  'connector' = 'print'\n"
            + ")";
    tEnv.executeSql(source);
    tEnv.executeSql(sink);

    String sql = "INSERT INTO sink_students SELECT * FROM students";
    TableResult tableResult = tEnv.executeSql(sql);
    // Block until the INSERT job finishes (in test mode it ends once the list is drained).
    tableResult.await();
}

2、Redis作为维表(不带format)

2.1、数据准备

@Before
public void init() {
    // Enable test mode: in this mode the job stops once the source table data has been
    // fully consumed, which is convenient for tests. The flag defaults to false.
    RedisOptions.IS_TEST = true;
    RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33", 6379, "123456", 0);

    // Build 1000 student records as JSON strings:
    // school = "学校" + (i % 3 + 1), class_id = i % 10 + 1.
    List<String> students = new ArrayList<>(1000);
    for (int i = 0; i < 1000; i++) {
        students.add("{\n"
                + "\"number\": " + i + ",\n"
                + "\"name\": \"学生" + i + "\",\n"
                + "\"school\": \"学校" + ((i % 3) + 1) + "\",\n"
                + "\"class_id\": " + ((i % 10) + 1) + "\n"
                + "}");
    }
    // Seed student data: append all records to the "students" list with a single RPUSH.
    redisOperator.rpush("students", students);

    // Seed class data: string keys "1".."10" mapped to the class name.
    for (int classId = 1; classId <= 10; classId++) {
        redisOperator.set(String.valueOf(classId), "银河" + classId + "班");
    }

    // Seed per-school class data: one hash per school key,
    // field = class id "1".."10", value = class name.
    for (int school = 1; school <= 3; school++) {
        for (int classId = 1; classId <= 10; classId++) {
            redisOperator.hset("学校" + school, String.valueOf(classId), "银河" + classId + "班");
        }
    }
}

2.2、使用GET作为维表查询命令

@Test
public void testGetSQL() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // The Blink planner is the default planner since Flink 1.11 and useBlinkPlanner() is
    // deprecated, so only the streaming mode is selected explicitly.
    EnvironmentSettings environmentSettings =
            EnvironmentSettings.newInstance().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

    // Source table: same BLPOP source as above, plus a processing-time attribute
    // that the lookup join below needs.
    String source = "CREATE TABLE students\n"
            + "(\n"
            + "  number   BIGINT,\n"
            + "  name     STRING,\n"
            + "  school   STRING,\n"
            + "  class_id BIGINT,\n"
            + "  proctime AS PROCTIME()\n"
            + ")\n"
            + "WITH (\n"
            + "  'connector' = 'redis',\n"
            + "  'host' = '10.201.0.33',\n"
            + "  'port' = '6379',\n"
            + "  'redis-mode' = 'single',\n"
            + "  'password' = '123456',\n"
            + "  'database' = '0',\n"
            + "  'key' = 'students',\n"
            + "  'format' = 'json',\n"
            + "  'batch-fetch-rows' = '1000',\n"
            + "  'json.fail-on-missing-field' = 'false',\n"
            + "  'json.ignore-parse-errors' = 'true',\n"
            + "  'command' = 'BLPOP'\n"
            + ")";
    // Note: because GET is used and no 'format' option is set, the dimension table can only
    // have two columns; any extra columns are not recognized. See the comments in the
    // connector source for details.
    String dimension = "CREATE TABLE classes\n"
            + "(\n"
            + "  class_id   BIGINT,\n"
            + "  class_name STRING\n"
            + ")\n"
            + "WITH (\n"
            + "  'connector' = 'redis',\n"
            + "  'host' = '10.201.0.33',\n"
            + "  'port' = '6379',\n"
            + "  'redis-mode' = 'single',\n"
            + "  'password' = '123456',\n"
            + "  'lookup.cache.max-rows' = '1000',\n"
            + "  'lookup.cache.ttl' = '3600',\n"
            + "  'lookup.cache.load-all' = 'true',\n"
            + "  'database' = '0',\n"
            + "  'command' = 'GET'\n"
            + ")";
    String sink = "CREATE TABLE sink_students\n"
            + "(\n"
            + "  number     BIGINT,\n"
            + "  name       STRING,\n"
            + "  school     STRING,\n"
            + "  class_id   BIGINT,\n"
            + "  class_name STRING\n"
            + ")\n"
            + "WITH (\n"
            + "  'connector' = 'print'\n"
            + ")";
    tEnv.executeSql(source);
    tEnv.executeSql(dimension);
    tEnv.executeSql(sink);

    // The join column must be the key used by the GET command (class_id here).
    // Each fragment ends with a newline so the concatenated keywords stay separated.
    String sql = "INSERT INTO sink_students\n"
            + "SELECT s.number, s.name, s.school, s.class_id, d.class_name\n"
            + "FROM students s\n"
            + "LEFT JOIN classes FOR SYSTEM_TIME AS OF s.proctime AS d\n"
            + "ON d.class_id = s.class_id";
    TableResult tableResult = tEnv.executeSql(sql);
    // Block until the INSERT job finishes (in test mode it ends once the list is drained).
    tableResult.await();
}

推荐阅读