1use std::time::Duration;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
19use common_meta::peer::Peer;
20use common_telemetry::tracing_context::TracingContext;
21use common_telemetry::{info, warn};
22use snafu::ResultExt;
23use store_api::region_request::RegionFlushReason;
24use store_api::storage::RegionId;
25use tokio::time::Instant;
26
27use crate::error::{self, Error, Result};
28use crate::handler::HeartbeatMailbox;
29use crate::service::mailbox::{Channel, MailboxRef};
30
/// How `flush_region` in this module reacts when the target datanode is
/// unreachable (`PusherNotFound` / `MailboxChannelClosed`) or when its reply
/// reports failed regions.
///
/// A mailbox timeout is not governed by this strategy; `flush_region` always
/// turns it into an `ExceededDeadline` error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ErrorStrategy {
    /// Log a warning and report the flush as finished (`Ok(())`).
    Ignore,
    /// Return a `RetryLater` error so the caller can retry the flush.
    Retry,
}
35
36fn handle_flush_region_reply(
37 reply: &InstructionReply,
38 region_ids: &[RegionId],
39 msg: &MailboxMessage,
40) -> Result<(bool, Option<String>)> {
41 let result = match reply {
42 InstructionReply::FlushRegions(flush_reply) => {
43 if flush_reply.results.len() != region_ids.len() {
44 return error::UnexpectedInstructionReplySnafu {
45 mailbox_message: msg.to_string(),
46 reason: format!(
47 "expect {} region flush result, but got {}",
48 region_ids.len(),
49 flush_reply.results.len()
50 ),
51 }
52 .fail();
53 }
54
55 match flush_reply.overall_success {
56 true => (true, None),
57 false => (
58 false,
59 Some(
60 flush_reply
61 .results
62 .iter()
63 .filter_map(|(region_id, result)| match result {
64 Ok(_) => None,
65 Err(e) => Some(format!("{}: {:?}", region_id, e)),
66 })
67 .collect::<Vec<String>>()
68 .join("; "),
69 ),
70 ),
71 }
72 }
73 _ => {
74 return error::UnexpectedInstructionReplySnafu {
75 mailbox_message: msg.to_string(),
76 reason: "expect flush region reply",
77 }
78 .fail();
79 }
80 };
81
82 Ok(result)
83}
84
85pub(crate) async fn flush_region(
97 mailbox: &MailboxRef,
98 server_addr: &str,
99 region_ids: &[RegionId],
100 datanode: &Peer,
101 timeout: Duration,
102 error_strategy: ErrorStrategy,
103 reason: Option<RegionFlushReason>,
104) -> Result<()> {
105 let mut flush_regions =
106 FlushRegions::sync_batch(region_ids.to_vec(), FlushErrorStrategy::TryAll);
107 flush_regions.reason = reason;
108 let flush_instruction = Instruction::FlushRegions(flush_regions);
109
110 let tracing_ctx = TracingContext::from_current_span();
111 let msg = MailboxMessage::json_message(
112 &format!("Flush regions: {:?}", region_ids),
113 &format!("Metasrv@{}", server_addr),
114 &format!("Datanode-{}@{}", datanode.id, datanode.addr),
115 common_time::util::current_time_millis(),
116 &flush_instruction,
117 Some(tracing_ctx.to_w3c()),
118 )
119 .with_context(|_| error::SerializeToJsonSnafu {
120 input: flush_instruction.to_string(),
121 })?;
122
123 let ch = Channel::Datanode(datanode.id);
124 let now = Instant::now();
125 let receiver = mailbox.send(&ch, msg, timeout).await;
126 let receiver = match receiver {
127 Ok(receiver) => receiver,
128 Err(error::Error::PusherNotFound { .. }) => match error_strategy {
129 ErrorStrategy::Ignore => {
130 warn!(
131 "Failed to flush regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
132 region_ids, datanode
133 );
134 return Ok(());
135 }
136 ErrorStrategy::Retry => error::RetryLaterSnafu {
137 reason: format!(
138 "Pusher not found for flush regions on datanode {:?}, elapsed: {:?}",
139 datanode,
140 now.elapsed()
141 ),
142 }
143 .fail()?,
144 },
145 Err(err) => {
146 return Err(err);
147 }
148 };
149
150 match receiver.await {
151 Ok(msg) => {
152 let reply = HeartbeatMailbox::json_reply(&msg)?;
153 info!(
154 "Received flush region reply: {:?}, regions: {:?}, elapsed: {:?}",
155 reply,
156 region_ids,
157 now.elapsed()
158 );
159 let (result, error) = handle_flush_region_reply(&reply, region_ids, &msg)?;
160 if let Some(error) = error {
161 match error_strategy {
162 ErrorStrategy::Ignore => {
163 warn!(
164 "Failed to flush regions {:?}, the datanode({}) error is ignored: {}",
165 region_ids, datanode, error
166 );
167 }
168 ErrorStrategy::Retry => {
169 return error::RetryLaterSnafu {
170 reason: format!(
171 "Failed to flush regions {:?}, the datanode({}) error is retried: {}",
172 region_ids,
173 datanode,
174 error,
175 ),
176 }
177 .fail()?;
178 }
179 }
180 } else if result {
181 info!(
182 "The flush regions {:?} on datanode {:?} is successful, elapsed: {:?}",
183 region_ids,
184 datanode,
185 now.elapsed()
186 );
187 }
188
189 Ok(())
190 }
191 Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
192 operation: "Flush regions",
193 }
194 .fail(),
195 Err(error::Error::MailboxChannelClosed { .. }) => match error_strategy {
196 ErrorStrategy::Ignore => {
197 warn!(
198 "Failed to flush regions({:?}), the datanode({}) is unreachable(MailboxChannelClosed). Skip flush operation.",
199 region_ids, datanode
200 );
201 Ok(())
202 }
203 ErrorStrategy::Retry => error::RetryLaterSnafu {
204 reason: format!(
205 "Mailbox closed when sending flush region to datanode {:?}, elapsed: {:?}",
206 datanode,
207 now.elapsed()
208 ),
209 }
210 .fail()?,
211 },
212 Err(err) => Err(err),
213 }
214}
215
/// In-process mock of a datanode region server, for tests (or with the `mock`
/// feature) that need a real gRPC `Client` without opening a network socket.
#[cfg(any(test, feature = "mock"))]
pub mod mock {
    use std::io::Error;
    use std::sync::Arc;

    use api::v1::region::region_server::RegionServer;
    use api::v1::region::{RegionResponse, region_request};
    use api::v1::{ResponseHeader, Status as PbStatus};
    use async_trait::async_trait;
    use client::Client;
    use common_grpc::channel_manager::ChannelManager;
    use common_meta::peer::Peer;
    use common_runtime::runtime::BuilderBuild;
    use common_runtime::{Builder as RuntimeBuilder, Runtime};
    use hyper_util::rt::TokioIo;
    use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
    use tokio::sync::mpsc;
    use tonic::codec::CompressionEncoding;
    use tonic::transport::Server;
    use tower::service_fn;

    /// Region server stub that forwards every received request body to a
    /// channel and answers with a fixed response (see `handle`).
    #[derive(Clone)]
    pub struct EchoRegionServer {
        // Runtime handed to `RegionServerRequestHandler`; built with 2 worker threads.
        runtime: Runtime,
        // Sender half; the matching receiver is returned from `new`.
        received_requests: mpsc::Sender<region_request::Body>,
    }

    impl EchoRegionServer {
        /// Creates the stub together with the receiver that observes every
        /// request body passed to `handle`.
        ///
        /// The channel is bounded to 10 pending requests.
        ///
        /// # Panics
        ///
        /// Panics if the runtime cannot be built.
        pub fn new() -> (Self, mpsc::Receiver<region_request::Body>) {
            let (tx, rx) = mpsc::channel(10);
            (
                Self {
                    runtime: RuntimeBuilder::default().worker_threads(2).build().unwrap(),
                    received_requests: tx,
                },
                rx,
            )
        }

        /// Returns a `Client` for `datanode.addr` that talks to this stub over an
        /// in-memory duplex stream instead of a network connection.
        ///
        /// A gRPC `RegionServer` backed by a clone of `self` is spawned with
        /// `tokio::spawn`, so this must be called from within a Tokio runtime.
        ///
        /// # Panics
        ///
        /// Panics if `reset_with_connector` fails. The connector itself panics
        /// if the channel manager asks for a second connection, because the
        /// single client half of the duplex stream can be handed out only once.
        pub fn new_client(&self, datanode: &Peer) -> Client {
            // Two connected in-memory stream ends (max buffer size 1024 bytes):
            // `client` is given to the gRPC client, `server` to the tonic server.
            let (client, server) = tokio::io::duplex(1024);

            let handler =
                RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone());

            // Detached server task: its JoinHandle and serve result are dropped,
            // so server errors are not reported to the caller.
            tokio::spawn(async move {
                Server::builder()
                    .add_service(
                        RegionServer::new(handler)
                            .accept_compressed(CompressionEncoding::Gzip)
                            .accept_compressed(CompressionEncoding::Zstd)
                            .send_compressed(CompressionEncoding::Gzip)
                            .send_compressed(CompressionEncoding::Zstd),
                    )
                    // Incoming stream that yields exactly one connection: the server half.
                    .serve_with_incoming(futures::stream::iter(vec![Ok::<_, Error>(server)]))
                    .await
            });

            let channel_manager = ChannelManager::new();
            // Wrapped in Option so the FnMut connector can move the stream out once.
            let mut client = Some(client);
            channel_manager
                .reset_with_connector(
                    datanode.addr.clone(),
                    service_fn(move |_| {
                        // A second connection attempt finds `None` and panics here.
                        let client = client.take().unwrap();
                        async move { Ok::<_, Error>(TokioIo::new(client)) }
                    }),
                )
                .unwrap();
            Client::with_manager_and_urls(channel_manager, vec![datanode.addr.clone()])
        }
    }

    #[async_trait]
    impl RegionServerHandler for EchoRegionServer {
        /// Forwards `request` to the receiver returned by `EchoRegionServer::new`.
        /// It then replies with a fixed response: status_code 0, an empty
        /// err_msg, 0 affected rows, default extensions and empty metadata.
        ///
        /// Waits while the channel is full.
        ///
        /// # Panics
        ///
        /// Panics if the receiver has been dropped, because sending fails in that case.
        async fn handle(
            &self,
            request: region_request::Body,
        ) -> servers::error::Result<RegionResponse> {
            self.received_requests.send(request).await.unwrap();

            Ok(RegionResponse {
                header: Some(ResponseHeader {
                    status: Some(PbStatus {
                        // NOTE(review): 0 is presumably StatusCode::Success — confirm.
                        status_code: 0,
                        err_msg: String::default(),
                    }),
                }),
                affected_rows: 0,
                extensions: Default::default(),
                metadata: Vec::new(),
            })
        }
    }
}
312
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::instruction::{FlushStrategy, Instruction};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::sequence::SequenceBuilder;

    use super::*;
    use crate::procedure::test_util::{MailboxContext, new_flush_region_reply_for_region};

    /// Checks that `flush_region` sends a synchronous `FlushRegions`
    /// instruction carrying the caller's flush reason, and that a successful
    /// reply lets the call finish with `Ok(())`.
    #[tokio::test]
    async fn test_flush_region_payload_includes_reason() {
        let backend = Arc::new(MemoryKvBackend::new());
        let sequence = SequenceBuilder::new("test_flush_region_reason", backend).build();
        let mut ctx = MailboxContext::new(sequence);

        let peer = Peer::new(1, "127.0.0.1:4001");
        let target = RegionId::new(1024, 1);

        // Capture what the mailbox pushes to the datanode channel, so the
        // spawned task below can act as the datanode.
        let (pushed_tx, mut pushed_rx) = tokio::sync::mpsc::channel(1);
        ctx.insert_heartbeat_response_receiver(Channel::Datanode(peer.id), pushed_tx)
            .await;

        let mailbox = ctx.mailbox().clone();
        let datanode_side = {
            let mailbox = mailbox.clone();
            tokio::spawn(async move {
                let pushed = pushed_rx.recv().await.unwrap().unwrap();
                let request = pushed.mailbox_message.unwrap();
                let flush_regions = match HeartbeatMailbox::json_instruction(&request).unwrap() {
                    Instruction::FlushRegions(flush_regions) => flush_regions,
                    _ => panic!("Expected FlushRegions instruction"),
                };

                assert_eq!(flush_regions.region_ids, vec![target]);
                assert_eq!(flush_regions.strategy, FlushStrategy::Sync);
                assert_eq!(flush_regions.reason, Some(RegionFlushReason::Repartition));

                let reply = new_flush_region_reply_for_region(request.id, target, true, None);
                mailbox.on_recv(request.id, Ok(reply)).await.unwrap();
            })
        };

        let outcome = flush_region(
            &mailbox,
            "127.0.0.1:3002",
            &[target],
            &peer,
            Duration::from_secs(5),
            ErrorStrategy::Retry,
            Some(RegionFlushReason::Repartition),
        )
        .await;
        outcome.unwrap();
        datanode_side.await.unwrap();
    }
}
376
/// Shared fixtures (region routes, table metadata, DDL context) for metasrv unit tests.
#[cfg(test)]
pub mod test_data {
    use std::sync::Arc;

    use chrono::DateTime;
    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::ddl::flow_meta::FlowMetadataAllocator;
    use common_meta::ddl::table_meta::TableMetadataAllocator;
    use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use common_meta::key::TableMetadataManager;
    use common_meta::key::flow::FlowMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::node_manager::NodeManagerRef;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::region_registry::LeaderRegionRegistry;
    use common_meta::rpc::router::RegionRoute;
    use common_meta::sequence::SequenceBuilder;
    use common_meta::wal_provider::WalProvider;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use table::metadata::{TableIdent, TableInfo, TableMeta, TableType};
    use table::requests::TableOptions;

    use crate::cache_invalidator::MetasrvCacheInvalidator;
    use crate::handler::{HeartbeatMailbox, Pushers};
    use crate::metasrv::MetasrvInfo;
    use crate::test_util::new_region_route;

    /// Builds three region routes over peers 1, 2 and 3 (`127.0.0.1:4001`..`4003`).
    ///
    /// NOTE(review): the meaning of `new_region_route`'s first and last
    /// arguments (presumably region number and leader peer id) is defined in
    /// `crate::test_util`; confirm there.
    pub fn new_region_routes() -> Vec<RegionRoute> {
        let peers = vec![
            Peer::new(1, "127.0.0.1:4001"),
            Peer::new(2, "127.0.0.1:4002"),
            Peer::new(3, "127.0.0.1:4003"),
        ];
        vec![
            new_region_route(1, &peers, 3),
            new_region_route(2, &peers, 2),
            new_region_route(3, &peers, 1),
        ]
    }

    /// Returns base-table metadata for `my_catalog.my_schema.my_table`
    /// (table id 42, version 1) on the mito2 engine.
    ///
    /// The schema has four columns: `ts` (millisecond timestamp), `my_tag1`
    /// and `my_tag2` (strings), and `my_field_column` (int32).
    pub fn new_table_info() -> TableInfo {
        TableInfo {
            ident: TableIdent {
                table_id: 42,
                version: 1,
            },
            name: "my_table".to_string(),
            desc: Some("blabla".to_string()),
            catalog_name: "my_catalog".to_string(),
            schema_name: "my_schema".to_string(),
            meta: TableMeta {
                schema: Arc::new(Schema::new(vec![
                    ColumnSchema::new(
                        "ts".to_string(),
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    ),
                    ColumnSchema::new(
                        "my_tag1".to_string(),
                        ConcreteDataType::string_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "my_tag2".to_string(),
                        ConcreteDataType::string_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "my_field_column".to_string(),
                        ConcreteDataType::int32_datatype(),
                        true,
                    ),
                ])),
                // Indices into the schema above: my_tag1, my_tag2.
                primary_key_indices: vec![1, 2],
                // NOTE(review): index 2 is `my_tag2`, which is also a primary-key
                // column; the only non-key, non-time column is index 3
                // (`my_field_column`). Confirm this is intended.
                value_indices: vec![2],
                engine: MITO2_ENGINE.to_string(),
                // NOTE(review): four columns are declared, but next_column_id is 3;
                // confirm this is intended.
                next_column_id: 3,
                options: TableOptions::default(),
                created_on: DateTime::default(),
                updated_on: DateTime::default(),
                partition_key_indices: vec![],
                column_ids: vec![],
            },
            table_type: TableType::Base,
        }
    }

    /// Builds a `DdlContext` for tests on a fresh `MemoryKvBackend`, using the
    /// supplied `node_manager`.
    ///
    /// The heartbeat mailbox behind the cache invalidator is created with
    /// `Pushers::default()`. Presumably no datanode is reachable through it.
    pub(crate) fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
        let kv_backend = Arc::new(MemoryKvBackend::new());

        let mailbox_sequence =
            SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
        let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence);
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        // NOTE(review): this sequence and the flow allocator's sequence below both
        // use the name "test" on the same backend; confirm that sharing is intended.
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
            Arc::new(WalProvider::default()),
        ));
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            Arc::new(SequenceBuilder::new("test", kv_backend).build()),
        ));
        DdlContext {
            node_manager,
            cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(
                mailbox,
                MetasrvInfo {
                    server_addr: "127.0.0.1:4321".to_string(),
                },
            )),
            table_metadata_manager,
            table_metadata_allocator,
            flow_metadata_manager,
            flow_metadata_allocator,
            memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        }
    }
}