datanode/
partition_expr_fetcher.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::key::table_route::TableRouteManager;
16use common_meta::kv_backend::KvBackendRef;
17use mito2::region::opener::PartitionExprFetcher;
18use store_api::storage::RegionId;
19
20/// A fetcher to fetch partition exprs from the table route.
21pub struct MetaPartitionExprFetcher {
22    table_route_manager: TableRouteManager,
23}
24
25impl MetaPartitionExprFetcher {
26    /// Creates a new [MetaPartitionExprFetcher].
27    pub fn new(kv: KvBackendRef) -> Self {
28        Self {
29            table_route_manager: TableRouteManager::new(kv),
30        }
31    }
32}
33
34#[async_trait::async_trait]
35impl PartitionExprFetcher for MetaPartitionExprFetcher {
36    async fn fetch_expr(&self, region_id: RegionId) -> Option<String> {
37        let table_id = region_id.table_id();
38        let Ok((_, route)) = self
39            .table_route_manager
40            .get_physical_table_route(table_id)
41            .await
42        else {
43            return None;
44        };
45        let region_number = region_id.region_number();
46        let rr = route
47            .region_routes
48            .iter()
49            .find(|r| r.region.id.region_number() == region_number)?;
50        Some(rr.region.partition_expr())
51    }
52}
53
#[cfg(test)]
mod tests {
    use common_meta::key::table_route::{TableRouteStorage, TableRouteValue};
    use common_meta::kv_backend::TxnService;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::rpc::router::{Region, RegionRoute};

    use super::*;

    /// Stores a physical table route with a single region in an in-memory kv
    /// backend and checks that the fetcher returns that region's partition
    /// expression unchanged.
    #[tokio::test]
    async fn test_fetch_expr_json() {
        let backend = std::sync::Arc::new(MemoryKvBackend::new());
        let storage = TableRouteStorage::new(backend.clone());

        let table_id = 42;
        let target = RegionId::new(table_id, 7);
        let expected_expr =
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;

        // The only region of the table carries the JSON partition expression.
        let region = Region {
            id: target,
            name: "r".to_string(),
            attrs: Default::default(),
            partition: None,
            partition_expr: expected_expr.to_string(),
        };
        let routes = vec![RegionRoute {
            region,
            ..Default::default()
        }];
        let route_value = TableRouteValue::physical(routes);

        // Persist the route through a create transaction on the backend.
        let (create_txn, _decoder) = storage.build_create_txn(table_id, &route_value).unwrap();
        backend.txn(create_txn).await.unwrap();

        let fetcher = MetaPartitionExprFetcher::new(backend);
        let actual = fetcher.fetch_expr(target).await;
        assert_eq!(actual.as_deref(), Some(expected_expr));
    }
}