Skip to main content

meta_srv/procedure/region_migration/
open_candidate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::ops::Div;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::RegionIdent;
20use common_meta::distributed_time_constants::default_distributed_time_constants;
21use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
22use common_meta::key::datanode_table::RegionInfo;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::info;
25use common_telemetry::tracing_context::TracingContext;
26use serde::{Deserialize, Serialize};
27use snafu::{OptionExt, ResultExt};
28use store_api::region_engine::RegionRole;
29use tokio::time::Instant;
30
31use crate::error::{self, Result};
32use crate::handler::HeartbeatMailbox;
33use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
34use crate::procedure::region_migration::{Context, State};
35use crate::service::mailbox::Channel;
36
/// Region migration state that asks the candidate (`to_peer`) datanode to open
/// the migrating regions.
///
/// When the candidate reports success, the state machine moves on to
/// [`PreFlushRegion`].
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenCandidateRegion;
39
40#[async_trait::async_trait]
41#[typetag::serde]
42impl State for OpenCandidateRegion {
43    async fn next(
44        &mut self,
45        ctx: &mut Context,
46        _procedure_ctx: &ProcedureContext,
47    ) -> Result<(Box<dyn State>, Status)> {
48        let instruction = self.build_open_region_instruction(ctx).await?;
49        let now = Instant::now();
50        self.open_candidate_region(ctx, instruction).await?;
51        ctx.update_open_candidate_region_elapsed(now);
52
53        Ok((Box::new(PreFlushRegion), Status::executing(false)))
54    }
55
56    fn as_any(&self) -> &dyn Any {
57        self
58    }
59}
60
61impl OpenCandidateRegion {
62    /// Builds open region instructions
63    ///
64    /// Abort(non-retry):
65    /// - Datanode Table is not found.
66    async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
67        let region_ids = ctx.persistent_ctx.region_ids.clone();
68        let from_peer_id = ctx.persistent_ctx.from_peer.id;
69        let to_peer_id = ctx.persistent_ctx.to_peer.id;
70        let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
71        let mut open_regions = Vec::with_capacity(region_ids.len());
72
73        for region_id in region_ids {
74            let table_id = region_id.table_id();
75            let region_number = region_id.region_number();
76            let datanode_table_value = datanode_table_values.get(&table_id).context(
77                error::DatanodeTableNotFoundSnafu {
78                    table_id,
79                    datanode_id: from_peer_id,
80                },
81            )?;
82            let RegionInfo {
83                region_storage_path,
84                region_options,
85                region_wal_options,
86                engine,
87            } = datanode_table_value.region_info.clone();
88
89            open_regions.push(OpenRegion::new(
90                RegionIdent {
91                    datanode_id: to_peer_id,
92                    table_id,
93                    region_number,
94                    engine,
95                },
96                &region_storage_path,
97                region_options,
98                region_wal_options,
99                true,
100            ));
101        }
102
103        Ok(Instruction::OpenRegions(open_regions))
104    }
105
106    /// Opens the candidate region.
107    ///
108    /// Abort(non-retry):
109    /// - The Datanode is unreachable(e.g., Candidate pusher is not found).
110    /// - Unexpected instruction reply.
111    /// - Another procedure is opening the candidate region.
112    ///
113    /// Retry:
114    /// - Exceeded deadline of open instruction.
115    /// - Datanode failed to open the candidate region.
116    async fn open_candidate_region(
117        &self,
118        ctx: &mut Context,
119        open_instruction: Instruction,
120    ) -> Result<()> {
121        let pc = &ctx.persistent_ctx;
122        let vc = &mut ctx.volatile_ctx;
123        let region_ids = &pc.region_ids;
124        let candidate = &pc.to_peer;
125
126        // This method might be invoked multiple times.
127        // Only registers the guard if `opening_region_guard` is absent.
128        if vc.opening_region_guards.is_empty() {
129            for region_id in region_ids {
130                // Registers the opening region.
131                let guard = ctx
132                    .opening_region_keeper
133                    .register_with_role(candidate.id, *region_id, RegionRole::Follower)
134                    .context(error::RegionOperatingRaceSnafu {
135                        peer_id: candidate.id,
136                        region_id: *region_id,
137                    })?;
138                vc.opening_region_guards.push(guard);
139            }
140        }
141
142        let tracing_ctx = TracingContext::from_current_span();
143        let msg = MailboxMessage::json_message(
144            &format!("Open candidate regions: {:?}", region_ids),
145            &format!("Metasrv@{}", ctx.server_addr()),
146            &format!("Datanode-{}@{}", candidate.id, candidate.addr),
147            common_time::util::current_time_millis(),
148            &open_instruction,
149            Some(tracing_ctx.to_w3c()),
150        )
151        .with_context(|_| error::SerializeToJsonSnafu {
152            input: open_instruction.to_string(),
153        })?;
154
155        let operation_timeout =
156            ctx.next_operation_timeout()
157                .context(error::ExceededDeadlineSnafu {
158                    operation: "Open candidate region",
159                })?;
160        let operation_timeout = operation_timeout
161            .div(2)
162            .max(default_distributed_time_constants().region_lease);
163        let ch = Channel::Datanode(candidate.id);
164        let now = Instant::now();
165        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
166
167        match receiver.await {
168            Ok(msg) => {
169                let reply = HeartbeatMailbox::json_reply(&msg)?;
170                info!(
171                    "Received open region reply: {:?}, region: {:?}, elapsed: {:?}",
172                    reply,
173                    region_ids,
174                    now.elapsed()
175                );
176                let InstructionReply::OpenRegions(SimpleReply { result, error }) = reply else {
177                    return error::UnexpectedInstructionReplySnafu {
178                        mailbox_message: msg.to_string(),
179                        reason: "expect open region reply",
180                    }
181                    .fail();
182                };
183
184                if result {
185                    Ok(())
186                } else {
187                    error::RetryLaterSnafu {
188                        reason: format!(
189                            "Region {region_ids:?} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}",
190                            candidate,
191                            now.elapsed()
192                        ),
193                    }
194                    .fail()
195                }
196            }
197            Err(error::Error::MailboxTimeout { .. }) => {
198                let reason = format!(
199                    "Mailbox received timeout for open candidate region {region_ids:?} on datanode {:?}, elapsed: {:?}",
200                    candidate,
201                    now.elapsed()
202                );
203                error::RetryLaterSnafu { reason }.fail()
204            }
205            Err(e) => Err(e),
206        }
207    }
208}
209
210#[cfg(test)]
211mod tests {
212    use std::assert_matches;
213    use std::collections::HashMap;
214
215    use common_catalog::consts::MITO2_ENGINE;
216    use common_meta::DatanodeId;
217    use common_meta::key::table_route::TableRouteValue;
218    use common_meta::key::test_utils::new_test_table_info;
219    use common_meta::peer::Peer;
220    use common_meta::rpc::router::{Region, RegionRoute};
221    use store_api::storage::RegionId;
222
223    use super::*;
224    use crate::error::Error;
225    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
226    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
227    use crate::procedure::test_util::{
228        new_close_region_reply, new_open_region_reply, send_mock_reply,
229    };
230
231    fn new_persistent_context() -> PersistentContext {
232        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
233    }
234
235    fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
236        Instruction::OpenRegions(vec![OpenRegion {
237            region_ident: RegionIdent {
238                datanode_id,
239                table_id: region_id.table_id(),
240                region_number: region_id.region_number(),
241                engine: MITO2_ENGINE.to_string(),
242            },
243            region_storage_path: "/bar/foo/region/".to_string(),
244            region_options: Default::default(),
245            region_wal_options: Default::default(),
246            skip_wal_replay: true,
247        }])
248    }
249
250    #[tokio::test]
251    async fn test_datanode_table_is_not_found_error() {
252        let state = OpenCandidateRegion;
253        let persistent_context = new_persistent_context();
254        let env = TestingEnv::new();
255        let mut ctx = env.context_factory().new_context(persistent_context);
256
257        let err = state
258            .build_open_region_instruction(&mut ctx)
259            .await
260            .unwrap_err();
261
262        assert_matches!(err, Error::DatanodeTableNotFound { .. });
263        assert!(!err.is_retryable());
264    }
265
266    #[tokio::test]
267    async fn test_datanode_is_unreachable() {
268        let state = OpenCandidateRegion;
269        // from_peer: 1
270        // to_peer: 2
271        let persistent_context = new_persistent_context();
272        let region_id = persistent_context.region_ids[0];
273        let to_peer_id = persistent_context.to_peer.id;
274        let env = TestingEnv::new();
275        let mut ctx = env.context_factory().new_context(persistent_context);
276
277        let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
278        let err = state
279            .open_candidate_region(&mut ctx, open_instruction)
280            .await
281            .unwrap_err();
282
283        assert_matches!(err, Error::PusherNotFound { .. });
284        assert!(!err.is_retryable());
285    }
286
287    #[tokio::test]
288    async fn test_candidate_region_opening_error() {
289        let state = OpenCandidateRegion;
290        // from_peer: 1
291        // to_peer: 2
292        let persistent_context = new_persistent_context();
293        let region_id = persistent_context.region_ids[0];
294        let to_peer_id = persistent_context.to_peer.id;
295
296        let env = TestingEnv::new();
297        let mut ctx = env.context_factory().new_context(persistent_context);
298        let opening_region_keeper = env.opening_region_keeper();
299        let _guard = opening_region_keeper
300            .register_with_role(to_peer_id, region_id, RegionRole::Follower)
301            .unwrap();
302
303        let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
304        let err = state
305            .open_candidate_region(&mut ctx, open_instruction)
306            .await
307            .unwrap_err();
308
309        assert_matches!(err, Error::RegionOperatingRace { .. });
310        assert!(!err.is_retryable());
311    }
312
313    #[tokio::test]
314    async fn test_unexpected_instruction_reply() {
315        let state = OpenCandidateRegion;
316        // from_peer: 1
317        // to_peer: 2
318        let persistent_context = new_persistent_context();
319        let region_id = persistent_context.region_ids[0];
320        let to_peer_id = persistent_context.to_peer.id;
321
322        let mut env = TestingEnv::new();
323        let mut ctx = env.context_factory().new_context(persistent_context);
324        let mailbox_ctx = env.mailbox_context();
325        let mailbox = mailbox_ctx.mailbox().clone();
326
327        let (tx, rx) = tokio::sync::mpsc::channel(1);
328
329        mailbox_ctx
330            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
331            .await;
332
333        // Sends an incorrect reply.
334        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
335
336        let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
337        let err = state
338            .open_candidate_region(&mut ctx, open_instruction)
339            .await
340            .unwrap_err();
341
342        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
343        assert!(!err.is_retryable());
344    }
345
346    #[tokio::test]
347    async fn test_instruction_exceeded_deadline() {
348        let state = OpenCandidateRegion;
349        // from_peer: 1
350        // to_peer: 2
351        let persistent_context = new_persistent_context();
352        let region_id = persistent_context.region_ids[0];
353        let to_peer_id = persistent_context.to_peer.id;
354
355        let mut env = TestingEnv::new();
356        let mut ctx = env.context_factory().new_context(persistent_context);
357        let mailbox_ctx = env.mailbox_context();
358        let mailbox = mailbox_ctx.mailbox().clone();
359
360        let (tx, rx) = tokio::sync::mpsc::channel(1);
361
362        mailbox_ctx
363            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
364            .await;
365
366        // Sends an timeout error.
367        send_mock_reply(mailbox, rx, |id| {
368            Err(error::MailboxTimeoutSnafu { id }.build())
369        });
370
371        let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
372        let err = state
373            .open_candidate_region(&mut ctx, open_instruction)
374            .await
375            .unwrap_err();
376
377        assert_matches!(err, Error::RetryLater { .. });
378        assert!(err.is_retryable());
379    }
380
381    #[tokio::test]
382    async fn test_open_candidate_region_failed() {
383        let state = OpenCandidateRegion;
384        // from_peer: 1
385        // to_peer: 2
386        let persistent_context = new_persistent_context();
387        let region_id = persistent_context.region_ids[0];
388        let to_peer_id = persistent_context.to_peer.id;
389        let mut env = TestingEnv::new();
390
391        let mut ctx = env.context_factory().new_context(persistent_context);
392        let mailbox_ctx = env.mailbox_context();
393        let mailbox = mailbox_ctx.mailbox().clone();
394
395        let (tx, rx) = tokio::sync::mpsc::channel(1);
396
397        mailbox_ctx
398            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
399            .await;
400
401        send_mock_reply(mailbox, rx, |id| {
402            Ok(new_open_region_reply(
403                id,
404                false,
405                Some("test mocked".to_string()),
406            ))
407        });
408
409        let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
410        let err = state
411            .open_candidate_region(&mut ctx, open_instruction)
412            .await
413            .unwrap_err();
414
415        assert_matches!(err, Error::RetryLater { .. });
416        assert!(err.is_retryable());
417        assert!(format!("{err:?}").contains("test mocked"));
418    }
419
420    #[tokio::test]
421    async fn test_next_flush_leader_region_state() {
422        let mut state = Box::new(OpenCandidateRegion);
423        // from_peer: 1
424        // to_peer: 2
425        let persistent_context = new_persistent_context();
426        let from_peer_id = persistent_context.from_peer.id;
427        let region_id = persistent_context.region_ids[0];
428        let to_peer_id = persistent_context.to_peer.id;
429        let mut env = TestingEnv::new();
430
431        // Prepares table
432        let table_info = new_test_table_info(1024);
433        let region_routes = vec![RegionRoute {
434            region: Region::new_test(region_id),
435            leader_peer: Some(Peer::empty(from_peer_id)),
436            ..Default::default()
437        }];
438
439        env.table_metadata_manager()
440            .create_table_metadata(
441                table_info,
442                TableRouteValue::physical(region_routes),
443                HashMap::default(),
444            )
445            .await
446            .unwrap();
447
448        let mut ctx = env.context_factory().new_context(persistent_context);
449        let mailbox_ctx = env.mailbox_context();
450        let mailbox = mailbox_ctx.mailbox().clone();
451
452        let (tx, rx) = tokio::sync::mpsc::channel(1);
453
454        mailbox_ctx
455            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
456            .await;
457
458        send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, true, None)));
459        let procedure_ctx = new_procedure_context();
460        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
461        let vc = ctx.volatile_ctx;
462        assert_eq!(vc.opening_region_guards[0].info(), (to_peer_id, region_id));
463
464        let flush_leader_region = next.as_any().downcast_ref::<PreFlushRegion>().unwrap();
465        assert_matches!(flush_leader_region, PreFlushRegion);
466    }
467}