common_grpc/flight/
do_put.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use arrow_flight::PutResult;
16use common_base::AffectedRows;
17use serde::{Deserialize, Serialize};
18use snafu::ResultExt;
19
20use crate::error::{Error, SerdeJsonSnafu};
21
22/// The metadata for "DoPut" requests and responses.
23///
24/// Currently, there's only a "request_id", for coordinating requests and responses in the streams.
25/// Client can set a unique request id in this metadata, and the server will return the same id in
26/// the corresponding response. In doing so, a client can know how to do with its pending requests.
27#[derive(Serialize, Deserialize)]
28pub struct DoPutMetadata {
29    request_id: i64,
30}
31
32impl DoPutMetadata {
33    pub fn new(request_id: i64) -> Self {
34        Self { request_id }
35    }
36
37    pub fn request_id(&self) -> i64 {
38        self.request_id
39    }
40}
41
42/// The response in the "DoPut" returned stream.
43#[derive(Serialize, Deserialize)]
44pub struct DoPutResponse {
45    /// The same "request_id" in the request; see the [DoPutMetadata].
46    request_id: i64,
47    /// The successfully ingested rows number.
48    affected_rows: AffectedRows,
49}
50
51impl DoPutResponse {
52    pub fn new(request_id: i64, affected_rows: AffectedRows) -> Self {
53        Self {
54            request_id,
55            affected_rows,
56        }
57    }
58
59    pub fn request_id(&self) -> i64 {
60        self.request_id
61    }
62
63    pub fn affected_rows(&self) -> AffectedRows {
64        self.affected_rows
65    }
66}
67
68impl TryFrom<PutResult> for DoPutResponse {
69    type Error = Error;
70
71    fn try_from(value: PutResult) -> Result<Self, Self::Error> {
72        serde_json::from_slice(&value.app_metadata).context(SerdeJsonSnafu)
73    }
74}
75
76#[cfg(test)]
77mod tests {
78    use super::*;
79
80    #[test]
81    fn test_serde_do_put_metadata() {
82        let serialized = r#"{"request_id":42}"#;
83        let metadata = serde_json::from_str::<DoPutMetadata>(serialized).unwrap();
84        assert_eq!(metadata.request_id(), 42);
85    }
86
87    #[test]
88    fn test_serde_do_put_response() {
89        let x = DoPutResponse::new(42, 88);
90        let serialized = serde_json::to_string(&x).unwrap();
91        assert_eq!(serialized, r#"{"request_id":42,"affected_rows":88}"#);
92    }
93}