opendal/services/yandex_disk/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bytes::Buf;
19use http::header;
20use http::request;
21use http::Request;
22use http::Response;
23use http::StatusCode;
24use serde::Deserialize;
25use std::fmt::Debug;
26use std::fmt::Formatter;
27use std::sync::Arc;
28
29use super::error::parse_error;
30use crate::raw::*;
31use crate::*;
32
/// Shared state used by the Yandex Disk service to build, sign and send
/// requests against `https://cloud-api.yandex.net`.
#[derive(Clone)]
pub struct YandexDiskCore {
    /// Accessor info; the HTTP client used by `send` and `download` is
    /// obtained from it via `http_client()`.
    pub info: Arc<AccessorInfo>,
    /// The root of this core.
    pub root: String,
    /// Yandex Disk oauth access_token.
    pub access_token: String,
}
41
42impl Debug for YandexDiskCore {
43    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44        f.debug_struct("Backend")
45            .field("root", &self.root)
46            .finish_non_exhaustive()
47    }
48}
49
50impl YandexDiskCore {
51    #[inline]
52    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
53        self.info.http_client().send(req).await
54    }
55
56    #[inline]
57    pub fn sign(&self, req: request::Builder) -> request::Builder {
58        req.header(
59            header::AUTHORIZATION,
60            format!("OAuth {}", self.access_token),
61        )
62    }
63}
64
65impl YandexDiskCore {
66    /// Get upload url.
67    async fn get_upload_url(&self, path: &str) -> Result<String> {
68        let path = build_rooted_abs_path(&self.root, path);
69
70        let url = format!(
71            "https://cloud-api.yandex.net/v1/disk/resources/upload?path={}&overwrite=true",
72            percent_encode_path(&path)
73        );
74
75        let req = Request::get(url);
76
77        let req = req.extension(Operation::Write);
78
79        let req = self.sign(req);
80
81        // Set body
82        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
83
84        let resp = self.send(req).await?;
85
86        let status = resp.status();
87
88        match status {
89            StatusCode::OK => {
90                let bytes = resp.into_body();
91
92                let resp: GetUploadUrlResponse =
93                    serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
94
95                Ok(resp.href)
96            }
97            _ => Err(parse_error(resp)),
98        }
99    }
100
101    pub async fn upload(&self, path: &str, body: Buffer) -> Result<Response<Buffer>> {
102        let upload_url = self.get_upload_url(path).await?;
103        let req = Request::put(upload_url)
104            .extension(Operation::Write)
105            .body(body)
106            .map_err(new_request_build_error)?;
107
108        self.send(req).await
109    }
110
111    async fn get_download_url(&self, path: &str) -> Result<String> {
112        let path = build_rooted_abs_path(&self.root, path);
113
114        let url = format!(
115            "https://cloud-api.yandex.net/v1/disk/resources/download?path={}&overwrite=true",
116            percent_encode_path(&path)
117        );
118
119        let req = Request::get(url);
120
121        let req = req.extension(Operation::Read);
122
123        let req = self.sign(req);
124
125        // Set body
126        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
127
128        let resp = self.send(req).await?;
129
130        let status = resp.status();
131
132        match status {
133            StatusCode::OK => {
134                let bytes = resp.into_body();
135
136                let resp: GetUploadUrlResponse =
137                    serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
138
139                Ok(resp.href)
140            }
141            _ => Err(parse_error(resp)),
142        }
143    }
144
145    pub async fn download(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
146        let download_url = self.get_download_url(path).await?;
147        let req = Request::get(download_url)
148            .header(header::RANGE, range.to_header())
149            .extension(Operation::Read)
150            .body(Buffer::new())
151            .map_err(new_request_build_error)?;
152
153        self.info.http_client().fetch(req).await
154    }
155
156    pub async fn ensure_dir_exists(&self, path: &str) -> Result<()> {
157        let path = build_abs_path(&self.root, path);
158
159        let paths = path.split('/').collect::<Vec<&str>>();
160
161        for i in 0..paths.len() - 1 {
162            let path = paths[..i + 1].join("/");
163            let resp = self.create_dir(&path).await?;
164
165            let status = resp.status();
166
167            match status {
168                StatusCode::CREATED | StatusCode::CONFLICT => {}
169                _ => return Err(parse_error(resp)),
170            }
171        }
172        Ok(())
173    }
174
175    pub async fn create_dir(&self, path: &str) -> Result<Response<Buffer>> {
176        let url = format!(
177            "https://cloud-api.yandex.net/v1/disk/resources?path=/{}",
178            percent_encode_path(path),
179        );
180
181        let req = Request::put(url);
182
183        let req = req.extension(Operation::CreateDir);
184
185        let req = self.sign(req);
186
187        // Set body
188        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
189
190        self.send(req).await
191    }
192
193    pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
194        let from = build_rooted_abs_path(&self.root, from);
195        let to = build_rooted_abs_path(&self.root, to);
196
197        let url = format!(
198            "https://cloud-api.yandex.net/v1/disk/resources/copy?from={}&path={}&overwrite=true",
199            percent_encode_path(&from),
200            percent_encode_path(&to)
201        );
202
203        let req = Request::post(url);
204
205        let req = req.extension(Operation::Copy);
206
207        let req = self.sign(req);
208
209        // Set body
210        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
211
212        self.send(req).await
213    }
214
215    pub async fn move_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
216        let from = build_rooted_abs_path(&self.root, from);
217        let to = build_rooted_abs_path(&self.root, to);
218
219        let url = format!(
220            "https://cloud-api.yandex.net/v1/disk/resources/move?from={}&path={}&overwrite=true",
221            percent_encode_path(&from),
222            percent_encode_path(&to)
223        );
224
225        let req = Request::post(url);
226
227        let req = req.extension(Operation::Rename);
228
229        let req = self.sign(req);
230
231        // Set body
232        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
233
234        self.send(req).await
235    }
236
237    pub async fn delete(&self, path: &str) -> Result<Response<Buffer>> {
238        let path = build_rooted_abs_path(&self.root, path);
239
240        let url = format!(
241            "https://cloud-api.yandex.net/v1/disk/resources?path={}&permanently=true",
242            percent_encode_path(&path),
243        );
244
245        let req = Request::delete(url);
246
247        let req = req.extension(Operation::Delete);
248
249        let req = self.sign(req);
250
251        // Set body
252        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
253
254        self.send(req).await
255    }
256
257    pub async fn metainformation(
258        &self,
259        path: &str,
260        limit: Option<usize>,
261        offset: Option<String>,
262    ) -> Result<Response<Buffer>> {
263        let path = build_rooted_abs_path(&self.root, path);
264
265        let mut url = format!(
266            "https://cloud-api.yandex.net/v1/disk/resources?path={}",
267            percent_encode_path(&path),
268        );
269
270        if let Some(limit) = limit {
271            url = format!("{}&limit={}", url, limit);
272        }
273
274        if let Some(offset) = offset {
275            url = format!("{}&offset={}", url, offset);
276        }
277
278        let req = Request::get(url);
279
280        let req = req.extension(Operation::Stat);
281
282        let req = self.sign(req);
283
284        // Set body
285        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
286
287        self.send(req).await
288    }
289}
290
291pub(super) fn parse_info(mf: MetainformationResponse) -> Result<Metadata> {
292    let mode = if mf.ty == "file" {
293        EntryMode::FILE
294    } else {
295        EntryMode::DIR
296    };
297
298    let mut m = Metadata::new(mode);
299
300    m.set_last_modified(parse_datetime_from_rfc3339(&mf.modified)?);
301
302    if let Some(md5) = mf.md5 {
303        m.set_content_md5(&md5);
304    }
305
306    if let Some(mime_type) = mf.mime_type {
307        m.set_content_type(&mime_type);
308    }
309
310    if let Some(size) = mf.size {
311        m.set_content_length(size);
312    }
313
314    Ok(m)
315}
316
/// JSON reply carrying a link in its `href` field.
///
/// Despite the name, it is used to decode both the upload-link and the
/// download-link responses.
#[derive(Debug, Deserialize)]
pub struct GetUploadUrlResponse {
    /// The URL that the subsequent upload (`PUT`) or download (`GET`) request
    /// is sent to.
    pub href: String,
}
321
/// JSON description of a single resource, as decoded from a metainformation
/// reply; also used for each nested item inside [`Embedded`].
#[derive(Debug, Deserialize)]
pub struct MetainformationResponse {
    /// Resource kind, taken from the JSON `type` key. `parse_info` treats
    /// `"file"` as a file and every other value as a directory.
    #[serde(rename = "type")]
    pub ty: String,
    /// Path of the resource as reported by the service.
    pub path: String,
    /// Modification time; `parse_info` parses it as an RFC 3339 timestamp.
    pub modified: String,
    /// MD5 of the content, when reported.
    pub md5: Option<String>,
    /// MIME type of the content, when reported.
    pub mime_type: Option<String>,
    /// Content size, when reported (presumably in bytes; `parse_info` stores
    /// it as the content length).
    pub size: Option<u64>,
    /// Nested resources, taken from the JSON `_embedded` key when present
    /// (presumably the children of a directory; confirm against the API docs).
    #[serde(rename = "_embedded")]
    pub embedded: Option<Embedded>,
}
334
/// Contents of the `_embedded` object of a [`MetainformationResponse`].
#[derive(Debug, Deserialize)]
pub struct Embedded {
    /// Total count reported by the service (presumably the number of nested
    /// resources overall, independent of paging; confirm against the API docs).
    pub total: usize,
    /// The nested resources included in this reply.
    pub items: Vec<MetainformationResponse>,
}