meta_srv/procedure/repartition/group/
remap_manifest.rs1use std::any::Any;
16use std::collections::HashMap;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_meta::instruction::{Instruction, InstructionReply, RemapManifestReply};
21use common_meta::peer::Peer;
22use common_procedure::{Context as ProcedureContext, Status};
23use common_telemetry::tracing_context::TracingContext;
24use common_telemetry::{info, warn};
25use serde::{Deserialize, Serialize};
26use snafu::{OptionExt, ResultExt, ensure};
27use store_api::storage::RegionId;
28
29use crate::error::{self, Result};
30use crate::handler::HeartbeatMailbox;
31use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest;
32use crate::procedure::repartition::group::{Context, State};
33use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
34use crate::procedure::utils::instruction_error_result;
35use crate::service::mailbox::{Channel, MailboxRef};
36
37#[derive(Debug, Serialize, Deserialize)]
38pub(crate) struct RemapManifest;
39
40#[async_trait::async_trait]
41#[typetag::serde]
42impl State for RemapManifest {
43 async fn next(
44 &mut self,
45 ctx: &mut Context,
46 _procedure_ctx: &ProcedureContext,
47 ) -> Result<(Box<dyn State>, Status)> {
48 let timer = Instant::now();
49 let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
50 let remap = Self::build_remap_manifest_instructions(
51 &ctx.persistent_ctx.sources,
52 &ctx.persistent_ctx.targets,
53 &ctx.persistent_ctx.region_mapping,
54 prepare_result.central_region,
55 )?;
56 let operation_timeout =
57 ctx.next_operation_timeout()
58 .context(error::ExceededDeadlineSnafu {
59 operation: "Remap manifests",
60 })?;
61 let manifest_paths = Self::remap_manifests(
62 &ctx.mailbox,
63 &ctx.server_addr,
64 &prepare_result.central_region_datanode,
65 &remap,
66 operation_timeout,
67 )
68 .await?;
69 let table_id = ctx.persistent_ctx.table_id;
70 let group_id = ctx.persistent_ctx.group_id;
71 let manifest_count = manifest_paths.len();
72 let input_region_count = ctx.persistent_ctx.sources.len();
73 let target_region_count = ctx.persistent_ctx.targets.len();
74 info!(
75 "Remap manifests finished for repartition, table_id: {}, group_id: {}, input_regions: {}, target_regions: {}, manifest_paths: {}",
76 table_id, group_id, input_region_count, target_region_count, manifest_count
77 );
78
79 if manifest_paths.len() != ctx.persistent_ctx.targets.len() {
80 warn!(
81 "Mismatch in manifest paths count: expected {}, got {}. This occurred during remapping manifests for group {} and table {}.",
82 ctx.persistent_ctx.targets.len(),
83 manifest_paths.len(),
84 group_id,
85 table_id
86 );
87 }
88
89 ctx.persistent_ctx.staging_manifest_paths = manifest_paths;
90 ctx.update_remap_manifest_elapsed(timer.elapsed());
91
92 Ok((Box::new(ApplyStagingManifest), Status::executing(true)))
93 }
94
95 fn as_any(&self) -> &dyn Any {
96 self
97 }
98}
99
100impl RemapManifest {
101 fn build_remap_manifest_instructions(
102 source_regions: &[SourceRegionDescriptor],
103 target_regions: &[TargetRegionDescriptor],
104 region_mapping: &HashMap<RegionId, Vec<RegionId>>,
105 central_region_id: RegionId,
106 ) -> Result<common_meta::instruction::RemapManifest> {
107 let new_partition_exprs = target_regions
108 .iter()
109 .map(|r| {
110 Ok((
111 r.region_id,
112 r.partition_expr
113 .as_json_str()
114 .context(error::SerializePartitionExprSnafu)?,
115 ))
116 })
117 .collect::<Result<HashMap<RegionId, String>>>()?;
118
119 Ok(common_meta::instruction::RemapManifest {
120 region_id: central_region_id,
121 input_regions: source_regions.iter().map(|r| r.region_id()).collect(),
122 region_mapping: region_mapping.clone(),
123 new_partition_exprs,
124 })
125 }
126
127 async fn remap_manifests(
128 mailbox: &MailboxRef,
129 server_addr: &str,
130 peer: &Peer,
131 remap: &common_meta::instruction::RemapManifest,
132 timeout: Duration,
133 ) -> Result<HashMap<RegionId, String>> {
134 let ch = Channel::Datanode(peer.id);
135 let instruction = Instruction::RemapManifest(remap.clone());
136 let tracing_ctx = TracingContext::from_current_span();
137 let message = MailboxMessage::json_message(
138 &format!(
139 "Remap manifests, central region: {}, input regions: {:?}",
140 remap.region_id, remap.input_regions
141 ),
142 &format!("Metasrv@{}", server_addr),
143 &format!("Datanode-{}@{}", peer.id, peer.addr),
144 common_time::util::current_time_millis(),
145 &instruction,
146 Some(tracing_ctx.to_w3c()),
147 )
148 .with_context(|_| error::SerializeToJsonSnafu {
149 input: instruction.to_string(),
150 })?;
151 let now = Instant::now();
152 let receiver = mailbox.send(&ch, message, timeout).await;
153
154 let receiver = match receiver {
155 Ok(receiver) => receiver,
156 Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
157 reason: format!(
158 "Pusher not found for remap manifests on datanode {:?}, elapsed: {:?}",
159 peer,
160 now.elapsed()
161 ),
162 }
163 .fail()?,
164 Err(err) => {
165 return Err(err);
166 }
167 };
168
169 match receiver.await {
170 Ok(msg) => {
171 let reply = HeartbeatMailbox::json_reply(&msg)?;
172 let elapsed = now.elapsed();
173 let InstructionReply::RemapManifest(reply) = reply else {
174 return error::UnexpectedInstructionReplySnafu {
175 mailbox_message: msg.to_string(),
176 reason: "expect remap manifest reply",
177 }
178 .fail();
179 };
180 let manifest_count = reply.manifest_paths.len();
181 info!(
182 "Received remap manifest reply for central_region: {}, manifest_paths: {}, elapsed: {:?}",
183 remap.region_id, manifest_count, elapsed
184 );
185
186 Self::handle_remap_manifest_reply(remap.region_id, reply, &now, peer)
187 }
188 Err(error::Error::MailboxChannelClosed { .. }) => error::RetryLaterSnafu {
189 reason: format!(
190 "Mailbox closed when sending remap manifests to datanode {:?}, elapsed: {:?}",
191 peer,
192 now.elapsed()
193 ),
194 }
195 .fail()?,
196 Err(error::Error::MailboxTimeout { .. }) => {
197 let reason = format!(
198 "Mailbox received timeout for remap manifests on datanode {:?}, elapsed: {:?}",
199 peer,
200 now.elapsed()
201 );
202 error::RetryLaterSnafu { reason }.fail()
203 }
204 Err(err) => Err(err),
205 }
206 }
207
208 fn handle_remap_manifest_reply(
209 region_id: RegionId,
210 RemapManifestReply {
211 exists,
212 manifest_paths,
213 error,
214 }: RemapManifestReply,
215 now: &Instant,
216 peer: &Peer,
217 ) -> Result<HashMap<RegionId, String>> {
218 ensure!(
219 exists,
220 error::UnexpectedSnafu {
221 violated: format!(
222 "Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
223 region_id,
224 peer,
225 now.elapsed()
226 )
227 }
228 );
229
230 if let Some(error) = error {
231 return instruction_error_result(
232 &error,
233 format!(
234 "Failed to remap manifest on datanode {:?}, error: {:?}, elapsed: {:?}",
235 peer,
236 error,
237 now.elapsed()
238 ),
239 );
240 }
241
242 Ok(manifest_paths)
243 }
244}