1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22 is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_proto_value,
23 ColumnDataTypeWrapper,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{ensure, OptionExt, ResultExt};
33use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
34use store_api::manifest::ManifestVersion;
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
37use store_api::region_request::{
38 AffectedRows, RegionAlterRequest, RegionBulkInsertsRequest, RegionCatchupRequest,
39 RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest,
40 RegionOpenRequest, RegionRequest, RegionTruncateRequest,
41};
42use store_api::storage::{RegionId, SequenceNumber};
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46 CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47 FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedImpureDefaultSnafu,
48};
49use crate::manifest::action::RegionEdit;
50use crate::memtable::MemtableId;
51use crate::metrics::COMPACTION_ELAPSED_TOTAL;
52use crate::wal::entry_distributor::WalEntryReceiver;
53use crate::wal::EntryId;
54
/// Request to write rows into a region.
#[derive(Debug)]
pub struct WriteRequest {
    /// Id of the region to write.
    pub region_id: RegionId,
    /// Kind of write (`OpType::Put` or `OpType::Delete`).
    pub op_type: OpType,
    /// Rows to write, together with their column schema.
    pub rows: Rows,
    /// Index of each column name in `rows.schema`, built when the request is created.
    name_to_index: HashMap<String, usize>,
    /// `has_null[i]` is true when some row has a value without data (null) in schema column `i`.
    has_null: Vec<bool>,
    /// Optional write hint; used to infer the primary key encoding.
    pub hint: Option<WriteHint>,
    /// Region metadata supplied when the request was created, if any.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
}
73
74impl WriteRequest {
75 pub fn new(
79 region_id: RegionId,
80 op_type: OpType,
81 rows: Rows,
82 region_metadata: Option<RegionMetadataRef>,
83 ) -> Result<WriteRequest> {
84 let mut name_to_index = HashMap::with_capacity(rows.schema.len());
85 for (index, column) in rows.schema.iter().enumerate() {
86 ensure!(
87 name_to_index
88 .insert(column.column_name.clone(), index)
89 .is_none(),
90 InvalidRequestSnafu {
91 region_id,
92 reason: format!("duplicate column {}", column.column_name),
93 }
94 );
95 }
96
97 let mut has_null = vec![false; rows.schema.len()];
98 for row in &rows.rows {
99 ensure!(
100 row.values.len() == rows.schema.len(),
101 InvalidRequestSnafu {
102 region_id,
103 reason: format!(
104 "row has {} columns but schema has {}",
105 row.values.len(),
106 rows.schema.len()
107 ),
108 }
109 );
110
111 for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
112 validate_proto_value(region_id, value, column_schema)?;
113
114 if value.value_data.is_none() {
115 has_null[i] = true;
116 }
117 }
118 }
119
120 Ok(WriteRequest {
121 region_id,
122 op_type,
123 rows,
124 name_to_index,
125 has_null,
126 hint: None,
127 region_metadata,
128 })
129 }
130
131 pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
133 self.hint = hint;
134 self
135 }
136
137 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
139 infer_primary_key_encoding_from_hint(self.hint.as_ref())
140 }
141
142 pub(crate) fn estimated_size(&self) -> usize {
144 let row_size = self
145 .rows
146 .rows
147 .first()
148 .map(|row| row.encoded_len())
149 .unwrap_or(0);
150 row_size * self.rows.rows.len()
151 }
152
153 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
155 self.name_to_index.get(name).copied()
156 }
157
158 pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
163 debug_assert_eq!(self.region_id, metadata.region_id);
164
165 let region_id = self.region_id;
166 let mut rows_columns: HashMap<_, _> = self
168 .rows
169 .schema
170 .iter()
171 .map(|column| (&column.column_name, column))
172 .collect();
173
174 let mut need_fill_default = false;
175 for column in &metadata.column_metadatas {
177 if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
178 ensure!(
180 is_column_type_value_eq(
181 input_col.datatype,
182 input_col.datatype_extension,
183 &column.column_schema.data_type
184 ),
185 InvalidRequestSnafu {
186 region_id,
187 reason: format!(
188 "column {} expect type {:?}, given: {}({})",
189 column.column_schema.name,
190 column.column_schema.data_type,
191 ColumnDataType::try_from(input_col.datatype)
192 .map(|v| v.as_str_name())
193 .unwrap_or("Unknown"),
194 input_col.datatype,
195 )
196 }
197 );
198
199 ensure!(
201 is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
202 InvalidRequestSnafu {
203 region_id,
204 reason: format!(
205 "column {} has semantic type {:?}, given: {}({})",
206 column.column_schema.name,
207 column.semantic_type,
208 api::v1::SemanticType::try_from(input_col.semantic_type)
209 .map(|v| v.as_str_name())
210 .unwrap_or("Unknown"),
211 input_col.semantic_type
212 ),
213 }
214 );
215
216 let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
219 ensure!(
220 !has_null || column.column_schema.is_nullable(),
221 InvalidRequestSnafu {
222 region_id,
223 reason: format!(
224 "column {} is not null but input has null",
225 column.column_schema.name
226 ),
227 }
228 );
229 } else {
230 self.check_missing_column(column)?;
232
233 need_fill_default = true;
234 }
235 }
236
237 if !rows_columns.is_empty() {
239 let names: Vec<_> = rows_columns.into_keys().collect();
240 return InvalidRequestSnafu {
241 region_id,
242 reason: format!("unknown columns: {:?}", names),
243 }
244 .fail();
245 }
246
247 ensure!(!need_fill_default, FillDefaultSnafu { region_id });
249
250 Ok(())
251 }
252
253 pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
258 debug_assert_eq!(self.region_id, metadata.region_id);
259
260 let mut columns_to_fill = vec![];
261 for column in &metadata.column_metadatas {
262 if !self.name_to_index.contains_key(&column.column_schema.name) {
263 columns_to_fill.push(column);
264 }
265 }
266 self.fill_columns(columns_to_fill)?;
267
268 Ok(())
269 }
270
271 pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
273 if let Err(e) = self.check_schema(metadata) {
274 if e.is_fill_default() {
275 self.fill_missing_columns(metadata)?;
279 } else {
280 return Err(e);
281 }
282 }
283
284 Ok(())
285 }
286
287 fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
289 let mut default_values = Vec::with_capacity(columns.len());
290 let mut columns_to_fill = Vec::with_capacity(columns.len());
291 for column in columns {
292 let default_value = self.column_default_value(column)?;
293 if default_value.value_data.is_some() {
294 default_values.push(default_value);
295 columns_to_fill.push(column);
296 }
297 }
298
299 for row in &mut self.rows.rows {
300 row.values.extend(default_values.iter().cloned());
301 }
302
303 for column in columns_to_fill {
304 let (datatype, datatype_ext) =
305 ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
306 .with_context(|_| ConvertColumnDataTypeSnafu {
307 reason: format!(
308 "no protobuf type for column {} ({:?})",
309 column.column_schema.name, column.column_schema.data_type
310 ),
311 })?
312 .to_parts();
313 self.rows.schema.push(ColumnSchema {
314 column_name: column.column_schema.name.clone(),
315 datatype: datatype as i32,
316 semantic_type: column.semantic_type as i32,
317 datatype_extension: datatype_ext,
318 options: options_from_column_schema(&column.column_schema),
319 });
320 }
321
322 Ok(())
323 }
324
325 fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
327 if self.op_type == OpType::Delete {
328 if column.semantic_type == SemanticType::Field {
329 return Ok(());
332 } else {
333 return InvalidRequestSnafu {
334 region_id: self.region_id,
335 reason: format!("delete requests need column {}", column.column_schema.name),
336 }
337 .fail();
338 }
339 }
340
341 ensure!(
343 column.column_schema.is_nullable()
344 || column.column_schema.default_constraint().is_some(),
345 InvalidRequestSnafu {
346 region_id: self.region_id,
347 reason: format!("missing column {}", column.column_schema.name),
348 }
349 );
350
351 Ok(())
352 }
353
354 fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
356 let default_value = match self.op_type {
357 OpType::Delete => {
358 ensure!(
359 column.semantic_type == SemanticType::Field,
360 InvalidRequestSnafu {
361 region_id: self.region_id,
362 reason: format!(
363 "delete requests need column {}",
364 column.column_schema.name
365 ),
366 }
367 );
368
369 if column.column_schema.is_nullable() {
374 datatypes::value::Value::Null
375 } else {
376 column.column_schema.data_type.default_value()
377 }
378 }
379 OpType::Put => {
380 if column.column_schema.is_default_impure() {
382 UnexpectedImpureDefaultSnafu {
383 region_id: self.region_id,
384 column: &column.column_schema.name,
385 default_value: format!("{:?}", column.column_schema.default_constraint()),
386 }
387 .fail()?
388 }
389 column
390 .column_schema
391 .create_default()
392 .context(CreateDefaultSnafu {
393 region_id: self.region_id,
394 column: &column.column_schema.name,
395 })?
396 .with_context(|| InvalidRequestSnafu {
398 region_id: self.region_id,
399 reason: format!(
400 "column {} does not have default value",
401 column.column_schema.name
402 ),
403 })?
404 }
405 };
406
407 to_proto_value(default_value).with_context(|| InvalidRequestSnafu {
409 region_id: self.region_id,
410 reason: format!(
411 "no protobuf type for default value of column {} ({:?})",
412 column.column_schema.name, column.column_schema.data_type
413 ),
414 })
415 }
416}
417
418pub(crate) fn validate_proto_value(
420 region_id: RegionId,
421 value: &Value,
422 column_schema: &ColumnSchema,
423) -> Result<()> {
424 if let Some(value_type) = proto_value_type(value) {
425 let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
426 InvalidRequestSnafu {
427 region_id,
428 reason: format!(
429 "column {} has unknown type {}",
430 column_schema.column_name, column_schema.datatype
431 ),
432 }
433 .build()
434 })?;
435 ensure!(
436 proto_value_type_match(column_type, value_type),
437 InvalidRequestSnafu {
438 region_id,
439 reason: format!(
440 "value has type {:?}, but column {} has type {:?}({})",
441 value_type, column_schema.column_name, column_type, column_schema.datatype,
442 ),
443 }
444 );
445 }
446
447 Ok(())
448}
449
450fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
451 match (column_type, value_type) {
452 (ct, vt) if ct == vt => true,
453 (ColumnDataType::Vector, ColumnDataType::Binary) => true,
454 (ColumnDataType::Json, ColumnDataType::Binary) => true,
455 _ => false,
456 }
457}
458
459#[derive(Debug)]
461pub struct OutputTx(Sender<Result<AffectedRows>>);
462
463impl OutputTx {
464 pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
466 OutputTx(sender)
467 }
468
469 pub(crate) fn send(self, result: Result<AffectedRows>) {
471 let _ = self.0.send(result);
473 }
474}
475
476#[derive(Debug)]
478pub(crate) struct OptionOutputTx(Option<OutputTx>);
479
480impl OptionOutputTx {
481 pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
483 OptionOutputTx(sender)
484 }
485
486 pub(crate) fn none() -> OptionOutputTx {
488 OptionOutputTx(None)
489 }
490
491 pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
493 if let Some(sender) = self.0.take() {
494 sender.send(result);
495 }
496 }
497
498 pub(crate) fn send(mut self, result: Result<AffectedRows>) {
500 if let Some(sender) = self.0.take() {
501 sender.send(result);
502 }
503 }
504
505 pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
507 self.0.take()
508 }
509}
510
511impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
512 fn from(sender: Sender<Result<AffectedRows>>) -> Self {
513 Self::new(Some(OutputTx::new(sender)))
514 }
515}
516
517impl OnFailure for OptionOutputTx {
518 fn on_failure(&mut self, err: Error) {
519 self.send_mut(Err(err));
520 }
521}
522
/// Hook invoked when processing a request fails.
pub(crate) trait OnFailure {
    /// Handles the failure described by `err`.
    fn on_failure(&mut self, err: Error);
}
528
/// A [`WriteRequest`] paired with the sender that receives its result.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
    /// Sender used to notify the caller of the write result; may be empty.
    pub(crate) sender: OptionOutputTx,
    /// The write request itself.
    pub(crate) request: WriteRequest,
}
536
/// Request handled by a region worker.
#[derive(Debug)]
pub(crate) enum WorkerRequest {
    /// Write rows to a region.
    Write(SenderWriteRequest),

    /// DDL request to a region.
    Ddl(SenderDdlRequest),

    /// Notification from a background job on `region_id`.
    Background {
        /// Region the notification is about.
        region_id: RegionId,
        /// The notification.
        notify: BackgroundNotify,
    },

    /// Sets the role state of a region gracefully.
    SetRegionRoleStateGracefully {
        /// Region whose role state is set.
        region_id: RegionId,
        /// Target role state.
        region_role_state: SettableRegionRoleState,
        /// Sender for the response.
        sender: Sender<SetRegionRoleStateResponse>,
    },

    /// Presumably asks the worker to stop (the handler is not visible here).
    Stop,

    /// Applies a region edit.
    EditRegion(RegionEditRequest),

    /// Syncs a region to a manifest version.
    SyncRegion(RegionSyncRequest),

    /// Bulk inserts into a region.
    BulkInserts {
        /// Region metadata supplied with the request, if any.
        metadata: Option<RegionMetadataRef>,
        /// The bulk insert request.
        request: RegionBulkInsertsRequest,
        /// Sender for the result.
        sender: OptionOutputTx,
    },
}
580
581impl WorkerRequest {
582 pub(crate) fn new_open_region_request(
583 region_id: RegionId,
584 request: RegionOpenRequest,
585 entry_receiver: Option<WalEntryReceiver>,
586 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
587 let (sender, receiver) = oneshot::channel();
588
589 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
590 region_id,
591 sender: sender.into(),
592 request: DdlRequest::Open((request, entry_receiver)),
593 });
594
595 (worker_request, receiver)
596 }
597
598 pub(crate) fn try_from_region_request(
600 region_id: RegionId,
601 value: RegionRequest,
602 region_metadata: Option<RegionMetadataRef>,
603 ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
604 let (sender, receiver) = oneshot::channel();
605 let worker_request = match value {
606 RegionRequest::Put(v) => {
607 let mut write_request =
608 WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
609 .with_hint(v.hint);
610 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
611 && let Some(region_metadata) = ®ion_metadata
612 {
613 write_request.maybe_fill_missing_columns(region_metadata)?;
614 }
615 WorkerRequest::Write(SenderWriteRequest {
616 sender: sender.into(),
617 request: write_request,
618 })
619 }
620 RegionRequest::Delete(v) => {
621 let mut write_request =
622 WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?;
623 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
624 && let Some(region_metadata) = ®ion_metadata
625 {
626 write_request.maybe_fill_missing_columns(region_metadata)?;
627 }
628 WorkerRequest::Write(SenderWriteRequest {
629 sender: sender.into(),
630 request: write_request,
631 })
632 }
633 RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
634 region_id,
635 sender: sender.into(),
636 request: DdlRequest::Create(v),
637 }),
638 RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
639 region_id,
640 sender: sender.into(),
641 request: DdlRequest::Drop,
642 }),
643 RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
644 region_id,
645 sender: sender.into(),
646 request: DdlRequest::Open((v, None)),
647 }),
648 RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
649 region_id,
650 sender: sender.into(),
651 request: DdlRequest::Close(v),
652 }),
653 RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
654 region_id,
655 sender: sender.into(),
656 request: DdlRequest::Alter(v),
657 }),
658 RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
659 region_id,
660 sender: sender.into(),
661 request: DdlRequest::Flush(v),
662 }),
663 RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
664 region_id,
665 sender: sender.into(),
666 request: DdlRequest::Compact(v),
667 }),
668 RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
669 region_id,
670 sender: sender.into(),
671 request: DdlRequest::Truncate(v),
672 }),
673 RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
674 region_id,
675 sender: sender.into(),
676 request: DdlRequest::Catchup(v),
677 }),
678 RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
679 metadata: region_metadata,
680 sender: sender.into(),
681 request: region_bulk_inserts_request,
682 },
683 };
684
685 Ok((worker_request, receiver))
686 }
687
688 pub(crate) fn new_set_readonly_gracefully(
689 region_id: RegionId,
690 region_role_state: SettableRegionRoleState,
691 ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
692 let (sender, receiver) = oneshot::channel();
693
694 (
695 WorkerRequest::SetRegionRoleStateGracefully {
696 region_id,
697 region_role_state,
698 sender,
699 },
700 receiver,
701 )
702 }
703
704 pub(crate) fn new_sync_region_request(
705 region_id: RegionId,
706 manifest_version: ManifestVersion,
707 ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
708 let (sender, receiver) = oneshot::channel();
709 (
710 WorkerRequest::SyncRegion(RegionSyncRequest {
711 region_id,
712 manifest_version,
713 sender,
714 }),
715 receiver,
716 )
717 }
718}
719
/// DDL request to a region.
#[derive(Debug)]
pub(crate) enum DdlRequest {
    /// Create a region.
    Create(RegionCreateRequest),
    /// Drop a region.
    Drop,
    /// Open a region; the second element is an optional WAL entry receiver.
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    /// Close a region.
    Close(RegionCloseRequest),
    /// Alter a region.
    Alter(RegionAlterRequest),
    /// Flush a region.
    Flush(RegionFlushRequest),
    /// Compact a region.
    Compact(RegionCompactRequest),
    /// Truncate a region.
    Truncate(RegionTruncateRequest),
    /// Catch up a region.
    Catchup(RegionCatchupRequest),
}
733
/// A [`DdlRequest`] paired with the sender that receives its result.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Region the request targets.
    pub(crate) region_id: RegionId,
    /// Sender used to notify the caller of the result.
    pub(crate) sender: OptionOutputTx,
    /// The DDL request.
    pub(crate) request: DdlRequest,
}
744
/// Notification sent to a worker by a background job.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// A flush job finished.
    FlushFinished(FlushFinished),
    /// A flush job failed.
    FlushFailed(FlushFailed),
    /// A compaction job finished.
    CompactionFinished(CompactionFinished),
    /// A compaction job failed.
    CompactionFailed(CompactionFailed),
    /// Result of a truncate.
    Truncate(TruncateResult),
    /// Result of a region metadata change.
    RegionChange(RegionChangeResult),
    /// Result of a region edit.
    RegionEdit(RegionEditResult),
}
763
764#[derive(Debug)]
766pub(crate) struct FlushFinished {
767 pub(crate) region_id: RegionId,
769 pub(crate) flushed_entry_id: EntryId,
771 pub(crate) senders: Vec<OutputTx>,
773 pub(crate) _timer: HistogramTimer,
775 pub(crate) edit: RegionEdit,
777 pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
779}
780
781impl FlushFinished {
782 pub(crate) fn on_success(self) {
784 for sender in self.senders {
785 sender.send(Ok(0));
786 }
787 }
788}
789
790impl OnFailure for FlushFinished {
791 fn on_failure(&mut self, err: Error) {
792 let err = Arc::new(err);
793 for sender in self.senders.drain(..) {
794 sender.send(Err(err.clone()).context(FlushRegionSnafu {
795 region_id: self.region_id,
796 }));
797 }
798 }
799}
800
/// Notification that a flush job failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// Error that caused the failure.
    pub(crate) err: Arc<Error>,
}
807
808#[derive(Debug)]
810pub(crate) struct CompactionFinished {
811 pub(crate) region_id: RegionId,
813 pub(crate) senders: Vec<OutputTx>,
815 pub(crate) start_time: Instant,
817 pub(crate) edit: RegionEdit,
819}
820
821impl CompactionFinished {
822 pub fn on_success(self) {
823 COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
825
826 for sender in self.senders {
827 sender.send(Ok(0));
828 }
829 info!("Successfully compacted region: {}", self.region_id);
830 }
831}
832
833impl OnFailure for CompactionFinished {
834 fn on_failure(&mut self, err: Error) {
836 let err = Arc::new(err);
837 for sender in self.senders.drain(..) {
838 sender.send(Err(err.clone()).context(CompactRegionSnafu {
839 region_id: self.region_id,
840 }));
841 }
842 }
843}
844
/// Notification that a compaction job failed.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Region whose compaction failed.
    pub(crate) region_id: RegionId,
    /// Error that caused the failure.
    pub(crate) err: Arc<Error>,
}
852
/// Result of truncating a region.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Region that was truncated.
    pub(crate) region_id: RegionId,
    /// Sender used to notify the caller.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the truncate.
    pub(crate) result: Result<()>,
    /// Entry id reported by the truncation.
    pub(crate) truncated_entry_id: EntryId,
    /// Sequence number reported by the truncation.
    pub(crate) truncated_sequence: SequenceNumber,
}
867
/// Result of changing a region's metadata.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Region that was changed.
    pub(crate) region_id: RegionId,
    /// New region metadata.
    pub(crate) new_meta: RegionMetadataRef,
    /// Sender used to notify the caller.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the change.
    pub(crate) result: Result<()>,
}
880
/// Request to apply a [`RegionEdit`] to a region.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Region to edit.
    pub(crate) region_id: RegionId,
    /// The edit to apply.
    pub(crate) edit: RegionEdit,
    /// Sender for the outcome.
    pub(crate) tx: Sender<Result<()>>,
}
889
/// Result of applying a [`RegionEdit`].
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Region that was edited.
    pub(crate) region_id: RegionId,
    /// Sender used to notify the caller.
    pub(crate) sender: Sender<Result<()>>,
    /// The applied edit.
    pub(crate) edit: RegionEdit,
    /// Outcome of the edit.
    pub(crate) result: Result<()>,
}
902
/// Request to sync a region to a manifest version.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Region to sync.
    pub(crate) region_id: RegionId,
    /// Target manifest version.
    pub(crate) manifest_version: ManifestVersion,
    /// Sender for the result; the meaning of the `bool` is not shown here (TODO confirm).
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
910
911#[cfg(test)]
912mod tests {
913 use api::v1::value::ValueData;
914 use api::v1::{Row, SemanticType};
915 use datatypes::prelude::ConcreteDataType;
916 use datatypes::schema::ColumnDefaultConstraint;
917 use store_api::metadata::RegionMetadataBuilder;
918
919 use super::*;
920 use crate::error::Error;
921 use crate::test_util::{i64_value, ts_ms_value};
922
923 fn new_column_schema(
924 name: &str,
925 data_type: ColumnDataType,
926 semantic_type: SemanticType,
927 ) -> ColumnSchema {
928 ColumnSchema {
929 column_name: name.to_string(),
930 datatype: data_type as i32,
931 semantic_type: semantic_type as i32,
932 ..Default::default()
933 }
934 }
935
936 fn check_invalid_request(err: &Error, expect: &str) {
937 if let Error::InvalidRequest {
938 region_id: _,
939 reason,
940 location: _,
941 } = err
942 {
943 assert_eq!(reason, expect);
944 } else {
945 panic!("Unexpected error {err}")
946 }
947 }
948
949 #[test]
950 fn test_write_request_duplicate_column() {
951 let rows = Rows {
952 schema: vec![
953 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
954 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
955 ],
956 rows: vec![],
957 };
958
959 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
960 check_invalid_request(&err, "duplicate column c0");
961 }
962
963 #[test]
964 fn test_valid_write_request() {
965 let rows = Rows {
966 schema: vec![
967 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
968 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
969 ],
970 rows: vec![Row {
971 values: vec![i64_value(1), i64_value(2)],
972 }],
973 };
974
975 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
976 assert_eq!(0, request.column_index_by_name("c0").unwrap());
977 assert_eq!(1, request.column_index_by_name("c1").unwrap());
978 assert_eq!(None, request.column_index_by_name("c2"));
979 }
980
981 #[test]
982 fn test_write_request_column_num() {
983 let rows = Rows {
984 schema: vec![
985 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
986 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
987 ],
988 rows: vec![Row {
989 values: vec![i64_value(1), i64_value(2), i64_value(3)],
990 }],
991 };
992
993 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
994 check_invalid_request(&err, "row has 3 columns but schema has 2");
995 }
996
997 fn new_region_metadata() -> RegionMetadata {
998 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
999 builder
1000 .push_column_metadata(ColumnMetadata {
1001 column_schema: datatypes::schema::ColumnSchema::new(
1002 "ts",
1003 ConcreteDataType::timestamp_millisecond_datatype(),
1004 false,
1005 ),
1006 semantic_type: SemanticType::Timestamp,
1007 column_id: 1,
1008 })
1009 .push_column_metadata(ColumnMetadata {
1010 column_schema: datatypes::schema::ColumnSchema::new(
1011 "k0",
1012 ConcreteDataType::int64_datatype(),
1013 true,
1014 ),
1015 semantic_type: SemanticType::Tag,
1016 column_id: 2,
1017 })
1018 .primary_key(vec![2]);
1019 builder.build().unwrap()
1020 }
1021
1022 #[test]
1023 fn test_check_schema() {
1024 let rows = Rows {
1025 schema: vec![
1026 new_column_schema(
1027 "ts",
1028 ColumnDataType::TimestampMillisecond,
1029 SemanticType::Timestamp,
1030 ),
1031 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1032 ],
1033 rows: vec![Row {
1034 values: vec![ts_ms_value(1), i64_value(2)],
1035 }],
1036 };
1037 let metadata = new_region_metadata();
1038
1039 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1040 request.check_schema(&metadata).unwrap();
1041 }
1042
1043 #[test]
1044 fn test_column_type() {
1045 let rows = Rows {
1046 schema: vec![
1047 new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1048 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1049 ],
1050 rows: vec![Row {
1051 values: vec![i64_value(1), i64_value(2)],
1052 }],
1053 };
1054 let metadata = new_region_metadata();
1055
1056 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1057 let err = request.check_schema(&metadata).unwrap_err();
1058 check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)");
1059 }
1060
1061 #[test]
1062 fn test_semantic_type() {
1063 let rows = Rows {
1064 schema: vec![
1065 new_column_schema(
1066 "ts",
1067 ColumnDataType::TimestampMillisecond,
1068 SemanticType::Tag,
1069 ),
1070 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1071 ],
1072 rows: vec![Row {
1073 values: vec![ts_ms_value(1), i64_value(2)],
1074 }],
1075 };
1076 let metadata = new_region_metadata();
1077
1078 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1079 let err = request.check_schema(&metadata).unwrap_err();
1080 check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1081 }
1082
1083 #[test]
1084 fn test_column_nullable() {
1085 let rows = Rows {
1086 schema: vec![
1087 new_column_schema(
1088 "ts",
1089 ColumnDataType::TimestampMillisecond,
1090 SemanticType::Timestamp,
1091 ),
1092 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1093 ],
1094 rows: vec![Row {
1095 values: vec![Value { value_data: None }, i64_value(2)],
1096 }],
1097 };
1098 let metadata = new_region_metadata();
1099
1100 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1101 let err = request.check_schema(&metadata).unwrap_err();
1102 check_invalid_request(&err, "column ts is not null but input has null");
1103 }
1104
1105 #[test]
1106 fn test_column_default() {
1107 let rows = Rows {
1108 schema: vec![new_column_schema(
1109 "k0",
1110 ColumnDataType::Int64,
1111 SemanticType::Tag,
1112 )],
1113 rows: vec![Row {
1114 values: vec![i64_value(1)],
1115 }],
1116 };
1117 let metadata = new_region_metadata();
1118
1119 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1120 let err = request.check_schema(&metadata).unwrap_err();
1121 check_invalid_request(&err, "missing column ts");
1122 }
1123
1124 #[test]
1125 fn test_unknown_column() {
1126 let rows = Rows {
1127 schema: vec![
1128 new_column_schema(
1129 "ts",
1130 ColumnDataType::TimestampMillisecond,
1131 SemanticType::Timestamp,
1132 ),
1133 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1134 new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1135 ],
1136 rows: vec![Row {
1137 values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1138 }],
1139 };
1140 let metadata = new_region_metadata();
1141
1142 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1143 let err = request.check_schema(&metadata).unwrap_err();
1144 check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1145 }
1146
1147 #[test]
1148 fn test_fill_impure_columns_err() {
1149 let rows = Rows {
1150 schema: vec![new_column_schema(
1151 "k0",
1152 ColumnDataType::Int64,
1153 SemanticType::Tag,
1154 )],
1155 rows: vec![Row {
1156 values: vec![i64_value(1)],
1157 }],
1158 };
1159 let metadata = {
1160 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1161 builder
1162 .push_column_metadata(ColumnMetadata {
1163 column_schema: datatypes::schema::ColumnSchema::new(
1164 "ts",
1165 ConcreteDataType::timestamp_millisecond_datatype(),
1166 false,
1167 )
1168 .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1169 "now()".to_string(),
1170 )))
1171 .unwrap(),
1172 semantic_type: SemanticType::Timestamp,
1173 column_id: 1,
1174 })
1175 .push_column_metadata(ColumnMetadata {
1176 column_schema: datatypes::schema::ColumnSchema::new(
1177 "k0",
1178 ConcreteDataType::int64_datatype(),
1179 true,
1180 ),
1181 semantic_type: SemanticType::Tag,
1182 column_id: 2,
1183 })
1184 .primary_key(vec![2]);
1185 builder.build().unwrap()
1186 };
1187
1188 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1189 let err = request.check_schema(&metadata).unwrap_err();
1190 assert!(err.is_fill_default());
1191 assert!(request
1192 .fill_missing_columns(&metadata)
1193 .unwrap_err()
1194 .to_string()
1195 .contains("Unexpected impure default value with region_id"));
1196 }
1197
1198 #[test]
1199 fn test_fill_missing_columns() {
1200 let rows = Rows {
1201 schema: vec![new_column_schema(
1202 "ts",
1203 ColumnDataType::TimestampMillisecond,
1204 SemanticType::Timestamp,
1205 )],
1206 rows: vec![Row {
1207 values: vec![ts_ms_value(1)],
1208 }],
1209 };
1210 let metadata = new_region_metadata();
1211
1212 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1213 let err = request.check_schema(&metadata).unwrap_err();
1214 assert!(err.is_fill_default());
1215 request.fill_missing_columns(&metadata).unwrap();
1216
1217 let expect_rows = Rows {
1218 schema: vec![new_column_schema(
1219 "ts",
1220 ColumnDataType::TimestampMillisecond,
1221 SemanticType::Timestamp,
1222 )],
1223 rows: vec![Row {
1224 values: vec![ts_ms_value(1)],
1225 }],
1226 };
1227 assert_eq!(expect_rows, request.rows);
1228 }
1229
1230 fn builder_with_ts_tag() -> RegionMetadataBuilder {
1231 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1232 builder
1233 .push_column_metadata(ColumnMetadata {
1234 column_schema: datatypes::schema::ColumnSchema::new(
1235 "ts",
1236 ConcreteDataType::timestamp_millisecond_datatype(),
1237 false,
1238 ),
1239 semantic_type: SemanticType::Timestamp,
1240 column_id: 1,
1241 })
1242 .push_column_metadata(ColumnMetadata {
1243 column_schema: datatypes::schema::ColumnSchema::new(
1244 "k0",
1245 ConcreteDataType::int64_datatype(),
1246 true,
1247 ),
1248 semantic_type: SemanticType::Tag,
1249 column_id: 2,
1250 })
1251 .primary_key(vec![2]);
1252 builder
1253 }
1254
1255 fn region_metadata_two_fields() -> RegionMetadata {
1256 let mut builder = builder_with_ts_tag();
1257 builder
1258 .push_column_metadata(ColumnMetadata {
1259 column_schema: datatypes::schema::ColumnSchema::new(
1260 "f0",
1261 ConcreteDataType::int64_datatype(),
1262 true,
1263 ),
1264 semantic_type: SemanticType::Field,
1265 column_id: 3,
1266 })
1267 .push_column_metadata(ColumnMetadata {
1269 column_schema: datatypes::schema::ColumnSchema::new(
1270 "f1",
1271 ConcreteDataType::int64_datatype(),
1272 false,
1273 )
1274 .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1275 datatypes::value::Value::Int64(100),
1276 )))
1277 .unwrap(),
1278 semantic_type: SemanticType::Field,
1279 column_id: 4,
1280 });
1281 builder.build().unwrap()
1282 }
1283
/// Exercises column filling for deletes against `region_metadata_two_fields()`.
#[test]
fn test_fill_missing_for_delete() {
    let metadata = region_metadata_two_fields();
    let ts_schema = || {
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        )
    };
    let new_delete =
        |rows: Rows| WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();

    // Without the tag column `k0`, both schema checking and filling reject
    // the delete.
    let mut request = new_delete(Rows {
        schema: vec![ts_schema()],
        rows: vec![Row {
            values: vec![ts_ms_value(1)],
        }],
    });
    let err = request.check_schema(&metadata).unwrap_err();
    check_invalid_request(&err, "delete requests need column k0");
    let err = request.fill_missing_columns(&metadata).unwrap_err();
    check_invalid_request(&err, "delete requests need column k0");

    // With `k0` and `ts` present, only the fields are missing. check_schema
    // reports a fill-default error. fill_missing_columns then pads the
    // non-nullable `f1` with 0 (not its declared default of 100) and does not
    // add the nullable `f0`.
    let mut request = new_delete(Rows {
        schema: vec![
            new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            ts_schema(),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1)],
        }],
    });
    let err = request.check_schema(&metadata).unwrap_err();
    assert!(err.is_fill_default());
    request.fill_missing_columns(&metadata).unwrap();

    let expect_rows = Rows {
        schema: vec![
            new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            ts_schema(),
            new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(expect_rows, request.rows);
}
1341
/// A delete against a region whose non-nullable field `f1` has no default
/// value: `f1` is still padded with 0, and the nullable `f0` is not added.
#[test]
fn test_fill_missing_without_default_in_delete() {
    // Nullable field.
    let nullable_f0 = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f0",
            ConcreteDataType::int64_datatype(),
            true,
        ),
        semantic_type: SemanticType::Field,
        column_id: 3,
    };
    // Not nullable and without a default value.
    let required_f1 = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f1",
            ConcreteDataType::int64_datatype(),
            false,
        ),
        semantic_type: SemanticType::Field,
        column_id: 4,
    };
    let mut builder = builder_with_ts_tag();
    builder
        .push_column_metadata(nullable_f0)
        .push_column_metadata(required_f1);
    let metadata = builder.build().unwrap();

    // Only the tag and the timestamp are supplied; both fields are missing.
    let rows = Rows {
        schema: vec![
            new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1)],
        }],
    };
    // After filling, the schema should be the input schema plus `f1`.
    let mut expected_schema = rows.schema.clone();
    expected_schema.push(new_column_schema(
        "f1",
        ColumnDataType::Int64,
        SemanticType::Field,
    ));

    let mut request =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
    let err = request.check_schema(&metadata).unwrap_err();
    assert!(err.is_fill_default());
    request.fill_missing_columns(&metadata).unwrap();

    let expect_rows = Rows {
        schema: expected_schema,
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(expect_rows, request.rows);
}
1405
/// A put that only supplies `k0` cannot be completed: the missing column `ts`
/// has no default value to fill in.
#[test]
fn test_no_default() {
    let metadata = new_region_metadata();
    let tag_only = Rows {
        schema: vec![new_column_schema(
            "k0",
            ColumnDataType::Int64,
            SemanticType::Tag,
        )],
        rows: vec![Row {
            values: vec![i64_value(1)],
        }],
    };

    let mut request =
        WriteRequest::new(RegionId::new(1, 1), OpType::Put, tag_only, None).unwrap();
    let err = request.fill_missing_columns(&metadata).unwrap_err();
    check_invalid_request(&err, "column ts does not have default value");
}
1424
/// The field `f0` is missing, and `f1` is sent as a string although the region
/// declares it as int64. `check_schema` must report the type mismatch on `f1`.
#[test]
fn test_missing_and_invalid() {
    let metadata = region_metadata_two_fields();

    let invalid_f1 = Value {
        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
    };
    let rows = Rows {
        schema: vec![
            new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
            new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), invalid_f1],
        }],
    };

    let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let err = request.check_schema(&metadata).unwrap_err();
    check_invalid_request(
        &err,
        "column f1 expect type Int64(Int64Type), given: STRING(12)",
    );
}
1457
/// `WriteRequest::new` retains the region metadata handed to it.
#[test]
fn test_write_request_metadata() {
    // The rows only need to be accepted by `new`; this test is about the
    // stored metadata, not about schema validation.
    let rows = Rows {
        schema: vec![
            new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
        ],
        rows: vec![Row {
            values: vec![i64_value(1), i64_value(2)],
        }],
    };

    let metadata = Arc::new(new_region_metadata());
    let request = WriteRequest::new(
        RegionId::new(1, 1),
        OpType::Put,
        rows,
        Some(Arc::clone(&metadata)),
    )
    .unwrap();

    let stored = request
        .region_metadata
        .expect("region metadata passed to WriteRequest::new should be retained");
    // The request must hold the very same shared metadata, not some other
    // instance that merely happens to have the same region id.
    assert!(Arc::ptr_eq(&stored, &metadata));
    assert_eq!(stored.region_id, RegionId::new(1, 1));
}
1485}