opendal/raw/http_util/
client.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::future;
21use std::mem;
22use std::ops::Deref;
23use std::str::FromStr;
24use std::sync::Arc;
25
26use futures::Future;
27use futures::TryStreamExt;
28use http::Request;
29use http::Response;
30use raw::oio::Read;
31use std::sync::LazyLock;
32
33use super::parse_content_encoding;
34use super::parse_content_length;
35use super::HttpBody;
36use crate::raw::*;
37use crate::*;
38
/// Http client used across opendal for loading credentials.
///
/// This is merely a temporary solution because reqsign requires a reqwest client to be passed.
/// We will remove it after the next major version of reqsign, which will enable users to
/// provide their own client.
///
/// The client is created with [`reqwest::Client::new`] the first time the static is
/// dereferenced. In this file it also backs [`HttpClient::default`].
// NOTE(review): `dead_code` is presumably allowed because some build configurations never
// reference this static — confirm before removing the attribute.
#[allow(dead_code)]
pub(crate) static GLOBAL_REQWEST_CLIENT: LazyLock<reqwest::Client> =
    LazyLock::new(reqwest::Client::new);
45
/// HttpFetcher is a type erased [`HttpFetch`].
///
/// It is a reference-counted handle to any [`HttpFetchDyn`] implementation and is the
/// form in which [`HttpClient`] stores its backend.
pub type HttpFetcher = Arc<dyn HttpFetchDyn>;
48
/// An HTTP client instance for OpenDAL's services.
///
/// Cloning only clones the inner [`Arc`], so all clones share the same fetcher.
///
/// # Notes
///
/// * An HTTP client must support redirections, i.e. it must follow 3xx responses.
#[derive(Clone)]
pub struct HttpClient {
    /// Type-erased backend that performs the actual requests.
    fetcher: HttpFetcher,
}
58
59/// We don't want users to know details about our clients.
60impl Debug for HttpClient {
61    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
62        f.debug_struct("HttpClient").finish()
63    }
64}
65
66impl Default for HttpClient {
67    fn default() -> Self {
68        Self {
69            fetcher: Arc::new(GLOBAL_REQWEST_CLIENT.clone()),
70        }
71    }
72}
73
74impl HttpClient {
75    /// Create a new http client in async context.
76    pub fn new() -> Result<Self> {
77        Ok(Self::default())
78    }
79
80    /// Construct `Self` with given [`reqwest::Client`]
81    pub fn with(client: impl HttpFetch) -> Self {
82        let fetcher = Arc::new(client);
83        Self { fetcher }
84    }
85
86    /// Get the inner http client.
87    pub(crate) fn into_inner(self) -> HttpFetcher {
88        self.fetcher
89    }
90
91    /// Build a new http client in async context.
92    #[deprecated]
93    pub fn build(builder: reqwest::ClientBuilder) -> Result<Self> {
94        let client = builder.build().map_err(|err| {
95            Error::new(ErrorKind::Unexpected, "http client build failed").set_source(err)
96        })?;
97        let fetcher = Arc::new(client);
98        Ok(Self { fetcher })
99    }
100
101    /// Send a request and consume response.
102    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
103        let (parts, mut body) = self.fetch(req).await?.into_parts();
104        let buffer = body.read_all().await?;
105        Ok(Response::from_parts(parts, buffer))
106    }
107
108    /// Fetch a request and return a streamable [`HttpBody`].
109    ///
110    /// Services can use [`HttpBody`] as [`Access::Read`].
111    pub async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
112        self.fetcher.fetch(req).await
113    }
114}
115
/// HttpFetch is the trait to fetch a request in an async way.
///
/// Users should implement this trait to provide their own http client, which can then be
/// passed to [`HttpClient::with`].
pub trait HttpFetch: Send + Sync + Unpin + 'static {
    /// Fetch a request in an async way.
    ///
    /// Takes the request with a [`Buffer`] body and resolves to a response whose body
    /// is an [`HttpBody`].
    fn fetch(
        &self,
        req: Request<Buffer>,
    ) -> impl Future<Output = Result<Response<HttpBody>>> + MaybeSend;
}
125
/// HttpFetchDyn is the dyn version of [`HttpFetch`],
/// which makes it possible to use it as `Arc<dyn HttpFetchDyn>`.
///
/// Users should never implement this trait directly; implement [`HttpFetch`] instead.
/// A blanket implementation in this file provides `HttpFetchDyn` for every `HttpFetch`.
pub trait HttpFetchDyn: Send + Sync + Unpin + 'static {
    /// The dyn version of [`HttpFetch::fetch`].
    ///
    /// This function returns a boxed future to make it object safe.
    fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<Result<Response<HttpBody>>>;
}
135
136impl<T: HttpFetch + ?Sized> HttpFetchDyn for T {
137    fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<Result<Response<HttpBody>>> {
138        Box::pin(self.fetch(req))
139    }
140}
141
142impl<T: HttpFetchDyn + ?Sized> HttpFetch for Arc<T> {
143    async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
144        self.deref().fetch_dyn(req).await
145    }
146}
147
148impl HttpFetch for reqwest::Client {
149    async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
150        // Uri stores all string alike data in `Bytes` which means
151        // the clone here is cheap.
152        let uri = req.uri().clone();
153        let is_head = req.method() == http::Method::HEAD;
154
155        let (parts, body) = req.into_parts();
156
157        let mut req_builder = self
158            .request(
159                parts.method,
160                reqwest::Url::from_str(&uri.to_string()).expect("input request url must be valid"),
161            )
162            .headers(parts.headers);
163
164        // Client under wasm doesn't support set version.
165        #[cfg(not(target_arch = "wasm32"))]
166        {
167            req_builder = req_builder.version(parts.version);
168        }
169
170        // Don't set body if body is empty.
171        if !body.is_empty() {
172            #[cfg(not(target_arch = "wasm32"))]
173            {
174                req_builder = req_builder.body(reqwest::Body::wrap_stream(body))
175            }
176            #[cfg(target_arch = "wasm32")]
177            {
178                req_builder = req_builder.body(reqwest::Body::from(body.to_bytes()))
179            }
180        }
181
182        let mut resp = req_builder.send().await.map_err(|err| {
183            Error::new(ErrorKind::Unexpected, "send http request")
184                .with_operation("http_util::Client::send")
185                .with_context("url", uri.to_string())
186                .with_temporary(is_temporary_error(&err))
187                .set_source(err)
188        })?;
189
190        // Get content length from header so that we can check it.
191        //
192        // - If the request method is HEAD, we will ignore content length.
193        // - If response contains content_encoding, we should omit its content length.
194        let content_length = if is_head || parse_content_encoding(resp.headers())?.is_some() {
195            None
196        } else {
197            parse_content_length(resp.headers())?
198        };
199
200        let mut hr = Response::builder()
201            .status(resp.status())
202            // Insert uri into response extension so that we can fetch
203            // it later.
204            .extension(uri.clone());
205
206        // Response builder under wasm doesn't support set version.
207        #[cfg(not(target_arch = "wasm32"))]
208        {
209            hr = hr.version(resp.version());
210        }
211
212        // Swap headers directly instead of copy the entire map.
213        mem::swap(hr.headers_mut().unwrap(), resp.headers_mut());
214
215        let bs = HttpBody::new(
216            resp.bytes_stream()
217                .try_filter(|v| future::ready(!v.is_empty()))
218                .map_ok(Buffer::from)
219                .map_err(move |err| {
220                    Error::new(ErrorKind::Unexpected, "read data from http response")
221                        .with_operation("http_util::Client::send")
222                        .with_context("url", uri.to_string())
223                        .with_temporary(is_temporary_error(&err))
224                        .set_source(err)
225                }),
226            content_length,
227        );
228
229        let resp = hr.body(bs).expect("response must build succeed");
230        Ok(resp)
231    }
232}
233
234#[inline]
235fn is_temporary_error(err: &reqwest::Error) -> bool {
236    // error sending request
237    err.is_request()||
238    // request or response body error
239    err.is_body() ||
240    // error decoding response body, for example, connection reset.
241    err.is_decode()
242}