1use std::fmt::Debug;
19use std::sync::Arc;
20
21use http::Response;
22use http::StatusCode;
23use http::Uri;
24use log::debug;
25use reqsign::AliyunConfig;
26use reqsign::AliyunLoader;
27use reqsign::AliyunOssSigner;
28
29use super::OSS_SCHEME;
30use super::config::OssConfig;
31use super::core::*;
32use super::deleter::OssDeleter;
33use super::error::parse_error;
34use super::lister::OssLister;
35use super::lister::OssListers;
36use super::lister::OssObjectVersionsLister;
37use super::writer::OssWriter;
38use super::writer::OssWriters;
39use crate::raw::*;
40use crate::*;
41
// Fallback used by `build` for the `delete_max_size` capability when the
// config leaves `delete_max_size` unset.
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
43
// Builder for the OSS backend; user-facing documentation comes from `docs.md`.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct OssBuilder {
    // Service configuration filled in by the builder's setter methods.
    pub(super) config: OssConfig,

    // Optional caller-supplied HTTP client; when present, `build` installs it
    // on the accessor info. Deprecated in favour of `Operator::update_http_client`.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    pub(super) http_client: Option<HttpClient>,
}
53
54impl Debug for OssBuilder {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("OssBuilder")
57 .field("config", &self.config)
58 .finish_non_exhaustive()
59 }
60}
61
62impl OssBuilder {
63 pub fn root(mut self, root: &str) -> Self {
67 self.config.root = if root.is_empty() {
68 None
69 } else {
70 Some(root.to_string())
71 };
72
73 self
74 }
75
76 pub fn bucket(mut self, bucket: &str) -> Self {
78 self.config.bucket = bucket.to_string();
79
80 self
81 }
82
83 pub fn endpoint(mut self, endpoint: &str) -> Self {
85 if !endpoint.is_empty() {
86 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
88 }
89
90 self
91 }
92
93 pub fn addressing_style(mut self, addressing_style: &str) -> Self {
103 self.config.addressing_style = Some(addressing_style.to_string());
104
105 self
106 }
107
108 pub fn enable_versioning(mut self, enabled: bool) -> Self {
110 self.config.enable_versioning = enabled;
111
112 self
113 }
114
115 pub fn presign_endpoint(mut self, endpoint: &str) -> Self {
124 if !endpoint.is_empty() {
125 self.config.presign_endpoint = Some(endpoint.trim_end_matches('/').to_string())
127 }
128
129 self
130 }
131
132 pub fn presign_addressing_style(mut self, addressing_style: &str) -> Self {
140 self.config.presign_addressing_style = Some(addressing_style.to_string());
141
142 self
143 }
144
145 pub fn access_key_id(mut self, v: &str) -> Self {
150 if !v.is_empty() {
151 self.config.access_key_id = Some(v.to_string())
152 }
153
154 self
155 }
156
157 pub fn access_key_secret(mut self, v: &str) -> Self {
162 if !v.is_empty() {
163 self.config.access_key_secret = Some(v.to_string())
164 }
165
166 self
167 }
168
169 pub fn security_token(mut self, security_token: &str) -> Self {
174 if !security_token.is_empty() {
175 self.config.security_token = Some(security_token.to_string())
176 }
177
178 self
179 }
180
181 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
188 #[allow(deprecated)]
189 pub fn http_client(mut self, client: HttpClient) -> Self {
190 self.http_client = Some(client);
191 self
192 }
193
194 fn parse_endpoint(
196 &self,
197 endpoint: &Option<String>,
198 bucket: &str,
199 addressing_style: AddressingStyle,
200 ) -> Result<(String, String)> {
201 let (endpoint, host) = match endpoint.clone() {
202 Some(ep) => {
203 let uri = ep.parse::<Uri>().map_err(|err| {
204 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
205 .with_context("service", OSS_SCHEME)
206 .with_context("endpoint", &ep)
207 .set_source(err)
208 })?;
209 let host = uri.host().ok_or_else(|| {
210 Error::new(ErrorKind::ConfigInvalid, "endpoint host is empty")
211 .with_context("service", OSS_SCHEME)
212 .with_context("endpoint", &ep)
213 })?;
214 let full_host = match addressing_style {
215 AddressingStyle::Virtual => {
216 if let Some(port) = uri.port_u16() {
217 format!("{bucket}.{host}:{port}")
218 } else {
219 format!("{bucket}.{host}")
220 }
221 }
222 AddressingStyle::Cname | AddressingStyle::Path => {
223 if let Some(port) = uri.port_u16() {
224 format!("{host}:{port}")
225 } else {
226 host.to_string()
227 }
228 }
229 };
230 if let Some(port) = uri.port_u16() {
231 format!("{bucket}.{host}:{port}")
232 } else {
233 format!("{bucket}.{host}")
234 };
235 let endpoint = match uri.scheme_str() {
236 Some(scheme_str) => match scheme_str {
237 "http" | "https" => format!("{scheme_str}://{full_host}"),
238 _ => {
239 return Err(Error::new(
240 ErrorKind::ConfigInvalid,
241 "endpoint protocol is invalid",
242 )
243 .with_context("service", OSS_SCHEME));
244 }
245 },
246 None => format!("https://{full_host}"),
247 };
248 let endpoint = match addressing_style {
249 AddressingStyle::Path => format!("{}/{}", endpoint, bucket),
250 AddressingStyle::Cname | AddressingStyle::Virtual => endpoint,
251 };
252 (endpoint, full_host)
253 }
254 None => {
255 return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
256 .with_context("service", OSS_SCHEME));
257 }
258 };
259 Ok((endpoint, host))
260 }
261
262 pub fn server_side_encryption(mut self, v: &str) -> Self {
279 if !v.is_empty() {
280 self.config.server_side_encryption = Some(v.to_string())
281 }
282 self
283 }
284
285 pub fn server_side_encryption_key_id(mut self, v: &str) -> Self {
291 if !v.is_empty() {
292 self.config.server_side_encryption_key_id = Some(v.to_string())
293 }
294 self
295 }
296
297 #[deprecated(
299 since = "0.52.0",
300 note = "Please use `delete_max_size` instead of `batch_max_operations`"
301 )]
302 pub fn batch_max_operations(mut self, delete_max_size: usize) -> Self {
303 self.config.delete_max_size = Some(delete_max_size);
304
305 self
306 }
307
308 pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
310 self.config.delete_max_size = Some(delete_max_size);
311
312 self
313 }
314
315 pub fn allow_anonymous(mut self) -> Self {
318 self.config.allow_anonymous = true;
319 self
320 }
321
322 pub fn role_arn(mut self, role_arn: &str) -> Self {
327 if !role_arn.is_empty() {
328 self.config.role_arn = Some(role_arn.to_string())
329 }
330
331 self
332 }
333
334 pub fn role_session_name(mut self, role_session_name: &str) -> Self {
336 if !role_session_name.is_empty() {
337 self.config.role_session_name = Some(role_session_name.to_string())
338 }
339
340 self
341 }
342
343 pub fn oidc_provider_arn(mut self, oidc_provider_arn: &str) -> Self {
345 if !oidc_provider_arn.is_empty() {
346 self.config.oidc_provider_arn = Some(oidc_provider_arn.to_string())
347 }
348
349 self
350 }
351
352 pub fn oidc_token_file(mut self, oidc_token_file: &str) -> Self {
354 if !oidc_token_file.is_empty() {
355 self.config.oidc_token_file = Some(oidc_token_file.to_string())
356 }
357
358 self
359 }
360
361 pub fn sts_endpoint(mut self, sts_endpoint: &str) -> Self {
363 if !sts_endpoint.is_empty() {
364 self.config.sts_endpoint = Some(sts_endpoint.to_string())
365 }
366
367 self
368 }
369}
370
/// How the bucket is combined with the configured endpoint when
/// `OssBuilder::parse_endpoint` builds the request endpoint and host.
enum AddressingStyle {
    /// The bucket is appended to the endpoint as a path segment
    /// (`{endpoint}/{bucket}`); the host is left unchanged.
    Path,
    /// The endpoint host is used as-is; the bucket is not added anywhere.
    Cname,
    /// The bucket is prepended to the host (`{bucket}.{host}`).
    Virtual,
}
376
377impl TryFrom<&Option<String>> for AddressingStyle {
378 type Error = Error;
379
380 fn try_from(value: &Option<String>) -> Result<Self> {
381 match value.as_deref() {
382 None | Some("virtual") => Ok(AddressingStyle::Virtual),
383 Some("path") => Ok(AddressingStyle::Path),
384 Some("cname") => Ok(AddressingStyle::Cname),
385 Some(v) => Err(Error::new(
386 ErrorKind::ConfigInvalid,
387 "Invalid addressing style, available: `virtual`, `path`, `cname`",
388 )
389 .with_context("service", OSS_SCHEME)
390 .with_context("addressing_style", v)),
391 }
392 }
393}
394
395impl Builder for OssBuilder {
396 type Config = OssConfig;
397
398 fn build(self) -> Result<impl Access> {
399 debug!("backend build started: {:?}", &self);
400
401 let root = normalize_root(&self.config.root.clone().unwrap_or_default());
402 debug!("backend use root {}", &root);
403
404 let bucket = match self.config.bucket.is_empty() {
406 false => Ok(&self.config.bucket),
407 true => Err(
408 Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
409 .with_context("service", OSS_SCHEME),
410 ),
411 }?;
412
413 let (endpoint, host) = self.parse_endpoint(
416 &self.config.endpoint,
417 bucket,
418 (&self.config.addressing_style).try_into()?,
419 )?;
420 debug!("backend use bucket {}, endpoint: {}", &bucket, &endpoint);
421
422 let presign_endpoint = if self.config.presign_endpoint.is_some() {
423 self.parse_endpoint(
424 &self.config.presign_endpoint,
425 bucket,
426 (&self.config.presign_addressing_style).try_into()?,
427 )?
428 .0
429 } else {
430 endpoint.clone()
431 };
432 debug!("backend use presign_endpoint: {}", &presign_endpoint);
433
434 let server_side_encryption = match &self.config.server_side_encryption {
435 None => None,
436 Some(v) => Some(
437 build_header_value(v)
438 .map_err(|err| err.with_context("key", "server_side_encryption"))?,
439 ),
440 };
441
442 let server_side_encryption_key_id = match &self.config.server_side_encryption_key_id {
443 None => None,
444 Some(v) => Some(
445 build_header_value(v)
446 .map_err(|err| err.with_context("key", "server_side_encryption_key_id"))?,
447 ),
448 };
449
450 let mut cfg = AliyunConfig::default();
451 cfg = cfg.from_env();
453
454 if let Some(v) = self.config.access_key_id {
455 cfg.access_key_id = Some(v);
456 }
457
458 if let Some(v) = self.config.access_key_secret {
459 cfg.access_key_secret = Some(v);
460 }
461
462 if let Some(v) = self.config.security_token {
463 cfg.security_token = Some(v);
464 }
465
466 if let Some(v) = self.config.role_arn {
467 cfg.role_arn = Some(v);
468 }
469
470 if let Some(v) = self.config.role_session_name {
472 cfg.role_session_name = v;
473 }
474
475 if let Some(v) = self.config.oidc_provider_arn {
476 cfg.oidc_provider_arn = Some(v);
477 }
478
479 if let Some(v) = self.config.oidc_token_file {
480 cfg.oidc_token_file = Some(v);
481 }
482
483 if let Some(v) = self.config.sts_endpoint {
484 cfg.sts_endpoint = Some(v);
485 }
486
487 let loader = AliyunLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);
488
489 let signer = AliyunOssSigner::new(bucket);
490
491 let delete_max_size = self
492 .config
493 .delete_max_size
494 .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
495
496 Ok(OssBackend {
497 core: Arc::new(OssCore {
498 info: {
499 let am = AccessorInfo::default();
500 am.set_scheme(OSS_SCHEME)
501 .set_root(&root)
502 .set_name(bucket)
503 .set_native_capability(Capability {
504 stat: true,
505 stat_with_if_match: true,
506 stat_with_if_none_match: true,
507 stat_with_version: self.config.enable_versioning,
508
509 read: true,
510
511 read_with_if_match: true,
512 read_with_if_none_match: true,
513 read_with_version: self.config.enable_versioning,
514 read_with_if_modified_since: true,
515 read_with_if_unmodified_since: true,
516
517 write: true,
518 write_can_empty: true,
519 write_can_append: true,
520 write_can_multi: true,
521 write_with_cache_control: true,
522 write_with_content_type: true,
523 write_with_content_disposition: true,
524 write_with_if_not_exists: !self.config.enable_versioning,
526
527 write_multi_min_size: Some(100 * 1024),
531 write_multi_max_size: if cfg!(target_pointer_width = "64") {
535 Some(5 * 1024 * 1024 * 1024)
536 } else {
537 Some(usize::MAX)
538 },
539 write_with_user_metadata: true,
540
541 delete: true,
542 delete_with_version: self.config.enable_versioning,
543 delete_max_size: Some(delete_max_size),
544
545 copy: true,
546
547 list: true,
548 list_with_limit: true,
549 list_with_start_after: true,
550 list_with_recursive: true,
551 list_with_versions: self.config.enable_versioning,
552 list_with_deleted: self.config.enable_versioning,
553
554 presign: true,
555 presign_stat: true,
556 presign_read: true,
557 presign_write: true,
558
559 shared: true,
560
561 ..Default::default()
562 });
563
564 #[allow(deprecated)]
566 if let Some(client) = self.http_client {
567 am.update_http_client(|_| client);
568 }
569
570 am.into()
571 },
572 root,
573 bucket: bucket.to_owned(),
574 endpoint,
575 host,
576 presign_endpoint,
577 allow_anonymous: self.config.allow_anonymous,
578 signer,
579 loader,
580 server_side_encryption,
581 server_side_encryption_key_id,
582 }),
583 })
584 }
585}
586
/// Backend for the OSS service; every operation is delegated to the shared
/// `OssCore`.
#[derive(Debug, Clone)]
pub struct OssBackend {
    // Shared core state assembled by `OssBuilder::build`; cloning the backend
    // only clones the `Arc`.
    core: Arc<OssCore>,
}
592
593impl Access for OssBackend {
594 type Reader = HttpBody;
595 type Writer = OssWriters;
596 type Lister = OssListers;
597 type Deleter = oio::BatchDeleter<OssDeleter>;
598
599 fn info(&self) -> Arc<AccessorInfo> {
600 self.core.info.clone()
601 }
602
603 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
604 let resp = self.core.oss_head_object(path, &args).await?;
605
606 let status = resp.status();
607
608 match status {
609 StatusCode::OK => {
610 let headers = resp.headers();
611 let mut meta = self.core.parse_metadata(path, resp.headers())?;
612
613 if let Some(v) = parse_header_to_str(headers, constants::X_OSS_VERSION_ID)? {
614 meta.set_version(v);
615 }
616
617 Ok(RpStat::new(meta))
618 }
619 _ => Err(parse_error(resp)),
620 }
621 }
622
623 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
624 let resp = self.core.oss_get_object(path, &args).await?;
625
626 let status = resp.status();
627
628 match status {
629 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
630 Ok((RpRead::default(), resp.into_body()))
631 }
632 _ => {
633 let (part, mut body) = resp.into_parts();
634 let buf = body.to_buffer().await?;
635 Err(parse_error(Response::from_parts(part, buf)))
636 }
637 }
638 }
639
640 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
641 let writer = OssWriter::new(self.core.clone(), path, args.clone());
642
643 let w = if args.append() {
644 OssWriters::Two(oio::AppendWriter::new(writer))
645 } else {
646 OssWriters::One(oio::MultipartWriter::new(
647 self.core.info.clone(),
648 writer,
649 args.concurrent(),
650 ))
651 };
652
653 Ok((RpWrite::default(), w))
654 }
655
656 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
657 Ok((
658 RpDelete::default(),
659 oio::BatchDeleter::new(
660 OssDeleter::new(self.core.clone()),
661 self.core.info.full_capability().delete_max_size,
662 ),
663 ))
664 }
665
666 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
667 let l = if args.versions() || args.deleted() {
668 TwoWays::Two(oio::PageLister::new(OssObjectVersionsLister::new(
669 self.core.clone(),
670 path,
671 args,
672 )))
673 } else {
674 TwoWays::One(oio::PageLister::new(OssLister::new(
675 self.core.clone(),
676 path,
677 args.recursive(),
678 args.limit(),
679 args.start_after(),
680 )))
681 };
682
683 Ok((RpList::default(), l))
684 }
685
686 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
687 let resp = self.core.oss_copy_object(from, to).await?;
688 let status = resp.status();
689
690 match status {
691 StatusCode::OK => Ok(RpCopy::default()),
692 _ => Err(parse_error(resp)),
693 }
694 }
695
696 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
697 let req = match args.operation() {
699 PresignOperation::Stat(v) => self.core.oss_head_object_request(path, true, v),
700 PresignOperation::Read(v) => self.core.oss_get_object_request(path, true, v),
701 PresignOperation::Write(v) => {
702 self.core
703 .oss_put_object_request(path, None, v, Buffer::new(), true)
704 }
705 PresignOperation::Delete(_) => Err(Error::new(
706 ErrorKind::Unsupported,
707 "operation is not supported",
708 )),
709 };
710 let mut req = req?;
711
712 self.core.sign_query(&mut req, args.expire()).await?;
713
714 let (parts, _) = req.into_parts();
716
717 Ok(RpPresign::new(PresignedRequest::new(
718 parts.method,
719 parts.uri,
720 parts.headers,
721 )))
722 }
723}