opendal/services/azfile/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::VecDeque;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use http::header::CONTENT_DISPOSITION;
24use http::header::CONTENT_LENGTH;
25use http::header::CONTENT_TYPE;
26use http::header::RANGE;
27use http::HeaderName;
28use http::HeaderValue;
29use http::Request;
30use http::Response;
31use http::StatusCode;
32use reqsign::AzureStorageCredential;
33use reqsign::AzureStorageLoader;
34use reqsign::AzureStorageSigner;
35
36use super::error::parse_error;
37use crate::raw::*;
38use crate::*;
39
// Names of the Azure-specific (`x-ms-*`) HTTP headers set by the requests in this file.

/// Header carrying the service API version; inserted on every signed request.
const X_MS_VERSION: &str = "x-ms-version";
/// Header declaring the resource type on create requests (set to `file`).
const X_MS_TYPE: &str = "x-ms-type";
/// Header carrying the size of the file being created.
const X_MS_CONTENT_LENGTH: &str = "x-ms-content-length";
/// Header selecting the write mode of a range request (set to `update`).
const X_MS_WRITE: &str = "x-ms-write";
/// Header naming the source of a rename, given as a full URL.
const X_MS_FILE_RENAME_SOURCE: &str = "x-ms-file-rename-source";
/// Header controlling whether a rename may replace an existing destination.
const X_MS_FILE_RENAME_REPLACE_IF_EXISTS: &str = "x-ms-file-rename-replace-if-exists";
46
47pub struct AzfileCore {
48    pub info: Arc<AccessorInfo>,
49    pub root: String,
50    pub endpoint: String,
51    pub share_name: String,
52    pub loader: AzureStorageLoader,
53    pub signer: AzureStorageSigner,
54}
55
56impl Debug for AzfileCore {
57    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct("AzfileCore")
59            .field("root", &self.root)
60            .field("endpoint", &self.endpoint)
61            .field("share_name", &self.share_name)
62            .finish_non_exhaustive()
63    }
64}
65
66impl AzfileCore {
67    async fn load_credential(&self) -> Result<AzureStorageCredential> {
68        let cred = self
69            .loader
70            .load()
71            .await
72            .map_err(new_request_credential_error)?;
73
74        if let Some(cred) = cred {
75            Ok(cred)
76        } else {
77            Err(Error::new(
78                ErrorKind::ConfigInvalid,
79                "no valid credential found",
80            ))
81        }
82    }
83
84    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
85        let cred = self.load_credential().await?;
86        // Insert x-ms-version header for normal requests.
87        req.headers_mut().insert(
88            HeaderName::from_static(X_MS_VERSION),
89            // consistent with azdls and azblob
90            HeaderValue::from_static("2022-11-02"),
91        );
92        self.signer.sign(req, &cred).map_err(new_request_sign_error)
93    }
94
95    #[inline]
96    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
97        self.info.http_client().send(req).await
98    }
99
100    pub async fn azfile_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
101        let p = build_abs_path(&self.root, path);
102
103        let url = format!(
104            "{}/{}/{}",
105            self.endpoint,
106            self.share_name,
107            percent_encode_path(&p)
108        );
109
110        let mut req = Request::get(&url);
111
112        if !range.is_full() {
113            req = req.header(RANGE, range.to_header());
114        }
115
116        let req = req.extension(Operation::Read);
117
118        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
119        self.sign(&mut req).await?;
120        self.info.http_client().fetch(req).await
121    }
122
123    pub async fn azfile_create_file(
124        &self,
125        path: &str,
126        size: usize,
127        args: &OpWrite,
128    ) -> Result<Response<Buffer>> {
129        let p = build_abs_path(&self.root, path)
130            .trim_start_matches('/')
131            .to_string();
132        let url = format!(
133            "{}/{}/{}",
134            self.endpoint,
135            self.share_name,
136            percent_encode_path(&p)
137        );
138
139        let mut req = Request::put(&url);
140
141        // x-ms-content-length specifies the maximum size for the file, up to 4 tebibytes (TiB)
142        // https://learn.microsoft.com/en-us/rest/api/storageservices/create-file
143        req = req.header(X_MS_CONTENT_LENGTH, size);
144
145        req = req.header(X_MS_TYPE, "file");
146
147        // Content length must be 0 for create request.
148        req = req.header(CONTENT_LENGTH, 0);
149
150        if let Some(ty) = args.content_type() {
151            req = req.header(CONTENT_TYPE, ty);
152        }
153
154        if let Some(pos) = args.content_disposition() {
155            req = req.header(CONTENT_DISPOSITION, pos);
156        }
157
158        let req = req.extension(Operation::Write);
159
160        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
161        self.sign(&mut req).await?;
162        self.send(req).await
163    }
164
165    pub async fn azfile_update(
166        &self,
167        path: &str,
168        size: u64,
169        position: u64,
170        body: Buffer,
171    ) -> Result<Response<Buffer>> {
172        let p = build_abs_path(&self.root, path)
173            .trim_start_matches('/')
174            .to_string();
175
176        let url = format!(
177            "{}/{}/{}?comp=range",
178            self.endpoint,
179            self.share_name,
180            percent_encode_path(&p)
181        );
182
183        let mut req = Request::put(&url);
184
185        req = req.header(CONTENT_LENGTH, size);
186
187        req = req.header(X_MS_WRITE, "update");
188
189        req = req.header(
190            RANGE,
191            BytesRange::from(position..position + size).to_header(),
192        );
193
194        let req = req.extension(Operation::Write);
195
196        let mut req = req.body(body).map_err(new_request_build_error)?;
197        self.sign(&mut req).await?;
198        self.send(req).await
199    }
200
201    pub async fn azfile_get_file_properties(&self, path: &str) -> Result<Response<Buffer>> {
202        let p = build_abs_path(&self.root, path);
203        let url = format!(
204            "{}/{}/{}",
205            self.endpoint,
206            self.share_name,
207            percent_encode_path(&p)
208        );
209
210        let req = Request::head(&url);
211
212        let req = req.extension(Operation::Stat);
213
214        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
215        self.sign(&mut req).await?;
216        self.send(req).await
217    }
218
219    pub async fn azfile_get_directory_properties(&self, path: &str) -> Result<Response<Buffer>> {
220        let p = build_abs_path(&self.root, path);
221
222        let url = format!(
223            "{}/{}/{}?restype=directory",
224            self.endpoint,
225            self.share_name,
226            percent_encode_path(&p)
227        );
228
229        let req = Request::head(&url);
230
231        let req = req.extension(Operation::Stat);
232
233        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
234        self.sign(&mut req).await?;
235        self.send(req).await
236    }
237
238    pub async fn azfile_rename(&self, path: &str, new_path: &str) -> Result<Response<Buffer>> {
239        let p = build_abs_path(&self.root, path)
240            .trim_start_matches('/')
241            .to_string();
242
243        let new_p = build_abs_path(&self.root, new_path)
244            .trim_start_matches('/')
245            .to_string();
246
247        let url = if path.ends_with('/') {
248            format!(
249                "{}/{}/{}?restype=directory&comp=rename",
250                self.endpoint,
251                self.share_name,
252                percent_encode_path(&new_p)
253            )
254        } else {
255            format!(
256                "{}/{}/{}?comp=rename",
257                self.endpoint,
258                self.share_name,
259                percent_encode_path(&new_p)
260            )
261        };
262
263        let mut req = Request::put(&url);
264
265        req = req.header(CONTENT_LENGTH, 0);
266
267        // x-ms-file-rename-source specifies the file or directory to be renamed.
268        // the value must be a URL style path
269        // the official document does not mention the URL style path
270        // find the solution from the community FAQ and implementation of the Java-SDK
271        // ref: https://learn.microsoft.com/en-us/answers/questions/799611/azure-file-service-rest-api(rename)?page=1
272        let source_url = format!(
273            "{}/{}/{}",
274            self.endpoint,
275            self.share_name,
276            percent_encode_path(&p)
277        );
278
279        req = req.header(X_MS_FILE_RENAME_SOURCE, &source_url);
280
281        req = req.header(X_MS_FILE_RENAME_REPLACE_IF_EXISTS, "true");
282
283        let req = req.extension(Operation::Rename);
284
285        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
286        self.sign(&mut req).await?;
287        self.send(req).await
288    }
289
290    pub async fn azfile_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
291        let p = build_abs_path(&self.root, path)
292            .trim_start_matches('/')
293            .to_string();
294
295        let url = format!(
296            "{}/{}/{}?restype=directory",
297            self.endpoint,
298            self.share_name,
299            percent_encode_path(&p)
300        );
301
302        let mut req = Request::put(&url);
303
304        req = req.header(CONTENT_LENGTH, 0);
305
306        let req = req.extension(Operation::CreateDir);
307
308        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
309        self.sign(&mut req).await?;
310        self.send(req).await
311    }
312
313    pub async fn azfile_delete_file(&self, path: &str) -> Result<Response<Buffer>> {
314        let p = build_abs_path(&self.root, path)
315            .trim_start_matches('/')
316            .to_string();
317
318        let url = format!(
319            "{}/{}/{}",
320            self.endpoint,
321            self.share_name,
322            percent_encode_path(&p)
323        );
324
325        let req = Request::delete(&url);
326
327        let req = req.extension(Operation::Delete);
328
329        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
330        self.sign(&mut req).await?;
331        self.send(req).await
332    }
333
334    pub async fn azfile_delete_dir(&self, path: &str) -> Result<Response<Buffer>> {
335        let p = build_abs_path(&self.root, path)
336            .trim_start_matches('/')
337            .to_string();
338
339        let url = format!(
340            "{}/{}/{}?restype=directory",
341            self.endpoint,
342            self.share_name,
343            percent_encode_path(&p)
344        );
345
346        let req = Request::delete(&url);
347
348        let req = req.extension(Operation::Delete);
349
350        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
351        self.sign(&mut req).await?;
352        self.send(req).await
353    }
354
355    pub async fn azfile_list(
356        &self,
357        path: &str,
358        limit: &Option<usize>,
359        continuation: &str,
360    ) -> Result<Response<Buffer>> {
361        let p = build_abs_path(&self.root, path)
362            .trim_start_matches('/')
363            .to_string();
364
365        let url = format!(
366            "{}/{}/{}",
367            self.endpoint,
368            self.share_name,
369            percent_encode_path(&p),
370        );
371
372        let mut url = QueryPairsWriter::new(&url)
373            .push("restype", "directory")
374            .push("comp", "list")
375            .push("include", "Timestamps,ETag");
376
377        if !continuation.is_empty() {
378            url = url.push("marker", continuation);
379        }
380
381        if let Some(limit) = limit {
382            url = url.push("maxresults", &limit.to_string());
383        }
384
385        let req = Request::get(url.finish());
386
387        let req = req.extension(Operation::List);
388
389        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
390        self.sign(&mut req).await?;
391        self.send(req).await
392    }
393
394    pub async fn ensure_parent_dir_exists(&self, path: &str) -> Result<()> {
395        let mut dirs = VecDeque::default();
396        // azure file service does not support recursive directory creation
397        let mut p = path;
398        while p != "/" {
399            p = get_parent(p);
400            dirs.push_front(p);
401        }
402
403        let mut pop_dir_count = dirs.len();
404        for dir in dirs.iter().rev() {
405            let resp = self.azfile_get_directory_properties(dir).await?;
406            if resp.status() == StatusCode::NOT_FOUND {
407                pop_dir_count -= 1;
408                continue;
409            }
410            break;
411        }
412
413        for dir in dirs.iter().skip(pop_dir_count) {
414            let resp = self.azfile_create_dir(dir).await?;
415
416            if resp.status() == StatusCode::CREATED {
417                continue;
418            }
419
420            if resp
421                .headers()
422                .get("x-ms-error-code")
423                .map(|value| value.to_str().unwrap_or(""))
424                .unwrap_or_else(|| "")
425                == "ResourceAlreadyExists"
426            {
427                continue;
428            }
429
430            return Err(parse_error(resp));
431        }
432
433        Ok(())
434    }
435}