opendal/services/onedrive/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Buf;
24use bytes::Bytes;
25use chrono::DateTime;
26use chrono::Utc;
27use http::header;
28use http::Request;
29use http::Response;
30
31use http::StatusCode;
32use tokio::sync::Mutex;
33
34use super::error::parse_error;
35use super::graph_model::*;
36use crate::raw::*;
37use crate::*;
38
/// Shared state of the OneDrive service; the request helpers in this file are methods on it.
pub struct OneDriveCore {
    /// Accessor metadata; the request helpers obtain the HTTP client via `info.http_client()`.
    pub info: Arc<AccessorInfo>,
    /// Root directory that relative paths are resolved against when item URLs are built.
    pub root: String,
    /// Token holder used to add the `Authorization` header. It sits behind an async mutex
    /// because signing takes `&mut` access and may refresh the tokens in place.
    pub signer: Arc<Mutex<OneDriveSigner>>,
}
44
45impl Debug for OneDriveCore {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        f.debug_struct("OneDriveCore")
48            .field("root", &self.root)
49            .finish_non_exhaustive()
50    }
51}
52
// OneDrive returns 400 when trying to access a directory through one of these POSIX special
// directory entries, so `onedrive_item_url` maps them to the drive root URL when the
// configured root is `/`.
const SPECIAL_POSIX_ENTRIES: [&str; 3] = [".", "/", ""];
55
56// organizes a few core module functions
57impl OneDriveCore {
    // OneDrive personal's base URL. `me` is an alias that represents the user's "Drive".
    // URLs of items below the root are built by appending `:<path>` to this value.
    pub(crate) const DRIVE_ROOT_URL: &str = "https://graph.microsoft.com/v1.0/me/drive/root";
60
61    /// Get a URL to an OneDrive item
62    pub(crate) fn onedrive_item_url(&self, path: &str, build_absolute_path: bool) -> String {
63        // OneDrive requires the root to be the same as `DRIVE_ROOT_URL`.
64        // For files under the root, the URL pattern becomes `https://graph.microsoft.com/v1.0/me/drive/root:<path>:`
65        if self.root == "/" && SPECIAL_POSIX_ENTRIES.contains(&path) {
66            Self::DRIVE_ROOT_URL.to_string()
67        } else {
68            // OneDrive returns 400 when try to access a folder with a ending slash
69            let absolute_path = if build_absolute_path {
70                let rooted_path = build_rooted_abs_path(&self.root, path);
71                rooted_path
72                    .strip_suffix('/')
73                    .unwrap_or(rooted_path.as_str())
74                    .to_string()
75            } else {
76                path.to_string()
77            };
78            format!(
79                "{}:{}",
80                Self::DRIVE_ROOT_URL,
81                percent_encode_path(&absolute_path),
82            )
83        }
84    }
85
86    /// Send a simplest stat request about a particular path
87    ///
88    /// See also: [`onedrive_stat()`].
89    pub(crate) async fn onedrive_get_stat_plain(&self, path: &str) -> Result<Response<Buffer>> {
90        let url: String = format!(
91            "{}?{}",
92            self.onedrive_item_url(path, true),
93            GENERAL_SELECT_PARAM
94        );
95        let request = Request::get(&url);
96
97        let mut request = request
98            .body(Buffer::new())
99            .map_err(new_request_build_error)?;
100
101        self.sign(&mut request).await?;
102
103        self.info.http_client().send(request).await
104    }
105
106    /// Create a directory at path if not exist, return the metadata about the folder
107    ///
108    /// When the folder exist, this function works exactly the same as [`onedrive_get_stat_plain()`].
109    ///
110    /// * `path` - a relative folder path
111    pub(crate) async fn ensure_directory(&self, path: &str) -> Result<OneDriveItem> {
112        let response = self.onedrive_get_stat_plain(path).await?;
113        let item: OneDriveItem = match response.status() {
114            StatusCode::OK => {
115                let bytes = response.into_body();
116                serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?
117            }
118            StatusCode::NOT_FOUND => {
119                // We must create directory for the destination
120                let response = self.onedrive_create_dir(path).await?;
121                match response.status() {
122                    StatusCode::CREATED | StatusCode::OK => {
123                        let bytes = response.into_body();
124                        serde_json::from_reader(bytes.reader())
125                            .map_err(new_json_deserialize_error)?
126                    }
127                    _ => return Err(parse_error(response)),
128                }
129            }
130            _ => return Err(parse_error(response)),
131        };
132
133        Ok(item)
134    }
135
136    pub(crate) async fn sign<T>(&self, request: &mut Request<T>) -> Result<()> {
137        let mut signer = self.signer.lock().await;
138        signer.sign(request).await
139    }
140}
141
// OneDrive's copy action is asynchronous: we poll a monitor endpoint and sleep for
// `MONITOR_WAIT_SECOND` second(s) after each poll that has not completed.
// `MAX_MONITOR_ATTEMPT` is the maximum number of polls before giving up.
const MAX_MONITOR_ATTEMPT: i32 = 3600;
const MONITOR_WAIT_SECOND: u64 = 1;
146
// OneDrive API endpoints allow addressing an item by either:
148//
149// - ID
150// - file path
151//
152// `services-onedrive` uses the file path based API for simplicity.
153// Read more at https://learn.microsoft.com/en-us/graph/onedrive-addressing-driveitems
154impl OneDriveCore {
155    /// Send a stat request about a particular path, including:
156    ///
157    /// - Get stat object only if ETag not matches
158    /// - whether to get the object version
159    ///
160    /// See also [`onedrive_get_stat_plain()`].
161    pub(crate) async fn onedrive_stat(&self, path: &str, args: OpStat) -> Result<Metadata> {
162        let mut url: String = self.onedrive_item_url(path, true);
163        if args.version().is_some() {
164            url += "?$expand=versions(";
165            url += VERSION_SELECT_PARAM;
166            url += ")";
167        }
168
169        let mut request = Request::get(&url);
170        if let Some(etag) = args.if_none_match() {
171            request = request.header(header::IF_NONE_MATCH, etag);
172        }
173
174        let mut request = request
175            .body(Buffer::new())
176            .map_err(new_request_build_error)?;
177
178        self.sign(&mut request).await?;
179
180        let response = self.info.http_client().send(request).await?;
181        if !response.status().is_success() {
182            return Err(parse_error(response));
183        }
184
185        let bytes = response.into_body();
186        let decoded_response: OneDriveItem =
187            serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
188
189        let entry_mode: EntryMode = match decoded_response.item_type {
190            ItemType::Folder { .. } => EntryMode::DIR,
191            ItemType::File { .. } => EntryMode::FILE,
192        };
193
194        let mut meta = Metadata::new(entry_mode)
195            .with_etag(decoded_response.e_tag)
196            .with_content_length(decoded_response.size.max(0) as u64);
197
198        if let Some(version) = args.version() {
199            for item_version in decoded_response.versions.as_deref().unwrap_or_default() {
200                if item_version.id == version {
201                    meta.set_version(version);
202                    break; // early exit
203                }
204            }
205
206            if meta.version().is_none() {
207                return Err(Error::new(
208                    ErrorKind::NotFound,
209                    "cannot find this version of the item",
210                ));
211            }
212        }
213
214        let last_modified = decoded_response.last_modified_date_time;
215        let date_utc_last_modified = parse_datetime_from_rfc3339(&last_modified)?;
216        meta.set_last_modified(date_utc_last_modified);
217
218        Ok(meta)
219    }
220
221    /// Return versions of an item
222    ///
223    /// A folder has no versions.
224    ///
225    /// * `path` - a relative path
226    pub(crate) async fn onedrive_list_versions(
227        &self,
228        path: &str,
229    ) -> Result<Vec<OneDriveItemVersion>> {
230        // don't `$select` this endpoint to get the download URL.
231        let url: String = format!(
232            "{}:/versions?{}",
233            self.onedrive_item_url(path, true),
234            VERSION_SELECT_PARAM
235        );
236
237        let mut request = Request::get(url)
238            .body(Buffer::new())
239            .map_err(new_request_build_error)?;
240
241        self.sign(&mut request).await?;
242
243        let response = self.info.http_client().send(request).await?;
244        let decoded_response: GraphApiOneDriveVersionsResponse =
245            serde_json::from_reader(response.into_body().reader())
246                .map_err(new_json_deserialize_error)?;
247        Ok(decoded_response.value)
248    }
249
250    pub(crate) async fn onedrive_get_next_list_page(&self, url: &str) -> Result<Response<Buffer>> {
251        let mut request = Request::get(url)
252            .body(Buffer::new())
253            .map_err(new_request_build_error)?;
254
255        self.sign(&mut request).await?;
256
257        self.info.http_client().send(request).await
258    }
259
260    /// Download a file
261    ///
262    /// OneDrive handles a download in 2 steps:
263    /// 1. Returns a 302 with a presigned URL. If `If-None-Match` succeed, returns 304.
264    /// 2. With the presigned URL, we can send a GET:
265    ///   1. When getting an item succeed with a `Range` header, we get a 206 Partial Content response.
266    ///   2. When succeed, we get a 200 response.
267    ///
268    /// Read more at https://learn.microsoft.com/en-us/graph/api/driveitem-get-content
269    pub(crate) async fn onedrive_get_content(
270        &self,
271        path: &str,
272        args: &OpRead,
273    ) -> Result<Response<HttpBody>> {
274        // We can't "select" the OneDrive API response fields when reading because "select" shadows not found error
275        let url: String = format!("{}:/content", self.onedrive_item_url(path, true));
276
277        let mut request = Request::get(&url).header(header::RANGE, args.range().to_header());
278        if let Some(etag) = args.if_none_match() {
279            request = request.header(header::IF_NONE_MATCH, etag);
280        }
281
282        let mut request = request
283            .body(Buffer::new())
284            .map_err(new_request_build_error)?;
285
286        self.sign(&mut request).await?;
287
288        self.info.http_client().fetch(request).await
289    }
290
291    /// Upload a file
292    ///
293    /// When creating a file,
294    ///
295    /// * OneDrive returns 201 if the file is new.
296    /// * OneDrive returns 200 if successfully overwrote the file successfully.
297    ///
298    /// Read more at https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_put_content
299    ///
300    /// This function is different than uploading a file with chunks.
301    /// See also [`create_upload_session()`] and [`OneDriveWriter::write_chunked`].
302    pub async fn onedrive_upload_simple(
303        &self,
304        path: &str,
305        args: &OpWrite,
306        body: Buffer,
307    ) -> Result<Response<Buffer>> {
308        let url = format!(
309            "{}:/content?@microsoft.graph.conflictBehavior={}&{}",
310            self.onedrive_item_url(path, true),
311            REPLACE_EXISTING_ITEM_WHEN_CONFLICT,
312            GENERAL_SELECT_PARAM
313        );
314
315        // OneDrive upload API documentation requires "text/plain" as the content type.
316        // In practice, OneDrive ignores the content type,
317        // but decides the type (when stating) based on the extension name.
318        // Also, when the extension name is unknown to OneDrive,
319        // OneDrive sets the content type as "application/octet-stream".
320        // We keep the content type according to the documentation.
321        let mut request = Request::put(&url)
322            .header(header::CONTENT_LENGTH, body.len())
323            .header(header::CONTENT_TYPE, "text/plain");
324
325        // when creating a new file, `IF-Match` has no effect.
326        // when updating a file with the `If-Match`, and if the ETag mismatched,
327        // OneDrive will return 412 Precondition Failed
328        if let Some(if_match) = args.if_match() {
329            request = request.header(header::IF_MATCH, if_match);
330        }
331
332        let mut request = request.body(body).map_err(new_request_build_error)?;
333
334        self.sign(&mut request).await?;
335
336        self.info.http_client().send(request).await
337    }
338
339    pub(crate) async fn onedrive_chunked_upload(
340        &self,
341        url: &str,
342        args: &OpWrite,
343        offset: usize,
344        chunk_end: usize,
345        total_len: usize,
346        body: Buffer,
347    ) -> Result<Response<Buffer>> {
348        let mut request = Request::put(url);
349
350        let range = format!("bytes {}-{}/{}", offset, chunk_end, total_len);
351        request = request.header(header::CONTENT_RANGE, range);
352
353        let size = chunk_end - offset + 1;
354        request = request.header(header::CONTENT_LENGTH, size);
355
356        if let Some(mime) = args.content_type() {
357            request = request.header(header::CONTENT_TYPE, mime)
358        }
359
360        let request = request.body(body).map_err(new_request_build_error)?;
361        // OneDrive documentation requires not sending the `Authorization` header
362
363        self.info.http_client().send(request).await
364    }
365
366    /// Create a upload session for chunk uploads
367    ///
368    /// This endpoint supports `If-None-Match` but [`onedrive_upload_simple()`] doesn't.
369    ///
370    /// Read more at https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#upload-bytes-to-the-upload-session
371    pub(crate) async fn onedrive_create_upload_session(
372        &self,
373        path: &str,
374        args: &OpWrite,
375    ) -> Result<Response<Buffer>> {
376        let parent_path = get_parent(path);
377        let file_name = get_basename(path);
378        let url = format!(
379            "{}:/createUploadSession",
380            self.onedrive_item_url(parent_path, true),
381        );
382        let mut request = Request::post(url).header(header::CONTENT_TYPE, "application/json");
383
384        if let Some(if_match) = args.if_match() {
385            request = request.header(header::IF_MATCH, if_match);
386        }
387
388        let body = OneDriveUploadSessionCreationRequestBody::new(file_name.to_string());
389        let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
390        let body = Buffer::from(Bytes::from(body_bytes));
391        let mut request = request.body(body).map_err(new_request_build_error)?;
392
393        self.sign(&mut request).await?;
394
395        self.info.http_client().send(request).await
396    }
397
398    /// Create a directory
399    ///
400    /// When creating a folder, OneDrive returns a status code with 201.
401    /// When using `microsoft.graph.conflictBehavior=replace` to replace a folder, OneDrive returns 200.
402    ///
403    /// * `path` - the path to the folder without the root
404    pub(crate) async fn onedrive_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
405        let parent_path = get_parent(path);
406        let basename = get_basename(path);
407        let folder_name = basename.strip_suffix('/').unwrap_or(basename);
408
409        let url = format!(
410            "{}:/children?{}",
411            self.onedrive_item_url(parent_path, true),
412            GENERAL_SELECT_PARAM
413        );
414
415        let payload = CreateDirPayload::new(folder_name.to_string());
416        let body_bytes = serde_json::to_vec(&payload).map_err(new_json_serialize_error)?;
417        let body = Buffer::from(bytes::Bytes::from(body_bytes));
418
419        let mut request = Request::post(url)
420            .header(header::CONTENT_TYPE, "application/json")
421            .body(body)
422            .map_err(new_request_build_error)?;
423
424        self.sign(&mut request).await?;
425
426        self.info.http_client().send(request).await
427    }
428
429    /// Delete a `DriveItem`
430    ///
431    /// This moves the items to the recycle bin.
432    pub(crate) async fn onedrive_delete(&self, path: &str) -> Result<Response<Buffer>> {
433        let url = self.onedrive_item_url(path, true);
434
435        let mut request = Request::delete(&url)
436            .body(Buffer::new())
437            .map_err(new_request_build_error)?;
438
439        self.sign(&mut request).await?;
440
441        self.info.http_client().send(request).await
442    }
443
444    /// Initialize a copy
445    ///
446    /// * `source` - the path to the source folder without the root
447    /// * `destination` - the path to the destination folder without the root
448    ///
449    /// See also: [`wait_until_complete()`]
450    pub(crate) async fn initialize_copy(&self, source: &str, destination: &str) -> Result<String> {
451        // we must validate if source exist
452        let response = self.onedrive_get_stat_plain(source).await?;
453        if !response.status().is_success() {
454            return Err(parse_error(response));
455        }
456
457        // We need to stat the destination parent folder to get a parent reference
458        let destination_parent = get_parent(destination).to_string();
459        let basename = get_basename(destination);
460
461        let item = self.ensure_directory(&destination_parent).await?;
462        let body = OneDrivePatchRequestBody {
463            parent_reference: ParentReference {
464                path: "".to_string(), // irrelevant for copy
465                drive_id: item.parent_reference.drive_id,
466                id: item.id,
467            },
468            name: basename.to_string(),
469        };
470
471        // ensure the destination file or folder doesn't exist
472        let response = self.onedrive_get_stat_plain(destination).await?;
473        match response.status() {
474            // We must remove the file or folder because
475            // OneDrive doesn't support `conflictBehavior` for the consumer OneDrive.
476            // `conflictBehavior` seems to work for the consumer OneDrive sometimes could be a coincidence.
477            // Read more at https://learn.microsoft.com/en-us/graph/api/driveitem-copy
478            StatusCode::OK => {
479                let response = self.onedrive_delete(destination).await?;
480                match response.status() {
481                    StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => {} // expected, intentionally empty
482                    _ => return Err(parse_error(response)),
483                }
484            }
485            StatusCode::NOT_FOUND => {} // expected, intentionally empty
486            _ => return Err(parse_error(response)),
487        }
488
489        let url: String = format!("{}:/copy", self.onedrive_item_url(source, true));
490
491        let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
492        let buffer = Buffer::from(Bytes::from(body_bytes));
493        let mut request = Request::post(&url)
494            .header(header::CONTENT_TYPE, "application/json")
495            .body(buffer)
496            .map_err(new_request_build_error)?;
497
498        self.sign(&mut request).await?;
499
500        let response = self.info.http_client().send(request).await?;
501        match response.status() {
502            StatusCode::ACCEPTED => parse_location(response.headers())?
503                .ok_or_else(|| {
504                    Error::new(
505                        ErrorKind::Unexpected,
506                        "OneDrive didn't return a location URL",
507                    )
508                })
509                .map(String::from),
510            _ => Err(parse_error(response)),
511        }
512    }
513
514    pub(crate) async fn wait_until_complete(&self, monitor_url: String) -> Result<()> {
515        for _attempt in 0..MAX_MONITOR_ATTEMPT {
516            let mut request = Request::get(monitor_url.to_string())
517                .header(header::CONTENT_TYPE, "application/json")
518                .body(Buffer::new())
519                .map_err(new_request_build_error)?;
520
521            self.sign(&mut request).await?;
522
523            let response = self.info.http_client().send(request).await?;
524            let status: OneDriveMonitorStatus =
525                serde_json::from_reader(response.into_body().reader())
526                    .map_err(new_json_deserialize_error)?;
527            if status.status == "completed" {
528                return Ok(());
529            }
530
531            tokio::time::sleep(Duration::from_secs(MONITOR_WAIT_SECOND)).await;
532        }
533
534        Err(Error::new(
535            ErrorKind::Unexpected,
536            "Exceed monitoring timeout",
537        ))
538    }
539
540    pub(crate) async fn onedrive_move(&self, source: &str, destination: &str) -> Result<()> {
541        // We must validate if the source folder exists.
542        let response = self.onedrive_get_stat_plain(source).await?;
543        if !response.status().is_success() {
544            return Err(Error::new(ErrorKind::NotFound, "source not found"));
545        }
546
547        // We want a parent reference about the destination's parent, or the destination folder itself.
548        let destination_parent = get_parent(destination).to_string();
549        let basename = get_basename(destination);
550
551        let item = self.ensure_directory(&destination_parent).await?;
552        let body = OneDrivePatchRequestBody {
553            parent_reference: ParentReference {
554                path: "".to_string(), // irrelevant for update
555                // reusing `ParentReference` for convenience. The API requires this value to be correct.
556                drive_id: item.parent_reference.drive_id,
557                id: item.id,
558            },
559            name: basename.to_string(),
560        };
561        let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
562        let buffer = Buffer::from(Bytes::from(body_bytes));
563        let url: String = format!(
564            "{}?@microsoft.graph.conflictBehavior={}&$select=id",
565            self.onedrive_item_url(source, true),
566            REPLACE_EXISTING_ITEM_WHEN_CONFLICT
567        );
568        let mut request = Request::patch(&url)
569            .header(header::CONTENT_TYPE, "application/json")
570            .body(buffer)
571            .map_err(new_request_build_error)?;
572
573        self.sign(&mut request).await?;
574
575        let response = self.info.http_client().send(request).await?;
576        match response.status() {
577            // can get etag, metadata, etc...
578            StatusCode::OK => Ok(()),
579            _ => Err(parse_error(response)),
580        }
581    }
582}
583
// Keeps track of OAuth 2.0 tokens and refreshes the access token.
pub struct OneDriveSigner {
    pub info: Arc<AccessorInfo>, // to use `http_client`

    // OAuth 2.0 client credentials, sent with every token refresh
    pub client_id: String,
    pub client_secret: String,
    // sent with every token refresh and replaced by the one the refresh returns
    pub refresh_token: String,

    // bearer token placed in the `Authorization` header; `new()` leaves it empty,
    // which forces a refresh on the first `sign()`
    pub access_token: String,
    // Despite the name this is an instant, not a duration: the point in time after which
    // `access_token` is considered expired. A refresh sets it to
    // "now + lifetime reported by the server - 2 minutes".
    pub expires_in: DateTime<Utc>,
}
595
// OneDrive is part of the Graph API and hence shares its authentication and authorization
// processes. The `common` tenant in the URL applies to these account types:
//
// - consumers
// - work and school accounts
//
// It is set to `common` for simplicity.
// This is the endpoint that `refresh_tokens` POSTs the refresh-token grant to.
const ONEDRIVE_REFRESH_TOKEN: &str = "https://login.microsoftonline.com/common/oauth2/v2.0/token";
604
605impl OneDriveSigner {
606    pub fn new(info: Arc<AccessorInfo>) -> Self {
607        OneDriveSigner {
608            info,
609
610            client_id: "".to_string(),
611            client_secret: "".to_string(),
612            refresh_token: "".to_string(),
613            access_token: "".to_string(),
614            expires_in: DateTime::<Utc>::MIN_UTC,
615        }
616    }
617
618    async fn refresh_tokens(&mut self) -> Result<()> {
619        // OneDrive users must provide at least this required permission scope
620        let encoded_payload = format!(
621            "client_id={}&client_secret={}&scope=offline_access%20Files.ReadWrite&refresh_token={}&grant_type=refresh_token",
622            percent_encode_path(self.client_id.as_str()),
623            percent_encode_path(self.client_secret.as_str()),
624            percent_encode_path(self.refresh_token.as_str())
625        );
626        let request = Request::post(ONEDRIVE_REFRESH_TOKEN)
627            .header(header::CONTENT_TYPE, "application/x-www-form-urlencoded")
628            .body(Buffer::from(encoded_payload))
629            .map_err(new_request_build_error)?;
630
631        let response = self.info.http_client().send(request).await?;
632        match response.status() {
633            StatusCode::OK => {
634                let resp_body = response.into_body();
635                let data: GraphOAuthRefreshTokenResponseBody =
636                    serde_json::from_reader(resp_body.reader())
637                        .map_err(new_json_deserialize_error)?;
638                self.access_token = data.access_token;
639                self.refresh_token = data.refresh_token;
640                self.expires_in = Utc::now()
641                    + chrono::TimeDelta::try_seconds(data.expires_in)
642                        .expect("expires_in must be valid seconds")
643                    - chrono::TimeDelta::minutes(2); // assumes 2 mins graceful transmission for implementation simplicity
644                Ok(())
645            }
646            _ => Err(parse_error(response)),
647        }
648    }
649
650    /// Sign a request.
651    pub async fn sign<T>(&mut self, request: &mut Request<T>) -> Result<()> {
652        if !self.access_token.is_empty() && self.expires_in > Utc::now() {
653            let value = format!("Bearer {}", self.access_token)
654                .parse()
655                .expect("access_token must be valid header value");
656
657            request.headers_mut().insert(header::AUTHORIZATION, value);
658            return Ok(());
659        }
660
661        self.refresh_tokens().await?;
662
663        let auth_header_content = format!("Bearer {}", self.access_token)
664            .parse()
665            .expect("Fetched access_token is invalid as a header value");
666
667        request
668            .headers_mut()
669            .insert(header::AUTHORIZATION, auth_header_content);
670
671        Ok(())
672    }
673}