@@ -24,7 +24,7 @@ def __init__(self, pts=None, buffer_duration_ms=5000):
24
24
# A note on locking. The lock is principally to protect outputframe, which is called by
25
25
# the background encoder thread. Applications are going to call things like open_output,
26
26
# close_output, start and stop. These only grab that lock for a short period of time to
27
- # manipulate _output_available , which controls whether outputframe will do anything.
27
+ # manipulate _output , which controls whether outputframe will do anything.
28
28
# THe application API does not have it's own lock, because there doesn't seem to be a
29
29
# need to drive it from different threads (though we could add one if necessary).
30
30
self ._lock = Lock ()
@@ -33,7 +33,6 @@ def __init__(self, pts=None, buffer_duration_ms=5000):
33
33
self ._buffer_duration_ms = buffer_duration_ms
34
34
self ._circular = collections .deque ()
35
35
self ._output = None
36
- self ._output_available = False
37
36
self ._streams = []
38
37
39
38
@property
@@ -52,15 +51,14 @@ def open_output(self, output):
52
51
if self ._output :
53
52
raise RuntimeError ("Underlying output must be closed first" )
54
53
55
- self ._output = output
56
- self ._output .start ()
54
+ output .start ()
57
55
# Some outputs (PyavOutput) may need to know about the encoder's streams.
58
56
for encoder_stream , codec , kwargs in self ._streams :
59
57
output ._add_stream (encoder_stream , codec , ** kwargs )
60
58
61
59
# Now it's OK for the background thread to output frames.
62
60
with self ._lock :
63
- self ._output_available = True
61
+ self ._output = output
64
62
self ._first_frame = True
65
63
66
64
def close_output (self ):
@@ -69,43 +67,39 @@ def close_output(self):
69
67
raise RuntimeError ("No underlying output has been opened" )
70
68
71
69
# After this, we guarantee that the background thread will never use the output.
70
+ output = self ._output
72
71
with self ._lock :
73
- self ._output_available = False
72
+ self ._output = None
74
73
75
- self ._output .stop ()
76
- self ._output = None
74
+ output .stop ()
75
+
76
+ def _flush (self , timestamp_now , output ):
77
+ # Flush out anything that is time-expired compared to timestamp_now.
78
+ # If timestamp_now is None, flush everything.
79
+ while self ._circular and (front := self ._circular [0 ]):
80
+ _ , keyframe , timestamp , _ , audio = front
77
81
78
- def _get_frame (self ):
79
- # Fetch the next frame to be saved to the underlying output.
80
- if not self ._circular :
81
- return
82
- if not self ._first_frame :
83
- return self ._circular .popleft ()
84
- # Must skip ahead to the first I frame if we haven't seen one yet.
85
- while self ._circular :
86
- entry = self ._circular .popleft ()
87
- _ , key_frame , _ , _ , audio = entry
88
- # If there is audio, all audio frames are likely to be keyframes, so we must ignore them when
89
- # deciding when the streams can resume - only the video counts.
90
- if key_frame and not audio :
82
+ if timestamp_now and timestamp_now - timestamp < self .buffer_duration_ms * 1000 :
83
+ break
84
+
85
+ # We need to drop this entry, writing it out if we can.
86
+ self ._circular .popleft ()
87
+
88
+ if keyframe and not audio :
91
89
self ._first_frame = False
92
- return entry
90
+
91
+ if not self ._first_frame and output :
92
+ output .outputframe (* front )
93
93
94
94
def outputframe (self , frame , keyframe = True , timestamp = None , packet = None , audio = False ):
95
95
"""Write frame to circular buffer"""
96
96
with self ._lock :
97
97
if self ._buffer_duration_ms == 0 or not self .recording :
98
98
return
99
- self ._circular .append ((frame , keyframe , timestamp , packet , audio ))
100
- # Discard any expired buffer entries.
101
- while timestamp - self ._circular [0 ][2 ] > self ._buffer_duration_ms * 1000 :
102
- self ._circular .popleft ()
103
99
104
- if self ._output_available :
105
- # Actually write this to the underlying output.
106
- entry = self ._get_frame ()
107
- if entry :
108
- self ._output .outputframe (* entry )
100
+ # Add this new frame to the buffer and flush anything that is now expired.
101
+ self ._circular .append ((frame , keyframe , timestamp , packet , audio ))
102
+ self ._flush (timestamp , self ._output )
109
103
110
104
def start (self ):
111
105
"""Start recording in the circular buffer."""
@@ -116,20 +110,19 @@ def start(self):
116
110
117
111
def stop (self ):
118
112
"""Close file handle and stop recording"""
113
+ output = self ._output
119
114
with self ._lock :
120
115
if not self .recording :
121
116
raise RuntimeError ("Circular output was not started" )
122
117
self ._recording = False
123
- self ._output_available = False
124
-
125
- # Flush out anything remaining in the buffer if the underlying output is still going
126
- # when we stop.
127
- if self ._output :
128
- while (entry := self ._get_frame ()):
129
- self ._output .outputframe (* entry )
130
- self ._output .stop ()
131
118
self ._output = None
132
119
120
+ # At this point the background thread can't be using the circular buffer or the output,
121
+ # so we can flush everything out.
122
+ if output :
123
+ self ._flush (None , output )
124
+ output .stop ()
125
+
133
126
def _add_stream (self , encoder_stream , codec_name , ** kwargs ):
134
127
# Notice the PyavOutput of a stream that will be sending it packets to write out. It will need
135
128
# to forward these whenever a new underlying output is opened.
0 commit comments