预初始化工作线程池以重用连接对象(套接字)

我需要在Java中构建一个工作池,每个工作者都有自己的连接套接字; 当工作线程运行时,它使用套接字但保持打开状态以便以后重用。 我们决定使用这种方法,因为与临时创建,连接和销毁套接字相关的开销需要太多的开销,因此我们需要一种方法,通过这种方法,工作池预先初始化其套接字连接,准备好承担工作同时保持套接字资源免受其他线程的影响(套接字不是线程安全的),所以我们需要沿着这些方向做点什么……

public class SocketTask implements Runnable { Socket socket; public SocketTask(){ //create + connect socket here } public void run(){ //use socket here } 

}

在应用程序启动时,我们想要初始化工作程序,并希望套接字连接也能以某种方式…

 MyWorkerPool pool = new MyWorkerPool(); for( int i = 0; i < 100; i++) pool.addWorker( new WorkerThread()); 

当应用程序请求工作时,我们将任务发送到工作池以立即执行…

 pool.queueWork( new SocketTask(..)); 

更新了工作代码
基于Gray和jontejj的有用评论,我得到以下代码…

SocketTask

 public class SocketTask implements Runnable { private String workDetails; private static final ThreadLocal threadLocal = new ThreadLocal(){ @Override protected Socket initialValue(){ return new Socket(); } }; public SocketTask(String details){ this.workDetails = details; } public void run(){ Socket s = getSocket(); //gets from threadlocal //send data on socket based on workDetails, etc. } public static Socket getSocket(){ return threadLocal.get(); } } 

ExecutorService的

 ExecutorService threadPool = Executors.newFixedThreadPool(5, Executors.defaultThreadFactory()); int tasks = 15; for( int i = 1; i <= tasks; i++){ threadPool.execute(new SocketTask("foobar-" + i)); } 

我喜欢这种方法有几个原因……

  • 套接字是运行任务可用的本地对象(通过ThreadLocal),消除了并发问题。
  • 套接字创建一次并保持打开状态,在新任务排队时重用,消除套接字对象创建/销毁开销。

一个想法是将Socket放入BlockingQueue 。 然后,无论何时需要Socket您的线程都可以从队列中take() ,当它们完成Socket它们put()put()重新放回队列中。

 public void run() { Socket socket = socketQueue.take(); try { // use the socket ... } finally { socketQueue.put(socket); } } 

这有额外的好处:

  • 您可以返回使用ExecutorService代码。
  • 您可以将套接字通信与结果处理分开。
  • 处理线程和套接字不需要一对一的对应关系。 但是套接字通信可能是98%的工作,所以可能没有收获。
  • 当您完成并且您的ExecutorService完成后,您可以通过将它们出列并关闭它们来关闭套接字。

这确实增加了另一个BlockingQueue的额外开销,但是如果你正在进行Socket通信,你就不会注意到它。

我们不相信ThreadFactory能满足我们的需求……

如果您使用线程本地,我认为您可以使这项工作。 您的线程工厂将创建一个线程,首先打开套接字,将其存储在线程本地,然后调用Runnable arg,它执行与套接字的所有工作,从ExecutorService内部队列中取出作业。 完成后, arg.run()方法将完成,您可以从线程本地获取套接字并关闭它。

像下面这样的东西。 这有点乱,但你应该明白这个想法。

 ExecutorService threadPool = Executors.newFixedThreadPool(10, new ThreadFactory() { public Thread newThread(final Runnable r) { Thread thread = new Thread(new Runnable() { public void run() { openSocketAndStoreInThreadLocal(); // our tasks would then get the socket from the thread-local r.run(); getSocketFromThreadLocalAndCloseIt(); } }); return thread; } })); 

所以你的任务将实现Runnable ,如下所示:

 public SocketWorker implements Runnable { private final ThreadLocal threadLocal; public SocketWorker(ThreadLocal threadLocal) { this.threadLocal = threadLocal; } public void run() { Socket socket = threadLocal.get(); // use the socket ... } } 

我认为你应该使用ThreadLocal

 package com.stackoverflow.q16680096; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { ExecutorService pool = Executors.newCachedThreadPool(); int nrOfConcurrentUsers = 100; for(int i = 0; i < nrOfConcurrentUsers; i++) { pool.submit(new InitSocketTask()); } // do stuff... pool.submit(new Task()); } } package com.stackoverflow.q16680096; import java.net.Socket; public class InitSocketTask implements Runnable { public void run() { Socket socket = SocketPool.get(); // Do initial setup here } } package com.stackoverflow.q16680096; import java.net.Socket; public final class SocketPool { private static final ThreadLocal SOCKETS = new ThreadLocal(){ @Override protected Socket initialValue() { return new Socket(); // Pass in suitable arguments here... } }; public static Socket get() { return SOCKETS.get(); } } package com.stackoverflow.q16680096; import java.net.Socket; public class Task implements Runnable { public void run() { Socket socket = SocketPool.get(); // Do stuff with socket... } } 

每个线程都有自己的套接字。