Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .github/actions/test_sandbox_udf/action.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: "Test sandbox UDF for databend query"
description: "Test sandbox Python UDF flow with mock control plane"
runs:
using: "composite"
steps:
- uses: ./.github/actions/setup_test

- name: Install jq
shell: bash
run: sudo apt-get update -yq && sudo apt-get install -yq jq

- name: Install uv
shell: bash
run: pip install uv

- uses: ./.github/actions/setup_minio

- name: Run Sandbox UDF Test
shell: bash
run: |
bash ./tests/udf/test-sandbox-udf.sh

- name: Upload failure
if: failure()
uses: ./.github/actions/artifact_failure
with:
name: test-sandbox-udf
17 changes: 17 additions & 0 deletions .github/workflows/reuse.linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,23 @@ jobs:
- uses: ./.github/actions/test_private_tasks
timeout-minutes: 20

test_sandbox_udf:
needs: [build, check]
runs-on:
- self-hosted
- "${{ inputs.runner_arch }}"
- Linux
- 8c
- "${{ inputs.runner_provider }}"
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/setup_license
with:
runner_provider: ${{ inputs.runner_provider }}
type: ${{ inputs.license_type }}
- uses: ./.github/actions/test_sandbox_udf
timeout-minutes: 20

test_meta_cluster:
needs: [build, check]
runs-on:
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions src/common/cloud_control/proto/udf.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
syntax = "proto3";
option go_package = "databend.com/cloudcontrol/proto";

package udfproto;

message UdfAsset {
string location = 1;
string url = 2;
// Content tag for caching (e.g. etag or content length), independent of presigned URL.
string tag = 3;
}

message UdfRuntimeSpec {
string language = 1;
string handler = 2;
repeated string input_types = 3;
string result_type = 4;
repeated string imports = 5;
repeated string packages = 6;
string runtime_version = 7;
string code = 8;
}

message ApplyUdfResourceRequest {
string type = 3;
string script = 4;
}

message ApplyUdfResourceResponse {
string endpoint = 1;
map<string, string> headers = 2;
}

service UdfService {
rpc ApplyUdfResource(ApplyUdfResourceRequest) returns (ApplyUdfResourceResponse);
}
7 changes: 7 additions & 0 deletions src/common/cloud_control/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ impl ClientConfig {
crate::notification_client::NOTIFICATION_CLIENT_VERSION,
);
}

pub fn add_udf_version_info(&mut self) {
self.add_metadata(
crate::udf_client::UDF_CLIENT_VERSION_NAME,
crate::udf_client::UDF_CLIENT_VERSION,
);
}
pub fn get_metadata(&self) -> &Vec<(String, String)> {
&self.metadata
}
Expand Down
10 changes: 9 additions & 1 deletion src/common/cloud_control/src/cloud_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ use databend_common_exception::Result;

use crate::notification_client::NotificationClient;
use crate::task_client::TaskClient;
use crate::udf_client::UdfClient;

pub const CLOUD_REQUEST_TIMEOUT_SEC: u64 = 5; // 5 seconds

pub struct CloudControlApiProvider {
pub task_client: Arc<TaskClient>,
pub notification_client: Arc<NotificationClient>,
pub udf_client: Arc<UdfClient>,
pub timeout: Duration,
}

Expand All @@ -41,10 +43,12 @@ impl CloudControlApiProvider {
let endpoint = Self::get_endpoint(endpoint, timeout).await?;
let channel = endpoint.connect_lazy();
let task_client = TaskClient::new(channel.clone()).await?;
let notification_client = NotificationClient::new(channel).await?;
let notification_client = NotificationClient::new(channel.clone()).await?;
let udf_client = UdfClient::new(channel).await?;
Ok(Arc::new(CloudControlApiProvider {
task_client,
notification_client,
udf_client,
timeout,
}))
}
Expand Down Expand Up @@ -84,6 +88,10 @@ impl CloudControlApiProvider {
pub fn get_notification_client(&self) -> Arc<NotificationClient> {
self.notification_client.clone()
}

pub fn get_udf_client(&self) -> Arc<UdfClient> {
self.udf_client.clone()
}
pub fn get_timeout(&self) -> Duration {
self.timeout
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/cloud_control/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod notification_client;
pub mod notification_utils;
pub mod task_client;
pub mod task_utils;
pub mod udf_client;

#[allow(clippy::derive_partial_eq_without_eq)]
#[allow(clippy::large_enum_variant)]
Expand All @@ -26,6 +27,7 @@ pub mod pb {
// taskproto is proto package name.
tonic::include_proto!("taskproto");
tonic::include_proto!("notificationproto");
tonic::include_proto!("udfproto");
}

pub mod utils {
Expand Down
46 changes: 46 additions & 0 deletions src/common/cloud_control/src/udf_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_exception::Result;
use tonic::Request;
use tonic::transport::Channel;

use crate::pb::ApplyUdfResourceRequest;
use crate::pb::ApplyUdfResourceResponse;
use crate::pb::udf_service_client::UdfServiceClient;

pub(crate) const UDF_CLIENT_VERSION: &str = "v1";
pub(crate) const UDF_CLIENT_VERSION_NAME: &str = "UDF_CLIENT_VERSION";

pub struct UdfClient {
pub udf_client: UdfServiceClient<Channel>,
}

impl UdfClient {
pub async fn new(channel: Channel) -> Result<Arc<UdfClient>> {
let udf_client = UdfServiceClient::new(channel);
Ok(Arc::new(UdfClient { udf_client }))
}

pub async fn apply_udf_resource(
&self,
req: Request<ApplyUdfResourceRequest>,
) -> Result<ApplyUdfResourceResponse> {
let mut client = self.udf_client.clone();
let resp = client.apply_udf_resource(req).await?;
Ok(resp.into_inner())
}
}
5 changes: 5 additions & 0 deletions src/query/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1932,6 +1932,9 @@ pub struct QueryConfig {
#[clap(long, value_name = "VALUE", default_value = "true")]
pub enable_udf_wasm_script: bool,

#[clap(long, value_name = "VALUE", default_value = "false")]
pub enable_udf_sandbox: bool,

#[clap(long, value_name = "VALUE", default_value = "false")]
pub enable_udf_server: bool,

Expand Down Expand Up @@ -2054,6 +2057,7 @@ impl TryInto<InnerQueryConfig> for QueryConfig {
enable_udf_python_script: self.enable_udf_python_script,
enable_udf_js_script: self.enable_udf_js_script,
enable_udf_wasm_script: self.enable_udf_wasm_script,
enable_udf_sandbox: self.enable_udf_sandbox,
udf_server_allow_list: self.udf_server_allow_list,
udf_server_allow_insecure: self.udf_server_allow_insecure,
cloud_control_grpc_server_address: self.cloud_control_grpc_server_address,
Expand Down Expand Up @@ -2161,6 +2165,7 @@ impl From<InnerQueryConfig> for QueryConfig {
enable_udf_python_script: inner.enable_udf_python_script,
enable_udf_js_script: inner.enable_udf_js_script,
enable_udf_wasm_script: inner.enable_udf_wasm_script,
enable_udf_sandbox: inner.enable_udf_sandbox,

enable_udf_server: inner.enable_udf_server,
udf_server_allow_list: inner.udf_server_allow_list,
Expand Down
2 changes: 2 additions & 0 deletions src/query/config/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ pub struct QueryConfig {
pub enable_udf_python_script: bool,
pub enable_udf_js_script: bool,
pub enable_udf_wasm_script: bool,
pub enable_udf_sandbox: bool,

pub enable_udf_server: bool,
pub udf_server_allow_list: Vec<String>,
Expand Down Expand Up @@ -336,6 +337,7 @@ impl Default for QueryConfig {
enable_udf_js_script: true,
enable_udf_python_script: true,
enable_udf_wasm_script: true,
enable_udf_sandbox: false,

enable_udf_server: false,
udf_server_allow_list: Vec::new(),
Expand Down
7 changes: 7 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,13 @@ impl DefaultSettings {
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
("udf_cloud_import_presign_expire_secs", DefaultSettingValue {
value: UserSettingValue::UInt64(3600),
desc: "Presign expiry for cloud UDF stage imports",
mode: SettingMode::Both,
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(1..=u64::MAX)),
}),
("external_server_request_batch_rows", DefaultSettingValue {
value: UserSettingValue::UInt64(65536),
desc: "Request batch rows to external server",
Expand Down
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,10 @@ impl Settings {
self.try_get_u64("external_server_request_timeout_secs")
}

pub fn get_udf_cloud_import_presign_expire_secs(&self) -> Result<u64> {
self.try_get_u64("udf_cloud_import_presign_expire_secs")
}

pub fn get_external_server_request_batch_rows(&self) -> Result<u64> {
self.try_get_u64("external_server_request_batch_rows")
}
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ storage-hdfs = ["databend-common-config/storage-hdfs"]
databend-common-ast = { workspace = true }
databend-common-base = { workspace = true }
databend-common-catalog = { workspace = true }
databend-common-cloud-control = { workspace = true }
databend-common-compress = { workspace = true }
databend-common-config = { workspace = true }
databend-common-exception = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion src/query/sql/src/planner/binder/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ impl Binder {
packages,
immutable,
} => {
UDFValidator::is_udf_script_allowed(&language.parse()?)?;
let parsed_language = language.parse::<UDFLanguage>()?;
UDFValidator::is_udf_script_allowed(&parsed_language)?;
let definition = create_udf_definition_script(
arg_types,
None,
Expand Down
20 changes: 19 additions & 1 deletion src/query/sql/src/planner/expression/udf_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ impl UDFValidator {
UDFLanguage::JavaScript if GlobalConfig::instance().query.enable_udf_js_script => {
Ok(())
}
UDFLanguage::Python if GlobalConfig::instance().query.enable_udf_python_script => {
UDFLanguage::Python
if GlobalConfig::instance().query.enable_udf_python_script
|| GlobalConfig::instance().query.enable_udf_sandbox =>
{
Ok(())
}
UDFLanguage::WebAssembly if GlobalConfig::instance().query.enable_udf_wasm_script => {
Expand All @@ -112,6 +115,21 @@ impl UDFValidator {
}
}

pub fn is_udf_cloud_script_allowed(lang: &UDFLanguage) -> Result<()> {
if !GlobalConfig::instance().query.enable_udf_sandbox {
return Err(ErrorCode::Unimplemented(
"SandboxUDF is not enabled, you can enable it by setting 'enable_udf_sandbox = true' in query node config",
));
}
// TODO: more lang e.g. JavaScript ..
if *lang != UDFLanguage::Python {
return Err(ErrorCode::InvalidArgument(
"SandboxUDF only supports python language",
));
}
Ok(())
}

pub fn is_udf_server_allowed(address: &str) -> Result<()> {
if !GlobalConfig::instance().query.enable_udf_server {
return Err(ErrorCode::Unimplemented(
Expand Down
Loading
Loading