mito2/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Worker requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22    is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_proto_value,
23    ColumnDataTypeWrapper,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{ensure, OptionExt, ResultExt};
33use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
34use store_api::manifest::ManifestVersion;
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
37use store_api::region_request::{
38    AffectedRows, RegionAlterRequest, RegionBulkInsertsRequest, RegionCatchupRequest,
39    RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest,
40    RegionOpenRequest, RegionRequest, RegionTruncateRequest,
41};
42use store_api::storage::{RegionId, SequenceNumber};
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46    CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47    FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedImpureDefaultSnafu,
48};
49use crate::manifest::action::RegionEdit;
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::MemtableId;
52use crate::metrics::COMPACTION_ELAPSED_TOTAL;
53use crate::wal::entry_distributor::WalEntryReceiver;
54use crate::wal::EntryId;
55
/// Request to write a region.
///
/// Instances are built and validated by [`WriteRequest::new`], which derives
/// `name_to_index` and `has_null` from `rows`.
#[derive(Debug)]
pub struct WriteRequest {
    /// Region to write.
    pub region_id: RegionId,
    /// Type of the write request.
    pub op_type: OpType,
    /// Rows to write.
    pub rows: Rows,
    /// Map column name to column index in `rows`.
    pub name_to_index: HashMap<String, usize>,
    /// Whether each column has null, indexed by the column's position in
    /// `rows.schema`. An entry is `true` when at least one row carries no
    /// value data for that column.
    pub has_null: Vec<bool>,
    /// Write hint. Initialized to `None` by [`WriteRequest::new`].
    pub hint: Option<WriteHint>,
    /// Region metadata at the time this request was created.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
}
74
75impl WriteRequest {
76    /// Creates a new request.
77    ///
78    /// Returns `Err` if `rows` are invalid.
79    pub fn new(
80        region_id: RegionId,
81        op_type: OpType,
82        rows: Rows,
83        region_metadata: Option<RegionMetadataRef>,
84    ) -> Result<WriteRequest> {
85        let mut name_to_index = HashMap::with_capacity(rows.schema.len());
86        for (index, column) in rows.schema.iter().enumerate() {
87            ensure!(
88                name_to_index
89                    .insert(column.column_name.clone(), index)
90                    .is_none(),
91                InvalidRequestSnafu {
92                    region_id,
93                    reason: format!("duplicate column {}", column.column_name),
94                }
95            );
96        }
97
98        let mut has_null = vec![false; rows.schema.len()];
99        for row in &rows.rows {
100            ensure!(
101                row.values.len() == rows.schema.len(),
102                InvalidRequestSnafu {
103                    region_id,
104                    reason: format!(
105                        "row has {} columns but schema has {}",
106                        row.values.len(),
107                        rows.schema.len()
108                    ),
109                }
110            );
111
112            for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
113                validate_proto_value(region_id, value, column_schema)?;
114
115                if value.value_data.is_none() {
116                    has_null[i] = true;
117                }
118            }
119        }
120
121        Ok(WriteRequest {
122            region_id,
123            op_type,
124            rows,
125            name_to_index,
126            has_null,
127            hint: None,
128            region_metadata,
129        })
130    }
131
132    /// Sets the write hint.
133    pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
134        self.hint = hint;
135        self
136    }
137
138    /// Returns the encoding hint.
139    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
140        infer_primary_key_encoding_from_hint(self.hint.as_ref())
141    }
142
143    /// Returns estimated size of the request.
144    pub(crate) fn estimated_size(&self) -> usize {
145        let row_size = self
146            .rows
147            .rows
148            .first()
149            .map(|row| row.encoded_len())
150            .unwrap_or(0);
151        row_size * self.rows.rows.len()
152    }
153
154    /// Gets column index by name.
155    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
156        self.name_to_index.get(name).copied()
157    }
158
159    /// Checks schema of rows is compatible with schema of the region.
160    ///
161    /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
162    /// error.
163    pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
164        debug_assert_eq!(self.region_id, metadata.region_id);
165
166        let region_id = self.region_id;
167        // Index all columns in rows.
168        let mut rows_columns: HashMap<_, _> = self
169            .rows
170            .schema
171            .iter()
172            .map(|column| (&column.column_name, column))
173            .collect();
174
175        let mut need_fill_default = false;
176        // Checks all columns in this region.
177        for column in &metadata.column_metadatas {
178            if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
179                // Check data type.
180                ensure!(
181                    is_column_type_value_eq(
182                        input_col.datatype,
183                        input_col.datatype_extension,
184                        &column.column_schema.data_type
185                    ),
186                    InvalidRequestSnafu {
187                        region_id,
188                        reason: format!(
189                            "column {} expect type {:?}, given: {}({})",
190                            column.column_schema.name,
191                            column.column_schema.data_type,
192                            ColumnDataType::try_from(input_col.datatype)
193                                .map(|v| v.as_str_name())
194                                .unwrap_or("Unknown"),
195                            input_col.datatype,
196                        )
197                    }
198                );
199
200                // Check semantic type.
201                ensure!(
202                    is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
203                    InvalidRequestSnafu {
204                        region_id,
205                        reason: format!(
206                            "column {} has semantic type {:?}, given: {}({})",
207                            column.column_schema.name,
208                            column.semantic_type,
209                            api::v1::SemanticType::try_from(input_col.semantic_type)
210                                .map(|v| v.as_str_name())
211                                .unwrap_or("Unknown"),
212                            input_col.semantic_type
213                        ),
214                    }
215                );
216
217                // Check nullable.
218                // Safety: `rows_columns` ensures this column exists.
219                let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
220                ensure!(
221                    !has_null || column.column_schema.is_nullable(),
222                    InvalidRequestSnafu {
223                        region_id,
224                        reason: format!(
225                            "column {} is not null but input has null",
226                            column.column_schema.name
227                        ),
228                    }
229                );
230            } else {
231                // Rows don't have this column.
232                self.check_missing_column(column)?;
233
234                need_fill_default = true;
235            }
236        }
237
238        // Checks all columns in rows exist in the region.
239        if !rows_columns.is_empty() {
240            let names: Vec<_> = rows_columns.into_keys().collect();
241            return InvalidRequestSnafu {
242                region_id,
243                reason: format!("unknown columns: {:?}", names),
244            }
245            .fail();
246        }
247
248        // If we need to fill default values, return a special error.
249        ensure!(!need_fill_default, FillDefaultSnafu { region_id });
250
251        Ok(())
252    }
253
254    /// Tries to fill missing columns.
255    ///
256    /// Currently, our protobuf format might be inefficient when we need to fill lots of null
257    /// values.
258    pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
259        debug_assert_eq!(self.region_id, metadata.region_id);
260
261        let mut columns_to_fill = vec![];
262        for column in &metadata.column_metadatas {
263            if !self.name_to_index.contains_key(&column.column_schema.name) {
264                columns_to_fill.push(column);
265            }
266        }
267        self.fill_columns(columns_to_fill)?;
268
269        Ok(())
270    }
271
272    /// Checks the schema and fill missing columns.
273    pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
274        if let Err(e) = self.check_schema(metadata) {
275            if e.is_fill_default() {
276                // TODO(yingwen): Add metrics for this case.
277                // We need to fill default value. The write request may be a request
278                // sent before changing the schema.
279                self.fill_missing_columns(metadata)?;
280            } else {
281                return Err(e);
282            }
283        }
284
285        Ok(())
286    }
287
288    /// Fills default value for specific `columns`.
289    fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
290        let mut default_values = Vec::with_capacity(columns.len());
291        let mut columns_to_fill = Vec::with_capacity(columns.len());
292        for column in columns {
293            let default_value = self.column_default_value(column)?;
294            if default_value.value_data.is_some() {
295                default_values.push(default_value);
296                columns_to_fill.push(column);
297            }
298        }
299
300        for row in &mut self.rows.rows {
301            row.values.extend(default_values.iter().cloned());
302        }
303
304        for column in columns_to_fill {
305            let (datatype, datatype_ext) =
306                ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
307                    .with_context(|_| ConvertColumnDataTypeSnafu {
308                        reason: format!(
309                            "no protobuf type for column {} ({:?})",
310                            column.column_schema.name, column.column_schema.data_type
311                        ),
312                    })?
313                    .to_parts();
314            self.rows.schema.push(ColumnSchema {
315                column_name: column.column_schema.name.clone(),
316                datatype: datatype as i32,
317                semantic_type: column.semantic_type as i32,
318                datatype_extension: datatype_ext,
319                options: options_from_column_schema(&column.column_schema),
320            });
321        }
322
323        Ok(())
324    }
325
326    /// Checks whether we should allow a row doesn't provide this column.
327    fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
328        if self.op_type == OpType::Delete {
329            if column.semantic_type == SemanticType::Field {
330                // For delete request, all tags and timestamp is required. We don't fill default
331                // tag or timestamp while deleting rows.
332                return Ok(());
333            } else {
334                return InvalidRequestSnafu {
335                    region_id: self.region_id,
336                    reason: format!("delete requests need column {}", column.column_schema.name),
337                }
338                .fail();
339            }
340        }
341
342        // Not a delete request. Checks whether they have default value.
343        ensure!(
344            column.column_schema.is_nullable()
345                || column.column_schema.default_constraint().is_some(),
346            InvalidRequestSnafu {
347                region_id: self.region_id,
348                reason: format!("missing column {}", column.column_schema.name),
349            }
350        );
351
352        Ok(())
353    }
354
355    /// Returns the default value for specific column.
356    fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
357        let default_value = match self.op_type {
358            OpType::Delete => {
359                ensure!(
360                    column.semantic_type == SemanticType::Field,
361                    InvalidRequestSnafu {
362                        region_id: self.region_id,
363                        reason: format!(
364                            "delete requests need column {}",
365                            column.column_schema.name
366                        ),
367                    }
368                );
369
370                // For delete request, we need a default value for padding so we
371                // can delete a row even a field doesn't have a default value. So the
372                // value doesn't need to following the default value constraint of the
373                // column.
374                if column.column_schema.is_nullable() {
375                    datatypes::value::Value::Null
376                } else {
377                    column.column_schema.data_type.default_value()
378                }
379            }
380            OpType::Put => {
381                // For put requests, we use the default value from column schema.
382                if column.column_schema.is_default_impure() {
383                    UnexpectedImpureDefaultSnafu {
384                        region_id: self.region_id,
385                        column: &column.column_schema.name,
386                        default_value: format!("{:?}", column.column_schema.default_constraint()),
387                    }
388                    .fail()?
389                }
390                column
391                    .column_schema
392                    .create_default()
393                    .context(CreateDefaultSnafu {
394                        region_id: self.region_id,
395                        column: &column.column_schema.name,
396                    })?
397                    // This column doesn't have default value.
398                    .with_context(|| InvalidRequestSnafu {
399                        region_id: self.region_id,
400                        reason: format!(
401                            "column {} does not have default value",
402                            column.column_schema.name
403                        ),
404                    })?
405            }
406        };
407
408        // Convert default value into proto's value.
409        to_proto_value(default_value).with_context(|| InvalidRequestSnafu {
410            region_id: self.region_id,
411            reason: format!(
412                "no protobuf type for default value of column {} ({:?})",
413                column.column_schema.name, column.column_schema.data_type
414            ),
415        })
416    }
417}
418
419/// Validate proto value schema.
420pub(crate) fn validate_proto_value(
421    region_id: RegionId,
422    value: &Value,
423    column_schema: &ColumnSchema,
424) -> Result<()> {
425    if let Some(value_type) = proto_value_type(value) {
426        let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
427            InvalidRequestSnafu {
428                region_id,
429                reason: format!(
430                    "column {} has unknown type {}",
431                    column_schema.column_name, column_schema.datatype
432                ),
433            }
434            .build()
435        })?;
436        ensure!(
437            proto_value_type_match(column_type, value_type),
438            InvalidRequestSnafu {
439                region_id,
440                reason: format!(
441                    "value has type {:?}, but column {} has type {:?}({})",
442                    value_type, column_schema.column_name, column_type, column_schema.datatype,
443                ),
444            }
445        );
446    }
447
448    Ok(())
449}
450
451fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
452    match (column_type, value_type) {
453        (ct, vt) if ct == vt => true,
454        (ColumnDataType::Vector, ColumnDataType::Binary) => true,
455        (ColumnDataType::Json, ColumnDataType::Binary) => true,
456        _ => false,
457    }
458}
459
460/// Oneshot output result sender.
461#[derive(Debug)]
462pub struct OutputTx(Sender<Result<AffectedRows>>);
463
464impl OutputTx {
465    /// Creates a new output sender.
466    pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
467        OutputTx(sender)
468    }
469
470    /// Sends the `result`.
471    pub(crate) fn send(self, result: Result<AffectedRows>) {
472        // Ignores send result.
473        let _ = self.0.send(result);
474    }
475}
476
477/// Optional output result sender.
478#[derive(Debug)]
479pub(crate) struct OptionOutputTx(Option<OutputTx>);
480
481impl OptionOutputTx {
482    /// Creates a sender.
483    pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
484        OptionOutputTx(sender)
485    }
486
487    /// Creates an empty sender.
488    pub(crate) fn none() -> OptionOutputTx {
489        OptionOutputTx(None)
490    }
491
492    /// Sends the `result` and consumes the inner sender.
493    pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
494        if let Some(sender) = self.0.take() {
495            sender.send(result);
496        }
497    }
498
499    /// Sends the `result` and consumes the sender.
500    pub(crate) fn send(mut self, result: Result<AffectedRows>) {
501        if let Some(sender) = self.0.take() {
502            sender.send(result);
503        }
504    }
505
506    /// Takes the inner sender.
507    pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
508        self.0.take()
509    }
510}
511
512impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
513    fn from(sender: Sender<Result<AffectedRows>>) -> Self {
514        Self::new(Some(OutputTx::new(sender)))
515    }
516}
517
518impl OnFailure for OptionOutputTx {
519    fn on_failure(&mut self, err: Error) {
520        self.send_mut(Err(err));
521    }
522}
523
/// Callback on failure.
///
/// Implementors in this file forward the error to the result sender(s) they
/// hold.
pub(crate) trait OnFailure {
    /// Handles `err` on failure.
    ///
    /// Takes `&mut self` so implementors can consume the senders they own.
    fn on_failure(&mut self, err: Error);
}
529
/// Sender and write request.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// The write request whose result is reported through `sender`.
    pub(crate) request: WriteRequest,
}
537
/// Sender and bulk request.
// NOTE(review): unlike the sibling request types this one does not derive
// `Debug` — presumably because `BulkPart` does not implement it; confirm.
pub(crate) struct SenderBulkRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Region the bulk part targets.
    pub(crate) region_id: RegionId,
    /// The bulk part carried by this request.
    pub(crate) request: BulkPart,
    /// Region metadata associated with the request.
    pub(crate) region_metadata: RegionMetadataRef,
}
544
/// Request sent to a worker.
#[derive(Debug)]
pub(crate) enum WorkerRequest {
    /// Write to a region.
    Write(SenderWriteRequest),

    /// Ddl request to a region.
    Ddl(SenderDdlRequest),

    /// Notifications from internal background jobs.
    Background {
        /// Id of the region to send.
        region_id: RegionId,
        /// Internal notification.
        notify: BackgroundNotify,
    },

    /// The internal commands.
    SetRegionRoleStateGracefully {
        /// Id of the region to send.
        region_id: RegionId,
        /// The [SettableRegionRoleState].
        region_role_state: SettableRegionRoleState,
        /// The sender of [SetRegionRoleStateResponse].
        sender: Sender<SetRegionRoleStateResponse>,
    },

    /// Notify a worker to stop.
    Stop,

    /// Use [RegionEdit] to edit a region directly.
    EditRegion(RegionEditRequest),

    /// Keep the manifest of a region up to date.
    SyncRegion(RegionSyncRequest),

    /// Bulk inserts request and region metadata.
    BulkInserts {
        /// Region metadata supplied by the caller when the request was
        /// converted, if any.
        metadata: Option<RegionMetadataRef>,
        /// The bulk inserts request.
        request: RegionBulkInsertsRequest,
        /// Result sender.
        sender: OptionOutputTx,
    },
}
588
589impl WorkerRequest {
590    pub(crate) fn new_open_region_request(
591        region_id: RegionId,
592        request: RegionOpenRequest,
593        entry_receiver: Option<WalEntryReceiver>,
594    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
595        let (sender, receiver) = oneshot::channel();
596
597        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
598            region_id,
599            sender: sender.into(),
600            request: DdlRequest::Open((request, entry_receiver)),
601        });
602
603        (worker_request, receiver)
604    }
605
606    /// Converts request from a [RegionRequest].
607    pub(crate) fn try_from_region_request(
608        region_id: RegionId,
609        value: RegionRequest,
610        region_metadata: Option<RegionMetadataRef>,
611    ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
612        let (sender, receiver) = oneshot::channel();
613        let worker_request = match value {
614            RegionRequest::Put(v) => {
615                let mut write_request =
616                    WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
617                        .with_hint(v.hint);
618                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
619                    && let Some(region_metadata) = &region_metadata
620                {
621                    write_request.maybe_fill_missing_columns(region_metadata)?;
622                }
623                WorkerRequest::Write(SenderWriteRequest {
624                    sender: sender.into(),
625                    request: write_request,
626                })
627            }
628            RegionRequest::Delete(v) => {
629                let mut write_request =
630                    WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?;
631                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
632                    && let Some(region_metadata) = &region_metadata
633                {
634                    write_request.maybe_fill_missing_columns(region_metadata)?;
635                }
636                WorkerRequest::Write(SenderWriteRequest {
637                    sender: sender.into(),
638                    request: write_request,
639                })
640            }
641            RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
642                region_id,
643                sender: sender.into(),
644                request: DdlRequest::Create(v),
645            }),
646            RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
647                region_id,
648                sender: sender.into(),
649                request: DdlRequest::Drop,
650            }),
651            RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
652                region_id,
653                sender: sender.into(),
654                request: DdlRequest::Open((v, None)),
655            }),
656            RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
657                region_id,
658                sender: sender.into(),
659                request: DdlRequest::Close(v),
660            }),
661            RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
662                region_id,
663                sender: sender.into(),
664                request: DdlRequest::Alter(v),
665            }),
666            RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
667                region_id,
668                sender: sender.into(),
669                request: DdlRequest::Flush(v),
670            }),
671            RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
672                region_id,
673                sender: sender.into(),
674                request: DdlRequest::Compact(v),
675            }),
676            RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
677                region_id,
678                sender: sender.into(),
679                request: DdlRequest::Truncate(v),
680            }),
681            RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
682                region_id,
683                sender: sender.into(),
684                request: DdlRequest::Catchup(v),
685            }),
686            RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
687                metadata: region_metadata,
688                sender: sender.into(),
689                request: region_bulk_inserts_request,
690            },
691        };
692
693        Ok((worker_request, receiver))
694    }
695
696    pub(crate) fn new_set_readonly_gracefully(
697        region_id: RegionId,
698        region_role_state: SettableRegionRoleState,
699    ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
700        let (sender, receiver) = oneshot::channel();
701
702        (
703            WorkerRequest::SetRegionRoleStateGracefully {
704                region_id,
705                region_role_state,
706                sender,
707            },
708            receiver,
709        )
710    }
711
712    pub(crate) fn new_sync_region_request(
713        region_id: RegionId,
714        manifest_version: ManifestVersion,
715    ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
716        let (sender, receiver) = oneshot::channel();
717        (
718            WorkerRequest::SyncRegion(RegionSyncRequest {
719                region_id,
720                manifest_version,
721                sender,
722            }),
723            receiver,
724        )
725    }
726}
727
/// DDL request to a region.
///
/// Each variant corresponds to the `RegionRequest` variant of the same name.
#[derive(Debug)]
pub(crate) enum DdlRequest {
    /// Create a region.
    Create(RegionCreateRequest),
    /// Drop a region. Carries no payload.
    Drop,
    /// Open a region, with an optional WAL entry receiver.
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    /// Close a region.
    Close(RegionCloseRequest),
    /// Alter a region.
    Alter(RegionAlterRequest),
    /// Flush a region.
    Flush(RegionFlushRequest),
    /// Compact a region.
    Compact(RegionCompactRequest),
    /// Truncate a region.
    Truncate(RegionTruncateRequest),
    /// Catch up a region.
    Catchup(RegionCatchupRequest),
}
741
/// Sender and Ddl request.
///
/// This is the payload of `WorkerRequest::Ddl`.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Region id of the request.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Ddl request.
    pub(crate) request: DdlRequest,
}
752
/// Notification from a background job.
///
/// Carried to a worker inside `WorkerRequest::Background` together with the
/// id of the region it concerns.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// Flush has finished.
    FlushFinished(FlushFinished),
    /// Flush has failed.
    FlushFailed(FlushFailed),
    /// Compaction has finished.
    CompactionFinished(CompactionFinished),
    /// Compaction has failed.
    CompactionFailed(CompactionFailed),
    /// Truncate result.
    Truncate(TruncateResult),
    /// Region change result.
    RegionChange(RegionChangeResult),
    /// Region edit result.
    RegionEdit(RegionEditResult),
}
771
772/// Notifies a flush job is finished.
773#[derive(Debug)]
774pub(crate) struct FlushFinished {
775    /// Region id.
776    pub(crate) region_id: RegionId,
777    /// Entry id of flushed data.
778    pub(crate) flushed_entry_id: EntryId,
779    /// Flush result senders.
780    pub(crate) senders: Vec<OutputTx>,
781    /// Flush timer.
782    pub(crate) _timer: HistogramTimer,
783    /// Region edit to apply.
784    pub(crate) edit: RegionEdit,
785    /// Memtables to remove.
786    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
787}
788
789impl FlushFinished {
790    /// Marks the flush job as successful and observes the timer.
791    pub(crate) fn on_success(self) {
792        for sender in self.senders {
793            sender.send(Ok(0));
794        }
795    }
796}
797
798impl OnFailure for FlushFinished {
799    fn on_failure(&mut self, err: Error) {
800        let err = Arc::new(err);
801        for sender in self.senders.drain(..) {
802            sender.send(Err(err.clone()).context(FlushRegionSnafu {
803                region_id: self.region_id,
804            }));
805        }
806    }
807}
808
/// Notifies that a flush job has failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The error source of the failure, reference counted so it can be shared.
    pub(crate) err: Arc<Error>,
}
815
816/// Notifies a compaction job has finished.
817#[derive(Debug)]
818pub(crate) struct CompactionFinished {
819    /// Region id.
820    pub(crate) region_id: RegionId,
821    /// Compaction result senders.
822    pub(crate) senders: Vec<OutputTx>,
823    /// Start time of compaction task.
824    pub(crate) start_time: Instant,
825    /// Region edit to apply.
826    pub(crate) edit: RegionEdit,
827}
828
829impl CompactionFinished {
830    pub fn on_success(self) {
831        // only update compaction time on success
832        COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
833
834        for sender in self.senders {
835            sender.send(Ok(0));
836        }
837        info!("Successfully compacted region: {}", self.region_id);
838    }
839}
840
841impl OnFailure for CompactionFinished {
842    /// Compaction succeeded but failed to update manifest or region's already been dropped.
843    fn on_failure(&mut self, err: Error) {
844        let err = Arc::new(err);
845        for sender in self.senders.drain(..) {
846            sender.send(Err(err.clone()).context(CompactRegionSnafu {
847                region_id: self.region_id,
848            }));
849        }
850    }
851}
852
/// A failing compaction result.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
860
/// Notifies the truncate result of a region.
///
/// Delivered as `BackgroundNotify::Truncate`.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Truncate result.
    pub(crate) result: Result<()>,
    /// Truncated entry id.
    pub(crate) truncated_entry_id: EntryId,
    /// Truncated sequence.
    pub(crate) truncated_sequence: SequenceNumber,
}
875
/// Notifies the region of the result of writing a region change action.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Id of the changed region.
    pub(crate) region_id: RegionId,
    /// The new region metadata to apply.
    pub(crate) new_meta: RegionMetadataRef,
    /// Sender used to deliver the result.
    pub(crate) sender: OptionOutputTx,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
}
888
/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Id of the region to edit.
    pub(crate) region_id: RegionId,
    /// The edit carried by this request.
    pub(crate) edit: RegionEdit,
    /// The sender to notify the result to the region engine.
    pub(crate) tx: Sender<Result<()>>,
}
897
/// Notifies the region of the result of editing a region.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Id of the edited region.
    pub(crate) region_id: RegionId,
    /// Sender used to deliver the result.
    pub(crate) sender: Sender<Result<()>>,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
}
910
/// Request to sync a region.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Id of the region to sync.
    pub(crate) region_id: RegionId,
    /// Manifest version carried by the request.
    // NOTE(review): presumably the version the region should be synced to; confirm against the handler.
    pub(crate) manifest_version: ManifestVersion,
    /// Returns the latest manifest version and a boolean indicating whether a new manifest is installed.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
918
919#[cfg(test)]
920mod tests {
921    use api::v1::value::ValueData;
922    use api::v1::{Row, SemanticType};
923    use datatypes::prelude::ConcreteDataType;
924    use datatypes::schema::ColumnDefaultConstraint;
925    use store_api::metadata::RegionMetadataBuilder;
926
927    use super::*;
928    use crate::error::Error;
929    use crate::test_util::{i64_value, ts_ms_value};
930
931    fn new_column_schema(
932        name: &str,
933        data_type: ColumnDataType,
934        semantic_type: SemanticType,
935    ) -> ColumnSchema {
936        ColumnSchema {
937            column_name: name.to_string(),
938            datatype: data_type as i32,
939            semantic_type: semantic_type as i32,
940            ..Default::default()
941        }
942    }
943
944    fn check_invalid_request(err: &Error, expect: &str) {
945        if let Error::InvalidRequest {
946            region_id: _,
947            reason,
948            location: _,
949        } = err
950        {
951            assert_eq!(reason, expect);
952        } else {
953            panic!("Unexpected error {err}")
954        }
955    }
956
957    #[test]
958    fn test_write_request_duplicate_column() {
959        let rows = Rows {
960            schema: vec![
961                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
962                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
963            ],
964            rows: vec![],
965        };
966
967        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
968        check_invalid_request(&err, "duplicate column c0");
969    }
970
971    #[test]
972    fn test_valid_write_request() {
973        let rows = Rows {
974            schema: vec![
975                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
976                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
977            ],
978            rows: vec![Row {
979                values: vec![i64_value(1), i64_value(2)],
980            }],
981        };
982
983        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
984        assert_eq!(0, request.column_index_by_name("c0").unwrap());
985        assert_eq!(1, request.column_index_by_name("c1").unwrap());
986        assert_eq!(None, request.column_index_by_name("c2"));
987    }
988
989    #[test]
990    fn test_write_request_column_num() {
991        let rows = Rows {
992            schema: vec![
993                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
994                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
995            ],
996            rows: vec![Row {
997                values: vec![i64_value(1), i64_value(2), i64_value(3)],
998            }],
999        };
1000
1001        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1002        check_invalid_request(&err, "row has 3 columns but schema has 2");
1003    }
1004
1005    fn new_region_metadata() -> RegionMetadata {
1006        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1007        builder
1008            .push_column_metadata(ColumnMetadata {
1009                column_schema: datatypes::schema::ColumnSchema::new(
1010                    "ts",
1011                    ConcreteDataType::timestamp_millisecond_datatype(),
1012                    false,
1013                ),
1014                semantic_type: SemanticType::Timestamp,
1015                column_id: 1,
1016            })
1017            .push_column_metadata(ColumnMetadata {
1018                column_schema: datatypes::schema::ColumnSchema::new(
1019                    "k0",
1020                    ConcreteDataType::int64_datatype(),
1021                    true,
1022                ),
1023                semantic_type: SemanticType::Tag,
1024                column_id: 2,
1025            })
1026            .primary_key(vec![2]);
1027        builder.build().unwrap()
1028    }
1029
1030    #[test]
1031    fn test_check_schema() {
1032        let rows = Rows {
1033            schema: vec![
1034                new_column_schema(
1035                    "ts",
1036                    ColumnDataType::TimestampMillisecond,
1037                    SemanticType::Timestamp,
1038                ),
1039                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1040            ],
1041            rows: vec![Row {
1042                values: vec![ts_ms_value(1), i64_value(2)],
1043            }],
1044        };
1045        let metadata = new_region_metadata();
1046
1047        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1048        request.check_schema(&metadata).unwrap();
1049    }
1050
1051    #[test]
1052    fn test_column_type() {
1053        let rows = Rows {
1054            schema: vec![
1055                new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1056                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1057            ],
1058            rows: vec![Row {
1059                values: vec![i64_value(1), i64_value(2)],
1060            }],
1061        };
1062        let metadata = new_region_metadata();
1063
1064        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1065        let err = request.check_schema(&metadata).unwrap_err();
1066        check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)");
1067    }
1068
1069    #[test]
1070    fn test_semantic_type() {
1071        let rows = Rows {
1072            schema: vec![
1073                new_column_schema(
1074                    "ts",
1075                    ColumnDataType::TimestampMillisecond,
1076                    SemanticType::Tag,
1077                ),
1078                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1079            ],
1080            rows: vec![Row {
1081                values: vec![ts_ms_value(1), i64_value(2)],
1082            }],
1083        };
1084        let metadata = new_region_metadata();
1085
1086        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1087        let err = request.check_schema(&metadata).unwrap_err();
1088        check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1089    }
1090
1091    #[test]
1092    fn test_column_nullable() {
1093        let rows = Rows {
1094            schema: vec![
1095                new_column_schema(
1096                    "ts",
1097                    ColumnDataType::TimestampMillisecond,
1098                    SemanticType::Timestamp,
1099                ),
1100                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1101            ],
1102            rows: vec![Row {
1103                values: vec![Value { value_data: None }, i64_value(2)],
1104            }],
1105        };
1106        let metadata = new_region_metadata();
1107
1108        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1109        let err = request.check_schema(&metadata).unwrap_err();
1110        check_invalid_request(&err, "column ts is not null but input has null");
1111    }
1112
1113    #[test]
1114    fn test_column_default() {
1115        let rows = Rows {
1116            schema: vec![new_column_schema(
1117                "k0",
1118                ColumnDataType::Int64,
1119                SemanticType::Tag,
1120            )],
1121            rows: vec![Row {
1122                values: vec![i64_value(1)],
1123            }],
1124        };
1125        let metadata = new_region_metadata();
1126
1127        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1128        let err = request.check_schema(&metadata).unwrap_err();
1129        check_invalid_request(&err, "missing column ts");
1130    }
1131
1132    #[test]
1133    fn test_unknown_column() {
1134        let rows = Rows {
1135            schema: vec![
1136                new_column_schema(
1137                    "ts",
1138                    ColumnDataType::TimestampMillisecond,
1139                    SemanticType::Timestamp,
1140                ),
1141                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1142                new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1143            ],
1144            rows: vec![Row {
1145                values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1146            }],
1147        };
1148        let metadata = new_region_metadata();
1149
1150        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1151        let err = request.check_schema(&metadata).unwrap_err();
1152        check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1153    }
1154
1155    #[test]
1156    fn test_fill_impure_columns_err() {
1157        let rows = Rows {
1158            schema: vec![new_column_schema(
1159                "k0",
1160                ColumnDataType::Int64,
1161                SemanticType::Tag,
1162            )],
1163            rows: vec![Row {
1164                values: vec![i64_value(1)],
1165            }],
1166        };
1167        let metadata = {
1168            let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1169            builder
1170                .push_column_metadata(ColumnMetadata {
1171                    column_schema: datatypes::schema::ColumnSchema::new(
1172                        "ts",
1173                        ConcreteDataType::timestamp_millisecond_datatype(),
1174                        false,
1175                    )
1176                    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1177                        "now()".to_string(),
1178                    )))
1179                    .unwrap(),
1180                    semantic_type: SemanticType::Timestamp,
1181                    column_id: 1,
1182                })
1183                .push_column_metadata(ColumnMetadata {
1184                    column_schema: datatypes::schema::ColumnSchema::new(
1185                        "k0",
1186                        ConcreteDataType::int64_datatype(),
1187                        true,
1188                    ),
1189                    semantic_type: SemanticType::Tag,
1190                    column_id: 2,
1191                })
1192                .primary_key(vec![2]);
1193            builder.build().unwrap()
1194        };
1195
1196        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1197        let err = request.check_schema(&metadata).unwrap_err();
1198        assert!(err.is_fill_default());
1199        assert!(request
1200            .fill_missing_columns(&metadata)
1201            .unwrap_err()
1202            .to_string()
1203            .contains("Unexpected impure default value with region_id"));
1204    }
1205
1206    #[test]
1207    fn test_fill_missing_columns() {
1208        let rows = Rows {
1209            schema: vec![new_column_schema(
1210                "ts",
1211                ColumnDataType::TimestampMillisecond,
1212                SemanticType::Timestamp,
1213            )],
1214            rows: vec![Row {
1215                values: vec![ts_ms_value(1)],
1216            }],
1217        };
1218        let metadata = new_region_metadata();
1219
1220        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1221        let err = request.check_schema(&metadata).unwrap_err();
1222        assert!(err.is_fill_default());
1223        request.fill_missing_columns(&metadata).unwrap();
1224
1225        let expect_rows = Rows {
1226            schema: vec![new_column_schema(
1227                "ts",
1228                ColumnDataType::TimestampMillisecond,
1229                SemanticType::Timestamp,
1230            )],
1231            rows: vec![Row {
1232                values: vec![ts_ms_value(1)],
1233            }],
1234        };
1235        assert_eq!(expect_rows, request.rows);
1236    }
1237
1238    fn builder_with_ts_tag() -> RegionMetadataBuilder {
1239        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1240        builder
1241            .push_column_metadata(ColumnMetadata {
1242                column_schema: datatypes::schema::ColumnSchema::new(
1243                    "ts",
1244                    ConcreteDataType::timestamp_millisecond_datatype(),
1245                    false,
1246                ),
1247                semantic_type: SemanticType::Timestamp,
1248                column_id: 1,
1249            })
1250            .push_column_metadata(ColumnMetadata {
1251                column_schema: datatypes::schema::ColumnSchema::new(
1252                    "k0",
1253                    ConcreteDataType::int64_datatype(),
1254                    true,
1255                ),
1256                semantic_type: SemanticType::Tag,
1257                column_id: 2,
1258            })
1259            .primary_key(vec![2]);
1260        builder
1261    }
1262
1263    fn region_metadata_two_fields() -> RegionMetadata {
1264        let mut builder = builder_with_ts_tag();
1265        builder
1266            .push_column_metadata(ColumnMetadata {
1267                column_schema: datatypes::schema::ColumnSchema::new(
1268                    "f0",
1269                    ConcreteDataType::int64_datatype(),
1270                    true,
1271                ),
1272                semantic_type: SemanticType::Field,
1273                column_id: 3,
1274            })
1275            // Column is not nullable.
1276            .push_column_metadata(ColumnMetadata {
1277                column_schema: datatypes::schema::ColumnSchema::new(
1278                    "f1",
1279                    ConcreteDataType::int64_datatype(),
1280                    false,
1281                )
1282                .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1283                    datatypes::value::Value::Int64(100),
1284                )))
1285                .unwrap(),
1286                semantic_type: SemanticType::Field,
1287                column_id: 4,
1288            });
1289        builder.build().unwrap()
1290    }
1291
1292    #[test]
1293    fn test_fill_missing_for_delete() {
1294        let rows = Rows {
1295            schema: vec![new_column_schema(
1296                "ts",
1297                ColumnDataType::TimestampMillisecond,
1298                SemanticType::Timestamp,
1299            )],
1300            rows: vec![Row {
1301                values: vec![ts_ms_value(1)],
1302            }],
1303        };
1304        let metadata = region_metadata_two_fields();
1305
1306        let mut request =
1307            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1308        let err = request.check_schema(&metadata).unwrap_err();
1309        check_invalid_request(&err, "delete requests need column k0");
1310        let err = request.fill_missing_columns(&metadata).unwrap_err();
1311        check_invalid_request(&err, "delete requests need column k0");
1312
1313        let rows = Rows {
1314            schema: vec![
1315                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1316                new_column_schema(
1317                    "ts",
1318                    ColumnDataType::TimestampMillisecond,
1319                    SemanticType::Timestamp,
1320                ),
1321            ],
1322            rows: vec![Row {
1323                values: vec![i64_value(100), ts_ms_value(1)],
1324            }],
1325        };
1326        let mut request =
1327            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1328        let err = request.check_schema(&metadata).unwrap_err();
1329        assert!(err.is_fill_default());
1330        request.fill_missing_columns(&metadata).unwrap();
1331
1332        let expect_rows = Rows {
1333            schema: vec![
1334                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1335                new_column_schema(
1336                    "ts",
1337                    ColumnDataType::TimestampMillisecond,
1338                    SemanticType::Timestamp,
1339                ),
1340                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1341            ],
1342            // Column f1 is not nullable and we use 0 for padding.
1343            rows: vec![Row {
1344                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1345            }],
1346        };
1347        assert_eq!(expect_rows, request.rows);
1348    }
1349
1350    #[test]
1351    fn test_fill_missing_without_default_in_delete() {
1352        let mut builder = builder_with_ts_tag();
1353        builder
1354            // f0 is nullable.
1355            .push_column_metadata(ColumnMetadata {
1356                column_schema: datatypes::schema::ColumnSchema::new(
1357                    "f0",
1358                    ConcreteDataType::int64_datatype(),
1359                    true,
1360                ),
1361                semantic_type: SemanticType::Field,
1362                column_id: 3,
1363            })
1364            // f1 is not nullable and don't has default.
1365            .push_column_metadata(ColumnMetadata {
1366                column_schema: datatypes::schema::ColumnSchema::new(
1367                    "f1",
1368                    ConcreteDataType::int64_datatype(),
1369                    false,
1370                ),
1371                semantic_type: SemanticType::Field,
1372                column_id: 4,
1373            });
1374        let metadata = builder.build().unwrap();
1375
1376        let rows = Rows {
1377            schema: vec![
1378                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1379                new_column_schema(
1380                    "ts",
1381                    ColumnDataType::TimestampMillisecond,
1382                    SemanticType::Timestamp,
1383                ),
1384            ],
1385            // Missing f0 (nullable), f1 (not nullable).
1386            rows: vec![Row {
1387                values: vec![i64_value(100), ts_ms_value(1)],
1388            }],
1389        };
1390        let mut request =
1391            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1392        let err = request.check_schema(&metadata).unwrap_err();
1393        assert!(err.is_fill_default());
1394        request.fill_missing_columns(&metadata).unwrap();
1395
1396        let expect_rows = Rows {
1397            schema: vec![
1398                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1399                new_column_schema(
1400                    "ts",
1401                    ColumnDataType::TimestampMillisecond,
1402                    SemanticType::Timestamp,
1403                ),
1404                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1405            ],
1406            // Column f1 is not nullable and we use 0 for padding.
1407            rows: vec![Row {
1408                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1409            }],
1410        };
1411        assert_eq!(expect_rows, request.rows);
1412    }
1413
1414    #[test]
1415    fn test_no_default() {
1416        let rows = Rows {
1417            schema: vec![new_column_schema(
1418                "k0",
1419                ColumnDataType::Int64,
1420                SemanticType::Tag,
1421            )],
1422            rows: vec![Row {
1423                values: vec![i64_value(1)],
1424            }],
1425        };
1426        let metadata = new_region_metadata();
1427
1428        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1429        let err = request.fill_missing_columns(&metadata).unwrap_err();
1430        check_invalid_request(&err, "column ts does not have default value");
1431    }
1432
1433    #[test]
1434    fn test_missing_and_invalid() {
1435        // Missing f0 and f1 has invalid type (string).
1436        let rows = Rows {
1437            schema: vec![
1438                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1439                new_column_schema(
1440                    "ts",
1441                    ColumnDataType::TimestampMillisecond,
1442                    SemanticType::Timestamp,
1443                ),
1444                new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
1445            ],
1446            rows: vec![Row {
1447                values: vec![
1448                    i64_value(100),
1449                    ts_ms_value(1),
1450                    Value {
1451                        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
1452                    },
1453                ],
1454            }],
1455        };
1456        let metadata = region_metadata_two_fields();
1457
1458        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1459        let err = request.check_schema(&metadata).unwrap_err();
1460        check_invalid_request(
1461            &err,
1462            "column f1 expect type Int64(Int64Type), given: STRING(12)",
1463        );
1464    }
1465
1466    #[test]
1467    fn test_write_request_metadata() {
1468        let rows = Rows {
1469            schema: vec![
1470                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1471                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1472            ],
1473            rows: vec![Row {
1474                values: vec![i64_value(1), i64_value(2)],
1475            }],
1476        };
1477
1478        let metadata = Arc::new(new_region_metadata());
1479        let request = WriteRequest::new(
1480            RegionId::new(1, 1),
1481            OpType::Put,
1482            rows,
1483            Some(metadata.clone()),
1484        )
1485        .unwrap();
1486
1487        assert!(request.region_metadata.is_some());
1488        assert_eq!(
1489            request.region_metadata.unwrap().region_id,
1490            RegionId::new(1, 1)
1491        );
1492    }
1493}