1use http::header::CONTENT_DISPOSITION;
19use http::header::CONTENT_LENGTH;
20use http::header::CONTENT_TYPE;
21use http::header::RANGE;
22use http::HeaderName;
23use http::HeaderValue;
24use http::Request;
25use http::Response;
26use http::StatusCode;
27use reqsign::AzureStorageCredential;
28use reqsign::AzureStorageLoader;
29use reqsign::AzureStorageSigner;
30use std::collections::VecDeque;
31use std::fmt::Debug;
32use std::fmt::Formatter;
33use std::sync::Arc;
34
35use super::error::parse_error;
36use crate::raw::*;
37use crate::*;
38
39const X_MS_VERSION: &str = "x-ms-version";
40const X_MS_WRITE: &str = "x-ms-write";
41const X_MS_FILE_RENAME_SOURCE: &str = "x-ms-file-rename-source";
42const X_MS_CONTENT_LENGTH: &str = "x-ms-content-length";
43const X_MS_TYPE: &str = "x-ms-type";
44const X_MS_FILE_RENAME_REPLACE_IF_EXISTS: &str = "x-ms-file-rename-replace-if-exists";
45
46pub struct AzfileCore {
47 pub info: Arc<AccessorInfo>,
48 pub root: String,
49 pub endpoint: String,
50 pub share_name: String,
51 pub loader: AzureStorageLoader,
52 pub signer: AzureStorageSigner,
53}
54
55impl Debug for AzfileCore {
56 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
57 f.debug_struct("AzfileCore")
58 .field("root", &self.root)
59 .field("endpoint", &self.endpoint)
60 .field("share_name", &self.share_name)
61 .finish_non_exhaustive()
62 }
63}
64
65impl AzfileCore {
66 async fn load_credential(&self) -> Result<AzureStorageCredential> {
67 let cred = self
68 .loader
69 .load()
70 .await
71 .map_err(new_request_credential_error)?;
72
73 if let Some(cred) = cred {
74 Ok(cred)
75 } else {
76 Err(Error::new(
77 ErrorKind::ConfigInvalid,
78 "no valid credential found",
79 ))
80 }
81 }
82
83 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
84 let cred = self.load_credential().await?;
85 req.headers_mut().insert(
87 HeaderName::from_static(X_MS_VERSION),
88 HeaderValue::from_static("2022-11-02"),
90 );
91 self.signer.sign(req, &cred).map_err(new_request_sign_error)
92 }
93
94 #[inline]
95 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
96 self.info.http_client().send(req).await
97 }
98
99 pub async fn azfile_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
100 let p = build_abs_path(&self.root, path);
101
102 let url = format!(
103 "{}/{}/{}",
104 self.endpoint,
105 self.share_name,
106 percent_encode_path(&p)
107 );
108
109 let mut req = Request::get(&url);
110
111 if !range.is_full() {
112 req = req.header(RANGE, range.to_header());
113 }
114
115 let req = req.extension(Operation::Read);
116
117 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
118 self.sign(&mut req).await?;
119 self.info.http_client().fetch(req).await
120 }
121
122 pub async fn azfile_create_file(
123 &self,
124 path: &str,
125 size: usize,
126 args: &OpWrite,
127 ) -> Result<Response<Buffer>> {
128 let p = build_abs_path(&self.root, path)
129 .trim_start_matches('/')
130 .to_string();
131 let url = format!(
132 "{}/{}/{}",
133 self.endpoint,
134 self.share_name,
135 percent_encode_path(&p)
136 );
137
138 let mut req = Request::put(&url);
139
140 req = req.header(X_MS_CONTENT_LENGTH, size);
143
144 req = req.header(X_MS_TYPE, "file");
145
146 req = req.header(CONTENT_LENGTH, 0);
148
149 if let Some(ty) = args.content_type() {
150 req = req.header(CONTENT_TYPE, ty);
151 }
152
153 if let Some(pos) = args.content_disposition() {
154 req = req.header(CONTENT_DISPOSITION, pos);
155 }
156
157 let req = req.extension(Operation::Write);
158
159 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
160 self.sign(&mut req).await?;
161 self.send(req).await
162 }
163
164 pub async fn azfile_update(
165 &self,
166 path: &str,
167 size: u64,
168 position: u64,
169 body: Buffer,
170 ) -> Result<Response<Buffer>> {
171 let p = build_abs_path(&self.root, path)
172 .trim_start_matches('/')
173 .to_string();
174
175 let url = format!(
176 "{}/{}/{}?comp=range",
177 self.endpoint,
178 self.share_name,
179 percent_encode_path(&p)
180 );
181
182 let mut req = Request::put(&url);
183
184 req = req.header(CONTENT_LENGTH, size);
185
186 req = req.header(X_MS_WRITE, "update");
187
188 req = req.header(
189 RANGE,
190 BytesRange::from(position..position + size).to_header(),
191 );
192
193 let req = req.extension(Operation::Write);
194
195 let mut req = req.body(body).map_err(new_request_build_error)?;
196 self.sign(&mut req).await?;
197 self.send(req).await
198 }
199
200 pub async fn azfile_get_file_properties(&self, path: &str) -> Result<Response<Buffer>> {
201 let p = build_abs_path(&self.root, path);
202 let url = format!(
203 "{}/{}/{}",
204 self.endpoint,
205 self.share_name,
206 percent_encode_path(&p)
207 );
208
209 let req = Request::head(&url);
210
211 let req = req.extension(Operation::Stat);
212
213 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
214 self.sign(&mut req).await?;
215 self.send(req).await
216 }
217
218 pub async fn azfile_get_directory_properties(&self, path: &str) -> Result<Response<Buffer>> {
219 let p = build_abs_path(&self.root, path);
220
221 let url = format!(
222 "{}/{}/{}?restype=directory",
223 self.endpoint,
224 self.share_name,
225 percent_encode_path(&p)
226 );
227
228 let req = Request::head(&url);
229
230 let req = req.extension(Operation::Stat);
231
232 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
233 self.sign(&mut req).await?;
234 self.send(req).await
235 }
236
237 pub async fn azfile_rename(&self, path: &str, new_path: &str) -> Result<Response<Buffer>> {
238 let p = build_abs_path(&self.root, path)
239 .trim_start_matches('/')
240 .to_string();
241
242 let new_p = build_abs_path(&self.root, new_path)
243 .trim_start_matches('/')
244 .to_string();
245
246 let url = if path.ends_with('/') {
247 format!(
248 "{}/{}/{}?restype=directory&comp=rename",
249 self.endpoint,
250 self.share_name,
251 percent_encode_path(&new_p)
252 )
253 } else {
254 format!(
255 "{}/{}/{}?comp=rename",
256 self.endpoint,
257 self.share_name,
258 percent_encode_path(&new_p)
259 )
260 };
261
262 let mut req = Request::put(&url);
263
264 req = req.header(CONTENT_LENGTH, 0);
265
266 let source_url = format!(
272 "{}/{}/{}",
273 self.endpoint,
274 self.share_name,
275 percent_encode_path(&p)
276 );
277
278 req = req.header(X_MS_FILE_RENAME_SOURCE, &source_url);
279
280 req = req.header(X_MS_FILE_RENAME_REPLACE_IF_EXISTS, "true");
281
282 let req = req.extension(Operation::Rename);
283
284 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
285 self.sign(&mut req).await?;
286 self.send(req).await
287 }
288
289 pub async fn azfile_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
290 let p = build_abs_path(&self.root, path)
291 .trim_start_matches('/')
292 .to_string();
293
294 let url = format!(
295 "{}/{}/{}?restype=directory",
296 self.endpoint,
297 self.share_name,
298 percent_encode_path(&p)
299 );
300
301 let mut req = Request::put(&url);
302
303 req = req.header(CONTENT_LENGTH, 0);
304
305 let req = req.extension(Operation::CreateDir);
306
307 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
308 self.sign(&mut req).await?;
309 self.send(req).await
310 }
311
312 pub async fn azfile_delete_file(&self, path: &str) -> Result<Response<Buffer>> {
313 let p = build_abs_path(&self.root, path)
314 .trim_start_matches('/')
315 .to_string();
316
317 let url = format!(
318 "{}/{}/{}",
319 self.endpoint,
320 self.share_name,
321 percent_encode_path(&p)
322 );
323
324 let req = Request::delete(&url);
325
326 let req = req.extension(Operation::Delete);
327
328 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
329 self.sign(&mut req).await?;
330 self.send(req).await
331 }
332
333 pub async fn azfile_delete_dir(&self, path: &str) -> Result<Response<Buffer>> {
334 let p = build_abs_path(&self.root, path)
335 .trim_start_matches('/')
336 .to_string();
337
338 let url = format!(
339 "{}/{}/{}?restype=directory",
340 self.endpoint,
341 self.share_name,
342 percent_encode_path(&p)
343 );
344
345 let req = Request::delete(&url);
346
347 let req = req.extension(Operation::Delete);
348
349 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
350 self.sign(&mut req).await?;
351 self.send(req).await
352 }
353
354 pub async fn azfile_list(
355 &self,
356 path: &str,
357 limit: &Option<usize>,
358 continuation: &str,
359 ) -> Result<Response<Buffer>> {
360 let p = build_abs_path(&self.root, path)
361 .trim_start_matches('/')
362 .to_string();
363
364 let url = format!(
365 "{}/{}/{}",
366 self.endpoint,
367 self.share_name,
368 percent_encode_path(&p),
369 );
370
371 let mut url = QueryPairsWriter::new(&url)
372 .push("restype", "directory")
373 .push("comp", "list")
374 .push("include", "Timestamps,ETag");
375
376 if !continuation.is_empty() {
377 url = url.push("marker", continuation);
378 }
379
380 if let Some(limit) = limit {
381 url = url.push("maxresults", &limit.to_string());
382 }
383
384 let req = Request::get(url.finish());
385
386 let req = req.extension(Operation::List);
387
388 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
389 self.sign(&mut req).await?;
390 self.send(req).await
391 }
392
393 pub async fn ensure_parent_dir_exists(&self, path: &str) -> Result<()> {
394 let mut dirs = VecDeque::default();
395 let mut p = path;
397 while p != "/" {
398 p = get_parent(p);
399 dirs.push_front(p);
400 }
401
402 let mut pop_dir_count = dirs.len();
403 for dir in dirs.iter().rev() {
404 let resp = self.azfile_get_directory_properties(dir).await?;
405 if resp.status() == StatusCode::NOT_FOUND {
406 pop_dir_count -= 1;
407 continue;
408 }
409 break;
410 }
411
412 for dir in dirs.iter().skip(pop_dir_count) {
413 let resp = self.azfile_create_dir(dir).await?;
414
415 if resp.status() == StatusCode::CREATED {
416 continue;
417 }
418
419 if resp
420 .headers()
421 .get("x-ms-error-code")
422 .map(|value| value.to_str().unwrap_or(""))
423 .unwrap_or_else(|| "")
424 == "ResourceAlreadyExists"
425 {
426 continue;
427 }
428
429 return Err(parse_error(resp));
430 }
431
432 Ok(())
433 }
434}