opendal/services/upyun/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use base64::Engine;
19use hmac::Hmac;
20use hmac::Mac;
21use http::header;
22use http::HeaderMap;
23use http::Request;
24use http::Response;
25use md5::Digest;
26use serde::Deserialize;
27use sha1::Sha1;
28use std::fmt::Debug;
29use std::fmt::Formatter;
30use std::sync::Arc;
31
32use self::constants::*;
33use crate::raw::*;
34use crate::*;
35
36pub(super) mod constants {
37    pub const X_UPYUN_FILE_TYPE: &str = "x-upyun-file-type";
38    pub const X_UPYUN_FILE_SIZE: &str = "x-upyun-file-size";
39    pub const X_UPYUN_CACHE_CONTROL: &str = "x-upyun-meta-cache-control";
40    pub const X_UPYUN_CONTENT_DISPOSITION: &str = "x-upyun-meta-content-disposition";
41    pub const X_UPYUN_MULTI_STAGE: &str = "X-Upyun-Multi-Stage";
42    pub const X_UPYUN_MULTI_TYPE: &str = "X-Upyun-Multi-Type";
43    pub const X_UPYUN_MULTI_DISORDER: &str = "X-Upyun-Multi-Disorder";
44    pub const X_UPYUN_MULTI_UUID: &str = "X-Upyun-Multi-Uuid";
45    pub const X_UPYUN_PART_ID: &str = "X-Upyun-Part-Id";
46    pub const X_UPYUN_FOLDER: &str = "x-upyun-folder";
47    pub const X_UPYUN_MOVE_SOURCE: &str = "X-Upyun-Move-Source";
48    pub const X_UPYUN_COPY_SOURCE: &str = "X-Upyun-Copy-Source";
49    pub const X_UPYUN_METADATA_DIRECTIVE: &str = "X-Upyun-Metadata-Directive";
50    pub const X_UPYUN_LIST_ITER: &str = "x-list-iter";
51    pub const X_UPYUN_LIST_LIMIT: &str = "X-List-Limit";
52    pub const X_UPYUN_LIST_MAX_LIMIT: usize = 4096;
53    pub const X_UPYUN_LIST_DEFAULT_LIMIT: usize = 256;
54}
55
56#[derive(Clone)]
57pub struct UpyunCore {
58    pub info: Arc<AccessorInfo>,
59    /// The root of this core.
60    pub root: String,
61    /// The endpoint of this backend.
62    pub operator: String,
63    /// The bucket of this backend.
64    pub bucket: String,
65
66    /// signer of this backend.
67    pub signer: UpyunSigner,
68}
69
70impl Debug for UpyunCore {
71    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("Backend")
73            .field("root", &self.root)
74            .field("bucket", &self.bucket)
75            .field("operator", &self.operator)
76            .finish_non_exhaustive()
77    }
78}
79
80impl UpyunCore {
81    #[inline]
82    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
83        self.info.http_client().send(req).await
84    }
85
86    pub fn sign(&self, req: &mut Request<Buffer>) -> Result<()> {
87        // get rfc1123 date
88        let date = chrono::Utc::now()
89            .format("%a, %d %b %Y %H:%M:%S GMT")
90            .to_string();
91        let authorization =
92            self.signer
93                .authorization(&date, req.method().as_str(), req.uri().path());
94
95        req.headers_mut()
96            .insert("Authorization", authorization.parse().unwrap());
97        req.headers_mut().insert("Date", date.parse().unwrap());
98
99        Ok(())
100    }
101}
102
103impl UpyunCore {
104    pub async fn download_file(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
105        let path = build_abs_path(&self.root, path);
106
107        let url = format!(
108            "https://v0.api.upyun.com/{}/{}",
109            self.bucket,
110            percent_encode_path(&path)
111        );
112
113        let req = Request::get(url);
114
115        let mut req = req
116            .header(header::RANGE, range.to_header())
117            .body(Buffer::new())
118            .map_err(new_request_build_error)?;
119
120        self.sign(&mut req)?;
121
122        self.info.http_client().fetch(req).await
123    }
124
125    pub async fn info(&self, path: &str) -> Result<Response<Buffer>> {
126        let path = build_abs_path(&self.root, path);
127
128        let url = format!(
129            "https://v0.api.upyun.com/{}/{}",
130            self.bucket,
131            percent_encode_path(&path)
132        );
133
134        let req = Request::head(url);
135
136        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
137
138        self.sign(&mut req)?;
139
140        self.send(req).await
141    }
142
143    pub fn upload(
144        &self,
145        path: &str,
146        size: Option<u64>,
147        args: &OpWrite,
148        body: Buffer,
149    ) -> Result<Request<Buffer>> {
150        let p = build_abs_path(&self.root, path);
151
152        let url = format!(
153            "https://v0.api.upyun.com/{}/{}",
154            self.bucket,
155            percent_encode_path(&p)
156        );
157
158        let mut req = Request::put(&url);
159
160        if let Some(size) = size {
161            req = req.header(header::CONTENT_LENGTH, size.to_string())
162        }
163
164        if let Some(mime) = args.content_type() {
165            req = req.header(header::CONTENT_TYPE, mime)
166        }
167
168        if let Some(pos) = args.content_disposition() {
169            req = req.header(X_UPYUN_CONTENT_DISPOSITION, pos)
170        }
171
172        if let Some(cache_control) = args.cache_control() {
173            req = req.header(X_UPYUN_CACHE_CONTROL, cache_control)
174        }
175
176        // Set body
177        let mut req = req.body(body).map_err(new_request_build_error)?;
178
179        self.sign(&mut req)?;
180
181        Ok(req)
182    }
183
184    pub async fn delete(&self, path: &str) -> Result<Response<Buffer>> {
185        let path = build_abs_path(&self.root, path);
186
187        let url = format!(
188            "https://v0.api.upyun.com/{}/{}",
189            self.bucket,
190            percent_encode_path(&path)
191        );
192
193        let req = Request::delete(url);
194
195        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
196
197        self.sign(&mut req)?;
198
199        self.send(req).await
200    }
201
202    pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
203        let from = format!("/{}/{}", self.bucket, build_abs_path(&self.root, from));
204        let to = build_abs_path(&self.root, to);
205
206        let url = format!(
207            "https://v0.api.upyun.com/{}/{}",
208            self.bucket,
209            percent_encode_path(&to)
210        );
211
212        let mut req = Request::put(url);
213
214        req = req.header(header::CONTENT_LENGTH, "0");
215
216        req = req.header(X_UPYUN_COPY_SOURCE, from);
217
218        req = req.header(X_UPYUN_METADATA_DIRECTIVE, "copy");
219
220        // Set body
221        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
222
223        self.sign(&mut req)?;
224
225        self.send(req).await
226    }
227
228    pub async fn move_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
229        let from = format!("/{}/{}", self.bucket, build_abs_path(&self.root, from));
230        let to = build_abs_path(&self.root, to);
231
232        let url = format!(
233            "https://v0.api.upyun.com/{}/{}",
234            self.bucket,
235            percent_encode_path(&to)
236        );
237
238        let mut req = Request::put(url);
239
240        req = req.header(header::CONTENT_LENGTH, "0");
241
242        req = req.header(X_UPYUN_MOVE_SOURCE, from);
243
244        req = req.header(X_UPYUN_METADATA_DIRECTIVE, "copy");
245
246        // Set body
247        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
248
249        self.sign(&mut req)?;
250
251        self.send(req).await
252    }
253
254    pub async fn create_dir(&self, path: &str) -> Result<Response<Buffer>> {
255        let path = build_abs_path(&self.root, path);
256        let path = path[..path.len() - 1].to_string();
257
258        let url = format!(
259            "https://v0.api.upyun.com/{}/{}",
260            self.bucket,
261            percent_encode_path(&path)
262        );
263
264        let mut req = Request::post(url);
265
266        req = req.header("folder", "true");
267
268        req = req.header(X_UPYUN_FOLDER, "true");
269
270        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
271
272        self.sign(&mut req)?;
273
274        self.send(req).await
275    }
276
277    pub async fn initiate_multipart_upload(
278        &self,
279        path: &str,
280        args: &OpWrite,
281    ) -> Result<Response<Buffer>> {
282        let path = build_abs_path(&self.root, path);
283
284        let url = format!(
285            "https://v0.api.upyun.com/{}/{}",
286            self.bucket,
287            percent_encode_path(&path)
288        );
289
290        let mut req = Request::put(url);
291
292        req = req.header(X_UPYUN_MULTI_STAGE, "initiate");
293
294        req = req.header(X_UPYUN_MULTI_DISORDER, "true");
295
296        if let Some(content_type) = args.content_type() {
297            req = req.header(X_UPYUN_MULTI_TYPE, content_type);
298        }
299
300        if let Some(content_disposition) = args.content_disposition() {
301            req = req.header(X_UPYUN_CONTENT_DISPOSITION, content_disposition)
302        }
303
304        if let Some(cache_control) = args.cache_control() {
305            req = req.header(X_UPYUN_CACHE_CONTROL, cache_control)
306        }
307
308        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
309
310        self.sign(&mut req)?;
311
312        self.send(req).await
313    }
314
315    pub fn upload_part(
316        &self,
317        path: &str,
318        upload_id: &str,
319        part_number: usize,
320        size: u64,
321        body: Buffer,
322    ) -> Result<Request<Buffer>> {
323        let p = build_abs_path(&self.root, path);
324
325        let url = format!(
326            "https://v0.api.upyun.com/{}/{}",
327            self.bucket,
328            percent_encode_path(&p),
329        );
330
331        let mut req = Request::put(&url);
332
333        req = req.header(header::CONTENT_LENGTH, size);
334
335        req = req.header(X_UPYUN_MULTI_STAGE, "upload");
336
337        req = req.header(X_UPYUN_MULTI_UUID, upload_id);
338
339        req = req.header(X_UPYUN_PART_ID, part_number);
340
341        // Set body
342        let mut req = req.body(body).map_err(new_request_build_error)?;
343
344        self.sign(&mut req)?;
345
346        Ok(req)
347    }
348
349    pub async fn complete_multipart_upload(
350        &self,
351        path: &str,
352        upload_id: &str,
353    ) -> Result<Response<Buffer>> {
354        let p = build_abs_path(&self.root, path);
355
356        let url = format!(
357            "https://v0.api.upyun.com/{}/{}",
358            self.bucket,
359            percent_encode_path(&p),
360        );
361
362        let mut req = Request::put(url);
363
364        req = req.header(X_UPYUN_MULTI_STAGE, "complete");
365
366        req = req.header(X_UPYUN_MULTI_UUID, upload_id);
367
368        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
369
370        self.sign(&mut req)?;
371
372        self.send(req).await
373    }
374
375    pub async fn list_objects(
376        &self,
377        path: &str,
378        iter: &str,
379        limit: Option<usize>,
380    ) -> Result<Response<Buffer>> {
381        let path = build_abs_path(&self.root, path);
382
383        let url = format!(
384            "https://v0.api.upyun.com/{}/{}",
385            self.bucket,
386            percent_encode_path(&path),
387        );
388
389        let mut req = Request::get(url.clone());
390
391        req = req.header(header::ACCEPT, "application/json");
392
393        if !iter.is_empty() {
394            req = req.header(X_UPYUN_LIST_ITER, iter);
395        }
396
397        if let Some(mut limit) = limit {
398            if limit > X_UPYUN_LIST_MAX_LIMIT {
399                limit = X_UPYUN_LIST_DEFAULT_LIMIT;
400            }
401            req = req.header(X_UPYUN_LIST_LIMIT, limit);
402        }
403
404        // Set body
405        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
406
407        self.sign(&mut req)?;
408
409        self.send(req).await
410    }
411}
412
413#[derive(Clone, Default)]
414pub struct UpyunSigner {
415    pub operator: String,
416    pub password: String,
417}
418
419type HmacSha1 = Hmac<Sha1>;
420
421impl UpyunSigner {
422    pub fn authorization(&self, date: &str, method: &str, uri: &str) -> String {
423        let sign = vec![method, uri, date];
424
425        let sign = sign
426            .into_iter()
427            .filter(|s| !s.is_empty())
428            .collect::<Vec<&str>>()
429            .join("&");
430
431        let mut mac = HmacSha1::new_from_slice(format_md5(self.password.as_bytes()).as_bytes())
432            .expect("HMAC can take key of any size");
433        mac.update(sign.as_bytes());
434        let sign_str = mac.finalize().into_bytes();
435
436        let sign = base64::engine::general_purpose::STANDARD.encode(sign_str.as_slice());
437        format!("UPYUN {}:{}", self.operator, sign)
438    }
439}
440
441pub(super) fn parse_info(headers: &HeaderMap) -> Result<Metadata> {
442    let mode = if parse_header_to_str(headers, X_UPYUN_FILE_TYPE)? == Some("file") {
443        EntryMode::FILE
444    } else {
445        EntryMode::DIR
446    };
447
448    let mut m = Metadata::new(mode);
449
450    if let Some(v) = parse_header_to_str(headers, X_UPYUN_FILE_SIZE)? {
451        let size = v.parse::<u64>().map_err(|e| {
452            Error::new(ErrorKind::Unexpected, "header value is not valid integer")
453                .with_operation("parse_info")
454                .set_source(e)
455        })?;
456        m.set_content_length(size);
457    }
458
459    if let Some(v) = parse_content_type(headers)? {
460        m.set_content_type(v);
461    }
462
463    if let Some(v) = parse_content_md5(headers)? {
464        m.set_content_md5(v);
465    }
466
467    if let Some(v) = parse_header_to_str(headers, X_UPYUN_CACHE_CONTROL)? {
468        m.set_cache_control(v);
469    }
470
471    if let Some(v) = parse_header_to_str(headers, X_UPYUN_CONTENT_DISPOSITION)? {
472        m.set_content_disposition(v);
473    }
474
475    Ok(m)
476}
477
478pub fn format_md5(bs: &[u8]) -> String {
479    let mut hasher = md5::Md5::new();
480    hasher.update(bs);
481
482    format!("{:x}", hasher.finalize())
483}
484
485#[derive(Debug, Deserialize)]
486pub(super) struct File {
487    #[serde(rename = "type")]
488    pub type_field: String,
489    pub name: String,
490    pub length: u64,
491    pub last_modified: i64,
492}
493
494#[derive(Debug, Deserialize)]
495pub(super) struct ListObjectsResponse {
496    pub iter: String,
497    pub files: Vec<File>,
498}