opendal/services/gcs/
writer.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22
23use super::core::CompleteMultipartUploadRequestPart;
24use super::core::GcsCore;
25use super::core::InitiateMultipartUploadResult;
26use super::error::parse_error;
27use crate::raw::*;
28use crate::*;
29
30pub type GcsWriters = oio::MultipartWriter<GcsWriter>;
31
32pub struct GcsWriter {
33 core: Arc<GcsCore>,
34 path: String,
35 op: OpWrite,
36}
37
38impl GcsWriter {
39 pub fn new(core: Arc<GcsCore>, path: &str, op: OpWrite) -> Self {
40 GcsWriter {
41 core,
42 path: path.to_string(),
43 op,
44 }
45 }
46}
47
48impl oio::MultipartWrite for GcsWriter {
49 async fn write_once(&self, _: u64, body: Buffer) -> Result<Metadata> {
50 let size = body.len() as u64;
51 let mut req = self.core.gcs_insert_object_request(
52 &percent_encode_path(&self.path),
53 Some(size),
54 &self.op,
55 body,
56 )?;
57
58 self.core.sign(&mut req).await?;
59
60 let resp = self.core.send(req).await?;
61
62 let status = resp.status();
63
64 match status {
65 StatusCode::CREATED | StatusCode::OK => {
66 let metadata =
67 GcsCore::build_metadata_from_object_response(&self.path, resp.into_body())?;
68 Ok(metadata)
69 }
70 _ => Err(parse_error(resp)),
71 }
72 }
73
74 async fn initiate_part(&self) -> Result<String> {
75 let resp = self
76 .core
77 .gcs_initiate_multipart_upload(&percent_encode_path(&self.path))
78 .await?;
79
80 if !resp.status().is_success() {
81 return Err(parse_error(resp));
82 }
83
84 let buf = resp.into_body();
85 let upload_id: InitiateMultipartUploadResult =
86 quick_xml::de::from_reader(buf.reader()).map_err(new_xml_deserialize_error)?;
87 Ok(upload_id.upload_id)
88 }
89
90 async fn write_part(
91 &self,
92 upload_id: &str,
93 part_number: usize,
94 size: u64,
95 body: Buffer,
96 ) -> Result<oio::MultipartPart> {
97 let part_number = part_number + 1;
99
100 let resp = self
101 .core
102 .gcs_upload_part(&self.path, upload_id, part_number, size, body)
103 .await?;
104
105 if !resp.status().is_success() {
106 return Err(parse_error(resp));
107 }
108
109 let etag = parse_etag(resp.headers())?
110 .ok_or_else(|| {
111 Error::new(
112 ErrorKind::Unexpected,
113 "ETag not present in returning response",
114 )
115 })?
116 .to_string();
117
118 Ok(oio::MultipartPart {
119 part_number,
120 etag,
121 checksum: None,
122 })
123 }
124
125 async fn complete_part(
126 &self,
127 upload_id: &str,
128 parts: &[oio::MultipartPart],
129 ) -> Result<Metadata> {
130 let parts = parts
131 .iter()
132 .map(|p| CompleteMultipartUploadRequestPart {
133 part_number: p.part_number,
134 etag: p.etag.clone(),
135 })
136 .collect();
137
138 let resp = self
139 .core
140 .gcs_complete_multipart_upload(&self.path, upload_id, parts)
141 .await?;
142
143 if !resp.status().is_success() {
144 return Err(parse_error(resp));
145 }
146 Ok(Metadata::default())
150 }
151
152 async fn abort_part(&self, upload_id: &str) -> Result<()> {
153 let resp = self
154 .core
155 .gcs_abort_multipart_upload(&self.path, upload_id)
156 .await?;
157 match resp.status() {
158 StatusCode::NO_CONTENT => Ok(()),
160 _ => Err(parse_error(resp)),
161 }
162 }
163}