1use std::collections::HashMap;
18use std::sync::{Arc, Mutex, Weak};
19
20use api::v1::greptime_request::Request;
21use api::v1::query_request::Query;
22use api::v1::{CreateTableExpr, QueryRequest};
23use client::{Client, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, Database, OutputWithMetrics};
24use common_error::ext::BoxedError;
25use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
26use common_meta::peer::{Peer, PeerDiscovery};
27use common_query::{Output, OutputData};
28use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
29use common_telemetry::warn;
30use futures::stream::{FuturesUnordered, StreamExt};
31use meta_client::client::MetaClient;
32use query::datafusion::QUERY_PARALLELISM_HINT;
33use query::metrics::terminal_recordbatch_metrics_from_plan;
34use query::options::{FlowQueryExtensions, QueryOptions};
35use rand::rng;
36use rand::seq::SliceRandom;
37use servers::query_handler::grpc::GrpcQueryHandler;
38use session::context::{QueryContextBuilder, QueryContextRef};
39use session::hints::READ_PREFERENCE_HINT;
40use snafu::{OptionExt, ResultExt};
41use tokio::sync::SetOnce;
42
43use crate::Error;
44use crate::batching_mode::BatchingModeOptions;
45use crate::error::{
46 CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu,
47 NoAvailableFrontendSnafu, UnexpectedSnafu,
48};
49
/// Query-handler abstraction whose error type is erased to [`BoxedError`].
///
/// This file stores implementors behind `dyn GrpcQueryHandlerWithBoxedError` pointers
/// (see [`HandlerMutable`]), which is why the error type is boxed rather than associated.
#[async_trait::async_trait]
pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
    /// Executes `query` under the query context `ctx` and returns the produced [`Output`],
    /// or the failure wrapped in a [`BoxedError`].
    async fn do_query(
        &self,
        query: Request,
        ctx: QueryContextRef,
    ) -> std::result::Result<Output, BoxedError>;
}
61
/// Blanket adapter: every [`GrpcQueryHandler`] can be used as a
/// [`GrpcQueryHandlerWithBoxedError`] by boxing its error.
#[async_trait::async_trait]
impl<T: GrpcQueryHandler + Send + Sync + 'static> GrpcQueryHandlerWithBoxedError for T {
    async fn do_query(
        &self,
        query: Request,
        ctx: QueryContextRef,
    ) -> std::result::Result<Output, BoxedError> {
        // NOTE(review): this call is expected to resolve to the wrapped
        // `GrpcQueryHandler::do_query` rather than recursing into this method — confirm.
        self.do_query(query, ctx).await.map_err(BoxedError::new)
    }
}
73
/// Shared, late-bound slot for the standalone frontend's query handler.
///
/// Both fields are behind `Arc`, so every clone observes the same handler and the same
/// initialization signal.
#[derive(Debug, Clone)]
pub struct HandlerMutable {
    /// The handler, held weakly; `None` until one has been installed.
    handler: Arc<Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>,
    /// Signalled once a handler has been installed; awaited by
    /// `FrontendClient::wait_initialized`.
    is_initialized: Arc<SetOnce<()>>,
}
79
80impl HandlerMutable {
81 pub async fn set_handler(&self, handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) {
82 *self.handler.lock().unwrap() = Some(handler);
83 let _ = self.is_initialized.set(());
85 }
86}
87
/// Client used by the flownode to send queries and requests to a frontend.
#[derive(Debug, Clone)]
pub enum FrontendClient {
    /// Distributed mode: frontends are looked up through the meta client and reached over gRPC.
    Distributed {
        /// Used to list the currently active frontends.
        meta_client: Arc<MetaClient>,
        /// Channel manager built from the batching options' timeouts and TLS settings.
        chnl_mgr: ChannelManager,
        /// Query options; `parallelism` is forwarded as a query hint.
        query: QueryOptions,
        /// Batching-mode options (timeouts, retry count, read preference, TLS).
        batch_opts: BatchingModeOptions,
    },
    /// Standalone mode: requests are dispatched to an in-process handler.
    Standalone {
        /// Slot holding the (possibly not yet installed) in-process handler.
        database_client: HandlerMutable,
        /// Query options; `parallelism` is forwarded as a query-context extension.
        query: QueryOptions,
    },
}
106
107impl FrontendClient {
108 pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
110 let is_initialized = Arc::new(SetOnce::new());
111 let handler = HandlerMutable {
112 handler: Arc::new(Mutex::new(None)),
113 is_initialized,
114 };
115 (
116 Self::Standalone {
117 database_client: handler.clone(),
118 query,
119 },
120 handler,
121 )
122 }
123
124 pub async fn wait_initialized(&self) {
126 if let FrontendClient::Standalone {
127 database_client, ..
128 } = self
129 {
130 database_client.is_initialized.wait().await;
131 }
132 }
133
134 pub fn from_meta_client(
135 meta_client: Arc<MetaClient>,
136 query: QueryOptions,
137 batch_opts: BatchingModeOptions,
138 ) -> Result<Self, Error> {
139 common_telemetry::info!("Frontend client build without auth");
140 Ok(Self::Distributed {
141 meta_client,
142 chnl_mgr: {
143 let cfg = ChannelConfig::new()
144 .connect_timeout(batch_opts.grpc_conn_timeout)
145 .timeout(Some(batch_opts.query_timeout));
146
147 let tls_config = load_client_tls_config(batch_opts.frontend_tls.clone())
148 .context(InvalidClientConfigSnafu)?;
149 ChannelManager::with_config(cfg, tls_config)
150 },
151 query,
152 batch_opts,
153 })
154 }
155
156 pub fn from_grpc_handler(
157 grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
158 query: QueryOptions,
159 ) -> Self {
160 let is_initialized = Arc::new(SetOnce::new_with(Some(())));
161 let handler = HandlerMutable {
162 handler: Arc::new(Mutex::new(Some(grpc_handler))),
163 is_initialized: is_initialized.clone(),
164 };
165
166 Self::Standalone {
167 database_client: handler,
168 query,
169 }
170 }
171}
172
/// A [`Database`] client paired with the frontend [`Peer`] it is connected to.
#[derive(Debug, Clone)]
pub struct DatabaseWithPeer {
    /// Client used to issue requests against the frontend.
    pub database: Database,
    /// The frontend this client talks to; used in logs and error messages.
    pub peer: Peer,
}
178
179impl DatabaseWithPeer {
180 fn new(database: Database, peer: Peer) -> Self {
181 Self { database, peer }
182 }
183
184 async fn try_select_one(&self) -> Result<(), Error> {
186 let _ = self
188 .database
189 .sql("SELECT 1")
190 .await
191 .with_context(|_| InvalidRequestSnafu {
192 context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
193 })?;
194 Ok(())
195 }
196}
197
198impl FrontendClient {
199 pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<Peer>, Error> {
201 let Self::Distributed { meta_client, .. } = self else {
202 return Ok(vec![]);
203 };
204
205 meta_client
206 .active_frontends()
207 .await
208 .map(|nodes| nodes.into_iter().map(|node| node.peer).collect())
209 .map_err(BoxedError::new)
210 .context(ExternalSnafu)
211 }
212
213 pub(crate) async fn check_all_frontends_without_auth(
219 &self,
220 frontends: &[Peer],
221 ) -> Result<Vec<String>, Error> {
222 let Self::Distributed {
223 chnl_mgr,
224 batch_opts,
225 ..
226 } = self
227 else {
228 return Ok(vec![]);
229 };
230
231 let probe_timeout = batch_opts.grpc_conn_timeout;
232 let mut probes = frontends
233 .iter()
234 .map(|peer| {
235 let addr = peer.addr.clone();
236 let chnl_mgr = chnl_mgr.clone();
237
238 async move {
239 let client = Client::with_manager_and_urls(chnl_mgr, vec![addr.clone()]);
240 let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
241
242 match tokio::time::timeout(probe_timeout, database.sql("SELECT 1")).await {
243 Ok(Ok(_)) => Ok(None),
244 Ok(Err(err)) if err.tonic_code() == Some(tonic::Code::Unauthenticated) => {
245 Err(err).context(InvalidRequestSnafu {
246 context: format!(
247 "Frontend {addr} rejected unauthenticated flownode probe; ensure frontend internal_grpc is advertised to metasrv"
248 ),
249 })
250 }
251 Ok(Err(err)) => Ok(Some(format!("{addr}: {err}"))),
252 Err(_) => Ok(Some(format!(
253 "{addr}: health check timed out after {probe_timeout:?}"
254 ))),
255 }
256 }
257 })
258 .collect::<FuturesUnordered<_>>();
259
260 let mut failures = Vec::new();
261 while let Some(probe_result) = probes.next().await {
262 if let Some(failure) = probe_result? {
263 failures.push(failure);
264 }
265 }
266
267 Ok(failures)
268 }
269
270 async fn get_random_active_frontend(
272 &self,
273 catalog: &str,
274 schema: &str,
275 ) -> Result<DatabaseWithPeer, Error> {
276 let Self::Distributed {
277 meta_client: _,
278 chnl_mgr,
279 query: _,
280 batch_opts,
281 } = self
282 else {
283 return UnexpectedSnafu {
284 reason: "Expect distributed mode",
285 }
286 .fail();
287 };
288
289 let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
290 interval.tick().await;
291 for retry in 0..batch_opts.experimental_grpc_max_retries {
292 let mut frontends = self.scan_for_frontend().await?;
293 frontends.shuffle(&mut rng());
295
296 for peer in frontends {
297 let addr = peer.addr.clone();
298 let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
299 let database = Database::new(catalog, schema, client);
300 let db = DatabaseWithPeer::new(database, peer);
301 match db.try_select_one().await {
302 Ok(_) => return Ok(db),
303 Err(e) => {
304 warn!(
305 "Failed to connect to frontend {} on retry={}: \n{e:?}",
306 addr, retry
307 );
308 }
309 }
310 }
311 interval.tick().await;
314 }
315
316 NoAvailableFrontendSnafu {
317 timeout: batch_opts.grpc_conn_timeout,
318 context: "No available frontend found that is able to process query",
319 }
320 .fail()
321 }
322
323 pub async fn create(
324 &self,
325 create: CreateTableExpr,
326 catalog: &str,
327 schema: &str,
328 ) -> Result<u32, Error> {
329 self.handle(
330 Request::Ddl(api::v1::DdlRequest {
331 expr: Some(api::v1::ddl_request::Expr::CreateTable(create.clone())),
332 }),
333 catalog,
334 schema,
335 &mut None,
336 )
337 .await
338 .map_err(BoxedError::new)
339 .with_context(|_| CreateSinkTableSnafu {
340 create: create.clone(),
341 })
342 }
343
344 pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
346 match self {
347 FrontendClient::Distributed { .. } => {
348 let db = self.get_random_active_frontend(catalog, schema).await?;
349 db.database
350 .sql(sql)
351 .await
352 .map_err(BoxedError::new)
353 .context(ExternalSnafu)
354 }
355 FrontendClient::Standalone {
356 database_client, ..
357 } => {
358 let ctx = QueryContextBuilder::default()
359 .current_catalog(catalog.to_string())
360 .current_schema(schema.to_string())
361 .build();
362 let ctx = Arc::new(ctx);
363 {
364 let database_client = {
365 database_client
366 .handler
367 .lock()
368 .unwrap()
369 .as_ref()
370 .context(UnexpectedSnafu {
371 reason: "Standalone's frontend instance is not set",
372 })?
373 .upgrade()
374 .context(UnexpectedSnafu {
375 reason: "Failed to upgrade database client",
376 })?
377 };
378 let req = Request::Query(QueryRequest {
379 query: Some(Query::Sql(sql.to_string())),
380 });
381 database_client
382 .do_query(req, ctx)
383 .await
384 .map_err(BoxedError::new)
385 .context(ExternalSnafu)
386 }
387 }
388 }
389 }
390
391 pub(crate) async fn query_with_terminal_metrics(
392 &self,
393 catalog: &str,
394 schema: &str,
395 request: QueryRequest,
396 extensions: &[(&str, &str)],
397 peer_desc: &mut Option<PeerDesc>,
398 ) -> Result<OutputWithMetrics, Error> {
399 let flow_extensions = build_flow_extensions(extensions)?;
400 match self {
401 FrontendClient::Distributed {
402 query, batch_opts, ..
403 } => {
404 let query_parallelism = query.parallelism.to_string();
405 let hints = vec![
406 (QUERY_PARALLELISM_HINT, query_parallelism.as_str()),
407 (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
408 ];
409 let db = self.get_random_active_frontend(catalog, schema).await?;
410 *peer_desc = Some(PeerDesc::Dist {
411 peer: db.peer.clone(),
412 });
413 db.database
414 .query_with_terminal_metrics_and_flow_extensions(request, &hints, extensions)
415 .await
416 .map_err(BoxedError::new)
417 .context(ExternalSnafu)
418 }
419 FrontendClient::Standalone {
420 database_client,
421 query,
422 } => {
423 *peer_desc = Some(PeerDesc::Standalone);
424 let mut extensions_map = HashMap::from([(
425 QUERY_PARALLELISM_HINT.to_string(),
426 query.parallelism.to_string(),
427 )]);
428 for (key, value) in extensions {
429 extensions_map.insert((*key).to_string(), (*value).to_string());
430 }
431 let ctx = QueryContextBuilder::default()
432 .current_catalog(catalog.to_string())
433 .current_schema(schema.to_string())
434 .extensions(extensions_map)
435 .build();
436 let ctx = Arc::new(ctx);
437 let database_client = {
438 database_client
439 .handler
440 .lock()
441 .map_err(|e| {
442 UnexpectedSnafu {
443 reason: format!("Failed to lock database client: {e}"),
444 }
445 .build()
446 })?
447 .as_ref()
448 .context(UnexpectedSnafu {
449 reason: "Standalone's frontend instance is not set",
450 })?
451 .upgrade()
452 .context(UnexpectedSnafu {
453 reason: "Failed to upgrade database client",
454 })?
455 };
456 database_client
457 .do_query(Request::Query(request), ctx.clone())
458 .await
459 .map(|output| {
460 wrap_standalone_output_with_terminal_metrics(output, &flow_extensions, &ctx)
461 })
462 .map_err(BoxedError::new)
463 .context(ExternalSnafu)
464 }
465 }
466 }
467
468 pub(crate) async fn handle(
470 &self,
471 req: api::v1::greptime_request::Request,
472 catalog: &str,
473 schema: &str,
474 peer_desc: &mut Option<PeerDesc>,
475 ) -> Result<u32, Error> {
476 match self {
477 FrontendClient::Distributed {
478 query, batch_opts, ..
479 } => {
480 let db = self.get_random_active_frontend(catalog, schema).await?;
481
482 *peer_desc = Some(PeerDesc::Dist {
483 peer: db.peer.clone(),
484 });
485
486 db.database
487 .handle_with_retry(
488 req.clone(),
489 batch_opts.experimental_grpc_max_retries,
490 &[
491 (QUERY_PARALLELISM_HINT, &query.parallelism.to_string()),
492 (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
493 ],
494 )
495 .await
496 .with_context(|_| InvalidRequestSnafu {
497 context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
498 })
499 }
500 FrontendClient::Standalone {
501 database_client,
502 query,
503 } => {
504 let ctx = QueryContextBuilder::default()
505 .current_catalog(catalog.to_string())
506 .current_schema(schema.to_string())
507 .extensions(HashMap::from([(
508 QUERY_PARALLELISM_HINT.to_string(),
509 query.parallelism.to_string(),
510 )]))
511 .build();
512 let ctx = Arc::new(ctx);
513 {
514 let database_client = {
515 database_client
516 .handler
517 .lock()
518 .unwrap()
519 .as_ref()
520 .context(UnexpectedSnafu {
521 reason: "Standalone's frontend instance is not set",
522 })?
523 .upgrade()
524 .context(UnexpectedSnafu {
525 reason: "Failed to upgrade database client",
526 })?
527 };
528 let resp: common_query::Output = database_client
529 .do_query(req, ctx)
530 .await
531 .map_err(BoxedError::new)
532 .context(ExternalSnafu)?;
533 match resp.data {
534 common_query::OutputData::AffectedRows(rows) => {
535 Ok(rows.try_into().map_err(|_| {
536 UnexpectedSnafu {
537 reason: format!("Failed to convert rows to u32: {}", rows),
538 }
539 .build()
540 })?)
541 }
542 _ => UnexpectedSnafu {
543 reason: "Unexpected output data",
544 }
545 .fail(),
546 }
547 }
548 }
549 }
550 }
551}
552
553fn build_flow_extensions(extensions: &[(&str, &str)]) -> Result<FlowQueryExtensions, Error> {
554 let flow_extensions = HashMap::from_iter(
555 extensions
556 .iter()
557 .map(|(key, value)| ((*key).to_string(), (*value).to_string())),
558 );
559 FlowQueryExtensions::parse_flow_extensions(&flow_extensions)
560 .map_err(BoxedError::new)
561 .context(ExternalSnafu)
562 .map(|extensions| extensions.unwrap_or_default())
563}
564
565fn wrap_standalone_output_with_terminal_metrics(
566 output: Output,
567 flow_extensions: &FlowQueryExtensions,
568 query_ctx: &QueryContextRef,
569) -> OutputWithMetrics {
570 let should_collect_region_watermark = flow_extensions.should_collect_region_watermark();
571 let terminal_metrics =
572 if should_collect_region_watermark && !matches!(&output.data, OutputData::Stream(_)) {
573 output
574 .meta
575 .plan
576 .clone()
577 .and_then(terminal_recordbatch_metrics_from_plan)
578 .or_else(|| terminal_recordbatch_metrics_from_snapshots(query_ctx))
579 } else {
580 None
581 };
582 let result = OutputWithMetrics::from_output(output);
583 if let Some(metrics) = terminal_metrics {
584 result.metrics.update(Some(metrics));
585 }
586 result
587}
588
589fn terminal_recordbatch_metrics_from_snapshots(
590 query_ctx: &QueryContextRef,
591) -> Option<RecordBatchMetrics> {
592 let mut region_watermarks = query_ctx
593 .snapshots()
594 .into_iter()
595 .map(|(region_id, watermark)| RegionWatermarkEntry {
596 region_id,
597 watermark: Some(watermark),
598 })
599 .collect::<Vec<_>>();
600 if region_watermarks.is_empty() {
601 return None;
602 }
603
604 region_watermarks.sort_by_key(|entry| entry.region_id);
605 Some(RecordBatchMetrics {
606 region_watermarks,
607 ..Default::default()
608 })
609}
610
/// Describes which peer served a request; rendered through its `Display` impl.
#[derive(Debug, Default, Clone)]
pub(crate) enum PeerDesc {
    /// No peer has been recorded (the default).
    #[default]
    Unknown,
    /// The request was sent to a frontend in distributed mode.
    Dist {
        /// The frontend that was chosen for the request.
        peer: Peer,
    },
    /// The request was handled in-process in standalone mode.
    Standalone,
}
625
626impl std::fmt::Display for PeerDesc {
627 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
628 match self {
629 PeerDesc::Unknown => write!(f, "unknown"),
630 PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
631 PeerDesc::Standalone => write!(f, "standalone"),
632 }
633 }
634}
635
#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::Duration;

    use arrow_flight::flight_service_server::FlightServiceServer;
    use arrow_flight::{FlightData, Ticket};
    use common_query::{Output, OutputData};
    use common_recordbatch::adapter::RecordBatchMetrics;
    use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream};
    use datatypes::prelude::{ConcreteDataType, VectorRef};
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::Int32Vector;
    use futures::StreamExt;
    use servers::grpc::flight::{FlightCraft, FlightCraftWrapper, TonicStream};
    use tokio::net::TcpListener;
    use tokio::task::JoinHandle;
    use tokio::time::timeout;
    use tokio_stream::wrappers::TcpListenerStream;
    use tonic::{Request as TonicRequest, Response as TonicResponse, Status};

    use super::*;

    /// Handler that ignores its input and reports zero affected rows.
    #[derive(Debug)]
    struct NoopHandler;

    /// Single-batch stream with configurable metrics, used to test terminal-metrics handling.
    struct MockMetricsStream {
        schema: datatypes::schema::SchemaRef,
        /// The one batch to yield; `None` once it has been taken.
        batch: Option<RecordBatch>,
        /// Metrics reported by `RecordBatchStream::metrics`.
        metrics: RecordBatchMetrics,
        /// When set, metrics are withheld until the batch has been yielded.
        terminal_metrics_only: bool,
    }

    impl futures::Stream for MockMetricsStream {
        type Item = common_recordbatch::error::Result<RecordBatch>;

        // Yields the stored batch once, then ends the stream.
        fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            Poll::Ready(self.batch.take().map(Ok))
        }

        // Exactly one item remains while the batch has not been taken, zero afterwards.
        fn size_hint(&self) -> (usize, Option<usize>) {
            (
                usize::from(self.batch.is_some()),
                Some(usize::from(self.batch.is_some())),
            )
        }
    }

    impl RecordBatchStream for MockMetricsStream {
        fn name(&self) -> &str {
            "MockMetricsStream"
        }

        fn schema(&self) -> datatypes::schema::SchemaRef {
            self.schema.clone()
        }

        fn output_ordering(&self) -> Option<&[OrderOption]> {
            None
        }

        fn metrics(&self) -> Option<RecordBatchMetrics> {
            // In terminal-only mode, report nothing until the batch has been consumed.
            if self.terminal_metrics_only && self.batch.is_some() {
                return None;
            }
            Some(self.metrics.clone())
        }
    }

    /// Handler returning a `MockMetricsStream` whose metrics carry one region watermark.
    #[derive(Debug)]
    struct MetricsHandler;

    /// Handler asserting that the flow extension reached the query context.
    #[derive(Debug)]
    struct ExtensionAwareHandler;

    /// Handler that additionally records a snapshot on the query context.
    #[derive(Debug)]
    struct SnapshotBindingHandler;

    /// Flight service that rejects every request as unauthenticated.
    #[derive(Debug)]
    struct RejectUnauthenticatedFlight;

    /// Flight service that sleeps for 60 seconds before answering.
    #[derive(Debug)]
    struct SlowFlight;

    /// Flight service that waits on a shared barrier before answering.
    struct WaitForConcurrentFlight {
        barrier: Arc<tokio::sync::Barrier>,
    }

    #[async_trait::async_trait]
    impl GrpcQueryHandlerWithBoxedError for NoopHandler {
        async fn do_query(
            &self,
            _query: Request,
            _ctx: QueryContextRef,
        ) -> std::result::Result<Output, BoxedError> {
            Ok(Output::new_with_affected_rows(0))
        }
    }

    #[async_trait::async_trait]
    impl GrpcQueryHandlerWithBoxedError for MetricsHandler {
        async fn do_query(
            &self,
            _query: Request,
            _ctx: QueryContextRef,
        ) -> std::result::Result<Output, BoxedError> {
            let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
                "v",
                ConcreteDataType::int32_datatype(),
                false,
            )]));
            let batch = RecordBatch::new(
                schema.clone(),
                vec![Arc::new(Int32Vector::from_slice([1, 2])) as VectorRef],
            )
            .unwrap();
            // Metrics (region 42 -> watermark 99) become visible only after the batch is read.
            Ok(Output::new_with_stream(Box::pin(MockMetricsStream {
                schema,
                batch: Some(batch),
                metrics: RecordBatchMetrics {
                    region_watermarks: vec![common_recordbatch::adapter::RegionWatermarkEntry {
                        region_id: 42,
                        watermark: Some(99),
                    }],
                    ..Default::default()
                },
                terminal_metrics_only: true,
            })))
        }
    }

    #[async_trait::async_trait]
    impl GrpcQueryHandlerWithBoxedError for ExtensionAwareHandler {
        async fn do_query(
            &self,
            _query: Request,
            ctx: QueryContextRef,
        ) -> std::result::Result<Output, BoxedError> {
            assert_eq!(ctx.extension("flow.return_region_seq"), Some("true"));
            Ok(Output::new_with_affected_rows(1))
        }
    }

    #[async_trait::async_trait]
    impl GrpcQueryHandlerWithBoxedError for SnapshotBindingHandler {
        async fn do_query(
            &self,
            _query: Request,
            ctx: QueryContextRef,
        ) -> std::result::Result<Output, BoxedError> {
            assert_eq!(ctx.extension("flow.return_region_seq"), Some("true"));
            // Record a snapshot so the client can derive a watermark from the context.
            ctx.set_snapshot(42, 99);
            Ok(Output::new_with_affected_rows(1))
        }
    }

    #[async_trait::async_trait]
    impl FlightCraft for RejectUnauthenticatedFlight {
        async fn do_get(
            &self,
            _request: TonicRequest<Ticket>,
        ) -> std::result::Result<TonicResponse<TonicStream<FlightData>>, Status> {
            Err(Status::unauthenticated("auth failed"))
        }
    }

    #[async_trait::async_trait]
    impl FlightCraft for SlowFlight {
        async fn do_get(
            &self,
            _request: TonicRequest<Ticket>,
        ) -> std::result::Result<TonicResponse<TonicStream<FlightData>>, Status> {
            // Far longer than any probe timeout used by the tests below.
            tokio::time::sleep(Duration::from_secs(60)).await;
            Err(Status::unavailable("slow response"))
        }
    }

    #[async_trait::async_trait]
    impl FlightCraft for WaitForConcurrentFlight {
        async fn do_get(
            &self,
            _request: TonicRequest<Ticket>,
        ) -> std::result::Result<TonicResponse<TonicStream<FlightData>>, Status> {
            // Only returns once every party of the barrier has arrived.
            self.barrier.wait().await;
            Err(Status::unavailable("probe started concurrently"))
        }
    }

    /// Serves `handler` as a Flight service on an OS-assigned local port.
    ///
    /// Returns the bound address and the handle of the spawned server task.
    async fn start_flight_server<T: FlightCraft>(handler: T) -> (String, JoinHandle<()>) {
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind test flight server");
        let addr = listener.local_addr().expect("local addr").to_string();
        let server = tokio::spawn(async move {
            tonic::transport::Server::builder()
                .add_service(FlightServiceServer::new(FlightCraftWrapper(handler)))
                .serve_with_incoming(TcpListenerStream::new(listener))
                .await
                .expect("serve test flight server");
        });

        (addr, server)
    }

    /// `wait_initialized` blocks until a handler is set and is a no-op otherwise.
    #[tokio::test]
    async fn wait_initialized() {
        let (client, handler_mut) =
            FrontendClient::from_empty_grpc_handler(QueryOptions::default());

        // No handler yet: the wait must not complete.
        assert!(
            timeout(Duration::from_millis(50), client.wait_initialized())
                .await
                .is_err()
        );

        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        handler_mut.set_handler(Arc::downgrade(&handler)).await;

        timeout(Duration::from_secs(1), client.wait_initialized())
            .await
            .expect("wait_initialized should complete after handler is set");

        timeout(Duration::from_millis(10), client.wait_initialized())
            .await
            .expect("wait_initialized should be a no-op once initialized");

        // A client built with a handler is initialized from the start.
        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        let client =
            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
        assert!(
            timeout(Duration::from_millis(10), client.wait_initialized())
                .await
                .is_ok()
        );

        // A distributed client never waits.
        let meta_client = Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend));
        let client = FrontendClient::from_meta_client(
            meta_client,
            QueryOptions::default(),
            BatchingModeOptions::default(),
        )
        .unwrap();
        assert!(
            timeout(Duration::from_millis(10), client.wait_initialized())
                .await
                .is_ok()
        );
    }

    /// For stream outputs, metrics become ready only after the stream has been drained.
    #[tokio::test]
    async fn test_query_with_terminal_metrics_tracks_watermark_in_standalone_mode() {
        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(MetricsHandler);
        let client =
            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
        let mut peer_desc = None;

        let result = client
            .query_with_terminal_metrics(
                "greptime",
                "public",
                QueryRequest {
                    query: Some(Query::Sql("select 1".to_string())),
                },
                &[],
                &mut peer_desc,
            )
            .await
            .unwrap();
        assert!(matches!(peer_desc, Some(PeerDesc::Standalone)));

        let terminal_metrics = result.metrics.clone();
        assert!(!result.metrics.is_ready());
        assert!(terminal_metrics.get().is_none());

        let OutputData::Stream(mut stream) = result.output.data else {
            panic!("expected stream output");
        };
        // Drain the stream so the terminal metrics get published.
        while stream.next().await.is_some() {}

        assert!(terminal_metrics.is_ready());
        assert_eq!(
            terminal_metrics.region_watermark_map(),
            Some(HashMap::from([(42_u64, 99_u64)]))
        );
    }

    /// Flow extensions reach the handler's query context; without snapshots no watermark
    /// map is produced.
    #[tokio::test]
    async fn test_query_with_terminal_metrics_forwards_flow_extensions_in_standalone_mode() {
        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(ExtensionAwareHandler);
        let client =
            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
        let mut peer_desc = None;

        let result = client
            .query_with_terminal_metrics(
                "greptime",
                "public",
                QueryRequest {
                    query: Some(Query::Sql("insert into t select 1".to_string())),
                },
                &[("flow.return_region_seq", "true")],
                &mut peer_desc,
            )
            .await
            .unwrap();
        assert!(matches!(peer_desc, Some(PeerDesc::Standalone)));

        assert!(result.metrics.is_ready());
        assert!(result.region_watermark_map().is_none());
    }

    /// Snapshots recorded on the query context are surfaced as region watermarks.
    #[tokio::test]
    async fn test_query_with_terminal_metrics_uses_standalone_snapshot_bounds() {
        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(SnapshotBindingHandler);
        let client =
            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
        let mut peer_desc = None;

        let result = client
            .query_with_terminal_metrics(
                "greptime",
                "public",
                QueryRequest {
                    query: Some(Query::Sql("insert into t select * from src".to_string())),
                },
                &[("flow.return_region_seq", "true")],
                &mut peer_desc,
            )
            .await
            .unwrap();
        assert!(matches!(peer_desc, Some(PeerDesc::Standalone)));

        assert!(result.metrics.is_ready());
        assert_eq!(
            result.region_watermark_map(),
            Some(HashMap::from([(42, 99)]))
        );
    }

    /// An unparsable flow extension value is rejected with an error.
    #[tokio::test]
    async fn test_query_with_terminal_metrics_rejects_invalid_flow_extensions() {
        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        let client =
            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
        let mut peer_desc = None;

        let err = client
            .query_with_terminal_metrics(
                "greptime",
                "public",
                QueryRequest {
                    query: Some(Query::Sql("select 1".to_string())),
                },
                &[("flow.return_region_seq", "not-a-bool")],
                &mut peer_desc,
            )
            .await
            .unwrap_err();

        assert!(format!("{err:?}").contains("Invalid value for flow.return_region_seq"));
    }

    /// An `Unauthenticated` probe response is returned as a hard error, not a soft failure.
    #[tokio::test]
    async fn test_check_all_frontends_without_auth_fails_fast_on_unauthenticated_frontend() {
        let (addr, server) = start_flight_server(RejectUnauthenticatedFlight).await;
        let client = FrontendClient::from_meta_client(
            Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
            QueryOptions::default(),
            BatchingModeOptions::default(),
        )
        .unwrap();

        let err = client
            .check_all_frontends_without_auth(&[Peer {
                id: 1,
                addr: addr.clone(),
            }])
            .await
            .unwrap_err();
        server.abort();

        let Error::InvalidRequest {
            context, source, ..
        } = err
        else {
            panic!("expected InvalidRequest, got {err:?}");
        };
        assert!(context.contains(&addr));
        assert!(context.contains("rejected unauthenticated flownode probe"));
        assert_eq!(source.tonic_code(), Some(tonic::Code::Unauthenticated));
    }

    /// A probe against a slow frontend is cut off by `grpc_conn_timeout` and reported as a
    /// timeout failure.
    #[tokio::test]
    async fn test_check_all_frontends_without_auth_uses_grpc_connection_timeout() {
        let (addr, server) = start_flight_server(SlowFlight).await;
        let client = FrontendClient::from_meta_client(
            Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
            QueryOptions::default(),
            BatchingModeOptions {
                grpc_conn_timeout: Duration::from_millis(50),
                ..Default::default()
            },
        )
        .unwrap();

        let failures = client
            .check_all_frontends_without_auth(&[Peer {
                id: 1,
                addr: addr.clone(),
            }])
            .await
            .unwrap();
        server.abort();

        assert_eq!(failures.len(), 1);
        assert!(failures[0].contains(&addr));
        assert!(failures[0].contains("health check timed out"));
    }

    /// Both probes must be in flight at once: each server answers only after the two-party
    /// barrier is reached.
    #[tokio::test]
    async fn test_check_all_frontends_without_auth_checks_frontends_concurrently() {
        let barrier = Arc::new(tokio::sync::Barrier::new(2));
        let (addr1, server1) = start_flight_server(WaitForConcurrentFlight {
            barrier: barrier.clone(),
        })
        .await;
        let (addr2, server2) = start_flight_server(WaitForConcurrentFlight { barrier }).await;
        let client = FrontendClient::from_meta_client(
            Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
            QueryOptions::default(),
            BatchingModeOptions {
                grpc_conn_timeout: Duration::from_millis(500),
                ..Default::default()
            },
        )
        .unwrap();

        let failures = timeout(
            Duration::from_secs(2),
            client.check_all_frontends_without_auth(&[
                Peer {
                    id: 1,
                    addr: addr1.clone(),
                },
                Peer {
                    id: 2,
                    addr: addr2.clone(),
                },
            ]),
        )
        .await
        .expect("concurrent probes should complete before per-peer timeouts")
        .unwrap();
        server1.abort();
        server2.abort();

        assert_eq!(failures.len(), 2);
        assert!(failures.iter().any(|failure| failure.contains(&addr1)));
        assert!(failures.iter().any(|failure| failure.contains(&addr2)));
        assert!(
            failures
                .iter()
                .all(|failure| !failure.contains("health check timed out")),
            "sequential probes would time out before both requests reach the barrier: {failures:?}"
        );
    }
}