2013-03-06 86 views
1

我知道,我可以使用JMS和ActiveMQ的限制,但我真正需要的东西很简单,没有大量的开销,可以将数据保存到硬盘驱动器。我用ActiveMQ做了一些测试,并且不太喜欢持久队列的性能。是否有任何Java阻塞队列,当达到

我正在寻找的是基本实现任何阻塞队列与存储在HDD(理想情况下),如果达到一定的大小限制信息的能力。然后,它应该能够从HDD读取存储的消息,并且如果可能的话,停止向HDD写入新的内容(在内存使用中恢复)。

我的方案很简单 - 消息(JSON)从外界传来。我做了一些处理,然后将它们发送到另一个REST服务。当目标REST服务关闭或者我们之间的网络不好时,可能会发生问题。在这种情况下,准备好的事件将存储在可能填满所有可用内存的队列中。我不希望/不需要将所有消息写入HDD/DB - 只有那些不适合内存的消息。

谢谢!

+1

你所要求的不是'东西很simple'。你可能想要'可靠的东西'。 – 2013-03-06 17:52:35

+0

ehcache是​​我知道的透明地将数据移入或移出磁盘的最简单的方法。如果队列顺序很重要,你需要自己处理。 – Affe 2013-03-06 18:01:45

+0

是的,队列顺序很重要。另外,当我说 - “非常简单的事情”时,我的意思是我不需要群集企业解决方案(因为我可以使用ActiveMQ)。所有的魔法都应该发生在1个JVM中。另外一个很好的功能 - 如果JVM停止 - 如果有任何消息,则从HDD填充队列。 – Alex 2013-03-06 18:26:48

回答

0

此代码应为你工作 - 它在内存中的持久阻塞队列的 - 需要一些文件的调优,但应工作

package test; 

    import java.io.BufferedReader; 
    import java.io.BufferedWriter; 
    import java.io.File; 
    import java.io.FileReader; 
    import java.io.FileWriter; 
    import java.io.IOException; 
    import java.util.ArrayList; 
    import java.util.Collections; 
    import java.util.LinkedList; 
    import java.util.List; 

    public class BlockingQueue { 

    //private static Long maxInMenorySize = 1L; 
    private static Long minFlushSize = 3L; 

    private static String baseDirectory = "/test/code/cache/"; 
    private static String fileNameFormat = "Table-"; 

    private static String currentWriteFile = ""; 

    private static List<Object> currentQueue = new LinkedList<Object>(); 
    private static List<Object> lastQueue = new LinkedList<Object>(); 

    static{ 
     try { 
      load(); 
     } catch (IOException e) { 
      System.out.println("Unable To Load"); 
      e.printStackTrace(); 
     } 
    } 

    private static void load() throws IOException{ 
     File baseLocation = new File(baseDirectory); 
     List<String> fileList = new ArrayList<String>(); 

     for(File entry : baseLocation.listFiles()){ 
      if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){ 
       fileList.add(entry.getAbsolutePath()); 
      } 
     } 

     Collections.sort(fileList); 

     if(fileList.size()==0){ 
      //currentQueue = lastQueue = new ArrayList<Object>(); 
      currentWriteFile = baseDirectory + "Table-1"; 
      BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile)); 
      while (!lastQueue.isEmpty()){ 
       writer.write(lastQueue.get(0).toString()+ "\n"); 
       lastQueue.remove(0); 
      } 
      writer.close(); 
     }else{ 
      if(fileList.size()>0){ 
        BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0))); 
        String line=null; 
        while ((line=reader.readLine())!=null){ 
         currentQueue.add(line); 
        } 
        reader.close(); 
        File toDelete = new File(fileList.get(0)); 
        toDelete.delete(); 
      } 

      if(fileList.size()>0){ 
       BufferedReader reader = new BufferedReader(new FileReader(fileList.get(fileList.size()-1))); 
       currentWriteFile = fileList.get(fileList.size()-1); 
       String line=null; 
       while ((line=reader.readLine())!=null){ 
        lastQueue.add(line); 
       } 
       reader.close(); 
       //lastFileNameIndex=Long.parseLong(fileList.get(fileList.size()).substring(6, 9)); 
      } 
     } 

    } 

    private void loadFirst() throws IOException{ 
     File baseLocation = new File(baseDirectory); 
     List<String> fileList = new ArrayList<String>(); 

     for(File entry : baseLocation.listFiles()){ 
      if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){ 
       fileList.add(entry.getAbsolutePath()); 
      } 
     } 

     Collections.sort(fileList); 

     if(fileList.size()>0){ 
       BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0))); 
       String line=null; 
       while ((line=reader.readLine())!=null){ 
        currentQueue.add(line); 
       } 
       reader.close(); 
       File toDelete = new File(fileList.get(0)); 
       toDelete.delete(); 
     } 
    } 

    public Object pop(){ 
     if(currentQueue.size()>0) 
      return currentQueue.remove(0); 

     if(currentQueue.size()==0){ 
      try { 
       loadFirst(); 
      } catch (IOException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 

     if(currentQueue.size()>0) 
      return currentQueue.remove(0); 
     else 
      return null; 
    } 

    public synchronized Object waitTillPop() throws InterruptedException{ 
     if(currentQueue.size()==0){ 
      try { 
       loadFirst(); 
      } catch (IOException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      if(currentQueue.size()==0) 
       wait(); 
     } 
     return currentQueue.remove(0); 
    } 

    public synchronized void push(Object data) throws IOException{ 
     lastQueue.add(data); 
     this.notifyAll(); 
     if(lastQueue.size()>=minFlushSize){ 
      BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile)); 
      while (!lastQueue.isEmpty()){ 
       writer.write(lastQueue.get(0).toString() + "\n"); 
       lastQueue.remove(0); 
      } 
      writer.close(); 

      currentWriteFile = currentWriteFile.substring(0,currentWriteFile.indexOf("-")+1) + 
        (Integer.parseInt(currentWriteFile.substring(currentWriteFile.indexOf("-")+1,currentWriteFile.length())) + 1); 
     } 
    } 

    public static void main(String[] args) { 
     try { 
      BlockingQueue bq = new BlockingQueue(); 

      for(int i =0 ; i<=8 ; i++){ 
       bq.push(""+i); 
      } 

      System.out.println(bq.pop()); 
      System.out.println(bq.pop()); 
      System.out.println(bq.pop()); 

      System.out.println(bq.waitTillPop()); 
      System.out.println(bq.waitTillPop()); 
      System.out.println(bq.waitTillPop()); 
      System.out.println(bq.waitTillPop()); 



     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 


} 
+0

良好的尝试,所以不会投票,但我没有办法在生产环境中相信这一点,因为JVM可能会在事物持久化到磁盘之前崩溃。 – user924272 2017-10-02 23:08:49

0

好了,有你的队列保存在磁盘上会工作,如果你回你的队列一个RandomAccessFile,一个MemoryMappedFile或一个MappedByteBuffer ..或其他等效的实现。 如果您的JVM过早崩溃或终止,几乎可以依靠您的操作系统将未提交的缓冲区保留到磁盘。需要注意的是,如果您的机器事先崩溃,您可以告别队列中的任何更新,因此请确保您了解这一点。 尽管遭受严重的性能下降,但您仍可以同步磁盘以获得有保证的持久性。 从更加核心的角度来看,另一种选择是复制到另一台机器以获得冗余,鉴于其复杂性,这需要单独的答案。