如何以multithreading方式调用不同类的相同方法

我在我的两个类中有一个名为process的方法,比如说CLASS-A and CLASS-B 。 现在在下面的循环中,我调用我的两个类的process method顺序意义一个接一个,它工作正常,但这不是我想要的方式。

 for (ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) { final Map response = entry.getPlugin().process(outputs); // write to database System.out.println(response); } 

有什么办法,我可以用multithreading方式调用我的两个类的进程方法。 这意味着一个线程将调用CLASS-A的进程方法,第二个线程将调用CLASS-B的进程方法。

之后我想将process方法返回的数据写入数据库。 所以我可以再写一个线程来写入数据库。

下面是我以multithreading方式提出的代码,但不知何故它根本没有运行。

 public void writeEvents(final Map data) { // Three threads: one thread for the database writer, two threads for the plugin processors final ExecutorService executor = Executors.newFixedThreadPool(3); final BlockingQueue<Map> queue = new LinkedBlockingQueue<Map>(); @SuppressWarnings("unchecked") final Map outputs = (Map)data.get(ModelConstants.EVENT_HOLDER); for (final ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) { executor.submit(new Runnable () { public void run() { final Map response = entry.getPlugin().process(outputs); // put the response map in the queue for the database to read queue.offer(response); } }); } Future future = executor.submit(new Runnable () { public void run() { Map map; try { while(true) { // blocks until a map is available in the queue, or until interrupted map = queue.take(); // write map to database System.out.println(map); } } catch (InterruptedException ex) { // IF we're catching InterruptedException then this means that future.cancel(true) // was called, which means that the plugin processors are finished; // process the rest of the queue and then exit while((map = queue.poll()) != null) { // write map to database System.out.println(map); } } } }); // this interrupts the database thread, which sends it into its catch block // where it processes the rest of the queue and exits future.cancel(true); // interrupt database thread // wait for the threads to finish try { executor.awaitTermination(5, TimeUnit.MINUTES); } catch (InterruptedException e) { //log error here } } 

但如果我删除最后一行executor.awaitTermination(5, TimeUnit.MINUTES); 然后它开始运行良好,一段时间后,我总是得到这样的错误 –

 JVMDUMP006I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" - please wait. JVMDUMP032I JVM requested Heap dump using 'S:\GitViews\Stream\goldseye\heapdump.20130827.142415.16456.0001.phd' in response to an event JVMDUMP010I Heap dump written to S:\GitViews\Stream\goldseye\heapdump.20130827.142415.16456.0001.phd JVMDUMP006I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" - please wait. 

任何人都可以帮我弄清楚我的上述代码中出现的问题和错误是什么? 如果我顺序运行然后我没有得到任何错误,它工作正常。

与我的工作方式相比,还有更好的方法吗? 因为将来我可以拥有多个插件处理器,而不是两个。

我想要做的是 – 以multithreading的方式调用我的两个类的进程方法然后写入数据库bcoz我的进程方法将返回一个Map。

任何帮助将在此受到赞赏..如果可能的话,我正在寻找一个可行的例子。 谢谢您的帮助,

您粘贴的代码段几乎没有问题,如果您修复它们,这应该有效。
1.您正在使用无限循环从阻塞队列中获取元素并尝试使用future来解除此问题。 这绝对不是一个好方法。 这种方法的问题是,您的数据库线程可能永远不会运行,因为即使在运行之前,调用程序线程中运行的未来任务也可能会取消它。 这很容易出错。
– 你应该运行while循环固定的次数(你已经知道有多少生产者或者你要获得响应的次数)。

  1. 此外,提交给执行程序服务的任务应该是独立的任务…这里您的数据库任务依赖于其他任务的执行。如果您的执行策略发生更改,这也可能导致死锁…例如,如果您使用单线程池执行程序如果数据库线程被调度,它只会阻止生成器在队列中添加数据。

    • 一种好方法是创建检索数据并在同一线程中更新数据库的任务。
    • 或者首先检索所有响应,然后并行执行数据库操作

    public void writeEvents(final map data){

     final ExecutorService executor = Executors.newFixedThreadPool(3); @SuppressWarnings("unchecked") final Map outputs = (Map)data.get(ModelConstants.EVENT_HOLDER); for (final ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) { executor.submit(new Runnable () { public void run() { try { final Map response = entry.getPlugin().process(outputs); //process the response and update database. System.out.println(map); } catch (Throwable e) { //handle execption } finally { //clean up resources } } }); } 

    //这将等待运行线程完成..这是一个有序的关闭。 executor.shutdown(); }

好的,这是我上面建议的评论的一些代码。 免责声明:我不确定它是否有效甚至可以编译,或者它是否能解决问题。 但我的想法是控制取消过程而不是依赖future.cancel ,我怀疑这可能会导致问题。

 class CheckQueue implements Runnable { private volatile boolean cancelled = false; public void cancel() { cancelled = true; } public void run() { Map map; try { while(!cancelled) { // blocks until a map is available in the queue, or until interrupted map = queue.take(); if (cancelled) break; // write map to database System.out.println(map); } catch (InterruptedException e) { } while((map = queue.poll()) != null) { // write map to database System.out.println(map); } } } CheckQueue queueChecker = new CheckQueue (); Future future = executor.submit(queueChecker); // this interrupts the database thread, which sends it into its catch block // where it processes the rest of the queue and exits queueChecker.cancel();