common_meta/cache/table/
table_route.rs1use std::sync::Arc;
16
17use futures::future::BoxFuture;
18use moka::future::Cache;
19use snafu::OptionExt;
20use store_api::storage::TableId;
21
22use crate::cache::container::InitStrategy;
23use crate::cache::{CacheContainer, Initializer};
24use crate::error;
25use crate::error::Result;
26use crate::instruction::CacheIdent;
27use crate::key::table_route::{
28 LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteManager, TableRouteManagerRef,
29 TableRouteValue,
30};
31use crate::kv_backend::KvBackendRef;
32
/// A table route value as stored in the table route cache.
///
/// Each variant holds its route value behind an [`Arc`], so cloning a
/// `TableRoute` only clones the `Arc` handle of the active variant.
#[derive(Clone)]
pub enum TableRoute {
    /// Route of a physical table.
    Physical(Arc<PhysicalTableRouteValue>),
    /// Route of a logical table.
    Logical(Arc<LogicalTableRouteValue>),
}
39
40impl TableRoute {
41 pub fn is_physical(&self) -> bool {
43 matches!(self, TableRoute::Physical(_))
44 }
45
46 pub fn as_physical_table_route_ref(&self) -> Option<&Arc<PhysicalTableRouteValue>> {
48 match self {
49 TableRoute::Physical(table_route) => Some(table_route),
50 TableRoute::Logical(_) => None,
51 }
52 }
53}
54
/// Cache container that maps a [`TableId`] to its shared [`TableRoute`];
/// invalidation requests are expressed as [`CacheIdent`]s.
pub type TableRouteCache = CacheContainer<TableId, Arc<TableRoute>, CacheIdent>;
57
/// Shared, reference-counted handle to a [`TableRouteCache`].
pub type TableRouteCacheRef = Arc<TableRouteCache>;
59
60pub fn new_table_route_cache(
62 name: String,
63 cache: Cache<TableId, Arc<TableRoute>>,
64 kv_backend: KvBackendRef,
65) -> TableRouteCache {
66 let table_info_manager = Arc::new(TableRouteManager::new(kv_backend));
67 let init = init_factory(table_info_manager);
68
69 CacheContainer::with_strategy(
70 name,
71 cache,
72 Box::new(invalidator),
73 init,
74 filter,
75 InitStrategy::VersionChecked,
76 )
77}
78
79fn init_factory(
80 table_route_manager: TableRouteManagerRef,
81) -> Initializer<TableId, Arc<TableRoute>> {
82 Arc::new(move |table_id| {
83 let table_route_manager = table_route_manager.clone();
84 Box::pin(async move {
85 let table_route_value = table_route_manager
86 .table_route_storage()
87 .get(*table_id)
88 .await?
89 .context(error::ValueNotExistSnafu {})?;
90
91 let table_route = match table_route_value {
92 TableRouteValue::Physical(physical) => TableRoute::Physical(Arc::new(physical)),
93 TableRouteValue::Logical(logical) => TableRoute::Logical(Arc::new(logical)),
94 };
95
96 Ok(Some(Arc::new(table_route)))
97 })
98 })
99}
100
101fn invalidator<'a>(
102 cache: &'a Cache<TableId, Arc<TableRoute>>,
103 idents: &'a [&CacheIdent],
104) -> BoxFuture<'a, Result<()>> {
105 Box::pin(async move {
106 for ident in idents {
107 if let CacheIdent::TableId(table_id) = ident {
108 cache.invalidate(table_id).await
109 }
110 }
111 Ok(())
112 })
113}
114
/// Returns `true` only for [`CacheIdent::TableId`] identifiers, the one kind
/// of identifier that `invalidator` acts on.
fn filter(ident: &CacheIdent) -> bool {
    matches!(ident, CacheIdent::TableId(_))
}
118
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use moka::future::CacheBuilder;
    use store_api::storage::RegionId;

    use super::*;
    use crate::ddl::test_util::create_table::test_create_table_task;
    use crate::key::TableMetadataManager;
    use crate::key::table_route::TableRouteValue;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::peer::Peer;
    use crate::rpc::router::{Region, RegionRoute};

    /// Exercises the full cache life cycle: miss on an unknown table, load
    /// after the table metadata is created, and eviction on invalidation.
    #[tokio::test]
    async fn test_cache() {
        // One id for the table, its region routes and every cache lookup, so
        // the fixture cannot drift out of sync with the assertions.
        let table_id = 1024;

        let mem_kv = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
        let cache = CacheBuilder::new(128).build();
        let cache = new_table_route_cache("test".to_string(), cache, mem_kv.clone());

        // Nothing has been written yet: the lookup must report a miss.
        let result = cache.get(table_id).await.unwrap();
        assert!(result.is_none());

        let task = test_create_table_task("my_table", table_id);
        let region_id = RegionId::new(table_id, 1);
        let peer = Peer::empty(1);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(peer.clone()),
            ..Default::default()
        }];
        table_metadata_manager
            .create_table_metadata(
                task.table_info.clone(),
                TableRouteValue::physical(region_routes.clone()),
                HashMap::new(),
            )
            .await
            .unwrap();

        // The route is now loaded from the backend; borrow it straight out of
        // the cached `Arc` rather than cloning the whole `TableRoute`.
        let table_route = cache.get(table_id).await.unwrap().unwrap();
        assert_eq!(
            table_route
                .as_physical_table_route_ref()
                .unwrap()
                .region_routes,
            region_routes
        );

        // Invalidating by table id must evict the entry.
        assert!(cache.contains_key(&table_id));
        cache
            .invalidate(&[CacheIdent::TableId(table_id)])
            .await
            .unwrap();
        assert!(!cache.contains_key(&table_id));
    }
}