opendal/services/onedrive/
core.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Buf;
24use bytes::Bytes;
25use chrono::DateTime;
26use chrono::Utc;
27use http::header;
28use http::Request;
29use http::Response;
30
31use http::StatusCode;
32use tokio::sync::Mutex;
33
34use super::error::parse_error;
35use super::graph_model::*;
36use crate::raw::*;
37use crate::*;
38
39pub struct OneDriveCore {
40 pub info: Arc<AccessorInfo>,
41 pub root: String,
42 pub signer: Arc<Mutex<OneDriveSigner>>,
43}
44
45impl Debug for OneDriveCore {
46 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("OneDriveCore")
48 .field("root", &self.root)
49 .finish_non_exhaustive()
50 }
51}
52
53const SPECIAL_POSIX_ENTRIES: [&str; 3] = [".", "/", ""];
55
56impl OneDriveCore {
58 pub(crate) const DRIVE_ROOT_URL: &str = "https://graph.microsoft.com/v1.0/me/drive/root";
60
61 pub(crate) fn onedrive_item_url(&self, path: &str, build_absolute_path: bool) -> String {
63 if self.root == "/" && SPECIAL_POSIX_ENTRIES.contains(&path) {
66 Self::DRIVE_ROOT_URL.to_string()
67 } else {
68 let absolute_path = if build_absolute_path {
70 let rooted_path = build_rooted_abs_path(&self.root, path);
71 rooted_path
72 .strip_suffix('/')
73 .unwrap_or(rooted_path.as_str())
74 .to_string()
75 } else {
76 path.to_string()
77 };
78 format!(
79 "{}:{}",
80 Self::DRIVE_ROOT_URL,
81 percent_encode_path(&absolute_path),
82 )
83 }
84 }
85
86 pub(crate) async fn onedrive_get_stat_plain(&self, path: &str) -> Result<Response<Buffer>> {
90 let url: String = format!(
91 "{}?{}",
92 self.onedrive_item_url(path, true),
93 GENERAL_SELECT_PARAM
94 );
95 let request = Request::get(&url);
96
97 let mut request = request
98 .extension(Operation::Stat)
99 .body(Buffer::new())
100 .map_err(new_request_build_error)?;
101
102 self.sign(&mut request).await?;
103
104 self.info.http_client().send(request).await
105 }
106
107 pub(crate) async fn ensure_directory(&self, path: &str) -> Result<OneDriveItem> {
113 let response = self.onedrive_get_stat_plain(path).await?;
114 let item: OneDriveItem = match response.status() {
115 StatusCode::OK => {
116 let bytes = response.into_body();
117 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?
118 }
119 StatusCode::NOT_FOUND => {
120 let response = self.onedrive_create_dir(path).await?;
122 match response.status() {
123 StatusCode::CREATED | StatusCode::OK => {
124 let bytes = response.into_body();
125 serde_json::from_reader(bytes.reader())
126 .map_err(new_json_deserialize_error)?
127 }
128 _ => return Err(parse_error(response)),
129 }
130 }
131 _ => return Err(parse_error(response)),
132 };
133
134 Ok(item)
135 }
136
137 pub(crate) async fn sign<T>(&self, request: &mut Request<T>) -> Result<()> {
138 let mut signer = self.signer.lock().await;
139 signer.sign(request).await
140 }
141}
142
143const MAX_MONITOR_ATTEMPT: i32 = 3600;
146const MONITOR_WAIT_SECOND: u64 = 1;
147
148impl OneDriveCore {
156 pub(crate) async fn onedrive_stat(&self, path: &str, args: OpStat) -> Result<Metadata> {
163 let mut url: String = self.onedrive_item_url(path, true);
164 if args.version().is_some() {
165 url += "?$expand=versions(";
166 url += VERSION_SELECT_PARAM;
167 url += ")";
168 }
169
170 let mut request = Request::get(&url);
171 if let Some(etag) = args.if_none_match() {
172 request = request.header(header::IF_NONE_MATCH, etag);
173 }
174
175 let mut request = request
176 .extension(Operation::Stat)
177 .body(Buffer::new())
178 .map_err(new_request_build_error)?;
179
180 self.sign(&mut request).await?;
181
182 let response = self.info.http_client().send(request).await?;
183 if !response.status().is_success() {
184 return Err(parse_error(response));
185 }
186
187 let bytes = response.into_body();
188 let decoded_response: OneDriveItem =
189 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
190
191 let entry_mode: EntryMode = match decoded_response.item_type {
192 ItemType::Folder { .. } => EntryMode::DIR,
193 ItemType::File { .. } => EntryMode::FILE,
194 };
195
196 let mut meta = Metadata::new(entry_mode)
197 .with_etag(decoded_response.e_tag)
198 .with_content_length(decoded_response.size.max(0) as u64);
199
200 if let Some(version) = args.version() {
201 for item_version in decoded_response.versions.as_deref().unwrap_or_default() {
202 if item_version.id == version {
203 meta.set_version(version);
204 break; }
206 }
207
208 if meta.version().is_none() {
209 return Err(Error::new(
210 ErrorKind::NotFound,
211 "cannot find this version of the item",
212 ));
213 }
214 }
215
216 let last_modified = decoded_response.last_modified_date_time;
217 let date_utc_last_modified = parse_datetime_from_rfc3339(&last_modified)?;
218 meta.set_last_modified(date_utc_last_modified);
219
220 Ok(meta)
221 }
222
223 pub(crate) async fn onedrive_list_versions(
229 &self,
230 path: &str,
231 ) -> Result<Vec<OneDriveItemVersion>> {
232 let url: String = format!(
234 "{}:/versions?{}",
235 self.onedrive_item_url(path, true),
236 VERSION_SELECT_PARAM
237 );
238
239 let mut request = Request::get(url)
240 .extension(Operation::List)
241 .body(Buffer::new())
242 .map_err(new_request_build_error)?;
243
244 self.sign(&mut request).await?;
245
246 let response = self.info.http_client().send(request).await?;
247 let decoded_response: GraphApiOneDriveVersionsResponse =
248 serde_json::from_reader(response.into_body().reader())
249 .map_err(new_json_deserialize_error)?;
250 Ok(decoded_response.value)
251 }
252
253 pub(crate) async fn onedrive_get_next_list_page(&self, url: &str) -> Result<Response<Buffer>> {
254 let mut request = Request::get(url)
255 .extension(Operation::List)
256 .body(Buffer::new())
257 .map_err(new_request_build_error)?;
258
259 self.sign(&mut request).await?;
260
261 self.info.http_client().send(request).await
262 }
263
264 pub(crate) async fn onedrive_get_content(
274 &self,
275 path: &str,
276 args: &OpRead,
277 ) -> Result<Response<HttpBody>> {
278 let url: String = format!("{}:/content", self.onedrive_item_url(path, true));
280
281 let mut request = Request::get(&url).header(header::RANGE, args.range().to_header());
282 if let Some(etag) = args.if_none_match() {
283 request = request.header(header::IF_NONE_MATCH, etag);
284 }
285
286 let mut request = request
287 .extension(Operation::Read)
288 .body(Buffer::new())
289 .map_err(new_request_build_error)?;
290
291 self.sign(&mut request).await?;
292
293 self.info.http_client().fetch(request).await
294 }
295
296 pub async fn onedrive_upload_simple(
308 &self,
309 path: &str,
310 args: &OpWrite,
311 body: Buffer,
312 ) -> Result<Response<Buffer>> {
313 let url = format!(
314 "{}:/content?@microsoft.graph.conflictBehavior={}&{}",
315 self.onedrive_item_url(path, true),
316 REPLACE_EXISTING_ITEM_WHEN_CONFLICT,
317 GENERAL_SELECT_PARAM
318 );
319
320 let mut request = Request::put(&url)
327 .header(header::CONTENT_LENGTH, body.len())
328 .header(header::CONTENT_TYPE, "text/plain");
329
330 if let Some(if_match) = args.if_match() {
334 request = request.header(header::IF_MATCH, if_match);
335 }
336
337 let mut request = request
338 .extension(Operation::Write)
339 .body(body)
340 .map_err(new_request_build_error)?;
341
342 self.sign(&mut request).await?;
343
344 self.info.http_client().send(request).await
345 }
346
347 pub(crate) async fn onedrive_chunked_upload(
348 &self,
349 url: &str,
350 args: &OpWrite,
351 offset: usize,
352 chunk_end: usize,
353 total_len: usize,
354 body: Buffer,
355 ) -> Result<Response<Buffer>> {
356 let mut request = Request::put(url);
357
358 let range = format!("bytes {}-{}/{}", offset, chunk_end, total_len);
359 request = request.header(header::CONTENT_RANGE, range);
360
361 let size = chunk_end - offset + 1;
362 request = request.header(header::CONTENT_LENGTH, size);
363
364 if let Some(mime) = args.content_type() {
365 request = request.header(header::CONTENT_TYPE, mime)
366 }
367
368 let request = request
369 .extension(Operation::Write)
370 .body(body)
371 .map_err(new_request_build_error)?;
372 self.info.http_client().send(request).await
375 }
376
377 pub(crate) async fn onedrive_create_upload_session(
383 &self,
384 path: &str,
385 args: &OpWrite,
386 ) -> Result<Response<Buffer>> {
387 let parent_path = get_parent(path);
388 let file_name = get_basename(path);
389 let url = format!(
390 "{}:/createUploadSession",
391 self.onedrive_item_url(parent_path, true),
392 );
393 let mut request = Request::post(url).header(header::CONTENT_TYPE, "application/json");
394
395 if let Some(if_match) = args.if_match() {
396 request = request.header(header::IF_MATCH, if_match);
397 }
398
399 let body = OneDriveUploadSessionCreationRequestBody::new(file_name.to_string());
400 let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
401 let body = Buffer::from(Bytes::from(body_bytes));
402 let mut request = request
403 .extension(Operation::Write)
404 .body(body)
405 .map_err(new_request_build_error)?;
406
407 self.sign(&mut request).await?;
408
409 self.info.http_client().send(request).await
410 }
411
412 pub(crate) async fn onedrive_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
419 let parent_path = get_parent(path);
420 let basename = get_basename(path);
421 let folder_name = basename.strip_suffix('/').unwrap_or(basename);
422
423 let url = format!(
424 "{}:/children?{}",
425 self.onedrive_item_url(parent_path, true),
426 GENERAL_SELECT_PARAM
427 );
428
429 let payload = CreateDirPayload::new(folder_name.to_string());
430 let body_bytes = serde_json::to_vec(&payload).map_err(new_json_serialize_error)?;
431 let body = Buffer::from(bytes::Bytes::from(body_bytes));
432
433 let mut request = Request::post(url)
434 .header(header::CONTENT_TYPE, "application/json")
435 .extension(Operation::CreateDir)
436 .body(body)
437 .map_err(new_request_build_error)?;
438
439 self.sign(&mut request).await?;
440
441 self.info.http_client().send(request).await
442 }
443
444 pub(crate) async fn onedrive_delete(&self, path: &str) -> Result<Response<Buffer>> {
448 let url = self.onedrive_item_url(path, true);
449
450 let mut request = Request::delete(&url)
451 .extension(Operation::Delete)
452 .body(Buffer::new())
453 .map_err(new_request_build_error)?;
454
455 self.sign(&mut request).await?;
456
457 self.info.http_client().send(request).await
458 }
459
460 pub(crate) async fn initialize_copy(&self, source: &str, destination: &str) -> Result<String> {
467 let response = self.onedrive_get_stat_plain(source).await?;
469 if !response.status().is_success() {
470 return Err(parse_error(response));
471 }
472
473 let destination_parent = get_parent(destination).to_string();
475 let basename = get_basename(destination);
476
477 let item = self.ensure_directory(&destination_parent).await?;
478 let body = OneDrivePatchRequestBody {
479 parent_reference: ParentReference {
480 path: "".to_string(), drive_id: item.parent_reference.drive_id,
482 id: item.id,
483 },
484 name: basename.to_string(),
485 };
486
487 let response = self.onedrive_get_stat_plain(destination).await?;
489 match response.status() {
490 StatusCode::OK => {
495 let response = self.onedrive_delete(destination).await?;
496 match response.status() {
497 StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => {} _ => return Err(parse_error(response)),
499 }
500 }
501 StatusCode::NOT_FOUND => {} _ => return Err(parse_error(response)),
503 }
504
505 let url: String = format!("{}:/copy", self.onedrive_item_url(source, true));
506
507 let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
508 let buffer = Buffer::from(Bytes::from(body_bytes));
509 let mut request = Request::post(&url)
510 .header(header::CONTENT_TYPE, "application/json")
511 .extension(Operation::Copy)
512 .body(buffer)
513 .map_err(new_request_build_error)?;
514
515 self.sign(&mut request).await?;
516
517 let response = self.info.http_client().send(request).await?;
518 match response.status() {
519 StatusCode::ACCEPTED => parse_location(response.headers())?
520 .ok_or_else(|| {
521 Error::new(
522 ErrorKind::Unexpected,
523 "OneDrive didn't return a location URL",
524 )
525 })
526 .map(String::from),
527 _ => Err(parse_error(response)),
528 }
529 }
530
531 pub(crate) async fn wait_until_complete(&self, monitor_url: String) -> Result<()> {
532 for _attempt in 0..MAX_MONITOR_ATTEMPT {
533 let mut request = Request::get(monitor_url.to_string())
534 .header(header::CONTENT_TYPE, "application/json")
535 .extension(Operation::Copy)
536 .body(Buffer::new())
537 .map_err(new_request_build_error)?;
538
539 self.sign(&mut request).await?;
540
541 let response = self.info.http_client().send(request).await?;
542 let status: OneDriveMonitorStatus =
543 serde_json::from_reader(response.into_body().reader())
544 .map_err(new_json_deserialize_error)?;
545 if status.status == "completed" {
546 return Ok(());
547 }
548
549 tokio::time::sleep(Duration::from_secs(MONITOR_WAIT_SECOND)).await;
550 }
551
552 Err(Error::new(
553 ErrorKind::Unexpected,
554 "Exceed monitoring timeout",
555 ))
556 }
557
558 pub(crate) async fn onedrive_move(&self, source: &str, destination: &str) -> Result<()> {
559 let response = self.onedrive_get_stat_plain(source).await?;
561 if !response.status().is_success() {
562 return Err(Error::new(ErrorKind::NotFound, "source not found"));
563 }
564
565 let destination_parent = get_parent(destination).to_string();
567 let basename = get_basename(destination);
568
569 let item = self.ensure_directory(&destination_parent).await?;
570 let body = OneDrivePatchRequestBody {
571 parent_reference: ParentReference {
572 path: "".to_string(), drive_id: item.parent_reference.drive_id,
575 id: item.id,
576 },
577 name: basename.to_string(),
578 };
579 let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
580 let buffer = Buffer::from(Bytes::from(body_bytes));
581 let url: String = format!(
582 "{}?@microsoft.graph.conflictBehavior={}&$select=id",
583 self.onedrive_item_url(source, true),
584 REPLACE_EXISTING_ITEM_WHEN_CONFLICT
585 );
586 let mut request = Request::patch(&url)
587 .header(header::CONTENT_TYPE, "application/json")
588 .extension(Operation::Rename)
589 .body(buffer)
590 .map_err(new_request_build_error)?;
591
592 self.sign(&mut request).await?;
593
594 let response = self.info.http_client().send(request).await?;
595 match response.status() {
596 StatusCode::OK => Ok(()),
598 _ => Err(parse_error(response)),
599 }
600 }
601}
602
603pub struct OneDriveSigner {
605 pub info: Arc<AccessorInfo>, pub client_id: String,
608 pub client_secret: String,
609 pub refresh_token: String,
610
611 pub access_token: String,
612 pub expires_in: DateTime<Utc>,
613}
614
615const ONEDRIVE_REFRESH_TOKEN: &str = "https://login.microsoftonline.com/common/oauth2/v2.0/token";
623
624impl OneDriveSigner {
625 pub fn new(info: Arc<AccessorInfo>) -> Self {
626 OneDriveSigner {
627 info,
628
629 client_id: "".to_string(),
630 client_secret: "".to_string(),
631 refresh_token: "".to_string(),
632 access_token: "".to_string(),
633 expires_in: DateTime::<Utc>::MIN_UTC,
634 }
635 }
636
637 async fn refresh_tokens(&mut self) -> Result<()> {
638 let encoded_payload = format!(
640 "client_id={}&client_secret={}&scope=offline_access%20Files.ReadWrite&refresh_token={}&grant_type=refresh_token",
641 percent_encode_path(self.client_id.as_str()),
642 percent_encode_path(self.client_secret.as_str()),
643 percent_encode_path(self.refresh_token.as_str())
644 );
645 let request = Request::post(ONEDRIVE_REFRESH_TOKEN)
646 .header(header::CONTENT_TYPE, "application/x-www-form-urlencoded")
647 .body(Buffer::from(encoded_payload))
648 .map_err(new_request_build_error)?;
649
650 let response = self.info.http_client().send(request).await?;
651 match response.status() {
652 StatusCode::OK => {
653 let resp_body = response.into_body();
654 let data: GraphOAuthRefreshTokenResponseBody =
655 serde_json::from_reader(resp_body.reader())
656 .map_err(new_json_deserialize_error)?;
657 self.access_token = data.access_token;
658 self.refresh_token = data.refresh_token;
659 self.expires_in = Utc::now()
660 + chrono::TimeDelta::try_seconds(data.expires_in)
661 .expect("expires_in must be valid seconds")
662 - chrono::TimeDelta::minutes(2); Ok(())
664 }
665 _ => Err(parse_error(response)),
666 }
667 }
668
669 pub async fn sign<T>(&mut self, request: &mut Request<T>) -> Result<()> {
671 if !self.access_token.is_empty() && self.expires_in > Utc::now() {
672 let value = format!("Bearer {}", self.access_token)
673 .parse()
674 .expect("access_token must be valid header value");
675
676 request.headers_mut().insert(header::AUTHORIZATION, value);
677 return Ok(());
678 }
679
680 self.refresh_tokens().await?;
681
682 let auth_header_content = format!("Bearer {}", self.access_token)
683 .parse()
684 .expect("Fetched access_token is invalid as a header value");
685
686 request
687 .headers_mut()
688 .insert(header::AUTHORIZATION, auth_header_content);
689
690 Ok(())
691 }
692}