// partition/manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::Rows;
19use common_meta::cache::{TableRoute, TableRouteCacheRef};
20use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteManager};
21use common_meta::kv_backend::KvBackendRef;
22use common_meta::peer::Peer;
23use common_meta::rpc::router::{self, RegionRoute};
24use snafu::{OptionExt, ResultExt};
25use store_api::storage::{RegionId, RegionNumber};
26use table::metadata::{TableId, TableInfo};
27
28use crate::cache::{CachedPartitionInfo, PartitionInfoCacheRef, PhysicalPartitionInfo};
29use crate::error::{FindLeaderSnafu, Result};
30use crate::expr::PartitionExpr;
31use crate::multi_dim::MultiDimPartitionRule;
32use crate::splitter::RowSplitter;
33use crate::{PartitionRuleRef, error};
34
/// Shared, reference-counted handle to a [`PartitionRuleManager`].
pub type PartitionRuleManagerRef = Arc<PartitionRuleManager>;

/// PartitionRuleManager manages the table routes and partition rules.
/// It provides methods to find regions by:
/// - values (in case of insertion)
/// - filters (in case of select, deletion and update)
pub struct PartitionRuleManager {
    /// Built from the kv backend in `new`; used by `batch_find_region_routes` to fetch
    /// physical table routes in a single batch.
    table_route_manager: TableRouteManager,
    /// Route cache consulted by `find_physical_table_route`.
    table_route_cache: TableRouteCacheRef,
    /// Partition info cache consulted by `find_physical_partition_info`.
    partition_info_cache: PartitionInfoCacheRef,
}
46
/// A single partition of a table: the region it maps to and its partition expression.
#[derive(Debug, Clone)]
pub struct PartitionInfo {
    /// Id of the region backing this partition.
    ///
    /// When built by `create_partitions_from_region_routes`, the table part of this id is the
    /// table id passed by the caller, which may be a logical table.
    pub id: RegionId,
    /// Partition expression of the region, or `None` if the region carries none.
    pub partition_expr: Option<PartitionExpr>,
}
52
/// Like `PartitionInfo`, but additionally carries the version of the partition expression.
#[derive(Debug, Clone)]
pub struct PartitionInfoWithVersion {
    /// Id of the region backing this partition.
    pub id: RegionId,
    /// Partition expression of the region, or `None` if the region carries none.
    pub partition_expr: Option<PartitionExpr>,
    // NOTE(review): what `None` means here (no version recorded vs. no expression) is not
    // established in this file — confirm against the producer of this value.
    /// Version of `partition_expr`, if one is known.
    pub partition_expr_version: Option<u64>,
}
59
60impl PartitionRuleManager {
61    pub fn new(
62        kv_backend: KvBackendRef,
63        table_route_cache: TableRouteCacheRef,
64        partition_info_cache: PartitionInfoCacheRef,
65    ) -> Self {
66        Self {
67            table_route_manager: TableRouteManager::new(kv_backend),
68            table_route_cache,
69            partition_info_cache,
70        }
71    }
72
73    pub async fn find_physical_table_route(
74        &self,
75        table_id: TableId,
76    ) -> Result<Arc<PhysicalTableRouteValue>> {
77        match self
78            .table_route_cache
79            .get(table_id)
80            .await
81            .context(error::TableRouteManagerSnafu)?
82            .context(error::TableRouteNotFoundSnafu { table_id })?
83            .as_ref()
84        {
85            TableRoute::Physical(physical_table_route) => Ok(physical_table_route.clone()),
86            TableRoute::Logical(logical_table_route) => {
87                let physical_table_id = logical_table_route.physical_table_id();
88                let physical_table_route = self
89                    .table_route_cache
90                    .get(physical_table_id)
91                    .await
92                    .context(error::TableRouteManagerSnafu)?
93                    .context(error::TableRouteNotFoundSnafu { table_id })?;
94
95                let physical_table_route = physical_table_route
96                    .as_physical_table_route_ref()
97                    .context(error::UnexpectedSnafu{
98                        err_msg: format!(
99                            "Expected the physical table route, but got logical table route, table: {table_id}"
100                        ),
101                    })?;
102
103                Ok(physical_table_route.clone())
104            }
105        }
106    }
107
108    pub async fn batch_find_region_routes(
109        &self,
110        table_ids: &[TableId],
111    ) -> Result<HashMap<TableId, Vec<RegionRoute>>> {
112        let table_routes = self
113            .table_route_manager
114            .batch_get_physical_table_routes(table_ids)
115            .await
116            .context(error::TableRouteManagerSnafu)?;
117
118        let mut table_region_routes = HashMap::with_capacity(table_routes.len());
119
120        for (table_id, table_route) in table_routes {
121            let region_routes = table_route.region_routes;
122            table_region_routes.insert(table_id, region_routes);
123        }
124
125        Ok(table_region_routes)
126    }
127
128    /// Returns the physical partition info with version.
129    pub async fn find_physical_partition_info(
130        &self,
131        table_id: TableId,
132    ) -> Result<Arc<PhysicalPartitionInfo>> {
133        let cached = self
134            .partition_info_cache
135            .get(table_id)
136            .await
137            .context(error::GetPartitionInfoSnafu)?
138            .context(error::TableRouteNotFoundSnafu { table_id })?;
139        match cached {
140            CachedPartitionInfo::Physical(info) => Ok(info),
141            CachedPartitionInfo::Logical(physical_table_id) => {
142                let cached = self
143                    .partition_info_cache
144                    .get(physical_table_id)
145                    .await
146                    .context(error::GetPartitionInfoSnafu)?
147                    .context(error::TableRouteNotFoundSnafu {
148                        table_id: physical_table_id,
149                    })?;
150                let info = cached.into_physical().context(error::UnexpectedSnafu{
151                        err_msg: format!(
152                            "Expected the physical partition info, but got logical partable route, table: {physical_table_id}"
153                        )
154                    })?;
155
156                Ok(info)
157            }
158        }
159    }
160
161    pub async fn batch_find_table_partitions(
162        &self,
163        table_ids: &[TableId],
164    ) -> Result<HashMap<TableId, Vec<PartitionInfo>>> {
165        let batch_region_routes = self.batch_find_region_routes(table_ids).await?;
166
167        let mut results = HashMap::with_capacity(table_ids.len());
168
169        for (table_id, region_routes) in batch_region_routes {
170            results.insert(
171                table_id,
172                create_partitions_from_region_routes(table_id, &region_routes)?,
173            );
174        }
175
176        Ok(results)
177    }
178
179    pub async fn find_table_partition_rule(
180        &self,
181        table_info: &TableInfo,
182    ) -> Result<(PartitionRuleRef, HashMap<RegionNumber, Option<u64>>)> {
183        let partition_columns = table_info
184            .meta
185            .partition_column_names()
186            .cloned()
187            .collect::<Vec<_>>();
188
189        let partition_info = self
190            .find_physical_partition_info(table_info.table_id())
191            .await?;
192        let partition_versions = partition_info
193            .partitions
194            .iter()
195            .map(|r| (r.id.region_number(), r.partition_expr_version))
196            .collect::<HashMap<RegionNumber, Option<u64>>>();
197        let regions = partition_info
198            .partitions
199            .iter()
200            .map(|x| x.id.region_number())
201            .collect::<Vec<RegionNumber>>();
202        let exprs = partition_info
203            .partitions
204            .iter()
205            .filter_map(|x| x.partition_expr.as_ref())
206            .cloned()
207            .collect::<Vec<_>>();
208        let partition_rule = Arc::new(MultiDimPartitionRule::try_new(
209            partition_columns,
210            regions,
211            exprs,
212            false,
213        )?) as _;
214        Ok((partition_rule, partition_versions))
215    }
216
217    /// Find the leader of the region.
218    pub async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer> {
219        let region_routes = &self
220            .find_physical_table_route(region_id.table_id())
221            .await?
222            .region_routes;
223
224        router::find_region_leader(region_routes, region_id.region_number()).context(
225            FindLeaderSnafu {
226                region_id,
227                table_id: region_id.table_id(),
228            },
229        )
230    }
231
232    pub async fn split_rows(
233        &self,
234        table_info: &TableInfo,
235        rows: Rows,
236    ) -> Result<HashMap<RegionNumber, (Rows, Option<u64>)>> {
237        let (partition_rule, partition_versions) =
238            self.find_table_partition_rule(table_info).await?;
239
240        let result = RowSplitter::new(partition_rule)
241            .split(rows)?
242            .into_iter()
243            .map(|(region_number, rows)| {
244                (
245                    region_number,
246                    (
247                        rows,
248                        partition_versions
249                            .get(&region_number)
250                            .copied()
251                            .unwrap_or_default(),
252                    ),
253                )
254            })
255            .collect::<HashMap<_, _>>();
256
257        Ok(result)
258    }
259}
260
261/// Creates partitions from region routes.
262pub fn create_partitions_from_region_routes(
263    table_id: TableId,
264    region_routes: &[RegionRoute],
265) -> Result<Vec<PartitionInfo>> {
266    let mut partitions = Vec::with_capacity(region_routes.len());
267    for r in region_routes {
268        let partition_expr = PartitionExpr::from_json_str(&r.region.partition_expr())?;
269
270        // The region routes belong to the physical table but are shared among all logical tables.
271        // That it to say, the region id points to the physical table, so we need to use the actual
272        // table id (which may be a logical table) to renew the region id.
273        let id = RegionId::new(table_id, r.region.id.region_number());
274        partitions.push(PartitionInfo { id, partition_expr });
275    }
276
277    Ok(partitions)
278}