meta_srv/procedure/repartition/group/remap_manifest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_meta::instruction::{Instruction, InstructionReply, RemapManifestReply};
21use common_meta::peer::Peer;
22use common_procedure::{Context as ProcedureContext, Status};
23use common_telemetry::{info, warn};
24use serde::{Deserialize, Serialize};
25use snafu::{OptionExt, ResultExt, ensure};
26use store_api::storage::RegionId;
27
28use crate::error::{self, Result};
29use crate::handler::HeartbeatMailbox;
30use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest;
31use crate::procedure::repartition::group::{Context, State};
32use crate::procedure::repartition::plan::RegionDescriptor;
33use crate::service::mailbox::{Channel, MailboxRef};
34
35#[derive(Debug, Serialize, Deserialize)]
36pub(crate) struct RemapManifest;
37
38#[async_trait::async_trait]
39#[typetag::serde]
40impl State for RemapManifest {
41    async fn next(
42        &mut self,
43        ctx: &mut Context,
44        _procedure_ctx: &ProcedureContext,
45    ) -> Result<(Box<dyn State>, Status)> {
46        let timer = Instant::now();
47        let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
48        let remap = Self::build_remap_manifest_instructions(
49            &ctx.persistent_ctx.sources,
50            &ctx.persistent_ctx.targets,
51            &ctx.persistent_ctx.region_mapping,
52            prepare_result.central_region,
53        )?;
54        let operation_timeout =
55            ctx.next_operation_timeout()
56                .context(error::ExceededDeadlineSnafu {
57                    operation: "Remap manifests",
58                })?;
59        let manifest_paths = Self::remap_manifests(
60            &ctx.mailbox,
61            &ctx.server_addr,
62            &prepare_result.central_region_datanode,
63            &remap,
64            operation_timeout,
65        )
66        .await?;
67        let table_id = ctx.persistent_ctx.table_id;
68        let group_id = ctx.persistent_ctx.group_id;
69        let manifest_count = manifest_paths.len();
70        let input_region_count = ctx.persistent_ctx.sources.len();
71        let target_region_count = ctx.persistent_ctx.targets.len();
72        info!(
73            "Remap manifests finished for repartition, table_id: {}, group_id: {}, input_regions: {}, target_regions: {}, manifest_paths: {}",
74            table_id, group_id, input_region_count, target_region_count, manifest_count
75        );
76
77        if manifest_paths.len() != ctx.persistent_ctx.targets.len() {
78            warn!(
79                "Mismatch in manifest paths count: expected {}, got {}. This occurred during remapping manifests for group {} and table {}.",
80                ctx.persistent_ctx.targets.len(),
81                manifest_paths.len(),
82                group_id,
83                table_id
84            );
85        }
86
87        ctx.persistent_ctx.staging_manifest_paths = manifest_paths;
88        ctx.update_remap_manifest_elapsed(timer.elapsed());
89
90        Ok((Box::new(ApplyStagingManifest), Status::executing(true)))
91    }
92
93    fn as_any(&self) -> &dyn Any {
94        self
95    }
96}
97
98impl RemapManifest {
99    fn build_remap_manifest_instructions(
100        source_regions: &[RegionDescriptor],
101        target_regions: &[RegionDescriptor],
102        region_mapping: &HashMap<RegionId, Vec<RegionId>>,
103        central_region_id: RegionId,
104    ) -> Result<common_meta::instruction::RemapManifest> {
105        let new_partition_exprs = target_regions
106            .iter()
107            .map(|r| {
108                Ok((
109                    r.region_id,
110                    r.partition_expr
111                        .as_json_str()
112                        .context(error::SerializePartitionExprSnafu)?,
113                ))
114            })
115            .collect::<Result<HashMap<RegionId, String>>>()?;
116
117        Ok(common_meta::instruction::RemapManifest {
118            region_id: central_region_id,
119            input_regions: source_regions.iter().map(|r| r.region_id).collect(),
120            region_mapping: region_mapping.clone(),
121            new_partition_exprs,
122        })
123    }
124
125    async fn remap_manifests(
126        mailbox: &MailboxRef,
127        server_addr: &str,
128        peer: &Peer,
129        remap: &common_meta::instruction::RemapManifest,
130        timeout: Duration,
131    ) -> Result<HashMap<RegionId, String>> {
132        let ch = Channel::Datanode(peer.id);
133        let instruction = Instruction::RemapManifest(remap.clone());
134        let message = MailboxMessage::json_message(
135            &format!(
136                "Remap manifests, central region: {}, input regions: {:?}",
137                remap.region_id, remap.input_regions
138            ),
139            &format!("Metasrv@{}", server_addr),
140            &format!("Datanode-{}@{}", peer.id, peer.addr),
141            common_time::util::current_time_millis(),
142            &instruction,
143        )
144        .with_context(|_| error::SerializeToJsonSnafu {
145            input: instruction.to_string(),
146        })?;
147        let now = Instant::now();
148        let receiver = mailbox.send(&ch, message, timeout).await;
149
150        let receiver = match receiver {
151            Ok(receiver) => receiver,
152            Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
153                reason: format!(
154                    "Pusher not found for remap manifests on datanode {:?}, elapsed: {:?}",
155                    peer,
156                    now.elapsed()
157                ),
158            }
159            .fail()?,
160            Err(err) => {
161                return Err(err);
162            }
163        };
164
165        match receiver.await {
166            Ok(msg) => {
167                let reply = HeartbeatMailbox::json_reply(&msg)?;
168                let elapsed = now.elapsed();
169                let InstructionReply::RemapManifest(reply) = reply else {
170                    return error::UnexpectedInstructionReplySnafu {
171                        mailbox_message: msg.to_string(),
172                        reason: "expect remap manifest reply",
173                    }
174                    .fail();
175                };
176                let manifest_count = reply.manifest_paths.len();
177                info!(
178                    "Received remap manifest reply for central_region: {}, manifest_paths: {}, elapsed: {:?}",
179                    remap.region_id, manifest_count, elapsed
180                );
181
182                Self::handle_remap_manifest_reply(remap.region_id, reply, &now, peer)
183            }
184            Err(error::Error::MailboxTimeout { .. }) => {
185                let reason = format!(
186                    "Mailbox received timeout for remap manifests on datanode {:?}, elapsed: {:?}",
187                    peer,
188                    now.elapsed()
189                );
190                error::RetryLaterSnafu { reason }.fail()
191            }
192            Err(err) => Err(err),
193        }
194    }
195
196    fn handle_remap_manifest_reply(
197        region_id: RegionId,
198        RemapManifestReply {
199            exists,
200            manifest_paths,
201            error,
202        }: RemapManifestReply,
203        now: &Instant,
204        peer: &Peer,
205    ) -> Result<HashMap<RegionId, String>> {
206        ensure!(
207            exists,
208            error::UnexpectedSnafu {
209                violated: format!(
210                    "Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
211                    region_id,
212                    peer,
213                    now.elapsed()
214                )
215            }
216        );
217
218        if error.is_some() {
219            return error::RetryLaterSnafu {
220                reason: format!(
221                    "Failed to remap manifest on datanode {:?}, error: {:?}, elapsed: {:?}",
222                    peer,
223                    error,
224                    now.elapsed()
225                ),
226            }
227            .fail();
228        }
229
230        Ok(manifest_paths)
231    }
232}