
Commit b64d447

viirya and claude committed
feat(datafusion): Add Timestamp scalar value conversion for predicate pushdown
Add support for converting DataFusion Timestamp scalar values to Iceberg Datum for predicate pushdown. This enables timestamp literal comparisons to be pushed down to the storage layer when DataFusion provides timestamp literals in ScalarValue form, improving query performance.

Changes:
- Add conversion for all four timestamp time units (Second, Millisecond, Microsecond, Nanosecond) to Iceberg's native microsecond representation
- Convert timestamps to Datum::timestamp_micros with appropriate scaling:
  * TimestampSecond: multiply by 1,000,000
  * TimestampMillisecond: multiply by 1,000
  * TimestampMicrosecond: use directly
  * TimestampNanosecond: divide by 1,000 (truncates sub-microsecond precision)
- Add comprehensive unit tests covering all time units and edge cases
- Add sqllogictest (timestamp_predicate_pushdown.slt) to validate end-to-end timestamp predicate filtering and pushdown behavior
- Create test_timestamp_table in the sqllogictest engine setup
- Update show_tables.slt to include test_timestamp_table

Implementation details:
- Iceberg uses microseconds as its native timestamp representation
- Nanosecond timestamps lose sub-microsecond precision when converted
- Timezone information in ScalarValue is preserved but not used in the conversion (Iceberg's timestamp type is timezone-agnostic)
- Currently DataFusion casts timestamp literals to strings in predicates, but this implementation provides the foundation for direct timestamp literal support when DataFusion evolves to provide timestamp ScalarValues

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent ffe355b commit b64d447
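As a quick standalone check of the scaling rules described in the commit message (the helper names below are illustrative only, not part of the change), the same instant expressed in different time units maps to a single microsecond value, and nanosecond inputs lose their sub-microsecond part:

    // Sketch of the unit scaling described in the commit message; names are illustrative.
    fn seconds_to_micros(v: i64) -> i64 { v * 1_000_000 }
    fn millis_to_micros(v: i64) -> i64 { v * 1_000 }
    fn nanos_to_micros(v: i64) -> i64 { v / 1_000 } // integer division truncates

    fn main() {
        // 2023-01-05 00:00:00 UTC expressed in seconds, milliseconds, and nanoseconds
        assert_eq!(seconds_to_micros(1_672_876_800), 1_672_876_800_000_000);
        assert_eq!(millis_to_micros(1_672_876_800_000), 1_672_876_800_000_000);
        // the trailing 500 ns are dropped when converting to microseconds
        assert_eq!(nanos_to_micros(1_672_876_800_000_000_500), 1_672_876_800_000_000);
    }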

5 files changed, +228 -0 lines changed


crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs

Lines changed: 60 additions & 0 deletions
@@ -201,6 +201,10 @@ fn reverse_predicate_operator(op: PredicateOperator) -> PredicateOperator {
 }
 
 const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000;
+const MICROS_PER_SECOND: i64 = 1_000_000;
+const NANOS_PER_MICRO: i64 = 1_000;
+const MICROS_PER_MILLI: i64 = 1_000;
+
 /// Convert a scalar value to an iceberg datum.
 fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
     match value {
@@ -216,6 +220,17 @@ fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
         ScalarValue::LargeBinary(Some(v)) => Some(Datum::binary(v.clone())),
         ScalarValue::Date32(Some(v)) => Some(Datum::date(*v)),
         ScalarValue::Date64(Some(v)) => Some(Datum::date((*v / MILLIS_PER_DAY) as i32)),
+        // Timestamp conversions - convert to microseconds (Iceberg's native timestamp unit)
+        ScalarValue::TimestampSecond(Some(v), _) => {
+            Some(Datum::timestamp_micros(v * MICROS_PER_SECOND))
+        }
+        ScalarValue::TimestampMillisecond(Some(v), _) => {
+            Some(Datum::timestamp_micros(v * MICROS_PER_MILLI))
+        }
+        ScalarValue::TimestampMicrosecond(Some(v), _) => Some(Datum::timestamp_micros(*v)),
+        ScalarValue::TimestampNanosecond(Some(v), _) => {
+            Some(Datum::timestamp_micros(v / NANOS_PER_MICRO))
+        }
         _ => None,
     }
 }
@@ -432,6 +447,51 @@ mod tests {
         assert_eq!(predicate, None);
     }
 
+    #[test]
+    fn test_scalar_value_to_datum_timestamp() {
+        use datafusion::common::ScalarValue;
+
+        // Test TimestampSecond - convert to microseconds
+        let ts_seconds = 1672876800i64; // 2023-01-05 00:00:00 UTC in seconds
+        let datum = super::scalar_value_to_datum(&ScalarValue::TimestampSecond(
+            Some(ts_seconds),
+            None,
+        ));
+        assert_eq!(
+            datum,
+            Some(Datum::timestamp_micros(ts_seconds * 1_000_000))
+        );
+
+        // Test TimestampMillisecond - convert to microseconds
+        let ts_millis = 1672876800000i64; // 2023-01-05 00:00:00 UTC in milliseconds
+        let datum = super::scalar_value_to_datum(&ScalarValue::TimestampMillisecond(
+            Some(ts_millis),
+            None,
+        ));
+        assert_eq!(datum, Some(Datum::timestamp_micros(ts_millis * 1_000)));
+
+        // Test TimestampMicrosecond - use directly
+        let ts_micros = 1672876800000000i64; // 2023-01-05 00:00:00 UTC in microseconds
+        let datum = super::scalar_value_to_datum(&ScalarValue::TimestampMicrosecond(
+            Some(ts_micros),
+            None,
+        ));
+        assert_eq!(datum, Some(Datum::timestamp_micros(ts_micros)));
+
+        // Test TimestampNanosecond - convert to microseconds (truncate)
+        let ts_nanos = 1672876800000000500i64; // 2023-01-05 00:00:00.000000500 UTC in nanoseconds
+        let datum = super::scalar_value_to_datum(&ScalarValue::TimestampNanosecond(
+            Some(ts_nanos),
+            None,
+        ));
+        // Nanoseconds are truncated when converting to microseconds
+        assert_eq!(datum, Some(Datum::timestamp_micros(1672876800000000)));
+
+        // Test None timestamp
+        let datum = super::scalar_value_to_datum(&ScalarValue::TimestampMicrosecond(None, None));
+        assert_eq!(datum, None);
+    }
+
     #[test]
     fn test_scalar_value_to_datum_binary() {
         use datafusion::common::ScalarValue;
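For orientation, the Datum values produced by scalar_value_to_datum are what the surrounding expr_to_predicate code wraps into Iceberg predicates for pushdown. A minimal sketch of that end result, assuming iceberg's Reference/Predicate builder API (the helper function below is hypothetical, not part of this change):

    use iceberg::expr::{Predicate, Reference};
    use iceberg::spec::Datum;

    // Roughly what the pushdown path yields for a DataFusion literal
    // `ts > TIMESTAMP '2023-01-10 00:00:00'` arriving as
    // TimestampMillisecond(Some(1_673_308_800_000), None).
    fn example_timestamp_predicate() -> Predicate {
        let micros = 1_673_308_800_000i64 * 1_000; // milliseconds -> microseconds
        Reference::new("ts").greater_than(Datum::timestamp_micros(micros))
    }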

crates/sqllogictest/src/engine/datafusion.rs

Lines changed: 28 additions & 0 deletions
@@ -96,6 +96,7 @@ impl DataFusionEngine {
         // Create partitioned test table (unpartitioned tables are now created via SQL)
         Self::create_partitioned_table(&catalog, &namespace).await?;
         Self::create_binary_table(&catalog, &namespace).await?;
+        Self::create_timestamp_table(&catalog, &namespace).await?;
 
         Ok(Arc::new(
             IcebergCatalogProvider::try_new(Arc::new(catalog)).await?,
@@ -162,4 +163,31 @@
 
         Ok(())
     }
+
+    /// Create a test table with timestamp type column
+    /// Used for testing timestamp predicate pushdown
+    /// TODO: this can be removed when we support CREATE TABLE
+    async fn create_timestamp_table(
+        catalog: &impl Catalog,
+        namespace: &NamespaceIdent,
+    ) -> anyhow::Result<()> {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::optional(2, "ts", Type::Primitive(PrimitiveType::Timestamp)).into(),
+            ])
+            .build()?;
+
+        catalog
+            .create_table(
+                namespace,
+                TableCreation::builder()
+                    .name("test_timestamp_table".to_string())
+                    .schema(schema)
+                    .build(),
+            )
+            .await?;
+
+        Ok(())
+    }
 }

crates/sqllogictest/testdata/schedules/df_test.toml

Lines changed: 4 additions & 0 deletions
@@ -34,6 +34,10 @@ slt = "df_test/insert_into.slt"
 engine = "df"
 slt = "df_test/binary_predicate_pushdown.slt"
 
+[[steps]]
+engine = "df"
+slt = "df_test/timestamp_predicate_pushdown.slt"
+
 [[steps]]
 engine = "df"
 slt = "df_test/drop_table.slt"

crates/sqllogictest/testdata/slts/df_test/show_tables.slt

Lines changed: 3 additions & 0 deletions
@@ -31,6 +31,9 @@ default default test_binary_table$snapshots BASE TABLE
 default default test_partitioned_table BASE TABLE
 default default test_partitioned_table$manifests BASE TABLE
 default default test_partitioned_table$snapshots BASE TABLE
+default default test_timestamp_table BASE TABLE
+default default test_timestamp_table$manifests BASE TABLE
+default default test_timestamp_table$snapshots BASE TABLE
 default information_schema columns VIEW
 default information_schema df_settings VIEW
 default information_schema parameters VIEW
crates/sqllogictest/testdata/slts/df_test/timestamp_predicate_pushdown.slt

Lines changed: 133 additions & 0 deletions
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test timestamp predicate pushdown
+# This validates that timestamp predicates are pushed down to the storage layer
+# Note: Timestamp scalar value conversion to Datum::timestamp_micros supports
+# direct timestamp literals when DataFusion provides them in ScalarValue form
+
+# Insert test data with timestamps
+# Note: Iceberg timestamps are stored in microseconds
+# We use CAST to convert string timestamps to proper timestamp values
+query I
+INSERT INTO default.default.test_timestamp_table
+VALUES
+    (1, CAST('2023-01-01 00:00:00' AS TIMESTAMP)),
+    (2, CAST('2023-01-05 12:30:00' AS TIMESTAMP)),
+    (3, CAST('2023-01-10 15:45:30' AS TIMESTAMP)),
+    (4, CAST('2023-01-15 09:00:00' AS TIMESTAMP)),
+    (5, CAST('2023-01-20 18:20:10' AS TIMESTAMP))
+----
+5
+
+# Verify timestamp equality predicate is pushed down to IcebergTableScan
+query TT
+EXPLAIN SELECT * FROM default.default.test_timestamp_table WHERE ts = CAST('2023-01-05 12:30:00' AS TIMESTAMP)
+----
+logical_plan
+01)Filter: default.default.test_timestamp_table.ts = TimestampMicrosecond(1672921800000000, None)
+02)--TableScan: default.default.test_timestamp_table projection=[id, ts], partial_filters=[default.default.test_timestamp_table.ts = TimestampMicrosecond(1672921800000000, None)]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: ts@1 = 1672921800000000
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------CooperativeExec
+05)--------IcebergTableScan projection:[id,ts] predicate:[ts = 2023-01-05 12:30:00]
+
+# Verify timestamp equality filtering works
+query I?
+SELECT * FROM default.default.test_timestamp_table WHERE ts = CAST('2023-01-05 12:30:00' AS TIMESTAMP)
+----
+2 2023-01-05T12:30:00
+
+# Verify timestamp greater than predicate is pushed down to IcebergTableScan
+query TT
+EXPLAIN SELECT * FROM default.default.test_timestamp_table WHERE ts > CAST('2023-01-10 00:00:00' AS TIMESTAMP)
+----
+logical_plan
+01)Filter: default.default.test_timestamp_table.ts > TimestampMicrosecond(1673308800000000, None)
+02)--TableScan: default.default.test_timestamp_table projection=[id, ts], partial_filters=[default.default.test_timestamp_table.ts > TimestampMicrosecond(1673308800000000, None)]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: ts@1 > 1673308800000000
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------CooperativeExec
+05)--------IcebergTableScan projection:[id,ts] predicate:[ts > 2023-01-10 00:00:00]
+
+# Verify timestamp greater than filtering
+query I? rowsort
+SELECT * FROM default.default.test_timestamp_table WHERE ts > CAST('2023-01-10 00:00:00' AS TIMESTAMP)
+----
+3 2023-01-10T15:45:30
+4 2023-01-15T09:00:00
+5 2023-01-20T18:20:10
+
+# Test timestamp less than or equal filtering
+query I? rowsort
+SELECT * FROM default.default.test_timestamp_table WHERE ts <= CAST('2023-01-05 12:30:00' AS TIMESTAMP)
+----
+1 2023-01-01T00:00:00
+2 2023-01-05T12:30:00
+
+# Verify timestamp range predicate (AND of two comparisons) is pushed down
+query TT
+EXPLAIN SELECT * FROM default.default.test_timestamp_table
+WHERE ts >= CAST('2023-01-05 00:00:00' AS TIMESTAMP)
+AND ts <= CAST('2023-01-15 23:59:59' AS TIMESTAMP)
+----
+logical_plan
+01)Filter: default.default.test_timestamp_table.ts >= TimestampMicrosecond(1672876800000000, None) AND default.default.test_timestamp_table.ts <= TimestampMicrosecond(1673827199000000, None)
+02)--TableScan: default.default.test_timestamp_table projection=[id, ts], partial_filters=[default.default.test_timestamp_table.ts >= TimestampMicrosecond(1672876800000000, None), default.default.test_timestamp_table.ts <= TimestampMicrosecond(1673827199000000, None)]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: ts@1 >= 1672876800000000 AND ts@1 <= 1673827199000000
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------CooperativeExec
+05)--------IcebergTableScan projection:[id,ts] predicate:[(ts >= 2023-01-05 00:00:00) AND (ts <= 2023-01-15 23:59:59)]
+
+# Test timestamp range predicate filtering
+query I? rowsort
+SELECT * FROM default.default.test_timestamp_table
+WHERE ts >= CAST('2023-01-05 00:00:00' AS TIMESTAMP)
+AND ts <= CAST('2023-01-15 23:59:59' AS TIMESTAMP)
+----
+2 2023-01-05T12:30:00
+3 2023-01-10T15:45:30
+4 2023-01-15T09:00:00
+
+# Test timestamp predicate combined with other predicates
+query I? rowsort
+SELECT * FROM default.default.test_timestamp_table
+WHERE ts >= CAST('2023-01-10 00:00:00' AS TIMESTAMP) AND id < 5
+----
+3 2023-01-10T15:45:30
+4 2023-01-15T09:00:00
+
+# Test timestamp NOT EQUAL predicate
+query I? rowsort
+SELECT * FROM default.default.test_timestamp_table WHERE ts != CAST('2023-01-05 12:30:00' AS TIMESTAMP)
+----
+1 2023-01-01T00:00:00
+3 2023-01-10T15:45:30
+4 2023-01-15T09:00:00
+5 2023-01-20T18:20:10
+
+# Test timestamp less than filtering
+query I? rowsort
+SELECT * FROM default.default.test_timestamp_table WHERE ts < CAST('2023-01-05 00:00:00' AS TIMESTAMP)
+----
+1 2023-01-01T00:00:00
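The microsecond literals in the EXPLAIN output above follow directly from epoch arithmetic; a quick sanity check of the equality case (a standalone sketch, not part of the change):

    fn main() {
        // 2023-01-05 12:30:00 UTC = 2023-01-05 00:00:00 UTC + 12h 30m
        let secs = 1_672_876_800i64 + 12 * 3600 + 30 * 60;
        assert_eq!(secs, 1_672_921_800);
        // scaled to microseconds, this is the literal shown in the logical plan
        assert_eq!(secs * 1_000_000, 1_672_921_800_000_000);
    }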
