1mod column_schema;
16pub mod constraint;
17mod raw;
18
19use std::collections::HashMap;
20use std::fmt;
21use std::sync::Arc;
22
23use arrow::datatypes::{Field, Schema as ArrowSchema};
24use column_schema::ColumnExtType;
25use datafusion_common::DFSchemaRef;
26use snafu::{ensure, ResultExt};
27
28use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
29use crate::prelude::ConcreteDataType;
30pub use crate::schema::column_schema::{
31 ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, Metadata,
32 SkippingIndexOptions, SkippingIndexType, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE,
33 COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
34 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
35 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
36 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
37 FULLTEXT_KEY, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, TIME_INDEX_KEY,
38};
39pub use crate::schema::constraint::ColumnDefaultConstraint;
40pub use crate::schema::raw::RawSchema;
41
/// Metadata key under which the schema version is stored in the arrow schema.
pub const VERSION_KEY: &str = "greptime:version";
/// Metadata key under which a column's extension type is stored on its arrow field.
pub const TYPE_KEY: &str = "greptime:type";
46
/// A schema: an ordered list of column schemas together with the matching arrow schema.
#[derive(Clone, PartialEq, Eq)]
pub struct Schema {
    /// Column schemas, in column order.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to its position in `column_schemas`.
    name_to_index: HashMap<String, usize>,
    /// Arrow representation of this schema; its metadata is exposed by `metadata()`.
    arrow_schema: Arc<ArrowSchema>,
    /// Position of the time index column, if the schema has one.
    timestamp_index: Option<usize>,
    /// Version of the schema, also written to the arrow schema metadata under
    /// `VERSION_KEY` when the schema is built.
    version: u32,
}
63
64impl fmt::Debug for Schema {
65 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
66 f.debug_struct("Schema")
67 .field("column_schemas", &self.column_schemas)
68 .field("name_to_index", &self.name_to_index)
69 .field("timestamp_index", &self.timestamp_index)
70 .field("version", &self.version)
71 .finish()
72 }
73}
74
75impl Schema {
76 pub const INITIAL_VERSION: u32 = 0;
78
79 pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
84 SchemaBuilder::try_from(column_schemas)
86 .unwrap()
87 .build()
88 .unwrap()
89 }
90
91 pub fn try_new(column_schemas: Vec<ColumnSchema>) -> Result<Schema> {
93 SchemaBuilder::try_from(column_schemas)?.build()
94 }
95
96 pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
97 &self.arrow_schema
98 }
99
100 pub fn column_schemas(&self) -> &[ColumnSchema] {
101 &self.column_schemas
102 }
103
104 pub fn column_schema_by_name(&self, name: &str) -> Option<&ColumnSchema> {
105 self.name_to_index
106 .get(name)
107 .map(|index| &self.column_schemas[*index])
108 }
109
110 pub fn column_name_by_index(&self, idx: usize) -> &str {
114 &self.column_schemas[idx].name
115 }
116
117 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
118 self.name_to_index.get(name).copied()
119 }
120
121 pub fn contains_column(&self, name: &str) -> bool {
122 self.name_to_index.contains_key(name)
123 }
124
125 pub fn num_columns(&self) -> usize {
126 self.column_schemas.len()
127 }
128
129 pub fn is_empty(&self) -> bool {
130 self.column_schemas.is_empty()
131 }
132
133 pub fn timestamp_index(&self) -> Option<usize> {
135 self.timestamp_index
136 }
137
138 pub fn timestamp_column(&self) -> Option<&ColumnSchema> {
139 self.timestamp_index.map(|idx| &self.column_schemas[idx])
140 }
141
142 pub fn version(&self) -> u32 {
143 self.version
144 }
145
146 pub fn metadata(&self) -> &HashMap<String, String> {
147 &self.arrow_schema.metadata
148 }
149
150 pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
156 let mut column_schemas = Vec::with_capacity(indices.len());
157 let mut timestamp_index = None;
158 for index in indices {
159 if let Some(ts_index) = self.timestamp_index
160 && ts_index == *index
161 {
162 timestamp_index = Some(column_schemas.len());
163 }
164 column_schemas.push(self.column_schemas[*index].clone());
165 }
166 let arrow_schema = self
167 .arrow_schema
168 .project(indices)
169 .context(ProjectArrowSchemaSnafu)?;
170 let name_to_index = column_schemas
171 .iter()
172 .enumerate()
173 .map(|(pos, column_schema)| (column_schema.name.clone(), pos))
174 .collect();
175
176 Ok(Self {
177 column_schemas,
178 name_to_index,
179 arrow_schema: Arc::new(arrow_schema),
180 timestamp_index,
181 version: self.version,
182 })
183 }
184}
185
/// Builder for [`Schema`].
#[derive(Default)]
pub struct SchemaBuilder {
    /// Column schemas of the schema being built.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to its position in `column_schemas`.
    name_to_index: HashMap<String, usize>,
    /// Arrow fields converted from `column_schemas`.
    fields: Vec<Field>,
    /// Position of the time index column, if any.
    timestamp_index: Option<usize>,
    /// Version to assign to the built schema.
    version: u32,
    /// Extra metadata to store on the arrow schema.
    metadata: HashMap<String, String>,
}
195
196impl TryFrom<Vec<ColumnSchema>> for SchemaBuilder {
197 type Error = Error;
198
199 fn try_from(column_schemas: Vec<ColumnSchema>) -> Result<SchemaBuilder> {
200 SchemaBuilder::try_from_columns(column_schemas)
201 }
202}
203
204impl SchemaBuilder {
205 pub fn try_from_columns(column_schemas: Vec<ColumnSchema>) -> Result<Self> {
206 let FieldsAndIndices {
207 fields,
208 name_to_index,
209 timestamp_index,
210 } = collect_fields(&column_schemas)?;
211
212 Ok(Self {
213 column_schemas,
214 name_to_index,
215 fields,
216 timestamp_index,
217 ..Default::default()
218 })
219 }
220
221 pub fn version(mut self, version: u32) -> Self {
222 self.version = version;
223 self
224 }
225
226 pub fn add_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
230 let _ = self.metadata.insert(key.into(), value.into());
231 self
232 }
233
234 pub fn build(mut self) -> Result<Schema> {
235 if let Some(timestamp_index) = self.timestamp_index {
236 validate_timestamp_index(&self.column_schemas, timestamp_index)?;
237 }
238
239 self.metadata
240 .insert(VERSION_KEY.to_string(), self.version.to_string());
241
242 let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
243
244 Ok(Schema {
245 column_schemas: self.column_schemas,
246 name_to_index: self.name_to_index,
247 arrow_schema: Arc::new(arrow_schema),
248 timestamp_index: self.timestamp_index,
249 version: self.version,
250 })
251 }
252}
253
/// Result of `collect_fields`: the arrow fields plus the lookup data derived
/// from a list of column schemas.
struct FieldsAndIndices {
    /// One arrow field per column schema, in column order.
    fields: Vec<Field>,
    /// Maps a column name to its position.
    name_to_index: HashMap<String, usize>,
    /// Position of the first column flagged as time index, if any.
    timestamp_index: Option<usize>,
}
259
260fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
261 let mut fields = Vec::with_capacity(column_schemas.len());
262 let mut name_to_index = HashMap::with_capacity(column_schemas.len());
263 let mut timestamp_index = None;
264 for (index, column_schema) in column_schemas.iter().enumerate() {
265 if column_schema.is_time_index() && timestamp_index.is_none() {
266 timestamp_index = Some(index);
267 }
268 let mut field = Field::try_from(column_schema)?;
269
270 let extype = match column_schema.data_type {
272 ConcreteDataType::Json(_) => Some(ColumnExtType::Json),
273 ConcreteDataType::Vector(d) => Some(ColumnExtType::Vector(d.dim)),
274 _ => None,
275 };
276 if let Some(extype) = extype {
277 let metadata = HashMap::from([(TYPE_KEY.to_string(), extype.to_string())]);
278 field = field.with_metadata(metadata);
279 }
280 fields.push(field);
281 ensure!(
282 name_to_index
283 .insert(column_schema.name.clone(), index)
284 .is_none(),
285 DuplicateColumnSnafu {
286 column: &column_schema.name,
287 }
288 );
289 }
290
291 Ok(FieldsAndIndices {
292 fields,
293 name_to_index,
294 timestamp_index,
295 })
296}
297
298fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> {
299 ensure!(
300 timestamp_index < column_schemas.len(),
301 error::InvalidTimestampIndexSnafu {
302 index: timestamp_index,
303 }
304 );
305
306 let column_schema = &column_schemas[timestamp_index];
307 ensure!(
308 column_schema.data_type.is_timestamp(),
309 error::InvalidTimestampIndexSnafu {
310 index: timestamp_index,
311 }
312 );
313 ensure!(
314 column_schema.is_time_index(),
315 error::InvalidTimestampIndexSnafu {
316 index: timestamp_index,
317 }
318 );
319
320 Ok(())
321}
322
/// Reference-counted pointer to a [`Schema`].
pub type SchemaRef = Arc<Schema>;
324
325impl TryFrom<Arc<ArrowSchema>> for Schema {
326 type Error = Error;
327
328 fn try_from(arrow_schema: Arc<ArrowSchema>) -> Result<Schema> {
329 let mut column_schemas = Vec::with_capacity(arrow_schema.fields.len());
330 let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
331 for field in &arrow_schema.fields {
332 let column_schema = ColumnSchema::try_from(field.as_ref())?;
333 let _ = name_to_index.insert(field.name().to_string(), column_schemas.len());
334 column_schemas.push(column_schema);
335 }
336
337 let mut timestamp_index = None;
338 for (index, column_schema) in column_schemas.iter().enumerate() {
339 if column_schema.is_time_index() {
340 validate_timestamp_index(&column_schemas, index)?;
341 timestamp_index = Some(index);
342 break;
343 }
344 }
345
346 let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?;
347
348 Ok(Self {
349 column_schemas,
350 name_to_index,
351 arrow_schema,
352 timestamp_index,
353 version,
354 })
355 }
356}
357
358impl TryFrom<ArrowSchema> for Schema {
359 type Error = Error;
360
361 fn try_from(arrow_schema: ArrowSchema) -> Result<Schema> {
362 let arrow_schema = Arc::new(arrow_schema);
363
364 Schema::try_from(arrow_schema)
365 }
366}
367
368impl TryFrom<DFSchemaRef> for Schema {
369 type Error = Error;
370
371 fn try_from(value: DFSchemaRef) -> Result<Self> {
372 let s: ArrowSchema = value.as_ref().into();
373 s.try_into()
374 }
375}
376
377fn try_parse_version(metadata: &HashMap<String, String>, key: &str) -> Result<u32> {
378 if let Some(value) = metadata.get(key) {
379 let version = value
380 .parse()
381 .context(error::ParseSchemaVersionSnafu { value })?;
382
383 Ok(version)
384 } else {
385 Ok(Schema::INITIAL_VERSION)
386 }
387}
388
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_type::ConcreteDataType;

    /// A default builder yields a schema without columns.
    #[test]
    fn test_build_empty_schema() {
        let schema = SchemaBuilder::default().build().unwrap();
        assert_eq!(0, schema.num_columns());
        assert!(schema.is_empty());
    }

    /// A schema without a time index column survives an arrow round trip.
    #[test]
    fn test_schema_no_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = Schema::new(column_schemas.clone());

        assert_eq!(2, schema.num_columns());
        assert!(!schema.is_empty());
        assert!(schema.timestamp_index().is_none());
        assert!(schema.timestamp_column().is_none());
        assert_eq!(Schema::INITIAL_VERSION, schema.version());

        for column_schema in &column_schemas {
            let found = schema.column_schema_by_name(&column_schema.name).unwrap();
            assert_eq!(column_schema, found);
        }
        assert!(schema.column_schema_by_name("col3").is_none());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();

        assert_eq!(schema, new_schema);
        assert_eq!(column_schemas, schema.column_schemas());
    }

    /// Two columns with the same name are rejected.
    #[test]
    fn test_schema_duplicate_column() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col1", ConcreteDataType::float64_datatype(), true),
        ];
        let err = Schema::try_new(column_schemas).unwrap_err();

        assert!(
            matches!(err, Error::DuplicateColumn { .. }),
            "expect DuplicateColumn, found {err}"
        );
    }

    /// Metadata added through the builder is readable from the schema.
    #[test]
    fn test_metadata() {
        let column_schemas = vec![ColumnSchema::new(
            "col1",
            ConcreteDataType::int32_datatype(),
            false,
        )];
        let schema = SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .add_metadata("k1", "v1")
            .build()
            .unwrap();

        assert_eq!("v1", schema.metadata().get("k1").unwrap());
    }

    /// Time index and version survive an arrow round trip.
    #[test]
    fn test_schema_with_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(123)
            .build()
            .unwrap();

        assert_eq!(1, schema.timestamp_index().unwrap());
        assert_eq!(&column_schemas[1], schema.timestamp_column().unwrap());
        assert_eq!(123, schema.version());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();
        // Check the round-tripped schema, not the original one again.
        assert_eq!(1, new_schema.timestamp_index().unwrap());
        assert_eq!(schema, new_schema);
    }

    /// A time index flag on a non-timestamp column makes the build fail.
    #[test]
    fn test_schema_wrong_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true)
                .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false),
        ];
        assert!(SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .build()
            .is_err());

        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false)
                .with_time_index(true),
        ];

        assert!(SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .build()
            .is_err());
    }
}