Skip to content

Commit b1e8ede

Browse files
authored
Make Connector demand before start of stream (#952)
* Make connector demand before sos * Pause auto demand in Connector on start of stream * Update changelog
1 parent 6ac047f commit b1e8ede

File tree

2 files changed

+15
-20
lines changed

2 files changed

+15
-20
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
# Changelog
22

33
## Master
4-
* Improve stream format error [#950](https://github.com/membraneframework/membrane_core/pull/950)
4+
* Improve stream format error. [#950](https://github.com/membraneframework/membrane_core/pull/950)
5+
* Minor fixes in `Membrane.Connector`. [#952](https://github.com/membraneframework/membrane_core/pull/952)
56

67
## 1.2.0
78
* Add `:max_instances` option for dynamic pads. [#876](https://github.com/membraneframework/membrane_core/pull/876)

lib/membrane/connector.ex

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,6 @@ defmodule Membrane.Connector do
4242
defguardp flowing?(ctx, state)
4343
when ctx.playback == :playing and state.input != nil and state.output != nil
4444

45-
defguardp input_demand_should_be_paused?(ctx, state)
46-
when ctx.playback == :playing and state.input != nil and state.output == nil
47-
4845
@impl true
4946
def handle_init(_ctx, opts) do
5047
state =
@@ -58,11 +55,13 @@ defmodule Membrane.Connector do
5855
@impl true
5956
def handle_pad_added(Pad.ref(direction, _ref) = pad, ctx, state) do
6057
state = state |> Map.put(direction, pad)
61-
handle_flowing_state_changed(ctx, state)
58+
{maybe_resume, state} = maybe_resume_auto_demand(state)
59+
{maybe_flush, state} = maybe_flush_queue(ctx, state)
60+
{maybe_resume ++ maybe_flush, state}
6261
end
6362

6463
@impl true
65-
def handle_playing(ctx, state), do: handle_flowing_state_changed(ctx, state)
64+
def handle_start_of_stream(_input_pad, ctx, state), do: maybe_pause_auto_demand(ctx, state)
6665

6766
[handle_buffer: :buffer, handle_event: :event, handle_stream_format: :stream_format]
6867
|> Enum.map(fn {callback, action} ->
@@ -95,24 +94,19 @@ defmodule Membrane.Connector do
9594
state |> Map.update!(:queue, &[{action, pad, item} | &1])
9695
end
9796

98-
defp handle_flowing_state_changed(ctx, state) do
99-
{pause_or_resume_demand, state} = manage_input_demand(ctx, state)
100-
{flush_queue_actions, state} = maybe_flush_queue(ctx, state)
101-
{pause_or_resume_demand ++ flush_queue_actions, state}
102-
end
103-
104-
defp manage_input_demand(ctx, %{input_demand_paused?: true} = state)
105-
when not input_demand_should_be_paused?(ctx, state) do
106-
{[resume_auto_demand: state.input], %{state | input_demand_paused?: false}}
97+
defp maybe_pause_auto_demand(ctx, state) do
98+
if state.output == nil and not state.input_demand_paused? and
99+
ctx.pads[state.input].stream_format != nil,
100+
do: {[pause_auto_demand: state.input], %{state | input_demand_paused?: true}},
101+
else: {[], state}
107102
end
108103

109-
defp manage_input_demand(ctx, %{input_demand_paused?: false} = state)
110-
when input_demand_should_be_paused?(ctx, state) do
111-
{[pause_auto_demand: state.input], %{state | input_demand_paused?: true}}
104+
defp maybe_resume_auto_demand(state) do
105+
if state.input_demand_paused? and state.output != nil,
106+
do: {[resume_auto_demand: state.input], %{state | input_demand_paused?: false}},
107+
else: {[], state}
112108
end
113109

114-
defp manage_input_demand(_ctx, state), do: {[], state}
115-
116110
defp maybe_flush_queue(ctx, state) when flowing?(ctx, state) do
117111
actions =
118112
state.queue

0 commit comments

Comments
 (0)