Skip to content

Commit ff4b120

Browse files
authored
[server] Fix zk partition residual when using dynamic partition (#1187)
1 parent 8ea5f46 commit ff4b120

File tree

7 files changed

+101
-47
lines changed

7 files changed

+101
-47
lines changed

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/utils/FlinkTestBase.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -250,14 +250,14 @@ public static Map<Long, String> createPartitions(
250250
new TabletServerInfo(2, "rack2")
251251
});
252252

253-
// register partition assignments
254-
zkClient.registerPartitionAssignment(
253+
// register partition assignments and metadata
254+
zkClient.registerPartitionAssignmentAndMetadata(
255255
partitionId,
256+
partition,
256257
new PartitionAssignment(
257-
tableInfo.getTableId(), assignment.getBucketAssignments()));
258-
259-
// register partition
260-
zkClient.registerPartition(tablePath, tableInfo.getTableId(), partition, partitionId);
258+
tableInfo.getTableId(), assignment.getBucketAssignments()),
259+
tablePath,
260+
tableInfo.getTableId());
261261
}
262262
return newPartitionIds;
263263
}

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -438,10 +438,9 @@ public void createPartition(
438438

439439
try {
440440
long partitionId = zookeeperClient.getPartitionIdAndIncrement();
441-
// register partition assignments to zk first
442-
zookeeperClient.registerPartitionAssignment(partitionId, partitionAssignment);
443-
// then register the partition metadata to zk
444-
zookeeperClient.registerPartition(tablePath, tableId, partitionName, partitionId);
441+
// register the partition assignment and the partition metadata to zk in a single transaction
442+
zookeeperClient.registerPartitionAssignmentAndMetadata(
443+
partitionId, partitionName, partitionAssignment, tablePath, tableId);
445444
LOG.info(
446445
"Register partition {} to zookeeper for table [{}].", partitionName, tablePath);
447446
} catch (KeeperException.NodeExistsException nodeExistsException) {

fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java

Lines changed: 56 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -190,16 +190,6 @@ public void registerTableAssignment(long tableId, TableAssignment tableAssignmen
190190
LOG.info("Registered table assignment {} for table id {}.", tableAssignment, tableId);
191191
}
192192

193-
/** Register partition assignment to ZK. */
194-
public void registerPartitionAssignment(
195-
long partitionId, PartitionAssignment partitionAssignment) throws Exception {
196-
String path = PartitionIdZNode.path(partitionId);
197-
zkClient.create()
198-
.creatingParentsIfNeeded()
199-
.withMode(CreateMode.PERSISTENT)
200-
.forPath(path, PartitionIdZNode.encode(partitionAssignment));
201-
}
202-
203193
/** Get the table assignment in ZK. */
204194
public Optional<TableAssignment> getTableAssignment(long tableId) throws Exception {
205195
Optional<byte[]> bytes = getOrEmpty(TableIdZNode.path(tableId));
@@ -502,23 +492,68 @@ public int getPartitionNumber(TablePath tablePath) throws Exception {
502492
return stat.getNumChildren();
503493
}
504494

505-
/** Create a partition for a table in ZK. */
506-
public void registerPartition(
507-
TablePath tablePath, long tableId, String partitionName, long partitionId)
508-
throws Exception {
509-
String path = PartitionZNode.path(tablePath, partitionName);
510-
zkClient.create()
511-
.creatingParentsIfNeeded()
512-
.withMode(CreateMode.PERSISTENT)
513-
.forPath(path, PartitionZNode.encode(new TablePartition(tableId, partitionId)));
514-
}
515-
516495
/** Delete a partition for a table in ZK. */
517496
public void deletePartition(TablePath tablePath, String partitionName) throws Exception {
518497
String path = PartitionZNode.path(tablePath, partitionName);
519498
zkClient.delete().forPath(path);
520499
}
521500

501+
/** Register the partition assignment and the partition metadata to ZK in a single transaction. */
502+
public void registerPartitionAssignmentAndMetadata(
503+
long partitionId,
504+
String partitionName,
505+
PartitionAssignment partitionAssignment,
506+
TablePath tablePath,
507+
long tableId)
508+
throws Exception {
509+
// Merge "registerPartitionAssignment()" and "registerPartition()"
510+
// into one transaction. This avoids the case where the partition assignment is
511+
// registered
512+
// but the partition metadata is not registered.
513+
514+
// Create the parent znodes in advance, because the transactional creates below do not create missing parents.
515+
try {
516+
String tabletServerPartitionParentPath = ZkData.PartitionIdsZNode.path();
517+
zkClient.create()
518+
.creatingParentsIfNeeded()
519+
.withMode(CreateMode.PERSISTENT)
520+
.forPath(tabletServerPartitionParentPath);
521+
} catch (KeeperException.NodeExistsException e) {
522+
// ignore: the parent znode already exists
523+
}
524+
try {
525+
String metadataPartitionParentPath = PartitionsZNode.path(tablePath);
526+
zkClient.create()
527+
.creatingParentsIfNeeded()
528+
.withMode(CreateMode.PERSISTENT)
529+
.forPath(metadataPartitionParentPath);
530+
} catch (KeeperException.NodeExistsException e) {
531+
// ignore: the parent znode already exists
532+
}
533+
534+
List<CuratorOp> ops = new ArrayList<>(2);
535+
String tabletServerPartitionPath = PartitionIdZNode.path(partitionId);
536+
CuratorOp tabletServerPartitionNode =
537+
zkClient.transactionOp()
538+
.create()
539+
.withMode(CreateMode.PERSISTENT)
540+
.forPath(
541+
tabletServerPartitionPath,
542+
PartitionIdZNode.encode(partitionAssignment));
543+
544+
String metadataPath = PartitionZNode.path(tablePath, partitionName);
545+
CuratorOp metadataPartitionNode =
546+
zkClient.transactionOp()
547+
.create()
548+
.withMode(CreateMode.PERSISTENT)
549+
.forPath(
550+
metadataPath,
551+
PartitionZNode.encode(new TablePartition(tableId, partitionId)));
552+
553+
ops.add(tabletServerPartitionNode);
554+
ops.add(metadataPartitionNode);
555+
zkClient.transaction().forOperations(ops);
556+
}
522557
// --------------------------------------------------------------------------------------------
523558
// Schema
524559
// --------------------------------------------------------------------------------------------

fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -802,10 +802,10 @@ private Tuple2<PartitionIdName, PartitionIdName> preparePartitionAssignment(
802802
long partition2Id = zookeeperClient.getPartitionIdAndIncrement();
803803
String partition1Name = "2024";
804804
String partition2Name = "2025";
805-
zookeeperClient.registerPartitionAssignment(partition1Id, partitionAssignment);
806-
zookeeperClient.registerPartition(tablePath, tableId, partition1Name, partition1Id);
807-
zookeeperClient.registerPartitionAssignment(partition2Id, partitionAssignment);
808-
zookeeperClient.registerPartition(tablePath, tableId, partition2Name, partition2Id);
805+
zookeeperClient.registerPartitionAssignmentAndMetadata(
806+
partition1Id, partition1Name, partitionAssignment, tablePath, tableId);
807+
zookeeperClient.registerPartitionAssignmentAndMetadata(
808+
partition2Id, partition2Name, partitionAssignment, tablePath, tableId);
809809

810810
return Tuple2.of(
811811
new PartitionIdName(partition1Id, partition1Name),

fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerTest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -266,13 +266,18 @@ void testCreateAndDropPartition() throws Exception {
266266

267267
PartitionAssignment partitionAssignment =
268268
new PartitionAssignment(tableId, createAssignment().getBucketAssignments());
269-
zookeeperClient.registerPartitionAssignment(
270-
zookeeperClient.getPartitionIdAndIncrement(), partitionAssignment);
269+
String partitionName = "2024";
270+
zookeeperClient.registerPartitionAssignmentAndMetadata(
271+
zookeeperClient.getPartitionIdAndIncrement(),
272+
partitionName,
273+
partitionAssignment,
274+
DATA1_TABLE_PATH,
275+
tableId);
271276

272277
// create partition
273278
long partitionId = 1L;
274279
tableManager.onCreateNewPartition(
275-
DATA1_TABLE_PATH, tableId, partitionId, "2024", partitionAssignment);
280+
DATA1_TABLE_PATH, tableId, partitionId, partitionName, partitionAssignment);
276281

277282
// all replicas should be online
278283
checkReplicaOnline(tableId, partitionId, partitionAssignment);

fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -201,13 +201,11 @@ void testPartitionedTable() throws Exception {
201201
new TabletServerInfo(2, "rack2")
202202
})
203203
.getBucketAssignments());
204-
// register assignment
205-
zookeeperClient.registerPartitionAssignment(1L, partitionAssignment);
206-
zookeeperClient.registerPartitionAssignment(2L, partitionAssignment);
207-
208-
// register partitions
209-
zookeeperClient.registerPartition(tablePath, tableId, "2011", 1L);
210-
zookeeperClient.registerPartition(tablePath, tableId, "2022", 2L);
204+
// register assignment and metadata
205+
zookeeperClient.registerPartitionAssignmentAndMetadata(
206+
1L, "2011", partitionAssignment, tablePath, tableId);
207+
zookeeperClient.registerPartitionAssignmentAndMetadata(
208+
2L, "2022", partitionAssignment, tablePath, tableId);
211209

212210
// create partitions events
213211
expectedEvents.add(

fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.alibaba.fluss.server.zk;
1919

2020
import com.alibaba.fluss.cluster.Endpoint;
21+
import com.alibaba.fluss.cluster.TabletServerInfo;
2122
import com.alibaba.fluss.config.ConfigOptions;
2223
import com.alibaba.fluss.config.Configuration;
2324
import com.alibaba.fluss.metadata.Schema;
@@ -31,6 +32,7 @@
3132
import com.alibaba.fluss.server.zk.data.BucketSnapshot;
3233
import com.alibaba.fluss.server.zk.data.CoordinatorAddress;
3334
import com.alibaba.fluss.server.zk.data.LeaderAndIsr;
35+
import com.alibaba.fluss.server.zk.data.PartitionAssignment;
3436
import com.alibaba.fluss.server.zk.data.TableAssignment;
3537
import com.alibaba.fluss.server.zk.data.TableRegistration;
3638
import com.alibaba.fluss.server.zk.data.TabletServerRegistration;
@@ -57,6 +59,7 @@
5759
import java.util.Optional;
5860
import java.util.Set;
5961

62+
import static com.alibaba.fluss.server.utils.TableAssignmentUtils.generateAssignment;
6063
import static org.assertj.core.api.Assertions.assertThat;
6164
import static org.assertj.core.api.Assertions.assertThatThrownBy;
6265

@@ -411,8 +414,22 @@ void testPartition() throws Exception {
411414
assertThat(partitions).isEmpty();
412415

413416
// test create new partitions
414-
zookeeperClient.registerPartition(tablePath, tableId, "p1", 1L);
415-
zookeeperClient.registerPartition(tablePath, tableId, "p2", 2L);
417+
PartitionAssignment partitionAssignment =
418+
new PartitionAssignment(
419+
tableId,
420+
generateAssignment(
421+
3,
422+
3,
423+
new TabletServerInfo[] {
424+
new TabletServerInfo(0, "rack0"),
425+
new TabletServerInfo(1, "rack1"),
426+
new TabletServerInfo(2, "rack2")
427+
})
428+
.getBucketAssignments());
429+
zookeeperClient.registerPartitionAssignmentAndMetadata(
430+
1L, "p1", partitionAssignment, tablePath, tableId);
431+
zookeeperClient.registerPartitionAssignmentAndMetadata(
432+
2L, "p2", partitionAssignment, tablePath, tableId);
416433

417434
// check created partitions
418435
partitions = zookeeperClient.getPartitions(tablePath);

0 commit comments

Comments
 (0)