meta_srv/procedure/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
19use common_meta::peer::Peer;
20use common_telemetry::tracing_context::TracingContext;
21use common_telemetry::{info, warn};
22use snafu::ResultExt;
23use store_api::storage::RegionId;
24use tokio::time::Instant;
25
26use crate::error::{self, Error, Result};
27use crate::handler::HeartbeatMailbox;
28use crate::service::mailbox::{Channel, MailboxRef};
29
/// How [`flush_region`] reacts when a flush cannot be completed on the datanode.
///
/// It applies to two situations: the datanode is unreachable (no heartbeat pusher
/// is registered for it), or the datanode replies that flushing one or more
/// regions failed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ErrorStrategy {
    /// Log a warning and report success to the caller.
    Ignore,
    /// Return a `RetryLater` error so the caller can retry the operation.
    Retry,
}
34
35fn handle_flush_region_reply(
36    reply: &InstructionReply,
37    region_ids: &[RegionId],
38    msg: &MailboxMessage,
39) -> Result<(bool, Option<String>)> {
40    let result = match reply {
41        InstructionReply::FlushRegions(flush_reply) => {
42            if flush_reply.results.len() != region_ids.len() {
43                return error::UnexpectedInstructionReplySnafu {
44                    mailbox_message: msg.to_string(),
45                    reason: format!(
46                        "expect {} region flush result, but got {}",
47                        region_ids.len(),
48                        flush_reply.results.len()
49                    ),
50                }
51                .fail();
52            }
53
54            match flush_reply.overall_success {
55                true => (true, None),
56                false => (
57                    false,
58                    Some(
59                        flush_reply
60                            .results
61                            .iter()
62                            .filter_map(|(region_id, result)| match result {
63                                Ok(_) => None,
64                                Err(e) => Some(format!("{}: {:?}", region_id, e)),
65                            })
66                            .collect::<Vec<String>>()
67                            .join("; "),
68                    ),
69                ),
70            }
71        }
72        _ => {
73            return error::UnexpectedInstructionReplySnafu {
74                mailbox_message: msg.to_string(),
75                reason: "expect flush region reply",
76            }
77            .fail();
78        }
79    };
80
81    Ok(result)
82}
83
84/// Flushes the regions on the datanode.
85///
86/// Retry Or Ignore:
87/// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
88/// - Failed to flush region on the Datanode.
89///
90/// Abort:
91/// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
92/// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
93/// - [ExceededDeadline](error::Error::ExceededDeadline)
94/// - Invalid JSON.
95pub(crate) async fn flush_region(
96    mailbox: &MailboxRef,
97    server_addr: &str,
98    region_ids: &[RegionId],
99    datanode: &Peer,
100    timeout: Duration,
101    error_strategy: ErrorStrategy,
102) -> Result<()> {
103    let flush_instruction = Instruction::FlushRegions(FlushRegions::sync_batch(
104        region_ids.to_vec(),
105        FlushErrorStrategy::TryAll,
106    ));
107
108    let tracing_ctx = TracingContext::from_current_span();
109    let msg = MailboxMessage::json_message(
110        &format!("Flush regions: {:?}", region_ids),
111        &format!("Metasrv@{}", server_addr),
112        &format!("Datanode-{}@{}", datanode.id, datanode.addr),
113        common_time::util::current_time_millis(),
114        &flush_instruction,
115        Some(tracing_ctx.to_w3c()),
116    )
117    .with_context(|_| error::SerializeToJsonSnafu {
118        input: flush_instruction.to_string(),
119    })?;
120
121    let ch = Channel::Datanode(datanode.id);
122    let now = Instant::now();
123    let receiver = mailbox.send(&ch, msg, timeout).await;
124    let receiver = match receiver {
125        Ok(receiver) => receiver,
126        Err(error::Error::PusherNotFound { .. }) => match error_strategy {
127            ErrorStrategy::Ignore => {
128                warn!(
129                    "Failed to flush regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
130                    region_ids, datanode
131                );
132                return Ok(());
133            }
134            ErrorStrategy::Retry => error::RetryLaterSnafu {
135                reason: format!(
136                    "Pusher not found for flush regions on datanode {:?}, elapsed: {:?}",
137                    datanode,
138                    now.elapsed()
139                ),
140            }
141            .fail()?,
142        },
143        Err(err) => {
144            return Err(err);
145        }
146    };
147
148    match receiver.await {
149        Ok(msg) => {
150            let reply = HeartbeatMailbox::json_reply(&msg)?;
151            info!(
152                "Received flush region reply: {:?}, regions: {:?}, elapsed: {:?}",
153                reply,
154                region_ids,
155                now.elapsed()
156            );
157            let (result, error) = handle_flush_region_reply(&reply, region_ids, &msg)?;
158            if let Some(error) = error {
159                match error_strategy {
160                    ErrorStrategy::Ignore => {
161                        warn!(
162                            "Failed to flush regions {:?}, the datanode({}) error is ignored: {}",
163                            region_ids, datanode, error
164                        );
165                    }
166                    ErrorStrategy::Retry => {
167                        return error::RetryLaterSnafu {
168                            reason: format!(
169                                "Failed to flush regions {:?}, the datanode({}) error is retried: {}",
170                                region_ids,
171                                datanode,
172                                error,
173                            ),
174                        }
175                        .fail()?;
176                    }
177                }
178            } else if result {
179                info!(
180                    "The flush regions {:?} on datanode {:?} is successful, elapsed: {:?}",
181                    region_ids,
182                    datanode,
183                    now.elapsed()
184                );
185            }
186
187            Ok(())
188        }
189        Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
190            operation: "Flush regions",
191        }
192        .fail(),
193        Err(err) => Err(err),
194    }
195}
196
#[cfg(any(test, feature = "mock"))]
pub mod mock {
    use std::io::Error;
    use std::sync::Arc;

    use api::v1::region::region_server::RegionServer;
    use api::v1::region::{RegionResponse, region_request};
    use api::v1::{ResponseHeader, Status as PbStatus};
    use async_trait::async_trait;
    use client::Client;
    use common_grpc::channel_manager::ChannelManager;
    use common_meta::peer::Peer;
    use common_runtime::runtime::BuilderBuild;
    use common_runtime::{Builder as RuntimeBuilder, Runtime};
    use hyper_util::rt::TokioIo;
    use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
    use tokio::sync::mpsc;
    use tonic::codec::CompressionEncoding;
    use tonic::transport::Server;
    use tower::service_fn;

    /// A mock region server that records every request it receives and always
    /// answers with an empty success response.
    ///
    /// Request bodies are forwarded to the [`mpsc::Receiver`] returned by
    /// [`EchoRegionServer::new`], so tests can assert on what was sent.
    #[derive(Clone)]
    pub struct EchoRegionServer {
        /// Runtime handed to the gRPC request handler.
        runtime: Runtime,
        /// Sender side of the channel collecting the received request bodies.
        received_requests: mpsc::Sender<region_request::Body>,
    }

    impl EchoRegionServer {
        /// Creates the mock server together with the receiver of the requests it handles.
        ///
        /// The channel buffers up to 10 requests; once it is full, handling a request
        /// waits until the receiver is drained.
        ///
        /// # Panics
        ///
        /// Panics if the 2-worker runtime cannot be built.
        pub fn new() -> (Self, mpsc::Receiver<region_request::Body>) {
            let (tx, rx) = mpsc::channel(10);
            let runtime = RuntimeBuilder::default()
                .worker_threads(2)
                .build()
                .expect("failed to build the runtime for EchoRegionServer");
            (
                Self {
                    runtime,
                    received_requests: tx,
                },
                rx,
            )
        }

        /// Returns a [`Client`] wired to this mock server through an in-memory duplex
        /// pipe (1024-byte buffer) instead of a real network connection.
        ///
        /// The gRPC server is spawned with `tokio::spawn`, so this must be called from
        /// within a Tokio runtime. `datanode.addr` is only used as the channel key and
        /// URL; nothing listens on that address.
        ///
        /// # Panics
        ///
        /// The connector hands out its single pipe end once, so a second connection
        /// attempt on the channel panics. Also panics if the connector cannot be
        /// registered with the channel manager.
        pub fn new_client(&self, datanode: &Peer) -> Client {
            let (client, server) = tokio::io::duplex(1024);

            let handler =
                RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone());

            tokio::spawn(async move {
                Server::builder()
                    .add_service(
                        RegionServer::new(handler)
                            .accept_compressed(CompressionEncoding::Gzip)
                            .accept_compressed(CompressionEncoding::Zstd)
                            .send_compressed(CompressionEncoding::Gzip)
                            .send_compressed(CompressionEncoding::Zstd),
                    )
                    // Serve exactly one connection: the server half of the duplex pipe.
                    .serve_with_incoming(futures::stream::iter(vec![Ok::<_, Error>(server)]))
                    .await
            });

            let channel_manager = ChannelManager::new();
            // The connector closure is `FnMut`, so the client half is moved out of an
            // `Option` on the first (and only supported) connection.
            let mut client = Some(client);
            channel_manager
                .reset_with_connector(
                    datanode.addr.clone(),
                    service_fn(move |_| {
                        let client = client
                            .take()
                            .expect("the in-memory connection can only be established once");
                        async move { Ok::<_, Error>(TokioIo::new(client)) }
                    }),
                )
                .expect("failed to register the in-memory connector");
            Client::with_manager_and_urls(channel_manager, vec![datanode.addr.clone()])
        }
    }

    #[async_trait]
    impl RegionServerHandler for EchoRegionServer {
        /// Forwards `request` to the receiver returned by [`EchoRegionServer::new`] and
        /// replies with status code 0, no affected rows, and empty extensions/metadata.
        ///
        /// # Panics
        ///
        /// Panics if that receiver has been dropped.
        async fn handle(
            &self,
            request: region_request::Body,
        ) -> servers::error::Result<RegionResponse> {
            self.received_requests
                .send(request)
                .await
                .expect("the receiver of EchoRegionServer requests was dropped");

            Ok(RegionResponse {
                header: Some(ResponseHeader {
                    status: Some(PbStatus {
                        status_code: 0,
                        err_msg: String::default(),
                    }),
                }),
                affected_rows: 0,
                extensions: Default::default(),
                metadata: Vec::new(),
            })
        }
    }
}
293
#[cfg(test)]
pub mod test_data {
    use std::sync::Arc;

    use chrono::DateTime;
    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::ddl::flow_meta::FlowMetadataAllocator;
    use common_meta::ddl::table_meta::TableMetadataAllocator;
    use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use common_meta::key::TableMetadataManager;
    use common_meta::key::flow::FlowMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::node_manager::NodeManagerRef;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::region_registry::LeaderRegionRegistry;
    use common_meta::rpc::router::RegionRoute;
    use common_meta::sequence::SequenceBuilder;
    use common_meta::wal_provider::WalProvider;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use table::metadata::{TableIdent, TableInfo, TableMeta, TableType};
    use table::requests::TableOptions;

    use crate::cache_invalidator::MetasrvCacheInvalidator;
    use crate::handler::{HeartbeatMailbox, Pushers};
    use crate::metasrv::MetasrvInfo;
    use crate::test_util::new_region_route;

    /// Returns three region routes (regions 1, 2 and 3) over the peers
    /// `1 => 127.0.0.1:4001`, `2 => 127.0.0.1:4002` and `3 => 127.0.0.1:4003`.
    ///
    /// Regions 1, 2 and 3 receive 3, 2 and 1 respectively as the last argument of
    /// `new_region_route` (presumably the leader peer id — confirm in `test_util`).
    pub fn new_region_routes() -> Vec<RegionRoute> {
        let peers: Vec<Peer> = [
            (1, "127.0.0.1:4001"),
            (2, "127.0.0.1:4002"),
            (3, "127.0.0.1:4003"),
        ]
        .into_iter()
        .map(|(id, addr)| Peer::new(id, addr))
        .collect();

        [(1, 3), (2, 2), (3, 1)]
            .into_iter()
            .map(|(region, node)| new_region_route(region, &peers, node))
            .collect()
    }

    /// Returns the metadata of `my_catalog.my_schema.my_table` (table id 42,
    /// version 1, engine `MITO2_ENGINE`) with four columns:
    /// `ts` (non-null millisecond timestamp), `my_tag1` and `my_tag2` (nullable
    /// strings, the primary key), and `my_field_column` (nullable int32).
    pub fn new_table_info() -> TableInfo {
        let columns: Vec<ColumnSchema> = [
            (
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            ("my_tag1", ConcreteDataType::string_datatype(), true),
            ("my_tag2", ConcreteDataType::string_datatype(), true),
            ("my_field_column", ConcreteDataType::int32_datatype(), true),
        ]
        .into_iter()
        .map(|(name, data_type, nullable)| {
            ColumnSchema::new(name.to_string(), data_type, nullable)
        })
        .collect();

        // NOTE(review): `value_indices` points at `my_tag2` (also a primary-key
        // column) and `next_column_id` is 3 although four columns exist. Both are
        // kept as-is because tests may depend on these exact values — confirm
        // whether this is intentional.
        let meta = TableMeta {
            schema: Arc::new(Schema::new(columns)),
            primary_key_indices: vec![1, 2],
            value_indices: vec![2],
            engine: MITO2_ENGINE.to_string(),
            next_column_id: 3,
            options: TableOptions::default(),
            created_on: DateTime::default(),
            updated_on: DateTime::default(),
            partition_key_indices: vec![],
            column_ids: vec![],
        };

        TableInfo {
            ident: TableIdent {
                table_id: 42,
                version: 1,
            },
            name: "my_table".to_string(),
            desc: Some("blabla".to_string()),
            catalog_name: "my_catalog".to_string(),
            schema_name: "my_schema".to_string(),
            meta,
            table_type: TableType::Base,
        }
    }

    /// Builds a [`DdlContext`] for tests, backed by a single in-memory KV backend.
    ///
    /// All metadata managers, allocators and sequences share that backend. The cache
    /// invalidator uses a heartbeat mailbox created with default (empty) pushers,
    /// and region failure detection is a no-op.
    pub(crate) fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
        let kv_backend = Arc::new(MemoryKvBackend::new());

        // Mailbox used by the cache invalidator.
        let mailbox_sequence =
            SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
        let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence);

        // Table metadata manager and allocator.
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let table_sequence = Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build());
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            table_sequence,
            Arc::new(WalProvider::default()),
        ));

        // Flow metadata manager and allocator; the last user takes ownership of the backend.
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flow_sequence = Arc::new(SequenceBuilder::new("test", kv_backend).build());
        let flow_metadata_allocator =
            Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(flow_sequence));

        let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
            mailbox,
            MetasrvInfo {
                server_addr: "127.0.0.1:4321".to_string(),
            },
        ));

        DdlContext {
            node_manager,
            cache_invalidator,
            table_metadata_manager,
            table_metadata_allocator,
            flow_metadata_manager,
            flow_metadata_allocator,
            memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        }
    }
}