opendal/services/ghac/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::env;
19use std::fmt::{Debug, Formatter};
20use std::str::FromStr;
21use std::sync::Arc;
22
23use ::ghac::v1 as ghac_types;
24use bytes::{Buf, Bytes};
25use http::header::{self, ACCEPT, AUTHORIZATION, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE};
26use http::{Request, Response, StatusCode, Uri};
27use prost::Message;
28use serde::{Deserialize, Serialize};
29
30use super::error::parse_error;
31use crate::raw::*;
32use crate::*;
33
/// The base path of the v1 cache service API, appended to the cache url.
pub const CACHE_URL_BASE: &str = "_apis/artifactcache";
/// The base path of the v2 (twirp) cache service API, appended to the cache url.
pub const CACHE_URL_BASE_V2: &str = "twirp/github.actions.results.api.v1.CacheService";
/// The `Accept` header value that the v1 cache API requires.
pub const CACHE_HEADER_ACCEPT: &str = "application/json;api-version=6.0-preview.1";
/// The cache url env for ghac.
///
/// The url will be like `https://artifactcache.actions.githubusercontent.com/<id>/`
pub const ACTIONS_CACHE_URL: &str = "ACTIONS_CACHE_URL";
/// The runtime token env for ghac.
///
/// This token is valid for 6 hours and a GitHub Actions job runs for at most
/// 6 hours, so there is no need to fetch it again.
pub const ACTIONS_RUNTIME_TOKEN: &str = "ACTIONS_RUNTIME_TOKEN";
/// The cache service version env for ghac.
///
/// A non-empty value selects the v2 cache service.
pub const ACTIONS_CACHE_SERVICE_V2: &str = "ACTIONS_CACHE_SERVICE_V2";
/// The results url env for ghac.
pub const ACTIONS_RESULTS_URL: &str = "ACTIONS_RESULTS_URL";
/// The content type for JSON.
pub const CONTENT_TYPE_JSON: &str = "application/json";
/// The content type for protobuf.
pub const CONTENT_TYPE_PROTOBUF: &str = "application/protobuf";
57
/// The version of github action cache.
///
/// Selects which wire protocol the core speaks to the cache service.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GhacVersion {
    /// The JSON API served under `_apis/artifactcache`.
    V1,
    /// The protobuf (twirp) API served under
    /// `twirp/github.actions.results.api.v1.CacheService`.
    V2,
}
64
65/// Core for github action cache services.
66#[derive(Clone)]
67pub struct GhacCore {
68    pub info: Arc<AccessorInfo>,
69
70    // root should end with "/"
71    pub root: String,
72
73    pub cache_url: String,
74    pub catch_token: String,
75    pub version: String,
76
77    pub service_version: GhacVersion,
78}
79
80impl Debug for GhacCore {
81    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
82        f.debug_struct("GhacCore")
83            .field("root", &self.root)
84            .field("cache_url", &self.cache_url)
85            .field("version", &self.version)
86            .field("service_version", &self.service_version)
87            .finish_non_exhaustive()
88    }
89}
90
91impl GhacCore {
92    async fn ghac_get_download_url(&self, path: &str) -> Result<String> {
93        let p = build_abs_path(&self.root, path);
94
95        match self.service_version {
96            GhacVersion::V1 => {
97                let url = format!(
98                    "{}{CACHE_URL_BASE}/cache?keys={}&version={}",
99                    self.cache_url,
100                    percent_encode_path(&p),
101                    self.version
102                );
103
104                let mut req = Request::get(&url);
105                req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
106                req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
107
108                let req = req
109                    .extension(Operation::Read)
110                    .body(Buffer::new())
111                    .map_err(new_request_build_error)?;
112                let resp = self.info.http_client().send(req).await?;
113                let location = if resp.status() == StatusCode::OK {
114                    let slc = resp.into_body();
115                    let query_resp: GhacQueryResponse = serde_json::from_reader(slc.reader())
116                        .map_err(new_json_deserialize_error)?;
117                    query_resp.archive_location
118                } else {
119                    return Err(parse_error(resp));
120                };
121                Ok(location)
122            }
123            GhacVersion::V2 => {
124                let url = format!(
125                    "{}{CACHE_URL_BASE_V2}/GetCacheEntryDownloadURL",
126                    self.cache_url,
127                );
128
129                let req = ghac_types::GetCacheEntryDownloadUrlRequest {
130                    key: p,
131                    version: self.version.clone(),
132
133                    metadata: None,
134                    restore_keys: vec![],
135                };
136
137                let body = Buffer::from(req.encode_to_vec());
138
139                let req = Request::post(&url)
140                    .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
141                    .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
142                    .header(CONTENT_LENGTH, body.len())
143                    .extension(Operation::Read)
144                    .body(body)
145                    .map_err(new_request_build_error)?;
146                let resp = self.info.http_client().send(req).await?;
147                let location = if resp.status() == StatusCode::OK {
148                    let slc = resp.into_body();
149                    let query_resp = ghac_types::GetCacheEntryDownloadUrlResponse::decode(slc)
150                        .map_err(new_prost_decode_error)?;
151                    if !query_resp.ok {
152                        let mut err = Error::new(
153                            ErrorKind::NotFound,
154                            "GetCacheEntryDownloadURL returns non-ok, the key doesn't exist",
155                        );
156
157                        // GHAC is a cache service, so it's acceptable for it to occasionally not contain
158                        // data that users have just written. However, we don't want users to always have
159                        // to retry reading it, nor do we want our CI to fail constantly.
160                        //
161                        // Here's the trick: we check if the environment variable `OPENDAL_TEST` is set to `ghac`.
162                        // If it is, we mark the error as temporary to allow retries in the test CI.
163                        if env::var("OPENDAL_TEST") == Ok("ghac".to_string()) {
164                            err = err.set_temporary();
165                        }
166                        return Err(err);
167                    }
168                    query_resp.signed_download_url
169                } else {
170                    return Err(parse_error(resp));
171                };
172
173                Ok(location)
174            }
175        }
176    }
177
178    pub async fn ghac_stat(&self, path: &str) -> Result<Response<Buffer>> {
179        let location = self.ghac_get_download_url(path).await?;
180
181        let req = Request::get(location)
182            .header(header::RANGE, "bytes=0-0")
183            .extension(Operation::Stat)
184            .body(Buffer::new())
185            .map_err(new_request_build_error)?;
186
187        self.info.http_client().send(req).await
188    }
189
190    pub async fn ghac_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
191        let location = self.ghac_get_download_url(path).await?;
192
193        let mut req = Request::get(location);
194
195        if !range.is_full() {
196            req = req.header(header::RANGE, range.to_header());
197        }
198        let req = req
199            .extension(Operation::Read)
200            .body(Buffer::new())
201            .map_err(new_request_build_error)?;
202
203        self.info.http_client().fetch(req).await
204    }
205
206    pub async fn ghac_get_upload_url(&self, path: &str) -> Result<String> {
207        let p = build_abs_path(&self.root, path);
208
209        match self.service_version {
210            GhacVersion::V1 => {
211                let url = format!("{}{CACHE_URL_BASE}/caches", self.cache_url);
212
213                let bs = serde_json::to_vec(&GhacReserveRequest {
214                    key: p,
215                    version: self.version.to_string(),
216                })
217                .map_err(new_json_serialize_error)?;
218
219                let mut req = Request::post(&url);
220                req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
221                req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
222                req = req.header(CONTENT_TYPE, CONTENT_TYPE_JSON);
223                req = req.header(CONTENT_LENGTH, bs.len());
224
225                let req = req
226                    .extension(Operation::Write)
227                    .body(Buffer::from(Bytes::from(bs)))
228                    .map_err(new_request_build_error)?;
229                let resp = self.info.http_client().send(req).await?;
230                let cache_id = if resp.status().is_success() {
231                    let slc = resp.into_body();
232                    let reserve_resp: GhacReserveResponse = serde_json::from_reader(slc.reader())
233                        .map_err(new_json_deserialize_error)?;
234                    reserve_resp.cache_id
235                } else {
236                    return Err(
237                        parse_error(resp).map(|err| err.with_operation("Backend::ghac_reserve"))
238                    );
239                };
240
241                let url = format!("{}{CACHE_URL_BASE}/caches/{cache_id}", self.cache_url);
242                Ok(url)
243            }
244            GhacVersion::V2 => {
245                let url = format!("{}{CACHE_URL_BASE_V2}/CreateCacheEntry", self.cache_url,);
246
247                let req = ghac_types::CreateCacheEntryRequest {
248                    key: p,
249                    version: self.version.clone(),
250                    metadata: None,
251                };
252
253                let body = Buffer::from(req.encode_to_vec());
254
255                let req = Request::post(&url)
256                    .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
257                    .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
258                    .header(CONTENT_LENGTH, body.len())
259                    .extension(Operation::Write)
260                    .body(body)
261                    .map_err(new_request_build_error)?;
262                let resp = self.info.http_client().send(req).await?;
263                let location = if resp.status() == StatusCode::OK {
264                    let (parts, slc) = resp.into_parts();
265                    let query_resp = ghac_types::CreateCacheEntryResponse::decode(slc)
266                        .map_err(new_prost_decode_error)?;
267                    if !query_resp.ok {
268                        return Err(Error::new(
269                            ErrorKind::Unexpected,
270                            "create cache entry returns non-ok",
271                        )
272                        .with_context("parts", format!("{:?}", parts)));
273                    }
274                    query_resp.signed_upload_url
275                } else {
276                    return Err(parse_error(resp));
277                };
278                Ok(location)
279            }
280        }
281    }
282
283    pub async fn ghac_v1_write(
284        &self,
285        upload_url: &str,
286        size: u64,
287        offset: u64,
288        body: Buffer,
289    ) -> Result<Response<Buffer>> {
290        let mut req = Request::patch(upload_url);
291        req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
292        req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
293        req = req.header(CONTENT_LENGTH, size);
294        req = req.header(CONTENT_TYPE, "application/octet-stream");
295        req = req.header(
296            CONTENT_RANGE,
297            BytesContentRange::default()
298                .with_range(offset, offset + size - 1)
299                .to_header(),
300        );
301        let req = req
302            .extension(Operation::Write)
303            .body(body)
304            .map_err(new_request_build_error)?;
305
306        self.info.http_client().send(req).await
307    }
308
309    pub async fn ghac_finalize_upload(&self, path: &str, url: &str, size: u64) -> Result<()> {
310        let p = build_abs_path(&self.root, path);
311
312        match self.service_version {
313            GhacVersion::V1 => {
314                let bs = serde_json::to_vec(&GhacCommitRequest { size })
315                    .map_err(new_json_serialize_error)?;
316
317                let req = Request::post(url)
318                    .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
319                    .header(ACCEPT, CACHE_HEADER_ACCEPT)
320                    .header(CONTENT_TYPE, CONTENT_TYPE_JSON)
321                    .header(CONTENT_LENGTH, bs.len())
322                    .extension(Operation::Write)
323                    .body(Buffer::from(bs))
324                    .map_err(new_request_build_error)?;
325                let resp = self.info.http_client().send(req).await?;
326                if resp.status().is_success() {
327                    Ok(())
328                } else {
329                    Err(parse_error(resp))
330                }
331            }
332            GhacVersion::V2 => {
333                let url = format!(
334                    "{}{CACHE_URL_BASE_V2}/FinalizeCacheEntryUpload",
335                    self.cache_url,
336                );
337
338                let req = ghac_types::FinalizeCacheEntryUploadRequest {
339                    key: p,
340                    version: self.version.clone(),
341                    size_bytes: size as i64,
342
343                    metadata: None,
344                };
345                let body = Buffer::from(req.encode_to_vec());
346
347                let req = Request::post(&url)
348                    .header(AUTHORIZATION, format!("Bearer {}", self.catch_token))
349                    .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF)
350                    .header(CONTENT_LENGTH, body.len())
351                    .extension(Operation::Write)
352                    .body(body)
353                    .map_err(new_request_build_error)?;
354                let resp = self.info.http_client().send(req).await?;
355                if resp.status() != StatusCode::OK {
356                    return Err(parse_error(resp));
357                };
358                Ok(())
359            }
360        }
361    }
362}
363
364/// Determines if the current environment is GitHub Enterprise Server (GHES)
365///
366/// We need to know this since GHES doesn't support ghac v2 yet.
367pub fn is_ghes() -> bool {
368    // Fetch GitHub Server URL with fallback to "https://github.com"
369    let server_url =
370        env::var("GITHUB_SERVER_URL").unwrap_or_else(|_| "https://github.com".to_string());
371
372    let Ok(url) = Uri::from_str(&server_url) else {
373        // We just return false if the URL is invalid
374        return false;
375    };
376
377    // Check against known non-GHES host patterns
378    let hostname = url.host().unwrap_or("").trim_end().to_lowercase();
379
380    let is_github_host = hostname == "github.com";
381    let is_ghe_host = hostname.ends_with(".ghe.com");
382    let is_localhost = hostname.ends_with(".localhost");
383
384    !is_github_host && !is_ghe_host && !is_localhost
385}
386
387/// Determines the cache service version based on environment
388pub fn get_cache_service_version() -> GhacVersion {
389    if is_ghes() {
390        // GHES only supports v1 regardless of feature flags
391        GhacVersion::V1
392    } else {
393        // Check for presence of non-empty ACTIONS_CACHE_SERVICE_V2
394        let value = env::var(ACTIONS_CACHE_SERVICE_V2).unwrap_or_default();
395        if value.is_empty() {
396            GhacVersion::V1
397        } else {
398            GhacVersion::V2
399        }
400    }
401}
402
403/// Returns the appropriate cache service URL based on version
404pub fn get_cache_service_url(version: GhacVersion) -> String {
405    match version {
406        GhacVersion::V1 => {
407            // Priority order for v1: CACHE_URL -> RESULTS_URL
408            env::var(ACTIONS_CACHE_URL)
409                .or_else(|_| env::var(ACTIONS_RESULTS_URL))
410                .unwrap_or_default()
411        }
412        GhacVersion::V2 => {
413            // Only RESULTS_URL is used for v2
414            env::var(ACTIONS_RESULTS_URL).unwrap_or_default()
415        }
416    }
417}
418
419/// Parse prost decode error into opendal::Error.
420pub fn new_prost_decode_error(e: prost::DecodeError) -> Error {
421    Error::new(ErrorKind::Unexpected, "deserialize protobuf").set_source(e)
422}
423
424#[derive(Deserialize)]
425#[serde(rename_all = "camelCase")]
426pub struct GhacQueryResponse {
427    // Not used fields.
428    // cache_key: String,
429    // scope: String,
430    pub archive_location: String,
431}
432
433#[derive(Serialize)]
434pub struct GhacReserveRequest {
435    pub key: String,
436    pub version: String,
437}
438
439#[derive(Deserialize)]
440#[serde(rename_all = "camelCase")]
441pub struct GhacReserveResponse {
442    pub cache_id: i64,
443}
444
445#[derive(Serialize)]
446pub struct GhacCommitRequest {
447    pub size: u64,
448}