2013-01-22 46 views
2

我试图做多线程在这里,现在我已经使用DbHandler类来更新我的数据库更新dabtabase如何使用循环和获得最后一个序列值

程序执行,在控制器类开始,这有一个主要方法和线程池:

public class RunnableController { 
// Main method 
public static void main(String[] args) throws InterruptedException { 
    try { 
     RunnableController controller = new RunnableController(); 
     controller.initializeDb(); 
     controller.initialiseThreads(); 
     System.out.println("Polling"); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

    private void initialUpdate() 
{ 
    DBhandler dbhandler = new DBhandler(); 
    dbhandler.updateDb(getOutgoingQueue()); 
} 

private void initialiseThreads() {  
    try { 
     threadExecutorRead = Executors.newFixedThreadPool(10); 
     PollingSynchronizer read = new PollingSynchronizer(incomingQueue, dbConncetion); 
     threadExecutorRead.submit(read); 
    } catch (Exception e){ 
     e.printStackTrace(); 
    } 
} 
} 

我的轮询类获取新的数据和应该做的simulateously更新:

public class PollingSynchronizer implements Runnable { 
    public PollingSynchronizer(Collection<KamMessage> incomingQueue, 
    Connection dbConnection) { 
    super(); 
    this.incomingQueue = incomingQueue; 
    this.dbConnection = dbConnection; 
} 

    private int seqId; 

    public int getSeqId() { 
    return seqId; 
    } 

    public void setSeqId(int seqId) { 
    this.seqId = seqId; 
} 

    // The method which runs Polling action and record the time at which it is done 
    public void run() { 
    int seqId = 0; 

     while (true) { 
     List<KamMessage> list = null; 

     try { 
      list = fullPoll(seqId); 

      if (!list.isEmpty()) { 
      seqId = list.get(0).getSequence(); 
      incomingQueue.addAll(list); 
      this.outgoingQueue = incomingQueue; 
      System.out.println("waiting 3 seconds"); 
      System.out.println("new incoming message"); 
      Thread.sleep(3000);//at this wait I should execute run() 

      //when I debug my execution stops here and throws " Class not found Exception " 
      // its does not enters the message processor class 
      MessageProcessor processor = new MessageProcessor() { 
       //the run method which should fetch the message processor class. 
       final public void run() { 
       RunnableController.setOutgoingQueue(generate(outgoingQueue)); 
       } 
      }; 
      new Thread(processor).start(); 
     } 
    } catch (Exception e1) { 
     e1.printStackTrace(); 
    } 
    } 
} 
} 

我的消息处理器类:

public class MessageProcessor implements Runnable { 
private Collection<KpiMessage> fetchedMessages; 
private Connection dbConnection; 
Statement st = null; 
ResultSet rs = null; 
PreparedStatement pstmt = null; 
private Collection<KamMessage> outgoingQueue; 

public Collection<KamMessage> MessageProcessor(Collection<KamMessage> outgoingQueue){ 
    this.outgoingQueue = outgoingQueue; 
    this.dbConnection = dbConnection; 
    return outgoingQueue; 
} 
/** 
* Method for updating new values into database in preference for dummy processing of message 
* @param outgoingQueue 
* @return 
*/ 
@SuppressWarnings("javadoc") 
public Collection<KamMessage> generate(Collection<KamMessage> outgoingQueue) 
{ 
     for (KamMessage pojoClass : outgoingQueue) { 
      KamMessage updatedValue = createKamMsg804(pojoClass); 
      System.out.print(" " + pojoClass.getSequence()); 
      System.out.print(" " + pojoClass.getTableName()); 
      System.out.print(" " + pojoClass.getAction()); 
      System.out.print(" " + updatedValue.getKeyInfo1()); 
      System.out.print(" " + updatedValue.getKeyInfo2()); 
      System.out.println(" " + pojoClass.getEntryTime()); 
     } 
     return outgoingQueue; 
} 

/** 
* 
* @param pojoClass 
* @return msg 
*/ 
public KamMessage createKamMsg804(KamMessage pojoClass) 
{ 
    if(pojoClass.getAction() == 804){ 
    pojoClass.setKeyInfo1("ENTITYKEY9"); 
    pojoClass.setKeyInfo2("STATUSKEY9"); 
    } 
    return pojoClass; 
} 
private KamMessage convertRecordsetToPojo(ResultSet rs) throws SQLException { 

    KamMessage msg = new KamMessage(); 
    int sequence = rs.getInt("SEQ"); 
    msg.setSequence(sequence); 
    String tablename = rs.getString("TABLENAME"); 
    msg.setTableName(tablename); 
    Timestamp entrytime = rs.getTimestamp("ENTRYTIME"); 
    Date entryTime = new Date(entrytime.getTime()); 
    msg.setEntryTime(entryTime); 
    Timestamp processingtime=rs.getTimestamp("PROCESSINGTIME"); 
    if(processingtime!=null){ 
     Date processingTime = new Date(processingtime.getTime()); 
     msg.setProcessingTime(processingTime); 
    } 
    String keyInfo1 = rs.getString("KEYINFO1"); 
    msg.setKeyInfo1(keyInfo1); 
    String keyInfo2 = rs.getString("KEYINFO2"); 
    msg.setKeyInfo2(keyInfo2); 
    return msg; 
} 


@Override 
public void run() { 

    // TODO Auto-generated method stub 

} 

    } 

这是我DBhandler类,应在数据库中做更新

  public class DBhandler { 
    Connection conn = null; 
Statement st = null; 
ResultSet rs = null; 
PreparedStatement pstmt = null; 

public DBhandler(){ 
    super(); 
} 

/** 
* Method to initialize the database connection 
* @return conn 
* @throws Exception 
* 
*/ 
public Connection initializeDB() throws Exception { 
    System.out.println("JDBC connection"); 
    DriverManager.registerDriver(new oracle.jdbc.driver.OracleDriver()); 
    conn = DriverManager.getConnection("jdbc:oracle:thin:@VM-SALES- 
      MB:1521:SALESDB1","bdeuser", "edb"); // Connection for Database SALES-DB1 
    return conn; 
} 

//The method for updating Database 

    public void updateDb(Collection<KpiMessage> updatedQueue){ 
    for(KpiMessage pojoClass : updatedQueue){ 
     //**How the query should be used so that it gets last sequence vale and Updates into 
      Database** 
      String query = "UPDATE msg_new_to_bde Set KEYINFO1= ?, KEYINFO2 = ? WHERE SEQ = and 
      action = 804"; 
     } 
} 
/** 
* Method for Closing the connection 
* @throws Exception 
* 
*/ 

    public void closeDB() throws Exception { 
    st.close(); 
    conn.close(); 
    } 

    } 

我只需要通过调用updatedQueue在控制类在这个类(DbHAndler)使用更新查询来更新数据库。

我的程序流程 - 我有三类:1.Controller 2.PollerSynchro 3.Msgprocessor

我有数据库记录,其中被转换成POJO的形式,并存储在一个集合。通过这些POJO,我的类可以尝试在单个区间内进行多处理和更新。

控制器 - 具有线程池,启动轮询类调查方法 - 做

轮询 - 应轮询新传入的消息,并将其存储在输入队列 - 做

MsgProcessor - 应该寻找新的传入消息并将它们从传出队列传递到传入队列 - 也完成

DbHandler-应在数据库中更新。

问题:

现在我的问题是

我而投票线程休眠3秒完成度的数

在我的第二无效的run()方法的代码来实现此更新Poller类,传出队列不会传递并馈送到消息处理器类进行更新。我的执行流程才刚刚返回到第一个运行的方法和我得到特例分辨

如何更新此在Dbhanler类数据库

请帮我解决这些问题。

+2

这不是很明显吗?这个例外是因为'fetchedMessages'是一个'ArrayList',它不能被转换为你的自定义类型(除非'KpiMsg804'扩展了'ArrayList') – adarshr

+2

@adarshr我将获取的消息的返回值更改为传出队列,但仍然得到同样的例外。 – Babu

回答

4

唯一的例外似乎是来自这条线(这是MessageProcessor.java线38?)

return (KpiMsg804) fetchedMessages; 

fetchedMessages在这一点上似乎是一个ArrayList

+2

+1虽然提供了很多代码,但似乎缺少一些关键代码。 –

+1

@Henry谢谢你的回应,我将Fetched的返回值更改为传出Queue,但仍然得到相同的异常。 – Babu

+0

@PeterLawrey,雅我已根据您的要求更新。 – Babu

相关问题