pipeline/etl/processor/
cmcd.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Pipeline Processor for CMCD (Common Media Client Data) data.
16//!
17//! Refer to [`CmcdProcessor`] for more information.
18
19use snafu::{OptionExt, ResultExt};
20use urlencoding::decode;
21
22use crate::error::{
23    CmcdMissingKeySnafu, CmcdMissingValueSnafu, Error, FailedToParseFloatKeySnafu,
24    FailedToParseIntKeySnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
25    ProcessorMissingFieldSnafu, Result,
26};
27use crate::etl::field::Fields;
28use crate::etl::processor::{
29    yaml_bool, yaml_new_field, yaml_new_fields, Processor, FIELDS_NAME, FIELD_NAME,
30    IGNORE_MISSING_NAME,
31};
32use crate::etl::value::Value;
33use crate::etl::PipelineMap;
34
/// Kind name of the CMCD processor, returned by `CmcdProcessor::kind`.
pub(crate) const PROCESSOR_CMCD: &str = "cmcd";

/// Encoded bitrate, Integer kbps.
const CMCD_KEY_BR: &str = "br";
/// Buffer length, Integer milliseconds.
const CMCD_KEY_BL: &str = "bl";
/// Buffer starvation, Boolean.
const CMCD_KEY_BS: &str = "bs";
/// Content ID, String.
const CMCD_KEY_CID: &str = "cid";
/// Object duration, Integer milliseconds.
const CMCD_KEY_D: &str = "d";
/// Deadline, Integer milliseconds.
const CMCD_KEY_DL: &str = "dl";
/// Measured throughput, Integer kbps.
const CMCD_KEY_MTP: &str = "mtp";
/// Next object request, String.
const CMCD_KEY_NOR: &str = "nor";
/// Next request range, String of the form `"<range-start>-<range-end>"`.
const CMCD_KEY_NRR: &str = "nrr";
/// Object type, Token: one of `m`, `a`, `v`, `av`, `i`, `c`, `tt`, `k`, `o`.
const CMCD_KEY_OT: &str = "ot";
/// Playback rate, Decimal.
const CMCD_KEY_PR: &str = "pr";
/// Requested maximum throughput, Integer kbps.
const CMCD_KEY_RTP: &str = "rtp";
/// Streaming format, Token: one of `d` (DASH), `h` (HLS), `s` (Smooth), `o` (other).
const CMCD_KEY_SF: &str = "sf";
/// Session ID, String.
const CMCD_KEY_SID: &str = "sid";
/// Stream type, Token: one of `v` (all segments available, e.g. VOD), `l` (live).
const CMCD_KEY_ST: &str = "st";
/// Startup, Boolean.
const CMCD_KEY_SU: &str = "su";
/// Top bitrate, Integer kbps.
const CMCD_KEY_TB: &str = "tb";
/// CMCD version.
const CMCD_KEY_V: &str = "v";

/// Every reserved CMCD key this processor resolves. Any key in a payload that is not
/// listed here (including custom, hyphen-prefixed keys) is skipped during parsing.
const CMCD_KEYS: [&str; 18] = [
    CMCD_KEY_BR,
    CMCD_KEY_BL,
    CMCD_KEY_BS,
    CMCD_KEY_CID,
    CMCD_KEY_D,
    CMCD_KEY_DL,
    CMCD_KEY_MTP,
    CMCD_KEY_NOR,
    CMCD_KEY_NRR,
    CMCD_KEY_OT,
    CMCD_KEY_PR,
    CMCD_KEY_RTP,
    CMCD_KEY_SF,
    CMCD_KEY_SID,
    CMCD_KEY_ST,
    CMCD_KEY_SU,
    CMCD_KEY_TB,
    CMCD_KEY_V,
];
76
77/// function to resolve CMCD_KEY_BS | CMCD_KEY_SU
78fn bs_su(_: &str, _: &str, _: Option<&str>) -> Result<Value> {
79    Ok(Value::Boolean(true))
80}
81
82/// function to resolve CMCD_KEY_BR | CMCD_KEY_BL | CMCD_KEY_D | CMCD_KEY_DL | CMCD_KEY_MTP | CMCD_KEY_RTP | CMCD_KEY_TB
83fn br_tb(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
84    let v = v.context(CmcdMissingValueSnafu { k, s })?;
85    let val: i64 = v
86        .parse()
87        .context(FailedToParseIntKeySnafu { key: k, value: v })?;
88    Ok(Value::Int64(val))
89}
90
91/// function to resolve CMCD_KEY_CID | CMCD_KEY_NRR | CMCD_KEY_OT | CMCD_KEY_SF | CMCD_KEY_SID | CMCD_KEY_V
92fn cid_v(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
93    let v = v.context(CmcdMissingValueSnafu { k, s })?;
94    Ok(Value::String(v.to_string()))
95}
96
97/// function to resolve CMCD_KEY_NOR
98fn nor(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
99    let v = v.context(CmcdMissingValueSnafu { k, s })?;
100    let val = match decode(v) {
101        Ok(val) => val.to_string(),
102        Err(_) => v.to_string(),
103    };
104    Ok(Value::String(val))
105}
106
107/// function to resolve CMCD_KEY_PR
108fn pr(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
109    let v = v.context(CmcdMissingValueSnafu { k, s })?;
110    let val: f64 = v
111        .parse()
112        .context(FailedToParseFloatKeySnafu { key: k, value: v })?;
113    Ok(Value::Float64(val))
114}
115
116/// Common Media Client Data Specification:
117/// https://cdn.cta.tech/cta/media/media/resources/standards/pdfs/cta-5004-final.pdf
118///
119///
120/// The data payload for Header and Query Argument transmission consists of a series of
121/// key/value pairs constructed according to the following rules:
122/// 1. All information in the payload MUST be represented as <key>=<value> pairs.
123/// 2. The key and value MUST be separated by an equals sign Unicode 0x3D. If the
124///    value type is BOOLEAN and the value is TRUE, then the equals sign and the value
125///    MUST be omitted.
126/// 3. Successive key/value pairs MUST be delimited by a comma Unicode 0x2C.
127/// 4. The key names described in this specification are reserved. Custom key names
128///    may be used, but they MUST carry a hyphenated prefix to ensure that there will
129///    not be a namespace collision with future revisions to this specification. Clients
130///    SHOULD use a reverse-DNS syntax when defining their own prefix.
131/// 5. If headers are used for data transmission, then custom keys SHOULD be
132///    allocated to one of the four defined header names based upon their expected
133///    level of variability:
134///       a. CMCD-Request: keys whose values vary with each request.
135///       b. CMCD-Object: keys whose values vary with the object being requested.
136///       c. CMCD-Status: keys whose values do not vary with every request or object.
137///       d. CMCD-Session: keys whose values are expected to be invariant over the life of the session.
138/// 6. All key names are case-sensitive.
139/// 7. Any value of type String MUST be enclosed by opening and closing double
140///    quotes Unicode 0x22. Double quotes and backslashes MUST be escaped using a
141///    backslash "\" Unicode 0x5C character. Any value of type Token does not require
142///    quoting.
143/// 8. All keys are OPTIONAL.
144/// 9. Key-value pairs SHOULD be sequenced in alphabetical order of the key name in
145///    order to reduce the fingerprinting surface exposed by the player.
146/// 10. If the data payload is transmitted as a query argument, then the entire payload
147///     string MUST be URLEncoded per [5]. Data payloads transmitted via headers
148///     MUST NOT be URLEncoded.
149/// 11. The data payload syntax is intended to be compliant with Structured Field Values for HTTP [6].
150/// 12. Transport Layer Security SHOULD be used to protect all transmission of CMCD data.
151#[derive(Debug, Default)]
152pub struct CmcdProcessor {
153    fields: Fields,
154    ignore_missing: bool,
155}
156
157impl CmcdProcessor {
158    fn generate_key(prefix: &str, key: &str) -> String {
159        format!("{}_{}", prefix, key)
160    }
161
162    fn parse(&self, name: &str, value: &str) -> Result<PipelineMap> {
163        let mut working_set = PipelineMap::new();
164
165        let parts = value.split(',');
166
167        for part in parts {
168            let mut kv = part.split('=');
169            let k = kv.next().context(CmcdMissingKeySnafu { part, s: value })?;
170            let v = kv.next();
171
172            for cmcd_key in CMCD_KEYS {
173                if cmcd_key == k {
174                    match cmcd_key {
175                        CMCD_KEY_BS | CMCD_KEY_SU => {
176                            working_set
177                                .insert(Self::generate_key(name, cmcd_key), bs_su(value, k, v)?);
178                        }
179                        CMCD_KEY_BR | CMCD_KEY_BL | CMCD_KEY_D | CMCD_KEY_DL | CMCD_KEY_MTP
180                        | CMCD_KEY_RTP | CMCD_KEY_TB => {
181                            working_set
182                                .insert(Self::generate_key(name, cmcd_key), br_tb(value, k, v)?);
183                        }
184                        CMCD_KEY_CID | CMCD_KEY_NRR | CMCD_KEY_OT | CMCD_KEY_SF | CMCD_KEY_SID
185                        | CMCD_KEY_ST | CMCD_KEY_V => {
186                            working_set
187                                .insert(Self::generate_key(name, cmcd_key), cid_v(value, k, v)?);
188                        }
189                        CMCD_KEY_NOR => {
190                            working_set
191                                .insert(Self::generate_key(name, cmcd_key), nor(value, k, v)?);
192                        }
193                        CMCD_KEY_PR => {
194                            working_set
195                                .insert(Self::generate_key(name, cmcd_key), pr(value, k, v)?);
196                        }
197
198                        _ => {}
199                    }
200                }
201            }
202        }
203        Ok(working_set)
204    }
205}
206
207impl TryFrom<&yaml_rust::yaml::Hash> for CmcdProcessor {
208    type Error = Error;
209
210    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
211        let mut fields = Fields::default();
212        let mut ignore_missing = false;
213
214        for (k, v) in value.iter() {
215            let key = k
216                .as_str()
217                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
218            match key {
219                FIELD_NAME => {
220                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
221                }
222                FIELDS_NAME => {
223                    fields = yaml_new_fields(v, FIELDS_NAME)?;
224                }
225
226                IGNORE_MISSING_NAME => {
227                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
228                }
229
230                _ => {}
231            }
232        }
233
234        let proc = CmcdProcessor {
235            fields,
236            ignore_missing,
237        };
238
239        Ok(proc)
240    }
241}
242
243impl Processor for CmcdProcessor {
244    fn kind(&self) -> &str {
245        PROCESSOR_CMCD
246    }
247
248    fn ignore_missing(&self) -> bool {
249        self.ignore_missing
250    }
251
252    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
253        for field in self.fields.iter() {
254            let name = field.input_field();
255
256            match val.get(name) {
257                Some(Value::String(s)) => {
258                    let results = self.parse(field.target_or_input_field(), s)?;
259                    val.extend(results);
260                }
261                Some(Value::Null) | None => {
262                    if !self.ignore_missing {
263                        return ProcessorMissingFieldSnafu {
264                            processor: self.kind().to_string(),
265                            field: name.to_string(),
266                        }
267                        .fail();
268                    }
269                }
270                Some(v) => {
271                    return ProcessorExpectStringSnafu {
272                        processor: self.kind().to_string(),
273                        v: v.clone(),
274                    }
275                    .fail();
276                }
277            }
278        }
279
280        Ok(())
281    }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::etl::field::Field;

    /// Builds a processor that reads the `prefix` field and prefixes output keys with it.
    fn prefix_processor(ignore_missing: bool) -> CmcdProcessor {
        CmcdProcessor {
            fields: Fields::new(vec![Field::new("prefix", None)]),
            ignore_missing,
        }
    }

    #[test]
    fn test_cmcd() {
        let ss = [
            (
                "sid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![(
                    "prefix_sid",
                    Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                )],
            ),
            (
                "br%3D3200%2Cbs%2Cd%3D4004%2Cmtp%3D25400%2Cot%3Dv%2Crtp%3D15000%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22%2Ctb%3D6000",
                vec![
                    ("prefix_bs", Value::Boolean(true)),
                    ("prefix_ot", Value::String("v".into())),
                    ("prefix_rtp", Value::Int64(15000)),
                    ("prefix_br", Value::Int64(3200)),
                    ("prefix_tb", Value::Int64(6000)),
                    ("prefix_d", Value::Int64(4004)),
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    ("prefix_mtp", Value::Int64(25400)),
                ],
            ),
            (
                // `b` is not a reserved CMCD key, so it is not resolved.
                "b%2Crtp%3D15000%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    ("prefix_rtp", Value::Int64(15000)),
                ],
            ),
            (
                "bs%2Csu",
                vec![
                    ("prefix_su", Value::Boolean(true)),
                    ("prefix_bs", Value::Boolean(true)),
                ],
            ),
            (
                // Custom keys are not resolved; only the reserved `d` key is kept.
                "d%3D4004%2Ccom.example-myNumericKey%3D500%2Ccom.examplemyStringKey%3D%22myStringValue%22",
                vec![("prefix_d", Value::Int64(4004))],
            ),
            (
                "nor%3D%22..%252F300kbps%252Fsegment35.m4v%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    (
                        "prefix_nor",
                        Value::String("\"../300kbps/segment35.m4v\"".into()),
                    ),
                ],
            ),
            (
                "nrr%3D%2212323-48763%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    ("prefix_nrr", Value::String("\"12323-48763\"".into())),
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                ],
            ),
            (
                "nor%3D%22..%252F300kbps%252Ftrack.m4v%22%2Cnrr%3D%2212323-48763%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    ("prefix_nrr", Value::String("\"12323-48763\"".into())),
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    (
                        "prefix_nor",
                        Value::String("\"../300kbps/track.m4v\"".into()),
                    ),
                ],
            ),
            (
                "bl%3D21300%2Cbr%3D3200%2Cbs%2Ccid%3D%22faec5fc2-ac30-11eabb37-0242ac130002%22%2Cd%3D4004%2Cdl%3D18500%2Cmtp%3D48100%2Cnor%3D%22..%252F300kbps%252Ftrack.m4v%22%2Cnrr%3D%2212323-48763%22%2Cot%3Dv%2Cpr%3D1.08%2Crtp%3D12000%2Csf%3Dd%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22%2Cst%3Dv%2Csu%2Ctb%3D6000",
                vec![
                    ("prefix_bl", Value::Int64(21300)),
                    ("prefix_bs", Value::Boolean(true)),
                    ("prefix_st", Value::String("v".into())),
                    ("prefix_ot", Value::String("v".into())),
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    ("prefix_tb", Value::Int64(6000)),
                    ("prefix_d", Value::Int64(4004)),
                    (
                        "prefix_cid",
                        Value::String("\"faec5fc2-ac30-11eabb37-0242ac130002\"".into()),
                    ),
                    ("prefix_mtp", Value::Int64(48100)),
                    ("prefix_rtp", Value::Int64(12000)),
                    (
                        "prefix_nor",
                        Value::String("\"../300kbps/track.m4v\"".into()),
                    ),
                    ("prefix_sf", Value::String("d".into())),
                    ("prefix_br", Value::Int64(3200)),
                    ("prefix_nrr", Value::String("\"12323-48763\"".into())),
                    ("prefix_pr", Value::Float64(1.08)),
                    ("prefix_su", Value::Boolean(true)),
                    ("prefix_dl", Value::Int64(18500)),
                ],
            ),
        ];

        let processor = prefix_processor(false);

        for (encoded, expected_pairs) in ss.into_iter() {
            // Test payloads are written as query arguments, so decode them first.
            let decoded = decode(encoded).unwrap().to_string();

            let expected = expected_pairs
                .into_iter()
                .map(|(k, v)| (k.to_string(), v))
                .collect::<PipelineMap>();

            let actual = processor.parse("prefix", &decoded).unwrap();
            assert_eq!(actual, expected);
        }
    }

    #[test]
    fn test_exec_mut_expands_payload() {
        let processor = prefix_processor(false);
        let mut val = PipelineMap::new();
        val.insert(
            "prefix".to_string(),
            Value::String("br=3200,bs".to_string()),
        );

        processor.exec_mut(&mut val).unwrap();

        assert_eq!(val.get("prefix_br"), Some(&Value::Int64(3200)));
        assert_eq!(val.get("prefix_bs"), Some(&Value::Boolean(true)));
        // The input field itself is left in place.
        assert_eq!(
            val.get("prefix"),
            Some(&Value::String("br=3200,bs".to_string()))
        );
    }

    #[test]
    fn test_exec_mut_missing_field() {
        let mut val = PipelineMap::new();

        assert!(prefix_processor(false).exec_mut(&mut val).is_err());
        assert!(prefix_processor(true).exec_mut(&mut val).is_ok());
        assert!(val.is_empty());
    }

    #[test]
    fn test_exec_mut_rejects_non_string() {
        let mut val = PipelineMap::new();
        val.insert("prefix".to_string(), Value::Int64(1));

        assert!(prefix_processor(false).exec_mut(&mut val).is_err());
    }
}
441}