opendal/services/s3/
delete.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22
23use super::core::*;
24use super::error::parse_error;
25use super::error::parse_s3_error_code;
26use crate::raw::oio::BatchDeleteResult;
27use crate::raw::*;
28use crate::*;
29
30pub struct S3Deleter {
31    core: Arc<S3Core>,
32}
33
34impl S3Deleter {
35    pub fn new(core: Arc<S3Core>) -> Self {
36        Self { core }
37    }
38}
39
40impl oio::BatchDelete for S3Deleter {
41    async fn delete_once(&self, path: String, args: OpDelete) -> Result<()> {
42        // This would delete the bucket, do not perform
43        if self.core.root == "/" && path == "/" {
44            return Ok(());
45        }
46
47        let resp = self.core.s3_delete_object(&path, &args).await?;
48
49        let status = resp.status();
50
51        match status {
52            StatusCode::NO_CONTENT => Ok(()),
53            // Allow 404 when deleting a non-existing object
54            // This is not a standard behavior, only some s3 alike service like GCS XML API do this.
55            // ref: <https://cloud.google.com/storage/docs/xml-api/delete-object>
56            StatusCode::NOT_FOUND => Ok(()),
57            _ => Err(parse_error(resp)),
58        }
59    }
60
61    async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
62        let resp = self.core.s3_delete_objects(batch).await?;
63
64        let status = resp.status();
65        if status != StatusCode::OK {
66            return Err(parse_error(resp));
67        }
68
69        let bs = resp.into_body();
70
71        let result: DeleteObjectsResult =
72            quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
73
74        let mut batched_result = BatchDeleteResult {
75            succeeded: Vec::with_capacity(result.deleted.len()),
76            failed: Vec::with_capacity(result.error.len()),
77        };
78        for i in result.deleted {
79            let path = build_rel_path(&self.core.root, &i.key);
80            let mut op = OpDelete::new();
81            if let Some(version_id) = i.version_id {
82                op = op.with_version(version_id.as_str());
83            }
84            batched_result.succeeded.push((path, op));
85        }
86        for i in result.error {
87            let path = build_rel_path(&self.core.root, &i.key);
88            let mut op = OpDelete::new();
89            if let Some(version_id) = &i.version_id {
90                op = op.with_version(version_id.as_str());
91            }
92            batched_result
93                .failed
94                .push((path, op, parse_delete_objects_result_error(i)));
95        }
96
97        Ok(batched_result)
98    }
99}
100
101fn parse_delete_objects_result_error(err: DeleteObjectsResultError) -> Error {
102    let (kind, retryable) =
103        parse_s3_error_code(err.code.as_str()).unwrap_or((ErrorKind::Unexpected, false));
104    let mut err: Error = Error::new(kind, format!("{err:?}"));
105    if retryable {
106        err = err.set_temporary();
107    }
108    err
109}