Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5540,6 +5540,7 @@ def write_dataset(
data_storage_version: Optional[
Literal["stable", "2.0", "2.1", "2.2", "next", "legacy", "0.1"]
] = None,
blob_version: Optional[Literal["v1", "v2"]] = None,
use_legacy_format: Optional[bool] = None,
enable_v2_manifest_paths: bool = True,
enable_stable_row_ids: bool = False,
Expand Down Expand Up @@ -5802,6 +5803,9 @@ def write_dataset(
"target_bases": target_bases,
}

if blob_version is not None:
params["blob_version"] = blob_version

# Add storage_options_provider if created from namespace
if storage_options_provider is not None:
params["storage_options_provider"] = storage_options_provider
Expand Down
12 changes: 10 additions & 2 deletions python/python/tests/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,12 @@ def test_scan_blob(tmp_path, dataset_with_blobs):

def test_blob_extension_write_inline(tmp_path):
    # Write a small blob column with the v2 blob encoding and verify the
    # round-tripped column comes back as a (descriptor) struct.
    table = pa.table({"blob": lance.blob_array([b"foo", b"bar"])})
    # NOTE(review): the pasted diff left both the old single-line call and the
    # new multi-line call in place; only the new call (with blob_version="v2")
    # is kept here.
    ds = lance.write_dataset(
        table,
        tmp_path / "test_ds_v2",
        data_storage_version="2.2",
        blob_version="v2",
    )

    desc = ds.to_table(columns=["blob"]).column("blob").chunk(0)
    assert pa.types.is_struct(desc.type)
Expand All @@ -276,7 +281,10 @@ def test_blob_extension_write_external(tmp_path):

table = pa.table({"blob": lance.blob_array([uri])})
ds = lance.write_dataset(
table, tmp_path / "test_ds_v2_external", data_storage_version="2.2"
table,
tmp_path / "test_ds_v2_external",
data_storage_version="2.2",
blob_version="v2",
)

blob = ds.take_blobs("blob", indices=[0])[0]
Expand Down
14 changes: 14 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use lance::index::vector::utils::get_vector_type;
use lance::index::{vector::VectorIndexParams, DatasetIndexInternalExt};
use lance::{dataset::builder::DatasetBuilder, index::vector::IndexFileVersion};
use lance_arrow::as_fixed_size_list_array;
use lance_core::datatypes::BlobVersion;
use lance_core::Error;
use lance_datafusion::utils::reader_to_stream;
use lance_encoding::decoder::DecoderConfig;
Expand Down Expand Up @@ -3087,6 +3088,9 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult<Option<WritePar
{
p.data_storage_version = Some(data_storage_version.parse().infer_error()?);
}
if let Some(blob_version) = get_dict_opt::<String>(options, "blob_version")? {
p.blob_version = Some(parse_blob_version(&blob_version)?);
}
if let Some(progress) = get_dict_opt::<Py<PyAny>>(options, "progress")? {
p.progress = Arc::new(PyWriteProgress::new(progress.into_py_any(options.py())?));
}
Expand Down Expand Up @@ -3189,6 +3193,16 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult<Option<WritePar
Ok(params)
}

/// Convert a user-supplied blob version string into a [`BlobVersion`].
///
/// Accepts "v1"/"1" and "v2"/"2", case-insensitively; any other input
/// produces a `PyValueError` naming the offending value.
fn parse_blob_version(value: &str) -> PyResult<BlobVersion> {
    let normalized = value.to_lowercase();
    if normalized == "v1" || normalized == "1" {
        Ok(BlobVersion::V1)
    } else if normalized == "v2" || normalized == "2" {
        Ok(BlobVersion::V2)
    } else {
        Err(PyValueError::new_err(format!(
            "Invalid blob_version: {value} (expected 'v1' or 'v2')"
        )))
    }
}

fn prepare_vector_index_params(
index_type: &str,
column_type: &DataType,
Expand Down
7 changes: 7 additions & 0 deletions rust/lance-arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,13 @@ fn project_array(array: &ArrayRef, target_field: &Field) -> Result<ArrayRef> {
}

fn project(struct_array: &StructArray, fields: &Fields) -> Result<StructArray> {
if struct_array.fields().len() != struct_array.columns().len() {
return Err(ArrowError::SchemaError(format!(
"Invalid StructArray: {} fields but {} columns",
struct_array.fields().len(),
struct_array.columns().len()
)));
}
if fields.is_empty() {
return Ok(StructArray::new_empty_fields(
struct_array.len(),
Expand Down
91 changes: 73 additions & 18 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use arrow_array::{Array, ArrayRef, RecordBatch};
use arrow_schema::DataType;
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use lance_core::datatypes::{Field, Schema};
use lance_core::datatypes::{BlobVersion, Field, Schema};
use lance_core::error::LanceOptionExt;
use lance_core::utils::bit::{is_pwr_two, pad_bytes_to};
use lance_core::{Error, Result};
Expand Down Expand Up @@ -302,18 +302,26 @@ pub fn default_encoding_strategy(version: LanceFileVersion) -> Box<dyn FieldEnco
pub fn default_encoding_strategy_with_params(
version: LanceFileVersion,
params: CompressionParams,
blob_version: BlobVersion,
) -> Result<Box<dyn FieldEncodingStrategy>> {
match version.resolve() {
LanceFileVersion::Legacy | LanceFileVersion::V2_0 => Err(Error::invalid_input(
"Compression parameters are only supported in Lance file version 2.1 and later",
location!(),
)),
_ => {
if blob_version != BlobVersion::V1 && version < LanceFileVersion::V2_2 {
return Err(Error::InvalidInput {
source: "Blob version v2 requires file version >= 2.2".into(),
location: location!(),
});
}
let compression_strategy =
Arc::new(DefaultCompressionStrategy::with_params(params).with_version(version));
Ok(Box::new(StructuralEncodingStrategy {
compression_strategy,
version,
blob_version,
}))
}
}
Expand All @@ -324,6 +332,7 @@ pub fn default_encoding_strategy_with_params(
pub struct StructuralEncodingStrategy {
pub compression_strategy: Arc<dyn CompressionStrategy>,
pub version: LanceFileVersion,
pub blob_version: BlobVersion,
}

// For some reason, clippy thinks we can add Default to the above derive but
Expand All @@ -334,6 +343,7 @@ impl Default for StructuralEncodingStrategy {
Self {
compression_strategy: Arc::new(DefaultCompressionStrategy::new()),
version: LanceFileVersion::default(),
blob_version: BlobVersion::V1,
}
}
}
Expand All @@ -343,6 +353,18 @@ impl StructuralEncodingStrategy {
Self {
compression_strategy: Arc::new(DefaultCompressionStrategy::new().with_version(version)),
version,
blob_version: BlobVersion::V1,
}
}

pub fn with_version_and_blob_version(
version: LanceFileVersion,
blob_version: BlobVersion,
) -> Self {
Self {
compression_strategy: Arc::new(DefaultCompressionStrategy::new().with_version(version)),
version,
blob_version,
}
}

Expand Down Expand Up @@ -394,6 +416,36 @@ impl StructuralEncodingStrategy {

// Check if field is marked as blob
if field.is_blob() {
if self.blob_version == BlobVersion::V2 && self.version < LanceFileVersion::V2_2 {
return Err(Error::InvalidInput {
source: "Blob v2 requires file version >= 2.2".into(),
location: location!(),
});
}

if self.blob_version == BlobVersion::V2 {
match data_type {
DataType::Binary | DataType::LargeBinary | DataType::Struct(_) => {
return Ok(Box::new(BlobV2StructuralEncoder::new(
field,
column_index.next_column_index(field.id as u32),
options,
self.compression_strategy.clone(),
)?));
}
_ => {
return Err(Error::InvalidInput {
source: format!(
"Blob encoding only supports Binary/LargeBinary or Struct, got {}",
data_type
)
.into(),
location: location!(),
});
}
}
}

match data_type {
DataType::Binary | DataType::LargeBinary => {
return Ok(Box::new(BlobStructuralEncoder::new(
Expand All @@ -403,24 +455,16 @@ impl StructuralEncodingStrategy {
self.compression_strategy.clone(),
)?));
}
DataType::Struct(_) if self.version >= LanceFileVersion::V2_2 => {
return Ok(Box::new(BlobV2StructuralEncoder::new(
field,
column_index.next_column_index(field.id as u32),
options,
self.compression_strategy.clone(),
)?));
}
DataType::Struct(_) => {
return Err(Error::InvalidInput {
source: "Blob v2 struct input requires file version >= 2.2".into(),
source: "Blob struct input requires blob version v2".into(),
location: location!(),
});
}
_ => {
return Err(Error::InvalidInput {
source: format!(
"Blob encoding only supports Binary/LargeBinary or v2 Struct, got {}",
"Blob encoding only supports Binary/LargeBinary, got {}",
data_type
)
.into(),
Expand Down Expand Up @@ -807,24 +851,35 @@ mod tests {
);

// Test with V2.1 - should succeed
let strategy =
default_encoding_strategy_with_params(LanceFileVersion::V2_1, params.clone())
.expect("Should succeed for V2.1");
let strategy = default_encoding_strategy_with_params(
LanceFileVersion::V2_1,
params.clone(),
BlobVersion::V1,
)
.expect("Should succeed for V2.1");

// Verify it's a StructuralEncodingStrategy
assert!(format!("{:?}", strategy).contains("StructuralEncodingStrategy"));
assert!(format!("{:?}", strategy).contains("DefaultCompressionStrategy"));

// Test with V2.0 - should fail
let err = default_encoding_strategy_with_params(LanceFileVersion::V2_0, params.clone())
.expect_err("Should fail for V2.0");
let err = default_encoding_strategy_with_params(
LanceFileVersion::V2_0,
params.clone(),
BlobVersion::V1,
)
.expect_err("Should fail for V2.0");
assert!(err
.to_string()
.contains("only supported in Lance file version 2.1"));

// Test with Legacy - should fail
let err = default_encoding_strategy_with_params(LanceFileVersion::Legacy, params)
.expect_err("Should fail for Legacy");
let err = default_encoding_strategy_with_params(
LanceFileVersion::Legacy,
params,
BlobVersion::V1,
)
.expect_err("Should fail for Legacy");
assert!(err
.to_string()
.contains("only supported in Lance file version 2.1"));
Expand Down
Loading
Loading