common_function/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef};
16
17/// Shared state for SQL functions.
18/// The handlers in state may be `None` in cli command-line or test cases.
19#[derive(Clone, Default)]
20pub struct FunctionState {
21    // The table mutation handler
22    pub table_mutation_handler: Option<TableMutationHandlerRef>,
23    // The procedure service handler
24    pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
25    // The flownode handler
26    pub flow_service_handler: Option<FlowServiceHandlerRef>,
27}
28
29impl FunctionState {
30    /// Create a mock [`FunctionState`] for test.
31    #[cfg(any(test, feature = "testing"))]
32    pub fn mock() -> Self {
33        use std::sync::Arc;
34
35        use api::v1::meta::ProcedureStatus;
36        use async_trait::async_trait;
37        use catalog::CatalogManagerRef;
38        use common_base::AffectedRows;
39        use common_meta::rpc::procedure::{
40            AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
41            RemoveRegionFollowerRequest,
42        };
43        use common_query::error::Result;
44        use common_query::Output;
45        use session::context::QueryContextRef;
46        use store_api::storage::RegionId;
47        use table::requests::{
48            CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
49        };
50
51        use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
52        struct MockProcedureServiceHandler;
53        struct MockTableMutationHandler;
54        struct MockFlowServiceHandler;
55        const ROWS: usize = 42;
56
57        #[async_trait]
58        impl ProcedureServiceHandler for MockProcedureServiceHandler {
59            async fn migrate_region(
60                &self,
61                _request: MigrateRegionRequest,
62            ) -> Result<Option<String>> {
63                Ok(Some("test_pid".to_string()))
64            }
65
66            async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
67                Ok(ProcedureStateResponse {
68                    status: ProcedureStatus::Done.into(),
69                    error: "OK".to_string(),
70                    ..Default::default()
71                })
72            }
73
74            async fn add_region_follower(&self, _request: AddRegionFollowerRequest) -> Result<()> {
75                Ok(())
76            }
77
78            async fn remove_region_follower(
79                &self,
80                _request: RemoveRegionFollowerRequest,
81            ) -> Result<()> {
82                Ok(())
83            }
84
85            fn catalog_manager(&self) -> &CatalogManagerRef {
86                unimplemented!()
87            }
88        }
89
90        #[async_trait]
91        impl TableMutationHandler for MockTableMutationHandler {
92            async fn insert(
93                &self,
94                _request: InsertRequest,
95                _ctx: QueryContextRef,
96            ) -> Result<Output> {
97                Ok(Output::new_with_affected_rows(ROWS))
98            }
99
100            async fn delete(
101                &self,
102                _request: DeleteRequest,
103                _ctx: QueryContextRef,
104            ) -> Result<AffectedRows> {
105                Ok(ROWS)
106            }
107
108            async fn flush(
109                &self,
110                _request: FlushTableRequest,
111                _ctx: QueryContextRef,
112            ) -> Result<AffectedRows> {
113                Ok(ROWS)
114            }
115
116            async fn compact(
117                &self,
118                _request: CompactTableRequest,
119                _ctx: QueryContextRef,
120            ) -> Result<AffectedRows> {
121                Ok(ROWS)
122            }
123
124            async fn flush_region(
125                &self,
126                _region_id: RegionId,
127                _ctx: QueryContextRef,
128            ) -> Result<AffectedRows> {
129                Ok(ROWS)
130            }
131
132            async fn compact_region(
133                &self,
134                _region_id: RegionId,
135                _ctx: QueryContextRef,
136            ) -> Result<AffectedRows> {
137                Ok(ROWS)
138            }
139        }
140
141        #[async_trait]
142        impl FlowServiceHandler for MockFlowServiceHandler {
143            async fn flush(
144                &self,
145                _catalog: &str,
146                _flow: &str,
147                _ctx: QueryContextRef,
148            ) -> Result<api::v1::flow::FlowResponse> {
149                todo!()
150            }
151        }
152
153        Self {
154            table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
155            procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
156            flow_service_handler: Some(Arc::new(MockFlowServiceHandler)),
157        }
158    }
159}