
Flink SQL

风声是歌 2024-06-16 12:01:02

1. Create the table execution environment

Option 1: build it on top of a StreamExecutionEnvironment

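import java.time.Duration;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Event and ClickSource are user-defined classes that are not shown here.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> streamOperator = env.addSource(new ClickSource())
        // Zero out-of-orderness: the watermark follows the largest timestamp seen so far
        .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event event, long recordTimestamp) {
                        return event.timestamp;
                    }
                }));

// Create the table execution environment
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);

// Convert the DataStream into a Table
Table table = streamTableEnvironment.fromDataStream(streamOperator);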

Option 2: build it from EnvironmentSettings

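import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inStreamingMode()
        // Only needed on Flink 1.13 and earlier; deprecated since 1.14, where Blink is the only planner
        .useBlinkPlanner()
        .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);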

2. Register an input table to load the data

// timestamp is a reserved keyword in Flink SQL, so the column name is quoted with backticks
String inputDDL = " CREATE TABLE clicktable ( " +
        "url STRING ," +
        "user_name STRING," +
        "`timestamp` BIGINT " +
        " ) with (" +
        " 'connector' = 'filesystem' ," +
        " 'path' = 'file/click.txt', " +
        " 'format' = 'csv' )";

TableResult tableResult = tableEnv.executeSql(inputDDL);

3. Run a SQL query to get a result table, result

Table result = tableEnv.sqlQuery("select url,count(1) as cnt from clicktable group by url");

4. Write the table to an output

String createDDL = " CREATE TABLE output ( "+
        "url STRING ,"+
        "cnt BIGINT ) with ("+
        " 'connector' = 'filesystem' ,"+
        " 'path' = 'output', "+
        " 'format' = 'csv' )";

tableEnv.executeSql(createDDL);

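// Note: in streaming mode a GROUP BY aggregate produces updates, which the append-only
// filesystem sink does not accept. Run in batch mode (inBatchMode()) or use a sink that
// accepts updates, such as the 'print' connector.
result.executeInsert("output");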

 

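Converting between streams and tables:

1. Table to stream

Both methods are called on a StreamTableEnvironment:

toDataStream: for insert-only tables

toChangelogStream: for tables whose rows get updated, such as the result of a grouped aggregation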
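
To see why there are two methods, it can help to look at what a grouped count such as the query in step 3 emits as input rows arrive. The following is a plain-Java sketch with no Flink dependency. The class ChangelogSketch and its changelog method are hypothetical names made up for this illustration, and the +I / -U / +U labels imitate Flink's row kinds (insert, update-before, update-after) as they appear when a row is printed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration (not Flink code) of the changelog a grouped count emits. */
public class ChangelogSketch {

    /** Simulates "select url, count(1) as cnt from clicktable group by url" row by row. */
    public static List<String> changelog(List<String> urls) {
        Map<String, Long> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String url : urls) {
            Long old = counts.get(url);
            if (old == null) {
                // First row for this url: a plain insert.
                counts.put(url, 1L);
                out.add("+I[" + url + ", 1]");
            } else {
                // Later rows: retract the previous count, then emit the new one.
                out.add("-U[" + url + ", " + old + "]");
                out.add("+U[" + url + ", " + (old + 1) + "]");
                counts.put(url, old + 1);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample input: three clicks, two of them on the same url.
        for (String entry : changelog(Arrays.asList("./home", "./cart", "./home"))) {
            System.out.println(entry);
        }
    }
}
```

With the sample input, the second click on ./home yields a -U entry followed by a +U entry. A table that behaves like this is not insert-only, so it needs toChangelogStream. toDataStream fits queries that only ever append rows, such as a plain select without aggregation.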

2. Stream to table

Call fromDataStream on a StreamTableEnvironment, as in the first option of step 1.
