Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ sentry_protos = "0.4.11"
serde = "1.0.214"
serde_yaml = "0.9.34"
sha2 = "0.10.8"
sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "chrono"] }
sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "chrono", "postgres"] }
tokio = { version = "1.43.1", features = ["full"] }
tokio-stream = { version = "0.1.16", features = ["full"] }
tokio-util = "0.7.12"
Expand All @@ -62,6 +62,7 @@ uuid = { version = "1.11.0", features = ["v4"] }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["async_tokio"] }
rstest = "0.23"

[[bench]]
name = "store_bench"
Expand Down
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# recent enough version of protobuf-compiler
FROM rust:1-bookworm AS build

RUN apt-get update && apt-get upgrade -y
RUN apt-get update && apt-get upgrade -y
RUN apt-get install -y cmake pkg-config libssl-dev librdkafka-dev protobuf-compiler

RUN USER=root cargo new --bin taskbroker
Expand All @@ -17,6 +17,7 @@ ENV TASKBROKER_VERSION=$TASKBROKER_GIT_REVISION
COPY ./Cargo.lock ./Cargo.lock
COPY ./Cargo.toml ./Cargo.toml
COPY ./migrations ./migrations
COPY ./pg_migrations ./pg_migrations
COPY ./benches ./benches

# Build dependencies in a way they can be cached
Expand Down
12 changes: 9 additions & 3 deletions devservices/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@ x-sentry-service-config:
repo_name: sentry-shared-redis
branch: main
repo_link: https://github.com/getsentry/sentry-shared-redis.git
postgres:
description: Shared instance of postgres used by sentry services
remote:
repo_name: sentry-shared-postgres
branch: main
repo_link: https://github.com/getsentry/sentry-shared-postgres.git
taskbroker:
description: Taskbroker service
modes:
default: [kafka]
client: [kafka, redis]
containerized: [kafka, redis, taskbroker]
default: [kafka, postgres]
client: [kafka, redis, postgres]
containerized: [kafka, redis, postgres, taskbroker]

x-programs:
devserver:
Expand Down
20 changes: 20 additions & 0 deletions pg_migrations/0001_create_inflight_activations.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- PostgreSQL equivalent of the inflight_taskactivations table
CREATE TABLE IF NOT EXISTS inflight_taskactivations (
id TEXT NOT NULL PRIMARY KEY,
activation BYTEA NOT NULL,
partition INTEGER NOT NULL,
kafka_offset BIGINT NOT NULL,
added_at TIMESTAMPTZ NOT NULL,
received_at TIMESTAMPTZ NOT NULL,
processing_attempts INTEGER NOT NULL,
expires_at TIMESTAMPTZ,
delay_until TIMESTAMPTZ,
processing_deadline_duration INTEGER NOT NULL,
processing_deadline TIMESTAMPTZ,
status TEXT NOT NULL,
at_most_once BOOLEAN NOT NULL DEFAULT FALSE,
application TEXT NOT NULL,
namespace TEXT NOT NULL,
taskname TEXT NOT NULL,
on_attempts_exceeded INTEGER NOT NULL DEFAULT 1
);
34 changes: 33 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ use std::{borrow::Cow, collections::BTreeMap};

use crate::{Args, logging::LogFormat};

#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum DatabaseAdapter {
/// SQLite database adapter
Sqlite,

/// PostgreSQL database adapter
Postgres,
}

#[derive(PartialEq, Debug, Deserialize, Serialize)]
pub struct Config {
/// The sentry DSN to use for error reporting.
Expand Down Expand Up @@ -121,6 +131,20 @@ pub struct Config {
/// The number of ms for timeouts when publishing messages to kafka.
pub kafka_send_timeout_ms: u64,

/// The database adapter to use for the inflight activation store.
pub database_adapter: DatabaseAdapter,

/// Whether to run the migrations on the database.
/// This is only used by the postgres database adapter, since
/// in production the migrations shouldn't be run by the taskbroker.
pub run_migrations: bool,

/// The url of the postgres database to use for the inflight activation store.
pub pg_url: String,

/// The name of the postgres database to use for the inflight activation store.
pub pg_database_name: String,

/// The path to the sqlite database
pub db_path: String,

Expand Down Expand Up @@ -256,6 +280,10 @@ impl Default for Config {
kafka_auto_offset_reset: "latest".to_owned(),
kafka_send_timeout_ms: 500,
db_path: "./taskbroker-inflight.sqlite".to_owned(),
database_adapter: DatabaseAdapter::Sqlite,
run_migrations: false,
pg_url: "postgres://postgres:password@sentry-postgres-1:5432/".to_owned(),
pg_database_name: "default".to_owned(),
db_write_failure_backoff_ms: 4000,
db_insert_batch_max_len: 256,
db_insert_batch_max_size: 16_000_000,
Expand Down Expand Up @@ -393,7 +421,7 @@ impl Provider for Config {
mod tests {
use std::{borrow::Cow, collections::BTreeMap};

use super::Config;
use super::{Config, DatabaseAdapter};
use crate::{Args, logging::LogFormat};
use figment::Jail;

Expand Down Expand Up @@ -431,6 +459,7 @@ mod tests {
kafka_topic: error-tasks
kafka_deadletter_topic: error-tasks-dlq
kafka_auto_offset_reset: earliest
database_adapter: postgres
db_path: ./taskbroker-error.sqlite
db_max_size: 3000000000
max_pending_count: 512
Expand Down Expand Up @@ -464,6 +493,7 @@ mod tests {
assert_eq!(config.kafka_session_timeout_ms, 6000.to_owned());
assert_eq!(config.kafka_topic, "error-tasks".to_owned());
assert_eq!(config.kafka_deadletter_topic, "error-tasks-dlq".to_owned());
assert_eq!(config.database_adapter, DatabaseAdapter::Postgres);
assert_eq!(config.db_path, "./taskbroker-error.sqlite".to_owned());
assert_eq!(config.max_pending_count, 512);
assert_eq!(config.max_processing_count, 512);
Expand All @@ -480,11 +510,13 @@ mod tests {
fn test_from_args_env_and_args() {
Jail::expect_with(|jail| {
jail.set_env("TASKBROKER_LOG_FILTER", "error");
jail.set_env("TASKBROKER_DATABASE_ADAPTER", "postgres");
jail.set_env("TASKBROKER_MAX_PROCESSING_ATTEMPTS", "5");

let args = Args { config: None };
let config = Config::from_args(&args).unwrap();
assert_eq!(config.log_filter, "error");
assert_eq!(config.database_adapter, DatabaseAdapter::Postgres);
assert_eq!(config.max_processing_attempts, 5);

Ok(())
Expand Down
Loading
Loading