Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export-sample-test-data:

.PHONY: docs
docs:
cargo docs --document-private-items --exclude rollup-node-chain-orchestrator
cargo +$(NIGHTLY_TOOLCHAIN) docs --document-private-items --exclude rollup-node-chain-orchestrator

.PHONY: pr
pr: lint test docs
Expand Down
2 changes: 2 additions & 0 deletions crates/chain-orchestrator/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use scroll_network::NewBlockWithPeer;
pub enum ChainOrchestratorEvent {
/// A received block failed the consensus checks.
BlockFailedConsensusChecks(B256, PeerId),
/// A finalized block was received from a peer.
L2FinalizedBlockReceived(B256, PeerId),
/// A new block has been received from the network but we have insufficient data to process it
/// due to being in optimistic mode.
InsufficientDataForReceivedBlock(B256),
Expand Down
11 changes: 11 additions & 0 deletions crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,17 @@ impl<
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
tracing::debug!(target: "scroll::chain_orchestrator", block_hash = ?block_with_peer.block.header.hash_slow(), block_number = ?block_with_peer.block.number, peer_id = ?block_with_peer.peer_id, "Received new block from peer");

// Check we are not handling a finalized block.
if block_with_peer.block.header.number <= self.engine.fcs().finalized_block_info().number {
self.network
.handle()
.block_import_outcome(BlockImportOutcome::finalized_block(block_with_peer.peer_id));
return Ok(Some(ChainOrchestratorEvent::L2FinalizedBlockReceived(
block_with_peer.block.header.hash_slow(),
block_with_peer.peer_id,
)));
}

if let Err(err) =
self.consensus.validate_new_block(&block_with_peer.block, &block_with_peer.signature)
{
Expand Down
7 changes: 7 additions & 0 deletions crates/network/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ pub struct BlockImportOutcome {
}

impl BlockImportOutcome {
/// Creates a new `BlockImportOutcome` instance for a finalized block with the given peer ID.
pub fn finalized_block(peer: PeerId) -> Self {
Self { peer, result: Err(BlockImportError::L2FinalizedBlockReceived(peer)) }
}

/// Creates a new `BlockImportOutcome` instance for an invalid block with the given peer ID.
pub fn invalid_block(peer: PeerId) -> Self {
Self { peer, result: Err(BlockImportError::Validation(BlockValidationError::InvalidBlock)) }
Expand Down Expand Up @@ -56,6 +61,8 @@ pub enum BlockImportError {
Consensus(ConsensusError),
/// An error occurred during block validation.
Validation(BlockValidationError),
/// A finalized block was received from a peer.
L2FinalizedBlockReceived(PeerId),
}

/// A consensus related error that can occur during block import.
Expand Down
85 changes: 71 additions & 14 deletions crates/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use reth_tokio_util::{EventSender, EventStream};
use rollup_node_primitives::{sig_encode_hash, BlockInfo};
use scroll_alloy_hardforks::ScrollHardforks;
use scroll_wire::{
NewBlock, ScrollWireConfig, ScrollWireEvent, ScrollWireManager, ScrollWireProtocolHandler,
LRU_CACHE_SIZE,
NewBlock, PeerBlockState, ScrollWireConfig, ScrollWireEvent, ScrollWireManager,
ScrollWireProtocolHandler, LRU_CACHE_SIZE,
};
use std::{
pin::Pin,
Expand Down Expand Up @@ -184,12 +184,26 @@ impl<
// Compute the block hash.
let hash = block.block.hash_slow();

// Filter the peers that have not seen this block hash.
// Filter the peers that have not seen this block hash via either protocol.
// We iterate over all connected scroll-wire peers.
let peers: Vec<FixedBytes<64>> = self
.scroll_wire
.state()
.iter()
.filter_map(|(peer_id, blocks)| (!blocks.contains(&hash)).then_some(*peer_id))
.connected_peers()
.filter_map(|peer_id| {
// Check if peer has seen this block via any protocol
let has_seen = self
.scroll_wire
.peer_block_state()
.get(peer_id)
.is_some_and(|state| state.has_seen(&hash));

// Only announce if peer hasn't seen this block
if !has_seen {
Some(*peer_id)
} else {
None
}
})
.collect();

// TODO: remove this once we deprecate l2geth.
Expand Down Expand Up @@ -240,15 +254,35 @@ impl<
ScrollWireEvent::NewBlock { peer_id, block, signature } => {
let block_hash = block.hash_slow();
trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, signature = ?signature, "Received new block");

// Check if this peer has already received this block via scroll-wire, if so
// penalize it.
let state = self
.scroll_wire
.peer_block_state_mut()
.entry(peer_id)
.or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE));
if state.has_seen_via_scroll_wire(&block_hash) {
tracing::warn!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, "Peer sent duplicate block via scroll-wire, penalizing");
self.inner_network_handle.reputation_change(
peer_id,
reth_network_api::ReputationChangeKind::BadBlock,
);
return None;
} else {
// Update the state: peer has seen this block via scroll-wire
state.insert_scroll_wire(block_hash);
}

if self.blocks_seen.contains(&(block_hash, signature)) {
None
} else {
// Update the state of the peer cache i.e. peer has seen this block.
// Update the state: peer has seen this block via scroll-wire
self.scroll_wire
.state_mut()
.peer_block_state_mut()
.entry(peer_id)
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
.insert(block_hash);
.or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE))
.insert_scroll_wire(block_hash);
// Update the state of the block cache i.e. we have seen this block.
self.blocks_seen.insert((block.hash_slow(), signature));

Expand Down Expand Up @@ -310,6 +344,11 @@ impl<
self.inner_network_handle
.reputation_change(peer, reth_network_api::ReputationChangeKind::BadBlock);
}
Err(BlockImportError::L2FinalizedBlockReceived(peer)) => {
trace!(target: "scroll::network::manager", peer_id = ?peer, "Block import failed - finalized block received - penalizing peer");
self.inner_network_handle
.reputation_change(peer, reth_network_api::ReputationChangeKind::BadBlock);
}
}
}

Expand Down Expand Up @@ -339,17 +378,35 @@ impl<
.and_then(|i| Signature::from_raw(&extra_data[i..]).ok())
{
let block_hash = block.hash_slow();

// Check if this peer has already sent this block to us via eth-wire, if so penalize it.
let state = self
.scroll_wire
.peer_block_state_mut()
.entry(peer_id)
.or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE));

if state.has_seen_via_eth_wire(&block_hash) {
tracing::warn!(target: "scroll::bridge::import", peer_id = ?peer_id, block = ?block_hash, "Peer sent duplicate block via eth-wire, penalizing");
self.inner_network_handle
.reputation_change(peer_id, reth_network_api::ReputationChangeKind::BadBlock);
return None;
} else {
// Update the state: peer has seen this block via eth-wire
state.insert_eth_wire(block_hash);
}

if self.blocks_seen.contains(&(block_hash, signature)) {
return None;
}
trace!(target: "scroll::bridge::import", peer_id = %peer_id, block_hash = %block_hash, signature = %signature.to_string(), extra_data = %extra_data.to_string(), "Received new block from eth-wire protocol");

// Update the state of the peer cache i.e. peer has seen this block.
// Update the state: peer has seen this block via eth-wire
self.scroll_wire
.state_mut()
.peer_block_state_mut()
.entry(peer_id)
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
.insert(block_hash);
.or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE))
.insert_eth_wire(block_hash);

// Update the state of the block cache i.e. we have seen this block.
self.blocks_seen.insert((block_hash, signature));
Expand Down
2 changes: 1 addition & 1 deletion crates/scroll-wire/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub use config::ScrollWireConfig;

mod connection;
mod manager;
pub use manager::{ScrollWireManager, LRU_CACHE_SIZE};
pub use manager::{PeerBlockState, ScrollWireManager, LRU_CACHE_SIZE};

mod protocol;
pub use protocol::{NewBlock, ScrollWireEvent, ScrollWireProtocolHandler};
98 changes: 81 additions & 17 deletions crates/scroll-wire/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,118 @@ use tracing::trace;
/// The size of the LRU cache used to track blocks that have been seen by peers.
pub const LRU_CACHE_SIZE: u32 = 100;

/// Tracks block announced and received state for a peer.
#[derive(Debug)]
pub struct PeerBlockState {
/// blocks announced to the peer
announced: LruCache<B256>,
/// blocks received via scroll-wire protocol, this is used to penalize peers that send
/// duplicate blocks via scroll-wire.
scroll_wire_received: LruCache<B256>,
/// blocks received via eth-wire protocol, this is used to penalize peers that send duplicate
/// blocks via eth-wire.
eth_wire_received: LruCache<B256>,
}

impl PeerBlockState {
/// Creates a new `PeerBlockState` with the specified LRU cache capacity.
pub fn new(capacity: u32) -> Self {
Self {
announced: LruCache::new(capacity),
scroll_wire_received: LruCache::new(capacity),
eth_wire_received: LruCache::new(capacity),
}
}

/// Check if peer knows about this block (either received or announced).
pub fn has_seen(&self, hash: &B256) -> bool {
self.announced.contains(hash) ||
self.scroll_wire_received.contains(hash) ||
self.eth_wire_received.contains(hash)
}

/// Check if peer has received this block via scroll-wire specifically (for duplicate
/// detection).
pub fn has_seen_via_scroll_wire(&self, hash: &B256) -> bool {
self.scroll_wire_received.contains(hash)
}

/// Check if peer has received this block via eth-wire specifically (for duplicate detection).
pub fn has_seen_via_eth_wire(&self, hash: &B256) -> bool {
self.eth_wire_received.contains(hash)
}

/// Record that this peer has received a block via scroll-wire.
pub fn insert_scroll_wire(&mut self, hash: B256) {
self.scroll_wire_received.insert(hash); // Track for duplicate detection
}

/// Record that this peer has received a block via eth-wire.
pub fn insert_eth_wire(&mut self, hash: B256) {
self.eth_wire_received.insert(hash); // Track for duplicate detection
}

/// Record that we have announced a block to this peer.
pub fn insert_announced(&mut self, hash: B256) {
self.announced.insert(hash); // Only update unified announced, not protocol-specific
}
}

/// A manager for the `ScrollWire` protocol.
#[derive(Debug)]
pub struct ScrollWireManager {
/// A stream of [`ScrollWireEvent`]s produced by the scroll wire protocol.
events: UnboundedReceiverStream<ScrollWireEvent>,
/// A map of connections to peers.
connections: HashMap<PeerId, UnboundedSender<ScrollMessage>>,
/// A map of the state of the scroll wire protocol. Currently the state for each peer
/// is just a cache of the last 100 blocks seen by each peer.
state: HashMap<PeerId, LruCache<B256>>,
/// Unified state tracking block state and blocks received from each peer via both protocols.
peer_block_state: HashMap<PeerId, PeerBlockState>,
}

impl ScrollWireManager {
/// Creates a new [`ScrollWireManager`] instance.
pub fn new(events: UnboundedReceiver<ScrollWireEvent>) -> Self {
trace!(target: "scroll::wire::manager", "Creating new ScrollWireManager instance");
Self { events: events.into(), connections: HashMap::new(), state: HashMap::new() }
Self {
events: events.into(),
connections: HashMap::new(),
peer_block_state: HashMap::new(),
}
}

/// Announces a new block to the specified peer.
pub fn announce_block(&mut self, peer_id: PeerId, block: &NewBlock, hash: B256) {
if let Entry::Occupied(to_connection) = self.connections.entry(peer_id) {
// We send the block to the peer. If we receive an error we remove the peer from the
// connections map and delete its state as the connection is no longer valid.
// connections map and peer_block_state as the connection is no longer valid.
if to_connection.get().send(ScrollMessage::new_block(block.clone())).is_err() {
trace!(target: "scroll::wire::manager", peer_id = %peer_id, "Failed to send block to peer - dropping peer.");
self.state.remove(&peer_id);
self.peer_block_state.remove(&peer_id);
to_connection.remove();
} else {
// Upon successful sending of the block we update the state of the peer.
trace!(target: "scroll::wire::manager", peer_id = %peer_id, "Announced block to peer");
self.state
// Record that we announced this block to the peer
self.peer_block_state
.entry(peer_id)
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
.insert(hash);
.or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE))
.insert_announced(hash);
}
}
}

/// Returns the state of the `ScrollWire` protocol.
pub const fn state(&self) -> &HashMap<PeerId, LruCache<B256>> {
&self.state
/// Returns an iterator over the connected peer IDs.
pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
self.connections.keys()
}

/// Returns a reference to the peer block state map.
pub const fn peer_block_state(&self) -> &HashMap<PeerId, PeerBlockState> {
&self.peer_block_state
}

/// Returns a mutable reference to the state of the `ScrollWire` protocol.
pub const fn state_mut(&mut self) -> &mut HashMap<PeerId, LruCache<B256>> {
&mut self.state
/// Returns a mutable reference to the peer block state map.
pub const fn peer_block_state_mut(&mut self) -> &mut HashMap<PeerId, PeerBlockState> {
&mut self.peer_block_state
}
}

Expand Down Expand Up @@ -94,7 +159,6 @@ impl Future for ScrollWireManager {
direction
);
this.connections.insert(peer_id, to_connection);
this.state.insert(peer_id, LruCache::new(100));
}
None => break,
}
Expand Down
Loading