mito2/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Worker requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22    ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23};
24use api::v1::column_def::options_from_column_schema;
25use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
26use common_telemetry::info;
27use datatypes::prelude::DataType;
28use partition::expr::PartitionExpr;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{
37    MitoCopyRegionFromResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
38};
39use store_api::region_request::{
40    AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest,
41    RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest,
42    RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest,
43    RegionOpenRequest, RegionRequest, RegionTruncateRequest,
44};
45use store_api::storage::{FileId, RegionId};
46use tokio::sync::oneshot::{self, Receiver, Sender};
47
48use crate::error::{
49    CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
50    FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, MissingPartitionExprSnafu,
51    Result, UnexpectedSnafu,
52};
53use crate::flush::FlushReason;
54use crate::manifest::action::{RegionEdit, TruncateKind};
55use crate::memtable::MemtableId;
56use crate::memtable::bulk::part::BulkPart;
57use crate::metrics::COMPACTION_ELAPSED_TOTAL;
58use crate::region::options::RegionOptions;
59use crate::sst::file::FileMeta;
60use crate::sst::index::IndexBuildType;
61use crate::wal::EntryId;
62use crate::wal::entry_distributor::WalEntryReceiver;
63
64/// Request to write a region.
65#[derive(Debug)]
66pub struct WriteRequest {
67    /// Region to write.
68    pub region_id: RegionId,
69    /// Type of the write request.
70    pub op_type: OpType,
71    /// Rows to write.
72    pub rows: Rows,
73    /// Map column name to column index in `rows`.
74    pub name_to_index: HashMap<String, usize>,
75    /// Whether each column has null.
76    pub has_null: Vec<bool>,
77    /// Write hint.
78    pub hint: Option<WriteHint>,
79    /// Region metadata on the time of this request is created.
80    pub(crate) region_metadata: Option<RegionMetadataRef>,
81}
82
83impl WriteRequest {
84    /// Creates a new request.
85    ///
86    /// Returns `Err` if `rows` are invalid.
87    pub fn new(
88        region_id: RegionId,
89        op_type: OpType,
90        rows: Rows,
91        region_metadata: Option<RegionMetadataRef>,
92    ) -> Result<WriteRequest> {
93        let mut name_to_index = HashMap::with_capacity(rows.schema.len());
94        for (index, column) in rows.schema.iter().enumerate() {
95            ensure!(
96                name_to_index
97                    .insert(column.column_name.clone(), index)
98                    .is_none(),
99                InvalidRequestSnafu {
100                    region_id,
101                    reason: format!("duplicate column {}", column.column_name),
102                }
103            );
104        }
105
106        let mut has_null = vec![false; rows.schema.len()];
107        for row in &rows.rows {
108            ensure!(
109                row.values.len() == rows.schema.len(),
110                InvalidRequestSnafu {
111                    region_id,
112                    reason: format!(
113                        "row has {} columns but schema has {}",
114                        row.values.len(),
115                        rows.schema.len()
116                    ),
117                }
118            );
119
120            for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
121                validate_proto_value(region_id, value, column_schema)?;
122
123                if value.value_data.is_none() {
124                    has_null[i] = true;
125                }
126            }
127        }
128
129        Ok(WriteRequest {
130            region_id,
131            op_type,
132            rows,
133            name_to_index,
134            has_null,
135            hint: None,
136            region_metadata,
137        })
138    }
139
140    /// Sets the write hint.
141    pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
142        self.hint = hint;
143        self
144    }
145
146    /// Returns the encoding hint.
147    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
148        infer_primary_key_encoding_from_hint(self.hint.as_ref())
149    }
150
151    /// Returns estimated size of the request.
152    pub(crate) fn estimated_size(&self) -> usize {
153        let row_size = self
154            .rows
155            .rows
156            .first()
157            .map(|row| row.encoded_len())
158            .unwrap_or(0);
159        row_size * self.rows.rows.len()
160    }
161
162    /// Gets column index by name.
163    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
164        self.name_to_index.get(name).copied()
165    }
166
167    /// Checks schema of rows is compatible with schema of the region.
168    ///
169    /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
170    /// error.
171    pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
172        debug_assert_eq!(self.region_id, metadata.region_id);
173
174        let region_id = self.region_id;
175        // Index all columns in rows.
176        let mut rows_columns: HashMap<_, _> = self
177            .rows
178            .schema
179            .iter()
180            .map(|column| (&column.column_name, column))
181            .collect();
182
183        let mut need_fill_default = false;
184        // Checks all columns in this region.
185        for column in &metadata.column_metadatas {
186            if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
187                // Check data type.
188                ensure!(
189                    is_column_type_value_eq(
190                        input_col.datatype,
191                        input_col.datatype_extension.clone(),
192                        &column.column_schema.data_type
193                    ),
194                    InvalidRequestSnafu {
195                        region_id,
196                        reason: format!(
197                            "column {} expect type {:?}, given: {}({})",
198                            column.column_schema.name,
199                            column.column_schema.data_type,
200                            ColumnDataType::try_from(input_col.datatype)
201                                .map(|v| v.as_str_name())
202                                .unwrap_or("Unknown"),
203                            input_col.datatype,
204                        )
205                    }
206                );
207
208                // Check semantic type.
209                ensure!(
210                    is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
211                    InvalidRequestSnafu {
212                        region_id,
213                        reason: format!(
214                            "column {} has semantic type {:?}, given: {}({})",
215                            column.column_schema.name,
216                            column.semantic_type,
217                            api::v1::SemanticType::try_from(input_col.semantic_type)
218                                .map(|v| v.as_str_name())
219                                .unwrap_or("Unknown"),
220                            input_col.semantic_type
221                        ),
222                    }
223                );
224
225                // Check nullable.
226                // Safety: `rows_columns` ensures this column exists.
227                let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
228                ensure!(
229                    !has_null || column.column_schema.is_nullable(),
230                    InvalidRequestSnafu {
231                        region_id,
232                        reason: format!(
233                            "column {} is not null but input has null",
234                            column.column_schema.name
235                        ),
236                    }
237                );
238            } else {
239                // Rows don't have this column.
240                self.check_missing_column(column)?;
241
242                need_fill_default = true;
243            }
244        }
245
246        // Checks all columns in rows exist in the region.
247        if !rows_columns.is_empty() {
248            let names: Vec<_> = rows_columns.into_keys().collect();
249            return InvalidRequestSnafu {
250                region_id,
251                reason: format!("unknown columns: {:?}", names),
252            }
253            .fail();
254        }
255
256        // If we need to fill default values, return a special error.
257        ensure!(!need_fill_default, FillDefaultSnafu { region_id });
258
259        Ok(())
260    }
261
262    /// Tries to fill missing columns.
263    ///
264    /// Currently, our protobuf format might be inefficient when we need to fill lots of null
265    /// values.
266    pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
267        debug_assert_eq!(self.region_id, metadata.region_id);
268
269        let mut columns_to_fill = vec![];
270        for column in &metadata.column_metadatas {
271            if !self.name_to_index.contains_key(&column.column_schema.name) {
272                columns_to_fill.push(column);
273            }
274        }
275        self.fill_columns(columns_to_fill)?;
276
277        Ok(())
278    }
279
280    /// Checks the schema and fill missing columns.
281    pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
282        if let Err(e) = self.check_schema(metadata) {
283            if e.is_fill_default() {
284                // TODO(yingwen): Add metrics for this case.
285                // We need to fill default value. The write request may be a request
286                // sent before changing the schema.
287                self.fill_missing_columns(metadata)?;
288            } else {
289                return Err(e);
290            }
291        }
292
293        Ok(())
294    }
295
296    /// Fills default value for specific `columns`.
297    fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
298        let mut default_values = Vec::with_capacity(columns.len());
299        let mut columns_to_fill = Vec::with_capacity(columns.len());
300        for column in columns {
301            let default_value = self.column_default_value(column)?;
302            if default_value.value_data.is_some() {
303                default_values.push(default_value);
304                columns_to_fill.push(column);
305            }
306        }
307
308        for row in &mut self.rows.rows {
309            row.values.extend(default_values.iter().cloned());
310        }
311
312        for column in columns_to_fill {
313            let (datatype, datatype_ext) =
314                ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
315                    .with_context(|_| ConvertColumnDataTypeSnafu {
316                        reason: format!(
317                            "no protobuf type for column {} ({:?})",
318                            column.column_schema.name, column.column_schema.data_type
319                        ),
320                    })?
321                    .to_parts();
322            self.rows.schema.push(ColumnSchema {
323                column_name: column.column_schema.name.clone(),
324                datatype: datatype as i32,
325                semantic_type: column.semantic_type as i32,
326                datatype_extension: datatype_ext,
327                options: options_from_column_schema(&column.column_schema),
328            });
329        }
330
331        Ok(())
332    }
333
334    /// Checks whether we should allow a row doesn't provide this column.
335    fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
336        if self.op_type == OpType::Delete {
337            if column.semantic_type == SemanticType::Field {
338                // For delete request, all tags and timestamp is required. We don't fill default
339                // tag or timestamp while deleting rows.
340                return Ok(());
341            } else {
342                return InvalidRequestSnafu {
343                    region_id: self.region_id,
344                    reason: format!("delete requests need column {}", column.column_schema.name),
345                }
346                .fail();
347            }
348        }
349
350        // Not a delete request. Checks whether they have default value.
351        ensure!(
352            column.column_schema.is_nullable()
353                || column.column_schema.default_constraint().is_some(),
354            InvalidRequestSnafu {
355                region_id: self.region_id,
356                reason: format!("missing column {}", column.column_schema.name),
357            }
358        );
359
360        Ok(())
361    }
362
363    /// Returns the default value for specific column.
364    fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
365        let default_value = match self.op_type {
366            OpType::Delete => {
367                ensure!(
368                    column.semantic_type == SemanticType::Field,
369                    InvalidRequestSnafu {
370                        region_id: self.region_id,
371                        reason: format!(
372                            "delete requests need column {}",
373                            column.column_schema.name
374                        ),
375                    }
376                );
377
378                // For delete request, we need a default value for padding so we
379                // can delete a row even a field doesn't have a default value. So the
380                // value doesn't need to following the default value constraint of the
381                // column.
382                if column.column_schema.is_nullable() {
383                    datatypes::value::Value::Null
384                } else {
385                    column.column_schema.data_type.default_value()
386                }
387            }
388            OpType::Put => {
389                // For put requests, we use the default value from column schema.
390                if column.column_schema.is_default_impure() {
391                    UnexpectedSnafu {
392                        reason: format!(
393                            "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
394                            self.region_id,
395                            column.column_schema.name,
396                            column.column_schema.default_constraint(),
397                        ),
398                    }
399                    .fail()?
400                }
401                column
402                    .column_schema
403                    .create_default()
404                    .context(CreateDefaultSnafu {
405                        region_id: self.region_id,
406                        column: &column.column_schema.name,
407                    })?
408                    // This column doesn't have default value.
409                    .with_context(|| InvalidRequestSnafu {
410                        region_id: self.region_id,
411                        reason: format!(
412                            "column {} does not have default value",
413                            column.column_schema.name
414                        ),
415                    })?
416            }
417        };
418
419        // Convert default value into proto's value.
420        Ok(api::helper::to_grpc_value(default_value))
421    }
422}
423
424/// Validate proto value schema.
425pub(crate) fn validate_proto_value(
426    region_id: RegionId,
427    value: &Value,
428    column_schema: &ColumnSchema,
429) -> Result<()> {
430    if let Some(value_type) = proto_value_type(value) {
431        let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
432            InvalidRequestSnafu {
433                region_id,
434                reason: format!(
435                    "column {} has unknown type {}",
436                    column_schema.column_name, column_schema.datatype
437                ),
438            }
439            .build()
440        })?;
441        ensure!(
442            proto_value_type_match(column_type, value_type),
443            InvalidRequestSnafu {
444                region_id,
445                reason: format!(
446                    "value has type {:?}, but column {} has type {:?}({})",
447                    value_type, column_schema.column_name, column_type, column_schema.datatype,
448                ),
449            }
450        );
451    }
452
453    Ok(())
454}
455
456fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
457    match (column_type, value_type) {
458        (ct, vt) if ct == vt => true,
459        (ColumnDataType::Vector, ColumnDataType::Binary) => true,
460        (ColumnDataType::Json, ColumnDataType::Binary) => true,
461        _ => false,
462    }
463}
464
465/// Oneshot output result sender.
466#[derive(Debug)]
467pub struct OutputTx(Sender<Result<AffectedRows>>);
468
469impl OutputTx {
470    /// Creates a new output sender.
471    pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
472        OutputTx(sender)
473    }
474
475    /// Sends the `result`.
476    pub(crate) fn send(self, result: Result<AffectedRows>) {
477        // Ignores send result.
478        let _ = self.0.send(result);
479    }
480}
481
482/// Optional output result sender.
483#[derive(Debug)]
484pub(crate) struct OptionOutputTx(Option<OutputTx>);
485
486impl OptionOutputTx {
487    /// Creates a sender.
488    pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
489        OptionOutputTx(sender)
490    }
491
492    /// Creates an empty sender.
493    pub(crate) fn none() -> OptionOutputTx {
494        OptionOutputTx(None)
495    }
496
497    /// Sends the `result` and consumes the inner sender.
498    pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
499        if let Some(sender) = self.0.take() {
500            sender.send(result);
501        }
502    }
503
504    /// Sends the `result` and consumes the sender.
505    pub(crate) fn send(mut self, result: Result<AffectedRows>) {
506        if let Some(sender) = self.0.take() {
507            sender.send(result);
508        }
509    }
510
511    /// Takes the inner sender.
512    pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
513        self.0.take()
514    }
515}
516
517impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
518    fn from(sender: Sender<Result<AffectedRows>>) -> Self {
519        Self::new(Some(OutputTx::new(sender)))
520    }
521}
522
523impl OnFailure for OptionOutputTx {
524    fn on_failure(&mut self, err: Error) {
525        self.send_mut(Err(err));
526    }
527}
528
529/// Callback on failure.
530pub(crate) trait OnFailure {
531    /// Handles `err` on failure.
532    fn on_failure(&mut self, err: Error);
533}
534
535/// Sender and write request.
536#[derive(Debug)]
537pub(crate) struct SenderWriteRequest {
538    /// Result sender.
539    pub(crate) sender: OptionOutputTx,
540    pub(crate) request: WriteRequest,
541}
542
543pub(crate) struct SenderBulkRequest {
544    pub(crate) sender: OptionOutputTx,
545    pub(crate) region_id: RegionId,
546    pub(crate) request: BulkPart,
547    pub(crate) region_metadata: RegionMetadataRef,
548}
549
550/// Request sent to a worker with timestamp
551#[derive(Debug)]
552pub(crate) struct WorkerRequestWithTime {
553    pub(crate) request: WorkerRequest,
554    pub(crate) created_at: Instant,
555}
556
557impl WorkerRequestWithTime {
558    pub(crate) fn new(request: WorkerRequest) -> Self {
559        Self {
560            request,
561            created_at: Instant::now(),
562        }
563    }
564}
565
566/// Request sent to a worker
567#[derive(Debug)]
568pub(crate) enum WorkerRequest {
569    /// Write to a region.
570    Write(SenderWriteRequest),
571
572    /// Ddl request to a region.
573    Ddl(SenderDdlRequest),
574
575    /// Notifications from internal background jobs.
576    Background {
577        /// Id of the region to send.
578        region_id: RegionId,
579        /// Internal notification.
580        notify: BackgroundNotify,
581    },
582
583    /// The internal commands.
584    SetRegionRoleStateGracefully {
585        /// Id of the region to send.
586        region_id: RegionId,
587        /// The [SettableRegionRoleState].
588        region_role_state: SettableRegionRoleState,
589        /// The sender of [SetReadonlyResponse].
590        sender: Sender<SetRegionRoleStateResponse>,
591    },
592
593    /// Notify a worker to stop.
594    Stop,
595
596    /// Use [RegionEdit] to edit a region directly.
597    EditRegion(RegionEditRequest),
598
599    /// Keep the manifest of a region up to date.
600    SyncRegion(RegionSyncRequest),
601
602    /// Bulk inserts request and region metadata.
603    BulkInserts {
604        metadata: Option<RegionMetadataRef>,
605        request: RegionBulkInsertsRequest,
606        sender: OptionOutputTx,
607    },
608
609    /// Remap manifests request.
610    RemapManifests(RemapManifestsRequest),
611
612    /// Copy region from request.
613    CopyRegionFrom(CopyRegionFromRequest),
614}
615
616impl WorkerRequest {
617    /// Creates a new open region request.
618    pub(crate) fn new_open_region_request(
619        region_id: RegionId,
620        request: RegionOpenRequest,
621        entry_receiver: Option<WalEntryReceiver>,
622    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
623        let (sender, receiver) = oneshot::channel();
624
625        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
626            region_id,
627            sender: sender.into(),
628            request: DdlRequest::Open((request, entry_receiver)),
629        });
630
631        (worker_request, receiver)
632    }
633
634    /// Creates a new catchup region request.
635    pub(crate) fn new_catchup_region_request(
636        region_id: RegionId,
637        request: RegionCatchupRequest,
638        entry_receiver: Option<WalEntryReceiver>,
639    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
640        let (sender, receiver) = oneshot::channel();
641        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
642            region_id,
643            sender: sender.into(),
644            request: DdlRequest::Catchup((request, entry_receiver)),
645        });
646        (worker_request, receiver)
647    }
648
649    /// Converts request from a [RegionRequest].
650    pub(crate) fn try_from_region_request(
651        region_id: RegionId,
652        value: RegionRequest,
653        region_metadata: Option<RegionMetadataRef>,
654    ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
655        let (sender, receiver) = oneshot::channel();
656        let worker_request = match value {
657            RegionRequest::Put(v) => {
658                let mut write_request =
659                    WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
660                        .with_hint(v.hint);
661                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
662                    && let Some(region_metadata) = &region_metadata
663                {
664                    write_request.maybe_fill_missing_columns(region_metadata)?;
665                }
666                WorkerRequest::Write(SenderWriteRequest {
667                    sender: sender.into(),
668                    request: write_request,
669                })
670            }
671            RegionRequest::Delete(v) => {
672                let mut write_request =
673                    WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?
674                        .with_hint(v.hint);
675                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
676                    && let Some(region_metadata) = &region_metadata
677                {
678                    write_request.maybe_fill_missing_columns(region_metadata)?;
679                }
680                WorkerRequest::Write(SenderWriteRequest {
681                    sender: sender.into(),
682                    request: write_request,
683                })
684            }
685            RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
686                region_id,
687                sender: sender.into(),
688                request: DdlRequest::Create(v),
689            }),
690            RegionRequest::Drop(v) => WorkerRequest::Ddl(SenderDdlRequest {
691                region_id,
692                sender: sender.into(),
693                request: DdlRequest::Drop(v),
694            }),
695            RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
696                region_id,
697                sender: sender.into(),
698                request: DdlRequest::Open((v, None)),
699            }),
700            RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
701                region_id,
702                sender: sender.into(),
703                request: DdlRequest::Close(v),
704            }),
705            RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
706                region_id,
707                sender: sender.into(),
708                request: DdlRequest::Alter(v),
709            }),
710            RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
711                region_id,
712                sender: sender.into(),
713                request: DdlRequest::Flush(v),
714            }),
715            RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
716                region_id,
717                sender: sender.into(),
718                request: DdlRequest::Compact(v),
719            }),
720            RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest {
721                region_id,
722                sender: sender.into(),
723                request: DdlRequest::BuildIndex(v),
724            }),
725            RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
726                region_id,
727                sender: sender.into(),
728                request: DdlRequest::Truncate(v),
729            }),
730            RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
731                region_id,
732                sender: sender.into(),
733                request: DdlRequest::Catchup((v, None)),
734            }),
735            RegionRequest::EnterStaging(v) => WorkerRequest::Ddl(SenderDdlRequest {
736                region_id,
737                sender: sender.into(),
738                request: DdlRequest::EnterStaging(v),
739            }),
740            RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
741                metadata: region_metadata,
742                sender: sender.into(),
743                request: region_bulk_inserts_request,
744            },
745            RegionRequest::ApplyStagingManifest(v) => WorkerRequest::Ddl(SenderDdlRequest {
746                region_id,
747                sender: sender.into(),
748                request: DdlRequest::ApplyStagingManifest(v),
749            }),
750        };
751
752        Ok((worker_request, receiver))
753    }
754
755    pub(crate) fn new_set_readonly_gracefully(
756        region_id: RegionId,
757        region_role_state: SettableRegionRoleState,
758    ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
759        let (sender, receiver) = oneshot::channel();
760
761        (
762            WorkerRequest::SetRegionRoleStateGracefully {
763                region_id,
764                region_role_state,
765                sender,
766            },
767            receiver,
768        )
769    }
770
771    pub(crate) fn new_sync_region_request(
772        region_id: RegionId,
773        manifest_version: ManifestVersion,
774    ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
775        let (sender, receiver) = oneshot::channel();
776        (
777            WorkerRequest::SyncRegion(RegionSyncRequest {
778                region_id,
779                manifest_version,
780                sender,
781            }),
782            receiver,
783        )
784    }
785
786    /// Converts [RemapManifestsRequest] from a [RemapManifestsRequest](store_api::region_engine::RemapManifestsRequest).
787    ///
788    /// # Errors
789    ///
790    /// Returns an error if the partition expression is invalid or missing.
791    /// Returns an error if the new partition expressions are not found for some regions.
792    #[allow(clippy::type_complexity)]
793    pub(crate) fn try_from_remap_manifests_request(
794        store_api::region_engine::RemapManifestsRequest {
795            region_id,
796            input_regions,
797            region_mapping,
798            new_partition_exprs,
799        }: store_api::region_engine::RemapManifestsRequest,
800    ) -> Result<(WorkerRequest, Receiver<Result<HashMap<RegionId, String>>>)> {
801        let (sender, receiver) = oneshot::channel();
802        let new_partition_exprs = new_partition_exprs
803            .into_iter()
804            .map(|(k, v)| {
805                Ok((
806                    k,
807                    PartitionExpr::from_json_str(&v)
808                        .context(InvalidPartitionExprSnafu { expr: v })?
809                        .context(MissingPartitionExprSnafu { region_id: k })?,
810                ))
811            })
812            .collect::<Result<HashMap<_, _>>>()?;
813
814        let request = RemapManifestsRequest {
815            region_id,
816            input_regions,
817            region_mapping,
818            new_partition_exprs,
819            sender,
820        };
821
822        Ok((WorkerRequest::RemapManifests(request), receiver))
823    }
824
825    /// Converts [CopyRegionFromRequest] from a [MitoCopyRegionFromRequest](store_api::region_engine::MitoCopyRegionFromRequest).
826    pub(crate) fn try_from_copy_region_from_request(
827        region_id: RegionId,
828        store_api::region_engine::MitoCopyRegionFromRequest {
829            source_region_id,
830            parallelism,
831        }: store_api::region_engine::MitoCopyRegionFromRequest,
832    ) -> Result<(WorkerRequest, Receiver<Result<MitoCopyRegionFromResponse>>)> {
833        let (sender, receiver) = oneshot::channel();
834        let request = CopyRegionFromRequest {
835            region_id,
836            source_region_id,
837            parallelism,
838            sender,
839        };
840        Ok((WorkerRequest::CopyRegionFrom(request), receiver))
841    }
842}
843
/// DDL request to a region.
///
/// Each variant wraps the matching request type from
/// `store_api::region_request`.
#[derive(Debug)]
pub(crate) enum DdlRequest {
    /// Region create request.
    Create(RegionCreateRequest),
    /// Region drop request.
    Drop(RegionDropRequest),
    /// Region open request, with an optional `WalEntryReceiver`
    /// (NOTE(review): presumably used to feed WAL entries during open — confirm).
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    /// Region close request.
    Close(RegionCloseRequest),
    /// Region alter request.
    Alter(RegionAlterRequest),
    /// Region flush request.
    Flush(RegionFlushRequest),
    /// Region compact request.
    Compact(RegionCompactRequest),
    /// Region index build request.
    BuildIndex(RegionBuildIndexRequest),
    /// Region truncate request.
    Truncate(RegionTruncateRequest),
    /// Region catchup request, with an optional `WalEntryReceiver`
    /// (NOTE(review): presumably used to feed WAL entries during catchup — confirm).
    Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
    /// Request to enter staging mode.
    EnterStaging(EnterStagingRequest),
    /// Request to apply a staging manifest.
    ApplyStagingManifest(ApplyStagingManifestRequest),
}
860
/// A [DdlRequest] paired with the sender used to report its result.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Region id of the request.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Ddl request.
    pub(crate) request: DdlRequest,
}
871
/// Notification from a background job.
///
/// Each variant carries the outcome of one kind of background work: flush,
/// index build, compaction, truncate, region change/edit, entering staging,
/// or copying a region.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// Flush has finished.
    FlushFinished(FlushFinished),
    /// Flush has failed.
    FlushFailed(FlushFailed),
    /// Index build has finished.
    IndexBuildFinished(IndexBuildFinished),
    /// Index build has been stopped (aborted or succeeded).
    IndexBuildStopped(IndexBuildStopped),
    /// Index build has failed.
    IndexBuildFailed(IndexBuildFailed),
    /// Compaction has finished.
    CompactionFinished(CompactionFinished),
    /// Compaction has failed.
    CompactionFailed(CompactionFailed),
    /// Truncate result.
    Truncate(TruncateResult),
    /// Region change result.
    RegionChange(RegionChangeResult),
    /// Region edit result.
    RegionEdit(RegionEditResult),
    /// Enter staging result.
    EnterStaging(EnterStagingResult),
    /// Copying a region from a source region has finished.
    CopyRegionFromFinished(CopyRegionFromFinished),
}
900
/// Notifies that a flush job has finished.
#[derive(Debug)]
pub(crate) struct FlushFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Entry id of flushed data.
    pub(crate) flushed_entry_id: EntryId,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Flush timer.
    ///
    /// A prometheus [HistogramTimer] records the elapsed time when it is
    /// dropped, so the flush duration is observed when this struct is dropped.
    pub(crate) _timer: HistogramTimer,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Ids of the memtables to remove.
    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
    /// Whether the region is in staging mode.
    pub(crate) is_staging: bool,
    /// Reason for flush.
    pub(crate) flush_reason: FlushReason,
}
921
922impl FlushFinished {
923    /// Marks the flush job as successful and observes the timer.
924    pub(crate) fn on_success(self) {
925        for sender in self.senders {
926            sender.send(Ok(0));
927        }
928    }
929}
930
931impl OnFailure for FlushFinished {
932    fn on_failure(&mut self, err: Error) {
933        let err = Arc::new(err);
934        for sender in self.senders.drain(..) {
935            sender.send(Err(err.clone()).context(FlushRegionSnafu {
936                region_id: self.region_id,
937            }));
938        }
939    }
940}
941
/// Notifies that a flush job has failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
948
/// Notifies that an index build job has finished.
#[derive(Debug)]
pub(crate) struct IndexBuildFinished {
    /// Region id. Allowed as `dead_code`: the field is not otherwise read yet.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Region edit carried by this notification.
    pub(crate) edit: RegionEdit,
}
955
/// Notifies that an index build job has been stopped.
#[derive(Debug)]
pub(crate) struct IndexBuildStopped {
    /// Region id. Allowed as `dead_code`: the field is not otherwise read yet.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Id of the file associated with the stopped build.
    pub(crate) file_id: FileId,
}
963
/// Notifies that an index build job has failed.
#[derive(Debug)]
pub(crate) struct IndexBuildFailed {
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
969
/// Notifies that a compaction job has finished.
#[derive(Debug)]
pub(crate) struct CompactionFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Compaction result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Start time of compaction task; the elapsed time since then is
    /// observed on success.
    pub(crate) start_time: Instant,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
}
982
983impl CompactionFinished {
984    pub fn on_success(self) {
985        // only update compaction time on success
986        COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
987
988        for sender in self.senders {
989            sender.send(Ok(0));
990        }
991        info!("Successfully compacted region: {}", self.region_id);
992    }
993}
994
995impl OnFailure for CompactionFinished {
996    /// Compaction succeeded but failed to update manifest or region's already been dropped.
997    fn on_failure(&mut self, err: Error) {
998        let err = Arc::new(err);
999        for sender in self.senders.drain(..) {
1000            sender.send(Err(err.clone()).context(CompactRegionSnafu {
1001                region_id: self.region_id,
1002            }));
1003        }
1004    }
1005}
1006
/// A failing compaction result.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
1014
/// Notifies the truncate result of a region.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Truncate result.
    pub(crate) result: Result<()>,
    /// Kind of the truncate operation.
    pub(crate) kind: TruncateKind,
}
1026
/// Notifies the region of the result of writing a region change action.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The new region metadata to apply.
    pub(crate) new_meta: RegionMetadataRef,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
    /// Whether an index build is needed for the schema change.
    pub(crate) need_index: bool,
    /// New options for the region.
    /// NOTE(review): `None` presumably means the options are unchanged — confirm.
    pub(crate) new_options: Option<RegionOptions>,
}
1043
/// Notifies the region of the result of entering staging.
#[derive(Debug)]
pub(crate) struct EnterStagingResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The new partition expression to apply, in its string form.
    pub(crate) partition_expr: String,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
}
1056
/// Notifies that a copy-region-from job has finished.
#[derive(Debug)]
pub(crate) struct CopyRegionFromFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Result sender.
    pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
1066
/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// The sender to notify the result to the region engine.
    pub(crate) tx: Sender<Result<()>>,
}
1075
/// Notifies the region of the result of editing the region.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: Sender<Result<()>>,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
    /// Whether the region state needs to be set to Writable after handling this request.
    pub(crate) update_region_state: bool,
    /// Whether the region was in staging mode before handling this request.
    pub(crate) is_staging: bool,
}
1092
/// Request to build indexes for files of a region.
#[derive(Debug)]
pub(crate) struct BuildIndexRequest {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Type of the index build.
    pub(crate) build_type: IndexBuildType,
    /// Files that need their index built; empty means all files.
    pub(crate) file_metas: Vec<FileMeta>,
}
1100
/// Request to sync the manifest of a region.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Manifest version of the request (NOTE(review): presumably the target version to sync to — confirm).
    pub(crate) manifest_version: ManifestVersion,
    /// Returns the latest manifest version and a boolean indicating whether a new manifest is installed.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
1108
/// Request to remap the manifests of input regions onto new regions.
#[derive(Debug)]
pub(crate) struct RemapManifestsRequest {
    /// The [`RegionId`] of a staging region used to obtain table directory and storage configuration for the remap operation.
    pub(crate) region_id: RegionId,
    /// Regions to remap manifests from.
    pub(crate) input_regions: Vec<RegionId>,
    /// For each old region, the new regions that should receive its files.
    pub(crate) region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// New partition expressions for the new regions.
    pub(crate) new_partition_exprs: HashMap<RegionId, PartitionExpr>,
    /// Sender for the result of the remap operation.
    ///
    /// The result is a map from region IDs to their corresponding staging manifest paths.
    pub(crate) sender: Sender<Result<HashMap<RegionId, String>>>,
}
1124
/// Request to copy a source region into a target region.
#[derive(Debug)]
pub(crate) struct CopyRegionFromRequest {
    /// The [`RegionId`] of the target region.
    pub(crate) region_id: RegionId,
    /// The [`RegionId`] of the source region.
    pub(crate) source_region_id: RegionId,
    /// The parallelism of the copy operation.
    pub(crate) parallelism: usize,
    /// Result sender.
    pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
1136
// Unit tests for `WriteRequest` construction, schema checks and default filling.
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{Row, SemanticType};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnDefaultConstraint;
    use mito_codec::test_util::i64_value;
    use store_api::metadata::RegionMetadataBuilder;

    use super::*;
    use crate::error::Error;
    use crate::test_util::ts_ms_value;

    /// Builds a proto column schema with the given name, data type and
    /// semantic type; all other fields use their defaults.
    fn new_column_schema(
        name: &str,
        data_type: ColumnDataType,
        semantic_type: SemanticType,
    ) -> ColumnSchema {
        ColumnSchema {
            column_name: name.to_string(),
            datatype: data_type as i32,
            semantic_type: semantic_type as i32,
            ..Default::default()
        }
    }

    /// Asserts that `err` is `Error::InvalidRequest` whose reason equals
    /// `expect`; panics on any other error variant.
    fn check_invalid_request(err: &Error, expect: &str) {
        if let Error::InvalidRequest {
            region_id: _,
            reason,
            location: _,
        } = err
        {
            assert_eq!(reason, expect);
        } else {
            panic!("Unexpected error {err}")
        }
    }

    #[test]
    fn test_write_request_duplicate_column() {
        let rows = Rows {
            schema: vec![
                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![],
        };

        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
        check_invalid_request(&err, "duplicate column c0");
    }

    #[test]
    fn test_valid_write_request() {
        let rows = Rows {
            schema: vec![
                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![i64_value(1), i64_value(2)],
            }],
        };

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        assert_eq!(0, request.column_index_by_name("c0").unwrap());
        assert_eq!(1, request.column_index_by_name("c1").unwrap());
        assert_eq!(None, request.column_index_by_name("c2"));
    }

    #[test]
    fn test_write_request_column_num() {
        let rows = Rows {
            schema: vec![
                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![i64_value(1), i64_value(2), i64_value(3)],
            }],
        };

        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
        check_invalid_request(&err, "row has 3 columns but schema has 2");
    }

    /// Region metadata with a non-null millisecond timestamp column `ts`
    /// (id 1) and a nullable Int64 tag `k0` (id 2) as the primary key.
    fn new_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "k0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .primary_key(vec![2]);
        builder.build().unwrap()
    }

    #[test]
    fn test_check_schema() {
        let rows = Rows {
            schema: vec![
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![ts_ms_value(1), i64_value(2)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        request.check_schema(&metadata).unwrap();
    }

    #[test]
    fn test_column_type() {
        let rows = Rows {
            schema: vec![
                new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![i64_value(1), i64_value(2)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(
            &err,
            "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
        );
    }

    #[test]
    fn test_semantic_type() {
        let rows = Rows {
            schema: vec![
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Tag,
                ),
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![ts_ms_value(1), i64_value(2)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
    }

    #[test]
    fn test_column_nullable() {
        let rows = Rows {
            schema: vec![
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![Value { value_data: None }, i64_value(2)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, "column ts is not null but input has null");
    }

    // Despite its name, this checks that a missing non-null column without a
    // default (`ts`) is reported as missing.
    #[test]
    fn test_column_default() {
        let rows = Rows {
            schema: vec![new_column_schema(
                "k0",
                ColumnDataType::Int64,
                SemanticType::Tag,
            )],
            rows: vec![Row {
                values: vec![i64_value(1)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, "missing column ts");
    }

    #[test]
    fn test_unknown_column() {
        let rows = Rows {
            schema: vec![
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
    }

    // `ts` is missing and its default `now()` is impure, so filling it is
    // expected to fail even though `check_schema` reports a fillable error.
    #[test]
    fn test_fill_impure_columns_err() {
        let rows = Rows {
            schema: vec![new_column_schema(
                "k0",
                ColumnDataType::Int64,
                SemanticType::Tag,
            )],
            rows: vec![Row {
                values: vec![i64_value(1)],
            }],
        };
        let metadata = {
            let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
            builder
                .push_column_metadata(ColumnMetadata {
                    column_schema: datatypes::schema::ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
                        "now()".to_string(),
                    )))
                    .unwrap(),
                    semantic_type: SemanticType::Timestamp,
                    column_id: 1,
                })
                .push_column_metadata(ColumnMetadata {
                    column_schema: datatypes::schema::ColumnSchema::new(
                        "k0",
                        ConcreteDataType::int64_datatype(),
                        true,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 2,
                })
                .primary_key(vec![2]);
            builder.build().unwrap()
        };

        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        assert!(err.is_fill_default());
        assert!(
            request
                .fill_missing_columns(&metadata)
                .unwrap_err()
                .to_string()
                .contains("unexpected impure default value with region_id")
        );
    }

    // `k0` is missing, nullable and has no default; the expected rows show
    // that filling leaves the request unchanged for it.
    #[test]
    fn test_fill_missing_columns() {
        let rows = Rows {
            schema: vec![new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            )],
            rows: vec![Row {
                values: vec![ts_ms_value(1)],
            }],
        };
        let metadata = new_region_metadata();

        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        assert!(err.is_fill_default());
        request.fill_missing_columns(&metadata).unwrap();

        let expect_rows = Rows {
            schema: vec![new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            )],
            rows: vec![Row {
                values: vec![ts_ms_value(1)],
            }],
        };
        assert_eq!(expect_rows, request.rows);
    }

    /// Returns a builder with the same `ts` and `k0` columns as
    /// `new_region_metadata`, so tests can append more columns.
    fn builder_with_ts_tag() -> RegionMetadataBuilder {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "k0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .primary_key(vec![2]);
        builder
    }

    /// `ts`/`k0` metadata plus a nullable field `f0` and a non-null field
    /// `f1` whose default value is 100.
    fn region_metadata_two_fields() -> RegionMetadata {
        let mut builder = builder_with_ts_tag();
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "f0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            // Column is not nullable.
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "f1",
                    ConcreteDataType::int64_datatype(),
                    false,
                )
                .with_default_constraint(Some(ColumnDefaultConstraint::Value(
                    datatypes::value::Value::Int64(100),
                )))
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 4,
            });
        builder.build().unwrap()
    }

    #[test]
    fn test_fill_missing_for_delete() {
        // A delete request missing the tag column `k0` cannot be filled.
        let rows = Rows {
            schema: vec![new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            )],
            rows: vec![Row {
                values: vec![ts_ms_value(1)],
            }],
        };
        let metadata = region_metadata_two_fields();

        let mut request =
            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, "delete requests need column k0");
        let err = request.fill_missing_columns(&metadata).unwrap_err();
        check_invalid_request(&err, "delete requests need column k0");

        // With `k0` present, only the non-null field `f1` is padded; the
        // nullable `f0` is not added.
        let rows = Rows {
            schema: vec![
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
            ],
            rows: vec![Row {
                values: vec![i64_value(100), ts_ms_value(1)],
            }],
        };
        let mut request =
            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        assert!(err.is_fill_default());
        request.fill_missing_columns(&metadata).unwrap();

        let expect_rows = Rows {
            schema: vec![
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
            ],
            // Column f1 is not nullable and we use 0 for padding.
            rows: vec![Row {
                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
            }],
        };
        assert_eq!(expect_rows, request.rows);
    }

    #[test]
    fn test_fill_missing_without_default_in_delete() {
        let mut builder = builder_with_ts_tag();
        builder
            // f0 is nullable.
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "f0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            // f1 is not nullable and does not have a default value.
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "f1",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            });
        let metadata = builder.build().unwrap();

        let rows = Rows {
            schema: vec![
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
            ],
            // Missing f0 (nullable), f1 (not nullable).
            rows: vec![Row {
                values: vec![i64_value(100), ts_ms_value(1)],
            }],
        };
        let mut request =
            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        assert!(err.is_fill_default());
        request.fill_missing_columns(&metadata).unwrap();

        let expect_rows = Rows {
            schema: vec![
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
            ],
            // Column f1 is not nullable and we use 0 for padding.
            rows: vec![Row {
                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
            }],
        };
        assert_eq!(expect_rows, request.rows);
    }

    #[test]
    fn test_no_default() {
        let rows = Rows {
            schema: vec![new_column_schema(
                "k0",
                ColumnDataType::Int64,
                SemanticType::Tag,
            )],
            rows: vec![Row {
                values: vec![i64_value(1)],
            }],
        };
        let metadata = new_region_metadata();

        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.fill_missing_columns(&metadata).unwrap_err();
        check_invalid_request(&err, "column ts does not have default value");
    }

    #[test]
    fn test_missing_and_invalid() {
        // Missing f0 and f1 has invalid type (string).
        let rows = Rows {
            schema: vec![
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
            ],
            rows: vec![Row {
                values: vec![
                    i64_value(100),
                    ts_ms_value(1),
                    Value {
                        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
                    },
                ],
            }],
        };
        let metadata = region_metadata_two_fields();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(
            &err,
            "column f1 expect type Int64(Int64Type), given: STRING(12)",
        );
    }

    // The rows (c0, c1) do not match the metadata (ts, k0), yet `new` succeeds:
    // it stores the supplied metadata without validating the rows against it.
    #[test]
    fn test_write_request_metadata() {
        let rows = Rows {
            schema: vec![
                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![i64_value(1), i64_value(2)],
            }],
        };

        let metadata = Arc::new(new_region_metadata());
        let request = WriteRequest::new(
            RegionId::new(1, 1),
            OpType::Put,
            rows,
            Some(metadata.clone()),
        )
        .unwrap();

        assert!(request.region_metadata.is_some());
        assert_eq!(
            request.region_metadata.unwrap().region_id,
            RegionId::new(1, 1)
        );
    }
}