flinksql读写redis( 二 )

2.3、使用HGET作为维表查询命令

/**
 * Reads student rows from Redis, enriches them through a lookup join against the
 * {@code classes} dimension table queried with the HGET command, and writes the result
 * to a print sink. The call waits on the job execution result before returning.
 *
 * @throws Exception if submitting the job or waiting for its result fails
 */
@Test
public void testHGetSQL() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings environmentSettings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

    // Every column definition needs whitespace between the column name and its type.
    String source =
            "CREATE TABLE students\n" +
            "(\n" +
            "number BIGINT ,\n" +
            "name string,\n" +
            "school string, \n" +
            "class_id BIGINT, \n" +
            "proctime as PROCTIME() \n" +
            ") \n" +
            "WITH (\n" +
            "'connector'='redis',\n" +
            "'host'='10.201.0.33', \n" +
            "'port'='6379',\n" +
            "'redis-mode'='single', \n" +
            "'password'='123456',\n" +
            "'database'='0',\n" +
            "'key'='students',\n" +
            "'format'='json',\n" +
            "'batch-fetch-rows'='1000',\n" +
            "'json.fail-on-missing-field' = 'false',\n" +
            "'json.ignore-parse-errors' = 'true',\n" +
            "'command'='BLPOP'\n" +
            " )";

    /*
     * Note: because the HGET command is used and no 'format' option is set, the dimension
     * table can only have three columns; additional columns are not recognized. See the
     * comments in the connector source for details.
     */
    String dimension =
            "CREATE TABLE classes\n" +
            "(\n" +
            "school string, \n" +
            "class_id BIGINT,\n" +
            "class_name string" +
            ") \n" +
            "WITH (\n" +
            "'connector'='redis',\n" +
            "'host'='10.201.0.33', \n" +
            "'port'='6379',\n" +
            "'redis-mode'='single', \n" +
            "'password'='123456',\n" +
            "'lookup.cache.max-rows'='1000',\n" +
            "'lookup.cache.ttl'='3600',\n" +
            "'lookup.cache.load-all'='true',\n" +
            "'database'='0',\n" +
            "'command'='HGET'\n" +
            " )";

    String sink =
            "CREATE TABLE sink_students\n" +
            "(\n" +
            "number BIGINT ,\n" +
            "name string,\n" +
            "school string, \n" +
            "class_id BIGINT, \n" +
            "class_name string \n" +
            ") \n" +
            "WITH (\n" +
            "'connector'='print'\n" +
            " )";

    tEnv.executeSql(source);
    tEnv.executeSql(dimension);
    tEnv.executeSql(sink);

    /*
     * Note: with HGET the order of the two join conditions does not matter. Which column
     * is used as the key and which as the field depends only on the column order in the
     * dimension table definition.
     */
    // Each concatenated fragment starts with a space so that keywords are not fused together.
    String sql =
            " insert into sink_students "
            + " select s.number,s.name,s.school,s.class_id,d.class_name from students s"
            + " left join classes for system_time as of s.proctime as d"
            + " on d.class_id = s.class_id and d.school = s.school";

    TableResult tableResult = tEnv.executeSql(sql);
    tableResult.getJobClient().get().getJobExecutionResult().get();
}

3、redis作为维表(带format)

3.1、数据准备

/**
 * Seeds Redis with the test data: a list of 1000 student JSON documents under the key
 * {@code students}, ten class JSON documents stored as plain string values, and the same
 * class documents stored per school in hashes.
 */
@Before
public void init() {
    RedisOptions.IS_TEST = true;
    RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33", 6379, "123456", 0);

    List<String> lists = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
        lists.add("{\n" +
                "\"number\": " + i + ",\n" +
                "\"name\": \"学生" + i + "\",\n" +
                "\"school\": \"学校" + ((i % 3) + 1) + "\",\n" +
                "\"class_id\": " + ((i % 10) + 1) + "\n" +
                "}");
    }

    // Initialize the student data (pushed in batches of 1000; currently a single batch).
    for (int i = 0; i < 1; i++) {
        redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));
    }

    // Initialize the class data: key is the class id, value is the class JSON document.
    for (int i = 0; i < 10; i++) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("class_id", String.valueOf(i + 1));
        jsonObject.put("class_name", "银河" + (i + 1) + "班");
        jsonObject.put("remark", "remark" + i);
        redisOperator.set(String.valueOf(i + 1), jsonObject.toString());
    }

    // Initialize the per-school class data: hash key is the school, hash field is the class id.
    for (int j = 1; j < 4; j++) {
        for (int i = 1; i < 11; i++) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("class_id", String.valueOf(i));
            jsonObject.put("class_name", "银河" + i + "班");
            jsonObject.put("remark", "remark" + i);
            jsonObject.put("school", "学校" + j);
            redisOperator.hset("学校" + j, String.valueOf(i), jsonObject.toString());
        }
    }
}

3.2、使用GET作为维表查询命令

/**
 * Reads student rows from Redis, enriches them through a lookup join against the
 * {@code classes} dimension table queried with the GET command and decoded with the JSON
 * format, and writes the result to a print sink. The call waits on the job execution
 * result before returning.
 *
 * @throws Exception if submitting the job or waiting for its result fails
 */
@Test
public void testGetSQL() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings environmentSettings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

    // Every column definition needs whitespace between the column name and its type.
    String source =
            "CREATE TABLE students\n" +
            "(\n" +
            "number BIGINT ,\n" +
            "name string,\n" +
            "school string, \n" +
            "class_id BIGINT, \n" +
            "proctime as PROCTIME() \n" +
            ") \n" +
            "WITH (\n" +
            "'connector'='redis',\n" +
            "'host'='10.201.0.33', \n" +
            "'port'='6379',\n" +
            "'redis-mode'='single', \n" +
            "'password'='123456',\n" +
            "'database'='0',\n" +
            "'key'='students',\n" +
            "'format'='json',\n" +
            "'batch-fetch-rows'='1000',\n" +
            "'json.fail-on-missing-field' = 'false',\n" +
            "'json.ignore-parse-errors' = 'true',\n" +
            "'command'='BLPOP'\n" +
            " )";

    /*
     * The point of this test is that the dimension table sets format=json. With a format
     * configured, the number of columns is no longer limited. Note, however, that the
     * column used as the key of the GET command must be declared first in the table, and
     * the value returned by GET, once decoded by the format (JSON here), must contain the
     * key column used in the join's ON clause; otherwise a NullPointerException is thrown.
     */
    String dimension =
            "CREATE TABLE classes\n" +
            "(\n" +
            "class_id BIGINT,\n" +
            "class_name string ,\n" +
            "remark string" +
            ") \n" +
            "WITH (\n" +
            "'connector'='redis',\n" +
            "'host'='10.201.0.33', \n" +
            "'port'='6379',\n" +
            "'redis-mode'='single', \n" +
            "'format'='json', \n" +
            "'password'='123456',\n" +
            "'lookup.cache.max-rows'='1000',\n" +
            "'lookup.cache.ttl'='3600',\n" +
            "'lookup.cache.load-all'='true',\n" +
            "'database'='0',\n" +
            "'command'='GET'\n" +
            " )";

    String sink =
            "CREATE TABLE sink_students\n" +
            "(\n" +
            "number BIGINT ,\n" +
            "name string,\n" +
            "school string, \n" +
            "class_id BIGINT, \n" +
            "class_name string, \n" +
            "remark string" +
            ") \n" +
            "WITH (\n" +
            "'connector'='print'\n" +
            " )";

    tEnv.executeSql(source);
    tEnv.executeSql(dimension);
    tEnv.executeSql(sink);

    // Each concatenated fragment starts with a space so that keywords are not fused together.
    String sql =
            " insert into sink_students "
            + " select s.number,s.name,s.school,s.class_id,d.class_name,d.remark from students s"
            + " left join classes for system_time as of s.proctime as d"
            + " on d.class_id = s.class_id";

    TableResult tableResult = tEnv.executeSql(sql);
    tableResult.getJobClient().get().getJobExecutionResult().get();
}

推荐阅读