您现在的位置是:首页 >学无止境 >flink group by网站首页学无止境
flink group by
from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit
settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(settings)
# write all the data to one file
t_env.get_config().get_configuration().set_string("parallelism.default", "1")
t_env.connect(FileSystem().path('/tmp/input'))
.with_format(OldCsv().field('word', DataTypes.STRING()))
.with_schema(Schema().field('word', DataTypes.STRING()))
.create_temporary_table('mySource')
t_env.connect(FileSystem().path('/tmp/output'))
.with_format(OldCsv().field_delimiter(' ')
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT()))
.with_schema(Schema()
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT()))
.create_temporary_table('mySink')
tab = t_env.from_path('mySource')
print(dir(tab))
tab.group_by(tab.word).select(tab.word, lit(1).count).execute_insert('mySink').wait()





U8W/U8W-Mini使用与常见问题解决
QT多线程的5种用法,通过使用线程解决UI主界面的耗时操作代码,防止界面卡死。...
stm32使用HAL库配置串口中断收发数据(保姆级教程)
分享几个国内免费的ChatGPT镜像网址(亲测有效)
Allegro16.6差分等长设置及走线总结