opendal/services/azblob/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use http::StatusCode;
21use uuid::Uuid;
22
23use super::core::constants::X_MS_VERSION_ID;
24use super::core::AzblobCore;
25use super::error::parse_error;
26use crate::raw::*;
27use crate::*;
28
/// Name of the response header that `offset` inspects to verify the blob is an `AppendBlob`.
const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";
30
31pub type AzblobWriters = TwoWays<oio::BlockWriter<AzblobWriter>, oio::AppendWriter<AzblobWriter>>;
32
33pub struct AzblobWriter {
34    core: Arc<AzblobCore>,
35
36    op: OpWrite,
37    path: String,
38}
39
40impl AzblobWriter {
41    pub fn new(core: Arc<AzblobCore>, op: OpWrite, path: String) -> Self {
42        AzblobWriter { core, op, path }
43    }
44
45    // skip extracting `content-md5` here, as it pertains to the content of the request rather than
46    // the content of the block itself for the `append` and `complete put block list` operations.
47    fn parse_metadata(headers: &http::HeaderMap) -> Result<Metadata> {
48        let mut metadata = Metadata::default();
49
50        if let Some(last_modified) = parse_last_modified(headers)? {
51            metadata.set_last_modified(last_modified);
52        }
53        let etag = parse_etag(headers)?;
54        if let Some(etag) = etag {
55            metadata.set_etag(etag);
56        }
57        let version_id = parse_header_to_str(headers, X_MS_VERSION_ID)?;
58        if let Some(version_id) = version_id {
59            metadata.set_version(version_id);
60        }
61
62        Ok(metadata)
63    }
64}
65
66impl oio::AppendWrite for AzblobWriter {
67    async fn offset(&self) -> Result<u64> {
68        let resp = self
69            .core
70            .azblob_get_blob_properties(&self.path, &OpStat::default())
71            .await?;
72
73        let status = resp.status();
74
75        match status {
76            StatusCode::OK => {
77                let headers = resp.headers();
78                let blob_type = headers.get(X_MS_BLOB_TYPE).and_then(|v| v.to_str().ok());
79                if blob_type != Some("AppendBlob") {
80                    return Err(Error::new(
81                        ErrorKind::ConditionNotMatch,
82                        "the blob is not an appendable blob.",
83                    ));
84                }
85
86                Ok(parse_content_length(headers)?.unwrap_or_default())
87            }
88            StatusCode::NOT_FOUND => {
89                let resp = self
90                    .core
91                    .azblob_init_appendable_blob(&self.path, &self.op)
92                    .await?;
93
94                let status = resp.status();
95                match status {
96                    StatusCode::CREATED => {
97                        // do nothing
98                    }
99                    _ => {
100                        return Err(parse_error(resp));
101                    }
102                }
103                Ok(0)
104            }
105            _ => Err(parse_error(resp)),
106        }
107    }
108
109    async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
110        let resp = self
111            .core
112            .azblob_append_blob(&self.path, offset, size, body)
113            .await?;
114
115        let meta = AzblobWriter::parse_metadata(resp.headers())?;
116        let status = resp.status();
117        match status {
118            StatusCode::CREATED => Ok(meta),
119            _ => Err(parse_error(resp)),
120        }
121    }
122}
123
124impl oio::BlockWrite for AzblobWriter {
125    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
126        let resp = self
127            .core
128            .azblob_put_blob(&self.path, Some(size), &self.op, body)
129            .await?;
130
131        let status = resp.status();
132
133        let mut meta = AzblobWriter::parse_metadata(resp.headers())?;
134        let md5 = parse_content_md5(resp.headers())?;
135        if let Some(md5) = md5 {
136            meta.set_content_md5(md5);
137        }
138        match status {
139            StatusCode::CREATED | StatusCode::OK => Ok(meta),
140            _ => Err(parse_error(resp)),
141        }
142    }
143
144    async fn write_block(&self, block_id: Uuid, size: u64, body: Buffer) -> Result<()> {
145        let resp = self
146            .core
147            .azblob_put_block(&self.path, block_id, Some(size), &self.op, body)
148            .await?;
149
150        let status = resp.status();
151        match status {
152            StatusCode::CREATED | StatusCode::OK => Ok(()),
153            _ => Err(parse_error(resp)),
154        }
155    }
156
157    async fn complete_block(&self, block_ids: Vec<Uuid>) -> Result<Metadata> {
158        let resp = self
159            .core
160            .azblob_complete_put_block_list(&self.path, block_ids, &self.op)
161            .await?;
162
163        let meta = AzblobWriter::parse_metadata(resp.headers())?;
164        let status = resp.status();
165        match status {
166            StatusCode::CREATED | StatusCode::OK => Ok(meta),
167            _ => Err(parse_error(resp)),
168        }
169    }
170
171    async fn abort_block(&self, _block_ids: Vec<Uuid>) -> Result<()> {
172        // refer to https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id
173        // Any uncommitted blocks are garbage collected if there are no successful calls to Put Block or Put Block List on the blob within a week.
174        // If Put Blob is called on the blob, any uncommitted blocks are garbage collected.
175        Ok(())
176    }
177}