mito2/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Worker requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22    is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_proto_value,
23    ColumnDataTypeWrapper,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{ensure, OptionExt, ResultExt};
33use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
34use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
35use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
36use store_api::region_request::{
37    AffectedRows, RegionAlterRequest, RegionBulkInsertsRequest, RegionCatchupRequest,
38    RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest,
39    RegionOpenRequest, RegionRequest, RegionTruncateRequest,
40};
41use store_api::storage::{RegionId, SequenceNumber};
42use store_api::ManifestVersion;
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46    CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47    FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu,
48};
49use crate::manifest::action::RegionEdit;
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::MemtableId;
52use crate::metrics::COMPACTION_ELAPSED_TOTAL;
53use crate::wal::entry_distributor::WalEntryReceiver;
54use crate::wal::EntryId;
55
56/// Request to write a region.
57#[derive(Debug)]
58pub struct WriteRequest {
59    /// Region to write.
60    pub region_id: RegionId,
61    /// Type of the write request.
62    pub op_type: OpType,
63    /// Rows to write.
64    pub rows: Rows,
65    /// Map column name to column index in `rows`.
66    pub name_to_index: HashMap<String, usize>,
67    /// Whether each column has null.
68    pub has_null: Vec<bool>,
69    /// Write hint.
70    pub hint: Option<WriteHint>,
71    /// Region metadata on the time of this request is created.
72    pub(crate) region_metadata: Option<RegionMetadataRef>,
73}
74
75impl WriteRequest {
76    /// Creates a new request.
77    ///
78    /// Returns `Err` if `rows` are invalid.
79    pub fn new(
80        region_id: RegionId,
81        op_type: OpType,
82        rows: Rows,
83        region_metadata: Option<RegionMetadataRef>,
84    ) -> Result<WriteRequest> {
85        let mut name_to_index = HashMap::with_capacity(rows.schema.len());
86        for (index, column) in rows.schema.iter().enumerate() {
87            ensure!(
88                name_to_index
89                    .insert(column.column_name.clone(), index)
90                    .is_none(),
91                InvalidRequestSnafu {
92                    region_id,
93                    reason: format!("duplicate column {}", column.column_name),
94                }
95            );
96        }
97
98        let mut has_null = vec![false; rows.schema.len()];
99        for row in &rows.rows {
100            ensure!(
101                row.values.len() == rows.schema.len(),
102                InvalidRequestSnafu {
103                    region_id,
104                    reason: format!(
105                        "row has {} columns but schema has {}",
106                        row.values.len(),
107                        rows.schema.len()
108                    ),
109                }
110            );
111
112            for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
113                validate_proto_value(region_id, value, column_schema)?;
114
115                if value.value_data.is_none() {
116                    has_null[i] = true;
117                }
118            }
119        }
120
121        Ok(WriteRequest {
122            region_id,
123            op_type,
124            rows,
125            name_to_index,
126            has_null,
127            hint: None,
128            region_metadata,
129        })
130    }
131
132    /// Sets the write hint.
133    pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
134        self.hint = hint;
135        self
136    }
137
138    /// Returns the encoding hint.
139    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
140        infer_primary_key_encoding_from_hint(self.hint.as_ref())
141    }
142
143    /// Returns estimated size of the request.
144    pub(crate) fn estimated_size(&self) -> usize {
145        let row_size = self
146            .rows
147            .rows
148            .first()
149            .map(|row| row.encoded_len())
150            .unwrap_or(0);
151        row_size * self.rows.rows.len()
152    }
153
154    /// Gets column index by name.
155    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
156        self.name_to_index.get(name).copied()
157    }
158
159    /// Checks schema of rows is compatible with schema of the region.
160    ///
161    /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
162    /// error.
163    pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
164        debug_assert_eq!(self.region_id, metadata.region_id);
165
166        let region_id = self.region_id;
167        // Index all columns in rows.
168        let mut rows_columns: HashMap<_, _> = self
169            .rows
170            .schema
171            .iter()
172            .map(|column| (&column.column_name, column))
173            .collect();
174
175        let mut need_fill_default = false;
176        // Checks all columns in this region.
177        for column in &metadata.column_metadatas {
178            if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
179                // Check data type.
180                ensure!(
181                    is_column_type_value_eq(
182                        input_col.datatype,
183                        input_col.datatype_extension,
184                        &column.column_schema.data_type
185                    ),
186                    InvalidRequestSnafu {
187                        region_id,
188                        reason: format!(
189                            "column {} expect type {:?}, given: {}({})",
190                            column.column_schema.name,
191                            column.column_schema.data_type,
192                            ColumnDataType::try_from(input_col.datatype)
193                                .map(|v| v.as_str_name())
194                                .unwrap_or("Unknown"),
195                            input_col.datatype,
196                        )
197                    }
198                );
199
200                // Check semantic type.
201                ensure!(
202                    is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
203                    InvalidRequestSnafu {
204                        region_id,
205                        reason: format!(
206                            "column {} has semantic type {:?}, given: {}({})",
207                            column.column_schema.name,
208                            column.semantic_type,
209                            api::v1::SemanticType::try_from(input_col.semantic_type)
210                                .map(|v| v.as_str_name())
211                                .unwrap_or("Unknown"),
212                            input_col.semantic_type
213                        ),
214                    }
215                );
216
217                // Check nullable.
218                // Safety: `rows_columns` ensures this column exists.
219                let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
220                ensure!(
221                    !has_null || column.column_schema.is_nullable(),
222                    InvalidRequestSnafu {
223                        region_id,
224                        reason: format!(
225                            "column {} is not null but input has null",
226                            column.column_schema.name
227                        ),
228                    }
229                );
230            } else {
231                // Rows don't have this column.
232                self.check_missing_column(column)?;
233
234                need_fill_default = true;
235            }
236        }
237
238        // Checks all columns in rows exist in the region.
239        if !rows_columns.is_empty() {
240            let names: Vec<_> = rows_columns.into_keys().collect();
241            return InvalidRequestSnafu {
242                region_id,
243                reason: format!("unknown columns: {:?}", names),
244            }
245            .fail();
246        }
247
248        // If we need to fill default values, return a special error.
249        ensure!(!need_fill_default, FillDefaultSnafu { region_id });
250
251        Ok(())
252    }
253
254    /// Tries to fill missing columns.
255    ///
256    /// Currently, our protobuf format might be inefficient when we need to fill lots of null
257    /// values.
258    pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
259        debug_assert_eq!(self.region_id, metadata.region_id);
260
261        let mut columns_to_fill = vec![];
262        for column in &metadata.column_metadatas {
263            if !self.name_to_index.contains_key(&column.column_schema.name) {
264                columns_to_fill.push(column);
265            }
266        }
267        self.fill_columns(columns_to_fill)?;
268
269        Ok(())
270    }
271
272    /// Checks the schema and fill missing columns.
273    pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
274        if let Err(e) = self.check_schema(metadata) {
275            if e.is_fill_default() {
276                // TODO(yingwen): Add metrics for this case.
277                // We need to fill default value. The write request may be a request
278                // sent before changing the schema.
279                self.fill_missing_columns(metadata)?;
280            } else {
281                return Err(e);
282            }
283        }
284
285        Ok(())
286    }
287
288    /// Fills default value for specific `columns`.
289    fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
290        let mut default_values = Vec::with_capacity(columns.len());
291        let mut columns_to_fill = Vec::with_capacity(columns.len());
292        for column in columns {
293            let default_value = self.column_default_value(column)?;
294            if default_value.value_data.is_some() {
295                default_values.push(default_value);
296                columns_to_fill.push(column);
297            }
298        }
299
300        for row in &mut self.rows.rows {
301            row.values.extend(default_values.iter().cloned());
302        }
303
304        for column in columns_to_fill {
305            let (datatype, datatype_ext) =
306                ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
307                    .with_context(|_| ConvertColumnDataTypeSnafu {
308                        reason: format!(
309                            "no protobuf type for column {} ({:?})",
310                            column.column_schema.name, column.column_schema.data_type
311                        ),
312                    })?
313                    .to_parts();
314            self.rows.schema.push(ColumnSchema {
315                column_name: column.column_schema.name.clone(),
316                datatype: datatype as i32,
317                semantic_type: column.semantic_type as i32,
318                datatype_extension: datatype_ext,
319                options: options_from_column_schema(&column.column_schema),
320            });
321        }
322
323        Ok(())
324    }
325
326    /// Checks whether we should allow a row doesn't provide this column.
327    fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
328        if self.op_type == OpType::Delete {
329            if column.semantic_type == SemanticType::Field {
330                // For delete request, all tags and timestamp is required. We don't fill default
331                // tag or timestamp while deleting rows.
332                return Ok(());
333            } else {
334                return InvalidRequestSnafu {
335                    region_id: self.region_id,
336                    reason: format!("delete requests need column {}", column.column_schema.name),
337                }
338                .fail();
339            }
340        }
341
342        // Not a delete request. Checks whether they have default value.
343        ensure!(
344            column.column_schema.is_nullable()
345                || column.column_schema.default_constraint().is_some(),
346            InvalidRequestSnafu {
347                region_id: self.region_id,
348                reason: format!("missing column {}", column.column_schema.name),
349            }
350        );
351
352        Ok(())
353    }
354
355    /// Returns the default value for specific column.
356    fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
357        let default_value = match self.op_type {
358            OpType::Delete => {
359                ensure!(
360                    column.semantic_type == SemanticType::Field,
361                    InvalidRequestSnafu {
362                        region_id: self.region_id,
363                        reason: format!(
364                            "delete requests need column {}",
365                            column.column_schema.name
366                        ),
367                    }
368                );
369
370                // For delete request, we need a default value for padding so we
371                // can delete a row even a field doesn't have a default value. So the
372                // value doesn't need to following the default value constraint of the
373                // column.
374                if column.column_schema.is_nullable() {
375                    datatypes::value::Value::Null
376                } else {
377                    column.column_schema.data_type.default_value()
378                }
379            }
380            OpType::Put => {
381                // For put requests, we use the default value from column schema.
382                if column.column_schema.is_default_impure() {
383                    UnexpectedSnafu {
384                        reason: format!(
385                            "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}", 
386                            self.region_id,
387                            column.column_schema.name,
388                            column.column_schema.default_constraint(),
389                        ),
390                    }
391                    .fail()?
392                }
393                column
394                    .column_schema
395                    .create_default()
396                    .context(CreateDefaultSnafu {
397                        region_id: self.region_id,
398                        column: &column.column_schema.name,
399                    })?
400                    // This column doesn't have default value.
401                    .with_context(|| InvalidRequestSnafu {
402                        region_id: self.region_id,
403                        reason: format!(
404                            "column {} does not have default value",
405                            column.column_schema.name
406                        ),
407                    })?
408            }
409        };
410
411        // Convert default value into proto's value.
412        to_proto_value(default_value).with_context(|| InvalidRequestSnafu {
413            region_id: self.region_id,
414            reason: format!(
415                "no protobuf type for default value of column {} ({:?})",
416                column.column_schema.name, column.column_schema.data_type
417            ),
418        })
419    }
420}
421
422/// Validate proto value schema.
423pub(crate) fn validate_proto_value(
424    region_id: RegionId,
425    value: &Value,
426    column_schema: &ColumnSchema,
427) -> Result<()> {
428    if let Some(value_type) = proto_value_type(value) {
429        let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
430            InvalidRequestSnafu {
431                region_id,
432                reason: format!(
433                    "column {} has unknown type {}",
434                    column_schema.column_name, column_schema.datatype
435                ),
436            }
437            .build()
438        })?;
439        ensure!(
440            proto_value_type_match(column_type, value_type),
441            InvalidRequestSnafu {
442                region_id,
443                reason: format!(
444                    "value has type {:?}, but column {} has type {:?}({})",
445                    value_type, column_schema.column_name, column_type, column_schema.datatype,
446                ),
447            }
448        );
449    }
450
451    Ok(())
452}
453
454fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
455    match (column_type, value_type) {
456        (ct, vt) if ct == vt => true,
457        (ColumnDataType::Vector, ColumnDataType::Binary) => true,
458        (ColumnDataType::Json, ColumnDataType::Binary) => true,
459        _ => false,
460    }
461}
462
463/// Oneshot output result sender.
464#[derive(Debug)]
465pub struct OutputTx(Sender<Result<AffectedRows>>);
466
467impl OutputTx {
468    /// Creates a new output sender.
469    pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
470        OutputTx(sender)
471    }
472
473    /// Sends the `result`.
474    pub(crate) fn send(self, result: Result<AffectedRows>) {
475        // Ignores send result.
476        let _ = self.0.send(result);
477    }
478}
479
480/// Optional output result sender.
481#[derive(Debug)]
482pub(crate) struct OptionOutputTx(Option<OutputTx>);
483
484impl OptionOutputTx {
485    /// Creates a sender.
486    pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
487        OptionOutputTx(sender)
488    }
489
490    /// Creates an empty sender.
491    pub(crate) fn none() -> OptionOutputTx {
492        OptionOutputTx(None)
493    }
494
495    /// Sends the `result` and consumes the inner sender.
496    pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
497        if let Some(sender) = self.0.take() {
498            sender.send(result);
499        }
500    }
501
502    /// Sends the `result` and consumes the sender.
503    pub(crate) fn send(mut self, result: Result<AffectedRows>) {
504        if let Some(sender) = self.0.take() {
505            sender.send(result);
506        }
507    }
508
509    /// Takes the inner sender.
510    pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
511        self.0.take()
512    }
513}
514
515impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
516    fn from(sender: Sender<Result<AffectedRows>>) -> Self {
517        Self::new(Some(OutputTx::new(sender)))
518    }
519}
520
521impl OnFailure for OptionOutputTx {
522    fn on_failure(&mut self, err: Error) {
523        self.send_mut(Err(err));
524    }
525}
526
527/// Callback on failure.
528pub(crate) trait OnFailure {
529    /// Handles `err` on failure.
530    fn on_failure(&mut self, err: Error);
531}
532
533/// Sender and write request.
534#[derive(Debug)]
535pub(crate) struct SenderWriteRequest {
536    /// Result sender.
537    pub(crate) sender: OptionOutputTx,
538    pub(crate) request: WriteRequest,
539}
540
541pub(crate) struct SenderBulkRequest {
542    pub(crate) sender: OptionOutputTx,
543    pub(crate) region_id: RegionId,
544    pub(crate) request: BulkPart,
545    pub(crate) region_metadata: RegionMetadataRef,
546}
547
548/// Request sent to a worker
549#[derive(Debug)]
550pub(crate) enum WorkerRequest {
551    /// Write to a region.
552    Write(SenderWriteRequest),
553
554    /// Ddl request to a region.
555    Ddl(SenderDdlRequest),
556
557    /// Notifications from internal background jobs.
558    Background {
559        /// Id of the region to send.
560        region_id: RegionId,
561        /// Internal notification.
562        notify: BackgroundNotify,
563    },
564
565    /// The internal commands.
566    SetRegionRoleStateGracefully {
567        /// Id of the region to send.
568        region_id: RegionId,
569        /// The [SettableRegionRoleState].
570        region_role_state: SettableRegionRoleState,
571        /// The sender of [SetReadonlyResponse].
572        sender: Sender<SetRegionRoleStateResponse>,
573    },
574
575    /// Notify a worker to stop.
576    Stop,
577
578    /// Use [RegionEdit] to edit a region directly.
579    EditRegion(RegionEditRequest),
580
581    /// Keep the manifest of a region up to date.
582    SyncRegion(RegionSyncRequest),
583
584    /// Bulk inserts request and region metadata.
585    BulkInserts {
586        metadata: Option<RegionMetadataRef>,
587        request: RegionBulkInsertsRequest,
588        sender: OptionOutputTx,
589    },
590}
591
592impl WorkerRequest {
593    pub(crate) fn new_open_region_request(
594        region_id: RegionId,
595        request: RegionOpenRequest,
596        entry_receiver: Option<WalEntryReceiver>,
597    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
598        let (sender, receiver) = oneshot::channel();
599
600        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
601            region_id,
602            sender: sender.into(),
603            request: DdlRequest::Open((request, entry_receiver)),
604        });
605
606        (worker_request, receiver)
607    }
608
609    /// Converts request from a [RegionRequest].
610    pub(crate) fn try_from_region_request(
611        region_id: RegionId,
612        value: RegionRequest,
613        region_metadata: Option<RegionMetadataRef>,
614    ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
615        let (sender, receiver) = oneshot::channel();
616        let worker_request = match value {
617            RegionRequest::Put(v) => {
618                let mut write_request =
619                    WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
620                        .with_hint(v.hint);
621                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
622                    && let Some(region_metadata) = &region_metadata
623                {
624                    write_request.maybe_fill_missing_columns(region_metadata)?;
625                }
626                WorkerRequest::Write(SenderWriteRequest {
627                    sender: sender.into(),
628                    request: write_request,
629                })
630            }
631            RegionRequest::Delete(v) => {
632                let mut write_request =
633                    WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?;
634                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
635                    && let Some(region_metadata) = &region_metadata
636                {
637                    write_request.maybe_fill_missing_columns(region_metadata)?;
638                }
639                WorkerRequest::Write(SenderWriteRequest {
640                    sender: sender.into(),
641                    request: write_request,
642                })
643            }
644            RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
645                region_id,
646                sender: sender.into(),
647                request: DdlRequest::Create(v),
648            }),
649            RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
650                region_id,
651                sender: sender.into(),
652                request: DdlRequest::Drop,
653            }),
654            RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
655                region_id,
656                sender: sender.into(),
657                request: DdlRequest::Open((v, None)),
658            }),
659            RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
660                region_id,
661                sender: sender.into(),
662                request: DdlRequest::Close(v),
663            }),
664            RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
665                region_id,
666                sender: sender.into(),
667                request: DdlRequest::Alter(v),
668            }),
669            RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
670                region_id,
671                sender: sender.into(),
672                request: DdlRequest::Flush(v),
673            }),
674            RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
675                region_id,
676                sender: sender.into(),
677                request: DdlRequest::Compact(v),
678            }),
679            RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
680                region_id,
681                sender: sender.into(),
682                request: DdlRequest::Truncate(v),
683            }),
684            RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
685                region_id,
686                sender: sender.into(),
687                request: DdlRequest::Catchup(v),
688            }),
689            RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
690                metadata: region_metadata,
691                sender: sender.into(),
692                request: region_bulk_inserts_request,
693            },
694        };
695
696        Ok((worker_request, receiver))
697    }
698
699    pub(crate) fn new_set_readonly_gracefully(
700        region_id: RegionId,
701        region_role_state: SettableRegionRoleState,
702    ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
703        let (sender, receiver) = oneshot::channel();
704
705        (
706            WorkerRequest::SetRegionRoleStateGracefully {
707                region_id,
708                region_role_state,
709                sender,
710            },
711            receiver,
712        )
713    }
714
715    pub(crate) fn new_sync_region_request(
716        region_id: RegionId,
717        manifest_version: ManifestVersion,
718    ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
719        let (sender, receiver) = oneshot::channel();
720        (
721            WorkerRequest::SyncRegion(RegionSyncRequest {
722                region_id,
723                manifest_version,
724                sender,
725            }),
726            receiver,
727        )
728    }
729}
730
731/// DDL request to a region.
732#[derive(Debug)]
733pub(crate) enum DdlRequest {
734    Create(RegionCreateRequest),
735    Drop,
736    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
737    Close(RegionCloseRequest),
738    Alter(RegionAlterRequest),
739    Flush(RegionFlushRequest),
740    Compact(RegionCompactRequest),
741    Truncate(RegionTruncateRequest),
742    Catchup(RegionCatchupRequest),
743}
744
745/// Sender and Ddl request.
746#[derive(Debug)]
747pub(crate) struct SenderDdlRequest {
748    /// Region id of the request.
749    pub(crate) region_id: RegionId,
750    /// Result sender.
751    pub(crate) sender: OptionOutputTx,
752    /// Ddl request.
753    pub(crate) request: DdlRequest,
754}
755
756/// Notification from a background job.
757#[derive(Debug)]
758pub(crate) enum BackgroundNotify {
759    /// Flush has finished.
760    FlushFinished(FlushFinished),
761    /// Flush has failed.
762    FlushFailed(FlushFailed),
763    /// Compaction has finished.
764    CompactionFinished(CompactionFinished),
765    /// Compaction has failed.
766    CompactionFailed(CompactionFailed),
767    /// Truncate result.
768    Truncate(TruncateResult),
769    /// Region change result.
770    RegionChange(RegionChangeResult),
771    /// Region edit result.
772    RegionEdit(RegionEditResult),
773}
774
775/// Notifies a flush job is finished.
776#[derive(Debug)]
777pub(crate) struct FlushFinished {
778    /// Region id.
779    pub(crate) region_id: RegionId,
780    /// Entry id of flushed data.
781    pub(crate) flushed_entry_id: EntryId,
782    /// Flush result senders.
783    pub(crate) senders: Vec<OutputTx>,
784    /// Flush timer.
785    pub(crate) _timer: HistogramTimer,
786    /// Region edit to apply.
787    pub(crate) edit: RegionEdit,
788    /// Memtables to remove.
789    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
790}
791
792impl FlushFinished {
793    /// Marks the flush job as successful and observes the timer.
794    pub(crate) fn on_success(self) {
795        for sender in self.senders {
796            sender.send(Ok(0));
797        }
798    }
799}
800
801impl OnFailure for FlushFinished {
802    fn on_failure(&mut self, err: Error) {
803        let err = Arc::new(err);
804        for sender in self.senders.drain(..) {
805            sender.send(Err(err.clone()).context(FlushRegionSnafu {
806                region_id: self.region_id,
807            }));
808        }
809    }
810}
811
812/// Notifies a flush job is failed.
813#[derive(Debug)]
814pub(crate) struct FlushFailed {
815    /// The error source of the failure.
816    pub(crate) err: Arc<Error>,
817}
818
819/// Notifies a compaction job has finished.
820#[derive(Debug)]
821pub(crate) struct CompactionFinished {
822    /// Region id.
823    pub(crate) region_id: RegionId,
824    /// Compaction result senders.
825    pub(crate) senders: Vec<OutputTx>,
826    /// Start time of compaction task.
827    pub(crate) start_time: Instant,
828    /// Region edit to apply.
829    pub(crate) edit: RegionEdit,
830}
831
832impl CompactionFinished {
833    pub fn on_success(self) {
834        // only update compaction time on success
835        COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
836
837        for sender in self.senders {
838            sender.send(Ok(0));
839        }
840        info!("Successfully compacted region: {}", self.region_id);
841    }
842}
843
844impl OnFailure for CompactionFinished {
845    /// Compaction succeeded but failed to update manifest or region's already been dropped.
846    fn on_failure(&mut self, err: Error) {
847        let err = Arc::new(err);
848        for sender in self.senders.drain(..) {
849            sender.send(Err(err.clone()).context(CompactRegionSnafu {
850                region_id: self.region_id,
851            }));
852        }
853    }
854}
855
856/// A failing compaction result.
857#[derive(Debug)]
858pub(crate) struct CompactionFailed {
859    pub(crate) region_id: RegionId,
860    /// The error source of the failure.
861    pub(crate) err: Arc<Error>,
862}
863
864/// Notifies the truncate result of a region.
865#[derive(Debug)]
866pub(crate) struct TruncateResult {
867    /// Region id.
868    pub(crate) region_id: RegionId,
869    /// Result sender.
870    pub(crate) sender: OptionOutputTx,
871    /// Truncate result.
872    pub(crate) result: Result<()>,
873    /// Truncated entry id.
874    pub(crate) truncated_entry_id: EntryId,
875    /// Truncated sequence.
876    pub(crate) truncated_sequence: SequenceNumber,
877}
878
879/// Notifies the region the result of writing region change action.
880#[derive(Debug)]
881pub(crate) struct RegionChangeResult {
882    /// Region id.
883    pub(crate) region_id: RegionId,
884    /// The new region metadata to apply.
885    pub(crate) new_meta: RegionMetadataRef,
886    /// Result sender.
887    pub(crate) sender: OptionOutputTx,
888    /// Result from the manifest manager.
889    pub(crate) result: Result<()>,
890}
891
892/// Request to edit a region directly.
893#[derive(Debug)]
894pub(crate) struct RegionEditRequest {
895    pub(crate) region_id: RegionId,
896    pub(crate) edit: RegionEdit,
897    /// The sender to notify the result to the region engine.
898    pub(crate) tx: Sender<Result<()>>,
899}
900
901/// Notifies the regin the result of editing region.
902#[derive(Debug)]
903pub(crate) struct RegionEditResult {
904    /// Region id.
905    pub(crate) region_id: RegionId,
906    /// Result sender.
907    pub(crate) sender: Sender<Result<()>>,
908    /// Region edit to apply.
909    pub(crate) edit: RegionEdit,
910    /// Result from the manifest manager.
911    pub(crate) result: Result<()>,
912}
913
914#[derive(Debug)]
915pub(crate) struct RegionSyncRequest {
916    pub(crate) region_id: RegionId,
917    pub(crate) manifest_version: ManifestVersion,
918    /// Returns the latest manifest version and a boolean indicating whether new maniefst is installed.
919    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
920}
921
922#[cfg(test)]
923mod tests {
924    use api::v1::value::ValueData;
925    use api::v1::{Row, SemanticType};
926    use datatypes::prelude::ConcreteDataType;
927    use datatypes::schema::ColumnDefaultConstraint;
928    use mito_codec::test_util::i64_value;
929    use store_api::metadata::RegionMetadataBuilder;
930
931    use super::*;
932    use crate::error::Error;
933    use crate::test_util::ts_ms_value;
934
935    fn new_column_schema(
936        name: &str,
937        data_type: ColumnDataType,
938        semantic_type: SemanticType,
939    ) -> ColumnSchema {
940        ColumnSchema {
941            column_name: name.to_string(),
942            datatype: data_type as i32,
943            semantic_type: semantic_type as i32,
944            ..Default::default()
945        }
946    }
947
948    fn check_invalid_request(err: &Error, expect: &str) {
949        if let Error::InvalidRequest {
950            region_id: _,
951            reason,
952            location: _,
953        } = err
954        {
955            assert_eq!(reason, expect);
956        } else {
957            panic!("Unexpected error {err}")
958        }
959    }
960
961    #[test]
962    fn test_write_request_duplicate_column() {
963        let rows = Rows {
964            schema: vec![
965                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
966                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
967            ],
968            rows: vec![],
969        };
970
971        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
972        check_invalid_request(&err, "duplicate column c0");
973    }
974
975    #[test]
976    fn test_valid_write_request() {
977        let rows = Rows {
978            schema: vec![
979                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
980                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
981            ],
982            rows: vec![Row {
983                values: vec![i64_value(1), i64_value(2)],
984            }],
985        };
986
987        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
988        assert_eq!(0, request.column_index_by_name("c0").unwrap());
989        assert_eq!(1, request.column_index_by_name("c1").unwrap());
990        assert_eq!(None, request.column_index_by_name("c2"));
991    }
992
993    #[test]
994    fn test_write_request_column_num() {
995        let rows = Rows {
996            schema: vec![
997                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
998                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
999            ],
1000            rows: vec![Row {
1001                values: vec![i64_value(1), i64_value(2), i64_value(3)],
1002            }],
1003        };
1004
1005        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1006        check_invalid_request(&err, "row has 3 columns but schema has 2");
1007    }
1008
1009    fn new_region_metadata() -> RegionMetadata {
1010        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1011        builder
1012            .push_column_metadata(ColumnMetadata {
1013                column_schema: datatypes::schema::ColumnSchema::new(
1014                    "ts",
1015                    ConcreteDataType::timestamp_millisecond_datatype(),
1016                    false,
1017                ),
1018                semantic_type: SemanticType::Timestamp,
1019                column_id: 1,
1020            })
1021            .push_column_metadata(ColumnMetadata {
1022                column_schema: datatypes::schema::ColumnSchema::new(
1023                    "k0",
1024                    ConcreteDataType::int64_datatype(),
1025                    true,
1026                ),
1027                semantic_type: SemanticType::Tag,
1028                column_id: 2,
1029            })
1030            .primary_key(vec![2]);
1031        builder.build().unwrap()
1032    }
1033
1034    #[test]
1035    fn test_check_schema() {
1036        let rows = Rows {
1037            schema: vec![
1038                new_column_schema(
1039                    "ts",
1040                    ColumnDataType::TimestampMillisecond,
1041                    SemanticType::Timestamp,
1042                ),
1043                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1044            ],
1045            rows: vec![Row {
1046                values: vec![ts_ms_value(1), i64_value(2)],
1047            }],
1048        };
1049        let metadata = new_region_metadata();
1050
1051        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1052        request.check_schema(&metadata).unwrap();
1053    }
1054
1055    #[test]
1056    fn test_column_type() {
1057        let rows = Rows {
1058            schema: vec![
1059                new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1060                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1061            ],
1062            rows: vec![Row {
1063                values: vec![i64_value(1), i64_value(2)],
1064            }],
1065        };
1066        let metadata = new_region_metadata();
1067
1068        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1069        let err = request.check_schema(&metadata).unwrap_err();
1070        check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)");
1071    }
1072
1073    #[test]
1074    fn test_semantic_type() {
1075        let rows = Rows {
1076            schema: vec![
1077                new_column_schema(
1078                    "ts",
1079                    ColumnDataType::TimestampMillisecond,
1080                    SemanticType::Tag,
1081                ),
1082                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1083            ],
1084            rows: vec![Row {
1085                values: vec![ts_ms_value(1), i64_value(2)],
1086            }],
1087        };
1088        let metadata = new_region_metadata();
1089
1090        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1091        let err = request.check_schema(&metadata).unwrap_err();
1092        check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1093    }
1094
1095    #[test]
1096    fn test_column_nullable() {
1097        let rows = Rows {
1098            schema: vec![
1099                new_column_schema(
1100                    "ts",
1101                    ColumnDataType::TimestampMillisecond,
1102                    SemanticType::Timestamp,
1103                ),
1104                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1105            ],
1106            rows: vec![Row {
1107                values: vec![Value { value_data: None }, i64_value(2)],
1108            }],
1109        };
1110        let metadata = new_region_metadata();
1111
1112        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1113        let err = request.check_schema(&metadata).unwrap_err();
1114        check_invalid_request(&err, "column ts is not null but input has null");
1115    }
1116
1117    #[test]
1118    fn test_column_default() {
1119        let rows = Rows {
1120            schema: vec![new_column_schema(
1121                "k0",
1122                ColumnDataType::Int64,
1123                SemanticType::Tag,
1124            )],
1125            rows: vec![Row {
1126                values: vec![i64_value(1)],
1127            }],
1128        };
1129        let metadata = new_region_metadata();
1130
1131        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1132        let err = request.check_schema(&metadata).unwrap_err();
1133        check_invalid_request(&err, "missing column ts");
1134    }
1135
1136    #[test]
1137    fn test_unknown_column() {
1138        let rows = Rows {
1139            schema: vec![
1140                new_column_schema(
1141                    "ts",
1142                    ColumnDataType::TimestampMillisecond,
1143                    SemanticType::Timestamp,
1144                ),
1145                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1146                new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1147            ],
1148            rows: vec![Row {
1149                values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1150            }],
1151        };
1152        let metadata = new_region_metadata();
1153
1154        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1155        let err = request.check_schema(&metadata).unwrap_err();
1156        check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1157    }
1158
1159    #[test]
1160    fn test_fill_impure_columns_err() {
1161        let rows = Rows {
1162            schema: vec![new_column_schema(
1163                "k0",
1164                ColumnDataType::Int64,
1165                SemanticType::Tag,
1166            )],
1167            rows: vec![Row {
1168                values: vec![i64_value(1)],
1169            }],
1170        };
1171        let metadata = {
1172            let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1173            builder
1174                .push_column_metadata(ColumnMetadata {
1175                    column_schema: datatypes::schema::ColumnSchema::new(
1176                        "ts",
1177                        ConcreteDataType::timestamp_millisecond_datatype(),
1178                        false,
1179                    )
1180                    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1181                        "now()".to_string(),
1182                    )))
1183                    .unwrap(),
1184                    semantic_type: SemanticType::Timestamp,
1185                    column_id: 1,
1186                })
1187                .push_column_metadata(ColumnMetadata {
1188                    column_schema: datatypes::schema::ColumnSchema::new(
1189                        "k0",
1190                        ConcreteDataType::int64_datatype(),
1191                        true,
1192                    ),
1193                    semantic_type: SemanticType::Tag,
1194                    column_id: 2,
1195                })
1196                .primary_key(vec![2]);
1197            builder.build().unwrap()
1198        };
1199
1200        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1201        let err = request.check_schema(&metadata).unwrap_err();
1202        assert!(err.is_fill_default());
1203        assert!(request
1204            .fill_missing_columns(&metadata)
1205            .unwrap_err()
1206            .to_string()
1207            .contains("unexpected impure default value with region_id"));
1208    }
1209
1210    #[test]
1211    fn test_fill_missing_columns() {
1212        let rows = Rows {
1213            schema: vec![new_column_schema(
1214                "ts",
1215                ColumnDataType::TimestampMillisecond,
1216                SemanticType::Timestamp,
1217            )],
1218            rows: vec![Row {
1219                values: vec![ts_ms_value(1)],
1220            }],
1221        };
1222        let metadata = new_region_metadata();
1223
1224        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1225        let err = request.check_schema(&metadata).unwrap_err();
1226        assert!(err.is_fill_default());
1227        request.fill_missing_columns(&metadata).unwrap();
1228
1229        let expect_rows = Rows {
1230            schema: vec![new_column_schema(
1231                "ts",
1232                ColumnDataType::TimestampMillisecond,
1233                SemanticType::Timestamp,
1234            )],
1235            rows: vec![Row {
1236                values: vec![ts_ms_value(1)],
1237            }],
1238        };
1239        assert_eq!(expect_rows, request.rows);
1240    }
1241
1242    fn builder_with_ts_tag() -> RegionMetadataBuilder {
1243        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1244        builder
1245            .push_column_metadata(ColumnMetadata {
1246                column_schema: datatypes::schema::ColumnSchema::new(
1247                    "ts",
1248                    ConcreteDataType::timestamp_millisecond_datatype(),
1249                    false,
1250                ),
1251                semantic_type: SemanticType::Timestamp,
1252                column_id: 1,
1253            })
1254            .push_column_metadata(ColumnMetadata {
1255                column_schema: datatypes::schema::ColumnSchema::new(
1256                    "k0",
1257                    ConcreteDataType::int64_datatype(),
1258                    true,
1259                ),
1260                semantic_type: SemanticType::Tag,
1261                column_id: 2,
1262            })
1263            .primary_key(vec![2]);
1264        builder
1265    }
1266
1267    fn region_metadata_two_fields() -> RegionMetadata {
1268        let mut builder = builder_with_ts_tag();
1269        builder
1270            .push_column_metadata(ColumnMetadata {
1271                column_schema: datatypes::schema::ColumnSchema::new(
1272                    "f0",
1273                    ConcreteDataType::int64_datatype(),
1274                    true,
1275                ),
1276                semantic_type: SemanticType::Field,
1277                column_id: 3,
1278            })
1279            // Column is not nullable.
1280            .push_column_metadata(ColumnMetadata {
1281                column_schema: datatypes::schema::ColumnSchema::new(
1282                    "f1",
1283                    ConcreteDataType::int64_datatype(),
1284                    false,
1285                )
1286                .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1287                    datatypes::value::Value::Int64(100),
1288                )))
1289                .unwrap(),
1290                semantic_type: SemanticType::Field,
1291                column_id: 4,
1292            });
1293        builder.build().unwrap()
1294    }
1295
1296    #[test]
1297    fn test_fill_missing_for_delete() {
1298        let rows = Rows {
1299            schema: vec![new_column_schema(
1300                "ts",
1301                ColumnDataType::TimestampMillisecond,
1302                SemanticType::Timestamp,
1303            )],
1304            rows: vec![Row {
1305                values: vec![ts_ms_value(1)],
1306            }],
1307        };
1308        let metadata = region_metadata_two_fields();
1309
1310        let mut request =
1311            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1312        let err = request.check_schema(&metadata).unwrap_err();
1313        check_invalid_request(&err, "delete requests need column k0");
1314        let err = request.fill_missing_columns(&metadata).unwrap_err();
1315        check_invalid_request(&err, "delete requests need column k0");
1316
1317        let rows = Rows {
1318            schema: vec![
1319                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1320                new_column_schema(
1321                    "ts",
1322                    ColumnDataType::TimestampMillisecond,
1323                    SemanticType::Timestamp,
1324                ),
1325            ],
1326            rows: vec![Row {
1327                values: vec![i64_value(100), ts_ms_value(1)],
1328            }],
1329        };
1330        let mut request =
1331            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1332        let err = request.check_schema(&metadata).unwrap_err();
1333        assert!(err.is_fill_default());
1334        request.fill_missing_columns(&metadata).unwrap();
1335
1336        let expect_rows = Rows {
1337            schema: vec![
1338                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1339                new_column_schema(
1340                    "ts",
1341                    ColumnDataType::TimestampMillisecond,
1342                    SemanticType::Timestamp,
1343                ),
1344                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1345            ],
1346            // Column f1 is not nullable and we use 0 for padding.
1347            rows: vec![Row {
1348                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1349            }],
1350        };
1351        assert_eq!(expect_rows, request.rows);
1352    }
1353
1354    #[test]
1355    fn test_fill_missing_without_default_in_delete() {
1356        let mut builder = builder_with_ts_tag();
1357        builder
1358            // f0 is nullable.
1359            .push_column_metadata(ColumnMetadata {
1360                column_schema: datatypes::schema::ColumnSchema::new(
1361                    "f0",
1362                    ConcreteDataType::int64_datatype(),
1363                    true,
1364                ),
1365                semantic_type: SemanticType::Field,
1366                column_id: 3,
1367            })
1368            // f1 is not nullable and don't has default.
1369            .push_column_metadata(ColumnMetadata {
1370                column_schema: datatypes::schema::ColumnSchema::new(
1371                    "f1",
1372                    ConcreteDataType::int64_datatype(),
1373                    false,
1374                ),
1375                semantic_type: SemanticType::Field,
1376                column_id: 4,
1377            });
1378        let metadata = builder.build().unwrap();
1379
1380        let rows = Rows {
1381            schema: vec![
1382                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1383                new_column_schema(
1384                    "ts",
1385                    ColumnDataType::TimestampMillisecond,
1386                    SemanticType::Timestamp,
1387                ),
1388            ],
1389            // Missing f0 (nullable), f1 (not nullable).
1390            rows: vec![Row {
1391                values: vec![i64_value(100), ts_ms_value(1)],
1392            }],
1393        };
1394        let mut request =
1395            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1396        let err = request.check_schema(&metadata).unwrap_err();
1397        assert!(err.is_fill_default());
1398        request.fill_missing_columns(&metadata).unwrap();
1399
1400        let expect_rows = Rows {
1401            schema: vec![
1402                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1403                new_column_schema(
1404                    "ts",
1405                    ColumnDataType::TimestampMillisecond,
1406                    SemanticType::Timestamp,
1407                ),
1408                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1409            ],
1410            // Column f1 is not nullable and we use 0 for padding.
1411            rows: vec![Row {
1412                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1413            }],
1414        };
1415        assert_eq!(expect_rows, request.rows);
1416    }
1417
1418    #[test]
1419    fn test_no_default() {
1420        let rows = Rows {
1421            schema: vec![new_column_schema(
1422                "k0",
1423                ColumnDataType::Int64,
1424                SemanticType::Tag,
1425            )],
1426            rows: vec![Row {
1427                values: vec![i64_value(1)],
1428            }],
1429        };
1430        let metadata = new_region_metadata();
1431
1432        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1433        let err = request.fill_missing_columns(&metadata).unwrap_err();
1434        check_invalid_request(&err, "column ts does not have default value");
1435    }
1436
1437    #[test]
1438    fn test_missing_and_invalid() {
1439        // Missing f0 and f1 has invalid type (string).
1440        let rows = Rows {
1441            schema: vec![
1442                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1443                new_column_schema(
1444                    "ts",
1445                    ColumnDataType::TimestampMillisecond,
1446                    SemanticType::Timestamp,
1447                ),
1448                new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
1449            ],
1450            rows: vec![Row {
1451                values: vec![
1452                    i64_value(100),
1453                    ts_ms_value(1),
1454                    Value {
1455                        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
1456                    },
1457                ],
1458            }],
1459        };
1460        let metadata = region_metadata_two_fields();
1461
1462        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1463        let err = request.check_schema(&metadata).unwrap_err();
1464        check_invalid_request(
1465            &err,
1466            "column f1 expect type Int64(Int64Type), given: STRING(12)",
1467        );
1468    }
1469
1470    #[test]
1471    fn test_write_request_metadata() {
1472        let rows = Rows {
1473            schema: vec![
1474                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1475                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1476            ],
1477            rows: vec![Row {
1478                values: vec![i64_value(1), i64_value(2)],
1479            }],
1480        };
1481
1482        let metadata = Arc::new(new_region_metadata());
1483        let request = WriteRequest::new(
1484            RegionId::new(1, 1),
1485            OpType::Put,
1486            rows,
1487            Some(metadata.clone()),
1488        )
1489        .unwrap();
1490
1491        assert!(request.region_metadata.is_some());
1492        assert_eq!(
1493            request.region_metadata.unwrap().region_id,
1494            RegionId::new(1, 1)
1495        );
1496    }
1497}