생성 된 목록의 각 항목에 대한 RxJava 지연
Rx에서 상당히 간단하다고 생각한 것을 구현하기 위해 고군분투하고 있습니다.
항목 목록이 있고 각 항목을 지연하여 내보내고 싶습니다.
Rx delay () 연산자는 각 개별 항목이 아닌 지정된 지연만큼 모든 항목의 방출을 이동시키는 것 같습니다.
다음은 몇 가지 테스트 코드입니다. 목록의 항목을 그룹화합니다. 각 그룹은 방출되기 전에 지연을 적용해야합니다.
Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.delay(50, TimeUnit.MILLISECONDS)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
결과는 다음과 같습니다.
154ms
[5]
155ms
[2]
155ms
[1]
155ms
[3]
155ms
[4]
그러나 내가 기대하는 것은 다음과 같습니다.
174ms
[5]
230ms
[2]
285ms
[1]
345ms
[3]
399ms
[4]
내가 도대체 뭘 잘못하고있는 겁니까?
이를 수행하는 한 가지 방법 은 출력을 지연시키기 위해 zip
Observable과 Observable을 결합하는 데 사용 하는 것입니다 Interval
.
Observable.zip(Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList()),
Observable.interval(50, TimeUnit.MILLISECONDS),
(obs, timer) -> obs)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
이를 수행하는 가장 간단한 방법은 concatMap을 사용하고 지연된 Obserable에 각 항목을 래핑하는 것입니다.
long startTime = System.currentTimeMillis();
Observable.range(1, 5)
.concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
.doOnNext(i-> System.out.println(
"Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms"))
.toCompletable().await();
인쇄물:
Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms
간격을두고 컬렉션의 각 항목을 내보내는 간단한 방법을 공유하면됩니다.
Observable.just(1,2,3,4,5)
.zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item)
.subscribe(System.out::println);
각 항목은 500 밀리 초마다 방출됩니다.
kotlin 사용자를 위해 '간격이있는 zip'접근 방식에 대한 확장 기능을 작성했습니다.
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit
fun <T> Observable<T>.delayEach(interval: Long, timeUnit: TimeUnit): Observable<T> =
Observable.zip(
this,
Observable.interval(interval, timeUnit),
BiFunction { item, _ -> item }
)
동일한 방식으로 작동하지만 재사용이 가능합니다. 예:
Observable.range(1, 5)
.delayEach(1, TimeUnit.SECONDS)
나는 그것이 정확히 당신이 필요하다고 생각합니다. 보세요 :
long startTime = System.currentTimeMillis();
Observable.intervalRange(1, 5, 0, 50, TimeUnit.MILLISECONDS)
.timestamp(TimeUnit.MILLISECONDS)
.subscribe(emitTime -> {
System.out.println(emitTime.time() - startTime);
});
다음 과 같은 사용자 지정 rx 연산자를 구현 한 MinRegularIntervalDelayOperator
다음이를 lift
함수 와 함께 사용할 수 있습니다.
Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.lift(new MinRegularIntervalDelayOperator<Integer>(50L))
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
방출되는 각 항목 사이에 지연을 도입하는 것이 유용합니다.
List<String> letters = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
Observable.fromIterable(letters)
.concatMap(item -> Observable.interval(1, TimeUnit.SECONDS)
.take(1)
.map(second -> item))
.subscribe(System.out::println);
https://github.com/ReactiveX/RxJava/issues/3505 에서 더 많은 좋은 옵션
각 그룹을 지연 시키려면 그룹 flatMap()
방출을 지연시키는 Observable을 반환하도록 변경할 수 있습니다 .
Observable
.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g ->
Observable
.timer(50, TimeUnit.MILLISECONDS)
.flatMap(t -> g.toList())
)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
그렇게 깔끔하지 않은 방법은 .delay (Func1) 연산자를 사용하여 반복으로 지연을 변경하는 것입니다.
Observable.range(1, 5)
.delay(n -> n*50)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
There is other way to do it using concatMap as concatMap returns observable of source items. so we can add delay on that observable.
here what i have tried.
Observable.range(1, 5)
.groupBy(n -> n % 5)
.concatMap(integerIntegerGroupedObservable ->
integerIntegerGroupedObservable.delay(2000, TimeUnit.MILLISECONDS))
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
You can use
Observable.interval(1, TimeUnit.SECONDS)
.map(new Function<Long, Integer>() {
@Override
public Integer apply(Long aLong) throws Exception {
return aLong.intValue() + 1;
}
})
.startWith(0)
.take(listInput.size())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer index) throws Exception {
Log.d(TAG, "---index of your list --" + index);
}
});
This code above not duplicate value(index). "I'm sure"
I think you want this:
Observable.range(1, 5)
.delay(50, TimeUnit.MILLISECONDS)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
This way it will delay the numbers going into the group rather than delaying the reduced list by 5 seconds.
You can add a delay between emitted items by using flatMap, maxConcurrent and delay()
Here is an example - emit 0..4 with delay
@Test
fun testEmitWithDelays() {
val DELAY = 500L
val COUNT = 5
val latch = CountDownLatch(1)
val startMoment = System.currentTimeMillis()
var endMoment : Long = 0
Observable
.range(0, COUNT)
.flatMap( { Observable.just(it).delay(DELAY, TimeUnit.MILLISECONDS) }, 1) // maxConcurrent = 1
.subscribe(
{ println("... value: $it, ${System.currentTimeMillis() - startMoment}") },
{},
{
endMoment = System.currentTimeMillis()
latch.countDown()
})
latch.await()
assertTrue { endMoment - startMoment >= DELAY * COUNT }
}
... value: 0, 540
... value: 1, 1042
... value: 2, 1544
... value: 3, 2045
... value: 4, 2547
you should be able to achieve this by using Timer
operator. I tried with delay
but couldn't achieve the desired output. Note nested operations done in flatmap
operator.
Observable.range(1,5)
.flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS)
.map(y -> x))
// attach timestamp
.timestamp()
.subscribe(timedIntegers ->
Log.i(TAG, "Timed String: "
+ timedIntegers.value()
+ " "
+ timedIntegers.time()));
Observable.just("A", "B", "C", "D", "E", "F")
.flatMap { item -> Thread.sleep(2000)
Observable.just( item ) }
.subscribe { println( it ) }
ReferenceURL : https://stackoverflow.com/questions/33291245/rxjava-delay-for-each-item-of-list-emitted
'IT story' 카테고리의 다른 글
% 및 _와 같은 SQLite (0) | 2020.12.28 |
---|---|
장고 모델 : ID 목록 가져 오기 (0) | 2020.12.28 |
선택한 항목 (ListView에서)이 ContextMenu (Android)를 생성했는지 감지 (0) | 2020.12.28 |
Yarn에서 글로벌 패키지를 업데이트하려면 어떻게해야합니까? (0) | 2020.12.25 |
RESTful 서비스를 디버깅하는 방법은 무엇입니까? (0) | 2020.12.25 |