Remove unsafe usage by taking advantage of new Decoder::unwrap function.

This commit is contained in:
Drake Tetreault
2020-01-16 17:27:52 -08:00
committed by Martin Algesten
parent 787b11b1e4
commit af6491cd59
4 changed files with 50 additions and 89 deletions

View File

@@ -1,3 +1,4 @@
#![forbid(unsafe_code)]
#![warn(clippy::all)] #![warn(clippy::all)]
//! ureq is a minimal request library. //! ureq is a minimal request library.
//! //!

View File

@@ -1,7 +1,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::io::{Read, Result as IoResult}; use std::io::{Read, Result as IoResult};
use crate::stream::Stream; use crate::stream::{ReclaimStream, Stream};
use crate::unit::Unit; use crate::unit::Unit;
use url::Url; use url::Url;
@@ -74,40 +74,27 @@ impl PoolKey {
/// read is exhausted (reached a 0). /// read is exhausted (reached a 0).
/// ///
/// *Internal API* /// *Internal API*
pub(crate) struct PoolReturnRead<R: Read + Sized> { pub(crate) struct PoolReturnRead<R: Read + Sized + ReclaimStream> {
// unit that contains the agent where we want to return the reader. // unit that contains the agent where we want to return the reader.
unit: Option<Unit>, unit: Option<Unit>,
// pointer to underlying stream.
// this pointer forces the entire PoolReturnRead to be !Sync and !Send
// that's a good thing, because the pool return logic is certainly not
// thread safe.
stream: *mut Stream,
// wrapped reader around the same stream // wrapped reader around the same stream
reader: Option<R>, reader: Option<R>,
} }
impl<R: Read + Sized> PoolReturnRead<R> { impl<R: Read + Sized + ReclaimStream> PoolReturnRead<R> {
pub fn new(unit: Option<Unit>, stream: *mut Stream, reader: R) -> Self { pub fn new(unit: Option<Unit>, reader: R) -> Self {
PoolReturnRead { PoolReturnRead {
unit, unit,
stream,
reader: Some(reader), reader: Some(reader),
} }
} }
fn return_connection(&mut self) { fn return_connection(&mut self) {
// guard we only do this once. // guard we only do this once.
if let Some(unit) = self.unit.take() { if let (Some(unit), Some(reader)) = (self.unit.take(), self.reader.take()) {
// this frees up the wrapper type around the Stream so
// we can safely bring the stream pointer back.
self.reader.take();
if self.stream.is_null() {
return;
}
let state = &mut unit.agent.lock().unwrap(); let state = &mut unit.agent.lock().unwrap();
// bring back stream here to either go into pool or dealloc // bring back stream here to either go into pool or dealloc
let stream = unsafe { *Box::from_raw(self.stream) }; let stream = reader.reclaim_stream();
self.stream = ::std::ptr::null_mut();
if let Some(agent) = state.as_mut() { if let Some(agent) = state.as_mut() {
if !stream.is_poolable() { if !stream.is_poolable() {
// just let it deallocate // just let it deallocate
@@ -128,7 +115,7 @@ impl<R: Read + Sized> PoolReturnRead<R> {
} }
} }
impl<R: Read + Sized> Read for PoolReturnRead<R> { impl<R: Read + Sized + ReclaimStream> Read for PoolReturnRead<R> {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> { fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
let amount = self.do_read(buf)?; let amount = self.do_read(buf)?;
// only if the underlying reader is exhausted can we send a new // only if the underlying reader is exhausted can we send a new
@@ -140,7 +127,7 @@ impl<R: Read + Sized> Read for PoolReturnRead<R> {
} }
} }
impl<R: Read + Sized> Drop for PoolReturnRead<R> { impl<R: Read + Sized + ReclaimStream> Drop for PoolReturnRead<R> {
fn drop(&mut self) { fn drop(&mut self) {
self.return_connection(); self.return_connection();
} }

View File

@@ -6,7 +6,7 @@ use chunked_transfer::Decoder as ChunkDecoder;
use crate::error::Error; use crate::error::Error;
use crate::header::Header; use crate::header::Header;
use crate::pool::PoolReturnRead; use crate::pool::PoolReturnRead;
use crate::stream::Stream; use crate::stream::{ReclaimStream, Stream};
use crate::unit::Unit; use crate::unit::Unit;
#[cfg(feature = "json")] #[cfg(feature = "json")]
@@ -304,29 +304,17 @@ impl Response {
.and_then(|l| l.parse::<usize>().ok()) .and_then(|l| l.parse::<usize>().ok())
}; };
let stream = Box::new(self.stream.expect("No reader in response?!")); let stream = self.stream.expect("No reader in response?!");
let stream_ptr = Box::into_raw(stream);
let mut reclaiming_read = ReclaimingRead {
stream: stream_ptr,
dealloc: false,
};
let unit = self.unit; let unit = self.unit;
match (use_chunked, limit_bytes) { match (use_chunked, limit_bytes) {
(true, _) => Box::new(PoolReturnRead::new( (true, _) => {
unit, Box::new(PoolReturnRead::new(unit, ChunkDecoder::new(stream))) as Box<dyn Read>
stream_ptr,
ChunkDecoder::new(reclaiming_read),
)) as Box<dyn Read>,
(false, Some(len)) => Box::new(PoolReturnRead::new(
unit,
stream_ptr,
LimitedRead::new(reclaiming_read, len),
)),
(false, None) => {
reclaiming_read.dealloc = true; // dealloc when read drops.
Box::new(reclaiming_read)
} }
(false, Some(len)) => {
Box::new(PoolReturnRead::new(unit, LimitedRead::new(stream, len)))
}
(false, None) => Box::new(stream),
} }
} }
@@ -591,57 +579,15 @@ fn read_next_line<R: Read>(reader: &mut R) -> IoResult<String> {
} }
} }
/// Read Wrapper around an (unsafe) pointer to a Stream. /// Limits a `Read` to a content size (as set by a "Content-Length" header).
/// struct LimitedRead<R> {
/// *Internal API* reader: R,
///
/// The reason for this is that we wrap our reader in `ChunkDecoder::new` and
/// that api provides no way for us to get the underlying stream back. We need
/// to get the stream both for sending responses and for pooling.
pub(crate) struct ReclaimingRead {
// this pointer forces ReclaimingRead to be !Send and !Sync. That's a good
// thing, cause passing this reader around threads would not be safe.
stream: *mut Stream,
dealloc: bool, // whether we are to dealloc stream on drop
}
impl Read for ReclaimingRead {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
unsafe {
if self.stream.is_null() {
return Ok(0);
}
let amount = (*self.stream).read(buf)?;
if amount == 0 {
if self.dealloc {
let _stream = Box::from_raw(self.stream);
}
self.stream = ::std::ptr::null_mut();
}
Ok(amount)
}
}
}
impl Drop for ReclaimingRead {
fn drop(&mut self) {
if self.dealloc && !self.stream.is_null() {
unsafe {
let _stream = Box::from_raw(self.stream);
}
}
}
}
/// Limits a ReclaimingRead to a content size (as set by a "Content-Length" header).
struct LimitedRead {
reader: ReclaimingRead,
limit: usize, limit: usize,
position: usize, position: usize,
} }
impl LimitedRead { impl<R> LimitedRead<R> {
fn new(reader: ReclaimingRead, limit: usize) -> Self { fn new(reader: R, limit: usize) -> Self {
LimitedRead { LimitedRead {
reader, reader,
limit, limit,
@@ -650,7 +596,7 @@ impl LimitedRead {
} }
} }
impl Read for LimitedRead { impl<R: Read> Read for LimitedRead<R> {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> { fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
let left = self.limit - self.position; let left = self.limit - self.position;
if left == 0 { if left == 0 {
@@ -671,6 +617,12 @@ impl Read for LimitedRead {
} }
} }
impl<R: ReclaimStream> ReclaimStream for LimitedRead<R> {
fn reclaim_stream(self) -> Stream {
self.reader.reclaim_stream()
}
}
/// Extract the charset from a "Content-Type" header. /// Extract the charset from a "Content-Type" header.
/// ///
/// "Content-Type: text/plain; charset=iso8859-1" -> "iso8859-1" /// "Content-Type: text/plain; charset=iso8859-1" -> "iso8859-1"

View File

@@ -5,6 +5,8 @@ use std::net::ToSocketAddrs;
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use chunked_transfer::Decoder as ChunkDecoder;
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
use rustls::ClientSession; use rustls::ClientSession;
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
@@ -119,6 +121,25 @@ impl Read for Stream {
} }
#[cfg(all(feature = "tls", not(feature = "native-tls")))] #[cfg(all(feature = "tls", not(feature = "native-tls")))]
pub(crate) trait ReclaimStream {
fn reclaim_stream(self) -> Stream;
}
impl ReclaimStream for Stream {
fn reclaim_stream(self) -> Stream {
self
}
}
impl<R: ReclaimStream> ReclaimStream for ChunkDecoder<R>
where
R: Read,
{
fn reclaim_stream(self) -> Stream {
self.into_inner().reclaim_stream()
}
}
fn read_https( fn read_https(
stream: &mut StreamOwned<ClientSession, TcpStream>, stream: &mut StreamOwned<ClientSession, TcpStream>,
buf: &mut [u8], buf: &mut [u8],