我试图做多线程在这里,现在我已经使用DbHandler类来更新我的数据库更新dabtabase如何使用循环和获得最后一个序列值
程序执行,在控制器类开始,这有一个主要方法和线程池:
public class RunnableController {
// Main method
public static void main(String[] args) throws InterruptedException {
try {
RunnableController controller = new RunnableController();
controller.initializeDb();
controller.initialiseThreads();
System.out.println("Polling");
} catch (Exception e) {
e.printStackTrace();
}
}
private void initialUpdate()
{
DBhandler dbhandler = new DBhandler();
dbhandler.updateDb(getOutgoingQueue());
}
private void initialiseThreads() {
try {
threadExecutorRead = Executors.newFixedThreadPool(10);
PollingSynchronizer read = new PollingSynchronizer(incomingQueue, dbConncetion);
threadExecutorRead.submit(read);
} catch (Exception e){
e.printStackTrace();
}
}
}
我的轮询类获取新的数据和应该做的simulateously更新:
public class PollingSynchronizer implements Runnable {
public PollingSynchronizer(Collection<KamMessage> incomingQueue,
Connection dbConnection) {
super();
this.incomingQueue = incomingQueue;
this.dbConnection = dbConnection;
}
private int seqId;
public int getSeqId() {
return seqId;
}
public void setSeqId(int seqId) {
this.seqId = seqId;
}
// The method which runs Polling action and record the time at which it is done
public void run() {
int seqId = 0;
while (true) {
List<KamMessage> list = null;
try {
list = fullPoll(seqId);
if (!list.isEmpty()) {
seqId = list.get(0).getSequence();
incomingQueue.addAll(list);
this.outgoingQueue = incomingQueue;
System.out.println("waiting 3 seconds");
System.out.println("new incoming message");
Thread.sleep(3000);//at this wait I should execute run()
//when I debug my execution stops here and throws " Class not found Exception "
// its does not enters the message processor class
MessageProcessor processor = new MessageProcessor() {
//the run method which should fetch the message processor class.
final public void run() {
RunnableController.setOutgoingQueue(generate(outgoingQueue));
}
};
new Thread(processor).start();
}
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
我的消息处理器类:
public class MessageProcessor implements Runnable {
private Collection<KpiMessage> fetchedMessages;
private Connection dbConnection;
Statement st = null;
ResultSet rs = null;
PreparedStatement pstmt = null;
private Collection<KamMessage> outgoingQueue;
public Collection<KamMessage> MessageProcessor(Collection<KamMessage> outgoingQueue){
this.outgoingQueue = outgoingQueue;
this.dbConnection = dbConnection;
return outgoingQueue;
}
/**
* Method for updating new values into database in preference for dummy processing of message
* @param outgoingQueue
* @return
*/
@SuppressWarnings("javadoc")
public Collection<KamMessage> generate(Collection<KamMessage> outgoingQueue)
{
for (KamMessage pojoClass : outgoingQueue) {
KamMessage updatedValue = createKamMsg804(pojoClass);
System.out.print(" " + pojoClass.getSequence());
System.out.print(" " + pojoClass.getTableName());
System.out.print(" " + pojoClass.getAction());
System.out.print(" " + updatedValue.getKeyInfo1());
System.out.print(" " + updatedValue.getKeyInfo2());
System.out.println(" " + pojoClass.getEntryTime());
}
return outgoingQueue;
}
/**
*
* @param pojoClass
* @return msg
*/
public KamMessage createKamMsg804(KamMessage pojoClass)
{
if(pojoClass.getAction() == 804){
pojoClass.setKeyInfo1("ENTITYKEY9");
pojoClass.setKeyInfo2("STATUSKEY9");
}
return pojoClass;
}
private KamMessage convertRecordsetToPojo(ResultSet rs) throws SQLException {
KamMessage msg = new KamMessage();
int sequence = rs.getInt("SEQ");
msg.setSequence(sequence);
String tablename = rs.getString("TABLENAME");
msg.setTableName(tablename);
Timestamp entrytime = rs.getTimestamp("ENTRYTIME");
Date entryTime = new Date(entrytime.getTime());
msg.setEntryTime(entryTime);
Timestamp processingtime=rs.getTimestamp("PROCESSINGTIME");
if(processingtime!=null){
Date processingTime = new Date(processingtime.getTime());
msg.setProcessingTime(processingTime);
}
String keyInfo1 = rs.getString("KEYINFO1");
msg.setKeyInfo1(keyInfo1);
String keyInfo2 = rs.getString("KEYINFO2");
msg.setKeyInfo2(keyInfo2);
return msg;
}
@Override
public void run() {
// TODO Auto-generated method stub
}
}
这是我DBhandler类,应在数据库中做更新
public class DBhandler {
Connection conn = null;
Statement st = null;
ResultSet rs = null;
PreparedStatement pstmt = null;
public DBhandler(){
super();
}
/**
* Method to initialize the database connection
* @return conn
* @throws Exception
*
*/
public Connection initializeDB() throws Exception {
System.out.println("JDBC connection");
DriverManager.registerDriver(new oracle.jdbc.driver.OracleDriver());
conn = DriverManager.getConnection("jdbc:oracle:thin:@VM-SALES-
MB:1521:SALESDB1","bdeuser", "edb"); // Connection for Database SALES-DB1
return conn;
}
//The method for updating Database
public void updateDb(Collection<KpiMessage> updatedQueue){
for(KpiMessage pojoClass : updatedQueue){
//**How the query should be used so that it gets last sequence vale and Updates into
Database**
String query = "UPDATE msg_new_to_bde Set KEYINFO1= ?, KEYINFO2 = ? WHERE SEQ = and
action = 804";
}
}
/**
* Method for Closing the connection
* @throws Exception
*
*/
public void closeDB() throws Exception {
st.close();
conn.close();
}
}
我只需要通过调用updatedQueue在控制类在这个类(DbHAndler)使用更新查询来更新数据库。
我的程序流程 - 我有三类:1.Controller 2.PollerSynchro 3.Msgprocessor
我有数据库记录,其中被转换成POJO的形式,并存储在一个集合。通过这些POJO,我的类可以尝试在单个区间内进行多处理和更新。
控制器 - 具有线程池,启动轮询类调查方法 - 做
轮询 - 应轮询新传入的消息,并将其存储在输入队列 - 做
MsgProcessor - 应该寻找新的传入消息并将它们从传出队列传递到传入队列 - 也完成
DbHandler-应在数据库中更新。
问题:
现在我的问题是
我而投票线程休眠3秒完成度的数
在我的第二无效的run()方法的代码来实现此更新Poller类,传出队列不会传递并馈送到消息处理器类进行更新。我的执行流程才刚刚返回到第一个运行的方法和我得到特例分辨
如何更新此在Dbhanler类数据库
请帮我解决这些问题。
这不是很明显吗?这个例外是因为'fetchedMessages'是一个'ArrayList',它不能被转换为你的自定义类型(除非'KpiMsg804'扩展了'ArrayList') – adarshr
@adarshr我将获取的消息的返回值更改为传出队列,但仍然得到同样的例外。 – Babu