2D易失性数组:自我赋值帮助还是我需要AtomicIntegerArray?

我正在写一个音频DSP应用程序,我选择使用生产者 – 消费者模型。 我一直在阅读关于volatile和其他线程问题的很多内容,但是我对我的一些细节有一些疑问 – 特别是,我需要在线程之间共享的一个问题是数组数组。

我有一个代表生产者的class级。 为了允许处理时间的变化,生产者存储n缓冲区,每当有更多的音频数据可用时它将填充旋转并将缓冲区传递给消费者线程。

我将从我的问题开始,然后我将尝试详细解释我的系统 – 对于长篇文章感到遗憾,谢谢你的支持! 我也非常感谢关于我的实现和线程安全的一般性评论。

我的缓冲区由volatile byte[][]数组表示。 我很清楚volatile只会使引用变为volatile,但是阅读了SO和各种博客文章,似乎我有两个选择:

我可以使用AtomicIntegerArray 。 但:

  • 我会为这样的应用程序降低性能吗?

  • primefaces性甚至是我需要的吗? 我打算一次写入整个数组, 然后我需要它对另一个线程可见,我不需要每个单独的写入是primefaces的或立即可见的。

如果我理解正确(例如这篇博文 ),自我赋值,在我的情况下是: buffers[currentBuffer] = buffers[currentBuffer]将确保发布,您将在下面的代码中看到。

  • 这是正确的,它会导致所有最近的写入变得可见吗?

  • 这是否适用于像这样的2D数组?


我将简要介绍一下生产者类的概述; 这些是实例变量:

 // The consumer - just an interface with a process(byte[]) method AudioInputConsumer consumer; // The audio data source AudioSource source; // The number of buffers int bufferCount; // Controls the main producer loop volatile boolean isRunning = false; // The actual buffers volatile byte[][] buffers; // The number of buffers left to process. // Shared counter - the producer inrements and checks it has not run // out of buffers, while the consumer decremenets when it processes a buffer AtomicInteger buffersToProcess = new AtomicInteger(0); // The producer thread. Thread producerThread; // The consumer thread. Thread consumerThread; 

一旦我启动了producerThreadconsumerThread ,它们就分别执行方法producerLoopconsumerLoop

producerLoop在等待音频数据时阻塞,读入缓冲区,在缓冲区上执行自我赋值 ,然后使用AtomicInteger实例向消费者循环发送信号。

 private void producerLoop() { int bufferSize = source.getBufferSize(); int currentBuffer = 0; while (isRunning) { if (buffersToProcess.get() == bufferCount) { //This thread must be faster than the processing thread, we have run out // of buffers: decide what to do System.err.println("WARNING: run out of buffers"); } source.read(buffers[currentBuffer], 0, bufferSize); // Read data into the buffer buffers[currentBuffer] = buffers[currentBuffer]; // Self-assignment to force publication (?) buffersToProcess.incrementAndGet(); // Signal to the other thread that there is data to read currentBuffer = (currentBuffer + 1) % bufferCount; // Next buffer } } 

consumerLoop等待AtomicInteger buffersToProcess大于零,然后调用使用者对象以对数据执行任何操作。 之后buffersToProcess递减,我们等待它再次变为非零。

 private void consumerLoop() { int currentBuffer = 0; while (isRunning) { if (buffersToProcess.get() > 0) { consumer.process(buffers[currentBuffer]); // Process the data buffersToProcess.decrementAndGet(); // Signal that we are done with this buffer currentBuffer = (currentBuffer + 1) % bufferCount; // Next buffer } Thread.yield(); } } 

非常感谢!

你确实需要primefaces性,因为写入数组是一个非primefaces过程。 具体来说,Java肯定永远不会保证对数组成员的写入在您选择发布之前对其他线程保持不可见

一种选择是每次创建一个新数组,完全初始化它,然后通过volatile发布,但由于Java坚持必须先将新分配的数组清零,并且由于GC开销,这可能会产生很大的成本。 您可以使用“双缓冲”方案来克服这个问题,您只需保留两个arrays并在它们之间切换。 这种方法有其危险性:线程可能仍在从数组中读取,您的写入线程已将其标记为非活动状态。 这在很大程度上取决于代码的精确细节。

唯一的另一种选择是在经典,无聊, synchronized块中进行整个读写。 这具有在延迟方面非常可预测的优点。 就个人而言,我会从这开始,如果被实际的性能问题严重压迫,那就转向更复杂的事情了。

您也可以使用读写锁进行锁定,但这只会在多个线程同时读取数组时获得回报。 这似乎不是你的情况。

看看java.util.concurrent.ArrayBlockingQueue。

当您在阻塞队列上调用take()时,它将自动等待,直到某些内容可用。

只需将byte []放在队列中,消费者就会处理它。 在这种情况下,需要知道要处理多少缓冲区是无关紧要的。 通常,队列中的“终止”项表示最后一个缓冲区。 将byte []包装在一个带有布尔终止标志的类中会很有帮助。

关心优素福

除了@Marko Topolnik上面解释了volatile和atomic如何工作之外,如果你仍然希望在写入数组时实现跨线程可见性的效果,你可以使用Unsafe.putLongVolatile(),Unsafe.putObjectVolatile()和所有其他同一家族的方法。

是的,这不是Java喜欢,但它解决了你的问题。