您现在的位置是:首页 >技术交流 >DataGen连接器网站首页技术交流

DataGen连接器

码上淘金 2025-03-26 00:01:02
简介DataGen连接器

DataGen连接器

DataGen连接器提供了一个Source实现,可用于为Flink管道生成输入数据。在本地开发或进行演示,且无法访问Kafka等外部系统时,该连接器非常有用。DataGen连接器是内置的,无需额外依赖项。

用法

DataGeneratorSource会并行生成N个数据点。该数据源会将序列拆分为与并行数据源子任务数量相同的并行子序列。它通过向用户提供的GeneratorFunction提供Long类型的 “索引” 值来驱动数据生成过程。

然后,GeneratorFunction用于将Long值的(子)序列映射为任意数据类型的生成事件。例如,以下代码将生成["Number: 0", "Number: 1", ... , "Number: 999"]记录序列。

GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
long numberOfRecords = 1000;

DataGeneratorSource<String> source =
        new DataGeneratorSource<>(generatorFunction, numberOfRecords, Types.STRING);

DataStreamSource<String> stream =
        env.fromSource(source,
                WatermarkStrategy.noWatermarks(),
                "Generator Source");

元素的顺序取决于并行度。每个子序列将按顺序生成。因此,如果并行度限制为1,将按顺序生成从"Number: 0""Number: 999"的一个序列。

速率限制

DataGeneratorSource内置了速率限制支持。以下代码将生成一个String值流,其整体数据源速率(跨所有数据源子任务)不超过每秒100个事件。

GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index; 
DataGeneratorSource<String> source =
        new DataGeneratorSource<>(
                generatorFunction,
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(100),
                Types.STRING);

RateLimiterStrategy中可以找到其他速率限制策略,例如限制每个检查点发出的记录数。

有界性

此数据源始终是有界的。然而,从实际角度来看,将记录数设置为Long.MAX_VALUE会使其成为一个实际上无界的数据源(永远不会到达终点)。对于有限序列,用户可能需要考虑在BATCH执行模式下运行应用程序。

注意事项

  • 注意:DataGeneratorSource可用于实现具有至少一次和端到端精确一次处理保证的Flink作业,条件是GeneratorFunction的输出相对于其输入是确定性的,换句话说,提供相同的Long值总是会生成相同的输出。
  • 注意:也可以基于生成的事件和自定义的WatermarkStrategy在数据源处生成确定性水印。

参考链接

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。