datanode/heartbeat/handler/
flush_region.rs1use common_meta::instruction::{FlushRegions, InstructionReply, SimpleReply};
16use common_telemetry::warn;
17use futures_util::future::BoxFuture;
18use store_api::region_request::{RegionFlushRequest, RegionRequest};
19use store_api::storage::RegionId;
20
21use crate::error;
22use crate::heartbeat::handler::HandlerContext;
23
24impl HandlerContext {
25 pub(crate) fn handle_flush_regions_instruction(
26 self,
27 flush_regions: FlushRegions,
28 ) -> BoxFuture<'static, Option<InstructionReply>> {
29 Box::pin(async move {
30 for region_id in flush_regions.region_ids {
31 let request = RegionRequest::Flush(RegionFlushRequest {
32 row_group_size: None,
33 });
34 let result = self.region_server.handle_request(region_id, request).await;
35
36 match result {
37 Ok(_) => {}
38 Err(error::Error::RegionNotFound { .. }) => {
39 warn!("Received a flush region instruction from meta, but target region: {region_id} is not found.");
40 }
41 Err(err) => {
42 warn!(
43 "Failed to flush region: {region_id}, error: {err}",
44 region_id = region_id,
45 err = err,
46 );
47 }
48 }
49 }
50 None
51 })
52 }
53
54 pub(crate) fn handle_flush_region_instruction(
55 self,
56 region_id: RegionId,
57 ) -> BoxFuture<'static, Option<InstructionReply>> {
58 Box::pin(async move {
59 let Some(writable) = self.region_server.is_region_leader(region_id) else {
60 return Some(InstructionReply::FlushRegion(SimpleReply {
61 result: false,
62 error: Some("Region is not leader".to_string()),
63 }));
64 };
65
66 if !writable {
67 return Some(InstructionReply::FlushRegion(SimpleReply {
68 result: false,
69 error: Some("Region is not writable".to_string()),
70 }));
71 }
72
73 let region_server_moved = self.region_server.clone();
74 let register_result = self
75 .flush_tasks
76 .try_register(
77 region_id,
78 Box::pin(async move {
79 let request = RegionRequest::Flush(RegionFlushRequest {
80 row_group_size: None,
81 });
82 region_server_moved
83 .handle_request(region_id, request)
84 .await?;
85 Ok(())
86 }),
87 )
88 .await;
89 if register_result.is_busy() {
90 warn!("Another flush task is running for the region: {region_id}");
91 }
92 let mut watcher = register_result.into_watcher();
93 let result = self.flush_tasks.wait_until_finish(&mut watcher).await;
94 match result {
95 Ok(()) => Some(InstructionReply::FlushRegion(SimpleReply {
96 result: true,
97 error: None,
98 })),
99 Err(err) => Some(InstructionReply::FlushRegion(SimpleReply {
100 result: false,
101 error: Some(format!("{err:?}")),
102 })),
103 }
104 })
105 }
106}
107
#[cfg(test)]
mod tests {
    use std::sync::{Arc, RwLock};

    use common_meta::instruction::FlushRegions;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::storage::RegionId;

    use super::*;
    use crate::tests::{mock_region_server, MockRegionEngine};

    /// Exercises `handle_flush_regions_instruction` against a mock region server:
    /// registered regions must each receive a request in order, unregistered regions must
    /// receive none, and no reply is produced in either case.
    #[tokio::test]
    async fn test_handle_flush_region_instruction() {
        // Records, in call order, the id of every region whose mock engine received a request.
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::<Vec<_>>();
        for region_id in &region_ids {
            let flushed_region_ids_ref = flushed_region_ids.clone();
            let (mock_engine, _) =
                MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                    region_engine.handle_request_mock_fn =
                        Some(Box::new(move |region_id, _request| {
                            flushed_region_ids_ref.write().unwrap().push(region_id);
                            Ok(0)
                        }))
                });
            mock_region_server.register_test_region(*region_id, mock_engine);
        }
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        // Registered regions: every id is seen by its engine, in instruction order.
        let reply = handler_context
            .clone()
            .handle_flush_regions_instruction(FlushRegions {
                region_ids: region_ids.clone(),
            })
            .await;
        assert!(reply.is_none());
        assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);

        // Unregistered regions: the instruction still yields no reply and no engine is invoked.
        flushed_region_ids.write().unwrap().clear();
        let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
        let reply = handler_context
            .handle_flush_regions_instruction(FlushRegions {
                region_ids: not_found_region_ids.clone(),
            })
            .await;
        assert!(reply.is_none());
        assert!(flushed_region_ids.read().unwrap().is_empty());
    }
}