编写一个单一的NiFi标准处理器需要以下步骤:
AbstractProcessor
类。import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Tags({"custom", "example"})
@CapabilityDescription("A simple custom NiFi processor example.")
@InputRequirement(Requirement.INPUT_REQUIRED)
public class CustomProcessor extends AbstractProcessor {
public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
.Builder().name("My Property")
.description("An example property")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private List properties;
@Override
protected void init(final ProcessorInitializationContext context) {
final List properties = new ArrayList<>();
properties.add(MY_PROPERTY);
this.properties = Collections.unmodifiableList(properties);
}
@Override
public Set getRelationships() {
return new HashSet<>();
}
@Override
public final List getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
// 在此处编写自定义处理逻辑
session.transfer(flowFile, REL_SUCCESS);
}
}
创建一个NiFi的自定义NAR包,将上面的Java类打包到NAR包中,并在NAR包的META-INF/bundled-dependencies文件夹中添加所需的依赖。
将NAR包放置在NiFi的lib目录下,并重新启动NiFi。这将使NiFi能够加载并识别自定义处理器。
在NiFi的用户界面中,您应该能够找到自定义处理器并将其添加到流程中。在添加和配置处理器后,它将开始处理输入流。
这是一个简单的示例,演示了如何编写一个单一的NiFi标准处理器。您可以根据自己的需求对其进行扩展,并在onTrigger
方法中编写自定义的处理逻辑。