Thread-safe circular buffer in Java

Consider a few web server instances running in parallel. Each server holds a reference to a single shared "Status keeper", whose role is to keep the last N requests from all servers.

For example (N=3):

    Server a: "Request id = ABCD"  Status keeper=["ABCD"]
    Server b: "Request id = XYZZ"  Status keeper=["ABCD", "XYZZ"]
    Server c: "Request id = 1234"  Status keeper=["ABCD", "XYZZ", "1234"]
    Server b: "Request id = FOO"   Status keeper=["XYZZ", "1234", "FOO"]
    Server a: "Request id = BAR"   Status keeper=["1234", "FOO", "BAR"]

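At any point in time, the "Status keeper" might be called from a monitoring application that reads these last N requests for an SLA report.

What is the best way to implement this producer-consumer scenario in Java, giving the web servers higher priority than the SLA report?

CircularFifoBuffer seems to be the appropriate data structure to hold the requests, but I'm not sure what the best way to implement efficient concurrency is.

    Buffer fifo = BufferUtils.synchronizedBuffer(new CircularFifoBuffer());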

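Here is a lock-free ring buffer implementation. It implements a fixed-size buffer; there is no FIFO functionality. I would suggest you store a Collection of requests for each server instead. That way your report can do the filtering rather than having your data structure do it.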

    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.AtomicReference;

    /**
     * Container
     * ---------
     *
     * A lock-free container that offers a close-to O(1) add/remove performance.
     */
    public class Container<T> implements Iterable<T> {

        // The capacity of the container.
        final int capacity;
        // The list.
        AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();
        // TESTING {
        AtomicLong totalAdded = new AtomicLong(0);
        AtomicLong totalFreed = new AtomicLong(0);
        AtomicLong totalSkipped = new AtomicLong(0);

        private void resetStats() {
            totalAdded.set(0);
            totalFreed.set(0);
            totalSkipped.set(0);
        }
        // TESTING }

        // Constructor
        public Container(int capacity) {
            this.capacity = capacity;
            // Construct the list.
            Node<T> h = new Node<T>();
            Node<T> it = h;
            // One created, now add (capacity - 1) more
            for (int i = 0; i < capacity - 1; i++) {
                // Add it.
                it.next = new Node<T>();
                // Step on to it.
                it = it.next;
            }
            // Make it a ring.
            it.next = h;
            // Install it.
            head.set(h);
        }

        // Empty ... NOT thread safe.
        public void clear() {
            Node<T> it = head.get();
            for (int i = 0; i < capacity; i++) {
                // Trash the element
                it.element = null;
                // Mark it free.
                it.free.set(true);
                it = it.next;
            }
            // Clear stats.
            resetStats();
        }

        // Add a new one.
        public Node<T> add(T element) {
            // Get a free node and attach the element.
            totalAdded.incrementAndGet();
            return getFree().attach(element);
        }

        // Find the next free element and mark it not free.
        private Node<T> getFree() {
            Node<T> freeNode = head.get();
            int skipped = 0;
            // Stop when we hit the end of the list
            // ... or we successfully transit a node from free to not-free.
            while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
                skipped += 1;
                freeNode = freeNode.next;
            }
            // Keep count of skipped.
            totalSkipped.addAndGet(skipped);
            if (skipped < capacity) {
                // Put the head as next.
                // Doesn't matter if it fails. That would just mean someone else was doing the same.
                head.set(freeNode.next);
            } else {
                // We hit the end! No more free nodes.
                throw new IllegalStateException("Capacity exhausted.");
            }
            return freeNode;
        }

        // Mark it free.
        public void remove(Node<T> it, T element) {
            totalFreed.incrementAndGet();
            // Remove the element first.
            it.detach(element);
            // Mark it as free.
            if (!it.free.compareAndSet(false, true)) {
                throw new IllegalStateException("Freeing a freed node.");
            }
        }

        // The Node class. It is static so needs the <T> repeated.
        public static class Node<T> {

            // The element in the node.
            private T element;
            // Are we free?
            private AtomicBoolean free = new AtomicBoolean(true);
            // The next reference in whatever list I am in.
            private Node<T> next;

            // Construct a node of the list
            private Node() {
                // Start empty.
                element = null;
            }

            // Attach the element.
            public Node<T> attach(T element) {
                // Sanity check.
                if (this.element == null) {
                    this.element = element;
                } else {
                    throw new IllegalArgumentException("There is already an element attached.");
                }
                // Useful for chaining.
                return this;
            }

            // Detach the element.
            public Node<T> detach(T element) {
                // Sanity check.
                if (this.element == element) {
                    this.element = null;
                } else {
                    throw new IllegalArgumentException("Removal of wrong element.");
                }
                // Useful for chaining.
                return this;
            }

            public T get() {
                return element;
            }

            @Override
            public String toString() {
                return element != null ? element.toString() : "null";
            }
        }

        // Provides an iterator across all items in the container.
        public Iterator<T> iterator() {
            return new UsedNodesIterator<T>(this);
        }

        // Iterates across used nodes.
        private static class UsedNodesIterator<T> implements Iterator<T> {

            // Where next to look for the next used node.
            Node<T> it;
            int limit = 0;
            T next = null;

            public UsedNodesIterator(Container<T> c) {
                // Snapshot the head node at this time.
                it = c.head.get();
                limit = c.capacity;
            }

            public boolean hasNext() {
                // Made into a `while` loop to fix issue reported by @Nim in code review
                // Note: this spins while a node is claimed but its element is not yet attached.
                while (next == null && limit > 0) {
                    // Scan to the next non-free node.
                    while (limit > 0 && it.free.get() == true) {
                        it = it.next;
                        // Step down 1.
                        limit -= 1;
                    }
                    if (limit != 0) {
                        next = it.element;
                    }
                }
                return next != null;
            }

            public T next() {
                T n = null;
                if (hasNext()) {
                    // Give it to them.
                    n = next;
                    next = null;
                    // Step forward.
                    it = it.next;
                    limit -= 1;
                } else {
                    // Not there!!
                    throw new NoSuchElementException();
                }
                return n;
            }

            public void remove() {
                throw new UnsupportedOperationException("Not supported.");
            }
        }

        @Override
        public String toString() {
            StringBuilder s = new StringBuilder();
            Separator comma = new Separator(",");
            // Keep counts too.
            int usedCount = 0;
            int freeCount = 0;
            // I will iterate the list myself as I want to count free nodes too.
            Node<T> it = head.get();
            int count = 0;
            s.append("[");
            // Scan to the end.
            while (count < capacity) {
                // Is it in-use?
                if (it.free.get() == false) {
                    // Grab its element.
                    T e = it.element;
                    // Is it null?
                    if (e != null) {
                        // Good element.
                        s.append(comma.sep()).append(e.toString());
                        // Count them.
                        usedCount += 1;
                    } else {
                        // Probably became free while I was traversing.
                        // Because the element is detached before the entry is marked free.
                        freeCount += 1;
                    }
                } else {
                    // Free one.
                    freeCount += 1;
                }
                // Next
                it = it.next;
                count += 1;
            }
            // Decorate with counts "]used+free".
            s.append("]").append(usedCount).append("+").append(freeCount);
            if (usedCount + freeCount != capacity) {
                // Perhaps something was added/freed while we were iterating.
                s.append("?");
            }
            return s.toString();
        }
    }

Note that this is close to O(1) for put and get. A Separator just emits "" the first time around and its parameter from then on.
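The Separator class itself is not shown in this answer. Going only by the description above and by the calls the code makes (`new Separator(",")`, `sep()` and `reset()`), a minimal sketch could look like the following. This is a guess at the helper, not the author's actual class.

```java
/** Hypothetical reconstruction of the Separator helper; the original is not shown. */
final class Separator {
    private final String separator;
    // True until sep() has been called once (or again after reset()).
    private boolean first = true;

    Separator(String separator) {
        this.separator = separator;
    }

    /** Returns "" on the first call and the separator on every later call. */
    String sep() {
        if (first) {
            first = false;
            return "";
        }
        return separator;
    }

    /** Makes the next call to sep() return "" again. */
    void reset() {
        first = true;
    }
}
```

Appending `sep()` before each element therefore yields `a,b,c` with no leading or trailing comma.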

Edit: added test methods.

    // ***** Following only needed for testing. *****
    // These members sit inside the Container class and also need
    // java.io.IOException and java.util.ArrayList imported.
    // NamedFileOutput is not a JDK class and is not shown here.
    private static boolean Debug = false;
    private final static String logName = "Container.log";
    private final static NamedFileOutput log = new NamedFileOutput("C:\\Junk\\");

    private static synchronized void log(boolean toStdoutToo, String s) {
        if (Debug) {
            if (toStdoutToo) {
                System.out.println(s);
            }
            log(s);
        }
    }

    private static synchronized void log(String s) {
        if (Debug) {
            try {
                log.writeLn(logName, s);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    static volatile boolean testing = true;

    // Tester object to exercise the container.
    static class Tester<T> implements Runnable {

        // My name.
        T me;
        // The container I am testing.
        Container<T> c;

        public Tester(Container<T> container, T name) {
            c = container;
            me = name;
        }

        private void pause() {
            try {
                Thread.sleep(0);
            } catch (InterruptedException ex) {
                testing = false;
            }
        }

        public void run() {
            // Spin on add/remove until stopped.
            while (testing) {
                // Add it.
                Node<T> n = c.add(me);
                log("Added " + me + ": " + c.toString());
                pause();
                // Remove it.
                c.remove(n, me);
                log("Removed " + me + ": " + c.toString());
                pause();
            }
        }
    }

    static final String[] strings = {
        "One", "Two", "Three", "Four", "Five",
        "Six", "Seven", "Eight", "Nine", "Ten"
    };
    static final int TEST_THREADS = Math.min(10, strings.length);

    public static void main(String[] args) throws InterruptedException {
        Debug = true;
        log.delete(logName);
        Container<String> c = new Container<String>(10);

        // Simple add/remove
        log(true, "Simple test");
        Node<String> it = c.add(strings[0]);
        log("Added " + c.toString());
        c.remove(it, strings[0]);
        log("Removed " + c.toString());

        // Capacity test.
        log(true, "Capacity test");
        ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length);
        // Fill it.
        for (int i = 0; i < strings.length; i++) {
            nodes.add(i, c.add(strings[i]));
            log("Added " + strings[i] + " " + c.toString());
        }
        // Add one more.
        try {
            c.add("Wafer thin mint!");
        } catch (IllegalStateException ise) {
            log("Full!");
        }
        c.clear();
        log("Empty: " + c.toString());

        // Iterate test.
        log(true, "Iterator test");
        for (int i = 0; i < strings.length; i++) {
            nodes.add(i, c.add(strings[i]));
        }
        StringBuilder all = new StringBuilder();
        Separator sep = new Separator(",");
        for (String s : c) {
            all.append(sep.sep()).append(s);
        }
        log("All: " + all);
        for (int i = 0; i < strings.length; i++) {
            c.remove(nodes.get(i), strings[i]);
        }
        sep.reset();
        all.setLength(0);
        for (String s : c) {
            all.append(sep.sep()).append(s);
        }
        log("None: " + all.toString());

        // Multiple add/remove
        log(true, "Multi test");
        for (int i = 0; i < strings.length; i++) {
            nodes.add(i, c.add(strings[i]));
            log("Added " + strings[i] + " " + c.toString());
        }
        log("Filled " + c.toString());
        for (int i = 0; i < strings.length - 1; i++) {
            c.remove(nodes.get(i), strings[i]);
            log("Removed " + strings[i] + " " + c.toString());
        }
        c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]);
        log("Empty " + c.toString());

        // Multi-threaded add/remove
        log(true, "Threads test");
        c.clear();
        for (int i = 0; i < TEST_THREADS; i++) {
            Thread t = new Thread(new Tester<String>(c, strings[i]));
            t.setName("Tester " + strings[i]);
            log("Starting " + t.getName());
            t.start();
        }
        // Wait for 10 seconds.
        long stop = System.currentTimeMillis() + 10 * 1000;
        while (System.currentTimeMillis() < stop) {
            Thread.sleep(100);
        }
        // Stop the testers.
        testing = false;
        // Wait some more.
        Thread.sleep(1 * 100);
        // Get stats.
        double added = c.totalAdded.doubleValue();
        double skipped = c.totalSkipped.doubleValue();
        //double freed = c.freed.doubleValue();
        log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed
                + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped) / added) + ")");
    }

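I would have a look at ArrayDeque or, for a more concurrent implementation, at the Disruptor library, which is one of the most sophisticated and complex ring buffers in Java.

An alternative is to use an unbounded queue, which is more concurrent because the producer never needs to wait for the consumer: Java Chronicle

Unless your needs justify the complexity, an ArrayDeque may be all you need.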
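As a rough illustration of the ArrayDeque route: ArrayDeque is not thread-safe on its own, so the sketch below guards it with `synchronized` methods. The class name `DequeStatusKeeper` and its methods `record` and `snapshot` are made up for this example, and request ids are assumed to be plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical status keeper: holds the last N request ids in an ArrayDeque. */
final class DequeStatusKeeper {
    private final int capacity;
    private final ArrayDeque<String> requests;

    DequeStatusKeeper(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.requests = new ArrayDeque<String>(capacity);
    }

    /** Called by the web servers: drop the oldest id when full, then append. */
    synchronized void record(String requestId) {
        if (requests.size() == capacity) {
            requests.pollFirst();
        }
        requests.addLast(requestId);
    }

    /** Called by the monitor: copy under the lock, oldest id first. */
    synchronized List<String> snapshot() {
        return new ArrayList<String>(requests);
    }
}
```

With a capacity of 3, recording ABCD, XYZZ, 1234, FOO and BAR in that order leaves a snapshot of 1234, FOO, BAR, matching the example in the question. Both operations hold the lock only briefly, but this sketch does nothing special to give writers priority over the reader.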

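Maybe you want to have a look at Disruptor - Concurrent Programming Framework.

  • Find a paper describing the alternatives, the design, and a performance comparison with java.util.concurrent.ArrayBlockingQueue here: pdf
  • Consider reading the first three articles from BlogsAndArticles

If the library is too much, stick to java.util.concurrent.ArrayBlockingQueue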
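If ArrayBlockingQueue is the fallback, one way to get "last N" behaviour without ever blocking a producer is to evict the oldest entry whenever `offer` reports the queue is full. The sketch below assumes string request ids; `QueueStatusKeeper`, `record` and `snapshot` are names invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

/** Hypothetical status keeper built on a bounded ArrayBlockingQueue. */
final class QueueStatusKeeper {
    private final ArrayBlockingQueue<String> requests;

    QueueStatusKeeper(int capacity) {
        this.requests = new ArrayBlockingQueue<String>(capacity);
    }

    /** Never blocks: while the queue is full, evict the oldest id and retry. */
    void record(String requestId) {
        while (!requests.offer(requestId)) {
            requests.poll();
        }
    }

    /** Copies the current contents, oldest id first. */
    List<String> snapshot() {
        return new ArrayList<String>(requests);
    }
}
```

Under heavy contention two producers may each evict an entry before either re-offers, so a snapshot can briefly hold fewer than N ids. For an SLA report that is probably acceptable, but it is a trade-off of this sketch rather than a guarantee of the library.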

Also take a look at java.util.concurrent.

Blocking queues will block until there is something to consume or (optionally) space to produce:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html

The concurrent linked queue is non-blocking and uses a slick algorithm that allows a producer and a consumer to be active concurrently:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html
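The difference between the two can be seen in a small experiment. The class `QueueBehaviourDemo` and its method names are made up for this illustration; only standard java.util.concurrent calls are used.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo contrasting a bounded blocking queue with an unbounded one. */
final class QueueBehaviourDemo {

    /** Tries to put a second element into a full queue of capacity 1. */
    static boolean boundedAcceptsSecond() {
        BlockingQueue<String> bounded = new ArrayBlockingQueue<String>(1);
        try {
            // There is space, so put() returns immediately.
            bounded.put("first");
            // The queue is now full. A plain put() would block here until a
            // consumer takes an element; the timed offer gives up after 10 ms.
            return bounded.offer("second", 10, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Adds n elements to an unbounded queue and returns the resulting size. */
    static int unboundedSizeAfter(int n) {
        ConcurrentLinkedQueue<String> unbounded = new ConcurrentLinkedQueue<String>();
        for (int i = 0; i < n; i++) {
            // offer() on this queue never blocks and always returns true.
            unbounded.offer("request-" + i);
        }
        return unbounded.size();
    }
}
```

The bounded queue rejects the second element because nothing consumed the first, while the linked queue simply keeps growing. For a "last N" keeper, an unbounded queue therefore needs separate trimming logic.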

Hazelcast's Queue offers almost everything you ask for, but it doesn't support circularity. From your description, though, I'm not sure you actually need it.

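If it were me, I would use the CircularFIFOBuffer as you indicated, and synchronize around the buffer when writing (add). When the monitoring application wants to read the buffer, synchronize on the buffer, then copy or clone it to use for reporting.

This suggestion is predicated on the assumption that the latency of copying/cloning the buffer to a new object is minimal. If there is a large number of elements and copying is slow, then this is not a good idea.

Pseudo-code example: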

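    // Assumes buffer is the shared CircularFifoBuffer from the question.
    public void writeRequest(String requestID) {
        synchronized (buffer) {
            buffer.add(requestID);
        }
    }

    public Collection<String> getRequests() {
        synchronized (buffer) {
            // Copy under the lock; the report then works on the copy.
            return new ArrayList<String>(buffer);
        }
    }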