meta_srv/procedure/region_migration/
open_candidate_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::any::Any;
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::distributed_time_constants::REGION_LEASE_SECS;
20use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
21use common_meta::key::datanode_table::RegionInfo;
22use common_meta::RegionIdent;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::info;
25use serde::{Deserialize, Serialize};
26use snafu::{OptionExt, ResultExt};
27use tokio::time::Instant;
28
29use crate::error::{self, Result};
30use crate::handler::HeartbeatMailbox;
31use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
32use crate::procedure::region_migration::{Context, State};
33use crate::service::mailbox::Channel;
34
35/// Uses lease time of a region as the timeout of opening a candidate region.
36const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
37
38#[derive(Debug, Serialize, Deserialize)]
39pub struct OpenCandidateRegion;
40
41#[async_trait::async_trait]
42#[typetag::serde]
43impl State for OpenCandidateRegion {
44    async fn next(
45        &mut self,
46        ctx: &mut Context,
47        _procedure_ctx: &ProcedureContext,
48    ) -> Result<(Box<dyn State>, Status)> {
49        let instruction = self.build_open_region_instruction(ctx).await?;
50        let now = Instant::now();
51        self.open_candidate_region(ctx, instruction).await?;
52        ctx.update_open_candidate_region_elapsed(now);
53
54        Ok((Box::new(PreFlushRegion), Status::executing(false)))
55    }
56
57    fn as_any(&self) -> &dyn Any {
58        self
59    }
60}
61
62impl OpenCandidateRegion {
63    /// Builds open region instructions
64    ///
65    /// Abort(non-retry):
66    /// - Datanode Table is not found.
67    async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
68        let pc = &ctx.persistent_ctx;
69        let table_id = pc.region_id.table_id();
70        let region_number = pc.region_id.region_number();
71        let candidate_id = pc.to_peer.id;
72        let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
73
74        let RegionInfo {
75            region_storage_path,
76            region_options,
77            region_wal_options,
78            engine,
79        } = datanode_table_value.region_info.clone();
80
81        let open_instruction = Instruction::OpenRegion(OpenRegion::new(
82            RegionIdent {
83                datanode_id: candidate_id,
84                table_id,
85                region_number,
86                engine,
87            },
88            &region_storage_path,
89            region_options,
90            region_wal_options,
91            true,
92        ));
93
94        Ok(open_instruction)
95    }
96
97    /// Opens the candidate region.
98    ///
99    /// Abort(non-retry):
100    /// - The Datanode is unreachable(e.g., Candidate pusher is not found).
101    /// - Unexpected instruction reply.
102    /// - Another procedure is opening the candidate region.
103    ///
104    /// Retry:
105    /// - Exceeded deadline of open instruction.
106    /// - Datanode failed to open the candidate region.
107    async fn open_candidate_region(
108        &self,
109        ctx: &mut Context,
110        open_instruction: Instruction,
111    ) -> Result<()> {
112        let pc = &ctx.persistent_ctx;
113        let vc = &mut ctx.volatile_ctx;
114        let region_id = pc.region_id;
115        let candidate = &pc.to_peer;
116
117        // This method might be invoked multiple times.
118        // Only registers the guard if `opening_region_guard` is absent.
119        if vc.opening_region_guard.is_none() {
120            // Registers the opening region.
121            let guard = ctx
122                .opening_region_keeper
123                .register(candidate.id, region_id)
124                .context(error::RegionOpeningRaceSnafu {
125                    peer_id: candidate.id,
126                    region_id,
127                })?;
128            vc.opening_region_guard = Some(guard);
129        }
130
131        let msg = MailboxMessage::json_message(
132            &format!("Open candidate region: {}", region_id),
133            &format!("Metasrv@{}", ctx.server_addr()),
134            &format!("Datanode-{}@{}", candidate.id, candidate.addr),
135            common_time::util::current_time_millis(),
136            &open_instruction,
137        )
138        .with_context(|_| error::SerializeToJsonSnafu {
139            input: open_instruction.to_string(),
140        })?;
141
142        let ch = Channel::Datanode(candidate.id);
143        let now = Instant::now();
144        let receiver = ctx
145            .mailbox
146            .send(&ch, msg, OPEN_CANDIDATE_REGION_TIMEOUT)
147            .await?;
148
149        match receiver.await {
150            Ok(msg) => {
151                let reply = HeartbeatMailbox::json_reply(&msg)?;
152                info!(
153                    "Received open region reply: {:?}, region: {}, elapsed: {:?}",
154                    reply,
155                    region_id,
156                    now.elapsed()
157                );
158                let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else {
159                    return error::UnexpectedInstructionReplySnafu {
160                        mailbox_message: msg.to_string(),
161                        reason: "expect open region reply",
162                    }
163                    .fail();
164                };
165
166                if result {
167                    Ok(())
168                } else {
169                    error::RetryLaterSnafu {
170                        reason: format!(
171                            "Region {region_id} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}",
172                            candidate,
173                            now.elapsed()
174                        ),
175                    }
176                    .fail()
177                }
178            }
179            Err(error::Error::MailboxTimeout { .. }) => {
180                let reason = format!(
181                    "Mailbox received timeout for open candidate region {region_id} on datanode {:?}, elapsed: {:?}",
182                    candidate,
183                    now.elapsed()
184                );
185                error::RetryLaterSnafu { reason }.fail()
186            }
187            Err(e) => Err(e),
188        }
189    }
190}
191
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::DatanodeId;
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_open_region_reply, send_mock_reply,
    };

    /// Persistent context shared by all tests: from_peer is 1, to_peer is 2 and
    /// the migrated region is region 1 of table 1024.
    fn new_persistent_context() -> PersistentContext {
        let region_id = RegionId::new(1024, 1);
        test_util::new_persistent_context(1, 2, region_id)
    }

    /// Builds a fixed open-region instruction addressed to `datanode_id` for `region_id`.
    fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
        let region_ident = RegionIdent {
            datanode_id,
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            engine: MITO2_ENGINE.to_string(),
        };

        Instruction::OpenRegion(OpenRegion {
            region_ident,
            region_storage_path: "/bar/foo/region/".to_string(),
            region_options: Default::default(),
            region_wal_options: Default::default(),
            skip_wal_replay: true,
        })
    }

    // No table metadata was created, so building the instruction must abort.
    #[tokio::test]
    async fn test_datanode_table_is_not_found_error() {
        let env = TestingEnv::new();
        let mut ctx = env
            .context_factory()
            .new_context(new_persistent_context());
        let state = OpenCandidateRegion;

        let err = state
            .build_open_region_instruction(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(err, Error::DatanodeTableNotFound { .. });
        assert!(!err.is_retryable());
    }

    // No heartbeat receiver is registered for the candidate, so sending must fail.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;
        let region_id = persistent_context.region_id;

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let state = OpenCandidateRegion;

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = state
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    // The (peer, region) pair is already registered elsewhere, so a second
    // registration must be reported as a race.
    #[tokio::test]
    async fn test_candidate_region_opening_error() {
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;
        let region_id = persistent_context.region_id;

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let keeper = env.opening_region_keeper();
        // Must stay alive until the end of the test to keep the region registered.
        let _guard = keeper.register(to_peer_id, region_id).unwrap();

        let state = OpenCandidateRegion;
        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = state
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::RegionOpeningRace { .. });
        assert!(!err.is_retryable());
    }

    // The datanode answers with a close-region reply instead of an open-region one.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;
        let region_id = persistent_context.region_id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), reply_tx)
            .await;

        // Sends an incorrect reply.
        send_mock_reply(mailbox, reply_rx, |id| Ok(new_close_region_reply(id)));

        let state = OpenCandidateRegion;
        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = state
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    // A mailbox timeout must be turned into a retryable error.
    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;
        let region_id = persistent_context.region_id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), reply_tx)
            .await;

        // Sends a timeout error.
        send_mock_reply(mailbox, reply_rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let state = OpenCandidateRegion;
        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = state
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    // The datanode reports a failed open; the error is retryable and carries
    // the datanode's message.
    #[tokio::test]
    async fn test_open_candidate_region_failed() {
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;
        let region_id = persistent_context.region_id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), reply_tx)
            .await;

        send_mock_reply(mailbox, reply_rx, |id| {
            Ok(new_open_region_reply(
                id,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let state = OpenCandidateRegion;
        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = state
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"));
    }

    // Happy path: the region opens, the guard is kept in the volatile context
    // and the next state is `PreFlushRegion`.
    #[tokio::test]
    async fn test_next_flush_leader_region_state() {
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let to_peer_id = persistent_context.to_peer.id;
        let region_id = persistent_context.region_id;
        let mut env = TestingEnv::new();

        // Prepares table
        let table_info = new_test_table_info(1024, vec![1]).into();
        let leader_route = RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(from_peer_id)),
            ..Default::default()
        };
        let region_routes = vec![leader_route];

        env.table_metadata_manager()
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                HashMap::default(),
            )
            .await
            .unwrap();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), reply_tx)
            .await;

        send_mock_reply(mailbox, reply_rx, |id| {
            Ok(new_open_region_reply(id, true, None))
        });

        let mut state = Box::new(OpenCandidateRegion);
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let volatile_ctx = ctx.volatile_ctx;
        assert_eq!(
            volatile_ctx.opening_region_guard.unwrap().info(),
            (to_peer_id, region_id)
        );

        let flush_leader_region = next.as_any().downcast_ref::<PreFlushRegion>().unwrap();
        assert_matches!(flush_leader_region, PreFlushRegion);
    }
}