opendal/services/azfile/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use http::header::CONTENT_DISPOSITION;
19use http::header::CONTENT_LENGTH;
20use http::header::CONTENT_TYPE;
21use http::header::RANGE;
22use http::HeaderName;
23use http::HeaderValue;
24use http::Request;
25use http::Response;
26use http::StatusCode;
27use reqsign::AzureStorageCredential;
28use reqsign::AzureStorageLoader;
29use reqsign::AzureStorageSigner;
30use std::collections::VecDeque;
31use std::fmt::Debug;
32use std::fmt::Formatter;
33use std::fmt::Write;
34use std::sync::Arc;
35
36use super::error::parse_error;
37use crate::raw::*;
38use crate::*;
39
// Azure-specific HTTP header names used by the request builders in this file.
/// Header set on every request by `sign`, carrying the service API version.
const X_MS_VERSION: &str = "x-ms-version";
/// Header sent by `azfile_update`; its value there is always `update`.
const X_MS_WRITE: &str = "x-ms-write";
/// Header sent by `azfile_rename`, carrying the URL of the rename source.
const X_MS_FILE_RENAME_SOURCE: &str = "x-ms-file-rename-source";
/// Header sent by `azfile_create_file`, carrying the requested file size.
const X_MS_CONTENT_LENGTH: &str = "x-ms-content-length";
/// Header sent by `azfile_create_file`; its value there is always `file`.
const X_MS_TYPE: &str = "x-ms-type";
/// Header sent by `azfile_rename`; its value there is always `true`.
const X_MS_FILE_RENAME_REPLACE_IF_EXISTS: &str = "x-ms-file-rename-replace-if-exists";
46
/// Shared state used to build, sign and send azfile HTTP requests.
pub struct AzfileCore {
    /// Accessor info; the methods below use it to obtain the HTTP client.
    pub info: Arc<AccessorInfo>,
    /// Root path that every request path is resolved against via `build_abs_path`.
    pub root: String,
    /// Service endpoint; used as the first segment of every request URL.
    pub endpoint: String,
    /// Share name; placed directly after the endpoint in every request URL.
    pub share_name: String,
    /// Loader that supplies the credential used for signing.
    pub loader: AzureStorageLoader,
    /// Signer applied to each request before it is sent.
    pub signer: AzureStorageSigner,
}
55
56impl Debug for AzfileCore {
57    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct("AzfileCore")
59            .field("root", &self.root)
60            .field("endpoint", &self.endpoint)
61            .field("share_name", &self.share_name)
62            .finish_non_exhaustive()
63    }
64}
65
66impl AzfileCore {
67    async fn load_credential(&self) -> Result<AzureStorageCredential> {
68        let cred = self
69            .loader
70            .load()
71            .await
72            .map_err(new_request_credential_error)?;
73
74        if let Some(cred) = cred {
75            Ok(cred)
76        } else {
77            Err(Error::new(
78                ErrorKind::ConfigInvalid,
79                "no valid credential found",
80            ))
81        }
82    }
83
84    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
85        let cred = self.load_credential().await?;
86        // Insert x-ms-version header for normal requests.
87        req.headers_mut().insert(
88            HeaderName::from_static(X_MS_VERSION),
89            // consistent with azdls and azblob
90            HeaderValue::from_static("2022-11-02"),
91        );
92        self.signer.sign(req, &cred).map_err(new_request_sign_error)
93    }
94
95    #[inline]
96    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
97        self.info.http_client().send(req).await
98    }
99
100    pub async fn azfile_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
101        let p = build_abs_path(&self.root, path);
102
103        let url = format!(
104            "{}/{}/{}",
105            self.endpoint,
106            self.share_name,
107            percent_encode_path(&p)
108        );
109
110        let mut req = Request::get(&url);
111
112        if !range.is_full() {
113            req = req.header(RANGE, range.to_header());
114        }
115
116        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
117        self.sign(&mut req).await?;
118        self.info.http_client().fetch(req).await
119    }
120
121    pub async fn azfile_create_file(
122        &self,
123        path: &str,
124        size: usize,
125        args: &OpWrite,
126    ) -> Result<Response<Buffer>> {
127        let p = build_abs_path(&self.root, path)
128            .trim_start_matches('/')
129            .to_string();
130        let url = format!(
131            "{}/{}/{}",
132            self.endpoint,
133            self.share_name,
134            percent_encode_path(&p)
135        );
136
137        let mut req = Request::put(&url);
138
139        // x-ms-content-length specifies the maximum size for the file, up to 4 tebibytes (TiB)
140        // https://learn.microsoft.com/en-us/rest/api/storageservices/create-file
141        req = req.header(X_MS_CONTENT_LENGTH, size);
142
143        req = req.header(X_MS_TYPE, "file");
144
145        // Content length must be 0 for create request.
146        req = req.header(CONTENT_LENGTH, 0);
147
148        if let Some(ty) = args.content_type() {
149            req = req.header(CONTENT_TYPE, ty);
150        }
151
152        if let Some(pos) = args.content_disposition() {
153            req = req.header(CONTENT_DISPOSITION, pos);
154        }
155
156        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
157        self.sign(&mut req).await?;
158        self.send(req).await
159    }
160
161    pub async fn azfile_update(
162        &self,
163        path: &str,
164        size: u64,
165        position: u64,
166        body: Buffer,
167    ) -> Result<Response<Buffer>> {
168        let p = build_abs_path(&self.root, path)
169            .trim_start_matches('/')
170            .to_string();
171
172        let url = format!(
173            "{}/{}/{}?comp=range",
174            self.endpoint,
175            self.share_name,
176            percent_encode_path(&p)
177        );
178
179        let mut req = Request::put(&url);
180
181        req = req.header(CONTENT_LENGTH, size);
182
183        req = req.header(X_MS_WRITE, "update");
184
185        req = req.header(
186            RANGE,
187            BytesRange::from(position..position + size).to_header(),
188        );
189
190        let mut req = req.body(body).map_err(new_request_build_error)?;
191        self.sign(&mut req).await?;
192        self.send(req).await
193    }
194
195    pub async fn azfile_get_file_properties(&self, path: &str) -> Result<Response<Buffer>> {
196        let p = build_abs_path(&self.root, path);
197        let url = format!(
198            "{}/{}/{}",
199            self.endpoint,
200            self.share_name,
201            percent_encode_path(&p)
202        );
203
204        let req = Request::head(&url);
205
206        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
207        self.sign(&mut req).await?;
208        self.send(req).await
209    }
210
211    pub async fn azfile_get_directory_properties(&self, path: &str) -> Result<Response<Buffer>> {
212        let p = build_abs_path(&self.root, path);
213
214        let url = format!(
215            "{}/{}/{}?restype=directory",
216            self.endpoint,
217            self.share_name,
218            percent_encode_path(&p)
219        );
220
221        let req = Request::head(&url);
222
223        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
224        self.sign(&mut req).await?;
225        self.send(req).await
226    }
227
228    pub async fn azfile_rename(&self, path: &str, new_path: &str) -> Result<Response<Buffer>> {
229        let p = build_abs_path(&self.root, path)
230            .trim_start_matches('/')
231            .to_string();
232
233        let new_p = build_abs_path(&self.root, new_path)
234            .trim_start_matches('/')
235            .to_string();
236
237        let url = if path.ends_with('/') {
238            format!(
239                "{}/{}/{}?restype=directory&comp=rename",
240                self.endpoint,
241                self.share_name,
242                percent_encode_path(&new_p)
243            )
244        } else {
245            format!(
246                "{}/{}/{}?comp=rename",
247                self.endpoint,
248                self.share_name,
249                percent_encode_path(&new_p)
250            )
251        };
252
253        let mut req = Request::put(&url);
254
255        req = req.header(CONTENT_LENGTH, 0);
256
257        // x-ms-file-rename-source specifies the file or directory to be renamed.
258        // the value must be a URL style path
259        // the official document does not mention the URL style path
260        // find the solution from the community FAQ and implementation of the Java-SDK
261        // ref: https://learn.microsoft.com/en-us/answers/questions/799611/azure-file-service-rest-api(rename)?page=1
262        let source_url = format!(
263            "{}/{}/{}",
264            self.endpoint,
265            self.share_name,
266            percent_encode_path(&p)
267        );
268
269        req = req.header(X_MS_FILE_RENAME_SOURCE, &source_url);
270
271        req = req.header(X_MS_FILE_RENAME_REPLACE_IF_EXISTS, "true");
272
273        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
274        self.sign(&mut req).await?;
275        self.send(req).await
276    }
277
278    pub async fn azfile_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
279        let p = build_abs_path(&self.root, path)
280            .trim_start_matches('/')
281            .to_string();
282
283        let url = format!(
284            "{}/{}/{}?restype=directory",
285            self.endpoint,
286            self.share_name,
287            percent_encode_path(&p)
288        );
289
290        let mut req = Request::put(&url);
291
292        req = req.header(CONTENT_LENGTH, 0);
293
294        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
295        self.sign(&mut req).await?;
296        self.send(req).await
297    }
298
299    pub async fn azfile_delete_file(&self, path: &str) -> Result<Response<Buffer>> {
300        let p = build_abs_path(&self.root, path)
301            .trim_start_matches('/')
302            .to_string();
303
304        let url = format!(
305            "{}/{}/{}",
306            self.endpoint,
307            self.share_name,
308            percent_encode_path(&p)
309        );
310
311        let req = Request::delete(&url);
312
313        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
314        self.sign(&mut req).await?;
315        self.send(req).await
316    }
317
318    pub async fn azfile_delete_dir(&self, path: &str) -> Result<Response<Buffer>> {
319        let p = build_abs_path(&self.root, path)
320            .trim_start_matches('/')
321            .to_string();
322
323        let url = format!(
324            "{}/{}/{}?restype=directory",
325            self.endpoint,
326            self.share_name,
327            percent_encode_path(&p)
328        );
329
330        let req = Request::delete(&url);
331
332        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
333        self.sign(&mut req).await?;
334        self.send(req).await
335    }
336
337    pub async fn azfile_list(
338        &self,
339        path: &str,
340        limit: &Option<usize>,
341        continuation: &String,
342    ) -> Result<Response<Buffer>> {
343        let p = build_abs_path(&self.root, path)
344            .trim_start_matches('/')
345            .to_string();
346
347        let mut url = format!(
348            "{}/{}/{}?restype=directory&comp=list&include=Timestamps,ETag",
349            self.endpoint,
350            self.share_name,
351            percent_encode_path(&p),
352        );
353
354        if !continuation.is_empty() {
355            write!(url, "&marker={}", &continuation).expect("write into string must succeed");
356        }
357
358        if let Some(limit) = limit {
359            write!(url, "&maxresults={}", limit).expect("write into string must succeed");
360        }
361
362        let req = Request::get(&url);
363
364        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
365        self.sign(&mut req).await?;
366        self.send(req).await
367    }
368
369    pub async fn ensure_parent_dir_exists(&self, path: &str) -> Result<()> {
370        let mut dirs = VecDeque::default();
371        // azure file service does not support recursive directory creation
372        let mut p = path;
373        while p != "/" {
374            p = get_parent(p);
375            dirs.push_front(p);
376        }
377
378        let mut pop_dir_count = dirs.len();
379        for dir in dirs.iter().rev() {
380            let resp = self.azfile_get_directory_properties(dir).await?;
381            if resp.status() == StatusCode::NOT_FOUND {
382                pop_dir_count -= 1;
383                continue;
384            }
385            break;
386        }
387
388        for dir in dirs.iter().skip(pop_dir_count) {
389            let resp = self.azfile_create_dir(dir).await?;
390
391            if resp.status() == StatusCode::CREATED {
392                continue;
393            }
394
395            if resp
396                .headers()
397                .get("x-ms-error-code")
398                .map(|value| value.to_str().unwrap_or(""))
399                .unwrap_or_else(|| "")
400                == "ResourceAlreadyExists"
401            {
402                continue;
403            }
404
405            return Err(parse_error(resp));
406        }
407
408        Ok(())
409    }
410}