operator/statement/
kill.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use catalog::process_manager::ProcessManagerRef;
16use common_frontend::DisplayProcessId;
17use common_query::Output;
18use common_telemetry::error;
19use session::context::QueryContextRef;
20use snafu::ResultExt;
21use sql::statements::kill::Kill;
22
23use crate::error;
24use crate::statement::StatementExecutor;
25
26impl StatementExecutor {
27    pub async fn execute_kill(
28        &self,
29        query_ctx: QueryContextRef,
30        kill: Kill,
31    ) -> error::Result<Output> {
32        let Some(process_manager) = self.process_manager.as_ref() else {
33            error!("Process manager is not initialized");
34            return error::ProcessManagerMissingSnafu.fail();
35        };
36
37        let succ = match kill {
38            Kill::ProcessId(process_id) => {
39                self.kill_process_id(process_manager, query_ctx, process_id)
40                    .await?
41            }
42            Kill::ConnectionId(conn_id) => {
43                self.kill_connection_id(process_manager, query_ctx, conn_id)
44                    .await?
45            }
46        };
47        Ok(Output::new_with_affected_rows(if succ { 1 } else { 0 }))
48    }
49
50    /// Handles `KILL <PROCESS_ID>` statements.
51    async fn kill_process_id(
52        &self,
53        pm: &ProcessManagerRef,
54        query_ctx: QueryContextRef,
55        process_id: String,
56    ) -> error::Result<bool> {
57        let display_id = DisplayProcessId::try_from(process_id.as_str())
58            .map_err(|_| error::InvalidProcessIdSnafu { id: process_id }.build())?;
59
60        let current_user_catalog = query_ctx.current_catalog().to_string();
61        pm.kill_process(display_id.server_addr, current_user_catalog, display_id.id)
62            .await
63            .context(error::CatalogSnafu)
64    }
65
66    /// Handles MySQL `KILL QUERY <CONNECTION_ID>` statements.
67    pub async fn kill_connection_id(
68        &self,
69        pm: &ProcessManagerRef,
70        query_ctx: QueryContextRef,
71        connection_id: u32,
72    ) -> error::Result<bool> {
73        pm.kill_local_process(query_ctx.current_catalog().to_string(), connection_id)
74            .await
75            .context(error::CatalogSnafu)
76    }
77}