1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
//! Contains an implementation of an asynchronous client.
//!
//! It allows users to make requests to the same underlying connection from
//! different threads concurrently, as well as to receive the response
//! asynchronously.
use std::collections::HashMap;

use std::fmt;
use std::io;
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread;

use http::{StreamId, HttpError, Response, StaticResponse, Header, HttpResult, StaticHeader};
use http::frame::{RawFrame, FrameIR};
use http::transport::TransportStream;
use http::connection::{SendFrame, ReceiveFrame, HttpFrame, HttpConnection};
use http::session::{SessionState, DefaultSessionState, DefaultStream, Stream};
use http::session::Client as ClientMarker;
use http::client::{ClientConnection, HttpConnect, HttpConnectError, ClientStream, RequestStream};

/// Like `thread::spawn`, but with a `name` argument
pub fn spawn_named<F, T, S>(name: S, f: F) -> thread::JoinHandle<T>
    where F: FnOnce() -> T,
          F: Send + 'static,
          T: Send + 'static,
          S: Into<String>
{
    thread::Builder::new().name(name.into()).spawn(f).expect("spawn thread")
}

/// A struct representing an asynchronously dispatched request. It is used
/// internally be the `ClientService` and `Client` structs.
struct AsyncRequest {
    /// The method of the request
    pub method: Vec<u8>,
    /// The path being requested
    pub path: Vec<u8>,
    /// Extra headers that should be included in the request. Does *not*
    /// include meta-headers.
    pub headers: Vec<StaticHeader>,
    /// The body of the request, if any.
    pub body: Option<Vec<u8>>,
    /// The sender side of a channel where the response to this request should
    /// be delivered.
    tx: Sender<StaticResponse>,
}

/// A struct that buffers `RawFrame`s in an internal `mpsc` channel and sends them using the
/// wrapped `SendFrame` instance when the `send_next` method is called.
///
/// Additionally, it provides a `ChannelFrameSenderHandle` instance that implements the `SendFrame`
/// trait and as such can be passed to the `HttpConnection`. This handler simply queues the frame
/// into the internal channel, without ever blocking.
///
/// As such, this is a convenience struct that makes it possible to provide non-blocking writes
/// from within `HttpConnection`s, while handling the actual writes using a `SendFrame`
/// implementation that will block until the frame is sent on a separate thread.
struct ChannelFrameSender<S>
    where S: SendFrame
{
    /// The receiving end of the channel. Buffers the frames that are to be sent.
    rx: Receiver<Vec<u8>>,
    /// The `SendFrame` instance that will perform the actual writes from within the `send_next`
    /// method.
    inner: S,
}

impl<S> ChannelFrameSender<S>
    where S: SendFrame
{
    /// Creates a new `ChannelFrameSender` that will use the provided `SendFrame` instance within
    /// the `send_next` method in order to perform the final send to the remote peer.
    /// The `ChannelFrameSenderHandle` that is returned can be used to queue frames for sending
    /// from within `HttpConnection`s, as it implements the `SendFrame` trait.
    fn new(inner: S) -> (ChannelFrameSender<S>, ChannelFrameSenderHandle) {
        let (send, recv) = mpsc::channel();

        let handle = ChannelFrameSenderHandle { tx: send };
        let sender = ChannelFrameSender {
            rx: recv,
            inner: inner,
        };
        (sender, handle)
    }

    /// Performs the send of the next frame that is buffered in the internal channel of the struct.
    ///
    /// If there is no frame in the channel, it will block until there is one there.
    ///
    /// If the channel becomes disconnected from all senders, indicating that all handles to the
    /// sender have been dropped, the mehod will return an error.
    fn send_next(&mut self) -> HttpResult<()> {
        let frame_buffer = try!(self.rx
                                    .recv()
                                    .map_err(|_| {
                                        io::Error::new(io::ErrorKind::Other, "Unable to send frame")
                                    }));
        debug!("Performing the actual send frame IO");
        let raw_frame: RawFrame = frame_buffer.into();
        try!(self.inner.send_frame(raw_frame));
        Ok(())
    }
}

/// A handle to the `ChannelFrameSender` and an implementation of the `SendFrame` trait. It simply
/// queues the given frames into the send queue of the `ChannelFrameSender` without ever blocking.
/// (Except possibly to allocate some memory, as per the `mpsc::channel` specification.)
struct ChannelFrameSenderHandle {
    /// The sender side of the channel that buffers the frames to be written. Allows the handle to
    /// queue the frame for future writing without blocking on the IO.
    tx: Sender<Vec<u8>>,
}

impl SendFrame for ChannelFrameSenderHandle {
    fn send_frame<F: FrameIR>(&mut self, frame: F) -> HttpResult<()> {
        let mut buf = io::Cursor::new(Vec::with_capacity(1024));
        try!(frame.serialize_into(&mut buf));
        try!(self.tx
                 .send(buf.into_inner())
                 .map_err(|_| io::Error::new(io::ErrorKind::Other, "Unable to send frame")));
        debug!("Queued the frame for sending...");
        Ok(())
    }
}

/// A struct that buffers `HttpFrame`s read by the wrapped `ReceiveFrame` instance in an internal
/// `mpsc` channel. The reads from the wrapped `ReceiveFrame` instance are triggered by calls to
/// the `read_next` method.
///
/// Additionally, it provides a `ChannelFrameReceiverHandle` instance that implements the
/// `ReceiveFrame` trait, such that it pops the next available frame from the internal channel.
/// If there are no available frames, it will block, so care must be taken to trigger the
/// connection's frame handling only when there are buffered frames, if it is not to block.
///
/// As such, this is a convenience struct that makes it possible to provide non-blocking reads
/// from within `HttpConnection`s, while handling the actual reads using a `ReceiveFrame`
/// implementation that can block. (Predicated on triggering a single frame handle operation on
/// the connection for each successfully executed `read_next`.)
struct ChannelFrameReceiver<TS>
    where TS: TransportStream
{
    /// The sender side of the channel. Buffers the frames read by the wrapped `ReceiveFrame`
    /// instance for future consumation by the associated `ChannelFrameReceiverHandle`.
    tx: Sender<RawFrame<'static>>,
    /// The `ReceiveFrame` instance that performs the actual reading of the frame, used from within
    /// the `read_next` method.
    inner: TS,
}

use http::frame::unpack_header;
impl<TS> ChannelFrameReceiver<TS>
    where TS: TransportStream
{
    /// Creates a new `ChannelFrameReceiver`, as well as the associated
    /// `ChannelFrameReceiverHandle`.
    fn new(inner: TS) -> (ChannelFrameReceiver<TS>, ChannelFrameReceiverHandle) {
        let (send, recv) = mpsc::channel();

        let handle = ChannelFrameReceiverHandle {
            rx: recv,
            raw: None,
        };
        let receiver = ChannelFrameReceiver {
            tx: send,
            inner: inner,
        };
        (receiver, handle)
    }

    /// Performs a `recv_frame` operation on the wrapped `ReceiveFrame` instance, possibly blocking
    /// the thread in the process, depending on the implementation of the trait. Once a frame is
    /// returned, it will buffer it within the internal channel.
    fn read_next(&mut self) -> HttpResult<()> {
        let mut header = [0; 9];
        try!(TransportStream::read_exact(&mut self.inner, &mut header));
        let total_len = unpack_header(&header).0 as usize;
        let mut buf = Vec::with_capacity(9 + total_len);
        unsafe {
            buf.set_len(9 + total_len);
        }
        try!(io::copy(&mut &header[..], &mut &mut buf[..9]));
        try!(TransportStream::read_exact(&mut self.inner, &mut buf[9..]));
        try!(self.tx
                 .send(buf.into())
                 .map_err(|_| io::Error::new(io::ErrorKind::Other, "Unable to read frame")));
        Ok(())
    }
}

/// A handle to the `ChannelFrameReceiver` and an implementation of the `ReceiveFrame` trait.
/// It simply pops the next frame from the internal channel that buffers the frames read by the
/// `ReceiveFrame` instance wrapped by the associated `ChannelFrameReceiver`. If there are no
/// frames currently buffered, it blocks until there is one. Therefore, the `handle_next_frame`
/// method of the `HttpConnection` that relies on the IO provided by this `ReceiveFrame`
/// implementation should be triggered only when sure that there are buffered frames, if blocking
/// handles are to be avoided.
struct ChannelFrameReceiverHandle {
    /// The receiver end of the channel that buffers the received frames.
    rx: Receiver<RawFrame<'static>>,
    raw: Option<RawFrame<'static>>,
}

impl ReceiveFrame for ChannelFrameReceiverHandle {
    fn recv_frame(&mut self) -> HttpResult<HttpFrame> {
        let raw = try!(self.rx
                           .recv()
                           .map_err(|_| {
                               HttpError::from(io::Error::new(io::ErrorKind::Other,
                                                              "Unable to read frame"))
                           }));
        // Tethers the lifetime of the returned parsed HttpFrame to the lifetime of `self` (i.e.
        // the provider of the frame).
        self.raw = Some(raw);
        HttpFrame::from_raw(self.raw.as_ref().unwrap())
    }
}

/// An enum that represents errors that can be raised by the operation of a
/// `ClientService`.
enum ClientServiceErr {
    /// Corresponds to the case where the service has finished its operation.
    Done,
    /// Corresponds to the case where the service is unable to continue due to
    /// an error that occurred on the underlying HTTP/2 connection.
    Http(HttpError),
}

impl From<HttpError> for ClientServiceErr {
    fn from(err: HttpError) -> ClientServiceErr {
        ClientServiceErr::Http(err)
    }
}

/// An enum representing the types of work that the `ClientService` can perform from within its
/// `run_once` method.
enum WorkItem {
    /// Queue a new request to the HTTP/2 connection.
    Request(AsyncRequest),
    /// Trigger a new `handle_next_frame`. The work item should be queued only when there is a
    /// frame to be handled to avoid blocking the `run_once` call.
    HandleFrame,
    /// Trigger a new `send_next_data` operation.
    SendData,
    /// Signals to the service that a new client is connected. Helps it keep track of whether there
    /// are clients that would expect a response.
    NewClient,
    /// Signals to the service that a client has disconnected. Helps it keep track of whether there
    /// are clients that would expect a response.
    ClientLeft,
    /// Send a PING frame to the server
    SendPing,
}

/// An internal struct encapsulating a service that lets multiple clients
/// issue concurrent requests to the same HTTP/2 connection.
///
/// The service maintains an internal queue of `WorkItem`s that indicate what the operations that
/// it should perform. The next operation from the queue is performed on each `run_once` method
/// call.
///
/// It handles issuing new requests (corresponding to `WorkItem::Request` work item), handling the
/// next received frame (when indicated by the `WorkItem::HandleFrame`), and tracks the number of
/// connected clients (`run_once` returns an error once there are no more clients connected to the
/// service).
///
/// If there is no work in the queue, the `run_once` method blocks.
///
/// Essentially, this represents a simplified event loop that handles events queued on the work
/// queue (blocking to wait for new work when none is available; does not spin). Therefore, the
/// user of the `ClientService` needs to provide a dedicated thread in which to run the `run_once`
/// event loop handler.
///
/// Additionally, the client needs to make sure to perform the actual socket IO (which is fully
/// blocking, without even timeout support currently in Rust) in threads dedicated for that, by
/// calling the `send_next` or `read_next` methods of the `ChannelFrameSender` or
/// `ChannelFrameReceiver`, which are returned from the `ClientService` constructor.
///
/// TODO: Technically, the `run_once` method could take a `WorkItem`, so a single event loop could
///       dispatch work items to a corresponding service, removing the need for the
///       thread-per-service requirement. However, at that point we're nearing a reimplementation
///       of a real event loop, which is slightly out of scope of the `solicit` library, as
///       imagined; the async client is (for now) supposed to be a proof-of-concept
///       implementation of a high-level async/concurrent HTTP/2 client.
struct ClientService {
    /// The number of requests that have been sent, but are yet unanswered.
    outstanding_reqs: u32,
    /// The limit to the number of requests that can be pending (unanswered,
    /// but sent).
    limit: u32,
    /// The connection that is used for underlying HTTP/2 communication.
    conn: ClientConnection,
    /// The handle allows the service to get the HTTP/2 frame that has been extracted from the data
    /// read from the socket on another thread.
    recv_handle: ChannelFrameReceiverHandle,
    /// The handle allows the service to queue HTTP/2 frames for another thread to push out on a
    /// blocking socket.
    send_handle: ChannelFrameSenderHandle,
    /// A mapping of stream IDs to the sender side of a channel that is
    /// expecting a response to the request that is to arrive on that stream.
    chans: HashMap<StreamId, Sender<StaticResponse>>,
    /// The receiver end of a channel to which work items for the service are
    /// queued. Work items include the variants of the `WorkItem` enum.
    work_queue: Receiver<WorkItem>,
    /// The queue of `AsyncRequest`s that haven't yet been sent to the server.
    request_queue: Vec<AsyncRequest>,
    /// Tracks the number of currently connected clients -- once it reaches 0, the `run_once`
    /// method returns an error.
    client_count: i32,
    /// The name of the host the connection is established to.
    host: Vec<u8>,
    /// Whether the connection has already been initialized.
    initialized: bool,
}

/// A helper wrapper around the components of the `ClientService` that are returned from its
/// constructor.
struct Service<S>(ClientService, Sender<WorkItem>, ChannelFrameReceiver<S>, ChannelFrameSender<S>)
    where S: TransportStream;

impl ClientService {
    /// Creates a new `ClientService` that will use the provided `ClientStream` for its underlying
    /// network communication. A handle is returned for both the read, as well as the write end of
    /// the socket that allows the client that creates the `ClientService` to perform the blocking
    /// IO without influencing the `ClientService` (i.e. without having its `run_once` method
    /// block).
    ///
    /// # Returns
    ///
    /// Returns all the relevant components of the newly created `ClientService`:
    ///
    /// - The `ClientService` itself -- processes events (`WorkItem`s) on each `run_once` call.
    /// - The sender-side of the work queue -- allows `WorkItem`s to be queued into the
    ///   `ClientService`'s simplified event loop.
    /// - The `ChannelFrameReceiver` -- the instance that wraps the actual socket that performs
    ///   the blocking read IO. Allows the caller to block on the IO in a customized manner (e.g.
    ///   in a separate dedicated thread).
    /// - The `ChannelFrameSender` -- the instance that wraps the actual socket that performs the
    ///   blocking write IO. Allows the caller to block on the IO in a customized manner (e.g. in
    ///   a separate thread).
    ///
    /// If no HTTP/2 connection can be established to the given host on the
    /// given port, returns `None`.
    pub fn new<S>(client_stream: ClientStream<S>) -> Service<S>
        where S: TransportStream
    {
        let (tx, rx): (Sender<WorkItem>, Receiver<WorkItem>) = mpsc::channel();
        let ClientStream(stream, scheme, host) = client_stream;

        // Manually split the stream into the write/read ends, so that we can...
        let sender = stream.try_split().unwrap();
        let receiver = stream;
        // ...wrap them into the adapters...
        let (recv_frame, recv_handle) = ChannelFrameReceiver::new(receiver);
        let (send_frame, send_handle) = ChannelFrameSender::new(sender);

        // ...and pass the non-blocking/buffering ends into the `HttpConnect` instead of the
        // blocking socket itself.
        let conn = ClientConnection::with_connection(HttpConnection::new(scheme),
                                                     DefaultSessionState::<ClientMarker, _>::new());

        let service = ClientService {
            outstanding_reqs: 0,
            limit: 3,
            conn: conn,
            chans: HashMap::new(),
            work_queue: rx,
            recv_handle: recv_handle,
            send_handle: send_handle,
            request_queue: Vec::new(),
            client_count: 0,
            host: host.as_bytes().to_vec(),
            initialized: false,
        };

        // Returns the handles to the channel sender/receiver, so that the client can use them to
        // perform the real IO somewhere.
        Service(service, tx, recv_frame, send_frame)
    }

    /// Performs one iteration of the service.
    ///
    /// One iteration corresponds to running the next `WorkItem` that the service
    /// has queued in its `work_queue`. Essentially, this is a poor-man's event
    /// loop implementation. If there is no work queued for the service, it will
    /// *block*, until there is. As such, embedding calls to this method into a
    /// real event loop should not be done.
    ///
    /// For `WorkItem::Request` work items, the service will queue the received
    /// `AsyncRequest` for sending. It will also attempt to queue it for
    /// transmission to the server, unless the concurrent requests limit has been
    /// exceeded, in which case the request is kept in an internal FIFO queue and
    /// will be sent when its time comes.
    ///
    /// For `WorkItem::HandleFrame` work items, the service will perform a single
    /// `handle_next_frame` call on its underlying `ClientConnection` instance.
    /// Since the item is queued only when the connection actually has frames to
    /// process, this call will never block. If a response got finalized by the
    /// handling of the frame, it is shipped to the channel that expects it and
    /// a new request from the request queue sent.
    ///
    /// # Returns
    ///
    /// On a successful pass, the function returns an `Ok(())`.
    ///
    /// The `Err` response is returned when there are no more responses to be
    /// received and there are no more clients connected to the service (and
    /// thus no more requests could ever be issued by the instance). This
    /// corresponds to the `ClientServiceErr::Done` variant.
    ///
    /// Any HTTP/2 error is propagated (wrapped into a ClientServiceErr::Http
    /// variant).
    pub fn run_once(&mut self) -> Result<(), ClientServiceErr> {
        let work_item = match self.work_queue.recv() {
            Ok(item) => item,
            // The receive operation can only fail if the sender has
            // disconnected implying no further receives are possible.
            // At that point, we make sure to gracefully stop the service.
            Err(_) => return Err(ClientServiceErr::Done),
        };

        // Dispatch the work to the corresponding method...
        match work_item {
            WorkItem::Request(async_req) => {
                debug!("Queuing request");
                self.request_queue.push(async_req);
                self.queue_next_request();
                Ok(())
            }
            WorkItem::HandleFrame => {
                if self.initialized {
                    self.handle_frame()
                } else {
                    try!(self.conn.expect_settings(&mut self.recv_handle, &mut self.send_handle));
                    self.initialized = true;
                    Ok(())
                }
            }
            WorkItem::SendData => {
                debug!("Will queue some request data");
                try!(self.conn.send_next_data(&mut self.send_handle));
                Ok(())
            },
            WorkItem::SendPing => {
                self.send_ping();
                Ok(())
            },
            WorkItem::NewClient => {
                self.on_new_client();
                Ok(())
            }
            WorkItem::ClientLeft => {
                self.client_count -= 1;
                if self.client_count == 0 {
                    Err(ClientServiceErr::Done)
                } else {
                    Ok(())
                }
            }
        }
    }

    fn on_new_client(&mut self) {
        self.client_count += 1;
    }

    /// A private convenience method that performs the handling of the next received frame.
    ///
    /// It calls the underlying connection's `handle_next_frame` method and then inspects the
    /// changes made to the session, notifying clients of completed requests or queueing new ones,
    /// if available.
    fn handle_frame(&mut self) -> Result<(), ClientServiceErr> {
        // Handles the next frame...
        debug!("Handling next frame");
        try!(self.conn.handle_next_frame(&mut self.recv_handle, &mut self.send_handle));
        // ...and then any connections that may have been closed in the meantime
        // are converted to responses and notifications sent to appropriate
        // channels.
        self.handle_closed();
        // At this point we try to queue another outstanding request (if the
        // limit has not been reached).
        self.queue_next_request();

        Ok(())
    }

    /// Internal helper method. Sends a request to the server based on the
    /// parameters given in the `AsyncRequest`. It blocks until the request is
    /// fully transmitted to the server.
    fn send_request(&mut self, async_req: AsyncRequest) {
        let (req, tx) = self.create_request(async_req);

        trace!("Sending new request...");

        let stream_id = self.conn.start_request(req, &mut self.send_handle).ok().unwrap();
        // The ID has been assigned to the stream, so attach it to the stream instance too.
        // TODO(mlalic): The `Stream` trait should grow an `on_id_assigned` method which can
        //               then be called by the session (i.e. the `ClientConnection` in this case).
        self.conn.state.get_stream_mut(stream_id).unwrap().stream_id = Some(stream_id);

        self.chans.insert(stream_id, tx);
        self.outstanding_reqs += 1;
    }

    /// Internal helper method. Creates a new `RequestStream` instance based on the
    /// given parameters. Such a `RequestStream` instance is ready to be passed to
    /// the connection for transmission to the server (i.e. `start_request`).
    /// Also returns the sender end of the channel to which the response is to be transmitted,
    /// once received.
    fn create_request(&self,
                      async_req: AsyncRequest)
                      -> (RequestStream<'static, 'static, DefaultStream>,
                          Sender<StaticResponse>) {
        let mut headers: Vec<Header> = Vec::new();
        headers.extend(vec![
            Header::new(b":method", async_req.method),
            Header::new(b":path", async_req.path),
            Header::new(b":authority", self.host.clone()),
            Header::new(b":scheme", self.conn.scheme().as_bytes().to_vec()),
        ]
                           .into_iter());
        headers.extend(async_req.headers.into_iter());

        let mut stream = DefaultStream::new();
        match async_req.body {
            Some(body) => stream.set_full_data(body),
            None => stream.close_local(),
        };

        (RequestStream {
            stream: stream,
            headers: headers,
        },
         async_req.tx)
    }

    /// Internal helper method. Sends a response assembled from the given
    /// stream to the corresponding channel that is waiting for the response.
    ///
    /// The given `stream` instance is consumed by this method.
    fn send_response(&mut self, stream: DefaultStream) {
        let stream_id = stream.stream_id.unwrap();
        match self.chans.remove(&stream_id) {
            None => {
                // This should never happen, it means the session gave us
                // a response that we didn't request.
                panic!("Received a response for an unknown request!");
            }
            Some(tx) => {
                let _ = tx.send(Response {
                    stream_id: stream_id,
                    headers: stream.headers.unwrap(),
                    body: stream.body,
                });
            }
        };
    }

    /// Internal helper method. Handles all closed streams by sending appropriate
    /// notifications to waiting channels.
    ///
    /// For now, the channels are all given a `Response`, even though the
    /// stream might end up being closed by the server with an error.
    fn handle_closed(&mut self) {
        let done = self.conn.state.get_closed();
        for stream in done {
            self.send_response(stream);
            self.outstanding_reqs -= 1;
        }
    }

    /// Internal helper method. If there are yet unsent requests queued by a
    /// client to the service and the service has not exceeded the limit of
    /// concurrent requests that it is allowed to issue, it sends a single
    /// new request to the server. Blocks until this request is sent.
    fn queue_next_request(&mut self) {
        if self.outstanding_reqs < self.limit {
            // Try to queue another request since we haven't gone over
            // the (arbitrary) limit.
            debug!("Not over the limit yet. Checking for more requests...");
            if !self.request_queue.is_empty() {
                let async_req = self.request_queue.remove(0);
                self.send_request(async_req);
            }
        }
    }

    /// Internal helper method to send a PING frame to the server
    fn send_ping(&mut self) {
        self.conn.send_ping(&mut self.send_handle).ok().unwrap();
    }
}

/// A struct representing an HTTP/2 client that receives responses to its
/// requests asynchronously. Additionally, this client can be cloned and all
/// clones can issue (concurrently) requests to the server, using the same
/// underlying HTTP/2 connection.
///
/// # Example
///
/// ```no_run
/// use solicit::client::Client;
/// use solicit::http::client::CleartextConnector;
/// use std::thread;
/// use std::str;
///
/// // Connect to a server that supports HTTP/2
/// let connector = CleartextConnector::new("http2bin.org");
/// let client = Client::with_connector(connector).unwrap();
///
/// // Issue 5 requests from 5 different threads concurrently and wait for all
/// // threads to receive their response.
/// let threads: Vec<_> = (0..5).map(|i| {
///     let this = client.clone();
///     thread::spawn(move || {
///         let resp = this.get(b"/", &[]).unwrap();
///         let response = resp.recv().unwrap();
///         println!("Thread {} got response ... {}", i, response.status_code().ok().unwrap());
///         println!("The response contains the following headers:");
///         for header in response.headers.iter() {
///             println!("  {}: {}",
///                   str::from_utf8(header.name()).unwrap(),
///                   str::from_utf8(header.value()).unwrap());
///         }
///     })
/// }).collect();
///
/// let _: Vec<_> = threads.into_iter().map(|thread| thread.join()).collect();
/// ```
pub struct Client {
    /// The sender side of a channel on which a running `ClientService` expects
    /// to receive new requests, which are to be sent to the server.
    sender: Sender<WorkItem>,
}

impl Clone for Client {
    fn clone(&self) -> Client {
        self.sender.send(WorkItem::NewClient).unwrap();
        Client { sender: self.sender.clone() }
    }
}

impl Drop for Client {
    fn drop(&mut self) {
        let _ = self.sender.send(WorkItem::ClientLeft);
    }
}

impl Client {
    /// Creates a brand new HTTP/2 client. This means that a new HTTP/2
    /// connection will be established behind the scenes. A thread is spawned
    /// to handle the connection in the background, so that the thread that
    /// creates the client can use it asynchronously.
    ///
    /// # Returns
    ///
    /// A `Client` instance that allows access to the underlying HTTP/2
    /// connection on the application level. Only full requests and responses
    /// are exposed to users.
    ///
    /// The returned `Client` can be cloned and all clones will use the same
    /// underlying HTTP/2 connection. Once all cloned instances (as well as the
    /// original one) are dropped, the thread that was spawned will also exit
    /// gracefully. Any error on the underlying HTTP/2 connection also causes
    /// the thread to exit.
    ///
    /// If the HTTP/2 connection cannot be initialized returns `None`.
    pub fn with_connector<C, S, E>(connector: C) -> Result<Client, ClientConnectError<E>>
        where C: HttpConnect<Stream = S, Err = E>,
              S: TransportStream + Send + 'static,
              E: HttpConnectError + 'static
    {
        // Use the provided connector to establish a network connection...
        let client_stream = try!(connector.connect());

        // Keep a socket handle in order to shut it down once the service stops. This is required
        // because if the service decides to stop (due to all clients disconnecting) while the
        // socket is still open and the read thread waiting, it can happen that the read thread
        // (and as such the socket itself) ends up waiting indefinitely (or well, until the server
        // decides to close it), effectively leaking the socket and thread.
        let mut sck = try!(client_stream.0.try_split());

        let service = ClientService::new(client_stream);

        let Service(mut service, rx, mut recv_frame, mut send_frame) = service;

        service.on_new_client();

        // Keep a handle to the work queue to notify the service of newly read frames, making it so
        // that it never blocks on waiting for frames to read.
        let read_notify = rx.clone();
        let sender_work_queue = rx.clone();

        spawn_named("Solicit Service", move || {
            while let Ok(_) = service.run_once() {}
            debug!("Service thread halting");
            // This is the one place where it's okay to unwrap, as if the shutdown fails, there's
            // really nothing we can do to recover at this point...
            // This forces the reader thread to stop, as the socket is no longer operational.
            sck.close().expect("close socket handler");
        });

        spawn_named("Solicit Sender", move || {
            while let Ok(_) = send_frame.send_next() {
                sender_work_queue.send(WorkItem::SendData).unwrap();
            }
            debug!("Sender thread halting");
        });

        spawn_named("Solicit Reader", move || {
            while let Ok(_) = recv_frame.read_next() {
                read_notify.send(WorkItem::HandleFrame).unwrap();
            }
            debug!("Reader thread halting");
        });

        Ok(Client { sender: rx })
    }

    /// Issues a new request to the server.
    ///
    /// The request's method, path, and extra headers are provided as parameters.
    /// The headers should *never* include any meta-headers (such as `:method`).
    ///
    /// # Returns
    ///
    /// The method itself returns immediately upon queuing the request. It does
    /// not wait for the request to be transmitted nor for the response to
    /// arrive. Once the caller is interested in the final response, they can
    /// block on the returned `Receiver` end of a channel which will receive
    /// the response once generated.
    ///
    /// The `Response` instance that the channel receives will contain the full
    /// response body and is available only once the full response body has
    /// been received.
    ///
    /// If the method is unable to queue the request, it must mean that the
    /// underlying HTTP/2 connection to which this client is associated has
    /// failed and it returns `None`.
    pub fn request(&self,
                   method: &[u8],
                   path: &[u8],
                   headers: &[StaticHeader],
                   body: Option<Vec<u8>>)
                   -> Option<Receiver<StaticResponse>> {
        let (resp_tx, resp_rx): (Sender<StaticResponse>, Receiver<StaticResponse>) =
            mpsc::channel();
        // A send can only fail if the receiver is disconnected. If the send
        // fails here, it means that the service hit an error on the underlying
        // HTTP/2 connection and will never come alive again.
        let res = self.sender.send(WorkItem::Request(AsyncRequest {
            method: method.to_vec(),
            path: path.to_vec(),
            headers: headers.to_vec(),
            body: body,
            tx: resp_tx,
        }));

        match res {
            Ok(_) => Some(resp_rx),
            Err(_) => None,
        }
    }

    /// Issues a GET request to the server.
    ///
    /// A convenience wrapper around the `request` method that sets the correct
    /// method.
    pub fn get(&self, path: &[u8], headers: &[StaticHeader]) -> Option<Receiver<StaticResponse>> {
        self.request(b"GET", path, headers, None)
    }

    /// Issues a POST request to the server.
    ///
    /// Returns the receiving end of a channel where the `Response` will eventually be pushed.
    pub fn post(&self,
                path: &[u8],
                headers: &[StaticHeader],
                body: Vec<u8>)
                -> Option<Receiver<StaticResponse>> {
        self.request(b"POST", path, headers, Some(body))
    }

    /// Sends a PING to the server
    pub fn ping(&self) -> Result<(), &'static str> {
        self.sender.send(WorkItem::SendPing).map_err(|_| "Client not available")
    }
}

/// Error that occur when creating/connecting a client
#[derive(Debug)]
pub enum ClientConnectError<E>
    where E: HttpConnectError
{
    /// Some sort of io::Error
    Io(io::Error),

    /// Error from the http connector
    HttpConnector(E),
}

impl<E> ::std::error::Error for ClientConnectError<E>
    where E: HttpConnectError,
{
    fn cause(&self) -> Option<&::std::error::Error> {
        match *self {
            ClientConnectError::Io(ref err) => Some(err),
            ClientConnectError::HttpConnector(ref err) => Some(err),
        }
    }

    fn description(&self) -> &str {
        match *self {
            ClientConnectError::Io(ref err) => err.description(),
            ClientConnectError::HttpConnector(ref err) => err.description(),
        }
    }
}

impl<E> fmt::Display for ClientConnectError<E>
    where E: HttpConnectError
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            ClientConnectError::Io(ref err) => {
                write!(f, "I/O Error during connect: {}", err)
            },
            ClientConnectError::HttpConnector(ref err) => {
                write!(f, "Error during HTTP connect: {}", err)
            },
        }
    }
}

impl<E> From<E> for ClientConnectError<E>
    where E: HttpConnectError
{
    fn from(err: E) -> Self {
        ClientConnectError::HttpConnector(err)
    }
}

impl<E> From<io::Error> for ClientConnectError<E>
    where E: HttpConnectError
{
    fn from(err: io::Error) -> Self {
        ClientConnectError::Io(err)
    }
}