从普通Java事件创建Observable

从传统的Java事件模式创建Rx-Java Observable的最佳方法是什么? 也就是说,给定

 class FooEvent { ... } interface FooListener { void fooHappened(FooEvent arg); } class Bar { public void addFooListener(FooListener l); public void removeFooListener(FooListener l); } 

我想实施

 Observable fooEvents(Bar bar); 

我想出的实现是:

 Observable fooEvents(Bar bar) { return Observable.create(new OnSubscribeFunc() { public Subscription onSubscribe(Observer obs) { FooListener l = new FooListener() { public void fooHappened(FooEvent arg) { obs.onNext(arg); } }; bar.addFooListener(l); return new Subscription() { public void unsubscribe() { bar.removeFooListener(l); } }; } }); } 

但是,我真的不喜欢它:

  1. 它非常冗长;

  2. 每个Observer需要一个监听器(理想情况下,如果没有观察者,则应该没有监听器,否则应该有一个监听器)。 这可以通过将观察者计数保持为OnSubscribeFunc的字段来OnSubscribeFunc ,在订阅时递增它并在取消订阅时递减。

有更好的解决方案吗?

要求:

  1. 使用事件模式的现有实现而不更改它们(如果我控制该代码,我可以编写它以返回我需要的Observable )。

  2. 如果/当源API发生变化时,会出现编译错误。 不使用Object而不是实际的事件参数类型或使用属性名称字符串。

我认为没有办法为每个可能的事件创建一个通用的observable,但你可以在任何需要的地方使用它们。

RxJava源代码有一些方便的示例,说明如何从鼠标事件,按钮事件等创建observable。看一下这个类,它从KeyEvents创建它们: KeyEventSource.java 。

你的实现绝对正确。

它非常冗长

使用lambdas(RxJava 2的示例)变得更加冗长:

 Observable fooEvents(Bar bar) { return Observable.create(emitter -> { FooListener listener = event -> emitter.onNext(event); bar.addFooListener(listener); emitter.setCancellable(() -> bar.removeFooListener(listener)); }); } 

理想情况下,如果没有观察者,则应该没有听众,否则就应该有一个听众

您可以使用share()运算符,这会使您的observable ,即所有订阅者共享单个订阅。 它会自动订阅第一个订阅者,并取消订阅最后一个订阅者取消订阅:

 fooEvents(bar).share() 

我想你可以喝同样的汤,如果你使用另一层听众作为实际回调和你的观察者之间的桥梁,就重新加热。 Actual callback → bridge callback → Observer

优点:

  • 更线性的代码
  • 观察者之外的一个实际回调实例
  • 对于高阶函数看起来特别好,比如kotlin中的函数文字:

Ex(注意创建可观察的闭包有多小):

 class LocationService @Inject constructor(private val googleApiClient: GoogleApiClient) : ConnectionCallbacks{ val locationObservable: Observable private var passToObservable: (Location?) -> Unit = {} init { locationObservable = Observable.create { subscription -> passToObservable = { location -> subscription.onNext(location) } }.doOnSubscribe { googleApiClient.registerConnectionCallbacks(this) googleApiClient.connect() }.doOnUnsubscribe { googleApiClient.unregisterConnectionCallbacks(this) } } override fun onConnected(connectionHint: Bundle?) { val location = LocationServices.FusedLocationApi.getLastLocation(googleApiClient) passToObservable(location) } override fun onConnectionSuspended(cause: Int) { //... } } 

如果你想要一些简单和内置的东西,试试这个方法http://examples.javacodegeeks.com/core-java/beans/bean-property-change-event-listener/

 java.beans.PropertyChangeEvent; java.beans.PropertyChangeListener; java.beans.PropertyChangeSupport; 

从该网站,有一个片段,显示如何使用它

 package com.javacodegeeks.snippets.core; import java.beans.PropertyChangeEvent; import java.beans.PropertyChangeListener; import java.beans.PropertyChangeSupport; public class BeanPropertyChangeEventListener { public static void main(String[] args) throws Exception { Bean bean = new Bean(); bean.addPropertyChangeListener(new MyPropertyChangeListener()); bean.setProperty1("newProperty1"); bean.setProperty2(123); bean.setProperty1("newnewProperty1"); bean.setProperty2(234); } public static class MyPropertyChangeListener implements PropertyChangeListener { // This method is called every time the property value is changed public void propertyChange(PropertyChangeEvent evt) { System.out.println("Name = " + evt.getPropertyName()); System.out.println("Old Value = " + evt.getOldValue()); System.out.println("New Value = " + evt.getNewValue()); System.out.println("**********************************"); } } public static class Bean { private PropertyChangeSupport pcs = new PropertyChangeSupport(this); // Property property1 private String property1; // Property property2 private int property2; public String getProperty1() { return property1; } public void setProperty1(String property1) { pcs.firePropertyChange("property1", this.property1, property1); this.property1 = property1; } public int getProperty2() { return property2; } public void setProperty2(int property2) { pcs.firePropertyChange("property2", this.property2, property2); this.property2 = property2; } public void addPropertyChangeListener(PropertyChangeListener listener) { pcs.addPropertyChangeListener(listener); } } } 

这很简单