flow/batching_mode/
frontend_client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Frontend client for running a flow as a batching task, i.e. a time-window-aware normal query
//! that is triggered on every tick configured by the user.
16
17use std::collections::HashMap;
18use std::sync::{Arc, Weak};
19use std::time::SystemTime;
20
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{CreateTableExpr, QueryRequest};
24use client::{Client, Database};
25use common_error::ext::{BoxedError, ErrorExt};
26use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
27use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
28use common_meta::peer::Peer;
29use common_meta::rpc::store::RangeRequest;
30use common_query::Output;
31use common_telemetry::warn;
32use meta_client::client::MetaClient;
33use query::datafusion::QUERY_PARALLELISM_HINT;
34use query::options::QueryOptions;
35use rand::rng;
36use rand::seq::SliceRandom;
37use servers::query_handler::grpc::GrpcQueryHandler;
38use session::context::{QueryContextBuilder, QueryContextRef};
39use session::hints::READ_PREFERENCE_HINT;
40use snafu::{OptionExt, ResultExt};
41
42use crate::batching_mode::BatchingModeOptions;
43use crate::error::{
44    ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu,
45    UnexpectedSnafu,
46};
47use crate::{Error, FlowAuthHeader};
48
49/// Just like [`GrpcQueryHandler`] but use BoxedError
50///
51/// basically just a specialized `GrpcQueryHandler<Error=BoxedError>`
52///
53/// this is only useful for flownode to
54/// invoke frontend Instance in standalone mode
55#[async_trait::async_trait]
56pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
57    async fn do_query(
58        &self,
59        query: Request,
60        ctx: QueryContextRef,
61    ) -> std::result::Result<Output, BoxedError>;
62}
63
64/// auto impl
65#[async_trait::async_trait]
66impl<
67        E: ErrorExt + Send + Sync + 'static,
68        T: GrpcQueryHandler<Error = E> + Send + Sync + 'static,
69    > GrpcQueryHandlerWithBoxedError for T
70{
71    async fn do_query(
72        &self,
73        query: Request,
74        ctx: QueryContextRef,
75    ) -> std::result::Result<Output, BoxedError> {
76        self.do_query(query, ctx).await.map_err(BoxedError::new)
77    }
78}
79
80type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
81
82/// A simple frontend client able to execute sql using grpc protocol
83///
84/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
85#[derive(Debug, Clone)]
86pub enum FrontendClient {
87    Distributed {
88        meta_client: Arc<MetaClient>,
89        chnl_mgr: ChannelManager,
90        auth: Option<FlowAuthHeader>,
91        query: QueryOptions,
92        batch_opts: BatchingModeOptions,
93    },
94    Standalone {
95        /// for the sake of simplicity still use grpc even in standalone mode
96        /// notice the client here should all be lazy, so that can wait after frontend is booted then make conn
97        database_client: HandlerMutable,
98        query: QueryOptions,
99    },
100}
101
102impl FrontendClient {
103    /// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
104    pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
105        let handler = Arc::new(std::sync::Mutex::new(None));
106        (
107            Self::Standalone {
108                database_client: handler.clone(),
109                query,
110            },
111            handler,
112        )
113    }
114
115    pub fn from_meta_client(
116        meta_client: Arc<MetaClient>,
117        auth: Option<FlowAuthHeader>,
118        query: QueryOptions,
119        batch_opts: BatchingModeOptions,
120    ) -> Result<Self, Error> {
121        common_telemetry::info!("Frontend client build with auth={:?}", auth);
122        Ok(Self::Distributed {
123            meta_client,
124            chnl_mgr: {
125                let cfg = ChannelConfig::new()
126                    .connect_timeout(batch_opts.grpc_conn_timeout)
127                    .timeout(batch_opts.query_timeout);
128                if let Some(tls) = &batch_opts.frontend_tls {
129                    let cfg = cfg.client_tls_config(tls.clone());
130                    ChannelManager::with_tls_config(cfg).context(InvalidClientConfigSnafu)?
131                } else {
132                    ChannelManager::with_config(cfg)
133                }
134            },
135            auth,
136            query,
137            batch_opts,
138        })
139    }
140
141    pub fn from_grpc_handler(
142        grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
143        query: QueryOptions,
144    ) -> Self {
145        Self::Standalone {
146            database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
147            query,
148        }
149    }
150}
151
152#[derive(Debug, Clone)]
153pub struct DatabaseWithPeer {
154    pub database: Database,
155    pub peer: Peer,
156}
157
158impl DatabaseWithPeer {
159    fn new(database: Database, peer: Peer) -> Self {
160        Self { database, peer }
161    }
162
163    /// Try sending a "SELECT 1" to the database
164    async fn try_select_one(&self) -> Result<(), Error> {
165        // notice here use `sql` for `SELECT 1` return 1 row
166        let _ = self
167            .database
168            .sql("SELECT 1")
169            .await
170            .with_context(|_| InvalidRequestSnafu {
171                context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
172            })?;
173        Ok(())
174    }
175}
176
177impl FrontendClient {
178    /// scan for available frontend from metadata
179    pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
180        let Self::Distributed { meta_client, .. } = self else {
181            return Ok(vec![]);
182        };
183        let cluster_client = meta_client
184            .cluster_client()
185            .map_err(BoxedError::new)
186            .context(ExternalSnafu)?;
187
188        let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
189        let req = RangeRequest::new().with_prefix(prefix);
190        let resp = cluster_client
191            .range(req)
192            .await
193            .map_err(BoxedError::new)
194            .context(ExternalSnafu)?;
195        let mut res = Vec::with_capacity(resp.kvs.len());
196        for kv in resp.kvs {
197            let key = NodeInfoKey::try_from(kv.key)
198                .map_err(BoxedError::new)
199                .context(ExternalSnafu)?;
200
201            let val = NodeInfo::try_from(kv.value)
202                .map_err(BoxedError::new)
203                .context(ExternalSnafu)?;
204            res.push((key, val));
205        }
206        Ok(res)
207    }
208
209    /// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
210    /// and is able to process query
211    async fn get_random_active_frontend(
212        &self,
213        catalog: &str,
214        schema: &str,
215    ) -> Result<DatabaseWithPeer, Error> {
216        let Self::Distributed {
217            meta_client: _,
218            chnl_mgr,
219            auth,
220            query: _,
221            batch_opts,
222        } = self
223        else {
224            return UnexpectedSnafu {
225                reason: "Expect distributed mode",
226            }
227            .fail();
228        };
229
230        let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
231        interval.tick().await;
232        for retry in 0..batch_opts.experimental_grpc_max_retries {
233            let mut frontends = self.scan_for_frontend().await?;
234            let now_in_ms = SystemTime::now()
235                .duration_since(SystemTime::UNIX_EPOCH)
236                .unwrap()
237                .as_millis() as i64;
238            // shuffle the frontends to avoid always pick the same one
239            frontends.shuffle(&mut rng());
240
241            // found node with maximum last_activity_ts
242            for (_, node_info) in frontends
243                .iter()
244                // filter out frontend that have been down for more than 1 min
245                .filter(|(_, node_info)| {
246                    node_info.last_activity_ts
247                        + batch_opts
248                            .experimental_frontend_activity_timeout
249                            .as_millis() as i64
250                        > now_in_ms
251                })
252            {
253                let addr = &node_info.peer.addr;
254                let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
255                let database = {
256                    let mut db = Database::new(catalog, schema, client);
257                    if let Some(auth) = auth {
258                        db.set_auth(auth.auth().clone());
259                    }
260                    db
261                };
262                let db = DatabaseWithPeer::new(database, node_info.peer.clone());
263                match db.try_select_one().await {
264                    Ok(_) => return Ok(db),
265                    Err(e) => {
266                        warn!(
267                            "Failed to connect to frontend {} on retry={}: \n{e:?}",
268                            addr, retry
269                        );
270                    }
271                }
272            }
273            // no available frontend
274            // sleep and retry
275            interval.tick().await;
276        }
277
278        NoAvailableFrontendSnafu {
279            timeout: batch_opts.grpc_conn_timeout,
280            context: "No available frontend found that is able to process query",
281        }
282        .fail()
283    }
284
285    pub async fn create(
286        &self,
287        create: CreateTableExpr,
288        catalog: &str,
289        schema: &str,
290    ) -> Result<u32, Error> {
291        self.handle(
292            Request::Ddl(api::v1::DdlRequest {
293                expr: Some(api::v1::ddl_request::Expr::CreateTable(create)),
294            }),
295            catalog,
296            schema,
297            &mut None,
298        )
299        .await
300    }
301
302    /// Execute a SQL statement on the frontend.
303    pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
304        match self {
305            FrontendClient::Distributed { .. } => {
306                let db = self.get_random_active_frontend(catalog, schema).await?;
307                db.database
308                    .sql(sql)
309                    .await
310                    .map_err(BoxedError::new)
311                    .context(ExternalSnafu)
312            }
313            FrontendClient::Standalone {
314                database_client, ..
315            } => {
316                let ctx = QueryContextBuilder::default()
317                    .current_catalog(catalog.to_string())
318                    .current_schema(schema.to_string())
319                    .build();
320                let ctx = Arc::new(ctx);
321                {
322                    let database_client = {
323                        database_client
324                            .lock()
325                            .map_err(|e| {
326                                UnexpectedSnafu {
327                                    reason: format!("Failed to lock database client: {e}"),
328                                }
329                                .build()
330                            })?
331                            .as_ref()
332                            .context(UnexpectedSnafu {
333                                reason: "Standalone's frontend instance is not set",
334                            })?
335                            .upgrade()
336                            .context(UnexpectedSnafu {
337                                reason: "Failed to upgrade database client",
338                            })?
339                    };
340                    let req = Request::Query(QueryRequest {
341                        query: Some(Query::Sql(sql.to_string())),
342                    });
343                    database_client
344                        .do_query(req, ctx)
345                        .await
346                        .map_err(BoxedError::new)
347                        .context(ExternalSnafu)
348                }
349            }
350        }
351    }
352
353    /// Handle a request to frontend
354    pub(crate) async fn handle(
355        &self,
356        req: api::v1::greptime_request::Request,
357        catalog: &str,
358        schema: &str,
359        peer_desc: &mut Option<PeerDesc>,
360    ) -> Result<u32, Error> {
361        match self {
362            FrontendClient::Distributed {
363                query, batch_opts, ..
364            } => {
365                let db = self.get_random_active_frontend(catalog, schema).await?;
366
367                *peer_desc = Some(PeerDesc::Dist {
368                    peer: db.peer.clone(),
369                });
370
371                db.database
372                    .handle_with_retry(
373                        req.clone(),
374                        batch_opts.experimental_grpc_max_retries,
375                        &[
376                            (QUERY_PARALLELISM_HINT, &query.parallelism.to_string()),
377                            (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
378                        ],
379                    )
380                    .await
381                    .with_context(|_| InvalidRequestSnafu {
382                        context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
383                    })
384            }
385            FrontendClient::Standalone {
386                database_client,
387                query,
388            } => {
389                let ctx = QueryContextBuilder::default()
390                    .current_catalog(catalog.to_string())
391                    .current_schema(schema.to_string())
392                    .extensions(HashMap::from([(
393                        QUERY_PARALLELISM_HINT.to_string(),
394                        query.parallelism.to_string(),
395                    )]))
396                    .build();
397                let ctx = Arc::new(ctx);
398                {
399                    let database_client = {
400                        database_client
401                            .lock()
402                            .map_err(|e| {
403                                UnexpectedSnafu {
404                                    reason: format!("Failed to lock database client: {e}"),
405                                }
406                                .build()
407                            })?
408                            .as_ref()
409                            .context(UnexpectedSnafu {
410                                reason: "Standalone's frontend instance is not set",
411                            })?
412                            .upgrade()
413                            .context(UnexpectedSnafu {
414                                reason: "Failed to upgrade database client",
415                            })?
416                    };
417                    let resp: common_query::Output = database_client
418                        .do_query(req, ctx)
419                        .await
420                        .map_err(BoxedError::new)
421                        .context(ExternalSnafu)?;
422                    match resp.data {
423                        common_query::OutputData::AffectedRows(rows) => {
424                            Ok(rows.try_into().map_err(|_| {
425                                UnexpectedSnafu {
426                                    reason: format!("Failed to convert rows to u32: {}", rows),
427                                }
428                                .build()
429                            })?)
430                        }
431                        _ => UnexpectedSnafu {
432                            reason: "Unexpected output data",
433                        }
434                        .fail(),
435                    }
436                }
437            }
438        }
439    }
440}
441
442/// Describe a peer of frontend
443#[derive(Debug, Default)]
444pub(crate) enum PeerDesc {
445    /// Distributed mode's frontend peer address
446    Dist {
447        /// frontend peer address
448        peer: Peer,
449    },
450    /// Standalone mode
451    #[default]
452    Standalone,
453}
454
455impl std::fmt::Display for PeerDesc {
456    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
457        match self {
458            PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
459            PeerDesc::Standalone => write!(f, "standalone"),
460        }
461    }
462}