Skip to content

Commit 600c0cb

Browse files
committed
WIP test recovery from BrokenPipeError
1 parent 2564fe7 commit 600c0cb

File tree

5 files changed

+117
-3
lines changed

5 files changed

+117
-3
lines changed

python_packages/jupyter_lsp/jupyter_lsp/manager.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ def init_language_servers(self) -> None:
111111
# copy the language servers before anybody monkeys with them
112112
language_servers_from_config = dict(self.language_servers)
113113
language_servers_from_config.update(self.conf_d_language_servers)
114+
self.log.error(language_servers_from_config)
114115

115116
if self.autodetect:
116117
language_servers.update(self._autodetect_language_servers())
@@ -215,6 +216,7 @@ def _autodetect_language_servers(self):
215216

216217
try:
217218
entry_points = entrypoints.get_group_named(EP_SPEC_V1)
219+
self.log.debug(f"{entry_points}")
218220
except Exception: # pragma: no cover
219221
self.log.exception("Failed to load entry_points")
220222

python_packages/jupyter_lsp/jupyter_lsp/session.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import string
77
import subprocess
88
from copy import copy
9-
from datetime import datetime, timezone
9+
from datetime import datetime, timedelta, timezone
1010

1111
from tornado.ioloop import IOLoop
1212
from tornado.queues import Queue
@@ -49,8 +49,12 @@ class LanguageServerSession(LoggingConfigurable):
4949
status = UseEnum(SessionStatus, default_value=SessionStatus.NOT_STARTED)
5050
last_handler_message_at = Instance(datetime, allow_none=True)
5151
last_server_message_at = Instance(datetime, allow_none=True)
52+
allow_server_failure_not_more_often_than = Instance(
53+
timedelta, allow_none=False, default_value=timedelta(minutes=20)
54+
)
5255

5356
_tasks = None
57+
_last_failure = None
5458

5559
_skip_serialize = ["argv", "debug_argv"]
5660

@@ -169,7 +173,12 @@ async def _read_lsp(self):
169173
await self.reader.read()
170174

171175
async def _write_lsp(self):
172-
await self.writer.write()
176+
task = self.writer.write()
177+
results = await asyncio.gather(task, return_exceptions=True)
178+
for result in results:
179+
if isinstance(result, BrokenPipeError):
180+
self._handle_server_failure(result)
181+
return results
173182

174183
async def _broadcast_from_lsp(self):
175184
"""loop for reading messages from the queue of messages from the language
@@ -179,3 +188,34 @@ async def _broadcast_from_lsp(self):
179188
self.last_server_message_at = self.now()
180189
await self.parent.on_server_message(message, self)
181190
self.from_lsp.task_done()
191+
192+
def _handle_server_failure(self, error):
193+
description: str
194+
action: str
195+
now = datetime.now()
196+
197+
allowed = self.allow_server_failure_not_more_often_than
198+
if self._last_failure and now - self._last_failure > allowed:
199+
delta = now - self._last_failure
200+
description = (
201+
f"giving up as the previous failure was {delta} ago"
202+
f" which is less than te minimum allowed interval ({allowed})"
203+
)
204+
action = "raise"
205+
else:
206+
action = "restart"
207+
description = "restarting session..."
208+
209+
text = (
210+
f"Encountered {self.language_server} language server failure;"
211+
f" {description}"
212+
f" (exception: {error})"
213+
f" (faulty process: {self.process})"
214+
)
215+
self.log.warning(text)
216+
217+
if action == "raise":
218+
raise
219+
elif action == "restart":
220+
self.stop()
221+
self.initialize()

python_packages/jupyter_lsp/jupyter_lsp/stdio.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ async def _read_content(
139139
if len(raw) != length: # pragma: no cover
140140
self.log.warning(
141141
f"Readout and content-length mismatch: {len(raw)} vs {length};"
142-
f"remaining empties: {max_empties}; remaining parts: {max_parts}"
142+
f" remaining empties: {max_empties}; remaining parts: {max_parts}"
143143
)
144144

145145
return raw
@@ -191,7 +191,12 @@ async def write(self) -> None:
191191
body = message.encode("utf-8")
192192
response = "Content-Length: {}\r\n\r\n{}".format(len(body), message)
193193
await convert_yielded(self._write_one(response.encode("utf-8")))
194+
except BrokenPipeError:
195+
self.queue.task_done()
196+
# propagate broken pipe errors
197+
raise
194198
except Exception: # pragma: no cover
199+
# catch other (hopefully mild) exceptions
195200
self.log.exception("%s couldn't write message: %s", self, response)
196201
finally:
197202
self.queue.task_done()

python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,25 @@ def jsonrpc_init_msg():
109109
)
110110

111111

112+
@fixture
113+
def did_open_message():
114+
return json.dumps(
115+
{
116+
"id": 0,
117+
"jsonrpc": "2.0",
118+
"method": "textDocument/didOpen",
119+
"params": {
120+
"textDocument": {
121+
"uri": pathlib.Path(__file__).as_uri(),
122+
"languageId": "python",
123+
"version": 0,
124+
"text": "",
125+
}
126+
},
127+
}
128+
)
129+
130+
112131
@fixture
113132
def app():
114133
return MockServerApp()

python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
import asyncio
2+
import logging
3+
import subprocess
24

35
import pytest
46

7+
from ..handlers import LanguageServerWebSocketHandler
58
from ..schema import SERVERS_RESPONSE
9+
from ..session import LanguageServerSession
610

711

812
def assert_status_set(handler, expected_statuses, language_server=None):
@@ -100,3 +104,47 @@ async def test_ping(handlers):
100104
assert ws_handler._ping_sent is True
101105

102106
ws_handler.on_close()
107+
108+
109+
@pytest.mark.asyncio
110+
async def test_broken_pipe(handlers, jsonrpc_init_msg, did_open_message, caplog):
111+
"""If the pipe breaks (i.e. server dies), can we recover by restarting the server?"""
112+
a_server = "pyls"
113+
114+
# use real handler in this test rather than a mock -> testing broken pipe requires that here
115+
handler, ws_handler = handlers
116+
manager = handler.manager
117+
118+
manager.initialize()
119+
120+
assert_status_set(handler, {"not_started"}, a_server)
121+
122+
ws_handler.open(a_server)
123+
124+
await ws_handler.on_message(jsonrpc_init_msg)
125+
assert_status_set(handler, {"started"}, a_server)
126+
127+
session: LanguageServerSession = manager.sessions[a_server]
128+
process: subprocess.Popen = session.process
129+
process.kill()
130+
131+
with caplog.at_level(logging.WARNING):
132+
# an attempt to write should raise BrokenPipeError
133+
await ws_handler.on_message(did_open_message)
134+
await asyncio.sleep(1)
135+
136+
# which should be caught
137+
assert "Encountered pyls language server failure" in caplog.text
138+
assert "exception: [Errno 32] Broken pipe" in caplog.text
139+
140+
# and the server should get restarted
141+
assert "restarting session..." in caplog.text
142+
143+
assert_status_set(handler, {"started"}, a_server)
144+
145+
with caplog.at_level(logging.WARNING):
146+
# we should be able to send a message now
147+
await ws_handler.on_message(did_open_message)
148+
assert caplog.text == ""
149+
150+
ws_handler.on_close()

0 commit comments

Comments
 (0)