common_grpc/flight/
do_put.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use arrow_flight::PutResult;
16use common_base::AffectedRows;
17use serde::{Deserialize, Serialize};
18use snafu::ResultExt;
19
20use crate::error::{Error, SerdeJsonSnafu};
21
22/// The metadata for "DoPut" requests and responses.
23///
24/// Currently, there's only a "request_id", for coordinating requests and responses in the streams.
25/// Client can set a unique request id in this metadata, and the server will return the same id in
26/// the corresponding response. In doing so, a client can know how to do with its pending requests.
27#[derive(Serialize, Deserialize)]
28pub struct DoPutMetadata {
29    request_id: i64,
30}
31
32impl DoPutMetadata {
33    pub fn new(request_id: i64) -> Self {
34        Self { request_id }
35    }
36
37    pub fn request_id(&self) -> i64 {
38        self.request_id
39    }
40}
41
42/// The response in the "DoPut" returned stream.
43#[derive(Serialize, Deserialize)]
44pub struct DoPutResponse {
45    /// The same "request_id" in the request; see the [DoPutMetadata].
46    request_id: i64,
47    /// The successfully ingested rows number.
48    affected_rows: AffectedRows,
49    /// The elapsed time in seconds for handling the bulk insert.
50    elapsed_secs: f64,
51}
52
53impl DoPutResponse {
54    pub fn new(request_id: i64, affected_rows: AffectedRows, elapsed_secs: f64) -> Self {
55        Self {
56            request_id,
57            affected_rows,
58            elapsed_secs,
59        }
60    }
61
62    pub fn request_id(&self) -> i64 {
63        self.request_id
64    }
65
66    pub fn affected_rows(&self) -> AffectedRows {
67        self.affected_rows
68    }
69
70    pub fn elapsed_secs(&self) -> f64 {
71        self.elapsed_secs
72    }
73}
74
75impl TryFrom<PutResult> for DoPutResponse {
76    type Error = Error;
77
78    fn try_from(value: PutResult) -> Result<Self, Self::Error> {
79        serde_json::from_slice(&value.app_metadata).context(SerdeJsonSnafu)
80    }
81}
82
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializing the JSON form yields metadata that exposes the same request id.
    #[test]
    fn test_serde_do_put_metadata() {
        let json = r#"{"request_id":42}"#;
        let decoded: DoPutMetadata = serde_json::from_str(json).unwrap();
        assert_eq!(decoded.request_id(), 42);
    }

    /// Serializing a response emits its fields in declaration order.
    #[test]
    fn test_serde_do_put_response() {
        let expected = r#"{"request_id":42,"affected_rows":88,"elapsed_secs":0.123}"#;
        let response = DoPutResponse::new(42, 88, 0.123);
        let actual = serde_json::to_string(&response).unwrap();
        assert_eq!(actual, expected);
    }
}