Skip to content

Commit cce425a

Browse files
committed
Merge branch 'main' into feat-gist-index
2 parents b133f89 + ea8274f commit cce425a

File tree

164 files changed

+8155
-7607
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

164 files changed

+8155
-7607
lines changed

src/common/base/src/runtime/perf/query_perf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ impl QueryPerf {
5555
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
5656
.set_filter_func(filter_closure)
5757
.build()
58-
.map_err(|_e| ErrorCode::Internal("Failed to create profiler"))?;
58+
.map_err(|e| ErrorCode::Internal(format!("Failed to create profiler, {e}")))?;
5959
debug!("starting perf with frequency: {}", frequency);
6060
let mut payload = ThreadTracker::new_tracking_payload();
6161
payload.perf_enabled = true;

src/meta/api/src/kv_fetch_util.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ use databend_common_meta_types::MetaNetworkError;
2626
use databend_common_meta_types::SeqV;
2727
use databend_common_meta_types::TxnGetResponse;
2828
use databend_common_meta_types::UpsertKV;
29+
use databend_common_meta_types::errors;
2930
use databend_common_proto_conv::FromToProto;
31+
use futures::TryStreamExt;
3032

3133
use crate::deserialize_struct;
3234
use crate::deserialize_u64;
@@ -155,6 +157,42 @@ pub async fn list_u64_value<K: kvapi::Key>(
155157
Ok((structured_keys, values))
156158
}
157159

160+
/// Batch get u64 values by keys.
161+
///
162+
/// Returns a vec of Option<u64> in the same order as input keys.
163+
/// None means the key does not exist.
164+
pub async fn mget_u64_values<K: kvapi::Key>(
165+
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
166+
keys: &[K],
167+
) -> Result<Vec<Option<u64>>, MetaError> {
168+
if keys.is_empty() {
169+
return Ok(vec![]);
170+
}
171+
172+
let str_keys: Vec<String> = keys.iter().map(|k| k.to_string_key()).collect();
173+
let mut strm = kv_api.get_kv_stream(&str_keys).await?;
174+
175+
let mut results = Vec::with_capacity(keys.len());
176+
while let Some(item) = strm.try_next().await? {
177+
if let Some(seq_v) = item.value {
178+
let id = *deserialize_u64(&seq_v.data)?;
179+
results.push(Some(id));
180+
} else {
181+
results.push(None);
182+
}
183+
}
184+
185+
if results.len() != keys.len() {
186+
return Err(
187+
errors::IncompleteStream::new(keys.len() as u64, results.len() as u64)
188+
.context(" while mget_u64_values")
189+
.into(),
190+
);
191+
}
192+
193+
Ok(results)
194+
}
195+
158196
/// Generate an id on metasrv.
159197
///
160198
/// Ids are categorized by generators.

src/meta/api/src/table_api.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ use crate::kv_app_error::KVAppError;
128128
use crate::kv_fetch_util::deserialize_id_get_response;
129129
use crate::kv_fetch_util::deserialize_struct_get_response;
130130
use crate::kv_fetch_util::mget_pb_values;
131+
use crate::kv_fetch_util::mget_u64_values;
131132
use crate::kv_pb_api::KVPbApi;
132133
use crate::kv_pb_crud_api::KVPbCrudApi;
133134
use crate::list_u64_value;
@@ -1530,6 +1531,71 @@ where
15301531
return Ok(Arc::new(tb_info));
15311532
}
15321533

1534+
/// Get multiple tables by db_id and table names in batch.
1535+
/// Returns TableInfo for tables that exist, in the same order as input.
1536+
#[logcall::logcall]
1537+
#[fastrace::trace]
1538+
async fn mget_tables(
1539+
&self,
1540+
db_id: u64,
1541+
db_name: &str,
1542+
table_names: &[String],
1543+
) -> Result<Vec<Arc<TableInfo>>, KVAppError> {
1544+
debug!(db_id = db_id, table_names :? = table_names; "TableApi: {}", func_name!());
1545+
1546+
// Build DBIdTableName keys for all table names
1547+
let dbid_tbnames: Vec<DBIdTableName> = table_names
1548+
.iter()
1549+
.map(|name| DBIdTableName::new(db_id, name))
1550+
.collect();
1551+
1552+
// Batch get table ids
1553+
let table_ids = mget_u64_values(self, &dbid_tbnames).await?;
1554+
1555+
// Collect valid table ids with their names
1556+
let mut valid_tables: Vec<(String, u64)> = Vec::with_capacity(table_names.len());
1557+
for (dbid_tbname, table_id_opt) in dbid_tbnames.into_iter().zip(table_ids.into_iter()) {
1558+
if let Some(table_id) = table_id_opt {
1559+
valid_tables.push((dbid_tbname.table_name, table_id));
1560+
}
1561+
}
1562+
1563+
if valid_tables.is_empty() {
1564+
return Ok(vec![]);
1565+
}
1566+
1567+
// Batch get table metas
1568+
let table_id_keys: Vec<TableId> = valid_tables
1569+
.iter()
1570+
.map(|(_, id)| TableId { table_id: *id })
1571+
.collect();
1572+
let seq_metas = self.get_pb_values_vec(table_id_keys).await?;
1573+
1574+
// Build TableInfo for valid tables
1575+
let db_type = DatabaseType::NormalDB;
1576+
let mut results = Vec::with_capacity(valid_tables.len());
1577+
for ((table_name, table_id), seq_meta_opt) in
1578+
valid_tables.into_iter().zip(seq_metas.into_iter())
1579+
{
1580+
if let Some(seq_meta) = seq_meta_opt {
1581+
let tb_info = TableInfo {
1582+
ident: TableIdent {
1583+
table_id,
1584+
seq: seq_meta.seq,
1585+
},
1586+
desc: format!("'{}'.'{}'", db_name, table_name),
1587+
name: table_name,
1588+
meta: seq_meta.data,
1589+
db_type: db_type.clone(),
1590+
catalog_info: Default::default(),
1591+
};
1592+
results.push(Arc::new(tb_info));
1593+
}
1594+
}
1595+
1596+
Ok(results)
1597+
}
1598+
15331599
#[logcall::logcall]
15341600
#[fastrace::trace]
15351601
async fn get_table_by_id(

src/query/catalog/src/catalog/interface.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,21 @@ pub trait Catalog: DynClone + Send + Sync + Debug {
296296
table_name: &str,
297297
) -> Result<Arc<dyn Table>>;
298298

299+
/// Get multiple tables by db and table names.
300+
/// Returns tables in the same order as the input table_names.
301+
/// If a table is not found, it will not be included in the result.
302+
async fn mget_tables(
303+
&self,
304+
_tenant: &Tenant,
305+
_db_name: &str,
306+
_table_names: &[String],
307+
) -> Result<Vec<Arc<dyn Table>>> {
308+
Err(ErrorCode::Unimplemented(format!(
309+
"'mget_tables' not implemented for catalog {}",
310+
self.name()
311+
)))
312+
}
313+
299314
// Get one table identified as dropped by db and table name.
300315
async fn get_table_history(
301316
&self,

src/query/catalog/src/database.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,21 @@ pub trait Database: DynClone + Sync + Send {
120120
)))
121121
}
122122

123+
/// Get multiple tables by names.
124+
/// Returns tables in the same order as input, skipping tables that are not found.
125+
#[async_backtrace::framed]
126+
async fn mget_tables(&self, _table_names: &[String]) -> Result<Vec<Arc<dyn Table>>> {
127+
Err(ErrorCode::Unimplemented(format!(
128+
"UnImplement mget_tables in {} Database",
129+
self.name()
130+
)))
131+
}
132+
123133
// Get one table history by db and table name.
124134
#[async_backtrace::framed]
125135
async fn get_table_history(&self, _table_name: &str) -> Result<Vec<Arc<dyn Table>>> {
126136
Err(ErrorCode::Unimplemented(format!(
127-
"UnImplement get_table in {} Database",
137+
"UnImplement get_table_history in {} Database",
128138
self.name()
129139
)))
130140
}

0 commit comments

Comments
 (0)