use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
use bytes::Buf;
use http::Response;
use http::StatusCode;
use log::debug;
use reqsign::GoogleCredentialLoader;
use reqsign::GoogleSigner;
use reqsign::GoogleTokenLoad;
use reqsign::GoogleTokenLoader;
use serde::Deserialize;
use serde_json;
use super::core::*;
use super::error::parse_error;
use super::lister::GcsLister;
use super::writer::GcsWriter;
use super::writer::GcsWriters;
use crate::raw::*;
use crate::services::GcsConfig;
use crate::*;
/// API endpoint used by `build` when the builder has no `endpoint` configured.
const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
/// OAuth scope handed to the token loader when the builder has no `scope`
/// configured (read/write access to Cloud Storage).
const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read_write";
impl Configurator for GcsConfig {
    type Builder = GcsBuilder;

    /// Wraps this configuration in a [`GcsBuilder`] that has no custom HTTP
    /// client and no customized token loader.
    fn into_builder(self) -> Self::Builder {
        GcsBuilder {
            config: self,
            ..Default::default()
        }
    }
}
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct GcsBuilder {
    /// Service configuration accumulated by the setter methods.
    config: GcsConfig,
    /// Caller-supplied HTTP client; `build` creates a default one when `None`.
    http_client: Option<HttpClient>,
    /// Custom token loader forwarded to the `GoogleTokenLoader` in `build`.
    customized_token_loader: Option<Box<dyn GoogleTokenLoad>>,
}

/// Prints only `config`; the HTTP client and the token loader are left out,
/// and `finish_non_exhaustive` marks the output as partial.
impl Debug for GcsBuilder {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("GcsBuilder")
            .field("config", &self.config)
            .finish_non_exhaustive()
    }
}
impl GcsBuilder {
    /// Returns `Some(owned)` for non-empty input and `None` for `""`.
    fn non_empty(value: &str) -> Option<String> {
        (!value.is_empty()).then(|| value.to_string())
    }

    /// Sets the root path (normalized by `build`). An empty string clears it.
    pub fn root(mut self, root: &str) -> Self {
        self.config.root = Self::non_empty(root);
        self
    }

    /// Sets the bucket name.
    ///
    /// The value is stored as given, even when empty; `build` rejects an empty bucket.
    pub fn bucket(mut self, bucket: &str) -> Self {
        self.config.bucket = bucket.to_string();
        self
    }

    /// Sets the OAuth scope requested by the token loader.
    ///
    /// Empty input is ignored. `build` uses `DEFAULT_GCS_SCOPE` when no scope is set.
    pub fn scope(mut self, scope: &str) -> Self {
        if let Some(scope) = Self::non_empty(scope) {
            self.config.scope = Some(scope);
        }
        self
    }

    /// Sets the service account passed to the token loader. Empty input is ignored.
    pub fn service_account(mut self, service_account: &str) -> Self {
        if let Some(account) = Self::non_empty(service_account) {
            self.config.service_account = Some(account);
        }
        self
    }

    /// Overrides the API endpoint.
    ///
    /// Empty input is ignored. `build` uses `DEFAULT_GCS_ENDPOINT` when no endpoint is set.
    pub fn endpoint(mut self, endpoint: &str) -> Self {
        if let Some(endpoint) = Self::non_empty(endpoint) {
            self.config.endpoint = Some(endpoint);
        }
        self
    }

    /// Sets inline credential content, handed to the credential loader's
    /// `with_content`. Empty input is ignored.
    pub fn credential(mut self, credential: &str) -> Self {
        if let Some(credential) = Self::non_empty(credential) {
            self.config.credential = Some(credential);
        }
        self
    }

    /// Sets a credential file path, handed to the credential loader's
    /// `with_path`. Empty input is ignored.
    pub fn credential_path(mut self, path: &str) -> Self {
        if let Some(path) = Self::non_empty(path) {
            self.config.credential_path = Some(path);
        }
        self
    }

    /// Uses `client` for requests instead of letting `build` create a default one.
    pub fn http_client(mut self, client: HttpClient) -> Self {
        self.http_client = Some(client);
        self
    }

    /// Registers a custom token loader, forwarded to
    /// `GoogleTokenLoader::with_customized_token_loader` in `build`.
    pub fn customized_token_loader(mut self, token_load: Box<dyn GoogleTokenLoad>) -> Self {
        self.customized_token_loader = Some(token_load);
        self
    }

    /// Stores a pre-obtained token in the config; `build` copies it into the
    /// backend core.
    pub fn token(mut self, token: String) -> Self {
        self.config.token = Some(token);
        self
    }

    /// Makes `build` call `with_disable_vm_metadata(true)` on the token loader.
    pub fn disable_vm_metadata(mut self) -> Self {
        self.config.disable_vm_metadata = true;
        self
    }

    /// Makes `build` disable environment and well-known-location credential lookup.
    pub fn disable_config_load(mut self) -> Self {
        self.config.disable_config_load = true;
        self
    }

    /// Sets the predefined ACL forwarded to the backend core. Empty input is ignored.
    pub fn predefined_acl(mut self, acl: &str) -> Self {
        if let Some(acl) = Self::non_empty(acl) {
            self.config.predefined_acl = Some(acl);
        }
        self
    }

    /// Sets the default storage class forwarded to the backend core. Empty input is ignored.
    pub fn default_storage_class(mut self, class: &str) -> Self {
        if let Some(class) = Self::non_empty(class) {
            self.config.default_storage_class = Some(class);
        }
        self
    }

    /// Sets `allow_anonymous`, which `build` forwards to the backend core.
    pub fn allow_anonymous(mut self) -> Self {
        self.config.allow_anonymous = true;
        self
    }
}
impl Builder for GcsBuilder {
    const SCHEME: Scheme = Scheme::Gcs;
    type Config = GcsConfig;

    /// Consumes the builder and assembles a [`GcsBackend`].
    ///
    /// Credential handling:
    /// - `credential` / `credential_path` seed a `GoogleCredentialLoader`.
    /// - Environment and well-known-location lookup are disabled on `wasm32`
    ///   and when `disable_config_load` is set.
    /// - Loading is best-effort. On failure, the error is logged at debug level
    ///   and the token loader is built without those credentials.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::ConfigInvalid` when the bucket is empty. Propagates
    /// any error from creating a default [`HttpClient`] when none was supplied.
    fn build(self) -> Result<impl Access> {
        debug!("backend build started: {:?}", self);

        let root = normalize_root(&self.config.root.unwrap_or_default());
        debug!("backend use root {}", root);

        if self.config.bucket.is_empty() {
            return Err(
                Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
                    .with_operation("Builder::build")
                    .with_context("service", Scheme::Gcs),
            );
        }

        let client = match self.http_client {
            Some(client) => client,
            None => HttpClient::new().map_err(|err| {
                err.with_operation("Builder::build")
                    .with_context("service", Scheme::Gcs)
            })?,
        };

        let endpoint = self
            .config
            .endpoint
            .unwrap_or_else(|| DEFAULT_GCS_ENDPOINT.to_string());
        debug!("backend use endpoint: {endpoint}");

        let mut cred_loader = GoogleCredentialLoader::default();
        if let Some(cred) = &self.config.credential {
            cred_loader = cred_loader.with_content(cred);
        }
        if let Some(cred) = &self.config.credential_path {
            cred_loader = cred_loader.with_path(cred);
        }
        // NOTE(review): presumably env vars / well-known files are unavailable
        // on wasm32, so those lookups are always switched off there.
        #[cfg(target_arch = "wasm32")]
        {
            cred_loader = cred_loader.with_disable_env();
            cred_loader = cred_loader.with_disable_well_known_location();
        }
        if self.config.disable_config_load {
            cred_loader = cred_loader
                .with_disable_env()
                .with_disable_well_known_location();
        }

        let scope = self.config.scope.as_deref().unwrap_or(DEFAULT_GCS_SCOPE);

        let mut token_loader = GoogleTokenLoader::new(scope, GLOBAL_REQWEST_CLIENT.clone());
        if let Some(account) = &self.config.service_account {
            token_loader = token_loader.with_service_account(account);
        }
        match cred_loader.load() {
            Ok(Some(cred)) => token_loader = token_loader.with_credentials(cred),
            Ok(None) => {}
            // Best-effort: other token sources may still work, but record why the
            // configured credential was not used so misconfiguration is diagnosable.
            Err(err) => debug!("backend failed to load credential, continue without it: {err}"),
        }
        if let Some(loader) = self.customized_token_loader {
            token_loader = token_loader.with_customized_token_loader(loader);
        }
        if self.config.disable_vm_metadata {
            token_loader = token_loader.with_disable_vm_metadata(true);
        }

        let signer = GoogleSigner::new("storage");

        let backend = GcsBackend {
            core: Arc::new(GcsCore {
                endpoint,
                bucket: self.config.bucket,
                root,
                client,
                signer,
                token_loader,
                token: self.config.token,
                scope: scope.to_string(),
                credential_loader: cred_loader,
                predefined_acl: self.config.predefined_acl,
                default_storage_class: self.config.default_storage_class,
                allow_anonymous: self.config.allow_anonymous,
            }),
        };

        Ok(backend)
    }
}
/// Google Cloud Storage backend produced by [`GcsBuilder`].
///
/// All state lives in a shared `GcsCore`; cloning the backend only clones the `Arc`.
#[derive(Debug, Clone)]
pub struct GcsBackend {
    core: Arc<GcsCore>,
}
/// Maximum number of delete operations this backend sends in a single GCS
/// batch request; used both in the advertised capability and by `batch`.
const GCS_BATCH_MAX_OPERATIONS: usize = 100;

impl Access for GcsBackend {
    type Reader = HttpBody;
    type Writer = GcsWriters;
    type Lister = oio::PageLister<GcsLister>;
    type BlockingReader = ();
    type BlockingWriter = ();
    type BlockingLister = ();

    /// Reports scheme, root, bucket name and the natively supported capabilities.
    fn info(&self) -> Arc<AccessorInfo> {
        let mut am = AccessorInfo::default();
        am.set_scheme(Scheme::Gcs)
            .set_root(&self.core.root)
            .set_name(&self.core.bucket)
            .set_native_capability(Capability {
                stat: true,
                stat_with_if_match: true,
                stat_with_if_none_match: true,

                read: true,
                read_with_if_match: true,
                read_with_if_none_match: true,

                write: true,
                write_can_empty: true,
                write_can_multi: true,
                write_with_content_type: true,
                write_with_user_metadata: true,
                write_with_if_not_exists: true,
                write_multi_min_size: Some(5 * 1024 * 1024),
                // 5 GiB does not fit in a 32-bit `usize`, so the cap is only
                // expressed on 64-bit targets.
                write_multi_max_size: if cfg!(target_pointer_width = "64") {
                    Some(5 * 1024 * 1024 * 1024)
                } else {
                    Some(usize::MAX)
                },

                delete: true,
                copy: true,

                list: true,
                list_with_limit: true,
                list_with_start_after: true,
                list_with_recursive: true,

                batch: true,
                batch_max_operations: Some(GCS_BATCH_MAX_OPERATIONS),

                presign: true,
                presign_stat: true,
                presign_read: true,
                presign_write: true,

                shared: true,

                ..Default::default()
            });
        am.into()
    }

    /// Fetches object metadata through the JSON API and maps it to [`Metadata`].
    ///
    /// `etag` and `md5Hash` are recorded only when non-empty. The response type
    /// defaults missing keys to `""`, and GCS omits `md5Hash` for some objects
    /// (e.g. composite objects). Without the check, such objects would report
    /// an empty `content_md5`.
    ///
    /// # Errors
    ///
    /// Non-success responses go through `parse_error`. A body that is not valid
    /// JSON, a `size` that is not a `u64`, or an `updated` that is not RFC 3339
    /// also produce errors.
    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
        let resp = self.core.gcs_get_object_metadata(path, &args).await?;
        if !resp.status().is_success() {
            return Err(parse_error(resp));
        }

        let slc = resp.into_body();
        let meta: GetObjectJsonResponse =
            serde_json::from_reader(slc.reader()).map_err(new_json_deserialize_error)?;

        let mut m = Metadata::new(EntryMode::FILE);
        if !meta.etag.is_empty() {
            m.set_etag(&meta.etag);
        }
        if !meta.md5_hash.is_empty() {
            m.set_content_md5(&meta.md5_hash);
        }

        // The JSON API encodes the size as a decimal string.
        let size = meta
            .size
            .parse::<u64>()
            .map_err(|e| Error::new(ErrorKind::Unexpected, "parse u64").set_source(e))?;
        m.set_content_length(size);

        if !meta.content_type.is_empty() {
            m.set_content_type(&meta.content_type);
        }
        m.set_last_modified(parse_datetime_from_rfc3339(&meta.updated)?);
        if !meta.metadata.is_empty() {
            m.with_user_metadata(meta.metadata);
        }

        Ok(RpStat::new(m))
    }

    /// Streams the object (or the requested range). A `200` or `206` response
    /// hands the body to the caller as-is.
    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        let resp = self.core.gcs_get_object(path, args.range(), &args).await?;

        match resp.status() {
            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                Ok((RpRead::default(), resp.into_body()))
            }
            _ => {
                // Buffer the error body so `parse_error` can inspect it.
                let (part, mut body) = resp.into_parts();
                let buf = body.to_buffer().await?;
                Err(parse_error(Response::from_parts(part, buf)))
            }
        }
    }

    /// Returns a multipart-capable writer driven by [`GcsWriter`].
    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        let concurrent = args.concurrent();
        let executor = args.executor().cloned();
        let w = GcsWriter::new(self.core.clone(), path, args);
        let w = oio::MultipartWriter::new(w, executor, concurrent);

        Ok((RpWrite::default(), w))
    }

    /// Deletes one object; a `404` counts as success, so deleting is idempotent.
    async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
        let resp = self.core.gcs_delete_object(path).await?;

        if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
            Ok(RpDelete::default())
        } else {
            Err(parse_error(resp))
        }
    }

    /// Returns a paginated lister honoring `recursive`, `limit` and `start_after`.
    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
        let l = GcsLister::new(
            self.core.clone(),
            path,
            args.recursive(),
            args.limit(),
            args.start_after(),
        );

        Ok((RpList::default(), oio::PageLister::new(l)))
    }

    /// Copies `from` to `to` inside the bucket.
    async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
        let resp = self.core.gcs_copy_object(from, to).await?;

        if resp.status().is_success() {
            Ok(RpCopy::default())
        } else {
            Err(parse_error(resp))
        }
    }

    /// Builds the XML API request for the operation and signs it as a query
    /// string valid for `args.expire()`. Write requests are built with an
    /// empty body.
    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
        let mut req = match args.operation() {
            PresignOperation::Stat(v) => self.core.gcs_head_object_xml_request(path, v)?,
            PresignOperation::Read(v) => self.core.gcs_get_object_xml_request(path, v)?,
            PresignOperation::Write(v) => {
                self.core
                    .gcs_insert_object_xml_request(path, v, Buffer::new())?
            }
        };

        self.core.sign_query(&mut req, args.expire())?;

        // Only method, URI and headers are returned; the body is discarded.
        let (parts, _) = req.into_parts();

        Ok(RpPresign::new(PresignedRequest::new(
            parts.method,
            parts.uri,
            parts.headers,
        )))
    }

    /// Deletes up to `GCS_BATCH_MAX_OPERATIONS` objects in one multipart request.
    ///
    /// Each part of the `multipart/mixed` response is matched to the path at the
    /// same position. Per-part `404`s count as success.
    ///
    /// # Errors
    ///
    /// - `ErrorKind::Unsupported` when more operations than the limit are given.
    /// - A non-`200` overall status goes through `parse_error`.
    /// - `ErrorKind::Unexpected` when the response is not `multipart/mixed`, or
    ///   when its part count differs from the number of requested paths.
    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
        let ops = args.into_operation();
        if ops.len() > GCS_BATCH_MAX_OPERATIONS {
            return Err(Error::new(
                ErrorKind::Unsupported,
                "gcs services only allow delete less than 100 keys at once",
            )
            .with_context("length", ops.len().to_string()));
        }

        let paths: Vec<String> = ops.into_iter().map(|(p, _)| p).collect();
        let resp = self.core.gcs_delete_objects(paths.clone()).await?;

        if resp.status() != StatusCode::OK {
            return Err(parse_error(resp));
        }

        let content_type = parse_content_type(resp.headers())?.ok_or_else(|| {
            Error::new(
                ErrorKind::Unexpected,
                "gcs batch delete response content type is empty",
            )
        })?;

        let boundary = content_type
            .strip_prefix("multipart/mixed; boundary=")
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::Unexpected,
                    "gcs batch delete response content type is not multipart/mixed",
                )
            })?
            .trim_matches('"');

        let multipart: Multipart<MixedPart> = Multipart::new()
            .with_boundary(boundary)
            .parse(resp.into_body().to_bytes())?;
        let parts = multipart.into_parts();

        // Parts are matched to paths by position. A count mismatch means the
        // response can't be attributed reliably: fail explicitly rather than
        // panic on out-of-bounds indexing or silently drop results.
        if parts.len() != paths.len() {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "gcs batch delete response part count doesn't match request count",
            )
            .with_context("expected", paths.len().to_string())
            .with_context("actual", parts.len().to_string()));
        }

        let mut batched_result = Vec::with_capacity(parts.len());
        for (path, part) in paths.into_iter().zip(parts) {
            let resp = part.into_response();
            if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
                batched_result.push((path, Ok(RpDelete::default().into())));
            } else {
                batched_result.push((path, Err(parse_error(resp))));
            }
        }

        Ok(RpBatch::new(batched_result))
    }
}
/// The subset of the GCS JSON API object resource that `stat` reads.
///
/// With `#[serde(default)]`, a key missing from the response deserializes to an
/// empty string (or an empty map). Keys not listed here (e.g. `kind`, `crc32c`)
/// are ignored.
#[derive(Default, Debug, Deserialize)]
#[serde(rename_all = "camelCase", default)]
struct GetObjectJsonResponse {
    /// Object size, sent as a decimal string (parsed into `u64` by `stat`).
    size: String,
    /// Entity tag of the object (`etag`).
    etag: String,
    /// Last update time (`updated`), parsed as RFC 3339 by `stat`.
    updated: String,
    /// `md5Hash` value; looks base64-encoded in the sample response — NOTE(review): confirm.
    md5_hash: String,
    /// `contentType` value; `stat` only applies it when non-empty.
    content_type: String,
    /// User-defined metadata key/value pairs.
    metadata: HashMap<String, String>,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A full object resource populates every field `stat` consumes; unknown keys are ignored.
    #[test]
    fn test_deserialize_get_object_json_response() {
        let content = r#"{
    "kind": "storage#object",
    "id": "example/1.png/1660563214863653",
    "selfLink": "https://www.googleapis.com/storage/v1/b/example/o/1.png",
    "mediaLink": "https://content-storage.googleapis.com/download/storage/v1/b/example/o/1.png?generation=1660563214863653&alt=media",
    "name": "1.png",
    "bucket": "example",
    "generation": "1660563214863653",
    "metageneration": "1",
    "contentType": "image/png",
    "storageClass": "STANDARD",
    "size": "56535",
    "md5Hash": "fHcEH1vPwA6eTPqxuasXcg==",
    "crc32c": "j/un9g==",
    "etag": "CKWasoTgyPkCEAE=",
    "timeCreated": "2022-08-15T11:33:34.866Z",
    "updated": "2022-08-15T11:33:34.866Z",
    "timeStorageClassUpdated": "2022-08-15T11:33:34.866Z",
    "metadata" : {
        "location" : "everywhere"
    }
}"#;

        let meta: GetObjectJsonResponse =
            serde_json::from_str(content).expect("json Deserialize must succeed");

        assert_eq!(meta.size, "56535");
        assert_eq!(meta.updated, "2022-08-15T11:33:34.866Z");
        assert_eq!(meta.md5_hash, "fHcEH1vPwA6eTPqxuasXcg==");
        assert_eq!(meta.etag, "CKWasoTgyPkCEAE=");
        assert_eq!(meta.content_type, "image/png");
        assert_eq!(
            meta.metadata,
            HashMap::from_iter([("location".to_string(), "everywhere".to_string())])
        );
    }

    /// Keys absent from the response fall back to empty defaults via
    /// `#[serde(default)]`, e.g. an object without `md5Hash`.
    #[test]
    fn test_deserialize_get_object_json_response_missing_fields() {
        let content = r#"{
    "kind": "storage#object",
    "name": "composed.bin",
    "size": "10",
    "etag": "CAE=",
    "updated": "2022-08-15T11:33:34.866Z"
}"#;

        let meta: GetObjectJsonResponse =
            serde_json::from_str(content).expect("json Deserialize must succeed");

        assert_eq!(meta.size, "10");
        assert_eq!(meta.etag, "CAE=");
        assert_eq!(meta.updated, "2022-08-15T11:33:34.866Z");
        assert_eq!(meta.md5_hash, "");
        assert_eq!(meta.content_type, "");
        assert!(meta.metadata.is_empty());
    }
}