opendal/services/yandex_disk/
core.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::header;
24use http::request;
25use http::Request;
26use http::Response;
27use http::StatusCode;
28use serde::Deserialize;
29
30use super::error::parse_error;
31use crate::raw::*;
32use crate::*;
33
34#[derive(Clone)]
35pub struct YandexDiskCore {
36 pub info: Arc<AccessorInfo>,
37 pub root: String,
39 pub access_token: String,
41}
42
43impl Debug for YandexDiskCore {
44 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45 f.debug_struct("Backend")
46 .field("root", &self.root)
47 .finish_non_exhaustive()
48 }
49}
50
51impl YandexDiskCore {
52 #[inline]
53 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
54 self.info.http_client().send(req).await
55 }
56
57 #[inline]
58 pub fn sign(&self, req: request::Builder) -> request::Builder {
59 req.header(
60 header::AUTHORIZATION,
61 format!("OAuth {}", self.access_token),
62 )
63 }
64}
65
66impl YandexDiskCore {
67 async fn get_upload_url(&self, path: &str) -> Result<String> {
69 let path = build_rooted_abs_path(&self.root, path);
70
71 let url = format!(
72 "https://cloud-api.yandex.net/v1/disk/resources/upload?path={}&overwrite=true",
73 percent_encode_path(&path)
74 );
75
76 let req = Request::get(url);
77
78 let req = req.extension(Operation::Write);
79
80 let req = self.sign(req);
81
82 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
84
85 let resp = self.send(req).await?;
86
87 let status = resp.status();
88
89 match status {
90 StatusCode::OK => {
91 let bytes = resp.into_body();
92
93 let resp: GetUploadUrlResponse =
94 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
95
96 Ok(resp.href)
97 }
98 _ => Err(parse_error(resp)),
99 }
100 }
101
102 pub async fn upload(&self, path: &str, body: Buffer) -> Result<Response<Buffer>> {
103 let upload_url = self.get_upload_url(path).await?;
104 let req = Request::put(upload_url)
105 .extension(Operation::Write)
106 .body(body)
107 .map_err(new_request_build_error)?;
108
109 self.send(req).await
110 }
111
112 async fn get_download_url(&self, path: &str) -> Result<String> {
113 let path = build_rooted_abs_path(&self.root, path);
114
115 let url = format!(
116 "https://cloud-api.yandex.net/v1/disk/resources/download?path={}&overwrite=true",
117 percent_encode_path(&path)
118 );
119
120 let req = Request::get(url);
121
122 let req = req.extension(Operation::Read);
123
124 let req = self.sign(req);
125
126 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
128
129 let resp = self.send(req).await?;
130
131 let status = resp.status();
132
133 match status {
134 StatusCode::OK => {
135 let bytes = resp.into_body();
136
137 let resp: GetUploadUrlResponse =
138 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
139
140 Ok(resp.href)
141 }
142 _ => Err(parse_error(resp)),
143 }
144 }
145
146 pub async fn download(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
147 let download_url = self.get_download_url(path).await?;
148 let req = Request::get(download_url)
149 .header(header::RANGE, range.to_header())
150 .extension(Operation::Read)
151 .body(Buffer::new())
152 .map_err(new_request_build_error)?;
153
154 self.info.http_client().fetch(req).await
155 }
156
157 pub async fn ensure_dir_exists(&self, path: &str) -> Result<()> {
158 let path = build_abs_path(&self.root, path);
159
160 let paths = path.split('/').collect::<Vec<&str>>();
161
162 for i in 0..paths.len() - 1 {
163 let path = paths[..i + 1].join("/");
164 let resp = self.create_dir(&path).await?;
165
166 let status = resp.status();
167
168 match status {
169 StatusCode::CREATED | StatusCode::CONFLICT => {}
170 _ => return Err(parse_error(resp)),
171 }
172 }
173 Ok(())
174 }
175
176 pub async fn create_dir(&self, path: &str) -> Result<Response<Buffer>> {
177 let url = format!(
178 "https://cloud-api.yandex.net/v1/disk/resources?path=/{}",
179 percent_encode_path(path),
180 );
181
182 let req = Request::put(url);
183
184 let req = req.extension(Operation::CreateDir);
185
186 let req = self.sign(req);
187
188 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
190
191 self.send(req).await
192 }
193
194 pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
195 let from = build_rooted_abs_path(&self.root, from);
196 let to = build_rooted_abs_path(&self.root, to);
197
198 let url = format!(
199 "https://cloud-api.yandex.net/v1/disk/resources/copy?from={}&path={}&overwrite=true",
200 percent_encode_path(&from),
201 percent_encode_path(&to)
202 );
203
204 let req = Request::post(url);
205
206 let req = req.extension(Operation::Copy);
207
208 let req = self.sign(req);
209
210 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
212
213 self.send(req).await
214 }
215
216 pub async fn move_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
217 let from = build_rooted_abs_path(&self.root, from);
218 let to = build_rooted_abs_path(&self.root, to);
219
220 let url = format!(
221 "https://cloud-api.yandex.net/v1/disk/resources/move?from={}&path={}&overwrite=true",
222 percent_encode_path(&from),
223 percent_encode_path(&to)
224 );
225
226 let req = Request::post(url);
227
228 let req = req.extension(Operation::Rename);
229
230 let req = self.sign(req);
231
232 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
234
235 self.send(req).await
236 }
237
238 pub async fn delete(&self, path: &str) -> Result<Response<Buffer>> {
239 let path = build_rooted_abs_path(&self.root, path);
240
241 let url = format!(
242 "https://cloud-api.yandex.net/v1/disk/resources?path={}&permanently=true",
243 percent_encode_path(&path),
244 );
245
246 let req = Request::delete(url);
247
248 let req = req.extension(Operation::Delete);
249
250 let req = self.sign(req);
251
252 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
254
255 self.send(req).await
256 }
257
258 pub async fn metainformation(
259 &self,
260 path: &str,
261 limit: Option<usize>,
262 offset: Option<String>,
263 ) -> Result<Response<Buffer>> {
264 let path = build_rooted_abs_path(&self.root, path);
265
266 let mut url = format!(
267 "https://cloud-api.yandex.net/v1/disk/resources?path={}",
268 percent_encode_path(&path),
269 );
270
271 if let Some(limit) = limit {
272 url = format!("{}&limit={}", url, limit);
273 }
274
275 if let Some(offset) = offset {
276 url = format!("{}&offset={}", url, offset);
277 }
278
279 let req = Request::get(url);
280
281 let req = req.extension(Operation::Stat);
282
283 let req = self.sign(req);
284
285 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
287
288 self.send(req).await
289 }
290}
291
292pub(super) fn parse_info(mf: MetainformationResponse) -> Result<Metadata> {
293 let mode = if mf.ty == "file" {
294 EntryMode::FILE
295 } else {
296 EntryMode::DIR
297 };
298
299 let mut m = Metadata::new(mode);
300
301 m.set_last_modified(parse_datetime_from_rfc3339(&mf.modified)?);
302
303 if let Some(md5) = mf.md5 {
304 m.set_content_md5(&md5);
305 }
306
307 if let Some(mime_type) = mf.mime_type {
308 m.set_content_type(&mime_type);
309 }
310
311 if let Some(size) = mf.size {
312 m.set_content_length(size);
313 }
314
315 Ok(m)
316}
317
318#[derive(Debug, Deserialize)]
319pub struct GetUploadUrlResponse {
320 pub href: String,
321}
322
323#[derive(Debug, Deserialize)]
324pub struct MetainformationResponse {
325 #[serde(rename = "type")]
326 pub ty: String,
327 pub path: String,
328 pub modified: String,
329 pub md5: Option<String>,
330 pub mime_type: Option<String>,
331 pub size: Option<u64>,
332 #[serde(rename = "_embedded")]
333 pub embedded: Option<Embedded>,
334}
335
336#[derive(Debug, Deserialize)]
337pub struct Embedded {
338 pub total: usize,
339 pub items: Vec<MetainformationResponse>,
340}