Skip to main content

datanode/
utils.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16
17use common_meta::DatanodeId;
18use common_meta::key::datanode_table::DatanodeTableManager;
19use common_meta::key::topic_name::{TopicNameKey, TopicNameManager, TopicNameValue};
20use common_meta::key::topic_region::{
21    ReplayCheckpoint as MetadataReplayCheckpoint, TopicRegionKey, TopicRegionManager,
22    TopicRegionValue,
23};
24use common_meta::kv_backend::KvBackendRef;
25use common_meta::wal_provider::{extract_topic_from_wal_options, prepare_wal_options};
26use futures::TryStreamExt;
27use snafu::ResultExt;
28use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
29use store_api::path_utils::table_dir;
30use store_api::region_request::{PathType, RegionOpenRequest, ReplayCheckpoint};
31use store_api::storage::{RegionId, RegionNumber};
32use tracing::info;
33
34use crate::error::{GetMetadataSnafu, Result};
35
36/// The requests to open regions.
37pub struct RegionOpenRequests {
38    pub(crate) leader_regions: Vec<(RegionId, RegionOpenRequest)>,
39    #[cfg(feature = "enterprise")]
40    pub(crate) follower_regions: Vec<(RegionId, RegionOpenRequest)>,
41}
42
43impl RegionOpenRequests {
44    /// Splits the request set into leader and follower regions.
45    #[allow(clippy::type_complexity)]
46    pub fn into_parts(
47        self,
48    ) -> (
49        Vec<(RegionId, RegionOpenRequest)>,
50        Vec<(RegionId, RegionOpenRequest)>,
51    ) {
52        let leader_regions = self.leader_regions;
53        #[cfg(feature = "enterprise")]
54        let follower_regions = self.follower_regions;
55        #[cfg(not(feature = "enterprise"))]
56        let follower_regions = Vec::new();
57        (leader_regions, follower_regions)
58    }
59}
60
61fn group_region_by_topic(
62    region_id: RegionId,
63    region_options: &HashMap<RegionNumber, String>,
64    topic_regions: &mut HashMap<String, Vec<RegionId>>,
65) {
66    if let Some(topic) = extract_topic_from_wal_options(region_id, region_options) {
67        topic_regions.entry(topic).or_default().push(region_id);
68    }
69}
70
71fn region_pruned_entry_ids(
72    topic_regions: &HashMap<String, Vec<RegionId>>,
73    topic_name_values: &HashMap<String, TopicNameValue>,
74) -> HashMap<RegionId, u64> {
75    topic_regions
76        .iter()
77        .flat_map(|(topic, region_ids)| {
78            topic_name_values
79                .get(topic)
80                .into_iter()
81                .flat_map(move |value| {
82                    region_ids
83                        .iter()
84                        .map(move |region_id| (*region_id, value.pruned_entry_id))
85                })
86        })
87        .collect()
88}
89
90fn get_replay_checkpoint(
91    region_id: RegionId,
92    topic_region_values: &Option<HashMap<RegionId, TopicRegionValue>>,
93    pruned_entry_id: Option<u64>,
94    is_metric_engine: bool,
95) -> Option<ReplayCheckpoint> {
96    let checkpoint = topic_region_values
97        .as_ref()
98        .and_then(|values| values.get(&region_id))
99        .and_then(|value| value.checkpoint)
100        .map(|checkpoint| {
101            MetadataReplayCheckpoint::new(checkpoint.entry_id, checkpoint.metadata_entry_id)
102        });
103
104    MetadataReplayCheckpoint::merge_with_topic_pruned_entry_id(
105        checkpoint,
106        pruned_entry_id,
107        is_metric_engine,
108    )
109    .map(|checkpoint| ReplayCheckpoint {
110        entry_id: checkpoint.entry_id,
111        metadata_entry_id: checkpoint.metadata_entry_id,
112    })
113}
114
115/// Builds region-open requests from persisted metadata.
116pub async fn build_region_open_requests(
117    node_id: DatanodeId,
118    kv_backend: KvBackendRef,
119) -> Result<RegionOpenRequests> {
120    let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
121    let table_values = datanode_table_manager
122        .tables(node_id)
123        .try_collect::<Vec<_>>()
124        .await
125        .context(GetMetadataSnafu)?;
126
127    let topic_region_manager = TopicRegionManager::new(kv_backend.clone());
128    let topic_name_manager = TopicNameManager::new(kv_backend);
129    let mut topic_regions = HashMap::<String, Vec<RegionId>>::new();
130    let mut regions = vec![];
131    #[cfg(feature = "enterprise")]
132    let mut follower_regions = vec![];
133
134    for table_value in table_values {
135        for region_number in table_value.regions {
136            let region_id = RegionId::new(table_value.table_id, region_number);
137            // Augments region options with wal options if a wal options is provided.
138            let mut region_options = table_value.region_info.region_options.clone();
139            prepare_wal_options(
140                &mut region_options,
141                region_id,
142                &table_value.region_info.region_wal_options,
143            );
144            group_region_by_topic(
145                region_id,
146                &table_value.region_info.region_wal_options,
147                &mut topic_regions,
148            );
149
150            regions.push((
151                region_id,
152                table_value.region_info.engine.clone(),
153                table_value.region_info.region_storage_path.clone(),
154                region_options,
155            ));
156        }
157
158        #[cfg(feature = "enterprise")]
159        for region_number in table_value.follower_regions {
160            let region_id = RegionId::new(table_value.table_id, region_number);
161            // Augments region options with wal options if a wal options is provided.
162            let mut region_options = table_value.region_info.region_options.clone();
163            prepare_wal_options(
164                &mut region_options,
165                RegionId::new(table_value.table_id, region_number),
166                &table_value.region_info.region_wal_options,
167            );
168            group_region_by_topic(
169                region_id,
170                &table_value.region_info.region_wal_options,
171                &mut topic_regions,
172            );
173
174            follower_regions.push((
175                RegionId::new(table_value.table_id, region_number),
176                table_value.region_info.engine.clone(),
177                table_value.region_info.region_storage_path.clone(),
178                region_options,
179            ));
180        }
181    }
182
183    let topic_region_values = if !topic_regions.is_empty() {
184        let keys = topic_regions
185            .iter()
186            .flat_map(|(topic, regions)| {
187                regions
188                    .iter()
189                    .map(|region_id| TopicRegionKey::new(*region_id, topic))
190            })
191            .collect::<Vec<_>>();
192        let topic_region_manager = topic_region_manager
193            .batch_get(keys)
194            .await
195            .context(GetMetadataSnafu)?;
196        Some(topic_region_manager)
197    } else {
198        None
199    };
200
201    let topic_name_values = if !topic_regions.is_empty() {
202        let topics = topic_regions
203            .keys()
204            .map(|topic| TopicNameKey::new(topic))
205            .collect::<Vec<_>>();
206        Some(
207            topic_name_manager
208                .batch_get(topics)
209                .await
210                .context(GetMetadataSnafu)?,
211        )
212    } else {
213        None
214    };
215    let region_pruned_entry_ids = topic_name_values
216        .as_ref()
217        .map(|values| region_pruned_entry_ids(&topic_regions, values));
218
219    let mut leader_region_requests = Vec::with_capacity(regions.len());
220    for (region_id, engine, store_path, options) in regions {
221        let table_dir = table_dir(&store_path, region_id.table_id());
222        let pruned_entry_id = region_pruned_entry_ids
223            .as_ref()
224            .and_then(|values| values.get(&region_id).copied());
225        let checkpoint = get_replay_checkpoint(
226            region_id,
227            &topic_region_values,
228            pruned_entry_id,
229            engine == METRIC_ENGINE_NAME,
230        );
231        info!("region_id: {}, checkpoint: {:?}", region_id, checkpoint);
232        leader_region_requests.push((
233            region_id,
234            RegionOpenRequest {
235                engine,
236                table_dir,
237                path_type: PathType::Bare,
238                options,
239                skip_wal_replay: false,
240                checkpoint,
241                requirements: Default::default(),
242            },
243        ));
244    }
245
246    #[cfg(feature = "enterprise")]
247    let follower_region_requests = {
248        let mut follower_region_requests = Vec::with_capacity(follower_regions.len());
249        for (region_id, engine, store_path, options) in follower_regions {
250            let table_dir = table_dir(&store_path, region_id.table_id());
251            follower_region_requests.push((
252                region_id,
253                RegionOpenRequest {
254                    engine,
255                    table_dir,
256                    path_type: PathType::Bare,
257                    options,
258                    skip_wal_replay: true,
259                    checkpoint: None,
260                    requirements: Default::default(),
261                },
262            ));
263        }
264        follower_region_requests
265    };
266
267    Ok(RegionOpenRequests {
268        leader_regions: leader_region_requests,
269        #[cfg(feature = "enterprise")]
270        follower_regions: follower_region_requests,
271    })
272}