opendal/services/b2/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22
23use super::core::B2Core;
24use super::core::StartLargeFileResponse;
25use super::core::UploadPartResponse;
26use super::core::UploadResponse;
27use super::error::parse_error;
28use crate::raw::*;
29use crate::*;
30
31pub type B2Writers = oio::MultipartWriter<B2Writer>;
32
33pub struct B2Writer {
34    core: Arc<B2Core>,
35
36    op: OpWrite,
37    path: String,
38}
39
40impl B2Writer {
41    pub fn new(core: Arc<B2Core>, path: &str, op: OpWrite) -> Self {
42        B2Writer {
43            core,
44            path: path.to_string(),
45            op,
46        }
47    }
48
49    pub fn parse_body_into_meta(path: &str, resp: UploadResponse) -> Metadata {
50        let mut meta = Metadata::new(EntryMode::from_path(path));
51
52        if let Some(md5) = resp.content_md5 {
53            meta.set_content_md5(&md5);
54        }
55
56        if let Some(content_type) = resp.content_type {
57            meta.set_content_type(&content_type);
58        }
59
60        meta.set_content_length(resp.content_length);
61
62        meta
63    }
64}
65
66impl oio::MultipartWrite for B2Writer {
67    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
68        let resp = self
69            .core
70            .upload_file(&self.path, Some(size), &self.op, body)
71            .await?;
72
73        let status = resp.status();
74
75        match status {
76            StatusCode::OK => {
77                let bs = resp.into_body();
78
79                let result: UploadResponse =
80                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
81
82                let meta = Self::parse_body_into_meta(&self.path, result);
83
84                Ok(meta)
85            }
86            _ => Err(parse_error(resp)),
87        }
88    }
89
90    async fn initiate_part(&self) -> Result<String> {
91        let resp = self.core.start_large_file(&self.path, &self.op).await?;
92
93        let status = resp.status();
94
95        match status {
96            StatusCode::OK => {
97                let bs = resp.into_body();
98
99                let result: StartLargeFileResponse =
100                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
101
102                Ok(result.file_id)
103            }
104            _ => Err(parse_error(resp)),
105        }
106    }
107
108    async fn write_part(
109        &self,
110        upload_id: &str,
111        part_number: usize,
112        size: u64,
113        body: Buffer,
114    ) -> Result<oio::MultipartPart> {
115        // B2 requires part number must between [1..=10000]
116        let part_number = part_number + 1;
117
118        let resp = self
119            .core
120            .upload_part(upload_id, part_number, size, body)
121            .await?;
122
123        let status = resp.status();
124
125        match status {
126            StatusCode::OK => {
127                let bs = resp.into_body();
128
129                let result: UploadPartResponse =
130                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
131
132                Ok(oio::MultipartPart {
133                    etag: result.content_sha1,
134                    part_number,
135                    checksum: None,
136                })
137            }
138            _ => Err(parse_error(resp)),
139        }
140    }
141
142    async fn complete_part(
143        &self,
144        upload_id: &str,
145        parts: &[oio::MultipartPart],
146    ) -> Result<Metadata> {
147        let part_sha1_array = parts
148            .iter()
149            .map(|p| {
150                let binding = p.etag.clone();
151                let sha1 = binding.strip_prefix("unverified:");
152                let Some(sha1) = sha1 else {
153                    return "".to_string();
154                };
155                sha1.to_string()
156            })
157            .collect();
158
159        let resp = self
160            .core
161            .finish_large_file(upload_id, part_sha1_array)
162            .await?;
163
164        let status = resp.status();
165
166        match status {
167            StatusCode::OK => {
168                let bs = resp.into_body();
169
170                let result: UploadResponse =
171                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
172
173                let meta = Self::parse_body_into_meta(&self.path, result);
174
175                Ok(meta)
176            }
177            _ => Err(parse_error(resp)),
178        }
179    }
180
181    async fn abort_part(&self, upload_id: &str) -> Result<()> {
182        let resp = self.core.cancel_large_file(upload_id).await?;
183        match resp.status() {
184            // b2 returns code 200 if abort succeeds.
185            StatusCode::OK => Ok(()),
186            _ => Err(parse_error(resp)),
187        }
188    }
189}