我的任务很简单 - 我想使用Apache NiFi在HBase(计数器)中增加列值。使用Apache NiFi的Hbase计数器
我有一个作为rowkey accountid,我想基于一个流值incr/decr平衡列。用NiFi做什么是最好的方法。
例如帐户A的余额= 100的起始值。我将(A,-20)作为事件。什么是最好的开箱即用处理器来完成这项工作(余额= 80)。似乎所有这些都将取代价值。我也开放改变我的模式...
我试着编写groovy脚本,但在nifi中得到这个错误。如果我的基本结构是错误的,那只是一个简单的问题。
2017年3月10日06:38:54067 ERROR [定时器驱动进程线程-6] oanifi.processors.script.ExecuteScript ExecuteScript [ID = b5a0e7b7-015a-1000-ab9c-0696c8297e8d] ExecuteScript [ID = b5a0e7b7-015a-1000-ab9c-0696c8297e8d]由于java.lang.NoClassDefFoundError而无法处理:org/apache/hadoop/conf/Configuration;回滚会话:
import org.apache.nifi.controller.ControllerService
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.ResultScanner
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.client.Table
import org.apache.hadoop.hbase.util.Bytes
def lookup = context.controllerServiceLookup
def HbaseServiceName =HBaseClient.value
def HBaseServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
cs -> lookup.getControllerServiceName(cs) == HBaseServiceName
}
def conn = lookup.getControllerService(HBaseServiceId)?.getConnection()
try {
flowFile = session.create()
def table = conn.getTable(TableName.valueOf("crap"))
myfile = flowFile.getAttribute("filename")
def p = new Put(Bytes.toBytes("crap"));
p.add(Bytes.toBytes("crap"), Bytes.toBytes("cf1"),Bytes.toBytes("SomeValue"))
table.put(p);
session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
log.error('Scripting error', e)
session.transfer(flowFile, REL_FAILURE)
}
conn?.close()
同意。 GetHbase问题(获取当前值)并添加流式传输值,最后做PutHBase(更新) - 是2次操作还是数据回到客户端(NiFi)。我喜欢增量范例,因为它速度更快,我不太在意读取当前值(我只是想增加它)。吞吐量对我来说是一个大问题。我在每个NiFi机器上运行phoenix服务器的想法并且从NiFi调用phoenix-jdbc客户机。 –
我没有太多的HBase经验。你可以使用'ExecuteScript'处理器在Groovy中做到这一点,将HBase库包含在相关的模块文件夹中?这里有一个使用HBase'Increment'对象的[示例](https://github.com/larsgeorge/hbase-book/blob/master/ch04/src/main/java/client/IncrementMultipleExample.java) - 你应该能够[从脚本引用HBase控制器服务](https://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html)。 – Andy
感谢安迪这似乎符合我的要求(将尝试),除了涉及做比我们所希望的更多的编码:)。完成后将发布解决方案。 –