要创建一个自定义的处理器,我也跟着the documentation。单元测试失败的定制处理器的“可选”的属性
我做的MyProcessor.java必要的代码修改和MyProcessorTest运行除了当我尝试使用一些“可选”性能优良。注意:我尝试了所有的builder方法,像required(false),addValidator()等等,这些可选的属性是徒劳的。其实,一个验证器没有意义的一个可选属性...
MyProcessor.java
@Tags({ "example" })
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({ @ReadsAttribute(attribute = "", description = "") })
@WritesAttributes({ @WritesAttribute(attribute = "", description = "") })
@Stateful(description = "After a db-level LSN is processed, the same should be persisted as the last processed LSN", scopes = { Scope.CLUSTER })
public class MyProcessor extends AbstractProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description(
"Successfully created FlowFile from SQL query result set.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure").description("SQL query execution failed. ???")
.build();
/* Start : Mandatory properties */
public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
.name("Database Connection Pooling Service")
.description(
"The Controller Service that is used to obtain connection to database")
.required(true).identifiesControllerService(DBCPService.class)
.build();
public static final PropertyDescriptor CONTAINER_DB = new PropertyDescriptor.Builder()
.name("containerDB").displayName("Container Database")
.description("The name of the container database").required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
...
...more mandatory properties
...
/* End : Mandatory properties */
/*Start : Optional properties */
public static final PropertyDescriptor CDC_TS_FROM = new PropertyDescriptor.Builder()
.name("cdcTSFrom").displayName("Load CDC on or after")
.description("The CDC on or after this datetime will be fetched.")
.required(false).defaultValue(null).build();
public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
.name("schema").displayName("DB Schema")
.description("The schema which contains the xxxxxx")
.defaultValue(null).required(false).build();
/*End : Optional properties */
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(CONTAINER_DB);
descriptors.add(DBCP_SERVICE);
...
...
...
descriptors.add(CDC_TS_FROM);
descriptors.add(SCHEMA);
...
...
...
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(REL_FAILURE);
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
// TODO : Check if the component lifecycle methods esp. onScheduled() and
// onShutDown() are required
@Override
public void onTrigger(final ProcessContext context,
final ProcessSession session) throws ProcessException {
...
...
...
}
}
MyProcessorTest.java
public class MyProcessorTest {
private TestRunner testRunner;
private final String CONTAINER_DB = "test";
private final String DBCP_SERVICE = "test_dbcp";
...
...
...
private final String SCHEMA = "dbo";
private final String CDC_TS_FROM = "";
...
...
...
@Before
public void init() throws InitializationException {
testRunner = TestRunners.newTestRunner(MyProcessor.class);
final DBCPService dbcp = new DBCPServiceSQLServerImpl(...);
final Map<String, String> dbcpProperties = new HashMap<>();
testRunner = TestRunners.newTestRunner(MyProcessor.class);
testRunner.addControllerService(DBCP_SERVICE, dbcp, dbcpProperties);
testRunner.enableControllerService(dbcp);
testRunner.assertValid(dbcp);
testRunner.setProperty(MyProcessor.DBCP_SERVICE, DBCP_SERVICE);
testRunner.setProperty(MyProcessor.CONTAINER_DB, CONTAINER_DB);
...
...
...
testRunner.setProperty(MyProcessor.CDC_TS_FROM, CDC_TS_FROM);
testRunner.setProperty(MyProcessor.SCHEMA, SCHEMA);
...
...
...
}
@Test
public void testProcessor() {
testRunner.run();
}
/**
* Simple implementation only for MyProcessor processor testing.
*/
private class DBCPServiceSQLServerImpl extends AbstractControllerService
implements DBCPService {
private static final String SQL_SERVER_CONNECT_URL = "jdbc:sqlserver://%s;database=%s";
private String containerDB;
private String password;
private String userName;
private String dbHost;
public DBCPServiceSQLServerImpl(String containerDB, String password,
String userName, String dbHost) {
super();
this.containerDB = containerDB;
this.password = password;
this.userName = userName;
this.dbHost = dbHost;
}
@Override
public String getIdentifier() {
return DBCP_SERVICE;
}
@Override
public Connection getConnection() throws ProcessException {
try {
Connection connection = DriverManager.getConnection(String
.format(SQL_SERVER_CONNECT_URL, dbHost, containerDB),
userName, password);
return connection;
} catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e);
}
}
}
}
现在,如果我评论了可选在测试类属性:
//testRunner.setProperty(MyProcessor.CDC_TS_FROM, CDC_TS_FROM);
//testRunner.setProperty(MyProcessor.SCHEMA, SCHEMA);
,测试正常完成,但如果我使任何或所有可选属性,说,CDC_TS_FROM,然后我测试的情况下断言失败,不管我把什么价值CDC_TS_FROM:
java.lang.AssertionError: Processor has 1 validation failures:
'cdcTSFrom' validated against '' is invalid because 'cdcTSFrom' is not a supported property
at org.junit.Assert.fail(Assert.java:88)
at org.apache.nifi.util.MockProcessContext.assertValid(MockProcessContext.java:251)
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:161)
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:152)
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:147)
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:142)
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:137)
at processors.NiFiCDCPoC.sqlserver.MyProcessorTest.testProcessor(MyProcessorTest.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
编辑-1 :(?)
我补充两个验证:
public static final PropertyDescriptor CDC_TS_FROM = new PropertyDescriptor.Builder()
.name("cdcTSFrom").displayName("Load CDC on or after")
.description("The CDC on or after this datetime will be fetched.")
.required(false).defaultValue(null).addValidator(Validator.VALID)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
错误:
java.lang.AssertionError: Processor has 1 validation failures:
'cdcTSFrom' validated against '2017-03-06 10:00:00' is invalid because Must be of format <duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days
如果属性是可选的(它可能为空或可能是一个时间戳/字符串),我该如何指定一个验证器? –
我的答案是,添加VALID作为验证器,将标记任何值(或缺少)作为有效。 – mattyb
是啊,我得到了,但如果用户指定一个时间戳/周期,这个“有效”验证是不够的,我怎么处理这种情况? –