frontend/instance/
region_query.rs1use std::sync::Arc;
16
17use async_trait::async_trait;
18use common_error::ext::BoxedError;
19use common_meta::node_manager::NodeManagerRef;
20use common_query::request::QueryRequest;
21use common_recordbatch::SendableRecordBatchStream;
22use partition::manager::PartitionRuleManagerRef;
23use query::error::{RegionQuerySnafu, Result as QueryResult};
24use query::region_query::RegionQueryHandler;
25use session::ReadPreference;
26use snafu::ResultExt;
27
28use crate::error::{FindRegionPeerSnafu, RequestQuerySnafu, Result};
29
30pub(crate) struct FrontendRegionQueryHandler {
31 partition_manager: PartitionRuleManagerRef,
32 node_manager: NodeManagerRef,
33}
34
35impl FrontendRegionQueryHandler {
36 pub fn arc(
37 partition_manager: PartitionRuleManagerRef,
38 node_manager: NodeManagerRef,
39 ) -> Arc<Self> {
40 Arc::new(Self {
41 partition_manager,
42 node_manager,
43 })
44 }
45}
46
47#[async_trait]
48impl RegionQueryHandler for FrontendRegionQueryHandler {
49 async fn do_get(
50 &self,
51 read_preference: ReadPreference,
52 request: QueryRequest,
53 ) -> QueryResult<SendableRecordBatchStream> {
54 self.do_get_inner(read_preference, request)
55 .await
56 .map_err(BoxedError::new)
57 .context(RegionQuerySnafu)
58 }
59}
60
61impl FrontendRegionQueryHandler {
62 async fn do_get_inner(
63 &self,
64 read_preference: ReadPreference,
65 request: QueryRequest,
66 ) -> Result<SendableRecordBatchStream> {
67 let region_id = request.region_id;
68
69 let peer = &self
70 .partition_manager
71 .find_region_leader(region_id)
72 .await
73 .context(FindRegionPeerSnafu {
74 region_id,
75 read_preference,
76 })?;
77
78 let client = self.node_manager.datanode(peer).await;
79
80 client
81 .handle_query(request)
82 .await
83 .context(RequestQuerySnafu)
84 }
85}