如何在将数据发送到另一个应用程序时实施重试策略?

我正在处理我的应用程序,它将数据发送到zeromq 。 以下是我的应用程序:

  • 我有一个SendToZeroMQ类, SendToZeroMQ数据发送到zeromq。
  • 将相同的数据添加到同一个类中的retryQueue ,以便以后可以在未收到确认的情况下重试。 它使用带有maximumSize限制的guava缓存。
  • 有一个单独的线程从zeromq接收先前发送的数据的确认,如果没有收到确认,则SendToZeroMQ将重试发送相同的数据。 如果收到确认,那么我们将从retryQueue删除它,以便不能再次重试。

想法非常简单,我必须确保我的重试策略正常工作,这样我就不会丢失数据。 这是非常罕见的,但如果我们没有收到acknolwedgements。

我正在考虑构建两种类型的RetryPolicies但我无法理解如何在这里构建与我的程序相对应的:

  • RetryNTimes:在此期间,它将在每次重试之间以特定睡眠重试N次,之后,它将删除记录。
  • ExponentialBackoffRetry:在此它将指数级地继续重试。 我们可以设置一些最大重试限制,然后它将不会重试并将丢弃记录。

下面是我的SendToZeroMQ类,它将数据发送到zeromq,也从后台线程每30秒重试一次,并启动可持续运行的ResponsePoller runnable:

 public class SendToZeroMQ { private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); private final Cache retryQueue = CacheBuilder .newBuilder() .maximumSize(10000000) .concurrencyLevel(200) .removalListener( RemovalListeners.asynchronous(new CustomListener(), executorService)).build(); private static class Holder { private static final SendToZeroMQ INSTANCE = new SendToZeroMQ(); } public static SendToZeroMQ getInstance() { return Holder.INSTANCE; } private SendToZeroMQ() { executorService.submit(new ResponsePoller()); // retry every 30 seconds for now executorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { for (Entry entry : retryQueue.asMap().entrySet()) { sendTo(entry.getKey(), entry.getValue()); } } }, 0, 30, TimeUnit.SECONDS); } public boolean sendTo(final long address, final byte[] encodedRecords) { Optional liveSockets = PoolManager.getInstance().getNextSocket(); if (!liveSockets.isPresent()) { return false; } return sendTo(address, encodedRecords, liveSockets.get().getSocket()); } public boolean sendTo(final long address, final byte[] encodedByteArray, final Socket socket) { ZMsg msg = new ZMsg(); msg.add(encodedByteArray); boolean sent = msg.send(socket); msg.destroy(); // adding to retry queue retryQueue.put(address, encodedByteArray); return sent; } public void removeFromRetryQueue(final long address) { retryQueue.invalidate(address); } } 

下面是我的ResponsePoller类,它会轮询来自zeromq的所有确认。 如果我们从zeromq得到一个确认,那么我们将从重试队列中删除该记录,以便它不会被重试,否则它将被重试。

 public class ResponsePoller implements Runnable { private static final Random random = new Random(); @Override public void run() { ZContext ctx = new ZContext(); Socket client = ctx.createSocket(ZMQ.PULL); String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt()); client.setIdentity(identity.getBytes(ZMQ.CHARSET)); client.bind("tcp://" + TestUtils.getIpaddress() + ":8076"); PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)}; while (!Thread.currentThread().isInterrupted()) { // Tick once per second, pulling in arriving messages for (int centitick = 0; centitick < 100; centitick++) { ZMQ.poll(items, 10); if (items[0].isReadable()) { ZMsg msg = ZMsg.recvMsg(client); Iterator it = msg.iterator(); while (it.hasNext()) { ZFrame frame = it.next(); try { long address = TestUtils.getAddress(frame.getData()); // remove from retry queue since we got the acknowledgment for this record SendToZeroMQ.getInstance().removeFromRetryQueue(address); } catch (Exception ex) { // log error } finally { frame.destroy(); } } msg.destroy(); } } } ctx.destroy(); } } 

题:

如上所示,我使用SendToZeroMQ类将encodedRecords发送到zeromq,然后每30秒重试一次,具体取决于我们是否从ResponsePoller类返回了acknolwedgement。

对于每个encodedRecords都有一个名为address的唯一键,这就是我们将从zeromq作为确认回复的内容。

我如何继续并扩展此示例以构建我上面提到的两个重试策略,然后我可以选择在发送数据时要使用的重试策略。 我提出了以下界面,但后来我无法理解我应该如何继续实施这些重试策略并在上面的代码中使用它。

 public interface RetryPolicy { /** * Called when an operation has failed for some reason. This method should return * true to make another attempt. */ public boolean allowRetry(int retryCount, long elapsedTimeMs); } 

我可以在这里使用guava-retrying或failsafe ,因为这些库已经有许多我可以使用的重试策略吗?

我无法计算出有关如何使用相关API的所有细节,但对于算法,您可以尝试:

  • 重试策略需要在每条消息上附加某种状态(至少重试当前消息的次数,可能是当前的延迟)。 您需要确定RetryPolicy是应该保留它自己还是要将其存储在消息中。
  • 而不是allowRetry,你可以有一个方法计算下次重试应该发生的时间(绝对时间或未来的毫秒数),这将是上述状态的函数
  • 重试队列应包含有关何时应重试每条消息的信息。
  • 而不是使用scheduleAtFixedRate ,在重试队列中找到具有最低when_is_next_retry (可能通过对绝对重试时间戳进行排序并选择第一个),并让executorService使用scheduletime_to_next_retry重新安排自己。
  • 对于每次重试,将其从重试队列中拉出,发送消息,使用RetryPolicy计算下次重试的时间(如果要重试),并使用when_is_next_retry的新值插入重试队列(如果RetryPolicy返回-1,这可能意味着不再重试该消息)

不是一个完美的方式,但也可以通过以下方式实现。

 public interface RetryPolicy { public boolean allowRetry(); public void decreaseRetryCount(); 

}

创建两个实现。 对于RetryNTimes

 public class RetryNTimes implements RetryPolicy { private int maxRetryCount; public RetryNTimes(int maxRetryCount) { this.maxRetryCount = maxRetryCount; } public boolean allowRetry() { return maxRetryCount > 0; } public void decreaseRetryCount() { maxRetryCount = maxRetryCount-1; }} 

对于ExponentialBackoffRetry

 public class ExponentialBackoffRetry implements RetryPolicy { private int maxRetryCount; private final Date retryUpto; public ExponentialBackoffRetry(int maxRetryCount, Date retryUpto) { this.maxRetryCount = maxRetryCount; this.retryUpto = retryUpto; } public boolean allowRetry() { Date date = new Date(); if(maxRetryCount <= 0 || date.compareTo(retryUpto)>=0) { return false; } return true; } public void decreaseRetryCount() { maxRetryCount = maxRetryCount-1; }} 

您需要在SendToZeroMQ类中进行一些更改

 public class SendToZeroMQ { private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); private final Cache retryQueue = CacheBuilder .newBuilder() .maximumSize(10000000) .concurrencyLevel(200) .removalListener( RemovalListeners.asynchronous(new CustomListener(), executorService)).build(); private static class Holder { private static final SendToZeroMQ INSTANCE = new SendToZeroMQ(); } public static SendToZeroMQ getInstance() { return Holder.INSTANCE; } private SendToZeroMQ() { executorService.submit(new ResponsePoller()); // retry every 30 seconds for now executorService.scheduleAtFixedRate(new Runnable() { public void run() { for (Map.Entry entry : retryQueue.asMap().entrySet()) { RetryMessage retryMessage = entry.getValue(); if(retryMessage.getRetryPolicy().allowRetry()) { retryMessage.getRetryPolicy().decreaseRetryCount(); entry.setValue(retryMessage); sendTo(entry.getKey(), retryMessage.getMessage(),retryMessage); }else { retryQueue.asMap().remove(entry.getKey()); } } } }, 0, 30, TimeUnit.SECONDS); } public boolean sendTo(final long address, final byte[] encodedRecords, RetryMessage retryMessage) { Optional liveSockets = PoolManager.getInstance().getNextSocket(); if (!liveSockets.isPresent()) { return false; } if(null==retryMessage) { RetryPolicy retryPolicy = new RetryNTimes(10); retryMessage = new RetryMessage(retryPolicy,encodedRecords); retryQueue.asMap().put(address,retryMessage); } return sendTo(address, encodedRecords, liveSockets.get().getSocket()); } public boolean sendTo(final long address, final byte[] encodedByteArray, final ZMQ.Socket socket) { ZMsg msg = new ZMsg(); msg.add(encodedByteArray); boolean sent = msg.send(socket); msg.destroy(); return sent; } public void removeFromRetryQueue(final long address) { retryQueue.invalidate(address); }} 

你可以使用apache camel 。 它为zeromq提供了一个组件,并且本机提供了诸如errohandler,redeliverypolicy,deadletter channel等工具。

这是一个有关环境的小模拟,显示了如何完成此操作。 注意,这里的Guava缓存是错误的数据结构,因为你对驱逐不感兴趣(我认为)。 所以我使用的是并发hashmap:

 package experimental; import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.util.Arrays; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; class Experimental { /** Return the desired backoff delay in millis for the given retry number, which is 1-based. */ interface RetryStrategy { long getDelayMs(int retry); } enum ConstantBackoff implements RetryStrategy { INSTANCE; @Override public long getDelayMs(int retry) { return 1000L; } } enum ExponentialBackoff implements RetryStrategy { INSTANCE; @Override public long getDelayMs(int retry) { return 100 + (1L << retry); } } static class Sender { private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4); private final ConcurrentMap pending = new ConcurrentHashMap<>(); /** Send the given data with given address on the given socket. */ void sendTo(long addr, byte[] data, int socket) { System.err.println("Sending " + Arrays.toString(data) + "@" + addr + " on " + socket); } private class Retrier implements Runnable { private final RetryStrategy retryStrategy; private final long addr; private final byte[] data; private final int socket; private int retry; private Future future; Retrier(RetryStrategy retryStrategy, long addr, byte[] data, int socket) { this.retryStrategy = retryStrategy; this.addr = addr; this.data = data; this.socket = socket; this.retry = 0; } synchronized void start() { if (future == null) { future = executorService.submit(this); pending.put(addr, this); } } synchronized void cancel() { if (future != null) { future.cancel(true); future = null; } } private synchronized void reschedule() { if (future != null) { future = executorService.schedule(this, retryStrategy.getDelayMs(++retry), MILLISECONDS); } } @Override synchronized public void run() { sendTo(addr, data, socket); reschedule(); } } long getVerifiedAddr() { System.err.println("Pending messages: " + pending.size()); Iterator i = pending.keySet().iterator(); long addr = i.hasNext() ? i.next() : 0; return addr; } class CancellationPoller implements Runnable { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { try { Thread.sleep(1000); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } long addr = getVerifiedAddr(); if (addr == 0) { continue; } System.err.println("Verified message (to be cancelled) " + addr); Retrier retrier = pending.remove(addr); if (retrier != null) { retrier.cancel(); } } } } Sender initialize() { executorService.submit(new CancellationPoller()); return this; } void sendWithRetriesTo(RetryStrategy retryStrategy, long addr, byte[] data, int socket) { new Retrier(retryStrategy, addr, data, socket).start(); } } public static void main(String[] args) { Sender sender = new Sender().initialize(); for (long i = 1; i <= 10; i++) { sender.sendWithRetriesTo(ConstantBackoff.INSTANCE, i, null, 42); } for (long i = -1; i >= -10; i--) { sender.sendWithRetriesTo(ExponentialBackoff.INSTANCE, i, null, 37); } } }