1use std::collections::HashMap;
18use std::sync::{Arc, Weak};
19use std::time::SystemTime;
20
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{CreateTableExpr, QueryRequest};
24use client::{Client, Database};
25use common_error::ext::{BoxedError, ErrorExt};
26use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
27use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
28use common_meta::peer::Peer;
29use common_meta::rpc::store::RangeRequest;
30use common_query::Output;
31use common_telemetry::warn;
32use meta_client::client::MetaClient;
33use query::datafusion::QUERY_PARALLELISM_HINT;
34use query::options::QueryOptions;
35use rand::rng;
36use rand::seq::SliceRandom;
37use servers::query_handler::grpc::GrpcQueryHandler;
38use session::context::{QueryContextBuilder, QueryContextRef};
39use snafu::{OptionExt, ResultExt};
40
41use crate::batching_mode::{
42 DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
43 GRPC_MAX_RETRIES,
44};
45use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
46use crate::{Error, FlowAuthHeader};
47
/// A gRPC query handler whose `do_query` reports failures as a [`BoxedError`].
///
/// Erasing the concrete error type lets the handler be stored as a trait object
/// (`dyn GrpcQueryHandlerWithBoxedError`), which is how [`HandlerMutable`] holds it.
/// A blanket implementation in this file provides this trait for every
/// [`GrpcQueryHandler`] that is `Send + Sync + 'static`.
#[async_trait::async_trait]
pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
    /// Executes `query` under the query context `ctx`.
    ///
    /// # Errors
    /// Returns the handler's failure wrapped in a [`BoxedError`].
    async fn do_query(
        &self,
        query: Request,
        ctx: QueryContextRef,
    ) -> std::result::Result<Output, BoxedError>;
}
62
63#[async_trait::async_trait]
65impl<
66 E: ErrorExt + Send + Sync + 'static,
67 T: GrpcQueryHandler<Error = E> + Send + Sync + 'static,
68 > GrpcQueryHandlerWithBoxedError for T
69{
70 async fn do_query(
71 &self,
72 query: Request,
73 ctx: QueryContextRef,
74 ) -> std::result::Result<Output, BoxedError> {
75 self.do_query(query, ctx).await.map_err(BoxedError::new)
76 }
77}
78
/// Shared, mutex-guarded slot holding an optional weak reference to a query handler.
///
/// The slot may be empty (`None`), and because the reference is [`Weak`] it must be
/// upgraded before use; both conditions are checked by the standalone code paths.
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
80
/// Client used to send requests to a frontend, either over gRPC to a remote frontend
/// node (distributed mode) or through an in-process handler (standalone mode).
#[derive(Debug, Clone)]
pub enum FrontendClient {
    /// Talks to remote frontend nodes discovered through the meta server.
    Distributed {
        /// Meta client used to list the registered frontend nodes.
        meta_client: Arc<MetaClient>,
        /// Channel manager shared by the gRPC clients created for each frontend.
        chnl_mgr: ChannelManager,
        /// Optional auth header applied to every database client that is created.
        auth: Option<FlowAuthHeader>,
        /// Query options; `parallelism` is forwarded as a query hint.
        query: QueryOptions,
    },
    /// Talks to an in-process query handler.
    Standalone {
        /// Slot holding a weak reference to the handler; it may still be empty.
        database_client: HandlerMutable,
        /// Query options; `parallelism` is forwarded as a query hint.
        query: QueryOptions,
    },
}
99
100impl FrontendClient {
101 pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
103 let handler = Arc::new(std::sync::Mutex::new(None));
104 (
105 Self::Standalone {
106 database_client: handler.clone(),
107 query,
108 },
109 handler,
110 )
111 }
112
113 pub fn from_meta_client(
114 meta_client: Arc<MetaClient>,
115 auth: Option<FlowAuthHeader>,
116 query: QueryOptions,
117 ) -> Self {
118 common_telemetry::info!("Frontend client build with auth={:?}", auth);
119 Self::Distributed {
120 meta_client,
121 chnl_mgr: {
122 let cfg = ChannelConfig::new()
123 .connect_timeout(GRPC_CONN_TIMEOUT)
124 .timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
125 ChannelManager::with_config(cfg)
126 },
127 auth,
128 query,
129 }
130 }
131
132 pub fn from_grpc_handler(
133 grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
134 query: QueryOptions,
135 ) -> Self {
136 Self::Standalone {
137 database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
138 query,
139 }
140 }
141}
142
/// A database client paired with the peer (frontend node) it connects to.
#[derive(Debug, Clone)]
pub struct DatabaseWithPeer {
    /// Client used to issue requests against the frontend.
    pub database: Database,
    /// The frontend node this client targets; included in error contexts.
    pub peer: Peer,
}
148
149impl DatabaseWithPeer {
150 fn new(database: Database, peer: Peer) -> Self {
151 Self { database, peer }
152 }
153
154 async fn try_select_one(&self) -> Result<(), Error> {
156 let _ = self
158 .database
159 .sql("SELECT 1")
160 .await
161 .with_context(|_| InvalidRequestSnafu {
162 context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
163 })?;
164 Ok(())
165 }
166}
167
168impl FrontendClient {
169 pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
171 let Self::Distributed { meta_client, .. } = self else {
172 return Ok(vec![]);
173 };
174 let cluster_client = meta_client
175 .cluster_client()
176 .map_err(BoxedError::new)
177 .context(ExternalSnafu)?;
178
179 let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
180 let req = RangeRequest::new().with_prefix(prefix);
181 let resp = cluster_client
182 .range(req)
183 .await
184 .map_err(BoxedError::new)
185 .context(ExternalSnafu)?;
186 let mut res = Vec::with_capacity(resp.kvs.len());
187 for kv in resp.kvs {
188 let key = NodeInfoKey::try_from(kv.key)
189 .map_err(BoxedError::new)
190 .context(ExternalSnafu)?;
191
192 let val = NodeInfo::try_from(kv.value)
193 .map_err(BoxedError::new)
194 .context(ExternalSnafu)?;
195 res.push((key, val));
196 }
197 Ok(res)
198 }
199
200 async fn get_random_active_frontend(
203 &self,
204 catalog: &str,
205 schema: &str,
206 ) -> Result<DatabaseWithPeer, Error> {
207 let Self::Distributed {
208 meta_client: _,
209 chnl_mgr,
210 auth,
211 query: _,
212 } = self
213 else {
214 return UnexpectedSnafu {
215 reason: "Expect distributed mode",
216 }
217 .fail();
218 };
219
220 let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
221 interval.tick().await;
222 for retry in 0..GRPC_MAX_RETRIES {
223 let mut frontends = self.scan_for_frontend().await?;
224 let now_in_ms = SystemTime::now()
225 .duration_since(SystemTime::UNIX_EPOCH)
226 .unwrap()
227 .as_millis() as i64;
228 frontends.shuffle(&mut rng());
230
231 for (_, node_info) in frontends
233 .iter()
234 .filter(|(_, node_info)| {
236 node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
237 > now_in_ms
238 })
239 {
240 let addr = &node_info.peer.addr;
241 let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
242 let database = {
243 let mut db = Database::new(catalog, schema, client);
244 if let Some(auth) = auth {
245 db.set_auth(auth.auth().clone());
246 }
247 db
248 };
249 let db = DatabaseWithPeer::new(database, node_info.peer.clone());
250 match db.try_select_one().await {
251 Ok(_) => return Ok(db),
252 Err(e) => {
253 warn!(
254 "Failed to connect to frontend {} on retry={}: \n{e:?}",
255 addr, retry
256 );
257 }
258 }
259 }
260 interval.tick().await;
263 }
264
265 NoAvailableFrontendSnafu {
266 timeout: GRPC_CONN_TIMEOUT,
267 context: "No available frontend found that is able to process query",
268 }
269 .fail()
270 }
271
272 pub async fn create(
273 &self,
274 create: CreateTableExpr,
275 catalog: &str,
276 schema: &str,
277 ) -> Result<u32, Error> {
278 self.handle(
279 Request::Ddl(api::v1::DdlRequest {
280 expr: Some(api::v1::ddl_request::Expr::CreateTable(create)),
281 }),
282 catalog,
283 schema,
284 &mut None,
285 )
286 .await
287 }
288
289 pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
291 match self {
292 FrontendClient::Distributed { .. } => {
293 let db = self.get_random_active_frontend(catalog, schema).await?;
294 db.database
295 .sql(sql)
296 .await
297 .map_err(BoxedError::new)
298 .context(ExternalSnafu)
299 }
300 FrontendClient::Standalone {
301 database_client, ..
302 } => {
303 let ctx = QueryContextBuilder::default()
304 .current_catalog(catalog.to_string())
305 .current_schema(schema.to_string())
306 .build();
307 let ctx = Arc::new(ctx);
308 {
309 let database_client = {
310 database_client
311 .lock()
312 .map_err(|e| {
313 UnexpectedSnafu {
314 reason: format!("Failed to lock database client: {e}"),
315 }
316 .build()
317 })?
318 .as_ref()
319 .context(UnexpectedSnafu {
320 reason: "Standalone's frontend instance is not set",
321 })?
322 .upgrade()
323 .context(UnexpectedSnafu {
324 reason: "Failed to upgrade database client",
325 })?
326 };
327 let req = Request::Query(QueryRequest {
328 query: Some(Query::Sql(sql.to_string())),
329 });
330 database_client
331 .do_query(req, ctx)
332 .await
333 .map_err(BoxedError::new)
334 .context(ExternalSnafu)
335 }
336 }
337 }
338 }
339
340 pub(crate) async fn handle(
342 &self,
343 req: api::v1::greptime_request::Request,
344 catalog: &str,
345 schema: &str,
346 peer_desc: &mut Option<PeerDesc>,
347 ) -> Result<u32, Error> {
348 match self {
349 FrontendClient::Distributed { query, .. } => {
350 let db = self.get_random_active_frontend(catalog, schema).await?;
351
352 *peer_desc = Some(PeerDesc::Dist {
353 peer: db.peer.clone(),
354 });
355
356 db.database
357 .handle_with_retry(
358 req.clone(),
359 GRPC_MAX_RETRIES,
360 &[(QUERY_PARALLELISM_HINT, &query.parallelism.to_string())],
361 )
362 .await
363 .with_context(|_| InvalidRequestSnafu {
364 context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
365 })
366 }
367 FrontendClient::Standalone {
368 database_client,
369 query,
370 } => {
371 let ctx = QueryContextBuilder::default()
372 .current_catalog(catalog.to_string())
373 .current_schema(schema.to_string())
374 .extensions(HashMap::from([(
375 QUERY_PARALLELISM_HINT.to_string(),
376 query.parallelism.to_string(),
377 )]))
378 .build();
379 let ctx = Arc::new(ctx);
380 {
381 let database_client = {
382 database_client
383 .lock()
384 .map_err(|e| {
385 UnexpectedSnafu {
386 reason: format!("Failed to lock database client: {e}"),
387 }
388 .build()
389 })?
390 .as_ref()
391 .context(UnexpectedSnafu {
392 reason: "Standalone's frontend instance is not set",
393 })?
394 .upgrade()
395 .context(UnexpectedSnafu {
396 reason: "Failed to upgrade database client",
397 })?
398 };
399 let resp: common_query::Output = database_client
400 .do_query(req, ctx)
401 .await
402 .map_err(BoxedError::new)
403 .context(ExternalSnafu)?;
404 match resp.data {
405 common_query::OutputData::AffectedRows(rows) => {
406 Ok(rows.try_into().map_err(|_| {
407 UnexpectedSnafu {
408 reason: format!("Failed to convert rows to u32: {}", rows),
409 }
410 .build()
411 })?)
412 }
413 _ => UnexpectedSnafu {
414 reason: "Unexpected output data",
415 }
416 .fail(),
417 }
418 }
419 }
420 }
421 }
422}
423
/// Describes which peer a request was sent to.
///
/// Its `Display` output is the peer's address for [`PeerDesc::Dist`] and the literal
/// `standalone` for [`PeerDesc::Standalone`].
#[derive(Debug, Default)]
pub(crate) enum PeerDesc {
    /// The request was sent to a remote frontend node.
    Dist {
        /// The frontend node that was selected.
        peer: Peer,
    },
    /// No remote peer is involved; this is the default value.
    #[default]
    Standalone,
}
436
437impl std::fmt::Display for PeerDesc {
438 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
439 match self {
440 PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
441 PeerDesc::Standalone => write!(f, "standalone"),
442 }
443 }
444}