1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::ColumnDataTypeWrapper;
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25 alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequest,
26 CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest,
27 DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
28};
29use api::v1::{
30 self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
31 SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::FlightDecoder;
35use common_recordbatch::DfRecordBatch;
36use common_time::TimeToLive;
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
39use serde::{Deserialize, Serialize};
40use snafu::{ensure, OptionExt, ResultExt};
41use strum::{AsRefStr, IntoStaticStr};
42
43use crate::logstore::entry;
44use crate::metadata::{
45 ColumnMetadata, DecodeProtoSnafu, FlightCodecSnafu, InvalidIndexOptionSnafu,
46 InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
47 InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu,
48};
49use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
50use crate::metrics;
51use crate::mito_engine_options::{
52 TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
53};
54use crate::path_utils::table_dir;
55use crate::storage::{ColumnId, RegionId, ScanRequest};
56
/// Kind of path attached to a region's table directory.
///
/// Requests converted from gRPC bodies in this file always use [`PathType::Bare`].
/// NOTE(review): the exact on-disk layout selected by each variant is not visible in
/// this file; confirm against the path utilities before relying on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PathType {
    /// The path type used by requests parsed in this module.
    Bare,
    /// NOTE(review): presumably the path of a data region; confirm.
    Data,
    /// NOTE(review): presumably the path of a metadata region; confirm.
    Metadata,
}
73
/// A batch of region DDL requests that all share the same kind.
///
/// Every entry pairs the target [`RegionId`] with the request for that region.
/// The `IntoStaticStr` derive provides the `&'static str` conversion of the variant.
#[derive(Debug, IntoStaticStr)]
pub enum BatchRegionDdlRequest {
    /// Create requests, one per region.
    Create(Vec<(RegionId, RegionCreateRequest)>),
    /// Drop requests, one per region.
    Drop(Vec<(RegionId, RegionDropRequest)>),
    /// Alter requests, one per region.
    Alter(Vec<(RegionId, RegionAlterRequest)>),
}
80
81impl BatchRegionDdlRequest {
82 pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
84 match body {
85 region_request::Body::Creates(creates) => {
86 let requests = creates
87 .requests
88 .into_iter()
89 .map(parse_region_create)
90 .collect::<Result<Vec<_>>>()?;
91 Ok(Some(Self::Create(requests)))
92 }
93 region_request::Body::Drops(drops) => {
94 let requests = drops
95 .requests
96 .into_iter()
97 .map(parse_region_drop)
98 .collect::<Result<Vec<_>>>()?;
99 Ok(Some(Self::Drop(requests)))
100 }
101 region_request::Body::Alters(alters) => {
102 let requests = alters
103 .requests
104 .into_iter()
105 .map(parse_region_alter)
106 .collect::<Result<Vec<_>>>()?;
107 Ok(Some(Self::Alter(requests)))
108 }
109 _ => Ok(None),
110 }
111 }
112
113 pub fn request_type(&self) -> &'static str {
114 self.into()
115 }
116
117 pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
118 match self {
119 Self::Create(requests) => requests
120 .into_iter()
121 .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
122 .collect(),
123 Self::Drop(requests) => requests
124 .into_iter()
125 .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
126 .collect(),
127 Self::Alter(requests) => requests
128 .into_iter()
129 .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
130 .collect(),
131 }
132 }
133}
134
/// A request addressed to a single region.
///
/// The `IntoStaticStr` derive provides the `&'static str` conversion of the variant.
#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
    /// Writes rows into the region.
    Put(RegionPutRequest),
    /// Deletes rows from the region.
    Delete(RegionDeleteRequest),
    /// Creates the region.
    Create(RegionCreateRequest),
    /// Drops the region.
    Drop(RegionDropRequest),
    /// Opens the region.
    Open(RegionOpenRequest),
    /// Closes the region.
    Close(RegionCloseRequest),
    /// Alters the region.
    Alter(RegionAlterRequest),
    /// Flushes the region.
    Flush(RegionFlushRequest),
    /// Compacts the region.
    Compact(RegionCompactRequest),
    /// Truncates the region.
    Truncate(RegionTruncateRequest),
    /// Catchup request; never produced from a gRPC body in this file.
    Catchup(RegionCatchupRequest),
    /// Bulk insert carrying a decoded record batch.
    BulkInserts(RegionBulkInsertsRequest),
}
150
151impl RegionRequest {
152 pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
155 match body {
156 region_request::Body::Inserts(inserts) => make_region_puts(inserts),
157 region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
158 region_request::Body::Create(create) => make_region_create(create),
159 region_request::Body::Drop(drop) => make_region_drop(drop),
160 region_request::Body::Open(open) => make_region_open(open),
161 region_request::Body::Close(close) => make_region_close(close),
162 region_request::Body::Alter(alter) => make_region_alter(alter),
163 region_request::Body::Flush(flush) => make_region_flush(flush),
164 region_request::Body::Compact(compact) => make_region_compact(compact),
165 region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
166 region_request::Body::Creates(creates) => make_region_creates(creates),
167 region_request::Body::Drops(drops) => make_region_drops(drops),
168 region_request::Body::Alters(alters) => make_region_alters(alters),
169 region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
170 region_request::Body::Sync(_) => UnexpectedSnafu {
171 reason: "Sync request should be handled separately by RegionServer",
172 }
173 .fail(),
174 region_request::Body::ListMetadata(_) => UnexpectedSnafu {
175 reason: "ListMetadata request should be handled separately by RegionServer",
176 }
177 .fail(),
178 }
179 }
180
181 pub fn request_type(&self) -> &'static str {
183 self.into()
184 }
185}
186
187fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
188 let requests = inserts
189 .requests
190 .into_iter()
191 .filter_map(|r| {
192 let region_id = r.region_id.into();
193 r.rows.map(|rows| {
194 (
195 region_id,
196 RegionRequest::Put(RegionPutRequest { rows, hint: None }),
197 )
198 })
199 })
200 .collect();
201 Ok(requests)
202}
203
204fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
205 let requests = deletes
206 .requests
207 .into_iter()
208 .filter_map(|r| {
209 let region_id = r.region_id.into();
210 r.rows.map(|rows| {
211 (
212 region_id,
213 RegionRequest::Delete(RegionDeleteRequest { rows }),
214 )
215 })
216 })
217 .collect();
218 Ok(requests)
219}
220
221fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
222 let column_metadatas = create
223 .column_defs
224 .into_iter()
225 .map(ColumnMetadata::try_from_column_def)
226 .collect::<Result<Vec<_>>>()?;
227 let region_id = RegionId::from(create.region_id);
228 let table_dir = table_dir(&create.path, region_id.table_id());
229 Ok((
230 region_id,
231 RegionCreateRequest {
232 engine: create.engine,
233 column_metadatas,
234 primary_key: create.primary_key,
235 options: create.options,
236 table_dir,
237 path_type: PathType::Bare,
238 },
239 ))
240}
241
242fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
243 let (region_id, request) = parse_region_create(create)?;
244 Ok(vec![(region_id, RegionRequest::Create(request))])
245}
246
247fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
248 let mut requests = Vec::with_capacity(creates.requests.len());
249 for create in creates.requests {
250 requests.extend(make_region_create(create)?);
251 }
252 Ok(requests)
253}
254
255fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
256 let region_id = drop.region_id.into();
257 Ok((
258 region_id,
259 RegionDropRequest {
260 fast_path: drop.fast_path,
261 },
262 ))
263}
264
265fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
266 let (region_id, request) = parse_region_drop(drop)?;
267 Ok(vec![(region_id, RegionRequest::Drop(request))])
268}
269
270fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
271 let mut requests = Vec::with_capacity(drops.requests.len());
272 for drop in drops.requests {
273 requests.extend(make_region_drop(drop)?);
274 }
275 Ok(requests)
276}
277
278fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
279 let region_id = RegionId::from(open.region_id);
280 let table_dir = table_dir(&open.path, region_id.table_id());
281 Ok(vec![(
282 region_id,
283 RegionRequest::Open(RegionOpenRequest {
284 engine: open.engine,
285 table_dir,
286 path_type: PathType::Bare,
287 options: open.options,
288 skip_wal_replay: false,
289 }),
290 )])
291}
292
293fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
294 let region_id = close.region_id.into();
295 Ok(vec![(
296 region_id,
297 RegionRequest::Close(RegionCloseRequest {}),
298 )])
299}
300
301fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
302 let region_id = alter.region_id.into();
303 let request = RegionAlterRequest::try_from(alter)?;
304 Ok((region_id, request))
305}
306
307fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
308 let (region_id, request) = parse_region_alter(alter)?;
309 Ok(vec![(region_id, RegionRequest::Alter(request))])
310}
311
312fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
313 let mut requests = Vec::with_capacity(alters.requests.len());
314 for alter in alters.requests {
315 requests.extend(make_region_alter(alter)?);
316 }
317 Ok(requests)
318}
319
320fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
321 let region_id = flush.region_id.into();
322 Ok(vec![(
323 region_id,
324 RegionRequest::Flush(RegionFlushRequest {
325 row_group_size: None,
326 }),
327 )])
328}
329
330fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
331 let region_id = compact.region_id.into();
332 let options = compact
333 .options
334 .unwrap_or(compact_request::Options::Regular(Default::default()));
335 Ok(vec![(
336 region_id,
337 RegionRequest::Compact(RegionCompactRequest { options }),
338 )])
339}
340
341fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
342 let region_id = truncate.region_id.into();
343 Ok(vec![(
344 region_id,
345 RegionRequest::Truncate(RegionTruncateRequest {}),
346 )])
347}
348
349fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
351 let region_id = request.region_id.into();
352 let Some(Body::ArrowIpc(request)) = request.body else {
353 return Ok(vec![]);
354 };
355
356 let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
357 .with_label_values(&["decode"])
358 .start_timer();
359 let mut decoder =
360 FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
361 let payload = decoder
362 .try_decode_record_batch(&request.data_header, &request.payload)
363 .context(FlightCodecSnafu)?;
364 decoder_timer.observe_duration();
365 Ok(vec![(
366 region_id,
367 RegionRequest::BulkInserts(RegionBulkInsertsRequest {
368 region_id,
369 payload,
370 raw_data: request,
371 }),
372 )])
373}
374
/// Request to put rows into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
    /// Rows to put.
    pub rows: Rows,
    /// Optional write hint; `None` when the request is built from gRPC insert requests.
    pub hint: Option<WriteHint>,
}
383
/// Request to read from a region, wrapping a [`ScanRequest`].
#[derive(Debug)]
pub struct RegionReadRequest {
    /// The scan to perform.
    pub request: ScanRequest,
}
388
/// Request to delete rows from a region.
#[derive(Debug)]
pub struct RegionDeleteRequest {
    /// Rows describing what to delete.
    /// NOTE(review): presumably only key columns are required here; confirm with the engine.
    pub rows: Rows,
}
397
/// Request to create a region.
#[derive(Debug, Clone)]
pub struct RegionCreateRequest {
    /// Name of the engine, copied from the gRPC create request.
    pub engine: String,
    /// Metadata of every column of the region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Column ids of the primary key; each id must appear in `column_metadatas`.
    pub primary_key: Vec<ColumnId>,
    /// String key/value options of the region.
    pub options: HashMap<String, String>,
    /// Table directory, derived from the request path and the table id.
    pub table_dir: String,
    /// Path type; `PathType::Bare` when built from a gRPC create request.
    pub path_type: PathType,
}
413
414impl RegionCreateRequest {
415 pub fn validate(&self) -> Result<()> {
417 ensure!(
419 self.column_metadatas
420 .iter()
421 .any(|x| x.semantic_type == SemanticType::Timestamp),
422 InvalidRegionRequestSnafu {
423 region_id: RegionId::new(0, 0),
424 err: "missing timestamp column in create region request".to_string(),
425 }
426 );
427
428 let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
430 for (i, c) in self.column_metadatas.iter().enumerate() {
431 if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
432 return InvalidRegionRequestSnafu {
433 region_id: RegionId::new(0, 0),
434 err: format!(
435 "duplicate column id {} (at position {} and {}) in create region request",
436 c.column_id, previous, i
437 ),
438 }
439 .fail();
440 }
441 }
442
443 for column_id in &self.primary_key {
445 ensure!(
446 column_id_to_indices.contains_key(column_id),
447 InvalidRegionRequestSnafu {
448 region_id: RegionId::new(0, 0),
449 err: format!(
450 "missing primary key column {} in create region request",
451 column_id
452 ),
453 }
454 );
455 }
456
457 Ok(())
458 }
459
460 pub fn is_physical_table(&self) -> bool {
462 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
463 }
464}
465
/// Request to drop a region.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// Copied verbatim from the gRPC drop request.
    /// NOTE(review): the effect of the fast path is decided by the engine and is
    /// not visible in this file.
    pub fast_path: bool,
}
470
/// Request to open a region.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
    /// Name of the engine, copied from the gRPC open request.
    pub engine: String,
    /// Table directory, derived from the request path and the table id.
    pub table_dir: String,
    /// Path type; `PathType::Bare` when built from a gRPC open request.
    pub path_type: PathType,
    /// String key/value options of the region.
    pub options: HashMap<String, String>,
    /// Whether to skip WAL replay; `false` when built from a gRPC open request.
    pub skip_wal_replay: bool,
}
485
486impl RegionOpenRequest {
487 pub fn is_physical_table(&self) -> bool {
489 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
490 }
491}
492
/// Request to close a region. Carries no parameters.
#[derive(Debug)]
pub struct RegionCloseRequest {}
496
/// Request to alter a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RegionAlterRequest {
    /// The alteration to apply.
    pub kind: AlterKind,
}
503
504impl RegionAlterRequest {
505 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
507 self.kind.validate(metadata)?;
508
509 Ok(())
510 }
511
512 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
516 debug_assert!(self.validate(metadata).is_ok());
517 self.kind.need_alter(metadata)
518 }
519}
520
521impl TryFrom<AlterRequest> for RegionAlterRequest {
522 type Error = MetadataError;
523
524 fn try_from(value: AlterRequest) -> Result<Self> {
525 let kind = value.kind.context(InvalidRawRegionRequestSnafu {
526 err: "missing kind in AlterRequest",
527 })?;
528
529 let kind = AlterKind::try_from(kind)?;
530 Ok(RegionAlterRequest { kind })
531 }
532}
533
/// Kind of alteration to apply to a region.
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
    /// Adds columns to the region.
    AddColumns {
        /// Columns to add.
        columns: Vec<AddColumn>,
    },
    /// Drops columns from the region.
    DropColumns {
        /// Names of the columns to drop.
        names: Vec<String>,
    },
    /// Changes the data type of columns.
    ModifyColumnTypes {
        /// Columns to change, with their target types.
        columns: Vec<ModifyColumnType>,
    },
    /// Sets region options.
    SetRegionOptions { options: Vec<SetRegionOption> },
    /// Unsets region options.
    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
    /// Sets index options on columns.
    SetIndexes { options: Vec<SetIndexOption> },
    /// Unsets index options on columns.
    UnsetIndexes { options: Vec<UnsetIndexOption> },
    /// Drops the default values of columns.
    DropDefaults {
        /// Names of the columns whose default is dropped.
        names: Vec<String>,
    },
}
566
/// An index to set on a column, together with its options where the index kind has any.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum SetIndexOption {
    /// Fulltext index with its options.
    Fulltext {
        column_name: String,
        options: FulltextOptions,
    },
    /// Inverted index; carries no options.
    Inverted {
        column_name: String,
    },
    /// Skipping index with its options.
    Skipping {
        column_name: String,
        options: SkippingIndexOptions,
    },
}
581
582impl SetIndexOption {
583 pub fn column_name(&self) -> &String {
585 match self {
586 SetIndexOption::Fulltext { column_name, .. } => column_name,
587 SetIndexOption::Inverted { column_name } => column_name,
588 SetIndexOption::Skipping { column_name, .. } => column_name,
589 }
590 }
591
592 pub fn is_fulltext(&self) -> bool {
594 match self {
595 SetIndexOption::Fulltext { .. } => true,
596 SetIndexOption::Inverted { .. } => false,
597 SetIndexOption::Skipping { .. } => false,
598 }
599 }
600}
601
602impl TryFrom<v1::SetIndex> for SetIndexOption {
603 type Error = MetadataError;
604
605 fn try_from(value: v1::SetIndex) -> Result<Self> {
606 let option = value.options.context(InvalidRawRegionRequestSnafu {
607 err: "missing options in SetIndex",
608 })?;
609
610 let opt = match option {
611 v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
612 column_name: x.column_name.clone(),
613 options: FulltextOptions::new(
614 x.enable,
615 as_fulltext_option_analyzer(
616 Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
617 ),
618 x.case_sensitive,
619 as_fulltext_option_backend(
620 PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
621 ),
622 x.granularity as u32,
623 x.false_positive_rate,
624 )
625 .context(InvalidIndexOptionSnafu)?,
626 },
627 v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
628 column_name: i.column_name,
629 },
630 v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
631 column_name: s.column_name,
632 options: SkippingIndexOptions::new(
633 s.granularity as u32,
634 s.false_positive_rate,
635 as_skipping_index_type(
636 PbSkippingIndexType::try_from(s.skipping_index_type)
637 .context(DecodeProtoSnafu)?,
638 ),
639 )
640 .context(InvalidIndexOptionSnafu)?,
641 },
642 };
643
644 Ok(opt)
645 }
646}
647
/// An index to remove from a column, identified by index kind and column name.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum UnsetIndexOption {
    /// Removes the fulltext index.
    Fulltext { column_name: String },
    /// Removes the inverted index.
    Inverted { column_name: String },
    /// Removes the skipping index.
    Skipping { column_name: String },
}
654
655impl UnsetIndexOption {
656 pub fn column_name(&self) -> &String {
657 match self {
658 UnsetIndexOption::Fulltext { column_name } => column_name,
659 UnsetIndexOption::Inverted { column_name } => column_name,
660 UnsetIndexOption::Skipping { column_name } => column_name,
661 }
662 }
663
664 pub fn is_fulltext(&self) -> bool {
665 match self {
666 UnsetIndexOption::Fulltext { .. } => true,
667 UnsetIndexOption::Inverted { .. } => false,
668 UnsetIndexOption::Skipping { .. } => false,
669 }
670 }
671}
672
673impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
674 type Error = MetadataError;
675
676 fn try_from(value: v1::UnsetIndex) -> Result<Self> {
677 let option = value.options.context(InvalidRawRegionRequestSnafu {
678 err: "missing options in UnsetIndex",
679 })?;
680
681 let opt = match option {
682 v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
683 column_name: f.column_name,
684 },
685 v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
686 column_name: i.column_name,
687 },
688 v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
689 column_name: s.column_name,
690 },
691 };
692
693 Ok(opt)
694 }
695}
696
697impl AlterKind {
698 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
702 match self {
703 AlterKind::AddColumns { columns } => {
704 for col_to_add in columns {
705 col_to_add.validate(metadata)?;
706 }
707 }
708 AlterKind::DropColumns { names } => {
709 for name in names {
710 Self::validate_column_to_drop(name, metadata)?;
711 }
712 }
713 AlterKind::ModifyColumnTypes { columns } => {
714 for col_to_change in columns {
715 col_to_change.validate(metadata)?;
716 }
717 }
718 AlterKind::SetRegionOptions { .. } => {}
719 AlterKind::UnsetRegionOptions { .. } => {}
720 AlterKind::SetIndexes { options } => {
721 for option in options {
722 Self::validate_column_alter_index_option(
723 option.column_name(),
724 metadata,
725 option.is_fulltext(),
726 )?;
727 }
728 }
729 AlterKind::UnsetIndexes { options } => {
730 for option in options {
731 Self::validate_column_alter_index_option(
732 option.column_name(),
733 metadata,
734 option.is_fulltext(),
735 )?;
736 }
737 }
738 AlterKind::DropDefaults { names } => {
739 names
740 .iter()
741 .try_for_each(|name| Self::validate_column_to_drop(name, metadata))?;
742 }
743 }
744 Ok(())
745 }
746
747 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
749 debug_assert!(self.validate(metadata).is_ok());
750 match self {
751 AlterKind::AddColumns { columns } => columns
752 .iter()
753 .any(|col_to_add| col_to_add.need_alter(metadata)),
754 AlterKind::DropColumns { names } => names
755 .iter()
756 .any(|name| metadata.column_by_name(name).is_some()),
757 AlterKind::ModifyColumnTypes { columns } => columns
758 .iter()
759 .any(|col_to_change| col_to_change.need_alter(metadata)),
760 AlterKind::SetRegionOptions { .. } => {
761 true
764 }
765 AlterKind::UnsetRegionOptions { .. } => true,
766 AlterKind::SetIndexes { options, .. } => options
767 .iter()
768 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
769 AlterKind::UnsetIndexes { options } => options
770 .iter()
771 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
772 AlterKind::DropDefaults { names } => names
773 .iter()
774 .any(|name| metadata.column_by_name(name).is_some()),
775 }
776 }
777
778 fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
780 let Some(column) = metadata.column_by_name(name) else {
781 return Ok(());
782 };
783 ensure!(
784 column.semantic_type == SemanticType::Field,
785 InvalidRegionRequestSnafu {
786 region_id: metadata.region_id,
787 err: format!("column {} is not a field and could not be dropped", name),
788 }
789 );
790 Ok(())
791 }
792
793 fn validate_column_alter_index_option(
795 column_name: &String,
796 metadata: &RegionMetadata,
797 is_fulltext: bool,
798 ) -> Result<()> {
799 let column = metadata
800 .column_by_name(column_name)
801 .context(InvalidRegionRequestSnafu {
802 region_id: metadata.region_id,
803 err: format!("column {} not found", column_name),
804 })?;
805
806 if is_fulltext {
807 ensure!(
808 column.column_schema.data_type.is_string(),
809 InvalidRegionRequestSnafu {
810 region_id: metadata.region_id,
811 err: format!(
812 "cannot change alter index options for non-string column {}",
813 column_name
814 ),
815 }
816 );
817 }
818
819 Ok(())
820 }
821}
822
823impl TryFrom<alter_request::Kind> for AlterKind {
824 type Error = MetadataError;
825
826 fn try_from(kind: alter_request::Kind) -> Result<Self> {
827 let alter_kind = match kind {
828 alter_request::Kind::AddColumns(x) => {
829 let columns = x
830 .add_columns
831 .into_iter()
832 .map(|x| x.try_into())
833 .collect::<Result<Vec<_>>>()?;
834 AlterKind::AddColumns { columns }
835 }
836 alter_request::Kind::ModifyColumnTypes(x) => {
837 let columns = x
838 .modify_column_types
839 .into_iter()
840 .map(|x| x.into())
841 .collect::<Vec<_>>();
842 AlterKind::ModifyColumnTypes { columns }
843 }
844 alter_request::Kind::DropColumns(x) => {
845 let names = x.drop_columns.into_iter().map(|x| x.name).collect();
846 AlterKind::DropColumns { names }
847 }
848 alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
849 options: options
850 .table_options
851 .iter()
852 .map(TryFrom::try_from)
853 .collect::<Result<Vec<_>>>()?,
854 },
855 alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
856 keys: options
857 .keys
858 .iter()
859 .map(|key| UnsetRegionOption::try_from(key.as_str()))
860 .collect::<Result<Vec<_>>>()?,
861 },
862 alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
863 options: vec![SetIndexOption::try_from(o)?],
864 },
865 alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
866 options: vec![UnsetIndexOption::try_from(o)?],
867 },
868 alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
869 options: o
870 .set_indexes
871 .into_iter()
872 .map(SetIndexOption::try_from)
873 .collect::<Result<Vec<_>>>()?,
874 },
875 alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
876 options: o
877 .unset_indexes
878 .into_iter()
879 .map(UnsetIndexOption::try_from)
880 .collect::<Result<Vec<_>>>()?,
881 },
882 alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
883 names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
884 },
885 };
886
887 Ok(alter_kind)
888 }
889}
890
/// A column to add to a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AddColumn {
    /// Metadata of the column to add.
    pub column_metadata: ColumnMetadata,
    /// Where to place the column, if specified.
    /// NOTE(review): the placement used for `None` is decided elsewhere; confirm.
    pub location: Option<AddColumnLocation>,
}
900
901impl AddColumn {
902 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
907 ensure!(
908 self.column_metadata.column_schema.is_nullable()
909 || self
910 .column_metadata
911 .column_schema
912 .default_constraint()
913 .is_some(),
914 InvalidRegionRequestSnafu {
915 region_id: metadata.region_id,
916 err: format!(
917 "no default value for column {}",
918 self.column_metadata.column_schema.name
919 ),
920 }
921 );
922
923 if let Some(existing_column) =
924 metadata.column_by_name(&self.column_metadata.column_schema.name)
925 {
926 ensure!(
928 *existing_column == self.column_metadata,
929 InvalidRegionRequestSnafu {
930 region_id: metadata.region_id,
931 err: format!(
932 "column {} already exists with different metadata, existing: {:?}, got: {:?}",
933 self.column_metadata.column_schema.name, existing_column, self.column_metadata,
934 ),
935 }
936 );
937 ensure!(
938 self.location.is_none(),
939 InvalidRegionRequestSnafu {
940 region_id: metadata.region_id,
941 err: format!(
942 "column {} already exists, but location is specified",
943 self.column_metadata.column_schema.name
944 ),
945 }
946 );
947 }
948
949 if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
950 ensure!(
952 existing_column.column_schema.name == self.column_metadata.column_schema.name,
953 InvalidRegionRequestSnafu {
954 region_id: metadata.region_id,
955 err: format!(
956 "column id {} already exists with different name {}",
957 self.column_metadata.column_id, existing_column.column_schema.name
958 ),
959 }
960 );
961 }
962
963 Ok(())
964 }
965
966 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
968 debug_assert!(self.validate(metadata).is_ok());
969 metadata
970 .column_by_name(&self.column_metadata.column_schema.name)
971 .is_none()
972 }
973}
974
975impl TryFrom<v1::region::AddColumn> for AddColumn {
976 type Error = MetadataError;
977
978 fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
979 let column_def = add_column
980 .column_def
981 .context(InvalidRawRegionRequestSnafu {
982 err: "missing column_def in AddColumn",
983 })?;
984
985 let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
986 let location = add_column
987 .location
988 .map(AddColumnLocation::try_from)
989 .transpose()?;
990
991 Ok(AddColumn {
992 column_metadata,
993 location,
994 })
995 }
996}
997
/// Location at which a new column is added.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AddColumnLocation {
    /// Add the column as the first column.
    First,
    /// Add the column after a named column.
    After {
        /// Name of the column to add the new column after.
        column_name: String,
    },
}
1009
1010impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1011 type Error = MetadataError;
1012
1013 fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1014 let location_type = LocationType::try_from(location.location_type)
1015 .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1016 let add_column_location = match location_type {
1017 LocationType::First => AddColumnLocation::First,
1018 LocationType::After => AddColumnLocation::After {
1019 column_name: location.after_column_name,
1020 },
1021 };
1022
1023 Ok(add_column_location)
1024 }
1025}
1026
/// Change of a column's data type.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ModifyColumnType {
    /// Name of the column to change.
    pub column_name: String,
    /// Data type the column is changed to.
    pub target_type: ConcreteDataType,
}
1035
1036impl ModifyColumnType {
1037 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1039 let column_meta = metadata
1040 .column_by_name(&self.column_name)
1041 .with_context(|| InvalidRegionRequestSnafu {
1042 region_id: metadata.region_id,
1043 err: format!("column {} not found", self.column_name),
1044 })?;
1045
1046 ensure!(
1047 matches!(column_meta.semantic_type, SemanticType::Field),
1048 InvalidRegionRequestSnafu {
1049 region_id: metadata.region_id,
1050 err: "'timestamp' or 'tag' column cannot change type".to_string()
1051 }
1052 );
1053 ensure!(
1054 column_meta
1055 .column_schema
1056 .data_type
1057 .can_arrow_type_cast_to(&self.target_type),
1058 InvalidRegionRequestSnafu {
1059 region_id: metadata.region_id,
1060 err: format!(
1061 "column '{}' cannot be cast automatically to type '{}'",
1062 self.column_name, self.target_type
1063 ),
1064 }
1065 );
1066
1067 Ok(())
1068 }
1069
1070 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1072 debug_assert!(self.validate(metadata).is_ok());
1073 metadata.column_by_name(&self.column_name).is_some()
1074 }
1075}
1076
1077impl From<v1::ModifyColumnType> for ModifyColumnType {
1078 fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1079 let target_type = ColumnDataTypeWrapper::new(
1080 modify_column_type.target_type(),
1081 modify_column_type.target_type_extension,
1082 )
1083 .into();
1084
1085 ModifyColumnType {
1086 column_name: modify_column_type.column_name,
1087 target_type,
1088 }
1089 }
1090}
1091
/// A region option to set.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetRegionOption {
    /// Time-to-live of the region; built with `None` when the TTL is unset.
    Ttl(Option<TimeToLive>),
    /// A TWCS option as a `(key, value)` pair; the value is an empty string
    /// when the option is unset.
    Twsc(String, String),
}
1098
1099impl TryFrom<&PbOption> for SetRegionOption {
1100 type Error = MetadataError;
1101
1102 fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1103 let PbOption { key, value } = value;
1104 match key.as_str() {
1105 TTL_KEY => {
1106 let ttl = TimeToLive::from_humantime_or_str(value)
1107 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1108
1109 Ok(Self::Ttl(Some(ttl)))
1110 }
1111 TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1112 Ok(Self::Twsc(key.to_string(), value.to_string()))
1113 }
1114 _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1115 }
1116 }
1117}
1118
1119impl From<&UnsetRegionOption> for SetRegionOption {
1120 fn from(unset_option: &UnsetRegionOption) -> Self {
1121 match unset_option {
1122 UnsetRegionOption::TwcsTriggerFileNum => {
1123 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1124 }
1125 UnsetRegionOption::TwcsMaxOutputFileSize => {
1126 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1127 }
1128 UnsetRegionOption::TwcsTimeWindow => {
1129 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1130 }
1131 UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1132 }
1133 }
1134}
1135
1136impl TryFrom<&str> for UnsetRegionOption {
1137 type Error = MetadataError;
1138
1139 fn try_from(key: &str) -> Result<Self> {
1140 match key.to_ascii_lowercase().as_str() {
1141 TTL_KEY => Ok(Self::Ttl),
1142 TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1143 TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1144 TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1145 _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1146 }
1147 }
1148}
1149
/// A region option to unset. Each variant corresponds to one option key.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetRegionOption {
    /// The `TWCS_TRIGGER_FILE_NUM` option.
    TwcsTriggerFileNum,
    /// The `TWCS_MAX_OUTPUT_FILE_SIZE` option.
    TwcsMaxOutputFileSize,
    /// The `TWCS_TIME_WINDOW` option.
    TwcsTimeWindow,
    /// The `TTL_KEY` option.
    Ttl,
}
1157
1158impl UnsetRegionOption {
1159 pub fn as_str(&self) -> &str {
1160 match self {
1161 Self::Ttl => TTL_KEY,
1162 Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1163 Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1164 Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1165 }
1166 }
1167}
1168
1169impl Display for UnsetRegionOption {
1170 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1171 write!(f, "{}", self.as_str())
1172 }
1173}
1174
/// Request to flush a region.
#[derive(Debug, Clone, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size; `None` when built from a gRPC flush request.
    /// NOTE(review): how the engine treats `None` is not visible in this file.
    pub row_group_size: Option<usize>,
}
1179
/// Request to compact a region.
#[derive(Debug)]
pub struct RegionCompactRequest {
    /// Compaction options, as defined by the protobuf compact request.
    pub options: compact_request::Options,
}
1184
1185impl Default for RegionCompactRequest {
1186 fn default() -> Self {
1187 Self {
1188 options: compact_request::Options::Regular(Default::default()),
1190 }
1191 }
1192}
1193
/// Request to truncate a region. Carries no parameters.
#[derive(Debug)]
pub struct RegionTruncateRequest {}
1197
/// Catchup request for a region.
///
/// NOTE(review): this file never constructs or reads this type, so the field
/// notes below are inferred from names and types only; confirm with the engine.
#[derive(Debug, Clone, Copy)]
pub struct RegionCatchupRequest {
    /// Presumably whether the region becomes writable after catchup.
    pub set_writable: bool,
    /// Optional log store entry id; presumably the expected last data entry.
    pub entry_id: Option<entry::Id>,
    /// Optional log store entry id; presumably the expected last metadata entry.
    pub metadata_entry_id: Option<entry::Id>,
    /// Optional location id; semantics not visible here.
    pub location_id: Option<u64>,
}
1216
/// Request naming a set of regions.
/// NOTE(review): presumably used to fetch the sequences of these regions; confirm.
#[derive(Debug, Clone)]
pub struct RegionSequencesRequest {
    /// Ids of the regions the request refers to.
    pub region_ids: Vec<RegionId>,
}
1222
/// Bulk insert request for a region.
#[derive(Debug, Clone)]
pub struct RegionBulkInsertsRequest {
    /// Id of the target region.
    pub region_id: RegionId,
    /// Record batch decoded from the Arrow IPC message.
    pub payload: DfRecordBatch,
    /// The original Arrow IPC message the payload was decoded from.
    pub raw_data: ArrowIpc,
}
1229
1230impl RegionBulkInsertsRequest {
1231 pub fn estimated_size(&self) -> usize {
1232 self.payload.get_array_memory_size()
1233 }
1234}
1235
1236impl fmt::Display for RegionRequest {
1237 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1238 match self {
1239 RegionRequest::Put(_) => write!(f, "Put"),
1240 RegionRequest::Delete(_) => write!(f, "Delete"),
1241 RegionRequest::Create(_) => write!(f, "Create"),
1242 RegionRequest::Drop(_) => write!(f, "Drop"),
1243 RegionRequest::Open(_) => write!(f, "Open"),
1244 RegionRequest::Close(_) => write!(f, "Close"),
1245 RegionRequest::Alter(_) => write!(f, "Alter"),
1246 RegionRequest::Flush(_) => write!(f, "Flush"),
1247 RegionRequest::Compact(_) => write!(f, "Compact"),
1248 RegionRequest::Truncate(_) => write!(f, "Truncate"),
1249 RegionRequest::Catchup(_) => write!(f, "Catchup"),
1250 RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1251 }
1252 }
1253}
1254
1255#[cfg(test)]
1256mod tests {
1257 use api::v1::region::RegionColumnDef;
1258 use api::v1::{ColumnDataType, ColumnDef};
1259 use datatypes::prelude::ConcreteDataType;
1260 use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1261
1262 use super::*;
1263 use crate::metadata::RegionMetadataBuilder;
1264
1265 #[test]
1266 fn test_from_proto_location() {
1267 let proto_location = v1::AddColumnLocation {
1268 location_type: LocationType::First as i32,
1269 after_column_name: String::default(),
1270 };
1271 let location = AddColumnLocation::try_from(proto_location).unwrap();
1272 assert_eq!(location, AddColumnLocation::First);
1273
1274 let proto_location = v1::AddColumnLocation {
1275 location_type: 10,
1276 after_column_name: String::default(),
1277 };
1278 AddColumnLocation::try_from(proto_location).unwrap_err();
1279
1280 let proto_location = v1::AddColumnLocation {
1281 location_type: LocationType::After as i32,
1282 after_column_name: "a".to_string(),
1283 };
1284 let location = AddColumnLocation::try_from(proto_location).unwrap();
1285 assert_eq!(
1286 location,
1287 AddColumnLocation::After {
1288 column_name: "a".to_string()
1289 }
1290 );
1291 }
1292
1293 #[test]
1294 fn test_from_none_proto_add_column() {
1295 AddColumn::try_from(v1::region::AddColumn {
1296 column_def: None,
1297 location: None,
1298 })
1299 .unwrap_err();
1300 }
1301
1302 #[test]
1303 fn test_from_proto_alter_request() {
1304 RegionAlterRequest::try_from(AlterRequest {
1305 region_id: 0,
1306 schema_version: 1,
1307 kind: None,
1308 })
1309 .unwrap_err();
1310
1311 let request = RegionAlterRequest::try_from(AlterRequest {
1312 region_id: 0,
1313 schema_version: 1,
1314 kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1315 add_columns: vec![v1::region::AddColumn {
1316 column_def: Some(RegionColumnDef {
1317 column_def: Some(ColumnDef {
1318 name: "a".to_string(),
1319 data_type: ColumnDataType::String as i32,
1320 is_nullable: true,
1321 default_constraint: vec![],
1322 semantic_type: SemanticType::Field as i32,
1323 comment: String::new(),
1324 ..Default::default()
1325 }),
1326 column_id: 1,
1327 }),
1328 location: Some(v1::AddColumnLocation {
1329 location_type: LocationType::First as i32,
1330 after_column_name: String::default(),
1331 }),
1332 }],
1333 })),
1334 })
1335 .unwrap();
1336
1337 assert_eq!(
1338 request,
1339 RegionAlterRequest {
1340 kind: AlterKind::AddColumns {
1341 columns: vec![AddColumn {
1342 column_metadata: ColumnMetadata {
1343 column_schema: ColumnSchema::new(
1344 "a",
1345 ConcreteDataType::string_datatype(),
1346 true,
1347 ),
1348 semantic_type: SemanticType::Field,
1349 column_id: 1,
1350 },
1351 location: Some(AddColumnLocation::First),
1352 }]
1353 },
1354 }
1355 );
1356 }
1357
1358 fn new_metadata() -> RegionMetadata {
1361 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1362 builder
1363 .push_column_metadata(ColumnMetadata {
1364 column_schema: ColumnSchema::new(
1365 "ts",
1366 ConcreteDataType::timestamp_millisecond_datatype(),
1367 false,
1368 ),
1369 semantic_type: SemanticType::Timestamp,
1370 column_id: 1,
1371 })
1372 .push_column_metadata(ColumnMetadata {
1373 column_schema: ColumnSchema::new(
1374 "tag_0",
1375 ConcreteDataType::string_datatype(),
1376 true,
1377 ),
1378 semantic_type: SemanticType::Tag,
1379 column_id: 2,
1380 })
1381 .push_column_metadata(ColumnMetadata {
1382 column_schema: ColumnSchema::new(
1383 "field_0",
1384 ConcreteDataType::string_datatype(),
1385 true,
1386 ),
1387 semantic_type: SemanticType::Field,
1388 column_id: 3,
1389 })
1390 .push_column_metadata(ColumnMetadata {
1391 column_schema: ColumnSchema::new(
1392 "field_1",
1393 ConcreteDataType::boolean_datatype(),
1394 true,
1395 ),
1396 semantic_type: SemanticType::Field,
1397 column_id: 4,
1398 })
1399 .primary_key(vec![2]);
1400 builder.build().unwrap()
1401 }
1402
1403 #[test]
1404 fn test_add_column_validate() {
1405 let metadata = new_metadata();
1406 let add_column = AddColumn {
1407 column_metadata: ColumnMetadata {
1408 column_schema: ColumnSchema::new(
1409 "tag_1",
1410 ConcreteDataType::string_datatype(),
1411 true,
1412 ),
1413 semantic_type: SemanticType::Tag,
1414 column_id: 5,
1415 },
1416 location: None,
1417 };
1418 add_column.validate(&metadata).unwrap();
1419 assert!(add_column.need_alter(&metadata));
1420
1421 AddColumn {
1423 column_metadata: ColumnMetadata {
1424 column_schema: ColumnSchema::new(
1425 "tag_1",
1426 ConcreteDataType::string_datatype(),
1427 false,
1428 ),
1429 semantic_type: SemanticType::Tag,
1430 column_id: 5,
1431 },
1432 location: None,
1433 }
1434 .validate(&metadata)
1435 .unwrap_err();
1436
1437 let add_column = AddColumn {
1439 column_metadata: ColumnMetadata {
1440 column_schema: ColumnSchema::new(
1441 "tag_0",
1442 ConcreteDataType::string_datatype(),
1443 true,
1444 ),
1445 semantic_type: SemanticType::Tag,
1446 column_id: 2,
1447 },
1448 location: None,
1449 };
1450 add_column.validate(&metadata).unwrap();
1451 assert!(!add_column.need_alter(&metadata));
1452 }
1453
1454 #[test]
1455 fn test_add_duplicate_columns() {
1456 let kind = AlterKind::AddColumns {
1457 columns: vec![
1458 AddColumn {
1459 column_metadata: ColumnMetadata {
1460 column_schema: ColumnSchema::new(
1461 "tag_1",
1462 ConcreteDataType::string_datatype(),
1463 true,
1464 ),
1465 semantic_type: SemanticType::Tag,
1466 column_id: 5,
1467 },
1468 location: None,
1469 },
1470 AddColumn {
1471 column_metadata: ColumnMetadata {
1472 column_schema: ColumnSchema::new(
1473 "tag_1",
1474 ConcreteDataType::string_datatype(),
1475 true,
1476 ),
1477 semantic_type: SemanticType::Field,
1478 column_id: 6,
1479 },
1480 location: None,
1481 },
1482 ],
1483 };
1484 let metadata = new_metadata();
1485 kind.validate(&metadata).unwrap();
1486 assert!(kind.need_alter(&metadata));
1487 }
1488
1489 #[test]
1490 fn test_add_existing_column_different_metadata() {
1491 let metadata = new_metadata();
1492
1493 let kind = AlterKind::AddColumns {
1495 columns: vec![AddColumn {
1496 column_metadata: ColumnMetadata {
1497 column_schema: ColumnSchema::new(
1498 "tag_0",
1499 ConcreteDataType::string_datatype(),
1500 true,
1501 ),
1502 semantic_type: SemanticType::Tag,
1503 column_id: 4,
1504 },
1505 location: None,
1506 }],
1507 };
1508 kind.validate(&metadata).unwrap_err();
1509
1510 let kind = AlterKind::AddColumns {
1512 columns: vec![AddColumn {
1513 column_metadata: ColumnMetadata {
1514 column_schema: ColumnSchema::new(
1515 "tag_0",
1516 ConcreteDataType::int64_datatype(),
1517 true,
1518 ),
1519 semantic_type: SemanticType::Tag,
1520 column_id: 2,
1521 },
1522 location: None,
1523 }],
1524 };
1525 kind.validate(&metadata).unwrap_err();
1526
1527 let kind = AlterKind::AddColumns {
1529 columns: vec![AddColumn {
1530 column_metadata: ColumnMetadata {
1531 column_schema: ColumnSchema::new(
1532 "tag_1",
1533 ConcreteDataType::string_datatype(),
1534 true,
1535 ),
1536 semantic_type: SemanticType::Tag,
1537 column_id: 2,
1538 },
1539 location: None,
1540 }],
1541 };
1542 kind.validate(&metadata).unwrap_err();
1543 }
1544
1545 #[test]
1546 fn test_add_existing_column_with_location() {
1547 let metadata = new_metadata();
1548 let kind = AlterKind::AddColumns {
1549 columns: vec![AddColumn {
1550 column_metadata: ColumnMetadata {
1551 column_schema: ColumnSchema::new(
1552 "tag_0",
1553 ConcreteDataType::string_datatype(),
1554 true,
1555 ),
1556 semantic_type: SemanticType::Tag,
1557 column_id: 2,
1558 },
1559 location: Some(AddColumnLocation::First),
1560 }],
1561 };
1562 kind.validate(&metadata).unwrap_err();
1563 }
1564
1565 #[test]
1566 fn test_validate_drop_column() {
1567 let metadata = new_metadata();
1568 let kind = AlterKind::DropColumns {
1569 names: vec!["xxxx".to_string()],
1570 };
1571 kind.validate(&metadata).unwrap();
1572 assert!(!kind.need_alter(&metadata));
1573
1574 AlterKind::DropColumns {
1575 names: vec!["tag_0".to_string()],
1576 }
1577 .validate(&metadata)
1578 .unwrap_err();
1579
1580 let kind = AlterKind::DropColumns {
1581 names: vec!["field_0".to_string()],
1582 };
1583 kind.validate(&metadata).unwrap();
1584 assert!(kind.need_alter(&metadata));
1585 }
1586
1587 #[test]
1588 fn test_validate_modify_column_type() {
1589 let metadata = new_metadata();
1590 AlterKind::ModifyColumnTypes {
1591 columns: vec![ModifyColumnType {
1592 column_name: "xxxx".to_string(),
1593 target_type: ConcreteDataType::string_datatype(),
1594 }],
1595 }
1596 .validate(&metadata)
1597 .unwrap_err();
1598
1599 AlterKind::ModifyColumnTypes {
1600 columns: vec![ModifyColumnType {
1601 column_name: "field_1".to_string(),
1602 target_type: ConcreteDataType::date_datatype(),
1603 }],
1604 }
1605 .validate(&metadata)
1606 .unwrap_err();
1607
1608 AlterKind::ModifyColumnTypes {
1609 columns: vec![ModifyColumnType {
1610 column_name: "ts".to_string(),
1611 target_type: ConcreteDataType::date_datatype(),
1612 }],
1613 }
1614 .validate(&metadata)
1615 .unwrap_err();
1616
1617 AlterKind::ModifyColumnTypes {
1618 columns: vec![ModifyColumnType {
1619 column_name: "tag_0".to_string(),
1620 target_type: ConcreteDataType::date_datatype(),
1621 }],
1622 }
1623 .validate(&metadata)
1624 .unwrap_err();
1625
1626 let kind = AlterKind::ModifyColumnTypes {
1627 columns: vec![ModifyColumnType {
1628 column_name: "field_0".to_string(),
1629 target_type: ConcreteDataType::int32_datatype(),
1630 }],
1631 };
1632 kind.validate(&metadata).unwrap();
1633 assert!(kind.need_alter(&metadata));
1634 }
1635
1636 #[test]
1637 fn test_validate_add_columns() {
1638 let kind = AlterKind::AddColumns {
1639 columns: vec![
1640 AddColumn {
1641 column_metadata: ColumnMetadata {
1642 column_schema: ColumnSchema::new(
1643 "tag_1",
1644 ConcreteDataType::string_datatype(),
1645 true,
1646 ),
1647 semantic_type: SemanticType::Tag,
1648 column_id: 5,
1649 },
1650 location: None,
1651 },
1652 AddColumn {
1653 column_metadata: ColumnMetadata {
1654 column_schema: ColumnSchema::new(
1655 "field_2",
1656 ConcreteDataType::string_datatype(),
1657 true,
1658 ),
1659 semantic_type: SemanticType::Field,
1660 column_id: 6,
1661 },
1662 location: None,
1663 },
1664 ],
1665 };
1666 let request = RegionAlterRequest { kind };
1667 let mut metadata = new_metadata();
1668 metadata.schema_version = 1;
1669 request.validate(&metadata).unwrap();
1670 }
1671
1672 #[test]
1673 fn test_validate_create_region() {
1674 let column_metadatas = vec![
1675 ColumnMetadata {
1676 column_schema: ColumnSchema::new(
1677 "ts",
1678 ConcreteDataType::timestamp_millisecond_datatype(),
1679 false,
1680 ),
1681 semantic_type: SemanticType::Timestamp,
1682 column_id: 1,
1683 },
1684 ColumnMetadata {
1685 column_schema: ColumnSchema::new(
1686 "tag_0",
1687 ConcreteDataType::string_datatype(),
1688 true,
1689 ),
1690 semantic_type: SemanticType::Tag,
1691 column_id: 2,
1692 },
1693 ColumnMetadata {
1694 column_schema: ColumnSchema::new(
1695 "field_0",
1696 ConcreteDataType::string_datatype(),
1697 true,
1698 ),
1699 semantic_type: SemanticType::Field,
1700 column_id: 3,
1701 },
1702 ];
1703 let create = RegionCreateRequest {
1704 engine: "mito".to_string(),
1705 column_metadatas,
1706 primary_key: vec![3, 4],
1707 options: HashMap::new(),
1708 table_dir: "path".to_string(),
1709 path_type: PathType::Bare,
1710 };
1711
1712 assert!(create.validate().is_err());
1713 }
1714
1715 #[test]
1716 fn test_validate_modify_column_fulltext_options() {
1717 let kind = AlterKind::SetIndexes {
1718 options: vec![SetIndexOption::Fulltext {
1719 column_name: "tag_0".to_string(),
1720 options: FulltextOptions::new_unchecked(
1721 true,
1722 FulltextAnalyzer::Chinese,
1723 false,
1724 FulltextBackend::Bloom,
1725 1000,
1726 0.01,
1727 ),
1728 }],
1729 };
1730 let request = RegionAlterRequest { kind };
1731 let mut metadata = new_metadata();
1732 metadata.schema_version = 1;
1733 request.validate(&metadata).unwrap();
1734
1735 let kind = AlterKind::UnsetIndexes {
1736 options: vec![UnsetIndexOption::Fulltext {
1737 column_name: "tag_0".to_string(),
1738 }],
1739 };
1740 let request = RegionAlterRequest { kind };
1741 let mut metadata = new_metadata();
1742 metadata.schema_version = 1;
1743 request.validate(&metadata).unwrap();
1744 }
1745}