
flink group by

scan724 2024-06-17 11:25:06
Summary: flink group by

from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(settings)

# write all the data to one file
t_env.get_config().get_configuration().set_string("parallelism.default", "1")
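# Source table: one STRING column named 'word', read from /tmp/input.
# connect() and the descriptor classes are the legacy API (removed in Flink 1.14).
# The chained calls are wrapped in parentheses so they can span several lines.
(t_env.connect(FileSystem().path('/tmp/input'))
    .with_format(OldCsv().field('word', DataTypes.STRING()))
    .with_schema(Schema().field('word', DataTypes.STRING()))
    .create_temporary_table('mySource'))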

# Sink table: space-delimited rows of (word, count), written to /tmp/output.
(t_env.connect(FileSystem().path('/tmp/output'))
    .with_format(OldCsv()
                 .field_delimiter(' ')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT()))
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT()))
    .create_temporary_table('mySink'))

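tab = t_env.from_path('mySource')
# Debugging aid: list the attributes and methods available on the Table object.
print(dir(tab))
# Group rows by word, count the rows in each group, and write the result to the sink.
tab.group_by(tab.word).select(tab.word, lit(1).count).execute_insert('mySink').wait()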
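
As a rough illustration of what the group_by / select step above computes, here is a plain-Python sketch that does not use Flink at all. The names `count_words` and `sample_words` are hypothetical and introduced only for this example. It assumes the input is a list of words, one per row.

```python
from collections import Counter


def count_words(words):
    """Group identical words and count the rows in each group."""
    return dict(Counter(words))


# Hypothetical sample input, standing in for the rows of 'mySource'.
sample_words = ["flink", "pyflink", "flink"]

# Print one "word count" pair per line, space-delimited like 'mySink'.
for word, count in sorted(count_words(sample_words).items()):
    print(word, count)
```

Each distinct word becomes one output row with the number of times it occurred, which is the same shape as the (word, count) rows the Flink job writes to the sink.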
