如果您使用的是Apache NiFi 1.16.0及以上版本,并且在提交重试参数时遇到问题,可以尝试使用以下代码示例进行解决:
def retryDelayMillis = context.getProperty(RETRY_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)
def retryMaxAttempts = context.getProperty(RETRY_MAX_ATTEMPTS).evaluateAttributeExpressions(flowFile).asInteger()
flowFile = retryWithDelayAndMaxAttempts({
flowFile: flowFile,
throwable: exception,
processor: Processor,
context: context},
retryDelayMillis as Long,
retryMaxAttempts as Integer
)
def retryWithDelayAndMaxAttempts(paramsObject, delayMillis, maxAttempts) {
for (int i = 0; i < maxAttempts; i++) {
try {
return Processor.process(paramsObject.flowFile, paramsObject.context)
} catch (throwable) {
if (paramsObject.throwable) {
throwable << paramsObject.throwable
}
if (i == maxAttempts - 1) {
throw throwable
}
Thread.sleep(delayMillis)
}
}
}
以上方法可以确保重试功能正常工作,并减少提交重试参数时遇到的问题。