opendal/services/upyun/
core.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use base64::Engine;
23use hmac::Hmac;
24use hmac::Mac;
25use http::header;
26use http::HeaderMap;
27use http::Request;
28use http::Response;
29use md5::Digest;
30use serde::Deserialize;
31use sha1::Sha1;
32
33use self::constants::*;
34use crate::raw::*;
35use crate::*;
36
37pub(super) mod constants {
38    pub const X_UPYUN_FILE_TYPE: &str = "x-upyun-file-type";
39    pub const X_UPYUN_FILE_SIZE: &str = "x-upyun-file-size";
40    pub const X_UPYUN_CACHE_CONTROL: &str = "x-upyun-meta-cache-control";
41    pub const X_UPYUN_CONTENT_DISPOSITION: &str = "x-upyun-meta-content-disposition";
42    pub const X_UPYUN_MULTI_STAGE: &str = "X-Upyun-Multi-Stage";
43    pub const X_UPYUN_MULTI_TYPE: &str = "X-Upyun-Multi-Type";
44    pub const X_UPYUN_MULTI_DISORDER: &str = "X-Upyun-Multi-Disorder";
45    pub const X_UPYUN_MULTI_UUID: &str = "X-Upyun-Multi-Uuid";
46    pub const X_UPYUN_PART_ID: &str = "X-Upyun-Part-Id";
47    pub const X_UPYUN_FOLDER: &str = "x-upyun-folder";
48    pub const X_UPYUN_MOVE_SOURCE: &str = "X-Upyun-Move-Source";
49    pub const X_UPYUN_COPY_SOURCE: &str = "X-Upyun-Copy-Source";
50    pub const X_UPYUN_METADATA_DIRECTIVE: &str = "X-Upyun-Metadata-Directive";
51    pub const X_UPYUN_LIST_ITER: &str = "x-list-iter";
52    pub const X_UPYUN_LIST_LIMIT: &str = "X-List-Limit";
53    pub const X_UPYUN_LIST_MAX_LIMIT: usize = 4096;
54    pub const X_UPYUN_LIST_DEFAULT_LIMIT: usize = 256;
55}
56
57#[derive(Clone)]
58pub struct UpyunCore {
59    pub info: Arc<AccessorInfo>,
60    /// The root of this core.
61    pub root: String,
62    /// The endpoint of this backend.
63    pub operator: String,
64    /// The bucket of this backend.
65    pub bucket: String,
66
67    /// signer of this backend.
68    pub signer: UpyunSigner,
69}
70
71impl Debug for UpyunCore {
72    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct("Backend")
74            .field("root", &self.root)
75            .field("bucket", &self.bucket)
76            .field("operator", &self.operator)
77            .finish_non_exhaustive()
78    }
79}
80
81impl UpyunCore {
82    #[inline]
83    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
84        self.info.http_client().send(req).await
85    }
86
87    pub fn sign(&self, req: &mut Request<Buffer>) -> Result<()> {
88        // get rfc1123 date
89        let date = chrono::Utc::now()
90            .format("%a, %d %b %Y %H:%M:%S GMT")
91            .to_string();
92        let authorization =
93            self.signer
94                .authorization(&date, req.method().as_str(), req.uri().path());
95
96        req.headers_mut()
97            .insert("Authorization", authorization.parse().unwrap());
98        req.headers_mut().insert("Date", date.parse().unwrap());
99
100        Ok(())
101    }
102}
103
104impl UpyunCore {
105    pub async fn download_file(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
106        let path = build_abs_path(&self.root, path);
107
108        let url = format!(
109            "https://v0.api.upyun.com/{}/{}",
110            self.bucket,
111            percent_encode_path(&path)
112        );
113
114        let req = Request::get(url);
115
116        let mut req = req
117            .header(header::RANGE, range.to_header())
118            .extension(Operation::Read)
119            .body(Buffer::new())
120            .map_err(new_request_build_error)?;
121
122        self.sign(&mut req)?;
123
124        self.info.http_client().fetch(req).await
125    }
126
127    pub async fn info(&self, path: &str) -> Result<Response<Buffer>> {
128        let path = build_abs_path(&self.root, path);
129
130        let url = format!(
131            "https://v0.api.upyun.com/{}/{}",
132            self.bucket,
133            percent_encode_path(&path)
134        );
135
136        let req = Request::head(url);
137
138        let mut req = req
139            .extension(Operation::Stat)
140            .body(Buffer::new())
141            .map_err(new_request_build_error)?;
142
143        self.sign(&mut req)?;
144
145        self.send(req).await
146    }
147
148    pub fn upload(
149        &self,
150        path: &str,
151        size: Option<u64>,
152        args: &OpWrite,
153        body: Buffer,
154    ) -> Result<Request<Buffer>> {
155        let p = build_abs_path(&self.root, path);
156
157        let url = format!(
158            "https://v0.api.upyun.com/{}/{}",
159            self.bucket,
160            percent_encode_path(&p)
161        );
162
163        let mut req = Request::put(&url);
164
165        if let Some(size) = size {
166            req = req.header(header::CONTENT_LENGTH, size.to_string())
167        }
168
169        if let Some(mime) = args.content_type() {
170            req = req.header(header::CONTENT_TYPE, mime)
171        }
172
173        if let Some(pos) = args.content_disposition() {
174            req = req.header(X_UPYUN_CONTENT_DISPOSITION, pos)
175        }
176
177        if let Some(cache_control) = args.cache_control() {
178            req = req.header(X_UPYUN_CACHE_CONTROL, cache_control)
179        }
180
181        let req = req.extension(Operation::Write);
182
183        // Set body
184        let mut req = req.body(body).map_err(new_request_build_error)?;
185
186        self.sign(&mut req)?;
187
188        Ok(req)
189    }
190
191    pub async fn delete(&self, path: &str) -> Result<Response<Buffer>> {
192        let path = build_abs_path(&self.root, path);
193
194        let url = format!(
195            "https://v0.api.upyun.com/{}/{}",
196            self.bucket,
197            percent_encode_path(&path)
198        );
199
200        let req = Request::delete(url);
201
202        let req = req.extension(Operation::Delete);
203
204        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
205
206        self.sign(&mut req)?;
207
208        self.send(req).await
209    }
210
211    pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
212        let from = format!("/{}/{}", self.bucket, build_abs_path(&self.root, from));
213        let to = build_abs_path(&self.root, to);
214
215        let url = format!(
216            "https://v0.api.upyun.com/{}/{}",
217            self.bucket,
218            percent_encode_path(&to)
219        );
220
221        let mut req = Request::put(url);
222
223        req = req.header(header::CONTENT_LENGTH, "0");
224
225        req = req.header(X_UPYUN_COPY_SOURCE, from);
226
227        req = req.header(X_UPYUN_METADATA_DIRECTIVE, "copy");
228
229        let req = req.extension(Operation::Copy);
230
231        // Set body
232        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
233
234        self.sign(&mut req)?;
235
236        self.send(req).await
237    }
238
239    pub async fn move_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
240        let from = format!("/{}/{}", self.bucket, build_abs_path(&self.root, from));
241        let to = build_abs_path(&self.root, to);
242
243        let url = format!(
244            "https://v0.api.upyun.com/{}/{}",
245            self.bucket,
246            percent_encode_path(&to)
247        );
248
249        let mut req = Request::put(url);
250
251        req = req.header(header::CONTENT_LENGTH, "0");
252
253        req = req.header(X_UPYUN_MOVE_SOURCE, from);
254
255        req = req.header(X_UPYUN_METADATA_DIRECTIVE, "copy");
256
257        let req = req.extension(Operation::Rename);
258
259        // Set body
260        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
261
262        self.sign(&mut req)?;
263
264        self.send(req).await
265    }
266
267    pub async fn create_dir(&self, path: &str) -> Result<Response<Buffer>> {
268        let path = build_abs_path(&self.root, path);
269        let path = path[..path.len() - 1].to_string();
270
271        let url = format!(
272            "https://v0.api.upyun.com/{}/{}",
273            self.bucket,
274            percent_encode_path(&path)
275        );
276
277        let mut req = Request::post(url);
278
279        req = req.header("folder", "true");
280
281        req = req.header(X_UPYUN_FOLDER, "true");
282
283        let req = req.extension(Operation::CreateDir);
284
285        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
286
287        self.sign(&mut req)?;
288
289        self.send(req).await
290    }
291
292    pub async fn initiate_multipart_upload(
293        &self,
294        path: &str,
295        args: &OpWrite,
296    ) -> Result<Response<Buffer>> {
297        let path = build_abs_path(&self.root, path);
298
299        let url = format!(
300            "https://v0.api.upyun.com/{}/{}",
301            self.bucket,
302            percent_encode_path(&path)
303        );
304
305        let mut req = Request::put(url);
306
307        req = req.header(X_UPYUN_MULTI_STAGE, "initiate");
308
309        req = req.header(X_UPYUN_MULTI_DISORDER, "true");
310
311        if let Some(content_type) = args.content_type() {
312            req = req.header(X_UPYUN_MULTI_TYPE, content_type);
313        }
314
315        if let Some(content_disposition) = args.content_disposition() {
316            req = req.header(X_UPYUN_CONTENT_DISPOSITION, content_disposition)
317        }
318
319        if let Some(cache_control) = args.cache_control() {
320            req = req.header(X_UPYUN_CACHE_CONTROL, cache_control)
321        }
322
323        let req = req.extension(Operation::Write);
324
325        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
326
327        self.sign(&mut req)?;
328
329        self.send(req).await
330    }
331
332    pub fn upload_part(
333        &self,
334        path: &str,
335        upload_id: &str,
336        part_number: usize,
337        size: u64,
338        body: Buffer,
339    ) -> Result<Request<Buffer>> {
340        let p = build_abs_path(&self.root, path);
341
342        let url = format!(
343            "https://v0.api.upyun.com/{}/{}",
344            self.bucket,
345            percent_encode_path(&p),
346        );
347
348        let mut req = Request::put(&url);
349
350        req = req.header(header::CONTENT_LENGTH, size);
351
352        req = req.header(X_UPYUN_MULTI_STAGE, "upload");
353
354        req = req.header(X_UPYUN_MULTI_UUID, upload_id);
355
356        req = req.header(X_UPYUN_PART_ID, part_number);
357
358        let req = req.extension(Operation::Write);
359
360        // Set body
361        let mut req = req.body(body).map_err(new_request_build_error)?;
362
363        self.sign(&mut req)?;
364
365        Ok(req)
366    }
367
368    pub async fn complete_multipart_upload(
369        &self,
370        path: &str,
371        upload_id: &str,
372    ) -> Result<Response<Buffer>> {
373        let p = build_abs_path(&self.root, path);
374
375        let url = format!(
376            "https://v0.api.upyun.com/{}/{}",
377            self.bucket,
378            percent_encode_path(&p),
379        );
380
381        let mut req = Request::put(url);
382
383        req = req.header(X_UPYUN_MULTI_STAGE, "complete");
384
385        req = req.header(X_UPYUN_MULTI_UUID, upload_id);
386
387        let req = req.extension(Operation::Write);
388
389        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
390
391        self.sign(&mut req)?;
392
393        self.send(req).await
394    }
395
396    pub async fn list_objects(
397        &self,
398        path: &str,
399        iter: &str,
400        limit: Option<usize>,
401    ) -> Result<Response<Buffer>> {
402        let path = build_abs_path(&self.root, path);
403
404        let url = format!(
405            "https://v0.api.upyun.com/{}/{}",
406            self.bucket,
407            percent_encode_path(&path),
408        );
409
410        let mut req = Request::get(url.clone());
411
412        req = req.header(header::ACCEPT, "application/json");
413
414        if !iter.is_empty() {
415            req = req.header(X_UPYUN_LIST_ITER, iter);
416        }
417
418        if let Some(mut limit) = limit {
419            if limit > X_UPYUN_LIST_MAX_LIMIT {
420                limit = X_UPYUN_LIST_DEFAULT_LIMIT;
421            }
422            req = req.header(X_UPYUN_LIST_LIMIT, limit);
423        }
424
425        let req = req.extension(Operation::List);
426
427        // Set body
428        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
429
430        self.sign(&mut req)?;
431
432        self.send(req).await
433    }
434}
435
/// Credentials used to compute the Upyun `Authorization` header.
#[derive(Clone, Default)]
pub struct UpyunSigner {
    /// Operator name; appears verbatim in the authorization value.
    pub operator: String,
    /// Operator password; the hex MD5 digest of it is used as the HMAC key.
    pub password: String,
}
441
442type HmacSha1 = Hmac<Sha1>;
443
444impl UpyunSigner {
445    pub fn authorization(&self, date: &str, method: &str, uri: &str) -> String {
446        let sign = vec![method, uri, date];
447
448        let sign = sign
449            .into_iter()
450            .filter(|s| !s.is_empty())
451            .collect::<Vec<&str>>()
452            .join("&");
453
454        let mut mac = HmacSha1::new_from_slice(format_md5(self.password.as_bytes()).as_bytes())
455            .expect("HMAC can take key of any size");
456        mac.update(sign.as_bytes());
457        let sign_str = mac.finalize().into_bytes();
458
459        let sign = base64::engine::general_purpose::STANDARD.encode(sign_str.as_slice());
460        format!("UPYUN {}:{}", self.operator, sign)
461    }
462}
463
464pub(super) fn parse_info(headers: &HeaderMap) -> Result<Metadata> {
465    let mode = if parse_header_to_str(headers, X_UPYUN_FILE_TYPE)? == Some("file") {
466        EntryMode::FILE
467    } else {
468        EntryMode::DIR
469    };
470
471    let mut m = Metadata::new(mode);
472
473    if let Some(v) = parse_header_to_str(headers, X_UPYUN_FILE_SIZE)? {
474        let size = v.parse::<u64>().map_err(|e| {
475            Error::new(ErrorKind::Unexpected, "header value is not valid integer")
476                .with_operation("parse_info")
477                .set_source(e)
478        })?;
479        m.set_content_length(size);
480    }
481
482    if let Some(v) = parse_content_type(headers)? {
483        m.set_content_type(v);
484    }
485
486    if let Some(v) = parse_content_md5(headers)? {
487        m.set_content_md5(v);
488    }
489
490    if let Some(v) = parse_header_to_str(headers, X_UPYUN_CACHE_CONTROL)? {
491        m.set_cache_control(v);
492    }
493
494    if let Some(v) = parse_header_to_str(headers, X_UPYUN_CONTENT_DISPOSITION)? {
495        m.set_content_disposition(v);
496    }
497
498    Ok(m)
499}
500
501pub fn format_md5(bs: &[u8]) -> String {
502    let mut hasher = md5::Md5::new();
503    hasher.update(bs);
504
505    format!("{:x}", hasher.finalize())
506}
507
508#[derive(Debug, Deserialize)]
509pub(super) struct File {
510    #[serde(rename = "type")]
511    pub type_field: String,
512    pub name: String,
513    pub length: u64,
514    pub last_modified: i64,
515}
516
517#[derive(Debug, Deserialize)]
518pub(super) struct ListObjectsResponse {
519    pub iter: String,
520    pub files: Vec<File>,
521}