frontend/instance/
standalone.rs1use std::sync::Arc;
16
17use api::region::RegionResponse;
18use api::v1::region::{RegionRequest, RegionResponse as RegionResponseV1};
19use async_trait::async_trait;
20use client::region::check_response_header;
21use common_error::ext::BoxedError;
22use common_meta::error::{self as meta_error, Result as MetaResult};
23use common_meta::node_manager::{
24 Datanode, DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef,
25};
26use common_meta::peer::Peer;
27use common_query::request::QueryRequest;
28use common_recordbatch::SendableRecordBatchStream;
29use common_telemetry::tracing;
30use common_telemetry::tracing_context::{FutureExt, TracingContext};
31use datanode::region_server::RegionServer;
32use servers::grpc::region_server::RegionServerHandler;
33use snafu::{OptionExt, ResultExt};
34
35use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result};
36
37pub struct StandaloneDatanodeManager {
38 pub region_server: RegionServer,
39 pub flow_server: FlownodeRef,
40}
41
42#[async_trait]
43impl DatanodeManager for StandaloneDatanodeManager {
44 async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
45 RegionInvoker::arc(self.region_server.clone())
46 }
47}
48
49#[async_trait]
50impl FlownodeManager for StandaloneDatanodeManager {
51 async fn flownode(&self, _node: &Peer) -> FlownodeRef {
52 self.flow_server.clone()
53 }
54}
55
56pub struct RegionInvoker {
58 region_server: RegionServer,
59}
60
61impl RegionInvoker {
62 pub fn arc(region_server: RegionServer) -> Arc<Self> {
63 Arc::new(Self { region_server })
64 }
65
66 async fn handle_inner(&self, request: RegionRequest) -> Result<RegionResponseV1> {
67 let body = request.body.with_context(|| InvalidRegionRequestSnafu {
68 reason: "body not found",
69 })?;
70
71 self.region_server
72 .handle(body)
73 .await
74 .context(InvokeRegionServerSnafu)
75 }
76}
77
78#[async_trait]
79impl Datanode for RegionInvoker {
80 async fn handle(&self, request: RegionRequest) -> MetaResult<RegionResponse> {
81 let span = request
82 .header
83 .as_ref()
84 .map(|h| TracingContext::from_w3c(&h.tracing_context))
85 .unwrap_or_default()
86 .attach(tracing::info_span!("RegionInvoker::handle_region_request"));
87 let response = self
88 .handle_inner(request)
89 .trace(span)
90 .await
91 .map_err(BoxedError::new)
92 .context(meta_error::ExternalSnafu)?;
93 check_response_header(&response.header)
94 .map_err(BoxedError::new)
95 .context(meta_error::ExternalSnafu)?;
96 Ok(RegionResponse::from_region_response(response))
97 }
98
99 async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {
100 let span = request
101 .header
102 .as_ref()
103 .map(|h| TracingContext::from_w3c(&h.tracing_context))
104 .unwrap_or_default()
105 .attach(tracing::info_span!("RegionInvoker::handle_query"));
106 self.region_server
107 .handle_read(request)
108 .trace(span)
109 .await
110 .map_err(BoxedError::new)
111 .context(meta_error::ExternalSnafu)
112 }
113}