@@ -34,6 +34,7 @@ internal class ChromeDPConnection(
34
34
private val frames = webSocket.incoming.receiveAsFlow()
35
35
.filterIsInstance<Frame .Text >()
36
36
.map { frame -> chromeDpJson.decodeFromString(InboundFrameSerializer , frame.readText()) }
37
+ .materializeErrors()
37
38
.shareIn(
38
39
scope = coroutineScope,
39
40
started = SharingStarted .Eagerly ,
@@ -47,6 +48,7 @@ internal class ChromeDPConnection(
47
48
*/
48
49
suspend fun request (request : RequestFrame ): ResponseFrame {
49
50
val resultFrame = frames.onSubscription { sendOrFailUniformly(request) }
51
+ .dematerializeErrors()
50
52
.filterIsInstance<ResultFrame >()
51
53
.filter { it.matchesRequest(request) }
52
54
.first() // a shared flow never completes, so this will never throw NoSuchElementException (but can hang forever)
@@ -76,7 +78,7 @@ internal class ChromeDPConnection(
76
78
/* *
77
79
* A flow of incoming events.
78
80
*/
79
- fun events () = frames.filterIsInstance<EventFrame >()
81
+ fun events () = frames.dematerializeErrors(). filterIsInstance<EventFrame >()
80
82
81
83
/* *
82
84
* Stops listening to incoming events and closes the underlying web socket connection.
@@ -87,6 +89,17 @@ internal class ChromeDPConnection(
87
89
}
88
90
}
89
91
92
+ private fun Flow<InboundFrame>.materializeErrors (): Flow <InboundFrameOrError > =
93
+ catch <InboundFrameOrError > { emit(InboundFramesConnectionError (cause = it)) }
94
+
95
+ private fun Flow<InboundFrameOrError>.dematerializeErrors (): Flow <InboundFrame > =
96
+ map {
97
+ when (it) {
98
+ is InboundFramesConnectionError -> throw it.cause
99
+ is InboundFrame -> it
100
+ }
101
+ }
102
+
90
103
/* *
91
104
* An exception thrown when an error occurred during the processing of a request on Chrome side.
92
105
*/
0 commit comments