// metric_engine/engine/put.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
use std::collections::{HashMap, HashSet};

use api::v1::{
    ColumnSchema, PrimaryKeyEncoding as PrimaryKeyEncodingProto, Row, Rows, Value, WriteHint,
};
use common_telemetry::{error, info};
use fxhash::FxHashMap;
use snafu::{OptionExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::region_request::{
    AffectedRows, RegionDeleteRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::{RegionId, TableId};

use crate::engine::MetricEngineInner;
use crate::error::{
    ColumnNotFoundSnafu, ForbiddenPhysicalAlterSnafu, InvalidRequestSnafu,
    LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu,
};
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_OPERATION_ELAPSED};
use crate::row_modifier::{RowsIter, TableIdInput};
use crate::utils::to_data_region_id;
37
38impl MetricEngineInner {
39    /// Dispatch region put request
40    pub async fn put_region(
41        &self,
42        region_id: RegionId,
43        request: RegionPutRequest,
44    ) -> Result<AffectedRows> {
45        let is_putting_physical_region =
46            self.state.read().unwrap().exist_physical_region(region_id);
47
48        if is_putting_physical_region {
49            info!(
50                "Metric region received put request {request:?} on physical region {region_id:?}"
51            );
52            FORBIDDEN_OPERATION_COUNT.inc();
53
54            ForbiddenPhysicalAlterSnafu.fail()
55        } else {
56            self.put_logical_region(region_id, request).await
57        }
58    }
59
60    /// Batch write multiple logical regions to the same physical region.
61    ///
62    /// Dispatch region put requests in batch.
63    ///
64    /// Requests may span multiple physical regions. We group them by physical
65    /// region and write sequentially. This method fails fast on validation or
66    /// preparation errors within a group and stops at the first failure.
67    /// Writes in earlier physical-region groups are not rolled back if a later
68    /// group fails.
69    pub async fn put_regions_batch(
70        &self,
71        requests: impl ExactSizeIterator<Item = (RegionId, RegionPutRequest)>,
72    ) -> Result<AffectedRows> {
73        let len = requests.len();
74
75        if len == 0 {
76            return Ok(0);
77        }
78
79        let _timer = MITO_OPERATION_ELAPSED
80            .with_label_values(&["put_batch"])
81            .start_timer();
82
83        // Fast path: single request, no batching overhead
84        if len == 1 {
85            let (logical_id, req) = requests.into_iter().next().unwrap();
86            return self.put_logical_region(logical_id, req).await;
87        }
88
89        let mut requests_per_physical: HashMap<RegionId, Vec<(RegionId, RegionPutRequest)>> =
90            HashMap::new();
91        for (logical_region_id, request) in requests {
92            let physical_region_id = self.find_physical_region_id(logical_region_id)?;
93            requests_per_physical
94                .entry(physical_region_id)
95                .or_default()
96                .push((logical_region_id, request));
97        }
98
99        let mut total_affected_rows: AffectedRows = 0;
100        for (physical_region_id, requests) in requests_per_physical {
101            let affected_rows = self
102                .put_regions_batch_single_physical(physical_region_id, requests)
103                .await?;
104            total_affected_rows += affected_rows;
105        }
106
107        Ok(total_affected_rows)
108    }
109
110    /// Write a batch of requests that all belong to the same physical region.
111    ///
112    /// This function orchestrates the batch write process:
113    /// 1. Validates all requests
114    /// 2. Merges requests according to the encoding strategy (sparse or dense)
115    /// 3. Writes the merged batch to the physical region
116    async fn put_regions_batch_single_physical(
117        &self,
118        physical_region_id: RegionId,
119        requests: Vec<(RegionId, RegionPutRequest)>,
120    ) -> Result<AffectedRows> {
121        if requests.is_empty() {
122            return Ok(0);
123        }
124
125        let data_region_id = to_data_region_id(physical_region_id);
126        let primary_key_encoding = self.get_primary_key_encoding(data_region_id)?;
127
128        // Validate all requests
129        self.validate_batch_requests(physical_region_id, &requests)
130            .await?;
131
132        // Merge requests according to encoding strategy
133        let (merged_request, total_affected_rows) = match primary_key_encoding {
134            PrimaryKeyEncoding::Sparse => self.merge_sparse_batch(physical_region_id, requests)?,
135            PrimaryKeyEncoding::Dense => self.merge_dense_batch(data_region_id, requests)?,
136        };
137
138        // Write once to the physical region
139        self.data_region
140            .write_data(data_region_id, RegionRequest::Put(merged_request))
141            .await?;
142
143        Ok(total_affected_rows)
144    }
145
146    /// Get primary key encoding for a data region.
147    fn get_primary_key_encoding(&self, data_region_id: RegionId) -> Result<PrimaryKeyEncoding> {
148        let state = self.state.read().unwrap();
149        state
150            .get_primary_key_encoding(data_region_id)
151            .context(PhysicalRegionNotFoundSnafu {
152                region_id: data_region_id,
153            })
154    }
155
156    /// Validates all requests in a batch.
157    async fn validate_batch_requests(
158        &self,
159        physical_region_id: RegionId,
160        requests: &[(RegionId, RegionPutRequest)],
161    ) -> Result<()> {
162        for (logical_region_id, request) in requests {
163            self.verify_rows(*logical_region_id, physical_region_id, &request.rows)
164                .await?;
165        }
166        Ok(())
167    }
168
169    /// Merges multiple requests using sparse primary key encoding.
170    fn merge_sparse_batch(
171        &self,
172        physical_region_id: RegionId,
173        requests: Vec<(RegionId, RegionPutRequest)>,
174    ) -> Result<(RegionPutRequest, AffectedRows)> {
175        let total_rows: usize = requests.iter().map(|(_, req)| req.rows.rows.len()).sum();
176        let mut merged_rows = Vec::with_capacity(total_rows);
177        let mut total_affected_rows: AffectedRows = 0;
178        let mut output_schema: Option<Vec<ColumnSchema>> = None;
179        let mut merged_version: Option<u64> = None;
180
181        // Modify and collect rows from each request
182        for (logical_region_id, mut request) in requests {
183            if let Some(request_version) = request.partition_rule_version {
184                if let Some(merged_version) = merged_version {
185                    ensure!(
186                        merged_version == request_version,
187                        InvalidRequestSnafu {
188                            region_id: physical_region_id,
189                            reason: "inconsistent partition rule version in batch"
190                        }
191                    );
192                } else {
193                    merged_version = Some(request_version);
194                }
195            }
196            self.modify_rows(
197                physical_region_id,
198                logical_region_id.table_id(),
199                &mut request.rows,
200                PrimaryKeyEncoding::Sparse,
201            )?;
202
203            let row_count = request.rows.rows.len();
204            total_affected_rows += row_count as AffectedRows;
205
206            // Capture the output schema from the first modified request
207            if output_schema.is_none() {
208                output_schema = Some(request.rows.schema.clone());
209            }
210
211            merged_rows.extend(request.rows.rows);
212        }
213
214        // Safe to unwrap: requests is guaranteed non-empty by caller
215        let schema = output_schema.unwrap();
216
217        let merged_request = RegionPutRequest {
218            rows: Rows {
219                schema,
220                rows: merged_rows,
221            },
222            hint: Some(WriteHint {
223                primary_key_encoding: PrimaryKeyEncodingProto::Sparse.into(),
224            }),
225            partition_rule_version: merged_version,
226        };
227
228        Ok((merged_request, total_affected_rows))
229    }
230
231    /// Merges multiple requests using dense primary key encoding.
232    ///
233    /// In dense mode, different requests can have different columns.
234    /// We merge all schemas into a union schema, align each row to this schema,
235    /// then batch-modify all rows together (adding __table_id and __tsid).
236    fn merge_dense_batch(
237        &self,
238        data_region_id: RegionId,
239        requests: Vec<(RegionId, RegionPutRequest)>,
240    ) -> Result<(RegionPutRequest, AffectedRows)> {
241        // Build union schema from all requests
242        let merged_schema = Self::build_union_schema(&requests);
243
244        // Align all rows to the merged schema and collect table_ids
245        let (merged_rows, table_ids, merged_version) =
246            Self::align_requests_to_schema(requests, &merged_schema)?;
247
248        // Batch-modify all rows (add __table_id and __tsid columns)
249        let final_rows = {
250            let state = self.state.read().unwrap();
251            let name_to_id = state
252                .physical_region_states()
253                .get(&data_region_id)
254                .with_context(|| PhysicalRegionNotFoundSnafu {
255                    region_id: data_region_id,
256                })?
257                .physical_columns();
258
259            let iter = RowsIter::new(
260                Rows {
261                    schema: merged_schema,
262                    rows: merged_rows,
263                },
264                name_to_id,
265            );
266
267            self.row_modifier.modify_rows(
268                iter,
269                TableIdInput::Batch(&table_ids),
270                PrimaryKeyEncoding::Dense,
271            )?
272        };
273
274        let merged_request = RegionPutRequest {
275            rows: final_rows,
276            hint: None,
277            partition_rule_version: merged_version,
278        };
279
280        Ok((merged_request, table_ids.len() as AffectedRows))
281    }
282
283    /// Builds a union schema containing all columns from all requests.
284    fn build_union_schema(requests: &[(RegionId, RegionPutRequest)]) -> Vec<ColumnSchema> {
285        let mut schema_map: HashMap<&str, ColumnSchema> = HashMap::new();
286        for (_, request) in requests {
287            for col in &request.rows.schema {
288                schema_map
289                    .entry(col.column_name.as_str())
290                    .or_insert_with(|| col.clone());
291            }
292        }
293        schema_map.into_values().collect()
294    }
295
    /// Aligns every request's rows to `merged_schema`.
    ///
    /// Returns the aligned rows, a `TableId` per row (parallel to the rows
    /// vector), and the merged partition rule version. Errors if two requests
    /// carry different partition rule versions.
    fn align_requests_to_schema(
        requests: Vec<(RegionId, RegionPutRequest)>,
        merged_schema: &[ColumnSchema],
    ) -> Result<(Vec<Row>, Vec<TableId>, Option<u64>)> {
        // Pre-calculate total capacity
        let total_rows: usize = requests.iter().map(|(_, req)| req.rows.rows.len()).sum();
        let mut merged_rows = Vec::with_capacity(total_rows);
        let mut table_ids = Vec::with_capacity(total_rows);
        let mut merged_version: Option<u64> = None;

        // Null placeholder for columns absent from a request's schema.
        let null_value = Value { value_data: None };

        for (logical_region_id, request) in requests {
            // Merge the partition rule version: first seen wins, later ones
            // must match (the inner `merged_version` shadows the outer one).
            if let Some(request_version) = request.partition_rule_version {
                if let Some(merged_version) = merged_version {
                    ensure!(
                        merged_version == request_version,
                        InvalidRequestSnafu {
                            region_id: logical_region_id,
                            reason: "inconsistent partition rule version in batch"
                        }
                    );
                } else {
                    merged_version = Some(request_version);
                }
            }
            let table_id = logical_region_id.table_id();

            // Build column name to index mapping once per request
            let col_name_to_idx: FxHashMap<&str, usize> = request
                .rows
                .schema
                .iter()
                .enumerate()
                .map(|(idx, col)| (col.column_name.as_str(), idx))
                .collect();

            // Build column mapping array once per request
            // col_mapping[i] = Some(idx) means merged_schema[i] is at request.schema[idx]
            // col_mapping[i] = None means merged_schema[i] doesn't exist in request.schema
            let col_mapping: Vec<Option<usize>> = merged_schema
                .iter()
                .map(|merged_col| {
                    col_name_to_idx
                        .get(merged_col.column_name.as_str())
                        .copied()
                })
                .collect();

            // Apply the mapping to all rows
            for mut row in request.rows.rows {
                let mut aligned_values = Vec::with_capacity(merged_schema.len());
                for &opt_idx in &col_mapping {
                    aligned_values.push(match opt_idx {
                        // `take` moves the value out, leaving a default in the
                        // old row, which is discarded. NOTE(review): indexing
                        // assumes each row has one value per schema column —
                        // presumably validated upstream; confirm.
                        Some(idx) => std::mem::take(&mut row.values[idx]),
                        None => null_value.clone(),
                    });
                }
                merged_rows.push(Row {
                    values: aligned_values,
                });
                table_ids.push(table_id);
            }
        }

        Ok((merged_rows, table_ids, merged_version))
    }
363
364    /// Find the physical region id for a logical region.
365    fn find_physical_region_id(&self, logical_region_id: RegionId) -> Result<RegionId> {
366        let state = self.state.read().unwrap();
367        state
368            .logical_regions()
369            .get(&logical_region_id)
370            .copied()
371            .context(LogicalRegionNotFoundSnafu {
372                region_id: logical_region_id,
373            })
374    }
375
376    /// Dispatch region delete request
377    pub async fn delete_region(
378        &self,
379        region_id: RegionId,
380        request: RegionDeleteRequest,
381    ) -> Result<AffectedRows> {
382        if self.is_physical_region(region_id) {
383            info!(
384                "Metric region received delete request {request:?} on physical region {region_id:?}"
385            );
386            FORBIDDEN_OPERATION_COUNT.inc();
387
388            UnsupportedRegionRequestSnafu {
389                request: RegionRequest::Delete(request),
390            }
391            .fail()
392        } else {
393            self.delete_logical_region(region_id, request).await
394        }
395    }
396
397    async fn put_logical_region(
398        &self,
399        logical_region_id: RegionId,
400        mut request: RegionPutRequest,
401    ) -> Result<AffectedRows> {
402        let _timer = MITO_OPERATION_ELAPSED
403            .with_label_values(&["put"])
404            .start_timer();
405
406        let (physical_region_id, data_region_id, primary_key_encoding) =
407            self.find_data_region_meta(logical_region_id)?;
408
409        self.verify_rows(logical_region_id, physical_region_id, &request.rows)
410            .await?;
411
412        // write to data region
413        // TODO: retrieve table name
414        self.modify_rows(
415            physical_region_id,
416            logical_region_id.table_id(),
417            &mut request.rows,
418            primary_key_encoding,
419        )?;
420        if primary_key_encoding == PrimaryKeyEncoding::Sparse {
421            request.hint = Some(WriteHint {
422                primary_key_encoding: PrimaryKeyEncodingProto::Sparse.into(),
423            });
424        }
425        self.data_region
426            .write_data(data_region_id, RegionRequest::Put(request))
427            .await
428    }
429
430    async fn delete_logical_region(
431        &self,
432        logical_region_id: RegionId,
433        mut request: RegionDeleteRequest,
434    ) -> Result<AffectedRows> {
435        let _timer = MITO_OPERATION_ELAPSED
436            .with_label_values(&["delete"])
437            .start_timer();
438
439        let (physical_region_id, data_region_id, primary_key_encoding) =
440            self.find_data_region_meta(logical_region_id)?;
441
442        self.verify_rows(logical_region_id, physical_region_id, &request.rows)
443            .await?;
444
445        // write to data region
446        // TODO: retrieve table name
447        self.modify_rows(
448            physical_region_id,
449            logical_region_id.table_id(),
450            &mut request.rows,
451            primary_key_encoding,
452        )?;
453        if primary_key_encoding == PrimaryKeyEncoding::Sparse {
454            request.hint = Some(WriteHint {
455                primary_key_encoding: PrimaryKeyEncodingProto::Sparse.into(),
456            });
457        }
458        self.data_region
459            .write_data(data_region_id, RegionRequest::Delete(request))
460            .await
461    }
462
463    fn find_data_region_meta(
464        &self,
465        logical_region_id: RegionId,
466    ) -> Result<(RegionId, RegionId, PrimaryKeyEncoding)> {
467        let state = self.state.read().unwrap();
468        let physical_region_id = *state
469            .logical_regions()
470            .get(&logical_region_id)
471            .with_context(|| LogicalRegionNotFoundSnafu {
472                region_id: logical_region_id,
473            })?;
474        let data_region_id = to_data_region_id(physical_region_id);
475        let primary_key_encoding = state.get_primary_key_encoding(data_region_id).context(
476            PhysicalRegionNotFoundSnafu {
477                region_id: data_region_id,
478            },
479        )?;
480        Ok((physical_region_id, data_region_id, primary_key_encoding))
481    }
482
483    /// Verifies a request for a logical region against its corresponding metadata region.
484    ///
485    /// Includes:
486    /// - Check if the logical region exists
487    /// - Check if the columns exist
488    async fn verify_rows(
489        &self,
490        logical_region_id: RegionId,
491        physical_region_id: RegionId,
492        rows: &Rows,
493    ) -> Result<()> {
494        // Check if the region exists
495        let data_region_id = to_data_region_id(physical_region_id);
496        let state = self.state.read().unwrap();
497        if !state.is_logical_region_exist(logical_region_id) {
498            error!("Trying to write to an nonexistent region {logical_region_id}");
499            return LogicalRegionNotFoundSnafu {
500                region_id: logical_region_id,
501            }
502            .fail();
503        }
504
505        // Check if a physical column exists
506        let physical_columns = state
507            .physical_region_states()
508            .get(&data_region_id)
509            .context(PhysicalRegionNotFoundSnafu {
510                region_id: data_region_id,
511            })?
512            .physical_columns();
513        for col in &rows.schema {
514            ensure!(
515                physical_columns.contains_key(&col.column_name),
516                ColumnNotFoundSnafu {
517                    name: col.column_name.clone(),
518                    region_id: logical_region_id,
519                }
520            );
521        }
522
523        Ok(())
524    }
525
    /// Perform metric engine specific logic to incoming rows.
    /// - Add table_id column
    /// - Generate tsid
    fn modify_rows(
        &self,
        physical_region_id: RegionId,
        table_id: TableId,
        rows: &mut Rows,
        encoding: PrimaryKeyEncoding,
    ) -> Result<()> {
        // Move the rows out so the iterator can consume them; `rows` is left
        // empty and overwritten with the modified output below.
        let input = std::mem::take(rows);
        let iter = {
            // The state read lock is scoped to this block so it is released
            // before the (potentially heavier) row modification runs.
            let state = self.state.read().unwrap();
            let name_to_id = state
                .physical_region_states()
                .get(&physical_region_id)
                .with_context(|| PhysicalRegionNotFoundSnafu {
                    region_id: physical_region_id,
                })?
                .physical_columns();
            RowsIter::new(input, name_to_id)
        };
        let output =
            self.row_modifier
                .modify_rows(iter, TableIdInput::Single(table_id), encoding)?;
        *rows = output;
        Ok(())
    }
554}
555
556#[cfg(test)]
557mod tests {
558    use std::collections::HashSet;
559
560    use common_error::ext::ErrorExt;
561    use common_error::status_code::StatusCode;
562    use common_function::utils::partition_rule_version;
563    use common_recordbatch::RecordBatches;
564    use datatypes::value::Value as PartitionValue;
565    use partition::expr::col;
566    use store_api::metric_engine_consts::{
567        DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
568        MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING,
569    };
570    use store_api::path_utils::table_dir;
571    use store_api::region_engine::RegionEngine;
572    use store_api::region_request::{EnterStagingRequest, RegionRequest};
573    use store_api::storage::ScanRequest;
574    use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
575
576    use super::*;
577    use crate::test_util::{self, TestEnv};
578
579    fn assert_merged_schema(rows: &Rows, expect_sparse: bool) {
580        let column_names: HashSet<String> = rows
581            .schema
582            .iter()
583            .map(|col| col.column_name.clone())
584            .collect();
585
586        if expect_sparse {
587            assert!(
588                column_names.contains(PRIMARY_KEY_COLUMN_NAME),
589                "sparse encoding should include primary key column"
590            );
591            assert!(
592                !column_names.contains(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
593                "sparse encoding should not include table id column"
594            );
595            assert!(
596                !column_names.contains(DATA_SCHEMA_TSID_COLUMN_NAME),
597                "sparse encoding should not include tsid column"
598            );
599            assert!(
600                !column_names.contains("job"),
601                "sparse encoding should not include tag columns"
602            );
603            assert!(
604                !column_names.contains("instance"),
605                "sparse encoding should not include tag columns"
606            );
607        } else {
608            assert!(
609                !column_names.contains(PRIMARY_KEY_COLUMN_NAME),
610                "dense encoding should not include primary key column"
611            );
612            assert!(
613                column_names.contains(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
614                "dense encoding should include table id column"
615            );
616            assert!(
617                column_names.contains(DATA_SCHEMA_TSID_COLUMN_NAME),
618                "dense encoding should include tsid column"
619            );
620            assert!(
621                column_names.contains("job"),
622                "dense encoding should keep tag columns"
623            );
624            assert!(
625                column_names.contains("instance"),
626                "dense encoding should keep tag columns"
627            );
628        }
629    }
630
631    fn job_partition_expr_json() -> String {
632        let expr = col("job")
633            .gt_eq(PartitionValue::String("job-0".into()))
634            .and(col("job").lt(PartitionValue::String("job-9".into())));
635        expr.as_json_str().unwrap()
636    }
637
638    async fn create_logical_region_with_tags(
639        env: &TestEnv,
640        physical_region_id: RegionId,
641        logical_region_id: RegionId,
642        tags: &[&str],
643    ) {
644        let region_create_request = test_util::create_logical_region_request(
645            tags,
646            physical_region_id,
647            &table_dir("test", logical_region_id.table_id()),
648        );
649        env.metric()
650            .handle_request(
651                logical_region_id,
652                RegionRequest::Create(region_create_request),
653            )
654            .await
655            .unwrap();
656    }
657
    /// Drives a batch write of two logical regions (with different tag sets)
    /// into one physical region and checks the merged request shape plus the
    /// persisted row count.
    ///
    /// `expect_sparse` selects which merge path (`merge_sparse_batch` or
    /// `merge_dense_batch`) is exercised; `options` configures the physical
    /// region so its primary key encoding matches that expectation.
    async fn run_batch_write_with_schema_variants(
        env: &TestEnv,
        physical_region_id: RegionId,
        options: Vec<(String, String)>,
        expect_sparse: bool,
    ) {
        env.create_physical_region(physical_region_id, &TestEnv::default_table_dir(), options)
            .await;

        // Two logical regions with different tag sets (["job"] vs
        // ["job", "instance"]) over the same physical region.
        let logical_region_1 = env.default_logical_region_id();
        let logical_region_2 = RegionId::new(1024, 1);

        create_logical_region_with_tags(env, physical_region_id, logical_region_1, &["job"]).await;
        create_logical_region_with_tags(
            env,
            physical_region_id,
            logical_region_2,
            &["job", "instance"],
        )
        .await;

        let schema_1 = test_util::row_schema_with_tags(&["job"]);
        let schema_2 = test_util::row_schema_with_tags(&["job", "instance"]);

        // Sanity-check that the region's encoding matches the expectation.
        let data_region_id = RegionId::new(physical_region_id.table_id(), 2);
        let primary_key_encoding = env
            .metric()
            .inner
            .get_primary_key_encoding(data_region_id)
            .unwrap();
        assert_eq!(
            primary_key_encoding,
            if expect_sparse {
                PrimaryKeyEncoding::Sparse
            } else {
                PrimaryKeyEncoding::Dense
            }
        );

        // 3 rows for region 1 plus 2 rows for region 2 => 5 rows total.
        let build_requests = || {
            let rows_1 = test_util::build_rows(1, 3);
            let rows_2 = test_util::build_rows(2, 2);

            vec![
                (
                    logical_region_1,
                    RegionPutRequest {
                        rows: Rows {
                            schema: schema_1.clone(),
                            rows: rows_1,
                        },
                        hint: None,
                        partition_rule_version: None,
                    },
                ),
                (
                    logical_region_2,
                    RegionPutRequest {
                        rows: Rows {
                            schema: schema_2.clone(),
                            rows: rows_2,
                        },
                        hint: None,
                        partition_rule_version: None,
                    },
                ),
            ]
        };

        // Exercise the encoding-specific merge path directly and check the
        // write hint it produces.
        let merged_request = if expect_sparse {
            let (merged_request, _) = env
                .metric()
                .inner
                .merge_sparse_batch(physical_region_id, build_requests())
                .unwrap();
            let hint = merged_request
                .hint
                .as_ref()
                .expect("missing sparse write hint");
            assert_eq!(
                hint.primary_key_encoding,
                PrimaryKeyEncodingProto::Sparse as i32
            );
            merged_request
        } else {
            let (merged_request, _) = env
                .metric()
                .inner
                .merge_dense_batch(data_region_id, build_requests())
                .unwrap();
            assert!(merged_request.hint.is_none());
            merged_request
        };

        assert_merged_schema(&merged_request.rows, expect_sparse);

        // End-to-end: the public batch entry point reports all 5 rows written.
        let affected_rows = env
            .metric()
            .inner
            .put_regions_batch(build_requests().into_iter())
            .await
            .unwrap();
        assert_eq!(affected_rows, 5);

        // The underlying mito region must contain exactly the 5 rows.
        let request = ScanRequest::default();
        let stream = env
            .mito()
            .scan_to_stream(data_region_id, request)
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();

        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 5);
    }
772
    /// Writes rows through a logical region and checks what both the physical
    /// and logical scans return.
    #[tokio::test]
    async fn test_write_logical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        // prepare data
        let schema = test_util::row_schema_with_tags(&["job"]);
        let rows = test_util::build_rows(1, 5);
        let request = RegionRequest::Put(RegionPutRequest {
            rows: Rows { schema, rows },
            hint: None,
            partition_rule_version: None,
        });

        // write data
        let logical_region_id = env.default_logical_region_id();
        let result = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(result.affected_rows, 5);

        // read data from physical region
        // The physical view exposes the internal __table_id and __tsid columns.
        let physical_region_id = env.default_physical_region_id();
        let request = ScanRequest::default();
        let stream = env
            .metric()
            .scan_to_stream(physical_region_id, request)
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        let expected = "\
+-------------------------+----------------+------------+---------------------+-------+
| greptime_timestamp      | greptime_value | __table_id | __tsid              | job   |
+-------------------------+----------------+------------+---------------------+-------+
| 1970-01-01T00:00:00     | 0.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.001 | 1.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.002 | 2.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.003 | 3.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.004 | 4.0            | 3          | 2955007454552897459 | tag_0 |
+-------------------------+----------------+------------+---------------------+-------+";
        assert_eq!(expected, batches.pretty_print().unwrap(), "physical region");

        // read data from logical region
        // The logical view hides the internal columns.
        let request = ScanRequest::default();
        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, request)
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        let expected = "\
+-------------------------+----------------+-------+
| greptime_timestamp      | greptime_value | job   |
+-------------------------+----------------+-------+
| 1970-01-01T00:00:00     | 0.0            | tag_0 |
| 1970-01-01T00:00:00.001 | 1.0            | tag_0 |
| 1970-01-01T00:00:00.002 | 2.0            | tag_0 |
| 1970-01-01T00:00:00.003 | 3.0            | tag_0 |
| 1970-01-01T00:00:00.004 | 4.0            | tag_0 |
+-------------------------+----------------+-------+";
        assert_eq!(expected, batches.pretty_print().unwrap(), "logical region");
    }
837
838    #[tokio::test]
839    async fn test_write_logical_region_row_count() {
840        let env = TestEnv::new().await;
841        env.init_metric_region().await;
842        let engine = env.metric();
843
844        // add columns
845        let logical_region_id = env.default_logical_region_id();
846        let columns = &["odd", "even", "Ev_En"];
847        let alter_request = test_util::alter_logical_region_add_tag_columns(123456, columns);
848        engine
849            .handle_request(logical_region_id, RegionRequest::Alter(alter_request))
850            .await
851            .unwrap();
852
853        // prepare data
854        let schema = test_util::row_schema_with_tags(columns);
855        let rows = test_util::build_rows(3, 100);
856        let request = RegionRequest::Put(RegionPutRequest {
857            rows: Rows { schema, rows },
858            hint: None,
859            partition_rule_version: None,
860        });
861
862        // write data
863        let result = engine
864            .handle_request(logical_region_id, request)
865            .await
866            .unwrap();
867        assert_eq!(100, result.affected_rows);
868    }
869
870    #[tokio::test]
871    async fn test_write_physical_region() {
872        let env = TestEnv::new().await;
873        env.init_metric_region().await;
874        let engine = env.metric();
875
876        let physical_region_id = env.default_physical_region_id();
877        let schema = test_util::row_schema_with_tags(&["abc"]);
878        let rows = test_util::build_rows(1, 100);
879        let request = RegionRequest::Put(RegionPutRequest {
880            rows: Rows { schema, rows },
881            hint: None,
882            partition_rule_version: None,
883        });
884
885        engine
886            .handle_request(physical_region_id, request)
887            .await
888            .unwrap_err();
889    }
890
891    #[tokio::test]
892    async fn test_write_nonexist_logical_region() {
893        let env = TestEnv::new().await;
894        env.init_metric_region().await;
895        let engine = env.metric();
896
897        let logical_region_id = RegionId::new(175, 8345);
898        let schema = test_util::row_schema_with_tags(&["def"]);
899        let rows = test_util::build_rows(1, 100);
900        let request = RegionRequest::Put(RegionPutRequest {
901            rows: Rows { schema, rows },
902            hint: None,
903            partition_rule_version: None,
904        });
905
906        engine
907            .handle_request(logical_region_id, request)
908            .await
909            .unwrap_err();
910    }
911
912    #[tokio::test]
913    async fn test_batch_write_multiple_logical_regions() {
914        let env = TestEnv::new().await;
915        env.init_metric_region().await;
916        let engine = env.metric();
917
918        // Create two additional logical regions
919        let physical_region_id = env.default_physical_region_id();
920        let logical_region_1 = env.default_logical_region_id();
921        let logical_region_2 = RegionId::new(1024, 1);
922        let logical_region_3 = RegionId::new(1024, 2);
923
924        env.create_logical_region(physical_region_id, logical_region_2)
925            .await;
926        env.create_logical_region(physical_region_id, logical_region_3)
927            .await;
928
929        // Prepare batch requests with non-overlapping timestamps
930        let schema = test_util::row_schema_with_tags(&["job"]);
931
932        // Use build_rows_with_ts to create non-overlapping timestamps
933        // logical_region_1: ts 0, 1, 2
934        // logical_region_2: ts 10, 11  (offset to avoid overlap)
935        // logical_region_3: ts 20, 21, 22, 23, 24  (offset to avoid overlap)
936        let rows1 = test_util::build_rows(1, 3);
937        let mut rows2 = test_util::build_rows(1, 2);
938        let mut rows3 = test_util::build_rows(1, 5);
939
940        // Adjust timestamps to avoid conflicts
941        use api::v1::value::ValueData;
942        for (i, row) in rows2.iter_mut().enumerate() {
943            if let Some(ValueData::TimestampMillisecondValue(ts)) =
944                row.values.get_mut(0).and_then(|v| v.value_data.as_mut())
945            {
946                *ts = (10 + i) as i64;
947            }
948        }
949        for (i, row) in rows3.iter_mut().enumerate() {
950            if let Some(ValueData::TimestampMillisecondValue(ts)) =
951                row.values.get_mut(0).and_then(|v| v.value_data.as_mut())
952            {
953                *ts = (20 + i) as i64;
954            }
955        }
956
957        let requests = vec![
958            (
959                logical_region_1,
960                RegionPutRequest {
961                    rows: Rows {
962                        schema: schema.clone(),
963                        rows: rows1,
964                    },
965                    hint: None,
966                    partition_rule_version: None,
967                },
968            ),
969            (
970                logical_region_2,
971                RegionPutRequest {
972                    rows: Rows {
973                        schema: schema.clone(),
974                        rows: rows2,
975                    },
976                    hint: None,
977                    partition_rule_version: None,
978                },
979            ),
980            (
981                logical_region_3,
982                RegionPutRequest {
983                    rows: Rows {
984                        schema: schema.clone(),
985                        rows: rows3,
986                    },
987                    hint: None,
988                    partition_rule_version: None,
989                },
990            ),
991        ];
992
993        // Batch write
994        let affected_rows = engine
995            .inner
996            .put_regions_batch(requests.into_iter())
997            .await
998            .unwrap();
999        assert_eq!(affected_rows, 10);
1000
1001        // Verify physical region contains data from all logical regions
1002        let request = ScanRequest::default();
1003        let stream = env
1004            .metric()
1005            .scan_to_stream(physical_region_id, request)
1006            .await
1007            .unwrap();
1008        let batches = RecordBatches::try_collect(stream).await.unwrap();
1009
1010        // Should have 3 + 2 + 5 = 10 rows total
1011        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 10);
1012    }
1013
1014    #[tokio::test]
1015    async fn test_batch_write_with_partial_failure() {
1016        let env = TestEnv::new().await;
1017        env.init_metric_region().await;
1018        let engine = env.metric();
1019
1020        let physical_region_id = env.default_physical_region_id();
1021        let logical_region_1 = env.default_logical_region_id();
1022        let logical_region_2 = RegionId::new(1024, 1);
1023        let nonexistent_region = RegionId::new(9999, 9999);
1024
1025        env.create_logical_region(physical_region_id, logical_region_2)
1026            .await;
1027
1028        // Prepare batch with one invalid region
1029        let schema = test_util::row_schema_with_tags(&["job"]);
1030        let requests = vec![
1031            (
1032                logical_region_1,
1033                RegionPutRequest {
1034                    rows: Rows {
1035                        schema: schema.clone(),
1036                        rows: test_util::build_rows(1, 3),
1037                    },
1038                    hint: None,
1039                    partition_rule_version: None,
1040                },
1041            ),
1042            (
1043                nonexistent_region,
1044                RegionPutRequest {
1045                    rows: Rows {
1046                        schema: schema.clone(),
1047                        rows: test_util::build_rows(1, 2),
1048                    },
1049                    hint: None,
1050                    partition_rule_version: None,
1051                },
1052            ),
1053            (
1054                logical_region_2,
1055                RegionPutRequest {
1056                    rows: Rows {
1057                        schema: schema.clone(),
1058                        rows: test_util::build_rows(1, 5),
1059                    },
1060                    hint: None,
1061                    partition_rule_version: None,
1062                },
1063            ),
1064        ];
1065
1066        // Batch write
1067        let result = engine.inner.put_regions_batch(requests.into_iter()).await;
1068        assert!(result.is_err());
1069
1070        // Invalid region is detected before any write, so the physical region remains empty.
1071        // Fail-fast is per physical-region group; cross-group partial success is possible.
1072        let request = ScanRequest::default();
1073        let stream = env
1074            .metric()
1075            .scan_to_stream(physical_region_id, request)
1076            .await
1077            .unwrap();
1078        let batches = RecordBatches::try_collect(stream).await.unwrap();
1079
1080        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 0);
1081    }
1082
1083    #[tokio::test]
1084    async fn test_batch_write_single_request_fast_path() {
1085        let env = TestEnv::new().await;
1086        env.init_metric_region().await;
1087        let engine = env.metric();
1088
1089        let logical_region_id = env.default_logical_region_id();
1090        let schema = test_util::row_schema_with_tags(&["job"]);
1091
1092        // Single request should use fast path
1093        let requests = vec![(
1094            logical_region_id,
1095            RegionPutRequest {
1096                rows: Rows {
1097                    schema,
1098                    rows: test_util::build_rows(1, 5),
1099                },
1100                hint: None,
1101                partition_rule_version: None,
1102            },
1103        )];
1104
1105        let affected_rows = engine
1106            .inner
1107            .put_regions_batch(requests.into_iter())
1108            .await
1109            .unwrap();
1110        assert_eq!(affected_rows, 5);
1111    }
1112
1113    #[tokio::test]
1114    async fn test_batch_write_empty_requests() {
1115        let env = TestEnv::new().await;
1116        env.init_metric_region().await;
1117        let engine = env.metric();
1118
1119        // Empty batch should return zero affected rows
1120        let requests = vec![];
1121        let affected_rows = engine
1122            .inner
1123            .put_regions_batch(requests.into_iter())
1124            .await
1125            .unwrap();
1126
1127        assert_eq!(affected_rows, 0);
1128    }
1129
1130    #[tokio::test]
1131    async fn test_batch_write_sparse_encoding() {
1132        let env = TestEnv::new().await;
1133        let physical_region_id = env.default_physical_region_id();
1134
1135        run_batch_write_with_schema_variants(
1136            &env,
1137            physical_region_id,
1138            vec![(
1139                MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
1140                "sparse".to_string(),
1141            )],
1142            true,
1143        )
1144        .await;
1145    }
1146
1147    #[tokio::test]
1148    async fn test_batch_write_dense_encoding() {
1149        let env = TestEnv::new().await;
1150        let physical_region_id = env.default_physical_region_id();
1151
1152        run_batch_write_with_schema_variants(
1153            &env,
1154            physical_region_id,
1155            vec![(
1156                MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
1157                "dense".to_string(),
1158            )],
1159            false,
1160        )
1161        .await;
1162    }
1163
1164    #[tokio::test]
1165    async fn test_metric_put_rejects_bad_partition_rule_version() {
1166        let env = TestEnv::new().await;
1167        env.init_metric_region().await;
1168
1169        let logical_region_id = env.default_logical_region_id();
1170        let rows = Rows {
1171            schema: test_util::row_schema_with_tags(&["job"]),
1172            rows: test_util::build_rows(1, 3),
1173        };
1174
1175        let err = env
1176            .metric()
1177            .handle_request(
1178                logical_region_id,
1179                RegionRequest::Put(RegionPutRequest {
1180                    rows,
1181                    hint: None,
1182                    partition_rule_version: Some(1),
1183                }),
1184            )
1185            .await
1186            .unwrap_err();
1187
1188        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
1189    }
1190
1191    #[tokio::test]
1192    async fn test_metric_put_respects_staging_partition_rule_version() {
1193        let env = TestEnv::new().await;
1194        env.init_metric_region().await;
1195
1196        let logical_region_id = env.default_logical_region_id();
1197        let physical_region_id = env.default_physical_region_id();
1198        let partition_expr = job_partition_expr_json();
1199        env.metric()
1200            .handle_request(
1201                physical_region_id,
1202                RegionRequest::EnterStaging(EnterStagingRequest {
1203                    partition_expr: partition_expr.clone(),
1204                }),
1205            )
1206            .await
1207            .unwrap();
1208
1209        let expected_version = partition_rule_version(Some(&partition_expr));
1210        let rows = Rows {
1211            schema: test_util::row_schema_with_tags(&["job"]),
1212            rows: test_util::build_rows(1, 3),
1213        };
1214
1215        let err = env
1216            .metric()
1217            .handle_request(
1218                logical_region_id,
1219                RegionRequest::Put(RegionPutRequest {
1220                    rows: rows.clone(),
1221                    hint: None,
1222                    partition_rule_version: Some(expected_version.wrapping_add(1)),
1223                }),
1224            )
1225            .await
1226            .unwrap_err();
1227        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
1228
1229        let response = env
1230            .metric()
1231            .handle_request(
1232                logical_region_id,
1233                RegionRequest::Put(RegionPutRequest {
1234                    rows: rows.clone(),
1235                    hint: None,
1236                    partition_rule_version: None,
1237                }),
1238            )
1239            .await
1240            .unwrap();
1241        assert_eq!(response.affected_rows, 3);
1242
1243        let response = env
1244            .metric()
1245            .handle_request(
1246                logical_region_id,
1247                RegionRequest::Put(RegionPutRequest {
1248                    rows,
1249                    hint: None,
1250                    partition_rule_version: Some(expected_version),
1251                }),
1252            )
1253            .await
1254            .unwrap();
1255        assert_eq!(response.affected_rows, 3);
1256    }
1257}