Java:multithreading和UDP套接字编程

我是Java中multithreading和套接字编程的新手。 我想知道实现2个线程的最佳方法是什么 – 一个用于接收套接字,另一个用于发送套接字。 如果我想做的事听起来很荒谬,请告诉我原因! 该代码很大程度上受到Sun在线教程的启发。我想使用多播套接字,以便我可以使用多播组。

class Server extends Thread { static protected MulticastSocket socket = null; protected BufferedReader in = null; public InetAddress group; private static class Receive implements Runnable { public void run() { try { byte[] buf = new byte[256]; DatagramPacket pkt = new DatagramPacket(buf,buf.length); socket.receive(pkt); String received = new String(pkt.getData(),0,pkt.getLength()); System.out.println("From server@" + received); Thread.sleep(1000); } catch (IOException e) { System.out.println("Error:"+e); } catch (InterruptedException e) { System.out.println("Error:"+e); } } } public Server() throws IOException { super("server"); socket = new MulticastSocket(4446); group = InetAddress.getByName("239.231.12.3"); socket.joinGroup(group); } public void run() { while(1>0) { try { byte[] buf = new byte[256]; DatagramPacket pkt = new DatagramPacket(buf,buf.length); //String msg = reader.readLine(); String pid = ManagementFactory.getRuntimeMXBean().getName(); buf = pid.getBytes(); pkt = new DatagramPacket(buf,buf.length,group,4446); socket.send(pkt); Thread t = new Thread(new Receive()); t.start(); while(t.isAlive()) { t.join(1000); } sleep(1); } catch (IOException e) { System.out.println("Error:"+e); } catch (InterruptedException e) { System.out.println("Error:"+e); } } //socket.close(); } public static void main(String[] args) throws IOException { new Server().start(); //System.out.println("Hello"); } } 

首先要做的是:根据Java命名约定,您的类应以大写字母开头:

类名应该是名词,大小写混合,每个内部单词的首字母大写。 尽量保持您的类名简单和描述性。 使用整个单词 – 避免使用首字母缩写词和缩写词(除非缩写词比长词forms使用得更广泛,例如URL或HTML)。

第二:尝试将代码分解为连贯的部分,并围绕您正在处理的一些常见function组织它们……可能围绕您正在编程的function或模型。

服务器的(基本)模型是它唯一能做的就是接收套接字连接……服务器依靠处理程序来处理这些连接,就是这样。 如果您尝试构建该模型,它将看起来像这样:

 class Server{ private final ServerSocket serverSocket; private final ExecutorService pool; public Server(int port, int poolSize) throws IOException { serverSocket = new ServerSocket(port); pool = Executors.newFixedThreadPool(poolSize); } public void serve() { try { while(true) { pool.execute(new Handler(serverSocket.accept())); } } catch (IOException ex) { pool.shutdown(); } } } class Handler implements Runnable { private final Socket socket; Handler(Socket socket) { this.socket = socket; } public void run() { // receive the datagram packets } } 

第三:我建议你看一些现有的例子。

每条评论更新:
好的Ravi,您的代码存在一些重大问题,以及一些问题:

  1. 我假设Receive类是您的客户端…您应该将其作为一个单独的程序(具有自己的主类)并同时运行您的服务器和多个客户端。 从您的服务器为您发送的每个新UDP包生成一个新的“客户端线程”是一个令人不安的想法( 问题)。

  2. 当你创建客户端应用程序时,你应该让它在自己的while循环中运行接收代码( 次要问题),例如:

     public class Client extends Thread { public Client(/*..*/) { // initialize your client } public void run() { while(true) { // receive UDP packets // process the UDP packets } } public static void main(String[] args) throws IOException { // start your client new Client().start(); } } 
  3. 每个客户端只需要一个线程,每个服务器只需要一个线程(因为main有自己的线程,所以在技术上甚至没有单独的线程),因此您可能找不到有用的ExecutorService

否则你的方法是正确的……但我仍然建议你查看一些例子。

想要在应用程序中创建线程并不荒谬! 您不需要完全2个线程,但我认为您正在讨论实现Runnable接口的2个类。

自Java 1.5以来,线程API已经变得更好,你不再需要乱用java.lang.Thread了。 您只需创建一个java.util.concurrent.Executor并向其提交Runnable实例即可。

Java Concurrency in Practice一书使用了确切的问题 – 创建一个线程套接字服务器 – 并遍历代码的几次迭代以显示最佳方法。 查看免费样本章节,这很棒。 我不会在这里复制/粘贴代码,但具体看清单6.8。

Eclipse的历史甚至可以在一天之后起作用也是一件好事:)多亏了这一点,我能够给Ravi一个实际的例子和Lirik他的泄漏答案。

首先让我说明我不知道造成这种泄漏的原因,但如果我留下足够长的时间,它将在OutOfMemoryError上失败。

其次,我将工作代码留给Ravi注释掉了我的UDP服务器的基本工作示例。 超时是为了测试我的防火墙将终止接收器的时间(30秒)。 只需删除游泳池中的任何东西,你就可以去了。

所以这是我的示例线程UDP服务器的工作但泄漏版本。

 public class TestServer { private static Integer TIMEOUT = 30; private final static int MAX_BUFFER_SIZE = 8192; private final static int MAX_LISTENER_THREADS = 5; private final static SimpleDateFormat DateFormat = new SimpleDateFormat("yyyy-dd-MM HH:mm:ss.SSSZ"); private int mPort; private DatagramSocket mSocket; // You can remove this for a working version private ExecutorService mPool; public TestServer(int port) { mPort = port; try { mSocket = new DatagramSocket(mPort); mSocket.setReceiveBufferSize(MAX_BUFFER_SIZE); mSocket.setSendBufferSize(MAX_BUFFER_SIZE); mSocket.setSoTimeout(0); // You can uncomment this for a working version //for (int i = 0; i < MAX_LISTENER_THREADS; i++) { // new Thread(new Listener(mSocket)).start(); //} // You can remove this for a working version mPool = Executors.newFixedThreadPool(MAX_LISTENER_THREADS); } catch (IOException e) { e.printStackTrace(); } } // You can remove this for a working version public void start() { try { try { while (true) { mPool.execute(new Listener(mSocket)); } } catch (Exception e) { e.printStackTrace(); } } finally { mPool.shutdown(); } } private class Listener implements Runnable { private final DatagramSocket socket; public Listener(DatagramSocket serverSocket) { socket = serverSocket; } private String readLn(DatagramPacket packet) throws IOException { socket.receive(packet); return new BufferedReader(new InputStreamReader(new ByteArrayInputStream(packet.getData())), MAX_BUFFER_SIZE).readLine(); } private void writeLn(DatagramPacket packet, String string) throws IOException { packet.setData(string.concat("\r\n").getBytes()); socket.send(packet); } @Override public void run() { DatagramPacket packet = new DatagramPacket(new byte[MAX_BUFFER_SIZE], MAX_BUFFER_SIZE); String s; while (true) { try { packet = new DatagramPacket(new byte[MAX_BUFFER_SIZE], MAX_BUFFER_SIZE); s = readLn(packet); System.out.println(DateFormat.format(new Date()) + " Received: " + s); Thread.sleep(TIMEOUT * 1000); writeLn(packet, s); System.out.println(DateFormat.format(new Date()) + " Sent: " + s); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { if (args.length == 1) { try { TIMEOUT = Integer.parseInt(args[0]); } catch (Exception e) { TIMEOUT = 30; } } System.out.println(DateFormat.format(new Date()) + " Timeout: " + TIMEOUT); //new TestServer(4444); new TestServer(4444).start(); } } 

顺便说一句。 @Lirik,我首先在Eclipse中目睹了这种行为,之后我从命令行测试了它。 再说一遍,我不知道造成它的原因;)抱歉......

2个线程很好。 一位读者另一位作家。 请记住,使用UDP时,您不应该生成新的处理程序线程(除非您正在执行的操作需要很长时间),我建议将传入的消息放入处理队列中。 发送相同,有一个发送线程阻塞传入的队列进行UDP发送。