store_api/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::ColumnDataTypeWrapper;
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21    as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25    alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequest,
26    CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest,
27    DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
28};
29use api::v1::{
30    self, set_index, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption,
31    Rows, SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::FlightDecoder;
35use common_recordbatch::DfRecordBatch;
36use common_time::TimeToLive;
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
39use serde::{Deserialize, Serialize};
40use snafu::{ensure, OptionExt, ResultExt};
41use strum::{AsRefStr, IntoStaticStr};
42
43use crate::logstore::entry;
44use crate::metadata::{
45    ColumnMetadata, DecodeProtoSnafu, FlightCodecSnafu, InvalidIndexOptionSnafu,
46    InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
47    InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu,
48};
49use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
50use crate::metrics;
51use crate::mito_engine_options::{
52    TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
53};
54use crate::path_utils::region_dir;
55use crate::storage::{ColumnId, RegionId, ScanRequest};
56
57#[derive(Debug, IntoStaticStr)]
58pub enum BatchRegionDdlRequest {
59    Create(Vec<(RegionId, RegionCreateRequest)>),
60    Drop(Vec<(RegionId, RegionDropRequest)>),
61    Alter(Vec<(RegionId, RegionAlterRequest)>),
62}
63
64impl BatchRegionDdlRequest {
65    /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`].
66    pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
67        match body {
68            region_request::Body::Creates(creates) => {
69                let requests = creates
70                    .requests
71                    .into_iter()
72                    .map(parse_region_create)
73                    .collect::<Result<Vec<_>>>()?;
74                Ok(Some(Self::Create(requests)))
75            }
76            region_request::Body::Drops(drops) => {
77                let requests = drops
78                    .requests
79                    .into_iter()
80                    .map(parse_region_drop)
81                    .collect::<Result<Vec<_>>>()?;
82                Ok(Some(Self::Drop(requests)))
83            }
84            region_request::Body::Alters(alters) => {
85                let requests = alters
86                    .requests
87                    .into_iter()
88                    .map(parse_region_alter)
89                    .collect::<Result<Vec<_>>>()?;
90                Ok(Some(Self::Alter(requests)))
91            }
92            _ => Ok(None),
93        }
94    }
95
96    pub fn request_type(&self) -> &'static str {
97        self.into()
98    }
99
100    pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
101        match self {
102            Self::Create(requests) => requests
103                .into_iter()
104                .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
105                .collect(),
106            Self::Drop(requests) => requests
107                .into_iter()
108                .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
109                .collect(),
110            Self::Alter(requests) => requests
111                .into_iter()
112                .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
113                .collect(),
114        }
115    }
116}
117
118#[derive(Debug, IntoStaticStr)]
119pub enum RegionRequest {
120    Put(RegionPutRequest),
121    Delete(RegionDeleteRequest),
122    Create(RegionCreateRequest),
123    Drop(RegionDropRequest),
124    Open(RegionOpenRequest),
125    Close(RegionCloseRequest),
126    Alter(RegionAlterRequest),
127    Flush(RegionFlushRequest),
128    Compact(RegionCompactRequest),
129    Truncate(RegionTruncateRequest),
130    Catchup(RegionCatchupRequest),
131    BulkInserts(RegionBulkInsertsRequest),
132}
133
134impl RegionRequest {
135    /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
136    /// Inserts/Deletes request might become multiple requests. Others are one-to-one.
137    pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
138        match body {
139            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
140            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
141            region_request::Body::Create(create) => make_region_create(create),
142            region_request::Body::Drop(drop) => make_region_drop(drop),
143            region_request::Body::Open(open) => make_region_open(open),
144            region_request::Body::Close(close) => make_region_close(close),
145            region_request::Body::Alter(alter) => make_region_alter(alter),
146            region_request::Body::Flush(flush) => make_region_flush(flush),
147            region_request::Body::Compact(compact) => make_region_compact(compact),
148            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
149            region_request::Body::Creates(creates) => make_region_creates(creates),
150            region_request::Body::Drops(drops) => make_region_drops(drops),
151            region_request::Body::Alters(alters) => make_region_alters(alters),
152            region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
153            region_request::Body::Sync(_) => UnexpectedSnafu {
154                reason: "Sync request should be handled separately by RegionServer",
155            }
156            .fail(),
157            region_request::Body::ListMetadata(_) => UnexpectedSnafu {
158                reason: "ListMetadata request should be handled separately by RegionServer",
159            }
160            .fail(),
161        }
162    }
163
164    /// Returns the type name of the request.
165    pub fn request_type(&self) -> &'static str {
166        self.into()
167    }
168}
169
170fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
171    let requests = inserts
172        .requests
173        .into_iter()
174        .filter_map(|r| {
175            let region_id = r.region_id.into();
176            r.rows.map(|rows| {
177                (
178                    region_id,
179                    RegionRequest::Put(RegionPutRequest { rows, hint: None }),
180                )
181            })
182        })
183        .collect();
184    Ok(requests)
185}
186
187fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
188    let requests = deletes
189        .requests
190        .into_iter()
191        .filter_map(|r| {
192            let region_id = r.region_id.into();
193            r.rows.map(|rows| {
194                (
195                    region_id,
196                    RegionRequest::Delete(RegionDeleteRequest { rows }),
197                )
198            })
199        })
200        .collect();
201    Ok(requests)
202}
203
204fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
205    let column_metadatas = create
206        .column_defs
207        .into_iter()
208        .map(ColumnMetadata::try_from_column_def)
209        .collect::<Result<Vec<_>>>()?;
210    let region_id = create.region_id.into();
211    let region_dir = region_dir(&create.path, region_id);
212    Ok((
213        region_id,
214        RegionCreateRequest {
215            engine: create.engine,
216            column_metadatas,
217            primary_key: create.primary_key,
218            options: create.options,
219            region_dir,
220        },
221    ))
222}
223
224fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
225    let (region_id, request) = parse_region_create(create)?;
226    Ok(vec![(region_id, RegionRequest::Create(request))])
227}
228
229fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
230    let mut requests = Vec::with_capacity(creates.requests.len());
231    for create in creates.requests {
232        requests.extend(make_region_create(create)?);
233    }
234    Ok(requests)
235}
236
237fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
238    let region_id = drop.region_id.into();
239    Ok((
240        region_id,
241        RegionDropRequest {
242            fast_path: drop.fast_path,
243        },
244    ))
245}
246
247fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
248    let (region_id, request) = parse_region_drop(drop)?;
249    Ok(vec![(region_id, RegionRequest::Drop(request))])
250}
251
252fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
253    let mut requests = Vec::with_capacity(drops.requests.len());
254    for drop in drops.requests {
255        requests.extend(make_region_drop(drop)?);
256    }
257    Ok(requests)
258}
259
260fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
261    let region_id = open.region_id.into();
262    let region_dir = region_dir(&open.path, region_id);
263    Ok(vec![(
264        region_id,
265        RegionRequest::Open(RegionOpenRequest {
266            engine: open.engine,
267            region_dir,
268            options: open.options,
269            skip_wal_replay: false,
270        }),
271    )])
272}
273
274fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
275    let region_id = close.region_id.into();
276    Ok(vec![(
277        region_id,
278        RegionRequest::Close(RegionCloseRequest {}),
279    )])
280}
281
282fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
283    let region_id = alter.region_id.into();
284    let request = RegionAlterRequest::try_from(alter)?;
285    Ok((region_id, request))
286}
287
288fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
289    let (region_id, request) = parse_region_alter(alter)?;
290    Ok(vec![(region_id, RegionRequest::Alter(request))])
291}
292
293fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
294    let mut requests = Vec::with_capacity(alters.requests.len());
295    for alter in alters.requests {
296        requests.extend(make_region_alter(alter)?);
297    }
298    Ok(requests)
299}
300
301fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
302    let region_id = flush.region_id.into();
303    Ok(vec![(
304        region_id,
305        RegionRequest::Flush(RegionFlushRequest {
306            row_group_size: None,
307        }),
308    )])
309}
310
311fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
312    let region_id = compact.region_id.into();
313    let options = compact
314        .options
315        .unwrap_or(compact_request::Options::Regular(Default::default()));
316    Ok(vec![(
317        region_id,
318        RegionRequest::Compact(RegionCompactRequest { options }),
319    )])
320}
321
322fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
323    let region_id = truncate.region_id.into();
324    Ok(vec![(
325        region_id,
326        RegionRequest::Truncate(RegionTruncateRequest {}),
327    )])
328}
329
330/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
331fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
332    let region_id = request.region_id.into();
333    let Some(Body::ArrowIpc(request)) = request.body else {
334        return Ok(vec![]);
335    };
336
337    let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
338        .with_label_values(&["decode"])
339        .start_timer();
340    let mut decoder =
341        FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
342    let payload = decoder
343        .try_decode_record_batch(&request.data_header, &request.payload)
344        .context(FlightCodecSnafu)?;
345    decoder_timer.observe_duration();
346    Ok(vec![(
347        region_id,
348        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
349            region_id,
350            payload,
351            raw_data: request,
352        }),
353    )])
354}
355
356/// Request to put data into a region.
357#[derive(Debug)]
358pub struct RegionPutRequest {
359    /// Rows to put.
360    pub rows: Rows,
361    /// Write hint.
362    pub hint: Option<WriteHint>,
363}
364
365#[derive(Debug)]
366pub struct RegionReadRequest {
367    pub request: ScanRequest,
368}
369
370/// Request to delete data from a region.
371#[derive(Debug)]
372pub struct RegionDeleteRequest {
373    /// Keys to rows to delete.
374    ///
375    /// Each row only contains primary key columns and a time index column.
376    pub rows: Rows,
377}
378
379#[derive(Debug, Clone)]
380pub struct RegionCreateRequest {
381    /// Region engine name
382    pub engine: String,
383    /// Columns in this region.
384    pub column_metadatas: Vec<ColumnMetadata>,
385    /// Columns in the primary key.
386    pub primary_key: Vec<ColumnId>,
387    /// Options of the created region.
388    pub options: HashMap<String, String>,
389    /// Directory for region's data home. Usually is composed by catalog and table id
390    pub region_dir: String,
391}
392
393impl RegionCreateRequest {
394    /// Checks whether the request is valid, returns an error if it is invalid.
395    pub fn validate(&self) -> Result<()> {
396        // time index must exist
397        ensure!(
398            self.column_metadatas
399                .iter()
400                .any(|x| x.semantic_type == SemanticType::Timestamp),
401            InvalidRegionRequestSnafu {
402                region_id: RegionId::new(0, 0),
403                err: "missing timestamp column in create region request".to_string(),
404            }
405        );
406
407        // build column id to indices
408        let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
409        for (i, c) in self.column_metadatas.iter().enumerate() {
410            if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
411                return InvalidRegionRequestSnafu {
412                    region_id: RegionId::new(0, 0),
413                    err: format!(
414                        "duplicate column id {} (at position {} and {}) in create region request",
415                        c.column_id, previous, i
416                    ),
417                }
418                .fail();
419            }
420        }
421
422        // primary key must exist
423        for column_id in &self.primary_key {
424            ensure!(
425                column_id_to_indices.contains_key(column_id),
426                InvalidRegionRequestSnafu {
427                    region_id: RegionId::new(0, 0),
428                    err: format!(
429                        "missing primary key column {} in create region request",
430                        column_id
431                    ),
432                }
433            );
434        }
435
436        Ok(())
437    }
438
439    /// Returns true when the region belongs to the metric engine's physical table.
440    pub fn is_physical_table(&self) -> bool {
441        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
442    }
443}
444
445#[derive(Debug, Clone)]
446pub struct RegionDropRequest {
447    pub fast_path: bool,
448}
449
450/// Open region request.
451#[derive(Debug, Clone)]
452pub struct RegionOpenRequest {
453    /// Region engine name
454    pub engine: String,
455    /// Data directory of the region.
456    pub region_dir: String,
457    /// Options of the opened region.
458    pub options: HashMap<String, String>,
459    /// To skip replaying the WAL.
460    pub skip_wal_replay: bool,
461}
462
463impl RegionOpenRequest {
464    /// Returns true when the region belongs to the metric engine's physical table.
465    pub fn is_physical_table(&self) -> bool {
466        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
467    }
468}
469
470/// Close region request.
471#[derive(Debug)]
472pub struct RegionCloseRequest {}
473
474/// Alter metadata of a region.
475#[derive(Debug, PartialEq, Eq, Clone)]
476pub struct RegionAlterRequest {
477    /// Kind of alteration to do.
478    pub kind: AlterKind,
479}
480
481impl RegionAlterRequest {
482    /// Checks whether the request is valid, returns an error if it is invalid.
483    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
484        self.kind.validate(metadata)?;
485
486        Ok(())
487    }
488
489    /// Returns true if we need to apply the request to the region.
490    ///
491    /// The `request` should be valid.
492    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
493        debug_assert!(self.validate(metadata).is_ok());
494        self.kind.need_alter(metadata)
495    }
496}
497
498impl TryFrom<AlterRequest> for RegionAlterRequest {
499    type Error = MetadataError;
500
501    fn try_from(value: AlterRequest) -> Result<Self> {
502        let kind = value.kind.context(InvalidRawRegionRequestSnafu {
503            err: "missing kind in AlterRequest",
504        })?;
505
506        let kind = AlterKind::try_from(kind)?;
507        Ok(RegionAlterRequest { kind })
508    }
509}
510
511/// Kind of the alteration.
512#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
513pub enum AlterKind {
514    /// Add columns to the region.
515    AddColumns {
516        /// Columns to add.
517        columns: Vec<AddColumn>,
518    },
519    /// Drop columns from the region, only fields are allowed to drop.
520    DropColumns {
521        /// Name of columns to drop.
522        names: Vec<String>,
523    },
524    /// Change columns datatype form the region, only fields are allowed to change.
525    ModifyColumnTypes {
526        /// Columns to change.
527        columns: Vec<ModifyColumnType>,
528    },
529    /// Set region options.
530    SetRegionOptions { options: Vec<SetRegionOption> },
531    /// Unset region options.
532    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
533    /// Set index options.
534    SetIndex { options: ApiSetIndexOptions },
535    /// Unset index options.
536    UnsetIndex { options: ApiUnsetIndexOptions },
537    /// Drop column default value.
538    DropDefaults {
539        /// Name of columns to drop.
540        names: Vec<String>,
541    },
542}
543
544#[derive(Debug, PartialEq, Eq, Clone)]
545pub enum ApiSetIndexOptions {
546    Fulltext {
547        column_name: String,
548        options: FulltextOptions,
549    },
550    Inverted {
551        column_name: String,
552    },
553    Skipping {
554        column_name: String,
555        options: SkippingIndexOptions,
556    },
557}
558
559impl ApiSetIndexOptions {
560    pub fn column_name(&self) -> &String {
561        match self {
562            ApiSetIndexOptions::Fulltext { column_name, .. } => column_name,
563            ApiSetIndexOptions::Inverted { column_name } => column_name,
564            ApiSetIndexOptions::Skipping { column_name, .. } => column_name,
565        }
566    }
567
568    pub fn is_fulltext(&self) -> bool {
569        match self {
570            ApiSetIndexOptions::Fulltext { .. } => true,
571            ApiSetIndexOptions::Inverted { .. } => false,
572            ApiSetIndexOptions::Skipping { .. } => false,
573        }
574    }
575}
576
577#[derive(Debug, PartialEq, Eq, Clone)]
578pub enum ApiUnsetIndexOptions {
579    Fulltext { column_name: String },
580    Inverted { column_name: String },
581    Skipping { column_name: String },
582}
583
584impl ApiUnsetIndexOptions {
585    pub fn column_name(&self) -> &String {
586        match self {
587            ApiUnsetIndexOptions::Fulltext { column_name } => column_name,
588            ApiUnsetIndexOptions::Inverted { column_name } => column_name,
589            ApiUnsetIndexOptions::Skipping { column_name } => column_name,
590        }
591    }
592
593    pub fn is_fulltext(&self) -> bool {
594        match self {
595            ApiUnsetIndexOptions::Fulltext { .. } => true,
596            ApiUnsetIndexOptions::Inverted { .. } => false,
597            ApiUnsetIndexOptions::Skipping { .. } => false,
598        }
599    }
600}
601
602impl AlterKind {
603    /// Returns an error if the alter kind is invalid.
604    ///
605    /// It allows adding column if not exists and dropping column if exists.
606    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
607        match self {
608            AlterKind::AddColumns { columns } => {
609                for col_to_add in columns {
610                    col_to_add.validate(metadata)?;
611                }
612            }
613            AlterKind::DropColumns { names } => {
614                for name in names {
615                    Self::validate_column_to_drop(name, metadata)?;
616                }
617            }
618            AlterKind::ModifyColumnTypes { columns } => {
619                for col_to_change in columns {
620                    col_to_change.validate(metadata)?;
621                }
622            }
623            AlterKind::SetRegionOptions { .. } => {}
624            AlterKind::UnsetRegionOptions { .. } => {}
625            AlterKind::SetIndex { options } => {
626                Self::validate_column_alter_index_option(
627                    options.column_name(),
628                    metadata,
629                    options.is_fulltext(),
630                )?;
631            }
632            AlterKind::UnsetIndex { options } => {
633                Self::validate_column_alter_index_option(
634                    options.column_name(),
635                    metadata,
636                    options.is_fulltext(),
637                )?;
638            }
639            AlterKind::DropDefaults { names } => {
640                names
641                    .iter()
642                    .try_for_each(|name| Self::validate_column_to_drop(name, metadata))?;
643            }
644        }
645        Ok(())
646    }
647
648    /// Returns true if we need to apply the alteration to the region.
649    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
650        debug_assert!(self.validate(metadata).is_ok());
651        match self {
652            AlterKind::AddColumns { columns } => columns
653                .iter()
654                .any(|col_to_add| col_to_add.need_alter(metadata)),
655            AlterKind::DropColumns { names } => names
656                .iter()
657                .any(|name| metadata.column_by_name(name).is_some()),
658            AlterKind::ModifyColumnTypes { columns } => columns
659                .iter()
660                .any(|col_to_change| col_to_change.need_alter(metadata)),
661            AlterKind::SetRegionOptions { .. } => {
662                // we need to update region options for `ChangeTableOptions`.
663                // todo: we need to check if ttl has ever changed.
664                true
665            }
666            AlterKind::UnsetRegionOptions { .. } => true,
667            AlterKind::SetIndex { options, .. } => {
668                metadata.column_by_name(options.column_name()).is_some()
669            }
670            AlterKind::UnsetIndex { options } => {
671                metadata.column_by_name(options.column_name()).is_some()
672            }
673            AlterKind::DropDefaults { names } => names
674                .iter()
675                .any(|name| metadata.column_by_name(name).is_some()),
676        }
677    }
678
679    /// Returns an error if the column to drop is invalid.
680    fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
681        let Some(column) = metadata.column_by_name(name) else {
682            return Ok(());
683        };
684        ensure!(
685            column.semantic_type == SemanticType::Field,
686            InvalidRegionRequestSnafu {
687                region_id: metadata.region_id,
688                err: format!("column {} is not a field and could not be dropped", name),
689            }
690        );
691        Ok(())
692    }
693
694    /// Returns an error if the column's alter index option is invalid.
695    fn validate_column_alter_index_option(
696        column_name: &String,
697        metadata: &RegionMetadata,
698        is_fulltext: bool,
699    ) -> Result<()> {
700        let column = metadata
701            .column_by_name(column_name)
702            .context(InvalidRegionRequestSnafu {
703                region_id: metadata.region_id,
704                err: format!("column {} not found", column_name),
705            })?;
706
707        if is_fulltext {
708            ensure!(
709                column.column_schema.data_type.is_string(),
710                InvalidRegionRequestSnafu {
711                    region_id: metadata.region_id,
712                    err: format!(
713                        "cannot change alter index options for non-string column {}",
714                        column_name
715                    ),
716                }
717            );
718        }
719
720        Ok(())
721    }
722}
723
724impl TryFrom<alter_request::Kind> for AlterKind {
725    type Error = MetadataError;
726
727    fn try_from(kind: alter_request::Kind) -> Result<Self> {
728        let alter_kind = match kind {
729            alter_request::Kind::AddColumns(x) => {
730                let columns = x
731                    .add_columns
732                    .into_iter()
733                    .map(|x| x.try_into())
734                    .collect::<Result<Vec<_>>>()?;
735                AlterKind::AddColumns { columns }
736            }
737            alter_request::Kind::ModifyColumnTypes(x) => {
738                let columns = x
739                    .modify_column_types
740                    .into_iter()
741                    .map(|x| x.into())
742                    .collect::<Vec<_>>();
743                AlterKind::ModifyColumnTypes { columns }
744            }
745            alter_request::Kind::DropColumns(x) => {
746                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
747                AlterKind::DropColumns { names }
748            }
749            alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
750                options: options
751                    .table_options
752                    .iter()
753                    .map(TryFrom::try_from)
754                    .collect::<Result<Vec<_>>>()?,
755            },
756            alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
757                keys: options
758                    .keys
759                    .iter()
760                    .map(|key| UnsetRegionOption::try_from(key.as_str()))
761                    .collect::<Result<Vec<_>>>()?,
762            },
763            alter_request::Kind::SetIndex(o) => match o.options.unwrap() {
764                set_index::Options::Fulltext(x) => AlterKind::SetIndex {
765                    options: ApiSetIndexOptions::Fulltext {
766                        column_name: x.column_name.clone(),
767                        options: FulltextOptions::new(
768                            x.enable,
769                            as_fulltext_option_analyzer(
770                                Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
771                            ),
772                            x.case_sensitive,
773                            as_fulltext_option_backend(
774                                PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
775                            ),
776                            x.granularity as u32,
777                            x.false_positive_rate,
778                        )
779                        .context(InvalidIndexOptionSnafu)?,
780                    },
781                },
782                set_index::Options::Inverted(i) => AlterKind::SetIndex {
783                    options: ApiSetIndexOptions::Inverted {
784                        column_name: i.column_name,
785                    },
786                },
787                set_index::Options::Skipping(s) => AlterKind::SetIndex {
788                    options: ApiSetIndexOptions::Skipping {
789                        column_name: s.column_name,
790                        options: SkippingIndexOptions::new(
791                            s.granularity as u32,
792                            s.false_positive_rate,
793                            as_skipping_index_type(
794                                PbSkippingIndexType::try_from(s.skipping_index_type)
795                                    .context(DecodeProtoSnafu)?,
796                            ),
797                        )
798                        .context(InvalidIndexOptionSnafu)?,
799                    },
800                },
801            },
802            alter_request::Kind::UnsetIndex(o) => match o.options.unwrap() {
803                v1::unset_index::Options::Fulltext(f) => AlterKind::UnsetIndex {
804                    options: ApiUnsetIndexOptions::Fulltext {
805                        column_name: f.column_name,
806                    },
807                },
808                v1::unset_index::Options::Inverted(i) => AlterKind::UnsetIndex {
809                    options: ApiUnsetIndexOptions::Inverted {
810                        column_name: i.column_name,
811                    },
812                },
813                v1::unset_index::Options::Skipping(s) => AlterKind::UnsetIndex {
814                    options: ApiUnsetIndexOptions::Skipping {
815                        column_name: s.column_name,
816                    },
817                },
818            },
819            alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
820                names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
821            },
822        };
823
824        Ok(alter_kind)
825    }
826}
827
828/// Adds a column.
829#[derive(Debug, PartialEq, Eq, Clone)]
830pub struct AddColumn {
831    /// Metadata of the column to add.
832    pub column_metadata: ColumnMetadata,
833    /// Location to add the column. If location is None, the region adds
834    /// the column to the last.
835    pub location: Option<AddColumnLocation>,
836}
837
838impl AddColumn {
839    /// Returns an error if the column to add is invalid.
840    ///
841    /// It allows adding existing columns. However, the existing column must have the same metadata
842    /// and the location must be None.
843    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
844        ensure!(
845            self.column_metadata.column_schema.is_nullable()
846                || self
847                    .column_metadata
848                    .column_schema
849                    .default_constraint()
850                    .is_some(),
851            InvalidRegionRequestSnafu {
852                region_id: metadata.region_id,
853                err: format!(
854                    "no default value for column {}",
855                    self.column_metadata.column_schema.name
856                ),
857            }
858        );
859
860        if let Some(existing_column) =
861            metadata.column_by_name(&self.column_metadata.column_schema.name)
862        {
863            // If the column already exists.
864            ensure!(
865                *existing_column == self.column_metadata,
866                InvalidRegionRequestSnafu {
867                    region_id: metadata.region_id,
868                    err: format!(
869                        "column {} already exists with different metadata, existing: {:?}, got: {:?}",
870                        self.column_metadata.column_schema.name, existing_column, self.column_metadata,
871                    ),
872                }
873            );
874            ensure!(
875                self.location.is_none(),
876                InvalidRegionRequestSnafu {
877                    region_id: metadata.region_id,
878                    err: format!(
879                        "column {} already exists, but location is specified",
880                        self.column_metadata.column_schema.name
881                    ),
882                }
883            );
884        }
885
886        if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
887            // Ensures the existing column has the same name.
888            ensure!(
889                existing_column.column_schema.name == self.column_metadata.column_schema.name,
890                InvalidRegionRequestSnafu {
891                    region_id: metadata.region_id,
892                    err: format!(
893                        "column id {} already exists with different name {}",
894                        self.column_metadata.column_id, existing_column.column_schema.name
895                    ),
896                }
897            );
898        }
899
900        Ok(())
901    }
902
903    /// Returns true if no column to add to the region.
904    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
905        debug_assert!(self.validate(metadata).is_ok());
906        metadata
907            .column_by_name(&self.column_metadata.column_schema.name)
908            .is_none()
909    }
910}
911
912impl TryFrom<v1::region::AddColumn> for AddColumn {
913    type Error = MetadataError;
914
915    fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
916        let column_def = add_column
917            .column_def
918            .context(InvalidRawRegionRequestSnafu {
919                err: "missing column_def in AddColumn",
920            })?;
921
922        let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
923        let location = add_column
924            .location
925            .map(AddColumnLocation::try_from)
926            .transpose()?;
927
928        Ok(AddColumn {
929            column_metadata,
930            location,
931        })
932    }
933}
934
935/// Location to add a column.
936#[derive(Debug, PartialEq, Eq, Clone)]
937pub enum AddColumnLocation {
938    /// Add the column to the first position of columns.
939    First,
940    /// Add the column after specific column.
941    After {
942        /// Add the column after this column.
943        column_name: String,
944    },
945}
946
947impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
948    type Error = MetadataError;
949
950    fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
951        let location_type = LocationType::try_from(location.location_type)
952            .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
953        let add_column_location = match location_type {
954            LocationType::First => AddColumnLocation::First,
955            LocationType::After => AddColumnLocation::After {
956                column_name: location.after_column_name,
957            },
958        };
959
960        Ok(add_column_location)
961    }
962}
963
964/// Change a column's datatype.
965#[derive(Debug, PartialEq, Eq, Clone)]
966pub struct ModifyColumnType {
967    /// Schema of the column to modify.
968    pub column_name: String,
969    /// Column will be changed to this type.
970    pub target_type: ConcreteDataType,
971}
972
973impl ModifyColumnType {
974    /// Returns an error if the column's datatype to change is invalid.
975    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
976        let column_meta = metadata
977            .column_by_name(&self.column_name)
978            .with_context(|| InvalidRegionRequestSnafu {
979                region_id: metadata.region_id,
980                err: format!("column {} not found", self.column_name),
981            })?;
982
983        ensure!(
984            matches!(column_meta.semantic_type, SemanticType::Field),
985            InvalidRegionRequestSnafu {
986                region_id: metadata.region_id,
987                err: "'timestamp' or 'tag' column cannot change type".to_string()
988            }
989        );
990        ensure!(
991            column_meta
992                .column_schema
993                .data_type
994                .can_arrow_type_cast_to(&self.target_type),
995            InvalidRegionRequestSnafu {
996                region_id: metadata.region_id,
997                err: format!(
998                    "column '{}' cannot be cast automatically to type '{}'",
999                    self.column_name, self.target_type
1000                ),
1001            }
1002        );
1003
1004        Ok(())
1005    }
1006
1007    /// Returns true if no column's datatype to change to the region.
1008    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1009        debug_assert!(self.validate(metadata).is_ok());
1010        metadata.column_by_name(&self.column_name).is_some()
1011    }
1012}
1013
1014impl From<v1::ModifyColumnType> for ModifyColumnType {
1015    fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1016        let target_type = ColumnDataTypeWrapper::new(
1017            modify_column_type.target_type(),
1018            modify_column_type.target_type_extension,
1019        )
1020        .into();
1021
1022        ModifyColumnType {
1023            column_name: modify_column_type.column_name,
1024            target_type,
1025        }
1026    }
1027}
1028
1029#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1030pub enum SetRegionOption {
1031    Ttl(Option<TimeToLive>),
1032    // Modifying TwscOptions with values as (option name, new value).
1033    Twsc(String, String),
1034}
1035
1036impl TryFrom<&PbOption> for SetRegionOption {
1037    type Error = MetadataError;
1038
1039    fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1040        let PbOption { key, value } = value;
1041        match key.as_str() {
1042            TTL_KEY => {
1043                let ttl = TimeToLive::from_humantime_or_str(value)
1044                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1045
1046                Ok(Self::Ttl(Some(ttl)))
1047            }
1048            TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1049                Ok(Self::Twsc(key.to_string(), value.to_string()))
1050            }
1051            _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1052        }
1053    }
1054}
1055
1056impl From<&UnsetRegionOption> for SetRegionOption {
1057    fn from(unset_option: &UnsetRegionOption) -> Self {
1058        match unset_option {
1059            UnsetRegionOption::TwcsTriggerFileNum => {
1060                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1061            }
1062            UnsetRegionOption::TwcsMaxOutputFileSize => {
1063                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1064            }
1065            UnsetRegionOption::TwcsTimeWindow => {
1066                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1067            }
1068            UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1069        }
1070    }
1071}
1072
1073impl TryFrom<&str> for UnsetRegionOption {
1074    type Error = MetadataError;
1075
1076    fn try_from(key: &str) -> Result<Self> {
1077        match key.to_ascii_lowercase().as_str() {
1078            TTL_KEY => Ok(Self::Ttl),
1079            TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1080            TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1081            TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1082            _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1083        }
1084    }
1085}
1086
1087#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1088pub enum UnsetRegionOption {
1089    TwcsTriggerFileNum,
1090    TwcsMaxOutputFileSize,
1091    TwcsTimeWindow,
1092    Ttl,
1093}
1094
1095impl UnsetRegionOption {
1096    pub fn as_str(&self) -> &str {
1097        match self {
1098            Self::Ttl => TTL_KEY,
1099            Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1100            Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1101            Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1102        }
1103    }
1104}
1105
1106impl Display for UnsetRegionOption {
1107    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1108        write!(f, "{}", self.as_str())
1109    }
1110}
1111
1112#[derive(Debug, Clone, Default)]
1113pub struct RegionFlushRequest {
1114    pub row_group_size: Option<usize>,
1115}
1116
1117#[derive(Debug)]
1118pub struct RegionCompactRequest {
1119    pub options: compact_request::Options,
1120}
1121
1122impl Default for RegionCompactRequest {
1123    fn default() -> Self {
1124        Self {
1125            // Default to regular compaction.
1126            options: compact_request::Options::Regular(Default::default()),
1127        }
1128    }
1129}
1130
1131/// Truncate region request.
1132#[derive(Debug)]
1133pub struct RegionTruncateRequest {}
1134
1135/// Catchup region request.
1136///
1137/// Makes a readonly region to catch up to leader region changes.
1138/// There is no effect if it operating on a leader region.
1139#[derive(Debug, Clone, Copy)]
1140pub struct RegionCatchupRequest {
1141    /// Sets it to writable if it's available after it has caught up with all changes.
1142    pub set_writable: bool,
1143    /// The `entry_id` that was expected to reply to.
1144    /// `None` stands replaying to latest.
1145    pub entry_id: Option<entry::Id>,
1146    /// Used for metrics metadata region.
1147    /// The `entry_id` that was expected to reply to.
1148    /// `None` stands replaying to latest.
1149    pub metadata_entry_id: Option<entry::Id>,
1150    /// The hint for replaying memtable.
1151    pub location_id: Option<u64>,
1152}
1153
1154/// Get sequences of regions by region ids.
1155#[derive(Debug, Clone)]
1156pub struct RegionSequencesRequest {
1157    pub region_ids: Vec<RegionId>,
1158}
1159
1160#[derive(Debug, Clone)]
1161pub struct RegionBulkInsertsRequest {
1162    pub region_id: RegionId,
1163    pub payload: DfRecordBatch,
1164    pub raw_data: ArrowIpc,
1165}
1166
1167impl RegionBulkInsertsRequest {
1168    pub fn estimated_size(&self) -> usize {
1169        self.payload.get_array_memory_size()
1170    }
1171}
1172
1173impl fmt::Display for RegionRequest {
1174    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1175        match self {
1176            RegionRequest::Put(_) => write!(f, "Put"),
1177            RegionRequest::Delete(_) => write!(f, "Delete"),
1178            RegionRequest::Create(_) => write!(f, "Create"),
1179            RegionRequest::Drop(_) => write!(f, "Drop"),
1180            RegionRequest::Open(_) => write!(f, "Open"),
1181            RegionRequest::Close(_) => write!(f, "Close"),
1182            RegionRequest::Alter(_) => write!(f, "Alter"),
1183            RegionRequest::Flush(_) => write!(f, "Flush"),
1184            RegionRequest::Compact(_) => write!(f, "Compact"),
1185            RegionRequest::Truncate(_) => write!(f, "Truncate"),
1186            RegionRequest::Catchup(_) => write!(f, "Catchup"),
1187            RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1188        }
1189    }
1190}
1191
1192#[cfg(test)]
1193mod tests {
1194    use api::v1::region::RegionColumnDef;
1195    use api::v1::{ColumnDataType, ColumnDef};
1196    use datatypes::prelude::ConcreteDataType;
1197    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1198
1199    use super::*;
1200    use crate::metadata::RegionMetadataBuilder;
1201
1202    #[test]
1203    fn test_from_proto_location() {
1204        let proto_location = v1::AddColumnLocation {
1205            location_type: LocationType::First as i32,
1206            after_column_name: String::default(),
1207        };
1208        let location = AddColumnLocation::try_from(proto_location).unwrap();
1209        assert_eq!(location, AddColumnLocation::First);
1210
1211        let proto_location = v1::AddColumnLocation {
1212            location_type: 10,
1213            after_column_name: String::default(),
1214        };
1215        AddColumnLocation::try_from(proto_location).unwrap_err();
1216
1217        let proto_location = v1::AddColumnLocation {
1218            location_type: LocationType::After as i32,
1219            after_column_name: "a".to_string(),
1220        };
1221        let location = AddColumnLocation::try_from(proto_location).unwrap();
1222        assert_eq!(
1223            location,
1224            AddColumnLocation::After {
1225                column_name: "a".to_string()
1226            }
1227        );
1228    }
1229
1230    #[test]
1231    fn test_from_none_proto_add_column() {
1232        AddColumn::try_from(v1::region::AddColumn {
1233            column_def: None,
1234            location: None,
1235        })
1236        .unwrap_err();
1237    }
1238
1239    #[test]
1240    fn test_from_proto_alter_request() {
1241        RegionAlterRequest::try_from(AlterRequest {
1242            region_id: 0,
1243            schema_version: 1,
1244            kind: None,
1245        })
1246        .unwrap_err();
1247
1248        let request = RegionAlterRequest::try_from(AlterRequest {
1249            region_id: 0,
1250            schema_version: 1,
1251            kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1252                add_columns: vec![v1::region::AddColumn {
1253                    column_def: Some(RegionColumnDef {
1254                        column_def: Some(ColumnDef {
1255                            name: "a".to_string(),
1256                            data_type: ColumnDataType::String as i32,
1257                            is_nullable: true,
1258                            default_constraint: vec![],
1259                            semantic_type: SemanticType::Field as i32,
1260                            comment: String::new(),
1261                            ..Default::default()
1262                        }),
1263                        column_id: 1,
1264                    }),
1265                    location: Some(v1::AddColumnLocation {
1266                        location_type: LocationType::First as i32,
1267                        after_column_name: String::default(),
1268                    }),
1269                }],
1270            })),
1271        })
1272        .unwrap();
1273
1274        assert_eq!(
1275            request,
1276            RegionAlterRequest {
1277                kind: AlterKind::AddColumns {
1278                    columns: vec![AddColumn {
1279                        column_metadata: ColumnMetadata {
1280                            column_schema: ColumnSchema::new(
1281                                "a",
1282                                ConcreteDataType::string_datatype(),
1283                                true,
1284                            ),
1285                            semantic_type: SemanticType::Field,
1286                            column_id: 1,
1287                        },
1288                        location: Some(AddColumnLocation::First),
1289                    }]
1290                },
1291            }
1292        );
1293    }
1294
1295    /// Returns a new region metadata for testing. Metadata:
1296    /// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
1297    fn new_metadata() -> RegionMetadata {
1298        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1299        builder
1300            .push_column_metadata(ColumnMetadata {
1301                column_schema: ColumnSchema::new(
1302                    "ts",
1303                    ConcreteDataType::timestamp_millisecond_datatype(),
1304                    false,
1305                ),
1306                semantic_type: SemanticType::Timestamp,
1307                column_id: 1,
1308            })
1309            .push_column_metadata(ColumnMetadata {
1310                column_schema: ColumnSchema::new(
1311                    "tag_0",
1312                    ConcreteDataType::string_datatype(),
1313                    true,
1314                ),
1315                semantic_type: SemanticType::Tag,
1316                column_id: 2,
1317            })
1318            .push_column_metadata(ColumnMetadata {
1319                column_schema: ColumnSchema::new(
1320                    "field_0",
1321                    ConcreteDataType::string_datatype(),
1322                    true,
1323                ),
1324                semantic_type: SemanticType::Field,
1325                column_id: 3,
1326            })
1327            .push_column_metadata(ColumnMetadata {
1328                column_schema: ColumnSchema::new(
1329                    "field_1",
1330                    ConcreteDataType::boolean_datatype(),
1331                    true,
1332                ),
1333                semantic_type: SemanticType::Field,
1334                column_id: 4,
1335            })
1336            .primary_key(vec![2]);
1337        builder.build().unwrap()
1338    }
1339
1340    #[test]
1341    fn test_add_column_validate() {
1342        let metadata = new_metadata();
1343        let add_column = AddColumn {
1344            column_metadata: ColumnMetadata {
1345                column_schema: ColumnSchema::new(
1346                    "tag_1",
1347                    ConcreteDataType::string_datatype(),
1348                    true,
1349                ),
1350                semantic_type: SemanticType::Tag,
1351                column_id: 5,
1352            },
1353            location: None,
1354        };
1355        add_column.validate(&metadata).unwrap();
1356        assert!(add_column.need_alter(&metadata));
1357
1358        // Add not null column.
1359        AddColumn {
1360            column_metadata: ColumnMetadata {
1361                column_schema: ColumnSchema::new(
1362                    "tag_1",
1363                    ConcreteDataType::string_datatype(),
1364                    false,
1365                ),
1366                semantic_type: SemanticType::Tag,
1367                column_id: 5,
1368            },
1369            location: None,
1370        }
1371        .validate(&metadata)
1372        .unwrap_err();
1373
1374        // Add existing column.
1375        let add_column = AddColumn {
1376            column_metadata: ColumnMetadata {
1377                column_schema: ColumnSchema::new(
1378                    "tag_0",
1379                    ConcreteDataType::string_datatype(),
1380                    true,
1381                ),
1382                semantic_type: SemanticType::Tag,
1383                column_id: 2,
1384            },
1385            location: None,
1386        };
1387        add_column.validate(&metadata).unwrap();
1388        assert!(!add_column.need_alter(&metadata));
1389    }
1390
1391    #[test]
1392    fn test_add_duplicate_columns() {
1393        let kind = AlterKind::AddColumns {
1394            columns: vec![
1395                AddColumn {
1396                    column_metadata: ColumnMetadata {
1397                        column_schema: ColumnSchema::new(
1398                            "tag_1",
1399                            ConcreteDataType::string_datatype(),
1400                            true,
1401                        ),
1402                        semantic_type: SemanticType::Tag,
1403                        column_id: 5,
1404                    },
1405                    location: None,
1406                },
1407                AddColumn {
1408                    column_metadata: ColumnMetadata {
1409                        column_schema: ColumnSchema::new(
1410                            "tag_1",
1411                            ConcreteDataType::string_datatype(),
1412                            true,
1413                        ),
1414                        semantic_type: SemanticType::Field,
1415                        column_id: 6,
1416                    },
1417                    location: None,
1418                },
1419            ],
1420        };
1421        let metadata = new_metadata();
1422        kind.validate(&metadata).unwrap();
1423        assert!(kind.need_alter(&metadata));
1424    }
1425
1426    #[test]
1427    fn test_add_existing_column_different_metadata() {
1428        let metadata = new_metadata();
1429
1430        // Add existing column with different id.
1431        let kind = AlterKind::AddColumns {
1432            columns: vec![AddColumn {
1433                column_metadata: ColumnMetadata {
1434                    column_schema: ColumnSchema::new(
1435                        "tag_0",
1436                        ConcreteDataType::string_datatype(),
1437                        true,
1438                    ),
1439                    semantic_type: SemanticType::Tag,
1440                    column_id: 4,
1441                },
1442                location: None,
1443            }],
1444        };
1445        kind.validate(&metadata).unwrap_err();
1446
1447        // Add existing column with different type.
1448        let kind = AlterKind::AddColumns {
1449            columns: vec![AddColumn {
1450                column_metadata: ColumnMetadata {
1451                    column_schema: ColumnSchema::new(
1452                        "tag_0",
1453                        ConcreteDataType::int64_datatype(),
1454                        true,
1455                    ),
1456                    semantic_type: SemanticType::Tag,
1457                    column_id: 2,
1458                },
1459                location: None,
1460            }],
1461        };
1462        kind.validate(&metadata).unwrap_err();
1463
1464        // Add existing column with different name.
1465        let kind = AlterKind::AddColumns {
1466            columns: vec![AddColumn {
1467                column_metadata: ColumnMetadata {
1468                    column_schema: ColumnSchema::new(
1469                        "tag_1",
1470                        ConcreteDataType::string_datatype(),
1471                        true,
1472                    ),
1473                    semantic_type: SemanticType::Tag,
1474                    column_id: 2,
1475                },
1476                location: None,
1477            }],
1478        };
1479        kind.validate(&metadata).unwrap_err();
1480    }
1481
1482    #[test]
1483    fn test_add_existing_column_with_location() {
1484        let metadata = new_metadata();
1485        let kind = AlterKind::AddColumns {
1486            columns: vec![AddColumn {
1487                column_metadata: ColumnMetadata {
1488                    column_schema: ColumnSchema::new(
1489                        "tag_0",
1490                        ConcreteDataType::string_datatype(),
1491                        true,
1492                    ),
1493                    semantic_type: SemanticType::Tag,
1494                    column_id: 2,
1495                },
1496                location: Some(AddColumnLocation::First),
1497            }],
1498        };
1499        kind.validate(&metadata).unwrap_err();
1500    }
1501
1502    #[test]
1503    fn test_validate_drop_column() {
1504        let metadata = new_metadata();
1505        let kind = AlterKind::DropColumns {
1506            names: vec!["xxxx".to_string()],
1507        };
1508        kind.validate(&metadata).unwrap();
1509        assert!(!kind.need_alter(&metadata));
1510
1511        AlterKind::DropColumns {
1512            names: vec!["tag_0".to_string()],
1513        }
1514        .validate(&metadata)
1515        .unwrap_err();
1516
1517        let kind = AlterKind::DropColumns {
1518            names: vec!["field_0".to_string()],
1519        };
1520        kind.validate(&metadata).unwrap();
1521        assert!(kind.need_alter(&metadata));
1522    }
1523
1524    #[test]
1525    fn test_validate_modify_column_type() {
1526        let metadata = new_metadata();
1527        AlterKind::ModifyColumnTypes {
1528            columns: vec![ModifyColumnType {
1529                column_name: "xxxx".to_string(),
1530                target_type: ConcreteDataType::string_datatype(),
1531            }],
1532        }
1533        .validate(&metadata)
1534        .unwrap_err();
1535
1536        AlterKind::ModifyColumnTypes {
1537            columns: vec![ModifyColumnType {
1538                column_name: "field_1".to_string(),
1539                target_type: ConcreteDataType::date_datatype(),
1540            }],
1541        }
1542        .validate(&metadata)
1543        .unwrap_err();
1544
1545        AlterKind::ModifyColumnTypes {
1546            columns: vec![ModifyColumnType {
1547                column_name: "ts".to_string(),
1548                target_type: ConcreteDataType::date_datatype(),
1549            }],
1550        }
1551        .validate(&metadata)
1552        .unwrap_err();
1553
1554        AlterKind::ModifyColumnTypes {
1555            columns: vec![ModifyColumnType {
1556                column_name: "tag_0".to_string(),
1557                target_type: ConcreteDataType::date_datatype(),
1558            }],
1559        }
1560        .validate(&metadata)
1561        .unwrap_err();
1562
1563        let kind = AlterKind::ModifyColumnTypes {
1564            columns: vec![ModifyColumnType {
1565                column_name: "field_0".to_string(),
1566                target_type: ConcreteDataType::int32_datatype(),
1567            }],
1568        };
1569        kind.validate(&metadata).unwrap();
1570        assert!(kind.need_alter(&metadata));
1571    }
1572
1573    #[test]
1574    fn test_validate_add_columns() {
1575        let kind = AlterKind::AddColumns {
1576            columns: vec![
1577                AddColumn {
1578                    column_metadata: ColumnMetadata {
1579                        column_schema: ColumnSchema::new(
1580                            "tag_1",
1581                            ConcreteDataType::string_datatype(),
1582                            true,
1583                        ),
1584                        semantic_type: SemanticType::Tag,
1585                        column_id: 5,
1586                    },
1587                    location: None,
1588                },
1589                AddColumn {
1590                    column_metadata: ColumnMetadata {
1591                        column_schema: ColumnSchema::new(
1592                            "field_2",
1593                            ConcreteDataType::string_datatype(),
1594                            true,
1595                        ),
1596                        semantic_type: SemanticType::Field,
1597                        column_id: 6,
1598                    },
1599                    location: None,
1600                },
1601            ],
1602        };
1603        let request = RegionAlterRequest { kind };
1604        let mut metadata = new_metadata();
1605        metadata.schema_version = 1;
1606        request.validate(&metadata).unwrap();
1607    }
1608
1609    #[test]
1610    fn test_validate_create_region() {
1611        let column_metadatas = vec![
1612            ColumnMetadata {
1613                column_schema: ColumnSchema::new(
1614                    "ts",
1615                    ConcreteDataType::timestamp_millisecond_datatype(),
1616                    false,
1617                ),
1618                semantic_type: SemanticType::Timestamp,
1619                column_id: 1,
1620            },
1621            ColumnMetadata {
1622                column_schema: ColumnSchema::new(
1623                    "tag_0",
1624                    ConcreteDataType::string_datatype(),
1625                    true,
1626                ),
1627                semantic_type: SemanticType::Tag,
1628                column_id: 2,
1629            },
1630            ColumnMetadata {
1631                column_schema: ColumnSchema::new(
1632                    "field_0",
1633                    ConcreteDataType::string_datatype(),
1634                    true,
1635                ),
1636                semantic_type: SemanticType::Field,
1637                column_id: 3,
1638            },
1639        ];
1640        let create = RegionCreateRequest {
1641            engine: "mito".to_string(),
1642            column_metadatas,
1643            primary_key: vec![3, 4],
1644            options: HashMap::new(),
1645            region_dir: "path".to_string(),
1646        };
1647
1648        assert!(create.validate().is_err());
1649    }
1650
1651    #[test]
1652    fn test_validate_modify_column_fulltext_options() {
1653        let kind = AlterKind::SetIndex {
1654            options: ApiSetIndexOptions::Fulltext {
1655                column_name: "tag_0".to_string(),
1656                options: FulltextOptions::new_unchecked(
1657                    true,
1658                    FulltextAnalyzer::Chinese,
1659                    false,
1660                    FulltextBackend::Bloom,
1661                    1000,
1662                    0.01,
1663                ),
1664            },
1665        };
1666        let request = RegionAlterRequest { kind };
1667        let mut metadata = new_metadata();
1668        metadata.schema_version = 1;
1669        request.validate(&metadata).unwrap();
1670
1671        let kind = AlterKind::UnsetIndex {
1672            options: ApiUnsetIndexOptions::Fulltext {
1673                column_name: "tag_0".to_string(),
1674            },
1675        };
1676        let request = RegionAlterRequest { kind };
1677        let mut metadata = new_metadata();
1678        metadata.schema_version = 1;
1679        request.validate(&metadata).unwrap();
1680    }
1681}