meta_srv/procedure/utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
19use common_meta::peer::Peer;
20use common_telemetry::{info, warn};
21use snafu::ResultExt;
22use store_api::storage::RegionId;
23use tokio::time::Instant;
24
25use crate::error::{self, Error, Result};
26use crate::handler::HeartbeatMailbox;
27use crate::service::mailbox::{Channel, MailboxRef};
28
/// How [`flush_region`] reacts to a recoverable failure: either the datanode
/// being unreachable, or the datanode reporting that the flush did not fully succeed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ErrorStrategy {
    /// Log a warning and return `Ok(())` as if the flush had completed.
    Ignore,
    /// Return a `RetryLater` error so the caller can retry the operation.
    Retry,
}
33
34fn handle_flush_region_reply(
35    reply: &InstructionReply,
36    region_ids: &[RegionId],
37    msg: &MailboxMessage,
38) -> Result<(bool, Option<String>)> {
39    let result = match reply {
40        InstructionReply::FlushRegions(flush_reply) => {
41            if flush_reply.results.len() != region_ids.len() {
42                return error::UnexpectedInstructionReplySnafu {
43                    mailbox_message: msg.to_string(),
44                    reason: format!(
45                        "expect {} region flush result, but got {}",
46                        region_ids.len(),
47                        flush_reply.results.len()
48                    ),
49                }
50                .fail();
51            }
52
53            match flush_reply.overall_success {
54                true => (true, None),
55                false => (
56                    false,
57                    Some(
58                        flush_reply
59                            .results
60                            .iter()
61                            .filter_map(|(region_id, result)| match result {
62                                Ok(_) => None,
63                                Err(e) => Some(format!("{}: {:?}", region_id, e)),
64                            })
65                            .collect::<Vec<String>>()
66                            .join("; "),
67                    ),
68                ),
69            }
70        }
71        _ => {
72            return error::UnexpectedInstructionReplySnafu {
73                mailbox_message: msg.to_string(),
74                reason: "expect flush region reply",
75            }
76            .fail();
77        }
78    };
79
80    Ok(result)
81}
82
83/// Flushes the regions on the datanode.
84///
85/// Retry Or Ignore:
86/// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
87/// - Failed to flush region on the Datanode.
88///
89/// Abort:
90/// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
91/// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
92/// - [ExceededDeadline](error::Error::ExceededDeadline)
93/// - Invalid JSON.
94pub(crate) async fn flush_region(
95    mailbox: &MailboxRef,
96    server_addr: &str,
97    region_ids: &[RegionId],
98    datanode: &Peer,
99    timeout: Duration,
100    error_strategy: ErrorStrategy,
101) -> Result<()> {
102    let flush_instruction = Instruction::FlushRegions(FlushRegions::sync_batch(
103        region_ids.to_vec(),
104        FlushErrorStrategy::TryAll,
105    ));
106
107    let msg = MailboxMessage::json_message(
108        &format!("Flush regions: {:?}", region_ids),
109        &format!("Metasrv@{}", server_addr),
110        &format!("Datanode-{}@{}", datanode.id, datanode.addr),
111        common_time::util::current_time_millis(),
112        &flush_instruction,
113    )
114    .with_context(|_| error::SerializeToJsonSnafu {
115        input: flush_instruction.to_string(),
116    })?;
117
118    let ch = Channel::Datanode(datanode.id);
119    let now = Instant::now();
120    let receiver = mailbox.send(&ch, msg, timeout).await;
121    let receiver = match receiver {
122        Ok(receiver) => receiver,
123        Err(error::Error::PusherNotFound { .. }) => match error_strategy {
124            ErrorStrategy::Ignore => {
125                warn!(
126                    "Failed to flush regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
127                    region_ids, datanode
128                );
129                return Ok(());
130            }
131            ErrorStrategy::Retry => error::RetryLaterSnafu {
132                reason: format!(
133                    "Pusher not found for flush regions on datanode {:?}, elapsed: {:?}",
134                    datanode,
135                    now.elapsed()
136                ),
137            }
138            .fail()?,
139        },
140        Err(err) => {
141            return Err(err);
142        }
143    };
144
145    match receiver.await {
146        Ok(msg) => {
147            let reply = HeartbeatMailbox::json_reply(&msg)?;
148            info!(
149                "Received flush region reply: {:?}, regions: {:?}, elapsed: {:?}",
150                reply,
151                region_ids,
152                now.elapsed()
153            );
154            let (result, error) = handle_flush_region_reply(&reply, region_ids, &msg)?;
155            if let Some(error) = error {
156                match error_strategy {
157                    ErrorStrategy::Ignore => {
158                        warn!(
159                            "Failed to flush regions {:?}, the datanode({}) error is ignored: {}",
160                            region_ids, datanode, error
161                        );
162                    }
163                    ErrorStrategy::Retry => {
164                        return error::RetryLaterSnafu {
165                            reason: format!(
166                                "Failed to flush regions {:?}, the datanode({}) error is retried: {}",
167                                region_ids,
168                                datanode,
169                                error,
170                            ),
171                        }
172                        .fail()?;
173                    }
174                }
175            } else if result {
176                info!(
177                    "The flush regions {:?} on datanode {:?} is successful, elapsed: {:?}",
178                    region_ids,
179                    datanode,
180                    now.elapsed()
181                );
182            }
183
184            Ok(())
185        }
186        Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
187            operation: "Flush regions",
188        }
189        .fail(),
190        Err(err) => Err(err),
191    }
192}
193
#[cfg(any(test, feature = "mock"))]
pub mod mock {
    use std::io::Error;
    use std::sync::Arc;

    use api::v1::region::region_server::RegionServer;
    use api::v1::region::{RegionResponse, region_request};
    use api::v1::{ResponseHeader, Status as PbStatus};
    use async_trait::async_trait;
    use client::Client;
    use common_grpc::channel_manager::ChannelManager;
    use common_meta::peer::Peer;
    use common_runtime::runtime::BuilderBuild;
    use common_runtime::{Builder as RuntimeBuilder, Runtime};
    use hyper_util::rt::TokioIo;
    use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
    use tokio::sync::mpsc;
    use tonic::codec::CompressionEncoding;
    use tonic::transport::Server;
    use tower::service_fn;

    /// A mock region server for tests.
    ///
    /// Every request body it handles is forwarded to the receiver returned by
    /// [`EchoRegionServer::new`], and a successful, empty [`RegionResponse`]
    /// (status code 0, no affected rows, no metadata) is returned to the caller.
    #[derive(Clone)]
    pub struct EchoRegionServer {
        /// Runtime handed to the [`RegionServerRequestHandler`].
        runtime: Runtime,
        /// Sink for every request body passed to [`RegionServerHandler::handle`].
        received_requests: mpsc::Sender<region_request::Body>,
    }

    impl EchoRegionServer {
        /// Creates the server plus the receiver on which it reports handled requests.
        ///
        /// The channel holds at most 10 unconsumed requests; once it is full,
        /// `handle` waits until the test drains the receiver.
        pub fn new() -> (Self, mpsc::Receiver<region_request::Body>) {
            let (tx, rx) = mpsc::channel(10);
            (
                Self {
                    runtime: RuntimeBuilder::default()
                        .worker_threads(2)
                        .build()
                        .expect("failed to build the EchoRegionServer runtime"),
                    received_requests: tx,
                },
                rx,
            )
        }

        /// Returns a [`Client`] connected to this server over an in-memory duplex stream.
        ///
        /// A gRPC server serving `self` (gzip and zstd accepted and sent) is spawned with
        /// `tokio::spawn`, so this must be called from within a tokio runtime. The client's
        /// channel manager routes `datanode.addr` to a connector that hands out the client
        /// end of the stream; that connector can be used only once, and a second connection
        /// attempt panics.
        pub fn new_client(&self, datanode: &Peer) -> Client {
            let (client, server) = tokio::io::duplex(1024);

            let handler =
                RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone());

            tokio::spawn(async move {
                Server::builder()
                    .add_service(
                        RegionServer::new(handler)
                            .accept_compressed(CompressionEncoding::Gzip)
                            .accept_compressed(CompressionEncoding::Zstd)
                            .send_compressed(CompressionEncoding::Gzip)
                            .send_compressed(CompressionEncoding::Zstd),
                    )
                    // The server sees exactly one incoming connection: the duplex server end.
                    .serve_with_incoming(futures::stream::iter(vec![Ok::<_, Error>(server)]))
                    .await
            });

            let channel_manager = ChannelManager::new();
            let mut client = Some(client);
            channel_manager
                .reset_with_connector(
                    datanode.addr.clone(),
                    service_fn(move |_| {
                        let client = client
                            .take()
                            .expect("the in-memory connection can only be established once");
                        async move { Ok::<_, Error>(TokioIo::new(client)) }
                    }),
                )
                .expect("failed to install the in-memory connector");
            Client::with_manager_and_urls(channel_manager, vec![datanode.addr.clone()])
        }
    }

    #[async_trait]
    impl RegionServerHandler for EchoRegionServer {
        /// Records `request` on the channel returned by [`EchoRegionServer::new`] and
        /// replies with an empty success response.
        ///
        /// # Panics
        /// Panics if the receiver returned by [`EchoRegionServer::new`] has been dropped.
        async fn handle(
            &self,
            request: region_request::Body,
        ) -> servers::error::Result<RegionResponse> {
            self.received_requests
                .send(request)
                .await
                .expect("the EchoRegionServer request receiver was dropped");

            Ok(RegionResponse {
                header: Some(ResponseHeader {
                    status: Some(PbStatus {
                        status_code: 0,
                        err_msg: String::default(),
                    }),
                }),
                affected_rows: 0,
                extensions: Default::default(),
                metadata: Vec::new(),
            })
        }
    }
}
290
#[cfg(test)]
pub mod test_data {
    use std::sync::Arc;

    use chrono::DateTime;
    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::ddl::flow_meta::FlowMetadataAllocator;
    use common_meta::ddl::table_meta::TableMetadataAllocator;
    use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use common_meta::key::TableMetadataManager;
    use common_meta::key::flow::FlowMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::node_manager::NodeManagerRef;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::region_registry::LeaderRegionRegistry;
    use common_meta::rpc::router::RegionRoute;
    use common_meta::sequence::SequenceBuilder;
    use common_meta::wal_provider::WalProvider;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, RawSchema};
    use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
    use table::requests::TableOptions;

    use crate::cache_invalidator::MetasrvCacheInvalidator;
    use crate::handler::{HeartbeatMailbox, Pushers};
    use crate::metasrv::MetasrvInfo;
    use crate::test_util::new_region_route;

    /// Returns routes for regions 1, 2 and 3 over three local peers
    /// (ids 1..=3 on ports 4001..=4003), with leader nodes 3, 2 and 1 respectively.
    pub fn new_region_routes() -> Vec<RegionRoute> {
        let peers: Vec<Peer> = [
            (1, "127.0.0.1:4001"),
            (2, "127.0.0.1:4002"),
            (3, "127.0.0.1:4003"),
        ]
        .into_iter()
        .map(|(id, addr)| Peer::new(id, addr))
        .collect();

        [(1, 3), (2, 2), (3, 1)]
            .into_iter()
            .map(|(region, leader)| new_region_route(region, &peers, leader))
            .collect()
    }

    /// Returns the `my_catalog.my_schema.my_table` (id 42) fixture: a millisecond
    /// timestamp `ts`, two nullable string tags and one nullable int32 field.
    pub fn new_table_info() -> RawTableInfo {
        // (name, type, nullable), in column-index order.
        let column_schemas: Vec<ColumnSchema> = [
            (
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            ("my_tag1", ConcreteDataType::string_datatype(), true),
            ("my_tag2", ConcreteDataType::string_datatype(), true),
            ("my_field_column", ConcreteDataType::int32_datatype(), true),
        ]
        .into_iter()
        .map(|(name, data_type, nullable)| ColumnSchema::new(name.to_string(), data_type, nullable))
        .collect();

        // NOTE(review): `value_indices` is `[2]`, an index that is also in
        // `primary_key_indices`, and `next_column_id` is 3 although four columns are
        // defined. Kept as-is because other tests may depend on these exact values —
        // confirm whether they are intended.
        let meta = RawTableMeta {
            schema: RawSchema {
                column_schemas,
                timestamp_index: Some(0),
                version: 0,
            },
            primary_key_indices: vec![1, 2],
            value_indices: vec![2],
            engine: MITO2_ENGINE.to_string(),
            next_column_id: 3,
            options: TableOptions::default(),
            created_on: DateTime::default(),
            updated_on: DateTime::default(),
            partition_key_indices: vec![],
            column_ids: vec![],
        };

        RawTableInfo {
            ident: TableIdent {
                table_id: 42,
                version: 1,
            },
            name: "my_table".to_string(),
            desc: Some("blabla".to_string()),
            catalog_name: "my_catalog".to_string(),
            schema_name: "my_schema".to_string(),
            meta,
            table_type: TableType::Base,
        }
    }

    /// Builds a [`DdlContext`] whose metadata all lives in one fresh `MemoryKvBackend`.
    ///
    /// Datanode access goes through `node_manager`. The cache invalidator sends through a
    /// `HeartbeatMailbox` created with `Pushers::default()`. Region keeping, the leader
    /// registry, the failure-detector control and the flow peer allocator use
    /// in-memory/no-op implementations.
    pub(crate) fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
        let backend = Arc::new(MemoryKvBackend::new());

        let heartbeat_sequence =
            SequenceBuilder::new("test_heartbeat_mailbox", backend.clone()).build();
        let heartbeat_mailbox = HeartbeatMailbox::create(Pushers::default(), heartbeat_sequence);

        let table_manager = Arc::new(TableMetadataManager::new(backend.clone()));
        let table_allocator = Arc::new(TableMetadataAllocator::new(
            Arc::new(SequenceBuilder::new("test", backend.clone()).build()),
            Arc::new(WalProvider::default()),
        ));

        let flow_manager = Arc::new(FlowMetadataManager::new(backend.clone()));
        let flow_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            Arc::new(SequenceBuilder::new("test", backend).build()),
        ));

        let metasrv_info = MetasrvInfo {
            server_addr: "127.0.0.1:4321".to_string(),
        };

        DdlContext {
            node_manager,
            cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(
                heartbeat_mailbox,
                metasrv_info,
            )),
            table_metadata_manager: table_manager,
            table_metadata_allocator: table_allocator,
            flow_metadata_manager: flow_manager,
            flow_metadata_allocator: flow_allocator,
            memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        }
    }
}