ApacheBeam从Kafka进行流式处理,在固定时间窗口内将数据写入文件时不起作用。
创始人
2024-09-05 11:02:21
0

使用以下 Beam 代码来从 Kafka 读取数据并将其写入到文件中,每 10 秒钟创建一个新文件:

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import AfterProcessingTime

# Kafka configuration
KAFKA_BOOTSTRAP_SERVERS = ''
KAFKA_TOPIC = ''

# Beam pipeline configuration
OUTPUT_DIR = ''

# Pipeline
def run():
    with beam.Pipeline() as p:
        records = (
            p
            | 'ReadFromKafka' >> beam.io.ReadFromKafka(
                consumer_config={'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS},
                topics=[KAFKA_TOPIC],
            )
            | 'Format' >> beam.Map(lambda r: str(r.value, 'utf-8'))
            | 'Window' >> beam.WindowInto(window.FixedWindows(10))
            | 'WriteToFiles' >> beam.io.WriteToText(
                OUTPUT_DIR,
                file_name_suffix='.txt'
            )
        )

if __name__ == '__main__':
    run()

这段代码定义了一个 Beam 管道,它包含从 Kafka 读取记录、转换记录、应用固定窗口和将数据写入文本文件的步骤。这里的窗口大小为10秒钟,并且每10秒钟都会创建一个新文件。

相关内容

热门资讯

避免在粘贴双引号时向VS 20... 在粘贴双引号时向VS 2022添加反斜杠的问题通常是由于编辑器的自动转义功能引起的。为了避免这个问题...
安装apache-beam==... 出现此错误可能是因为用户的Python版本太低,而apache-beam==2.34.0需要更高的P...
安装了Anaconda之后找不... 在安装Anaconda后,如果找不到Jupyter Notebook,可以尝试以下解决方法:检查环境...
Android Recycle... 要在Android RecyclerView中实现滑动卡片效果,可以按照以下步骤进行操作:首先,在项...
omi系统和安卓系统哪个好,揭... OMI系统和安卓系统哪个好?这个问题就像是在问“苹果和橘子哪个更甜”,每个人都有自己的答案。今天,我...
本地化字符串和默认值 本地化字符串是指将应用程序中的文本内容根据不同的语言和地区进行翻译和适配的过程。当应用程序需要显示不...
原生ios和安卓系统,原生对比... 亲爱的读者们,你是否曾好奇过,为什么你的iPhone和安卓手机在操作体验上有着天壤之别?今天,就让我...
Android - NDK 预... 在Android NDK的构建过程中,LOCAL_SRC_FILES只能包含一个项目。如果需要在ND...
windows安装系统退不出来... Windows安装系统退不出来的解决方法详解在电脑使用过程中,有时会遇到在安装Windows系统时无...
不匹配以value="... 解决方法一:使用正则表达式匹配可以使用正则表达式来匹配不以value="开头的字符串。示例如下:im...