Skip to main content

meta_srv/utils/
insert_forwarder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::RowInsertRequests;
18use client::error::Result as ClientResult;
19use client::inserter::{Context, InsertOptions, Inserter};
20
21use crate::utils::database::{DatabaseContext, DatabaseOperatorRef};
22
23pub type InsertForwarderRef = Arc<InsertForwarder>;
24
25/// [`InsertForwarder`] is the inserter for the metasrv.
26/// It forwards insert requests to available frontend instances.
27pub struct InsertForwarder {
28    database_operator: DatabaseOperatorRef,
29    options: Option<InsertOptions>,
30}
31
32impl InsertForwarder {
33    /// Creates a new [`InsertForwarder`] with the given shared database operator
34    /// and optional insert options.
35    pub fn new(database_operator: DatabaseOperatorRef, options: Option<InsertOptions>) -> Self {
36        Self {
37            database_operator,
38            options,
39        }
40    }
41}
42
43#[async_trait::async_trait]
44impl Inserter for InsertForwarder {
45    async fn insert_rows(
46        &self,
47        context: &Context<'_>,
48        requests: RowInsertRequests,
49    ) -> ClientResult<()> {
50        let ctx = DatabaseContext::new(context.catalog, context.schema);
51        let hints = self.options.as_ref().map_or(vec![], |o| o.to_hints());
52
53        self.database_operator
54            .insert(
55                &ctx,
56                requests,
57                &hints
58                    .iter()
59                    .map(|(k, v)| (*k, v.as_str()))
60                    .collect::<Vec<_>>(),
61            )
62            .await?;
63
64        Ok(())
65    }
66
67    fn set_options(&mut self, options: &InsertOptions) {
68        self.options = Some(*options);
69    }
70}