flow/batching_mode/
frontend_client.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Frontend client used to run a flow as a batching task: a time-window-aware query that is
//! re-executed on every tick configured by the user.
16
17use std::collections::HashMap;
18use std::sync::{Arc, Weak};
19use std::time::SystemTime;
20
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{CreateTableExpr, QueryRequest};
24use client::{Client, Database};
25use common_error::ext::{BoxedError, ErrorExt};
26use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
27use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
28use common_meta::peer::Peer;
29use common_meta::rpc::store::RangeRequest;
30use common_query::Output;
31use common_telemetry::warn;
32use meta_client::client::MetaClient;
33use query::datafusion::QUERY_PARALLELISM_HINT;
34use query::options::QueryOptions;
35use rand::rng;
36use rand::seq::SliceRandom;
37use servers::query_handler::grpc::GrpcQueryHandler;
38use session::context::{QueryContextBuilder, QueryContextRef};
39use snafu::{OptionExt, ResultExt};
40
41use crate::batching_mode::{
42    DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
43    GRPC_MAX_RETRIES,
44};
45use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
46use crate::{Error, FlowAuthHeader};
47
48/// Just like [`GrpcQueryHandler`] but use BoxedError
49///
50/// basically just a specialized `GrpcQueryHandler<Error=BoxedError>`
51///
52/// this is only useful for flownode to
53/// invoke frontend Instance in standalone mode
54#[async_trait::async_trait]
55pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
56    async fn do_query(
57        &self,
58        query: Request,
59        ctx: QueryContextRef,
60    ) -> std::result::Result<Output, BoxedError>;
61}
62
63/// auto impl
64#[async_trait::async_trait]
65impl<
66        E: ErrorExt + Send + Sync + 'static,
67        T: GrpcQueryHandler<Error = E> + Send + Sync + 'static,
68    > GrpcQueryHandlerWithBoxedError for T
69{
70    async fn do_query(
71        &self,
72        query: Request,
73        ctx: QueryContextRef,
74    ) -> std::result::Result<Output, BoxedError> {
75        self.do_query(query, ctx).await.map_err(BoxedError::new)
76    }
77}
78
79type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
80
81/// A simple frontend client able to execute sql using grpc protocol
82///
83/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
84#[derive(Debug, Clone)]
85pub enum FrontendClient {
86    Distributed {
87        meta_client: Arc<MetaClient>,
88        chnl_mgr: ChannelManager,
89        auth: Option<FlowAuthHeader>,
90        query: QueryOptions,
91    },
92    Standalone {
93        /// for the sake of simplicity still use grpc even in standalone mode
94        /// notice the client here should all be lazy, so that can wait after frontend is booted then make conn
95        database_client: HandlerMutable,
96        query: QueryOptions,
97    },
98}
99
100impl FrontendClient {
101    /// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
102    pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
103        let handler = Arc::new(std::sync::Mutex::new(None));
104        (
105            Self::Standalone {
106                database_client: handler.clone(),
107                query,
108            },
109            handler,
110        )
111    }
112
113    pub fn from_meta_client(
114        meta_client: Arc<MetaClient>,
115        auth: Option<FlowAuthHeader>,
116        query: QueryOptions,
117    ) -> Self {
118        common_telemetry::info!("Frontend client build with auth={:?}", auth);
119        Self::Distributed {
120            meta_client,
121            chnl_mgr: {
122                let cfg = ChannelConfig::new()
123                    .connect_timeout(GRPC_CONN_TIMEOUT)
124                    .timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
125                ChannelManager::with_config(cfg)
126            },
127            auth,
128            query,
129        }
130    }
131
132    pub fn from_grpc_handler(
133        grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
134        query: QueryOptions,
135    ) -> Self {
136        Self::Standalone {
137            database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
138            query,
139        }
140    }
141}
142
143#[derive(Debug, Clone)]
144pub struct DatabaseWithPeer {
145    pub database: Database,
146    pub peer: Peer,
147}
148
149impl DatabaseWithPeer {
150    fn new(database: Database, peer: Peer) -> Self {
151        Self { database, peer }
152    }
153
154    /// Try sending a "SELECT 1" to the database
155    async fn try_select_one(&self) -> Result<(), Error> {
156        // notice here use `sql` for `SELECT 1` return 1 row
157        let _ = self
158            .database
159            .sql("SELECT 1")
160            .await
161            .with_context(|_| InvalidRequestSnafu {
162                context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
163            })?;
164        Ok(())
165    }
166}
167
168impl FrontendClient {
169    /// scan for available frontend from metadata
170    pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
171        let Self::Distributed { meta_client, .. } = self else {
172            return Ok(vec![]);
173        };
174        let cluster_client = meta_client
175            .cluster_client()
176            .map_err(BoxedError::new)
177            .context(ExternalSnafu)?;
178
179        let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
180        let req = RangeRequest::new().with_prefix(prefix);
181        let resp = cluster_client
182            .range(req)
183            .await
184            .map_err(BoxedError::new)
185            .context(ExternalSnafu)?;
186        let mut res = Vec::with_capacity(resp.kvs.len());
187        for kv in resp.kvs {
188            let key = NodeInfoKey::try_from(kv.key)
189                .map_err(BoxedError::new)
190                .context(ExternalSnafu)?;
191
192            let val = NodeInfo::try_from(kv.value)
193                .map_err(BoxedError::new)
194                .context(ExternalSnafu)?;
195            res.push((key, val));
196        }
197        Ok(res)
198    }
199
200    /// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
201    /// and is able to process query
202    async fn get_random_active_frontend(
203        &self,
204        catalog: &str,
205        schema: &str,
206    ) -> Result<DatabaseWithPeer, Error> {
207        let Self::Distributed {
208            meta_client: _,
209            chnl_mgr,
210            auth,
211            query: _,
212        } = self
213        else {
214            return UnexpectedSnafu {
215                reason: "Expect distributed mode",
216            }
217            .fail();
218        };
219
220        let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
221        interval.tick().await;
222        for retry in 0..GRPC_MAX_RETRIES {
223            let mut frontends = self.scan_for_frontend().await?;
224            let now_in_ms = SystemTime::now()
225                .duration_since(SystemTime::UNIX_EPOCH)
226                .unwrap()
227                .as_millis() as i64;
228            // shuffle the frontends to avoid always pick the same one
229            frontends.shuffle(&mut rng());
230
231            // found node with maximum last_activity_ts
232            for (_, node_info) in frontends
233                .iter()
234                // filter out frontend that have been down for more than 1 min
235                .filter(|(_, node_info)| {
236                    node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
237                        > now_in_ms
238                })
239            {
240                let addr = &node_info.peer.addr;
241                let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
242                let database = {
243                    let mut db = Database::new(catalog, schema, client);
244                    if let Some(auth) = auth {
245                        db.set_auth(auth.auth().clone());
246                    }
247                    db
248                };
249                let db = DatabaseWithPeer::new(database, node_info.peer.clone());
250                match db.try_select_one().await {
251                    Ok(_) => return Ok(db),
252                    Err(e) => {
253                        warn!(
254                            "Failed to connect to frontend {} on retry={}: \n{e:?}",
255                            addr, retry
256                        );
257                    }
258                }
259            }
260            // no available frontend
261            // sleep and retry
262            interval.tick().await;
263        }
264
265        NoAvailableFrontendSnafu {
266            timeout: GRPC_CONN_TIMEOUT,
267            context: "No available frontend found that is able to process query",
268        }
269        .fail()
270    }
271
272    pub async fn create(
273        &self,
274        create: CreateTableExpr,
275        catalog: &str,
276        schema: &str,
277    ) -> Result<u32, Error> {
278        self.handle(
279            Request::Ddl(api::v1::DdlRequest {
280                expr: Some(api::v1::ddl_request::Expr::CreateTable(create)),
281            }),
282            catalog,
283            schema,
284            &mut None,
285        )
286        .await
287    }
288
289    /// Execute a SQL statement on the frontend.
290    pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
291        match self {
292            FrontendClient::Distributed { .. } => {
293                let db = self.get_random_active_frontend(catalog, schema).await?;
294                db.database
295                    .sql(sql)
296                    .await
297                    .map_err(BoxedError::new)
298                    .context(ExternalSnafu)
299            }
300            FrontendClient::Standalone {
301                database_client, ..
302            } => {
303                let ctx = QueryContextBuilder::default()
304                    .current_catalog(catalog.to_string())
305                    .current_schema(schema.to_string())
306                    .build();
307                let ctx = Arc::new(ctx);
308                {
309                    let database_client = {
310                        database_client
311                            .lock()
312                            .map_err(|e| {
313                                UnexpectedSnafu {
314                                    reason: format!("Failed to lock database client: {e}"),
315                                }
316                                .build()
317                            })?
318                            .as_ref()
319                            .context(UnexpectedSnafu {
320                                reason: "Standalone's frontend instance is not set",
321                            })?
322                            .upgrade()
323                            .context(UnexpectedSnafu {
324                                reason: "Failed to upgrade database client",
325                            })?
326                    };
327                    let req = Request::Query(QueryRequest {
328                        query: Some(Query::Sql(sql.to_string())),
329                    });
330                    database_client
331                        .do_query(req, ctx)
332                        .await
333                        .map_err(BoxedError::new)
334                        .context(ExternalSnafu)
335                }
336            }
337        }
338    }
339
340    /// Handle a request to frontend
341    pub(crate) async fn handle(
342        &self,
343        req: api::v1::greptime_request::Request,
344        catalog: &str,
345        schema: &str,
346        peer_desc: &mut Option<PeerDesc>,
347    ) -> Result<u32, Error> {
348        match self {
349            FrontendClient::Distributed { query, .. } => {
350                let db = self.get_random_active_frontend(catalog, schema).await?;
351
352                *peer_desc = Some(PeerDesc::Dist {
353                    peer: db.peer.clone(),
354                });
355
356                db.database
357                    .handle_with_retry(
358                        req.clone(),
359                        GRPC_MAX_RETRIES,
360                        &[(QUERY_PARALLELISM_HINT, &query.parallelism.to_string())],
361                    )
362                    .await
363                    .with_context(|_| InvalidRequestSnafu {
364                        context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
365                    })
366            }
367            FrontendClient::Standalone {
368                database_client,
369                query,
370            } => {
371                let ctx = QueryContextBuilder::default()
372                    .current_catalog(catalog.to_string())
373                    .current_schema(schema.to_string())
374                    .extensions(HashMap::from([(
375                        QUERY_PARALLELISM_HINT.to_string(),
376                        query.parallelism.to_string(),
377                    )]))
378                    .build();
379                let ctx = Arc::new(ctx);
380                {
381                    let database_client = {
382                        database_client
383                            .lock()
384                            .map_err(|e| {
385                                UnexpectedSnafu {
386                                    reason: format!("Failed to lock database client: {e}"),
387                                }
388                                .build()
389                            })?
390                            .as_ref()
391                            .context(UnexpectedSnafu {
392                                reason: "Standalone's frontend instance is not set",
393                            })?
394                            .upgrade()
395                            .context(UnexpectedSnafu {
396                                reason: "Failed to upgrade database client",
397                            })?
398                    };
399                    let resp: common_query::Output = database_client
400                        .do_query(req, ctx)
401                        .await
402                        .map_err(BoxedError::new)
403                        .context(ExternalSnafu)?;
404                    match resp.data {
405                        common_query::OutputData::AffectedRows(rows) => {
406                            Ok(rows.try_into().map_err(|_| {
407                                UnexpectedSnafu {
408                                    reason: format!("Failed to convert rows to u32: {}", rows),
409                                }
410                                .build()
411                            })?)
412                        }
413                        _ => UnexpectedSnafu {
414                            reason: "Unexpected output data",
415                        }
416                        .fail(),
417                    }
418                }
419            }
420        }
421    }
422}
423
424/// Describe a peer of frontend
425#[derive(Debug, Default)]
426pub(crate) enum PeerDesc {
427    /// Distributed mode's frontend peer address
428    Dist {
429        /// frontend peer address
430        peer: Peer,
431    },
432    /// Standalone mode
433    #[default]
434    Standalone,
435}
436
437impl std::fmt::Display for PeerDesc {
438    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
439        match self {
440            PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
441            PeerDesc::Standalone => write!(f, "standalone"),
442        }
443    }
444}