common_function/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef};
16
17/// Shared state for SQL functions.
18/// The handlers in state may be `None` in cli command-line or test cases.
19#[derive(Clone, Default)]
20pub struct FunctionState {
21    // The table mutation handler
22    pub table_mutation_handler: Option<TableMutationHandlerRef>,
23    // The procedure service handler
24    pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
25    // The flownode handler
26    pub flow_service_handler: Option<FlowServiceHandlerRef>,
27}
28
29impl FunctionState {
30    /// Create a mock [`FunctionState`] for test.
31    #[cfg(any(test, feature = "testing"))]
32    pub fn mock() -> Self {
33        use std::sync::Arc;
34
35        use api::v1::meta::{ProcedureStatus, ReconcileRequest};
36        use async_trait::async_trait;
37        use catalog::CatalogManagerRef;
38        use common_base::AffectedRows;
39        use common_meta::rpc::procedure::{
40            ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
41        };
42        use common_query::Output;
43        use common_query::error::Result;
44        use session::context::QueryContextRef;
45        use store_api::storage::RegionId;
46        use table::requests::{
47            BuildIndexTableRequest, CompactTableRequest, DeleteRequest, FlushTableRequest,
48            InsertRequest,
49        };
50
51        use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
52        struct MockProcedureServiceHandler;
53        struct MockTableMutationHandler;
54        struct MockFlowServiceHandler;
55        const ROWS: usize = 42;
56
57        #[async_trait]
58        impl ProcedureServiceHandler for MockProcedureServiceHandler {
59            async fn migrate_region(
60                &self,
61                _request: MigrateRegionRequest,
62            ) -> Result<Option<String>> {
63                Ok(Some("test_pid".to_string()))
64            }
65
66            async fn reconcile(&self, _request: ReconcileRequest) -> Result<Option<String>> {
67                Ok(Some("test_pid".to_string()))
68            }
69
70            async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
71                Ok(ProcedureStateResponse {
72                    status: ProcedureStatus::Done.into(),
73                    error: "OK".to_string(),
74                    ..Default::default()
75                })
76            }
77
78            async fn manage_region_follower(
79                &self,
80                _request: ManageRegionFollowerRequest,
81            ) -> Result<()> {
82                Ok(())
83            }
84
85            fn catalog_manager(&self) -> &CatalogManagerRef {
86                unimplemented!()
87            }
88        }
89
90        #[async_trait]
91        impl TableMutationHandler for MockTableMutationHandler {
92            async fn insert(
93                &self,
94                _request: InsertRequest,
95                _ctx: QueryContextRef,
96            ) -> Result<Output> {
97                Ok(Output::new_with_affected_rows(ROWS))
98            }
99
100            async fn delete(
101                &self,
102                _request: DeleteRequest,
103                _ctx: QueryContextRef,
104            ) -> Result<AffectedRows> {
105                Ok(ROWS)
106            }
107
108            async fn flush(
109                &self,
110                _request: FlushTableRequest,
111                _ctx: QueryContextRef,
112            ) -> Result<AffectedRows> {
113                Ok(ROWS)
114            }
115
116            async fn compact(
117                &self,
118                _request: CompactTableRequest,
119                _ctx: QueryContextRef,
120            ) -> Result<AffectedRows> {
121                Ok(ROWS)
122            }
123
124            async fn build_index(
125                &self,
126                _request: BuildIndexTableRequest,
127                _ctx: QueryContextRef,
128            ) -> Result<AffectedRows> {
129                Ok(ROWS)
130            }
131
132            async fn flush_region(
133                &self,
134                _region_id: RegionId,
135                _ctx: QueryContextRef,
136            ) -> Result<AffectedRows> {
137                Ok(ROWS)
138            }
139
140            async fn compact_region(
141                &self,
142                _region_id: RegionId,
143                _ctx: QueryContextRef,
144            ) -> Result<AffectedRows> {
145                Ok(ROWS)
146            }
147        }
148
149        #[async_trait]
150        impl FlowServiceHandler for MockFlowServiceHandler {
151            async fn flush(
152                &self,
153                _catalog: &str,
154                _flow: &str,
155                _ctx: QueryContextRef,
156            ) -> Result<api::v1::flow::FlowResponse> {
157                todo!()
158            }
159        }
160
161        Self {
162            table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
163            procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
164            flow_service_handler: Some(Arc::new(MockFlowServiceHandler)),
165        }
166    }
167}