opendal/services/gcs/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22
23use super::core::CompleteMultipartUploadRequestPart;
24use super::core::GcsCore;
25use super::core::InitiateMultipartUploadResult;
26use super::error::parse_error;
27use crate::raw::*;
28use crate::*;
29
/// [`GcsWriter`] wrapped in [`oio::MultipartWriter`].
///
/// NOTE(review): presumably the wrapper decides when to call the single-shot
/// or the multipart methods that `GcsWriter` implements — confirm against
/// `oio::MultipartWriter`.
pub type GcsWriters = oio::MultipartWriter<GcsWriter>;
31
/// Writer for a single GCS object.
///
/// Holds everything the [`oio::MultipartWrite`] implementation needs to
/// issue upload requests for one path.
pub struct GcsWriter {
    /// Shared GCS core used to build, sign and send the upload requests.
    core: Arc<GcsCore>,
    /// Path of the object being written, kept as an owned copy.
    path: String,
    /// Write options given at construction; forwarded when building the
    /// single-shot insert-object request.
    op: OpWrite,
}
37
38impl GcsWriter {
39    pub fn new(core: Arc<GcsCore>, path: &str, op: OpWrite) -> Self {
40        GcsWriter {
41            core,
42            path: path.to_string(),
43            op,
44        }
45    }
46}
47
48impl oio::MultipartWrite for GcsWriter {
49    async fn write_once(&self, _: u64, body: Buffer) -> Result<Metadata> {
50        let size = body.len() as u64;
51        let mut req = self.core.gcs_insert_object_request(
52            &percent_encode_path(&self.path),
53            Some(size),
54            &self.op,
55            body,
56        )?;
57
58        self.core.sign(&mut req).await?;
59
60        let resp = self.core.send(req).await?;
61
62        let status = resp.status();
63
64        match status {
65            StatusCode::CREATED | StatusCode::OK => {
66                let metadata =
67                    GcsCore::build_metadata_from_object_response(&self.path, resp.into_body())?;
68                Ok(metadata)
69            }
70            _ => Err(parse_error(resp)),
71        }
72    }
73
74    async fn initiate_part(&self) -> Result<String> {
75        let resp = self
76            .core
77            .gcs_initiate_multipart_upload(&percent_encode_path(&self.path))
78            .await?;
79
80        if !resp.status().is_success() {
81            return Err(parse_error(resp));
82        }
83
84        let buf = resp.into_body();
85        let upload_id: InitiateMultipartUploadResult =
86            quick_xml::de::from_reader(buf.reader()).map_err(new_xml_deserialize_error)?;
87        Ok(upload_id.upload_id)
88    }
89
90    async fn write_part(
91        &self,
92        upload_id: &str,
93        part_number: usize,
94        size: u64,
95        body: Buffer,
96    ) -> Result<oio::MultipartPart> {
97        // Gcs requires part number must between [1..=10000]
98        let part_number = part_number + 1;
99
100        let resp = self
101            .core
102            .gcs_upload_part(&self.path, upload_id, part_number, size, body)
103            .await?;
104
105        if !resp.status().is_success() {
106            return Err(parse_error(resp));
107        }
108
109        let etag = parse_etag(resp.headers())?
110            .ok_or_else(|| {
111                Error::new(
112                    ErrorKind::Unexpected,
113                    "ETag not present in returning response",
114                )
115            })?
116            .to_string();
117
118        Ok(oio::MultipartPart {
119            part_number,
120            etag,
121            checksum: None,
122        })
123    }
124
125    async fn complete_part(
126        &self,
127        upload_id: &str,
128        parts: &[oio::MultipartPart],
129    ) -> Result<Metadata> {
130        let parts = parts
131            .iter()
132            .map(|p| CompleteMultipartUploadRequestPart {
133                part_number: p.part_number,
134                etag: p.etag.clone(),
135            })
136            .collect();
137
138        let resp = self
139            .core
140            .gcs_complete_multipart_upload(&self.path, upload_id, parts)
141            .await?;
142
143        if !resp.status().is_success() {
144            return Err(parse_error(resp));
145        }
146        // we don't extract metadata from `CompleteMultipartUploadResult`, since we only need the `ETag` from it.
147        // However, the `ETag` differs from the `ETag` obtained through the `stat` operation.
148        // refer to: https://cloud.google.com/storage/docs/metadata#etags
149        Ok(Metadata::default())
150    }
151
152    async fn abort_part(&self, upload_id: &str) -> Result<()> {
153        let resp = self
154            .core
155            .gcs_abort_multipart_upload(&self.path, upload_id)
156            .await?;
157        match resp.status() {
158            // gcs returns code 204 if abort succeeds.
159            StatusCode::NO_CONTENT => Ok(()),
160            _ => Err(parse_error(resp)),
161        }
162    }
163}