diff --git a/apps/desktop/src-tauri/src/lib.rs b/apps/desktop/src-tauri/src/lib.rs index 923db76f23..92b63a2400 100644 --- a/apps/desktop/src-tauri/src/lib.rs +++ b/apps/desktop/src-tauri/src/lib.rs @@ -3046,6 +3046,22 @@ async fn create_editor_instance_impl( ) -> Result, String> { let app = app.clone(); + if RecordingMeta::needs_recovery(&path) { + let pretty_name = path + .file_name() + .and_then(|s| s.to_str()) + .map(|s| s.trim_end_matches(".cap").to_string()) + .unwrap_or_else(|| "Recovered Recording".to_string()); + + tracing::info!("Attempting to recover incomplete recording: {:?}", path); + match RecordingMeta::try_recover(&path, pretty_name) { + Ok(_) => tracing::info!("Successfully recovered recording: {:?}", path), + Err(e) => { + return Err(format!("Failed to recover recording: {e}")); + } + } + } + let instance = { let app = app.clone(); EditorInstance::new( diff --git a/crates/enc-ffmpeg/src/mux/fragmented_mp4.rs b/crates/enc-ffmpeg/src/mux/fragmented_mp4.rs new file mode 100644 index 0000000000..837d3daf0c --- /dev/null +++ b/crates/enc-ffmpeg/src/mux/fragmented_mp4.rs @@ -0,0 +1,169 @@ +use cap_media_info::RawVideoFormat; +use ffmpeg::{format, frame}; +use std::{path::PathBuf, time::Duration}; +use tracing::*; + +use crate::{ + audio::AudioEncoder, + h264, + video::h264::{H264Encoder, H264EncoderError}, +}; + +pub struct FragmentedMP4File { + #[allow(unused)] + tag: &'static str, + output: format::context::Output, + video: H264Encoder, + audio: Option>, + is_finished: bool, +} + +#[derive(thiserror::Error, Debug)] +pub enum InitError { + #[error("{0:?}")] + Ffmpeg(ffmpeg::Error), + #[error("Video/{0}")] + VideoInit(H264EncoderError), + #[error("Audio/{0}")] + AudioInit(Box), +} + +#[derive(thiserror::Error, Debug)] +pub enum FinishError { + #[error("Already finished")] + AlreadyFinished, + #[error("{0}")] + WriteTrailerFailed(ffmpeg::Error), +} + +pub struct FinishResult { + pub video_finish: Result<(), ffmpeg::Error>, + pub audio_finish: Result<(), ffmpeg::Error>, +} + +impl FragmentedMP4File { + pub fn init( + tag: &'static str, + mut output_path: PathBuf, + video: impl FnOnce(&mut format::context::Output) -> Result, + audio: impl FnOnce( + &mut format::context::Output, + ) + -> Option, Box>>, + ) -> Result { + output_path.set_extension("mp4"); + + if let Some(parent) = output_path.parent() { + let _ = std::fs::create_dir_all(parent); + } + + let mut output = format::output(&output_path).map_err(InitError::Ffmpeg)?; + + trace!("Preparing encoders for fragmented mp4 file"); + + let video = video(&mut output).map_err(InitError::VideoInit)?; + let audio = audio(&mut output) + .transpose() + .map_err(InitError::AudioInit)?; + + info!("Prepared encoders for fragmented mp4 file"); + + let mut opts = ffmpeg::Dictionary::new(); + opts.set("movflags", "frag_keyframe+empty_moov+default_base_moof"); + opts.set("frag_duration", "1000000"); + + output.write_header_with(opts).map_err(InitError::Ffmpeg)?; + + Ok(Self { + tag, + output, + video, + audio, + is_finished: false, + }) + } + + pub fn video_format() -> RawVideoFormat { + RawVideoFormat::YUYV420 + } + + pub fn queue_video_frame( + &mut self, + frame: frame::Video, + timestamp: Duration, + ) -> Result<(), h264::QueueFrameError> { + if self.is_finished { + return Ok(()); + } + + self.video.queue_frame(frame, timestamp, &mut self.output) + } + + pub fn queue_audio_frame(&mut self, frame: frame::Audio) { + if self.is_finished { + return; + } + + let Some(audio) = &mut self.audio else { + return; + }; + + audio.send_frame(frame, &mut self.output); + } + + pub fn finish(&mut self) -> Result { + if self.is_finished { + return Err(FinishError::AlreadyFinished); + } + + self.is_finished = true; + + tracing::info!("FragmentedMP4Encoder: Finishing encoding"); + + let video_finish = self.video.flush(&mut self.output).inspect_err(|e| { + error!("Failed to finish video encoder: {e:#}"); + }); + + let audio_finish = self + .audio + .as_mut() + .map(|enc| { + tracing::info!("FragmentedMP4Encoder: Flushing audio encoder"); + enc.flush(&mut self.output).inspect_err(|e| { + error!("Failed to finish audio encoder: {e:#}"); + }) + }) + .unwrap_or(Ok(())); + + tracing::info!("FragmentedMP4Encoder: Writing trailer"); + self.output + .write_trailer() + .map_err(FinishError::WriteTrailerFailed)?; + + Ok(FinishResult { + video_finish, + audio_finish, + }) + } + + pub fn video(&self) -> &H264Encoder { + &self.video + } + + pub fn video_mut(&mut self) -> &mut H264Encoder { + &mut self.video + } +} + +impl Drop for FragmentedMP4File { + fn drop(&mut self) { + let _ = self.finish(); + } +} + +pub struct FragmentedMP4Input { + pub video: frame::Video, + pub audio: Option, +} + +unsafe impl Send for FragmentedMP4File {} diff --git a/crates/enc-ffmpeg/src/mux/mod.rs b/crates/enc-ffmpeg/src/mux/mod.rs index 280593d90f..7a2d248479 100644 --- a/crates/enc-ffmpeg/src/mux/mod.rs +++ b/crates/enc-ffmpeg/src/mux/mod.rs @@ -1,2 +1,3 @@ +pub mod fragmented_mp4; pub mod mp4; pub mod ogg; diff --git a/crates/project/src/cursor.rs b/crates/project/src/cursor.rs index a5a4e542bd..a3f59c083a 100644 --- a/crates/project/src/cursor.rs +++ b/crates/project/src/cursor.rs @@ -75,6 +75,72 @@ impl CursorEvents { serde_json::from_reader(file).map_err(|e| format!("Failed to parse cursor data: {e}")) } + pub fn load_from_stream(path: &Path) -> Result { + use std::io::{BufRead, BufReader}; + + let file = File::open(path).map_err(|e| format!("Failed to open cursor stream: {e}"))?; + let reader = BufReader::new(file); + + let mut all_moves = Vec::new(); + let mut all_clicks = Vec::new(); + + for line in reader.lines() { + let line = line.map_err(|e| format!("Failed to read line: {e}"))?; + if line.trim().is_empty() { + continue; + } + + #[derive(serde::Deserialize)] + struct Batch { + moves: Vec, + clicks: Vec, + } + + match serde_json::from_str::(&line) { + Ok(batch) => { + all_moves.extend(batch.moves); + all_clicks.extend(batch.clicks); + } + Err(e) => { + tracing::warn!("Failed to parse cursor batch: {}", e); + } + } + } + + Ok(Self { + moves: all_moves, + clicks: all_clicks, + }) + } + + pub fn load_with_fallback(dir: &Path) -> Self { + let stream_path = dir.join("cursor-stream.jsonl"); + let json_path = dir.join("cursor.json"); + + if stream_path.exists() { + match Self::load_from_stream(&stream_path) { + Ok(events) if !events.moves.is_empty() || !events.clicks.is_empty() => { + return events; + } + Ok(_) => {} + Err(e) => { + tracing::warn!("Failed to load cursor stream: {}", e); + } + } + } + + if json_path.exists() { + match Self::load_from_file(&json_path) { + Ok(events) => return events, + Err(e) => { + tracing::warn!("Failed to load cursor json: {}", e); + } + } + } + + Self::default() + } + pub fn stabilize_short_lived_cursor_shapes( &mut self, pointer_ids: Option<&HashSet>, diff --git a/crates/project/src/meta.rs b/crates/project/src/meta.rs index 496ac6d5dd..dcb2e6248e 100644 --- a/crates/project/src/meta.rs +++ b/crates/project/src/meta.rs @@ -57,6 +57,9 @@ impl Default for Platform { #[cfg(target_os = "macos")] return Self::MacOS; + + #[cfg(not(any(windows, target_os = "macos")))] + return Self::MacOS; } } @@ -197,6 +200,150 @@ impl RecordingMeta { _ => None, } } + + pub fn needs_recovery(project_path: &Path) -> bool { + let partial_meta_path = project_path.join("recording-meta-partial.json"); + let full_meta_path = project_path.join("recording-meta.json"); + + partial_meta_path.exists() && !full_meta_path.exists() + } + + pub fn try_recover(project_path: &Path, pretty_name: String) -> Result> { + info!("Attempting to recover recording at {:?}", project_path); + + let segments_dir = project_path.join("content").join("segments"); + let cursors_dir = project_path.join("content").join("cursors"); + + if !segments_dir.exists() { + return Err("No segments directory found".into()); + } + + let mut segments = Vec::new(); + let mut segment_dirs: Vec<_> = std::fs::read_dir(&segments_dir)? + .filter_map(|e| e.ok()) + .filter(|e| e.path().is_dir()) + .collect(); + segment_dirs.sort_by_key(|e| e.path()); + + const MIN_VIDEO_SIZE: u64 = 1024; + + let file_is_valid = |path: &Path, min_size: u64| -> bool { + path.exists() + && std::fs::metadata(path) + .map(|m| m.len() >= min_size) + .unwrap_or(false) + }; + + for segment_entry in segment_dirs { + let segment_dir = segment_entry.path(); + + let display_path = segment_dir.join("display.mp4"); + if !file_is_valid(&display_path, MIN_VIDEO_SIZE) { + warn!( + "Skipping segment {:?} - display.mp4 missing or too small", + segment_dir.file_name() + ); + continue; + } + + let camera_path = segment_dir.join("camera.mp4"); + let mic_path = segment_dir.join("audio-input.ogg"); + let system_audio_path = segment_dir.join("system_audio.ogg"); + let cursor_json_path = segment_dir.join("cursor.json"); + let cursor_stream_path = segment_dir.join("cursor-stream.jsonl"); + let has_cursor_data = cursor_json_path.exists() || cursor_stream_path.exists(); + + let relative_base = segment_dir + .strip_prefix(project_path) + .map(|p| RelativePathBuf::from_path(p).ok()) + .ok() + .flatten() + .unwrap_or_else(|| RelativePathBuf::from("content/segments/segment-0")); + + segments.push(MultipleSegment { + display: VideoMeta { + path: relative_base.join("display.mp4"), + fps: 30, + start_time: None, + }, + camera: file_is_valid(&camera_path, MIN_VIDEO_SIZE).then(|| VideoMeta { + path: relative_base.join("camera.mp4"), + fps: 30, + start_time: None, + }), + mic: mic_path.exists().then(|| AudioMeta { + path: relative_base.join("audio-input.ogg"), + start_time: None, + }), + system_audio: system_audio_path.exists().then(|| AudioMeta { + path: relative_base.join("system_audio.ogg"), + start_time: None, + }), + cursor: has_cursor_data.then(|| relative_base.join("cursor.json")), + }); + } + + if segments.is_empty() { + return Err("No valid segments found for recovery".into()); + } + + let segment_count = segments.len(); + + let mut cursor_map = HashMap::new(); + if cursors_dir.exists() { + for entry in std::fs::read_dir(&cursors_dir)?.filter_map(|e| e.ok()) { + let path = entry.path(); + if path.extension().is_some_and(|e| e == "png") { + if let Some(file_stem) = path.file_stem().and_then(|s| s.to_str()) { + if let Some(id) = file_stem.strip_prefix("cursor_") { + let relative_path = RelativePathBuf::from("content/cursors") + .join(entry.file_name().to_string_lossy().as_ref()); + cursor_map.insert( + id.to_string(), + CursorMeta { + image_path: relative_path, + hotspot: XY { x: 0.0, y: 0.0 }, + shape: None, + }, + ); + } + } + } + } + } + + let meta = Self { + platform: Some(Platform::default()), + project_path: project_path.to_path_buf(), + pretty_name, + sharing: None, + inner: RecordingMetaInner::Studio(StudioRecordingMeta::MultipleSegments { + inner: MultipleSegments { + segments, + cursors: Cursors::Correct(cursor_map), + status: Some(StudioRecordingStatus::Complete), + }, + }), + upload: None, + }; + + meta.save_for_project().map_err(|e| match e { + Either::Left(e) => Box::new(e) as Box, + Either::Right(e) => Box::new(e) as Box, + })?; + + let partial_meta_path = project_path.join("recording-meta-partial.json"); + if partial_meta_path.exists() { + let _ = std::fs::remove_file(partial_meta_path); + } + + info!( + "Successfully recovered recording with {} segment(s)", + segment_count + ); + + Ok(meta) + } } #[derive(Debug, Clone, Serialize, Deserialize, Type)] @@ -382,20 +529,22 @@ impl MultipleSegment { } pub fn cursor_events(&self, meta: &RecordingMeta) -> CursorEvents { - let Some(cursor_path) = &self.cursor else { + let segment_dir = self + .cursor + .as_ref() + .and_then(|p| meta.path(p).parent().map(|p| p.to_path_buf())) + .or_else(|| { + self.display + .path + .parent() + .map(|p| meta.project_path.join(p.as_str())) + }); + + let Some(dir) = segment_dir else { return CursorEvents::default(); }; - let full_path = meta.path(cursor_path); - - // Try to load the cursor data - let mut data = match CursorEvents::load_from_file(&full_path) { - Ok(data) => data, - Err(e) => { - eprintln!("Failed to load cursor data: {e}"); - return CursorEvents::default(); - } - }; + let mut data = CursorEvents::load_with_fallback(&dir); let pointer_ids = if let RecordingMetaInner::Studio(studio_meta) = &meta.inner { studio_meta.pointer_cursor_ids() diff --git a/crates/recording/src/capture_pipeline.rs b/crates/recording/src/capture_pipeline.rs index eea5d2d770..76f7d25434 100644 --- a/crates/recording/src/capture_pipeline.rs +++ b/crates/recording/src/capture_pipeline.rs @@ -8,6 +8,9 @@ use anyhow::anyhow; use cap_timestamp::Timestamps; use std::{path::PathBuf, sync::Arc}; +#[cfg(target_os = "macos")] +use crate::output_pipeline::FragmentedMp4MuxerMacOS; + #[cfg(windows)] use std::sync::atomic::{AtomicBool, Ordering}; @@ -68,7 +71,7 @@ impl MakeCapturePipeline for screen_capture::CMSampleBufferCapture { OutputPipeline::builder(output_path.clone()) .with_video::(screen_capture) .with_timestamps(start_time) - .build::(Default::default()) + .build::(()) .await } diff --git a/crates/recording/src/cursor.rs b/crates/recording/src/cursor.rs index 0f618804ec..21fcf684ab 100644 --- a/crates/recording/src/cursor.rs +++ b/crates/recording/src/cursor.rs @@ -3,7 +3,13 @@ use cap_cursor_info::CursorShape; use cap_project::{CursorClickEvent, CursorMoveEvent, XY}; use cap_timestamp::Timestamps; use futures::{FutureExt, future::Shared}; -use std::{collections::HashMap, path::PathBuf}; +use std::{ + collections::HashMap, + fs::{File, OpenOptions}, + io::{BufWriter, Write}, + path::PathBuf, + time::Instant, +}; use tokio::sync::oneshot; use tokio_util::sync::{CancellationToken, DropGuard}; @@ -19,13 +25,20 @@ pub type Cursors = HashMap; #[derive(Clone)] pub struct CursorActorResponse { - // pub cursor_images: HashMap>, pub cursors: Cursors, pub next_cursor_id: u32, pub moves: Vec, pub clicks: Vec, } +#[derive(serde::Serialize, serde::Deserialize)] +struct CursorEventBatch { + moves: Vec, + clicks: Vec, +} + +const CURSOR_FLUSH_INTERVAL_SECS: u64 = 5; + pub struct CursorActor { stop: Option, pub rx: Shared>, @@ -42,6 +55,7 @@ pub fn spawn_cursor_recorder( crop_bounds: CursorCropBounds, display: scap_targets::Display, cursors_dir: PathBuf, + cursor_stream_path: PathBuf, prev_cursors: Cursors, next_cursor_id: u32, start_time: Timestamps, @@ -54,7 +68,7 @@ pub fn spawn_cursor_recorder( pin::pin, time::Duration, }; - use tracing::{error, info}; + use tracing::{error, info, warn}; let stop_token = CancellationToken::new(); let (tx, rx) = oneshot::channel(); @@ -66,7 +80,6 @@ pub fn spawn_cursor_recorder( let mut last_position = cap_cursor_capture::RawCursorPosition::get(); - // Create cursors directory if it doesn't exist std::fs::create_dir_all(&cursors_dir).unwrap(); let mut response = CursorActorResponse { @@ -76,6 +89,36 @@ pub fn spawn_cursor_recorder( clicks: vec![], }; + let mut pending_moves: Vec = vec![]; + let mut pending_clicks: Vec = vec![]; + let mut last_flush = Instant::now(); + + let flush_to_stream = |path: &PathBuf, + moves: &mut Vec, + clicks: &mut Vec| { + if moves.is_empty() && clicks.is_empty() { + return; + } + + let batch = CursorEventBatch { + moves: std::mem::take(moves), + clicks: std::mem::take(clicks), + }; + + match OpenOptions::new().create(true).append(true).open(path) { + Ok(file) => { + let mut writer = BufWriter::new(file); + if let Ok(json) = serde_json::to_string(&batch) { + let _ = writeln!(writer, "{}", json); + let _ = writer.flush(); + } + } + Err(e) => { + warn!("Failed to open cursor stream file: {}", e); + } + } + }; + loop { let sleep = tokio::time::sleep(Duration::from_millis(10)); let Either::Right(_) = @@ -84,6 +127,11 @@ pub fn spawn_cursor_recorder( break; }; + if last_flush.elapsed().as_secs() >= CURSOR_FLUSH_INTERVAL_SECS { + flush_to_stream(&cursor_stream_path, &mut pending_moves, &mut pending_clicks); + last_flush = Instant::now(); + } + let elapsed = start_time.instant().elapsed().as_secs_f64() * 1000.0; let mouse_state = device_state.get_mouse(); @@ -151,6 +199,7 @@ pub fn spawn_cursor_recorder( y, }; + pending_moves.push(mouse_event.clone()); response.moves.push(mouse_event); } @@ -170,12 +219,15 @@ pub fn spawn_cursor_recorder( cursor_id: cursor_id.clone(), time_ms: elapsed, }; + pending_clicks.push(mouse_event.clone()); response.clicks.push(mouse_event); } last_mouse_state = mouse_state; } + flush_to_stream(&cursor_stream_path, &mut pending_moves, &mut pending_clicks); + info!("cursor recorder done"); let _ = tx.send(response); diff --git a/crates/recording/src/output_pipeline/ffmpeg.rs b/crates/recording/src/output_pipeline/ffmpeg.rs index 8453479de1..f820d762d6 100644 --- a/crates/recording/src/output_pipeline/ffmpeg.rs +++ b/crates/recording/src/output_pipeline/ffmpeg.rs @@ -3,7 +3,9 @@ use crate::{ output_pipeline::{AudioFrame, AudioMuxer, Muxer, VideoFrame, VideoMuxer}, }; use anyhow::{Context, anyhow}; -use cap_enc_ffmpeg::{aac::AACEncoder, h264::*, ogg::*, opus::OpusEncoder}; +use cap_enc_ffmpeg::{ + aac::AACEncoder, fragmented_mp4::FragmentedMP4File, h264::*, ogg::*, opus::OpusEncoder, +}; use cap_media_info::{AudioInfo, VideoInfo}; use cap_timestamp::Timestamp; use std::{ @@ -154,3 +156,99 @@ impl AudioMuxer for OggMuxer { Ok(self.0.queue_frame(frame.inner, timestamp)?) } } + +pub struct FragmentedMp4Muxer { + output: ffmpeg::format::context::Output, + video_encoder: Option, + audio_encoder: Option, +} + +impl Muxer for FragmentedMp4Muxer { + type Config = (); + + async fn setup( + _: Self::Config, + output_path: std::path::PathBuf, + video_config: Option, + audio_config: Option, + _: Arc, + _: &mut TaskPool, + ) -> anyhow::Result + where + Self: Sized, + { + let mut output = ffmpeg::format::output(&output_path)?; + + let video_encoder = video_config + .map(|video_config| H264Encoder::builder(video_config).build(&mut output)) + .transpose() + .context("video encoder")?; + + let audio_encoder = audio_config + .map(|config| AACEncoder::init(config, &mut output)) + .transpose() + .context("audio encoder")?; + + let mut opts = ffmpeg::Dictionary::new(); + opts.set("movflags", "frag_keyframe+empty_moov+default_base_moof"); + opts.set("frag_duration", "1000000"); + + output.write_header_with(opts)?; + + Ok(Self { + output, + video_encoder, + audio_encoder, + }) + } + + fn finish(&mut self, _: Duration) -> anyhow::Result> { + let video_result = self + .video_encoder + .as_mut() + .map(|enc| enc.flush(&mut self.output)) + .unwrap_or(Ok(())); + + let audio_result = self + .audio_encoder + .as_mut() + .map(|enc| enc.flush(&mut self.output)) + .unwrap_or(Ok(())); + + self.output.write_trailer().context("write_trailer")?; + + if video_result.is_ok() && audio_result.is_ok() { + return Ok(Ok(())); + } + + Ok(Err(anyhow!( + "Video: {video_result:#?}, Audio: {audio_result:#?}" + ))) + } +} + +impl VideoMuxer for FragmentedMp4Muxer { + type VideoFrame = FFmpegVideoFrame; + + fn send_video_frame( + &mut self, + frame: Self::VideoFrame, + timestamp: Duration, + ) -> anyhow::Result<()> { + if let Some(video_encoder) = self.video_encoder.as_mut() { + video_encoder.queue_frame(frame.inner, timestamp, &mut self.output)?; + } + + Ok(()) + } +} + +impl AudioMuxer for FragmentedMp4Muxer { + fn send_audio_frame(&mut self, frame: AudioFrame, timestamp: Duration) -> anyhow::Result<()> { + if let Some(audio_encoder) = self.audio_encoder.as_mut() { + audio_encoder.send_frame(frame.inner, timestamp, &mut self.output)?; + } + + Ok(()) + } +} diff --git a/crates/recording/src/output_pipeline/macos.rs b/crates/recording/src/output_pipeline/macos.rs index 6c67459db3..967bdde286 100644 --- a/crates/recording/src/output_pipeline/macos.rs +++ b/crates/recording/src/output_pipeline/macos.rs @@ -2,9 +2,11 @@ use crate::{ output_pipeline::{AudioFrame, AudioMuxer, Muxer, TaskPool, VideoMuxer}, sources::screen_capture, }; -use anyhow::anyhow; +use anyhow::{Context, anyhow}; use cap_enc_avfoundation::QueueFrameError; +use cap_enc_ffmpeg::{aac::AACEncoder, h264::H264Encoder}; use cap_media_info::{AudioInfo, VideoInfo}; +use cidre::cv::{self, pixel_buffer::LockFlags}; use std::{ path::PathBuf, sync::{Arc, Mutex, atomic::AtomicBool}, @@ -121,3 +123,187 @@ impl AudioMuxer for AVFoundationMp4Muxer { Ok(()) } } + +pub struct FragmentedMp4MuxerMacOS { + output: ffmpeg::format::context::Output, + video_encoder: Option, + audio_encoder: Option, +} + +impl Muxer for FragmentedMp4MuxerMacOS { + type Config = (); + + async fn setup( + _: Self::Config, + output_path: PathBuf, + video_config: Option, + audio_config: Option, + _pause_flag: Arc, + _tasks: &mut TaskPool, + ) -> anyhow::Result + where + Self: Sized, + { + let mut output = ffmpeg::format::output(&output_path)?; + + let video_encoder = video_config + .map(|video_config| H264Encoder::builder(video_config).build(&mut output)) + .transpose() + .context("video encoder")?; + + let audio_encoder = audio_config + .map(|config| AACEncoder::init(config, &mut output)) + .transpose() + .context("audio encoder")?; + + let mut opts = ffmpeg::Dictionary::new(); + opts.set("movflags", "frag_keyframe+empty_moov+default_base_moof"); + opts.set("frag_duration", "1000000"); + + output.write_header_with(opts)?; + + Ok(Self { + output, + video_encoder, + audio_encoder, + }) + } + + fn finish(&mut self, _: Duration) -> anyhow::Result> { + let video_result = self + .video_encoder + .as_mut() + .map(|enc| enc.flush(&mut self.output)) + .unwrap_or(Ok(())); + + let audio_result = self + .audio_encoder + .as_mut() + .map(|enc| enc.flush(&mut self.output)) + .unwrap_or(Ok(())); + + self.output.write_trailer().context("write_trailer")?; + + if video_result.is_ok() && audio_result.is_ok() { + return Ok(Ok(())); + } + + Ok(Err(anyhow!( + "Video: {video_result:#?}, Audio: {audio_result:#?}" + ))) + } +} + +fn sample_buf_to_ffmpeg(sample_buf: &cidre::cm::SampleBuf) -> anyhow::Result { + let image_buf = sample_buf + .image_buf() + .ok_or_else(|| anyhow!("No image buffer in sample"))?; + + let width = image_buf.width(); + let height = image_buf.height(); + + let mut image_buf_mut = image_buf.retained(); + unsafe { image_buf_mut.lock_base_addr(LockFlags::READ_ONLY) } + .result() + .map_err(|e| anyhow!("Failed to lock base addr: {e:?}"))?; + + let result = match image_buf.pixel_format() { + cv::PixelFormat::_420V => { + let mut ff_frame = + ffmpeg::frame::Video::new(ffmpeg::format::Pixel::NV12, width as u32, height as u32); + + let src_stride = image_buf.plane_bytes_per_row(0); + let dest_stride = ff_frame.stride(0); + let src_bytes = unsafe { + std::slice::from_raw_parts( + image_buf.plane_base_address(0), + src_stride * image_buf.plane_height(0), + ) + }; + let dest_bytes = ff_frame.data_mut(0); + + for y in 0..height { + let row_width = width; + let src_row = &src_bytes[y * src_stride..y * src_stride + row_width]; + let dest_row = &mut dest_bytes[y * dest_stride..y * dest_stride + row_width]; + dest_row.copy_from_slice(src_row); + } + + let src_stride = image_buf.plane_bytes_per_row(1); + let dest_stride = ff_frame.stride(1); + let src_bytes = unsafe { + std::slice::from_raw_parts( + image_buf.plane_base_address(1), + src_stride * image_buf.plane_height(1), + ) + }; + let dest_bytes = ff_frame.data_mut(1); + + for y in 0..height / 2 { + let row_width = width; + let src_row = &src_bytes[y * src_stride..y * src_stride + row_width]; + let dest_row = &mut dest_bytes[y * dest_stride..y * dest_stride + row_width]; + dest_row.copy_from_slice(src_row); + } + + Ok(ff_frame) + } + cv::PixelFormat::_32_BGRA => { + let mut ff_frame = + ffmpeg::frame::Video::new(ffmpeg::format::Pixel::BGRA, width as u32, height as u32); + + let src_stride = image_buf.plane_bytes_per_row(0); + let dest_stride = ff_frame.stride(0); + let src_bytes = unsafe { + std::slice::from_raw_parts( + image_buf.plane_base_address(0), + src_stride * image_buf.plane_height(0), + ) + }; + let dest_bytes = ff_frame.data_mut(0); + + for y in 0..height { + let row_width = width * 4; + let src_row = &src_bytes[y * src_stride..y * src_stride + row_width]; + let dest_row = &mut dest_bytes[y * dest_stride..y * dest_stride + row_width]; + dest_row.copy_from_slice(src_row); + } + + Ok(ff_frame) + } + format => Err(anyhow!("Unsupported pixel format: {:?}", format)), + }; + + unsafe { image_buf_mut.unlock_lock_base_addr(LockFlags::READ_ONLY) } + .result() + .ok(); + + result +} + +impl VideoMuxer for FragmentedMp4MuxerMacOS { + type VideoFrame = screen_capture::VideoFrame; + + fn send_video_frame( + &mut self, + frame: Self::VideoFrame, + timestamp: Duration, + ) -> anyhow::Result<()> { + if let Some(video_encoder) = self.video_encoder.as_mut() { + let ff_frame = sample_buf_to_ffmpeg(&frame.sample_buf)?; + video_encoder.queue_frame(ff_frame, timestamp, &mut self.output)?; + } + + Ok(()) + } +} + +impl AudioMuxer for FragmentedMp4MuxerMacOS { + fn send_audio_frame(&mut self, frame: AudioFrame, timestamp: Duration) -> anyhow::Result<()> { + if let Some(audio_encoder) = self.audio_encoder.as_mut() { + audio_encoder.send_frame(frame.inner, timestamp, &mut self.output)?; + } + + Ok(()) + } +} diff --git a/crates/recording/src/output_pipeline/win.rs b/crates/recording/src/output_pipeline/win.rs index 77edf9bd8c..dd7475dc95 100644 --- a/crates/recording/src/output_pipeline/win.rs +++ b/crates/recording/src/output_pipeline/win.rs @@ -305,7 +305,13 @@ impl Muxer for WindowsMuxer { .await .map_err(|_| anyhow!("Encoder thread ended unexpectedly"))??; - output.lock().unwrap().write_header()?; + { + let mut output_guard = output.lock().unwrap(); + let mut opts = ffmpeg::Dictionary::new(); + opts.set("movflags", "frag_keyframe+empty_moov+default_base_moof"); + opts.set("frag_duration", "1000000"); + output_guard.write_header_with(opts)?; + } Ok(Self { video_tx, diff --git a/crates/recording/src/studio_recording.rs b/crates/recording/src/studio_recording.rs index 79a7b19c27..baf31a2d70 100644 --- a/crates/recording/src/studio_recording.rs +++ b/crates/recording/src/studio_recording.rs @@ -5,7 +5,7 @@ use crate::{ }, cursor::{CursorActor, Cursors, spawn_cursor_recorder}, feeds::{camera::CameraFeedLock, microphone::MicrophoneFeedLock}, - ffmpeg::{Mp4Muxer, OggMuxer}, + ffmpeg::{FragmentedMp4Muxer, OggMuxer}, output_pipeline::{DoneFut, FinishedOutputPipeline, OutputPipeline, PipelineDoneError}, screen_capture::ScreenCaptureConfig, sources::{self, screen_capture}, @@ -381,6 +381,7 @@ impl Pipeline { struct CursorPipeline { output_path: PathBuf, + stream_path: PathBuf, actor: CursorActor, } @@ -512,6 +513,8 @@ async fn spawn_studio_recording_actor( trace!("creating recording actor"); + write_in_progress_meta(&recording_dir)?; + let content_dir = ensure_dir(&recording_dir.join("content"))?; let segments_dir = ensure_dir(&content_dir.join("segments"))?; @@ -580,6 +583,24 @@ pub struct CompletedRecording { pub cursor_data: cap_project::CursorImages, } +fn write_in_progress_meta(recording_dir: &Path) -> anyhow::Result<()> { + use cap_project::*; + + let meta = StudioRecordingMeta::MultipleSegments { + inner: MultipleSegments { + segments: Vec::new(), + cursors: Cursors::default(), + status: Some(StudioRecordingStatus::InProgress), + }, + }; + + let meta_path = recording_dir.join("recording-meta-partial.json"); + let meta_json = serde_json::to_string_pretty(&meta)?; + std::fs::write(&meta_path, meta_json)?; + + Ok(()) +} + async fn stop_recording( recording_dir: PathBuf, segments: Vec, @@ -654,12 +675,15 @@ async fn stop_recording( .write(&recording_dir) .map_err(RecordingError::from)?; + let partial_meta_path = recording_dir.join("recording-meta-partial.json"); + if partial_meta_path.exists() { + let _ = std::fs::remove_file(partial_meta_path); + } + Ok(CompletedRecording { project_path: recording_dir, meta, cursor_data: Default::default(), - // display_source: actor.options.capture_target, - // segments: actor.segments, }) } @@ -828,7 +852,7 @@ async fn create_segment_pipeline( OutputPipeline::builder(dir.join("camera.mp4")) .with_video::(camera_feed) .with_timestamps(start_time) - .build::(()) + .build::(()) .instrument(error_span!("camera-out")) })) .await @@ -868,6 +892,7 @@ async fn create_segment_pipeline( cursor_crop_bounds, display, cursors_dir.to_path_buf(), + dir.join("cursor-stream.jsonl"), prev_cursors, next_cursors_id, start_time, @@ -875,6 +900,7 @@ async fn create_segment_pipeline( Ok::<_, CreateSegmentPipelineError>(CursorPipeline { output_path: dir.join("cursor.json"), + stream_path: dir.join("cursor-stream.jsonl"), actor: cursor, }) })