use std::net::TcpStream;
use std::io;
use std::fmt;
use std::error;
use http::{HttpScheme, HttpResult, StreamId, Header, HttpError, ErrorCode};
use http::transport::TransportStream;
use http::frame::{SettingsFrame, HttpSetting, FrameIR, PingFrame};
use http::connection::{SendFrame, ReceiveFrame, SendStatus, HttpConnection, EndStream};
use http::session::{Session, Stream, DefaultStream, DefaultSessionState, SessionState};
use http::session::Client as ClientMarker;
use http::priority::SimplePrioritizer;
#[cfg(feature="tls")]
pub mod tls;
pub fn write_preface<W: io::Write>(stream: &mut W) -> Result<(), io::Error> {
let preface = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
try!(stream.write_all(preface));
let settings = {
let mut frame = SettingsFrame::new();
frame.add_setting(HttpSetting::EnablePush(0));
frame
};
let mut buf = io::Cursor::new(Vec::with_capacity(16));
try!(settings.serialize_into(&mut buf));
try!(stream.write_all(buf.get_ref()));
debug!("Sent client preface");
Ok(())
}
pub struct ClientStream<TS: TransportStream>(pub TS, pub HttpScheme, pub String);
pub trait HttpConnectError: error::Error + Send + Sync {}
impl<E> From<E> for HttpError
where E: HttpConnectError + 'static
{
fn from(e: E) -> HttpError {
HttpError::Other(Box::new(e))
}
}
pub trait HttpConnect {
type Stream: TransportStream;
type Err: HttpConnectError + 'static;
fn connect(self) -> Result<ClientStream<Self::Stream>, Self::Err>;
}
pub struct CleartextConnector<'a> {
pub host: &'a str,
pub port: u16,
}
impl<'a> CleartextConnector<'a> {
pub fn new(host: &'a str) -> CleartextConnector {
CleartextConnector {
host: host,
port: 80,
}
}
pub fn with_port(host: &'a str, port: u16) -> CleartextConnector {
CleartextConnector {
host: host,
port: port,
}
}
}
#[derive(Debug)]
pub struct CleartextConnectError(io::Error);
impl fmt::Display for CleartextConnectError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt,
"Cleartext HTTP/2 connect error: {}",
(self as &error::Error).description())
}
}
impl error::Error for CleartextConnectError {
fn description(&self) -> &str {
self.0.description()
}
fn cause(&self) -> Option<&error::Error> {
self.0.cause()
}
}
impl From<io::Error> for CleartextConnectError {
fn from(e: io::Error) -> CleartextConnectError {
CleartextConnectError(e)
}
}
impl HttpConnectError for CleartextConnectError {}
impl<'a> HttpConnect for CleartextConnector<'a> {
type Stream = TcpStream;
type Err = CleartextConnectError;
fn connect(self) -> Result<ClientStream<TcpStream>, CleartextConnectError> {
let mut stream = try!(TcpStream::connect((self.host, self.port)));
try!(write_preface(&mut stream));
Ok(ClientStream(stream, HttpScheme::Http, self.host.into()))
}
}
pub struct RequestStream<'n, 'v, S>
where S: Stream
{
pub headers: Vec<Header<'n, 'v>>,
pub stream: S,
}
pub struct ClientConnection<State = DefaultSessionState<ClientMarker, DefaultStream>>
where State: SessionState
{
conn: HttpConnection,
pub state: State,
}
impl<State> ClientConnection<State>
where State: SessionState
{
pub fn with_connection(conn: HttpConnection, state: State) -> ClientConnection<State> {
ClientConnection {
conn: conn,
state: state,
}
}
#[inline]
pub fn scheme(&self) -> HttpScheme {
self.conn.scheme
}
pub fn expect_settings<Recv: ReceiveFrame, Sender: SendFrame>(&mut self,
rx: &mut Recv,
tx: &mut Sender)
-> HttpResult<()> {
let mut session = ClientSession::new(&mut self.state, tx);
self.conn.expect_settings(rx, &mut session)
}
pub fn start_request<S: SendFrame>(&mut self,
req: RequestStream<State::Stream>,
sender: &mut S)
-> HttpResult<StreamId> {
let end_stream = if req.stream.is_closed_local() {
EndStream::Yes
} else {
EndStream::No
};
let stream_id = self.state.insert_outgoing(req.stream);
try!(self.conn.sender(sender).send_headers(req.headers, stream_id, end_stream));
Ok(stream_id)
}
pub fn send_ping<S: SendFrame>(&mut self, sender: &mut S) -> HttpResult<()> {
try!(self.conn.sender(sender).send_ping(0));
Ok(())
}
pub fn handle_next_frame<Recv: ReceiveFrame, Sender: SendFrame>(&mut self,
rx: &mut Recv,
tx: &mut Sender)
-> HttpResult<()> {
let mut session = ClientSession::new(&mut self.state, tx);
self.conn.handle_next_frame(rx, &mut session)
}
pub fn send_next_data<S: SendFrame>(&mut self, sender: &mut S) -> HttpResult<SendStatus> {
debug!("Sending next data...");
const MAX_CHUNK_SIZE: usize = 8 * 1024;
let mut buf = [0; MAX_CHUNK_SIZE];
let mut prioritizer = SimplePrioritizer::new(&mut self.state, &mut buf);
self.conn.sender(sender).send_next_data(&mut prioritizer)
}
}
pub struct ClientSession<'a, State, S>
where State: SessionState + 'a,
S: SendFrame + 'a
{
state: &'a mut State,
sender: &'a mut S,
}
impl<'a, State, S> ClientSession<'a, State, S>
where State: SessionState + 'a,
S: SendFrame + 'a
{
#[inline]
pub fn new(state: &'a mut State, sender: &'a mut S) -> ClientSession<'a, State, S> {
ClientSession {
state: state,
sender: sender,
}
}
}
impl<'a, State, S> Session for ClientSession<'a, State, S>
where State: SessionState + 'a,
S: SendFrame + 'a
{
fn new_data_chunk(&mut self,
stream_id: StreamId,
data: &[u8],
_: &mut HttpConnection)
-> HttpResult<()> {
debug!("Data chunk for stream {}", stream_id);
let mut stream = match self.state.get_stream_mut(stream_id) {
None => {
debug!("Received a frame for an unknown stream!");
return Ok(());
}
Some(stream) => stream,
};
stream.new_data_chunk(data);
Ok(())
}
fn new_headers<'n, 'v>(&mut self,
stream_id: StreamId,
headers: Vec<Header<'n, 'v>>,
_conn: &mut HttpConnection)
-> HttpResult<()> {
debug!("Headers for stream {}", stream_id);
let mut stream = match self.state.get_stream_mut(stream_id) {
None => {
debug!("Received a frame for an unknown stream!");
return Ok(());
}
Some(stream) => stream,
};
stream.set_headers(headers);
Ok(())
}
fn end_of_stream(&mut self, stream_id: StreamId, _: &mut HttpConnection) -> HttpResult<()> {
debug!("End of stream {}", stream_id);
let mut stream = match self.state.get_stream_mut(stream_id) {
None => {
debug!("Received a frame for an unknown stream!");
return Ok(());
}
Some(stream) => stream,
};
stream.close();
Ok(())
}
fn rst_stream(&mut self,
stream_id: StreamId,
error_code: ErrorCode,
_: &mut HttpConnection)
-> HttpResult<()> {
debug!("RST_STREAM id={:?}, error={:?}", stream_id, error_code);
self.state.get_stream_mut(stream_id).map(|stream| stream.on_rst_stream(error_code));
Ok(())
}
fn new_settings(&mut self,
_settings: Vec<HttpSetting>,
conn: &mut HttpConnection)
-> HttpResult<()> {
debug!("Sending a SETTINGS ack");
conn.sender(self.sender).send_settings_ack()
}
fn on_ping(&mut self, ping: &PingFrame, conn: &mut HttpConnection) -> HttpResult<()> {
debug!("Sending a PING ack");
conn.sender(self.sender).send_ping_ack(ping.opaque_data())
}
fn on_pong(&mut self, _ping: &PingFrame, _conn: &mut HttpConnection) -> HttpResult<()> {
debug!("Received a PING ack");
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::{ClientSession, write_preface, RequestStream};
use http::{Header, ErrorCode, HttpError};
use http::tests::common::{TestStream, build_mock_client_conn, build_mock_http_conn,
MockReceiveFrame, MockSendFrame};
use http::frame::{SettingsFrame, DataFrame, Frame, RawFrame};
use http::connection::{HttpFrame, SendStatus};
use http::session::{Session, SessionState, Stream, DefaultSessionState};
use http::session::Client as ClientMarker;
#[test]
fn test_init_client_conn() {
let frames = vec![HttpFrame::SettingsFrame(SettingsFrame::new())];
let mut conn = build_mock_client_conn();
let mut sender = MockSendFrame::new();
let mut receiver = MockReceiveFrame::new(frames);
conn.expect_settings(&mut receiver, &mut sender).unwrap();
assert_eq!(receiver.recv_list.len(), 0);
assert_eq!(sender.sent.len(), 1);
let frame = match HttpFrame::from_raw(&sender.sent[0]).unwrap() {
HttpFrame::SettingsFrame(frame) => frame,
_ => panic!("ACK not sent!"),
};
assert!(frame.is_ack());
}
#[test]
fn test_init_client_conn_no_settings() {
let frames = vec![HttpFrame::DataFrame(DataFrame::new(1))];
let mut conn = build_mock_client_conn();
let mut sender = MockSendFrame::new();
let mut receiver = MockReceiveFrame::new(frames);
assert!(conn.expect_settings(&mut receiver, &mut sender).is_err());
}
fn prepare_stream(data: Option<Vec<u8>>) -> TestStream {
let mut stream = TestStream::new();
match data {
None => stream.close_local(),
Some(d) => stream.set_outgoing(d),
};
return stream;
}
#[test]
fn test_client_conn_send_next_data() {
{
let mut conn = build_mock_client_conn();
let mut sender = MockSendFrame::new();
let res = conn.send_next_data(&mut sender).unwrap();
assert_eq!(res, SendStatus::Nothing);
}
{
let mut conn = build_mock_client_conn();
let mut sender = MockSendFrame::new();
conn.state.insert_outgoing(prepare_stream(None));
let res = conn.send_next_data(&mut sender).unwrap();
assert_eq!(res, SendStatus::Nothing);
}
{
let mut conn = build_mock_client_conn();
let mut sender = MockSendFrame::new();
conn.state.insert_outgoing(prepare_stream(Some(vec![1, 2, 3])));
let res = conn.send_next_data(&mut sender).unwrap();
assert_eq!(res, SendStatus::Sent);
let res = conn.send_next_data(&mut sender).unwrap();
assert_eq!(res, SendStatus::Nothing);
}
{
let mut conn = build_mock_client_conn();
let mut sender = MockSendFrame::new();
conn.state.insert_outgoing(prepare_stream(Some(vec![1, 2, 3])));
conn.state.insert_outgoing(prepare_stream(Some(vec![1, 2, 3])));
conn.state.insert_outgoing(prepare_stream(Some(vec![1, 2, 3])));
for _ in 0..3 {
let res = conn.send_next_data(&mut sender).unwrap();
assert_eq!(res, SendStatus::Sent);
}
let res = conn.send_next_data(&mut sender).unwrap();
assert_eq!(res, SendStatus::Nothing);
}
}
#[test]
fn test_client_conn_start_request() {
{
let mut conn = build_mock_client_conn();
let mut sender = MockSendFrame::new();
let stream = RequestStream {
headers: vec![
Header::new(b":method", b"GET"),
],
stream: prepare_stream(None),
};
conn.start_request(stream, &mut sender).unwrap();
assert!(conn.state.get_stream_ref(1).is_some());
assert_eq!(sender.sent.len(), 1);
match HttpFrame::from_raw(&sender.sent[0]).unwrap() {
HttpFrame::HeadersFrame(ref frame) => {
assert!(frame.is_end_of_stream());
}
_ => panic!("Expected a Headers frame"),
};
}
{
let mut conn = build_mock_client_conn();
let mut sender = MockSendFrame::new();
let stream = RequestStream {
headers: vec![
Header::new(b":method", b"POST"),
],
stream: prepare_stream(Some(vec![1, 2, 3])),
};
conn.start_request(stream, &mut sender).unwrap();
assert!(conn.state.get_stream_ref(1).is_some());
assert_eq!(sender.sent.len(), 1);
match HttpFrame::from_raw(&sender.sent.remove(0)).unwrap() {
HttpFrame::HeadersFrame(ref frame) => {
assert!(!frame.is_end_of_stream());
}
_ => panic!("Expected a Headers frame"),
};
}
}
#[test]
fn test_client_session_notifies_stream() {
let mut state = DefaultSessionState::<ClientMarker, TestStream>::new();
state.insert_outgoing(TestStream::new());
let mut conn = build_mock_http_conn();
let mut sender = MockSendFrame::new();
{
let mut session = ClientSession::new(&mut state, &mut sender);
session.new_data_chunk(1, &[1, 2, 3], &mut conn).unwrap();
}
assert_eq!(state.get_stream_ref(1).unwrap().body, vec![1, 2, 3]);
{
let mut session = ClientSession::new(&mut state, &mut sender);
session.new_data_chunk(1, &[4], &mut conn).unwrap();
}
assert_eq!(state.get_stream_ref(1).unwrap().body, vec![1, 2, 3, 4]);
let headers = vec![
Header::new(b":method", b"GET"),
];
{
let mut session = ClientSession::new(&mut state, &mut sender);
session.new_headers(1, headers.clone(), &mut conn).unwrap();
}
assert_eq!(state.get_stream_ref(1).unwrap().headers.clone().unwrap(),
headers);
state.insert_outgoing(TestStream::new());
{
let mut session = ClientSession::new(&mut state, &mut sender);
session.new_data_chunk(3, &[100], &mut conn).unwrap();
}
assert_eq!(state.get_stream_ref(3).unwrap().body, vec![100]);
{
let mut session = ClientSession::new(&mut state, &mut sender);
session.end_of_stream(1, &mut conn).unwrap();
}
assert!(state.get_stream_ref(1).unwrap().is_closed());
assert!(!state.get_stream_ref(3).unwrap().is_closed());
assert_eq!(state.iter().collect::<Vec<_>>().len(), 2);
let closed = state.get_closed();
assert_eq!(closed.len(), 1);
assert_eq!(state.iter().collect::<Vec<_>>().len(), 1);
}
#[test]
fn test_client_session_on_rst_stream() {
let mut state = DefaultSessionState::<ClientMarker, TestStream>::new();
state.insert_outgoing(TestStream::new());
state.insert_outgoing(TestStream::new());
let mut conn = build_mock_http_conn();
let mut sender = MockSendFrame::new();
{
let mut session = ClientSession::new(&mut state, &mut sender);
session.rst_stream(3, ErrorCode::Cancel, &mut conn).unwrap();
}
assert!(state.get_stream_ref(3)
.map(|stream| {
stream.errors.len() == 1 && stream.errors[0] == ErrorCode::Cancel
})
.unwrap());
assert!(state.get_stream_ref(1).map(|stream| stream.errors.len() == 0).unwrap());
}
#[test]
fn test_client_session_on_goaway() {
let mut state = DefaultSessionState::<ClientMarker, TestStream>::new();
let mut conn = build_mock_http_conn();
let mut sender = MockSendFrame::new();
let res = {
let mut session = ClientSession::new(&mut state, &mut sender);
session.on_goaway(0, ErrorCode::ProtocolError, None, &mut conn)
};
if let Err(HttpError::PeerConnectionError(err)) = res {
assert_eq!(err.error_code(), ErrorCode::ProtocolError);
assert_eq!(err.debug_data(), None);
} else {
panic!("Expected a PeerConnectionError");
}
}
#[test]
fn test_write_preface() {
let mut written: Vec<u8> = Vec::new();
write_preface(&mut written).unwrap();
let preface = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
let frames_buf = &written[preface.len()..];
assert_eq!(preface, &written[..preface.len()]);
let raw = RawFrame::parse(frames_buf).unwrap();
let frame: SettingsFrame = Frame::from_raw(&raw).unwrap();
assert!(!frame.is_ack());
}
}