frontend/instance/
standalone.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use api::region::RegionResponse;
18use api::v1::region::{RegionRequest, RegionResponse as RegionResponseV1};
19use async_trait::async_trait;
20use client::region::check_response_header;
21use common_error::ext::BoxedError;
22use common_meta::error::{self as meta_error, Result as MetaResult};
23use common_meta::node_manager::{
24    Datanode, DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef,
25};
26use common_meta::peer::Peer;
27use common_query::request::QueryRequest;
28use common_recordbatch::SendableRecordBatchStream;
29use common_telemetry::tracing;
30use common_telemetry::tracing_context::{FutureExt, TracingContext};
31use datanode::region_server::RegionServer;
32use servers::grpc::region_server::RegionServerHandler;
33use snafu::{OptionExt, ResultExt};
34
35use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result};
36
37pub struct StandaloneDatanodeManager {
38    pub region_server: RegionServer,
39    pub flow_server: FlownodeRef,
40}
41
42#[async_trait]
43impl DatanodeManager for StandaloneDatanodeManager {
44    async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
45        RegionInvoker::arc(self.region_server.clone())
46    }
47}
48
49#[async_trait]
50impl FlownodeManager for StandaloneDatanodeManager {
51    async fn flownode(&self, _node: &Peer) -> FlownodeRef {
52        self.flow_server.clone()
53    }
54}
55
56/// Relative to [client::region::RegionRequester]
57pub struct RegionInvoker {
58    region_server: RegionServer,
59}
60
61impl RegionInvoker {
62    pub fn arc(region_server: RegionServer) -> Arc<Self> {
63        Arc::new(Self { region_server })
64    }
65
66    async fn handle_inner(&self, request: RegionRequest) -> Result<RegionResponseV1> {
67        let body = request.body.with_context(|| InvalidRegionRequestSnafu {
68            reason: "body not found",
69        })?;
70
71        self.region_server
72            .handle(body)
73            .await
74            .context(InvokeRegionServerSnafu)
75    }
76}
77
78#[async_trait]
79impl Datanode for RegionInvoker {
80    async fn handle(&self, request: RegionRequest) -> MetaResult<RegionResponse> {
81        let span = request
82            .header
83            .as_ref()
84            .map(|h| TracingContext::from_w3c(&h.tracing_context))
85            .unwrap_or_default()
86            .attach(tracing::info_span!("RegionInvoker::handle_region_request"));
87        let response = self
88            .handle_inner(request)
89            .trace(span)
90            .await
91            .map_err(BoxedError::new)
92            .context(meta_error::ExternalSnafu)?;
93        check_response_header(&response.header)
94            .map_err(BoxedError::new)
95            .context(meta_error::ExternalSnafu)?;
96        Ok(RegionResponse::from_region_response(response))
97    }
98
99    async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {
100        let span = request
101            .header
102            .as_ref()
103            .map(|h| TracingContext::from_w3c(&h.tracing_context))
104            .unwrap_or_default()
105            .attach(tracing::info_span!("RegionInvoker::handle_query"));
106        self.region_server
107            .handle_read(request)
108            .trace(span)
109            .await
110            .map_err(BoxedError::new)
111            .context(meta_error::ExternalSnafu)
112    }
113}