Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,7 @@ pub fn mark_table_index_as_deleted(
TableIndexType::Inverted => MarkedDeletedIndexType::INVERTED,
TableIndexType::Ngram => MarkedDeletedIndexType::NGRAM,
TableIndexType::Vector => MarkedDeletedIndexType::VECTOR,
TableIndexType::Spatial => MarkedDeletedIndexType::SPATIAL,
};
let marked_deleted_table_index_meta = MarkedDeletedIndexMeta {
dropped_on: Utc::now(),
Expand Down
1 change: 1 addition & 0 deletions src/meta/app/src/schema/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub enum MarkedDeletedIndexType {
INVERTED = 2,
NGRAM = 3,
VECTOR = 4,
SPATIAL = 5,
}

#[derive(Clone, Debug, Eq, PartialEq)]
Expand Down
4 changes: 4 additions & 0 deletions src/meta/app/src/schema/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ pub enum TableIndexType {
Inverted = 0,
Ngram = 1,
Vector = 2,
Spatial = 3,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
Expand Down Expand Up @@ -523,6 +524,9 @@ impl Display for TableIndexType {
TableIndexType::Vector => {
write!(f, "VECTOR")
}
TableIndexType::Spatial => {
write!(f, "SPATIAL")
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/query/ast/src/ast/statements/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub enum TableIndexType {
Inverted,
Ngram,
Vector,
Spatial,
}

impl Display for TableIndexType {
Expand All @@ -61,6 +62,9 @@ impl Display for TableIndexType {
TableIndexType::Vector => {
write!(f, "VECTOR")
}
TableIndexType::Spatial => {
write!(f, "SPATIAL")
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6367,6 +6367,7 @@ fn index_type(i: Input) -> IResult<TableIndexType> {
value(TableIndexType::Inverted, rule! { INVERTED }),
value(TableIndexType::Ngram, rule! { NGRAM }),
value(TableIndexType::Vector, rule! { VECTOR }),
value(TableIndexType::Spatial, rule! { SPATIAL }),
))
.parse(i)
}
2 changes: 2 additions & 0 deletions src/query/ast/src/parser/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1139,6 +1139,8 @@ pub enum TokenKind {
SESSION,
#[token("SETTINGS", ignore(ascii_case))]
SETTINGS,
#[token("SPATIAL", ignore(ascii_case))]
SPATIAL,
#[token("VARIABLES", ignore(ascii_case))]
VARIABLES,
#[token("STAGES", ignore(ascii_case))]
Expand Down
4 changes: 2 additions & 2 deletions src/query/ast/tests/it/testdata/stmt-error.txt
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ error:
--> SQL:1:6
|
1 | drop a
| ^ unexpected `a`, expecting `TASK`, `TABLE`, `CATALOG`, `MASKING`, `DATABASE`, `PASSWORD`, `WAREHOUSE`, `AGGREGATING`, `WORKLOAD`, `SCHEMA`, `DICTIONARY`, `VIEW`, `INVERTED`, `NGRAM`, `VECTOR`, `STAGE`, `TAG`, `FILE`, `PIPE`, `NOTIFICATION`, `CONNECTION`, `ROW`, `USER`, `ROLE`, `FUNCTION`, `NETWORK`, `PROCEDURE`, `STREAM`, or `SEQUENCE`
| ^ unexpected `a`, expecting `TASK`, `TABLE`, `CATALOG`, `SPATIAL`, `MASKING`, `DATABASE`, `PASSWORD`, `WAREHOUSE`, `AGGREGATING`, `WORKLOAD`, `SCHEMA`, `DICTIONARY`, `VIEW`, `INVERTED`, `NGRAM`, `VECTOR`, `STAGE`, `TAG`, `FILE`, `PIPE`, `NOTIFICATION`, `CONNECTION`, `ROW`, `USER`, `ROLE`, `FUNCTION`, `NETWORK`, `PROCEDURE`, `STREAM`, or `SEQUENCE`


---------- Input ----------
Expand Down Expand Up @@ -281,7 +281,7 @@ error:
--> SQL:1:6
|
1 | drop usar if exists 'test-j';
| ^^^^ unexpected `usar`, expecting `USER`, `STREAM`, `STAGE`, `PASSWORD`, `WAREHOUSE`, `AGGREGATING`, `TAG`, `ROLE`, `TABLE`, `SCHEMA`, `VECTOR`, `CATALOG`, `NETWORK`, `WORKLOAD`, `DATABASE`, `INVERTED`, `FUNCTION`, `PROCEDURE`, `TASK`, `NOTIFICATION`, `NGRAM`, `MASKING`, `SEQUENCE`, `DICTIONARY`, `VIEW`, `FILE`, `PIPE`, `CONNECTION`, or `ROW`
| ^^^^ unexpected `usar`, expecting `USER`, `STREAM`, `STAGE`, `SPATIAL`, `PASSWORD`, `WAREHOUSE`, `AGGREGATING`, `TAG`, `ROLE`, `TABLE`, `SCHEMA`, `VECTOR`, `CATALOG`, `NETWORK`, `WORKLOAD`, `DATABASE`, `INVERTED`, `FUNCTION`, `PROCEDURE`, `TASK`, `NOTIFICATION`, `NGRAM`, `MASKING`, `SEQUENCE`, `DICTIONARY`, `VIEW`, `FILE`, `PIPE`, `CONNECTION`, or `ROW`


---------- Input ----------
Expand Down
6 changes: 3 additions & 3 deletions src/query/service/src/interpreters/hook/refresh_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {
}

// Generate sync inverted indexes.
let inverted_index_plans =
generate_refresh_inverted_index_plan(ctx.clone(), &desc, table).await?;
let inverted_index_plans = generate_refresh_table_index_plan(ctx.clone(), &desc, table).await?;
plans.extend_from_slice(&inverted_index_plans);

let mut tasks = Vec::with_capacity(std::cmp::min(
Expand Down Expand Up @@ -257,7 +256,7 @@ async fn build_refresh_index_plan(
.await
}

async fn generate_refresh_inverted_index_plan(
async fn generate_refresh_table_index_plan(
ctx: Arc<QueryContext>,
desc: &RefreshDesc,
table: Arc<dyn Table>,
Expand All @@ -274,6 +273,7 @@ async fn generate_refresh_inverted_index_plan(
TableIndexType::Inverted => ast::TableIndexType::Inverted,
TableIndexType::Ngram => ast::TableIndexType::Ngram,
TableIndexType::Vector => ast::TableIndexType::Vector,
TableIndexType::Spatial => ast::TableIndexType::Spatial,
};
let plan = RefreshTableIndexPlan {
index_type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl Interpreter for CreateTableIndexInterpreter {
ast::TableIndexType::Inverted => TableIndexType::Inverted,
ast::TableIndexType::Ngram => TableIndexType::Ngram,
ast::TableIndexType::Vector => TableIndexType::Vector,
ast::TableIndexType::Spatial => TableIndexType::Spatial,
};

let create_index_req = CreateTableIndexReq {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl Interpreter for DropTableIndexInterpreter {
ast::TableIndexType::Inverted => TableIndexType::Inverted,
ast::TableIndexType::Ngram => TableIndexType::Ngram,
ast::TableIndexType::Vector => TableIndexType::Vector,
ast::TableIndexType::Spatial => TableIndexType::Spatial,
};

let drop_index_req = DropTableIndexReq {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ impl Interpreter for RefreshTableIndexInterpreter {
ast::TableIndexType::Inverted => TableIndexType::Inverted,
ast::TableIndexType::Ngram => TableIndexType::Ngram,
ast::TableIndexType::Vector => TableIndexType::Vector,
ast::TableIndexType::Spatial => todo!(),
ast::TableIndexType::Aggregating => unreachable!(),
};

Expand Down
52 changes: 52 additions & 0 deletions src/query/sql/src/planner/binder/ddl/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,12 @@ impl Binder {
let index_options = self.validate_vector_index_options(index_options)?;
(column_ids, index_options, TableIndexType::Vector)
}
AstTableIndexType::Spatial => {
let column_ids =
self.validate_spatial_index_columns(table_schema.clone(), columns)?;
let index_options = self.validate_spatial_index_options(index_options)?;
(column_ids, index_options, TableIndexType::Spatial)
}
AstTableIndexType::Aggregating => unreachable!(),
};

Expand Down Expand Up @@ -831,6 +837,52 @@ impl Binder {
Ok(options)
}

pub(in crate::planner::binder) fn validate_spatial_index_columns(
&self,
table_schema: TableSchemaRef,
columns: &[Identifier],
) -> Result<Vec<ColumnId>> {
let mut column_set = BTreeSet::new();
for column in columns {
match table_schema.field_with_name(&column.name) {
Ok(field) => {
if !matches!(
field.data_type.remove_nullable(),
TableDataType::Geometry | TableDataType::Geography
) {
return Err(ErrorCode::UnsupportedIndex(format!(
"Spatial index only support Geometry and Geography type, but the type of column {} is {}",
column, field.data_type
)));
}
if column_set.contains(&field.column_id) {
return Err(ErrorCode::UnsupportedIndex(format!(
"Spatial index column must be unique, but column {} is duplicate",
column.name
)));
}
column_set.insert(field.column_id);
}
Err(_) => {
return Err(ErrorCode::UnsupportedIndex(format!(
"Table does not have column {}",
column
)));
}
}
}
Ok(Vec::from_iter(column_set))
}

pub(in crate::planner::binder) fn validate_spatial_index_options(
&self,
_index_options: &BTreeMap<String, String>,
) -> Result<BTreeMap<String, String>> {
let options = BTreeMap::new();
// todo
Ok(options)
}

#[async_backtrace::framed]
pub(in crate::planner::binder) async fn bind_drop_table_index(
&mut self,
Expand Down
9 changes: 9 additions & 0 deletions src/query/sql/src/planner/binder/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2081,6 +2081,15 @@ impl Binder {
self.validate_vector_index_options(&table_index_def.index_options)?;
(TableIndexType::Vector, column_ids, options)
}
AstTableIndexType::Spatial => {
let column_ids = self.validate_spatial_index_columns(
table_schema.clone(),
&table_index_def.columns,
)?;
let options =
self.validate_spatial_index_options(&table_index_def.index_options)?;
(TableIndexType::Spatial, column_ids, options)
}
AstTableIndexType::Aggregating => unreachable!(),
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
statement ok
drop database if exists test_spatial_index

statement ok
create database test_spatial_index

statement ok
use test_spatial_index

statement ok
CREATE TABLE IF NOT EXISTS t(id Int, geom Geometry, geog Geography, SPATIAL INDEX idx (geom, geog)) Engine = Fuse

query TT
SHOW CREATE TABLE t
----
t CREATE TABLE t ( id INT NULL, geom GEOMETRY NULL, geog GEOGRAPHY NULL, SYNC SPATIAL INDEX idx (geom, geog) ) ENGINE=FUSE

statement ok
DROP SPATIAL INDEX idx ON t;

statement error
CREATE SPATIAL INDEX idx2 ON t(id);

statement ok
CREATE SPATIAL INDEX idx2 ON t(geom, geog);

query TT
SHOW CREATE TABLE t
----
t CREATE TABLE t ( id INT NULL, geom GEOMETRY NULL, geog GEOGRAPHY NULL, SYNC SPATIAL INDEX idx2 (geom, geog) ) ENGINE=FUSE

statement error
DROP INVERTED INDEX idx2 ON t;

statement ok
use default

statement ok
drop database test_spatial_index

Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,29 @@
ontime_200.csv
199

199 2020 769
199 2020.0 769
--ontime_200.csv.gz
ontime_200.csv.gz
199

199 2020 769
199 2020.0 769
--ontime_200.csv.zst
ontime_200.csv.zst
199

199 2020 769
199 2020.0 769
--ontime_200.csv.bz2
ontime_200.csv.bz2
199

199 2020 769
199 2020.0 769
--ontime_200.ndjson
ontime_200.ndjson
199

199 2020 769
199 2020.0 769
--ontime_200.parquet
ontime_200.parquet
199

199 2020 769
199 2020.0 769
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ MQ 12316.754450296687
NW 11370.336373327018
OH 12599.259602036094
2006 12.16979169791698
99999
99999.0
2006 1
San Diego, CA Los Angeles, CA 622
Los Angeles, CA San Diego, CA 619
Expand Down
Loading
Loading