1use super::core::*;
19use super::error::parse_error;
20use crate::raw::*;
21use crate::services::core::AzblobCore;
22use crate::services::writer::AzblobWriter;
23use crate::*;
24use std::str::FromStr;
25use std::sync::Arc;
26
/// Writer for the ghac service.
///
/// A [`TwoWays`] over the two concrete writers: [`GhacWriterV1`] in the first
/// slot and [`GhacWriterV2`] in the second. `GhacWriter::new` picks the slot
/// from `core.service_version`.
pub type GhacWriter = TwoWays<GhacWriterV1, GhacWriterV2>;
28
29impl GhacWriter {
30 pub fn new(core: Arc<GhacCore>, write_path: String, url: String) -> Result<Self> {
32 match core.service_version {
33 GhacVersion::V1 => Ok(TwoWays::One(GhacWriterV1 {
34 core,
35 path: write_path,
36 url,
37 size: 0,
38 })),
39 GhacVersion::V2 => {
40 let uri = http::Uri::from_str(&url)
41 .map_err(new_http_uri_invalid_error)?
42 .into_parts();
43 let (Some(scheme), Some(authority), Some(pq)) =
44 (uri.scheme, uri.authority, uri.path_and_query)
45 else {
46 return Err(Error::new(
47 ErrorKind::Unexpected,
48 "ghac returns invalid signed url",
49 )
50 .with_context("url", &url));
51 };
52 let endpoint = format!("{scheme}://{authority}");
53 let Some((container, path)) = pq.path().trim_matches('/').split_once("/") else {
54 return Err(Error::new(
55 ErrorKind::Unexpected,
56 "ghac returns invalid signed url that bucket or path is missing",
57 )
58 .with_context("url", &url));
59 };
60 let Some(query) = pq.query() else {
61 return Err(Error::new(
62 ErrorKind::Unexpected,
63 "ghac returns invalid signed url that sas is missing",
64 )
65 .with_context("url", &url));
66 };
67 let azure_core = Arc::new(AzblobCore {
68 info: {
69 let am = AccessorInfo::default();
70 am.set_scheme(Scheme::Azblob)
71 .set_root("/")
72 .set_name(container)
73 .set_native_capability(Capability {
74 stat: true,
75 stat_with_if_match: true,
76 stat_with_if_none_match: true,
77 stat_has_cache_control: true,
78 stat_has_content_length: true,
79 stat_has_content_type: true,
80 stat_has_content_encoding: true,
81 stat_has_content_range: true,
82 stat_has_etag: true,
83 stat_has_content_md5: true,
84 stat_has_last_modified: true,
85 stat_has_content_disposition: true,
86
87 read: true,
88
89 read_with_if_match: true,
90 read_with_if_none_match: true,
91 read_with_override_content_disposition: true,
92 read_with_if_modified_since: true,
93 read_with_if_unmodified_since: true,
94
95 write: true,
96 write_can_append: true,
97 write_can_empty: true,
98 write_can_multi: true,
99 write_with_cache_control: true,
100 write_with_content_type: true,
101 write_with_if_not_exists: true,
102 write_with_if_none_match: true,
103 write_with_user_metadata: true,
104
105 copy: true,
106
107 list: true,
108 list_with_recursive: true,
109 list_has_etag: true,
110 list_has_content_length: true,
111 list_has_content_md5: true,
112 list_has_content_type: true,
113 list_has_last_modified: true,
114
115 shared: true,
116
117 ..Default::default()
118 });
119
120 am.into()
121 },
122 container: container.to_string(),
123 root: "/".to_string(),
124 endpoint,
125 encryption_key: None,
126 encryption_key_sha256: None,
127 encryption_algorithm: None,
128 loader: {
129 let config = reqsign::AzureStorageConfig {
130 sas_token: Some(query.to_string()),
131 ..Default::default()
132 };
133 reqsign::AzureStorageLoader::new(config)
134 },
135 signer: { reqsign::AzureStorageSigner::new() },
136 });
137 let w = AzblobWriter::new(azure_core, OpWrite::default(), path.to_string());
138 let writer = oio::BlockWriter::new(core.info.clone(), w, 4);
139 Ok(TwoWays::Two(GhacWriterV2 {
140 core,
141 writer,
142 path: write_path,
143 url,
144 size: 0,
145 }))
146 }
147 }
148 }
149}
150
/// Writer used when the ghac core reports `GhacVersion::V1`.
///
/// Each `write` is sent through `GhacCore::ghac_v1_write` at the current byte
/// offset; `close` calls `GhacCore::ghac_finalize_upload`.
pub struct GhacWriterV1 {
    /// Shared ghac core used to issue the upload and finalize requests.
    core: Arc<GhacCore>,

    /// Path passed to `ghac_finalize_upload` on close.
    path: String,
    /// Upload URL passed to both `ghac_v1_write` and `ghac_finalize_upload`.
    url: String,
    /// Bytes written so far; also used as the offset of the next write.
    size: u64,
}
158
159impl oio::Write for GhacWriterV1 {
160 async fn write(&mut self, bs: Buffer) -> Result<()> {
161 let size = bs.len() as u64;
162 let offset = self.size;
163
164 let resp = self.core.ghac_v1_write(&self.url, size, offset, bs).await?;
165 if !resp.status().is_success() {
166 return Err(parse_error(resp).map(|err| err.with_operation("Backend::ghac_upload")));
167 }
168 self.size += size;
169 Ok(())
170 }
171
172 async fn abort(&mut self) -> Result<()> {
173 Ok(())
174 }
175
176 async fn close(&mut self) -> Result<Metadata> {
177 self.core
178 .ghac_finalize_upload(&self.path, &self.url, self.size)
179 .await?;
180 Ok(Metadata::default().with_content_length(self.size))
181 }
182}
183
/// Writer used when the ghac core reports `GhacVersion::V2`.
///
/// Data is written through an `oio::BlockWriter` wrapping an [`AzblobWriter`];
/// `close` additionally calls `GhacCore::ghac_finalize_upload`.
pub struct GhacWriterV2 {
    /// Shared ghac core used for the finalize request.
    core: Arc<GhacCore>,
    /// Block writer that receives every buffer passed to `write`.
    writer: oio::BlockWriter<AzblobWriter>,

    /// Path passed to `ghac_finalize_upload` on close.
    path: String,
    /// Signed upload URL passed to `ghac_finalize_upload` on close.
    url: String,
    /// Bytes successfully handed to `writer` so far.
    size: u64,
}
192
193impl oio::Write for GhacWriterV2 {
194 async fn write(&mut self, bs: Buffer) -> Result<()> {
195 let size = bs.len() as u64;
196
197 self.writer.write(bs).await?;
198 self.size += size;
199 Ok(())
200 }
201
202 async fn close(&mut self) -> Result<Metadata> {
203 self.writer.close().await?;
204 let _ = self
205 .core
206 .ghac_finalize_upload(&self.path, &self.url, self.size)
207 .await;
208 Ok(Metadata::default().with_content_length(self.size))
209 }
210
211 async fn abort(&mut self) -> Result<()> {
212 Ok(())
213 }
214}