operator/statement/
kill.rs1use catalog::process_manager::ProcessManagerRef;
16use common_frontend::DisplayProcessId;
17use common_query::Output;
18use common_telemetry::error;
19use session::context::QueryContextRef;
20use snafu::ResultExt;
21use sql::statements::kill::Kill;
22
23use crate::error;
24use crate::statement::StatementExecutor;
25
26impl StatementExecutor {
27 pub async fn execute_kill(
28 &self,
29 query_ctx: QueryContextRef,
30 kill: Kill,
31 ) -> error::Result<Output> {
32 let Some(process_manager) = self.process_manager.as_ref() else {
33 error!("Process manager is not initialized");
34 return error::ProcessManagerMissingSnafu.fail();
35 };
36
37 let succ = match kill {
38 Kill::ProcessId(process_id) => {
39 self.kill_process_id(process_manager, query_ctx, process_id)
40 .await?
41 }
42 Kill::ConnectionId(conn_id) => {
43 self.kill_connection_id(process_manager, query_ctx, conn_id)
44 .await?
45 }
46 };
47 Ok(Output::new_with_affected_rows(if succ { 1 } else { 0 }))
48 }
49
50 async fn kill_process_id(
52 &self,
53 pm: &ProcessManagerRef,
54 query_ctx: QueryContextRef,
55 process_id: String,
56 ) -> error::Result<bool> {
57 let display_id = DisplayProcessId::try_from(process_id.as_str())
58 .map_err(|_| error::InvalidProcessIdSnafu { id: process_id }.build())?;
59
60 let current_user_catalog = query_ctx.current_catalog().to_string();
61 pm.kill_process(display_id.server_addr, current_user_catalog, display_id.id)
62 .await
63 .context(error::CatalogSnafu)
64 }
65
66 pub async fn kill_connection_id(
68 &self,
69 pm: &ProcessManagerRef,
70 query_ctx: QueryContextRef,
71 connection_id: u32,
72 ) -> error::Result<bool> {
73 pm.kill_local_process(query_ctx.current_catalog().to_string(), connection_id)
74 .await
75 .context(error::CatalogSnafu)
76 }
77}