common_function/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef};
16
17/// Shared state for SQL functions.
18/// The handlers in state may be `None` in cli command-line or test cases.
19#[derive(Clone, Default)]
20pub struct FunctionState {
21    // The table mutation handler
22    pub table_mutation_handler: Option<TableMutationHandlerRef>,
23    // The procedure service handler
24    pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
25    // The flownode handler
26    pub flow_service_handler: Option<FlowServiceHandlerRef>,
27}
28
29impl FunctionState {
30    /// Create a mock [`FunctionState`] for test.
31    #[cfg(any(test, feature = "testing"))]
32    pub fn mock() -> Self {
33        use std::sync::Arc;
34
35        use api::v1::meta::{ProcedureStatus, ReconcileRequest};
36        use async_trait::async_trait;
37        use catalog::CatalogManagerRef;
38        use common_base::AffectedRows;
39        use common_meta::rpc::procedure::{
40            GcRegionsRequest, GcResponse, GcTableRequest, ManageRegionFollowerRequest,
41            MigrateRegionRequest, ProcedureStateResponse,
42        };
43        use common_query::Output;
44        use common_query::error::Result;
45        use session::context::QueryContextRef;
46        use store_api::storage::RegionId;
47        use table::requests::{
48            BuildIndexTableRequest, CompactTableRequest, DeleteRequest, FlushTableRequest,
49            InsertRequest,
50        };
51
52        use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
53        struct MockProcedureServiceHandler;
54        struct MockTableMutationHandler;
55        struct MockFlowServiceHandler;
56        const ROWS: usize = 42;
57
58        #[async_trait]
59        impl ProcedureServiceHandler for MockProcedureServiceHandler {
60            async fn migrate_region(
61                &self,
62                _request: MigrateRegionRequest,
63            ) -> Result<Option<String>> {
64                Ok(Some("test_pid".to_string()))
65            }
66
67            async fn reconcile(&self, _request: ReconcileRequest) -> Result<Option<String>> {
68                Ok(Some("test_pid".to_string()))
69            }
70
71            async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
72                Ok(ProcedureStateResponse {
73                    status: ProcedureStatus::Done.into(),
74                    error: "OK".to_string(),
75                    ..Default::default()
76                })
77            }
78
79            async fn manage_region_follower(
80                &self,
81                _request: ManageRegionFollowerRequest,
82            ) -> Result<()> {
83                Ok(())
84            }
85
86            async fn gc_regions(&self, _request: GcRegionsRequest) -> Result<GcResponse> {
87                Ok(GcResponse {
88                    processed_regions: 1,
89                    need_retry_regions: vec![],
90                    deleted_files: 0,
91                    deleted_indexes: 0,
92                })
93            }
94
95            async fn gc_table(&self, _request: GcTableRequest) -> Result<GcResponse> {
96                Ok(GcResponse {
97                    processed_regions: 1,
98                    need_retry_regions: vec![],
99                    deleted_files: 0,
100                    deleted_indexes: 0,
101                })
102            }
103
104            fn catalog_manager(&self) -> &CatalogManagerRef {
105                unimplemented!()
106            }
107        }
108
109        #[async_trait]
110        impl TableMutationHandler for MockTableMutationHandler {
111            async fn insert(
112                &self,
113                _request: InsertRequest,
114                _ctx: QueryContextRef,
115            ) -> Result<Output> {
116                Ok(Output::new_with_affected_rows(ROWS))
117            }
118
119            async fn delete(
120                &self,
121                _request: DeleteRequest,
122                _ctx: QueryContextRef,
123            ) -> Result<AffectedRows> {
124                Ok(ROWS)
125            }
126
127            async fn flush(
128                &self,
129                _request: FlushTableRequest,
130                _ctx: QueryContextRef,
131            ) -> Result<AffectedRows> {
132                Ok(ROWS)
133            }
134
135            async fn compact(
136                &self,
137                _request: CompactTableRequest,
138                _ctx: QueryContextRef,
139            ) -> Result<AffectedRows> {
140                Ok(ROWS)
141            }
142
143            async fn build_index(
144                &self,
145                _request: BuildIndexTableRequest,
146                _ctx: QueryContextRef,
147            ) -> Result<AffectedRows> {
148                Ok(ROWS)
149            }
150
151            async fn flush_region(
152                &self,
153                _region_id: RegionId,
154                _ctx: QueryContextRef,
155            ) -> Result<AffectedRows> {
156                Ok(ROWS)
157            }
158
159            async fn compact_region(
160                &self,
161                _region_id: RegionId,
162                _ctx: QueryContextRef,
163            ) -> Result<AffectedRows> {
164                Ok(ROWS)
165            }
166        }
167
168        #[async_trait]
169        impl FlowServiceHandler for MockFlowServiceHandler {
170            async fn flush(
171                &self,
172                _catalog: &str,
173                _flow: &str,
174                _ctx: QueryContextRef,
175            ) -> Result<api::v1::flow::FlowResponse> {
176                todo!()
177            }
178        }
179
180        Self {
181            table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
182            procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
183            flow_service_handler: Some(Arc::new(MockFlowServiceHandler)),
184        }
185    }
186}