Java ThreadPool用法

我正在尝试编写一个multithreadingWeb爬虫。

我的主入门类有以下代码:

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers); while(true){ URL url = frontier.get(); if(url == null) return; exec.execute(new URLCrawler(this, url)); } 

URLCrawler获取指定的URL,解析HTML从中提取链接,并将看不见的链接安排回边界。

边界是未抓取的URL队列。 问题是如何编写get()方法。 如果队列为空,则应等待任何URLCrawler完成,然后再次尝试。 仅当队列为空且当前没有活动的URLCrawler时,它才应返回null。

我的第一个想法是使用AtomicInteger来计算当前工作URLCrawler的数量以及notifyAll()/ wait()调用的辅助对象。 开始时每个爬虫都会增加当前工作URLCrawler的数量,并在退出时递减它,并通知对象它已完成。

但我读到notify()/ notifyAll()和wait()是一些不赞成做线程通信的方法。

我应该在这个工作模式中使用什么? 它类似于M生产者和N个消费者,问题是如何应对生产者的厌倦。

我认为在这种情况下使用wait / notify是合理的。 想不出用juc做任何直接的方法
在课堂上,我们打电话给协调员:

 private final int numOfCrawlers; private int waiting; public boolean shouldTryAgain(){ synchronized(this){ waiting++; if(waiting>=numOfCrawlers){ //Everybody is waiting, terminate return false; }else{ wait();//spurious wake up is okay //waked up for whatever reason. Try again waiting--; return true; } } public void hasEnqueued(){ synchronized(this){ notifyAll(); } } 

然后,

 ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers); while(true){ URL url = frontier.get(); if(url == null){ if(!coordinator.shouldTryAgain()){ //all threads are waiting. No possibility of new jobs. return; }else{ //Possible that there are other jobs. Try again continue; } } exec.execute(new URLCrawler(this, url)); }//while(true) 

我不确定我理解你的设计,但这可能是Semaphore的工作

一种选择是使“前沿”成为阻塞队列,因此任何试图从中“获取”的线程都会阻塞。 只要任何其他URLCrawler将对象放入该队列,就会自动通知任何其他线程(对象已出列)

我认为用例的基本构建块是一个“latch”,类似于CountDownLatch,但与CountDownLatch不同,它也允许递增计数。

这种锁存器的接口可能是

 public interface Latch { public void countDown(); public void countUp(); public void await() throws InterruptedException; public int getCount(); } 

计数的合法值将为0及以上。 await()方法可以阻止,直到计数降为零。

如果你有这样的闩锁,你的用例可以很容易地描述。 我还怀疑在这个解决方案中可以消除队列(边界)(执行器提供了一个,所以它有点多余)。 我会把你的主要例程重写为

 ExecutorService executor = Executors.newFixedThreadPool(numberOfCrawlers); Latch latch = ...; // instantiate a latch URL[] initialUrls = ...; for (URL url: initialUrls) { executor.execute(new URLCrawler(this, url, latch)); } // now wait for all crawling tasks to finish latch.await(); 

你的URLCrawler会以这种方式使用latch:

 public class URLCrawler implements Runnable { private final Latch latch; public URLCrawler(..., Latch l) { ... latch = l; latch.countUp(); // increment the count as early as possible } public void run() { try { List secondaryUrls = crawl(); for (URL url: secondaryUrls) { // submit new tasks directly executor.execute(new URLCrawler(..., latch)); } } finally { // as a last step, decrement the count latch.countDown(); } } } 

至于latch实现,可以有许多可能的实现,范围从一个基于wait()和notifyAll(),一个使用Lock和Condition,到一个使用AbstractQueuedSynchronizer的实现。 我认为所有这些实现都非常简单。 请注意,wait() – notifyAll()版本和Lock-Condition版本将基于互斥,而AQS版本将使用CAS(比较和交换),因此在某些情况下可能会更好地扩展。

问题有点旧,但我想我找到了一些简单,有效的解决方案:

像下面一样扩展ThreadPoolExecutor类。 新function是保持活动任务计数(不幸的是,如果getActiveCount()不可靠)。 如果taskCount.get() == 0并且没有更多排队的任务,则意味着没有任何事情要做,执行程序关闭。 你有退出标准。 此外,如果您创建执行程序,但未能提交任何任务,则不会阻止:

 public class CrawlingThreadPoolExecutor extends ThreadPoolExecutor { private final AtomicInteger taskCount = new AtomicInteger(); public CrawlingThreadPoolExecutor() { super(8, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue()); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); taskCount.incrementAndGet(); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); taskCount.decrementAndGet(); if (getQueue().isEmpty() && taskCount.get() == 0) { shutdown(); } } } 

还有一件事是实现Runnable ,它保持对正在使用的Executor引用,以便能够提交新任务。 这是一个模拟:

 public class MockFetcher implements Runnable { private final String url; private final Executor e; public MockFetcher(final Executor e, final String url) { this.e = e; this.url = url; } @Override public void run() { final List newUrls = new ArrayList<>(); // Parse doc and build url list, and then: for (final String newUrl : newUrls) { e.execute(new MockFetcher(this.e, newUrl)); } } } 

我想建议一个AdaptiveExecuter。 根据特征值,您可以选择序列化或并行化线程以执行。 在下面的示例中,PUID是我想用来做出决定的字符串/对象。 您可以更改逻辑以适合您的代码。 对某些代码部分进行了评论以允许进一步的实验。

class AdaptiveExecutor实现Executor {final Queue tasks = new LinkedBlockingQueue(); Runnable active; // ExecutorService threadExecutor = Executors.newCachedThreadPool(); static ExecutorService threadExecutor = Executors.newFixedThreadPool(4);

 AdaptiveExecutor() { System.out.println("Initial Queue Size=" + tasks.size()); } public void execute(final Runnable r) { /* if immediate start is needed do either of below two new Thread(r).start(); try { threadExecutor.execute(r); } catch(RejectedExecutionException rEE ) { System.out.println("Thread Rejected " + new Thread(r).getName()); } */ tasks.offer(r); // otherwise, queue them up scheduleNext(new Thread(r)); // and kick next thread either serial or parallel. /* tasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); */ if ((active == null)&& !tasks.isEmpty()) { active = tasks.poll(); try { threadExecutor.submit(active); } catch (RejectedExecutionException rEE) { System.out.println("Thread Rejected " + new Thread(r).getName()); } } /* if ((active == null)&& !tasks.isEmpty()) { scheduleNext(); } else tasks.offer(r); */ //tasks.offer(r); //System.out.println("Queue Size=" + tasks.size()); } private void serialize(Thread th) { try { Thread activeThread = new Thread(active); th.wait(200); threadExecutor.submit(th); } catch (InterruptedException iEx) { } /* active=tasks.poll(); System.out.println("active thread is " + active.toString() ); threadExecutor.execute(active); */ } private void parallalize() { if(null!=active) threadExecutor.submit(active); } protected void scheduleNext(Thread r) { //System.out.println("scheduleNext called") ; if(false==compareKeys(r,new Thread(active))) parallalize(); else serialize(r); } private boolean compareKeys(Thread r, Thread active) { // TODO: obtain names of threads. If they contain same PUID, serialize them. if(null==active) return true; // first thread should be serialized else return false; //rest all go parallel, unless logic controlls it } 

}