Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/query/ee/src/table_ref/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::SnapshotRef;
use databend_common_meta_app::schema::SnapshotRefType;
use databend_common_meta_app::schema::TableLvtCheck;
use databend_common_meta_app::schema::UpdateTableMetaReq;
use databend_common_meta_types::MatchSeq;
Expand Down Expand Up @@ -90,6 +91,7 @@ impl TableRefHandler for RealTableRefHandler {
None => fuse_table.snapshot_loc(),
};

let schema = table_info.schema();
let (new_snapshot, prev_ts) = if let Some(snapshot) = fuse_table
.read_table_snapshot_with_location(snapshot_loc)
.await?
Expand All @@ -102,6 +104,12 @@ impl TableRefHandler for RealTableRefHandler {
table_id
)));
}
if plan.ref_type == SnapshotRefType::Branch && schema.as_ref() != &snapshot.schema {
return Err(ErrorCode::IllegalReference(format!(
"Cannot create branch from snapshot '{}' with a different schema",
snapshot.snapshot_id,
)));
}
let mut new_snapshot = TableSnapshot::try_from_previous(
snapshot.clone(),
Some(seq),
Expand All @@ -113,7 +121,7 @@ impl TableRefHandler for RealTableRefHandler {
let new_snapshot = TableSnapshot::try_new(
Some(seq),
None,
table_info.schema().as_ref().clone(),
schema.as_ref().clone(),
Default::default(),
vec![],
None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ impl Interpreter for AddTableColumnInterpreter {
table_info.desc
)));
}
if fuse_table.contains_active_branches() {
return Err(ErrorCode::AlterTableError(format!(
"Adding stored computed columns is not allowed on table '{}' while it has active branches",
table_info.desc
)));
}
let base_snapshot = fuse_table.read_table_snapshot().await?;
let prev_snapshot_id = base_snapshot.snapshot_id().map(|(id, _)| id);
let table_meta_timestamps = self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,13 @@ impl ModifyTableColumnInterpreter {
)));
}

if fuse_table.contains_active_branches() {
return Err(ErrorCode::AlterTableError(format!(
"table {} has active branches, modifying columns should be avoided",
table_info.desc
)));
}

// construct sql for selecting data from old table.
// computed columns are ignored, as it is build from other columns.
let query_fields = new_schema_without_computed_fields
Expand Down
11 changes: 11 additions & 0 deletions src/query/storages/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::time::Instant;
use async_channel::Receiver;
use chrono::Duration;
use chrono::TimeDelta;
use chrono::Utc;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_catalog::catalog::StorageDescription;
use databend_common_catalog::plan::DataSourcePlan;
Expand Down Expand Up @@ -62,6 +63,7 @@ use databend_common_io::constants::DEFAULT_BLOCK_PER_SEGMENT;
use databend_common_io::constants::DEFAULT_BLOCK_ROW_COUNT;
use databend_common_meta_app::schema::DatabaseType;
use databend_common_meta_app::schema::SnapshotRef;
use databend_common_meta_app::schema::SnapshotRefType;
use databend_common_meta_app::schema::TableIdent;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
Expand Down Expand Up @@ -833,6 +835,15 @@ impl FuseTable {
.cluster_type()
.is_none_or(|v| matches!(v, ClusterType::Hilbert)))
}

pub fn contains_active_branches(&self) -> bool {
let now = Utc::now();
self.table_info
.meta
.refs
.values()
.any(|r| r.typ == SnapshotRefType::Branch && r.expire_at.is_none_or(|t| t > now))
}
}

#[async_trait::async_trait]
Expand Down
17 changes: 17 additions & 0 deletions tests/sqllogictests/suites/ee/08_table_ref/08_0000_branch.test
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ optimize table t1 compact
statement ok
ALTER TABLE t1 CREATE BRANCH dev

## Schema change should be rejected if there are active branches
statement error 1132
ALTER TABLE t1 MODIFY COLUMN a STRING

statement error 1132
ALTER TABLE t1 ADD COLUMN c INT64 AS (a + 1) STORED

query IT
SELECT * FROM t1/dev ORDER BY a
----
Expand Down Expand Up @@ -110,6 +117,16 @@ SELECT * FROM t1/from_dev ORDER BY a
2 b
3 c

statement ok
ALTER TABLE t1 ADD COLUMN c INT default 0

query ITI
SELECT * FROM t1/from_dev ORDER BY a
----
1 a 0
2 b 0
3 c 0

## Test duplicate branch name
statement error 2746
ALTER TABLE t1 CREATE BRANCH dev
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
drop database if exists test_ee_11_table_ref
create database test_ee_11_table_ref
create table t11_0000(a int)
two insertions
latest snapshot should contain 3 rows
3
create branch at snapshot (expect 2 rows)
2
create branch at timestamp (expect 2 rows)
2
alter table schema
create branch from old snapshot after schema change should fail
1
1
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/usr/bin/env bash

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../../../shell_env.sh

DB="test_ee_11_table_ref"
TBL="t11_0000"

export BENDSQL_CLIENT_CONNECT_DB="${BENDSQL_CLIENT_CONNECT} -D ${DB}"
export BENDSQL_CLIENT_OUTPUT_NULL_DB="${BENDSQL_CLIENT_OUTPUT_NULL} -D ${DB}"

echo "drop database if exists ${DB}"
echo "DROP DATABASE IF EXISTS ${DB}" | $BENDSQL_CLIENT_OUTPUT_NULL
echo "create database ${DB}"
echo "CREATE DATABASE ${DB}" | $BENDSQL_CLIENT_OUTPUT_NULL

echo "create table ${TBL}(a int)"
echo "CREATE TABLE ${TBL}(a INT)" | $BENDSQL_CLIENT_OUTPUT_NULL_DB

echo "two insertions"
echo "INSERT INTO ${TBL} VALUES (1),(2)" | $BENDSQL_CLIENT_OUTPUT_NULL_DB

echo "INSERT INTO ${TBL} VALUES (3)" | $BENDSQL_CLIENT_OUTPUT_NULL_DB
echo "latest snapshot should contain 3 rows"
echo "SELECT COUNT(*) FROM ${TBL}" | $BENDSQL_CLIENT_CONNECT_DB

SNAPSHOT_ID=$(
echo "SELECT previous_snapshot_id FROM fuse_snapshot('${DB}','${TBL}') WHERE row_count=3" \
| $BENDSQL_CLIENT_CONNECT
)
TIMEPOINT=$(
echo "SELECT timestamp FROM fuse_snapshot('${DB}','${TBL}') WHERE row_count=2" \
| $BENDSQL_CLIENT_CONNECT
)

echo "create branch at snapshot (expect 2 rows)"
echo "ALTER TABLE ${TBL} CREATE BRANCH b_snapshot AT (SNAPSHOT => '${SNAPSHOT_ID}')" \
| $BENDSQL_CLIENT_OUTPUT_NULL_DB
echo "SELECT COUNT(*) FROM ${TBL}/b_snapshot" | $BENDSQL_CLIENT_CONNECT_DB

echo "create branch at timestamp (expect 2 rows)"
echo "ALTER TABLE ${TBL} CREATE BRANCH b_ts AT (TIMESTAMP => '${TIMEPOINT}'::TIMESTAMP)" \
| $BENDSQL_CLIENT_OUTPUT_NULL_DB
echo "SELECT COUNT(*) FROM ${TBL}/b_ts" | $BENDSQL_CLIENT_CONNECT_DB

echo "alter table schema"
echo "ALTER TABLE ${TBL} ADD COLUMN b INT" | $BENDSQL_CLIENT_OUTPUT_NULL_DB

echo "create branch from old snapshot after schema change should fail"
output=$(
echo "ALTER TABLE ${TBL} CREATE BRANCH b_err AT (SNAPSHOT => '${SNAPSHOT_ID}')" \
| $BENDSQL_CLIENT_CONNECT_DB 2>&1 || true
)
cnt=$(echo "$output" | grep -c "\\[2747\\]")
echo "$cnt"
if [ "$cnt" -ne 1 ]; then
exit 1
fi
cnt=$(echo "$output" | grep -c "different schema")
echo "$cnt"
if [ "$cnt" -ne 1 ]; then
exit 1
fi

echo "DROP DATABASE IF EXISTS ${DB}" | $BENDSQL_CLIENT_OUTPUT_NULL
Loading