opendal_core/services/ghac/
core.rs1use std::env;
19use std::fmt::Debug;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use ::ghac::v1 as ghac_types;
24use bytes::Buf;
25use bytes::Bytes;
26use http::Request;
27use http::Response;
28use http::StatusCode;
29use http::Uri;
30use http::header::ACCEPT;
31use http::header::AUTHORIZATION;
32use http::header::CONTENT_LENGTH;
33use http::header::CONTENT_RANGE;
34use http::header::CONTENT_TYPE;
35use http::header::{self};
36use prost::Message;
37use serde::Deserialize;
38use serde::Serialize;
39
40use super::error::parse_error;
41use crate::raw::*;
42use crate::*;
43
/// Path segment appended to the cache URL for the V1 endpoints
/// (`/cache` for lookups, `/caches` for reservations and uploads).
pub const CACHE_URL_BASE: &str = "_apis/artifactcache";
/// Path segment appended to the cache URL for the V2 RPC endpoints
/// (`GetCacheEntryDownloadURL`, `CreateCacheEntry`, `FinalizeCacheEntryUpload`).
pub const CACHE_URL_BASE_V2: &str = "twirp/github.actions.results.api.v1.CacheService";
/// Value sent in the `Accept` header of V1 requests.
pub const CACHE_HEADER_ACCEPT: &str = "application/json;api-version=6.0-preview.1";
/// Name of the environment variable consulted first for the V1 service URL.
pub const ACTIONS_CACHE_URL: &str = "ACTIONS_CACHE_URL";
/// Name of an environment variable; it is not read in this part of the file.
///
/// NOTE(review): presumably carries the bearer token stored in
/// `GhacCore::catch_token` — confirm against the builder.
pub const ACTIONS_RUNTIME_TOKEN: &str = "ACTIONS_RUNTIME_TOKEN";
/// Name of the environment variable whose non-empty value selects the V2
/// service (see `get_cache_service_version`).
pub const ACTIONS_CACHE_SERVICE_V2: &str = "ACTIONS_CACHE_SERVICE_V2";
/// Name of the environment variable holding the V2 service URL; also used as
/// the V1 fallback when `ACTIONS_CACHE_URL` is unavailable.
pub const ACTIONS_RESULTS_URL: &str = "ACTIONS_RESULTS_URL";
/// `Content-Type` used for the JSON request bodies of the V1 API.
pub const CONTENT_TYPE_JSON: &str = "application/json";
/// `Content-Type` used for the protobuf request bodies of the V2 API.
pub const CONTENT_TYPE_PROTOBUF: &str = "application/protobuf";
67
/// Generation of the GitHub Actions cache service protocol to speak.
///
/// The value decides which endpoints and wire format `GhacCore` uses.
/// `PartialEq`/`Eq` are derived so versions can be compared with `==` and
/// used in assertions.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GhacVersion {
    /// JSON API served under `_apis/artifactcache`.
    V1,
    /// Protobuf API served under
    /// `twirp/github.actions.results.api.v1.CacheService`.
    V2,
}
74
/// Shared state needed to issue requests against the GitHub Actions cache
/// service.
#[derive(Clone)]
pub struct GhacCore {
    /// Accessor info; its HTTP client is used to send every request.
    pub info: Arc<AccessorInfo>,

    /// Root joined with each caller-supplied path (via `build_abs_path`) to
    /// form the cache key.
    pub root: String,

    /// Base URL of the cache service. Endpoint paths are appended directly
    /// with no separator inserted.
    // NOTE(review): presumably this always ends with `/` — confirm.
    pub cache_url: String,
    /// Token sent as `Authorization: Bearer <token>` on service requests.
    // NOTE(review): the name looks like a typo for `cache_token`; it is a
    // public field, so it is left unchanged here.
    pub catch_token: String,
    /// Cache version string sent alongside the key in lookup and create
    /// requests.
    pub version: String,

    /// Protocol generation that selects the V1 or V2 code path.
    pub service_version: GhacVersion,
}
89
90impl Debug for GhacCore {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 f.debug_struct("GhacCore")
93 .field("root", &self.root)
94 .field("cache_url", &self.cache_url)
95 .field("version", &self.version)
96 .field("service_version", &self.service_version)
97 .finish_non_exhaustive()
98 }
99}
100
101impl GhacCore {
102 async fn ghac_get_download_url(&self, path: &str) -> Result<String> {
103 let p = build_abs_path(&self.root, path);
104
105 match self.service_version {
106 GhacVersion::V1 => {
107 let url = format!(
108 "{}{CACHE_URL_BASE}/cache?keys={}&version={}",
109 self.cache_url,
110 percent_encode_path(&p),
111 self.version
112 );
113
114 let mut req = Request::get(&url);
115 req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
116 req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
117
118 let req = req
119 .extension(Operation::Read)
120 .body(Buffer::new())
121 .map_err(new_request_build_error)?;
122 let resp = self.info.http_client().send(req).await?;
123 let location = if resp.status() == StatusCode::OK {
124 let slc = resp.into_body();
125 let query_resp: GhacQueryResponse = serde_json::from_reader(slc.reader())
126 .map_err(new_json_deserialize_error)?;
127 query_resp.archive_location
128 } else {
129 return Err(parse_error(resp));
130 };
131 Ok(location)
132 }
133 GhacVersion::V2 => {
134 let url = format!(
135 "{}{CACHE_URL_BASE_V2}/GetCacheEntryDownloadURL",
136 self.cache_url,
137 );
138
139 let req = ghac_types::GetCacheEntryDownloadUrlRequest {
140 key: p,
141 version: self.version.clone(),
142
143 metadata: None,
144 restore_keys: vec![],
145 };
146
147 let body = Buffer::from(req.encode_to_vec());
148
149 let req = Request::post(&url)
150 .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
151 .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
152 .header(CONTENT_LENGTH, body.len())
153 .extension(Operation::Read)
154 .body(body)
155 .map_err(new_request_build_error)?;
156 let resp = self.info.http_client().send(req).await?;
157 let location = if resp.status() == StatusCode::OK {
158 let slc = resp.into_body();
159 let query_resp = ghac_types::GetCacheEntryDownloadUrlResponse::decode(slc)
160 .map_err(new_prost_decode_error)?;
161 if !query_resp.ok {
162 let mut err = Error::new(
163 ErrorKind::NotFound,
164 "GetCacheEntryDownloadURL returns non-ok, the key doesn't exist",
165 );
166
167 if env::var("OPENDAL_TEST") == Ok("ghac".to_string()) {
174 err = err.set_temporary();
175 }
176 return Err(err);
177 }
178 query_resp.signed_download_url
179 } else {
180 return Err(parse_error(resp));
181 };
182
183 Ok(location)
184 }
185 }
186 }
187
188 pub async fn ghac_stat(&self, path: &str) -> Result<Response<Buffer>> {
189 let location = self.ghac_get_download_url(path).await?;
190
191 let req = Request::get(location)
192 .header(header::RANGE, "bytes=0-0")
193 .extension(Operation::Stat)
194 .body(Buffer::new())
195 .map_err(new_request_build_error)?;
196
197 self.info.http_client().send(req).await
198 }
199
200 pub async fn ghac_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
201 let location = self.ghac_get_download_url(path).await?;
202
203 let mut req = Request::get(location);
204
205 if !range.is_full() {
206 req = req.header(header::RANGE, range.to_header());
207 }
208 let req = req
209 .extension(Operation::Read)
210 .body(Buffer::new())
211 .map_err(new_request_build_error)?;
212
213 self.info.http_client().fetch(req).await
214 }
215
216 pub async fn ghac_get_upload_url(&self, path: &str) -> Result<String> {
217 let p = build_abs_path(&self.root, path);
218
219 match self.service_version {
220 GhacVersion::V1 => {
221 let url = format!("{}{CACHE_URL_BASE}/caches", self.cache_url);
222
223 let bs = serde_json::to_vec(&GhacReserveRequest {
224 key: p,
225 version: self.version.to_string(),
226 })
227 .map_err(new_json_serialize_error)?;
228
229 let mut req = Request::post(&url);
230 req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
231 req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
232 req = req.header(CONTENT_TYPE, CONTENT_TYPE_JSON);
233 req = req.header(CONTENT_LENGTH, bs.len());
234
235 let req = req
236 .extension(Operation::Write)
237 .body(Buffer::from(Bytes::from(bs)))
238 .map_err(new_request_build_error)?;
239 let resp = self.info.http_client().send(req).await?;
240 let cache_id = if resp.status().is_success() {
241 let slc = resp.into_body();
242 let reserve_resp: GhacReserveResponse = serde_json::from_reader(slc.reader())
243 .map_err(new_json_deserialize_error)?;
244 reserve_resp.cache_id
245 } else {
246 return Err(
247 parse_error(resp).map(|err| err.with_operation("Backend::ghac_reserve"))
248 );
249 };
250
251 let url = format!("{}{CACHE_URL_BASE}/caches/{cache_id}", self.cache_url);
252 Ok(url)
253 }
254 GhacVersion::V2 => {
255 let url = format!("{}{CACHE_URL_BASE_V2}/CreateCacheEntry", self.cache_url,);
256
257 let req = ghac_types::CreateCacheEntryRequest {
258 key: p,
259 version: self.version.clone(),
260 metadata: None,
261 };
262
263 let body = Buffer::from(req.encode_to_vec());
264
265 let req = Request::post(&url)
266 .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
267 .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
268 .header(CONTENT_LENGTH, body.len())
269 .extension(Operation::Write)
270 .body(body)
271 .map_err(new_request_build_error)?;
272 let resp = self.info.http_client().send(req).await?;
273 let location = if resp.status() == StatusCode::OK {
274 let (parts, slc) = resp.into_parts();
275 let query_resp = ghac_types::CreateCacheEntryResponse::decode(slc)
276 .map_err(new_prost_decode_error)?;
277 if !query_resp.ok {
278 return Err(Error::new(
279 ErrorKind::Unexpected,
280 "create cache entry returns non-ok",
281 )
282 .with_context("parts", format!("{parts:?}")));
283 }
284 query_resp.signed_upload_url
285 } else {
286 return Err(parse_error(resp));
287 };
288 Ok(location)
289 }
290 }
291 }
292
293 pub async fn ghac_v1_write(
294 &self,
295 upload_url: &str,
296 size: u64,
297 offset: u64,
298 body: Buffer,
299 ) -> Result<Response<Buffer>> {
300 let mut req = Request::patch(upload_url);
301 req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
302 req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
303 req = req.header(CONTENT_LENGTH, size);
304 req = req.header(CONTENT_TYPE, "application/octet-stream");
305 req = req.header(
306 CONTENT_RANGE,
307 BytesContentRange::default()
308 .with_range(offset, offset + size - 1)
309 .to_header(),
310 );
311 let req = req
312 .extension(Operation::Write)
313 .body(body)
314 .map_err(new_request_build_error)?;
315
316 self.info.http_client().send(req).await
317 }
318
319 pub async fn ghac_finalize_upload(&self, path: &str, url: &str, size: u64) -> Result<()> {
320 let p = build_abs_path(&self.root, path);
321
322 match self.service_version {
323 GhacVersion::V1 => {
324 let bs = serde_json::to_vec(&GhacCommitRequest { size })
325 .map_err(new_json_serialize_error)?;
326
327 let req = Request::post(url)
328 .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
329 .header(ACCEPT, CACHE_HEADER_ACCEPT)
330 .header(CONTENT_TYPE, CONTENT_TYPE_JSON)
331 .header(CONTENT_LENGTH, bs.len())
332 .extension(Operation::Write)
333 .body(Buffer::from(bs))
334 .map_err(new_request_build_error)?;
335 let resp = self.info.http_client().send(req).await?;
336 if resp.status().is_success() {
337 Ok(())
338 } else {
339 Err(parse_error(resp))
340 }
341 }
342 GhacVersion::V2 => {
343 let url = format!(
344 "{}{CACHE_URL_BASE_V2}/FinalizeCacheEntryUpload",
345 self.cache_url,
346 );
347
348 let req = ghac_types::FinalizeCacheEntryUploadRequest {
349 key: p,
350 version: self.version.clone(),
351 size_bytes: size as i64,
352
353 metadata: None,
354 };
355 let body = Buffer::from(req.encode_to_vec());
356
357 let req = Request::post(&url)
358 .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
359 .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
360 .header(CONTENT_LENGTH, body.len())
361 .extension(Operation::Write)
362 .body(body)
363 .map_err(new_request_build_error)?;
364 let resp = self.info.http_client().send(req).await?;
365 if resp.status() != StatusCode::OK {
366 return Err(parse_error(resp));
367 };
368 Ok(())
369 }
370 }
371 }
372}
373
374pub fn is_ghes() -> bool {
378 let server_url =
380 env::var("GITHUB_SERVER_URL").unwrap_or_else(|_| "https://github.com".to_string());
381
382 let Ok(url) = Uri::from_str(&server_url) else {
383 return false;
385 };
386
387 let hostname = url.host().unwrap_or("").trim_end().to_lowercase();
389
390 let is_github_host = hostname == "github.com";
391 let is_ghe_host = hostname.ends_with(".ghe.com");
392 let is_localhost = hostname.ends_with(".localhost");
393
394 !is_github_host && !is_ghe_host && !is_localhost
395}
396
397pub fn get_cache_service_version() -> GhacVersion {
399 if is_ghes() {
400 GhacVersion::V1
402 } else {
403 let value = env::var(ACTIONS_CACHE_SERVICE_V2).unwrap_or_default();
405 if value.is_empty() {
406 GhacVersion::V1
407 } else {
408 GhacVersion::V2
409 }
410 }
411}
412
413pub fn get_cache_service_url(version: GhacVersion) -> String {
415 match version {
416 GhacVersion::V1 => {
417 env::var(ACTIONS_CACHE_URL)
419 .or_else(|_| env::var(ACTIONS_RESULTS_URL))
420 .unwrap_or_default()
421 }
422 GhacVersion::V2 => {
423 env::var(ACTIONS_RESULTS_URL).unwrap_or_default()
425 }
426 }
427}
428
429pub fn new_prost_decode_error(e: prost::DecodeError) -> Error {
431 Error::new(ErrorKind::Unexpected, "deserialize protobuf").set_source(e)
432}
433
/// JSON reply of the V1 cache lookup, deserialized with camelCase field names.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GhacQueryResponse {
    /// Download URL of the cache entry (`archiveLocation` on the wire).
    pub archive_location: String,
}
442
/// JSON body of the V1 reservation request sent by `ghac_get_upload_url`.
#[derive(Serialize)]
pub struct GhacReserveRequest {
    /// Cache key: the absolute path built from the root and the caller's path.
    pub key: String,
    /// Cache version string taken from `GhacCore::version`.
    pub version: String,
}
448
/// JSON reply of the V1 reservation request, deserialized with camelCase
/// field names.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GhacReserveResponse {
    /// Identifier of the reserved entry (`cacheId` on the wire); it becomes
    /// the last path segment of the V1 upload URL.
    pub cache_id: i64,
}
454
/// JSON body of the V1 commit request sent by `ghac_finalize_upload`.
#[derive(Serialize)]
pub struct GhacCommitRequest {
    /// Size passed to `ghac_finalize_upload` by the caller.
    // NOTE(review): presumably the total uploaded size in bytes — confirm.
    pub size: u64,
}