Skip to content

Commit 80ffa87

Browse files
committed
extract FlinkCDCPGLikeSourceFactory to tis-flinl-cdc-common
1 parent 352def8 commit 80ffa87

File tree

18 files changed

+755
-434
lines changed

18 files changed

+755
-434
lines changed
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.qlangtech.plugins.incr.flink.cdc.pglike;
20+
21+
import com.alibaba.citrus.turbine.Context;
22+
import com.google.common.collect.Lists;
23+
import com.qlangtech.plugins.incr.debuzium.DebuziumPropAssist;
24+
import com.qlangtech.tis.async.message.client.consumer.impl.MQListenerFactory;
25+
import com.qlangtech.tis.datax.DataXName;
26+
import com.qlangtech.tis.datax.impl.DataxReader;
27+
import com.qlangtech.tis.extension.util.AbstractPropAssist;
28+
import com.qlangtech.tis.extension.util.OverwriteProps;
29+
import com.qlangtech.tis.plugin.annotation.FormField;
30+
import com.qlangtech.tis.plugin.annotation.FormFieldType;
31+
import com.qlangtech.tis.plugin.annotation.Validator;
32+
import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter;
33+
import com.qlangtech.tis.plugin.ds.ISelectedTab;
34+
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
35+
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
36+
import com.qlangtech.tis.util.IPluginContext;
37+
import io.debezium.config.Field;
38+
import org.apache.commons.lang3.tuple.Pair;
39+
40+
import java.util.List;
41+
import java.util.Objects;
42+
import java.util.Properties;
43+
import java.util.function.BiConsumer;
44+
import java.util.function.Consumer;
45+
46+
/**
47+
* @author: 百岁([email protected]
48+
* @create: 2025-01-19 18:14
49+
**/
50+
public abstract class FlinkCDCPGLikeSourceFactory extends MQListenerFactory {
51+
52+
public static final String FIELD_REPLICA_RULE = "replicaRule";
53+
public static final String FIELD_KEY_SLOT_NAME = "slotName";
54+
/**
55+
* The name of the Postgres logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput.
56+
*/
57+
@FormField(ordinal = 0, type = FormFieldType.ENUM, validate = {Validator.require})
58+
public String decodingPluginName;
59+
60+
@FormField(ordinal = 1, type = FormFieldType.INPUTTEXT, validate = {Validator.require, Validator.db_col_name})
61+
public String slotName;
62+
63+
//DROP_SLOT_ON_STOP
64+
@FormField(ordinal = 2, type = FormFieldType.ENUM, validate = {Validator.require, Validator.db_col_name})
65+
public Boolean dropSolt;
66+
67+
/**
68+
* https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#incremental-snapshot-optionshttps://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#incremental-snapshot-options
69+
*/
70+
@FormField(ordinal = 3, type = FormFieldType.ENUM, validate = {Validator.require})
71+
public String startupOptions;
72+
// REPLICA IDENTITY
73+
@FormField(ordinal = 4, advance = false, validate = {Validator.require})
74+
public PGLikeReplicaIdentity replicaRule;
75+
76+
77+
public final PGLikeReplicaIdentity getRepIdentity() {
78+
return Objects.requireNonNull(replicaRule, "replicaIdentity can not be null");
79+
}
80+
81+
public void setDebeziumProperties(Properties debeziumProperties, FlinkCDCPGLikeSourceFactory sourceFactory) {
82+
((BasePGLikeDescriptor) this.getDescriptor()).debeziumProps
83+
.forEach((pair) -> {
84+
pair.getValue().accept(debeziumProperties //
85+
, Objects.requireNonNull(sourceFactory, "sourceFactory can not be null"));
86+
});
87+
}
88+
89+
90+
// @Override
91+
// public IFlinkColCreator<FlinkCol> createFlinkColCreator(DataSourceMeta sourceMeta) {
92+
// IFlinkColCreator<FlinkCol> flinkColCreator = (meta, colIndex) -> {
93+
// return meta.getType().accept(new PGCDCTypeVisitor(meta, colIndex));
94+
// };
95+
// return flinkColCreator;
96+
// }
97+
98+
// public static List<Triple<String, Field, Function<FlinkCDCPGLikeSourceFactory, Object>>> debeziumProps
99+
// = Lists.newArrayList(
100+
// Triple.of(FIELD_KEY_SLOT_NAME, PostgresConnectorConfig.SLOT_NAME, (sf) -> sf.slotName)
101+
// , Triple.of("dropSolt", PostgresConnectorConfig.DROP_SLOT_ON_STOP, (sf) -> sf.dropSolt));
102+
103+
// Properties debeziumProperties
104+
105+
106+
// @TISExtension()
107+
public static abstract class BasePGLikeDescriptor extends BaseDescriptor {
108+
109+
public final List<Pair<Consumer<AbstractPropAssist.Options<MQListenerFactory, Field>>, BiConsumer<Properties, FlinkCDCPGLikeSourceFactory>>> debeziumProps;
110+
111+
public BasePGLikeDescriptor() {
112+
super();
113+
this.debeziumProps = Lists.newArrayList(
114+
Pair.of((opts) -> {
115+
opts.add(FIELD_KEY_SLOT_NAME, getSoltNameField()
116+
, new OverwriteProps().setDftVal(IPluginContext.getThreadLocalInstance().getCollectionName().getPipelineName()));
117+
}
118+
, (debeziumProperties, sourceFactory) -> {
119+
debeziumProperties.setProperty(getSoltNameField().name(), sourceFactory.slotName);
120+
})
121+
, //
122+
Pair.of((opts) -> {
123+
opts.add("dropSolt", getDropSoltOnStopField());
124+
}, (debeziumProperties, sourceFactory) -> {
125+
debeziumProperties.setProperty(getDropSoltOnStopField().name(), String.valueOf(sourceFactory.dropSolt));
126+
}));
127+
AbstractPropAssist.Options<MQListenerFactory, Field> opts = DebuziumPropAssist.createOpts(this);
128+
debeziumProps.forEach((pair) -> {
129+
//opts.add(trip.getLeft(), trip.getMiddle());
130+
pair.getKey().accept(opts);
131+
});
132+
// this.opts.add("slotName", PostgresConnectorConfig.SLOT_NAME);
133+
// this.opts.add("dropSolt", PostgresConnectorConfig.DROP_SLOT_ON_STOP);
134+
}
135+
136+
/**
137+
* PostgresConnectorConfig.SLOT_NAME
138+
*
139+
* @return
140+
*/
141+
protected abstract Field getSoltNameField();
142+
143+
/**
144+
* PostgresConnectorConfig.DROP_SLOT_ON_STOP
145+
*
146+
* @return
147+
*/
148+
protected abstract Field getDropSoltOnStopField();
149+
150+
151+
@Override
152+
public PluginVender getVender() {
153+
return PluginVender.FLINK_CDC;
154+
}
155+
156+
// @Override
157+
public boolean validateSlotName(IFieldErrorHandler msgHandler, Context context, String fieldName, String value) {
158+
if (!value.matches("[a-z0-9_]{1,63}")) {
159+
msgHandler.addFieldError(context, fieldName, "must contain only digits, lowercase characters and underscores with length <= 63");
160+
return false;
161+
}
162+
163+
return true;
164+
}
165+
166+
@Override
167+
protected boolean validateMQListenerForm(
168+
IControlMsgHandler msgHandler, Context context, MQListenerFactory sourceFactory) {
169+
FlinkCDCPGLikeSourceFactory incrSource = (FlinkCDCPGLikeSourceFactory) sourceFactory;
170+
DataXName pipeline = msgHandler.getCollectionName();
171+
DataxReader dataxReader = DataxReader.load((IPluginContext) msgHandler, pipeline.getPipelineName());
172+
IDataSourceFactoryGetter dataSourceGetter = (IDataSourceFactoryGetter) dataxReader;
173+
List<ISelectedTab> selectedTabs = dataxReader.getSelectedTabs();
174+
175+
if (!incrSource.replicaRule.validateSelectedTabs(msgHandler, context, dataSourceGetter, selectedTabs)) {
176+
return false;
177+
}
178+
179+
final boolean[] validationPassed = {true};
180+
181+
dataSourceGetter.getDataSourceFactory().visitFirstConnection((conn) -> {
182+
// Validate replication slot availability
183+
String slotName = incrSource.slotName;
184+
185+
String checkSlotSql = "SELECT active FROM pg_replication_slots WHERE slot_name = ?";
186+
try (java.sql.PreparedStatement pstmt = conn.preparedStatement(checkSlotSql)) {
187+
pstmt.setString(1, slotName);
188+
189+
try (java.sql.ResultSet rs = pstmt.executeQuery()) {
190+
if (rs.next()) {
191+
boolean isActive = rs.getBoolean("active");
192+
193+
if (isActive) {
194+
// Slot is currently active and being used by another task
195+
msgHandler.addFieldError(context, FIELD_KEY_SLOT_NAME,
196+
"Replication slot '" + slotName + "' is already active and in use by another CDC task. " +
197+
"Please use a different slot name or stop the task currently using this slot.");
198+
validationPassed[0] = false;
199+
}
200+
// If active=false, the slot exists but is idle, which is acceptable
201+
// The task can reuse this slot
202+
}
203+
// If no result, the slot doesn't exist yet, which is fine
204+
// Flink CDC will create the slot automatically on startup
205+
}
206+
} catch (java.sql.SQLException e) {
207+
// Log warning but don't fail validation if we can't check the slot
208+
// This could happen if pg_replication_slots view doesn't exist or permission issues
209+
msgHandler.addErrorMessage(context,
210+
"Unable to validate replication slot status: " + e.getMessage());
211+
validationPassed[0] = false;
212+
}
213+
});
214+
215+
return validationPassed[0];
216+
}
217+
}
218+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.qlangtech.plugins.incr.flink.cdc.pglike;
20+
21+
import com.alibaba.citrus.turbine.Context;
22+
import com.qlangtech.tis.extension.Describable;
23+
import com.qlangtech.tis.extension.TISExtensible;
24+
import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter;
25+
import com.qlangtech.tis.plugin.ds.ISelectedTab;
26+
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
27+
28+
import java.io.Serializable;
29+
import java.util.List;
30+
31+
/**
32+
* PostgreSQL binlog的复制规则,如果需要支持物理删除,必须要使用full,不然更新流程中不会附带需要删除的物理id
33+
*
34+
* @author: 百岁([email protected]
35+
* @create: 2024-11-11 16:31
36+
**/
37+
@TISExtensible
38+
public abstract class PGLikeReplicaIdentity implements Describable<PGLikeReplicaIdentity>, Serializable {
39+
protected static final String FULL = "FULL";
40+
protected static final String DEFAULT = "DEFAULT";
41+
42+
public abstract boolean isShallContainBeforeVals();
43+
44+
45+
public abstract boolean validateSelectedTabs(
46+
IControlMsgHandler msgHandler
47+
, Context context
48+
, IDataSourceFactoryGetter dataSourceGetter, List<ISelectedTab> selectedTabs);
49+
}

tis-incr/tis-flink-cdc-kingbase-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/pglike/replica/DefaultPGLikeReplicaIdentity.java renamed to tis-incr/tis-flink-cdc-common/src/main/java/com/qlangtech/plugins/incr/flink/cdc/pglike/replica/DefaultPGLikeReplicaIdentity.java

File renamed without changes.

tis-incr/tis-flink-cdc-kingbase-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/pglike/replica/FullPGLikeReplicaIdentity.java renamed to tis-incr/tis-flink-cdc-common/src/main/java/com/qlangtech/plugins/incr/flink/cdc/pglike/replica/FullPGLikeReplicaIdentity.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
import com.qlangtech.plugins.incr.flink.cdc.pglike.PGLikeReplicaIdentity;
55
import com.qlangtech.tis.extension.Descriptor;
66
import com.qlangtech.tis.extension.TISExtension;
7+
import com.qlangtech.tis.plugin.ds.DataSourceFactory;
78
import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter;
89
import com.qlangtech.tis.plugin.ds.ISelectedTab;
910
import com.qlangtech.tis.plugin.ds.JDBCConnection;
10-
import com.qlangtech.tis.plugin.ds.postgresql.PGLikeDataSourceFactory;
1111
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
1212
import org.apache.commons.lang3.StringUtils;
1313

@@ -44,11 +44,13 @@ public boolean validateSelectedTabs(
4444
, Context context
4545
, IDataSourceFactoryGetter dataSourceGetter, List<ISelectedTab> selectedTabs) {
4646

47-
PGLikeDataSourceFactory ds = (PGLikeDataSourceFactory) dataSourceGetter.getDataSourceFactory();
47+
DataSourceFactory ds = dataSourceGetter.getDataSourceFactory();
4848

49+
// see PGLikeDataSourceFactory
50+
DataSourceFactory.ISchemaSupported schemaSupported = (DataSourceFactory.ISchemaSupported)ds;
4951
boolean[] validate = new boolean[]{true};
5052
ds.visitFirstConnection((conn) -> {
51-
validate[0] = validateReplicaIdentity(msgHandler, context, conn, ds.tabSchema, selectedTabs);
53+
validate[0] = validateReplicaIdentity(msgHandler, context, conn, schemaSupported.getDBSchema(), selectedTabs);
5254
});
5355

5456
return validate[0];

tis-incr/tis-flink-cdc-kingbase-plugin/src/main/resources/com/qlangtech/plugins/incr/flink/cdc/pglike/FlinkCDCPGLikeSourceFactory.json renamed to tis-incr/tis-flink-cdc-common/src/main/resources/com/qlangtech/plugins/incr/flink/cdc/pglike/FlinkCDCPGLikeSourceFactory.json

File renamed without changes.

tis-incr/tis-flink-cdc-kingbase-plugin/src/main/resources/com/qlangtech/plugins/incr/flink/cdc/pglike/FlinkCDCPGLikeSourceFactory.md renamed to tis-incr/tis-flink-cdc-common/src/main/resources/com/qlangtech/plugins/incr/flink/cdc/pglike/FlinkCDCPGLikeSourceFactory.md

File renamed without changes.

tis-incr/tis-flink-cdc-kingbase-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/kingbase/FlinkCDCKingBaseSourceFactory.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,16 @@
1818

1919
package com.qlangtech.plugins.incr.flink.cdc.kingbase;
2020

21+
import com.qlangtech.plugins.incr.flink.cdc.FlinkCol;
2122
import com.qlangtech.plugins.incr.flink.cdc.pglike.FlinkCDCPGLikeSourceFactory;
23+
import com.qlangtech.plugins.incr.flink.cdc.pglike.PGDTOColValProcess;
24+
import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
2225
import com.qlangtech.tis.async.message.client.consumer.IMQListener;
2326
import com.qlangtech.tis.extension.TISExtension;
2427
import com.qlangtech.tis.plugin.IEndTypeGetter;
28+
import com.qlangtech.tis.plugin.ds.DataSourceMeta;
29+
import io.debezium.config.Field;
30+
import io.debezium.connector.postgresql.PostgresConnectorConfig;
2531

2632
/**
2733
* @author: 百岁([email protected]
@@ -34,13 +40,31 @@ public IMQListener create() {
3440
return new FlinkCDCKingBaseSourceFunction(this);
3541
}
3642

43+
@Override
44+
public IFlinkColCreator<FlinkCol> createFlinkColCreator(DataSourceMeta sourceMeta) {
45+
IFlinkColCreator<FlinkCol> flinkColCreator = (meta, colIndex) -> {
46+
return meta.getType().accept(new PGDTOColValProcess.PGCDCTypeVisitor(meta, colIndex));
47+
};
48+
return flinkColCreator;
49+
}
50+
3751
@TISExtension()
3852
public static class KingBaseDescriptor extends BasePGLikeDescriptor {
3953
@Override
4054
public String getDisplayName() {
4155
return "Flink-CDC-KingBase";
4256
}
4357

58+
@Override
59+
protected Field getSoltNameField() {
60+
return PostgresConnectorConfig.SLOT_NAME;
61+
}
62+
63+
@Override
64+
protected Field getDropSoltOnStopField() {
65+
return PostgresConnectorConfig.DROP_SLOT_ON_STOP;
66+
}
67+
4468
@Override
4569
public PluginVender getVender() {
4670
return PluginVender.TIS;

tis-incr/tis-flink-cdc-kingbase-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/kingbase/FlinkCDCKingBaseSourceFunction.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import com.qlangtech.plugins.incr.flink.cdc.FlinkCol;
2222
import com.qlangtech.plugins.incr.flink.cdc.kingbase.source.KingBaseDialect;
23-
import com.qlangtech.plugins.incr.flink.cdc.pglike.FlinkCDCPGLikeSourceFactory;
2423
import com.qlangtech.plugins.incr.flink.cdc.pglike.FlinkCDCPGLikeSourceFunction;
2524
import com.qlangtech.plugins.incr.flink.cdc.pglike.PostgreSQLDeserializationSchema;
2625
import com.qlangtech.plugins.incr.flink.cdc.pglike.StartupOptionUtils;
@@ -108,12 +107,12 @@ public boolean visit(DBConfig config, String jdbcUrl, String ip, String dbName)
108107
configFactory.password(dsFactory.password);
109108
configFactory.debeziumProperties(debeziumProperties);
110109
// configFactory.slotName(sourceFactory.slotName);
111-
112-
FlinkCDCPGLikeSourceFactory.debeziumProps.forEach((trip) -> {
113-
// debeziumProperties.setProperty(trip.getMiddle().name(), String.valueOf(trip.getRight().apply(sourceFactory)));
114-
115-
trip.getValue().accept(debeziumProperties,sourceFactory);
116-
});
110+
sourceFactory.setDebeziumProperties(debeziumProperties,sourceFactory);
111+
// FlinkCDCPGLikeSourceFactory.debeziumProps.forEach((trip) -> {
112+
// // debeziumProperties.setProperty(trip.getMiddle().name(), String.valueOf(trip.getRight().apply(sourceFactory)));
113+
//
114+
// trip.getValue().accept(debeziumProperties,sourceFactory);
115+
// });
117116

118117
// debeziumProperties.setProperty(
119118
// io.debezium.connector.postgresql.PostgresConnectorConfig.DROP_SLOT_ON_STOP.name(), String.valueOf(sourceFactory.dropSolt));

0 commit comments

Comments
 (0)