meta_srv/utils/
insert_forwarder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::RowInsertRequests;
18use client::error::{ExternalSnafu, Result as ClientResult};
19use client::inserter::{Context, InsertOptions, Inserter};
20use client::{Client, Database};
21use common_error::ext::BoxedError;
22use common_meta::peer::PeerLookupServiceRef;
23use common_telemetry::{debug, warn};
24use snafu::{ensure, ResultExt};
25use tokio::sync::RwLock;
26
27use crate::error::{LookupFrontendsSnafu, NoAvailableFrontendSnafu};
28
29pub type InsertForwarderRef = Arc<InsertForwarder>;
30
31/// [`InsertForwarder`] is the inserter for the metasrv.
32/// It forwards insert requests to available frontend instances.
33pub struct InsertForwarder {
34    peer_lookup_service: PeerLookupServiceRef,
35    client: RwLock<Option<Client>>,
36    options: Option<InsertOptions>,
37}
38
39impl InsertForwarder {
40    /// Creates a new InsertForwarder with the given peer lookup service.
41    pub fn new(peer_lookup_service: PeerLookupServiceRef, options: Option<InsertOptions>) -> Self {
42        Self {
43            peer_lookup_service,
44            client: RwLock::new(None),
45            options,
46        }
47    }
48
49    /// Builds a new client.
50    async fn build_client(&self) -> crate::error::Result<Client> {
51        let frontends = self
52            .peer_lookup_service
53            .active_frontends()
54            .await
55            .context(LookupFrontendsSnafu)?;
56
57        ensure!(!frontends.is_empty(), NoAvailableFrontendSnafu);
58
59        let urls = frontends
60            .into_iter()
61            .map(|peer| peer.addr)
62            .collect::<Vec<_>>();
63
64        debug!("Available frontend addresses: {:?}", urls);
65
66        Ok(Client::with_urls(urls))
67    }
68
69    /// Initializes the client if it hasn't been initialized yet, or returns the cached client.
70    async fn maybe_init_client(&self) -> Result<Client, BoxedError> {
71        let mut guard = self.client.write().await;
72        if guard.is_none() {
73            let client = self.build_client().await.map_err(BoxedError::new)?;
74            *guard = Some(client);
75        }
76
77        // Safety: checked above that the client is Some.
78        Ok(guard.as_ref().unwrap().clone())
79    }
80
81    /// Resets the cached client, forcing a rebuild on the next use.
82    async fn reset_client(&self) {
83        warn!("Resetting the client");
84        let mut guard = self.client.write().await;
85        guard.take();
86    }
87}
88
89#[async_trait::async_trait]
90impl Inserter for InsertForwarder {
91    async fn insert_rows(
92        &self,
93        context: &Context<'_>,
94        requests: RowInsertRequests,
95    ) -> ClientResult<()> {
96        let client = self.maybe_init_client().await.context(ExternalSnafu)?;
97        let database = Database::new(context.catalog, context.schema, client);
98        let hints = self.options.as_ref().map_or(vec![], |o| o.to_hints());
99
100        if let Err(e) = database
101            .row_inserts_with_hints(
102                requests,
103                &hints
104                    .iter()
105                    .map(|(k, v)| (*k, v.as_str()))
106                    .collect::<Vec<_>>(),
107            )
108            .await
109            .map_err(BoxedError::new)
110            .context(ExternalSnafu)
111        {
112            // Resets the client so it will be rebuilt next time.
113            self.reset_client().await;
114            return Err(e);
115        };
116
117        Ok(())
118    }
119
120    fn set_options(&mut self, options: &InsertOptions) {
121        self.options = Some(*options);
122    }
123}