use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
use bytes::Buf;
use http::StatusCode;
use log::debug;
use reqsign::GoogleCredentialLoader;
use reqsign::GoogleSigner;
use reqsign::GoogleTokenLoad;
use reqsign::GoogleTokenLoader;
use serde::Deserialize;
use serde_json;
use super::core::*;
use super::error::parse_error;
use super::lister::GcsLister;
use super::reader::GcsReader;
use super::writer::GcsWriter;
use super::writer::GcsWriters;
use crate::raw::*;
use crate::*;
/// Default Google Cloud Storage API endpoint, used when no custom
/// endpoint is configured.
const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
/// Default OAuth scope requested for tokens: read/write access to
/// objects (no bucket administration).
const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read_write";
/// Configuration for the Google Cloud Storage service.
///
/// Deserialized from a string map in `Builder::from_map`; all fields are
/// optional except `bucket`, which `build` validates as non-empty.
#[derive(Default, Deserialize)]
#[serde(default)]
#[non_exhaustive]
pub struct GcsConfig {
    /// Root path within the bucket; normalized in `build`.
    root: Option<String>,
    /// Target bucket name. Required.
    bucket: String,
    /// Custom endpoint; falls back to `DEFAULT_GCS_ENDPOINT`.
    endpoint: Option<String>,
    /// OAuth scope; falls back to `DEFAULT_GCS_SCOPE`.
    scope: Option<String>,
    /// Service account to impersonate when loading tokens.
    service_account: Option<String>,
    /// Credential content (sensitive — excluded from `Debug` output).
    credential: Option<String>,
    /// Path to a credential file (sensitive — excluded from `Debug` output).
    credential_path: Option<String>,
    /// Predefined ACL applied to written objects, passed through to GcsCore.
    predefined_acl: Option<String>,
    /// Storage class applied to written objects, passed through to GcsCore.
    default_storage_class: Option<String>,
}
impl Debug for GcsConfig {
    /// Renders only the non-sensitive fields; credential data is
    /// deliberately omitted and the output ends non-exhaustively.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut d = f.debug_struct("GcsConfig");
        d.field("root", &self.root);
        d.field("bucket", &self.bucket);
        d.field("endpoint", &self.endpoint);
        d.field("scope", &self.scope);
        d.finish_non_exhaustive()
    }
}
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct GcsBuilder {
    // Deserialized service configuration; see `GcsConfig`.
    config: GcsConfig,
    // Optional caller-provided HTTP client; `build` creates one if absent.
    http_client: Option<HttpClient>,
    // Optional user-supplied token loader overriding the default chain.
    customed_token_loader: Option<Box<dyn GoogleTokenLoad>>,
}
impl Debug for GcsBuilder {
    /// Delegates to `GcsConfig`'s redacting `Debug` impl; the HTTP client
    /// and custom token loader are not shown.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("GcsBuilder")
            .field("config", &self.config)
            .finish_non_exhaustive()
    }
}
impl GcsBuilder {
    /// Set the working root within the bucket.
    ///
    /// Empty input is ignored, leaving any previously set value intact
    /// (this pattern holds for all setters below).
    pub fn root(&mut self, root: &str) -> &mut Self {
        if !root.is_empty() {
            self.config.root = Some(root.to_string())
        }
        self
    }
    /// Set the bucket name. Required; `build` fails if left empty.
    pub fn bucket(&mut self, bucket: &str) -> &mut Self {
        self.config.bucket = bucket.to_string();
        self
    }
    /// Set the OAuth scope used when requesting tokens.
    pub fn scope(&mut self, scope: &str) -> &mut Self {
        if !scope.is_empty() {
            self.config.scope = Some(scope.to_string())
        };
        self
    }
    /// Set the service account used by the token loader.
    pub fn service_account(&mut self, service_account: &str) -> &mut Self {
        if !service_account.is_empty() {
            self.config.service_account = Some(service_account.to_string())
        };
        self
    }
    /// Set a custom endpoint; defaults to `DEFAULT_GCS_ENDPOINT` otherwise.
    pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
        if !endpoint.is_empty() {
            self.config.endpoint = Some(endpoint.to_string())
        };
        self
    }
    /// Set credential content directly (alternative to `credential_path`).
    pub fn credential(&mut self, credential: &str) -> &mut Self {
        if !credential.is_empty() {
            self.config.credential = Some(credential.to_string())
        };
        self
    }
    /// Set the path to a credential file on disk.
    pub fn credential_path(&mut self, path: &str) -> &mut Self {
        if !path.is_empty() {
            self.config.credential_path = Some(path.to_string())
        };
        self
    }
    /// Supply a pre-built HTTP client instead of letting `build` create one.
    pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
        self.http_client = Some(client);
        self
    }
    /// Override token loading with a user-provided loader.
    pub fn customed_token_loader(&mut self, token_load: Box<dyn GoogleTokenLoad>) -> &mut Self {
        self.customed_token_loader = Some(token_load);
        self
    }
    /// Set the predefined ACL applied to written objects.
    pub fn predefined_acl(&mut self, acl: &str) -> &mut Self {
        if !acl.is_empty() {
            self.config.predefined_acl = Some(acl.to_string())
        };
        self
    }
    /// Set the storage class applied to written objects.
    pub fn default_storage_class(&mut self, class: &str) -> &mut Self {
        if !class.is_empty() {
            self.config.default_storage_class = Some(class.to_string())
        };
        self
    }
}
impl Builder for GcsBuilder {
    const SCHEME: Scheme = Scheme::Gcs;
    type Accessor = GcsBackend;

    /// Build a `GcsBuilder` from a string map of configuration values.
    ///
    /// Panics if the map cannot be deserialized into `GcsConfig`;
    /// callers are expected to pass well-formed configuration.
    fn from_map(map: HashMap<String, String>) -> Self {
        let config = GcsConfig::deserialize(ConfigDeserializer::new(map))
            .expect("config deserialize must succeed");
        GcsBuilder {
            config,
            ..GcsBuilder::default()
        }
    }

    /// Consume builder state and assemble the backend.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::ConfigInvalid` when no bucket is configured and
    /// propagates failures from HTTP-client construction.
    fn build(&mut self) -> Result<Self::Accessor> {
        debug!("backend build started: {:?}", self);

        let root = normalize_root(&self.config.root.take().unwrap_or_default());
        debug!("backend use root {}", root);

        // The bucket is the only mandatory setting; fail fast without it.
        if self.config.bucket.is_empty() {
            return Err(
                Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
                    .with_operation("Builder::build")
                    .with_context("service", Scheme::Gcs),
            );
        }
        let bucket = self.config.bucket.clone();

        // Prefer the caller-supplied HTTP client; otherwise create one.
        let client = if let Some(client) = self.http_client.take() {
            client
        } else {
            HttpClient::new().map_err(|err| {
                err.with_operation("Builder::build")
                    .with_context("service", Scheme::Gcs)
            })?
        };

        let endpoint = self
            .config
            .endpoint
            .clone()
            .unwrap_or_else(|| DEFAULT_GCS_ENDPOINT.to_string());
        debug!("backend use endpoint: {endpoint}");

        let mut cred_loader = GoogleCredentialLoader::default();
        if let Some(cred) = &self.config.credential {
            cred_loader = cred_loader.with_content(cred);
        }
        if let Some(cred) = &self.config.credential_path {
            cred_loader = cred_loader.with_path(cred);
        }
        // Env vars and well-known credential file locations don't exist on
        // wasm targets, so disable those lookup paths there.
        #[cfg(target_arch = "wasm32")]
        {
            cred_loader = cred_loader.with_disable_env();
            cred_loader = cred_loader.with_disable_well_known_location();
        }

        let scope = self.config.scope.as_deref().unwrap_or(DEFAULT_GCS_SCOPE);
        let mut token_loader = GoogleTokenLoader::new(scope, client.client());
        if let Some(account) = &self.config.service_account {
            token_loader = token_loader.with_service_account(account);
        }
        // Best effort: seed the token loader with eagerly loaded credentials;
        // a load failure here is ignored and retried lazily at request time.
        if let Ok(Some(cred)) = cred_loader.load() {
            token_loader = token_loader.with_credentials(cred)
        }
        if let Some(loader) = self.customed_token_loader.take() {
            token_loader = token_loader.with_customed_token_loader(loader)
        }

        let signer = GoogleSigner::new("storage");

        Ok(GcsBackend {
            core: Arc::new(GcsCore {
                endpoint,
                bucket,
                root,
                client,
                signer,
                token_loader,
                credential_loader: cred_loader,
                predefined_acl: self.config.predefined_acl.clone(),
                default_storage_class: self.config.default_storage_class.clone(),
            }),
        })
    }
}
/// GCS storage backend; a cheaply clonable handle around shared `GcsCore`.
#[derive(Clone, Debug)]
pub struct GcsBackend {
    core: Arc<GcsCore>,
}
impl Access for GcsBackend {
    type Reader = GcsReader;
    type Writer = GcsWriters;
    type Lister = oio::PageLister<GcsLister>;
    type BlockingReader = ();
    type BlockingWriter = ();
    type BlockingLister = ();

    /// Describe scheme, root, bucket name and native capabilities.
    fn info(&self) -> AccessorInfo {
        let mut am = AccessorInfo::default();
        am.set_scheme(Scheme::Gcs)
            .set_root(&self.core.root)
            .set_name(&self.core.bucket)
            .set_native_capability(Capability {
                stat: true,
                stat_with_if_match: true,
                stat_with_if_none_match: true,
                read: true,
                read_with_if_match: true,
                read_with_if_none_match: true,
                write: true,
                write_can_empty: true,
                write_can_multi: true,
                write_with_content_type: true,
                // Alignment constraint for multi-part writes.
                write_multi_align_size: Some(256 * 1024 * 1024),
                delete: true,
                copy: true,
                list: true,
                list_with_limit: true,
                list_with_start_after: true,
                list_with_recursive: true,
                batch: true,
                // Enforced in `batch` below.
                batch_max_operations: Some(100),
                presign: true,
                presign_stat: true,
                presign_read: true,
                presign_write: true,
                ..Default::default()
            });
        am
    }

    /// Stat an object via the JSON metadata endpoint and map the response
    /// into `Metadata` (etag, md5, size, content type, last-modified).
    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
        let resp = self.core.gcs_get_object_metadata(path, &args).await?;
        if !resp.status().is_success() {
            return Err(parse_error(resp).await?);
        }
        let slc = resp.into_body();
        let meta: GetObjectJsonResponse =
            serde_json::from_reader(slc.reader()).map_err(new_json_deserialize_error)?;
        let mut m = Metadata::new(EntryMode::FILE);
        m.set_etag(&meta.etag);
        m.set_content_md5(&meta.md5_hash);
        // GCS returns the size as a decimal string.
        let size = meta
            .size
            .parse::<u64>()
            .map_err(|e| Error::new(ErrorKind::Unexpected, "parse u64").set_source(e))?;
        m.set_content_length(size);
        if !meta.content_type.is_empty() {
            m.set_content_type(&meta.content_type);
        }
        m.set_last_modified(parse_datetime_from_rfc3339(&meta.updated)?);
        Ok(RpStat::new(m))
    }

    /// Create a lazy reader; no request is issued until the reader is used.
    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        Ok((
            RpRead::default(),
            GcsReader::new(self.core.clone(), path, args),
        ))
    }

    /// Create a range-based writer honoring the requested concurrency.
    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        let concurrent = args.concurrent();
        let w = GcsWriter::new(self.core.clone(), path, args);
        let w = oio::RangeWriter::new(w, concurrent);
        Ok((RpWrite::default(), w))
    }

    /// Delete an object; a 404 is treated as success (idempotent delete).
    async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
        let resp = self.core.gcs_delete_object(path).await?;
        if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
            Ok(RpDelete::default())
        } else {
            Err(parse_error(resp).await?)
        }
    }

    /// Create a paged lister over the given path.
    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
        let l = GcsLister::new(
            self.core.clone(),
            path,
            args.recursive(),
            args.limit(),
            args.start_after(),
        );
        Ok((RpList::default(), oio::PageLister::new(l)))
    }

    /// Server-side copy of one object to another key.
    async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
        let resp = self.core.gcs_copy_object(from, to).await?;
        if resp.status().is_success() {
            Ok(RpCopy::default())
        } else {
            Err(parse_error(resp).await?)
        }
    }

    /// Build and query-sign a request for the given presign operation,
    /// returning its method, URI and headers without sending it.
    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
        let mut req = match args.operation() {
            PresignOperation::Stat(v) => self.core.gcs_head_object_xml_request(path, v)?,
            PresignOperation::Read(v) => self.core.gcs_get_object_xml_request(path, v)?,
            PresignOperation::Write(v) => {
                self.core
                    .gcs_insert_object_xml_request(path, v, Buffer::new())?
            }
        };
        self.core.sign_query(&mut req, args.expire()).await?;
        let (parts, _) = req.into_parts();
        Ok(RpPresign::new(PresignedRequest::new(
            parts.method,
            parts.uri,
            parts.headers,
        )))
    }

    /// Batch-delete up to 100 objects via a single multipart/mixed request.
    ///
    /// # Errors
    ///
    /// `ErrorKind::Unsupported` when more than 100 operations are given;
    /// `ErrorKind::Unexpected` when the multipart response is malformed.
    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
        let ops = args.into_operation();
        if ops.len() > 100 {
            return Err(Error::new(
                ErrorKind::Unsupported,
                "gcs services only allow delete less than 100 keys at once",
            )
            .with_context("length", ops.len().to_string()));
        }
        let paths: Vec<String> = ops.into_iter().map(|(p, _)| p).collect();
        let resp = self.core.gcs_delete_objects(paths.clone()).await?;
        let status = resp.status();
        if let StatusCode::OK = status {
            let content_type = parse_content_type(resp.headers())?.ok_or_else(|| {
                Error::new(
                    ErrorKind::Unexpected,
                    "gcs batch delete response content type is empty",
                )
            })?;
            let boundary = content_type
                .strip_prefix("multipart/mixed; boundary=")
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::Unexpected,
                        "gcs batch delete response content type is not multipart/mixed",
                    )
                })?
                .trim_matches('"');
            let multipart: Multipart<MixedPart> = Multipart::new()
                .with_boundary(boundary)
                .parse(resp.into_body().to_bytes())?;
            let parts = multipart.into_parts();
            // Guard against a malformed response: each requested path must
            // have a matching part, otherwise `paths[i]` below would panic.
            if parts.len() != paths.len() {
                return Err(Error::new(
                    ErrorKind::Unexpected,
                    "gcs batch delete response parts count doesn't match the request",
                ));
            }
            let mut batched_result = Vec::with_capacity(parts.len());
            for (i, part) in parts.into_iter().enumerate() {
                let resp = part.into_response();
                let path = paths[i].clone();
                // Treat 404 as success: the object is already gone.
                if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
                    batched_result.push((path, Ok(RpDelete::default().into())));
                } else {
                    batched_result.push((path, Err(parse_error(resp).await?)));
                }
            }
            Ok(RpBatch::new(batched_result))
        } else {
            Err(parse_error(resp).await?)
        }
    }
}
/// Subset of the GCS object-metadata JSON response consumed by `stat`.
///
/// Fields are renamed from camelCase; fields absent from the payload fall
/// back to `Default` per `#[serde(default)]`.
#[derive(Debug, Default, Deserialize)]
#[serde(default, rename_all = "camelCase")]
struct GetObjectJsonResponse {
    // Object size in bytes, encoded by GCS as a decimal string.
    size: String,
    // HTTP etag of the object.
    etag: String,
    // Last-update timestamp in RFC3339 format.
    updated: String,
    // Base64-encoded MD5 of the content (maps from `md5Hash`).
    md5_hash: String,
    // Content-Type of the object (maps from `contentType`).
    content_type: String,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A full `storage#object` payload must deserialize, keeping exactly the
    /// fields `GetObjectJsonResponse` declares and ignoring the rest.
    #[test]
    fn test_deserialize_get_object_json_response() {
        let body = r#"{
  "kind": "storage#object",
  "id": "example/1.png/1660563214863653",
  "selfLink": "https://www.googleapis.com/storage/v1/b/example/o/1.png",
  "mediaLink": "https://content-storage.googleapis.com/download/storage/v1/b/example/o/1.png?generation=1660563214863653&alt=media",
  "name": "1.png",
  "bucket": "example",
  "generation": "1660563214863653",
  "metageneration": "1",
  "contentType": "image/png",
  "storageClass": "STANDARD",
  "size": "56535",
  "md5Hash": "fHcEH1vPwA6eTPqxuasXcg==",
  "crc32c": "j/un9g==",
  "etag": "CKWasoTgyPkCEAE=",
  "timeCreated": "2022-08-15T11:33:34.866Z",
  "updated": "2022-08-15T11:33:34.866Z",
  "timeStorageClassUpdated": "2022-08-15T11:33:34.866Z"
}"#;
        let parsed: GetObjectJsonResponse =
            serde_json::from_str(body).expect("json Deserialize must succeed");
        assert_eq!(parsed.content_type, "image/png");
        assert_eq!(parsed.etag, "CKWasoTgyPkCEAE=");
        assert_eq!(parsed.md5_hash, "fHcEH1vPwA6eTPqxuasXcg==");
        assert_eq!(parsed.size, "56535");
        assert_eq!(parsed.updated, "2022-08-15T11:33:34.866Z");
    }
}