Commit b4961e7

ref(error): Improve error handling (#597)
* ref(error): Improve error handling
* Improve
* Improve
1 parent 8d182fb commit b4961e7

11 files changed (+536, -309 lines)

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default.

etl-api/src/main.rs

Lines changed: 5 additions & 60 deletions
@@ -1,15 +1,16 @@
 use anyhow::{Context, anyhow};
 use etl_api::{config::ApiConfig, startup::Application};
-use etl_config::{Environment, load_config, shared::PgConnectionConfig};
+use etl_config::{load_config, shared::PgConnectionConfig};
 use etl_telemetry::tracing::init_tracing;
-use secrecy::ExposeSecret;
 use std::env;
-use std::sync::{Arc, Once};
+use std::sync::Once;
 use tracing::{error, info};
 
 /// Ensures crypto provider is only initialized once.
 static INIT_CRYPTO: Once = Once::new();
 
+mod sentry;
+
 /// Installs the default cryptographic provider for rustls.
 ///
 /// Uses AWS LC cryptographic provider and ensures it's only installed once
@@ -38,7 +39,7 @@ fn main() -> anyhow::Result<()> {
     let _log_flusher = init_tracing(env!("CARGO_BIN_NAME"))?;
 
     // Initialize Sentry before the async runtime starts
-    let _sentry_guard = init_sentry()?;
+    let _sentry_guard = sentry::init()?;
 
     // We start the runtime.
     actix_web::rt::System::new().block_on(async_main())?;
@@ -86,62 +87,6 @@ async fn async_main() -> anyhow::Result<()> {
     Ok(())
 }
 
-/// Initializes Sentry error tracking and performance monitoring.
-///
-/// Configures Sentry with environment-specific settings, service tagging, and a custom
-/// traces sampler that reduces sampling for high-frequency endpoints like `/metrics` and
-/// `/health_check`.
-/// Returns `None` if Sentry configuration is not provided.
-fn init_sentry() -> anyhow::Result<Option<sentry::ClientInitGuard>> {
-    if let Ok(config) = load_config::<ApiConfig>()
-        && let Some(sentry_config) = &config.sentry
-    {
-        info!("initializing sentry with supplied dsn");
-
-        let environment = Environment::load()?;
-        let guard = sentry::init(sentry::ClientOptions {
-            dsn: Some(sentry_config.dsn.expose_secret().parse()?),
-            environment: Some(environment.to_string().into()),
-            traces_sampler: Some(Arc::new(|ctx: &sentry::TransactionContext| {
-                let transaction_name = ctx.name();
-                let endpoint = transaction_name
-                    .split_once(' ')
-                    .and_then(|(method, path)| {
-                        if path.starts_with('/') && method.chars().all(|c| c.is_ascii_uppercase()) {
-                            Some(path)
-                        } else {
-                            None
-                        }
-                    })
-                    .unwrap_or(transaction_name);
-
-                // Sample verbose endpoints at a much lower rate (0.1%)
-                match endpoint {
-                    "/metrics" | "/health_check" => 0.001,
-                    // All other endpoints sampled at 1%
-                    _ => 0.01,
-                }
-            })),
-            max_request_body_size: sentry::MaxRequestBodySize::Always,
-            integrations: vec![Arc::new(
-                sentry::integrations::panic::PanicIntegration::new(),
-            )],
-            ..Default::default()
-        });
-
-        // Set service tag to differentiate API from other services
-        sentry::configure_scope(|scope| {
-            scope.set_tag("service", "api");
-        });
-
-        return Ok(Some(guard));
-    }
-
-    info!("sentry not configured for api, skipping initialization");
-
-    Ok(None)
-}
-
 fn log_pg_connection_config(config: &PgConnectionConfig) {
     info!(
         host = config.host,

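The body of the function guarded by INIT_CRYPTO falls outside this hunk. As a point of reference only, a once-guarded install with the rustls aws-lc-rs provider typically looks like the minimal sketch below; the function name and body are assumptions, not this repository's code, and it presumes rustls 0.23 with the aws-lc-rs feature enabled.

use std::sync::Once;

static INIT_CRYPTO: Once = Once::new();

/// Hypothetical sketch: installs AWS-LC as the process-wide rustls crypto provider,
/// ignoring the error returned if another provider was installed first.
fn install_crypto_provider() {
    INIT_CRYPTO.call_once(|| {
        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
    });
}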
etl-api/src/sentry.rs

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
+use anyhow::Result;
+use etl_api::config::ApiConfig;
+use etl_config::{Environment, load_config};
+use secrecy::ExposeSecret;
+use std::sync::Arc;
+use tracing::info;
+
+/// Initializes Sentry error tracking and performance monitoring for the API.
+///
+/// Returns [`None`] when no Sentry configuration is provided.
+pub fn init() -> Result<Option<sentry::ClientInitGuard>> {
+    if let Ok(config) = load_config::<ApiConfig>()
+        && let Some(sentry_config) = &config.sentry
+    {
+        info!("initializing sentry with supplied dsn");
+
+        let environment = Environment::load()?;
+        let guard = sentry::init(sentry::ClientOptions {
+            dsn: Some(sentry_config.dsn.expose_secret().parse()?),
+            environment: Some(environment.to_string().into()),
+            traces_sampler: Some(Arc::new(|ctx: &sentry::TransactionContext| {
+                sample_trace_rate(ctx)
+            })),
+            max_request_body_size: sentry::MaxRequestBodySize::Always,
+            integrations: vec![Arc::new(
+                sentry::integrations::panic::PanicIntegration::new(),
+            )],
+            attach_stacktrace: true,
+            ..Default::default()
+        });
+
+        sentry::configure_scope(|scope| {
+            scope.set_tag("service", "api");
+        });
+
+        return Ok(Some(guard));
+    }
+
+    info!("sentry not configured for api, skipping initialization");
+    Ok(None)
+}
+
+/// Computes the trace sampling rate based on endpoint path.
+fn sample_trace_rate(ctx: &sentry::TransactionContext) -> f32 {
+    let transaction_name = ctx.name();
+    let endpoint = transaction_name
+        .split_once(' ')
+        .and_then(|(method, path)| {
+            if path.starts_with('/') && method.chars().all(|c| c.is_ascii_uppercase()) {
+                Some(path)
+            } else {
+                None
+            }
+        })
+        .unwrap_or(transaction_name);
+
+    match endpoint {
+        "/metrics" | "/health_check" => 0.001,
+        _ => 0.01,
+    }
+}

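To illustrate how the sampler behaves, the sketch below mirrors the endpoint extraction from sample_trace_rate on plain strings, since a sentry::TransactionContext is awkward to construct in isolation; the /v1/pipelines path is only an example, not a claim about the API's routes.

/// Illustrative copy of the endpoint parsing in `sample_trace_rate`, operating on the
/// raw transaction name ("METHOD /path") instead of a `TransactionContext`.
fn rate_for(transaction_name: &str) -> f32 {
    let endpoint = transaction_name
        .split_once(' ')
        .and_then(|(method, path)| {
            if path.starts_with('/') && method.chars().all(|c| c.is_ascii_uppercase()) {
                Some(path)
            } else {
                None
            }
        })
        .unwrap_or(transaction_name);

    match endpoint {
        "/metrics" | "/health_check" => 0.001,
        _ => 0.01,
    }
}

fn main() {
    // High-frequency endpoints are traced at 0.1%.
    assert_eq!(rate_for("GET /metrics"), 0.001);
    assert_eq!(rate_for("POST /health_check"), 0.001);
    // Everything else, including names without a "METHOD /path" shape, falls back to 1%.
    assert_eq!(rate_for("POST /v1/pipelines"), 0.01);
    assert_eq!(rate_for("background task"), 0.01);
}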
etl-replicator/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -17,7 +17,6 @@ etl-config = { workspace = true, features = ["supabase"] }
 etl-destinations = { workspace = true, features = ["bigquery", "iceberg"] }
 etl-telemetry = { workspace = true }
 
-anyhow = { workspace = true, features = ["std"] }
 configcat = { workspace = true }
 metrics = { workspace = true }
 reqwest = { workspace = true, features = ["rustls-tls", "json"] }

etl-replicator/src/config.rs

Lines changed: 5 additions & 4 deletions
@@ -1,14 +1,15 @@
-use anyhow::Context;
 use etl_config::load_config;
 use etl_config::shared::ReplicatorConfig;
 
+use crate::error::{ReplicatorError, ReplicatorResult};
+
 /// Loads and validates the replicator configuration.
 ///
 /// Uses the standard configuration loading mechanism from [`etl_config`] and
 /// validates the resulting [`ReplicatorConfig`] before returning it.
-pub fn load_replicator_config() -> anyhow::Result<ReplicatorConfig> {
-    let config = load_config::<ReplicatorConfig>().context("loading replicator configuration")?;
-    config.validate()?;
+pub fn load_replicator_config() -> ReplicatorResult<ReplicatorConfig> {
+    let config = load_config::<ReplicatorConfig>().map_err(ReplicatorError::config)?;
+    config.validate().map_err(ReplicatorError::config)?;
 
     Ok(config)
 }

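The crate::error module introduced by this commit is not among the hunks shown here. Based only on the call sites above (a ReplicatorResult<T> alias and ReplicatorError::config accepting arbitrary source errors), a plausible sketch could look like the following; the variant names, the thiserror dependency, and the exact bound on config are assumptions.

use std::fmt::Display;

/// Result alias used throughout the replicator (assumed shape).
pub type ReplicatorResult<T> = Result<T, ReplicatorError>;

/// Assumed error type; only the `config` constructor is visible in this diff.
#[derive(Debug, thiserror::Error)]
pub enum ReplicatorError {
    /// Configuration could not be loaded or validated.
    #[error("configuration error: {0}")]
    Config(String),
    /// Errors reported by the running pipeline or its stores.
    #[error("pipeline error: {0}")]
    Pipeline(String),
}

impl ReplicatorError {
    /// Matches the `map_err(ReplicatorError::config)` call sites above.
    pub fn config(err: impl Display) -> Self {
        Self::Config(err.to_string())
    }
}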
etl-replicator/src/core.rs

Lines changed: 10 additions & 7 deletions
@@ -1,5 +1,6 @@
 use std::collections::HashMap;
 
+use crate::error::{ReplicatorError, ReplicatorResult};
 use crate::migrations::migrate_state_store;
 use etl::destination::memory::MemoryDestination;
 use etl::pipeline::Pipeline;
@@ -31,7 +32,7 @@ use tracing::{debug, info, warn};
 /// destinations with proper initialization and error handling.
 pub async fn start_replicator_with_config(
     replicator_config: ReplicatorConfig,
-) -> anyhow::Result<()> {
+) -> ReplicatorResult<()> {
     info!("starting replicator service");
 
     log_config(&replicator_config);
@@ -85,7 +86,7 @@ pub async fn start_replicator_with_config(
             s3_region,
         },
     } => {
-        let env = Environment::load()?;
+        let env = Environment::load().map_err(ReplicatorError::config)?;
         let client = IcebergClient::new_with_supabase_catalog(
             project_ref,
             env.get_supabase_domain(),
@@ -95,7 +96,8 @@ pub async fn start_replicator_with_config(
             s3_secret_access_key.expose_secret().to_string(),
             s3_region.clone(),
         )
-        .await?;
+        .await
+        .map_err(ReplicatorError::config)?;
         let namespace = match namespace {
             Some(ns) => DestinationNamespace::Single(ns.to_string()),
             None => DestinationNamespace::OnePerSchema,
@@ -125,7 +127,8 @@ pub async fn start_replicator_with_config(
                 s3_endpoint.clone(),
             ),
         )
-        .await?;
+        .await
+        .map_err(ReplicatorError::config)?;
         let namespace = match namespace {
             Some(ns) => DestinationNamespace::Single(ns.to_string()),
             None => DestinationNamespace::OnePerSchema,
@@ -258,7 +261,7 @@ fn log_batch_config(config: &BatchConfig) {
 async fn init_store(
     pipeline_id: PipelineId,
     pg_connection_config: PgConnectionConfig,
-) -> anyhow::Result<impl StateStore + SchemaStore + CleanupStore + Clone> {
+) -> ReplicatorResult<impl StateStore + SchemaStore + CleanupStore + Clone> {
     migrate_state_store(&pg_connection_config).await?;
 
     Ok(PostgresStore::new(pipeline_id, pg_connection_config))
@@ -270,7 +273,7 @@ async fn init_store(
 /// and ensures proper cleanup on shutdown. The pipeline will attempt to
 /// finish processing current batches before terminating.
 #[tracing::instrument(skip(pipeline))]
-async fn start_pipeline<S, D>(mut pipeline: Pipeline<S, D>) -> anyhow::Result<()>
+async fn start_pipeline<S, D>(mut pipeline: Pipeline<S, D>) -> ReplicatorResult<()>
 where
     S: StateStore + SchemaStore + CleanupStore + Clone + Send + Sync + 'static,
     D: Destination + Clone + Send + Sync + 'static,
@@ -315,7 +318,7 @@ where
     shutdown_handle.abort();
     let _ = shutdown_handle.await;
 
-    // Propagate any pipeline error as anyhow error.
+    // Propagate any pipeline error.
     result?;
 
     Ok(())

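Note that init_store and start_pipeline still use `?` on the migration and pipeline results, which implies ReplicatorError also carries From conversions for those error types. Purely as a sketch, with a placeholder standing in for the concrete source error type that is not visible in this excerpt:

/// Placeholder for whatever error `migrate_state_store` and the pipeline actually return.
#[derive(Debug)]
pub struct PipelineFailure(pub String);

/// Assumed conversion that lets `?` work in functions returning ReplicatorResult.
impl From<PipelineFailure> for ReplicatorError {
    fn from(err: PipelineFailure) -> Self {
        ReplicatorError::Pipeline(err.0)
    }
}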
0 commit comments
