Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions src/query/service/src/servers/mysql/mysql_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use futures::future::AbortHandle;
use futures::future::AbortRegistration;
use futures::future::Abortable;
use log::error;
use num_cpus;
use rustls::ServerConfig;
use socket2::TcpKeepalive;
use tokio::net::TcpStream;
Expand All @@ -42,6 +43,7 @@ pub struct MySQLHandler {
join_handle: Option<JoinHandle<()>>,
keepalive: TcpKeepalive,
tls: Option<Arc<ServerConfig>>,
query_executor: Arc<Runtime>,
}

impl MySQLHandler {
Expand All @@ -53,13 +55,19 @@ impl MySQLHandler {
let keepalive = TcpKeepalive::new()
.with_time(std::time::Duration::from_secs(tcp_keepalive_timeout_secs));
let tls = tls_config.setup()?.map(Arc::new);
let thread_num = std::cmp::max(2, num_cpus::get() / 2);
let query_executor = Arc::new(Runtime::with_worker_threads(
thread_num,
Some("mysql-query-executor".to_string()),
)?);

Ok(Box::new(MySQLHandler {
abort_handle,
abort_registration: Some(registration),
join_handle: None,
keepalive,
tls,
query_executor,
}))
}

Expand All @@ -81,18 +89,25 @@ impl MySQLHandler {
) -> impl Future<Output = ()> + use<> {
let keepalive = self.keepalive.clone();
let tls = self.tls.clone();
let query_executor = self.query_executor.clone();

stream.for_each(move |accept_socket| {
let tls = tls.clone();
let keepalive = keepalive.clone();
let executor = rt.clone();
let query_executor = query_executor.clone();
let sessions = SessionManager::instance();
async move {
match accept_socket {
Err(error) => error!("Broken session connection: {}", error),
Ok(socket) => {
MySQLHandler::accept_socket(sessions, executor, socket, keepalive, tls)
}
Ok(socket) => MySQLHandler::accept_socket(
sessions,
executor,
query_executor,
socket,
keepalive,
tls,
),
};
}
})
Expand All @@ -101,13 +116,20 @@ impl MySQLHandler {
fn accept_socket(
session_manager: Arc<SessionManager>,
executor: Arc<Runtime>,
query_executor: Arc<Runtime>,
socket: TcpStream,
keepalive: TcpKeepalive,
tls: Option<Arc<ServerConfig>>,
) {
executor.spawn(async move {
if let Err(error) =
MySQLConnection::run_on_stream(session_manager, socket, keepalive, tls).await
if let Err(error) = MySQLConnection::run_on_stream(
session_manager,
socket,
keepalive,
tls,
query_executor,
)
.await
{
error!("Unexpected error occurred during query: {:?}", error);
}
Expand Down
73 changes: 29 additions & 44 deletions src/query/service/src/servers/mysql/mysql_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ use std::io;
use std::net::Shutdown;
use std::sync::Arc;

use databend_common_base::base::tokio::io::AsyncWriteExt;
use databend_common_base::base::tokio::io::BufWriter;
use databend_common_base::base::tokio::net::TcpStream;
use databend_common_base::base::tokio::net::tcp::OwnedReadHalf;
use databend_common_base::base::tokio::net::tcp::OwnedWriteHalf;
use databend_common_base::runtime::Runtime;
use databend_common_base::runtime::Thread;
use databend_common_config::GlobalConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
Expand All @@ -37,11 +41,6 @@ use rand::RngCore;
use rustls::ServerConfig;
use socket2::SockRef;
use socket2::TcpKeepalive;
use tokio::io::AsyncWriteExt;
use tokio::io::BufWriter;
use tokio::net::TcpStream;
use tokio::net::tcp::OwnedReadHalf;
use tokio::net::tcp::OwnedWriteHalf;

use crate::servers::mysql::MYSQL_VERSION;
use crate::servers::mysql::mysql_interactive_worker::InteractiveWorker;
Expand All @@ -59,6 +58,7 @@ impl MySQLConnection {
stream: TcpStream,
keepalive: TcpKeepalive,
tls: Option<Arc<ServerConfig>>,
query_executor: Arc<Runtime>,
) -> Result<()> {
let blocking_stream = Self::convert_stream(stream)?;
let handshake_stream = blocking_stream.try_clone()?;
Expand Down Expand Up @@ -142,10 +142,7 @@ impl MySQLConnection {

info!("MySQL connection coming: {}", client_addr);

let query_executor =
Runtime::with_worker_threads(1, Some("mysql-query-executor".to_string()))?;

Thread::spawn(move || {
query_executor.spawn(async move {
let tls_clone = tls.clone();
let interactive_worker =
InteractiveWorker::create(session.clone(), version, client_addr, salt);
Expand All @@ -154,42 +151,30 @@ impl MySQLConnection {
reject_connection_on_dbname_absence: false,
};

let join_handle = query_executor.spawn(async move {
let run_result = match (tls_clone, use_ssl) {
(Some(config), true) => {
secure_run_with_options(
interactive_worker,
writer,
opts,
config,
init_params,
)
let run_result = match (tls_clone, use_ssl) {
(Some(config), true) => {
secure_run_with_options(interactive_worker, writer, opts, config, init_params)
.await
}
_ => {
plain_run_with_options(interactive_worker, writer, opts, init_params).await
}
};

run_result.ok();

let tenant = session.get_current_tenant();
let session_id = session.get_id();
let user = session.get_current_user()?.name;
UserApiProvider::instance()
.client_session_api(&tenant)
.drop_client_session_id(&session_id, &user)
.await
.ok();
drop_all_temp_tables(
&format!("{user}/{session_id}"),
session.temp_tbl_mgr(),
"mysql",
)
.await
});
}
_ => plain_run_with_options(interactive_worker, writer, opts, init_params).await,
};

let _ = futures::executor::block_on(join_handle);
run_result.ok();

let tenant = session.get_current_tenant();
let session_id = session.get_id();
let user = session.get_current_user()?.name;
UserApiProvider::instance()
.client_session_api(&tenant)
.drop_client_session_id(&session_id, &user)
.await
.ok();
drop_all_temp_tables(
&format!("{user}/{session_id}"),
session.temp_tbl_mgr(),
"mysql",
)
.await
});
Ok(())
}
Expand Down
Loading