webrtc: Support FIN/FIN_ACK handshake for substream shutdown #513
Conversation
src/transport/webrtc/util.rs (outdated)
```diff
- // Flags.
- pub flags: Option<i32>,
+ /// Flag.
+ pub flag: Option<i32>,
```
nit: Maybe we could enforce Option<Flag> here?
Updated in c7f4ea8
Pull request overview
This PR implements a proper FIN/FIN_ACK handshake for WebRTC substream shutdown according to the libp2p specification. The implementation adds graceful shutdown with acknowledgment guarantees and timeout protection to prevent indefinite waiting.
Changes:
- Added a FIN_ACK flag to the protobuf schema and implemented the proper handshake with a 5-second timeout (see the flag sketch after this list)
- Unified message encoding API and renamed "flags" to "flag" throughout codebase for consistency
- Introduced state machine with Closing, FinSent, and FinAcked states to manage shutdown lifecycle
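For context, here is a sketch of the wire-level flags involved, written roughly as the prost-generated Rust enum would look (the variant values follow the libp2p WebRTC message schema; the exact generated code may differ):

```rust
/// Flags carried in the WebRTC message-framing protobuf.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Flag {
    /// Sender will send no more data on this stream.
    Fin = 0,
    /// Sender is no longer interested in receiving data.
    StopSending = 1,
    /// Sender abruptly terminates the stream.
    ResetStream = 2,
    /// Acknowledges a received FIN (the flag added by this PR).
    FinAck = 3,
}
```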
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Summary per file:
| File | Description |
|---|---|
| src/schema/webrtc.proto | Added FIN_ACK flag definition to the protobuf message schema |
| src/transport/webrtc/util.rs | Unified encode functions into single method with optional flag parameter, renamed plural "flags" to singular "flag" |
| src/transport/webrtc/substream.rs | Implemented FIN/FIN_ACK handshake state machine with timeout mechanism, updated event handling, added comprehensive tests |
| src/transport/webrtc/connection.rs | Updated to handle new Message event structure with optional flag parameter |
| src/transport/webrtc/opening.rs | Updated encode function calls to pass None for flag parameter |
src/transport/webrtc/substream.rs (outdated)
```rust
rx: Receiver<Event>,

/// Waker to notify when shutdown completes (FIN_ACK received).
shutdown_waker: Arc<Mutex<Option<Waker>>>,
```
Could an AtomicWaker be used instead?
Updated in 42de0ab.
```rust
Closing,

/// We sent FIN, waiting for FIN_ACK.
FinSent,
```
How do we handle receiving a FIN packet while already in the FinSent state? In other words: due to network delays, the remote has decided to send its own FIN, and we receive it immediately after sending our packet.
This is expected behaviour: we wait for the remote to send FIN_ACK before closing our write side of the channel, and when we receive a FIN from the remote we send a FIN_ACK back and close the read side. In practice we just don't support a "half-closed" read/write substream, since calling shutdown on the substream closes both sides. So if we have sent the FIN flag to the remote, shutdown has already been called on the substream.
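To make the simultaneous-close case concrete, here is a minimal sketch of that bookkeeping (a trimmed-down model with hypothetical names, not the PR's actual code):

```rust
/// Hypothetical, trimmed-down model of the substream's close bookkeeping.
struct CloseState {
    /// Set once we have processed a FIN from the remote.
    read_closed: bool,
    /// Set once we have sent our own FIN (i.e. shutdown was called).
    fin_sent: bool,
}

impl CloseState {
    /// Called when a FIN arrives from the remote, in any local state,
    /// including FinSent. Returns true if a FIN_ACK should be sent back.
    fn on_remote_fin(&mut self) -> bool {
        // Close only the read half; our own outstanding FIN (if any) is
        // resolved independently by the remote's FIN_ACK or the timeout.
        let duplicate = self.read_closed;
        self.read_closed = true;
        !duplicate
    }
}
```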
src/transport/webrtc/substream.rs (outdated)
```rust
// Spawn timeout task to wake us after FIN_ACK_TIMEOUT
let waker = cx.waker().clone();
tokio::spawn(async move {
```
Could the spawn be avoided here?
Would it be possible for poll_shutdown to be called with different waker instances?
- in that case we'll spawn a timeout which, after 5s, wakes the wrong waker
- the poll contract states we should preserve the latest waker received

Maybe change this to an optional pinned tokio::time::Sleep? Then we'll re-enter the same poll function and wake the appropriate waker.
nit: maybe use pin_project to avoid the boxing? Or a similar strategy to how PollSender manages memory with memcpy under the hood.
Given that I've changed it to use AtomicWaker in 42de0ab, which always holds the latest waker, this should now work even if poll_shutdown is called multiple times. In practice, though, it's unlikely that poll_shutdown would be called with different wakers, since the Substream is not polled from different contexts.
I've avoided the spawn by using an Option<Pin<Box<tokio::time::Sleep>>> in 2ec58e4, but I was unable to use pin_project because it would make Substream !Unpin, which is required in the rest of the codebase. The tradeoff is a small heap allocation for the timeout when shutting down.
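For illustration, a minimal sketch of the pinned-sleep pattern described above (the field layout and the FIN_ACK_TIMEOUT constant are assumptions, not the PR's exact code):

```rust
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

const FIN_ACK_TIMEOUT: Duration = Duration::from_secs(5);

struct ShutdownTimer {
    /// Created lazily on the first poll; boxing keeps the outer type Unpin.
    timeout: Option<Pin<Box<tokio::time::Sleep>>>,
}

impl ShutdownTimer {
    fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        let sleep = self
            .timeout
            .get_or_insert_with(|| Box::pin(tokio::time::sleep(FIN_ACK_TIMEOUT)));
        // Polling the Sleep registers the *current* waker, so repeated
        // poll_shutdown calls with different wakers stay correct.
        sleep.as_mut().poll(cx)
    }
}
```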
src/transport/webrtc/util.rs (outdated)
```diff
  let protobuf_payload = schema::webrtc::Message {
      message: (!payload.is_empty()).then_some(payload),
-     flag: Some(flags),
+     flag,
```
The code below is a bit unoptimized:
- one allocation for Vec::with_capacity (protobuf encoding)
- one allocation for the BytesMut holding the varint prefix

Could we write something like:

```rust
let proto_len = protobuf_payload.encoded_len();
// Upper bound on the varint length prefix.
let varint_len = unsigned_varint::encode::usize_buffer().len();

// Single allocation for prefix + protobuf payload.
let mut out_buf = Vec::with_capacity(varint_len + proto_len);

let mut varint_buf = unsigned_varint::encode::usize_buffer();
let varint_bytes = unsigned_varint::encode::usize(proto_len, &mut varint_buf);
out_buf.extend_from_slice(varint_bytes);

// Encode the protobuf directly into the final buffer.
protobuf_payload
    .encode(&mut out_buf)
    .expect("Vec has sufficient capacity");
```
I made the changes in a4effe2, and optimized the decode function as well.
src/transport/webrtc/util.rs (outdated)
```diff
  Ok(message) => Ok(Self {
      payload: message.message,
-     flags: message.flag,
+     flag: message.flag,
```
nit: BytesMut::from(payload) will copy the entire payload; instead could we:
- let (len, remaining) = unsigned_varint::decode::usize(payload)
- then decode the message via let message = schema::webrtc::Message::decode(&remaining[..len])?
I implemented the suggestion in a4effe2.
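For reference, a minimal sketch of the zero-copy decode path suggested above (the error mapping is illustrative, not the PR's actual error handling):

```rust
use prost::Message as _;

fn decode_message(payload: &[u8]) -> Result<schema::webrtc::Message, std::io::Error> {
    // Parse the varint length prefix without copying the payload.
    let (len, remaining) = unsigned_varint::decode::usize(payload)
        .map_err(|_| std::io::ErrorKind::InvalidData)?;
    // NOTE: a real implementation should verify `len <= remaining.len()`.
    // Decode the protobuf directly from the borrowed slice.
    schema::webrtc::Message::decode(&remaining[..len])
        .map_err(|_| std::io::ErrorKind::InvalidData.into())
}
```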
src/transport/webrtc/substream.rs (outdated)
```rust
}

if flag == Flag::StopSending as i32 {
    *self.state.lock() = State::SendClosed;
```
Could we receive the STOP_SENDING flag in the following race case?
- T0: poll_write is called in the Open state
- T1: poll_reserve(cx) returns Pending -- a waker is registered for channel capacity
- T2: the remote sends us the STOP_SENDING flag

Because we are not waking any waker here, poll_write will block indefinitely until we get channel capacity.
Good catch! This should be fixed now in 73f76ec. I introduced a write_waker to Substream and SubstreamHandle to wake a pending poll_write in the event that a STOP_SENDING flag is received.
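A minimal sketch of that fix, assuming shared state with an AtomicWaker named write_waker (names are illustrative, not the PR's exact code):

```rust
use futures::task::AtomicWaker;
use std::sync::atomic::{AtomicBool, Ordering};

struct Shared {
    send_closed: AtomicBool,
    /// Registered by poll_write whenever it returns Pending.
    write_waker: AtomicWaker,
}

impl Shared {
    /// Called from message handling when STOP_SENDING is received.
    fn on_stop_sending(&self) {
        self.send_closed.store(true, Ordering::SeqCst);
        // Wake a parked poll_write so it re-checks the state and can fail
        // fast instead of waiting on channel capacity forever.
        self.write_waker.wake();
    }
}
```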
```rust
SendClosed,

/// Shutdown initiated, flushing pending data before sending FIN.
Closing,
```
Should we couple each State transition with an associated waker?
In other words, who is going to wake up the counterpart poll_shutdown or poll_write if we transition the state from on_message or from the individual poll functions?
This should be addressed now in 73f76ec.
src/transport/webrtc/substream.rs (outdated)
```diff
  }

- if flags & 2 == Flag::ResetStream as i32 {
+ if flag == Flag::ResetStream as i32 {
```
Similar to the STOP_SENDING comment above, we are not waking other poll_ functions to announce the state transition. Shouldn't we do that to avoid blocking / delays in processing?
As of 73f76ec, we wake the write_waker when we receive this flag.
lexnv left a comment:
Generally looks good to me! Left a few comments to close the gap on missing wakers in the poll implementations, plus some suggestions on lock-free state, encoding/decoding inefficiencies, and waking up on state transitions 🙏
```rust
/// Remote is no longer interested in receiving anything.
SendClosed,
```
We could also replace the state with an AtomicU8 and have the implementation look like:

```rust
struct SharedState {
    // Lock-free / more efficient.
    // Bits 0 to 3 hold our local state:
    //   0: Open, 1: SendClosed, 2: Closing, 3: FinSent, 4: FinAcked
    // Bit 4: remote recv closed (FIN received)
    // Bit 5: remote send closed (STOP_SENDING received)
    state: AtomicU8,
    // Waker for tasks waiting on state changes (e.g. poll_write blocked on
    // state while poll_shutdown has the FIN_ACK) -- triggered via on_message
    // for STOP_SENDING, FIN_ACK, RESET_STREAM etc.
    state_waker: AtomicWaker,
}
```
Given that state transitions are relatively rare in this protocol, the performance gain would not be that pronounced, and I find the enum easier to understand semantically. It's more of a nice-to-have optimization than a must-have.
As for the single state_waker AtomicWaker: since the Substream must be owned and polled by exactly one task, the wakers could be merged, but keeping them separate makes it clearer which waker we are registering. I'd rather not introduce the burden of figuring out whether we should register over an already-registered AtomicWaker based on the state transition.
Does that make sense @lexnv?
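For completeness, a hedged sketch of what the suggested lock-free layout could look like (bit assignments follow the comment above; this is not an actual implementation):

```rust
use std::sync::atomic::{AtomicU8, Ordering};

/// Bits 0..=3 hold the local state value (Open = 0 .. FinAcked = 4).
const STATE_MASK: u8 = 0b0000_1111;
/// Bit 4: remote receive side closed (we received a FIN).
const REMOTE_RECV_CLOSED: u8 = 1 << 4;
/// Bit 5: remote send side closed (we received STOP_SENDING).
const REMOTE_SEND_CLOSED: u8 = 1 << 5;

/// Swap in a new local state while preserving the remote flag bits.
fn set_local_state(state: &AtomicU8, new: u8) {
    let mut current = state.load(Ordering::Acquire);
    loop {
        let next = (current & !STATE_MASK) | (new & STATE_MASK);
        match state.compare_exchange_weak(current, next, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => break,
            Err(actual) => current = actual,
        }
    }
}
```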
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
Force-pushed from 2d78d7e to 2440797.
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
```rust
if self.read_closed.swap(true, std::sync::atomic::Ordering::SeqCst) {
    // Already processed FIN, ignore duplicate
    tracing::debug!(
        target: "litep2p::webrtc::substream",
        "received duplicate FIN, ignoring"
    );
    return Ok(());
}

// Received FIN from remote, close our read half
self.inbound_tx.send(Event::RecvClosed).await?;

// Send FIN_ACK back to remote using try_send to avoid blocking.
// If the channel is full, the remote will timeout waiting for FIN_ACK
// and handle it gracefully. This prevents deadlock if the outbound
// channel is blocked due to backpressure.
if let Err(e) = self.outbound_tx.try_send(Event::Message {
    payload: vec![],
    flag: Some(Flag::FinAck),
}) {
    tracing::warn!(
        target: "litep2p::webrtc::substream",
        ?e,
        "failed to send FIN_ACK, remote will timeout"
    );
}
return Ok(());
```
Copilot (AI) commented on Jan 23, 2026:
The duplicate FIN handling only prevents sending multiple RecvClosed events via the read_closed atomic flag check, but it still attempts to send FIN_ACK even for duplicate FINs. While the try_send might fail if the channel is closed, it would be cleaner to return early after detecting a duplicate FIN (at line 199) to avoid attempting to send FIN_ACK unnecessarily.
Additionally, if a FIN is received when we're already in the FinAcked state (we've completed our own shutdown), sending another FIN_ACK might be redundant. However, this is a minor issue as the remote should not be sending FIN after receiving our FIN_ACK, and if they do, sending another FIN_ACK is harmless.
The code already returns early after detecting a duplicate FIN. The swap returns the previous value. If read_closed was already true, we return immediately at line 199 without sending FIN_ACK.
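To spell out the swap semantics relied on here, a self-contained illustration (not project code):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

fn main() {
    let read_closed = AtomicBool::new(false);
    // First FIN: swap stores `true` and returns the previous value `false`,
    // so the duplicate guard does not trigger.
    assert!(!read_closed.swap(true, Ordering::SeqCst));
    // Duplicate FIN: the previous value is now `true`, so we return early.
    assert!(read_closed.swap(true, Ordering::SeqCst));
}
```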
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Copilot reviewed 5 out of 5 changed files in this pull request and generated no new comments.
```rust
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
    // State machine for proper shutdown:
```
In poll_flush we return immediately; IIUC we should send the FIN packet only after all pending data is delivered. Do we handle this gracefully, or can we deliver the FIN before buffered data reaches the remote?
Yes: with the poll_reserve call on line 446, we won't send the FIN via self.tx unless the channel has space. After the FIN flag is sent, we won't drop the connection until a FIN_ACK is received or the timeout expires.
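A minimal sketch of the capacity-gated FIN send described above (assuming a tokio_util::sync::PollSender<Event> named tx, with the PR's Event and Flag types; not the exact code):

```rust
use std::task::{Context, Poll};
use tokio_util::sync::PollSender;

fn poll_send_fin(
    tx: &mut PollSender<Event>,
    cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
    // Reserve a channel slot first, so the FIN is only queued after all
    // previously accepted data and never when the channel is full.
    match tx.poll_reserve(cx) {
        Poll::Ready(Ok(())) => {
            tx.send_item(Event::Message { payload: vec![], flag: Some(Flag::Fin) })
                .map_err(|_| std::io::Error::from(std::io::ErrorKind::BrokenPipe))?;
            Poll::Ready(Ok(()))
        }
        Poll::Ready(Err(_)) => Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())),
        Poll::Pending => Poll::Pending,
    }
}
```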
src/transport/webrtc/substream.rs (outdated)
```rust
/// Shorter timeout for tests.
#[cfg(test)]
const FIN_ACK_TIMEOUT: Duration = Duration::from_secs(2);
```
We can remove this and rely on the 5s production value.
Updated in 184971c.
lexnv left a comment:
LGTM!
Let's create an issue for implementing the lock-free atomic state, thanks
Created an issue to track this: #523
Force-pushed from cecf23b to 184971c.
Summary
Implements a proper FIN/FIN_ACK handshake for WebRTC substream shutdown according to the libp2p WebRTC-direct specification, with timeout handling to prevent indefinite waiting.
Closes #476
Changes
1. Proper FIN/FIN_ACK Handshake
- poll_shutdown now waits for FIN_ACK acknowledgment before completing
2. Timeout Protection
- A 5-second timeout bounds the wait for FIN_ACK so shutdown cannot hang indefinitely
3. Shutdown Requirements Fulfilled
- Pending data is flushed before FIN is sent (Closing state)
4. Code Refactoring
- Updated the Message event variant to include an optional flag parameter (instead of a separate MessageWithFlags variant)
Implementation Details
The shutdown process now follows this state machine:
- Open → Closing (blocks new writes)
- Closing → FinSent (FIN sent once pending data is flushed)
- FinSent → FinAcked (FIN_ACK received)
The timeout task is spawned immediately when entering the FinSent state and wakes the shutdown future after 5 seconds if no FIN_ACK is received.
Testing
Compatibility