opendal/services/onedrive/
core.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Buf;
24use bytes::Bytes;
25use chrono::DateTime;
26use chrono::Utc;
27use http::header;
28use http::Request;
29use http::Response;
30use http::StatusCode;
31use tokio::sync::Mutex;
32
33use super::error::parse_error;
34use super::graph_model::*;
35use crate::raw::*;
36use crate::*;
37
38pub struct OneDriveCore {
39 pub info: Arc<AccessorInfo>,
40 pub root: String,
41 pub signer: Arc<Mutex<OneDriveSigner>>,
42}
43
44impl Debug for OneDriveCore {
45 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
46 f.debug_struct("OneDriveCore")
47 .field("root", &self.root)
48 .finish_non_exhaustive()
49 }
50}
51
52const SPECIAL_POSIX_ENTRIES: [&str; 3] = [".", "/", ""];
54
55impl OneDriveCore {
57 pub(crate) const DRIVE_ROOT_URL: &str = "https://graph.microsoft.com/v1.0/me/drive/root";
59
60 pub(crate) fn onedrive_item_url(&self, path: &str, build_absolute_path: bool) -> String {
62 if self.root == "/" && SPECIAL_POSIX_ENTRIES.contains(&path) {
65 Self::DRIVE_ROOT_URL.to_string()
66 } else {
67 let absolute_path = if build_absolute_path {
69 let rooted_path = build_rooted_abs_path(&self.root, path);
70 rooted_path
71 .strip_suffix('/')
72 .unwrap_or(rooted_path.as_str())
73 .to_string()
74 } else {
75 path.to_string()
76 };
77 format!(
78 "{}:{}",
79 Self::DRIVE_ROOT_URL,
80 percent_encode_path(&absolute_path),
81 )
82 }
83 }
84
85 pub(crate) async fn onedrive_get_stat_plain(&self, path: &str) -> Result<Response<Buffer>> {
89 let url: String = format!(
90 "{}?{}",
91 self.onedrive_item_url(path, true),
92 GENERAL_SELECT_PARAM
93 );
94 let request = Request::get(&url);
95
96 let mut request = request
97 .extension(Operation::Stat)
98 .body(Buffer::new())
99 .map_err(new_request_build_error)?;
100
101 self.sign(&mut request).await?;
102
103 self.info.http_client().send(request).await
104 }
105
106 pub(crate) async fn ensure_directory(&self, path: &str) -> Result<OneDriveItem> {
112 let response = self.onedrive_get_stat_plain(path).await?;
113 let item: OneDriveItem = match response.status() {
114 StatusCode::OK => {
115 let bytes = response.into_body();
116 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?
117 }
118 StatusCode::NOT_FOUND => {
119 let response = self.onedrive_create_dir(path).await?;
121 match response.status() {
122 StatusCode::CREATED | StatusCode::OK => {
123 let bytes = response.into_body();
124 serde_json::from_reader(bytes.reader())
125 .map_err(new_json_deserialize_error)?
126 }
127 _ => return Err(parse_error(response)),
128 }
129 }
130 _ => return Err(parse_error(response)),
131 };
132
133 Ok(item)
134 }
135
136 pub(crate) async fn sign<T>(&self, request: &mut Request<T>) -> Result<()> {
137 let mut signer = self.signer.lock().await;
138 signer.sign(request).await
139 }
140}
141
142const MAX_MONITOR_ATTEMPT: i32 = 3600;
145const MONITOR_WAIT_SECOND: u64 = 1;
146
147impl OneDriveCore {
155 pub(crate) async fn onedrive_stat(&self, path: &str, args: OpStat) -> Result<Metadata> {
162 let mut url: String = self.onedrive_item_url(path, true);
163 if args.version().is_some() {
164 url += "?$expand=versions(";
165 url += VERSION_SELECT_PARAM;
166 url += ")";
167 }
168
169 let mut request = Request::get(&url);
170 if let Some(etag) = args.if_none_match() {
171 request = request.header(header::IF_NONE_MATCH, etag);
172 }
173
174 let mut request = request
175 .extension(Operation::Stat)
176 .body(Buffer::new())
177 .map_err(new_request_build_error)?;
178
179 self.sign(&mut request).await?;
180
181 let response = self.info.http_client().send(request).await?;
182 if !response.status().is_success() {
183 return Err(parse_error(response));
184 }
185
186 let bytes = response.into_body();
187 let decoded_response: OneDriveItem =
188 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
189
190 let entry_mode: EntryMode = match decoded_response.item_type {
191 ItemType::Folder { .. } => EntryMode::DIR,
192 ItemType::File { .. } => EntryMode::FILE,
193 };
194
195 let mut meta = Metadata::new(entry_mode)
196 .with_etag(decoded_response.e_tag)
197 .with_content_length(decoded_response.size.max(0) as u64);
198
199 if let Some(version) = args.version() {
200 for item_version in decoded_response.versions.as_deref().unwrap_or_default() {
201 if item_version.id == version {
202 meta.set_version(version);
203 break; }
205 }
206
207 if meta.version().is_none() {
208 return Err(Error::new(
209 ErrorKind::NotFound,
210 "cannot find this version of the item",
211 ));
212 }
213 }
214
215 let last_modified = decoded_response.last_modified_date_time;
216 let date_utc_last_modified = parse_datetime_from_rfc3339(&last_modified)?;
217 meta.set_last_modified(date_utc_last_modified);
218
219 Ok(meta)
220 }
221
222 pub(crate) async fn onedrive_list_versions(
228 &self,
229 path: &str,
230 ) -> Result<Vec<OneDriveItemVersion>> {
231 let url: String = format!(
233 "{}:/versions?{}",
234 self.onedrive_item_url(path, true),
235 VERSION_SELECT_PARAM
236 );
237
238 let mut request = Request::get(url)
239 .extension(Operation::List)
240 .body(Buffer::new())
241 .map_err(new_request_build_error)?;
242
243 self.sign(&mut request).await?;
244
245 let response = self.info.http_client().send(request).await?;
246 let decoded_response: GraphApiOneDriveVersionsResponse =
247 serde_json::from_reader(response.into_body().reader())
248 .map_err(new_json_deserialize_error)?;
249 Ok(decoded_response.value)
250 }
251
252 pub(crate) async fn onedrive_get_next_list_page(&self, url: &str) -> Result<Response<Buffer>> {
253 let mut request = Request::get(url)
254 .extension(Operation::List)
255 .body(Buffer::new())
256 .map_err(new_request_build_error)?;
257
258 self.sign(&mut request).await?;
259
260 self.info.http_client().send(request).await
261 }
262
263 pub(crate) async fn onedrive_get_content(
273 &self,
274 path: &str,
275 args: &OpRead,
276 ) -> Result<Response<HttpBody>> {
277 let url: String = format!("{}:/content", self.onedrive_item_url(path, true));
279
280 let mut request = Request::get(&url).header(header::RANGE, args.range().to_header());
281 if let Some(etag) = args.if_none_match() {
282 request = request.header(header::IF_NONE_MATCH, etag);
283 }
284
285 let mut request = request
286 .extension(Operation::Read)
287 .body(Buffer::new())
288 .map_err(new_request_build_error)?;
289
290 self.sign(&mut request).await?;
291
292 self.info.http_client().fetch(request).await
293 }
294
295 pub async fn onedrive_upload_simple(
307 &self,
308 path: &str,
309 args: &OpWrite,
310 body: Buffer,
311 ) -> Result<Response<Buffer>> {
312 let url = format!(
313 "{}:/content?@microsoft.graph.conflictBehavior={}&{}",
314 self.onedrive_item_url(path, true),
315 REPLACE_EXISTING_ITEM_WHEN_CONFLICT,
316 GENERAL_SELECT_PARAM
317 );
318
319 let mut request = Request::put(&url)
326 .header(header::CONTENT_LENGTH, body.len())
327 .header(header::CONTENT_TYPE, "text/plain");
328
329 if let Some(if_match) = args.if_match() {
333 request = request.header(header::IF_MATCH, if_match);
334 }
335
336 let mut request = request
337 .extension(Operation::Write)
338 .body(body)
339 .map_err(new_request_build_error)?;
340
341 self.sign(&mut request).await?;
342
343 self.info.http_client().send(request).await
344 }
345
346 pub(crate) async fn onedrive_chunked_upload(
347 &self,
348 url: &str,
349 args: &OpWrite,
350 offset: usize,
351 chunk_end: usize,
352 total_len: usize,
353 body: Buffer,
354 ) -> Result<Response<Buffer>> {
355 let mut request = Request::put(url);
356
357 let range = format!("bytes {}-{}/{}", offset, chunk_end, total_len);
358 request = request.header(header::CONTENT_RANGE, range);
359
360 let size = chunk_end - offset + 1;
361 request = request.header(header::CONTENT_LENGTH, size);
362
363 if let Some(mime) = args.content_type() {
364 request = request.header(header::CONTENT_TYPE, mime)
365 }
366
367 let request = request
368 .extension(Operation::Write)
369 .body(body)
370 .map_err(new_request_build_error)?;
371 self.info.http_client().send(request).await
374 }
375
376 pub(crate) async fn onedrive_create_upload_session(
382 &self,
383 path: &str,
384 args: &OpWrite,
385 ) -> Result<Response<Buffer>> {
386 let parent_path = get_parent(path);
387 let file_name = get_basename(path);
388 let url = format!(
389 "{}:/createUploadSession",
390 self.onedrive_item_url(parent_path, true),
391 );
392 let mut request = Request::post(url).header(header::CONTENT_TYPE, "application/json");
393
394 if let Some(if_match) = args.if_match() {
395 request = request.header(header::IF_MATCH, if_match);
396 }
397
398 let body = OneDriveUploadSessionCreationRequestBody::new(file_name.to_string());
399 let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
400 let body = Buffer::from(Bytes::from(body_bytes));
401 let mut request = request
402 .extension(Operation::Write)
403 .body(body)
404 .map_err(new_request_build_error)?;
405
406 self.sign(&mut request).await?;
407
408 self.info.http_client().send(request).await
409 }
410
411 pub(crate) async fn onedrive_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
418 let parent_path = get_parent(path);
419 let basename = get_basename(path);
420 let folder_name = basename.strip_suffix('/').unwrap_or(basename);
421
422 let url = format!(
423 "{}:/children?{}",
424 self.onedrive_item_url(parent_path, true),
425 GENERAL_SELECT_PARAM
426 );
427
428 let payload = CreateDirPayload::new(folder_name.to_string());
429 let body_bytes = serde_json::to_vec(&payload).map_err(new_json_serialize_error)?;
430 let body = Buffer::from(bytes::Bytes::from(body_bytes));
431
432 let mut request = Request::post(url)
433 .header(header::CONTENT_TYPE, "application/json")
434 .extension(Operation::CreateDir)
435 .body(body)
436 .map_err(new_request_build_error)?;
437
438 self.sign(&mut request).await?;
439
440 self.info.http_client().send(request).await
441 }
442
443 pub(crate) async fn onedrive_delete(&self, path: &str) -> Result<Response<Buffer>> {
447 let url = self.onedrive_item_url(path, true);
448
449 let mut request = Request::delete(&url)
450 .extension(Operation::Delete)
451 .body(Buffer::new())
452 .map_err(new_request_build_error)?;
453
454 self.sign(&mut request).await?;
455
456 self.info.http_client().send(request).await
457 }
458
459 pub(crate) async fn initialize_copy(&self, source: &str, destination: &str) -> Result<String> {
466 let response = self.onedrive_get_stat_plain(source).await?;
468 if !response.status().is_success() {
469 return Err(parse_error(response));
470 }
471
472 let destination_parent = get_parent(destination).to_string();
474 let basename = get_basename(destination);
475
476 let item = self.ensure_directory(&destination_parent).await?;
477 let body = OneDrivePatchRequestBody {
478 parent_reference: ParentReference {
479 path: "".to_string(), drive_id: item.parent_reference.drive_id,
481 id: item.id,
482 },
483 name: basename.to_string(),
484 };
485
486 let response = self.onedrive_get_stat_plain(destination).await?;
488 match response.status() {
489 StatusCode::OK => {
494 let response = self.onedrive_delete(destination).await?;
495 match response.status() {
496 StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => {} _ => return Err(parse_error(response)),
498 }
499 }
500 StatusCode::NOT_FOUND => {} _ => return Err(parse_error(response)),
502 }
503
504 let url: String = format!("{}:/copy", self.onedrive_item_url(source, true));
505
506 let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
507 let buffer = Buffer::from(Bytes::from(body_bytes));
508 let mut request = Request::post(&url)
509 .header(header::CONTENT_TYPE, "application/json")
510 .extension(Operation::Copy)
511 .body(buffer)
512 .map_err(new_request_build_error)?;
513
514 self.sign(&mut request).await?;
515
516 let response = self.info.http_client().send(request).await?;
517 match response.status() {
518 StatusCode::ACCEPTED => parse_location(response.headers())?
519 .ok_or_else(|| {
520 Error::new(
521 ErrorKind::Unexpected,
522 "OneDrive didn't return a location URL",
523 )
524 })
525 .map(String::from),
526 _ => Err(parse_error(response)),
527 }
528 }
529
530 pub(crate) async fn wait_until_complete(&self, monitor_url: String) -> Result<()> {
531 for _attempt in 0..MAX_MONITOR_ATTEMPT {
532 let mut request = Request::get(monitor_url.to_string())
533 .header(header::CONTENT_TYPE, "application/json")
534 .extension(Operation::Copy)
535 .body(Buffer::new())
536 .map_err(new_request_build_error)?;
537
538 self.sign(&mut request).await?;
539
540 let response = self.info.http_client().send(request).await?;
541 let status: OneDriveMonitorStatus =
542 serde_json::from_reader(response.into_body().reader())
543 .map_err(new_json_deserialize_error)?;
544 if status.status == "completed" {
545 return Ok(());
546 }
547
548 tokio::time::sleep(Duration::from_secs(MONITOR_WAIT_SECOND)).await;
549 }
550
551 Err(Error::new(
552 ErrorKind::Unexpected,
553 "Exceed monitoring timeout",
554 ))
555 }
556
557 pub(crate) async fn onedrive_move(&self, source: &str, destination: &str) -> Result<()> {
558 let response = self.onedrive_get_stat_plain(source).await?;
560 if !response.status().is_success() {
561 return Err(Error::new(ErrorKind::NotFound, "source not found"));
562 }
563
564 let destination_parent = get_parent(destination).to_string();
566 let basename = get_basename(destination);
567
568 let item = self.ensure_directory(&destination_parent).await?;
569 let body = OneDrivePatchRequestBody {
570 parent_reference: ParentReference {
571 path: "".to_string(), drive_id: item.parent_reference.drive_id,
574 id: item.id,
575 },
576 name: basename.to_string(),
577 };
578 let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
579 let buffer = Buffer::from(Bytes::from(body_bytes));
580 let url: String = format!(
581 "{}?@microsoft.graph.conflictBehavior={}&$select=id",
582 self.onedrive_item_url(source, true),
583 REPLACE_EXISTING_ITEM_WHEN_CONFLICT
584 );
585 let mut request = Request::patch(&url)
586 .header(header::CONTENT_TYPE, "application/json")
587 .extension(Operation::Rename)
588 .body(buffer)
589 .map_err(new_request_build_error)?;
590
591 self.sign(&mut request).await?;
592
593 let response = self.info.http_client().send(request).await?;
594 match response.status() {
595 StatusCode::OK => Ok(()),
597 _ => Err(parse_error(response)),
598 }
599 }
600}
601
602pub struct OneDriveSigner {
604 pub info: Arc<AccessorInfo>, pub client_id: String,
607 pub client_secret: String,
608 pub refresh_token: String,
609
610 pub access_token: String,
611 pub expires_in: DateTime<Utc>,
612}
613
614const ONEDRIVE_REFRESH_TOKEN: &str = "https://login.microsoftonline.com/common/oauth2/v2.0/token";
622
623impl OneDriveSigner {
624 pub fn new(info: Arc<AccessorInfo>) -> Self {
625 OneDriveSigner {
626 info,
627
628 client_id: "".to_string(),
629 client_secret: "".to_string(),
630 refresh_token: "".to_string(),
631 access_token: "".to_string(),
632 expires_in: DateTime::<Utc>::MIN_UTC,
633 }
634 }
635
636 async fn refresh_tokens(&mut self) -> Result<()> {
637 let encoded_payload = format!(
639 "client_id={}&client_secret={}&scope=offline_access%20Files.ReadWrite&refresh_token={}&grant_type=refresh_token",
640 percent_encode_path(self.client_id.as_str()),
641 percent_encode_path(self.client_secret.as_str()),
642 percent_encode_path(self.refresh_token.as_str())
643 );
644 let request = Request::post(ONEDRIVE_REFRESH_TOKEN)
645 .header(header::CONTENT_TYPE, "application/x-www-form-urlencoded")
646 .body(Buffer::from(encoded_payload))
647 .map_err(new_request_build_error)?;
648
649 let response = self.info.http_client().send(request).await?;
650 match response.status() {
651 StatusCode::OK => {
652 let resp_body = response.into_body();
653 let data: GraphOAuthRefreshTokenResponseBody =
654 serde_json::from_reader(resp_body.reader())
655 .map_err(new_json_deserialize_error)?;
656 self.access_token = data.access_token;
657 self.refresh_token = data.refresh_token;
658 self.expires_in = Utc::now()
659 + chrono::TimeDelta::try_seconds(data.expires_in)
660 .expect("expires_in must be valid seconds")
661 - chrono::TimeDelta::minutes(2); Ok(())
663 }
664 _ => Err(parse_error(response)),
665 }
666 }
667
668 pub async fn sign<T>(&mut self, request: &mut Request<T>) -> Result<()> {
670 if !self.access_token.is_empty() && self.expires_in > Utc::now() {
671 let value = format!("Bearer {}", self.access_token)
672 .parse()
673 .expect("access_token must be valid header value");
674
675 request.headers_mut().insert(header::AUTHORIZATION, value);
676 return Ok(());
677 }
678
679 self.refresh_tokens().await?;
680
681 let auth_header_content = format!("Bearer {}", self.access_token)
682 .parse()
683 .expect("Fetched access_token is invalid as a header value");
684
685 request
686 .headers_mut()
687 .insert(header::AUTHORIZATION, auth_header_content);
688
689 Ok(())
690 }
691}