1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22 is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_proto_value,
23 ColumnDataTypeWrapper,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{ensure, OptionExt, ResultExt};
33use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
34use store_api::manifest::ManifestVersion;
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
37use store_api::region_request::{
38 AffectedRows, RegionAlterRequest, RegionBulkInsertsRequest, RegionCatchupRequest,
39 RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest,
40 RegionOpenRequest, RegionRequest, RegionTruncateRequest,
41};
42use store_api::storage::{RegionId, SequenceNumber};
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46 CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47 FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedImpureDefaultSnafu,
48};
49use crate::manifest::action::RegionEdit;
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::MemtableId;
52use crate::metrics::COMPACTION_ELAPSED_TOTAL;
53use crate::wal::entry_distributor::WalEntryReceiver;
54use crate::wal::EntryId;
55
#[derive(Debug)]
/// Request to write rows into a region.
pub struct WriteRequest {
    /// Id of the region to write to.
    pub region_id: RegionId,
    /// Kind of write operation (e.g. `OpType::Put` or `OpType::Delete`).
    pub op_type: OpType,
    /// Rows to write, in protobuf form.
    pub rows: Rows,
    /// Index of each column in `rows.schema`, keyed by column name
    /// (built by `WriteRequest::new`).
    pub name_to_index: HashMap<String, usize>,
    /// `has_null[i]` is true if column `i` of `rows.schema` holds at least one
    /// null value (computed by `WriteRequest::new`).
    pub has_null: Vec<bool>,
    /// Optional write hint; used by `primary_key_encoding` to infer the
    /// primary key encoding.
    pub hint: Option<WriteHint>,
    /// Region metadata passed to `WriteRequest::new`, if any.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
}
74
75impl WriteRequest {
76 pub fn new(
80 region_id: RegionId,
81 op_type: OpType,
82 rows: Rows,
83 region_metadata: Option<RegionMetadataRef>,
84 ) -> Result<WriteRequest> {
85 let mut name_to_index = HashMap::with_capacity(rows.schema.len());
86 for (index, column) in rows.schema.iter().enumerate() {
87 ensure!(
88 name_to_index
89 .insert(column.column_name.clone(), index)
90 .is_none(),
91 InvalidRequestSnafu {
92 region_id,
93 reason: format!("duplicate column {}", column.column_name),
94 }
95 );
96 }
97
98 let mut has_null = vec![false; rows.schema.len()];
99 for row in &rows.rows {
100 ensure!(
101 row.values.len() == rows.schema.len(),
102 InvalidRequestSnafu {
103 region_id,
104 reason: format!(
105 "row has {} columns but schema has {}",
106 row.values.len(),
107 rows.schema.len()
108 ),
109 }
110 );
111
112 for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
113 validate_proto_value(region_id, value, column_schema)?;
114
115 if value.value_data.is_none() {
116 has_null[i] = true;
117 }
118 }
119 }
120
121 Ok(WriteRequest {
122 region_id,
123 op_type,
124 rows,
125 name_to_index,
126 has_null,
127 hint: None,
128 region_metadata,
129 })
130 }
131
132 pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
134 self.hint = hint;
135 self
136 }
137
138 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
140 infer_primary_key_encoding_from_hint(self.hint.as_ref())
141 }
142
143 pub(crate) fn estimated_size(&self) -> usize {
145 let row_size = self
146 .rows
147 .rows
148 .first()
149 .map(|row| row.encoded_len())
150 .unwrap_or(0);
151 row_size * self.rows.rows.len()
152 }
153
154 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
156 self.name_to_index.get(name).copied()
157 }
158
159 pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
164 debug_assert_eq!(self.region_id, metadata.region_id);
165
166 let region_id = self.region_id;
167 let mut rows_columns: HashMap<_, _> = self
169 .rows
170 .schema
171 .iter()
172 .map(|column| (&column.column_name, column))
173 .collect();
174
175 let mut need_fill_default = false;
176 for column in &metadata.column_metadatas {
178 if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
179 ensure!(
181 is_column_type_value_eq(
182 input_col.datatype,
183 input_col.datatype_extension,
184 &column.column_schema.data_type
185 ),
186 InvalidRequestSnafu {
187 region_id,
188 reason: format!(
189 "column {} expect type {:?}, given: {}({})",
190 column.column_schema.name,
191 column.column_schema.data_type,
192 ColumnDataType::try_from(input_col.datatype)
193 .map(|v| v.as_str_name())
194 .unwrap_or("Unknown"),
195 input_col.datatype,
196 )
197 }
198 );
199
200 ensure!(
202 is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
203 InvalidRequestSnafu {
204 region_id,
205 reason: format!(
206 "column {} has semantic type {:?}, given: {}({})",
207 column.column_schema.name,
208 column.semantic_type,
209 api::v1::SemanticType::try_from(input_col.semantic_type)
210 .map(|v| v.as_str_name())
211 .unwrap_or("Unknown"),
212 input_col.semantic_type
213 ),
214 }
215 );
216
217 let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
220 ensure!(
221 !has_null || column.column_schema.is_nullable(),
222 InvalidRequestSnafu {
223 region_id,
224 reason: format!(
225 "column {} is not null but input has null",
226 column.column_schema.name
227 ),
228 }
229 );
230 } else {
231 self.check_missing_column(column)?;
233
234 need_fill_default = true;
235 }
236 }
237
238 if !rows_columns.is_empty() {
240 let names: Vec<_> = rows_columns.into_keys().collect();
241 return InvalidRequestSnafu {
242 region_id,
243 reason: format!("unknown columns: {:?}", names),
244 }
245 .fail();
246 }
247
248 ensure!(!need_fill_default, FillDefaultSnafu { region_id });
250
251 Ok(())
252 }
253
254 pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
259 debug_assert_eq!(self.region_id, metadata.region_id);
260
261 let mut columns_to_fill = vec![];
262 for column in &metadata.column_metadatas {
263 if !self.name_to_index.contains_key(&column.column_schema.name) {
264 columns_to_fill.push(column);
265 }
266 }
267 self.fill_columns(columns_to_fill)?;
268
269 Ok(())
270 }
271
272 pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
274 if let Err(e) = self.check_schema(metadata) {
275 if e.is_fill_default() {
276 self.fill_missing_columns(metadata)?;
280 } else {
281 return Err(e);
282 }
283 }
284
285 Ok(())
286 }
287
288 fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
290 let mut default_values = Vec::with_capacity(columns.len());
291 let mut columns_to_fill = Vec::with_capacity(columns.len());
292 for column in columns {
293 let default_value = self.column_default_value(column)?;
294 if default_value.value_data.is_some() {
295 default_values.push(default_value);
296 columns_to_fill.push(column);
297 }
298 }
299
300 for row in &mut self.rows.rows {
301 row.values.extend(default_values.iter().cloned());
302 }
303
304 for column in columns_to_fill {
305 let (datatype, datatype_ext) =
306 ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
307 .with_context(|_| ConvertColumnDataTypeSnafu {
308 reason: format!(
309 "no protobuf type for column {} ({:?})",
310 column.column_schema.name, column.column_schema.data_type
311 ),
312 })?
313 .to_parts();
314 self.rows.schema.push(ColumnSchema {
315 column_name: column.column_schema.name.clone(),
316 datatype: datatype as i32,
317 semantic_type: column.semantic_type as i32,
318 datatype_extension: datatype_ext,
319 options: options_from_column_schema(&column.column_schema),
320 });
321 }
322
323 Ok(())
324 }
325
326 fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
328 if self.op_type == OpType::Delete {
329 if column.semantic_type == SemanticType::Field {
330 return Ok(());
333 } else {
334 return InvalidRequestSnafu {
335 region_id: self.region_id,
336 reason: format!("delete requests need column {}", column.column_schema.name),
337 }
338 .fail();
339 }
340 }
341
342 ensure!(
344 column.column_schema.is_nullable()
345 || column.column_schema.default_constraint().is_some(),
346 InvalidRequestSnafu {
347 region_id: self.region_id,
348 reason: format!("missing column {}", column.column_schema.name),
349 }
350 );
351
352 Ok(())
353 }
354
355 fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
357 let default_value = match self.op_type {
358 OpType::Delete => {
359 ensure!(
360 column.semantic_type == SemanticType::Field,
361 InvalidRequestSnafu {
362 region_id: self.region_id,
363 reason: format!(
364 "delete requests need column {}",
365 column.column_schema.name
366 ),
367 }
368 );
369
370 if column.column_schema.is_nullable() {
375 datatypes::value::Value::Null
376 } else {
377 column.column_schema.data_type.default_value()
378 }
379 }
380 OpType::Put => {
381 if column.column_schema.is_default_impure() {
383 UnexpectedImpureDefaultSnafu {
384 region_id: self.region_id,
385 column: &column.column_schema.name,
386 default_value: format!("{:?}", column.column_schema.default_constraint()),
387 }
388 .fail()?
389 }
390 column
391 .column_schema
392 .create_default()
393 .context(CreateDefaultSnafu {
394 region_id: self.region_id,
395 column: &column.column_schema.name,
396 })?
397 .with_context(|| InvalidRequestSnafu {
399 region_id: self.region_id,
400 reason: format!(
401 "column {} does not have default value",
402 column.column_schema.name
403 ),
404 })?
405 }
406 };
407
408 to_proto_value(default_value).with_context(|| InvalidRequestSnafu {
410 region_id: self.region_id,
411 reason: format!(
412 "no protobuf type for default value of column {} ({:?})",
413 column.column_schema.name, column.column_schema.data_type
414 ),
415 })
416 }
417}
418
419pub(crate) fn validate_proto_value(
421 region_id: RegionId,
422 value: &Value,
423 column_schema: &ColumnSchema,
424) -> Result<()> {
425 if let Some(value_type) = proto_value_type(value) {
426 let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
427 InvalidRequestSnafu {
428 region_id,
429 reason: format!(
430 "column {} has unknown type {}",
431 column_schema.column_name, column_schema.datatype
432 ),
433 }
434 .build()
435 })?;
436 ensure!(
437 proto_value_type_match(column_type, value_type),
438 InvalidRequestSnafu {
439 region_id,
440 reason: format!(
441 "value has type {:?}, but column {} has type {:?}({})",
442 value_type, column_schema.column_name, column_type, column_schema.datatype,
443 ),
444 }
445 );
446 }
447
448 Ok(())
449}
450
451fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
452 match (column_type, value_type) {
453 (ct, vt) if ct == vt => true,
454 (ColumnDataType::Vector, ColumnDataType::Binary) => true,
455 (ColumnDataType::Json, ColumnDataType::Binary) => true,
456 _ => false,
457 }
458}
459
#[derive(Debug)]
/// Oneshot sender that delivers the result (affected rows or an error) of a request.
pub struct OutputTx(Sender<Result<AffectedRows>>);
463
464impl OutputTx {
465 pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
467 OutputTx(sender)
468 }
469
470 pub(crate) fn send(self, result: Result<AffectedRows>) {
472 let _ = self.0.send(result);
474 }
475}
476
#[derive(Debug)]
/// Optional [`OutputTx`]; a result is delivered at most once because sending
/// takes the inner sender out.
pub(crate) struct OptionOutputTx(Option<OutputTx>);
480
481impl OptionOutputTx {
482 pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
484 OptionOutputTx(sender)
485 }
486
487 pub(crate) fn none() -> OptionOutputTx {
489 OptionOutputTx(None)
490 }
491
492 pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
494 if let Some(sender) = self.0.take() {
495 sender.send(result);
496 }
497 }
498
499 pub(crate) fn send(mut self, result: Result<AffectedRows>) {
501 if let Some(sender) = self.0.take() {
502 sender.send(result);
503 }
504 }
505
506 pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
508 self.0.take()
509 }
510}
511
512impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
513 fn from(sender: Sender<Result<AffectedRows>>) -> Self {
514 Self::new(Some(OutputTx::new(sender)))
515 }
516}
517
518impl OnFailure for OptionOutputTx {
519 fn on_failure(&mut self, err: Error) {
520 self.send_mut(Err(err));
521 }
522}
523
/// Types that must react when the operation they belong to fails.
pub(crate) trait OnFailure {
    /// Handles the failure described by `err` (implementations in this file
    /// forward it to waiting senders).
    fn on_failure(&mut self, err: Error);
}
529
#[derive(Debug)]
/// A write request paired with the sender used to reply with its result.
pub(crate) struct SenderWriteRequest {
    /// Sender for the result of the write.
    pub(crate) sender: OptionOutputTx,
    /// The write request itself.
    pub(crate) request: WriteRequest,
}
537
/// A bulk part to write into a region, paired with its result sender.
pub(crate) struct SenderBulkRequest {
    /// Sender for the result of the write.
    pub(crate) sender: OptionOutputTx,
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// Data to write.
    pub(crate) request: BulkPart,
    /// Metadata of the target region.
    pub(crate) region_metadata: RegionMetadataRef,
}
544
#[derive(Debug)]
/// Request handled by a region worker.
pub(crate) enum WorkerRequest {
    /// Write (put or delete) request together with its result sender.
    Write(SenderWriteRequest),

    /// DDL request together with its result sender.
    Ddl(SenderDdlRequest),

    /// Notification from a background job (flush, compaction, truncate, ...).
    Background {
        /// Id of the region the notification belongs to.
        region_id: RegionId,
        /// The notification payload.
        notify: BackgroundNotify,
    },

    /// Sets the role state of a region gracefully.
    SetRegionRoleStateGracefully {
        /// Id of the region.
        region_id: RegionId,
        /// Target role state.
        region_role_state: SettableRegionRoleState,
        /// Sender for the response.
        sender: Sender<SetRegionRoleStateResponse>,
    },

    /// Stop request. NOTE(review): presumably asks the worker to shut down;
    /// confirm against the worker loop.
    Stop,

    /// Applies a region edit.
    EditRegion(RegionEditRequest),

    /// Syncs a region; carries the target manifest version.
    SyncRegion(RegionSyncRequest),

    /// Bulk insert request.
    BulkInserts {
        /// Region metadata passed to `try_from_region_request`, if any.
        metadata: Option<RegionMetadataRef>,
        /// The bulk insert payload.
        request: RegionBulkInsertsRequest,
        /// Sender for the result.
        sender: OptionOutputTx,
    },
}
588
589impl WorkerRequest {
590 pub(crate) fn new_open_region_request(
591 region_id: RegionId,
592 request: RegionOpenRequest,
593 entry_receiver: Option<WalEntryReceiver>,
594 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
595 let (sender, receiver) = oneshot::channel();
596
597 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
598 region_id,
599 sender: sender.into(),
600 request: DdlRequest::Open((request, entry_receiver)),
601 });
602
603 (worker_request, receiver)
604 }
605
606 pub(crate) fn try_from_region_request(
608 region_id: RegionId,
609 value: RegionRequest,
610 region_metadata: Option<RegionMetadataRef>,
611 ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
612 let (sender, receiver) = oneshot::channel();
613 let worker_request = match value {
614 RegionRequest::Put(v) => {
615 let mut write_request =
616 WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
617 .with_hint(v.hint);
618 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
619 && let Some(region_metadata) = ®ion_metadata
620 {
621 write_request.maybe_fill_missing_columns(region_metadata)?;
622 }
623 WorkerRequest::Write(SenderWriteRequest {
624 sender: sender.into(),
625 request: write_request,
626 })
627 }
628 RegionRequest::Delete(v) => {
629 let mut write_request =
630 WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?;
631 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
632 && let Some(region_metadata) = ®ion_metadata
633 {
634 write_request.maybe_fill_missing_columns(region_metadata)?;
635 }
636 WorkerRequest::Write(SenderWriteRequest {
637 sender: sender.into(),
638 request: write_request,
639 })
640 }
641 RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
642 region_id,
643 sender: sender.into(),
644 request: DdlRequest::Create(v),
645 }),
646 RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
647 region_id,
648 sender: sender.into(),
649 request: DdlRequest::Drop,
650 }),
651 RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
652 region_id,
653 sender: sender.into(),
654 request: DdlRequest::Open((v, None)),
655 }),
656 RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
657 region_id,
658 sender: sender.into(),
659 request: DdlRequest::Close(v),
660 }),
661 RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
662 region_id,
663 sender: sender.into(),
664 request: DdlRequest::Alter(v),
665 }),
666 RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
667 region_id,
668 sender: sender.into(),
669 request: DdlRequest::Flush(v),
670 }),
671 RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
672 region_id,
673 sender: sender.into(),
674 request: DdlRequest::Compact(v),
675 }),
676 RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
677 region_id,
678 sender: sender.into(),
679 request: DdlRequest::Truncate(v),
680 }),
681 RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
682 region_id,
683 sender: sender.into(),
684 request: DdlRequest::Catchup(v),
685 }),
686 RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
687 metadata: region_metadata,
688 sender: sender.into(),
689 request: region_bulk_inserts_request,
690 },
691 };
692
693 Ok((worker_request, receiver))
694 }
695
696 pub(crate) fn new_set_readonly_gracefully(
697 region_id: RegionId,
698 region_role_state: SettableRegionRoleState,
699 ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
700 let (sender, receiver) = oneshot::channel();
701
702 (
703 WorkerRequest::SetRegionRoleStateGracefully {
704 region_id,
705 region_role_state,
706 sender,
707 },
708 receiver,
709 )
710 }
711
712 pub(crate) fn new_sync_region_request(
713 region_id: RegionId,
714 manifest_version: ManifestVersion,
715 ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
716 let (sender, receiver) = oneshot::channel();
717 (
718 WorkerRequest::SyncRegion(RegionSyncRequest {
719 region_id,
720 manifest_version,
721 sender,
722 }),
723 receiver,
724 )
725 }
726}
727
#[derive(Debug)]
/// DDL requests a region worker can handle.
pub(crate) enum DdlRequest {
    /// Create a region.
    Create(RegionCreateRequest),
    /// Drop a region.
    Drop,
    /// Open a region, with an optional WAL entry receiver.
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    /// Close a region.
    Close(RegionCloseRequest),
    /// Alter a region.
    Alter(RegionAlterRequest),
    /// Flush a region.
    Flush(RegionFlushRequest),
    /// Compact a region.
    Compact(RegionCompactRequest),
    /// Truncate a region.
    Truncate(RegionTruncateRequest),
    /// Catch up a region.
    Catchup(RegionCatchupRequest),
}
741
#[derive(Debug)]
/// A DDL request paired with the sender used to reply with its result.
pub(crate) struct SenderDdlRequest {
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// Sender for the result.
    pub(crate) sender: OptionOutputTx,
    /// The DDL request itself.
    pub(crate) request: DdlRequest,
}
752
#[derive(Debug)]
/// Notifications sent to a worker by background jobs.
pub(crate) enum BackgroundNotify {
    /// A flush finished successfully.
    FlushFinished(FlushFinished),
    /// A flush failed.
    FlushFailed(FlushFailed),
    /// A compaction finished successfully.
    CompactionFinished(CompactionFinished),
    /// A compaction failed.
    CompactionFailed(CompactionFailed),
    /// Result of a truncate.
    Truncate(TruncateResult),
    /// Result of a region metadata change.
    RegionChange(RegionChangeResult),
    /// Result of a region edit.
    RegionEdit(RegionEditResult),
}
771
#[derive(Debug)]
/// Notification that a region flush finished.
pub(crate) struct FlushFinished {
    /// Id of the flushed region.
    pub(crate) region_id: RegionId,
    /// WAL entry id associated with the flush
    /// (NOTE(review): presumably the last entry covered by it; confirm).
    pub(crate) flushed_entry_id: EntryId,
    /// Waiters to notify with the flush result.
    pub(crate) senders: Vec<OutputTx>,
    /// Prometheus timer; it records the elapsed time when dropped.
    pub(crate) _timer: HistogramTimer,
    /// Region edit produced by the flush.
    pub(crate) edit: RegionEdit,
    /// Ids of the memtables to remove.
    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
}
788
789impl FlushFinished {
790 pub(crate) fn on_success(self) {
792 for sender in self.senders {
793 sender.send(Ok(0));
794 }
795 }
796}
797
798impl OnFailure for FlushFinished {
799 fn on_failure(&mut self, err: Error) {
800 let err = Arc::new(err);
801 for sender in self.senders.drain(..) {
802 sender.send(Err(err.clone()).context(FlushRegionSnafu {
803 region_id: self.region_id,
804 }));
805 }
806 }
807}
808
#[derive(Debug)]
/// Notification that a flush failed.
pub(crate) struct FlushFailed {
    /// The flush error, shared so it can be reported to several waiters.
    pub(crate) err: Arc<Error>,
}
815
#[derive(Debug)]
/// Notification that a region compaction finished.
pub(crate) struct CompactionFinished {
    /// Id of the compacted region.
    pub(crate) region_id: RegionId,
    /// Waiters to notify with the compaction result.
    pub(crate) senders: Vec<OutputTx>,
    /// When the compaction started; used to record its elapsed time.
    pub(crate) start_time: Instant,
    /// Region edit produced by the compaction.
    pub(crate) edit: RegionEdit,
}
828
829impl CompactionFinished {
830 pub fn on_success(self) {
831 COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
833
834 for sender in self.senders {
835 sender.send(Ok(0));
836 }
837 info!("Successfully compacted region: {}", self.region_id);
838 }
839}
840
841impl OnFailure for CompactionFinished {
842 fn on_failure(&mut self, err: Error) {
844 let err = Arc::new(err);
845 for sender in self.senders.drain(..) {
846 sender.send(Err(err.clone()).context(CompactRegionSnafu {
847 region_id: self.region_id,
848 }));
849 }
850 }
851}
852
#[derive(Debug)]
/// Notification that a region compaction failed.
pub(crate) struct CompactionFailed {
    /// Id of the region whose compaction failed.
    pub(crate) region_id: RegionId,
    /// The compaction error, shared so it can be reported to several waiters.
    pub(crate) err: Arc<Error>,
}
860
#[derive(Debug)]
/// Result of truncating a region.
pub(crate) struct TruncateResult {
    /// Id of the truncated region.
    pub(crate) region_id: RegionId,
    /// Sender for the result of the truncate request.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the truncate.
    pub(crate) result: Result<()>,
    /// WAL entry id recorded by the truncate.
    pub(crate) truncated_entry_id: EntryId,
    /// Sequence number recorded by the truncate.
    pub(crate) truncated_sequence: SequenceNumber,
}
875
#[derive(Debug)]
/// Result of changing a region's metadata.
pub(crate) struct RegionChangeResult {
    /// Id of the changed region.
    pub(crate) region_id: RegionId,
    /// The region's new metadata.
    pub(crate) new_meta: RegionMetadataRef,
    /// Sender for the result of the request.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the change.
    pub(crate) result: Result<()>,
}
888
#[derive(Debug)]
/// Request to apply an edit to a region.
pub(crate) struct RegionEditRequest {
    /// Id of the region to edit.
    pub(crate) region_id: RegionId,
    /// The edit to apply.
    pub(crate) edit: RegionEdit,
    /// Sender for the result.
    pub(crate) tx: Sender<Result<()>>,
}
897
#[derive(Debug)]
/// Result of applying an edit to a region.
pub(crate) struct RegionEditResult {
    /// Id of the edited region.
    pub(crate) region_id: RegionId,
    /// Sender for the result of the edit request.
    pub(crate) sender: Sender<Result<()>>,
    /// The applied edit.
    pub(crate) edit: RegionEdit,
    /// Outcome of the edit.
    pub(crate) result: Result<()>,
}
910
#[derive(Debug)]
/// Request to sync a region with a given manifest version.
pub(crate) struct RegionSyncRequest {
    /// Id of the region to sync.
    pub(crate) region_id: RegionId,
    /// Manifest version to sync to.
    pub(crate) manifest_version: ManifestVersion,
    /// Sender for the result. NOTE(review): the meaning of the returned
    /// `(ManifestVersion, bool)` pair is not visible here; confirm in the handler.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
918
919#[cfg(test)]
920mod tests {
921 use api::v1::value::ValueData;
922 use api::v1::{Row, SemanticType};
923 use datatypes::prelude::ConcreteDataType;
924 use datatypes::schema::ColumnDefaultConstraint;
925 use mito_codec::test_util::i64_value;
926 use store_api::metadata::RegionMetadataBuilder;
927
928 use super::*;
929 use crate::error::Error;
930 use crate::test_util::ts_ms_value;
931
932 fn new_column_schema(
933 name: &str,
934 data_type: ColumnDataType,
935 semantic_type: SemanticType,
936 ) -> ColumnSchema {
937 ColumnSchema {
938 column_name: name.to_string(),
939 datatype: data_type as i32,
940 semantic_type: semantic_type as i32,
941 ..Default::default()
942 }
943 }
944
945 fn check_invalid_request(err: &Error, expect: &str) {
946 if let Error::InvalidRequest {
947 region_id: _,
948 reason,
949 location: _,
950 } = err
951 {
952 assert_eq!(reason, expect);
953 } else {
954 panic!("Unexpected error {err}")
955 }
956 }
957
958 #[test]
959 fn test_write_request_duplicate_column() {
960 let rows = Rows {
961 schema: vec![
962 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
963 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
964 ],
965 rows: vec![],
966 };
967
968 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
969 check_invalid_request(&err, "duplicate column c0");
970 }
971
972 #[test]
973 fn test_valid_write_request() {
974 let rows = Rows {
975 schema: vec![
976 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
977 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
978 ],
979 rows: vec![Row {
980 values: vec![i64_value(1), i64_value(2)],
981 }],
982 };
983
984 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
985 assert_eq!(0, request.column_index_by_name("c0").unwrap());
986 assert_eq!(1, request.column_index_by_name("c1").unwrap());
987 assert_eq!(None, request.column_index_by_name("c2"));
988 }
989
990 #[test]
991 fn test_write_request_column_num() {
992 let rows = Rows {
993 schema: vec![
994 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
995 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
996 ],
997 rows: vec![Row {
998 values: vec![i64_value(1), i64_value(2), i64_value(3)],
999 }],
1000 };
1001
1002 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1003 check_invalid_request(&err, "row has 3 columns but schema has 2");
1004 }
1005
1006 fn new_region_metadata() -> RegionMetadata {
1007 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1008 builder
1009 .push_column_metadata(ColumnMetadata {
1010 column_schema: datatypes::schema::ColumnSchema::new(
1011 "ts",
1012 ConcreteDataType::timestamp_millisecond_datatype(),
1013 false,
1014 ),
1015 semantic_type: SemanticType::Timestamp,
1016 column_id: 1,
1017 })
1018 .push_column_metadata(ColumnMetadata {
1019 column_schema: datatypes::schema::ColumnSchema::new(
1020 "k0",
1021 ConcreteDataType::int64_datatype(),
1022 true,
1023 ),
1024 semantic_type: SemanticType::Tag,
1025 column_id: 2,
1026 })
1027 .primary_key(vec![2]);
1028 builder.build().unwrap()
1029 }
1030
1031 #[test]
1032 fn test_check_schema() {
1033 let rows = Rows {
1034 schema: vec![
1035 new_column_schema(
1036 "ts",
1037 ColumnDataType::TimestampMillisecond,
1038 SemanticType::Timestamp,
1039 ),
1040 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1041 ],
1042 rows: vec![Row {
1043 values: vec![ts_ms_value(1), i64_value(2)],
1044 }],
1045 };
1046 let metadata = new_region_metadata();
1047
1048 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1049 request.check_schema(&metadata).unwrap();
1050 }
1051
1052 #[test]
1053 fn test_column_type() {
1054 let rows = Rows {
1055 schema: vec![
1056 new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1057 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1058 ],
1059 rows: vec![Row {
1060 values: vec![i64_value(1), i64_value(2)],
1061 }],
1062 };
1063 let metadata = new_region_metadata();
1064
1065 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1066 let err = request.check_schema(&metadata).unwrap_err();
1067 check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)");
1068 }
1069
1070 #[test]
1071 fn test_semantic_type() {
1072 let rows = Rows {
1073 schema: vec![
1074 new_column_schema(
1075 "ts",
1076 ColumnDataType::TimestampMillisecond,
1077 SemanticType::Tag,
1078 ),
1079 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1080 ],
1081 rows: vec![Row {
1082 values: vec![ts_ms_value(1), i64_value(2)],
1083 }],
1084 };
1085 let metadata = new_region_metadata();
1086
1087 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1088 let err = request.check_schema(&metadata).unwrap_err();
1089 check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1090 }
1091
1092 #[test]
1093 fn test_column_nullable() {
1094 let rows = Rows {
1095 schema: vec![
1096 new_column_schema(
1097 "ts",
1098 ColumnDataType::TimestampMillisecond,
1099 SemanticType::Timestamp,
1100 ),
1101 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1102 ],
1103 rows: vec![Row {
1104 values: vec![Value { value_data: None }, i64_value(2)],
1105 }],
1106 };
1107 let metadata = new_region_metadata();
1108
1109 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1110 let err = request.check_schema(&metadata).unwrap_err();
1111 check_invalid_request(&err, "column ts is not null but input has null");
1112 }
1113
1114 #[test]
1115 fn test_column_default() {
1116 let rows = Rows {
1117 schema: vec![new_column_schema(
1118 "k0",
1119 ColumnDataType::Int64,
1120 SemanticType::Tag,
1121 )],
1122 rows: vec![Row {
1123 values: vec![i64_value(1)],
1124 }],
1125 };
1126 let metadata = new_region_metadata();
1127
1128 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1129 let err = request.check_schema(&metadata).unwrap_err();
1130 check_invalid_request(&err, "missing column ts");
1131 }
1132
1133 #[test]
1134 fn test_unknown_column() {
1135 let rows = Rows {
1136 schema: vec![
1137 new_column_schema(
1138 "ts",
1139 ColumnDataType::TimestampMillisecond,
1140 SemanticType::Timestamp,
1141 ),
1142 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1143 new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1144 ],
1145 rows: vec![Row {
1146 values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1147 }],
1148 };
1149 let metadata = new_region_metadata();
1150
1151 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1152 let err = request.check_schema(&metadata).unwrap_err();
1153 check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1154 }
1155
1156 #[test]
1157 fn test_fill_impure_columns_err() {
1158 let rows = Rows {
1159 schema: vec![new_column_schema(
1160 "k0",
1161 ColumnDataType::Int64,
1162 SemanticType::Tag,
1163 )],
1164 rows: vec![Row {
1165 values: vec![i64_value(1)],
1166 }],
1167 };
1168 let metadata = {
1169 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1170 builder
1171 .push_column_metadata(ColumnMetadata {
1172 column_schema: datatypes::schema::ColumnSchema::new(
1173 "ts",
1174 ConcreteDataType::timestamp_millisecond_datatype(),
1175 false,
1176 )
1177 .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1178 "now()".to_string(),
1179 )))
1180 .unwrap(),
1181 semantic_type: SemanticType::Timestamp,
1182 column_id: 1,
1183 })
1184 .push_column_metadata(ColumnMetadata {
1185 column_schema: datatypes::schema::ColumnSchema::new(
1186 "k0",
1187 ConcreteDataType::int64_datatype(),
1188 true,
1189 ),
1190 semantic_type: SemanticType::Tag,
1191 column_id: 2,
1192 })
1193 .primary_key(vec![2]);
1194 builder.build().unwrap()
1195 };
1196
1197 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1198 let err = request.check_schema(&metadata).unwrap_err();
1199 assert!(err.is_fill_default());
1200 assert!(request
1201 .fill_missing_columns(&metadata)
1202 .unwrap_err()
1203 .to_string()
1204 .contains("Unexpected impure default value with region_id"));
1205 }
1206
1207 #[test]
1208 fn test_fill_missing_columns() {
1209 let rows = Rows {
1210 schema: vec![new_column_schema(
1211 "ts",
1212 ColumnDataType::TimestampMillisecond,
1213 SemanticType::Timestamp,
1214 )],
1215 rows: vec![Row {
1216 values: vec![ts_ms_value(1)],
1217 }],
1218 };
1219 let metadata = new_region_metadata();
1220
1221 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1222 let err = request.check_schema(&metadata).unwrap_err();
1223 assert!(err.is_fill_default());
1224 request.fill_missing_columns(&metadata).unwrap();
1225
1226 let expect_rows = Rows {
1227 schema: vec![new_column_schema(
1228 "ts",
1229 ColumnDataType::TimestampMillisecond,
1230 SemanticType::Timestamp,
1231 )],
1232 rows: vec![Row {
1233 values: vec![ts_ms_value(1)],
1234 }],
1235 };
1236 assert_eq!(expect_rows, request.rows);
1237 }
1238
1239 fn builder_with_ts_tag() -> RegionMetadataBuilder {
1240 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1241 builder
1242 .push_column_metadata(ColumnMetadata {
1243 column_schema: datatypes::schema::ColumnSchema::new(
1244 "ts",
1245 ConcreteDataType::timestamp_millisecond_datatype(),
1246 false,
1247 ),
1248 semantic_type: SemanticType::Timestamp,
1249 column_id: 1,
1250 })
1251 .push_column_metadata(ColumnMetadata {
1252 column_schema: datatypes::schema::ColumnSchema::new(
1253 "k0",
1254 ConcreteDataType::int64_datatype(),
1255 true,
1256 ),
1257 semantic_type: SemanticType::Tag,
1258 column_id: 2,
1259 })
1260 .primary_key(vec![2]);
1261 builder
1262 }
1263
1264 fn region_metadata_two_fields() -> RegionMetadata {
1265 let mut builder = builder_with_ts_tag();
1266 builder
1267 .push_column_metadata(ColumnMetadata {
1268 column_schema: datatypes::schema::ColumnSchema::new(
1269 "f0",
1270 ConcreteDataType::int64_datatype(),
1271 true,
1272 ),
1273 semantic_type: SemanticType::Field,
1274 column_id: 3,
1275 })
1276 .push_column_metadata(ColumnMetadata {
1278 column_schema: datatypes::schema::ColumnSchema::new(
1279 "f1",
1280 ConcreteDataType::int64_datatype(),
1281 false,
1282 )
1283 .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1284 datatypes::value::Value::Int64(100),
1285 )))
1286 .unwrap(),
1287 semantic_type: SemanticType::Field,
1288 column_id: 4,
1289 });
1290 builder.build().unwrap()
1291 }
1292
#[test]
fn test_fill_missing_for_delete() {
    let metadata = region_metadata_two_fields();
    let ts_schema = || {
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        )
    };
    let k0_schema = || new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);

    // A delete without the tag column `k0` is rejected by both the schema
    // check and the filler.
    let without_tag = Rows {
        schema: vec![ts_schema()],
        rows: vec![Row {
            values: vec![ts_ms_value(1)],
        }],
    };
    let mut request =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, without_tag, None).unwrap();
    let check_err = request.check_schema(&metadata).unwrap_err();
    check_invalid_request(&check_err, "delete requests need column k0");
    let fill_err = request.fill_missing_columns(&metadata).unwrap_err();
    check_invalid_request(&fill_err, "delete requests need column k0");

    // With `k0` present the request can be padded. The non-nullable `f1` is
    // appended with `0` (not its declared default of 100), and the nullable
    // `f0` is not added at all.
    let with_tag = Rows {
        schema: vec![k0_schema(), ts_schema()],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1)],
        }],
    };
    let mut request =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, with_tag, None).unwrap();
    assert!(request.check_schema(&metadata).unwrap_err().is_fill_default());
    request.fill_missing_columns(&metadata).unwrap();

    let expected = Rows {
        schema: vec![
            k0_schema(),
            ts_schema(),
            new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(expected, request.rows);
}
1350
#[test]
fn test_fill_missing_without_default_in_delete() {
    // Same columns as `region_metadata_two_fields`, except that the
    // non-nullable field `f1` has no default constraint.
    let mut builder = builder_with_ts_tag();
    let nullable_f0 = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f0",
            ConcreteDataType::int64_datatype(),
            true,
        ),
        semantic_type: SemanticType::Field,
        column_id: 3,
    };
    let required_f1 = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f1",
            ConcreteDataType::int64_datatype(),
            false,
        ),
        semantic_type: SemanticType::Field,
        column_id: 4,
    };
    builder.push_column_metadata(nullable_f0);
    builder.push_column_metadata(required_f1);
    let metadata = builder.build().unwrap();

    let k0_schema = || new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let ts_schema = || {
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        )
    };

    let input = Rows {
        schema: vec![k0_schema(), ts_schema()],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1)],
        }],
    };
    let mut request =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, input, None).unwrap();
    let schema_err = request.check_schema(&metadata).unwrap_err();
    assert!(schema_err.is_fill_default());
    // A delete is still padded even though `f1` has no default: it gets `0`,
    // and the nullable `f0` is not added.
    request.fill_missing_columns(&metadata).unwrap();

    let expected = Rows {
        schema: vec![
            k0_schema(),
            ts_schema(),
            new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(expected, request.rows);
}
1414
#[test]
fn test_no_default() {
    // Only the tag `k0` is supplied. Filling must fail because the missing
    // `ts` column has no default value.
    let metadata = new_region_metadata();
    let tag_only = Rows {
        schema: vec![new_column_schema(
            "k0",
            ColumnDataType::Int64,
            SemanticType::Tag,
        )],
        rows: vec![Row {
            values: vec![i64_value(1)],
        }],
    };

    let fill_err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, tag_only, None)
        .unwrap()
        .fill_missing_columns(&metadata)
        .unwrap_err();
    check_invalid_request(&fill_err, "column ts does not have default value");
}
1433
#[test]
fn test_missing_and_invalid() {
    // `f0` is absent and `f1` is sent as a string although the region declares
    // it as int64. The schema check must report the type mismatch on `f1`.
    let metadata = region_metadata_two_fields();
    let bad_f1 = Value {
        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
    };
    let rows = Rows {
        schema: vec![
            new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
            new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), bad_f1],
        }],
    };

    let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let schema_err = request.check_schema(&metadata).unwrap_err();
    check_invalid_request(
        &schema_err,
        "column f1 expect type Int64(Int64Type), given: STRING(12)",
    );
}
1466
#[test]
fn test_write_request_metadata() {
    // This test only checks that the metadata handed to `WriteRequest::new`
    // ends up attached to the request; the row contents are incidental.
    let rows = Rows {
        schema: vec![
            new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
        ],
        rows: vec![Row {
            values: vec![i64_value(1), i64_value(2)],
        }],
    };

    let metadata = Arc::new(new_region_metadata());
    let request = WriteRequest::new(
        RegionId::new(1, 1),
        OpType::Put,
        rows,
        Some(Arc::clone(&metadata)),
    )
    .unwrap();

    // `expect` replaces the `is_some()` + `unwrap()` pair and states the invariant.
    let attached = request
        .region_metadata
        .expect("WriteRequest::new should keep the provided region metadata");
    // The request must hold the very same shared metadata, not a copy.
    assert!(Arc::ptr_eq(&attached, &metadata));
    assert_eq!(attached.region_id, RegionId::new(1, 1));
}
1494}