datanode/
partition_expr_fetcher.rs1use common_meta::key::table_route::TableRouteManager;
16use common_meta::kv_backend::KvBackendRef;
17use mito2::region::opener::PartitionExprFetcher;
18use store_api::storage::RegionId;
19
20pub struct MetaPartitionExprFetcher {
22 table_route_manager: TableRouteManager,
23}
24
25impl MetaPartitionExprFetcher {
26 pub fn new(kv: KvBackendRef) -> Self {
28 Self {
29 table_route_manager: TableRouteManager::new(kv),
30 }
31 }
32}
33
34#[async_trait::async_trait]
35impl PartitionExprFetcher for MetaPartitionExprFetcher {
36 async fn fetch_expr(&self, region_id: RegionId) -> Option<String> {
37 let table_id = region_id.table_id();
38 let Ok((_, route)) = self
39 .table_route_manager
40 .get_physical_table_route(table_id)
41 .await
42 else {
43 return None;
44 };
45 let region_number = region_id.region_number();
46 let rr = route
47 .region_routes
48 .iter()
49 .find(|r| r.region.id.region_number() == region_number)?;
50 Some(rr.region.partition_expr())
51 }
52}
53
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::key::table_route::{TableRouteStorage, TableRouteValue};
    use common_meta::kv_backend::TxnService;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::rpc::router::{Region, RegionRoute};

    use super::*;

    /// Stores a physical table route holding a single region route with a JSON
    /// partition expression in an in-memory kv backend, then checks that the
    /// fetcher hands back exactly that string for the same region id.
    #[tokio::test]
    async fn test_fetch_expr_json() {
        let table_id = 42;
        let region_number = 7;
        let region_id = RegionId::new(table_id, region_number);
        let expected_expr =
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;

        // Persist the route through the storage layer's create transaction.
        let backend = Arc::new(MemoryKvBackend::new());
        let route_storage = TableRouteStorage::new(backend.clone());
        let region = Region {
            id: region_id,
            name: "r".to_string(),
            attrs: Default::default(),
            partition: None,
            partition_expr: expected_expr.to_string(),
        };
        let route = RegionRoute {
            region,
            ..Default::default()
        };
        let route_value = TableRouteValue::physical(vec![route]);
        let (create_txn, _decoder) = route_storage
            .build_create_txn(table_id, &route_value)
            .unwrap();
        backend.txn(create_txn).await.unwrap();

        // Read the expression back through the fetcher under test.
        let fetcher = MetaPartitionExprFetcher::new(backend);
        let actual = fetcher.fetch_expr(region_id).await;
        assert_eq!(actual, Some(expected_expr.to_string()));
    }
}