meta_srv/procedure/repartition/group/
remap_manifest.rs1use std::any::Any;
16use std::collections::HashMap;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_meta::instruction::{Instruction, InstructionReply, RemapManifestReply};
21use common_meta::peer::Peer;
22use common_procedure::{Context as ProcedureContext, Status};
23use common_telemetry::{info, warn};
24use serde::{Deserialize, Serialize};
25use snafu::{OptionExt, ResultExt, ensure};
26use store_api::storage::RegionId;
27
28use crate::error::{self, Result};
29use crate::handler::HeartbeatMailbox;
30use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest;
31use crate::procedure::repartition::group::{Context, State};
32use crate::procedure::repartition::plan::RegionDescriptor;
33use crate::service::mailbox::{Channel, MailboxRef};
34
35#[derive(Debug, Serialize, Deserialize)]
36pub(crate) struct RemapManifest;
37
38#[async_trait::async_trait]
39#[typetag::serde]
40impl State for RemapManifest {
41 async fn next(
42 &mut self,
43 ctx: &mut Context,
44 _procedure_ctx: &ProcedureContext,
45 ) -> Result<(Box<dyn State>, Status)> {
46 let timer = Instant::now();
47 let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
48 let remap = Self::build_remap_manifest_instructions(
49 &ctx.persistent_ctx.sources,
50 &ctx.persistent_ctx.targets,
51 &ctx.persistent_ctx.region_mapping,
52 prepare_result.central_region,
53 )?;
54 let operation_timeout =
55 ctx.next_operation_timeout()
56 .context(error::ExceededDeadlineSnafu {
57 operation: "Remap manifests",
58 })?;
59 let manifest_paths = Self::remap_manifests(
60 &ctx.mailbox,
61 &ctx.server_addr,
62 &prepare_result.central_region_datanode,
63 &remap,
64 operation_timeout,
65 )
66 .await?;
67 let table_id = ctx.persistent_ctx.table_id;
68 let group_id = ctx.persistent_ctx.group_id;
69 let manifest_count = manifest_paths.len();
70 let input_region_count = ctx.persistent_ctx.sources.len();
71 let target_region_count = ctx.persistent_ctx.targets.len();
72 info!(
73 "Remap manifests finished for repartition, table_id: {}, group_id: {}, input_regions: {}, target_regions: {}, manifest_paths: {}",
74 table_id, group_id, input_region_count, target_region_count, manifest_count
75 );
76
77 if manifest_paths.len() != ctx.persistent_ctx.targets.len() {
78 warn!(
79 "Mismatch in manifest paths count: expected {}, got {}. This occurred during remapping manifests for group {} and table {}.",
80 ctx.persistent_ctx.targets.len(),
81 manifest_paths.len(),
82 group_id,
83 table_id
84 );
85 }
86
87 ctx.persistent_ctx.staging_manifest_paths = manifest_paths;
88 ctx.update_remap_manifest_elapsed(timer.elapsed());
89
90 Ok((Box::new(ApplyStagingManifest), Status::executing(true)))
91 }
92
93 fn as_any(&self) -> &dyn Any {
94 self
95 }
96}
97
98impl RemapManifest {
99 fn build_remap_manifest_instructions(
100 source_regions: &[RegionDescriptor],
101 target_regions: &[RegionDescriptor],
102 region_mapping: &HashMap<RegionId, Vec<RegionId>>,
103 central_region_id: RegionId,
104 ) -> Result<common_meta::instruction::RemapManifest> {
105 let new_partition_exprs = target_regions
106 .iter()
107 .map(|r| {
108 Ok((
109 r.region_id,
110 r.partition_expr
111 .as_json_str()
112 .context(error::SerializePartitionExprSnafu)?,
113 ))
114 })
115 .collect::<Result<HashMap<RegionId, String>>>()?;
116
117 Ok(common_meta::instruction::RemapManifest {
118 region_id: central_region_id,
119 input_regions: source_regions.iter().map(|r| r.region_id).collect(),
120 region_mapping: region_mapping.clone(),
121 new_partition_exprs,
122 })
123 }
124
125 async fn remap_manifests(
126 mailbox: &MailboxRef,
127 server_addr: &str,
128 peer: &Peer,
129 remap: &common_meta::instruction::RemapManifest,
130 timeout: Duration,
131 ) -> Result<HashMap<RegionId, String>> {
132 let ch = Channel::Datanode(peer.id);
133 let instruction = Instruction::RemapManifest(remap.clone());
134 let message = MailboxMessage::json_message(
135 &format!(
136 "Remap manifests, central region: {}, input regions: {:?}",
137 remap.region_id, remap.input_regions
138 ),
139 &format!("Metasrv@{}", server_addr),
140 &format!("Datanode-{}@{}", peer.id, peer.addr),
141 common_time::util::current_time_millis(),
142 &instruction,
143 )
144 .with_context(|_| error::SerializeToJsonSnafu {
145 input: instruction.to_string(),
146 })?;
147 let now = Instant::now();
148 let receiver = mailbox.send(&ch, message, timeout).await;
149
150 let receiver = match receiver {
151 Ok(receiver) => receiver,
152 Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
153 reason: format!(
154 "Pusher not found for remap manifests on datanode {:?}, elapsed: {:?}",
155 peer,
156 now.elapsed()
157 ),
158 }
159 .fail()?,
160 Err(err) => {
161 return Err(err);
162 }
163 };
164
165 match receiver.await {
166 Ok(msg) => {
167 let reply = HeartbeatMailbox::json_reply(&msg)?;
168 let elapsed = now.elapsed();
169 let InstructionReply::RemapManifest(reply) = reply else {
170 return error::UnexpectedInstructionReplySnafu {
171 mailbox_message: msg.to_string(),
172 reason: "expect remap manifest reply",
173 }
174 .fail();
175 };
176 let manifest_count = reply.manifest_paths.len();
177 info!(
178 "Received remap manifest reply for central_region: {}, manifest_paths: {}, elapsed: {:?}",
179 remap.region_id, manifest_count, elapsed
180 );
181
182 Self::handle_remap_manifest_reply(remap.region_id, reply, &now, peer)
183 }
184 Err(error::Error::MailboxTimeout { .. }) => {
185 let reason = format!(
186 "Mailbox received timeout for remap manifests on datanode {:?}, elapsed: {:?}",
187 peer,
188 now.elapsed()
189 );
190 error::RetryLaterSnafu { reason }.fail()
191 }
192 Err(err) => Err(err),
193 }
194 }
195
196 fn handle_remap_manifest_reply(
197 region_id: RegionId,
198 RemapManifestReply {
199 exists,
200 manifest_paths,
201 error,
202 }: RemapManifestReply,
203 now: &Instant,
204 peer: &Peer,
205 ) -> Result<HashMap<RegionId, String>> {
206 ensure!(
207 exists,
208 error::UnexpectedSnafu {
209 violated: format!(
210 "Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
211 region_id,
212 peer,
213 now.elapsed()
214 )
215 }
216 );
217
218 if error.is_some() {
219 return error::RetryLaterSnafu {
220 reason: format!(
221 "Failed to remap manifest on datanode {:?}, error: {:?}, elapsed: {:?}",
222 peer,
223 error,
224 now.elapsed()
225 ),
226 }
227 .fail();
228 }
229
230 Ok(manifest_paths)
231 }
232}