meta_srv/procedure/region_migration/
open_candidate_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::any::Any;
16use std::ops::Div;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::RegionIdent;
20use common_meta::distributed_time_constants::default_distributed_time_constants;
21use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
22use common_meta::key::datanode_table::RegionInfo;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::info;
25use common_telemetry::tracing_context::TracingContext;
26use serde::{Deserialize, Serialize};
27use snafu::{OptionExt, ResultExt};
28use tokio::time::Instant;
29
30use crate::error::{self, Result};
31use crate::handler::HeartbeatMailbox;
32use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
33use crate::procedure::region_migration::{Context, State};
34use crate::service::mailbox::Channel;
35
36#[derive(Debug, Serialize, Deserialize)]
37pub struct OpenCandidateRegion;
38
39#[async_trait::async_trait]
40#[typetag::serde]
41impl State for OpenCandidateRegion {
42    async fn next(
43        &mut self,
44        ctx: &mut Context,
45        _procedure_ctx: &ProcedureContext,
46    ) -> Result<(Box<dyn State>, Status)> {
47        let instruction = self.build_open_region_instruction(ctx).await?;
48        let now = Instant::now();
49        self.open_candidate_region(ctx, instruction).await?;
50        ctx.update_open_candidate_region_elapsed(now);
51
52        Ok((Box::new(PreFlushRegion), Status::executing(false)))
53    }
54
55    fn as_any(&self) -> &dyn Any {
56        self
57    }
58}
59
60impl OpenCandidateRegion {
61    /// Builds open region instructions
62    ///
63    /// Abort(non-retry):
64    /// - Datanode Table is not found.
65    async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
66        let region_ids = ctx.persistent_ctx.region_ids.clone();
67        let from_peer_id = ctx.persistent_ctx.from_peer.id;
68        let to_peer_id = ctx.persistent_ctx.to_peer.id;
69        let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
70        let mut open_regions = Vec::with_capacity(region_ids.len());
71
72        for region_id in region_ids {
73            let table_id = region_id.table_id();
74            let region_number = region_id.region_number();
75            let datanode_table_value = datanode_table_values.get(&table_id).context(
76                error::DatanodeTableNotFoundSnafu {
77                    table_id,
78                    datanode_id: from_peer_id,
79                },
80            )?;
81            let RegionInfo {
82                region_storage_path,
83                region_options,
84                region_wal_options,
85                engine,
86            } = datanode_table_value.region_info.clone();
87
88            open_regions.push(OpenRegion::new(
89                RegionIdent {
90                    datanode_id: to_peer_id,
91                    table_id,
92                    region_number,
93                    engine,
94                },
95                &region_storage_path,
96                region_options,
97                region_wal_options,
98                true,
99            ));
100        }
101
102        Ok(Instruction::OpenRegions(open_regions))
103    }
104
105    /// Opens the candidate region.
106    ///
107    /// Abort(non-retry):
108    /// - The Datanode is unreachable(e.g., Candidate pusher is not found).
109    /// - Unexpected instruction reply.
110    /// - Another procedure is opening the candidate region.
111    ///
112    /// Retry:
113    /// - Exceeded deadline of open instruction.
114    /// - Datanode failed to open the candidate region.
115    async fn open_candidate_region(
116        &self,
117        ctx: &mut Context,
118        open_instruction: Instruction,
119    ) -> Result<()> {
120        let pc = &ctx.persistent_ctx;
121        let vc = &mut ctx.volatile_ctx;
122        let region_ids = &pc.region_ids;
123        let candidate = &pc.to_peer;
124
125        // This method might be invoked multiple times.
126        // Only registers the guard if `opening_region_guard` is absent.
127        if vc.opening_region_guards.is_empty() {
128            for region_id in region_ids {
129                // Registers the opening region.
130                let guard = ctx
131                    .opening_region_keeper
132                    .register(candidate.id, *region_id)
133                    .context(error::RegionOperatingRaceSnafu {
134                        peer_id: candidate.id,
135                        region_id: *region_id,
136                    })?;
137                vc.opening_region_guards.push(guard);
138            }
139        }
140
141        let tracing_ctx = TracingContext::from_current_span();
142        let msg = MailboxMessage::json_message(
143            &format!("Open candidate regions: {:?}", region_ids),
144            &format!("Metasrv@{}", ctx.server_addr()),
145            &format!("Datanode-{}@{}", candidate.id, candidate.addr),
146            common_time::util::current_time_millis(),
147            &open_instruction,
148            Some(tracing_ctx.to_w3c()),
149        )
150        .with_context(|_| error::SerializeToJsonSnafu {
151            input: open_instruction.to_string(),
152        })?;
153
154        let operation_timeout =
155            ctx.next_operation_timeout()
156                .context(error::ExceededDeadlineSnafu {
157                    operation: "Open candidate region",
158                })?;
159        let operation_timeout = operation_timeout
160            .div(2)
161            .max(default_distributed_time_constants().region_lease);
162        let ch = Channel::Datanode(candidate.id);
163        let now = Instant::now();
164        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
165
166        match receiver.await {
167            Ok(msg) => {
168                let reply = HeartbeatMailbox::json_reply(&msg)?;
169                info!(
170                    "Received open region reply: {:?}, region: {:?}, elapsed: {:?}",
171                    reply,
172                    region_ids,
173                    now.elapsed()
174                );
175                let InstructionReply::OpenRegions(SimpleReply { result, error }) = reply else {
176                    return error::UnexpectedInstructionReplySnafu {
177                        mailbox_message: msg.to_string(),
178                        reason: "expect open region reply",
179                    }
180                    .fail();
181                };
182
183                if result {
184                    Ok(())
185                } else {
186                    error::RetryLaterSnafu {
187                        reason: format!(
188                            "Region {region_ids:?} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}",
189                            candidate,
190                            now.elapsed()
191                        ),
192                    }
193                    .fail()
194                }
195            }
196            Err(error::Error::MailboxTimeout { .. }) => {
197                let reason = format!(
198                    "Mailbox received timeout for open candidate region {region_ids:?} on datanode {:?}, elapsed: {:?}",
199                    candidate,
200                    now.elapsed()
201                );
202                error::RetryLaterSnafu { reason }.fail()
203            }
204            Err(e) => Err(e),
205        }
206    }
207}
208
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::DatanodeId;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_open_region_reply, send_mock_reply,
    };

    /// Persistent context shared by all tests: region `(1024, 1)`, built with the
    /// arguments `1` and `2` (used below as `from_peer` and `to_peer`).
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    /// Hand-built open instruction holding a single region for the given datanode.
    fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
        let region_ident = RegionIdent {
            datanode_id,
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            engine: MITO2_ENGINE.to_string(),
        };
        let open_region = OpenRegion {
            region_ident,
            region_storage_path: "/bar/foo/region/".to_string(),
            region_options: Default::default(),
            region_wal_options: Default::default(),
            skip_wal_replay: true,
        };

        Instruction::OpenRegions(vec![open_region])
    }

    #[tokio::test]
    async fn test_datanode_table_is_not_found_error() {
        let step = OpenCandidateRegion;
        let persistent = new_persistent_context();
        // No table metadata is prepared in this environment.
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent);

        let failure = step
            .build_open_region_instruction(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::DatanodeTableNotFound { .. });
        assert!(!failure.is_retryable());
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let step = OpenCandidateRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent = new_persistent_context();
        let target_peer = persistent.to_peer.id;
        let region = persistent.region_ids[0];
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent);

        // No heartbeat response receiver is registered for the target datanode.
        let instruction = new_mock_open_instruction(target_peer, region);
        let failure = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::PusherNotFound { .. });
        assert!(!failure.is_retryable());
    }

    #[tokio::test]
    async fn test_candidate_region_opening_error() {
        let step = OpenCandidateRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent = new_persistent_context();
        let target_peer = persistent.to_peer.id;
        let region = persistent.region_ids[0];

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent);

        // Occupy the opening slot up front so the state under test loses the race.
        let keeper = env.opening_region_keeper();
        let _held_guard = keeper.register(target_peer, region).unwrap();

        let instruction = new_mock_open_instruction(target_peer, region);
        let failure = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::RegionOperatingRace { .. });
        assert!(!failure.is_retryable());
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let step = OpenCandidateRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent = new_persistent_context();
        let target_peer = persistent.to_peer.id;
        let region = persistent.region_ids[0];

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent);
        let mailbox_context = env.mailbox_context();
        let mailbox = mailbox_context.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);

        mailbox_context
            .insert_heartbeat_response_receiver(Channel::Datanode(target_peer), reply_tx)
            .await;

        // Replies with a close-region reply, which is the wrong kind for an open instruction.
        send_mock_reply(mailbox, reply_rx, |id| Ok(new_close_region_reply(id)));

        let instruction = new_mock_open_instruction(target_peer, region);
        let failure = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::UnexpectedInstructionReply { .. });
        assert!(!failure.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let step = OpenCandidateRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent = new_persistent_context();
        let target_peer = persistent.to_peer.id;
        let region = persistent.region_ids[0];

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent);
        let mailbox_context = env.mailbox_context();
        let mailbox = mailbox_context.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);

        mailbox_context
            .insert_heartbeat_response_receiver(Channel::Datanode(target_peer), reply_tx)
            .await;

        // Sends a timeout error.
        send_mock_reply(mailbox, reply_rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let instruction = new_mock_open_instruction(target_peer, region);
        let failure = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::RetryLater { .. });
        assert!(failure.is_retryable());
    }

    #[tokio::test]
    async fn test_open_candidate_region_failed() {
        let step = OpenCandidateRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent = new_persistent_context();
        let target_peer = persistent.to_peer.id;
        let region = persistent.region_ids[0];
        let mut env = TestingEnv::new();

        let mut ctx = env.context_factory().new_context(persistent);
        let mailbox_context = env.mailbox_context();
        let mailbox = mailbox_context.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);

        mailbox_context
            .insert_heartbeat_response_receiver(Channel::Datanode(target_peer), reply_tx)
            .await;

        // The datanode replies, but reports that the open failed.
        send_mock_reply(mailbox, reply_rx, |id| {
            Ok(new_open_region_reply(
                id,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let instruction = new_mock_open_instruction(target_peer, region);
        let failure = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::RetryLater { .. });
        assert!(failure.is_retryable());
        // The datanode's error text must be carried into the retry reason.
        assert!(format!("{failure:?}").contains("test mocked"));
    }

    #[tokio::test]
    async fn test_next_flush_leader_region_state() {
        let mut step = Box::new(OpenCandidateRegion);
        // from_peer: 1
        // to_peer: 2
        let persistent = new_persistent_context();
        let source_peer = persistent.from_peer.id;
        let target_peer = persistent.to_peer.id;
        let region = persistent.region_ids[0];
        let mut env = TestingEnv::new();

        // Prepares the table metadata with the source peer as the region leader.
        let table_info = new_test_table_info(1024);
        let leader_route = RegionRoute {
            region: Region::new_test(region),
            leader_peer: Some(Peer::empty(source_peer)),
            ..Default::default()
        };

        env.table_metadata_manager()
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(vec![leader_route]),
                HashMap::default(),
            )
            .await
            .unwrap();

        let mut ctx = env.context_factory().new_context(persistent);
        let mailbox_context = env.mailbox_context();
        let mailbox = mailbox_context.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);

        mailbox_context
            .insert_heartbeat_response_receiver(Channel::Datanode(target_peer), reply_tx)
            .await;

        // The datanode reports a successful open.
        send_mock_reply(mailbox, reply_rx, |id| {
            Ok(new_open_region_reply(id, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next_state, _) = step.next(&mut ctx, &procedure_ctx).await.unwrap();

        // The guard for the candidate region must have been registered.
        let guards = &ctx.volatile_ctx.opening_region_guards;
        assert_eq!(guards[0].info(), (target_peer, region));

        // The follow-up state must be `PreFlushRegion`.
        let flush_leader_region = next_state
            .as_any()
            .downcast_ref::<PreFlushRegion>()
            .unwrap();
        assert_matches!(flush_leader_region, PreFlushRegion);
    }
}