[cxf] branch master updated: CXF-8349: Flux Returns Mixed Response on Errors and Mono Hangs when Empty (#705)

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[cxf] branch master updated: CXF-8349: Flux Returns Mixed Response on Errors and Mono Hangs when Empty (#705)

reta
This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new 57bb279  CXF-8349: Flux Returns Mixed Response on Errors and Mono Hangs when Empty (#705)
57bb279 is described below

commit 57bb279cb8404e81369e1327c436578937984e23
Author: Andriy Redko <[hidden email]>
AuthorDate: Sat Oct 10 11:06:07 2020 -0400

    CXF-8349: Flux Returns Mixed Response on Errors and Mono Hangs when Empty (#705)
---
 .../server/ResponseStatusOnlyException.java}       |  39 ++++---
 .../server/ResponseStatusOnlyExceptionMapper.java  |  44 ++++++++
 .../server/StreamingAsyncSubscriber.java           |   5 +-
 .../jaxrs/reactor/server/ReactorCustomizer.java    |   3 +
 .../cxf/jaxrs/reactor/server/ReactorInvoker.java   |  23 ++--
 .../cxf/systest/jaxrs/reactor/FluxReactorTest.java | 120 +++++++++++++++++++++
 .../cxf/systest/jaxrs/reactor/FluxService.java     |  51 +++++++++
 .../jaxrs/reactor/IllegalStateExceptionMapper.java |  27 +++--
 .../cxf/systest/jaxrs/reactor/MonoReactorTest.java |  76 +++++++------
 .../cxf/systest/jaxrs/reactor/MonoService.java     |   9 ++
 .../cxf/systest/jaxrs/reactor/ReactorServer.java   |   1 +
 11 files changed, 328 insertions(+), 70 deletions(-)

diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/ResponseStatusOnlyException.java
similarity index 52%
copy from rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
copy to rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/ResponseStatusOnlyException.java
index a57b574..37b0299 100644
--- a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
+++ b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/ResponseStatusOnlyException.java
@@ -16,21 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.cxf.jaxrs.reactor.server;
 
-import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
-import org.apache.cxf.service.invoker.Invoker;
+package org.apache.cxf.jaxrs.reactivestreams.server;
 
-public class ReactorCustomizer extends AbstractStreamingResponseExtension {
-    @Override
-    protected Invoker createInvoker(JAXRSServerFactoryBean bean) {
-        Boolean useStreamingSubscriber = (Boolean)bean.getProperties(true)
-                .getOrDefault("useStreamingSubscriber", null);
-        ReactorInvoker invoker = new ReactorInvoker();
-        if (useStreamingSubscriber != null) {
-            invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
-        }
-        return invoker;
+/**
+ * Used in cases when the stream has already emitted some elements but then an error
+ * was encountered. If that happens, the error payload cannot be returned (it would lead to
+ * a mixed stream of elements and error); only the response status can be set.
+ */
+class ResponseStatusOnlyException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    ResponseStatusOnlyException() {
+        super();
+    }
+
+    ResponseStatusOnlyException(String message) {
+        super(message);
+    }
+
+    ResponseStatusOnlyException(String message, Throwable cause) {
+        super(message, cause);
+    }
+    
+    ResponseStatusOnlyException(Throwable cause) {
+        super(cause);
     }
-}
\ No newline at end of file
+}
diff --git a/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/ResponseStatusOnlyExceptionMapper.java b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/ResponseStatusOnlyExceptionMapper.java
new file mode 100644
index 0000000..0b1179e
--- /dev/null
+++ b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/ResponseStatusOnlyExceptionMapper.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.reactivestreams.server;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.cxf.jaxrs.utils.ExceptionUtils;
+import org.apache.cxf.jaxrs.utils.JAXRSUtils;
+import org.apache.cxf.message.Message;
+
+@Provider
+public class ResponseStatusOnlyExceptionMapper implements ExceptionMapper<ResponseStatusOnlyException> {
+    @Override
+    public Response toResponse(ResponseStatusOnlyException exception) {
+        final Message message = JAXRSUtils.getCurrentMessage();
+        final Throwable cause = exception.getCause();
+        final Response response = ExceptionUtils.convertFaultToResponse(cause, message);
+        
+        if (response != null) {
+            return Response.fromResponse(response).entity(null).build();
+        } else {
+            return Response.serverError().build();
+        }
+    }
+}
diff --git a/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/StreamingAsyncSubscriber.java b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/StreamingAsyncSubscriber.java
index b95c4b8..680de9d 100644
--- a/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/StreamingAsyncSubscriber.java
+++ b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/StreamingAsyncSubscriber.java
@@ -124,7 +124,10 @@ public class StreamingAsyncSubscriber<T> extends AbstractSubscriber<T> {
             }
 
             if (throwable != null) {
-                if (throwable instanceof RuntimeException) {
+                // non-empty stream
+                if (firstWriteDone.get()) {
+                    throw new ResponseStatusOnlyException(throwable);
+                } else if (throwable instanceof RuntimeException) {
                     throw (RuntimeException)throwable;
                 } else if (throwable instanceof IOException) {
                     throw (IOException)throwable;
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
index a57b574..33ec37b 100644
--- a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
@@ -20,6 +20,7 @@ package org.apache.cxf.jaxrs.reactor.server;
 
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
+import org.apache.cxf.jaxrs.reactivestreams.server.ResponseStatusOnlyExceptionMapper;
 import org.apache.cxf.service.invoker.Invoker;
 
 public class ReactorCustomizer extends AbstractStreamingResponseExtension {
@@ -31,6 +32,8 @@ public class ReactorCustomizer extends AbstractStreamingResponseExtension {
         if (useStreamingSubscriber != null) {
             invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
         }
+        
+        bean.setProvider(new ResponseStatusOnlyExceptionMapper());
         return invoker;
     }
 }
\ No newline at end of file
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
index 82ad81a..43c975c 100644
--- a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
@@ -18,6 +18,8 @@
  */
 package org.apache.cxf.jaxrs.reactor.server;
 
+import java.util.Collections;
+
 import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
 import org.apache.cxf.message.Message;
@@ -32,20 +34,29 @@ public class ReactorInvoker extends AbstractReactiveInvoker {
             final Flux<?> flux = (Flux<?>) result;
             final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
             if (!isStreamingSubscriberUsed(flux, asyncResponse, inMessage)) {
-                flux.doOnNext(asyncResponse::resume)
-                    .doOnError(t -> handleThrowable(asyncResponse, t))
-                    .subscribe();
+                subscribe(flux, asyncResponse);
             }
             return asyncResponse;
         } else if (result instanceof Mono) {
             final Mono<?> mono = (Mono<?>) result;
             final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
-            mono.doOnNext(asyncResponse::resume)
-                .doOnError(t -> handleThrowable(asyncResponse, t))
-                .subscribe();
+            subscribe(mono, asyncResponse);
             return asyncResponse;
         }
         return null;
     }
+
+    private void subscribe(final Mono<?> mono, final AsyncResponseImpl asyncResponse) {
+        mono.doOnSuccess(asyncResponse::resume)
+            .doOnError(t -> handleThrowable(asyncResponse, t))
+            .subscribe();
+    }
+
+    private <T> void subscribe(final Flux<T> flux, final AsyncResponseImpl asyncResponse) {
+        flux.doOnNext(asyncResponse::resume)
+            .switchIfEmpty(Mono.<T>empty().doOnSuccess(v -> asyncResponse.resume(Collections.emptyList())))
+            .doOnError(t -> handleThrowable(asyncResponse, t))
+            .subscribe();
+    }
     
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
index 4abe541..b5e6212 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
@@ -19,7 +19,10 @@
 
 package org.apache.cxf.systest.jaxrs.reactor;
 
+import java.util.List;
+
 import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
@@ -43,6 +46,7 @@ public class FluxReactorTest extends AbstractBusClientServerTestBase {
         assertTrue("server did not launch correctly", launchServer(ReactorServer.class, true));
         createStaticBus();
     }
+    
     @Test
     public void testGetHelloWorldJson() throws Exception {
         String address = "http://localhost:" + PORT + "/reactor/flux/textJson";
@@ -59,6 +63,27 @@ public class FluxReactorTest extends AbstractBusClientServerTestBase {
             .expectComplete()
             .verify();
     }
+    
+    @Test
+    public void testGetHelloWorldJsonMany() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor2/flux/textJsonMany";
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .getFlux(HelloWorldBean.class))
+            .expectNextMatches(bean -> bean.getGreeting().equals("Hello 1") && bean.getAudience().equals("World"))
+            .expectNextMatches(bean -> bean.getGreeting().equals("Hello 2") && bean.getAudience().equals("World"))
+            .expectNextMatches(bean -> bean.getGreeting().equals("Hello 3") && bean.getAudience().equals("World"))
+            .expectNextMatches(bean -> bean.getGreeting().equals("Hello 4") && bean.getAudience().equals("World"))
+            .expectNextMatches(bean -> bean.getGreeting().equals("Hello 5") && bean.getAudience().equals("World"))
+            .expectComplete()
+            .verify();
+    }
 
     @Test
     public void testTextJsonImplicitListAsyncStream() throws Exception {
@@ -73,6 +98,23 @@ public class FluxReactorTest extends AbstractBusClientServerTestBase {
     
     @Test
     public void testFluxEmpty() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor/flux/empty";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .getFlux(HelloWorldBean.class))
+            .expectComplete()
+            .verify();
+    }
+    
+    @Test
+    public void testFluxEmpty2() throws Exception {
         String address = "http://localhost:" + PORT + "/reactor2/flux/empty";
         
         StepVerifier
@@ -158,6 +200,24 @@ public class FluxReactorTest extends AbstractBusClientServerTestBase {
             .expectError()
             .verify();
     }
+    
+    @Test
+    public void testFluxImmediateErrorsWithExceptionMapper() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor2/flux/immediate/mapper/errors";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .get())
+            .expectNextMatches(r -> r.getStatus() == 409 && r.readEntity(String.class).contains("stackTrace"))
+            .expectComplete()
+            .verify();
+    }
 
     @Test
     public void testFluxImmediateErrorsResponse() throws Exception {
@@ -176,6 +236,66 @@ public class FluxReactorTest extends AbstractBusClientServerTestBase {
             .expectComplete()
             .verify();
     }
+    
+    @Test
+    public void testFluxErrorWithExceptionMapperReturnsContentPayload() throws Exception {
+        GenericType<List<HelloWorldBean>> helloWorldBeanListType = new GenericType<List<HelloWorldBean>>() {  };
+        String address = "http://localhost:" + PORT + "/reactor2/flux/mixed/error";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .get())
+            // The response should include the elements emitted prior to the error
+            .expectNextMatches(r -> r.getStatus() == 409 && r.readEntity(helloWorldBeanListType).size() == 4)
+            .expectComplete()
+            .verify();
+    }
+    
+    @Test
+    public void testFluxErrorWithWebException() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor2/flux/web/errors";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .get())
+            // The response should not include the exception payload (injected by exception mapper)
+            // if some elements have been emitted before
+            .expectNextMatches(r -> r.getStatus() == 403 && !r.readEntity(String.class).contains("stackTrace"))
+            .expectComplete()
+            .verify();
+    }
+
+    @Test
+    public void testFluxErrorWithExceptionMapperReturnsNoExceptionPayload() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor2/flux/mixed/error";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .get())
+            // The response should not include the exception payload (injected by exception mapper)
+            // if some elements have been emitted before
+            .expectNextMatches(r -> r.getStatus() == 409 && !r.readEntity(String.class).contains("stackTrace"))
+            .expectComplete()
+            .verify();
+    }
 
     private void doTestTextJsonImplicitListAsyncStream(String address) throws Exception {
         StepVerifier
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
index e8f4fd9..17f8465 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
@@ -19,7 +19,9 @@
 
 package org.apache.cxf.systest.jaxrs.reactor;
 
+import javax.ws.rs.ForbiddenException;
 import javax.ws.rs.GET;
+import javax.ws.rs.NotFoundException;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.container.AsyncResponse;
@@ -43,6 +45,15 @@ public class FluxService {
 
     @GET
     @Produces("application/json")
+    @Path("textJsonMany")
+    public Flux<HelloWorldBean> getJsonMany() {
+        return Flux
+            .range(1, 5)
+            .flatMap(item -> Mono.just(new HelloWorldBean("Hello " + item)));
+    }
+
+    @GET
+    @Produces("application/json")
     @Path("textJsonImplicitListAsyncStream")
     public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
         Flux.just("Hello", "Ciao")
@@ -92,6 +103,21 @@ public class FluxService {
     
     @GET
     @Produces(MediaType.APPLICATION_JSON)
+    @Path("/web/errors")
+    public Flux<HelloWorldBean> webErrors() {
+        return Flux
+            .range(1, 3)
+            .concatMap(item -> {
+                if (item < 3) {
+                    return Mono.just(new HelloWorldBean("Person " + item));
+                } else {
+                    return Mono.error(new ForbiddenException("Oops"));
+                }
+            });
+    }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/immediate/errors")
     public Flux<HelloWorldBean> immediateErrors() {
         return Flux
@@ -101,8 +127,33 @@ public class FluxService {
     
     @GET
     @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/mapper/errors")
+    public Flux<HelloWorldBean> immediateMapperErrors() {
+        return Flux
+            .range(1, 2)
+            .flatMap(item -> Mono.error(new IllegalStateException("Oops")));
+    }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/empty")
     public Flux<HelloWorldBean> empty() {
         return Flux.empty();
     }
+
+    @GET
+    @Path("/mixed/error")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Flux<HelloWorldBean> errorAndData() {
+        return Flux
+            .range(1, 5)
+            .flatMap(item -> {
+                if (item <= 4) {
+                    return Mono.just(new HelloWorldBean(" of Item: " + item));
+                } else {
+                    return Mono.error(new NotFoundException("Item not found"));
+                }
+            })
+            .onErrorMap(e -> new IllegalStateException("Oops", e));
+    }
 }
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/IllegalStateExceptionMapper.java
similarity index 53%
copy from rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/IllegalStateExceptionMapper.java
index a57b574..85d5f39 100644
--- a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/IllegalStateExceptionMapper.java
@@ -16,21 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.cxf.jaxrs.reactor.server;
 
-import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
-import org.apache.cxf.service.invoker.Invoker;
+package org.apache.cxf.systest.jaxrs.reactor;
 
-public class ReactorCustomizer extends AbstractStreamingResponseExtension {
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+@Provider
+public class IllegalStateExceptionMapper implements ExceptionMapper<IllegalStateException> {
     @Override
-    protected Invoker createInvoker(JAXRSServerFactoryBean bean) {
-        Boolean useStreamingSubscriber = (Boolean)bean.getProperties(true)
-                .getOrDefault("useStreamingSubscriber", null);
-        ReactorInvoker invoker = new ReactorInvoker();
-        if (useStreamingSubscriber != null) {
-            invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
-        }
-        return invoker;
+    public Response toResponse(IllegalStateException exception) {
+        return Response
+            .status(409)
+            .entity(exception)
+            .build();
     }
-}
\ No newline at end of file
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
index 4d7be49..18054e3 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
@@ -19,10 +19,6 @@
 
 package org.apache.cxf.systest.jaxrs.reactor;
 
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.core.MediaType;
 
@@ -32,12 +28,11 @@ import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
 import org.apache.cxf.jaxrs.reactor.client.ReactorInvoker;
 import org.apache.cxf.jaxrs.reactor.client.ReactorInvokerProvider;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import reactor.test.StepVerifier;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 public class MonoReactorTest extends AbstractBusClientServerTestBase {
@@ -51,57 +46,70 @@ public class MonoReactorTest extends AbstractBusClientServerTestBase {
     @Test
     public void testGetHelloWorldJson() throws Exception {
         String address = "http://localhost:" + PORT + "/reactor/mono/textJson";
-        final BlockingQueue<HelloWorldBean> holder = new LinkedBlockingQueue<>();
-        ClientBuilder.newClient()
+
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
                 .register(new JacksonJsonProvider())
                 .register(new ReactorInvokerProvider())
                 .target(address)
                 .request(MediaType.APPLICATION_JSON)
                 .rx(ReactorInvoker.class)
-                .get(HelloWorldBean.class)
-                .doOnNext(holder::offer)
-                .subscribe();
-
-        HelloWorldBean bean = holder.poll(1L, TimeUnit.SECONDS);
-        assertNotNull(bean);
-        assertEquals("Hello", bean.getGreeting());
-        assertEquals("World", bean.getAudience());
+                .get(HelloWorldBean.class))
+            .expectNextMatches(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World"))
+            .expectComplete()
+            .verify();
     }
 
     @Test
     public void testTextJsonImplicitListAsyncStream() throws Exception {
         String address = "http://localhost:" + PORT + "/reactor/mono/textJsonImplicitListAsyncStream";
-        final BlockingQueue<HelloWorldBean> holder = new LinkedBlockingQueue<>();
-        ClientBuilder.newClient()
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
                 .register(new JacksonJsonProvider())
                 .register(new ReactorInvokerProvider())
                 .target(address)
                 .request(MediaType.APPLICATION_JSON)
                 .rx(ReactorInvoker.class)
-                .get(HelloWorldBean.class)
-                .doOnNext(holder::offer)
-                .subscribe();
-        HelloWorldBean bean = holder.poll(1L, TimeUnit.SECONDS);
-        assertNotNull(bean);
-        assertEquals("Hello", bean.getGreeting());
-        assertEquals("World", bean.getAudience());
+                .get(HelloWorldBean.class))
+            .expectNextMatches(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World"))
+            .expectComplete()
+            .verify();
     }
 
     @Test
     public void testGetString() throws Exception {
         String address = "http://localhost:" + PORT + "/reactor/mono/textAsync";
-        final BlockingQueue<String> holder = new LinkedBlockingQueue<>();
-        ClientBuilder.newClient()
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
                 .register(new ReactorInvokerProvider())
                 .target(address)
                 .request(MediaType.TEXT_PLAIN)
                 .rx(ReactorInvoker.class)
-                .get(String.class)
-                .doOnNext(holder::offer)
-                .subscribe();
-
-        String value = holder.poll(1L, TimeUnit.SECONDS);
-        assertNotNull(value);
-        assertEquals("Hello, world!", value);
+                .get(String.class))
+            .expectNextMatches(r -> "Hello, world!".equals(r))
+            .expectComplete()
+            .verify();
+    }
+    
+    @Test
+    public void testMonoEmpty() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor/mono/empty";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .get(HelloWorldBean.class))
+            .expectComplete()
+            .verify();
     }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
index ebec34d..300def8 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
@@ -24,6 +24,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
 
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
 import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
@@ -58,6 +59,14 @@ public class MonoService {
                 .subscribe(new StringAsyncSubscriber(ar));
 
     }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/empty")
+    public Mono<HelloWorldBean> empty() {
+        return Mono.empty();
+    }
+
 
     private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
         StringAsyncSubscriber(AsyncResponse ar) {
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
index 88de2ab..4f7f4b3 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
@@ -54,6 +54,7 @@ public class ReactorServer extends AbstractBusTestServerBase {
         JAXRSServerFactoryBean sf2 = new JAXRSServerFactoryBean();
         sf2.setProvider(new JacksonJsonProvider());
         sf2.setProvider(new IllegalArgumentExceptionMapper());
+        sf2.setProvider(new IllegalStateExceptionMapper());
         new ReactorCustomizer().customize(sf2);
         sf2.setResourceClasses(FluxService.class);
         sf2.setResourceProvider(FluxService.class,