frontend/instance/
region_query.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use async_trait::async_trait;
18use common_error::ext::BoxedError;
19use common_meta::node_manager::NodeManagerRef;
20use common_query::request::QueryRequest;
21use common_recordbatch::SendableRecordBatchStream;
22use partition::manager::PartitionRuleManagerRef;
23use query::error::{RegionQuerySnafu, Result as QueryResult};
24use query::region_query::RegionQueryHandler;
25use session::ReadPreference;
26use snafu::ResultExt;
27
28use crate::error::{FindRegionPeerSnafu, RequestQuerySnafu, Result};
29
30pub(crate) struct FrontendRegionQueryHandler {
31    partition_manager: PartitionRuleManagerRef,
32    node_manager: NodeManagerRef,
33}
34
35impl FrontendRegionQueryHandler {
36    pub fn arc(
37        partition_manager: PartitionRuleManagerRef,
38        node_manager: NodeManagerRef,
39    ) -> Arc<Self> {
40        Arc::new(Self {
41            partition_manager,
42            node_manager,
43        })
44    }
45}
46
47#[async_trait]
48impl RegionQueryHandler for FrontendRegionQueryHandler {
49    async fn do_get(
50        &self,
51        read_preference: ReadPreference,
52        request: QueryRequest,
53    ) -> QueryResult<SendableRecordBatchStream> {
54        self.do_get_inner(read_preference, request)
55            .await
56            .map_err(BoxedError::new)
57            .context(RegionQuerySnafu)
58    }
59}
60
61impl FrontendRegionQueryHandler {
62    async fn do_get_inner(
63        &self,
64        read_preference: ReadPreference,
65        request: QueryRequest,
66    ) -> Result<SendableRecordBatchStream> {
67        let region_id = request.region_id;
68
69        let peer = &self
70            .partition_manager
71            .find_region_leader(region_id)
72            .await
73            .context(FindRegionPeerSnafu {
74                region_id,
75                read_preference,
76            })?;
77
78        let client = self.node_manager.datanode(peer).await;
79
80        client
81            .handle_query(request)
82            .await
83            .context(RequestQuerySnafu)
84    }
85}