1use std::sync::Arc;
16
17use common_base::hash::partition_expr_version;
18use common_error::ext::BoxedError;
19use common_meta::cache::{CacheContainer, Initializer, TableRoute, TableRouteCacheRef};
20use common_meta::instruction::CacheIdent;
21use common_meta::rpc::router::RegionRoute;
22use moka::future::Cache;
23use snafu::ResultExt;
24use store_api::storage::{RegionId, TableId};
25
26use crate::expr::PartitionExpr;
27use crate::manager::PartitionInfoWithVersion;
28
29#[derive(Debug, Clone)]
30pub struct PhysicalPartitionInfo {
31 pub partitions: Vec<PartitionInfoWithVersion>,
32}
33
34#[derive(Debug, Clone)]
35pub enum CachedPartitionInfo {
36 Physical(Arc<PhysicalPartitionInfo>),
37 Logical(TableId),
38}
39
40impl CachedPartitionInfo {
41 pub fn into_physical(self) -> Option<Arc<PhysicalPartitionInfo>> {
43 match self {
44 CachedPartitionInfo::Physical(partitions) => Some(partitions),
45 CachedPartitionInfo::Logical(_) => None,
46 }
47 }
48}
49
50pub type PartitionInfoCache = CacheContainer<TableId, CachedPartitionInfo, CacheIdent>;
51
52pub type PartitionInfoCacheRef = Arc<PartitionInfoCache>;
53
54pub fn create_partitions_with_version_from_region_routes(
55 table_id: TableId,
56 region_routes: &[RegionRoute],
57) -> common_meta::error::Result<Vec<PartitionInfoWithVersion>> {
58 let mut partitions = Vec::with_capacity(region_routes.len());
59 for r in region_routes {
60 let expr_json = r.region.partition_expr();
61 let partition_expr_version = if expr_json.is_empty() {
62 None
63 } else {
64 Some(partition_expr_version(Some(expr_json.as_str())))
65 };
66 let partition_expr = PartitionExpr::from_json_str(expr_json.as_str())
67 .map_err(BoxedError::new)
68 .context(common_meta::error::ExternalSnafu)?;
69 let id = RegionId::new(table_id, r.region.id.region_number());
70 partitions.push(PartitionInfoWithVersion {
71 id,
72 partition_expr,
73 partition_expr_version,
74 });
75 }
76
77 Ok(partitions)
78}
79
80fn init_factory(
81 table_route_cache: TableRouteCacheRef,
82) -> Initializer<TableId, CachedPartitionInfo> {
83 Arc::new(move |table_id: &TableId| {
84 let table_route_cache = table_route_cache.clone();
85 Box::pin(async move {
86 let Some(table_route) = table_route_cache.get(*table_id).await? else {
87 return Ok(None);
88 };
89
90 let cached = match table_route.as_ref() {
91 TableRoute::Physical(physical) => {
92 let partitions = create_partitions_with_version_from_region_routes(
93 *table_id,
94 &physical.region_routes,
95 )?;
96
97 CachedPartitionInfo::Physical(Arc::new(PhysicalPartitionInfo { partitions }))
98 }
99 TableRoute::Logical(logical) => {
100 let table_id = logical.physical_table_id();
101 CachedPartitionInfo::Logical(table_id)
102 }
103 };
104
105 Ok(Some(cached))
106 })
107 })
108}
109
110pub fn new_partition_info_cache(
111 name: String,
112 cache: Cache<TableId, CachedPartitionInfo>,
113 table_route_cache: TableRouteCacheRef,
114) -> PartitionInfoCache {
115 CacheContainer::new(
116 name,
117 cache,
118 Box::new(|cache, ident| {
119 Box::pin(async move {
120 if let CacheIdent::TableId(table_id) = ident {
121 cache.invalidate(table_id).await
122 }
123 Ok(())
124 })
125 }),
126 init_factory(table_route_cache),
127 |ident| matches!(ident, CacheIdent::TableId(_)),
128 )
129}