opendal/raw/http_util/
client.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::convert::Infallible;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::future;
22use std::mem;
23use std::ops::Deref;
24use std::pin::Pin;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::task::Context;
28use std::task::Poll;
29
30use bytes::Bytes;
31use futures::Future;
32use futures::TryStreamExt;
33use http::Request;
34use http::Response;
35use http_body::Frame;
36use http_body::SizeHint;
37use raw::oio::Read;
38use std::sync::LazyLock;
39
40use super::parse_content_encoding;
41use super::parse_content_length;
42use super::HttpBody;
43use crate::raw::*;
44use crate::*;
45
46/// Http client used across opendal for loading credentials.
47/// This is merely a temporary solution because reqsign requires a reqwest client to be passed.
48/// We will remove it after the next major version of reqsign, which will enable users to provide their own client.
49#[allow(dead_code)]
50pub(crate) static GLOBAL_REQWEST_CLIENT: LazyLock<reqwest::Client> =
51    LazyLock::new(reqwest::Client::new);
52
53/// HttpFetcher is a type erased [`HttpFetch`].
54pub type HttpFetcher = Arc<dyn HttpFetchDyn>;
55
56/// A HTTP client instance for OpenDAL's services.
57///
58/// # Notes
59///
60/// * A http client must support redirections that follows 3xx response.
61#[derive(Clone)]
62pub struct HttpClient {
63    fetcher: HttpFetcher,
64}
65
66/// We don't want users to know details about our clients.
67impl Debug for HttpClient {
68    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct("HttpClient").finish()
70    }
71}
72
73impl Default for HttpClient {
74    fn default() -> Self {
75        Self {
76            fetcher: Arc::new(GLOBAL_REQWEST_CLIENT.clone()),
77        }
78    }
79}
80
81impl HttpClient {
82    /// Create a new http client in async context.
83    pub fn new() -> Result<Self> {
84        Ok(Self::default())
85    }
86
87    /// Construct `Self` with given [`reqwest::Client`]
88    pub fn with(client: impl HttpFetch) -> Self {
89        let fetcher = Arc::new(client);
90        Self { fetcher }
91    }
92
93    /// Get the inner http client.
94    pub(crate) fn into_inner(self) -> HttpFetcher {
95        self.fetcher
96    }
97
98    /// Build a new http client in async context.
99    #[deprecated]
100    pub fn build(builder: reqwest::ClientBuilder) -> Result<Self> {
101        let client = builder.build().map_err(|err| {
102            Error::new(ErrorKind::Unexpected, "http client build failed").set_source(err)
103        })?;
104        let fetcher = Arc::new(client);
105        Ok(Self { fetcher })
106    }
107
108    /// Send a request and consume response.
109    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
110        let (parts, mut body) = self.fetch(req).await?.into_parts();
111        let buffer = body.read_all().await?;
112        Ok(Response::from_parts(parts, buffer))
113    }
114
115    /// Fetch a request and return a streamable [`HttpBody`].
116    ///
117    /// Services can use [`HttpBody`] as [`Access::Read`].
118    pub async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
119        self.fetcher.fetch(req).await
120    }
121}
122
123/// HttpFetch is the trait to fetch a request in async way.
124/// User should implement this trait to provide their own http client.
125pub trait HttpFetch: Send + Sync + Unpin + 'static {
126    /// Fetch a request in async way.
127    fn fetch(
128        &self,
129        req: Request<Buffer>,
130    ) -> impl Future<Output = Result<Response<HttpBody>>> + MaybeSend;
131}
132
133/// HttpFetchDyn is the dyn version of [`HttpFetch`]
134/// which make it possible to use as `Arc<dyn HttpFetchDyn>`.
135/// User should never implement this trait, but use `HttpFetch` instead.
136pub trait HttpFetchDyn: Send + Sync + Unpin + 'static {
137    /// The dyn version of [`HttpFetch::fetch`].
138    ///
139    /// This function returns a boxed future to make it object safe.
140    fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<Result<Response<HttpBody>>>;
141}
142
143impl<T: HttpFetch + ?Sized> HttpFetchDyn for T {
144    fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<Result<Response<HttpBody>>> {
145        Box::pin(self.fetch(req))
146    }
147}
148
149impl<T: HttpFetchDyn + ?Sized> HttpFetch for Arc<T> {
150    async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
151        self.deref().fetch_dyn(req).await
152    }
153}
154
155impl HttpFetch for reqwest::Client {
156    async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
157        // Uri stores all string alike data in `Bytes` which means
158        // the clone here is cheap.
159        let uri = req.uri().clone();
160        let is_head = req.method() == http::Method::HEAD;
161
162        let (parts, body) = req.into_parts();
163
164        let mut req_builder = self
165            .request(
166                parts.method,
167                reqwest::Url::from_str(&uri.to_string()).expect("input request url must be valid"),
168            )
169            .headers(parts.headers);
170
171        // Client under wasm doesn't support set version.
172        #[cfg(not(target_arch = "wasm32"))]
173        {
174            req_builder = req_builder.version(parts.version);
175        }
176
177        // Don't set body if body is empty.
178        if !body.is_empty() {
179            #[cfg(not(target_arch = "wasm32"))]
180            {
181                req_builder = req_builder.body(reqwest::Body::wrap(HttpBufferBody(body)))
182            }
183            #[cfg(target_arch = "wasm32")]
184            {
185                req_builder = req_builder.body(reqwest::Body::from(body.to_bytes()))
186            }
187        }
188
189        let mut resp = req_builder.send().await.map_err(|err| {
190            Error::new(ErrorKind::Unexpected, "send http request")
191                .with_operation("http_util::Client::send")
192                .with_context("url", uri.to_string())
193                .with_temporary(is_temporary_error(&err))
194                .set_source(err)
195        })?;
196
197        // Get content length from header so that we can check it.
198        //
199        // - If the request method is HEAD, we will ignore content length.
200        // - If response contains content_encoding, we should omit its content length.
201        let content_length = if is_head || parse_content_encoding(resp.headers())?.is_some() {
202            None
203        } else {
204            parse_content_length(resp.headers())?
205        };
206
207        let mut hr = Response::builder()
208            .status(resp.status())
209            // Insert uri into response extension so that we can fetch
210            // it later.
211            .extension(uri.clone());
212
213        // Response builder under wasm doesn't support set version.
214        #[cfg(not(target_arch = "wasm32"))]
215        {
216            hr = hr.version(resp.version());
217        }
218
219        // Swap headers directly instead of copy the entire map.
220        mem::swap(hr.headers_mut().unwrap(), resp.headers_mut());
221
222        let bs = HttpBody::new(
223            resp.bytes_stream()
224                .try_filter(|v| future::ready(!v.is_empty()))
225                .map_ok(Buffer::from)
226                .map_err(move |err| {
227                    Error::new(ErrorKind::Unexpected, "read data from http response")
228                        .with_operation("http_util::Client::send")
229                        .with_context("url", uri.to_string())
230                        .with_temporary(is_temporary_error(&err))
231                        .set_source(err)
232                }),
233            content_length,
234        );
235
236        let resp = hr.body(bs).expect("response must build succeed");
237        Ok(resp)
238    }
239}
240
241#[inline]
242fn is_temporary_error(err: &reqwest::Error) -> bool {
243    // error sending request
244    err.is_request()||
245    // request or response body error
246    err.is_body() ||
247    // error decoding response body, for example, connection reset.
248    err.is_decode()
249}
250
251struct HttpBufferBody(Buffer);
252
253impl http_body::Body for HttpBufferBody {
254    type Data = Bytes;
255    type Error = Infallible;
256
257    fn poll_frame(
258        mut self: Pin<&mut Self>,
259        _: &mut Context<'_>,
260    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
261        match self.0.next() {
262            Some(bs) => Poll::Ready(Some(Ok(Frame::data(bs)))),
263            None => Poll::Ready(None),
264        }
265    }
266
267    fn is_end_stream(&self) -> bool {
268        self.0.is_empty()
269    }
270
271    fn size_hint(&self) -> SizeHint {
272        SizeHint::with_exact(self.0.len() as u64)
273    }
274}