1mod column_schema;
16pub mod constraint;
17
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::{fmt, mem};
21
22use arrow::datatypes::{Field, Schema as ArrowSchema};
23use datafusion_common::DFSchemaRef;
24use serde::{Deserialize, Deserializer, Serialize};
25use snafu::{ResultExt, ensure};
26
27use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
28use crate::prelude::ConcreteDataType;
29pub use crate::schema::column_schema::{
30 COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
31 COLUMN_FULLTEXT_OPT_KEY_BACKEND, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE,
32 COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_FULLTEXT_OPT_KEY_GRANULARITY,
33 COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
34 COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COLUMN_VECTOR_INDEX_OPT_KEY_CONNECTIVITY,
35 COLUMN_VECTOR_INDEX_OPT_KEY_ENGINE, COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_ADD,
36 COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_SEARCH, COLUMN_VECTOR_INDEX_OPT_KEY_METRIC, COMMENT_KEY,
37 ColumnExtType, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer, FulltextBackend, FulltextOptions,
38 INVERTED_INDEX_KEY, Metadata, SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType,
39 TIME_INDEX_KEY, VECTOR_INDEX_KEY, VectorDistanceMetric, VectorIndexEngineType,
40 VectorIndexOptions,
41};
42pub use crate::schema::constraint::ColumnDefaultConstraint;
43
/// Key under which the schema version is stored in the arrow schema metadata.
///
/// `SchemaBuilder::build` writes the version under this key and the
/// arrow-to-`Schema` conversion reads it back.
pub const VERSION_KEY: &str = "greptime:version";
/// Key under which a column's extension type is stored in the arrow field
/// metadata (written for JSON and vector columns when fields are collected).
pub const TYPE_KEY: &str = "greptime:type";
48
/// A table schema: an ordered list of columns plus derived lookup structures.
///
/// Only `column_schemas` and `version` are serialized; the remaining fields
/// are skipped and rebuilt from the columns when the schema is deserialized.
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct Schema {
    /// Column definitions, in schema order.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to its position in `column_schemas`.
    #[serde(skip_serializing)]
    name_to_index: HashMap<String, usize>,
    /// Arrow representation of this schema; also carries the schema metadata.
    #[serde(skip_serializing)]
    arrow_schema: Arc<ArrowSchema>,
    /// Position of the time index column in `column_schemas`, or `None` if
    /// the schema has no time index column.
    #[serde(skip_serializing)]
    timestamp_index: Option<usize>,
    /// Version of the schema.
    version: u32,
}
68
69impl<'de> Deserialize<'de> for Schema {
70 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
71 where
72 D: Deserializer<'de>,
73 {
74 use serde::de::Error;
75
76 #[derive(Deserialize)]
77 struct RawSchema {
78 column_schemas: Vec<ColumnSchema>,
79 version: u32,
80 }
81 let raw = RawSchema::deserialize(deserializer)?;
82
83 SchemaBuilder::try_from(raw.column_schemas)
84 .and_then(|x| x.version(raw.version).build())
85 .map_err(D::Error::custom)
86 }
87}
88
89impl fmt::Debug for Schema {
90 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
91 f.debug_struct("Schema")
92 .field("column_schemas", &self.column_schemas)
93 .field("name_to_index", &self.name_to_index)
94 .field("timestamp_index", &self.timestamp_index)
95 .field("version", &self.version)
96 .finish()
97 }
98}
99
100impl Schema {
101 pub const INITIAL_VERSION: u32 = 0;
103
104 pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
109 Self::new_with_version(column_schemas, Self::INITIAL_VERSION)
111 }
112
113 pub fn new_with_version(column_schemas: Vec<ColumnSchema>, version: u32) -> Schema {
114 SchemaBuilder::try_from(column_schemas)
115 .unwrap()
116 .version(version)
117 .build()
118 .unwrap()
119 }
120
121 pub fn try_new(column_schemas: Vec<ColumnSchema>) -> Result<Schema> {
123 SchemaBuilder::try_from(column_schemas)?.build()
124 }
125
126 pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
127 &self.arrow_schema
128 }
129
130 pub fn column_schemas(&self) -> &[ColumnSchema] {
131 &self.column_schemas
132 }
133
134 pub fn column_schema_by_name(&self, name: &str) -> Option<&ColumnSchema> {
135 self.name_to_index
136 .get(name)
137 .map(|index| &self.column_schemas[*index])
138 }
139
140 pub fn column_name_by_index(&self, idx: usize) -> &str {
144 &self.column_schemas[idx].name
145 }
146
147 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
148 self.name_to_index.get(name).copied()
149 }
150
151 pub fn contains_column(&self, name: &str) -> bool {
152 self.name_to_index.contains_key(name)
153 }
154
155 pub fn num_columns(&self) -> usize {
156 self.column_schemas.len()
157 }
158
159 pub fn is_empty(&self) -> bool {
160 self.column_schemas.is_empty()
161 }
162
163 pub fn timestamp_index(&self) -> Option<usize> {
165 self.timestamp_index
166 }
167
168 pub fn timestamp_column(&self) -> Option<&ColumnSchema> {
169 self.timestamp_index.map(|idx| &self.column_schemas[idx])
170 }
171
172 pub fn version(&self) -> u32 {
173 self.version
174 }
175
176 pub fn metadata(&self) -> &HashMap<String, String> {
177 &self.arrow_schema.metadata
178 }
179
180 pub fn estimated_size(&self) -> usize {
182 mem::size_of_val(self)
183 + mem::size_of::<ColumnSchema>() * self.column_schemas.capacity()
184 + self
185 .column_schemas
186 .iter()
187 .map(|column_schema| {
188 column_schema.estimated_size() - mem::size_of::<ColumnSchema>()
189 })
190 .sum::<usize>()
191 + mem::size_of::<(String, usize)>() * self.name_to_index.capacity()
192 + self
193 .name_to_index
194 .keys()
195 .map(|name| name.capacity())
196 .sum::<usize>()
197 + arrow_schema_size(self.arrow_schema.as_ref())
198 }
199
200 pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
206 let mut column_schemas = Vec::with_capacity(indices.len());
207 let mut timestamp_index = None;
208 for index in indices {
209 if let Some(ts_index) = self.timestamp_index
210 && ts_index == *index
211 {
212 timestamp_index = Some(column_schemas.len());
213 }
214 column_schemas.push(self.column_schemas[*index].clone());
215 }
216 let arrow_schema = self
217 .arrow_schema
218 .project(indices)
219 .context(ProjectArrowSchemaSnafu)?;
220 let name_to_index = column_schemas
221 .iter()
222 .enumerate()
223 .map(|(pos, column_schema)| (column_schema.name.clone(), pos))
224 .collect();
225
226 Ok(Self {
227 column_schemas,
228 name_to_index,
229 arrow_schema: Arc::new(arrow_schema),
230 timestamp_index,
231 version: self.version,
232 })
233 }
234}
235
236fn arrow_schema_size(schema: &ArrowSchema) -> usize {
237 mem::size_of_val(schema)
238 + schema.fields.size()
239 + mem::size_of::<(String, String)>() * schema.metadata.capacity()
240 + schema
241 .metadata
242 .iter()
243 .map(|(key, value)| key.capacity() + value.capacity())
244 .sum::<usize>()
245}
246
/// Builder for [`Schema`].
///
/// The default builder has no columns, no time index, version `0` and empty
/// metadata.
#[derive(Default)]
pub struct SchemaBuilder {
    /// Column definitions, in schema order.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to its position in `column_schemas`.
    name_to_index: HashMap<String, usize>,
    /// Arrow fields converted from `column_schemas`, in the same order.
    fields: Vec<Field>,
    /// Position of the time index column, if one was found.
    timestamp_index: Option<usize>,
    /// Version to assign to the built schema.
    version: u32,
    /// Metadata to attach to the built arrow schema.
    metadata: HashMap<String, String>,
}
256
257impl TryFrom<Vec<ColumnSchema>> for SchemaBuilder {
258 type Error = Error;
259
260 fn try_from(column_schemas: Vec<ColumnSchema>) -> Result<SchemaBuilder> {
261 SchemaBuilder::try_from_columns(column_schemas)
262 }
263}
264
265impl SchemaBuilder {
266 pub fn try_from_columns(column_schemas: Vec<ColumnSchema>) -> Result<Self> {
267 let FieldsAndIndices {
268 fields,
269 name_to_index,
270 timestamp_index,
271 } = collect_fields(&column_schemas)?;
272
273 Ok(Self {
274 column_schemas,
275 name_to_index,
276 fields,
277 timestamp_index,
278 ..Default::default()
279 })
280 }
281
282 pub fn version(mut self, version: u32) -> Self {
283 self.version = version;
284 self
285 }
286
287 pub fn add_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
291 let _ = self.metadata.insert(key.into(), value.into());
292 self
293 }
294
295 pub fn build(mut self) -> Result<Schema> {
296 if let Some(timestamp_index) = self.timestamp_index {
297 validate_timestamp_index(&self.column_schemas, timestamp_index)?;
298 }
299
300 self.metadata
301 .insert(VERSION_KEY.to_string(), self.version.to_string());
302
303 let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
304
305 Ok(Schema {
306 column_schemas: self.column_schemas,
307 name_to_index: self.name_to_index,
308 arrow_schema: Arc::new(arrow_schema),
309 timestamp_index: self.timestamp_index,
310 version: self.version,
311 })
312 }
313}
314
/// Result of `collect_fields`: the data derived from a list of column schemas.
struct FieldsAndIndices {
    /// Arrow fields, one per column, in column order.
    fields: Vec<Field>,
    /// Maps a column name to its position in the column list.
    name_to_index: HashMap<String, usize>,
    /// Position of the first column flagged as time index, if any.
    timestamp_index: Option<usize>,
}
320
321fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
322 let mut fields = Vec::with_capacity(column_schemas.len());
323 let mut name_to_index = HashMap::with_capacity(column_schemas.len());
324 let mut timestamp_index = None;
325 for (index, column_schema) in column_schemas.iter().enumerate() {
326 if column_schema.is_time_index() && timestamp_index.is_none() {
327 timestamp_index = Some(index);
328 }
329 let mut field = Field::try_from(column_schema)?;
330
331 let extype = match column_schema.data_type {
333 ConcreteDataType::Json(_) => Some(ColumnExtType::Json),
334 ConcreteDataType::Vector(d) => Some(ColumnExtType::Vector(d.dim)),
335 _ => None,
336 };
337 if let Some(extype) = extype {
338 field
339 .metadata_mut()
340 .insert(TYPE_KEY.to_string(), extype.to_string());
341 }
342 fields.push(field);
343 ensure!(
344 name_to_index
345 .insert(column_schema.name.clone(), index)
346 .is_none(),
347 DuplicateColumnSnafu {
348 column: &column_schema.name,
349 }
350 );
351 }
352
353 Ok(FieldsAndIndices {
354 fields,
355 name_to_index,
356 timestamp_index,
357 })
358}
359
360fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> {
361 ensure!(
362 timestamp_index < column_schemas.len(),
363 error::InvalidTimestampIndexSnafu {
364 index: timestamp_index,
365 }
366 );
367
368 let column_schema = &column_schemas[timestamp_index];
369 ensure!(
370 column_schema.data_type.is_timestamp(),
371 error::InvalidTimestampIndexSnafu {
372 index: timestamp_index,
373 }
374 );
375 ensure!(
376 column_schema.is_time_index(),
377 error::InvalidTimestampIndexSnafu {
378 index: timestamp_index,
379 }
380 );
381
382 Ok(())
383}
384
/// Shared, reference-counted [`Schema`].
pub type SchemaRef = Arc<Schema>;
386
387impl TryFrom<Arc<ArrowSchema>> for Schema {
388 type Error = Error;
389
390 fn try_from(arrow_schema: Arc<ArrowSchema>) -> Result<Schema> {
391 let mut column_schemas = Vec::with_capacity(arrow_schema.fields.len());
392 let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
393 for field in &arrow_schema.fields {
394 let column_schema = ColumnSchema::try_from(field.as_ref())?;
395 let _ = name_to_index.insert(field.name().clone(), column_schemas.len());
396 column_schemas.push(column_schema);
397 }
398
399 let mut timestamp_index = None;
400 for (index, column_schema) in column_schemas.iter().enumerate() {
401 if column_schema.is_time_index() {
402 validate_timestamp_index(&column_schemas, index)?;
403 timestamp_index = Some(index);
404 break;
405 }
406 }
407
408 let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?;
409
410 Ok(Self {
411 column_schemas,
412 name_to_index,
413 arrow_schema,
414 timestamp_index,
415 version,
416 })
417 }
418}
419
420impl TryFrom<ArrowSchema> for Schema {
421 type Error = Error;
422
423 fn try_from(arrow_schema: ArrowSchema) -> Result<Schema> {
424 let arrow_schema = Arc::new(arrow_schema);
425
426 Schema::try_from(arrow_schema)
427 }
428}
429
430impl TryFrom<DFSchemaRef> for Schema {
431 type Error = Error;
432
433 fn try_from(value: DFSchemaRef) -> Result<Self> {
434 value.inner().clone().try_into()
435 }
436}
437
438fn try_parse_version(metadata: &HashMap<String, String>, key: &str) -> Result<u32> {
439 if let Some(value) = metadata.get(key) {
440 let version = value
441 .parse()
442 .context(error::ParseSchemaVersionSnafu { value })?;
443
444 Ok(version)
445 } else {
446 Ok(Schema::INITIAL_VERSION)
447 }
448}
449
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_type::ConcreteDataType;

    /// A default builder yields a schema without columns.
    #[test]
    fn test_build_empty_schema() {
        let schema = SchemaBuilder::default().build().unwrap();
        assert_eq!(0, schema.num_columns());
        assert!(schema.is_empty());
    }

    /// A schema without a time index column round-trips through arrow.
    #[test]
    fn test_schema_no_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = Schema::new(column_schemas.clone());

        assert_eq!(2, schema.num_columns());
        assert!(!schema.is_empty());
        assert!(schema.timestamp_index().is_none());
        assert!(schema.timestamp_column().is_none());
        assert_eq!(Schema::INITIAL_VERSION, schema.version());

        for column_schema in &column_schemas {
            let found = schema.column_schema_by_name(&column_schema.name).unwrap();
            assert_eq!(column_schema, found);
        }
        assert!(schema.column_schema_by_name("col3").is_none());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();

        assert_eq!(schema, new_schema);
        assert_eq!(column_schemas, schema.column_schemas());
    }

    /// Two columns with the same name are rejected.
    #[test]
    fn test_schema_duplicate_column() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col1", ConcreteDataType::float64_datatype(), true),
        ];
        let err = Schema::try_new(column_schemas).unwrap_err();

        assert!(
            matches!(err, Error::DuplicateColumn { .. }),
            "expect DuplicateColumn, found {}",
            err
        );
    }

    /// Metadata added through the builder is visible on the built schema.
    #[test]
    fn test_metadata() {
        let column_schemas = vec![ColumnSchema::new(
            "col1",
            ConcreteDataType::int32_datatype(),
            false,
        )];
        let schema = SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .add_metadata("k1", "v1")
            .build()
            .unwrap();

        assert_eq!("v1", schema.metadata().get("k1").unwrap());
    }

    /// The time index column and the version survive an arrow round trip.
    #[test]
    fn test_schema_with_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(123)
            .build()
            .unwrap();

        assert_eq!(1, schema.timestamp_index().unwrap());
        assert_eq!(&column_schemas[1], schema.timestamp_column().unwrap());
        assert_eq!(123, schema.version());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();
        // Check the schema rebuilt from arrow, not the original one again.
        assert_eq!(1, new_schema.timestamp_index().unwrap());
        assert_eq!(schema, new_schema);
    }

    /// A time index flag on a non-timestamp column makes the build fail.
    #[test]
    fn test_schema_wrong_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true)
                .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false),
        ];
        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );

        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false)
                .with_time_index(true),
        ];

        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );
    }
}
572}