meta_srv/procedure/region_migration/
flush_leader_region.rs1use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, Status};
18use serde::{Deserialize, Serialize};
19use snafu::OptionExt;
20use tokio::time::Instant;
21
22use crate::error::{self, Result};
23use crate::procedure::region_migration::update_metadata::UpdateMetadata;
24use crate::procedure::region_migration::{Context, State};
25use crate::procedure::utils;
26
27#[derive(Debug, Serialize, Deserialize)]
31pub struct PreFlushRegion;
32
33#[async_trait::async_trait]
34#[typetag::serde]
35impl State for PreFlushRegion {
36 async fn next(
37 &mut self,
38 ctx: &mut Context,
39 _procedure_ctx: &ProcedureContext,
40 ) -> Result<(Box<dyn State>, Status)> {
41 let timer = Instant::now();
42 self.flush_region(ctx).await?;
43 ctx.update_flush_leader_region_elapsed(timer);
44 Ok((
49 Box::new(UpdateMetadata::Downgrade),
50 Status::executing(false),
51 ))
52 }
53
54 fn as_any(&self) -> &dyn Any {
55 self
56 }
57}
58
59impl PreFlushRegion {
60 async fn flush_region(&self, ctx: &mut Context) -> Result<()> {
74 let operation_timeout =
75 ctx.next_operation_timeout()
76 .context(error::ExceededDeadlineSnafu {
77 operation: "Flush leader region",
78 })?;
79 let region_ids = &ctx.persistent_ctx.region_ids;
80 let leader = &ctx.persistent_ctx.from_peer;
81
82 utils::flush_region(
83 &ctx.mailbox,
84 &ctx.server_addr,
85 region_ids,
86 leader,
87 operation_timeout,
88 utils::ErrorStrategy::Ignore,
89 )
90 .await
91 }
92}
93
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_flush_region_reply_for_region, send_mock_reply,
    };
    use crate::service::mailbox::Channel;

    /// Builds the persistent context shared by every test in this module,
    /// using the arguments `1`, `2` and `RegionId::new(1024, 1)`.
    fn new_persistent_context() -> PersistentContext {
        let region_id = RegionId::new(1024, 1);
        test_util::new_persistent_context(1, 2, region_id)
    }

    /// With no heartbeat response receiver registered for the leader,
    /// `flush_region` still returns `Ok`.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());

        let state = PreFlushRegion;
        state.flush_region(&mut ctx).await.unwrap();
    }

    /// A close-region reply to the flush request is reported as a
    /// non-retryable `UnexpectedInstructionReply` error.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        common_telemetry::init_default_ut_logging();
        let persistent_ctx = new_persistent_context();
        // Read the leader id before the context is moved into `new_context`.
        let leader_id = persistent_ctx.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(leader_id), reply_tx)
            .await;
        send_mock_reply(mailbox, reply_rx, |id| Ok(new_close_region_reply(id)));

        let state = PreFlushRegion;
        let err = state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    /// A mailbox timeout reply is reported as a non-retryable
    /// `ExceededDeadline` error.
    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let persistent_ctx = new_persistent_context();
        // Read the leader id before the context is moved into `new_context`.
        let leader_id = persistent_ctx.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(leader_id), reply_tx)
            .await;
        send_mock_reply(mailbox, reply_rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let state = PreFlushRegion;
        let err = state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    /// A flush reply built with `false` and an error message (presumably a
    /// failed flush — confirm against `new_flush_region_reply_for_region`)
    /// does not make `flush_region` fail.
    #[tokio::test]
    async fn test_flush_region_failed() {
        common_telemetry::init_default_ut_logging();
        let persistent_ctx = new_persistent_context();
        // Read these before the context is moved into `new_context`.
        let leader_id = persistent_ctx.from_peer.id;
        let region_id = persistent_ctx.region_ids[0];

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(leader_id), reply_tx)
            .await;
        send_mock_reply(mailbox, reply_rx, move |id| {
            Ok(new_flush_region_reply_for_region(
                id,
                region_id,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let state = PreFlushRegion;
        state.flush_region(&mut ctx).await.unwrap();
    }

    /// After a flush reply built with `true` and no error, `next` yields
    /// `UpdateMetadata::Downgrade` as the following state.
    #[tokio::test]
    async fn test_next_update_metadata_downgrade_state() {
        common_telemetry::init_default_ut_logging();
        let persistent_ctx = new_persistent_context();
        // Read these before the context is moved into `new_context`.
        let leader_id = persistent_ctx.from_peer.id;
        let region_id = persistent_ctx.region_ids[0];

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(leader_id), reply_tx)
            .await;
        send_mock_reply(mailbox, reply_rx, move |id| {
            Ok(new_flush_region_reply_for_region(id, region_id, true, None))
        });

        let mut state = PreFlushRegion;
        let procedure_ctx = new_procedure_context();
        let (next_state, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let downgrade = next_state
            .as_any()
            .downcast_ref::<UpdateMetadata>()
            .unwrap();
        assert_matches!(downgrade, UpdateMetadata::Downgrade);
    }
}