1use std::env;
19use std::fmt::{Debug, Formatter};
20use std::str::FromStr;
21use std::sync::Arc;
22
23use ::ghac::v1 as ghac_types;
24use bytes::{Buf, Bytes};
25use http::header::{self, ACCEPT, AUTHORIZATION, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE};
26use http::{Request, Response, StatusCode, Uri};
27use prost::Message;
28use serde::{Deserialize, Serialize};
29
30use super::error::parse_error;
31use crate::raw::*;
32use crate::*;
33
/// Path prefix appended to the cache service URL for every v1 (JSON) endpoint.
pub const CACHE_URL_BASE: &str = "_apis/artifactcache";
/// Path prefix appended to the cache service URL for every v2 (twirp/protobuf) endpoint.
pub const CACHE_URL_BASE_V2: &str = "twirp/github.actions.results.api.v1.CacheService";
/// Value of the `Accept` header sent with v1 requests.
pub const CACHE_HEADER_ACCEPT: &str = "application/json;api-version=6.0-preview.1";
/// Name of the environment variable that is read first when resolving the v1 service URL.
pub const ACTIONS_CACHE_URL: &str = "ACTIONS_CACHE_URL";
/// Name of an environment variable.
///
/// NOTE(review): presumably the source of the bearer token sent in the `Authorization`
/// header; it is not read in this part of the file, confirm against the builder.
pub const ACTIONS_RUNTIME_TOKEN: &str = "ACTIONS_RUNTIME_TOKEN";
/// Name of the environment variable whose non-empty value selects the v2 cache service.
pub const ACTIONS_CACHE_SERVICE_V2: &str = "ACTIONS_CACHE_SERVICE_V2";
/// Name of the environment variable holding the v2 service URL (also the v1 fallback).
pub const ACTIONS_RESULTS_URL: &str = "ACTIONS_RESULTS_URL";
/// `Content-Type` used for JSON request bodies (v1).
pub const CONTENT_TYPE_JSON: &str = "application/json";
/// `Content-Type` used for protobuf request bodies (v2).
pub const CONTENT_TYPE_PROTOBUF: &str = "application/protobuf";
57
/// Protocol version spoken by the GitHub Actions cache service.
///
/// The version decides which endpoints and which body encoding `GhacCore` uses.
/// It derives `PartialEq`/`Eq` so callers can compare versions directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GhacVersion {
    /// JSON API served under `CACHE_URL_BASE`.
    V1,
    /// Protobuf (twirp) API served under `CACHE_URL_BASE_V2`.
    V2,
}
64
/// Shared state used to talk to the GitHub Actions cache service.
#[derive(Clone)]
pub struct GhacCore {
    /// Accessor info; its HTTP client is used to send every request.
    pub info: Arc<AccessorInfo>,

    /// Root that relative paths are resolved against (via `build_abs_path`) to form cache keys.
    pub root: String,

    /// Base URL of the cache service; endpoint paths are appended to it verbatim.
    pub cache_url: String,
    /// Token sent as `Bearer <token>` in the `Authorization` header.
    pub catch_token: String,
    /// Cache version string sent alongside the key in query, reserve and finalize requests.
    pub version: String,

    /// Which protocol (v1 JSON or v2 protobuf) the methods of this core use.
    pub service_version: GhacVersion,
}
79
80impl Debug for GhacCore {
81 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
82 f.debug_struct("GhacCore")
83 .field("root", &self.root)
84 .field("cache_url", &self.cache_url)
85 .field("version", &self.version)
86 .field("service_version", &self.service_version)
87 .finish_non_exhaustive()
88 }
89}
90
91impl GhacCore {
92 async fn ghac_get_download_url(&self, path: &str) -> Result<String> {
93 let p = build_abs_path(&self.root, path);
94
95 match self.service_version {
96 GhacVersion::V1 => {
97 let url = format!(
98 "{}{CACHE_URL_BASE}/cache?keys={}&version={}",
99 self.cache_url,
100 percent_encode_path(&p),
101 self.version
102 );
103
104 let mut req = Request::get(&url);
105 req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
106 req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
107
108 let req = req
109 .extension(Operation::Read)
110 .body(Buffer::new())
111 .map_err(new_request_build_error)?;
112 let resp = self.info.http_client().send(req).await?;
113 let location = if resp.status() == StatusCode::OK {
114 let slc = resp.into_body();
115 let query_resp: GhacQueryResponse = serde_json::from_reader(slc.reader())
116 .map_err(new_json_deserialize_error)?;
117 query_resp.archive_location
118 } else {
119 return Err(parse_error(resp));
120 };
121 Ok(location)
122 }
123 GhacVersion::V2 => {
124 let url = format!(
125 "{}{CACHE_URL_BASE_V2}/GetCacheEntryDownloadURL",
126 self.cache_url,
127 );
128
129 let req = ghac_types::GetCacheEntryDownloadUrlRequest {
130 key: p,
131 version: self.version.clone(),
132
133 metadata: None,
134 restore_keys: vec![],
135 };
136
137 let body = Buffer::from(req.encode_to_vec());
138
139 let req = Request::post(&url)
140 .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
141 .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
142 .header(CONTENT_LENGTH, body.len())
143 .extension(Operation::Read)
144 .body(body)
145 .map_err(new_request_build_error)?;
146 let resp = self.info.http_client().send(req).await?;
147 let location = if resp.status() == StatusCode::OK {
148 let slc = resp.into_body();
149 let query_resp = ghac_types::GetCacheEntryDownloadUrlResponse::decode(slc)
150 .map_err(new_prost_decode_error)?;
151 if !query_resp.ok {
152 let mut err = Error::new(
153 ErrorKind::NotFound,
154 "GetCacheEntryDownloadURL returns non-ok, the key doesn't exist",
155 );
156
157 if env::var("OPENDAL_TEST") == Ok("ghac".to_string()) {
164 err = err.set_temporary();
165 }
166 return Err(err);
167 }
168 query_resp.signed_download_url
169 } else {
170 return Err(parse_error(resp));
171 };
172
173 Ok(location)
174 }
175 }
176 }
177
178 pub async fn ghac_stat(&self, path: &str) -> Result<Response<Buffer>> {
179 let location = self.ghac_get_download_url(path).await?;
180
181 let req = Request::get(location)
182 .header(header::RANGE, "bytes=0-0")
183 .extension(Operation::Stat)
184 .body(Buffer::new())
185 .map_err(new_request_build_error)?;
186
187 self.info.http_client().send(req).await
188 }
189
190 pub async fn ghac_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
191 let location = self.ghac_get_download_url(path).await?;
192
193 let mut req = Request::get(location);
194
195 if !range.is_full() {
196 req = req.header(header::RANGE, range.to_header());
197 }
198 let req = req
199 .extension(Operation::Read)
200 .body(Buffer::new())
201 .map_err(new_request_build_error)?;
202
203 self.info.http_client().fetch(req).await
204 }
205
206 pub async fn ghac_get_upload_url(&self, path: &str) -> Result<String> {
207 let p = build_abs_path(&self.root, path);
208
209 match self.service_version {
210 GhacVersion::V1 => {
211 let url = format!("{}{CACHE_URL_BASE}/caches", self.cache_url);
212
213 let bs = serde_json::to_vec(&GhacReserveRequest {
214 key: p,
215 version: self.version.to_string(),
216 })
217 .map_err(new_json_serialize_error)?;
218
219 let mut req = Request::post(&url);
220 req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
221 req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
222 req = req.header(CONTENT_TYPE, CONTENT_TYPE_JSON);
223 req = req.header(CONTENT_LENGTH, bs.len());
224
225 let req = req
226 .extension(Operation::Write)
227 .body(Buffer::from(Bytes::from(bs)))
228 .map_err(new_request_build_error)?;
229 let resp = self.info.http_client().send(req).await?;
230 let cache_id = if resp.status().is_success() {
231 let slc = resp.into_body();
232 let reserve_resp: GhacReserveResponse = serde_json::from_reader(slc.reader())
233 .map_err(new_json_deserialize_error)?;
234 reserve_resp.cache_id
235 } else {
236 return Err(
237 parse_error(resp).map(|err| err.with_operation("Backend::ghac_reserve"))
238 );
239 };
240
241 let url = format!("{}{CACHE_URL_BASE}/caches/{cache_id}", self.cache_url);
242 Ok(url)
243 }
244 GhacVersion::V2 => {
245 let url = format!("{}{CACHE_URL_BASE_V2}/CreateCacheEntry", self.cache_url,);
246
247 let req = ghac_types::CreateCacheEntryRequest {
248 key: p,
249 version: self.version.clone(),
250 metadata: None,
251 };
252
253 let body = Buffer::from(req.encode_to_vec());
254
255 let req = Request::post(&url)
256 .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
257 .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
258 .header(CONTENT_LENGTH, body.len())
259 .extension(Operation::Write)
260 .body(body)
261 .map_err(new_request_build_error)?;
262 let resp = self.info.http_client().send(req).await?;
263 let location = if resp.status() == StatusCode::OK {
264 let (parts, slc) = resp.into_parts();
265 let query_resp = ghac_types::CreateCacheEntryResponse::decode(slc)
266 .map_err(new_prost_decode_error)?;
267 if !query_resp.ok {
268 return Err(Error::new(
269 ErrorKind::Unexpected,
270 "create cache entry returns non-ok",
271 )
272 .with_context("parts", format!("{:?}", parts)));
273 }
274 query_resp.signed_upload_url
275 } else {
276 return Err(parse_error(resp));
277 };
278 Ok(location)
279 }
280 }
281 }
282
283 pub async fn ghac_v1_write(
284 &self,
285 upload_url: &str,
286 size: u64,
287 offset: u64,
288 body: Buffer,
289 ) -> Result<Response<Buffer>> {
290 let mut req = Request::patch(upload_url);
291 req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
292 req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
293 req = req.header(CONTENT_LENGTH, size);
294 req = req.header(CONTENT_TYPE, "application/octet-stream");
295 req = req.header(
296 CONTENT_RANGE,
297 BytesContentRange::default()
298 .with_range(offset, offset + size - 1)
299 .to_header(),
300 );
301 let req = req
302 .extension(Operation::Write)
303 .body(body)
304 .map_err(new_request_build_error)?;
305
306 self.info.http_client().send(req).await
307 }
308
309 pub async fn ghac_finalize_upload(&self, path: &str, url: &str, size: u64) -> Result<()> {
310 let p = build_abs_path(&self.root, path);
311
312 match self.service_version {
313 GhacVersion::V1 => {
314 let bs = serde_json::to_vec(&GhacCommitRequest { size })
315 .map_err(new_json_serialize_error)?;
316
317 let req = Request::post(url)
318 .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
319 .header(ACCEPT, CACHE_HEADER_ACCEPT)
320 .header(CONTENT_TYPE, CONTENT_TYPE_JSON)
321 .header(CONTENT_LENGTH, bs.len())
322 .extension(Operation::Write)
323 .body(Buffer::from(bs))
324 .map_err(new_request_build_error)?;
325 let resp = self.info.http_client().send(req).await?;
326 if resp.status().is_success() {
327 Ok(())
328 } else {
329 Err(parse_error(resp))
330 }
331 }
332 GhacVersion::V2 => {
333 let url = format!(
334 "{}{CACHE_URL_BASE_V2}/FinalizeCacheEntryUpload",
335 self.cache_url,
336 );
337
338 let req = ghac_types::FinalizeCacheEntryUploadRequest {
339 key: p,
340 version: self.version.clone(),
341 size_bytes: size as i64,
342
343 metadata: None,
344 };
345 let body = Buffer::from(req.encode_to_vec());
346
347 let req = Request::post(&url)
348 .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
349 .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
350 .header(CONTENT_LENGTH, body.len())
351 .extension(Operation::Write)
352 .body(body)
353 .map_err(new_request_build_error)?;
354 let resp = self.info.http_client().send(req).await?;
355 if resp.status() != StatusCode::OK {
356 return Err(parse_error(resp));
357 };
358 Ok(())
359 }
360 }
361 }
362}
363
364pub fn is_ghes() -> bool {
368 let server_url =
370 env::var("GITHUB_SERVER_URL").unwrap_or_else(|_| "https://github.com".to_string());
371
372 let Ok(url) = Uri::from_str(&server_url) else {
373 return false;
375 };
376
377 let hostname = url.host().unwrap_or("").trim_end().to_lowercase();
379
380 let is_github_host = hostname == "github.com";
381 let is_ghe_host = hostname.ends_with(".ghe.com");
382 let is_localhost = hostname.ends_with(".localhost");
383
384 !is_github_host && !is_ghe_host && !is_localhost
385}
386
387pub fn get_cache_service_version() -> GhacVersion {
389 if is_ghes() {
390 GhacVersion::V1
392 } else {
393 let value = env::var(ACTIONS_CACHE_SERVICE_V2).unwrap_or_default();
395 if value.is_empty() {
396 GhacVersion::V1
397 } else {
398 GhacVersion::V2
399 }
400 }
401}
402
403pub fn get_cache_service_url(version: GhacVersion) -> String {
405 match version {
406 GhacVersion::V1 => {
407 env::var(ACTIONS_CACHE_URL)
409 .or_else(|_| env::var(ACTIONS_RESULTS_URL))
410 .unwrap_or_default()
411 }
412 GhacVersion::V2 => {
413 env::var(ACTIONS_RESULTS_URL).unwrap_or_default()
415 }
416 }
417}
418
419pub fn new_prost_decode_error(e: prost::DecodeError) -> Error {
421 Error::new(ErrorKind::Unexpected, "deserialize protobuf").set_source(e)
422}
423
/// JSON body of a successful v1 cache query, decoded with camelCase field names.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GhacQueryResponse {
    /// URL the cache entry is downloaded from (`archiveLocation` on the wire).
    pub archive_location: String,
}
432
/// JSON body posted to the v1 `caches` endpoint to reserve a cache entry.
#[derive(Serialize)]
pub struct GhacReserveRequest {
    /// Cache key, i.e. the absolute path built from root and path.
    pub key: String,
    /// Cache version string of the entry.
    pub version: String,
}
438
/// JSON body of a successful v1 reserve call, decoded with camelCase field names.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GhacReserveResponse {
    /// Identifier of the reserved entry (`cacheId` on the wire); it becomes the last
    /// segment of the v1 upload URL.
    pub cache_id: i64,
}
444
/// JSON body posted to the v1 upload URL to finalize an upload.
#[derive(Serialize)]
pub struct GhacCommitRequest {
    /// Size passed by the caller of `ghac_finalize_upload`.
    pub size: u64,
}