1use std::sync::{Arc, Weak};
18use std::time::SystemTime;
19
20use api::v1::greptime_request::Request;
21use api::v1::CreateTableExpr;
22use client::{Client, Database};
23use common_error::ext::{BoxedError, ErrorExt};
24use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
25use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
26use common_meta::peer::Peer;
27use common_meta::rpc::store::RangeRequest;
28use common_query::Output;
29use common_telemetry::warn;
30use meta_client::client::MetaClient;
31use rand::rng;
32use rand::seq::SliceRandom;
33use servers::query_handler::grpc::GrpcQueryHandler;
34use session::context::{QueryContextBuilder, QueryContextRef};
35use snafu::{OptionExt, ResultExt};
36
37use crate::batching_mode::{
38 DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
39 GRPC_MAX_RETRIES,
40};
41use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
42use crate::{Error, FlowAuthHeader};
43
/// A gRPC query handler whose error type is erased into [`BoxedError`].
///
/// The method has the same parameters as the `do_query` of [`GrpcQueryHandler`], but the error
/// type is fixed, so implementors can be stored behind a `dyn` trait object (see
/// `HandlerMutable`).
#[async_trait::async_trait]
pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
    /// Executes `query` under the query context `ctx`.
    ///
    /// Returns the query [`Output`] on success, or the failure wrapped in a [`BoxedError`].
    async fn do_query(
        &self,
        query: Request,
        ctx: QueryContextRef,
    ) -> std::result::Result<Output, BoxedError>;
}
58
59#[async_trait::async_trait]
61impl<
62 E: ErrorExt + Send + Sync + 'static,
63 T: GrpcQueryHandler<Error = E> + Send + Sync + 'static,
64 > GrpcQueryHandlerWithBoxedError for T
65{
66 async fn do_query(
67 &self,
68 query: Request,
69 ctx: QueryContextRef,
70 ) -> std::result::Result<Output, BoxedError> {
71 self.do_query(query, ctx).await.map_err(BoxedError::new)
72 }
73}
74
/// Shared, mutex-protected slot holding an optional weak reference to a
/// [`GrpcQueryHandlerWithBoxedError`] trait object.
///
/// The slot may be `None` (see `FrontendClient::from_empty_grpc_handler`), presumably so that
/// the handler can be installed after the client has been constructed.
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
76
/// Client used to send requests to a frontend, either over gRPC (distributed mode) or by calling
/// a handler directly (standalone mode).
#[derive(Debug, Clone)]
pub enum FrontendClient {
    /// Frontends are looked up through the meta client and reached over gRPC.
    Distributed {
        /// Used to list the frontend nodes registered in the cluster.
        meta_client: Arc<MetaClient>,
        /// Channel manager shared by every gRPC client created for a frontend.
        chnl_mgr: ChannelManager,
        /// Optional auth header applied to each `Database` before it is used.
        auth: Option<FlowAuthHeader>,
    },
    /// Requests are dispatched to a query handler referenced through a shared slot.
    Standalone {
        /// Slot holding a weak reference to the handler; it may still be unset (`None`).
        database_client: HandlerMutable,
    },
}
93
94impl FrontendClient {
95 pub fn from_empty_grpc_handler() -> (Self, HandlerMutable) {
97 let handler = Arc::new(std::sync::Mutex::new(None));
98 (
99 Self::Standalone {
100 database_client: handler.clone(),
101 },
102 handler,
103 )
104 }
105
106 pub fn from_meta_client(meta_client: Arc<MetaClient>, auth: Option<FlowAuthHeader>) -> Self {
107 common_telemetry::info!("Frontend client build with auth={:?}", auth);
108 Self::Distributed {
109 meta_client,
110 chnl_mgr: {
111 let cfg = ChannelConfig::new()
112 .connect_timeout(GRPC_CONN_TIMEOUT)
113 .timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
114 ChannelManager::with_config(cfg)
115 },
116 auth,
117 }
118 }
119
120 pub fn from_grpc_handler(grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) -> Self {
121 Self::Standalone {
122 database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
123 }
124 }
125}
126
/// A `Database` client paired with the peer it is connected to.
#[derive(Debug, Clone)]
pub struct DatabaseWithPeer {
    /// Client used to issue requests.
    pub database: Database,
    /// The peer (node) that `database` targets; used in error messages and peer descriptions.
    pub peer: Peer,
}
132
133impl DatabaseWithPeer {
134 fn new(database: Database, peer: Peer) -> Self {
135 Self { database, peer }
136 }
137
138 async fn try_select_one(&self) -> Result<(), Error> {
140 let _ = self
142 .database
143 .sql("SELECT 1")
144 .await
145 .with_context(|_| InvalidRequestSnafu {
146 context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
147 })?;
148 Ok(())
149 }
150}
151
152impl FrontendClient {
153 pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
155 let Self::Distributed { meta_client, .. } = self else {
156 return Ok(vec![]);
157 };
158 let cluster_client = meta_client
159 .cluster_client()
160 .map_err(BoxedError::new)
161 .context(ExternalSnafu)?;
162
163 let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
164 let req = RangeRequest::new().with_prefix(prefix);
165 let resp = cluster_client
166 .range(req)
167 .await
168 .map_err(BoxedError::new)
169 .context(ExternalSnafu)?;
170 let mut res = Vec::with_capacity(resp.kvs.len());
171 for kv in resp.kvs {
172 let key = NodeInfoKey::try_from(kv.key)
173 .map_err(BoxedError::new)
174 .context(ExternalSnafu)?;
175
176 let val = NodeInfo::try_from(kv.value)
177 .map_err(BoxedError::new)
178 .context(ExternalSnafu)?;
179 res.push((key, val));
180 }
181 Ok(res)
182 }
183
184 async fn get_random_active_frontend(
187 &self,
188 catalog: &str,
189 schema: &str,
190 ) -> Result<DatabaseWithPeer, Error> {
191 let Self::Distributed {
192 meta_client: _,
193 chnl_mgr,
194 auth,
195 } = self
196 else {
197 return UnexpectedSnafu {
198 reason: "Expect distributed mode",
199 }
200 .fail();
201 };
202
203 let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
204 interval.tick().await;
205 for retry in 0..GRPC_MAX_RETRIES {
206 let mut frontends = self.scan_for_frontend().await?;
207 let now_in_ms = SystemTime::now()
208 .duration_since(SystemTime::UNIX_EPOCH)
209 .unwrap()
210 .as_millis() as i64;
211 frontends.shuffle(&mut rng());
213
214 for (_, node_info) in frontends
216 .iter()
217 .filter(|(_, node_info)| {
219 node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
220 > now_in_ms
221 })
222 {
223 let addr = &node_info.peer.addr;
224 let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
225 let database = {
226 let mut db = Database::new(catalog, schema, client);
227 if let Some(auth) = auth {
228 db.set_auth(auth.auth().clone());
229 }
230 db
231 };
232 let db = DatabaseWithPeer::new(database, node_info.peer.clone());
233 match db.try_select_one().await {
234 Ok(_) => return Ok(db),
235 Err(e) => {
236 warn!(
237 "Failed to connect to frontend {} on retry={}: \n{e:?}",
238 addr, retry
239 );
240 }
241 }
242 }
243 interval.tick().await;
246 }
247
248 NoAvailableFrontendSnafu {
249 timeout: GRPC_CONN_TIMEOUT,
250 context: "No available frontend found that is able to process query",
251 }
252 .fail()
253 }
254
255 pub async fn create(
256 &self,
257 create: CreateTableExpr,
258 catalog: &str,
259 schema: &str,
260 ) -> Result<u32, Error> {
261 self.handle(
262 Request::Ddl(api::v1::DdlRequest {
263 expr: Some(api::v1::ddl_request::Expr::CreateTable(create)),
264 }),
265 catalog,
266 schema,
267 &mut None,
268 )
269 .await
270 }
271
272 pub(crate) async fn handle(
274 &self,
275 req: api::v1::greptime_request::Request,
276 catalog: &str,
277 schema: &str,
278 peer_desc: &mut Option<PeerDesc>,
279 ) -> Result<u32, Error> {
280 match self {
281 FrontendClient::Distributed { .. } => {
282 let db = self.get_random_active_frontend(catalog, schema).await?;
283
284 *peer_desc = Some(PeerDesc::Dist {
285 peer: db.peer.clone(),
286 });
287
288 db.database
289 .handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
290 .await
291 .with_context(|_| InvalidRequestSnafu {
292 context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
293 })
294 }
295 FrontendClient::Standalone { database_client } => {
296 let ctx = QueryContextBuilder::default()
297 .current_catalog(catalog.to_string())
298 .current_schema(schema.to_string())
299 .build();
300 let ctx = Arc::new(ctx);
301 {
302 let database_client = {
303 database_client
304 .lock()
305 .map_err(|e| {
306 UnexpectedSnafu {
307 reason: format!("Failed to lock database client: {e}"),
308 }
309 .build()
310 })?
311 .as_ref()
312 .context(UnexpectedSnafu {
313 reason: "Standalone's frontend instance is not set",
314 })?
315 .upgrade()
316 .context(UnexpectedSnafu {
317 reason: "Failed to upgrade database client",
318 })?
319 };
320 let resp: common_query::Output = database_client
321 .do_query(req.clone(), ctx)
322 .await
323 .map_err(BoxedError::new)
324 .context(ExternalSnafu)?;
325 match resp.data {
326 common_query::OutputData::AffectedRows(rows) => {
327 Ok(rows.try_into().map_err(|_| {
328 UnexpectedSnafu {
329 reason: format!("Failed to convert rows to u32: {}", rows),
330 }
331 .build()
332 })?)
333 }
334 _ => UnexpectedSnafu {
335 reason: "Unexpected output data",
336 }
337 .fail(),
338 }
339 }
340 }
341 }
342 }
343}
344
/// Describes which peer a request was sent to; its `Display` form is the peer address or the
/// literal `standalone`.
#[derive(Debug, Default)]
pub(crate) enum PeerDesc {
    /// The request went to a frontend in distributed mode.
    Dist {
        /// The frontend peer that was selected for the request.
        peer: Peer,
    },
    /// No remote peer is involved; this is the default value.
    #[default]
    Standalone,
}
357
358impl std::fmt::Display for PeerDesc {
359 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
360 match self {
361 PeerDesc::Dist { peer } => write!(f, "{}", peer.addr),
362 PeerDesc::Standalone => write!(f, "standalone"),
363 }
364 }
365}