1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use http::Uri;
25use log::debug;
26use reqsign::AliyunConfig;
27use reqsign::AliyunLoader;
28use reqsign::AliyunOssSigner;
29
30use super::core::*;
31use super::delete::OssDeleter;
32use super::error::parse_error;
33use super::lister::OssLister;
34use super::lister::OssListers;
35use super::lister::OssObjectVersionsLister;
36use super::writer::OssWriter;
37use super::writer::OssWriters;
38use crate::raw::*;
39use crate::services::OssConfig;
40use crate::*;
41
/// Fallback for the `delete_max_size` capability, used by `build` when the
/// configuration does not set `delete_max_size` explicitly.
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1_000;
43
44impl Configurator for OssConfig {
45 type Builder = OssBuilder;
46
47 #[allow(deprecated)]
48 fn into_builder(self) -> Self::Builder {
49 OssBuilder {
50 config: self,
51
52 http_client: None,
53 }
54 }
55}
56
57#[doc = include_str!("docs.md")]
59#[derive(Default)]
60pub struct OssBuilder {
61 config: OssConfig,
62
63 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
64 http_client: Option<HttpClient>,
65}
66
67impl Debug for OssBuilder {
68 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69 let mut d = f.debug_struct("OssBuilder");
70
71 d.field("config", &self.config);
72 d.finish_non_exhaustive()
73 }
74}
75
76impl OssBuilder {
77 pub fn root(mut self, root: &str) -> Self {
81 self.config.root = if root.is_empty() {
82 None
83 } else {
84 Some(root.to_string())
85 };
86
87 self
88 }
89
90 pub fn bucket(mut self, bucket: &str) -> Self {
92 self.config.bucket = bucket.to_string();
93
94 self
95 }
96
97 pub fn endpoint(mut self, endpoint: &str) -> Self {
99 if !endpoint.is_empty() {
100 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
102 }
103
104 self
105 }
106
107 pub fn enable_versioning(mut self, enabled: bool) -> Self {
109 self.config.enable_versioning = enabled;
110
111 self
112 }
113
114 pub fn presign_endpoint(mut self, endpoint: &str) -> Self {
123 if !endpoint.is_empty() {
124 self.config.presign_endpoint = Some(endpoint.trim_end_matches('/').to_string())
126 }
127
128 self
129 }
130
131 pub fn access_key_id(mut self, v: &str) -> Self {
136 if !v.is_empty() {
137 self.config.access_key_id = Some(v.to_string())
138 }
139
140 self
141 }
142
143 pub fn access_key_secret(mut self, v: &str) -> Self {
148 if !v.is_empty() {
149 self.config.access_key_secret = Some(v.to_string())
150 }
151
152 self
153 }
154
155 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
162 #[allow(deprecated)]
163 pub fn http_client(mut self, client: HttpClient) -> Self {
164 self.http_client = Some(client);
165 self
166 }
167
168 fn parse_endpoint(&self, endpoint: &Option<String>, bucket: &str) -> Result<(String, String)> {
170 let (endpoint, host) = match endpoint.clone() {
171 Some(ep) => {
172 let uri = ep.parse::<Uri>().map_err(|err| {
173 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
174 .with_context("service", Scheme::Oss)
175 .with_context("endpoint", &ep)
176 .set_source(err)
177 })?;
178 let host = uri.host().ok_or_else(|| {
179 Error::new(ErrorKind::ConfigInvalid, "endpoint host is empty")
180 .with_context("service", Scheme::Oss)
181 .with_context("endpoint", &ep)
182 })?;
183 let full_host = if let Some(port) = uri.port_u16() {
184 format!("{bucket}.{host}:{port}")
185 } else {
186 format!("{bucket}.{host}")
187 };
188 let endpoint = match uri.scheme_str() {
189 Some(scheme_str) => match scheme_str {
190 "http" | "https" => format!("{scheme_str}://{full_host}"),
191 _ => {
192 return Err(Error::new(
193 ErrorKind::ConfigInvalid,
194 "endpoint protocol is invalid",
195 )
196 .with_context("service", Scheme::Oss));
197 }
198 },
199 None => format!("https://{full_host}"),
200 };
201 (endpoint, full_host)
202 }
203 None => {
204 return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
205 .with_context("service", Scheme::Oss));
206 }
207 };
208 Ok((endpoint, host))
209 }
210
211 pub fn server_side_encryption(mut self, v: &str) -> Self {
228 if !v.is_empty() {
229 self.config.server_side_encryption = Some(v.to_string())
230 }
231 self
232 }
233
234 pub fn server_side_encryption_key_id(mut self, v: &str) -> Self {
240 if !v.is_empty() {
241 self.config.server_side_encryption_key_id = Some(v.to_string())
242 }
243 self
244 }
245
246 #[deprecated(
248 since = "0.52.0",
249 note = "Please use `delete_max_size` instead of `batch_max_operations`"
250 )]
251 pub fn batch_max_operations(mut self, delete_max_size: usize) -> Self {
252 self.config.delete_max_size = Some(delete_max_size);
253
254 self
255 }
256
257 pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
259 self.config.delete_max_size = Some(delete_max_size);
260
261 self
262 }
263
264 pub fn allow_anonymous(mut self) -> Self {
267 self.config.allow_anonymous = true;
268 self
269 }
270
271 pub fn role_arn(mut self, role_arn: &str) -> Self {
276 if !role_arn.is_empty() {
277 self.config.role_arn = Some(role_arn.to_string())
278 }
279
280 self
281 }
282
283 pub fn role_session_name(mut self, role_session_name: &str) -> Self {
285 if !role_session_name.is_empty() {
286 self.config.role_session_name = Some(role_session_name.to_string())
287 }
288
289 self
290 }
291
292 pub fn oidc_provider_arn(mut self, oidc_provider_arn: &str) -> Self {
294 if !oidc_provider_arn.is_empty() {
295 self.config.oidc_provider_arn = Some(oidc_provider_arn.to_string())
296 }
297
298 self
299 }
300
301 pub fn oidc_token_file(mut self, oidc_token_file: &str) -> Self {
303 if !oidc_token_file.is_empty() {
304 self.config.oidc_token_file = Some(oidc_token_file.to_string())
305 }
306
307 self
308 }
309
310 pub fn sts_endpoint(mut self, sts_endpoint: &str) -> Self {
312 if !sts_endpoint.is_empty() {
313 self.config.sts_endpoint = Some(sts_endpoint.to_string())
314 }
315
316 self
317 }
318}
319
320impl Builder for OssBuilder {
321 const SCHEME: Scheme = Scheme::Oss;
322 type Config = OssConfig;
323
324 fn build(self) -> Result<impl Access> {
325 debug!("backend build started: {:?}", &self);
326
327 let root = normalize_root(&self.config.root.clone().unwrap_or_default());
328 debug!("backend use root {}", &root);
329
330 let bucket = match self.config.bucket.is_empty() {
332 false => Ok(&self.config.bucket),
333 true => Err(
334 Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
335 .with_context("service", Scheme::Oss),
336 ),
337 }?;
338
339 let (endpoint, host) = self.parse_endpoint(&self.config.endpoint, bucket)?;
342 debug!("backend use bucket {}, endpoint: {}", &bucket, &endpoint);
343
344 let presign_endpoint = if self.config.presign_endpoint.is_some() {
345 self.parse_endpoint(&self.config.presign_endpoint, bucket)?
346 .0
347 } else {
348 endpoint.clone()
349 };
350 debug!("backend use presign_endpoint: {}", &presign_endpoint);
351
352 let server_side_encryption = match &self.config.server_side_encryption {
353 None => None,
354 Some(v) => Some(
355 build_header_value(v)
356 .map_err(|err| err.with_context("key", "server_side_encryption"))?,
357 ),
358 };
359
360 let server_side_encryption_key_id = match &self.config.server_side_encryption_key_id {
361 None => None,
362 Some(v) => Some(
363 build_header_value(v)
364 .map_err(|err| err.with_context("key", "server_side_encryption_key_id"))?,
365 ),
366 };
367
368 let mut cfg = AliyunConfig::default();
369 cfg = cfg.from_env();
371
372 if let Some(v) = self.config.access_key_id {
373 cfg.access_key_id = Some(v);
374 }
375
376 if let Some(v) = self.config.access_key_secret {
377 cfg.access_key_secret = Some(v);
378 }
379
380 if let Some(v) = self.config.role_arn {
381 cfg.role_arn = Some(v);
382 }
383
384 if let Some(v) = self.config.role_session_name {
386 cfg.role_session_name = v;
387 }
388
389 if let Some(v) = self.config.oidc_provider_arn {
390 cfg.oidc_provider_arn = Some(v);
391 }
392
393 if let Some(v) = self.config.oidc_token_file {
394 cfg.oidc_token_file = Some(v);
395 }
396
397 if let Some(v) = self.config.sts_endpoint {
398 cfg.sts_endpoint = Some(v);
399 }
400
401 let loader = AliyunLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);
402
403 let signer = AliyunOssSigner::new(bucket);
404
405 let delete_max_size = self
406 .config
407 .delete_max_size
408 .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
409
410 Ok(OssBackend {
411 core: Arc::new(OssCore {
412 info: {
413 let am = AccessorInfo::default();
414 am.set_scheme(Scheme::Oss)
415 .set_root(&root)
416 .set_name(bucket)
417 .set_native_capability(Capability {
418 stat: true,
419 stat_with_if_match: true,
420 stat_with_if_none_match: true,
421 stat_has_cache_control: true,
422 stat_has_content_length: true,
423 stat_has_content_type: true,
424 stat_has_content_encoding: true,
425 stat_has_content_range: true,
426 stat_with_version: self.config.enable_versioning,
427 stat_has_etag: true,
428 stat_has_content_md5: true,
429 stat_has_last_modified: true,
430 stat_has_content_disposition: true,
431 stat_has_user_metadata: true,
432 stat_has_version: true,
433
434 read: true,
435
436 read_with_if_match: true,
437 read_with_if_none_match: true,
438 read_with_version: self.config.enable_versioning,
439 read_with_if_modified_since: true,
440 read_with_if_unmodified_since: true,
441
442 write: true,
443 write_can_empty: true,
444 write_can_append: true,
445 write_can_multi: true,
446 write_with_cache_control: true,
447 write_with_content_type: true,
448 write_with_content_disposition: true,
449 write_with_if_not_exists: !self.config.enable_versioning,
451
452 write_multi_min_size: Some(100 * 1024),
456 write_multi_max_size: if cfg!(target_pointer_width = "64") {
460 Some(5 * 1024 * 1024 * 1024)
461 } else {
462 Some(usize::MAX)
463 },
464 write_with_user_metadata: true,
465
466 delete: true,
467 delete_with_version: self.config.enable_versioning,
468 delete_max_size: Some(delete_max_size),
469
470 copy: true,
471
472 list: true,
473 list_with_limit: true,
474 list_with_start_after: true,
475 list_with_recursive: true,
476 list_has_etag: true,
477 list_has_content_md5: true,
478 list_with_versions: self.config.enable_versioning,
479 list_with_deleted: self.config.enable_versioning,
480 list_has_content_length: true,
481 list_has_last_modified: true,
482
483 presign: true,
484 presign_stat: true,
485 presign_read: true,
486 presign_write: true,
487
488 shared: true,
489
490 ..Default::default()
491 });
492
493 #[allow(deprecated)]
495 if let Some(client) = self.http_client {
496 am.update_http_client(|_| client);
497 }
498
499 am.into()
500 },
501 root,
502 bucket: bucket.to_owned(),
503 endpoint,
504 host,
505 presign_endpoint,
506 allow_anonymous: self.config.allow_anonymous,
507 signer,
508 loader,
509 server_side_encryption,
510 server_side_encryption_key_id,
511 }),
512 })
513 }
514}
515
516#[derive(Debug, Clone)]
517pub struct OssBackend {
519 core: Arc<OssCore>,
520}
521
522impl Access for OssBackend {
523 type Reader = HttpBody;
524 type Writer = OssWriters;
525 type Lister = OssListers;
526 type Deleter = oio::BatchDeleter<OssDeleter>;
527
528 fn info(&self) -> Arc<AccessorInfo> {
529 self.core.info.clone()
530 }
531
532 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
533 let resp = self.core.oss_head_object(path, &args).await?;
534
535 let status = resp.status();
536
537 match status {
538 StatusCode::OK => {
539 let headers = resp.headers();
540 let mut meta = self.core.parse_metadata(path, resp.headers())?;
541
542 if let Some(v) = parse_header_to_str(headers, constants::X_OSS_VERSION_ID)? {
543 meta.set_version(v);
544 }
545
546 Ok(RpStat::new(meta))
547 }
548 _ => Err(parse_error(resp)),
549 }
550 }
551
552 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
553 let resp = self.core.oss_get_object(path, &args).await?;
554
555 let status = resp.status();
556
557 match status {
558 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
559 Ok((RpRead::default(), resp.into_body()))
560 }
561 _ => {
562 let (part, mut body) = resp.into_parts();
563 let buf = body.to_buffer().await?;
564 Err(parse_error(Response::from_parts(part, buf)))
565 }
566 }
567 }
568
569 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
570 let writer = OssWriter::new(self.core.clone(), path, args.clone());
571
572 let w = if args.append() {
573 OssWriters::Two(oio::AppendWriter::new(writer))
574 } else {
575 OssWriters::One(oio::MultipartWriter::new(
576 self.core.info.clone(),
577 writer,
578 args.concurrent(),
579 ))
580 };
581
582 Ok((RpWrite::default(), w))
583 }
584
585 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
586 Ok((
587 RpDelete::default(),
588 oio::BatchDeleter::new(OssDeleter::new(self.core.clone())),
589 ))
590 }
591
592 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
593 let l = if args.versions() || args.deleted() {
594 TwoWays::Two(oio::PageLister::new(OssObjectVersionsLister::new(
595 self.core.clone(),
596 path,
597 args,
598 )))
599 } else {
600 TwoWays::One(oio::PageLister::new(OssLister::new(
601 self.core.clone(),
602 path,
603 args.recursive(),
604 args.limit(),
605 args.start_after(),
606 )))
607 };
608
609 Ok((RpList::default(), l))
610 }
611
612 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
613 let resp = self.core.oss_copy_object(from, to).await?;
614 let status = resp.status();
615
616 match status {
617 StatusCode::OK => Ok(RpCopy::default()),
618 _ => Err(parse_error(resp)),
619 }
620 }
621
622 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
623 let req = match args.operation() {
625 PresignOperation::Stat(v) => self.core.oss_head_object_request(path, true, v),
626 PresignOperation::Read(v) => self.core.oss_get_object_request(path, true, v),
627 PresignOperation::Write(v) => {
628 self.core
629 .oss_put_object_request(path, None, v, Buffer::new(), true)
630 }
631 PresignOperation::Delete(_) => Err(Error::new(
632 ErrorKind::Unsupported,
633 "operation is not supported",
634 )),
635 };
636 let mut req = req?;
637
638 self.core.sign_query(&mut req, args.expire()).await?;
639
640 let (parts, _) = req.into_parts();
642
643 Ok(RpPresign::new(PresignedRequest::new(
644 parts.method,
645 parts.uri,
646 parts.headers,
647 )))
648 }
649}