opendal/services/aliyun_drive/
backend.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use chrono::Utc;
24use http::header;
25use http::Request;
26use http::Response;
27use http::StatusCode;
28use log::debug;
29use tokio::sync::Mutex;
30
31use super::core::*;
32use super::delete::AliyunDriveDeleter;
33use super::error::parse_error;
34use super::lister::AliyunDriveLister;
35use super::lister::AliyunDriveParent;
36use super::writer::AliyunDriveWriter;
37use crate::raw::*;
38use crate::services::AliyunDriveConfig;
39use crate::*;
40
41impl Configurator for AliyunDriveConfig {
42 type Builder = AliyunDriveBuilder;
43
44 #[allow(deprecated)]
45 fn into_builder(self) -> Self::Builder {
46 AliyunDriveBuilder {
47 config: self,
48 http_client: None,
49 }
50 }
51}
52
53#[doc = include_str!("docs.md")]
54#[derive(Default)]
55pub struct AliyunDriveBuilder {
56 config: AliyunDriveConfig,
57
58 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
59 http_client: Option<HttpClient>,
60}
61
62impl Debug for AliyunDriveBuilder {
63 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
64 let mut d = f.debug_struct("AliyunDriveBuilder");
65
66 d.field("config", &self.config);
67 d.finish_non_exhaustive()
68 }
69}
70
71impl AliyunDriveBuilder {
72 pub fn root(mut self, root: &str) -> Self {
76 self.config.root = if root.is_empty() {
77 None
78 } else {
79 Some(root.to_string())
80 };
81
82 self
83 }
84
85 pub fn access_token(mut self, access_token: &str) -> Self {
87 self.config.access_token = Some(access_token.to_string());
88
89 self
90 }
91
92 pub fn client_id(mut self, client_id: &str) -> Self {
94 self.config.client_id = Some(client_id.to_string());
95
96 self
97 }
98
99 pub fn client_secret(mut self, client_secret: &str) -> Self {
101 self.config.client_secret = Some(client_secret.to_string());
102
103 self
104 }
105
106 pub fn refresh_token(mut self, refresh_token: &str) -> Self {
108 self.config.refresh_token = Some(refresh_token.to_string());
109
110 self
111 }
112
113 pub fn drive_type(mut self, drive_type: &str) -> Self {
115 self.config.drive_type = drive_type.to_string();
116
117 self
118 }
119
120 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
127 #[allow(deprecated)]
128 pub fn http_client(mut self, client: HttpClient) -> Self {
129 self.http_client = Some(client);
130 self
131 }
132}
133
134impl Builder for AliyunDriveBuilder {
135 const SCHEME: Scheme = Scheme::AliyunDrive;
136 type Config = AliyunDriveConfig;
137
138 fn build(self) -> Result<impl Access> {
139 debug!("backend build started: {:?}", &self);
140
141 let root = normalize_root(&self.config.root.clone().unwrap_or_default());
142 debug!("backend use root {}", &root);
143
144 let sign = match self.config.access_token.clone() {
145 Some(access_token) if !access_token.is_empty() => {
146 AliyunDriveSign::Access(access_token)
147 }
148 _ => match (
149 self.config.client_id.clone(),
150 self.config.client_secret.clone(),
151 self.config.refresh_token.clone(),
152 ) {
153 (Some(client_id), Some(client_secret), Some(refresh_token)) if
154 !client_id.is_empty() && !client_secret.is_empty() && !refresh_token.is_empty() => {
155 AliyunDriveSign::Refresh(client_id, client_secret, refresh_token, None, 0)
156 }
157 _ => return Err(Error::new(
158 ErrorKind::ConfigInvalid,
159 "access_token and a set of client_id, client_secret, and refresh_token are both missing.")
160 .with_operation("Builder::build")
161 .with_context("service", Scheme::AliyunDrive)),
162 },
163 };
164
165 let drive_type = match self.config.drive_type.as_str() {
166 "" | "default" => DriveType::Default,
167 "resource" => DriveType::Resource,
168 "backup" => DriveType::Backup,
169 _ => {
170 return Err(Error::new(
171 ErrorKind::ConfigInvalid,
172 "drive_type is invalid.",
173 ))
174 }
175 };
176 debug!("backend use drive_type {:?}", drive_type);
177
178 Ok(AliyunDriveBackend {
179 core: Arc::new(AliyunDriveCore {
180 info: {
181 let am = AccessorInfo::default();
182 am.set_scheme(Scheme::AliyunDrive)
183 .set_root(&root)
184 .set_native_capability(Capability {
185 stat: true,
186 create_dir: true,
187 read: true,
188 write: true,
189 write_can_multi: true,
190 write_multi_min_size: Some(100 * 1024),
192 write_multi_max_size: if cfg!(target_pointer_width = "64") {
194 Some(5 * 1024 * 1024 * 1024)
195 } else {
196 Some(usize::MAX)
197 },
198 delete: true,
199 copy: true,
200 rename: true,
201 list: true,
202 list_with_limit: true,
203 shared: true,
204 stat_has_content_length: true,
205 stat_has_content_type: true,
206 list_has_last_modified: true,
207 list_has_content_length: true,
208 list_has_content_type: true,
209 ..Default::default()
210 });
211
212 #[allow(deprecated)]
214 if let Some(client) = self.http_client {
215 am.update_http_client(|_| client);
216 }
217
218 am.into()
219 },
220 endpoint: "https://openapi.alipan.com".to_string(),
221 root,
222 drive_type,
223 signer: Arc::new(Mutex::new(AliyunDriveSigner {
224 drive_id: None,
225 sign,
226 })),
227 dir_lock: Arc::new(Mutex::new(())),
228 }),
229 })
230 }
231}
232
233#[derive(Clone, Debug)]
234pub struct AliyunDriveBackend {
235 core: Arc<AliyunDriveCore>,
236}
237
238impl Access for AliyunDriveBackend {
239 type Reader = HttpBody;
240 type Writer = AliyunDriveWriter;
241 type Lister = oio::PageLister<AliyunDriveLister>;
242 type Deleter = oio::OneShotDeleter<AliyunDriveDeleter>;
243 type BlockingReader = ();
244 type BlockingWriter = ();
245 type BlockingLister = ();
246 type BlockingDeleter = ();
247
248 fn info(&self) -> Arc<AccessorInfo> {
249 self.core.info.clone()
250 }
251
252 async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
253 self.core.ensure_dir_exists(path).await?;
254
255 Ok(RpCreateDir::default())
256 }
257
258 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
259 if from == to {
260 return Ok(RpRename::default());
261 }
262 let res = self.core.get_by_path(from).await?;
263 let file: AliyunDriveFile =
264 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
265 match self.core.get_by_path(to).await {
267 Err(err) if err.kind() == ErrorKind::NotFound => {}
268 Err(err) => return Err(err),
269 Ok(res) => {
270 let file: AliyunDriveFile =
271 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
272 self.core.delete_path(&file.file_id).await?;
273 }
274 };
275
276 let parent_file_id = self.core.ensure_dir_exists(get_parent(to)).await?;
277 self.core.move_path(&file.file_id, &parent_file_id).await?;
278
279 let from_name = get_basename(from);
280 let to_name = get_basename(to);
281
282 if from_name != to_name {
283 self.core.update_path(&file.file_id, to_name).await?;
284 }
285
286 Ok(RpRename::default())
287 }
288
289 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
290 if from == to {
291 return Ok(RpCopy::default());
292 }
293 let res = self.core.get_by_path(from).await?;
294 let file: AliyunDriveFile =
295 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
296 match self.core.get_by_path(to).await {
298 Err(err) if err.kind() == ErrorKind::NotFound => {}
299 Err(err) => return Err(err),
300 Ok(res) => {
301 let file: AliyunDriveFile =
302 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
303 self.core.delete_path(&file.file_id).await?;
304 }
305 };
306 let parent_path = get_parent(to);
309 let parent_file_id = self.core.ensure_dir_exists(parent_path).await?;
310
311 let auto_rename = file.parent_file_id == parent_file_id;
315 let res = self
316 .core
317 .copy_path(&file.file_id, &parent_file_id, auto_rename)
318 .await?;
319 let file: CopyResponse =
320 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
321 let file_id = file.file_id;
322
323 let from_name = get_basename(from);
324 let to_name = get_basename(to);
325
326 if from_name != to_name {
327 self.core.update_path(&file_id, to_name).await?;
328 }
329
330 Ok(RpCopy::default())
331 }
332
333 async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
334 let res = self.core.get_by_path(path).await?;
335 let file: AliyunDriveFile =
336 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
337
338 if file.path_type == "folder" {
339 let meta = Metadata::new(EntryMode::DIR).with_last_modified(
340 file.updated_at
341 .parse::<chrono::DateTime<Utc>>()
342 .map_err(|e| {
343 Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e)
344 })?,
345 );
346
347 return Ok(RpStat::new(meta));
348 }
349
350 let mut meta = Metadata::new(EntryMode::FILE).with_last_modified(
351 file.updated_at
352 .parse::<chrono::DateTime<Utc>>()
353 .map_err(|e| {
354 Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e)
355 })?,
356 );
357 if let Some(v) = file.size {
358 meta = meta.with_content_length(v);
359 }
360 if let Some(v) = file.content_type {
361 meta = meta.with_content_type(v);
362 }
363
364 Ok(RpStat::new(meta))
365 }
366
367 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
368 let res = self.core.get_by_path(path).await?;
369 let file: AliyunDriveFile =
370 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
371
372 let download_url = self.core.get_download_url(&file.file_id).await?;
373 let req = Request::get(&download_url)
374 .header(header::RANGE, args.range().to_header())
375 .body(Buffer::new())
376 .map_err(new_request_build_error)?;
377
378 let resp = self.core.info.http_client().fetch(req).await?;
379 let status = resp.status();
380 match status {
381 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
382 Ok((RpRead::default(), resp.into_body()))
383 }
384 _ => {
385 let (part, mut body) = resp.into_parts();
386 let buf = body.to_buffer().await?;
387 Err(parse_error(Response::from_parts(part, buf)))
388 }
389 }
390 }
391
392 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
393 Ok((
394 RpDelete::default(),
395 oio::OneShotDeleter::new(AliyunDriveDeleter::new(self.core.clone())),
396 ))
397 }
398
399 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
400 let parent = match self.core.get_by_path(path).await {
401 Err(err) if err.kind() == ErrorKind::NotFound => None,
402 Err(err) => return Err(err),
403 Ok(res) => {
404 let file: AliyunDriveFile =
405 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
406 Some(AliyunDriveParent {
407 file_id: file.file_id,
408 path: path.to_string(),
409 updated_at: file.updated_at,
410 })
411 }
412 };
413
414 let l = AliyunDriveLister::new(self.core.clone(), parent, args.limit());
415
416 Ok((RpList::default(), oio::PageLister::new(l)))
417 }
418
419 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
420 let parent_path = get_parent(path);
421 let parent_file_id = self.core.ensure_dir_exists(parent_path).await?;
422
423 match self.core.get_by_path(path).await {
425 Err(err) if err.kind() == ErrorKind::NotFound => {}
426 Err(err) => return Err(err),
427 Ok(res) => {
428 let file: AliyunDriveFile =
429 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
430 self.core.delete_path(&file.file_id).await?;
431 }
432 };
433
434 let writer =
435 AliyunDriveWriter::new(self.core.clone(), &parent_file_id, get_basename(path), args);
436
437 Ok((RpWrite::default(), writer))
438 }
439}