
Commit e7a7fe0

Author: yangmu.0722

[core] Support re-overwrite by spark for chain table

1 parent: c7877ec
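
This change targets chain tables whose reads fall back between a snapshot branch and a delta branch. When Spark runs INSERT OVERWRITE against the delta branch, the matching partitions in the snapshot branch must be truncated as well, otherwise stale snapshot data would still be served for the rewritten partitions. The commit adds ChainTableUtils.postCommit to perform that truncation after an overwrite commit, wires it into Spark's WriteIntoPaimonTable, and extends the daily and hourly chain-table tests to cover the re-overwrite path.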

File tree: 4 files changed (+98, -1 lines)


docs/static/img/chain-table.png

260 Bytes

paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java

Lines changed: 50 additions & 0 deletions
@@ -25,6 +25,11 @@
 import org.apache.paimon.partition.PartitionTimeExtractor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.ChainGroupReadTable;
+import org.apache.paimon.table.FallbackReadFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.types.RowType;

 import java.time.LocalDateTime;
@@ -204,4 +209,49 @@ public static LinkedHashMap<String, String> calPartValues(
         }
         return res;
     }
+
+    public static void postCommit(
+            FileStoreTable table, boolean overwrite, List<CommitMessage> commitMessages) {
+        if (overwrite) {
+            FileStoreTable candidateTbl = table;
+            if (table instanceof FallbackReadFileStoreTable) {
+                candidateTbl =
+                        ((ChainGroupReadTable) ((FallbackReadFileStoreTable) table).fallback())
+                                .wrapped();
+            }
+            FileStoreTable snapshotTbl =
+                    candidateTbl.switchToBranch(table.coreOptions().scanFallbackSnapshotBranch());
+            InternalRowPartitionComputer partitionComputer =
+                    new InternalRowPartitionComputer(
+                            table.coreOptions().partitionDefaultName(),
+                            table.schema().logicalPartitionType(),
+                            table.schema().partitionKeys().toArray(new String[0]),
+                            table.coreOptions().legacyPartitionName());
+            List<BinaryRow> overwritePartitions =
+                    commitMessages.stream()
+                            .map(CommitMessage::partition)
+                            .distinct()
+                            .collect(Collectors.toList());
+            if (!overwritePartitions.isEmpty()) {
+                List<Map<String, String>> candidatePartitions =
+                        overwritePartitions.stream()
+                                .map(partition -> partitionComputer.generatePartValues(partition))
+                                .collect(Collectors.toList());
+                try (BatchTableCommit commit = snapshotTbl.newBatchWriteBuilder().newCommit()) {
+                    commit.truncatePartitions(candidatePartitions);
+                } catch (Exception e) {
+                    throw new RuntimeException(
+                            String.format(
+                                    "Failed to truncate partitions %s in snapshot table.",
+                                    candidatePartitions),
+                            e);
+                }
+            }
+        }
+    }
+
+    public static boolean chainScanFallbackDeltaBranch(CoreOptions options) {
+        return options.isChainTable()
+                && options.scanFallbackDeltaBranch().equalsIgnoreCase(options.branch());
+    }
 }
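
For orientation, a minimal caller sketch, assuming a hypothetical engine connector: table, overwrite, and messages are stand-ins for the connector's own state, and only the two ChainTableUtils methods come from this commit (the real Spark wiring follows below):

    // Hypothetical caller (not part of this commit): after committing an
    // INSERT OVERWRITE against the chain table's delta branch, truncate the
    // overwritten partitions in the snapshot branch so stale snapshot data
    // no longer shadows the rewrite.
    if (ChainTableUtils.chainScanFallbackDeltaBranch(table.coreOptions())) {
        ChainTableUtils.postCommit(table, overwrite, messages);
    }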

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala

Lines changed: 15 additions & 1 deletion
@@ -23,6 +23,8 @@ import org.apache.paimon.options.Options
 import org.apache.paimon.spark._
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.sink.CommitMessage
+import org.apache.paimon.utils.ChainTableUtils

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
@@ -57,7 +59,7 @@ case class WriteIntoPaimonTable(
     }
     val commitMessages = writer.write(data)
     writer.commit(commitMessages)
-
+    postCommit(dynamicPartitionOverwriteMode, overwritePartition, commitMessages)
     Seq.empty
   }

@@ -82,6 +84,18 @@
     (dynamicPartitionOverwriteMode, overwritePartition)
   }

+  private def postCommit(
+      dynamicPartitionOverwrite: Boolean,
+      staticOverwritePartition: Map[String, String],
+      commitMessages: Seq[CommitMessage]): Unit = {
+    if (ChainTableUtils.chainScanFallbackDeltaBranch(table.coreOptions())) {
+      ChainTableUtils.postCommit(
+        table,
+        dynamicPartitionOverwrite || staticOverwritePartition != null,
+        commitMessages.toList.asJava)
+    }
+  }
+
   override def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): LogicalPlan =
     this.asInstanceOf[WriteIntoPaimonTable]
 }
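
Note the double guard: the Scala-side postCommit only fires when the session is writing to the chain table's delta branch (chainScanFallbackDeltaBranch), and the snapshot-branch truncation inside ChainTableUtils.postCommit only runs when the write was actually an overwrite, whether dynamic-partition overwrite or a static overwritePartition spec; plain appends leave the snapshot branch untouched.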

paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java

Lines changed: 33 additions & 0 deletions
@@ -23,6 +23,7 @@
 import org.apache.paimon.hive.TestHiveMetastore;
 import org.apache.paimon.table.FileStoreTableFactory;

+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
@@ -379,7 +380,23 @@ public void testChainTable(@TempDir java.nio.file.Path tempDir) throws IOException {

         spark.close();
         spark = builder.getOrCreate();
+        spark.sql("set spark.paimon.branch=delta;");
+        spark.sql(
+                "insert overwrite table `my_db1`.`chain_test` values (5, 2, '1', '20250813'),(6, 2, '1', '20250814');");

+        spark.close();
+        spark = builder.getOrCreate();
+        Dataset<Row> df =
+                spark.sql(
+                        "SELECT t1,t2,t3 FROM `my_db1`.`chain_test$branch_snapshot` where dt = '20250814'");
+        assertThat(df.count()).isEqualTo(0);
+        df =
+                spark.sql(
+                        "SELECT t1,t2,t3 FROM `my_db1`.`chain_test$branch_delta` where dt = '20250814'");
+        assertThat(df.count()).isEqualTo(1);
+
+        spark.close();
+        spark = builder.getOrCreate();
         /** Drop table */
         spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");

@@ -592,7 +609,23 @@ public void testHourlyChainTable(@TempDir java.nio.file.Path tempDir) throws IOException {

         spark.close();
         spark = builder.getOrCreate();
+        spark.sql("set spark.paimon.branch=delta;");
+        spark.sql(
+                "insert overwrite table `my_db1`.`chain_test` values (6, 2, '1', '20250811', '02');");

+        spark.close();
+        spark = builder.getOrCreate();
+        Dataset<Row> df =
+                spark.sql(
+                        "SELECT t1,t2,t3 FROM `my_db1`.`chain_test$branch_snapshot` where dt = '20250811' and hour = '02'");
+        assertThat(df.count()).isEqualTo(0);
+        df =
+                spark.sql(
+                        "SELECT t1,t2,t3 FROM `my_db1`.`chain_test$branch_delta` where dt = '20250811' and hour = '02'");
+        assertThat(df.count()).isEqualTo(1);
+
+        spark.close();
+        spark = builder.getOrCreate();
         /** Drop table */
         spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");

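Both tests exercise the new path end to end: after re-running INSERT OVERWRITE on the delta branch, the overwritten partition reads as empty from chain_test$branch_snapshot and returns exactly the rewritten row from chain_test$branch_delta.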