meta_srv/utils/
insert_forwarder.rs1use std::sync::Arc;
16
17use api::v1::RowInsertRequests;
18use client::error::Result as ClientResult;
19use client::inserter::{Context, InsertOptions, Inserter};
20
21use crate::utils::database::{DatabaseContext, DatabaseOperatorRef};
22
23pub type InsertForwarderRef = Arc<InsertForwarder>;
24
25pub struct InsertForwarder {
28 database_operator: DatabaseOperatorRef,
29 options: Option<InsertOptions>,
30}
31
32impl InsertForwarder {
33 pub fn new(database_operator: DatabaseOperatorRef, options: Option<InsertOptions>) -> Self {
36 Self {
37 database_operator,
38 options,
39 }
40 }
41}
42
43#[async_trait::async_trait]
44impl Inserter for InsertForwarder {
45 async fn insert_rows(
46 &self,
47 context: &Context<'_>,
48 requests: RowInsertRequests,
49 ) -> ClientResult<()> {
50 let ctx = DatabaseContext::new(context.catalog, context.schema);
51 let hints = self.options.as_ref().map_or(vec![], |o| o.to_hints());
52
53 self.database_operator
54 .insert(
55 &ctx,
56 requests,
57 &hints
58 .iter()
59 .map(|(k, v)| (*k, v.as_str()))
60 .collect::<Vec<_>>(),
61 )
62 .await?;
63
64 Ok(())
65 }
66
67 fn set_options(&mut self, options: &InsertOptions) {
68 self.options = Some(*options);
69 }
70}