1pub mod context;
16pub mod hints;
17pub mod session_config;
18pub mod table_name;
19
20use std::collections::HashMap;
21use std::net::SocketAddr;
22use std::sync::{Arc, RwLock};
23use std::time::Duration;
24
25use auth::UserInfoRef;
26use common_catalog::build_db_string;
27use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
28use common_recordbatch::cursor::RecordBatchStreamCursor;
29pub use common_session::ReadPreference;
30use common_time::timezone::get_timezone;
31use common_time::Timezone;
32use context::{ConfigurationVariables, QueryContextBuilder};
33use derive_more::Debug;
34
35use crate::context::{Channel, ConnInfo, QueryContextRef};
36
37#[derive(Debug)]
39pub struct Session {
40 catalog: RwLock<String>,
41 mutable_inner: Arc<RwLock<MutableInner>>,
42 conn_info: ConnInfo,
43 configuration_variables: Arc<ConfigurationVariables>,
44}
45
46pub type SessionRef = Arc<Session>;
47
48#[derive(Debug)]
50pub(crate) struct MutableInner {
51 schema: String,
52 user_info: UserInfoRef,
53 timezone: Timezone,
54 query_timeout: Option<Duration>,
55 read_preference: ReadPreference,
56 #[debug(skip)]
57 pub(crate) cursors: HashMap<String, Arc<RecordBatchStreamCursor>>,
58}
59
60impl Default for MutableInner {
61 fn default() -> Self {
62 Self {
63 schema: DEFAULT_SCHEMA_NAME.into(),
64 user_info: auth::userinfo_by_name(None),
65 timezone: get_timezone(None).clone(),
66 query_timeout: None,
67 read_preference: ReadPreference::Leader,
68 cursors: HashMap::with_capacity(0),
69 }
70 }
71}
72
73impl Session {
74 pub fn new(
75 addr: Option<SocketAddr>,
76 channel: Channel,
77 configuration_variables: ConfigurationVariables,
78 ) -> Self {
79 Session {
80 catalog: RwLock::new(DEFAULT_CATALOG_NAME.into()),
81 conn_info: ConnInfo::new(addr, channel),
82 configuration_variables: Arc::new(configuration_variables),
83 mutable_inner: Arc::new(RwLock::new(MutableInner::default())),
84 }
85 }
86
87 pub fn new_query_context(&self) -> QueryContextRef {
88 QueryContextBuilder::default()
89 .current_catalog(self.catalog.read().unwrap().clone())
92 .mutable_session_data(self.mutable_inner.clone())
93 .sql_dialect(self.conn_info.channel.dialect())
94 .configuration_parameter(self.configuration_variables.clone())
95 .channel(self.conn_info.channel)
96 .build()
97 .into()
98 }
99
100 pub fn conn_info(&self) -> &ConnInfo {
101 &self.conn_info
102 }
103
104 pub fn timezone(&self) -> Timezone {
105 self.mutable_inner.read().unwrap().timezone.clone()
106 }
107
108 pub fn read_preference(&self) -> ReadPreference {
109 self.mutable_inner.read().unwrap().read_preference
110 }
111
112 pub fn set_timezone(&self, tz: Timezone) {
113 let mut inner = self.mutable_inner.write().unwrap();
114 inner.timezone = tz;
115 }
116
117 pub fn set_read_preference(&self, read_preference: ReadPreference) {
118 self.mutable_inner.write().unwrap().read_preference = read_preference;
119 }
120
121 pub fn user_info(&self) -> UserInfoRef {
122 self.mutable_inner.read().unwrap().user_info.clone()
123 }
124
125 pub fn set_user_info(&self, user_info: UserInfoRef) {
126 self.mutable_inner.write().unwrap().user_info = user_info;
127 }
128
129 pub fn set_catalog(&self, catalog: String) {
130 *self.catalog.write().unwrap() = catalog;
131 }
132
133 pub fn catalog(&self) -> String {
134 self.catalog.read().unwrap().clone()
135 }
136
137 pub fn schema(&self) -> String {
138 self.mutable_inner.read().unwrap().schema.clone()
139 }
140
141 pub fn set_schema(&self, schema: String) {
142 self.mutable_inner.write().unwrap().schema = schema;
143 }
144
145 pub fn get_db_string(&self) -> String {
146 build_db_string(&self.catalog(), &self.schema())
147 }
148}