opendal/services/ghac/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::core::*;
19use super::error::parse_error;
20use crate::raw::*;
21use crate::services::core::AzblobCore;
22use crate::services::writer::AzblobWriter;
23use crate::*;
24use http::header::{ACCEPT, AUTHORIZATION, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE};
25use http::Request;
26use std::str::FromStr;
27use std::sync::Arc;
28
29pub type GhacWriter = TwoWays<GhacWriterV1, GhacWriterV2>;
30
31impl GhacWriter {
32    /// TODO: maybe we can move the signed url logic to azblob service instead.
33    pub fn new(core: Arc<GhacCore>, write_path: String, url: String) -> Result<Self> {
34        match core.service_version {
35            GhacVersion::V1 => Ok(TwoWays::One(GhacWriterV1 {
36                core,
37                path: write_path,
38                url,
39                size: 0,
40            })),
41            GhacVersion::V2 => {
42                let uri = http::Uri::from_str(&url)
43                    .map_err(new_http_uri_invalid_error)?
44                    .into_parts();
45                let (Some(scheme), Some(authority), Some(pq)) =
46                    (uri.scheme, uri.authority, uri.path_and_query)
47                else {
48                    return Err(Error::new(
49                        ErrorKind::Unexpected,
50                        "ghac returns invalid signed url",
51                    )
52                    .with_context("url", &url));
53                };
54                let endpoint = format!("{scheme}://{authority}");
55                let Some((container, path)) = pq.path().trim_matches('/').split_once("/") else {
56                    return Err(Error::new(
57                        ErrorKind::Unexpected,
58                        "ghac returns invalid signed url that bucket or path is missing",
59                    )
60                    .with_context("url", &url));
61                };
62                let Some(query) = pq.query() else {
63                    return Err(Error::new(
64                        ErrorKind::Unexpected,
65                        "ghac returns invalid signed url that sas is missing",
66                    )
67                    .with_context("url", &url));
68                };
69                let azure_core = Arc::new(AzblobCore {
70                    info: {
71                        let am = AccessorInfo::default();
72                        am.set_scheme(Scheme::Azblob)
73                            .set_root("/")
74                            .set_name(container)
75                            .set_native_capability(Capability {
76                                stat: true,
77                                stat_with_if_match: true,
78                                stat_with_if_none_match: true,
79                                stat_has_cache_control: true,
80                                stat_has_content_length: true,
81                                stat_has_content_type: true,
82                                stat_has_content_encoding: true,
83                                stat_has_content_range: true,
84                                stat_has_etag: true,
85                                stat_has_content_md5: true,
86                                stat_has_last_modified: true,
87                                stat_has_content_disposition: true,
88
89                                read: true,
90
91                                read_with_if_match: true,
92                                read_with_if_none_match: true,
93                                read_with_override_content_disposition: true,
94                                read_with_if_modified_since: true,
95                                read_with_if_unmodified_since: true,
96
97                                write: true,
98                                write_can_append: true,
99                                write_can_empty: true,
100                                write_can_multi: true,
101                                write_with_cache_control: true,
102                                write_with_content_type: true,
103                                write_with_if_not_exists: true,
104                                write_with_if_none_match: true,
105                                write_with_user_metadata: true,
106
107                                copy: true,
108
109                                list: true,
110                                list_with_recursive: true,
111                                list_has_etag: true,
112                                list_has_content_length: true,
113                                list_has_content_md5: true,
114                                list_has_content_type: true,
115                                list_has_last_modified: true,
116
117                                shared: true,
118
119                                ..Default::default()
120                            });
121
122                        am.into()
123                    },
124                    container: container.to_string(),
125                    root: "/".to_string(),
126                    endpoint,
127                    encryption_key: None,
128                    encryption_key_sha256: None,
129                    encryption_algorithm: None,
130                    loader: {
131                        let config = reqsign::AzureStorageConfig {
132                            sas_token: Some(query.to_string()),
133                            ..Default::default()
134                        };
135                        reqsign::AzureStorageLoader::new(config)
136                    },
137                    signer: { reqsign::AzureStorageSigner::new() },
138                });
139                let w = AzblobWriter::new(azure_core, OpWrite::default(), path.to_string());
140                let writer = oio::BlockWriter::new(core.info.clone(), w, 4);
141                Ok(TwoWays::Two(GhacWriterV2 {
142                    core,
143                    writer,
144                    path: write_path,
145                    url,
146                    size: 0,
147                }))
148            }
149        }
150    }
151}
152
153pub struct GhacWriterV1 {
154    core: Arc<GhacCore>,
155
156    path: String,
157    url: String,
158    size: u64,
159}
160
161impl oio::Write for GhacWriterV1 {
162    async fn write(&mut self, bs: Buffer) -> Result<()> {
163        let size = bs.len() as u64;
164        let offset = self.size;
165
166        let mut req = Request::patch(&self.url);
167        req = req.header(AUTHORIZATION, format!("Bearer {}", self.core.catch_token));
168        req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
169        req = req.header(CONTENT_LENGTH, size);
170        req = req.header(CONTENT_TYPE, "application/octet-stream");
171        req = req.header(
172            CONTENT_RANGE,
173            BytesContentRange::default()
174                .with_range(offset, offset + size - 1)
175                .to_header(),
176        );
177        let req = req.body(bs).map_err(new_request_build_error)?;
178
179        let resp = self.core.info.http_client().send(req).await?;
180        if !resp.status().is_success() {
181            return Err(parse_error(resp).map(|err| err.with_operation("Backend::ghac_upload")));
182        }
183        self.size += size;
184        Ok(())
185    }
186
187    async fn abort(&mut self) -> Result<()> {
188        Ok(())
189    }
190
191    async fn close(&mut self) -> Result<Metadata> {
192        self.core
193            .ghac_finalize_upload(&self.path, &self.url, self.size)
194            .await?;
195        Ok(Metadata::default().with_content_length(self.size))
196    }
197}
198
199pub struct GhacWriterV2 {
200    core: Arc<GhacCore>,
201    writer: oio::BlockWriter<AzblobWriter>,
202
203    path: String,
204    url: String,
205    size: u64,
206}
207
208impl oio::Write for GhacWriterV2 {
209    async fn write(&mut self, bs: Buffer) -> Result<()> {
210        let size = bs.len() as u64;
211
212        self.writer.write(bs).await?;
213        self.size += size;
214        Ok(())
215    }
216
217    async fn close(&mut self) -> Result<Metadata> {
218        self.writer.close().await?;
219        let _ = self
220            .core
221            .ghac_finalize_upload(&self.path, &self.url, self.size)
222            .await;
223        Ok(Metadata::default().with_content_length(self.size))
224    }
225
226    async fn abort(&mut self) -> Result<()> {
227        Ok(())
228    }
229}