1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
// This file is part of Substrate.

// Copyright (C) 2017-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Helper for sending rate-limited gossip messages.
//!
//! # Context
//!
//! The [`NetworkService`] struct provides a way to send notifications to a certain peer through
//! the [`NetworkService::notification_sender`] method. This method is quite low level and isn't
//! expected to be used directly.
//!
//! The [`QueuedSender`] struct provided by this module is built on top of
//! [`NetworkService::notification_sender`] and provides a cleaner way to send notifications.
//!
//! # Behaviour
//!
//! An instance of [`QueuedSender`] is specific to a certain combination of `PeerId` and
//! protocol name. It maintains a buffer of messages waiting to be sent out. The user of this API
//! is able to manipulate that queue, adding or removing obsolete messages.
//!
//! Creating a [`QueuedSender`] also returns a opaque `Future` whose responsibility it to
//! drain that queue and actually send the messages. If the substream with the given combination
//! of peer and protocol is closed, the queue is silently discarded. It is the role of the user
//! to track which peers we are connected to.
//!
//! In normal situations, messages sent through a [`QueuedSender`] will arrive in the same
//! order as they have been sent.
//! It is possible, in the situation of disconnects and reconnects, that messages arrive in a
//! different order. See also <https://github.com/paritytech/substrate/issues/6756>.
//! However, if multiple instances of [`QueuedSender`] exist for the same peer and protocol, or
//! if some other code uses the [`NetworkService`] to send notifications to this combination or
//! peer and protocol, then the notifications will be interleaved in an unpredictable way.
//!

use crate::{ExHashT, NetworkService};

use async_std::sync::{Mutex, MutexGuard};
use futures::prelude::*;
use futures::channel::mpsc::{channel, Receiver, Sender};
use libp2p::PeerId;
use sp_runtime::traits::Block as BlockT;
use std::{
	borrow::Cow,
	collections::VecDeque,
	fmt,
	sync::Arc,
};

#[cfg(test)]
mod tests;

/// Notifications sender for a specific combination of network service, peer, and protocol.
pub struct QueuedSender<M> {
	/// Shared between the user-facing [`QueuedSender`] and the background future.
	shared_message_queue: SharedMessageQueue<M>,
	/// Used to notify the background future to check for new messages in the message queue.
	notify_background_future: Sender<()>,
	/// Maximum number of elements in [`QueuedSender::shared_message_queue`].
	queue_size_limit: usize,
}

impl<M> QueuedSender<M> {
	/// Returns a new [`QueuedSender`] containing a queue of message for this specific
	/// combination of peer and protocol.
	///
	/// In addition to the [`QueuedSender`], also returns a `Future` whose role is to drive
	/// the messages sending forward.
	pub fn new<B, H, F>(
		service: Arc<NetworkService<B, H>>,
		peer_id: PeerId,
		protocol: Cow<'static, str>,
		queue_size_limit: usize,
		messages_encode: F
	) -> (Self, impl Future<Output = ()> + Send + 'static)
	where
		M: Send + 'static,
		B: BlockT + 'static,
		H: ExHashT,
		F: Fn(M) -> Vec<u8> + Send + 'static,
	{
		let (notify_background_future, wait_for_sender) = channel(0);

		let shared_message_queue = Arc::new(Mutex::new(
			VecDeque::with_capacity(queue_size_limit),
		));

		let background_future = create_background_future(
			wait_for_sender,
			service,
			peer_id,
			protocol,
			shared_message_queue.clone(),
			messages_encode
		);

		let sender = QueuedSender {
			shared_message_queue,
			notify_background_future,
			queue_size_limit,
		};

		(sender, background_future)
	}

	/// Locks the queue of messages towards this peer.
	///
	/// The returned `Future` is expected to be ready quite quickly.
	pub async fn lock_queue<'a>(&'a mut self) -> QueueGuard<'a, M> {
		QueueGuard {
			message_queue: self.shared_message_queue.lock().await,
			queue_size_limit: self.queue_size_limit,
			notify_background_future: &mut self.notify_background_future,
		}
	}

	/// Pushes a message to the queue, or discards it if the queue is full.
	///
	/// The returned `Future` is expected to be ready quite quickly.
	pub async fn queue_or_discard(&mut self, message: M)
	where
		M: Send + 'static
	{
		self.lock_queue().await.push_or_discard(message);
	}
}

impl<M> fmt::Debug for QueuedSender<M> {
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		f.debug_struct("QueuedSender").finish()
	}
}

/// Locked queue of messages to the given peer.
///
/// As long as this struct exists, the background future is asleep and the owner of the
/// [`QueueGuard`] is in total control of the message queue. Messages can only ever be sent out on
/// the network after the [`QueueGuard`] is dropped.
#[must_use]
pub struct QueueGuard<'a, M> {
	message_queue: MutexGuard<'a, MessageQueue<M>>,
	/// Same as [`QueuedSender::queue_size_limit`].
	queue_size_limit: usize,
	notify_background_future: &'a mut Sender<()>,
}

impl<'a, M: Send + 'static> QueueGuard<'a, M> {
	/// Pushes a message to the queue, or discards it if the queue is full.
	///
	/// The message will only start being sent out after the [`QueueGuard`] is dropped.
	pub fn push_or_discard(&mut self, message: M) {
		if self.message_queue.len() < self.queue_size_limit {
			self.message_queue.push_back(message);
		}
	}

	/// Calls `filter` for each message in the queue, and removes the ones for which `false` is
	/// returned.
	///
	/// > **Note**: The parameter of `filter` is a `&M` and not a `&mut M` (which would be
	/// >           better) because the underlying implementation relies on `VecDeque::retain`.
	pub fn retain(&mut self, filter: impl FnMut(&M) -> bool) {
		self.message_queue.retain(filter);
	}
}

impl<'a, M> Drop for QueueGuard<'a, M> {
	fn drop(&mut self) {
		// Notify background future to check for new messages in the message queue.
		let _ = self.notify_background_future.try_send(());
	}
}

type MessageQueue<M> = VecDeque<M>;

/// [`MessageQueue`] shared between [`QueuedSender`] and background future.
type SharedMessageQueue<M> = Arc<Mutex<MessageQueue<M>>>;

async fn create_background_future<B: BlockT, H: ExHashT, M, F: Fn(M) -> Vec<u8>>(
	mut wait_for_sender: Receiver<()>,
	service: Arc<NetworkService<B, H>>,
	peer_id: PeerId,
	protocol: Cow<'static, str>,
	shared_message_queue: SharedMessageQueue<M>,
	messages_encode: F,
) {
	loop {
		if wait_for_sender.next().await.is_none() {
			return
		}

		loop {
			let mut queue_guard = shared_message_queue.lock().await;
			let next_message = match queue_guard.pop_front() {
				Some(msg) => msg,
				None => break,
			};
			drop(queue_guard);

			// Starting from below, we try to send the message. If an error happens when sending,
			// the only sane option we have is to silently discard the message.
			let sender = match service.notification_sender(peer_id.clone(), protocol.clone()) {
				Ok(s) => s,
				Err(_) => continue,
			};

			let ready = match sender.ready().await {
				Ok(r) => r,
				Err(_) => continue,
			};

			let _ = ready.send(messages_encode(next_message));
		}
	}
}