opendal/services/gcs/
delete.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::core::*;
19use super::error::parse_error;
20use crate::raw::oio::BatchDeleteResult;
21use crate::raw::*;
22use crate::*;
23use http::StatusCode;
24use std::sync::Arc;
25
26pub struct GcsDeleter {
27    core: Arc<GcsCore>,
28}
29
30impl GcsDeleter {
31    pub fn new(core: Arc<GcsCore>) -> Self {
32        Self { core }
33    }
34}
35
36impl oio::BatchDelete for GcsDeleter {
37    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
38        let resp = self.core.gcs_delete_object(&path).await?;
39
40        // deleting not existing objects is ok
41        if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
42            Ok(())
43        } else {
44            Err(parse_error(resp))
45        }
46    }
47
48    async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
49        let paths: Vec<String> = batch.into_iter().map(|(p, _)| p).collect();
50        let resp = self.core.gcs_delete_objects(paths.clone()).await?;
51
52        let status = resp.status();
53
54        // If the overall request isn't formatted correctly and Cloud Storage is unable to parse it into sub-requests, you receive a 400 error.
55        // Otherwise, Cloud Storage returns a 200 status code, even if some or all of the sub-requests fail.
56        if status != StatusCode::OK {
57            return Err(parse_error(resp));
58        }
59
60        let boundary = parse_multipart_boundary(resp.headers())?.ok_or_else(|| {
61            Error::new(
62                ErrorKind::Unexpected,
63                "gcs batch delete response content type is empty",
64            )
65        })?;
66        let multipart: Multipart<MixedPart> = Multipart::new()
67            .with_boundary(boundary)
68            .parse(resp.into_body().to_bytes())?;
69        let parts = multipart.into_parts();
70
71        let mut batched_result = BatchDeleteResult::default();
72
73        for (i, part) in parts.into_iter().enumerate() {
74            let resp = part.into_response();
75            // TODO: maybe we can take it directly?
76            let path = paths[i].clone();
77
78            // deleting not existing objects is ok
79            if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
80                batched_result.succeeded.push((path, OpDelete::default()));
81            } else {
82                batched_result
83                    .failed
84                    .push((path, OpDelete::default(), parse_error(resp)));
85            }
86        }
87
88        // If no object is deleted, return directly.
89        if batched_result.succeeded.is_empty() {
90            let err = batched_result.failed.remove(0).2;
91            return Err(err);
92        }
93
94        Ok(batched_result)
95    }
96}