store_api/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::ColumnDataTypeWrapper;
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21    as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25    alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequest,
26    CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest,
27    DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
28};
29use api::v1::{
30    self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
31    SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::FlightDecoder;
35use common_recordbatch::DfRecordBatch;
36use common_time::TimeToLive;
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
39use serde::{Deserialize, Serialize};
40use snafu::{ensure, OptionExt, ResultExt};
41use strum::{AsRefStr, IntoStaticStr};
42
43use crate::logstore::entry;
44use crate::metadata::{
45    ColumnMetadata, DecodeProtoSnafu, FlightCodecSnafu, InvalidIndexOptionSnafu,
46    InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
47    InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu,
48};
49use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
50use crate::metrics;
51use crate::mito_engine_options::{
52    TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
53};
54use crate::path_utils::table_dir;
55use crate::storage::{ColumnId, RegionId, ScanRequest};
56
57/// The type of path to generate.
58#[derive(Debug, Clone, Copy, PartialEq)]
59pub enum PathType {
60    /// A bare path - the original path of an engine.
61    ///
62    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/`.
63    Bare,
64    /// A path for the data region of a metric engine table.
65    ///
66    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/data/`.
67    Data,
68    /// A path for the metadata region of a metric engine table.
69    ///
70    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/metadata/`.
71    Metadata,
72}
73
74#[derive(Debug, IntoStaticStr)]
75pub enum BatchRegionDdlRequest {
76    Create(Vec<(RegionId, RegionCreateRequest)>),
77    Drop(Vec<(RegionId, RegionDropRequest)>),
78    Alter(Vec<(RegionId, RegionAlterRequest)>),
79}
80
81impl BatchRegionDdlRequest {
82    /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`].
83    pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
84        match body {
85            region_request::Body::Creates(creates) => {
86                let requests = creates
87                    .requests
88                    .into_iter()
89                    .map(parse_region_create)
90                    .collect::<Result<Vec<_>>>()?;
91                Ok(Some(Self::Create(requests)))
92            }
93            region_request::Body::Drops(drops) => {
94                let requests = drops
95                    .requests
96                    .into_iter()
97                    .map(parse_region_drop)
98                    .collect::<Result<Vec<_>>>()?;
99                Ok(Some(Self::Drop(requests)))
100            }
101            region_request::Body::Alters(alters) => {
102                let requests = alters
103                    .requests
104                    .into_iter()
105                    .map(parse_region_alter)
106                    .collect::<Result<Vec<_>>>()?;
107                Ok(Some(Self::Alter(requests)))
108            }
109            _ => Ok(None),
110        }
111    }
112
113    pub fn request_type(&self) -> &'static str {
114        self.into()
115    }
116
117    pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
118        match self {
119            Self::Create(requests) => requests
120                .into_iter()
121                .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
122                .collect(),
123            Self::Drop(requests) => requests
124                .into_iter()
125                .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
126                .collect(),
127            Self::Alter(requests) => requests
128                .into_iter()
129                .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
130                .collect(),
131        }
132    }
133}
134
135#[derive(Debug, IntoStaticStr)]
136pub enum RegionRequest {
137    Put(RegionPutRequest),
138    Delete(RegionDeleteRequest),
139    Create(RegionCreateRequest),
140    Drop(RegionDropRequest),
141    Open(RegionOpenRequest),
142    Close(RegionCloseRequest),
143    Alter(RegionAlterRequest),
144    Flush(RegionFlushRequest),
145    Compact(RegionCompactRequest),
146    Truncate(RegionTruncateRequest),
147    Catchup(RegionCatchupRequest),
148    BulkInserts(RegionBulkInsertsRequest),
149}
150
151impl RegionRequest {
152    /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
153    /// Inserts/Deletes request might become multiple requests. Others are one-to-one.
154    pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
155        match body {
156            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
157            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
158            region_request::Body::Create(create) => make_region_create(create),
159            region_request::Body::Drop(drop) => make_region_drop(drop),
160            region_request::Body::Open(open) => make_region_open(open),
161            region_request::Body::Close(close) => make_region_close(close),
162            region_request::Body::Alter(alter) => make_region_alter(alter),
163            region_request::Body::Flush(flush) => make_region_flush(flush),
164            region_request::Body::Compact(compact) => make_region_compact(compact),
165            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
166            region_request::Body::Creates(creates) => make_region_creates(creates),
167            region_request::Body::Drops(drops) => make_region_drops(drops),
168            region_request::Body::Alters(alters) => make_region_alters(alters),
169            region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
170            region_request::Body::Sync(_) => UnexpectedSnafu {
171                reason: "Sync request should be handled separately by RegionServer",
172            }
173            .fail(),
174            region_request::Body::ListMetadata(_) => UnexpectedSnafu {
175                reason: "ListMetadata request should be handled separately by RegionServer",
176            }
177            .fail(),
178        }
179    }
180
181    /// Returns the type name of the request.
182    pub fn request_type(&self) -> &'static str {
183        self.into()
184    }
185}
186
187fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
188    let requests = inserts
189        .requests
190        .into_iter()
191        .filter_map(|r| {
192            let region_id = r.region_id.into();
193            r.rows.map(|rows| {
194                (
195                    region_id,
196                    RegionRequest::Put(RegionPutRequest { rows, hint: None }),
197                )
198            })
199        })
200        .collect();
201    Ok(requests)
202}
203
204fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
205    let requests = deletes
206        .requests
207        .into_iter()
208        .filter_map(|r| {
209            let region_id = r.region_id.into();
210            r.rows.map(|rows| {
211                (
212                    region_id,
213                    RegionRequest::Delete(RegionDeleteRequest { rows }),
214                )
215            })
216        })
217        .collect();
218    Ok(requests)
219}
220
221fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
222    let column_metadatas = create
223        .column_defs
224        .into_iter()
225        .map(ColumnMetadata::try_from_column_def)
226        .collect::<Result<Vec<_>>>()?;
227    let region_id = RegionId::from(create.region_id);
228    let table_dir = table_dir(&create.path, region_id.table_id());
229    Ok((
230        region_id,
231        RegionCreateRequest {
232            engine: create.engine,
233            column_metadatas,
234            primary_key: create.primary_key,
235            options: create.options,
236            table_dir,
237            path_type: PathType::Bare,
238        },
239    ))
240}
241
242fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
243    let (region_id, request) = parse_region_create(create)?;
244    Ok(vec![(region_id, RegionRequest::Create(request))])
245}
246
247fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
248    let mut requests = Vec::with_capacity(creates.requests.len());
249    for create in creates.requests {
250        requests.extend(make_region_create(create)?);
251    }
252    Ok(requests)
253}
254
255fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
256    let region_id = drop.region_id.into();
257    Ok((
258        region_id,
259        RegionDropRequest {
260            fast_path: drop.fast_path,
261        },
262    ))
263}
264
265fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
266    let (region_id, request) = parse_region_drop(drop)?;
267    Ok(vec![(region_id, RegionRequest::Drop(request))])
268}
269
270fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
271    let mut requests = Vec::with_capacity(drops.requests.len());
272    for drop in drops.requests {
273        requests.extend(make_region_drop(drop)?);
274    }
275    Ok(requests)
276}
277
278fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
279    let region_id = RegionId::from(open.region_id);
280    let table_dir = table_dir(&open.path, region_id.table_id());
281    Ok(vec![(
282        region_id,
283        RegionRequest::Open(RegionOpenRequest {
284            engine: open.engine,
285            table_dir,
286            path_type: PathType::Bare,
287            options: open.options,
288            skip_wal_replay: false,
289        }),
290    )])
291}
292
293fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
294    let region_id = close.region_id.into();
295    Ok(vec![(
296        region_id,
297        RegionRequest::Close(RegionCloseRequest {}),
298    )])
299}
300
301fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
302    let region_id = alter.region_id.into();
303    let request = RegionAlterRequest::try_from(alter)?;
304    Ok((region_id, request))
305}
306
307fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
308    let (region_id, request) = parse_region_alter(alter)?;
309    Ok(vec![(region_id, RegionRequest::Alter(request))])
310}
311
312fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
313    let mut requests = Vec::with_capacity(alters.requests.len());
314    for alter in alters.requests {
315        requests.extend(make_region_alter(alter)?);
316    }
317    Ok(requests)
318}
319
320fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
321    let region_id = flush.region_id.into();
322    Ok(vec![(
323        region_id,
324        RegionRequest::Flush(RegionFlushRequest {
325            row_group_size: None,
326        }),
327    )])
328}
329
330fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
331    let region_id = compact.region_id.into();
332    let options = compact
333        .options
334        .unwrap_or(compact_request::Options::Regular(Default::default()));
335    Ok(vec![(
336        region_id,
337        RegionRequest::Compact(RegionCompactRequest { options }),
338    )])
339}
340
341fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
342    let region_id = truncate.region_id.into();
343    Ok(vec![(
344        region_id,
345        RegionRequest::Truncate(RegionTruncateRequest {}),
346    )])
347}
348
349/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
350fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
351    let region_id = request.region_id.into();
352    let Some(Body::ArrowIpc(request)) = request.body else {
353        return Ok(vec![]);
354    };
355
356    let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
357        .with_label_values(&["decode"])
358        .start_timer();
359    let mut decoder =
360        FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
361    let payload = decoder
362        .try_decode_record_batch(&request.data_header, &request.payload)
363        .context(FlightCodecSnafu)?;
364    decoder_timer.observe_duration();
365    Ok(vec![(
366        region_id,
367        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
368            region_id,
369            payload,
370            raw_data: request,
371        }),
372    )])
373}
374
375/// Request to put data into a region.
376#[derive(Debug)]
377pub struct RegionPutRequest {
378    /// Rows to put.
379    pub rows: Rows,
380    /// Write hint.
381    pub hint: Option<WriteHint>,
382}
383
384#[derive(Debug)]
385pub struct RegionReadRequest {
386    pub request: ScanRequest,
387}
388
389/// Request to delete data from a region.
390#[derive(Debug)]
391pub struct RegionDeleteRequest {
392    /// Keys to rows to delete.
393    ///
394    /// Each row only contains primary key columns and a time index column.
395    pub rows: Rows,
396}
397
398#[derive(Debug, Clone)]
399pub struct RegionCreateRequest {
400    /// Region engine name
401    pub engine: String,
402    /// Columns in this region.
403    pub column_metadatas: Vec<ColumnMetadata>,
404    /// Columns in the primary key.
405    pub primary_key: Vec<ColumnId>,
406    /// Options of the created region.
407    pub options: HashMap<String, String>,
408    /// Directory for table's data home. Usually is composed by catalog and table id
409    pub table_dir: String,
410    /// Path type for generating paths
411    pub path_type: PathType,
412}
413
414impl RegionCreateRequest {
415    /// Checks whether the request is valid, returns an error if it is invalid.
416    pub fn validate(&self) -> Result<()> {
417        // time index must exist
418        ensure!(
419            self.column_metadatas
420                .iter()
421                .any(|x| x.semantic_type == SemanticType::Timestamp),
422            InvalidRegionRequestSnafu {
423                region_id: RegionId::new(0, 0),
424                err: "missing timestamp column in create region request".to_string(),
425            }
426        );
427
428        // build column id to indices
429        let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
430        for (i, c) in self.column_metadatas.iter().enumerate() {
431            if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
432                return InvalidRegionRequestSnafu {
433                    region_id: RegionId::new(0, 0),
434                    err: format!(
435                        "duplicate column id {} (at position {} and {}) in create region request",
436                        c.column_id, previous, i
437                    ),
438                }
439                .fail();
440            }
441        }
442
443        // primary key must exist
444        for column_id in &self.primary_key {
445            ensure!(
446                column_id_to_indices.contains_key(column_id),
447                InvalidRegionRequestSnafu {
448                    region_id: RegionId::new(0, 0),
449                    err: format!(
450                        "missing primary key column {} in create region request",
451                        column_id
452                    ),
453                }
454            );
455        }
456
457        Ok(())
458    }
459
460    /// Returns true when the region belongs to the metric engine's physical table.
461    pub fn is_physical_table(&self) -> bool {
462        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
463    }
464}
465
466#[derive(Debug, Clone)]
467pub struct RegionDropRequest {
468    pub fast_path: bool,
469}
470
471/// Open region request.
472#[derive(Debug, Clone)]
473pub struct RegionOpenRequest {
474    /// Region engine name
475    pub engine: String,
476    /// Directory for table's data home. Usually is composed by catalog and table id
477    pub table_dir: String,
478    /// Path type for generating paths
479    pub path_type: PathType,
480    /// Options of the opened region.
481    pub options: HashMap<String, String>,
482    /// To skip replaying the WAL.
483    pub skip_wal_replay: bool,
484}
485
486impl RegionOpenRequest {
487    /// Returns true when the region belongs to the metric engine's physical table.
488    pub fn is_physical_table(&self) -> bool {
489        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
490    }
491}
492
493/// Close region request.
494#[derive(Debug)]
495pub struct RegionCloseRequest {}
496
497/// Alter metadata of a region.
498#[derive(Debug, PartialEq, Eq, Clone)]
499pub struct RegionAlterRequest {
500    /// Kind of alteration to do.
501    pub kind: AlterKind,
502}
503
504impl RegionAlterRequest {
505    /// Checks whether the request is valid, returns an error if it is invalid.
506    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
507        self.kind.validate(metadata)?;
508
509        Ok(())
510    }
511
512    /// Returns true if we need to apply the request to the region.
513    ///
514    /// The `request` should be valid.
515    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
516        debug_assert!(self.validate(metadata).is_ok());
517        self.kind.need_alter(metadata)
518    }
519}
520
521impl TryFrom<AlterRequest> for RegionAlterRequest {
522    type Error = MetadataError;
523
524    fn try_from(value: AlterRequest) -> Result<Self> {
525        let kind = value.kind.context(InvalidRawRegionRequestSnafu {
526            err: "missing kind in AlterRequest",
527        })?;
528
529        let kind = AlterKind::try_from(kind)?;
530        Ok(RegionAlterRequest { kind })
531    }
532}
533
534/// Kind of the alteration.
535#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
536pub enum AlterKind {
537    /// Add columns to the region.
538    AddColumns {
539        /// Columns to add.
540        columns: Vec<AddColumn>,
541    },
542    /// Drop columns from the region, only fields are allowed to drop.
543    DropColumns {
544        /// Name of columns to drop.
545        names: Vec<String>,
546    },
547    /// Change columns datatype form the region, only fields are allowed to change.
548    ModifyColumnTypes {
549        /// Columns to change.
550        columns: Vec<ModifyColumnType>,
551    },
552    /// Set region options.
553    SetRegionOptions { options: Vec<SetRegionOption> },
554    /// Unset region options.
555    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
556    /// Set index options.
557    SetIndexes { options: Vec<SetIndexOption> },
558    /// Unset index options.
559    UnsetIndexes { options: Vec<UnsetIndexOption> },
560    /// Drop column default value.
561    DropDefaults {
562        /// Name of columns to drop.
563        names: Vec<String>,
564    },
565}
566
567#[derive(Debug, PartialEq, Eq, Clone)]
568pub enum SetIndexOption {
569    Fulltext {
570        column_name: String,
571        options: FulltextOptions,
572    },
573    Inverted {
574        column_name: String,
575    },
576    Skipping {
577        column_name: String,
578        options: SkippingIndexOptions,
579    },
580}
581
582impl SetIndexOption {
583    /// Returns the column name of the index option.
584    pub fn column_name(&self) -> &String {
585        match self {
586            SetIndexOption::Fulltext { column_name, .. } => column_name,
587            SetIndexOption::Inverted { column_name } => column_name,
588            SetIndexOption::Skipping { column_name, .. } => column_name,
589        }
590    }
591
592    /// Returns true if the index option is fulltext.
593    pub fn is_fulltext(&self) -> bool {
594        match self {
595            SetIndexOption::Fulltext { .. } => true,
596            SetIndexOption::Inverted { .. } => false,
597            SetIndexOption::Skipping { .. } => false,
598        }
599    }
600}
601
602impl TryFrom<v1::SetIndex> for SetIndexOption {
603    type Error = MetadataError;
604
605    fn try_from(value: v1::SetIndex) -> Result<Self> {
606        let option = value.options.context(InvalidRawRegionRequestSnafu {
607            err: "missing options in SetIndex",
608        })?;
609
610        let opt = match option {
611            v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
612                column_name: x.column_name.clone(),
613                options: FulltextOptions::new(
614                    x.enable,
615                    as_fulltext_option_analyzer(
616                        Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
617                    ),
618                    x.case_sensitive,
619                    as_fulltext_option_backend(
620                        PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
621                    ),
622                    x.granularity as u32,
623                    x.false_positive_rate,
624                )
625                .context(InvalidIndexOptionSnafu)?,
626            },
627            v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
628                column_name: i.column_name,
629            },
630            v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
631                column_name: s.column_name,
632                options: SkippingIndexOptions::new(
633                    s.granularity as u32,
634                    s.false_positive_rate,
635                    as_skipping_index_type(
636                        PbSkippingIndexType::try_from(s.skipping_index_type)
637                            .context(DecodeProtoSnafu)?,
638                    ),
639                )
640                .context(InvalidIndexOptionSnafu)?,
641            },
642        };
643
644        Ok(opt)
645    }
646}
647
648#[derive(Debug, PartialEq, Eq, Clone)]
649pub enum UnsetIndexOption {
650    Fulltext { column_name: String },
651    Inverted { column_name: String },
652    Skipping { column_name: String },
653}
654
655impl UnsetIndexOption {
656    pub fn column_name(&self) -> &String {
657        match self {
658            UnsetIndexOption::Fulltext { column_name } => column_name,
659            UnsetIndexOption::Inverted { column_name } => column_name,
660            UnsetIndexOption::Skipping { column_name } => column_name,
661        }
662    }
663
664    pub fn is_fulltext(&self) -> bool {
665        match self {
666            UnsetIndexOption::Fulltext { .. } => true,
667            UnsetIndexOption::Inverted { .. } => false,
668            UnsetIndexOption::Skipping { .. } => false,
669        }
670    }
671}
672
673impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
674    type Error = MetadataError;
675
676    fn try_from(value: v1::UnsetIndex) -> Result<Self> {
677        let option = value.options.context(InvalidRawRegionRequestSnafu {
678            err: "missing options in UnsetIndex",
679        })?;
680
681        let opt = match option {
682            v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
683                column_name: f.column_name,
684            },
685            v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
686                column_name: i.column_name,
687            },
688            v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
689                column_name: s.column_name,
690            },
691        };
692
693        Ok(opt)
694    }
695}
696
697impl AlterKind {
698    /// Returns an error if the alter kind is invalid.
699    ///
700    /// It allows adding column if not exists and dropping column if exists.
701    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
702        match self {
703            AlterKind::AddColumns { columns } => {
704                for col_to_add in columns {
705                    col_to_add.validate(metadata)?;
706                }
707            }
708            AlterKind::DropColumns { names } => {
709                for name in names {
710                    Self::validate_column_to_drop(name, metadata)?;
711                }
712            }
713            AlterKind::ModifyColumnTypes { columns } => {
714                for col_to_change in columns {
715                    col_to_change.validate(metadata)?;
716                }
717            }
718            AlterKind::SetRegionOptions { .. } => {}
719            AlterKind::UnsetRegionOptions { .. } => {}
720            AlterKind::SetIndexes { options } => {
721                for option in options {
722                    Self::validate_column_alter_index_option(
723                        option.column_name(),
724                        metadata,
725                        option.is_fulltext(),
726                    )?;
727                }
728            }
729            AlterKind::UnsetIndexes { options } => {
730                for option in options {
731                    Self::validate_column_alter_index_option(
732                        option.column_name(),
733                        metadata,
734                        option.is_fulltext(),
735                    )?;
736                }
737            }
738            AlterKind::DropDefaults { names } => {
739                names
740                    .iter()
741                    .try_for_each(|name| Self::validate_column_to_drop(name, metadata))?;
742            }
743        }
744        Ok(())
745    }
746
747    /// Returns true if we need to apply the alteration to the region.
748    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
749        debug_assert!(self.validate(metadata).is_ok());
750        match self {
751            AlterKind::AddColumns { columns } => columns
752                .iter()
753                .any(|col_to_add| col_to_add.need_alter(metadata)),
754            AlterKind::DropColumns { names } => names
755                .iter()
756                .any(|name| metadata.column_by_name(name).is_some()),
757            AlterKind::ModifyColumnTypes { columns } => columns
758                .iter()
759                .any(|col_to_change| col_to_change.need_alter(metadata)),
760            AlterKind::SetRegionOptions { .. } => {
761                // we need to update region options for `ChangeTableOptions`.
762                // todo: we need to check if ttl has ever changed.
763                true
764            }
765            AlterKind::UnsetRegionOptions { .. } => true,
766            AlterKind::SetIndexes { options, .. } => options
767                .iter()
768                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
769            AlterKind::UnsetIndexes { options } => options
770                .iter()
771                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
772            AlterKind::DropDefaults { names } => names
773                .iter()
774                .any(|name| metadata.column_by_name(name).is_some()),
775        }
776    }
777
778    /// Returns an error if the column to drop is invalid.
779    fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
780        let Some(column) = metadata.column_by_name(name) else {
781            return Ok(());
782        };
783        ensure!(
784            column.semantic_type == SemanticType::Field,
785            InvalidRegionRequestSnafu {
786                region_id: metadata.region_id,
787                err: format!("column {} is not a field and could not be dropped", name),
788            }
789        );
790        Ok(())
791    }
792
793    /// Returns an error if the column's alter index option is invalid.
794    fn validate_column_alter_index_option(
795        column_name: &String,
796        metadata: &RegionMetadata,
797        is_fulltext: bool,
798    ) -> Result<()> {
799        let column = metadata
800            .column_by_name(column_name)
801            .context(InvalidRegionRequestSnafu {
802                region_id: metadata.region_id,
803                err: format!("column {} not found", column_name),
804            })?;
805
806        if is_fulltext {
807            ensure!(
808                column.column_schema.data_type.is_string(),
809                InvalidRegionRequestSnafu {
810                    region_id: metadata.region_id,
811                    err: format!(
812                        "cannot change alter index options for non-string column {}",
813                        column_name
814                    ),
815                }
816            );
817        }
818
819        Ok(())
820    }
821}
822
823impl TryFrom<alter_request::Kind> for AlterKind {
824    type Error = MetadataError;
825
826    fn try_from(kind: alter_request::Kind) -> Result<Self> {
827        let alter_kind = match kind {
828            alter_request::Kind::AddColumns(x) => {
829                let columns = x
830                    .add_columns
831                    .into_iter()
832                    .map(|x| x.try_into())
833                    .collect::<Result<Vec<_>>>()?;
834                AlterKind::AddColumns { columns }
835            }
836            alter_request::Kind::ModifyColumnTypes(x) => {
837                let columns = x
838                    .modify_column_types
839                    .into_iter()
840                    .map(|x| x.into())
841                    .collect::<Vec<_>>();
842                AlterKind::ModifyColumnTypes { columns }
843            }
844            alter_request::Kind::DropColumns(x) => {
845                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
846                AlterKind::DropColumns { names }
847            }
848            alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
849                options: options
850                    .table_options
851                    .iter()
852                    .map(TryFrom::try_from)
853                    .collect::<Result<Vec<_>>>()?,
854            },
855            alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
856                keys: options
857                    .keys
858                    .iter()
859                    .map(|key| UnsetRegionOption::try_from(key.as_str()))
860                    .collect::<Result<Vec<_>>>()?,
861            },
862            alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
863                options: vec![SetIndexOption::try_from(o)?],
864            },
865            alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
866                options: vec![UnsetIndexOption::try_from(o)?],
867            },
868            alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
869                options: o
870                    .set_indexes
871                    .into_iter()
872                    .map(SetIndexOption::try_from)
873                    .collect::<Result<Vec<_>>>()?,
874            },
875            alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
876                options: o
877                    .unset_indexes
878                    .into_iter()
879                    .map(UnsetIndexOption::try_from)
880                    .collect::<Result<Vec<_>>>()?,
881            },
882            alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
883                names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
884            },
885        };
886
887        Ok(alter_kind)
888    }
889}
890
891/// Adds a column.
892#[derive(Debug, PartialEq, Eq, Clone)]
893pub struct AddColumn {
894    /// Metadata of the column to add.
895    pub column_metadata: ColumnMetadata,
896    /// Location to add the column. If location is None, the region adds
897    /// the column to the last.
898    pub location: Option<AddColumnLocation>,
899}
900
901impl AddColumn {
902    /// Returns an error if the column to add is invalid.
903    ///
904    /// It allows adding existing columns. However, the existing column must have the same metadata
905    /// and the location must be None.
906    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
907        ensure!(
908            self.column_metadata.column_schema.is_nullable()
909                || self
910                    .column_metadata
911                    .column_schema
912                    .default_constraint()
913                    .is_some(),
914            InvalidRegionRequestSnafu {
915                region_id: metadata.region_id,
916                err: format!(
917                    "no default value for column {}",
918                    self.column_metadata.column_schema.name
919                ),
920            }
921        );
922
923        if let Some(existing_column) =
924            metadata.column_by_name(&self.column_metadata.column_schema.name)
925        {
926            // If the column already exists.
927            ensure!(
928                *existing_column == self.column_metadata,
929                InvalidRegionRequestSnafu {
930                    region_id: metadata.region_id,
931                    err: format!(
932                        "column {} already exists with different metadata, existing: {:?}, got: {:?}",
933                        self.column_metadata.column_schema.name, existing_column, self.column_metadata,
934                    ),
935                }
936            );
937            ensure!(
938                self.location.is_none(),
939                InvalidRegionRequestSnafu {
940                    region_id: metadata.region_id,
941                    err: format!(
942                        "column {} already exists, but location is specified",
943                        self.column_metadata.column_schema.name
944                    ),
945                }
946            );
947        }
948
949        if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
950            // Ensures the existing column has the same name.
951            ensure!(
952                existing_column.column_schema.name == self.column_metadata.column_schema.name,
953                InvalidRegionRequestSnafu {
954                    region_id: metadata.region_id,
955                    err: format!(
956                        "column id {} already exists with different name {}",
957                        self.column_metadata.column_id, existing_column.column_schema.name
958                    ),
959                }
960            );
961        }
962
963        Ok(())
964    }
965
966    /// Returns true if no column to add to the region.
967    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
968        debug_assert!(self.validate(metadata).is_ok());
969        metadata
970            .column_by_name(&self.column_metadata.column_schema.name)
971            .is_none()
972    }
973}
974
975impl TryFrom<v1::region::AddColumn> for AddColumn {
976    type Error = MetadataError;
977
978    fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
979        let column_def = add_column
980            .column_def
981            .context(InvalidRawRegionRequestSnafu {
982                err: "missing column_def in AddColumn",
983            })?;
984
985        let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
986        let location = add_column
987            .location
988            .map(AddColumnLocation::try_from)
989            .transpose()?;
990
991        Ok(AddColumn {
992            column_metadata,
993            location,
994        })
995    }
996}
997
998/// Location to add a column.
999#[derive(Debug, PartialEq, Eq, Clone)]
1000pub enum AddColumnLocation {
1001    /// Add the column to the first position of columns.
1002    First,
1003    /// Add the column after specific column.
1004    After {
1005        /// Add the column after this column.
1006        column_name: String,
1007    },
1008}
1009
1010impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1011    type Error = MetadataError;
1012
1013    fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1014        let location_type = LocationType::try_from(location.location_type)
1015            .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1016        let add_column_location = match location_type {
1017            LocationType::First => AddColumnLocation::First,
1018            LocationType::After => AddColumnLocation::After {
1019                column_name: location.after_column_name,
1020            },
1021        };
1022
1023        Ok(add_column_location)
1024    }
1025}
1026
1027/// Change a column's datatype.
1028#[derive(Debug, PartialEq, Eq, Clone)]
1029pub struct ModifyColumnType {
1030    /// Schema of the column to modify.
1031    pub column_name: String,
1032    /// Column will be changed to this type.
1033    pub target_type: ConcreteDataType,
1034}
1035
1036impl ModifyColumnType {
1037    /// Returns an error if the column's datatype to change is invalid.
1038    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1039        let column_meta = metadata
1040            .column_by_name(&self.column_name)
1041            .with_context(|| InvalidRegionRequestSnafu {
1042                region_id: metadata.region_id,
1043                err: format!("column {} not found", self.column_name),
1044            })?;
1045
1046        ensure!(
1047            matches!(column_meta.semantic_type, SemanticType::Field),
1048            InvalidRegionRequestSnafu {
1049                region_id: metadata.region_id,
1050                err: "'timestamp' or 'tag' column cannot change type".to_string()
1051            }
1052        );
1053        ensure!(
1054            column_meta
1055                .column_schema
1056                .data_type
1057                .can_arrow_type_cast_to(&self.target_type),
1058            InvalidRegionRequestSnafu {
1059                region_id: metadata.region_id,
1060                err: format!(
1061                    "column '{}' cannot be cast automatically to type '{}'",
1062                    self.column_name, self.target_type
1063                ),
1064            }
1065        );
1066
1067        Ok(())
1068    }
1069
1070    /// Returns true if no column's datatype to change to the region.
1071    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1072        debug_assert!(self.validate(metadata).is_ok());
1073        metadata.column_by_name(&self.column_name).is_some()
1074    }
1075}
1076
1077impl From<v1::ModifyColumnType> for ModifyColumnType {
1078    fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1079        let target_type = ColumnDataTypeWrapper::new(
1080            modify_column_type.target_type(),
1081            modify_column_type.target_type_extension,
1082        )
1083        .into();
1084
1085        ModifyColumnType {
1086            column_name: modify_column_type.column_name,
1087            target_type,
1088        }
1089    }
1090}
1091
1092#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1093pub enum SetRegionOption {
1094    Ttl(Option<TimeToLive>),
1095    // Modifying TwscOptions with values as (option name, new value).
1096    Twsc(String, String),
1097}
1098
1099impl TryFrom<&PbOption> for SetRegionOption {
1100    type Error = MetadataError;
1101
1102    fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1103        let PbOption { key, value } = value;
1104        match key.as_str() {
1105            TTL_KEY => {
1106                let ttl = TimeToLive::from_humantime_or_str(value)
1107                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1108
1109                Ok(Self::Ttl(Some(ttl)))
1110            }
1111            TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1112                Ok(Self::Twsc(key.to_string(), value.to_string()))
1113            }
1114            _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1115        }
1116    }
1117}
1118
1119impl From<&UnsetRegionOption> for SetRegionOption {
1120    fn from(unset_option: &UnsetRegionOption) -> Self {
1121        match unset_option {
1122            UnsetRegionOption::TwcsTriggerFileNum => {
1123                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1124            }
1125            UnsetRegionOption::TwcsMaxOutputFileSize => {
1126                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1127            }
1128            UnsetRegionOption::TwcsTimeWindow => {
1129                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1130            }
1131            UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1132        }
1133    }
1134}
1135
1136impl TryFrom<&str> for UnsetRegionOption {
1137    type Error = MetadataError;
1138
1139    fn try_from(key: &str) -> Result<Self> {
1140        match key.to_ascii_lowercase().as_str() {
1141            TTL_KEY => Ok(Self::Ttl),
1142            TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1143            TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1144            TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1145            _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1146        }
1147    }
1148}
1149
1150#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1151pub enum UnsetRegionOption {
1152    TwcsTriggerFileNum,
1153    TwcsMaxOutputFileSize,
1154    TwcsTimeWindow,
1155    Ttl,
1156}
1157
1158impl UnsetRegionOption {
1159    pub fn as_str(&self) -> &str {
1160        match self {
1161            Self::Ttl => TTL_KEY,
1162            Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1163            Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1164            Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1165        }
1166    }
1167}
1168
1169impl Display for UnsetRegionOption {
1170    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1171        write!(f, "{}", self.as_str())
1172    }
1173}
1174
1175#[derive(Debug, Clone, Default)]
1176pub struct RegionFlushRequest {
1177    pub row_group_size: Option<usize>,
1178}
1179
1180#[derive(Debug)]
1181pub struct RegionCompactRequest {
1182    pub options: compact_request::Options,
1183}
1184
1185impl Default for RegionCompactRequest {
1186    fn default() -> Self {
1187        Self {
1188            // Default to regular compaction.
1189            options: compact_request::Options::Regular(Default::default()),
1190        }
1191    }
1192}
1193
1194/// Truncate region request.
1195#[derive(Debug)]
1196pub struct RegionTruncateRequest {}
1197
1198/// Catchup region request.
1199///
1200/// Makes a readonly region to catch up to leader region changes.
1201/// There is no effect if it operating on a leader region.
1202#[derive(Debug, Clone, Copy)]
1203pub struct RegionCatchupRequest {
1204    /// Sets it to writable if it's available after it has caught up with all changes.
1205    pub set_writable: bool,
1206    /// The `entry_id` that was expected to reply to.
1207    /// `None` stands replaying to latest.
1208    pub entry_id: Option<entry::Id>,
1209    /// Used for metrics metadata region.
1210    /// The `entry_id` that was expected to reply to.
1211    /// `None` stands replaying to latest.
1212    pub metadata_entry_id: Option<entry::Id>,
1213    /// The hint for replaying memtable.
1214    pub location_id: Option<u64>,
1215}
1216
1217/// Get sequences of regions by region ids.
1218#[derive(Debug, Clone)]
1219pub struct RegionSequencesRequest {
1220    pub region_ids: Vec<RegionId>,
1221}
1222
1223#[derive(Debug, Clone)]
1224pub struct RegionBulkInsertsRequest {
1225    pub region_id: RegionId,
1226    pub payload: DfRecordBatch,
1227    pub raw_data: ArrowIpc,
1228}
1229
1230impl RegionBulkInsertsRequest {
1231    pub fn estimated_size(&self) -> usize {
1232        self.payload.get_array_memory_size()
1233    }
1234}
1235
1236impl fmt::Display for RegionRequest {
1237    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1238        match self {
1239            RegionRequest::Put(_) => write!(f, "Put"),
1240            RegionRequest::Delete(_) => write!(f, "Delete"),
1241            RegionRequest::Create(_) => write!(f, "Create"),
1242            RegionRequest::Drop(_) => write!(f, "Drop"),
1243            RegionRequest::Open(_) => write!(f, "Open"),
1244            RegionRequest::Close(_) => write!(f, "Close"),
1245            RegionRequest::Alter(_) => write!(f, "Alter"),
1246            RegionRequest::Flush(_) => write!(f, "Flush"),
1247            RegionRequest::Compact(_) => write!(f, "Compact"),
1248            RegionRequest::Truncate(_) => write!(f, "Truncate"),
1249            RegionRequest::Catchup(_) => write!(f, "Catchup"),
1250            RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1251        }
1252    }
1253}
1254
1255#[cfg(test)]
1256mod tests {
1257    use api::v1::region::RegionColumnDef;
1258    use api::v1::{ColumnDataType, ColumnDef};
1259    use datatypes::prelude::ConcreteDataType;
1260    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1261
1262    use super::*;
1263    use crate::metadata::RegionMetadataBuilder;
1264
1265    #[test]
1266    fn test_from_proto_location() {
1267        let proto_location = v1::AddColumnLocation {
1268            location_type: LocationType::First as i32,
1269            after_column_name: String::default(),
1270        };
1271        let location = AddColumnLocation::try_from(proto_location).unwrap();
1272        assert_eq!(location, AddColumnLocation::First);
1273
1274        let proto_location = v1::AddColumnLocation {
1275            location_type: 10,
1276            after_column_name: String::default(),
1277        };
1278        AddColumnLocation::try_from(proto_location).unwrap_err();
1279
1280        let proto_location = v1::AddColumnLocation {
1281            location_type: LocationType::After as i32,
1282            after_column_name: "a".to_string(),
1283        };
1284        let location = AddColumnLocation::try_from(proto_location).unwrap();
1285        assert_eq!(
1286            location,
1287            AddColumnLocation::After {
1288                column_name: "a".to_string()
1289            }
1290        );
1291    }
1292
1293    #[test]
1294    fn test_from_none_proto_add_column() {
1295        AddColumn::try_from(v1::region::AddColumn {
1296            column_def: None,
1297            location: None,
1298        })
1299        .unwrap_err();
1300    }
1301
1302    #[test]
1303    fn test_from_proto_alter_request() {
1304        RegionAlterRequest::try_from(AlterRequest {
1305            region_id: 0,
1306            schema_version: 1,
1307            kind: None,
1308        })
1309        .unwrap_err();
1310
1311        let request = RegionAlterRequest::try_from(AlterRequest {
1312            region_id: 0,
1313            schema_version: 1,
1314            kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1315                add_columns: vec![v1::region::AddColumn {
1316                    column_def: Some(RegionColumnDef {
1317                        column_def: Some(ColumnDef {
1318                            name: "a".to_string(),
1319                            data_type: ColumnDataType::String as i32,
1320                            is_nullable: true,
1321                            default_constraint: vec![],
1322                            semantic_type: SemanticType::Field as i32,
1323                            comment: String::new(),
1324                            ..Default::default()
1325                        }),
1326                        column_id: 1,
1327                    }),
1328                    location: Some(v1::AddColumnLocation {
1329                        location_type: LocationType::First as i32,
1330                        after_column_name: String::default(),
1331                    }),
1332                }],
1333            })),
1334        })
1335        .unwrap();
1336
1337        assert_eq!(
1338            request,
1339            RegionAlterRequest {
1340                kind: AlterKind::AddColumns {
1341                    columns: vec![AddColumn {
1342                        column_metadata: ColumnMetadata {
1343                            column_schema: ColumnSchema::new(
1344                                "a",
1345                                ConcreteDataType::string_datatype(),
1346                                true,
1347                            ),
1348                            semantic_type: SemanticType::Field,
1349                            column_id: 1,
1350                        },
1351                        location: Some(AddColumnLocation::First),
1352                    }]
1353                },
1354            }
1355        );
1356    }
1357
1358    /// Returns a new region metadata for testing. Metadata:
1359    /// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
1360    fn new_metadata() -> RegionMetadata {
1361        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1362        builder
1363            .push_column_metadata(ColumnMetadata {
1364                column_schema: ColumnSchema::new(
1365                    "ts",
1366                    ConcreteDataType::timestamp_millisecond_datatype(),
1367                    false,
1368                ),
1369                semantic_type: SemanticType::Timestamp,
1370                column_id: 1,
1371            })
1372            .push_column_metadata(ColumnMetadata {
1373                column_schema: ColumnSchema::new(
1374                    "tag_0",
1375                    ConcreteDataType::string_datatype(),
1376                    true,
1377                ),
1378                semantic_type: SemanticType::Tag,
1379                column_id: 2,
1380            })
1381            .push_column_metadata(ColumnMetadata {
1382                column_schema: ColumnSchema::new(
1383                    "field_0",
1384                    ConcreteDataType::string_datatype(),
1385                    true,
1386                ),
1387                semantic_type: SemanticType::Field,
1388                column_id: 3,
1389            })
1390            .push_column_metadata(ColumnMetadata {
1391                column_schema: ColumnSchema::new(
1392                    "field_1",
1393                    ConcreteDataType::boolean_datatype(),
1394                    true,
1395                ),
1396                semantic_type: SemanticType::Field,
1397                column_id: 4,
1398            })
1399            .primary_key(vec![2]);
1400        builder.build().unwrap()
1401    }
1402
1403    #[test]
1404    fn test_add_column_validate() {
1405        let metadata = new_metadata();
1406        let add_column = AddColumn {
1407            column_metadata: ColumnMetadata {
1408                column_schema: ColumnSchema::new(
1409                    "tag_1",
1410                    ConcreteDataType::string_datatype(),
1411                    true,
1412                ),
1413                semantic_type: SemanticType::Tag,
1414                column_id: 5,
1415            },
1416            location: None,
1417        };
1418        add_column.validate(&metadata).unwrap();
1419        assert!(add_column.need_alter(&metadata));
1420
1421        // Add not null column.
1422        AddColumn {
1423            column_metadata: ColumnMetadata {
1424                column_schema: ColumnSchema::new(
1425                    "tag_1",
1426                    ConcreteDataType::string_datatype(),
1427                    false,
1428                ),
1429                semantic_type: SemanticType::Tag,
1430                column_id: 5,
1431            },
1432            location: None,
1433        }
1434        .validate(&metadata)
1435        .unwrap_err();
1436
1437        // Add existing column.
1438        let add_column = AddColumn {
1439            column_metadata: ColumnMetadata {
1440                column_schema: ColumnSchema::new(
1441                    "tag_0",
1442                    ConcreteDataType::string_datatype(),
1443                    true,
1444                ),
1445                semantic_type: SemanticType::Tag,
1446                column_id: 2,
1447            },
1448            location: None,
1449        };
1450        add_column.validate(&metadata).unwrap();
1451        assert!(!add_column.need_alter(&metadata));
1452    }
1453
1454    #[test]
1455    fn test_add_duplicate_columns() {
1456        let kind = AlterKind::AddColumns {
1457            columns: vec![
1458                AddColumn {
1459                    column_metadata: ColumnMetadata {
1460                        column_schema: ColumnSchema::new(
1461                            "tag_1",
1462                            ConcreteDataType::string_datatype(),
1463                            true,
1464                        ),
1465                        semantic_type: SemanticType::Tag,
1466                        column_id: 5,
1467                    },
1468                    location: None,
1469                },
1470                AddColumn {
1471                    column_metadata: ColumnMetadata {
1472                        column_schema: ColumnSchema::new(
1473                            "tag_1",
1474                            ConcreteDataType::string_datatype(),
1475                            true,
1476                        ),
1477                        semantic_type: SemanticType::Field,
1478                        column_id: 6,
1479                    },
1480                    location: None,
1481                },
1482            ],
1483        };
1484        let metadata = new_metadata();
1485        kind.validate(&metadata).unwrap();
1486        assert!(kind.need_alter(&metadata));
1487    }
1488
1489    #[test]
1490    fn test_add_existing_column_different_metadata() {
1491        let metadata = new_metadata();
1492
1493        // Add existing column with different id.
1494        let kind = AlterKind::AddColumns {
1495            columns: vec![AddColumn {
1496                column_metadata: ColumnMetadata {
1497                    column_schema: ColumnSchema::new(
1498                        "tag_0",
1499                        ConcreteDataType::string_datatype(),
1500                        true,
1501                    ),
1502                    semantic_type: SemanticType::Tag,
1503                    column_id: 4,
1504                },
1505                location: None,
1506            }],
1507        };
1508        kind.validate(&metadata).unwrap_err();
1509
1510        // Add existing column with different type.
1511        let kind = AlterKind::AddColumns {
1512            columns: vec![AddColumn {
1513                column_metadata: ColumnMetadata {
1514                    column_schema: ColumnSchema::new(
1515                        "tag_0",
1516                        ConcreteDataType::int64_datatype(),
1517                        true,
1518                    ),
1519                    semantic_type: SemanticType::Tag,
1520                    column_id: 2,
1521                },
1522                location: None,
1523            }],
1524        };
1525        kind.validate(&metadata).unwrap_err();
1526
1527        // Add existing column with different name.
1528        let kind = AlterKind::AddColumns {
1529            columns: vec![AddColumn {
1530                column_metadata: ColumnMetadata {
1531                    column_schema: ColumnSchema::new(
1532                        "tag_1",
1533                        ConcreteDataType::string_datatype(),
1534                        true,
1535                    ),
1536                    semantic_type: SemanticType::Tag,
1537                    column_id: 2,
1538                },
1539                location: None,
1540            }],
1541        };
1542        kind.validate(&metadata).unwrap_err();
1543    }
1544
1545    #[test]
1546    fn test_add_existing_column_with_location() {
1547        let metadata = new_metadata();
1548        let kind = AlterKind::AddColumns {
1549            columns: vec![AddColumn {
1550                column_metadata: ColumnMetadata {
1551                    column_schema: ColumnSchema::new(
1552                        "tag_0",
1553                        ConcreteDataType::string_datatype(),
1554                        true,
1555                    ),
1556                    semantic_type: SemanticType::Tag,
1557                    column_id: 2,
1558                },
1559                location: Some(AddColumnLocation::First),
1560            }],
1561        };
1562        kind.validate(&metadata).unwrap_err();
1563    }
1564
1565    #[test]
1566    fn test_validate_drop_column() {
1567        let metadata = new_metadata();
1568        let kind = AlterKind::DropColumns {
1569            names: vec!["xxxx".to_string()],
1570        };
1571        kind.validate(&metadata).unwrap();
1572        assert!(!kind.need_alter(&metadata));
1573
1574        AlterKind::DropColumns {
1575            names: vec!["tag_0".to_string()],
1576        }
1577        .validate(&metadata)
1578        .unwrap_err();
1579
1580        let kind = AlterKind::DropColumns {
1581            names: vec!["field_0".to_string()],
1582        };
1583        kind.validate(&metadata).unwrap();
1584        assert!(kind.need_alter(&metadata));
1585    }
1586
1587    #[test]
1588    fn test_validate_modify_column_type() {
1589        let metadata = new_metadata();
1590        AlterKind::ModifyColumnTypes {
1591            columns: vec![ModifyColumnType {
1592                column_name: "xxxx".to_string(),
1593                target_type: ConcreteDataType::string_datatype(),
1594            }],
1595        }
1596        .validate(&metadata)
1597        .unwrap_err();
1598
1599        AlterKind::ModifyColumnTypes {
1600            columns: vec![ModifyColumnType {
1601                column_name: "field_1".to_string(),
1602                target_type: ConcreteDataType::date_datatype(),
1603            }],
1604        }
1605        .validate(&metadata)
1606        .unwrap_err();
1607
1608        AlterKind::ModifyColumnTypes {
1609            columns: vec![ModifyColumnType {
1610                column_name: "ts".to_string(),
1611                target_type: ConcreteDataType::date_datatype(),
1612            }],
1613        }
1614        .validate(&metadata)
1615        .unwrap_err();
1616
1617        AlterKind::ModifyColumnTypes {
1618            columns: vec![ModifyColumnType {
1619                column_name: "tag_0".to_string(),
1620                target_type: ConcreteDataType::date_datatype(),
1621            }],
1622        }
1623        .validate(&metadata)
1624        .unwrap_err();
1625
1626        let kind = AlterKind::ModifyColumnTypes {
1627            columns: vec![ModifyColumnType {
1628                column_name: "field_0".to_string(),
1629                target_type: ConcreteDataType::int32_datatype(),
1630            }],
1631        };
1632        kind.validate(&metadata).unwrap();
1633        assert!(kind.need_alter(&metadata));
1634    }
1635
1636    #[test]
1637    fn test_validate_add_columns() {
1638        let kind = AlterKind::AddColumns {
1639            columns: vec![
1640                AddColumn {
1641                    column_metadata: ColumnMetadata {
1642                        column_schema: ColumnSchema::new(
1643                            "tag_1",
1644                            ConcreteDataType::string_datatype(),
1645                            true,
1646                        ),
1647                        semantic_type: SemanticType::Tag,
1648                        column_id: 5,
1649                    },
1650                    location: None,
1651                },
1652                AddColumn {
1653                    column_metadata: ColumnMetadata {
1654                        column_schema: ColumnSchema::new(
1655                            "field_2",
1656                            ConcreteDataType::string_datatype(),
1657                            true,
1658                        ),
1659                        semantic_type: SemanticType::Field,
1660                        column_id: 6,
1661                    },
1662                    location: None,
1663                },
1664            ],
1665        };
1666        let request = RegionAlterRequest { kind };
1667        let mut metadata = new_metadata();
1668        metadata.schema_version = 1;
1669        request.validate(&metadata).unwrap();
1670    }
1671
1672    #[test]
1673    fn test_validate_create_region() {
1674        let column_metadatas = vec![
1675            ColumnMetadata {
1676                column_schema: ColumnSchema::new(
1677                    "ts",
1678                    ConcreteDataType::timestamp_millisecond_datatype(),
1679                    false,
1680                ),
1681                semantic_type: SemanticType::Timestamp,
1682                column_id: 1,
1683            },
1684            ColumnMetadata {
1685                column_schema: ColumnSchema::new(
1686                    "tag_0",
1687                    ConcreteDataType::string_datatype(),
1688                    true,
1689                ),
1690                semantic_type: SemanticType::Tag,
1691                column_id: 2,
1692            },
1693            ColumnMetadata {
1694                column_schema: ColumnSchema::new(
1695                    "field_0",
1696                    ConcreteDataType::string_datatype(),
1697                    true,
1698                ),
1699                semantic_type: SemanticType::Field,
1700                column_id: 3,
1701            },
1702        ];
1703        let create = RegionCreateRequest {
1704            engine: "mito".to_string(),
1705            column_metadatas,
1706            primary_key: vec![3, 4],
1707            options: HashMap::new(),
1708            table_dir: "path".to_string(),
1709            path_type: PathType::Bare,
1710        };
1711
1712        assert!(create.validate().is_err());
1713    }
1714
1715    #[test]
1716    fn test_validate_modify_column_fulltext_options() {
1717        let kind = AlterKind::SetIndexes {
1718            options: vec![SetIndexOption::Fulltext {
1719                column_name: "tag_0".to_string(),
1720                options: FulltextOptions::new_unchecked(
1721                    true,
1722                    FulltextAnalyzer::Chinese,
1723                    false,
1724                    FulltextBackend::Bloom,
1725                    1000,
1726                    0.01,
1727                ),
1728            }],
1729        };
1730        let request = RegionAlterRequest { kind };
1731        let mut metadata = new_metadata();
1732        metadata.schema_version = 1;
1733        request.validate(&metadata).unwrap();
1734
1735        let kind = AlterKind::UnsetIndexes {
1736            options: vec![UnsetIndexOption::Fulltext {
1737                column_name: "tag_0".to_string(),
1738            }],
1739        };
1740        let request = RegionAlterRequest { kind };
1741        let mut metadata = new_metadata();
1742        metadata.schema_version = 1;
1743        request.validate(&metadata).unwrap();
1744    }
1745}