要打包Apache Nifi的自定义处理器和自定义控制器服务,您可以按照以下步骤进行操作:
mvn archetype:generate -DgroupId=com.example.nifi -DartifactId=nifi-custom -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
pom.xml
文件中,添加以下依赖项以引入Apache Nifi的相关库:
org.apache.nifi
nifi-api
1.13.2
org.apache.nifi
nifi-utils
1.13.2
test
src/main/java/com/example/nifi
目录下,创建您的自定义处理器和控制器服务的Java类文件。例如,您可以创建一个名为CustomProcessor.java
的自定义处理器类和一个名为CustomControllerService.java
的自定义控制器服务类。// CustomProcessor.java
package com.example.nifi;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Tags({"example"})
@CapabilityDescription("Example custom processor.")
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
public class CustomProcessor extends AbstractProcessor {
public static final PropertyDescriptor EXAMPLE_PROPERTY = new PropertyDescriptor
.Builder().name("Example Property")
.description("Example property description")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
.description("Success relationship")
.build();
private List properties;
private Set relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List properties = new ArrayList<>();
properties.add(EXAMPLE_PROPERTY);
this.properties = Collections.unmodifiableList(properties);
final Set relationships = new HashSet<>();
relationships.add(SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set getRelationships() {
return relationships;
}
@Override
public final List getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// TODO: Implement your custom processor logic here
}
}
// CustomControllerService.java
package com.example.nifi;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.Collections;
import java.util.List;
@Tags({"example"})
@CapabilityDescription("Example custom controller service.")
public class CustomControllerService extends AbstractControllerService {
public static final PropertyDescriptor EXAMPLE_PROPERTY = new PropertyDescriptor
.Builder().name("Example Property")
.description("Example property description")
.required(true)
.add