meta_srv/utils/
database.rs1use std::sync::Arc;
16
17use client::error::{ExternalSnafu, Result as ClientResult};
18use client::{Client, Database, Output};
19use common_error::ext::BoxedError;
20use common_meta::peer::PeerDiscoveryRef;
21use common_telemetry::{debug, warn};
22use snafu::{ResultExt, ensure};
23use tokio::sync::RwLock;
24
25use crate::error::{ListActiveFrontendsSnafu, NoAvailableFrontendSnafu, Result};
26
27pub type DatabaseOperatorRef = Arc<DatabaseOperator>;
28
/// Catalog and schema names that scope a database request.
///
/// Both names are borrowed string slices, so the context itself is `Copy`.
#[derive(Debug, Clone, Copy)]
pub struct DatabaseContext<'a> {
    /// Name of the catalog the request targets.
    pub catalog: &'a str,
    /// Name of the schema the request targets.
    pub schema: &'a str,
}

impl<'a> DatabaseContext<'a> {
    /// Creates a context from the given catalog and schema names.
    pub fn new(catalog: &'a str, schema: &'a str) -> Self {
        DatabaseContext { catalog, schema }
    }
}
44
45pub struct DatabaseOperator {
47 peer_discovery: PeerDiscoveryRef,
48 client: RwLock<Option<Client>>,
49}
50
51impl DatabaseOperator {
52 pub fn new(peer_discovery: PeerDiscoveryRef) -> Self {
54 Self {
55 peer_discovery,
56 client: RwLock::new(None),
57 }
58 }
59
60 pub async fn insert(
62 &self,
63 ctx: &DatabaseContext<'_>,
64 requests: api::v1::RowInsertRequests,
65 hints: &[(&str, &str)],
66 ) -> ClientResult<u32> {
67 let client = self.maybe_init_client().await?;
68 let database = Database::new(ctx.catalog, ctx.schema, client);
69
70 let result = database.row_inserts_with_hints(requests, hints).await;
71
72 if should_reset_client(&result) {
73 self.reset_client().await;
74 }
75
76 result
77 }
78
79 pub async fn logical_plan(
81 &self,
82 ctx: &DatabaseContext<'_>,
83 plan: Vec<u8>,
84 ) -> ClientResult<Output> {
85 let client = self.maybe_init_client().await?;
86 let database = Database::new(ctx.catalog, ctx.schema, client);
87
88 let result = database.logical_plan(plan).await;
89
90 if should_reset_client(&result) {
91 self.reset_client().await;
92 }
93
94 result
95 }
96
97 async fn build_client(&self) -> Result<Client> {
98 let frontends = self
99 .peer_discovery
100 .active_frontends()
101 .await
102 .context(ListActiveFrontendsSnafu)?;
103
104 ensure!(!frontends.is_empty(), NoAvailableFrontendSnafu);
105
106 let urls = frontends
107 .into_iter()
108 .map(|peer| peer.addr)
109 .collect::<Vec<_>>();
110
111 debug!("Available frontend addresses: {:?}", urls);
112
113 Ok(Client::with_urls(urls))
114 }
115
116 async fn maybe_init_client(&self) -> ClientResult<Client> {
117 if let Some(client) = self.client.read().await.as_ref() {
118 return Ok(client.clone());
119 }
120
121 let client = self
122 .build_client()
123 .await
124 .map_err(BoxedError::new)
125 .context(ExternalSnafu)?;
126
127 let mut guard = self.client.write().await;
128 if let Some(client) = guard.as_ref() {
129 return Ok(client.clone());
130 }
131
132 *guard = Some(client.clone());
133 Ok(client)
134 }
135
136 async fn reset_client(&self) {
137 warn!("Resetting the client");
138 let mut guard = self.client.write().await;
139 guard.take();
140 }
141}
142
143fn should_reset_client<T>(result: &client::error::Result<T>) -> bool {
144 result
145 .as_ref()
146 .err()
147 .map(|err| err.is_connection_error())
148 .unwrap_or(false)
149}