Skip to content

Commit 48a8cbc

Browse files
committed
WIP test recovery from BrokenPipeError
1 parent 2564fe7 commit 48a8cbc

File tree

4 files changed

+116
-3
lines changed

4 files changed

+116
-3
lines changed

python_packages/jupyter_lsp/jupyter_lsp/session.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import string
77
import subprocess
88
from copy import copy
9-
from datetime import datetime, timezone
9+
from datetime import datetime, timedelta, timezone
1010

1111
from tornado.ioloop import IOLoop
1212
from tornado.queues import Queue
@@ -49,8 +49,12 @@ class LanguageServerSession(LoggingConfigurable):
4949
status = UseEnum(SessionStatus, default_value=SessionStatus.NOT_STARTED)
5050
last_handler_message_at = Instance(datetime, allow_none=True)
5151
last_server_message_at = Instance(datetime, allow_none=True)
52+
allow_server_failure_not_more_often_than = Instance(
53+
timedelta, allow_none=False, default_value=timedelta(minutes=20)
54+
)
5255

5356
_tasks = None
57+
_last_failure = None
5458

5559
_skip_serialize = ["argv", "debug_argv"]
5660

@@ -169,7 +173,12 @@ async def _read_lsp(self):
169173
await self.reader.read()
170174

171175
async def _write_lsp(self):
    """Run the writer loop and react to failures of the server's pipe.

    The writer coroutine is awaited through ``asyncio.gather`` with
    ``return_exceptions=True`` so that an exception ends up in the result
    list instead of propagating out of this task.

    Returns:
        The list produced by ``asyncio.gather``: one entry holding either
        the writer's return value or the exception it raised.
    """
    results = await asyncio.gather(self.writer.write(), return_exceptions=True)
    for result in results:
        if isinstance(result, BrokenPipeError):
            # the server process went away: let the session decide whether
            # to restart it or to give up
            self._handle_server_failure(result)
        elif isinstance(result, Exception):
            # gather() captured the exception, so without this it would
            # vanish without a trace
            self.log.error(
                "%s writer task failed unexpectedly", self, exc_info=result
            )
    return results
173182

174183
async def _broadcast_from_lsp(self):
175184
"""loop for reading messages from the queue of messages from the language
@@ -179,3 +188,34 @@ async def _broadcast_from_lsp(self):
179188
self.last_server_message_at = self.now()
180189
await self.parent.on_server_message(message, self)
181190
self.from_lsp.task_done()
191+
192+
def _handle_server_failure(self, error):
    """Restart the session after a server failure, unless it fails too often.

    The time of every failure is remembered in ``_last_failure``. When the
    previous failure happened less than
    ``allow_server_failure_not_more_often_than`` ago, a warning is logged and
    ``error`` is re-raised. Otherwise (including on the very first failure)
    a warning is logged and the session is restarted by calling ``stop()``
    followed by ``initialize()``.

    Args:
        error: the exception that signalled the failure.

    Raises:
        The ``error`` passed in, when failures follow each other more
        quickly than the allowed interval.
    """
    # timezone-aware, so the difference below is not skewed by local
    # clock changes (e.g. DST)
    now = datetime.now(timezone.utc)
    allowed = self.allow_server_failure_not_more_often_than

    previous = self._last_failure
    # remember this failure so that the next one can be rate-limited
    self._last_failure = now

    too_soon = previous is not None and now - previous < allowed
    if too_soon:
        description = (
            f"giving up as the previous failure was {now - previous} ago"
            f" which is less than the minimum allowed interval ({allowed})"
        )
    else:
        description = "restarting session..."

    self.log.warning(
        f"Encountered {self.language_server} language server failure;"
        f" {description}"
        f" (exception: {error})"
        f" (faulty process: {self.process})"
    )

    if too_soon:
        # this method is not called from inside an ``except`` block, so a
        # bare ``raise`` would fail with "No active exception to re-raise"
        raise error

    self.stop()
    self.initialize()

python_packages/jupyter_lsp/jupyter_lsp/stdio.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ async def _read_content(
139139
if len(raw) != length: # pragma: no cover
140140
self.log.warning(
141141
f"Readout and content-length mismatch: {len(raw)} vs {length};"
142-
f"remaining empties: {max_empties}; remaining parts: {max_parts}"
142+
f" remaining empties: {max_empties}; remaining parts: {max_parts}"
143143
)
144144

145145
return raw
@@ -191,7 +191,12 @@ async def write(self) -> None:
191191
body = message.encode("utf-8")
192192
response = "Content-Length: {}\r\n\r\n{}".format(len(body), message)
193193
await convert_yielded(self._write_one(response.encode("utf-8")))
194+
except BrokenPipeError:
195+
self.queue.task_done()
196+
# propagate broken pipe errors
197+
raise
194198
except Exception: # pragma: no cover
199+
# catch other (hopefully mild) exceptions
195200
self.log.exception("%s couldn't write message: %s", self, response)
196201
finally:
197202
self.queue.task_done()

python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,25 @@ def jsonrpc_init_msg():
109109
)
110110

111111

112+
@fixture
def did_open_message():
    """Serialized ``textDocument/didOpen`` JSON-RPC message.

    The opened document is this very module (referenced by its ``file://``
    URI), declared as an empty Python document at version 0.
    """
    text_document = {
        "uri": pathlib.Path(__file__).as_uri(),
        "languageId": "python",
        "version": 0,
        "text": "",
    }
    payload = {
        "id": 0,
        "jsonrpc": "2.0",
        "method": "textDocument/didOpen",
        "params": {"textDocument": text_document},
    }
    return json.dumps(payload)
129+
130+
112131
@fixture
113132
def app():
114133
return MockServerApp()

python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
import asyncio
2+
import logging
3+
import subprocess
24

35
import pytest
46

7+
from ..handlers import LanguageServerWebSocketHandler
58
from ..schema import SERVERS_RESPONSE
9+
from ..session import LanguageServerSession
610

711

812
def assert_status_set(handler, expected_statuses, language_server=None):
@@ -100,3 +104,48 @@ async def test_ping(handlers):
100104
assert ws_handler._ping_sent is True
101105

102106
ws_handler.on_close()
107+
108+
109+
@pytest.mark.asyncio
async def test_broken_pipe(handlers, jsonrpc_init_msg, did_open_message, caplog):
    """If the pipe breaks (server dies), can we recover by restarting the server?"""
    a_server = "pyls"

    # use real handler in this test rather than a mock
    # -> testing broken pipe requires that here
    handler, ws_handler = handlers
    manager = handler.manager

    manager.initialize()

    assert_status_set(handler, {"not_started"}, a_server)

    ws_handler.open(a_server)

    await ws_handler.on_message(jsonrpc_init_msg)
    assert_status_set(handler, {"started"}, a_server)

    session: LanguageServerSession = manager.sessions[a_server]
    process: subprocess.Popen = session.process
    process.kill()

    with caplog.at_level(logging.WARNING):
        # an attempt to write should raise BrokenPipeError
        await ws_handler.on_message(did_open_message)
        await asyncio.sleep(1)

    # which should be caught
    assert "Encountered pyls language server failure" in caplog.text
    assert "exception: [Errno 32] Broken pipe" in caplog.text

    # and the server should get restarted
    assert "restarting session..." in caplog.text

    assert_status_set(handler, {"started"}, a_server)

    # caplog keeps every record captured so far in this test; drop the
    # warnings asserted above so the check below only sees new records
    caplog.clear()

    with caplog.at_level(logging.WARNING):
        # we should be able to send a message now
        await ws_handler.on_message(did_open_message)
        assert caplog.text == ""

    ws_handler.on_close()

0 commit comments

Comments
 (0)