2017-03-06 32 views
0

Unit test fails for "optional" properties of a custom processor

To create a custom processor, I followed the documentation.

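I made the necessary code changes in MyProcessor.java, and MyProcessorTest runs fine except when I try to use some "optional" properties. Note: I tried all the builder methods, like required(false), addValidator(), etc., for these optional properties, to no avail. Actually, a validator doesn't make sense for an optional property...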

MyProcessor.java

@Tags({ "example" })
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({ @ReadsAttribute(attribute = "", description = "") })
@WritesAttributes({ @WritesAttribute(attribute = "", description = "") })
@Stateful(description = "After a db-level LSN is processed, the same should be persisted as the last processed LSN", scopes = { Scope.CLUSTER })
public class MyProcessor extends AbstractProcessor {

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Successfully created FlowFile from SQL query result set.")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("SQL query execution failed. ???")
            .build();

    /* Start : Mandatory properties */
    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
            .name("Database Connection Pooling Service")
            .description("The Controller Service that is used to obtain connection to database")
            .required(true)
            .identifiesControllerService(DBCPService.class)
            .build();

    public static final PropertyDescriptor CONTAINER_DB = new PropertyDescriptor.Builder()
            .name("containerDB")
            .displayName("Container Database")
            .description("The name of the container database")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    ...
    ...more mandatory properties
    ...

    /* End : Mandatory properties */

    /* Start : Optional properties */
    public static final PropertyDescriptor CDC_TS_FROM = new PropertyDescriptor.Builder()
            .name("cdcTSFrom")
            .displayName("Load CDC on or after")
            .description("The CDC on or after this datetime will be fetched.")
            .required(false)
            .defaultValue(null)
            .build();

    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
            .name("schema")
            .displayName("DB Schema")
            .description("The schema which contains the xxxxxx")
            .defaultValue(null)
            .required(false)
            .build();

    /* End : Optional properties */

    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(CONTAINER_DB);
        descriptors.add(DBCP_SERVICE);
        ...
        ...
        ...
        descriptors.add(CDC_TS_FROM);
        descriptors.add(SCHEMA);
        ...
        ...
        ...
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_FAILURE);
        relationships.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    // TODO : Check if the component lifecycle methods esp. onScheduled() and
    // onShutDown() are required
    @Override
    public void onTrigger(final ProcessContext context,
            final ProcessSession session) throws ProcessException {
        ...
        ...
        ...
    }

}

MyProcessorTest.java

public class MyProcessorTest {

    private TestRunner testRunner;
    private final String CONTAINER_DB = "test";
    private final String DBCP_SERVICE = "test_dbcp";
    ...
    ...
    ...

    private final String SCHEMA = "dbo";
    private final String CDC_TS_FROM = "";
    ...
    ...
    ...

    @Before
    public void init() throws InitializationException {
        testRunner = TestRunners.newTestRunner(MyProcessor.class);
        final DBCPService dbcp = new DBCPServiceSQLServerImpl(...);
        final Map<String, String> dbcpProperties = new HashMap<>();
        testRunner = TestRunners.newTestRunner(MyProcessor.class);
        testRunner.addControllerService(DBCP_SERVICE, dbcp, dbcpProperties);
        testRunner.enableControllerService(dbcp);
        testRunner.assertValid(dbcp);
        testRunner.setProperty(MyProcessor.DBCP_SERVICE, DBCP_SERVICE);
        testRunner.setProperty(MyProcessor.CONTAINER_DB, CONTAINER_DB);
        ...
        ...
        ...

        testRunner.setProperty(MyProcessor.CDC_TS_FROM, CDC_TS_FROM);
        testRunner.setProperty(MyProcessor.SCHEMA, SCHEMA);
        ...
        ...
        ...
    }

    @Test
    public void testProcessor() {
        testRunner.run();
    }

    /**
     * Simple implementation only for MyProcessor processor testing.
     */
    private class DBCPServiceSQLServerImpl extends AbstractControllerService
            implements DBCPService {

        private static final String SQL_SERVER_CONNECT_URL = "jdbc:sqlserver://%s;database=%s";
        private String containerDB;
        private String password;
        private String userName;
        private String dbHost;

        public DBCPServiceSQLServerImpl(String containerDB, String password,
                String userName, String dbHost) {
            super();
            this.containerDB = containerDB;
            this.password = password;
            this.userName = userName;
            this.dbHost = dbHost;
        }

        @Override
        public String getIdentifier() {
            return DBCP_SERVICE;
        }

        @Override
        public Connection getConnection() throws ProcessException {
            try {
                Connection connection = DriverManager.getConnection(
                        String.format(SQL_SERVER_CONNECT_URL, dbHost, containerDB),
                        userName, password);
                return connection;
            } catch (final Exception e) {
                throw new ProcessException("getConnection failed: " + e);
            }
        }
    }
}

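Now, if I comment out the optional properties in the test class:

//testRunner.setProperty(MyProcessor.CDC_TS_FROM, CDC_TS_FROM);

//testRunner.setProperty(MyProcessor.SCHEMA, SCHEMA);

the test completes normally. But if I enable any or all of the optional properties, say CDC_TS_FROM, the test case fails with an assertion error, no matter what value I set for CDC_TS_FROM: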

java.lang.AssertionError: Processor has 1 validation failures: 
'cdcTSFrom' validated against '' is invalid because 'cdcTSFrom' is not a supported property 
at org.junit.Assert.fail(Assert.java:88) 
at org.apache.nifi.util.MockProcessContext.assertValid(MockProcessContext.java:251) 
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:161) 
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:152) 
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:147) 
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:142) 
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:137) 
at processors.NiFiCDCPoC.sqlserver.MyProcessorTest.testProcessor(MyProcessorTest.java:74) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) 
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) 
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) 
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) 
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) 
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50) 
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) 

Edit-1:

I added two validators:

public static final PropertyDescriptor CDC_TS_FROM = new PropertyDescriptor.Builder() 
      .name("cdcTSFrom").displayName("Load CDC on or after") 
      .description("The CDC on or after this datetime will be fetched.") 
      .required(false).defaultValue(null).addValidator(Validator.VALID) 
      .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build(); 

Error:

java.lang.AssertionError: Processor has 1 validation failures: 
'cdcTSFrom' validated against '2017-03-06 10:00:00' is invalid because Must be of format <duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days 

Answers

4

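Every PropertyDescriptor (required or optional) must have a validator explicitly set; otherwise you get the error you are seeing. It looks like you don't want to perform any validation, but you still have to set a validator, so add the following to the builder of each optional property: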

.addValidator(Validator.VALID) 

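EDIT (see the comments below): Marking a PropertyDescriptor as required(false) makes it an optional property, so it may have no value specified. If the user does enter a value and you want to validate it against some rule, add that particular validator (or write your own and add it). For a time period (2 secs, for example), as well as for other cases, there is a set of built-in validators. For example, to allow only values between 2 and 20 seconds: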

.addValidator(StandardValidators.createTimePeriodValidator(
     2, TimeUnit.SECONDS, 20, TimeUnit.SECONDS 
    )) 
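
To make the range rule above concrete, here is a framework-free sketch of the kind of check a "between 2 and 20 seconds" validator performs. It is not NiFi's implementation: the class name TimePeriodRangeCheck and the method isWithin are hypothetical, only the unit names listed in the error message above (nanos, millis, secs, mins, hrs, days) are recognized, and treating both bounds as inclusive is an assumption.

```java
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of NiFi.
final class TimePeriodRangeCheck {

    // "<duration> <TimeUnit>", limited to the units named in the error message.
    private static final Pattern PERIOD =
            Pattern.compile("(\\d+)\\s*(nanos|millis|secs|mins|hrs|days)");

    private TimePeriodRangeCheck() {
    }

    /** Returns true if value is a time period within [min, max] (bounds assumed inclusive). */
    static boolean isWithin(String value, long min, TimeUnit minUnit,
                            long max, TimeUnit maxUnit) {
        if (value == null) {
            return false;
        }
        Matcher m = PERIOD.matcher(value.trim().toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            return false; // e.g. a timestamp such as "2017-03-06 10:00:00"
        }
        final long amount;
        try {
            amount = Long.parseLong(m.group(1));
        } catch (NumberFormatException e) {
            return false; // more digits than fit in a long
        }
        // Compare everything in nanoseconds so mixed units work.
        long nanos = toUnit(m.group(2)).toNanos(amount);
        return nanos >= minUnit.toNanos(min) && nanos <= maxUnit.toNanos(max);
    }

    private static TimeUnit toUnit(String name) {
        switch (name) {
            case "nanos":  return TimeUnit.NANOSECONDS;
            case "millis": return TimeUnit.MILLISECONDS;
            case "secs":   return TimeUnit.SECONDS;
            case "mins":   return TimeUnit.MINUTES;
            case "hrs":    return TimeUnit.HOURS;
            default:       return TimeUnit.DAYS; // the pattern only allows "days" here
        }
    }
}
```

With this sketch, isWithin("5 secs", 2, TimeUnit.SECONDS, 20, TimeUnit.SECONDS) is accepted, while a timestamp string is rejected because it does not match the duration-plus-unit shape at all.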
+0

If the property is optional (it may be empty or it may be a timestamp/string), how do I specify a validator?

+0

My answer is to add VALID as the validator, which marks any value (or the lack of one) as valid. – mattyb

+0

Yeah, I got that, but if the user does specify a timestamp/period, this VALID validator isn't sufficient. How do I handle that case?
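
One possible way to handle the "empty or a timestamp" case is a custom check that accepts a missing value and otherwise requires a parseable timestamp. The sketch below shows only the decision logic in plain Java. The class name OptionalTimestampCheck and the method explainIfInvalid are hypothetical, and the pattern is an assumption inferred from the sample value '2017-03-06 10:00:00' in the question. In NiFi, logic like this would live inside a custom Validator implementation added to the property with addValidator(); that wrapper is left out here so the example stays self-contained.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.format.ResolverStyle;

// Hypothetical helper for illustration only; not part of NiFi.
final class OptionalTimestampCheck {

    // Assumed format, taken from the sample value in the question.
    // STRICT rejects impossible dates such as February 30.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss")
                    .withResolverStyle(ResolverStyle.STRICT);

    private OptionalTimestampCheck() {
    }

    /**
     * Returns null if the value is acceptable (unset, blank, or a valid
     * timestamp), otherwise a short explanation of why it was rejected.
     */
    static String explainIfInvalid(String value) {
        if (value == null || value.trim().isEmpty()) {
            return null; // optional property left unset
        }
        try {
            LocalDateTime.parse(value.trim(), FORMAT);
            return null;
        } catch (DateTimeParseException e) {
            return "Must be empty or a timestamp of the form yyyy-MM-dd HH:mm:ss";
        }
    }
}
```

The design choice here is to treat "no value" as valid inside the check itself, so the same check can be attached to an optional property without a separate rule for the empty case.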