store_api/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::HashMap;
17use std::fmt::{self, Display};
18use std::io::Cursor;
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::add_column_location::LocationType;
22use api::v1::column_def::{
23    as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
24};
25use api::v1::region::{
26    alter_request, compact_request, region_request, AlterRequest, AlterRequests,
27    BulkInsertRequests, CloseRequest, CompactRequest, CreateRequest, CreateRequests,
28    DeleteRequests, DropRequest, DropRequests, FlushRequest, InsertRequests, OpenRequest,
29    TruncateRequest,
30};
31use api::v1::{
32    self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
33    SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
34};
35pub use common_base::AffectedRows;
36use common_recordbatch::DfRecordBatch;
37use common_time::TimeToLive;
38use datatypes::arrow::ipc::reader::FileReader;
39use datatypes::prelude::ConcreteDataType;
40use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
41use serde::{Deserialize, Serialize};
42use snafu::{ensure, OptionExt, ResultExt};
43use strum::{AsRefStr, IntoStaticStr};
44
45use crate::logstore::entry;
46use crate::metadata::{
47    ColumnMetadata, DecodeArrowIpcSnafu, DecodeProtoSnafu, InvalidRawRegionRequestSnafu,
48    InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
49    InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu,
50};
51use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
52use crate::mito_engine_options::{
53    TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS,
54    TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE,
55    TWCS_TIME_WINDOW,
56};
57use crate::path_utils::region_dir;
58use crate::storage::{ColumnId, RegionId, ScanRequest};
59
60#[derive(Debug, IntoStaticStr)]
61pub enum BatchRegionDdlRequest {
62    Create(Vec<(RegionId, RegionCreateRequest)>),
63    Drop(Vec<(RegionId, RegionDropRequest)>),
64    Alter(Vec<(RegionId, RegionAlterRequest)>),
65}
66
67impl BatchRegionDdlRequest {
68    /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`].
69    pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
70        match body {
71            region_request::Body::Creates(creates) => {
72                let requests = creates
73                    .requests
74                    .into_iter()
75                    .map(parse_region_create)
76                    .collect::<Result<Vec<_>>>()?;
77                Ok(Some(Self::Create(requests)))
78            }
79            region_request::Body::Drops(drops) => {
80                let requests = drops
81                    .requests
82                    .into_iter()
83                    .map(parse_region_drop)
84                    .collect::<Result<Vec<_>>>()?;
85                Ok(Some(Self::Drop(requests)))
86            }
87            region_request::Body::Alters(alters) => {
88                let requests = alters
89                    .requests
90                    .into_iter()
91                    .map(parse_region_alter)
92                    .collect::<Result<Vec<_>>>()?;
93                Ok(Some(Self::Alter(requests)))
94            }
95            _ => Ok(None),
96        }
97    }
98
99    pub fn request_type(&self) -> &'static str {
100        self.into()
101    }
102
103    pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
104        match self {
105            Self::Create(requests) => requests
106                .into_iter()
107                .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
108                .collect(),
109            Self::Drop(requests) => requests
110                .into_iter()
111                .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
112                .collect(),
113            Self::Alter(requests) => requests
114                .into_iter()
115                .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
116                .collect(),
117        }
118    }
119}
120
121#[derive(Debug, IntoStaticStr)]
122pub enum RegionRequest {
123    Put(RegionPutRequest),
124    Delete(RegionDeleteRequest),
125    Create(RegionCreateRequest),
126    Drop(RegionDropRequest),
127    Open(RegionOpenRequest),
128    Close(RegionCloseRequest),
129    Alter(RegionAlterRequest),
130    Flush(RegionFlushRequest),
131    Compact(RegionCompactRequest),
132    Truncate(RegionTruncateRequest),
133    Catchup(RegionCatchupRequest),
134    BulkInserts(RegionBulkInsertsRequest),
135}
136
137impl RegionRequest {
138    /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
139    /// Inserts/Deletes request might become multiple requests. Others are one-to-one.
140    pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
141        match body {
142            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
143            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
144            region_request::Body::Create(create) => make_region_create(create),
145            region_request::Body::Drop(drop) => make_region_drop(drop),
146            region_request::Body::Open(open) => make_region_open(open),
147            region_request::Body::Close(close) => make_region_close(close),
148            region_request::Body::Alter(alter) => make_region_alter(alter),
149            region_request::Body::Flush(flush) => make_region_flush(flush),
150            region_request::Body::Compact(compact) => make_region_compact(compact),
151            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
152            region_request::Body::Creates(creates) => make_region_creates(creates),
153            region_request::Body::Drops(drops) => make_region_drops(drops),
154            region_request::Body::Alters(alters) => make_region_alters(alters),
155            region_request::Body::BulkInserts(bulk) => make_region_bulk_inserts(bulk),
156            region_request::Body::Sync(_) => UnexpectedSnafu {
157                reason: "Sync request should be handled separately by RegionServer",
158            }
159            .fail(),
160        }
161    }
162
163    /// Returns the type name of the request.
164    pub fn request_type(&self) -> &'static str {
165        self.into()
166    }
167}
168
169fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
170    let requests = inserts
171        .requests
172        .into_iter()
173        .filter_map(|r| {
174            let region_id = r.region_id.into();
175            r.rows.map(|rows| {
176                (
177                    region_id,
178                    RegionRequest::Put(RegionPutRequest { rows, hint: None }),
179                )
180            })
181        })
182        .collect();
183    Ok(requests)
184}
185
186fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
187    let requests = deletes
188        .requests
189        .into_iter()
190        .filter_map(|r| {
191            let region_id = r.region_id.into();
192            r.rows.map(|rows| {
193                (
194                    region_id,
195                    RegionRequest::Delete(RegionDeleteRequest { rows }),
196                )
197            })
198        })
199        .collect();
200    Ok(requests)
201}
202
203fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
204    let column_metadatas = create
205        .column_defs
206        .into_iter()
207        .map(ColumnMetadata::try_from_column_def)
208        .collect::<Result<Vec<_>>>()?;
209    let region_id = create.region_id.into();
210    let region_dir = region_dir(&create.path, region_id);
211    Ok((
212        region_id,
213        RegionCreateRequest {
214            engine: create.engine,
215            column_metadatas,
216            primary_key: create.primary_key,
217            options: create.options,
218            region_dir,
219        },
220    ))
221}
222
223fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
224    let (region_id, request) = parse_region_create(create)?;
225    Ok(vec![(region_id, RegionRequest::Create(request))])
226}
227
228fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
229    let mut requests = Vec::with_capacity(creates.requests.len());
230    for create in creates.requests {
231        requests.extend(make_region_create(create)?);
232    }
233    Ok(requests)
234}
235
236fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
237    let region_id = drop.region_id.into();
238    Ok((
239        region_id,
240        RegionDropRequest {
241            fast_path: drop.fast_path,
242        },
243    ))
244}
245
246fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
247    let (region_id, request) = parse_region_drop(drop)?;
248    Ok(vec![(region_id, RegionRequest::Drop(request))])
249}
250
251fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
252    let mut requests = Vec::with_capacity(drops.requests.len());
253    for drop in drops.requests {
254        requests.extend(make_region_drop(drop)?);
255    }
256    Ok(requests)
257}
258
259fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
260    let region_id = open.region_id.into();
261    let region_dir = region_dir(&open.path, region_id);
262    Ok(vec![(
263        region_id,
264        RegionRequest::Open(RegionOpenRequest {
265            engine: open.engine,
266            region_dir,
267            options: open.options,
268            skip_wal_replay: false,
269        }),
270    )])
271}
272
273fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
274    let region_id = close.region_id.into();
275    Ok(vec![(
276        region_id,
277        RegionRequest::Close(RegionCloseRequest {}),
278    )])
279}
280
281fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
282    let region_id = alter.region_id.into();
283    let request = RegionAlterRequest::try_from(alter)?;
284    Ok((region_id, request))
285}
286
287fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
288    let (region_id, request) = parse_region_alter(alter)?;
289    Ok(vec![(region_id, RegionRequest::Alter(request))])
290}
291
292fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
293    let mut requests = Vec::with_capacity(alters.requests.len());
294    for alter in alters.requests {
295        requests.extend(make_region_alter(alter)?);
296    }
297    Ok(requests)
298}
299
300fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
301    let region_id = flush.region_id.into();
302    Ok(vec![(
303        region_id,
304        RegionRequest::Flush(RegionFlushRequest {
305            row_group_size: None,
306        }),
307    )])
308}
309
310fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
311    let region_id = compact.region_id.into();
312    let options = compact
313        .options
314        .unwrap_or(compact_request::Options::Regular(Default::default()));
315    Ok(vec![(
316        region_id,
317        RegionRequest::Compact(RegionCompactRequest { options }),
318    )])
319}
320
321fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
322    let region_id = truncate.region_id.into();
323    Ok(vec![(
324        region_id,
325        RegionRequest::Truncate(RegionTruncateRequest {}),
326    )])
327}
328
329/// Convert [BulkInsertRequests] to [RegionRequest] and group by [RegionId].
330fn make_region_bulk_inserts(
331    requests: BulkInsertRequests,
332) -> Result<Vec<(RegionId, RegionRequest)>> {
333    let mut region_requests: HashMap<u64, Vec<BulkInsertPayload>> =
334        HashMap::with_capacity(requests.requests.len());
335
336    for req in requests.requests {
337        let region_id = req.region_id;
338        match req.payload_type() {
339            api::v1::region::BulkInsertType::ArrowIpc => {
340                // todo(hl): use StreamReader instead
341                let reader = FileReader::try_new(Cursor::new(req.payload), None)
342                    .context(DecodeArrowIpcSnafu)?;
343                let record_batches = reader
344                    .map(|b| b.map(BulkInsertPayload::ArrowIpc))
345                    .try_collect::<Vec<_>>()
346                    .context(DecodeArrowIpcSnafu)?;
347                match region_requests.entry(region_id) {
348                    Entry::Occupied(mut e) => {
349                        e.get_mut().extend(record_batches);
350                    }
351                    Entry::Vacant(e) => {
352                        e.insert(record_batches);
353                    }
354                }
355            }
356        }
357    }
358
359    let result = region_requests
360        .into_iter()
361        .map(|(region_id, payloads)| {
362            (
363                region_id.into(),
364                RegionRequest::BulkInserts(RegionBulkInsertsRequest {
365                    region_id: region_id.into(),
366                    payloads,
367                }),
368            )
369        })
370        .collect::<Vec<_>>();
371    Ok(result)
372}
373
374/// Request to put data into a region.
375#[derive(Debug)]
376pub struct RegionPutRequest {
377    /// Rows to put.
378    pub rows: Rows,
379    /// Write hint.
380    pub hint: Option<WriteHint>,
381}
382
383#[derive(Debug)]
384pub struct RegionReadRequest {
385    pub request: ScanRequest,
386}
387
388/// Request to delete data from a region.
389#[derive(Debug)]
390pub struct RegionDeleteRequest {
391    /// Keys to rows to delete.
392    ///
393    /// Each row only contains primary key columns and a time index column.
394    pub rows: Rows,
395}
396
397#[derive(Debug, Clone)]
398pub struct RegionCreateRequest {
399    /// Region engine name
400    pub engine: String,
401    /// Columns in this region.
402    pub column_metadatas: Vec<ColumnMetadata>,
403    /// Columns in the primary key.
404    pub primary_key: Vec<ColumnId>,
405    /// Options of the created region.
406    pub options: HashMap<String, String>,
407    /// Directory for region's data home. Usually is composed by catalog and table id
408    pub region_dir: String,
409}
410
411impl RegionCreateRequest {
412    /// Checks whether the request is valid, returns an error if it is invalid.
413    pub fn validate(&self) -> Result<()> {
414        // time index must exist
415        ensure!(
416            self.column_metadatas
417                .iter()
418                .any(|x| x.semantic_type == SemanticType::Timestamp),
419            InvalidRegionRequestSnafu {
420                region_id: RegionId::new(0, 0),
421                err: "missing timestamp column in create region request".to_string(),
422            }
423        );
424
425        // build column id to indices
426        let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
427        for (i, c) in self.column_metadatas.iter().enumerate() {
428            if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
429                return InvalidRegionRequestSnafu {
430                    region_id: RegionId::new(0, 0),
431                    err: format!(
432                        "duplicate column id {} (at position {} and {}) in create region request",
433                        c.column_id, previous, i
434                    ),
435                }
436                .fail();
437            }
438        }
439
440        // primary key must exist
441        for column_id in &self.primary_key {
442            ensure!(
443                column_id_to_indices.contains_key(column_id),
444                InvalidRegionRequestSnafu {
445                    region_id: RegionId::new(0, 0),
446                    err: format!(
447                        "missing primary key column {} in create region request",
448                        column_id
449                    ),
450                }
451            );
452        }
453
454        Ok(())
455    }
456
457    /// Returns true when the region belongs to the metric engine's physical table.
458    pub fn is_physical_table(&self) -> bool {
459        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
460    }
461}
462
/// Request that drops a region.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// The `fast_path` flag, copied from the protobuf drop request.
    pub fast_path: bool,
}
467
468/// Open region request.
469#[derive(Debug, Clone)]
470pub struct RegionOpenRequest {
471    /// Region engine name
472    pub engine: String,
473    /// Data directory of the region.
474    pub region_dir: String,
475    /// Options of the opened region.
476    pub options: HashMap<String, String>,
477    /// To skip replaying the WAL.
478    pub skip_wal_replay: bool,
479}
480
481impl RegionOpenRequest {
482    /// Returns true when the region belongs to the metric engine's physical table.
483    pub fn is_physical_table(&self) -> bool {
484        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
485    }
486}
487
/// Request that closes a region. It carries no parameters.
#[derive(Debug)]
pub struct RegionCloseRequest {}
491
492/// Alter metadata of a region.
493#[derive(Debug, PartialEq, Eq, Clone)]
494pub struct RegionAlterRequest {
495    /// Kind of alteration to do.
496    pub kind: AlterKind,
497}
498
499impl RegionAlterRequest {
500    /// Checks whether the request is valid, returns an error if it is invalid.
501    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
502        self.kind.validate(metadata)?;
503
504        Ok(())
505    }
506
507    /// Returns true if we need to apply the request to the region.
508    ///
509    /// The `request` should be valid.
510    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
511        debug_assert!(self.validate(metadata).is_ok());
512        self.kind.need_alter(metadata)
513    }
514}
515
516impl TryFrom<AlterRequest> for RegionAlterRequest {
517    type Error = MetadataError;
518
519    fn try_from(value: AlterRequest) -> Result<Self> {
520        let kind = value.kind.context(InvalidRawRegionRequestSnafu {
521            err: "missing kind in AlterRequest",
522        })?;
523
524        let kind = AlterKind::try_from(kind)?;
525        Ok(RegionAlterRequest { kind })
526    }
527}
528
529/// Kind of the alteration.
530#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
531pub enum AlterKind {
532    /// Add columns to the region.
533    AddColumns {
534        /// Columns to add.
535        columns: Vec<AddColumn>,
536    },
537    /// Drop columns from the region, only fields are allowed to drop.
538    DropColumns {
539        /// Name of columns to drop.
540        names: Vec<String>,
541    },
542    /// Change columns datatype form the region, only fields are allowed to change.
543    ModifyColumnTypes {
544        /// Columns to change.
545        columns: Vec<ModifyColumnType>,
546    },
547    /// Set region options.
548    SetRegionOptions { options: Vec<SetRegionOption> },
549    /// Unset region options.
550    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
551    /// Set index options.
552    SetIndex { options: ApiSetIndexOptions },
553    /// Unset index options.
554    UnsetIndex { options: ApiUnsetIndexOptions },
555}
556
557#[derive(Debug, PartialEq, Eq, Clone)]
558pub enum ApiSetIndexOptions {
559    Fulltext {
560        column_name: String,
561        options: FulltextOptions,
562    },
563    Inverted {
564        column_name: String,
565    },
566    Skipping {
567        column_name: String,
568        options: SkippingIndexOptions,
569    },
570}
571
572impl ApiSetIndexOptions {
573    pub fn column_name(&self) -> &String {
574        match self {
575            ApiSetIndexOptions::Fulltext { column_name, .. } => column_name,
576            ApiSetIndexOptions::Inverted { column_name } => column_name,
577            ApiSetIndexOptions::Skipping { column_name, .. } => column_name,
578        }
579    }
580
581    pub fn is_fulltext(&self) -> bool {
582        match self {
583            ApiSetIndexOptions::Fulltext { .. } => true,
584            ApiSetIndexOptions::Inverted { .. } => false,
585            ApiSetIndexOptions::Skipping { .. } => false,
586        }
587    }
588}
589
/// Index to unset on a column, one variant per index type.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ApiUnsetIndexOptions {
    /// Fulltext index on `column_name`.
    Fulltext { column_name: String },
    /// Inverted index on `column_name`.
    Inverted { column_name: String },
    /// Skipping index on `column_name`.
    Skipping { column_name: String },
}

impl ApiUnsetIndexOptions {
    /// Returns the name of the column whose index is unset.
    pub fn column_name(&self) -> &String {
        match self {
            Self::Fulltext { column_name }
            | Self::Inverted { column_name }
            | Self::Skipping { column_name } => column_name,
        }
    }

    /// Returns true only for the `Fulltext` variant.
    pub fn is_fulltext(&self) -> bool {
        matches!(self, Self::Fulltext { .. })
    }
}
614
615impl AlterKind {
616    /// Returns an error if the alter kind is invalid.
617    ///
618    /// It allows adding column if not exists and dropping column if exists.
619    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
620        match self {
621            AlterKind::AddColumns { columns } => {
622                for col_to_add in columns {
623                    col_to_add.validate(metadata)?;
624                }
625            }
626            AlterKind::DropColumns { names } => {
627                for name in names {
628                    Self::validate_column_to_drop(name, metadata)?;
629                }
630            }
631            AlterKind::ModifyColumnTypes { columns } => {
632                for col_to_change in columns {
633                    col_to_change.validate(metadata)?;
634                }
635            }
636            AlterKind::SetRegionOptions { .. } => {}
637            AlterKind::UnsetRegionOptions { .. } => {}
638            AlterKind::SetIndex { options } => {
639                Self::validate_column_alter_index_option(
640                    options.column_name(),
641                    metadata,
642                    options.is_fulltext(),
643                )?;
644            }
645            AlterKind::UnsetIndex { options } => {
646                Self::validate_column_alter_index_option(
647                    options.column_name(),
648                    metadata,
649                    options.is_fulltext(),
650                )?;
651            }
652        }
653        Ok(())
654    }
655
656    /// Returns true if we need to apply the alteration to the region.
657    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
658        debug_assert!(self.validate(metadata).is_ok());
659        match self {
660            AlterKind::AddColumns { columns } => columns
661                .iter()
662                .any(|col_to_add| col_to_add.need_alter(metadata)),
663            AlterKind::DropColumns { names } => names
664                .iter()
665                .any(|name| metadata.column_by_name(name).is_some()),
666            AlterKind::ModifyColumnTypes { columns } => columns
667                .iter()
668                .any(|col_to_change| col_to_change.need_alter(metadata)),
669            AlterKind::SetRegionOptions { .. } => {
670                // we need to update region options for `ChangeTableOptions`.
671                // todo: we need to check if ttl has ever changed.
672                true
673            }
674            AlterKind::UnsetRegionOptions { .. } => true,
675            AlterKind::SetIndex { options, .. } => {
676                metadata.column_by_name(options.column_name()).is_some()
677            }
678            AlterKind::UnsetIndex { options } => {
679                metadata.column_by_name(options.column_name()).is_some()
680            }
681        }
682    }
683
684    /// Returns an error if the column to drop is invalid.
685    fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
686        let Some(column) = metadata.column_by_name(name) else {
687            return Ok(());
688        };
689        ensure!(
690            column.semantic_type == SemanticType::Field,
691            InvalidRegionRequestSnafu {
692                region_id: metadata.region_id,
693                err: format!("column {} is not a field and could not be dropped", name),
694            }
695        );
696        Ok(())
697    }
698
699    /// Returns an error if the column's alter index option is invalid.
700    fn validate_column_alter_index_option(
701        column_name: &String,
702        metadata: &RegionMetadata,
703        is_fulltext: bool,
704    ) -> Result<()> {
705        let column = metadata
706            .column_by_name(column_name)
707            .context(InvalidRegionRequestSnafu {
708                region_id: metadata.region_id,
709                err: format!("column {} not found", column_name),
710            })?;
711
712        if is_fulltext {
713            ensure!(
714                column.column_schema.data_type.is_string(),
715                InvalidRegionRequestSnafu {
716                    region_id: metadata.region_id,
717                    err: format!(
718                        "cannot change alter index options for non-string column {}",
719                        column_name
720                    ),
721                }
722            );
723        }
724
725        Ok(())
726    }
727}
728
729impl TryFrom<alter_request::Kind> for AlterKind {
730    type Error = MetadataError;
731
732    fn try_from(kind: alter_request::Kind) -> Result<Self> {
733        let alter_kind = match kind {
734            alter_request::Kind::AddColumns(x) => {
735                let columns = x
736                    .add_columns
737                    .into_iter()
738                    .map(|x| x.try_into())
739                    .collect::<Result<Vec<_>>>()?;
740                AlterKind::AddColumns { columns }
741            }
742            alter_request::Kind::ModifyColumnTypes(x) => {
743                let columns = x
744                    .modify_column_types
745                    .into_iter()
746                    .map(|x| x.into())
747                    .collect::<Vec<_>>();
748                AlterKind::ModifyColumnTypes { columns }
749            }
750            alter_request::Kind::DropColumns(x) => {
751                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
752                AlterKind::DropColumns { names }
753            }
754            alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
755                options: options
756                    .table_options
757                    .iter()
758                    .map(TryFrom::try_from)
759                    .collect::<Result<Vec<_>>>()?,
760            },
761            alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
762                keys: options
763                    .keys
764                    .iter()
765                    .map(|key| UnsetRegionOption::try_from(key.as_str()))
766                    .collect::<Result<Vec<_>>>()?,
767            },
768            alter_request::Kind::SetIndex(o) => match o.options.unwrap() {
769                set_index::Options::Fulltext(x) => AlterKind::SetIndex {
770                    options: ApiSetIndexOptions::Fulltext {
771                        column_name: x.column_name.clone(),
772                        options: FulltextOptions {
773                            enable: x.enable,
774                            analyzer: as_fulltext_option_analyzer(
775                                Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
776                            ),
777                            case_sensitive: x.case_sensitive,
778                            backend: as_fulltext_option_backend(
779                                PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
780                            ),
781                        },
782                    },
783                },
784                set_index::Options::Inverted(i) => AlterKind::SetIndex {
785                    options: ApiSetIndexOptions::Inverted {
786                        column_name: i.column_name,
787                    },
788                },
789                set_index::Options::Skipping(s) => AlterKind::SetIndex {
790                    options: ApiSetIndexOptions::Skipping {
791                        column_name: s.column_name,
792                        options: SkippingIndexOptions {
793                            index_type: as_skipping_index_type(
794                                PbSkippingIndexType::try_from(s.skipping_index_type)
795                                    .context(DecodeProtoSnafu)?,
796                            ),
797                            granularity: s.granularity as u32,
798                        },
799                    },
800                },
801            },
802            alter_request::Kind::UnsetIndex(o) => match o.options.unwrap() {
803                v1::unset_index::Options::Fulltext(f) => AlterKind::UnsetIndex {
804                    options: ApiUnsetIndexOptions::Fulltext {
805                        column_name: f.column_name,
806                    },
807                },
808                v1::unset_index::Options::Inverted(i) => AlterKind::UnsetIndex {
809                    options: ApiUnsetIndexOptions::Inverted {
810                        column_name: i.column_name,
811                    },
812                },
813                v1::unset_index::Options::Skipping(s) => AlterKind::UnsetIndex {
814                    options: ApiUnsetIndexOptions::Skipping {
815                        column_name: s.column_name,
816                    },
817                },
818            },
819        };
820
821        Ok(alter_kind)
822    }
823}
824
825/// Adds a column.
826#[derive(Debug, PartialEq, Eq, Clone)]
827pub struct AddColumn {
828    /// Metadata of the column to add.
829    pub column_metadata: ColumnMetadata,
830    /// Location to add the column. If location is None, the region adds
831    /// the column to the last.
832    pub location: Option<AddColumnLocation>,
833}
834
835impl AddColumn {
836    /// Returns an error if the column to add is invalid.
837    ///
838    /// It allows adding existing columns. However, the existing column must have the same metadata
839    /// and the location must be None.
840    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
841        ensure!(
842            self.column_metadata.column_schema.is_nullable()
843                || self
844                    .column_metadata
845                    .column_schema
846                    .default_constraint()
847                    .is_some(),
848            InvalidRegionRequestSnafu {
849                region_id: metadata.region_id,
850                err: format!(
851                    "no default value for column {}",
852                    self.column_metadata.column_schema.name
853                ),
854            }
855        );
856
857        if let Some(existing_column) =
858            metadata.column_by_name(&self.column_metadata.column_schema.name)
859        {
860            // If the column already exists.
861            ensure!(
862                *existing_column == self.column_metadata,
863                InvalidRegionRequestSnafu {
864                    region_id: metadata.region_id,
865                    err: format!(
866                        "column {} already exists with different metadata, existing: {:?}, got: {:?}",
867                        self.column_metadata.column_schema.name, existing_column, self.column_metadata,
868                    ),
869                }
870            );
871            ensure!(
872                self.location.is_none(),
873                InvalidRegionRequestSnafu {
874                    region_id: metadata.region_id,
875                    err: format!(
876                        "column {} already exists, but location is specified",
877                        self.column_metadata.column_schema.name
878                    ),
879                }
880            );
881        }
882
883        if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
884            // Ensures the existing column has the same name.
885            ensure!(
886                existing_column.column_schema.name == self.column_metadata.column_schema.name,
887                InvalidRegionRequestSnafu {
888                    region_id: metadata.region_id,
889                    err: format!(
890                        "column id {} already exists with different name {}",
891                        self.column_metadata.column_id, existing_column.column_schema.name
892                    ),
893                }
894            );
895        }
896
897        Ok(())
898    }
899
900    /// Returns true if no column to add to the region.
901    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
902        debug_assert!(self.validate(metadata).is_ok());
903        metadata
904            .column_by_name(&self.column_metadata.column_schema.name)
905            .is_none()
906    }
907}
908
909impl TryFrom<v1::region::AddColumn> for AddColumn {
910    type Error = MetadataError;
911
912    fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
913        let column_def = add_column
914            .column_def
915            .context(InvalidRawRegionRequestSnafu {
916                err: "missing column_def in AddColumn",
917            })?;
918
919        let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
920        let location = add_column
921            .location
922            .map(AddColumnLocation::try_from)
923            .transpose()?;
924
925        Ok(AddColumn {
926            column_metadata,
927            location,
928        })
929    }
930}
931
/// Position at which a new column is inserted.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AddColumnLocation {
    /// Insert the column before all other columns.
    First,
    /// Insert the column right after an existing column.
    After {
        /// Name of the column the new column follows.
        column_name: String,
    },
}
943
944impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
945    type Error = MetadataError;
946
947    fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
948        let location_type = LocationType::try_from(location.location_type)
949            .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
950        let add_column_location = match location_type {
951            LocationType::First => AddColumnLocation::First,
952            LocationType::After => AddColumnLocation::After {
953                column_name: location.after_column_name,
954            },
955        };
956
957        Ok(add_column_location)
958    }
959}
960
961/// Change a column's datatype.
962#[derive(Debug, PartialEq, Eq, Clone)]
963pub struct ModifyColumnType {
964    /// Schema of the column to modify.
965    pub column_name: String,
966    /// Column will be changed to this type.
967    pub target_type: ConcreteDataType,
968}
969
970impl ModifyColumnType {
971    /// Returns an error if the column's datatype to change is invalid.
972    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
973        let column_meta = metadata
974            .column_by_name(&self.column_name)
975            .with_context(|| InvalidRegionRequestSnafu {
976                region_id: metadata.region_id,
977                err: format!("column {} not found", self.column_name),
978            })?;
979
980        ensure!(
981            matches!(column_meta.semantic_type, SemanticType::Field),
982            InvalidRegionRequestSnafu {
983                region_id: metadata.region_id,
984                err: "'timestamp' or 'tag' column cannot change type".to_string()
985            }
986        );
987        ensure!(
988            column_meta
989                .column_schema
990                .data_type
991                .can_arrow_type_cast_to(&self.target_type),
992            InvalidRegionRequestSnafu {
993                region_id: metadata.region_id,
994                err: format!(
995                    "column '{}' cannot be cast automatically to type '{}'",
996                    self.column_name, self.target_type
997                ),
998            }
999        );
1000
1001        Ok(())
1002    }
1003
1004    /// Returns true if no column's datatype to change to the region.
1005    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1006        debug_assert!(self.validate(metadata).is_ok());
1007        metadata.column_by_name(&self.column_name).is_some()
1008    }
1009}
1010
1011impl From<v1::ModifyColumnType> for ModifyColumnType {
1012    fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1013        let target_type = ColumnDataTypeWrapper::new(
1014            modify_column_type.target_type(),
1015            modify_column_type.target_type_extension,
1016        )
1017        .into();
1018
1019        ModifyColumnType {
1020            column_name: modify_column_type.column_name,
1021            target_type,
1022        }
1023    }
1024}
1025
1026#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1027pub enum SetRegionOption {
1028    Ttl(Option<TimeToLive>),
1029    // Modifying TwscOptions with values as (option name, new value).
1030    Twsc(String, String),
1031}
1032
1033impl TryFrom<&PbOption> for SetRegionOption {
1034    type Error = MetadataError;
1035
1036    fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1037        let PbOption { key, value } = value;
1038        match key.as_str() {
1039            TTL_KEY => {
1040                let ttl = TimeToLive::from_humantime_or_str(value)
1041                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1042
1043                Ok(Self::Ttl(Some(ttl)))
1044            }
1045            TWCS_MAX_ACTIVE_WINDOW_RUNS
1046            | TWCS_MAX_ACTIVE_WINDOW_FILES
1047            | TWCS_MAX_INACTIVE_WINDOW_FILES
1048            | TWCS_MAX_INACTIVE_WINDOW_RUNS
1049            | TWCS_MAX_OUTPUT_FILE_SIZE
1050            | TWCS_TIME_WINDOW => Ok(Self::Twsc(key.to_string(), value.to_string())),
1051            _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1052        }
1053    }
1054}
1055
1056impl From<&UnsetRegionOption> for SetRegionOption {
1057    fn from(unset_option: &UnsetRegionOption) -> Self {
1058        match unset_option {
1059            UnsetRegionOption::TwcsMaxActiveWindowFiles => {
1060                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1061            }
1062            UnsetRegionOption::TwcsMaxInactiveWindowFiles => {
1063                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1064            }
1065            UnsetRegionOption::TwcsMaxActiveWindowRuns => {
1066                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1067            }
1068            UnsetRegionOption::TwcsMaxInactiveWindowRuns => {
1069                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1070            }
1071            UnsetRegionOption::TwcsMaxOutputFileSize => {
1072                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1073            }
1074            UnsetRegionOption::TwcsTimeWindow => {
1075                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1076            }
1077            UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1078        }
1079    }
1080}
1081
1082impl TryFrom<&str> for UnsetRegionOption {
1083    type Error = MetadataError;
1084
1085    fn try_from(key: &str) -> Result<Self> {
1086        match key.to_ascii_lowercase().as_str() {
1087            TTL_KEY => Ok(Self::Ttl),
1088            TWCS_MAX_ACTIVE_WINDOW_FILES => Ok(Self::TwcsMaxActiveWindowFiles),
1089            TWCS_MAX_INACTIVE_WINDOW_FILES => Ok(Self::TwcsMaxInactiveWindowFiles),
1090            TWCS_MAX_ACTIVE_WINDOW_RUNS => Ok(Self::TwcsMaxActiveWindowRuns),
1091            TWCS_MAX_INACTIVE_WINDOW_RUNS => Ok(Self::TwcsMaxInactiveWindowRuns),
1092            TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1093            TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1094            _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1095        }
1096    }
1097}
1098
1099#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1100pub enum UnsetRegionOption {
1101    TwcsMaxActiveWindowFiles,
1102    TwcsMaxInactiveWindowFiles,
1103    TwcsMaxActiveWindowRuns,
1104    TwcsMaxInactiveWindowRuns,
1105    TwcsMaxOutputFileSize,
1106    TwcsTimeWindow,
1107    Ttl,
1108}
1109
1110impl UnsetRegionOption {
1111    pub fn as_str(&self) -> &str {
1112        match self {
1113            Self::Ttl => TTL_KEY,
1114            Self::TwcsMaxActiveWindowFiles => TWCS_MAX_ACTIVE_WINDOW_FILES,
1115            Self::TwcsMaxInactiveWindowFiles => TWCS_MAX_INACTIVE_WINDOW_FILES,
1116            Self::TwcsMaxActiveWindowRuns => TWCS_MAX_ACTIVE_WINDOW_RUNS,
1117            Self::TwcsMaxInactiveWindowRuns => TWCS_MAX_INACTIVE_WINDOW_RUNS,
1118            Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1119            Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1120        }
1121    }
1122}
1123
1124impl Display for UnsetRegionOption {
1125    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1126        write!(f, "{}", self.as_str())
1127    }
1128}
1129
/// Request to flush a region.
#[derive(Clone, Debug, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size; `None` by default.
    // NOTE(review): presumably the row group size used for the flushed data — confirm
    // against the engine that consumes this request.
    pub row_group_size: Option<usize>,
}
1134
/// Request to compact a region.
#[derive(Debug)]
pub struct RegionCompactRequest {
    /// Compaction options, expressed with the protobuf `compact_request::Options` type.
    pub options: compact_request::Options,
}
1139
1140impl Default for RegionCompactRequest {
1141    fn default() -> Self {
1142        Self {
1143            // Default to regular compaction.
1144            options: compact_request::Options::Regular(Default::default()),
1145        }
1146    }
1147}
1148
/// Request to truncate a region. It carries no parameters.
#[derive(Debug)]
pub struct RegionTruncateRequest {}
1152
1153/// Catchup region request.
1154///
1155/// Makes a readonly region to catch up to leader region changes.
1156/// There is no effect if it operating on a leader region.
1157#[derive(Debug, Clone, Copy)]
1158pub struct RegionCatchupRequest {
1159    /// Sets it to writable if it's available after it has caught up with all changes.
1160    pub set_writable: bool,
1161    /// The `entry_id` that was expected to reply to.
1162    /// `None` stands replaying to latest.
1163    pub entry_id: Option<entry::Id>,
1164    /// Used for metrics metadata region.
1165    /// The `entry_id` that was expected to reply to.
1166    /// `None` stands replaying to latest.
1167    pub metadata_entry_id: Option<entry::Id>,
1168    /// The hint for replaying memtable.
1169    pub location_id: Option<u64>,
1170}
1171
1172/// Get sequences of regions by region ids.
1173#[derive(Debug, Clone)]
1174pub struct RegionSequencesRequest {
1175    pub region_ids: Vec<RegionId>,
1176}
1177
1178#[derive(Debug, Clone)]
1179pub struct RegionBulkInsertsRequest {
1180    pub region_id: RegionId,
1181    pub payloads: Vec<BulkInsertPayload>,
1182}
1183
1184#[derive(Debug, Clone)]
1185pub enum BulkInsertPayload {
1186    ArrowIpc(DfRecordBatch),
1187}
1188
1189impl fmt::Display for RegionRequest {
1190    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1191        match self {
1192            RegionRequest::Put(_) => write!(f, "Put"),
1193            RegionRequest::Delete(_) => write!(f, "Delete"),
1194            RegionRequest::Create(_) => write!(f, "Create"),
1195            RegionRequest::Drop(_) => write!(f, "Drop"),
1196            RegionRequest::Open(_) => write!(f, "Open"),
1197            RegionRequest::Close(_) => write!(f, "Close"),
1198            RegionRequest::Alter(_) => write!(f, "Alter"),
1199            RegionRequest::Flush(_) => write!(f, "Flush"),
1200            RegionRequest::Compact(_) => write!(f, "Compact"),
1201            RegionRequest::Truncate(_) => write!(f, "Truncate"),
1202            RegionRequest::Catchup(_) => write!(f, "Catchup"),
1203            RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1204        }
1205    }
1206}
1207
1208#[cfg(test)]
1209mod tests {
1210    use api::v1::region::RegionColumnDef;
1211    use api::v1::{ColumnDataType, ColumnDef};
1212    use datatypes::prelude::ConcreteDataType;
1213    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1214
1215    use super::*;
1216    use crate::metadata::RegionMetadataBuilder;
1217
1218    #[test]
1219    fn test_from_proto_location() {
1220        let proto_location = v1::AddColumnLocation {
1221            location_type: LocationType::First as i32,
1222            after_column_name: String::default(),
1223        };
1224        let location = AddColumnLocation::try_from(proto_location).unwrap();
1225        assert_eq!(location, AddColumnLocation::First);
1226
1227        let proto_location = v1::AddColumnLocation {
1228            location_type: 10,
1229            after_column_name: String::default(),
1230        };
1231        AddColumnLocation::try_from(proto_location).unwrap_err();
1232
1233        let proto_location = v1::AddColumnLocation {
1234            location_type: LocationType::After as i32,
1235            after_column_name: "a".to_string(),
1236        };
1237        let location = AddColumnLocation::try_from(proto_location).unwrap();
1238        assert_eq!(
1239            location,
1240            AddColumnLocation::After {
1241                column_name: "a".to_string()
1242            }
1243        );
1244    }
1245
1246    #[test]
1247    fn test_from_none_proto_add_column() {
1248        AddColumn::try_from(v1::region::AddColumn {
1249            column_def: None,
1250            location: None,
1251        })
1252        .unwrap_err();
1253    }
1254
1255    #[test]
1256    fn test_from_proto_alter_request() {
1257        RegionAlterRequest::try_from(AlterRequest {
1258            region_id: 0,
1259            schema_version: 1,
1260            kind: None,
1261        })
1262        .unwrap_err();
1263
1264        let request = RegionAlterRequest::try_from(AlterRequest {
1265            region_id: 0,
1266            schema_version: 1,
1267            kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1268                add_columns: vec![v1::region::AddColumn {
1269                    column_def: Some(RegionColumnDef {
1270                        column_def: Some(ColumnDef {
1271                            name: "a".to_string(),
1272                            data_type: ColumnDataType::String as i32,
1273                            is_nullable: true,
1274                            default_constraint: vec![],
1275                            semantic_type: SemanticType::Field as i32,
1276                            comment: String::new(),
1277                            ..Default::default()
1278                        }),
1279                        column_id: 1,
1280                    }),
1281                    location: Some(v1::AddColumnLocation {
1282                        location_type: LocationType::First as i32,
1283                        after_column_name: String::default(),
1284                    }),
1285                }],
1286            })),
1287        })
1288        .unwrap();
1289
1290        assert_eq!(
1291            request,
1292            RegionAlterRequest {
1293                kind: AlterKind::AddColumns {
1294                    columns: vec![AddColumn {
1295                        column_metadata: ColumnMetadata {
1296                            column_schema: ColumnSchema::new(
1297                                "a",
1298                                ConcreteDataType::string_datatype(),
1299                                true,
1300                            ),
1301                            semantic_type: SemanticType::Field,
1302                            column_id: 1,
1303                        },
1304                        location: Some(AddColumnLocation::First),
1305                    }]
1306                },
1307            }
1308        );
1309    }
1310
1311    /// Returns a new region metadata for testing. Metadata:
1312    /// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
1313    fn new_metadata() -> RegionMetadata {
1314        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1315        builder
1316            .push_column_metadata(ColumnMetadata {
1317                column_schema: ColumnSchema::new(
1318                    "ts",
1319                    ConcreteDataType::timestamp_millisecond_datatype(),
1320                    false,
1321                ),
1322                semantic_type: SemanticType::Timestamp,
1323                column_id: 1,
1324            })
1325            .push_column_metadata(ColumnMetadata {
1326                column_schema: ColumnSchema::new(
1327                    "tag_0",
1328                    ConcreteDataType::string_datatype(),
1329                    true,
1330                ),
1331                semantic_type: SemanticType::Tag,
1332                column_id: 2,
1333            })
1334            .push_column_metadata(ColumnMetadata {
1335                column_schema: ColumnSchema::new(
1336                    "field_0",
1337                    ConcreteDataType::string_datatype(),
1338                    true,
1339                ),
1340                semantic_type: SemanticType::Field,
1341                column_id: 3,
1342            })
1343            .push_column_metadata(ColumnMetadata {
1344                column_schema: ColumnSchema::new(
1345                    "field_1",
1346                    ConcreteDataType::boolean_datatype(),
1347                    true,
1348                ),
1349                semantic_type: SemanticType::Field,
1350                column_id: 4,
1351            })
1352            .primary_key(vec![2]);
1353        builder.build().unwrap()
1354    }
1355
1356    #[test]
1357    fn test_add_column_validate() {
1358        let metadata = new_metadata();
1359        let add_column = AddColumn {
1360            column_metadata: ColumnMetadata {
1361                column_schema: ColumnSchema::new(
1362                    "tag_1",
1363                    ConcreteDataType::string_datatype(),
1364                    true,
1365                ),
1366                semantic_type: SemanticType::Tag,
1367                column_id: 5,
1368            },
1369            location: None,
1370        };
1371        add_column.validate(&metadata).unwrap();
1372        assert!(add_column.need_alter(&metadata));
1373
1374        // Add not null column.
1375        AddColumn {
1376            column_metadata: ColumnMetadata {
1377                column_schema: ColumnSchema::new(
1378                    "tag_1",
1379                    ConcreteDataType::string_datatype(),
1380                    false,
1381                ),
1382                semantic_type: SemanticType::Tag,
1383                column_id: 5,
1384            },
1385            location: None,
1386        }
1387        .validate(&metadata)
1388        .unwrap_err();
1389
1390        // Add existing column.
1391        let add_column = AddColumn {
1392            column_metadata: ColumnMetadata {
1393                column_schema: ColumnSchema::new(
1394                    "tag_0",
1395                    ConcreteDataType::string_datatype(),
1396                    true,
1397                ),
1398                semantic_type: SemanticType::Tag,
1399                column_id: 2,
1400            },
1401            location: None,
1402        };
1403        add_column.validate(&metadata).unwrap();
1404        assert!(!add_column.need_alter(&metadata));
1405    }
1406
1407    #[test]
1408    fn test_add_duplicate_columns() {
1409        let kind = AlterKind::AddColumns {
1410            columns: vec![
1411                AddColumn {
1412                    column_metadata: ColumnMetadata {
1413                        column_schema: ColumnSchema::new(
1414                            "tag_1",
1415                            ConcreteDataType::string_datatype(),
1416                            true,
1417                        ),
1418                        semantic_type: SemanticType::Tag,
1419                        column_id: 5,
1420                    },
1421                    location: None,
1422                },
1423                AddColumn {
1424                    column_metadata: ColumnMetadata {
1425                        column_schema: ColumnSchema::new(
1426                            "tag_1",
1427                            ConcreteDataType::string_datatype(),
1428                            true,
1429                        ),
1430                        semantic_type: SemanticType::Field,
1431                        column_id: 6,
1432                    },
1433                    location: None,
1434                },
1435            ],
1436        };
1437        let metadata = new_metadata();
1438        kind.validate(&metadata).unwrap();
1439        assert!(kind.need_alter(&metadata));
1440    }
1441
1442    #[test]
1443    fn test_add_existing_column_different_metadata() {
1444        let metadata = new_metadata();
1445
1446        // Add existing column with different id.
1447        let kind = AlterKind::AddColumns {
1448            columns: vec![AddColumn {
1449                column_metadata: ColumnMetadata {
1450                    column_schema: ColumnSchema::new(
1451                        "tag_0",
1452                        ConcreteDataType::string_datatype(),
1453                        true,
1454                    ),
1455                    semantic_type: SemanticType::Tag,
1456                    column_id: 4,
1457                },
1458                location: None,
1459            }],
1460        };
1461        kind.validate(&metadata).unwrap_err();
1462
1463        // Add existing column with different type.
1464        let kind = AlterKind::AddColumns {
1465            columns: vec![AddColumn {
1466                column_metadata: ColumnMetadata {
1467                    column_schema: ColumnSchema::new(
1468                        "tag_0",
1469                        ConcreteDataType::int64_datatype(),
1470                        true,
1471                    ),
1472                    semantic_type: SemanticType::Tag,
1473                    column_id: 2,
1474                },
1475                location: None,
1476            }],
1477        };
1478        kind.validate(&metadata).unwrap_err();
1479
1480        // Add existing column with different name.
1481        let kind = AlterKind::AddColumns {
1482            columns: vec![AddColumn {
1483                column_metadata: ColumnMetadata {
1484                    column_schema: ColumnSchema::new(
1485                        "tag_1",
1486                        ConcreteDataType::string_datatype(),
1487                        true,
1488                    ),
1489                    semantic_type: SemanticType::Tag,
1490                    column_id: 2,
1491                },
1492                location: None,
1493            }],
1494        };
1495        kind.validate(&metadata).unwrap_err();
1496    }
1497
1498    #[test]
1499    fn test_add_existing_column_with_location() {
1500        let metadata = new_metadata();
1501        let kind = AlterKind::AddColumns {
1502            columns: vec![AddColumn {
1503                column_metadata: ColumnMetadata {
1504                    column_schema: ColumnSchema::new(
1505                        "tag_0",
1506                        ConcreteDataType::string_datatype(),
1507                        true,
1508                    ),
1509                    semantic_type: SemanticType::Tag,
1510                    column_id: 2,
1511                },
1512                location: Some(AddColumnLocation::First),
1513            }],
1514        };
1515        kind.validate(&metadata).unwrap_err();
1516    }
1517
1518    #[test]
1519    fn test_validate_drop_column() {
1520        let metadata = new_metadata();
1521        let kind = AlterKind::DropColumns {
1522            names: vec!["xxxx".to_string()],
1523        };
1524        kind.validate(&metadata).unwrap();
1525        assert!(!kind.need_alter(&metadata));
1526
1527        AlterKind::DropColumns {
1528            names: vec!["tag_0".to_string()],
1529        }
1530        .validate(&metadata)
1531        .unwrap_err();
1532
1533        let kind = AlterKind::DropColumns {
1534            names: vec!["field_0".to_string()],
1535        };
1536        kind.validate(&metadata).unwrap();
1537        assert!(kind.need_alter(&metadata));
1538    }
1539
1540    #[test]
1541    fn test_validate_modify_column_type() {
1542        let metadata = new_metadata();
1543        AlterKind::ModifyColumnTypes {
1544            columns: vec![ModifyColumnType {
1545                column_name: "xxxx".to_string(),
1546                target_type: ConcreteDataType::string_datatype(),
1547            }],
1548        }
1549        .validate(&metadata)
1550        .unwrap_err();
1551
1552        AlterKind::ModifyColumnTypes {
1553            columns: vec![ModifyColumnType {
1554                column_name: "field_1".to_string(),
1555                target_type: ConcreteDataType::date_datatype(),
1556            }],
1557        }
1558        .validate(&metadata)
1559        .unwrap_err();
1560
1561        AlterKind::ModifyColumnTypes {
1562            columns: vec![ModifyColumnType {
1563                column_name: "ts".to_string(),
1564                target_type: ConcreteDataType::date_datatype(),
1565            }],
1566        }
1567        .validate(&metadata)
1568        .unwrap_err();
1569
1570        AlterKind::ModifyColumnTypes {
1571            columns: vec![ModifyColumnType {
1572                column_name: "tag_0".to_string(),
1573                target_type: ConcreteDataType::date_datatype(),
1574            }],
1575        }
1576        .validate(&metadata)
1577        .unwrap_err();
1578
1579        let kind = AlterKind::ModifyColumnTypes {
1580            columns: vec![ModifyColumnType {
1581                column_name: "field_0".to_string(),
1582                target_type: ConcreteDataType::int32_datatype(),
1583            }],
1584        };
1585        kind.validate(&metadata).unwrap();
1586        assert!(kind.need_alter(&metadata));
1587    }
1588
1589    #[test]
1590    fn test_validate_add_columns() {
1591        let kind = AlterKind::AddColumns {
1592            columns: vec![
1593                AddColumn {
1594                    column_metadata: ColumnMetadata {
1595                        column_schema: ColumnSchema::new(
1596                            "tag_1",
1597                            ConcreteDataType::string_datatype(),
1598                            true,
1599                        ),
1600                        semantic_type: SemanticType::Tag,
1601                        column_id: 5,
1602                    },
1603                    location: None,
1604                },
1605                AddColumn {
1606                    column_metadata: ColumnMetadata {
1607                        column_schema: ColumnSchema::new(
1608                            "field_2",
1609                            ConcreteDataType::string_datatype(),
1610                            true,
1611                        ),
1612                        semantic_type: SemanticType::Field,
1613                        column_id: 6,
1614                    },
1615                    location: None,
1616                },
1617            ],
1618        };
1619        let request = RegionAlterRequest { kind };
1620        let mut metadata = new_metadata();
1621        metadata.schema_version = 1;
1622        request.validate(&metadata).unwrap();
1623    }
1624
1625    #[test]
1626    fn test_validate_create_region() {
1627        let column_metadatas = vec![
1628            ColumnMetadata {
1629                column_schema: ColumnSchema::new(
1630                    "ts",
1631                    ConcreteDataType::timestamp_millisecond_datatype(),
1632                    false,
1633                ),
1634                semantic_type: SemanticType::Timestamp,
1635                column_id: 1,
1636            },
1637            ColumnMetadata {
1638                column_schema: ColumnSchema::new(
1639                    "tag_0",
1640                    ConcreteDataType::string_datatype(),
1641                    true,
1642                ),
1643                semantic_type: SemanticType::Tag,
1644                column_id: 2,
1645            },
1646            ColumnMetadata {
1647                column_schema: ColumnSchema::new(
1648                    "field_0",
1649                    ConcreteDataType::string_datatype(),
1650                    true,
1651                ),
1652                semantic_type: SemanticType::Field,
1653                column_id: 3,
1654            },
1655        ];
1656        let create = RegionCreateRequest {
1657            engine: "mito".to_string(),
1658            column_metadatas,
1659            primary_key: vec![3, 4],
1660            options: HashMap::new(),
1661            region_dir: "path".to_string(),
1662        };
1663
1664        assert!(create.validate().is_err());
1665    }
1666
1667    #[test]
1668    fn test_validate_modify_column_fulltext_options() {
1669        let kind = AlterKind::SetIndex {
1670            options: ApiSetIndexOptions::Fulltext {
1671                column_name: "tag_0".to_string(),
1672                options: FulltextOptions {
1673                    enable: true,
1674                    analyzer: FulltextAnalyzer::Chinese,
1675                    case_sensitive: false,
1676                    backend: FulltextBackend::Bloom,
1677                },
1678            },
1679        };
1680        let request = RegionAlterRequest { kind };
1681        let mut metadata = new_metadata();
1682        metadata.schema_version = 1;
1683        request.validate(&metadata).unwrap();
1684
1685        let kind = AlterKind::UnsetIndex {
1686            options: ApiUnsetIndexOptions::Fulltext {
1687                column_name: "tag_0".to_string(),
1688            },
1689        };
1690        let request = RegionAlterRequest { kind };
1691        let mut metadata = new_metadata();
1692        metadata.schema_version = 1;
1693        request.validate(&metadata).unwrap();
1694    }
1695}