flow/batching_mode/
frontend_client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Frontend client used to run a flow as a batching task, i.e. a time-window-aware ordinary
//! query that is triggered on every tick configured by the user.
16
17use std::collections::HashMap;
18use std::sync::{Arc, Mutex, Weak};
19use std::time::SystemTime;
20
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{CreateTableExpr, QueryRequest};
24use client::{Client, Database};
25use common_error::ext::BoxedError;
26use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
27use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
28use common_meta::peer::Peer;
29use common_meta::rpc::store::RangeRequest;
30use common_query::Output;
31use common_telemetry::warn;
32use meta_client::client::MetaClient;
33use query::datafusion::QUERY_PARALLELISM_HINT;
34use query::options::QueryOptions;
35use rand::rng;
36use rand::seq::SliceRandom;
37use servers::query_handler::grpc::GrpcQueryHandler;
38use session::context::{QueryContextBuilder, QueryContextRef};
39use session::hints::READ_PREFERENCE_HINT;
40use snafu::{OptionExt, ResultExt};
41use tokio::sync::SetOnce;
42
43use crate::batching_mode::BatchingModeOptions;
44use crate::error::{
45    CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu,
46    NoAvailableFrontendSnafu, UnexpectedSnafu,
47};
48use crate::{Error, FlowAuthHeader};
49
50/// Adapter trait for [`GrpcQueryHandler`] that boxes the underlying error into [`BoxedError`].
51///
52/// This is mainly used by flownode to invoke a frontend instance in standalone mode.
53#[async_trait::async_trait]
54pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
55    async fn do_query(
56        &self,
57        query: Request,
58        ctx: QueryContextRef,
59    ) -> std::result::Result<Output, BoxedError>;
60}
61
62/// auto impl
63#[async_trait::async_trait]
64impl<T: GrpcQueryHandler + Send + Sync + 'static> GrpcQueryHandlerWithBoxedError for T {
65    async fn do_query(
66        &self,
67        query: Request,
68        ctx: QueryContextRef,
69    ) -> std::result::Result<Output, BoxedError> {
70        self.do_query(query, ctx).await.map_err(BoxedError::new)
71    }
72}
73
74#[derive(Debug, Clone)]
75pub struct HandlerMutable {
76    handler: Arc<Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>,
77    is_initialized: Arc<SetOnce<()>>,
78}
79
80impl HandlerMutable {
81    pub async fn set_handler(&self, handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) {
82        *self.handler.lock().unwrap() = Some(handler);
83        // Ignore the error, as we allow the handler to be set multiple times.
84        let _ = self.is_initialized.set(());
85    }
86}
87
88/// A simple frontend client able to execute sql using grpc protocol
89///
90/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
91#[derive(Debug, Clone)]
92pub enum FrontendClient {
93    Distributed {
94        meta_client: Arc<MetaClient>,
95        chnl_mgr: ChannelManager,
96        auth: Option<FlowAuthHeader>,
97        query: QueryOptions,
98        batch_opts: BatchingModeOptions,
99    },
100    Standalone {
101        /// for the sake of simplicity still use grpc even in standalone mode
102        /// notice the client here should all be lazy, so that can wait after frontend is booted then make conn
103        database_client: HandlerMutable,
104        query: QueryOptions,
105    },
106}
107
108impl FrontendClient {
109    /// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
110    pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
111        let is_initialized = Arc::new(SetOnce::new());
112        let handler = HandlerMutable {
113            handler: Arc::new(Mutex::new(None)),
114            is_initialized,
115        };
116        (
117            Self::Standalone {
118                database_client: handler.clone(),
119                query,
120            },
121            handler,
122        )
123    }
124
125    /// Waits until the frontend client is initialized.
126    pub async fn wait_initialized(&self) {
127        if let FrontendClient::Standalone {
128            database_client, ..
129        } = self
130        {
131            database_client.is_initialized.wait().await;
132        }
133    }
134
135    pub fn from_meta_client(
136        meta_client: Arc<MetaClient>,
137        auth: Option<FlowAuthHeader>,
138        query: QueryOptions,
139        batch_opts: BatchingModeOptions,
140    ) -> Result<Self, Error> {
141        common_telemetry::info!("Frontend client build with auth={:?}", auth);
142        Ok(Self::Distributed {
143            meta_client,
144            chnl_mgr: {
145                let cfg = ChannelConfig::new()
146                    .connect_timeout(batch_opts.grpc_conn_timeout)
147                    .timeout(Some(batch_opts.query_timeout));
148
149                let tls_config = load_client_tls_config(batch_opts.frontend_tls.clone())
150                    .context(InvalidClientConfigSnafu)?;
151                ChannelManager::with_config(cfg, tls_config)
152            },
153            auth,
154            query,
155            batch_opts,
156        })
157    }
158
159    pub fn from_grpc_handler(
160        grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
161        query: QueryOptions,
162    ) -> Self {
163        let is_initialized = Arc::new(SetOnce::new_with(Some(())));
164        let handler = HandlerMutable {
165            handler: Arc::new(Mutex::new(Some(grpc_handler))),
166            is_initialized: is_initialized.clone(),
167        };
168
169        Self::Standalone {
170            database_client: handler,
171            query,
172        }
173    }
174}
175
176#[derive(Debug, Clone)]
177pub struct DatabaseWithPeer {
178    pub database: Database,
179    pub peer: Peer,
180}
181
182impl DatabaseWithPeer {
183    fn new(database: Database, peer: Peer) -> Self {
184        Self { database, peer }
185    }
186
187    /// Try sending a "SELECT 1" to the database
188    async fn try_select_one(&self) -> Result<(), Error> {
189        // notice here use `sql` for `SELECT 1` return 1 row
190        let _ = self
191            .database
192            .sql("SELECT 1")
193            .await
194            .with_context(|_| InvalidRequestSnafu {
195                context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
196            })?;
197        Ok(())
198    }
199}
200
201impl FrontendClient {
202    /// scan for available frontend from metadata
203    pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
204        let Self::Distributed { meta_client, .. } = self else {
205            return Ok(vec![]);
206        };
207        let cluster_client = meta_client
208            .cluster_client()
209            .map_err(BoxedError::new)
210            .context(ExternalSnafu)?;
211
212        let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
213        let req = RangeRequest::new().with_prefix(prefix);
214        let resp = cluster_client
215            .range(req)
216            .await
217            .map_err(BoxedError::new)
218            .context(ExternalSnafu)?;
219        let mut res = Vec::with_capacity(resp.kvs.len());
220        for kv in resp.kvs {
221            let key = NodeInfoKey::try_from(kv.key)
222                .map_err(BoxedError::new)
223                .context(ExternalSnafu)?;
224
225            let val = NodeInfo::try_from(kv.value)
226                .map_err(BoxedError::new)
227                .context(ExternalSnafu)?;
228            res.push((key, val));
229        }
230        Ok(res)
231    }
232
233    /// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
234    /// and is able to process query
235    async fn get_random_active_frontend(
236        &self,
237        catalog: &str,
238        schema: &str,
239    ) -> Result<DatabaseWithPeer, Error> {
240        let Self::Distributed {
241            meta_client: _,
242            chnl_mgr,
243            auth,
244            query: _,
245            batch_opts,
246        } = self
247        else {
248            return UnexpectedSnafu {
249                reason: "Expect distributed mode",
250            }
251            .fail();
252        };
253
254        let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
255        interval.tick().await;
256        for retry in 0..batch_opts.experimental_grpc_max_retries {
257            let mut frontends = self.scan_for_frontend().await?;
258            let now_in_ms = SystemTime::now()
259                .duration_since(SystemTime::UNIX_EPOCH)
260                .unwrap()
261                .as_millis() as i64;
262            // shuffle the frontends to avoid always pick the same one
263            frontends.shuffle(&mut rng());
264
265            // found node with maximum last_activity_ts
266            for (_, node_info) in frontends
267                .iter()
268                // filter out frontend that have been down for more than 1 min
269                .filter(|(_, node_info)| {
270                    node_info.last_activity_ts
271                        + batch_opts
272                            .experimental_frontend_activity_timeout
273                            .as_millis() as i64
274                        > now_in_ms
275                })
276            {
277                let addr = &node_info.peer.addr;
278                let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
279                let database = {
280                    let mut db = Database::new(catalog, schema, client);
281                    if let Some(auth) = auth {
282                        db.set_auth(auth.auth().clone());
283                    }
284                    db
285                };
286                let db = DatabaseWithPeer::new(database, node_info.peer.clone());
287                match db.try_select_one().await {
288                    Ok(_) => return Ok(db),
289                    Err(e) => {
290                        warn!(
291                            "Failed to connect to frontend {} on retry={}: \n{e:?}",
292                            addr, retry
293                        );
294                    }
295                }
296            }
297            // no available frontend
298            // sleep and retry
299            interval.tick().await;
300        }
301
302        NoAvailableFrontendSnafu {
303            timeout: batch_opts.grpc_conn_timeout,
304            context: "No available frontend found that is able to process query",
305        }
306        .fail()
307    }
308
309    pub async fn create(
310        &self,
311        create: CreateTableExpr,
312        catalog: &str,
313        schema: &str,
314    ) -> Result<u32, Error> {
315        self.handle(
316            Request::Ddl(api::v1::DdlRequest {
317                expr: Some(api::v1::ddl_request::Expr::CreateTable(create.clone())),
318            }),
319            catalog,
320            schema,
321            &mut None,
322        )
323        .await
324        .map_err(BoxedError::new)
325        .with_context(|_| CreateSinkTableSnafu {
326            create: create.clone(),
327        })
328    }
329
330    /// Execute a SQL statement on the frontend.
331    pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
332        match self {
333            FrontendClient::Distributed { .. } => {
334                let db = self.get_random_active_frontend(catalog, schema).await?;
335                db.database
336                    .sql(sql)
337                    .await
338                    .map_err(BoxedError::new)
339                    .context(ExternalSnafu)
340            }
341            FrontendClient::Standalone {
342                database_client, ..
343            } => {
344                let ctx = QueryContextBuilder::default()
345                    .current_catalog(catalog.to_string())
346                    .current_schema(schema.to_string())
347                    .build();
348                let ctx = Arc::new(ctx);
349                {
350                    let database_client = {
351                        database_client
352                            .handler
353                            .lock()
354                            .map_err(|e| {
355                                UnexpectedSnafu {
356                                    reason: format!("Failed to lock database client: {e}"),
357                                }
358                                .build()
359                            })?
360                            .as_ref()
361                            .context(UnexpectedSnafu {
362                                reason: "Standalone's frontend instance is not set",
363                            })?
364                            .upgrade()
365                            .context(UnexpectedSnafu {
366                                reason: "Failed to upgrade database client",
367                            })?
368                    };
369                    let req = Request::Query(QueryRequest {
370                        query: Some(Query::Sql(sql.to_string())),
371                    });
372                    database_client
373                        .do_query(req, ctx)
374                        .await
375                        .map_err(BoxedError::new)
376                        .context(ExternalSnafu)
377                }
378            }
379        }
380    }
381
382    /// Handle a request to frontend
383    pub(crate) async fn handle(
384        &self,
385        req: api::v1::greptime_request::Request,
386        catalog: &str,
387        schema: &str,
388        peer_desc: &mut Option<PeerDesc>,
389    ) -> Result<u32, Error> {
390        match self {
391            FrontendClient::Distributed {
392                query, batch_opts, ..
393            } => {
394                let db = self.get_random_active_frontend(catalog, schema).await?;
395
396                *peer_desc = Some(PeerDesc::Dist {
397                    peer: db.peer.clone(),
398                });
399
400                db.database
401                    .handle_with_retry(
402                        req.clone(),
403                        batch_opts.experimental_grpc_max_retries,
404                        &[
405                            (QUERY_PARALLELISM_HINT, &query.parallelism.to_string()),
406                            (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
407                        ],
408                    )
409                    .await
410                    .with_context(|_| InvalidRequestSnafu {
411                        context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
412                    })
413            }
414            FrontendClient::Standalone {
415                database_client,
416                query,
417            } => {
418                let ctx = QueryContextBuilder::default()
419                    .current_catalog(catalog.to_string())
420                    .current_schema(schema.to_string())
421                    .extensions(HashMap::from([(
422                        QUERY_PARALLELISM_HINT.to_string(),
423                        query.parallelism.to_string(),
424                    )]))
425                    .build();
426                let ctx = Arc::new(ctx);
427                {
428                    let database_client = {
429                        database_client
430                            .handler
431                            .lock()
432                            .map_err(|e| {
433                                UnexpectedSnafu {
434                                    reason: format!("Failed to lock database client: {e}"),
435                                }
436                                .build()
437                            })?
438                            .as_ref()
439                            .context(UnexpectedSnafu {
440                                reason: "Standalone's frontend instance is not set",
441                            })?
442                            .upgrade()
443                            .context(UnexpectedSnafu {
444                                reason: "Failed to upgrade database client",
445                            })?
446                    };
447                    let resp: common_query::Output = database_client
448                        .do_query(req, ctx)
449                        .await
450                        .map_err(BoxedError::new)
451                        .context(ExternalSnafu)?;
452                    match resp.data {
453                        common_query::OutputData::AffectedRows(rows) => {
454                            Ok(rows.try_into().map_err(|_| {
455                                UnexpectedSnafu {
456                                    reason: format!("Failed to convert rows to u32: {}", rows),
457                                }
458                                .build()
459                            })?)
460                        }
461                        _ => UnexpectedSnafu {
462                            reason: "Unexpected output data",
463                        }
464                        .fail(),
465                    }
466                }
467            }
468        }
469    }
470}
471
472/// Describe a peer of frontend
473#[derive(Debug, Default)]
474pub(crate) enum PeerDesc {
475    /// Distributed mode's frontend peer address
476    Dist {
477        /// frontend peer address
478        peer: Peer,
479    },
480    /// Standalone mode
481    #[default]
482    Standalone,
483}
484
485impl std::fmt::Display for PeerDesc {
486    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
487        match self {
488            PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
489            PeerDesc::Standalone => write!(f, "standalone"),
490        }
491    }
492}
493
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_query::Output;
    use tokio::time::timeout;

    use super::*;

    /// Handler stub that accepts any request and reports zero affected rows.
    #[derive(Debug)]
    struct NoopHandler;

    #[async_trait::async_trait]
    impl GrpcQueryHandlerWithBoxedError for NoopHandler {
        async fn do_query(
            &self,
            _query: Request,
            _ctx: QueryContextRef,
        ) -> std::result::Result<Output, BoxedError> {
            Ok(Output::new_with_affected_rows(0))
        }
    }

    /// Reports whether `client.wait_initialized()` resolves within `millis` milliseconds.
    async fn initialized_within(client: &FrontendClient, millis: u64) -> bool {
        timeout(Duration::from_millis(millis), client.wait_initialized())
            .await
            .is_ok()
    }

    #[tokio::test]
    async fn wait_initialized() {
        // Standalone client created without a handler: it blocks until one is installed.
        let (empty_client, handler_slot) =
            FrontendClient::from_empty_grpc_handler(QueryOptions::default());
        assert!(!initialized_within(&empty_client, 50).await);

        let first_handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        handler_slot
            .set_handler(Arc::downgrade(&first_handler))
            .await;
        assert!(
            initialized_within(&empty_client, 1_000).await,
            "wait_initialized should complete after handler is set"
        );
        assert!(
            initialized_within(&empty_client, 10).await,
            "wait_initialized should be a no-op once initialized"
        );

        // Standalone client built around an existing handler: ready immediately.
        let second_handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        let ready_client = FrontendClient::from_grpc_handler(
            Arc::downgrade(&second_handler),
            QueryOptions::default(),
        );
        assert!(initialized_within(&ready_client, 10).await);

        // Distributed client: there is nothing to wait for.
        let meta_client = Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend));
        let distributed_client = FrontendClient::from_meta_client(
            meta_client,
            None,
            QueryOptions::default(),
            BatchingModeOptions::default(),
        )
        .unwrap();
        assert!(initialized_within(&distributed_client, 10).await);
    }
}