opendal/services/upyun/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use base64::Engine;
19use hmac::Hmac;
20use hmac::Mac;
21use http::header;
22use http::HeaderMap;
23use http::Request;
24use http::Response;
25use md5::Digest;
26use serde::Deserialize;
27use sha1::Sha1;
28use std::fmt::Debug;
29use std::fmt::Formatter;
30use std::sync::Arc;
31
32use self::constants::*;
33use crate::raw::*;
34use crate::*;
35
36pub(super) mod constants {
37    pub const X_UPYUN_FILE_TYPE: &str = "x-upyun-file-type";
38    pub const X_UPYUN_FILE_SIZE: &str = "x-upyun-file-size";
39    pub const X_UPYUN_CACHE_CONTROL: &str = "x-upyun-meta-cache-control";
40    pub const X_UPYUN_CONTENT_DISPOSITION: &str = "x-upyun-meta-content-disposition";
41    pub const X_UPYUN_MULTI_STAGE: &str = "X-Upyun-Multi-Stage";
42    pub const X_UPYUN_MULTI_TYPE: &str = "X-Upyun-Multi-Type";
43    pub const X_UPYUN_MULTI_DISORDER: &str = "X-Upyun-Multi-Disorder";
44    pub const X_UPYUN_MULTI_UUID: &str = "X-Upyun-Multi-Uuid";
45    pub const X_UPYUN_PART_ID: &str = "X-Upyun-Part-Id";
46    pub const X_UPYUN_FOLDER: &str = "x-upyun-folder";
47    pub const X_UPYUN_MOVE_SOURCE: &str = "X-Upyun-Move-Source";
48    pub const X_UPYUN_COPY_SOURCE: &str = "X-Upyun-Copy-Source";
49    pub const X_UPYUN_METADATA_DIRECTIVE: &str = "X-Upyun-Metadata-Directive";
50    pub const X_UPYUN_LIST_ITER: &str = "x-list-iter";
51    pub const X_UPYUN_LIST_LIMIT: &str = "X-List-Limit";
52    pub const X_UPYUN_LIST_MAX_LIMIT: usize = 4096;
53    pub const X_UPYUN_LIST_DEFAULT_LIMIT: usize = 256;
54}
55
56#[derive(Clone)]
57pub struct UpyunCore {
58    pub info: Arc<AccessorInfo>,
59    /// The root of this core.
60    pub root: String,
61    /// The endpoint of this backend.
62    pub operator: String,
63    /// The bucket of this backend.
64    pub bucket: String,
65
66    /// signer of this backend.
67    pub signer: UpyunSigner,
68}
69
70impl Debug for UpyunCore {
71    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("Backend")
73            .field("root", &self.root)
74            .field("bucket", &self.bucket)
75            .field("operator", &self.operator)
76            .finish_non_exhaustive()
77    }
78}
79
80impl UpyunCore {
81    #[inline]
82    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
83        self.info.http_client().send(req).await
84    }
85
86    pub fn sign(&self, req: &mut Request<Buffer>) -> Result<()> {
87        // get rfc1123 date
88        let date = chrono::Utc::now()
89            .format("%a, %d %b %Y %H:%M:%S GMT")
90            .to_string();
91        let authorization =
92            self.signer
93                .authorization(&date, req.method().as_str(), req.uri().path());
94
95        req.headers_mut()
96            .insert("Authorization", authorization.parse().unwrap());
97        req.headers_mut().insert("Date", date.parse().unwrap());
98
99        Ok(())
100    }
101}
102
103impl UpyunCore {
104    pub async fn download_file(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
105        let path = build_abs_path(&self.root, path);
106
107        let url = format!(
108            "https://v0.api.upyun.com/{}/{}",
109            self.bucket,
110            percent_encode_path(&path)
111        );
112
113        let req = Request::get(url);
114
115        let mut req = req
116            .header(header::RANGE, range.to_header())
117            .extension(Operation::Read)
118            .body(Buffer::new())
119            .map_err(new_request_build_error)?;
120
121        self.sign(&mut req)?;
122
123        self.info.http_client().fetch(req).await
124    }
125
126    pub async fn info(&self, path: &str) -> Result<Response<Buffer>> {
127        let path = build_abs_path(&self.root, path);
128
129        let url = format!(
130            "https://v0.api.upyun.com/{}/{}",
131            self.bucket,
132            percent_encode_path(&path)
133        );
134
135        let req = Request::head(url);
136
137        let mut req = req
138            .extension(Operation::Stat)
139            .body(Buffer::new())
140            .map_err(new_request_build_error)?;
141
142        self.sign(&mut req)?;
143
144        self.send(req).await
145    }
146
147    pub fn upload(
148        &self,
149        path: &str,
150        size: Option<u64>,
151        args: &OpWrite,
152        body: Buffer,
153    ) -> Result<Request<Buffer>> {
154        let p = build_abs_path(&self.root, path);
155
156        let url = format!(
157            "https://v0.api.upyun.com/{}/{}",
158            self.bucket,
159            percent_encode_path(&p)
160        );
161
162        let mut req = Request::put(&url);
163
164        if let Some(size) = size {
165            req = req.header(header::CONTENT_LENGTH, size.to_string())
166        }
167
168        if let Some(mime) = args.content_type() {
169            req = req.header(header::CONTENT_TYPE, mime)
170        }
171
172        if let Some(pos) = args.content_disposition() {
173            req = req.header(X_UPYUN_CONTENT_DISPOSITION, pos)
174        }
175
176        if let Some(cache_control) = args.cache_control() {
177            req = req.header(X_UPYUN_CACHE_CONTROL, cache_control)
178        }
179
180        let req = req.extension(Operation::Write);
181
182        // Set body
183        let mut req = req.body(body).map_err(new_request_build_error)?;
184
185        self.sign(&mut req)?;
186
187        Ok(req)
188    }
189
190    pub async fn delete(&self, path: &str) -> Result<Response<Buffer>> {
191        let path = build_abs_path(&self.root, path);
192
193        let url = format!(
194            "https://v0.api.upyun.com/{}/{}",
195            self.bucket,
196            percent_encode_path(&path)
197        );
198
199        let req = Request::delete(url);
200
201        let req = req.extension(Operation::Delete);
202
203        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
204
205        self.sign(&mut req)?;
206
207        self.send(req).await
208    }
209
210    pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
211        let from = format!("/{}/{}", self.bucket, build_abs_path(&self.root, from));
212        let to = build_abs_path(&self.root, to);
213
214        let url = format!(
215            "https://v0.api.upyun.com/{}/{}",
216            self.bucket,
217            percent_encode_path(&to)
218        );
219
220        let mut req = Request::put(url);
221
222        req = req.header(header::CONTENT_LENGTH, "0");
223
224        req = req.header(X_UPYUN_COPY_SOURCE, from);
225
226        req = req.header(X_UPYUN_METADATA_DIRECTIVE, "copy");
227
228        let req = req.extension(Operation::Copy);
229
230        // Set body
231        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
232
233        self.sign(&mut req)?;
234
235        self.send(req).await
236    }
237
238    pub async fn move_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
239        let from = format!("/{}/{}", self.bucket, build_abs_path(&self.root, from));
240        let to = build_abs_path(&self.root, to);
241
242        let url = format!(
243            "https://v0.api.upyun.com/{}/{}",
244            self.bucket,
245            percent_encode_path(&to)
246        );
247
248        let mut req = Request::put(url);
249
250        req = req.header(header::CONTENT_LENGTH, "0");
251
252        req = req.header(X_UPYUN_MOVE_SOURCE, from);
253
254        req = req.header(X_UPYUN_METADATA_DIRECTIVE, "copy");
255
256        let req = req.extension(Operation::Rename);
257
258        // Set body
259        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
260
261        self.sign(&mut req)?;
262
263        self.send(req).await
264    }
265
266    pub async fn create_dir(&self, path: &str) -> Result<Response<Buffer>> {
267        let path = build_abs_path(&self.root, path);
268        let path = path[..path.len() - 1].to_string();
269
270        let url = format!(
271            "https://v0.api.upyun.com/{}/{}",
272            self.bucket,
273            percent_encode_path(&path)
274        );
275
276        let mut req = Request::post(url);
277
278        req = req.header("folder", "true");
279
280        req = req.header(X_UPYUN_FOLDER, "true");
281
282        let req = req.extension(Operation::CreateDir);
283
284        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
285
286        self.sign(&mut req)?;
287
288        self.send(req).await
289    }
290
291    pub async fn initiate_multipart_upload(
292        &self,
293        path: &str,
294        args: &OpWrite,
295    ) -> Result<Response<Buffer>> {
296        let path = build_abs_path(&self.root, path);
297
298        let url = format!(
299            "https://v0.api.upyun.com/{}/{}",
300            self.bucket,
301            percent_encode_path(&path)
302        );
303
304        let mut req = Request::put(url);
305
306        req = req.header(X_UPYUN_MULTI_STAGE, "initiate");
307
308        req = req.header(X_UPYUN_MULTI_DISORDER, "true");
309
310        if let Some(content_type) = args.content_type() {
311            req = req.header(X_UPYUN_MULTI_TYPE, content_type);
312        }
313
314        if let Some(content_disposition) = args.content_disposition() {
315            req = req.header(X_UPYUN_CONTENT_DISPOSITION, content_disposition)
316        }
317
318        if let Some(cache_control) = args.cache_control() {
319            req = req.header(X_UPYUN_CACHE_CONTROL, cache_control)
320        }
321
322        let req = req.extension(Operation::Write);
323
324        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
325
326        self.sign(&mut req)?;
327
328        self.send(req).await
329    }
330
331    pub fn upload_part(
332        &self,
333        path: &str,
334        upload_id: &str,
335        part_number: usize,
336        size: u64,
337        body: Buffer,
338    ) -> Result<Request<Buffer>> {
339        let p = build_abs_path(&self.root, path);
340
341        let url = format!(
342            "https://v0.api.upyun.com/{}/{}",
343            self.bucket,
344            percent_encode_path(&p),
345        );
346
347        let mut req = Request::put(&url);
348
349        req = req.header(header::CONTENT_LENGTH, size);
350
351        req = req.header(X_UPYUN_MULTI_STAGE, "upload");
352
353        req = req.header(X_UPYUN_MULTI_UUID, upload_id);
354
355        req = req.header(X_UPYUN_PART_ID, part_number);
356
357        let req = req.extension(Operation::Write);
358
359        // Set body
360        let mut req = req.body(body).map_err(new_request_build_error)?;
361
362        self.sign(&mut req)?;
363
364        Ok(req)
365    }
366
367    pub async fn complete_multipart_upload(
368        &self,
369        path: &str,
370        upload_id: &str,
371    ) -> Result<Response<Buffer>> {
372        let p = build_abs_path(&self.root, path);
373
374        let url = format!(
375            "https://v0.api.upyun.com/{}/{}",
376            self.bucket,
377            percent_encode_path(&p),
378        );
379
380        let mut req = Request::put(url);
381
382        req = req.header(X_UPYUN_MULTI_STAGE, "complete");
383
384        req = req.header(X_UPYUN_MULTI_UUID, upload_id);
385
386        let req = req.extension(Operation::Write);
387
388        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
389
390        self.sign(&mut req)?;
391
392        self.send(req).await
393    }
394
395    pub async fn list_objects(
396        &self,
397        path: &str,
398        iter: &str,
399        limit: Option<usize>,
400    ) -> Result<Response<Buffer>> {
401        let path = build_abs_path(&self.root, path);
402
403        let url = format!(
404            "https://v0.api.upyun.com/{}/{}",
405            self.bucket,
406            percent_encode_path(&path),
407        );
408
409        let mut req = Request::get(url.clone());
410
411        req = req.header(header::ACCEPT, "application/json");
412
413        if !iter.is_empty() {
414            req = req.header(X_UPYUN_LIST_ITER, iter);
415        }
416
417        if let Some(mut limit) = limit {
418            if limit > X_UPYUN_LIST_MAX_LIMIT {
419                limit = X_UPYUN_LIST_DEFAULT_LIMIT;
420            }
421            req = req.header(X_UPYUN_LIST_LIMIT, limit);
422        }
423
424        let req = req.extension(Operation::List);
425
426        // Set body
427        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
428
429        self.sign(&mut req)?;
430
431        self.send(req).await
432    }
433}
434
435#[derive(Clone, Default)]
436pub struct UpyunSigner {
437    pub operator: String,
438    pub password: String,
439}
440
441type HmacSha1 = Hmac<Sha1>;
442
443impl UpyunSigner {
444    pub fn authorization(&self, date: &str, method: &str, uri: &str) -> String {
445        let sign = vec![method, uri, date];
446
447        let sign = sign
448            .into_iter()
449            .filter(|s| !s.is_empty())
450            .collect::<Vec<&str>>()
451            .join("&");
452
453        let mut mac = HmacSha1::new_from_slice(format_md5(self.password.as_bytes()).as_bytes())
454            .expect("HMAC can take key of any size");
455        mac.update(sign.as_bytes());
456        let sign_str = mac.finalize().into_bytes();
457
458        let sign = base64::engine::general_purpose::STANDARD.encode(sign_str.as_slice());
459        format!("UPYUN {}:{}", self.operator, sign)
460    }
461}
462
463pub(super) fn parse_info(headers: &HeaderMap) -> Result<Metadata> {
464    let mode = if parse_header_to_str(headers, X_UPYUN_FILE_TYPE)? == Some("file") {
465        EntryMode::FILE
466    } else {
467        EntryMode::DIR
468    };
469
470    let mut m = Metadata::new(mode);
471
472    if let Some(v) = parse_header_to_str(headers, X_UPYUN_FILE_SIZE)? {
473        let size = v.parse::<u64>().map_err(|e| {
474            Error::new(ErrorKind::Unexpected, "header value is not valid integer")
475                .with_operation("parse_info")
476                .set_source(e)
477        })?;
478        m.set_content_length(size);
479    }
480
481    if let Some(v) = parse_content_type(headers)? {
482        m.set_content_type(v);
483    }
484
485    if let Some(v) = parse_content_md5(headers)? {
486        m.set_content_md5(v);
487    }
488
489    if let Some(v) = parse_header_to_str(headers, X_UPYUN_CACHE_CONTROL)? {
490        m.set_cache_control(v);
491    }
492
493    if let Some(v) = parse_header_to_str(headers, X_UPYUN_CONTENT_DISPOSITION)? {
494        m.set_content_disposition(v);
495    }
496
497    Ok(m)
498}
499
500pub fn format_md5(bs: &[u8]) -> String {
501    let mut hasher = md5::Md5::new();
502    hasher.update(bs);
503
504    format!("{:x}", hasher.finalize())
505}
506
507#[derive(Debug, Deserialize)]
508pub(super) struct File {
509    #[serde(rename = "type")]
510    pub type_field: String,
511    pub name: String,
512    pub length: u64,
513    pub last_modified: i64,
514}
515
516#[derive(Debug, Deserialize)]
517pub(super) struct ListObjectsResponse {
518    pub iter: String,
519    pub files: Vec<File>,
520}