1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
//! The module exposes an API for defining data prioritization strategies.
//!
//! Types that implement the `DataPrioritizer` trait can be used to provide new data for an
//! `HttpConnection` to send to its peer. Neither the `HttpConnection` nor the `DataPrioritizer`
//! has control over exactly *when* the data is sent; triggering the send is left up to the
//! particular client implementation.

use http::{HttpResult, HttpError};
use http::connection::{DataChunk, EndStream};
use http::session::{SessionState, StreamDataChunk, StreamDataError, Stream};

/// A trait that types that want to provide data to an HTTP/2 connection need to implement.
///
/// Implementors decide which data should go out next; they are asked for one chunk per call.
pub trait DataPrioritizer {
    /// Returns the next `DataChunk` that should be sent on the HTTP/2 connection.
    ///
    /// `Ok(None)` indicates that there is currently no data that could be sent. An `Err` is
    /// returned when the implementation fails to produce a chunk; which conditions count as a
    /// failure is up to the implementor.
    fn get_next_chunk(&mut self) -> HttpResult<Option<DataChunk>>;
}

/// An implementation of the `DataPrioritizer` trait that is based on finding the first stream from
/// the given `SessionState` instance that can send data and returning this chunk.
///
/// For all intents and purposes, the order of data chunks that the prioritizer returns is
/// undefined and should not be relied on.
///
/// The prioritizer owns neither the session state nor the buffer: both are mutably borrowed for
/// as long as the prioritizer is alive (lifetimes `'a` and `'b`, respectively).
pub struct SimplePrioritizer<'a, 'b, State>
    where State: SessionState + 'a
{
    /// The session state from which the streams' data will be taken.
    state: &'a mut State,
    /// The buffer into which the prioritizer can place the stream data chunk. A returned chunk
    /// borrows its bytes from the front of this buffer.
    buf: &'b mut [u8],
}

impl<'a, 'b, State> SimplePrioritizer<'a, 'b, State>
    where State: SessionState + 'a
{
    /// Builds a `SimplePrioritizer` on top of the given session state and scratch buffer.
    ///
    /// The streams found in `state` are the source of the outgoing data, while `buf` is the
    /// storage that holds the data of whichever chunk the prioritizer returns.
    pub fn new(state: &'a mut State, buf: &'b mut [u8]) -> Self {
        SimplePrioritizer { state: state, buf: buf }
    }
}

impl<'a, 'b, State> DataPrioritizer for SimplePrioritizer<'a, 'b, State>
    where State: SessionState + 'a
{
    /// Walks over the session's streams and returns a chunk from the first one that yields data.
    ///
    /// Streams that are already closed on the local end are skipped. A stream that reports
    /// `StreamDataError::Closed` is marked as locally closed and the search continues with the
    /// next stream. Any other stream error aborts the search and is returned wrapped in
    /// `HttpError::Other`.
    ///
    /// `Ok(None)` means that no stream had anything to send. A returned chunk borrows its bytes
    /// from the prioritizer's buffer.
    fn get_next_chunk(&mut self) -> HttpResult<Option<DataChunk>> {
        for (stream_id, stream) in self.state.iter() {
            // Never ask a stream for data once its local end is closed.
            if stream.is_closed_local() {
                continue;
            }

            let outcome = stream.get_data_chunk(self.buf);
            // Reduce the outcome to "how many bytes were placed in the buffer and whether they
            // end the stream"; every other outcome either moves on to the next stream or bails.
            let (len, end_stream) = match outcome {
                Ok(StreamDataChunk::Last(len)) => (len, EndStream::Yes),
                Ok(StreamDataChunk::Chunk(len)) => (len, EndStream::No),
                // The stream is still open, but has nothing to send right now.
                Ok(StreamDataChunk::Unavailable) => continue,
                Err(StreamDataError::Closed) => {
                    // Record the local close so that no further writes are attempted on this
                    // stream, then keep looking for a stream that does have data.
                    stream.close_local();
                    continue;
                }
                // Anything else is considered fatal.
                Err(StreamDataError::Other(e)) => return Err(HttpError::Other(e)),
            };

            return Ok(Some(DataChunk::new_borrowed(&self.buf[..len], *stream_id, end_stream)));
        }

        // Every stream was either skipped or had no data available.
        Ok(None)
    }
}

#[cfg(test)]
mod tests {
    use super::{DataPrioritizer, SimplePrioritizer};
    use http::session::{DefaultSessionState, SessionState};
    use http::session::Client as ClientMarker;

    use http::tests::common::TestStream;

    /// Exercises the `SimplePrioritizer` against sessions with zero, one and several streams.
    #[test]
    fn test_simple_prioritizer() {
        // Builds a client session holding `count` streams, each with `[1, 2, 3]` queued up as
        // its outgoing data.
        fn session_with_streams(count: usize) -> DefaultSessionState<ClientMarker, TestStream> {
            let mut session = DefaultSessionState::<ClientMarker, TestStream>::new();
            for _ in 0..count {
                let mut stream = TestStream::new();
                stream.set_outgoing(vec![1, 2, 3]);
                session.insert_outgoing(stream);
            }
            session
        }

        // A session without any streams never produces a chunk.
        {
            let mut buf = [0; 5];
            let mut state = session_with_streams(0);
            let mut prioritizer = SimplePrioritizer::new(&mut state, &mut buf);

            assert!(prioritizer.get_next_chunk().unwrap().is_none());
        }

        // A single stream whose data fits into the buffer: exactly one chunk.
        {
            let mut buf = [0; 5];
            let mut state = session_with_streams(1);
            let mut prioritizer = SimplePrioritizer::new(&mut state, &mut buf);

            {
                let only = prioritizer.get_next_chunk().unwrap().unwrap();
                assert_eq!(only.data, vec![1, 2, 3]);
            }

            // The stream has been drained.
            assert!(prioritizer.get_next_chunk().unwrap().is_none());
        }

        // A single stream whose data is larger than the buffer: two chunks.
        {
            let mut buf = [0; 2];
            let mut state = session_with_streams(1);
            let mut prioritizer = SimplePrioritizer::new(&mut state, &mut buf);

            {
                let first = prioritizer.get_next_chunk().unwrap().unwrap();
                assert_eq!(first.data, vec![1, 2]);
            }
            {
                let second = prioritizer.get_next_chunk().unwrap().unwrap();
                assert_eq!(second.data, vec![3]);
            }

            // The stream has been drained.
            assert!(prioritizer.get_next_chunk().unwrap().is_none());
        }

        // Several streams: one chunk per stream, in no particular stream order.
        {
            let mut buf = [0; 10];
            let mut state = session_with_streams(3);

            for _ in 0..3 {
                {
                    let mut prioritizer = SimplePrioritizer::new(&mut state, &mut buf);
                    let chunk = prioritizer.get_next_chunk().unwrap().unwrap();
                    assert_eq!(chunk.data, vec![1, 2, 3]);
                }
                // Wipe the buffer so that leftovers of the previous (identical) chunk cannot
                // make the next assertion pass by accident.
                for byte in buf.iter_mut() {
                    *byte = 0;
                }
            }

            // All three streams have been drained.
            let mut prioritizer = SimplePrioritizer::new(&mut state, &mut buf);
            assert!(prioritizer.get_next_chunk().unwrap().is_none());
        }
    }
}