meta_srv/procedure/repartition/group/
remap_manifest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_meta::instruction::{Instruction, InstructionReply, RemapManifestReply};
21use common_meta::peer::Peer;
22use common_procedure::{Context as ProcedureContext, Status};
23use common_telemetry::tracing_context::TracingContext;
24use common_telemetry::{info, warn};
25use serde::{Deserialize, Serialize};
26use snafu::{OptionExt, ResultExt, ensure};
27use store_api::storage::RegionId;
28
29use crate::error::{self, Result};
30use crate::handler::HeartbeatMailbox;
31use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest;
32use crate::procedure::repartition::group::{Context, State};
33use crate::procedure::repartition::plan::RegionDescriptor;
34use crate::service::mailbox::{Channel, MailboxRef};
35
36#[derive(Debug, Serialize, Deserialize)]
37pub(crate) struct RemapManifest;
38
39#[async_trait::async_trait]
40#[typetag::serde]
41impl State for RemapManifest {
42    async fn next(
43        &mut self,
44        ctx: &mut Context,
45        _procedure_ctx: &ProcedureContext,
46    ) -> Result<(Box<dyn State>, Status)> {
47        let timer = Instant::now();
48        let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
49        let remap = Self::build_remap_manifest_instructions(
50            &ctx.persistent_ctx.sources,
51            &ctx.persistent_ctx.targets,
52            &ctx.persistent_ctx.region_mapping,
53            prepare_result.central_region,
54        )?;
55        let operation_timeout =
56            ctx.next_operation_timeout()
57                .context(error::ExceededDeadlineSnafu {
58                    operation: "Remap manifests",
59                })?;
60        let manifest_paths = Self::remap_manifests(
61            &ctx.mailbox,
62            &ctx.server_addr,
63            &prepare_result.central_region_datanode,
64            &remap,
65            operation_timeout,
66        )
67        .await?;
68        let table_id = ctx.persistent_ctx.table_id;
69        let group_id = ctx.persistent_ctx.group_id;
70        let manifest_count = manifest_paths.len();
71        let input_region_count = ctx.persistent_ctx.sources.len();
72        let target_region_count = ctx.persistent_ctx.targets.len();
73        info!(
74            "Remap manifests finished for repartition, table_id: {}, group_id: {}, input_regions: {}, target_regions: {}, manifest_paths: {}",
75            table_id, group_id, input_region_count, target_region_count, manifest_count
76        );
77
78        if manifest_paths.len() != ctx.persistent_ctx.targets.len() {
79            warn!(
80                "Mismatch in manifest paths count: expected {}, got {}. This occurred during remapping manifests for group {} and table {}.",
81                ctx.persistent_ctx.targets.len(),
82                manifest_paths.len(),
83                group_id,
84                table_id
85            );
86        }
87
88        ctx.persistent_ctx.staging_manifest_paths = manifest_paths;
89        ctx.update_remap_manifest_elapsed(timer.elapsed());
90
91        Ok((Box::new(ApplyStagingManifest), Status::executing(true)))
92    }
93
94    fn as_any(&self) -> &dyn Any {
95        self
96    }
97}
98
99impl RemapManifest {
100    fn build_remap_manifest_instructions(
101        source_regions: &[RegionDescriptor],
102        target_regions: &[RegionDescriptor],
103        region_mapping: &HashMap<RegionId, Vec<RegionId>>,
104        central_region_id: RegionId,
105    ) -> Result<common_meta::instruction::RemapManifest> {
106        let new_partition_exprs = target_regions
107            .iter()
108            .map(|r| {
109                Ok((
110                    r.region_id,
111                    r.partition_expr
112                        .as_json_str()
113                        .context(error::SerializePartitionExprSnafu)?,
114                ))
115            })
116            .collect::<Result<HashMap<RegionId, String>>>()?;
117
118        Ok(common_meta::instruction::RemapManifest {
119            region_id: central_region_id,
120            input_regions: source_regions.iter().map(|r| r.region_id).collect(),
121            region_mapping: region_mapping.clone(),
122            new_partition_exprs,
123        })
124    }
125
126    async fn remap_manifests(
127        mailbox: &MailboxRef,
128        server_addr: &str,
129        peer: &Peer,
130        remap: &common_meta::instruction::RemapManifest,
131        timeout: Duration,
132    ) -> Result<HashMap<RegionId, String>> {
133        let ch = Channel::Datanode(peer.id);
134        let instruction = Instruction::RemapManifest(remap.clone());
135        let tracing_ctx = TracingContext::from_current_span();
136        let message = MailboxMessage::json_message(
137            &format!(
138                "Remap manifests, central region: {}, input regions: {:?}",
139                remap.region_id, remap.input_regions
140            ),
141            &format!("Metasrv@{}", server_addr),
142            &format!("Datanode-{}@{}", peer.id, peer.addr),
143            common_time::util::current_time_millis(),
144            &instruction,
145            Some(tracing_ctx.to_w3c()),
146        )
147        .with_context(|_| error::SerializeToJsonSnafu {
148            input: instruction.to_string(),
149        })?;
150        let now = Instant::now();
151        let receiver = mailbox.send(&ch, message, timeout).await;
152
153        let receiver = match receiver {
154            Ok(receiver) => receiver,
155            Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
156                reason: format!(
157                    "Pusher not found for remap manifests on datanode {:?}, elapsed: {:?}",
158                    peer,
159                    now.elapsed()
160                ),
161            }
162            .fail()?,
163            Err(err) => {
164                return Err(err);
165            }
166        };
167
168        match receiver.await {
169            Ok(msg) => {
170                let reply = HeartbeatMailbox::json_reply(&msg)?;
171                let elapsed = now.elapsed();
172                let InstructionReply::RemapManifest(reply) = reply else {
173                    return error::UnexpectedInstructionReplySnafu {
174                        mailbox_message: msg.to_string(),
175                        reason: "expect remap manifest reply",
176                    }
177                    .fail();
178                };
179                let manifest_count = reply.manifest_paths.len();
180                info!(
181                    "Received remap manifest reply for central_region: {}, manifest_paths: {}, elapsed: {:?}",
182                    remap.region_id, manifest_count, elapsed
183                );
184
185                Self::handle_remap_manifest_reply(remap.region_id, reply, &now, peer)
186            }
187            Err(error::Error::MailboxTimeout { .. }) => {
188                let reason = format!(
189                    "Mailbox received timeout for remap manifests on datanode {:?}, elapsed: {:?}",
190                    peer,
191                    now.elapsed()
192                );
193                error::RetryLaterSnafu { reason }.fail()
194            }
195            Err(err) => Err(err),
196        }
197    }
198
199    fn handle_remap_manifest_reply(
200        region_id: RegionId,
201        RemapManifestReply {
202            exists,
203            manifest_paths,
204            error,
205        }: RemapManifestReply,
206        now: &Instant,
207        peer: &Peer,
208    ) -> Result<HashMap<RegionId, String>> {
209        ensure!(
210            exists,
211            error::UnexpectedSnafu {
212                violated: format!(
213                    "Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
214                    region_id,
215                    peer,
216                    now.elapsed()
217                )
218            }
219        );
220
221        if error.is_some() {
222            return error::RetryLaterSnafu {
223                reason: format!(
224                    "Failed to remap manifest on datanode {:?}, error: {:?}, elapsed: {:?}",
225                    peer,
226                    error,
227                    now.elapsed()
228                ),
229            }
230            .fail();
231        }
232
233        Ok(manifest_paths)
234    }
235}