[cxf] branch 3.2.x-fixes updated (7bc7ad9 -> 9cb82de)

reta
This is an automated email from the ASF dual-hosted git repository.

reta pushed a change to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git.


    from 7bc7ad9  Recording .gitmergeinfo Changes
     new caf4e4c  CXF-8349: Flux Returns Mixed Response on Errors and Mono Hangs when Empty (#705)
     new 9cb82de  Recording .gitmergeinfo Changes

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .gitmergeinfo                                      |   1 +
 .../server/ResponseStatusOnlyException.java        |  21 ++--
 .../server/ResponseStatusOnlyExceptionMapper.java  |  20 +++-
 .../server/StreamingAsyncSubscriber.java           |   5 +-
 .../jaxrs/reactor/server/ReactorCustomizer.java    |   3 +
 .../cxf/jaxrs/reactor/server/ReactorInvoker.java   |  23 ++--
 .../cxf/systest/jaxrs/reactor/FluxReactorTest.java | 120 +++++++++++++++++++++
 .../cxf/systest/jaxrs/reactor/FluxService.java     |  51 +++++++++
 ...apper.java => IllegalStateExceptionMapper.java} |   9 +-
 .../cxf/systest/jaxrs/reactor/MonoReactorTest.java |  74 +++++++------
 .../cxf/systest/jaxrs/reactor/MonoService.java     |   9 ++
 .../cxf/systest/jaxrs/reactor/ReactorServer.java   |   1 +
 12 files changed, 284 insertions(+), 53 deletions(-)
 copy core/src/main/java/org/apache/cxf/io/CacheSizeExceededException.java => rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/ResponseStatusOnlyException.java (59%)
 copy systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/MappedExceptionMapper.java => rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/ResponseStatusOnlyExceptionMapper.java (55%)
 copy systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/{IllegalArgumentExceptionMapper.java => IllegalStateExceptionMapper.java} (79%)


[cxf] 01/02: CXF-8349: Flux Returns Mixed Response on Errors and Mono Hangs when Empty (#705)

reta
This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit caf4e4c50aefd181ff11ae104fea10a03bc1fa6a
Author: Andriy Redko <[hidden email]>
AuthorDate: Sat Oct 10 11:06:07 2020 -0400

    CXF-8349: Flux Returns Mixed Response on Errors and Mono Hangs when Empty (#705)
   
    (cherry picked from commit 57bb279cb8404e81369e1327c436578937984e23)
    (cherry picked from commit aa0e937735f876a58e20e3da817b30bd2c8cf0db)
   
    # Conflicts:
    # systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
---
 .../server/ResponseStatusOnlyException.java}       |  39 ++++---
 .../server/ResponseStatusOnlyExceptionMapper.java  |  44 ++++++++
 .../server/StreamingAsyncSubscriber.java           |   5 +-
 .../jaxrs/reactor/server/ReactorCustomizer.java    |   3 +
 .../cxf/jaxrs/reactor/server/ReactorInvoker.java   |  23 ++--
 .../cxf/systest/jaxrs/reactor/FluxReactorTest.java | 120 +++++++++++++++++++++
 .../cxf/systest/jaxrs/reactor/FluxService.java     |  51 +++++++++
 .../jaxrs/reactor/IllegalStateExceptionMapper.java |  27 +++--
 .../cxf/systest/jaxrs/reactor/MonoReactorTest.java |  74 +++++++------
 .../cxf/systest/jaxrs/reactor/MonoService.java     |   9 ++
 .../cxf/systest/jaxrs/reactor/ReactorServer.java   |   1 +
 11 files changed, 329 insertions(+), 67 deletions(-)
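
The "mixed response" part of the commit title refers to an error that arrives after some elements of the stream have already been written. The sketch below models the decision this patch adds, in plain Java without CXF or Reactor. `MixedResponseSketch`, `Outcome`, and `onError` are hypothetical names used only for illustration; the actual change is in `StreamingAsyncSubscriber` and `ResponseStatusOnlyExceptionMapper`.

```java
/** Illustrative model only; none of these names exist in CXF. */
final class MixedResponseSketch {
    /** Hypothetical stand-in for the mapped error response: a status and an optional body. */
    static final class Outcome {
        final int status;
        final String errorBody; // null means "status only"

        Outcome(int status, String errorBody) {
            this.status = status;
            this.errorBody = errorBody;
        }
    }

    /**
     * Decide what the error contributes to the response.
     * If elements were already written, only the status is kept, so the
     * client never receives elements followed by an error payload.
     */
    static Outcome onError(boolean firstWriteDone, int mappedStatus, String mappedBody) {
        if (firstWriteDone) {
            return new Outcome(mappedStatus, null);
        }
        return new Outcome(mappedStatus, mappedBody);
    }

    private MixedResponseSketch() { }
}
```

Under this model, an immediate error keeps the mapper's payload, while an error after the first write keeps only the mapped status code. This matches what the new `FluxReactorTest` cases assert for status 409 with and without `stackTrace` in the body.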

diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/ResponseStatusOnlyException.java
similarity index 52%
copy from rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
copy to rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/ResponseStatusOnlyException.java
index a57b574..37b0299 100644
--- a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
+++ b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/ResponseStatusOnlyException.java
@@ -16,21 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.cxf.jaxrs.reactor.server;
 
-import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
-import org.apache.cxf.service.invoker.Invoker;
+package org.apache.cxf.jaxrs.reactivestreams.server;
 
-public class ReactorCustomizer extends AbstractStreamingResponseExtension {
-    @Override
-    protected Invoker createInvoker(JAXRSServerFactoryBean bean) {
-        Boolean useStreamingSubscriber = (Boolean)bean.getProperties(true)
-                .getOrDefault("useStreamingSubscriber", null);
-        ReactorInvoker invoker = new ReactorInvoker();
-        if (useStreamingSubscriber != null) {
-            invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
-        }
-        return invoker;
+/**
+ * Used in cases when the stream has already emitted some elements but then an error
+ * was encountered. When that happens, the error payload cannot be returned (it would lead
+ * to a mixed stream of elements and error); only the response status can be set.
+ */
+class ResponseStatusOnlyException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    ResponseStatusOnlyException() {
+        super();
+    }
+
+    ResponseStatusOnlyException(String message) {
+        super(message);
+    }
+
+    ResponseStatusOnlyException(String message, Throwable cause) {
+        super(message, cause);
+    }
+    
+    ResponseStatusOnlyException(Throwable cause) {
+        super(cause);
     }
-}
\ No newline at end of file
+}
diff --git a/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/ResponseStatusOnlyExceptionMapper.java b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/ResponseStatusOnlyExceptionMapper.java
new file mode 100644
index 0000000..0b1179e
--- /dev/null
+++ b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/ResponseStatusOnlyExceptionMapper.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.reactivestreams.server;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.cxf.jaxrs.utils.ExceptionUtils;
+import org.apache.cxf.jaxrs.utils.JAXRSUtils;
+import org.apache.cxf.message.Message;
+
+@Provider
+public class ResponseStatusOnlyExceptionMapper implements ExceptionMapper<ResponseStatusOnlyException> {
+    @Override
+    public Response toResponse(ResponseStatusOnlyException exception) {
+        final Message message = JAXRSUtils.getCurrentMessage();
+        final Throwable cause = exception.getCause();
+        final Response response = ExceptionUtils.convertFaultToResponse(cause, message);
+        
+        if (response != null) {
+            return Response.fromResponse(response).entity(null).build();
+        } else {
+            return Response.serverError().build();
+        }
+    }
+}
diff --git a/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/StreamingAsyncSubscriber.java b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/StreamingAsyncSubscriber.java
index cae8f6e..20f527f 100644
--- a/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/StreamingAsyncSubscriber.java
+++ b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/StreamingAsyncSubscriber.java
@@ -124,7 +124,10 @@ public class StreamingAsyncSubscriber<T> extends AbstractSubscriber<T> {
             }
 
             if (throwable != null) {
-                if (throwable instanceof RuntimeException) {
+                // non-empty stream
+                if (firstWriteDone.get()) {
+                    throw new ResponseStatusOnlyException(throwable);
+                } else if (throwable instanceof RuntimeException) {
                     throw (RuntimeException)throwable;
                 } else if (throwable instanceof IOException) {
                     throw (IOException)throwable;
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
index a57b574..33ec37b 100644
--- a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
@@ -20,6 +20,7 @@ package org.apache.cxf.jaxrs.reactor.server;
 
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
+import org.apache.cxf.jaxrs.reactivestreams.server.ResponseStatusOnlyExceptionMapper;
 import org.apache.cxf.service.invoker.Invoker;
 
 public class ReactorCustomizer extends AbstractStreamingResponseExtension {
@@ -31,6 +32,8 @@ public class ReactorCustomizer extends AbstractStreamingResponseExtension {
         if (useStreamingSubscriber != null) {
             invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
         }
+        
+        bean.setProvider(new ResponseStatusOnlyExceptionMapper());
         return invoker;
     }
 }
\ No newline at end of file
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
index 82ad81a..43c975c 100644
--- a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
+++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
@@ -18,6 +18,8 @@
  */
 package org.apache.cxf.jaxrs.reactor.server;
 
+import java.util.Collections;
+
 import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
 import org.apache.cxf.message.Message;
@@ -32,20 +34,29 @@ public class ReactorInvoker extends AbstractReactiveInvoker {
             final Flux<?> flux = (Flux<?>) result;
             final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
             if (!isStreamingSubscriberUsed(flux, asyncResponse, inMessage)) {
-                flux.doOnNext(asyncResponse::resume)
-                    .doOnError(t -> handleThrowable(asyncResponse, t))
-                    .subscribe();
+                subscribe(flux, asyncResponse);
             }
             return asyncResponse;
         } else if (result instanceof Mono) {
             final Mono<?> mono = (Mono<?>) result;
             final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
-            mono.doOnNext(asyncResponse::resume)
-                .doOnError(t -> handleThrowable(asyncResponse, t))
-                .subscribe();
+            subscribe(mono, asyncResponse);
             return asyncResponse;
         }
         return null;
     }
+
+    private void subscribe(final Mono<?> mono, final AsyncResponseImpl asyncResponse) {
+        mono.doOnSuccess(asyncResponse::resume)
+            .doOnError(t -> handleThrowable(asyncResponse, t))
+            .subscribe();
+    }
+
+    private <T> void subscribe(final Flux<T> flux, final AsyncResponseImpl asyncResponse) {
+        flux.doOnNext(asyncResponse::resume)
+            .switchIfEmpty(Mono.<T>empty().doOnSuccess(v -> asyncResponse.resume(Collections.emptyList())))
+            .doOnError(t -> handleThrowable(asyncResponse, t))
+            .subscribe();
+    }
     
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
index e9c2c43..8ca4223 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
@@ -19,7 +19,10 @@
 
 package org.apache.cxf.systest.jaxrs.reactor;
 
+import java.util.List;
+
 import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
@@ -41,6 +44,7 @@ public class FluxReactorTest extends AbstractBusClientServerTestBase {
         assertTrue("server did not launch correctly", launchServer(ReactorServer.class, true));
         createStaticBus();
     }
+    
     @Test
     public void testGetHelloWorldJson() throws Exception {
         String address = "http://localhost:" + PORT + "/reactor/flux/textJson";
@@ -58,6 +62,27 @@ public class FluxReactorTest extends AbstractBusClientServerTestBase {
             .expectComplete()
             .verify();
     }
+    
+    @Test
+    public void testGetHelloWorldJsonMany() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor2/flux/textJsonMany";
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .getFlux(HelloWorldBean.class))
+            .expectNextMatches(bean -> bean.getGreeting().equals("Hello 1") && bean.getAudience().equals("World"))
+            .expectNextMatches(bean -> bean.getGreeting().equals("Hello 2") && bean.getAudience().equals("World"))
+            .expectNextMatches(bean -> bean.getGreeting().equals("Hello 3") && bean.getAudience().equals("World"))
+            .expectNextMatches(bean -> bean.getGreeting().equals("Hello 4") && bean.getAudience().equals("World"))
+            .expectNextMatches(bean -> bean.getGreeting().equals("Hello 5") && bean.getAudience().equals("World"))
+            .expectComplete()
+            .verify();
+    }
 
     @Test
     public void testTextJsonImplicitListAsyncStream() throws Exception {
@@ -72,6 +97,23 @@ public class FluxReactorTest extends AbstractBusClientServerTestBase {
     
     @Test
     public void testFluxEmpty() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor/flux/empty";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .getFlux(HelloWorldBean.class))
+            .expectComplete()
+            .verify();
+    }
+    
+    @Test
+    public void testFluxEmpty2() throws Exception {
         String address = "http://localhost:" + PORT + "/reactor2/flux/empty";
         
         StepVerifier
@@ -157,6 +199,24 @@ public class FluxReactorTest extends AbstractBusClientServerTestBase {
             .expectError()
             .verify();
     }
+    
+    @Test
+    public void testFluxImmediateErrorsWithExceptionMapper() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor2/flux/immediate/mapper/errors";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .get())
+            .expectNextMatches(r -> r.getStatus() == 409 && r.readEntity(String.class).contains("stackTrace"))
+            .expectComplete()
+            .verify();
+    }
 
     @Test
     public void testFluxImmediateErrorsResponse() throws Exception {
@@ -175,6 +235,66 @@ public class FluxReactorTest extends AbstractBusClientServerTestBase {
             .expectComplete()
             .verify();
     }
+    
+    @Test
+    public void testFluxErrorWithExceptionMapperReturnsContentPayload() throws Exception {
+        GenericType<List<HelloWorldBean>> helloWorldBeanListType = new GenericType<List<HelloWorldBean>>() {  };
+        String address = "http://localhost:" + PORT + "/reactor2/flux/mixed/error";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .get())
+            // The response should include the elements emitted prior to the error
+            .expectNextMatches(r -> r.getStatus() == 409 && r.readEntity(helloWorldBeanListType).size() == 4)
+            .expectComplete()
+            .verify();
+    }
+    
+    @Test
+    public void testFluxErrorWithWebException() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor2/flux/web/errors";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .get())
+            // The response should not include the exception payload (injected by exception mapper)
+            // if some elements have been emitted before
+            .expectNextMatches(r -> r.getStatus() == 403 && !r.readEntity(String.class).contains("stackTrace"))
+            .expectComplete()
+            .verify();
+    }
+
+    @Test
+    public void testFluxErrorWithExceptionMapperReturnsNoExceptionPayload() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor2/flux/mixed/error";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request(MediaType.APPLICATION_JSON)
+                .rx(ReactorInvoker.class)
+                .get())
+            // The response should not include the exception payload (injected by exception mapper)
+            // if some elements have been emitted before
+            .expectNextMatches(r -> r.getStatus() == 409 && !r.readEntity(String.class).contains("stackTrace"))
+            .expectComplete()
+            .verify();
+    }
 
     private void doTestTextJsonImplicitListAsyncStream(String address) throws Exception {
         StepVerifier
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
index e8f4fd9..17f8465 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
@@ -19,7 +19,9 @@
 
 package org.apache.cxf.systest.jaxrs.reactor;
 
+import javax.ws.rs.ForbiddenException;
 import javax.ws.rs.GET;
+import javax.ws.rs.NotFoundException;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.container.AsyncResponse;
@@ -43,6 +45,15 @@ public class FluxService {
 
     @GET
     @Produces("application/json")
+    @Path("textJsonMany")
+    public Flux<HelloWorldBean> getJsonMany() {
+        return Flux
+            .range(1, 5)
+            .flatMap(item -> Mono.just(new HelloWorldBean("Hello " + item)));
+    }
+
+    @GET
+    @Produces("application/json")
     @Path("textJsonImplicitListAsyncStream")
     public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
         Flux.just("Hello", "Ciao")
@@ -92,6 +103,21 @@ public class FluxService {
     
     @GET
     @Produces(MediaType.APPLICATION_JSON)
+    @Path("/web/errors")
+    public Flux<HelloWorldBean> webErrors() {
+        return Flux
+            .range(1, 3)
+            .concatMap(item -> {
+                if (item < 3) {
+                    return Mono.just(new HelloWorldBean("Person " + item));
+                } else {
+                    return Mono.error(new ForbiddenException("Oops"));
+                }
+            });
+    }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/immediate/errors")
     public Flux<HelloWorldBean> immediateErrors() {
         return Flux
@@ -101,8 +127,33 @@ public class FluxService {
     
     @GET
     @Produces(MediaType.APPLICATION_JSON)
+    @Path("/immediate/mapper/errors")
+    public Flux<HelloWorldBean> immediateMapperErrors() {
+        return Flux
+            .range(1, 2)
+            .flatMap(item -> Mono.error(new IllegalStateException("Oops")));
+    }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/empty")
     public Flux<HelloWorldBean> empty() {
         return Flux.empty();
     }
+
+    @GET
+    @Path("/mixed/error")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Flux<HelloWorldBean> errorAndData() {
+        return Flux
+            .range(1, 5)
+            .flatMap(item -> {
+                if (item <= 4) {
+                    return Mono.just(new HelloWorldBean(" of Item: " + item));
+                } else {
+                    return Mono.error(new NotFoundException("Item not found"));
+                }
+            })
+            .onErrorMap(e -> new IllegalStateException("Oops", e));
+    }
 }
diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/IllegalStateExceptionMapper.java
similarity index 53%
copy from rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/IllegalStateExceptionMapper.java
index a57b574..85d5f39 100644
--- a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/IllegalStateExceptionMapper.java
@@ -16,21 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.cxf.jaxrs.reactor.server;
 
-import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
-import org.apache.cxf.service.invoker.Invoker;
+package org.apache.cxf.systest.jaxrs.reactor;
 
-public class ReactorCustomizer extends AbstractStreamingResponseExtension {
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+@Provider
+public class IllegalStateExceptionMapper implements ExceptionMapper<IllegalStateException> {
     @Override
-    protected Invoker createInvoker(JAXRSServerFactoryBean bean) {
-        Boolean useStreamingSubscriber = (Boolean)bean.getProperties(true)
-                .getOrDefault("useStreamingSubscriber", null);
-        ReactorInvoker invoker = new ReactorInvoker();
-        if (useStreamingSubscriber != null) {
-            invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
-        }
-        return invoker;
+    public Response toResponse(IllegalStateException exception) {
+        return Response
+            .status(409)
+            .entity(exception)
+            .build();
     }
-}
\ No newline at end of file
+}
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
index d66660c..a4a40c0 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java
@@ -19,11 +19,7 @@
 
 package org.apache.cxf.systest.jaxrs.reactor;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import javax.ws.rs.client.ClientBuilder;
-import javax.xml.ws.Holder;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
@@ -31,10 +27,13 @@ import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
 import org.apache.cxf.jaxrs.reactor.client.ReactorInvoker;
 import org.apache.cxf.jaxrs.reactor.client.ReactorInvokerProvider;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import reactor.test.StepVerifier;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.junit.Assert.assertTrue;
+
 public class MonoReactorTest extends AbstractBusClientServerTestBase {
     public static final String PORT = ReactorServer.PORT;
     @BeforeClass
@@ -46,56 +45,69 @@ public class MonoReactorTest extends AbstractBusClientServerTestBase {
     @Test
     public void testGetHelloWorldJson() throws Exception {
         String address = "http://localhost:" + PORT + "/reactor/mono/textJson";
-        List<HelloWorldBean> holder = new ArrayList<>();
-        ClientBuilder.newClient()
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
                 .register(new JacksonJsonProvider())
                 .register(new ReactorInvokerProvider())
                 .target(address)
                 .request("application/json")
                 .rx(ReactorInvoker.class)
-                .get(HelloWorldBean.class)
-                .doOnNext(holder::add)
-                .subscribe();
-        Thread.sleep(500);
-        assertEquals(1, holder.size());
-        HelloWorldBean bean = holder.get(0);
-        assertEquals("Hello", bean.getGreeting());
-        assertEquals("World", bean.getAudience());
+                .get(HelloWorldBean.class))
+            .expectNextMatches(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World"))
+            .expectComplete()
+            .verify();
     }
 
     @Test
     public void testTextJsonImplicitListAsyncStream() throws Exception {
         String address = "http://localhost:" + PORT + "/reactor/mono/textJsonImplicitListAsyncStream";
-        Holder<HelloWorldBean> holder = new Holder<>();
-        ClientBuilder.newClient()
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
                 .register(new JacksonJsonProvider())
                 .register(new ReactorInvokerProvider())
                 .target(address)
                 .request("application/json")
                 .rx(ReactorInvoker.class)
-                .get(HelloWorldBean.class)
-                .doOnNext(helloWorldBean -> holder.value = helloWorldBean)
-                .subscribe();
-        Thread.sleep(500);
-        assertNotNull(holder.value);
-        assertEquals("Hello", holder.value.getGreeting());
-        assertEquals("World", holder.value.getAudience());
+                .get(HelloWorldBean.class))
+            .expectNextMatches(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World"))
+            .expectComplete()
+            .verify();
     }
 
     @Test
     public void testGetString() throws Exception {
         String address = "http://localhost:" + PORT + "/reactor/mono/textAsync";
-        Holder<String> holder = new Holder<>();
-        ClientBuilder.newClient()
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
                 .register(new ReactorInvokerProvider())
                 .target(address)
                 .request("text/plain")
                 .rx(ReactorInvoker.class)
-                .get(String.class)
-                .doOnNext(msg -> holder.value = msg)
-                .subscribe();
-
-        Thread.sleep(500);
-        assertEquals("Hello, world!", holder.value);
+                .get(String.class))
+            .expectNextMatches(r -> "Hello, world!".equals(r))
+            .expectComplete()
+            .verify();
+    }
+    
+    @Test
+    public void testMonoEmpty() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactor/mono/empty";
+        
+        StepVerifier
+            .create(ClientBuilder
+                .newClient()
+                .register(new JacksonJsonProvider())
+                .register(new ReactorInvokerProvider())
+                .target(address)
+                .request("application/json")
+                .rx(ReactorInvoker.class)
+                .get(HelloWorldBean.class))
+            .expectComplete()
+            .verify();
     }
 }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
index d45eb79..1848266 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
@@ -24,6 +24,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
 
 import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
 import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
@@ -58,6 +59,14 @@ public class MonoService {
                 .subscribe(new StringAsyncSubscriber(ar));
 
     }
+    
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/empty")
+    public Mono<HelloWorldBean> empty() {
+        return Mono.empty();
+    }
+
 
     private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
         StringAsyncSubscriber(AsyncResponse ar) {
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
index 88de2ab..4f7f4b3 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
@@ -54,6 +54,7 @@ public class ReactorServer extends AbstractBusTestServerBase {
         JAXRSServerFactoryBean sf2 = new JAXRSServerFactoryBean();
         sf2.setProvider(new JacksonJsonProvider());
         sf2.setProvider(new IllegalArgumentExceptionMapper());
+        sf2.setProvider(new IllegalStateExceptionMapper());
         new ReactorCustomizer().customize(sf2);
         sf2.setResourceClasses(FluxService.class);
         sf2.setResourceProvider(FluxService.class,
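
The Mono half of the fix replaces `doOnNext` with `doOnSuccess` in `ReactorInvoker`. An empty source never signals `onNext`, so a response that is resumed only from that callback is never resumed at all. The sketch below reproduces the effect with the JDK's `java.util.concurrent.Flow` interfaces rather than Reactor. `EmptyPublisherSketch`, `EmptyPublisher`, and `resume` are hypothetical names, and the `CompletableFuture` is only a stand-in for CXF's `AsyncResponseImpl`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

/** Illustrative model only; not CXF or Reactor code. */
final class EmptyPublisherSketch {
    /** Publisher that signals onSubscribe and then onComplete, never onNext. */
    static final class EmptyPublisher<T> implements Flow.Publisher<T> {
        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) { }

                @Override
                public void cancel() { }
            });
            subscriber.onComplete();
        }
    }

    /**
     * Subscribe and return a future standing in for the async response.
     * With resumeOnComplete == false the future is completed only from onNext,
     * which models the behaviour before the fix.
     */
    static <T> CompletableFuture<Object> resume(Flow.Publisher<T> publisher, boolean resumeOnComplete) {
        final CompletableFuture<Object> response = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T item) {
                response.complete(item);
            }

            @Override
            public void onError(Throwable throwable) {
                response.completeExceptionally(throwable);
            }

            @Override
            public void onComplete() {
                if (resumeOnComplete) {
                    // No-op if onNext already completed the future.
                    response.complete(null);
                }
            }
        });
        return response;
    }

    private EmptyPublisherSketch() { }
}
```

With `resumeOnComplete` set to `false` the returned future stays incomplete for an empty publisher, which corresponds to the hanging request described in CXF-8349. With `true` it completes with `null`, as `doOnSuccess` does for an empty `Mono`.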


[cxf] 02/02: Recording .gitmergeinfo Changes

reta
This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 9cb82deea735481fc3ae5be7031d94dcab1606bb
Author: reta <[hidden email]>
AuthorDate: Sat Oct 10 20:56:45 2020 -0400

    Recording .gitmergeinfo Changes
---
 .gitmergeinfo | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.gitmergeinfo b/.gitmergeinfo
index c4928d5..2435a02 100644
--- a/.gitmergeinfo
+++ b/.gitmergeinfo
@@ -1109,6 +1109,7 @@ M a5a629e3bad6bbdd69c0dd368d5c019bf969d248
 M a7638c1562631b9c66c0de8a35b0cb47f2622d8a
 M a7f082120fa482d7b68a1aee3644412e19d9a35e
 M a8132de2a0204a1247f3e63e9140aa7a34bdf628
+M aa0e937735f876a58e20e3da817b30bd2c8cf0db
 M aacfb57762fd92c835fc543a6d30570f0e2edd95
 M ac1be213e67767f01af5b7077d882ee6f52e5895
 M ac561c4a2b27bd8b427ff1f91d19c7929a8222ab