1use std::collections::HashMap;
18use std::sync::{Arc, Mutex, Weak};
19
20use api::v1::greptime_request::Request;
21use api::v1::query_request::Query;
22use api::v1::{CreateTableExpr, QueryRequest};
23use client::{Client, Database};
24use common_error::ext::BoxedError;
25use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
26use common_meta::peer::{Peer, PeerDiscovery};
27use common_query::Output;
28use common_telemetry::warn;
29use meta_client::client::MetaClient;
30use query::datafusion::QUERY_PARALLELISM_HINT;
31use query::options::QueryOptions;
32use rand::rng;
33use rand::seq::SliceRandom;
34use servers::query_handler::grpc::GrpcQueryHandler;
35use session::context::{QueryContextBuilder, QueryContextRef};
36use session::hints::READ_PREFERENCE_HINT;
37use snafu::{OptionExt, ResultExt};
38use tokio::sync::SetOnce;
39
40use crate::batching_mode::BatchingModeOptions;
41use crate::error::{
42 CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu,
43 NoAvailableFrontendSnafu, UnexpectedSnafu,
44};
45use crate::{Error, FlowAuthHeader};
46
/// A gRPC query handler whose errors are type-erased into [`BoxedError`].
///
/// Erasing the error type lets handlers with different concrete error types be stored
/// behind the same `dyn GrpcQueryHandlerWithBoxedError` trait object. Every
/// [`GrpcQueryHandler`] implements this trait through a blanket impl in this module, which
/// wraps the handler's error with `BoxedError::new`.
#[async_trait::async_trait]
pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
    /// Handles `query` using the query context `ctx`, returning the query [`Output`].
    async fn do_query(
        &self,
        query: Request,
        ctx: QueryContextRef,
    ) -> std::result::Result<Output, BoxedError>;
}
58
59#[async_trait::async_trait]
61impl<T: GrpcQueryHandler + Send + Sync + 'static> GrpcQueryHandlerWithBoxedError for T {
62 async fn do_query(
63 &self,
64 query: Request,
65 ctx: QueryContextRef,
66 ) -> std::result::Result<Output, BoxedError> {
67 self.do_query(query, ctx).await.map_err(BoxedError::new)
68 }
69}
70
/// Shared, late-bindable slot holding the standalone-mode query handler.
///
/// All fields are `Arc`s, so clones share the same slot. A handler installed through one
/// clone (via [`HandlerMutable::set_handler`]) is visible to the [`FrontendClient`] that
/// holds another clone.
#[derive(Debug, Clone)]
pub struct HandlerMutable {
    // Only a `Weak` reference is kept, so this slot does not keep the handler alive by
    // itself; callers must `upgrade()` it and handle the case where it has been dropped.
    handler: Arc<Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>,
    // Set once a handler has been installed; `FrontendClient::wait_initialized` waits on it.
    is_initialized: Arc<SetOnce<()>>,
}
76
77impl HandlerMutable {
78 pub async fn set_handler(&self, handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) {
79 *self.handler.lock().unwrap() = Some(handler);
80 let _ = self.is_initialized.set(());
82 }
83}
84
/// Client the flow engine uses to send SQL and gRPC requests to a frontend.
///
/// It either talks to remote frontends discovered through the metasrv (`Distributed`), or
/// to an in-process query handler (`Standalone`).
#[derive(Debug, Clone)]
pub enum FrontendClient {
    /// Distributed mode: active frontends are listed via `meta_client` and reached through
    /// gRPC clients built on `chnl_mgr`.
    Distributed {
        meta_client: Arc<MetaClient>,
        // Shared channel manager used to build a `Client` per selected frontend.
        chnl_mgr: ChannelManager,
        // Optional auth attached to every `Database` built for a frontend.
        auth: Option<FlowAuthHeader>,
        // Supplies the parallelism hint sent with requests.
        query: QueryOptions,
        // Retry count, timeouts, TLS and read-preference settings.
        batch_opts: BatchingModeOptions,
    },
    /// Standalone mode: requests go to a local handler referenced (weakly) by
    /// `database_client`, which may be installed after construction.
    Standalone {
        database_client: HandlerMutable,
        // Supplies the parallelism hint attached to the query context in `handle`.
        query: QueryOptions,
    },
}
104
105impl FrontendClient {
106 pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
108 let is_initialized = Arc::new(SetOnce::new());
109 let handler = HandlerMutable {
110 handler: Arc::new(Mutex::new(None)),
111 is_initialized,
112 };
113 (
114 Self::Standalone {
115 database_client: handler.clone(),
116 query,
117 },
118 handler,
119 )
120 }
121
122 pub async fn wait_initialized(&self) {
124 if let FrontendClient::Standalone {
125 database_client, ..
126 } = self
127 {
128 database_client.is_initialized.wait().await;
129 }
130 }
131
132 pub fn from_meta_client(
133 meta_client: Arc<MetaClient>,
134 auth: Option<FlowAuthHeader>,
135 query: QueryOptions,
136 batch_opts: BatchingModeOptions,
137 ) -> Result<Self, Error> {
138 common_telemetry::info!("Frontend client build with auth={:?}", auth);
139 Ok(Self::Distributed {
140 meta_client,
141 chnl_mgr: {
142 let cfg = ChannelConfig::new()
143 .connect_timeout(batch_opts.grpc_conn_timeout)
144 .timeout(Some(batch_opts.query_timeout));
145
146 let tls_config = load_client_tls_config(batch_opts.frontend_tls.clone())
147 .context(InvalidClientConfigSnafu)?;
148 ChannelManager::with_config(cfg, tls_config)
149 },
150 auth,
151 query,
152 batch_opts,
153 })
154 }
155
156 pub fn from_grpc_handler(
157 grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
158 query: QueryOptions,
159 ) -> Self {
160 let is_initialized = Arc::new(SetOnce::new_with(Some(())));
161 let handler = HandlerMutable {
162 handler: Arc::new(Mutex::new(Some(grpc_handler))),
163 is_initialized: is_initialized.clone(),
164 };
165
166 Self::Standalone {
167 database_client: handler,
168 query,
169 }
170 }
171}
172
/// A [`Database`] client bound to a single frontend, paired with the [`Peer`] it targets.
#[derive(Debug, Clone)]
pub struct DatabaseWithPeer {
    /// Client scoped to a catalog/schema and connected to `peer`.
    pub database: Database,
    /// The frontend this client talks to.
    pub peer: Peer,
}
178
179impl DatabaseWithPeer {
180 fn new(database: Database, peer: Peer) -> Self {
181 Self { database, peer }
182 }
183
184 async fn try_select_one(&self) -> Result<(), Error> {
186 let _ = self
188 .database
189 .sql("SELECT 1")
190 .await
191 .with_context(|_| InvalidRequestSnafu {
192 context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
193 })?;
194 Ok(())
195 }
196}
197
198impl FrontendClient {
199 pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<Peer>, Error> {
201 let Self::Distributed { meta_client, .. } = self else {
202 return Ok(vec![]);
203 };
204
205 meta_client
206 .active_frontends()
207 .await
208 .map_err(BoxedError::new)
209 .context(ExternalSnafu)
210 }
211
212 async fn get_random_active_frontend(
214 &self,
215 catalog: &str,
216 schema: &str,
217 ) -> Result<DatabaseWithPeer, Error> {
218 let Self::Distributed {
219 meta_client: _,
220 chnl_mgr,
221 auth,
222 query: _,
223 batch_opts,
224 } = self
225 else {
226 return UnexpectedSnafu {
227 reason: "Expect distributed mode",
228 }
229 .fail();
230 };
231
232 let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
233 interval.tick().await;
234 for retry in 0..batch_opts.experimental_grpc_max_retries {
235 let mut frontends = self.scan_for_frontend().await?;
236 frontends.shuffle(&mut rng());
238
239 for peer in frontends {
240 let addr = peer.addr.clone();
241 let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
242 let database = {
243 let mut db = Database::new(catalog, schema, client);
244 if let Some(auth) = auth {
245 db.set_auth(auth.auth().clone());
246 }
247 db
248 };
249 let db = DatabaseWithPeer::new(database, peer);
250 match db.try_select_one().await {
251 Ok(_) => return Ok(db),
252 Err(e) => {
253 warn!(
254 "Failed to connect to frontend {} on retry={}: \n{e:?}",
255 addr, retry
256 );
257 }
258 }
259 }
260 interval.tick().await;
263 }
264
265 NoAvailableFrontendSnafu {
266 timeout: batch_opts.grpc_conn_timeout,
267 context: "No available frontend found that is able to process query",
268 }
269 .fail()
270 }
271
272 pub async fn create(
273 &self,
274 create: CreateTableExpr,
275 catalog: &str,
276 schema: &str,
277 ) -> Result<u32, Error> {
278 self.handle(
279 Request::Ddl(api::v1::DdlRequest {
280 expr: Some(api::v1::ddl_request::Expr::CreateTable(create.clone())),
281 }),
282 catalog,
283 schema,
284 &mut None,
285 )
286 .await
287 .map_err(BoxedError::new)
288 .with_context(|_| CreateSinkTableSnafu {
289 create: create.clone(),
290 })
291 }
292
293 pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
295 match self {
296 FrontendClient::Distributed { .. } => {
297 let db = self.get_random_active_frontend(catalog, schema).await?;
298 db.database
299 .sql(sql)
300 .await
301 .map_err(BoxedError::new)
302 .context(ExternalSnafu)
303 }
304 FrontendClient::Standalone {
305 database_client, ..
306 } => {
307 let ctx = QueryContextBuilder::default()
308 .current_catalog(catalog.to_string())
309 .current_schema(schema.to_string())
310 .build();
311 let ctx = Arc::new(ctx);
312 {
313 let database_client = {
314 database_client
315 .handler
316 .lock()
317 .map_err(|e| {
318 UnexpectedSnafu {
319 reason: format!("Failed to lock database client: {e}"),
320 }
321 .build()
322 })?
323 .as_ref()
324 .context(UnexpectedSnafu {
325 reason: "Standalone's frontend instance is not set",
326 })?
327 .upgrade()
328 .context(UnexpectedSnafu {
329 reason: "Failed to upgrade database client",
330 })?
331 };
332 let req = Request::Query(QueryRequest {
333 query: Some(Query::Sql(sql.to_string())),
334 });
335 database_client
336 .do_query(req, ctx)
337 .await
338 .map_err(BoxedError::new)
339 .context(ExternalSnafu)
340 }
341 }
342 }
343 }
344
345 pub(crate) async fn handle(
347 &self,
348 req: api::v1::greptime_request::Request,
349 catalog: &str,
350 schema: &str,
351 peer_desc: &mut Option<PeerDesc>,
352 ) -> Result<u32, Error> {
353 match self {
354 FrontendClient::Distributed {
355 query, batch_opts, ..
356 } => {
357 let db = self.get_random_active_frontend(catalog, schema).await?;
358
359 *peer_desc = Some(PeerDesc::Dist {
360 peer: db.peer.clone(),
361 });
362
363 db.database
364 .handle_with_retry(
365 req.clone(),
366 batch_opts.experimental_grpc_max_retries,
367 &[
368 (QUERY_PARALLELISM_HINT, &query.parallelism.to_string()),
369 (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
370 ],
371 )
372 .await
373 .with_context(|_| InvalidRequestSnafu {
374 context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
375 })
376 }
377 FrontendClient::Standalone {
378 database_client,
379 query,
380 } => {
381 let ctx = QueryContextBuilder::default()
382 .current_catalog(catalog.to_string())
383 .current_schema(schema.to_string())
384 .extensions(HashMap::from([(
385 QUERY_PARALLELISM_HINT.to_string(),
386 query.parallelism.to_string(),
387 )]))
388 .build();
389 let ctx = Arc::new(ctx);
390 {
391 let database_client = {
392 database_client
393 .handler
394 .lock()
395 .map_err(|e| {
396 UnexpectedSnafu {
397 reason: format!("Failed to lock database client: {e}"),
398 }
399 .build()
400 })?
401 .as_ref()
402 .context(UnexpectedSnafu {
403 reason: "Standalone's frontend instance is not set",
404 })?
405 .upgrade()
406 .context(UnexpectedSnafu {
407 reason: "Failed to upgrade database client",
408 })?
409 };
410 let resp: common_query::Output = database_client
411 .do_query(req, ctx)
412 .await
413 .map_err(BoxedError::new)
414 .context(ExternalSnafu)?;
415 match resp.data {
416 common_query::OutputData::AffectedRows(rows) => {
417 Ok(rows.try_into().map_err(|_| {
418 UnexpectedSnafu {
419 reason: format!("Failed to convert rows to u32: {}", rows),
420 }
421 .build()
422 })?)
423 }
424 _ => UnexpectedSnafu {
425 reason: "Unexpected output data",
426 }
427 .fail(),
428 }
429 }
430 }
431 }
432 }
433}
434
/// Describes which frontend a request was sent to.
///
/// `FrontendClient::handle` fills this in for distributed requests; its `Display` form is
/// the frontend address or `standalone`.
#[derive(Debug, Default)]
pub(crate) enum PeerDesc {
    /// A remote frontend selected in distributed mode.
    Dist {
        /// The frontend peer the request was sent to.
        peer: Peer,
    },
    /// The in-process frontend of standalone mode (the default).
    #[default]
    Standalone,
}
447
448impl std::fmt::Display for PeerDesc {
449 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
450 match self {
451 PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
452 PeerDesc::Standalone => write!(f, "standalone"),
453 }
454 }
455}
456
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_query::Output;
    use tokio::time::timeout;

    use super::*;

    /// Handler stub that answers every request with zero affected rows.
    #[derive(Debug)]
    struct NoopHandler;

    #[async_trait::async_trait]
    impl GrpcQueryHandlerWithBoxedError for NoopHandler {
        async fn do_query(
            &self,
            _query: Request,
            _ctx: QueryContextRef,
        ) -> std::result::Result<Output, BoxedError> {
            Ok(Output::new_with_affected_rows(0))
        }
    }

    /// Checks `wait_initialized` for each way of building a `FrontendClient`.
    #[tokio::test]
    async fn wait_initialized() {
        // Empty standalone client: waiting must not complete before a handler is set.
        let (client, handler_mut) =
            FrontendClient::from_empty_grpc_handler(QueryOptions::default());

        assert!(
            timeout(Duration::from_millis(50), client.wait_initialized())
                .await
                .is_err()
        );

        // `handler` must stay alive here: the client only stores a `Weak` reference.
        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        handler_mut.set_handler(Arc::downgrade(&handler)).await;

        // Installing the handler through the shared `HandlerMutable` unblocks the client.
        timeout(Duration::from_secs(1), client.wait_initialized())
            .await
            .expect("wait_initialized should complete after handler is set");

        // Once initialized, later waits return immediately.
        timeout(Duration::from_millis(10), client.wait_initialized())
            .await
            .expect("wait_initialized should be a no-op once initialized");

        // A client built from an existing handler starts out initialized.
        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        let client =
            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
        assert!(
            timeout(Duration::from_millis(10), client.wait_initialized())
                .await
                .is_ok()
        );

        // Distributed clients never wait.
        let meta_client = Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend));
        let client = FrontendClient::from_meta_client(
            meta_client,
            None,
            QueryOptions::default(),
            BatchingModeOptions::default(),
        )
        .unwrap();
        assert!(
            timeout(Duration::from_millis(10), client.wait_initialized())
                .await
                .is_ok()
        );
    }
}