1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::ColumnDataTypeWrapper;
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25 alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequest,
26 CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest,
27 DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
28};
29use api::v1::{
30 self, set_index, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption,
31 Rows, SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::FlightDecoder;
35use common_recordbatch::DfRecordBatch;
36use common_time::TimeToLive;
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
39use serde::{Deserialize, Serialize};
40use snafu::{ensure, OptionExt, ResultExt};
41use strum::{AsRefStr, IntoStaticStr};
42
43use crate::logstore::entry;
44use crate::metadata::{
45 ColumnMetadata, DecodeProtoSnafu, FlightCodecSnafu, InvalidRawRegionRequestSnafu,
46 InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
47 InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu,
48};
49use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
50use crate::metrics;
51use crate::mito_engine_options::{
52 TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
53};
54use crate::path_utils::region_dir;
55use crate::storage::{ColumnId, RegionId, ScanRequest};
56
57#[derive(Debug, IntoStaticStr)]
58pub enum BatchRegionDdlRequest {
59 Create(Vec<(RegionId, RegionCreateRequest)>),
60 Drop(Vec<(RegionId, RegionDropRequest)>),
61 Alter(Vec<(RegionId, RegionAlterRequest)>),
62}
63
64impl BatchRegionDdlRequest {
65 pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
67 match body {
68 region_request::Body::Creates(creates) => {
69 let requests = creates
70 .requests
71 .into_iter()
72 .map(parse_region_create)
73 .collect::<Result<Vec<_>>>()?;
74 Ok(Some(Self::Create(requests)))
75 }
76 region_request::Body::Drops(drops) => {
77 let requests = drops
78 .requests
79 .into_iter()
80 .map(parse_region_drop)
81 .collect::<Result<Vec<_>>>()?;
82 Ok(Some(Self::Drop(requests)))
83 }
84 region_request::Body::Alters(alters) => {
85 let requests = alters
86 .requests
87 .into_iter()
88 .map(parse_region_alter)
89 .collect::<Result<Vec<_>>>()?;
90 Ok(Some(Self::Alter(requests)))
91 }
92 _ => Ok(None),
93 }
94 }
95
96 pub fn request_type(&self) -> &'static str {
97 self.into()
98 }
99
100 pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
101 match self {
102 Self::Create(requests) => requests
103 .into_iter()
104 .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
105 .collect(),
106 Self::Drop(requests) => requests
107 .into_iter()
108 .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
109 .collect(),
110 Self::Alter(requests) => requests
111 .into_iter()
112 .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
113 .collect(),
114 }
115 }
116}
117
118#[derive(Debug, IntoStaticStr)]
119pub enum RegionRequest {
120 Put(RegionPutRequest),
121 Delete(RegionDeleteRequest),
122 Create(RegionCreateRequest),
123 Drop(RegionDropRequest),
124 Open(RegionOpenRequest),
125 Close(RegionCloseRequest),
126 Alter(RegionAlterRequest),
127 Flush(RegionFlushRequest),
128 Compact(RegionCompactRequest),
129 Truncate(RegionTruncateRequest),
130 Catchup(RegionCatchupRequest),
131 BulkInserts(RegionBulkInsertsRequest),
132}
133
134impl RegionRequest {
135 pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
138 match body {
139 region_request::Body::Inserts(inserts) => make_region_puts(inserts),
140 region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
141 region_request::Body::Create(create) => make_region_create(create),
142 region_request::Body::Drop(drop) => make_region_drop(drop),
143 region_request::Body::Open(open) => make_region_open(open),
144 region_request::Body::Close(close) => make_region_close(close),
145 region_request::Body::Alter(alter) => make_region_alter(alter),
146 region_request::Body::Flush(flush) => make_region_flush(flush),
147 region_request::Body::Compact(compact) => make_region_compact(compact),
148 region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
149 region_request::Body::Creates(creates) => make_region_creates(creates),
150 region_request::Body::Drops(drops) => make_region_drops(drops),
151 region_request::Body::Alters(alters) => make_region_alters(alters),
152 region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
153 region_request::Body::Sync(_) => UnexpectedSnafu {
154 reason: "Sync request should be handled separately by RegionServer",
155 }
156 .fail(),
157 }
158 }
159
160 pub fn request_type(&self) -> &'static str {
162 self.into()
163 }
164}
165
166fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
167 let requests = inserts
168 .requests
169 .into_iter()
170 .filter_map(|r| {
171 let region_id = r.region_id.into();
172 r.rows.map(|rows| {
173 (
174 region_id,
175 RegionRequest::Put(RegionPutRequest { rows, hint: None }),
176 )
177 })
178 })
179 .collect();
180 Ok(requests)
181}
182
183fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
184 let requests = deletes
185 .requests
186 .into_iter()
187 .filter_map(|r| {
188 let region_id = r.region_id.into();
189 r.rows.map(|rows| {
190 (
191 region_id,
192 RegionRequest::Delete(RegionDeleteRequest { rows }),
193 )
194 })
195 })
196 .collect();
197 Ok(requests)
198}
199
200fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
201 let column_metadatas = create
202 .column_defs
203 .into_iter()
204 .map(ColumnMetadata::try_from_column_def)
205 .collect::<Result<Vec<_>>>()?;
206 let region_id = create.region_id.into();
207 let region_dir = region_dir(&create.path, region_id);
208 Ok((
209 region_id,
210 RegionCreateRequest {
211 engine: create.engine,
212 column_metadatas,
213 primary_key: create.primary_key,
214 options: create.options,
215 region_dir,
216 },
217 ))
218}
219
220fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
221 let (region_id, request) = parse_region_create(create)?;
222 Ok(vec![(region_id, RegionRequest::Create(request))])
223}
224
225fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
226 let mut requests = Vec::with_capacity(creates.requests.len());
227 for create in creates.requests {
228 requests.extend(make_region_create(create)?);
229 }
230 Ok(requests)
231}
232
233fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
234 let region_id = drop.region_id.into();
235 Ok((
236 region_id,
237 RegionDropRequest {
238 fast_path: drop.fast_path,
239 },
240 ))
241}
242
243fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
244 let (region_id, request) = parse_region_drop(drop)?;
245 Ok(vec![(region_id, RegionRequest::Drop(request))])
246}
247
248fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
249 let mut requests = Vec::with_capacity(drops.requests.len());
250 for drop in drops.requests {
251 requests.extend(make_region_drop(drop)?);
252 }
253 Ok(requests)
254}
255
256fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
257 let region_id = open.region_id.into();
258 let region_dir = region_dir(&open.path, region_id);
259 Ok(vec![(
260 region_id,
261 RegionRequest::Open(RegionOpenRequest {
262 engine: open.engine,
263 region_dir,
264 options: open.options,
265 skip_wal_replay: false,
266 }),
267 )])
268}
269
270fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
271 let region_id = close.region_id.into();
272 Ok(vec![(
273 region_id,
274 RegionRequest::Close(RegionCloseRequest {}),
275 )])
276}
277
278fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
279 let region_id = alter.region_id.into();
280 let request = RegionAlterRequest::try_from(alter)?;
281 Ok((region_id, request))
282}
283
284fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
285 let (region_id, request) = parse_region_alter(alter)?;
286 Ok(vec![(region_id, RegionRequest::Alter(request))])
287}
288
289fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
290 let mut requests = Vec::with_capacity(alters.requests.len());
291 for alter in alters.requests {
292 requests.extend(make_region_alter(alter)?);
293 }
294 Ok(requests)
295}
296
297fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
298 let region_id = flush.region_id.into();
299 Ok(vec![(
300 region_id,
301 RegionRequest::Flush(RegionFlushRequest {
302 row_group_size: None,
303 }),
304 )])
305}
306
307fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
308 let region_id = compact.region_id.into();
309 let options = compact
310 .options
311 .unwrap_or(compact_request::Options::Regular(Default::default()));
312 Ok(vec![(
313 region_id,
314 RegionRequest::Compact(RegionCompactRequest { options }),
315 )])
316}
317
318fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
319 let region_id = truncate.region_id.into();
320 Ok(vec![(
321 region_id,
322 RegionRequest::Truncate(RegionTruncateRequest {}),
323 )])
324}
325
326fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
328 let region_id = request.region_id.into();
329 let Some(Body::ArrowIpc(request)) = request.body else {
330 return Ok(vec![]);
331 };
332
333 let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
334 .with_label_values(&["decode"])
335 .start_timer();
336 let mut decoder =
337 FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
338 let payload = decoder
339 .try_decode_record_batch(&request.data_header, &request.payload)
340 .context(FlightCodecSnafu)?;
341 decoder_timer.observe_duration();
342 Ok(vec![(
343 region_id,
344 RegionRequest::BulkInserts(RegionBulkInsertsRequest {
345 region_id,
346 payload,
347 raw_data: request,
348 }),
349 )])
350}
351
352#[derive(Debug)]
354pub struct RegionPutRequest {
355 pub rows: Rows,
357 pub hint: Option<WriteHint>,
359}
360
361#[derive(Debug)]
362pub struct RegionReadRequest {
363 pub request: ScanRequest,
364}
365
366#[derive(Debug)]
368pub struct RegionDeleteRequest {
369 pub rows: Rows,
373}
374
375#[derive(Debug, Clone)]
376pub struct RegionCreateRequest {
377 pub engine: String,
379 pub column_metadatas: Vec<ColumnMetadata>,
381 pub primary_key: Vec<ColumnId>,
383 pub options: HashMap<String, String>,
385 pub region_dir: String,
387}
388
389impl RegionCreateRequest {
390 pub fn validate(&self) -> Result<()> {
392 ensure!(
394 self.column_metadatas
395 .iter()
396 .any(|x| x.semantic_type == SemanticType::Timestamp),
397 InvalidRegionRequestSnafu {
398 region_id: RegionId::new(0, 0),
399 err: "missing timestamp column in create region request".to_string(),
400 }
401 );
402
403 let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
405 for (i, c) in self.column_metadatas.iter().enumerate() {
406 if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
407 return InvalidRegionRequestSnafu {
408 region_id: RegionId::new(0, 0),
409 err: format!(
410 "duplicate column id {} (at position {} and {}) in create region request",
411 c.column_id, previous, i
412 ),
413 }
414 .fail();
415 }
416 }
417
418 for column_id in &self.primary_key {
420 ensure!(
421 column_id_to_indices.contains_key(column_id),
422 InvalidRegionRequestSnafu {
423 region_id: RegionId::new(0, 0),
424 err: format!(
425 "missing primary key column {} in create region request",
426 column_id
427 ),
428 }
429 );
430 }
431
432 Ok(())
433 }
434
435 pub fn is_physical_table(&self) -> bool {
437 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
438 }
439}
440
441#[derive(Debug, Clone)]
442pub struct RegionDropRequest {
443 pub fast_path: bool,
444}
445
446#[derive(Debug, Clone)]
448pub struct RegionOpenRequest {
449 pub engine: String,
451 pub region_dir: String,
453 pub options: HashMap<String, String>,
455 pub skip_wal_replay: bool,
457}
458
459impl RegionOpenRequest {
460 pub fn is_physical_table(&self) -> bool {
462 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
463 }
464}
465
466#[derive(Debug)]
468pub struct RegionCloseRequest {}
469
470#[derive(Debug, PartialEq, Eq, Clone)]
472pub struct RegionAlterRequest {
473 pub kind: AlterKind,
475}
476
477impl RegionAlterRequest {
478 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
480 self.kind.validate(metadata)?;
481
482 Ok(())
483 }
484
485 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
489 debug_assert!(self.validate(metadata).is_ok());
490 self.kind.need_alter(metadata)
491 }
492}
493
494impl TryFrom<AlterRequest> for RegionAlterRequest {
495 type Error = MetadataError;
496
497 fn try_from(value: AlterRequest) -> Result<Self> {
498 let kind = value.kind.context(InvalidRawRegionRequestSnafu {
499 err: "missing kind in AlterRequest",
500 })?;
501
502 let kind = AlterKind::try_from(kind)?;
503 Ok(RegionAlterRequest { kind })
504 }
505}
506
507#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
509pub enum AlterKind {
510 AddColumns {
512 columns: Vec<AddColumn>,
514 },
515 DropColumns {
517 names: Vec<String>,
519 },
520 ModifyColumnTypes {
522 columns: Vec<ModifyColumnType>,
524 },
525 SetRegionOptions { options: Vec<SetRegionOption> },
527 UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
529 SetIndex { options: ApiSetIndexOptions },
531 UnsetIndex { options: ApiUnsetIndexOptions },
533}
534
535#[derive(Debug, PartialEq, Eq, Clone)]
536pub enum ApiSetIndexOptions {
537 Fulltext {
538 column_name: String,
539 options: FulltextOptions,
540 },
541 Inverted {
542 column_name: String,
543 },
544 Skipping {
545 column_name: String,
546 options: SkippingIndexOptions,
547 },
548}
549
550impl ApiSetIndexOptions {
551 pub fn column_name(&self) -> &String {
552 match self {
553 ApiSetIndexOptions::Fulltext { column_name, .. } => column_name,
554 ApiSetIndexOptions::Inverted { column_name } => column_name,
555 ApiSetIndexOptions::Skipping { column_name, .. } => column_name,
556 }
557 }
558
559 pub fn is_fulltext(&self) -> bool {
560 match self {
561 ApiSetIndexOptions::Fulltext { .. } => true,
562 ApiSetIndexOptions::Inverted { .. } => false,
563 ApiSetIndexOptions::Skipping { .. } => false,
564 }
565 }
566}
567
568#[derive(Debug, PartialEq, Eq, Clone)]
569pub enum ApiUnsetIndexOptions {
570 Fulltext { column_name: String },
571 Inverted { column_name: String },
572 Skipping { column_name: String },
573}
574
575impl ApiUnsetIndexOptions {
576 pub fn column_name(&self) -> &String {
577 match self {
578 ApiUnsetIndexOptions::Fulltext { column_name } => column_name,
579 ApiUnsetIndexOptions::Inverted { column_name } => column_name,
580 ApiUnsetIndexOptions::Skipping { column_name } => column_name,
581 }
582 }
583
584 pub fn is_fulltext(&self) -> bool {
585 match self {
586 ApiUnsetIndexOptions::Fulltext { .. } => true,
587 ApiUnsetIndexOptions::Inverted { .. } => false,
588 ApiUnsetIndexOptions::Skipping { .. } => false,
589 }
590 }
591}
592
593impl AlterKind {
594 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
598 match self {
599 AlterKind::AddColumns { columns } => {
600 for col_to_add in columns {
601 col_to_add.validate(metadata)?;
602 }
603 }
604 AlterKind::DropColumns { names } => {
605 for name in names {
606 Self::validate_column_to_drop(name, metadata)?;
607 }
608 }
609 AlterKind::ModifyColumnTypes { columns } => {
610 for col_to_change in columns {
611 col_to_change.validate(metadata)?;
612 }
613 }
614 AlterKind::SetRegionOptions { .. } => {}
615 AlterKind::UnsetRegionOptions { .. } => {}
616 AlterKind::SetIndex { options } => {
617 Self::validate_column_alter_index_option(
618 options.column_name(),
619 metadata,
620 options.is_fulltext(),
621 )?;
622 }
623 AlterKind::UnsetIndex { options } => {
624 Self::validate_column_alter_index_option(
625 options.column_name(),
626 metadata,
627 options.is_fulltext(),
628 )?;
629 }
630 }
631 Ok(())
632 }
633
634 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
636 debug_assert!(self.validate(metadata).is_ok());
637 match self {
638 AlterKind::AddColumns { columns } => columns
639 .iter()
640 .any(|col_to_add| col_to_add.need_alter(metadata)),
641 AlterKind::DropColumns { names } => names
642 .iter()
643 .any(|name| metadata.column_by_name(name).is_some()),
644 AlterKind::ModifyColumnTypes { columns } => columns
645 .iter()
646 .any(|col_to_change| col_to_change.need_alter(metadata)),
647 AlterKind::SetRegionOptions { .. } => {
648 true
651 }
652 AlterKind::UnsetRegionOptions { .. } => true,
653 AlterKind::SetIndex { options, .. } => {
654 metadata.column_by_name(options.column_name()).is_some()
655 }
656 AlterKind::UnsetIndex { options } => {
657 metadata.column_by_name(options.column_name()).is_some()
658 }
659 }
660 }
661
662 fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
664 let Some(column) = metadata.column_by_name(name) else {
665 return Ok(());
666 };
667 ensure!(
668 column.semantic_type == SemanticType::Field,
669 InvalidRegionRequestSnafu {
670 region_id: metadata.region_id,
671 err: format!("column {} is not a field and could not be dropped", name),
672 }
673 );
674 Ok(())
675 }
676
677 fn validate_column_alter_index_option(
679 column_name: &String,
680 metadata: &RegionMetadata,
681 is_fulltext: bool,
682 ) -> Result<()> {
683 let column = metadata
684 .column_by_name(column_name)
685 .context(InvalidRegionRequestSnafu {
686 region_id: metadata.region_id,
687 err: format!("column {} not found", column_name),
688 })?;
689
690 if is_fulltext {
691 ensure!(
692 column.column_schema.data_type.is_string(),
693 InvalidRegionRequestSnafu {
694 region_id: metadata.region_id,
695 err: format!(
696 "cannot change alter index options for non-string column {}",
697 column_name
698 ),
699 }
700 );
701 }
702
703 Ok(())
704 }
705}
706
707impl TryFrom<alter_request::Kind> for AlterKind {
708 type Error = MetadataError;
709
710 fn try_from(kind: alter_request::Kind) -> Result<Self> {
711 let alter_kind = match kind {
712 alter_request::Kind::AddColumns(x) => {
713 let columns = x
714 .add_columns
715 .into_iter()
716 .map(|x| x.try_into())
717 .collect::<Result<Vec<_>>>()?;
718 AlterKind::AddColumns { columns }
719 }
720 alter_request::Kind::ModifyColumnTypes(x) => {
721 let columns = x
722 .modify_column_types
723 .into_iter()
724 .map(|x| x.into())
725 .collect::<Vec<_>>();
726 AlterKind::ModifyColumnTypes { columns }
727 }
728 alter_request::Kind::DropColumns(x) => {
729 let names = x.drop_columns.into_iter().map(|x| x.name).collect();
730 AlterKind::DropColumns { names }
731 }
732 alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
733 options: options
734 .table_options
735 .iter()
736 .map(TryFrom::try_from)
737 .collect::<Result<Vec<_>>>()?,
738 },
739 alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
740 keys: options
741 .keys
742 .iter()
743 .map(|key| UnsetRegionOption::try_from(key.as_str()))
744 .collect::<Result<Vec<_>>>()?,
745 },
746 alter_request::Kind::SetIndex(o) => match o.options.unwrap() {
747 set_index::Options::Fulltext(x) => AlterKind::SetIndex {
748 options: ApiSetIndexOptions::Fulltext {
749 column_name: x.column_name.clone(),
750 options: FulltextOptions {
751 enable: x.enable,
752 analyzer: as_fulltext_option_analyzer(
753 Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
754 ),
755 case_sensitive: x.case_sensitive,
756 backend: as_fulltext_option_backend(
757 PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
758 ),
759 },
760 },
761 },
762 set_index::Options::Inverted(i) => AlterKind::SetIndex {
763 options: ApiSetIndexOptions::Inverted {
764 column_name: i.column_name,
765 },
766 },
767 set_index::Options::Skipping(s) => AlterKind::SetIndex {
768 options: ApiSetIndexOptions::Skipping {
769 column_name: s.column_name,
770 options: SkippingIndexOptions {
771 index_type: as_skipping_index_type(
772 PbSkippingIndexType::try_from(s.skipping_index_type)
773 .context(DecodeProtoSnafu)?,
774 ),
775 granularity: s.granularity as u32,
776 },
777 },
778 },
779 },
780 alter_request::Kind::UnsetIndex(o) => match o.options.unwrap() {
781 v1::unset_index::Options::Fulltext(f) => AlterKind::UnsetIndex {
782 options: ApiUnsetIndexOptions::Fulltext {
783 column_name: f.column_name,
784 },
785 },
786 v1::unset_index::Options::Inverted(i) => AlterKind::UnsetIndex {
787 options: ApiUnsetIndexOptions::Inverted {
788 column_name: i.column_name,
789 },
790 },
791 v1::unset_index::Options::Skipping(s) => AlterKind::UnsetIndex {
792 options: ApiUnsetIndexOptions::Skipping {
793 column_name: s.column_name,
794 },
795 },
796 },
797 };
798
799 Ok(alter_kind)
800 }
801}
802
803#[derive(Debug, PartialEq, Eq, Clone)]
805pub struct AddColumn {
806 pub column_metadata: ColumnMetadata,
808 pub location: Option<AddColumnLocation>,
811}
812
813impl AddColumn {
814 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
819 ensure!(
820 self.column_metadata.column_schema.is_nullable()
821 || self
822 .column_metadata
823 .column_schema
824 .default_constraint()
825 .is_some(),
826 InvalidRegionRequestSnafu {
827 region_id: metadata.region_id,
828 err: format!(
829 "no default value for column {}",
830 self.column_metadata.column_schema.name
831 ),
832 }
833 );
834
835 if let Some(existing_column) =
836 metadata.column_by_name(&self.column_metadata.column_schema.name)
837 {
838 ensure!(
840 *existing_column == self.column_metadata,
841 InvalidRegionRequestSnafu {
842 region_id: metadata.region_id,
843 err: format!(
844 "column {} already exists with different metadata, existing: {:?}, got: {:?}",
845 self.column_metadata.column_schema.name, existing_column, self.column_metadata,
846 ),
847 }
848 );
849 ensure!(
850 self.location.is_none(),
851 InvalidRegionRequestSnafu {
852 region_id: metadata.region_id,
853 err: format!(
854 "column {} already exists, but location is specified",
855 self.column_metadata.column_schema.name
856 ),
857 }
858 );
859 }
860
861 if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
862 ensure!(
864 existing_column.column_schema.name == self.column_metadata.column_schema.name,
865 InvalidRegionRequestSnafu {
866 region_id: metadata.region_id,
867 err: format!(
868 "column id {} already exists with different name {}",
869 self.column_metadata.column_id, existing_column.column_schema.name
870 ),
871 }
872 );
873 }
874
875 Ok(())
876 }
877
878 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
880 debug_assert!(self.validate(metadata).is_ok());
881 metadata
882 .column_by_name(&self.column_metadata.column_schema.name)
883 .is_none()
884 }
885}
886
887impl TryFrom<v1::region::AddColumn> for AddColumn {
888 type Error = MetadataError;
889
890 fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
891 let column_def = add_column
892 .column_def
893 .context(InvalidRawRegionRequestSnafu {
894 err: "missing column_def in AddColumn",
895 })?;
896
897 let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
898 let location = add_column
899 .location
900 .map(AddColumnLocation::try_from)
901 .transpose()?;
902
903 Ok(AddColumn {
904 column_metadata,
905 location,
906 })
907 }
908}
909
910#[derive(Debug, PartialEq, Eq, Clone)]
912pub enum AddColumnLocation {
913 First,
915 After {
917 column_name: String,
919 },
920}
921
922impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
923 type Error = MetadataError;
924
925 fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
926 let location_type = LocationType::try_from(location.location_type)
927 .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
928 let add_column_location = match location_type {
929 LocationType::First => AddColumnLocation::First,
930 LocationType::After => AddColumnLocation::After {
931 column_name: location.after_column_name,
932 },
933 };
934
935 Ok(add_column_location)
936 }
937}
938
939#[derive(Debug, PartialEq, Eq, Clone)]
941pub struct ModifyColumnType {
942 pub column_name: String,
944 pub target_type: ConcreteDataType,
946}
947
948impl ModifyColumnType {
949 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
951 let column_meta = metadata
952 .column_by_name(&self.column_name)
953 .with_context(|| InvalidRegionRequestSnafu {
954 region_id: metadata.region_id,
955 err: format!("column {} not found", self.column_name),
956 })?;
957
958 ensure!(
959 matches!(column_meta.semantic_type, SemanticType::Field),
960 InvalidRegionRequestSnafu {
961 region_id: metadata.region_id,
962 err: "'timestamp' or 'tag' column cannot change type".to_string()
963 }
964 );
965 ensure!(
966 column_meta
967 .column_schema
968 .data_type
969 .can_arrow_type_cast_to(&self.target_type),
970 InvalidRegionRequestSnafu {
971 region_id: metadata.region_id,
972 err: format!(
973 "column '{}' cannot be cast automatically to type '{}'",
974 self.column_name, self.target_type
975 ),
976 }
977 );
978
979 Ok(())
980 }
981
982 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
984 debug_assert!(self.validate(metadata).is_ok());
985 metadata.column_by_name(&self.column_name).is_some()
986 }
987}
988
989impl From<v1::ModifyColumnType> for ModifyColumnType {
990 fn from(modify_column_type: v1::ModifyColumnType) -> Self {
991 let target_type = ColumnDataTypeWrapper::new(
992 modify_column_type.target_type(),
993 modify_column_type.target_type_extension,
994 )
995 .into();
996
997 ModifyColumnType {
998 column_name: modify_column_type.column_name,
999 target_type,
1000 }
1001 }
1002}
1003
1004#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1005pub enum SetRegionOption {
1006 Ttl(Option<TimeToLive>),
1007 Twsc(String, String),
1009}
1010
1011impl TryFrom<&PbOption> for SetRegionOption {
1012 type Error = MetadataError;
1013
1014 fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1015 let PbOption { key, value } = value;
1016 match key.as_str() {
1017 TTL_KEY => {
1018 let ttl = TimeToLive::from_humantime_or_str(value)
1019 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1020
1021 Ok(Self::Ttl(Some(ttl)))
1022 }
1023 TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1024 Ok(Self::Twsc(key.to_string(), value.to_string()))
1025 }
1026 _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1027 }
1028 }
1029}
1030
1031impl From<&UnsetRegionOption> for SetRegionOption {
1032 fn from(unset_option: &UnsetRegionOption) -> Self {
1033 match unset_option {
1034 UnsetRegionOption::TwcsTriggerFileNum => {
1035 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1036 }
1037 UnsetRegionOption::TwcsMaxOutputFileSize => {
1038 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1039 }
1040 UnsetRegionOption::TwcsTimeWindow => {
1041 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1042 }
1043 UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1044 }
1045 }
1046}
1047
1048impl TryFrom<&str> for UnsetRegionOption {
1049 type Error = MetadataError;
1050
1051 fn try_from(key: &str) -> Result<Self> {
1052 match key.to_ascii_lowercase().as_str() {
1053 TTL_KEY => Ok(Self::Ttl),
1054 TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1055 TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1056 TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1057 _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1058 }
1059 }
1060}
1061
1062#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1063pub enum UnsetRegionOption {
1064 TwcsTriggerFileNum,
1065 TwcsMaxOutputFileSize,
1066 TwcsTimeWindow,
1067 Ttl,
1068}
1069
1070impl UnsetRegionOption {
1071 pub fn as_str(&self) -> &str {
1072 match self {
1073 Self::Ttl => TTL_KEY,
1074 Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1075 Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1076 Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1077 }
1078 }
1079}
1080
1081impl Display for UnsetRegionOption {
1082 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1083 write!(f, "{}", self.as_str())
1084 }
1085}
1086
1087#[derive(Debug, Clone, Default)]
1088pub struct RegionFlushRequest {
1089 pub row_group_size: Option<usize>,
1090}
1091
1092#[derive(Debug)]
1093pub struct RegionCompactRequest {
1094 pub options: compact_request::Options,
1095}
1096
1097impl Default for RegionCompactRequest {
1098 fn default() -> Self {
1099 Self {
1100 options: compact_request::Options::Regular(Default::default()),
1102 }
1103 }
1104}
1105
1106#[derive(Debug)]
1108pub struct RegionTruncateRequest {}
1109
1110#[derive(Debug, Clone, Copy)]
1115pub struct RegionCatchupRequest {
1116 pub set_writable: bool,
1118 pub entry_id: Option<entry::Id>,
1121 pub metadata_entry_id: Option<entry::Id>,
1125 pub location_id: Option<u64>,
1127}
1128
1129#[derive(Debug, Clone)]
1131pub struct RegionSequencesRequest {
1132 pub region_ids: Vec<RegionId>,
1133}
1134
1135#[derive(Debug, Clone)]
1136pub struct RegionBulkInsertsRequest {
1137 pub region_id: RegionId,
1138 pub payload: DfRecordBatch,
1139 pub raw_data: ArrowIpc,
1140}
1141
1142impl RegionBulkInsertsRequest {
1143 pub fn estimated_size(&self) -> usize {
1144 self.payload.get_array_memory_size()
1145 }
1146}
1147
1148impl fmt::Display for RegionRequest {
1149 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1150 match self {
1151 RegionRequest::Put(_) => write!(f, "Put"),
1152 RegionRequest::Delete(_) => write!(f, "Delete"),
1153 RegionRequest::Create(_) => write!(f, "Create"),
1154 RegionRequest::Drop(_) => write!(f, "Drop"),
1155 RegionRequest::Open(_) => write!(f, "Open"),
1156 RegionRequest::Close(_) => write!(f, "Close"),
1157 RegionRequest::Alter(_) => write!(f, "Alter"),
1158 RegionRequest::Flush(_) => write!(f, "Flush"),
1159 RegionRequest::Compact(_) => write!(f, "Compact"),
1160 RegionRequest::Truncate(_) => write!(f, "Truncate"),
1161 RegionRequest::Catchup(_) => write!(f, "Catchup"),
1162 RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1163 }
1164 }
1165}
1166
1167#[cfg(test)]
1168mod tests {
1169 use api::v1::region::RegionColumnDef;
1170 use api::v1::{ColumnDataType, ColumnDef};
1171 use datatypes::prelude::ConcreteDataType;
1172 use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1173
1174 use super::*;
1175 use crate::metadata::RegionMetadataBuilder;
1176
1177 #[test]
1178 fn test_from_proto_location() {
1179 let proto_location = v1::AddColumnLocation {
1180 location_type: LocationType::First as i32,
1181 after_column_name: String::default(),
1182 };
1183 let location = AddColumnLocation::try_from(proto_location).unwrap();
1184 assert_eq!(location, AddColumnLocation::First);
1185
1186 let proto_location = v1::AddColumnLocation {
1187 location_type: 10,
1188 after_column_name: String::default(),
1189 };
1190 AddColumnLocation::try_from(proto_location).unwrap_err();
1191
1192 let proto_location = v1::AddColumnLocation {
1193 location_type: LocationType::After as i32,
1194 after_column_name: "a".to_string(),
1195 };
1196 let location = AddColumnLocation::try_from(proto_location).unwrap();
1197 assert_eq!(
1198 location,
1199 AddColumnLocation::After {
1200 column_name: "a".to_string()
1201 }
1202 );
1203 }
1204
1205 #[test]
1206 fn test_from_none_proto_add_column() {
1207 AddColumn::try_from(v1::region::AddColumn {
1208 column_def: None,
1209 location: None,
1210 })
1211 .unwrap_err();
1212 }
1213
1214 #[test]
1215 fn test_from_proto_alter_request() {
1216 RegionAlterRequest::try_from(AlterRequest {
1217 region_id: 0,
1218 schema_version: 1,
1219 kind: None,
1220 })
1221 .unwrap_err();
1222
1223 let request = RegionAlterRequest::try_from(AlterRequest {
1224 region_id: 0,
1225 schema_version: 1,
1226 kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1227 add_columns: vec![v1::region::AddColumn {
1228 column_def: Some(RegionColumnDef {
1229 column_def: Some(ColumnDef {
1230 name: "a".to_string(),
1231 data_type: ColumnDataType::String as i32,
1232 is_nullable: true,
1233 default_constraint: vec![],
1234 semantic_type: SemanticType::Field as i32,
1235 comment: String::new(),
1236 ..Default::default()
1237 }),
1238 column_id: 1,
1239 }),
1240 location: Some(v1::AddColumnLocation {
1241 location_type: LocationType::First as i32,
1242 after_column_name: String::default(),
1243 }),
1244 }],
1245 })),
1246 })
1247 .unwrap();
1248
1249 assert_eq!(
1250 request,
1251 RegionAlterRequest {
1252 kind: AlterKind::AddColumns {
1253 columns: vec![AddColumn {
1254 column_metadata: ColumnMetadata {
1255 column_schema: ColumnSchema::new(
1256 "a",
1257 ConcreteDataType::string_datatype(),
1258 true,
1259 ),
1260 semantic_type: SemanticType::Field,
1261 column_id: 1,
1262 },
1263 location: Some(AddColumnLocation::First),
1264 }]
1265 },
1266 }
1267 );
1268 }
1269
1270 fn new_metadata() -> RegionMetadata {
1273 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1274 builder
1275 .push_column_metadata(ColumnMetadata {
1276 column_schema: ColumnSchema::new(
1277 "ts",
1278 ConcreteDataType::timestamp_millisecond_datatype(),
1279 false,
1280 ),
1281 semantic_type: SemanticType::Timestamp,
1282 column_id: 1,
1283 })
1284 .push_column_metadata(ColumnMetadata {
1285 column_schema: ColumnSchema::new(
1286 "tag_0",
1287 ConcreteDataType::string_datatype(),
1288 true,
1289 ),
1290 semantic_type: SemanticType::Tag,
1291 column_id: 2,
1292 })
1293 .push_column_metadata(ColumnMetadata {
1294 column_schema: ColumnSchema::new(
1295 "field_0",
1296 ConcreteDataType::string_datatype(),
1297 true,
1298 ),
1299 semantic_type: SemanticType::Field,
1300 column_id: 3,
1301 })
1302 .push_column_metadata(ColumnMetadata {
1303 column_schema: ColumnSchema::new(
1304 "field_1",
1305 ConcreteDataType::boolean_datatype(),
1306 true,
1307 ),
1308 semantic_type: SemanticType::Field,
1309 column_id: 4,
1310 })
1311 .primary_key(vec![2]);
1312 builder.build().unwrap()
1313 }
1314
1315 #[test]
1316 fn test_add_column_validate() {
1317 let metadata = new_metadata();
1318 let add_column = AddColumn {
1319 column_metadata: ColumnMetadata {
1320 column_schema: ColumnSchema::new(
1321 "tag_1",
1322 ConcreteDataType::string_datatype(),
1323 true,
1324 ),
1325 semantic_type: SemanticType::Tag,
1326 column_id: 5,
1327 },
1328 location: None,
1329 };
1330 add_column.validate(&metadata).unwrap();
1331 assert!(add_column.need_alter(&metadata));
1332
1333 AddColumn {
1335 column_metadata: ColumnMetadata {
1336 column_schema: ColumnSchema::new(
1337 "tag_1",
1338 ConcreteDataType::string_datatype(),
1339 false,
1340 ),
1341 semantic_type: SemanticType::Tag,
1342 column_id: 5,
1343 },
1344 location: None,
1345 }
1346 .validate(&metadata)
1347 .unwrap_err();
1348
1349 let add_column = AddColumn {
1351 column_metadata: ColumnMetadata {
1352 column_schema: ColumnSchema::new(
1353 "tag_0",
1354 ConcreteDataType::string_datatype(),
1355 true,
1356 ),
1357 semantic_type: SemanticType::Tag,
1358 column_id: 2,
1359 },
1360 location: None,
1361 };
1362 add_column.validate(&metadata).unwrap();
1363 assert!(!add_column.need_alter(&metadata));
1364 }
1365
1366 #[test]
1367 fn test_add_duplicate_columns() {
1368 let kind = AlterKind::AddColumns {
1369 columns: vec![
1370 AddColumn {
1371 column_metadata: ColumnMetadata {
1372 column_schema: ColumnSchema::new(
1373 "tag_1",
1374 ConcreteDataType::string_datatype(),
1375 true,
1376 ),
1377 semantic_type: SemanticType::Tag,
1378 column_id: 5,
1379 },
1380 location: None,
1381 },
1382 AddColumn {
1383 column_metadata: ColumnMetadata {
1384 column_schema: ColumnSchema::new(
1385 "tag_1",
1386 ConcreteDataType::string_datatype(),
1387 true,
1388 ),
1389 semantic_type: SemanticType::Field,
1390 column_id: 6,
1391 },
1392 location: None,
1393 },
1394 ],
1395 };
1396 let metadata = new_metadata();
1397 kind.validate(&metadata).unwrap();
1398 assert!(kind.need_alter(&metadata));
1399 }
1400
1401 #[test]
1402 fn test_add_existing_column_different_metadata() {
1403 let metadata = new_metadata();
1404
1405 let kind = AlterKind::AddColumns {
1407 columns: vec![AddColumn {
1408 column_metadata: ColumnMetadata {
1409 column_schema: ColumnSchema::new(
1410 "tag_0",
1411 ConcreteDataType::string_datatype(),
1412 true,
1413 ),
1414 semantic_type: SemanticType::Tag,
1415 column_id: 4,
1416 },
1417 location: None,
1418 }],
1419 };
1420 kind.validate(&metadata).unwrap_err();
1421
1422 let kind = AlterKind::AddColumns {
1424 columns: vec![AddColumn {
1425 column_metadata: ColumnMetadata {
1426 column_schema: ColumnSchema::new(
1427 "tag_0",
1428 ConcreteDataType::int64_datatype(),
1429 true,
1430 ),
1431 semantic_type: SemanticType::Tag,
1432 column_id: 2,
1433 },
1434 location: None,
1435 }],
1436 };
1437 kind.validate(&metadata).unwrap_err();
1438
1439 let kind = AlterKind::AddColumns {
1441 columns: vec![AddColumn {
1442 column_metadata: ColumnMetadata {
1443 column_schema: ColumnSchema::new(
1444 "tag_1",
1445 ConcreteDataType::string_datatype(),
1446 true,
1447 ),
1448 semantic_type: SemanticType::Tag,
1449 column_id: 2,
1450 },
1451 location: None,
1452 }],
1453 };
1454 kind.validate(&metadata).unwrap_err();
1455 }
1456
1457 #[test]
1458 fn test_add_existing_column_with_location() {
1459 let metadata = new_metadata();
1460 let kind = AlterKind::AddColumns {
1461 columns: vec![AddColumn {
1462 column_metadata: ColumnMetadata {
1463 column_schema: ColumnSchema::new(
1464 "tag_0",
1465 ConcreteDataType::string_datatype(),
1466 true,
1467 ),
1468 semantic_type: SemanticType::Tag,
1469 column_id: 2,
1470 },
1471 location: Some(AddColumnLocation::First),
1472 }],
1473 };
1474 kind.validate(&metadata).unwrap_err();
1475 }
1476
1477 #[test]
1478 fn test_validate_drop_column() {
1479 let metadata = new_metadata();
1480 let kind = AlterKind::DropColumns {
1481 names: vec!["xxxx".to_string()],
1482 };
1483 kind.validate(&metadata).unwrap();
1484 assert!(!kind.need_alter(&metadata));
1485
1486 AlterKind::DropColumns {
1487 names: vec!["tag_0".to_string()],
1488 }
1489 .validate(&metadata)
1490 .unwrap_err();
1491
1492 let kind = AlterKind::DropColumns {
1493 names: vec!["field_0".to_string()],
1494 };
1495 kind.validate(&metadata).unwrap();
1496 assert!(kind.need_alter(&metadata));
1497 }
1498
1499 #[test]
1500 fn test_validate_modify_column_type() {
1501 let metadata = new_metadata();
1502 AlterKind::ModifyColumnTypes {
1503 columns: vec![ModifyColumnType {
1504 column_name: "xxxx".to_string(),
1505 target_type: ConcreteDataType::string_datatype(),
1506 }],
1507 }
1508 .validate(&metadata)
1509 .unwrap_err();
1510
1511 AlterKind::ModifyColumnTypes {
1512 columns: vec![ModifyColumnType {
1513 column_name: "field_1".to_string(),
1514 target_type: ConcreteDataType::date_datatype(),
1515 }],
1516 }
1517 .validate(&metadata)
1518 .unwrap_err();
1519
1520 AlterKind::ModifyColumnTypes {
1521 columns: vec![ModifyColumnType {
1522 column_name: "ts".to_string(),
1523 target_type: ConcreteDataType::date_datatype(),
1524 }],
1525 }
1526 .validate(&metadata)
1527 .unwrap_err();
1528
1529 AlterKind::ModifyColumnTypes {
1530 columns: vec![ModifyColumnType {
1531 column_name: "tag_0".to_string(),
1532 target_type: ConcreteDataType::date_datatype(),
1533 }],
1534 }
1535 .validate(&metadata)
1536 .unwrap_err();
1537
1538 let kind = AlterKind::ModifyColumnTypes {
1539 columns: vec![ModifyColumnType {
1540 column_name: "field_0".to_string(),
1541 target_type: ConcreteDataType::int32_datatype(),
1542 }],
1543 };
1544 kind.validate(&metadata).unwrap();
1545 assert!(kind.need_alter(&metadata));
1546 }
1547
1548 #[test]
1549 fn test_validate_add_columns() {
1550 let kind = AlterKind::AddColumns {
1551 columns: vec![
1552 AddColumn {
1553 column_metadata: ColumnMetadata {
1554 column_schema: ColumnSchema::new(
1555 "tag_1",
1556 ConcreteDataType::string_datatype(),
1557 true,
1558 ),
1559 semantic_type: SemanticType::Tag,
1560 column_id: 5,
1561 },
1562 location: None,
1563 },
1564 AddColumn {
1565 column_metadata: ColumnMetadata {
1566 column_schema: ColumnSchema::new(
1567 "field_2",
1568 ConcreteDataType::string_datatype(),
1569 true,
1570 ),
1571 semantic_type: SemanticType::Field,
1572 column_id: 6,
1573 },
1574 location: None,
1575 },
1576 ],
1577 };
1578 let request = RegionAlterRequest { kind };
1579 let mut metadata = new_metadata();
1580 metadata.schema_version = 1;
1581 request.validate(&metadata).unwrap();
1582 }
1583
1584 #[test]
1585 fn test_validate_create_region() {
1586 let column_metadatas = vec![
1587 ColumnMetadata {
1588 column_schema: ColumnSchema::new(
1589 "ts",
1590 ConcreteDataType::timestamp_millisecond_datatype(),
1591 false,
1592 ),
1593 semantic_type: SemanticType::Timestamp,
1594 column_id: 1,
1595 },
1596 ColumnMetadata {
1597 column_schema: ColumnSchema::new(
1598 "tag_0",
1599 ConcreteDataType::string_datatype(),
1600 true,
1601 ),
1602 semantic_type: SemanticType::Tag,
1603 column_id: 2,
1604 },
1605 ColumnMetadata {
1606 column_schema: ColumnSchema::new(
1607 "field_0",
1608 ConcreteDataType::string_datatype(),
1609 true,
1610 ),
1611 semantic_type: SemanticType::Field,
1612 column_id: 3,
1613 },
1614 ];
1615 let create = RegionCreateRequest {
1616 engine: "mito".to_string(),
1617 column_metadatas,
1618 primary_key: vec![3, 4],
1619 options: HashMap::new(),
1620 region_dir: "path".to_string(),
1621 };
1622
1623 assert!(create.validate().is_err());
1624 }
1625
1626 #[test]
1627 fn test_validate_modify_column_fulltext_options() {
1628 let kind = AlterKind::SetIndex {
1629 options: ApiSetIndexOptions::Fulltext {
1630 column_name: "tag_0".to_string(),
1631 options: FulltextOptions {
1632 enable: true,
1633 analyzer: FulltextAnalyzer::Chinese,
1634 case_sensitive: false,
1635 backend: FulltextBackend::Bloom,
1636 },
1637 },
1638 };
1639 let request = RegionAlterRequest { kind };
1640 let mut metadata = new_metadata();
1641 metadata.schema_version = 1;
1642 request.validate(&metadata).unwrap();
1643
1644 let kind = AlterKind::UnsetIndex {
1645 options: ApiUnsetIndexOptions::Fulltext {
1646 column_name: "tag_0".to_string(),
1647 },
1648 };
1649 let request = RegionAlterRequest { kind };
1650 let mut metadata = new_metadata();
1651 metadata.schema_version = 1;
1652 request.validate(&metadata).unwrap();
1653 }
1654}