datanode/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use common_meta::key::datanode_table::DatanodeTableManager;
18use common_meta::key::topic_region::{TopicRegionKey, TopicRegionManager, TopicRegionValue};
19use common_meta::kv_backend::KvBackendRef;
20use common_meta::wal_options_allocator::{extract_topic_from_wal_options, prepare_wal_options};
21use common_meta::DatanodeId;
22use futures::TryStreamExt;
23use snafu::ResultExt;
24use store_api::path_utils::table_dir;
25use store_api::region_request::{PathType, RegionOpenRequest, ReplayCheckpoint};
26use store_api::storage::{RegionId, RegionNumber};
27use tracing::info;
28
29use crate::error::{GetMetadataSnafu, Result};
30
31/// The requests to open regions.
32pub(crate) struct RegionOpenRequests {
33    pub leader_regions: Vec<(RegionId, RegionOpenRequest)>,
34    #[cfg(feature = "enterprise")]
35    pub follower_regions: Vec<(RegionId, RegionOpenRequest)>,
36}
37
38fn group_region_by_topic(
39    region_id: RegionId,
40    region_options: &HashMap<RegionNumber, String>,
41    topic_regions: &mut HashMap<String, Vec<RegionId>>,
42) {
43    if let Some(topic) = extract_topic_from_wal_options(region_id, region_options) {
44        topic_regions.entry(topic).or_default().push(region_id);
45    }
46}
47
48fn get_replay_checkpoint(
49    region_id: RegionId,
50    topic_region_values: &Option<HashMap<RegionId, TopicRegionValue>>,
51) -> Option<ReplayCheckpoint> {
52    let topic_region_values = topic_region_values.as_ref()?;
53    let topic_region_value = topic_region_values.get(&region_id);
54    let replay_checkpoint = topic_region_value.and_then(|value| value.checkpoint);
55    replay_checkpoint.map(|checkpoint| ReplayCheckpoint {
56        entry_id: checkpoint.entry_id,
57        metadata_entry_id: checkpoint.metadata_entry_id,
58    })
59}
60
61pub(crate) async fn build_region_open_requests(
62    node_id: DatanodeId,
63    kv_backend: KvBackendRef,
64) -> Result<RegionOpenRequests> {
65    let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
66    let table_values = datanode_table_manager
67        .tables(node_id)
68        .try_collect::<Vec<_>>()
69        .await
70        .context(GetMetadataSnafu)?;
71
72    let topic_region_manager = TopicRegionManager::new(kv_backend);
73    let mut topic_regions = HashMap::<String, Vec<RegionId>>::new();
74    let mut regions = vec![];
75    #[cfg(feature = "enterprise")]
76    let mut follower_regions = vec![];
77
78    for table_value in table_values {
79        for region_number in table_value.regions {
80            let region_id = RegionId::new(table_value.table_id, region_number);
81            // Augments region options with wal options if a wal options is provided.
82            let mut region_options = table_value.region_info.region_options.clone();
83            prepare_wal_options(
84                &mut region_options,
85                region_id,
86                &table_value.region_info.region_wal_options,
87            );
88            group_region_by_topic(
89                region_id,
90                &table_value.region_info.region_wal_options,
91                &mut topic_regions,
92            );
93
94            regions.push((
95                region_id,
96                table_value.region_info.engine.clone(),
97                table_value.region_info.region_storage_path.clone(),
98                region_options,
99            ));
100        }
101
102        #[cfg(feature = "enterprise")]
103        for region_number in table_value.follower_regions {
104            let region_id = RegionId::new(table_value.table_id, region_number);
105            // Augments region options with wal options if a wal options is provided.
106            let mut region_options = table_value.region_info.region_options.clone();
107            prepare_wal_options(
108                &mut region_options,
109                RegionId::new(table_value.table_id, region_number),
110                &table_value.region_info.region_wal_options,
111            );
112            group_region_by_topic(
113                region_id,
114                &table_value.region_info.region_wal_options,
115                &mut topic_regions,
116            );
117
118            follower_regions.push((
119                RegionId::new(table_value.table_id, region_number),
120                table_value.region_info.engine.clone(),
121                table_value.region_info.region_storage_path.clone(),
122                region_options,
123            ));
124        }
125    }
126
127    let topic_region_values = if !topic_regions.is_empty() {
128        let keys = topic_regions
129            .iter()
130            .flat_map(|(topic, regions)| {
131                regions
132                    .iter()
133                    .map(|region_id| TopicRegionKey::new(*region_id, topic))
134            })
135            .collect::<Vec<_>>();
136        let topic_region_manager = topic_region_manager
137            .batch_get(keys)
138            .await
139            .context(GetMetadataSnafu)?;
140        Some(topic_region_manager)
141    } else {
142        None
143    };
144
145    let mut leader_region_requests = Vec::with_capacity(regions.len());
146    for (region_id, engine, store_path, options) in regions {
147        let table_dir = table_dir(&store_path, region_id.table_id());
148        let checkpoint = get_replay_checkpoint(region_id, &topic_region_values);
149        info!("region_id: {}, checkpoint: {:?}", region_id, checkpoint);
150        leader_region_requests.push((
151            region_id,
152            RegionOpenRequest {
153                engine,
154                table_dir,
155                path_type: PathType::Bare,
156                options,
157                skip_wal_replay: false,
158                checkpoint,
159            },
160        ));
161    }
162
163    #[cfg(feature = "enterprise")]
164    let follower_region_requests = {
165        let mut follower_region_requests = Vec::with_capacity(follower_regions.len());
166        for (region_id, engine, store_path, options) in follower_regions {
167            let table_dir = table_dir(&store_path, region_id.table_id());
168            follower_region_requests.push((
169                region_id,
170                RegionOpenRequest {
171                    engine,
172                    table_dir,
173                    path_type: PathType::Bare,
174                    options,
175                    skip_wal_replay: true,
176                    checkpoint: None,
177                },
178            ));
179        }
180        follower_region_requests
181    };
182
183    Ok(RegionOpenRequests {
184        leader_regions: leader_region_requests,
185        #[cfg(feature = "enterprise")]
186        follower_regions: follower_region_requests,
187    })
188}