RxJava – 终止无限流

我正在探索反应式编程和RxJava。 这很有趣,但我遇到了一个无法找到答案的问题。 我的基本问题:什么是一种react native的方法来终止一个无限运行的Observable? 我也欢迎关于我的代码的批评和react native最佳实践。

作为练习,我正在编写一个日志文件尾实用程序。 日志文件中的行流由Observable 。 为了让BufferedReader继续读取添加到文件中的文本,我忽略了通常的reader.readLine() == null终止检查,而是将其解释为意味着我的线程应该hibernate并等待更多的记录器文本。

但是虽然我可以使用takeUntil终止Observer,但我需要找到一种干净的方法来终止无限运行的文件观察器。 我可以编写自己的terminateWatcher方法/字段,但这会破坏Observable / Observer封装 – 我希望尽可能严格遵守反应范式。

这是Observable代码:

 public class FileWatcher implements OnSubscribeFunc { private Path path = . . .; @Override // The  generic is pointless but required by the compiler public Subscription onSubscribe(Observer observer) { try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) { String newLine = ""; while (!Thread.interrupted()) { // How do I terminate this reactively? if ((newLine = reader.readLine()) != null) observer.onNext(newLine); else try { // Wait for more text Thread.sleep(250); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } observer.onCompleted(); } catch (Exception e) { observer.onError(e); } return null; // Not sure what Subscription I should return } } 

以下是Observer代码,用于打印新行:

 public static void main(String... args) { . . . Observable lines = Observable.create(createWatcher(file)); lines = lines.takeWhile(new Func1() { @Override public Boolean call(String line) { // Predicate for which to continue processing return !line.contains("shutdown"); } }).subscribeOn(Schedulers.threadPoolForIO()) .observeOn(Schedulers.currentThread()); // Seems like I should use subscribeOn() and observeOn(), but they // make my tailer terminate without reading any text. Subscription subscription = lines.subscribe(new Action1() { @Override public void call(String line) { System.out.printf("%20s\t%s\n", file, line); } }); } 

我的两个问题是:

  1. 什么是终止无限运行流的反应一致方法?
  2. 我的代码中还有哪些错误会让你哭泣? 🙂

由于您在请求订阅时正在启动文件监视,因此在订阅结束时(即Subscription关闭时)终止它是有意义的。 这解决了代码注释中的一个松散结束:您返回一个Subscription ,它告诉FileWatcher停止观看该文件。 然后替换循环条件,以便检查是否已取消订阅,而不是检查当前线程是否已被中断。

问题是,如果你的onSubscribe()方法永远不会返回, onSubscribe()没有用了。 也许您的FileWatcher应该要求指定一个调度程序,并在该调度程序上进行读取。