2017-10-18 74 views
4

我移动从旧的Hadoop集群的一些软件映射内(使用用户名/密码认证)到较新的一个,具有Kerberos身份验证 2.6.0-cdh5.12.0启用。连接到Accumulo使用Kerberos

我已经能够使用AccumuloInput/OutputFormat类中设置的DelegationToken来使许多使用Accumulo的输入和/或输出的Map/Reduce作业正常工作。

但是,我有1个工作,它使用AccumuloInput/OutputFormat进行输入和输出,而且在其Mapper.setup()方法中,它通过Zookeeper连接到Accumulo,因此在Mapper.map()方法中,它可以将Mapper.map()中正在处理的每个键/值与另一个Accumulo表中的条目进行比较。

我在下面列出了相关代码,其中显示了连接到Zookeeper用户的PasswordToken的setup()方法,然后创建了随后用于映射器方法的Accumulo表扫描器。

所以问题是我该如何替换使用PasswordToken和KerberosToken来设置Mapper.setup()方法中的Accumulo扫描器?我无法找到我设置的AccumuloInput/OutputFormat类使用的DelegationToken。

我试过context.getCredentials()。getAllTokens()并寻找一个类型为org.apache.accumulo.code.client.security.tokens.AuthenticationToken的标记 - 这里返回的所有标记都是org类型的.apache.hadoop.security.token.Token。

请注意,由于代码在未连接到互联网的网络上运行,因此我输入了代码段与切割/粘贴 - 也可能存在拼写错误。 :)

//**************************** 
// code in the M/R driver 
//**************************** 
ClientConfiguration accumuloCfg = ClientConfiguration.loadDefault().withInstance("Accumulo1").withZkHosts("zookeeper1"); 
ZooKeeperInstance inst = new ZooKeeperInstance(accumuloCfg); 
AuthenticationToken dt = conn.securityOperations().getDelegationToken(new DelagationTokenConfig()); 
AccumuloInputFormat.setConnectorInfo(job, username, dt); 
AccumuloOutputFormat.setConnectorInfo(job, username, dt); 
// other job setup and then 
job.waitForCompletion(true) 



//**************************** 
// this is inside the Mapper class of the M/R job 
//**************************** 
private Scanner index_scanner; 

public void setup(Context context) { 
    Configuration cfg = context.getConfiguration(); 

    // properties set and passed from M/R Driver program 
    String username = cfg.get("UserName"); 
    String password = cfg.get("Password"); 
    String accumuloInstName = cfg.get("InstanceName"); 
    String zookeepers = cfg.get("Zookeepers"); 
    String tableName = cfg.get("TableName"); 
    Instance inst = new ZooKeeperInstance(accumuloInstName, zookeepers); 
    try { 
     AuthenticationToken passwordToken = new PasswordToken(password); 

     Connector conn = inst.getConnector(username, passwordToken); 

     index_scanner = conn.createScanner(tableName, conn.securityOperations().getUserAuthorizations(username)); 
    } catch(Exception e) { 
     e.printStackTrace(); 
    } 
} 

public void map(Key key, Value value, Context context) throws IOException, InterruptedException { 
    String uuid = key.getRow().toString(); 
    index_scanner.clearColumns(); 
    index_scanner.setRange(Range.exact(uuid)); 
    for(Entry<Key, Value> entry : index_scanner) { 
     // do some processing in here 
    } 
} 

回答

6

所提供AccumuloInputFormatAccumuloOutputFormat必须设置令牌与Accumulo*putFormat.setConnectorInfo(job, principle, token)作业配置的方法。您还可以使用AuthenticationTokenSerializer在HDFS中的文件中序列化令牌,并使用接受文件名的setConnectorInfo方法的版本。

如果传入一个KerberosToken,作业将创建一个DelegationToken以供使用,并且如果传入一个DelegationToken,它将使用它。

提供的AccumuloInputFormat应该处理自己的扫描仪,所以通常情况下,如果您已正确设置配置,则不必在Mapper中执行此操作。但是,如果您在Mapper内进行辅助扫描(如加入类似内容),则可以查看提供的AccumuloInputFormat的RecordReader源代码,以获取如何检索配置和构建扫描器的示例。

+1

另请参阅http://accumulo.apache.org/1.8/accumulo_user_manual.html#_delegationtokens_with_mapreduce,其中涵盖了与Christopher相同的信息。 – elserj

+0

这工作完美。谢谢。 –

+0

@GlenWarholic - 你说它有效,但你没有把答案标记为接受?为什么不? –