Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
5a64e7e
webrtc: Implement proper FIN/FIN_ACK handshake and fix flag handling bug
timwu20 Jan 8, 2026
057eac8
webrtc: Unify event variants and encoding to use optional flag parameter
timwu20 Jan 9, 2026
0027ec7
webrtc: Implement proper shutdown with FIN/FIN_ACK handshake and wait…
timwu20 Jan 9, 2026
6de9d33
webrtc: Add timeout for FIN_ACK to prevent indefinite waiting
timwu20 Jan 9, 2026
4c5917d
webrtc: Fix clippy and fmt
timwu20 Jan 12, 2026
4695143
webrtc: Fix FIN_ACK response to be sent outbound to network instead o…
timwu20 Jan 12, 2026
8578487
webrtc: Fix data channel closure by detecting substream drop regardle…
timwu20 Jan 13, 2026
c7f4ea8
webrtc: Use protobuf-generated Flag enum instead of i32
timwu20 Jan 22, 2026
833da9b
Update src/transport/webrtc/substream.rs
timwu20 Jan 22, 2026
35b99f4
Update src/transport/webrtc/substream.rs
timwu20 Jan 22, 2026
42de0ab
webrtc: Use AtomicWaker instead of Mutex<Option<Waker>> for shutdown_…
timwu20 Jan 22, 2026
2ec58e4
webrtc: Use pinned Sleep instead of tokio::spawn for FIN_ACK timeout
timwu20 Jan 22, 2026
a4effe2
webrtc: Optimize WebRtcMessage encode/decode to reduce allocations
timwu20 Jan 22, 2026
73f76ec
webrtc: Wake blocked writers and close both sides on RESET_STREAM
timwu20 Jan 22, 2026
7aa6084
webrtc: Fix race condition in shutdown waker registration
timwu20 Jan 22, 2026
e3a436e
webrtc: Log warning for unknown flag values in message decoding
timwu20 Jan 22, 2026
0a24d6b
webrtc: Fix typo in WebRtcMessage doc comment
timwu20 Jan 22, 2026
1cc104a
webrtc: Address review feedback for FIN/FIN_ACK handshake
timwu20 Jan 23, 2026
2440797
webrtc: Additional review feedback fixes
timwu20 Jan 23, 2026
a36a04e
webrtc: Fix varint length calculation to use ilog2
timwu20 Jan 23, 2026
d537573
webrtc: Add test for FIN with payload and clarify race condition comm…
timwu20 Jan 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/schema/webrtc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ message Message {
// The sender abruptly terminates the sending part of the stream. The
// receiver MAY discard any data that it already received on that stream.
RESET_STREAM = 2;
// Sending the FIN_ACK flag acknowledges the previous receipt of a message
// with the FIN flag set. Receiving a FIN_ACK flag gives the recipient
// confidence that the remote has received all sent messages.
FIN_ACK = 3;
}

optional Flag flag = 1;
Expand Down
29 changes: 20 additions & 9 deletions src/transport/webrtc/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::{
substream::Substream,
transport::{
webrtc::{
schema::webrtc::message::Flag,
substream::{Event as SubstreamEvent, Substream as WebRtcSubstream, SubstreamHandle},
util::WebRtcMessage,
},
Expand Down Expand Up @@ -263,7 +264,7 @@ impl WebRtcConnection {
let fallback_names = std::mem::take(&mut context.fallback_names);
let (dialer_state, message) =
WebRtcDialerState::propose(context.protocol.clone(), fallback_names)?;
let message = WebRtcMessage::encode(message);
let message = WebRtcMessage::encode(message, None);

self.rtc
.channel(channel_id)
Expand Down Expand Up @@ -330,7 +331,10 @@ impl WebRtcConnection {
self.rtc
.channel(channel_id)
.ok_or(Error::ChannelDoesntExist)?
.write(true, WebRtcMessage::encode(response.to_vec()).as_ref())
.write(
true,
WebRtcMessage::encode(response.to_vec(), None).as_ref(),
)
.map_err(Error::WebRtc)?;

let protocol = negotiated.ok_or(Error::SubstreamDoesntExist)?;
Expand Down Expand Up @@ -452,7 +456,7 @@ impl WebRtcConnection {
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
flags = message.flags,
flag = ?message.flag,
data_len = message.payload.as_ref().map_or(0usize, |payload| payload.len()),
"handle inbound message",
);
Expand Down Expand Up @@ -598,20 +602,26 @@ impl WebRtcConnection {
Ok(())
}

/// Handle outbound data.
fn on_outbound_data(&mut self, channel_id: ChannelId, data: Vec<u8>) -> crate::Result<()> {
/// Handle outbound data with optional flag.
fn on_outbound_data(
&mut self,
channel_id: ChannelId,
data: Vec<u8>,
flag: Option<Flag>,
) -> crate::Result<()> {
tracing::trace!(
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
data_len = ?data.len(),
?flag,
"send data",
);

self.rtc
.channel(channel_id)
.ok_or(Error::ChannelDoesntExist)?
.write(true, WebRtcMessage::encode(data).as_ref())
.write(true, WebRtcMessage::encode(data, flag).as_ref())
.map_err(Error::WebRtc)
.map(|_| ())
}
Expand Down Expand Up @@ -788,7 +798,7 @@ impl WebRtcConnection {
},
event = self.handles.next() => match event {
None => unreachable!(),
Some((channel_id, None | Some(SubstreamEvent::Close))) => {
Some((channel_id, None)) => {
tracing::trace!(
target: LOG_TARGET,
peer = ?self.peer,
Expand All @@ -800,11 +810,12 @@ impl WebRtcConnection {
self.channels.insert(channel_id, ChannelState::Closing);
self.handles.remove(&channel_id);
}
Some((channel_id, Some(SubstreamEvent::Message(data)))) => {
if let Err(error) = self.on_outbound_data(channel_id, data) {
Some((channel_id, Some(SubstreamEvent::Message { payload, flag }))) => {
if let Err(error) = self.on_outbound_data(channel_id, payload, flag) {
tracing::debug!(
target: LOG_TARGET,
?channel_id,
?flag,
?error,
"failed to send data to remote peer",
);
Expand Down
4 changes: 2 additions & 2 deletions src/transport/webrtc/opening.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl OpeningWebRtcConnection {
};

// create first noise handshake and send it to remote peer
let payload = WebRtcMessage::encode(context.first_message(Role::Dialer)?);
let payload = WebRtcMessage::encode(context.first_message(Role::Dialer)?, None);

self.rtc
.channel(self.noise_channel_id)
Expand Down Expand Up @@ -300,7 +300,7 @@ impl OpeningWebRtcConnection {
};

// create second noise handshake message and send it to remote
let payload = WebRtcMessage::encode(context.second_message()?);
let payload = WebRtcMessage::encode(context.second_message()?, None);

let mut channel =
self.rtc.channel(self.noise_channel_id).ok_or(Error::ChannelDoesntExist)?;
Expand Down
Loading
Loading