1use std::str::FromStr;
19use std::sync::Arc;
20
21use super::core::*;
22use super::error::parse_error;
23use crate::raw::*;
24use crate::services::core::AzblobCore;
25use crate::services::writer::AzblobWriter;
26use crate::*;
27
28pub type GhacWriter = TwoWays<GhacWriterV1, GhacWriterV2>;
29
30impl GhacWriter {
31 pub fn new(core: Arc<GhacCore>, write_path: String, url: String) -> Result<Self> {
33 match core.service_version {
34 GhacVersion::V1 => Ok(TwoWays::One(GhacWriterV1 {
35 core,
36 path: write_path,
37 url,
38 size: 0,
39 })),
40 GhacVersion::V2 => {
41 let uri = http::Uri::from_str(&url)
42 .map_err(new_http_uri_invalid_error)?
43 .into_parts();
44 let (Some(scheme), Some(authority), Some(pq)) =
45 (uri.scheme, uri.authority, uri.path_and_query)
46 else {
47 return Err(Error::new(
48 ErrorKind::Unexpected,
49 "ghac returns invalid signed url",
50 )
51 .with_context("url", &url));
52 };
53 let endpoint = format!("{scheme}://{authority}");
54 let Some((container, path)) = pq.path().trim_matches('/').split_once("/") else {
55 return Err(Error::new(
56 ErrorKind::Unexpected,
57 "ghac returns invalid signed url that bucket or path is missing",
58 )
59 .with_context("url", &url));
60 };
61 let Some(query) = pq.query() else {
62 return Err(Error::new(
63 ErrorKind::Unexpected,
64 "ghac returns invalid signed url that sas is missing",
65 )
66 .with_context("url", &url));
67 };
68 let azure_core = Arc::new(AzblobCore {
69 info: {
70 let am = AccessorInfo::default();
71 am.set_scheme(Scheme::Azblob)
72 .set_root("/")
73 .set_name(container)
74 .set_native_capability(Capability {
75 stat: true,
76 stat_with_if_match: true,
77 stat_with_if_none_match: true,
78 stat_has_cache_control: true,
79 stat_has_content_length: true,
80 stat_has_content_type: true,
81 stat_has_content_encoding: true,
82 stat_has_content_range: true,
83 stat_has_etag: true,
84 stat_has_content_md5: true,
85 stat_has_last_modified: true,
86 stat_has_content_disposition: true,
87
88 read: true,
89
90 read_with_if_match: true,
91 read_with_if_none_match: true,
92 read_with_override_content_disposition: true,
93 read_with_if_modified_since: true,
94 read_with_if_unmodified_since: true,
95
96 write: true,
97 write_can_append: true,
98 write_can_empty: true,
99 write_can_multi: true,
100 write_with_cache_control: true,
101 write_with_content_type: true,
102 write_with_if_not_exists: true,
103 write_with_if_none_match: true,
104 write_with_user_metadata: true,
105
106 copy: true,
107
108 list: true,
109 list_with_recursive: true,
110 list_has_etag: true,
111 list_has_content_length: true,
112 list_has_content_md5: true,
113 list_has_content_type: true,
114 list_has_last_modified: true,
115
116 shared: true,
117
118 ..Default::default()
119 });
120
121 am.into()
122 },
123 container: container.to_string(),
124 root: "/".to_string(),
125 endpoint,
126 encryption_key: None,
127 encryption_key_sha256: None,
128 encryption_algorithm: None,
129 loader: {
130 let config = reqsign::AzureStorageConfig {
131 sas_token: Some(query.to_string()),
132 ..Default::default()
133 };
134 reqsign::AzureStorageLoader::new(config)
135 },
136 signer: { reqsign::AzureStorageSigner::new() },
137 });
138 let w = AzblobWriter::new(azure_core, OpWrite::default(), path.to_string());
139 let writer = oio::BlockWriter::new(core.info.clone(), w, 4);
140 Ok(TwoWays::Two(GhacWriterV2 {
141 core,
142 writer,
143 path: write_path,
144 url,
145 size: 0,
146 }))
147 }
148 }
149 }
150}
151
152pub struct GhacWriterV1 {
153 core: Arc<GhacCore>,
154
155 path: String,
156 url: String,
157 size: u64,
158}
159
160impl oio::Write for GhacWriterV1 {
161 async fn write(&mut self, bs: Buffer) -> Result<()> {
162 let size = bs.len() as u64;
163 let offset = self.size;
164
165 let resp = self.core.ghac_v1_write(&self.url, size, offset, bs).await?;
166 if !resp.status().is_success() {
167 return Err(parse_error(resp).map(|err| err.with_operation("Backend::ghac_upload")));
168 }
169 self.size += size;
170 Ok(())
171 }
172
173 async fn abort(&mut self) -> Result<()> {
174 Ok(())
175 }
176
177 async fn close(&mut self) -> Result<Metadata> {
178 self.core
179 .ghac_finalize_upload(&self.path, &self.url, self.size)
180 .await?;
181 Ok(Metadata::default().with_content_length(self.size))
182 }
183}
184
185pub struct GhacWriterV2 {
186 core: Arc<GhacCore>,
187 writer: oio::BlockWriter<AzblobWriter>,
188
189 path: String,
190 url: String,
191 size: u64,
192}
193
194impl oio::Write for GhacWriterV2 {
195 async fn write(&mut self, bs: Buffer) -> Result<()> {
196 let size = bs.len() as u64;
197
198 self.writer.write(bs).await?;
199 self.size += size;
200 Ok(())
201 }
202
203 async fn close(&mut self) -> Result<Metadata> {
204 self.writer.close().await?;
205 let _ = self
206 .core
207 .ghac_finalize_upload(&self.path, &self.url, self.size)
208 .await;
209 Ok(Metadata::default().with_content_length(self.size))
210 }
211
212 async fn abort(&mut self) -> Result<()> {
213 Ok(())
214 }
215}