opendal/services/onedrive/
core.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Buf;
24use bytes::Bytes;
25use http::Request;
26use http::Response;
27use http::StatusCode;
28use http::header;
29use tokio::sync::Mutex;
30
31use super::error::parse_error;
32use super::graph_model::*;
33use crate::raw::*;
34use crate::*;
35
36pub struct OneDriveCore {
37 pub info: Arc<AccessorInfo>,
38 pub root: String,
39 pub signer: Arc<Mutex<OneDriveSigner>>,
40}
41
42impl Debug for OneDriveCore {
43 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44 f.debug_struct("OneDriveCore")
45 .field("root", &self.root)
46 .finish_non_exhaustive()
47 }
48}
49
50const SPECIAL_POSIX_ENTRIES: [&str; 3] = [".", "/", ""];
52
53impl OneDriveCore {
55 pub(crate) const DRIVE_ROOT_URL: &'static str =
57 "https://graph.microsoft.com/v1.0/me/drive/root";
58
59 pub(crate) fn onedrive_item_url(&self, path: &str, build_absolute_path: bool) -> String {
61 if self.root == "/" && SPECIAL_POSIX_ENTRIES.contains(&path) {
64 Self::DRIVE_ROOT_URL.to_string()
65 } else {
66 let absolute_path = if build_absolute_path {
68 let rooted_path = build_rooted_abs_path(&self.root, path);
69 rooted_path
70 .strip_suffix('/')
71 .unwrap_or(rooted_path.as_str())
72 .to_string()
73 } else {
74 path.to_string()
75 };
76 format!(
77 "{}:{}",
78 Self::DRIVE_ROOT_URL,
79 percent_encode_path(&absolute_path),
80 )
81 }
82 }
83
84 pub(crate) async fn onedrive_get_stat_plain(&self, path: &str) -> Result<Response<Buffer>> {
88 let url: String = format!(
89 "{}?{}",
90 self.onedrive_item_url(path, true),
91 GENERAL_SELECT_PARAM
92 );
93 let request = Request::get(&url);
94
95 let mut request = request
96 .extension(Operation::Stat)
97 .body(Buffer::new())
98 .map_err(new_request_build_error)?;
99
100 self.sign(&mut request).await?;
101
102 self.info.http_client().send(request).await
103 }
104
105 pub(crate) async fn ensure_directory(&self, path: &str) -> Result<OneDriveItem> {
111 let response = self.onedrive_get_stat_plain(path).await?;
112 let item: OneDriveItem = match response.status() {
113 StatusCode::OK => {
114 let bytes = response.into_body();
115 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?
116 }
117 StatusCode::NOT_FOUND => {
118 let response = self.onedrive_create_dir(path).await?;
120 match response.status() {
121 StatusCode::CREATED | StatusCode::OK => {
122 let bytes = response.into_body();
123 serde_json::from_reader(bytes.reader())
124 .map_err(new_json_deserialize_error)?
125 }
126 _ => return Err(parse_error(response)),
127 }
128 }
129 _ => return Err(parse_error(response)),
130 };
131
132 Ok(item)
133 }
134
135 pub(crate) async fn sign<T>(&self, request: &mut Request<T>) -> Result<()> {
136 let mut signer = self.signer.lock().await;
137 signer.sign(request).await
138 }
139}
140
141const MAX_MONITOR_ATTEMPT: i32 = 3600;
144const MONITOR_WAIT_SECOND: u64 = 1;
145
146impl OneDriveCore {
154 pub(crate) async fn onedrive_stat(&self, path: &str, args: OpStat) -> Result<Metadata> {
161 let mut url: String = self.onedrive_item_url(path, true);
162 if args.version().is_some() {
163 url += "?$expand=versions(";
164 url += VERSION_SELECT_PARAM;
165 url += ")";
166 }
167
168 let mut request = Request::get(&url);
169 if let Some(etag) = args.if_none_match() {
170 request = request.header(header::IF_NONE_MATCH, etag);
171 }
172
173 let mut request = request
174 .extension(Operation::Stat)
175 .body(Buffer::new())
176 .map_err(new_request_build_error)?;
177
178 self.sign(&mut request).await?;
179
180 let response = self.info.http_client().send(request).await?;
181 if !response.status().is_success() {
182 return Err(parse_error(response));
183 }
184
185 let bytes = response.into_body();
186 let decoded_response: OneDriveItem =
187 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
188
189 let entry_mode: EntryMode = match decoded_response.item_type {
190 ItemType::Folder { .. } => EntryMode::DIR,
191 ItemType::File { .. } => EntryMode::FILE,
192 };
193
194 let mut meta = Metadata::new(entry_mode)
195 .with_etag(decoded_response.e_tag)
196 .with_content_length(decoded_response.size.max(0) as u64);
197
198 if let Some(version) = args.version() {
199 for item_version in decoded_response.versions.as_deref().unwrap_or_default() {
200 if item_version.id == version {
201 meta.set_version(version);
202 break; }
204 }
205
206 if meta.version().is_none() {
207 return Err(Error::new(
208 ErrorKind::NotFound,
209 "cannot find this version of the item",
210 ));
211 }
212 }
213
214 let last_modified = decoded_response.last_modified_date_time;
215 let date_utc_last_modified = last_modified.parse::<Timestamp>()?;
216 meta.set_last_modified(date_utc_last_modified);
217
218 Ok(meta)
219 }
220
221 pub(crate) async fn onedrive_list_versions(
227 &self,
228 path: &str,
229 ) -> Result<Vec<OneDriveItemVersion>> {
230 let url: String = format!(
232 "{}:/versions?{}",
233 self.onedrive_item_url(path, true),
234 VERSION_SELECT_PARAM
235 );
236
237 let mut request = Request::get(url)
238 .extension(Operation::List)
239 .body(Buffer::new())
240 .map_err(new_request_build_error)?;
241
242 self.sign(&mut request).await?;
243
244 let response = self.info.http_client().send(request).await?;
245 let decoded_response: GraphApiOneDriveVersionsResponse =
246 serde_json::from_reader(response.into_body().reader())
247 .map_err(new_json_deserialize_error)?;
248 Ok(decoded_response.value)
249 }
250
251 pub(crate) async fn onedrive_get_next_list_page(&self, url: &str) -> Result<Response<Buffer>> {
252 let mut request = Request::get(url)
253 .extension(Operation::List)
254 .body(Buffer::new())
255 .map_err(new_request_build_error)?;
256
257 self.sign(&mut request).await?;
258
259 self.info.http_client().send(request).await
260 }
261
262 pub(crate) async fn onedrive_get_content(
272 &self,
273 path: &str,
274 args: &OpRead,
275 ) -> Result<Response<HttpBody>> {
276 let url: String = format!("{}:/content", self.onedrive_item_url(path, true));
278
279 let mut request = Request::get(&url).header(header::RANGE, args.range().to_header());
280 if let Some(etag) = args.if_none_match() {
281 request = request.header(header::IF_NONE_MATCH, etag);
282 }
283
284 let mut request = request
285 .extension(Operation::Read)
286 .body(Buffer::new())
287 .map_err(new_request_build_error)?;
288
289 self.sign(&mut request).await?;
290
291 self.info.http_client().fetch(request).await
292 }
293
294 pub async fn onedrive_upload_simple(
306 &self,
307 path: &str,
308 args: &OpWrite,
309 body: Buffer,
310 ) -> Result<Response<Buffer>> {
311 let url = format!(
312 "{}:/content?@microsoft.graph.conflictBehavior={}&{}",
313 self.onedrive_item_url(path, true),
314 REPLACE_EXISTING_ITEM_WHEN_CONFLICT,
315 GENERAL_SELECT_PARAM
316 );
317
318 let mut request = Request::put(&url)
325 .header(header::CONTENT_LENGTH, body.len())
326 .header(header::CONTENT_TYPE, "text/plain");
327
328 if let Some(if_match) = args.if_match() {
332 request = request.header(header::IF_MATCH, if_match);
333 }
334
335 let mut request = request
336 .extension(Operation::Write)
337 .body(body)
338 .map_err(new_request_build_error)?;
339
340 self.sign(&mut request).await?;
341
342 self.info.http_client().send(request).await
343 }
344
345 pub(crate) async fn onedrive_chunked_upload(
346 &self,
347 url: &str,
348 args: &OpWrite,
349 offset: usize,
350 chunk_end: usize,
351 total_len: usize,
352 body: Buffer,
353 ) -> Result<Response<Buffer>> {
354 let mut request = Request::put(url);
355
356 let range = format!("bytes {offset}-{chunk_end}/{total_len}");
357 request = request.header(header::CONTENT_RANGE, range);
358
359 let size = chunk_end - offset + 1;
360 request = request.header(header::CONTENT_LENGTH, size);
361
362 if let Some(mime) = args.content_type() {
363 request = request.header(header::CONTENT_TYPE, mime)
364 }
365
366 let request = request
367 .extension(Operation::Write)
368 .body(body)
369 .map_err(new_request_build_error)?;
370 self.info.http_client().send(request).await
373 }
374
375 pub(crate) async fn onedrive_create_upload_session(
381 &self,
382 path: &str,
383 args: &OpWrite,
384 ) -> Result<Response<Buffer>> {
385 let parent_path = get_parent(path);
386 let file_name = get_basename(path);
387 let url = format!(
388 "{}:/createUploadSession",
389 self.onedrive_item_url(parent_path, true),
390 );
391 let mut request = Request::post(url).header(header::CONTENT_TYPE, "application/json");
392
393 if let Some(if_match) = args.if_match() {
394 request = request.header(header::IF_MATCH, if_match);
395 }
396
397 let body = OneDriveUploadSessionCreationRequestBody::new(file_name.to_string());
398 let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
399 let body = Buffer::from(Bytes::from(body_bytes));
400 let mut request = request
401 .extension(Operation::Write)
402 .body(body)
403 .map_err(new_request_build_error)?;
404
405 self.sign(&mut request).await?;
406
407 self.info.http_client().send(request).await
408 }
409
410 pub(crate) async fn onedrive_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
417 let parent_path = get_parent(path);
418 let basename = get_basename(path);
419 let folder_name = basename.strip_suffix('/').unwrap_or(basename);
420
421 let url = format!(
422 "{}:/children?{}",
423 self.onedrive_item_url(parent_path, true),
424 GENERAL_SELECT_PARAM
425 );
426
427 let payload = CreateDirPayload::new(folder_name.to_string());
428 let body_bytes = serde_json::to_vec(&payload).map_err(new_json_serialize_error)?;
429 let body = Buffer::from(bytes::Bytes::from(body_bytes));
430
431 let mut request = Request::post(url)
432 .header(header::CONTENT_TYPE, "application/json")
433 .extension(Operation::CreateDir)
434 .body(body)
435 .map_err(new_request_build_error)?;
436
437 self.sign(&mut request).await?;
438
439 self.info.http_client().send(request).await
440 }
441
442 pub(crate) async fn onedrive_delete(&self, path: &str) -> Result<Response<Buffer>> {
446 let url = self.onedrive_item_url(path, true);
447
448 let mut request = Request::delete(&url)
449 .extension(Operation::Delete)
450 .body(Buffer::new())
451 .map_err(new_request_build_error)?;
452
453 self.sign(&mut request).await?;
454
455 self.info.http_client().send(request).await
456 }
457
458 pub(crate) async fn initialize_copy(&self, source: &str, destination: &str) -> Result<String> {
465 let response = self.onedrive_get_stat_plain(source).await?;
467 if !response.status().is_success() {
468 return Err(parse_error(response));
469 }
470
471 let destination_parent = get_parent(destination).to_string();
473 let basename = get_basename(destination);
474
475 let item = self.ensure_directory(&destination_parent).await?;
476 let body = OneDrivePatchRequestBody {
477 parent_reference: ParentReference {
478 path: "".to_string(), drive_id: item.parent_reference.drive_id,
480 id: item.id,
481 },
482 name: basename.to_string(),
483 };
484
485 let response = self.onedrive_get_stat_plain(destination).await?;
487 match response.status() {
488 StatusCode::OK => {
493 let response = self.onedrive_delete(destination).await?;
494 match response.status() {
495 StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => {} _ => return Err(parse_error(response)),
497 }
498 }
499 StatusCode::NOT_FOUND => {} _ => return Err(parse_error(response)),
501 }
502
503 let url: String = format!("{}:/copy", self.onedrive_item_url(source, true));
504
505 let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
506 let buffer = Buffer::from(Bytes::from(body_bytes));
507 let mut request = Request::post(&url)
508 .header(header::CONTENT_TYPE, "application/json")
509 .extension(Operation::Copy)
510 .body(buffer)
511 .map_err(new_request_build_error)?;
512
513 self.sign(&mut request).await?;
514
515 let response = self.info.http_client().send(request).await?;
516 match response.status() {
517 StatusCode::ACCEPTED => parse_location(response.headers())?
518 .ok_or_else(|| {
519 Error::new(
520 ErrorKind::Unexpected,
521 "OneDrive didn't return a location URL",
522 )
523 })
524 .map(String::from),
525 _ => Err(parse_error(response)),
526 }
527 }
528
529 pub(crate) async fn wait_until_complete(&self, monitor_url: String) -> Result<()> {
530 for _attempt in 0..MAX_MONITOR_ATTEMPT {
531 let mut request = Request::get(monitor_url.to_string())
532 .header(header::CONTENT_TYPE, "application/json")
533 .extension(Operation::Copy)
534 .body(Buffer::new())
535 .map_err(new_request_build_error)?;
536
537 self.sign(&mut request).await?;
538
539 let response = self.info.http_client().send(request).await?;
540 let status: OneDriveMonitorStatus =
541 serde_json::from_reader(response.into_body().reader())
542 .map_err(new_json_deserialize_error)?;
543 if status.status == "completed" {
544 return Ok(());
545 }
546
547 tokio::time::sleep(Duration::from_secs(MONITOR_WAIT_SECOND)).await;
548 }
549
550 Err(Error::new(
551 ErrorKind::Unexpected,
552 "Exceed monitoring timeout",
553 ))
554 }
555
556 pub(crate) async fn onedrive_move(&self, source: &str, destination: &str) -> Result<()> {
557 let response = self.onedrive_get_stat_plain(source).await?;
559 if !response.status().is_success() {
560 return Err(Error::new(ErrorKind::NotFound, "source not found"));
561 }
562
563 let destination_parent = get_parent(destination).to_string();
565 let basename = get_basename(destination);
566
567 let item = self.ensure_directory(&destination_parent).await?;
568 let body = OneDrivePatchRequestBody {
569 parent_reference: ParentReference {
570 path: "".to_string(), drive_id: item.parent_reference.drive_id,
573 id: item.id,
574 },
575 name: basename.to_string(),
576 };
577 let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
578 let buffer = Buffer::from(Bytes::from(body_bytes));
579 let url: String = format!(
580 "{}?@microsoft.graph.conflictBehavior={}&$select=id",
581 self.onedrive_item_url(source, true),
582 REPLACE_EXISTING_ITEM_WHEN_CONFLICT
583 );
584 let mut request = Request::patch(&url)
585 .header(header::CONTENT_TYPE, "application/json")
586 .extension(Operation::Rename)
587 .body(buffer)
588 .map_err(new_request_build_error)?;
589
590 self.sign(&mut request).await?;
591
592 let response = self.info.http_client().send(request).await?;
593 match response.status() {
594 StatusCode::OK => Ok(()),
596 _ => Err(parse_error(response)),
597 }
598 }
599}
600
601pub struct OneDriveSigner {
603 pub info: Arc<AccessorInfo>, pub client_id: String,
606 pub client_secret: String,
607 pub refresh_token: String,
608
609 pub access_token: String,
610 pub expires_in: Timestamp,
611}
612
613const ONEDRIVE_REFRESH_TOKEN: &str = "https://login.microsoftonline.com/common/oauth2/v2.0/token";
621
622impl OneDriveSigner {
623 pub fn new(info: Arc<AccessorInfo>) -> Self {
624 OneDriveSigner {
625 info,
626
627 client_id: "".to_string(),
628 client_secret: "".to_string(),
629 refresh_token: "".to_string(),
630 access_token: "".to_string(),
631 expires_in: Timestamp::MIN,
632 }
633 }
634
635 async fn refresh_tokens(&mut self) -> Result<()> {
636 let encoded_payload = format!(
638 "client_id={}&client_secret={}&scope=offline_access%20Files.ReadWrite&refresh_token={}&grant_type=refresh_token",
639 percent_encode_path(self.client_id.as_str()),
640 percent_encode_path(self.client_secret.as_str()),
641 percent_encode_path(self.refresh_token.as_str())
642 );
643 let request = Request::post(ONEDRIVE_REFRESH_TOKEN)
644 .header(header::CONTENT_TYPE, "application/x-www-form-urlencoded")
645 .body(Buffer::from(encoded_payload))
646 .map_err(new_request_build_error)?;
647
648 let response = self.info.http_client().send(request).await?;
649 match response.status() {
650 StatusCode::OK => {
651 let resp_body = response.into_body();
652 let data: GraphOAuthRefreshTokenResponseBody =
653 serde_json::from_reader(resp_body.reader())
654 .map_err(new_json_deserialize_error)?;
655 self.access_token = data.access_token;
656 self.refresh_token = data.refresh_token;
657 self.expires_in = Timestamp::now() + Duration::from_secs(data.expires_in)
658 - Duration::from_secs(120); Ok(())
660 }
661 _ => Err(parse_error(response)),
662 }
663 }
664
665 pub async fn sign<T>(&mut self, request: &mut Request<T>) -> Result<()> {
667 if !self.access_token.is_empty() && self.expires_in > Timestamp::now() {
668 let value = format!("Bearer {}", self.access_token)
669 .parse()
670 .expect("access_token must be valid header value");
671
672 request.headers_mut().insert(header::AUTHORIZATION, value);
673 return Ok(());
674 }
675
676 self.refresh_tokens().await?;
677
678 let auth_header_content = format!("Bearer {}", self.access_token)
679 .parse()
680 .expect("Fetched access_token is invalid as a header value");
681
682 request
683 .headers_mut()
684 .insert(header::AUTHORIZATION, auth_header_content);
685
686 Ok(())
687 }
688}