1use std::collections::HashMap;
18use std::sync::{Arc, Weak};
19use std::time::SystemTime;
20
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{CreateTableExpr, QueryRequest};
24use client::{Client, Database};
25use common_error::ext::{BoxedError, ErrorExt};
26use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
27use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
28use common_meta::peer::Peer;
29use common_meta::rpc::store::RangeRequest;
30use common_query::Output;
31use common_telemetry::warn;
32use meta_client::client::MetaClient;
33use query::datafusion::QUERY_PARALLELISM_HINT;
34use query::options::QueryOptions;
35use rand::rng;
36use rand::seq::SliceRandom;
37use servers::query_handler::grpc::GrpcQueryHandler;
38use session::context::{QueryContextBuilder, QueryContextRef};
39use snafu::{OptionExt, ResultExt};
40
41use crate::batching_mode::BatchingModeOptions;
42use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
43use crate::{Error, FlowAuthHeader};
44
45#[async_trait::async_trait]
52pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
53 async fn do_query(
54 &self,
55 query: Request,
56 ctx: QueryContextRef,
57 ) -> std::result::Result<Output, BoxedError>;
58}
59
60#[async_trait::async_trait]
62impl<
63 E: ErrorExt + Send + Sync + 'static,
64 T: GrpcQueryHandler<Error = E> + Send + Sync + 'static,
65 > GrpcQueryHandlerWithBoxedError for T
66{
67 async fn do_query(
68 &self,
69 query: Request,
70 ctx: QueryContextRef,
71 ) -> std::result::Result<Output, BoxedError> {
72 self.do_query(query, ctx).await.map_err(BoxedError::new)
73 }
74}
75
/// Shared, mutex-guarded slot holding an optional weak reference to a
/// [`GrpcQueryHandlerWithBoxedError`]. The slot may be empty (`None`), and the
/// weak reference may fail to upgrade once the handler has been dropped.
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
77
78#[derive(Debug, Clone)]
82pub enum FrontendClient {
83 Distributed {
84 meta_client: Arc<MetaClient>,
85 chnl_mgr: ChannelManager,
86 auth: Option<FlowAuthHeader>,
87 query: QueryOptions,
88 batch_opts: BatchingModeOptions,
89 },
90 Standalone {
91 database_client: HandlerMutable,
94 query: QueryOptions,
95 },
96}
97
98impl FrontendClient {
99 pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
101 let handler = Arc::new(std::sync::Mutex::new(None));
102 (
103 Self::Standalone {
104 database_client: handler.clone(),
105 query,
106 },
107 handler,
108 )
109 }
110
111 pub fn from_meta_client(
112 meta_client: Arc<MetaClient>,
113 auth: Option<FlowAuthHeader>,
114 query: QueryOptions,
115 batch_opts: BatchingModeOptions,
116 ) -> Self {
117 common_telemetry::info!("Frontend client build with auth={:?}", auth);
118 Self::Distributed {
119 meta_client,
120 chnl_mgr: {
121 let cfg = ChannelConfig::new()
122 .connect_timeout(batch_opts.grpc_conn_timeout)
123 .timeout(batch_opts.query_timeout);
124 ChannelManager::with_config(cfg)
125 },
126 auth,
127 query,
128 batch_opts,
129 }
130 }
131
132 pub fn from_grpc_handler(
133 grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
134 query: QueryOptions,
135 ) -> Self {
136 Self::Standalone {
137 database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
138 query,
139 }
140 }
141}
142
143#[derive(Debug, Clone)]
144pub struct DatabaseWithPeer {
145 pub database: Database,
146 pub peer: Peer,
147}
148
149impl DatabaseWithPeer {
150 fn new(database: Database, peer: Peer) -> Self {
151 Self { database, peer }
152 }
153
154 async fn try_select_one(&self) -> Result<(), Error> {
156 let _ = self
158 .database
159 .sql("SELECT 1")
160 .await
161 .with_context(|_| InvalidRequestSnafu {
162 context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
163 })?;
164 Ok(())
165 }
166}
167
168impl FrontendClient {
169 pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
171 let Self::Distributed { meta_client, .. } = self else {
172 return Ok(vec![]);
173 };
174 let cluster_client = meta_client
175 .cluster_client()
176 .map_err(BoxedError::new)
177 .context(ExternalSnafu)?;
178
179 let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
180 let req = RangeRequest::new().with_prefix(prefix);
181 let resp = cluster_client
182 .range(req)
183 .await
184 .map_err(BoxedError::new)
185 .context(ExternalSnafu)?;
186 let mut res = Vec::with_capacity(resp.kvs.len());
187 for kv in resp.kvs {
188 let key = NodeInfoKey::try_from(kv.key)
189 .map_err(BoxedError::new)
190 .context(ExternalSnafu)?;
191
192 let val = NodeInfo::try_from(kv.value)
193 .map_err(BoxedError::new)
194 .context(ExternalSnafu)?;
195 res.push((key, val));
196 }
197 Ok(res)
198 }
199
200 async fn get_random_active_frontend(
203 &self,
204 catalog: &str,
205 schema: &str,
206 ) -> Result<DatabaseWithPeer, Error> {
207 let Self::Distributed {
208 meta_client: _,
209 chnl_mgr,
210 auth,
211 query: _,
212 batch_opts,
213 } = self
214 else {
215 return UnexpectedSnafu {
216 reason: "Expect distributed mode",
217 }
218 .fail();
219 };
220
221 let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
222 interval.tick().await;
223 for retry in 0..batch_opts.experimental_grpc_max_retries {
224 let mut frontends = self.scan_for_frontend().await?;
225 let now_in_ms = SystemTime::now()
226 .duration_since(SystemTime::UNIX_EPOCH)
227 .unwrap()
228 .as_millis() as i64;
229 frontends.shuffle(&mut rng());
231
232 for (_, node_info) in frontends
234 .iter()
235 .filter(|(_, node_info)| {
237 node_info.last_activity_ts
238 + batch_opts
239 .experimental_frontend_activity_timeout
240 .as_millis() as i64
241 > now_in_ms
242 })
243 {
244 let addr = &node_info.peer.addr;
245 let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
246 let database = {
247 let mut db = Database::new(catalog, schema, client);
248 if let Some(auth) = auth {
249 db.set_auth(auth.auth().clone());
250 }
251 db
252 };
253 let db = DatabaseWithPeer::new(database, node_info.peer.clone());
254 match db.try_select_one().await {
255 Ok(_) => return Ok(db),
256 Err(e) => {
257 warn!(
258 "Failed to connect to frontend {} on retry={}: \n{e:?}",
259 addr, retry
260 );
261 }
262 }
263 }
264 interval.tick().await;
267 }
268
269 NoAvailableFrontendSnafu {
270 timeout: batch_opts.grpc_conn_timeout,
271 context: "No available frontend found that is able to process query",
272 }
273 .fail()
274 }
275
276 pub async fn create(
277 &self,
278 create: CreateTableExpr,
279 catalog: &str,
280 schema: &str,
281 ) -> Result<u32, Error> {
282 self.handle(
283 Request::Ddl(api::v1::DdlRequest {
284 expr: Some(api::v1::ddl_request::Expr::CreateTable(create)),
285 }),
286 catalog,
287 schema,
288 &mut None,
289 )
290 .await
291 }
292
293 pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
295 match self {
296 FrontendClient::Distributed { .. } => {
297 let db = self.get_random_active_frontend(catalog, schema).await?;
298 db.database
299 .sql(sql)
300 .await
301 .map_err(BoxedError::new)
302 .context(ExternalSnafu)
303 }
304 FrontendClient::Standalone {
305 database_client, ..
306 } => {
307 let ctx = QueryContextBuilder::default()
308 .current_catalog(catalog.to_string())
309 .current_schema(schema.to_string())
310 .build();
311 let ctx = Arc::new(ctx);
312 {
313 let database_client = {
314 database_client
315 .lock()
316 .map_err(|e| {
317 UnexpectedSnafu {
318 reason: format!("Failed to lock database client: {e}"),
319 }
320 .build()
321 })?
322 .as_ref()
323 .context(UnexpectedSnafu {
324 reason: "Standalone's frontend instance is not set",
325 })?
326 .upgrade()
327 .context(UnexpectedSnafu {
328 reason: "Failed to upgrade database client",
329 })?
330 };
331 let req = Request::Query(QueryRequest {
332 query: Some(Query::Sql(sql.to_string())),
333 });
334 database_client
335 .do_query(req, ctx)
336 .await
337 .map_err(BoxedError::new)
338 .context(ExternalSnafu)
339 }
340 }
341 }
342 }
343
344 pub(crate) async fn handle(
346 &self,
347 req: api::v1::greptime_request::Request,
348 catalog: &str,
349 schema: &str,
350 peer_desc: &mut Option<PeerDesc>,
351 ) -> Result<u32, Error> {
352 match self {
353 FrontendClient::Distributed {
354 query, batch_opts, ..
355 } => {
356 let db = self.get_random_active_frontend(catalog, schema).await?;
357
358 *peer_desc = Some(PeerDesc::Dist {
359 peer: db.peer.clone(),
360 });
361
362 db.database
363 .handle_with_retry(
364 req.clone(),
365 batch_opts.experimental_grpc_max_retries,
366 &[(QUERY_PARALLELISM_HINT, &query.parallelism.to_string())],
367 )
368 .await
369 .with_context(|_| InvalidRequestSnafu {
370 context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
371 })
372 }
373 FrontendClient::Standalone {
374 database_client,
375 query,
376 } => {
377 let ctx = QueryContextBuilder::default()
378 .current_catalog(catalog.to_string())
379 .current_schema(schema.to_string())
380 .extensions(HashMap::from([(
381 QUERY_PARALLELISM_HINT.to_string(),
382 query.parallelism.to_string(),
383 )]))
384 .build();
385 let ctx = Arc::new(ctx);
386 {
387 let database_client = {
388 database_client
389 .lock()
390 .map_err(|e| {
391 UnexpectedSnafu {
392 reason: format!("Failed to lock database client: {e}"),
393 }
394 .build()
395 })?
396 .as_ref()
397 .context(UnexpectedSnafu {
398 reason: "Standalone's frontend instance is not set",
399 })?
400 .upgrade()
401 .context(UnexpectedSnafu {
402 reason: "Failed to upgrade database client",
403 })?
404 };
405 let resp: common_query::Output = database_client
406 .do_query(req, ctx)
407 .await
408 .map_err(BoxedError::new)
409 .context(ExternalSnafu)?;
410 match resp.data {
411 common_query::OutputData::AffectedRows(rows) => {
412 Ok(rows.try_into().map_err(|_| {
413 UnexpectedSnafu {
414 reason: format!("Failed to convert rows to u32: {}", rows),
415 }
416 .build()
417 })?)
418 }
419 _ => UnexpectedSnafu {
420 reason: "Unexpected output data",
421 }
422 .fail(),
423 }
424 }
425 }
426 }
427 }
428}
429
430#[derive(Debug, Default)]
432pub(crate) enum PeerDesc {
433 Dist {
435 peer: Peer,
437 },
438 #[default]
440 Standalone,
441}
442
443impl std::fmt::Display for PeerDesc {
444 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
445 match self {
446 PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
447 PeerDesc::Standalone => write!(f, "standalone"),
448 }
449 }
450}