meta_srv/service/admin/
sequencer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use axum::extract::{self, State};
16use axum::http::StatusCode;
17use axum::response::{IntoResponse, Response};
18use axum::Json;
19use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
20use common_meta::sequence::SequenceRef;
21use serde::{Deserialize, Serialize};
22use servers::http::result::error_result::ErrorResponse;
23use snafu::{ensure, ResultExt};
24
25use crate::error::{
26    PeekSequenceSnafu, Result, RuntimeSwitchManagerSnafu, SetNextSequenceSnafu, UnexpectedSnafu,
27};
28
/// State shared by the admin handlers in this file that read and reset the
/// table id sequence.
#[derive(Clone)]
pub(crate) struct TableIdSequenceHandler {
    /// Sequence that is peeked at and jumped by the handler methods.
    pub(crate) table_id_sequence: SequenceRef,
    /// Queried for the recovery-mode flag before the sequence is modified.
    pub(crate) runtime_switch_manager: RuntimeSwitchManagerRef,
}
34
35impl TableIdSequenceHandler {
36    async fn set_next_table_id(&self, next_table_id: u32) -> Result<()> {
37        ensure!(
38            self.runtime_switch_manager
39                .recovery_mode()
40                .await
41                .context(RuntimeSwitchManagerSnafu)?,
42            UnexpectedSnafu {
43                violated: "Setting next table id is only allowed in recovery mode",
44            }
45        );
46
47        self.table_id_sequence
48            .jump_to(next_table_id as u64)
49            .await
50            .context(SetNextSequenceSnafu)
51    }
52
53    async fn peek_table_id(&self) -> Result<u32> {
54        let next_table_id = self
55            .table_id_sequence
56            .peek()
57            .await
58            .context(PeekSequenceSnafu)?;
59        Ok(next_table_id as u32)
60    }
61}
62
/// JSON body returned on success by both `set_next_table_id` and
/// `get_next_table_id`.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct NextTableIdResponse {
    /// The table id that was just set, or the one peeked from the sequence.
    pub(crate) next_table_id: u32,
}
67
/// JSON request body accepted by `set_next_table_id`.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ResetTableIdRequest {
    /// The value the table id sequence should be jumped to.
    pub(crate) next_table_id: u32,
}
72
73/// Set the next table id.
74#[axum_macros::debug_handler]
75pub(crate) async fn set_next_table_id(
76    State(handler): State<TableIdSequenceHandler>,
77    extract::Json(ResetTableIdRequest { next_table_id }): extract::Json<ResetTableIdRequest>,
78) -> Response {
79    match handler.set_next_table_id(next_table_id).await {
80        Ok(_) => (StatusCode::OK, Json(NextTableIdResponse { next_table_id })).into_response(),
81        Err(e) => ErrorResponse::from_error(e).into_response(),
82    }
83}
84
85/// Get the next table id without incrementing the sequence.
86#[axum_macros::debug_handler]
87pub(crate) async fn get_next_table_id(State(handler): State<TableIdSequenceHandler>) -> Response {
88    match handler.peek_table_id().await {
89        Ok(next_table_id) => {
90            (StatusCode::OK, Json(NextTableIdResponse { next_table_id })).into_response()
91        }
92        Err(e) => ErrorResponse::from_error(e).into_response(),
93    }
94}