1use http::header::CONTENT_LENGTH;
19use http::header::CONTENT_TYPE;
20use http::header::{CONTENT_DISPOSITION, IF_NONE_MATCH};
21use http::HeaderName;
22use http::HeaderValue;
23use http::Request;
24use http::Response;
25use reqsign::AzureStorageCredential;
26use reqsign::AzureStorageLoader;
27use reqsign::AzureStorageSigner;
28use std::fmt;
29use std::fmt::Debug;
30use std::fmt::Formatter;
31use std::sync::Arc;
32
33use crate::raw::*;
34use crate::*;
35
36const X_MS_RENAME_SOURCE: &str = "x-ms-rename-source";
37const X_MS_VERSION: &str = "x-ms-version";
38
39pub struct AzdlsCore {
40 pub info: Arc<AccessorInfo>,
41 pub filesystem: String,
42 pub root: String,
43 pub endpoint: String,
44
45 pub loader: AzureStorageLoader,
46 pub signer: AzureStorageSigner,
47}
48
49impl Debug for AzdlsCore {
50 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
51 f.debug_struct("AzdlsCore")
52 .field("filesystem", &self.filesystem)
53 .field("root", &self.root)
54 .field("endpoint", &self.endpoint)
55 .finish_non_exhaustive()
56 }
57}
58
59impl AzdlsCore {
60 async fn load_credential(&self) -> Result<AzureStorageCredential> {
61 let cred = self
62 .loader
63 .load()
64 .await
65 .map_err(new_request_credential_error)?;
66
67 if let Some(cred) = cred {
68 Ok(cred)
69 } else {
70 Err(Error::new(
71 ErrorKind::ConfigInvalid,
72 "no valid credential found",
73 ))
74 }
75 }
76
77 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
78 let cred = self.load_credential().await?;
79 req.headers_mut().insert(
81 HeaderName::from_static(X_MS_VERSION),
82 HeaderValue::from_static("2022-11-02"),
88 );
89 self.signer.sign(req, &cred).map_err(new_request_sign_error)
90 }
91
92 #[inline]
93 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
94 self.info.http_client().send(req).await
95 }
96}
97
98impl AzdlsCore {
99 pub async fn azdls_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
100 let p = build_abs_path(&self.root, path);
101
102 let url = format!(
103 "{}/{}/{}",
104 self.endpoint,
105 self.filesystem,
106 percent_encode_path(&p)
107 );
108
109 let mut req = Request::get(&url);
110
111 if !range.is_full() {
112 req = req.header(http::header::RANGE, range.to_header());
113 }
114
115 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
116
117 self.sign(&mut req).await?;
118 self.info.http_client().fetch(req).await
119 }
120
121 pub async fn azdls_create(
125 &self,
126 path: &str,
127 resource: &str,
128 args: &OpWrite,
129 body: Buffer,
130 ) -> Result<Response<Buffer>> {
131 let p = build_abs_path(&self.root, path)
132 .trim_end_matches('/')
133 .to_string();
134
135 let url = format!(
136 "{}/{}/{}?resource={resource}",
137 self.endpoint,
138 self.filesystem,
139 percent_encode_path(&p)
140 );
141
142 let mut req = Request::put(&url);
143
144 req = req.header(CONTENT_LENGTH, 0);
146
147 if let Some(ty) = args.content_type() {
148 req = req.header(CONTENT_TYPE, ty)
149 }
150
151 if let Some(pos) = args.content_disposition() {
152 req = req.header(CONTENT_DISPOSITION, pos)
153 }
154
155 if args.if_not_exists() {
156 req = req.header(IF_NONE_MATCH, "*")
157 }
158
159 if let Some(v) = args.if_none_match() {
160 req = req.header(IF_NONE_MATCH, v)
161 }
162
163 let mut req = req.body(body).map_err(new_request_build_error)?;
165
166 self.sign(&mut req).await?;
167
168 self.send(req).await
169 }
170
171 pub async fn azdls_rename(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
172 let source = build_abs_path(&self.root, from);
173 let target = build_abs_path(&self.root, to);
174
175 let url = format!(
176 "{}/{}/{}",
177 self.endpoint,
178 self.filesystem,
179 percent_encode_path(&target)
180 );
181
182 let mut req = Request::put(&url)
183 .header(
184 X_MS_RENAME_SOURCE,
185 format!("/{}/{}", self.filesystem, percent_encode_path(&source)),
186 )
187 .header(CONTENT_LENGTH, 0)
188 .body(Buffer::new())
189 .map_err(new_request_build_error)?;
190
191 self.sign(&mut req).await?;
192 self.send(req).await
193 }
194
195 pub async fn azdls_update(
197 &self,
198 path: &str,
199 size: Option<u64>,
200 position: u64,
201 body: Buffer,
202 ) -> Result<Response<Buffer>> {
203 let p = build_abs_path(&self.root, path);
204
205 let url = format!(
208 "{}/{}/{}?action=append&close=true&flush=true&position={}",
209 self.endpoint,
210 self.filesystem,
211 percent_encode_path(&p),
212 position
213 );
214
215 let mut req = Request::patch(&url);
216
217 if let Some(size) = size {
218 req = req.header(CONTENT_LENGTH, size)
219 }
220
221 let mut req = req.body(body).map_err(new_request_build_error)?;
223
224 self.sign(&mut req).await?;
225
226 self.send(req).await
227 }
228
229 pub async fn azdls_get_properties(&self, path: &str) -> Result<Response<Buffer>> {
230 let p = build_abs_path(&self.root, path)
231 .trim_end_matches('/')
232 .to_string();
233
234 let url = format!(
235 "{}/{}/{}?action=getStatus",
236 self.endpoint,
237 self.filesystem,
238 percent_encode_path(&p)
239 );
240
241 let req = Request::head(&url);
242
243 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
244
245 self.sign(&mut req).await?;
246 self.info.http_client().send(req).await
247 }
248
249 pub async fn azdls_delete(&self, path: &str) -> Result<Response<Buffer>> {
250 let p = build_abs_path(&self.root, path)
251 .trim_end_matches('/')
252 .to_string();
253
254 let url = format!(
255 "{}/{}/{}",
256 self.endpoint,
257 self.filesystem,
258 percent_encode_path(&p)
259 );
260
261 let req = Request::delete(&url);
262
263 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
264
265 self.sign(&mut req).await?;
266 self.send(req).await
267 }
268
269 pub async fn azdls_list(
270 &self,
271 path: &str,
272 continuation: &str,
273 limit: Option<usize>,
274 ) -> Result<Response<Buffer>> {
275 let p = build_abs_path(&self.root, path)
276 .trim_end_matches('/')
277 .to_string();
278
279 let mut url = QueryPairsWriter::new(&format!("{}/{}", self.endpoint, self.filesystem))
280 .push("resource", "filesystem")
281 .push("recursive", "false");
282 if !p.is_empty() {
283 url = url.push("directory", &percent_encode_path(&p));
284 }
285 if let Some(limit) = limit {
286 url = url.push("maxResults", &limit.to_string());
287 }
288 if !continuation.is_empty() {
289 url = url.push("continuation", &percent_encode_path(continuation));
290 }
291
292 let mut req = Request::get(url.finish())
293 .body(Buffer::new())
294 .map_err(new_request_build_error)?;
295
296 self.sign(&mut req).await?;
297 self.send(req).await
298 }
299
300 pub async fn azdls_ensure_parent_path(&self, path: &str) -> Result<Option<Response<Buffer>>> {
301 let abs_target_path = path.trim_end_matches('/').to_string();
302 let abs_target_path = abs_target_path.as_str();
303 let mut parts: Vec<&str> = abs_target_path
304 .split('/')
305 .filter(|x| !x.is_empty())
306 .collect();
307
308 if !parts.is_empty() {
309 parts.pop();
310 }
311
312 if !parts.is_empty() {
313 let parent_path = parts.join("/");
314 let resp = self
315 .azdls_create(
316 &parent_path,
317 "directory",
318 &OpWrite::default(),
319 Buffer::new(),
320 )
321 .await?;
322
323 Ok(Some(resp))
324 } else {
325 Ok(None)
326 }
327 }
328}