Skip to content
191 changes: 181 additions & 10 deletions src/screencast_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,28 @@ use pipewire::{
stream::{StreamRef, StreamState},
sys::pw_buffer,
};
use std::{collections::HashMap, ffi::c_void, io, iter, os::fd::IntoRawFd, slice};
use spa_sys::{spa_format_video_raw_parse, spa_video_info_raw};

use crate::{
buffer,
wayland::{CaptureSource, DmabufHelper, Session, WaylandHelper},
};
use std::{
collections::HashMap,
ffi::c_void,
io, iter,
os::fd::IntoRawFd,
slice,
time::{Duration, Instant},
};
use tokio::sync::oneshot;
use wayland_client::{
protocol::{wl_buffer, wl_output, wl_shm},
WEnum,
};

use crate::{
buffer,
wayland::{CaptureSource, DmabufHelper, Session, WaylandHelper},
};
const TIMESPEC_NSEC_PER_SEC: u32 = 1_000_000_000;
const FPS_MEASURE_PERIOD_SEC: f64 = 5.;

pub struct ScreencastThread {
node_id: u32,
Expand All @@ -40,6 +51,7 @@ impl ScreencastThread {
) -> anyhow::Result<Self> {
let (tx, rx) = oneshot::channel();
let (thread_stop_tx, thread_stop_rx) = pipewire::channel::channel::<()>();

std::thread::spawn(move || {
match start_stream(wayland_helper, capture_source, overlay_cursor) {
Ok((loop_, _stream, _listener, _context, node_id_rx)) => {
Expand All @@ -53,6 +65,7 @@ impl ScreencastThread {
Err(err) => tx.send(Err(err)).unwrap(),
}
});

Ok(Self {
// XXX can second unwrap fail?
node_id: rx.await.unwrap()?.await.unwrap()?,
Expand All @@ -68,6 +81,95 @@ impl ScreencastThread {
let _ = self.thread_stop_tx.send(());
}
}
struct FPSLimit {
frame_last_time: Instant,
fps_last_time: Instant,
fps_frame_count: u32,
delay_before_capture_frame_ns: u64,
delay_til_next_frame_ns: u64,
accumulated_frame_debt_ns: u64,
}

impl FPSLimit {
fn new() -> Self {
Self {
frame_last_time: Instant::now(),
fps_last_time: Instant::now(),
fps_frame_count: 0,
delay_before_capture_frame_ns: 0,
delay_til_next_frame_ns: 0,
accumulated_frame_debt_ns: 0,
}
}

fn fps_limit_measure_start(&mut self, max_fps: u32) {
if max_fps <= 0 {
return;
}

self.frame_last_time = Instant::now();
}

fn measure_fps(&mut self) {
let now = Instant::now();
self.fps_frame_count += 1;
let elapsed_sec = (now - self.fps_last_time).as_secs_f64();

if elapsed_sec < FPS_MEASURE_PERIOD_SEC {
return;
}
let avg_frames_per_sec = self.fps_frame_count as f64 / elapsed_sec;

log::info!(
"fps_limit: average FPS in the last {:.2} seconds: {:.2}",
elapsed_sec,
avg_frames_per_sec
);
self.fps_frame_count = 0;
self.fps_last_time = now;
}

fn fps_limit_measure_end(&mut self, max_fps: u32) {
if max_fps <= 0 {
self.delay_before_capture_frame_ns = 0;
self.delay_til_next_frame_ns = 0;
self.accumulated_frame_debt_ns = 0;
return;
}
self.measure_fps();

let elapsed_ns = self.frame_last_time.elapsed().as_nanos();
let target_ns = (TIMESPEC_NSEC_PER_SEC / max_fps) as u128;

// Wait for half of the target frame rate duration before requesting a frame capture.
self.delay_before_capture_frame_ns = (target_ns / 2) as u64;

// Throttle after the current frame has been captured:
let total_elapsed_ns = elapsed_ns + self.accumulated_frame_debt_ns as u128;
if target_ns > total_elapsed_ns {
// If it is before the next frame capture time -> wait for the right time.
self.delay_til_next_frame_ns = (target_ns - total_elapsed_ns) as u64;
} else {
// If it is after the next frame capture time, Set value of `delay_til_next_frame_ns` to 0 and increase value of `accumulated_frame_debt_ns` by the amount of time it has been delayed.
self.delay_til_next_frame_ns = 0;
self.accumulated_frame_debt_ns = target_ns.abs_diff(total_elapsed_ns) as u64;
}

// Set `delay_before_capture_frame_ns` to its current value minus the overrun time, if any.
if self.accumulated_frame_debt_ns > self.delay_before_capture_frame_ns {
self.accumulated_frame_debt_ns -= self.delay_before_capture_frame_ns;
self.delay_before_capture_frame_ns = 0;
} else {
self.delay_before_capture_frame_ns -= self.accumulated_frame_debt_ns;
self.accumulated_frame_debt_ns = 0;
}

// Reset at the end of each capture cycle, this helps prevent `accumulated_frame_debt_ns` from increasing indefinitely.
if self.fps_frame_count % max_fps == 0 {
self.accumulated_frame_debt_ns = 0;
}
}
}

struct StreamData {
dmabuf_helper: Option<DmabufHelper>,
Expand All @@ -78,6 +180,10 @@ struct StreamData {
height: u32,
node_id_tx: Option<oneshot::Sender<Result<u32, anyhow::Error>>>,
buffer_damage: HashMap<wl_buffer::WlBuffer, Vec<Rect>>,
// fps limit
framerate: u32,
fps_limit: FPSLimit,
fps_max: u32,
}

impl StreamData {
Expand Down Expand Up @@ -146,6 +252,23 @@ impl StreamData {
if let Some(pod) = pod {
let value = PodDeserializer::deserialize_from::<pod::Value>(pod.as_bytes());
if let Ok((_, pod::Value::Object(object))) = &value {
let pwr_format: spa_video_info_raw = unsafe {
let mut pwr_format = std::mem::MaybeUninit::<spa_video_info_raw>::uninit();
spa_format_video_raw_parse(
pod.as_raw_ptr() as *const _,
pwr_format.as_mut_ptr(),
);
pwr_format.assume_init()
};
if pwr_format.max_framerate.denom != 0 {
let framerate = pwr_format.max_framerate.num / pwr_format.max_framerate.denom;
self.framerate = if framerate > self.fps_max {
self.fps_max
} else {
framerate
};
}

if let Some(modifier_prop) = object
.properties
.iter()
Expand Down Expand Up @@ -177,6 +300,7 @@ impl StreamData {
let params = params(
self.width,
self.height,
self.framerate,
plane_count,
self.dmabuf_helper.as_ref(),
Some(modifier),
Expand Down Expand Up @@ -273,6 +397,16 @@ impl StreamData {
fn process(&mut self, stream: &StreamRef) {
let buffer = unsafe { stream.dequeue_raw_buffer() };
if !buffer.is_null() {
self.fps_limit.fps_limit_measure_start(self.framerate);
if self.fps_limit.delay_before_capture_frame_ns != 0 {
// log::info!(
// "fps_limit: wait {}ns before capture frame",
// self.fps_limit.delay_before_capture_frame_ns
// );
std::thread::sleep(Duration::from_nanos(
self.fps_limit.delay_before_capture_frame_ns,
));
}
let wl_buffer = unsafe { &*((*buffer).user_data as *const wl_buffer::WlBuffer) };
let full_damage = &[Rect {
x: 0,
Expand All @@ -285,6 +419,7 @@ impl StreamData {
.get(wl_buffer)
.map(Vec::as_slice)
.unwrap_or(full_damage);

match block_on(self.session.capture_wl_buffer(wl_buffer, damage)) {
Ok(frame) => {
self.buffer_damage
Expand All @@ -311,6 +446,15 @@ impl StreamData {
}
}
unsafe { stream.queue_raw_buffer(buffer) };
self.fps_limit.fps_limit_measure_end(self.framerate);

if self.fps_limit.delay_til_next_frame_ns != 0 {
// log::info!(
// "fps_limit: wait {}ns til next frame",
// self.fps_limit.delay_til_next_frame_ns
// );
std::thread::sleep(Duration::from_nanos(self.fps_limit.delay_til_next_frame_ns));
}
}
}
}
Expand All @@ -335,6 +479,8 @@ fn start_stream(

let (node_id_tx, node_id_rx) = oneshot::channel();

let framerate = 0; // default not limit the frame rate.

let session = wayland_helper.capture_source_session(capture_source, overlay_cursor);

let Some((width, height)) = block_on(session.wait_for_formats(|formats| formats.buffer_size))
Expand All @@ -355,7 +501,7 @@ fn start_stream(
},
)?;

let initial_params = params(width, height, 1, dmabuf_helper.as_ref(), None);
let initial_params = params(width, height, framerate, 1, dmabuf_helper.as_ref(), None);
let mut initial_params: Vec<_> = initial_params
.iter()
.map(|x| Pod::from_bytes(x.as_slice()).unwrap())
Expand All @@ -380,6 +526,9 @@ fn start_stream(
height,
node_id_tx: Some(node_id_tx),
buffer_damage: HashMap::new(),
framerate,
fps_limit: FPSLimit::new(),
fps_max: 120, // XXX can read from config?
};

let listener = stream
Expand Down Expand Up @@ -444,16 +593,17 @@ fn meta() -> Vec<u8> {
fn params(
width: u32,
height: u32,
framerate: u32,
blocks: u32,
dmabuf: Option<&DmabufHelper>,
fixated_modifier: Option<gbm::Modifier>,
) -> Vec<Vec<u8>> {
[
Some(buffers(width, height, blocks)),
fixated_modifier.map(|x| format(width, height, None, Some(x))),
fixated_modifier.map(|x| format(width, height, framerate, None, Some(x))),
// Favor dmabuf over shm by listing it first
dmabuf.map(|x| format(width, height, Some(x), None)),
Some(format(width, height, None, None)),
dmabuf.map(|x| format(width, height, framerate, Some(x), None)),
Some(format(width, height, framerate, None, None)),
Some(meta()),
]
.into_iter()
Expand Down Expand Up @@ -524,6 +674,7 @@ fn buffers(width: u32, height: u32, blocks: u32) -> Vec<u8> {
fn format(
width: u32,
height: u32,
framerate: u32,
dmabuf: Option<&DmabufHelper>,
fixated_modifier: Option<gbm::Modifier>,
) -> Vec<u8> {
Expand Down Expand Up @@ -551,10 +702,30 @@ fn format(
pod::Property {
key: spa_sys::SPA_FORMAT_VIDEO_framerate,
flags: pod::PropertyFlags::empty(),
value: pod::Value::Fraction(spa::utils::Fraction { num: 60, denom: 1 }),
value: pod::Value::Fraction(spa::utils::Fraction { num: 0, denom: 1 }),
},
// TODO max framerate
];
if framerate > 0 {
properties.push(pod::Property {
key: spa_sys::SPA_FORMAT_VIDEO_maxFramerate,
flags: pod::PropertyFlags::empty(),
value: pod::Value::Choice(pod::ChoiceValue::Fraction(spa::utils::Choice(
spa::utils::ChoiceFlags::empty(),
spa::utils::ChoiceEnum::Range {
default: spa::utils::Fraction {
num: framerate,
denom: 1,
},
min: spa::utils::Fraction { num: 1, denom: 1 },
max: spa::utils::Fraction {
num: framerate,
denom: 1,
},
},
))),
});
}
if let Some(modifier) = fixated_modifier {
properties.push(pod::Property {
key: spa_sys::SPA_FORMAT_VIDEO_modifier,
Expand Down