Skip to main content

meta_srv/procedure/
utils.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::time::Duration;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::instruction::{
19    FlushErrorStrategy, FlushRegions, Instruction, InstructionError, InstructionReply,
20};
21use common_meta::peer::Peer;
22use common_telemetry::tracing_context::TracingContext;
23use common_telemetry::{info, warn};
24use snafu::ResultExt;
25use store_api::region_request::RegionFlushReason;
26use store_api::storage::RegionId;
27use tokio::time::Instant;
28
29use crate::error::{self, Error, Result};
30use crate::handler::HeartbeatMailbox;
31use crate::service::mailbox::{Channel, MailboxRef};
32
/// Selects how `flush_region` reacts when the datanode cannot be reached or
/// reports that some regions failed to flush.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ErrorStrategy {
    /// Log a warning and report success to the caller.
    Ignore,
    /// Surface the failure as an error so the caller can retry the operation.
    Retry,
}
37
/// Per-region flush failure messages, split by whether the failure may be retried.
#[derive(Debug, Default)]
struct FlushRegionErrors {
    /// Messages of failures that may be retried.
    retryable: Vec<String>,
    /// Messages of failures that must not be retried.
    non_retryable: Vec<String>,
}

impl FlushRegionErrors {
    /// Returns `true` when no failure of either kind has been recorded.
    fn is_empty(&self) -> bool {
        self.retryable.is_empty() && self.non_retryable.is_empty()
    }

    /// Joins every recorded message with `"; "`, retryable ones first.
    fn format_all(&self) -> String {
        // Borrow the messages as `&str`; joining does not need owned copies.
        self.retryable
            .iter()
            .chain(&self.non_retryable)
            .map(String::as_str)
            .collect::<Vec<_>>()
            .join("; ")
    }

    /// Joins only the non-retryable messages with `"; "`.
    fn format_non_retryable(&self) -> String {
        self.non_retryable.join("; ")
    }
}
62
63pub(crate) fn instruction_to_error(error: &InstructionError, reason: String) -> Error {
64    if error.retry_hint.is_retryable() {
65        error::RetryLaterSnafu { reason }.build()
66    } else {
67        error::UnexpectedSnafu { violated: reason }.build()
68    }
69}
70
71pub(crate) fn instruction_error_result<T>(error: &InstructionError, reason: String) -> Result<T> {
72    Err(instruction_to_error(error, reason))
73}
74
75fn handle_flush_region_reply(
76    reply: &InstructionReply,
77    region_ids: &[RegionId],
78    msg: &MailboxMessage,
79) -> Result<(bool, Option<FlushRegionErrors>)> {
80    let result = match reply {
81        InstructionReply::FlushRegions(flush_reply) => {
82            if flush_reply.results.len() != region_ids.len() {
83                return error::UnexpectedInstructionReplySnafu {
84                    mailbox_message: msg.to_string(),
85                    reason: format!(
86                        "expect {} region flush result, but got {}",
87                        region_ids.len(),
88                        flush_reply.results.len()
89                    ),
90                }
91                .fail();
92            }
93
94            match flush_reply.overall_success {
95                true => (true, None),
96                false => {
97                    let mut errors = FlushRegionErrors::default();
98                    for (region_id, result) in &flush_reply.results {
99                        let Err(error) = result else {
100                            continue;
101                        };
102                        let message = format!("{}: {:?}", region_id, error);
103                        if error.retry_hint.is_retryable() {
104                            errors.retryable.push(message);
105                        } else {
106                            errors.non_retryable.push(message);
107                        }
108                    }
109                    (false, (!errors.is_empty()).then_some(errors))
110                }
111            }
112        }
113        _ => {
114            return error::UnexpectedInstructionReplySnafu {
115                mailbox_message: msg.to_string(),
116                reason: "expect flush region reply",
117            }
118            .fail();
119        }
120    };
121
122    Ok(result)
123}
124
125/// Flushes the regions on the datanode.
126///
127/// Retry Or Ignore:
128/// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
129/// - Failed to flush region on the Datanode.
130///
131/// Abort:
132/// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
133/// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
134/// - [ExceededDeadline](error::Error::ExceededDeadline)
135/// - Invalid JSON.
136pub(crate) async fn flush_region(
137    mailbox: &MailboxRef,
138    server_addr: &str,
139    region_ids: &[RegionId],
140    datanode: &Peer,
141    timeout: Duration,
142    error_strategy: ErrorStrategy,
143    reason: Option<RegionFlushReason>,
144) -> Result<()> {
145    let mut flush_regions =
146        FlushRegions::sync_batch(region_ids.to_vec(), FlushErrorStrategy::TryAll);
147    flush_regions.reason = reason;
148    let flush_instruction = Instruction::FlushRegions(flush_regions);
149
150    let tracing_ctx = TracingContext::from_current_span();
151    let msg = MailboxMessage::json_message(
152        &format!("Flush regions: {:?}", region_ids),
153        &format!("Metasrv@{}", server_addr),
154        &format!("Datanode-{}@{}", datanode.id, datanode.addr),
155        common_time::util::current_time_millis(),
156        &flush_instruction,
157        Some(tracing_ctx.to_w3c()),
158    )
159    .with_context(|_| error::SerializeToJsonSnafu {
160        input: flush_instruction.to_string(),
161    })?;
162
163    let ch = Channel::Datanode(datanode.id);
164    let now = Instant::now();
165    let receiver = mailbox.send(&ch, msg, timeout).await;
166    let receiver = match receiver {
167        Ok(receiver) => receiver,
168        Err(error::Error::PusherNotFound { .. }) => match error_strategy {
169            ErrorStrategy::Ignore => {
170                warn!(
171                    "Failed to flush regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
172                    region_ids, datanode
173                );
174                return Ok(());
175            }
176            ErrorStrategy::Retry => error::RetryLaterSnafu {
177                reason: format!(
178                    "Pusher not found for flush regions on datanode {:?}, elapsed: {:?}",
179                    datanode,
180                    now.elapsed()
181                ),
182            }
183            .fail()?,
184        },
185        Err(err) => {
186            return Err(err);
187        }
188    };
189
190    match receiver.await {
191        Ok(msg) => {
192            let reply = HeartbeatMailbox::json_reply(&msg)?;
193            info!(
194                "Received flush region reply: {:?}, regions: {:?}, elapsed: {:?}",
195                reply,
196                region_ids,
197                now.elapsed()
198            );
199            let (result, error) = handle_flush_region_reply(&reply, region_ids, &msg)?;
200            if let Some(error) = error {
201                match error_strategy {
202                    ErrorStrategy::Ignore => {
203                        warn!(
204                            "Failed to flush regions {:?}, the datanode({}) error is ignored: {}",
205                            region_ids,
206                            datanode,
207                            error.format_all()
208                        );
209                    }
210                    ErrorStrategy::Retry => {
211                        if !error.non_retryable.is_empty() {
212                            return error::UnexpectedSnafu {
213                                violated: format!(
214                                    "Failed to flush regions {:?}, the datanode({}) reported non-retryable errors: {}",
215                                    region_ids,
216                                    datanode,
217                                    error.format_non_retryable(),
218                                ),
219                            }
220                            .fail();
221                        }
222                        return error::RetryLaterSnafu {
223                            reason: format!(
224                                "Failed to flush regions {:?}, the datanode({}) error is retried: {}",
225                                region_ids,
226                                datanode,
227                                error.format_all(),
228                            ),
229                        }
230                        .fail()?;
231                    }
232                }
233            } else if result {
234                info!(
235                    "The flush regions {:?} on datanode {:?} is successful, elapsed: {:?}",
236                    region_ids,
237                    datanode,
238                    now.elapsed()
239                );
240            }
241
242            Ok(())
243        }
244        Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
245            operation: "Flush regions",
246        }
247        .fail(),
248        Err(error::Error::MailboxChannelClosed { .. }) => match error_strategy {
249            ErrorStrategy::Ignore => {
250                warn!(
251                    "Failed to flush regions({:?}), the datanode({}) is unreachable(MailboxChannelClosed). Skip flush operation.",
252                    region_ids, datanode
253                );
254                Ok(())
255            }
256            ErrorStrategy::Retry => error::RetryLaterSnafu {
257                reason: format!(
258                    "Mailbox closed when sending flush region to datanode {:?}, elapsed: {:?}",
259                    datanode,
260                    now.elapsed()
261                ),
262            }
263            .fail()?,
264        },
265        Err(err) => Err(err),
266    }
267}
268
#[cfg(any(test, feature = "mock"))]
pub mod mock {
    use std::io::Error;
    use std::sync::Arc;

    use api::v1::region::region_server::RegionServer;
    use api::v1::region::{RegionResponse, region_request};
    use api::v1::{ResponseHeader, Status as PbStatus};
    use async_trait::async_trait;
    use client::Client;
    use common_grpc::channel_manager::ChannelManager;
    use common_meta::peer::Peer;
    use common_runtime::runtime::BuilderBuild;
    use common_runtime::{Builder as RuntimeBuilder, Runtime};
    use hyper_util::rt::TokioIo;
    use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
    use tokio::sync::mpsc;
    use tonic::codec::CompressionEncoding;
    use tonic::transport::Server;
    use tower::service_fn;

    /// A mock region server: every request body it handles is forwarded to the
    /// channel handed out by [`EchoRegionServer::new`], and an empty successful
    /// response is returned.
    #[derive(Clone)]
    pub struct EchoRegionServer {
        runtime: Runtime,
        received_requests: mpsc::Sender<region_request::Body>,
    }

    impl EchoRegionServer {
        /// Creates the mock server together with the receiving end of the channel
        /// (capacity 10) on which handled request bodies are delivered.
        pub fn new() -> (Self, mpsc::Receiver<region_request::Body>) {
            let (request_tx, request_rx) = mpsc::channel(10);
            let runtime = RuntimeBuilder::default().worker_threads(2).build().unwrap();
            let server = Self {
                runtime,
                received_requests: request_tx,
            };
            (server, request_rx)
        }

        /// Spawns a gRPC region service backed by this mock on one end of an
        /// in-memory duplex stream, and returns a [`Client`] for `datanode.addr`
        /// whose connector hands out the other end.
        pub fn new_client(&self, datanode: &Peer) -> Client {
            let (client_io, server_io) = tokio::io::duplex(1024);

            let request_handler =
                RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone());

            tokio::spawn(async move {
                let service = RegionServer::new(request_handler)
                    .accept_compressed(CompressionEncoding::Gzip)
                    .accept_compressed(CompressionEncoding::Zstd)
                    .send_compressed(CompressionEncoding::Gzip)
                    .send_compressed(CompressionEncoding::Zstd);
                let incoming = futures::stream::iter(vec![Ok::<_, Error>(server_io)]);
                Server::builder()
                    .add_service(service)
                    .serve_with_incoming(incoming)
                    .await
            });

            let manager = ChannelManager::new();
            // The duplex end can be handed out only once; a second connection
            // attempt hits the `unwrap` on an empty slot and panics.
            let mut pending_io = Some(client_io);
            let connector = service_fn(move |_| {
                let io = pending_io.take().unwrap();
                async move { Ok::<_, Error>(TokioIo::new(io)) }
            });
            manager
                .reset_with_connector(datanode.addr.clone(), connector)
                .unwrap();
            Client::with_manager_and_urls(manager, vec![datanode.addr.clone()])
        }
    }

    #[async_trait]
    impl RegionServerHandler for EchoRegionServer {
        /// Forwards `request` to the receiver returned by `new` and replies with
        /// status code 0, no error message, and zero affected rows.
        ///
        /// Panics if sending on the channel fails.
        async fn handle(
            &self,
            request: region_request::Body,
        ) -> servers::error::Result<RegionResponse> {
            self.received_requests.send(request).await.unwrap();

            let status = PbStatus {
                status_code: 0,
                err_msg: String::default(),
            };
            let header = ResponseHeader {
                status: Some(status),
            };
            Ok(RegionResponse {
                header: Some(header),
                affected_rows: 0,
                extensions: Default::default(),
                metadata: Vec::new(),
            })
        }
    }
}
365
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::instruction::{FlushStrategy, Instruction};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::sequence::SequenceBuilder;

    use super::*;
    use crate::procedure::test_util::{MailboxContext, new_flush_region_reply_for_region};

    /// The instruction delivered to the datanode must carry the requested region,
    /// the sync strategy, and the flush reason given to `flush_region`.
    #[tokio::test]
    async fn test_flush_region_payload_includes_reason() {
        let datanode = Peer::new(1, "127.0.0.1:4001");
        let region_id = RegionId::new(1024, 1);

        let backend = Arc::new(MemoryKvBackend::new());
        let sequence = SequenceBuilder::new("test_flush_region_reason", backend).build();
        let mut mailbox_ctx = MailboxContext::new(sequence);

        // Register a receiver for the datanode's channel so the message sent by
        // `flush_region` can be inspected.
        let (heartbeat_tx, mut heartbeat_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(datanode.id), heartbeat_tx)
            .await;

        let mailbox = mailbox_ctx.mailbox().clone();
        let reply_mailbox = mailbox.clone();

        // Plays the datanode: checks the received instruction, then answers with
        // a successful flush reply so `flush_region` can complete.
        let reply_task = tokio::spawn(async move {
            let response = heartbeat_rx.recv().await.unwrap().unwrap();
            let msg = response.mailbox_message.unwrap();
            let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
            let Instruction::FlushRegions(flush_regions) = instruction else {
                panic!("Expected FlushRegions instruction");
            };

            assert_eq!(flush_regions.region_ids, vec![region_id]);
            assert_eq!(flush_regions.strategy, FlushStrategy::Sync);
            assert_eq!(flush_regions.reason, Some(RegionFlushReason::Repartition));

            let reply = new_flush_region_reply_for_region(msg.id, region_id, true, None);
            reply_mailbox.on_recv(msg.id, Ok(reply)).await.unwrap();
        });

        let outcome = flush_region(
            &mailbox,
            "127.0.0.1:3002",
            &[region_id],
            &datanode,
            Duration::from_secs(5),
            ErrorStrategy::Retry,
            Some(RegionFlushReason::Repartition),
        )
        .await;
        outcome.unwrap();

        reply_task.await.unwrap();
    }
}
429
#[cfg(test)]
pub mod test_data {
    use std::sync::Arc;

    use chrono::DateTime;
    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::ddl::flow_meta::FlowMetadataAllocator;
    use common_meta::ddl::table_meta::TableMetadataAllocator;
    use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use common_meta::key::TableMetadataManager;
    use common_meta::key::flow::FlowMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::node_manager::NodeManagerRef;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::region_registry::LeaderRegionRegistry;
    use common_meta::rpc::router::RegionRoute;
    use common_meta::sequence::SequenceBuilder;
    use common_meta::wal_provider::WalProvider;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use table::metadata::{TableIdent, TableInfo, TableMeta, TableType};
    use table::requests::TableOptions;

    use crate::cache_invalidator::MetasrvCacheInvalidator;
    use crate::handler::{HeartbeatMailbox, Pushers};
    use crate::metasrv::MetasrvInfo;
    use crate::test_util::new_region_route;

    /// Builds three region routes that share the same three peers.
    pub fn new_region_routes() -> Vec<RegionRoute> {
        let peers = vec![
            Peer::new(1, "127.0.0.1:4001"),
            Peer::new(2, "127.0.0.1:4002"),
            Peer::new(3, "127.0.0.1:4003"),
        ];

        // (first, third) arguments of `new_region_route`; presumably the region
        // number and the leader peer id — confirm against `new_region_route`.
        [(1, 3), (2, 2), (3, 1)]
            .into_iter()
            .map(|(region, leader)| new_region_route(region, &peers, leader))
            .collect()
    }

    /// Builds the fixture `TableInfo` for `my_catalog.my_schema.my_table`
    /// (table id 42): a timestamp column, two string tag columns and one
    /// int32 field column.
    pub fn new_table_info() -> TableInfo {
        let columns = vec![
            ColumnSchema::new(
                "ts".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            ColumnSchema::new(
                "my_tag1".to_string(),
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "my_tag2".to_string(),
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "my_field_column".to_string(),
                ConcreteDataType::int32_datatype(),
                true,
            ),
        ];

        let meta = TableMeta {
            schema: Arc::new(Schema::new(columns)),
            primary_key_indices: vec![1, 2],
            value_indices: vec![2],
            engine: MITO2_ENGINE.to_string(),
            next_column_id: 3,
            options: TableOptions::default(),
            created_on: DateTime::default(),
            updated_on: DateTime::default(),
            partition_key_indices: vec![],
            column_ids: vec![],
        };

        TableInfo {
            ident: TableIdent {
                table_id: 42,
                version: 1,
            },
            name: "my_table".to_string(),
            desc: Some("blabla".to_string()),
            catalog_name: "my_catalog".to_string(),
            schema_name: "my_schema".to_string(),
            meta,
            table_type: TableType::Base,
        }
    }

    /// Builds a `DdlContext` around `node_manager` in which every other
    /// component is backed by one fresh in-memory KV backend.
    pub(crate) fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
        let backend = Arc::new(MemoryKvBackend::new());

        let mailbox = HeartbeatMailbox::create(
            Pushers::default(),
            SequenceBuilder::new("test_heartbeat_mailbox", backend.clone()).build(),
        );
        let table_metadata_manager = Arc::new(TableMetadataManager::new(backend.clone()));
        let table_id_sequence = Arc::new(SequenceBuilder::new("test", backend.clone()).build());
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            table_id_sequence,
            Arc::new(WalProvider::default()),
        ));
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(backend.clone()));
        // Last use of the backend, so it is moved rather than cloned.
        let flow_id_sequence = Arc::new(SequenceBuilder::new("test", backend).build());
        let flow_metadata_allocator = Arc::new(
            FlowMetadataAllocator::with_noop_peer_allocator(flow_id_sequence),
        );

        let metasrv_info = MetasrvInfo {
            server_addr: "127.0.0.1:4321".to_string(),
        };

        DdlContext {
            node_manager,
            cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(mailbox, metasrv_info)),
            table_metadata_manager,
            table_metadata_allocator,
            flow_metadata_manager,
            flow_metadata_allocator,
            memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        }
    }
}