mito2/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Worker requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22    ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23};
24use api::v1::column_def::options_from_column_schema;
25use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
26use common_telemetry::info;
27use datatypes::prelude::DataType;
28use partition::expr::PartitionExpr;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{
37    MitoCopyRegionFromResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
38};
39use store_api::region_request::{
40    AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest,
41    RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest,
42    RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest,
43    RegionOpenRequest, RegionRequest, RegionTruncateRequest, StagingPartitionDirective,
44};
45use store_api::storage::{FileId, RegionId};
46use tokio::sync::oneshot::{self, Receiver, Sender};
47
48use crate::error::{
49    CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
50    FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, MissingPartitionExprSnafu,
51    Result, UnexpectedSnafu,
52};
53use crate::flush::FlushReason;
54use crate::manifest::action::{RegionEdit, TruncateKind};
55use crate::memtable::MemtableId;
56use crate::memtable::bulk::part::BulkPart;
57use crate::metrics::COMPACTION_ELAPSED_TOTAL;
58use crate::region::options::RegionOptions;
59use crate::sst::file::FileMeta;
60use crate::sst::index::IndexBuildType;
61use crate::wal::EntryId;
62use crate::wal::entry_distributor::WalEntryReceiver;
63
/// Request to write a region.
///
/// [`WriteRequest::new`] validates `rows` and derives `name_to_index` and
/// `has_null` from them.
#[derive(Debug)]
pub struct WriteRequest {
    /// Region to write.
    pub region_id: RegionId,
    /// Type of the write request.
    pub op_type: OpType,
    /// Rows to write.
    pub rows: Rows,
    /// Maps a column name to the column's index in the schema of `rows`.
    pub name_to_index: HashMap<String, usize>,
    /// Whether each column contains at least one null value, indexed by the
    /// column's position in the schema of `rows`.
    pub has_null: Vec<bool>,
    /// Write hint.
    pub hint: Option<WriteHint>,
    /// Region metadata at the time this request was created.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
    /// Partition expression version for the region.
    pub partition_expr_version: Option<u64>,
}
84
85impl WriteRequest {
86    /// Creates a new request.
87    ///
88    /// Returns `Err` if `rows` are invalid.
89    pub fn new(
90        region_id: RegionId,
91        op_type: OpType,
92        rows: Rows,
93        region_metadata: Option<RegionMetadataRef>,
94    ) -> Result<WriteRequest> {
95        let mut name_to_index = HashMap::with_capacity(rows.schema.len());
96        for (index, column) in rows.schema.iter().enumerate() {
97            ensure!(
98                name_to_index
99                    .insert(column.column_name.clone(), index)
100                    .is_none(),
101                InvalidRequestSnafu {
102                    region_id,
103                    reason: format!("duplicate column {}", column.column_name),
104                }
105            );
106        }
107
108        let mut has_null = vec![false; rows.schema.len()];
109        for row in &rows.rows {
110            ensure!(
111                row.values.len() == rows.schema.len(),
112                InvalidRequestSnafu {
113                    region_id,
114                    reason: format!(
115                        "row has {} columns but schema has {}",
116                        row.values.len(),
117                        rows.schema.len()
118                    ),
119                }
120            );
121
122            for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
123                validate_proto_value(region_id, value, column_schema)?;
124
125                if value.value_data.is_none() {
126                    has_null[i] = true;
127                }
128            }
129        }
130
131        Ok(WriteRequest {
132            region_id,
133            op_type,
134            rows,
135            name_to_index,
136            has_null,
137            hint: None,
138            region_metadata,
139            partition_expr_version: None,
140        })
141    }
142
143    /// Sets the write hint.
144    pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
145        self.hint = hint;
146        self
147    }
148
149    pub fn with_partition_expr_version(mut self, partition_expr_version: Option<u64>) -> Self {
150        self.partition_expr_version = partition_expr_version;
151        self
152    }
153
154    /// Returns the encoding hint.
155    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
156        infer_primary_key_encoding_from_hint(self.hint.as_ref())
157    }
158
159    /// Returns estimated size of the request.
160    pub(crate) fn estimated_size(&self) -> usize {
161        let row_size = self
162            .rows
163            .rows
164            .first()
165            .map(|row| row.encoded_len())
166            .unwrap_or(0);
167        row_size * self.rows.rows.len()
168    }
169
170    /// Gets column index by name.
171    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
172        self.name_to_index.get(name).copied()
173    }
174
    /// Checks schema of rows is compatible with schema of the region.
    ///
    /// For every region column present in the rows, this validates the data type,
    /// the semantic type and nullability. For every region column absent from the
    /// rows, it delegates to `check_missing_column`. Columns in the rows that the
    /// region does not know are rejected.
    ///
    /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
    /// error.
    ///
    /// # Errors
    ///
    /// Returns an invalid request error on the first incompatibility found, or
    /// the `FillDefault` error when the only problem is missing columns that
    /// passed `check_missing_column`.
    pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
        debug_assert_eq!(self.region_id, metadata.region_id);

        let region_id = self.region_id;
        // Index all columns in rows. Entries are removed as region columns are
        // matched, so whatever remains afterwards is unknown to the region.
        let mut rows_columns: HashMap<_, _> = self
            .rows
            .schema
            .iter()
            .map(|column| (&column.column_name, column))
            .collect();

        let mut need_fill_default = false;
        // Checks all columns in this region.
        for column in &metadata.column_metadatas {
            if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
                // Check data type.
                ensure!(
                    is_column_type_value_eq(
                        input_col.datatype,
                        input_col.datatype_extension.clone(),
                        &column.column_schema.data_type
                    ),
                    InvalidRequestSnafu {
                        region_id,
                        reason: format!(
                            "column {} expect type {:?}, given: {}({})",
                            column.column_schema.name,
                            column.column_schema.data_type,
                            ColumnDataType::try_from(input_col.datatype)
                                .map(|v| v.as_str_name())
                                .unwrap_or("Unknown"),
                            input_col.datatype,
                        )
                    }
                );

                // Check semantic type.
                ensure!(
                    is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
                    InvalidRequestSnafu {
                        region_id,
                        reason: format!(
                            "column {} has semantic type {:?}, given: {}({})",
                            column.column_schema.name,
                            column.semantic_type,
                            api::v1::SemanticType::try_from(input_col.semantic_type)
                                .map(|v| v.as_str_name())
                                .unwrap_or("Unknown"),
                            input_col.semantic_type
                        ),
                    }
                );

                // Check nullable.
                // NOTE: `name_to_index[..]` panics on a missing key. The column is
                // known to be in `rows.schema` (that is what `rows_columns` was
                // built from), so this relies on `name_to_index` and `has_null`
                // covering every column of `rows.schema`.
                let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
                ensure!(
                    !has_null || column.column_schema.is_nullable(),
                    InvalidRequestSnafu {
                        region_id,
                        reason: format!(
                            "column {} is not null but input has null",
                            column.column_schema.name
                        ),
                    }
                );
            } else {
                // Rows don't have this column.
                self.check_missing_column(column)?;

                need_fill_default = true;
            }
        }

        // Checks all columns in rows exist in the region.
        if !rows_columns.is_empty() {
            let names: Vec<_> = rows_columns.into_keys().collect();
            return InvalidRequestSnafu {
                region_id,
                reason: format!("unknown columns: {:?}", names),
            }
            .fail();
        }

        // If we need to fill default values, return a special error.
        ensure!(!need_fill_default, FillDefaultSnafu { region_id });

        Ok(())
    }
269
270    /// Tries to fill missing columns.
271    ///
272    /// Currently, our protobuf format might be inefficient when we need to fill lots of null
273    /// values.
274    pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
275        debug_assert_eq!(self.region_id, metadata.region_id);
276
277        let mut columns_to_fill = vec![];
278        for column in &metadata.column_metadatas {
279            if !self.name_to_index.contains_key(&column.column_schema.name) {
280                columns_to_fill.push(column);
281            }
282        }
283        self.fill_columns(columns_to_fill)?;
284
285        Ok(())
286    }
287
288    /// Checks the schema and fill missing columns.
289    pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
290        if let Err(e) = self.check_schema(metadata) {
291            if e.is_fill_default() {
292                // TODO(yingwen): Add metrics for this case.
293                // We need to fill default value. The write request may be a request
294                // sent before changing the schema.
295                self.fill_missing_columns(metadata)?;
296            } else {
297                return Err(e);
298            }
299        }
300
301        Ok(())
302    }
303
304    /// Fills default value for specific `columns`.
305    fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
306        let mut default_values = Vec::with_capacity(columns.len());
307        let mut columns_to_fill = Vec::with_capacity(columns.len());
308        for column in columns {
309            let default_value = self.column_default_value(column)?;
310            if default_value.value_data.is_some() {
311                default_values.push(default_value);
312                columns_to_fill.push(column);
313            }
314        }
315
316        for row in &mut self.rows.rows {
317            row.values.extend(default_values.iter().cloned());
318        }
319
320        for column in columns_to_fill {
321            let (datatype, datatype_ext) =
322                ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
323                    .with_context(|_| ConvertColumnDataTypeSnafu {
324                        reason: format!(
325                            "no protobuf type for column {} ({:?})",
326                            column.column_schema.name, column.column_schema.data_type
327                        ),
328                    })?
329                    .to_parts();
330            self.rows.schema.push(ColumnSchema {
331                column_name: column.column_schema.name.clone(),
332                datatype: datatype as i32,
333                semantic_type: column.semantic_type as i32,
334                datatype_extension: datatype_ext,
335                options: options_from_column_schema(&column.column_schema),
336            });
337        }
338
339        Ok(())
340    }
341
342    /// Checks whether we should allow a row doesn't provide this column.
343    fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
344        if self.op_type == OpType::Delete {
345            if column.semantic_type == SemanticType::Field {
346                // For delete request, all tags and timestamp is required. We don't fill default
347                // tag or timestamp while deleting rows.
348                return Ok(());
349            } else {
350                return InvalidRequestSnafu {
351                    region_id: self.region_id,
352                    reason: format!("delete requests need column {}", column.column_schema.name),
353                }
354                .fail();
355            }
356        }
357
358        // Not a delete request. Checks whether they have default value.
359        ensure!(
360            column.column_schema.is_nullable()
361                || column.column_schema.default_constraint().is_some(),
362            InvalidRequestSnafu {
363                region_id: self.region_id,
364                reason: format!("missing column {}", column.column_schema.name),
365            }
366        );
367
368        Ok(())
369    }
370
371    /// Returns the default value for specific column.
372    fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
373        let default_value = match self.op_type {
374            OpType::Delete => {
375                ensure!(
376                    column.semantic_type == SemanticType::Field,
377                    InvalidRequestSnafu {
378                        region_id: self.region_id,
379                        reason: format!(
380                            "delete requests need column {}",
381                            column.column_schema.name
382                        ),
383                    }
384                );
385
386                // For delete request, we need a default value for padding so we
387                // can delete a row even a field doesn't have a default value. So the
388                // value doesn't need to following the default value constraint of the
389                // column.
390                if column.column_schema.is_nullable() {
391                    datatypes::value::Value::Null
392                } else {
393                    column.column_schema.data_type.default_value()
394                }
395            }
396            OpType::Put => {
397                // For put requests, we use the default value from column schema.
398                if column.column_schema.is_default_impure() {
399                    UnexpectedSnafu {
400                        reason: format!(
401                            "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
402                            self.region_id,
403                            column.column_schema.name,
404                            column.column_schema.default_constraint(),
405                        ),
406                    }
407                    .fail()?
408                }
409                column
410                    .column_schema
411                    .create_default()
412                    .context(CreateDefaultSnafu {
413                        region_id: self.region_id,
414                        column: &column.column_schema.name,
415                    })?
416                    // This column doesn't have default value.
417                    .with_context(|| InvalidRequestSnafu {
418                        region_id: self.region_id,
419                        reason: format!(
420                            "column {} does not have default value",
421                            column.column_schema.name
422                        ),
423                    })?
424            }
425        };
426
427        // Convert default value into proto's value.
428        Ok(api::helper::to_grpc_value(default_value))
429    }
430}
431
432/// Validate proto value schema.
433pub(crate) fn validate_proto_value(
434    region_id: RegionId,
435    value: &Value,
436    column_schema: &ColumnSchema,
437) -> Result<()> {
438    if let Some(value_type) = proto_value_type(value) {
439        let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
440            InvalidRequestSnafu {
441                region_id,
442                reason: format!(
443                    "column {} has unknown type {}",
444                    column_schema.column_name, column_schema.datatype
445                ),
446            }
447            .build()
448        })?;
449        ensure!(
450            proto_value_type_match(column_type, value_type),
451            InvalidRequestSnafu {
452                region_id,
453                reason: format!(
454                    "value has type {:?}, but column {} has type {:?}({})",
455                    value_type, column_schema.column_name, column_type, column_schema.datatype,
456                ),
457            }
458        );
459    }
460
461    Ok(())
462}
463
464fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
465    match (column_type, value_type) {
466        (ct, vt) if ct == vt => true,
467        (ColumnDataType::Vector, ColumnDataType::Binary) => true,
468        (ColumnDataType::Json, ColumnDataType::Binary) => true,
469        _ => false,
470    }
471}
472
473/// Oneshot output result sender.
474#[derive(Debug)]
475pub struct OutputTx(Sender<Result<AffectedRows>>);
476
477impl OutputTx {
478    /// Creates a new output sender.
479    pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
480        OutputTx(sender)
481    }
482
483    /// Sends the `result`.
484    pub(crate) fn send(self, result: Result<AffectedRows>) {
485        // Ignores send result.
486        let _ = self.0.send(result);
487    }
488}
489
490/// Optional output result sender.
491#[derive(Debug)]
492pub(crate) struct OptionOutputTx(Option<OutputTx>);
493
494impl OptionOutputTx {
495    /// Creates a sender.
496    pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
497        OptionOutputTx(sender)
498    }
499
500    /// Creates an empty sender.
501    pub(crate) fn none() -> OptionOutputTx {
502        OptionOutputTx(None)
503    }
504
505    /// Sends the `result` and consumes the inner sender.
506    pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
507        if let Some(sender) = self.0.take() {
508            sender.send(result);
509        }
510    }
511
512    /// Sends the `result` and consumes the sender.
513    pub(crate) fn send(mut self, result: Result<AffectedRows>) {
514        if let Some(sender) = self.0.take() {
515            sender.send(result);
516        }
517    }
518
519    /// Takes the inner sender.
520    pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
521        self.0.take()
522    }
523}
524
525impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
526    fn from(sender: Sender<Result<AffectedRows>>) -> Self {
527        Self::new(Some(OutputTx::new(sender)))
528    }
529}
530
531impl OnFailure for OptionOutputTx {
532    fn on_failure(&mut self, err: Error) {
533        self.send_mut(Err(err));
534    }
535}
536
/// Callback on failure.
///
/// Implementors receive the error by value and decide how to report it.
pub(crate) trait OnFailure {
    /// Handles `err` on failure.
    fn on_failure(&mut self, err: Error);
}
542
/// Sender and write request.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// The write request whose result is reported through `sender`.
    pub(crate) request: WriteRequest,
}
550
/// Sender and bulk request.
pub(crate) struct SenderBulkRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Id of the region the request targets.
    pub(crate) region_id: RegionId,
    /// The bulk part carried by this request.
    pub(crate) request: BulkPart,
    /// Region metadata associated with this request.
    // NOTE(review): presumably the metadata the bulk part was built against;
    // confirm with the code that constructs this struct.
    pub(crate) region_metadata: RegionMetadataRef,
    /// Partition expression version for the region.
    pub(crate) partition_expr_version: Option<u64>,
}
558
/// Request sent to a worker, together with the instant it was created.
#[derive(Debug)]
pub(crate) struct WorkerRequestWithTime {
    /// The wrapped worker request.
    pub(crate) request: WorkerRequest,
    /// Instant at which this wrapper was created.
    pub(crate) created_at: Instant,
}
565
566impl WorkerRequestWithTime {
567    pub(crate) fn new(request: WorkerRequest) -> Self {
568        Self {
569            request,
570            created_at: Instant::now(),
571        }
572    }
573}
574
/// Request sent to a worker.
#[derive(Debug)]
pub(crate) enum WorkerRequest {
    /// Write to a region.
    Write(SenderWriteRequest),

    /// Ddl request to a region.
    Ddl(SenderDdlRequest),

    /// Notifications from internal background jobs.
    Background {
        /// Id of the region to send.
        region_id: RegionId,
        /// Internal notification.
        notify: BackgroundNotify,
    },

    /// The internal commands.
    SetRegionRoleStateGracefully {
        /// Id of the region to send.
        region_id: RegionId,
        /// The [SettableRegionRoleState].
        region_role_state: SettableRegionRoleState,
        /// The sender of [SetRegionRoleStateResponse].
        sender: Sender<SetRegionRoleStateResponse>,
    },

    /// Notify a worker to stop.
    Stop,

    /// Use [RegionEdit] to edit a region directly.
    EditRegion(RegionEditRequest),

    /// Keep the manifest of a region up to date.
    SyncRegion(RegionSyncRequest),

    /// Bulk inserts request and region metadata.
    BulkInserts {
        /// Region metadata supplied with the request, if any.
        metadata: Option<RegionMetadataRef>,
        /// The bulk inserts request.
        request: RegionBulkInsertsRequest,
        /// Result sender.
        sender: OptionOutputTx,
    },

    /// Remap manifests request.
    RemapManifests(RemapManifestsRequest),

    /// Copy region from request.
    CopyRegionFrom(CopyRegionFromRequest),
}
624
625impl WorkerRequest {
626    /// Creates a new open region request.
627    pub(crate) fn new_open_region_request(
628        region_id: RegionId,
629        request: RegionOpenRequest,
630        entry_receiver: Option<WalEntryReceiver>,
631    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
632        let (sender, receiver) = oneshot::channel();
633
634        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
635            region_id,
636            sender: sender.into(),
637            request: DdlRequest::Open((request, entry_receiver)),
638        });
639
640        (worker_request, receiver)
641    }
642
643    /// Creates a new catchup region request.
644    pub(crate) fn new_catchup_region_request(
645        region_id: RegionId,
646        request: RegionCatchupRequest,
647        entry_receiver: Option<WalEntryReceiver>,
648    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
649        let (sender, receiver) = oneshot::channel();
650        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
651            region_id,
652            sender: sender.into(),
653            request: DdlRequest::Catchup((request, entry_receiver)),
654        });
655        (worker_request, receiver)
656    }
657
658    /// Converts request from a [RegionRequest].
659    pub(crate) fn try_from_region_request(
660        region_id: RegionId,
661        value: RegionRequest,
662        region_metadata: Option<RegionMetadataRef>,
663    ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
664        let (sender, receiver) = oneshot::channel();
665        let worker_request = match value {
666            RegionRequest::Put(v) => {
667                let mut write_request =
668                    WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
669                        .with_hint(v.hint)
670                        .with_partition_expr_version(v.partition_expr_version);
671                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
672                    && let Some(region_metadata) = &region_metadata
673                {
674                    write_request.maybe_fill_missing_columns(region_metadata)?;
675                }
676                WorkerRequest::Write(SenderWriteRequest {
677                    sender: sender.into(),
678                    request: write_request,
679                })
680            }
681            RegionRequest::Delete(v) => {
682                let mut write_request =
683                    WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?
684                        .with_hint(v.hint)
685                        .with_partition_expr_version(v.partition_expr_version);
686                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
687                    && let Some(region_metadata) = &region_metadata
688                {
689                    write_request.maybe_fill_missing_columns(region_metadata)?;
690                }
691                WorkerRequest::Write(SenderWriteRequest {
692                    sender: sender.into(),
693                    request: write_request,
694                })
695            }
696            RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
697                region_id,
698                sender: sender.into(),
699                request: DdlRequest::Create(v),
700            }),
701            RegionRequest::Drop(v) => WorkerRequest::Ddl(SenderDdlRequest {
702                region_id,
703                sender: sender.into(),
704                request: DdlRequest::Drop(v),
705            }),
706            RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
707                region_id,
708                sender: sender.into(),
709                request: DdlRequest::Open((v, None)),
710            }),
711            RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
712                region_id,
713                sender: sender.into(),
714                request: DdlRequest::Close(v),
715            }),
716            RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
717                region_id,
718                sender: sender.into(),
719                request: DdlRequest::Alter(v),
720            }),
721            RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
722                region_id,
723                sender: sender.into(),
724                request: DdlRequest::Flush(v),
725            }),
726            RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
727                region_id,
728                sender: sender.into(),
729                request: DdlRequest::Compact(v),
730            }),
731            RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest {
732                region_id,
733                sender: sender.into(),
734                request: DdlRequest::BuildIndex(v),
735            }),
736            RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
737                region_id,
738                sender: sender.into(),
739                request: DdlRequest::Truncate(v),
740            }),
741            RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
742                region_id,
743                sender: sender.into(),
744                request: DdlRequest::Catchup((v, None)),
745            }),
746            RegionRequest::EnterStaging(v) => WorkerRequest::Ddl(SenderDdlRequest {
747                region_id,
748                sender: sender.into(),
749                request: DdlRequest::EnterStaging(v),
750            }),
751            RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
752                metadata: region_metadata,
753                sender: sender.into(),
754                request: region_bulk_inserts_request,
755            },
756            RegionRequest::ApplyStagingManifest(v) => WorkerRequest::Ddl(SenderDdlRequest {
757                region_id,
758                sender: sender.into(),
759                request: DdlRequest::ApplyStagingManifest(v),
760            }),
761        };
762
763        Ok((worker_request, receiver))
764    }
765
766    pub(crate) fn new_set_readonly_gracefully(
767        region_id: RegionId,
768        region_role_state: SettableRegionRoleState,
769    ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
770        let (sender, receiver) = oneshot::channel();
771
772        (
773            WorkerRequest::SetRegionRoleStateGracefully {
774                region_id,
775                region_role_state,
776                sender,
777            },
778            receiver,
779        )
780    }
781
782    pub(crate) fn new_sync_region_request(
783        region_id: RegionId,
784        manifest_version: ManifestVersion,
785    ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
786        let (sender, receiver) = oneshot::channel();
787        (
788            WorkerRequest::SyncRegion(RegionSyncRequest {
789                region_id,
790                manifest_version,
791                sender,
792            }),
793            receiver,
794        )
795    }
796
797    /// Converts [RemapManifestsRequest] from a [RemapManifestsRequest](store_api::region_engine::RemapManifestsRequest).
798    ///
799    /// # Errors
800    ///
801    /// Returns an error if the partition expression is invalid or missing.
802    /// Returns an error if the new partition expressions are not found for some regions.
803    #[allow(clippy::type_complexity)]
804    pub(crate) fn try_from_remap_manifests_request(
805        store_api::region_engine::RemapManifestsRequest {
806            region_id,
807            input_regions,
808            region_mapping,
809            new_partition_exprs,
810        }: store_api::region_engine::RemapManifestsRequest,
811    ) -> Result<(WorkerRequest, Receiver<Result<HashMap<RegionId, String>>>)> {
812        let (sender, receiver) = oneshot::channel();
813        let new_partition_exprs = new_partition_exprs
814            .into_iter()
815            .map(|(k, v)| {
816                Ok((
817                    k,
818                    PartitionExpr::from_json_str(&v)
819                        .context(InvalidPartitionExprSnafu { expr: v })?
820                        .context(MissingPartitionExprSnafu { region_id: k })?,
821                ))
822            })
823            .collect::<Result<HashMap<_, _>>>()?;
824
825        let request = RemapManifestsRequest {
826            region_id,
827            input_regions,
828            region_mapping,
829            new_partition_exprs,
830            sender,
831        };
832
833        Ok((WorkerRequest::RemapManifests(request), receiver))
834    }
835
836    /// Converts [CopyRegionFromRequest] from a [MitoCopyRegionFromRequest](store_api::region_engine::MitoCopyRegionFromRequest).
837    pub(crate) fn try_from_copy_region_from_request(
838        region_id: RegionId,
839        store_api::region_engine::MitoCopyRegionFromRequest {
840            source_region_id,
841            parallelism,
842        }: store_api::region_engine::MitoCopyRegionFromRequest,
843    ) -> Result<(WorkerRequest, Receiver<Result<MitoCopyRegionFromResponse>>)> {
844        let (sender, receiver) = oneshot::channel();
845        let request = CopyRegionFromRequest {
846            region_id,
847            source_region_id,
848            parallelism,
849            sender,
850        };
851        Ok((WorkerRequest::CopyRegionFrom(request), receiver))
852    }
853}
854
855/// DDL request to a region.
856#[derive(Debug)]
857pub(crate) enum DdlRequest {
858    Create(RegionCreateRequest),
859    Drop(RegionDropRequest),
860    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
861    Close(RegionCloseRequest),
862    Alter(RegionAlterRequest),
863    Flush(RegionFlushRequest),
864    Compact(RegionCompactRequest),
865    BuildIndex(RegionBuildIndexRequest),
866    Truncate(RegionTruncateRequest),
867    Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
868    EnterStaging(EnterStagingRequest),
869    ApplyStagingManifest(ApplyStagingManifestRequest),
870}
871
872/// Sender and Ddl request.
873#[derive(Debug)]
874pub(crate) struct SenderDdlRequest {
875    /// Region id of the request.
876    pub(crate) region_id: RegionId,
877    /// Result sender.
878    pub(crate) sender: OptionOutputTx,
879    /// Ddl request.
880    pub(crate) request: DdlRequest,
881}
882
883/// Notification from a background job.
884#[derive(Debug)]
885pub(crate) enum BackgroundNotify {
886    /// Flush has finished.
887    FlushFinished(FlushFinished),
888    /// Flush has failed.
889    FlushFailed(FlushFailed),
890    /// Index build has finished.
891    IndexBuildFinished(IndexBuildFinished),
892    /// Index build has been stopped (aborted or succeeded).
893    IndexBuildStopped(IndexBuildStopped),
894    /// Index build has failed.
895    IndexBuildFailed(IndexBuildFailed),
896    /// Compaction has finished.
897    CompactionFinished(CompactionFinished),
898    /// Compaction has failed.
899    CompactionFailed(CompactionFailed),
900    /// Truncate result.
901    Truncate(TruncateResult),
902    /// Region change result.
903    RegionChange(RegionChangeResult),
904    /// Region edit result.
905    RegionEdit(RegionEditResult),
906    /// Enter staging result.
907    EnterStaging(EnterStagingResult),
908    /// Copy region result.
909    CopyRegionFromFinished(CopyRegionFromFinished),
910}
911
912/// Notifies a flush job is finished.
913#[derive(Debug)]
914pub(crate) struct FlushFinished {
915    /// Region id.
916    pub(crate) region_id: RegionId,
917    /// Entry id of flushed data.
918    pub(crate) flushed_entry_id: EntryId,
919    /// Flush result senders.
920    pub(crate) senders: Vec<OutputTx>,
921    /// Flush timer.
922    pub(crate) _timer: HistogramTimer,
923    /// Region edit to apply.
924    pub(crate) edit: RegionEdit,
925    /// Memtables to remove.
926    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
927    /// Whether the region is in staging mode.
928    pub(crate) is_staging: bool,
929    /// Reason for flush.
930    pub(crate) flush_reason: FlushReason,
931}
932
933impl FlushFinished {
934    /// Marks the flush job as successful and observes the timer.
935    pub(crate) fn on_success(self) {
936        for sender in self.senders {
937            sender.send(Ok(0));
938        }
939    }
940}
941
942impl OnFailure for FlushFinished {
943    fn on_failure(&mut self, err: Error) {
944        let err = Arc::new(err);
945        for sender in self.senders.drain(..) {
946            sender.send(Err(err.clone()).context(FlushRegionSnafu {
947                region_id: self.region_id,
948            }));
949        }
950    }
951}
952
953/// Notifies a flush job is failed.
954#[derive(Debug)]
955pub(crate) struct FlushFailed {
956    /// The error source of the failure.
957    pub(crate) err: Arc<Error>,
958}
959
960#[derive(Debug)]
961pub(crate) struct IndexBuildFinished {
962    #[allow(dead_code)]
963    pub(crate) region_id: RegionId,
964    pub(crate) edit: RegionEdit,
965}
966
967/// Notifies an index build job has been stopped.
968#[derive(Debug)]
969pub(crate) struct IndexBuildStopped {
970    #[allow(dead_code)]
971    pub(crate) region_id: RegionId,
972    pub(crate) file_id: FileId,
973}
974
975/// Notifies an index build job has failed.
976#[derive(Debug)]
977pub(crate) struct IndexBuildFailed {
978    pub(crate) err: Arc<Error>,
979}
980
981/// Notifies a compaction job has finished.
982#[derive(Debug)]
983pub(crate) struct CompactionFinished {
984    /// Region id.
985    pub(crate) region_id: RegionId,
986    /// Compaction result senders.
987    pub(crate) senders: Vec<OutputTx>,
988    /// Start time of compaction task.
989    pub(crate) start_time: Instant,
990    /// Region edit to apply.
991    pub(crate) edit: RegionEdit,
992}
993
994impl CompactionFinished {
995    pub fn on_success(self) {
996        // only update compaction time on success
997        COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
998
999        for sender in self.senders {
1000            sender.send(Ok(0));
1001        }
1002        info!("Successfully compacted region: {}", self.region_id);
1003    }
1004}
1005
1006impl OnFailure for CompactionFinished {
1007    /// Compaction succeeded but failed to update manifest or region's already been dropped.
1008    fn on_failure(&mut self, err: Error) {
1009        let err = Arc::new(err);
1010        for sender in self.senders.drain(..) {
1011            sender.send(Err(err.clone()).context(CompactRegionSnafu {
1012                region_id: self.region_id,
1013            }));
1014        }
1015    }
1016}
1017
1018/// A failing compaction result.
1019#[derive(Debug)]
1020pub(crate) struct CompactionFailed {
1021    pub(crate) region_id: RegionId,
1022    /// The error source of the failure.
1023    pub(crate) err: Arc<Error>,
1024}
1025
1026/// Notifies the truncate result of a region.
1027#[derive(Debug)]
1028pub(crate) struct TruncateResult {
1029    /// Region id.
1030    pub(crate) region_id: RegionId,
1031    /// Result sender.
1032    pub(crate) sender: OptionOutputTx,
1033    /// Truncate result.
1034    pub(crate) result: Result<()>,
1035    pub(crate) kind: TruncateKind,
1036}
1037
1038/// Notifies the region the result of writing region change action.
1039#[derive(Debug)]
1040pub(crate) struct RegionChangeResult {
1041    /// Region id.
1042    pub(crate) region_id: RegionId,
1043    /// The new region metadata to apply.
1044    pub(crate) new_meta: RegionMetadataRef,
1045    /// Result sender.
1046    pub(crate) sender: OptionOutputTx,
1047    /// Result from the manifest manager.
1048    pub(crate) result: Result<()>,
1049    /// Used for index build in schema change.
1050    pub(crate) need_index: bool,
1051    /// New options for the region.
1052    pub(crate) new_options: Option<RegionOptions>,
1053}
1054
1055/// Notifies the region the result of entering staging.
1056#[derive(Debug)]
1057pub(crate) struct EnterStagingResult {
1058    /// Region id.
1059    pub(crate) region_id: RegionId,
1060    /// The new staging partition directive to apply.
1061    pub(crate) partition_directive: StagingPartitionDirective,
1062    /// Result sender.
1063    pub(crate) sender: OptionOutputTx,
1064    /// Result from the manifest manager.
1065    pub(crate) result: Result<()>,
1066}
1067
1068#[derive(Debug)]
1069pub(crate) struct CopyRegionFromFinished {
1070    /// Region id.
1071    pub(crate) region_id: RegionId,
1072    /// Region edit to apply.
1073    pub(crate) edit: RegionEdit,
1074    /// Result sender.
1075    pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
1076}
1077
1078/// Request to edit a region directly.
1079#[derive(Debug)]
1080pub(crate) struct RegionEditRequest {
1081    pub(crate) region_id: RegionId,
1082    pub(crate) edit: RegionEdit,
1083    /// The sender to notify the result to the region engine.
1084    pub(crate) tx: Sender<Result<()>>,
1085}
1086
1087/// Notifies the regin the result of editing region.
1088#[derive(Debug)]
1089pub(crate) struct RegionEditResult {
1090    /// Region id.
1091    pub(crate) region_id: RegionId,
1092    /// Result sender.
1093    pub(crate) sender: Sender<Result<()>>,
1094    /// Region edit to apply.
1095    pub(crate) edit: RegionEdit,
1096    /// Result from the manifest manager.
1097    pub(crate) result: Result<()>,
1098    /// Whether region state need to be set to Writable after handling this request.
1099    pub(crate) update_region_state: bool,
1100    /// The region is in staging mode before handling this request.
1101    pub(crate) is_staging: bool,
1102}
1103
1104#[derive(Debug)]
1105pub(crate) struct BuildIndexRequest {
1106    pub(crate) region_id: RegionId,
1107    pub(crate) build_type: IndexBuildType,
1108    /// files need to build index, empty means all.
1109    pub(crate) file_metas: Vec<FileMeta>,
1110}
1111
1112#[derive(Debug)]
1113pub(crate) struct RegionSyncRequest {
1114    pub(crate) region_id: RegionId,
1115    pub(crate) manifest_version: ManifestVersion,
1116    /// Returns the latest manifest version and a boolean indicating whether new maniefst is installed.
1117    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
1118}
1119
1120#[derive(Debug)]
1121pub(crate) struct RemapManifestsRequest {
1122    /// The [`RegionId`] of a staging region used to obtain table directory and storage configuration for the remap operation.
1123    pub(crate) region_id: RegionId,
1124    /// Regions to remap manifests from.
1125    pub(crate) input_regions: Vec<RegionId>,
1126    /// For each old region, which new regions should receive its files
1127    pub(crate) region_mapping: HashMap<RegionId, Vec<RegionId>>,
1128    /// New partition expressions for the new regions.
1129    pub(crate) new_partition_exprs: HashMap<RegionId, PartitionExpr>,
1130    /// Sender for the result of the remap operation.
1131    ///
1132    /// The result is a map from region IDs to their corresponding staging manifest paths.
1133    pub(crate) sender: Sender<Result<HashMap<RegionId, String>>>,
1134}
1135
1136#[derive(Debug)]
1137pub(crate) struct CopyRegionFromRequest {
1138    /// The [`RegionId`] of the target region.
1139    pub(crate) region_id: RegionId,
1140    /// The [`RegionId`] of the source region.
1141    pub(crate) source_region_id: RegionId,
1142    /// The parallelism of the copy operation.
1143    pub(crate) parallelism: usize,
1144    /// Result sender.
1145    pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
1146}
1147
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{Row, SemanticType};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnDefaultConstraint;
    use mito_codec::test_util::i64_value;
    use store_api::metadata::RegionMetadataBuilder;

    use super::*;
    use crate::error::Error;
    use crate::test_util::ts_ms_value;

    /// Builds a request column schema from a name, data type and semantic type.
    fn new_column_schema(
        name: &str,
        data_type: ColumnDataType,
        semantic_type: SemanticType,
    ) -> ColumnSchema {
        ColumnSchema {
            semantic_type: semantic_type as i32,
            datatype: data_type as i32,
            column_name: name.to_string(),
            ..Default::default()
        }
    }

    /// Request schema of the millisecond timestamp column `ts`.
    fn ts_column() -> ColumnSchema {
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        )
    }

    /// Request schema of an int64 tag column called `name`.
    fn int64_tag(name: &str) -> ColumnSchema {
        new_column_schema(name, ColumnDataType::Int64, SemanticType::Tag)
    }

    /// Builds [Rows] that hold exactly one row with the given values.
    fn single_row(schema: Vec<ColumnSchema>, values: Vec<Value>) -> Rows {
        Rows {
            schema,
            rows: vec![Row { values }],
        }
    }

    /// Creates a write request for region (1, 1) without region metadata.
    ///
    /// Panics if the request cannot be created.
    fn new_request(op_type: OpType, rows: Rows) -> WriteRequest {
        WriteRequest::new(RegionId::new(1, 1), op_type, rows, None).unwrap()
    }

    /// Returns the error of creating a put request for region (1, 1).
    ///
    /// Panics if the request is created successfully.
    fn new_put_request_err(rows: Rows) -> Error {
        WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err()
    }

    /// Asserts `err` is an invalid request error whose reason equals `expect`.
    fn check_invalid_request(err: &Error, expect: &str) {
        match err {
            Error::InvalidRequest { reason, .. } => assert_eq!(reason, expect),
            _ => panic!("Unexpected error {err}"),
        }
    }

    #[test]
    fn test_write_request_duplicate_column() {
        let rows = Rows {
            schema: vec![int64_tag("c0"), int64_tag("c0")],
            rows: vec![],
        };

        let err = new_put_request_err(rows);
        check_invalid_request(&err, "duplicate column c0");
    }

    #[test]
    fn test_valid_write_request() {
        let rows = single_row(
            vec![int64_tag("c0"), int64_tag("c1")],
            vec![i64_value(1), i64_value(2)],
        );

        let request = new_request(OpType::Put, rows);
        assert_eq!(Some(0), request.column_index_by_name("c0"));
        assert_eq!(Some(1), request.column_index_by_name("c1"));
        assert_eq!(None, request.column_index_by_name("c2"));
    }

    #[test]
    fn test_write_request_column_num() {
        // Two columns in the schema but three values in the row.
        let rows = single_row(
            vec![int64_tag("c0"), int64_tag("c1")],
            vec![i64_value(1), i64_value(2), i64_value(3)],
        );

        let err = new_put_request_err(rows);
        check_invalid_request(&err, "row has 3 columns but schema has 2");
    }

    /// Column schema of the non-nullable millisecond timestamp column `ts`.
    fn ts_schema() -> datatypes::schema::ColumnSchema {
        datatypes::schema::ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
    }

    /// Column schema of an int64 column.
    fn int64_schema(name: &str, nullable: bool) -> datatypes::schema::ColumnSchema {
        datatypes::schema::ColumnSchema::new(name, ConcreteDataType::int64_datatype(), nullable)
    }

    /// Builder for region (1, 1) with the time index built from `ts` (column id 1)
    /// and a nullable int64 tag `k0` (column id 2) as the primary key.
    fn builder_with_ts_schema(ts: datatypes::schema::ColumnSchema) -> RegionMetadataBuilder {
        let ts_column = ColumnMetadata {
            column_schema: ts,
            semantic_type: SemanticType::Timestamp,
            column_id: 1,
        };
        let k0_column = ColumnMetadata {
            column_schema: int64_schema("k0", true),
            semantic_type: SemanticType::Tag,
            column_id: 2,
        };

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ts_column)
            .push_column_metadata(k0_column)
            .primary_key(vec![2]);
        builder
    }

    /// Builder with columns `ts` and `k0`, where `ts` has no default value.
    fn builder_with_ts_tag() -> RegionMetadataBuilder {
        builder_with_ts_schema(ts_schema())
    }

    /// Metadata with columns `ts` and `k0` only.
    fn new_region_metadata() -> RegionMetadata {
        builder_with_ts_tag().build().unwrap()
    }

    /// Metadata of the nullable int64 field `f0` (column id 3).
    fn f0_metadata() -> ColumnMetadata {
        ColumnMetadata {
            column_schema: int64_schema("f0", true),
            semantic_type: SemanticType::Field,
            column_id: 3,
        }
    }

    /// Metadata of field `f1` (column id 4) using the given column schema.
    fn f1_metadata(column_schema: datatypes::schema::ColumnSchema) -> ColumnMetadata {
        ColumnMetadata {
            column_schema,
            semantic_type: SemanticType::Field,
            column_id: 4,
        }
    }

    /// Metadata with `ts`, `k0`, a nullable field `f0` and a non-nullable field `f1`
    /// whose default value is 100.
    fn region_metadata_two_fields() -> RegionMetadata {
        let f1_schema = int64_schema("f1", false)
            .with_default_constraint(Some(ColumnDefaultConstraint::Value(
                datatypes::value::Value::Int64(100),
            )))
            .unwrap();

        let mut builder = builder_with_ts_tag();
        builder
            .push_column_metadata(f0_metadata())
            // Column is not nullable.
            .push_column_metadata(f1_metadata(f1_schema));
        builder.build().unwrap()
    }

    #[test]
    fn test_check_schema() {
        let metadata = new_region_metadata();
        let rows = single_row(
            vec![ts_column(), int64_tag("k0")],
            vec![ts_ms_value(1), i64_value(2)],
        );

        let request = new_request(OpType::Put, rows);
        request.check_schema(&metadata).unwrap();
    }

    #[test]
    fn test_column_type() {
        let metadata = new_region_metadata();
        // The time index is declared as int64 instead of a millisecond timestamp.
        let rows = single_row(
            vec![
                new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
                int64_tag("k0"),
            ],
            vec![i64_value(1), i64_value(2)],
        );

        let request = new_request(OpType::Put, rows);
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(
            &err,
            "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
        );
    }

    #[test]
    fn test_semantic_type() {
        let metadata = new_region_metadata();
        // The time index is declared as a tag.
        let rows = single_row(
            vec![
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Tag,
                ),
                int64_tag("k0"),
            ],
            vec![ts_ms_value(1), i64_value(2)],
        );

        let request = new_request(OpType::Put, rows);
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
    }

    #[test]
    fn test_column_nullable() {
        let metadata = new_region_metadata();
        // A null value is given for the non-nullable time index.
        let rows = single_row(
            vec![ts_column(), int64_tag("k0")],
            vec![Value { value_data: None }, i64_value(2)],
        );

        let request = new_request(OpType::Put, rows);
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, "column ts is not null but input has null");
    }

    #[test]
    fn test_column_default() {
        let metadata = new_region_metadata();
        let rows = single_row(vec![int64_tag("k0")], vec![i64_value(1)]);

        let request = new_request(OpType::Put, rows);
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, "missing column ts");
    }

    #[test]
    fn test_unknown_column() {
        let metadata = new_region_metadata();
        // Column k1 does not exist in the region metadata.
        let rows = single_row(
            vec![ts_column(), int64_tag("k0"), int64_tag("k1")],
            vec![ts_ms_value(1), i64_value(2), i64_value(3)],
        );

        let request = new_request(OpType::Put, rows);
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
    }

    #[test]
    fn test_fill_impure_columns_err() {
        // The time index uses the function `now()` as its default value.
        let ts_with_now = ts_schema()
            .with_default_constraint(Some(ColumnDefaultConstraint::Function(
                "now()".to_string(),
            )))
            .unwrap();
        let metadata = builder_with_ts_schema(ts_with_now).build().unwrap();

        let rows = single_row(vec![int64_tag("k0")], vec![i64_value(1)]);
        let mut request = new_request(OpType::Put, rows);

        let err = request.check_schema(&metadata).unwrap_err();
        assert!(err.is_fill_default());

        let fill_err = request.fill_missing_columns(&metadata).unwrap_err();
        assert!(
            fill_err
                .to_string()
                .contains("unexpected impure default value with region_id")
        );
    }

    #[test]
    fn test_fill_missing_columns() {
        let metadata = new_region_metadata();
        let rows = single_row(vec![ts_column()], vec![ts_ms_value(1)]);

        let mut request = new_request(OpType::Put, rows);
        let err = request.check_schema(&metadata).unwrap_err();
        assert!(err.is_fill_default());
        request.fill_missing_columns(&metadata).unwrap();

        // The rows are expected to be unchanged after filling.
        let expect_rows = single_row(vec![ts_column()], vec![ts_ms_value(1)]);
        assert_eq!(expect_rows, request.rows);
    }

    #[test]
    fn test_fill_missing_for_delete() {
        let metadata = region_metadata_two_fields();

        // A delete request without column k0 is invalid and cannot be filled.
        let rows = single_row(vec![ts_column()], vec![ts_ms_value(1)]);
        let mut request = new_request(OpType::Delete, rows);
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, "delete requests need column k0");
        let err = request.fill_missing_columns(&metadata).unwrap_err();
        check_invalid_request(&err, "delete requests need column k0");

        // A delete request with k0 and ts can be filled.
        let rows = single_row(
            vec![int64_tag("k0"), ts_column()],
            vec![i64_value(100), ts_ms_value(1)],
        );
        let mut request = new_request(OpType::Delete, rows);
        let err = request.check_schema(&metadata).unwrap_err();
        assert!(err.is_fill_default());
        request.fill_missing_columns(&metadata).unwrap();

        // Column f1 is not nullable and we use 0 for padding.
        let expect_rows = single_row(
            vec![
                int64_tag("k0"),
                ts_column(),
                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
            ],
            vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        );
        assert_eq!(expect_rows, request.rows);
    }

    #[test]
    fn test_fill_missing_without_default_in_delete() {
        let mut builder = builder_with_ts_tag();
        builder
            // f0 is nullable.
            .push_column_metadata(f0_metadata())
            // f1 is not nullable and doesn't have a default value.
            .push_column_metadata(f1_metadata(int64_schema("f1", false)));
        let metadata = builder.build().unwrap();

        // Missing f0 (nullable), f1 (not nullable).
        let rows = single_row(
            vec![int64_tag("k0"), ts_column()],
            vec![i64_value(100), ts_ms_value(1)],
        );
        let mut request = new_request(OpType::Delete, rows);
        let err = request.check_schema(&metadata).unwrap_err();
        assert!(err.is_fill_default());
        request.fill_missing_columns(&metadata).unwrap();

        // Column f1 is not nullable and we use 0 for padding.
        let expect_rows = single_row(
            vec![
                int64_tag("k0"),
                ts_column(),
                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
            ],
            vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        );
        assert_eq!(expect_rows, request.rows);
    }

    #[test]
    fn test_no_default() {
        let metadata = new_region_metadata();
        let rows = single_row(vec![int64_tag("k0")], vec![i64_value(1)]);

        let mut request = new_request(OpType::Put, rows);
        let err = request.fill_missing_columns(&metadata).unwrap_err();
        check_invalid_request(&err, "column ts does not have default value");
    }

    #[test]
    fn test_missing_and_invalid() {
        let metadata = region_metadata_two_fields();
        // Missing f0 and f1 has invalid type (string).
        let invalid_f1 = Value {
            value_data: Some(ValueData::StringValue("xxxxx".to_string())),
        };
        let rows = single_row(
            vec![
                int64_tag("k0"),
                ts_column(),
                new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
            ],
            vec![i64_value(100), ts_ms_value(1), invalid_f1],
        );

        let request = new_request(OpType::Put, rows);
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(
            &err,
            "column f1 expect type Int64(Int64Type), given: STRING(12)",
        );
    }

    #[test]
    fn test_write_request_metadata() {
        let rows = single_row(
            vec![int64_tag("c0"), int64_tag("c1")],
            vec![i64_value(1), i64_value(2)],
        );
        let metadata = Arc::new(new_region_metadata());

        let request = WriteRequest::new(
            RegionId::new(1, 1),
            OpType::Put,
            rows,
            Some(Arc::clone(&metadata)),
        )
        .unwrap();

        assert!(request.region_metadata.is_some());
        assert_eq!(
            request.region_metadata.unwrap().region_id,
            RegionId::new(1, 1)
        );
    }
}