1use std::time::Duration;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
19use common_meta::peer::Peer;
20use common_telemetry::tracing_context::TracingContext;
21use common_telemetry::{info, warn};
22use snafu::ResultExt;
23use store_api::storage::RegionId;
24use tokio::time::Instant;
25
26use crate::error::{self, Error, Result};
27use crate::handler::HeartbeatMailbox;
28use crate::service::mailbox::{Channel, MailboxRef};
29
/// Controls how [`flush_region`] reacts when the flush cannot be completed
/// (the datanode is unreachable, or the flush reply reports failures).
pub(crate) enum ErrorStrategy {
    /// Treat the flush as best-effort: log a warning and return `Ok(())`.
    Ignore,
    /// Surface the failure as a `RetryLater` error so the caller can retry.
    Retry,
}
34
35fn handle_flush_region_reply(
36 reply: &InstructionReply,
37 region_ids: &[RegionId],
38 msg: &MailboxMessage,
39) -> Result<(bool, Option<String>)> {
40 let result = match reply {
41 InstructionReply::FlushRegions(flush_reply) => {
42 if flush_reply.results.len() != region_ids.len() {
43 return error::UnexpectedInstructionReplySnafu {
44 mailbox_message: msg.to_string(),
45 reason: format!(
46 "expect {} region flush result, but got {}",
47 region_ids.len(),
48 flush_reply.results.len()
49 ),
50 }
51 .fail();
52 }
53
54 match flush_reply.overall_success {
55 true => (true, None),
56 false => (
57 false,
58 Some(
59 flush_reply
60 .results
61 .iter()
62 .filter_map(|(region_id, result)| match result {
63 Ok(_) => None,
64 Err(e) => Some(format!("{}: {:?}", region_id, e)),
65 })
66 .collect::<Vec<String>>()
67 .join("; "),
68 ),
69 ),
70 }
71 }
72 _ => {
73 return error::UnexpectedInstructionReplySnafu {
74 mailbox_message: msg.to_string(),
75 reason: "expect flush region reply",
76 }
77 .fail();
78 }
79 };
80
81 Ok(result)
82}
83
/// Sends a synchronous batch-flush instruction for `region_ids` to `datanode`
/// through the mailbox and waits up to `timeout` for the reply.
///
/// The instruction uses [`FlushErrorStrategy::TryAll`], so the datanode
/// attempts every region even if some fail. On the metasrv side, failures are
/// handled according to `error_strategy`:
/// - [`ErrorStrategy::Ignore`]: an unreachable datanode (`PusherNotFound`) or
///   a reply containing per-region errors is logged as a warning and `Ok(())`
///   is returned (best-effort).
/// - [`ErrorStrategy::Retry`]: the same situations produce a `RetryLater`
///   error instead.
///
/// # Errors
///
/// Besides the strategy-dependent `RetryLater` errors, this returns an
/// `ExceededDeadline` error when the mailbox reply times out, a
/// `SerializeToJson` error if the instruction cannot be serialized, and
/// propagates any other mailbox/reply-decoding errors unchanged.
pub(crate) async fn flush_region(
    mailbox: &MailboxRef,
    server_addr: &str,
    region_ids: &[RegionId],
    datanode: &Peer,
    timeout: Duration,
    error_strategy: ErrorStrategy,
) -> Result<()> {
    let flush_instruction = Instruction::FlushRegions(FlushRegions::sync_batch(
        region_ids.to_vec(),
        FlushErrorStrategy::TryAll,
    ));

    // Propagate the current span so the datanode can join the trace.
    let tracing_ctx = TracingContext::from_current_span();
    let msg = MailboxMessage::json_message(
        &format!("Flush regions: {:?}", region_ids),
        &format!("Metasrv@{}", server_addr),
        &format!("Datanode-{}@{}", datanode.id, datanode.addr),
        common_time::util::current_time_millis(),
        &flush_instruction,
        Some(tracing_ctx.to_w3c()),
    )
    .with_context(|_| error::SerializeToJsonSnafu {
        input: flush_instruction.to_string(),
    })?;

    let ch = Channel::Datanode(datanode.id);
    // Start timing right before handing the message to the mailbox; used in
    // the log/error messages below.
    let now = Instant::now();
    let receiver = mailbox.send(&ch, msg, timeout).await;
    let receiver = match receiver {
        Ok(receiver) => receiver,
        // PusherNotFound: no active heartbeat channel to this datanode, i.e.
        // it is currently unreachable. Handle per the chosen strategy.
        Err(error::Error::PusherNotFound { .. }) => match error_strategy {
            ErrorStrategy::Ignore => {
                warn!(
                    "Failed to flush regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
                    region_ids, datanode
                );
                return Ok(());
            }
            ErrorStrategy::Retry => error::RetryLaterSnafu {
                reason: format!(
                    "Pusher not found for flush regions on datanode {:?}, elapsed: {:?}",
                    datanode,
                    now.elapsed()
                ),
            }
            // `.fail()?` always returns early with the error here.
            .fail()?,
        },
        // Any other send error is propagated unchanged.
        Err(err) => {
            return Err(err);
        }
    };

    match receiver.await {
        Ok(msg) => {
            let reply = HeartbeatMailbox::json_reply(&msg)?;
            info!(
                "Received flush region reply: {:?}, regions: {:?}, elapsed: {:?}",
                reply,
                region_ids,
                now.elapsed()
            );
            let (result, error) = handle_flush_region_reply(&reply, region_ids, &msg)?;
            if let Some(error) = error {
                // The reply reported per-region failures; apply the strategy.
                match error_strategy {
                    ErrorStrategy::Ignore => {
                        warn!(
                            "Failed to flush regions {:?}, the datanode({}) error is ignored: {}",
                            region_ids, datanode, error
                        );
                    }
                    ErrorStrategy::Retry => {
                        return error::RetryLaterSnafu {
                            reason: format!(
                                "Failed to flush regions {:?}, the datanode({}) error is retried: {}",
                                region_ids,
                                datanode,
                                error,
                            ),
                        }
                        .fail()?;
                    }
                }
            } else if result {
                // Fully successful flush; `handle_flush_region_reply` never
                // yields `(false, None)`, so the remaining case is success.
                info!(
                    "The flush regions {:?} on datanode {:?} is successful, elapsed: {:?}",
                    region_ids,
                    datanode,
                    now.elapsed()
                );
            }

            Ok(())
        }
        // A mailbox timeout is mapped to a deadline error for the caller.
        Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
            operation: "Flush regions",
        }
        .fail(),
        Err(err) => Err(err),
    }
}
196
#[cfg(any(test, feature = "mock"))]
pub mod mock {
    use std::io::Error;
    use std::sync::Arc;

    use api::v1::region::region_server::RegionServer;
    use api::v1::region::{RegionResponse, region_request};
    use api::v1::{ResponseHeader, Status as PbStatus};
    use async_trait::async_trait;
    use client::Client;
    use common_grpc::channel_manager::ChannelManager;
    use common_meta::peer::Peer;
    use common_runtime::runtime::BuilderBuild;
    use common_runtime::{Builder as RuntimeBuilder, Runtime};
    use hyper_util::rt::TokioIo;
    use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
    use tokio::sync::mpsc;
    use tonic::codec::CompressionEncoding;
    use tonic::transport::Server;
    use tower::service_fn;

    /// A mock region server for tests: it records every region request it
    /// receives (forwarding the body over a channel) and answers each one
    /// with a success response.
    #[derive(Clone)]
    pub struct EchoRegionServer {
        /// Runtime handed to the gRPC request handler.
        runtime: Runtime,
        /// Sender side of the channel that exposes received request bodies
        /// to the test (receiver returned by [`Self::new`]).
        received_requests: mpsc::Sender<region_request::Body>,
    }

    impl EchoRegionServer {
        /// Creates the mock server plus the receiver on which tests can
        /// observe every request body the server handles.
        pub fn new() -> (Self, mpsc::Receiver<region_request::Body>) {
            let (tx, rx) = mpsc::channel(10);
            (
                Self {
                    runtime: RuntimeBuilder::default().worker_threads(2).build().unwrap(),
                    received_requests: tx,
                },
                rx,
            )
        }

        /// Builds a [`Client`] wired to this mock server over an in-memory
        /// duplex stream — no real network listener is opened.
        /// `datanode.addr` is only used as the channel-manager key/URL.
        pub fn new_client(&self, datanode: &Peer) -> Client {
            // In-memory byte pipe standing in for a TCP connection.
            let (client, server) = tokio::io::duplex(1024);

            let handler =
                RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone());

            // Serve gRPC over the server end of the duplex pipe; the incoming
            // stream yields exactly this one "connection".
            tokio::spawn(async move {
                Server::builder()
                    .add_service(
                        RegionServer::new(handler)
                            .accept_compressed(CompressionEncoding::Gzip)
                            .accept_compressed(CompressionEncoding::Zstd)
                            .send_compressed(CompressionEncoding::Gzip)
                            .send_compressed(CompressionEncoding::Zstd),
                    )
                    .serve_with_incoming(futures::stream::iter(vec![Ok::<_, Error>(server)]))
                    .await
            });

            let channel_manager = ChannelManager::new();
            // The connector closure can be invoked more than once, but the
            // duplex client end exists only once — hand it out via take()
            // (a second invocation would panic on the unwrap).
            let mut client = Some(client);
            channel_manager
                .reset_with_connector(
                    datanode.addr.clone(),
                    service_fn(move |_| {
                        let client = client.take().unwrap();
                        async move { Ok::<_, Error>(TokioIo::new(client)) }
                    }),
                )
                .unwrap();
            Client::with_manager_and_urls(channel_manager, vec![datanode.addr.clone()])
        }
    }

    #[async_trait]
    impl RegionServerHandler for EchoRegionServer {
        /// Records the request body for the test, then replies with a
        /// zero-status (success) response and no affected rows.
        async fn handle(
            &self,
            request: region_request::Body,
        ) -> servers::error::Result<RegionResponse> {
            self.received_requests.send(request).await.unwrap();

            Ok(RegionResponse {
                header: Some(ResponseHeader {
                    status: Some(PbStatus {
                        status_code: 0,
                        err_msg: String::default(),
                    }),
                }),
                affected_rows: 0,
                extensions: Default::default(),
                metadata: Vec::new(),
            })
        }
    }
}
293
#[cfg(test)]
pub mod test_data {
    use std::sync::Arc;

    use chrono::DateTime;
    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::ddl::flow_meta::FlowMetadataAllocator;
    use common_meta::ddl::table_meta::TableMetadataAllocator;
    use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use common_meta::key::TableMetadataManager;
    use common_meta::key::flow::FlowMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::node_manager::NodeManagerRef;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::region_registry::LeaderRegionRegistry;
    use common_meta::rpc::router::RegionRoute;
    use common_meta::sequence::SequenceBuilder;
    use common_meta::wal_provider::WalProvider;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use table::metadata::{TableIdent, TableInfo, TableMeta, TableType};
    use table::requests::TableOptions;

    use crate::cache_invalidator::MetasrvCacheInvalidator;
    use crate::handler::{HeartbeatMailbox, Pushers};
    use crate::metasrv::MetasrvInfo;
    use crate::test_util::new_region_route;

    /// Builds three region routes over three fixed peers; region `n` is led
    /// by peer `4 - n`.
    pub fn new_region_routes() -> Vec<RegionRoute> {
        let peers = vec![
            Peer::new(1, "127.0.0.1:4001"),
            Peer::new(2, "127.0.0.1:4002"),
            Peer::new(3, "127.0.0.1:4003"),
        ];
        [(1, 3), (2, 2), (3, 1)]
            .into_iter()
            .map(|(region, leader)| new_region_route(region, &peers, leader))
            .collect()
    }

    /// Builds a fixed [`TableInfo`] fixture: a mito2 table with a millisecond
    /// timestamp column, two string tags (the primary key), and one int32
    /// field column.
    pub fn new_table_info() -> TableInfo {
        // Small helper to keep the column list readable.
        let column = |name: &str, data_type: ConcreteDataType, nullable: bool| {
            ColumnSchema::new(name.to_string(), data_type, nullable)
        };
        let schema = Schema::new(vec![
            column(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            column("my_tag1", ConcreteDataType::string_datatype(), true),
            column("my_tag2", ConcreteDataType::string_datatype(), true),
            column("my_field_column", ConcreteDataType::int32_datatype(), true),
        ]);

        TableInfo {
            ident: TableIdent {
                table_id: 42,
                version: 1,
            },
            name: "my_table".to_string(),
            desc: Some("blabla".to_string()),
            catalog_name: "my_catalog".to_string(),
            schema_name: "my_schema".to_string(),
            meta: TableMeta {
                schema: Arc::new(schema),
                primary_key_indices: vec![1, 2],
                value_indices: vec![2],
                engine: MITO2_ENGINE.to_string(),
                next_column_id: 3,
                options: TableOptions::default(),
                created_on: DateTime::default(),
                updated_on: DateTime::default(),
                partition_key_indices: vec![],
                column_ids: vec![],
            },
            table_type: TableType::Base,
        }
    }

    /// Builds a [`DdlContext`] backed entirely by in-memory components
    /// (memory kv-backend, in-process mailbox, noop failure detector).
    pub(crate) fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
        let kv_backend = Arc::new(MemoryKvBackend::new());

        // In-process mailbox so cache-invalidation messages have a home.
        let mailbox = HeartbeatMailbox::create(
            Pushers::default(),
            SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build(),
        );
        let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
            mailbox,
            MetasrvInfo {
                server_addr: "127.0.0.1:4321".to_string(),
            },
        ));

        DdlContext {
            node_manager,
            cache_invalidator,
            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
            table_metadata_allocator: Arc::new(TableMetadataAllocator::new(
                Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
                Arc::new(WalProvider::default()),
            )),
            flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
            flow_metadata_allocator: Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
                Arc::new(SequenceBuilder::new("test", kv_backend).build()),
            )),
            memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        }
    }
}