Skip to content

Commit 7a7373b

Browse files
authored
[core] Fix the issue where aggregate function columns without defining sequenceGroup cause the table to be unavailable (#7046)
1 parent cba396f commit 7a7373b

File tree

2 files changed

+53
-1
lines changed

2 files changed

+53
-1
lines changed

paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.paimon.format.FileFormat;
2727
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
2828
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
29+
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory;
2930
import org.apache.paimon.options.ConfigOption;
3031
import org.apache.paimon.options.Options;
3132
import org.apache.paimon.table.BucketMode;
@@ -450,11 +451,15 @@ private static void validateFieldsPrefix(TableSchema schema, CoreOptions options
450451

451452
private static void validateSequenceGroup(TableSchema schema, CoreOptions options) {
452453
Map<String, Set<String>> fields2Group = new HashMap<>();
454+
Set<Integer> sequenceGroupFieldIndexs = new HashSet<>();
455+
List<String> fieldNames = schema.fieldNames();
453456
for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
454457
String k = entry.getKey();
455458
String v = entry.getValue();
456-
List<String> fieldNames = schema.fieldNames();
457459
if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {
460+
Arrays.stream(v.split(FIELDS_SEPARATOR))
461+
.map(fieldName -> requireField(fieldName, fieldNames))
462+
.forEach(sequenceGroupFieldIndexs::add);
458463
String[] sequenceFieldNames =
459464
k.substring(
460465
FIELDS_PREFIX.length() + 1,
@@ -492,8 +497,33 @@ private static void validateSequenceGroup(TableSchema schema, CoreOptions option
492497
Set<String> group = fields2Group.computeIfAbsent(field, p -> new HashSet<>());
493498
group.addAll(sequenceFieldsList);
494499
}
500+
501+
// add self
502+
Arrays.stream(sequenceFieldNames)
503+
.mapToInt(fieldName -> requireField(fieldName, fieldNames))
504+
.forEach(sequenceGroupFieldIndexs::add);
495505
}
496506
}
507+
508+
if (options.mergeEngine() == MergeEngine.PARTIAL_UPDATE) {
509+
for (String fieldName : fieldNames) {
510+
String aggFunc = options.fieldAggFunc(fieldName);
511+
String aggFuncName = aggFunc == null ? options.fieldsDefaultFunc() : aggFunc;
512+
if (schema.primaryKeys().contains(fieldName)) {
513+
continue;
514+
}
515+
if (aggFuncName != null) {
516+
// last_non_null_value doesn't require sequence group
517+
checkArgument(
518+
aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME)
519+
|| sequenceGroupFieldIndexs.contains(
520+
fieldNames.indexOf(fieldName)),
521+
"Must use sequence group for aggregation functions but not found for field %s.",
522+
fieldName);
523+
}
524+
}
525+
}
526+
497527
Set<String> illegalGroup =
498528
fields2Group.values().stream()
499529
.flatMap(Collection::stream)
@@ -689,6 +719,15 @@ private static void validateIncrementalClustering(TableSchema schema, CoreOption
689719
}
690720
}
691721

722+
private static int requireField(String fieldName, List<String> fieldNames) {
723+
int field = fieldNames.indexOf(fieldName);
724+
if (field == -1) {
725+
throw new IllegalArgumentException(
726+
String.format("Field %s can not be found in table schema.", fieldName));
727+
}
728+
return field;
729+
}
730+
692731
public static void validateChainTable(TableSchema schema, CoreOptions options) {
693732
if (options.isChainTable()) {
694733
boolean isPrimaryTbl = schema.primaryKeys() != null && !schema.primaryKeys().isEmpty();

paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,17 @@ public void testBlobTableSchema() {
151151
assertThatThrownBy(() -> validateBlobSchema(options, Collections.singletonList("f2")))
152152
.hasMessage("The BLOB type column can not be part of partition keys.");
153153
}
154+
155+
@Test
156+
public void testPartialUpdateTableAggregateFunctionWithoutSequenceGroup() {
157+
Map<String, String> options = new HashMap<>(2);
158+
options.put("merge-engine", "partial-update");
159+
options.put("fields.f3.aggregate-function", "max");
160+
assertThatThrownBy(() -> validateTableSchemaExec(options))
161+
.hasMessageContaining(
162+
"Must use sequence group for aggregation functions but not found for field");
163+
164+
options.put("fields.f2.sequence-group", "f3");
165+
assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException();
166+
}
154167
}

0 commit comments

Comments
 (0)