opendal_core/services/azdls/
core.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use http::HeaderName;
22use http::HeaderValue;
23use http::Request;
24use http::Response;
25use http::StatusCode;
26use http::header::CONTENT_DISPOSITION;
27use http::header::CONTENT_LENGTH;
28use http::header::CONTENT_TYPE;
29use http::header::IF_NONE_MATCH;
30use reqsign::AzureStorageCredential;
31use reqsign::AzureStorageLoader;
32use reqsign::AzureStorageSigner;
33
34use super::error::parse_error;
35use crate::raw::*;
36use crate::*;
37
38const X_MS_RENAME_SOURCE: &str = "x-ms-rename-source";
39const X_MS_VERSION: &str = "x-ms-version";
40pub const X_MS_VERSION_ID: &str = "x-ms-version-id";
41pub const DIRECTORY: &str = "directory";
42pub const FILE: &str = "file";
43
44pub struct AzdlsCore {
45 pub info: Arc<AccessorInfo>,
46 pub filesystem: String,
47 pub root: String,
48 pub endpoint: String,
49
50 pub loader: AzureStorageLoader,
51 pub signer: AzureStorageSigner,
52}
53
54impl Debug for AzdlsCore {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("AzdlsCore")
57 .field("filesystem", &self.filesystem)
58 .field("root", &self.root)
59 .field("endpoint", &self.endpoint)
60 .finish_non_exhaustive()
61 }
62}
63
64impl AzdlsCore {
65 async fn load_credential(&self) -> Result<AzureStorageCredential> {
66 let cred = self
67 .loader
68 .load()
69 .await
70 .map_err(new_request_credential_error)?;
71
72 if let Some(cred) = cred {
73 Ok(cred)
74 } else {
75 Err(Error::new(
76 ErrorKind::ConfigInvalid,
77 "no valid credential found",
78 ))
79 }
80 }
81
82 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
83 let cred = self.load_credential().await?;
84 req.headers_mut().insert(
86 HeaderName::from_static(X_MS_VERSION),
87 HeaderValue::from_static("2022-11-02"),
93 );
94 self.signer.sign(req, &cred).map_err(new_request_sign_error)
95 }
96
97 #[inline]
98 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
99 self.info.http_client().send(req).await
100 }
101}
102
103impl AzdlsCore {
104 pub async fn azdls_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
105 let p = build_abs_path(&self.root, path);
106
107 let url = format!(
108 "{}/{}/{}",
109 self.endpoint,
110 self.filesystem,
111 percent_encode_path(&p)
112 );
113
114 let mut req = Request::get(&url);
115
116 if !range.is_full() {
117 req = req.header(http::header::RANGE, range.to_header());
118 }
119
120 let mut req = req
121 .extension(Operation::Read)
122 .body(Buffer::new())
123 .map_err(new_request_build_error)?;
124
125 self.sign(&mut req).await?;
126 self.info.http_client().fetch(req).await
127 }
128
129 pub async fn azdls_create(
133 &self,
134 path: &str,
135 resource: &str,
136 args: &OpWrite,
137 ) -> Result<Response<Buffer>> {
138 let p = build_abs_path(&self.root, path)
139 .trim_end_matches('/')
140 .to_string();
141
142 let url = format!(
143 "{}/{}/{}?resource={resource}",
144 self.endpoint,
145 self.filesystem,
146 percent_encode_path(&p)
147 );
148
149 let mut req = Request::put(&url);
150
151 req = req.header(CONTENT_LENGTH, 0);
153
154 if let Some(ty) = args.content_type() {
155 req = req.header(CONTENT_TYPE, ty)
156 }
157
158 if let Some(pos) = args.content_disposition() {
159 req = req.header(CONTENT_DISPOSITION, pos)
160 }
161
162 if args.if_not_exists() {
163 req = req.header(IF_NONE_MATCH, "*")
164 }
165
166 if let Some(v) = args.if_none_match() {
167 req = req.header(IF_NONE_MATCH, v)
168 }
169
170 let operation = if resource == DIRECTORY {
171 Operation::CreateDir
172 } else {
173 Operation::Write
174 };
175
176 let mut req = req
177 .extension(operation)
178 .body(Buffer::new())
179 .map_err(new_request_build_error)?;
180
181 self.sign(&mut req).await?;
182 self.send(req).await
183 }
184
185 pub async fn azdls_rename(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
186 let source = build_abs_path(&self.root, from);
187 let target = build_abs_path(&self.root, to);
188
189 let url = format!(
190 "{}/{}/{}",
191 self.endpoint,
192 self.filesystem,
193 percent_encode_path(&target)
194 );
195
196 let source_path = format!("/{}/{}", self.filesystem, percent_encode_path(&source));
197
198 let mut req = Request::put(&url)
199 .header(X_MS_RENAME_SOURCE, source_path)
200 .header(CONTENT_LENGTH, 0)
201 .extension(Operation::Rename)
202 .body(Buffer::new())
203 .map_err(new_request_build_error)?;
204
205 self.sign(&mut req).await?;
206 self.send(req).await
207 }
208
209 pub async fn azdls_append(
211 &self,
212 path: &str,
213 size: Option<u64>,
214 position: u64,
215 flush: bool,
216 close: bool,
217 body: Buffer,
218 ) -> Result<Response<Buffer>> {
219 let p = build_abs_path(&self.root, path);
220
221 let mut url = format!(
222 "{}/{}/{}?action=append&position={}",
223 self.endpoint,
224 self.filesystem,
225 percent_encode_path(&p),
226 position
227 );
228
229 if flush {
230 url.push_str("&flush=true");
231 }
232 if close {
233 url.push_str("&close=true");
234 }
235
236 let mut req = Request::patch(&url);
237
238 if let Some(size) = size {
239 req = req.header(CONTENT_LENGTH, size)
240 }
241
242 let mut req = req
243 .extension(Operation::Write)
244 .body(body)
245 .map_err(new_request_build_error)?;
246
247 self.sign(&mut req).await?;
248 self.send(req).await
249 }
250
251 pub async fn azdls_flush(
253 &self,
254 path: &str,
255 position: u64,
256 close: bool,
257 ) -> Result<Response<Buffer>> {
258 let p = build_abs_path(&self.root, path);
259
260 let mut url = format!(
261 "{}/{}/{}?action=flush&position={}",
262 self.endpoint,
263 self.filesystem,
264 percent_encode_path(&p),
265 position
266 );
267
268 if close {
269 url.push_str("&close=true");
270 }
271
272 let mut req = Request::patch(&url)
273 .header(CONTENT_LENGTH, 0)
274 .extension(Operation::Write)
275 .body(Buffer::new())
276 .map_err(new_request_build_error)?;
277
278 self.sign(&mut req).await?;
279 self.send(req).await
280 }
281
282 pub async fn azdls_get_properties(&self, path: &str) -> Result<Response<Buffer>> {
283 let p = build_abs_path(&self.root, path)
284 .trim_end_matches('/')
285 .to_string();
286
287 let url = format!(
288 "{}/{}/{}?action=getStatus",
289 self.endpoint,
290 self.filesystem,
291 percent_encode_path(&p)
292 );
293
294 let req = Request::head(&url);
295
296 let mut req = req
297 .extension(Operation::Stat)
298 .body(Buffer::new())
299 .map_err(new_request_build_error)?;
300
301 self.sign(&mut req).await?;
302 self.send(req).await
303 }
304
305 pub async fn azdls_stat_metadata(&self, path: &str) -> Result<Metadata> {
306 let resp = self.azdls_get_properties(path).await?;
307
308 if resp.status() != StatusCode::OK {
309 return Err(parse_error(resp));
310 }
311
312 let headers = resp.headers();
313 let mut meta = parse_into_metadata(path, headers)?;
314
315 if let Some(version_id) = parse_header_to_str(headers, X_MS_VERSION_ID)? {
316 meta.set_version(version_id);
317 }
318
319 let resource = resp
320 .headers()
321 .get("x-ms-resource-type")
322 .ok_or_else(|| {
323 Error::new(
324 ErrorKind::Unexpected,
325 "azdls should return x-ms-resource-type header, but it's missing",
326 )
327 })?
328 .to_str()
329 .map_err(|err| {
330 Error::new(
331 ErrorKind::Unexpected,
332 "azdls should return x-ms-resource-type header, but it's not a valid string",
333 )
334 .set_source(err)
335 })?;
336
337 match resource {
338 FILE => Ok(meta.with_mode(EntryMode::FILE)),
339 DIRECTORY => Ok(meta.with_mode(EntryMode::DIR)),
340 v => Err(Error::new(
341 ErrorKind::Unexpected,
342 "azdls returns an unknown x-ms-resource-type",
343 )
344 .with_context("resource", v)),
345 }
346 }
347
348 pub async fn azdls_delete(&self, path: &str) -> Result<Response<Buffer>> {
349 let p = build_abs_path(&self.root, path)
350 .trim_end_matches('/')
351 .to_string();
352
353 let url = format!(
354 "{}/{}/{}",
355 self.endpoint,
356 self.filesystem,
357 percent_encode_path(&p)
358 );
359
360 let mut req = Request::delete(&url)
361 .extension(Operation::Delete)
362 .body(Buffer::new())
363 .map_err(new_request_build_error)?;
364
365 self.sign(&mut req).await?;
366 self.send(req).await
367 }
368
369 pub async fn azdls_list(
370 &self,
371 path: &str,
372 continuation: &str,
373 limit: Option<usize>,
374 ) -> Result<Response<Buffer>> {
375 let p = build_abs_path(&self.root, path)
376 .trim_end_matches('/')
377 .to_string();
378
379 let mut url = QueryPairsWriter::new(&format!("{}/{}", self.endpoint, self.filesystem))
380 .push("resource", "filesystem")
381 .push("recursive", "false");
382 if !p.is_empty() {
383 url = url.push("directory", &percent_encode_path(&p));
384 }
385 if let Some(limit) = limit {
386 url = url.push("maxResults", &limit.to_string());
387 }
388 if !continuation.is_empty() {
389 url = url.push("continuation", &percent_encode_path(continuation));
390 }
391
392 let mut req = Request::get(url.finish())
393 .extension(Operation::List)
394 .body(Buffer::new())
395 .map_err(new_request_build_error)?;
396
397 self.sign(&mut req).await?;
398 self.send(req).await
399 }
400
401 pub async fn azdls_ensure_parent_path(&self, path: &str) -> Result<Option<Response<Buffer>>> {
402 let abs_target_path = path.trim_end_matches('/').to_string();
403 let abs_target_path = abs_target_path.as_str();
404 let mut parts: Vec<&str> = abs_target_path
405 .split('/')
406 .filter(|x| !x.is_empty())
407 .collect();
408
409 if !parts.is_empty() {
410 parts.pop();
411 }
412
413 if !parts.is_empty() {
414 let parent_path = parts.join("/");
415 let resp = self
416 .azdls_create(&parent_path, DIRECTORY, &OpWrite::default())
417 .await?;
418
419 Ok(Some(resp))
420 } else {
421 Ok(None)
422 }
423 }
424}