opendal_core/services/ghac/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::env;
19use std::fmt::Debug;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use ::ghac::v1 as ghac_types;
24use bytes::Buf;
25use bytes::Bytes;
26use http::Request;
27use http::Response;
28use http::StatusCode;
29use http::Uri;
30use http::header::ACCEPT;
31use http::header::AUTHORIZATION;
32use http::header::CONTENT_LENGTH;
33use http::header::CONTENT_RANGE;
34use http::header::CONTENT_TYPE;
35use http::header::{self};
36use prost::Message;
37use serde::Deserialize;
38use serde::Serialize;
39
40use super::error::parse_error;
41use crate::raw::*;
42use crate::*;
43
/// The base url for cache url.
///
/// Appended to the cache url when talking to the v1 cache service.
pub const CACHE_URL_BASE: &str = "_apis/artifactcache";
/// The base url for cache service v2.
///
/// Appended to the cache url, followed by `/<MethodName>`, when talking to
/// the v2 cache service.
pub const CACHE_URL_BASE_V2: &str = "twirp/github.actions.results.api.v1.CacheService";
/// Cache API requires to provide an accept header.
pub const CACHE_HEADER_ACCEPT: &str = "application/json;api-version=6.0-preview.1";
/// The cache url env for ghac.
///
/// The url will be like `https://artifactcache.actions.githubusercontent.com/<id>/`
pub const ACTIONS_CACHE_URL: &str = "ACTIONS_CACHE_URL";
/// The runtime token env for ghac.
///
/// This token is valid for 6 hours, and a GitHub Actions job runs for at
/// most 6 hours, so we don't need to refetch it.
pub const ACTIONS_RUNTIME_TOKEN: &str = "ACTIONS_RUNTIME_TOKEN";
/// The cache service version env for ghac.
///
/// A non-empty value selects the v2 cache service.
pub const ACTIONS_CACHE_SERVICE_V2: &str = "ACTIONS_CACHE_SERVICE_V2";
/// The results url env for ghac.
pub const ACTIONS_RESULTS_URL: &str = "ACTIONS_RESULTS_URL";
/// The content type for JSON.
pub const CONTENT_TYPE_JSON: &str = "application/json";
/// The content type for protobuf.
pub const CONTENT_TYPE_PROTOBUF: &str = "application/protobuf";
67
/// The version of github action cache.
///
/// Selects which wire protocol is used when talking to the cache service.
/// The type is a plain tag, so it is `Copy` and can be compared with `==`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GhacVersion {
    /// The original cache service: JSON requests under `_apis/artifactcache`.
    V1,
    /// The v2 cache service: protobuf requests under the twirp
    /// `github.actions.results.api.v1.CacheService` endpoint.
    V2,
}
74
/// Core for github action cache services.
///
/// Holds everything needed to build and send requests to the cache service:
/// the endpoint, the credential, the cache version string and the protocol
/// version to speak.
#[derive(Clone)]
pub struct GhacCore {
    /// Shared accessor info; its HTTP client is used to send every request.
    pub info: Arc<AccessorInfo>,

    /// Root that every path is resolved against to form the cache key.
    ///
    /// root should end with "/"
    pub root: String,

    /// Base url of the cache service; the API path is appended to it directly.
    pub cache_url: String,
    /// Token sent as `Authorization: Bearer <token>`.
    ///
    /// Deliberately left out of the `Debug` output.
    pub catch_token: String,
    /// Cache version string sent together with the key in every lookup,
    /// reservation and finalize request.
    pub version: String,

    /// Which cache service protocol (v1 or v2) the requests are built for.
    pub service_version: GhacVersion,
}
89
90impl Debug for GhacCore {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        f.debug_struct("GhacCore")
93            .field("root", &self.root)
94            .field("cache_url", &self.cache_url)
95            .field("version", &self.version)
96            .field("service_version", &self.service_version)
97            .finish_non_exhaustive()
98    }
99}
100
101impl GhacCore {
102    async fn ghac_get_download_url(&self, path: &str) -> Result<String> {
103        let p = build_abs_path(&self.root, path);
104
105        match self.service_version {
106            GhacVersion::V1 => {
107                let url = format!(
108                    "{}{CACHE_URL_BASE}/cache?keys={}&version={}",
109                    self.cache_url,
110                    percent_encode_path(&p),
111                    self.version
112                );
113
114                let mut req = Request::get(&url);
115                req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
116                req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
117
118                let req = req
119                    .extension(Operation::Read)
120                    .body(Buffer::new())
121                    .map_err(new_request_build_error)?;
122                let resp = self.info.http_client().send(req).await?;
123                let location = if resp.status() == StatusCode::OK {
124                    let slc = resp.into_body();
125                    let query_resp: GhacQueryResponse = serde_json::from_reader(slc.reader())
126                        .map_err(new_json_deserialize_error)?;
127                    query_resp.archive_location
128                } else {
129                    return Err(parse_error(resp));
130                };
131                Ok(location)
132            }
133            GhacVersion::V2 => {
134                let url = format!(
135                    "{}{CACHE_URL_BASE_V2}/GetCacheEntryDownloadURL",
136                    self.cache_url,
137                );
138
139                let req = ghac_types::GetCacheEntryDownloadUrlRequest {
140                    key: p,
141                    version: self.version.clone(),
142
143                    metadata: None,
144                    restore_keys: vec![],
145                };
146
147                let body = Buffer::from(req.encode_to_vec());
148
149                let req = Request::post(&url)
150                    .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
151                    .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
152                    .header(CONTENT_LENGTH, body.len())
153                    .extension(Operation::Read)
154                    .body(body)
155                    .map_err(new_request_build_error)?;
156                let resp = self.info.http_client().send(req).await?;
157                let location = if resp.status() == StatusCode::OK {
158                    let slc = resp.into_body();
159                    let query_resp = ghac_types::GetCacheEntryDownloadUrlResponse::decode(slc)
160                        .map_err(new_prost_decode_error)?;
161                    if !query_resp.ok {
162                        let mut err = Error::new(
163                            ErrorKind::NotFound,
164                            "GetCacheEntryDownloadURL returns non-ok, the key doesn't exist",
165                        );
166
167                        // GHAC is a cache service, so it's acceptable for it to occasionally not contain
168                        // data that users have just written. However, we don't want users to always have
169                        // to retry reading it, nor do we want our CI to fail constantly.
170                        //
171                        // Here's the trick: we check if the environment variable `OPENDAL_TEST` is set to `ghac`.
172                        // If it is, we mark the error as temporary to allow retries in the test CI.
173                        if env::var("OPENDAL_TEST") == Ok("ghac".to_string()) {
174                            err = err.set_temporary();
175                        }
176                        return Err(err);
177                    }
178                    query_resp.signed_download_url
179                } else {
180                    return Err(parse_error(resp));
181                };
182
183                Ok(location)
184            }
185        }
186    }
187
188    pub async fn ghac_stat(&self, path: &str) -> Result<Response<Buffer>> {
189        let location = self.ghac_get_download_url(path).await?;
190
191        let req = Request::get(location)
192            .header(header::RANGE, "bytes=0-0")
193            .extension(Operation::Stat)
194            .body(Buffer::new())
195            .map_err(new_request_build_error)?;
196
197        self.info.http_client().send(req).await
198    }
199
200    pub async fn ghac_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
201        let location = self.ghac_get_download_url(path).await?;
202
203        let mut req = Request::get(location);
204
205        if !range.is_full() {
206            req = req.header(header::RANGE, range.to_header());
207        }
208        let req = req
209            .extension(Operation::Read)
210            .body(Buffer::new())
211            .map_err(new_request_build_error)?;
212
213        self.info.http_client().fetch(req).await
214    }
215
216    pub async fn ghac_get_upload_url(&self, path: &str) -> Result<String> {
217        let p = build_abs_path(&self.root, path);
218
219        match self.service_version {
220            GhacVersion::V1 => {
221                let url = format!("{}{CACHE_URL_BASE}/caches", self.cache_url);
222
223                let bs = serde_json::to_vec(&GhacReserveRequest {
224                    key: p,
225                    version: self.version.to_string(),
226                })
227                .map_err(new_json_serialize_error)?;
228
229                let mut req = Request::post(&url);
230                req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
231                req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
232                req = req.header(CONTENT_TYPE, CONTENT_TYPE_JSON);
233                req = req.header(CONTENT_LENGTH, bs.len());
234
235                let req = req
236                    .extension(Operation::Write)
237                    .body(Buffer::from(Bytes::from(bs)))
238                    .map_err(new_request_build_error)?;
239                let resp = self.info.http_client().send(req).await?;
240                let cache_id = if resp.status().is_success() {
241                    let slc = resp.into_body();
242                    let reserve_resp: GhacReserveResponse = serde_json::from_reader(slc.reader())
243                        .map_err(new_json_deserialize_error)?;
244                    reserve_resp.cache_id
245                } else {
246                    return Err(
247                        parse_error(resp).map(|err| err.with_operation("Backend::ghac_reserve"))
248                    );
249                };
250
251                let url = format!("{}{CACHE_URL_BASE}/caches/{cache_id}", self.cache_url);
252                Ok(url)
253            }
254            GhacVersion::V2 => {
255                let url = format!("{}{CACHE_URL_BASE_V2}/CreateCacheEntry", self.cache_url,);
256
257                let req = ghac_types::CreateCacheEntryRequest {
258                    key: p,
259                    version: self.version.clone(),
260                    metadata: None,
261                };
262
263                let body = Buffer::from(req.encode_to_vec());
264
265                let req = Request::post(&url)
266                    .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
267                    .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
268                    .header(CONTENT_LENGTH, body.len())
269                    .extension(Operation::Write)
270                    .body(body)
271                    .map_err(new_request_build_error)?;
272                let resp = self.info.http_client().send(req).await?;
273                let location = if resp.status() == StatusCode::OK {
274                    let (parts, slc) = resp.into_parts();
275                    let query_resp = ghac_types::CreateCacheEntryResponse::decode(slc)
276                        .map_err(new_prost_decode_error)?;
277                    if !query_resp.ok {
278                        return Err(Error::new(
279                            ErrorKind::Unexpected,
280                            "create cache entry returns non-ok",
281                        )
282                        .with_context("parts", format!("{parts:?}")));
283                    }
284                    query_resp.signed_upload_url
285                } else {
286                    return Err(parse_error(resp));
287                };
288                Ok(location)
289            }
290        }
291    }
292
293    pub async fn ghac_v1_write(
294        &self,
295        upload_url: &str,
296        size: u64,
297        offset: u64,
298        body: Buffer,
299    ) -> Result<Response<Buffer>> {
300        let mut req = Request::patch(upload_url);
301        req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
302        req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
303        req = req.header(CONTENT_LENGTH, size);
304        req = req.header(CONTENT_TYPE, "application/octet-stream");
305        req = req.header(
306            CONTENT_RANGE,
307            BytesContentRange::default()
308                .with_range(offset, offset + size - 1)
309                .to_header(),
310        );
311        let req = req
312            .extension(Operation::Write)
313            .body(body)
314            .map_err(new_request_build_error)?;
315
316        self.info.http_client().send(req).await
317    }
318
319    pub async fn ghac_finalize_upload(&self, path: &str, url: &str, size: u64) -> Result<()> {
320        let p = build_abs_path(&self.root, path);
321
322        match self.service_version {
323            GhacVersion::V1 => {
324                let bs = serde_json::to_vec(&GhacCommitRequest { size })
325                    .map_err(new_json_serialize_error)?;
326
327                let req = Request::post(url)
328                    .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
329                    .header(ACCEPT, CACHE_HEADER_ACCEPT)
330                    .header(CONTENT_TYPE, CONTENT_TYPE_JSON)
331                    .header(CONTENT_LENGTH, bs.len())
332                    .extension(Operation::Write)
333                    .body(Buffer::from(bs))
334                    .map_err(new_request_build_error)?;
335                let resp = self.info.http_client().send(req).await?;
336                if resp.status().is_success() {
337                    Ok(())
338                } else {
339                    Err(parse_error(resp))
340                }
341            }
342            GhacVersion::V2 => {
343                let url = format!(
344                    "{}{CACHE_URL_BASE_V2}/FinalizeCacheEntryUpload",
345                    self.cache_url,
346                );
347
348                let req = ghac_types::FinalizeCacheEntryUploadRequest {
349                    key: p,
350                    version: self.version.clone(),
351                    size_bytes: size as i64,
352
353                    metadata: None,
354                };
355                let body = Buffer::from(req.encode_to_vec());
356
357                let req = Request::post(&url)
358                    .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
359                    .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
360                    .header(CONTENT_LENGTH, body.len())
361                    .extension(Operation::Write)
362                    .body(body)
363                    .map_err(new_request_build_error)?;
364                let resp = self.info.http_client().send(req).await?;
365                if resp.status() != StatusCode::OK {
366                    return Err(parse_error(resp));
367                };
368                Ok(())
369            }
370        }
371    }
372}
373
374/// Determines if the current environment is GitHub Enterprise Server (GHES)
375///
376/// We need to know this since GHES doesn't support ghac v2 yet.
377pub fn is_ghes() -> bool {
378    // Fetch GitHub Server URL with fallback to "https://github.com"
379    let server_url =
380        env::var("GITHUB_SERVER_URL").unwrap_or_else(|_| "https://github.com".to_string());
381
382    let Ok(url) = Uri::from_str(&server_url) else {
383        // We just return false if the URL is invalid
384        return false;
385    };
386
387    // Check against known non-GHES host patterns
388    let hostname = url.host().unwrap_or("").trim_end().to_lowercase();
389
390    let is_github_host = hostname == "github.com";
391    let is_ghe_host = hostname.ends_with(".ghe.com");
392    let is_localhost = hostname.ends_with(".localhost");
393
394    !is_github_host && !is_ghe_host && !is_localhost
395}
396
397/// Determines the cache service version based on environment
398pub fn get_cache_service_version() -> GhacVersion {
399    if is_ghes() {
400        // GHES only supports v1 regardless of feature flags
401        GhacVersion::V1
402    } else {
403        // Check for presence of non-empty ACTIONS_CACHE_SERVICE_V2
404        let value = env::var(ACTIONS_CACHE_SERVICE_V2).unwrap_or_default();
405        if value.is_empty() {
406            GhacVersion::V1
407        } else {
408            GhacVersion::V2
409        }
410    }
411}
412
413/// Returns the appropriate cache service URL based on version
414pub fn get_cache_service_url(version: GhacVersion) -> String {
415    match version {
416        GhacVersion::V1 => {
417            // Priority order for v1: CACHE_URL -> RESULTS_URL
418            env::var(ACTIONS_CACHE_URL)
419                .or_else(|_| env::var(ACTIONS_RESULTS_URL))
420                .unwrap_or_default()
421        }
422        GhacVersion::V2 => {
423            // Only RESULTS_URL is used for v2
424            env::var(ACTIONS_RESULTS_URL).unwrap_or_default()
425        }
426    }
427}
428
429/// Parse prost decode error into opendal::Error.
430pub fn new_prost_decode_error(e: prost::DecodeError) -> Error {
431    Error::new(ErrorKind::Unexpected, "deserialize protobuf").set_source(e)
432}
433
/// JSON body returned by the v1 cache lookup request.
///
/// Keys are read in camelCase form.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GhacQueryResponse {
    // Not used fields.
    // cache_key: String,
    // scope: String,
    /// Location the cached entry is downloaded from.
    pub archive_location: String,
}
442
/// JSON body of the v1 request that reserves a cache entry.
#[derive(Serialize)]
pub struct GhacReserveRequest {
    /// Cache key of the entry to reserve.
    pub key: String,
    /// Cache version string sent together with the key.
    pub version: String,
}
448
/// JSON body returned by the v1 reserve request.
///
/// Keys are read in camelCase form.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GhacReserveResponse {
    /// Id of the reserved entry; used to build the v1 upload url.
    pub cache_id: i64,
}
454
/// JSON body of the v1 request that finalizes an upload.
#[derive(Serialize)]
pub struct GhacCommitRequest {
    /// Size of the uploaded entry as passed to the finalize call.
    pub size: u64,
}