mito2/worker/handle_apply_staging.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono::Utc;
use common_telemetry::{debug, info};
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::region_request::ApplyStagingManifestRequest;
use store_api::storage::RegionId;
use tokio::sync::oneshot;

use crate::error::{
    RegionStateSnafu, Result, SerdeJsonSnafu, StagingPartitionExprMismatchSnafu, UnexpectedSnafu,
};
use crate::manifest::action::{RegionEdit, RegionManifest};
use crate::manifest::storage::manifest_dir;
use crate::manifest::storage::staging::{StagingBlobStorage, staging_blob_path};
use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, RegionEditRequest, WorkerRequest, WorkerRequestWithTime};
use crate::sst::location::region_dir_from_table_dir;
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
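    /// Handles an [`ApplyStagingManifestRequest`] for `region_id`.
    ///
    /// The region must be writable and in the staging state, and its staging
    /// partition expr must match the request. On success the staging manifest's
    /// files are applied to the region as a [`RegionEdit`] and the region exits
    /// the staging state; the result is forwarded through `sender`.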
    pub(crate) async fn handle_apply_staging_manifest_request(
        &mut self,
        region_id: RegionId,
        request: ApplyStagingManifestRequest,
        sender: OptionOutputTx,
    ) {
        let region = match self.regions.writable_region(region_id) {
            Ok(region) => region,
            Err(e) => {
                sender.send(Err(e));
                return;
            }
        };

        if !region.is_staging() {
            let manifest_partition_expr = region.metadata().partition_expr.as_ref().cloned();
            let is_match = manifest_partition_expr.as_ref() == Some(&request.partition_expr);
            debug!(
                "Region {} manifest partition expr: {:?}, request partition expr: {:?}",
                region_id, manifest_partition_expr, request.partition_expr
            );
            if is_match {
                // The current partition expr already matches the request, so treat
                // the region as having already applied the staging manifest.
                info!(
                    "Region {} already applied the staging manifest, partition expr: {}, ignoring the apply staging manifest request",
                    region_id, request.partition_expr
                );
                sender.send(Ok(0));
                return;
            }

            sender.send(
                RegionStateSnafu {
                    region_id,
                    state: region.state(),
                    expect: RegionRoleState::Leader(RegionLeaderState::Staging),
                }
                .fail(),
            );
            return;
        }

        let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone();
        // Return an error if the staging partition expr doesn't match the request.
        if staging_partition_expr.as_ref() != Some(&request.partition_expr) {
            sender.send(
                StagingPartitionExprMismatchSnafu {
                    manifest_expr: staging_partition_expr,
                    request_expr: request.partition_expr,
                }
                .fail(),
            );
            return;
        }

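        // Fetching the staging manifest hits object storage, so run it (and the
        // subsequent region edit) on a background task instead of blocking the
        // worker loop.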
        let worker_sender = self.sender.clone();
        common_runtime::spawn_global(async move {
            let staging_manifest = match Self::fetch_staging_manifest(
                &region,
                request.central_region_id,
                &request.manifest_path,
            )
            .await
            {
                Ok(staging_manifest) => staging_manifest,
                Err(e) => {
                    sender.send(Err(e));
                    return;
                }
            };
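            // The fetched manifest records its own partition expr; verify it as
            // well before applying any files.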
            if staging_manifest.metadata.partition_expr.as_ref() != Some(&request.partition_expr) {
                sender.send(Err(StagingPartitionExprMismatchSnafu {
                    manifest_expr: staging_manifest.metadata.partition_expr.clone(),
                    request_expr: request.partition_expr,
                }
                .build()));
                return;
            }

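            // Build an edit that adds every file from the staging manifest;
            // nothing is removed and no flush or compaction metadata changes.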
            let files_to_add = staging_manifest.files.values().cloned().collect::<Vec<_>>();
            let edit = RegionEdit {
                files_to_add,
                files_to_remove: vec![],
                timestamp_ms: Some(Utc::now().timestamp_millis()),
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            };

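            // Region edits go through the worker loop, so send the edit back as a
            // `WorkerRequest::EditRegion` and wait for the result on `rx`.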
            let (tx, rx) = oneshot::channel();
            info!(
                "Applying staging manifest request to region {}",
                region.region_id,
            );
            let _ = worker_sender
                .send(WorkerRequestWithTime::new(WorkerRequest::EditRegion(
                    RegionEditRequest {
                        region_id: region.region_id,
                        edit,
                        tx,
                    },
                )))
                .await;

            // Await the result of the region edit and forward the outcome to the
            // original sender: Ok(0) on success, the error otherwise.
            if let Ok(result) = rx.await {
                let Ok(()) = result else {
                    sender.send(result.map(|_| 0));
                    return;
                };
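                // The edit succeeded; exit the staging state before reporting
                // success.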
                let mut manager = region.manifest_ctx.manifest_manager.write().await;
                match region.exit_staging_on_success(&mut manager).await {
                    Ok(()) => {
                        sender.send(Ok(0));
                    }
                    Err(e) => sender.send(Err(e)),
                }
            } else {
                sender.send(
                    UnexpectedSnafu {
                        reason: "edit region receiver channel closed",
                    }
                    .fail(),
                );
            }
        });
    }

    /// Fetches the staging manifest from the central region's staging blob storage.
    ///
    /// The `central_region_id` is used to locate the staging directory because the staging
    /// manifest was created by the central region during the `remap_manifests` operation.
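    ///
    /// The manifest bytes are read from the staging blob storage under the central
    /// region's manifest directory and deserialized into a [`RegionManifest`].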
    async fn fetch_staging_manifest(
        region: &MitoRegionRef,
        central_region_id: RegionId,
        manifest_path: &str,
    ) -> Result<RegionManifest> {
        let region_dir =
            region_dir_from_table_dir(region.table_dir(), central_region_id, region.path_type());
        let staging_blob_path = staging_blob_path(&manifest_dir(&region_dir));
        let staging_blob_storage = StagingBlobStorage::new(
            staging_blob_path,
            region.access_layer().object_store().clone(),
        );
        let staging_manifest = staging_blob_storage.get(manifest_path).await?;

        serde_json::from_slice::<RegionManifest>(&staging_manifest).context(SerdeJsonSnafu)
    }
}