示例代码:
nifi.flowfile.processor.PulsarPublish -> PulsarConsumer -> DeltaLakeJSONWriter
配置PulsarPublish处理器:
pulsar://127.0.0.1:6650 //Pulsar服务的地址和端口
my-topic //要发送和接收的Pulsar主题
persistent://public/default/my-topic //Pulsar主题的完整名称
{"id":"${uuid}","name":"${name}","age":${age}} //要发送的消息的JSON格式
配置PulsarConsumer处理器:
pulsar://127.0.0.1:6650 //Pulsar服务的地址和端口
my-topic //要发送和接收的Pulsar主题
persistent://public/default/my-topic //Pulsar主题的完整名称
配置DeltaLakeJSONWriter处理器:
/home/user/data-lake //Delta Lake的文件路径
delta_lake_table //要写入的Delta Lake表格名称
$ ./bin/nifi.sh start //在终端中启动NiFi