1use std::time::Duration;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
19use common_meta::peer::Peer;
20use common_telemetry::tracing_context::TracingContext;
21use common_telemetry::{info, warn};
22use snafu::ResultExt;
23use store_api::storage::RegionId;
24use tokio::time::Instant;
25
26use crate::error::{self, Error, Result};
27use crate::handler::HeartbeatMailbox;
28use crate::service::mailbox::{Channel, MailboxRef};
29
/// How [`flush_region`] reacts when the flush cannot be delivered to, or
/// fails on, the target datanode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ErrorStrategy {
    /// Log a warning and report success to the caller.
    Ignore,
    /// Surface the failure as a retry-later error.
    Retry,
}
34
35fn handle_flush_region_reply(
36 reply: &InstructionReply,
37 region_ids: &[RegionId],
38 msg: &MailboxMessage,
39) -> Result<(bool, Option<String>)> {
40 let result = match reply {
41 InstructionReply::FlushRegions(flush_reply) => {
42 if flush_reply.results.len() != region_ids.len() {
43 return error::UnexpectedInstructionReplySnafu {
44 mailbox_message: msg.to_string(),
45 reason: format!(
46 "expect {} region flush result, but got {}",
47 region_ids.len(),
48 flush_reply.results.len()
49 ),
50 }
51 .fail();
52 }
53
54 match flush_reply.overall_success {
55 true => (true, None),
56 false => (
57 false,
58 Some(
59 flush_reply
60 .results
61 .iter()
62 .filter_map(|(region_id, result)| match result {
63 Ok(_) => None,
64 Err(e) => Some(format!("{}: {:?}", region_id, e)),
65 })
66 .collect::<Vec<String>>()
67 .join("; "),
68 ),
69 ),
70 }
71 }
72 _ => {
73 return error::UnexpectedInstructionReplySnafu {
74 mailbox_message: msg.to_string(),
75 reason: "expect flush region reply",
76 }
77 .fail();
78 }
79 };
80
81 Ok(result)
82}
83
84pub(crate) async fn flush_region(
96 mailbox: &MailboxRef,
97 server_addr: &str,
98 region_ids: &[RegionId],
99 datanode: &Peer,
100 timeout: Duration,
101 error_strategy: ErrorStrategy,
102) -> Result<()> {
103 let flush_instruction = Instruction::FlushRegions(FlushRegions::sync_batch(
104 region_ids.to_vec(),
105 FlushErrorStrategy::TryAll,
106 ));
107
108 let tracing_ctx = TracingContext::from_current_span();
109 let msg = MailboxMessage::json_message(
110 &format!("Flush regions: {:?}", region_ids),
111 &format!("Metasrv@{}", server_addr),
112 &format!("Datanode-{}@{}", datanode.id, datanode.addr),
113 common_time::util::current_time_millis(),
114 &flush_instruction,
115 Some(tracing_ctx.to_w3c()),
116 )
117 .with_context(|_| error::SerializeToJsonSnafu {
118 input: flush_instruction.to_string(),
119 })?;
120
121 let ch = Channel::Datanode(datanode.id);
122 let now = Instant::now();
123 let receiver = mailbox.send(&ch, msg, timeout).await;
124 let receiver = match receiver {
125 Ok(receiver) => receiver,
126 Err(error::Error::PusherNotFound { .. }) => match error_strategy {
127 ErrorStrategy::Ignore => {
128 warn!(
129 "Failed to flush regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
130 region_ids, datanode
131 );
132 return Ok(());
133 }
134 ErrorStrategy::Retry => error::RetryLaterSnafu {
135 reason: format!(
136 "Pusher not found for flush regions on datanode {:?}, elapsed: {:?}",
137 datanode,
138 now.elapsed()
139 ),
140 }
141 .fail()?,
142 },
143 Err(err) => {
144 return Err(err);
145 }
146 };
147
148 match receiver.await {
149 Ok(msg) => {
150 let reply = HeartbeatMailbox::json_reply(&msg)?;
151 info!(
152 "Received flush region reply: {:?}, regions: {:?}, elapsed: {:?}",
153 reply,
154 region_ids,
155 now.elapsed()
156 );
157 let (result, error) = handle_flush_region_reply(&reply, region_ids, &msg)?;
158 if let Some(error) = error {
159 match error_strategy {
160 ErrorStrategy::Ignore => {
161 warn!(
162 "Failed to flush regions {:?}, the datanode({}) error is ignored: {}",
163 region_ids, datanode, error
164 );
165 }
166 ErrorStrategy::Retry => {
167 return error::RetryLaterSnafu {
168 reason: format!(
169 "Failed to flush regions {:?}, the datanode({}) error is retried: {}",
170 region_ids,
171 datanode,
172 error,
173 ),
174 }
175 .fail()?;
176 }
177 }
178 } else if result {
179 info!(
180 "The flush regions {:?} on datanode {:?} is successful, elapsed: {:?}",
181 region_ids,
182 datanode,
183 now.elapsed()
184 );
185 }
186
187 Ok(())
188 }
189 Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
190 operation: "Flush regions",
191 }
192 .fail(),
193 Err(error::Error::MailboxChannelClosed { .. }) => match error_strategy {
194 ErrorStrategy::Ignore => {
195 warn!(
196 "Failed to flush regions({:?}), the datanode({}) is unreachable(MailboxChannelClosed). Skip flush operation.",
197 region_ids, datanode
198 );
199 Ok(())
200 }
201 ErrorStrategy::Retry => error::RetryLaterSnafu {
202 reason: format!(
203 "Mailbox closed when sending flush region to datanode {:?}, elapsed: {:?}",
204 datanode,
205 now.elapsed()
206 ),
207 }
208 .fail()?,
209 },
210 Err(err) => Err(err),
211 }
212}
213
#[cfg(any(test, feature = "mock"))]
pub mod mock {
    use std::io::Error;
    use std::sync::Arc;

    use api::v1::region::region_server::RegionServer;
    use api::v1::region::{RegionResponse, region_request};
    use api::v1::{ResponseHeader, Status as PbStatus};
    use async_trait::async_trait;
    use client::Client;
    use common_grpc::channel_manager::ChannelManager;
    use common_meta::peer::Peer;
    use common_runtime::runtime::BuilderBuild;
    use common_runtime::{Builder as RuntimeBuilder, Runtime};
    use hyper_util::rt::TokioIo;
    use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
    use tokio::sync::mpsc;
    use tonic::codec::CompressionEncoding;
    use tonic::transport::Server;
    use tower::service_fn;

    /// A [`RegionServerHandler`] that forwards every request body it receives
    /// into an mpsc channel and answers with a fixed response (status code 0,
    /// empty error message, zero affected rows).
    #[derive(Clone)]
    pub struct EchoRegionServer {
        runtime: Runtime,
        received_requests: mpsc::Sender<region_request::Body>,
    }

    impl EchoRegionServer {
        /// Creates the server together with the receiving end of the channel
        /// on which handled request bodies are published.
        ///
        /// The channel has a capacity of 10 and the runtime is built with two
        /// worker threads.
        ///
        /// # Panics
        ///
        /// Panics if the runtime cannot be built.
        pub fn new() -> (Self, mpsc::Receiver<region_request::Body>) {
            let (request_tx, request_rx) = mpsc::channel(10);
            let runtime = RuntimeBuilder::default().worker_threads(2).build().unwrap();
            let echo_server = Self {
                runtime,
                received_requests: request_tx,
            };
            (echo_server, request_rx)
        }

        /// Spawns a gRPC region service backed by a clone of this server on
        /// one end of an in-memory duplex stream, and returns a [`Client`]
        /// whose connector for `datanode.addr` yields the other end.
        ///
        /// Uses `tokio::spawn`, so it presumably must be called from within a
        /// Tokio runtime.
        ///
        /// # Panics
        ///
        /// Panics if resetting the channel manager's connector fails. The
        /// connector hands out the single stream once and panics if it is
        /// invoked a second time.
        pub fn new_client(&self, datanode: &Peer) -> Client {
            let (client_io, server_io) = tokio::io::duplex(1024);

            let handler =
                RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone());
            let region_service = RegionServer::new(handler)
                .accept_compressed(CompressionEncoding::Gzip)
                .accept_compressed(CompressionEncoding::Zstd)
                .send_compressed(CompressionEncoding::Gzip)
                .send_compressed(CompressionEncoding::Zstd);

            // Serve exactly one "incoming connection": the server half of the pipe.
            tokio::spawn(async move {
                let incoming = futures::stream::iter(vec![Ok::<_, Error>(server_io)]);
                Server::builder()
                    .add_service(region_service)
                    .serve_with_incoming(incoming)
                    .await
            });

            let addr = datanode.addr.clone();
            let manager = ChannelManager::new();
            // The client half can only be handed out once, hence the `Option`.
            let mut pending_io = Some(client_io);
            manager
                .reset_with_connector(
                    addr.clone(),
                    service_fn(move |_| {
                        let io = pending_io.take().unwrap();
                        async move { Ok::<_, Error>(TokioIo::new(io)) }
                    }),
                )
                .unwrap();

            Client::with_manager_and_urls(manager, vec![addr])
        }
    }

    #[async_trait]
    impl RegionServerHandler for EchoRegionServer {
        /// Publishes `request` on the channel and replies with the fixed
        /// response.
        ///
        /// # Panics
        ///
        /// Panics if the request cannot be sent on the channel.
        async fn handle(
            &self,
            request: region_request::Body,
        ) -> servers::error::Result<RegionResponse> {
            self.received_requests.send(request).await.unwrap();

            let status = PbStatus {
                status_code: 0,
                err_msg: String::new(),
            };
            let header = ResponseHeader {
                status: Some(status),
            };

            Ok(RegionResponse {
                header: Some(header),
                affected_rows: 0,
                extensions: Default::default(),
                metadata: Vec::new(),
            })
        }
    }
}
310
#[cfg(test)]
pub mod test_data {
    use std::sync::Arc;

    use chrono::DateTime;
    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::ddl::flow_meta::FlowMetadataAllocator;
    use common_meta::ddl::table_meta::TableMetadataAllocator;
    use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use common_meta::key::TableMetadataManager;
    use common_meta::key::flow::FlowMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::node_manager::NodeManagerRef;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::region_registry::LeaderRegionRegistry;
    use common_meta::rpc::router::RegionRoute;
    use common_meta::sequence::SequenceBuilder;
    use common_meta::wal_provider::WalProvider;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use table::metadata::{TableIdent, TableInfo, TableMeta, TableType};
    use table::requests::TableOptions;

    use crate::cache_invalidator::MetasrvCacheInvalidator;
    use crate::handler::{HeartbeatMailbox, Pushers};
    use crate::metasrv::MetasrvInfo;
    use crate::test_util::new_region_route;

    /// Builds three region routes over three local peers (ids 1..=3 at
    /// `127.0.0.1:4001`..`4003`).
    ///
    /// Each route is created with `new_region_route(first, &peers, last)` for
    /// the pairs `(1, 3)`, `(2, 2)` and `(3, 1)`.
    pub fn new_region_routes() -> Vec<RegionRoute> {
        let peers: Vec<Peer> = [
            (1, "127.0.0.1:4001"),
            (2, "127.0.0.1:4002"),
            (3, "127.0.0.1:4003"),
        ]
        .iter()
        .map(|&(id, addr)| Peer::new(id, addr))
        .collect();

        // NOTE(review): presumably (region number, leader peer id) — confirm
        // against `new_region_route`.
        [(1, 3), (2, 2), (3, 1)]
            .iter()
            .map(|&(first, last)| new_region_route(first, &peers, last))
            .collect()
    }

    /// Builds the `my_catalog.my_schema.my_table` fixture (table id 42,
    /// version 1) with a non-nullable millisecond timestamp column `ts`, two
    /// nullable string columns and one nullable `i32` column.
    pub fn new_table_info() -> TableInfo {
        let nullable_string = |name: &str| {
            ColumnSchema::new(
                name.to_string(),
                ConcreteDataType::string_datatype(),
                true,
            )
        };
        let column_schemas = vec![
            ColumnSchema::new(
                "ts".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            nullable_string("my_tag1"),
            nullable_string("my_tag2"),
            ColumnSchema::new(
                "my_field_column".to_string(),
                ConcreteDataType::int32_datatype(),
                true,
            ),
        ];

        let meta = TableMeta {
            schema: Arc::new(Schema::new(column_schemas)),
            primary_key_indices: vec![1, 2],
            // NOTE(review): index 2 is also listed as a primary key above while
            // `my_field_column` sits at index 3 — kept as-is; confirm intent.
            value_indices: vec![2],
            engine: MITO2_ENGINE.to_string(),
            next_column_id: 3,
            options: TableOptions::default(),
            created_on: DateTime::default(),
            updated_on: DateTime::default(),
            partition_key_indices: vec![],
            column_ids: vec![],
        };
        let ident = TableIdent {
            table_id: 42,
            version: 1,
        };

        TableInfo {
            ident,
            name: String::from("my_table"),
            desc: Some(String::from("blabla")),
            catalog_name: String::from("my_catalog"),
            schema_name: String::from("my_schema"),
            meta,
            table_type: TableType::Base,
        }
    }

    /// Assembles a [`DdlContext`] around `node_manager`, with every other
    /// component backed by a fresh in-memory key-value store.
    ///
    /// The cache invalidator uses a heartbeat mailbox with default pushers and
    /// a metasrv address of `127.0.0.1:4321`.
    pub(crate) fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
        let kv = Arc::new(MemoryKvBackend::new());

        let heartbeat_sequence =
            SequenceBuilder::new("test_heartbeat_mailbox", kv.clone()).build();
        let mailbox = HeartbeatMailbox::create(Pushers::default(), heartbeat_sequence);

        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv.clone()));
        let table_sequence = Arc::new(SequenceBuilder::new("test", kv.clone()).build());
        let wal_provider = Arc::new(WalProvider::default());
        let table_metadata_allocator =
            Arc::new(TableMetadataAllocator::new(table_sequence, wal_provider));

        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv.clone()));
        // Last use of the backend handle, so it is moved rather than cloned.
        let flow_sequence = Arc::new(SequenceBuilder::new("test", kv).build());
        let flow_metadata_allocator = Arc::new(
            FlowMetadataAllocator::with_noop_peer_allocator(flow_sequence),
        );

        let metasrv_info = MetasrvInfo {
            server_addr: "127.0.0.1:4321".to_string(),
        };
        let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(mailbox, metasrv_info));

        DdlContext {
            node_manager,
            cache_invalidator,
            table_metadata_manager,
            table_metadata_allocator,
            flow_metadata_manager,
            flow_metadata_allocator,
            memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        }
    }
}
432}