Skip to main content

flow/batching_mode/
frontend_client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Frontend client used to run a flow as a batching task: a time-window-aware normal query
//! that is triggered on every tick set by the user.
16
17use std::collections::HashMap;
18use std::sync::{Arc, Mutex, Weak};
19
20use api::v1::greptime_request::Request;
21use api::v1::query_request::Query;
22use api::v1::{CreateTableExpr, QueryRequest};
23use client::{Client, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, Database, OutputWithMetrics};
24use common_error::ext::BoxedError;
25use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
26use common_meta::peer::{Peer, PeerDiscovery};
27use common_query::{Output, OutputData};
28use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
29use common_telemetry::warn;
30use futures::stream::{FuturesUnordered, StreamExt};
31use meta_client::client::MetaClient;
32use query::datafusion::QUERY_PARALLELISM_HINT;
33use query::metrics::terminal_recordbatch_metrics_from_plan;
34use query::options::{FlowQueryExtensions, QueryOptions};
35use rand::rng;
36use rand::seq::SliceRandom;
37use servers::query_handler::grpc::GrpcQueryHandler;
38use session::context::{QueryContextBuilder, QueryContextRef};
39use session::hints::READ_PREFERENCE_HINT;
40use snafu::{OptionExt, ResultExt};
41use tokio::sync::SetOnce;
42
43use crate::Error;
44use crate::batching_mode::BatchingModeOptions;
45use crate::error::{
46    CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu,
47    NoAvailableFrontendSnafu, UnexpectedSnafu,
48};
49
50/// Adapter trait for [`GrpcQueryHandler`] that boxes the underlying error into [`BoxedError`].
51///
52/// This is mainly used by flownode to invoke a frontend instance in standalone mode.
53#[async_trait::async_trait]
54pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
55    async fn do_query(
56        &self,
57        query: Request,
58        ctx: QueryContextRef,
59    ) -> std::result::Result<Output, BoxedError>;
60}
61
62/// auto impl
63#[async_trait::async_trait]
64impl<T: GrpcQueryHandler + Send + Sync + 'static> GrpcQueryHandlerWithBoxedError for T {
65    async fn do_query(
66        &self,
67        query: Request,
68        ctx: QueryContextRef,
69    ) -> std::result::Result<Output, BoxedError> {
70        self.do_query(query, ctx).await.map_err(BoxedError::new)
71    }
72}
73
74#[derive(Debug, Clone)]
75pub struct HandlerMutable {
76    handler: Arc<Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>,
77    is_initialized: Arc<SetOnce<()>>,
78}
79
80impl HandlerMutable {
81    pub async fn set_handler(&self, handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) {
82        *self.handler.lock().unwrap() = Some(handler);
83        // Ignore the error, as we allow the handler to be set multiple times.
84        let _ = self.is_initialized.set(());
85    }
86}
87
/// A simple frontend client able to execute SQL and gRPC requests on a frontend.
///
/// It is used for computation-heavy queries that should be offloaded to a frontend,
/// lifting the load from the flownode.
#[derive(Debug, Clone)]
pub enum FrontendClient {
    /// Distributed mode: active frontends are listed through the metasrv and reached over gRPC.
    Distributed {
        /// Metasrv client used to list the active frontends.
        meta_client: Arc<MetaClient>,
        /// gRPC channel manager used to connect to frontends.
        chnl_mgr: ChannelManager,
        /// Query options; `parallelism` is forwarded to the frontend as a query hint.
        query: QueryOptions,
        /// Batching-mode options (timeouts, retry count, TLS config, read preference).
        batch_opts: BatchingModeOptions,
    },
    /// Standalone mode: requests are handed to the in-process frontend's gRPC query handler.
    Standalone {
        /// For the sake of simplicity, standalone mode still issues gRPC-style requests.
        ///
        /// The handler may be set lazily (see [`FrontendClient::from_empty_grpc_handler`]),
        /// so this client can be created before the frontend has booted.
        database_client: HandlerMutable,
        /// Query options; `parallelism` is forwarded as a query-context extension.
        query: QueryOptions,
    },
}
106
107impl FrontendClient {
108    /// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
109    pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
110        let is_initialized = Arc::new(SetOnce::new());
111        let handler = HandlerMutable {
112            handler: Arc::new(Mutex::new(None)),
113            is_initialized,
114        };
115        (
116            Self::Standalone {
117                database_client: handler.clone(),
118                query,
119            },
120            handler,
121        )
122    }
123
124    /// Waits until the frontend client is initialized.
125    pub async fn wait_initialized(&self) {
126        if let FrontendClient::Standalone {
127            database_client, ..
128        } = self
129        {
130            database_client.is_initialized.wait().await;
131        }
132    }
133
134    pub fn from_meta_client(
135        meta_client: Arc<MetaClient>,
136        query: QueryOptions,
137        batch_opts: BatchingModeOptions,
138    ) -> Result<Self, Error> {
139        common_telemetry::info!("Frontend client build without auth");
140        Ok(Self::Distributed {
141            meta_client,
142            chnl_mgr: {
143                let cfg = ChannelConfig::new()
144                    .connect_timeout(batch_opts.grpc_conn_timeout)
145                    .timeout(Some(batch_opts.query_timeout));
146
147                let tls_config = load_client_tls_config(batch_opts.frontend_tls.clone())
148                    .context(InvalidClientConfigSnafu)?;
149                ChannelManager::with_config(cfg, tls_config)
150            },
151            query,
152            batch_opts,
153        })
154    }
155
156    pub fn from_grpc_handler(
157        grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
158        query: QueryOptions,
159    ) -> Self {
160        let is_initialized = Arc::new(SetOnce::new_with(Some(())));
161        let handler = HandlerMutable {
162            handler: Arc::new(Mutex::new(Some(grpc_handler))),
163            is_initialized: is_initialized.clone(),
164        };
165
166        Self::Standalone {
167            database_client: handler,
168            query,
169        }
170    }
171}
172
173#[derive(Debug, Clone)]
174pub struct DatabaseWithPeer {
175    pub database: Database,
176    pub peer: Peer,
177}
178
179impl DatabaseWithPeer {
180    fn new(database: Database, peer: Peer) -> Self {
181        Self { database, peer }
182    }
183
184    /// Try sending a "SELECT 1" to the database
185    async fn try_select_one(&self) -> Result<(), Error> {
186        // notice here use `sql` for `SELECT 1` return 1 row
187        let _ = self
188            .database
189            .sql("SELECT 1")
190            .await
191            .with_context(|_| InvalidRequestSnafu {
192                context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
193            })?;
194        Ok(())
195    }
196}
197
198impl FrontendClient {
199    /// scan for available frontend from metadata
200    pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<Peer>, Error> {
201        let Self::Distributed { meta_client, .. } = self else {
202            return Ok(vec![]);
203        };
204
205        meta_client
206            .active_frontends()
207            .await
208            .map(|nodes| nodes.into_iter().map(|node| node.peer).collect())
209            .map_err(BoxedError::new)
210            .context(ExternalSnafu)
211    }
212
213    /// Probes all discovered frontends without auth.
214    ///
215    /// Returns non-auth failures to allow callers to retry transient connectivity
216    /// errors. Authentication failures are returned immediately because they mean
217    /// a frontend advertised an auth-protected endpoint to flownodes.
218    pub(crate) async fn check_all_frontends_without_auth(
219        &self,
220        frontends: &[Peer],
221    ) -> Result<Vec<String>, Error> {
222        let Self::Distributed {
223            chnl_mgr,
224            batch_opts,
225            ..
226        } = self
227        else {
228            return Ok(vec![]);
229        };
230
231        let probe_timeout = batch_opts.grpc_conn_timeout;
232        let mut probes = frontends
233            .iter()
234            .map(|peer| {
235                let addr = peer.addr.clone();
236                let chnl_mgr = chnl_mgr.clone();
237
238                async move {
239                    let client = Client::with_manager_and_urls(chnl_mgr, vec![addr.clone()]);
240                    let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
241
242                    match tokio::time::timeout(probe_timeout, database.sql("SELECT 1")).await {
243                        Ok(Ok(_)) => Ok(None),
244                        Ok(Err(err)) if err.tonic_code() == Some(tonic::Code::Unauthenticated) => {
245                            Err(err).context(InvalidRequestSnafu {
246                                context: format!(
247                                    "Frontend {addr} rejected unauthenticated flownode probe; ensure frontend internal_grpc is advertised to metasrv"
248                                ),
249                            })
250                        }
251                        Ok(Err(err)) => Ok(Some(format!("{addr}: {err}"))),
252                        Err(_) => Ok(Some(format!(
253                            "{addr}: health check timed out after {probe_timeout:?}"
254                        ))),
255                    }
256                }
257            })
258            .collect::<FuturesUnordered<_>>();
259
260        let mut failures = Vec::new();
261        while let Some(probe_result) = probes.next().await {
262            if let Some(failure) = probe_result? {
263                failures.push(failure);
264            }
265        }
266
267        Ok(failures)
268    }
269
270    /// Get a frontend discovered by metasrv and verified with a query probe.
271    async fn get_random_active_frontend(
272        &self,
273        catalog: &str,
274        schema: &str,
275    ) -> Result<DatabaseWithPeer, Error> {
276        let Self::Distributed {
277            meta_client: _,
278            chnl_mgr,
279            query: _,
280            batch_opts,
281        } = self
282        else {
283            return UnexpectedSnafu {
284                reason: "Expect distributed mode",
285            }
286            .fail();
287        };
288
289        let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
290        interval.tick().await;
291        for retry in 0..batch_opts.experimental_grpc_max_retries {
292            let mut frontends = self.scan_for_frontend().await?;
293            // shuffle the frontends to avoid always pick the same one
294            frontends.shuffle(&mut rng());
295
296            for peer in frontends {
297                let addr = peer.addr.clone();
298                let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
299                let database = Database::new(catalog, schema, client);
300                let db = DatabaseWithPeer::new(database, peer);
301                match db.try_select_one().await {
302                    Ok(_) => return Ok(db),
303                    Err(e) => {
304                        warn!(
305                            "Failed to connect to frontend {} on retry={}: \n{e:?}",
306                            addr, retry
307                        );
308                    }
309                }
310            }
311            // no available frontend
312            // sleep and retry
313            interval.tick().await;
314        }
315
316        NoAvailableFrontendSnafu {
317            timeout: batch_opts.grpc_conn_timeout,
318            context: "No available frontend found that is able to process query",
319        }
320        .fail()
321    }
322
323    pub async fn create(
324        &self,
325        create: CreateTableExpr,
326        catalog: &str,
327        schema: &str,
328    ) -> Result<u32, Error> {
329        self.handle(
330            Request::Ddl(api::v1::DdlRequest {
331                expr: Some(api::v1::ddl_request::Expr::CreateTable(create.clone())),
332            }),
333            catalog,
334            schema,
335            &mut None,
336        )
337        .await
338        .map_err(BoxedError::new)
339        .with_context(|_| CreateSinkTableSnafu {
340            create: create.clone(),
341        })
342    }
343
344    /// Execute a SQL statement on the frontend.
345    pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
346        match self {
347            FrontendClient::Distributed { .. } => {
348                let db = self.get_random_active_frontend(catalog, schema).await?;
349                db.database
350                    .sql(sql)
351                    .await
352                    .map_err(BoxedError::new)
353                    .context(ExternalSnafu)
354            }
355            FrontendClient::Standalone {
356                database_client, ..
357            } => {
358                let ctx = QueryContextBuilder::default()
359                    .current_catalog(catalog.to_string())
360                    .current_schema(schema.to_string())
361                    .build();
362                let ctx = Arc::new(ctx);
363                {
364                    let database_client = {
365                        database_client
366                            .handler
367                            .lock()
368                            .unwrap()
369                            .as_ref()
370                            .context(UnexpectedSnafu {
371                                reason: "Standalone's frontend instance is not set",
372                            })?
373                            .upgrade()
374                            .context(UnexpectedSnafu {
375                                reason: "Failed to upgrade database client",
376                            })?
377                    };
378                    let req = Request::Query(QueryRequest {
379                        query: Some(Query::Sql(sql.to_string())),
380                    });
381                    database_client
382                        .do_query(req, ctx)
383                        .await
384                        .map_err(BoxedError::new)
385                        .context(ExternalSnafu)
386                }
387            }
388        }
389    }
390
391    pub(crate) async fn query_with_terminal_metrics(
392        &self,
393        catalog: &str,
394        schema: &str,
395        request: QueryRequest,
396        extensions: &[(&str, &str)],
397        peer_desc: &mut Option<PeerDesc>,
398    ) -> Result<OutputWithMetrics, Error> {
399        let flow_extensions = build_flow_extensions(extensions)?;
400        match self {
401            FrontendClient::Distributed {
402                query, batch_opts, ..
403            } => {
404                let query_parallelism = query.parallelism.to_string();
405                let hints = vec![
406                    (QUERY_PARALLELISM_HINT, query_parallelism.as_str()),
407                    (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
408                ];
409                let db = self.get_random_active_frontend(catalog, schema).await?;
410                *peer_desc = Some(PeerDesc::Dist {
411                    peer: db.peer.clone(),
412                });
413                db.database
414                    .query_with_terminal_metrics_and_flow_extensions(request, &hints, extensions)
415                    .await
416                    .map_err(BoxedError::new)
417                    .context(ExternalSnafu)
418            }
419            FrontendClient::Standalone {
420                database_client,
421                query,
422            } => {
423                *peer_desc = Some(PeerDesc::Standalone);
424                let mut extensions_map = HashMap::from([(
425                    QUERY_PARALLELISM_HINT.to_string(),
426                    query.parallelism.to_string(),
427                )]);
428                for (key, value) in extensions {
429                    extensions_map.insert((*key).to_string(), (*value).to_string());
430                }
431                let ctx = QueryContextBuilder::default()
432                    .current_catalog(catalog.to_string())
433                    .current_schema(schema.to_string())
434                    .extensions(extensions_map)
435                    .build();
436                let ctx = Arc::new(ctx);
437                let database_client = {
438                    database_client
439                        .handler
440                        .lock()
441                        .map_err(|e| {
442                            UnexpectedSnafu {
443                                reason: format!("Failed to lock database client: {e}"),
444                            }
445                            .build()
446                        })?
447                        .as_ref()
448                        .context(UnexpectedSnafu {
449                            reason: "Standalone's frontend instance is not set",
450                        })?
451                        .upgrade()
452                        .context(UnexpectedSnafu {
453                            reason: "Failed to upgrade database client",
454                        })?
455                };
456                database_client
457                    .do_query(Request::Query(request), ctx.clone())
458                    .await
459                    .map(|output| {
460                        wrap_standalone_output_with_terminal_metrics(output, &flow_extensions, &ctx)
461                    })
462                    .map_err(BoxedError::new)
463                    .context(ExternalSnafu)
464            }
465        }
466    }
467
468    /// Handle a request to frontend
469    pub(crate) async fn handle(
470        &self,
471        req: api::v1::greptime_request::Request,
472        catalog: &str,
473        schema: &str,
474        peer_desc: &mut Option<PeerDesc>,
475    ) -> Result<u32, Error> {
476        match self {
477            FrontendClient::Distributed {
478                query, batch_opts, ..
479            } => {
480                let db = self.get_random_active_frontend(catalog, schema).await?;
481
482                *peer_desc = Some(PeerDesc::Dist {
483                    peer: db.peer.clone(),
484                });
485
486                db.database
487                    .handle_with_retry(
488                        req.clone(),
489                        batch_opts.experimental_grpc_max_retries,
490                        &[
491                            (QUERY_PARALLELISM_HINT, &query.parallelism.to_string()),
492                            (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
493                        ],
494                    )
495                    .await
496                    .with_context(|_| InvalidRequestSnafu {
497                        context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
498                    })
499            }
500            FrontendClient::Standalone {
501                database_client,
502                query,
503            } => {
504                let ctx = QueryContextBuilder::default()
505                    .current_catalog(catalog.to_string())
506                    .current_schema(schema.to_string())
507                    .extensions(HashMap::from([(
508                        QUERY_PARALLELISM_HINT.to_string(),
509                        query.parallelism.to_string(),
510                    )]))
511                    .build();
512                let ctx = Arc::new(ctx);
513                {
514                    let database_client = {
515                        database_client
516                            .handler
517                            .lock()
518                            .unwrap()
519                            .as_ref()
520                            .context(UnexpectedSnafu {
521                                reason: "Standalone's frontend instance is not set",
522                            })?
523                            .upgrade()
524                            .context(UnexpectedSnafu {
525                                reason: "Failed to upgrade database client",
526                            })?
527                    };
528                    let resp: common_query::Output = database_client
529                        .do_query(req, ctx)
530                        .await
531                        .map_err(BoxedError::new)
532                        .context(ExternalSnafu)?;
533                    match resp.data {
534                        common_query::OutputData::AffectedRows(rows) => {
535                            Ok(rows.try_into().map_err(|_| {
536                                UnexpectedSnafu {
537                                    reason: format!("Failed to convert rows to u32: {}", rows),
538                                }
539                                .build()
540                            })?)
541                        }
542                        _ => UnexpectedSnafu {
543                            reason: "Unexpected output data",
544                        }
545                        .fail(),
546                    }
547                }
548            }
549        }
550    }
551}
552
553fn build_flow_extensions(extensions: &[(&str, &str)]) -> Result<FlowQueryExtensions, Error> {
554    let flow_extensions = HashMap::from_iter(
555        extensions
556            .iter()
557            .map(|(key, value)| ((*key).to_string(), (*value).to_string())),
558    );
559    FlowQueryExtensions::parse_flow_extensions(&flow_extensions)
560        .map_err(BoxedError::new)
561        .context(ExternalSnafu)
562        .map(|extensions| extensions.unwrap_or_default())
563}
564
565fn wrap_standalone_output_with_terminal_metrics(
566    output: Output,
567    flow_extensions: &FlowQueryExtensions,
568    query_ctx: &QueryContextRef,
569) -> OutputWithMetrics {
570    let should_collect_region_watermark = flow_extensions.should_collect_region_watermark();
571    let terminal_metrics =
572        if should_collect_region_watermark && !matches!(&output.data, OutputData::Stream(_)) {
573            output
574                .meta
575                .plan
576                .clone()
577                .and_then(terminal_recordbatch_metrics_from_plan)
578                .or_else(|| terminal_recordbatch_metrics_from_snapshots(query_ctx))
579        } else {
580            None
581        };
582    let result = OutputWithMetrics::from_output(output);
583    if let Some(metrics) = terminal_metrics {
584        result.metrics.update(Some(metrics));
585    }
586    result
587}
588
589fn terminal_recordbatch_metrics_from_snapshots(
590    query_ctx: &QueryContextRef,
591) -> Option<RecordBatchMetrics> {
592    let mut region_watermarks = query_ctx
593        .snapshots()
594        .into_iter()
595        .map(|(region_id, watermark)| RegionWatermarkEntry {
596            region_id,
597            watermark: Some(watermark),
598        })
599        .collect::<Vec<_>>();
600    if region_watermarks.is_empty() {
601        return None;
602    }
603
604    region_watermarks.sort_by_key(|entry| entry.region_id);
605    Some(RecordBatchMetrics {
606        region_watermarks,
607        ..Default::default()
608    })
609}
610
611/// Describe a peer of frontend
612#[derive(Debug, Default, Clone)]
613pub(crate) enum PeerDesc {
614    /// The query failed before a frontend peer was selected.
615    #[default]
616    Unknown,
617    /// Distributed mode's frontend peer address
618    Dist {
619        /// frontend peer address
620        peer: Peer,
621    },
622    /// Standalone mode
623    Standalone,
624}
625
626impl std::fmt::Display for PeerDesc {
627    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
628        match self {
629            PeerDesc::Unknown => write!(f, "unknown"),
630            PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
631            PeerDesc::Standalone => write!(f, "standalone"),
632        }
633    }
634}
635
636#[cfg(test)]
637mod tests {
638    use std::pin::Pin;
639    use std::task::{Context, Poll};
640    use std::time::Duration;
641
642    use arrow_flight::flight_service_server::FlightServiceServer;
643    use arrow_flight::{FlightData, Ticket};
644    use common_query::{Output, OutputData};
645    use common_recordbatch::adapter::RecordBatchMetrics;
646    use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream};
647    use datatypes::prelude::{ConcreteDataType, VectorRef};
648    use datatypes::schema::{ColumnSchema, Schema};
649    use datatypes::vectors::Int32Vector;
650    use futures::StreamExt;
651    use servers::grpc::flight::{FlightCraft, FlightCraftWrapper, TonicStream};
652    use tokio::net::TcpListener;
653    use tokio::task::JoinHandle;
654    use tokio::time::timeout;
655    use tokio_stream::wrappers::TcpListenerStream;
656    use tonic::{Request as TonicRequest, Response as TonicResponse, Status};
657
658    use super::*;
659
660    #[derive(Debug)]
661    struct NoopHandler;
662
663    struct MockMetricsStream {
664        schema: datatypes::schema::SchemaRef,
665        batch: Option<RecordBatch>,
666        metrics: RecordBatchMetrics,
667        terminal_metrics_only: bool,
668    }
669
670    impl futures::Stream for MockMetricsStream {
671        type Item = common_recordbatch::error::Result<RecordBatch>;
672
673        fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
674            Poll::Ready(self.batch.take().map(Ok))
675        }
676
677        fn size_hint(&self) -> (usize, Option<usize>) {
678            (
679                usize::from(self.batch.is_some()),
680                Some(usize::from(self.batch.is_some())),
681            )
682        }
683    }
684
685    impl RecordBatchStream for MockMetricsStream {
686        fn name(&self) -> &str {
687            "MockMetricsStream"
688        }
689
690        fn schema(&self) -> datatypes::schema::SchemaRef {
691            self.schema.clone()
692        }
693
694        fn output_ordering(&self) -> Option<&[OrderOption]> {
695            None
696        }
697
698        fn metrics(&self) -> Option<RecordBatchMetrics> {
699            if self.terminal_metrics_only && self.batch.is_some() {
700                return None;
701            }
702            Some(self.metrics.clone())
703        }
704    }
705
706    #[derive(Debug)]
707    struct MetricsHandler;
708
709    #[derive(Debug)]
710    struct ExtensionAwareHandler;
711
712    #[derive(Debug)]
713    struct SnapshotBindingHandler;
714
715    #[derive(Debug)]
716    struct RejectUnauthenticatedFlight;
717
718    #[derive(Debug)]
719    struct SlowFlight;
720
721    struct WaitForConcurrentFlight {
722        barrier: Arc<tokio::sync::Barrier>,
723    }
724
725    #[async_trait::async_trait]
726    impl GrpcQueryHandlerWithBoxedError for NoopHandler {
727        async fn do_query(
728            &self,
729            _query: Request,
730            _ctx: QueryContextRef,
731        ) -> std::result::Result<Output, BoxedError> {
732            Ok(Output::new_with_affected_rows(0))
733        }
734    }
735
736    #[async_trait::async_trait]
737    impl GrpcQueryHandlerWithBoxedError for MetricsHandler {
738        async fn do_query(
739            &self,
740            _query: Request,
741            _ctx: QueryContextRef,
742        ) -> std::result::Result<Output, BoxedError> {
743            let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
744                "v",
745                ConcreteDataType::int32_datatype(),
746                false,
747            )]));
748            let batch = RecordBatch::new(
749                schema.clone(),
750                vec![Arc::new(Int32Vector::from_slice([1, 2])) as VectorRef],
751            )
752            .unwrap();
753            Ok(Output::new_with_stream(Box::pin(MockMetricsStream {
754                schema,
755                batch: Some(batch),
756                metrics: RecordBatchMetrics {
757                    region_watermarks: vec![common_recordbatch::adapter::RegionWatermarkEntry {
758                        region_id: 42,
759                        watermark: Some(99),
760                    }],
761                    ..Default::default()
762                },
763                terminal_metrics_only: true,
764            })))
765        }
766    }
767
768    #[async_trait::async_trait]
769    impl GrpcQueryHandlerWithBoxedError for ExtensionAwareHandler {
770        async fn do_query(
771            &self,
772            _query: Request,
773            ctx: QueryContextRef,
774        ) -> std::result::Result<Output, BoxedError> {
775            assert_eq!(ctx.extension("flow.return_region_seq"), Some("true"));
776            Ok(Output::new_with_affected_rows(1))
777        }
778    }
779
780    #[async_trait::async_trait]
781    impl GrpcQueryHandlerWithBoxedError for SnapshotBindingHandler {
782        async fn do_query(
783            &self,
784            _query: Request,
785            ctx: QueryContextRef,
786        ) -> std::result::Result<Output, BoxedError> {
787            assert_eq!(ctx.extension("flow.return_region_seq"), Some("true"));
788            ctx.set_snapshot(42, 99);
789            Ok(Output::new_with_affected_rows(1))
790        }
791    }
792
793    #[async_trait::async_trait]
794    impl FlightCraft for RejectUnauthenticatedFlight {
795        async fn do_get(
796            &self,
797            _request: TonicRequest<Ticket>,
798        ) -> std::result::Result<TonicResponse<TonicStream<FlightData>>, Status> {
799            Err(Status::unauthenticated("auth failed"))
800        }
801    }
802
803    #[async_trait::async_trait]
804    impl FlightCraft for SlowFlight {
805        async fn do_get(
806            &self,
807            _request: TonicRequest<Ticket>,
808        ) -> std::result::Result<TonicResponse<TonicStream<FlightData>>, Status> {
809            tokio::time::sleep(Duration::from_secs(60)).await;
810            Err(Status::unavailable("slow response"))
811        }
812    }
813
814    #[async_trait::async_trait]
815    impl FlightCraft for WaitForConcurrentFlight {
816        async fn do_get(
817            &self,
818            _request: TonicRequest<Ticket>,
819        ) -> std::result::Result<TonicResponse<TonicStream<FlightData>>, Status> {
820            self.barrier.wait().await;
821            Err(Status::unavailable("probe started concurrently"))
822        }
823    }
824
825    async fn start_flight_server<T: FlightCraft>(handler: T) -> (String, JoinHandle<()>) {
826        let listener = TcpListener::bind("127.0.0.1:0")
827            .await
828            .expect("bind test flight server");
829        let addr = listener.local_addr().expect("local addr").to_string();
830        let server = tokio::spawn(async move {
831            tonic::transport::Server::builder()
832                .add_service(FlightServiceServer::new(FlightCraftWrapper(handler)))
833                .serve_with_incoming(TcpListenerStream::new(listener))
834                .await
835                .expect("serve test flight server");
836        });
837
838        (addr, server)
839    }
840
841    #[tokio::test]
842    async fn wait_initialized() {
843        let (client, handler_mut) =
844            FrontendClient::from_empty_grpc_handler(QueryOptions::default());
845
846        assert!(
847            timeout(Duration::from_millis(50), client.wait_initialized())
848                .await
849                .is_err()
850        );
851
852        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
853        handler_mut.set_handler(Arc::downgrade(&handler)).await;
854
855        timeout(Duration::from_secs(1), client.wait_initialized())
856            .await
857            .expect("wait_initialized should complete after handler is set");
858
859        timeout(Duration::from_millis(10), client.wait_initialized())
860            .await
861            .expect("wait_initialized should be a no-op once initialized");
862
863        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
864        let client =
865            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
866        assert!(
867            timeout(Duration::from_millis(10), client.wait_initialized())
868                .await
869                .is_ok()
870        );
871
872        let meta_client = Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend));
873        let client = FrontendClient::from_meta_client(
874            meta_client,
875            QueryOptions::default(),
876            BatchingModeOptions::default(),
877        )
878        .unwrap();
879        assert!(
880            timeout(Duration::from_millis(10), client.wait_initialized())
881                .await
882                .is_ok()
883        );
884    }
885
886    #[tokio::test]
887    async fn test_query_with_terminal_metrics_tracks_watermark_in_standalone_mode() {
888        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(MetricsHandler);
889        let client =
890            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
891        let mut peer_desc = None;
892
893        let result = client
894            .query_with_terminal_metrics(
895                "greptime",
896                "public",
897                QueryRequest {
898                    query: Some(Query::Sql("select 1".to_string())),
899                },
900                &[],
901                &mut peer_desc,
902            )
903            .await
904            .unwrap();
905        assert!(matches!(peer_desc, Some(PeerDesc::Standalone)));
906
907        let terminal_metrics = result.metrics.clone();
908        assert!(!result.metrics.is_ready());
909        assert!(terminal_metrics.get().is_none());
910
911        let OutputData::Stream(mut stream) = result.output.data else {
912            panic!("expected stream output");
913        };
914        while stream.next().await.is_some() {}
915
916        assert!(terminal_metrics.is_ready());
917        assert_eq!(
918            terminal_metrics.region_watermark_map(),
919            Some(HashMap::from([(42_u64, 99_u64)]))
920        );
921    }
922
923    #[tokio::test]
924    async fn test_query_with_terminal_metrics_forwards_flow_extensions_in_standalone_mode() {
925        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(ExtensionAwareHandler);
926        let client =
927            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
928        let mut peer_desc = None;
929
930        let result = client
931            .query_with_terminal_metrics(
932                "greptime",
933                "public",
934                QueryRequest {
935                    query: Some(Query::Sql("insert into t select 1".to_string())),
936                },
937                &[("flow.return_region_seq", "true")],
938                &mut peer_desc,
939            )
940            .await
941            .unwrap();
942        assert!(matches!(peer_desc, Some(PeerDesc::Standalone)));
943
944        assert!(result.metrics.is_ready());
945        assert!(result.region_watermark_map().is_none());
946    }
947
948    #[tokio::test]
949    async fn test_query_with_terminal_metrics_uses_standalone_snapshot_bounds() {
950        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(SnapshotBindingHandler);
951        let client =
952            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
953        let mut peer_desc = None;
954
955        let result = client
956            .query_with_terminal_metrics(
957                "greptime",
958                "public",
959                QueryRequest {
960                    query: Some(Query::Sql("insert into t select * from src".to_string())),
961                },
962                &[("flow.return_region_seq", "true")],
963                &mut peer_desc,
964            )
965            .await
966            .unwrap();
967        assert!(matches!(peer_desc, Some(PeerDesc::Standalone)));
968
969        assert!(result.metrics.is_ready());
970        assert_eq!(
971            result.region_watermark_map(),
972            Some(HashMap::from([(42, 99)]))
973        );
974    }
975
976    #[tokio::test]
977    async fn test_query_with_terminal_metrics_rejects_invalid_flow_extensions() {
978        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
979        let client =
980            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
981        let mut peer_desc = None;
982
983        let err = client
984            .query_with_terminal_metrics(
985                "greptime",
986                "public",
987                QueryRequest {
988                    query: Some(Query::Sql("select 1".to_string())),
989                },
990                &[("flow.return_region_seq", "not-a-bool")],
991                &mut peer_desc,
992            )
993            .await
994            .unwrap_err();
995
996        assert!(format!("{err:?}").contains("Invalid value for flow.return_region_seq"));
997    }
998
999    #[tokio::test]
1000    async fn test_check_all_frontends_without_auth_fails_fast_on_unauthenticated_frontend() {
1001        let (addr, server) = start_flight_server(RejectUnauthenticatedFlight).await;
1002        let client = FrontendClient::from_meta_client(
1003            Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
1004            QueryOptions::default(),
1005            BatchingModeOptions::default(),
1006        )
1007        .unwrap();
1008
1009        let err = client
1010            .check_all_frontends_without_auth(&[Peer {
1011                id: 1,
1012                addr: addr.clone(),
1013            }])
1014            .await
1015            .unwrap_err();
1016        server.abort();
1017
1018        let Error::InvalidRequest {
1019            context, source, ..
1020        } = err
1021        else {
1022            panic!("expected InvalidRequest, got {err:?}");
1023        };
1024        assert!(context.contains(&addr));
1025        assert!(context.contains("rejected unauthenticated flownode probe"));
1026        assert_eq!(source.tonic_code(), Some(tonic::Code::Unauthenticated));
1027    }
1028
1029    #[tokio::test]
1030    async fn test_check_all_frontends_without_auth_uses_grpc_connection_timeout() {
1031        let (addr, server) = start_flight_server(SlowFlight).await;
1032        let client = FrontendClient::from_meta_client(
1033            Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
1034            QueryOptions::default(),
1035            BatchingModeOptions {
1036                grpc_conn_timeout: Duration::from_millis(50),
1037                ..Default::default()
1038            },
1039        )
1040        .unwrap();
1041
1042        let failures = client
1043            .check_all_frontends_without_auth(&[Peer {
1044                id: 1,
1045                addr: addr.clone(),
1046            }])
1047            .await
1048            .unwrap();
1049        server.abort();
1050
1051        assert_eq!(failures.len(), 1);
1052        assert!(failures[0].contains(&addr));
1053        assert!(failures[0].contains("health check timed out"));
1054    }
1055
1056    #[tokio::test]
1057    async fn test_check_all_frontends_without_auth_checks_frontends_concurrently() {
1058        let barrier = Arc::new(tokio::sync::Barrier::new(2));
1059        let (addr1, server1) = start_flight_server(WaitForConcurrentFlight {
1060            barrier: barrier.clone(),
1061        })
1062        .await;
1063        let (addr2, server2) = start_flight_server(WaitForConcurrentFlight { barrier }).await;
1064        let client = FrontendClient::from_meta_client(
1065            Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
1066            QueryOptions::default(),
1067            BatchingModeOptions {
1068                grpc_conn_timeout: Duration::from_millis(500),
1069                ..Default::default()
1070            },
1071        )
1072        .unwrap();
1073
1074        let failures = timeout(
1075            Duration::from_secs(2),
1076            client.check_all_frontends_without_auth(&[
1077                Peer {
1078                    id: 1,
1079                    addr: addr1.clone(),
1080                },
1081                Peer {
1082                    id: 2,
1083                    addr: addr2.clone(),
1084                },
1085            ]),
1086        )
1087        .await
1088        .expect("concurrent probes should complete before per-peer timeouts")
1089        .unwrap();
1090        server1.abort();
1091        server2.abort();
1092
1093        assert_eq!(failures.len(), 2);
1094        assert!(failures.iter().any(|failure| failure.contains(&addr1)));
1095        assert!(failures.iter().any(|failure| failure.contains(&addr2)));
1096        assert!(
1097            failures
1098                .iter()
1099                .all(|failure| !failure.contains("health check timed out")),
1100            "sequential probes would time out before both requests reach the barrier: {failures:?}"
1101        );
1102    }
1103}