From af6491cd595269dee6f92753fbf02af1f6918fac Mon Sep 17 00:00:00 2001 From: Drake Tetreault Date: Thu, 16 Jan 2020 17:27:52 -0800 Subject: [PATCH] Remove unsafe usage by taking advantage of new Decoder::unwrap function. --- src/lib.rs | 1 + src/pool.rs | 29 +++++----------- src/response.rs | 88 +++++++++++-------------------------------------- src/stream.rs | 21 ++++++++++++ 4 files changed, 50 insertions(+), 89 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ece4c56..e922c11 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +#![forbid(unsafe_code)] #![warn(clippy::all)] //! ureq is a minimal request library. //! diff --git a/src/pool.rs b/src/pool.rs index 995c2a1..0dce5ca 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::io::{Read, Result as IoResult}; -use crate::stream::Stream; +use crate::stream::{ReclaimStream, Stream}; use crate::unit::Unit; use url::Url; @@ -74,40 +74,27 @@ impl PoolKey { /// read is exhausted (reached a 0). /// /// *Internal API* -pub(crate) struct PoolReturnRead { +pub(crate) struct PoolReturnRead { // unit that contains the agent where we want to return the reader. unit: Option, - // pointer to underlying stream. - // this pointer forces the entire PoolReturnRead to be !Sync and !Send - // that's a good thing, because the pool return logic is certainly not - // thread safe. - stream: *mut Stream, // wrapped reader around the same stream reader: Option, } -impl PoolReturnRead { - pub fn new(unit: Option, stream: *mut Stream, reader: R) -> Self { +impl PoolReturnRead { + pub fn new(unit: Option, reader: R) -> Self { PoolReturnRead { unit, - stream, reader: Some(reader), } } fn return_connection(&mut self) { // guard we only do this once. - if let Some(unit) = self.unit.take() { - // this frees up the wrapper type around the Stream so - // we can safely bring the stream pointer back. - self.reader.take(); - if self.stream.is_null() { - return; - } + if let (Some(unit), Some(reader)) = (self.unit.take(), self.reader.take()) { let state = &mut unit.agent.lock().unwrap(); // bring back stream here to either go into pool or dealloc - let stream = unsafe { *Box::from_raw(self.stream) }; - self.stream = ::std::ptr::null_mut(); + let stream = reader.reclaim_stream(); if let Some(agent) = state.as_mut() { if !stream.is_poolable() { // just let it deallocate @@ -128,7 +115,7 @@ impl PoolReturnRead { } } -impl Read for PoolReturnRead { +impl Read for PoolReturnRead { fn read(&mut self, buf: &mut [u8]) -> IoResult { let amount = self.do_read(buf)?; // only if the underlying reader is exhausted can we send a new @@ -140,7 +127,7 @@ impl Read for PoolReturnRead { } } -impl Drop for PoolReturnRead { +impl Drop for PoolReturnRead { fn drop(&mut self) { self.return_connection(); } diff --git a/src/response.rs b/src/response.rs index d664ce9..6dcf5e1 100644 --- a/src/response.rs +++ b/src/response.rs @@ -6,7 +6,7 @@ use chunked_transfer::Decoder as ChunkDecoder; use crate::error::Error; use crate::header::Header; use crate::pool::PoolReturnRead; -use crate::stream::Stream; +use crate::stream::{ReclaimStream, Stream}; use crate::unit::Unit; #[cfg(feature = "json")] @@ -304,29 +304,17 @@ impl Response { .and_then(|l| l.parse::().ok()) }; - let stream = Box::new(self.stream.expect("No reader in response?!")); - let stream_ptr = Box::into_raw(stream); - let mut reclaiming_read = ReclaimingRead { - stream: stream_ptr, - dealloc: false, - }; + let stream = self.stream.expect("No reader in response?!"); let unit = self.unit; match (use_chunked, limit_bytes) { - (true, _) => Box::new(PoolReturnRead::new( - unit, - stream_ptr, - ChunkDecoder::new(reclaiming_read), - )) as Box, - (false, Some(len)) => Box::new(PoolReturnRead::new( - unit, - stream_ptr, - LimitedRead::new(reclaiming_read, len), - )), - (false, None) => { - reclaiming_read.dealloc = true; // dealloc when read drops. - Box::new(reclaiming_read) + (true, _) => { + Box::new(PoolReturnRead::new(unit, ChunkDecoder::new(stream))) as Box } + (false, Some(len)) => { + Box::new(PoolReturnRead::new(unit, LimitedRead::new(stream, len))) + } + (false, None) => Box::new(stream), } } @@ -591,57 +579,15 @@ fn read_next_line(reader: &mut R) -> IoResult { } } -/// Read Wrapper around an (unsafe) pointer to a Stream. -/// -/// *Internal API* -/// -/// The reason for this is that we wrap our reader in `ChunkDecoder::new` and -/// that api provides no way for us to get the underlying stream back. We need -/// to get the stream both for sending responses and for pooling. -pub(crate) struct ReclaimingRead { - // this pointer forces ReclaimingRead to be !Send and !Sync. That's a good - // thing, cause passing this reader around threads would not be safe. - stream: *mut Stream, - dealloc: bool, // whether we are to dealloc stream on drop -} - -impl Read for ReclaimingRead { - fn read(&mut self, buf: &mut [u8]) -> IoResult { - unsafe { - if self.stream.is_null() { - return Ok(0); - } - let amount = (*self.stream).read(buf)?; - if amount == 0 { - if self.dealloc { - let _stream = Box::from_raw(self.stream); - } - self.stream = ::std::ptr::null_mut(); - } - Ok(amount) - } - } -} - -impl Drop for ReclaimingRead { - fn drop(&mut self) { - if self.dealloc && !self.stream.is_null() { - unsafe { - let _stream = Box::from_raw(self.stream); - } - } - } -} - -/// Limits a ReclaimingRead to a content size (as set by a "Content-Length" header). -struct LimitedRead { - reader: ReclaimingRead, +/// Limits a `Read` to a content size (as set by a "Content-Length" header). +struct LimitedRead { + reader: R, limit: usize, position: usize, } -impl LimitedRead { - fn new(reader: ReclaimingRead, limit: usize) -> Self { +impl LimitedRead { + fn new(reader: R, limit: usize) -> Self { LimitedRead { reader, limit, @@ -650,7 +596,7 @@ impl LimitedRead { } } -impl Read for LimitedRead { +impl Read for LimitedRead { fn read(&mut self, buf: &mut [u8]) -> IoResult { let left = self.limit - self.position; if left == 0 { @@ -671,6 +617,12 @@ impl Read for LimitedRead { } } +impl ReclaimStream for LimitedRead { + fn reclaim_stream(self) -> Stream { + self.reader.reclaim_stream() + } +} + /// Extract the charset from a "Content-Type" header. /// /// "Content-Type: text/plain; charset=iso8859-1" -> "iso8859-1" diff --git a/src/stream.rs b/src/stream.rs index 4b57673..294864c 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -5,6 +5,8 @@ use std::net::ToSocketAddrs; use std::time::Duration; use std::time::Instant; +use chunked_transfer::Decoder as ChunkDecoder; + #[cfg(feature = "tls")] use rustls::ClientSession; #[cfg(feature = "tls")] @@ -119,6 +121,25 @@ impl Read for Stream { } #[cfg(all(feature = "tls", not(feature = "native-tls")))] +pub(crate) trait ReclaimStream { + fn reclaim_stream(self) -> Stream; +} + +impl ReclaimStream for Stream { + fn reclaim_stream(self) -> Stream { + self + } +} + +impl ReclaimStream for ChunkDecoder +where + R: Read, +{ + fn reclaim_stream(self) -> Stream { + self.into_inner().reclaim_stream() + } +} + fn read_https( stream: &mut StreamOwned, buf: &mut [u8],