1use std::collections::HashMap;
18use std::sync::{Arc, Weak};
19use std::time::SystemTime;
20
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{CreateTableExpr, QueryRequest};
24use client::{Client, Database};
25use common_error::ext::{BoxedError, ErrorExt};
26use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
27use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
28use common_meta::peer::Peer;
29use common_meta::rpc::store::RangeRequest;
30use common_query::Output;
31use common_telemetry::warn;
32use meta_client::client::MetaClient;
33use query::datafusion::QUERY_PARALLELISM_HINT;
34use query::options::QueryOptions;
35use rand::rng;
36use rand::seq::SliceRandom;
37use servers::query_handler::grpc::GrpcQueryHandler;
38use session::context::{QueryContextBuilder, QueryContextRef};
39use session::hints::READ_PREFERENCE_HINT;
40use snafu::{OptionExt, ResultExt};
41
42use crate::batching_mode::BatchingModeOptions;
43use crate::error::{
44 CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu,
45 NoAvailableFrontendSnafu, UnexpectedSnafu,
46};
47use crate::{Error, FlowAuthHeader};
48
49#[async_trait::async_trait]
56pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
57 async fn do_query(
58 &self,
59 query: Request,
60 ctx: QueryContextRef,
61 ) -> std::result::Result<Output, BoxedError>;
62}
63
64#[async_trait::async_trait]
66impl<E: ErrorExt + Send + Sync + 'static, T: GrpcQueryHandler<Error = E> + Send + Sync + 'static>
67 GrpcQueryHandlerWithBoxedError for T
68{
69 async fn do_query(
70 &self,
71 query: Request,
72 ctx: QueryContextRef,
73 ) -> std::result::Result<Output, BoxedError> {
74 self.do_query(query, ctx).await.map_err(BoxedError::new)
75 }
76}
77
78type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
79
80#[derive(Debug, Clone)]
84pub enum FrontendClient {
85 Distributed {
86 meta_client: Arc<MetaClient>,
87 chnl_mgr: ChannelManager,
88 auth: Option<FlowAuthHeader>,
89 query: QueryOptions,
90 batch_opts: BatchingModeOptions,
91 },
92 Standalone {
93 database_client: HandlerMutable,
96 query: QueryOptions,
97 },
98}
99
100impl FrontendClient {
101 pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
103 let handler = Arc::new(std::sync::Mutex::new(None));
104 (
105 Self::Standalone {
106 database_client: handler.clone(),
107 query,
108 },
109 handler,
110 )
111 }
112
113 pub fn from_meta_client(
114 meta_client: Arc<MetaClient>,
115 auth: Option<FlowAuthHeader>,
116 query: QueryOptions,
117 batch_opts: BatchingModeOptions,
118 ) -> Result<Self, Error> {
119 common_telemetry::info!("Frontend client build with auth={:?}", auth);
120 Ok(Self::Distributed {
121 meta_client,
122 chnl_mgr: {
123 let cfg = ChannelConfig::new()
124 .connect_timeout(batch_opts.grpc_conn_timeout)
125 .timeout(batch_opts.query_timeout);
126 if let Some(tls) = &batch_opts.frontend_tls {
127 let cfg = cfg.client_tls_config(tls.clone());
128 ChannelManager::with_tls_config(cfg).context(InvalidClientConfigSnafu)?
129 } else {
130 ChannelManager::with_config(cfg)
131 }
132 },
133 auth,
134 query,
135 batch_opts,
136 })
137 }
138
139 pub fn from_grpc_handler(
140 grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
141 query: QueryOptions,
142 ) -> Self {
143 Self::Standalone {
144 database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
145 query,
146 }
147 }
148}
149
150#[derive(Debug, Clone)]
151pub struct DatabaseWithPeer {
152 pub database: Database,
153 pub peer: Peer,
154}
155
156impl DatabaseWithPeer {
157 fn new(database: Database, peer: Peer) -> Self {
158 Self { database, peer }
159 }
160
161 async fn try_select_one(&self) -> Result<(), Error> {
163 let _ = self
165 .database
166 .sql("SELECT 1")
167 .await
168 .with_context(|_| InvalidRequestSnafu {
169 context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
170 })?;
171 Ok(())
172 }
173}
174
175impl FrontendClient {
176 pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
178 let Self::Distributed { meta_client, .. } = self else {
179 return Ok(vec![]);
180 };
181 let cluster_client = meta_client
182 .cluster_client()
183 .map_err(BoxedError::new)
184 .context(ExternalSnafu)?;
185
186 let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
187 let req = RangeRequest::new().with_prefix(prefix);
188 let resp = cluster_client
189 .range(req)
190 .await
191 .map_err(BoxedError::new)
192 .context(ExternalSnafu)?;
193 let mut res = Vec::with_capacity(resp.kvs.len());
194 for kv in resp.kvs {
195 let key = NodeInfoKey::try_from(kv.key)
196 .map_err(BoxedError::new)
197 .context(ExternalSnafu)?;
198
199 let val = NodeInfo::try_from(kv.value)
200 .map_err(BoxedError::new)
201 .context(ExternalSnafu)?;
202 res.push((key, val));
203 }
204 Ok(res)
205 }
206
207 async fn get_random_active_frontend(
210 &self,
211 catalog: &str,
212 schema: &str,
213 ) -> Result<DatabaseWithPeer, Error> {
214 let Self::Distributed {
215 meta_client: _,
216 chnl_mgr,
217 auth,
218 query: _,
219 batch_opts,
220 } = self
221 else {
222 return UnexpectedSnafu {
223 reason: "Expect distributed mode",
224 }
225 .fail();
226 };
227
228 let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
229 interval.tick().await;
230 for retry in 0..batch_opts.experimental_grpc_max_retries {
231 let mut frontends = self.scan_for_frontend().await?;
232 let now_in_ms = SystemTime::now()
233 .duration_since(SystemTime::UNIX_EPOCH)
234 .unwrap()
235 .as_millis() as i64;
236 frontends.shuffle(&mut rng());
238
239 for (_, node_info) in frontends
241 .iter()
242 .filter(|(_, node_info)| {
244 node_info.last_activity_ts
245 + batch_opts
246 .experimental_frontend_activity_timeout
247 .as_millis() as i64
248 > now_in_ms
249 })
250 {
251 let addr = &node_info.peer.addr;
252 let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
253 let database = {
254 let mut db = Database::new(catalog, schema, client);
255 if let Some(auth) = auth {
256 db.set_auth(auth.auth().clone());
257 }
258 db
259 };
260 let db = DatabaseWithPeer::new(database, node_info.peer.clone());
261 match db.try_select_one().await {
262 Ok(_) => return Ok(db),
263 Err(e) => {
264 warn!(
265 "Failed to connect to frontend {} on retry={}: \n{e:?}",
266 addr, retry
267 );
268 }
269 }
270 }
271 interval.tick().await;
274 }
275
276 NoAvailableFrontendSnafu {
277 timeout: batch_opts.grpc_conn_timeout,
278 context: "No available frontend found that is able to process query",
279 }
280 .fail()
281 }
282
283 pub async fn create(
284 &self,
285 create: CreateTableExpr,
286 catalog: &str,
287 schema: &str,
288 ) -> Result<u32, Error> {
289 self.handle(
290 Request::Ddl(api::v1::DdlRequest {
291 expr: Some(api::v1::ddl_request::Expr::CreateTable(create.clone())),
292 }),
293 catalog,
294 schema,
295 &mut None,
296 )
297 .await
298 .map_err(BoxedError::new)
299 .with_context(|_| CreateSinkTableSnafu {
300 create: create.clone(),
301 })
302 }
303
304 pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
306 match self {
307 FrontendClient::Distributed { .. } => {
308 let db = self.get_random_active_frontend(catalog, schema).await?;
309 db.database
310 .sql(sql)
311 .await
312 .map_err(BoxedError::new)
313 .context(ExternalSnafu)
314 }
315 FrontendClient::Standalone {
316 database_client, ..
317 } => {
318 let ctx = QueryContextBuilder::default()
319 .current_catalog(catalog.to_string())
320 .current_schema(schema.to_string())
321 .build();
322 let ctx = Arc::new(ctx);
323 {
324 let database_client = {
325 database_client
326 .lock()
327 .map_err(|e| {
328 UnexpectedSnafu {
329 reason: format!("Failed to lock database client: {e}"),
330 }
331 .build()
332 })?
333 .as_ref()
334 .context(UnexpectedSnafu {
335 reason: "Standalone's frontend instance is not set",
336 })?
337 .upgrade()
338 .context(UnexpectedSnafu {
339 reason: "Failed to upgrade database client",
340 })?
341 };
342 let req = Request::Query(QueryRequest {
343 query: Some(Query::Sql(sql.to_string())),
344 });
345 database_client
346 .do_query(req, ctx)
347 .await
348 .map_err(BoxedError::new)
349 .context(ExternalSnafu)
350 }
351 }
352 }
353 }
354
355 pub(crate) async fn handle(
357 &self,
358 req: api::v1::greptime_request::Request,
359 catalog: &str,
360 schema: &str,
361 peer_desc: &mut Option<PeerDesc>,
362 ) -> Result<u32, Error> {
363 match self {
364 FrontendClient::Distributed {
365 query, batch_opts, ..
366 } => {
367 let db = self.get_random_active_frontend(catalog, schema).await?;
368
369 *peer_desc = Some(PeerDesc::Dist {
370 peer: db.peer.clone(),
371 });
372
373 db.database
374 .handle_with_retry(
375 req.clone(),
376 batch_opts.experimental_grpc_max_retries,
377 &[
378 (QUERY_PARALLELISM_HINT, &query.parallelism.to_string()),
379 (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
380 ],
381 )
382 .await
383 .with_context(|_| InvalidRequestSnafu {
384 context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
385 })
386 }
387 FrontendClient::Standalone {
388 database_client,
389 query,
390 } => {
391 let ctx = QueryContextBuilder::default()
392 .current_catalog(catalog.to_string())
393 .current_schema(schema.to_string())
394 .extensions(HashMap::from([(
395 QUERY_PARALLELISM_HINT.to_string(),
396 query.parallelism.to_string(),
397 )]))
398 .build();
399 let ctx = Arc::new(ctx);
400 {
401 let database_client = {
402 database_client
403 .lock()
404 .map_err(|e| {
405 UnexpectedSnafu {
406 reason: format!("Failed to lock database client: {e}"),
407 }
408 .build()
409 })?
410 .as_ref()
411 .context(UnexpectedSnafu {
412 reason: "Standalone's frontend instance is not set",
413 })?
414 .upgrade()
415 .context(UnexpectedSnafu {
416 reason: "Failed to upgrade database client",
417 })?
418 };
419 let resp: common_query::Output = database_client
420 .do_query(req, ctx)
421 .await
422 .map_err(BoxedError::new)
423 .context(ExternalSnafu)?;
424 match resp.data {
425 common_query::OutputData::AffectedRows(rows) => {
426 Ok(rows.try_into().map_err(|_| {
427 UnexpectedSnafu {
428 reason: format!("Failed to convert rows to u32: {}", rows),
429 }
430 .build()
431 })?)
432 }
433 _ => UnexpectedSnafu {
434 reason: "Unexpected output data",
435 }
436 .fail(),
437 }
438 }
439 }
440 }
441 }
442}
443
444#[derive(Debug, Default)]
446pub(crate) enum PeerDesc {
447 Dist {
449 peer: Peer,
451 },
452 #[default]
454 Standalone,
455}
456
457impl std::fmt::Display for PeerDesc {
458 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
459 match self {
460 PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
461 PeerDesc::Standalone => write!(f, "standalone"),
462 }
463 }
464}