1use std::sync::{Arc, RwLock};
16
17use common_time::Timezone;
18use session::context::{QueryContextBuilder, QueryContextRef};
19use snafu::ResultExt;
20
21use crate::error::{Error, InvalidTimezoneSnafu};
22
23pub fn to_meta_query_context(
24 query_context: QueryContextRef,
25) -> common_meta::rpc::ddl::QueryContext {
26 common_meta::rpc::ddl::QueryContext {
27 current_catalog: query_context.current_catalog().to_string(),
28 current_schema: query_context.current_schema().clone(),
29 timezone: query_context.timezone().to_string(),
30 extensions: query_context.extensions(),
31 channel: query_context.channel() as u8,
32 snapshot_seqs: query_context.snapshots(),
33 sst_min_sequences: query_context.sst_min_sequences(),
34 }
35}
36
37pub fn try_to_session_query_context(
38 value: common_meta::rpc::ddl::QueryContext,
39) -> Result<session::context::QueryContext, Error> {
40 Ok(QueryContextBuilder::default()
41 .current_catalog(value.current_catalog)
42 .current_schema(value.current_schema)
43 .timezone(
44 Timezone::from_tz_string(&value.timezone).context(InvalidTimezoneSnafu {
45 timezone: value.timezone,
46 })?,
47 )
48 .extensions(value.extensions)
49 .channel((value.channel as u32).into())
50 .snapshot_seqs(Arc::new(RwLock::new(value.snapshot_seqs)))
51 .sst_min_sequences(Arc::new(RwLock::new(value.sst_min_sequences)))
52 .build())
53}
54
55#[cfg(test)]
56mod tests {
57 use std::collections::HashMap;
58 use std::sync::{Arc, RwLock};
59
60 use common_time::Timezone;
61 use session::context::QueryContextBuilder;
62
63 use super::{to_meta_query_context, try_to_session_query_context};
64
65 #[test]
66 fn test_query_context_meta_roundtrip_with_sequences() {
67 let session_ctx = Arc::new(
68 QueryContextBuilder::default()
69 .current_catalog("c1".to_string())
70 .current_schema("s1".to_string())
71 .timezone(Timezone::from_tz_string("UTC").unwrap())
72 .set_extension("flow.return_region_seq".to_string(), "true".to_string())
73 .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(10, 100)]))))
74 .sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(10, 90)]))))
75 .build(),
76 );
77
78 let meta_ctx = to_meta_query_context(session_ctx);
79 let roundtrip = try_to_session_query_context(meta_ctx).unwrap();
80
81 assert_eq!(roundtrip.current_catalog(), "c1");
82 assert_eq!(roundtrip.current_schema(), "s1");
83 assert_eq!(roundtrip.snapshots(), HashMap::from([(10, 100)]));
84 assert_eq!(roundtrip.sst_min_sequences(), HashMap::from([(10, 90)]));
85 assert_eq!(roundtrip.extension("flow.return_region_seq"), Some("true"));
86 }
87}