
PyFlink: parallelism when reading a file

scan724 2024-07-01 11:59:55

[root@master pyflink]# cat /root/pyflink/test.log 
111111111   aaaaa
222222222   bbbbb
111111111   ccccc
222222222   ddddd
333333333   eeeee
111111111   fffff
111111111   ggggg
111111111   eeeee
111111111   hhhhh
111111111   iiiii
111111111   jjjjj
222222222   eeeee


With parallelism 1, a single subtask reads the whole file, and the output matches the file line for line:

[root@master pyflink]# cat t107.py 
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Parallelism 1: one reader subtask reads the whole file
env.set_parallelism(1)
# Read the file line by line as a stream of strings
ds1 = env.read_text_file("/root/pyflink/test.log", 'utf-8')
# Print each line; with parallelism 1, print() adds no "N>" prefix
ds1.print()
env.execute()
[root@master pyflink]# python3 t107.py 
111111111   aaaaa
222222222   bbbbb
111111111   ccccc
222222222   ddddd
333333333   eeeee
111111111   fffff
111111111   ggggg
111111111   eeeee
111111111   hhhhh
111111111   iiiii
111111111   jjjjj
222222222   eeeee


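With parallelism 3, the file is read concurrently and the output comes out of order. The file is divided into splits that are read by three parallel subtasks, and the N> prefix that print() adds is the 1-based index of the subtask that emitted the line. Each subtask keeps its own lines in file order, but the interleaving across subtasks is not deterministic: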

[root@master pyflink]# cat t107.py 
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Parallelism 3: the file is split and read by three parallel subtasks
env.set_parallelism(3)
# Read the file line by line as a stream of strings
ds1 = env.read_text_file("/root/pyflink/test.log", 'utf-8')
# Each printed line is prefixed with "N>", the 1-based index of its subtask
ds1.print()
env.execute()
[root@master pyflink]# python3 t107.py 
3> 111111111   iiiii
3> 111111111   jjjjj
3> 222222222   eeeee
2> 111111111   fffff
2> 111111111   ggggg
1> 111111111   aaaaa
1> 222222222   bbbbb
2> 111111111   eeeee
2> 111111111   hhhhh
1> 111111111   ccccc
1> 222222222   ddddd
1> 333333333   eeeee
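Why do the three subtasks end up with 5, 4 and 3 lines? The sketch below is a simplified, hypothetical model, not Flink's actual code. It assumes the file is cut into equal byte ranges (one split per parallel reader) and Hadoop-style line-boundary handling: a split that does not start at offset 0 skips its first line, because the previous split owns it, and every split finishes any line that starts at or before its end offset. The names make_splits, read_split and SAMPLE are made up for this example, and SAMPLE assumes three spaces between the columns and \n line endings.

```python
from typing import List, Tuple

# Hypothetical copy of test.log, assuming three spaces between the columns
# and "\n" line endings (12 lines x 18 bytes = 216 bytes).
SAMPLE = (
    "111111111   aaaaa\n"
    "222222222   bbbbb\n"
    "111111111   ccccc\n"
    "222222222   ddddd\n"
    "333333333   eeeee\n"
    "111111111   fffff\n"
    "111111111   ggggg\n"
    "111111111   eeeee\n"
    "111111111   hhhhh\n"
    "111111111   iiiii\n"
    "111111111   jjjjj\n"
    "222222222   eeeee\n"
)


def make_splits(total_len: int, num_splits: int) -> List[Tuple[int, int]]:
    """Cut [0, total_len) into at most num_splits byte ranges of equal size."""
    if total_len == 0:
        return []
    size = -(-total_len // num_splits)  # ceiling division
    return [(start, min(start + size, total_len))
            for start in range(0, total_len, size)]


def read_split(data: bytes, start: int, end: int) -> List[str]:
    """Return the lines the reader of split [start, end) would emit (model only)."""
    pos = start
    if start != 0:
        # Skip the first (possibly partial) line: the previous split reads it.
        nl = data.find(b"\n", start)
        pos = len(data) if nl == -1 else nl + 1
    lines = []
    # A line that starts exactly at `end` is still read here; the next split skips it.
    while pos <= end and pos < len(data):
        nl = data.find(b"\n", pos)
        stop = len(data) if nl == -1 else nl
        lines.append(data[pos:stop].decode("utf-8"))
        pos = stop + 1
    return lines


if __name__ == "__main__":
    data = SAMPLE.encode("utf-8")
    for i, (start, end) in enumerate(make_splits(len(data), 3), start=1):
        for line in read_split(data, start, end):
            print(f"split {i}: {line}")
```

Under these assumptions the three splits hold 5, 4 and 3 lines, the same grouping as the 1>, 2> and 3> blocks in the run above, and concatenating the splits gives back the file in its original order. The model says nothing about which subtask picks up which split, so the prefix numbers should not be read as split numbers.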
