
Conversation

@timwu20 (Contributor) commented Jan 14, 2026

Summary

Implements a proper FIN/FIN_ACK handshake for WebRTC substream shutdown according to the libp2p WebRTC-direct specification, with timeout handling to prevent indefinite waiting.

Closes #476

Changes

1. Proper FIN/FIN_ACK Handshake

  • poll_shutdown now waits for FIN_ACK acknowledgment before completing
  • Added waker mechanism to wake shutdown task when FIN_ACK is received
  • Ensures graceful shutdown with delivery guarantees

2. Timeout Protection

  • Added 5-second timeout for FIN_ACK (matching go-libp2p's behavior)
  • Prevents indefinite waiting if remote never sends FIN_ACK
  • Automatically completes shutdown after timeout to avoid resource leaks

3. Shutdown Requirements Fulfilled

  • ✅ Stops accepting new writes during shutdown (transitions through Closing state)
  • ✅ Flushes pending data before sending FIN
  • ✅ Sends FIN flag with empty payload
  • ✅ Waits for FIN_ACK acknowledgment from remote
  • ✅ Triggers data channel closure and resource cleanup

4. Code Refactoring

  • Unified Message event variant to include optional flag parameter (instead of separate MessageWithFlags variant)
  • Unified encode functions into single function with optional flag parameter
  • Fixed a flag-handling bug: changed from bitwise operations to equality checks (see the sketch after this list)
  • Renamed plural "flags" to singular "flag" throughout codebase for consistency with protobuf schema
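As an aside, here is a minimal self-contained sketch of why the bitwise check was wrong. The numeric values assume the flag assignments from the libp2p WebRTC message schema (FIN = 0, STOP_SENDING = 1, RESET_STREAM = 2, FIN_ACK = 3); the enum below is illustrative, not the crate's generated type.

    #[allow(dead_code)]
    enum Flag {
        Fin = 0,
        StopSending = 1,
        ResetStream = 2,
        FinAck = 3,
    }

    fn main() {
        let flag = Flag::FinAck as i32; // 3

        // Buggy bitwise check: 3 & 2 == 2, which equals RESET_STREAM,
        // so a FIN_ACK would be misread as a stream reset.
        assert_eq!(flag & 2, Flag::ResetStream as i32);

        // Equality checks classify the flag correctly.
        assert_ne!(flag, Flag::ResetStream as i32);
        assert_eq!(flag, Flag::FinAck as i32);
    }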

Implementation Details

The shutdown process now follows this state machine:

  1. Open → Closing (blocks new writes)
  2. Flush pending data
  3. Send FIN flag
  4. Closing → FinSent
  5. Wait for FIN_ACK or timeout (5 seconds)
  6. FinSent → FinAcked
  7. Drop substream → channel closes → data channel cleanup

The timeout task is spawned immediately when entering the FinSent state and wakes the shutdown future after 5 seconds if no FIN_ACK is received.
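For orientation, a minimal runnable sketch of the shutdown state machine described above, using illustrative names rather than the exact types in src/transport/webrtc/substream.rs:

    use std::time::Duration;

    /// Hypothetical write-side states mirroring the shutdown flow above.
    #[derive(Debug)]
    enum State {
        /// Substream accepts writes.
        Open,
        /// Shutdown requested: new writes are rejected and pending data is flushed.
        Closing,
        /// FIN sent with an empty payload; waiting for FIN_ACK or the timeout.
        FinSent,
        /// FIN_ACK received (or the timeout elapsed); the substream can be dropped.
        FinAcked,
    }

    /// Deadline for the remote's FIN_ACK, matching go-libp2p's behavior.
    const FIN_ACK_TIMEOUT: Duration = Duration::from_secs(5);

    fn main() {
        let mut state = State::Open;
        println!("{state:?}: poll_shutdown called, new writes are rejected");
        state = State::Closing;
        println!("{state:?}: pending data flushed, sending FIN with an empty payload");
        state = State::FinSent;
        println!("{state:?}: waiting up to {FIN_ACK_TIMEOUT:?} for FIN_ACK");
        state = State::FinAcked;
        println!("{state:?}: dropping substream, data channel closes");
    }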

Testing

  • Added comprehensive tests for FIN/FIN_ACK handshake
  • Added test for timeout behavior when FIN_ACK is not received

Compatibility

// Flags.
pub flags: Option<i32>,
/// Flag.
pub flag: Option<i32>,

Collaborator:

nit: Maybe we could enforce here Option<Flag>?

Contributor Author (@timwu20):

Updated in c7f4ea8

Copilot AI left a comment:

Pull request overview

This PR implements proper FIN/FIN_ACK handshake for WebRTC substream shutdown according to the libp2p specification. The implementation adds graceful shutdown with acknowledgment guarantees and timeout protection to prevent indefinite waiting.

Changes:

  • Added FIN_ACK flag to protobuf schema and implemented proper handshake with 5-second timeout
  • Unified message encoding API and renamed "flags" to "flag" throughout codebase for consistency
  • Introduced state machine with Closing, FinSent, and FinAcked states to manage shutdown lifecycle

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Summary per file:

  • src/schema/webrtc.proto: Added FIN_ACK flag definition to the protobuf message schema
  • src/transport/webrtc/util.rs: Unified encode functions into a single method with an optional flag parameter; renamed plural "flags" to singular "flag"
  • src/transport/webrtc/substream.rs: Implemented the FIN/FIN_ACK handshake state machine with a timeout mechanism, updated event handling, added comprehensive tests
  • src/transport/webrtc/connection.rs: Updated to handle the new Message event structure with an optional flag parameter
  • src/transport/webrtc/opening.rs: Updated encode function calls to pass None for the flag parameter


rx: Receiver<Event>,

/// Waker to notify when shutdown completes (FIN_ACK received).
shutdown_waker: Arc<Mutex<Option<Waker>>>,

Collaborator:

Could an AtomicWaker be used instead?

Contributor Author (@timwu20):

Updated in 42de0ab.
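For reference, a minimal sketch (illustrative names, not the crate's exact code) of the AtomicWaker pattern from the futures crate that replaced the Arc<Mutex<Option<Waker>>>: poll_shutdown registers the current waker before checking the completion flag, and the message handler sets the flag and wakes.

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::task::{Context, Poll};

    use futures::task::AtomicWaker;

    /// Illustrative shared state between the poll side and the message handler.
    struct Shared {
        fin_ack_received: AtomicBool,
        shutdown_waker: AtomicWaker,
    }

    impl Shared {
        /// Called from poll_shutdown: register the latest waker *before* checking
        /// the flag so a concurrent FIN_ACK cannot be missed.
        fn poll_fin_ack(&self, cx: &mut Context<'_>) -> Poll<()> {
            self.shutdown_waker.register(cx.waker());
            if self.fin_ack_received.load(Ordering::SeqCst) {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        }

        /// Called by the message handler when a FIN_ACK arrives.
        fn on_fin_ack(&self) {
            self.fin_ack_received.store(true, Ordering::SeqCst);
            self.shutdown_waker.wake();
        }
    }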

Closing,

/// We sent FIN, waiting for FIN_ACK.
FinSent,

Collaborator:

How do we handle receiving a FIN packet while already in the FinSent state? In other words, due to network delays the remote has decided to send a FIN of its own and we receive it immediately after sending our own FIN.

Contributor Author (@timwu20) commented Jan 22, 2026:

This is expected behaviour, since we are waiting on the remote to send FIN_ACK before closing our write side of the channel. When we receive a FIN from the remote, we send FIN_ACK back and close the read side of the channel. In practice we don't support a "half-closed" transport with independent read and write sides: when we call shutdown on the substream, we are closing both sides. So if we have sent the FIN flag to the remote, shutdown was called on the substream.


// Spawn timeout task to wake us after FIN_ACK_TIMEOUT
let waker = cx.waker().clone();
tokio::spawn(async move {

Collaborator:

Could the spawn be avoided here?

Would it be possible for poll_shutdown to be called with different waker instances?

  • in that case we'll spawn a timeout which, after 5s, wakes the wrong waker
  • the poll contract states we should preserve the latest waker received

Maybe change this to an optional pinned tokio::time::sleep? Then we'll wake the same function and wake the appropriate waker.

nit: maybe use pin_project to avoid the boxing? Or a similar strategy to how PollSender manages memory with memcpy under the hood

Contributor Author (@timwu20) commented Jan 22, 2026:

Given that I've changed it to use AtomicWaker in 42de0ab, which always registers the latest waker, it should now work if poll_shutdown is called multiple times. However, it's quite unlikely that poll_shutdown would be called multiple times, since the Substream is not polled in different contexts.

I've avoided the spawn by using an Option<Pin<Box<tokio::time::Sleep>>> in 2ec58e4, but I was unable to use pin_project because it would make Substream !Unpin, and Unpin is required in the rest of the codebase. The tradeoff is a small heap allocation for the timeout when shutting down.
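For illustration, a minimal sketch of the lazily created, boxed Sleep approach; the struct and method names here are hypothetical, not the actual Substream fields:

    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::Duration;

    const FIN_ACK_TIMEOUT: Duration = Duration::from_secs(5);

    /// Holds the FIN_ACK deadline; boxing the Sleep keeps the owning type Unpin.
    struct FinAckDeadline {
        timeout: Option<Pin<Box<tokio::time::Sleep>>>,
    }

    impl FinAckDeadline {
        /// Polled from poll_shutdown while in the FinSent state. The Sleep is
        /// created on first use and, because it is polled with the current
        /// Context, it always re-registers the latest waker.
        fn poll_elapsed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
            let sleep = self
                .timeout
                .get_or_insert_with(|| Box::pin(tokio::time::sleep(FIN_ACK_TIMEOUT)));
            sleep.as_mut().poll(cx)
        }
    }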

let protobuf_payload = schema::webrtc::Message {
message: (!payload.is_empty()).then_some(payload),
flag: Some(flags),
flag,

Collaborator:

The code below is a bit unoptimized:

  • one allocation for Vec::with_capacity (protobuf encoding)
  • one allocation for a BytesMut for the varint prefix

Could we write something like:

    let proto_len = protobuf_payload.encoded_len();
    let varint_len = unsigned_varint::encode::usize_buffer().len();

    // single alloc
    let mut out_buf = Vec::with_capacity(varint_len + proto_len);

    let mut varint_buf = unsigned_varint::encode::usize_buffer();
    let varint_bytes = unsigned_varint::encode::usize(proto_len, &mut varint_buf);
    out_buf.extend_from_slice(varint_bytes);

    // Encode protobuf directly into the final buffer
    protobuf_payload.encode(&mut out_buf).expect(..sufficient)

?

Contributor Author (@timwu20):

I made the changes in a4effe2, and also optimized the decode function.

Ok(message) => Ok(Self {
payload: message.message,
flags: message.flag,
flag: message.flag,

Collaborator:

nit: BytesMut::from(payload) will copy the entire payload; instead could we:

  • let (len, remaining) = unsigned_varint::decode::usize(payload)
  • then decode the message via let message = schema::webrtc::Message::decode(&remaining[..len])?

Contributor Author (@timwu20):

I implemented the suggestion in a4effe2.
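Spelled out as a small fragment (assuming the crate's prost-generated schema::webrtc::Message and the unsigned_varint crate; the function name is illustrative, and real code would return a proper error rather than Option):

    use prost::Message as _;

    /// Decode a length-prefixed frame without copying it into a BytesMut first:
    /// read the unsigned-varint length, then decode straight from the slice.
    fn decode_frame(payload: &[u8]) -> Option<schema::webrtc::Message> {
        let (len, remaining) = unsigned_varint::decode::usize(payload).ok()?;
        // Guard against a truncated frame before slicing.
        let body = remaining.get(..len)?;
        schema::webrtc::Message::decode(body).ok()
    }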

}

if flag == Flag::StopSending as i32 {
*self.state.lock() = State::SendClosed;

Collaborator:

Could we receive the StopSending flag in the following race case?

  • T0: poll_write is called while the state is Open
  • T1: poll_reserve(cx) returns Pending -- waker registered for channel capacity
  • T2: remote sends us the STOP_SENDING flag

Because we are not waking any waker here, poll_write will block indefinitely until we get channel capacity

Contributor Author (@timwu20) commented Jan 22, 2026:

Good catch! This should be fixed now in 73f76ec. I introduced a write_waker on Substream and SubstreamHandle so that a pending poll_write is woken and re-polled when a STOP_SENDING flag is received.
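A minimal sketch of that fix (illustrative types; the real state enum and locking differ):

    use std::sync::Mutex;

    use futures::task::AtomicWaker;

    /// Illustrative subset of the write-side state.
    #[allow(dead_code)]
    #[derive(Debug)]
    enum State {
        Open,
        SendClosed,
    }

    struct WriteSide {
        state: Mutex<State>,
        /// Registered by poll_write before it returns Pending on channel capacity.
        write_waker: AtomicWaker,
    }

    impl WriteSide {
        /// Message-handler path for STOP_SENDING: transition the state *and* wake
        /// any parked poll_write so it re-checks the state instead of waiting
        /// indefinitely for channel capacity.
        fn on_stop_sending(&self) {
            *self.state.lock().expect("lock poisoned") = State::SendClosed;
            self.write_waker.wake();
        }
    }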

SendClosed,

/// Shutdown initiated, flushing pending data before sending FIN.
Closing,

Collaborator:

Should we couple any State transition with an associated waker?

In other words, who is going to wake up the counterpart of poll_shutdown or poll_write if we are transitioning the state from on_message or from the individual poll functions?

Contributor Author (@timwu20):

This should be addressed now in 73f76ec.

}

if flags & 2 == Flag::ResetStream as i32 {
if flag == Flag::ResetStream as i32 {

Collaborator:

Similarly to the state message, we are not waking other poll_ functions to announce the state transition. Shouldn't we do that to avoid blocking / delays in processing?

Contributor Author (@timwu20):

As of 73f76ec, we wake the write_waker when we receive this flag.

Collaborator @lexnv left a comment:

Generally looks good to me! Left a few comments to close the gap on missing wakers in the poll implementations, plus some suggestions on lock-free state, encoding/decoding inefficiencies, and waking on state transitions 🙏


/// Remote is no longer interested in receiving anything.
SendClosed,

Collaborator:

We could also replace the state with an AtomicU8 and have the implementation look like:

struct SharedState {
   // lock-free / more efficient 
   // BITS 0 to 3 
   // 0: open 1 sendClosed 2: closing 3: FinSent (our states) 4: FinAck
    
   // BIT 4: remote recv closed via FIN recv
   // BIT 5: STOP_SENDING remote send closed
   state: AtomicU8, 
   
   // Waker for tasks waiting on state changes (ie poll write blocked on state, but poll shutdown has FIN_ACK) -- triggered via on_message for STOP_SENDING, FIN_ACK, RESET_STREAM etc
   state_waker: AtomicWaker,
}   

Contributor Author (@timwu20):

Given that state transitions are relatively rare in this protocol, the performance gain would not be that pronounced. I also think it's easier to understand semantically using the enum. It's more of a nice-to-have optimization rather than a must-have.

In terms of the single state_waker AtomicWaker: given that the Substream must be owned and polled by exactly one task, I think it could be merged, but keeping separate wakers makes it easier to understand semantically which waker we are registering. I'd rather not introduce the burden of figuring out whether we should register over an already registered AtomicWaker based on a state transition.

Does that make sense @lexnv?

timwu20 and others added 3 commits January 22, 2026 11:46
Copilot AI left a comment: reviewed 5 out of 5 changed files in this pull request and generated 4 comments.

Copilot AI left a comment: reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Copilot AI left a comment: reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Comment on lines +193 to +219
if self.read_closed.swap(true, std::sync::atomic::Ordering::SeqCst) {
// Already processed FIN, ignore duplicate
tracing::debug!(
target: "litep2p::webrtc::substream",
"received duplicate FIN, ignoring"
);
return Ok(());
}

// Received FIN from remote, close our read half
self.inbound_tx.send(Event::RecvClosed).await?;

// Send FIN_ACK back to remote using try_send to avoid blocking.
// If the channel is full, the remote will timeout waiting for FIN_ACK
// and handle it gracefully. This prevents deadlock if the outbound
// channel is blocked due to backpressure.
if let Err(e) = self.outbound_tx.try_send(Event::Message {
payload: vec![],
flag: Some(Flag::FinAck),
}) {
tracing::warn!(
target: "litep2p::webrtc::substream",
?e,
"failed to send FIN_ACK, remote will timeout"
);
}
return Ok(());

Copilot AI commented Jan 23, 2026:

The duplicate FIN handling only prevents sending multiple RecvClosed events via the read_closed atomic flag check, but it still attempts to send FIN_ACK even for duplicate FINs. While the try_send might fail if the channel is closed, it would be cleaner to return early after detecting a duplicate FIN (at line 199) to avoid attempting to send FIN_ACK unnecessarily.

Additionally, if a FIN is received when we're already in the FinAcked state (we've completed our own shutdown), sending another FIN_ACK might be redundant. However, this is a minor issue as the remote should not be sending FIN after receiving our FIN_ACK, and if they do, sending another FIN_ACK is harmless.

Contributor Author (@timwu20):

The code already returns early after detecting a duplicate FIN. The swap returns the previous value. If read_closed was already true, we return immediately at line 199 without sending FIN_ACK.

Copilot AI left a comment: reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Copilot AI left a comment: reviewed 5 out of 5 changed files in this pull request and generated no new comments.

mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
// State machine for proper shutdown:

Collaborator:

In poll_flush we return immediately; IIUC we should send the FIN packet only after all pending data is delivered. Do we handle this gracefully, or can we deliver FIN before buffered data reaches the remote?

Contributor Author (@timwu20):

Yes, with the poll_reserve call on line 446, we won't send the FIN via self.tx unless the channel has space. After the FIN flag is sent, we won't drop the connection until a FIN_ACK is received or the timeout expires.
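For illustration, a sketch of that reserve-then-send ordering using tokio_util::sync::PollSender; the function name and frame type are hypothetical (the real code sends an Event through the substream's channel):

    use std::task::{Context, Poll};

    use tokio_util::sync::PollSender;

    /// Only hand the FIN frame to the outbound channel once capacity is reserved,
    /// so it queues behind already-buffered data and is never dropped for lack of
    /// space.
    fn poll_send_fin(
        tx: &mut PollSender<Vec<u8>>,
        cx: &mut Context<'_>,
        fin_frame: Vec<u8>,
    ) -> Poll<Result<(), std::io::Error>> {
        // Wait until the channel can accept one more item.
        match tx.poll_reserve(cx) {
            Poll::Ready(Ok(())) => {}
            Poll::Ready(Err(_)) => {
                return Poll::Ready(Err(std::io::Error::new(
                    std::io::ErrorKind::BrokenPipe,
                    "substream channel closed",
                )))
            }
            Poll::Pending => return Poll::Pending,
        }
        // Capacity is reserved, so this send cannot fail due to backpressure.
        tx.send_item(fin_frame).map_err(|_| {
            std::io::Error::new(std::io::ErrorKind::BrokenPipe, "substream channel closed")
        })?;
        Poll::Ready(Ok(()))
    }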


/// Shorter timeout for tests.
#[cfg(test)]
const FIN_ACK_TIMEOUT: Duration = Duration::from_secs(2);

Collaborator:

We can remove this and rely on the 5s value used in production.

Contributor Author (@timwu20):

Updated in 184971c.

Collaborator @lexnv left a comment:

LGTM!

Let's create an issue for implementing the lock-free atomic state, thanks

Contributor Author (@timwu20) commented Jan 23, 2026:

> LGTM!
>
> Let's create an issue for implementing the lock-free atomic state, thanks

Created an issue to track this #523

Linked issue: #476 (webrtc: Gracefully close substreams)