// meta_srv/procedure/utils.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::time::Duration;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
19use common_meta::peer::Peer;
20use common_telemetry::tracing_context::TracingContext;
21use common_telemetry::{info, warn};
22use snafu::ResultExt;
23use store_api::storage::RegionId;
24use tokio::time::Instant;
25
26use crate::error::{self, Error, Result};
27use crate::handler::HeartbeatMailbox;
28use crate::service::mailbox::{Channel, MailboxRef};
29
/// How [flush_region] reacts when the flush fails or the datanode is
/// unreachable (`PusherNotFound` / `MailboxChannelClosed`).
pub(crate) enum ErrorStrategy {
    /// Log a warning and treat the failed flush as a no-op (`Ok(())`).
    Ignore,
    /// Surface the failure as a `RetryLater` error.
    Retry,
}
34
35fn handle_flush_region_reply(
36    reply: &InstructionReply,
37    region_ids: &[RegionId],
38    msg: &MailboxMessage,
39) -> Result<(bool, Option<String>)> {
40    let result = match reply {
41        InstructionReply::FlushRegions(flush_reply) => {
42            if flush_reply.results.len() != region_ids.len() {
43                return error::UnexpectedInstructionReplySnafu {
44                    mailbox_message: msg.to_string(),
45                    reason: format!(
46                        "expect {} region flush result, but got {}",
47                        region_ids.len(),
48                        flush_reply.results.len()
49                    ),
50                }
51                .fail();
52            }
53
54            match flush_reply.overall_success {
55                true => (true, None),
56                false => (
57                    false,
58                    Some(
59                        flush_reply
60                            .results
61                            .iter()
62                            .filter_map(|(region_id, result)| match result {
63                                Ok(_) => None,
64                                Err(e) => Some(format!("{}: {:?}", region_id, e)),
65                            })
66                            .collect::<Vec<String>>()
67                            .join("; "),
68                    ),
69                ),
70            }
71        }
72        _ => {
73            return error::UnexpectedInstructionReplySnafu {
74                mailbox_message: msg.to_string(),
75                reason: "expect flush region reply",
76            }
77            .fail();
78        }
79    };
80
81    Ok(result)
82}
83
84/// Flushes the regions on the datanode.
85///
86/// Retry Or Ignore:
87/// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
88/// - Failed to flush region on the Datanode.
89///
90/// Abort:
91/// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
92/// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
93/// - [ExceededDeadline](error::Error::ExceededDeadline)
94/// - Invalid JSON.
95pub(crate) async fn flush_region(
96    mailbox: &MailboxRef,
97    server_addr: &str,
98    region_ids: &[RegionId],
99    datanode: &Peer,
100    timeout: Duration,
101    error_strategy: ErrorStrategy,
102) -> Result<()> {
103    let flush_instruction = Instruction::FlushRegions(FlushRegions::sync_batch(
104        region_ids.to_vec(),
105        FlushErrorStrategy::TryAll,
106    ));
107
108    let tracing_ctx = TracingContext::from_current_span();
109    let msg = MailboxMessage::json_message(
110        &format!("Flush regions: {:?}", region_ids),
111        &format!("Metasrv@{}", server_addr),
112        &format!("Datanode-{}@{}", datanode.id, datanode.addr),
113        common_time::util::current_time_millis(),
114        &flush_instruction,
115        Some(tracing_ctx.to_w3c()),
116    )
117    .with_context(|_| error::SerializeToJsonSnafu {
118        input: flush_instruction.to_string(),
119    })?;
120
121    let ch = Channel::Datanode(datanode.id);
122    let now = Instant::now();
123    let receiver = mailbox.send(&ch, msg, timeout).await;
124    let receiver = match receiver {
125        Ok(receiver) => receiver,
126        Err(error::Error::PusherNotFound { .. }) => match error_strategy {
127            ErrorStrategy::Ignore => {
128                warn!(
129                    "Failed to flush regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
130                    region_ids, datanode
131                );
132                return Ok(());
133            }
134            ErrorStrategy::Retry => error::RetryLaterSnafu {
135                reason: format!(
136                    "Pusher not found for flush regions on datanode {:?}, elapsed: {:?}",
137                    datanode,
138                    now.elapsed()
139                ),
140            }
141            .fail()?,
142        },
143        Err(err) => {
144            return Err(err);
145        }
146    };
147
148    match receiver.await {
149        Ok(msg) => {
150            let reply = HeartbeatMailbox::json_reply(&msg)?;
151            info!(
152                "Received flush region reply: {:?}, regions: {:?}, elapsed: {:?}",
153                reply,
154                region_ids,
155                now.elapsed()
156            );
157            let (result, error) = handle_flush_region_reply(&reply, region_ids, &msg)?;
158            if let Some(error) = error {
159                match error_strategy {
160                    ErrorStrategy::Ignore => {
161                        warn!(
162                            "Failed to flush regions {:?}, the datanode({}) error is ignored: {}",
163                            region_ids, datanode, error
164                        );
165                    }
166                    ErrorStrategy::Retry => {
167                        return error::RetryLaterSnafu {
168                            reason: format!(
169                                "Failed to flush regions {:?}, the datanode({}) error is retried: {}",
170                                region_ids,
171                                datanode,
172                                error,
173                            ),
174                        }
175                        .fail()?;
176                    }
177                }
178            } else if result {
179                info!(
180                    "The flush regions {:?} on datanode {:?} is successful, elapsed: {:?}",
181                    region_ids,
182                    datanode,
183                    now.elapsed()
184                );
185            }
186
187            Ok(())
188        }
189        Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
190            operation: "Flush regions",
191        }
192        .fail(),
193        Err(error::Error::MailboxChannelClosed { .. }) => match error_strategy {
194            ErrorStrategy::Ignore => {
195                warn!(
196                    "Failed to flush regions({:?}), the datanode({}) is unreachable(MailboxChannelClosed). Skip flush operation.",
197                    region_ids, datanode
198                );
199                Ok(())
200            }
201            ErrorStrategy::Retry => error::RetryLaterSnafu {
202                reason: format!(
203                    "Mailbox closed when sending flush region to datanode {:?}, elapsed: {:?}",
204                    datanode,
205                    now.elapsed()
206                ),
207            }
208            .fail()?,
209        },
210        Err(err) => Err(err),
211    }
212}
213
#[cfg(any(test, feature = "mock"))]
pub mod mock {
    use std::io::Error;
    use std::sync::Arc;

    use api::v1::region::region_server::RegionServer;
    use api::v1::region::{RegionResponse, region_request};
    use api::v1::{ResponseHeader, Status as PbStatus};
    use async_trait::async_trait;
    use client::Client;
    use common_grpc::channel_manager::ChannelManager;
    use common_meta::peer::Peer;
    use common_runtime::runtime::BuilderBuild;
    use common_runtime::{Builder as RuntimeBuilder, Runtime};
    use hyper_util::rt::TokioIo;
    use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
    use tokio::sync::mpsc;
    use tonic::codec::CompressionEncoding;
    use tonic::transport::Server;
    use tower::service_fn;

    /// A mock region server that forwards every received request body to a
    /// channel (for test inspection) and replies with an empty success.
    #[derive(Clone)]
    pub struct EchoRegionServer {
        // Runtime used by the gRPC request handler.
        runtime: Runtime,
        // Sender side of the channel that records incoming request bodies.
        received_requests: mpsc::Sender<region_request::Body>,
    }

    impl EchoRegionServer {
        /// Creates the server together with the receiver on which tests can
        /// observe every request body the server handled.
        pub fn new() -> (Self, mpsc::Receiver<region_request::Body>) {
            // Bounded channel: tests should drain it to avoid blocking sends
            // after 10 undelivered requests.
            let (tx, rx) = mpsc::channel(10);
            (
                Self {
                    runtime: RuntimeBuilder::default().worker_threads(2).build().unwrap(),
                    received_requests: tx,
                },
                rx,
            )
        }

        /// Builds a [Client] wired to this mock server over an in-memory
        /// duplex stream — no real network is involved. The client is
        /// registered under `datanode.addr` in a fresh [ChannelManager].
        pub fn new_client(&self, datanode: &Peer) -> Client {
            // In-memory byte pipe: `server` end feeds the tonic server,
            // `client` end is handed to the channel manager below.
            let (client, server) = tokio::io::duplex(1024);

            let handler =
                RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone());

            // Serve exactly one incoming "connection": the server end of the
            // duplex stream.
            tokio::spawn(async move {
                Server::builder()
                    .add_service(
                        RegionServer::new(handler)
                            .accept_compressed(CompressionEncoding::Gzip)
                            .accept_compressed(CompressionEncoding::Zstd)
                            .send_compressed(CompressionEncoding::Gzip)
                            .send_compressed(CompressionEncoding::Zstd),
                    )
                    .serve_with_incoming(futures::stream::iter(vec![Ok::<_, Error>(server)]))
                    .await
            });

            let channel_manager = ChannelManager::new();
            // NOTE: the duplex client end can be handed out only once — a
            // second invocation of this connector would panic on `unwrap()`.
            // Acceptable for a mock; reconnects are not supported.
            let mut client = Some(client);
            channel_manager
                .reset_with_connector(
                    datanode.addr.clone(),
                    service_fn(move |_| {
                        let client = client.take().unwrap();
                        async move { Ok::<_, Error>(TokioIo::new(client)) }
                    }),
                )
                .unwrap();
            Client::with_manager_and_urls(channel_manager, vec![datanode.addr.clone()])
        }
    }

    #[async_trait]
    impl RegionServerHandler for EchoRegionServer {
        /// Records the request body on the channel and returns an empty,
        /// successful [RegionResponse] (status code 0, no rows affected).
        async fn handle(
            &self,
            request: region_request::Body,
        ) -> servers::error::Result<RegionResponse> {
            self.received_requests.send(request).await.unwrap();

            Ok(RegionResponse {
                header: Some(ResponseHeader {
                    status: Some(PbStatus {
                        status_code: 0,
                        err_msg: String::default(),
                    }),
                }),
                affected_rows: 0,
                extensions: Default::default(),
                metadata: Vec::new(),
            })
        }
    }
}
310
#[cfg(test)]
pub mod test_data {
    use std::sync::Arc;

    use chrono::DateTime;
    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::ddl::flow_meta::FlowMetadataAllocator;
    use common_meta::ddl::table_meta::TableMetadataAllocator;
    use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use common_meta::key::TableMetadataManager;
    use common_meta::key::flow::FlowMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::node_manager::NodeManagerRef;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::region_registry::LeaderRegionRegistry;
    use common_meta::rpc::router::RegionRoute;
    use common_meta::sequence::SequenceBuilder;
    use common_meta::wal_provider::WalProvider;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use table::metadata::{TableIdent, TableInfo, TableMeta, TableType};
    use table::requests::TableOptions;

    use crate::cache_invalidator::MetasrvCacheInvalidator;
    use crate::handler::{HeartbeatMailbox, Pushers};
    use crate::metasrv::MetasrvInfo;
    use crate::test_util::new_region_route;

    /// Builds a fixed fixture of three region routes spread over three local
    /// test peers (see `new_region_route` for the argument semantics).
    pub fn new_region_routes() -> Vec<RegionRoute> {
        let peers = vec![
            Peer::new(1, "127.0.0.1:4001"),
            Peer::new(2, "127.0.0.1:4002"),
            Peer::new(3, "127.0.0.1:4003"),
        ];
        vec![
            new_region_route(1, &peers, 3),
            new_region_route(2, &peers, 2),
            new_region_route(3, &peers, 1),
        ]
    }

    /// Builds a [TableInfo] fixture: table id 42, a millisecond timestamp
    /// column, two string tag columns (the primary key), and one int32 field
    /// column, using the mito2 engine.
    pub fn new_table_info() -> TableInfo {
        TableInfo {
            ident: TableIdent {
                table_id: 42,
                version: 1,
            },
            name: "my_table".to_string(),
            desc: Some("blabla".to_string()),
            catalog_name: "my_catalog".to_string(),
            schema_name: "my_schema".to_string(),
            meta: TableMeta {
                schema: Arc::new(Schema::new(vec![
                    ColumnSchema::new(
                        "ts".to_string(),
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    ),
                    ColumnSchema::new(
                        "my_tag1".to_string(),
                        ConcreteDataType::string_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "my_tag2".to_string(),
                        ConcreteDataType::string_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "my_field_column".to_string(),
                        ConcreteDataType::int32_datatype(),
                        true,
                    ),
                ])),
                // Columns 1 and 2 (the tags) form the primary key.
                primary_key_indices: vec![1, 2],
                value_indices: vec![2],
                engine: MITO2_ENGINE.to_string(),
                next_column_id: 3,
                options: TableOptions::default(),
                created_on: DateTime::default(),
                updated_on: DateTime::default(),
                partition_key_indices: vec![],
                column_ids: vec![],
            },
            table_type: TableType::Base,
        }
    }

    /// Builds a [DdlContext] whose metadata managers/allocators are all
    /// backed by a single in-memory kv store, with noop or in-memory stand-ins
    /// for the remaining collaborators.
    pub(crate) fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
        let kv_backend = Arc::new(MemoryKvBackend::new());

        // Mailbox used only to construct the cache invalidator below.
        let mailbox_sequence =
            SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
        let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence);
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
            Arc::new(WalProvider::default()),
        ));
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            Arc::new(SequenceBuilder::new("test", kv_backend).build()),
        ));
        DdlContext {
            node_manager,
            cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(
                mailbox,
                MetasrvInfo {
                    server_addr: "127.0.0.1:4321".to_string(),
                },
            )),
            table_metadata_manager,
            table_metadata_allocator,
            flow_metadata_manager,
            flow_metadata_allocator,
            memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        }
    }
}