[cxf] branch 3.2.x-fixes updated (3925ae1 -> 8564a03)

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

[cxf] branch 3.2.x-fixes updated (3925ae1 -> 8564a03)

reta
This is an automated email from the ASF dual-hosted git repository.

reta pushed a change to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git.


    from 3925ae1  Recording .gitmergeinfo Changes
     new 6f4e659  CXF-8282: Set read timeout using netty client (#673)
     new 8564a03  Recording .gitmergeinfo Changes

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .gitmergeinfo                                      |   3 +
 .../http/netty/client/CxfResponseCallBack.java     |   2 +-
 .../http/netty/client/NettyHttpClientHandler.java  |  12 +-
 .../client/NettyHttpClientPipelineFactory.java     |  14 +-
 .../http/netty/client/NettyHttpConduit.java        |  10 +-
 .../http/netty/client/NettyHttpConduitTest.java}   | 199 ++++-----------------
 6 files changed, 65 insertions(+), 175 deletions(-)
 copy rt/transports/{http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java => http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitTest.java} (58%)

Reply | Threaded
Open this post in threaded view
|

[cxf] 01/02: CXF-8282: Set read timeout using netty client (#673)

reta
This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 6f4e6597e935dc042c68a6198c701291d74c5205
Author: Andriy Redko <[hidden email]>
AuthorDate: Sun May 17 12:41:00 2020 -0400

    CXF-8282: Set read timeout using netty client (#673)
   
    (cherry picked from commit f1f312b6dcd3770a52d1dd42e260a4ac26c779b7)
    (cherry picked from commit 504b31611e70ca26b968cf9d96d1dc3d7c920aab)
---
 .../http/netty/client/CxfResponseCallBack.java     |   2 +-
 .../http/netty/client/NettyHttpClientHandler.java  |  12 +-
 .../client/NettyHttpClientPipelineFactory.java     |  14 +-
 .../http/netty/client/NettyHttpConduit.java        |  10 +-
 .../http/netty/client/NettyHttpConduitTest.java    | 265 +++++++++++++++++++++
 5 files changed, 295 insertions(+), 8 deletions(-)

diff --git a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java
index baa4f62..1a0bc09 100644
--- a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java
+++ b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java
@@ -22,6 +22,6 @@ package org.apache.cxf.transport.http.netty.client;
 import io.netty.handler.codec.http.HttpResponse;
 
 public interface CxfResponseCallBack {
-
     void responseReceived(HttpResponse response);
+    void error(Throwable ex);
 }
diff --git a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java
index 3595a1a..f6d1d5f 100644
--- a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java
+++ b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java
@@ -19,6 +19,7 @@
 
 package org.apache.cxf.transport.http.netty.client;
 
+import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 
@@ -26,6 +27,7 @@ import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.timeout.ReadTimeoutException;
 
 public class NettyHttpClientHandler extends ChannelDuplexHandler {
     private final BlockingQueue<NettyHttpClientRequest> sendedQueue =
@@ -49,7 +51,6 @@ public class NettyHttpClientHandler extends ChannelDuplexHandler {
 
     @Override
     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
-
         // need to deal with the request
         if (msg instanceof NettyHttpClientRequest) {
             NettyHttpClientRequest request = (NettyHttpClientRequest)msg;
@@ -61,9 +62,12 @@ public class NettyHttpClientHandler extends ChannelDuplexHandler {
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-        throws Exception {
-        //TODO need to handle the exception here
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        if (cause instanceof ReadTimeoutException) {
+            final NettyHttpClientRequest request = sendedQueue.poll();
+            request.getCxfResponseCallback().error(new IOException(cause));
+        }
+        
         cause.printStackTrace();
         ctx.close();
     }
diff --git a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java
index bc52ac7..2196016 100644
--- a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java
+++ b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.cxf.transport.http.netty.client;
 
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -36,6 +37,7 @@ import io.netty.handler.codec.http.HttpRequestEncoder;
 import io.netty.handler.codec.http.HttpResponseDecoder;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.timeout.ReadTimeoutHandler;
 
 
 public class NettyHttpClientPipelineFactory extends ChannelInitializer<Channel> {
@@ -43,9 +45,15 @@ public class NettyHttpClientPipelineFactory extends ChannelInitializer<Channel>
     private static final Logger LOG =
         LogUtils.getL7dLogger(NettyHttpClientPipelineFactory.class);
     private final TLSClientParameters tlsClientParameters;
-
+    private final int readTimeout;
+    
     public NettyHttpClientPipelineFactory(TLSClientParameters clientParameters) {
+        this(clientParameters, 0);
+    }
+
+    public NettyHttpClientPipelineFactory(TLSClientParameters clientParameters, int readTimeout) {
         this.tlsClientParameters = clientParameters;
+        this.readTimeout = readTimeout;
     }
 
     @Override
@@ -66,9 +74,13 @@ public class NettyHttpClientPipelineFactory extends ChannelInitializer<Channel>
         pipeline.addLast("aggregator", new HttpObjectAggregator(1048576));
         pipeline.addLast("encoder", new HttpRequestEncoder());
         pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
+        if (readTimeout > 0) {
+            pipeline.addLast("readTimeoutHandler", new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS));
+        }
         pipeline.addLast("client", new NettyHttpClientHandler());
     }
 
+
     private SslHandler configureClientSSLOnDemand() throws Exception {
         if (tlsClientParameters != null) {
             SSLEngine sslEngine = SSLUtils.createClientSSLEngine(tlsClientParameters);
diff --git a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java
index d11da9c..e4919ef 100644
--- a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java
+++ b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java
@@ -333,10 +333,11 @@ public class NettyHttpConduit extends URLConnectionHTTPConduit implements BusLif
         protected void connect(boolean output) {
             if (url.getScheme().equals("https")) {
                 TLSClientParameters clientParameters = findTLSClientParameters();
-                bootstrap.handler(new NettyHttpClientPipelineFactory(clientParameters));
+                bootstrap.handler(new NettyHttpClientPipelineFactory(clientParameters, entity.getReceiveTimeout()));
             } else {
-                bootstrap.handler(new NettyHttpClientPipelineFactory(null));
+                bootstrap.handler(new NettyHttpClientPipelineFactory(null, entity.getReceiveTimeout()));
             }
+
             ChannelFuture connFuture =
                 bootstrap.connect(new InetSocketAddress(url.getHost(), url.getPort() != -1 ? url.getPort()
                                                             : "http".equals(url.getScheme()) ? 80 : 443));
@@ -372,6 +373,11 @@ public class NettyHttpConduit extends URLConnectionHTTPConduit implements BusLif
                 public void responseReceived(HttpResponse response) {
                     setHttpResponse(response);
                 }
+                
+                @Override
+                public void error(Throwable ex) {
+                    setException(ex);
+                }
             };
             entity.setCxfResponseCallback(callBack);
 
diff --git a/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitTest.java b/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitTest.java
new file mode 100644
index 0000000..b270445
--- /dev/null
+++ b/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitTest.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.netty.client;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.xml.ws.AsyncHandler;
+import javax.xml.ws.Endpoint;
+import javax.xml.ws.Response;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.frontend.ClientProxy;
+import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.transport.http.HTTPConduit;
+import org.apache.hello_world_soap_http.Greeter;
+import org.apache.hello_world_soap_http.SOAPService;
+import org.apache.hello_world_soap_http.types.GreetMeLaterResponse;
+import org.apache.hello_world_soap_http.types.GreetMeResponse;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+public class NettyHttpConduitTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = allocatePort(NettyHttpConduitTest.class);
+    public static final String PORT_INV = allocatePort(NettyHttpConduitTest.class, 2);
+    public static final String FILL_BUFFER = "FillBuffer";
+
+    private Endpoint ep;
+    private String request;
+    private Greeter g;
+
+    @Before
+    public void start() throws Exception {
+        Bus b = createStaticBus();
+        b.setProperty(NettyHttpConduit.USE_ASYNC, NettyHttpConduitFactory.UseAsyncPolicy.ALWAYS);
+
+        BusFactory.setThreadDefaultBus(b);
+
+        ep = Endpoint.publish("http://localhost:" + PORT + "/SoapContext/SoapPort",
+                              new org.apache.hello_world_soap_http.GreeterImpl() {
+                public String greetMeLater(long cnt) {
+                    //use continuations so the async client can have
+                    //a ton of connections while using fewer threads
+                    //
+                    //mimic a slow server: a negative cnt suspends for -cnt ms;
+                    //otherwise suspend for 2000 - (cnt % 1000) ms, i.e. between 1 and
+                    //2 seconds, delaying the earlier requests longer to create a
+                    //sort of backlog/contention with the later requests
+                    ContinuationProvider p = (ContinuationProvider)
+                        getContext().getMessageContext().get(ContinuationProvider.class.getName());
+                    Continuation c = p.getContinuation();
+                    if (c.isNew()) {
+                        if (cnt < 0) {
+                            c.suspend(-cnt);
+                        } else {
+                            c.suspend(2000 - (cnt % 1000));
+                        }
+                        return null;
+                    }
+                    return "Hello, finally! " + cnt;
+                }
+                public String greetMe(String me) {
+                    if (me.equals(FILL_BUFFER)) {
+                        return String.join("", Collections.nCopies(16093, " "));
+                    } else {
+                        return "Hello " + me;
+                    }
+                }
+            });
+
+        StringBuilder builder = new StringBuilder("NaNaNa");
+        for (int x = 0; x < 50; x++) {
+            builder.append(" NaNaNa ");
+        }
+        request = builder.toString();
+
+        URL wsdl = NettyHttpConduitTest.class.getResource("/wsdl/hello_world_services.wsdl");
+        assertNotNull("WSDL is null", wsdl);
+
+        SOAPService service = new SOAPService();
+        assertNotNull("Service is null", service);
+
+        g = service.getSoapPort();
+        assertNotNull("Port is null", g);
+    }
+
+    @After
+    public void stop() throws Exception {
+        ((java.io.Closeable)g).close();
+        ep.stop();
+        ep = null;
+    }
+
+    @Test
+    public void testResponseSameBufferSize() throws Exception {
+        updateAddressPort(g, PORT);
+        HTTPConduit c = (HTTPConduit)ClientProxy.getClient(g).getConduit();
+        c.getClient().setReceiveTimeout(12000);
+        try {
+            g.greetMe(FILL_BUFFER); //the test endpoint answers FILL_BUFFER with 16093 spaces
+            g.greetMe("Hello"); //a second call on the same proxy must still succeed
+        } catch (Exception ex) {
+            fail("Unexpected exception: " + ex); //keep the cause visible in the failure message
+        }
+    }
+
+    @Test
+    public void testTimeout() throws Exception {
+        updateAddressPort(g, PORT);
+        HTTPConduit c = (HTTPConduit)ClientProxy.getClient(g).getConduit();
+        c.getClient().setReceiveTimeout(3000);
+        try {
+            assertEquals("Hello " + request, g.greetMeLater(-5000));
+            fail();
+        } catch (Exception ex) {
+            //expected: the server suspends 5000 ms, beyond the 3000 ms receive timeout
+        }
+    }
+
+
+    @Test
+    public void testTimeoutWithPropertySetting() throws Exception {
+        ((javax.xml.ws.BindingProvider)g).getRequestContext().put("javax.xml.ws.client.receiveTimeout",
+            "3000");
+        updateAddressPort(g, PORT);
+
+        try {
+            assertEquals("Hello " + request, g.greetMeLater(-5000));
+            fail();
+        } catch (Exception ex) {
+            //expected: the server suspends 5000 ms, beyond the 3000 ms receiveTimeout property
+        }
+    }
+
+    @Test
+    public void testTimeoutAsync() throws Exception {
+        updateAddressPort(g, PORT);
+        HTTPConduit c = (HTTPConduit)ClientProxy.getClient(g).getConduit();
+        c.getClient().setReceiveTimeout(3000);
+        c.getClient().setAsyncExecuteTimeout(3000);
+        try {
+            Response<GreetMeLaterResponse> future = g.greetMeLaterAsync(-5000L);
+            future.get();
+            fail();
+        } catch (Exception ex) {
+            //expected: the server suspends 5000 ms, beyond both 3000 ms timeouts set above
+        }
+    }
+
+    @Test
+    public void testTimeoutAsyncWithPropertySetting() throws Exception {
+        updateAddressPort(g, PORT);
+        ((javax.xml.ws.BindingProvider)g).getRequestContext().put("javax.xml.ws.client.receiveTimeout",
+            "3000");
+        try {
+            Response<GreetMeLaterResponse> future = g.greetMeLaterAsync(-5000L);
+            future.get();
+            fail();
+        } catch (Exception ex) {
+            //expected: the server suspends 5000 ms, beyond the 3000 ms receiveTimeout property
+        }
+    }
+
+    @Test
+    public void testConnectIssue() throws Exception {
+        updateAddressPort(g, PORT_INV);
+        try {
+            g.greetMe(request);
+            fail("should have connect exception");
+        } catch (Exception ex) {
+            //expected: start() publishes the endpoint on PORT only, none on PORT_INV
+        }
+    }
+
+    @Test
+    public void testInovationWithNettyAddress() throws Exception {
+        String address = "netty://http://localhost:" + PORT + "/SoapContext/SoapPort";
+        JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
+        factory.setServiceClass(Greeter.class);
+        factory.setAddress(address);
+        Greeter greeter = factory.create(Greeter.class);
+        String response = greeter.greetMe("test");
+        assertEquals("Get a wrong response", "Hello test", response);
+    }
+
+    @Test
+    public void testInvocationWithTransportId() throws Exception {
+        String address = "http://localhost:" + PORT + "/SoapContext/SoapPort";
+        JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
+        factory.setServiceClass(Greeter.class);
+        factory.setAddress(address);
+        factory.setTransportId("http://cxf.apache.org/transports/http/netty/client");
+        Greeter greeter = factory.create(Greeter.class);
+        String response = greeter.greetMe("test");
+        assertEquals("Get a wrong response", "Hello test", response);
+    }
+
+    @Test
+    public void testCallAsync() throws Exception {
+        updateAddressPort(g, PORT);
+        GreetMeResponse resp = (GreetMeResponse)g.greetMeAsync(request, new AsyncHandler<GreetMeResponse>() {
+            public void handleResponse(Response<GreetMeResponse> res) {
+                try {
+                    res.get().getResponseType();
+                } catch (InterruptedException | ExecutionException e) {
+                    e.printStackTrace();
+                }
+            }
+        }).get();
+        assertEquals("Hello " + request, resp.getResponseType());
+
+        g.greetMeLaterAsync(1000, new AsyncHandler<GreetMeLaterResponse>() {
+            public void handleResponse(Response<GreetMeLaterResponse> res) {
+            }
+        }).get();
+    }
+
+    @Test
+    public void testCallAsyncCallbackInvokedOnlyOnce() throws Exception {
+        updateAddressPort(g, PORT_INV);
+        int repeat = 20;
+        final AtomicInteger count = new AtomicInteger(0);
+        for (int i = 0; i < repeat; i++) {
+            try {
+                g.greetMeAsync(request, new AsyncHandler<GreetMeResponse>() {
+                    public void handleResponse(Response<GreetMeResponse> res) {
+                        count.incrementAndGet();
+                    }
+                }).get();
+            } catch (Exception e) { //ignored: calls target PORT_INV and presumably fail; only the count matters
+            }
+        }
+        Thread.sleep(1000); //presumably leaves time for any late or duplicate callback before counting
+        assertEquals("Callback should be invoked only once per request", repeat, count.intValue());
+    }
+}

Reply | Threaded
Open this post in threaded view
|

[cxf] 02/02: Recording .gitmergeinfo Changes

reta
In reply to this post by reta
This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 8564a03ee2f765ffb4ee967096c5fe2e2b0ccebc
Author: reta <[hidden email]>
AuthorDate: Sun May 17 21:52:15 2020 -0400

    Recording .gitmergeinfo Changes
---
 .gitmergeinfo | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/.gitmergeinfo b/.gitmergeinfo
index 24d5c29..980f6d4 100644
--- a/.gitmergeinfo
+++ b/.gitmergeinfo
@@ -301,6 +301,7 @@ B 4d8a0d4ebf64f0a488e21722997a77a1fa4d6bea
 B 4da42032f95e667a402b113d6daf4bd0514c6d60
 B 4da70b8f4f82788ab7ee62db8af71be4a1f4ddda
 B 4dfe29cf59f2f0241d2515c63e86daeb7e2e853f
+B 4e64e4c40fe6ef1bf5740834022c54ea3a54ac6a
 B 4eac03478a0a934bb1bb43ac7d32b9047d5aa8d6
 B 4f0b7e84c1c57bdfc54c32a5c8396fa6b9324df0
 B 505563aa79f06d0c77babf8a98d43d62ed50e278
@@ -993,8 +994,10 @@ M 4f9923c32688c57e31f933c69d9c2a667f20d63d
 M 4fea734b1349ba023c2e560deda4aa0eaede7f3b
 M 50360a938a6e6d807560ac89748fb9f931d44fcf
 M 504a1b7827bc76f3c5106b901b44e54513db17aa
+M 504b31611e70ca26b968cf9d96d1dc3d7c920aab
 M 52f2cc152ceccd3981361dc840338c16416786fa
 M 53e45f515d36a31d99d0eab4e1776e5a17eeba55
+M 53fba25bd2a57cca0e0cb0fa00e28ad9b4a1d428
 M 540dcbbeb0833fec994eb794fd98edf0449956be
 M 56f74c18d7c709307b174cb211b28e4384574e9b
 M 57f8c554b04af4e06b4e97bb349cbb8adeeb7acc