1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::ColumnDataTypeWrapper;
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25 alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequest,
26 CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest,
27 DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
28};
29use api::v1::{
30 self, set_index, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption,
31 Rows, SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::FlightDecoder;
35use common_recordbatch::DfRecordBatch;
36use common_time::TimeToLive;
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
39use serde::{Deserialize, Serialize};
40use snafu::{ensure, OptionExt, ResultExt};
41use strum::{AsRefStr, IntoStaticStr};
42
43use crate::logstore::entry;
44use crate::metadata::{
45 ColumnMetadata, DecodeProtoSnafu, FlightCodecSnafu, InvalidIndexOptionSnafu,
46 InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
47 InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu,
48};
49use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
50use crate::metrics;
51use crate::mito_engine_options::{
52 TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
53};
54use crate::path_utils::region_dir;
55use crate::storage::{ColumnId, RegionId, ScanRequest};
56
57#[derive(Debug, IntoStaticStr)]
58pub enum BatchRegionDdlRequest {
59 Create(Vec<(RegionId, RegionCreateRequest)>),
60 Drop(Vec<(RegionId, RegionDropRequest)>),
61 Alter(Vec<(RegionId, RegionAlterRequest)>),
62}
63
64impl BatchRegionDdlRequest {
65 pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
67 match body {
68 region_request::Body::Creates(creates) => {
69 let requests = creates
70 .requests
71 .into_iter()
72 .map(parse_region_create)
73 .collect::<Result<Vec<_>>>()?;
74 Ok(Some(Self::Create(requests)))
75 }
76 region_request::Body::Drops(drops) => {
77 let requests = drops
78 .requests
79 .into_iter()
80 .map(parse_region_drop)
81 .collect::<Result<Vec<_>>>()?;
82 Ok(Some(Self::Drop(requests)))
83 }
84 region_request::Body::Alters(alters) => {
85 let requests = alters
86 .requests
87 .into_iter()
88 .map(parse_region_alter)
89 .collect::<Result<Vec<_>>>()?;
90 Ok(Some(Self::Alter(requests)))
91 }
92 _ => Ok(None),
93 }
94 }
95
96 pub fn request_type(&self) -> &'static str {
97 self.into()
98 }
99
100 pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
101 match self {
102 Self::Create(requests) => requests
103 .into_iter()
104 .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
105 .collect(),
106 Self::Drop(requests) => requests
107 .into_iter()
108 .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
109 .collect(),
110 Self::Alter(requests) => requests
111 .into_iter()
112 .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
113 .collect(),
114 }
115 }
116}
117
118#[derive(Debug, IntoStaticStr)]
119pub enum RegionRequest {
120 Put(RegionPutRequest),
121 Delete(RegionDeleteRequest),
122 Create(RegionCreateRequest),
123 Drop(RegionDropRequest),
124 Open(RegionOpenRequest),
125 Close(RegionCloseRequest),
126 Alter(RegionAlterRequest),
127 Flush(RegionFlushRequest),
128 Compact(RegionCompactRequest),
129 Truncate(RegionTruncateRequest),
130 Catchup(RegionCatchupRequest),
131 BulkInserts(RegionBulkInsertsRequest),
132}
133
134impl RegionRequest {
135 pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
138 match body {
139 region_request::Body::Inserts(inserts) => make_region_puts(inserts),
140 region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
141 region_request::Body::Create(create) => make_region_create(create),
142 region_request::Body::Drop(drop) => make_region_drop(drop),
143 region_request::Body::Open(open) => make_region_open(open),
144 region_request::Body::Close(close) => make_region_close(close),
145 region_request::Body::Alter(alter) => make_region_alter(alter),
146 region_request::Body::Flush(flush) => make_region_flush(flush),
147 region_request::Body::Compact(compact) => make_region_compact(compact),
148 region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
149 region_request::Body::Creates(creates) => make_region_creates(creates),
150 region_request::Body::Drops(drops) => make_region_drops(drops),
151 region_request::Body::Alters(alters) => make_region_alters(alters),
152 region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
153 region_request::Body::Sync(_) => UnexpectedSnafu {
154 reason: "Sync request should be handled separately by RegionServer",
155 }
156 .fail(),
157 region_request::Body::ListMetadata(_) => UnexpectedSnafu {
158 reason: "ListMetadata request should be handled separately by RegionServer",
159 }
160 .fail(),
161 }
162 }
163
164 pub fn request_type(&self) -> &'static str {
166 self.into()
167 }
168}
169
170fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
171 let requests = inserts
172 .requests
173 .into_iter()
174 .filter_map(|r| {
175 let region_id = r.region_id.into();
176 r.rows.map(|rows| {
177 (
178 region_id,
179 RegionRequest::Put(RegionPutRequest { rows, hint: None }),
180 )
181 })
182 })
183 .collect();
184 Ok(requests)
185}
186
187fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
188 let requests = deletes
189 .requests
190 .into_iter()
191 .filter_map(|r| {
192 let region_id = r.region_id.into();
193 r.rows.map(|rows| {
194 (
195 region_id,
196 RegionRequest::Delete(RegionDeleteRequest { rows }),
197 )
198 })
199 })
200 .collect();
201 Ok(requests)
202}
203
204fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
205 let column_metadatas = create
206 .column_defs
207 .into_iter()
208 .map(ColumnMetadata::try_from_column_def)
209 .collect::<Result<Vec<_>>>()?;
210 let region_id = create.region_id.into();
211 let region_dir = region_dir(&create.path, region_id);
212 Ok((
213 region_id,
214 RegionCreateRequest {
215 engine: create.engine,
216 column_metadatas,
217 primary_key: create.primary_key,
218 options: create.options,
219 region_dir,
220 },
221 ))
222}
223
224fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
225 let (region_id, request) = parse_region_create(create)?;
226 Ok(vec![(region_id, RegionRequest::Create(request))])
227}
228
229fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
230 let mut requests = Vec::with_capacity(creates.requests.len());
231 for create in creates.requests {
232 requests.extend(make_region_create(create)?);
233 }
234 Ok(requests)
235}
236
237fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
238 let region_id = drop.region_id.into();
239 Ok((
240 region_id,
241 RegionDropRequest {
242 fast_path: drop.fast_path,
243 },
244 ))
245}
246
247fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
248 let (region_id, request) = parse_region_drop(drop)?;
249 Ok(vec![(region_id, RegionRequest::Drop(request))])
250}
251
252fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
253 let mut requests = Vec::with_capacity(drops.requests.len());
254 for drop in drops.requests {
255 requests.extend(make_region_drop(drop)?);
256 }
257 Ok(requests)
258}
259
260fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
261 let region_id = open.region_id.into();
262 let region_dir = region_dir(&open.path, region_id);
263 Ok(vec![(
264 region_id,
265 RegionRequest::Open(RegionOpenRequest {
266 engine: open.engine,
267 region_dir,
268 options: open.options,
269 skip_wal_replay: false,
270 }),
271 )])
272}
273
274fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
275 let region_id = close.region_id.into();
276 Ok(vec![(
277 region_id,
278 RegionRequest::Close(RegionCloseRequest {}),
279 )])
280}
281
282fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
283 let region_id = alter.region_id.into();
284 let request = RegionAlterRequest::try_from(alter)?;
285 Ok((region_id, request))
286}
287
288fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
289 let (region_id, request) = parse_region_alter(alter)?;
290 Ok(vec![(region_id, RegionRequest::Alter(request))])
291}
292
293fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
294 let mut requests = Vec::with_capacity(alters.requests.len());
295 for alter in alters.requests {
296 requests.extend(make_region_alter(alter)?);
297 }
298 Ok(requests)
299}
300
301fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
302 let region_id = flush.region_id.into();
303 Ok(vec![(
304 region_id,
305 RegionRequest::Flush(RegionFlushRequest {
306 row_group_size: None,
307 }),
308 )])
309}
310
311fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
312 let region_id = compact.region_id.into();
313 let options = compact
314 .options
315 .unwrap_or(compact_request::Options::Regular(Default::default()));
316 Ok(vec![(
317 region_id,
318 RegionRequest::Compact(RegionCompactRequest { options }),
319 )])
320}
321
322fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
323 let region_id = truncate.region_id.into();
324 Ok(vec![(
325 region_id,
326 RegionRequest::Truncate(RegionTruncateRequest {}),
327 )])
328}
329
330fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
332 let region_id = request.region_id.into();
333 let Some(Body::ArrowIpc(request)) = request.body else {
334 return Ok(vec![]);
335 };
336
337 let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
338 .with_label_values(&["decode"])
339 .start_timer();
340 let mut decoder =
341 FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
342 let payload = decoder
343 .try_decode_record_batch(&request.data_header, &request.payload)
344 .context(FlightCodecSnafu)?;
345 decoder_timer.observe_duration();
346 Ok(vec![(
347 region_id,
348 RegionRequest::BulkInserts(RegionBulkInsertsRequest {
349 region_id,
350 payload,
351 raw_data: request,
352 }),
353 )])
354}
355
356#[derive(Debug)]
358pub struct RegionPutRequest {
359 pub rows: Rows,
361 pub hint: Option<WriteHint>,
363}
364
365#[derive(Debug)]
366pub struct RegionReadRequest {
367 pub request: ScanRequest,
368}
369
370#[derive(Debug)]
372pub struct RegionDeleteRequest {
373 pub rows: Rows,
377}
378
379#[derive(Debug, Clone)]
380pub struct RegionCreateRequest {
381 pub engine: String,
383 pub column_metadatas: Vec<ColumnMetadata>,
385 pub primary_key: Vec<ColumnId>,
387 pub options: HashMap<String, String>,
389 pub region_dir: String,
391}
392
393impl RegionCreateRequest {
394 pub fn validate(&self) -> Result<()> {
396 ensure!(
398 self.column_metadatas
399 .iter()
400 .any(|x| x.semantic_type == SemanticType::Timestamp),
401 InvalidRegionRequestSnafu {
402 region_id: RegionId::new(0, 0),
403 err: "missing timestamp column in create region request".to_string(),
404 }
405 );
406
407 let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
409 for (i, c) in self.column_metadatas.iter().enumerate() {
410 if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
411 return InvalidRegionRequestSnafu {
412 region_id: RegionId::new(0, 0),
413 err: format!(
414 "duplicate column id {} (at position {} and {}) in create region request",
415 c.column_id, previous, i
416 ),
417 }
418 .fail();
419 }
420 }
421
422 for column_id in &self.primary_key {
424 ensure!(
425 column_id_to_indices.contains_key(column_id),
426 InvalidRegionRequestSnafu {
427 region_id: RegionId::new(0, 0),
428 err: format!(
429 "missing primary key column {} in create region request",
430 column_id
431 ),
432 }
433 );
434 }
435
436 Ok(())
437 }
438
439 pub fn is_physical_table(&self) -> bool {
441 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
442 }
443}
444
445#[derive(Debug, Clone)]
446pub struct RegionDropRequest {
447 pub fast_path: bool,
448}
449
450#[derive(Debug, Clone)]
452pub struct RegionOpenRequest {
453 pub engine: String,
455 pub region_dir: String,
457 pub options: HashMap<String, String>,
459 pub skip_wal_replay: bool,
461}
462
463impl RegionOpenRequest {
464 pub fn is_physical_table(&self) -> bool {
466 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
467 }
468}
469
470#[derive(Debug)]
472pub struct RegionCloseRequest {}
473
474#[derive(Debug, PartialEq, Eq, Clone)]
476pub struct RegionAlterRequest {
477 pub kind: AlterKind,
479}
480
481impl RegionAlterRequest {
482 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
484 self.kind.validate(metadata)?;
485
486 Ok(())
487 }
488
489 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
493 debug_assert!(self.validate(metadata).is_ok());
494 self.kind.need_alter(metadata)
495 }
496}
497
498impl TryFrom<AlterRequest> for RegionAlterRequest {
499 type Error = MetadataError;
500
501 fn try_from(value: AlterRequest) -> Result<Self> {
502 let kind = value.kind.context(InvalidRawRegionRequestSnafu {
503 err: "missing kind in AlterRequest",
504 })?;
505
506 let kind = AlterKind::try_from(kind)?;
507 Ok(RegionAlterRequest { kind })
508 }
509}
510
511#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
513pub enum AlterKind {
514 AddColumns {
516 columns: Vec<AddColumn>,
518 },
519 DropColumns {
521 names: Vec<String>,
523 },
524 ModifyColumnTypes {
526 columns: Vec<ModifyColumnType>,
528 },
529 SetRegionOptions { options: Vec<SetRegionOption> },
531 UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
533 SetIndex { options: ApiSetIndexOptions },
535 UnsetIndex { options: ApiUnsetIndexOptions },
537 DropDefaults {
539 names: Vec<String>,
541 },
542}
543
544#[derive(Debug, PartialEq, Eq, Clone)]
545pub enum ApiSetIndexOptions {
546 Fulltext {
547 column_name: String,
548 options: FulltextOptions,
549 },
550 Inverted {
551 column_name: String,
552 },
553 Skipping {
554 column_name: String,
555 options: SkippingIndexOptions,
556 },
557}
558
559impl ApiSetIndexOptions {
560 pub fn column_name(&self) -> &String {
561 match self {
562 ApiSetIndexOptions::Fulltext { column_name, .. } => column_name,
563 ApiSetIndexOptions::Inverted { column_name } => column_name,
564 ApiSetIndexOptions::Skipping { column_name, .. } => column_name,
565 }
566 }
567
568 pub fn is_fulltext(&self) -> bool {
569 match self {
570 ApiSetIndexOptions::Fulltext { .. } => true,
571 ApiSetIndexOptions::Inverted { .. } => false,
572 ApiSetIndexOptions::Skipping { .. } => false,
573 }
574 }
575}
576
577#[derive(Debug, PartialEq, Eq, Clone)]
578pub enum ApiUnsetIndexOptions {
579 Fulltext { column_name: String },
580 Inverted { column_name: String },
581 Skipping { column_name: String },
582}
583
584impl ApiUnsetIndexOptions {
585 pub fn column_name(&self) -> &String {
586 match self {
587 ApiUnsetIndexOptions::Fulltext { column_name } => column_name,
588 ApiUnsetIndexOptions::Inverted { column_name } => column_name,
589 ApiUnsetIndexOptions::Skipping { column_name } => column_name,
590 }
591 }
592
593 pub fn is_fulltext(&self) -> bool {
594 match self {
595 ApiUnsetIndexOptions::Fulltext { .. } => true,
596 ApiUnsetIndexOptions::Inverted { .. } => false,
597 ApiUnsetIndexOptions::Skipping { .. } => false,
598 }
599 }
600}
601
602impl AlterKind {
603 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
607 match self {
608 AlterKind::AddColumns { columns } => {
609 for col_to_add in columns {
610 col_to_add.validate(metadata)?;
611 }
612 }
613 AlterKind::DropColumns { names } => {
614 for name in names {
615 Self::validate_column_to_drop(name, metadata)?;
616 }
617 }
618 AlterKind::ModifyColumnTypes { columns } => {
619 for col_to_change in columns {
620 col_to_change.validate(metadata)?;
621 }
622 }
623 AlterKind::SetRegionOptions { .. } => {}
624 AlterKind::UnsetRegionOptions { .. } => {}
625 AlterKind::SetIndex { options } => {
626 Self::validate_column_alter_index_option(
627 options.column_name(),
628 metadata,
629 options.is_fulltext(),
630 )?;
631 }
632 AlterKind::UnsetIndex { options } => {
633 Self::validate_column_alter_index_option(
634 options.column_name(),
635 metadata,
636 options.is_fulltext(),
637 )?;
638 }
639 AlterKind::DropDefaults { names } => {
640 names
641 .iter()
642 .try_for_each(|name| Self::validate_column_to_drop(name, metadata))?;
643 }
644 }
645 Ok(())
646 }
647
648 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
650 debug_assert!(self.validate(metadata).is_ok());
651 match self {
652 AlterKind::AddColumns { columns } => columns
653 .iter()
654 .any(|col_to_add| col_to_add.need_alter(metadata)),
655 AlterKind::DropColumns { names } => names
656 .iter()
657 .any(|name| metadata.column_by_name(name).is_some()),
658 AlterKind::ModifyColumnTypes { columns } => columns
659 .iter()
660 .any(|col_to_change| col_to_change.need_alter(metadata)),
661 AlterKind::SetRegionOptions { .. } => {
662 true
665 }
666 AlterKind::UnsetRegionOptions { .. } => true,
667 AlterKind::SetIndex { options, .. } => {
668 metadata.column_by_name(options.column_name()).is_some()
669 }
670 AlterKind::UnsetIndex { options } => {
671 metadata.column_by_name(options.column_name()).is_some()
672 }
673 AlterKind::DropDefaults { names } => names
674 .iter()
675 .any(|name| metadata.column_by_name(name).is_some()),
676 }
677 }
678
679 fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
681 let Some(column) = metadata.column_by_name(name) else {
682 return Ok(());
683 };
684 ensure!(
685 column.semantic_type == SemanticType::Field,
686 InvalidRegionRequestSnafu {
687 region_id: metadata.region_id,
688 err: format!("column {} is not a field and could not be dropped", name),
689 }
690 );
691 Ok(())
692 }
693
694 fn validate_column_alter_index_option(
696 column_name: &String,
697 metadata: &RegionMetadata,
698 is_fulltext: bool,
699 ) -> Result<()> {
700 let column = metadata
701 .column_by_name(column_name)
702 .context(InvalidRegionRequestSnafu {
703 region_id: metadata.region_id,
704 err: format!("column {} not found", column_name),
705 })?;
706
707 if is_fulltext {
708 ensure!(
709 column.column_schema.data_type.is_string(),
710 InvalidRegionRequestSnafu {
711 region_id: metadata.region_id,
712 err: format!(
713 "cannot change alter index options for non-string column {}",
714 column_name
715 ),
716 }
717 );
718 }
719
720 Ok(())
721 }
722}
723
724impl TryFrom<alter_request::Kind> for AlterKind {
725 type Error = MetadataError;
726
727 fn try_from(kind: alter_request::Kind) -> Result<Self> {
728 let alter_kind = match kind {
729 alter_request::Kind::AddColumns(x) => {
730 let columns = x
731 .add_columns
732 .into_iter()
733 .map(|x| x.try_into())
734 .collect::<Result<Vec<_>>>()?;
735 AlterKind::AddColumns { columns }
736 }
737 alter_request::Kind::ModifyColumnTypes(x) => {
738 let columns = x
739 .modify_column_types
740 .into_iter()
741 .map(|x| x.into())
742 .collect::<Vec<_>>();
743 AlterKind::ModifyColumnTypes { columns }
744 }
745 alter_request::Kind::DropColumns(x) => {
746 let names = x.drop_columns.into_iter().map(|x| x.name).collect();
747 AlterKind::DropColumns { names }
748 }
749 alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
750 options: options
751 .table_options
752 .iter()
753 .map(TryFrom::try_from)
754 .collect::<Result<Vec<_>>>()?,
755 },
756 alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
757 keys: options
758 .keys
759 .iter()
760 .map(|key| UnsetRegionOption::try_from(key.as_str()))
761 .collect::<Result<Vec<_>>>()?,
762 },
763 alter_request::Kind::SetIndex(o) => match o.options.unwrap() {
764 set_index::Options::Fulltext(x) => AlterKind::SetIndex {
765 options: ApiSetIndexOptions::Fulltext {
766 column_name: x.column_name.clone(),
767 options: FulltextOptions::new(
768 x.enable,
769 as_fulltext_option_analyzer(
770 Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
771 ),
772 x.case_sensitive,
773 as_fulltext_option_backend(
774 PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
775 ),
776 x.granularity as u32,
777 x.false_positive_rate,
778 )
779 .context(InvalidIndexOptionSnafu)?,
780 },
781 },
782 set_index::Options::Inverted(i) => AlterKind::SetIndex {
783 options: ApiSetIndexOptions::Inverted {
784 column_name: i.column_name,
785 },
786 },
787 set_index::Options::Skipping(s) => AlterKind::SetIndex {
788 options: ApiSetIndexOptions::Skipping {
789 column_name: s.column_name,
790 options: SkippingIndexOptions::new(
791 s.granularity as u32,
792 s.false_positive_rate,
793 as_skipping_index_type(
794 PbSkippingIndexType::try_from(s.skipping_index_type)
795 .context(DecodeProtoSnafu)?,
796 ),
797 )
798 .context(InvalidIndexOptionSnafu)?,
799 },
800 },
801 },
802 alter_request::Kind::UnsetIndex(o) => match o.options.unwrap() {
803 v1::unset_index::Options::Fulltext(f) => AlterKind::UnsetIndex {
804 options: ApiUnsetIndexOptions::Fulltext {
805 column_name: f.column_name,
806 },
807 },
808 v1::unset_index::Options::Inverted(i) => AlterKind::UnsetIndex {
809 options: ApiUnsetIndexOptions::Inverted {
810 column_name: i.column_name,
811 },
812 },
813 v1::unset_index::Options::Skipping(s) => AlterKind::UnsetIndex {
814 options: ApiUnsetIndexOptions::Skipping {
815 column_name: s.column_name,
816 },
817 },
818 },
819 alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
820 names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
821 },
822 };
823
824 Ok(alter_kind)
825 }
826}
827
828#[derive(Debug, PartialEq, Eq, Clone)]
830pub struct AddColumn {
831 pub column_metadata: ColumnMetadata,
833 pub location: Option<AddColumnLocation>,
836}
837
838impl AddColumn {
839 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
844 ensure!(
845 self.column_metadata.column_schema.is_nullable()
846 || self
847 .column_metadata
848 .column_schema
849 .default_constraint()
850 .is_some(),
851 InvalidRegionRequestSnafu {
852 region_id: metadata.region_id,
853 err: format!(
854 "no default value for column {}",
855 self.column_metadata.column_schema.name
856 ),
857 }
858 );
859
860 if let Some(existing_column) =
861 metadata.column_by_name(&self.column_metadata.column_schema.name)
862 {
863 ensure!(
865 *existing_column == self.column_metadata,
866 InvalidRegionRequestSnafu {
867 region_id: metadata.region_id,
868 err: format!(
869 "column {} already exists with different metadata, existing: {:?}, got: {:?}",
870 self.column_metadata.column_schema.name, existing_column, self.column_metadata,
871 ),
872 }
873 );
874 ensure!(
875 self.location.is_none(),
876 InvalidRegionRequestSnafu {
877 region_id: metadata.region_id,
878 err: format!(
879 "column {} already exists, but location is specified",
880 self.column_metadata.column_schema.name
881 ),
882 }
883 );
884 }
885
886 if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
887 ensure!(
889 existing_column.column_schema.name == self.column_metadata.column_schema.name,
890 InvalidRegionRequestSnafu {
891 region_id: metadata.region_id,
892 err: format!(
893 "column id {} already exists with different name {}",
894 self.column_metadata.column_id, existing_column.column_schema.name
895 ),
896 }
897 );
898 }
899
900 Ok(())
901 }
902
903 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
905 debug_assert!(self.validate(metadata).is_ok());
906 metadata
907 .column_by_name(&self.column_metadata.column_schema.name)
908 .is_none()
909 }
910}
911
912impl TryFrom<v1::region::AddColumn> for AddColumn {
913 type Error = MetadataError;
914
915 fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
916 let column_def = add_column
917 .column_def
918 .context(InvalidRawRegionRequestSnafu {
919 err: "missing column_def in AddColumn",
920 })?;
921
922 let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
923 let location = add_column
924 .location
925 .map(AddColumnLocation::try_from)
926 .transpose()?;
927
928 Ok(AddColumn {
929 column_metadata,
930 location,
931 })
932 }
933}
934
935#[derive(Debug, PartialEq, Eq, Clone)]
937pub enum AddColumnLocation {
938 First,
940 After {
942 column_name: String,
944 },
945}
946
947impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
948 type Error = MetadataError;
949
950 fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
951 let location_type = LocationType::try_from(location.location_type)
952 .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
953 let add_column_location = match location_type {
954 LocationType::First => AddColumnLocation::First,
955 LocationType::After => AddColumnLocation::After {
956 column_name: location.after_column_name,
957 },
958 };
959
960 Ok(add_column_location)
961 }
962}
963
964#[derive(Debug, PartialEq, Eq, Clone)]
966pub struct ModifyColumnType {
967 pub column_name: String,
969 pub target_type: ConcreteDataType,
971}
972
973impl ModifyColumnType {
974 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
976 let column_meta = metadata
977 .column_by_name(&self.column_name)
978 .with_context(|| InvalidRegionRequestSnafu {
979 region_id: metadata.region_id,
980 err: format!("column {} not found", self.column_name),
981 })?;
982
983 ensure!(
984 matches!(column_meta.semantic_type, SemanticType::Field),
985 InvalidRegionRequestSnafu {
986 region_id: metadata.region_id,
987 err: "'timestamp' or 'tag' column cannot change type".to_string()
988 }
989 );
990 ensure!(
991 column_meta
992 .column_schema
993 .data_type
994 .can_arrow_type_cast_to(&self.target_type),
995 InvalidRegionRequestSnafu {
996 region_id: metadata.region_id,
997 err: format!(
998 "column '{}' cannot be cast automatically to type '{}'",
999 self.column_name, self.target_type
1000 ),
1001 }
1002 );
1003
1004 Ok(())
1005 }
1006
1007 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1009 debug_assert!(self.validate(metadata).is_ok());
1010 metadata.column_by_name(&self.column_name).is_some()
1011 }
1012}
1013
1014impl From<v1::ModifyColumnType> for ModifyColumnType {
1015 fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1016 let target_type = ColumnDataTypeWrapper::new(
1017 modify_column_type.target_type(),
1018 modify_column_type.target_type_extension,
1019 )
1020 .into();
1021
1022 ModifyColumnType {
1023 column_name: modify_column_type.column_name,
1024 target_type,
1025 }
1026 }
1027}
1028
1029#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1030pub enum SetRegionOption {
1031 Ttl(Option<TimeToLive>),
1032 Twsc(String, String),
1034}
1035
1036impl TryFrom<&PbOption> for SetRegionOption {
1037 type Error = MetadataError;
1038
1039 fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1040 let PbOption { key, value } = value;
1041 match key.as_str() {
1042 TTL_KEY => {
1043 let ttl = TimeToLive::from_humantime_or_str(value)
1044 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1045
1046 Ok(Self::Ttl(Some(ttl)))
1047 }
1048 TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1049 Ok(Self::Twsc(key.to_string(), value.to_string()))
1050 }
1051 _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1052 }
1053 }
1054}
1055
1056impl From<&UnsetRegionOption> for SetRegionOption {
1057 fn from(unset_option: &UnsetRegionOption) -> Self {
1058 match unset_option {
1059 UnsetRegionOption::TwcsTriggerFileNum => {
1060 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1061 }
1062 UnsetRegionOption::TwcsMaxOutputFileSize => {
1063 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1064 }
1065 UnsetRegionOption::TwcsTimeWindow => {
1066 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1067 }
1068 UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1069 }
1070 }
1071}
1072
1073impl TryFrom<&str> for UnsetRegionOption {
1074 type Error = MetadataError;
1075
1076 fn try_from(key: &str) -> Result<Self> {
1077 match key.to_ascii_lowercase().as_str() {
1078 TTL_KEY => Ok(Self::Ttl),
1079 TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1080 TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1081 TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1082 _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1083 }
1084 }
1085}
1086
1087#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1088pub enum UnsetRegionOption {
1089 TwcsTriggerFileNum,
1090 TwcsMaxOutputFileSize,
1091 TwcsTimeWindow,
1092 Ttl,
1093}
1094
1095impl UnsetRegionOption {
1096 pub fn as_str(&self) -> &str {
1097 match self {
1098 Self::Ttl => TTL_KEY,
1099 Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1100 Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1101 Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1102 }
1103 }
1104}
1105
1106impl Display for UnsetRegionOption {
1107 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1108 write!(f, "{}", self.as_str())
1109 }
1110}
1111
1112#[derive(Debug, Clone, Default)]
1113pub struct RegionFlushRequest {
1114 pub row_group_size: Option<usize>,
1115}
1116
1117#[derive(Debug)]
1118pub struct RegionCompactRequest {
1119 pub options: compact_request::Options,
1120}
1121
1122impl Default for RegionCompactRequest {
1123 fn default() -> Self {
1124 Self {
1125 options: compact_request::Options::Regular(Default::default()),
1127 }
1128 }
1129}
1130
1131#[derive(Debug)]
1133pub struct RegionTruncateRequest {}
1134
1135#[derive(Debug, Clone, Copy)]
1140pub struct RegionCatchupRequest {
1141 pub set_writable: bool,
1143 pub entry_id: Option<entry::Id>,
1146 pub metadata_entry_id: Option<entry::Id>,
1150 pub location_id: Option<u64>,
1152}
1153
1154#[derive(Debug, Clone)]
1156pub struct RegionSequencesRequest {
1157 pub region_ids: Vec<RegionId>,
1158}
1159
1160#[derive(Debug, Clone)]
1161pub struct RegionBulkInsertsRequest {
1162 pub region_id: RegionId,
1163 pub payload: DfRecordBatch,
1164 pub raw_data: ArrowIpc,
1165}
1166
1167impl RegionBulkInsertsRequest {
1168 pub fn estimated_size(&self) -> usize {
1169 self.payload.get_array_memory_size()
1170 }
1171}
1172
1173impl fmt::Display for RegionRequest {
1174 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1175 match self {
1176 RegionRequest::Put(_) => write!(f, "Put"),
1177 RegionRequest::Delete(_) => write!(f, "Delete"),
1178 RegionRequest::Create(_) => write!(f, "Create"),
1179 RegionRequest::Drop(_) => write!(f, "Drop"),
1180 RegionRequest::Open(_) => write!(f, "Open"),
1181 RegionRequest::Close(_) => write!(f, "Close"),
1182 RegionRequest::Alter(_) => write!(f, "Alter"),
1183 RegionRequest::Flush(_) => write!(f, "Flush"),
1184 RegionRequest::Compact(_) => write!(f, "Compact"),
1185 RegionRequest::Truncate(_) => write!(f, "Truncate"),
1186 RegionRequest::Catchup(_) => write!(f, "Catchup"),
1187 RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1188 }
1189 }
1190}
1191
1192#[cfg(test)]
1193mod tests {
1194 use api::v1::region::RegionColumnDef;
1195 use api::v1::{ColumnDataType, ColumnDef};
1196 use datatypes::prelude::ConcreteDataType;
1197 use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1198
1199 use super::*;
1200 use crate::metadata::RegionMetadataBuilder;
1201
1202 #[test]
1203 fn test_from_proto_location() {
1204 let proto_location = v1::AddColumnLocation {
1205 location_type: LocationType::First as i32,
1206 after_column_name: String::default(),
1207 };
1208 let location = AddColumnLocation::try_from(proto_location).unwrap();
1209 assert_eq!(location, AddColumnLocation::First);
1210
1211 let proto_location = v1::AddColumnLocation {
1212 location_type: 10,
1213 after_column_name: String::default(),
1214 };
1215 AddColumnLocation::try_from(proto_location).unwrap_err();
1216
1217 let proto_location = v1::AddColumnLocation {
1218 location_type: LocationType::After as i32,
1219 after_column_name: "a".to_string(),
1220 };
1221 let location = AddColumnLocation::try_from(proto_location).unwrap();
1222 assert_eq!(
1223 location,
1224 AddColumnLocation::After {
1225 column_name: "a".to_string()
1226 }
1227 );
1228 }
1229
1230 #[test]
1231 fn test_from_none_proto_add_column() {
1232 AddColumn::try_from(v1::region::AddColumn {
1233 column_def: None,
1234 location: None,
1235 })
1236 .unwrap_err();
1237 }
1238
1239 #[test]
1240 fn test_from_proto_alter_request() {
1241 RegionAlterRequest::try_from(AlterRequest {
1242 region_id: 0,
1243 schema_version: 1,
1244 kind: None,
1245 })
1246 .unwrap_err();
1247
1248 let request = RegionAlterRequest::try_from(AlterRequest {
1249 region_id: 0,
1250 schema_version: 1,
1251 kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1252 add_columns: vec![v1::region::AddColumn {
1253 column_def: Some(RegionColumnDef {
1254 column_def: Some(ColumnDef {
1255 name: "a".to_string(),
1256 data_type: ColumnDataType::String as i32,
1257 is_nullable: true,
1258 default_constraint: vec![],
1259 semantic_type: SemanticType::Field as i32,
1260 comment: String::new(),
1261 ..Default::default()
1262 }),
1263 column_id: 1,
1264 }),
1265 location: Some(v1::AddColumnLocation {
1266 location_type: LocationType::First as i32,
1267 after_column_name: String::default(),
1268 }),
1269 }],
1270 })),
1271 })
1272 .unwrap();
1273
1274 assert_eq!(
1275 request,
1276 RegionAlterRequest {
1277 kind: AlterKind::AddColumns {
1278 columns: vec![AddColumn {
1279 column_metadata: ColumnMetadata {
1280 column_schema: ColumnSchema::new(
1281 "a",
1282 ConcreteDataType::string_datatype(),
1283 true,
1284 ),
1285 semantic_type: SemanticType::Field,
1286 column_id: 1,
1287 },
1288 location: Some(AddColumnLocation::First),
1289 }]
1290 },
1291 }
1292 );
1293 }
1294
1295 fn new_metadata() -> RegionMetadata {
1298 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1299 builder
1300 .push_column_metadata(ColumnMetadata {
1301 column_schema: ColumnSchema::new(
1302 "ts",
1303 ConcreteDataType::timestamp_millisecond_datatype(),
1304 false,
1305 ),
1306 semantic_type: SemanticType::Timestamp,
1307 column_id: 1,
1308 })
1309 .push_column_metadata(ColumnMetadata {
1310 column_schema: ColumnSchema::new(
1311 "tag_0",
1312 ConcreteDataType::string_datatype(),
1313 true,
1314 ),
1315 semantic_type: SemanticType::Tag,
1316 column_id: 2,
1317 })
1318 .push_column_metadata(ColumnMetadata {
1319 column_schema: ColumnSchema::new(
1320 "field_0",
1321 ConcreteDataType::string_datatype(),
1322 true,
1323 ),
1324 semantic_type: SemanticType::Field,
1325 column_id: 3,
1326 })
1327 .push_column_metadata(ColumnMetadata {
1328 column_schema: ColumnSchema::new(
1329 "field_1",
1330 ConcreteDataType::boolean_datatype(),
1331 true,
1332 ),
1333 semantic_type: SemanticType::Field,
1334 column_id: 4,
1335 })
1336 .primary_key(vec![2]);
1337 builder.build().unwrap()
1338 }
1339
1340 #[test]
1341 fn test_add_column_validate() {
1342 let metadata = new_metadata();
1343 let add_column = AddColumn {
1344 column_metadata: ColumnMetadata {
1345 column_schema: ColumnSchema::new(
1346 "tag_1",
1347 ConcreteDataType::string_datatype(),
1348 true,
1349 ),
1350 semantic_type: SemanticType::Tag,
1351 column_id: 5,
1352 },
1353 location: None,
1354 };
1355 add_column.validate(&metadata).unwrap();
1356 assert!(add_column.need_alter(&metadata));
1357
1358 AddColumn {
1360 column_metadata: ColumnMetadata {
1361 column_schema: ColumnSchema::new(
1362 "tag_1",
1363 ConcreteDataType::string_datatype(),
1364 false,
1365 ),
1366 semantic_type: SemanticType::Tag,
1367 column_id: 5,
1368 },
1369 location: None,
1370 }
1371 .validate(&metadata)
1372 .unwrap_err();
1373
1374 let add_column = AddColumn {
1376 column_metadata: ColumnMetadata {
1377 column_schema: ColumnSchema::new(
1378 "tag_0",
1379 ConcreteDataType::string_datatype(),
1380 true,
1381 ),
1382 semantic_type: SemanticType::Tag,
1383 column_id: 2,
1384 },
1385 location: None,
1386 };
1387 add_column.validate(&metadata).unwrap();
1388 assert!(!add_column.need_alter(&metadata));
1389 }
1390
1391 #[test]
1392 fn test_add_duplicate_columns() {
1393 let kind = AlterKind::AddColumns {
1394 columns: vec![
1395 AddColumn {
1396 column_metadata: ColumnMetadata {
1397 column_schema: ColumnSchema::new(
1398 "tag_1",
1399 ConcreteDataType::string_datatype(),
1400 true,
1401 ),
1402 semantic_type: SemanticType::Tag,
1403 column_id: 5,
1404 },
1405 location: None,
1406 },
1407 AddColumn {
1408 column_metadata: ColumnMetadata {
1409 column_schema: ColumnSchema::new(
1410 "tag_1",
1411 ConcreteDataType::string_datatype(),
1412 true,
1413 ),
1414 semantic_type: SemanticType::Field,
1415 column_id: 6,
1416 },
1417 location: None,
1418 },
1419 ],
1420 };
1421 let metadata = new_metadata();
1422 kind.validate(&metadata).unwrap();
1423 assert!(kind.need_alter(&metadata));
1424 }
1425
1426 #[test]
1427 fn test_add_existing_column_different_metadata() {
1428 let metadata = new_metadata();
1429
1430 let kind = AlterKind::AddColumns {
1432 columns: vec![AddColumn {
1433 column_metadata: ColumnMetadata {
1434 column_schema: ColumnSchema::new(
1435 "tag_0",
1436 ConcreteDataType::string_datatype(),
1437 true,
1438 ),
1439 semantic_type: SemanticType::Tag,
1440 column_id: 4,
1441 },
1442 location: None,
1443 }],
1444 };
1445 kind.validate(&metadata).unwrap_err();
1446
1447 let kind = AlterKind::AddColumns {
1449 columns: vec![AddColumn {
1450 column_metadata: ColumnMetadata {
1451 column_schema: ColumnSchema::new(
1452 "tag_0",
1453 ConcreteDataType::int64_datatype(),
1454 true,
1455 ),
1456 semantic_type: SemanticType::Tag,
1457 column_id: 2,
1458 },
1459 location: None,
1460 }],
1461 };
1462 kind.validate(&metadata).unwrap_err();
1463
1464 let kind = AlterKind::AddColumns {
1466 columns: vec![AddColumn {
1467 column_metadata: ColumnMetadata {
1468 column_schema: ColumnSchema::new(
1469 "tag_1",
1470 ConcreteDataType::string_datatype(),
1471 true,
1472 ),
1473 semantic_type: SemanticType::Tag,
1474 column_id: 2,
1475 },
1476 location: None,
1477 }],
1478 };
1479 kind.validate(&metadata).unwrap_err();
1480 }
1481
1482 #[test]
1483 fn test_add_existing_column_with_location() {
1484 let metadata = new_metadata();
1485 let kind = AlterKind::AddColumns {
1486 columns: vec![AddColumn {
1487 column_metadata: ColumnMetadata {
1488 column_schema: ColumnSchema::new(
1489 "tag_0",
1490 ConcreteDataType::string_datatype(),
1491 true,
1492 ),
1493 semantic_type: SemanticType::Tag,
1494 column_id: 2,
1495 },
1496 location: Some(AddColumnLocation::First),
1497 }],
1498 };
1499 kind.validate(&metadata).unwrap_err();
1500 }
1501
1502 #[test]
1503 fn test_validate_drop_column() {
1504 let metadata = new_metadata();
1505 let kind = AlterKind::DropColumns {
1506 names: vec!["xxxx".to_string()],
1507 };
1508 kind.validate(&metadata).unwrap();
1509 assert!(!kind.need_alter(&metadata));
1510
1511 AlterKind::DropColumns {
1512 names: vec!["tag_0".to_string()],
1513 }
1514 .validate(&metadata)
1515 .unwrap_err();
1516
1517 let kind = AlterKind::DropColumns {
1518 names: vec!["field_0".to_string()],
1519 };
1520 kind.validate(&metadata).unwrap();
1521 assert!(kind.need_alter(&metadata));
1522 }
1523
1524 #[test]
1525 fn test_validate_modify_column_type() {
1526 let metadata = new_metadata();
1527 AlterKind::ModifyColumnTypes {
1528 columns: vec![ModifyColumnType {
1529 column_name: "xxxx".to_string(),
1530 target_type: ConcreteDataType::string_datatype(),
1531 }],
1532 }
1533 .validate(&metadata)
1534 .unwrap_err();
1535
1536 AlterKind::ModifyColumnTypes {
1537 columns: vec![ModifyColumnType {
1538 column_name: "field_1".to_string(),
1539 target_type: ConcreteDataType::date_datatype(),
1540 }],
1541 }
1542 .validate(&metadata)
1543 .unwrap_err();
1544
1545 AlterKind::ModifyColumnTypes {
1546 columns: vec![ModifyColumnType {
1547 column_name: "ts".to_string(),
1548 target_type: ConcreteDataType::date_datatype(),
1549 }],
1550 }
1551 .validate(&metadata)
1552 .unwrap_err();
1553
1554 AlterKind::ModifyColumnTypes {
1555 columns: vec![ModifyColumnType {
1556 column_name: "tag_0".to_string(),
1557 target_type: ConcreteDataType::date_datatype(),
1558 }],
1559 }
1560 .validate(&metadata)
1561 .unwrap_err();
1562
1563 let kind = AlterKind::ModifyColumnTypes {
1564 columns: vec![ModifyColumnType {
1565 column_name: "field_0".to_string(),
1566 target_type: ConcreteDataType::int32_datatype(),
1567 }],
1568 };
1569 kind.validate(&metadata).unwrap();
1570 assert!(kind.need_alter(&metadata));
1571 }
1572
1573 #[test]
1574 fn test_validate_add_columns() {
1575 let kind = AlterKind::AddColumns {
1576 columns: vec![
1577 AddColumn {
1578 column_metadata: ColumnMetadata {
1579 column_schema: ColumnSchema::new(
1580 "tag_1",
1581 ConcreteDataType::string_datatype(),
1582 true,
1583 ),
1584 semantic_type: SemanticType::Tag,
1585 column_id: 5,
1586 },
1587 location: None,
1588 },
1589 AddColumn {
1590 column_metadata: ColumnMetadata {
1591 column_schema: ColumnSchema::new(
1592 "field_2",
1593 ConcreteDataType::string_datatype(),
1594 true,
1595 ),
1596 semantic_type: SemanticType::Field,
1597 column_id: 6,
1598 },
1599 location: None,
1600 },
1601 ],
1602 };
1603 let request = RegionAlterRequest { kind };
1604 let mut metadata = new_metadata();
1605 metadata.schema_version = 1;
1606 request.validate(&metadata).unwrap();
1607 }
1608
1609 #[test]
1610 fn test_validate_create_region() {
1611 let column_metadatas = vec![
1612 ColumnMetadata {
1613 column_schema: ColumnSchema::new(
1614 "ts",
1615 ConcreteDataType::timestamp_millisecond_datatype(),
1616 false,
1617 ),
1618 semantic_type: SemanticType::Timestamp,
1619 column_id: 1,
1620 },
1621 ColumnMetadata {
1622 column_schema: ColumnSchema::new(
1623 "tag_0",
1624 ConcreteDataType::string_datatype(),
1625 true,
1626 ),
1627 semantic_type: SemanticType::Tag,
1628 column_id: 2,
1629 },
1630 ColumnMetadata {
1631 column_schema: ColumnSchema::new(
1632 "field_0",
1633 ConcreteDataType::string_datatype(),
1634 true,
1635 ),
1636 semantic_type: SemanticType::Field,
1637 column_id: 3,
1638 },
1639 ];
1640 let create = RegionCreateRequest {
1641 engine: "mito".to_string(),
1642 column_metadatas,
1643 primary_key: vec![3, 4],
1644 options: HashMap::new(),
1645 region_dir: "path".to_string(),
1646 };
1647
1648 assert!(create.validate().is_err());
1649 }
1650
1651 #[test]
1652 fn test_validate_modify_column_fulltext_options() {
1653 let kind = AlterKind::SetIndex {
1654 options: ApiSetIndexOptions::Fulltext {
1655 column_name: "tag_0".to_string(),
1656 options: FulltextOptions::new_unchecked(
1657 true,
1658 FulltextAnalyzer::Chinese,
1659 false,
1660 FulltextBackend::Bloom,
1661 1000,
1662 0.01,
1663 ),
1664 },
1665 };
1666 let request = RegionAlterRequest { kind };
1667 let mut metadata = new_metadata();
1668 metadata.schema_version = 1;
1669 request.validate(&metadata).unwrap();
1670
1671 let kind = AlterKind::UnsetIndex {
1672 options: ApiUnsetIndexOptions::Fulltext {
1673 column_name: "tag_0".to_string(),
1674 },
1675 };
1676 let request = RegionAlterRequest { kind };
1677 let mut metadata = new_metadata();
1678 metadata.schema_version = 1;
1679 request.validate(&metadata).unwrap();
1680 }
1681}