opendal/services/ghac/
backend.rs1use std::env;
19use std::sync::Arc;
20
21use super::core::*;
22use super::error::parse_error;
23use super::writer::GhacWriter;
24use crate::raw::*;
25use crate::services::ghac::core::GhacCore;
26use crate::services::GhacConfig;
27use crate::*;
28use http::header;
29use http::Request;
30use http::Response;
31use http::StatusCode;
32use log::debug;
33use sha2::Digest;
34
35fn value_or_env(
36 explicit_value: Option<String>,
37 env_var_name: &str,
38 operation: &'static str,
39) -> Result<String> {
40 if let Some(value) = explicit_value {
41 return Ok(value);
42 }
43
44 env::var(env_var_name).map_err(|err| {
45 let text = format!(
46 "{} not found, maybe not in github action environment?",
47 env_var_name
48 );
49 Error::new(ErrorKind::ConfigInvalid, text)
50 .with_operation(operation)
51 .set_source(err)
52 })
53}
54
55impl Configurator for GhacConfig {
56 type Builder = GhacBuilder;
57
58 #[allow(deprecated)]
59 fn into_builder(self) -> Self::Builder {
60 GhacBuilder {
61 config: self,
62 http_client: None,
63 }
64 }
65}
66
67#[doc = include_str!("docs.md")]
69#[derive(Debug, Default)]
70pub struct GhacBuilder {
71 config: GhacConfig,
72
73 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
74 http_client: Option<HttpClient>,
75}
76
77impl GhacBuilder {
78 pub fn root(mut self, root: &str) -> Self {
80 self.config.root = if root.is_empty() {
81 None
82 } else {
83 Some(root.to_string())
84 };
85
86 self
87 }
88
89 pub fn version(mut self, version: &str) -> Self {
96 if !version.is_empty() {
97 self.config.version = Some(version.to_string())
98 }
99
100 self
101 }
102
103 pub fn endpoint(mut self, endpoint: &str) -> Self {
109 if !endpoint.is_empty() {
110 self.config.endpoint = Some(endpoint.to_string())
111 }
112 self
113 }
114
115 pub fn runtime_token(mut self, runtime_token: &str) -> Self {
122 if !runtime_token.is_empty() {
123 self.config.runtime_token = Some(runtime_token.to_string())
124 }
125 self
126 }
127
128 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
135 #[allow(deprecated)]
136 pub fn http_client(mut self, client: HttpClient) -> Self {
137 self.http_client = Some(client);
138 self
139 }
140}
141
142impl Builder for GhacBuilder {
143 const SCHEME: Scheme = Scheme::Ghac;
144 type Config = GhacConfig;
145
146 fn build(self) -> Result<impl Access> {
147 debug!("backend build started: {:?}", self);
148
149 let root = normalize_root(&self.config.root.unwrap_or_default());
150 debug!("backend use root {}", root);
151
152 let service_version = get_cache_service_version();
153 debug!("backend use service version {:?}", service_version);
154
155 let mut version = self
156 .config
157 .version
158 .clone()
159 .unwrap_or_else(|| "opendal".to_string());
160 debug!("backend use version {version}");
161 if matches!(service_version, GhacVersion::V2) {
163 let hash = sha2::Sha256::digest(&version);
164 version = format!("{:x}", hash);
165 }
166
167 let cache_url = self
168 .config
169 .endpoint
170 .unwrap_or_else(|| get_cache_service_url(service_version));
171 if cache_url.is_empty() {
172 return Err(Error::new(
173 ErrorKind::ConfigInvalid,
174 "cache url for ghac not found, maybe not in github action environment?".to_string(),
175 ));
176 }
177
178 let core = GhacCore {
179 info: {
180 let am = AccessorInfo::default();
181 am.set_scheme(Scheme::Ghac)
182 .set_root(&root)
183 .set_name(&version)
184 .set_native_capability(Capability {
185 stat: true,
186 stat_has_cache_control: true,
187 stat_has_content_length: true,
188 stat_has_content_type: true,
189 stat_has_content_encoding: true,
190 stat_has_content_range: true,
191 stat_has_etag: true,
192 stat_has_content_md5: true,
193 stat_has_last_modified: true,
194 stat_has_content_disposition: true,
195
196 read: true,
197
198 write: true,
199 write_can_multi: true,
200
201 shared: true,
202
203 ..Default::default()
204 });
205
206 #[allow(deprecated)]
208 if let Some(client) = self.http_client {
209 am.update_http_client(|_| client);
210 }
211
212 am.into()
213 },
214 root,
215
216 cache_url,
217 catch_token: value_or_env(
218 self.config.runtime_token,
219 ACTIONS_RUNTIME_TOKEN,
220 "Builder::build",
221 )?,
222 version,
223
224 service_version,
225 };
226
227 Ok(GhacBackend {
228 core: Arc::new(core),
229 })
230 }
231}
232
233#[derive(Debug, Clone)]
235pub struct GhacBackend {
236 core: Arc<GhacCore>,
237}
238
239impl Access for GhacBackend {
240 type Reader = HttpBody;
241 type Writer = GhacWriter;
242 type Lister = ();
243 type Deleter = ();
244 type BlockingReader = ();
245 type BlockingWriter = ();
246 type BlockingLister = ();
247 type BlockingDeleter = ();
248
249 fn info(&self) -> Arc<AccessorInfo> {
250 self.core.info.clone()
251 }
252
253 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
259 let location = self.core.ghac_get_download_url(path).await?;
260
261 let req = Request::get(location)
262 .header(header::RANGE, "bytes=0-0")
263 .body(Buffer::new())
264 .map_err(new_request_build_error)?;
265 let resp = self.core.info.http_client().send(req).await?;
266
267 let status = resp.status();
268 match status {
269 StatusCode::OK | StatusCode::PARTIAL_CONTENT | StatusCode::RANGE_NOT_SATISFIABLE => {
270 let mut meta = parse_into_metadata(path, resp.headers())?;
271 meta.set_content_length(
273 meta.content_range()
274 .expect("content range must be valid")
275 .size()
276 .expect("content range must contains size"),
277 );
278
279 Ok(RpStat::new(meta))
280 }
281 _ => Err(parse_error(resp)),
282 }
283 }
284
285 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
286 let location = self.core.ghac_get_download_url(path).await?;
287
288 let mut req = Request::get(location);
289
290 if !args.range().is_full() {
291 req = req.header(header::RANGE, args.range().to_header());
292 }
293 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
294
295 let resp = self.core.info.http_client().fetch(req).await?;
296
297 let status = resp.status();
298 match status {
299 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
300 Ok((RpRead::default(), resp.into_body()))
301 }
302 _ => {
303 let (part, mut body) = resp.into_parts();
304 let buf = body.to_buffer().await?;
305 Err(parse_error(Response::from_parts(part, buf)))
306 }
307 }
308 }
309
310 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
311 let url = self.core.ghac_get_upload_url(path).await?;
312
313 Ok((
314 RpWrite::default(),
315 GhacWriter::new(self.core.clone(), path.to_string(), url)?,
316 ))
317 }
318}