opendal/services/gdrive/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use bytes::Bytes;
23use chrono::Utc;
24use http::Request;
25use http::Response;
26use http::StatusCode;
27use serde_json::json;
28
29use super::core::GdriveCore;
30use super::core::GdriveFile;
31use super::delete::GdriveDeleter;
32use super::error::parse_error;
33use super::lister::GdriveLister;
34use super::writer::GdriveWriter;
35use crate::raw::*;
36use crate::*;
37
38#[derive(Clone, Debug)]
39pub struct GdriveBackend {
40    pub core: Arc<GdriveCore>,
41}
42
43impl Access for GdriveBackend {
44    type Reader = HttpBody;
45    type Writer = oio::OneShotWriter<GdriveWriter>;
46    type Lister = oio::PageLister<GdriveLister>;
47    type Deleter = oio::OneShotDeleter<GdriveDeleter>;
48    type BlockingReader = ();
49    type BlockingWriter = ();
50    type BlockingLister = ();
51    type BlockingDeleter = ();
52
53    fn info(&self) -> Arc<AccessorInfo> {
54        self.core.info.clone()
55    }
56
57    async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
58        let path = build_abs_path(&self.core.root, path);
59        let _ = self.core.path_cache.ensure_dir(&path).await?;
60
61        Ok(RpCreateDir::default())
62    }
63
64    async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
65        let resp = self.core.gdrive_stat(path).await?;
66
67        if resp.status() != StatusCode::OK {
68            return Err(parse_error(resp));
69        }
70
71        let bs = resp.into_body();
72        let gdrive_file: GdriveFile =
73            serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
74
75        let file_type = if gdrive_file.mime_type == "application/vnd.google-apps.folder" {
76            EntryMode::DIR
77        } else {
78            EntryMode::FILE
79        };
80        let mut meta = Metadata::new(file_type).with_content_type(gdrive_file.mime_type);
81        if let Some(v) = gdrive_file.size {
82            meta = meta.with_content_length(v.parse::<u64>().map_err(|e| {
83                Error::new(ErrorKind::Unexpected, "parse content length").set_source(e)
84            })?);
85        }
86        if let Some(v) = gdrive_file.modified_time {
87            meta = meta.with_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err(|e| {
88                Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e)
89            })?);
90        }
91        Ok(RpStat::new(meta))
92    }
93
94    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
95        let resp = self.core.gdrive_get(path, args.range()).await?;
96
97        let status = resp.status();
98        match status {
99            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
100            _ => {
101                let (part, mut body) = resp.into_parts();
102                let buf = body.to_buffer().await?;
103                Err(parse_error(Response::from_parts(part, buf)))
104            }
105        }
106    }
107
108    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
109        let path = build_abs_path(&self.core.root, path);
110
111        // As Google Drive allows files have the same name, we need to check if the file exists.
112        // If the file exists, we will keep its ID and update it.
113        let file_id = self.core.path_cache.get(&path).await?;
114
115        Ok((
116            RpWrite::default(),
117            oio::OneShotWriter::new(GdriveWriter::new(self.core.clone(), path, file_id)),
118        ))
119    }
120
121    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
122        Ok((
123            RpDelete::default(),
124            oio::OneShotDeleter::new(GdriveDeleter::new(self.core.clone())),
125        ))
126    }
127
128    async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
129        let path = build_abs_path(&self.core.root, path);
130        let l = GdriveLister::new(path, self.core.clone());
131        Ok((RpList::default(), oio::PageLister::new(l)))
132    }
133
134    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
135        let from = build_abs_path(&self.core.root, from);
136
137        let from_file_id = self.core.path_cache.get(&from).await?.ok_or(Error::new(
138            ErrorKind::NotFound,
139            "the file to copy does not exist",
140        ))?;
141
142        let to_name = get_basename(to);
143        let to_path = build_abs_path(&self.core.root, to);
144        let to_parent_id = self
145            .core
146            .path_cache
147            .ensure_dir(get_parent(&to_path))
148            .await?;
149
150        // copy will overwrite `to`, delete it if exist
151        if let Some(id) = self.core.path_cache.get(&to_path).await? {
152            let resp = self.core.gdrive_trash(&id).await?;
153            let status = resp.status();
154            if status != StatusCode::OK {
155                return Err(parse_error(resp));
156            }
157
158            self.core.path_cache.remove(&to_path).await;
159        }
160
161        let url = format!(
162            "https://www.googleapis.com/drive/v3/files/{}/copy",
163            from_file_id
164        );
165
166        let request_body = &json!({
167            "name": to_name,
168            "parents": [to_parent_id],
169        });
170        let body = Buffer::from(Bytes::from(request_body.to_string()));
171
172        let mut req = Request::post(&url)
173            .body(body)
174            .map_err(new_request_build_error)?;
175        self.core.sign(&mut req).await?;
176
177        let resp = self.core.info.http_client().send(req).await?;
178
179        match resp.status() {
180            StatusCode::OK => Ok(RpCopy::default()),
181            _ => Err(parse_error(resp)),
182        }
183    }
184
185    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
186        let source = build_abs_path(&self.core.root, from);
187        let target = build_abs_path(&self.core.root, to);
188
189        // rename will overwrite `to`, delete it if exist
190        if let Some(id) = self.core.path_cache.get(&target).await? {
191            let resp = self.core.gdrive_trash(&id).await?;
192            let status = resp.status();
193            if status != StatusCode::OK {
194                return Err(parse_error(resp));
195            }
196
197            self.core.path_cache.remove(&target).await;
198        }
199
200        let resp = self
201            .core
202            .gdrive_patch_metadata_request(&source, &target)
203            .await?;
204
205        let status = resp.status();
206
207        match status {
208            StatusCode::OK => {
209                let body = resp.into_body();
210                let meta: GdriveFile =
211                    serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
212
213                let cache = &self.core.path_cache;
214
215                cache.remove(&build_abs_path(&self.core.root, from)).await;
216                cache
217                    .insert(&build_abs_path(&self.core.root, to), &meta.id)
218                    .await;
219
220                Ok(RpRename::default())
221            }
222            _ => Err(parse_error(resp)),
223        }
224    }
225}