meta_srv/procedure/repartition/group/
sync_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_meta::instruction::{Instruction, InstructionReply, SyncRegionReply, SyncRegionsReply};
21use common_meta::peer::Peer;
22use common_meta::rpc::router::RegionRoute;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::info;
25use common_telemetry::tracing_context::TracingContext;
26use futures::future::join_all;
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::region_engine::SyncRegionFromRequest;
30use store_api::storage::RegionId;
31
32use crate::error::{self, Error, Result};
33use crate::handler::HeartbeatMailbox;
34use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
35use crate::procedure::repartition::group::utils::{
36    HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
37};
38use crate::procedure::repartition::group::{Context, State};
39use crate::procedure::utils::ErrorStrategy;
40use crate::service::mailbox::{Channel, MailboxRef};
41
/// Parallelism value placed in every `SyncRegionFromRequest::FromRegion` request
/// built by `SyncRegion::build_sync_region_instructions`.
///
/// NOTE(review): how the datanode interprets this value is defined by its
/// `SyncRegionFromRequest` handling, not here — confirm there before tuning.
const DEFAULT_SYNC_REGION_PARALLELISM: usize = 3;
43
44/// The state of syncing regions for a repartition group.
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct SyncRegion {
47    pub region_routes: Vec<RegionRoute>,
48}
49
50#[async_trait::async_trait]
51#[typetag::serde]
52impl State for SyncRegion {
53    async fn next(
54        &mut self,
55        ctx: &mut Context,
56        _procedure_ctx: &ProcedureContext,
57    ) -> Result<(Box<dyn State>, Status)> {
58        Self::flush_central_region(ctx).await?;
59        self.sync_regions(ctx).await?;
60
61        Ok((
62            Box::new(UpdateMetadata::ApplyStaging),
63            Status::executing(true),
64        ))
65    }
66
67    fn as_any(&self) -> &dyn Any {
68        self
69    }
70}
71
72impl SyncRegion {
73    async fn flush_central_region(ctx: &mut Context) -> Result<()> {
74        let operation_timeout =
75            ctx.next_operation_timeout()
76                .context(error::ExceededDeadlineSnafu {
77                    operation: "Flush central region",
78                })?;
79        let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
80
81        crate::procedure::utils::flush_region(
82            &ctx.mailbox,
83            &ctx.server_addr,
84            &[prepare_result.central_region],
85            &prepare_result.central_region_datanode,
86            operation_timeout,
87            ErrorStrategy::Retry,
88        )
89        .await
90    }
91
92    /// Builds instructions to sync regions on datanodes.
93    fn build_sync_region_instructions(
94        central_region: RegionId,
95        region_routes: &[RegionRoute],
96    ) -> HashMap<Peer, Vec<common_meta::instruction::SyncRegion>> {
97        let target_region_routes_by_peer = group_region_routes_by_peer(region_routes);
98        let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len());
99
100        for (peer, region_ids) in target_region_routes_by_peer {
101            let sync_regions = region_ids
102                .into_iter()
103                .map(|region_id| {
104                    let request = SyncRegionFromRequest::FromRegion {
105                        source_region_id: central_region,
106                        parallelism: DEFAULT_SYNC_REGION_PARALLELISM,
107                    };
108                    common_meta::instruction::SyncRegion { region_id, request }
109                })
110                .collect();
111            instructions.insert((*peer).clone(), sync_regions);
112        }
113
114        instructions
115    }
116
117    /// Syncs regions on datanodes.
118    async fn sync_regions(&self, ctx: &mut Context) -> Result<()> {
119        let table_id = ctx.persistent_ctx.table_id;
120        let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
121        let instructions = Self::build_sync_region_instructions(
122            prepare_result.central_region,
123            &self.region_routes,
124        );
125        let operation_timeout =
126            ctx.next_operation_timeout()
127                .context(error::ExceededDeadlineSnafu {
128                    operation: "Sync regions",
129                })?;
130
131        let (peers, tasks): (Vec<_>, Vec<_>) = instructions
132            .iter()
133            .map(|(peer, sync_regions)| {
134                (
135                    peer,
136                    Self::sync_region(
137                        &ctx.mailbox,
138                        &ctx.server_addr,
139                        peer,
140                        sync_regions,
141                        operation_timeout,
142                    ),
143                )
144            })
145            .unzip();
146
147        info!(
148            "Sent sync regions instructions to peers: {:?} for repartition table {}",
149            peers, table_id
150        );
151
152        let format_err_msg = |idx: usize, error: &Error| {
153            let peer = peers[idx];
154            format!(
155                "Failed to sync regions on datanode {:?}, error: {:?}",
156                peer, error
157            )
158        };
159
160        let results = join_all(tasks).await;
161        let result = handle_multiple_results(&results);
162
163        match result {
164            HandleMultipleResult::AllSuccessful => Ok(()),
165            HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu {
166                reason: format!(
167                    "All retryable errors during syncing regions for repartition table {}: {:?}",
168                    table_id,
169                    retryable_errors
170                        .iter()
171                        .map(|(idx, error)| format_err_msg(*idx, error))
172                        .collect::<Vec<_>>()
173                        .join(",")
174                ),
175            }
176            .fail(),
177            HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu {
178                violated: format!(
179                    "All non retryable errors during syncing regions for repartition table {}: {:?}",
180                    table_id,
181                    non_retryable_errors
182                        .iter()
183                        .map(|(idx, error)| format_err_msg(*idx, error))
184                        .collect::<Vec<_>>()
185                        .join(",")
186                ),
187            }
188            .fail(),
189            HandleMultipleResult::PartialRetryable {
190                retryable_errors,
191                non_retryable_errors,
192            } => error::UnexpectedSnafu {
193                violated: format!(
194                    "Partial retryable errors during syncing regions for repartition table {}: {:?}, non retryable errors: {:?}",
195                    table_id,
196                    retryable_errors
197                        .iter()
198                        .map(|(idx, error)| format_err_msg(*idx, error))
199                        .collect::<Vec<_>>()
200                        .join(","),
201                    non_retryable_errors
202                        .iter()
203                        .map(|(idx, error)| format_err_msg(*idx, error))
204                        .collect::<Vec<_>>()
205                        .join(","),
206                ),
207            }
208            .fail(),
209        }
210    }
211
212    /// Syncs regions on a datanode.
213    async fn sync_region(
214        mailbox: &MailboxRef,
215        server_addr: &str,
216        peer: &Peer,
217        sync_regions: &[common_meta::instruction::SyncRegion],
218        timeout: Duration,
219    ) -> Result<()> {
220        let ch = Channel::Datanode(peer.id);
221        let instruction = Instruction::SyncRegions(sync_regions.to_vec());
222        let tracing_ctx = TracingContext::from_current_span();
223        let message = MailboxMessage::json_message(
224            &format!(
225                "Sync regions: {:?}",
226                sync_regions.iter().map(|r| r.region_id).collect::<Vec<_>>()
227            ),
228            &format!("Metasrv@{}", server_addr),
229            &format!("Datanode-{}@{}", peer.id, peer.addr),
230            common_time::util::current_time_millis(),
231            &instruction,
232            Some(tracing_ctx.to_w3c()),
233        )
234        .with_context(|_| error::SerializeToJsonSnafu {
235            input: instruction.to_string(),
236        })?;
237
238        let now = std::time::Instant::now();
239        let receiver = mailbox.send(&ch, message, timeout).await;
240
241        let receiver = match receiver {
242            Ok(receiver) => receiver,
243            Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
244                reason: format!(
245                    "Pusher not found for sync regions on datanode {:?}, elapsed: {:?}",
246                    peer,
247                    now.elapsed()
248                ),
249            }
250            .fail()?,
251            Err(err) => {
252                return Err(err);
253            }
254        };
255
256        match receiver.await {
257            Ok(msg) => {
258                let reply = HeartbeatMailbox::json_reply(&msg)?;
259                info!(
260                    "Received sync regions reply: {:?}, elapsed: {:?}",
261                    reply,
262                    now.elapsed()
263                );
264                let InstructionReply::SyncRegions(SyncRegionsReply { replies }) = reply else {
265                    return error::UnexpectedInstructionReplySnafu {
266                        mailbox_message: msg.to_string(),
267                        reason: "expect sync regions reply",
268                    }
269                    .fail();
270                };
271                for reply in replies {
272                    Self::handle_sync_region_reply(&reply, &now, peer)?;
273                }
274                Ok(())
275            }
276            Err(error::Error::MailboxTimeout { .. }) => {
277                let reason = format!(
278                    "Mailbox received timeout for sync regions on datanode {:?}, elapsed: {:?}",
279                    peer,
280                    now.elapsed()
281                );
282                error::RetryLaterSnafu { reason }.fail()
283            }
284            Err(err) => Err(err),
285        }
286    }
287
288    fn handle_sync_region_reply(
289        SyncRegionReply {
290            region_id,
291            ready,
292            exists,
293            error,
294        }: &SyncRegionReply,
295        now: &Instant,
296        peer: &Peer,
297    ) -> Result<()> {
298        ensure!(
299            exists,
300            error::UnexpectedSnafu {
301                violated: format!(
302                    "Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
303                    region_id,
304                    peer,
305                    now.elapsed()
306                )
307            }
308        );
309
310        if let Some(error) = error {
311            return error::RetryLaterSnafu {
312                reason: format!(
313                    "Failed to sync region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
314                    region_id,
315                    peer,
316                    error,
317                    now.elapsed()
318                ),
319            }
320            .fail();
321        }
322
323        ensure!(
324            ready,
325            error::RetryLaterSnafu {
326                reason: format!(
327                    "Region {} failed to sync on datanode {:?}, elapsed: {:?}",
328                    region_id,
329                    peer,
330                    now.elapsed()
331                ),
332            }
333        );
334
335        Ok(())
336    }
337}
338
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::region_engine::SyncRegionFromRequest;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::repartition::group::GroupPrepareResult;
    use crate::procedure::repartition::group::sync_region::SyncRegion;
    use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};
    use crate::procedure::test_util::{new_sync_region_reply, send_mock_reply};
    use crate::service::mailbox::Channel;

    /// Routes for a single target region (region 3 of `table_id`) led by datanode 1.
    fn single_target_route(table_id: u32) -> Vec<RegionRoute> {
        vec![RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 3),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }]
    }

    /// Prepare result whose central region is region 1 of `table_id`, hosted on datanode 1.
    fn test_prepare_result(table_id: u32) -> GroupPrepareResult {
        GroupPrepareResult {
            source_routes: vec![],
            target_routes: vec![],
            central_region: RegionId::new(table_id, 1),
            central_region_datanode: Peer::empty(1),
        }
    }

    #[test]
    fn test_build_sync_region_instructions() {
        let table_id = 1024;
        let central_region = RegionId::new(table_id, 1);
        let routes = single_target_route(table_id);

        let instructions = SyncRegion::build_sync_region_instructions(central_region, &routes);

        // One datanode, with exactly one region to sync on it.
        assert_eq!(instructions.len(), 1);
        let on_peer = instructions.get(&Peer::empty(1)).unwrap();
        assert_eq!(on_peer.len(), 1);

        let instruction = &on_peer[0];
        assert_eq!(instruction.region_id, RegionId::new(table_id, 3));
        // The target region must be synced from the central region.
        let SyncRegionFromRequest::FromRegion {
            source_region_id, ..
        } = &instruction.request
        else {
            panic!("expect from region request");
        };
        assert_eq!(*source_region_id, central_region);
    }

    #[tokio::test]
    async fn test_sync_regions_all_successful() {
        let mut env = TestingEnv::new();
        let table_id = 1024;
        let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
        persistent_context.group_prepare_result = Some(test_prepare_result(table_id));

        // Register datanode 1 and answer with a sync reply for region 3
        // (both flags true, no error).
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        env.mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
            .await;
        send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
            Ok(new_sync_region_reply(
                id,
                RegionId::new(1024, 3),
                true,
                true,
                None,
            ))
        });

        let mut ctx = env.create_context(persistent_context);
        let sync_region = SyncRegion {
            region_routes: single_target_route(table_id),
        };

        sync_region.sync_regions(&mut ctx).await.unwrap();
    }

    #[tokio::test]
    async fn test_sync_regions_retryable() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
        persistent_context.group_prepare_result = Some(test_prepare_result(table_id));

        // No heartbeat receiver is registered for datanode 1; presumably the mailbox
        // reports `PusherNotFound`, which `sync_region` maps to `RetryLater`.
        let mut ctx = env.create_context(persistent_context);
        let sync_region = SyncRegion {
            region_routes: single_target_route(table_id),
        };

        let err = sync_region.sync_regions(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::RetryLater { .. });
    }
}