common_wal/config/kafka/
datanode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use common_base::readable_size::ReadableSize;
18use serde::{Deserialize, Serialize};
19
20use crate::config::kafka::common::{
21    KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_AUTO_PRUNE_INTERVAL,
22    DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_TRIGGER_FLUSH_THRESHOLD,
23};
24
25/// Kafka wal configurations for datanode.
26#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
27#[serde(default)]
28pub struct DatanodeKafkaConfig {
29    /// The kafka connection config.
30    #[serde(flatten)]
31    pub connection: KafkaConnectionConfig,
32    /// TODO(weny): Remove the alias once we release v0.9.
33    /// The max size of a single producer batch.
34    #[serde(alias = "max_batch_size")]
35    pub max_batch_bytes: ReadableSize,
36    /// The consumer wait timeout.
37    #[serde(with = "humantime_serde")]
38    pub consumer_wait_timeout: Duration,
39    /// The kafka topic config.
40    #[serde(flatten)]
41    pub kafka_topic: KafkaTopicConfig,
42    // Automatically create topics for WAL.
43    pub auto_create_topics: bool,
44    // Create index for WAL.
45    pub create_index: bool,
46    #[serde(with = "humantime_serde")]
47    pub dump_index_interval: Duration,
48    /// Ignore missing entries during read WAL.
49    pub overwrite_entry_start_id: bool,
50    // Interval of WAL pruning.
51    #[serde(with = "humantime_serde")]
52    pub auto_prune_interval: Duration,
53    // Threshold for sending flush request when pruning remote WAL.
54    // `None` stands for never sending flush request.
55    pub trigger_flush_threshold: u64,
56    // Limit of concurrent active pruning procedures.
57    pub auto_prune_parallelism: usize,
58}
59
60impl Default for DatanodeKafkaConfig {
61    fn default() -> Self {
62        Self {
63            connection: KafkaConnectionConfig::default(),
64            // Warning: Kafka has a default limit of 1MB per message in a topic.
65            max_batch_bytes: ReadableSize::mb(1),
66            consumer_wait_timeout: Duration::from_millis(100),
67            kafka_topic: KafkaTopicConfig::default(),
68            auto_create_topics: true,
69            create_index: true,
70            dump_index_interval: Duration::from_secs(60),
71            overwrite_entry_start_id: false,
72            auto_prune_interval: DEFAULT_AUTO_PRUNE_INTERVAL,
73            trigger_flush_threshold: DEFAULT_TRIGGER_FLUSH_THRESHOLD,
74            auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM,
75        }
76    }
77}