1mod column_schema;
16pub mod constraint;
17pub mod ext;
18
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::{fmt, mem};
22
23use arrow::datatypes::{Field, Schema as ArrowSchema};
24use datafusion_common::DFSchemaRef;
25use serde::{Deserialize, Deserializer, Serialize};
26use snafu::{ResultExt, ensure};
27
28use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
29use crate::prelude::ConcreteDataType;
30pub use crate::schema::column_schema::{
31 COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
32 COLUMN_FULLTEXT_OPT_KEY_BACKEND, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE,
33 COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_FULLTEXT_OPT_KEY_GRANULARITY,
34 COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
35 COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COLUMN_VECTOR_INDEX_OPT_KEY_CONNECTIVITY,
36 COLUMN_VECTOR_INDEX_OPT_KEY_ENGINE, COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_ADD,
37 COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_SEARCH, COLUMN_VECTOR_INDEX_OPT_KEY_METRIC, COMMENT_KEY,
38 ColumnExtType, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer, FulltextBackend, FulltextOptions,
39 INVERTED_INDEX_KEY, Metadata, SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType,
40 TIME_INDEX_KEY, VECTOR_INDEX_KEY, VectorDistanceMetric, VectorIndexEngineType,
41 VectorIndexOptions,
42};
43pub use crate::schema::constraint::ColumnDefaultConstraint;
44
/// Arrow schema metadata key under which the schema version is stored.
///
/// `SchemaBuilder::build` writes the version (as a decimal string) under this key, and the
/// conversion from an Arrow schema parses it back.
pub const VERSION_KEY: &str = "greptime:version";
/// Arrow field metadata key used to tag JSON and vector columns with their [`ColumnExtType`].
pub const TYPE_KEY: &str = "greptime:type";
49
/// An ordered collection of column schemas together with lookup structures derived from it.
///
/// Only `column_schemas` and `version` are serialized; the remaining fields are rebuilt from
/// the column list when a schema is deserialized.
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct Schema {
    /// Column definitions, in schema order.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to its position in `column_schemas`. Not serialized.
    #[serde(skip_serializing)]
    name_to_index: HashMap<String, usize>,
    /// Arrow representation of this schema, including the schema-level metadata. Not serialized.
    #[serde(skip_serializing)]
    arrow_schema: Arc<ArrowSchema>,
    /// Position of the time index column in `column_schemas`, if there is one. Not serialized.
    #[serde(skip_serializing)]
    timestamp_index: Option<usize>,
    /// Version of the schema.
    version: u32,
}
69
70impl<'de> Deserialize<'de> for Schema {
71 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
72 where
73 D: Deserializer<'de>,
74 {
75 use serde::de::Error;
76
77 #[derive(Deserialize)]
78 struct RawSchema {
79 column_schemas: Vec<ColumnSchema>,
80 version: u32,
81 }
82 let raw = RawSchema::deserialize(deserializer)?;
83
84 SchemaBuilder::try_from(raw.column_schemas)
85 .and_then(|x| x.version(raw.version).build())
86 .map_err(D::Error::custom)
87 }
88}
89
90impl fmt::Debug for Schema {
91 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
92 f.debug_struct("Schema")
93 .field("column_schemas", &self.column_schemas)
94 .field("name_to_index", &self.name_to_index)
95 .field("timestamp_index", &self.timestamp_index)
96 .field("version", &self.version)
97 .finish()
98 }
99}
100
101impl Schema {
102 pub const INITIAL_VERSION: u32 = 0;
104
105 pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
110 Self::new_with_version(column_schemas, Self::INITIAL_VERSION)
112 }
113
114 pub fn new_with_version(column_schemas: Vec<ColumnSchema>, version: u32) -> Schema {
115 SchemaBuilder::try_from(column_schemas)
116 .unwrap()
117 .version(version)
118 .build()
119 .unwrap()
120 }
121
122 pub fn try_new(column_schemas: Vec<ColumnSchema>) -> Result<Schema> {
124 SchemaBuilder::try_from(column_schemas)?.build()
125 }
126
127 pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
128 &self.arrow_schema
129 }
130
131 pub fn column_schemas(&self) -> &[ColumnSchema] {
132 &self.column_schemas
133 }
134
135 pub fn column_schema_by_name(&self, name: &str) -> Option<&ColumnSchema> {
136 self.name_to_index
137 .get(name)
138 .map(|index| &self.column_schemas[*index])
139 }
140
141 pub fn column_name_by_index(&self, idx: usize) -> &str {
145 &self.column_schemas[idx].name
146 }
147
148 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
149 self.name_to_index.get(name).copied()
150 }
151
152 pub fn contains_column(&self, name: &str) -> bool {
153 self.name_to_index.contains_key(name)
154 }
155
156 pub fn num_columns(&self) -> usize {
157 self.column_schemas.len()
158 }
159
160 pub fn is_empty(&self) -> bool {
161 self.column_schemas.is_empty()
162 }
163
164 pub fn timestamp_index(&self) -> Option<usize> {
166 self.timestamp_index
167 }
168
169 pub fn timestamp_column(&self) -> Option<&ColumnSchema> {
170 self.timestamp_index.map(|idx| &self.column_schemas[idx])
171 }
172
173 pub fn version(&self) -> u32 {
174 self.version
175 }
176
177 pub fn metadata(&self) -> &HashMap<String, String> {
178 &self.arrow_schema.metadata
179 }
180
181 pub fn estimated_size(&self) -> usize {
183 mem::size_of_val(self)
184 + mem::size_of::<ColumnSchema>() * self.column_schemas.capacity()
185 + self
186 .column_schemas
187 .iter()
188 .map(|column_schema| {
189 column_schema.estimated_size() - mem::size_of::<ColumnSchema>()
190 })
191 .sum::<usize>()
192 + mem::size_of::<(String, usize)>() * self.name_to_index.capacity()
193 + self
194 .name_to_index
195 .keys()
196 .map(|name| name.capacity())
197 .sum::<usize>()
198 + arrow_schema_size(self.arrow_schema.as_ref())
199 }
200
201 pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
207 let mut column_schemas = Vec::with_capacity(indices.len());
208 let mut timestamp_index = None;
209 for index in indices {
210 if let Some(ts_index) = self.timestamp_index
211 && ts_index == *index
212 {
213 timestamp_index = Some(column_schemas.len());
214 }
215 column_schemas.push(self.column_schemas[*index].clone());
216 }
217 let arrow_schema = self
218 .arrow_schema
219 .project(indices)
220 .context(ProjectArrowSchemaSnafu)?;
221 let name_to_index = column_schemas
222 .iter()
223 .enumerate()
224 .map(|(pos, column_schema)| (column_schema.name.clone(), pos))
225 .collect();
226
227 Ok(Self {
228 column_schemas,
229 name_to_index,
230 arrow_schema: Arc::new(arrow_schema),
231 timestamp_index,
232 version: self.version,
233 })
234 }
235}
236
237fn arrow_schema_size(schema: &ArrowSchema) -> usize {
238 mem::size_of_val(schema)
239 + schema.fields.size()
240 + mem::size_of::<(String, String)>() * schema.metadata.capacity()
241 + schema
242 .metadata
243 .iter()
244 .map(|(key, value)| key.capacity() + value.capacity())
245 .sum::<usize>()
246}
247
/// Builder for [`Schema`].
///
/// The default builder has no columns, version `0` and no metadata.
#[derive(Default)]
pub struct SchemaBuilder {
    /// Column definitions, in schema order.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to its position in `column_schemas`.
    name_to_index: HashMap<String, usize>,
    /// Arrow fields converted from `column_schemas`, one per column.
    fields: Vec<Field>,
    /// Position of the first column flagged as time index, if any.
    timestamp_index: Option<usize>,
    /// Version the built schema will carry.
    version: u32,
    /// Schema-level metadata that ends up on the Arrow schema.
    metadata: HashMap<String, String>,
}
257
258impl TryFrom<Vec<ColumnSchema>> for SchemaBuilder {
259 type Error = Error;
260
261 fn try_from(column_schemas: Vec<ColumnSchema>) -> Result<SchemaBuilder> {
262 SchemaBuilder::try_from_columns(column_schemas)
263 }
264}
265
266impl SchemaBuilder {
267 pub fn try_from_columns(column_schemas: Vec<ColumnSchema>) -> Result<Self> {
268 let FieldsAndIndices {
269 fields,
270 name_to_index,
271 timestamp_index,
272 } = collect_fields(&column_schemas)?;
273
274 Ok(Self {
275 column_schemas,
276 name_to_index,
277 fields,
278 timestamp_index,
279 ..Default::default()
280 })
281 }
282
283 pub fn version(mut self, version: u32) -> Self {
284 self.version = version;
285 self
286 }
287
288 pub fn add_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
292 let _ = self.metadata.insert(key.into(), value.into());
293 self
294 }
295
296 pub fn build(mut self) -> Result<Schema> {
297 if let Some(timestamp_index) = self.timestamp_index {
298 validate_timestamp_index(&self.column_schemas, timestamp_index)?;
299 }
300
301 self.metadata
302 .insert(VERSION_KEY.to_string(), self.version.to_string());
303
304 let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
305
306 Ok(Schema {
307 column_schemas: self.column_schemas,
308 name_to_index: self.name_to_index,
309 arrow_schema: Arc::new(arrow_schema),
310 timestamp_index: self.timestamp_index,
311 version: self.version,
312 })
313 }
314}
315
/// Everything `collect_fields` derives from a list of column schemas.
struct FieldsAndIndices {
    /// Arrow fields, one per column and in column order.
    fields: Vec<Field>,
    /// Maps a column name to its position in the column list.
    name_to_index: HashMap<String, usize>,
    /// Position of the first column flagged as time index, if any.
    timestamp_index: Option<usize>,
}
321
322fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
323 let mut fields = Vec::with_capacity(column_schemas.len());
324 let mut name_to_index = HashMap::with_capacity(column_schemas.len());
325 let mut timestamp_index = None;
326 for (index, column_schema) in column_schemas.iter().enumerate() {
327 if column_schema.is_time_index() && timestamp_index.is_none() {
328 timestamp_index = Some(index);
329 }
330 let mut field = Field::try_from(column_schema)?;
331
332 let extype = match column_schema.data_type {
334 ConcreteDataType::Json(_) => Some(ColumnExtType::Json),
335 ConcreteDataType::Vector(d) => Some(ColumnExtType::Vector(d.dim)),
336 _ => None,
337 };
338 if let Some(extype) = extype {
339 field
340 .metadata_mut()
341 .insert(TYPE_KEY.to_string(), extype.to_string());
342 }
343 fields.push(field);
344 ensure!(
345 name_to_index
346 .insert(column_schema.name.clone(), index)
347 .is_none(),
348 DuplicateColumnSnafu {
349 column: &column_schema.name,
350 }
351 );
352 }
353
354 Ok(FieldsAndIndices {
355 fields,
356 name_to_index,
357 timestamp_index,
358 })
359}
360
361fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> {
362 ensure!(
363 timestamp_index < column_schemas.len(),
364 error::InvalidTimestampIndexSnafu {
365 index: timestamp_index,
366 }
367 );
368
369 let column_schema = &column_schemas[timestamp_index];
370 ensure!(
371 column_schema.data_type.is_timestamp(),
372 error::InvalidTimestampIndexSnafu {
373 index: timestamp_index,
374 }
375 );
376 ensure!(
377 column_schema.is_time_index(),
378 error::InvalidTimestampIndexSnafu {
379 index: timestamp_index,
380 }
381 );
382
383 Ok(())
384}
385
/// Shared, reference-counted pointer to a [`Schema`].
pub type SchemaRef = Arc<Schema>;
387
388impl TryFrom<Arc<ArrowSchema>> for Schema {
389 type Error = Error;
390
391 fn try_from(arrow_schema: Arc<ArrowSchema>) -> Result<Schema> {
392 let mut column_schemas = Vec::with_capacity(arrow_schema.fields.len());
393 let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
394 for field in &arrow_schema.fields {
395 let column_schema = ColumnSchema::try_from(field.as_ref())?;
396 let _ = name_to_index.insert(field.name().clone(), column_schemas.len());
397 column_schemas.push(column_schema);
398 }
399
400 let mut timestamp_index = None;
401 for (index, column_schema) in column_schemas.iter().enumerate() {
402 if column_schema.is_time_index() {
403 validate_timestamp_index(&column_schemas, index)?;
404 timestamp_index = Some(index);
405 break;
406 }
407 }
408
409 let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?;
410
411 Ok(Self {
412 column_schemas,
413 name_to_index,
414 arrow_schema,
415 timestamp_index,
416 version,
417 })
418 }
419}
420
421impl TryFrom<ArrowSchema> for Schema {
422 type Error = Error;
423
424 fn try_from(arrow_schema: ArrowSchema) -> Result<Schema> {
425 let arrow_schema = Arc::new(arrow_schema);
426
427 Schema::try_from(arrow_schema)
428 }
429}
430
431impl TryFrom<DFSchemaRef> for Schema {
432 type Error = Error;
433
434 fn try_from(value: DFSchemaRef) -> Result<Self> {
435 value.inner().clone().try_into()
436 }
437}
438
439fn try_parse_version(metadata: &HashMap<String, String>, key: &str) -> Result<u32> {
440 if let Some(value) = metadata.get(key) {
441 let version = value
442 .parse()
443 .context(error::ParseSchemaVersionSnafu { value })?;
444
445 Ok(version)
446 } else {
447 Ok(Schema::INITIAL_VERSION)
448 }
449}
450
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_type::ConcreteDataType;

    /// A default builder yields an empty schema.
    #[test]
    fn test_build_empty_schema() {
        let schema = SchemaBuilder::default().build().unwrap();
        assert_eq!(0, schema.num_columns());
        assert!(schema.is_empty());
    }

    /// A schema without a time index exposes its columns and survives an Arrow round trip.
    #[test]
    fn test_schema_no_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = Schema::new(column_schemas.clone());

        assert_eq!(2, schema.num_columns());
        assert!(!schema.is_empty());
        assert!(schema.timestamp_index().is_none());
        assert!(schema.timestamp_column().is_none());
        assert_eq!(Schema::INITIAL_VERSION, schema.version());

        for column_schema in &column_schemas {
            let found = schema.column_schema_by_name(&column_schema.name).unwrap();
            assert_eq!(column_schema, found);
        }
        assert!(schema.column_schema_by_name("col3").is_none());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();

        assert_eq!(schema, new_schema);
        assert_eq!(column_schemas, schema.column_schemas());
    }

    /// Two columns with the same name are rejected.
    #[test]
    fn test_schema_duplicate_column() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col1", ConcreteDataType::float64_datatype(), true),
        ];
        let err = Schema::try_new(column_schemas).unwrap_err();

        assert!(
            matches!(err, Error::DuplicateColumn { .. }),
            "expect DuplicateColumn, found {}",
            err
        );
    }

    /// Metadata added through the builder is visible on the schema.
    #[test]
    fn test_metadata() {
        let column_schemas = vec![ColumnSchema::new(
            "col1",
            ConcreteDataType::int32_datatype(),
            false,
        )];
        let schema = SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .add_metadata("k1", "v1")
            .build()
            .unwrap();

        assert_eq!("v1", schema.metadata().get("k1").unwrap());
    }

    /// Time index and version are detected and survive an Arrow round trip.
    #[test]
    fn test_schema_with_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(123)
            .build()
            .unwrap();

        assert_eq!(1, schema.timestamp_index().unwrap());
        assert_eq!(&column_schemas[1], schema.timestamp_column().unwrap());
        assert_eq!(123, schema.version());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();
        // Check the round-tripped schema, not the original one again.
        assert_eq!(1, new_schema.timestamp_index().unwrap());
        assert_eq!(schema, new_schema);
    }

    /// A time index flag on a non-timestamp column makes the build fail.
    #[test]
    fn test_schema_wrong_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true)
                .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false),
        ];
        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );

        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false)
                .with_time_index(true),
        ];

        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );
    }

    /// Projection remaps the time index position and keeps the version.
    #[test]
    fn test_project_keeps_timestamp_and_version() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(123)
            .build()
            .unwrap();

        let projected = schema.try_project(&[1]).unwrap();
        assert_eq!(1, projected.num_columns());
        assert_eq!(Some(0), projected.timestamp_index());
        assert_eq!(&column_schemas[1], projected.timestamp_column().unwrap());
        assert_eq!(Some(0), projected.column_index_by_name("ts"));
        assert!(!projected.contains_column("col1"));
        assert_eq!(123, projected.version());

        let projected = schema.try_project(&[0]).unwrap();
        assert_eq!(1, projected.num_columns());
        assert!(projected.timestamp_index().is_none());
    }
}
573}