使用Java在线程之间管道数据

我正在编写一个模仿电影院的multithreading应用程序。 涉及的每个人都是自己的线程,并发必须完全由信号量完成。 我唯一的问题是如何基本上链接线程,以便他们可以通信(例如通过管道)。

例如:

客户[1]是一个线程,获取一个信号量,让它走向票房。 现在,客户[1]必须告诉Box Office Agent他们想要看电影“X”。 然后BoxOfficeAgent [1]也是一个线程,必须检查以确保电影未满,并要么卖票或告诉客户[1]选择另一部电影。

如何在保持与信号量的并发性的同时来回传递数据?

另外,我可以在java.util.concurrent中使用的唯一类是Semaphore类。

在线程之间来回传递数据的一种简单方法是使用位于java.util.concurrentjava.util.concurrent的接口BlockingQueue的实现。

此接口具有使用不同行为向集合添加元素的方法:

  • add(E) :尽可能添加,否则抛出exception
  • boolean offer(E) :如果已添加元素,则返回true,否则返回false
  • boolean offer(E, long, TimeUnit) :尝试添加元素,等待指定的时间
  • put(E) :阻塞调用线程,直到添加元素

它还定义了具有类似行为的元素检索方法:

  • take() :阻塞,直到有一个元素可用
  • poll(long, TimeUnit) :检索元素或返回null

我最常使用的实现是: ArrayBlockingQueueLinkedBlockingQueueSynchronousQueue

第一个是ArrayBlockingQueue ,它有一个固定的大小,由传递给它的构造函数的参数定义。

第二个是LinkedBlockingQueue,它的大小很小。 它将始终接受任何元素,即, offer将立即返回true, add将永远不会抛出exception。

第三个,也是最有趣的一个, SynchronousQueue ,正是一个管道。 您可以将其视为大小为0的队列。它永远不会保留一个元素:如果某个其他线程试图从中检索元素,则此队列只接受元素。 相反,如果有另一个线程试图推送它,则检索操作将仅返回一个元素。

为了满足专门用信号量完成同步作业要求,你可以从我给你的关于SynchronousQueue的描述中获得灵感,并写一些非常相似的东西:

 class Pipe { private E e; private final Semaphore read = new Semaphore(0); private final Semaphore write = new Semaphore(1); public final void put(final E e) { write.acquire(); this.e = e; read.release(); } public final E take() { read.acquire(); E e = this.e; write.release(); return e; } } 

请注意,此类提供与我所描述的有关SynchronousQueue的类似行为。

一旦调用了方法put(E) ,它就会获取写信号量,它将保留为空,这样对同一方法的另一次调用就会阻塞它的第一行。 然后,此方法存储对传递的对象的引用,并释放读取的信号量。 此版本将使调用take()方法的任何线程都可以继续。

然后, take()方法的第一步自然是获取读取信号量,以便禁止任何其他线程同时检索该元素。 在检索到元素并将其保存在局部变量之后( 练习:如果该行E e = this.e被删除会发生什么? ),该方法释放写信号量,以便方法put(E)可以被任何线程再次调用,并返回已保存在局部变量中的内容。

作为一个重要的评论,请注意对传递的对象的引用保存在私有字段中 ,方法take()put(E)都是最终的 。 这是至关重要的,而且常常被遗漏。 如果这些方法不是最终的(或更糟糕的是,字段不是私有的),inheritance类将能够改变take()put(E)违反合同的行为。

最后,您可以通过使用try {} finally {}来避免在take()方法中声明局部变量,如下所示:

 class Pipe { // ... public final E take() { try { read.acquire(); return e; } finally { write.release(); } } } 

在这里,这个例子的目的只是为了展示在没有经验的开发人员中没有注意到的try/finally的使用。 显然,在这种情况下,没有真正的收获。

哦,该死的,我大部分都为你完成了你的功课。 在报复中 – 并且为了测试您对信号量的了解 – 为什么不实现BlockingQueue合约定义的其他一些方法呢? 例如,您可以实现offer(E)方法和take(E, long, TimeUnit)

祝你好运。

根据具有读/写锁定的共享内存来考虑它。

  1. 创建一个缓冲区来放置消息。
  2. 应使用锁/信号量来控制对缓冲区的访问。
  3. 使用此缓冲区进行线程间通信。

问候

PKV