opendal/services/ghac/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::env;
19use std::fmt::{Debug, Formatter};
20use std::str::FromStr;
21use std::sync::Arc;
22
23use ::ghac::v1 as ghac_types;
24use bytes::{Buf, Bytes};
25use http::header::{ACCEPT, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE};
26use http::{Request, StatusCode, Uri};
27use prost::Message;
28use serde::{Deserialize, Serialize};
29
30use super::error::parse_error;
31use crate::raw::*;
32use crate::*;
33
/// Path of the v1 cache REST API, appended directly to the cache service url.
pub const CACHE_URL_BASE: &str = "_apis/artifactcache";
/// Path prefix of the v2 twirp `CacheService`; method names are appended after a `/`.
pub const CACHE_URL_BASE_V2: &str = "twirp/github.actions.results.api.v1.CacheService";
/// `Accept` header value the v1 cache API requires on its requests.
pub const CACHE_HEADER_ACCEPT: &str = "application/json;api-version=6.0-preview.1";
/// Name of the env var holding the v1 cache url for ghac.
///
/// The url will be like `https://artifactcache.actions.githubusercontent.com/<id>/`
pub const ACTIONS_CACHE_URL: &str = "ACTIONS_CACHE_URL";
/// Name of the env var holding the runtime token for ghac.
///
/// The token is valid for 6 hours and a GitHub Actions job runs for at most
/// 6 hours, so it never needs to be refetched.
pub const ACTIONS_RUNTIME_TOKEN: &str = "ACTIONS_RUNTIME_TOKEN";
/// Name of the env var that opts into cache service v2 when set to a non-empty value.
pub const ACTIONS_CACHE_SERVICE_V2: &str = "ACTIONS_CACHE_SERVICE_V2";
/// Name of the env var holding the results service url for ghac.
pub const ACTIONS_RESULTS_URL: &str = "ACTIONS_RESULTS_URL";
/// The content type for json (used by the v1 API).
pub const CONTENT_TYPE_JSON: &str = "application/json";
/// The content type for protobuf (used by the v2 twirp API).
pub const CONTENT_TYPE_PROTOBUF: &str = "application/protobuf";
57
/// The version of the GitHub Actions cache service protocol to speak.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GhacVersion {
    /// REST/JSON API served under `_apis/artifactcache`.
    V1,
    /// Twirp/protobuf `CacheService` API.
    V2,
}
64
65/// Core for github action cache services.
66#[derive(Clone)]
67pub struct GhacCore {
68    pub info: Arc<AccessorInfo>,
69
70    // root should end with "/"
71    pub root: String,
72
73    pub cache_url: String,
74    pub catch_token: String,
75    pub version: String,
76
77    pub service_version: GhacVersion,
78}
79
80impl Debug for GhacCore {
81    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
82        f.debug_struct("GhacCore")
83            .field("root", &self.root)
84            .field("cache_url", &self.cache_url)
85            .field("version", &self.version)
86            .field("service_version", &self.service_version)
87            .finish_non_exhaustive()
88    }
89}
90
91impl GhacCore {
92    pub async fn ghac_get_download_url(&self, path: &str) -> Result<String> {
93        let p = build_abs_path(&self.root, path);
94
95        match self.service_version {
96            GhacVersion::V1 => {
97                let url = format!(
98                    "{}{CACHE_URL_BASE}/cache?keys={}&version={}",
99                    self.cache_url,
100                    percent_encode_path(&p),
101                    self.version
102                );
103
104                let mut req = Request::get(&url);
105                req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
106                req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
107
108                let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
109                let resp = self.info.http_client().send(req).await?;
110                let location = if resp.status() == StatusCode::OK {
111                    let slc = resp.into_body();
112                    let query_resp: GhacQueryResponse = serde_json::from_reader(slc.reader())
113                        .map_err(new_json_deserialize_error)?;
114                    query_resp.archive_location
115                } else {
116                    return Err(parse_error(resp));
117                };
118                Ok(location)
119            }
120            GhacVersion::V2 => {
121                let url = format!(
122                    "{}{CACHE_URL_BASE_V2}/GetCacheEntryDownloadURL",
123                    self.cache_url,
124                );
125
126                let req = ghac_types::GetCacheEntryDownloadUrlRequest {
127                    key: p,
128                    version: self.version.clone(),
129
130                    metadata: None,
131                    restore_keys: vec![],
132                };
133
134                let body = Buffer::from(req.encode_to_vec());
135
136                let req = Request::post(&url)
137                    .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
138                    .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
139                    .header(CONTENT_LENGTH, body.len())
140                    .body(body)
141                    .map_err(new_request_build_error)?;
142                let resp = self.info.http_client().send(req).await?;
143                let location = if resp.status() == StatusCode::OK {
144                    let slc = resp.into_body();
145                    let query_resp = ghac_types::GetCacheEntryDownloadUrlResponse::decode(slc)
146                        .map_err(new_prost_decode_error)?;
147                    if !query_resp.ok {
148                        let mut err = Error::new(
149                            ErrorKind::NotFound,
150                            "GetCacheEntryDownloadURL returns non-ok, the key doesn't exist",
151                        );
152
153                        // GHAC is a cache service, so it's acceptable for it to occasionally not contain
154                        // data that users have just written. However, we don't want users to always have
155                        // to retry reading it, nor do we want our CI to fail constantly.
156                        //
157                        // Here's the trick: we check if the environment variable `OPENDAL_TEST` is set to `ghac`.
158                        // If it is, we mark the error as temporary to allow retries in the test CI.
159                        if env::var("OPENDAL_TEST") == Ok("ghac".to_string()) {
160                            err = err.set_temporary();
161                        }
162                        return Err(err);
163                    }
164                    query_resp.signed_download_url
165                } else {
166                    return Err(parse_error(resp));
167                };
168
169                Ok(location)
170            }
171        }
172    }
173
174    pub async fn ghac_get_upload_url(&self, path: &str) -> Result<String> {
175        let p = build_abs_path(&self.root, path);
176
177        match self.service_version {
178            GhacVersion::V1 => {
179                let url = format!("{}{CACHE_URL_BASE}/caches", self.cache_url);
180
181                let bs = serde_json::to_vec(&GhacReserveRequest {
182                    key: p,
183                    version: self.version.to_string(),
184                })
185                .map_err(new_json_serialize_error)?;
186
187                let mut req = Request::post(&url);
188                req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
189                req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
190                req = req.header(CONTENT_TYPE, CONTENT_TYPE_JSON);
191                req = req.header(CONTENT_LENGTH, bs.len());
192
193                let req = req
194                    .body(Buffer::from(Bytes::from(bs)))
195                    .map_err(new_request_build_error)?;
196                let resp = self.info.http_client().send(req).await?;
197                let cache_id = if resp.status().is_success() {
198                    let slc = resp.into_body();
199                    let reserve_resp: GhacReserveResponse = serde_json::from_reader(slc.reader())
200                        .map_err(new_json_deserialize_error)?;
201                    reserve_resp.cache_id
202                } else {
203                    return Err(
204                        parse_error(resp).map(|err| err.with_operation("Backend::ghac_reserve"))
205                    );
206                };
207
208                let url = format!("{}{CACHE_URL_BASE}/caches/{cache_id}", self.cache_url);
209                Ok(url)
210            }
211            GhacVersion::V2 => {
212                let url = format!("{}{CACHE_URL_BASE_V2}/CreateCacheEntry", self.cache_url,);
213
214                let req = ghac_types::CreateCacheEntryRequest {
215                    key: p,
216                    version: self.version.clone(),
217                    metadata: None,
218                };
219
220                let body = Buffer::from(req.encode_to_vec());
221
222                let req = Request::post(&url)
223                    .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
224                    .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
225                    .header(CONTENT_LENGTH, body.len())
226                    .body(body)
227                    .map_err(new_request_build_error)?;
228                let resp = self.info.http_client().send(req).await?;
229                let location = if resp.status() == StatusCode::OK {
230                    let (parts, slc) = resp.into_parts();
231                    let query_resp = ghac_types::CreateCacheEntryResponse::decode(slc)
232                        .map_err(new_prost_decode_error)?;
233                    if !query_resp.ok {
234                        return Err(Error::new(
235                            ErrorKind::Unexpected,
236                            "create cache entry returns non-ok",
237                        )
238                        .with_context("parts", format!("{:?}", parts)));
239                    }
240                    query_resp.signed_upload_url
241                } else {
242                    return Err(parse_error(resp));
243                };
244                Ok(location)
245            }
246        }
247    }
248
249    pub async fn ghac_finalize_upload(&self, path: &str, url: &str, size: u64) -> Result<()> {
250        let p = build_abs_path(&self.root, path);
251
252        match self.service_version {
253            GhacVersion::V1 => {
254                let bs = serde_json::to_vec(&GhacCommitRequest { size })
255                    .map_err(new_json_serialize_error)?;
256
257                let req = Request::post(url)
258                    .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
259                    .header(ACCEPT, CACHE_HEADER_ACCEPT)
260                    .header(CONTENT_TYPE, CONTENT_TYPE_JSON)
261                    .header(CONTENT_LENGTH, bs.len())
262                    .body(Buffer::from(bs))
263                    .map_err(new_request_build_error)?;
264                let resp = self.info.http_client().send(req).await?;
265                if resp.status().is_success() {
266                    Ok(())
267                } else {
268                    Err(parse_error(resp))
269                }
270            }
271            GhacVersion::V2 => {
272                let url = format!(
273                    "{}{CACHE_URL_BASE_V2}/FinalizeCacheEntryUpload",
274                    self.cache_url,
275                );
276
277                let req = ghac_types::FinalizeCacheEntryUploadRequest {
278                    key: p,
279                    version: self.version.clone(),
280                    size_bytes: size as i64,
281
282                    metadata: None,
283                };
284                let body = Buffer::from(req.encode_to_vec());
285
286                let req = Request::post(&url)
287                    .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
288                    .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
289                    .header(CONTENT_LENGTH, body.len())
290                    .body(body)
291                    .map_err(new_request_build_error)?;
292                let resp = self.info.http_client().send(req).await?;
293                if resp.status() != StatusCode::OK {
294                    return Err(parse_error(resp));
295                };
296                Ok(())
297            }
298        }
299    }
300}
301
302/// Determines if the current environment is GitHub Enterprise Server (GHES)
303///
304/// We need to know this since GHES doesn't support ghac v2 yet.
305pub fn is_ghes() -> bool {
306    // Fetch GitHub Server URL with fallback to "https://github.com"
307    let server_url =
308        env::var("GITHUB_SERVER_URL").unwrap_or_else(|_| "https://github.com".to_string());
309
310    let Ok(url) = Uri::from_str(&server_url) else {
311        // We just return false if the URL is invalid
312        return false;
313    };
314
315    // Check against known non-GHES host patterns
316    let hostname = url.host().unwrap_or("").trim_end().to_lowercase();
317
318    let is_github_host = hostname == "github.com";
319    let is_ghe_host = hostname.ends_with(".ghe.com");
320    let is_localhost = hostname.ends_with(".localhost");
321
322    !is_github_host && !is_ghe_host && !is_localhost
323}
324
325/// Determines the cache service version based on environment
326pub fn get_cache_service_version() -> GhacVersion {
327    if is_ghes() {
328        // GHES only supports v1 regardless of feature flags
329        GhacVersion::V1
330    } else {
331        // Check for presence of non-empty ACTIONS_CACHE_SERVICE_V2
332        let value = env::var(ACTIONS_CACHE_SERVICE_V2).unwrap_or_default();
333        if value.is_empty() {
334            GhacVersion::V1
335        } else {
336            GhacVersion::V2
337        }
338    }
339}
340
341/// Returns the appropriate cache service URL based on version
342pub fn get_cache_service_url(version: GhacVersion) -> String {
343    match version {
344        GhacVersion::V1 => {
345            // Priority order for v1: CACHE_URL -> RESULTS_URL
346            env::var(ACTIONS_CACHE_URL)
347                .or_else(|_| env::var(ACTIONS_RESULTS_URL))
348                .unwrap_or_default()
349        }
350        GhacVersion::V2 => {
351            // Only RESULTS_URL is used for v2
352            env::var(ACTIONS_RESULTS_URL).unwrap_or_default()
353        }
354    }
355}
356
357/// Parse prost decode error into opendal::Error.
358pub fn new_prost_decode_error(e: prost::DecodeError) -> Error {
359    Error::new(ErrorKind::Unexpected, "deserialize protobuf").set_source(e)
360}
361
362#[derive(Deserialize)]
363#[serde(rename_all = "camelCase")]
364pub struct GhacQueryResponse {
365    // Not used fields.
366    // cache_key: String,
367    // scope: String,
368    pub archive_location: String,
369}
370
371#[derive(Serialize)]
372pub struct GhacReserveRequest {
373    pub key: String,
374    pub version: String,
375}
376
377#[derive(Deserialize)]
378#[serde(rename_all = "camelCase")]
379pub struct GhacReserveResponse {
380    pub cache_id: i64,
381}
382
383#[derive(Serialize)]
384pub struct GhacCommitRequest {
385    pub size: u64,
386}