common_meta/cache/table/
table_route.rs1use std::sync::Arc;
16
17use futures::future::BoxFuture;
18use moka::future::Cache;
19use snafu::OptionExt;
20use store_api::storage::TableId;
21
22use crate::cache::{CacheContainer, Initializer};
23use crate::error;
24use crate::error::Result;
25use crate::instruction::CacheIdent;
26use crate::key::table_route::{
27 LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteManager, TableRouteManagerRef,
28 TableRouteValue,
29};
30use crate::kv_backend::KvBackendRef;
31
32#[derive(Clone)]
34pub enum TableRoute {
35 Physical(Arc<PhysicalTableRouteValue>),
36 Logical(Arc<LogicalTableRouteValue>),
37}
38
39impl TableRoute {
40 pub fn is_physical(&self) -> bool {
42 matches!(self, TableRoute::Physical(_))
43 }
44
45 pub fn as_physical_table_route_ref(&self) -> Option<&Arc<PhysicalTableRouteValue>> {
47 match self {
48 TableRoute::Physical(table_route) => Some(table_route),
49 TableRoute::Logical(_) => None,
50 }
51 }
52}
53
54pub type TableRouteCache = CacheContainer<TableId, Arc<TableRoute>, CacheIdent>;
56
57pub type TableRouteCacheRef = Arc<TableRouteCache>;
58
59pub fn new_table_route_cache(
61 name: String,
62 cache: Cache<TableId, Arc<TableRoute>>,
63 kv_backend: KvBackendRef,
64) -> TableRouteCache {
65 let table_info_manager = Arc::new(TableRouteManager::new(kv_backend));
66 let init = init_factory(table_info_manager);
67
68 CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
69}
70
71fn init_factory(
72 table_route_manager: TableRouteManagerRef,
73) -> Initializer<TableId, Arc<TableRoute>> {
74 Arc::new(move |table_id| {
75 let table_route_manager = table_route_manager.clone();
76 Box::pin(async move {
77 let table_route_value = table_route_manager
78 .table_route_storage()
79 .get(*table_id)
80 .await?
81 .context(error::ValueNotExistSnafu {})?;
82
83 let table_route = match table_route_value {
84 TableRouteValue::Physical(physical) => TableRoute::Physical(Arc::new(physical)),
85 TableRouteValue::Logical(logical) => TableRoute::Logical(Arc::new(logical)),
86 };
87
88 Ok(Some(Arc::new(table_route)))
89 })
90 })
91}
92
93fn invalidator<'a>(
94 cache: &'a Cache<TableId, Arc<TableRoute>>,
95 ident: &'a CacheIdent,
96) -> BoxFuture<'a, Result<()>> {
97 Box::pin(async move {
98 if let CacheIdent::TableId(table_id) = ident {
99 cache.invalidate(table_id).await
100 }
101 Ok(())
102 })
103}
104
105fn filter(ident: &CacheIdent) -> bool {
106 matches!(ident, CacheIdent::TableId(_))
107}
108
109#[cfg(test)]
110mod tests {
111 use std::collections::HashMap;
112 use std::sync::Arc;
113
114 use moka::future::CacheBuilder;
115 use store_api::storage::RegionId;
116
117 use super::*;
118 use crate::ddl::test_util::create_table::test_create_table_task;
119 use crate::key::table_route::TableRouteValue;
120 use crate::key::TableMetadataManager;
121 use crate::kv_backend::memory::MemoryKvBackend;
122 use crate::peer::Peer;
123 use crate::rpc::router::{Region, RegionRoute};
124
125 #[tokio::test]
126 async fn test_cache() {
127 let mem_kv = Arc::new(MemoryKvBackend::default());
128 let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
129 let cache = CacheBuilder::new(128).build();
130 let cache = new_table_route_cache("test".to_string(), cache, mem_kv.clone());
131
132 let result = cache.get(1024).await.unwrap();
133 assert!(result.is_none());
134 let task = test_create_table_task("my_table", 1024);
135 let table_id = 10;
136 let region_id = RegionId::new(table_id, 1);
137 let peer = Peer::empty(1);
138 let region_routes = vec![RegionRoute {
139 region: Region::new_test(region_id),
140 leader_peer: Some(peer.clone()),
141 ..Default::default()
142 }];
143 table_metadata_manager
144 .create_table_metadata(
145 task.table_info.clone(),
146 TableRouteValue::physical(region_routes.clone()),
147 HashMap::new(),
148 )
149 .await
150 .unwrap();
151 let table_route = cache.get(1024).await.unwrap().unwrap();
152 assert_eq!(
153 (*table_route)
154 .clone()
155 .as_physical_table_route_ref()
156 .unwrap()
157 .region_routes,
158 region_routes
159 );
160
161 assert!(cache.contains_key(&1024));
162 cache
163 .invalidate(&[CacheIdent::TableId(1024)])
164 .await
165 .unwrap();
166 assert!(!cache.contains_key(&1024));
167 }
168}