mito2/worker/
handle_apply_staging.rs1use chrono::Utc;
16use common_telemetry::{debug, info};
17use snafu::ResultExt;
18use store_api::logstore::LogStore;
19use store_api::region_request::ApplyStagingManifestRequest;
20use store_api::storage::RegionId;
21use tokio::sync::oneshot;
22
23use crate::error::{
24 RegionStateSnafu, Result, SerdeJsonSnafu, StagingPartitionExprMismatchSnafu, UnexpectedSnafu,
25};
26use crate::manifest::action::{RegionEdit, RegionManifest};
27use crate::manifest::storage::manifest_dir;
28use crate::manifest::storage::staging::{StagingBlobStorage, staging_blob_path};
29use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
30use crate::request::{OptionOutputTx, RegionEditRequest, WorkerRequest, WorkerRequestWithTime};
31use crate::sst::location::region_dir_from_table_dir;
32use crate::worker::RegionWorkerLoop;
33
34impl<S: LogStore> RegionWorkerLoop<S> {
35 pub(crate) async fn handle_apply_staging_manifest_request(
36 &mut self,
37 region_id: RegionId,
38 request: ApplyStagingManifestRequest,
39 sender: OptionOutputTx,
40 ) {
41 let region = match self.regions.writable_region(region_id) {
42 Ok(region) => region,
43 Err(e) => {
44 sender.send(Err(e));
45 return;
46 }
47 };
48
49 if !region.is_staging() {
50 let manifest_partition_expr = region.metadata().partition_expr.as_ref().cloned();
51 let is_match = manifest_partition_expr.as_ref() == Some(&request.partition_expr);
52 debug!(
53 "region {} manifest partition expr: {:?}, request partition expr: {:?}",
54 region_id, manifest_partition_expr, request.partition_expr
55 );
56 if is_match {
57 info!(
60 "Region {} already applied the staging manifest, partition expr: {}, ignore the apply staging manifest request",
61 region_id, request.partition_expr
62 );
63 sender.send(Ok(0));
64 return;
65 }
66
67 sender.send(
68 RegionStateSnafu {
69 region_id,
70 state: region.state(),
71 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
72 }
73 .fail(),
74 );
75 return;
76 }
77
78 let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone();
79 if staging_partition_expr.as_ref() != Some(&request.partition_expr) {
81 sender.send(
82 StagingPartitionExprMismatchSnafu {
83 manifest_expr: staging_partition_expr,
84 request_expr: request.partition_expr,
85 }
86 .fail(),
87 );
88 return;
89 }
90
91 let worker_sender = self.sender.clone();
92 common_runtime::spawn_global(async move {
93 let staging_manifest = match Self::fetch_staging_manifest(
94 ®ion,
95 request.central_region_id,
96 &request.manifest_path,
97 )
98 .await
99 {
100 Ok(staging_manifest) => staging_manifest,
101 Err(e) => {
102 sender.send(Err(e));
103 return;
104 }
105 };
106 if staging_manifest.metadata.partition_expr.as_ref() != Some(&request.partition_expr) {
107 sender.send(Err(StagingPartitionExprMismatchSnafu {
108 manifest_expr: staging_manifest.metadata.partition_expr.clone(),
109 request_expr: request.partition_expr,
110 }
111 .build()));
112 return;
113 }
114
115 let files_to_add = staging_manifest.files.values().cloned().collect::<Vec<_>>();
116 let edit = RegionEdit {
117 files_to_add,
118 files_to_remove: vec![],
119 timestamp_ms: Some(Utc::now().timestamp_millis()),
120 compaction_time_window: None,
121 flushed_entry_id: None,
122 flushed_sequence: None,
123 committed_sequence: None,
124 };
125
126 let (tx, rx) = oneshot::channel();
127 info!(
128 "Applying staging manifest request to region {}",
129 region.region_id,
130 );
131 let _ = worker_sender
132 .send(WorkerRequestWithTime::new(WorkerRequest::EditRegion(
133 RegionEditRequest {
134 region_id: region.region_id,
135 edit,
136 tx,
137 },
138 )))
139 .await;
140
141 if let Ok(result) = rx.await {
144 let Ok(()) = result else {
145 sender.send(result.map(|_| 0));
146 return;
147 };
148 let mut manager = region.manifest_ctx.manifest_manager.write().await;
149 match region.exit_staging_on_success(&mut manager).await {
150 Ok(()) => {
151 sender.send(Ok(0));
152 }
153 Err(e) => sender.send(Err(e)),
154 }
155 } else {
156 sender.send(
157 UnexpectedSnafu {
158 reason: "edit region receiver channel closed",
159 }
160 .fail(),
161 );
162 }
163 });
164 }
165
166 async fn fetch_staging_manifest(
171 region: &MitoRegionRef,
172 central_region_id: RegionId,
173 manifest_path: &str,
174 ) -> Result<RegionManifest> {
175 let region_dir =
176 region_dir_from_table_dir(region.table_dir(), central_region_id, region.path_type());
177 let staging_blob_path = staging_blob_path(&manifest_dir(®ion_dir));
178 let staging_blob_storage = StagingBlobStorage::new(
179 staging_blob_path,
180 region.access_layer().object_store().clone(),
181 );
182 let staging_manifest = staging_blob_storage.get(manifest_path).await?;
183
184 serde_json::from_slice::<RegionManifest>(&staging_manifest).context(SerdeJsonSnafu)
185 }
186}