1use std::env;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use ::ghac::v1 as ghac_types;
25use bytes::Buf;
26use bytes::Bytes;
27use http::header::ACCEPT;
28use http::header::AUTHORIZATION;
29use http::header::CONTENT_LENGTH;
30use http::header::CONTENT_RANGE;
31use http::header::CONTENT_TYPE;
32use http::header::{self};
33use http::Request;
34use http::Response;
35use http::StatusCode;
36use http::Uri;
37use prost::Message;
38use serde::Deserialize;
39use serde::Serialize;
40
41use super::error::parse_error;
42use crate::raw::*;
43use crate::*;
44
45pub const CACHE_URL_BASE: &str = "_apis/artifactcache";
47pub const CACHE_URL_BASE_V2: &str = "twirp/github.actions.results.api.v1.CacheService";
49pub const CACHE_HEADER_ACCEPT: &str = "application/json;api-version=6.0-preview.1";
51pub const ACTIONS_CACHE_URL: &str = "ACTIONS_CACHE_URL";
55pub const ACTIONS_RUNTIME_TOKEN: &str = "ACTIONS_RUNTIME_TOKEN";
60pub const ACTIONS_CACHE_SERVICE_V2: &str = "ACTIONS_CACHE_SERVICE_V2";
62pub const ACTIONS_RESULTS_URL: &str = "ACTIONS_RESULTS_URL";
64pub const CONTENT_TYPE_JSON: &str = "application/json";
66pub const CONTENT_TYPE_PROTOBUF: &str = "application/protobuf";
68
69#[derive(Clone, Copy, Debug)]
71pub enum GhacVersion {
72 V1,
73 V2,
74}
75
76#[derive(Clone)]
78pub struct GhacCore {
79 pub info: Arc<AccessorInfo>,
80
81 pub root: String,
83
84 pub cache_url: String,
85 pub catch_token: String,
86 pub version: String,
87
88 pub service_version: GhacVersion,
89}
90
91impl Debug for GhacCore {
92 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
93 f.debug_struct("GhacCore")
94 .field("root", &self.root)
95 .field("cache_url", &self.cache_url)
96 .field("version", &self.version)
97 .field("service_version", &self.service_version)
98 .finish_non_exhaustive()
99 }
100}
101
102impl GhacCore {
103 async fn ghac_get_download_url(&self, path: &str) -> Result<String> {
104 let p = build_abs_path(&self.root, path);
105
106 match self.service_version {
107 GhacVersion::V1 => {
108 let url = format!(
109 "{}{CACHE_URL_BASE}/cache?keys={}&version={}",
110 self.cache_url,
111 percent_encode_path(&p),
112 self.version
113 );
114
115 let mut req = Request::get(&url);
116 req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
117 req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
118
119 let req = req
120 .extension(Operation::Read)
121 .body(Buffer::new())
122 .map_err(new_request_build_error)?;
123 let resp = self.info.http_client().send(req).await?;
124 let location = if resp.status() == StatusCode::OK {
125 let slc = resp.into_body();
126 let query_resp: GhacQueryResponse = serde_json::from_reader(slc.reader())
127 .map_err(new_json_deserialize_error)?;
128 query_resp.archive_location
129 } else {
130 return Err(parse_error(resp));
131 };
132 Ok(location)
133 }
134 GhacVersion::V2 => {
135 let url = format!(
136 "{}{CACHE_URL_BASE_V2}/GetCacheEntryDownloadURL",
137 self.cache_url,
138 );
139
140 let req = ghac_types::GetCacheEntryDownloadUrlRequest {
141 key: p,
142 version: self.version.clone(),
143
144 metadata: None,
145 restore_keys: vec![],
146 };
147
148 let body = Buffer::from(req.encode_to_vec());
149
150 let req = Request::post(&url)
151 .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
152 .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
153 .header(CONTENT_LENGTH, body.len())
154 .extension(Operation::Read)
155 .body(body)
156 .map_err(new_request_build_error)?;
157 let resp = self.info.http_client().send(req).await?;
158 let location = if resp.status() == StatusCode::OK {
159 let slc = resp.into_body();
160 let query_resp = ghac_types::GetCacheEntryDownloadUrlResponse::decode(slc)
161 .map_err(new_prost_decode_error)?;
162 if !query_resp.ok {
163 let mut err = Error::new(
164 ErrorKind::NotFound,
165 "GetCacheEntryDownloadURL returns non-ok, the key doesn't exist",
166 );
167
168 if env::var("OPENDAL_TEST") == Ok("ghac".to_string()) {
175 err = err.set_temporary();
176 }
177 return Err(err);
178 }
179 query_resp.signed_download_url
180 } else {
181 return Err(parse_error(resp));
182 };
183
184 Ok(location)
185 }
186 }
187 }
188
189 pub async fn ghac_stat(&self, path: &str) -> Result<Response<Buffer>> {
190 let location = self.ghac_get_download_url(path).await?;
191
192 let req = Request::get(location)
193 .header(header::RANGE, "bytes=0-0")
194 .extension(Operation::Stat)
195 .body(Buffer::new())
196 .map_err(new_request_build_error)?;
197
198 self.info.http_client().send(req).await
199 }
200
201 pub async fn ghac_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
202 let location = self.ghac_get_download_url(path).await?;
203
204 let mut req = Request::get(location);
205
206 if !range.is_full() {
207 req = req.header(header::RANGE, range.to_header());
208 }
209 let req = req
210 .extension(Operation::Read)
211 .body(Buffer::new())
212 .map_err(new_request_build_error)?;
213
214 self.info.http_client().fetch(req).await
215 }
216
217 pub async fn ghac_get_upload_url(&self, path: &str) -> Result<String> {
218 let p = build_abs_path(&self.root, path);
219
220 match self.service_version {
221 GhacVersion::V1 => {
222 let url = format!("{}{CACHE_URL_BASE}/caches", self.cache_url);
223
224 let bs = serde_json::to_vec(&GhacReserveRequest {
225 key: p,
226 version: self.version.to_string(),
227 })
228 .map_err(new_json_serialize_error)?;
229
230 let mut req = Request::post(&url);
231 req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
232 req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
233 req = req.header(CONTENT_TYPE, CONTENT_TYPE_JSON);
234 req = req.header(CONTENT_LENGTH, bs.len());
235
236 let req = req
237 .extension(Operation::Write)
238 .body(Buffer::from(Bytes::from(bs)))
239 .map_err(new_request_build_error)?;
240 let resp = self.info.http_client().send(req).await?;
241 let cache_id = if resp.status().is_success() {
242 let slc = resp.into_body();
243 let reserve_resp: GhacReserveResponse = serde_json::from_reader(slc.reader())
244 .map_err(new_json_deserialize_error)?;
245 reserve_resp.cache_id
246 } else {
247 return Err(
248 parse_error(resp).map(|err| err.with_operation("Backend::ghac_reserve"))
249 );
250 };
251
252 let url = format!("{}{CACHE_URL_BASE}/caches/{cache_id}", self.cache_url);
253 Ok(url)
254 }
255 GhacVersion::V2 => {
256 let url = format!("{}{CACHE_URL_BASE_V2}/CreateCacheEntry", self.cache_url,);
257
258 let req = ghac_types::CreateCacheEntryRequest {
259 key: p,
260 version: self.version.clone(),
261 metadata: None,
262 };
263
264 let body = Buffer::from(req.encode_to_vec());
265
266 let req = Request::post(&url)
267 .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
268 .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
269 .header(CONTENT_LENGTH, body.len())
270 .extension(Operation::Write)
271 .body(body)
272 .map_err(new_request_build_error)?;
273 let resp = self.info.http_client().send(req).await?;
274 let location = if resp.status() == StatusCode::OK {
275 let (parts, slc) = resp.into_parts();
276 let query_resp = ghac_types::CreateCacheEntryResponse::decode(slc)
277 .map_err(new_prost_decode_error)?;
278 if !query_resp.ok {
279 return Err(Error::new(
280 ErrorKind::Unexpected,
281 "create cache entry returns non-ok",
282 )
283 .with_context("parts", format!("{:?}", parts)));
284 }
285 query_resp.signed_upload_url
286 } else {
287 return Err(parse_error(resp));
288 };
289 Ok(location)
290 }
291 }
292 }
293
294 pub async fn ghac_v1_write(
295 &self,
296 upload_url: &str,
297 size: u64,
298 offset: u64,
299 body: Buffer,
300 ) -> Result<Response<Buffer>> {
301 let mut req = Request::patch(upload_url);
302 req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
303 req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
304 req = req.header(CONTENT_LENGTH, size);
305 req = req.header(CONTENT_TYPE, "application/octet-stream");
306 req = req.header(
307 CONTENT_RANGE,
308 BytesContentRange::default()
309 .with_range(offset, offset + size - 1)
310 .to_header(),
311 );
312 let req = req
313 .extension(Operation::Write)
314 .body(body)
315 .map_err(new_request_build_error)?;
316
317 self.info.http_client().send(req).await
318 }
319
320 pub async fn ghac_finalize_upload(&self, path: &str, url: &str, size: u64) -> Result<()> {
321 let p = build_abs_path(&self.root, path);
322
323 match self.service_version {
324 GhacVersion::V1 => {
325 let bs = serde_json::to_vec(&GhacCommitRequest { size })
326 .map_err(new_json_serialize_error)?;
327
328 let req = Request::post(url)
329 .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
330 .header(ACCEPT, CACHE_HEADER_ACCEPT)
331 .header(CONTENT_TYPE, CONTENT_TYPE_JSON)
332 .header(CONTENT_LENGTH, bs.len())
333 .extension(Operation::Write)
334 .body(Buffer::from(bs))
335 .map_err(new_request_build_error)?;
336 let resp = self.info.http_client().send(req).await?;
337 if resp.status().is_success() {
338 Ok(())
339 } else {
340 Err(parse_error(resp))
341 }
342 }
343 GhacVersion::V2 => {
344 let url = format!(
345 "{}{CACHE_URL_BASE_V2}/FinalizeCacheEntryUpload",
346 self.cache_url,
347 );
348
349 let req = ghac_types::FinalizeCacheEntryUploadRequest {
350 key: p,
351 version: self.version.clone(),
352 size_bytes: size as i64,
353
354 metadata: None,
355 };
356 let body = Buffer::from(req.encode_to_vec());
357
358 let req = Request::post(&url)
359 .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
360 .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
361 .header(CONTENT_LENGTH, body.len())
362 .extension(Operation::Write)
363 .body(body)
364 .map_err(new_request_build_error)?;
365 let resp = self.info.http_client().send(req).await?;
366 if resp.status() != StatusCode::OK {
367 return Err(parse_error(resp));
368 };
369 Ok(())
370 }
371 }
372 }
373}
374
375pub fn is_ghes() -> bool {
379 let server_url =
381 env::var("GITHUB_SERVER_URL").unwrap_or_else(|_| "https://github.com".to_string());
382
383 let Ok(url) = Uri::from_str(&server_url) else {
384 return false;
386 };
387
388 let hostname = url.host().unwrap_or("").trim_end().to_lowercase();
390
391 let is_github_host = hostname == "github.com";
392 let is_ghe_host = hostname.ends_with(".ghe.com");
393 let is_localhost = hostname.ends_with(".localhost");
394
395 !is_github_host && !is_ghe_host && !is_localhost
396}
397
398pub fn get_cache_service_version() -> GhacVersion {
400 if is_ghes() {
401 GhacVersion::V1
403 } else {
404 let value = env::var(ACTIONS_CACHE_SERVICE_V2).unwrap_or_default();
406 if value.is_empty() {
407 GhacVersion::V1
408 } else {
409 GhacVersion::V2
410 }
411 }
412}
413
414pub fn get_cache_service_url(version: GhacVersion) -> String {
416 match version {
417 GhacVersion::V1 => {
418 env::var(ACTIONS_CACHE_URL)
420 .or_else(|_| env::var(ACTIONS_RESULTS_URL))
421 .unwrap_or_default()
422 }
423 GhacVersion::V2 => {
424 env::var(ACTIONS_RESULTS_URL).unwrap_or_default()
426 }
427 }
428}
429
430pub fn new_prost_decode_error(e: prost::DecodeError) -> Error {
432 Error::new(ErrorKind::Unexpected, "deserialize protobuf").set_source(e)
433}
434
435#[derive(Deserialize)]
436#[serde(rename_all = "camelCase")]
437pub struct GhacQueryResponse {
438 pub archive_location: String,
442}
443
444#[derive(Serialize)]
445pub struct GhacReserveRequest {
446 pub key: String,
447 pub version: String,
448}
449
450#[derive(Deserialize)]
451#[serde(rename_all = "camelCase")]
452pub struct GhacReserveResponse {
453 pub cache_id: i64,
454}
455
456#[derive(Serialize)]
457pub struct GhacCommitRequest {
458 pub size: u64,
459}