opendal/services/ghac/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::env;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use ::ghac::v1 as ghac_types;
25use bytes::Buf;
26use bytes::Bytes;
27use http::header::ACCEPT;
28use http::header::AUTHORIZATION;
29use http::header::CONTENT_LENGTH;
30use http::header::CONTENT_RANGE;
31use http::header::CONTENT_TYPE;
32use http::header::{self};
33use http::Request;
34use http::Response;
35use http::StatusCode;
36use http::Uri;
37use prost::Message;
38use serde::Deserialize;
39use serde::Serialize;
40
41use super::error::parse_error;
42use crate::raw::*;
43use crate::*;
44
/// Base path of the v1 cache service API, appended directly to the cache service url.
pub const CACHE_URL_BASE: &str = "_apis/artifactcache";
/// Base path of the v2 (twirp) cache service API, appended directly to the cache service url.
pub const CACHE_URL_BASE_V2: &str = "twirp/github.actions.results.api.v1.CacheService";
/// The v1 cache API requires this value in the `Accept` header.
pub const CACHE_HEADER_ACCEPT: &str = "application/json;api-version=6.0-preview.1";
/// The env var holding the cache service url for ghac v1.
///
/// The url will be like `https://artifactcache.actions.githubusercontent.com/<id>/`
pub const ACTIONS_CACHE_URL: &str = "ACTIONS_CACHE_URL";
/// The env var holding the runtime token for ghac.
///
/// This token is valid for 6 hours, and a GitHub Actions job runs for at most
/// 6 hours, so we never need to refetch it.
pub const ACTIONS_RUNTIME_TOKEN: &str = "ACTIONS_RUNTIME_TOKEN";
/// The env var that selects the v2 cache service when it holds a non-empty value
/// (ignored on GitHub Enterprise Server, which only supports v1).
pub const ACTIONS_CACHE_SERVICE_V2: &str = "ACTIONS_CACHE_SERVICE_V2";
/// The env var holding the results service url, used by v2 and as a v1 fallback.
pub const ACTIONS_RESULTS_URL: &str = "ACTIONS_RESULTS_URL";
/// The content type for json.
pub const CONTENT_TYPE_JSON: &str = "application/json";
/// The content type for protobuf.
pub const CONTENT_TYPE_PROTOBUF: &str = "application/protobuf";
68
/// The version of the github action cache service protocol.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GhacVersion {
    /// Legacy JSON REST API under `_apis/artifactcache`.
    V1,
    /// Twirp/protobuf API under `twirp/github.actions.results.api.v1.CacheService`.
    V2,
}
75
76/// Core for github action cache services.
77#[derive(Clone)]
78pub struct GhacCore {
79    pub info: Arc<AccessorInfo>,
80
81    // root should end with "/"
82    pub root: String,
83
84    pub cache_url: String,
85    pub catch_token: String,
86    pub version: String,
87
88    pub service_version: GhacVersion,
89}
90
91impl Debug for GhacCore {
92    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
93        f.debug_struct("GhacCore")
94            .field("root", &self.root)
95            .field("cache_url", &self.cache_url)
96            .field("version", &self.version)
97            .field("service_version", &self.service_version)
98            .finish_non_exhaustive()
99    }
100}
101
102impl GhacCore {
103    async fn ghac_get_download_url(&self, path: &str) -> Result<String> {
104        let p = build_abs_path(&self.root, path);
105
106        match self.service_version {
107            GhacVersion::V1 => {
108                let url = format!(
109                    "{}{CACHE_URL_BASE}/cache?keys={}&version={}",
110                    self.cache_url,
111                    percent_encode_path(&p),
112                    self.version
113                );
114
115                let mut req = Request::get(&url);
116                req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
117                req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
118
119                let req = req
120                    .extension(Operation::Read)
121                    .body(Buffer::new())
122                    .map_err(new_request_build_error)?;
123                let resp = self.info.http_client().send(req).await?;
124                let location = if resp.status() == StatusCode::OK {
125                    let slc = resp.into_body();
126                    let query_resp: GhacQueryResponse = serde_json::from_reader(slc.reader())
127                        .map_err(new_json_deserialize_error)?;
128                    query_resp.archive_location
129                } else {
130                    return Err(parse_error(resp));
131                };
132                Ok(location)
133            }
134            GhacVersion::V2 => {
135                let url = format!(
136                    "{}{CACHE_URL_BASE_V2}/GetCacheEntryDownloadURL",
137                    self.cache_url,
138                );
139
140                let req = ghac_types::GetCacheEntryDownloadUrlRequest {
141                    key: p,
142                    version: self.version.clone(),
143
144                    metadata: None,
145                    restore_keys: vec![],
146                };
147
148                let body = Buffer::from(req.encode_to_vec());
149
150                let req = Request::post(&url)
151                    .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
152                    .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
153                    .header(CONTENT_LENGTH, body.len())
154                    .extension(Operation::Read)
155                    .body(body)
156                    .map_err(new_request_build_error)?;
157                let resp = self.info.http_client().send(req).await?;
158                let location = if resp.status() == StatusCode::OK {
159                    let slc = resp.into_body();
160                    let query_resp = ghac_types::GetCacheEntryDownloadUrlResponse::decode(slc)
161                        .map_err(new_prost_decode_error)?;
162                    if !query_resp.ok {
163                        let mut err = Error::new(
164                            ErrorKind::NotFound,
165                            "GetCacheEntryDownloadURL returns non-ok, the key doesn't exist",
166                        );
167
168                        // GHAC is a cache service, so it's acceptable for it to occasionally not contain
169                        // data that users have just written. However, we don't want users to always have
170                        // to retry reading it, nor do we want our CI to fail constantly.
171                        //
172                        // Here's the trick: we check if the environment variable `OPENDAL_TEST` is set to `ghac`.
173                        // If it is, we mark the error as temporary to allow retries in the test CI.
174                        if env::var("OPENDAL_TEST") == Ok("ghac".to_string()) {
175                            err = err.set_temporary();
176                        }
177                        return Err(err);
178                    }
179                    query_resp.signed_download_url
180                } else {
181                    return Err(parse_error(resp));
182                };
183
184                Ok(location)
185            }
186        }
187    }
188
189    pub async fn ghac_stat(&self, path: &str) -> Result<Response<Buffer>> {
190        let location = self.ghac_get_download_url(path).await?;
191
192        let req = Request::get(location)
193            .header(header::RANGE, "bytes=0-0")
194            .extension(Operation::Stat)
195            .body(Buffer::new())
196            .map_err(new_request_build_error)?;
197
198        self.info.http_client().send(req).await
199    }
200
201    pub async fn ghac_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
202        let location = self.ghac_get_download_url(path).await?;
203
204        let mut req = Request::get(location);
205
206        if !range.is_full() {
207            req = req.header(header::RANGE, range.to_header());
208        }
209        let req = req
210            .extension(Operation::Read)
211            .body(Buffer::new())
212            .map_err(new_request_build_error)?;
213
214        self.info.http_client().fetch(req).await
215    }
216
217    pub async fn ghac_get_upload_url(&self, path: &str) -> Result<String> {
218        let p = build_abs_path(&self.root, path);
219
220        match self.service_version {
221            GhacVersion::V1 => {
222                let url = format!("{}{CACHE_URL_BASE}/caches", self.cache_url);
223
224                let bs = serde_json::to_vec(&GhacReserveRequest {
225                    key: p,
226                    version: self.version.to_string(),
227                })
228                .map_err(new_json_serialize_error)?;
229
230                let mut req = Request::post(&url);
231                req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
232                req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
233                req = req.header(CONTENT_TYPE, CONTENT_TYPE_JSON);
234                req = req.header(CONTENT_LENGTH, bs.len());
235
236                let req = req
237                    .extension(Operation::Write)
238                    .body(Buffer::from(Bytes::from(bs)))
239                    .map_err(new_request_build_error)?;
240                let resp = self.info.http_client().send(req).await?;
241                let cache_id = if resp.status().is_success() {
242                    let slc = resp.into_body();
243                    let reserve_resp: GhacReserveResponse = serde_json::from_reader(slc.reader())
244                        .map_err(new_json_deserialize_error)?;
245                    reserve_resp.cache_id
246                } else {
247                    return Err(
248                        parse_error(resp).map(|err| err.with_operation("Backend::ghac_reserve"))
249                    );
250                };
251
252                let url = format!("{}{CACHE_URL_BASE}/caches/{cache_id}", self.cache_url);
253                Ok(url)
254            }
255            GhacVersion::V2 => {
256                let url = format!("{}{CACHE_URL_BASE_V2}/CreateCacheEntry", self.cache_url,);
257
258                let req = ghac_types::CreateCacheEntryRequest {
259                    key: p,
260                    version: self.version.clone(),
261                    metadata: None,
262                };
263
264                let body = Buffer::from(req.encode_to_vec());
265
266                let req = Request::post(&url)
267                    .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
268                    .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
269                    .header(CONTENT_LENGTH, body.len())
270                    .extension(Operation::Write)
271                    .body(body)
272                    .map_err(new_request_build_error)?;
273                let resp = self.info.http_client().send(req).await?;
274                let location = if resp.status() == StatusCode::OK {
275                    let (parts, slc) = resp.into_parts();
276                    let query_resp = ghac_types::CreateCacheEntryResponse::decode(slc)
277                        .map_err(new_prost_decode_error)?;
278                    if !query_resp.ok {
279                        return Err(Error::new(
280                            ErrorKind::Unexpected,
281                            "create cache entry returns non-ok",
282                        )
283                        .with_context("parts", format!("{:?}", parts)));
284                    }
285                    query_resp.signed_upload_url
286                } else {
287                    return Err(parse_error(resp));
288                };
289                Ok(location)
290            }
291        }
292    }
293
294    pub async fn ghac_v1_write(
295        &self,
296        upload_url: &str,
297        size: u64,
298        offset: u64,
299        body: Buffer,
300    ) -> Result<Response<Buffer>> {
301        let mut req = Request::patch(upload_url);
302        req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
303        req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
304        req = req.header(CONTENT_LENGTH, size);
305        req = req.header(CONTENT_TYPE, "application/octet-stream");
306        req = req.header(
307            CONTENT_RANGE,
308            BytesContentRange::default()
309                .with_range(offset, offset + size - 1)
310                .to_header(),
311        );
312        let req = req
313            .extension(Operation::Write)
314            .body(body)
315            .map_err(new_request_build_error)?;
316
317        self.info.http_client().send(req).await
318    }
319
320    pub async fn ghac_finalize_upload(&self, path: &str, url: &str, size: u64) -> Result<()> {
321        let p = build_abs_path(&self.root, path);
322
323        match self.service_version {
324            GhacVersion::V1 => {
325                let bs = serde_json::to_vec(&GhacCommitRequest { size })
326                    .map_err(new_json_serialize_error)?;
327
328                let req = Request::post(url)
329                    .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
330                    .header(ACCEPT, CACHE_HEADER_ACCEPT)
331                    .header(CONTENT_TYPE, CONTENT_TYPE_JSON)
332                    .header(CONTENT_LENGTH, bs.len())
333                    .extension(Operation::Write)
334                    .body(Buffer::from(bs))
335                    .map_err(new_request_build_error)?;
336                let resp = self.info.http_client().send(req).await?;
337                if resp.status().is_success() {
338                    Ok(())
339                } else {
340                    Err(parse_error(resp))
341                }
342            }
343            GhacVersion::V2 => {
344                let url = format!(
345                    "{}{CACHE_URL_BASE_V2}/FinalizeCacheEntryUpload",
346                    self.cache_url,
347                );
348
349                let req = ghac_types::FinalizeCacheEntryUploadRequest {
350                    key: p,
351                    version: self.version.clone(),
352                    size_bytes: size as i64,
353
354                    metadata: None,
355                };
356                let body = Buffer::from(req.encode_to_vec());
357
358                let req = Request::post(&url)
359                    .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
360                    .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
361                    .header(CONTENT_LENGTH, body.len())
362                    .extension(Operation::Write)
363                    .body(body)
364                    .map_err(new_request_build_error)?;
365                let resp = self.info.http_client().send(req).await?;
366                if resp.status() != StatusCode::OK {
367                    return Err(parse_error(resp));
368                };
369                Ok(())
370            }
371        }
372    }
373}
374
375/// Determines if the current environment is GitHub Enterprise Server (GHES)
376///
377/// We need to know this since GHES doesn't support ghac v2 yet.
378pub fn is_ghes() -> bool {
379    // Fetch GitHub Server URL with fallback to "https://github.com"
380    let server_url =
381        env::var("GITHUB_SERVER_URL").unwrap_or_else(|_| "https://github.com".to_string());
382
383    let Ok(url) = Uri::from_str(&server_url) else {
384        // We just return false if the URL is invalid
385        return false;
386    };
387
388    // Check against known non-GHES host patterns
389    let hostname = url.host().unwrap_or("").trim_end().to_lowercase();
390
391    let is_github_host = hostname == "github.com";
392    let is_ghe_host = hostname.ends_with(".ghe.com");
393    let is_localhost = hostname.ends_with(".localhost");
394
395    !is_github_host && !is_ghe_host && !is_localhost
396}
397
398/// Determines the cache service version based on environment
399pub fn get_cache_service_version() -> GhacVersion {
400    if is_ghes() {
401        // GHES only supports v1 regardless of feature flags
402        GhacVersion::V1
403    } else {
404        // Check for presence of non-empty ACTIONS_CACHE_SERVICE_V2
405        let value = env::var(ACTIONS_CACHE_SERVICE_V2).unwrap_or_default();
406        if value.is_empty() {
407            GhacVersion::V1
408        } else {
409            GhacVersion::V2
410        }
411    }
412}
413
414/// Returns the appropriate cache service URL based on version
415pub fn get_cache_service_url(version: GhacVersion) -> String {
416    match version {
417        GhacVersion::V1 => {
418            // Priority order for v1: CACHE_URL -> RESULTS_URL
419            env::var(ACTIONS_CACHE_URL)
420                .or_else(|_| env::var(ACTIONS_RESULTS_URL))
421                .unwrap_or_default()
422        }
423        GhacVersion::V2 => {
424            // Only RESULTS_URL is used for v2
425            env::var(ACTIONS_RESULTS_URL).unwrap_or_default()
426        }
427    }
428}
429
430/// Parse prost decode error into opendal::Error.
431pub fn new_prost_decode_error(e: prost::DecodeError) -> Error {
432    Error::new(ErrorKind::Unexpected, "deserialize protobuf").set_source(e)
433}
434
435#[derive(Deserialize)]
436#[serde(rename_all = "camelCase")]
437pub struct GhacQueryResponse {
438    // Not used fields.
439    // cache_key: String,
440    // scope: String,
441    pub archive_location: String,
442}
443
444#[derive(Serialize)]
445pub struct GhacReserveRequest {
446    pub key: String,
447    pub version: String,
448}
449
450#[derive(Deserialize)]
451#[serde(rename_all = "camelCase")]
452pub struct GhacReserveResponse {
453    pub cache_id: i64,
454}
455
456#[derive(Serialize)]
457pub struct GhacCommitRequest {
458    pub size: u64,
459}