Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 79 additions & 28 deletions snuba/web/rpc/v1/endpoint_export_trace_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@
class ExportTraceItemsPageToken:
def __init__(
self,
last_seen_project_id: int,
last_seen_item_type: TraceItemType.ValueType,
last_seen_timestamp: float,
last_seen_trace_id: str,
last_seen_item_id: str,
):
self.last_seen_project_id = last_seen_project_id
self.last_seen_item_type = last_seen_item_type
self.last_seen_timestamp = last_seen_timestamp
self.last_seen_trace_id = last_seen_trace_id
Expand All @@ -67,38 +69,45 @@ def from_protobuf(cls, page_token: PageToken) -> Optional["ExportTraceItemsPageT
if page_token == PageToken():
return None
filters = page_token.filter_offset.and_filter.filters
if len(filters) != 4:
if len(filters) != 5:
raise ValueError("Invalid page token")

if not (
filters[0].comparison_filter.key.name == "last_seen_item_type"
filters[0].comparison_filter.key.name == "last_seen_project_id"
and filters[0].comparison_filter.key.type == AttributeKey.Type.TYPE_INT
):
raise ValueError("Invalid project id")
last_seen_project_id = filters[0].comparison_filter.value.val_int
if not (
filters[1].comparison_filter.key.name == "last_seen_item_type"
and filters[1].comparison_filter.key.type == AttributeKey.Type.TYPE_INT
):
raise ValueError("Invalid item type")
last_seen_item_type = filters[0].comparison_filter.value.val_int
last_seen_item_type = filters[1].comparison_filter.value.val_int

if not (
filters[1].comparison_filter.key.name == "last_seen_timestamp"
and filters[1].comparison_filter.key.type == AttributeKey.Type.TYPE_DOUBLE
filters[2].comparison_filter.key.name == "last_seen_timestamp"
and filters[2].comparison_filter.key.type == AttributeKey.Type.TYPE_DOUBLE
):
raise ValueError("Invalid page token")
last_seen_timestamp = filters[1].comparison_filter.value.val_double
last_seen_timestamp = filters[2].comparison_filter.value.val_double

if not (
filters[2].comparison_filter.key.name == "last_seen_trace_id"
and filters[2].comparison_filter.key.type == AttributeKey.Type.TYPE_STRING
filters[3].comparison_filter.key.name == "last_seen_trace_id"
and filters[3].comparison_filter.key.type == AttributeKey.Type.TYPE_STRING
):
raise ValueError("Invalid trace id")
last_seen_trace_id = filters[2].comparison_filter.value.val_str
last_seen_trace_id = filters[3].comparison_filter.value.val_str

if not (
filters[3].comparison_filter.key.name == "last_seen_item_id"
and filters[3].comparison_filter.key.type == AttributeKey.Type.TYPE_STRING
filters[4].comparison_filter.key.name == "last_seen_item_id"
and filters[4].comparison_filter.key.type == AttributeKey.Type.TYPE_STRING
):
raise ValueError("Invalid page token")
last_seen_item_id = filters[3].comparison_filter.value.val_str
last_seen_item_id = filters[4].comparison_filter.value.val_str

return cls(
last_seen_project_id,
cast(TraceItemType.ValueType, last_seen_item_type),
last_seen_timestamp,
last_seen_trace_id,
Expand All @@ -109,6 +118,15 @@ def to_protobuf(self) -> PageToken:
filters = TraceItemFilter(
and_filter=AndFilter(
filters=[
TraceItemFilter(
comparison_filter=ComparisonFilter(
key=AttributeKey(
name="last_seen_project_id", type=AttributeKey.Type.TYPE_INT
),
op=ComparisonFilter.OP_EQUALS,
value=AttributeValue(val_int=self.last_seen_project_id),
)
),
TraceItemFilter(
comparison_filter=ComparisonFilter(
key=AttributeKey(
Expand Down Expand Up @@ -210,29 +228,57 @@ def _build_query(
page_token_filter = (
[
or_cond(
# (item_type > page_token.last_seen_item_type)
f.greater(column("item_type"), literal(page_token.last_seen_item_type)),
# (project_id > page_token.last_seen_project_id)
f.greater(column("project_id"), literal(page_token.last_seen_project_id)),
or_cond(
# (item_type = page_token.last_seen_item_type AND timestamp > page_token.last_seen_timestamp)
# (project_id = page_token.last_seen_project_id AND item_type > page_token.last_seen_item_type)
and_cond(
f.equals(column("item_type"), literal(page_token.last_seen_item_type)),
f.greater(column("timestamp"), literal(page_token.last_seen_timestamp)),
f.equals(column("project_id"), literal(page_token.last_seen_project_id)),
f.greater(column("item_type"), literal(page_token.last_seen_item_type)),
),
or_cond(
# (item_type = page_token.last_seen_item_type AND timestamp = page_token.last_seen_timestamp AND trace_id > page_token.last_seen_trace_id)
# (project_id = page_token.last_seen_project_id AND item_type = page_token.last_seen_item_type AND timestamp > page_token.last_seen_timestamp)
and_cond(
f.equals(
column("project_id"), literal(page_token.last_seen_project_id)
),
f.equals(column("item_type"), literal(page_token.last_seen_item_type)),
f.equals(column("timestamp"), literal(page_token.last_seen_timestamp)),
f.greater(column("trace_id"), literal(page_token.last_seen_trace_id)),
f.greater(column("timestamp"), literal(page_token.last_seen_timestamp)),
),
# (item_type = page_token.last_seen_item_type AND timestamp = page_token.last_seen_timestamp AND trace_id = page_token.last_seen_trace_id AND item_id > page_token.last_seen_item_id)
and_cond(
f.equals(column("item_type"), literal(page_token.last_seen_item_type)),
f.equals(column("timestamp"), literal(page_token.last_seen_timestamp)),
f.equals(column("trace_id"), literal(page_token.last_seen_trace_id)),
f.greater(
f.reinterpretAsUInt128(f.reverse(f.unhex(column("item_id")))),
literal(page_token.last_seen_item_id),
or_cond(
# (project_id = page_token.last_seen_project_id AND item_type = page_token.last_seen_item_type AND timestamp = page_token.last_seen_timestamp AND trace_id > page_token.last_seen_trace_id)
and_cond(
f.equals(
column("project_id"), literal(page_token.last_seen_project_id)
),
f.equals(
column("item_type"), literal(page_token.last_seen_item_type)
),
f.equals(
column("timestamp"), literal(page_token.last_seen_timestamp)
),
f.greater(
column("trace_id"), literal(page_token.last_seen_trace_id)
),
),
# (project_id = page_token.last_seen_project_id AND item_type = page_token.last_seen_item_type AND timestamp = page_token.last_seen_timestamp AND trace_id = page_token.last_seen_trace_id AND item_id > page_token.last_seen_item_id)
and_cond(
f.equals(
column("project_id"), literal(page_token.last_seen_project_id)
),
f.equals(
column("item_type"), literal(page_token.last_seen_item_type)
),
f.equals(
column("timestamp"), literal(page_token.last_seen_timestamp)
),
f.equals(
column("trace_id"), literal(page_token.last_seen_trace_id)
),
f.greater(
f.reinterpretAsUInt128(f.reverse(f.unhex(column("item_id")))),
literal(page_token.last_seen_item_id),
),
Comment on lines +248 to +281
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure I understand why you need to OR together these and conditions for pagination. can you explain please?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conceptually, the next row will satisfy one of these "first-difference" positions that are OR'd together (aka greater than the token). This is because the next row will either be:

  • the start of the next project ID
  • the start of the next item type within the same project ID
  • the start of the next timestamp within the same project and item type
  • the start of the next trace within the same project, item, and timestamp
  • the start of the next item id within the same project, item, timestamp, and trace
    thanks to the sort key of the table

),
),
),
Expand Down Expand Up @@ -308,6 +354,7 @@ def _to_any_value(value: Any) -> AnyValue:
"ProcessedResults",
[
("items", list[TraceItem]),
("last_seen_project_id", int),
("last_seen_item_type", TraceItemType.ValueType),
("last_seen_timestamp", float),
("last_seen_trace_id", str),
Expand All @@ -318,6 +365,7 @@ def _to_any_value(value: Any) -> AnyValue:

def _convert_rows(rows: Iterable[Dict[str, Any]]) -> ProcessedResults:
items: list[TraceItem] = []
last_seen_project_id = 0
last_seen_item_type = TraceItemType.TRACE_ITEM_TYPE_UNSPECIFIED
last_seen_timestamp = 0.0
last_seen_trace_id = ""
Expand Down Expand Up @@ -377,13 +425,15 @@ def _convert_rows(rows: Iterable[Dict[str, Any]]) -> ProcessedResults:
)
items.append(item)

last_seen_project_id = int(proj_id)
last_seen_item_type = item_type
last_seen_timestamp = float(ts)
last_seen_trace_id = trace_id
last_seen_item_id = item_id

return ProcessedResults(
items=items,
last_seen_project_id=last_seen_project_id,
last_seen_item_type=last_seen_item_type,
last_seen_timestamp=last_seen_timestamp,
last_seen_trace_id=last_seen_trace_id,
Expand Down Expand Up @@ -426,6 +476,7 @@ def _execute(self, in_msg: ExportTraceItemsRequest) -> ExportTraceItemsResponse:
next_token: PageToken | None = None
if len(processed_results.items) >= limit:
next_token = ExportTraceItemsPageToken(
last_seen_project_id=processed_results.last_seen_project_id,
last_seen_item_type=processed_results.last_seen_item_type,
last_seen_trace_id=processed_results.last_seen_trace_id,
last_seen_timestamp=processed_results.last_seen_timestamp,
Expand Down
2 changes: 2 additions & 0 deletions tests/web/rpc/v1/test_endpoint_export_trace_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
start_timestamp=BASE_TIME + timedelta(seconds=i),
trace_id=_SPANS_TRACE_IDS[i],
item_id=_SPANS_ITEM_IDS[i],
project_id=i % 3 + 1,
)
for i in range(_SPAN_COUNT) # 2 minutes
]
Expand All @@ -45,6 +46,7 @@
trace_id=_LOGS_TRACE_IDS[i],
type=TraceItemType.TRACE_ITEM_TYPE_LOG,
item_id=_LOGS_ITEM_IDS[i],
project_id=i % 3 + 1,
)
for i in range(_LOG_COUNT)
]
Expand Down
2 changes: 2 additions & 0 deletions tests/web/rpc/v1/test_endpoint_get_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
),
"sentry.is_segment": AnyValue(bool_value=i == 0),
},
project_id=i % 3 + 1,
)
for i in range(_SPAN_COUNT)
]
Expand All @@ -79,6 +80,7 @@
byteorder="little",
signed=False,
),
project_id=i % 3 + 1,
)
for i in range(10)
]
Expand Down
Loading