opendal/services/azdls/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use http::header::CONTENT_LENGTH;
19use http::header::CONTENT_TYPE;
20use http::header::{CONTENT_DISPOSITION, IF_NONE_MATCH};
21use http::HeaderName;
22use http::HeaderValue;
23use http::Request;
24use http::Response;
25use reqsign::AzureStorageCredential;
26use reqsign::AzureStorageLoader;
27use reqsign::AzureStorageSigner;
28use std::fmt;
29use std::fmt::Debug;
30use std::fmt::Formatter;
31use std::sync::Arc;
32
33use crate::raw::*;
34use crate::*;
35
36const X_MS_RENAME_SOURCE: &str = "x-ms-rename-source";
37const X_MS_VERSION: &str = "x-ms-version";
38
39pub struct AzdlsCore {
40    pub info: Arc<AccessorInfo>,
41    pub filesystem: String,
42    pub root: String,
43    pub endpoint: String,
44
45    pub loader: AzureStorageLoader,
46    pub signer: AzureStorageSigner,
47}
48
49impl Debug for AzdlsCore {
50    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
51        f.debug_struct("AzdlsCore")
52            .field("filesystem", &self.filesystem)
53            .field("root", &self.root)
54            .field("endpoint", &self.endpoint)
55            .finish_non_exhaustive()
56    }
57}
58
59impl AzdlsCore {
60    async fn load_credential(&self) -> Result<AzureStorageCredential> {
61        let cred = self
62            .loader
63            .load()
64            .await
65            .map_err(new_request_credential_error)?;
66
67        if let Some(cred) = cred {
68            Ok(cred)
69        } else {
70            Err(Error::new(
71                ErrorKind::ConfigInvalid,
72                "no valid credential found",
73            ))
74        }
75    }
76
77    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
78        let cred = self.load_credential().await?;
79        // Insert x-ms-version header for normal requests.
80        req.headers_mut().insert(
81            HeaderName::from_static(X_MS_VERSION),
82            // 2022-11-02 is the version supported by Azurite V3 and
83            // used by Azure Portal, We use this version to make
84            // sure most our developer happy.
85            //
86            // In the future, we could allow users to configure this value.
87            HeaderValue::from_static("2022-11-02"),
88        );
89        self.signer.sign(req, &cred).map_err(new_request_sign_error)
90    }
91
92    #[inline]
93    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
94        self.info.http_client().send(req).await
95    }
96}
97
98impl AzdlsCore {
99    pub async fn azdls_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
100        let p = build_abs_path(&self.root, path);
101
102        let url = format!(
103            "{}/{}/{}",
104            self.endpoint,
105            self.filesystem,
106            percent_encode_path(&p)
107        );
108
109        let mut req = Request::get(&url);
110
111        if !range.is_full() {
112            req = req.header(http::header::RANGE, range.to_header());
113        }
114
115        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
116
117        self.sign(&mut req).await?;
118        self.info.http_client().fetch(req).await
119    }
120
121    /// resource should be one of `file` or `directory`
122    ///
123    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create
124    pub async fn azdls_create(
125        &self,
126        path: &str,
127        resource: &str,
128        args: &OpWrite,
129        body: Buffer,
130    ) -> Result<Response<Buffer>> {
131        let p = build_abs_path(&self.root, path)
132            .trim_end_matches('/')
133            .to_string();
134
135        let url = format!(
136            "{}/{}/{}?resource={resource}",
137            self.endpoint,
138            self.filesystem,
139            percent_encode_path(&p)
140        );
141
142        let mut req = Request::put(&url);
143
144        // Content length must be 0 for create request.
145        req = req.header(CONTENT_LENGTH, 0);
146
147        if let Some(ty) = args.content_type() {
148            req = req.header(CONTENT_TYPE, ty)
149        }
150
151        if let Some(pos) = args.content_disposition() {
152            req = req.header(CONTENT_DISPOSITION, pos)
153        }
154
155        if args.if_not_exists() {
156            req = req.header(IF_NONE_MATCH, "*")
157        }
158
159        if let Some(v) = args.if_none_match() {
160            req = req.header(IF_NONE_MATCH, v)
161        }
162
163        // Set body
164        let mut req = req.body(body).map_err(new_request_build_error)?;
165
166        self.sign(&mut req).await?;
167
168        self.send(req).await
169    }
170
171    pub async fn azdls_rename(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
172        let source = build_abs_path(&self.root, from);
173        let target = build_abs_path(&self.root, to);
174
175        let url = format!(
176            "{}/{}/{}",
177            self.endpoint,
178            self.filesystem,
179            percent_encode_path(&target)
180        );
181
182        let mut req = Request::put(&url)
183            .header(
184                X_MS_RENAME_SOURCE,
185                format!("/{}/{}", self.filesystem, percent_encode_path(&source)),
186            )
187            .header(CONTENT_LENGTH, 0)
188            .body(Buffer::new())
189            .map_err(new_request_build_error)?;
190
191        self.sign(&mut req).await?;
192        self.send(req).await
193    }
194
195    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update
196    pub async fn azdls_update(
197        &self,
198        path: &str,
199        size: Option<u64>,
200        position: u64,
201        body: Buffer,
202    ) -> Result<Response<Buffer>> {
203        let p = build_abs_path(&self.root, path);
204
205        // - close: Make this is the final action to this file.
206        // - flush: Flush the file directly.
207        let url = format!(
208            "{}/{}/{}?action=append&close=true&flush=true&position={}",
209            self.endpoint,
210            self.filesystem,
211            percent_encode_path(&p),
212            position
213        );
214
215        let mut req = Request::patch(&url);
216
217        if let Some(size) = size {
218            req = req.header(CONTENT_LENGTH, size)
219        }
220
221        // Set body
222        let mut req = req.body(body).map_err(new_request_build_error)?;
223
224        self.sign(&mut req).await?;
225
226        self.send(req).await
227    }
228
229    pub async fn azdls_get_properties(&self, path: &str) -> Result<Response<Buffer>> {
230        let p = build_abs_path(&self.root, path)
231            .trim_end_matches('/')
232            .to_string();
233
234        let url = format!(
235            "{}/{}/{}?action=getStatus",
236            self.endpoint,
237            self.filesystem,
238            percent_encode_path(&p)
239        );
240
241        let req = Request::head(&url);
242
243        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
244
245        self.sign(&mut req).await?;
246        self.info.http_client().send(req).await
247    }
248
249    pub async fn azdls_delete(&self, path: &str) -> Result<Response<Buffer>> {
250        let p = build_abs_path(&self.root, path)
251            .trim_end_matches('/')
252            .to_string();
253
254        let url = format!(
255            "{}/{}/{}",
256            self.endpoint,
257            self.filesystem,
258            percent_encode_path(&p)
259        );
260
261        let req = Request::delete(&url);
262
263        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
264
265        self.sign(&mut req).await?;
266        self.send(req).await
267    }
268
269    pub async fn azdls_list(
270        &self,
271        path: &str,
272        continuation: &str,
273        limit: Option<usize>,
274    ) -> Result<Response<Buffer>> {
275        let p = build_abs_path(&self.root, path)
276            .trim_end_matches('/')
277            .to_string();
278
279        let mut url = QueryPairsWriter::new(&format!("{}/{}", self.endpoint, self.filesystem))
280            .push("resource", "filesystem")
281            .push("recursive", "false");
282        if !p.is_empty() {
283            url = url.push("directory", &percent_encode_path(&p));
284        }
285        if let Some(limit) = limit {
286            url = url.push("maxResults", &limit.to_string());
287        }
288        if !continuation.is_empty() {
289            url = url.push("continuation", &percent_encode_path(continuation));
290        }
291
292        let mut req = Request::get(url.finish())
293            .body(Buffer::new())
294            .map_err(new_request_build_error)?;
295
296        self.sign(&mut req).await?;
297        self.send(req).await
298    }
299
300    pub async fn azdls_ensure_parent_path(&self, path: &str) -> Result<Option<Response<Buffer>>> {
301        let abs_target_path = path.trim_end_matches('/').to_string();
302        let abs_target_path = abs_target_path.as_str();
303        let mut parts: Vec<&str> = abs_target_path
304            .split('/')
305            .filter(|x| !x.is_empty())
306            .collect();
307
308        if !parts.is_empty() {
309            parts.pop();
310        }
311
312        if !parts.is_empty() {
313            let parent_path = parts.join("/");
314            let resp = self
315                .azdls_create(
316                    &parent_path,
317                    "directory",
318                    &OpWrite::default(),
319                    Buffer::new(),
320                )
321                .await?;
322
323            Ok(Some(resp))
324        } else {
325            Ok(None)
326        }
327    }
328}