
Commit 79de41b ("Improve")

2 parents 078ee95 + 05a91c3

File tree: 8 files changed, +257 −122 lines changed


Cargo.lock

Lines changed: 10 additions & 18 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 2 additions & 1 deletion
@@ -46,7 +46,7 @@ const-oid = { version = "0.9.6", default-features = false }
 constant_time_eq = { version = "0.4.2" }
 fail = { version = "0.5.1", default-features = false }
 futures = { version = "0.3.31", default-features = false }
-gcp-bigquery-client = { git = "https://github.com/iambriccardo/gcp-bigquery-client", default-features = false, rev = "a1cc7895afce36c0c86cd71bab94253fef04f05c" }
+gcp-bigquery-client = { git = "https://github.com/iambriccardo/gcp-bigquery-client", default-features = false, rev = "cfb449afc8eabd6743f7341f5ee13d20531e68d5" }
 iceberg = { version = "0.7.0", default-features = false }
 iceberg-catalog-rest = { version = "0.7.0", default-features = false }
 insta = { version = "1.43.1", default-features = false }
@@ -76,6 +76,7 @@ tikv-jemallocator = { version = "0.6.1", default-features = false, features = ["
 tokio = { version = "1.47.0", default-features = false }
 tokio-postgres = { git = "https://github.com/iambriccardo/rust-postgres", default-features = false, rev = "31acf55c7e5c2244e5bb3a36e7afa2a01bf52c38" }
 tokio-rustls = { version = "0.26.2", default-features = false }
+tonic = { version = "0.14.2", default-features = false }
 tracing = { version = "0.1.41", default-features = false }
 tracing-actix-web = { version = "0.7.19", default-features = false }
 tracing-appender = { version = "0.2.3", default-features = false }

etl-config/src/shared/connection.rs

Lines changed: 7 additions & 3 deletions
@@ -68,8 +68,12 @@ pub static ETL_MIGRATION_OPTIONS: LazyLock<PgConnectionOptions> =
 
 /// Connection options for logical replication streams.
 ///
-/// Disables statement and idle timeouts to allow large COPY operations and long-running
+/// Disables statement, lock, and idle timeouts to allow large COPY operations and long-running
 /// transactions during initial table synchronization and WAL streaming.
+///
+/// Lock timeout is disabled because `CREATE_REPLICATION_SLOT` must wait for all in-progress
+/// write transactions to reach a consistent snapshot point. On heavily loaded databases this
+/// can take minutes, and a timeout would only cause retries that will also fail.
 pub static ETL_REPLICATION_OPTIONS: LazyLock<PgConnectionOptions> =
     LazyLock::new(|| PgConnectionOptions {
         datestyle: COMMON_DATESTYLE.to_string(),
@@ -78,7 +82,7 @@ pub static ETL_REPLICATION_OPTIONS: LazyLock<PgConnectionOptions> =
         client_encoding: COMMON_CLIENT_ENCODING.to_string(),
         timezone: COMMON_TIMEZONE.to_string(),
         statement_timeout: 0,
-        lock_timeout: 30_000,
+        lock_timeout: 0,
         idle_in_transaction_session_timeout: 0,
         application_name: APP_NAME_REPLICATOR_STREAMING.to_string(),
     });
@@ -398,7 +402,7 @@ mod tests {
 
         assert_eq!(
             options_string,
-            "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3 -c client_encoding=UTF8 -c timezone=UTC -c statement_timeout=0 -c lock_timeout=30000 -c idle_in_transaction_session_timeout=0 -c application_name=supabase_etl_replicator_streaming"
+            "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3 -c client_encoding=UTF8 -c timezone=UTC -c statement_timeout=0 -c lock_timeout=0 -c idle_in_transaction_session_timeout=0 -c application_name=supabase_etl_replicator_streaming"
         );
     }
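As context for the lock_timeout change: PostgreSQL treats a timeout of 0 as "disabled", and these options reach the server as `-c key=value` startup parameters, exactly the format pinned by the test string above. A minimal, self-contained sketch of that encoding, using a simplified stand-in for the crate's `PgConnectionOptions` (the real struct carries more fields):

```rust
/// Simplified stand-in for the crate's `PgConnectionOptions`; the real struct
/// also carries datestyle, timezone, application_name, and more.
struct PgConnectionOptions {
    statement_timeout: u64,
    lock_timeout: u64,
    idle_in_transaction_session_timeout: u64,
}

impl PgConnectionOptions {
    /// Encodes the settings as `-c key=value` startup options.
    fn to_options_string(&self) -> String {
        format!(
            "-c statement_timeout={} -c lock_timeout={} -c idle_in_transaction_session_timeout={}",
            self.statement_timeout, self.lock_timeout, self.idle_in_transaction_session_timeout
        )
    }
}

fn main() {
    let opts = PgConnectionOptions {
        statement_timeout: 0,
        lock_timeout: 0, // was 30_000; 0 tells Postgres to never time out lock waits
        idle_in_transaction_session_timeout: 0,
    };
    assert_eq!(
        opts.to_options_string(),
        "-c statement_timeout=0 -c lock_timeout=0 -c idle_in_transaction_session_timeout=0"
    );
}
```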

etl-config/src/shared/destination.rs

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ pub enum DestinationConfig {
 
 impl DestinationConfig {
     /// Default maximum number of concurrent streams for BigQuery destinations.
-    pub const DEFAULT_MAX_CONCURRENT_STREAMS: usize = 8;
+    pub const DEFAULT_MAX_CONCURRENT_STREAMS: usize = 2;
 }
 
 /// Configuration for the iceberg destination with two variants
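To illustrate what this constant bounds: with a value of 2, at most two batch writes are in flight at any moment, no matter how many batches are queued. A hedged sketch of one way such a cap can be enforced (`write_batch` is a placeholder, not this crate's code):

```rust
use futures::{stream, StreamExt};

const DEFAULT_MAX_CONCURRENT_STREAMS: usize = 2;

// Placeholder for an actual Storage Write API append.
async fn write_batch(id: usize) -> Result<usize, String> {
    Ok(id)
}

#[tokio::main]
async fn main() {
    // buffer_unordered polls at most DEFAULT_MAX_CONCURRENT_STREAMS futures
    // concurrently; the remaining batches wait for a free slot.
    let results: Vec<_> = stream::iter(0..8)
        .map(write_batch)
        .buffer_unordered(DEFAULT_MAX_CONCURRENT_STREAMS)
        .collect()
        .await;
    assert_eq!(results.len(), 8);
}
```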

etl-destinations/Cargo.toml

Lines changed: 4 additions & 0 deletions
@@ -11,6 +11,8 @@ homepage.workspace = true
 bigquery = [
     "dep:gcp-bigquery-client",
     "dep:prost",
+    "dep:rand",
+    "dep:tonic",
     "dep:tracing",
     "dep:tokio",
     "dep:base64",
@@ -48,10 +50,12 @@ iceberg = { workspace = true, optional = true }
 iceberg-catalog-rest = { workspace = true, optional = true }
 parquet = { workspace = true, optional = true, features = ["async", "arrow"] }
 prost = { workspace = true, optional = true }
+rand = { workspace = true, optional = true, features = ["thread_rng"] }
 reqwest = { workspace = true, optional = true, features = ["json"] }
 serde = { workspace = true, optional = true, features = ["derive"] }
 serde_json = { workspace = true, optional = true }
 tokio = { workspace = true, optional = true, features = ["sync"] }
+tonic = { workspace = true, optional = true }
 tracing = { workspace = true, optional = true, default-features = true }
 uuid = { workspace = true, optional = true, features = ["v4"] }

etl-destinations/src/bigquery/client.rs

Lines changed: 130 additions & 19 deletions
@@ -13,6 +13,7 @@ use gcp_bigquery_client::{
 use prost::Message;
 use std::fmt;
 use std::sync::Arc;
+use tonic::Code;
 use tracing::{debug, info};
 
 use crate::bigquery::encoding::BigQueryTableRow;
@@ -410,20 +411,17 @@ impl BigQueryClient {
         }
     }
 
-    /// Streams table batches to BigQuery using the concurrent Storage Write API.
+    /// Appends table batches to BigQuery using the concurrent Storage Write API.
     ///
-    /// Accepts pre-constructed TableBatch objects and processes them concurrently with
-    /// controlled parallelism. This allows streaming to multiple different tables efficiently
-    /// in a single call.
+    /// Accepts pre-constructed TableBatch objects wrapped in Arc and processes them concurrently
+    /// with controlled parallelism. This allows streaming to multiple different tables efficiently
+    /// in a single call. The Arc wrapping enables efficient retry operations without cloning data.
     ///
     /// If ordering is not required, you may split a table's data into multiple batches,
     /// which can be processed concurrently.
     /// If ordering guarantees are needed, all data for a given table must be included
     /// in a single batch.
-    ///
-    /// TODO: we might want to improve the detection of retriable errors by having a special error
-    /// type that we return for this.
-    pub async fn stream_table_batches_concurrent<I>(
+    pub async fn append_table_batches<I>(
         &self,
         table_batches: I,
         max_concurrent_streams: usize,
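On the Arc wrapping mentioned in the doc comment: cloning an `Arc<TableBatch>` bumps a reference count rather than copying row data, so a failed append can be resubmitted cheaply. A hedged sketch with stand-in types (`TableBatch`, `append`, and `append_with_retry` are illustrative, not this crate's API):

```rust
use std::sync::Arc;

struct TableBatch {
    rows: Vec<String>,
}

// Placeholder for a Storage Write API append.
async fn append(batch: Arc<TableBatch>) -> Result<(), &'static str> {
    if batch.rows.is_empty() { Err("empty batch") } else { Ok(()) }
}

async fn append_with_retry(
    batch: Arc<TableBatch>,
    max_attempts: u32,
) -> Result<(), &'static str> {
    let mut last_err = "no attempts made";
    for _ in 0..max_attempts {
        // Arc::clone is a cheap pointer copy; the rows themselves are shared.
        match append(Arc::clone(&batch)).await {
            Ok(()) => return Ok(()),
            Err(e) => last_err = e,
        }
    }
    Err(last_err)
}

#[tokio::main]
async fn main() {
    let batch = Arc::new(TableBatch { rows: vec!["row-1".into()] });
    assert!(append_with_retry(batch, 3).await.is_ok());
}
```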
@@ -538,6 +536,15 @@ impl BigQueryClient {
         Ok(ResultSet::new_from_query_response(query_response))
     }
 
+    /// Releases all connections currently held in the connection pool.
+    ///
+    /// Removes all idle connections, forcing new requests to create fresh connections.
+    /// This is useful after DDL operations (e.g., ALTER TABLE) when BigQuery's Storage
+    /// Write API may have stale schema information cached in existing connections.
+    pub fn release_all_connections(&self) {
+        self.client.storage().release_all_connections();
+    }
+
     /// Sanitizes a BigQuery identifier for safe backtick quoting.
     ///
     /// Rejects empty identifiers and identifiers containing control characters. Internal backticks
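The intended call pattern, sketched with mock types (neither `MockClient` nor `run_ddl` is this crate's API): run the DDL, then drop pooled connections so the next append opens a fresh connection that sees the post-DDL schema.

```rust
struct MockClient;

impl MockClient {
    // Placeholder for running DDL through the real client.
    async fn run_ddl(&self, _sql: &str) -> Result<(), String> {
        Ok(())
    }
    // Placeholder mirroring the new method: clears idle pooled connections.
    fn release_all_connections(&self) {}
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let client = MockClient;
    client
        .run_ddl("ALTER TABLE dataset.table ADD COLUMN new_col STRING")
        .await?;
    // Without this, a pooled Storage Write API connection could still carry
    // the pre-DDL schema and reject or misinterpret subsequent appends.
    client.release_all_connections();
    Ok(())
}
```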
@@ -873,19 +880,123 @@ fn bq_error_to_etl_error(err: BQError) -> EtlError {
             (ErrorKind::InvalidData, "BigQuery invalid metadata value")
         }
         BQError::TonicStatusError(status) => {
-            // Since we do not have access to the `Code` type from `tonic`, we just match on the description
-            // statically.
-            if status.code().description()
-                == "The caller does not have permission to execute the specified operation"
-            {
-                (ErrorKind::PermissionDenied, "BigQuery permission denied")
-            } else if is_retryable_streaming_message(status.message()) {
-                (
+            // First check for schema mismatch patterns in the message, as these can occur
+            // with various gRPC codes after DDL operations.
+            if is_retryable_streaming_message(status.message()) {
+                return etl_error!(
                     ErrorKind::DestinationSchemaMismatch,
                     "BigQuery schema mismatch",
-                )
-            } else {
-                (ErrorKind::DestinationError, "BigQuery gRPC status error")
+                    err.to_string()
+                );
+            }
+
+            match status.code() {
+                // Code::Unavailable (14) - "The service is currently unavailable."
+                // This is the primary retryable code per Google AIP-194. It indicates transient
+                // conditions like network hiccups or intentional throttling. BigQuery returns this
+                // with messages like "Task is overloaded (cpu-protection)" or "(memory-protection)"
+                // when the service is temporarily overwhelmed. Safe to retry with exponential backoff.
+                Code::Unavailable => (ErrorKind::DestinationThrottled, "BigQuery unavailable"),
+
+                // Code::ResourceExhausted (8) - "Some resource has been exhausted."
+                // Per Google AIP-194: "This code may be a signal that quota is exhausted. Retries
+                // therefore may not be expected to work for several hours; meanwhile the retries
+                // may have billing implications." We do NOT retry this to avoid wasting resources
+                // on quota exhaustion that won't recover quickly.
+                Code::ResourceExhausted => {
+                    (ErrorKind::DestinationError, "BigQuery resource exhausted")
+                }
+
+                // Code::PermissionDenied (7) - "The caller does not have permission."
+                // Authorization failure. The request will never succeed without configuration
+                // changes (e.g., granting IAM permissions). Never retry.
+                Code::PermissionDenied => {
+                    (ErrorKind::DestinationError, "BigQuery permission denied")
+                }
+
+                // Code::Unauthenticated (16) - "Missing or invalid authentication credentials."
+                // Authentication failure. Requires credential refresh or configuration fix.
+                // Never retry automatically.
+                Code::Unauthenticated => (
+                    ErrorKind::DestinationError,
+                    "BigQuery authentication failed",
+                ),
+
+                // Code::InvalidArgument (3) - "Client specified an invalid argument."
+                // Malformed request or invalid data. This is a client bug that won't be fixed
+                // by retrying. Never retry.
+                Code::InvalidArgument => (ErrorKind::DestinationError, "BigQuery invalid argument"),
+
+                // Code::NotFound (5) - "Some requested entity was not found."
+                // The resource (table, dataset, stream) doesn't exist. Requires creating the
+                // resource first. Never retry.
+                Code::NotFound => (
+                    ErrorKind::DestinationTableMissing,
+                    "BigQuery entity not found",
+                ),
+
+                // Code::AlreadyExists (6) - "The entity already exists."
+                // Conflict during creation. For streaming with offsets, this may indicate the
+                // row was already written (safe to ignore). Never retry.
+                Code::AlreadyExists => (
+                    ErrorKind::DestinationTableAlreadyExists,
+                    "BigQuery entity already exists",
+                ),
+
+                // Code::FailedPrecondition (9) - "System is not in required state."
+                // The operation can't proceed due to system state (e.g., non-empty table for
+                // certain operations). Requires explicit state change before retrying.
+                // Per gRPC spec: "Use FAILED_PRECONDITION if the client should not retry until
+                // the system state has been explicitly fixed." Never retry automatically.
+                Code::FailedPrecondition => {
+                    (ErrorKind::DestinationError, "BigQuery precondition failed")
+                }
+
+                // Code::OutOfRange (11) - "Operation attempted past the valid range."
+                // For streaming, this typically means the specified offset is beyond the current
+                // end of the stream, indicating a previous write failed. Requires application-level
+                // recovery (retry from the last successful write). Never retry at this level.
+                Code::OutOfRange => (ErrorKind::DestinationError, "BigQuery offset out of range"),
+
+                // Code::Aborted (10) - "The operation was aborted."
+                // Typically due to concurrency issues (sequencer check failure, transaction abort).
+                // Per gRPC spec: "Use ABORTED if the client should retry at a higher level."
+                // This means retry the entire transaction, not just this request. We don't retry
+                // here; the caller should handle transaction-level retry if needed.
+                Code::Aborted => (ErrorKind::DestinationError, "BigQuery operation aborted"),
+
+                // Code::Internal (13) - "Internal server error."
+                // Per Google AIP-194: "This error must be surfaced to the application immediately;
+                // it usually means a bug should be filed against the system." While BigQuery docs
+                // suggest these can be retried, AIP-194 recommends surfacing them. The underlying
+                // client library may already retry these internally before surfacing to us.
+                Code::Internal => (ErrorKind::DestinationError, "BigQuery internal error"),
+
+                // Code::DeadlineExceeded (4) - "Deadline expired before operation could complete."
+                // Per Google AIP-194: "An application can set a deadline, which must be honored."
+                // Retrying could violate the application's timeout expectations. The caller should
+                // decide whether to retry with a new deadline.
+                Code::DeadlineExceeded => {
+                    (ErrorKind::DestinationError, "BigQuery deadline exceeded")
+                }
+
+                // Code::Cancelled (1) - "The operation was cancelled."
+                // Typically client-initiated cancellation. Never retry.
+                Code::Cancelled => (ErrorKind::DestinationError, "BigQuery operation cancelled"),
+
+                // Code::Unimplemented (12) - "Operation not implemented or supported."
+                // The requested operation is not available. Never retry.
+                Code::Unimplemented => (
+                    ErrorKind::DestinationError,
+                    "BigQuery operation not supported",
+                ),
+
+                // Code::DataLoss (15) - "Unrecoverable data loss or corruption."
+                // Severe error indicating data corruption. Never retry.
+                Code::DataLoss => (ErrorKind::DestinationError, "BigQuery data loss"),
+
+                // Catch-all for unexpected codes.
+                _ => (ErrorKind::DestinationError, "BigQuery gRPC error"),
             }
         }
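Under this mapping, only `ErrorKind::DestinationThrottled` (from `Code::Unavailable`) marks a condition that is safe to retry automatically. A hedged sketch of the caller-side policy this implies, with a local stand-in `ErrorKind` and a placeholder `append_once` (neither is this crate's API):

```rust
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq)]
enum ErrorKind {
    DestinationThrottled,
    DestinationError,
}

// Placeholder append that is throttled on the first attempt only.
async fn append_once(attempt: u32) -> Result<(), ErrorKind> {
    if attempt == 0 {
        Err(ErrorKind::DestinationThrottled)
    } else {
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), ErrorKind> {
    let mut backoff = Duration::from_millis(100);
    for attempt in 0..5 {
        match append_once(attempt).await {
            Ok(()) => return Ok(()),
            Err(ErrorKind::DestinationThrottled) => {
                // Retry with exponential backoff, per the Unavailable mapping.
                tokio::time::sleep(backoff).await;
                backoff *= 2;
            }
            // Everything else is surfaced immediately; no automatic retry.
            Err(other) => return Err(other),
        }
    }
    Err(ErrorKind::DestinationThrottled)
}
```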
