1pub mod context;
16pub mod hints;
17pub mod session_config;
18pub mod table_name;
19
20use std::collections::HashMap;
21use std::net::SocketAddr;
22use std::sync::{Arc, RwLock};
23use std::time::Duration;
24
25use auth::UserInfoRef;
26use common_catalog::build_db_string;
27use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
28use common_recordbatch::cursor::RecordBatchStreamCursor;
29pub use common_session::ReadPreference;
30use common_time::timezone::get_timezone;
31use common_time::Timezone;
32use context::{ConfigurationVariables, QueryContextBuilder};
33use derive_more::Debug;
34
35use crate::context::{Channel, ConnInfo, QueryContextRef};
36
/// State kept for one client session.
///
/// The current catalog sits behind its own lock, while the remaining mutable
/// settings live in a shared [`MutableInner`] that is also handed to every
/// query context produced by [`Session::new_query_context`].
#[derive(Debug)]
pub struct Session {
    /// Name of the catalog currently selected for this session.
    catalog: RwLock<String>,
    /// Mutable session settings; the `Arc` lets query contexts share them.
    mutable_inner: Arc<RwLock<MutableInner>>,
    /// Connection information built from the client address and channel.
    conn_info: ConnInfo,
    /// Configuration variables supplied when the session was created.
    configuration_variables: Arc<ConfigurationVariables>,
    /// Process id supplied when the session was created.
    process_id: u32,
}
47
/// Shared, reference-counted handle to a [`Session`].
pub type SessionRef = Arc<Session>;
49
/// Session settings that may change after the session has been created.
///
/// Instances are kept behind an `Arc<RwLock<_>>` inside [`Session`].
#[derive(Debug)]
pub(crate) struct MutableInner {
    /// Currently selected schema; starts as `DEFAULT_SCHEMA_NAME`.
    schema: String,
    /// User information associated with the session.
    user_info: UserInfoRef,
    /// Time zone used by the session.
    timezone: Timezone,
    /// Optional query timeout; `None` by default.
    query_timeout: Option<Duration>,
    /// Read preference; `ReadPreference::Leader` by default.
    read_preference: ReadPreference,
    /// Cursors held by the session, keyed by a string (presumably the cursor
    /// name; verify against callers). Excluded from the `Debug` output.
    #[debug(skip)]
    pub(crate) cursors: HashMap<String, Arc<RecordBatchStreamCursor>>,
}
61
62impl Default for MutableInner {
63 fn default() -> Self {
64 Self {
65 schema: DEFAULT_SCHEMA_NAME.into(),
66 user_info: auth::userinfo_by_name(None),
67 timezone: get_timezone(None).clone(),
68 query_timeout: None,
69 read_preference: ReadPreference::Leader,
70 cursors: HashMap::with_capacity(0),
71 }
72 }
73}
74
75impl Session {
76 pub fn new(
77 addr: Option<SocketAddr>,
78 channel: Channel,
79 configuration_variables: ConfigurationVariables,
80 process_id: u32,
81 ) -> Self {
82 Session {
83 catalog: RwLock::new(DEFAULT_CATALOG_NAME.into()),
84 conn_info: ConnInfo::new(addr, channel),
85 configuration_variables: Arc::new(configuration_variables),
86 mutable_inner: Arc::new(RwLock::new(MutableInner::default())),
87 process_id,
88 }
89 }
90
91 pub fn new_query_context(&self) -> QueryContextRef {
92 QueryContextBuilder::default()
93 .current_catalog(self.catalog.read().unwrap().clone())
96 .mutable_session_data(self.mutable_inner.clone())
97 .sql_dialect(self.conn_info.channel.dialect())
98 .configuration_parameter(self.configuration_variables.clone())
99 .channel(self.conn_info.channel)
100 .process_id(self.process_id)
101 .conn_info(self.conn_info.clone())
102 .build()
103 .into()
104 }
105
106 pub fn conn_info(&self) -> &ConnInfo {
107 &self.conn_info
108 }
109
110 pub fn timezone(&self) -> Timezone {
111 self.mutable_inner.read().unwrap().timezone.clone()
112 }
113
114 pub fn read_preference(&self) -> ReadPreference {
115 self.mutable_inner.read().unwrap().read_preference
116 }
117
118 pub fn set_timezone(&self, tz: Timezone) {
119 let mut inner = self.mutable_inner.write().unwrap();
120 inner.timezone = tz;
121 }
122
123 pub fn set_read_preference(&self, read_preference: ReadPreference) {
124 self.mutable_inner.write().unwrap().read_preference = read_preference;
125 }
126
127 pub fn user_info(&self) -> UserInfoRef {
128 self.mutable_inner.read().unwrap().user_info.clone()
129 }
130
131 pub fn set_user_info(&self, user_info: UserInfoRef) {
132 self.mutable_inner.write().unwrap().user_info = user_info;
133 }
134
135 pub fn set_catalog(&self, catalog: String) {
136 *self.catalog.write().unwrap() = catalog;
137 }
138
139 pub fn catalog(&self) -> String {
140 self.catalog.read().unwrap().clone()
141 }
142
143 pub fn schema(&self) -> String {
144 self.mutable_inner.read().unwrap().schema.clone()
145 }
146
147 pub fn set_schema(&self, schema: String) {
148 self.mutable_inner.write().unwrap().schema = schema;
149 }
150
151 pub fn get_db_string(&self) -> String {
152 build_db_string(&self.catalog(), &self.schema())
153 }
154
155 pub fn process_id(&self) -> u32 {
156 self.process_id
157 }
158}