opendal/services/yandex_disk/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::header;
24use http::request;
25use http::Request;
26use http::Response;
27use http::StatusCode;
28use serde::Deserialize;
29
30use super::error::parse_error;
31use crate::raw::*;
32use crate::*;
33
34#[derive(Clone)]
35pub struct YandexDiskCore {
36    pub info: Arc<AccessorInfo>,
37    /// The root of this core.
38    pub root: String,
39    /// Yandex Disk oauth access_token.
40    pub access_token: String,
41}
42
43impl Debug for YandexDiskCore {
44    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45        f.debug_struct("Backend")
46            .field("root", &self.root)
47            .finish_non_exhaustive()
48    }
49}
50
51impl YandexDiskCore {
52    #[inline]
53    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
54        self.info.http_client().send(req).await
55    }
56
57    #[inline]
58    pub fn sign(&self, req: request::Builder) -> request::Builder {
59        req.header(
60            header::AUTHORIZATION,
61            format!("OAuth {}", self.access_token),
62        )
63    }
64}
65
66impl YandexDiskCore {
67    /// Get upload url.
68    async fn get_upload_url(&self, path: &str) -> Result<String> {
69        let path = build_rooted_abs_path(&self.root, path);
70
71        let url = format!(
72            "https://cloud-api.yandex.net/v1/disk/resources/upload?path={}&overwrite=true",
73            percent_encode_path(&path)
74        );
75
76        let req = Request::get(url);
77
78        let req = req.extension(Operation::Write);
79
80        let req = self.sign(req);
81
82        // Set body
83        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
84
85        let resp = self.send(req).await?;
86
87        let status = resp.status();
88
89        match status {
90            StatusCode::OK => {
91                let bytes = resp.into_body();
92
93                let resp: GetUploadUrlResponse =
94                    serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
95
96                Ok(resp.href)
97            }
98            _ => Err(parse_error(resp)),
99        }
100    }
101
102    pub async fn upload(&self, path: &str, body: Buffer) -> Result<Response<Buffer>> {
103        let upload_url = self.get_upload_url(path).await?;
104        let req = Request::put(upload_url)
105            .extension(Operation::Write)
106            .body(body)
107            .map_err(new_request_build_error)?;
108
109        self.send(req).await
110    }
111
112    async fn get_download_url(&self, path: &str) -> Result<String> {
113        let path = build_rooted_abs_path(&self.root, path);
114
115        let url = format!(
116            "https://cloud-api.yandex.net/v1/disk/resources/download?path={}&overwrite=true",
117            percent_encode_path(&path)
118        );
119
120        let req = Request::get(url);
121
122        let req = req.extension(Operation::Read);
123
124        let req = self.sign(req);
125
126        // Set body
127        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
128
129        let resp = self.send(req).await?;
130
131        let status = resp.status();
132
133        match status {
134            StatusCode::OK => {
135                let bytes = resp.into_body();
136
137                let resp: GetUploadUrlResponse =
138                    serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
139
140                Ok(resp.href)
141            }
142            _ => Err(parse_error(resp)),
143        }
144    }
145
146    pub async fn download(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
147        let download_url = self.get_download_url(path).await?;
148        let req = Request::get(download_url)
149            .header(header::RANGE, range.to_header())
150            .extension(Operation::Read)
151            .body(Buffer::new())
152            .map_err(new_request_build_error)?;
153
154        self.info.http_client().fetch(req).await
155    }
156
157    pub async fn ensure_dir_exists(&self, path: &str) -> Result<()> {
158        let path = build_abs_path(&self.root, path);
159
160        let paths = path.split('/').collect::<Vec<&str>>();
161
162        for i in 0..paths.len() - 1 {
163            let path = paths[..i + 1].join("/");
164            let resp = self.create_dir(&path).await?;
165
166            let status = resp.status();
167
168            match status {
169                StatusCode::CREATED | StatusCode::CONFLICT => {}
170                _ => return Err(parse_error(resp)),
171            }
172        }
173        Ok(())
174    }
175
176    pub async fn create_dir(&self, path: &str) -> Result<Response<Buffer>> {
177        let url = format!(
178            "https://cloud-api.yandex.net/v1/disk/resources?path=/{}",
179            percent_encode_path(path),
180        );
181
182        let req = Request::put(url);
183
184        let req = req.extension(Operation::CreateDir);
185
186        let req = self.sign(req);
187
188        // Set body
189        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
190
191        self.send(req).await
192    }
193
194    pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
195        let from = build_rooted_abs_path(&self.root, from);
196        let to = build_rooted_abs_path(&self.root, to);
197
198        let url = format!(
199            "https://cloud-api.yandex.net/v1/disk/resources/copy?from={}&path={}&overwrite=true",
200            percent_encode_path(&from),
201            percent_encode_path(&to)
202        );
203
204        let req = Request::post(url);
205
206        let req = req.extension(Operation::Copy);
207
208        let req = self.sign(req);
209
210        // Set body
211        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
212
213        self.send(req).await
214    }
215
216    pub async fn move_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
217        let from = build_rooted_abs_path(&self.root, from);
218        let to = build_rooted_abs_path(&self.root, to);
219
220        let url = format!(
221            "https://cloud-api.yandex.net/v1/disk/resources/move?from={}&path={}&overwrite=true",
222            percent_encode_path(&from),
223            percent_encode_path(&to)
224        );
225
226        let req = Request::post(url);
227
228        let req = req.extension(Operation::Rename);
229
230        let req = self.sign(req);
231
232        // Set body
233        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
234
235        self.send(req).await
236    }
237
238    pub async fn delete(&self, path: &str) -> Result<Response<Buffer>> {
239        let path = build_rooted_abs_path(&self.root, path);
240
241        let url = format!(
242            "https://cloud-api.yandex.net/v1/disk/resources?path={}&permanently=true",
243            percent_encode_path(&path),
244        );
245
246        let req = Request::delete(url);
247
248        let req = req.extension(Operation::Delete);
249
250        let req = self.sign(req);
251
252        // Set body
253        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
254
255        self.send(req).await
256    }
257
258    pub async fn metainformation(
259        &self,
260        path: &str,
261        limit: Option<usize>,
262        offset: Option<String>,
263    ) -> Result<Response<Buffer>> {
264        let path = build_rooted_abs_path(&self.root, path);
265
266        let mut url = format!(
267            "https://cloud-api.yandex.net/v1/disk/resources?path={}",
268            percent_encode_path(&path),
269        );
270
271        if let Some(limit) = limit {
272            url = format!("{}&limit={}", url, limit);
273        }
274
275        if let Some(offset) = offset {
276            url = format!("{}&offset={}", url, offset);
277        }
278
279        let req = Request::get(url);
280
281        let req = req.extension(Operation::Stat);
282
283        let req = self.sign(req);
284
285        // Set body
286        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
287
288        self.send(req).await
289    }
290}
291
292pub(super) fn parse_info(mf: MetainformationResponse) -> Result<Metadata> {
293    let mode = if mf.ty == "file" {
294        EntryMode::FILE
295    } else {
296        EntryMode::DIR
297    };
298
299    let mut m = Metadata::new(mode);
300
301    m.set_last_modified(parse_datetime_from_rfc3339(&mf.modified)?);
302
303    if let Some(md5) = mf.md5 {
304        m.set_content_md5(&md5);
305    }
306
307    if let Some(mime_type) = mf.mime_type {
308        m.set_content_type(&mime_type);
309    }
310
311    if let Some(size) = mf.size {
312        m.set_content_length(size);
313    }
314
315    Ok(m)
316}
317
318#[derive(Debug, Deserialize)]
319pub struct GetUploadUrlResponse {
320    pub href: String,
321}
322
323#[derive(Debug, Deserialize)]
324pub struct MetainformationResponse {
325    #[serde(rename = "type")]
326    pub ty: String,
327    pub path: String,
328    pub modified: String,
329    pub md5: Option<String>,
330    pub mime_type: Option<String>,
331    pub size: Option<u64>,
332    #[serde(rename = "_embedded")]
333    pub embedded: Option<Embedded>,
334}
335
336#[derive(Debug, Deserialize)]
337pub struct Embedded {
338    pub total: usize,
339    pub items: Vec<MetainformationResponse>,
340}