pipeline/etl/processor/
cmcd.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Pipeline Processor for CMCD (Common Media Client Data) data.
16//!
17//! Refer to [`CmcdProcessor`] for more information.
18
19use std::collections::BTreeMap;
20
21use snafu::{OptionExt, ResultExt};
22use urlencoding::decode;
23
24use crate::error::{
25    CmcdMissingKeySnafu, CmcdMissingValueSnafu, Error, FailedToParseFloatKeySnafu,
26    FailedToParseIntKeySnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
27    ProcessorMissingFieldSnafu, Result,
28};
29use crate::etl::field::Fields;
30use crate::etl::processor::{
31    yaml_bool, yaml_new_field, yaml_new_fields, Processor, FIELDS_NAME, FIELD_NAME,
32    IGNORE_MISSING_NAME,
33};
34use crate::etl::value::Value;
35
/// Processor kind name; this is the value returned by `CmcdProcessor::kind`.
pub(crate) const PROCESSOR_CMCD: &str = "cmcd";

// Reserved CMCD key names. The trailing comment on each constant gives the
// meaning of the key and the value type it carries in the payload.
const CMCD_KEY_BR: &str = "br"; // Encoded bitrate, Integer kbps
const CMCD_KEY_BL: &str = "bl"; // Buffer length, Integer milliseconds
const CMCD_KEY_BS: &str = "bs"; // Buffer starvation, Boolean
const CMCD_KEY_CID: &str = "cid"; // Content ID, String
const CMCD_KEY_D: &str = "d"; // Object duration, Integer milliseconds
const CMCD_KEY_DL: &str = "dl"; // Deadline, Integer milliseconds
const CMCD_KEY_MTP: &str = "mtp"; // Measured throughput, Integer kbps
const CMCD_KEY_NOR: &str = "nor"; // Next object request, String
const CMCD_KEY_NRR: &str = "nrr"; // Next request range, String, "<range-start>-<range-end>"
const CMCD_KEY_OT: &str = "ot"; // Object type, Token - one of [m,a,v,av,i,c,tt,k,o]
const CMCD_KEY_PR: &str = "pr"; // Playback rate, Decimal
const CMCD_KEY_RTP: &str = "rtp"; // Requested maximum throughput, Integer kbps
const CMCD_KEY_SF: &str = "sf"; // Stall frequency, Token - one of [d,h,s,o]
const CMCD_KEY_SID: &str = "sid"; // Session ID, String
const CMCD_KEY_ST: &str = "st"; // Stream type, Token - one of [v,l]
const CMCD_KEY_SU: &str = "su"; // Startup, Boolean
const CMCD_KEY_TB: &str = "tb"; // Top bitrate, Integer kbps
const CMCD_KEY_V: &str = "v"; // Version

/// Every reserved key that `CmcdProcessor::parse` resolves. Payload keys that are
/// not listed here (for example custom, prefixed keys) are ignored by the parser.
const CMCD_KEYS: [&str; 18] = [
    CMCD_KEY_BR,
    CMCD_KEY_BL,
    CMCD_KEY_BS,
    CMCD_KEY_CID,
    CMCD_KEY_D,
    CMCD_KEY_DL,
    CMCD_KEY_MTP,
    CMCD_KEY_NOR,
    CMCD_KEY_NRR,
    CMCD_KEY_OT,
    CMCD_KEY_PR,
    CMCD_KEY_RTP,
    CMCD_KEY_SF,
    CMCD_KEY_SID,
    CMCD_KEY_ST,
    CMCD_KEY_SU,
    CMCD_KEY_TB,
    CMCD_KEY_V,
];
77
78/// function to resolve CMCD_KEY_BS | CMCD_KEY_SU
79fn bs_su(_: &str, _: &str, _: Option<&str>) -> Result<Value> {
80    Ok(Value::Boolean(true))
81}
82
83/// function to resolve CMCD_KEY_BR | CMCD_KEY_BL | CMCD_KEY_D | CMCD_KEY_DL | CMCD_KEY_MTP | CMCD_KEY_RTP | CMCD_KEY_TB
84fn br_tb(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
85    let v = v.context(CmcdMissingValueSnafu { k, s })?;
86    let val: i64 = v
87        .parse()
88        .context(FailedToParseIntKeySnafu { key: k, value: v })?;
89    Ok(Value::Int64(val))
90}
91
92/// function to resolve CMCD_KEY_CID | CMCD_KEY_NRR | CMCD_KEY_OT | CMCD_KEY_SF | CMCD_KEY_SID | CMCD_KEY_V
93fn cid_v(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
94    let v = v.context(CmcdMissingValueSnafu { k, s })?;
95    Ok(Value::String(v.to_string()))
96}
97
98/// function to resolve CMCD_KEY_NOR
99fn nor(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
100    let v = v.context(CmcdMissingValueSnafu { k, s })?;
101    let val = match decode(v) {
102        Ok(val) => val.to_string(),
103        Err(_) => v.to_string(),
104    };
105    Ok(Value::String(val))
106}
107
108/// function to resolve CMCD_KEY_PR
109fn pr(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
110    let v = v.context(CmcdMissingValueSnafu { k, s })?;
111    let val: f64 = v
112        .parse()
113        .context(FailedToParseFloatKeySnafu { key: k, value: v })?;
114    Ok(Value::Float64(val))
115}
116
/// Pipeline processor that parses a CMCD payload string held in each configured
/// field and adds one `<target>_<key>` entry per recognised reserved key.
///
/// Common Media Client Data Specification:
/// <https://cdn.cta.tech/cta/media/media/resources/standards/pdfs/cta-5004-final.pdf>
///
///
/// The data payload for Header and Query Argument transmission consists of a series of
/// key/value pairs constructed according to the following rules:
/// 1. All information in the payload MUST be represented as <key>=<value> pairs.
/// 2. The key and value MUST be separated by an equals sign Unicode 0x3D. If the
///    value type is BOOLEAN and the value is TRUE, then the equals sign and the value
///    MUST be omitted.
/// 3. Successive key/value pairs MUST be delimited by a comma Unicode 0x2C.
/// 4. The key names described in this specification are reserved. Custom key names
///    may be used, but they MUST carry a hyphenated prefix to ensure that there will
///    not be a namespace collision with future revisions to this specification. Clients
///    SHOULD use a reverse-DNS syntax when defining their own prefix.
/// 5. If headers are used for data transmission, then custom keys SHOULD be
///    allocated to one of the four defined header names based upon their expected
///    level of variability:
///       a. CMCD-Request: keys whose values vary with each request.
///       b. CMCD-Object: keys whose values vary with the object being requested.
///       c. CMCD-Status: keys whose values do not vary with every request or object.
///       d. CMCD-Session: keys whose values are expected to be invariant over the life of the session.
/// 6. All key names are case-sensitive.
/// 7. Any value of type String MUST be enclosed by opening and closing double
///    quotes Unicode 0x22. Double quotes and backslashes MUST be escaped using a
///    backslash "\" Unicode 0x5C character. Any value of type Token does not require
///    quoting.
/// 8. All keys are OPTIONAL.
/// 9. Key-value pairs SHOULD be sequenced in alphabetical order of the key name in
///    order to reduce the fingerprinting surface exposed by the player.
/// 10. If the data payload is transmitted as a query argument, then the entire payload
///     string MUST be URLEncoded per [5]. Data payloads transmitted via headers
///     MUST NOT be URLEncoded.
/// 11. The data payload syntax is intended to be compliant with Structured Field Values for HTTP [6].
/// 12. Transport Layer Security SHOULD be used to protect all transmission of CMCD data.
#[derive(Debug, Default)]
pub struct CmcdProcessor {
    // Fields whose string value is parsed as a CMCD payload.
    fields: Fields,
    // When true, a field that is absent or null is skipped instead of raising an error.
    ignore_missing: bool,
}
157
158impl CmcdProcessor {
159    fn generate_key(prefix: &str, key: &str) -> String {
160        format!("{}_{}", prefix, key)
161    }
162
163    fn parse(&self, name: &str, value: &str) -> Result<BTreeMap<String, Value>> {
164        let mut working_set = BTreeMap::new();
165
166        let parts = value.split(',');
167
168        for part in parts {
169            let mut kv = part.split('=');
170            let k = kv.next().context(CmcdMissingKeySnafu { part, s: value })?;
171            let v = kv.next();
172
173            for cmcd_key in CMCD_KEYS {
174                if cmcd_key == k {
175                    match cmcd_key {
176                        CMCD_KEY_BS | CMCD_KEY_SU => {
177                            working_set
178                                .insert(Self::generate_key(name, cmcd_key), bs_su(value, k, v)?);
179                        }
180                        CMCD_KEY_BR | CMCD_KEY_BL | CMCD_KEY_D | CMCD_KEY_DL | CMCD_KEY_MTP
181                        | CMCD_KEY_RTP | CMCD_KEY_TB => {
182                            working_set
183                                .insert(Self::generate_key(name, cmcd_key), br_tb(value, k, v)?);
184                        }
185                        CMCD_KEY_CID | CMCD_KEY_NRR | CMCD_KEY_OT | CMCD_KEY_SF | CMCD_KEY_SID
186                        | CMCD_KEY_ST | CMCD_KEY_V => {
187                            working_set
188                                .insert(Self::generate_key(name, cmcd_key), cid_v(value, k, v)?);
189                        }
190                        CMCD_KEY_NOR => {
191                            working_set
192                                .insert(Self::generate_key(name, cmcd_key), nor(value, k, v)?);
193                        }
194                        CMCD_KEY_PR => {
195                            working_set
196                                .insert(Self::generate_key(name, cmcd_key), pr(value, k, v)?);
197                        }
198
199                        _ => {}
200                    }
201                }
202            }
203        }
204        Ok(working_set)
205    }
206}
207
208impl TryFrom<&yaml_rust::yaml::Hash> for CmcdProcessor {
209    type Error = Error;
210
211    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
212        let mut fields = Fields::default();
213        let mut ignore_missing = false;
214
215        for (k, v) in value.iter() {
216            let key = k
217                .as_str()
218                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
219            match key {
220                FIELD_NAME => {
221                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
222                }
223                FIELDS_NAME => {
224                    fields = yaml_new_fields(v, FIELDS_NAME)?;
225                }
226
227                IGNORE_MISSING_NAME => {
228                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
229                }
230
231                _ => {}
232            }
233        }
234
235        let proc = CmcdProcessor {
236            fields,
237            ignore_missing,
238        };
239
240        Ok(proc)
241    }
242}
243
244impl Processor for CmcdProcessor {
245    fn kind(&self) -> &str {
246        PROCESSOR_CMCD
247    }
248
249    fn ignore_missing(&self) -> bool {
250        self.ignore_missing
251    }
252
253    fn exec_mut(&self, mut val: Value) -> Result<Value> {
254        for field in self.fields.iter() {
255            let name = field.input_field();
256
257            match val.get(name) {
258                Some(Value::String(s)) => {
259                    let results = self.parse(field.target_or_input_field(), s)?;
260                    val.extend(results.into())?;
261                }
262                Some(Value::Null) | None => {
263                    if !self.ignore_missing {
264                        return ProcessorMissingFieldSnafu {
265                            processor: self.kind().to_string(),
266                            field: name.to_string(),
267                        }
268                        .fail();
269                    }
270                }
271                Some(v) => {
272                    return ProcessorExpectStringSnafu {
273                        processor: self.kind().to_string(),
274                        v: v.clone(),
275                    }
276                    .fail();
277                }
278            }
279        }
280
281        Ok(val)
282    }
283}
284
#[cfg(test)]
mod tests {
    use urlencoding::decode;

    use super::*;
    use crate::etl::field::{Field, Fields};
    use crate::etl::value::Value;

    /// Each case is a URL-encoded CMCD payload plus the entries `parse` is expected
    /// to produce for it once the payload has been URL-decoded, using the prefix
    /// `prefix`. The order of the expected entries is irrelevant: they are collected
    /// into a `BTreeMap` before the comparison.
    #[test]
    fn test_cmcd() {
        let ss = [
            (
                "sid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![(
                    "prefix_sid",
                    Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                )],
            ),
            (
                "br%3D3200%2Cbs%2Cd%3D4004%2Cmtp%3D25400%2Cot%3Dv%2Crtp%3D15000%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22%2Ctb%3D6000",
                vec![
                    ("prefix_bs", Value::Boolean(true)),
                    ("prefix_ot", Value::String("v".into())),
                    ("prefix_rtp", Value::Int64(15000)),
                    ("prefix_br", Value::Int64(3200)),
                    ("prefix_tb", Value::Int64(6000)),
                    ("prefix_d", Value::Int64(4004)),
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    ("prefix_mtp", Value::Int64(25400)),
                ],
            ),
            (
                // `b` is not a reserved CMCD key, so it is not resolved
                "b%2Crtp%3D15000%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    ("prefix_rtp", Value::Int64(15000)),
                ],
            ),
            (
                "bs%2Csu",
                vec![
                    ("prefix_su", Value::Boolean(true)),
                    ("prefix_bs", Value::Boolean(true)),
                ],
            ),
            (
                // custom keys are not resolved
                "d%3D4004%2Ccom.example-myNumericKey%3D500%2Ccom.examplemyStringKey%3D%22myStringValue%22",
                vec![
                    // The entries below are what the custom keys would produce if
                    // they were resolved; they are intentionally not expected.
                    // (
                    //     "prefix_com.example-myNumericKey",
                    //     Value::String("500".into()),
                    // ),
                    // (
                    //     "prefix_com.examplemyStringKey",
                    //     Value::String("\"myStringValue\"".into()),
                    // ),
                    ("prefix_d", Value::Int64(4004)),
                ],
            ),
            (
                // the `nor` value is URL-encoded a second time inside the payload
                "nor%3D%22..%252F300kbps%252Fsegment35.m4v%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    (
                        "prefix_nor",
                        Value::String("\"../300kbps/segment35.m4v\"".into()),

                    ),
                ],
            ),
            (
                "nrr%3D%2212323-48763%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    ("prefix_nrr", Value::String("\"12323-48763\"".into())),
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                ],
            ),
            (
                "nor%3D%22..%252F300kbps%252Ftrack.m4v%22%2Cnrr%3D%2212323-48763%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    ("prefix_nrr", Value::String("\"12323-48763\"".into())),
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    (
                        "prefix_nor",
                        Value::String("\"../300kbps/track.m4v\"".into()),
                    ),
                ],
            ),
            (
                // payload carrying every reserved key except `v`
                "bl%3D21300%2Cbr%3D3200%2Cbs%2Ccid%3D%22faec5fc2-ac30-11eabb37-0242ac130002%22%2Cd%3D4004%2Cdl%3D18500%2Cmtp%3D48100%2Cnor%3D%22..%252F300kbps%252Ftrack.m4v%22%2Cnrr%3D%2212323-48763%22%2Cot%3Dv%2Cpr%3D1.08%2Crtp%3D12000%2Csf%3Dd%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22%2Cst%3Dv%2Csu%2Ctb%3D6000",
                vec![
                    ("prefix_bl", Value::Int64(21300)),
                    ("prefix_bs", Value::Boolean(true)),
                    ("prefix_st", Value::String("v".into())),
                    ("prefix_ot", Value::String("v".into())),
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    ("prefix_tb", Value::Int64(6000)),
                    ("prefix_d", Value::Int64(4004)),
                    (
                        "prefix_cid",
                        Value::String("\"faec5fc2-ac30-11eabb37-0242ac130002\"".into()),
                    ),
                    ("prefix_mtp", Value::Int64(48100)),
                    ("prefix_rtp", Value::Int64(12000)),
                    (
                        "prefix_nor",
                        Value::String("\"../300kbps/track.m4v\"".into()),
                    ),
                    ("prefix_sf", Value::String("d".into())),
                    ("prefix_br", Value::Int64(3200)),
                    ("prefix_nrr", Value::String("\"12323-48763\"".into())),
                    ("prefix_pr", Value::Float64(1.08)),
                    ("prefix_su", Value::Boolean(true)),
                    ("prefix_dl", Value::Int64(18500)),
                ],
            ),
        ];

        let field = Field::new("prefix", None);

        let processor = CmcdProcessor {
            fields: Fields::new(vec![field]),
            ignore_missing: false,
        };

        for (s, vec) in ss.into_iter() {
            // The payload is URL-decoded once before parsing.
            let decoded = decode(s).unwrap().to_string();

            let expected = vec
                .into_iter()
                .map(|(k, v)| (k.to_string(), v))
                .collect::<BTreeMap<String, Value>>();

            let actual = processor.parse("prefix", &decoded).unwrap();
            assert_eq!(actual, expected);
        }
    }
}