1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{from_pb_time_ranges, ColumnDataTypeWrapper};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25 alter_request, compact_request, region_request, truncate_request, AlterRequest, AlterRequests,
26 BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests,
27 DropRequest, DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
28};
29use api::v1::{
30 self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
31 SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::FlightDecoder;
35use common_recordbatch::DfRecordBatch;
36use common_time::{TimeToLive, Timestamp};
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
39use num_enum::TryFromPrimitive;
40use serde::{Deserialize, Serialize};
41use snafu::{ensure, OptionExt, ResultExt};
42use strum::{AsRefStr, IntoStaticStr};
43
44use crate::logstore::entry;
45use crate::metadata::{
46 ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
47 InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
48 InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
49 RegionMetadata, Result, UnexpectedSnafu,
50};
51use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
52use crate::metrics;
53use crate::mito_engine_options::{
54 TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
55};
56use crate::path_utils::table_dir;
57use crate::storage::{ColumnId, RegionId, ScanRequest};
58
/// Kind of path carried by region create/open requests.
///
/// The enum is represented as a `u8` (`#[repr(u8)]`) and the derived
/// `TryFromPrimitive` allows converting a raw `u8` back into a variant.
#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
#[repr(u8)]
pub enum PathType {
    /// The variant assigned to requests parsed from gRPC create/open messages in this file.
    Bare,
    /// NOTE(review): presumably a path dedicated to data files — confirm against the path utilities.
    Data,
    /// NOTE(review): presumably a path dedicated to metadata files — confirm against the path utilities.
    Metadata,
}
76
/// A batch of DDL requests of a single kind, each paired with its target region.
///
/// The derived `IntoStaticStr` provides the variant name as a `&'static str`.
#[derive(Debug, IntoStaticStr)]
pub enum BatchRegionDdlRequest {
    /// Create requests, one entry per region.
    Create(Vec<(RegionId, RegionCreateRequest)>),
    /// Drop requests, one entry per region.
    Drop(Vec<(RegionId, RegionDropRequest)>),
    /// Alter requests, one entry per region.
    Alter(Vec<(RegionId, RegionAlterRequest)>),
}
83
84impl BatchRegionDdlRequest {
85 pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
87 match body {
88 region_request::Body::Creates(creates) => {
89 let requests = creates
90 .requests
91 .into_iter()
92 .map(parse_region_create)
93 .collect::<Result<Vec<_>>>()?;
94 Ok(Some(Self::Create(requests)))
95 }
96 region_request::Body::Drops(drops) => {
97 let requests = drops
98 .requests
99 .into_iter()
100 .map(parse_region_drop)
101 .collect::<Result<Vec<_>>>()?;
102 Ok(Some(Self::Drop(requests)))
103 }
104 region_request::Body::Alters(alters) => {
105 let requests = alters
106 .requests
107 .into_iter()
108 .map(parse_region_alter)
109 .collect::<Result<Vec<_>>>()?;
110 Ok(Some(Self::Alter(requests)))
111 }
112 _ => Ok(None),
113 }
114 }
115
116 pub fn request_type(&self) -> &'static str {
117 self.into()
118 }
119
120 pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
121 match self {
122 Self::Create(requests) => requests
123 .into_iter()
124 .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
125 .collect(),
126 Self::Drop(requests) => requests
127 .into_iter()
128 .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
129 .collect(),
130 Self::Alter(requests) => requests
131 .into_iter()
132 .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
133 .collect(),
134 }
135 }
136}
137
/// A request addressed to a single region.
///
/// The derived `IntoStaticStr` provides the variant name as a `&'static str`.
#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
    /// Write rows into the region.
    Put(RegionPutRequest),
    /// Delete rows from the region.
    Delete(RegionDeleteRequest),
    /// Create the region.
    Create(RegionCreateRequest),
    /// Drop the region.
    Drop(RegionDropRequest),
    /// Open the region.
    Open(RegionOpenRequest),
    /// Close the region.
    Close(RegionCloseRequest),
    /// Alter the region.
    Alter(RegionAlterRequest),
    /// Flush the region.
    Flush(RegionFlushRequest),
    /// Compact the region.
    Compact(RegionCompactRequest),
    /// Truncate the region.
    Truncate(RegionTruncateRequest),
    /// Catch up the region.
    Catchup(RegionCatchupRequest),
    /// Bulk-insert a decoded record batch into the region.
    BulkInserts(RegionBulkInsertsRequest),
}
153
154impl RegionRequest {
155 pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
158 match body {
159 region_request::Body::Inserts(inserts) => make_region_puts(inserts),
160 region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
161 region_request::Body::Create(create) => make_region_create(create),
162 region_request::Body::Drop(drop) => make_region_drop(drop),
163 region_request::Body::Open(open) => make_region_open(open),
164 region_request::Body::Close(close) => make_region_close(close),
165 region_request::Body::Alter(alter) => make_region_alter(alter),
166 region_request::Body::Flush(flush) => make_region_flush(flush),
167 region_request::Body::Compact(compact) => make_region_compact(compact),
168 region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
169 region_request::Body::Creates(creates) => make_region_creates(creates),
170 region_request::Body::Drops(drops) => make_region_drops(drops),
171 region_request::Body::Alters(alters) => make_region_alters(alters),
172 region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
173 region_request::Body::Sync(_) => UnexpectedSnafu {
174 reason: "Sync request should be handled separately by RegionServer",
175 }
176 .fail(),
177 region_request::Body::ListMetadata(_) => UnexpectedSnafu {
178 reason: "ListMetadata request should be handled separately by RegionServer",
179 }
180 .fail(),
181 }
182 }
183
184 pub fn request_type(&self) -> &'static str {
186 self.into()
187 }
188}
189
190fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
191 let requests = inserts
192 .requests
193 .into_iter()
194 .filter_map(|r| {
195 let region_id = r.region_id.into();
196 r.rows.map(|rows| {
197 (
198 region_id,
199 RegionRequest::Put(RegionPutRequest { rows, hint: None }),
200 )
201 })
202 })
203 .collect();
204 Ok(requests)
205}
206
207fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
208 let requests = deletes
209 .requests
210 .into_iter()
211 .filter_map(|r| {
212 let region_id = r.region_id.into();
213 r.rows.map(|rows| {
214 (
215 region_id,
216 RegionRequest::Delete(RegionDeleteRequest { rows }),
217 )
218 })
219 })
220 .collect();
221 Ok(requests)
222}
223
224fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
225 let column_metadatas = create
226 .column_defs
227 .into_iter()
228 .map(ColumnMetadata::try_from_column_def)
229 .collect::<Result<Vec<_>>>()?;
230 let region_id = RegionId::from(create.region_id);
231 let table_dir = table_dir(&create.path, region_id.table_id());
232 let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
233 Ok((
234 region_id,
235 RegionCreateRequest {
236 engine: create.engine,
237 column_metadatas,
238 primary_key: create.primary_key,
239 options: create.options,
240 table_dir,
241 path_type: PathType::Bare,
242 partition_expr_json,
243 },
244 ))
245}
246
247fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
248 let (region_id, request) = parse_region_create(create)?;
249 Ok(vec![(region_id, RegionRequest::Create(request))])
250}
251
252fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
253 let mut requests = Vec::with_capacity(creates.requests.len());
254 for create in creates.requests {
255 requests.extend(make_region_create(create)?);
256 }
257 Ok(requests)
258}
259
260fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
261 let region_id = drop.region_id.into();
262 Ok((
263 region_id,
264 RegionDropRequest {
265 fast_path: drop.fast_path,
266 },
267 ))
268}
269
270fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
271 let (region_id, request) = parse_region_drop(drop)?;
272 Ok(vec![(region_id, RegionRequest::Drop(request))])
273}
274
275fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
276 let mut requests = Vec::with_capacity(drops.requests.len());
277 for drop in drops.requests {
278 requests.extend(make_region_drop(drop)?);
279 }
280 Ok(requests)
281}
282
283fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
284 let region_id = RegionId::from(open.region_id);
285 let table_dir = table_dir(&open.path, region_id.table_id());
286 Ok(vec![(
287 region_id,
288 RegionRequest::Open(RegionOpenRequest {
289 engine: open.engine,
290 table_dir,
291 path_type: PathType::Bare,
292 options: open.options,
293 skip_wal_replay: false,
294 }),
295 )])
296}
297
298fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
299 let region_id = close.region_id.into();
300 Ok(vec![(
301 region_id,
302 RegionRequest::Close(RegionCloseRequest {}),
303 )])
304}
305
306fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
307 let region_id = alter.region_id.into();
308 let request = RegionAlterRequest::try_from(alter)?;
309 Ok((region_id, request))
310}
311
312fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
313 let (region_id, request) = parse_region_alter(alter)?;
314 Ok(vec![(region_id, RegionRequest::Alter(request))])
315}
316
317fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
318 let mut requests = Vec::with_capacity(alters.requests.len());
319 for alter in alters.requests {
320 requests.extend(make_region_alter(alter)?);
321 }
322 Ok(requests)
323}
324
325fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
326 let region_id = flush.region_id.into();
327 Ok(vec![(
328 region_id,
329 RegionRequest::Flush(RegionFlushRequest {
330 row_group_size: None,
331 }),
332 )])
333}
334
335fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
336 let region_id = compact.region_id.into();
337 let options = compact
338 .options
339 .unwrap_or(compact_request::Options::Regular(Default::default()));
340 Ok(vec![(
341 region_id,
342 RegionRequest::Compact(RegionCompactRequest { options }),
343 )])
344}
345
346fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
347 let region_id = truncate.region_id.into();
348 match truncate.kind {
349 None => InvalidRawRegionRequestSnafu {
350 err: "missing kind in TruncateRequest".to_string(),
351 }
352 .fail(),
353 Some(truncate_request::Kind::All(_)) => Ok(vec![(
354 region_id,
355 RegionRequest::Truncate(RegionTruncateRequest::All),
356 )]),
357 Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
358 let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
359
360 Ok(vec![(
361 region_id,
362 RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
363 )])
364 }
365 }
366}
367
368fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
370 let region_id = request.region_id.into();
371 let Some(Body::ArrowIpc(request)) = request.body else {
372 return Ok(vec![]);
373 };
374
375 let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
376 .with_label_values(&["decode"])
377 .start_timer();
378 let mut decoder =
379 FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
380 let payload = decoder
381 .try_decode_record_batch(&request.data_header, &request.payload)
382 .context(FlightCodecSnafu)?;
383 decoder_timer.observe_duration();
384 Ok(vec![(
385 region_id,
386 RegionRequest::BulkInserts(RegionBulkInsertsRequest {
387 region_id,
388 payload,
389 raw_data: request,
390 }),
391 )])
392}
393
/// Request to put rows into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
    /// Rows to put.
    pub rows: Rows,
    /// Optional write hint; requests parsed from gRPC inserts in this file set it to `None`.
    pub hint: Option<WriteHint>,
}
402
/// Request to read from a region, wrapping a [`ScanRequest`].
#[derive(Debug)]
pub struct RegionReadRequest {
    /// The scan to perform.
    pub request: ScanRequest,
}
407
/// Request to delete rows from a region.
#[derive(Debug)]
pub struct RegionDeleteRequest {
    /// Rows identifying what to delete.
    /// NOTE(review): presumably only key columns are required here — confirm with the engine.
    pub rows: Rows,
}
416
/// Request to create a region.
#[derive(Debug, Clone)]
pub struct RegionCreateRequest {
    /// Name of the region engine.
    pub engine: String,
    /// Metadata of the columns of the region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the columns forming the primary key.
    pub primary_key: Vec<ColumnId>,
    /// Options of the region, as raw key/value strings.
    pub options: HashMap<String, String>,
    /// Table directory; when parsed from gRPC it is derived from the request path and the table id.
    pub table_dir: String,
    /// Path type of the region.
    pub path_type: PathType,
    /// Partition expression, if any; when parsed from gRPC it is the `expression` of the request's partition.
    /// NOTE(review): the name suggests a JSON-encoded expression — confirm the encoding.
    pub partition_expr_json: Option<String>,
}
435
436impl RegionCreateRequest {
437 pub fn validate(&self) -> Result<()> {
439 ensure!(
441 self.column_metadatas
442 .iter()
443 .any(|x| x.semantic_type == SemanticType::Timestamp),
444 InvalidRegionRequestSnafu {
445 region_id: RegionId::new(0, 0),
446 err: "missing timestamp column in create region request".to_string(),
447 }
448 );
449
450 let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
452 for (i, c) in self.column_metadatas.iter().enumerate() {
453 if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
454 return InvalidRegionRequestSnafu {
455 region_id: RegionId::new(0, 0),
456 err: format!(
457 "duplicate column id {} (at position {} and {}) in create region request",
458 c.column_id, previous, i
459 ),
460 }
461 .fail();
462 }
463 }
464
465 for column_id in &self.primary_key {
467 ensure!(
468 column_id_to_indices.contains_key(column_id),
469 InvalidRegionRequestSnafu {
470 region_id: RegionId::new(0, 0),
471 err: format!(
472 "missing primary key column {} in create region request",
473 column_id
474 ),
475 }
476 );
477 }
478
479 Ok(())
480 }
481
482 pub fn is_physical_table(&self) -> bool {
484 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
485 }
486}
487
/// Request to drop a region.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// Copied from the `fast_path` flag of the gRPC drop request.
    /// NOTE(review): the exact semantics of the fast path are defined by the engine — confirm there.
    pub fast_path: bool,
}
492
/// Request to open a region.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
    /// Name of the region engine.
    pub engine: String,
    /// Table directory; when parsed from gRPC it is derived from the request path and the table id.
    pub table_dir: String,
    /// Path type of the region.
    pub path_type: PathType,
    /// Options of the region, as raw key/value strings.
    pub options: HashMap<String, String>,
    /// Whether to skip WAL replay; requests parsed from gRPC in this file set it to `false`.
    pub skip_wal_replay: bool,
}
507
508impl RegionOpenRequest {
509 pub fn is_physical_table(&self) -> bool {
511 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
512 }
513}
514
/// Request to close a region. It carries no parameters.
#[derive(Debug)]
pub struct RegionCloseRequest {}
518
/// Request to alter a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RegionAlterRequest {
    /// The kind of alteration to perform.
    pub kind: AlterKind,
}
525
526impl RegionAlterRequest {
527 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
529 self.kind.validate(metadata)?;
530
531 Ok(())
532 }
533
534 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
538 debug_assert!(self.validate(metadata).is_ok());
539 self.kind.need_alter(metadata)
540 }
541}
542
543impl TryFrom<AlterRequest> for RegionAlterRequest {
544 type Error = MetadataError;
545
546 fn try_from(value: AlterRequest) -> Result<Self> {
547 let kind = value.kind.context(InvalidRawRegionRequestSnafu {
548 err: "missing kind in AlterRequest",
549 })?;
550
551 let kind = AlterKind::try_from(kind)?;
552 Ok(RegionAlterRequest { kind })
553 }
554}
555
/// Kind of alteration applied to a region.
///
/// The derived `AsRefStr` exposes the variant name as a string.
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
    /// Add columns to the region.
    AddColumns {
        /// Columns to add.
        columns: Vec<AddColumn>,
    },
    /// Drop columns from the region; validation only allows dropping field columns.
    DropColumns {
        /// Names of the columns to drop.
        names: Vec<String>,
    },
    /// Change the data type of columns.
    ModifyColumnTypes {
        /// Columns to change, with their target types.
        columns: Vec<ModifyColumnType>,
    },
    /// Set region options.
    SetRegionOptions { options: Vec<SetRegionOption> },
    /// Unset region options.
    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
    /// Set index options on columns.
    SetIndexes { options: Vec<SetIndexOption> },
    /// Unset index options on columns.
    UnsetIndexes { options: Vec<UnsetIndexOption> },
    /// Drop the default values of columns.
    DropDefaults {
        /// Names of the columns whose default is dropped.
        names: Vec<String>,
    },
    /// Set the default values of columns.
    SetDefaults {
        /// Columns with their new default constraints.
        columns: Vec<SetDefault>,
    },
    /// Synchronize the region's columns with the given column metadata.
    SyncColumns {
        /// The column metadata to synchronize to.
        column_metadatas: Vec<ColumnMetadata>,
    },
}
/// A new default constraint for one column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SetDefault {
    /// Name of the column.
    pub name: String,
    /// The default constraint as raw bytes, copied from the gRPC request.
    /// NOTE(review): the byte encoding is not visible here — confirm where it is decoded.
    pub default_constraint: Vec<u8>,
}
602
/// An index option to set on a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum SetIndexOption {
    /// Set a fulltext index with the given options.
    Fulltext {
        column_name: String,
        options: FulltextOptions,
    },
    /// Set an inverted index.
    Inverted {
        column_name: String,
    },
    /// Set a skipping index with the given options.
    Skipping {
        column_name: String,
        options: SkippingIndexOptions,
    },
}
617
618impl SetIndexOption {
619 pub fn column_name(&self) -> &String {
621 match self {
622 SetIndexOption::Fulltext { column_name, .. } => column_name,
623 SetIndexOption::Inverted { column_name } => column_name,
624 SetIndexOption::Skipping { column_name, .. } => column_name,
625 }
626 }
627
628 pub fn is_fulltext(&self) -> bool {
630 match self {
631 SetIndexOption::Fulltext { .. } => true,
632 SetIndexOption::Inverted { .. } => false,
633 SetIndexOption::Skipping { .. } => false,
634 }
635 }
636}
637
638impl TryFrom<v1::SetIndex> for SetIndexOption {
639 type Error = MetadataError;
640
641 fn try_from(value: v1::SetIndex) -> Result<Self> {
642 let option = value.options.context(InvalidRawRegionRequestSnafu {
643 err: "missing options in SetIndex",
644 })?;
645
646 let opt = match option {
647 v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
648 column_name: x.column_name.clone(),
649 options: FulltextOptions::new(
650 x.enable,
651 as_fulltext_option_analyzer(
652 Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
653 ),
654 x.case_sensitive,
655 as_fulltext_option_backend(
656 PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
657 ),
658 x.granularity as u32,
659 x.false_positive_rate,
660 )
661 .context(InvalidIndexOptionSnafu)?,
662 },
663 v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
664 column_name: i.column_name,
665 },
666 v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
667 column_name: s.column_name,
668 options: SkippingIndexOptions::new(
669 s.granularity as u32,
670 s.false_positive_rate,
671 as_skipping_index_type(
672 PbSkippingIndexType::try_from(s.skipping_index_type)
673 .context(DecodeProtoSnafu)?,
674 ),
675 )
676 .context(InvalidIndexOptionSnafu)?,
677 },
678 };
679
680 Ok(opt)
681 }
682}
683
/// An index option to unset on a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum UnsetIndexOption {
    /// Unset the fulltext index of the column.
    Fulltext { column_name: String },
    /// Unset the inverted index of the column.
    Inverted { column_name: String },
    /// Unset the skipping index of the column.
    Skipping { column_name: String },
}
690
691impl UnsetIndexOption {
692 pub fn column_name(&self) -> &String {
693 match self {
694 UnsetIndexOption::Fulltext { column_name } => column_name,
695 UnsetIndexOption::Inverted { column_name } => column_name,
696 UnsetIndexOption::Skipping { column_name } => column_name,
697 }
698 }
699
700 pub fn is_fulltext(&self) -> bool {
701 match self {
702 UnsetIndexOption::Fulltext { .. } => true,
703 UnsetIndexOption::Inverted { .. } => false,
704 UnsetIndexOption::Skipping { .. } => false,
705 }
706 }
707}
708
709impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
710 type Error = MetadataError;
711
712 fn try_from(value: v1::UnsetIndex) -> Result<Self> {
713 let option = value.options.context(InvalidRawRegionRequestSnafu {
714 err: "missing options in UnsetIndex",
715 })?;
716
717 let opt = match option {
718 v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
719 column_name: f.column_name,
720 },
721 v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
722 column_name: i.column_name,
723 },
724 v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
725 column_name: s.column_name,
726 },
727 };
728
729 Ok(opt)
730 }
731}
732
733impl AlterKind {
734 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
738 match self {
739 AlterKind::AddColumns { columns } => {
740 for col_to_add in columns {
741 col_to_add.validate(metadata)?;
742 }
743 }
744 AlterKind::DropColumns { names } => {
745 for name in names {
746 Self::validate_column_to_drop(name, metadata)?;
747 }
748 }
749 AlterKind::ModifyColumnTypes { columns } => {
750 for col_to_change in columns {
751 col_to_change.validate(metadata)?;
752 }
753 }
754 AlterKind::SetRegionOptions { .. } => {}
755 AlterKind::UnsetRegionOptions { .. } => {}
756 AlterKind::SetIndexes { options } => {
757 for option in options {
758 Self::validate_column_alter_index_option(
759 option.column_name(),
760 metadata,
761 option.is_fulltext(),
762 )?;
763 }
764 }
765 AlterKind::UnsetIndexes { options } => {
766 for option in options {
767 Self::validate_column_alter_index_option(
768 option.column_name(),
769 metadata,
770 option.is_fulltext(),
771 )?;
772 }
773 }
774 AlterKind::DropDefaults { names } => {
775 names
776 .iter()
777 .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
778 }
779 AlterKind::SetDefaults { columns } => {
780 columns
781 .iter()
782 .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
783 }
784 AlterKind::SyncColumns { column_metadatas } => {
785 let new_primary_keys = column_metadatas
786 .iter()
787 .filter(|c| c.semantic_type == SemanticType::Tag)
788 .map(|c| (c.column_schema.name.as_str(), c.column_id))
789 .collect::<HashMap<_, _>>();
790
791 let old_primary_keys = metadata
792 .column_metadatas
793 .iter()
794 .filter(|c| c.semantic_type == SemanticType::Tag)
795 .map(|c| (c.column_schema.name.as_str(), c.column_id));
796
797 for (name, id) in old_primary_keys {
798 let primary_key =
799 new_primary_keys
800 .get(name)
801 .with_context(|| InvalidRegionRequestSnafu {
802 region_id: metadata.region_id,
803 err: format!("column {} is not a primary key", name),
804 })?;
805
806 ensure!(
807 *primary_key == id,
808 InvalidRegionRequestSnafu {
809 region_id: metadata.region_id,
810 err: format!(
811 "column with same name {} has different id, existing: {}, got: {}",
812 name, id, primary_key
813 ),
814 }
815 );
816 }
817
818 let new_ts_column = column_metadatas
819 .iter()
820 .find(|c| c.semantic_type == SemanticType::Timestamp)
821 .map(|c| (c.column_schema.name.as_str(), c.column_id))
822 .context(InvalidRegionRequestSnafu {
823 region_id: metadata.region_id,
824 err: "timestamp column not found",
825 })?;
826
827 let old_ts_column = metadata
829 .column_metadatas
830 .iter()
831 .find(|c| c.semantic_type == SemanticType::Timestamp)
832 .map(|c| (c.column_schema.name.as_str(), c.column_id))
833 .unwrap();
834
835 ensure!(
836 new_ts_column == old_ts_column,
837 InvalidRegionRequestSnafu {
838 region_id: metadata.region_id,
839 err: format!(
840 "timestamp column {} has different id, existing: {}, got: {}",
841 old_ts_column.0, old_ts_column.1, new_ts_column.1
842 ),
843 }
844 );
845 }
846 }
847 Ok(())
848 }
849
850 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
852 debug_assert!(self.validate(metadata).is_ok());
853 match self {
854 AlterKind::AddColumns { columns } => columns
855 .iter()
856 .any(|col_to_add| col_to_add.need_alter(metadata)),
857 AlterKind::DropColumns { names } => names
858 .iter()
859 .any(|name| metadata.column_by_name(name).is_some()),
860 AlterKind::ModifyColumnTypes { columns } => columns
861 .iter()
862 .any(|col_to_change| col_to_change.need_alter(metadata)),
863 AlterKind::SetRegionOptions { .. } => {
864 true
867 }
868 AlterKind::UnsetRegionOptions { .. } => true,
869 AlterKind::SetIndexes { options, .. } => options
870 .iter()
871 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
872 AlterKind::UnsetIndexes { options } => options
873 .iter()
874 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
875 AlterKind::DropDefaults { names } => names
876 .iter()
877 .any(|name| metadata.column_by_name(name).is_some()),
878
879 AlterKind::SetDefaults { columns } => columns
880 .iter()
881 .any(|x| metadata.column_by_name(&x.name).is_some()),
882 AlterKind::SyncColumns { column_metadatas } => {
883 metadata.column_metadatas != *column_metadatas
884 }
885 }
886 }
887
888 fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
890 let Some(column) = metadata.column_by_name(name) else {
891 return Ok(());
892 };
893 ensure!(
894 column.semantic_type == SemanticType::Field,
895 InvalidRegionRequestSnafu {
896 region_id: metadata.region_id,
897 err: format!("column {} is not a field and could not be dropped", name),
898 }
899 );
900 Ok(())
901 }
902
903 fn validate_column_alter_index_option(
905 column_name: &String,
906 metadata: &RegionMetadata,
907 is_fulltext: bool,
908 ) -> Result<()> {
909 let column = metadata
910 .column_by_name(column_name)
911 .context(InvalidRegionRequestSnafu {
912 region_id: metadata.region_id,
913 err: format!("column {} not found", column_name),
914 })?;
915
916 if is_fulltext {
917 ensure!(
918 column.column_schema.data_type.is_string(),
919 InvalidRegionRequestSnafu {
920 region_id: metadata.region_id,
921 err: format!(
922 "cannot change alter index options for non-string column {}",
923 column_name
924 ),
925 }
926 );
927 }
928
929 Ok(())
930 }
931
932 fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
934 metadata
935 .column_by_name(column_name)
936 .context(InvalidRegionRequestSnafu {
937 region_id: metadata.region_id,
938 err: format!("column {} not found", column_name),
939 })?;
940
941 Ok(())
942 }
943}
944
945impl TryFrom<alter_request::Kind> for AlterKind {
946 type Error = MetadataError;
947
948 fn try_from(kind: alter_request::Kind) -> Result<Self> {
949 let alter_kind = match kind {
950 alter_request::Kind::AddColumns(x) => {
951 let columns = x
952 .add_columns
953 .into_iter()
954 .map(|x| x.try_into())
955 .collect::<Result<Vec<_>>>()?;
956 AlterKind::AddColumns { columns }
957 }
958 alter_request::Kind::ModifyColumnTypes(x) => {
959 let columns = x
960 .modify_column_types
961 .into_iter()
962 .map(|x| x.into())
963 .collect::<Vec<_>>();
964 AlterKind::ModifyColumnTypes { columns }
965 }
966 alter_request::Kind::DropColumns(x) => {
967 let names = x.drop_columns.into_iter().map(|x| x.name).collect();
968 AlterKind::DropColumns { names }
969 }
970 alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
971 options: options
972 .table_options
973 .iter()
974 .map(TryFrom::try_from)
975 .collect::<Result<Vec<_>>>()?,
976 },
977 alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
978 keys: options
979 .keys
980 .iter()
981 .map(|key| UnsetRegionOption::try_from(key.as_str()))
982 .collect::<Result<Vec<_>>>()?,
983 },
984 alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
985 options: vec![SetIndexOption::try_from(o)?],
986 },
987 alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
988 options: vec![UnsetIndexOption::try_from(o)?],
989 },
990 alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
991 options: o
992 .set_indexes
993 .into_iter()
994 .map(SetIndexOption::try_from)
995 .collect::<Result<Vec<_>>>()?,
996 },
997 alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
998 options: o
999 .unset_indexes
1000 .into_iter()
1001 .map(UnsetIndexOption::try_from)
1002 .collect::<Result<Vec<_>>>()?,
1003 },
1004 alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1005 names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1006 },
1007 alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1008 columns: x
1009 .set_defaults
1010 .into_iter()
1011 .map(|x| {
1012 Ok(SetDefault {
1013 name: x.column_name,
1014 default_constraint: x.default_constraint.clone(),
1015 })
1016 })
1017 .collect::<Result<Vec<_>>>()?,
1018 },
1019 alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1020 column_metadatas: x
1021 .column_defs
1022 .into_iter()
1023 .map(ColumnMetadata::try_from_column_def)
1024 .collect::<Result<Vec<_>>>()?,
1025 },
1026 };
1027
1028 Ok(alter_kind)
1029 }
1030}
1031
/// A column to add to a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AddColumn {
    /// Metadata of the column to add.
    pub column_metadata: ColumnMetadata,
    /// Where to place the new column, if specified.
    /// NOTE(review): the placement used when this is `None` is decided elsewhere — confirm.
    pub location: Option<AddColumnLocation>,
}
1041
1042impl AddColumn {
1043 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1048 ensure!(
1049 self.column_metadata.column_schema.is_nullable()
1050 || self
1051 .column_metadata
1052 .column_schema
1053 .default_constraint()
1054 .is_some(),
1055 InvalidRegionRequestSnafu {
1056 region_id: metadata.region_id,
1057 err: format!(
1058 "no default value for column {}",
1059 self.column_metadata.column_schema.name
1060 ),
1061 }
1062 );
1063
1064 if let Some(existing_column) =
1065 metadata.column_by_name(&self.column_metadata.column_schema.name)
1066 {
1067 ensure!(
1069 *existing_column == self.column_metadata,
1070 InvalidRegionRequestSnafu {
1071 region_id: metadata.region_id,
1072 err: format!(
1073 "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1074 self.column_metadata.column_schema.name, existing_column, self.column_metadata,
1075 ),
1076 }
1077 );
1078 ensure!(
1079 self.location.is_none(),
1080 InvalidRegionRequestSnafu {
1081 region_id: metadata.region_id,
1082 err: format!(
1083 "column {} already exists, but location is specified",
1084 self.column_metadata.column_schema.name
1085 ),
1086 }
1087 );
1088 }
1089
1090 if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1091 ensure!(
1093 existing_column.column_schema.name == self.column_metadata.column_schema.name,
1094 InvalidRegionRequestSnafu {
1095 region_id: metadata.region_id,
1096 err: format!(
1097 "column id {} already exists with different name {}",
1098 self.column_metadata.column_id, existing_column.column_schema.name
1099 ),
1100 }
1101 );
1102 }
1103
1104 Ok(())
1105 }
1106
1107 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1109 debug_assert!(self.validate(metadata).is_ok());
1110 metadata
1111 .column_by_name(&self.column_metadata.column_schema.name)
1112 .is_none()
1113 }
1114}
1115
1116impl TryFrom<v1::region::AddColumn> for AddColumn {
1117 type Error = MetadataError;
1118
1119 fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1120 let column_def = add_column
1121 .column_def
1122 .context(InvalidRawRegionRequestSnafu {
1123 err: "missing column_def in AddColumn",
1124 })?;
1125
1126 let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1127 let location = add_column
1128 .location
1129 .map(AddColumnLocation::try_from)
1130 .transpose()?;
1131
1132 Ok(AddColumn {
1133 column_metadata,
1134 location,
1135 })
1136 }
1137}
1138
/// Location at which to add a new column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AddColumnLocation {
    /// Add the column first.
    First,
    /// Add the column after the named column.
    After {
        /// Name of the column after which the new column is placed.
        column_name: String,
    },
}
1150
1151impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1152 type Error = MetadataError;
1153
1154 fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1155 let location_type = LocationType::try_from(location.location_type)
1156 .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1157 let add_column_location = match location_type {
1158 LocationType::First => AddColumnLocation::First,
1159 LocationType::After => AddColumnLocation::After {
1160 column_name: location.after_column_name,
1161 },
1162 };
1163
1164 Ok(add_column_location)
1165 }
1166}
1167
/// A change of data type for one column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ModifyColumnType {
    /// Name of the column to change.
    pub column_name: String,
    /// Data type the column is changed to.
    pub target_type: ConcreteDataType,
}
1176
1177impl ModifyColumnType {
1178 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1180 let column_meta = metadata
1181 .column_by_name(&self.column_name)
1182 .with_context(|| InvalidRegionRequestSnafu {
1183 region_id: metadata.region_id,
1184 err: format!("column {} not found", self.column_name),
1185 })?;
1186
1187 ensure!(
1188 matches!(column_meta.semantic_type, SemanticType::Field),
1189 InvalidRegionRequestSnafu {
1190 region_id: metadata.region_id,
1191 err: "'timestamp' or 'tag' column cannot change type".to_string()
1192 }
1193 );
1194 ensure!(
1195 column_meta
1196 .column_schema
1197 .data_type
1198 .can_arrow_type_cast_to(&self.target_type),
1199 InvalidRegionRequestSnafu {
1200 region_id: metadata.region_id,
1201 err: format!(
1202 "column '{}' cannot be cast automatically to type '{}'",
1203 self.column_name, self.target_type
1204 ),
1205 }
1206 );
1207
1208 Ok(())
1209 }
1210
1211 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1213 debug_assert!(self.validate(metadata).is_ok());
1214 metadata.column_by_name(&self.column_name).is_some()
1215 }
1216}
1217
1218impl From<v1::ModifyColumnType> for ModifyColumnType {
1219 fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1220 let target_type = ColumnDataTypeWrapper::new(
1221 modify_column_type.target_type(),
1222 modify_column_type.target_type_extension,
1223 )
1224 .into();
1225
1226 ModifyColumnType {
1227 column_name: modify_column_type.column_name,
1228 target_type,
1229 }
1230 }
1231}
1232
/// A region option to set.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetRegionOption {
    /// Time-to-live value; unsetting the TTL converts to `Ttl(None)`.
    Ttl(Option<TimeToLive>),
    /// A TWCS option as a `(key, value)` pair of strings; unsetting one of
    /// these options converts to the key paired with an empty value.
    Twsc(String, String),
}
1239
1240impl TryFrom<&PbOption> for SetRegionOption {
1241 type Error = MetadataError;
1242
1243 fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1244 let PbOption { key, value } = value;
1245 match key.as_str() {
1246 TTL_KEY => {
1247 let ttl = TimeToLive::from_humantime_or_str(value)
1248 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1249
1250 Ok(Self::Ttl(Some(ttl)))
1251 }
1252 TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1253 Ok(Self::Twsc(key.to_string(), value.to_string()))
1254 }
1255 _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1256 }
1257 }
1258}
1259
1260impl From<&UnsetRegionOption> for SetRegionOption {
1261 fn from(unset_option: &UnsetRegionOption) -> Self {
1262 match unset_option {
1263 UnsetRegionOption::TwcsTriggerFileNum => {
1264 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1265 }
1266 UnsetRegionOption::TwcsMaxOutputFileSize => {
1267 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1268 }
1269 UnsetRegionOption::TwcsTimeWindow => {
1270 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1271 }
1272 UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1273 }
1274 }
1275}
1276
1277impl TryFrom<&str> for UnsetRegionOption {
1278 type Error = MetadataError;
1279
1280 fn try_from(key: &str) -> Result<Self> {
1281 match key.to_ascii_lowercase().as_str() {
1282 TTL_KEY => Ok(Self::Ttl),
1283 TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1284 TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1285 TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1286 _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1287 }
1288 }
1289}
1290
/// A region option to unset. Each variant corresponds to one option key, as
/// returned by `as_str`.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetRegionOption {
    /// The option keyed by `TWCS_TRIGGER_FILE_NUM`.
    TwcsTriggerFileNum,
    /// The option keyed by `TWCS_MAX_OUTPUT_FILE_SIZE`.
    TwcsMaxOutputFileSize,
    /// The option keyed by `TWCS_TIME_WINDOW`.
    TwcsTimeWindow,
    /// The option keyed by `TTL_KEY`.
    Ttl,
}
1298
1299impl UnsetRegionOption {
1300 pub fn as_str(&self) -> &str {
1301 match self {
1302 Self::Ttl => TTL_KEY,
1303 Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1304 Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1305 Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1306 }
1307 }
1308}
1309
1310impl Display for UnsetRegionOption {
1311 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1312 write!(f, "{}", self.as_str())
1313 }
1314}
1315
/// Request to flush a region.
#[derive(Debug, Clone, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size; `None` by default.
    // NOTE(review): presumably the row group size of the files written by the
    // flush — confirm against the engine that consumes this request.
    pub row_group_size: Option<usize>,
}
1320
/// Request to compact a region.
#[derive(Debug)]
pub struct RegionCompactRequest {
    /// Compaction options; the `Default` implementation uses the `Regular` variant.
    pub options: compact_request::Options,
}
1325
1326impl Default for RegionCompactRequest {
1327 fn default() -> Self {
1328 Self {
1329 options: compact_request::Options::Regular(Default::default()),
1331 }
1332 }
1333}
1334
/// Request to truncate a region.
#[derive(Debug)]
pub enum RegionTruncateRequest {
    /// Truncate without a time-range restriction.
    All,
    /// Truncate restricted to the given time ranges.
    ByTimeRanges {
        /// Pairs of timestamps describing the ranges.
        // NOTE(review): presumably `(start, end)`; bound inclusivity is not
        // visible here — confirm against the consumer.
        time_ranges: Vec<(Timestamp, Timestamp)>,
    },
}
1347
/// Request for a region to catch up.
// NOTE(review): field semantics below are inferred from names and types only;
// confirm against the engine that handles this request.
#[derive(Debug, Clone, Copy)]
pub struct RegionCatchupRequest {
    /// Presumably whether the region becomes writable after catching up.
    pub set_writable: bool,
    /// Optional log entry id; presumably the entry to catch up to.
    pub entry_id: Option<entry::Id>,
    /// Optional log entry id for metadata; presumably the metadata counterpart
    /// of `entry_id`.
    pub metadata_entry_id: Option<entry::Id>,
    /// Optional location id.
    pub location_id: Option<u64>,
}
1366
/// Request that refers to the sequences of a set of regions.
#[derive(Debug, Clone)]
pub struct RegionSequencesRequest {
    /// Ids of the regions the request applies to.
    pub region_ids: Vec<RegionId>,
}
1372
/// Request to bulk insert a record batch into a region.
#[derive(Debug, Clone)]
pub struct RegionBulkInsertsRequest {
    /// Id of the target region.
    pub region_id: RegionId,
    /// The record batch; `estimated_size` is computed from it.
    pub payload: DfRecordBatch,
    /// The Arrow IPC message carried alongside `payload`.
    // NOTE(review): presumably the encoded form `payload` was decoded from — confirm.
    pub raw_data: ArrowIpc,
}
1379
1380impl RegionBulkInsertsRequest {
1381 pub fn estimated_size(&self) -> usize {
1382 self.payload.get_array_memory_size()
1383 }
1384}
1385
1386impl fmt::Display for RegionRequest {
1387 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1388 match self {
1389 RegionRequest::Put(_) => write!(f, "Put"),
1390 RegionRequest::Delete(_) => write!(f, "Delete"),
1391 RegionRequest::Create(_) => write!(f, "Create"),
1392 RegionRequest::Drop(_) => write!(f, "Drop"),
1393 RegionRequest::Open(_) => write!(f, "Open"),
1394 RegionRequest::Close(_) => write!(f, "Close"),
1395 RegionRequest::Alter(_) => write!(f, "Alter"),
1396 RegionRequest::Flush(_) => write!(f, "Flush"),
1397 RegionRequest::Compact(_) => write!(f, "Compact"),
1398 RegionRequest::Truncate(_) => write!(f, "Truncate"),
1399 RegionRequest::Catchup(_) => write!(f, "Catchup"),
1400 RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1401 }
1402 }
1403}
1404
1405#[cfg(test)]
1406mod tests {
1407
1408 use api::v1::region::RegionColumnDef;
1409 use api::v1::{ColumnDataType, ColumnDef};
1410 use datatypes::prelude::ConcreteDataType;
1411 use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1412
1413 use super::*;
1414 use crate::metadata::RegionMetadataBuilder;
1415
1416 #[test]
1417 fn test_from_proto_location() {
1418 let proto_location = v1::AddColumnLocation {
1419 location_type: LocationType::First as i32,
1420 after_column_name: String::default(),
1421 };
1422 let location = AddColumnLocation::try_from(proto_location).unwrap();
1423 assert_eq!(location, AddColumnLocation::First);
1424
1425 let proto_location = v1::AddColumnLocation {
1426 location_type: 10,
1427 after_column_name: String::default(),
1428 };
1429 AddColumnLocation::try_from(proto_location).unwrap_err();
1430
1431 let proto_location = v1::AddColumnLocation {
1432 location_type: LocationType::After as i32,
1433 after_column_name: "a".to_string(),
1434 };
1435 let location = AddColumnLocation::try_from(proto_location).unwrap();
1436 assert_eq!(
1437 location,
1438 AddColumnLocation::After {
1439 column_name: "a".to_string()
1440 }
1441 );
1442 }
1443
1444 #[test]
1445 fn test_from_none_proto_add_column() {
1446 AddColumn::try_from(v1::region::AddColumn {
1447 column_def: None,
1448 location: None,
1449 })
1450 .unwrap_err();
1451 }
1452
1453 #[test]
1454 fn test_from_proto_alter_request() {
1455 RegionAlterRequest::try_from(AlterRequest {
1456 region_id: 0,
1457 schema_version: 1,
1458 kind: None,
1459 })
1460 .unwrap_err();
1461
1462 let request = RegionAlterRequest::try_from(AlterRequest {
1463 region_id: 0,
1464 schema_version: 1,
1465 kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1466 add_columns: vec![v1::region::AddColumn {
1467 column_def: Some(RegionColumnDef {
1468 column_def: Some(ColumnDef {
1469 name: "a".to_string(),
1470 data_type: ColumnDataType::String as i32,
1471 is_nullable: true,
1472 default_constraint: vec![],
1473 semantic_type: SemanticType::Field as i32,
1474 comment: String::new(),
1475 ..Default::default()
1476 }),
1477 column_id: 1,
1478 }),
1479 location: Some(v1::AddColumnLocation {
1480 location_type: LocationType::First as i32,
1481 after_column_name: String::default(),
1482 }),
1483 }],
1484 })),
1485 })
1486 .unwrap();
1487
1488 assert_eq!(
1489 request,
1490 RegionAlterRequest {
1491 kind: AlterKind::AddColumns {
1492 columns: vec![AddColumn {
1493 column_metadata: ColumnMetadata {
1494 column_schema: ColumnSchema::new(
1495 "a",
1496 ConcreteDataType::string_datatype(),
1497 true,
1498 ),
1499 semantic_type: SemanticType::Field,
1500 column_id: 1,
1501 },
1502 location: Some(AddColumnLocation::First),
1503 }]
1504 },
1505 }
1506 );
1507 }
1508
1509 fn new_metadata() -> RegionMetadata {
1512 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1513 builder
1514 .push_column_metadata(ColumnMetadata {
1515 column_schema: ColumnSchema::new(
1516 "ts",
1517 ConcreteDataType::timestamp_millisecond_datatype(),
1518 false,
1519 ),
1520 semantic_type: SemanticType::Timestamp,
1521 column_id: 1,
1522 })
1523 .push_column_metadata(ColumnMetadata {
1524 column_schema: ColumnSchema::new(
1525 "tag_0",
1526 ConcreteDataType::string_datatype(),
1527 true,
1528 ),
1529 semantic_type: SemanticType::Tag,
1530 column_id: 2,
1531 })
1532 .push_column_metadata(ColumnMetadata {
1533 column_schema: ColumnSchema::new(
1534 "field_0",
1535 ConcreteDataType::string_datatype(),
1536 true,
1537 ),
1538 semantic_type: SemanticType::Field,
1539 column_id: 3,
1540 })
1541 .push_column_metadata(ColumnMetadata {
1542 column_schema: ColumnSchema::new(
1543 "field_1",
1544 ConcreteDataType::boolean_datatype(),
1545 true,
1546 ),
1547 semantic_type: SemanticType::Field,
1548 column_id: 4,
1549 })
1550 .primary_key(vec![2]);
1551 builder.build().unwrap()
1552 }
1553
1554 #[test]
1555 fn test_add_column_validate() {
1556 let metadata = new_metadata();
1557 let add_column = AddColumn {
1558 column_metadata: ColumnMetadata {
1559 column_schema: ColumnSchema::new(
1560 "tag_1",
1561 ConcreteDataType::string_datatype(),
1562 true,
1563 ),
1564 semantic_type: SemanticType::Tag,
1565 column_id: 5,
1566 },
1567 location: None,
1568 };
1569 add_column.validate(&metadata).unwrap();
1570 assert!(add_column.need_alter(&metadata));
1571
1572 AddColumn {
1574 column_metadata: ColumnMetadata {
1575 column_schema: ColumnSchema::new(
1576 "tag_1",
1577 ConcreteDataType::string_datatype(),
1578 false,
1579 ),
1580 semantic_type: SemanticType::Tag,
1581 column_id: 5,
1582 },
1583 location: None,
1584 }
1585 .validate(&metadata)
1586 .unwrap_err();
1587
1588 let add_column = AddColumn {
1590 column_metadata: ColumnMetadata {
1591 column_schema: ColumnSchema::new(
1592 "tag_0",
1593 ConcreteDataType::string_datatype(),
1594 true,
1595 ),
1596 semantic_type: SemanticType::Tag,
1597 column_id: 2,
1598 },
1599 location: None,
1600 };
1601 add_column.validate(&metadata).unwrap();
1602 assert!(!add_column.need_alter(&metadata));
1603 }
1604
1605 #[test]
1606 fn test_add_duplicate_columns() {
1607 let kind = AlterKind::AddColumns {
1608 columns: vec![
1609 AddColumn {
1610 column_metadata: ColumnMetadata {
1611 column_schema: ColumnSchema::new(
1612 "tag_1",
1613 ConcreteDataType::string_datatype(),
1614 true,
1615 ),
1616 semantic_type: SemanticType::Tag,
1617 column_id: 5,
1618 },
1619 location: None,
1620 },
1621 AddColumn {
1622 column_metadata: ColumnMetadata {
1623 column_schema: ColumnSchema::new(
1624 "tag_1",
1625 ConcreteDataType::string_datatype(),
1626 true,
1627 ),
1628 semantic_type: SemanticType::Field,
1629 column_id: 6,
1630 },
1631 location: None,
1632 },
1633 ],
1634 };
1635 let metadata = new_metadata();
1636 kind.validate(&metadata).unwrap();
1637 assert!(kind.need_alter(&metadata));
1638 }
1639
1640 #[test]
1641 fn test_add_existing_column_different_metadata() {
1642 let metadata = new_metadata();
1643
1644 let kind = AlterKind::AddColumns {
1646 columns: vec![AddColumn {
1647 column_metadata: ColumnMetadata {
1648 column_schema: ColumnSchema::new(
1649 "tag_0",
1650 ConcreteDataType::string_datatype(),
1651 true,
1652 ),
1653 semantic_type: SemanticType::Tag,
1654 column_id: 4,
1655 },
1656 location: None,
1657 }],
1658 };
1659 kind.validate(&metadata).unwrap_err();
1660
1661 let kind = AlterKind::AddColumns {
1663 columns: vec![AddColumn {
1664 column_metadata: ColumnMetadata {
1665 column_schema: ColumnSchema::new(
1666 "tag_0",
1667 ConcreteDataType::int64_datatype(),
1668 true,
1669 ),
1670 semantic_type: SemanticType::Tag,
1671 column_id: 2,
1672 },
1673 location: None,
1674 }],
1675 };
1676 kind.validate(&metadata).unwrap_err();
1677
1678 let kind = AlterKind::AddColumns {
1680 columns: vec![AddColumn {
1681 column_metadata: ColumnMetadata {
1682 column_schema: ColumnSchema::new(
1683 "tag_1",
1684 ConcreteDataType::string_datatype(),
1685 true,
1686 ),
1687 semantic_type: SemanticType::Tag,
1688 column_id: 2,
1689 },
1690 location: None,
1691 }],
1692 };
1693 kind.validate(&metadata).unwrap_err();
1694 }
1695
1696 #[test]
1697 fn test_add_existing_column_with_location() {
1698 let metadata = new_metadata();
1699 let kind = AlterKind::AddColumns {
1700 columns: vec![AddColumn {
1701 column_metadata: ColumnMetadata {
1702 column_schema: ColumnSchema::new(
1703 "tag_0",
1704 ConcreteDataType::string_datatype(),
1705 true,
1706 ),
1707 semantic_type: SemanticType::Tag,
1708 column_id: 2,
1709 },
1710 location: Some(AddColumnLocation::First),
1711 }],
1712 };
1713 kind.validate(&metadata).unwrap_err();
1714 }
1715
1716 #[test]
1717 fn test_validate_drop_column() {
1718 let metadata = new_metadata();
1719 let kind = AlterKind::DropColumns {
1720 names: vec!["xxxx".to_string()],
1721 };
1722 kind.validate(&metadata).unwrap();
1723 assert!(!kind.need_alter(&metadata));
1724
1725 AlterKind::DropColumns {
1726 names: vec!["tag_0".to_string()],
1727 }
1728 .validate(&metadata)
1729 .unwrap_err();
1730
1731 let kind = AlterKind::DropColumns {
1732 names: vec!["field_0".to_string()],
1733 };
1734 kind.validate(&metadata).unwrap();
1735 assert!(kind.need_alter(&metadata));
1736 }
1737
1738 #[test]
1739 fn test_validate_modify_column_type() {
1740 let metadata = new_metadata();
1741 AlterKind::ModifyColumnTypes {
1742 columns: vec![ModifyColumnType {
1743 column_name: "xxxx".to_string(),
1744 target_type: ConcreteDataType::string_datatype(),
1745 }],
1746 }
1747 .validate(&metadata)
1748 .unwrap_err();
1749
1750 AlterKind::ModifyColumnTypes {
1751 columns: vec![ModifyColumnType {
1752 column_name: "field_1".to_string(),
1753 target_type: ConcreteDataType::date_datatype(),
1754 }],
1755 }
1756 .validate(&metadata)
1757 .unwrap_err();
1758
1759 AlterKind::ModifyColumnTypes {
1760 columns: vec![ModifyColumnType {
1761 column_name: "ts".to_string(),
1762 target_type: ConcreteDataType::date_datatype(),
1763 }],
1764 }
1765 .validate(&metadata)
1766 .unwrap_err();
1767
1768 AlterKind::ModifyColumnTypes {
1769 columns: vec![ModifyColumnType {
1770 column_name: "tag_0".to_string(),
1771 target_type: ConcreteDataType::date_datatype(),
1772 }],
1773 }
1774 .validate(&metadata)
1775 .unwrap_err();
1776
1777 let kind = AlterKind::ModifyColumnTypes {
1778 columns: vec![ModifyColumnType {
1779 column_name: "field_0".to_string(),
1780 target_type: ConcreteDataType::int32_datatype(),
1781 }],
1782 };
1783 kind.validate(&metadata).unwrap();
1784 assert!(kind.need_alter(&metadata));
1785 }
1786
1787 #[test]
1788 fn test_validate_add_columns() {
1789 let kind = AlterKind::AddColumns {
1790 columns: vec![
1791 AddColumn {
1792 column_metadata: ColumnMetadata {
1793 column_schema: ColumnSchema::new(
1794 "tag_1",
1795 ConcreteDataType::string_datatype(),
1796 true,
1797 ),
1798 semantic_type: SemanticType::Tag,
1799 column_id: 5,
1800 },
1801 location: None,
1802 },
1803 AddColumn {
1804 column_metadata: ColumnMetadata {
1805 column_schema: ColumnSchema::new(
1806 "field_2",
1807 ConcreteDataType::string_datatype(),
1808 true,
1809 ),
1810 semantic_type: SemanticType::Field,
1811 column_id: 6,
1812 },
1813 location: None,
1814 },
1815 ],
1816 };
1817 let request = RegionAlterRequest { kind };
1818 let mut metadata = new_metadata();
1819 metadata.schema_version = 1;
1820 request.validate(&metadata).unwrap();
1821 }
1822
1823 #[test]
1824 fn test_validate_create_region() {
1825 let column_metadatas = vec![
1826 ColumnMetadata {
1827 column_schema: ColumnSchema::new(
1828 "ts",
1829 ConcreteDataType::timestamp_millisecond_datatype(),
1830 false,
1831 ),
1832 semantic_type: SemanticType::Timestamp,
1833 column_id: 1,
1834 },
1835 ColumnMetadata {
1836 column_schema: ColumnSchema::new(
1837 "tag_0",
1838 ConcreteDataType::string_datatype(),
1839 true,
1840 ),
1841 semantic_type: SemanticType::Tag,
1842 column_id: 2,
1843 },
1844 ColumnMetadata {
1845 column_schema: ColumnSchema::new(
1846 "field_0",
1847 ConcreteDataType::string_datatype(),
1848 true,
1849 ),
1850 semantic_type: SemanticType::Field,
1851 column_id: 3,
1852 },
1853 ];
1854 let create = RegionCreateRequest {
1855 engine: "mito".to_string(),
1856 column_metadatas,
1857 primary_key: vec![3, 4],
1858 options: HashMap::new(),
1859 table_dir: "path".to_string(),
1860 path_type: PathType::Bare,
1861 partition_expr_json: Some("".to_string()),
1862 };
1863
1864 assert!(create.validate().is_err());
1865 }
1866
1867 #[test]
1868 fn test_validate_modify_column_fulltext_options() {
1869 let kind = AlterKind::SetIndexes {
1870 options: vec![SetIndexOption::Fulltext {
1871 column_name: "tag_0".to_string(),
1872 options: FulltextOptions::new_unchecked(
1873 true,
1874 FulltextAnalyzer::Chinese,
1875 false,
1876 FulltextBackend::Bloom,
1877 1000,
1878 0.01,
1879 ),
1880 }],
1881 };
1882 let request = RegionAlterRequest { kind };
1883 let mut metadata = new_metadata();
1884 metadata.schema_version = 1;
1885 request.validate(&metadata).unwrap();
1886
1887 let kind = AlterKind::UnsetIndexes {
1888 options: vec![UnsetIndexOption::Fulltext {
1889 column_name: "tag_0".to_string(),
1890 }],
1891 };
1892 let request = RegionAlterRequest { kind };
1893 let mut metadata = new_metadata();
1894 metadata.schema_version = 1;
1895 request.validate(&metadata).unwrap();
1896 }
1897
    /// Exercises `AlterKind::SyncColumns` validation: three rejected column
    /// sets followed by one accepted set.
    #[test]
    fn test_validate_sync_columns() {
        let metadata = new_metadata();
        // Case 1: the synced set contains only `tag_1` and `field_2`, neither of
        // which exists in `metadata`.
        let kind = AlterKind::SyncColumns {
            column_metadatas: vec![
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "tag_1",
                        ConcreteDataType::string_datatype(),
                        true,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                },
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "field_2",
                        ConcreteDataType::string_datatype(),
                        true,
                    ),
                    semantic_type: SemanticType::Field,
                    column_id: 6,
                },
            ],
        };
        let err = kind.validate(&metadata).unwrap_err();
        assert!(err.to_string().contains("not a primary key"));

        // Case 2: same columns as `metadata`, but the timestamp column is renamed
        // from `ts` to `ts1`.
        let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
        let ts_column = column_metadatas_with_different_ts_column
            .iter_mut()
            .find(|c| c.semantic_type == SemanticType::Timestamp)
            .unwrap();
        ts_column.column_schema.name = "ts1".to_string();

        let kind = AlterKind::SyncColumns {
            column_metadatas: column_metadatas_with_different_ts_column,
        };
        let err = kind.validate(&metadata).unwrap_err();
        assert!(err
            .to_string()
            .contains("timestamp column ts has different id"));

        // Case 3: same columns as `metadata`, but `tag_0` gets a different id.
        let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
        let pk_column = column_metadatas_with_different_pk_column
            .iter_mut()
            .find(|c| c.column_schema.name == "tag_0")
            .unwrap();
        pk_column.column_id = 100;
        let kind = AlterKind::SyncColumns {
            column_metadatas: column_metadatas_with_different_pk_column,
        };
        let err = kind.validate(&metadata).unwrap_err();
        assert!(err
            .to_string()
            .contains("column with same name tag_0 has different id"));

        // Case 4: the existing columns plus a new field `field_2` are accepted.
        // NOTE(review): the new column uses id 4, which `new_metadata` already
        // assigns to `field_1`, and validation still passes — confirm that this
        // is intended.
        let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
        column_metadatas_with_new_field_column.push(ColumnMetadata {
            column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 4,
        });
        let kind = AlterKind::SyncColumns {
            column_metadatas: column_metadatas_with_new_field_column,
        };
        kind.validate(&metadata).unwrap();
    }
1969
1970 #[test]
1971 fn test_cast_path_type_to_primitive() {
1972 assert_eq!(PathType::Bare as u8, 0);
1973 assert_eq!(PathType::Data as u8, 1);
1974 assert_eq!(PathType::Metadata as u8, 2);
1975 assert_eq!(
1976 PathType::try_from(PathType::Bare as u8).unwrap(),
1977 PathType::Bare
1978 );
1979 assert_eq!(
1980 PathType::try_from(PathType::Data as u8).unwrap(),
1981 PathType::Data
1982 );
1983 assert_eq!(
1984 PathType::try_from(PathType::Metadata as u8).unwrap(),
1985 PathType::Metadata
1986 );
1987 }
1988}