2016-02-07 33 views
5

我想缓存一些数据在风暴螺栓,但不知道如果这是正确的方式来做到这一点。在下面的类中,员工ID和工作人员名称被缓存到哈希映射。为此,对Employee表进行了一次数据库调用,以选择所有员工,并在准备方法中填充哈希映射(这是初始化映射的正确位置?)。 (在运行风暴拓扑时),拓扑正在进行多个数据库连接并且多次初始化映射。当然,我想避免这种情况,这就是为什么我要缓存结果,以便它不会每次都进入数据库。请帮忙?在风暴螺栓缓存

public class TestBolt extends BaseRichBolt { 
    private static final long serialVersionUID = 2946379346389650348L; 
    private OutputCollector collector; 
    private Map<String, String> employeeIdToNameMap; 
    private static final Logger LOG = Logger.getLogger(TestBolt.class); 

    @Override 
    public void execute(Tuple tuple) {  
     String employeeId = tuple.getStringByField("employeeId"); 
     String employeeName = employeeIdToNameMap.get(employeeId); 

     collector.emit(tuple, new Values(employeeId, employeeName)); 
     collector.ack(tuple); 
    } 

    @Override 
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     // TODO Auto-generated method stub 
     this.collector = collector; 
     try { 
      employeeIdToNameMap = createEmployeIdToNameMap(); 
     } catch (SQLException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields(/*some fields*/)); 

    } 

    private Map<String, String> createEmployeIdToNameMap() throws SQLException { 
     final Map<String, String> employeeIdToNameMap = new HashMap<>(); 
     final DatabaseManager dbm = new PostgresManager(); 
     final String query = "select id, name from employee;"; 
     final Connection conn = dbm.createDefaultConnection(); 
     final ResultSet result = dbm.executeSelectQuery(conn, query); 
     while(result.next()) { 
      String employeId = result.getString("id"); 
      String name = result.getString("name"); 
      employeeIdToNameMap.put(employeId, name); 
     } 
     conn.close(); 
     return employeeIdToNameMap; 
    }  
} 

SOLUTION 我创建同步映射及其对我

private static Map<String, String> employeeIdToNameMap = Collections 
      .synchronizedMap(new HashMap<String, String>()); 
+0

代码看起来不错。您在prepare()中初始化HashMap的方法绝对正确。关于到数据库的多重连接:我假设在拓扑中有多个螺栓实例(即parallelim_hint> 1)。因此,每个实例都会打开它自己的连接,这是合理的。 –

+0

我在此螺栓旁有多个其他螺栓,其平行度> 1,但对于此螺栓,其平行度为1 – big

回答

1

工作正常,既然你有多个螺栓的任务,你可以标记employeeIdToNameMap静态震荡。初始化地图准备像这样 -

try { 
synchronized(TestBolt.class) { 
    if (null == employeeIdToNameMap) { 
    employeeIdToNameMap = createEmployeIdToNameMap(); 
    } 
    } 
} catch (SQLException e) { 
... 
}