1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use http::Uri;
25use log::debug;
26use reqsign::AliyunConfig;
27use reqsign::AliyunLoader;
28use reqsign::AliyunOssSigner;
29
30use super::core::*;
31use super::delete::OssDeleter;
32use super::error::parse_error;
33use super::lister::OssLister;
34use super::lister::OssListers;
35use super::lister::OssObjectVersionsLister;
36use super::writer::OssWriter;
37use super::writer::OssWriters;
38use super::DEFAULT_SCHEME;
39use crate::raw::*;
40use crate::services::OssConfig;
41use crate::*;
/// Fallback value for the `delete_max_size` capability.
///
/// `OssBuilder::build` uses it whenever `OssConfig::delete_max_size` is `None`.
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1_000;
43
44impl Configurator for OssConfig {
45 type Builder = OssBuilder;
46
47 #[allow(deprecated)]
48 fn into_builder(self) -> Self::Builder {
49 OssBuilder {
50 config: self,
51
52 http_client: None,
53 }
54 }
55}
56
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct OssBuilder {
    /// Service settings collected by the builder methods and consumed by `build`.
    config: OssConfig,

    /// Optional caller-supplied HTTP client; when set, `build` installs it on
    /// the accessor info.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
66
67impl Debug for OssBuilder {
68 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69 let mut d = f.debug_struct("OssBuilder");
70
71 d.field("config", &self.config);
72 d.finish_non_exhaustive()
73 }
74}
75
76impl OssBuilder {
77 pub fn root(mut self, root: &str) -> Self {
81 self.config.root = if root.is_empty() {
82 None
83 } else {
84 Some(root.to_string())
85 };
86
87 self
88 }
89
90 pub fn bucket(mut self, bucket: &str) -> Self {
92 self.config.bucket = bucket.to_string();
93
94 self
95 }
96
97 pub fn endpoint(mut self, endpoint: &str) -> Self {
99 if !endpoint.is_empty() {
100 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
102 }
103
104 self
105 }
106
107 pub fn enable_versioning(mut self, enabled: bool) -> Self {
109 self.config.enable_versioning = enabled;
110
111 self
112 }
113
114 pub fn presign_endpoint(mut self, endpoint: &str) -> Self {
123 if !endpoint.is_empty() {
124 self.config.presign_endpoint = Some(endpoint.trim_end_matches('/').to_string())
126 }
127
128 self
129 }
130
131 pub fn access_key_id(mut self, v: &str) -> Self {
136 if !v.is_empty() {
137 self.config.access_key_id = Some(v.to_string())
138 }
139
140 self
141 }
142
143 pub fn access_key_secret(mut self, v: &str) -> Self {
148 if !v.is_empty() {
149 self.config.access_key_secret = Some(v.to_string())
150 }
151
152 self
153 }
154
155 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
162 #[allow(deprecated)]
163 pub fn http_client(mut self, client: HttpClient) -> Self {
164 self.http_client = Some(client);
165 self
166 }
167
168 fn parse_endpoint(&self, endpoint: &Option<String>, bucket: &str) -> Result<(String, String)> {
170 let (endpoint, host) = match endpoint.clone() {
171 Some(ep) => {
172 let uri = ep.parse::<Uri>().map_err(|err| {
173 Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
174 .with_context("service", Scheme::Oss)
175 .with_context("endpoint", &ep)
176 .set_source(err)
177 })?;
178 let host = uri.host().ok_or_else(|| {
179 Error::new(ErrorKind::ConfigInvalid, "endpoint host is empty")
180 .with_context("service", Scheme::Oss)
181 .with_context("endpoint", &ep)
182 })?;
183 let full_host = if let Some(port) = uri.port_u16() {
184 format!("{bucket}.{host}:{port}")
185 } else {
186 format!("{bucket}.{host}")
187 };
188 let endpoint = match uri.scheme_str() {
189 Some(scheme_str) => match scheme_str {
190 "http" | "https" => format!("{scheme_str}://{full_host}"),
191 _ => {
192 return Err(Error::new(
193 ErrorKind::ConfigInvalid,
194 "endpoint protocol is invalid",
195 )
196 .with_context("service", Scheme::Oss));
197 }
198 },
199 None => format!("https://{full_host}"),
200 };
201 (endpoint, full_host)
202 }
203 None => {
204 return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
205 .with_context("service", Scheme::Oss));
206 }
207 };
208 Ok((endpoint, host))
209 }
210
211 pub fn server_side_encryption(mut self, v: &str) -> Self {
228 if !v.is_empty() {
229 self.config.server_side_encryption = Some(v.to_string())
230 }
231 self
232 }
233
234 pub fn server_side_encryption_key_id(mut self, v: &str) -> Self {
240 if !v.is_empty() {
241 self.config.server_side_encryption_key_id = Some(v.to_string())
242 }
243 self
244 }
245
246 #[deprecated(
248 since = "0.52.0",
249 note = "Please use `delete_max_size` instead of `batch_max_operations`"
250 )]
251 pub fn batch_max_operations(mut self, delete_max_size: usize) -> Self {
252 self.config.delete_max_size = Some(delete_max_size);
253
254 self
255 }
256
257 pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
259 self.config.delete_max_size = Some(delete_max_size);
260
261 self
262 }
263
264 pub fn allow_anonymous(mut self) -> Self {
267 self.config.allow_anonymous = true;
268 self
269 }
270
271 pub fn role_arn(mut self, role_arn: &str) -> Self {
276 if !role_arn.is_empty() {
277 self.config.role_arn = Some(role_arn.to_string())
278 }
279
280 self
281 }
282
283 pub fn role_session_name(mut self, role_session_name: &str) -> Self {
285 if !role_session_name.is_empty() {
286 self.config.role_session_name = Some(role_session_name.to_string())
287 }
288
289 self
290 }
291
292 pub fn oidc_provider_arn(mut self, oidc_provider_arn: &str) -> Self {
294 if !oidc_provider_arn.is_empty() {
295 self.config.oidc_provider_arn = Some(oidc_provider_arn.to_string())
296 }
297
298 self
299 }
300
301 pub fn oidc_token_file(mut self, oidc_token_file: &str) -> Self {
303 if !oidc_token_file.is_empty() {
304 self.config.oidc_token_file = Some(oidc_token_file.to_string())
305 }
306
307 self
308 }
309
310 pub fn sts_endpoint(mut self, sts_endpoint: &str) -> Self {
312 if !sts_endpoint.is_empty() {
313 self.config.sts_endpoint = Some(sts_endpoint.to_string())
314 }
315
316 self
317 }
318}
319
320impl Builder for OssBuilder {
321 type Config = OssConfig;
322
323 fn build(self) -> Result<impl Access> {
324 debug!("backend build started: {:?}", &self);
325
326 let root = normalize_root(&self.config.root.clone().unwrap_or_default());
327 debug!("backend use root {}", &root);
328
329 let bucket = match self.config.bucket.is_empty() {
331 false => Ok(&self.config.bucket),
332 true => Err(
333 Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
334 .with_context("service", Scheme::Oss),
335 ),
336 }?;
337
338 let (endpoint, host) = self.parse_endpoint(&self.config.endpoint, bucket)?;
341 debug!("backend use bucket {}, endpoint: {}", &bucket, &endpoint);
342
343 let presign_endpoint = if self.config.presign_endpoint.is_some() {
344 self.parse_endpoint(&self.config.presign_endpoint, bucket)?
345 .0
346 } else {
347 endpoint.clone()
348 };
349 debug!("backend use presign_endpoint: {}", &presign_endpoint);
350
351 let server_side_encryption = match &self.config.server_side_encryption {
352 None => None,
353 Some(v) => Some(
354 build_header_value(v)
355 .map_err(|err| err.with_context("key", "server_side_encryption"))?,
356 ),
357 };
358
359 let server_side_encryption_key_id = match &self.config.server_side_encryption_key_id {
360 None => None,
361 Some(v) => Some(
362 build_header_value(v)
363 .map_err(|err| err.with_context("key", "server_side_encryption_key_id"))?,
364 ),
365 };
366
367 let mut cfg = AliyunConfig::default();
368 cfg = cfg.from_env();
370
371 if let Some(v) = self.config.access_key_id {
372 cfg.access_key_id = Some(v);
373 }
374
375 if let Some(v) = self.config.access_key_secret {
376 cfg.access_key_secret = Some(v);
377 }
378
379 if let Some(v) = self.config.role_arn {
380 cfg.role_arn = Some(v);
381 }
382
383 if let Some(v) = self.config.role_session_name {
385 cfg.role_session_name = v;
386 }
387
388 if let Some(v) = self.config.oidc_provider_arn {
389 cfg.oidc_provider_arn = Some(v);
390 }
391
392 if let Some(v) = self.config.oidc_token_file {
393 cfg.oidc_token_file = Some(v);
394 }
395
396 if let Some(v) = self.config.sts_endpoint {
397 cfg.sts_endpoint = Some(v);
398 }
399
400 let loader = AliyunLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);
401
402 let signer = AliyunOssSigner::new(bucket);
403
404 let delete_max_size = self
405 .config
406 .delete_max_size
407 .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
408
409 Ok(OssBackend {
410 core: Arc::new(OssCore {
411 info: {
412 let am = AccessorInfo::default();
413 am.set_scheme(DEFAULT_SCHEME)
414 .set_root(&root)
415 .set_name(bucket)
416 .set_native_capability(Capability {
417 stat: true,
418 stat_with_if_match: true,
419 stat_with_if_none_match: true,
420 stat_with_version: self.config.enable_versioning,
421
422 read: true,
423
424 read_with_if_match: true,
425 read_with_if_none_match: true,
426 read_with_version: self.config.enable_versioning,
427 read_with_if_modified_since: true,
428 read_with_if_unmodified_since: true,
429
430 write: true,
431 write_can_empty: true,
432 write_can_append: true,
433 write_can_multi: true,
434 write_with_cache_control: true,
435 write_with_content_type: true,
436 write_with_content_disposition: true,
437 write_with_if_not_exists: !self.config.enable_versioning,
439
440 write_multi_min_size: Some(100 * 1024),
444 write_multi_max_size: if cfg!(target_pointer_width = "64") {
448 Some(5 * 1024 * 1024 * 1024)
449 } else {
450 Some(usize::MAX)
451 },
452 write_with_user_metadata: true,
453
454 delete: true,
455 delete_with_version: self.config.enable_versioning,
456 delete_max_size: Some(delete_max_size),
457
458 copy: true,
459
460 list: true,
461 list_with_limit: true,
462 list_with_start_after: true,
463 list_with_recursive: true,
464 list_with_versions: self.config.enable_versioning,
465 list_with_deleted: self.config.enable_versioning,
466
467 presign: true,
468 presign_stat: true,
469 presign_read: true,
470 presign_write: true,
471
472 shared: true,
473
474 ..Default::default()
475 });
476
477 #[allow(deprecated)]
479 if let Some(client) = self.http_client {
480 am.update_http_client(|_| client);
481 }
482
483 am.into()
484 },
485 root,
486 bucket: bucket.to_owned(),
487 endpoint,
488 host,
489 presign_endpoint,
490 allow_anonymous: self.config.allow_anonymous,
491 signer,
492 loader,
493 server_side_encryption,
494 server_side_encryption_key_id,
495 }),
496 })
497 }
498}
499
/// Backend handle for the OSS service.
///
/// Holds the service core behind an `Arc`, so cloning the backend shares the
/// same core rather than copying it.
#[derive(Debug, Clone)]
pub struct OssBackend {
    /// Shared state used by every operation of this backend.
    core: Arc<OssCore>,
}
505
506impl Access for OssBackend {
507 type Reader = HttpBody;
508 type Writer = OssWriters;
509 type Lister = OssListers;
510 type Deleter = oio::BatchDeleter<OssDeleter>;
511
512 fn info(&self) -> Arc<AccessorInfo> {
513 self.core.info.clone()
514 }
515
516 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
517 let resp = self.core.oss_head_object(path, &args).await?;
518
519 let status = resp.status();
520
521 match status {
522 StatusCode::OK => {
523 let headers = resp.headers();
524 let mut meta = self.core.parse_metadata(path, resp.headers())?;
525
526 if let Some(v) = parse_header_to_str(headers, constants::X_OSS_VERSION_ID)? {
527 meta.set_version(v);
528 }
529
530 Ok(RpStat::new(meta))
531 }
532 _ => Err(parse_error(resp)),
533 }
534 }
535
536 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
537 let resp = self.core.oss_get_object(path, &args).await?;
538
539 let status = resp.status();
540
541 match status {
542 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
543 Ok((RpRead::default(), resp.into_body()))
544 }
545 _ => {
546 let (part, mut body) = resp.into_parts();
547 let buf = body.to_buffer().await?;
548 Err(parse_error(Response::from_parts(part, buf)))
549 }
550 }
551 }
552
553 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
554 let writer = OssWriter::new(self.core.clone(), path, args.clone());
555
556 let w = if args.append() {
557 OssWriters::Two(oio::AppendWriter::new(writer))
558 } else {
559 OssWriters::One(oio::MultipartWriter::new(
560 self.core.info.clone(),
561 writer,
562 args.concurrent(),
563 ))
564 };
565
566 Ok((RpWrite::default(), w))
567 }
568
569 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
570 Ok((
571 RpDelete::default(),
572 oio::BatchDeleter::new(OssDeleter::new(self.core.clone())),
573 ))
574 }
575
576 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
577 let l = if args.versions() || args.deleted() {
578 TwoWays::Two(oio::PageLister::new(OssObjectVersionsLister::new(
579 self.core.clone(),
580 path,
581 args,
582 )))
583 } else {
584 TwoWays::One(oio::PageLister::new(OssLister::new(
585 self.core.clone(),
586 path,
587 args.recursive(),
588 args.limit(),
589 args.start_after(),
590 )))
591 };
592
593 Ok((RpList::default(), l))
594 }
595
596 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
597 let resp = self.core.oss_copy_object(from, to).await?;
598 let status = resp.status();
599
600 match status {
601 StatusCode::OK => Ok(RpCopy::default()),
602 _ => Err(parse_error(resp)),
603 }
604 }
605
606 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
607 let req = match args.operation() {
609 PresignOperation::Stat(v) => self.core.oss_head_object_request(path, true, v),
610 PresignOperation::Read(v) => self.core.oss_get_object_request(path, true, v),
611 PresignOperation::Write(v) => {
612 self.core
613 .oss_put_object_request(path, None, v, Buffer::new(), true)
614 }
615 PresignOperation::Delete(_) => Err(Error::new(
616 ErrorKind::Unsupported,
617 "operation is not supported",
618 )),
619 };
620 let mut req = req?;
621
622 self.core.sign_query(&mut req, args.expire()).await?;
623
624 let (parts, _) = req.into_parts();
626
627 Ok(RpPresign::new(PresignedRequest::new(
628 parts.method,
629 parts.uri,
630 parts.headers,
631 )))
632 }
633}