2017-03-07 165 views
3

我的任务很简单 - 我想使用Apache NiFi在HBase(计数器)中增加列值。使用Apache NiFi的Hb​​ase计数器

我有一个作为rowkey accountid,我想基于一个流值incr/decr平衡列。用NiFi做什么是最好的方法。

例如帐户A的余额= 100的起始值。我将(A,-20)作为事件。什么是最好的开箱即用处理器来完成这项工作(余额= 80)。似乎所有这些都将取代价值。我也开放改变我的模式...

我试着编写groovy脚本,但在nifi中得到这个错误。如果我的基本结构是错误的,那只是一个简单的问题。

2017年3月10日06:38:54067 ERROR [定时器驱动进程线程-6] oanifi.processors.script.ExecuteScript ExecuteScript [ID = b5a0e7b7-015a-1000-ab9c-0696c8297e8d] ExecuteScript [ID = b5a0e7b7-015a-1000-ab9c-0696c8297e8d]由于java.lang.NoClassDefFoundError而无法处理:org/apache/hadoop/conf/Configuration;回滚会话:

import org.apache.nifi.controller.ControllerService 
import org.apache.hadoop.hbase.HBaseConfiguration 
import org.apache.hadoop.hbase.TableName 
import org.apache.hadoop.hbase.client.Connection 
import org.apache.hadoop.hbase.client.ConnectionFactory 
import org.apache.hadoop.hbase.client.Get 
import org.apache.hadoop.hbase.client.Put 
import org.apache.hadoop.hbase.client.Result 
import org.apache.hadoop.hbase.client.ResultScanner 
import org.apache.hadoop.hbase.client.Scan 
import org.apache.hadoop.hbase.client.Table 
import org.apache.hadoop.hbase.util.Bytes 

def lookup = context.controllerServiceLookup 
def HbaseServiceName =HBaseClient.value 
def HBaseServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { 
    cs -> lookup.getControllerServiceName(cs) == HBaseServiceName 
} 
def conn = lookup.getControllerService(HBaseServiceId)?.getConnection() 
try { 
    flowFile = session.create() 
    def table = conn.getTable(TableName.valueOf("crap")) 
    myfile = flowFile.getAttribute("filename") 
    def p = new Put(Bytes.toBytes("crap")); 
    p.add(Bytes.toBytes("crap"), Bytes.toBytes("cf1"),Bytes.toBytes("SomeValue")) 
    table.put(p); 
    session.transfer(flowFile, REL_SUCCESS) 
} catch(e) { 
    log.error('Scripting error', e) 
    session.transfer(flowFile, REL_FAILURE) 
} 
conn?.close() 

回答

0

你是正确PutHBaseCellPutHBaseJSON把flowfile内容到相应的HBase的目的地。你可能想要做的是使用GetHBase来检索初始值,使用计数器(教程请参阅here)保留一个运行计数器,然后用正确的值更新HBase单元。您还可以使用DistributedMapCache系统在共享内存空间中获取/计算/存储值。

+0

同意。 GetHbase问题(获取当前值)并添加流式传输值,最后做PutHBase(更新) - 是2次操作还是数据回到客户端(NiFi)。我喜欢增量范例,因为它速度更快,我不太在意读取当前值(我只是想增加它)。吞吐量对我来说是一个大问题。我在每个NiFi机器上运行phoenix服务器的想法并且从NiFi调用phoenix-jdbc客户机。 –

+0

我没有太多的HBase经验。你可以使用'ExecuteScript'处理器在Groovy中做到这一点,将HBase库包含在相关的模块文件夹中?这里有一个使用HBase'Increment'对象的[示例](https://github.com/larsgeorge/hbase-book/blob/master/ch04/src/main/java/client/IncrementMultipleExample.java) - 你应该能够[从脚本引用HBase控制器服务](https://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html)。 – Andy

+0

感谢安迪这似乎符合我的要求(将尝试),除了涉及做比我们所希望的更多的编码:)。完成后将发布解决方案。 –