2016-01-22 99 views
1

比方说,我有一个相对较大的文件(大约100MB),我想要多播到群集的所有成员。我如何使用jgroups(最好是代码演示)以块的形式发送文件?文件应该在接收端以块读取。另外我怎样才能确保数据块的顺序在接收端保持不变。如何在jgroups中多播大文件

编辑1 这是我到目前为止尝试过的。我只是在接收器的侧发送文件作为一个整体和写入其内容的临时文件

public class SimpleFileTransfer extends ReceiverAdapter { 

    JChannel channel; 

    private void start() throws Exception{ 
     channel = new JChannel(); 
     channel.setReceiver(this); 
     channel.connect("FileCluster"); 
//  channel.getState(null, 10000); 
     File file = new File("/res/test.txt"); //the file to be sent 
     eventLoop(file); 
     channel.close(); 
    } 

    private void eventLoop(File file) throws IOException{ 
     BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(file))); 
     try { 
      Message msg = new Message(null, null, in); 
      channel.send(msg); 
     } 
     catch (Exception e){ 
      e.printStackTrace(); 
     } 
    } 


    public void receive(Message msg) 
    { 
     try { 
      File temp = new File("/res/temp.txt"); 
      FileWriter writer = new FileWriter(temp); 
      InputStream in = new ByteArrayInputStream(msg.getBuffer()); 
      int next = in.read(); 
      while (next != -1){ 
       writer.write(next); 
       next = in.read(); 
      } 
     } 
     catch (IOException ie) 
     { 
      ie.printStackTrace(); 
     } 


    } 

} 
+0

这不是人们编写代码对你的站点。显示你已经尝试了什么,然后人们会帮助你。 Downvoting。 –

+0

@我们是博格,够公平的。我用迄今为止的尝试更新了这个问题。 – avidProgrammer

+0

@WeareBorg我认为你提出了你的观点,你不需要冷静下来一个非常好的问题。 – kimathie

回答

3

下面是更好的版本,其口吃了起来大文件到8K的块。 将文件X写入/ tmp/X。需要注意的是/home/bela/fast.xml配置必须被改变,当然,:

public class SimpleFileTransfer extends ReceiverAdapter { 
protected String filename; 
protected JChannel channel; 
protected Map<String,OutputStream> files=new ConcurrentHashMap<>(); 
protected static final short ID=3500; 

private void start(String name, String filename) throws Exception { 
    ClassConfigurator.add((short)3500, FileHeader.class); 
    this.filename=filename; 
    channel=new JChannel("/home/bela/fast.xml").name(name); 
    channel.setReceiver(this); 
    channel.connect("FileCluster"); 
    eventLoop(); 
} 

private void eventLoop() throws Exception { 
    while(true) { 
     Util.keyPress(String.format("<enter to send %s>\n", filename)); 
     sendFile(); 
    } 
} 

protected void sendFile() throws Exception { 
    FileInputStream in=new FileInputStream(filename); 
    try { 
     for(;;) { 
      byte[] buf=new byte[8096]; 
      int bytes=in.read(buf); 
      if(bytes == -1) 
       break; 
      sendMessage(buf, 0, bytes, false); 
     } 
    } 
    catch(Exception e) { 
     e.printStackTrace(); 
    } 
    finally { 
     sendMessage(null, 0, 0, true); 
    } 
} 


public void receive(Message msg) { 
    byte[] buf=msg.getRawBuffer(); 
    FileHeader hdr=(FileHeader)msg.getHeader(ID); 
    if(hdr == null) 
     return; 
    OutputStream out=files.get(hdr.filename); 
    try { 
     if(out == null) { 
      File tmp=new File(hdr.filename); 
      String fname=tmp.getName(); 
      fname="/tmp/" + fname; 
      out=new FileOutputStream(fname); 
      files.put(hdr.filename, out); 
     } 
     if(hdr.eof) { 
      Util.close(files.remove(hdr.filename)); 
     } 
     else { 
      out.write(msg.getRawBuffer(), msg.getOffset(), msg.getLength()); 
     } 
    } 
    catch(Throwable t) { 
     System.err.println(t); 
    } 
} 


protected void sendMessage(byte[] buf, int offset, int length, boolean eof) throws Exception { 
    Message msg=new Message(null, buf, offset, length).putHeader(ID, new FileHeader(filename, eof)); 
    // set this if the sender doesn't want to receive the file 
    // msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK); 
    channel.send(msg); 
} 

protected static class FileHeader extends Header { 
    protected String filename; 
    protected boolean eof; 

    public FileHeader() {} // for de-serialization 

    public FileHeader(String filename, boolean eof) { 
     this.filename=filename; 
     this.eof=eof; 
    } 

    public int size() { 
     return Util.size(filename) + Global.BYTE_SIZE; 
    } 

    public void writeTo(DataOutput out) throws Exception { 
     Util.writeObject(filename, out); 
     out.writeBoolean(eof); 
    } 

    public void readFrom(DataInput in) throws Exception { 
     filename=(String)Util.readObject(in); 
     eof=in.readBoolean(); 
    } 
} 

public static void main(String[] args) throws Exception { 
    if(args.length != 2) { 
     System.out.printf("%s <name> <filename>\n", SimpleFileTransfer.class.getSimpleName()); 
     return; 
    } 
    new SimpleFileTransfer().start(args[0], args[1]); // name and file 
} 

}

0

没有人会写代码你,但:

  1. 打开文件到字节阵列
  2. 分段阵列成块
  3. 裹在说,这是其片
  4. 发送块的包络每个块
  5. 阅读信封再次将它们放回到一起

这些事情都不是很难。

+0

请参阅我的问题 – avidProgrammer

+0

的更新您的更新对我给出的建议没有任何影响 –

1

下面是不好的解决方案。要运行它,配置需要有bundler_type =“发送者发送”(在UDP中)并且应用程序需要足够的内存。 这个解决方案不好,因为它将整个文件读入一个缓冲区,该缓冲区也在JGroups中复制一次。 我将发布的下一个解决方案更好,因为它将大文件分成多个小块。请注意,发送大文件JGroups也会执行内部分块(碎片),但您仍然必须在应用程序级别创建大型字节[]缓冲区,这很糟糕。

public class SimpleFileTransfer extends ReceiverAdapter { 
protected String filename; 
protected JChannel channel; 

private void start(String name, String filename) throws Exception { 
    this.filename=filename; 
    channel=new JChannel("/home/bela/fast.xml").name(name); 
    channel.setReceiver(this); 
    channel.connect("FileCluster"); 
    eventLoop(); 
    channel.close(); 
} 

private void eventLoop() throws Exception { 
    while(true) { 
     Util.keyPress(String.format("<enter to send %s>\n", filename)); 
     sendFile(); 
    } 
} 

protected void sendFile() throws Exception { 
    Buffer buffer=readFile(filename); 
    try { 
     Message msg=new Message(null, buffer); 
     channel.send(msg); 
    } 
    catch(Exception e) { 
     e.printStackTrace(); 
    } 
} 


public void receive(Message msg) { 
    System.out.printf("received %s from %s\n", Util.printBytes(msg.getLength()), msg.src()); 
    try { 
     File temp=new File("/tmp/temp.txt"); 
     FileWriter writer=new FileWriter(temp); 
     InputStream in=new ByteArrayInputStream(msg.getBuffer()); 
     int next=in.read(); 
     while(next != -1) { 
      writer.write(next); 
      next=in.read(); 
     } 
    } 
    catch(IOException ie) { 
     ie.printStackTrace(); 
    } 
} 


protected static Buffer readFile(String filename) throws Exception { 
    File file=new File(filename); 
    int size=(int)file.length(); 
    FileInputStream input=new FileInputStream(file); 
    ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(size); 
    byte[] read_buf=new byte[1024]; 
    int bytes; 
    while((bytes=input.read(read_buf)) != -1) 
     out.write(read_buf, 0, bytes); 
    return out.getBuffer(); 
} 


public static void main(String[] args) throws Exception { 
    if(args.length != 2) { 
     System.out.printf("%s <name> <filename>\n", SimpleFileTransfer.class.getSimpleName()); 
     return; 
    } 
    new SimpleFileTransfer().start(args[0], args[1]); // name and file 
} 

}