Skip to main content

operator/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, RwLock};
16
17use common_time::Timezone;
18use session::context::{QueryContextBuilder, QueryContextRef};
19use snafu::ResultExt;
20
21use crate::error::{Error, InvalidTimezoneSnafu};
22
23pub fn to_meta_query_context(
24    query_context: QueryContextRef,
25) -> common_meta::rpc::ddl::QueryContext {
26    common_meta::rpc::ddl::QueryContext {
27        current_catalog: query_context.current_catalog().to_string(),
28        current_schema: query_context.current_schema().clone(),
29        timezone: query_context.timezone().to_string(),
30        extensions: query_context.extensions(),
31        channel: query_context.channel() as u8,
32        snapshot_seqs: query_context.snapshots(),
33        sst_min_sequences: query_context.sst_min_sequences(),
34    }
35}
36
37pub fn try_to_session_query_context(
38    value: common_meta::rpc::ddl::QueryContext,
39) -> Result<session::context::QueryContext, Error> {
40    Ok(QueryContextBuilder::default()
41        .current_catalog(value.current_catalog)
42        .current_schema(value.current_schema)
43        .timezone(
44            Timezone::from_tz_string(&value.timezone).context(InvalidTimezoneSnafu {
45                timezone: value.timezone,
46            })?,
47        )
48        .extensions(value.extensions)
49        .channel((value.channel as u32).into())
50        .snapshot_seqs(Arc::new(RwLock::new(value.snapshot_seqs)))
51        .sst_min_sequences(Arc::new(RwLock::new(value.sst_min_sequences)))
52        .build())
53}
54
55#[cfg(test)]
56mod tests {
57    use std::collections::HashMap;
58    use std::sync::{Arc, RwLock};
59
60    use common_time::Timezone;
61    use session::context::QueryContextBuilder;
62
63    use super::{to_meta_query_context, try_to_session_query_context};
64
65    #[test]
66    fn test_query_context_meta_roundtrip_with_sequences() {
67        let session_ctx = Arc::new(
68            QueryContextBuilder::default()
69                .current_catalog("c1".to_string())
70                .current_schema("s1".to_string())
71                .timezone(Timezone::from_tz_string("UTC").unwrap())
72                .set_extension("flow.return_region_seq".to_string(), "true".to_string())
73                .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(10, 100)]))))
74                .sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(10, 90)]))))
75                .build(),
76        );
77
78        let meta_ctx = to_meta_query_context(session_ctx);
79        let roundtrip = try_to_session_query_context(meta_ctx).unwrap();
80
81        assert_eq!(roundtrip.current_catalog(), "c1");
82        assert_eq!(roundtrip.current_schema(), "s1");
83        assert_eq!(roundtrip.snapshots(), HashMap::from([(10, 100)]));
84        assert_eq!(roundtrip.sst_min_sequences(), HashMap::from([(10, 90)]));
85        assert_eq!(roundtrip.extension("flow.return_region_seq"), Some("true"));
86    }
87}