meta_srv/procedure/repartition/group/
remap_manifest.rs1use std::any::Any;
16use std::collections::HashMap;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_meta::instruction::{Instruction, InstructionReply, RemapManifestReply};
21use common_meta::peer::Peer;
22use common_procedure::{Context as ProcedureContext, Status};
23use common_telemetry::tracing_context::TracingContext;
24use common_telemetry::{info, warn};
25use serde::{Deserialize, Serialize};
26use snafu::{OptionExt, ResultExt, ensure};
27use store_api::storage::RegionId;
28
29use crate::error::{self, Result};
30use crate::handler::HeartbeatMailbox;
31use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest;
32use crate::procedure::repartition::group::{Context, State};
33use crate::procedure::repartition::plan::RegionDescriptor;
34use crate::service::mailbox::{Channel, MailboxRef};
35
/// Repartition group state that remaps region manifests.
///
/// The struct carries no data of its own; its `State` implementation in this
/// file sends a remap-manifest instruction to a datanode, records the returned
/// staging manifest paths in the persistent context and then hands over to
/// `ApplyStagingManifest`.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct RemapManifest;
38
39#[async_trait::async_trait]
40#[typetag::serde]
41impl State for RemapManifest {
42 async fn next(
43 &mut self,
44 ctx: &mut Context,
45 _procedure_ctx: &ProcedureContext,
46 ) -> Result<(Box<dyn State>, Status)> {
47 let timer = Instant::now();
48 let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
49 let remap = Self::build_remap_manifest_instructions(
50 &ctx.persistent_ctx.sources,
51 &ctx.persistent_ctx.targets,
52 &ctx.persistent_ctx.region_mapping,
53 prepare_result.central_region,
54 )?;
55 let operation_timeout =
56 ctx.next_operation_timeout()
57 .context(error::ExceededDeadlineSnafu {
58 operation: "Remap manifests",
59 })?;
60 let manifest_paths = Self::remap_manifests(
61 &ctx.mailbox,
62 &ctx.server_addr,
63 &prepare_result.central_region_datanode,
64 &remap,
65 operation_timeout,
66 )
67 .await?;
68 let table_id = ctx.persistent_ctx.table_id;
69 let group_id = ctx.persistent_ctx.group_id;
70 let manifest_count = manifest_paths.len();
71 let input_region_count = ctx.persistent_ctx.sources.len();
72 let target_region_count = ctx.persistent_ctx.targets.len();
73 info!(
74 "Remap manifests finished for repartition, table_id: {}, group_id: {}, input_regions: {}, target_regions: {}, manifest_paths: {}",
75 table_id, group_id, input_region_count, target_region_count, manifest_count
76 );
77
78 if manifest_paths.len() != ctx.persistent_ctx.targets.len() {
79 warn!(
80 "Mismatch in manifest paths count: expected {}, got {}. This occurred during remapping manifests for group {} and table {}.",
81 ctx.persistent_ctx.targets.len(),
82 manifest_paths.len(),
83 group_id,
84 table_id
85 );
86 }
87
88 ctx.persistent_ctx.staging_manifest_paths = manifest_paths;
89 ctx.update_remap_manifest_elapsed(timer.elapsed());
90
91 Ok((Box::new(ApplyStagingManifest), Status::executing(true)))
92 }
93
94 fn as_any(&self) -> &dyn Any {
95 self
96 }
97}
98
99impl RemapManifest {
100 fn build_remap_manifest_instructions(
101 source_regions: &[RegionDescriptor],
102 target_regions: &[RegionDescriptor],
103 region_mapping: &HashMap<RegionId, Vec<RegionId>>,
104 central_region_id: RegionId,
105 ) -> Result<common_meta::instruction::RemapManifest> {
106 let new_partition_exprs = target_regions
107 .iter()
108 .map(|r| {
109 Ok((
110 r.region_id,
111 r.partition_expr
112 .as_json_str()
113 .context(error::SerializePartitionExprSnafu)?,
114 ))
115 })
116 .collect::<Result<HashMap<RegionId, String>>>()?;
117
118 Ok(common_meta::instruction::RemapManifest {
119 region_id: central_region_id,
120 input_regions: source_regions.iter().map(|r| r.region_id).collect(),
121 region_mapping: region_mapping.clone(),
122 new_partition_exprs,
123 })
124 }
125
126 async fn remap_manifests(
127 mailbox: &MailboxRef,
128 server_addr: &str,
129 peer: &Peer,
130 remap: &common_meta::instruction::RemapManifest,
131 timeout: Duration,
132 ) -> Result<HashMap<RegionId, String>> {
133 let ch = Channel::Datanode(peer.id);
134 let instruction = Instruction::RemapManifest(remap.clone());
135 let tracing_ctx = TracingContext::from_current_span();
136 let message = MailboxMessage::json_message(
137 &format!(
138 "Remap manifests, central region: {}, input regions: {:?}",
139 remap.region_id, remap.input_regions
140 ),
141 &format!("Metasrv@{}", server_addr),
142 &format!("Datanode-{}@{}", peer.id, peer.addr),
143 common_time::util::current_time_millis(),
144 &instruction,
145 Some(tracing_ctx.to_w3c()),
146 )
147 .with_context(|_| error::SerializeToJsonSnafu {
148 input: instruction.to_string(),
149 })?;
150 let now = Instant::now();
151 let receiver = mailbox.send(&ch, message, timeout).await;
152
153 let receiver = match receiver {
154 Ok(receiver) => receiver,
155 Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
156 reason: format!(
157 "Pusher not found for remap manifests on datanode {:?}, elapsed: {:?}",
158 peer,
159 now.elapsed()
160 ),
161 }
162 .fail()?,
163 Err(err) => {
164 return Err(err);
165 }
166 };
167
168 match receiver.await {
169 Ok(msg) => {
170 let reply = HeartbeatMailbox::json_reply(&msg)?;
171 let elapsed = now.elapsed();
172 let InstructionReply::RemapManifest(reply) = reply else {
173 return error::UnexpectedInstructionReplySnafu {
174 mailbox_message: msg.to_string(),
175 reason: "expect remap manifest reply",
176 }
177 .fail();
178 };
179 let manifest_count = reply.manifest_paths.len();
180 info!(
181 "Received remap manifest reply for central_region: {}, manifest_paths: {}, elapsed: {:?}",
182 remap.region_id, manifest_count, elapsed
183 );
184
185 Self::handle_remap_manifest_reply(remap.region_id, reply, &now, peer)
186 }
187 Err(error::Error::MailboxTimeout { .. }) => {
188 let reason = format!(
189 "Mailbox received timeout for remap manifests on datanode {:?}, elapsed: {:?}",
190 peer,
191 now.elapsed()
192 );
193 error::RetryLaterSnafu { reason }.fail()
194 }
195 Err(err) => Err(err),
196 }
197 }
198
199 fn handle_remap_manifest_reply(
200 region_id: RegionId,
201 RemapManifestReply {
202 exists,
203 manifest_paths,
204 error,
205 }: RemapManifestReply,
206 now: &Instant,
207 peer: &Peer,
208 ) -> Result<HashMap<RegionId, String>> {
209 ensure!(
210 exists,
211 error::UnexpectedSnafu {
212 violated: format!(
213 "Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
214 region_id,
215 peer,
216 now.elapsed()
217 )
218 }
219 );
220
221 if error.is_some() {
222 return error::RetryLaterSnafu {
223 reason: format!(
224 "Failed to remap manifest on datanode {:?}, error: {:?}, elapsed: {:?}",
225 peer,
226 error,
227 now.elapsed()
228 ),
229 }
230 .fail();
231 }
232
233 Ok(manifest_paths)
234 }
235}