1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::ColumnDataTypeWrapper;
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25 alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequest,
26 CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest,
27 DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
28};
29use api::v1::{
30 self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
31 SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::{FlightDecoder, FlightMessage};
35use common_grpc::FlightData;
36use common_recordbatch::DfRecordBatch;
37use common_time::TimeToLive;
38use datatypes::prelude::ConcreteDataType;
39use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
40use prost::Message;
41use serde::{Deserialize, Serialize};
42use snafu::{ensure, OptionExt, ResultExt};
43use strum::{AsRefStr, IntoStaticStr};
44
45use crate::logstore::entry;
46use crate::metadata::{
47 ColumnMetadata, DecodeProtoSnafu, FlightCodecSnafu, InvalidRawRegionRequestSnafu,
48 InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
49 InvalidUnsetRegionOptionRequestSnafu, MetadataError, ProstSnafu, RegionMetadata, Result,
50 UnexpectedSnafu,
51};
52use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
53use crate::metrics;
54use crate::mito_engine_options::{
55 TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS,
56 TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE,
57 TWCS_TIME_WINDOW,
58};
59use crate::path_utils::region_dir;
60use crate::storage::{ColumnId, RegionId, ScanRequest};
61
/// A batch of region DDL requests that all share one kind (create, drop or
/// alter). Every entry pairs the target region id with its request.
#[derive(Debug, IntoStaticStr)]
pub enum BatchRegionDdlRequest {
    /// Batched create-region requests.
    Create(Vec<(RegionId, RegionCreateRequest)>),
    /// Batched drop-region requests.
    Drop(Vec<(RegionId, RegionDropRequest)>),
    /// Batched alter-region requests.
    Alter(Vec<(RegionId, RegionAlterRequest)>),
}
68
69impl BatchRegionDdlRequest {
70 pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
72 match body {
73 region_request::Body::Creates(creates) => {
74 let requests = creates
75 .requests
76 .into_iter()
77 .map(parse_region_create)
78 .collect::<Result<Vec<_>>>()?;
79 Ok(Some(Self::Create(requests)))
80 }
81 region_request::Body::Drops(drops) => {
82 let requests = drops
83 .requests
84 .into_iter()
85 .map(parse_region_drop)
86 .collect::<Result<Vec<_>>>()?;
87 Ok(Some(Self::Drop(requests)))
88 }
89 region_request::Body::Alters(alters) => {
90 let requests = alters
91 .requests
92 .into_iter()
93 .map(parse_region_alter)
94 .collect::<Result<Vec<_>>>()?;
95 Ok(Some(Self::Alter(requests)))
96 }
97 _ => Ok(None),
98 }
99 }
100
101 pub fn request_type(&self) -> &'static str {
102 self.into()
103 }
104
105 pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
106 match self {
107 Self::Create(requests) => requests
108 .into_iter()
109 .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
110 .collect(),
111 Self::Drop(requests) => requests
112 .into_iter()
113 .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
114 .collect(),
115 Self::Alter(requests) => requests
116 .into_iter()
117 .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
118 .collect(),
119 }
120 }
121}
122
/// A request addressed to a single region. Each variant wraps the payload
/// struct for one kind of operation.
#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
    /// Writes rows into the region.
    Put(RegionPutRequest),
    /// Deletes rows from the region.
    Delete(RegionDeleteRequest),
    /// Creates the region.
    Create(RegionCreateRequest),
    /// Drops the region.
    Drop(RegionDropRequest),
    /// Opens the region.
    Open(RegionOpenRequest),
    /// Closes the region.
    Close(RegionCloseRequest),
    /// Alters the region (see [`AlterKind`]).
    Alter(RegionAlterRequest),
    /// Flushes the region.
    Flush(RegionFlushRequest),
    /// Compacts the region.
    Compact(RegionCompactRequest),
    /// Truncates the region.
    Truncate(RegionTruncateRequest),
    /// Catch-up request; it is never produced by `try_from_request_body`.
    Catchup(RegionCatchupRequest),
    /// Bulk insert carrying a decoded record batch.
    BulkInserts(RegionBulkInsertsRequest),
}
138
139impl RegionRequest {
140 pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
143 match body {
144 region_request::Body::Inserts(inserts) => make_region_puts(inserts),
145 region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
146 region_request::Body::Create(create) => make_region_create(create),
147 region_request::Body::Drop(drop) => make_region_drop(drop),
148 region_request::Body::Open(open) => make_region_open(open),
149 region_request::Body::Close(close) => make_region_close(close),
150 region_request::Body::Alter(alter) => make_region_alter(alter),
151 region_request::Body::Flush(flush) => make_region_flush(flush),
152 region_request::Body::Compact(compact) => make_region_compact(compact),
153 region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
154 region_request::Body::Creates(creates) => make_region_creates(creates),
155 region_request::Body::Drops(drops) => make_region_drops(drops),
156 region_request::Body::Alters(alters) => make_region_alters(alters),
157 region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
158 region_request::Body::Sync(_) => UnexpectedSnafu {
159 reason: "Sync request should be handled separately by RegionServer",
160 }
161 .fail(),
162 }
163 }
164
165 pub fn request_type(&self) -> &'static str {
167 self.into()
168 }
169}
170
171fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
172 let requests = inserts
173 .requests
174 .into_iter()
175 .filter_map(|r| {
176 let region_id = r.region_id.into();
177 r.rows.map(|rows| {
178 (
179 region_id,
180 RegionRequest::Put(RegionPutRequest { rows, hint: None }),
181 )
182 })
183 })
184 .collect();
185 Ok(requests)
186}
187
188fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
189 let requests = deletes
190 .requests
191 .into_iter()
192 .filter_map(|r| {
193 let region_id = r.region_id.into();
194 r.rows.map(|rows| {
195 (
196 region_id,
197 RegionRequest::Delete(RegionDeleteRequest { rows }),
198 )
199 })
200 })
201 .collect();
202 Ok(requests)
203}
204
205fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
206 let column_metadatas = create
207 .column_defs
208 .into_iter()
209 .map(ColumnMetadata::try_from_column_def)
210 .collect::<Result<Vec<_>>>()?;
211 let region_id = create.region_id.into();
212 let region_dir = region_dir(&create.path, region_id);
213 Ok((
214 region_id,
215 RegionCreateRequest {
216 engine: create.engine,
217 column_metadatas,
218 primary_key: create.primary_key,
219 options: create.options,
220 region_dir,
221 },
222 ))
223}
224
225fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
226 let (region_id, request) = parse_region_create(create)?;
227 Ok(vec![(region_id, RegionRequest::Create(request))])
228}
229
230fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
231 let mut requests = Vec::with_capacity(creates.requests.len());
232 for create in creates.requests {
233 requests.extend(make_region_create(create)?);
234 }
235 Ok(requests)
236}
237
238fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
239 let region_id = drop.region_id.into();
240 Ok((
241 region_id,
242 RegionDropRequest {
243 fast_path: drop.fast_path,
244 },
245 ))
246}
247
248fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
249 let (region_id, request) = parse_region_drop(drop)?;
250 Ok(vec![(region_id, RegionRequest::Drop(request))])
251}
252
253fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
254 let mut requests = Vec::with_capacity(drops.requests.len());
255 for drop in drops.requests {
256 requests.extend(make_region_drop(drop)?);
257 }
258 Ok(requests)
259}
260
261fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
262 let region_id = open.region_id.into();
263 let region_dir = region_dir(&open.path, region_id);
264 Ok(vec![(
265 region_id,
266 RegionRequest::Open(RegionOpenRequest {
267 engine: open.engine,
268 region_dir,
269 options: open.options,
270 skip_wal_replay: false,
271 }),
272 )])
273}
274
275fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
276 let region_id = close.region_id.into();
277 Ok(vec![(
278 region_id,
279 RegionRequest::Close(RegionCloseRequest {}),
280 )])
281}
282
283fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
284 let region_id = alter.region_id.into();
285 let request = RegionAlterRequest::try_from(alter)?;
286 Ok((region_id, request))
287}
288
289fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
290 let (region_id, request) = parse_region_alter(alter)?;
291 Ok(vec![(region_id, RegionRequest::Alter(request))])
292}
293
294fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
295 let mut requests = Vec::with_capacity(alters.requests.len());
296 for alter in alters.requests {
297 requests.extend(make_region_alter(alter)?);
298 }
299 Ok(requests)
300}
301
302fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
303 let region_id = flush.region_id.into();
304 Ok(vec![(
305 region_id,
306 RegionRequest::Flush(RegionFlushRequest {
307 row_group_size: None,
308 }),
309 )])
310}
311
312fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
313 let region_id = compact.region_id.into();
314 let options = compact
315 .options
316 .unwrap_or(compact_request::Options::Regular(Default::default()));
317 Ok(vec![(
318 region_id,
319 RegionRequest::Compact(RegionCompactRequest { options }),
320 )])
321}
322
323fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
324 let region_id = truncate.region_id.into();
325 Ok(vec![(
326 region_id,
327 RegionRequest::Truncate(RegionTruncateRequest {}),
328 )])
329}
330
331fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
333 let Some(Body::ArrowIpc(request)) = request.body else {
334 return Ok(vec![]);
335 };
336
337 let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
338 .with_label_values(&["decode"])
339 .start_timer();
340 let schema_data = FlightData::decode(request.schema.clone()).context(ProstSnafu)?;
341 let payload_data = FlightData::decode(request.payload.clone()).context(ProstSnafu)?;
342 let mut decoder = FlightDecoder::default();
343 let _ = decoder.try_decode(&schema_data).context(FlightCodecSnafu)?;
344 let FlightMessage::Recordbatch(rb) = decoder
345 .try_decode(&payload_data)
346 .context(FlightCodecSnafu)?
347 else {
348 unreachable!("Always expect record batch message after schema");
349 };
350 decoder_timer.observe_duration();
351 let payload = rb.into_df_record_batch();
352 let region_id: RegionId = request.region_id.into();
353 Ok(vec![(
354 region_id,
355 RegionRequest::BulkInserts(RegionBulkInsertsRequest { region_id, payload }),
356 )])
357}
358
/// Request to put (write) rows into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
    /// Rows to write.
    pub rows: Rows,
    /// Optional write hint; `None` when built from a protobuf insert request.
    pub hint: Option<WriteHint>,
}
367
/// Request to read from a region, wrapping a [`ScanRequest`].
#[derive(Debug)]
pub struct RegionReadRequest {
    /// The scan to perform.
    pub request: ScanRequest,
}
372
/// Request to delete rows from a region.
#[derive(Debug)]
pub struct RegionDeleteRequest {
    /// Rows identifying what to delete.
    // NOTE(review): presumably only key columns are required here — confirm
    // against the engine that consumes this request.
    pub rows: Rows,
}
381
/// Request to create a region.
#[derive(Debug, Clone)]
pub struct RegionCreateRequest {
    /// Name of the engine that should own the region.
    pub engine: String,
    /// Metadata of every column in the region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Column ids that make up the primary key.
    pub primary_key: Vec<ColumnId>,
    /// Key-value options of the region.
    pub options: HashMap<String, String>,
    /// Directory of the region.
    pub region_dir: String,
}
395
396impl RegionCreateRequest {
397 pub fn validate(&self) -> Result<()> {
399 ensure!(
401 self.column_metadatas
402 .iter()
403 .any(|x| x.semantic_type == SemanticType::Timestamp),
404 InvalidRegionRequestSnafu {
405 region_id: RegionId::new(0, 0),
406 err: "missing timestamp column in create region request".to_string(),
407 }
408 );
409
410 let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
412 for (i, c) in self.column_metadatas.iter().enumerate() {
413 if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
414 return InvalidRegionRequestSnafu {
415 region_id: RegionId::new(0, 0),
416 err: format!(
417 "duplicate column id {} (at position {} and {}) in create region request",
418 c.column_id, previous, i
419 ),
420 }
421 .fail();
422 }
423 }
424
425 for column_id in &self.primary_key {
427 ensure!(
428 column_id_to_indices.contains_key(column_id),
429 InvalidRegionRequestSnafu {
430 region_id: RegionId::new(0, 0),
431 err: format!(
432 "missing primary key column {} in create region request",
433 column_id
434 ),
435 }
436 );
437 }
438
439 Ok(())
440 }
441
442 pub fn is_physical_table(&self) -> bool {
444 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
445 }
446}
447
/// Request to drop a region.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// Copied verbatim from the protobuf `DropRequest::fast_path` flag.
    // NOTE(review): exact semantics of the fast path are defined by the
    // engine — confirm there.
    pub fast_path: bool,
}
452
/// Request to open a region.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
    /// Name of the engine that owns the region.
    pub engine: String,
    /// Directory of the region.
    pub region_dir: String,
    /// Key-value options of the region.
    pub options: HashMap<String, String>,
    /// Whether WAL replay should be skipped; requests built from protobuf
    /// always set this to `false`.
    pub skip_wal_replay: bool,
}

impl RegionOpenRequest {
    /// Returns true when the options contain `PHYSICAL_TABLE_METADATA_KEY`.
    pub fn is_physical_table(&self) -> bool {
        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
    }
}
472
/// Request to close a region. It carries no parameters.
#[derive(Debug)]
pub struct RegionCloseRequest {}
476
/// Request to alter a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RegionAlterRequest {
    /// The kind of alteration to perform.
    pub kind: AlterKind,
}
483
484impl RegionAlterRequest {
485 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
487 self.kind.validate(metadata)?;
488
489 Ok(())
490 }
491
492 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
496 debug_assert!(self.validate(metadata).is_ok());
497 self.kind.need_alter(metadata)
498 }
499}
500
501impl TryFrom<AlterRequest> for RegionAlterRequest {
502 type Error = MetadataError;
503
504 fn try_from(value: AlterRequest) -> Result<Self> {
505 let kind = value.kind.context(InvalidRawRegionRequestSnafu {
506 err: "missing kind in AlterRequest",
507 })?;
508
509 let kind = AlterKind::try_from(kind)?;
510 Ok(RegionAlterRequest { kind })
511 }
512}
513
/// The kinds of alteration that can be applied to a region.
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
    /// Add columns to the region.
    AddColumns {
        /// Columns to add.
        columns: Vec<AddColumn>,
    },
    /// Drop columns from the region; only field columns may be dropped.
    DropColumns {
        /// Names of the columns to drop.
        names: Vec<String>,
    },
    /// Change the data type of columns.
    ModifyColumnTypes {
        /// Columns whose type should change.
        columns: Vec<ModifyColumnType>,
    },
    /// Set region options.
    SetRegionOptions { options: Vec<SetRegionOption> },
    /// Unset region options.
    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
    /// Set index options on a column.
    SetIndex { options: ApiSetIndexOptions },
    /// Unset index options on a column.
    UnsetIndex { options: ApiUnsetIndexOptions },
}
541
/// Index options to set on a column, one variant per index type.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ApiSetIndexOptions {
    /// Set a fulltext index with the given options.
    Fulltext {
        column_name: String,
        options: FulltextOptions,
    },
    /// Set an inverted index; it has no extra options.
    Inverted {
        column_name: String,
    },
    /// Set a skipping index with the given options.
    Skipping {
        column_name: String,
        options: SkippingIndexOptions,
    },
}
556
557impl ApiSetIndexOptions {
558 pub fn column_name(&self) -> &String {
559 match self {
560 ApiSetIndexOptions::Fulltext { column_name, .. } => column_name,
561 ApiSetIndexOptions::Inverted { column_name } => column_name,
562 ApiSetIndexOptions::Skipping { column_name, .. } => column_name,
563 }
564 }
565
566 pub fn is_fulltext(&self) -> bool {
567 match self {
568 ApiSetIndexOptions::Fulltext { .. } => true,
569 ApiSetIndexOptions::Inverted { .. } => false,
570 ApiSetIndexOptions::Skipping { .. } => false,
571 }
572 }
573}
574
/// Identifies the index to remove from a column, one variant per index type.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ApiUnsetIndexOptions {
    /// Unset the fulltext index of the column.
    Fulltext { column_name: String },
    /// Unset the inverted index of the column.
    Inverted { column_name: String },
    /// Unset the skipping index of the column.
    Skipping { column_name: String },
}
581
582impl ApiUnsetIndexOptions {
583 pub fn column_name(&self) -> &String {
584 match self {
585 ApiUnsetIndexOptions::Fulltext { column_name } => column_name,
586 ApiUnsetIndexOptions::Inverted { column_name } => column_name,
587 ApiUnsetIndexOptions::Skipping { column_name } => column_name,
588 }
589 }
590
591 pub fn is_fulltext(&self) -> bool {
592 match self {
593 ApiUnsetIndexOptions::Fulltext { .. } => true,
594 ApiUnsetIndexOptions::Inverted { .. } => false,
595 ApiUnsetIndexOptions::Skipping { .. } => false,
596 }
597 }
598}
599
600impl AlterKind {
601 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
605 match self {
606 AlterKind::AddColumns { columns } => {
607 for col_to_add in columns {
608 col_to_add.validate(metadata)?;
609 }
610 }
611 AlterKind::DropColumns { names } => {
612 for name in names {
613 Self::validate_column_to_drop(name, metadata)?;
614 }
615 }
616 AlterKind::ModifyColumnTypes { columns } => {
617 for col_to_change in columns {
618 col_to_change.validate(metadata)?;
619 }
620 }
621 AlterKind::SetRegionOptions { .. } => {}
622 AlterKind::UnsetRegionOptions { .. } => {}
623 AlterKind::SetIndex { options } => {
624 Self::validate_column_alter_index_option(
625 options.column_name(),
626 metadata,
627 options.is_fulltext(),
628 )?;
629 }
630 AlterKind::UnsetIndex { options } => {
631 Self::validate_column_alter_index_option(
632 options.column_name(),
633 metadata,
634 options.is_fulltext(),
635 )?;
636 }
637 }
638 Ok(())
639 }
640
641 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
643 debug_assert!(self.validate(metadata).is_ok());
644 match self {
645 AlterKind::AddColumns { columns } => columns
646 .iter()
647 .any(|col_to_add| col_to_add.need_alter(metadata)),
648 AlterKind::DropColumns { names } => names
649 .iter()
650 .any(|name| metadata.column_by_name(name).is_some()),
651 AlterKind::ModifyColumnTypes { columns } => columns
652 .iter()
653 .any(|col_to_change| col_to_change.need_alter(metadata)),
654 AlterKind::SetRegionOptions { .. } => {
655 true
658 }
659 AlterKind::UnsetRegionOptions { .. } => true,
660 AlterKind::SetIndex { options, .. } => {
661 metadata.column_by_name(options.column_name()).is_some()
662 }
663 AlterKind::UnsetIndex { options } => {
664 metadata.column_by_name(options.column_name()).is_some()
665 }
666 }
667 }
668
669 fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
671 let Some(column) = metadata.column_by_name(name) else {
672 return Ok(());
673 };
674 ensure!(
675 column.semantic_type == SemanticType::Field,
676 InvalidRegionRequestSnafu {
677 region_id: metadata.region_id,
678 err: format!("column {} is not a field and could not be dropped", name),
679 }
680 );
681 Ok(())
682 }
683
684 fn validate_column_alter_index_option(
686 column_name: &String,
687 metadata: &RegionMetadata,
688 is_fulltext: bool,
689 ) -> Result<()> {
690 let column = metadata
691 .column_by_name(column_name)
692 .context(InvalidRegionRequestSnafu {
693 region_id: metadata.region_id,
694 err: format!("column {} not found", column_name),
695 })?;
696
697 if is_fulltext {
698 ensure!(
699 column.column_schema.data_type.is_string(),
700 InvalidRegionRequestSnafu {
701 region_id: metadata.region_id,
702 err: format!(
703 "cannot change alter index options for non-string column {}",
704 column_name
705 ),
706 }
707 );
708 }
709
710 Ok(())
711 }
712}
713
714impl TryFrom<alter_request::Kind> for AlterKind {
715 type Error = MetadataError;
716
717 fn try_from(kind: alter_request::Kind) -> Result<Self> {
718 let alter_kind = match kind {
719 alter_request::Kind::AddColumns(x) => {
720 let columns = x
721 .add_columns
722 .into_iter()
723 .map(|x| x.try_into())
724 .collect::<Result<Vec<_>>>()?;
725 AlterKind::AddColumns { columns }
726 }
727 alter_request::Kind::ModifyColumnTypes(x) => {
728 let columns = x
729 .modify_column_types
730 .into_iter()
731 .map(|x| x.into())
732 .collect::<Vec<_>>();
733 AlterKind::ModifyColumnTypes { columns }
734 }
735 alter_request::Kind::DropColumns(x) => {
736 let names = x.drop_columns.into_iter().map(|x| x.name).collect();
737 AlterKind::DropColumns { names }
738 }
739 alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
740 options: options
741 .table_options
742 .iter()
743 .map(TryFrom::try_from)
744 .collect::<Result<Vec<_>>>()?,
745 },
746 alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
747 keys: options
748 .keys
749 .iter()
750 .map(|key| UnsetRegionOption::try_from(key.as_str()))
751 .collect::<Result<Vec<_>>>()?,
752 },
753 alter_request::Kind::SetIndex(o) => match o.options.unwrap() {
754 set_index::Options::Fulltext(x) => AlterKind::SetIndex {
755 options: ApiSetIndexOptions::Fulltext {
756 column_name: x.column_name.clone(),
757 options: FulltextOptions {
758 enable: x.enable,
759 analyzer: as_fulltext_option_analyzer(
760 Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
761 ),
762 case_sensitive: x.case_sensitive,
763 backend: as_fulltext_option_backend(
764 PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
765 ),
766 },
767 },
768 },
769 set_index::Options::Inverted(i) => AlterKind::SetIndex {
770 options: ApiSetIndexOptions::Inverted {
771 column_name: i.column_name,
772 },
773 },
774 set_index::Options::Skipping(s) => AlterKind::SetIndex {
775 options: ApiSetIndexOptions::Skipping {
776 column_name: s.column_name,
777 options: SkippingIndexOptions {
778 index_type: as_skipping_index_type(
779 PbSkippingIndexType::try_from(s.skipping_index_type)
780 .context(DecodeProtoSnafu)?,
781 ),
782 granularity: s.granularity as u32,
783 },
784 },
785 },
786 },
787 alter_request::Kind::UnsetIndex(o) => match o.options.unwrap() {
788 v1::unset_index::Options::Fulltext(f) => AlterKind::UnsetIndex {
789 options: ApiUnsetIndexOptions::Fulltext {
790 column_name: f.column_name,
791 },
792 },
793 v1::unset_index::Options::Inverted(i) => AlterKind::UnsetIndex {
794 options: ApiUnsetIndexOptions::Inverted {
795 column_name: i.column_name,
796 },
797 },
798 v1::unset_index::Options::Skipping(s) => AlterKind::UnsetIndex {
799 options: ApiUnsetIndexOptions::Skipping {
800 column_name: s.column_name,
801 },
802 },
803 },
804 };
805
806 Ok(alter_kind)
807 }
808}
809
/// Describes a column to add to a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AddColumn {
    /// Metadata of the column to add.
    pub column_metadata: ColumnMetadata,
    /// Where to place the new column; `None` means no location was given.
    pub location: Option<AddColumnLocation>,
}
819
820impl AddColumn {
821 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
826 ensure!(
827 self.column_metadata.column_schema.is_nullable()
828 || self
829 .column_metadata
830 .column_schema
831 .default_constraint()
832 .is_some(),
833 InvalidRegionRequestSnafu {
834 region_id: metadata.region_id,
835 err: format!(
836 "no default value for column {}",
837 self.column_metadata.column_schema.name
838 ),
839 }
840 );
841
842 if let Some(existing_column) =
843 metadata.column_by_name(&self.column_metadata.column_schema.name)
844 {
845 ensure!(
847 *existing_column == self.column_metadata,
848 InvalidRegionRequestSnafu {
849 region_id: metadata.region_id,
850 err: format!(
851 "column {} already exists with different metadata, existing: {:?}, got: {:?}",
852 self.column_metadata.column_schema.name, existing_column, self.column_metadata,
853 ),
854 }
855 );
856 ensure!(
857 self.location.is_none(),
858 InvalidRegionRequestSnafu {
859 region_id: metadata.region_id,
860 err: format!(
861 "column {} already exists, but location is specified",
862 self.column_metadata.column_schema.name
863 ),
864 }
865 );
866 }
867
868 if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
869 ensure!(
871 existing_column.column_schema.name == self.column_metadata.column_schema.name,
872 InvalidRegionRequestSnafu {
873 region_id: metadata.region_id,
874 err: format!(
875 "column id {} already exists with different name {}",
876 self.column_metadata.column_id, existing_column.column_schema.name
877 ),
878 }
879 );
880 }
881
882 Ok(())
883 }
884
885 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
887 debug_assert!(self.validate(metadata).is_ok());
888 metadata
889 .column_by_name(&self.column_metadata.column_schema.name)
890 .is_none()
891 }
892}
893
894impl TryFrom<v1::region::AddColumn> for AddColumn {
895 type Error = MetadataError;
896
897 fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
898 let column_def = add_column
899 .column_def
900 .context(InvalidRawRegionRequestSnafu {
901 err: "missing column_def in AddColumn",
902 })?;
903
904 let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
905 let location = add_column
906 .location
907 .map(AddColumnLocation::try_from)
908 .transpose()?;
909
910 Ok(AddColumn {
911 column_metadata,
912 location,
913 })
914 }
915}
916
/// Where to place a newly added column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AddColumnLocation {
    /// Place the column first.
    First,
    /// Place the column after another column.
    After {
        /// Name of the column to place the new column after.
        column_name: String,
    },
}
928
929impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
930 type Error = MetadataError;
931
932 fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
933 let location_type = LocationType::try_from(location.location_type)
934 .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
935 let add_column_location = match location_type {
936 LocationType::First => AddColumnLocation::First,
937 LocationType::After => AddColumnLocation::After {
938 column_name: location.after_column_name,
939 },
940 };
941
942 Ok(add_column_location)
943 }
944}
945
/// Describes a change of a column's data type.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ModifyColumnType {
    /// Name of the column to modify.
    pub column_name: String,
    /// Data type the column should be changed to.
    pub target_type: ConcreteDataType,
}
954
955impl ModifyColumnType {
956 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
958 let column_meta = metadata
959 .column_by_name(&self.column_name)
960 .with_context(|| InvalidRegionRequestSnafu {
961 region_id: metadata.region_id,
962 err: format!("column {} not found", self.column_name),
963 })?;
964
965 ensure!(
966 matches!(column_meta.semantic_type, SemanticType::Field),
967 InvalidRegionRequestSnafu {
968 region_id: metadata.region_id,
969 err: "'timestamp' or 'tag' column cannot change type".to_string()
970 }
971 );
972 ensure!(
973 column_meta
974 .column_schema
975 .data_type
976 .can_arrow_type_cast_to(&self.target_type),
977 InvalidRegionRequestSnafu {
978 region_id: metadata.region_id,
979 err: format!(
980 "column '{}' cannot be cast automatically to type '{}'",
981 self.column_name, self.target_type
982 ),
983 }
984 );
985
986 Ok(())
987 }
988
989 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
991 debug_assert!(self.validate(metadata).is_ok());
992 metadata.column_by_name(&self.column_name).is_some()
993 }
994}
995
996impl From<v1::ModifyColumnType> for ModifyColumnType {
997 fn from(modify_column_type: v1::ModifyColumnType) -> Self {
998 let target_type = ColumnDataTypeWrapper::new(
999 modify_column_type.target_type(),
1000 modify_column_type.target_type_extension,
1001 )
1002 .into();
1003
1004 ModifyColumnType {
1005 column_name: modify_column_type.column_name,
1006 target_type,
1007 }
1008 }
1009}
1010
/// A region option to set.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetRegionOption {
    /// Time-to-live of the region; `None` is produced when the TTL is unset.
    Ttl(Option<TimeToLive>),
    /// A TWCS compaction option given as a `(key, value)` pair.
    // NOTE(review): the variant name looks like a transposition of "Twcs";
    // it is part of the public (and serialized) interface, so it is kept.
    Twsc(String, String),
}
1017
1018impl TryFrom<&PbOption> for SetRegionOption {
1019 type Error = MetadataError;
1020
1021 fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1022 let PbOption { key, value } = value;
1023 match key.as_str() {
1024 TTL_KEY => {
1025 let ttl = TimeToLive::from_humantime_or_str(value)
1026 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1027
1028 Ok(Self::Ttl(Some(ttl)))
1029 }
1030 TWCS_MAX_ACTIVE_WINDOW_RUNS
1031 | TWCS_MAX_ACTIVE_WINDOW_FILES
1032 | TWCS_MAX_INACTIVE_WINDOW_FILES
1033 | TWCS_MAX_INACTIVE_WINDOW_RUNS
1034 | TWCS_MAX_OUTPUT_FILE_SIZE
1035 | TWCS_TIME_WINDOW => Ok(Self::Twsc(key.to_string(), value.to_string())),
1036 _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1037 }
1038 }
1039}
1040
1041impl From<&UnsetRegionOption> for SetRegionOption {
1042 fn from(unset_option: &UnsetRegionOption) -> Self {
1043 match unset_option {
1044 UnsetRegionOption::TwcsMaxActiveWindowFiles => {
1045 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1046 }
1047 UnsetRegionOption::TwcsMaxInactiveWindowFiles => {
1048 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1049 }
1050 UnsetRegionOption::TwcsMaxActiveWindowRuns => {
1051 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1052 }
1053 UnsetRegionOption::TwcsMaxInactiveWindowRuns => {
1054 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1055 }
1056 UnsetRegionOption::TwcsMaxOutputFileSize => {
1057 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1058 }
1059 UnsetRegionOption::TwcsTimeWindow => {
1060 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1061 }
1062 UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1063 }
1064 }
1065}
1066
1067impl TryFrom<&str> for UnsetRegionOption {
1068 type Error = MetadataError;
1069
1070 fn try_from(key: &str) -> Result<Self> {
1071 match key.to_ascii_lowercase().as_str() {
1072 TTL_KEY => Ok(Self::Ttl),
1073 TWCS_MAX_ACTIVE_WINDOW_FILES => Ok(Self::TwcsMaxActiveWindowFiles),
1074 TWCS_MAX_INACTIVE_WINDOW_FILES => Ok(Self::TwcsMaxInactiveWindowFiles),
1075 TWCS_MAX_ACTIVE_WINDOW_RUNS => Ok(Self::TwcsMaxActiveWindowRuns),
1076 TWCS_MAX_INACTIVE_WINDOW_RUNS => Ok(Self::TwcsMaxInactiveWindowRuns),
1077 TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1078 TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1079 _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1080 }
1081 }
1082}
1083
/// A region option to unset. Each variant corresponds to one option key
/// (see `as_str`).
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetRegionOption {
    /// Key `TWCS_MAX_ACTIVE_WINDOW_FILES`.
    TwcsMaxActiveWindowFiles,
    /// Key `TWCS_MAX_INACTIVE_WINDOW_FILES`.
    TwcsMaxInactiveWindowFiles,
    /// Key `TWCS_MAX_ACTIVE_WINDOW_RUNS`.
    TwcsMaxActiveWindowRuns,
    /// Key `TWCS_MAX_INACTIVE_WINDOW_RUNS`.
    TwcsMaxInactiveWindowRuns,
    /// Key `TWCS_MAX_OUTPUT_FILE_SIZE`.
    TwcsMaxOutputFileSize,
    /// Key `TWCS_TIME_WINDOW`.
    TwcsTimeWindow,
    /// Key `TTL_KEY`.
    Ttl,
}
1094
1095impl UnsetRegionOption {
1096 pub fn as_str(&self) -> &str {
1097 match self {
1098 Self::Ttl => TTL_KEY,
1099 Self::TwcsMaxActiveWindowFiles => TWCS_MAX_ACTIVE_WINDOW_FILES,
1100 Self::TwcsMaxInactiveWindowFiles => TWCS_MAX_INACTIVE_WINDOW_FILES,
1101 Self::TwcsMaxActiveWindowRuns => TWCS_MAX_ACTIVE_WINDOW_RUNS,
1102 Self::TwcsMaxInactiveWindowRuns => TWCS_MAX_INACTIVE_WINDOW_RUNS,
1103 Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1104 Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1105 }
1106 }
1107}
1108
1109impl Display for UnsetRegionOption {
1110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1111 write!(f, "{}", self.as_str())
1112 }
1113}
1114
/// Request to flush a region.
#[derive(Debug, Clone, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size; `None` when built from a protobuf request.
    // NOTE(review): presumably overrides the row group size of the files
    // written by the flush — confirm against the engine.
    pub row_group_size: Option<usize>,
}
1119
/// Request to compact a region.
#[derive(Debug)]
pub struct RegionCompactRequest {
    /// Compaction options taken from the protobuf request.
    pub options: compact_request::Options,
}

impl Default for RegionCompactRequest {
    /// Defaults to regular compaction with default settings.
    fn default() -> Self {
        Self {
            options: compact_request::Options::Regular(Default::default()),
        }
    }
}
1133
/// Request to truncate a region. It carries no parameters.
#[derive(Debug)]
pub struct RegionTruncateRequest {}
1137
/// Catch-up request for a region.
// NOTE(review): the field docs below are inferred from names and types only;
// the consuming engine defines the exact semantics — confirm there.
#[derive(Debug, Clone, Copy)]
pub struct RegionCatchupRequest {
    /// Whether the region should be set writable (presumably once catch-up
    /// completes).
    pub set_writable: bool,
    /// Optional log entry id associated with the catch-up.
    pub entry_id: Option<entry::Id>,
    /// Optional log entry id associated with the region's metadata.
    pub metadata_entry_id: Option<entry::Id>,
    /// Optional location id.
    pub location_id: Option<u64>,
}
1156
/// Request that names a set of regions.
// NOTE(review): presumably used to fetch the sequences of these regions —
// confirm against the caller.
#[derive(Debug, Clone)]
pub struct RegionSequencesRequest {
    /// Ids of the regions the request applies to.
    pub region_ids: Vec<RegionId>,
}
1162
/// Bulk insert request carrying an already-decoded record batch.
#[derive(Debug, Clone)]
pub struct RegionBulkInsertsRequest {
    /// Region to insert into.
    pub region_id: RegionId,
    /// The record batch to insert.
    pub payload: DfRecordBatch,
}

impl RegionBulkInsertsRequest {
    /// Returns the array memory size of the payload, used as a size estimate
    /// for the request.
    pub fn estimated_size(&self) -> usize {
        self.payload.get_array_memory_size()
    }
}
1174
1175impl fmt::Display for RegionRequest {
1176 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1177 match self {
1178 RegionRequest::Put(_) => write!(f, "Put"),
1179 RegionRequest::Delete(_) => write!(f, "Delete"),
1180 RegionRequest::Create(_) => write!(f, "Create"),
1181 RegionRequest::Drop(_) => write!(f, "Drop"),
1182 RegionRequest::Open(_) => write!(f, "Open"),
1183 RegionRequest::Close(_) => write!(f, "Close"),
1184 RegionRequest::Alter(_) => write!(f, "Alter"),
1185 RegionRequest::Flush(_) => write!(f, "Flush"),
1186 RegionRequest::Compact(_) => write!(f, "Compact"),
1187 RegionRequest::Truncate(_) => write!(f, "Truncate"),
1188 RegionRequest::Catchup(_) => write!(f, "Catchup"),
1189 RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1190 }
1191 }
1192}
1193
#[cfg(test)]
mod tests {
    use api::v1::region::RegionColumnDef;
    use api::v1::{ColumnDataType, ColumnDef};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};

    use super::*;
    use crate::metadata::RegionMetadataBuilder;

    /// Assembles a [`ColumnMetadata`] from the schema parts, a semantic type and a column id.
    fn column_meta(
        name: &str,
        data_type: ConcreteDataType,
        nullable: bool,
        semantic_type: SemanticType,
        column_id: ColumnId,
    ) -> ColumnMetadata {
        ColumnMetadata {
            column_schema: ColumnSchema::new(name, data_type, nullable),
            semantic_type,
            column_id,
        }
    }

    /// Shorthand for [`column_meta`] with the string data type.
    fn string_column(
        name: &str,
        nullable: bool,
        semantic_type: SemanticType,
        column_id: ColumnId,
    ) -> ColumnMetadata {
        column_meta(
            name,
            ConcreteDataType::string_datatype(),
            nullable,
            semantic_type,
            column_id,
        )
    }

    /// Wraps column metadata into an [`AddColumn`] that carries no location.
    fn add_without_location(column_metadata: ColumnMetadata) -> AddColumn {
        AddColumn {
            column_metadata,
            location: None,
        }
    }

    /// Builds an `AlterKind::AddColumns` that adds exactly one column.
    fn add_single_column(
        column_metadata: ColumnMetadata,
        location: Option<AddColumnLocation>,
    ) -> AlterKind {
        AlterKind::AddColumns {
            columns: vec![AddColumn {
                column_metadata,
                location,
            }],
        }
    }

    /// Builds an `AlterKind::DropColumns` that drops exactly one column.
    fn drop_single_column(name: &str) -> AlterKind {
        AlterKind::DropColumns {
            names: vec![name.to_string()],
        }
    }

    /// Builds an `AlterKind::ModifyColumnTypes` that changes the type of exactly one column.
    fn modify_single_type(column_name: &str, target_type: ConcreteDataType) -> AlterKind {
        AlterKind::ModifyColumnTypes {
            columns: vec![ModifyColumnType {
                column_name: column_name.to_string(),
                target_type,
            }],
        }
    }

    #[test]
    fn test_from_proto_location() {
        // Runs the proto-to-domain conversion for a raw location type and column name.
        let convert = |location_type: i32, after_column_name: &str| {
            AddColumnLocation::try_from(v1::AddColumnLocation {
                location_type,
                after_column_name: after_column_name.to_string(),
            })
        };

        let first = convert(LocationType::First as i32, "").unwrap();
        assert_eq!(first, AddColumnLocation::First);

        // 10 is used as a location type the conversion is expected to reject.
        convert(10, "").unwrap_err();

        let after = convert(LocationType::After as i32, "a").unwrap();
        assert_eq!(
            after,
            AddColumnLocation::After {
                column_name: "a".to_string()
            }
        );
    }

    #[test]
    fn test_from_none_proto_add_column() {
        // An add-column message without a column definition must not convert.
        let empty = v1::region::AddColumn {
            column_def: None,
            location: None,
        };
        AddColumn::try_from(empty).unwrap_err();
    }

    #[test]
    fn test_from_proto_alter_request() {
        // A request without a kind must not convert.
        let without_kind = AlterRequest {
            region_id: 0,
            schema_version: 1,
            kind: None,
        };
        RegionAlterRequest::try_from(without_kind).unwrap_err();

        // A request adding one nullable string field "a" at the first position.
        let column_def = ColumnDef {
            name: "a".to_string(),
            data_type: ColumnDataType::String as i32,
            is_nullable: true,
            default_constraint: vec![],
            semantic_type: SemanticType::Field as i32,
            comment: String::new(),
            ..Default::default()
        };
        let proto_add_column = v1::region::AddColumn {
            column_def: Some(RegionColumnDef {
                column_def: Some(column_def),
                column_id: 1,
            }),
            location: Some(v1::AddColumnLocation {
                location_type: LocationType::First as i32,
                after_column_name: String::default(),
            }),
        };
        let proto_request = AlterRequest {
            region_id: 0,
            schema_version: 1,
            kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
                add_columns: vec![proto_add_column],
            })),
        };
        let request = RegionAlterRequest::try_from(proto_request).unwrap();

        let expected = RegionAlterRequest {
            kind: add_single_column(
                string_column("a", true, SemanticType::Field, 1),
                Some(AddColumnLocation::First),
            ),
        };
        assert_eq!(request, expected);
    }

    /// Builds the region metadata shared by the tests: a millisecond timestamp `ts` (id 1),
    /// a string tag `tag_0` (id 2), a string field `field_0` (id 3) and a boolean field
    /// `field_1` (id 4), with `tag_0` as the only primary key column.
    fn new_metadata() -> RegionMetadata {
        let columns = vec![
            column_meta(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
                SemanticType::Timestamp,
                1,
            ),
            string_column("tag_0", true, SemanticType::Tag, 2),
            string_column("field_0", true, SemanticType::Field, 3),
            column_meta(
                "field_1",
                ConcreteDataType::boolean_datatype(),
                true,
                SemanticType::Field,
                4,
            ),
        ];

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        for column in columns {
            builder.push_column_metadata(column);
        }
        builder.primary_key(vec![2]);
        builder.build().unwrap()
    }

    /// Returns [`new_metadata`] with its schema version set to 1.
    fn new_metadata_version_one() -> RegionMetadata {
        let mut metadata = new_metadata();
        metadata.schema_version = 1;
        metadata
    }

    #[test]
    fn test_add_column_validate() {
        let metadata = new_metadata();

        // A new nullable tag is valid and requires an alteration.
        let new_tag = add_without_location(string_column("tag_1", true, SemanticType::Tag, 5));
        new_tag.validate(&metadata).unwrap();
        assert!(new_tag.need_alter(&metadata));

        // The same column declared non-nullable is expected to be rejected.
        add_without_location(string_column("tag_1", false, SemanticType::Tag, 5))
            .validate(&metadata)
            .unwrap_err();

        // Re-adding `tag_0` exactly as it exists is valid but needs no alteration.
        let existing_tag = add_without_location(string_column("tag_0", true, SemanticType::Tag, 2));
        existing_tag.validate(&metadata).unwrap();
        assert!(!existing_tag.need_alter(&metadata));
    }

    #[test]
    fn test_add_duplicate_columns() {
        // Two additions share the name "tag_1" but differ in semantic type and id.
        let kind = AlterKind::AddColumns {
            columns: vec![
                add_without_location(string_column("tag_1", true, SemanticType::Tag, 5)),
                add_without_location(string_column("tag_1", true, SemanticType::Field, 6)),
            ],
        };
        let metadata = new_metadata();
        kind.validate(&metadata).unwrap();
        assert!(kind.need_alter(&metadata));
    }

    #[test]
    fn test_add_existing_column_different_metadata() {
        let metadata = new_metadata();

        // Existing name "tag_0" with a different column id (4 instead of 2).
        let different_id = string_column("tag_0", true, SemanticType::Tag, 4);
        add_single_column(different_id, None)
            .validate(&metadata)
            .unwrap_err();

        // Existing name "tag_0" with a different data type (int64 instead of string).
        let different_type = column_meta(
            "tag_0",
            ConcreteDataType::int64_datatype(),
            true,
            SemanticType::Tag,
            2,
        );
        add_single_column(different_type, None)
            .validate(&metadata)
            .unwrap_err();

        // Existing column id 2 with a different name ("tag_1" instead of "tag_0").
        let different_name = string_column("tag_1", true, SemanticType::Tag, 2);
        add_single_column(different_name, None)
            .validate(&metadata)
            .unwrap_err();
    }

    #[test]
    fn test_add_existing_column_with_location() {
        let metadata = new_metadata();
        // Re-adding the existing `tag_0` while also requesting a location must fail.
        let kind = add_single_column(
            string_column("tag_0", true, SemanticType::Tag, 2),
            Some(AddColumnLocation::First),
        );
        kind.validate(&metadata).unwrap_err();
    }

    #[test]
    fn test_validate_drop_column() {
        let metadata = new_metadata();

        // Dropping a column absent from the metadata is valid and needs no alteration.
        let missing = drop_single_column("xxxx");
        missing.validate(&metadata).unwrap();
        assert!(!missing.need_alter(&metadata));

        // Dropping `tag_0`, the primary key column of the test metadata, must fail.
        drop_single_column("tag_0")
            .validate(&metadata)
            .unwrap_err();

        // Dropping a plain field is valid and requires an alteration.
        let field = drop_single_column("field_0");
        field.validate(&metadata).unwrap();
        assert!(field.need_alter(&metadata));
    }

    #[test]
    fn test_validate_modify_column_type() {
        let metadata = new_metadata();

        // A column name absent from the metadata.
        modify_single_type("xxxx", ConcreteDataType::string_datatype())
            .validate(&metadata)
            .unwrap_err();

        // Boolean field `field_1` to date.
        modify_single_type("field_1", ConcreteDataType::date_datatype())
            .validate(&metadata)
            .unwrap_err();

        // The timestamp column `ts`.
        modify_single_type("ts", ConcreteDataType::date_datatype())
            .validate(&metadata)
            .unwrap_err();

        // The tag column `tag_0`.
        modify_single_type("tag_0", ConcreteDataType::date_datatype())
            .validate(&metadata)
            .unwrap_err();

        // String field `field_0` to int32 is accepted and requires an alteration.
        let kind = modify_single_type("field_0", ConcreteDataType::int32_datatype());
        kind.validate(&metadata).unwrap();
        assert!(kind.need_alter(&metadata));
    }

    #[test]
    fn test_validate_add_columns() {
        let kind = AlterKind::AddColumns {
            columns: vec![
                add_without_location(string_column("tag_1", true, SemanticType::Tag, 5)),
                add_without_location(string_column("field_2", true, SemanticType::Field, 6)),
            ],
        };
        let request = RegionAlterRequest { kind };
        let metadata = new_metadata_version_one();
        request.validate(&metadata).unwrap();
    }

    #[test]
    fn test_validate_create_region() {
        let column_metadatas = vec![
            column_meta(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
                SemanticType::Timestamp,
                1,
            ),
            string_column("tag_0", true, SemanticType::Tag, 2),
            string_column("field_0", true, SemanticType::Field, 3),
        ];
        // The primary key names id 3 (the field `field_0`) and id 4 (no such column here);
        // validation is expected to reject the request.
        let create = RegionCreateRequest {
            engine: "mito".to_string(),
            column_metadatas,
            primary_key: vec![3, 4],
            options: HashMap::new(),
            region_dir: "path".to_string(),
        };

        assert!(create.validate().is_err());
    }

    #[test]
    fn test_validate_modify_column_fulltext_options() {
        // Setting a fulltext index on `tag_0` validates against version-1 metadata.
        let set_request = RegionAlterRequest {
            kind: AlterKind::SetIndex {
                options: ApiSetIndexOptions::Fulltext {
                    column_name: "tag_0".to_string(),
                    options: FulltextOptions {
                        enable: true,
                        analyzer: FulltextAnalyzer::Chinese,
                        case_sensitive: false,
                        backend: FulltextBackend::Bloom,
                    },
                },
            },
        };
        let metadata = new_metadata_version_one();
        set_request.validate(&metadata).unwrap();

        // Unsetting the fulltext index on `tag_0` validates against fresh version-1 metadata.
        let unset_request = RegionAlterRequest {
            kind: AlterKind::UnsetIndex {
                options: ApiUnsetIndexOptions::Fulltext {
                    column_name: "tag_0".to_string(),
                },
            },
        };
        let metadata = new_metadata_version_one();
        unset_request.validate(&metadata).unwrap();
    }
}