1

Хочу подписаться на ответ, полученный на запрос из сети.

Допустим у меня есть такой интерфейс:

interface Net {
    // Посылает запрос в сеть, а когда приходит ответ
    // дергает callback в своем сетевом потоке
    void sendRequestGetResponse(String request, Consumer<String> responseCallback);
}

Но я не хочу обрабатывать данные в сетевом потоке, мне надо обрабатывать их в нужном потоке, например в моем ThreadPoolExecutor.

И хочется сделать что-то такое:

Observable<String> observable = <как то создали свой observable, я не знаю как>;

// Подписались на обработку ответа в нужном executor
observable.subscribeOn(Schedulers.from(executor)).subscribe(response -> {
    System.out.println("received response " + response);
});

net.sendRequestGetResponse("hello!", response -> {
    // Получили ответ и положили его в observable
    observable.push(response);
});

Как это правильно делается с помощью RxJava?

  • Судя по всему, вам нужен observeOn. События будут доставляться туда, куда вы скажете. – VladD Jun 10 '16 at 18:29
  • Я догадываюсь что нужно что-то такое, и даже попытался это сделать с помощью observable.subscribeOn(Schedulers.from(executor)).

    Мне непонятно, как полученный в сетевом потоке response засунуть в мой observable.

    – Натрий Хлорий Jun 10 '16 at 18:33
  • 1
    Не, вам нужен именно observeOn. subscribeOn производит подписку в потоке, который вы указали. А observeOn доставляет события в поток, который вы указали. – VladD Jun 10 '16 at 18:40
  • Ах, вот оно что. Спасибо, пусть будет observeOn. Но как все таки ответ из сетевого calback переложить? – Натрий Хлорий Jun 10 '16 at 18:43
  • Ну, observable.observeOn(<куда-вы-там-хотите-переложить>).subscribe(response -> { System.out.println("received response " + response); });, судя по всему. (У меня компилятора нет под рукой, чтобы проверить, так что помогайте.) – VladD Jun 10 '16 at 18:45
  • В качестве первоначального Observable возьмите хоть Subject (наверняка есть более идиоматический путь, но и Subject покатит) – VladD Jun 10 '16 at 18:49
  • А, я кажется начинаю врубаться. Попробую написать ответ к своему вопросу. Но вообще что-то голова идет кругом от изучения этого rx. – Натрий Хлорий Jun 10 '16 at 19:04

1 Answers1

1

После некоторого обдумывания кажется понял, как это делать:

net.sendRequestGetResponse("hello!", response -> {

    Observable.just(response).
            observeOn(Schedulers.from(executor)).
            subscribe(response1 -> {
                System.out.println("received response " + response1 +
                    ", thread = " + Thread.currentThread().getName());
            });
});

По крайней мере тест напечал то что требуется.

  • А если вместо Observable.just всё же взять всё же один subject? – VladD Jun 10 '16 at 19:48
  • Я попробовал заменить на PublishSubject.create() - тоже работает. – Натрий Хлорий Jun 10 '16 at 22:22
  • Идея в том, чтобы вы создали Subject один раз. Тогда sendRequestGetResponse будет в него плеваться сообщениями, а подписка и приём может проходить в другой части программы. – VladD Jun 10 '16 at 22:26
  • Да, так тоже работает. Я вынес PublishSubject.create() наружу и кладу в него ответы в sendRequestGetResponse. Подписка ловит. – Натрий Хлорий Jun 11 '16 at 07:08