Skip to main content

flow/batching_mode/
frontend_client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Frontend client used to run a flow as a batching task, i.e. a normal, time-window-aware query that is triggered on every tick configured by the user.
16
17use std::collections::HashMap;
18use std::sync::{Arc, Mutex, Weak};
19
20use api::v1::greptime_request::Request;
21use api::v1::query_request::Query;
22use api::v1::{CreateTableExpr, QueryRequest};
23use client::{Client, Database};
24use common_error::ext::BoxedError;
25use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
26use common_meta::peer::{Peer, PeerDiscovery};
27use common_query::Output;
28use common_telemetry::warn;
29use meta_client::client::MetaClient;
30use query::datafusion::QUERY_PARALLELISM_HINT;
31use query::options::QueryOptions;
32use rand::rng;
33use rand::seq::SliceRandom;
34use servers::query_handler::grpc::GrpcQueryHandler;
35use session::context::{QueryContextBuilder, QueryContextRef};
36use session::hints::READ_PREFERENCE_HINT;
37use snafu::{OptionExt, ResultExt};
38use tokio::sync::SetOnce;
39
40use crate::batching_mode::BatchingModeOptions;
41use crate::error::{
42    CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu,
43    NoAvailableFrontendSnafu, UnexpectedSnafu,
44};
45use crate::{Error, FlowAuthHeader};
46
47/// Adapter trait for [`GrpcQueryHandler`] that boxes the underlying error into [`BoxedError`].
48///
49/// This is mainly used by flownode to invoke a frontend instance in standalone mode.
50#[async_trait::async_trait]
51pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
52    async fn do_query(
53        &self,
54        query: Request,
55        ctx: QueryContextRef,
56    ) -> std::result::Result<Output, BoxedError>;
57}
58
59/// auto impl
60#[async_trait::async_trait]
61impl<T: GrpcQueryHandler + Send + Sync + 'static> GrpcQueryHandlerWithBoxedError for T {
62    async fn do_query(
63        &self,
64        query: Request,
65        ctx: QueryContextRef,
66    ) -> std::result::Result<Output, BoxedError> {
67        self.do_query(query, ctx).await.map_err(BoxedError::new)
68    }
69}
70
71#[derive(Debug, Clone)]
72pub struct HandlerMutable {
73    handler: Arc<Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>,
74    is_initialized: Arc<SetOnce<()>>,
75}
76
77impl HandlerMutable {
78    pub async fn set_handler(&self, handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) {
79        *self.handler.lock().unwrap() = Some(handler);
80        // Ignore the error, as we allow the handler to be set multiple times.
81        let _ = self.is_initialized.set(());
82    }
83}
84
85/// A simple frontend client able to execute sql using grpc protocol
86///
87/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
88#[derive(Debug, Clone)]
89pub enum FrontendClient {
90    Distributed {
91        meta_client: Arc<MetaClient>,
92        chnl_mgr: ChannelManager,
93        auth: Option<FlowAuthHeader>,
94        query: QueryOptions,
95        batch_opts: BatchingModeOptions,
96    },
97    Standalone {
98        /// for the sake of simplicity still use grpc even in standalone mode
99        /// notice the client here should all be lazy, so that can wait after frontend is booted then make conn
100        database_client: HandlerMutable,
101        query: QueryOptions,
102    },
103}
104
105impl FrontendClient {
106    /// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
107    pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
108        let is_initialized = Arc::new(SetOnce::new());
109        let handler = HandlerMutable {
110            handler: Arc::new(Mutex::new(None)),
111            is_initialized,
112        };
113        (
114            Self::Standalone {
115                database_client: handler.clone(),
116                query,
117            },
118            handler,
119        )
120    }
121
122    /// Waits until the frontend client is initialized.
123    pub async fn wait_initialized(&self) {
124        if let FrontendClient::Standalone {
125            database_client, ..
126        } = self
127        {
128            database_client.is_initialized.wait().await;
129        }
130    }
131
132    pub fn from_meta_client(
133        meta_client: Arc<MetaClient>,
134        auth: Option<FlowAuthHeader>,
135        query: QueryOptions,
136        batch_opts: BatchingModeOptions,
137    ) -> Result<Self, Error> {
138        common_telemetry::info!("Frontend client build with auth={:?}", auth);
139        Ok(Self::Distributed {
140            meta_client,
141            chnl_mgr: {
142                let cfg = ChannelConfig::new()
143                    .connect_timeout(batch_opts.grpc_conn_timeout)
144                    .timeout(Some(batch_opts.query_timeout));
145
146                let tls_config = load_client_tls_config(batch_opts.frontend_tls.clone())
147                    .context(InvalidClientConfigSnafu)?;
148                ChannelManager::with_config(cfg, tls_config)
149            },
150            auth,
151            query,
152            batch_opts,
153        })
154    }
155
156    pub fn from_grpc_handler(
157        grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
158        query: QueryOptions,
159    ) -> Self {
160        let is_initialized = Arc::new(SetOnce::new_with(Some(())));
161        let handler = HandlerMutable {
162            handler: Arc::new(Mutex::new(Some(grpc_handler))),
163            is_initialized: is_initialized.clone(),
164        };
165
166        Self::Standalone {
167            database_client: handler,
168            query,
169        }
170    }
171}
172
173#[derive(Debug, Clone)]
174pub struct DatabaseWithPeer {
175    pub database: Database,
176    pub peer: Peer,
177}
178
179impl DatabaseWithPeer {
180    fn new(database: Database, peer: Peer) -> Self {
181        Self { database, peer }
182    }
183
184    /// Try sending a "SELECT 1" to the database
185    async fn try_select_one(&self) -> Result<(), Error> {
186        // notice here use `sql` for `SELECT 1` return 1 row
187        let _ = self
188            .database
189            .sql("SELECT 1")
190            .await
191            .with_context(|_| InvalidRequestSnafu {
192                context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
193            })?;
194        Ok(())
195    }
196}
197
198impl FrontendClient {
199    /// scan for available frontend from metadata
200    pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<Peer>, Error> {
201        let Self::Distributed { meta_client, .. } = self else {
202            return Ok(vec![]);
203        };
204
205        meta_client
206            .active_frontends()
207            .await
208            .map_err(BoxedError::new)
209            .context(ExternalSnafu)
210    }
211
212    /// Get a frontend discovered by metasrv and verified with a query probe.
213    async fn get_random_active_frontend(
214        &self,
215        catalog: &str,
216        schema: &str,
217    ) -> Result<DatabaseWithPeer, Error> {
218        let Self::Distributed {
219            meta_client: _,
220            chnl_mgr,
221            auth,
222            query: _,
223            batch_opts,
224        } = self
225        else {
226            return UnexpectedSnafu {
227                reason: "Expect distributed mode",
228            }
229            .fail();
230        };
231
232        let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
233        interval.tick().await;
234        for retry in 0..batch_opts.experimental_grpc_max_retries {
235            let mut frontends = self.scan_for_frontend().await?;
236            // shuffle the frontends to avoid always pick the same one
237            frontends.shuffle(&mut rng());
238
239            for peer in frontends {
240                let addr = peer.addr.clone();
241                let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
242                let database = {
243                    let mut db = Database::new(catalog, schema, client);
244                    if let Some(auth) = auth {
245                        db.set_auth(auth.auth().clone());
246                    }
247                    db
248                };
249                let db = DatabaseWithPeer::new(database, peer);
250                match db.try_select_one().await {
251                    Ok(_) => return Ok(db),
252                    Err(e) => {
253                        warn!(
254                            "Failed to connect to frontend {} on retry={}: \n{e:?}",
255                            addr, retry
256                        );
257                    }
258                }
259            }
260            // no available frontend
261            // sleep and retry
262            interval.tick().await;
263        }
264
265        NoAvailableFrontendSnafu {
266            timeout: batch_opts.grpc_conn_timeout,
267            context: "No available frontend found that is able to process query",
268        }
269        .fail()
270    }
271
272    pub async fn create(
273        &self,
274        create: CreateTableExpr,
275        catalog: &str,
276        schema: &str,
277    ) -> Result<u32, Error> {
278        self.handle(
279            Request::Ddl(api::v1::DdlRequest {
280                expr: Some(api::v1::ddl_request::Expr::CreateTable(create.clone())),
281            }),
282            catalog,
283            schema,
284            &mut None,
285        )
286        .await
287        .map_err(BoxedError::new)
288        .with_context(|_| CreateSinkTableSnafu {
289            create: create.clone(),
290        })
291    }
292
293    /// Execute a SQL statement on the frontend.
294    pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
295        match self {
296            FrontendClient::Distributed { .. } => {
297                let db = self.get_random_active_frontend(catalog, schema).await?;
298                db.database
299                    .sql(sql)
300                    .await
301                    .map_err(BoxedError::new)
302                    .context(ExternalSnafu)
303            }
304            FrontendClient::Standalone {
305                database_client, ..
306            } => {
307                let ctx = QueryContextBuilder::default()
308                    .current_catalog(catalog.to_string())
309                    .current_schema(schema.to_string())
310                    .build();
311                let ctx = Arc::new(ctx);
312                {
313                    let database_client = {
314                        database_client
315                            .handler
316                            .lock()
317                            .map_err(|e| {
318                                UnexpectedSnafu {
319                                    reason: format!("Failed to lock database client: {e}"),
320                                }
321                                .build()
322                            })?
323                            .as_ref()
324                            .context(UnexpectedSnafu {
325                                reason: "Standalone's frontend instance is not set",
326                            })?
327                            .upgrade()
328                            .context(UnexpectedSnafu {
329                                reason: "Failed to upgrade database client",
330                            })?
331                    };
332                    let req = Request::Query(QueryRequest {
333                        query: Some(Query::Sql(sql.to_string())),
334                    });
335                    database_client
336                        .do_query(req, ctx)
337                        .await
338                        .map_err(BoxedError::new)
339                        .context(ExternalSnafu)
340                }
341            }
342        }
343    }
344
345    /// Handle a request to frontend
346    pub(crate) async fn handle(
347        &self,
348        req: api::v1::greptime_request::Request,
349        catalog: &str,
350        schema: &str,
351        peer_desc: &mut Option<PeerDesc>,
352    ) -> Result<u32, Error> {
353        match self {
354            FrontendClient::Distributed {
355                query, batch_opts, ..
356            } => {
357                let db = self.get_random_active_frontend(catalog, schema).await?;
358
359                *peer_desc = Some(PeerDesc::Dist {
360                    peer: db.peer.clone(),
361                });
362
363                db.database
364                    .handle_with_retry(
365                        req.clone(),
366                        batch_opts.experimental_grpc_max_retries,
367                        &[
368                            (QUERY_PARALLELISM_HINT, &query.parallelism.to_string()),
369                            (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
370                        ],
371                    )
372                    .await
373                    .with_context(|_| InvalidRequestSnafu {
374                        context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
375                    })
376            }
377            FrontendClient::Standalone {
378                database_client,
379                query,
380            } => {
381                let ctx = QueryContextBuilder::default()
382                    .current_catalog(catalog.to_string())
383                    .current_schema(schema.to_string())
384                    .extensions(HashMap::from([(
385                        QUERY_PARALLELISM_HINT.to_string(),
386                        query.parallelism.to_string(),
387                    )]))
388                    .build();
389                let ctx = Arc::new(ctx);
390                {
391                    let database_client = {
392                        database_client
393                            .handler
394                            .lock()
395                            .map_err(|e| {
396                                UnexpectedSnafu {
397                                    reason: format!("Failed to lock database client: {e}"),
398                                }
399                                .build()
400                            })?
401                            .as_ref()
402                            .context(UnexpectedSnafu {
403                                reason: "Standalone's frontend instance is not set",
404                            })?
405                            .upgrade()
406                            .context(UnexpectedSnafu {
407                                reason: "Failed to upgrade database client",
408                            })?
409                    };
410                    let resp: common_query::Output = database_client
411                        .do_query(req, ctx)
412                        .await
413                        .map_err(BoxedError::new)
414                        .context(ExternalSnafu)?;
415                    match resp.data {
416                        common_query::OutputData::AffectedRows(rows) => {
417                            Ok(rows.try_into().map_err(|_| {
418                                UnexpectedSnafu {
419                                    reason: format!("Failed to convert rows to u32: {}", rows),
420                                }
421                                .build()
422                            })?)
423                        }
424                        _ => UnexpectedSnafu {
425                            reason: "Unexpected output data",
426                        }
427                        .fail(),
428                    }
429                }
430            }
431        }
432    }
433}
434
435/// Describe a peer of frontend
436#[derive(Debug, Default)]
437pub(crate) enum PeerDesc {
438    /// Distributed mode's frontend peer address
439    Dist {
440        /// frontend peer address
441        peer: Peer,
442    },
443    /// Standalone mode
444    #[default]
445    Standalone,
446}
447
448impl std::fmt::Display for PeerDesc {
449    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
450        match self {
451            PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
452            PeerDesc::Standalone => write!(f, "standalone"),
453        }
454    }
455}
456
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_query::Output;
    use tokio::time::timeout;

    use super::*;

    /// Handler that accepts every request and reports zero affected rows.
    #[derive(Debug)]
    struct NoopHandler;

    #[async_trait::async_trait]
    impl GrpcQueryHandlerWithBoxedError for NoopHandler {
        async fn do_query(
            &self,
            _query: Request,
            _ctx: QueryContextRef,
        ) -> std::result::Result<Output, BoxedError> {
            Ok(Output::new_with_affected_rows(0))
        }
    }

    /// Handler that reports a fixed number of affected rows for every request.
    #[derive(Debug)]
    struct AffectedRowsHandler(usize);

    #[async_trait::async_trait]
    impl GrpcQueryHandlerWithBoxedError for AffectedRowsHandler {
        async fn do_query(
            &self,
            _query: Request,
            _ctx: QueryContextRef,
        ) -> std::result::Result<Output, BoxedError> {
            Ok(Output::new_with_affected_rows(self.0))
        }
    }

    /// Builds a plain SQL gRPC request.
    fn sql_request(sql: &str) -> Request {
        Request::Query(QueryRequest {
            query: Some(Query::Sql(sql.to_string())),
        })
    }

    #[tokio::test]
    async fn wait_initialized() {
        let (client, handler_mut) =
            FrontendClient::from_empty_grpc_handler(QueryOptions::default());

        assert!(
            timeout(Duration::from_millis(50), client.wait_initialized())
                .await
                .is_err()
        );

        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        handler_mut.set_handler(Arc::downgrade(&handler)).await;

        timeout(Duration::from_secs(1), client.wait_initialized())
            .await
            .expect("wait_initialized should complete after handler is set");

        timeout(Duration::from_millis(10), client.wait_initialized())
            .await
            .expect("wait_initialized should be a no-op once initialized");

        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        let client =
            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
        assert!(
            timeout(Duration::from_millis(10), client.wait_initialized())
                .await
                .is_ok()
        );

        let meta_client = Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend));
        let client = FrontendClient::from_meta_client(
            meta_client,
            None,
            QueryOptions::default(),
            BatchingModeOptions::default(),
        )
        .unwrap();
        assert!(
            timeout(Duration::from_millis(10), client.wait_initialized())
                .await
                .is_ok()
        );
    }

    #[tokio::test]
    async fn standalone_handle_returns_affected_rows() {
        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(AffectedRowsHandler(3));
        let client =
            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());

        let mut peer_desc = None;
        let rows = client
            .handle(sql_request("SELECT 1"), "greptime", "public", &mut peer_desc)
            .await
            .unwrap();
        assert_eq!(rows, 3);
    }

    #[tokio::test]
    async fn standalone_handle_requires_live_handler() {
        let (client, handler_mut) =
            FrontendClient::from_empty_grpc_handler(QueryOptions::default());

        // No handler has been registered yet.
        assert!(
            client
                .handle(sql_request("SELECT 1"), "greptime", "public", &mut None)
                .await
                .is_err()
        );

        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
        handler_mut.set_handler(Arc::downgrade(&handler)).await;
        assert_eq!(
            client
                .handle(sql_request("SELECT 1"), "greptime", "public", &mut None)
                .await
                .unwrap(),
            0
        );

        // The client only keeps a weak reference, so dropping the handler makes it unusable.
        drop(handler);
        assert!(
            client
                .handle(sql_request("SELECT 1"), "greptime", "public", &mut None)
                .await
                .is_err()
        );
    }

    #[test]
    fn peer_desc_display() {
        assert_eq!(PeerDesc::Standalone.to_string(), "standalone");
        assert_eq!(PeerDesc::default().to_string(), "standalone");

        let dist = PeerDesc::Dist {
            peer: Peer {
                id: 1,
                addr: "127.0.0.1:4001".to_string(),
            },
        };
        assert_eq!(dist.to_string(), "127.0.0.1:4001");
    }
}