1use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use axum::extract::rejection::FormRejection;
20use axum::extract::{Json, Query, State};
21use axum::response::sse::{Event, KeepAlive, Sse};
22use axum::response::{IntoResponse, Response};
23use axum::{Extension, Form};
24use common_catalog::parse_catalog_and_schema_from_db_string;
25use common_error::ext::ErrorExt;
26use common_error::status_code::StatusCode;
27use common_plugins::GREPTIME_EXEC_WRITE_COST;
28use common_query::{Output, OutputData};
29use common_recordbatch::{RecordBatch, SendableRecordBatchStream, util};
30use common_telemetry::tracing;
31use datafusion::physical_plan::ExecutionPlan;
32use datatypes::schema::SchemaRef;
33use futures::StreamExt;
34use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery};
35use serde::{Deserialize, Serialize};
36use serde_json::Value;
37use session::context::{Channel, QueryContext, QueryContextRef};
38use snafu::ResultExt;
39use sql::dialect::GreptimeDbDialect;
40use sql::parser::{ParseOptions, ParserContext};
41use sql::statements::statement::Statement;
42
43use crate::error::{FailedToParseQuerySnafu, InvalidQuerySnafu, Result};
44use crate::http::header::collect_plan_metrics;
45use crate::http::result::arrow_result::ArrowResponse;
46use crate::http::result::csv_result::CsvResponse;
47use crate::http::result::error_result::ErrorResponse;
48use crate::http::result::greptime_result_v1::GreptimedbV1Response;
49use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
50use crate::http::result::json_result::JsonResponse;
51use crate::http::result::null_result::NullResponse;
52use crate::http::result::table_result::TableResponse;
53use crate::http::{
54 ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput,
55 HttpResponse, ResponseFormat,
56};
57use crate::metrics_handler::MetricsHandler;
58use crate::query_handler::sql::ServerSqlQueryHandlerRef;
59
/// Request parameters shared by the SQL HTTP handlers in this file.
///
/// The handlers deserialize this struct from both the URL query string and
/// the form body of the same request.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SqlQuery {
    /// Database string; the handlers split it into catalog and schema with
    /// `parse_catalog_and_schema_from_db_string`.
    pub db: Option<String>,
    /// SQL text to run, parse or format. The handlers reject a request that
    /// does not provide it.
    pub sql: Option<String>,
    /// Response format name. It is lower-cased and handed to
    /// `ResponseFormat::parse`; a missing or unrecognised value falls back to
    /// `ResponseFormat::GreptimedbV1`.
    pub format: Option<String>,
    /// Timestamp precision name. It is lower-cased and handed to
    /// `Epoch::parse` (unrecognised values become `Epoch::Millisecond`); the
    /// result is forwarded to the InfluxDB v1 response builder.
    pub epoch: Option<String>,
    /// Row limit the `sql` handler applies to the response via `with_limit`.
    pub limit: Option<usize>,
    /// Compression name forwarded to the Arrow response builder.
    pub compression: Option<String>,
    /// Interval, in milliseconds, between metrics snapshots emitted by
    /// `sql_analyze_stream`. Defaults to
    /// `DEFAULT_ANALYZE_SNAPSHOT_INTERVAL_MS` and is clamped to the
    /// `MIN_`/`MAX_ANALYZE_SNAPSHOT_INTERVAL_MS` range.
    pub snapshot_interval_ms: Option<u64>,
}
82
/// Snapshot interval used by `sql_analyze_stream` when the request does not
/// specify `snapshot_interval_ms`.
const DEFAULT_ANALYZE_SNAPSHOT_INTERVAL_MS: u64 = 5000;
/// Lower bound the requested snapshot interval is clamped to.
const MIN_ANALYZE_SNAPSHOT_INTERVAL_MS: u64 = 1000;
/// Upper bound the requested snapshot interval is clamped to.
const MAX_ANALYZE_SNAPSHOT_INTERVAL_MS: u64 = 60000;
86
/// JSON body of a single server-sent event produced by `sql_analyze_stream`.
///
/// Optional fields are left out of the serialized JSON when they are `None`.
#[derive(Serialize)]
struct AnalyzeStreamPayload {
    /// Sequence number of the event within its stream.
    seq: u64,
    /// Event state; the code in this file uses `"metrics"`, `"final"`,
    /// `"error"` and `"canceled"`.
    state: &'static str,
    /// `true` for intermediate metrics snapshots, `false` for terminal events.
    partial: bool,
    /// Milliseconds elapsed since the handler started.
    elapsed_ms: u64,
    /// Plan metrics as JSON; absent when there is no plan or the metrics
    /// could not be converted.
    #[serde(skip_serializing_if = "Option::is_none")]
    metrics: Option<Value>,
    /// Query result, set only on the `"final"` event.
    #[serde(skip_serializing_if = "Option::is_none")]
    output: Option<GreptimeQueryOutput>,
    /// Error message, set on failure events.
    #[serde(skip_serializing_if = "Option::is_none")]
    reason: Option<String>,
    /// Numeric status code, set on failure events.
    #[serde(skip_serializing_if = "Option::is_none")]
    code: Option<u32>,
}
102
/// State threaded through `futures::stream::unfold` while
/// `sql_analyze_stream` drains a query.
struct AnalyzeStreamState {
    /// Record batch stream returned by the query.
    stream: SendableRecordBatchStream,
    /// Schema of `stream`, captured up front to build the final output.
    schema: SchemaRef,
    /// Physical plan from the output metadata; metrics snapshots are only
    /// emitted when it is present.
    plan: Option<Arc<dyn ExecutionPlan>>,
    /// Batches received so far; they are held until the stream ends.
    batches: Vec<RecordBatch>,
    /// Sequence number assigned to the next event.
    seq: u64,
    /// Instant the handler started, used for `elapsed_ms`.
    start: Instant,
    /// Snapshot interval requested by the client (after clamping).
    requested_interval_ms: u64,
    /// Interval used for the next tick; updated from the size of the last
    /// metrics payload by `adaptive_interval_ms`.
    current_interval_ms: u64,
    /// Set once a terminal event has been emitted; the stream then ends.
    done: bool,
}
114
115#[axum_macros::debug_handler]
117#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
118pub async fn sql(
119 State(state): State<ApiState>,
120 Query(query_params): Query<SqlQuery>,
121 Extension(mut query_ctx): Extension<QueryContext>,
122 Form(form_params): Form<SqlQuery>,
123) -> HttpResponse {
124 let start = Instant::now();
125 let sql_handler = &state.sql_handler;
126 if let Some(db) = &query_params.db.or(form_params.db) {
127 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
128 query_ctx.set_current_catalog(&catalog);
129 query_ctx.set_current_schema(&schema);
130 }
131 let db = query_ctx.get_db_string();
132
133 query_ctx.set_channel(Channel::HttpSql);
134 let query_ctx = Arc::new(query_ctx);
135
136 let _timer = crate::metrics::METRIC_HTTP_SQL_ELAPSED
137 .with_label_values(&[db.as_str()])
138 .start_timer();
139
140 let sql = query_params.sql.or(form_params.sql);
141 let format = query_params
142 .format
143 .or(form_params.format)
144 .map(|s| s.to_lowercase())
145 .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
146 .unwrap_or(ResponseFormat::GreptimedbV1);
147 let epoch = query_params
148 .epoch
149 .or(form_params.epoch)
150 .map(|s| s.to_lowercase())
151 .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
152
153 let result = if let Some(sql) = &sql {
154 if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await {
155 Err((status, msg))
156 } else {
157 Ok(sql_handler.do_query(sql, query_ctx.clone()).await)
158 }
159 } else {
160 Err((
161 StatusCode::InvalidArguments,
162 "sql parameter is required.".to_string(),
163 ))
164 };
165
166 let outputs = match result {
167 Err((status, msg)) => {
168 return HttpResponse::Error(
169 ErrorResponse::from_error_message(status, msg)
170 .with_execution_time(start.elapsed().as_millis() as u64),
171 );
172 }
173 Ok(outputs) => outputs,
174 };
175
176 let mut resp = match format {
177 ResponseFormat::Arrow => {
178 ArrowResponse::from_output(outputs, query_params.compression).await
179 }
180 ResponseFormat::Csv(with_names, with_types) => {
181 CsvResponse::from_output(outputs, with_names, with_types).await
182 }
183 ResponseFormat::Table => TableResponse::from_output(outputs).await,
184 ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
185 ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
186 ResponseFormat::Json => JsonResponse::from_output(outputs).await,
187 ResponseFormat::Null => NullResponse::from_output(outputs).await,
188 };
189
190 if let Some(limit) = query_params.limit {
191 resp = resp.with_limit(limit);
192 }
193 resp.with_execution_time(start.elapsed().as_millis() as u64)
194}
195
196#[axum_macros::debug_handler]
205#[tracing::instrument(
206 skip_all,
207 fields(protocol = "http", request_type = "sql_analyze_stream")
208)]
209pub async fn sql_analyze_stream(
210 State(state): State<ApiState>,
211 Query(query_params): Query<SqlQuery>,
212 Extension(mut query_ctx): Extension<QueryContext>,
213 form_params: std::result::Result<Form<SqlQuery>, FormRejection>,
214) -> Response {
215 let start = Instant::now();
216 let form_params = match form_params {
217 Ok(Form(params)) => params,
218 Err(err) => {
219 if err.status() != axum::http::StatusCode::UNSUPPORTED_MEDIA_TYPE {
220 return ErrorResponse::from_error_message(
221 StatusCode::InvalidArguments,
222 err.body_text(),
223 )
224 .with_execution_time(start.elapsed().as_millis() as u64)
225 .into_response();
226 }
227 SqlQuery::default()
228 }
229 };
230 let sql_handler = &state.sql_handler;
231 if let Some(db) = &query_params.db.or(form_params.db) {
232 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
233 query_ctx.set_current_catalog(&catalog);
234 query_ctx.set_current_schema(&schema);
235 }
236 query_ctx.set_channel(Channel::HttpSql);
237 let query_ctx = Arc::new(query_ctx);
238
239 let Some(sql) = query_params.sql.or(form_params.sql) else {
240 return ErrorResponse::from_error_message(
241 StatusCode::InvalidArguments,
242 "sql parameter is required.".to_string(),
243 )
244 .with_execution_time(start.elapsed().as_millis() as u64)
245 .into_response();
246 };
247 if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await {
248 return ErrorResponse::from_error_message(status, msg)
249 .with_execution_time(start.elapsed().as_millis() as u64)
250 .into_response();
251 }
252
253 let interval_ms = query_params
254 .snapshot_interval_ms
255 .or(form_params.snapshot_interval_ms)
256 .unwrap_or(DEFAULT_ANALYZE_SNAPSHOT_INTERVAL_MS)
257 .clamp(
258 MIN_ANALYZE_SNAPSHOT_INTERVAL_MS,
259 MAX_ANALYZE_SNAPSHOT_INTERVAL_MS,
260 );
261
262 let output = match state
263 .sql_handler
264 .do_analyze_stream_query(&sql, query_ctx.clone())
265 .await
266 {
267 Ok(output) => output,
268 Err(err) => {
269 return ErrorResponse::from_error(err)
270 .with_execution_time(start.elapsed().as_millis() as u64)
271 .into_response();
272 }
273 };
274
275 let plan = output.meta.plan.clone();
276 let OutputData::Stream(stream) = output.data else {
277 return ErrorResponse::from_error_message(
278 StatusCode::InvalidArguments,
279 "analyze stream query must return a stream".to_string(),
280 )
281 .with_execution_time(start.elapsed().as_millis() as u64)
282 .into_response();
283 };
284 let schema = stream.schema();
285
286 let sse_stream = futures::stream::unfold(
287 AnalyzeStreamState {
288 stream,
289 schema,
290 plan,
291 batches: Vec::new(),
292 seq: 0,
293 start,
294 requested_interval_ms: interval_ms,
295 current_interval_ms: interval_ms,
296 done: false,
297 },
298 |mut state| async move {
299 if state.done {
300 return None;
301 }
302 let tick = tokio::time::sleep(Duration::from_millis(state.current_interval_ms));
303 tokio::pin!(tick);
304 loop {
305 tokio::select! {
306 item = state.stream.next() => {
307 match item {
308 Some(Ok(batch)) => state.batches.push(batch),
309 Some(Err(err)) => {
310 let status = err.status_code();
311 let event_name = if status == StatusCode::Cancelled { "canceled" } else { "error" };
312 let (payload, _) = make_analyze_payload(AnalyzePayloadArgs {
313 seq: state.seq,
314 state: event_name,
315 partial: false,
316 start: state.start,
317 plan: state.plan.as_ref(),
318 output: None,
319 reason: Some(err.output_msg()),
320 code: Some(status as u32),
321 });
322 state.seq += 1;
323 state.done = true;
324 return Some((Ok::<Event, std::convert::Infallible>(Event::default().event(event_name).data(payload)), state));
325 }
326 None => {
327 let batches = std::mem::take(&mut state.batches);
328 let output = HttpRecordsOutput::try_new(state.schema.clone(), batches)
329 .map(GreptimeQueryOutput::Records);
330 let (event_name, payload) = make_final_analyze_event(
331 output.map_err(|err| (err.output_msg(), err.status_code() as u32)),
332 state.seq,
333 state.start,
334 state.plan.as_ref(),
335 );
336 state.seq += 1;
337 state.done = true;
338 return Some((Ok::<Event, std::convert::Infallible>(Event::default().event(event_name).data(payload)), state));
339 }
340 }
341 }
342 _ = &mut tick => {
343 if state.plan.is_some() {
344 let (payload, payload_bytes) = make_analyze_payload(AnalyzePayloadArgs {
345 seq: state.seq,
346 state: "metrics",
347 partial: true,
348 start: state.start,
349 plan: state.plan.as_ref(),
350 output: None,
351 reason: None,
352 code: None,
353 });
354 state.current_interval_ms = adaptive_interval_ms(payload_bytes, state.requested_interval_ms);
355 state.seq += 1;
356 return Some((Ok::<Event, std::convert::Infallible>(Event::default().event("metrics").data(payload)), state));
357 }
358 tick.as_mut().reset(tokio::time::Instant::now() + Duration::from_millis(state.current_interval_ms));
359 }
360 }
361 }
362 },
363 );
364
365 Sse::new(sse_stream)
366 .keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
367 .into_response()
368}
369
/// Picks the snapshot interval to use after a payload of `payload_bytes`.
///
/// The requested interval is never shortened. It is raised to at least 10
/// seconds once a payload reaches 1 MiB, and to at least 30 seconds once it
/// reaches 10 MiB.
fn adaptive_interval_ms(payload_bytes: usize, requested_ms: u64) -> u64 {
    const MIB: usize = 1024 * 1024;

    let floor_ms: u64 = match payload_bytes {
        bytes if bytes >= 10 * MIB => 30_000,
        bytes if bytes >= MIB => 10_000,
        _ => 0,
    };
    requested_ms.max(floor_ms)
}
379
380fn make_final_analyze_event(
381 output: std::result::Result<GreptimeQueryOutput, (String, u32)>,
382 seq: u64,
383 start: Instant,
384 plan: Option<&Arc<dyn ExecutionPlan>>,
385) -> (&'static str, String) {
386 match output {
387 Ok(output) => (
388 "final",
389 make_analyze_payload(AnalyzePayloadArgs {
390 seq,
391 state: "final",
392 partial: false,
393 start,
394 plan,
395 output: Some(output),
396 reason: None,
397 code: None,
398 })
399 .0,
400 ),
401 Err((reason, code)) => (
402 "error",
403 make_analyze_payload(AnalyzePayloadArgs {
404 seq,
405 state: "error",
406 partial: false,
407 start,
408 plan,
409 output: None,
410 reason: Some(reason),
411 code: Some(code),
412 })
413 .0,
414 ),
415 }
416}
417
/// Inputs for `make_analyze_payload`, grouped in a struct so call sites name
/// every value explicitly.
struct AnalyzePayloadArgs<'a> {
    /// Sequence number of the event.
    seq: u64,
    /// State string written to the payload.
    state: &'static str,
    /// Whether the payload is an intermediate snapshot.
    partial: bool,
    /// Instant from which `elapsed_ms` is measured.
    start: Instant,
    /// Plan whose metrics are attached to the payload, if any.
    plan: Option<&'a Arc<dyn ExecutionPlan>>,
    /// Query output to attach, if any.
    output: Option<GreptimeQueryOutput>,
    /// Error message to attach, if any.
    reason: Option<String>,
    /// Numeric status code to attach, if any.
    code: Option<u32>,
}
428
429fn make_analyze_payload(args: AnalyzePayloadArgs<'_>) -> (String, usize) {
430 let AnalyzePayloadArgs {
431 seq,
432 state,
433 partial,
434 start,
435 plan,
436 output,
437 reason,
438 code,
439 } = args;
440 let metrics = plan.and_then(|plan| query::analyze_plan_metrics_to_json_value(plan, true).ok());
441 let payload = AnalyzeStreamPayload {
442 seq,
443 state,
444 partial,
445 elapsed_ms: start.elapsed().as_millis() as u64,
446 metrics,
447 output,
448 reason,
449 code,
450 };
451 let payload_string = serde_json::to_string(&payload).unwrap_or_else(|e| {
452 serde_json::json!({
453 "seq": seq,
454 "state": "error",
455 "partial": false,
456 "reason": format!("Failed to serialize SSE payload: {e}"),
457 })
458 .to_string()
459 });
460 let payload_bytes = payload_string.len();
461 (payload_string, payload_bytes)
462}
463
464#[axum_macros::debug_handler]
466#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
467pub async fn sql_parse(
468 Query(query_params): Query<SqlQuery>,
469 Form(form_params): Form<SqlQuery>,
470) -> Result<Json<Vec<Statement>>> {
471 let Some(sql) = query_params.sql.or(form_params.sql) else {
472 return InvalidQuerySnafu {
473 reason: "sql parameter is required.",
474 }
475 .fail();
476 };
477
478 let stmts =
479 ParserContext::create_with_dialect(&sql, &GreptimeDbDialect {}, ParseOptions::default())
480 .context(FailedToParseQuerySnafu)?;
481
482 Ok(stmts.into())
483}
484
/// JSON body returned by `sql_format`.
#[derive(Debug, Serialize, Deserialize)]
pub struct SqlFormatResponse {
    /// The re-rendered statements, one per line, each ending with `;`.
    pub formatted: String,
}
489
490#[axum_macros::debug_handler]
492#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql_format"))]
493pub async fn sql_format(
494 Query(query_params): Query<SqlQuery>,
495 Form(form_params): Form<SqlQuery>,
496) -> axum::response::Response {
497 let Some(sql) = query_params.sql.or(form_params.sql) else {
498 let resp = ErrorResponse::from_error_message(
499 StatusCode::InvalidArguments,
500 "sql parameter is required.".to_string(),
501 );
502 return HttpResponse::Error(resp).into_response();
503 };
504
505 let stmts = match ParserContext::create_with_dialect(
507 &sql,
508 &GreptimeDbDialect {},
509 ParseOptions::default(),
510 ) {
511 Ok(v) => v,
512 Err(e) => return HttpResponse::Error(ErrorResponse::from_error(e)).into_response(),
513 };
514
515 let mut parts: Vec<String> = Vec::with_capacity(stmts.len());
516 for stmt in stmts {
517 let mut s = format!("{stmt}");
518 if !s.trim_end().ends_with(';') {
519 s.push(';');
520 }
521 parts.push(s);
522 }
523
524 let formatted = parts.join("\n");
525 Json(SqlFormatResponse { formatted }).into_response()
526}
527
528pub async fn from_output(
530 outputs: Vec<crate::error::Result<Output>>,
531) -> std::result::Result<(Vec<GreptimeQueryOutput>, HashMap<String, Value>), ErrorResponse> {
532 let mut results = Vec::with_capacity(outputs.len());
535 let mut merge_map = HashMap::new();
536
537 for out in outputs {
538 match out {
539 Ok(o) => match o.data {
540 OutputData::AffectedRows(rows) => {
541 results.push(GreptimeQueryOutput::AffectedRows(rows));
542 if o.meta.cost > 0 {
543 merge_map.insert(GREPTIME_EXEC_WRITE_COST.to_string(), o.meta.cost as u64);
544 }
545 }
546 OutputData::Stream(stream) => {
547 let schema = stream.schema().clone();
548 let mut http_record_output = match util::collect(stream).await {
550 Ok(rows) => match HttpRecordsOutput::try_new(schema, rows) {
551 Ok(rows) => rows,
552 Err(err) => {
553 return Err(ErrorResponse::from_error(err));
554 }
555 },
556 Err(err) => {
557 return Err(ErrorResponse::from_error(err));
558 }
559 };
560 if let Some(physical_plan) = o.meta.plan {
561 let mut result_map = HashMap::new();
562
563 let mut tmp = vec![&mut merge_map, &mut result_map];
564 collect_plan_metrics(&physical_plan, &mut tmp);
565 let re = result_map
566 .into_iter()
567 .map(|(k, v)| (k, Value::from(v)))
568 .collect::<HashMap<String, Value>>();
569 http_record_output.metrics.extend(re);
570 }
571 results.push(GreptimeQueryOutput::Records(http_record_output))
572 }
573 OutputData::RecordBatches(rbs) => {
574 match HttpRecordsOutput::try_new(rbs.schema(), rbs.take()) {
575 Ok(rows) => {
576 results.push(GreptimeQueryOutput::Records(rows));
577 }
578 Err(err) => {
579 return Err(ErrorResponse::from_error(err));
580 }
581 }
582 }
583 },
584
585 Err(err) => {
586 return Err(ErrorResponse::from_error(err));
587 }
588 }
589 }
590
591 let merge_map = merge_map
592 .into_iter()
593 .map(|(k, v)| (k, Value::from(v)))
594 .collect();
595
596 Ok((results, merge_map))
597}
598
/// Query-string parameters accepted by the `promql` handler.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PromqlQuery {
    /// PromQL expression to evaluate.
    pub query: String,
    /// Start of the evaluated range, passed through to `PromQuery` as text.
    pub start: String,
    /// End of the evaluated range, passed through to `PromQuery` as text.
    pub end: String,
    /// Evaluation step, passed through to `PromQuery` as text.
    pub step: String,
    /// Lookback setting; `DEFAULT_LOOKBACK_STRING` is used when absent.
    pub lookback: Option<String>,
    /// Database name. The `promql` handler in this file does not read it.
    // NOTE(review): presumably consumed where the `QueryContext` is built — confirm.
    pub db: Option<String>,
    /// Response format name. It is lower-cased and handed to
    /// `ResponseFormat::parse`; a missing or unrecognised value falls back to
    /// `ResponseFormat::GreptimedbV1`.
    pub format: Option<String>,
    /// Compression name forwarded to the Arrow response builder.
    pub compression: Option<String>,
    /// Timestamp precision name. It is lower-cased and handed to
    /// `Epoch::parse` (unrecognised values become `Epoch::Millisecond`); the
    /// result is forwarded to the InfluxDB v1 response builder.
    pub epoch: Option<String>,
}
625
626impl From<PromqlQuery> for PromQuery {
627 fn from(query: PromqlQuery) -> Self {
628 PromQuery {
629 query: query.query,
630 start: query.start,
631 end: query.end,
632 step: query.step,
633 lookback: query
634 .lookback
635 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
636 alias: None,
638 }
639 }
640}
641
642#[axum_macros::debug_handler]
644#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "promql"))]
645pub async fn promql(
646 State(state): State<ApiState>,
647 Query(params): Query<PromqlQuery>,
648 Extension(mut query_ctx): Extension<QueryContext>,
649) -> Response {
650 let sql_handler = &state.sql_handler;
651 let exec_start = Instant::now();
652 let db = query_ctx.get_db_string();
653
654 query_ctx.set_channel(Channel::Promql);
655 let query_ctx = Arc::new(query_ctx);
656
657 let _timer = crate::metrics::METRIC_HTTP_PROMQL_ELAPSED
658 .with_label_values(&[db.as_str()])
659 .start_timer();
660
661 let resp = if let Some((status, msg)) =
662 validate_schema(sql_handler.clone(), query_ctx.clone()).await
663 {
664 let resp = ErrorResponse::from_error_message(status, msg);
665 HttpResponse::Error(resp)
666 } else {
667 let format = params
668 .format
669 .as_ref()
670 .map(|s| s.to_lowercase())
671 .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
672 .unwrap_or(ResponseFormat::GreptimedbV1);
673 let epoch = params
674 .epoch
675 .as_ref()
676 .map(|s| s.to_lowercase())
677 .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
678 let compression = params.compression.clone();
679
680 let prom_query = params.into();
681 let outputs = sql_handler.do_promql_query(&prom_query, query_ctx).await;
682
683 match format {
684 ResponseFormat::Arrow => ArrowResponse::from_output(outputs, compression).await,
685 ResponseFormat::Csv(with_names, with_types) => {
686 CsvResponse::from_output(outputs, with_names, with_types).await
687 }
688 ResponseFormat::Table => TableResponse::from_output(outputs).await,
689 ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
690 ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
691 ResponseFormat::Json => JsonResponse::from_output(outputs).await,
692 ResponseFormat::Null => NullResponse::from_output(outputs).await,
693 }
694 };
695
696 resp.with_execution_time(exec_start.elapsed().as_millis() as u64)
697 .into_response()
698}
699
700#[axum_macros::debug_handler]
702pub async fn metrics(
703 State(state): State<MetricsHandler>,
704 Query(_params): Query<HashMap<String, String>>,
705) -> String {
706 #[cfg(not(windows))]
711 if let Some(c) = crate::metrics::jemalloc::JEMALLOC_COLLECTOR.as_ref()
712 && let Err(e) = c.update()
713 {
714 common_telemetry::error!(e; "Failed to update jemalloc metrics");
715 }
716 state.render()
717}
718
/// Query parameters of the `health` handler; it currently has no fields.
#[derive(Debug, Serialize, Deserialize)]
pub struct HealthQuery {}
721
/// Body returned by the `health` handler; it currently has no fields.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct HealthResponse {}
724
/// Handler for health checks.
///
/// Always answers with an empty `HealthResponse` as JSON; the query
/// parameters are accepted but not used.
#[axum_macros::debug_handler]
pub async fn health(Query(_params): Query<HealthQuery>) -> Json<HealthResponse> {
    Json(HealthResponse {})
}
732
/// Body returned by the `status` handler.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatusResponse<'a> {
    /// Commit recorded in the build information.
    pub commit: &'a str,
    /// Branch recorded in the build information.
    pub branch: &'a str,
    /// Rust compiler version recorded in the build information.
    pub rustc_version: &'a str,
    /// Host name of the machine, or `"unknown"` when it cannot be read.
    pub hostname: String,
    /// Version recorded in the build information.
    pub version: &'a str,
}
741
742#[axum_macros::debug_handler]
744pub async fn status() -> Json<StatusResponse<'static>> {
745 let hostname = hostname::get()
746 .map(|s| s.to_string_lossy().to_string())
747 .unwrap_or_else(|_| "unknown".to_string());
748 let build_info = common_version::build_info();
749 Json(StatusResponse {
750 commit: build_info.commit,
751 branch: build_info.branch,
752 rustc_version: build_info.rustc,
753 hostname,
754 version: build_info.version,
755 })
756}
757
/// Handler that returns the configuration held in
/// `GreptimeOptionsConfigState` as the response body with status `200 OK`.
#[axum_macros::debug_handler]
pub async fn config(State(state): State<GreptimeOptionsConfigState>) -> Response {
    (axum::http::StatusCode::OK, state.greptime_config_options).into_response()
}
763
764async fn validate_schema(
765 sql_handler: ServerSqlQueryHandlerRef,
766 query_ctx: QueryContextRef,
767) -> Option<(StatusCode, String)> {
768 match sql_handler
769 .is_valid_schema(query_ctx.current_catalog(), &query_ctx.current_schema())
770 .await
771 {
772 Ok(true) => None,
773 Ok(false) => Some((
774 StatusCode::DatabaseNotFound,
775 format!("Database not found: {}", query_ctx.get_db_string()),
776 )),
777 Err(e) => Some((
778 StatusCode::Internal,
779 format!(
780 "Error checking database: {}, {}",
781 query_ctx.get_db_string(),
782 e.output_msg(),
783 ),
784 )),
785 }
786}
787
788pub async fn index() -> axum::response::Html<String> {
789 let name = common_version::product_name();
790 let version = common_version::version();
791 axum::response::Html(format!(
792 r#"<!DOCTYPE html>
793<html>
794<head><title>{name}</title></head>
795<body>
796<h1>{name}</h1>
797<p>Version: {version}</p>
798<ul>
799<li><a href="/dashboard">Dashboard UI</a></li>
800<li><a href="/v1/health">Health</a> (JSON)</li>
801<li><a href="/status">Status</a> (JSON)</li>
802<li><a href="/metrics">Metrics</a> (For Prometheus Scrape)</li>
803<li><a href="/config">Config</a> (TXT)</li>
804</ul>
805</body>
806</html>"#,
807 ))
808}
809
#[cfg(test)]
mod tests {
    use super::*;

    /// A failed conversion of the final output must be reported as an `error`
    /// event whose payload carries the `error` state and the given reason.
    #[test]
    fn test_final_analyze_event_uses_error_event_for_conversion_error() {
        let (event_name, payload) = make_final_analyze_event(
            Err((
                "conversion failed".to_string(),
                StatusCode::InvalidArguments as u32,
            )),
            7,
            Instant::now(),
            // No plan: the payload is built without metrics.
            None,
        );

        assert_eq!(event_name, "error");
        let value: Value = serde_json::from_str(&payload).unwrap();
        assert_eq!(value["state"], "error");
        assert_eq!(value["reason"], "conversion failed");
    }
}