Skip to main content

meta_srv/procedure/repartition/group/
sync_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_meta::instruction::{Instruction, InstructionReply, SyncRegionReply, SyncRegionsReply};
21use common_meta::peer::Peer;
22use common_meta::rpc::router::RegionRoute;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::info;
25use common_telemetry::tracing_context::TracingContext;
26use futures::future::join_all;
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::region_engine::SyncRegionFromRequest;
30use store_api::region_request::RegionFlushReason;
31use store_api::storage::RegionId;
32
33use crate::error::{self, Error, Result};
34use crate::handler::HeartbeatMailbox;
35use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
36use crate::procedure::repartition::group::utils::{
37    HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
38};
39use crate::procedure::repartition::group::{Context, State};
40use crate::procedure::utils::ErrorStrategy;
41use crate::service::mailbox::{Channel, MailboxRef};
42
/// Value used for the `parallelism` field of every `SyncRegionFromRequest::FromRegion`
/// request built by `SyncRegion::build_sync_region_instructions`.
const DEFAULT_SYNC_REGION_PARALLELISM: usize = 3;
44
/// The state of syncing regions for a repartition group.
///
/// When executed, this state first flushes the group's central region and then asks the
/// datanodes hosting `region_routes` to sync those regions from the central region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncRegion {
    /// Routes of the regions to sync. They are split per peer with
    /// `group_region_routes_by_peer` so that each datanode receives a single instruction.
    pub region_routes: Vec<RegionRoute>,
}
50
51#[async_trait::async_trait]
52#[typetag::serde]
53impl State for SyncRegion {
54    async fn next(
55        &mut self,
56        ctx: &mut Context,
57        _procedure_ctx: &ProcedureContext,
58    ) -> Result<(Box<dyn State>, Status)> {
59        Self::flush_central_region(ctx).await?;
60        self.sync_regions(ctx).await?;
61
62        Ok((
63            Box::new(UpdateMetadata::ApplyStaging),
64            Status::executing(true),
65        ))
66    }
67
68    fn as_any(&self) -> &dyn Any {
69        self
70    }
71}
72
73impl SyncRegion {
74    async fn flush_central_region(ctx: &mut Context) -> Result<()> {
75        let operation_timeout =
76            ctx.next_operation_timeout()
77                .context(error::ExceededDeadlineSnafu {
78                    operation: "Flush central region",
79                })?;
80        let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
81
82        crate::procedure::utils::flush_region(
83            &ctx.mailbox,
84            &ctx.server_addr,
85            &[prepare_result.central_region],
86            &prepare_result.central_region_datanode,
87            operation_timeout,
88            ErrorStrategy::Retry,
89            Some(RegionFlushReason::Repartition),
90        )
91        .await
92    }
93
94    /// Builds instructions to sync regions on datanodes.
95    fn build_sync_region_instructions(
96        central_region: RegionId,
97        region_routes: &[RegionRoute],
98    ) -> HashMap<Peer, Vec<common_meta::instruction::SyncRegion>> {
99        let target_region_routes_by_peer = group_region_routes_by_peer(region_routes);
100        let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len());
101
102        for (peer, region_ids) in target_region_routes_by_peer {
103            let sync_regions = region_ids
104                .into_iter()
105                .map(|region_id| {
106                    let request = SyncRegionFromRequest::FromRegion {
107                        source_region_id: central_region,
108                        parallelism: DEFAULT_SYNC_REGION_PARALLELISM,
109                    };
110                    common_meta::instruction::SyncRegion { region_id, request }
111                })
112                .collect();
113            instructions.insert((*peer).clone(), sync_regions);
114        }
115
116        instructions
117    }
118
119    /// Syncs regions on datanodes.
120    async fn sync_regions(&self, ctx: &mut Context) -> Result<()> {
121        let table_id = ctx.persistent_ctx.table_id;
122        let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
123        let instructions = Self::build_sync_region_instructions(
124            prepare_result.central_region,
125            &self.region_routes,
126        );
127        let operation_timeout =
128            ctx.next_operation_timeout()
129                .context(error::ExceededDeadlineSnafu {
130                    operation: "Sync regions",
131                })?;
132
133        let (peers, tasks): (Vec<_>, Vec<_>) = instructions
134            .iter()
135            .map(|(peer, sync_regions)| {
136                (
137                    peer,
138                    Self::sync_region(
139                        &ctx.mailbox,
140                        &ctx.server_addr,
141                        peer,
142                        sync_regions,
143                        operation_timeout,
144                    ),
145                )
146            })
147            .unzip();
148
149        info!(
150            "Sent sync regions instructions to peers: {:?} for repartition table {}",
151            peers, table_id
152        );
153
154        let format_err_msg = |idx: usize, error: &Error| {
155            let peer = peers[idx];
156            format!(
157                "Failed to sync regions on datanode {:?}, error: {:?}",
158                peer, error
159            )
160        };
161
162        let results = join_all(tasks).await;
163        let result = handle_multiple_results(&results);
164
165        match result {
166            HandleMultipleResult::AllSuccessful => Ok(()),
167            HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu {
168                reason: format!(
169                    "All retryable errors during syncing regions for repartition table {}: {:?}",
170                    table_id,
171                    retryable_errors
172                        .iter()
173                        .map(|(idx, error)| format_err_msg(*idx, error))
174                        .collect::<Vec<_>>()
175                        .join(",")
176                ),
177            }
178            .fail(),
179            HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu {
180                violated: format!(
181                    "All non retryable errors during syncing regions for repartition table {}: {:?}",
182                    table_id,
183                    non_retryable_errors
184                        .iter()
185                        .map(|(idx, error)| format_err_msg(*idx, error))
186                        .collect::<Vec<_>>()
187                        .join(",")
188                ),
189            }
190            .fail(),
191            HandleMultipleResult::PartialRetryable {
192                retryable_errors,
193                non_retryable_errors,
194            } => error::UnexpectedSnafu {
195                violated: format!(
196                    "Partial retryable errors during syncing regions for repartition table {}: {:?}, non retryable errors: {:?}",
197                    table_id,
198                    retryable_errors
199                        .iter()
200                        .map(|(idx, error)| format_err_msg(*idx, error))
201                        .collect::<Vec<_>>()
202                        .join(","),
203                    non_retryable_errors
204                        .iter()
205                        .map(|(idx, error)| format_err_msg(*idx, error))
206                        .collect::<Vec<_>>()
207                        .join(","),
208                ),
209            }
210            .fail(),
211        }
212    }
213
214    /// Syncs regions on a datanode.
215    async fn sync_region(
216        mailbox: &MailboxRef,
217        server_addr: &str,
218        peer: &Peer,
219        sync_regions: &[common_meta::instruction::SyncRegion],
220        timeout: Duration,
221    ) -> Result<()> {
222        let ch = Channel::Datanode(peer.id);
223        let instruction = Instruction::SyncRegions(sync_regions.to_vec());
224        let tracing_ctx = TracingContext::from_current_span();
225        let message = MailboxMessage::json_message(
226            &format!(
227                "Sync regions: {:?}",
228                sync_regions.iter().map(|r| r.region_id).collect::<Vec<_>>()
229            ),
230            &format!("Metasrv@{}", server_addr),
231            &format!("Datanode-{}@{}", peer.id, peer.addr),
232            common_time::util::current_time_millis(),
233            &instruction,
234            Some(tracing_ctx.to_w3c()),
235        )
236        .with_context(|_| error::SerializeToJsonSnafu {
237            input: instruction.to_string(),
238        })?;
239
240        let now = std::time::Instant::now();
241        let receiver = mailbox.send(&ch, message, timeout).await;
242
243        let receiver = match receiver {
244            Ok(receiver) => receiver,
245            Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
246                reason: format!(
247                    "Pusher not found for sync regions on datanode {:?}, elapsed: {:?}",
248                    peer,
249                    now.elapsed()
250                ),
251            }
252            .fail()?,
253            Err(err) => {
254                return Err(err);
255            }
256        };
257
258        match receiver.await {
259            Ok(msg) => {
260                let reply = HeartbeatMailbox::json_reply(&msg)?;
261                info!(
262                    "Received sync regions reply: {:?}, elapsed: {:?}",
263                    reply,
264                    now.elapsed()
265                );
266                let InstructionReply::SyncRegions(SyncRegionsReply { replies }) = reply else {
267                    return error::UnexpectedInstructionReplySnafu {
268                        mailbox_message: msg.to_string(),
269                        reason: "expect sync regions reply",
270                    }
271                    .fail();
272                };
273                for reply in replies {
274                    Self::handle_sync_region_reply(&reply, &now, peer)?;
275                }
276                Ok(())
277            }
278            Err(error::Error::MailboxChannelClosed { .. }) => error::RetryLaterSnafu {
279                reason: format!(
280                    "Mailbox closed when sending sync region to datanode {:?}, elapsed: {:?}",
281                    peer,
282                    now.elapsed()
283                ),
284            }
285            .fail()?,
286            Err(error::Error::MailboxTimeout { .. }) => {
287                let reason = format!(
288                    "Mailbox received timeout for sync regions on datanode {:?}, elapsed: {:?}",
289                    peer,
290                    now.elapsed()
291                );
292                error::RetryLaterSnafu { reason }.fail()
293            }
294            Err(err) => Err(err),
295        }
296    }
297
298    fn handle_sync_region_reply(
299        SyncRegionReply {
300            region_id,
301            ready,
302            exists,
303            error,
304        }: &SyncRegionReply,
305        now: &Instant,
306        peer: &Peer,
307    ) -> Result<()> {
308        ensure!(
309            exists,
310            error::UnexpectedSnafu {
311                violated: format!(
312                    "Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
313                    region_id,
314                    peer,
315                    now.elapsed()
316                )
317            }
318        );
319
320        if let Some(error) = error {
321            return error::RetryLaterSnafu {
322                reason: format!(
323                    "Failed to sync region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
324                    region_id,
325                    peer,
326                    error,
327                    now.elapsed()
328                ),
329            }
330            .fail();
331        }
332
333        ensure!(
334            ready,
335            error::RetryLaterSnafu {
336                reason: format!(
337                    "Region {} failed to sync on datanode {:?}, elapsed: {:?}",
338                    region_id,
339                    peer,
340                    now.elapsed()
341                ),
342            }
343        );
344
345        Ok(())
346    }
347}
348
#[cfg(test)]
mod tests {
    use std::assert_matches;

    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::region_engine::SyncRegionFromRequest;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::repartition::group::GroupPrepareResult;
    use crate::procedure::repartition::group::sync_region::SyncRegion;
    use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};
    use crate::procedure::test_util::{new_sync_region_reply, send_mock_reply};
    use crate::service::mailbox::Channel;

    /// Builds the fixture shared by all tests: a single route for region 3 of `table_id`
    /// whose leader is datanode 1.
    fn test_region_routes(table_id: u32) -> Vec<RegionRoute> {
        let route = RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 3),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        };
        vec![route]
    }

    /// Builds a prepare result whose central region is region 1 of `table_id` on datanode 1.
    fn test_prepare_result(table_id: u32) -> GroupPrepareResult {
        GroupPrepareResult {
            source_routes: vec![],
            target_routes: vec![],
            central_region: RegionId::new(table_id, 1),
            central_region_datanode: Peer::empty(1),
        }
    }

    /// One route on one peer must produce one instruction that syncs from the central region.
    #[test]
    fn test_build_sync_region_instructions() {
        let table_id = 1024;
        let central_region = RegionId::new(table_id, 1);
        let region_routes = test_region_routes(table_id);

        let instructions =
            SyncRegion::build_sync_region_instructions(central_region, &region_routes);
        assert_eq!(instructions.len(), 1);

        let peer_instructions = instructions.get(&Peer::empty(1)).unwrap();
        assert_eq!(peer_instructions.len(), 1);

        let instruction = &peer_instructions[0];
        assert_eq!(instruction.region_id, RegionId::new(table_id, 3));
        let SyncRegionFromRequest::FromRegion {
            source_region_id, ..
        } = &instruction.request
        else {
            panic!("expect from region request");
        };
        assert_eq!(*source_region_id, central_region);
    }

    /// A ready, existing, error-free reply from the datanode makes `sync_regions` succeed.
    #[tokio::test]
    async fn test_sync_regions_all_successful() {
        let mut env = TestingEnv::new();
        let table_id = 1024;
        let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
        persistent_context.group_prepare_result = Some(test_prepare_result(table_id));

        // Register datanode 1 in the mailbox and answer its instruction with a mock reply.
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        env.mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
            .await;
        send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
            Ok(new_sync_region_reply(
                id,
                RegionId::new(1024, 3),
                true,
                true,
                None,
            ))
        });

        let mut ctx = env.create_context(persistent_context);
        let sync_region = SyncRegion {
            region_routes: test_region_routes(table_id),
        };

        sync_region.sync_regions(&mut ctx).await.unwrap();
    }

    /// Without a registered receiver for datanode 1, `sync_regions` fails with `RetryLater`.
    #[tokio::test]
    async fn test_sync_regions_retryable() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
        persistent_context.group_prepare_result = Some(test_prepare_result(table_id));

        let mut ctx = env.create_context(persistent_context);
        let sync_region = SyncRegion {
            region_routes: test_region_routes(table_id),
        };

        let err = sync_region.sync_regions(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::RetryLater { .. });
    }
}