pipeline/etl/processor/
cmcd.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Pipeline processor for CMCD (Common Media Client Data) payloads.
16//!
17//! Refer to [`CmcdProcessor`] for more information.
18
19use std::collections::BTreeMap;
20
21use ordered_float::NotNan;
22use snafu::{OptionExt, ResultExt};
23use urlencoding::decode;
24use vrl::prelude::Bytes;
25use vrl::value::{KeyString, Value as VrlValue};
26
27use crate::error::{
28    CmcdMissingKeySnafu, CmcdMissingValueSnafu, Error, FailedToParseFloatKeySnafu,
29    FailedToParseIntKeySnafu, FloatIsNanSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
30    ProcessorMissingFieldSnafu, Result, ValueMustBeMapSnafu,
31};
32use crate::etl::field::Fields;
33use crate::etl::processor::{
34    yaml_bool, yaml_new_field, yaml_new_fields, Processor, FIELDS_NAME, FIELD_NAME,
35    IGNORE_MISSING_NAME,
36};
37
/// Kind name of the CMCD processor; this is what `CmcdProcessor` returns from `Processor::kind`.
pub(crate) const PROCESSOR_CMCD: &str = "cmcd";
39
// Reserved CMCD keys (CTA-5004). Each trailing comment gives the key's meaning and its value
// type in the specification; `CmcdProcessor::parse` decides how each key is converted.
const CMCD_KEY_BR: &str = "br"; // Encoded bitrate, Integer kbps
const CMCD_KEY_BL: &str = "bl"; // Buffer length, Integer milliseconds
const CMCD_KEY_BS: &str = "bs"; // Buffer starvation, Boolean
const CMCD_KEY_CID: &str = "cid"; // Content ID, String
const CMCD_KEY_D: &str = "d"; // Object duration, Integer milliseconds
const CMCD_KEY_DL: &str = "dl"; // Deadline, Integer milliseconds
const CMCD_KEY_MTP: &str = "mtp"; // Measured throughput, Integer kbps
const CMCD_KEY_NOR: &str = "nor"; // Next object request, String (URL-decoded by `nor`)
const CMCD_KEY_NRR: &str = "nrr"; // Next request range, String, "<range-start>-<range-end>"
const CMCD_KEY_OT: &str = "ot"; // Object type, Token - one of [m,a,v,av,i,c,tt,k,o]
const CMCD_KEY_PR: &str = "pr"; // Playback rate, Decimal
const CMCD_KEY_RTP: &str = "rtp"; // Requested maximum throughput, Integer kbps
const CMCD_KEY_SF: &str = "sf"; // Stall frequency, Token - one of [d,h,s,o]
const CMCD_KEY_SID: &str = "sid"; // Session ID, String
const CMCD_KEY_ST: &str = "st"; // Stream type, Token - one of [v,l]
const CMCD_KEY_SU: &str = "su"; // Startup, Boolean
const CMCD_KEY_TB: &str = "tb"; // Top bitrate, Integer kbps
const CMCD_KEY_V: &str = "v"; // Version (kept as a string: resolved by `cid_v`)
58
/// Every reserved key that `CmcdProcessor::parse` resolves. Keys in a payload that are not
/// listed here (including custom, prefixed keys) are ignored.
const CMCD_KEYS: [&str; 18] = [
    CMCD_KEY_BR,
    CMCD_KEY_BL,
    CMCD_KEY_BS,
    CMCD_KEY_CID,
    CMCD_KEY_D,
    CMCD_KEY_DL,
    CMCD_KEY_MTP,
    CMCD_KEY_NOR,
    CMCD_KEY_NRR,
    CMCD_KEY_OT,
    CMCD_KEY_PR,
    CMCD_KEY_RTP,
    CMCD_KEY_SF,
    CMCD_KEY_SID,
    CMCD_KEY_ST,
    CMCD_KEY_SU,
    CMCD_KEY_TB,
    CMCD_KEY_V,
];
79
80/// function to resolve CMCD_KEY_BS | CMCD_KEY_SU
81fn bs_su(_: &str, _: &str, _: Option<&str>) -> Result<VrlValue> {
82    Ok(VrlValue::Boolean(true))
83}
84
85/// function to resolve CMCD_KEY_BR | CMCD_KEY_BL | CMCD_KEY_D | CMCD_KEY_DL | CMCD_KEY_MTP | CMCD_KEY_RTP | CMCD_KEY_TB
86fn br_tb(s: &str, k: &str, v: Option<&str>) -> Result<VrlValue> {
87    let v = v.context(CmcdMissingValueSnafu { k, s })?;
88    let val: i64 = v
89        .parse()
90        .context(FailedToParseIntKeySnafu { key: k, value: v })?;
91    Ok(VrlValue::Integer(val))
92}
93
94/// function to resolve CMCD_KEY_CID | CMCD_KEY_NRR | CMCD_KEY_OT | CMCD_KEY_SF | CMCD_KEY_SID | CMCD_KEY_V
95fn cid_v(s: &str, k: &str, v: Option<&str>) -> Result<VrlValue> {
96    let v = v.context(CmcdMissingValueSnafu { k, s })?;
97    Ok(VrlValue::Bytes(Bytes::from(v.to_string())))
98}
99
100/// function to resolve CMCD_KEY_NOR
101fn nor(s: &str, k: &str, v: Option<&str>) -> Result<VrlValue> {
102    let v = v.context(CmcdMissingValueSnafu { k, s })?;
103    let val = match decode(v) {
104        Ok(val) => val.to_string(),
105        Err(_) => v.to_string(),
106    };
107    Ok(VrlValue::Bytes(Bytes::from(val)))
108}
109
110/// function to resolve CMCD_KEY_PR
111fn pr(s: &str, k: &str, v: Option<&str>) -> Result<VrlValue> {
112    let v = v.context(CmcdMissingValueSnafu { k, s })?;
113    let val: f64 = v
114        .parse()
115        .context(FailedToParseFloatKeySnafu { key: k, value: v })?;
116    let val = NotNan::new(val).context(FloatIsNanSnafu)?;
117    Ok(VrlValue::Float(val))
118}
119
/// Processor that parses CMCD (Common Media Client Data) payload strings into typed fields.
///
/// For every configured field, the input field's string value is parsed. Each reserved CMCD
/// key found in it is inserted into the event as `<prefix>_<key>`, where the prefix is the
/// field's target name (or its input name when no target is set).
///
/// Conversion by key type:
/// - Integer keys become integers.
/// - `pr` becomes a float.
/// - The Boolean keys `bs`/`su` become `true`.
/// - The remaining keys (String and Token keys, plus `v`) are kept verbatim as strings.
///   String values keep their enclosing double quotes.
///
/// Only the `nor` value is URL-decoded. The payload as a whole is not decoded here (the unit
/// tests decode it before parsing). Keys outside the reserved set, including custom keys,
/// are ignored.
///
/// Common Media Client Data Specification (CTA-5004):
/// <https://cdn.cta.tech/cta/media/media/resources/standards/pdfs/cta-5004-final.pdf>
///
/// The data payload for Header and Query Argument transmission consists of a series of
/// key/value pairs constructed according to the following rules:
/// 1. All information in the payload MUST be represented as <key>=<value> pairs.
/// 2. The key and value MUST be separated by an equals sign Unicode 0x3D. If the
///    value type is BOOLEAN and the value is TRUE, then the equals sign and the value
///    MUST be omitted.
/// 3. Successive key/value pairs MUST be delimited by a comma Unicode 0x2C.
/// 4. The key names described in this specification are reserved. Custom key names
///    may be used, but they MUST carry a hyphenated prefix to ensure that there will
///    not be a namespace collision with future revisions to this specification. Clients
///    SHOULD use a reverse-DNS syntax when defining their own prefix.
/// 5. If headers are used for data transmission, then custom keys SHOULD be
///    allocated to one of the four defined header names based upon their expected
///    level of variability:
///       a. CMCD-Request: keys whose values vary with each request.
///       b. CMCD-Object: keys whose values vary with the object being requested.
///       c. CMCD-Status: keys whose values do not vary with every request or object.
///       d. CMCD-Session: keys whose values are expected to be invariant over the life of the session.
/// 6. All key names are case-sensitive.
/// 7. Any value of type String MUST be enclosed by opening and closing double
///    quotes Unicode 0x22. Double quotes and backslashes MUST be escaped using a
///    backslash "\" Unicode 0x5C character. Any value of type Token does not require
///    quoting.
/// 8. All keys are OPTIONAL.
/// 9. Key-value pairs SHOULD be sequenced in alphabetical order of the key name in
///    order to reduce the fingerprinting surface exposed by the player.
/// 10. If the data payload is transmitted as a query argument, then the entire payload
///     string MUST be URLEncoded per [5]. Data payloads transmitted via headers
///     MUST NOT be URLEncoded.
/// 11. The data payload syntax is intended to be compliant with Structured Field Values for HTTP [6].
/// 12. Transport Layer Security SHOULD be used to protect all transmission of CMCD data.
#[derive(Debug, Default)]
pub struct CmcdProcessor {
    /// Input fields holding CMCD payloads. Each field's target name (or its input name when
    /// no target is set) is used as the prefix of the generated keys.
    fields: Fields,
    /// When `true`, a missing or `Null` input field is skipped instead of raising an error.
    ignore_missing: bool,
}
160
161impl CmcdProcessor {
162    fn generate_key(prefix: &str, key: &str) -> KeyString {
163        KeyString::from(format!("{}_{}", prefix, key))
164    }
165
166    fn parse(&self, name: &str, value: &str) -> Result<BTreeMap<KeyString, VrlValue>> {
167        let mut working_set = BTreeMap::new();
168
169        let parts = value.split(',');
170
171        for part in parts {
172            let mut kv = part.split('=');
173            let k = kv.next().context(CmcdMissingKeySnafu { part, s: value })?;
174            let v = kv.next();
175
176            for cmcd_key in CMCD_KEYS {
177                if cmcd_key == k {
178                    match cmcd_key {
179                        CMCD_KEY_BS | CMCD_KEY_SU => {
180                            working_set
181                                .insert(Self::generate_key(name, cmcd_key), bs_su(value, k, v)?);
182                        }
183                        CMCD_KEY_BR | CMCD_KEY_BL | CMCD_KEY_D | CMCD_KEY_DL | CMCD_KEY_MTP
184                        | CMCD_KEY_RTP | CMCD_KEY_TB => {
185                            working_set
186                                .insert(Self::generate_key(name, cmcd_key), br_tb(value, k, v)?);
187                        }
188                        CMCD_KEY_CID | CMCD_KEY_NRR | CMCD_KEY_OT | CMCD_KEY_SF | CMCD_KEY_SID
189                        | CMCD_KEY_ST | CMCD_KEY_V => {
190                            working_set
191                                .insert(Self::generate_key(name, cmcd_key), cid_v(value, k, v)?);
192                        }
193                        CMCD_KEY_NOR => {
194                            working_set
195                                .insert(Self::generate_key(name, cmcd_key), nor(value, k, v)?);
196                        }
197                        CMCD_KEY_PR => {
198                            working_set
199                                .insert(Self::generate_key(name, cmcd_key), pr(value, k, v)?);
200                        }
201
202                        _ => {}
203                    }
204                }
205            }
206        }
207        Ok(working_set)
208    }
209}
210
211impl TryFrom<&yaml_rust::yaml::Hash> for CmcdProcessor {
212    type Error = Error;
213
214    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
215        let mut fields = Fields::default();
216        let mut ignore_missing = false;
217
218        for (k, v) in value.iter() {
219            let key = k
220                .as_str()
221                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
222            match key {
223                FIELD_NAME => {
224                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
225                }
226                FIELDS_NAME => {
227                    fields = yaml_new_fields(v, FIELDS_NAME)?;
228                }
229
230                IGNORE_MISSING_NAME => {
231                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
232                }
233
234                _ => {}
235            }
236        }
237
238        let proc = CmcdProcessor {
239            fields,
240            ignore_missing,
241        };
242
243        Ok(proc)
244    }
245}
246
247impl Processor for CmcdProcessor {
248    fn kind(&self) -> &str {
249        PROCESSOR_CMCD
250    }
251
252    fn ignore_missing(&self) -> bool {
253        self.ignore_missing
254    }
255
256    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
257        for field in self.fields.iter() {
258            let name = field.input_field();
259            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
260            match val.get(name) {
261                Some(VrlValue::Bytes(s)) => {
262                    let s = String::from_utf8_lossy(s);
263                    let results = self.parse(field.target_or_input_field(), &s)?;
264
265                    val.extend(results);
266                }
267                Some(VrlValue::Null) | None => {
268                    if !self.ignore_missing {
269                        return ProcessorMissingFieldSnafu {
270                            processor: self.kind().to_string(),
271                            field: name.to_string(),
272                        }
273                        .fail();
274                    }
275                }
276                Some(v) => {
277                    return ProcessorExpectStringSnafu {
278                        processor: self.kind().to_string(),
279                        v: v.clone(),
280                    }
281                    .fail();
282                }
283            }
284        }
285
286        Ok(val)
287    }
288}
289
#[cfg(test)]
mod tests {
    use urlencoding::decode;

    use super::*;
    use crate::etl::field::{Field, Fields};

    /// Table-driven check of `CmcdProcessor::parse`.
    ///
    /// Each case is a URL-encoded payload, as it would arrive in a query argument. It is
    /// decoded first, then parsed with the prefix `prefix`, and the result is compared with
    /// the expected `prefix_<key>` map.
    #[test]
    fn test_cmcd() {
        let ss = [
            (
                "sid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![(
                    "prefix_sid",
                    VrlValue::Bytes(Bytes::from("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"")),
                )],
            ),
            (
                "br%3D3200%2Cbs%2Cd%3D4004%2Cmtp%3D25400%2Cot%3Dv%2Crtp%3D15000%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22%2Ctb%3D6000",
                vec![
                    ("prefix_bs", VrlValue::Boolean(true)),
                    ("prefix_ot", VrlValue::Bytes(Bytes::from("v"))),
                    ("prefix_rtp", VrlValue::Integer(15000)),
                    ("prefix_br", VrlValue::Integer(3200)),
                    ("prefix_tb", VrlValue::Integer(6000)),
                    ("prefix_d", VrlValue::Integer(4004)),
                    (
                        "prefix_sid",
                        VrlValue::Bytes(Bytes::from("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"")),
                    ),
                    ("prefix_mtp", VrlValue::Integer(25400)),
                ],
            ),
            (
                // `b` is not a reserved CMCD key, so it is ignored.
                "b%2Crtp%3D15000%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    (
                        "prefix_sid",
                        VrlValue::Bytes(Bytes::from("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"")),
                    ),
                    ("prefix_rtp", VrlValue::Integer(15000)),
                ],
            ),
            (
                "bs%2Csu",
                vec![
                    ("prefix_su", VrlValue::Boolean(true)),
                    ("prefix_bs", VrlValue::Boolean(true)),
                ],
            ),
            (
                // Custom keys are not resolved, so only the reserved key `d` is produced.
                "d%3D4004%2Ccom.example-myNumericKey%3D500%2Ccom.examplemyStringKey%3D%22myStringValue%22",
                vec![
                    // No `prefix_com.example-myNumericKey` / `prefix_com.examplemyStringKey`
                    // entries are expected.
                    ("prefix_d", VrlValue::Integer(4004)),
                ],
            ),
            (
                // `nor` is double-encoded: one layer is removed by the test's `decode`, the
                // other by the `nor` resolver.
                "nor%3D%22..%252F300kbps%252Fsegment35.m4v%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    (
                        "prefix_sid",
                        VrlValue::Bytes(Bytes::from("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"")),
                    ),
                    (
                        "prefix_nor",
                        VrlValue::Bytes(Bytes::from("\"../300kbps/segment35.m4v\"")),

                    ),
                ],
            ),
            (
                "nrr%3D%2212323-48763%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    ("prefix_nrr", VrlValue::Bytes(Bytes::from("\"12323-48763\""))),
                    (
                        "prefix_sid",
                        VrlValue::Bytes(Bytes::from("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"")),
                    ),
                ],
            ),
            (
                "nor%3D%22..%252F300kbps%252Ftrack.m4v%22%2Cnrr%3D%2212323-48763%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    ("prefix_nrr", VrlValue::Bytes(Bytes::from("\"12323-48763\""))),
                    (
                        "prefix_sid",
                        VrlValue::Bytes(Bytes::from("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"")),
                    ),
                    (
                        "prefix_nor",
                        VrlValue::Bytes(Bytes::from("\"../300kbps/track.m4v\"")),
                    ),
                ],
            ),
            (
                // Payload exercising every reserved key.
                "bl%3D21300%2Cbr%3D3200%2Cbs%2Ccid%3D%22faec5fc2-ac30-11eabb37-0242ac130002%22%2Cd%3D4004%2Cdl%3D18500%2Cmtp%3D48100%2Cnor%3D%22..%252F300kbps%252Ftrack.m4v%22%2Cnrr%3D%2212323-48763%22%2Cot%3Dv%2Cpr%3D1.08%2Crtp%3D12000%2Csf%3Dd%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22%2Cst%3Dv%2Csu%2Ctb%3D6000",
                vec![
                    ("prefix_bl", VrlValue::Integer(21300)),
                    ("prefix_bs", VrlValue::Boolean(true)),
                    ("prefix_st", VrlValue::Bytes(Bytes::from("v"))),
                    ("prefix_ot", VrlValue::Bytes(Bytes::from("v"))),
                    (
                        "prefix_sid",
                        VrlValue::Bytes(Bytes::from("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"")),
                    ),
                    ("prefix_tb", VrlValue::Integer(6000)),
                    ("prefix_d", VrlValue::Integer(4004)),
                    (
                        "prefix_cid",
                        VrlValue::Bytes(Bytes::from("\"faec5fc2-ac30-11eabb37-0242ac130002\"")),
                    ),
                    ("prefix_mtp", VrlValue::Integer(48100)),
                    ("prefix_rtp", VrlValue::Integer(12000)),
                    (
                        "prefix_nor",
                        VrlValue::Bytes(Bytes::from("\"../300kbps/track.m4v\"")),
                    ),
                    ("prefix_sf", VrlValue::Bytes(Bytes::from("d"))),
                    ("prefix_br", VrlValue::Integer(3200)),
                    ("prefix_nrr", VrlValue::Bytes(Bytes::from("\"12323-48763\""))),
                    ("prefix_pr", VrlValue::Float(NotNan::new(1.08).unwrap())),
                    ("prefix_su", VrlValue::Boolean(true)),
                    ("prefix_dl", VrlValue::Integer(18500)),
                ],
            ),
        ];

        let field = Field::new("prefix", None);

        let processor = CmcdProcessor {
            fields: Fields::new(vec![field]),
            ignore_missing: false,
        };

        for (s, vec) in ss.into_iter() {
            // Payloads above are query-argument encoded; `parse` expects the decoded form.
            let decoded = decode(s).unwrap().to_string();

            let expected = vec
                .into_iter()
                .map(|(k, v)| (KeyString::from(k.to_string()), v))
                .collect::<BTreeMap<KeyString, VrlValue>>();

            let actual = processor.parse("prefix", &decoded).unwrap();
            assert_eq!(actual, expected);
        }
    }
}