mito2/worker/
handle_apply_staging.rs1use chrono::Utc;
16use common_telemetry::{debug, info};
17use snafu::ResultExt;
18use store_api::logstore::LogStore;
19use store_api::region_request::ApplyStagingManifestRequest;
20use store_api::storage::RegionId;
21use tokio::sync::oneshot;
22
23use crate::error::{
24 RegionStateSnafu, Result, SerdeJsonSnafu, StagingPartitionExprMismatchSnafu, UnexpectedSnafu,
25};
26use crate::manifest::action::{RegionEdit, RegionManifest};
27use crate::manifest::storage::manifest_dir;
28use crate::manifest::storage::staging::{StagingBlobStorage, staging_blob_path};
29use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
30use crate::request::{OptionOutputTx, RegionEditRequest, WorkerRequest, WorkerRequestWithTime};
31use crate::sst::location::region_dir_from_table_dir;
32use crate::worker::RegionWorkerLoop;
33
34impl<S: LogStore> RegionWorkerLoop<S> {
35 pub(crate) async fn handle_apply_staging_manifest_request(
36 &mut self,
37 region_id: RegionId,
38 request: ApplyStagingManifestRequest,
39 sender: OptionOutputTx,
40 ) {
41 let region = match self.regions.writable_region(region_id) {
42 Ok(region) => region,
43 Err(e) => {
44 sender.send(Err(e));
45 return;
46 }
47 };
48
49 if !region.is_staging() {
50 let manifest_partition_expr = region.metadata().partition_expr.as_ref().cloned();
51 let is_match = manifest_partition_expr.as_ref() == Some(&request.partition_expr);
52 debug!(
53 "region {} manifest partition expr: {:?}, request partition expr: {:?}",
54 region_id, manifest_partition_expr, request.partition_expr
55 );
56 if is_match {
57 info!(
60 "Region {} already applied the staging manifest, partition expr: {}, ignore the apply staging manifest request",
61 region_id, request.partition_expr
62 );
63 sender.send(Ok(0));
64 return;
65 }
66
67 sender.send(
68 RegionStateSnafu {
69 region_id,
70 state: region.state(),
71 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
72 }
73 .fail(),
74 );
75 return;
76 }
77
78 let staging_partition_info = region.staging_partition_info.lock().unwrap().clone();
79
80 let staging_partition_expr = staging_partition_info
81 .as_ref()
82 .and_then(|info| info.partition_expr());
83 if staging_partition_expr != Some(request.partition_expr.as_str()) {
85 sender.send(
86 StagingPartitionExprMismatchSnafu {
87 manifest_expr: staging_partition_info
88 .as_ref()
89 .and_then(|info| info.partition_expr().map(ToString::to_string)),
90 request_expr: request.partition_expr,
91 }
92 .fail(),
93 );
94 return;
95 }
96
97 let worker_sender = self.sender.clone();
98 common_runtime::spawn_global(async move {
99 let staging_manifest = match Self::fetch_staging_manifest(
100 ®ion,
101 request.central_region_id,
102 &request.manifest_path,
103 )
104 .await
105 {
106 Ok(staging_manifest) => staging_manifest,
107 Err(e) => {
108 sender.send(Err(e));
109 return;
110 }
111 };
112 if staging_manifest.metadata.partition_expr.as_ref() != Some(&request.partition_expr) {
113 sender.send(Err(StagingPartitionExprMismatchSnafu {
114 manifest_expr: staging_manifest.metadata.partition_expr.clone(),
115 request_expr: request.partition_expr,
116 }
117 .build()));
118 return;
119 }
120
121 let files_to_add = staging_manifest.files.values().cloned().collect::<Vec<_>>();
122 let edit = RegionEdit {
123 files_to_add,
124 files_to_remove: vec![],
125 timestamp_ms: Some(Utc::now().timestamp_millis()),
126 compaction_time_window: None,
127 flushed_entry_id: None,
128 flushed_sequence: None,
129 committed_sequence: None,
130 };
131
132 let (tx, rx) = oneshot::channel();
133 info!(
134 "Applying staging manifest request to region {}",
135 region.region_id,
136 );
137 let _ = worker_sender
138 .send(WorkerRequestWithTime::new(WorkerRequest::EditRegion(
139 RegionEditRequest {
140 region_id: region.region_id,
141 edit,
142 tx,
143 },
144 )))
145 .await;
146
147 if let Ok(result) = rx.await {
150 let Ok(()) = result else {
151 sender.send(result.map(|_| 0));
152 return;
153 };
154 let mut manager = region.manifest_ctx.manifest_manager.write().await;
155 match region.exit_staging_on_success(&mut manager).await {
156 Ok(()) => {
157 sender.send(Ok(0));
158 }
159 Err(e) => sender.send(Err(e)),
160 }
161 } else {
162 sender.send(
163 UnexpectedSnafu {
164 reason: "edit region receiver channel closed",
165 }
166 .fail(),
167 );
168 }
169 });
170 }
171
172 async fn fetch_staging_manifest(
177 region: &MitoRegionRef,
178 central_region_id: RegionId,
179 manifest_path: &str,
180 ) -> Result<RegionManifest> {
181 let region_dir =
182 region_dir_from_table_dir(region.table_dir(), central_region_id, region.path_type());
183 let staging_blob_path = staging_blob_path(&manifest_dir(®ion_dir));
184 let staging_blob_storage = StagingBlobStorage::new(
185 staging_blob_path,
186 region.access_layer().object_store().clone(),
187 );
188 let staging_manifest = staging_blob_storage.get(manifest_path).await?;
189
190 serde_json::from_slice::<RegionManifest>(&staging_manifest).context(SerdeJsonSnafu)
191 }
192}