// frontend/instance/region_query.rs
use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_meta::node_manager::NodeManagerRef;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use partition::manager::PartitionRuleManagerRef;
use query::error::{RegionQuerySnafu, Result as QueryResult};
use query::region_query::RegionQueryHandler;
use session::ReadPreference;
use snafu::ResultExt;
use crate::error::{FindRegionPeerSnafu, RequestQuerySnafu, Result};
/// Region query handler that forwards each query to a datanode.
///
/// It implements `RegionQueryHandler`: for an incoming request it resolves the
/// leader peer of the target region, obtains a datanode client for that peer
/// and hands the request to it.
pub(crate) struct FrontendRegionQueryHandler {
/// Used to look up the leader peer of the region a request targets.
partition_manager: PartitionRuleManagerRef,
/// Used to obtain a datanode client for the resolved peer.
node_manager: NodeManagerRef,
}
impl FrontendRegionQueryHandler {
    /// Builds a handler from the given managers and returns it wrapped in an
    /// [`Arc`].
    pub fn arc(
        partition_manager: PartitionRuleManagerRef,
        node_manager: NodeManagerRef,
    ) -> Arc<Self> {
        let handler = Self {
            partition_manager,
            node_manager,
        };
        Arc::new(handler)
    }
}
#[async_trait]
impl RegionQueryHandler for FrontendRegionQueryHandler {
    /// Runs the query through `do_get_inner` and converts any failure into
    /// the query crate's error type.
    ///
    /// # Errors
    ///
    /// An error from `do_get_inner` is boxed into a [`BoxedError`] and then
    /// given `RegionQuerySnafu` context.
    async fn do_get(
        &self,
        read_preference: ReadPreference,
        request: QueryRequest,
    ) -> QueryResult<SendableRecordBatchStream> {
        let outcome = self.do_get_inner(read_preference, request).await;
        outcome.map_err(BoxedError::new).context(RegionQuerySnafu)
    }
}
impl FrontendRegionQueryHandler {
    /// Resolves the leader peer of the request's region and forwards the
    /// query to the datanode client for that peer.
    ///
    /// `read_preference` is not used to choose the peer here; it is only
    /// attached to the error context when the leader lookup fails.
    ///
    /// # Errors
    ///
    /// - A failed leader lookup is reported with `FindRegionPeerSnafu`
    ///   context, carrying the region id and the read preference.
    /// - A failed `handle_query` call is reported with `RequestQuerySnafu`
    ///   context.
    async fn do_get_inner(
        &self,
        read_preference: ReadPreference,
        request: QueryRequest,
    ) -> Result<SendableRecordBatchStream> {
        // Copy the id out first: `request` is moved into `handle_query` below.
        let region_id = request.region_id;

        let lookup_failed = FindRegionPeerSnafu {
            region_id,
            read_preference,
        };
        let leader = self
            .partition_manager
            .find_region_leader(region_id)
            .await
            .context(lookup_failed)?;

        let datanode = self.node_manager.datanode(&leader).await;
        let queried = datanode.handle_query(request).await;
        queried.context(RequestQuerySnafu)
    }
}