opendal/services/upyun/
core.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use base64::Engine;
22use hmac::Hmac;
23use hmac::Mac;
24use http::HeaderMap;
25use http::Request;
26use http::Response;
27use http::header;
28use md5::Digest;
29use serde::Deserialize;
30use sha1::Sha1;
31
32use self::constants::*;
33use crate::raw::*;
34use crate::*;
35
36pub(super) mod constants {
37    pub const X_UPYUN_FILE_TYPE: &str = "x-upyun-file-type";
38    pub const X_UPYUN_FILE_SIZE: &str = "x-upyun-file-size";
39    pub const X_UPYUN_CACHE_CONTROL: &str = "x-upyun-meta-cache-control";
40    pub const X_UPYUN_CONTENT_DISPOSITION: &str = "x-upyun-meta-content-disposition";
41    pub const X_UPYUN_MULTI_STAGE: &str = "X-Upyun-Multi-Stage";
42    pub const X_UPYUN_MULTI_TYPE: &str = "X-Upyun-Multi-Type";
43    pub const X_UPYUN_MULTI_DISORDER: &str = "X-Upyun-Multi-Disorder";
44    pub const X_UPYUN_MULTI_UUID: &str = "X-Upyun-Multi-Uuid";
45    pub const X_UPYUN_PART_ID: &str = "X-Upyun-Part-Id";
46    pub const X_UPYUN_FOLDER: &str = "x-upyun-folder";
47    pub const X_UPYUN_MOVE_SOURCE: &str = "X-Upyun-Move-Source";
48    pub const X_UPYUN_COPY_SOURCE: &str = "X-Upyun-Copy-Source";
49    pub const X_UPYUN_METADATA_DIRECTIVE: &str = "X-Upyun-Metadata-Directive";
50    pub const X_UPYUN_LIST_ITER: &str = "x-list-iter";
51    pub const X_UPYUN_LIST_LIMIT: &str = "X-List-Limit";
52    pub const X_UPYUN_LIST_MAX_LIMIT: usize = 4096;
53    pub const X_UPYUN_LIST_DEFAULT_LIMIT: usize = 256;
54}
55
56#[derive(Clone)]
57pub struct UpyunCore {
58    pub info: Arc<AccessorInfo>,
59    /// The root of this core.
60    pub root: String,
61    /// The endpoint of this backend.
62    pub operator: String,
63    /// The bucket of this backend.
64    pub bucket: String,
65
66    /// signer of this backend.
67    pub signer: UpyunSigner,
68}
69
70impl Debug for UpyunCore {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("UpyunCore")
73            .field("root", &self.root)
74            .field("bucket", &self.bucket)
75            .field("operator", &self.operator)
76            .finish_non_exhaustive()
77    }
78}
79
80impl UpyunCore {
81    #[inline]
82    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
83        self.info.http_client().send(req).await
84    }
85
86    pub fn sign(&self, req: &mut Request<Buffer>) -> Result<()> {
87        // get rfc1123 date
88        let date = Timestamp::now().format_http_date();
89        let authorization =
90            self.signer
91                .authorization(&date, req.method().as_str(), req.uri().path());
92
93        req.headers_mut()
94            .insert("Authorization", authorization.parse().unwrap());
95        req.headers_mut().insert("Date", date.parse().unwrap());
96
97        Ok(())
98    }
99}
100
101impl UpyunCore {
102    pub async fn download_file(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
103        let path = build_abs_path(&self.root, path);
104
105        let url = format!(
106            "https://v0.api.upyun.com/{}/{}",
107            self.bucket,
108            percent_encode_path(&path)
109        );
110
111        let req = Request::get(url);
112
113        let mut req = req
114            .header(header::RANGE, range.to_header())
115            .extension(Operation::Read)
116            .body(Buffer::new())
117            .map_err(new_request_build_error)?;
118
119        self.sign(&mut req)?;
120
121        self.info.http_client().fetch(req).await
122    }
123
124    pub async fn info(&self, path: &str) -> Result<Response<Buffer>> {
125        let path = build_abs_path(&self.root, path);
126
127        let url = format!(
128            "https://v0.api.upyun.com/{}/{}",
129            self.bucket,
130            percent_encode_path(&path)
131        );
132
133        let req = Request::head(url);
134
135        let mut req = req
136            .extension(Operation::Stat)
137            .body(Buffer::new())
138            .map_err(new_request_build_error)?;
139
140        self.sign(&mut req)?;
141
142        self.send(req).await
143    }
144
145    pub fn upload(
146        &self,
147        path: &str,
148        size: Option<u64>,
149        args: &OpWrite,
150        body: Buffer,
151    ) -> Result<Request<Buffer>> {
152        let p = build_abs_path(&self.root, path);
153
154        let url = format!(
155            "https://v0.api.upyun.com/{}/{}",
156            self.bucket,
157            percent_encode_path(&p)
158        );
159
160        let mut req = Request::put(&url);
161
162        if let Some(size) = size {
163            req = req.header(header::CONTENT_LENGTH, size.to_string())
164        }
165
166        if let Some(mime) = args.content_type() {
167            req = req.header(header::CONTENT_TYPE, mime)
168        }
169
170        if let Some(pos) = args.content_disposition() {
171            req = req.header(X_UPYUN_CONTENT_DISPOSITION, pos)
172        }
173
174        if let Some(cache_control) = args.cache_control() {
175            req = req.header(X_UPYUN_CACHE_CONTROL, cache_control)
176        }
177
178        let req = req.extension(Operation::Write);
179
180        // Set body
181        let mut req = req.body(body).map_err(new_request_build_error)?;
182
183        self.sign(&mut req)?;
184
185        Ok(req)
186    }
187
188    pub async fn delete(&self, path: &str) -> Result<Response<Buffer>> {
189        let path = build_abs_path(&self.root, path);
190
191        let url = format!(
192            "https://v0.api.upyun.com/{}/{}",
193            self.bucket,
194            percent_encode_path(&path)
195        );
196
197        let req = Request::delete(url);
198
199        let req = req.extension(Operation::Delete);
200
201        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
202
203        self.sign(&mut req)?;
204
205        self.send(req).await
206    }
207
208    pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
209        let from = format!("/{}/{}", self.bucket, build_abs_path(&self.root, from));
210        let to = build_abs_path(&self.root, to);
211
212        let url = format!(
213            "https://v0.api.upyun.com/{}/{}",
214            self.bucket,
215            percent_encode_path(&to)
216        );
217
218        let mut req = Request::put(url);
219
220        req = req.header(header::CONTENT_LENGTH, "0");
221
222        req = req.header(X_UPYUN_COPY_SOURCE, from);
223
224        req = req.header(X_UPYUN_METADATA_DIRECTIVE, "copy");
225
226        let req = req.extension(Operation::Copy);
227
228        // Set body
229        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
230
231        self.sign(&mut req)?;
232
233        self.send(req).await
234    }
235
236    pub async fn move_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
237        let from = format!("/{}/{}", self.bucket, build_abs_path(&self.root, from));
238        let to = build_abs_path(&self.root, to);
239
240        let url = format!(
241            "https://v0.api.upyun.com/{}/{}",
242            self.bucket,
243            percent_encode_path(&to)
244        );
245
246        let mut req = Request::put(url);
247
248        req = req.header(header::CONTENT_LENGTH, "0");
249
250        req = req.header(X_UPYUN_MOVE_SOURCE, from);
251
252        req = req.header(X_UPYUN_METADATA_DIRECTIVE, "copy");
253
254        let req = req.extension(Operation::Rename);
255
256        // Set body
257        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
258
259        self.sign(&mut req)?;
260
261        self.send(req).await
262    }
263
264    pub async fn create_dir(&self, path: &str) -> Result<Response<Buffer>> {
265        let path = build_abs_path(&self.root, path);
266        let path = path[..path.len() - 1].to_string();
267
268        let url = format!(
269            "https://v0.api.upyun.com/{}/{}",
270            self.bucket,
271            percent_encode_path(&path)
272        );
273
274        let mut req = Request::post(url);
275
276        req = req.header("folder", "true");
277
278        req = req.header(X_UPYUN_FOLDER, "true");
279
280        let req = req.extension(Operation::CreateDir);
281
282        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
283
284        self.sign(&mut req)?;
285
286        self.send(req).await
287    }
288
289    pub async fn initiate_multipart_upload(
290        &self,
291        path: &str,
292        args: &OpWrite,
293    ) -> Result<Response<Buffer>> {
294        let path = build_abs_path(&self.root, path);
295
296        let url = format!(
297            "https://v0.api.upyun.com/{}/{}",
298            self.bucket,
299            percent_encode_path(&path)
300        );
301
302        let mut req = Request::put(url);
303
304        req = req.header(X_UPYUN_MULTI_STAGE, "initiate");
305
306        req = req.header(X_UPYUN_MULTI_DISORDER, "true");
307
308        if let Some(content_type) = args.content_type() {
309            req = req.header(X_UPYUN_MULTI_TYPE, content_type);
310        }
311
312        if let Some(content_disposition) = args.content_disposition() {
313            req = req.header(X_UPYUN_CONTENT_DISPOSITION, content_disposition)
314        }
315
316        if let Some(cache_control) = args.cache_control() {
317            req = req.header(X_UPYUN_CACHE_CONTROL, cache_control)
318        }
319
320        let req = req.extension(Operation::Write);
321
322        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
323
324        self.sign(&mut req)?;
325
326        self.send(req).await
327    }
328
329    pub fn upload_part(
330        &self,
331        path: &str,
332        upload_id: &str,
333        part_number: usize,
334        size: u64,
335        body: Buffer,
336    ) -> Result<Request<Buffer>> {
337        let p = build_abs_path(&self.root, path);
338
339        let url = format!(
340            "https://v0.api.upyun.com/{}/{}",
341            self.bucket,
342            percent_encode_path(&p),
343        );
344
345        let mut req = Request::put(&url);
346
347        req = req.header(header::CONTENT_LENGTH, size);
348
349        req = req.header(X_UPYUN_MULTI_STAGE, "upload");
350
351        req = req.header(X_UPYUN_MULTI_UUID, upload_id);
352
353        req = req.header(X_UPYUN_PART_ID, part_number);
354
355        let req = req.extension(Operation::Write);
356
357        // Set body
358        let mut req = req.body(body).map_err(new_request_build_error)?;
359
360        self.sign(&mut req)?;
361
362        Ok(req)
363    }
364
365    pub async fn complete_multipart_upload(
366        &self,
367        path: &str,
368        upload_id: &str,
369    ) -> Result<Response<Buffer>> {
370        let p = build_abs_path(&self.root, path);
371
372        let url = format!(
373            "https://v0.api.upyun.com/{}/{}",
374            self.bucket,
375            percent_encode_path(&p),
376        );
377
378        let mut req = Request::put(url);
379
380        req = req.header(X_UPYUN_MULTI_STAGE, "complete");
381
382        req = req.header(X_UPYUN_MULTI_UUID, upload_id);
383
384        let req = req.extension(Operation::Write);
385
386        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
387
388        self.sign(&mut req)?;
389
390        self.send(req).await
391    }
392
393    pub async fn list_objects(
394        &self,
395        path: &str,
396        iter: &str,
397        limit: Option<usize>,
398    ) -> Result<Response<Buffer>> {
399        let path = build_abs_path(&self.root, path);
400
401        let url = format!(
402            "https://v0.api.upyun.com/{}/{}",
403            self.bucket,
404            percent_encode_path(&path),
405        );
406
407        let mut req = Request::get(url.clone());
408
409        req = req.header(header::ACCEPT, "application/json");
410
411        if !iter.is_empty() {
412            req = req.header(X_UPYUN_LIST_ITER, iter);
413        }
414
415        if let Some(mut limit) = limit {
416            if limit > X_UPYUN_LIST_MAX_LIMIT {
417                limit = X_UPYUN_LIST_DEFAULT_LIMIT;
418            }
419            req = req.header(X_UPYUN_LIST_LIMIT, limit);
420        }
421
422        let req = req.extension(Operation::List);
423
424        // Set body
425        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
426
427        self.sign(&mut req)?;
428
429        self.send(req).await
430    }
431}
432
/// Credentials used to compute the `Authorization` header value.
#[derive(Clone, Default)]
pub struct UpyunSigner {
    /// Operator name; it appears verbatim in the authorization value.
    pub operator: String,
    /// Operator password; the hex MD5 digest of it is used as the HMAC key.
    pub password: String,
}
438
439type HmacSha1 = Hmac<Sha1>;
440
441impl UpyunSigner {
442    pub fn authorization(&self, date: &str, method: &str, uri: &str) -> String {
443        let sign = vec![method, uri, date];
444
445        let sign = sign
446            .into_iter()
447            .filter(|s| !s.is_empty())
448            .collect::<Vec<&str>>()
449            .join("&");
450
451        let mut mac = HmacSha1::new_from_slice(format_md5(self.password.as_bytes()).as_bytes())
452            .expect("HMAC can take key of any size");
453        mac.update(sign.as_bytes());
454        let sign_str = mac.finalize().into_bytes();
455
456        let sign = base64::engine::general_purpose::STANDARD.encode(sign_str);
457        format!("UPYUN {}:{}", self.operator, sign)
458    }
459}
460
461pub(super) fn parse_info(headers: &HeaderMap) -> Result<Metadata> {
462    let mode = if parse_header_to_str(headers, X_UPYUN_FILE_TYPE)? == Some("file") {
463        EntryMode::FILE
464    } else {
465        EntryMode::DIR
466    };
467
468    let mut m = Metadata::new(mode);
469
470    if let Some(v) = parse_header_to_str(headers, X_UPYUN_FILE_SIZE)? {
471        let size = v.parse::<u64>().map_err(|e| {
472            Error::new(ErrorKind::Unexpected, "header value is not valid integer")
473                .with_operation("parse_info")
474                .set_source(e)
475        })?;
476        m.set_content_length(size);
477    }
478
479    if let Some(v) = parse_content_type(headers)? {
480        m.set_content_type(v);
481    }
482
483    if let Some(v) = parse_content_md5(headers)? {
484        m.set_content_md5(v);
485    }
486
487    if let Some(v) = parse_header_to_str(headers, X_UPYUN_CACHE_CONTROL)? {
488        m.set_cache_control(v);
489    }
490
491    if let Some(v) = parse_header_to_str(headers, X_UPYUN_CONTENT_DISPOSITION)? {
492        m.set_content_disposition(v);
493    }
494
495    Ok(m)
496}
497
498pub fn format_md5(bs: &[u8]) -> String {
499    let mut hasher = md5::Md5::new();
500    hasher.update(bs);
501
502    format!("{:x}", hasher.finalize())
503}
504
505#[derive(Debug, Deserialize)]
506pub(super) struct File {
507    #[serde(rename = "type")]
508    pub type_field: String,
509    pub name: String,
510    pub length: u64,
511    pub last_modified: i64,
512}
513
514#[derive(Debug, Deserialize)]
515pub(super) struct ListObjectsResponse {
516    pub iter: String,
517    pub files: Vec<File>,
518}