opendal/services/azdls/
core.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::fmt;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use http::header::CONTENT_DISPOSITION;
24use http::header::CONTENT_LENGTH;
25use http::header::CONTENT_TYPE;
26use http::header::IF_NONE_MATCH;
27use http::HeaderName;
28use http::HeaderValue;
29use http::Request;
30use http::Response;
31use http::StatusCode;
32use reqsign::AzureStorageCredential;
33use reqsign::AzureStorageLoader;
34use reqsign::AzureStorageSigner;
35
36use super::error::parse_error;
37use crate::raw::*;
38use crate::*;
39
/// Request header that carries the source path of a rename (set by `azdls_rename`).
const X_MS_RENAME_SOURCE: &str = "x-ms-rename-source";
/// Request header that `sign` sets on every request before signing.
///
/// Kept lowercase because it is turned into a header name with
/// `HeaderName::from_static`.
const X_MS_VERSION: &str = "x-ms-version";
/// Response header from which `azdls_stat_metadata` reads the version id.
pub const X_MS_VERSION_ID: &str = "x-ms-version-id";
/// Resource kind for a directory: used as the `resource` query value on create
/// and compared against the `x-ms-resource-type` response header on stat.
pub const DIRECTORY: &str = "directory";
/// Resource kind for a file: used as the `resource` query value on create
/// and compared against the `x-ms-resource-type` response header on stat.
pub const FILE: &str = "file";
45
46pub struct AzdlsCore {
47    pub info: Arc<AccessorInfo>,
48    pub filesystem: String,
49    pub root: String,
50    pub endpoint: String,
51
52    pub loader: AzureStorageLoader,
53    pub signer: AzureStorageSigner,
54}
55
56impl Debug for AzdlsCore {
57    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
58        f.debug_struct("AzdlsCore")
59            .field("filesystem", &self.filesystem)
60            .field("root", &self.root)
61            .field("endpoint", &self.endpoint)
62            .finish_non_exhaustive()
63    }
64}
65
66impl AzdlsCore {
67    async fn load_credential(&self) -> Result<AzureStorageCredential> {
68        let cred = self
69            .loader
70            .load()
71            .await
72            .map_err(new_request_credential_error)?;
73
74        if let Some(cred) = cred {
75            Ok(cred)
76        } else {
77            Err(Error::new(
78                ErrorKind::ConfigInvalid,
79                "no valid credential found",
80            ))
81        }
82    }
83
84    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
85        let cred = self.load_credential().await?;
86        // Insert x-ms-version header for normal requests.
87        req.headers_mut().insert(
88            HeaderName::from_static(X_MS_VERSION),
89            // 2022-11-02 is the version supported by Azurite V3 and
90            // used by Azure Portal, We use this version to make
91            // sure most our developer happy.
92            //
93            // In the future, we could allow users to configure this value.
94            HeaderValue::from_static("2022-11-02"),
95        );
96        self.signer.sign(req, &cred).map_err(new_request_sign_error)
97    }
98
99    #[inline]
100    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
101        self.info.http_client().send(req).await
102    }
103}
104
105impl AzdlsCore {
106    pub async fn azdls_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
107        let p = build_abs_path(&self.root, path);
108
109        let url = format!(
110            "{}/{}/{}",
111            self.endpoint,
112            self.filesystem,
113            percent_encode_path(&p)
114        );
115
116        let mut req = Request::get(&url);
117
118        if !range.is_full() {
119            req = req.header(http::header::RANGE, range.to_header());
120        }
121
122        let mut req = req
123            .extension(Operation::Read)
124            .body(Buffer::new())
125            .map_err(new_request_build_error)?;
126
127        self.sign(&mut req).await?;
128        self.info.http_client().fetch(req).await
129    }
130
131    /// resource should be one of `file` or `directory`
132    ///
133    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create
134    pub async fn azdls_create(
135        &self,
136        path: &str,
137        resource: &str,
138        args: &OpWrite,
139    ) -> Result<Response<Buffer>> {
140        let p = build_abs_path(&self.root, path)
141            .trim_end_matches('/')
142            .to_string();
143
144        let url = format!(
145            "{}/{}/{}?resource={resource}",
146            self.endpoint,
147            self.filesystem,
148            percent_encode_path(&p)
149        );
150
151        let mut req = Request::put(&url);
152
153        // Content length must be 0 for create request.
154        req = req.header(CONTENT_LENGTH, 0);
155
156        if let Some(ty) = args.content_type() {
157            req = req.header(CONTENT_TYPE, ty)
158        }
159
160        if let Some(pos) = args.content_disposition() {
161            req = req.header(CONTENT_DISPOSITION, pos)
162        }
163
164        if args.if_not_exists() {
165            req = req.header(IF_NONE_MATCH, "*")
166        }
167
168        if let Some(v) = args.if_none_match() {
169            req = req.header(IF_NONE_MATCH, v)
170        }
171
172        let operation = if resource == DIRECTORY {
173            Operation::CreateDir
174        } else {
175            Operation::Write
176        };
177
178        let mut req = req
179            .extension(operation)
180            .body(Buffer::new())
181            .map_err(new_request_build_error)?;
182
183        self.sign(&mut req).await?;
184        self.send(req).await
185    }
186
187    pub async fn azdls_rename(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
188        let source = build_abs_path(&self.root, from);
189        let target = build_abs_path(&self.root, to);
190
191        let url = format!(
192            "{}/{}/{}",
193            self.endpoint,
194            self.filesystem,
195            percent_encode_path(&target)
196        );
197
198        let source_path = format!("/{}/{}", self.filesystem, percent_encode_path(&source));
199
200        let mut req = Request::put(&url)
201            .header(X_MS_RENAME_SOURCE, source_path)
202            .header(CONTENT_LENGTH, 0)
203            .extension(Operation::Rename)
204            .body(Buffer::new())
205            .map_err(new_request_build_error)?;
206
207        self.sign(&mut req).await?;
208        self.send(req).await
209    }
210
211    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update
212    pub async fn azdls_update(
213        &self,
214        path: &str,
215        size: Option<u64>,
216        position: u64,
217        body: Buffer,
218    ) -> Result<Response<Buffer>> {
219        let p = build_abs_path(&self.root, path);
220
221        // - close: Make this is the final action to this file.
222        // - flush: Flush the file directly.
223        let url = format!(
224            "{}/{}/{}?action=append&close=true&flush=true&position={}",
225            self.endpoint,
226            self.filesystem,
227            percent_encode_path(&p),
228            position
229        );
230
231        let mut req = Request::patch(&url);
232
233        if let Some(size) = size {
234            req = req.header(CONTENT_LENGTH, size)
235        }
236
237        let mut req = req
238            .extension(Operation::Write)
239            .body(body)
240            .map_err(new_request_build_error)?;
241
242        self.sign(&mut req).await?;
243        self.send(req).await
244    }
245
246    pub async fn azdls_get_properties(&self, path: &str) -> Result<Response<Buffer>> {
247        let p = build_abs_path(&self.root, path)
248            .trim_end_matches('/')
249            .to_string();
250
251        let url = format!(
252            "{}/{}/{}?action=getStatus",
253            self.endpoint,
254            self.filesystem,
255            percent_encode_path(&p)
256        );
257
258        let req = Request::head(&url);
259
260        let mut req = req
261            .extension(Operation::Stat)
262            .body(Buffer::new())
263            .map_err(new_request_build_error)?;
264
265        self.sign(&mut req).await?;
266        self.send(req).await
267    }
268
269    pub async fn azdls_stat_metadata(&self, path: &str) -> Result<Metadata> {
270        let resp = self.azdls_get_properties(path).await?;
271
272        if resp.status() != StatusCode::OK {
273            return Err(parse_error(resp));
274        }
275
276        let headers = resp.headers();
277        let mut meta = parse_into_metadata(path, headers)?;
278
279        if let Some(version_id) = parse_header_to_str(headers, X_MS_VERSION_ID)? {
280            meta.set_version(version_id);
281        }
282
283        let resource = resp
284            .headers()
285            .get("x-ms-resource-type")
286            .ok_or_else(|| {
287                Error::new(
288                    ErrorKind::Unexpected,
289                    "azdls should return x-ms-resource-type header, but it's missing",
290                )
291            })?
292            .to_str()
293            .map_err(|err| {
294                Error::new(
295                    ErrorKind::Unexpected,
296                    "azdls should return x-ms-resource-type header, but it's not a valid string",
297                )
298                .set_source(err)
299            })?;
300
301        match resource {
302            FILE => Ok(meta.with_mode(EntryMode::FILE)),
303            DIRECTORY => Ok(meta.with_mode(EntryMode::DIR)),
304            v => Err(Error::new(
305                ErrorKind::Unexpected,
306                "azdls returns an unknown x-ms-resource-type",
307            )
308            .with_context("resource", v)),
309        }
310    }
311
312    pub async fn azdls_delete(&self, path: &str) -> Result<Response<Buffer>> {
313        let p = build_abs_path(&self.root, path)
314            .trim_end_matches('/')
315            .to_string();
316
317        let url = format!(
318            "{}/{}/{}",
319            self.endpoint,
320            self.filesystem,
321            percent_encode_path(&p)
322        );
323
324        let mut req = Request::delete(&url)
325            .extension(Operation::Delete)
326            .body(Buffer::new())
327            .map_err(new_request_build_error)?;
328
329        self.sign(&mut req).await?;
330        self.send(req).await
331    }
332
333    pub async fn azdls_list(
334        &self,
335        path: &str,
336        continuation: &str,
337        limit: Option<usize>,
338    ) -> Result<Response<Buffer>> {
339        let p = build_abs_path(&self.root, path)
340            .trim_end_matches('/')
341            .to_string();
342
343        let mut url = QueryPairsWriter::new(&format!("{}/{}", self.endpoint, self.filesystem))
344            .push("resource", "filesystem")
345            .push("recursive", "false");
346        if !p.is_empty() {
347            url = url.push("directory", &percent_encode_path(&p));
348        }
349        if let Some(limit) = limit {
350            url = url.push("maxResults", &limit.to_string());
351        }
352        if !continuation.is_empty() {
353            url = url.push("continuation", &percent_encode_path(continuation));
354        }
355
356        let mut req = Request::get(url.finish())
357            .extension(Operation::List)
358            .body(Buffer::new())
359            .map_err(new_request_build_error)?;
360
361        self.sign(&mut req).await?;
362        self.send(req).await
363    }
364
365    pub async fn azdls_ensure_parent_path(&self, path: &str) -> Result<Option<Response<Buffer>>> {
366        let abs_target_path = path.trim_end_matches('/').to_string();
367        let abs_target_path = abs_target_path.as_str();
368        let mut parts: Vec<&str> = abs_target_path
369            .split('/')
370            .filter(|x| !x.is_empty())
371            .collect();
372
373        if !parts.is_empty() {
374            parts.pop();
375        }
376
377        if !parts.is_empty() {
378            let parent_path = parts.join("/");
379            let resp = self
380                .azdls_create(&parent_path, DIRECTORY, &OpWrite::default())
381                .await?;
382
383            Ok(Some(resp))
384        } else {
385            Ok(None)
386        }
387    }
388}