recently, when I was learning reactor, "s back pressure, I saw that there was an onBackpressureBuffer method, but it didn"t seem to work after configuring buffer. The code is as follows, given a situation where the production speed is much greater than the consumption speed, and the buffer value is small, it is expected to throw an exception, but there is no, where is the use of the wrong posture?
@Test
public void test_onBackpressureBuffer() throws InterruptedException {
Flux.interval(Duration.of(10, ChronoUnit.MILLIS)) //10
.onBackpressureBuffer(10) //buffer10
.subscribe( //
i -> {
System.out.println(Thread.currentThread().getName() + " "+i);
try {
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(e.getMessage());
}
},
System.out::println
);
TimeUnit.MINUTES.sleep(10);
}
similar logic, using RxJava2, will cause a normal exception:
Observable.interval(1, TimeUnit.MILLISECONDS)
.toFlowable(BackpressureStrategy.BUFFER)
.onBackpressureBuffer(256)
.observeOn(Schedulers.newThread())
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) {
}
});
TimeUnit.SECONDS.sleep(1);
exception information is as follows:
0
1
2
io.reactivex.exceptions.OnErrorNotImplementedException: Buffer is full
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.checkTerminated(FlowableObserveOn.java:207)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:392)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)