meta_srv/procedure/region_migration/open_candidate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::ops::Div;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::RegionIdent;
20use common_meta::distributed_time_constants::default_distributed_time_constants;
21use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
22use common_meta::key::datanode_table::RegionInfo;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::info;
25use serde::{Deserialize, Serialize};
26use snafu::{OptionExt, ResultExt};
27use tokio::time::Instant;
28
29use crate::error::{self, Result};
30use crate::handler::HeartbeatMailbox;
31use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
32use crate::procedure::region_migration::{Context, State};
33use crate::service::mailbox::Channel;
34
/// Region migration state that opens the migrating regions on the candidate
/// datanode (`to_peer` in the persistent context).
///
/// On success the procedure moves on to the `PreFlushRegion` state.
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenCandidateRegion;
37
38#[async_trait::async_trait]
39#[typetag::serde]
40impl State for OpenCandidateRegion {
41    async fn next(
42        &mut self,
43        ctx: &mut Context,
44        _procedure_ctx: &ProcedureContext,
45    ) -> Result<(Box<dyn State>, Status)> {
46        let instruction = self.build_open_region_instruction(ctx).await?;
47        let now = Instant::now();
48        self.open_candidate_region(ctx, instruction).await?;
49        ctx.update_open_candidate_region_elapsed(now);
50
51        Ok((Box::new(PreFlushRegion), Status::executing(false)))
52    }
53
54    fn as_any(&self) -> &dyn Any {
55        self
56    }
57}
58
59impl OpenCandidateRegion {
60    /// Builds open region instructions
61    ///
62    /// Abort(non-retry):
63    /// - Datanode Table is not found.
64    async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
65        let region_ids = ctx.persistent_ctx.region_ids.clone();
66        let from_peer_id = ctx.persistent_ctx.from_peer.id;
67        let to_peer_id = ctx.persistent_ctx.to_peer.id;
68        let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
69        let mut open_regions = Vec::with_capacity(region_ids.len());
70
71        for region_id in region_ids {
72            let table_id = region_id.table_id();
73            let region_number = region_id.region_number();
74            let datanode_table_value = datanode_table_values.get(&table_id).context(
75                error::DatanodeTableNotFoundSnafu {
76                    table_id,
77                    datanode_id: from_peer_id,
78                },
79            )?;
80            let RegionInfo {
81                region_storage_path,
82                region_options,
83                region_wal_options,
84                engine,
85            } = datanode_table_value.region_info.clone();
86
87            open_regions.push(OpenRegion::new(
88                RegionIdent {
89                    datanode_id: to_peer_id,
90                    table_id,
91                    region_number,
92                    engine,
93                },
94                &region_storage_path,
95                region_options,
96                region_wal_options,
97                true,
98            ));
99        }
100
101        Ok(Instruction::OpenRegions(open_regions))
102    }
103
104    /// Opens the candidate region.
105    ///
106    /// Abort(non-retry):
107    /// - The Datanode is unreachable(e.g., Candidate pusher is not found).
108    /// - Unexpected instruction reply.
109    /// - Another procedure is opening the candidate region.
110    ///
111    /// Retry:
112    /// - Exceeded deadline of open instruction.
113    /// - Datanode failed to open the candidate region.
114    async fn open_candidate_region(
115        &self,
116        ctx: &mut Context,
117        open_instruction: Instruction,
118    ) -> Result<()> {
119        let pc = &ctx.persistent_ctx;
120        let vc = &mut ctx.volatile_ctx;
121        let region_ids = &pc.region_ids;
122        let candidate = &pc.to_peer;
123
124        // This method might be invoked multiple times.
125        // Only registers the guard if `opening_region_guard` is absent.
126        if vc.opening_region_guards.is_empty() {
127            for region_id in region_ids {
128                // Registers the opening region.
129                let guard = ctx
130                    .opening_region_keeper
131                    .register(candidate.id, *region_id)
132                    .context(error::RegionOpeningRaceSnafu {
133                        peer_id: candidate.id,
134                        region_id: *region_id,
135                    })?;
136                vc.opening_region_guards.push(guard);
137            }
138        }
139
140        let msg = MailboxMessage::json_message(
141            &format!("Open candidate regions: {:?}", region_ids),
142            &format!("Metasrv@{}", ctx.server_addr()),
143            &format!("Datanode-{}@{}", candidate.id, candidate.addr),
144            common_time::util::current_time_millis(),
145            &open_instruction,
146        )
147        .with_context(|_| error::SerializeToJsonSnafu {
148            input: open_instruction.to_string(),
149        })?;
150
151        let operation_timeout =
152            ctx.next_operation_timeout()
153                .context(error::ExceededDeadlineSnafu {
154                    operation: "Open candidate region",
155                })?;
156        let operation_timeout = operation_timeout
157            .div(2)
158            .max(default_distributed_time_constants().region_lease);
159        let ch = Channel::Datanode(candidate.id);
160        let now = Instant::now();
161        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
162
163        match receiver.await {
164            Ok(msg) => {
165                let reply = HeartbeatMailbox::json_reply(&msg)?;
166                info!(
167                    "Received open region reply: {:?}, region: {:?}, elapsed: {:?}",
168                    reply,
169                    region_ids,
170                    now.elapsed()
171                );
172                let InstructionReply::OpenRegions(SimpleReply { result, error }) = reply else {
173                    return error::UnexpectedInstructionReplySnafu {
174                        mailbox_message: msg.to_string(),
175                        reason: "expect open region reply",
176                    }
177                    .fail();
178                };
179
180                if result {
181                    Ok(())
182                } else {
183                    error::RetryLaterSnafu {
184                        reason: format!(
185                            "Region {region_ids:?} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}",
186                            candidate,
187                            now.elapsed()
188                        ),
189                    }
190                    .fail()
191                }
192            }
193            Err(error::Error::MailboxTimeout { .. }) => {
194                let reason = format!(
195                    "Mailbox received timeout for open candidate region {region_ids:?} on datanode {:?}, elapsed: {:?}",
196                    candidate,
197                    now.elapsed()
198                );
199                error::RetryLaterSnafu { reason }.fail()
200            }
201            Err(e) => Err(e),
202        }
203    }
204}
205
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::DatanodeId;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_open_region_reply, send_mock_reply,
    };

    /// Persistent context for migrating `RegionId::new(1024, 1)` with peers 1 and 2
    /// (the tests below use from_peer: 1, to_peer: 2).
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    /// Builds an `OpenRegions` instruction holding a single mito2 region on
    /// `datanode_id`. This lets `open_candidate_region` be driven directly, without
    /// any table metadata.
    fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
        Instruction::OpenRegions(vec![OpenRegion {
            region_ident: RegionIdent {
                datanode_id,
                table_id: region_id.table_id(),
                region_number: region_id.region_number(),
                engine: MITO2_ENGINE.to_string(),
            },
            region_storage_path: "/bar/foo/region/".to_string(),
            region_options: Default::default(),
            region_wal_options: Default::default(),
            skip_wal_replay: true,
        }])
    }

    // No table metadata is created in `env`, so the datanode table lookup fails with a
    // non-retryable error.
    #[tokio::test]
    async fn test_datanode_table_is_not_found_error() {
        let state = OpenCandidateRegion;
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let err = state
            .build_open_region_instruction(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(err, Error::DatanodeTableNotFound { .. });
        assert!(!err.is_retryable());
    }

    // No heartbeat response receiver is registered for the candidate, so sending the
    // instruction fails with a non-retryable `PusherNotFound`.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = OpenCandidateRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_ids[0];
        let to_peer_id = persistent_context.to_peer.id;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = state
            .open_candidate_region(&mut ctx, open_instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    // The region is already registered in the opening-region keeper (simulating another
    // procedure), so registration fails with a non-retryable `RegionOpeningRace`.
    #[tokio::test]
    async fn test_candidate_region_opening_error() {
        let state = OpenCandidateRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_ids[0];
        let to_peer_id = persistent_context.to_peer.id;

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let opening_region_keeper = env.opening_region_keeper();
        // Held for the whole test so that the registration inside the state conflicts.
        let _guard = opening_region_keeper
            .register(to_peer_id, region_id)
            .unwrap();

        let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = state
            .open_candidate_region(&mut ctx, open_instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::RegionOpeningRace { .. });
        assert!(!err.is_retryable());
    }

    // The candidate answers with a close-region reply instead of an open-region reply.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = OpenCandidateRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_ids[0];
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        // Makes the candidate reachable through the mailbox.
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Sends an incorrect reply.
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = state
            .open_candidate_region(&mut ctx, open_instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    // A mailbox timeout while waiting for the reply is mapped to a retryable `RetryLater`.
    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = OpenCandidateRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_ids[0];
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Sends a mailbox timeout error.
        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = state
            .open_candidate_region(&mut ctx, open_instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    // The datanode reports `result = false`. The error is retryable and carries the
    // datanode's error message.
    #[tokio::test]
    async fn test_open_candidate_region_failed() {
        let state = OpenCandidateRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_ids[0];
        let to_peer_id = persistent_context.to_peer.id;
        let mut env = TestingEnv::new();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_open_region_reply(
                id,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = state
            .open_candidate_region(&mut ctx, open_instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"));
    }

    // Happy path through `next`. The opening guard is kept in the volatile context and
    // the next state is `PreFlushRegion`.
    #[tokio::test]
    async fn test_next_flush_leader_region_state() {
        let mut state = Box::new(OpenCandidateRegion);
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let region_id = persistent_context.region_ids[0];
        let to_peer_id = persistent_context.to_peer.id;
        let mut env = TestingEnv::new();

        // Prepares table
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(from_peer_id)),
            ..Default::default()
        }];

        env.table_metadata_manager()
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                HashMap::default(),
            )
            .await
            .unwrap();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, true, None)));
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        // The guard registered for the candidate region stays in the volatile context.
        let vc = ctx.volatile_ctx;
        assert_eq!(vc.opening_region_guards[0].info(), (to_peer_id, region_id));

        let flush_leader_region = next.as_any().downcast_ref::<PreFlushRegion>().unwrap();
        assert_matches!(flush_leader_region, PreFlushRegion);
    }
}