opendal_core/services/azfile/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::VecDeque;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use http::HeaderName;
23use http::HeaderValue;
24use http::Request;
25use http::Response;
26use http::StatusCode;
27use http::header::CONTENT_DISPOSITION;
28use http::header::CONTENT_LENGTH;
29use http::header::CONTENT_TYPE;
30use http::header::RANGE;
31use reqsign::AzureStorageCredential;
32use reqsign::AzureStorageLoader;
33use reqsign::AzureStorageSigner;
34
35use super::error::parse_error;
36use crate::raw::*;
37use crate::*;
38
/// Header name carrying the storage service API version; set on every signed request.
const X_MS_VERSION: &str = "x-ms-version";
/// Header name describing the write mode of a range request (sent as `update`).
const X_MS_WRITE: &str = "x-ms-write";
/// Header name holding the URL of the file or directory being renamed.
const X_MS_FILE_RENAME_SOURCE: &str = "x-ms-file-rename-source";
/// Header name declaring the size of a file at creation time.
const X_MS_CONTENT_LENGTH: &str = "x-ms-content-length";
/// Header name declaring the resource type of a create request (sent as `file`).
const X_MS_TYPE: &str = "x-ms-type";
/// Header name controlling whether a rename may replace an existing destination.
const X_MS_FILE_RENAME_REPLACE_IF_EXISTS: &str = "x-ms-file-rename-replace-if-exists";
/// Prefix prepended to each user metadata key to form its header name.
pub const X_MS_META_PREFIX: &str = "x-ms-meta-";
46
/// Shared state used to build, sign and send requests against one Azure file share.
pub struct AzfileCore {
    /// Accessor info; the HTTP client used to send requests is obtained from it.
    pub info: Arc<AccessorInfo>,
    /// Root that every operation path is joined with via `build_abs_path`.
    pub root: String,
    /// Service endpoint; first component of every request URL.
    pub endpoint: String,
    /// Share name; second component of every request URL.
    pub share_name: String,
    /// Loader that provides the credential used for signing.
    pub loader: AzureStorageLoader,
    /// Signer applied to each request before it is sent.
    pub signer: AzureStorageSigner,
}
55
56impl Debug for AzfileCore {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct("AzfileCore")
59            .field("root", &self.root)
60            .field("endpoint", &self.endpoint)
61            .field("share_name", &self.share_name)
62            .finish_non_exhaustive()
63    }
64}
65
66impl AzfileCore {
67    async fn load_credential(&self) -> Result<AzureStorageCredential> {
68        let cred = self
69            .loader
70            .load()
71            .await
72            .map_err(new_request_credential_error)?;
73
74        if let Some(cred) = cred {
75            Ok(cred)
76        } else {
77            Err(Error::new(
78                ErrorKind::ConfigInvalid,
79                "no valid credential found",
80            ))
81        }
82    }
83
84    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
85        let cred = self.load_credential().await?;
86        // Insert x-ms-version header for normal requests.
87        req.headers_mut().insert(
88            HeaderName::from_static(X_MS_VERSION),
89            // consistent with azdls and azblob
90            HeaderValue::from_static("2022-11-02"),
91        );
92        self.signer.sign(req, &cred).map_err(new_request_sign_error)
93    }
94
95    #[inline]
96    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
97        self.info.http_client().send(req).await
98    }
99
100    pub async fn azfile_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
101        let p = build_abs_path(&self.root, path);
102
103        let url = format!(
104            "{}/{}/{}",
105            self.endpoint,
106            self.share_name,
107            percent_encode_path(&p)
108        );
109
110        let mut req = Request::get(&url);
111
112        if !range.is_full() {
113            req = req.header(RANGE, range.to_header());
114        }
115
116        let req = req.extension(Operation::Read);
117
118        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
119        self.sign(&mut req).await?;
120        self.info.http_client().fetch(req).await
121    }
122
123    pub async fn azfile_create_file(
124        &self,
125        path: &str,
126        size: usize,
127        args: &OpWrite,
128    ) -> Result<Response<Buffer>> {
129        let p = build_abs_path(&self.root, path)
130            .trim_start_matches('/')
131            .to_string();
132        let url = format!(
133            "{}/{}/{}",
134            self.endpoint,
135            self.share_name,
136            percent_encode_path(&p)
137        );
138
139        let mut req = Request::put(&url);
140
141        // x-ms-content-length specifies the maximum size for the file, up to 4 tebibytes (TiB)
142        // https://learn.microsoft.com/en-us/rest/api/storageservices/create-file
143        req = req.header(X_MS_CONTENT_LENGTH, size);
144
145        req = req.header(X_MS_TYPE, "file");
146
147        // Content length must be 0 for create request.
148        req = req.header(CONTENT_LENGTH, 0);
149
150        if let Some(ty) = args.content_type() {
151            req = req.header(CONTENT_TYPE, ty);
152        }
153
154        if let Some(pos) = args.content_disposition() {
155            req = req.header(CONTENT_DISPOSITION, pos);
156        }
157
158        // Set user metadata headers.
159        if let Some(user_metadata) = args.user_metadata() {
160            for (key, value) in user_metadata {
161                req = req.header(format!("{X_MS_META_PREFIX}{key}"), value);
162            }
163        }
164
165        let req = req.extension(Operation::Write);
166
167        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
168        self.sign(&mut req).await?;
169        self.send(req).await
170    }
171
172    pub async fn azfile_update(
173        &self,
174        path: &str,
175        size: u64,
176        position: u64,
177        body: Buffer,
178    ) -> Result<Response<Buffer>> {
179        let p = build_abs_path(&self.root, path)
180            .trim_start_matches('/')
181            .to_string();
182
183        let url = format!(
184            "{}/{}/{}?comp=range",
185            self.endpoint,
186            self.share_name,
187            percent_encode_path(&p)
188        );
189
190        let mut req = Request::put(&url);
191
192        req = req.header(CONTENT_LENGTH, size);
193
194        req = req.header(X_MS_WRITE, "update");
195
196        req = req.header(
197            RANGE,
198            BytesRange::from(position..position + size).to_header(),
199        );
200
201        let req = req.extension(Operation::Write);
202
203        let mut req = req.body(body).map_err(new_request_build_error)?;
204        self.sign(&mut req).await?;
205        self.send(req).await
206    }
207
208    pub async fn azfile_get_file_properties(&self, path: &str) -> Result<Response<Buffer>> {
209        let p = build_abs_path(&self.root, path);
210        let url = format!(
211            "{}/{}/{}",
212            self.endpoint,
213            self.share_name,
214            percent_encode_path(&p)
215        );
216
217        let req = Request::head(&url);
218
219        let req = req.extension(Operation::Stat);
220
221        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
222        self.sign(&mut req).await?;
223        self.send(req).await
224    }
225
226    pub async fn azfile_get_directory_properties(&self, path: &str) -> Result<Response<Buffer>> {
227        let p = build_abs_path(&self.root, path);
228
229        let url = format!(
230            "{}/{}/{}?restype=directory",
231            self.endpoint,
232            self.share_name,
233            percent_encode_path(&p)
234        );
235
236        let req = Request::head(&url);
237
238        let req = req.extension(Operation::Stat);
239
240        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
241        self.sign(&mut req).await?;
242        self.send(req).await
243    }
244
245    pub async fn azfile_rename(&self, path: &str, new_path: &str) -> Result<Response<Buffer>> {
246        let p = build_abs_path(&self.root, path)
247            .trim_start_matches('/')
248            .to_string();
249
250        let new_p = build_abs_path(&self.root, new_path)
251            .trim_start_matches('/')
252            .to_string();
253
254        let url = if path.ends_with('/') {
255            format!(
256                "{}/{}/{}?restype=directory&comp=rename",
257                self.endpoint,
258                self.share_name,
259                percent_encode_path(&new_p)
260            )
261        } else {
262            format!(
263                "{}/{}/{}?comp=rename",
264                self.endpoint,
265                self.share_name,
266                percent_encode_path(&new_p)
267            )
268        };
269
270        let mut req = Request::put(&url);
271
272        req = req.header(CONTENT_LENGTH, 0);
273
274        // x-ms-file-rename-source specifies the file or directory to be renamed.
275        // the value must be a URL style path
276        // the official document does not mention the URL style path
277        // find the solution from the community FAQ and implementation of the Java-SDK
278        // ref: https://learn.microsoft.com/en-us/answers/questions/799611/azure-file-service-rest-api(rename)?page=1
279        let source_url = format!(
280            "{}/{}/{}",
281            self.endpoint,
282            self.share_name,
283            percent_encode_path(&p)
284        );
285
286        req = req.header(X_MS_FILE_RENAME_SOURCE, &source_url);
287
288        req = req.header(X_MS_FILE_RENAME_REPLACE_IF_EXISTS, "true");
289
290        let req = req.extension(Operation::Rename);
291
292        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
293        self.sign(&mut req).await?;
294        self.send(req).await
295    }
296
297    pub async fn azfile_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
298        let p = build_abs_path(&self.root, path)
299            .trim_start_matches('/')
300            .to_string();
301
302        let url = format!(
303            "{}/{}/{}?restype=directory",
304            self.endpoint,
305            self.share_name,
306            percent_encode_path(&p)
307        );
308
309        let mut req = Request::put(&url);
310
311        req = req.header(CONTENT_LENGTH, 0);
312
313        let req = req.extension(Operation::CreateDir);
314
315        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
316        self.sign(&mut req).await?;
317        self.send(req).await
318    }
319
320    pub async fn azfile_delete_file(&self, path: &str) -> Result<Response<Buffer>> {
321        let p = build_abs_path(&self.root, path)
322            .trim_start_matches('/')
323            .to_string();
324
325        let url = format!(
326            "{}/{}/{}",
327            self.endpoint,
328            self.share_name,
329            percent_encode_path(&p)
330        );
331
332        let req = Request::delete(&url);
333
334        let req = req.extension(Operation::Delete);
335
336        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
337        self.sign(&mut req).await?;
338        self.send(req).await
339    }
340
341    pub async fn azfile_delete_dir(&self, path: &str) -> Result<Response<Buffer>> {
342        let p = build_abs_path(&self.root, path)
343            .trim_start_matches('/')
344            .to_string();
345
346        let url = format!(
347            "{}/{}/{}?restype=directory",
348            self.endpoint,
349            self.share_name,
350            percent_encode_path(&p)
351        );
352
353        let req = Request::delete(&url);
354
355        let req = req.extension(Operation::Delete);
356
357        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
358        self.sign(&mut req).await?;
359        self.send(req).await
360    }
361
362    pub async fn azfile_list(
363        &self,
364        path: &str,
365        limit: &Option<usize>,
366        continuation: &str,
367    ) -> Result<Response<Buffer>> {
368        let p = build_abs_path(&self.root, path)
369            .trim_start_matches('/')
370            .to_string();
371
372        let url = format!(
373            "{}/{}/{}",
374            self.endpoint,
375            self.share_name,
376            percent_encode_path(&p),
377        );
378
379        let mut url = QueryPairsWriter::new(&url)
380            .push("restype", "directory")
381            .push("comp", "list")
382            .push("include", "Timestamps,ETag");
383
384        if !continuation.is_empty() {
385            url = url.push("marker", continuation);
386        }
387
388        if let Some(limit) = limit {
389            url = url.push("maxresults", &limit.to_string());
390        }
391
392        let req = Request::get(url.finish());
393
394        let req = req.extension(Operation::List);
395
396        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
397        self.sign(&mut req).await?;
398        self.send(req).await
399    }
400
401    pub async fn ensure_parent_dir_exists(&self, path: &str) -> Result<()> {
402        let mut dirs = VecDeque::default();
403        // azure file service does not support recursive directory creation
404        let mut p = path;
405        while p != "/" {
406            p = get_parent(p);
407            dirs.push_front(p);
408        }
409
410        let mut pop_dir_count = dirs.len();
411        for dir in dirs.iter().rev() {
412            let resp = self.azfile_get_directory_properties(dir).await?;
413            if resp.status() == StatusCode::NOT_FOUND {
414                pop_dir_count -= 1;
415                continue;
416            }
417            break;
418        }
419
420        for dir in dirs.iter().skip(pop_dir_count) {
421            let resp = self.azfile_create_dir(dir).await?;
422
423            if resp.status() == StatusCode::CREATED {
424                continue;
425            }
426
427            if resp
428                .headers()
429                .get("x-ms-error-code")
430                .map(|value| value.to_str().unwrap_or(""))
431                .unwrap_or_else(|| "")
432                == "ResourceAlreadyExists"
433            {
434                continue;
435            }
436
437            return Err(parse_error(resp));
438        }
439
440        Ok(())
441    }
442}