From add61eb58874a6199b4f23f739efad55155f6bcd Mon Sep 17 00:00:00 2001 From: Pavel Tupitsyn Date: Wed, 21 Aug 2024 15:31:41 +0300 Subject: [PATCH] IGNITE-22964 Java thin: fix client init hang on unreachable discovered address (#11486) * Do not perform cluster discovery synchronously while initializing the client - do `applyOnDefaultChannel` before checking `channelsCnt.get() == 0` in `channelsInit` * Do not disconnect active channels when performing discovery, even if those addresses are not in the new list to avoid unnecessary reconnects --- .../internal/client/thin/ReliableChannel.java | 22 +++++++- .../apache/ignite/client/ReliabilityTest.java | 6 +- .../internal/client/thin/ComputeTaskTest.java | 4 +- ...nClientAbstractPartitionAwarenessTest.java | 56 +++++++++++++++++-- .../thin/ThinClientEnpointsDiscoveryTest.java | 45 +++++++++++++++ ...artitionAwarenessUnstableTopologyTest.java | 4 +- 6 files changed, 124 insertions(+), 13 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java index 6559ea8f7397e..1d0115630a561 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java @@ -631,6 +631,19 @@ synchronized void initChannelHolders() { return; } + // Add connected channels to the list to avoid unnecessary reconnects, unless address finder is used. + if (holders != null && clientCfg.getAddressesFinder() == null) { + // Do not modify the original list. + newAddrs = new ArrayList<>(newAddrs); + + for (ClientChannelHolder h : holders) { + ClientChannel ch = h.ch; + + if (ch != null && !ch.closed()) + newAddrs.add(h.getAddresses()); + } + } + Map curAddrs = new HashMap<>(); Set newAddrsSet = newAddrs.stream().flatMap(Collection::stream).collect(Collectors.toSet()); @@ -744,13 +757,16 @@ void channelsInit(@Nullable List failures) { initChannelHolders(); if (failures == null || failures.size() < attemptsLimit) { + // Establish default channel connection. + applyOnDefaultChannel(channel -> null, null, failures); + if (channelsCnt.get() == 0) { // Establish default channel connection and retrive nodes endpoints if applicable. - if (applyOnDefaultChannel(discoveryCtx::refresh, null, failures)) + boolean discoveryUpdated = applyOnDefaultChannel(discoveryCtx::refresh, null, failures); + + if (discoveryUpdated) initChannelHolders(); } - else // Apply no-op function. Establish default channel connection. - applyOnDefaultChannel(channel -> null, null, failures); } if (partitionAwarenessEnabled) diff --git a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java index 81127e8d09129..6803f3c2d4777 100644 --- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java +++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java @@ -205,9 +205,11 @@ public void testSingleServerFailover() throws Exception { // Fail. dropAllThinClientConnections(Ignition.allGrids().get(0)); - Throwable ex = GridTestUtils.assertThrowsWithCause(() -> cachePut(cache, 0, 0), ClientConnectionException.class); + if (!partitionAware) { + Throwable ex = GridTestUtils.assertThrowsWithCause(() -> cachePut(cache, 0, 0), ClientConnectionException.class); - GridTestUtils.assertContains(null, ex.getMessage(), F.first(cluster.clientAddresses())); + GridTestUtils.assertContains(null, ex.getMessage(), F.first(cluster.clientAddresses())); + } // Recover after fail. cachePut(cache, 0, 0); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java index 38a3da9f7797a..6d6b96dc5cdaf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java @@ -420,13 +420,13 @@ public void testExecuteTaskConnectionLost() throws Exception { Future fut1 = compute.executeAsync(TestLatchTask.class.getName(), null); // Wait for the task to start, then drop connections. - TestLatchTask.startLatch.await(); + assertTrue(TestLatchTask.startLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)); dropAllThinClientConnections(); TestLatchTask.startLatch = new CountDownLatch(1); Future fut2 = compute.executeAsync(TestLatchTask.class.getName(), null); - TestLatchTask.startLatch.await(); + assertTrue(TestLatchTask.startLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)); dropAllThinClientConnections(); TestLatchTask.latch = new CountDownLatch(1); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java index 93dc575d8cc95..8430fc5ff81ef 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java @@ -17,8 +17,10 @@ package org.apache.ignite.internal.client.thin; +import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Queue; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -54,7 +56,7 @@ @SuppressWarnings("rawtypes") public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommonAbstractTest { /** Wait timeout. */ - protected static final long WAIT_TIMEOUT = 5_000L; + protected static final long WAIT_TIMEOUT = 15_000L; /** Replicated cache name. */ protected static final String REPL_CACHE_NAME = "replicated_cache"; @@ -153,16 +155,31 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo * Checks that operation goes through specified channel. */ protected void assertOpOnChannel(@Nullable TestTcpClientChannel expCh, ClientOperation expOp) { + assertOpOnChannel(expCh, expOp, null); + } + + /** + * Checks that operation goes through specified channel. + */ + protected void assertOpOnChannel( + @Nullable TestTcpClientChannel expCh, + ClientOperation expOp, + @Nullable ClientOperation ignoreOp) { + while (opsQueue.peek() != null && opsQueue.peek().get2() == ignoreOp) { + opsQueue.poll(); + } + T2 nextChOp = opsQueue.poll(); + T2 queuedOp = opsQueue.peek(); assertNotNull("Unexpected (null) next operation [expCh=" + expCh + ", expOp=" + expOp + ']', nextChOp); assertEquals("Unexpected operation on channel [expCh=" + expCh + ", expOp=" + expOp + - ", nextOpCh=" + nextChOp + ']', expOp, nextChOp.get2()); + ", nextOpCh=" + nextChOp + ", queuedOp=" + queuedOp + ']', expOp, nextChOp.get2()); if (expCh != null) { assertEquals("Unexpected channel for operation [expCh=" + expCh + ", expOp=" + expOp + - ", nextOpCh=" + nextChOp + ']', expCh, nextChOp.get1()); + ", nextOpCh=" + nextChOp + ", queuedOp=" + queuedOp + ']', expCh, nextChOp.get1()); } } @@ -245,10 +262,41 @@ protected void awaitChannelsInit(int... chIdxs) throws IgniteInterruptedCheckedE // Wait until all channels initialized. for (int ch : chIdxs) { assertTrue("Failed to wait for channel[" + ch + "] init", - GridTestUtils.waitForCondition(() -> channels[ch] != null, WAIT_TIMEOUT)); + GridTestUtils.waitForCondition(() -> isConnected(ch), WAIT_TIMEOUT)); } } + /** + * Gets a value indicating whether the channel is connected at the specified index (port offset). + * + * @param chIdx Channel index (port offset). + * @return {@code true} if the channel is connected, {@code false} otherwise. + */ + protected boolean isConnected(int chIdx) { + List channelHolders = ((TcpIgniteClient)client).reliableChannel().getChannelHolders(); + int chPort = DFLT_PORT + chIdx; + + for (ReliableChannel.ClientChannelHolder holder : channelHolders) { + if (holder == null || holder.isClosed()) { + continue; + } + + ClientChannel ch = GridTestUtils.getFieldValue(holder, "ch"); + + if (ch == null || ch.closed()) { + continue; + } + + for (InetSocketAddress addr : holder.getAddresses()) { + if (addr.getPort() == chPort) { + return true; + } + } + } + + return false; + } + /** * */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java index 6c91e3014f115..bcb7627f8ecaf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java @@ -17,10 +17,24 @@ package org.apache.ignite.internal.client.thin; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; + +import org.apache.ignite.Ignition; import org.apache.ignite.client.ClientConnectionException; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.ClientConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor; import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; +import static org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_PORT; + /** * Test endpoints discovery by thin client. */ @@ -122,4 +136,35 @@ public void testDiscoveryAfterAllNodesFailed() throws Exception { awaitChannelsInit(0); } + + /** */ + @Test + public void testUnreachableAddressDiscoveredDoesNotPreventClientInit() throws Exception { + try (ServerSocket sock = new ServerSocket()) { + sock.bind(new InetSocketAddress("127.0.0.1", 0)); + + ArrayList addrs = new ArrayList<>(); + addrs.add("127.0.0.1:" + sock.getLocalPort()); + + IgniteEx server = startGrid(0); + ClusterNode serverNode = server.cluster().localNode(); + + // Override node attributes - set local port of the "fake server" socket which does not work. + Map attrsFiltered = serverNode.attributes(); + Map attrsSealed = GridTestUtils.getFieldValue(attrsFiltered, "map"); + Map attrs = GridTestUtils.getFieldValue(attrsSealed, "m"); + attrs.put(ClientListenerProcessor.CLIENT_LISTENER_PORT, sock.getLocalPort()); + + // Config has good server address, client discovery returns unreachable address. + // We expect the client to connect to the good address and ignore the unreachable one. + ClientConfiguration ccfg = new ClientConfiguration() + .setTimeout(2000) + .setAddresses("127.0.0.1:" + DFLT_PORT); + + IgniteClient client = Ignition.startClient(ccfg); + + Collection cacheNames = client.cacheNames(); + assertFalse(cacheNames.isEmpty()); + } + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java index fe49303bbeb9d..83081ffce83ca 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java @@ -205,12 +205,12 @@ public void testConnectionLoss() throws Exception { cache.put(key, 0); // Request goes to the connected channel, since affinity node is disconnected. - assertOpOnChannel(channels[1], ClientOperation.CACHE_PUT); + assertOpOnChannel(null, ClientOperation.CACHE_PUT); cache.put(key, 0); // Connection to disconnected node should be restored after retry. - assertOpOnChannel(channels[disconnectNodeIdx], ClientOperation.CACHE_PUT); + assertOpOnChannel(channels[disconnectNodeIdx], ClientOperation.CACHE_PUT, ClientOperation.CACHE_PARTITIONS); // Test partition awareness. testPartitionAwareness(false);