opendal/services/gdrive/
backend.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use bytes::Bytes;
23use chrono::Utc;
24use http::Request;
25use http::Response;
26use http::StatusCode;
27use serde_json::json;
28
29use super::core::GdriveCore;
30use super::core::GdriveFile;
31use super::delete::GdriveDeleter;
32use super::error::parse_error;
33use super::lister::GdriveLister;
34use super::writer::GdriveWriter;
35use crate::raw::*;
36use crate::*;
37
38#[derive(Clone, Debug)]
39pub struct GdriveBackend {
40 pub core: Arc<GdriveCore>,
41}
42
43impl Access for GdriveBackend {
44 type Reader = HttpBody;
45 type Writer = oio::OneShotWriter<GdriveWriter>;
46 type Lister = oio::PageLister<GdriveLister>;
47 type Deleter = oio::OneShotDeleter<GdriveDeleter>;
48 type BlockingReader = ();
49 type BlockingWriter = ();
50 type BlockingLister = ();
51 type BlockingDeleter = ();
52
53 fn info(&self) -> Arc<AccessorInfo> {
54 self.core.info.clone()
55 }
56
57 async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
58 let path = build_abs_path(&self.core.root, path);
59 let _ = self.core.path_cache.ensure_dir(&path).await?;
60
61 Ok(RpCreateDir::default())
62 }
63
64 async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
65 let resp = self.core.gdrive_stat(path).await?;
66
67 if resp.status() != StatusCode::OK {
68 return Err(parse_error(resp));
69 }
70
71 let bs = resp.into_body();
72 let gdrive_file: GdriveFile =
73 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
74
75 let file_type = if gdrive_file.mime_type == "application/vnd.google-apps.folder" {
76 EntryMode::DIR
77 } else {
78 EntryMode::FILE
79 };
80 let mut meta = Metadata::new(file_type).with_content_type(gdrive_file.mime_type);
81 if let Some(v) = gdrive_file.size {
82 meta = meta.with_content_length(v.parse::<u64>().map_err(|e| {
83 Error::new(ErrorKind::Unexpected, "parse content length").set_source(e)
84 })?);
85 }
86 if let Some(v) = gdrive_file.modified_time {
87 meta = meta.with_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err(|e| {
88 Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e)
89 })?);
90 }
91 Ok(RpStat::new(meta))
92 }
93
94 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
95 let resp = self.core.gdrive_get(path, args.range()).await?;
96
97 let status = resp.status();
98 match status {
99 StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
100 _ => {
101 let (part, mut body) = resp.into_parts();
102 let buf = body.to_buffer().await?;
103 Err(parse_error(Response::from_parts(part, buf)))
104 }
105 }
106 }
107
108 async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
109 let path = build_abs_path(&self.core.root, path);
110
111 let file_id = self.core.path_cache.get(&path).await?;
114
115 Ok((
116 RpWrite::default(),
117 oio::OneShotWriter::new(GdriveWriter::new(self.core.clone(), path, file_id)),
118 ))
119 }
120
121 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
122 Ok((
123 RpDelete::default(),
124 oio::OneShotDeleter::new(GdriveDeleter::new(self.core.clone())),
125 ))
126 }
127
128 async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
129 let path = build_abs_path(&self.core.root, path);
130 let l = GdriveLister::new(path, self.core.clone());
131 Ok((RpList::default(), oio::PageLister::new(l)))
132 }
133
134 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
135 let from = build_abs_path(&self.core.root, from);
136
137 let from_file_id = self.core.path_cache.get(&from).await?.ok_or(Error::new(
138 ErrorKind::NotFound,
139 "the file to copy does not exist",
140 ))?;
141
142 let to_name = get_basename(to);
143 let to_path = build_abs_path(&self.core.root, to);
144 let to_parent_id = self
145 .core
146 .path_cache
147 .ensure_dir(get_parent(&to_path))
148 .await?;
149
150 if let Some(id) = self.core.path_cache.get(&to_path).await? {
152 let resp = self.core.gdrive_trash(&id).await?;
153 let status = resp.status();
154 if status != StatusCode::OK {
155 return Err(parse_error(resp));
156 }
157
158 self.core.path_cache.remove(&to_path).await;
159 }
160
161 let url = format!(
162 "https://www.googleapis.com/drive/v3/files/{}/copy",
163 from_file_id
164 );
165
166 let request_body = &json!({
167 "name": to_name,
168 "parents": [to_parent_id],
169 });
170 let body = Buffer::from(Bytes::from(request_body.to_string()));
171
172 let mut req = Request::post(&url)
173 .body(body)
174 .map_err(new_request_build_error)?;
175 self.core.sign(&mut req).await?;
176
177 let resp = self.core.info.http_client().send(req).await?;
178
179 match resp.status() {
180 StatusCode::OK => Ok(RpCopy::default()),
181 _ => Err(parse_error(resp)),
182 }
183 }
184
185 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
186 let source = build_abs_path(&self.core.root, from);
187 let target = build_abs_path(&self.core.root, to);
188
189 if let Some(id) = self.core.path_cache.get(&target).await? {
191 let resp = self.core.gdrive_trash(&id).await?;
192 let status = resp.status();
193 if status != StatusCode::OK {
194 return Err(parse_error(resp));
195 }
196
197 self.core.path_cache.remove(&target).await;
198 }
199
200 let resp = self
201 .core
202 .gdrive_patch_metadata_request(&source, &target)
203 .await?;
204
205 let status = resp.status();
206
207 match status {
208 StatusCode::OK => {
209 let body = resp.into_body();
210 let meta: GdriveFile =
211 serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
212
213 let cache = &self.core.path_cache;
214
215 cache.remove(&build_abs_path(&self.core.root, from)).await;
216 cache
217 .insert(&build_abs_path(&self.core.root, to), &meta.id)
218 .await;
219
220 Ok(RpRename::default())
221 }
222 _ => Err(parse_error(resp)),
223 }
224 }
225}