flow/batching_mode/
frontend_client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Frontend client used to run a flow as a batching task, i.e. a time-window-aware normal query
//! that is triggered on every tick configured by the user.
16
17use std::collections::HashMap;
18use std::sync::{Arc, Weak};
19use std::time::SystemTime;
20
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{CreateTableExpr, QueryRequest};
24use client::{Client, Database};
25use common_error::ext::{BoxedError, ErrorExt};
26use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
27use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
28use common_meta::peer::Peer;
29use common_meta::rpc::store::RangeRequest;
30use common_query::Output;
31use common_telemetry::warn;
32use meta_client::client::MetaClient;
33use query::datafusion::QUERY_PARALLELISM_HINT;
34use query::options::QueryOptions;
35use rand::rng;
36use rand::seq::SliceRandom;
37use servers::query_handler::grpc::GrpcQueryHandler;
38use session::context::{QueryContextBuilder, QueryContextRef};
39use session::hints::READ_PREFERENCE_HINT;
40use snafu::{OptionExt, ResultExt};
41
42use crate::batching_mode::BatchingModeOptions;
43use crate::error::{
44    CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu,
45    NoAvailableFrontendSnafu, UnexpectedSnafu,
46};
47use crate::{Error, FlowAuthHeader};
48
49/// Just like [`GrpcQueryHandler`] but use BoxedError
50///
51/// basically just a specialized `GrpcQueryHandler<Error=BoxedError>`
52///
53/// this is only useful for flownode to
54/// invoke frontend Instance in standalone mode
55#[async_trait::async_trait]
56pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
57    async fn do_query(
58        &self,
59        query: Request,
60        ctx: QueryContextRef,
61    ) -> std::result::Result<Output, BoxedError>;
62}
63
64/// auto impl
65#[async_trait::async_trait]
66impl<E: ErrorExt + Send + Sync + 'static, T: GrpcQueryHandler<Error = E> + Send + Sync + 'static>
67    GrpcQueryHandlerWithBoxedError for T
68{
69    async fn do_query(
70        &self,
71        query: Request,
72        ctx: QueryContextRef,
73    ) -> std::result::Result<Output, BoxedError> {
74        self.do_query(query, ctx).await.map_err(BoxedError::new)
75    }
76}
77
78type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
79
80/// A simple frontend client able to execute sql using grpc protocol
81///
82/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
83#[derive(Debug, Clone)]
84pub enum FrontendClient {
85    Distributed {
86        meta_client: Arc<MetaClient>,
87        chnl_mgr: ChannelManager,
88        auth: Option<FlowAuthHeader>,
89        query: QueryOptions,
90        batch_opts: BatchingModeOptions,
91    },
92    Standalone {
93        /// for the sake of simplicity still use grpc even in standalone mode
94        /// notice the client here should all be lazy, so that can wait after frontend is booted then make conn
95        database_client: HandlerMutable,
96        query: QueryOptions,
97    },
98}
99
100impl FrontendClient {
101    /// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
102    pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
103        let handler = Arc::new(std::sync::Mutex::new(None));
104        (
105            Self::Standalone {
106                database_client: handler.clone(),
107                query,
108            },
109            handler,
110        )
111    }
112
113    pub fn from_meta_client(
114        meta_client: Arc<MetaClient>,
115        auth: Option<FlowAuthHeader>,
116        query: QueryOptions,
117        batch_opts: BatchingModeOptions,
118    ) -> Result<Self, Error> {
119        common_telemetry::info!("Frontend client build with auth={:?}", auth);
120        Ok(Self::Distributed {
121            meta_client,
122            chnl_mgr: {
123                let cfg = ChannelConfig::new()
124                    .connect_timeout(batch_opts.grpc_conn_timeout)
125                    .timeout(batch_opts.query_timeout);
126                if let Some(tls) = &batch_opts.frontend_tls {
127                    let cfg = cfg.client_tls_config(tls.clone());
128                    ChannelManager::with_tls_config(cfg).context(InvalidClientConfigSnafu)?
129                } else {
130                    ChannelManager::with_config(cfg)
131                }
132            },
133            auth,
134            query,
135            batch_opts,
136        })
137    }
138
139    pub fn from_grpc_handler(
140        grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
141        query: QueryOptions,
142    ) -> Self {
143        Self::Standalone {
144            database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
145            query,
146        }
147    }
148}
149
150#[derive(Debug, Clone)]
151pub struct DatabaseWithPeer {
152    pub database: Database,
153    pub peer: Peer,
154}
155
156impl DatabaseWithPeer {
157    fn new(database: Database, peer: Peer) -> Self {
158        Self { database, peer }
159    }
160
161    /// Try sending a "SELECT 1" to the database
162    async fn try_select_one(&self) -> Result<(), Error> {
163        // notice here use `sql` for `SELECT 1` return 1 row
164        let _ = self
165            .database
166            .sql("SELECT 1")
167            .await
168            .with_context(|_| InvalidRequestSnafu {
169                context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
170            })?;
171        Ok(())
172    }
173}
174
175impl FrontendClient {
176    /// scan for available frontend from metadata
177    pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
178        let Self::Distributed { meta_client, .. } = self else {
179            return Ok(vec![]);
180        };
181        let cluster_client = meta_client
182            .cluster_client()
183            .map_err(BoxedError::new)
184            .context(ExternalSnafu)?;
185
186        let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
187        let req = RangeRequest::new().with_prefix(prefix);
188        let resp = cluster_client
189            .range(req)
190            .await
191            .map_err(BoxedError::new)
192            .context(ExternalSnafu)?;
193        let mut res = Vec::with_capacity(resp.kvs.len());
194        for kv in resp.kvs {
195            let key = NodeInfoKey::try_from(kv.key)
196                .map_err(BoxedError::new)
197                .context(ExternalSnafu)?;
198
199            let val = NodeInfo::try_from(kv.value)
200                .map_err(BoxedError::new)
201                .context(ExternalSnafu)?;
202            res.push((key, val));
203        }
204        Ok(res)
205    }
206
207    /// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
208    /// and is able to process query
209    async fn get_random_active_frontend(
210        &self,
211        catalog: &str,
212        schema: &str,
213    ) -> Result<DatabaseWithPeer, Error> {
214        let Self::Distributed {
215            meta_client: _,
216            chnl_mgr,
217            auth,
218            query: _,
219            batch_opts,
220        } = self
221        else {
222            return UnexpectedSnafu {
223                reason: "Expect distributed mode",
224            }
225            .fail();
226        };
227
228        let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
229        interval.tick().await;
230        for retry in 0..batch_opts.experimental_grpc_max_retries {
231            let mut frontends = self.scan_for_frontend().await?;
232            let now_in_ms = SystemTime::now()
233                .duration_since(SystemTime::UNIX_EPOCH)
234                .unwrap()
235                .as_millis() as i64;
236            // shuffle the frontends to avoid always pick the same one
237            frontends.shuffle(&mut rng());
238
239            // found node with maximum last_activity_ts
240            for (_, node_info) in frontends
241                .iter()
242                // filter out frontend that have been down for more than 1 min
243                .filter(|(_, node_info)| {
244                    node_info.last_activity_ts
245                        + batch_opts
246                            .experimental_frontend_activity_timeout
247                            .as_millis() as i64
248                        > now_in_ms
249                })
250            {
251                let addr = &node_info.peer.addr;
252                let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
253                let database = {
254                    let mut db = Database::new(catalog, schema, client);
255                    if let Some(auth) = auth {
256                        db.set_auth(auth.auth().clone());
257                    }
258                    db
259                };
260                let db = DatabaseWithPeer::new(database, node_info.peer.clone());
261                match db.try_select_one().await {
262                    Ok(_) => return Ok(db),
263                    Err(e) => {
264                        warn!(
265                            "Failed to connect to frontend {} on retry={}: \n{e:?}",
266                            addr, retry
267                        );
268                    }
269                }
270            }
271            // no available frontend
272            // sleep and retry
273            interval.tick().await;
274        }
275
276        NoAvailableFrontendSnafu {
277            timeout: batch_opts.grpc_conn_timeout,
278            context: "No available frontend found that is able to process query",
279        }
280        .fail()
281    }
282
283    pub async fn create(
284        &self,
285        create: CreateTableExpr,
286        catalog: &str,
287        schema: &str,
288    ) -> Result<u32, Error> {
289        self.handle(
290            Request::Ddl(api::v1::DdlRequest {
291                expr: Some(api::v1::ddl_request::Expr::CreateTable(create.clone())),
292            }),
293            catalog,
294            schema,
295            &mut None,
296        )
297        .await
298        .map_err(BoxedError::new)
299        .with_context(|_| CreateSinkTableSnafu {
300            create: create.clone(),
301        })
302    }
303
304    /// Execute a SQL statement on the frontend.
305    pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
306        match self {
307            FrontendClient::Distributed { .. } => {
308                let db = self.get_random_active_frontend(catalog, schema).await?;
309                db.database
310                    .sql(sql)
311                    .await
312                    .map_err(BoxedError::new)
313                    .context(ExternalSnafu)
314            }
315            FrontendClient::Standalone {
316                database_client, ..
317            } => {
318                let ctx = QueryContextBuilder::default()
319                    .current_catalog(catalog.to_string())
320                    .current_schema(schema.to_string())
321                    .build();
322                let ctx = Arc::new(ctx);
323                {
324                    let database_client = {
325                        database_client
326                            .lock()
327                            .map_err(|e| {
328                                UnexpectedSnafu {
329                                    reason: format!("Failed to lock database client: {e}"),
330                                }
331                                .build()
332                            })?
333                            .as_ref()
334                            .context(UnexpectedSnafu {
335                                reason: "Standalone's frontend instance is not set",
336                            })?
337                            .upgrade()
338                            .context(UnexpectedSnafu {
339                                reason: "Failed to upgrade database client",
340                            })?
341                    };
342                    let req = Request::Query(QueryRequest {
343                        query: Some(Query::Sql(sql.to_string())),
344                    });
345                    database_client
346                        .do_query(req, ctx)
347                        .await
348                        .map_err(BoxedError::new)
349                        .context(ExternalSnafu)
350                }
351            }
352        }
353    }
354
355    /// Handle a request to frontend
356    pub(crate) async fn handle(
357        &self,
358        req: api::v1::greptime_request::Request,
359        catalog: &str,
360        schema: &str,
361        peer_desc: &mut Option<PeerDesc>,
362    ) -> Result<u32, Error> {
363        match self {
364            FrontendClient::Distributed {
365                query, batch_opts, ..
366            } => {
367                let db = self.get_random_active_frontend(catalog, schema).await?;
368
369                *peer_desc = Some(PeerDesc::Dist {
370                    peer: db.peer.clone(),
371                });
372
373                db.database
374                    .handle_with_retry(
375                        req.clone(),
376                        batch_opts.experimental_grpc_max_retries,
377                        &[
378                            (QUERY_PARALLELISM_HINT, &query.parallelism.to_string()),
379                            (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
380                        ],
381                    )
382                    .await
383                    .with_context(|_| InvalidRequestSnafu {
384                        context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
385                    })
386            }
387            FrontendClient::Standalone {
388                database_client,
389                query,
390            } => {
391                let ctx = QueryContextBuilder::default()
392                    .current_catalog(catalog.to_string())
393                    .current_schema(schema.to_string())
394                    .extensions(HashMap::from([(
395                        QUERY_PARALLELISM_HINT.to_string(),
396                        query.parallelism.to_string(),
397                    )]))
398                    .build();
399                let ctx = Arc::new(ctx);
400                {
401                    let database_client = {
402                        database_client
403                            .lock()
404                            .map_err(|e| {
405                                UnexpectedSnafu {
406                                    reason: format!("Failed to lock database client: {e}"),
407                                }
408                                .build()
409                            })?
410                            .as_ref()
411                            .context(UnexpectedSnafu {
412                                reason: "Standalone's frontend instance is not set",
413                            })?
414                            .upgrade()
415                            .context(UnexpectedSnafu {
416                                reason: "Failed to upgrade database client",
417                            })?
418                    };
419                    let resp: common_query::Output = database_client
420                        .do_query(req, ctx)
421                        .await
422                        .map_err(BoxedError::new)
423                        .context(ExternalSnafu)?;
424                    match resp.data {
425                        common_query::OutputData::AffectedRows(rows) => {
426                            Ok(rows.try_into().map_err(|_| {
427                                UnexpectedSnafu {
428                                    reason: format!("Failed to convert rows to u32: {}", rows),
429                                }
430                                .build()
431                            })?)
432                        }
433                        _ => UnexpectedSnafu {
434                            reason: "Unexpected output data",
435                        }
436                        .fail(),
437                    }
438                }
439            }
440        }
441    }
442}
443
444/// Describe a peer of frontend
445#[derive(Debug, Default)]
446pub(crate) enum PeerDesc {
447    /// Distributed mode's frontend peer address
448    Dist {
449        /// frontend peer address
450        peer: Peer,
451    },
452    /// Standalone mode
453    #[default]
454    Standalone,
455}
456
457impl std::fmt::Display for PeerDesc {
458    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
459        match self {
460            PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
461            PeerDesc::Standalone => write!(f, "standalone"),
462        }
463    }
464}