Skip to main content

meta_srv/procedure/repartition/group/remap_manifest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_meta::instruction::{Instruction, InstructionReply, RemapManifestReply};
21use common_meta::peer::Peer;
22use common_procedure::{Context as ProcedureContext, Status};
23use common_telemetry::tracing_context::TracingContext;
24use common_telemetry::{info, warn};
25use serde::{Deserialize, Serialize};
26use snafu::{OptionExt, ResultExt, ensure};
27use store_api::storage::RegionId;
28
29use crate::error::{self, Result};
30use crate::handler::HeartbeatMailbox;
31use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest;
32use crate::procedure::repartition::group::{Context, State};
33use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
34use crate::procedure::utils::instruction_error_result;
35use crate::service::mailbox::{Channel, MailboxRef};
36
37#[derive(Debug, Serialize, Deserialize)]
38pub(crate) struct RemapManifest;
39
40#[async_trait::async_trait]
41#[typetag::serde]
42impl State for RemapManifest {
43    async fn next(
44        &mut self,
45        ctx: &mut Context,
46        _procedure_ctx: &ProcedureContext,
47    ) -> Result<(Box<dyn State>, Status)> {
48        let timer = Instant::now();
49        let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
50        let remap = Self::build_remap_manifest_instructions(
51            &ctx.persistent_ctx.sources,
52            &ctx.persistent_ctx.targets,
53            &ctx.persistent_ctx.region_mapping,
54            prepare_result.central_region,
55        )?;
56        let operation_timeout =
57            ctx.next_operation_timeout()
58                .context(error::ExceededDeadlineSnafu {
59                    operation: "Remap manifests",
60                })?;
61        let manifest_paths = Self::remap_manifests(
62            &ctx.mailbox,
63            &ctx.server_addr,
64            &prepare_result.central_region_datanode,
65            &remap,
66            operation_timeout,
67        )
68        .await?;
69        let table_id = ctx.persistent_ctx.table_id;
70        let group_id = ctx.persistent_ctx.group_id;
71        let manifest_count = manifest_paths.len();
72        let input_region_count = ctx.persistent_ctx.sources.len();
73        let target_region_count = ctx.persistent_ctx.targets.len();
74        info!(
75            "Remap manifests finished for repartition, table_id: {}, group_id: {}, input_regions: {}, target_regions: {}, manifest_paths: {}",
76            table_id, group_id, input_region_count, target_region_count, manifest_count
77        );
78
79        if manifest_paths.len() != ctx.persistent_ctx.targets.len() {
80            warn!(
81                "Mismatch in manifest paths count: expected {}, got {}. This occurred during remapping manifests for group {} and table {}.",
82                ctx.persistent_ctx.targets.len(),
83                manifest_paths.len(),
84                group_id,
85                table_id
86            );
87        }
88
89        ctx.persistent_ctx.staging_manifest_paths = manifest_paths;
90        ctx.update_remap_manifest_elapsed(timer.elapsed());
91
92        Ok((Box::new(ApplyStagingManifest), Status::executing(true)))
93    }
94
95    fn as_any(&self) -> &dyn Any {
96        self
97    }
98}
99
100impl RemapManifest {
101    fn build_remap_manifest_instructions(
102        source_regions: &[SourceRegionDescriptor],
103        target_regions: &[TargetRegionDescriptor],
104        region_mapping: &HashMap<RegionId, Vec<RegionId>>,
105        central_region_id: RegionId,
106    ) -> Result<common_meta::instruction::RemapManifest> {
107        let new_partition_exprs = target_regions
108            .iter()
109            .map(|r| {
110                Ok((
111                    r.region_id,
112                    r.partition_expr
113                        .as_json_str()
114                        .context(error::SerializePartitionExprSnafu)?,
115                ))
116            })
117            .collect::<Result<HashMap<RegionId, String>>>()?;
118
119        Ok(common_meta::instruction::RemapManifest {
120            region_id: central_region_id,
121            input_regions: source_regions.iter().map(|r| r.region_id()).collect(),
122            region_mapping: region_mapping.clone(),
123            new_partition_exprs,
124        })
125    }
126
127    async fn remap_manifests(
128        mailbox: &MailboxRef,
129        server_addr: &str,
130        peer: &Peer,
131        remap: &common_meta::instruction::RemapManifest,
132        timeout: Duration,
133    ) -> Result<HashMap<RegionId, String>> {
134        let ch = Channel::Datanode(peer.id);
135        let instruction = Instruction::RemapManifest(remap.clone());
136        let tracing_ctx = TracingContext::from_current_span();
137        let message = MailboxMessage::json_message(
138            &format!(
139                "Remap manifests, central region: {}, input regions: {:?}",
140                remap.region_id, remap.input_regions
141            ),
142            &format!("Metasrv@{}", server_addr),
143            &format!("Datanode-{}@{}", peer.id, peer.addr),
144            common_time::util::current_time_millis(),
145            &instruction,
146            Some(tracing_ctx.to_w3c()),
147        )
148        .with_context(|_| error::SerializeToJsonSnafu {
149            input: instruction.to_string(),
150        })?;
151        let now = Instant::now();
152        let receiver = mailbox.send(&ch, message, timeout).await;
153
154        let receiver = match receiver {
155            Ok(receiver) => receiver,
156            Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
157                reason: format!(
158                    "Pusher not found for remap manifests on datanode {:?}, elapsed: {:?}",
159                    peer,
160                    now.elapsed()
161                ),
162            }
163            .fail()?,
164            Err(err) => {
165                return Err(err);
166            }
167        };
168
169        match receiver.await {
170            Ok(msg) => {
171                let reply = HeartbeatMailbox::json_reply(&msg)?;
172                let elapsed = now.elapsed();
173                let InstructionReply::RemapManifest(reply) = reply else {
174                    return error::UnexpectedInstructionReplySnafu {
175                        mailbox_message: msg.to_string(),
176                        reason: "expect remap manifest reply",
177                    }
178                    .fail();
179                };
180                let manifest_count = reply.manifest_paths.len();
181                info!(
182                    "Received remap manifest reply for central_region: {}, manifest_paths: {}, elapsed: {:?}",
183                    remap.region_id, manifest_count, elapsed
184                );
185
186                Self::handle_remap_manifest_reply(remap.region_id, reply, &now, peer)
187            }
188            Err(error::Error::MailboxChannelClosed { .. }) => error::RetryLaterSnafu {
189                reason: format!(
190                    "Mailbox closed when sending remap manifests to datanode {:?}, elapsed: {:?}",
191                    peer,
192                    now.elapsed()
193                ),
194            }
195            .fail()?,
196            Err(error::Error::MailboxTimeout { .. }) => {
197                let reason = format!(
198                    "Mailbox received timeout for remap manifests on datanode {:?}, elapsed: {:?}",
199                    peer,
200                    now.elapsed()
201                );
202                error::RetryLaterSnafu { reason }.fail()
203            }
204            Err(err) => Err(err),
205        }
206    }
207
208    fn handle_remap_manifest_reply(
209        region_id: RegionId,
210        RemapManifestReply {
211            exists,
212            manifest_paths,
213            error,
214        }: RemapManifestReply,
215        now: &Instant,
216        peer: &Peer,
217    ) -> Result<HashMap<RegionId, String>> {
218        ensure!(
219            exists,
220            error::UnexpectedSnafu {
221                violated: format!(
222                    "Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
223                    region_id,
224                    peer,
225                    now.elapsed()
226                )
227            }
228        );
229
230        if let Some(error) = error {
231            return instruction_error_result(
232                &error,
233                format!(
234                    "Failed to remap manifest on datanode {:?}, error: {:?}, elapsed: {:?}",
235                    peer,
236                    error,
237                    now.elapsed()
238                ),
239            );
240        }
241
242        Ok(manifest_paths)
243    }
244}