meta_srv/procedure/region_migration/
flush_leader_region.rs1use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, Status};
18use serde::{Deserialize, Serialize};
19use snafu::OptionExt;
20use store_api::region_request::RegionFlushReason;
21use tokio::time::Instant;
22
23use crate::error::{self, Result};
24use crate::procedure::region_migration::update_metadata::UpdateMetadata;
25use crate::procedure::region_migration::{Context, State};
26use crate::procedure::utils;
27
28#[derive(Debug, Serialize, Deserialize)]
32pub struct PreFlushRegion;
33
34#[async_trait::async_trait]
35#[typetag::serde]
36impl State for PreFlushRegion {
37 async fn next(
38 &mut self,
39 ctx: &mut Context,
40 _procedure_ctx: &ProcedureContext,
41 ) -> Result<(Box<dyn State>, Status)> {
42 let timer = Instant::now();
43 self.flush_region(ctx).await?;
44 ctx.update_flush_leader_region_elapsed(timer);
45 Ok((
50 Box::new(UpdateMetadata::Downgrade),
51 Status::executing(false),
52 ))
53 }
54
55 fn as_any(&self) -> &dyn Any {
56 self
57 }
58}
59
60impl PreFlushRegion {
61 async fn flush_region(&self, ctx: &mut Context) -> Result<()> {
75 let operation_timeout =
76 ctx.next_operation_timeout()
77 .context(error::ExceededDeadlineSnafu {
78 operation: "Flush leader region",
79 })?;
80 let region_ids = &ctx.persistent_ctx.region_ids;
81 let leader = &ctx.persistent_ctx.from_peer;
82
83 utils::flush_region(
84 &ctx.mailbox,
85 &ctx.server_addr,
86 region_ids,
87 leader,
88 operation_timeout,
89 utils::ErrorStrategy::Ignore,
90 Some(RegionFlushReason::RegionMigration),
91 )
92 .await
93 }
94}
95
96#[cfg(test)]
97mod tests {
98 use std::assert_matches;
99
100 use store_api::storage::RegionId;
101
102 use super::*;
103 use crate::error::Error;
104 use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
105 use crate::procedure::region_migration::{ContextFactory, PersistentContext};
106 use crate::procedure::test_util::{
107 new_close_region_reply, new_flush_region_reply_for_region, send_mock_reply,
108 };
109 use crate::service::mailbox::Channel;
110
111 fn new_persistent_context() -> PersistentContext {
112 test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
113 }
114
115 #[tokio::test]
116 async fn test_datanode_is_unreachable() {
117 let state = PreFlushRegion;
118 let persistent_context = new_persistent_context();
121 let env = TestingEnv::new();
122 let mut ctx = env.context_factory().new_context(persistent_context);
123 state.flush_region(&mut ctx).await.unwrap();
125 }
126
127 #[tokio::test]
128 async fn test_unexpected_instruction_reply() {
129 common_telemetry::init_default_ut_logging();
130 let state = PreFlushRegion;
131 let persistent_context = new_persistent_context();
134 let from_peer_id = persistent_context.from_peer.id;
135 let mut env = TestingEnv::new();
136 let mut ctx = env.context_factory().new_context(persistent_context);
137 let mailbox_ctx = env.mailbox_context();
138 let mailbox = mailbox_ctx.mailbox().clone();
139 let (tx, rx) = tokio::sync::mpsc::channel(1);
140 mailbox_ctx
141 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
142 .await;
143 send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
145 let err = state.flush_region(&mut ctx).await.unwrap_err();
146 assert_matches!(err, Error::UnexpectedInstructionReply { .. });
147 assert!(!err.is_retryable());
148 }
149
150 #[tokio::test]
151 async fn test_instruction_exceeded_deadline() {
152 let state = PreFlushRegion;
153 let persistent_context = new_persistent_context();
156 let from_peer_id = persistent_context.from_peer.id;
157 let mut env = TestingEnv::new();
158 let mut ctx = env.context_factory().new_context(persistent_context);
159 let mailbox_ctx = env.mailbox_context();
160 let mailbox = mailbox_ctx.mailbox().clone();
161 let (tx, rx) = tokio::sync::mpsc::channel(1);
162 mailbox_ctx
163 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
164 .await;
165 send_mock_reply(mailbox, rx, |id| {
167 Err(error::MailboxTimeoutSnafu { id }.build())
168 });
169
170 let err = state.flush_region(&mut ctx).await.unwrap_err();
171 assert_matches!(err, Error::ExceededDeadline { .. });
172 assert!(!err.is_retryable());
173 }
174
175 #[tokio::test]
176 async fn test_flush_region_failed() {
177 common_telemetry::init_default_ut_logging();
178 let state = PreFlushRegion;
179 let persistent_context = new_persistent_context();
182 let from_peer_id = persistent_context.from_peer.id;
183 let region_id = persistent_context.region_ids[0];
184 let mut env = TestingEnv::new();
185 let mut ctx = env.context_factory().new_context(persistent_context);
186 let mailbox_ctx = env.mailbox_context();
187 let mailbox = mailbox_ctx.mailbox().clone();
188 let (tx, rx) = tokio::sync::mpsc::channel(1);
189 mailbox_ctx
190 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
191 .await;
192 send_mock_reply(mailbox, rx, move |id| {
193 Ok(new_flush_region_reply_for_region(
194 id,
195 region_id,
196 false,
197 Some("test mocked".to_string()),
198 ))
199 });
200 state.flush_region(&mut ctx).await.unwrap();
202 }
203
204 #[tokio::test]
205 async fn test_next_update_metadata_downgrade_state() {
206 common_telemetry::init_default_ut_logging();
207 let mut state = PreFlushRegion;
208 let persistent_context = new_persistent_context();
211 let from_peer_id = persistent_context.from_peer.id;
212 let region_id = persistent_context.region_ids[0];
213 let mut env = TestingEnv::new();
214 let mut ctx = env.context_factory().new_context(persistent_context);
215 let mailbox_ctx = env.mailbox_context();
216 let mailbox = mailbox_ctx.mailbox().clone();
217 let (tx, rx) = tokio::sync::mpsc::channel(1);
218 mailbox_ctx
219 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
220 .await;
221 send_mock_reply(mailbox, rx, move |id| {
222 Ok(new_flush_region_reply_for_region(id, region_id, true, None))
223 });
224 let procedure_ctx = new_procedure_context();
225 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
226
227 let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
228 assert_matches!(update_metadata, UpdateMetadata::Downgrade);
229 }
230}