1use std::env;
19use std::fmt::{Debug, Formatter};
20use std::str::FromStr;
21use std::sync::Arc;
22
23use ::ghac::v1 as ghac_types;
24use bytes::{Buf, Bytes};
25use http::header::{ACCEPT, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE};
26use http::{Request, StatusCode, Uri};
27use prost::Message;
28use serde::{Deserialize, Serialize};
29
30use super::error::parse_error;
31use crate::raw::*;
32use crate::*;
33
34pub const CACHE_URL_BASE: &str = "_apis/artifactcache";
36pub const CACHE_URL_BASE_V2: &str = "twirp/github.actions.results.api.v1.CacheService";
38pub const CACHE_HEADER_ACCEPT: &str = "application/json;api-version=6.0-preview.1";
40pub const ACTIONS_CACHE_URL: &str = "ACTIONS_CACHE_URL";
44pub const ACTIONS_RUNTIME_TOKEN: &str = "ACTIONS_RUNTIME_TOKEN";
49pub const ACTIONS_CACHE_SERVICE_V2: &str = "ACTIONS_CACHE_SERVICE_V2";
51pub const ACTIONS_RESULTS_URL: &str = "ACTIONS_RESULTS_URL";
53pub const CONTENT_TYPE_JSON: &str = "application/json";
55pub const CONTENT_TYPE_PROTOBUF: &str = "application/protobuf";
57
58#[derive(Clone, Copy, Debug)]
60pub enum GhacVersion {
61 V1,
62 V2,
63}
64
65#[derive(Clone)]
67pub struct GhacCore {
68 pub info: Arc<AccessorInfo>,
69
70 pub root: String,
72
73 pub cache_url: String,
74 pub catch_token: String,
75 pub version: String,
76
77 pub service_version: GhacVersion,
78}
79
80impl Debug for GhacCore {
81 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
82 f.debug_struct("GhacCore")
83 .field("root", &self.root)
84 .field("cache_url", &self.cache_url)
85 .field("version", &self.version)
86 .field("service_version", &self.service_version)
87 .finish_non_exhaustive()
88 }
89}
90
91impl GhacCore {
92 pub async fn ghac_get_download_url(&self, path: &str) -> Result<String> {
93 let p = build_abs_path(&self.root, path);
94
95 match self.service_version {
96 GhacVersion::V1 => {
97 let url = format!(
98 "{}{CACHE_URL_BASE}/cache?keys={}&version={}",
99 self.cache_url,
100 percent_encode_path(&p),
101 self.version
102 );
103
104 let mut req = Request::get(&url);
105 req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
106 req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
107
108 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
109 let resp = self.info.http_client().send(req).await?;
110 let location = if resp.status() == StatusCode::OK {
111 let slc = resp.into_body();
112 let query_resp: GhacQueryResponse = serde_json::from_reader(slc.reader())
113 .map_err(new_json_deserialize_error)?;
114 query_resp.archive_location
115 } else {
116 return Err(parse_error(resp));
117 };
118 Ok(location)
119 }
120 GhacVersion::V2 => {
121 let url = format!(
122 "{}{CACHE_URL_BASE_V2}/GetCacheEntryDownloadURL",
123 self.cache_url,
124 );
125
126 let req = ghac_types::GetCacheEntryDownloadUrlRequest {
127 key: p,
128 version: self.version.clone(),
129
130 metadata: None,
131 restore_keys: vec![],
132 };
133
134 let body = Buffer::from(req.encode_to_vec());
135
136 let req = Request::post(&url)
137 .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
138 .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
139 .header(CONTENT_LENGTH, body.len())
140 .body(body)
141 .map_err(new_request_build_error)?;
142 let resp = self.info.http_client().send(req).await?;
143 let location = if resp.status() == StatusCode::OK {
144 let slc = resp.into_body();
145 let query_resp = ghac_types::GetCacheEntryDownloadUrlResponse::decode(slc)
146 .map_err(new_prost_decode_error)?;
147 if !query_resp.ok {
148 let mut err = Error::new(
149 ErrorKind::NotFound,
150 "GetCacheEntryDownloadURL returns non-ok, the key doesn't exist",
151 );
152
153 if env::var("OPENDAL_TEST") == Ok("ghac".to_string()) {
160 err = err.set_temporary();
161 }
162 return Err(err);
163 }
164 query_resp.signed_download_url
165 } else {
166 return Err(parse_error(resp));
167 };
168
169 Ok(location)
170 }
171 }
172 }
173
174 pub async fn ghac_get_upload_url(&self, path: &str) -> Result<String> {
175 let p = build_abs_path(&self.root, path);
176
177 match self.service_version {
178 GhacVersion::V1 => {
179 let url = format!("{}{CACHE_URL_BASE}/caches", self.cache_url);
180
181 let bs = serde_json::to_vec(&GhacReserveRequest {
182 key: p,
183 version: self.version.to_string(),
184 })
185 .map_err(new_json_serialize_error)?;
186
187 let mut req = Request::post(&url);
188 req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
189 req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
190 req = req.header(CONTENT_TYPE, CONTENT_TYPE_JSON);
191 req = req.header(CONTENT_LENGTH, bs.len());
192
193 let req = req
194 .body(Buffer::from(Bytes::from(bs)))
195 .map_err(new_request_build_error)?;
196 let resp = self.info.http_client().send(req).await?;
197 let cache_id = if resp.status().is_success() {
198 let slc = resp.into_body();
199 let reserve_resp: GhacReserveResponse = serde_json::from_reader(slc.reader())
200 .map_err(new_json_deserialize_error)?;
201 reserve_resp.cache_id
202 } else {
203 return Err(
204 parse_error(resp).map(|err| err.with_operation("Backend::ghac_reserve"))
205 );
206 };
207
208 let url = format!("{}{CACHE_URL_BASE}/caches/{cache_id}", self.cache_url);
209 Ok(url)
210 }
211 GhacVersion::V2 => {
212 let url = format!("{}{CACHE_URL_BASE_V2}/CreateCacheEntry", self.cache_url,);
213
214 let req = ghac_types::CreateCacheEntryRequest {
215 key: p,
216 version: self.version.clone(),
217 metadata: None,
218 };
219
220 let body = Buffer::from(req.encode_to_vec());
221
222 let req = Request::post(&url)
223 .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
224 .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
225 .header(CONTENT_LENGTH, body.len())
226 .body(body)
227 .map_err(new_request_build_error)?;
228 let resp = self.info.http_client().send(req).await?;
229 let location = if resp.status() == StatusCode::OK {
230 let (parts, slc) = resp.into_parts();
231 let query_resp = ghac_types::CreateCacheEntryResponse::decode(slc)
232 .map_err(new_prost_decode_error)?;
233 if !query_resp.ok {
234 return Err(Error::new(
235 ErrorKind::Unexpected,
236 "create cache entry returns non-ok",
237 )
238 .with_context("parts", format!("{:?}", parts)));
239 }
240 query_resp.signed_upload_url
241 } else {
242 return Err(parse_error(resp));
243 };
244 Ok(location)
245 }
246 }
247 }
248
249 pub async fn ghac_finalize_upload(&self, path: &str, url: &str, size: u64) -> Result<()> {
250 let p = build_abs_path(&self.root, path);
251
252 match self.service_version {
253 GhacVersion::V1 => {
254 let bs = serde_json::to_vec(&GhacCommitRequest { size })
255 .map_err(new_json_serialize_error)?;
256
257 let req = Request::post(url)
258 .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
259 .header(ACCEPT, CACHE_HEADER_ACCEPT)
260 .header(CONTENT_TYPE, CONTENT_TYPE_JSON)
261 .header(CONTENT_LENGTH, bs.len())
262 .body(Buffer::from(bs))
263 .map_err(new_request_build_error)?;
264 let resp = self.info.http_client().send(req).await?;
265 if resp.status().is_success() {
266 Ok(())
267 } else {
268 Err(parse_error(resp))
269 }
270 }
271 GhacVersion::V2 => {
272 let url = format!(
273 "{}{CACHE_URL_BASE_V2}/FinalizeCacheEntryUpload",
274 self.cache_url,
275 );
276
277 let req = ghac_types::FinalizeCacheEntryUploadRequest {
278 key: p,
279 version: self.version.clone(),
280 size_bytes: size as i64,
281
282 metadata: None,
283 };
284 let body = Buffer::from(req.encode_to_vec());
285
286 let req = Request::post(&url)
287 .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
288 .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
289 .header(CONTENT_LENGTH, body.len())
290 .body(body)
291 .map_err(new_request_build_error)?;
292 let resp = self.info.http_client().send(req).await?;
293 if resp.status() != StatusCode::OK {
294 return Err(parse_error(resp));
295 };
296 Ok(())
297 }
298 }
299 }
300}
301
302pub fn is_ghes() -> bool {
306 let server_url =
308 env::var("GITHUB_SERVER_URL").unwrap_or_else(|_| "https://github.com".to_string());
309
310 let Ok(url) = Uri::from_str(&server_url) else {
311 return false;
313 };
314
315 let hostname = url.host().unwrap_or("").trim_end().to_lowercase();
317
318 let is_github_host = hostname == "github.com";
319 let is_ghe_host = hostname.ends_with(".ghe.com");
320 let is_localhost = hostname.ends_with(".localhost");
321
322 !is_github_host && !is_ghe_host && !is_localhost
323}
324
325pub fn get_cache_service_version() -> GhacVersion {
327 if is_ghes() {
328 GhacVersion::V1
330 } else {
331 let value = env::var(ACTIONS_CACHE_SERVICE_V2).unwrap_or_default();
333 if value.is_empty() {
334 GhacVersion::V1
335 } else {
336 GhacVersion::V2
337 }
338 }
339}
340
341pub fn get_cache_service_url(version: GhacVersion) -> String {
343 match version {
344 GhacVersion::V1 => {
345 env::var(ACTIONS_CACHE_URL)
347 .or_else(|_| env::var(ACTIONS_RESULTS_URL))
348 .unwrap_or_default()
349 }
350 GhacVersion::V2 => {
351 env::var(ACTIONS_RESULTS_URL).unwrap_or_default()
353 }
354 }
355}
356
357pub fn new_prost_decode_error(e: prost::DecodeError) -> Error {
359 Error::new(ErrorKind::Unexpected, "deserialize protobuf").set_source(e)
360}
361
362#[derive(Deserialize)]
363#[serde(rename_all = "camelCase")]
364pub struct GhacQueryResponse {
365 pub archive_location: String,
369}
370
371#[derive(Serialize)]
372pub struct GhacReserveRequest {
373 pub key: String,
374 pub version: String,
375}
376
377#[derive(Deserialize)]
378#[serde(rename_all = "camelCase")]
379pub struct GhacReserveResponse {
380 pub cache_id: i64,
381}
382
383#[derive(Serialize)]
384pub struct GhacCommitRequest {
385 pub size: u64,
386}