1use std::collections::HashMap;
18use std::sync::{Arc, Mutex, Weak};
19use std::time::SystemTime;
20
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{CreateTableExpr, QueryRequest};
24use client::{Client, Database};
25use common_error::ext::BoxedError;
26use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
27use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
28use common_meta::peer::Peer;
29use common_meta::rpc::store::RangeRequest;
30use common_query::Output;
31use common_telemetry::warn;
32use meta_client::client::MetaClient;
33use query::datafusion::QUERY_PARALLELISM_HINT;
34use query::options::QueryOptions;
35use rand::rng;
36use rand::seq::SliceRandom;
37use servers::query_handler::grpc::GrpcQueryHandler;
38use session::context::{QueryContextBuilder, QueryContextRef};
39use session::hints::READ_PREFERENCE_HINT;
40use snafu::{OptionExt, ResultExt};
41use tokio::sync::SetOnce;
42
43use crate::batching_mode::BatchingModeOptions;
44use crate::error::{
45 CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu,
46 NoAvailableFrontendSnafu, UnexpectedSnafu,
47};
48use crate::{Error, FlowAuthHeader};
49
50#[async_trait::async_trait]
54pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
55 async fn do_query(
56 &self,
57 query: Request,
58 ctx: QueryContextRef,
59 ) -> std::result::Result<Output, BoxedError>;
60}
61
62#[async_trait::async_trait]
64impl<T: GrpcQueryHandler + Send + Sync + 'static> GrpcQueryHandlerWithBoxedError for T {
65 async fn do_query(
66 &self,
67 query: Request,
68 ctx: QueryContextRef,
69 ) -> std::result::Result<Output, BoxedError> {
70 self.do_query(query, ctx).await.map_err(BoxedError::new)
71 }
72}
73
74#[derive(Debug, Clone)]
75pub struct HandlerMutable {
76 handler: Arc<Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>,
77 is_initialized: Arc<SetOnce<()>>,
78}
79
80impl HandlerMutable {
81 pub async fn set_handler(&self, handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) {
82 *self.handler.lock().unwrap() = Some(handler);
83 let _ = self.is_initialized.set(());
85 }
86}
87
88#[derive(Debug, Clone)]
92pub enum FrontendClient {
93 Distributed {
94 meta_client: Arc<MetaClient>,
95 chnl_mgr: ChannelManager,
96 auth: Option<FlowAuthHeader>,
97 query: QueryOptions,
98 batch_opts: BatchingModeOptions,
99 },
100 Standalone {
101 database_client: HandlerMutable,
104 query: QueryOptions,
105 },
106}
107
108impl FrontendClient {
109 pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
111 let is_initialized = Arc::new(SetOnce::new());
112 let handler = HandlerMutable {
113 handler: Arc::new(Mutex::new(None)),
114 is_initialized,
115 };
116 (
117 Self::Standalone {
118 database_client: handler.clone(),
119 query,
120 },
121 handler,
122 )
123 }
124
125 pub async fn wait_initialized(&self) {
127 if let FrontendClient::Standalone {
128 database_client, ..
129 } = self
130 {
131 database_client.is_initialized.wait().await;
132 }
133 }
134
135 pub fn from_meta_client(
136 meta_client: Arc<MetaClient>,
137 auth: Option<FlowAuthHeader>,
138 query: QueryOptions,
139 batch_opts: BatchingModeOptions,
140 ) -> Result<Self, Error> {
141 common_telemetry::info!("Frontend client build with auth={:?}", auth);
142 Ok(Self::Distributed {
143 meta_client,
144 chnl_mgr: {
145 let cfg = ChannelConfig::new()
146 .connect_timeout(batch_opts.grpc_conn_timeout)
147 .timeout(Some(batch_opts.query_timeout));
148
149 let tls_config = load_client_tls_config(batch_opts.frontend_tls.clone())
150 .context(InvalidClientConfigSnafu)?;
151 ChannelManager::with_config(cfg, tls_config)
152 },
153 auth,
154 query,
155 batch_opts,
156 })
157 }
158
159 pub fn from_grpc_handler(
160 grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
161 query: QueryOptions,
162 ) -> Self {
163 let is_initialized = Arc::new(SetOnce::new_with(Some(())));
164 let handler = HandlerMutable {
165 handler: Arc::new(Mutex::new(Some(grpc_handler))),
166 is_initialized: is_initialized.clone(),
167 };
168
169 Self::Standalone {
170 database_client: handler,
171 query,
172 }
173 }
174}
175
176#[derive(Debug, Clone)]
177pub struct DatabaseWithPeer {
178 pub database: Database,
179 pub peer: Peer,
180}
181
182impl DatabaseWithPeer {
183 fn new(database: Database, peer: Peer) -> Self {
184 Self { database, peer }
185 }
186
187 async fn try_select_one(&self) -> Result<(), Error> {
189 let _ = self
191 .database
192 .sql("SELECT 1")
193 .await
194 .with_context(|_| InvalidRequestSnafu {
195 context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
196 })?;
197 Ok(())
198 }
199}
200
201impl FrontendClient {
202 pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
204 let Self::Distributed { meta_client, .. } = self else {
205 return Ok(vec![]);
206 };
207 let cluster_client = meta_client
208 .cluster_client()
209 .map_err(BoxedError::new)
210 .context(ExternalSnafu)?;
211
212 let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
213 let req = RangeRequest::new().with_prefix(prefix);
214 let resp = cluster_client
215 .range(req)
216 .await
217 .map_err(BoxedError::new)
218 .context(ExternalSnafu)?;
219 let mut res = Vec::with_capacity(resp.kvs.len());
220 for kv in resp.kvs {
221 let key = NodeInfoKey::try_from(kv.key)
222 .map_err(BoxedError::new)
223 .context(ExternalSnafu)?;
224
225 let val = NodeInfo::try_from(kv.value)
226 .map_err(BoxedError::new)
227 .context(ExternalSnafu)?;
228 res.push((key, val));
229 }
230 Ok(res)
231 }
232
233 async fn get_random_active_frontend(
236 &self,
237 catalog: &str,
238 schema: &str,
239 ) -> Result<DatabaseWithPeer, Error> {
240 let Self::Distributed {
241 meta_client: _,
242 chnl_mgr,
243 auth,
244 query: _,
245 batch_opts,
246 } = self
247 else {
248 return UnexpectedSnafu {
249 reason: "Expect distributed mode",
250 }
251 .fail();
252 };
253
254 let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
255 interval.tick().await;
256 for retry in 0..batch_opts.experimental_grpc_max_retries {
257 let mut frontends = self.scan_for_frontend().await?;
258 let now_in_ms = SystemTime::now()
259 .duration_since(SystemTime::UNIX_EPOCH)
260 .unwrap()
261 .as_millis() as i64;
262 frontends.shuffle(&mut rng());
264
265 for (_, node_info) in frontends
267 .iter()
268 .filter(|(_, node_info)| {
270 node_info.last_activity_ts
271 + batch_opts
272 .experimental_frontend_activity_timeout
273 .as_millis() as i64
274 > now_in_ms
275 })
276 {
277 let addr = &node_info.peer.addr;
278 let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
279 let database = {
280 let mut db = Database::new(catalog, schema, client);
281 if let Some(auth) = auth {
282 db.set_auth(auth.auth().clone());
283 }
284 db
285 };
286 let db = DatabaseWithPeer::new(database, node_info.peer.clone());
287 match db.try_select_one().await {
288 Ok(_) => return Ok(db),
289 Err(e) => {
290 warn!(
291 "Failed to connect to frontend {} on retry={}: \n{e:?}",
292 addr, retry
293 );
294 }
295 }
296 }
297 interval.tick().await;
300 }
301
302 NoAvailableFrontendSnafu {
303 timeout: batch_opts.grpc_conn_timeout,
304 context: "No available frontend found that is able to process query",
305 }
306 .fail()
307 }
308
309 pub async fn create(
310 &self,
311 create: CreateTableExpr,
312 catalog: &str,
313 schema: &str,
314 ) -> Result<u32, Error> {
315 self.handle(
316 Request::Ddl(api::v1::DdlRequest {
317 expr: Some(api::v1::ddl_request::Expr::CreateTable(create.clone())),
318 }),
319 catalog,
320 schema,
321 &mut None,
322 )
323 .await
324 .map_err(BoxedError::new)
325 .with_context(|_| CreateSinkTableSnafu {
326 create: create.clone(),
327 })
328 }
329
330 pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
332 match self {
333 FrontendClient::Distributed { .. } => {
334 let db = self.get_random_active_frontend(catalog, schema).await?;
335 db.database
336 .sql(sql)
337 .await
338 .map_err(BoxedError::new)
339 .context(ExternalSnafu)
340 }
341 FrontendClient::Standalone {
342 database_client, ..
343 } => {
344 let ctx = QueryContextBuilder::default()
345 .current_catalog(catalog.to_string())
346 .current_schema(schema.to_string())
347 .build();
348 let ctx = Arc::new(ctx);
349 {
350 let database_client = {
351 database_client
352 .handler
353 .lock()
354 .map_err(|e| {
355 UnexpectedSnafu {
356 reason: format!("Failed to lock database client: {e}"),
357 }
358 .build()
359 })?
360 .as_ref()
361 .context(UnexpectedSnafu {
362 reason: "Standalone's frontend instance is not set",
363 })?
364 .upgrade()
365 .context(UnexpectedSnafu {
366 reason: "Failed to upgrade database client",
367 })?
368 };
369 let req = Request::Query(QueryRequest {
370 query: Some(Query::Sql(sql.to_string())),
371 });
372 database_client
373 .do_query(req, ctx)
374 .await
375 .map_err(BoxedError::new)
376 .context(ExternalSnafu)
377 }
378 }
379 }
380 }
381
382 pub(crate) async fn handle(
384 &self,
385 req: api::v1::greptime_request::Request,
386 catalog: &str,
387 schema: &str,
388 peer_desc: &mut Option<PeerDesc>,
389 ) -> Result<u32, Error> {
390 match self {
391 FrontendClient::Distributed {
392 query, batch_opts, ..
393 } => {
394 let db = self.get_random_active_frontend(catalog, schema).await?;
395
396 *peer_desc = Some(PeerDesc::Dist {
397 peer: db.peer.clone(),
398 });
399
400 db.database
401 .handle_with_retry(
402 req.clone(),
403 batch_opts.experimental_grpc_max_retries,
404 &[
405 (QUERY_PARALLELISM_HINT, &query.parallelism.to_string()),
406 (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
407 ],
408 )
409 .await
410 .with_context(|_| InvalidRequestSnafu {
411 context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
412 })
413 }
414 FrontendClient::Standalone {
415 database_client,
416 query,
417 } => {
418 let ctx = QueryContextBuilder::default()
419 .current_catalog(catalog.to_string())
420 .current_schema(schema.to_string())
421 .extensions(HashMap::from([(
422 QUERY_PARALLELISM_HINT.to_string(),
423 query.parallelism.to_string(),
424 )]))
425 .build();
426 let ctx = Arc::new(ctx);
427 {
428 let database_client = {
429 database_client
430 .handler
431 .lock()
432 .map_err(|e| {
433 UnexpectedSnafu {
434 reason: format!("Failed to lock database client: {e}"),
435 }
436 .build()
437 })?
438 .as_ref()
439 .context(UnexpectedSnafu {
440 reason: "Standalone's frontend instance is not set",
441 })?
442 .upgrade()
443 .context(UnexpectedSnafu {
444 reason: "Failed to upgrade database client",
445 })?
446 };
447 let resp: common_query::Output = database_client
448 .do_query(req, ctx)
449 .await
450 .map_err(BoxedError::new)
451 .context(ExternalSnafu)?;
452 match resp.data {
453 common_query::OutputData::AffectedRows(rows) => {
454 Ok(rows.try_into().map_err(|_| {
455 UnexpectedSnafu {
456 reason: format!("Failed to convert rows to u32: {}", rows),
457 }
458 .build()
459 })?)
460 }
461 _ => UnexpectedSnafu {
462 reason: "Unexpected output data",
463 }
464 .fail(),
465 }
466 }
467 }
468 }
469 }
470}
471
472#[derive(Debug, Default)]
474pub(crate) enum PeerDesc {
475 Dist {
477 peer: Peer,
479 },
480 #[default]
482 Standalone,
483}
484
485impl std::fmt::Display for PeerDesc {
486 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
487 match self {
488 PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
489 PeerDesc::Standalone => write!(f, "standalone"),
490 }
491 }
492}
493
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_query::Output;
    use tokio::time::error::Elapsed;
    use tokio::time::timeout;

    use super::*;

    /// Handler stub that answers every request with zero affected rows.
    #[derive(Debug)]
    struct NoopHandler;

    #[async_trait::async_trait]
    impl GrpcQueryHandlerWithBoxedError for NoopHandler {
        async fn do_query(
            &self,
            _query: Request,
            _ctx: QueryContextRef,
        ) -> std::result::Result<Output, BoxedError> {
            Ok(Output::new_with_affected_rows(0))
        }
    }

    /// Awaits `client.wait_initialized()` for at most `limit`.
    async fn wait_within(
        client: &FrontendClient,
        limit: Duration,
    ) -> std::result::Result<(), Elapsed> {
        timeout(limit, client.wait_initialized()).await
    }

    #[tokio::test]
    async fn wait_initialized() {
        // A standalone client created without a handler keeps waiting.
        let (pending_client, handler_slot) =
            FrontendClient::from_empty_grpc_handler(QueryOptions::default());
        assert!(
            wait_within(&pending_client, Duration::from_millis(50))
                .await
                .is_err()
        );

        // Installing a handler releases the waiter, and later waits return at once.
        let noop: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        handler_slot.set_handler(Arc::downgrade(&noop)).await;
        wait_within(&pending_client, Duration::from_secs(1))
            .await
            .expect("wait_initialized should complete after handler is set");
        wait_within(&pending_client, Duration::from_millis(10))
            .await
            .expect("wait_initialized should be a no-op once initialized");

        // A client built around an existing handler starts out initialized.
        let ready_handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        let ready_client = FrontendClient::from_grpc_handler(
            Arc::downgrade(&ready_handler),
            QueryOptions::default(),
        );
        assert!(
            wait_within(&ready_client, Duration::from_millis(10))
                .await
                .is_ok()
        );

        // Distributed clients never wait.
        let meta_client = Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend));
        let distributed_client = FrontendClient::from_meta_client(
            meta_client,
            None,
            QueryOptions::default(),
            BatchingModeOptions::default(),
        )
        .unwrap();
        assert!(
            wait_within(&distributed_client, Duration::from_millis(10))
                .await
                .is_ok()
        );
    }
}