1use std::time::Duration;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
19use common_meta::peer::Peer;
20use common_telemetry::{info, warn};
21use snafu::ResultExt;
22use store_api::storage::RegionId;
23use tokio::time::Instant;
24
25use crate::error::{self, Error, Result};
26use crate::handler::HeartbeatMailbox;
27use crate::service::mailbox::{Channel, MailboxRef};
28
/// Controls how [`flush_region`] reacts when the target datanode cannot be
/// reached or reports that some regions failed to flush.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ErrorStrategy {
    /// Log a warning and treat the flush as done (`Ok(())`).
    Ignore,
    /// Fail with a `RetryLater` error so the caller can retry the operation.
    Retry,
}
33
34fn handle_flush_region_reply(
35 reply: &InstructionReply,
36 region_ids: &[RegionId],
37 msg: &MailboxMessage,
38) -> Result<(bool, Option<String>)> {
39 let result = match reply {
40 InstructionReply::FlushRegions(flush_reply) => {
41 if flush_reply.results.len() != region_ids.len() {
42 return error::UnexpectedInstructionReplySnafu {
43 mailbox_message: msg.to_string(),
44 reason: format!(
45 "expect {} region flush result, but got {}",
46 region_ids.len(),
47 flush_reply.results.len()
48 ),
49 }
50 .fail();
51 }
52
53 match flush_reply.overall_success {
54 true => (true, None),
55 false => (
56 false,
57 Some(
58 flush_reply
59 .results
60 .iter()
61 .filter_map(|(region_id, result)| match result {
62 Ok(_) => None,
63 Err(e) => Some(format!("{}: {:?}", region_id, e)),
64 })
65 .collect::<Vec<String>>()
66 .join("; "),
67 ),
68 ),
69 }
70 }
71 _ => {
72 return error::UnexpectedInstructionReplySnafu {
73 mailbox_message: msg.to_string(),
74 reason: "expect flush region reply",
75 }
76 .fail();
77 }
78 };
79
80 Ok(result)
81}
82
83pub(crate) async fn flush_region(
95 mailbox: &MailboxRef,
96 server_addr: &str,
97 region_ids: &[RegionId],
98 datanode: &Peer,
99 timeout: Duration,
100 error_strategy: ErrorStrategy,
101) -> Result<()> {
102 let flush_instruction = Instruction::FlushRegions(FlushRegions::sync_batch(
103 region_ids.to_vec(),
104 FlushErrorStrategy::TryAll,
105 ));
106
107 let msg = MailboxMessage::json_message(
108 &format!("Flush regions: {:?}", region_ids),
109 &format!("Metasrv@{}", server_addr),
110 &format!("Datanode-{}@{}", datanode.id, datanode.addr),
111 common_time::util::current_time_millis(),
112 &flush_instruction,
113 )
114 .with_context(|_| error::SerializeToJsonSnafu {
115 input: flush_instruction.to_string(),
116 })?;
117
118 let ch = Channel::Datanode(datanode.id);
119 let now = Instant::now();
120 let receiver = mailbox.send(&ch, msg, timeout).await;
121 let receiver = match receiver {
122 Ok(receiver) => receiver,
123 Err(error::Error::PusherNotFound { .. }) => match error_strategy {
124 ErrorStrategy::Ignore => {
125 warn!(
126 "Failed to flush regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
127 region_ids, datanode
128 );
129 return Ok(());
130 }
131 ErrorStrategy::Retry => error::RetryLaterSnafu {
132 reason: format!(
133 "Pusher not found for flush regions on datanode {:?}, elapsed: {:?}",
134 datanode,
135 now.elapsed()
136 ),
137 }
138 .fail()?,
139 },
140 Err(err) => {
141 return Err(err);
142 }
143 };
144
145 match receiver.await {
146 Ok(msg) => {
147 let reply = HeartbeatMailbox::json_reply(&msg)?;
148 info!(
149 "Received flush region reply: {:?}, regions: {:?}, elapsed: {:?}",
150 reply,
151 region_ids,
152 now.elapsed()
153 );
154 let (result, error) = handle_flush_region_reply(&reply, region_ids, &msg)?;
155 if let Some(error) = error {
156 match error_strategy {
157 ErrorStrategy::Ignore => {
158 warn!(
159 "Failed to flush regions {:?}, the datanode({}) error is ignored: {}",
160 region_ids, datanode, error
161 );
162 }
163 ErrorStrategy::Retry => {
164 return error::RetryLaterSnafu {
165 reason: format!(
166 "Failed to flush regions {:?}, the datanode({}) error is retried: {}",
167 region_ids,
168 datanode,
169 error,
170 ),
171 }
172 .fail()?;
173 }
174 }
175 } else if result {
176 info!(
177 "The flush regions {:?} on datanode {:?} is successful, elapsed: {:?}",
178 region_ids,
179 datanode,
180 now.elapsed()
181 );
182 }
183
184 Ok(())
185 }
186 Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
187 operation: "Flush regions",
188 }
189 .fail(),
190 Err(err) => Err(err),
191 }
192}
193
/// In-process mock of a datanode region server, compiled for tests and for the
/// `mock` feature, used to observe the region requests a client sends.
#[cfg(any(test, feature = "mock"))]
pub mod mock {
    use std::io::Error;
    use std::sync::Arc;

    use api::v1::region::region_server::RegionServer;
    use api::v1::region::{RegionResponse, region_request};
    use api::v1::{ResponseHeader, Status as PbStatus};
    use async_trait::async_trait;
    use client::Client;
    use common_grpc::channel_manager::ChannelManager;
    use common_meta::peer::Peer;
    use common_runtime::runtime::BuilderBuild;
    use common_runtime::{Builder as RuntimeBuilder, Runtime};
    use hyper_util::rt::TokioIo;
    use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
    use tokio::sync::mpsc;
    use tonic::codec::CompressionEncoding;
    use tonic::transport::Server;
    use tower::service_fn;

    /// A mock region server that forwards every request body it handles to an
    /// mpsc channel and answers each one with a successful, empty response.
    #[derive(Clone)]
    pub struct EchoRegionServer {
        // Runtime handed to `RegionServerRequestHandler` in `new_client`.
        runtime: Runtime,
        // Sender half of the channel returned by `new`; every handled request
        // body is pushed here.
        received_requests: mpsc::Sender<region_request::Body>,
    }

    impl EchoRegionServer {
        /// Creates the mock server together with the receiver on which it
        /// publishes every request body it handles (channel capacity 10).
        ///
        /// # Panics
        /// Panics if the 2-worker-thread runtime cannot be built.
        pub fn new() -> (Self, mpsc::Receiver<region_request::Body>) {
            let (tx, rx) = mpsc::channel(10);
            (
                Self {
                    runtime: RuntimeBuilder::default().worker_threads(2).build().unwrap(),
                    received_requests: tx,
                },
                rx,
            )
        }

        /// Builds a [`Client`] for `datanode.addr` that talks to this mock over
        /// an in-memory `tokio::io::duplex` pipe instead of a real socket.
        ///
        /// A gRPC region server (accepting and sending gzip/zstd) is started
        /// with `tokio::spawn`, so this must be called from within a tokio
        /// runtime.
        ///
        /// # Panics
        /// The connector hands out the single client half of the pipe via
        /// `Option::take`; a second connection attempt on the returned client
        /// panics. Also panics if the connector cannot be registered.
        pub fn new_client(&self, datanode: &Peer) -> Client {
            let (client, server) = tokio::io::duplex(1024);

            let handler =
                RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone());

            // Serve exactly one incoming "connection": the server half of the pipe.
            tokio::spawn(async move {
                Server::builder()
                    .add_service(
                        RegionServer::new(handler)
                            .accept_compressed(CompressionEncoding::Gzip)
                            .accept_compressed(CompressionEncoding::Zstd)
                            .send_compressed(CompressionEncoding::Gzip)
                            .send_compressed(CompressionEncoding::Zstd),
                    )
                    .serve_with_incoming(futures::stream::iter(vec![Ok::<_, Error>(server)]))
                    .await
            });

            let channel_manager = ChannelManager::new();
            // Wrapped in `Option` so the `FnMut` connector can move the pipe out once.
            let mut client = Some(client);
            channel_manager
                .reset_with_connector(
                    datanode.addr.clone(),
                    service_fn(move |_| {
                        let client = client.take().unwrap();
                        async move { Ok::<_, Error>(TokioIo::new(client)) }
                    }),
                )
                .unwrap();
            Client::with_manager_and_urls(channel_manager, vec![datanode.addr.clone()])
        }
    }

    #[async_trait]
    impl RegionServerHandler for EchoRegionServer {
        /// Forwards `request` to the receiver returned by `new`, then replies
        /// with status code 0, an empty error message and zero affected rows.
        ///
        /// # Panics
        /// Panics if that receiver has been dropped (the channel send fails).
        async fn handle(
            &self,
            request: region_request::Body,
        ) -> servers::error::Result<RegionResponse> {
            self.received_requests.send(request).await.unwrap();

            Ok(RegionResponse {
                header: Some(ResponseHeader {
                    status: Some(PbStatus {
                        status_code: 0,
                        err_msg: String::default(),
                    }),
                }),
                affected_rows: 0,
                extensions: Default::default(),
                metadata: Vec::new(),
            })
        }
    }
}
290
/// Fixtures shared by procedure tests: sample region routes, a sample table
/// definition and an in-memory `DdlContext`.
#[cfg(test)]
pub mod test_data {
    use std::sync::Arc;

    use chrono::DateTime;
    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::ddl::flow_meta::FlowMetadataAllocator;
    use common_meta::ddl::table_meta::TableMetadataAllocator;
    use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use common_meta::key::TableMetadataManager;
    use common_meta::key::flow::FlowMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::node_manager::NodeManagerRef;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::region_registry::LeaderRegionRegistry;
    use common_meta::rpc::router::RegionRoute;
    use common_meta::sequence::SequenceBuilder;
    use common_meta::wal_provider::WalProvider;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, RawSchema};
    use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
    use table::requests::TableOptions;

    use crate::cache_invalidator::MetasrvCacheInvalidator;
    use crate::handler::{HeartbeatMailbox, Pushers};
    use crate::metasrv::MetasrvInfo;
    use crate::test_util::new_region_route;

    /// Builds three region routes over peers 1-3 (`127.0.0.1:4001`-`4003`)
    /// using `crate::test_util::new_region_route`.
    /// NOTE(review): the meaning of its numeric arguments (presumably region id
    /// and leader peer id) is defined by that helper — confirm there.
    pub fn new_region_routes() -> Vec<RegionRoute> {
        let peers = vec![
            Peer::new(1, "127.0.0.1:4001"),
            Peer::new(2, "127.0.0.1:4002"),
            Peer::new(3, "127.0.0.1:4003"),
        ];
        vec![
            new_region_route(1, &peers, 3),
            new_region_route(2, &peers, 2),
            new_region_route(3, &peers, 1),
        ]
    }

    /// Returns the raw metadata of table `my_catalog.my_schema.my_table`
    /// (id 42, version 1, mito2 engine). Its columns are `ts` (non-null
    /// millisecond timestamp, time index), `my_tag1` and `my_tag2` (strings,
    /// primary key), and `my_field_column` (int32).
    pub fn new_table_info() -> RawTableInfo {
        RawTableInfo {
            ident: TableIdent {
                table_id: 42,
                version: 1,
            },
            name: "my_table".to_string(),
            desc: Some("blabla".to_string()),
            catalog_name: "my_catalog".to_string(),
            schema_name: "my_schema".to_string(),
            meta: RawTableMeta {
                schema: RawSchema {
                    column_schemas: vec![
                        ColumnSchema::new(
                            "ts".to_string(),
                            ConcreteDataType::timestamp_millisecond_datatype(),
                            false,
                        ),
                        ColumnSchema::new(
                            "my_tag1".to_string(),
                            ConcreteDataType::string_datatype(),
                            true,
                        ),
                        ColumnSchema::new(
                            "my_tag2".to_string(),
                            ConcreteDataType::string_datatype(),
                            true,
                        ),
                        ColumnSchema::new(
                            "my_field_column".to_string(),
                            ConcreteDataType::int32_datatype(),
                            true,
                        ),
                    ],
                    timestamp_index: Some(0),
                    version: 0,
                },
                primary_key_indices: vec![1, 2],
                // NOTE(review): index 2 is `my_tag2`, a primary-key column; the
                // field column is index 3. Presumably `vec![3]` was intended —
                // confirm no test depends on this value before changing it.
                value_indices: vec![2],
                engine: MITO2_ENGINE.to_string(),
                // NOTE(review): the schema declares 4 columns, yet the next
                // column id is 3 — verify whether 4 was intended.
                next_column_id: 3,
                options: TableOptions::default(),
                created_on: DateTime::default(),
                updated_on: DateTime::default(),
                partition_key_indices: vec![],
                column_ids: vec![],
            },
            table_type: TableType::Base,
        }
    }

    /// Builds a `DdlContext` backed by a fresh `MemoryKvBackend`, using
    /// `node_manager` for datanode access. The cache invalidator is wired to a
    /// heartbeat mailbox created from `Pushers::default()`, and region failure
    /// detection is a no-op.
    pub(crate) fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
        let kv_backend = Arc::new(MemoryKvBackend::new());

        let mailbox_sequence =
            SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
        let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence);
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        // NOTE(review): the table and flow allocators both build a sequence
        // named "test" on the same backend, so they presumably share one id
        // counter — confirm this is intended.
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
            Arc::new(WalProvider::default()),
        ));
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            Arc::new(SequenceBuilder::new("test", kv_backend).build()),
        ));
        DdlContext {
            node_manager,
            cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(
                mailbox,
                MetasrvInfo {
                    server_addr: "127.0.0.1:4321".to_string(),
                },
            )),
            table_metadata_manager,
            table_metadata_allocator,
            flow_metadata_manager,
            flow_metadata_allocator,
            memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        }
    }
}