1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
// Copyright (c) 2019 Parity Technologies (UK) Ltd.
// Copyright (c) 2016 twist developers
//
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.
//! An implementation of the [RFC 6455][rfc6455] websocket protocol.
//!
//! To begin a websocket connection one first needs to perform a [handshake],
//! either as [client] or [server], in order to upgrade from HTTP.
//! Once successful, the client or server can transition to a connection,
//! i.e. a [Sender]/[Receiver] pair and send and receive textual or
//! binary data.
//!
//! **Note**: While it is possible to only receive websocket messages it is
//! not possible to only send websocket messages. Receiving data is required
//! in order to react to control frames such as PING or CLOSE. While those will be
//! answered transparently they have to be received in the first place, so
//! calling [`connection::Receiver::receive`] is imperative.
//!
//! **Note**: None of the `async` methods are safe to cancel so their `Future`s
//! must not be dropped unless they return `Poll::Ready`.
//!
//! # Client example
//!
//! ```no_run
//! # use tokio_util::compat::Tokio02AsyncReadCompatExt;
//! # async fn doc() -> Result<(), soketto::BoxedError> {
//! use soketto::handshake::{Client, ServerResponse};
//!
//! // First, we need to establish a TCP connection.
//! let socket = tokio::net::TcpStream::connect("...").await?;
//!
//! // Then we configure the client handshake.
//! let mut client = Client::new(socket.compat(), "...", "/");
//!
//! // And finally we perform the handshake and handle the result.
//! let (mut sender, mut receiver) = match client.handshake().await? {
//! ServerResponse::Accepted { .. } => client.into_builder().finish(),
//! ServerResponse::Redirect { status_code, location } => unimplemented!("follow location URL"),
//! ServerResponse::Rejected { status_code } => unimplemented!("handle failure")
//! };
//!
//! // Over the established websocket connection we can send
//! sender.send_text("some text").await?;
//! sender.send_text("some more text").await?;
//! sender.flush().await?;
//!
//! // ... and receive data.
//! let mut data = Vec::new();
//! receiver.receive_data(&mut data).await?;
//!
//! # Ok(())
//! # }
//!
//! ```
//!
//! # Server example
//!
//! ```no_run
//! # use tokio_util::compat::Tokio02AsyncReadCompatExt;
//! # use tokio::stream::StreamExt;
//! # async fn doc() -> Result<(), soketto::BoxedError> {
//! use soketto::{handshake::{Server, ClientRequest, server::Response}};
//!
//! // First, we listen for incoming connections.
//! let mut listener = tokio::net::TcpListener::bind("...").await?;
//! let mut incoming = listener.incoming();
//!
//! while let Some(socket) = incoming.next().await {
//! // For each incoming connection we perform a handshake.
//! let mut server = Server::new(socket?.compat());
//!
//! let websocket_key = {
//! let req = server.receive_request().await?;
//! req.into_key()
//! };
//!
//! // Here we accept the client unconditionally.
//! let accept = Response::Accept { key: &websocket_key, protocol: None };
//! server.send_response(&accept).await?;
//!
//! // And we can finally transition to a websocket connection.
//! let (mut sender, mut receiver) = server.into_builder().finish();
//!
//! let mut data = Vec::new();
//! let data_type = receiver.receive_data(&mut data).await?;
//!
//! if data_type.is_text() {
//! sender.send_text(std::str::from_utf8(&data)?).await?
//! } else {
//! sender.send_binary(&data).await?
//! }
//!
//! sender.close().await?
//! }
//!
//! # Ok(())
//! # }
//!
//! ```
//! [client]: handshake::Client
//! [server]: handshake::Server
//! [Sender]: connection::Sender
//! [Receiver]: connection::Receiver
//! [rfc6455]: https://tools.ietf.org/html/rfc6455
//! [handshake]: https://tools.ietf.org/html/rfc6455#section-4
#![forbid(unsafe_code)]
pub mod base;
pub mod data;
pub mod extension;
pub mod handshake;
pub mod connection;
use bytes::BytesMut;
use futures::io::{AsyncRead, AsyncReadExt};
use std::io;
pub use connection::{Mode, Receiver, Sender};
pub use data::{Data, Incoming};
pub type BoxedError = Box<dyn std::error::Error + Send + Sync>;
/// A parsing result.
#[derive(Debug, Clone)]
pub enum Parsing<T, N = ()> {
/// Parsing completed.
Done {
/// The parsed value.
value: T,
/// The offset into the byte slice that has been consumed.
offset: usize
},
/// Parsing is incomplete and needs more data.
NeedMore(N)
}
/// A buffer type used for implementing `Extension`s.
#[derive(Debug)]
pub enum Storage<'a> {
/// A read-only shared byte slice.
Shared(&'a [u8]),
/// A mutable byte slice.
Unique(&'a mut [u8]),
/// An owned byte buffer.
Owned(Vec<u8>)
}
impl AsRef<[u8]> for Storage<'_> {
fn as_ref(&self) -> &[u8] {
match self {
Storage::Shared(d) => d,
Storage::Unique(d) => d,
Storage::Owned(b) => b.as_ref()
}
}
}
/// Helper function to allow casts from `usize` to `u64` only on platforms
/// where the sizes are guaranteed to fit.
#[cfg(any(target_pointer_width = "32", target_pointer_width = "64"))]
const fn as_u64(a: usize) -> u64 {
a as u64
}
/// Fill the buffer from the given `AsyncRead` impl with up to `max` bytes.
async fn read<R>(reader: &mut R, dest: &mut BytesMut, max: usize) -> io::Result<()>
where
R: AsyncRead + Unpin
{
let i = dest.len();
dest.resize(i + max, 0u8);
let n = reader.read(&mut dest[i ..]).await?;
dest.truncate(i + n);
if n == 0 {
return Err(io::ErrorKind::UnexpectedEof.into())
}
log::trace!("read {} bytes", n);
Ok(())
}