1use std::sync::Arc;
16
17use common_base::hash::partition_expr_version;
18use common_error::ext::BoxedError;
19use common_meta::cache::{CacheContainer, Initializer, TableRoute, TableRouteCacheRef};
20use common_meta::instruction::CacheIdent;
21use common_meta::rpc::router::RegionRoute;
22use moka::future::Cache;
23use snafu::ResultExt;
24use store_api::storage::{RegionId, TableId};
25
26use crate::expr::PartitionExpr;
27use crate::manager::PartitionInfoWithVersion;
28
29#[derive(Debug, Clone)]
30pub struct PhysicalPartitionInfo {
31 pub partitions: Vec<PartitionInfoWithVersion>,
32}
33
34#[derive(Debug, Clone)]
35pub enum CachedPartitionInfo {
36 Physical(Arc<PhysicalPartitionInfo>),
37 Logical(TableId),
38}
39
40impl CachedPartitionInfo {
41 pub fn into_physical(self) -> Option<Arc<PhysicalPartitionInfo>> {
43 match self {
44 CachedPartitionInfo::Physical(partitions) => Some(partitions),
45 CachedPartitionInfo::Logical(_) => None,
46 }
47 }
48}
49
50pub type PartitionInfoCache = CacheContainer<TableId, CachedPartitionInfo, CacheIdent>;
51
52pub type PartitionInfoCacheRef = Arc<PartitionInfoCache>;
53
54pub fn create_partitions_with_version_from_region_routes(
55 table_id: TableId,
56 region_routes: &[RegionRoute],
57) -> common_meta::error::Result<Vec<PartitionInfoWithVersion>> {
58 let mut partitions = Vec::with_capacity(region_routes.len());
59 for r in region_routes {
60 if r.is_ignore_all_writes() {
63 continue;
64 }
65
66 let expr_json = r.region.partition_expr();
67 let partition_expr_version = if expr_json.is_empty() {
68 None
69 } else {
70 Some(partition_expr_version(Some(expr_json.as_str())))
71 };
72 let partition_expr = PartitionExpr::from_json_str(expr_json.as_str())
73 .map_err(BoxedError::new)
74 .context(common_meta::error::ExternalSnafu)?;
75 let id = RegionId::new(table_id, r.region.id.region_number());
76 partitions.push(PartitionInfoWithVersion {
77 id,
78 partition_expr,
79 partition_expr_version,
80 });
81 }
82
83 Ok(partitions)
84}
85
86fn init_factory(
87 table_route_cache: TableRouteCacheRef,
88) -> Initializer<TableId, CachedPartitionInfo> {
89 Arc::new(move |table_id: &TableId| {
90 let table_route_cache = table_route_cache.clone();
91 Box::pin(async move {
92 let Some(table_route) = table_route_cache.get(*table_id).await? else {
93 return Ok(None);
94 };
95
96 let cached = match table_route.as_ref() {
97 TableRoute::Physical(physical) => {
98 let partitions = create_partitions_with_version_from_region_routes(
99 *table_id,
100 &physical.region_routes,
101 )?;
102
103 CachedPartitionInfo::Physical(Arc::new(PhysicalPartitionInfo { partitions }))
104 }
105 TableRoute::Logical(logical) => {
106 let table_id = logical.physical_table_id();
107 CachedPartitionInfo::Logical(table_id)
108 }
109 };
110
111 Ok(Some(cached))
112 })
113 })
114}
115
116pub fn new_partition_info_cache(
117 name: String,
118 cache: Cache<TableId, CachedPartitionInfo>,
119 table_route_cache: TableRouteCacheRef,
120) -> PartitionInfoCache {
121 CacheContainer::new(
122 name,
123 cache,
124 Box::new(|cache, ident| {
125 Box::pin(async move {
126 if let CacheIdent::TableId(table_id) = ident {
127 cache.invalidate(table_id).await
128 }
129 Ok(())
130 })
131 }),
132 init_factory(table_route_cache),
133 |ident| matches!(ident, CacheIdent::TableId(_)),
134 )
135}
136
137#[cfg(test)]
138mod tests {
139 use common_base::hash::partition_expr_version;
140 use common_meta::rpc::router::{Region, RegionRoute, WriteRoutePolicy};
141 use store_api::storage::RegionId;
142
143 use super::create_partitions_with_version_from_region_routes;
144
145 #[test]
146 fn test_create_partitions_with_version_excludes_reject_all_writes() {
147 let table_id = 1024;
148 let expr_json =
149 r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
150 let region_routes = vec![
151 RegionRoute {
152 region: Region {
153 id: RegionId::new(table_id, 1),
154 partition_expr: expr_json.to_string(),
155 ..Default::default()
156 },
157 leader_peer: None,
158 follower_peers: vec![],
159 leader_state: None,
160 leader_down_since: None,
161 write_route_policy: Some(WriteRoutePolicy::IgnoreAllWrites),
162 },
163 RegionRoute {
164 region: Region {
165 id: RegionId::new(table_id, 2),
166 partition_expr: expr_json.to_string(),
167 ..Default::default()
168 },
169 ..Default::default()
170 },
171 ];
172
173 let partitions =
174 create_partitions_with_version_from_region_routes(table_id, ®ion_routes).unwrap();
175 assert_eq!(1, partitions.len());
176 assert_eq!(RegionId::new(table_id, 2), partitions[0].id);
177 assert_eq!(
178 Some(partition_expr_version(Some(expr_json))),
179 partitions[0].partition_expr_version
180 );
181 }
182}