1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use http::Uri;
25use log::debug;
26use reqsign::AliyunConfig;
27use reqsign::AliyunLoader;
28use reqsign::AliyunOssSigner;
29
30use super::core::*;
31use super::delete::OssDeleter;
32use super::error::parse_error;
33use super::lister::{OssLister, OssListers, OssObjectVersionsLister};
34use super::writer::OssWriter;
35use super::writer::OssWriters;
36use crate::raw::*;
37use crate::services::OssConfig;
38use crate::*;
39
/// Fallback for the per-request delete batch size: `build` uses this value
/// when `OssConfig::delete_max_size` has not been set.
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
41
42impl Configurator for OssConfig {
43 type Builder = OssBuilder;
44
45 #[allow(deprecated)]
46 fn into_builder(self) -> Self::Builder {
47 OssBuilder {
48 config: self,
49
50 http_client: None,
51 }
52 }
53}
54
/// Builder for the Aliyun OSS backend.
///
/// Collects settings into an [`OssConfig`] through chained setter calls and
/// turns them into a backend in `Builder::build`.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct OssBuilder {
    /// Settings accumulated by the builder methods.
    config: OssConfig,

    /// Optional user-supplied HTTP client; when present, `build` installs it
    /// on the accessor info. Deprecated in favour of
    /// `Operator::update_http_client`.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
64
65impl Debug for OssBuilder {
66 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67 let mut d = f.debug_struct("OssBuilder");
68
69 d.field("config", &self.config);
70 d.finish_non_exhaustive()
71 }
72}
73
74impl OssBuilder {
75 pub fn root(mut self, root: &str) -> Self {
79 self.config.root = if root.is_empty() {
80 None
81 } else {
82 Some(root.to_string())
83 };
84
85 self
86 }
87
88 pub fn bucket(mut self, bucket: &str) -> Self {
90 self.config.bucket = bucket.to_string();
91
92 self
93 }
94
95 pub fn endpoint(mut self, endpoint: &str) -> Self {
97 if !endpoint.is_empty() {
98 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
100 }
101
102 self
103 }
104
105 pub fn enable_versioning(mut self, enabled: bool) -> Self {
107 self.config.enable_versioning = enabled;
108
109 self
110 }
111
112 pub fn presign_endpoint(mut self, endpoint: &str) -> Self {
121 if !endpoint.is_empty() {
122 self.config.presign_endpoint = Some(endpoint.trim_end_matches('/').to_string())
124 }
125
126 self
127 }
128
129 pub fn access_key_id(mut self, v: &str) -> Self {
134 if !v.is_empty() {
135 self.config.access_key_id = Some(v.to_string())
136 }
137
138 self
139 }
140
141 pub fn access_key_secret(mut self, v: &str) -> Self {
146 if !v.is_empty() {
147 self.config.access_key_secret = Some(v.to_string())
148 }
149
150 self
151 }
152
153 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
160 #[allow(deprecated)]
161 pub fn http_client(mut self, client: HttpClient) -> Self {
162 self.http_client = Some(client);
163 self
164 }
165
166 fn parse_endpoint(&self, endpoint: &Option<String>, bucket: &str) -> Result<(String, String)> {
168 let (endpoint, host) = match endpoint.clone() {
169 Some(ep) => {
170 let uri = ep.parse::<Uri>().map_err(|err| {
171 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
172 .with_context("service", Scheme::Oss)
173 .with_context("endpoint", &ep)
174 .set_source(err)
175 })?;
176 let host = uri.host().ok_or_else(|| {
177 Error::new(ErrorKind::ConfigInvalid, "endpoint host is empty")
178 .with_context("service", Scheme::Oss)
179 .with_context("endpoint", &ep)
180 })?;
181 let full_host = if let Some(port) = uri.port_u16() {
182 format!("{bucket}.{host}:{port}")
183 } else {
184 format!("{bucket}.{host}")
185 };
186 let endpoint = match uri.scheme_str() {
187 Some(scheme_str) => match scheme_str {
188 "http" | "https" => format!("{scheme_str}://{full_host}"),
189 _ => {
190 return Err(Error::new(
191 ErrorKind::ConfigInvalid,
192 "endpoint protocol is invalid",
193 )
194 .with_context("service", Scheme::Oss));
195 }
196 },
197 None => format!("https://{full_host}"),
198 };
199 (endpoint, full_host)
200 }
201 None => {
202 return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
203 .with_context("service", Scheme::Oss));
204 }
205 };
206 Ok((endpoint, host))
207 }
208
209 pub fn server_side_encryption(mut self, v: &str) -> Self {
226 if !v.is_empty() {
227 self.config.server_side_encryption = Some(v.to_string())
228 }
229 self
230 }
231
232 pub fn server_side_encryption_key_id(mut self, v: &str) -> Self {
238 if !v.is_empty() {
239 self.config.server_side_encryption_key_id = Some(v.to_string())
240 }
241 self
242 }
243
244 #[deprecated(
246 since = "0.52.0",
247 note = "Please use `delete_max_size` instead of `batch_max_operations`"
248 )]
249 pub fn batch_max_operations(mut self, delete_max_size: usize) -> Self {
250 self.config.delete_max_size = Some(delete_max_size);
251
252 self
253 }
254
255 pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
257 self.config.delete_max_size = Some(delete_max_size);
258
259 self
260 }
261
262 pub fn allow_anonymous(mut self) -> Self {
265 self.config.allow_anonymous = true;
266 self
267 }
268
269 pub fn role_arn(mut self, role_arn: &str) -> Self {
274 if !role_arn.is_empty() {
275 self.config.role_arn = Some(role_arn.to_string())
276 }
277
278 self
279 }
280
281 pub fn role_session_name(mut self, role_session_name: &str) -> Self {
283 if !role_session_name.is_empty() {
284 self.config.role_session_name = Some(role_session_name.to_string())
285 }
286
287 self
288 }
289
290 pub fn oidc_provider_arn(mut self, oidc_provider_arn: &str) -> Self {
292 if !oidc_provider_arn.is_empty() {
293 self.config.oidc_provider_arn = Some(oidc_provider_arn.to_string())
294 }
295
296 self
297 }
298
299 pub fn oidc_token_file(mut self, oidc_token_file: &str) -> Self {
301 if !oidc_token_file.is_empty() {
302 self.config.oidc_token_file = Some(oidc_token_file.to_string())
303 }
304
305 self
306 }
307
308 pub fn sts_endpoint(mut self, sts_endpoint: &str) -> Self {
310 if !sts_endpoint.is_empty() {
311 self.config.sts_endpoint = Some(sts_endpoint.to_string())
312 }
313
314 self
315 }
316}
317
318impl Builder for OssBuilder {
319 const SCHEME: Scheme = Scheme::Oss;
320 type Config = OssConfig;
321
322 fn build(self) -> Result<impl Access> {
323 debug!("backend build started: {:?}", &self);
324
325 let root = normalize_root(&self.config.root.clone().unwrap_or_default());
326 debug!("backend use root {}", &root);
327
328 let bucket = match self.config.bucket.is_empty() {
330 false => Ok(&self.config.bucket),
331 true => Err(
332 Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
333 .with_context("service", Scheme::Oss),
334 ),
335 }?;
336
337 let (endpoint, host) = self.parse_endpoint(&self.config.endpoint, bucket)?;
340 debug!("backend use bucket {}, endpoint: {}", &bucket, &endpoint);
341
342 let presign_endpoint = if self.config.presign_endpoint.is_some() {
343 self.parse_endpoint(&self.config.presign_endpoint, bucket)?
344 .0
345 } else {
346 endpoint.clone()
347 };
348 debug!("backend use presign_endpoint: {}", &presign_endpoint);
349
350 let server_side_encryption = match &self.config.server_side_encryption {
351 None => None,
352 Some(v) => Some(
353 build_header_value(v)
354 .map_err(|err| err.with_context("key", "server_side_encryption"))?,
355 ),
356 };
357
358 let server_side_encryption_key_id = match &self.config.server_side_encryption_key_id {
359 None => None,
360 Some(v) => Some(
361 build_header_value(v)
362 .map_err(|err| err.with_context("key", "server_side_encryption_key_id"))?,
363 ),
364 };
365
366 let mut cfg = AliyunConfig::default();
367 cfg = cfg.from_env();
369
370 if let Some(v) = self.config.access_key_id {
371 cfg.access_key_id = Some(v);
372 }
373
374 if let Some(v) = self.config.access_key_secret {
375 cfg.access_key_secret = Some(v);
376 }
377
378 if let Some(v) = self.config.role_arn {
379 cfg.role_arn = Some(v);
380 }
381
382 if let Some(v) = self.config.role_session_name {
384 cfg.role_session_name = v;
385 }
386
387 if let Some(v) = self.config.oidc_provider_arn {
388 cfg.oidc_provider_arn = Some(v);
389 }
390
391 if let Some(v) = self.config.oidc_token_file {
392 cfg.oidc_token_file = Some(v);
393 }
394
395 if let Some(v) = self.config.sts_endpoint {
396 cfg.sts_endpoint = Some(v);
397 }
398
399 let loader = AliyunLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);
400
401 let signer = AliyunOssSigner::new(bucket);
402
403 let delete_max_size = self
404 .config
405 .delete_max_size
406 .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
407
408 Ok(OssBackend {
409 core: Arc::new(OssCore {
410 info: {
411 let am = AccessorInfo::default();
412 am.set_scheme(Scheme::Oss)
413 .set_root(&root)
414 .set_name(bucket)
415 .set_native_capability(Capability {
416 stat: true,
417 stat_with_if_match: true,
418 stat_with_if_none_match: true,
419 stat_has_cache_control: true,
420 stat_has_content_length: true,
421 stat_has_content_type: true,
422 stat_has_content_encoding: true,
423 stat_has_content_range: true,
424 stat_with_version: self.config.enable_versioning,
425 stat_has_etag: true,
426 stat_has_content_md5: true,
427 stat_has_last_modified: true,
428 stat_has_content_disposition: true,
429 stat_has_user_metadata: true,
430 stat_has_version: true,
431
432 read: true,
433
434 read_with_if_match: true,
435 read_with_if_none_match: true,
436 read_with_version: self.config.enable_versioning,
437 read_with_if_modified_since: true,
438 read_with_if_unmodified_since: true,
439
440 write: true,
441 write_can_empty: true,
442 write_can_append: true,
443 write_can_multi: true,
444 write_with_cache_control: true,
445 write_with_content_type: true,
446 write_with_content_disposition: true,
447 write_with_if_not_exists: !self.config.enable_versioning,
449
450 write_multi_min_size: Some(100 * 1024),
454 write_multi_max_size: if cfg!(target_pointer_width = "64") {
458 Some(5 * 1024 * 1024 * 1024)
459 } else {
460 Some(usize::MAX)
461 },
462 write_with_user_metadata: true,
463
464 delete: true,
465 delete_with_version: self.config.enable_versioning,
466 delete_max_size: Some(delete_max_size),
467
468 copy: true,
469
470 list: true,
471 list_with_limit: true,
472 list_with_start_after: true,
473 list_with_recursive: true,
474 list_has_etag: true,
475 list_has_content_md5: true,
476 list_with_versions: self.config.enable_versioning,
477 list_with_deleted: self.config.enable_versioning,
478 list_has_content_length: true,
479 list_has_last_modified: true,
480
481 presign: true,
482 presign_stat: true,
483 presign_read: true,
484 presign_write: true,
485
486 shared: true,
487
488 ..Default::default()
489 });
490
491 #[allow(deprecated)]
493 if let Some(client) = self.http_client {
494 am.update_http_client(|_| client);
495 }
496
497 am.into()
498 },
499 root,
500 bucket: bucket.to_owned(),
501 endpoint,
502 host,
503 presign_endpoint,
504 allow_anonymous: self.config.allow_anonymous,
505 signer,
506 loader,
507 server_side_encryption,
508 server_side_encryption_key_id,
509 }),
510 })
511 }
512}
513
/// Backend for Aliyun OSS, produced by `OssBuilder::build`.
///
/// All state is held in a shared [`OssCore`], so cloning the backend only
/// clones the `Arc`.
#[derive(Debug, Clone)]
pub struct OssBackend {
    /// Shared core holding the endpoint, signer, loader and accessor info.
    core: Arc<OssCore>,
}
519
520impl Access for OssBackend {
521 type Reader = HttpBody;
522 type Writer = OssWriters;
523 type Lister = OssListers;
524 type Deleter = oio::BatchDeleter<OssDeleter>;
525 type BlockingReader = ();
526 type BlockingWriter = ();
527 type BlockingLister = ();
528 type BlockingDeleter = ();
529
530 fn info(&self) -> Arc<AccessorInfo> {
531 self.core.info.clone()
532 }
533
534 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
535 let resp = self.core.oss_head_object(path, &args).await?;
536
537 let status = resp.status();
538
539 match status {
540 StatusCode::OK => {
541 let headers = resp.headers();
542 let mut meta = self.core.parse_metadata(path, resp.headers())?;
543
544 if let Some(v) = parse_header_to_str(headers, constants::X_OSS_VERSION_ID)? {
545 meta.set_version(v);
546 }
547
548 Ok(RpStat::new(meta))
549 }
550 _ => Err(parse_error(resp)),
551 }
552 }
553
554 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
555 let resp = self.core.oss_get_object(path, &args).await?;
556
557 let status = resp.status();
558
559 match status {
560 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
561 Ok((RpRead::default(), resp.into_body()))
562 }
563 _ => {
564 let (part, mut body) = resp.into_parts();
565 let buf = body.to_buffer().await?;
566 Err(parse_error(Response::from_parts(part, buf)))
567 }
568 }
569 }
570
571 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
572 let writer = OssWriter::new(self.core.clone(), path, args.clone());
573
574 let w = if args.append() {
575 OssWriters::Two(oio::AppendWriter::new(writer))
576 } else {
577 OssWriters::One(oio::MultipartWriter::new(
578 self.core.info.clone(),
579 writer,
580 args.concurrent(),
581 ))
582 };
583
584 Ok((RpWrite::default(), w))
585 }
586
587 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
588 Ok((
589 RpDelete::default(),
590 oio::BatchDeleter::new(OssDeleter::new(self.core.clone())),
591 ))
592 }
593
594 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
595 let l = if args.versions() || args.deleted() {
596 TwoWays::Two(oio::PageLister::new(OssObjectVersionsLister::new(
597 self.core.clone(),
598 path,
599 args,
600 )))
601 } else {
602 TwoWays::One(oio::PageLister::new(OssLister::new(
603 self.core.clone(),
604 path,
605 args.recursive(),
606 args.limit(),
607 args.start_after(),
608 )))
609 };
610
611 Ok((RpList::default(), l))
612 }
613
614 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
615 let resp = self.core.oss_copy_object(from, to).await?;
616 let status = resp.status();
617
618 match status {
619 StatusCode::OK => Ok(RpCopy::default()),
620 _ => Err(parse_error(resp)),
621 }
622 }
623
624 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
625 let req = match args.operation() {
627 PresignOperation::Stat(v) => self.core.oss_head_object_request(path, true, v),
628 PresignOperation::Read(v) => self.core.oss_get_object_request(path, true, v),
629 PresignOperation::Write(v) => {
630 self.core
631 .oss_put_object_request(path, None, v, Buffer::new(), true)
632 }
633 PresignOperation::Delete(_) => Err(Error::new(
634 ErrorKind::Unsupported,
635 "operation is not supported",
636 )),
637 };
638 let mut req = req?;
639
640 self.core.sign_query(&mut req, args.expire()).await?;
641
642 let (parts, _) = req.into_parts();
644
645 Ok(RpPresign::new(PresignedRequest::new(
646 parts.method,
647 parts.uri,
648 parts.headers,
649 )))
650 }
651}