1use std::collections::HashMap;
18use std::sync::{Arc, Mutex, RwLock, Weak};
19
20use api::v1::greptime_request::Request;
21use api::v1::query_request::Query;
22use api::v1::{CreateTableExpr, QueryRequest};
23use client::{Client, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, Database, OutputWithMetrics};
24use common_error::ext::BoxedError;
25use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
26use common_meta::peer::{Peer, PeerDiscovery};
27use common_query::{Output, OutputData};
28use common_telemetry::warn;
29use futures::stream::{FuturesUnordered, StreamExt};
30use meta_client::client::MetaClient;
31use query::datafusion::QUERY_PARALLELISM_HINT;
32use query::metrics::terminal_recordbatch_metrics_from_plan;
33use query::options::{FlowQueryExtensions, QueryOptions};
34use rand::rng;
35use rand::seq::SliceRandom;
36use servers::query_handler::grpc::GrpcQueryHandler;
37use session::context::{QueryContextBuilder, QueryContextRef};
38use session::hints::READ_PREFERENCE_HINT;
39use snafu::{OptionExt, ResultExt};
40use tokio::sync::SetOnce;
41
42use crate::Error;
43use crate::batching_mode::BatchingModeOptions;
44use crate::error::{
45 CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu,
46 NoAvailableFrontendSnafu, UnexpectedSnafu,
47};
48
/// Object-safe query handler whose error type has been erased to [`BoxedError`].
///
/// The standalone [`FrontendClient`] stores its handler as
/// `Weak<dyn GrpcQueryHandlerWithBoxedError>`, so any [`GrpcQueryHandler`]
/// (adapted by the blanket impl below) can be plugged in regardless of its
/// concrete error type.
#[async_trait::async_trait]
pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
    /// Executes `query` under the session context `ctx`.
    async fn do_query(
        &self,
        query: Request,
        ctx: QueryContextRef,
    ) -> std::result::Result<Output, BoxedError>;
}
60
61#[async_trait::async_trait]
63impl<T: GrpcQueryHandler + Send + Sync + 'static> GrpcQueryHandlerWithBoxedError for T {
64 async fn do_query(
65 &self,
66 query: Request,
67 ctx: QueryContextRef,
68 ) -> std::result::Result<Output, BoxedError> {
69 self.do_query(query, ctx).await.map_err(BoxedError::new)
70 }
71}
72
/// Shared, late-bound slot for the standalone frontend's query handler.
///
/// Clones share the same handler slot and the same initialization flag.
#[derive(Debug, Clone)]
pub struct HandlerMutable {
    /// The installed handler, if any. Stored as `Weak`, so this slot does not
    /// keep the handler alive (NOTE(review): presumably to avoid a reference
    /// cycle with the frontend instance — confirm).
    handler: Arc<Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>,
    /// Set once a handler is available; awaited by `FrontendClient::wait_initialized`.
    is_initialized: Arc<SetOnce<()>>,
}
78
79impl HandlerMutable {
80 pub async fn set_handler(&self, handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) {
81 *self.handler.lock().unwrap() = Some(handler);
82 let _ = self.is_initialized.set(());
84 }
85}
86
/// Client used by this crate's batching mode to send queries to a frontend.
#[derive(Debug, Clone)]
pub enum FrontendClient {
    /// Sends requests over gRPC to frontends discovered through metasrv.
    Distributed {
        /// Used to list the currently active frontends.
        meta_client: Arc<MetaClient>,
        /// gRPC channel manager configured with the connect/query timeouts and TLS.
        chnl_mgr: ChannelManager,
        /// Query options; `parallelism` is forwarded as a query hint.
        query: QueryOptions,
        /// Timeouts, retry count, read preference and frontend TLS settings.
        batch_opts: BatchingModeOptions,
    },
    /// Calls an in-process frontend's query handler directly.
    Standalone {
        /// Late-bound slot holding a weak reference to the frontend's handler.
        database_client: HandlerMutable,
        /// Query options; `parallelism` is forwarded as a query-context extension.
        query: QueryOptions,
    },
}
105
106impl FrontendClient {
107 pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
109 let is_initialized = Arc::new(SetOnce::new());
110 let handler = HandlerMutable {
111 handler: Arc::new(Mutex::new(None)),
112 is_initialized,
113 };
114 (
115 Self::Standalone {
116 database_client: handler.clone(),
117 query,
118 },
119 handler,
120 )
121 }
122
123 pub async fn wait_initialized(&self) {
125 if let FrontendClient::Standalone {
126 database_client, ..
127 } = self
128 {
129 database_client.is_initialized.wait().await;
130 }
131 }
132
133 pub fn from_meta_client(
134 meta_client: Arc<MetaClient>,
135 query: QueryOptions,
136 batch_opts: BatchingModeOptions,
137 ) -> Result<Self, Error> {
138 common_telemetry::info!("Frontend client build without auth");
139 Ok(Self::Distributed {
140 meta_client,
141 chnl_mgr: {
142 let cfg = ChannelConfig::new()
143 .connect_timeout(batch_opts.grpc_conn_timeout)
144 .timeout(Some(batch_opts.query_timeout));
145
146 let tls_config = load_client_tls_config(batch_opts.frontend_tls.clone())
147 .context(InvalidClientConfigSnafu)?;
148 ChannelManager::with_config(cfg, tls_config)
149 },
150 query,
151 batch_opts,
152 })
153 }
154
155 pub fn from_grpc_handler(
156 grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
157 query: QueryOptions,
158 ) -> Self {
159 let is_initialized = Arc::new(SetOnce::new_with(Some(())));
160 let handler = HandlerMutable {
161 handler: Arc::new(Mutex::new(Some(grpc_handler))),
162 is_initialized: is_initialized.clone(),
163 };
164
165 Self::Standalone {
166 database_client: handler,
167 query,
168 }
169 }
170}
171
/// A database client bound to one specific frontend, paired with that frontend's peer info.
#[derive(Debug, Clone)]
pub struct DatabaseWithPeer {
    /// Client whose connection targets only `peer`'s address.
    pub database: Database,
    /// The frontend this client talks to.
    pub peer: Peer,
}
177
178impl DatabaseWithPeer {
179 fn new(database: Database, peer: Peer) -> Self {
180 Self { database, peer }
181 }
182
183 async fn try_select_one(&self) -> Result<(), Error> {
185 let _ = self
187 .database
188 .sql("SELECT 1")
189 .await
190 .with_context(|_| InvalidRequestSnafu {
191 context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
192 })?;
193 Ok(())
194 }
195}
196
197impl FrontendClient {
198 pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<Peer>, Error> {
200 let Self::Distributed { meta_client, .. } = self else {
201 return Ok(vec![]);
202 };
203
204 meta_client
205 .active_frontends()
206 .await
207 .map(|nodes| nodes.into_iter().map(|node| node.peer).collect())
208 .map_err(BoxedError::new)
209 .context(ExternalSnafu)
210 }
211
212 pub(crate) async fn check_all_frontends_without_auth(
218 &self,
219 frontends: &[Peer],
220 ) -> Result<Vec<String>, Error> {
221 let Self::Distributed {
222 chnl_mgr,
223 batch_opts,
224 ..
225 } = self
226 else {
227 return Ok(vec![]);
228 };
229
230 let probe_timeout = batch_opts.grpc_conn_timeout;
231 let mut probes = frontends
232 .iter()
233 .map(|peer| {
234 let addr = peer.addr.clone();
235 let chnl_mgr = chnl_mgr.clone();
236
237 async move {
238 let client = Client::with_manager_and_urls(chnl_mgr, vec![addr.clone()]);
239 let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
240
241 match tokio::time::timeout(probe_timeout, database.sql("SELECT 1")).await {
242 Ok(Ok(_)) => Ok(None),
243 Ok(Err(err)) if err.tonic_code() == Some(tonic::Code::Unauthenticated) => {
244 Err(err).context(InvalidRequestSnafu {
245 context: format!(
246 "Frontend {addr} rejected unauthenticated flownode probe; ensure frontend internal_grpc is advertised to metasrv"
247 ),
248 })
249 }
250 Ok(Err(err)) => Ok(Some(format!("{addr}: {err}"))),
251 Err(_) => Ok(Some(format!(
252 "{addr}: health check timed out after {probe_timeout:?}"
253 ))),
254 }
255 }
256 })
257 .collect::<FuturesUnordered<_>>();
258
259 let mut failures = Vec::new();
260 while let Some(probe_result) = probes.next().await {
261 if let Some(failure) = probe_result? {
262 failures.push(failure);
263 }
264 }
265
266 Ok(failures)
267 }
268
269 async fn get_random_active_frontend(
271 &self,
272 catalog: &str,
273 schema: &str,
274 ) -> Result<DatabaseWithPeer, Error> {
275 let Self::Distributed {
276 meta_client: _,
277 chnl_mgr,
278 query: _,
279 batch_opts,
280 } = self
281 else {
282 return UnexpectedSnafu {
283 reason: "Expect distributed mode",
284 }
285 .fail();
286 };
287
288 let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
289 interval.tick().await;
290 for retry in 0..batch_opts.experimental_grpc_max_retries {
291 let mut frontends = self.scan_for_frontend().await?;
292 frontends.shuffle(&mut rng());
294
295 for peer in frontends {
296 let addr = peer.addr.clone();
297 let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
298 let database = Database::new(catalog, schema, client);
299 let db = DatabaseWithPeer::new(database, peer);
300 match db.try_select_one().await {
301 Ok(_) => return Ok(db),
302 Err(e) => {
303 warn!(
304 "Failed to connect to frontend {} on retry={}: \n{e:?}",
305 addr, retry
306 );
307 }
308 }
309 }
310 interval.tick().await;
313 }
314
315 NoAvailableFrontendSnafu {
316 timeout: batch_opts.grpc_conn_timeout,
317 context: "No available frontend found that is able to process query",
318 }
319 .fail()
320 }
321
322 pub async fn create(
323 &self,
324 create: CreateTableExpr,
325 catalog: &str,
326 schema: &str,
327 ) -> Result<u32, Error> {
328 self.handle(
329 Request::Ddl(api::v1::DdlRequest {
330 expr: Some(api::v1::ddl_request::Expr::CreateTable(create.clone())),
331 }),
332 catalog,
333 schema,
334 &mut None,
335 )
336 .await
337 .map_err(BoxedError::new)
338 .with_context(|_| CreateSinkTableSnafu {
339 create: create.clone(),
340 })
341 }
342
343 pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
345 match self {
346 FrontendClient::Distributed { .. } => {
347 let db = self.get_random_active_frontend(catalog, schema).await?;
348 db.database
349 .sql(sql)
350 .await
351 .map_err(BoxedError::new)
352 .context(ExternalSnafu)
353 }
354 FrontendClient::Standalone {
355 database_client, ..
356 } => {
357 let ctx = QueryContextBuilder::default()
358 .current_catalog(catalog.to_string())
359 .current_schema(schema.to_string())
360 .build();
361 let ctx = Arc::new(ctx);
362 {
363 let database_client = {
364 database_client
365 .handler
366 .lock()
367 .unwrap()
368 .as_ref()
369 .context(UnexpectedSnafu {
370 reason: "Standalone's frontend instance is not set",
371 })?
372 .upgrade()
373 .context(UnexpectedSnafu {
374 reason: "Failed to upgrade database client",
375 })?
376 };
377 let req = Request::Query(QueryRequest {
378 query: Some(Query::Sql(sql.to_string())),
379 });
380 database_client
381 .do_query(req, ctx)
382 .await
383 .map_err(BoxedError::new)
384 .context(ExternalSnafu)
385 }
386 }
387 }
388 }
389
390 pub(crate) async fn query_with_terminal_metrics(
393 &self,
394 catalog: &str,
395 schema: &str,
396 request: QueryRequest,
397 extensions: &[(&str, &str)],
398 snapshot_seqs: &HashMap<u64, u64>,
399 peer_desc: &mut Option<PeerDesc>,
400 ) -> Result<OutputWithMetrics, Error> {
401 let flow_extensions = build_flow_extensions(extensions)?;
402 match self {
403 FrontendClient::Distributed {
404 query, batch_opts, ..
405 } => {
406 let query_parallelism = query.parallelism.to_string();
407 let hints = vec![
408 (QUERY_PARALLELISM_HINT, query_parallelism.as_str()),
409 (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
410 ];
411 let db = self.get_random_active_frontend(catalog, schema).await?;
412 *peer_desc = Some(PeerDesc::Dist {
413 peer: db.peer.clone(),
414 });
415 db.database
416 .query_with_terminal_metrics_and_flow_extensions(
417 request,
418 &hints,
419 extensions,
420 snapshot_seqs,
421 )
422 .await
423 .map_err(BoxedError::new)
424 .context(ExternalSnafu)
425 }
426 FrontendClient::Standalone {
427 database_client,
428 query,
429 } => {
430 *peer_desc = Some(PeerDesc::Standalone);
431 let mut extensions_map = HashMap::from([(
432 QUERY_PARALLELISM_HINT.to_string(),
433 query.parallelism.to_string(),
434 )]);
435 for (key, value) in extensions {
436 extensions_map.insert((*key).to_string(), (*value).to_string());
437 }
438 let ctx = QueryContextBuilder::default()
439 .current_catalog(catalog.to_string())
440 .current_schema(schema.to_string())
441 .extensions(extensions_map)
442 .snapshot_seqs(Arc::new(RwLock::new(snapshot_seqs.clone())))
443 .build();
444 let ctx = Arc::new(ctx);
445 let database_client = {
446 database_client
447 .handler
448 .lock()
449 .map_err(|e| {
450 UnexpectedSnafu {
451 reason: format!("Failed to lock database client: {e}"),
452 }
453 .build()
454 })?
455 .as_ref()
456 .context(UnexpectedSnafu {
457 reason: "Standalone's frontend instance is not set",
458 })?
459 .upgrade()
460 .context(UnexpectedSnafu {
461 reason: "Failed to upgrade database client",
462 })?
463 };
464 database_client
465 .do_query(Request::Query(request), ctx.clone())
466 .await
467 .map(|output| {
468 wrap_standalone_output_with_terminal_metrics(output, &flow_extensions)
469 })
470 .map_err(BoxedError::new)
471 .context(ExternalSnafu)
472 }
473 }
474 }
475
476 pub(crate) async fn handle(
478 &self,
479 req: api::v1::greptime_request::Request,
480 catalog: &str,
481 schema: &str,
482 peer_desc: &mut Option<PeerDesc>,
483 ) -> Result<u32, Error> {
484 match self {
485 FrontendClient::Distributed {
486 query, batch_opts, ..
487 } => {
488 let db = self.get_random_active_frontend(catalog, schema).await?;
489
490 *peer_desc = Some(PeerDesc::Dist {
491 peer: db.peer.clone(),
492 });
493
494 db.database
495 .handle_with_retry(
496 req.clone(),
497 batch_opts.experimental_grpc_max_retries,
498 &[
499 (QUERY_PARALLELISM_HINT, &query.parallelism.to_string()),
500 (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
501 ],
502 )
503 .await
504 .with_context(|_| InvalidRequestSnafu {
505 context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
506 })
507 }
508 FrontendClient::Standalone {
509 database_client,
510 query,
511 } => {
512 let ctx = QueryContextBuilder::default()
513 .current_catalog(catalog.to_string())
514 .current_schema(schema.to_string())
515 .extensions(HashMap::from([(
516 QUERY_PARALLELISM_HINT.to_string(),
517 query.parallelism.to_string(),
518 )]))
519 .build();
520 let ctx = Arc::new(ctx);
521 {
522 let database_client = {
523 database_client
524 .handler
525 .lock()
526 .unwrap()
527 .as_ref()
528 .context(UnexpectedSnafu {
529 reason: "Standalone's frontend instance is not set",
530 })?
531 .upgrade()
532 .context(UnexpectedSnafu {
533 reason: "Failed to upgrade database client",
534 })?
535 };
536 let resp: common_query::Output = database_client
537 .do_query(req, ctx)
538 .await
539 .map_err(BoxedError::new)
540 .context(ExternalSnafu)?;
541 match resp.data {
542 common_query::OutputData::AffectedRows(rows) => {
543 Ok(rows.try_into().map_err(|_| {
544 UnexpectedSnafu {
545 reason: format!("Failed to convert rows to u32: {}", rows),
546 }
547 .build()
548 })?)
549 }
550 _ => UnexpectedSnafu {
551 reason: "Unexpected output data",
552 }
553 .fail(),
554 }
555 }
556 }
557 }
558 }
559}
560
561fn build_flow_extensions(extensions: &[(&str, &str)]) -> Result<FlowQueryExtensions, Error> {
562 let flow_extensions = HashMap::from_iter(
563 extensions
564 .iter()
565 .map(|(key, value)| ((*key).to_string(), (*value).to_string())),
566 );
567 FlowQueryExtensions::parse_flow_extensions(&flow_extensions)
568 .map_err(BoxedError::new)
569 .context(ExternalSnafu)
570 .map(|extensions| extensions.unwrap_or_default())
571}
572
573fn wrap_standalone_output_with_terminal_metrics(
574 output: Output,
575 flow_extensions: &FlowQueryExtensions,
576) -> OutputWithMetrics {
577 let should_collect_region_watermark = flow_extensions.should_collect_region_watermark();
578 let terminal_metrics =
579 if should_collect_region_watermark && !matches!(&output.data, OutputData::Stream(_)) {
580 output
581 .meta
582 .plan
583 .clone()
584 .and_then(terminal_recordbatch_metrics_from_plan)
585 } else {
586 None
587 };
588 let result = OutputWithMetrics::from_output(output);
589 if let Some(metrics) = terminal_metrics {
590 result.metrics.update(Some(metrics));
591 }
592 result
593}
594
/// Describes which frontend served a request; rendered via its `Display` impl.
#[derive(Debug, Default, Clone)]
pub(crate) enum PeerDesc {
    /// The serving frontend is not known (the default).
    #[default]
    Unknown,
    /// A remote frontend selected in distributed mode.
    Dist {
        /// The frontend peer the request was sent to.
        peer: Peer,
    },
    /// The in-process frontend of a standalone deployment.
    Standalone,
}
609
610impl std::fmt::Display for PeerDesc {
611 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
612 match self {
613 PeerDesc::Unknown => write!(f, "unknown"),
614 PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
615 PeerDesc::Standalone => write!(f, "standalone"),
616 }
617 }
618}
619
#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::Duration;

    use arrow_flight::flight_service_server::FlightServiceServer;
    use arrow_flight::{FlightData, Ticket};
    use common_query::{Output, OutputData};
    use common_recordbatch::adapter::RecordBatchMetrics;
    use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream};
    use datatypes::prelude::{ConcreteDataType, VectorRef};
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::Int32Vector;
    use futures::StreamExt;
    use servers::grpc::flight::{FlightCraft, FlightCraftWrapper, TonicStream};
    use tokio::net::TcpListener;
    use tokio::task::JoinHandle;
    use tokio::time::timeout;
    use tokio_stream::wrappers::TcpListenerStream;
    use tonic::{Request as TonicRequest, Response as TonicResponse, Status};

    use super::*;

    /// Handler that accepts every request and reports zero affected rows.
    #[derive(Debug)]
    struct NoopHandler;

    /// Single-batch stream used to emulate a query that reports metrics.
    ///
    /// With `terminal_metrics_only` set, `metrics()` returns `None` until the
    /// batch has been taken, i.e. metrics only appear once the stream is drained.
    struct MockMetricsStream {
        schema: datatypes::schema::SchemaRef,
        batch: Option<RecordBatch>,
        metrics: RecordBatchMetrics,
        terminal_metrics_only: bool,
    }

    impl futures::Stream for MockMetricsStream {
        type Item = common_recordbatch::error::Result<RecordBatch>;

        // Yields the single batch once, then ends.
        fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            Poll::Ready(self.batch.take().map(Ok))
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            (
                usize::from(self.batch.is_some()),
                Some(usize::from(self.batch.is_some())),
            )
        }
    }

    impl RecordBatchStream for MockMetricsStream {
        fn name(&self) -> &str {
            "MockMetricsStream"
        }

        fn schema(&self) -> datatypes::schema::SchemaRef {
            self.schema.clone()
        }

        fn output_ordering(&self) -> Option<&[OrderOption]> {
            None
        }

        fn metrics(&self) -> Option<RecordBatchMetrics> {
            // Hide metrics while the batch is still pending to model
            // metrics that only become available at end of stream.
            if self.terminal_metrics_only && self.batch.is_some() {
                return None;
            }
            Some(self.metrics.clone())
        }
    }

    /// Handler returning a `MockMetricsStream` that reports region 42 at watermark 99.
    #[derive(Debug)]
    struct MetricsHandler;

    /// Handler asserting that the `flow.return_region_seq` extension was forwarded.
    #[derive(Debug)]
    struct ExtensionAwareHandler;

    /// Handler asserting forwarded extensions and snapshot sequences, then
    /// recording an extra snapshot on the context.
    #[derive(Debug)]
    struct SnapshotBindingHandler;

    /// Flight service that rejects every request as unauthenticated.
    #[derive(Debug)]
    struct RejectUnauthenticatedFlight;

    /// Flight service that stalls for 60s before failing, to trigger probe timeouts.
    #[derive(Debug)]
    struct SlowFlight;

    /// Flight service that only responds once every server sharing the
    /// barrier has received a request, proving probes run concurrently.
    struct WaitForConcurrentFlight {
        barrier: Arc<tokio::sync::Barrier>,
    }

    #[async_trait::async_trait]
    impl GrpcQueryHandlerWithBoxedError for NoopHandler {
        async fn do_query(
            &self,
            _query: Request,
            _ctx: QueryContextRef,
        ) -> std::result::Result<Output, BoxedError> {
            Ok(Output::new_with_affected_rows(0))
        }
    }

    #[async_trait::async_trait]
    impl GrpcQueryHandlerWithBoxedError for MetricsHandler {
        async fn do_query(
            &self,
            _query: Request,
            _ctx: QueryContextRef,
        ) -> std::result::Result<Output, BoxedError> {
            let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
                "v",
                ConcreteDataType::int32_datatype(),
                false,
            )]));
            let batch = RecordBatch::new(
                schema.clone(),
                vec![Arc::new(Int32Vector::from_slice([1, 2])) as VectorRef],
            )
            .unwrap();
            Ok(Output::new_with_stream(Box::pin(MockMetricsStream {
                schema,
                batch: Some(batch),
                metrics: RecordBatchMetrics {
                    region_watermarks: vec![common_recordbatch::adapter::RegionWatermarkEntry {
                        region_id: 42,
                        watermark: Some(99),
                    }],
                    ..Default::default()
                },
                terminal_metrics_only: true,
            })))
        }
    }

    #[async_trait::async_trait]
    impl GrpcQueryHandlerWithBoxedError for ExtensionAwareHandler {
        async fn do_query(
            &self,
            _query: Request,
            ctx: QueryContextRef,
        ) -> std::result::Result<Output, BoxedError> {
            assert_eq!(ctx.extension("flow.return_region_seq"), Some("true"));
            Ok(Output::new_with_affected_rows(1))
        }
    }

    #[async_trait::async_trait]
    impl GrpcQueryHandlerWithBoxedError for SnapshotBindingHandler {
        async fn do_query(
            &self,
            _query: Request,
            ctx: QueryContextRef,
        ) -> std::result::Result<Output, BoxedError> {
            assert_eq!(ctx.extension("flow.return_region_seq"), Some("true"));
            assert_eq!(ctx.get_snapshot(1), Some(10));
            assert_eq!(ctx.get_snapshot(2), Some(20));
            ctx.set_snapshot(42, 99);
            Ok(Output::new_with_affected_rows(1))
        }
    }

    #[async_trait::async_trait]
    impl FlightCraft for RejectUnauthenticatedFlight {
        async fn do_get(
            &self,
            _request: TonicRequest<Ticket>,
        ) -> std::result::Result<TonicResponse<TonicStream<FlightData>>, Status> {
            Err(Status::unauthenticated("auth failed"))
        }
    }

    #[async_trait::async_trait]
    impl FlightCraft for SlowFlight {
        async fn do_get(
            &self,
            _request: TonicRequest<Ticket>,
        ) -> std::result::Result<TonicResponse<TonicStream<FlightData>>, Status> {
            tokio::time::sleep(Duration::from_secs(60)).await;
            Err(Status::unavailable("slow response"))
        }
    }

    #[async_trait::async_trait]
    impl FlightCraft for WaitForConcurrentFlight {
        async fn do_get(
            &self,
            _request: TonicRequest<Ticket>,
        ) -> std::result::Result<TonicResponse<TonicStream<FlightData>>, Status> {
            self.barrier.wait().await;
            Err(Status::unavailable("probe started concurrently"))
        }
    }

    /// Serves `handler` as an Arrow Flight service on an ephemeral localhost
    /// port; returns the bound address and the server task (abort it when done).
    async fn start_flight_server<T: FlightCraft>(handler: T) -> (String, JoinHandle<()>) {
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind test flight server");
        let addr = listener.local_addr().expect("local addr").to_string();
        let server = tokio::spawn(async move {
            tonic::transport::Server::builder()
                .add_service(FlightServiceServer::new(FlightCraftWrapper(handler)))
                .serve_with_incoming(TcpListenerStream::new(listener))
                .await
                .expect("serve test flight server");
        });

        (addr, server)
    }

    /// `wait_initialized` blocks until a handler is set for an empty
    /// standalone client, and returns immediately for a pre-initialized
    /// standalone client and for a distributed client.
    #[tokio::test]
    async fn wait_initialized() {
        let (client, handler_mut) =
            FrontendClient::from_empty_grpc_handler(QueryOptions::default());

        assert!(
            timeout(Duration::from_millis(50), client.wait_initialized())
                .await
                .is_err()
        );

        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        handler_mut.set_handler(Arc::downgrade(&handler)).await;

        timeout(Duration::from_secs(1), client.wait_initialized())
            .await
            .expect("wait_initialized should complete after handler is set");

        timeout(Duration::from_millis(10), client.wait_initialized())
            .await
            .expect("wait_initialized should be a no-op once initialized");

        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        let client =
            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
        assert!(
            timeout(Duration::from_millis(10), client.wait_initialized())
                .await
                .is_ok()
        );

        let meta_client = Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend));
        let client = FrontendClient::from_meta_client(
            meta_client,
            QueryOptions::default(),
            BatchingModeOptions::default(),
        )
        .unwrap();
        assert!(
            timeout(Duration::from_millis(10), client.wait_initialized())
                .await
                .is_ok()
        );
    }

    /// For stream output in standalone mode, terminal metrics are not ready
    /// until the stream is drained, after which region 42 reports watermark 99.
    #[tokio::test]
    async fn test_query_with_terminal_metrics_tracks_watermark_in_standalone_mode() {
        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(MetricsHandler);
        let client =
            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
        let mut peer_desc = None;

        let result = client
            .query_with_terminal_metrics(
                "greptime",
                "public",
                QueryRequest {
                    query: Some(Query::Sql("select 1".to_string())),
                },
                &[],
                &HashMap::new(),
                &mut peer_desc,
            )
            .await
            .unwrap();
        assert!(matches!(peer_desc, Some(PeerDesc::Standalone)));

        let terminal_metrics = result.metrics.clone();
        assert!(!result.metrics.is_ready());
        assert!(terminal_metrics.get().is_none());

        let OutputData::Stream(mut stream) = result.output.data else {
            panic!("expected stream output");
        };
        while stream.next().await.is_some() {}

        assert!(terminal_metrics.is_ready());
        assert_eq!(
            terminal_metrics.region_watermark_map(),
            Some(HashMap::from([(42_u64, 99_u64)]))
        );
    }

    /// Flow extensions reach the standalone handler's query context; for a
    /// non-stream output the metrics are ready immediately with no watermark.
    #[tokio::test]
    async fn test_query_with_terminal_metrics_forwards_flow_extensions_in_standalone_mode() {
        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(ExtensionAwareHandler);
        let client =
            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
        let mut peer_desc = None;

        let result = client
            .query_with_terminal_metrics(
                "greptime",
                "public",
                QueryRequest {
                    query: Some(Query::Sql("insert into t select 1".to_string())),
                },
                &[("flow.return_region_seq", "true")],
                &HashMap::new(),
                &mut peer_desc,
            )
            .await
            .unwrap();
        assert!(matches!(peer_desc, Some(PeerDesc::Standalone)));

        assert!(result.metrics.is_ready());
        assert!(result.region_watermark_map().is_none());
    }

    /// Snapshot sequences are bound onto the standalone query context; a
    /// snapshot the handler records does not surface as a region watermark.
    #[tokio::test]
    async fn test_query_with_terminal_metrics_uses_standalone_snapshot_bounds() {
        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(SnapshotBindingHandler);
        let client =
            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
        let mut peer_desc = None;

        let result = client
            .query_with_terminal_metrics(
                "greptime",
                "public",
                QueryRequest {
                    query: Some(Query::Sql("insert into t select * from src".to_string())),
                },
                &[("flow.return_region_seq", "true")],
                &HashMap::from([(1, 10), (2, 20)]),
                &mut peer_desc,
            )
            .await
            .unwrap();
        assert!(matches!(peer_desc, Some(PeerDesc::Standalone)));

        assert!(result.metrics.is_ready());
        assert_eq!(result.region_watermark_map(), None);
    }

    /// A malformed flow extension value is rejected with a descriptive error.
    #[tokio::test]
    async fn test_query_with_terminal_metrics_rejects_invalid_flow_extensions() {
        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        let client =
            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
        let mut peer_desc = None;

        let err = client
            .query_with_terminal_metrics(
                "greptime",
                "public",
                QueryRequest {
                    query: Some(Query::Sql("select 1".to_string())),
                },
                &[("flow.return_region_seq", "not-a-bool")],
                &HashMap::new(),
                &mut peer_desc,
            )
            .await
            .unwrap_err();

        assert!(format!("{err:?}").contains("Invalid value for flow.return_region_seq"));
    }

    /// An `Unauthenticated` frontend aborts the probe with an `InvalidRequest`
    /// error naming the address and keeping the tonic code as its source.
    #[tokio::test]
    async fn test_check_all_frontends_without_auth_fails_fast_on_unauthenticated_frontend() {
        let (addr, server) = start_flight_server(RejectUnauthenticatedFlight).await;
        let client = FrontendClient::from_meta_client(
            Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
            QueryOptions::default(),
            BatchingModeOptions::default(),
        )
        .unwrap();

        let err = client
            .check_all_frontends_without_auth(&[Peer {
                id: 1,
                addr: addr.clone(),
            }])
            .await
            .unwrap_err();
        server.abort();

        let Error::InvalidRequest {
            context, source, ..
        } = err
        else {
            panic!("expected InvalidRequest, got {err:?}");
        };
        assert!(context.contains(&addr));
        assert!(context.contains("rejected unauthenticated flownode probe"));
        assert_eq!(source.tonic_code(), Some(tonic::Code::Unauthenticated));
    }

    /// A frontend slower than `grpc_conn_timeout` is reported as a timed-out
    /// failure entry rather than an error.
    #[tokio::test]
    async fn test_check_all_frontends_without_auth_uses_grpc_connection_timeout() {
        let (addr, server) = start_flight_server(SlowFlight).await;
        let client = FrontendClient::from_meta_client(
            Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
            QueryOptions::default(),
            BatchingModeOptions {
                grpc_conn_timeout: Duration::from_millis(50),
                ..Default::default()
            },
        )
        .unwrap();

        let failures = client
            .check_all_frontends_without_auth(&[Peer {
                id: 1,
                addr: addr.clone(),
            }])
            .await
            .unwrap();
        server.abort();

        assert_eq!(failures.len(), 1);
        assert!(failures[0].contains(&addr));
        assert!(failures[0].contains("health check timed out"));
    }

    /// Two servers share a two-party barrier, so each responds only after
    /// both have been probed. Sequential probing would time out instead.
    #[tokio::test]
    async fn test_check_all_frontends_without_auth_checks_frontends_concurrently() {
        let barrier = Arc::new(tokio::sync::Barrier::new(2));
        let (addr1, server1) = start_flight_server(WaitForConcurrentFlight {
            barrier: barrier.clone(),
        })
        .await;
        let (addr2, server2) = start_flight_server(WaitForConcurrentFlight { barrier }).await;
        let client = FrontendClient::from_meta_client(
            Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
            QueryOptions::default(),
            BatchingModeOptions {
                grpc_conn_timeout: Duration::from_millis(500),
                ..Default::default()
            },
        )
        .unwrap();

        let failures = timeout(
            Duration::from_secs(2),
            client.check_all_frontends_without_auth(&[
                Peer {
                    id: 1,
                    addr: addr1.clone(),
                },
                Peer {
                    id: 2,
                    addr: addr2.clone(),
                },
            ]),
        )
        .await
        .expect("concurrent probes should complete before per-peer timeouts")
        .unwrap();
        server1.abort();
        server2.abort();

        assert_eq!(failures.len(), 2);
        assert!(failures.iter().any(|failure| failure.contains(&addr1)));
        assert!(failures.iter().any(|failure| failure.contains(&addr2)));
        assert!(
            failures
                .iter()
                .all(|failure| !failure.contains("health check timed out")),
            "sequential probes would time out before both requests reach the barrier: {failures:?}"
        );
    }
}