common_meta/cache/table/
table_route.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::future::BoxFuture;
18use moka::future::Cache;
19use snafu::OptionExt;
20use store_api::storage::TableId;
21
22use crate::cache::{CacheContainer, Initializer};
23use crate::error;
24use crate::error::Result;
25use crate::instruction::CacheIdent;
26use crate::key::table_route::{
27    LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteManager, TableRouteManagerRef,
28    TableRouteValue,
29};
30use crate::kv_backend::KvBackendRef;
31
/// [TableRoute] stores `Arc` wrapped table route.
///
/// Each variant holds its route value behind an [Arc], so cloning a
/// [TableRoute] clones the `Arc` handle rather than the route value itself.
#[derive(Clone)]
pub enum TableRoute {
    /// Route value of a physical table.
    Physical(Arc<PhysicalTableRouteValue>),
    /// Route value of a logical table.
    Logical(Arc<LogicalTableRouteValue>),
}
38
39impl TableRoute {
40    /// Returns true if it's physical table.
41    pub fn is_physical(&self) -> bool {
42        matches!(self, TableRoute::Physical(_))
43    }
44
45    /// Returns [PhysicalTableRouteValue] reference if it's [TableRoute::Physical]; Otherwise it returns [None].
46    pub fn as_physical_table_route_ref(&self) -> Option<&Arc<PhysicalTableRouteValue>> {
47        match self {
48            TableRoute::Physical(table_route) => Some(table_route),
49            TableRoute::Logical(_) => None,
50        }
51    }
52}
53
/// [TableRouteCache] caches the [TableId] to [TableRoute] mapping.
///
/// Values are stored as `Arc<TableRoute>` and the container is parameterized
/// with [CacheIdent] as its invalidation identifier type.
pub type TableRouteCache = CacheContainer<TableId, Arc<TableRoute>, CacheIdent>;

/// An [Arc] wrapped [TableRouteCache].
pub type TableRouteCacheRef = Arc<TableRouteCache>;
58
59/// Constructs a [TableRouteCache].
60pub fn new_table_route_cache(
61    name: String,
62    cache: Cache<TableId, Arc<TableRoute>>,
63    kv_backend: KvBackendRef,
64) -> TableRouteCache {
65    let table_info_manager = Arc::new(TableRouteManager::new(kv_backend));
66    let init = init_factory(table_info_manager);
67
68    CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
69}
70
71fn init_factory(
72    table_route_manager: TableRouteManagerRef,
73) -> Initializer<TableId, Arc<TableRoute>> {
74    Arc::new(move |table_id| {
75        let table_route_manager = table_route_manager.clone();
76        Box::pin(async move {
77            let table_route_value = table_route_manager
78                .table_route_storage()
79                .get(*table_id)
80                .await?
81                .context(error::ValueNotExistSnafu {})?;
82
83            let table_route = match table_route_value {
84                TableRouteValue::Physical(physical) => TableRoute::Physical(Arc::new(physical)),
85                TableRouteValue::Logical(logical) => TableRoute::Logical(Arc::new(logical)),
86            };
87
88            Ok(Some(Arc::new(table_route)))
89        })
90    })
91}
92
93fn invalidator<'a>(
94    cache: &'a Cache<TableId, Arc<TableRoute>>,
95    ident: &'a CacheIdent,
96) -> BoxFuture<'a, Result<()>> {
97    Box::pin(async move {
98        if let CacheIdent::TableId(table_id) = ident {
99            cache.invalidate(table_id).await
100        }
101        Ok(())
102    })
103}
104
/// Returns true only when `ident` is a [CacheIdent::TableId].
///
/// Passed to `CacheContainer::new` by [new_table_route_cache]; presumably the
/// container uses it to skip identifiers this cache does not handle — confirm
/// against `CacheContainer`.
fn filter(ident: &CacheIdent) -> bool {
    matches!(ident, CacheIdent::TableId(_))
}
108
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use moka::future::CacheBuilder;
    use store_api::storage::RegionId;

    use super::*;
    use crate::ddl::test_util::create_table::test_create_table_task;
    use crate::key::table_route::TableRouteValue;
    use crate::key::TableMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::peer::Peer;
    use crate::rpc::router::{Region, RegionRoute};

    /// Exercises the full cache life cycle for a single table:
    /// miss before the metadata exists, load after it is created, and
    /// eviction through a [CacheIdent::TableId] invalidation.
    #[tokio::test]
    async fn test_cache() {
        // One id for the table, its regions and every cache lookup, so the
        // fixture describes a single consistent table.
        let table_id: TableId = 1024;

        let mem_kv = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
        let cache = CacheBuilder::new(128).build();
        let cache = new_table_route_cache("test".to_string(), cache, mem_kv);

        // Nothing has been written yet: the lookup must report a miss.
        let result = cache.get(table_id).await.unwrap();
        assert!(result.is_none());

        let task = test_create_table_task("my_table", table_id);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(table_id, 1)),
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }];
        table_metadata_manager
            .create_table_metadata(
                task.table_info.clone(),
                TableRouteValue::physical(region_routes.clone()),
                HashMap::new(),
            )
            .await
            .unwrap();

        // The route is now loaded from the backend and cached.
        let table_route = cache.get(table_id).await.unwrap().unwrap();
        assert!(table_route.is_physical());
        assert_eq!(
            table_route
                .as_physical_table_route_ref()
                .unwrap()
                .region_routes,
            region_routes
        );

        // Invalidating by table id evicts the cached entry.
        assert!(cache.contains_key(&table_id));
        cache
            .invalidate(&[CacheIdent::TableId(table_id)])
            .await
            .unwrap();
        assert!(!cache.contains_key(&table_id));
    }
}