flow/batching_mode/
frontend_client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Frontend client used to run a flow as a batching task, i.e. a time-window-aware normal query that is triggered on every tick of the interval set by the user.
16
17use std::collections::HashMap;
18use std::sync::{Arc, Weak};
19use std::time::SystemTime;
20
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{CreateTableExpr, QueryRequest};
24use client::{Client, Database};
25use common_error::ext::{BoxedError, ErrorExt};
26use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_tls_config};
27use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
28use common_meta::peer::Peer;
29use common_meta::rpc::store::RangeRequest;
30use common_query::Output;
31use common_telemetry::warn;
32use meta_client::client::MetaClient;
33use query::datafusion::QUERY_PARALLELISM_HINT;
34use query::options::QueryOptions;
35use rand::rng;
36use rand::seq::SliceRandom;
37use servers::query_handler::grpc::GrpcQueryHandler;
38use session::context::{QueryContextBuilder, QueryContextRef};
39use session::hints::READ_PREFERENCE_HINT;
40use snafu::{OptionExt, ResultExt};
41
42use crate::batching_mode::BatchingModeOptions;
43use crate::error::{
44    CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu,
45    NoAvailableFrontendSnafu, UnexpectedSnafu,
46};
47use crate::{Error, FlowAuthHeader};
48
49/// Just like [`GrpcQueryHandler`] but use BoxedError
50///
51/// basically just a specialized `GrpcQueryHandler<Error=BoxedError>`
52///
53/// this is only useful for flownode to
54/// invoke frontend Instance in standalone mode
55#[async_trait::async_trait]
56pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
57    async fn do_query(
58        &self,
59        query: Request,
60        ctx: QueryContextRef,
61    ) -> std::result::Result<Output, BoxedError>;
62}
63
64/// auto impl
65#[async_trait::async_trait]
66impl<E: ErrorExt + Send + Sync + 'static, T: GrpcQueryHandler<Error = E> + Send + Sync + 'static>
67    GrpcQueryHandlerWithBoxedError for T
68{
69    async fn do_query(
70        &self,
71        query: Request,
72        ctx: QueryContextRef,
73    ) -> std::result::Result<Output, BoxedError> {
74        self.do_query(query, ctx).await.map_err(BoxedError::new)
75    }
76}
77
/// Shared slot holding the standalone-mode gRPC handler.
///
/// The slot starts as `None` when created by `FrontendClient::from_empty_grpc_handler`
/// and can be filled in later through the returned handle. It stores a [`Weak`]
/// reference, so this slot does not by itself keep the handler alive; callers must
/// `upgrade` it before use.
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
79
/// A simple frontend client able to execute SQL using the gRPC protocol.
///
/// This is for computation-heavy queries that need to offload computation to a
/// frontend, lifting the load from the flownode.
#[derive(Debug, Clone)]
pub enum FrontendClient {
    /// Distributed mode: frontends are discovered by scanning cluster metadata
    /// through `meta_client` and are reached over gRPC.
    Distributed {
        /// Client used to list frontend nodes registered in cluster metadata.
        meta_client: Arc<MetaClient>,
        /// Shared gRPC channel manager (connect timeout, request timeout and TLS
        /// are taken from `batch_opts` at construction).
        chnl_mgr: ChannelManager,
        /// Optional auth applied to every `Database` created for a frontend.
        auth: Option<FlowAuthHeader>,
        /// Query options; `parallelism` is forwarded as a query hint.
        query: QueryOptions,
        /// Batching-mode options: timeouts, retry count, activity timeout,
        /// read preference and TLS settings.
        batch_opts: BatchingModeOptions,
    },
    /// Standalone mode: requests are handed to the in-process frontend handler.
    Standalone {
        /// For simplicity, gRPC-style requests are still used even in standalone mode.
        /// The handler may be installed lazily (after the frontend has booted), which
        /// is why it lives in a shared, initially-empty slot.
        database_client: HandlerMutable,
        /// Query options; `parallelism` is forwarded as a query-context extension
        /// by `handle`.
        query: QueryOptions,
    },
}
99
100impl FrontendClient {
101    /// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
102    pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
103        let handler = Arc::new(std::sync::Mutex::new(None));
104        (
105            Self::Standalone {
106                database_client: handler.clone(),
107                query,
108            },
109            handler,
110        )
111    }
112
113    pub fn from_meta_client(
114        meta_client: Arc<MetaClient>,
115        auth: Option<FlowAuthHeader>,
116        query: QueryOptions,
117        batch_opts: BatchingModeOptions,
118    ) -> Result<Self, Error> {
119        common_telemetry::info!("Frontend client build with auth={:?}", auth);
120        Ok(Self::Distributed {
121            meta_client,
122            chnl_mgr: {
123                let cfg = ChannelConfig::new()
124                    .connect_timeout(batch_opts.grpc_conn_timeout)
125                    .timeout(batch_opts.query_timeout);
126
127                let tls_config = load_tls_config(batch_opts.frontend_tls.as_ref())
128                    .context(InvalidClientConfigSnafu)?;
129                ChannelManager::with_config(cfg, tls_config)
130            },
131            auth,
132            query,
133            batch_opts,
134        })
135    }
136
137    pub fn from_grpc_handler(
138        grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
139        query: QueryOptions,
140    ) -> Self {
141        Self::Standalone {
142            database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
143            query,
144        }
145    }
146}
147
/// A [`Database`] client paired with the frontend [`Peer`] it talks to.
#[derive(Debug, Clone)]
pub struct DatabaseWithPeer {
    /// Database client; in this file it is built for a single frontend address.
    pub database: Database,
    /// The frontend peer behind `database`, used in error messages and reported
    /// to callers as `PeerDesc::Dist`.
    pub peer: Peer,
}
153
154impl DatabaseWithPeer {
155    fn new(database: Database, peer: Peer) -> Self {
156        Self { database, peer }
157    }
158
159    /// Try sending a "SELECT 1" to the database
160    async fn try_select_one(&self) -> Result<(), Error> {
161        // notice here use `sql` for `SELECT 1` return 1 row
162        let _ = self
163            .database
164            .sql("SELECT 1")
165            .await
166            .with_context(|_| InvalidRequestSnafu {
167                context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
168            })?;
169        Ok(())
170    }
171}
172
173impl FrontendClient {
174    /// scan for available frontend from metadata
175    pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
176        let Self::Distributed { meta_client, .. } = self else {
177            return Ok(vec![]);
178        };
179        let cluster_client = meta_client
180            .cluster_client()
181            .map_err(BoxedError::new)
182            .context(ExternalSnafu)?;
183
184        let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
185        let req = RangeRequest::new().with_prefix(prefix);
186        let resp = cluster_client
187            .range(req)
188            .await
189            .map_err(BoxedError::new)
190            .context(ExternalSnafu)?;
191        let mut res = Vec::with_capacity(resp.kvs.len());
192        for kv in resp.kvs {
193            let key = NodeInfoKey::try_from(kv.key)
194                .map_err(BoxedError::new)
195                .context(ExternalSnafu)?;
196
197            let val = NodeInfo::try_from(kv.value)
198                .map_err(BoxedError::new)
199                .context(ExternalSnafu)?;
200            res.push((key, val));
201        }
202        Ok(res)
203    }
204
205    /// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
206    /// and is able to process query
207    async fn get_random_active_frontend(
208        &self,
209        catalog: &str,
210        schema: &str,
211    ) -> Result<DatabaseWithPeer, Error> {
212        let Self::Distributed {
213            meta_client: _,
214            chnl_mgr,
215            auth,
216            query: _,
217            batch_opts,
218        } = self
219        else {
220            return UnexpectedSnafu {
221                reason: "Expect distributed mode",
222            }
223            .fail();
224        };
225
226        let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
227        interval.tick().await;
228        for retry in 0..batch_opts.experimental_grpc_max_retries {
229            let mut frontends = self.scan_for_frontend().await?;
230            let now_in_ms = SystemTime::now()
231                .duration_since(SystemTime::UNIX_EPOCH)
232                .unwrap()
233                .as_millis() as i64;
234            // shuffle the frontends to avoid always pick the same one
235            frontends.shuffle(&mut rng());
236
237            // found node with maximum last_activity_ts
238            for (_, node_info) in frontends
239                .iter()
240                // filter out frontend that have been down for more than 1 min
241                .filter(|(_, node_info)| {
242                    node_info.last_activity_ts
243                        + batch_opts
244                            .experimental_frontend_activity_timeout
245                            .as_millis() as i64
246                        > now_in_ms
247                })
248            {
249                let addr = &node_info.peer.addr;
250                let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
251                let database = {
252                    let mut db = Database::new(catalog, schema, client);
253                    if let Some(auth) = auth {
254                        db.set_auth(auth.auth().clone());
255                    }
256                    db
257                };
258                let db = DatabaseWithPeer::new(database, node_info.peer.clone());
259                match db.try_select_one().await {
260                    Ok(_) => return Ok(db),
261                    Err(e) => {
262                        warn!(
263                            "Failed to connect to frontend {} on retry={}: \n{e:?}",
264                            addr, retry
265                        );
266                    }
267                }
268            }
269            // no available frontend
270            // sleep and retry
271            interval.tick().await;
272        }
273
274        NoAvailableFrontendSnafu {
275            timeout: batch_opts.grpc_conn_timeout,
276            context: "No available frontend found that is able to process query",
277        }
278        .fail()
279    }
280
281    pub async fn create(
282        &self,
283        create: CreateTableExpr,
284        catalog: &str,
285        schema: &str,
286    ) -> Result<u32, Error> {
287        self.handle(
288            Request::Ddl(api::v1::DdlRequest {
289                expr: Some(api::v1::ddl_request::Expr::CreateTable(create.clone())),
290            }),
291            catalog,
292            schema,
293            &mut None,
294        )
295        .await
296        .map_err(BoxedError::new)
297        .with_context(|_| CreateSinkTableSnafu {
298            create: create.clone(),
299        })
300    }
301
302    /// Execute a SQL statement on the frontend.
303    pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
304        match self {
305            FrontendClient::Distributed { .. } => {
306                let db = self.get_random_active_frontend(catalog, schema).await?;
307                db.database
308                    .sql(sql)
309                    .await
310                    .map_err(BoxedError::new)
311                    .context(ExternalSnafu)
312            }
313            FrontendClient::Standalone {
314                database_client, ..
315            } => {
316                let ctx = QueryContextBuilder::default()
317                    .current_catalog(catalog.to_string())
318                    .current_schema(schema.to_string())
319                    .build();
320                let ctx = Arc::new(ctx);
321                {
322                    let database_client = {
323                        database_client
324                            .lock()
325                            .map_err(|e| {
326                                UnexpectedSnafu {
327                                    reason: format!("Failed to lock database client: {e}"),
328                                }
329                                .build()
330                            })?
331                            .as_ref()
332                            .context(UnexpectedSnafu {
333                                reason: "Standalone's frontend instance is not set",
334                            })?
335                            .upgrade()
336                            .context(UnexpectedSnafu {
337                                reason: "Failed to upgrade database client",
338                            })?
339                    };
340                    let req = Request::Query(QueryRequest {
341                        query: Some(Query::Sql(sql.to_string())),
342                    });
343                    database_client
344                        .do_query(req, ctx)
345                        .await
346                        .map_err(BoxedError::new)
347                        .context(ExternalSnafu)
348                }
349            }
350        }
351    }
352
353    /// Handle a request to frontend
354    pub(crate) async fn handle(
355        &self,
356        req: api::v1::greptime_request::Request,
357        catalog: &str,
358        schema: &str,
359        peer_desc: &mut Option<PeerDesc>,
360    ) -> Result<u32, Error> {
361        match self {
362            FrontendClient::Distributed {
363                query, batch_opts, ..
364            } => {
365                let db = self.get_random_active_frontend(catalog, schema).await?;
366
367                *peer_desc = Some(PeerDesc::Dist {
368                    peer: db.peer.clone(),
369                });
370
371                db.database
372                    .handle_with_retry(
373                        req.clone(),
374                        batch_opts.experimental_grpc_max_retries,
375                        &[
376                            (QUERY_PARALLELISM_HINT, &query.parallelism.to_string()),
377                            (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
378                        ],
379                    )
380                    .await
381                    .with_context(|_| InvalidRequestSnafu {
382                        context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
383                    })
384            }
385            FrontendClient::Standalone {
386                database_client,
387                query,
388            } => {
389                let ctx = QueryContextBuilder::default()
390                    .current_catalog(catalog.to_string())
391                    .current_schema(schema.to_string())
392                    .extensions(HashMap::from([(
393                        QUERY_PARALLELISM_HINT.to_string(),
394                        query.parallelism.to_string(),
395                    )]))
396                    .build();
397                let ctx = Arc::new(ctx);
398                {
399                    let database_client = {
400                        database_client
401                            .lock()
402                            .map_err(|e| {
403                                UnexpectedSnafu {
404                                    reason: format!("Failed to lock database client: {e}"),
405                                }
406                                .build()
407                            })?
408                            .as_ref()
409                            .context(UnexpectedSnafu {
410                                reason: "Standalone's frontend instance is not set",
411                            })?
412                            .upgrade()
413                            .context(UnexpectedSnafu {
414                                reason: "Failed to upgrade database client",
415                            })?
416                    };
417                    let resp: common_query::Output = database_client
418                        .do_query(req, ctx)
419                        .await
420                        .map_err(BoxedError::new)
421                        .context(ExternalSnafu)?;
422                    match resp.data {
423                        common_query::OutputData::AffectedRows(rows) => {
424                            Ok(rows.try_into().map_err(|_| {
425                                UnexpectedSnafu {
426                                    reason: format!("Failed to convert rows to u32: {}", rows),
427                                }
428                                .build()
429                            })?)
430                        }
431                        _ => UnexpectedSnafu {
432                            reason: "Unexpected output data",
433                        }
434                        .fail(),
435                    }
436                }
437            }
438        }
439    }
440}
441
/// Describes which frontend a request was sent to.
///
/// `FrontendClient::handle` sets it to `Dist` in distributed mode; it defaults to
/// `Standalone`.
#[derive(Debug, Default)]
pub(crate) enum PeerDesc {
    /// Distributed mode: the frontend peer chosen for the request
    Dist {
        /// frontend peer; its `addr` is what `Display` prints
        peer: Peer,
    },
    /// Standalone mode (the default); displayed as `standalone`
    #[default]
    Standalone,
}
454
455impl std::fmt::Display for PeerDesc {
456    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
457        match self {
458            PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
459            PeerDesc::Standalone => write!(f, "standalone"),
460        }
461    }
462}