2010-11-15 71 views
1

我正在创建一个由Log4J appender组成的库,它异步地将事件发送到远程服务器。在创建日志语句时,appender会将事件异步记录到本地队列中,然后消费者池将检索并发送到远程。JMS是否需要持久阻塞队列?

完全内存解决方案将创建一个BlockingQueue来处理并发问题。但是,我希望队列被保留,这样如果远程服务器不可用,我不会将队列扩大或者在有限队列的情况下开始丢弃消息。

我正在考虑使用嵌入式H2数据库在本地存储事件,然后使用轮询机制来检索事件并发送到远程。我更愿意使用BlockingQueue而不是轮询数据库表。

JMS是答案吗?

编辑:

如果JMS是答案,这似乎是想通过这种方式,没有任何人有对可配置为只接受过程中的消息的轻量级的,可嵌入的JMS解决方案的建议?换句话说,我不想,也可能不会被允许打开一个TCP套接字来侦听。

编辑:

我有现在的ActiveMQ嵌入式和它似乎是工作。谢谢大家。

+0

要知道JMS的实现,你正在使用以及基础设施如何建立。 JMS的一些实现允许多个线程同时读取和处理消息。从队列获取的角度来看,这保留了FIFO,但从处理消息的角度来看,不一定保留FIFO。再次检查一下你的实现。如果您正在运行支持并发读取的JMS,那么只要确保它被调整为禁止它们即可。或者如果你不在乎,那么你可以忽略我刚才所说的。 :) – 2010-11-15 13:53:17

+0

谢谢。如果某些数据稍微无序,因为它可以按事件数据排序,那么也可以。我对JMS的关注是整合另一个库的开销。有许多不同的JMS实现 - 是否有比其他实现更容易实现的实现,并且可以嵌入式方式运行? – Collin 2010-11-15 13:58:02

+1

我想找一个只支持通过JMS 1.1和/或1.2规范访问它的JMS实现。通过这种方式,您可以仅使用API​​以编程方式读取和写入队列。我之所以提到实现的唯一原因是,有时你必须使用供应商实现来配置某些东西来打开/关闭并发等。在我的情况下,我们坚持使用IBM产品,使我的公司热爱蓝色。但是你可能会逃避任何实现。在我们的例子中,我仍然告诉我们的开发人员坚持只使用API​​,并且不做特定的实现。 – 2010-11-15 19:46:05

回答

1

您可以使用JMS将消息异步发送到远程机器(假设它可以接收它们),Log4j有一个可用于此的JMS Appender。

+0

实现将是供应商特定的,但log4j文档应该让你开始,我相信他们在他们的文档中有一个示例配置/应用程序 – 2010-11-15 14:01:48

1

您肯定可以为此使用JMS。据我所知,你使用的是Log4J JMS appender。该组件将消息发送到预先配置的JMS目标(通常是队列)。您可以将此队列配置为保持。在这种情况下,插入到队列中的所有消息都将自动存储在某个持久存储区(通常是数据库)中。不幸的是,这种配置是特定于供应商的(取决于JMS供应商),但通常非常简单。请参阅您的JMS提供商的文档。

0

看看这个工程

此代码应为你工作 - 它在内存中的持久阻塞队列的 - 需要一些文件的调优,但应工作

 package test; 

    import java.io.BufferedReader; 
    import java.io.BufferedWriter; 
    import java.io.File; 
    import java.io.FileReader; 
    import java.io.FileWriter; 
    import java.io.IOException; 
    import java.util.ArrayList; 
    import java.util.Collections; 
    import java.util.LinkedList; 
    import java.util.List; 

    public class BlockingQueue { 

    //private static Long maxInMenorySize = 1L; 
    private static Long minFlushSize = 3L; 

    private static String baseDirectory = "/test/code/cache/"; 
    private static String fileNameFormat = "Table-"; 

    private static String currentWriteFile = ""; 

    private static List<Object> currentQueue = new LinkedList<Object>(); 
    private static List<Object> lastQueue = new LinkedList<Object>(); 

    static{ 
     try { 
      load(); 
     } catch (IOException e) { 
      System.out.println("Unable To Load"); 
      e.printStackTrace(); 
     } 
    } 

    private static void load() throws IOException{ 
     File baseLocation = new File(baseDirectory); 
     List<String> fileList = new ArrayList<String>(); 

     for(File entry : baseLocation.listFiles()){ 
      if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){ 
       fileList.add(entry.getAbsolutePath()); 
      } 
     } 

     Collections.sort(fileList); 

     if(fileList.size()==0){ 
      //currentQueue = lastQueue = new ArrayList<Object>(); 
      currentWriteFile = baseDirectory + "Table-1"; 
      BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile)); 
      while (!lastQueue.isEmpty()){ 
       writer.write(lastQueue.get(0).toString()+ "\n"); 
       lastQueue.remove(0); 
      } 
      writer.close(); 
     }else{ 
      if(fileList.size()>0){ 
        BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0))); 
        String line=null; 
        while ((line=reader.readLine())!=null){ 
         currentQueue.add(line); 
        } 
        reader.close(); 
        File toDelete = new File(fileList.get(0)); 
        toDelete.delete(); 
      } 

      if(fileList.size()>0){ 
       BufferedReader reader = new BufferedReader(new FileReader(fileList.get(fileList.size()-1))); 
       currentWriteFile = fileList.get(fileList.size()-1); 
       String line=null; 
       while ((line=reader.readLine())!=null){ 
        lastQueue.add(line); 
       } 
       reader.close(); 
       //lastFileNameIndex=Long.parseLong(fileList.get(fileList.size()).substring(6, 9)); 
      } 
     } 

    } 

    private void loadFirst() throws IOException{ 
     File baseLocation = new File(baseDirectory); 
     List<String> fileList = new ArrayList<String>(); 

     for(File entry : baseLocation.listFiles()){ 
      if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){ 
       fileList.add(entry.getAbsolutePath()); 
      } 
     } 

     Collections.sort(fileList); 

     if(fileList.size()>0){ 
       BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0))); 
       String line=null; 
       while ((line=reader.readLine())!=null){ 
        currentQueue.add(line); 
       } 
       reader.close(); 
       File toDelete = new File(fileList.get(0)); 
       toDelete.delete(); 
     } 
    } 

    public Object pop(){ 
     if(currentQueue.size()>0) 
      return currentQueue.remove(0); 

     if(currentQueue.size()==0){ 
      try { 
       loadFirst(); 
      } catch (IOException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 

     if(currentQueue.size()>0) 
      return currentQueue.remove(0); 
     else 
      return null; 
    } 

    public synchronized Object waitTillPop() throws InterruptedException{ 
     if(currentQueue.size()==0){ 
      try { 
       loadFirst(); 
      } catch (IOException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      if(currentQueue.size()==0) 
       wait(); 
     } 
     return currentQueue.remove(0); 
    } 

    public synchronized void push(Object data) throws IOException{ 
     lastQueue.add(data); 
     this.notifyAll(); 
     if(lastQueue.size()>=minFlushSize){ 
      BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile)); 
      while (!lastQueue.isEmpty()){ 
       writer.write(lastQueue.get(0).toString() + "\n"); 
       lastQueue.remove(0); 
      } 
      writer.close(); 

      currentWriteFile = currentWriteFile.substring(0,currentWriteFile.indexOf("-")+1) + 
        (Integer.parseInt(currentWriteFile.substring(currentWriteFile.indexOf("-")+1,currentWriteFile.length())) + 1); 
     } 
    } 

    public static void main(String[] args) { 
     try { 
      BlockingQueue bq = new BlockingQueue(); 

      for(int i =0 ; i<=8 ; i++){ 
       bq.push(""+i); 
      } 

      System.out.println(bq.pop()); 
      System.out.println(bq.pop()); 
      System.out.println(bq.pop()); 

      System.out.println(bq.waitTillPop()); 
      System.out.println(bq.waitTillPop()); 
      System.out.println(bq.waitTillPop()); 
      System.out.println(bq.waitTillPop()); 



     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    }