2013-01-02 83 views
1

我已经编写了一个打开UDP套接字并接收一些UDP数据包的小程序。 我试过的是每个数据包都由自己的线程使用fork-join处理(字节顺序被改变),然后改变后的数据包由udp转发到另一个系统。fork-join,compute()方法在一段时间后崩溃了

下面是代码:

package cwstreamswitcher; 

import java.io.IOException; 
import java.net.*; 
import java.util.concurrent.*; 
import java.util.logging.Level; 
import java.util.logging.Logger; 

public class CwStreamSwitcher { 

    public class Switch extends RecursiveAction { 

     private byte[] mMessage; 
     DatagramSocket mSocket; 

     public Switch(DatagramSocket serverSocket) throws UnknownHostException, IOException { 
      // mMessage = message; 
      mSocket = serverSocket; 
      int cwPort = 51001; 
      String host = "localhost"; 

      byte[] receiveData = new byte[33]; 
      DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length); 
      serverSocket.receive(receivePacket); 

      StringBuilder sb = new StringBuilder();     // 
      for (byte b : receiveData) {       // 
       sb.append(String.format("%02X ", b));    //just for checking the data , will be removed in final version 
      }              // 
      System.out.println("Received: " + sb.toString());  // 


      byte[] newCache = new byte[29]; 

      newCache[0] = (byte) 0x02;      
      System.arraycopy(receiveData, 32, newCache, 1, 1);  
      System.arraycopy(receiveData, 1, newCache, 2, 2);  
      byte onid = receiveData[7]; 
      switch (onid) { 
       case (byte) 0x6d: 
        newCache[4] = (byte) 0x03; 
        newCache[5] = (byte) 0x6e; 
        break; 
       case (byte) 0x08: 
        newCache[4] = (byte) 0x08; 
        newCache[5] = (byte) 0x00; 
        break; 
       case (byte) 0x3f: 
        newCache[4] = (byte) 0x01; 
        newCache[5] = (byte) 0x3e; 
        break; 
       case (byte) 0x01: 
        newCache[4] = (byte) 0x00; 
        newCache[5] = (byte) 0x01; 
        break; 
       case (byte) 0x04: 
        newCache[4] = (byte) 0xfb; 
        newCache[5] = (byte) 0xff; 
        break; 
       case (byte) 0xab: 
        newCache[4] = (byte) 0x00; 
        newCache[5] = (byte) 0xab; 
        break; 
       case (byte) 0x56: 
        newCache[4] = (byte) 0x00; 
        newCache[5] = (byte) 0x56; 
        break; 
       case (byte) 0x02: 
        newCache[4] = (byte) 0x00; 
        newCache[5] = (byte) 0x02; 
        break; 
       case (byte) 0x7e: 
        newCache[4] = (byte) 0x00; 
        newCache[5] = (byte) 0x7e; 
        break; 
       case (byte) 0x06: 
        newCache[4] = (byte) 0x06; 
        newCache[5] = (byte) 0x00; 
        break; 
       case (byte) 0x85: 
        newCache[4] = (byte) 0x00; 
        newCache[5] = (byte) 0x85; 
        break; 
       case (byte) 0x71: 
        newCache[4] = (byte) 0x00; 
        newCache[5] = (byte) 0x71; 
        break; 
       case (byte) 0x46: 
        newCache[4] = (byte) 0x00; 
        newCache[5] = (byte) 0x46; 
        break; 
       default: 
        newCache[4] = (byte) 0x00; 
        newCache[5] = (byte) 0x85; 
      } 
      newCache[6] = (byte) 0x06;      
      newCache[7] = (byte) 0x04;      
      System.arraycopy(receiveData, 12, newCache, 8, 4);  
      newCache[12] = (byte) 0x01;      
      System.arraycopy(receiveData, 16, newCache, 13, 16);  

      //send the converted data 
      DatagramPacket response = new DatagramPacket(newCache, newCache.length, InetAddress.getByName(host), cwPort); 
      mSocket.send(response); 


      StringBuilder sb2 = new StringBuilder();    // 
      for (byte k : newCache) {        // 
       sb2.append(String.format("%02X ", k));    //just for checking the data , will be removed in final version 
      }              // 
      System.out.println("Sent:  " + sb2.toString());  // 
     } 

     @Override 
     protected void compute() { 
      try { 
       invokeAll(new Switch(mSocket)); 
      } catch (UnknownHostException ex) { 
       Logger.getLogger(CwStreamSwitcher.class.getName()).log(Level.SEVERE, null, ex); 
      } catch (IOException ex) { 
       Logger.getLogger(CwStreamSwitcher.class.getName()).log(Level.SEVERE, null, ex); 
      } 
     } 
    } 

    public void CwServer(int port) throws SocketException, IOException { 
     DatagramSocket serverSocket = new DatagramSocket(port); 
     int processors = Runtime.getRuntime().availableProcessors(); 
     CwStreamSwitcher.Switch fb = new CwStreamSwitcher.Switch(serverSocket); 
     ForkJoinPool pool = new ForkJoinPool(); 
     pool.invoke(fb); 
    } 

    public static void main(String[] args) throws SocketException, IOException { 
     int port; 
     if (args.length > 0) { 
      port = Integer.parseInt(args[0]); 
     } else { 
      port = 9876; 
     } 
     new CwStreamSwitcher().CwServer(port); 
    } 
} 

的程序编译和运行良好,但约5分钟后,与此错误消息时停止:

Exception in thread "main" java.lang.StackOverflowError 
     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) 
     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
     at java.lang.reflect.Constructor.newInstance(Constructor.java:525) 
     at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:536) 
     at java.util.concurrent.ForkJoinTask.reportResult(ForkJoinTask.java:596) 
     at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:640) 
     at java.util.concurrent.ForkJoinPool.invoke(ForkJoinPool.java:1521) 
     at cwstreamswitcher.CwStreamSwitcher.CwServer(CwStreamSwitcher.java:150) 
     at cwstreamswitcher.CwStreamSwitcher.main(CwStreamSwitcher.java:171) 
Caused by: java.lang.StackOverflowError 
     at java.net.PlainDatagramSocketImpl.receive0(Native Method) 
     at java.net.AbstractPlainDatagramSocketImpl.receive(AbstractPlainDatagramSocketImpl.java:135) 
     at java.net.DatagramSocket.receive(DatagramSocket.java:775) 
     at cwstreamswitcher.CwStreamSwitcher$Switch.<init>(CwStreamSwitcher.java:31) 
     at cwstreamswitcher.CwStreamSwitcher$Switch.compute(CwStreamSwitcher.java:126) 
     at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:177) 
     at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:377) 
     at java.util.concurrent.ForkJoinTask.invokeAll(ForkJoinTask.java:721) 

CwStreamSwitcher.java:126是这种方法调用: protected void compute(){

这是什么原因造成的?

+0

CwStreamSwitcher.java:126是这个方法调用:protected void compute(){ – user1943183

+0

什么是异常? – greyfairer

+0

我们可以看到完整的堆栈跟踪和错误消息吗? –

回答

0

看来你的RecursiveAction没有停止条件。 在compute()中,你应该决定何时产生一个新的开关,以及何时产生了足够的开关。

你的代码永远是产卵开关。

相关问题