1use std::time::Duration;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::instruction::{
19 FlushErrorStrategy, FlushRegions, Instruction, InstructionError, InstructionReply,
20};
21use common_meta::peer::Peer;
22use common_telemetry::tracing_context::TracingContext;
23use common_telemetry::{info, warn};
24use snafu::ResultExt;
25use store_api::region_request::RegionFlushReason;
26use store_api::storage::RegionId;
27use tokio::time::Instant;
28
29use crate::error::{self, Error, Result};
30use crate::handler::HeartbeatMailbox;
31use crate::service::mailbox::{Channel, MailboxRef};
32
/// How [`flush_region`] reacts when the datanode cannot be reached or reports
/// a flush failure.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ErrorStrategy {
    /// Log a warning and report success to the caller.
    Ignore,
    /// Surface the failure as an error so the caller can retry (or abort when
    /// the datanode reports a non-retryable failure).
    Retry,
}
37
/// Per-region flush failure messages, split by whether retrying may help.
#[derive(Debug, Default)]
struct FlushRegionErrors {
    /// Messages for failures flagged as retryable.
    retryable: Vec<String>,
    /// Messages for failures flagged as non-retryable.
    non_retryable: Vec<String>,
}

impl FlushRegionErrors {
    /// Returns `true` when no failure of either kind has been recorded.
    fn is_empty(&self) -> bool {
        self.retryable.is_empty() && self.non_retryable.is_empty()
    }

    /// Joins every recorded message with `"; "`, retryable ones first.
    fn format_all(&self) -> String {
        // Join borrowed slices: there is no need to clone each message just
        // to build the combined string.
        self.retryable
            .iter()
            .chain(&self.non_retryable)
            .map(String::as_str)
            .collect::<Vec<_>>()
            .join("; ")
    }

    /// Joins only the non-retryable messages with `"; "`.
    fn format_non_retryable(&self) -> String {
        self.non_retryable.join("; ")
    }
}
62
63pub(crate) fn instruction_to_error(error: &InstructionError, reason: String) -> Error {
64 if error.retry_hint.is_retryable() {
65 error::RetryLaterSnafu { reason }.build()
66 } else {
67 error::UnexpectedSnafu { violated: reason }.build()
68 }
69}
70
71pub(crate) fn instruction_error_result<T>(error: &InstructionError, reason: String) -> Result<T> {
72 Err(instruction_to_error(error, reason))
73}
74
75fn handle_flush_region_reply(
76 reply: &InstructionReply,
77 region_ids: &[RegionId],
78 msg: &MailboxMessage,
79) -> Result<(bool, Option<FlushRegionErrors>)> {
80 let result = match reply {
81 InstructionReply::FlushRegions(flush_reply) => {
82 if flush_reply.results.len() != region_ids.len() {
83 return error::UnexpectedInstructionReplySnafu {
84 mailbox_message: msg.to_string(),
85 reason: format!(
86 "expect {} region flush result, but got {}",
87 region_ids.len(),
88 flush_reply.results.len()
89 ),
90 }
91 .fail();
92 }
93
94 match flush_reply.overall_success {
95 true => (true, None),
96 false => {
97 let mut errors = FlushRegionErrors::default();
98 for (region_id, result) in &flush_reply.results {
99 let Err(error) = result else {
100 continue;
101 };
102 let message = format!("{}: {:?}", region_id, error);
103 if error.retry_hint.is_retryable() {
104 errors.retryable.push(message);
105 } else {
106 errors.non_retryable.push(message);
107 }
108 }
109 (false, (!errors.is_empty()).then_some(errors))
110 }
111 }
112 }
113 _ => {
114 return error::UnexpectedInstructionReplySnafu {
115 mailbox_message: msg.to_string(),
116 reason: "expect flush region reply",
117 }
118 .fail();
119 }
120 };
121
122 Ok(result)
123}
124
125pub(crate) async fn flush_region(
137 mailbox: &MailboxRef,
138 server_addr: &str,
139 region_ids: &[RegionId],
140 datanode: &Peer,
141 timeout: Duration,
142 error_strategy: ErrorStrategy,
143 reason: Option<RegionFlushReason>,
144) -> Result<()> {
145 let mut flush_regions =
146 FlushRegions::sync_batch(region_ids.to_vec(), FlushErrorStrategy::TryAll);
147 flush_regions.reason = reason;
148 let flush_instruction = Instruction::FlushRegions(flush_regions);
149
150 let tracing_ctx = TracingContext::from_current_span();
151 let msg = MailboxMessage::json_message(
152 &format!("Flush regions: {:?}", region_ids),
153 &format!("Metasrv@{}", server_addr),
154 &format!("Datanode-{}@{}", datanode.id, datanode.addr),
155 common_time::util::current_time_millis(),
156 &flush_instruction,
157 Some(tracing_ctx.to_w3c()),
158 )
159 .with_context(|_| error::SerializeToJsonSnafu {
160 input: flush_instruction.to_string(),
161 })?;
162
163 let ch = Channel::Datanode(datanode.id);
164 let now = Instant::now();
165 let receiver = mailbox.send(&ch, msg, timeout).await;
166 let receiver = match receiver {
167 Ok(receiver) => receiver,
168 Err(error::Error::PusherNotFound { .. }) => match error_strategy {
169 ErrorStrategy::Ignore => {
170 warn!(
171 "Failed to flush regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
172 region_ids, datanode
173 );
174 return Ok(());
175 }
176 ErrorStrategy::Retry => error::RetryLaterSnafu {
177 reason: format!(
178 "Pusher not found for flush regions on datanode {:?}, elapsed: {:?}",
179 datanode,
180 now.elapsed()
181 ),
182 }
183 .fail()?,
184 },
185 Err(err) => {
186 return Err(err);
187 }
188 };
189
190 match receiver.await {
191 Ok(msg) => {
192 let reply = HeartbeatMailbox::json_reply(&msg)?;
193 info!(
194 "Received flush region reply: {:?}, regions: {:?}, elapsed: {:?}",
195 reply,
196 region_ids,
197 now.elapsed()
198 );
199 let (result, error) = handle_flush_region_reply(&reply, region_ids, &msg)?;
200 if let Some(error) = error {
201 match error_strategy {
202 ErrorStrategy::Ignore => {
203 warn!(
204 "Failed to flush regions {:?}, the datanode({}) error is ignored: {}",
205 region_ids,
206 datanode,
207 error.format_all()
208 );
209 }
210 ErrorStrategy::Retry => {
211 if !error.non_retryable.is_empty() {
212 return error::UnexpectedSnafu {
213 violated: format!(
214 "Failed to flush regions {:?}, the datanode({}) reported non-retryable errors: {}",
215 region_ids,
216 datanode,
217 error.format_non_retryable(),
218 ),
219 }
220 .fail();
221 }
222 return error::RetryLaterSnafu {
223 reason: format!(
224 "Failed to flush regions {:?}, the datanode({}) error is retried: {}",
225 region_ids,
226 datanode,
227 error.format_all(),
228 ),
229 }
230 .fail()?;
231 }
232 }
233 } else if result {
234 info!(
235 "The flush regions {:?} on datanode {:?} is successful, elapsed: {:?}",
236 region_ids,
237 datanode,
238 now.elapsed()
239 );
240 }
241
242 Ok(())
243 }
244 Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
245 operation: "Flush regions",
246 }
247 .fail(),
248 Err(error::Error::MailboxChannelClosed { .. }) => match error_strategy {
249 ErrorStrategy::Ignore => {
250 warn!(
251 "Failed to flush regions({:?}), the datanode({}) is unreachable(MailboxChannelClosed). Skip flush operation.",
252 region_ids, datanode
253 );
254 Ok(())
255 }
256 ErrorStrategy::Retry => error::RetryLaterSnafu {
257 reason: format!(
258 "Mailbox closed when sending flush region to datanode {:?}, elapsed: {:?}",
259 datanode,
260 now.elapsed()
261 ),
262 }
263 .fail()?,
264 },
265 Err(err) => Err(err),
266 }
267}
268
#[cfg(any(test, feature = "mock"))]
pub mod mock {
    use std::io::Error;
    use std::sync::Arc;

    use api::v1::region::region_server::RegionServer;
    use api::v1::region::{RegionResponse, region_request};
    use api::v1::{ResponseHeader, Status as PbStatus};
    use async_trait::async_trait;
    use client::Client;
    use common_grpc::channel_manager::ChannelManager;
    use common_meta::peer::Peer;
    use common_runtime::runtime::BuilderBuild;
    use common_runtime::{Builder as RuntimeBuilder, Runtime};
    use hyper_util::rt::TokioIo;
    use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
    use tokio::sync::mpsc;
    use tonic::codec::CompressionEncoding;
    use tonic::transport::Server;
    use tower::service_fn;

    /// A mock region server: every request body it receives is forwarded to
    /// an mpsc channel and answered with a zero-status, zero-row response.
    #[derive(Clone)]
    pub struct EchoRegionServer {
        runtime: Runtime,
        received_requests: mpsc::Sender<region_request::Body>,
    }

    impl EchoRegionServer {
        /// Creates the server together with the receiving end of the channel
        /// (capacity 10) on which handled request bodies are delivered.
        ///
        /// Panics if the two-worker runtime cannot be built.
        pub fn new() -> (Self, mpsc::Receiver<region_request::Body>) {
            let (sender, receiver) = mpsc::channel(10);
            let runtime = RuntimeBuilder::default().worker_threads(2).build().unwrap();
            let server = Self {
                runtime,
                received_requests: sender,
            };
            (server, receiver)
        }

        /// Spawns a gRPC region service backed by a clone of this server on
        /// one end of an in-memory duplex pipe, and returns a [`Client`] for
        /// `datanode.addr` whose connector hands out the other end.
        ///
        /// The connector owns a single pipe end; it panics if it is asked to
        /// connect a second time.
        pub fn new_client(&self, datanode: &Peer) -> Client {
            let (client_io, server_io) = tokio::io::duplex(1024);

            let handler =
                RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone());

            tokio::spawn(async move {
                let service = RegionServer::new(handler)
                    .accept_compressed(CompressionEncoding::Gzip)
                    .accept_compressed(CompressionEncoding::Zstd)
                    .send_compressed(CompressionEncoding::Gzip)
                    .send_compressed(CompressionEncoding::Zstd);
                let incoming = futures::stream::iter(vec![Ok::<_, Error>(server_io)]);
                Server::builder()
                    .add_service(service)
                    .serve_with_incoming(incoming)
                    .await
            });

            let channel_manager = ChannelManager::new();
            let mut pending_io = Some(client_io);
            channel_manager
                .reset_with_connector(
                    datanode.addr.clone(),
                    service_fn(move |_| {
                        let io = pending_io.take().unwrap();
                        async move { Ok::<_, Error>(TokioIo::new(io)) }
                    }),
                )
                .unwrap();

            let urls = vec![datanode.addr.clone()];
            Client::with_manager_and_urls(channel_manager, urls)
        }
    }

    #[async_trait]
    impl RegionServerHandler for EchoRegionServer {
        /// Forwards `request` to the channel returned by
        /// [`EchoRegionServer::new`] and replies with status code 0.
        ///
        /// Panics if sending on the channel fails.
        async fn handle(
            &self,
            request: region_request::Body,
        ) -> servers::error::Result<RegionResponse> {
            self.received_requests.send(request).await.unwrap();

            let status = PbStatus {
                status_code: 0,
                err_msg: String::default(),
            };
            let header = ResponseHeader {
                status: Some(status),
            };
            Ok(RegionResponse {
                header: Some(header),
                affected_rows: 0,
                extensions: Default::default(),
                metadata: Vec::new(),
            })
        }
    }
}
365
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::instruction::{FlushStrategy, Instruction};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::sequence::SequenceBuilder;

    use super::*;
    use crate::procedure::test_util::{MailboxContext, new_flush_region_reply_for_region};

    /// The instruction delivered to the datanode must carry the region id,
    /// the sync strategy and the flush reason handed to `flush_region`.
    #[tokio::test]
    async fn test_flush_region_payload_includes_reason() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let sequence = SequenceBuilder::new("test_flush_region_reason", kv_backend).build();
        let mut mailbox_ctx = MailboxContext::new(sequence);

        let datanode = Peer::new(1, "127.0.0.1:4001");
        let region_id = RegionId::new(1024, 1);
        let (pusher_tx, mut pusher_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(datanode.id), pusher_tx)
            .await;

        let mailbox = mailbox_ctx.mailbox().clone();
        let reply_mailbox = mailbox.clone();

        // Plays the datanode: inspect the pushed instruction, then answer it
        // with a successful flush reply.
        let datanode_task = tokio::spawn(async move {
            let response = pusher_rx.recv().await.unwrap().unwrap();
            let msg = response.mailbox_message.unwrap();
            let Instruction::FlushRegions(flush_regions) =
                HeartbeatMailbox::json_instruction(&msg).unwrap()
            else {
                panic!("Expected FlushRegions instruction");
            };

            assert_eq!(flush_regions.region_ids, vec![region_id]);
            assert_eq!(flush_regions.strategy, FlushStrategy::Sync);
            assert_eq!(flush_regions.reason, Some(RegionFlushReason::Repartition));

            let reply = new_flush_region_reply_for_region(msg.id, region_id, true, None);
            reply_mailbox.on_recv(msg.id, Ok(reply)).await.unwrap();
        });

        let outcome = flush_region(
            &mailbox,
            "127.0.0.1:3002",
            &[region_id],
            &datanode,
            Duration::from_secs(5),
            ErrorStrategy::Retry,
            Some(RegionFlushReason::Repartition),
        )
        .await;
        outcome.unwrap();
        datanode_task.await.unwrap();
    }
}
429
#[cfg(test)]
pub mod test_data {
    use std::sync::Arc;

    use chrono::DateTime;
    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::ddl::flow_meta::FlowMetadataAllocator;
    use common_meta::ddl::table_meta::TableMetadataAllocator;
    use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use common_meta::key::TableMetadataManager;
    use common_meta::key::flow::FlowMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::node_manager::NodeManagerRef;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::region_registry::LeaderRegionRegistry;
    use common_meta::rpc::router::RegionRoute;
    use common_meta::sequence::SequenceBuilder;
    use common_meta::wal_provider::WalProvider;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use table::metadata::{TableIdent, TableInfo, TableMeta, TableType};
    use table::requests::TableOptions;

    use crate::cache_invalidator::MetasrvCacheInvalidator;
    use crate::handler::{HeartbeatMailbox, Pushers};
    use crate::metasrv::MetasrvInfo;
    use crate::test_util::new_region_route;

    /// Builds three region routes (regions 1, 2, 3) over the same three peers.
    ///
    /// NOTE(review): the last argument of `new_region_route` is presumably the
    /// leader peer id (3, 2, 1 respectively); confirm against its definition.
    pub fn new_region_routes() -> Vec<RegionRoute> {
        let peers = vec![
            Peer::new(1, "127.0.0.1:4001"),
            Peer::new(2, "127.0.0.1:4002"),
            Peer::new(3, "127.0.0.1:4003"),
        ];
        [(1, 3), (2, 2), (3, 1)]
            .into_iter()
            .map(|(region, third_arg)| new_region_route(region, &peers, third_arg))
            .collect()
    }

    /// Builds a fixed `TableInfo` for `my_catalog.my_schema.my_table`
    /// (table id 42, version 1) with one timestamp, two string and one int32
    /// column, using the mito2 engine.
    pub fn new_table_info() -> TableInfo {
        let columns = vec![
            ColumnSchema::new(
                "ts".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            ColumnSchema::new(
                "my_tag1".to_string(),
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "my_tag2".to_string(),
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "my_field_column".to_string(),
                ConcreteDataType::int32_datatype(),
                true,
            ),
        ];

        // NOTE(review): `value_indices` lists column 2, which is also a
        // primary-key column, and `next_column_id` is 3 although four columns
        // are declared. Kept as-is because callers may depend on these exact
        // fixture values; confirm whether this is intentional.
        let meta = TableMeta {
            schema: Arc::new(Schema::new(columns)),
            primary_key_indices: vec![1, 2],
            value_indices: vec![2],
            engine: MITO2_ENGINE.to_string(),
            next_column_id: 3,
            options: TableOptions::default(),
            created_on: DateTime::default(),
            updated_on: DateTime::default(),
            partition_key_indices: vec![],
            column_ids: vec![],
        };

        TableInfo {
            ident: TableIdent {
                table_id: 42,
                version: 1,
            },
            name: "my_table".to_string(),
            desc: Some("blabla".to_string()),
            catalog_name: "my_catalog".to_string(),
            schema_name: "my_schema".to_string(),
            meta,
            table_type: TableType::Base,
        }
    }

    /// Builds a `DdlContext` whose metadata managers, allocators and
    /// heartbeat mailbox all share one fresh in-memory kv backend, around the
    /// given `node_manager`.
    pub(crate) fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
        let kv_backend = Arc::new(MemoryKvBackend::new());

        let mailbox = HeartbeatMailbox::create(
            Pushers::default(),
            SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build(),
        );
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));

        let table_sequence = SequenceBuilder::new("test", kv_backend.clone()).build();
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            Arc::new(table_sequence),
            Arc::new(WalProvider::default()),
        ));

        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        // Last use of the backend handle, so it is moved rather than cloned.
        let flow_sequence = SequenceBuilder::new("test", kv_backend).build();
        let flow_metadata_allocator = Arc::new(
            FlowMetadataAllocator::with_noop_peer_allocator(Arc::new(flow_sequence)),
        );

        let metasrv_info = MetasrvInfo {
            server_addr: "127.0.0.1:4321".to_string(),
        };
        let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(mailbox, metasrv_info));

        DdlContext {
            node_manager,
            cache_invalidator,
            table_metadata_manager,
            table_metadata_allocator,
            flow_metadata_manager,
            flow_metadata_allocator,
            memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        }
    }
}