use std::env;
use std::sync::Arc;
use bytes::Buf;
use bytes::Bytes;
use http::header;
use http::header::ACCEPT;
use http::header::AUTHORIZATION;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_RANGE;
use http::header::CONTENT_TYPE;
use http::header::USER_AGENT;
use http::Request;
use http::Response;
use http::StatusCode;
use log::debug;
use serde::Deserialize;
use serde::Serialize;
use super::error::parse_error;
use super::writer::GhacWriter;
use crate::raw::*;
use crate::services::GhacConfig;
use crate::*;
/// Path segment appended to the cache service URL for every cache API request.
const CACHE_URL_BASE: &str = "_apis/artifactcache";
/// Value sent in the `Accept` header of cache service requests.
const CACHE_HEADER_ACCEPT: &str = "application/json;api-version=6.0-preview.1";
/// Name of the environment variable used for the cache service URL when no endpoint is configured.
const ACTIONS_CACHE_URL: &str = "ACTIONS_CACHE_URL";
/// Name of the environment variable used for the cache service token when no runtime token is
/// configured.
const ACTIONS_RUNTIME_TOKEN: &str = "ACTIONS_RUNTIME_TOKEN";
/// Name of the environment variable holding the token used for the delete request.
const GITHUB_TOKEN: &str = "GITHUB_TOKEN";
/// Name of the environment variable holding the GitHub API base URL used for the delete request.
const GITHUB_API_URL: &str = "GITHUB_API_URL";
/// Name of the environment variable holding the repository used in the delete request URL.
const GITHUB_REPOSITORY: &str = "GITHUB_REPOSITORY";
/// Value sent in the `X-GitHub-Api-Version` header of the delete request.
const GITHUB_API_VERSION: &str = "2022-11-28";
fn value_or_env(
explicit_value: Option<String>,
env_var_name: &str,
operation: &'static str,
) -> Result<String> {
if let Some(value) = explicit_value {
return Ok(value);
}
env::var(env_var_name).map_err(|err| {
let text = format!(
"{} not found, maybe not in github action environment?",
env_var_name
);
Error::new(ErrorKind::ConfigInvalid, text)
.with_operation(operation)
.set_source(err)
})
}
impl Configurator for GhacConfig {
    type Builder = GhacBuilder;

    /// Wrap this configuration in a [`GhacBuilder`] that has no custom HTTP client set.
    fn into_builder(self) -> Self::Builder {
        let config = self;
        GhacBuilder {
            http_client: None,
            config,
        }
    }
}
#[doc = include_str!("docs.md")]
#[derive(Debug, Default)]
pub struct GhacBuilder {
    /// Settings collected by the builder methods (root, version, endpoint, runtime token).
    config: GhacConfig,
    /// Optional caller-supplied HTTP client; when `None`, `build` creates a new one.
    http_client: Option<HttpClient>,
}
impl GhacBuilder {
    /// Set the root path for this backend.
    ///
    /// Passing an empty string clears any previously configured root.
    pub fn root(mut self, root: &str) -> Self {
        self.config.root = match root {
            "" => None,
            non_empty => Some(non_empty.to_owned()),
        };
        self
    }

    /// Set the cache version.
    ///
    /// An empty string is ignored and leaves the current setting untouched.
    pub fn version(mut self, version: &str) -> Self {
        if version.is_empty() {
            return self;
        }
        self.config.version = Some(version.to_owned());
        self
    }

    /// Set the cache service endpoint.
    ///
    /// An empty string is ignored and leaves the current setting untouched.
    pub fn endpoint(mut self, endpoint: &str) -> Self {
        if endpoint.is_empty() {
            return self;
        }
        self.config.endpoint = Some(endpoint.to_owned());
        self
    }

    /// Set the runtime token used to authorize cache service requests.
    ///
    /// An empty string is ignored and leaves the current setting untouched.
    pub fn runtime_token(mut self, runtime_token: &str) -> Self {
        if runtime_token.is_empty() {
            return self;
        }
        self.config.runtime_token = Some(runtime_token.to_owned());
        self
    }

    /// Use the given HTTP client instead of letting `build` create one.
    pub fn http_client(mut self, client: HttpClient) -> Self {
        self.http_client = Some(client);
        self
    }
}
impl Builder for GhacBuilder {
    const SCHEME: Scheme = Scheme::Ghac;
    type Config = GhacConfig;

    /// Build a [`GhacBackend`] from the collected configuration.
    ///
    /// Resolution rules:
    /// - the endpoint falls back to the `ACTIONS_CACHE_URL` environment variable;
    /// - the runtime token falls back to `ACTIONS_RUNTIME_TOKEN`;
    /// - the version defaults to `"opendal"`;
    /// - the GitHub API URL, token and repository are read from `GITHUB_API_URL`
    ///   (default `https://api.github.com`), `GITHUB_TOKEN` and `GITHUB_REPOSITORY`
    ///   (both default to an empty string).
    ///
    /// # Errors
    ///
    /// Fails when the HTTP client cannot be created, or when the endpoint or runtime token is
    /// neither configured nor available from the environment.
    fn build(self) -> Result<impl Access> {
        debug!("backend build started: {:?}", self);

        let root = normalize_root(&self.config.root.unwrap_or_default());
        debug!("backend use root {}", root);

        let client = if let Some(client) = self.http_client {
            client
        } else {
            HttpClient::new().map_err(|err| {
                err.with_operation("Builder::build")
                    .with_context("service", Scheme::Ghac)
            })?
        };

        // Request URLs are built as `{cache_url}{CACHE_URL_BASE}/...` with no separator in
        // between, so the base must end with `/`. Normalize it here so that an endpoint
        // configured without the trailing slash still produces well-formed URLs.
        let mut cache_url =
            value_or_env(self.config.endpoint, ACTIONS_CACHE_URL, "Builder::build")?;
        if !cache_url.ends_with('/') {
            cache_url.push('/');
        }

        let backend = GhacBackend {
            root,
            cache_url,
            catch_token: value_or_env(
                self.config.runtime_token,
                ACTIONS_RUNTIME_TOKEN,
                "Builder::build",
            )?,
            // `self` is consumed here, so the version can be moved out without cloning.
            version: self
                .config
                .version
                .unwrap_or_else(|| "opendal".to_string()),
            api_url: env::var(GITHUB_API_URL)
                .unwrap_or_else(|_| "https://api.github.com".to_string()),
            api_token: env::var(GITHUB_TOKEN).unwrap_or_default(),
            repo: env::var(GITHUB_REPOSITORY).unwrap_or_default(),
            client,
        };
        Ok(backend)
    }
}
/// Backend state for the GitHub Actions cache service.
#[derive(Debug, Clone)]
pub struct GhacBackend {
    /// Normalized root that is joined with every path before it is used as a cache key.
    root: String,
    /// Base URL of the cache service (configured endpoint or `ACTIONS_CACHE_URL`).
    cache_url: String,
    /// Bearer token for cache service requests (configured runtime token or
    /// `ACTIONS_RUNTIME_TOKEN`).
    catch_token: String,
    /// Cache version sent with query and reserve requests; defaults to `"opendal"`.
    version: String,
    /// GitHub API base URL used by the delete request.
    api_url: String,
    /// Token from `GITHUB_TOKEN` used by the delete request; empty when the variable is unset.
    api_token: String,
    /// Repository from `GITHUB_REPOSITORY` used in the delete URL; empty when the variable is unset.
    repo: String,
    /// HTTP client used to send all requests.
    pub client: HttpClient,
}
impl Access for GhacBackend {
type Reader = HttpBody;
type Writer = GhacWriter;
type Lister = ();
type BlockingReader = ();
type BlockingWriter = ();
type BlockingLister = ();
fn info(&self) -> Arc<AccessorInfo> {
let mut am = AccessorInfo::default();
am.set_scheme(Scheme::Ghac)
.set_root(&self.root)
.set_name(&self.version)
.set_native_capability(Capability {
stat: true,
read: true,
write: true,
write_can_multi: true,
delete: true,
shared: true,
..Default::default()
});
am.into()
}
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let req = self.ghac_query(path)?;
let resp = self.client.send(req).await?;
let location = if resp.status() == StatusCode::OK {
let slc = resp.into_body();
let query_resp: GhacQueryResponse =
serde_json::from_reader(slc.reader()).map_err(new_json_deserialize_error)?;
query_resp.archive_location
} else {
return Err(parse_error(resp));
};
let req = Request::get(location)
.header(header::RANGE, "bytes=0-0")
.body(Buffer::new())
.map_err(new_request_build_error)?;
let resp = self.client.send(req).await?;
let status = resp.status();
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT | StatusCode::RANGE_NOT_SATISFIABLE => {
let mut meta = parse_into_metadata(path, resp.headers())?;
meta.set_content_length(
meta.content_range()
.expect("content range must be valid")
.size()
.expect("content range must contains size"),
);
Ok(RpStat::new(meta))
}
_ => Err(parse_error(resp)),
}
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let req = self.ghac_query(path)?;
let resp = self.client.send(req).await?;
let location = if resp.status() == StatusCode::OK {
let slc = resp.into_body();
let query_resp: GhacQueryResponse =
serde_json::from_reader(slc.reader()).map_err(new_json_deserialize_error)?;
query_resp.archive_location
} else {
return Err(parse_error(resp));
};
let req = self.ghac_get_location(&location, args.range())?;
let resp = self.client.fetch(req).await?;
let status = resp.status();
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
Ok((RpRead::default(), resp.into_body()))
}
_ => {
let (part, mut body) = resp.into_parts();
let buf = body.to_buffer().await?;
Err(parse_error(Response::from_parts(part, buf)))
}
}
}
async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let req = self.ghac_reserve(path)?;
let resp = self.client.send(req).await?;
let cache_id = if resp.status().is_success() {
let slc = resp.into_body();
let reserve_resp: GhacReserveResponse =
serde_json::from_reader(slc.reader()).map_err(new_json_deserialize_error)?;
reserve_resp.cache_id
} else {
return Err(parse_error(resp).map(|err| err.with_operation("Backend::ghac_reserve")));
};
Ok((RpWrite::default(), GhacWriter::new(self.clone(), cache_id)))
}
async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
if self.api_token.is_empty() {
return Err(Error::new(
ErrorKind::PermissionDenied,
"github token is not configured, delete is permission denied",
));
}
let resp = self.ghac_delete(path).await?;
if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
Ok(RpDelete::default())
} else {
Err(parse_error(resp))
}
}
}
impl GhacBackend {
    /// Build the request that looks up the cache entry for `path`.
    ///
    /// The key is the path joined with the backend root; key and version are both
    /// percent-encoded before being placed in the query string.
    fn ghac_query(&self, path: &str) -> Result<Request<Buffer>> {
        let p = build_abs_path(&self.root, path);
        // The version is user-configurable, so it is encoded like the key to keep characters
        // such as `&` or `=` from corrupting the query string.
        let url = format!(
            "{}{CACHE_URL_BASE}/cache?keys={}&version={}",
            self.cache_url,
            percent_encode_path(&p),
            percent_encode_path(&self.version)
        );
        let mut req = Request::get(&url);
        req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
        req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
        Ok(req)
    }

    /// Build a GET request for an archive `location`.
    ///
    /// A `Range` header is added only when `range` is not the full range. No authorization
    /// header is attached to this request.
    pub fn ghac_get_location(&self, location: &str, range: BytesRange) -> Result<Request<Buffer>> {
        let mut req = Request::get(location);
        if !range.is_full() {
            req = req.header(header::RANGE, range.to_header());
        }
        req.body(Buffer::new()).map_err(new_request_build_error)
    }

    /// Build the POST request that reserves a cache entry for `path` under the current version.
    fn ghac_reserve(&self, path: &str) -> Result<Request<Buffer>> {
        let p = build_abs_path(&self.root, path);
        let url = format!("{}{CACHE_URL_BASE}/caches", self.cache_url);
        let bs = serde_json::to_vec(&GhacReserveRequest {
            key: p,
            version: self.version.to_string(),
        })
        .map_err(new_json_serialize_error)?;
        let mut req = Request::post(&url);
        req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
        req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
        req = req.header(CONTENT_LENGTH, bs.len());
        req = req.header(CONTENT_TYPE, "application/json");
        let req = req
            .body(Buffer::from(Bytes::from(bs)))
            .map_err(new_request_build_error)?;
        Ok(req)
    }

    /// Build the PATCH request that uploads `size` bytes of `body` at `offset` into the cache
    /// entry `cache_id`.
    ///
    /// # Errors
    ///
    /// Returns an [`ErrorKind::Unexpected`] error when `size` is zero or the byte range does not
    /// fit in `u64`, since no valid inclusive `Content-Range` can be formed in those cases.
    pub fn ghac_upload(
        &self,
        cache_id: i64,
        offset: u64,
        size: u64,
        body: Buffer,
    ) -> Result<Request<Buffer>> {
        // Inclusive end of the range is `offset + size - 1`; compute it with checked arithmetic
        // so an empty chunk cannot underflow into a bogus range.
        let last = size
            .checked_sub(1)
            .and_then(|len_minus_one| offset.checked_add(len_minus_one))
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::Unexpected,
                    "ghac upload chunk must be non-empty and fit in the u64 byte range",
                )
                .with_operation("Backend::ghac_upload")
            })?;

        let url = format!("{}{CACHE_URL_BASE}/caches/{cache_id}", self.cache_url);
        let mut req = Request::patch(&url);
        req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
        req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
        req = req.header(CONTENT_LENGTH, size);
        req = req.header(CONTENT_TYPE, "application/octet-stream");
        req = req.header(
            CONTENT_RANGE,
            BytesContentRange::default()
                .with_range(offset, last)
                .to_header(),
        );
        let req = req.body(body).map_err(new_request_build_error)?;
        Ok(req)
    }

    /// Build the POST request that commits cache entry `cache_id` with its final `size`.
    pub fn ghac_commit(&self, cache_id: i64, size: u64) -> Result<Request<Buffer>> {
        let url = format!("{}{CACHE_URL_BASE}/caches/{cache_id}", self.cache_url);
        let bs =
            serde_json::to_vec(&GhacCommitRequest { size }).map_err(new_json_serialize_error)?;
        let mut req = Request::post(&url);
        req = req.header(AUTHORIZATION, format!("Bearer {}", self.catch_token));
        req = req.header(ACCEPT, CACHE_HEADER_ACCEPT);
        req = req.header(CONTENT_TYPE, "application/json");
        req = req.header(CONTENT_LENGTH, bs.len());
        let req = req
            .body(Buffer::from(Bytes::from(bs)))
            .map_err(new_request_build_error)?;
        Ok(req)
    }

    /// Send the GitHub API request that deletes the cache entry keyed by `path`.
    ///
    /// Unlike the other helpers this one sends the request itself and returns the response;
    /// interpreting the status code is left to the caller.
    async fn ghac_delete(&self, path: &str) -> Result<Response<Buffer>> {
        let p = build_abs_path(&self.root, path);
        let url = format!(
            "{}/repos/{}/actions/caches?key={}",
            self.api_url,
            self.repo,
            percent_encode_path(&p)
        );
        let mut req = Request::delete(&url);
        req = req.header(AUTHORIZATION, format!("Bearer {}", self.api_token));
        req = req.header(USER_AGENT, format!("opendal/{VERSION} (service ghac)"));
        req = req.header("X-GitHub-Api-Version", GITHUB_API_VERSION);
        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
        self.client.send(req).await
    }
}
/// JSON body of a successful cache query response; field names are camelCase on the wire.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GhacQueryResponse {
    /// URL from which the cached archive is subsequently fetched.
    archive_location: String,
}
/// JSON body of the request that reserves a cache entry.
#[derive(Serialize)]
struct GhacReserveRequest {
    /// Cache key: the path joined with the backend root.
    key: String,
    /// Cache version configured on the backend.
    version: String,
}
/// JSON body of a successful reserve response; field names are camelCase on the wire.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GhacReserveResponse {
    /// Identifier of the reserved cache entry, used in the upload and commit URLs.
    cache_id: i64,
}
/// JSON body of the request that commits a cache entry.
#[derive(Serialize)]
struct GhacCommitRequest {
    /// Size passed to `ghac_commit` by the caller — presumably the total uploaded byte count.
    size: u64,
}