Skip to content

Commit 57bbf0e

Browse files
committed
Detect missing frames in h264
1 parent fd33ff8 commit 57bbf0e

File tree

6 files changed

+165
-11
lines changed

6 files changed

+165
-11
lines changed

integration-tests/src/bin/play_rtp_dump.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,28 @@ fn main() {
88
}
99

1010
let input_file = &args[2];
11+
let file_name = input_file.split('.').collect::<Vec<_>>()[0];
1112
let command = match args[1].as_str() {
1213
"video" => {
1314
format!(
1415
"gst-launch-1.0 -v filesrc location={input_file} ! application/x-rtp-stream ! rtpstreamdepay ! rtph264depay ! video/x-h264,framerate=30/1 ! h264parse ! h264timestamper ! decodebin ! videoconvert ! autovideosink"
1516
)
1617
}
18+
"video-mp4" => {
19+
format!(
20+
"gst-launch-1.0 -v filesrc location={input_file} ! application/x-rtp-stream ! rtpstreamdepay ! rtph264depay ! video/x-h264,framerate=30/1 ! h264parse ! h264timestamper ! decodebin ! videoconvert ! x264enc ! mp4mux ! filesink location={file_name}.mp4"
21+
)
22+
}
23+
"video-h264" => {
24+
[
25+
"gst-launch-1.0 rtpptdemux name=demux ",
26+
&format!("filesrc location={input_file} ! \"application/x-rtp-stream\" ! rtpstreamdepay ! queue ! demux. "),
27+
"demux.src_96 ! \"application/x-rtp,media=video,clock-rate=90000,encoding-name=H264\" ! queue ",
28+
&format!("! rtph264depay ! decodebin ! videoconvert ! x264enc ! filesink location={file_name}.h264 "),
29+
"demux.src_97 ! \"application/x-rtp,media=audio,clock-rate=48000,encoding-name=OPUS\" ! queue ",
30+
"! rtpopusdepay ! decodebin ! audioconvert ! autoaudiosink ",
31+
].concat()
32+
}
1733
"audio" => {
1834
format!(
1935
"gst-launch-1.0 -v filesrc location={input_file} ! application/x-rtp-stream,payload=97,encoding-name=OPUS ! rtpstreamdepay ! rtpopusdepay ! audio/x-opus ! opusdec ! autoaudiosink"

integration-tests/src/pipeline_tests/video_audio.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ pub fn single_input_with_video_and_audio_flaky() -> Result<()> {
7575
"transport_protocol": "tcp_server",
7676
"port": input_port,
7777
"video": {
78-
"decoder": "ffmpeg_h264"
78+
"decoder": "vulkan_h264"
7979
},
8080
"audio": {
8181
"decoder": "opus"

smelter-core/src/pipeline/decoder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub(super) use dynamic_stream::{
1818
};
1919
pub(super) use static_stream::{AudioDecoderStream, VideoDecoderStream};
2020

21+
mod au_splitter;
2122
mod ffmpeg_utils;
2223

2324
pub mod ffmpeg_h264;
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
use std::time::Duration;
2+
3+
use bytes::BytesMut;
4+
use tracing::debug;
5+
use vk_video::parser::h264::{AccessUnit, H264Parser, ParsedNalu, nal_types::slice::SliceFamily};
6+
7+
use crate::prelude::*;
8+
9+
#[derive(Default)]
10+
pub struct AUSplitter {
11+
parser: H264Parser,
12+
prev_ref_frame_num: u16,
13+
detected_missed_frames: bool,
14+
}
15+
16+
impl AUSplitter {
17+
pub fn put_chunk(
18+
&mut self,
19+
chunk: EncodedInputChunk,
20+
) -> Result<Vec<EncodedInputChunk>, AUSplitterError> {
21+
if MediaKind::Video(VideoCodec::H264) != chunk.kind {
22+
return Err(AUSplitterError::UnsupportedMediaKind(chunk.kind));
23+
}
24+
25+
let access_units = self
26+
.parser
27+
.parse(&chunk.data, Some(chunk.pts.as_micros() as u64))?;
28+
29+
let mut chunks = Vec::new();
30+
for au in access_units {
31+
self.verify_access_unit(&au)?;
32+
33+
let mut data = BytesMut::new();
34+
let pts = match au.0.first().and_then(|nalu| nalu.pts) {
35+
Some(pts) => pts,
36+
None => {
37+
debug!("Expected access unit with pts. Skipping the access unit");
38+
continue;
39+
}
40+
};
41+
42+
// Parser returns nalus which may not start with a start code
43+
// but each nalu always ends with the start code of the next nalu,
44+
// so we have to make sure that there is a start code in the beginning
45+
const START_CODES: [&[u8]; 2] = [&[0, 0, 0, 1], &[0, 0, 1]];
46+
if let Some(first_nalu) = au.0.first() {
47+
let has_start_code = START_CODES
48+
.iter()
49+
.any(|code| first_nalu.raw_bytes.starts_with(code));
50+
if !has_start_code {
51+
data.extend_from_slice(&[0, 0, 1]);
52+
}
53+
}
54+
55+
for nalu in au.0.iter() {
56+
data.extend_from_slice(&nalu.raw_bytes);
57+
}
58+
59+
chunks.push(EncodedInputChunk {
60+
data: data.freeze(),
61+
pts: Duration::from_micros(pts),
62+
dts: None,
63+
kind: MediaKind::Video(VideoCodec::H264),
64+
});
65+
}
66+
67+
Ok(chunks)
68+
}
69+
70+
fn verify_access_unit(&mut self, au: &AccessUnit) -> Result<(), AUSplitterError> {
71+
let Some(ParsedNalu::Slice(slice)) =
72+
au.0.iter()
73+
.map(|nalu| &nalu.parsed)
74+
.find(|nalu| matches!(nalu, ParsedNalu::Slice(_)))
75+
else {
76+
return Err(AUSplitterError::InvalidAccessUnit);
77+
};
78+
79+
match slice.header.slice_type.family {
80+
SliceFamily::P | SliceFamily::B => {
81+
let sps = &slice.sps;
82+
let frame_num = slice.header.frame_num;
83+
let max_frame_num = 1i64 << sps.log2_max_frame_num();
84+
85+
let is_expected_frame_num = !sps.gaps_in_frame_num_value_allowed_flag
86+
&& frame_num != self.prev_ref_frame_num
87+
&& frame_num != ((self.prev_ref_frame_num as i64 + 1) % max_frame_num) as u16;
88+
if is_expected_frame_num || self.detected_missed_frames {
89+
debug!("AUSplitter detected missing frame");
90+
self.detected_missed_frames = true;
91+
return Err(AUSplitterError::MissingReferenceFrame);
92+
}
93+
94+
self.prev_ref_frame_num = frame_num;
95+
}
96+
SliceFamily::I => {
97+
self.prev_ref_frame_num = 0;
98+
self.detected_missed_frames = false;
99+
}
100+
SliceFamily::SP | SliceFamily::SI => {} // Not supported
101+
}
102+
Ok(())
103+
}
104+
}
105+
106+
#[derive(Debug, thiserror::Error)]
107+
pub enum AUSplitterError {
108+
#[error("Missing reference frame")]
109+
MissingReferenceFrame,
110+
111+
#[error("Could not parse H264 chunk: {0}")]
112+
ParserError(#[from] vk_video::parser::h264::H264ParserError),
113+
114+
#[error("Invalid access unit")]
115+
InvalidAccessUnit,
116+
117+
#[error("Unsupported media kind {0:?}")]
118+
UnsupportedMediaKind(MediaKind),
119+
}

smelter-core/src/pipeline/decoder/ffmpeg_h264.rs

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::{iter, sync::Arc};
22

33
use crate::pipeline::decoder::{
44
VideoDecoder, VideoDecoderInstance,
5+
au_splitter::AUSplitter,
56
ffmpeg_utils::{create_av_packet, from_av_frame},
67
};
78
use crate::prelude::*;
@@ -12,13 +13,14 @@ use ffmpeg_next::{
1213
media::Type,
1314
};
1415
use smelter_render::Frame;
15-
use tracing::{error, info, trace, warn};
16+
use tracing::{debug, error, info, trace, warn};
1617

1718
const TIME_BASE: i32 = 1_000_000;
1819

1920
pub struct FfmpegH264Decoder {
2021
decoder: ffmpeg_next::decoder::Opened,
2122
av_frame: ffmpeg_next::frame::Video,
23+
au_splitter: AUSplitter,
2224
}
2325

2426
impl VideoDecoder for FfmpegH264Decoder {
@@ -44,28 +46,40 @@ impl VideoDecoder for FfmpegH264Decoder {
4446
Ok(Self {
4547
decoder,
4648
av_frame: ffmpeg_next::frame::Video::empty(),
49+
au_splitter: AUSplitter::default(),
4750
})
4851
}
4952
}
5053

5154
impl VideoDecoderInstance for FfmpegH264Decoder {
5255
fn decode(&mut self, chunk: EncodedInputChunk) -> Vec<Frame> {
5356
trace!(?chunk, "H264 decoder received a chunk.");
54-
let av_packet = match create_av_packet(chunk, VideoCodec::H264, TIME_BASE) {
55-
Ok(packet) => packet,
57+
let chunks = match self.au_splitter.put_chunk(chunk) {
58+
Ok(chunks) => chunks,
5659
Err(err) => {
57-
warn!("Dropping frame: {}", err);
60+
debug!("H264 AU splitter could not process the chunks: {err}");
5861
return Vec::new();
5962
}
6063
};
6164

62-
match self.decoder.send_packet(&av_packet) {
63-
Ok(()) => {}
64-
Err(e) => {
65-
warn!("Failed to send a packet to decoder: {:?}", e);
66-
return Vec::new();
65+
for chunk in chunks {
66+
let av_packet = match create_av_packet(chunk, VideoCodec::H264, TIME_BASE) {
67+
Ok(packet) => packet,
68+
Err(err) => {
69+
warn!("Dropping frame: {}", err);
70+
continue;
71+
}
72+
};
73+
74+
match self.decoder.send_packet(&av_packet) {
75+
Ok(()) => {}
76+
Err(e) => {
77+
warn!("Failed to send a packet to decoder: {:?}", e);
78+
continue;
79+
}
6780
}
6881
}
82+
6983
self.read_all_frames()
7084
}
7185

smelter-core/src/pipeline/rtp/depayloader.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub fn new_depayloader(options: DepayloaderOptions) -> Box<dyn Depayloader> {
3939
info!(?options, "Initialize RTP depayloader");
4040
match options {
4141
DepayloaderOptions::H264 => {
42-
BufferedDepayloader::<H264Packet>::new_boxed(MediaKind::Video(VideoCodec::H264))
42+
SimpleDepayloader::<H264Packet>::new_boxed(MediaKind::Video(VideoCodec::H264))
4343
}
4444
DepayloaderOptions::Vp8 => {
4545
BufferedDepayloader::<Vp8Packet>::new_boxed(MediaKind::Video(VideoCodec::Vp8))
@@ -134,6 +134,10 @@ impl<T: Depacketizer + Default + 'static> Depayloader for SimpleDepayloader<T> {
134134
) -> Result<Vec<EncodedInputChunk>, DepayloadingError> {
135135
trace!(?packet, "RTP depayloader received new packet");
136136
let data = self.depayloader.depacketize(&packet.packet.payload)?;
137+
if data.is_empty() {
138+
return Ok(vec![]);
139+
}
140+
137141
let chunk = EncodedInputChunk {
138142
data,
139143
pts: packet.timestamp,

0 commit comments

Comments
 (0)