1use std::collections::HashMap;
18use std::sync::{Arc, Weak};
19use std::time::SystemTime;
20
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{CreateTableExpr, QueryRequest};
24use client::{Client, Database};
25use common_error::ext::{BoxedError, ErrorExt};
26use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
27use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
28use common_meta::peer::Peer;
29use common_meta::rpc::store::RangeRequest;
30use common_query::Output;
31use common_telemetry::warn;
32use meta_client::client::MetaClient;
33use query::datafusion::QUERY_PARALLELISM_HINT;
34use query::options::QueryOptions;
35use rand::rng;
36use rand::seq::SliceRandom;
37use servers::query_handler::grpc::GrpcQueryHandler;
38use session::context::{QueryContextBuilder, QueryContextRef};
39use session::hints::READ_PREFERENCE_HINT;
40use snafu::{OptionExt, ResultExt};
41
42use crate::batching_mode::BatchingModeOptions;
43use crate::error::{
44 ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu,
45 UnexpectedSnafu,
46};
47use crate::{Error, FlowAuthHeader};
48
49#[async_trait::async_trait]
56pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
57 async fn do_query(
58 &self,
59 query: Request,
60 ctx: QueryContextRef,
61 ) -> std::result::Result<Output, BoxedError>;
62}
63
64#[async_trait::async_trait]
66impl<
67 E: ErrorExt + Send + Sync + 'static,
68 T: GrpcQueryHandler<Error = E> + Send + Sync + 'static,
69 > GrpcQueryHandlerWithBoxedError for T
70{
71 async fn do_query(
72 &self,
73 query: Request,
74 ctx: QueryContextRef,
75 ) -> std::result::Result<Output, BoxedError> {
76 self.do_query(query, ctx).await.map_err(BoxedError::new)
77 }
78}
79
80type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
81
/// Client the flow engine uses to send queries and DDL requests to a frontend.
#[derive(Debug, Clone)]
pub enum FrontendClient {
    /// Frontends are discovered through `meta_client` and reached over gRPC.
    Distributed {
        /// Used to list the frontend nodes registered in the cluster.
        meta_client: Arc<MetaClient>,
        /// gRPC channel manager shared by every connection to a frontend.
        chnl_mgr: ChannelManager,
        /// Optional auth set on every `Database` created for a frontend.
        auth: Option<FlowAuthHeader>,
        /// Query options; `parallelism` is forwarded to the frontend as a query hint.
        query: QueryOptions,
        /// Connection/query timeouts, retry count, frontend activity timeout, TLS and
        /// read-preference settings used when talking to frontends.
        batch_opts: BatchingModeOptions,
    },
    /// Requests go directly to an in-process frontend handler.
    Standalone {
        /// Slot holding a weak reference to the frontend's query handler; it may still be
        /// empty if the frontend instance has not been registered yet.
        database_client: HandlerMutable,
        /// Query options; `parallelism` is forwarded as a query-context extension.
        query: QueryOptions,
    },
}
101
102impl FrontendClient {
103 pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
105 let handler = Arc::new(std::sync::Mutex::new(None));
106 (
107 Self::Standalone {
108 database_client: handler.clone(),
109 query,
110 },
111 handler,
112 )
113 }
114
115 pub fn from_meta_client(
116 meta_client: Arc<MetaClient>,
117 auth: Option<FlowAuthHeader>,
118 query: QueryOptions,
119 batch_opts: BatchingModeOptions,
120 ) -> Result<Self, Error> {
121 common_telemetry::info!("Frontend client build with auth={:?}", auth);
122 Ok(Self::Distributed {
123 meta_client,
124 chnl_mgr: {
125 let cfg = ChannelConfig::new()
126 .connect_timeout(batch_opts.grpc_conn_timeout)
127 .timeout(batch_opts.query_timeout);
128 if let Some(tls) = &batch_opts.frontend_tls {
129 let cfg = cfg.client_tls_config(tls.clone());
130 ChannelManager::with_tls_config(cfg).context(InvalidClientConfigSnafu)?
131 } else {
132 ChannelManager::with_config(cfg)
133 }
134 },
135 auth,
136 query,
137 batch_opts,
138 })
139 }
140
141 pub fn from_grpc_handler(
142 grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
143 query: QueryOptions,
144 ) -> Self {
145 Self::Standalone {
146 database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
147 query,
148 }
149 }
150}
151
/// A frontend `Database` client paired with the peer it was built for.
#[derive(Debug, Clone)]
pub struct DatabaseWithPeer {
    /// Client bound to one frontend address and a catalog/schema.
    pub database: Database,
    /// The frontend node `database` talks to; used in error messages and `PeerDesc`.
    pub peer: Peer,
}
157
158impl DatabaseWithPeer {
159 fn new(database: Database, peer: Peer) -> Self {
160 Self { database, peer }
161 }
162
163 async fn try_select_one(&self) -> Result<(), Error> {
165 let _ = self
167 .database
168 .sql("SELECT 1")
169 .await
170 .with_context(|_| InvalidRequestSnafu {
171 context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
172 })?;
173 Ok(())
174 }
175}
176
177impl FrontendClient {
178 pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
180 let Self::Distributed { meta_client, .. } = self else {
181 return Ok(vec![]);
182 };
183 let cluster_client = meta_client
184 .cluster_client()
185 .map_err(BoxedError::new)
186 .context(ExternalSnafu)?;
187
188 let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
189 let req = RangeRequest::new().with_prefix(prefix);
190 let resp = cluster_client
191 .range(req)
192 .await
193 .map_err(BoxedError::new)
194 .context(ExternalSnafu)?;
195 let mut res = Vec::with_capacity(resp.kvs.len());
196 for kv in resp.kvs {
197 let key = NodeInfoKey::try_from(kv.key)
198 .map_err(BoxedError::new)
199 .context(ExternalSnafu)?;
200
201 let val = NodeInfo::try_from(kv.value)
202 .map_err(BoxedError::new)
203 .context(ExternalSnafu)?;
204 res.push((key, val));
205 }
206 Ok(res)
207 }
208
209 async fn get_random_active_frontend(
212 &self,
213 catalog: &str,
214 schema: &str,
215 ) -> Result<DatabaseWithPeer, Error> {
216 let Self::Distributed {
217 meta_client: _,
218 chnl_mgr,
219 auth,
220 query: _,
221 batch_opts,
222 } = self
223 else {
224 return UnexpectedSnafu {
225 reason: "Expect distributed mode",
226 }
227 .fail();
228 };
229
230 let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
231 interval.tick().await;
232 for retry in 0..batch_opts.experimental_grpc_max_retries {
233 let mut frontends = self.scan_for_frontend().await?;
234 let now_in_ms = SystemTime::now()
235 .duration_since(SystemTime::UNIX_EPOCH)
236 .unwrap()
237 .as_millis() as i64;
238 frontends.shuffle(&mut rng());
240
241 for (_, node_info) in frontends
243 .iter()
244 .filter(|(_, node_info)| {
246 node_info.last_activity_ts
247 + batch_opts
248 .experimental_frontend_activity_timeout
249 .as_millis() as i64
250 > now_in_ms
251 })
252 {
253 let addr = &node_info.peer.addr;
254 let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
255 let database = {
256 let mut db = Database::new(catalog, schema, client);
257 if let Some(auth) = auth {
258 db.set_auth(auth.auth().clone());
259 }
260 db
261 };
262 let db = DatabaseWithPeer::new(database, node_info.peer.clone());
263 match db.try_select_one().await {
264 Ok(_) => return Ok(db),
265 Err(e) => {
266 warn!(
267 "Failed to connect to frontend {} on retry={}: \n{e:?}",
268 addr, retry
269 );
270 }
271 }
272 }
273 interval.tick().await;
276 }
277
278 NoAvailableFrontendSnafu {
279 timeout: batch_opts.grpc_conn_timeout,
280 context: "No available frontend found that is able to process query",
281 }
282 .fail()
283 }
284
285 pub async fn create(
286 &self,
287 create: CreateTableExpr,
288 catalog: &str,
289 schema: &str,
290 ) -> Result<u32, Error> {
291 self.handle(
292 Request::Ddl(api::v1::DdlRequest {
293 expr: Some(api::v1::ddl_request::Expr::CreateTable(create)),
294 }),
295 catalog,
296 schema,
297 &mut None,
298 )
299 .await
300 }
301
302 pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
304 match self {
305 FrontendClient::Distributed { .. } => {
306 let db = self.get_random_active_frontend(catalog, schema).await?;
307 db.database
308 .sql(sql)
309 .await
310 .map_err(BoxedError::new)
311 .context(ExternalSnafu)
312 }
313 FrontendClient::Standalone {
314 database_client, ..
315 } => {
316 let ctx = QueryContextBuilder::default()
317 .current_catalog(catalog.to_string())
318 .current_schema(schema.to_string())
319 .build();
320 let ctx = Arc::new(ctx);
321 {
322 let database_client = {
323 database_client
324 .lock()
325 .map_err(|e| {
326 UnexpectedSnafu {
327 reason: format!("Failed to lock database client: {e}"),
328 }
329 .build()
330 })?
331 .as_ref()
332 .context(UnexpectedSnafu {
333 reason: "Standalone's frontend instance is not set",
334 })?
335 .upgrade()
336 .context(UnexpectedSnafu {
337 reason: "Failed to upgrade database client",
338 })?
339 };
340 let req = Request::Query(QueryRequest {
341 query: Some(Query::Sql(sql.to_string())),
342 });
343 database_client
344 .do_query(req, ctx)
345 .await
346 .map_err(BoxedError::new)
347 .context(ExternalSnafu)
348 }
349 }
350 }
351 }
352
353 pub(crate) async fn handle(
355 &self,
356 req: api::v1::greptime_request::Request,
357 catalog: &str,
358 schema: &str,
359 peer_desc: &mut Option<PeerDesc>,
360 ) -> Result<u32, Error> {
361 match self {
362 FrontendClient::Distributed {
363 query, batch_opts, ..
364 } => {
365 let db = self.get_random_active_frontend(catalog, schema).await?;
366
367 *peer_desc = Some(PeerDesc::Dist {
368 peer: db.peer.clone(),
369 });
370
371 db.database
372 .handle_with_retry(
373 req.clone(),
374 batch_opts.experimental_grpc_max_retries,
375 &[
376 (QUERY_PARALLELISM_HINT, &query.parallelism.to_string()),
377 (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
378 ],
379 )
380 .await
381 .with_context(|_| InvalidRequestSnafu {
382 context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
383 })
384 }
385 FrontendClient::Standalone {
386 database_client,
387 query,
388 } => {
389 let ctx = QueryContextBuilder::default()
390 .current_catalog(catalog.to_string())
391 .current_schema(schema.to_string())
392 .extensions(HashMap::from([(
393 QUERY_PARALLELISM_HINT.to_string(),
394 query.parallelism.to_string(),
395 )]))
396 .build();
397 let ctx = Arc::new(ctx);
398 {
399 let database_client = {
400 database_client
401 .lock()
402 .map_err(|e| {
403 UnexpectedSnafu {
404 reason: format!("Failed to lock database client: {e}"),
405 }
406 .build()
407 })?
408 .as_ref()
409 .context(UnexpectedSnafu {
410 reason: "Standalone's frontend instance is not set",
411 })?
412 .upgrade()
413 .context(UnexpectedSnafu {
414 reason: "Failed to upgrade database client",
415 })?
416 };
417 let resp: common_query::Output = database_client
418 .do_query(req, ctx)
419 .await
420 .map_err(BoxedError::new)
421 .context(ExternalSnafu)?;
422 match resp.data {
423 common_query::OutputData::AffectedRows(rows) => {
424 Ok(rows.try_into().map_err(|_| {
425 UnexpectedSnafu {
426 reason: format!("Failed to convert rows to u32: {}", rows),
427 }
428 .build()
429 })?)
430 }
431 _ => UnexpectedSnafu {
432 reason: "Unexpected output data",
433 }
434 .fail(),
435 }
436 }
437 }
438 }
439 }
440}
441
/// Describes which frontend handled a request; rendered via its `Display` impl.
#[derive(Debug, Default)]
pub(crate) enum PeerDesc {
    /// Distributed mode: the request was sent to this frontend peer.
    Dist {
        /// In `FrontendClient::handle`, the frontend returned by `get_random_active_frontend`.
        peer: Peer,
    },
    /// Standalone mode (the `Default`): the request was handled by the in-process frontend.
    #[default]
    Standalone,
}
454
455impl std::fmt::Display for PeerDesc {
456 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
457 match self {
458 PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
459 PeerDesc::Standalone => write!(f, "standalone"),
460 }
461 }
462}