1pub mod context;
16pub mod hints;
17pub mod protocol_ctx;
18pub mod session_config;
19pub mod table_name;
20
21use std::collections::HashMap;
22use std::net::SocketAddr;
23use std::sync::{Arc, RwLock};
24use std::time::Duration;
25
26use auth::UserInfoRef;
27use common_catalog::build_db_string;
28use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
29use common_recordbatch::cursor::RecordBatchStreamCursor;
30pub use common_session::ReadPreference;
31use common_time::timezone::get_timezone;
32use common_time::Timezone;
33use context::{ConfigurationVariables, QueryContextBuilder};
34use derive_more::Debug;
35
36use crate::context::{Channel, ConnInfo, QueryContextRef};
37
/// Session state: connection info, the current catalog, configuration
/// variables, a process id, and a shared block of mutable settings.
///
/// All setters take `&self`; mutation goes through the internal `RwLock`s.
#[derive(Debug)]
pub struct Session {
    /// Current catalog name, guarded by its own lock (separate from `mutable_inner`).
    catalog: RwLock<String>,
    /// Mutable session settings. The `Arc` is cloned into every query context
    /// created by `new_query_context`, so the same state is shared with them.
    mutable_inner: Arc<RwLock<MutableInner>>,
    /// Connection information built from the address and channel given to `Session::new`.
    conn_info: ConnInfo,
    /// Configuration variables supplied at construction, held behind an `Arc`.
    configuration_variables: Arc<ConfigurationVariables>,
    /// Process id supplied at construction; never changed afterwards by this type.
    process_id: u32,
}
48
/// Reference-counted (`Arc`) handle to a [`Session`].
pub type SessionRef = Arc<Session>;
50
/// The mutable part of a session's state; `Session` stores it behind
/// `Arc<RwLock<_>>`.
#[derive(Debug)]
pub(crate) struct MutableInner {
    /// Current schema name; defaults to `DEFAULT_SCHEMA_NAME`.
    schema: String,
    /// User information; defaults to `auth::userinfo_by_name(None)`.
    user_info: UserInfoRef,
    /// Timezone; defaults to a clone of `get_timezone(None)`.
    timezone: Timezone,
    /// Optional query timeout; `None` by default.
    query_timeout: Option<Duration>,
    /// Read preference; defaults to `ReadPreference::Leader`.
    read_preference: ReadPreference,
    /// Cursors keyed by a string (presumably the cursor name — TODO confirm).
    /// Excluded from the derived `Debug` output.
    #[debug(skip)]
    pub(crate) cursors: HashMap<String, Arc<RecordBatchStreamCursor>>,
}
62
63impl Default for MutableInner {
64 fn default() -> Self {
65 Self {
66 schema: DEFAULT_SCHEMA_NAME.into(),
67 user_info: auth::userinfo_by_name(None),
68 timezone: get_timezone(None).clone(),
69 query_timeout: None,
70 read_preference: ReadPreference::Leader,
71 cursors: HashMap::with_capacity(0),
72 }
73 }
74}
75
76impl Session {
77 pub fn new(
78 addr: Option<SocketAddr>,
79 channel: Channel,
80 configuration_variables: ConfigurationVariables,
81 process_id: u32,
82 ) -> Self {
83 Session {
84 catalog: RwLock::new(DEFAULT_CATALOG_NAME.into()),
85 conn_info: ConnInfo::new(addr, channel),
86 configuration_variables: Arc::new(configuration_variables),
87 mutable_inner: Arc::new(RwLock::new(MutableInner::default())),
88 process_id,
89 }
90 }
91
92 pub fn new_query_context(&self) -> QueryContextRef {
93 QueryContextBuilder::default()
94 .current_catalog(self.catalog.read().unwrap().clone())
97 .mutable_session_data(self.mutable_inner.clone())
98 .sql_dialect(self.conn_info.channel.dialect())
99 .configuration_parameter(self.configuration_variables.clone())
100 .channel(self.conn_info.channel)
101 .process_id(self.process_id)
102 .conn_info(self.conn_info.clone())
103 .build()
104 .into()
105 }
106
107 pub fn conn_info(&self) -> &ConnInfo {
108 &self.conn_info
109 }
110
111 pub fn timezone(&self) -> Timezone {
112 self.mutable_inner.read().unwrap().timezone.clone()
113 }
114
115 pub fn read_preference(&self) -> ReadPreference {
116 self.mutable_inner.read().unwrap().read_preference
117 }
118
119 pub fn set_timezone(&self, tz: Timezone) {
120 let mut inner = self.mutable_inner.write().unwrap();
121 inner.timezone = tz;
122 }
123
124 pub fn set_read_preference(&self, read_preference: ReadPreference) {
125 self.mutable_inner.write().unwrap().read_preference = read_preference;
126 }
127
128 pub fn user_info(&self) -> UserInfoRef {
129 self.mutable_inner.read().unwrap().user_info.clone()
130 }
131
132 pub fn set_user_info(&self, user_info: UserInfoRef) {
133 self.mutable_inner.write().unwrap().user_info = user_info;
134 }
135
136 pub fn set_catalog(&self, catalog: String) {
137 *self.catalog.write().unwrap() = catalog;
138 }
139
140 pub fn catalog(&self) -> String {
141 self.catalog.read().unwrap().clone()
142 }
143
144 pub fn schema(&self) -> String {
145 self.mutable_inner.read().unwrap().schema.clone()
146 }
147
148 pub fn set_schema(&self, schema: String) {
149 self.mutable_inner.write().unwrap().schema = schema;
150 }
151
152 pub fn get_db_string(&self) -> String {
153 build_db_string(&self.catalog(), &self.schema())
154 }
155
156 pub fn process_id(&self) -> u32 {
157 self.process_id
158 }
159}