meta_srv/procedure/region_migration/
open_candidate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::ops::Div;
17use std::time::Duration;
18
19use api::v1::meta::MailboxMessage;
20use common_meta::RegionIdent;
21use common_meta::distributed_time_constants::REGION_LEASE_SECS;
22use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
23use common_meta::key::datanode_table::RegionInfo;
24use common_procedure::{Context as ProcedureContext, Status};
25use common_telemetry::info;
26use serde::{Deserialize, Serialize};
27use snafu::{OptionExt, ResultExt};
28use tokio::time::Instant;
29
30use crate::error::{self, Result};
31use crate::handler::HeartbeatMailbox;
32use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
33use crate::procedure::region_migration::{Context, State};
34use crate::service::mailbox::Channel;
35
/// Uses the lease time of a region ([`REGION_LEASE_SECS`] seconds) as the timeout
/// of opening a candidate region.
///
/// `OpenCandidateRegion::open_candidate_region` applies it as a lower bound: the
/// mailbox timeout is the larger of this value and half of the remaining operation
/// timeout.
const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
38
/// Region migration state that asks the candidate datanode (`to_peer`) to open the
/// migrating regions.
///
/// Once the datanode reports success, the procedure moves on to [`PreFlushRegion`].
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenCandidateRegion;
41
42#[async_trait::async_trait]
43#[typetag::serde]
44impl State for OpenCandidateRegion {
45    async fn next(
46        &mut self,
47        ctx: &mut Context,
48        _procedure_ctx: &ProcedureContext,
49    ) -> Result<(Box<dyn State>, Status)> {
50        let instruction = self.build_open_region_instruction(ctx).await?;
51        let now = Instant::now();
52        self.open_candidate_region(ctx, instruction).await?;
53        ctx.update_open_candidate_region_elapsed(now);
54
55        Ok((Box::new(PreFlushRegion), Status::executing(false)))
56    }
57
58    fn as_any(&self) -> &dyn Any {
59        self
60    }
61}
62
63impl OpenCandidateRegion {
64    /// Builds open region instructions
65    ///
66    /// Abort(non-retry):
67    /// - Datanode Table is not found.
68    async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
69        let region_ids = ctx.persistent_ctx.region_ids.clone();
70        let from_peer_id = ctx.persistent_ctx.from_peer.id;
71        let to_peer_id = ctx.persistent_ctx.to_peer.id;
72        let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
73        let mut open_regions = Vec::with_capacity(region_ids.len());
74
75        for region_id in region_ids {
76            let table_id = region_id.table_id();
77            let region_number = region_id.region_number();
78            let datanode_table_value = datanode_table_values.get(&table_id).context(
79                error::DatanodeTableNotFoundSnafu {
80                    table_id,
81                    datanode_id: from_peer_id,
82                },
83            )?;
84            let RegionInfo {
85                region_storage_path,
86                region_options,
87                region_wal_options,
88                engine,
89            } = datanode_table_value.region_info.clone();
90
91            open_regions.push(OpenRegion::new(
92                RegionIdent {
93                    datanode_id: to_peer_id,
94                    table_id,
95                    region_number,
96                    engine,
97                },
98                &region_storage_path,
99                region_options,
100                region_wal_options,
101                true,
102            ));
103        }
104
105        Ok(Instruction::OpenRegions(open_regions))
106    }
107
108    /// Opens the candidate region.
109    ///
110    /// Abort(non-retry):
111    /// - The Datanode is unreachable(e.g., Candidate pusher is not found).
112    /// - Unexpected instruction reply.
113    /// - Another procedure is opening the candidate region.
114    ///
115    /// Retry:
116    /// - Exceeded deadline of open instruction.
117    /// - Datanode failed to open the candidate region.
118    async fn open_candidate_region(
119        &self,
120        ctx: &mut Context,
121        open_instruction: Instruction,
122    ) -> Result<()> {
123        let pc = &ctx.persistent_ctx;
124        let vc = &mut ctx.volatile_ctx;
125        let region_ids = &pc.region_ids;
126        let candidate = &pc.to_peer;
127
128        // This method might be invoked multiple times.
129        // Only registers the guard if `opening_region_guard` is absent.
130        if vc.opening_region_guards.is_empty() {
131            for region_id in region_ids {
132                // Registers the opening region.
133                let guard = ctx
134                    .opening_region_keeper
135                    .register(candidate.id, *region_id)
136                    .context(error::RegionOpeningRaceSnafu {
137                        peer_id: candidate.id,
138                        region_id: *region_id,
139                    })?;
140                vc.opening_region_guards.push(guard);
141            }
142        }
143
144        let msg = MailboxMessage::json_message(
145            &format!("Open candidate regions: {:?}", region_ids),
146            &format!("Metasrv@{}", ctx.server_addr()),
147            &format!("Datanode-{}@{}", candidate.id, candidate.addr),
148            common_time::util::current_time_millis(),
149            &open_instruction,
150        )
151        .with_context(|_| error::SerializeToJsonSnafu {
152            input: open_instruction.to_string(),
153        })?;
154
155        let operation_timeout =
156            ctx.next_operation_timeout()
157                .context(error::ExceededDeadlineSnafu {
158                    operation: "Open candidate region",
159                })?;
160        let operation_timeout = operation_timeout.div(2).max(OPEN_CANDIDATE_REGION_TIMEOUT);
161        let ch = Channel::Datanode(candidate.id);
162        let now = Instant::now();
163        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
164
165        match receiver.await {
166            Ok(msg) => {
167                let reply = HeartbeatMailbox::json_reply(&msg)?;
168                info!(
169                    "Received open region reply: {:?}, region: {:?}, elapsed: {:?}",
170                    reply,
171                    region_ids,
172                    now.elapsed()
173                );
174                let InstructionReply::OpenRegions(SimpleReply { result, error }) = reply else {
175                    return error::UnexpectedInstructionReplySnafu {
176                        mailbox_message: msg.to_string(),
177                        reason: "expect open region reply",
178                    }
179                    .fail();
180                };
181
182                if result {
183                    Ok(())
184                } else {
185                    error::RetryLaterSnafu {
186                        reason: format!(
187                            "Region {region_ids:?} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}",
188                            candidate,
189                            now.elapsed()
190                        ),
191                    }
192                    .fail()
193                }
194            }
195            Err(error::Error::MailboxTimeout { .. }) => {
196                let reason = format!(
197                    "Mailbox received timeout for open candidate region {region_ids:?} on datanode {:?}, elapsed: {:?}",
198                    candidate,
199                    now.elapsed()
200                );
201                error::RetryLaterSnafu { reason }.fail()
202            }
203            Err(e) => Err(e),
204        }
205    }
206}
207
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::DatanodeId;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_open_region_reply, send_mock_reply,
    };

    /// Persistent context shared by the tests: region `(1024, 1)`, from_peer 1, to_peer 2.
    fn new_persistent_context() -> PersistentContext {
        let region_id = RegionId::new(1024, 1);
        test_util::new_persistent_context(1, 2, region_id)
    }

    /// Hand-built open instruction for `region_id` targeting `datanode_id`.
    fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
        let region_ident = RegionIdent {
            datanode_id,
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            engine: MITO2_ENGINE.to_string(),
        };
        let open_region = OpenRegion {
            region_ident,
            region_storage_path: "/bar/foo/region/".to_string(),
            region_options: Default::default(),
            region_wal_options: Default::default(),
            skip_wal_replay: true,
        };

        Instruction::OpenRegions(vec![open_region])
    }

    /// Without any table metadata, building the instruction aborts (non-retryable).
    #[tokio::test]
    async fn test_datanode_table_is_not_found_error() {
        let state = OpenCandidateRegion;
        let env = TestingEnv::new();
        let mut ctx = env
            .context_factory()
            .new_context(new_persistent_context());

        let result = state.build_open_region_instruction(&mut ctx).await;
        let err = result.unwrap_err();

        assert_matches!(err, Error::DatanodeTableNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// Without a registered pusher for the candidate, sending the instruction aborts.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = OpenCandidateRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_ctx = new_persistent_context();
        let to_peer_id = persistent_ctx.to_peer.id;
        let region_id = persistent_ctx.region_ids[0];
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let result = state.open_candidate_region(&mut ctx, instruction).await;
        let err = result.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// A region that is already registered as opening yields a non-retryable race error.
    #[tokio::test]
    async fn test_candidate_region_opening_error() {
        let state = OpenCandidateRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_ctx = new_persistent_context();
        let to_peer_id = persistent_ctx.to_peer.id;
        let region_id = persistent_ctx.region_ids[0];

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        // Occupies the slot before the state gets a chance to register it.
        let keeper = env.opening_region_keeper();
        let _guard = keeper.register(to_peer_id, region_id).unwrap();

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let result = state.open_candidate_region(&mut ctx, instruction).await;
        let err = result.unwrap_err();

        assert_matches!(err, Error::RegionOpeningRace { .. });
        assert!(!err.is_retryable());
    }

    /// A reply of the wrong kind is reported as an unexpected instruction reply.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = OpenCandidateRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_ctx = new_persistent_context();
        let to_peer_id = persistent_ctx.to_peer.id;
        let region_id = persistent_ctx.region_ids[0];

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let channel = Channel::Datanode(to_peer_id);
        mailbox_ctx
            .insert_heartbeat_response_receiver(channel, tx)
            .await;

        // Replies with a close region reply, which does not match the open instruction.
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let result = state.open_candidate_region(&mut ctx, instruction).await;
        let err = result.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    /// A mailbox timeout is surfaced as a retryable error.
    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = OpenCandidateRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_ctx = new_persistent_context();
        let to_peer_id = persistent_ctx.to_peer.id;
        let region_id = persistent_ctx.region_ids[0];

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let channel = Channel::Datanode(to_peer_id);
        mailbox_ctx
            .insert_heartbeat_response_receiver(channel, tx)
            .await;

        // Sends a timeout error.
        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let result = state.open_candidate_region(&mut ctx, instruction).await;
        let err = result.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    /// A negative open reply is retryable and carries the datanode's error text.
    #[tokio::test]
    async fn test_open_candidate_region_failed() {
        let state = OpenCandidateRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_ctx = new_persistent_context();
        let to_peer_id = persistent_ctx.to_peer.id;
        let region_id = persistent_ctx.region_ids[0];

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let channel = Channel::Datanode(to_peer_id);
        mailbox_ctx
            .insert_heartbeat_response_receiver(channel, tx)
            .await;

        // The datanode reports that it failed to open the region.
        send_mock_reply(mailbox, rx, |id| {
            let failure = Some("test mocked".to_string());
            Ok(new_open_region_reply(id, false, failure))
        });

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let result = state.open_candidate_region(&mut ctx, instruction).await;
        let err = result.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        let rendered = format!("{err:?}");
        assert!(rendered.contains("test mocked"));
    }

    /// A successful open keeps the guard in the context and moves on to `PreFlushRegion`.
    #[tokio::test]
    async fn test_next_flush_leader_region_state() {
        let mut state = Box::new(OpenCandidateRegion);
        // from_peer: 1
        // to_peer: 2
        let persistent_ctx = new_persistent_context();
        let from_peer_id = persistent_ctx.from_peer.id;
        let to_peer_id = persistent_ctx.to_peer.id;
        let region_id = persistent_ctx.region_ids[0];
        let mut env = TestingEnv::new();

        // Prepares the table metadata with `from_peer` as the region leader.
        let table_info = new_test_table_info(1024, vec![1]).into();
        let leader_route = RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(from_peer_id)),
            ..Default::default()
        };
        let table_route = TableRouteValue::physical(vec![leader_route]);

        env.table_metadata_manager()
            .create_table_metadata(table_info, table_route, HashMap::default())
            .await
            .unwrap();

        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let channel = Channel::Datanode(to_peer_id);
        mailbox_ctx
            .insert_heartbeat_response_receiver(channel, tx)
            .await;

        // The datanode confirms that the region has been opened.
        send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, true, None)));

        let procedure_ctx = new_procedure_context();
        let (next_state, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let volatile_ctx = ctx.volatile_ctx;
        assert_eq!(
            volatile_ctx.opening_region_guards[0].info(),
            (to_peer_id, region_id)
        );

        let pre_flush = next_state
            .as_any()
            .downcast_ref::<PreFlushRegion>()
            .unwrap();
        assert_matches!(pre_flush, PreFlushRegion);
    }
}