flow/batching_mode/
frontend_client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Frontend client for running a flow as a batching task, i.e. a time-window-aware normal
//! query that is triggered on every tick configured by the user.
16
17use std::collections::HashMap;
18use std::sync::{Arc, Weak};
19use std::time::SystemTime;
20
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{CreateTableExpr, QueryRequest};
24use client::{Client, Database};
25use common_error::ext::{BoxedError, ErrorExt};
26use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
27use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
28use common_meta::peer::Peer;
29use common_meta::rpc::store::RangeRequest;
30use common_query::Output;
31use common_telemetry::warn;
32use meta_client::client::MetaClient;
33use query::datafusion::QUERY_PARALLELISM_HINT;
34use query::options::QueryOptions;
35use rand::rng;
36use rand::seq::SliceRandom;
37use servers::query_handler::grpc::GrpcQueryHandler;
38use session::context::{QueryContextBuilder, QueryContextRef};
39use snafu::{OptionExt, ResultExt};
40
41use crate::batching_mode::BatchingModeOptions;
42use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
43use crate::{Error, FlowAuthHeader};
44
45/// Just like [`GrpcQueryHandler`] but use BoxedError
46///
47/// basically just a specialized `GrpcQueryHandler<Error=BoxedError>`
48///
49/// this is only useful for flownode to
50/// invoke frontend Instance in standalone mode
51#[async_trait::async_trait]
52pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
53    async fn do_query(
54        &self,
55        query: Request,
56        ctx: QueryContextRef,
57    ) -> std::result::Result<Output, BoxedError>;
58}
59
60/// auto impl
61#[async_trait::async_trait]
62impl<
63        E: ErrorExt + Send + Sync + 'static,
64        T: GrpcQueryHandler<Error = E> + Send + Sync + 'static,
65    > GrpcQueryHandlerWithBoxedError for T
66{
67    async fn do_query(
68        &self,
69        query: Request,
70        ctx: QueryContextRef,
71    ) -> std::result::Result<Output, BoxedError> {
72        self.do_query(query, ctx).await.map_err(BoxedError::new)
73    }
74}
75
/// Shared slot for the standalone-mode query handler.
///
/// The slot starts as `None` when built via `FrontendClient::from_empty_grpc_handler` and can be
/// filled later through the returned handle. It stores a `Weak` reference, so the slot itself
/// does not keep the handler alive.
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
77
/// A simple frontend client able to execute SQL and gRPC requests on a frontend.
///
/// This is for computation-heavy queries that need to offload computation to the frontend,
/// lifting the load from the flownode.
#[derive(Debug, Clone)]
pub enum FrontendClient {
    /// Distributed mode: frontends are discovered through the meta client and reached over gRPC.
    Distributed {
        /// Used to scan cluster metadata for registered frontend nodes.
        meta_client: Arc<MetaClient>,
        /// Channel manager cloned into every per-frontend `Client` that gets created.
        chnl_mgr: ChannelManager,
        /// Optional auth applied to every `Database` built for a frontend.
        auth: Option<FlowAuthHeader>,
        /// Query options; `parallelism` is forwarded as `QUERY_PARALLELISM_HINT`.
        query: QueryOptions,
        /// Batching options: connect/query timeouts, retry count and frontend activity timeout.
        batch_opts: BatchingModeOptions,
    },
    /// Standalone mode: requests are dispatched to an in-process query handler.
    Standalone {
        /// Shared slot holding a weak reference to the in-process query handler.
        /// The slot may still be empty (e.g. before the frontend has booted and installed its
        /// handler); requests made in that state fail with an `Unexpected` error.
        database_client: HandlerMutable,
        /// Query options; `parallelism` is forwarded as a query-context extension by `handle`.
        query: QueryOptions,
    },
}
97
98impl FrontendClient {
99    /// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
100    pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
101        let handler = Arc::new(std::sync::Mutex::new(None));
102        (
103            Self::Standalone {
104                database_client: handler.clone(),
105                query,
106            },
107            handler,
108        )
109    }
110
111    pub fn from_meta_client(
112        meta_client: Arc<MetaClient>,
113        auth: Option<FlowAuthHeader>,
114        query: QueryOptions,
115        batch_opts: BatchingModeOptions,
116    ) -> Self {
117        common_telemetry::info!("Frontend client build with auth={:?}", auth);
118        Self::Distributed {
119            meta_client,
120            chnl_mgr: {
121                let cfg = ChannelConfig::new()
122                    .connect_timeout(batch_opts.grpc_conn_timeout)
123                    .timeout(batch_opts.query_timeout);
124                ChannelManager::with_config(cfg)
125            },
126            auth,
127            query,
128            batch_opts,
129        }
130    }
131
132    pub fn from_grpc_handler(
133        grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
134        query: QueryOptions,
135    ) -> Self {
136        Self::Standalone {
137            database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
138            query,
139        }
140    }
141}
142
/// A database client bound to one specific frontend peer.
#[derive(Debug, Clone)]
pub struct DatabaseWithPeer {
    /// Client targeting the requested catalog/schema, with auth applied when configured.
    pub database: Database,
    /// The frontend peer this client talks to.
    pub peer: Peer,
}
148
149impl DatabaseWithPeer {
150    fn new(database: Database, peer: Peer) -> Self {
151        Self { database, peer }
152    }
153
154    /// Try sending a "SELECT 1" to the database
155    async fn try_select_one(&self) -> Result<(), Error> {
156        // notice here use `sql` for `SELECT 1` return 1 row
157        let _ = self
158            .database
159            .sql("SELECT 1")
160            .await
161            .with_context(|_| InvalidRequestSnafu {
162                context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
163            })?;
164        Ok(())
165    }
166}
167
168impl FrontendClient {
169    /// scan for available frontend from metadata
170    pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
171        let Self::Distributed { meta_client, .. } = self else {
172            return Ok(vec![]);
173        };
174        let cluster_client = meta_client
175            .cluster_client()
176            .map_err(BoxedError::new)
177            .context(ExternalSnafu)?;
178
179        let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
180        let req = RangeRequest::new().with_prefix(prefix);
181        let resp = cluster_client
182            .range(req)
183            .await
184            .map_err(BoxedError::new)
185            .context(ExternalSnafu)?;
186        let mut res = Vec::with_capacity(resp.kvs.len());
187        for kv in resp.kvs {
188            let key = NodeInfoKey::try_from(kv.key)
189                .map_err(BoxedError::new)
190                .context(ExternalSnafu)?;
191
192            let val = NodeInfo::try_from(kv.value)
193                .map_err(BoxedError::new)
194                .context(ExternalSnafu)?;
195            res.push((key, val));
196        }
197        Ok(res)
198    }
199
200    /// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
201    /// and is able to process query
202    async fn get_random_active_frontend(
203        &self,
204        catalog: &str,
205        schema: &str,
206    ) -> Result<DatabaseWithPeer, Error> {
207        let Self::Distributed {
208            meta_client: _,
209            chnl_mgr,
210            auth,
211            query: _,
212            batch_opts,
213        } = self
214        else {
215            return UnexpectedSnafu {
216                reason: "Expect distributed mode",
217            }
218            .fail();
219        };
220
221        let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
222        interval.tick().await;
223        for retry in 0..batch_opts.experimental_grpc_max_retries {
224            let mut frontends = self.scan_for_frontend().await?;
225            let now_in_ms = SystemTime::now()
226                .duration_since(SystemTime::UNIX_EPOCH)
227                .unwrap()
228                .as_millis() as i64;
229            // shuffle the frontends to avoid always pick the same one
230            frontends.shuffle(&mut rng());
231
232            // found node with maximum last_activity_ts
233            for (_, node_info) in frontends
234                .iter()
235                // filter out frontend that have been down for more than 1 min
236                .filter(|(_, node_info)| {
237                    node_info.last_activity_ts
238                        + batch_opts
239                            .experimental_frontend_activity_timeout
240                            .as_millis() as i64
241                        > now_in_ms
242                })
243            {
244                let addr = &node_info.peer.addr;
245                let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
246                let database = {
247                    let mut db = Database::new(catalog, schema, client);
248                    if let Some(auth) = auth {
249                        db.set_auth(auth.auth().clone());
250                    }
251                    db
252                };
253                let db = DatabaseWithPeer::new(database, node_info.peer.clone());
254                match db.try_select_one().await {
255                    Ok(_) => return Ok(db),
256                    Err(e) => {
257                        warn!(
258                            "Failed to connect to frontend {} on retry={}: \n{e:?}",
259                            addr, retry
260                        );
261                    }
262                }
263            }
264            // no available frontend
265            // sleep and retry
266            interval.tick().await;
267        }
268
269        NoAvailableFrontendSnafu {
270            timeout: batch_opts.grpc_conn_timeout,
271            context: "No available frontend found that is able to process query",
272        }
273        .fail()
274    }
275
276    pub async fn create(
277        &self,
278        create: CreateTableExpr,
279        catalog: &str,
280        schema: &str,
281    ) -> Result<u32, Error> {
282        self.handle(
283            Request::Ddl(api::v1::DdlRequest {
284                expr: Some(api::v1::ddl_request::Expr::CreateTable(create)),
285            }),
286            catalog,
287            schema,
288            &mut None,
289        )
290        .await
291    }
292
293    /// Execute a SQL statement on the frontend.
294    pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
295        match self {
296            FrontendClient::Distributed { .. } => {
297                let db = self.get_random_active_frontend(catalog, schema).await?;
298                db.database
299                    .sql(sql)
300                    .await
301                    .map_err(BoxedError::new)
302                    .context(ExternalSnafu)
303            }
304            FrontendClient::Standalone {
305                database_client, ..
306            } => {
307                let ctx = QueryContextBuilder::default()
308                    .current_catalog(catalog.to_string())
309                    .current_schema(schema.to_string())
310                    .build();
311                let ctx = Arc::new(ctx);
312                {
313                    let database_client = {
314                        database_client
315                            .lock()
316                            .map_err(|e| {
317                                UnexpectedSnafu {
318                                    reason: format!("Failed to lock database client: {e}"),
319                                }
320                                .build()
321                            })?
322                            .as_ref()
323                            .context(UnexpectedSnafu {
324                                reason: "Standalone's frontend instance is not set",
325                            })?
326                            .upgrade()
327                            .context(UnexpectedSnafu {
328                                reason: "Failed to upgrade database client",
329                            })?
330                    };
331                    let req = Request::Query(QueryRequest {
332                        query: Some(Query::Sql(sql.to_string())),
333                    });
334                    database_client
335                        .do_query(req, ctx)
336                        .await
337                        .map_err(BoxedError::new)
338                        .context(ExternalSnafu)
339                }
340            }
341        }
342    }
343
344    /// Handle a request to frontend
345    pub(crate) async fn handle(
346        &self,
347        req: api::v1::greptime_request::Request,
348        catalog: &str,
349        schema: &str,
350        peer_desc: &mut Option<PeerDesc>,
351    ) -> Result<u32, Error> {
352        match self {
353            FrontendClient::Distributed {
354                query, batch_opts, ..
355            } => {
356                let db = self.get_random_active_frontend(catalog, schema).await?;
357
358                *peer_desc = Some(PeerDesc::Dist {
359                    peer: db.peer.clone(),
360                });
361
362                db.database
363                    .handle_with_retry(
364                        req.clone(),
365                        batch_opts.experimental_grpc_max_retries,
366                        &[(QUERY_PARALLELISM_HINT, &query.parallelism.to_string())],
367                    )
368                    .await
369                    .with_context(|_| InvalidRequestSnafu {
370                        context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
371                    })
372            }
373            FrontendClient::Standalone {
374                database_client,
375                query,
376            } => {
377                let ctx = QueryContextBuilder::default()
378                    .current_catalog(catalog.to_string())
379                    .current_schema(schema.to_string())
380                    .extensions(HashMap::from([(
381                        QUERY_PARALLELISM_HINT.to_string(),
382                        query.parallelism.to_string(),
383                    )]))
384                    .build();
385                let ctx = Arc::new(ctx);
386                {
387                    let database_client = {
388                        database_client
389                            .lock()
390                            .map_err(|e| {
391                                UnexpectedSnafu {
392                                    reason: format!("Failed to lock database client: {e}"),
393                                }
394                                .build()
395                            })?
396                            .as_ref()
397                            .context(UnexpectedSnafu {
398                                reason: "Standalone's frontend instance is not set",
399                            })?
400                            .upgrade()
401                            .context(UnexpectedSnafu {
402                                reason: "Failed to upgrade database client",
403                            })?
404                    };
405                    let resp: common_query::Output = database_client
406                        .do_query(req, ctx)
407                        .await
408                        .map_err(BoxedError::new)
409                        .context(ExternalSnafu)?;
410                    match resp.data {
411                        common_query::OutputData::AffectedRows(rows) => {
412                            Ok(rows.try_into().map_err(|_| {
413                                UnexpectedSnafu {
414                                    reason: format!("Failed to convert rows to u32: {}", rows),
415                                }
416                                .build()
417                            })?)
418                        }
419                        _ => UnexpectedSnafu {
420                            reason: "Unexpected output data",
421                        }
422                        .fail(),
423                    }
424                }
425            }
426        }
427    }
428}
429
/// Describes the frontend peer a request was sent to.
///
/// `FrontendClient::handle` sets this to `Dist` in distributed mode; the `Default` value is
/// `Standalone`.
#[derive(Debug, Default)]
pub(crate) enum PeerDesc {
    /// Distributed mode's frontend peer address
    Dist {
        /// frontend peer address
        peer: Peer,
    },
    /// Standalone mode (no remote peer involved)
    #[default]
    Standalone,
}
442
443impl std::fmt::Display for PeerDesc {
444    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
445        match self {
446            PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
447            PeerDesc::Standalone => write!(f, "standalone"),
448        }
449    }
450}