opendal/raw/http_util/
client.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::future;
21use std::mem;
22use std::ops::Deref;
23use std::str::FromStr;
24use std::sync::Arc;
25
26use futures::Future;
27use futures::TryStreamExt;
28use http::Request;
29use http::Response;
30use raw::oio::Read;
31use std::sync::LazyLock;
32
33use super::parse_content_encoding;
34use super::parse_content_length;
35use super::HttpBody;
36use crate::raw::*;
37use crate::*;
38
39#[allow(dead_code)]
43pub(crate) static GLOBAL_REQWEST_CLIENT: LazyLock<reqwest::Client> =
44 LazyLock::new(reqwest::Client::new);
45
46pub type HttpFetcher = Arc<dyn HttpFetchDyn>;
48
49#[derive(Clone)]
55pub struct HttpClient {
56 fetcher: HttpFetcher,
57}
58
59impl Debug for HttpClient {
61 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
62 f.debug_struct("HttpClient").finish()
63 }
64}
65
66impl Default for HttpClient {
67 fn default() -> Self {
68 Self {
69 fetcher: Arc::new(GLOBAL_REQWEST_CLIENT.clone()),
70 }
71 }
72}
73
74impl HttpClient {
75 pub fn new() -> Result<Self> {
77 Ok(Self::default())
78 }
79
80 pub fn with(client: impl HttpFetch) -> Self {
82 let fetcher = Arc::new(client);
83 Self { fetcher }
84 }
85
86 pub(crate) fn into_inner(self) -> HttpFetcher {
88 self.fetcher
89 }
90
91 #[deprecated]
93 pub fn build(builder: reqwest::ClientBuilder) -> Result<Self> {
94 let client = builder.build().map_err(|err| {
95 Error::new(ErrorKind::Unexpected, "http client build failed").set_source(err)
96 })?;
97 let fetcher = Arc::new(client);
98 Ok(Self { fetcher })
99 }
100
101 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
103 let (parts, mut body) = self.fetch(req).await?.into_parts();
104 let buffer = body.read_all().await?;
105 Ok(Response::from_parts(parts, buffer))
106 }
107
108 pub async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
112 self.fetcher.fetch(req).await
113 }
114}
115
116pub trait HttpFetch: Send + Sync + Unpin + 'static {
119 fn fetch(
121 &self,
122 req: Request<Buffer>,
123 ) -> impl Future<Output = Result<Response<HttpBody>>> + MaybeSend;
124}
125
126pub trait HttpFetchDyn: Send + Sync + Unpin + 'static {
130 fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<Result<Response<HttpBody>>>;
134}
135
136impl<T: HttpFetch + ?Sized> HttpFetchDyn for T {
137 fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<Result<Response<HttpBody>>> {
138 Box::pin(self.fetch(req))
139 }
140}
141
142impl<T: HttpFetchDyn + ?Sized> HttpFetch for Arc<T> {
143 async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
144 self.deref().fetch_dyn(req).await
145 }
146}
147
148impl HttpFetch for reqwest::Client {
149 async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
150 let uri = req.uri().clone();
153 let is_head = req.method() == http::Method::HEAD;
154
155 let (parts, body) = req.into_parts();
156
157 let mut req_builder = self
158 .request(
159 parts.method,
160 reqwest::Url::from_str(&uri.to_string()).expect("input request url must be valid"),
161 )
162 .headers(parts.headers);
163
164 #[cfg(not(target_arch = "wasm32"))]
166 {
167 req_builder = req_builder.version(parts.version);
168 }
169
170 if !body.is_empty() {
172 #[cfg(not(target_arch = "wasm32"))]
173 {
174 req_builder = req_builder.body(reqwest::Body::wrap_stream(body))
175 }
176 #[cfg(target_arch = "wasm32")]
177 {
178 req_builder = req_builder.body(reqwest::Body::from(body.to_bytes()))
179 }
180 }
181
182 let mut resp = req_builder.send().await.map_err(|err| {
183 Error::new(ErrorKind::Unexpected, "send http request")
184 .with_operation("http_util::Client::send")
185 .with_context("url", uri.to_string())
186 .with_temporary(is_temporary_error(&err))
187 .set_source(err)
188 })?;
189
190 let content_length = if is_head || parse_content_encoding(resp.headers())?.is_some() {
195 None
196 } else {
197 parse_content_length(resp.headers())?
198 };
199
200 let mut hr = Response::builder()
201 .status(resp.status())
202 .extension(uri.clone());
205
206 #[cfg(not(target_arch = "wasm32"))]
208 {
209 hr = hr.version(resp.version());
210 }
211
212 mem::swap(hr.headers_mut().unwrap(), resp.headers_mut());
214
215 let bs = HttpBody::new(
216 resp.bytes_stream()
217 .try_filter(|v| future::ready(!v.is_empty()))
218 .map_ok(Buffer::from)
219 .map_err(move |err| {
220 Error::new(ErrorKind::Unexpected, "read data from http response")
221 .with_operation("http_util::Client::send")
222 .with_context("url", uri.to_string())
223 .with_temporary(is_temporary_error(&err))
224 .set_source(err)
225 }),
226 content_length,
227 );
228
229 let resp = hr.body(bs).expect("response must build succeed");
230 Ok(resp)
231 }
232}
233
234#[inline]
235fn is_temporary_error(err: &reqwest::Error) -> bool {
236 err.is_request()||
238 err.is_body() ||
240 err.is_decode()
242}