use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use axum::extract::{Query, RawBody, State};
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use serde::{Deserialize, Serialize};
use session::context::QueryContext;
use snafu::ResultExt;
use crate::error::{HyperSnafu, InvalidUtf8ValueSnafu};
use crate::http::result::error_result::ErrorResponse;
use crate::http::{ApiState, GreptimedbV1Response, HttpResponse};
/// Returns early from the enclosing handler with an `HttpResponse::Error`.
///
/// Two invocation forms are accepted:
/// - `json_err!(err)` builds the response via `ErrorResponse::from_error(err)`;
/// - `json_err!(msg, code)` builds it via
///   `ErrorResponse::from_error_message(code, msg.to_string())`.
///
/// Because the expansion always `return`s, the macro can be used wherever a
/// value of any type is expected (for example in a `match` arm).
macro_rules! json_err {
    ($err: expr) => {{
        return HttpResponse::Error(ErrorResponse::from_error($err));
    }};
    ($message: expr, $status: expr) => {{
        return HttpResponse::Error(ErrorResponse::from_error_message(
            $status,
            $message.to_string(),
        ));
    }};
}
/// Unwraps a `Result` inside a handler.
///
/// Evaluates to the `Ok` payload of `$outcome`; on `Err` the error is handed to
/// `json_err!`, which returns from the enclosing function with an error response.
macro_rules! unwrap_or_json_err {
    ($outcome: expr) => {
        match $outcome {
            Ok(value) => value,
            Err(err) => json_err!(err),
        }
    };
}
/// Handler that stores the request body as a script under the name given in the query string.
///
/// The catalog falls back to `DEFAULT_CATALOG_NAME` when the `catalog` query parameter is
/// absent. The `db` parameter (used as the schema) and the `name` parameter must both be
/// present and non-empty. The raw request body must be valid UTF-8; it is passed to the
/// script handler's `insert_script` as the script text.
///
/// Every failure is reported as an `HttpResponse::Error`:
/// - no script handler is configured: `StatusCode::Unsupported`;
/// - `db` or `name` missing or empty: `StatusCode::InvalidArguments`;
/// - the body cannot be read, or is not valid UTF-8: the corresponding error is converted;
/// - `insert_script` fails: its status code, with an "Insert script error: ..." message.
///
/// On success the response is built from an empty output list.
#[axum_macros::debug_handler]
pub async fn scripts(
    State(state): State<ApiState>,
    Query(params): Query<ScriptQuery>,
    RawBody(body): RawBody,
) -> HttpResponse {
    // Guard clause: nothing below makes sense without a script handler.
    let script_handler = match &state.script_handler {
        Some(handler) => handler,
        None => json_err!(
            "Script execution not supported, missing script handler",
            StatusCode::Unsupported
        ),
    };

    let catalog = params
        .catalog
        .unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string());

    // Bind the validated values directly so no `unwrap()` is needed later on.
    let schema = match params.db.as_ref() {
        Some(schema) if !schema.is_empty() => schema,
        _ => json_err!("invalid schema", StatusCode::InvalidArguments),
    };
    let name = match params.name.as_ref() {
        Some(name) if !name.is_empty() => name,
        _ => json_err!("invalid name", StatusCode::InvalidArguments),
    };

    // The body is only read once the query parameters have been validated.
    let bytes = unwrap_or_json_err!(hyper::body::to_bytes(body).await.context(HyperSnafu));
    let script =
        unwrap_or_json_err!(String::from_utf8(bytes.to_vec()).context(InvalidUtf8ValueSnafu));

    let query_ctx = Arc::new(QueryContext::with(&catalog, schema));
    let inserted = script_handler
        .insert_script(query_ctx, name, &script)
        .await;

    match inserted {
        Ok(()) => GreptimedbV1Response::from_output(vec![]).await,
        Err(e) => json_err!(
            format!("Insert script error: {}", e.output_msg()),
            e.status_code()
        ),
    }
}
/// Query-string parameters accepted by the `scripts` and `run_script` handlers.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct ScriptQuery {
    /// Catalog name; the handlers fall back to `DEFAULT_CATALOG_NAME` when it is absent.
    pub catalog: Option<String>,
    /// Database name, used by the handlers as the schema of the query context.
    /// The handlers reject a missing or empty value with "invalid schema".
    pub db: Option<String>,
    /// Script name. The handlers reject a missing or empty value with "invalid name".
    pub name: Option<String>,
    /// Flattened map of string key/value pairs from the query string;
    /// `run_script` forwards it to the script handler's `execute_script`.
    #[serde(flatten)]
    pub params: HashMap<String, String>,
}
/// Handler that executes a previously stored script, selected by the `name` query parameter.
///
/// The catalog falls back to `DEFAULT_CATALOG_NAME` when the `catalog` query parameter is
/// absent. The `db` parameter (used as the schema) and the `name` parameter must both be
/// present and non-empty. The flattened `params` map of the query is forwarded to the script
/// handler's `execute_script`.
///
/// Failures detected here are reported as an `HttpResponse::Error`:
/// - no script handler is configured: `StatusCode::Unsupported`;
/// - `db` or `name` missing or empty: `StatusCode::InvalidArguments`.
///
/// Otherwise the result of `execute_script` is turned into a response via
/// `GreptimedbV1Response::from_output`, annotated with the elapsed time in milliseconds.
#[axum_macros::debug_handler]
pub async fn run_script(
    State(state): State<ApiState>,
    Query(params): Query<ScriptQuery>,
) -> HttpResponse {
    // Guard clause: nothing below makes sense without a script handler.
    let script_handler = match &state.script_handler {
        Some(handler) => handler,
        None => json_err!(
            "Script execution not supported, missing script handler",
            StatusCode::Unsupported
        ),
    };

    let catalog = params
        .catalog
        .unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string());
    // The reported execution time covers parameter validation as well as the script run.
    let start = Instant::now();

    // Bind the validated values directly so no `unwrap()` is needed later on.
    let schema = match params.db.as_ref() {
        Some(schema) if !schema.is_empty() => schema,
        _ => json_err!("invalid schema", StatusCode::InvalidArguments),
    };
    let name = match params.name.as_ref() {
        Some(name) if !name.is_empty() => name,
        _ => json_err!("invalid name", StatusCode::InvalidArguments),
    };

    let query_ctx = Arc::new(QueryContext::with(&catalog, schema));
    let output = script_handler
        .execute_script(query_ctx, name, params.params)
        .await;
    let resp = GreptimedbV1Response::from_output(vec![output]).await;

    // `as_millis()` yields a `u128`; the narrowing cast could only truncate for an
    // elapsed time above `u64::MAX` milliseconds, which cannot happen in practice.
    resp.with_execution_time(start.elapsed().as_millis() as u64)
}