Skip to main content

flow/batching_mode/
frontend_client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
16
17use std::collections::HashMap;
18use std::sync::{Arc, Mutex, RwLock, Weak};
19
20use api::v1::greptime_request::Request;
21use api::v1::query_request::Query;
22use api::v1::{CreateTableExpr, QueryRequest};
23use client::{Client, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, Database, OutputWithMetrics};
24use common_error::ext::BoxedError;
25use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
26use common_meta::peer::{Peer, PeerDiscovery};
27use common_query::{Output, OutputData};
28use common_telemetry::warn;
29use futures::stream::{FuturesUnordered, StreamExt};
30use meta_client::client::MetaClient;
31use query::datafusion::QUERY_PARALLELISM_HINT;
32use query::metrics::terminal_recordbatch_metrics_from_plan;
33use query::options::{FlowQueryExtensions, QueryOptions};
34use rand::rng;
35use rand::seq::SliceRandom;
36use servers::query_handler::grpc::GrpcQueryHandler;
37use session::context::{QueryContextBuilder, QueryContextRef};
38use session::hints::READ_PREFERENCE_HINT;
39use snafu::{OptionExt, ResultExt};
40use tokio::sync::SetOnce;
41
42use crate::Error;
43use crate::batching_mode::BatchingModeOptions;
44use crate::error::{
45    CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu,
46    NoAvailableFrontendSnafu, UnexpectedSnafu,
47};
48
/// Adapter trait for [`GrpcQueryHandler`] that boxes the underlying error into [`BoxedError`].
///
/// This is mainly used by flownode to invoke a frontend instance in standalone mode.
#[async_trait::async_trait]
pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
    /// Executes the gRPC request `query` under the query context `ctx`.
    ///
    /// Returns the query [`Output`] on success; any failure is reported as a
    /// type-erased [`BoxedError`].
    async fn do_query(
        &self,
        query: Request,
        ctx: QueryContextRef,
    ) -> std::result::Result<Output, BoxedError>;
}
60
61/// auto impl
62#[async_trait::async_trait]
63impl<T: GrpcQueryHandler + Send + Sync + 'static> GrpcQueryHandlerWithBoxedError for T {
64    async fn do_query(
65        &self,
66        query: Request,
67        ctx: QueryContextRef,
68    ) -> std::result::Result<Output, BoxedError> {
69        self.do_query(query, ctx).await.map_err(BoxedError::new)
70    }
71}
72
/// Shared, late-bound slot holding the standalone frontend's gRPC query handler.
///
/// Both fields live behind an [`Arc`], so clones of this struct observe the
/// same handler and the same initialization flag.
#[derive(Debug, Clone)]
pub struct HandlerMutable {
    /// Weak reference to the handler; `None` while no handler has been set.
    handler: Arc<Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>,
    /// Set once a handler is installed; awaited by `FrontendClient::wait_initialized`.
    is_initialized: Arc<SetOnce<()>>,
}
78
79impl HandlerMutable {
80    pub async fn set_handler(&self, handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) {
81        *self.handler.lock().unwrap() = Some(handler);
82        // Ignore the error, as we allow the handler to be set multiple times.
83        let _ = self.is_initialized.set(());
84    }
85}
86
/// A simple frontend client able to execute SQL using the gRPC protocol.
///
/// This is for computation-heavy queries that need to be offloaded to a frontend,
/// lifting the load from the flownode.
#[derive(Debug, Clone)]
pub enum FrontendClient {
    /// Distributed mode: frontends are discovered through the meta client and
    /// reached through gRPC clients built on `chnl_mgr`.
    Distributed {
        /// Meta client used to look up the active frontends.
        meta_client: Arc<MetaClient>,
        /// Channel manager shared by every client created for a frontend.
        chnl_mgr: ChannelManager,
        /// Query options; `parallelism` is forwarded as a query hint.
        query: QueryOptions,
        /// Batching mode options (connection timeout, retry count, read preference, ...).
        batch_opts: BatchingModeOptions,
    },
    /// Standalone mode: requests are sent to an in-process frontend handler.
    Standalone {
        /// For the sake of simplicity, gRPC requests are still used in standalone mode.
        /// Note that the handler is bound lazily, so that it can be set after the
        /// frontend has booted.
        database_client: HandlerMutable,
        /// Query options; `parallelism` is forwarded as a query hint.
        query: QueryOptions,
    },
}
105
106impl FrontendClient {
107    /// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
108    pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
109        let is_initialized = Arc::new(SetOnce::new());
110        let handler = HandlerMutable {
111            handler: Arc::new(Mutex::new(None)),
112            is_initialized,
113        };
114        (
115            Self::Standalone {
116                database_client: handler.clone(),
117                query,
118            },
119            handler,
120        )
121    }
122
123    /// Waits until the frontend client is initialized.
124    pub async fn wait_initialized(&self) {
125        if let FrontendClient::Standalone {
126            database_client, ..
127        } = self
128        {
129            database_client.is_initialized.wait().await;
130        }
131    }
132
133    pub fn from_meta_client(
134        meta_client: Arc<MetaClient>,
135        query: QueryOptions,
136        batch_opts: BatchingModeOptions,
137    ) -> Result<Self, Error> {
138        common_telemetry::info!("Frontend client build without auth");
139        Ok(Self::Distributed {
140            meta_client,
141            chnl_mgr: {
142                let cfg = ChannelConfig::new()
143                    .connect_timeout(batch_opts.grpc_conn_timeout)
144                    .timeout(Some(batch_opts.query_timeout));
145
146                let tls_config = load_client_tls_config(batch_opts.frontend_tls.clone())
147                    .context(InvalidClientConfigSnafu)?;
148                ChannelManager::with_config(cfg, tls_config)
149            },
150            query,
151            batch_opts,
152        })
153    }
154
155    pub fn from_grpc_handler(
156        grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
157        query: QueryOptions,
158    ) -> Self {
159        let is_initialized = Arc::new(SetOnce::new_with(Some(())));
160        let handler = HandlerMutable {
161            handler: Arc::new(Mutex::new(Some(grpc_handler))),
162            is_initialized: is_initialized.clone(),
163        };
164
165        Self::Standalone {
166            database_client: handler,
167            query,
168        }
169    }
170}
171
/// A [`Database`] client paired with the frontend [`Peer`] it is connected to.
#[derive(Debug, Clone)]
pub struct DatabaseWithPeer {
    /// Client used to send requests to the frontend.
    pub database: Database,
    /// The frontend peer this client was created for.
    pub peer: Peer,
}
177
178impl DatabaseWithPeer {
179    fn new(database: Database, peer: Peer) -> Self {
180        Self { database, peer }
181    }
182
183    /// Try sending a "SELECT 1" to the database
184    async fn try_select_one(&self) -> Result<(), Error> {
185        // notice here use `sql` for `SELECT 1` return 1 row
186        let _ = self
187            .database
188            .sql("SELECT 1")
189            .await
190            .with_context(|_| InvalidRequestSnafu {
191                context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
192            })?;
193        Ok(())
194    }
195}
196
197impl FrontendClient {
198    /// scan for available frontend from metadata
199    pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<Peer>, Error> {
200        let Self::Distributed { meta_client, .. } = self else {
201            return Ok(vec![]);
202        };
203
204        meta_client
205            .active_frontends()
206            .await
207            .map(|nodes| nodes.into_iter().map(|node| node.peer).collect())
208            .map_err(BoxedError::new)
209            .context(ExternalSnafu)
210    }
211
212    /// Probes all discovered frontends without auth.
213    ///
214    /// Returns non-auth failures to allow callers to retry transient connectivity
215    /// errors. Authentication failures are returned immediately because they mean
216    /// a frontend advertised an auth-protected endpoint to flownodes.
217    pub(crate) async fn check_all_frontends_without_auth(
218        &self,
219        frontends: &[Peer],
220    ) -> Result<Vec<String>, Error> {
221        let Self::Distributed {
222            chnl_mgr,
223            batch_opts,
224            ..
225        } = self
226        else {
227            return Ok(vec![]);
228        };
229
230        let probe_timeout = batch_opts.grpc_conn_timeout;
231        let mut probes = frontends
232            .iter()
233            .map(|peer| {
234                let addr = peer.addr.clone();
235                let chnl_mgr = chnl_mgr.clone();
236
237                async move {
238                    let client = Client::with_manager_and_urls(chnl_mgr, vec![addr.clone()]);
239                    let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
240
241                    match tokio::time::timeout(probe_timeout, database.sql("SELECT 1")).await {
242                        Ok(Ok(_)) => Ok(None),
243                        Ok(Err(err)) if err.tonic_code() == Some(tonic::Code::Unauthenticated) => {
244                            Err(err).context(InvalidRequestSnafu {
245                                context: format!(
246                                    "Frontend {addr} rejected unauthenticated flownode probe; ensure frontend internal_grpc is advertised to metasrv"
247                                ),
248                            })
249                        }
250                        Ok(Err(err)) => Ok(Some(format!("{addr}: {err}"))),
251                        Err(_) => Ok(Some(format!(
252                            "{addr}: health check timed out after {probe_timeout:?}"
253                        ))),
254                    }
255                }
256            })
257            .collect::<FuturesUnordered<_>>();
258
259        let mut failures = Vec::new();
260        while let Some(probe_result) = probes.next().await {
261            if let Some(failure) = probe_result? {
262                failures.push(failure);
263            }
264        }
265
266        Ok(failures)
267    }
268
269    /// Get a frontend discovered by metasrv and verified with a query probe.
270    async fn get_random_active_frontend(
271        &self,
272        catalog: &str,
273        schema: &str,
274    ) -> Result<DatabaseWithPeer, Error> {
275        let Self::Distributed {
276            meta_client: _,
277            chnl_mgr,
278            query: _,
279            batch_opts,
280        } = self
281        else {
282            return UnexpectedSnafu {
283                reason: "Expect distributed mode",
284            }
285            .fail();
286        };
287
288        let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
289        interval.tick().await;
290        for retry in 0..batch_opts.experimental_grpc_max_retries {
291            let mut frontends = self.scan_for_frontend().await?;
292            // shuffle the frontends to avoid always pick the same one
293            frontends.shuffle(&mut rng());
294
295            for peer in frontends {
296                let addr = peer.addr.clone();
297                let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
298                let database = Database::new(catalog, schema, client);
299                let db = DatabaseWithPeer::new(database, peer);
300                match db.try_select_one().await {
301                    Ok(_) => return Ok(db),
302                    Err(e) => {
303                        warn!(
304                            "Failed to connect to frontend {} on retry={}: \n{e:?}",
305                            addr, retry
306                        );
307                    }
308                }
309            }
310            // no available frontend
311            // sleep and retry
312            interval.tick().await;
313        }
314
315        NoAvailableFrontendSnafu {
316            timeout: batch_opts.grpc_conn_timeout,
317            context: "No available frontend found that is able to process query",
318        }
319        .fail()
320    }
321
322    pub async fn create(
323        &self,
324        create: CreateTableExpr,
325        catalog: &str,
326        schema: &str,
327    ) -> Result<u32, Error> {
328        self.handle(
329            Request::Ddl(api::v1::DdlRequest {
330                expr: Some(api::v1::ddl_request::Expr::CreateTable(create.clone())),
331            }),
332            catalog,
333            schema,
334            &mut None,
335        )
336        .await
337        .map_err(BoxedError::new)
338        .with_context(|_| CreateSinkTableSnafu {
339            create: create.clone(),
340        })
341    }
342
343    /// Execute a SQL statement on the frontend.
344    pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
345        match self {
346            FrontendClient::Distributed { .. } => {
347                let db = self.get_random_active_frontend(catalog, schema).await?;
348                db.database
349                    .sql(sql)
350                    .await
351                    .map_err(BoxedError::new)
352                    .context(ExternalSnafu)
353            }
354            FrontendClient::Standalone {
355                database_client, ..
356            } => {
357                let ctx = QueryContextBuilder::default()
358                    .current_catalog(catalog.to_string())
359                    .current_schema(schema.to_string())
360                    .build();
361                let ctx = Arc::new(ctx);
362                {
363                    let database_client = {
364                        database_client
365                            .handler
366                            .lock()
367                            .unwrap()
368                            .as_ref()
369                            .context(UnexpectedSnafu {
370                                reason: "Standalone's frontend instance is not set",
371                            })?
372                            .upgrade()
373                            .context(UnexpectedSnafu {
374                                reason: "Failed to upgrade database client",
375                            })?
376                    };
377                    let req = Request::Query(QueryRequest {
378                        query: Some(Query::Sql(sql.to_string())),
379                    });
380                    database_client
381                        .do_query(req, ctx)
382                        .await
383                        .map_err(BoxedError::new)
384                        .context(ExternalSnafu)
385                }
386            }
387        }
388    }
389
390    /// Execute a flow query and return terminal metrics. `snapshot_seqs` are
391    /// optional read upper bounds used only by snapshot-fenced repair chunks.
392    pub(crate) async fn query_with_terminal_metrics(
393        &self,
394        catalog: &str,
395        schema: &str,
396        request: QueryRequest,
397        extensions: &[(&str, &str)],
398        snapshot_seqs: &HashMap<u64, u64>,
399        peer_desc: &mut Option<PeerDesc>,
400    ) -> Result<OutputWithMetrics, Error> {
401        let flow_extensions = build_flow_extensions(extensions)?;
402        match self {
403            FrontendClient::Distributed {
404                query, batch_opts, ..
405            } => {
406                let query_parallelism = query.parallelism.to_string();
407                let hints = vec![
408                    (QUERY_PARALLELISM_HINT, query_parallelism.as_str()),
409                    (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
410                ];
411                let db = self.get_random_active_frontend(catalog, schema).await?;
412                *peer_desc = Some(PeerDesc::Dist {
413                    peer: db.peer.clone(),
414                });
415                db.database
416                    .query_with_terminal_metrics_and_flow_extensions(
417                        request,
418                        &hints,
419                        extensions,
420                        snapshot_seqs,
421                    )
422                    .await
423                    .map_err(BoxedError::new)
424                    .context(ExternalSnafu)
425            }
426            FrontendClient::Standalone {
427                database_client,
428                query,
429            } => {
430                *peer_desc = Some(PeerDesc::Standalone);
431                let mut extensions_map = HashMap::from([(
432                    QUERY_PARALLELISM_HINT.to_string(),
433                    query.parallelism.to_string(),
434                )]);
435                for (key, value) in extensions {
436                    extensions_map.insert((*key).to_string(), (*value).to_string());
437                }
438                let ctx = QueryContextBuilder::default()
439                    .current_catalog(catalog.to_string())
440                    .current_schema(schema.to_string())
441                    .extensions(extensions_map)
442                    .snapshot_seqs(Arc::new(RwLock::new(snapshot_seqs.clone())))
443                    .build();
444                let ctx = Arc::new(ctx);
445                let database_client = {
446                    database_client
447                        .handler
448                        .lock()
449                        .map_err(|e| {
450                            UnexpectedSnafu {
451                                reason: format!("Failed to lock database client: {e}"),
452                            }
453                            .build()
454                        })?
455                        .as_ref()
456                        .context(UnexpectedSnafu {
457                            reason: "Standalone's frontend instance is not set",
458                        })?
459                        .upgrade()
460                        .context(UnexpectedSnafu {
461                            reason: "Failed to upgrade database client",
462                        })?
463                };
464                database_client
465                    .do_query(Request::Query(request), ctx.clone())
466                    .await
467                    .map(|output| {
468                        wrap_standalone_output_with_terminal_metrics(output, &flow_extensions)
469                    })
470                    .map_err(BoxedError::new)
471                    .context(ExternalSnafu)
472            }
473        }
474    }
475
476    /// Handle a request to frontend
477    pub(crate) async fn handle(
478        &self,
479        req: api::v1::greptime_request::Request,
480        catalog: &str,
481        schema: &str,
482        peer_desc: &mut Option<PeerDesc>,
483    ) -> Result<u32, Error> {
484        match self {
485            FrontendClient::Distributed {
486                query, batch_opts, ..
487            } => {
488                let db = self.get_random_active_frontend(catalog, schema).await?;
489
490                *peer_desc = Some(PeerDesc::Dist {
491                    peer: db.peer.clone(),
492                });
493
494                db.database
495                    .handle_with_retry(
496                        req.clone(),
497                        batch_opts.experimental_grpc_max_retries,
498                        &[
499                            (QUERY_PARALLELISM_HINT, &query.parallelism.to_string()),
500                            (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
501                        ],
502                    )
503                    .await
504                    .with_context(|_| InvalidRequestSnafu {
505                        context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
506                    })
507            }
508            FrontendClient::Standalone {
509                database_client,
510                query,
511            } => {
512                let ctx = QueryContextBuilder::default()
513                    .current_catalog(catalog.to_string())
514                    .current_schema(schema.to_string())
515                    .extensions(HashMap::from([(
516                        QUERY_PARALLELISM_HINT.to_string(),
517                        query.parallelism.to_string(),
518                    )]))
519                    .build();
520                let ctx = Arc::new(ctx);
521                {
522                    let database_client = {
523                        database_client
524                            .handler
525                            .lock()
526                            .unwrap()
527                            .as_ref()
528                            .context(UnexpectedSnafu {
529                                reason: "Standalone's frontend instance is not set",
530                            })?
531                            .upgrade()
532                            .context(UnexpectedSnafu {
533                                reason: "Failed to upgrade database client",
534                            })?
535                    };
536                    let resp: common_query::Output = database_client
537                        .do_query(req, ctx)
538                        .await
539                        .map_err(BoxedError::new)
540                        .context(ExternalSnafu)?;
541                    match resp.data {
542                        common_query::OutputData::AffectedRows(rows) => {
543                            Ok(rows.try_into().map_err(|_| {
544                                UnexpectedSnafu {
545                                    reason: format!("Failed to convert rows to u32: {}", rows),
546                                }
547                                .build()
548                            })?)
549                        }
550                        _ => UnexpectedSnafu {
551                            reason: "Unexpected output data",
552                        }
553                        .fail(),
554                    }
555                }
556            }
557        }
558    }
559}
560
561fn build_flow_extensions(extensions: &[(&str, &str)]) -> Result<FlowQueryExtensions, Error> {
562    let flow_extensions = HashMap::from_iter(
563        extensions
564            .iter()
565            .map(|(key, value)| ((*key).to_string(), (*value).to_string())),
566    );
567    FlowQueryExtensions::parse_flow_extensions(&flow_extensions)
568        .map_err(BoxedError::new)
569        .context(ExternalSnafu)
570        .map(|extensions| extensions.unwrap_or_default())
571}
572
573fn wrap_standalone_output_with_terminal_metrics(
574    output: Output,
575    flow_extensions: &FlowQueryExtensions,
576) -> OutputWithMetrics {
577    let should_collect_region_watermark = flow_extensions.should_collect_region_watermark();
578    let terminal_metrics =
579        if should_collect_region_watermark && !matches!(&output.data, OutputData::Stream(_)) {
580            output
581                .meta
582                .plan
583                .clone()
584                .and_then(terminal_recordbatch_metrics_from_plan)
585        } else {
586            None
587        };
588    let result = OutputWithMetrics::from_output(output);
589    if let Some(metrics) = terminal_metrics {
590        result.metrics.update(Some(metrics));
591    }
592    result
593}
594
/// Describes the frontend peer that handled (or was meant to handle) a request.
///
/// Its `Display` output is `unknown`, the peer address, or `standalone`.
#[derive(Debug, Default, Clone)]
pub(crate) enum PeerDesc {
    /// The query failed before a frontend peer was selected.
    #[default]
    Unknown,
    /// Distributed mode: the frontend peer that was selected.
    Dist {
        /// The selected frontend peer; its address is what gets displayed.
        peer: Peer,
    },
    /// Standalone mode
    Standalone,
}
609
610impl std::fmt::Display for PeerDesc {
611    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
612        match self {
613            PeerDesc::Unknown => write!(f, "unknown"),
614            PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
615            PeerDesc::Standalone => write!(f, "standalone"),
616        }
617    }
618}
619
620#[cfg(test)]
621mod tests {
622    use std::pin::Pin;
623    use std::task::{Context, Poll};
624    use std::time::Duration;
625
626    use arrow_flight::flight_service_server::FlightServiceServer;
627    use arrow_flight::{FlightData, Ticket};
628    use common_query::{Output, OutputData};
629    use common_recordbatch::adapter::RecordBatchMetrics;
630    use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream};
631    use datatypes::prelude::{ConcreteDataType, VectorRef};
632    use datatypes::schema::{ColumnSchema, Schema};
633    use datatypes::vectors::Int32Vector;
634    use futures::StreamExt;
635    use servers::grpc::flight::{FlightCraft, FlightCraftWrapper, TonicStream};
636    use tokio::net::TcpListener;
637    use tokio::task::JoinHandle;
638    use tokio::time::timeout;
639    use tokio_stream::wrappers::TcpListenerStream;
640    use tonic::{Request as TonicRequest, Response as TonicResponse, Status};
641
642    use super::*;
643
644    #[derive(Debug)]
645    struct NoopHandler;
646
647    struct MockMetricsStream {
648        schema: datatypes::schema::SchemaRef,
649        batch: Option<RecordBatch>,
650        metrics: RecordBatchMetrics,
651        terminal_metrics_only: bool,
652    }
653
654    impl futures::Stream for MockMetricsStream {
655        type Item = common_recordbatch::error::Result<RecordBatch>;
656
657        fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
658            Poll::Ready(self.batch.take().map(Ok))
659        }
660
661        fn size_hint(&self) -> (usize, Option<usize>) {
662            (
663                usize::from(self.batch.is_some()),
664                Some(usize::from(self.batch.is_some())),
665            )
666        }
667    }
668
669    impl RecordBatchStream for MockMetricsStream {
670        fn name(&self) -> &str {
671            "MockMetricsStream"
672        }
673
674        fn schema(&self) -> datatypes::schema::SchemaRef {
675            self.schema.clone()
676        }
677
678        fn output_ordering(&self) -> Option<&[OrderOption]> {
679            None
680        }
681
682        fn metrics(&self) -> Option<RecordBatchMetrics> {
683            if self.terminal_metrics_only && self.batch.is_some() {
684                return None;
685            }
686            Some(self.metrics.clone())
687        }
688    }
689
690    #[derive(Debug)]
691    struct MetricsHandler;
692
693    #[derive(Debug)]
694    struct ExtensionAwareHandler;
695
696    #[derive(Debug)]
697    struct SnapshotBindingHandler;
698
699    #[derive(Debug)]
700    struct RejectUnauthenticatedFlight;
701
702    #[derive(Debug)]
703    struct SlowFlight;
704
705    struct WaitForConcurrentFlight {
706        barrier: Arc<tokio::sync::Barrier>,
707    }
708
709    #[async_trait::async_trait]
710    impl GrpcQueryHandlerWithBoxedError for NoopHandler {
711        async fn do_query(
712            &self,
713            _query: Request,
714            _ctx: QueryContextRef,
715        ) -> std::result::Result<Output, BoxedError> {
716            Ok(Output::new_with_affected_rows(0))
717        }
718    }
719
720    #[async_trait::async_trait]
721    impl GrpcQueryHandlerWithBoxedError for MetricsHandler {
722        async fn do_query(
723            &self,
724            _query: Request,
725            _ctx: QueryContextRef,
726        ) -> std::result::Result<Output, BoxedError> {
727            let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
728                "v",
729                ConcreteDataType::int32_datatype(),
730                false,
731            )]));
732            let batch = RecordBatch::new(
733                schema.clone(),
734                vec![Arc::new(Int32Vector::from_slice([1, 2])) as VectorRef],
735            )
736            .unwrap();
737            Ok(Output::new_with_stream(Box::pin(MockMetricsStream {
738                schema,
739                batch: Some(batch),
740                metrics: RecordBatchMetrics {
741                    region_watermarks: vec![common_recordbatch::adapter::RegionWatermarkEntry {
742                        region_id: 42,
743                        watermark: Some(99),
744                    }],
745                    ..Default::default()
746                },
747                terminal_metrics_only: true,
748            })))
749        }
750    }
751
752    #[async_trait::async_trait]
753    impl GrpcQueryHandlerWithBoxedError for ExtensionAwareHandler {
754        async fn do_query(
755            &self,
756            _query: Request,
757            ctx: QueryContextRef,
758        ) -> std::result::Result<Output, BoxedError> {
759            assert_eq!(ctx.extension("flow.return_region_seq"), Some("true"));
760            Ok(Output::new_with_affected_rows(1))
761        }
762    }
763
764    #[async_trait::async_trait]
765    impl GrpcQueryHandlerWithBoxedError for SnapshotBindingHandler {
766        async fn do_query(
767            &self,
768            _query: Request,
769            ctx: QueryContextRef,
770        ) -> std::result::Result<Output, BoxedError> {
771            assert_eq!(ctx.extension("flow.return_region_seq"), Some("true"));
772            assert_eq!(ctx.get_snapshot(1), Some(10));
773            assert_eq!(ctx.get_snapshot(2), Some(20));
774            ctx.set_snapshot(42, 99);
775            Ok(Output::new_with_affected_rows(1))
776        }
777    }
778
779    #[async_trait::async_trait]
780    impl FlightCraft for RejectUnauthenticatedFlight {
781        async fn do_get(
782            &self,
783            _request: TonicRequest<Ticket>,
784        ) -> std::result::Result<TonicResponse<TonicStream<FlightData>>, Status> {
785            Err(Status::unauthenticated("auth failed"))
786        }
787    }
788
789    #[async_trait::async_trait]
790    impl FlightCraft for SlowFlight {
791        async fn do_get(
792            &self,
793            _request: TonicRequest<Ticket>,
794        ) -> std::result::Result<TonicResponse<TonicStream<FlightData>>, Status> {
795            tokio::time::sleep(Duration::from_secs(60)).await;
796            Err(Status::unavailable("slow response"))
797        }
798    }
799
800    #[async_trait::async_trait]
801    impl FlightCraft for WaitForConcurrentFlight {
802        async fn do_get(
803            &self,
804            _request: TonicRequest<Ticket>,
805        ) -> std::result::Result<TonicResponse<TonicStream<FlightData>>, Status> {
806            self.barrier.wait().await;
807            Err(Status::unavailable("probe started concurrently"))
808        }
809    }
810
811    async fn start_flight_server<T: FlightCraft>(handler: T) -> (String, JoinHandle<()>) {
812        let listener = TcpListener::bind("127.0.0.1:0")
813            .await
814            .expect("bind test flight server");
815        let addr = listener.local_addr().expect("local addr").to_string();
816        let server = tokio::spawn(async move {
817            tonic::transport::Server::builder()
818                .add_service(FlightServiceServer::new(FlightCraftWrapper(handler)))
819                .serve_with_incoming(TcpListenerStream::new(listener))
820                .await
821                .expect("serve test flight server");
822        });
823
824        (addr, server)
825    }
826
827    #[tokio::test]
828    async fn wait_initialized() {
829        let (client, handler_mut) =
830            FrontendClient::from_empty_grpc_handler(QueryOptions::default());
831
832        assert!(
833            timeout(Duration::from_millis(50), client.wait_initialized())
834                .await
835                .is_err()
836        );
837
838        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
839        handler_mut.set_handler(Arc::downgrade(&handler)).await;
840
841        timeout(Duration::from_secs(1), client.wait_initialized())
842            .await
843            .expect("wait_initialized should complete after handler is set");
844
845        timeout(Duration::from_millis(10), client.wait_initialized())
846            .await
847            .expect("wait_initialized should be a no-op once initialized");
848
849        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
850        let client =
851            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
852        assert!(
853            timeout(Duration::from_millis(10), client.wait_initialized())
854                .await
855                .is_ok()
856        );
857
858        let meta_client = Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend));
859        let client = FrontendClient::from_meta_client(
860            meta_client,
861            QueryOptions::default(),
862            BatchingModeOptions::default(),
863        )
864        .unwrap();
865        assert!(
866            timeout(Duration::from_millis(10), client.wait_initialized())
867                .await
868                .is_ok()
869        );
870    }
871
872    #[tokio::test]
873    async fn test_query_with_terminal_metrics_tracks_watermark_in_standalone_mode() {
874        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(MetricsHandler);
875        let client =
876            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
877        let mut peer_desc = None;
878
879        let result = client
880            .query_with_terminal_metrics(
881                "greptime",
882                "public",
883                QueryRequest {
884                    query: Some(Query::Sql("select 1".to_string())),
885                },
886                &[],
887                &HashMap::new(),
888                &mut peer_desc,
889            )
890            .await
891            .unwrap();
892        assert!(matches!(peer_desc, Some(PeerDesc::Standalone)));
893
894        let terminal_metrics = result.metrics.clone();
895        assert!(!result.metrics.is_ready());
896        assert!(terminal_metrics.get().is_none());
897
898        let OutputData::Stream(mut stream) = result.output.data else {
899            panic!("expected stream output");
900        };
901        while stream.next().await.is_some() {}
902
903        assert!(terminal_metrics.is_ready());
904        assert_eq!(
905            terminal_metrics.region_watermark_map(),
906            Some(HashMap::from([(42_u64, 99_u64)]))
907        );
908    }
909
910    #[tokio::test]
911    async fn test_query_with_terminal_metrics_forwards_flow_extensions_in_standalone_mode() {
912        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(ExtensionAwareHandler);
913        let client =
914            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
915        let mut peer_desc = None;
916
917        let result = client
918            .query_with_terminal_metrics(
919                "greptime",
920                "public",
921                QueryRequest {
922                    query: Some(Query::Sql("insert into t select 1".to_string())),
923                },
924                &[("flow.return_region_seq", "true")],
925                &HashMap::new(),
926                &mut peer_desc,
927            )
928            .await
929            .unwrap();
930        assert!(matches!(peer_desc, Some(PeerDesc::Standalone)));
931
932        assert!(result.metrics.is_ready());
933        assert!(result.region_watermark_map().is_none());
934    }
935
936    #[tokio::test]
937    async fn test_query_with_terminal_metrics_uses_standalone_snapshot_bounds() {
938        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(SnapshotBindingHandler);
939        let client =
940            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
941        let mut peer_desc = None;
942
943        let result = client
944            .query_with_terminal_metrics(
945                "greptime",
946                "public",
947                QueryRequest {
948                    query: Some(Query::Sql("insert into t select * from src".to_string())),
949                },
950                &[("flow.return_region_seq", "true")],
951                &HashMap::from([(1, 10), (2, 20)]),
952                &mut peer_desc,
953            )
954            .await
955            .unwrap();
956        assert!(matches!(peer_desc, Some(PeerDesc::Standalone)));
957
958        assert!(result.metrics.is_ready());
959        assert_eq!(result.region_watermark_map(), None);
960    }
961
962    #[tokio::test]
963    async fn test_query_with_terminal_metrics_rejects_invalid_flow_extensions() {
964        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
965        let client =
966            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
967        let mut peer_desc = None;
968
969        let err = client
970            .query_with_terminal_metrics(
971                "greptime",
972                "public",
973                QueryRequest {
974                    query: Some(Query::Sql("select 1".to_string())),
975                },
976                &[("flow.return_region_seq", "not-a-bool")],
977                &HashMap::new(),
978                &mut peer_desc,
979            )
980            .await
981            .unwrap_err();
982
983        assert!(format!("{err:?}").contains("Invalid value for flow.return_region_seq"));
984    }
985
986    #[tokio::test]
987    async fn test_check_all_frontends_without_auth_fails_fast_on_unauthenticated_frontend() {
988        let (addr, server) = start_flight_server(RejectUnauthenticatedFlight).await;
989        let client = FrontendClient::from_meta_client(
990            Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
991            QueryOptions::default(),
992            BatchingModeOptions::default(),
993        )
994        .unwrap();
995
996        let err = client
997            .check_all_frontends_without_auth(&[Peer {
998                id: 1,
999                addr: addr.clone(),
1000            }])
1001            .await
1002            .unwrap_err();
1003        server.abort();
1004
1005        let Error::InvalidRequest {
1006            context, source, ..
1007        } = err
1008        else {
1009            panic!("expected InvalidRequest, got {err:?}");
1010        };
1011        assert!(context.contains(&addr));
1012        assert!(context.contains("rejected unauthenticated flownode probe"));
1013        assert_eq!(source.tonic_code(), Some(tonic::Code::Unauthenticated));
1014    }
1015
1016    #[tokio::test]
1017    async fn test_check_all_frontends_without_auth_uses_grpc_connection_timeout() {
1018        let (addr, server) = start_flight_server(SlowFlight).await;
1019        let client = FrontendClient::from_meta_client(
1020            Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
1021            QueryOptions::default(),
1022            BatchingModeOptions {
1023                grpc_conn_timeout: Duration::from_millis(50),
1024                ..Default::default()
1025            },
1026        )
1027        .unwrap();
1028
1029        let failures = client
1030            .check_all_frontends_without_auth(&[Peer {
1031                id: 1,
1032                addr: addr.clone(),
1033            }])
1034            .await
1035            .unwrap();
1036        server.abort();
1037
1038        assert_eq!(failures.len(), 1);
1039        assert!(failures[0].contains(&addr));
1040        assert!(failures[0].contains("health check timed out"));
1041    }
1042
1043    #[tokio::test]
1044    async fn test_check_all_frontends_without_auth_checks_frontends_concurrently() {
1045        let barrier = Arc::new(tokio::sync::Barrier::new(2));
1046        let (addr1, server1) = start_flight_server(WaitForConcurrentFlight {
1047            barrier: barrier.clone(),
1048        })
1049        .await;
1050        let (addr2, server2) = start_flight_server(WaitForConcurrentFlight { barrier }).await;
1051        let client = FrontendClient::from_meta_client(
1052            Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
1053            QueryOptions::default(),
1054            BatchingModeOptions {
1055                grpc_conn_timeout: Duration::from_millis(500),
1056                ..Default::default()
1057            },
1058        )
1059        .unwrap();
1060
1061        let failures = timeout(
1062            Duration::from_secs(2),
1063            client.check_all_frontends_without_auth(&[
1064                Peer {
1065                    id: 1,
1066                    addr: addr1.clone(),
1067                },
1068                Peer {
1069                    id: 2,
1070                    addr: addr2.clone(),
1071                },
1072            ]),
1073        )
1074        .await
1075        .expect("concurrent probes should complete before per-peer timeouts")
1076        .unwrap();
1077        server1.abort();
1078        server2.abort();
1079
1080        assert_eq!(failures.len(), 2);
1081        assert!(failures.iter().any(|failure| failure.contains(&addr1)));
1082        assert!(failures.iter().any(|failure| failure.contains(&addr2)));
1083        assert!(
1084            failures
1085                .iter()
1086                .all(|failure| !failure.contains("health check timed out")),
1087            "sequential probes would time out before both requests reach the barrier: {failures:?}"
1088        );
1089    }
1090}