// meta_srv/procedure/utils.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
use common_meta::peer::Peer;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{info, warn};
use snafu::ResultExt;
use store_api::region_request::RegionFlushReason;
use store_api::storage::RegionId;
use tokio::time::Instant;

use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxRef};

/// Controls how [`flush_region`] reacts when the target datanode is
/// unreachable or reports a flush failure.
#[derive(Debug, Clone, Copy)]
pub(crate) enum ErrorStrategy {
    /// Log a warning and treat the flush as best-effort (the operation
    /// still returns `Ok`).
    Ignore,
    /// Convert the failure into a `RetryLater` error so the enclosing
    /// procedure retries the flush.
    Retry,
}
36fn handle_flush_region_reply(
37    reply: &InstructionReply,
38    region_ids: &[RegionId],
39    msg: &MailboxMessage,
40) -> Result<(bool, Option<String>)> {
41    let result = match reply {
42        InstructionReply::FlushRegions(flush_reply) => {
43            if flush_reply.results.len() != region_ids.len() {
44                return error::UnexpectedInstructionReplySnafu {
45                    mailbox_message: msg.to_string(),
46                    reason: format!(
47                        "expect {} region flush result, but got {}",
48                        region_ids.len(),
49                        flush_reply.results.len()
50                    ),
51                }
52                .fail();
53            }
54
55            match flush_reply.overall_success {
56                true => (true, None),
57                false => (
58                    false,
59                    Some(
60                        flush_reply
61                            .results
62                            .iter()
63                            .filter_map(|(region_id, result)| match result {
64                                Ok(_) => None,
65                                Err(e) => Some(format!("{}: {:?}", region_id, e)),
66                            })
67                            .collect::<Vec<String>>()
68                            .join("; "),
69                    ),
70                ),
71            }
72        }
73        _ => {
74            return error::UnexpectedInstructionReplySnafu {
75                mailbox_message: msg.to_string(),
76                reason: "expect flush region reply",
77            }
78            .fail();
79        }
80    };
81
82    Ok(result)
83}
/// Flushes the regions on the datanode.
///
/// Sends a synchronous batched `FlushRegions` instruction over the mailbox
/// and interprets the reply according to `error_strategy`.
///
/// Retry Or Ignore (behavior chosen by `error_strategy`):
/// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
/// - [MailboxChannelClosed](error::Error::MailboxChannelClosed), The channel closed before a reply.
/// - Failed to flush region on the Datanode.
///
/// Abort:
/// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
/// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
/// - [ExceededDeadline](error::Error::ExceededDeadline)
/// - Invalid JSON.
pub(crate) async fn flush_region(
    mailbox: &MailboxRef,
    server_addr: &str,
    region_ids: &[RegionId],
    datanode: &Peer,
    timeout: Duration,
    error_strategy: ErrorStrategy,
    reason: Option<RegionFlushReason>,
) -> Result<()> {
    // Build a synchronous batched flush instruction; `TryAll` asks the
    // datanode to attempt every region even if some of them fail.
    let mut flush_regions =
        FlushRegions::sync_batch(region_ids.to_vec(), FlushErrorStrategy::TryAll);
    flush_regions.reason = reason;
    let flush_instruction = Instruction::FlushRegions(flush_regions);

    // Carry the current tracing context so the datanode can link its spans.
    let tracing_ctx = TracingContext::from_current_span();
    let msg = MailboxMessage::json_message(
        &format!("Flush regions: {:?}", region_ids),
        &format!("Metasrv@{}", server_addr),
        &format!("Datanode-{}@{}", datanode.id, datanode.addr),
        common_time::util::current_time_millis(),
        &flush_instruction,
        Some(tracing_ctx.to_w3c()),
    )
    .with_context(|_| error::SerializeToJsonSnafu {
        input: flush_instruction.to_string(),
    })?;

    let ch = Channel::Datanode(datanode.id);
    let now = Instant::now();
    let receiver = mailbox.send(&ch, msg, timeout).await;
    // `PusherNotFound` means there is no live channel to the datanode:
    // either skip the flush (Ignore) or ask the caller to retry later (Retry).
    let receiver = match receiver {
        Ok(receiver) => receiver,
        Err(error::Error::PusherNotFound { .. }) => match error_strategy {
            ErrorStrategy::Ignore => {
                warn!(
                    "Failed to flush regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
                    region_ids, datanode
                );
                return Ok(());
            }
            ErrorStrategy::Retry => error::RetryLaterSnafu {
                reason: format!(
                    "Pusher not found for flush regions on datanode {:?}, elapsed: {:?}",
                    datanode,
                    now.elapsed()
                ),
            }
            .fail()?,
        },
        // Any other send error aborts immediately.
        Err(err) => {
            return Err(err);
        }
    };

    match receiver.await {
        Ok(msg) => {
            let reply = HeartbeatMailbox::json_reply(&msg)?;
            info!(
                "Received flush region reply: {:?}, regions: {:?}, elapsed: {:?}",
                reply,
                region_ids,
                now.elapsed()
            );
            // `result` is the overall success flag; `error` joins the
            // per-region failure messages when any region failed.
            let (result, error) = handle_flush_region_reply(&reply, region_ids, &msg)?;
            if let Some(error) = error {
                match error_strategy {
                    ErrorStrategy::Ignore => {
                        warn!(
                            "Failed to flush regions {:?}, the datanode({}) error is ignored: {}",
                            region_ids, datanode, error
                        );
                    }
                    ErrorStrategy::Retry => {
                        return error::RetryLaterSnafu {
                            reason: format!(
                                "Failed to flush regions {:?}, the datanode({}) error is retried: {}",
                                region_ids,
                                datanode,
                                error,
                            ),
                        }
                        .fail()?;
                    }
                }
            } else if result {
                info!(
                    "The flush regions {:?} on datanode {:?} is successful, elapsed: {:?}",
                    region_ids,
                    datanode,
                    now.elapsed()
                );
            }

            Ok(())
        }
        // A mailbox timeout is terminal regardless of strategy: surface it
        // as an exceeded deadline.
        Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
            operation: "Flush regions",
        }
        .fail(),
        // The channel closed before a reply arrived: same strategy-dependent
        // treatment as an unreachable datanode above.
        Err(error::Error::MailboxChannelClosed { .. }) => match error_strategy {
            ErrorStrategy::Ignore => {
                warn!(
                    "Failed to flush regions({:?}), the datanode({}) is unreachable(MailboxChannelClosed). Skip flush operation.",
                    region_ids, datanode
                );
                Ok(())
            }
            ErrorStrategy::Retry => error::RetryLaterSnafu {
                reason: format!(
                    "Mailbox closed when sending flush region to datanode {:?}, elapsed: {:?}",
                    datanode,
                    now.elapsed()
                ),
            }
            .fail()?,
        },
        Err(err) => Err(err),
    }
}
#[cfg(any(test, feature = "mock"))]
pub mod mock {
    use std::io::Error;
    use std::sync::Arc;

    use api::v1::region::region_server::RegionServer;
    use api::v1::region::{RegionResponse, region_request};
    use api::v1::{ResponseHeader, Status as PbStatus};
    use async_trait::async_trait;
    use client::Client;
    use common_grpc::channel_manager::ChannelManager;
    use common_meta::peer::Peer;
    use common_runtime::runtime::BuilderBuild;
    use common_runtime::{Builder as RuntimeBuilder, Runtime};
    use hyper_util::rt::TokioIo;
    use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
    use tokio::sync::mpsc;
    use tonic::codec::CompressionEncoding;
    use tonic::transport::Server;
    use tower::service_fn;

    /// A mock implementation of region server that simply echoes the request.
    #[derive(Clone)]
    pub struct EchoRegionServer {
        // Runtime on which the gRPC handler executes region requests.
        runtime: Runtime,
        // Every handled request body is forwarded here so tests can inspect it.
        received_requests: mpsc::Sender<region_request::Body>,
    }

    impl EchoRegionServer {
        /// Creates the echo server together with the receiving end of the
        /// request channel (capacity 10) used to observe handled requests.
        pub fn new() -> (Self, mpsc::Receiver<region_request::Body>) {
            let (tx, rx) = mpsc::channel(10);
            (
                Self {
                    runtime: RuntimeBuilder::default().worker_threads(2).build().unwrap(),
                    received_requests: tx,
                },
                rx,
            )
        }

        /// Builds a gRPC [`Client`] for `datanode` that talks to this mock
        /// server over an in-memory duplex stream (no real network sockets).
        pub fn new_client(&self, datanode: &Peer) -> Client {
            // In-memory byte pipe: one end for the client, one for the server.
            let (client, server) = tokio::io::duplex(1024);

            let handler =
                RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone());

            // Serve exactly one incoming "connection": the server half of the pipe.
            tokio::spawn(async move {
                Server::builder()
                    .add_service(
                        RegionServer::new(handler)
                            .accept_compressed(CompressionEncoding::Gzip)
                            .accept_compressed(CompressionEncoding::Zstd)
                            .send_compressed(CompressionEncoding::Gzip)
                            .send_compressed(CompressionEncoding::Zstd),
                    )
                    .serve_with_incoming(futures::stream::iter(vec![Ok::<_, Error>(server)]))
                    .await
            });

            // Register a connector that hands out the client half of the pipe
            // for the datanode's address; the `Option` is taken at most once,
            // so only a single connection can be established.
            let channel_manager = ChannelManager::new();
            let mut client = Some(client);
            channel_manager
                .reset_with_connector(
                    datanode.addr.clone(),
                    service_fn(move |_| {
                        let client = client.take().unwrap();
                        async move { Ok::<_, Error>(TokioIo::new(client)) }
                    }),
                )
                .unwrap();
            Client::with_manager_and_urls(channel_manager, vec![datanode.addr.clone()])
        }
    }

    #[async_trait]
    impl RegionServerHandler for EchoRegionServer {
        // Forwards the request body to the test channel and replies with an
        // empty success response (status code 0, no affected rows).
        async fn handle(
            &self,
            request: region_request::Body,
        ) -> servers::error::Result<RegionResponse> {
            self.received_requests.send(request).await.unwrap();

            Ok(RegionResponse {
                header: Some(ResponseHeader {
                    status: Some(PbStatus {
                        status_code: 0,
                        err_msg: String::default(),
                    }),
                }),
                affected_rows: 0,
                extensions: Default::default(),
                metadata: Vec::new(),
            })
        }
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::instruction::{FlushStrategy, Instruction};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::sequence::SequenceBuilder;

    use super::*;
    use crate::procedure::test_util::{MailboxContext, new_flush_region_reply_for_region};

    // Verifies that the instruction serialized by `flush_region` carries the
    // region ids, the sync strategy, and the caller-provided flush reason.
    #[tokio::test]
    async fn test_flush_region_payload_includes_reason() {
        // In-memory mailbox backed by a memory KV store.
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let mailbox_sequence = SequenceBuilder::new("test_flush_region_reason", kv_backend).build();
        let mut mailbox_ctx = MailboxContext::new(mailbox_sequence);

        let datanode = Peer::new(1, "127.0.0.1:4001");
        let region_id = RegionId::new(1024, 1);
        // Channel standing in for the datanode's heartbeat connection.
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(datanode.id), tx)
            .await;

        let mailbox = mailbox_ctx.mailbox().clone();
        let reply_mailbox = mailbox.clone();
        // Fake datanode task: receive the instruction, verify its payload,
        // then send back a successful reply so `flush_region` can complete.
        let reply_task = tokio::spawn(async move {
            let response = rx.recv().await.unwrap().unwrap();
            let msg = response.mailbox_message.unwrap();
            let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
            let Instruction::FlushRegions(flush_regions) = instruction else {
                panic!("Expected FlushRegions instruction");
            };

            assert_eq!(flush_regions.region_ids, vec![region_id]);
            assert_eq!(flush_regions.strategy, FlushStrategy::Sync);
            assert_eq!(flush_regions.reason, Some(RegionFlushReason::Repartition));

            // Complete the round trip with an all-success reply.
            reply_mailbox
                .on_recv(
                    msg.id,
                    Ok(new_flush_region_reply_for_region(
                        msg.id, region_id, true, None,
                    )),
                )
                .await
                .unwrap();
        });

        flush_region(
            &mailbox,
            "127.0.0.1:3002",
            &[region_id],
            &datanode,
            Duration::from_secs(5),
            ErrorStrategy::Retry,
            Some(RegionFlushReason::Repartition),
        )
        .await
        .unwrap();
        reply_task.await.unwrap();
    }
}
#[cfg(test)]
pub mod test_data {
    use std::sync::Arc;

    use chrono::DateTime;
    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::ddl::flow_meta::FlowMetadataAllocator;
    use common_meta::ddl::table_meta::TableMetadataAllocator;
    use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use common_meta::key::TableMetadataManager;
    use common_meta::key::flow::FlowMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::node_manager::NodeManagerRef;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::region_registry::LeaderRegionRegistry;
    use common_meta::rpc::router::RegionRoute;
    use common_meta::sequence::SequenceBuilder;
    use common_meta::wal_provider::WalProvider;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use table::metadata::{TableIdent, TableInfo, TableMeta, TableType};
    use table::requests::TableOptions;

    use crate::cache_invalidator::MetasrvCacheInvalidator;
    use crate::handler::{HeartbeatMailbox, Pushers};
    use crate::metasrv::MetasrvInfo;
    use crate::test_util::new_region_route;

    /// Three region routes over three peers, with leaders assigned in
    /// reverse order (region 1 -> peer 3, region 2 -> peer 2, region 3 -> peer 1).
    pub fn new_region_routes() -> Vec<RegionRoute> {
        let peers = vec![
            Peer::new(1, "127.0.0.1:4001"),
            Peer::new(2, "127.0.0.1:4002"),
            Peer::new(3, "127.0.0.1:4003"),
        ];
        [(1, 3), (2, 2), (3, 1)]
            .into_iter()
            .map(|(region_number, leader)| new_region_route(region_number, &peers, leader))
            .collect()
    }

    /// A fixed `TableInfo` fixture: table 42 in `my_catalog.my_schema` with a
    /// millisecond timestamp column, two string tag columns and one int32
    /// field column, using the mito2 engine.
    pub fn new_table_info() -> TableInfo {
        // Small helper for the two identically-shaped nullable string tags.
        let tag_column = |name: &str| {
            ColumnSchema::new(name.to_string(), ConcreteDataType::string_datatype(), true)
        };
        let column_schemas = vec![
            ColumnSchema::new(
                "ts".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            tag_column("my_tag1"),
            tag_column("my_tag2"),
            ColumnSchema::new(
                "my_field_column".to_string(),
                ConcreteDataType::int32_datatype(),
                true,
            ),
        ];
        let meta = TableMeta {
            schema: Arc::new(Schema::new(column_schemas)),
            primary_key_indices: vec![1, 2],
            value_indices: vec![2],
            engine: MITO2_ENGINE.to_string(),
            next_column_id: 3,
            options: TableOptions::default(),
            created_on: DateTime::default(),
            updated_on: DateTime::default(),
            partition_key_indices: vec![],
            column_ids: vec![],
        };
        TableInfo {
            ident: TableIdent {
                table_id: 42,
                version: 1,
            },
            name: "my_table".to_string(),
            desc: Some("blabla".to_string()),
            catalog_name: "my_catalog".to_string(),
            schema_name: "my_schema".to_string(),
            meta,
            table_type: TableType::Base,
        }
    }

    /// Assembles a `DdlContext` wired entirely to in-memory test doubles:
    /// a shared memory KV backend, default pushers, and a noop region
    /// failure detector.
    pub(crate) fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
        let kv_backend = Arc::new(MemoryKvBackend::new());

        // Mailbox-backed cache invalidator pointing at a dummy metasrv address.
        let mailbox = HeartbeatMailbox::create(
            Pushers::default(),
            SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build(),
        );
        let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
            mailbox,
            MetasrvInfo {
                server_addr: "127.0.0.1:4321".to_string(),
            },
        ));

        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
            Arc::new(WalProvider::default()),
        ));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
        ));

        DdlContext {
            node_manager,
            cache_invalidator,
            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
            table_metadata_allocator,
            flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend)),
            flow_metadata_allocator,
            memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        }
    }
}