Skip to main content

common_meta/cache/table/
table_route.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::future::BoxFuture;
18use moka::future::Cache;
19use snafu::OptionExt;
20use store_api::storage::TableId;
21
22use crate::cache::container::InitStrategy;
23use crate::cache::{CacheContainer, Initializer};
24use crate::error;
25use crate::error::Result;
26use crate::instruction::CacheIdent;
27use crate::key::table_route::{
28    LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteManager, TableRouteManagerRef,
29    TableRouteValue,
30};
31use crate::kv_backend::KvBackendRef;
32
33/// [TableRoute] stores `Arc` wrapped table route.
34#[derive(Clone)]
35pub enum TableRoute {
36    Physical(Arc<PhysicalTableRouteValue>),
37    Logical(Arc<LogicalTableRouteValue>),
38}
39
40impl TableRoute {
41    /// Returns true if it's physical table.
42    pub fn is_physical(&self) -> bool {
43        matches!(self, TableRoute::Physical(_))
44    }
45
46    /// Returns [PhysicalTableRouteValue] reference if it's [TableRoute::Physical]; Otherwise it returns [None].
47    pub fn as_physical_table_route_ref(&self) -> Option<&Arc<PhysicalTableRouteValue>> {
48        match self {
49            TableRoute::Physical(table_route) => Some(table_route),
50            TableRoute::Logical(_) => None,
51        }
52    }
53}
54
55/// [TableRouteCache] caches the [TableId] to [TableRoute] mapping.
56pub type TableRouteCache = CacheContainer<TableId, Arc<TableRoute>, CacheIdent>;
57
58pub type TableRouteCacheRef = Arc<TableRouteCache>;
59
60/// Constructs a [TableRouteCache].
61pub fn new_table_route_cache(
62    name: String,
63    cache: Cache<TableId, Arc<TableRoute>>,
64    kv_backend: KvBackendRef,
65) -> TableRouteCache {
66    let table_info_manager = Arc::new(TableRouteManager::new(kv_backend));
67    let init = init_factory(table_info_manager);
68
69    CacheContainer::with_strategy(
70        name,
71        cache,
72        Box::new(invalidator),
73        init,
74        filter,
75        InitStrategy::VersionChecked,
76    )
77}
78
79fn init_factory(
80    table_route_manager: TableRouteManagerRef,
81) -> Initializer<TableId, Arc<TableRoute>> {
82    Arc::new(move |table_id| {
83        let table_route_manager = table_route_manager.clone();
84        Box::pin(async move {
85            let table_route_value = table_route_manager
86                .table_route_storage()
87                .get(*table_id)
88                .await?
89                .context(error::ValueNotExistSnafu {})?;
90
91            let table_route = match table_route_value {
92                TableRouteValue::Physical(physical) => TableRoute::Physical(Arc::new(physical)),
93                TableRouteValue::Logical(logical) => TableRoute::Logical(Arc::new(logical)),
94            };
95
96            Ok(Some(Arc::new(table_route)))
97        })
98    })
99}
100
101fn invalidator<'a>(
102    cache: &'a Cache<TableId, Arc<TableRoute>>,
103    idents: &'a [&CacheIdent],
104) -> BoxFuture<'a, Result<()>> {
105    Box::pin(async move {
106        for ident in idents {
107            if let CacheIdent::TableId(table_id) = ident {
108                cache.invalidate(table_id).await
109            }
110        }
111        Ok(())
112    })
113}
114
115fn filter(ident: &CacheIdent) -> bool {
116    matches!(ident, CacheIdent::TableId(_))
117}
118
119#[cfg(test)]
120mod tests {
121    use std::collections::HashMap;
122    use std::sync::Arc;
123
124    use moka::future::CacheBuilder;
125    use store_api::storage::RegionId;
126
127    use super::*;
128    use crate::ddl::test_util::create_table::test_create_table_task;
129    use crate::key::TableMetadataManager;
130    use crate::key::table_route::TableRouteValue;
131    use crate::kv_backend::memory::MemoryKvBackend;
132    use crate::peer::Peer;
133    use crate::rpc::router::{Region, RegionRoute};
134
135    #[tokio::test]
136    async fn test_cache() {
137        let mem_kv = Arc::new(MemoryKvBackend::default());
138        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
139        let cache = CacheBuilder::new(128).build();
140        let cache = new_table_route_cache("test".to_string(), cache, mem_kv.clone());
141
142        let result = cache.get(1024).await.unwrap();
143        assert!(result.is_none());
144        let task = test_create_table_task("my_table", 1024);
145        let table_id = 10;
146        let region_id = RegionId::new(table_id, 1);
147        let peer = Peer::empty(1);
148        let region_routes = vec![RegionRoute {
149            region: Region::new_test(region_id),
150            leader_peer: Some(peer.clone()),
151            ..Default::default()
152        }];
153        table_metadata_manager
154            .create_table_metadata(
155                task.table_info.clone(),
156                TableRouteValue::physical(region_routes.clone()),
157                HashMap::new(),
158            )
159            .await
160            .unwrap();
161        let table_route = cache.get(1024).await.unwrap().unwrap();
162        assert_eq!(
163            (*table_route)
164                .clone()
165                .as_physical_table_route_ref()
166                .unwrap()
167                .region_routes,
168            region_routes
169        );
170
171        assert!(cache.contains_key(&1024));
172        cache
173            .invalidate(&[CacheIdent::TableId(1024)])
174            .await
175            .unwrap();
176        assert!(!cache.contains_key(&1024));
177    }
178}