1

我在多线程期间读取广播数据的流数据捕获中面临一个问题,请帮助或建议,

实际上有一个类正在从一个 udp 套接字读取数据。另一个类接受来自每个客户端请求的 tcp 连接,为每个客户端创建一个线程并为数据请求相同的 udp 类。事情正在使用创建的第一个线程。但是当我从另一个 pc/ip 向另一个客户端请求时,数据包会丢失到第二个客户端/线程

我通过创建一个存储线程输出流对象并循环它以将数据发送到所有客户端的列表来解决此问题。但这只是暂时的,因为如果客户端/连接增加,它会延迟数据包。

读取 UDP 数据的代码

   public class EventNotifier    
    {
      private InterestingEvent ie;
      public DatagramSocket clientSocket;
      public String[] split_str;

      byte[] receiveData;

      HashMap<String, String> secMap = new HashMap<String, String>();


      public EventNotifier(InterestingEvent event)
      {
        ie = event; 
     clientSocket = new DatagramSocket(9050);
        receiveData = new byte[500];
      }



      public String getDataFeed(String client_id)
      {
        try 
        {
             DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
             clientSocket.receive(receivePacket);
             String s = new String(receivePacket.getData());
             String split_str = s.split(","); 
             if(secMap.containsValue(split_str[0]))
                  return s;
             else
                  return "";
             } catch(Exception e3) {}
      }
     }// end of eventNotifier class

多线程处理客户端请求的代码

public class multiServer 
{

    static protected List<PrintWriter> writers = new ArrayList<PrintWriter>();
    static String client_id = "";
    public static void main(String[] args)
    { 
       try
       {
           ServerSocket servsock = new ServerSocket(8858);
           Socket incoming;
           while(true)
           {
             incoming = servsock.accept();
             multiServerThread  connection = new multiServerThread(incoming);

             Thread t1 = new Thread(connection);
             t1.start();
           }
       }
       catch(IOException e)
       {
            System.out.println("couldnt make socket");
       }
    }
}

class multiServerThread extends Thread implements InterestingEvent
{

    Socket incoming;
    PrintWriter out=null;
    PrintWriter broad=null;
    BufferedReader in = null;
    String cliString=null;
    private EventNotifier en; 
    int id;

    public static String udp_data;

    public void interestingEvent(String str1)
    {
          this.udp_data = str1;
    }
    public String getUdpData()
    {
      String _udp_data = this.udp_data;
      return _udp_data;
    }
    multiServerThread(Socket incoming)
    {  
        this.incoming=incoming;
        en = new EventNotifier(this);
    } 
    public void run()
    {
        try
        {
            out = new PrintWriter(incoming.getOutputStream(), true);
            in = new BufferedReader(new InputStreamReader(incoming.getInputStream()));
        cliString = in.readLine();
        multiServer.writers.add(out);
        while(true)
                {
               try
                   {
                    udp_data = en.getDataFeed(cliString);
                    if(udp_data!=null && udp_data.length()>0)
                    {
              //workaround for serving the data to all cleints who are connected    
                        for (int i=0; i<multiServer.writers.size();i++)
                        {
                            broad=multiServer.writers.get(i);
                            broad.println(udp_data.trim());
                        }
             //else will directly write to the outputstream object for every thread which is connected
             // out.println(udp_data.trim());
                    }

               }
               catch (Exception e)
               {
                 System.out.println("exception "+e);
               }
               Thread.sleep(1);

            }
        } catch(IOException e)
        {
            System.out.print("IO Exception :: "+ e);
        }
        catch(InterruptedException e)
        {
            System.out.print("exception "+ e);
        }        
    }
}
4

1 回答 1

0

您需要互斥(或不同的设计)。

例如,如果两个线程multiServer.writers.add(out); 同时调用会发生什么?

来自 ArrayList Javadocs

请注意,此实现不同步。如果多个线程同时访问一个 ArrayList 实例,并且至少有一个线程在结构上修改了列表,则必须在外部进行同步。(结构修改是添加或删除一个或多个元素的任何操作,或 [...])

另一个问题是两个同时调用udp_data = en.getDataFeed(cliString);。第二个线程可能会覆盖第一个线程的结果。你会丢失数据!

如果一个线程调用for (int i=0; i<multiServer.writers.size();i++)而另一个线程忙于做会发生什么multiServer.writers.add(out);?大小可能已经增加,之前out实际上已经添加到列表中!

public class multiServer 
{

    private List<PrintWriter> writers = new ArrayList<PrintWriter>();

    public synchronized void addWriter(PrintWrite out) {
         writers.add(out);
    }
    public synchronized void serveAllWriters(String data) {
         for (int i=0; i<multiServer.writers.size();i++)
         {
             broad=multiServer.writers.get(i);
             broad.println(data);
         }
    }
}

现在,当一个线程试图添加一个 writer 时,synchronizeds 将确保没有其他线程正在执行add或打印。所以multiServerThread应该修复使用新方法:

class multiServerThread extends Thread implements InterestingEvent
{
      //...
      private String udp_data;
      //...
      myMultiServer.addWriter(out);

      //...
      udp_data = en.getDataFeed(cliString);
      if(udp_data!=null && udp_data.length()>0)
            myMultiServer.serveAllWriters(udp_data.trim());
      //...
}

可能还有更多问题,不确定我不完全理解您的代码。您必须问自己的问题是,另一个线程可以读取和/或写入相同的数据或对象吗?是的?然后你需要适当的同步。

于 2012-06-25T08:19:04.490 回答