Skip to main content

meta_srv/procedure/region_migration/
open_candidate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::ops::Div;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::RegionIdent;
20use common_meta::distributed_time_constants::default_distributed_time_constants;
21use common_meta::instruction::{
22    Instruction, InstructionReply, OpenRegion, OpenRegionReason, SimpleReply,
23};
24use common_meta::key::datanode_table::RegionInfo;
25use common_procedure::{Context as ProcedureContext, Status};
26use common_telemetry::info;
27use common_telemetry::tracing_context::TracingContext;
28use serde::{Deserialize, Serialize};
29use snafu::{OptionExt, ResultExt};
30use store_api::region_engine::RegionRole;
31use store_api::region_request::RegionRequirements;
32use tokio::time::Instant;
33
34use crate::error::{self, Result};
35use crate::handler::HeartbeatMailbox;
36use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
37use crate::procedure::region_migration::{Context, RegionMigrationTriggerReason, State};
38use crate::procedure::utils::instruction_error_result;
39use crate::service::mailbox::Channel;
40
41#[derive(Debug, Serialize, Deserialize)]
42pub struct OpenCandidateRegion;
43
44#[async_trait::async_trait]
45#[typetag::serde]
46impl State for OpenCandidateRegion {
47    async fn next(
48        &mut self,
49        ctx: &mut Context,
50        _procedure_ctx: &ProcedureContext,
51    ) -> Result<(Box<dyn State>, Status)> {
52        let instruction = self.build_open_region_instruction(ctx).await?;
53        let now = Instant::now();
54        self.open_candidate_region(ctx, instruction).await?;
55        ctx.update_open_candidate_region_elapsed(now);
56
57        Ok((Box::new(PreFlushRegion), Status::executing(false)))
58    }
59
60    fn as_any(&self) -> &dyn Any {
61        self
62    }
63}
64
65impl OpenCandidateRegion {
66    /// Builds open region instructions
67    ///
68    /// Abort(non-retry):
69    /// - Datanode Table is not found.
70    async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
71        let region_ids = ctx.persistent_ctx.region_ids.clone();
72        let from_peer_id = ctx.persistent_ctx.from_peer.id;
73        let to_peer_id = ctx.persistent_ctx.to_peer.id;
74        let reason = match ctx.persistent_ctx.trigger_reason {
75            RegionMigrationTriggerReason::Failover => OpenRegionReason::RegionFailover,
76            _ => OpenRegionReason::RegionMigration,
77        };
78        let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
79        let mut open_regions = Vec::with_capacity(region_ids.len());
80
81        for region_id in region_ids {
82            let table_id = region_id.table_id();
83            let region_number = region_id.region_number();
84            let datanode_table_value = datanode_table_values.get(&table_id).context(
85                error::DatanodeTableNotFoundSnafu {
86                    table_id,
87                    datanode_id: from_peer_id,
88                },
89            )?;
90            let RegionInfo {
91                region_storage_path,
92                region_options,
93                region_wal_options,
94                engine,
95            } = datanode_table_value.region_info.clone();
96
97            open_regions.push(OpenRegion::new(
98                RegionIdent {
99                    datanode_id: to_peer_id,
100                    table_id,
101                    region_number,
102                    engine,
103                },
104                &region_storage_path,
105                region_options,
106                region_wal_options,
107                true,
108                Some(reason),
109                RegionRequirements::object_storage(),
110            ));
111        }
112
113        Ok(Instruction::OpenRegions(open_regions))
114    }
115
116    /// Opens the candidate region.
117    ///
118    /// Abort(non-retry):
119    /// - The Datanode is unreachable(e.g., Candidate pusher is not found).
120    /// - Unexpected instruction reply.
121    /// - Another procedure is opening the candidate region.
122    ///
123    /// Retry:
124    /// - Exceeded deadline of open instruction.
125    /// - Datanode failed to open the candidate region.
126    async fn open_candidate_region(
127        &self,
128        ctx: &mut Context,
129        open_instruction: Instruction,
130    ) -> Result<()> {
131        let pc = &ctx.persistent_ctx;
132        let vc = &mut ctx.volatile_ctx;
133        let region_ids = &pc.region_ids;
134        let candidate = &pc.to_peer;
135
136        // This method might be invoked multiple times.
137        // Only registers the guard if `opening_region_guard` is absent.
138        if vc.opening_region_guards.is_empty() {
139            for region_id in region_ids {
140                // Registers the opening region.
141                let guard = ctx
142                    .opening_region_keeper
143                    .register_with_role(candidate.id, *region_id, RegionRole::Follower)
144                    .context(error::RegionOperatingRaceSnafu {
145                        peer_id: candidate.id,
146                        region_id: *region_id,
147                    })?;
148                vc.opening_region_guards.push(guard);
149            }
150        }
151
152        let tracing_ctx = TracingContext::from_current_span();
153        let msg = MailboxMessage::json_message(
154            &format!("Open candidate regions: {:?}", region_ids),
155            &format!("Metasrv@{}", ctx.server_addr()),
156            &format!("Datanode-{}@{}", candidate.id, candidate.addr),
157            common_time::util::current_time_millis(),
158            &open_instruction,
159            Some(tracing_ctx.to_w3c()),
160        )
161        .with_context(|_| error::SerializeToJsonSnafu {
162            input: open_instruction.to_string(),
163        })?;
164
165        let operation_timeout =
166            ctx.next_operation_timeout()
167                .context(error::ExceededDeadlineSnafu {
168                    operation: "Open candidate region",
169                })?;
170        let operation_timeout = operation_timeout
171            .div(2)
172            .max(default_distributed_time_constants().region_lease);
173        let ch = Channel::Datanode(candidate.id);
174        let now = Instant::now();
175        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
176
177        match receiver.await {
178            Ok(msg) => {
179                let reply = HeartbeatMailbox::json_reply(&msg)?;
180                info!(
181                    "Received open region reply: {:?}, region: {:?}, elapsed: {:?}",
182                    reply,
183                    region_ids,
184                    now.elapsed()
185                );
186                let InstructionReply::OpenRegions(SimpleReply { result, error }) = reply else {
187                    return error::UnexpectedInstructionReplySnafu {
188                        mailbox_message: msg.to_string(),
189                        reason: "expect open region reply",
190                    }
191                    .fail();
192                };
193
194                if result {
195                    Ok(())
196                } else if let Some(error) = error {
197                    instruction_error_result(
198                        &error,
199                        format!(
200                            "Region {region_ids:?} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}",
201                            candidate,
202                            now.elapsed()
203                        ),
204                    )
205                } else {
206                    error::UnexpectedSnafu {
207                        violated: format!(
208                            "Region {region_ids:?} is not opened by datanode {:?}, but error is absent, elapsed: {:?}",
209                            candidate,
210                            now.elapsed()
211                        ),
212                    }
213                    .fail()
214                }
215            }
216            Err(error::Error::MailboxTimeout { .. }) => {
217                let reason = format!(
218                    "Mailbox received timeout for open candidate region {region_ids:?} on datanode {:?}, elapsed: {:?}",
219                    candidate,
220                    now.elapsed()
221                );
222                error::RetryLaterSnafu { reason }.fail()
223            }
224            Err(e) => Err(e),
225        }
226    }
227}
228
#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::collections::HashMap;

    use common_catalog::consts::MITO2_ENGINE;
    use common_error::ext::RetryHint;
    use common_error::status_code::StatusCode;
    use common_meta::DatanodeId;
    use common_meta::instruction::InstructionError;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_open_region_reply, new_open_region_reply_with_error,
        send_mock_reply,
    };

    /// Migrates region `1024:1` from peer 1 (`from_peer`) to peer 2 (`to_peer`).
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    /// Returns the first migrating region and the candidate (`to_peer`) id.
    fn region_and_candidate(persistent_ctx: &PersistentContext) -> (RegionId, DatanodeId) {
        (persistent_ctx.region_ids[0], persistent_ctx.to_peer.id)
    }

    /// Creates metadata for table 1024 with a single route for `region_id` led by
    /// `leader_id`.
    async fn create_test_table(env: &TestingEnv, region_id: RegionId, leader_id: DatanodeId) {
        let route = RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(leader_id)),
            ..Default::default()
        };
        env.table_metadata_manager()
            .create_table_metadata(
                new_test_table_info(1024),
                TableRouteValue::physical(vec![route]),
                HashMap::default(),
            )
            .await
            .unwrap();
    }

    /// Builds an `OpenRegions` instruction for `region_id` addressed to `datanode_id`.
    fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
        let ident = RegionIdent {
            datanode_id,
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            engine: MITO2_ENGINE.to_string(),
        };
        let open_region = OpenRegion::new(
            ident,
            "/bar/foo/region/",
            Default::default(),
            Default::default(),
            true,
            Some(OpenRegionReason::RegionMigration),
            RegionRequirements::object_storage(),
        );
        Instruction::OpenRegions(vec![open_region])
    }

    #[tokio::test]
    async fn test_datanode_table_is_not_found_error() {
        let step = OpenCandidateRegion;
        let env = TestingEnv::new();
        // No table metadata is created, so the source peer's datanode table is missing.
        let mut ctx = env.context_factory().new_context(new_persistent_context());

        let err = step
            .build_open_region_instruction(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(err, Error::DatanodeTableNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_build_open_region_instruction_reason() {
        let step = OpenCandidateRegion;
        let mut persistent_ctx = new_persistent_context();
        let region_id = persistent_ctx.region_ids[0];
        let env = TestingEnv::new();
        create_test_table(&env, region_id, persistent_ctx.from_peer.id).await;

        // Trigger reason as produced by `new_persistent_context`.
        let mut ctx = env
            .context_factory()
            .new_context(persistent_ctx.clone());
        let open_regions = step
            .build_open_region_instruction(&mut ctx)
            .await
            .unwrap()
            .into_open_regions()
            .unwrap();
        assert_eq!(
            Some(OpenRegionReason::RegionMigration),
            open_regions[0].reason
        );
        assert_eq!(
            RegionRequirements::object_storage(),
            open_regions[0].requirements
        );

        // Failover-triggered migration.
        persistent_ctx.trigger_reason = RegionMigrationTriggerReason::Failover;
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let open_regions = step
            .build_open_region_instruction(&mut ctx)
            .await
            .unwrap()
            .into_open_regions()
            .unwrap();
        assert_eq!(
            Some(OpenRegionReason::RegionFailover),
            open_regions[0].reason
        );
        assert_eq!(
            RegionRequirements::object_storage(),
            open_regions[0].requirements
        );
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let step = OpenCandidateRegion;
        let persistent_ctx = new_persistent_context();
        let (region_id, to_peer_id) = region_and_candidate(&persistent_ctx);
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);

        // No heartbeat receiver is registered for the candidate datanode.
        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_candidate_region_opening_error() {
        let step = OpenCandidateRegion;
        let persistent_ctx = new_persistent_context();
        let (region_id, to_peer_id) = region_and_candidate(&persistent_ctx);

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        // Someone else already holds the candidate region; keep the guard alive (a named
        // binding, not `_`) for the whole test.
        let keeper = env.opening_region_keeper();
        let _held = keeper
            .register_with_role(to_peer_id, region_id, RegionRole::Follower)
            .unwrap();

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::RegionOperatingRace { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let step = OpenCandidateRegion;
        let persistent_ctx = new_persistent_context();
        let (region_id, to_peer_id) = region_and_candidate(&persistent_ctx);

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), reply_tx)
            .await;

        // Replies with a close-region reply instead of an open-region one.
        send_mock_reply(mailbox, reply_rx, |id| Ok(new_close_region_reply(id)));

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let step = OpenCandidateRegion;
        let persistent_ctx = new_persistent_context();
        let (region_id, to_peer_id) = region_and_candidate(&persistent_ctx);

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), reply_tx)
            .await;

        // The mailbox reports a timeout instead of a reply.
        send_mock_reply(mailbox, reply_rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_open_candidate_region_failed() {
        let step = OpenCandidateRegion;
        let persistent_ctx = new_persistent_context();
        let (region_id, to_peer_id) = region_and_candidate(&persistent_ctx);
        let mut env = TestingEnv::new();

        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), reply_tx)
            .await;

        // The datanode fails to open and attaches a plain error message.
        send_mock_reply(mailbox, reply_rx, |id| {
            Ok(new_open_region_reply(
                id,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"));
    }

    #[tokio::test]
    async fn test_open_candidate_region_non_retryable_instruction_error() {
        let step = OpenCandidateRegion;
        let persistent_ctx = new_persistent_context();
        let (region_id, to_peer_id) = region_and_candidate(&persistent_ctx);
        let mut env = TestingEnv::new();

        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), reply_tx)
            .await;

        // The datanode fails to open and marks the error as non-retryable.
        send_mock_reply(mailbox, reply_rx, |id| {
            Ok(new_open_region_reply_with_error(
                id,
                false,
                Some(InstructionError {
                    code: StatusCode::Internal,
                    message: "non retryable mocked".to_string(),
                    retry_hint: RetryHint::NonRetryable,
                }),
            ))
        });

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(format!("{err:?}").contains("non retryable mocked"));
    }

    #[tokio::test]
    async fn test_open_candidate_region_false_without_error_is_unexpected() {
        let step = OpenCandidateRegion;
        let persistent_ctx = new_persistent_context();
        let (region_id, to_peer_id) = region_and_candidate(&persistent_ctx);
        let mut env = TestingEnv::new();

        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), reply_tx)
            .await;

        // The datanode reports failure but attaches no error at all.
        send_mock_reply(mailbox, reply_rx, |id| {
            Ok(new_open_region_reply_with_error(id, false, None))
        });

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_next_flush_leader_region_state() {
        let mut step = OpenCandidateRegion;
        let persistent_ctx = new_persistent_context();
        let from_peer_id = persistent_ctx.from_peer.id;
        let (region_id, to_peer_id) = region_and_candidate(&persistent_ctx);
        let mut env = TestingEnv::new();

        // The source peer must own the table so the open instruction can be built.
        create_test_table(&env, region_id, from_peer_id).await;

        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), reply_tx)
            .await;

        send_mock_reply(mailbox, reply_rx, |id| {
            Ok(new_open_region_reply(id, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next_state, _) = step.next(&mut ctx, &procedure_ctx).await.unwrap();
        assert_eq!(
            ctx.volatile_ctx.opening_region_guards[0].info(),
            (to_peer_id, region_id)
        );

        let pre_flush = next_state
            .as_any()
            .downcast_ref::<PreFlushRegion>()
            .unwrap();
        assert_matches!(pre_flush, PreFlushRegion);
    }
}