meta_srv/utils/
insert_forwarder.rs1use std::sync::Arc;
16
17use api::v1::RowInsertRequests;
18use client::error::{ExternalSnafu, Result as ClientResult};
19use client::inserter::{Context, InsertOptions, Inserter};
20use client::{Client, Database};
21use common_error::ext::BoxedError;
22use common_meta::peer::PeerLookupServiceRef;
23use common_telemetry::{debug, warn};
24use snafu::{ensure, ResultExt};
25use tokio::sync::RwLock;
26
27use crate::error::{LookupFrontendsSnafu, NoAvailableFrontendSnafu};
28
29pub type InsertForwarderRef = Arc<InsertForwarder>;
30
31pub struct InsertForwarder {
34 peer_lookup_service: PeerLookupServiceRef,
35 client: RwLock<Option<Client>>,
36 options: Option<InsertOptions>,
37}
38
39impl InsertForwarder {
40 pub fn new(peer_lookup_service: PeerLookupServiceRef, options: Option<InsertOptions>) -> Self {
42 Self {
43 peer_lookup_service,
44 client: RwLock::new(None),
45 options,
46 }
47 }
48
49 async fn build_client(&self) -> crate::error::Result<Client> {
51 let frontends = self
52 .peer_lookup_service
53 .active_frontends()
54 .await
55 .context(LookupFrontendsSnafu)?;
56
57 ensure!(!frontends.is_empty(), NoAvailableFrontendSnafu);
58
59 let urls = frontends
60 .into_iter()
61 .map(|peer| peer.addr)
62 .collect::<Vec<_>>();
63
64 debug!("Available frontend addresses: {:?}", urls);
65
66 Ok(Client::with_urls(urls))
67 }
68
69 async fn maybe_init_client(&self) -> Result<Client, BoxedError> {
71 let mut guard = self.client.write().await;
72 if guard.is_none() {
73 let client = self.build_client().await.map_err(BoxedError::new)?;
74 *guard = Some(client);
75 }
76
77 Ok(guard.as_ref().unwrap().clone())
79 }
80
81 async fn reset_client(&self) {
83 warn!("Resetting the client");
84 let mut guard = self.client.write().await;
85 guard.take();
86 }
87}
88
89#[async_trait::async_trait]
90impl Inserter for InsertForwarder {
91 async fn insert_rows(
92 &self,
93 context: &Context<'_>,
94 requests: RowInsertRequests,
95 ) -> ClientResult<()> {
96 let client = self.maybe_init_client().await.context(ExternalSnafu)?;
97 let database = Database::new(context.catalog, context.schema, client);
98 let hints = self.options.as_ref().map_or(vec![], |o| o.to_hints());
99
100 if let Err(e) = database
101 .row_inserts_with_hints(
102 requests,
103 &hints
104 .iter()
105 .map(|(k, v)| (*k, v.as_str()))
106 .collect::<Vec<_>>(),
107 )
108 .await
109 .map_err(BoxedError::new)
110 .context(ExternalSnafu)
111 {
112 self.reset_client().await;
114 return Err(e);
115 };
116
117 Ok(())
118 }
119
120 fn set_options(&mut self, options: &InsertOptions) {
121 self.options = Some(*options);
122 }
123}