Skip to main content

meta_srv/utils/
database.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use client::error::{ExternalSnafu, Result as ClientResult};
18use client::{Client, Database, Output};
19use common_error::ext::BoxedError;
20use common_meta::peer::PeerDiscoveryRef;
21use common_telemetry::{debug, warn};
22use snafu::{ResultExt, ensure};
23use tokio::sync::RwLock;
24
25use crate::error::{ListActiveFrontendsSnafu, NoAvailableFrontendSnafu, Result};
26
27pub type DatabaseOperatorRef = Arc<DatabaseOperator>;
28
/// Database-level context (catalog and schema) attached to requests that metasrv forwards.
///
/// Both names are borrowed string slices, so the context itself is `Copy`.
#[derive(Debug, Clone, Copy)]
pub struct DatabaseContext<'a> {
    /// Name of the catalog placed in forwarded requests.
    pub catalog: &'a str,
    /// Name of the schema placed in forwarded requests.
    pub schema: &'a str,
}
37
38impl<'a> DatabaseContext<'a> {
39    /// Creates a new database context from catalog and schema.
40    pub fn new(catalog: &'a str, schema: &'a str) -> Self {
41        Self { catalog, schema }
42    }
43}
44
45/// A cached frontend database operator used by metasrv.
46pub struct DatabaseOperator {
47    peer_discovery: PeerDiscoveryRef,
48    client: RwLock<Option<Client>>,
49}
50
51impl DatabaseOperator {
52    /// Creates a database operator backed by discovered frontend peers.
53    pub fn new(peer_discovery: PeerDiscoveryRef) -> Self {
54        Self {
55            peer_discovery,
56            client: RwLock::new(None),
57        }
58    }
59
60    /// Forwards row inserts to an available frontend database client.
61    pub async fn insert(
62        &self,
63        ctx: &DatabaseContext<'_>,
64        requests: api::v1::RowInsertRequests,
65        hints: &[(&str, &str)],
66    ) -> ClientResult<u32> {
67        let client = self.maybe_init_client().await?;
68        let database = Database::new(ctx.catalog, ctx.schema, client);
69
70        let result = database.row_inserts_with_hints(requests, hints).await;
71
72        if should_reset_client(&result) {
73            self.reset_client().await;
74        }
75
76        result
77    }
78
79    /// Executes a serialized logical plan on an available frontend.
80    pub async fn logical_plan(
81        &self,
82        ctx: &DatabaseContext<'_>,
83        plan: Vec<u8>,
84    ) -> ClientResult<Output> {
85        let client = self.maybe_init_client().await?;
86        let database = Database::new(ctx.catalog, ctx.schema, client);
87
88        let result = database.logical_plan(plan).await;
89
90        if should_reset_client(&result) {
91            self.reset_client().await;
92        }
93
94        result
95    }
96
97    async fn build_client(&self) -> Result<Client> {
98        let frontends = self
99            .peer_discovery
100            .active_frontends()
101            .await
102            .context(ListActiveFrontendsSnafu)?;
103
104        ensure!(!frontends.is_empty(), NoAvailableFrontendSnafu);
105
106        let urls = frontends
107            .into_iter()
108            .map(|peer| peer.addr)
109            .collect::<Vec<_>>();
110
111        debug!("Available frontend addresses: {:?}", urls);
112
113        Ok(Client::with_urls(urls))
114    }
115
116    async fn maybe_init_client(&self) -> ClientResult<Client> {
117        if let Some(client) = self.client.read().await.as_ref() {
118            return Ok(client.clone());
119        }
120
121        let client = self
122            .build_client()
123            .await
124            .map_err(BoxedError::new)
125            .context(ExternalSnafu)?;
126
127        let mut guard = self.client.write().await;
128        if let Some(client) = guard.as_ref() {
129            return Ok(client.clone());
130        }
131
132        *guard = Some(client.clone());
133        Ok(client)
134    }
135
136    async fn reset_client(&self) {
137        warn!("Resetting the client");
138        let mut guard = self.client.write().await;
139        guard.take();
140    }
141}
142
143fn should_reset_client<T>(result: &client::error::Result<T>) -> bool {
144    result
145        .as_ref()
146        .err()
147        .map(|err| err.is_connection_error())
148        .unwrap_or(false)
149}