flow/batching_mode/
frontend_client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Frontend client used to run a flow as a batching task, i.e. a time-window-aware normal query that is triggered on every tick set by the user.
16
17use std::sync::{Arc, Weak};
18use std::time::SystemTime;
19
20use api::v1::greptime_request::Request;
21use api::v1::CreateTableExpr;
22use client::{Client, Database};
23use common_error::ext::{BoxedError, ErrorExt};
24use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
25use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
26use common_meta::peer::Peer;
27use common_meta::rpc::store::RangeRequest;
28use common_query::Output;
29use common_telemetry::warn;
30use meta_client::client::MetaClient;
31use rand::rng;
32use rand::seq::SliceRandom;
33use servers::query_handler::grpc::GrpcQueryHandler;
34use session::context::{QueryContextBuilder, QueryContextRef};
35use snafu::{OptionExt, ResultExt};
36
37use crate::batching_mode::{
38    DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
39    GRPC_MAX_RETRIES,
40};
41use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
42use crate::{Error, FlowAuthHeader};
43
44/// Just like [`GrpcQueryHandler`] but use BoxedError
45///
46/// basically just a specialized `GrpcQueryHandler<Error=BoxedError>`
47///
48/// this is only useful for flownode to
49/// invoke frontend Instance in standalone mode
50#[async_trait::async_trait]
51pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
52    async fn do_query(
53        &self,
54        query: Request,
55        ctx: QueryContextRef,
56    ) -> std::result::Result<Output, BoxedError>;
57}
58
59/// auto impl
60#[async_trait::async_trait]
61impl<
62        E: ErrorExt + Send + Sync + 'static,
63        T: GrpcQueryHandler<Error = E> + Send + Sync + 'static,
64    > GrpcQueryHandlerWithBoxedError for T
65{
66    async fn do_query(
67        &self,
68        query: Request,
69        ctx: QueryContextRef,
70    ) -> std::result::Result<Output, BoxedError> {
71        self.do_query(query, ctx).await.map_err(BoxedError::new)
72    }
73}
74
75type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
76
77/// A simple frontend client able to execute sql using grpc protocol
78///
79/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
80#[derive(Debug, Clone)]
81pub enum FrontendClient {
82    Distributed {
83        meta_client: Arc<MetaClient>,
84        chnl_mgr: ChannelManager,
85        auth: Option<FlowAuthHeader>,
86    },
87    Standalone {
88        /// for the sake of simplicity still use grpc even in standalone mode
89        /// notice the client here should all be lazy, so that can wait after frontend is booted then make conn
90        database_client: HandlerMutable,
91    },
92}
93
94impl FrontendClient {
95    /// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
96    pub fn from_empty_grpc_handler() -> (Self, HandlerMutable) {
97        let handler = Arc::new(std::sync::Mutex::new(None));
98        (
99            Self::Standalone {
100                database_client: handler.clone(),
101            },
102            handler,
103        )
104    }
105
106    pub fn from_meta_client(meta_client: Arc<MetaClient>, auth: Option<FlowAuthHeader>) -> Self {
107        common_telemetry::info!("Frontend client build with auth={:?}", auth);
108        Self::Distributed {
109            meta_client,
110            chnl_mgr: {
111                let cfg = ChannelConfig::new()
112                    .connect_timeout(GRPC_CONN_TIMEOUT)
113                    .timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
114                ChannelManager::with_config(cfg)
115            },
116            auth,
117        }
118    }
119
120    pub fn from_grpc_handler(grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) -> Self {
121        Self::Standalone {
122            database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
123        }
124    }
125}
126
127#[derive(Debug, Clone)]
128pub struct DatabaseWithPeer {
129    pub database: Database,
130    pub peer: Peer,
131}
132
133impl DatabaseWithPeer {
134    fn new(database: Database, peer: Peer) -> Self {
135        Self { database, peer }
136    }
137
138    /// Try sending a "SELECT 1" to the database
139    async fn try_select_one(&self) -> Result<(), Error> {
140        // notice here use `sql` for `SELECT 1` return 1 row
141        let _ = self
142            .database
143            .sql("SELECT 1")
144            .await
145            .with_context(|_| InvalidRequestSnafu {
146                context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
147            })?;
148        Ok(())
149    }
150}
151
152impl FrontendClient {
153    /// scan for available frontend from metadata
154    pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
155        let Self::Distributed { meta_client, .. } = self else {
156            return Ok(vec![]);
157        };
158        let cluster_client = meta_client
159            .cluster_client()
160            .map_err(BoxedError::new)
161            .context(ExternalSnafu)?;
162
163        let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
164        let req = RangeRequest::new().with_prefix(prefix);
165        let resp = cluster_client
166            .range(req)
167            .await
168            .map_err(BoxedError::new)
169            .context(ExternalSnafu)?;
170        let mut res = Vec::with_capacity(resp.kvs.len());
171        for kv in resp.kvs {
172            let key = NodeInfoKey::try_from(kv.key)
173                .map_err(BoxedError::new)
174                .context(ExternalSnafu)?;
175
176            let val = NodeInfo::try_from(kv.value)
177                .map_err(BoxedError::new)
178                .context(ExternalSnafu)?;
179            res.push((key, val));
180        }
181        Ok(res)
182    }
183
184    /// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
185    /// and is able to process query
186    async fn get_random_active_frontend(
187        &self,
188        catalog: &str,
189        schema: &str,
190    ) -> Result<DatabaseWithPeer, Error> {
191        let Self::Distributed {
192            meta_client: _,
193            chnl_mgr,
194            auth,
195        } = self
196        else {
197            return UnexpectedSnafu {
198                reason: "Expect distributed mode",
199            }
200            .fail();
201        };
202
203        let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
204        interval.tick().await;
205        for retry in 0..GRPC_MAX_RETRIES {
206            let mut frontends = self.scan_for_frontend().await?;
207            let now_in_ms = SystemTime::now()
208                .duration_since(SystemTime::UNIX_EPOCH)
209                .unwrap()
210                .as_millis() as i64;
211            // shuffle the frontends to avoid always pick the same one
212            frontends.shuffle(&mut rng());
213
214            // found node with maximum last_activity_ts
215            for (_, node_info) in frontends
216                .iter()
217                // filter out frontend that have been down for more than 1 min
218                .filter(|(_, node_info)| {
219                    node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
220                        > now_in_ms
221                })
222            {
223                let addr = &node_info.peer.addr;
224                let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
225                let database = {
226                    let mut db = Database::new(catalog, schema, client);
227                    if let Some(auth) = auth {
228                        db.set_auth(auth.auth().clone());
229                    }
230                    db
231                };
232                let db = DatabaseWithPeer::new(database, node_info.peer.clone());
233                match db.try_select_one().await {
234                    Ok(_) => return Ok(db),
235                    Err(e) => {
236                        warn!(
237                            "Failed to connect to frontend {} on retry={}: \n{e:?}",
238                            addr, retry
239                        );
240                    }
241                }
242            }
243            // no available frontend
244            // sleep and retry
245            interval.tick().await;
246        }
247
248        NoAvailableFrontendSnafu {
249            timeout: GRPC_CONN_TIMEOUT,
250            context: "No available frontend found that is able to process query",
251        }
252        .fail()
253    }
254
255    pub async fn create(
256        &self,
257        create: CreateTableExpr,
258        catalog: &str,
259        schema: &str,
260    ) -> Result<u32, Error> {
261        self.handle(
262            Request::Ddl(api::v1::DdlRequest {
263                expr: Some(api::v1::ddl_request::Expr::CreateTable(create)),
264            }),
265            catalog,
266            schema,
267            &mut None,
268        )
269        .await
270    }
271
272    /// Handle a request to frontend
273    pub(crate) async fn handle(
274        &self,
275        req: api::v1::greptime_request::Request,
276        catalog: &str,
277        schema: &str,
278        peer_desc: &mut Option<PeerDesc>,
279    ) -> Result<u32, Error> {
280        match self {
281            FrontendClient::Distributed { .. } => {
282                let db = self.get_random_active_frontend(catalog, schema).await?;
283
284                *peer_desc = Some(PeerDesc::Dist {
285                    peer: db.peer.clone(),
286                });
287
288                db.database
289                    .handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
290                    .await
291                    .with_context(|_| InvalidRequestSnafu {
292                        context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
293                    })
294            }
295            FrontendClient::Standalone { database_client } => {
296                let ctx = QueryContextBuilder::default()
297                    .current_catalog(catalog.to_string())
298                    .current_schema(schema.to_string())
299                    .build();
300                let ctx = Arc::new(ctx);
301                {
302                    let database_client = {
303                        database_client
304                            .lock()
305                            .map_err(|e| {
306                                UnexpectedSnafu {
307                                    reason: format!("Failed to lock database client: {e}"),
308                                }
309                                .build()
310                            })?
311                            .as_ref()
312                            .context(UnexpectedSnafu {
313                                reason: "Standalone's frontend instance is not set",
314                            })?
315                            .upgrade()
316                            .context(UnexpectedSnafu {
317                                reason: "Failed to upgrade database client",
318                            })?
319                    };
320                    let resp: common_query::Output = database_client
321                        .do_query(req.clone(), ctx)
322                        .await
323                        .map_err(BoxedError::new)
324                        .context(ExternalSnafu)?;
325                    match resp.data {
326                        common_query::OutputData::AffectedRows(rows) => {
327                            Ok(rows.try_into().map_err(|_| {
328                                UnexpectedSnafu {
329                                    reason: format!("Failed to convert rows to u32: {}", rows),
330                                }
331                                .build()
332                            })?)
333                        }
334                        _ => UnexpectedSnafu {
335                            reason: "Unexpected output data",
336                        }
337                        .fail(),
338                    }
339                }
340            }
341        }
342    }
343}
344
345/// Describe a peer of frontend
346#[derive(Debug, Default)]
347pub(crate) enum PeerDesc {
348    /// Distributed mode's frontend peer address
349    Dist {
350        /// frontend peer address
351        peer: Peer,
352    },
353    /// Standalone mode
354    #[default]
355    Standalone,
356}
357
358impl std::fmt::Display for PeerDesc {
359    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
360        match self {
361            PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
362            PeerDesc::Standalone => write!(f, "standalone"),
363        }
364    }
365}