1use std::collections::hash_map::Entry;
16use std::collections::HashMap;
17use std::fmt::{self, Display};
18use std::io::Cursor;
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::add_column_location::LocationType;
22use api::v1::column_def::{
23 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
24};
25use api::v1::region::{
26 alter_request, compact_request, region_request, AlterRequest, AlterRequests,
27 BulkInsertRequests, CloseRequest, CompactRequest, CreateRequest, CreateRequests,
28 DeleteRequests, DropRequest, DropRequests, FlushRequest, InsertRequests, OpenRequest,
29 TruncateRequest,
30};
31use api::v1::{
32 self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
33 SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
34};
35pub use common_base::AffectedRows;
36use common_recordbatch::DfRecordBatch;
37use common_time::TimeToLive;
38use datatypes::arrow::ipc::reader::FileReader;
39use datatypes::prelude::ConcreteDataType;
40use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
41use serde::{Deserialize, Serialize};
42use snafu::{ensure, OptionExt, ResultExt};
43use strum::{AsRefStr, IntoStaticStr};
44
45use crate::logstore::entry;
46use crate::metadata::{
47 ColumnMetadata, DecodeArrowIpcSnafu, DecodeProtoSnafu, InvalidRawRegionRequestSnafu,
48 InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
49 InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu,
50};
51use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
52use crate::mito_engine_options::{
53 TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS,
54 TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE,
55 TWCS_TIME_WINDOW,
56};
57use crate::path_utils::region_dir;
58use crate::storage::{ColumnId, RegionId, ScanRequest};
59
/// A batch of DDL requests of one kind, each paired with the region it targets.
#[derive(Debug, IntoStaticStr)]
pub enum BatchRegionDdlRequest {
    /// Batch of region create requests.
    Create(Vec<(RegionId, RegionCreateRequest)>),
    /// Batch of region drop requests.
    Drop(Vec<(RegionId, RegionDropRequest)>),
    /// Batch of region alter requests.
    Alter(Vec<(RegionId, RegionAlterRequest)>),
}
66
67impl BatchRegionDdlRequest {
68 pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
70 match body {
71 region_request::Body::Creates(creates) => {
72 let requests = creates
73 .requests
74 .into_iter()
75 .map(parse_region_create)
76 .collect::<Result<Vec<_>>>()?;
77 Ok(Some(Self::Create(requests)))
78 }
79 region_request::Body::Drops(drops) => {
80 let requests = drops
81 .requests
82 .into_iter()
83 .map(parse_region_drop)
84 .collect::<Result<Vec<_>>>()?;
85 Ok(Some(Self::Drop(requests)))
86 }
87 region_request::Body::Alters(alters) => {
88 let requests = alters
89 .requests
90 .into_iter()
91 .map(parse_region_alter)
92 .collect::<Result<Vec<_>>>()?;
93 Ok(Some(Self::Alter(requests)))
94 }
95 _ => Ok(None),
96 }
97 }
98
99 pub fn request_type(&self) -> &'static str {
100 self.into()
101 }
102
103 pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
104 match self {
105 Self::Create(requests) => requests
106 .into_iter()
107 .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
108 .collect(),
109 Self::Drop(requests) => requests
110 .into_iter()
111 .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
112 .collect(),
113 Self::Alter(requests) => requests
114 .into_iter()
115 .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
116 .collect(),
117 }
118 }
119}
120
/// A request addressed to a single region.
#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
    /// Put rows into the region.
    Put(RegionPutRequest),
    /// Delete rows from the region.
    Delete(RegionDeleteRequest),
    /// Create the region.
    Create(RegionCreateRequest),
    /// Drop the region.
    Drop(RegionDropRequest),
    /// Open the region.
    Open(RegionOpenRequest),
    /// Close the region.
    Close(RegionCloseRequest),
    /// Alter the region.
    Alter(RegionAlterRequest),
    /// Flush the region.
    Flush(RegionFlushRequest),
    /// Compact the region.
    Compact(RegionCompactRequest),
    /// Truncate the region.
    Truncate(RegionTruncateRequest),
    /// Catch up the region.
    Catchup(RegionCatchupRequest),
    /// Bulk insert decoded payloads into the region.
    BulkInserts(RegionBulkInsertsRequest),
}
136
137impl RegionRequest {
138 pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
141 match body {
142 region_request::Body::Inserts(inserts) => make_region_puts(inserts),
143 region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
144 region_request::Body::Create(create) => make_region_create(create),
145 region_request::Body::Drop(drop) => make_region_drop(drop),
146 region_request::Body::Open(open) => make_region_open(open),
147 region_request::Body::Close(close) => make_region_close(close),
148 region_request::Body::Alter(alter) => make_region_alter(alter),
149 region_request::Body::Flush(flush) => make_region_flush(flush),
150 region_request::Body::Compact(compact) => make_region_compact(compact),
151 region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
152 region_request::Body::Creates(creates) => make_region_creates(creates),
153 region_request::Body::Drops(drops) => make_region_drops(drops),
154 region_request::Body::Alters(alters) => make_region_alters(alters),
155 region_request::Body::BulkInserts(bulk) => make_region_bulk_inserts(bulk),
156 region_request::Body::Sync(_) => UnexpectedSnafu {
157 reason: "Sync request should be handled separately by RegionServer",
158 }
159 .fail(),
160 }
161 }
162
163 pub fn request_type(&self) -> &'static str {
165 self.into()
166 }
167}
168
169fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
170 let requests = inserts
171 .requests
172 .into_iter()
173 .filter_map(|r| {
174 let region_id = r.region_id.into();
175 r.rows.map(|rows| {
176 (
177 region_id,
178 RegionRequest::Put(RegionPutRequest { rows, hint: None }),
179 )
180 })
181 })
182 .collect();
183 Ok(requests)
184}
185
186fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
187 let requests = deletes
188 .requests
189 .into_iter()
190 .filter_map(|r| {
191 let region_id = r.region_id.into();
192 r.rows.map(|rows| {
193 (
194 region_id,
195 RegionRequest::Delete(RegionDeleteRequest { rows }),
196 )
197 })
198 })
199 .collect();
200 Ok(requests)
201}
202
203fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
204 let column_metadatas = create
205 .column_defs
206 .into_iter()
207 .map(ColumnMetadata::try_from_column_def)
208 .collect::<Result<Vec<_>>>()?;
209 let region_id = create.region_id.into();
210 let region_dir = region_dir(&create.path, region_id);
211 Ok((
212 region_id,
213 RegionCreateRequest {
214 engine: create.engine,
215 column_metadatas,
216 primary_key: create.primary_key,
217 options: create.options,
218 region_dir,
219 },
220 ))
221}
222
223fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
224 let (region_id, request) = parse_region_create(create)?;
225 Ok(vec![(region_id, RegionRequest::Create(request))])
226}
227
228fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
229 let mut requests = Vec::with_capacity(creates.requests.len());
230 for create in creates.requests {
231 requests.extend(make_region_create(create)?);
232 }
233 Ok(requests)
234}
235
236fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
237 let region_id = drop.region_id.into();
238 Ok((
239 region_id,
240 RegionDropRequest {
241 fast_path: drop.fast_path,
242 },
243 ))
244}
245
246fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
247 let (region_id, request) = parse_region_drop(drop)?;
248 Ok(vec![(region_id, RegionRequest::Drop(request))])
249}
250
251fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
252 let mut requests = Vec::with_capacity(drops.requests.len());
253 for drop in drops.requests {
254 requests.extend(make_region_drop(drop)?);
255 }
256 Ok(requests)
257}
258
259fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
260 let region_id = open.region_id.into();
261 let region_dir = region_dir(&open.path, region_id);
262 Ok(vec![(
263 region_id,
264 RegionRequest::Open(RegionOpenRequest {
265 engine: open.engine,
266 region_dir,
267 options: open.options,
268 skip_wal_replay: false,
269 }),
270 )])
271}
272
273fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
274 let region_id = close.region_id.into();
275 Ok(vec![(
276 region_id,
277 RegionRequest::Close(RegionCloseRequest {}),
278 )])
279}
280
281fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
282 let region_id = alter.region_id.into();
283 let request = RegionAlterRequest::try_from(alter)?;
284 Ok((region_id, request))
285}
286
287fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
288 let (region_id, request) = parse_region_alter(alter)?;
289 Ok(vec![(region_id, RegionRequest::Alter(request))])
290}
291
292fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
293 let mut requests = Vec::with_capacity(alters.requests.len());
294 for alter in alters.requests {
295 requests.extend(make_region_alter(alter)?);
296 }
297 Ok(requests)
298}
299
300fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
301 let region_id = flush.region_id.into();
302 Ok(vec![(
303 region_id,
304 RegionRequest::Flush(RegionFlushRequest {
305 row_group_size: None,
306 }),
307 )])
308}
309
310fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
311 let region_id = compact.region_id.into();
312 let options = compact
313 .options
314 .unwrap_or(compact_request::Options::Regular(Default::default()));
315 Ok(vec![(
316 region_id,
317 RegionRequest::Compact(RegionCompactRequest { options }),
318 )])
319}
320
321fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
322 let region_id = truncate.region_id.into();
323 Ok(vec![(
324 region_id,
325 RegionRequest::Truncate(RegionTruncateRequest {}),
326 )])
327}
328
329fn make_region_bulk_inserts(
331 requests: BulkInsertRequests,
332) -> Result<Vec<(RegionId, RegionRequest)>> {
333 let mut region_requests: HashMap<u64, Vec<BulkInsertPayload>> =
334 HashMap::with_capacity(requests.requests.len());
335
336 for req in requests.requests {
337 let region_id = req.region_id;
338 match req.payload_type() {
339 api::v1::region::BulkInsertType::ArrowIpc => {
340 let reader = FileReader::try_new(Cursor::new(req.payload), None)
342 .context(DecodeArrowIpcSnafu)?;
343 let record_batches = reader
344 .map(|b| b.map(BulkInsertPayload::ArrowIpc))
345 .try_collect::<Vec<_>>()
346 .context(DecodeArrowIpcSnafu)?;
347 match region_requests.entry(region_id) {
348 Entry::Occupied(mut e) => {
349 e.get_mut().extend(record_batches);
350 }
351 Entry::Vacant(e) => {
352 e.insert(record_batches);
353 }
354 }
355 }
356 }
357 }
358
359 let result = region_requests
360 .into_iter()
361 .map(|(region_id, payloads)| {
362 (
363 region_id.into(),
364 RegionRequest::BulkInserts(RegionBulkInsertsRequest {
365 region_id: region_id.into(),
366 payloads,
367 }),
368 )
369 })
370 .collect::<Vec<_>>();
371 Ok(result)
372}
373
/// Request to put rows into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
    /// Rows to put.
    pub rows: Rows,
    /// Optional write hint; requests built from protobuf inserts in this file set it to `None`.
    pub hint: Option<WriteHint>,
}
382
/// Request to read from a region, wrapping a [`ScanRequest`].
#[derive(Debug)]
pub struct RegionReadRequest {
    /// The scan request to execute.
    pub request: ScanRequest,
}
387
/// Request to delete rows from a region.
#[derive(Debug)]
pub struct RegionDeleteRequest {
    /// Rows carried by the delete request.
    // NOTE(review): which columns these rows must contain is not visible here — confirm with the engine.
    pub rows: Rows,
}
396
/// Request to create a region.
#[derive(Debug, Clone)]
pub struct RegionCreateRequest {
    /// Name of the engine, taken from the protobuf request.
    pub engine: String,
    /// Metadata of the columns of the region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the primary key columns; `validate` requires each id to appear in `column_metadatas`.
    pub primary_key: Vec<ColumnId>,
    /// Options of the region as key-value strings.
    pub options: HashMap<String, String>,
    /// Directory of the region; derived from the request path and region id when parsed from protobuf.
    pub region_dir: String,
}
410
411impl RegionCreateRequest {
412 pub fn validate(&self) -> Result<()> {
414 ensure!(
416 self.column_metadatas
417 .iter()
418 .any(|x| x.semantic_type == SemanticType::Timestamp),
419 InvalidRegionRequestSnafu {
420 region_id: RegionId::new(0, 0),
421 err: "missing timestamp column in create region request".to_string(),
422 }
423 );
424
425 let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
427 for (i, c) in self.column_metadatas.iter().enumerate() {
428 if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
429 return InvalidRegionRequestSnafu {
430 region_id: RegionId::new(0, 0),
431 err: format!(
432 "duplicate column id {} (at position {} and {}) in create region request",
433 c.column_id, previous, i
434 ),
435 }
436 .fail();
437 }
438 }
439
440 for column_id in &self.primary_key {
442 ensure!(
443 column_id_to_indices.contains_key(column_id),
444 InvalidRegionRequestSnafu {
445 region_id: RegionId::new(0, 0),
446 err: format!(
447 "missing primary key column {} in create region request",
448 column_id
449 ),
450 }
451 );
452 }
453
454 Ok(())
455 }
456
457 pub fn is_physical_table(&self) -> bool {
459 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
460 }
461}
462
/// Request to drop a region.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// Copied from the protobuf request's `fast_path` flag.
    // NOTE(review): the exact effect of the fast path is not visible here — confirm with the engine.
    pub fast_path: bool,
}
467
/// Request to open a region.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
    /// Name of the engine, taken from the protobuf request.
    pub engine: String,
    /// Directory of the region; derived from the request path and region id when parsed from protobuf.
    pub region_dir: String,
    /// Options of the region as key-value strings.
    pub options: HashMap<String, String>,
    /// Presumably tells the engine to skip replaying the WAL on open (TODO confirm);
    /// requests parsed from protobuf in this file set it to `false`.
    pub skip_wal_replay: bool,
}
480
481impl RegionOpenRequest {
482 pub fn is_physical_table(&self) -> bool {
484 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
485 }
486}
487
/// Request to close a region; carries no parameters.
#[derive(Debug)]
pub struct RegionCloseRequest {}
491
/// Request to alter a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RegionAlterRequest {
    /// The alteration to apply.
    pub kind: AlterKind,
}
498
499impl RegionAlterRequest {
500 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
502 self.kind.validate(metadata)?;
503
504 Ok(())
505 }
506
507 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
511 debug_assert!(self.validate(metadata).is_ok());
512 self.kind.need_alter(metadata)
513 }
514}
515
516impl TryFrom<AlterRequest> for RegionAlterRequest {
517 type Error = MetadataError;
518
519 fn try_from(value: AlterRequest) -> Result<Self> {
520 let kind = value.kind.context(InvalidRawRegionRequestSnafu {
521 err: "missing kind in AlterRequest",
522 })?;
523
524 let kind = AlterKind::try_from(kind)?;
525 Ok(RegionAlterRequest { kind })
526 }
527}
528
/// Kind of alteration requested on a region.
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
    /// Add columns to the region.
    AddColumns {
        /// Columns to add.
        columns: Vec<AddColumn>,
    },
    /// Drop columns from the region.
    DropColumns {
        /// Names of the columns to drop.
        names: Vec<String>,
    },
    /// Change the data types of columns.
    ModifyColumnTypes {
        /// Columns whose type should change.
        columns: Vec<ModifyColumnType>,
    },
    /// Set region options.
    SetRegionOptions { options: Vec<SetRegionOption> },
    /// Unset region options.
    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
    /// Set index options on a column.
    SetIndex { options: ApiSetIndexOptions },
    /// Unset index options on a column.
    UnsetIndex { options: ApiUnsetIndexOptions },
}
556
/// Index options to set on a column, one variant per index type.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ApiSetIndexOptions {
    /// Set a fulltext index with the given options.
    Fulltext {
        column_name: String,
        options: FulltextOptions,
    },
    /// Set an inverted index.
    Inverted {
        column_name: String,
    },
    /// Set a skipping index with the given options.
    Skipping {
        column_name: String,
        options: SkippingIndexOptions,
    },
}
571
572impl ApiSetIndexOptions {
573 pub fn column_name(&self) -> &String {
574 match self {
575 ApiSetIndexOptions::Fulltext { column_name, .. } => column_name,
576 ApiSetIndexOptions::Inverted { column_name } => column_name,
577 ApiSetIndexOptions::Skipping { column_name, .. } => column_name,
578 }
579 }
580
581 pub fn is_fulltext(&self) -> bool {
582 match self {
583 ApiSetIndexOptions::Fulltext { .. } => true,
584 ApiSetIndexOptions::Inverted { .. } => false,
585 ApiSetIndexOptions::Skipping { .. } => false,
586 }
587 }
588}
589
/// Index to unset on a column, one variant per index type.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ApiUnsetIndexOptions {
    /// Unset the fulltext index.
    Fulltext { column_name: String },
    /// Unset the inverted index.
    Inverted { column_name: String },
    /// Unset the skipping index.
    Skipping { column_name: String },
}
596
597impl ApiUnsetIndexOptions {
598 pub fn column_name(&self) -> &String {
599 match self {
600 ApiUnsetIndexOptions::Fulltext { column_name } => column_name,
601 ApiUnsetIndexOptions::Inverted { column_name } => column_name,
602 ApiUnsetIndexOptions::Skipping { column_name } => column_name,
603 }
604 }
605
606 pub fn is_fulltext(&self) -> bool {
607 match self {
608 ApiUnsetIndexOptions::Fulltext { .. } => true,
609 ApiUnsetIndexOptions::Inverted { .. } => false,
610 ApiUnsetIndexOptions::Skipping { .. } => false,
611 }
612 }
613}
614
615impl AlterKind {
616 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
620 match self {
621 AlterKind::AddColumns { columns } => {
622 for col_to_add in columns {
623 col_to_add.validate(metadata)?;
624 }
625 }
626 AlterKind::DropColumns { names } => {
627 for name in names {
628 Self::validate_column_to_drop(name, metadata)?;
629 }
630 }
631 AlterKind::ModifyColumnTypes { columns } => {
632 for col_to_change in columns {
633 col_to_change.validate(metadata)?;
634 }
635 }
636 AlterKind::SetRegionOptions { .. } => {}
637 AlterKind::UnsetRegionOptions { .. } => {}
638 AlterKind::SetIndex { options } => {
639 Self::validate_column_alter_index_option(
640 options.column_name(),
641 metadata,
642 options.is_fulltext(),
643 )?;
644 }
645 AlterKind::UnsetIndex { options } => {
646 Self::validate_column_alter_index_option(
647 options.column_name(),
648 metadata,
649 options.is_fulltext(),
650 )?;
651 }
652 }
653 Ok(())
654 }
655
656 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
658 debug_assert!(self.validate(metadata).is_ok());
659 match self {
660 AlterKind::AddColumns { columns } => columns
661 .iter()
662 .any(|col_to_add| col_to_add.need_alter(metadata)),
663 AlterKind::DropColumns { names } => names
664 .iter()
665 .any(|name| metadata.column_by_name(name).is_some()),
666 AlterKind::ModifyColumnTypes { columns } => columns
667 .iter()
668 .any(|col_to_change| col_to_change.need_alter(metadata)),
669 AlterKind::SetRegionOptions { .. } => {
670 true
673 }
674 AlterKind::UnsetRegionOptions { .. } => true,
675 AlterKind::SetIndex { options, .. } => {
676 metadata.column_by_name(options.column_name()).is_some()
677 }
678 AlterKind::UnsetIndex { options } => {
679 metadata.column_by_name(options.column_name()).is_some()
680 }
681 }
682 }
683
684 fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
686 let Some(column) = metadata.column_by_name(name) else {
687 return Ok(());
688 };
689 ensure!(
690 column.semantic_type == SemanticType::Field,
691 InvalidRegionRequestSnafu {
692 region_id: metadata.region_id,
693 err: format!("column {} is not a field and could not be dropped", name),
694 }
695 );
696 Ok(())
697 }
698
699 fn validate_column_alter_index_option(
701 column_name: &String,
702 metadata: &RegionMetadata,
703 is_fulltext: bool,
704 ) -> Result<()> {
705 let column = metadata
706 .column_by_name(column_name)
707 .context(InvalidRegionRequestSnafu {
708 region_id: metadata.region_id,
709 err: format!("column {} not found", column_name),
710 })?;
711
712 if is_fulltext {
713 ensure!(
714 column.column_schema.data_type.is_string(),
715 InvalidRegionRequestSnafu {
716 region_id: metadata.region_id,
717 err: format!(
718 "cannot change alter index options for non-string column {}",
719 column_name
720 ),
721 }
722 );
723 }
724
725 Ok(())
726 }
727}
728
729impl TryFrom<alter_request::Kind> for AlterKind {
730 type Error = MetadataError;
731
732 fn try_from(kind: alter_request::Kind) -> Result<Self> {
733 let alter_kind = match kind {
734 alter_request::Kind::AddColumns(x) => {
735 let columns = x
736 .add_columns
737 .into_iter()
738 .map(|x| x.try_into())
739 .collect::<Result<Vec<_>>>()?;
740 AlterKind::AddColumns { columns }
741 }
742 alter_request::Kind::ModifyColumnTypes(x) => {
743 let columns = x
744 .modify_column_types
745 .into_iter()
746 .map(|x| x.into())
747 .collect::<Vec<_>>();
748 AlterKind::ModifyColumnTypes { columns }
749 }
750 alter_request::Kind::DropColumns(x) => {
751 let names = x.drop_columns.into_iter().map(|x| x.name).collect();
752 AlterKind::DropColumns { names }
753 }
754 alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
755 options: options
756 .table_options
757 .iter()
758 .map(TryFrom::try_from)
759 .collect::<Result<Vec<_>>>()?,
760 },
761 alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
762 keys: options
763 .keys
764 .iter()
765 .map(|key| UnsetRegionOption::try_from(key.as_str()))
766 .collect::<Result<Vec<_>>>()?,
767 },
768 alter_request::Kind::SetIndex(o) => match o.options.unwrap() {
769 set_index::Options::Fulltext(x) => AlterKind::SetIndex {
770 options: ApiSetIndexOptions::Fulltext {
771 column_name: x.column_name.clone(),
772 options: FulltextOptions {
773 enable: x.enable,
774 analyzer: as_fulltext_option_analyzer(
775 Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
776 ),
777 case_sensitive: x.case_sensitive,
778 backend: as_fulltext_option_backend(
779 PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
780 ),
781 },
782 },
783 },
784 set_index::Options::Inverted(i) => AlterKind::SetIndex {
785 options: ApiSetIndexOptions::Inverted {
786 column_name: i.column_name,
787 },
788 },
789 set_index::Options::Skipping(s) => AlterKind::SetIndex {
790 options: ApiSetIndexOptions::Skipping {
791 column_name: s.column_name,
792 options: SkippingIndexOptions {
793 index_type: as_skipping_index_type(
794 PbSkippingIndexType::try_from(s.skipping_index_type)
795 .context(DecodeProtoSnafu)?,
796 ),
797 granularity: s.granularity as u32,
798 },
799 },
800 },
801 },
802 alter_request::Kind::UnsetIndex(o) => match o.options.unwrap() {
803 v1::unset_index::Options::Fulltext(f) => AlterKind::UnsetIndex {
804 options: ApiUnsetIndexOptions::Fulltext {
805 column_name: f.column_name,
806 },
807 },
808 v1::unset_index::Options::Inverted(i) => AlterKind::UnsetIndex {
809 options: ApiUnsetIndexOptions::Inverted {
810 column_name: i.column_name,
811 },
812 },
813 v1::unset_index::Options::Skipping(s) => AlterKind::UnsetIndex {
814 options: ApiUnsetIndexOptions::Skipping {
815 column_name: s.column_name,
816 },
817 },
818 },
819 };
820
821 Ok(alter_kind)
822 }
823}
824
/// A column to add to a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AddColumn {
    /// Metadata of the column to add.
    pub column_metadata: ColumnMetadata,
    /// Where to place the column; `None` when no location is specified.
    pub location: Option<AddColumnLocation>,
}
834
835impl AddColumn {
836 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
841 ensure!(
842 self.column_metadata.column_schema.is_nullable()
843 || self
844 .column_metadata
845 .column_schema
846 .default_constraint()
847 .is_some(),
848 InvalidRegionRequestSnafu {
849 region_id: metadata.region_id,
850 err: format!(
851 "no default value for column {}",
852 self.column_metadata.column_schema.name
853 ),
854 }
855 );
856
857 if let Some(existing_column) =
858 metadata.column_by_name(&self.column_metadata.column_schema.name)
859 {
860 ensure!(
862 *existing_column == self.column_metadata,
863 InvalidRegionRequestSnafu {
864 region_id: metadata.region_id,
865 err: format!(
866 "column {} already exists with different metadata, existing: {:?}, got: {:?}",
867 self.column_metadata.column_schema.name, existing_column, self.column_metadata,
868 ),
869 }
870 );
871 ensure!(
872 self.location.is_none(),
873 InvalidRegionRequestSnafu {
874 region_id: metadata.region_id,
875 err: format!(
876 "column {} already exists, but location is specified",
877 self.column_metadata.column_schema.name
878 ),
879 }
880 );
881 }
882
883 if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
884 ensure!(
886 existing_column.column_schema.name == self.column_metadata.column_schema.name,
887 InvalidRegionRequestSnafu {
888 region_id: metadata.region_id,
889 err: format!(
890 "column id {} already exists with different name {}",
891 self.column_metadata.column_id, existing_column.column_schema.name
892 ),
893 }
894 );
895 }
896
897 Ok(())
898 }
899
900 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
902 debug_assert!(self.validate(metadata).is_ok());
903 metadata
904 .column_by_name(&self.column_metadata.column_schema.name)
905 .is_none()
906 }
907}
908
909impl TryFrom<v1::region::AddColumn> for AddColumn {
910 type Error = MetadataError;
911
912 fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
913 let column_def = add_column
914 .column_def
915 .context(InvalidRawRegionRequestSnafu {
916 err: "missing column_def in AddColumn",
917 })?;
918
919 let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
920 let location = add_column
921 .location
922 .map(AddColumnLocation::try_from)
923 .transpose()?;
924
925 Ok(AddColumn {
926 column_metadata,
927 location,
928 })
929 }
930}
931
/// Location at which to add a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AddColumnLocation {
    /// Add the column as the first column.
    First,
    /// Add the column after the named column.
    After {
        /// Name of the column after which the new column is placed.
        column_name: String,
    },
}
943
944impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
945 type Error = MetadataError;
946
947 fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
948 let location_type = LocationType::try_from(location.location_type)
949 .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
950 let add_column_location = match location_type {
951 LocationType::First => AddColumnLocation::First,
952 LocationType::After => AddColumnLocation::After {
953 column_name: location.after_column_name,
954 },
955 };
956
957 Ok(add_column_location)
958 }
959}
960
/// Change of a column's data type.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ModifyColumnType {
    /// Name of the column to modify.
    pub column_name: String,
    /// Data type the column should be changed to.
    pub target_type: ConcreteDataType,
}
969
970impl ModifyColumnType {
971 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
973 let column_meta = metadata
974 .column_by_name(&self.column_name)
975 .with_context(|| InvalidRegionRequestSnafu {
976 region_id: metadata.region_id,
977 err: format!("column {} not found", self.column_name),
978 })?;
979
980 ensure!(
981 matches!(column_meta.semantic_type, SemanticType::Field),
982 InvalidRegionRequestSnafu {
983 region_id: metadata.region_id,
984 err: "'timestamp' or 'tag' column cannot change type".to_string()
985 }
986 );
987 ensure!(
988 column_meta
989 .column_schema
990 .data_type
991 .can_arrow_type_cast_to(&self.target_type),
992 InvalidRegionRequestSnafu {
993 region_id: metadata.region_id,
994 err: format!(
995 "column '{}' cannot be cast automatically to type '{}'",
996 self.column_name, self.target_type
997 ),
998 }
999 );
1000
1001 Ok(())
1002 }
1003
1004 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1006 debug_assert!(self.validate(metadata).is_ok());
1007 metadata.column_by_name(&self.column_name).is_some()
1008 }
1009}
1010
1011impl From<v1::ModifyColumnType> for ModifyColumnType {
1012 fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1013 let target_type = ColumnDataTypeWrapper::new(
1014 modify_column_type.target_type(),
1015 modify_column_type.target_type_extension,
1016 )
1017 .into();
1018
1019 ModifyColumnType {
1020 column_name: modify_column_type.column_name,
1021 target_type,
1022 }
1023 }
1024}
1025
/// A region option to set.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetRegionOption {
    /// Time-to-live of the region; `None` when built from an unset request
    /// (via `Default::default()`).
    Ttl(Option<TimeToLive>),
    /// A TWCS compaction option as a `(key, value)` pair; the value is an empty
    /// string when built from an unset request.
    // NOTE(review): the variant name reads `Twsc` although the option keys are `TWCS_*`;
    // it is part of the public (and serialized) interface, so it is left unchanged.
    Twsc(String, String),
}
1032
1033impl TryFrom<&PbOption> for SetRegionOption {
1034 type Error = MetadataError;
1035
1036 fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1037 let PbOption { key, value } = value;
1038 match key.as_str() {
1039 TTL_KEY => {
1040 let ttl = TimeToLive::from_humantime_or_str(value)
1041 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1042
1043 Ok(Self::Ttl(Some(ttl)))
1044 }
1045 TWCS_MAX_ACTIVE_WINDOW_RUNS
1046 | TWCS_MAX_ACTIVE_WINDOW_FILES
1047 | TWCS_MAX_INACTIVE_WINDOW_FILES
1048 | TWCS_MAX_INACTIVE_WINDOW_RUNS
1049 | TWCS_MAX_OUTPUT_FILE_SIZE
1050 | TWCS_TIME_WINDOW => Ok(Self::Twsc(key.to_string(), value.to_string())),
1051 _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1052 }
1053 }
1054}
1055
1056impl From<&UnsetRegionOption> for SetRegionOption {
1057 fn from(unset_option: &UnsetRegionOption) -> Self {
1058 match unset_option {
1059 UnsetRegionOption::TwcsMaxActiveWindowFiles => {
1060 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1061 }
1062 UnsetRegionOption::TwcsMaxInactiveWindowFiles => {
1063 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1064 }
1065 UnsetRegionOption::TwcsMaxActiveWindowRuns => {
1066 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1067 }
1068 UnsetRegionOption::TwcsMaxInactiveWindowRuns => {
1069 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1070 }
1071 UnsetRegionOption::TwcsMaxOutputFileSize => {
1072 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1073 }
1074 UnsetRegionOption::TwcsTimeWindow => {
1075 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1076 }
1077 UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1078 }
1079 }
1080}
1081
1082impl TryFrom<&str> for UnsetRegionOption {
1083 type Error = MetadataError;
1084
1085 fn try_from(key: &str) -> Result<Self> {
1086 match key.to_ascii_lowercase().as_str() {
1087 TTL_KEY => Ok(Self::Ttl),
1088 TWCS_MAX_ACTIVE_WINDOW_FILES => Ok(Self::TwcsMaxActiveWindowFiles),
1089 TWCS_MAX_INACTIVE_WINDOW_FILES => Ok(Self::TwcsMaxInactiveWindowFiles),
1090 TWCS_MAX_ACTIVE_WINDOW_RUNS => Ok(Self::TwcsMaxActiveWindowRuns),
1091 TWCS_MAX_INACTIVE_WINDOW_RUNS => Ok(Self::TwcsMaxInactiveWindowRuns),
1092 TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1093 TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1094 _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1095 }
1096 }
1097}
1098
/// A region option to unset; each variant corresponds to one option key
/// (see `as_str`).
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetRegionOption {
    /// Key `TWCS_MAX_ACTIVE_WINDOW_FILES`.
    TwcsMaxActiveWindowFiles,
    /// Key `TWCS_MAX_INACTIVE_WINDOW_FILES`.
    TwcsMaxInactiveWindowFiles,
    /// Key `TWCS_MAX_ACTIVE_WINDOW_RUNS`.
    TwcsMaxActiveWindowRuns,
    /// Key `TWCS_MAX_INACTIVE_WINDOW_RUNS`.
    TwcsMaxInactiveWindowRuns,
    /// Key `TWCS_MAX_OUTPUT_FILE_SIZE`.
    TwcsMaxOutputFileSize,
    /// Key `TWCS_TIME_WINDOW`.
    TwcsTimeWindow,
    /// Key `TTL_KEY`.
    Ttl,
}
1109
1110impl UnsetRegionOption {
1111 pub fn as_str(&self) -> &str {
1112 match self {
1113 Self::Ttl => TTL_KEY,
1114 Self::TwcsMaxActiveWindowFiles => TWCS_MAX_ACTIVE_WINDOW_FILES,
1115 Self::TwcsMaxInactiveWindowFiles => TWCS_MAX_INACTIVE_WINDOW_FILES,
1116 Self::TwcsMaxActiveWindowRuns => TWCS_MAX_ACTIVE_WINDOW_RUNS,
1117 Self::TwcsMaxInactiveWindowRuns => TWCS_MAX_INACTIVE_WINDOW_RUNS,
1118 Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1119 Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1120 }
1121 }
1122}
1123
1124impl Display for UnsetRegionOption {
1125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1126 write!(f, "{}", self.as_str())
1127 }
1128}
1129
/// Request to flush a region.
#[derive(Debug, Clone, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size; `None` for requests parsed from protobuf in this file.
    // NOTE(review): presumably overrides the engine's default row group size — confirm.
    pub row_group_size: Option<usize>,
}
1134
/// Request to compact a region.
#[derive(Debug)]
pub struct RegionCompactRequest {
    /// Compaction options; defaults to regular compaction when not specified.
    pub options: compact_request::Options,
}
1139
1140impl Default for RegionCompactRequest {
1141 fn default() -> Self {
1142 Self {
1143 options: compact_request::Options::Regular(Default::default()),
1145 }
1146 }
1147}
1148
/// Request to truncate a region; carries no parameters.
#[derive(Debug)]
pub struct RegionTruncateRequest {}
1152
/// Request to catch up a region.
// NOTE(review): the field semantics below are inferred from names only; confirm with the engine.
#[derive(Debug, Clone, Copy)]
pub struct RegionCatchupRequest {
    /// Presumably whether the region becomes writable after catching up — TODO confirm.
    pub set_writable: bool,
    /// Optional log entry id — presumably the expected last entry id; TODO confirm.
    pub entry_id: Option<entry::Id>,
    /// Optional log entry id for the metadata region — presumably; TODO confirm.
    pub metadata_entry_id: Option<entry::Id>,
    /// Optional location id — meaning not visible here; TODO confirm.
    pub location_id: Option<u64>,
}
1171
/// Request concerning the sequences of a set of regions.
// NOTE(review): presumably used to query the latest sequence of each region — confirm.
#[derive(Debug, Clone)]
pub struct RegionSequencesRequest {
    /// Ids of the regions the request refers to.
    pub region_ids: Vec<RegionId>,
}
1177
/// Request to bulk insert decoded payloads into one region.
#[derive(Debug, Clone)]
pub struct RegionBulkInsertsRequest {
    /// Id of the target region.
    pub region_id: RegionId,
    /// Decoded payloads to insert.
    pub payloads: Vec<BulkInsertPayload>,
}
1183
/// A decoded bulk insert payload.
#[derive(Debug, Clone)]
pub enum BulkInsertPayload {
    /// A record batch decoded from an Arrow IPC payload.
    ArrowIpc(DfRecordBatch),
}
1188
1189impl fmt::Display for RegionRequest {
1190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1191 match self {
1192 RegionRequest::Put(_) => write!(f, "Put"),
1193 RegionRequest::Delete(_) => write!(f, "Delete"),
1194 RegionRequest::Create(_) => write!(f, "Create"),
1195 RegionRequest::Drop(_) => write!(f, "Drop"),
1196 RegionRequest::Open(_) => write!(f, "Open"),
1197 RegionRequest::Close(_) => write!(f, "Close"),
1198 RegionRequest::Alter(_) => write!(f, "Alter"),
1199 RegionRequest::Flush(_) => write!(f, "Flush"),
1200 RegionRequest::Compact(_) => write!(f, "Compact"),
1201 RegionRequest::Truncate(_) => write!(f, "Truncate"),
1202 RegionRequest::Catchup(_) => write!(f, "Catchup"),
1203 RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1204 }
1205 }
1206}
1207
#[cfg(test)]
mod tests {
    use api::v1::region::RegionColumnDef;
    use api::v1::{ColumnDataType, ColumnDef};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};

    use super::*;
    use crate::metadata::RegionMetadataBuilder;

    /// Builds a [`ColumnMetadata`] from its individual parts.
    fn new_column(
        name: &str,
        data_type: ConcreteDataType,
        nullable: bool,
        semantic_type: SemanticType,
        column_id: ColumnId,
    ) -> ColumnMetadata {
        ColumnMetadata {
            column_schema: ColumnSchema::new(name, data_type, nullable),
            semantic_type,
            column_id,
        }
    }

    /// Builds a [`ColumnMetadata`] for a string-typed column.
    fn string_column(
        name: &str,
        nullable: bool,
        semantic_type: SemanticType,
        column_id: ColumnId,
    ) -> ColumnMetadata {
        new_column(
            name,
            ConcreteDataType::string_datatype(),
            nullable,
            semantic_type,
            column_id,
        )
    }

    /// Wraps `column_metadata` in an [`AddColumn`] that has no location.
    fn add_column(column_metadata: ColumnMetadata) -> AddColumn {
        AddColumn {
            column_metadata,
            location: None,
        }
    }

    /// Builds the metadata fixture shared by the tests below.
    ///
    /// Columns: `ts` (timestamp, id 1), `tag_0` (string tag, id 2),
    /// `field_0` (string field, id 3) and `field_1` (boolean field, id 4).
    /// The primary key is `[2]`, i.e. `tag_0`.
    fn new_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(new_column(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
                SemanticType::Timestamp,
                1,
            ))
            .push_column_metadata(string_column("tag_0", true, SemanticType::Tag, 2))
            .push_column_metadata(string_column("field_0", true, SemanticType::Field, 3))
            .push_column_metadata(new_column(
                "field_1",
                ConcreteDataType::boolean_datatype(),
                true,
                SemanticType::Field,
                4,
            ))
            .primary_key(vec![2]);
        builder.build().unwrap()
    }

    /// Returns the [`new_metadata`] fixture with `schema_version` set to 1.
    fn new_metadata_v1() -> RegionMetadata {
        let mut metadata = new_metadata();
        metadata.schema_version = 1;
        metadata
    }

    #[test]
    fn test_from_proto_location() {
        // `First` converts to `AddColumnLocation::First`.
        let first = AddColumnLocation::try_from(v1::AddColumnLocation {
            location_type: LocationType::First as i32,
            after_column_name: String::default(),
        })
        .unwrap();
        assert_eq!(first, AddColumnLocation::First);

        // A location type of 10 must fail to convert.
        AddColumnLocation::try_from(v1::AddColumnLocation {
            location_type: 10,
            after_column_name: String::default(),
        })
        .unwrap_err();

        // `After` keeps the referenced column name.
        let after = AddColumnLocation::try_from(v1::AddColumnLocation {
            location_type: LocationType::After as i32,
            after_column_name: "a".to_string(),
        })
        .unwrap();
        let expected = AddColumnLocation::After {
            column_name: "a".to_string(),
        };
        assert_eq!(after, expected);
    }

    #[test]
    fn test_from_none_proto_add_column() {
        // An `AddColumn` without a column definition cannot be converted.
        let proto = v1::region::AddColumn {
            column_def: None,
            location: None,
        };
        AddColumn::try_from(proto).unwrap_err();
    }

    #[test]
    fn test_from_proto_alter_request() {
        // A request without a kind cannot be converted.
        let missing_kind = AlterRequest {
            region_id: 0,
            schema_version: 1,
            kind: None,
        };
        RegionAlterRequest::try_from(missing_kind).unwrap_err();

        // A request adding one nullable string field "a" at the first position.
        let column_def = RegionColumnDef {
            column_def: Some(ColumnDef {
                name: "a".to_string(),
                data_type: ColumnDataType::String as i32,
                is_nullable: true,
                default_constraint: vec![],
                semantic_type: SemanticType::Field as i32,
                comment: String::new(),
                ..Default::default()
            }),
            column_id: 1,
        };
        let location = v1::AddColumnLocation {
            location_type: LocationType::First as i32,
            after_column_name: String::default(),
        };
        let add_columns = v1::region::AddColumns {
            add_columns: vec![v1::region::AddColumn {
                column_def: Some(column_def),
                location: Some(location),
            }],
        };
        let request = RegionAlterRequest::try_from(AlterRequest {
            region_id: 0,
            schema_version: 1,
            kind: Some(alter_request::Kind::AddColumns(add_columns)),
        })
        .unwrap();

        let expected = RegionAlterRequest {
            kind: AlterKind::AddColumns {
                columns: vec![AddColumn {
                    column_metadata: string_column("a", true, SemanticType::Field, 1),
                    location: Some(AddColumnLocation::First),
                }],
            },
        };
        assert_eq!(request, expected);
    }

    #[test]
    fn test_add_column_validate() {
        let metadata = new_metadata();

        // A new nullable tag is valid and requires an alteration.
        let new_tag = add_column(string_column("tag_1", true, SemanticType::Tag, 5));
        new_tag.validate(&metadata).unwrap();
        assert!(new_tag.need_alter(&metadata));

        // The same column declared non-nullable is expected to be rejected.
        add_column(string_column("tag_1", false, SemanticType::Tag, 5))
            .validate(&metadata)
            .unwrap_err();

        // A column identical to the existing `tag_0` is valid but needs no
        // alteration.
        let existing = add_column(string_column("tag_0", true, SemanticType::Tag, 2));
        existing.validate(&metadata).unwrap();
        assert!(!existing.need_alter(&metadata));
    }

    #[test]
    fn test_add_duplicate_columns() {
        // Two new columns sharing the name "tag_1" pass validation and still
        // require an alteration.
        let kind = AlterKind::AddColumns {
            columns: vec![
                add_column(string_column("tag_1", true, SemanticType::Tag, 5)),
                add_column(string_column("tag_1", true, SemanticType::Field, 6)),
            ],
        };
        let metadata = new_metadata();
        kind.validate(&metadata).unwrap();
        assert!(kind.need_alter(&metadata));
    }

    #[test]
    fn test_add_existing_column_different_metadata() {
        let metadata = new_metadata();

        // Each column collides with the existing `tag_0` (string tag, id 2)
        // while differing in one aspect; all must be rejected.
        let conflicting = vec![
            // Same name, different column id.
            string_column("tag_0", true, SemanticType::Tag, 4),
            // Same name and id, different data type.
            new_column(
                "tag_0",
                ConcreteDataType::int64_datatype(),
                true,
                SemanticType::Tag,
                2,
            ),
            // Same id, different name.
            string_column("tag_1", true, SemanticType::Tag, 2),
        ];
        for column_metadata in conflicting {
            let kind = AlterKind::AddColumns {
                columns: vec![add_column(column_metadata)],
            };
            kind.validate(&metadata).unwrap_err();
        }
    }

    #[test]
    fn test_add_existing_column_with_location() {
        let metadata = new_metadata();
        // Re-adding the existing `tag_0` with an explicit location is
        // expected to be rejected.
        let kind = AlterKind::AddColumns {
            columns: vec![AddColumn {
                column_metadata: string_column("tag_0", true, SemanticType::Tag, 2),
                location: Some(AddColumnLocation::First),
            }],
        };
        kind.validate(&metadata).unwrap_err();
    }

    #[test]
    fn test_validate_drop_column() {
        let metadata = new_metadata();
        let drop_column = |name: &str| AlterKind::DropColumns {
            names: vec![name.to_string()],
        };

        // "xxxx" is not a column of the fixture: valid, nothing to alter.
        let unknown = drop_column("xxxx");
        unknown.validate(&metadata).unwrap();
        assert!(!unknown.need_alter(&metadata));

        // `tag_0` is the primary key column of the fixture: must be rejected.
        drop_column("tag_0").validate(&metadata).unwrap_err();

        // `field_0` is a plain field: valid and requires an alteration.
        let field = drop_column("field_0");
        field.validate(&metadata).unwrap();
        assert!(field.need_alter(&metadata));
    }

    #[test]
    fn test_validate_modify_column_type() {
        let metadata = new_metadata();
        let modify_type = |name: &str, target_type: ConcreteDataType| {
            AlterKind::ModifyColumnTypes {
                columns: vec![ModifyColumnType {
                    column_name: name.to_string(),
                    target_type,
                }],
            }
        };

        // Every one of these modifications must be rejected.
        let rejected = vec![
            // "xxxx" is not a column of the fixture.
            ("xxxx", ConcreteDataType::string_datatype()),
            // `field_1` is a boolean field; target type is date.
            ("field_1", ConcreteDataType::date_datatype()),
            // `ts` is the timestamp column.
            ("ts", ConcreteDataType::date_datatype()),
            // `tag_0` is the tag column in the primary key.
            ("tag_0", ConcreteDataType::date_datatype()),
        ];
        for (column_name, target_type) in rejected {
            modify_type(column_name, target_type)
                .validate(&metadata)
                .unwrap_err();
        }

        // Changing the string field `field_0` to int32 is accepted and
        // requires an alteration.
        let kind = modify_type("field_0", ConcreteDataType::int32_datatype());
        kind.validate(&metadata).unwrap();
        assert!(kind.need_alter(&metadata));
    }

    #[test]
    fn test_validate_add_columns() {
        // Adding one new tag and one new field in a single request is valid.
        let kind = AlterKind::AddColumns {
            columns: vec![
                add_column(string_column("tag_1", true, SemanticType::Tag, 5)),
                add_column(string_column("field_2", true, SemanticType::Field, 6)),
            ],
        };
        let request = RegionAlterRequest { kind };
        let metadata = new_metadata_v1();
        request.validate(&metadata).unwrap();
    }

    #[test]
    fn test_validate_create_region() {
        let column_metadatas = vec![
            new_column(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
                SemanticType::Timestamp,
                1,
            ),
            string_column("tag_0", true, SemanticType::Tag, 2),
            string_column("field_0", true, SemanticType::Field, 3),
        ];
        // The primary key lists id 3 (a field column) and id 4 (not among the
        // columns above), so validation must fail.
        let create = RegionCreateRequest {
            engine: "mito".to_string(),
            column_metadatas,
            primary_key: vec![3, 4],
            options: HashMap::new(),
            region_dir: "path".to_string(),
        };

        assert!(create.validate().is_err());
    }

    #[test]
    fn test_validate_modify_column_fulltext_options() {
        // Setting fulltext options on `tag_0` is valid.
        let set_index = RegionAlterRequest {
            kind: AlterKind::SetIndex {
                options: ApiSetIndexOptions::Fulltext {
                    column_name: "tag_0".to_string(),
                    options: FulltextOptions {
                        enable: true,
                        analyzer: FulltextAnalyzer::Chinese,
                        case_sensitive: false,
                        backend: FulltextBackend::Bloom,
                    },
                },
            },
        };
        let metadata = new_metadata_v1();
        set_index.validate(&metadata).unwrap();

        // Unsetting the fulltext index on `tag_0` is valid as well; it is
        // checked against a fresh copy of the fixture.
        let unset_index = RegionAlterRequest {
            kind: AlterKind::UnsetIndex {
                options: ApiUnsetIndexOptions::Fulltext {
                    column_name: "tag_0".to_string(),
                },
            },
        };
        let metadata = new_metadata_v1();
        unset_index.validate(&metadata).unwrap();
    }
}