1use super::core::*;
19use super::error::parse_error;
20use crate::raw::*;
21use crate::services::core::AzblobCore;
22use crate::services::writer::AzblobWriter;
23use crate::*;
24use http::header::{ACCEPT, AUTHORIZATION, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE};
25use http::Request;
26use std::str::FromStr;
27use std::sync::Arc;
28
29pub type GhacWriter = TwoWays<GhacWriterV1, GhacWriterV2>;
30
31impl GhacWriter {
32 pub fn new(core: Arc<GhacCore>, write_path: String, url: String) -> Result<Self> {
34 match core.service_version {
35 GhacVersion::V1 => Ok(TwoWays::One(GhacWriterV1 {
36 core,
37 path: write_path,
38 url,
39 size: 0,
40 })),
41 GhacVersion::V2 => {
42 let uri = http::Uri::from_str(&url)
43 .map_err(new_http_uri_invalid_error)?
44 .into_parts();
45 let (Some(scheme), Some(authority), Some(pq)) =
46 (uri.scheme, uri.authority, uri.path_and_query)
47 else {
48 return Err(Error::new(
49 ErrorKind::Unexpected,
50 "ghac returns invalid signed url",
51 )
52 .with_context("url", &url));
53 };
54 let endpoint = format!("{scheme}://{authority}");
55 let Some((container, path)) = pq.path().trim_matches('/').split_once("/") else {
56 return Err(Error::new(
57 ErrorKind::Unexpected,
58 "ghac returns invalid signed url that bucket or path is missing",
59 )
60 .with_context("url", &url));
61 };
62 let Some(query) = pq.query() else {
63 return Err(Error::new(
64 ErrorKind::Unexpected,
65 "ghac returns invalid signed url that sas is missing",
66 )
67 .with_context("url", &url));
68 };
69 let azure_core = Arc::new(AzblobCore {
70 info: {
71 let am = AccessorInfo::default();
72 am.set_scheme(Scheme::Azblob)
73 .set_root("/")
74 .set_name(container)
75 .set_native_capability(Capability {
76 stat: true,
77 stat_with_if_match: true,
78 stat_with_if_none_match: true,
79 stat_has_cache_control: true,
80 stat_has_content_length: true,
81 stat_has_content_type: true,
82 stat_has_content_encoding: true,
83 stat_has_content_range: true,
84 stat_has_etag: true,
85 stat_has_content_md5: true,
86 stat_has_last_modified: true,
87 stat_has_content_disposition: true,
88
89 read: true,
90
91 read_with_if_match: true,
92 read_with_if_none_match: true,
93 read_with_override_content_disposition: true,
94 read_with_if_modified_since: true,
95 read_with_if_unmodified_since: true,
96
97 write: true,
98 write_can_append: true,
99 write_can_empty: true,
100 write_can_multi: true,
101 write_with_cache_control: true,
102 write_with_content_type: true,
103 write_with_if_not_exists: true,
104 write_with_if_none_match: true,
105 write_with_user_metadata: true,
106
107 copy: true,
108
109 list: true,
110 list_with_recursive: true,
111 list_has_etag: true,
112 list_has_content_length: true,
113 list_has_content_md5: true,
114 list_has_content_type: true,
115 list_has_last_modified: true,
116
117 shared: true,
118
119 ..Default::default()
120 });
121
122 am.into()
123 },
124 container: container.to_string(),
125 root: "/".to_string(),
126 endpoint,
127 encryption_key: None,
128 encryption_key_sha256: None,
129 encryption_algorithm: None,
130 loader: {
131 let config = reqsign::AzureStorageConfig {
132 sas_token: Some(query.to_string()),
133 ..Default::default()
134 };
135 reqsign::AzureStorageLoader::new(config)
136 },
137 signer: { reqsign::AzureStorageSigner::new() },
138 });
139 let w = AzblobWriter::new(azure_core, OpWrite::default(), path.to_string());
140 let writer = oio::BlockWriter::new(core.info.clone(), w, 4);
141 Ok(TwoWays::Two(GhacWriterV2 {
142 core,
143 writer,
144 path: write_path,
145 url,
146 size: 0,
147 }))
148 }
149 }
150 }
151}
152
153pub struct GhacWriterV1 {
154 core: Arc<GhacCore>,
155
156 path: String,
157 url: String,
158 size: u64,
159}
160
161impl oio::Write for GhacWriterV1 {
162 async fn write(&mut self, bs: Buffer) -> Result<()> {
163 let size = bs.len() as u64;
164 let offset = self.size;
165
166 let mut req = Request::patch(&self.url);
167 req = req.header(AUTHORIZATION, format!("Bearer {}", self.core.catch_token));
168 req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
169 req = req.header(CONTENT_LENGTH, size);
170 req = req.header(CONTENT_TYPE, "application/octet-stream");
171 req = req.header(
172 CONTENT_RANGE,
173 BytesContentRange::default()
174 .with_range(offset, offset + size - 1)
175 .to_header(),
176 );
177 let req = req.body(bs).map_err(new_request_build_error)?;
178
179 let resp = self.core.info.http_client().send(req).await?;
180 if !resp.status().is_success() {
181 return Err(parse_error(resp).map(|err| err.with_operation("Backend::ghac_upload")));
182 }
183 self.size += size;
184 Ok(())
185 }
186
187 async fn abort(&mut self) -> Result<()> {
188 Ok(())
189 }
190
191 async fn close(&mut self) -> Result<Metadata> {
192 self.core
193 .ghac_finalize_upload(&self.path, &self.url, self.size)
194 .await?;
195 Ok(Metadata::default().with_content_length(self.size))
196 }
197}
198
199pub struct GhacWriterV2 {
200 core: Arc<GhacCore>,
201 writer: oio::BlockWriter<AzblobWriter>,
202
203 path: String,
204 url: String,
205 size: u64,
206}
207
208impl oio::Write for GhacWriterV2 {
209 async fn write(&mut self, bs: Buffer) -> Result<()> {
210 let size = bs.len() as u64;
211
212 self.writer.write(bs).await?;
213 self.size += size;
214 Ok(())
215 }
216
217 async fn close(&mut self) -> Result<Metadata> {
218 self.writer.close().await?;
219 let _ = self
220 .core
221 .ghac_finalize_upload(&self.path, &self.url, self.size)
222 .await;
223 Ok(Metadata::default().with_content_length(self.size))
224 }
225
226 async fn abort(&mut self) -> Result<()> {
227 Ok(())
228 }
229}