Skip to content

Commit 6548589

Browse files
author
Andrei Nadyktov
committed
IGNITE-22530 WIP
1 parent 35afc13 commit 6548589

File tree

10 files changed

+121
-143
lines changed

10 files changed

+121
-143
lines changed

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.ignite.cdc;
1919

2020
import java.nio.file.Path;
21-
import java.util.HashSet;
2221
import java.util.Iterator;
2322
import java.util.Set;
2423
import java.util.stream.Collectors;
@@ -80,11 +79,11 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumerEx {
8079
/** Regexp manager. */
8180
private CdcRegexManager regexManager;
8281

83-
/** Include regex templates for cache names. */
84-
private Set<String> includeTemplates = new HashSet<>();
82+
/** Include regex template for cache names. */
83+
private String includeTemplate;
8584

86-
/** Exclude regex templates for cache names. */
87-
private Set<String> excludeTemplates = new HashSet<>();
85+
/** Exclude regex template for cache names. */
86+
private String excludeTemplate;
8887

8988
/** Cache IDs. */
9089
protected Set<Integer> cachesIds;
@@ -127,7 +126,7 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumerEx {
127126
.boxed()
128127
.collect(Collectors.toSet());
129128

130-
regexManager.compileRegexp(includeTemplates, excludeTemplates);
129+
regexManager.compileRegexp(includeTemplate, excludeTemplate);
131130

132131
regexManager.getSavedCaches().stream()
133132
.map(CU::cacheId)
@@ -276,25 +275,25 @@ public AbstractIgniteCdcStreamer setCaches(Set<String> caches) {
276275
}
277276

278277
/**
279-
* Sets include regex patterns that participate in CDC.
278+
* Sets include regex pattern that participate in CDC.
280279
*
281-
* @param includeTemplates Include regex templates
280+
* @param includeTemplate Include regex template
282281
* @return {@code this} for chaining.
283282
*/
284-
public AbstractIgniteCdcStreamer setIncludeTemplates(Set<String> includeTemplates) {
285-
this.includeTemplates = includeTemplates;
283+
public AbstractIgniteCdcStreamer setIncludeTemplate(String includeTemplate) {
284+
this.includeTemplate = includeTemplate;
286285

287286
return this;
288287
}
289288

290289
/**
291-
* Sets exclude regex patterns that participate in CDC.
290+
* Sets exclude regex pattern that participate in CDC.
292291
*
293-
* @param excludeTemplates Exclude regex templates
292+
* @param excludeTemplate Exclude regex template
294293
* @return {@code this} for chaining.
295294
*/
296-
public AbstractIgniteCdcStreamer setExcludeTemplates(Set<String> excludeTemplates) {
297-
this.excludeTemplates = excludeTemplates;
295+
public AbstractIgniteCdcStreamer setExcludeTemplate(String excludeTemplate) {
296+
this.excludeTemplate = excludeTemplate;
298297

299298
return this;
300299
}

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcRegexManager.java

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.nio.file.Path;
2323
import java.util.List;
2424
import java.util.Optional;
25-
import java.util.Set;
2625
import java.util.regex.Pattern;
2726
import java.util.regex.PatternSyntaxException;
2827
import java.util.stream.Collectors;
@@ -48,11 +47,11 @@ public class CdcRegexManager {
4847
/** CDC directory path. */
4948
private final Path cdcDir;
5049

51-
/** Include regex patterns for cache names. */
52-
private Set<Pattern> includeFilters;
50+
/** Include regex pattern for cache names. */
51+
private Pattern includeFilter;
5352

54-
/** Exclude regex patterns for cache names. */
55-
private Set<Pattern> excludeFilters;
53+
/** Exclude regex pattern for cache names. */
54+
private Pattern excludeFilter;
5655

5756
/** Logger. */
5857
private IgniteLogger log;
@@ -127,33 +126,23 @@ private boolean matchAndSave(String cacheName) {
127126
* Matches cache name with compiled regex patterns.
128127
*
129128
* @param cacheName Cache name.
130-
* @return True if cache name matches include patterns and doesn't match exclude patterns.
129+
* @return True if cache name matches include pattern and doesn't match exclude pattern.
131130
*/
132131
private boolean matchesFilters(String cacheName) {
133-
boolean matchesInclude = includeFilters.stream()
134-
.anyMatch(pattern -> pattern.matcher(cacheName).matches());
135-
136-
boolean notMatchesExclude = excludeFilters.stream()
137-
.noneMatch(pattern -> pattern.matcher(cacheName).matches());
138-
139-
return matchesInclude && notMatchesExclude;
132+
return includeFilter.matcher(cacheName).matches() && excludeFilter.matcher(cacheName).matches();
140133
}
141134

142135
/**
143136
* Compiles regex patterns from user templates.
144137
*
145-
* @param includeTemplates Include regex templates.
146-
* @param excludeTemplates Exclude regex templates.
138+
* @param includeTemplate Include regex template.
139+
* @param excludeTemplate Exclude regex template.
147140
* @throws PatternSyntaxException If the template's syntax is invalid
148141
*/
149-
public void compileRegexp(Set<String> includeTemplates, Set<String> excludeTemplates) {
150-
includeFilters = includeTemplates.stream()
151-
.map(Pattern::compile)
152-
.collect(Collectors.toSet());
153-
154-
excludeFilters = excludeTemplates.stream()
155-
.map(Pattern::compile)
156-
.collect(Collectors.toSet());
142+
public void compileRegexp(String includeTemplate, String excludeTemplate) {
143+
includeFilter = Pattern.compile(includeTemplate);
144+
145+
excludeFilter = Pattern.compile(excludeTemplate);
157146
}
158147

159148
/**

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.nio.file.Path;
2121
import java.util.ArrayList;
2222
import java.util.Collection;
23-
import java.util.HashSet;
2423
import java.util.Iterator;
2524
import java.util.List;
2625
import java.util.Properties;
@@ -154,11 +153,11 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumerEx {
154153
/** Regexp manager. */
155154
private CdcRegexManager regexManager;
156155

157-
/** Include regex templates for cache names. */
158-
private Set<String> includeTemplates = new HashSet<>();
156+
/** Include regex template for cache names. */
157+
private String includeTemplate;
159158

160-
/** Exclude regex templates for cache names. */
161-
private Set<String> excludeTemplates = new HashSet<>();
159+
/** Exclude regex template for cache names. */
160+
private String excludeTemplate;
162161

163162
/** Max batch size. */
164163
private int maxBatchSz = DFLT_MAX_BATCH_SIZE;
@@ -351,7 +350,7 @@ private <T> void sendOneBatch(
351350
.map(CU::cacheId)
352351
.collect(Collectors.toSet());
353352

354-
regexManager.compileRegexp(includeTemplates, excludeTemplates);
353+
regexManager.compileRegexp(includeTemplate, excludeTemplate);
355354

356355
regexManager.getSavedCaches().stream()
357356
.map(CU::cacheId)
@@ -464,25 +463,25 @@ public IgniteToKafkaCdcStreamer setCaches(Collection<String> caches) {
464463
}
465464

466465
/**
467-
* Sets include regex patterns that participate in CDC.
466+
* Sets include regex pattern that participate in CDC.
468467
*
469-
* @param includeTemplates Include regex templates.
468+
* @param includeTemplate Include regex template.
470469
* @return {@code this} for chaining.
471470
*/
472-
public IgniteToKafkaCdcStreamer setIncludeTemplates(Set<String> includeTemplates) {
473-
this.includeTemplates = includeTemplates;
471+
public IgniteToKafkaCdcStreamer setIncludeTemplate(String includeTemplate) {
472+
this.includeTemplate = includeTemplate;
474473

475474
return this;
476475
}
477476

478477
/**
479-
* Sets exclude regex patterns that participate in CDC.
478+
* Sets exclude regex pattern that participate in CDC.
480479
*
481-
* @param excludeTemplates Exclude regex templates
480+
* @param excludeTemplate Exclude regex template.
482481
* @return {@code this} for chaining.
483482
*/
484-
public IgniteToKafkaCdcStreamer setExcludeTemplates(Set<String> excludeTemplates) {
485-
this.excludeTemplates = excludeTemplates;
483+
public IgniteToKafkaCdcStreamer setExcludeTemplate(String excludeTemplate) {
484+
this.excludeTemplate = excludeTemplate;
486485

487486
return this;
488487
}

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,11 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
108108
/** Caches ids to read. */
109109
private final Set<Integer> caches;
110110

111-
/** Include regex templates for cache names. */
112-
private final Set<String> includeTemplates;
111+
/** Include regex template for cache names. */
112+
private final String includeTemplate;
113113

114-
/** Exclude regex templates for cache names. */
115-
private final Set<String> excludeTemplates;
114+
/** Exclude regex template for cache names. */
115+
private final String excludeTemplate;
116116

117117
/** The maximum time to complete Kafka related requests, in milliseconds. */
118118
private final long kafkaReqTimeout;
@@ -180,8 +180,8 @@ public KafkaToIgniteCdcStreamerApplier(
180180
this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class);
181181
this.metrics = metrics;
182182
this.streamer = streamer;
183-
this.includeTemplates = streamerCfg.getIncludeTemplates();
184-
this.excludeTemplates = streamerCfg.getExcludeTemplates();
183+
this.includeTemplate = streamerCfg.getIncludeTemplate();
184+
this.excludeTemplate = streamerCfg.getExcludeTemplate();
185185
}
186186

187187
/** {@inheritDoc} */
@@ -307,13 +307,9 @@ private boolean matchesRegexTemplates(Integer key) {
307307
* @return True if cache name match include patterns and don't match exclude patterns.
308308
*/
309309
private boolean matchesFilters(String cacheName) {
310-
boolean matchesInclude = includeTemplates.stream()
311-
.map(Pattern::compile)
312-
.anyMatch(pattern -> pattern.matcher(cacheName).matches());
310+
boolean matchesInclude = Pattern.compile(includeTemplate).matcher(cacheName).matches();
313311

314-
boolean notMatchesExclude = excludeTemplates.stream()
315-
.map(Pattern::compile)
316-
.noneMatch(pattern -> pattern.matcher(cacheName).matches());
312+
boolean notMatchesExclude = Pattern.compile(excludeTemplate).matcher(cacheName).matches();
317313

318314
return matchesInclude && notMatchesExclude;
319315
}

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.ignite.cdc.kafka;
1919

2020
import java.util.Collection;
21-
import java.util.HashSet;
2221
import java.util.Map;
2322
import java.util.Set;
2423

@@ -86,11 +85,11 @@ public class KafkaToIgniteCdcStreamerConfiguration {
8685
*/
8786
private Collection<String> caches;
8887

89-
/** Include regex templates for cache names. */
90-
private Set<String> includeTemplates = new HashSet<>();
88+
/** Include regex template for cache names. */
89+
private String includeTemplate;
9190

92-
/** Exclude regex templates for cache names. */
93-
private Set<String> excludeTemplates = new HashSet<>();
91+
/** Exclude regex template for cache names. */
92+
private String excludeTemplate;
9493

9594
/** Metric exporter SPI. */
9695
private MetricExporterSpi[] metricExporterSpi;
@@ -185,29 +184,29 @@ public void setCaches(Collection<String> caches) {
185184
/**
186185
* @return Include regex templates
187186
*/
188-
public Set<String> getIncludeTemplates() {
189-
return includeTemplates;
187+
public String getIncludeTemplate() {
188+
return includeTemplate;
190189
}
191190

192191
/**
193-
* @param includeTemplates Include regex templates
192+
* @param includeTemplate Include regex templates
194193
*/
195-
public void setIncludeTemplates(Set<String> includeTemplates) {
196-
this.includeTemplates = includeTemplates;
194+
public void setIncludeTemplate(String includeTemplate) {
195+
this.includeTemplate = includeTemplate;
197196
}
198197

199198
/**
200199
* @return Exclude regex templates
201200
*/
202-
public Set<String> getExcludeTemplates() {
203-
return excludeTemplates;
201+
public String getExcludeTemplate() {
202+
return excludeTemplate;
204203
}
205204

206205
/**
207-
* @param excludeTemplates Exclude regex templates
206+
* @param excludeTemplate Exclude regex templates
208207
*/
209-
public void setExcludeTemplates(Set<String> excludeTemplates) {
210-
this.excludeTemplates = excludeTemplates;
208+
public void setExcludeTemplate(String excludeTemplate) {
209+
this.excludeTemplate = excludeTemplate;
211210
}
212211

213212
/** @return The maximum time to complete Kafka related requests, in milliseconds. */

modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,7 @@ public void testActiveActiveReplicationWithRegexFilters() throws Exception {
599599
IntStream.range(0, KEYS_CNT).filter(i -> i % 2 != 0)));
600600

601601
//Start CDC with only 'active-active-cache' in 'caches' property of CDC config
602-
List<IgniteInternalFuture<?>> futs = startActiveActiveCdcWithFilters(includeTemplates, excludeTemplates);
602+
List<IgniteInternalFuture<?>> futs = startActiveActiveCdcWithFilters(REGEX_INCLUDE_PATTERN, REGEX_EXCLUDE_PATTERN);
603603

604604
try {
605605
waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.EXISTS, futs);
@@ -628,8 +628,8 @@ public void testActivePassiveReplicationWithRegexFilters() throws Exception {
628628
Set<String> excludeTemplates = new HashSet<>(Arrays.asList(REGEX_EXCLUDE_PATTERN));
629629

630630
//Start CDC with only 'active-active-cache' in 'caches' property of CDC config
631-
List<IgniteInternalFuture<?>> futs = startActivePassiveCdcWithFilters(ACTIVE_PASSIVE_CACHE,
632-
includeTemplates, excludeTemplates);
631+
List<IgniteInternalFuture<?>> futs = startActivePassiveCdcWithFilters(ACTIVE_PASSIVE_CACHE, REGEX_INCLUDE_PATTERN,
632+
REGEX_EXCLUDE_PATTERN);
633633

634634
try {
635635
createCache(destCluster[0], ACTIVE_PASSIVE_CACHE);
@@ -797,15 +797,15 @@ protected String[] hostAddresses(IgniteEx[] dest) {
797797

798798
/** */
799799
protected abstract List<IgniteInternalFuture<?>> startActivePassiveCdcWithFilters(String cache,
800-
Set<String> includeTemplates,
801-
Set<String> excludeTemplates);
800+
String includeTemplate,
801+
String excludeTemplate);
802802

803803
/** */
804804
protected abstract List<IgniteInternalFuture<?>> startActiveActiveCdc();
805805

806806
/** */
807-
protected abstract List<IgniteInternalFuture<?>> startActiveActiveCdcWithFilters(Set<String> includeTemplates,
808-
Set<String> excludeTemplates);
807+
protected abstract List<IgniteInternalFuture<?>> startActiveActiveCdcWithFilters(String includeTemplate,
808+
String excludeTemplate);
809809

810810
/** */
811811
protected abstract void checkConsumerMetrics(Function<String, Long> longMetric);

0 commit comments

Comments
 (0)