meta_srv/procedure/repartition/group/
sync_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_meta::instruction::{Instruction, InstructionReply, SyncRegionReply, SyncRegionsReply};
21use common_meta::peer::Peer;
22use common_meta::rpc::router::RegionRoute;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::info;
25use futures::future::join_all;
26use serde::{Deserialize, Serialize};
27use snafu::{OptionExt, ResultExt, ensure};
28use store_api::region_engine::SyncRegionFromRequest;
29use store_api::storage::RegionId;
30
31use crate::error::{self, Error, Result};
32use crate::handler::HeartbeatMailbox;
33use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
34use crate::procedure::repartition::group::utils::{
35    HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
36};
37use crate::procedure::repartition::group::{Context, State};
38use crate::procedure::utils::ErrorStrategy;
39use crate::service::mailbox::{Channel, MailboxRef};
40
/// Value placed in the `parallelism` field of every
/// `SyncRegionFromRequest::FromRegion` request built by
/// `SyncRegion::build_sync_region_instructions`.
// NOTE(review): presumably the number of concurrent sync tasks a datanode runs
// per synced region; confirm against `SyncRegionFromRequest`.
const DEFAULT_SYNC_REGION_PARALLELISM: usize = 3;
42
/// The state of syncing regions for a repartition group.
///
/// Running this state flushes the group's central region, then instructs the
/// datanodes to sync every region in `region_routes` from that central region,
/// and finally hands over to `UpdateMetadata::ApplyStaging`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncRegion {
    /// Routes of the regions to sync. Each region gets a sync instruction whose
    /// source is the group's central region, grouped by peer via
    /// `group_region_routes_by_peer`.
    pub region_routes: Vec<RegionRoute>,
}
48
49#[async_trait::async_trait]
50#[typetag::serde]
51impl State for SyncRegion {
52    async fn next(
53        &mut self,
54        ctx: &mut Context,
55        _procedure_ctx: &ProcedureContext,
56    ) -> Result<(Box<dyn State>, Status)> {
57        Self::flush_central_region(ctx).await?;
58        self.sync_regions(ctx).await?;
59
60        Ok((
61            Box::new(UpdateMetadata::ApplyStaging),
62            Status::executing(true),
63        ))
64    }
65
66    fn as_any(&self) -> &dyn Any {
67        self
68    }
69}
70
71impl SyncRegion {
72    async fn flush_central_region(ctx: &mut Context) -> Result<()> {
73        let operation_timeout =
74            ctx.next_operation_timeout()
75                .context(error::ExceededDeadlineSnafu {
76                    operation: "Flush central region",
77                })?;
78        let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
79
80        crate::procedure::utils::flush_region(
81            &ctx.mailbox,
82            &ctx.server_addr,
83            &[prepare_result.central_region],
84            &prepare_result.central_region_datanode,
85            operation_timeout,
86            ErrorStrategy::Retry,
87        )
88        .await
89    }
90
91    /// Builds instructions to sync regions on datanodes.
92    fn build_sync_region_instructions(
93        central_region: RegionId,
94        region_routes: &[RegionRoute],
95    ) -> HashMap<Peer, Vec<common_meta::instruction::SyncRegion>> {
96        let target_region_routes_by_peer = group_region_routes_by_peer(region_routes);
97        let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len());
98
99        for (peer, region_ids) in target_region_routes_by_peer {
100            let sync_regions = region_ids
101                .into_iter()
102                .map(|region_id| {
103                    let request = SyncRegionFromRequest::FromRegion {
104                        source_region_id: central_region,
105                        parallelism: DEFAULT_SYNC_REGION_PARALLELISM,
106                    };
107                    common_meta::instruction::SyncRegion { region_id, request }
108                })
109                .collect();
110            instructions.insert((*peer).clone(), sync_regions);
111        }
112
113        instructions
114    }
115
116    /// Syncs regions on datanodes.
117    async fn sync_regions(&self, ctx: &mut Context) -> Result<()> {
118        let table_id = ctx.persistent_ctx.table_id;
119        let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
120        let instructions = Self::build_sync_region_instructions(
121            prepare_result.central_region,
122            &self.region_routes,
123        );
124        let operation_timeout =
125            ctx.next_operation_timeout()
126                .context(error::ExceededDeadlineSnafu {
127                    operation: "Sync regions",
128                })?;
129
130        let (peers, tasks): (Vec<_>, Vec<_>) = instructions
131            .iter()
132            .map(|(peer, sync_regions)| {
133                (
134                    peer,
135                    Self::sync_region(
136                        &ctx.mailbox,
137                        &ctx.server_addr,
138                        peer,
139                        sync_regions,
140                        operation_timeout,
141                    ),
142                )
143            })
144            .unzip();
145
146        info!(
147            "Sent sync regions instructions to peers: {:?} for repartition table {}",
148            peers, table_id
149        );
150
151        let format_err_msg = |idx: usize, error: &Error| {
152            let peer = peers[idx];
153            format!(
154                "Failed to sync regions on datanode {:?}, error: {:?}",
155                peer, error
156            )
157        };
158
159        let results = join_all(tasks).await;
160        let result = handle_multiple_results(&results);
161
162        match result {
163            HandleMultipleResult::AllSuccessful => Ok(()),
164            HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu {
165                reason: format!(
166                    "All retryable errors during syncing regions for repartition table {}: {:?}",
167                    table_id,
168                    retryable_errors
169                        .iter()
170                        .map(|(idx, error)| format_err_msg(*idx, error))
171                        .collect::<Vec<_>>()
172                        .join(",")
173                ),
174            }
175            .fail(),
176            HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu {
177                violated: format!(
178                    "All non retryable errors during syncing regions for repartition table {}: {:?}",
179                    table_id,
180                    non_retryable_errors
181                        .iter()
182                        .map(|(idx, error)| format_err_msg(*idx, error))
183                        .collect::<Vec<_>>()
184                        .join(",")
185                ),
186            }
187            .fail(),
188            HandleMultipleResult::PartialRetryable {
189                retryable_errors,
190                non_retryable_errors,
191            } => error::UnexpectedSnafu {
192                violated: format!(
193                    "Partial retryable errors during syncing regions for repartition table {}: {:?}, non retryable errors: {:?}",
194                    table_id,
195                    retryable_errors
196                        .iter()
197                        .map(|(idx, error)| format_err_msg(*idx, error))
198                        .collect::<Vec<_>>()
199                        .join(","),
200                    non_retryable_errors
201                        .iter()
202                        .map(|(idx, error)| format_err_msg(*idx, error))
203                        .collect::<Vec<_>>()
204                        .join(","),
205                ),
206            }
207            .fail(),
208        }
209    }
210
211    /// Syncs regions on a datanode.
212    async fn sync_region(
213        mailbox: &MailboxRef,
214        server_addr: &str,
215        peer: &Peer,
216        sync_regions: &[common_meta::instruction::SyncRegion],
217        timeout: Duration,
218    ) -> Result<()> {
219        let ch = Channel::Datanode(peer.id);
220        let instruction = Instruction::SyncRegions(sync_regions.to_vec());
221        let message = MailboxMessage::json_message(
222            &format!(
223                "Sync regions: {:?}",
224                sync_regions.iter().map(|r| r.region_id).collect::<Vec<_>>()
225            ),
226            &format!("Metasrv@{}", server_addr),
227            &format!("Datanode-{}@{}", peer.id, peer.addr),
228            common_time::util::current_time_millis(),
229            &instruction,
230        )
231        .with_context(|_| error::SerializeToJsonSnafu {
232            input: instruction.to_string(),
233        })?;
234
235        let now = std::time::Instant::now();
236        let receiver = mailbox.send(&ch, message, timeout).await;
237
238        let receiver = match receiver {
239            Ok(receiver) => receiver,
240            Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
241                reason: format!(
242                    "Pusher not found for sync regions on datanode {:?}, elapsed: {:?}",
243                    peer,
244                    now.elapsed()
245                ),
246            }
247            .fail()?,
248            Err(err) => {
249                return Err(err);
250            }
251        };
252
253        match receiver.await {
254            Ok(msg) => {
255                let reply = HeartbeatMailbox::json_reply(&msg)?;
256                info!(
257                    "Received sync regions reply: {:?}, elapsed: {:?}",
258                    reply,
259                    now.elapsed()
260                );
261                let InstructionReply::SyncRegions(SyncRegionsReply { replies }) = reply else {
262                    return error::UnexpectedInstructionReplySnafu {
263                        mailbox_message: msg.to_string(),
264                        reason: "expect sync regions reply",
265                    }
266                    .fail();
267                };
268                for reply in replies {
269                    Self::handle_sync_region_reply(&reply, &now, peer)?;
270                }
271                Ok(())
272            }
273            Err(error::Error::MailboxTimeout { .. }) => {
274                let reason = format!(
275                    "Mailbox received timeout for sync regions on datanode {:?}, elapsed: {:?}",
276                    peer,
277                    now.elapsed()
278                );
279                error::RetryLaterSnafu { reason }.fail()
280            }
281            Err(err) => Err(err),
282        }
283    }
284
285    fn handle_sync_region_reply(
286        SyncRegionReply {
287            region_id,
288            ready,
289            exists,
290            error,
291        }: &SyncRegionReply,
292        now: &Instant,
293        peer: &Peer,
294    ) -> Result<()> {
295        ensure!(
296            exists,
297            error::UnexpectedSnafu {
298                violated: format!(
299                    "Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
300                    region_id,
301                    peer,
302                    now.elapsed()
303                )
304            }
305        );
306
307        if let Some(error) = error {
308            return error::RetryLaterSnafu {
309                reason: format!(
310                    "Failed to sync region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
311                    region_id,
312                    peer,
313                    error,
314                    now.elapsed()
315                ),
316            }
317            .fail();
318        }
319
320        ensure!(
321            ready,
322            error::RetryLaterSnafu {
323                reason: format!(
324                    "Region {} failed to sync on datanode {:?}, elapsed: {:?}",
325                    region_id,
326                    peer,
327                    now.elapsed()
328                ),
329            }
330        );
331
332        Ok(())
333    }
334}
335
336#[cfg(test)]
337mod tests {
338    use std::assert_matches::assert_matches;
339
340    use common_meta::peer::Peer;
341    use common_meta::rpc::router::{Region, RegionRoute};
342    use store_api::region_engine::SyncRegionFromRequest;
343    use store_api::storage::RegionId;
344
345    use crate::error::Error;
346    use crate::procedure::repartition::group::GroupPrepareResult;
347    use crate::procedure::repartition::group::sync_region::SyncRegion;
348    use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};
349    use crate::procedure::test_util::{new_sync_region_reply, send_mock_reply};
350    use crate::service::mailbox::Channel;
351
352    #[test]
353    fn test_build_sync_region_instructions() {
354        let table_id = 1024;
355        let central_region = RegionId::new(table_id, 1);
356        let region_routes = vec![RegionRoute {
357            region: Region {
358                id: RegionId::new(table_id, 3),
359                ..Default::default()
360            },
361            leader_peer: Some(Peer::empty(1)),
362            ..Default::default()
363        }];
364
365        let instructions =
366            SyncRegion::build_sync_region_instructions(central_region, &region_routes);
367        assert_eq!(instructions.len(), 1);
368        let peer_instructions = instructions.get(&Peer::empty(1)).unwrap();
369        assert_eq!(peer_instructions.len(), 1);
370        assert_eq!(peer_instructions[0].region_id, RegionId::new(table_id, 3));
371        let SyncRegionFromRequest::FromRegion {
372            source_region_id, ..
373        } = &peer_instructions[0].request
374        else {
375            panic!("expect from region request");
376        };
377        assert_eq!(*source_region_id, central_region);
378    }
379
380    fn test_prepare_result(table_id: u32) -> GroupPrepareResult {
381        GroupPrepareResult {
382            source_routes: vec![],
383            target_routes: vec![],
384            central_region: RegionId::new(table_id, 1),
385            central_region_datanode: Peer::empty(1),
386        }
387    }
388
389    #[tokio::test]
390    async fn test_sync_regions_all_successful() {
391        let mut env = TestingEnv::new();
392        let table_id = 1024;
393        let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
394        persistent_context.group_prepare_result = Some(test_prepare_result(table_id));
395
396        let (tx, rx) = tokio::sync::mpsc::channel(1);
397        env.mailbox_ctx
398            .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
399            .await;
400        send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
401            Ok(new_sync_region_reply(
402                id,
403                RegionId::new(1024, 3),
404                true,
405                true,
406                None,
407            ))
408        });
409
410        let mut ctx = env.create_context(persistent_context);
411        let region_routes = vec![RegionRoute {
412            region: Region {
413                id: RegionId::new(table_id, 3),
414                ..Default::default()
415            },
416            leader_peer: Some(Peer::empty(1)),
417            ..Default::default()
418        }];
419        let sync_region = SyncRegion { region_routes };
420
421        sync_region.sync_regions(&mut ctx).await.unwrap();
422    }
423
424    #[tokio::test]
425    async fn test_sync_regions_retryable() {
426        let env = TestingEnv::new();
427        let table_id = 1024;
428        let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
429        persistent_context.group_prepare_result = Some(test_prepare_result(table_id));
430
431        let mut ctx = env.create_context(persistent_context);
432        let region_routes = vec![RegionRoute {
433            region: Region {
434                id: RegionId::new(table_id, 3),
435                ..Default::default()
436            },
437            leader_peer: Some(Peer::empty(1)),
438            ..Default::default()
439        }];
440        let sync_region = SyncRegion { region_routes };
441
442        let err = sync_region.sync_regions(&mut ctx).await.unwrap_err();
443        assert_matches!(err, Error::RetryLater { .. });
444    }
445}