Skip to main content

partition/
cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_base::hash::partition_expr_version;
18use common_error::ext::BoxedError;
19use common_meta::cache::{CacheContainer, Initializer, TableRoute, TableRouteCacheRef};
20use common_meta::instruction::CacheIdent;
21use common_meta::rpc::router::RegionRoute;
22use moka::future::Cache;
23use snafu::ResultExt;
24use store_api::storage::{RegionId, TableId};
25
26use crate::expr::PartitionExpr;
27use crate::manager::PartitionInfoWithVersion;
28
/// Partition information cached for a table whose route is physical.
#[derive(Debug, Clone)]
pub struct PhysicalPartitionInfo {
    /// Partitions built from the table's region routes; each entry carries the
    /// region id, its partition expression and the expression version.
    pub partitions: Vec<PartitionInfoWithVersion>,
}
33
/// Value stored in the partition info cache for a single table.
#[derive(Debug, Clone)]
pub enum CachedPartitionInfo {
    /// Entry built from a physical table route; holds the shared partition list.
    Physical(Arc<PhysicalPartitionInfo>),
    /// Entry built from a logical table route; holds the physical table id
    /// reported by that route.
    Logical(TableId),
}
39
40impl CachedPartitionInfo {
41    /// Returns the physical partitions if the cached partition info is physical.
42    pub fn into_physical(self) -> Option<Arc<PhysicalPartitionInfo>> {
43        match self {
44            CachedPartitionInfo::Physical(partitions) => Some(partitions),
45            CachedPartitionInfo::Logical(_) => None,
46        }
47    }
48}
49
/// Cache container keyed by [`TableId`] that stores [`CachedPartitionInfo`] values and
/// uses [`CacheIdent`] as its invalidation token type.
pub type PartitionInfoCache = CacheContainer<TableId, CachedPartitionInfo, CacheIdent>;

/// Shared, reference-counted handle to a [`PartitionInfoCache`].
pub type PartitionInfoCacheRef = Arc<PartitionInfoCache>;
53
54pub fn create_partitions_with_version_from_region_routes(
55    table_id: TableId,
56    region_routes: &[RegionRoute],
57) -> common_meta::error::Result<Vec<PartitionInfoWithVersion>> {
58    let mut partitions = Vec::with_capacity(region_routes.len());
59    for r in region_routes {
60        // Ignore regions marked as reject-all-writes; they should not participate
61        // in writable partition-cache construction.
62        if r.is_ignore_all_writes() {
63            continue;
64        }
65
66        let expr_json = r.region.partition_expr();
67        let partition_expr_version = if expr_json.is_empty() {
68            None
69        } else {
70            Some(partition_expr_version(Some(expr_json.as_str())))
71        };
72        let partition_expr = PartitionExpr::from_json_str(expr_json.as_str())
73            .map_err(BoxedError::new)
74            .context(common_meta::error::ExternalSnafu)?;
75        let id = RegionId::new(table_id, r.region.id.region_number());
76        partitions.push(PartitionInfoWithVersion {
77            id,
78            partition_expr,
79            partition_expr_version,
80        });
81    }
82
83    Ok(partitions)
84}
85
86fn init_factory(
87    table_route_cache: TableRouteCacheRef,
88) -> Initializer<TableId, CachedPartitionInfo> {
89    Arc::new(move |table_id: &TableId| {
90        let table_route_cache = table_route_cache.clone();
91        Box::pin(async move {
92            let Some(table_route) = table_route_cache.get(*table_id).await? else {
93                return Ok(None);
94            };
95
96            let cached = match table_route.as_ref() {
97                TableRoute::Physical(physical) => {
98                    let partitions = create_partitions_with_version_from_region_routes(
99                        *table_id,
100                        &physical.region_routes,
101                    )?;
102
103                    CachedPartitionInfo::Physical(Arc::new(PhysicalPartitionInfo { partitions }))
104                }
105                TableRoute::Logical(logical) => {
106                    let table_id = logical.physical_table_id();
107                    CachedPartitionInfo::Logical(table_id)
108                }
109            };
110
111            Ok(Some(cached))
112        })
113    })
114}
115
116pub fn new_partition_info_cache(
117    name: String,
118    cache: Cache<TableId, CachedPartitionInfo>,
119    table_route_cache: TableRouteCacheRef,
120) -> PartitionInfoCache {
121    CacheContainer::new(
122        name,
123        cache,
124        Box::new(|cache, idents| {
125            Box::pin(async move {
126                for ident in idents {
127                    if let CacheIdent::TableId(table_id) = ident {
128                        cache.invalidate(table_id).await
129                    }
130                }
131                Ok(())
132            })
133        }),
134        init_factory(table_route_cache),
135        |ident| matches!(ident, CacheIdent::TableId(_)),
136    )
137}
138
#[cfg(test)]
mod tests {
    use common_base::hash::partition_expr_version;
    use common_meta::rpc::router::{Region, RegionRoute, WriteRoutePolicy};
    use store_api::storage::RegionId;

    use super::create_partitions_with_version_from_region_routes;

    /// A route whose write policy is `IgnoreAllWrites` must not show up in the
    /// partition list, while a route without a policy is kept with its version.
    #[test]
    fn test_create_partitions_with_version_excludes_reject_all_writes() {
        let table_id = 1024;
        let expr_json =
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;

        // Both routes share the table and the expression; only the region number
        // and the write policy differ.
        let build_route = |region_number, write_route_policy| RegionRoute {
            region: Region {
                id: RegionId::new(table_id, region_number),
                partition_expr: expr_json.to_string(),
                ..Default::default()
            },
            write_route_policy,
            ..Default::default()
        };
        let region_routes = [
            build_route(1, Some(WriteRoutePolicy::IgnoreAllWrites)),
            build_route(2, None),
        ];

        let partitions =
            create_partitions_with_version_from_region_routes(table_id, &region_routes).unwrap();

        assert_eq!(partitions.len(), 1);
        let kept = &partitions[0];
        assert_eq!(kept.id, RegionId::new(table_id, 2));
        assert_eq!(
            kept.partition_expr_version,
            Some(partition_expr_version(Some(expr_json)))
        );
    }
}