正确的设计在阿卡。 – 消息传递

我已经阅读了一些关于akka如何以及为何不保证邮件传递的post。 文档 ,本次讨论和关于小组的其他讨论确实很好地解释了它。

我对akka很新,并希望知道一个案例的合适设计。 例如,假设我在不同的机器上有3个不同的演员。 一个是烹饪书籍,另一个是历史,最后一个是技术书籍。

我在另一台机器上有一个主演员。 假设有一个查询主要参与者搜索我们是否有一本书可用。 主actor向3个远程actor发送请求,并期望结果。 所以我这样做:

val scatter = system.actorOf( Props[SearchActor].withRouter(ScatterGatherFirstCompletedRouter( routees=someRoutees, within = 10 seconds)), "router") implicit val timeout = Timeout(10 seconds) val futureResult = scatter ? Text("Concurrency in Practice") // What should I do here?. //val result = Await.result(futureResult, timeout.duration) line(a) 

简而言之,我已经向所有3个远程演员发送了请求,并期望在10秒内得到结果。

应该采取什么行动?

  1. 假设我在10秒钟内没有得到结果,我是否应该再次向所有人发送新请求?
  2. 如果within上述时间within是不成熟的。 但我不知道预先花了多少时间。
  3. 如果within时间足够但信息被删除了怎么办?

如果我没有within时间within得到回复并再次重新发送请求。 像这样的东西,它仍然是异步的:

 futureResult onComplete{ case Success(i) => println("Result "+i) case Failure(e) => //send again } 

但是在太多的问题下,通话中的线程太多而且体积庞大? 如果我取消注释line(a) ,它将变为同步,并且在负载下可能会执行不良。

说我10秒内没有得到回复。 如果within一段时间within过早,那么它将再次发生无用的大量计算。 如果消息掉落,那么浪费了10秒的宝贵时间。 如果说我知道消息已经传递,我可能会等待更长的时间而不会持怀疑态度。

人们如何解决这些问题? ACK ? 但后来我必须将状态存储在所有查询的actor中。 它一定是常见的,我正在寻找合适的设计。

我将尝试为您解答其中的一些问题。 我不会对所有事情都有具体的答案,但希望我能引导你朝着正确的方向前进。

对于初学者,您需要更改如何将请求传达给进行图书搜索的3位演员。 在这里使用ScatterGatherFirstCompletedRouter可能不是正确的方法。 此路由器只会等待其中一个路由(第一个响应)的回答,因此您的结果集将不完整,因为它不包含其他2个路由的结果。 还有一个BroadcastRouter ,但它不适合你的需要,因为它只处理tell (!)而不是ask (?) 。 要做你想做的事,一个选择是将请求发送到每个接收者,获取响应的Futures ,然后使用Future.sequence将它们组合成一个聚合Future 。 简化示例可能如下所示:

 case class SearchBooks(title:String) case class Book(id:Long, title:String) class BookSearcher extends Actor{ def receive = { case req:SearchBooks => val routees:List[ActorRef] = ...//Lookup routees here implicit val timeout = Timeout(10 seconds) implicit val ec = context.system.dispatcher val futures = routees.map(routee => (routee ? req).mapTo[List[Book]]) val fut = Future.sequence(futures) val caller = sender //Important to not close over sender fut onComplete{ case Success(books) => caller ! books.flatten case Failure(ex) => caller ! Status.Failure(ex) } } } 

现在这不是我们的最终代码,但它是你的样本试图做的近似值。 在此示例中,如果下游路由中的任何一个失败/超时,我们将点击我们的Failure块,并且调用者也将失败。 如果它们都成功,则调用者将获得聚合的Book对象列表。

现在谈谈你的问题。 首先,如果你没有在超时时间内从其中一个路线得到答案,你会问你是否应该再次向所有演员发送请求。 这个问题的答案真的取决于你。 您是否允许另一端的用户看到部分结果(即3个演员中的2个的结果),还是每次都必须是完整的结果集? 如果答案是肯定的,您可以调整发送给路由器的代码,如下所示:

 val futures = routees.map(routee => (routee ? req).mapTo[List[Book]].recover{ case ex => //probably log something here List() }) 

使用此代码,如果任何路由因任何原因超时或失败,则会将“Book”的空列表替换为响应而不是失败。 现在,如果您无法获得部分结果,那么您可以再次重新发送整个请求,但是您必须记住,另一端可能有人在等待他们的图书结果并且他们不想永远等待。

对于你的第二个问题,你问你的超时是否为时尚早? 您选择的超时值将完全取决于您,但它很可能应基于两个因素。 第一个因素将来自测试搜索的通话时间。 找出平均需要多长时间,并选择一个基于具有一点缓冲的值,以确保安全。 第二个因素是另一方的人愿意等待他们的结果多久。 你可以在你的超时时间保持非常保守,让它像60秒一样安全,但如果确实有人在另一端等待结果,他们愿意等多久? 我宁愿得到一个失败的回应,表明我应该再试一次,而不是永远等待。 因此,考虑到这两个因素,您应该选择一个值,使您能够在很长的时间内获得响应,同时仍然不会让另一端的调用者等待太长时间。

对于问题3,您可以询问如果邮件被删除会发生什么。 在这种情况下,我猜测接收该消息的人的未来将暂停,因为它不会得到响应,因为接收方actor将永远不会收到要响应的消息。 Akka不是JMS; 它没有确认模式,如果收件人没有收到并确认消息,则可以多次重发该消息。

另外,从我的示例中可以看出,我同意不使用Await阻止聚合Future 。 我更喜欢使用非阻塞回调。 阻塞接收函数并不理想,因为Actor实例将停止处理其邮箱,直到阻塞操作完成。 通过使用非阻塞回调,您可以释放该实例以返回处理其邮箱,并允许处理结果只是在ExecutionContext另一个作业,与处理其邮箱的actor分离。

现在,如果你真的不想在网络不可靠时浪费通信,你可以查看Akka 2.2中提供的Reliable Proxy 。 如果您不想使用此路由,可以通过定期向路由发送ping类型消息来自行滚动它。 如果没有及时响应,则将其标记为关闭并且不向其发送消息,直到您可以获得可靠(在非常短的时间内) ping它,有点像每个被保护者的FSM。 如果你绝对需要这种行为,其中任何一个都可以工作,但你需要记住,这些解决方案增加了复杂性,只有在你绝对需要这种行为时才应该使用。 如果您正在开发银行软件并且您绝对需要有保证的交付语义,否则会产生不良的财务影响,无论如何都要采用这种方法。 只是明智地决定你是否需要这样的东西,因为我有90%的时间不打赌。 在你的模型中,唯一可能受到等待某些你可能已经知道的东西不会成功的人就是另一端的来电者。 通过在actor中使用非阻塞回调,它不会因某些事情可能需要很长时间而停止; 它已经进入下一条消息。 如果您决定在失败时重新提交,也需要小心。 您不希望泛滥接收actor的邮箱。 如果您决定重新发送,请按固定次数限制。

如果您需要这些保证类型的语义,另一种可能的方法可能是查看Akka的聚类模型 。 如果您对下游路由进行了群集,并且其中一个服务器出现故障,那么所有流量都将路由到仍在运行的节点,直到该另一个节点恢复为止。