opendal/services/upyun/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use base64::Engine;
23use hmac::Hmac;
24use hmac::Mac;
25use http::HeaderMap;
26use http::Request;
27use http::Response;
28use http::header;
29use md5::Digest;
30use serde::Deserialize;
31use sha1::Sha1;
32
33use self::constants::*;
34use crate::raw::*;
35use crate::*;
36
37pub(super) mod constants {
38    pub const X_UPYUN_FILE_TYPE: &str = "x-upyun-file-type";
39    pub const X_UPYUN_FILE_SIZE: &str = "x-upyun-file-size";
40    pub const X_UPYUN_CACHE_CONTROL: &str = "x-upyun-meta-cache-control";
41    pub const X_UPYUN_CONTENT_DISPOSITION: &str = "x-upyun-meta-content-disposition";
42    pub const X_UPYUN_MULTI_STAGE: &str = "X-Upyun-Multi-Stage";
43    pub const X_UPYUN_MULTI_TYPE: &str = "X-Upyun-Multi-Type";
44    pub const X_UPYUN_MULTI_DISORDER: &str = "X-Upyun-Multi-Disorder";
45    pub const X_UPYUN_MULTI_UUID: &str = "X-Upyun-Multi-Uuid";
46    pub const X_UPYUN_PART_ID: &str = "X-Upyun-Part-Id";
47    pub const X_UPYUN_FOLDER: &str = "x-upyun-folder";
48    pub const X_UPYUN_MOVE_SOURCE: &str = "X-Upyun-Move-Source";
49    pub const X_UPYUN_COPY_SOURCE: &str = "X-Upyun-Copy-Source";
50    pub const X_UPYUN_METADATA_DIRECTIVE: &str = "X-Upyun-Metadata-Directive";
51    pub const X_UPYUN_LIST_ITER: &str = "x-list-iter";
52    pub const X_UPYUN_LIST_LIMIT: &str = "X-List-Limit";
53    pub const X_UPYUN_LIST_MAX_LIMIT: usize = 4096;
54    pub const X_UPYUN_LIST_DEFAULT_LIMIT: usize = 256;
55}
56
57#[derive(Clone)]
58pub struct UpyunCore {
59    pub info: Arc<AccessorInfo>,
60    /// The root of this core.
61    pub root: String,
62    /// The endpoint of this backend.
63    pub operator: String,
64    /// The bucket of this backend.
65    pub bucket: String,
66
67    /// signer of this backend.
68    pub signer: UpyunSigner,
69}
70
71impl Debug for UpyunCore {
72    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct("Backend")
74            .field("root", &self.root)
75            .field("bucket", &self.bucket)
76            .field("operator", &self.operator)
77            .finish_non_exhaustive()
78    }
79}
80
81impl UpyunCore {
82    #[inline]
83    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
84        self.info.http_client().send(req).await
85    }
86
87    pub fn sign(&self, req: &mut Request<Buffer>) -> Result<()> {
88        // get rfc1123 date
89        let date = Timestamp::now().format_http_date();
90        let authorization =
91            self.signer
92                .authorization(&date, req.method().as_str(), req.uri().path());
93
94        req.headers_mut()
95            .insert("Authorization", authorization.parse().unwrap());
96        req.headers_mut().insert("Date", date.parse().unwrap());
97
98        Ok(())
99    }
100}
101
102impl UpyunCore {
103    pub async fn download_file(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
104        let path = build_abs_path(&self.root, path);
105
106        let url = format!(
107            "https://v0.api.upyun.com/{}/{}",
108            self.bucket,
109            percent_encode_path(&path)
110        );
111
112        let req = Request::get(url);
113
114        let mut req = req
115            .header(header::RANGE, range.to_header())
116            .extension(Operation::Read)
117            .body(Buffer::new())
118            .map_err(new_request_build_error)?;
119
120        self.sign(&mut req)?;
121
122        self.info.http_client().fetch(req).await
123    }
124
125    pub async fn info(&self, path: &str) -> Result<Response<Buffer>> {
126        let path = build_abs_path(&self.root, path);
127
128        let url = format!(
129            "https://v0.api.upyun.com/{}/{}",
130            self.bucket,
131            percent_encode_path(&path)
132        );
133
134        let req = Request::head(url);
135
136        let mut req = req
137            .extension(Operation::Stat)
138            .body(Buffer::new())
139            .map_err(new_request_build_error)?;
140
141        self.sign(&mut req)?;
142
143        self.send(req).await
144    }
145
146    pub fn upload(
147        &self,
148        path: &str,
149        size: Option<u64>,
150        args: &OpWrite,
151        body: Buffer,
152    ) -> Result<Request<Buffer>> {
153        let p = build_abs_path(&self.root, path);
154
155        let url = format!(
156            "https://v0.api.upyun.com/{}/{}",
157            self.bucket,
158            percent_encode_path(&p)
159        );
160
161        let mut req = Request::put(&url);
162
163        if let Some(size) = size {
164            req = req.header(header::CONTENT_LENGTH, size.to_string())
165        }
166
167        if let Some(mime) = args.content_type() {
168            req = req.header(header::CONTENT_TYPE, mime)
169        }
170
171        if let Some(pos) = args.content_disposition() {
172            req = req.header(X_UPYUN_CONTENT_DISPOSITION, pos)
173        }
174
175        if let Some(cache_control) = args.cache_control() {
176            req = req.header(X_UPYUN_CACHE_CONTROL, cache_control)
177        }
178
179        let req = req.extension(Operation::Write);
180
181        // Set body
182        let mut req = req.body(body).map_err(new_request_build_error)?;
183
184        self.sign(&mut req)?;
185
186        Ok(req)
187    }
188
189    pub async fn delete(&self, path: &str) -> Result<Response<Buffer>> {
190        let path = build_abs_path(&self.root, path);
191
192        let url = format!(
193            "https://v0.api.upyun.com/{}/{}",
194            self.bucket,
195            percent_encode_path(&path)
196        );
197
198        let req = Request::delete(url);
199
200        let req = req.extension(Operation::Delete);
201
202        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
203
204        self.sign(&mut req)?;
205
206        self.send(req).await
207    }
208
209    pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
210        let from = format!("/{}/{}", self.bucket, build_abs_path(&self.root, from));
211        let to = build_abs_path(&self.root, to);
212
213        let url = format!(
214            "https://v0.api.upyun.com/{}/{}",
215            self.bucket,
216            percent_encode_path(&to)
217        );
218
219        let mut req = Request::put(url);
220
221        req = req.header(header::CONTENT_LENGTH, "0");
222
223        req = req.header(X_UPYUN_COPY_SOURCE, from);
224
225        req = req.header(X_UPYUN_METADATA_DIRECTIVE, "copy");
226
227        let req = req.extension(Operation::Copy);
228
229        // Set body
230        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
231
232        self.sign(&mut req)?;
233
234        self.send(req).await
235    }
236
237    pub async fn move_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
238        let from = format!("/{}/{}", self.bucket, build_abs_path(&self.root, from));
239        let to = build_abs_path(&self.root, to);
240
241        let url = format!(
242            "https://v0.api.upyun.com/{}/{}",
243            self.bucket,
244            percent_encode_path(&to)
245        );
246
247        let mut req = Request::put(url);
248
249        req = req.header(header::CONTENT_LENGTH, "0");
250
251        req = req.header(X_UPYUN_MOVE_SOURCE, from);
252
253        req = req.header(X_UPYUN_METADATA_DIRECTIVE, "copy");
254
255        let req = req.extension(Operation::Rename);
256
257        // Set body
258        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
259
260        self.sign(&mut req)?;
261
262        self.send(req).await
263    }
264
265    pub async fn create_dir(&self, path: &str) -> Result<Response<Buffer>> {
266        let path = build_abs_path(&self.root, path);
267        let path = path[..path.len() - 1].to_string();
268
269        let url = format!(
270            "https://v0.api.upyun.com/{}/{}",
271            self.bucket,
272            percent_encode_path(&path)
273        );
274
275        let mut req = Request::post(url);
276
277        req = req.header("folder", "true");
278
279        req = req.header(X_UPYUN_FOLDER, "true");
280
281        let req = req.extension(Operation::CreateDir);
282
283        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
284
285        self.sign(&mut req)?;
286
287        self.send(req).await
288    }
289
290    pub async fn initiate_multipart_upload(
291        &self,
292        path: &str,
293        args: &OpWrite,
294    ) -> Result<Response<Buffer>> {
295        let path = build_abs_path(&self.root, path);
296
297        let url = format!(
298            "https://v0.api.upyun.com/{}/{}",
299            self.bucket,
300            percent_encode_path(&path)
301        );
302
303        let mut req = Request::put(url);
304
305        req = req.header(X_UPYUN_MULTI_STAGE, "initiate");
306
307        req = req.header(X_UPYUN_MULTI_DISORDER, "true");
308
309        if let Some(content_type) = args.content_type() {
310            req = req.header(X_UPYUN_MULTI_TYPE, content_type);
311        }
312
313        if let Some(content_disposition) = args.content_disposition() {
314            req = req.header(X_UPYUN_CONTENT_DISPOSITION, content_disposition)
315        }
316
317        if let Some(cache_control) = args.cache_control() {
318            req = req.header(X_UPYUN_CACHE_CONTROL, cache_control)
319        }
320
321        let req = req.extension(Operation::Write);
322
323        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
324
325        self.sign(&mut req)?;
326
327        self.send(req).await
328    }
329
330    pub fn upload_part(
331        &self,
332        path: &str,
333        upload_id: &str,
334        part_number: usize,
335        size: u64,
336        body: Buffer,
337    ) -> Result<Request<Buffer>> {
338        let p = build_abs_path(&self.root, path);
339
340        let url = format!(
341            "https://v0.api.upyun.com/{}/{}",
342            self.bucket,
343            percent_encode_path(&p),
344        );
345
346        let mut req = Request::put(&url);
347
348        req = req.header(header::CONTENT_LENGTH, size);
349
350        req = req.header(X_UPYUN_MULTI_STAGE, "upload");
351
352        req = req.header(X_UPYUN_MULTI_UUID, upload_id);
353
354        req = req.header(X_UPYUN_PART_ID, part_number);
355
356        let req = req.extension(Operation::Write);
357
358        // Set body
359        let mut req = req.body(body).map_err(new_request_build_error)?;
360
361        self.sign(&mut req)?;
362
363        Ok(req)
364    }
365
366    pub async fn complete_multipart_upload(
367        &self,
368        path: &str,
369        upload_id: &str,
370    ) -> Result<Response<Buffer>> {
371        let p = build_abs_path(&self.root, path);
372
373        let url = format!(
374            "https://v0.api.upyun.com/{}/{}",
375            self.bucket,
376            percent_encode_path(&p),
377        );
378
379        let mut req = Request::put(url);
380
381        req = req.header(X_UPYUN_MULTI_STAGE, "complete");
382
383        req = req.header(X_UPYUN_MULTI_UUID, upload_id);
384
385        let req = req.extension(Operation::Write);
386
387        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
388
389        self.sign(&mut req)?;
390
391        self.send(req).await
392    }
393
394    pub async fn list_objects(
395        &self,
396        path: &str,
397        iter: &str,
398        limit: Option<usize>,
399    ) -> Result<Response<Buffer>> {
400        let path = build_abs_path(&self.root, path);
401
402        let url = format!(
403            "https://v0.api.upyun.com/{}/{}",
404            self.bucket,
405            percent_encode_path(&path),
406        );
407
408        let mut req = Request::get(url.clone());
409
410        req = req.header(header::ACCEPT, "application/json");
411
412        if !iter.is_empty() {
413            req = req.header(X_UPYUN_LIST_ITER, iter);
414        }
415
416        if let Some(mut limit) = limit {
417            if limit > X_UPYUN_LIST_MAX_LIMIT {
418                limit = X_UPYUN_LIST_DEFAULT_LIMIT;
419            }
420            req = req.header(X_UPYUN_LIST_LIMIT, limit);
421        }
422
423        let req = req.extension(Operation::List);
424
425        // Set body
426        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
427
428        self.sign(&mut req)?;
429
430        self.send(req).await
431    }
432}
433
434#[derive(Clone, Default)]
435pub struct UpyunSigner {
436    pub operator: String,
437    pub password: String,
438}
439
440type HmacSha1 = Hmac<Sha1>;
441
442impl UpyunSigner {
443    pub fn authorization(&self, date: &str, method: &str, uri: &str) -> String {
444        let sign = vec![method, uri, date];
445
446        let sign = sign
447            .into_iter()
448            .filter(|s| !s.is_empty())
449            .collect::<Vec<&str>>()
450            .join("&");
451
452        let mut mac = HmacSha1::new_from_slice(format_md5(self.password.as_bytes()).as_bytes())
453            .expect("HMAC can take key of any size");
454        mac.update(sign.as_bytes());
455        let sign_str = mac.finalize().into_bytes();
456
457        let sign = base64::engine::general_purpose::STANDARD.encode(sign_str.as_slice());
458        format!("UPYUN {}:{}", self.operator, sign)
459    }
460}
461
462pub(super) fn parse_info(headers: &HeaderMap) -> Result<Metadata> {
463    let mode = if parse_header_to_str(headers, X_UPYUN_FILE_TYPE)? == Some("file") {
464        EntryMode::FILE
465    } else {
466        EntryMode::DIR
467    };
468
469    let mut m = Metadata::new(mode);
470
471    if let Some(v) = parse_header_to_str(headers, X_UPYUN_FILE_SIZE)? {
472        let size = v.parse::<u64>().map_err(|e| {
473            Error::new(ErrorKind::Unexpected, "header value is not valid integer")
474                .with_operation("parse_info")
475                .set_source(e)
476        })?;
477        m.set_content_length(size);
478    }
479
480    if let Some(v) = parse_content_type(headers)? {
481        m.set_content_type(v);
482    }
483
484    if let Some(v) = parse_content_md5(headers)? {
485        m.set_content_md5(v);
486    }
487
488    if let Some(v) = parse_header_to_str(headers, X_UPYUN_CACHE_CONTROL)? {
489        m.set_cache_control(v);
490    }
491
492    if let Some(v) = parse_header_to_str(headers, X_UPYUN_CONTENT_DISPOSITION)? {
493        m.set_content_disposition(v);
494    }
495
496    Ok(m)
497}
498
499pub fn format_md5(bs: &[u8]) -> String {
500    let mut hasher = md5::Md5::new();
501    hasher.update(bs);
502
503    format!("{:x}", hasher.finalize())
504}
505
506#[derive(Debug, Deserialize)]
507pub(super) struct File {
508    #[serde(rename = "type")]
509    pub type_field: String,
510    pub name: String,
511    pub length: u64,
512    pub last_modified: i64,
513}
514
515#[derive(Debug, Deserialize)]
516pub(super) struct ListObjectsResponse {
517    pub iter: String,
518    pub files: Vec<File>,
519}