1use http::header::CONTENT_LENGTH;
19use http::header::CONTENT_TYPE;
20use http::header::{CONTENT_DISPOSITION, IF_NONE_MATCH};
21use http::HeaderName;
22use http::HeaderValue;
23use http::Request;
24use http::Response;
25use reqsign::AzureStorageCredential;
26use reqsign::AzureStorageLoader;
27use reqsign::AzureStorageSigner;
28use std::fmt;
29use std::fmt::Debug;
30use std::fmt::Formatter;
31use std::fmt::Write;
32use std::sync::Arc;
33
34use crate::raw::*;
35use crate::*;
36
37const X_MS_RENAME_SOURCE: &str = "x-ms-rename-source";
38const X_MS_VERSION: &str = "x-ms-version";
39
40pub struct AzdlsCore {
41 pub info: Arc<AccessorInfo>,
42 pub filesystem: String,
43 pub root: String,
44 pub endpoint: String,
45
46 pub loader: AzureStorageLoader,
47 pub signer: AzureStorageSigner,
48}
49
50impl Debug for AzdlsCore {
51 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
52 f.debug_struct("AzdlsCore")
53 .field("filesystem", &self.filesystem)
54 .field("root", &self.root)
55 .field("endpoint", &self.endpoint)
56 .finish_non_exhaustive()
57 }
58}
59
60impl AzdlsCore {
61 async fn load_credential(&self) -> Result<AzureStorageCredential> {
62 let cred = self
63 .loader
64 .load()
65 .await
66 .map_err(new_request_credential_error)?;
67
68 if let Some(cred) = cred {
69 Ok(cred)
70 } else {
71 Err(Error::new(
72 ErrorKind::ConfigInvalid,
73 "no valid credential found",
74 ))
75 }
76 }
77
78 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
79 let cred = self.load_credential().await?;
80 req.headers_mut().insert(
82 HeaderName::from_static(X_MS_VERSION),
83 HeaderValue::from_static("2022-11-02"),
89 );
90 self.signer.sign(req, &cred).map_err(new_request_sign_error)
91 }
92
93 #[inline]
94 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
95 self.info.http_client().send(req).await
96 }
97}
98
99impl AzdlsCore {
100 pub async fn azdls_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
101 let p = build_abs_path(&self.root, path);
102
103 let url = format!(
104 "{}/{}/{}",
105 self.endpoint,
106 self.filesystem,
107 percent_encode_path(&p)
108 );
109
110 let mut req = Request::get(&url);
111
112 if !range.is_full() {
113 req = req.header(http::header::RANGE, range.to_header());
114 }
115
116 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
117
118 self.sign(&mut req).await?;
119 self.info.http_client().fetch(req).await
120 }
121
122 pub fn azdls_create_request(
126 &self,
127 path: &str,
128 resource: &str,
129 args: &OpWrite,
130 body: Buffer,
131 ) -> Result<Request<Buffer>> {
132 let p = build_abs_path(&self.root, path)
133 .trim_end_matches('/')
134 .to_string();
135
136 let url = format!(
137 "{}/{}/{}?resource={resource}",
138 self.endpoint,
139 self.filesystem,
140 percent_encode_path(&p)
141 );
142
143 let mut req = Request::put(&url);
144
145 req = req.header(CONTENT_LENGTH, 0);
147
148 if let Some(ty) = args.content_type() {
149 req = req.header(CONTENT_TYPE, ty)
150 }
151
152 if let Some(pos) = args.content_disposition() {
153 req = req.header(CONTENT_DISPOSITION, pos)
154 }
155
156 if args.if_not_exists() {
157 req = req.header(IF_NONE_MATCH, "*")
158 }
159
160 if let Some(v) = args.if_none_match() {
161 req = req.header(IF_NONE_MATCH, v)
162 }
163
164 let req = req.body(body).map_err(new_request_build_error)?;
166
167 Ok(req)
168 }
169
170 pub async fn azdls_rename(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
171 let source = build_abs_path(&self.root, from);
172 let target = build_abs_path(&self.root, to);
173
174 let url = format!(
175 "{}/{}/{}",
176 self.endpoint,
177 self.filesystem,
178 percent_encode_path(&target)
179 );
180
181 let mut req = Request::put(&url)
182 .header(
183 X_MS_RENAME_SOURCE,
184 format!("/{}/{}", self.filesystem, percent_encode_path(&source)),
185 )
186 .header(CONTENT_LENGTH, 0)
187 .body(Buffer::new())
188 .map_err(new_request_build_error)?;
189
190 self.sign(&mut req).await?;
191 self.send(req).await
192 }
193
194 pub fn azdls_update_request(
196 &self,
197 path: &str,
198 size: Option<u64>,
199 position: u64,
200 body: Buffer,
201 ) -> Result<Request<Buffer>> {
202 let p = build_abs_path(&self.root, path);
203
204 let url = format!(
207 "{}/{}/{}?action=append&close=true&flush=true&position={}",
208 self.endpoint,
209 self.filesystem,
210 percent_encode_path(&p),
211 position
212 );
213
214 let mut req = Request::patch(&url);
215
216 if let Some(size) = size {
217 req = req.header(CONTENT_LENGTH, size)
218 }
219
220 let req = req.body(body).map_err(new_request_build_error)?;
222
223 Ok(req)
224 }
225
226 pub async fn azdls_get_properties(&self, path: &str) -> Result<Response<Buffer>> {
227 let p = build_abs_path(&self.root, path)
228 .trim_end_matches('/')
229 .to_string();
230
231 let url = format!(
232 "{}/{}/{}?action=getStatus",
233 self.endpoint,
234 self.filesystem,
235 percent_encode_path(&p)
236 );
237
238 let req = Request::head(&url);
239
240 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
241
242 self.sign(&mut req).await?;
243 self.info.http_client().send(req).await
244 }
245
246 pub async fn azdls_delete(&self, path: &str) -> Result<Response<Buffer>> {
247 let p = build_abs_path(&self.root, path)
248 .trim_end_matches('/')
249 .to_string();
250
251 let url = format!(
252 "{}/{}/{}",
253 self.endpoint,
254 self.filesystem,
255 percent_encode_path(&p)
256 );
257
258 let req = Request::delete(&url);
259
260 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
261
262 self.sign(&mut req).await?;
263 self.send(req).await
264 }
265
266 pub async fn azdls_list(
267 &self,
268 path: &str,
269 continuation: &str,
270 limit: Option<usize>,
271 ) -> Result<Response<Buffer>> {
272 let p = build_abs_path(&self.root, path)
273 .trim_end_matches('/')
274 .to_string();
275
276 let mut url = format!(
277 "{}/{}?resource=filesystem&recursive=false",
278 self.endpoint, self.filesystem
279 );
280 if !p.is_empty() {
281 write!(url, "&directory={}", percent_encode_path(&p))
282 .expect("write into string must succeed");
283 }
284 if let Some(limit) = limit {
285 write!(url, "&maxResults={limit}").expect("write into string must succeed");
286 }
287 if !continuation.is_empty() {
288 write!(url, "&continuation={}", percent_encode_path(continuation))
289 .expect("write into string must succeed");
290 }
291
292 let mut req = Request::get(&url)
293 .body(Buffer::new())
294 .map_err(new_request_build_error)?;
295
296 self.sign(&mut req).await?;
297 self.send(req).await
298 }
299
300 pub async fn azdls_ensure_parent_path(&self, path: &str) -> Result<Option<Response<Buffer>>> {
301 let abs_target_path = path.trim_end_matches('/').to_string();
302 let abs_target_path = abs_target_path.as_str();
303 let mut parts: Vec<&str> = abs_target_path
304 .split('/')
305 .filter(|x| !x.is_empty())
306 .collect();
307
308 if !parts.is_empty() {
309 parts.pop();
310 }
311
312 if !parts.is_empty() {
313 let parent_path = parts.join("/");
314 let mut req = self.azdls_create_request(
315 &parent_path,
316 "directory",
317 &OpWrite::default(),
318 Buffer::new(),
319 )?;
320
321 self.sign(&mut req).await?;
322
323 Ok(Some(self.send(req).await?))
324 } else {
325 Ok(None)
326 }
327 }
328}