Skip to content

Commit

Permalink
IGNITE-21656: Fix cache dump check fails on a cache with a node filter (
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladsz83 committed Apr 5, 2024
1 parent 2ccb962 commit 5145c80
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public DumpReader(DumpReaderConfiguration cfg, IgniteLogger log) {
: null;

for (SnapshotMetadata meta : dump.metadata()) {
for (Integer grp : meta.cacheGroupIds()) {
for (Integer grp : meta.partitions().keySet()) {
if (cacheGrpIds == null || cacheGrpIds.contains(grp))
grpToNodes.computeIfAbsent(grp, key -> new ArrayList<>()).add(meta.folderName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1181,27 +1181,23 @@ private IgniteInternalFuture<SnapshotOperationResponse> initLocalFullSnapshot(
parts.put(grpId, null);
}

IgniteInternalFuture<?> task0;
IgniteInternalFuture<?> task0 = registerSnapshotTask(req.snapshotName(),
req.snapshotPath(),
req.operationalNodeId(),
req.requestId(),
parts,
withMetaStorage,
req.dump(),
req.compress(),
req.encrypt(),
locSndrFactory.apply(req.snapshotName(), req.snapshotPath())
);

if (parts.isEmpty() && !withMetaStorage)
task0 = new GridFinishedFuture<>(Collections.emptySet());
else {
task0 = registerSnapshotTask(req.snapshotName(),
req.snapshotPath(),
req.operationalNodeId(),
req.requestId(),
parts,
withMetaStorage,
req.dump(),
req.compress(),
req.encrypt(),
locSndrFactory.apply(req.snapshotName(), req.snapshotPath())
);
if (withMetaStorage) {
assert task0 instanceof SnapshotFutureTask;

if (withMetaStorage && task0 instanceof SnapshotFutureTask) {
((DistributedMetaStorageImpl)cctx.kernalContext().distributedMetastorage())
.suspend(((SnapshotFutureTask)task0).started());
}
((DistributedMetaStorageImpl)cctx.kernalContext().distributedMetastorage())
.suspend(((SnapshotFutureTask)task0).started());
}

return task0.chain(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,19 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.dump.DumpEntry;
Expand All @@ -52,8 +56,10 @@
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.platform.model.Key;
import org.apache.ignite.platform.model.User;
import org.apache.ignite.platform.model.Value;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.util.AttributeNodeFilter;
import org.junit.Test;

import static java.lang.Boolean.FALSE;
Expand All @@ -63,6 +69,7 @@
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.junit.Assume.assumeFalse;
import static org.junit.Assume.assumeTrue;

/** */
public class IgniteCacheDumpSelfTest extends AbstractCacheDumpTest {
Expand Down Expand Up @@ -110,6 +117,68 @@ public class IgniteCacheDumpSelfTest extends AbstractCacheDumpTest {
return cfg;
}

/** */
@Test
public void testDumpWithNodeFilterCache() throws Exception {
assumeTrue(nodes > 1);

CacheConfiguration<?, ?> ccfg0 = cacheConfiguration(getConfiguration(getTestIgniteInstanceName()), DEFAULT_CACHE_NAME)
.setNodeFilter(new AttributeNodeFilter(DEFAULT_CACHE_NAME, null));

CacheConfiguration<?, ?> ccfg1 = cacheConfiguration(getConfiguration(getTestIgniteInstanceName()), CACHE_0)
.setNodeFilter(ccfg0.getNodeFilter());

for (int i = 0; i <= nodes; ++i) {
IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(i));

if (i == 0)
cfg.setUserAttributes(F.asMap(DEFAULT_CACHE_NAME, ""));

cfg.setCacheConfiguration(null);

cfg.setClientMode(i == nodes);

IgniteEx ig = startGrid(cfg);

cli = i == nodes ? ig : null;
}

cli.cluster().state(ClusterState.ACTIVE);

cli.createCache(ccfg0);
cli.createCache(ccfg1);

try (IgniteDataStreamer<Integer, Integer> ds0 = cli.dataStreamer(DEFAULT_CACHE_NAME);
IgniteDataStreamer<Integer, User> ds1 = cli.dataStreamer(CACHE_0)) {
IgniteCache<Integer, Integer> cache0 = cli.cache(DEFAULT_CACHE_NAME);
IgniteCache<Integer, User> cache1 = cli.cache(CACHE_0);

for (int i = 0; i < KEYS_CNT; ++i) {
if (useDataStreamer) {
ds0.addData(i, i);
ds1.addData(i, USER_FACTORY.apply(i));
}
else {
cache0.put(i, i);
cache1.put(i, USER_FACTORY.apply(i));
}
}
}

createDump(cli, DMP_NAME, null);

checkDump(cli,
DMP_NAME,
new String[] {DEFAULT_CACHE_NAME, GRP},
Stream.of(DEFAULT_CACHE_NAME, CACHE_0).collect(Collectors.toSet()),
KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups),
KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups),
0,
false,
false
);
}

/** */
@Test
public void testCacheDump() throws Exception {
Expand Down

0 comments on commit 5145c80

Please sign in to comment.