From c2424350e3242fffc325768166e2d9b234ce3455 Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Fri, 16 Dec 2022 14:32:13 +0200 Subject: [PATCH 01/14] rsocket-java - initial guide structure copied from python --- content-docs/guides/rsocket-java/index.mdx | 51 +++ .../guides/rsocket-java/tutorial/00-base.mdx | 130 ++++++++ .../tutorial/01-request_routing.mdx | 130 ++++++++ .../rsocket-java/tutorial/02-user_session.mdx | 132 ++++++++ .../rsocket-java/tutorial/03-messages.mdx | 293 ++++++++++++++++++ .../guides/rsocket-java/tutorial/index.mdx | 39 +++ sidebar-rsocket-java.js | 13 + sidebars.js | 3 +- src/css/customTheme.css | 9 +- 9 files changed, 796 insertions(+), 4 deletions(-) create mode 100644 content-docs/guides/rsocket-java/index.mdx create mode 100644 content-docs/guides/rsocket-java/tutorial/00-base.mdx create mode 100644 content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx create mode 100644 content-docs/guides/rsocket-java/tutorial/02-user_session.mdx create mode 100644 content-docs/guides/rsocket-java/tutorial/03-messages.mdx create mode 100644 content-docs/guides/rsocket-java/tutorial/index.mdx create mode 100644 sidebar-rsocket-java.js diff --git a/content-docs/guides/rsocket-java/index.mdx b/content-docs/guides/rsocket-java/index.mdx new file mode 100644 index 00000000..a2fdfa48 --- /dev/null +++ b/content-docs/guides/rsocket-java/index.mdx @@ -0,0 +1,51 @@ +--- +slug: /guides/rsocket-java +title: rsocket-java +sidebar_label: Introduction +--- + +:::caution +The python package API is not stable. There may be changes until version 1.0.0. +::: + +The python `rsocket` package implements the 1.0 version of the [RSocket protocol](/about/protocol) +(excluding "resume" functionality) and is designed for use in python >= 3.8 using asyncio. + +## Guides + +See [Quick Start](/guides/rsocket-py/simple) for a short getting started guide, and [Tutorial](/guides/rsocket-py/tutorial) for a more in depth +step by step construction of an application. + +Use the [Command-line tool](/guides/rsocket-py/cli) to quicly interact with an RSocket server without writing code. + +Other code snippets examples for [Client](/guides/rsocket-py/client), [Server](/guides/rsocket-py/server) +and [RxPy](/guides/rsocket-py/rxpy) integration are also available. + +API Documentation (Under construction) is available at [ReadTheDocs](https://rsocket.readthedocs.io/) + +## Installing + +A pip package is available when installing with +'pip install rsocket' ([rsocket](https://pypi.org/project/rsocket/)) + +Optionally, install using some extras: + +- rx: RxPy3 client +- reactivex: RxPy4 client +- aiohttp: Websocket server/client transport for aiohttp framework +- quart: Websocket server transport for quart framework +- quic: QUIC/HTTP3(wss) support +- cli: Command line interface + +## Status + +The following are currently implemented: + +- RSocketClient / RSocketServer +- Transports: +- TCP +- QUIC/HTTP3(wss) +- Websocket (aiohttp (server/client), quart (server) +- Simple load balancing +- Minimal integration with RxPy (>= 3.x) and reactivex +- Command line interface diff --git a/content-docs/guides/rsocket-java/tutorial/00-base.mdx b/content-docs/guides/rsocket-java/tutorial/00-base.mdx new file mode 100644 index 00000000..bf6c663d --- /dev/null +++ b/content-docs/guides/rsocket-java/tutorial/00-base.mdx @@ -0,0 +1,130 @@ +--- +slug: /guides/rsocket-java/tutorial/base +title: Getting started +sidebar_label: Getting started +--- + +## Application structure + +In this step we will set up a minimal code required for both the server and the client. + +The application will be composed of: +- Server side +- Client side +- Shared code + +See resulting code on [GitHub](https://github.com/rsocket/rsocket-java/tree/master/examples/tutorial/step0) + +## Server side + +We will set up a simple server to accept connections and respond to the client sending the user's name. +The server will listen on TCP port 6565. + +Below is the code for the ServerApplication class: + +```java +package io.rsocket.guide.step0; + +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.core.RSocketServer; +import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; +import reactor.core.publisher.Mono; + +import java.util.Objects; + +public class ServerApplication { + + public static void main(String[] args) { + final var rSocketServer = RSocketServer.create() + .acceptor((setup, sendingSocket) -> Mono.just(new RSocket() { + public Mono requestResponse(String route, Payload payload) { + return Mono.just(DefaultPayload.create("Welcome to chat, " + payload.getDataUtf8())); + } + })); + + final var transport = TcpServerTransport.create(6565); + Objects.requireNonNull(rSocketServer.bind(transport) + .block()) + .onClose() + .block(); + } +} +``` + +*Lines 16-22* Define the RSocket server with a single response endpoint at *Lines *18-20*. + +*Line 19* Takes the username from the `Payload` instance's data and returns it to the client with a "welcome" message. + +*Lines 25-29* start a TCP server listening on localhost:6565. + +** Python ** +The 2 parameters passed are: +- transport : An instance of a supported connection method. In this case it is an adapter over the TCP connection. +- handler_factory: A callable which returns an `RSocketHandler` instance. This will be used to respond to the client's requests. + +There is no need to specify anything else here since the `RSocketServer` starts internal +tasks which listen for requests, and responds accordingly. The session will close when the connection is lost. + +In the example, the handler factory (*Line 12*) is a subclass of `BaseRequestHandler`. In this class, we can implement any of the methods +which handle the 4 RSocket request types: +- `request_response` +- `request_stream` +- `request_channel` +- `fire_and_forget` + +Check the `BaseRequestHandler` for other methods which can be implemented. + +*Lines 13-15* implement the `request_response` handler, which welcomes the user. This method receives a single argument containing the payload. +It is an instance of a `Payload` class which contains the data and metadata of the request. The username is taken from the `data` property +of the `Payload`. The `data` property's type is always `bytes`. In our case it is a UTF8 encoded string, so we will use the `utf8_decode` +helper to decode it. + +A response is created using helper methods: +- `create_future` : This creates a future which contains the response data. +- `ensure_bytes` : All values in a response must be of type `bytes`. This method encodes string to bytes and assumes UTF8 for the input. + +Next we will look at a simple client which connects to this server. + +## Client side + +The client will connect to the server, send a single *response* request and disconnect. + +Below is the code for the ClientApplication class: + +```java +package io.rsocket.guide.step0; + +import io.rsocket.Payload; +import io.rsocket.core.RSocketConnector; +import io.rsocket.metadata.WellKnownMimeType; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.util.DefaultPayload; + +import java.time.Duration; + +public class ClientApplication { + + public static void main(String[] args) { + final var rSocket = RSocketConnector.create() + .connect(TcpClientTransport.create("localhost", 6565)) + .block(); + + final Payload payload = DefaultPayload.create("George"); + rSocket.requestResponse(payload) + .doOnNext(response -> System.out.println(response.getDataUtf8())) + .block(Duration.ofMinutes(1)); + } + +} +``` + +*Line 10* instantiates an asyncio TCP connection to localhost on port 6565. + +*Line 12* instantiates an `RSocketClient` using an `async with` statement, to ensure the client closes the TCP connection when done. + +*Line 13* sends the request with a `Payload` containing the username as the data. The `data` value must be of type `bytes`. + +The response is a `Payload` instance, the `data` of which is printed (*Line 15*). diff --git a/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx b/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx new file mode 100644 index 00000000..19596859 --- /dev/null +++ b/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx @@ -0,0 +1,130 @@ +--- +slug: /guides/rsocket-java/tutorial/request_routing +title: Request routing +sidebar_label: Request routing +--- + +In the previous step we added a single request-response handler. In order to allow more than one functionality to use this handler, +(e.g. login, messages, join/leave chanel) they need to be distinguished from each other. To achieve this, each request to the server will +be identified by a [route](https://github.com/rsocket/rsocket/blob/master/Extensions/Routing.md). This is similar to paths in an HTTP URL where +each URL may handle one of the HTTP methods (eg. GET, POST). To implement this we will use the `RequestRouter` and `RoutingRequestHandler` classes. + +See resulting code on [GitHub](https://github.com/rsocket/rsocket-py/tree/master/examples/tutorial/step1) + +## Server side + +We will modify the example from the previous step into a routed request response. + +### Routing request handler + +The `handler_factory` method below replaces the `Handler` class from the previous step: + +```py +from typing import Awaitable + +from rsocket.frame_helpers import ensure_bytes +from rsocket.helpers import utf8_decode, create_response +from rsocket.payload import Payload +from rsocket.routing.request_router import RequestRouter +from rsocket.routing.routing_request_handler import RoutingRequestHandler + +def handler_factory() -> RoutingRequestHandler: + router = RequestRouter() + + @router.response('login') + async def login(payload: Payload) -> Awaitable[Payload]: + username = utf8_decode(payload.data) + return create_response(ensure_bytes(f'Welcome to chat, {username}')) + + return RoutingRequestHandler(router) +``` + +*Line 10* instantiates the `RequestRouter`. The methods on this helper are used as decorators to register the route of each request +(it is similar to Flask and Quart syntax). + +The `RequestRouter` has a decorator method for each RSocket request type: +- `response` +- `stream` +- `channel` +- `fire_and_forget` +- `metadata_push` + +All of the above methods receive a single argument, a string representing the route. The decorators may be applied multiple times +which allows for aliases for the same route. + +*Lines 12-15* define the login method and attach it to a request-response with the login route. +The method name does not have to match the route. + +All methods decorated by the request router accept two optional arguments: +- Any argument named *composite_metadata* (regardless of type-hint), or type-hinted with `CompositeMetadata` will receive a `CompositeMetadata` instance containing parsed composite metadata +- Any other argument without a type-hint, or type-hinted with `Payload` will receive the request's payload + +The `RequestRouter` has additional functionality which will be covered in later sections. + +*Line 17* returns the actual request handler, an instance of `RoutingRequestHandler`, which uses the request router instance. + +### Use the routing request handler + +Modify the `RSocketServer` instantiation from the previous example and pass the `handler_factory` method as the *handler_factory* parameter: + +```py +from rsocket.rsocket_server import RSocketServer +from rsocket.transports.tcp import TransportTCP + +async def run_server(): + def session(*connection): + RSocketServer(TransportTCP(*connection), handler_factory=handler_factory) +``` + +## Client side + +Let's modify the client side to call this new routed request. For readability and maintainability, we will create a `ChatClient` +which will wrap the RSocket client and provide the methods for interacting with the server. + +### ChatClient class + +Below is the complete code for the new client.py module: + +```py +from rsocket.extensions.helpers import composite, route +from rsocket.frame_helpers import ensure_bytes +from rsocket.helpers import utf8_decode +from rsocket.payload import Payload +from rsocket.rsocket_client import RSocketClient + +class ChatClient: + def __init__(self, rsocket: RSocketClient): + self._rsocket = rsocket + + async def login(self, username: str): + payload = Payload(ensure_bytes(username), composite(route('login'))) + response = await self._rsocket.request_response(payload) + print(f'Server response: {utf8_decode(response.data)}') +``` + +*Lines 7-14* define our new `ChatClient` which will encapsulate the methods used to interact with the chat server. + +*Lines 11-14* define a `login` method. It uses the `composite` and `route` helper methods to create the metadata which will ensure +the payload is routed to the method registered on the server side in the previous step. + +### Test the new functionality + +Let's modify the client module to test our new `ChatClient`: + +```py +from rsocket.extensions.mimetypes import WellKnownMimeTypes +from rsocket.helpers import single_transport_provider +from rsocket.rsocket_client import RSocketClient +from rsocket.transports.tcp import TransportTCP + +async def main(): + ... + async with RSocketClient(single_transport_provider(TransportTCP(*connection)), + metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA) as client: + user = ChatClient(client) + await user.login('George') +``` + +*Line 9* changes the *metadata_encoding* type to be COMPOSITE_METADATA. This is required for routing support. + +*Lines 10-11* instantiate a `ChatClient` and call the `login` method. diff --git a/content-docs/guides/rsocket-java/tutorial/02-user_session.mdx b/content-docs/guides/rsocket-java/tutorial/02-user_session.mdx new file mode 100644 index 00000000..e58a0072 --- /dev/null +++ b/content-docs/guides/rsocket-java/tutorial/02-user_session.mdx @@ -0,0 +1,132 @@ +--- +slug: /guides/rsocket-java/tutorial/user_session +title: User session +sidebar_label: User session +--- + +Let's add a server side session to store the logged-in user's state. Later on it will be used to temporarily store +the messages which will be delivered to the client. + +See resulting code on [GitHub](https://github.com/rsocket/rsocket-py/tree/master/examples/tutorial/step2) + +## Server side + +### Data-classes + +First we will add some [dataclasses](https://docs.python.org/3/library/dataclasses.html) to represent a single user session, and a lookup dictionary for all user sessions +by their id. + +```py +from dataclasses import dataclass, field +from typing import Dict +from weakref import WeakValueDictionary + +class SessionId(str): + pass + +@dataclass() +class UserSessionData: + username: str + session_id: SessionId + +@dataclass(frozen=True) +class ChatData: + user_session_by_id: Dict[SessionId, UserSessionData] = field(default_factory=WeakValueDictionary) + +chat_data = ChatData() +``` + +The `SessionId` defined in *Lines 5-6* is required in order to store the string session-id as a [weak reference](https://docs.python.org/3/library/weakref.html) later on. + +*Lines 8-11* define the `UserSessionData` dataclass which represents the user's session. It contains two fields: +- `username` - Human readable name specified in the login payload. +- `session_id` - unique id (e.g. UUID4) generated to identify the session. + +*Lines 13-15* define the `ChatData` dataclass which represents the application's data. For now, it contains only a dict for looking up +user sessions by their id. It is initialized as a [WeakValueDictionary](https://docs.python.org/3/library/weakref.html#weakref.WeakValueDictionary) in order for the session to be removed when the user disconnects. + +*Line 17* instantiates a global instance of this class. + +### Login endpoint + +Next we will modify the login endpoint to create a user session: + +```py +import logging +import uuid +from typing import Optional, Awaitable + +from rsocket.frame_helpers import ensure_bytes +from rsocket.helpers import utf8_decode, create_response +from rsocket.payload import Payload +from rsocket.routing.request_router import RequestRouter + +class ChatUserSession: + + def __init__(self): + self._session: Optional[UserSessionData] = None + + def router_factory(self): + router = RequestRouter() + + @router.response('login') + async def login(payload: Payload) -> Awaitable[Payload]: + username = utf8_decode(payload.data) + + logging.info(f'New user: {username}') + + session_id = SessionId(uuid.uuid4()) + self._session = UserSessionData(username, session_id) + chat_data.user_session_by_id[session_id] = self._session + + return create_response(ensure_bytes(session_id)) + + return router +``` + +In order to keep a reference to the `UserSessionData` we will modify the request handler factory. +The `ChatUserSession` class will keep the reference to the session data, and define the request routes. + +Below is the modified `handler_factory`: + +```py +class CustomRoutingRequestHandler(RoutingRequestHandler): + def __init__(self, session: ChatUserSession): + super().__init__(session.router_factory()) + self._session = session + +def handler_factory(): + return CustomRoutingRequestHandler(ChatUserSession()) +``` + +The `CustomRoutingRequestHandler` class (*Lines 1-4*) is the actual request handler which will wrap the `ChatUserSession` instance. + +Finally, we modify the `handler_factory` (*Lines 6-7*) to instantiate the handler and the session. + +## Client side + +Below is the modified ChatClient: + +```py +from rsocket.extensions.helpers import composite, route +from rsocket.frame_helpers import ensure_bytes +from rsocket.payload import Payload +from rsocket.rsocket_client import RSocketClient + +class ChatClient: + def __init__(self, rsocket: RSocketClient): + self._rsocket = rsocket + self._session_id = None + self._username: Optional[str] = None + + async def login(self, username: str): + payload = Payload(ensure_bytes(username), composite(route('login'))) + response = await self._rsocket.request_response(payload) + self._session_id = response.data + self._username = username + return self +``` + +Instead of a greeting from the server, we now receive a session id in the response payload. *Line 14* stores this session on the client class. + +We also store the username on the client for later printing as part of incoming messages. diff --git a/content-docs/guides/rsocket-java/tutorial/03-messages.mdx b/content-docs/guides/rsocket-java/tutorial/03-messages.mdx new file mode 100644 index 00000000..4f8af97e --- /dev/null +++ b/content-docs/guides/rsocket-java/tutorial/03-messages.mdx @@ -0,0 +1,293 @@ +--- +slug: /guides/rsocket-java/tutorial/messages +title: Private messages +sidebar_label: Private messages +--- + +Let's add private messaging between users. We will use a request-stream to listen for new messages from other users. + +See resulting code on [GitHub](https://github.com/rsocket/rsocket-py/tree/master/examples/tutorial/step3) + +## Shared + +Let's add an object representation of a message to the shared module: + +```py +from dataclasses import dataclass +from typing import Optional + +@dataclass(frozen=True) +class Message: + user: Optional[str] = None + content: Optional[str] = None +``` + +*Lines 4-7* defines a frozen dataclass with two fields: +- `user` : name of the recipient user when sending a message, and the name of the sender when receiving it. +- `content` : the message body. + +We will use [json](https://docs.python.org/3/library/json.html) to serialize the messages for transport. Add the following helper methods to the shared module: + +```py +import json +from typing import TypeVar, Type +from rsocket.frame_helpers import ensure_bytes +from rsocket.payload import Payload +from rsocket.helpers import utf8_decode + +def encode_dataclass(obj): + return ensure_bytes(json.dumps(obj.__dict__)) + +T = TypeVar('T') + +def decode_dataclass(data: bytes, cls: Type[T]) -> T: + return cls(**json.loads(utf8_decode(data))) + +def dataclass_to_payload(obj) -> Payload: + return Payload(encode_dataclass(obj)) +``` + +*Lines 7-8* Define a minimal dataclass json encoder which assumes all the fields in the dataclass are python primitives, or builtin collections of those. + +*Lines 10-13* Define the decoder counterpart of the above method. + +*Lines 15-16* Define a helper method for creating `Payload`s containing only a serialized `dataclass`. + +## Server side + +### Data storage and helper methods + +First we add a queue for incoming user messages: + +```py +from dataclasses import dataclass, field +from asyncio import Queue + +@dataclass() +class UserSessionData: + ... + messages: Queue = field(default_factory=Queue) +``` + +*Line 7* defines a `messages` queue. These are private (and later on channel) messages to the user from other clients. + +```py +from typing import Optional + +def find_session_by_username(username: str) -> Optional[UserSessionData]: + try: + return next(session for session in chat_data.user_session_by_id.values() if + session.username == username) + except StopIteration: + return None +``` + +*Lines 3-8* define a helper for looking up a user's session by username. This will be used to deliver private messages. + +### Send messages + +Next we will register a request-response endpoint for sending private messages: + +```py +import json +from typing import Awaitable + +from rsocket.helpers import create_response +from rsocket.payload import Payload +from rsocket.routing.request_router import RequestRouter +from shared import Message + +class ChatUserSession: + + def router_factory(self): + router = RequestRouter() + + @router.response('message') + async def send_message(payload: Payload) -> Awaitable[Payload]: + message = Message(**json.loads(payload.data)) + + logging.info('Received message for user: %s', message.user) + + target_message = Message(self._session.username, message.content) + + session = find_session_by_username(message.user) + await session.messages.put(target_message) + + return create_response() +``` + +*Lines 15-26* define the endpoint for sending messages. The Payload must contain a json serialized `Message` object. +The recipient's session is found (*Line 23*), and the message is placed in the user's message queue (*Line 24*). + +*Line 25* returns an empty `Payload` future using the `create_response` helper method. + +### Receive incoming messages + +As a last step on the server side, we register a request-stream endpoint which listens for incoming messages and sends +them to the client: + +```py +import asyncio + +from shared import encode_dataclass +from reactivestreams.publisher import DefaultPublisher +from reactivestreams.subscriber import Subscriber +from reactivestreams.subscription import DefaultSubscription +from rsocket.payload import Payload +from rsocket.routing.request_router import RequestRouter + +class ChatUserSession: + + def router_factory(self): + router = RequestRouter() + + @router.stream('messages.incoming') + async def messages_incoming(): + class MessagePublisher(DefaultPublisher, DefaultSubscription): + def __init__(self, session: UserSessionData): + self._session = session + self._sender = None + + def cancel(self): + self._sender.cancel() + + def subscribe(self, subscriber: Subscriber): + super(MessagePublisher, self).subscribe(subscriber) + subscriber.on_subscribe(self) + self._sender = asyncio.create_task(self._message_sender()) + + async def _message_sender(self): + while True: + next_message = await self._session.messages.get() + next_payload = Payload(encode_dataclass(next_message)) + self._subscriber.on_next(next_payload) + + return MessagePublisher(self._session) +``` + +*Lines 15-36* define the endpoint for listening to new messages. + +*Lines 17-34* define the `Publisher` which will be returned. It is given access to the user's session (*Lines 18-19*) + +The only method on the `Publisher` interface is `subscribe` (*Lines 25-28*), which is the action taken upon the client sending a request to the route. +The `subscriber` argument represents the client side. In this method an asyncio [Task](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task) is started (*Line 28*) +which continuously sends the subscriber the messages intended for that user from the server side session queue (*Lines 30-34*) + +The same class will be used for canceling the stream (*Lines 22-23*), if triggered by the client. This is provided by the `Subscription` interface which +is provided to the client on *Line 27*. + +## Client side + +First let's add a client method for sending private messages: + +```py +from shared import Message, encode_dataclass +from rsocket.extensions.helpers import composite, route +from rsocket.payload import Payload + +class ChatClient: + + async def private_message(self, username: str, content: str): + print(f'Sending {content} to user {username}') + + await self._rsocket.request_response(Payload(encode_dataclass(Message(username, content)), + composite(route('message')))) +``` + +*Line 10-11* creates a `Payload` with the messages and sends it to the 'message' route. + +Next we add a method which will listen for incoming messages: + +```py +import json +from typing import Optional + +from shared import Message +from reactivestreams.subscriber import DefaultSubscriber +from reactivestreams.subscription import DefaultSubscription +from rsocket.rsocket_client import RSocketClient +from rsocket.extensions.helpers import composite, route +from rsocket.payload import Payload + +class ChatClient: + def __init__(self, rsocket: RSocketClient): + ... + self._message_subscriber: Optional = None + + def listen_for_messages(self): + def print_message(data: bytes): + message = Message(**json.loads(data)) + print(f'{self._username}: from {message.user}: {message.content}') + + class MessageListener(DefaultSubscriber, DefaultSubscription): + def __init__(self): + + def on_next(self, value, is_complete=False): + print_message(value.data) + + def on_error(self, exception: Exception): + print(exception) + + def cancel(self): + self.subscription.cancel() + + self._message_subscriber = MessageListener() + self._rsocket.request_stream( + Payload(metadata=composite(route('messages.incoming'))) + ).subscribe(self._message_subscriber) + + def stop_listening_for_messages(self): + self._message_subscriber.cancel() +``` + +*Lines 21-31* define the `Subscriber` which will listen for incoming messages and print them on the client side. + +An instance of the `MessageListener` is stored on the client (*Line 33*) to later allow stopping the incoming message stream. + +*Lines 34-36* send the request and subscribe to the resulting `Publisher`. + +The method in *Lines 38-39* can be used to stop the above message listener. + +### Test the new functionality + +Finally, let's test the new functionality. Modify the `main` method in the client: + +```py +import asyncio + +from rsocket.extensions.mimetypes import WellKnownMimeTypes +from rsocket.helpers import single_transport_provider +from rsocket.rsocket_client import RSocketClient +from rsocket.transports.tcp import TransportTCP + +async def main(): + connection1 = await asyncio.open_connection('localhost', 6565) + + async with RSocketClient(single_transport_provider(TransportTCP(*connection1)), + metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA) as client1: + connection2 = await asyncio.open_connection('localhost', 6565) + + async with RSocketClient(single_transport_provider(TransportTCP(*connection2)), + metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA) as client2: + user1 = ChatClient(client1) + user2 = ChatClient(client2) + + await user1.login('user1') + await user2.login('user2') + + user2.listen_for_messages() + + await user1.private_message('user2', 'private message from user1') + + await asyncio.sleep(3) + + user2.stop_listening_for_messages() +``` + +In this example, we open two rsocket connections to the server (*lines 9-12* and *lines 13-16*). + +*Lines 17-21* wrap the rsocket clients with the chat client adapter and login the two users. + +*Line 23* makes user2 listen for incoming messages, while *line 25* has user1 send a message to user2. + +Finally, *lines 27-29* make the application wait for 3 seconds, then stops user2 listening for messages. diff --git a/content-docs/guides/rsocket-java/tutorial/index.mdx b/content-docs/guides/rsocket-java/tutorial/index.mdx new file mode 100644 index 00000000..682ca732 --- /dev/null +++ b/content-docs/guides/rsocket-java/tutorial/index.mdx @@ -0,0 +1,39 @@ +--- +slug: /guides/rsocket-java/tutorial +title: Chat Application +sidebar_label: Preface +--- + +This guide will go over step by step of setting up an application using the java implementation of RSocket. + +:::tip +If you find a problem, code or otherwise, please report an [issue](https://github.com/rsocket/rsocket-website/issues) +::: + +## Preface + +We will be creating a chat application with a server and a client. + +The chat client will have the following functionality: +- Private messages between users +- Joining and sending messages to channels +- Uploading/Downloading files +- Getting server and client statistics (e.g. number of channels) + +Since the emphasis is on showcasing as much RSocket functionality as possible, some examples may be either a bit contrived, or +be possible to implement in a different way using RSocket. This is left as an exercise to the reader. + +## Required knowledge + +The guide assumes the following knowledge: + +* Basic java level (classes/methods, threads, streams) +* Basic understanding of RSocket protocol (See [About RSocket](/about/faq)) + +## Required setup + +TODO: setting up a java projects with rsocket as a dependency + +## Code + +The tutorial code is available on [GitHub](https://github.com/rsocket/rsocket-java) under examples/tutorial. diff --git a/sidebar-rsocket-java.js b/sidebar-rsocket-java.js new file mode 100644 index 00000000..3fceeb21 --- /dev/null +++ b/sidebar-rsocket-java.js @@ -0,0 +1,13 @@ +module.exports = [ + "guides/rsocket-java/index", + { + "Tutorial": + [ + "guides/rsocket-java/tutorial/index", + "guides/rsocket-java/tutorial/base", + "guides/rsocket-java/tutorial/request_routing", + "guides/rsocket-java/tutorial/user_session", + "guides/rsocket-java/tutorial/messages" + ] + } +]; diff --git a/sidebars.js b/sidebars.js index 33972631..efecaa8e 100644 --- a/sidebars.js +++ b/sidebars.js @@ -9,7 +9,8 @@ const guideItems = [ "guides/index", { "rsocket-js": require("./sidebar-rsocket-js"), - "rsocket-py": require("./sidebar-rsocket-py") + "rsocket-py": require("./sidebar-rsocket-py"), + "rsocket-java": require("./sidebar-rsocket-java") } ]; diff --git a/src/css/customTheme.css b/src/css/customTheme.css index 847511a0..6cde21fa 100644 --- a/src/css/customTheme.css +++ b/src/css/customTheme.css @@ -34,15 +34,18 @@ html[data-theme='dark'] .hero { -pre.language-py code::before { +pre.language-py code::before, +pre.language-java code::before { counter-reset: listing; } -pre.language-py code > span { +pre.language-py code > span, +pre.language-java code > span{ counter-increment: listing; } -pre.language-py code > span::before { +pre.language-py code > span::before, +pre.language-java code > span::before{ color: #9a9a9a; content: counter(listing) ". "; display: inline-block; From ca096c0498023880472b372c49b0f6244f1e965f Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Fri, 16 Dec 2022 19:27:26 +0200 Subject: [PATCH 02/14] rsocket-java - initial guide step 0 rewritten from python --- .../guides/rsocket-java/tutorial/00-base.mdx | 87 +++++++++---------- 1 file changed, 42 insertions(+), 45 deletions(-) diff --git a/content-docs/guides/rsocket-java/tutorial/00-base.mdx b/content-docs/guides/rsocket-java/tutorial/00-base.mdx index bf6c663d..9e3520c0 100644 --- a/content-docs/guides/rsocket-java/tutorial/00-base.mdx +++ b/content-docs/guides/rsocket-java/tutorial/00-base.mdx @@ -27,64 +27,59 @@ package io.rsocket.guide.step0; import io.rsocket.Payload; import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; import io.rsocket.core.RSocketServer; -import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.transport.netty.server.TcpServerTransport; import io.rsocket.util.DefaultPayload; import reactor.core.publisher.Mono; -import java.util.Objects; - public class ServerApplication { public static void main(String[] args) { - final var rSocketServer = RSocketServer.create() - .acceptor((setup, sendingSocket) -> Mono.just(new RSocket() { - public Mono requestResponse(String route, Payload payload) { - return Mono.just(DefaultPayload.create("Welcome to chat, " + payload.getDataUtf8())); - } - })); - final var transport = TcpServerTransport.create(6565); - Objects.requireNonNull(rSocketServer.bind(transport) - .block()) + + final SocketAcceptor socketAcceptor = (setup, sendingSocket) -> Mono.just(new RSocket() { + public Mono requestResponse(Payload payload) { + return Mono.just(DefaultPayload.create("Welcome to chat, " + payload.getDataUtf8())); + } + }); + + RSocketServer.create() + .acceptor(socketAcceptor) + .bind(transport) + .block() .onClose() .block(); } } ``` -*Lines 16-22* Define the RSocket server with a single response endpoint at *Lines *18-20*. +*Lines 22-27* start an RSocket TCP server listening on localhost:6565. -*Line 19* Takes the username from the `Payload` instance's data and returns it to the client with a "welcome" message. - -*Lines 25-29* start a TCP server listening on localhost:6565. - -** Python ** The 2 parameters passed are: -- transport : An instance of a supported connection method. In this case it is an adapter over the TCP connection. -- handler_factory: A callable which returns an `RSocketHandler` instance. This will be used to respond to the client's requests. +- transport : An instance of a supported connection method. In this case it is at instance of `TcpServerTransport` created in *Line 14*. +- socketAcceptor: A callable which returns an `RSocket` instance wrapped in a `Mono`. This will be used to respond to the client's requests. -There is no need to specify anything else here since the `RSocketServer` starts internal -tasks which listen for requests, and responds accordingly. The session will close when the connection is lost. - -In the example, the handler factory (*Line 12*) is a subclass of `BaseRequestHandler`. In this class, we can implement any of the methods -which handle the 4 RSocket request types: -- `request_response` -- `request_stream` -- `request_channel` -- `fire_and_forget` +*Lines 16-20* Define the `RSocket` service with a single `requestResponse` endpoint at *Lines *17-19*. -Check the `BaseRequestHandler` for other methods which can be implemented. +The `requestResponse` method receives a single argument containing the payload. +It is an instance of a `Payload` class which contains the data and metadata of the request. The data property is assumed to contain +a UTF-8 encoded string of the username, so is retrieved using `getDataUtf8`. -*Lines 13-15* implement the `request_response` handler, which welcomes the user. This method receives a single argument containing the payload. -It is an instance of a `Payload` class which contains the data and metadata of the request. The username is taken from the `data` property -of the `Payload`. The `data` property's type is always `bytes`. In our case it is a UTF8 encoded string, so we will use the `utf8_decode` -helper to decode it. +*Line 18* Takes the username from the `Payload` instance's data and returns it to the client with a "welcome" message. A response is created using helper methods: -- `create_future` : This creates a future which contains the response data. -- `ensure_bytes` : All values in a response must be of type `bytes`. This method encodes string to bytes and assumes UTF8 for the input. +- `DefaultPayload::create` : This creates a payload which is the standard object which wraps all data transferred over RSocket. In our case, only the data property is set. +- `Mono::just` : All RSocket responses must be in the form of streams, either a `Flux` or a `Mono`. + +In the example, only the `requestResponse` method of `RSocket` is overridden. In this class, we can override the methods +which handle the 4 RSocket request types: +- `requestResponse` +- `requestStream` +- `requestChannel` +- `fireAndForget` + +Check the `RSocket` for other methods which can be implemented. Next we will look at a simple client which connects to this server. @@ -97,9 +92,7 @@ Below is the code for the ClientApplication class: ```java package io.rsocket.guide.step0; -import io.rsocket.Payload; import io.rsocket.core.RSocketConnector; -import io.rsocket.metadata.WellKnownMimeType; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.util.DefaultPayload; @@ -108,23 +101,27 @@ import java.time.Duration; public class ClientApplication { public static void main(String[] args) { + final var transport = TcpClientTransport.create("localhost", 6565); + final var rSocket = RSocketConnector.create() - .connect(TcpClientTransport.create("localhost", 6565)) + .connect(transport) .block(); - final Payload payload = DefaultPayload.create("George"); + final var payload = DefaultPayload.create("George"); + rSocket.requestResponse(payload) .doOnNext(response -> System.out.println(response.getDataUtf8())) .block(Duration.ofMinutes(1)); } - } ``` -*Line 10* instantiates an asyncio TCP connection to localhost on port 6565. +*Line 12* instantiates a TCP connection to localhost on port 6565, similar to the one in `ServerApplication`. + +*Lines 14-16* instantiates an `RSocket` client. -*Line 12* instantiates an `RSocketClient` using an `async with` statement, to ensure the client closes the TCP connection when done. +*Line 18* Wraps the username "George" which the client will send to the server in a `Payload` using the `DefaultPayload.create` factory method -*Line 13* sends the request with a `Payload` containing the username as the data. The `data` value must be of type `bytes`. +Finally, *Line 20* sends the request to the server and prints (*Line 21*) the received response. -The response is a `Payload` instance, the `data` of which is printed (*Line 15*). +Since RSocket is reactive, and we want to wait for the request to finish before quitting, a call to `block(Duration.ofMinutes(1))` is added to block for 1 minute. From 023e74c433ce6af7aa69a0f369f853eefd58b468 Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Fri, 16 Dec 2022 20:52:12 +0200 Subject: [PATCH 03/14] rsocket-java - initial guide routing rewritten from python --- .../tutorial/01-request_routing.mdx | 179 ++++++++++-------- 1 file changed, 97 insertions(+), 82 deletions(-) diff --git a/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx b/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx index 19596859..bfc7ec84 100644 --- a/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx +++ b/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx @@ -7,9 +7,9 @@ sidebar_label: Request routing In the previous step we added a single request-response handler. In order to allow more than one functionality to use this handler, (e.g. login, messages, join/leave chanel) they need to be distinguished from each other. To achieve this, each request to the server will be identified by a [route](https://github.com/rsocket/rsocket/blob/master/Extensions/Routing.md). This is similar to paths in an HTTP URL where -each URL may handle one of the HTTP methods (eg. GET, POST). To implement this we will use the `RequestRouter` and `RoutingRequestHandler` classes. +each URL may handle one of the HTTP methods (eg. GET, POST). -See resulting code on [GitHub](https://github.com/rsocket/rsocket-py/tree/master/examples/tutorial/step1) +See resulting code on [GitHub](https://github.com/rsocket/rsocket-java/tree/master/examples/tutorial/step1) ## Server side @@ -19,112 +19,127 @@ We will modify the example from the previous step into a routed request response The `handler_factory` method below replaces the `Handler` class from the previous step: -```py -from typing import Awaitable - -from rsocket.frame_helpers import ensure_bytes -from rsocket.helpers import utf8_decode, create_response -from rsocket.payload import Payload -from rsocket.routing.request_router import RequestRouter -from rsocket.routing.routing_request_handler import RoutingRequestHandler - -def handler_factory() -> RoutingRequestHandler: - router = RequestRouter() - - @router.response('login') - async def login(payload: Payload) -> Awaitable[Payload]: - username = utf8_decode(payload.data) - return create_response(ensure_bytes(f'Welcome to chat, {username}')) - - return RoutingRequestHandler(router) +```java +final SocketAcceptor socketAcceptor = (setup, sendingSocket) -> Mono.just(new RSocket() { + public Mono requestResponse(Payload payload) { + final var route = requireRoute(payload); + + switch (route) { + case "login": + return Mono.just(DefaultPayload.create("Welcome to chat, " + payload.getDataUtf8())); + } + + throw new RuntimeException("Unknown requestResponse route " + route); + } + + private String requireRoute(Payload payload) { + final var metadata = payload.sliceMetadata(); + final CompositeMetadata compositeMetadata = new CompositeMetadata(metadata, false); + + for (CompositeMetadata.Entry metadatum : compositeMetadata) { + if (Objects.requireNonNull(metadatum.getMimeType()) + .equals(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())) { + return new RoutingMetadata(metadatum.getContent()).iterator().next(); + } + } + + throw new IllegalStateException(); + } +}); ``` -*Line 10* instantiates the `RequestRouter`. The methods on this helper are used as decorators to register the route of each request -(it is similar to Flask and Quart syntax). +The `requestResponse` method in *Lines 2-11* is modified to first parse the route from the `Payload` metadata, using the `requireRoute` helper method. +For now there is only a single case, the "login" route, which returns the same response as in the previous section of this guide. -The `RequestRouter` has a decorator method for each RSocket request type: -- `response` -- `stream` -- `channel` -- `fire_and_forget` -- `metadata_push` +*Line 10* raises an exception if no known route is supplied. -All of the above methods receive a single argument, a string representing the route. The decorators may be applied multiple times -which allows for aliases for the same route. +The `requireRoute` method parses the `Payload` metadata using the `CompositeMetadata` class. If any of the metadata items is of routing type, its value is returned. +If no routing metadata is found (*Line 24*) an exception is thrown. -*Lines 12-15* define the login method and attach it to a request-response with the login route. -The method name does not have to match the route. +## Client side -All methods decorated by the request router accept two optional arguments: -- Any argument named *composite_metadata* (regardless of type-hint), or type-hinted with `CompositeMetadata` will receive a `CompositeMetadata` instance containing parsed composite metadata -- Any other argument without a type-hint, or type-hinted with `Payload` will receive the request's payload +Let's modify the client side to call this new routed request. For readability and maintainability, we will create a `ChatClient` +which will wrap the RSocket client and provide the methods for interacting with the server. -The `RequestRouter` has additional functionality which will be covered in later sections. +### Client class -*Line 17* returns the actual request handler, an instance of `RoutingRequestHandler`, which uses the request router instance. +Below is the complete code for the new `Client` class: -### Use the routing request handler +```java +package io.rsocket.guide.step1; -Modify the `RSocketServer` instantiation from the previous example and pass the `handler_factory` method as the *handler_factory* parameter: +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.metadata.CompositeMetadataCodec; +import io.rsocket.metadata.TaggingMetadataCodec; +import io.rsocket.metadata.WellKnownMimeType; +import io.rsocket.util.DefaultPayload; +import reactor.core.publisher.Mono; -```py -from rsocket.rsocket_server import RSocketServer -from rsocket.transports.tcp import TransportTCP +import java.util.List; -async def run_server(): - def session(*connection): - RSocketServer(TransportTCP(*connection), handler_factory=handler_factory) -``` +public class Client { -## Client side - -Let's modify the client side to call this new routed request. For readability and maintainability, we will create a `ChatClient` -which will wrap the RSocket client and provide the methods for interacting with the server. + private final RSocket rSocket; -### ChatClient class + public Client(RSocket rSocket) { + this.rSocket = rSocket; + } -Below is the complete code for the new client.py module: + public Mono login(String username) { + final Payload payload = DefaultPayload.create( + Unpooled.wrappedBuffer(username.getBytes()), + route("login") + ); + return rSocket.requestResponse(payload); + } -```py -from rsocket.extensions.helpers import composite, route -from rsocket.frame_helpers import ensure_bytes -from rsocket.helpers import utf8_decode -from rsocket.payload import Payload -from rsocket.rsocket_client import RSocketClient + private static CompositeByteBuf route(String route) { + final var metadata = ByteBufAllocator.DEFAULT.compositeBuffer(); -class ChatClient: - def __init__(self, rsocket: RSocketClient): - self._rsocket = rsocket + CompositeMetadataCodec.encodeAndAddMetadata( + metadata, + ByteBufAllocator.DEFAULT, + WellKnownMimeType.MESSAGE_RSOCKET_ROUTING, + TaggingMetadataCodec.createTaggingContent(ByteBufAllocator.DEFAULT, List.of(route)) + ); - async def login(self, username: str): - payload = Payload(ensure_bytes(username), composite(route('login'))) - response = await self._rsocket.request_response(payload) - print(f'Server response: {utf8_decode(response.data)}') + return metadata; + } +} ``` -*Lines 7-14* define our new `ChatClient` which will encapsulate the methods used to interact with the chat server. +*Lines 17-45* define our new `Client` which will encapsulate the methods used to interact with the chat server. -*Lines 11-14* define a `login` method. It uses the `composite` and `route` helper methods to create the metadata which will ensure -the payload is routed to the method registered on the server side in the previous step. +*Lines 25-31* define a `login` method. It uses the `route` helper method defined later in the class to create the routing metadata, which is added to the `Payload`. +This ensures the payload is routed to the method registered on the server side in the previous step. + +The `route` method defined in *Lines 33-44*, creates a composite metadata item (*Line 34*) and adds the route metadata to it (*Lines 36-41*). ### Test the new functionality -Let's modify the client module to test our new `ChatClient`: +Let's modify the `ClientApplication` class to test our new `Client`: + +```java +final var rSocket = RSocketConnector.create() + .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()) + .connect(transport) + .block(); -```py -from rsocket.extensions.mimetypes import WellKnownMimeTypes -from rsocket.helpers import single_transport_provider -from rsocket.rsocket_client import RSocketClient -from rsocket.transports.tcp import TransportTCP +final var client = new Client(rSocket); -async def main(): - ... - async with RSocketClient(single_transport_provider(TransportTCP(*connection)), - metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA) as client: - user = ChatClient(client) - await user.login('George') +client.login("George") + .doOnNext(response -> System.out.println(response.getDataUtf8())) + .block(Duration.ofMinutes(10)); ``` -*Line 9* changes the *metadata_encoding* type to be COMPOSITE_METADATA. This is required for routing support. +The `RSocket` instantiation is modified, and in *Line 2* sets the `metadataMimeType` type to be COMPOSITE_METADATA. +This is required for multiple elements in the `Payload` metadata, which includes the routing information. + +*Lines 6* instantiates a `Client`, passing it the `RSocket` -*Lines 10-11* instantiate a `ChatClient` and call the `login` method. +*Lines 8-10* call the `login` method, and prints the response. From 3e41e99f00bb1ebda2a4fa28903611da443c79af Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Fri, 16 Dec 2022 21:23:46 +0200 Subject: [PATCH 04/14] rsocket-java - initial guide user session --- .../rsocket-java/tutorial/02-user_session.mdx | 126 +++++++++--------- 1 file changed, 64 insertions(+), 62 deletions(-) diff --git a/content-docs/guides/rsocket-java/tutorial/02-user_session.mdx b/content-docs/guides/rsocket-java/tutorial/02-user_session.mdx index e58a0072..88fcc5e1 100644 --- a/content-docs/guides/rsocket-java/tutorial/02-user_session.mdx +++ b/content-docs/guides/rsocket-java/tutorial/02-user_session.mdx @@ -7,81 +7,83 @@ sidebar_label: User session Let's add a server side session to store the logged-in user's state. Later on it will be used to temporarily store the messages which will be delivered to the client. -See resulting code on [GitHub](https://github.com/rsocket/rsocket-py/tree/master/examples/tutorial/step2) +See resulting code on [GitHub](https://github.com/rsocket/rsocket-java/tree/master/examples/tutorial/step2) ## Server side ### Data-classes -First we will add some [dataclasses](https://docs.python.org/3/library/dataclasses.html) to represent a single user session, and a lookup dictionary for all user sessions -by their id. +First we will add a POJO to represent a single user session. Below is the contents of the new `Session` class: -```py -from dataclasses import dataclass, field -from typing import Dict -from weakref import WeakValueDictionary - -class SessionId(str): - pass - -@dataclass() -class UserSessionData: - username: str - session_id: SessionId +```java +package io.rsocket.guide; -@dataclass(frozen=True) -class ChatData: - user_session_by_id: Dict[SessionId, UserSessionData] = field(default_factory=WeakValueDictionary) +public class Session { + public String username; -chat_data = ChatData() + public String sessionId; +} ``` -The `SessionId` defined in *Lines 5-6* is required in order to store the string session-id as a [weak reference](https://docs.python.org/3/library/weakref.html) later on. - -*Lines 8-11* define the `UserSessionData` dataclass which represents the user's session. It contains two fields: -- `username` - Human readable name specified in the login payload. -- `session_id` - unique id (e.g. UUID4) generated to identify the session. - -*Lines 13-15* define the `ChatData` dataclass which represents the application's data. For now, it contains only a dict for looking up -user sessions by their id. It is initialized as a [WeakValueDictionary](https://docs.python.org/3/library/weakref.html#weakref.WeakValueDictionary) in order for the session to be removed when the user disconnects. - -*Line 17* instantiates a global instance of this class. +The username (*Line 4*) will be supplied by the client, and the sessionId (*Line 6*) will be a UUID4 generated by the server. ### Login endpoint -Next we will modify the login endpoint to create a user session: - -```py -import logging -import uuid -from typing import Optional, Awaitable - -from rsocket.frame_helpers import ensure_bytes -from rsocket.helpers import utf8_decode, create_response -from rsocket.payload import Payload -from rsocket.routing.request_router import RequestRouter - -class ChatUserSession: - - def __init__(self): - self._session: Optional[UserSessionData] = None - - def router_factory(self): - router = RequestRouter() - - @router.response('login') - async def login(payload: Payload) -> Awaitable[Payload]: - username = utf8_decode(payload.data) - - logging.info(f'New user: {username}') - - session_id = SessionId(uuid.uuid4()) - self._session = UserSessionData(username, session_id) - chat_data.user_session_by_id[session_id] = self._session - - return create_response(ensure_bytes(session_id)) - - return router +Let's separate the `SocketAcceptor` creation from the `ServerApplication` class. Below is the contents of the new `Server` class: + +```java +package io.rsocket.guide.step2; + +import io.rsocket.ConnectionSetupPayload; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; +import io.rsocket.guide.step8.Session; +import io.rsocket.metadata.CompositeMetadata; +import io.rsocket.metadata.RoutingMetadata; +import io.rsocket.metadata.WellKnownMimeType; +import io.rsocket.util.DefaultPayload; +import reactor.core.publisher.Mono; + +import java.util.Objects; +import java.util.UUID; + +public class Server implements SocketAcceptor { + + @Override + public Mono accept(ConnectionSetupPayload setup, RSocket sendingSocket) { + final var session = new Session(); + session.sessionId = UUID.randomUUID().toString(); + + return Mono.just(new RSocket() { + public Mono requestResponse(Payload payload) { + final var route = requireRoute(payload); + + switch (route) { + case "login": + session.username = payload.getDataUtf8(); + return Mono.just(DefaultPayload.create(session.sessionId)); + } + + throw new RuntimeException("Unknown requestResponse route " + route); + } + + private String requireRoute(Payload payload) { + final var metadata = payload.sliceMetadata(); + final CompositeMetadata compositeMetadata = new CompositeMetadata(metadata, false); + + for (CompositeMetadata.Entry metadatum : compositeMetadata) { + if (Objects.requireNonNull(metadatum.getMimeType()) + .equals(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())) { + return new RoutingMetadata(metadatum.getContent()).iterator().next(); + } + } + + throw new IllegalStateException(); + } + }); + } +} ``` In order to keep a reference to the `UserSessionData` we will modify the request handler factory. From 2749aa034253de47e23b2f59b40a4c87a6317aae Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Fri, 16 Dec 2022 21:23:56 +0200 Subject: [PATCH 05/14] rsocket-java - initial guide user session --- content-docs/guides/rsocket-java/tutorial/00-base.mdx | 4 ++-- .../guides/rsocket-java/tutorial/01-request_routing.mdx | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/content-docs/guides/rsocket-java/tutorial/00-base.mdx b/content-docs/guides/rsocket-java/tutorial/00-base.mdx index 9e3520c0..304ec6fb 100644 --- a/content-docs/guides/rsocket-java/tutorial/00-base.mdx +++ b/content-docs/guides/rsocket-java/tutorial/00-base.mdx @@ -23,7 +23,7 @@ The server will listen on TCP port 6565. Below is the code for the ServerApplication class: ```java -package io.rsocket.guide.step0; +package io.rsocket.guide; import io.rsocket.Payload; import io.rsocket.RSocket; @@ -90,7 +90,7 @@ The client will connect to the server, send a single *response* request and disc Below is the code for the ClientApplication class: ```java -package io.rsocket.guide.step0; +package io.rsocket.guide; import io.rsocket.core.RSocketConnector; import io.rsocket.transport.netty.client.TcpClientTransport; diff --git a/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx b/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx index bfc7ec84..b38c04d6 100644 --- a/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx +++ b/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx @@ -66,7 +66,7 @@ which will wrap the RSocket client and provide the methods for interacting with Below is the complete code for the new `Client` class: ```java -package io.rsocket.guide.step1; +package io.rsocket.guide; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; From 1d928726f2e7014aba89b5c76eaaf1430c6ca09b Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Sat, 17 Dec 2022 13:52:42 +0200 Subject: [PATCH 06/14] rsocket-java - initial guide messaging --- content-docs/guides/index.mdx | 1 + .../rsocket-java/tutorial/02-user_session.mdx | 52 ++++------------ .../rsocket-java/tutorial/03-messages.mdx | 61 +++++++++++++++---- 3 files changed, 62 insertions(+), 52 deletions(-) diff --git a/content-docs/guides/index.mdx b/content-docs/guides/index.mdx index d0c5c617..bf482dc3 100644 --- a/content-docs/guides/index.mdx +++ b/content-docs/guides/index.mdx @@ -10,3 +10,4 @@ In this section you will find guides related to working with and consuming the v - [`rsocket-js`](./rsocket-js/index.mdx) - [`rsocket-py`](./rsocket-py/index.mdx) +- [`rsocket-java`](./rsocket-java/index.mdx) diff --git a/content-docs/guides/rsocket-java/tutorial/02-user_session.mdx b/content-docs/guides/rsocket-java/tutorial/02-user_session.mdx index 88fcc5e1..a27db692 100644 --- a/content-docs/guides/rsocket-java/tutorial/02-user_session.mdx +++ b/content-docs/guides/rsocket-java/tutorial/02-user_session.mdx @@ -86,49 +86,23 @@ public class Server implements SocketAcceptor { } ``` -In order to keep a reference to the `UserSessionData` we will modify the request handler factory. -The `ChatUserSession` class will keep the reference to the session data, and define the request routes. - -Below is the modified `handler_factory`: - -```py -class CustomRoutingRequestHandler(RoutingRequestHandler): - def __init__(self, session: ChatUserSession): - super().__init__(session.router_factory()) - self._session = session - -def handler_factory(): - return CustomRoutingRequestHandler(ChatUserSession()) -``` - -The `CustomRoutingRequestHandler` class (*Lines 1-4*) is the actual request handler which will wrap the `ChatUserSession` instance. - -Finally, we modify the `handler_factory` (*Lines 6-7*) to instantiate the handler and the session. +In order to keep a reference to the `Session` we will instantiate it in the `accept` method (*Line 21-22*) which serves as the scope for the current client connection. +The username provided in the login `Payload` will be stored in the session (*Line 30*). ## Client side -Below is the modified ChatClient: +We will modify the `Client` to store the username, to use it in output later on: ```py -from rsocket.extensions.helpers import composite, route -from rsocket.frame_helpers import ensure_bytes -from rsocket.payload import Payload -from rsocket.rsocket_client import RSocketClient - -class ChatClient: - def __init__(self, rsocket: RSocketClient): - self._rsocket = rsocket - self._session_id = None - self._username: Optional[str] = None - - async def login(self, username: str): - payload = Payload(ensure_bytes(username), composite(route('login'))) - response = await self._rsocket.request_response(payload) - self._session_id = response.data - self._username = username - return self +public Mono login(String username) { + this.username = username; + + final Payload payload = DefaultPayload.create( + Unpooled.wrappedBuffer(username.getBytes()), + route("login") + ); + return rSocket.requestResponse(payload); +} ``` -Instead of a greeting from the server, we now receive a session id in the response payload. *Line 14* stores this session on the client class. - -We also store the username on the client for later printing as part of incoming messages. +Instead of a greeting from the server, we now receive a session id in the response payload (*Line 8*). diff --git a/content-docs/guides/rsocket-java/tutorial/03-messages.mdx b/content-docs/guides/rsocket-java/tutorial/03-messages.mdx index 4f8af97e..0a2469a3 100644 --- a/content-docs/guides/rsocket-java/tutorial/03-messages.mdx +++ b/content-docs/guides/rsocket-java/tutorial/03-messages.mdx @@ -6,27 +6,62 @@ sidebar_label: Private messages Let's add private messaging between users. We will use a request-stream to listen for new messages from other users. -See resulting code on [GitHub](https://github.com/rsocket/rsocket-py/tree/master/examples/tutorial/step3) +See resulting code on [GitHub](https://github.com/rsocket/rsocket-java/tree/master/examples/tutorial/step3) ## Shared -Let's add an object representation of a message to the shared module: +Let's add an object representation of a message. Below is the contents of the `Message` class: -```py -from dataclasses import dataclass -from typing import Optional +```java +package io.rsocket.guide; + +public class Message { + public String user; + public String content; + + public Message() { + } + + public Message(String user, String content) { + this.user = user; + this.content = content; + } +} +``` -@dataclass(frozen=True) -class Message: - user: Optional[str] = None - content: Optional[str] = None +*Lines 3-6* defines a POJO with 2 fields: +- `user` : Name of the recipient user when sending a message, and the name of the sender when receiving it. +- `content` : The message body. + +We will use [json](https://docs.python.org/3/library/json.html) to serialize the messages for transport. We will use the jackson library to do this. +Add the following dependencies to the pom.xml: + +```xml + + com.fasterxml.jackson.core + jackson-databind + 2.14.1 + + + com.fasterxml.jackson.core + jackson-core + 2.14.1 + ``` -*Lines 4-7* defines a frozen dataclass with two fields: -- `user` : name of the recipient user when sending a message, and the name of the sender when receiving it. -- `content` : the message body. +We will also add a global storage in order to look up sessions of other users and deliver them messages. Add the `ChatData` class: + +```java +package io.rsocket.guide.step3; + +import java.util.HashMap; +import java.util.Map; + +public class ChatData { + public final Map sessionById = new HashMap<>(); +} +``` -We will use [json](https://docs.python.org/3/library/json.html) to serialize the messages for transport. Add the following helper methods to the shared module: ```py import json From aa983b35dbac0e4807219c447b3cf765f84321a6 Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Sat, 17 Dec 2022 13:58:03 +0200 Subject: [PATCH 07/14] rsocket-java - initial guide messaging --- .../rsocket-java/tutorial/03-messages.mdx | 312 +++++------------- 1 file changed, 89 insertions(+), 223 deletions(-) diff --git a/content-docs/guides/rsocket-java/tutorial/03-messages.mdx b/content-docs/guides/rsocket-java/tutorial/03-messages.mdx index 0a2469a3..b5d7bca2 100644 --- a/content-docs/guides/rsocket-java/tutorial/03-messages.mdx +++ b/content-docs/guides/rsocket-java/tutorial/03-messages.mdx @@ -62,267 +62,133 @@ public class ChatData { } ``` - -```py -import json -from typing import TypeVar, Type -from rsocket.frame_helpers import ensure_bytes -from rsocket.payload import Payload -from rsocket.helpers import utf8_decode - -def encode_dataclass(obj): - return ensure_bytes(json.dumps(obj.__dict__)) - -T = TypeVar('T') - -def decode_dataclass(data: bytes, cls: Type[T]) -> T: - return cls(**json.loads(utf8_decode(data))) - -def dataclass_to_payload(obj) -> Payload: - return Payload(encode_dataclass(obj)) -``` - -*Lines 7-8* Define a minimal dataclass json encoder which assumes all the fields in the dataclass are python primitives, or builtin collections of those. - -*Lines 10-13* Define the decoder counterpart of the above method. - -*Lines 15-16* Define a helper method for creating `Payload`s containing only a serialized `dataclass`. - ## Server side ### Data storage and helper methods -First we add a queue for incoming user messages: +Let's add a helper method to find sessions by username to the `Server` class: -```py -from dataclasses import dataclass, field -from asyncio import Queue - -@dataclass() -class UserSessionData: - ... - messages: Queue = field(default_factory=Queue) -``` - -*Line 7* defines a `messages` queue. These are private (and later on channel) messages to the user from other clients. - -```py -from typing import Optional - -def find_session_by_username(username: str) -> Optional[UserSessionData]: - try: - return next(session for session in chat_data.user_session_by_id.values() if - session.username == username) - except StopIteration: - return None +```java +public Mono findUserByName(final String username) { + return Flux.fromIterable(chatData.sessionById.entrySet()) + .filter(e -> e.getValue().username.equals(username)) + .map(e -> e.getValue()) + .single(); + } ``` -*Lines 3-8* define a helper for looking up a user's session by username. This will be used to deliver private messages. +TODO: explain ### Send messages -Next we will register a request-response endpoint for sending private messages: - -```py -import json -from typing import Awaitable - -from rsocket.helpers import create_response -from rsocket.payload import Payload -from rsocket.routing.request_router import RequestRouter -from shared import Message - -class ChatUserSession: +Next we will register a request-response endpoint for sending private messages in the `requestResponse` route switch case: - def router_factory(self): - router = RequestRouter() - - @router.response('message') - async def send_message(payload: Payload) -> Awaitable[Payload]: - message = Message(**json.loads(payload.data)) - - logging.info('Received message for user: %s', message.user) - - target_message = Message(self._session.username, message.content) - - session = find_session_by_username(message.user) - await session.messages.put(target_message) - - return create_response() +```java +case "message": + try { + final var message = objectMapper.readValue(payload.getDataUtf8(), Message.class); + final var targetMessage = new Message(session.username, message.content); + return findUserByName(message.user) + .doOnNext(targetSession -> targetSession.messages.add(targetMessage)) + .thenReturn(EmptyPayload.INSTANCE); + } catch (Exception exception) { + throw new RuntimeException(exception); + } ``` -*Lines 15-26* define the endpoint for sending messages. The Payload must contain a json serialized `Message` object. -The recipient's session is found (*Line 23*), and the message is placed in the user's message queue (*Line 24*). - -*Line 25* returns an empty `Payload` future using the `create_response` helper method. +TODO: explain ### Receive incoming messages As a last step on the server side, we register a request-stream endpoint which listens for incoming messages and sends them to the client: -```py -import asyncio - -from shared import encode_dataclass -from reactivestreams.publisher import DefaultPublisher -from reactivestreams.subscriber import Subscriber -from reactivestreams.subscription import DefaultSubscription -from rsocket.payload import Payload -from rsocket.routing.request_router import RequestRouter - -class ChatUserSession: - - def router_factory(self): - router = RequestRouter() - - @router.stream('messages.incoming') - async def messages_incoming(): - class MessagePublisher(DefaultPublisher, DefaultSubscription): - def __init__(self, session: UserSessionData): - self._session = session - self._sender = None - - def cancel(self): - self._sender.cancel() - - def subscribe(self, subscriber: Subscriber): - super(MessagePublisher, self).subscribe(subscriber) - subscriber.on_subscribe(self) - self._sender = asyncio.create_task(self._message_sender()) - - async def _message_sender(self): - while True: - next_message = await self._session.messages.get() - next_payload = Payload(encode_dataclass(next_message)) - self._subscriber.on_next(next_payload) +```java +public void messageSupplier(FluxSink sink) { + while (true) { + try { + final var message = session.messages.poll(20, TimeUnit.DAYS); + if (message != null) { + sink.next(DefaultPayload.create(objectMapper.writeValueAsString(message))); + } + } catch (Exception exception) { + break; + } + } +} - return MessagePublisher(self._session) +public Flux requestStream(String route, Payload payload) { + return Flux.defer(() -> { + switch (route) { + case "messages.incoming": + final var threadContainer = new AtomicReference(); + return Flux.create(sink -> sink.onRequest(n -> { + if (threadContainer.get() == null) { + final var thread = new Thread(() -> messageSupplier(sink)); + thread.start(); + threadContainer.set(thread); + } + }) + .onCancel(() -> threadContainer.get().interrupt()) + .onDispose(() -> threadContainer.get().interrupt())); + } + + throw new IllegalStateException(); + }); +} ``` -*Lines 15-36* define the endpoint for listening to new messages. - -*Lines 17-34* define the `Publisher` which will be returned. It is given access to the user's session (*Lines 18-19*) - -The only method on the `Publisher` interface is `subscribe` (*Lines 25-28*), which is the action taken upon the client sending a request to the route. -The `subscriber` argument represents the client side. In this method an asyncio [Task](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task) is started (*Line 28*) -which continuously sends the subscriber the messages intended for that user from the server side session queue (*Lines 30-34*) - -The same class will be used for canceling the stream (*Lines 22-23*), if triggered by the client. This is provided by the `Subscription` interface which -is provided to the client on *Line 27*. +TODO: explain ## Client side First let's add a client method for sending private messages: -```py -from shared import Message, encode_dataclass -from rsocket.extensions.helpers import composite, route -from rsocket.payload import Payload - -class ChatClient: - - async def private_message(self, username: str, content: str): - print(f'Sending {content} to user {username}') - - await self._rsocket.request_response(Payload(encode_dataclass(Message(username, content)), - composite(route('message')))) +```java +public Mono sendMessage(String data) { + final Payload payload = DefaultPayload.create(Unpooled.wrappedBuffer(data.getBytes()), + route("message") + ); + return rSocket.requestResponse(payload); +} ``` -*Line 10-11* creates a `Payload` with the messages and sends it to the 'message' route. +TODO: explain Next we add a method which will listen for incoming messages: -```py -import json -from typing import Optional - -from shared import Message -from reactivestreams.subscriber import DefaultSubscriber -from reactivestreams.subscription import DefaultSubscription -from rsocket.rsocket_client import RSocketClient -from rsocket.extensions.helpers import composite, route -from rsocket.payload import Payload - -class ChatClient: - def __init__(self, rsocket: RSocketClient): - ... - self._message_subscriber: Optional = None - - def listen_for_messages(self): - def print_message(data: bytes): - message = Message(**json.loads(data)) - print(f'{self._username}: from {message.user}: {message.content}') - - class MessageListener(DefaultSubscriber, DefaultSubscription): - def __init__(self): - - def on_next(self, value, is_complete=False): - print_message(value.data) - - def on_error(self, exception: Exception): - print(exception) - - def cancel(self): - self.subscription.cancel() - - self._message_subscriber = MessageListener() - self._rsocket.request_stream( - Payload(metadata=composite(route('messages.incoming'))) - ).subscribe(self._message_subscriber) +```java +public final AtomicReference incomingMessages = new AtomicReference<>(); + +public void listenForMessages() { + new Thread(() -> + { + Disposable subscribe = rSocket.requestStream(DefaultPayload.create(null, route("messages.incoming"))) + .doOnComplete(() -> System.out.println("Response from server stream completed")) + .doOnNext(response -> System.out.println("Response from server stream :: " + response.getDataUtf8())) + + .collectList() + .subscribe(); + incomingMessages.set(subscribe); + }).start(); +} - def stop_listening_for_messages(self): - self._message_subscriber.cancel() +public void stopListeningForMessages() { + incomingMessages.get().dispose(); +} ``` -*Lines 21-31* define the `Subscriber` which will listen for incoming messages and print them on the client side. - -An instance of the `MessageListener` is stored on the client (*Line 33*) to later allow stopping the incoming message stream. - -*Lines 34-36* send the request and subscribe to the resulting `Publisher`. - -The method in *Lines 38-39* can be used to stop the above message listener. +TODO: explain ### Test the new functionality -Finally, let's test the new functionality. Modify the `main` method in the client: - -```py -import asyncio - -from rsocket.extensions.mimetypes import WellKnownMimeTypes -from rsocket.helpers import single_transport_provider -from rsocket.rsocket_client import RSocketClient -from rsocket.transports.tcp import TransportTCP - -async def main(): - connection1 = await asyncio.open_connection('localhost', 6565) - - async with RSocketClient(single_transport_provider(TransportTCP(*connection1)), - metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA) as client1: - connection2 = await asyncio.open_connection('localhost', 6565) +Finally, let's test the new functionality. Modify the `ClientApplication.main` method: - async with RSocketClient(single_transport_provider(TransportTCP(*connection2)), - metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA) as client2: - user1 = ChatClient(client1) - user2 = ChatClient(client2) - - await user1.login('user1') - await user2.login('user2') - - user2.listen_for_messages() - - await user1.private_message('user2', 'private message from user1') - - await asyncio.sleep(3) - - user2.stop_listening_for_messages() +```java +client.listenForMessages(); +client.sendMessage("{\"user\":\"user1\", \"content\":\"message\"}"); +Thread.sleep(2000); +client.incomingMessages.get().dispose(); ``` -In this example, we open two rsocket connections to the server (*lines 9-12* and *lines 13-16*). - -*Lines 17-21* wrap the rsocket clients with the chat client adapter and login the two users. - -*Line 23* makes user2 listen for incoming messages, while *line 25* has user1 send a message to user2. - -Finally, *lines 27-29* make the application wait for 3 seconds, then stops user2 listening for messages. +TODO: explain From 0b46dc4cdc4085214cefce48c09f5a9b4ee7847c Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Sun, 18 Dec 2022 11:57:51 +0200 Subject: [PATCH 08/14] rsocket-java - experiment with importing shared content --- content-docs/guides/guide-shared/routing.mdx | 4 ++++ .../guides/rsocket-java/tutorial/01-request_routing.mdx | 7 +++---- .../guides/rsocket-py/tutorial/01-request_routing.mdx | 8 ++++---- 3 files changed, 11 insertions(+), 8 deletions(-) create mode 100644 content-docs/guides/guide-shared/routing.mdx diff --git a/content-docs/guides/guide-shared/routing.mdx b/content-docs/guides/guide-shared/routing.mdx new file mode 100644 index 00000000..a46757f6 --- /dev/null +++ b/content-docs/guides/guide-shared/routing.mdx @@ -0,0 +1,4 @@ +In the previous step we added a single request-response handler. In order to allow more than one functionality to use this handler, +(e.g. login, messages, join/leave chanel) they need to be distinguished from each other. To achieve this, each request to the server will +be identified by a [route](https://github.com/rsocket/rsocket/blob/master/Extensions/Routing.md). This is similar to paths in an HTTP URL where +each URL may handle one of the HTTP methods (eg. GET, POST). diff --git a/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx b/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx index b38c04d6..fd83ccd8 100644 --- a/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx +++ b/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx @@ -4,10 +4,9 @@ title: Request routing sidebar_label: Request routing --- -In the previous step we added a single request-response handler. In order to allow more than one functionality to use this handler, -(e.g. login, messages, join/leave chanel) they need to be distinguished from each other. To achieve this, each request to the server will -be identified by a [route](https://github.com/rsocket/rsocket/blob/master/Extensions/Routing.md). This is similar to paths in an HTTP URL where -each URL may handle one of the HTTP methods (eg. GET, POST). +import Routing from '../../guide-shared/routing.mdx' + + See resulting code on [GitHub](https://github.com/rsocket/rsocket-java/tree/master/examples/tutorial/step1) diff --git a/content-docs/guides/rsocket-py/tutorial/01-request_routing.mdx b/content-docs/guides/rsocket-py/tutorial/01-request_routing.mdx index a137b096..b0618825 100644 --- a/content-docs/guides/rsocket-py/tutorial/01-request_routing.mdx +++ b/content-docs/guides/rsocket-py/tutorial/01-request_routing.mdx @@ -4,9 +4,9 @@ title: Request routing sidebar_label: Request routing --- -The chat application will have various functionality (e.g. private messages and channels). Each request to the server will -be identified by a [route](https://github.com/rsocket/rsocket/blob/master/Extensions/Routing.md) (similar to paths in an HTTP URL). To implement this we will use the `RequestRouter` and `RoutingRequestHandler` -classes. +import Routing from '../../guide-shared/routing.mdx' + + See resulting code on [GitHub](https://github.com/rsocket/rsocket-py/tree/master/examples/tutorial/step1) @@ -16,7 +16,7 @@ We will modify the example from the previous step into a routed request response ### Routing request handler -The `handler_factory` method below replaces the `Handler` class from the previous step: +To implement routing we will use the `RequestRouter` and `RoutingRequestHandler` classes. The `handler_factory` method below replaces the `Handler` class from the previous step: ```py from typing import Awaitable From c53bfa7bc8bbbb531a6894b40817280134dea850 Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Sun, 18 Dec 2022 13:25:51 +0200 Subject: [PATCH 09/14] rsocket-java - correctly mark partial document with _ --- content-docs/guides/guide-shared/{routing.mdx => _routing.mdx} | 0 .../guides/rsocket-java/tutorial/01-request_routing.mdx | 2 +- content-docs/guides/rsocket-py/tutorial/01-request_routing.mdx | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) rename content-docs/guides/guide-shared/{routing.mdx => _routing.mdx} (100%) diff --git a/content-docs/guides/guide-shared/routing.mdx b/content-docs/guides/guide-shared/_routing.mdx similarity index 100% rename from content-docs/guides/guide-shared/routing.mdx rename to content-docs/guides/guide-shared/_routing.mdx diff --git a/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx b/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx index fd83ccd8..260b0421 100644 --- a/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx +++ b/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx @@ -4,7 +4,7 @@ title: Request routing sidebar_label: Request routing --- -import Routing from '../../guide-shared/routing.mdx' +import Routing from '../../guide-shared/_routing.mdx' diff --git a/content-docs/guides/rsocket-py/tutorial/01-request_routing.mdx b/content-docs/guides/rsocket-py/tutorial/01-request_routing.mdx index b0618825..ed827e91 100644 --- a/content-docs/guides/rsocket-py/tutorial/01-request_routing.mdx +++ b/content-docs/guides/rsocket-py/tutorial/01-request_routing.mdx @@ -4,7 +4,7 @@ title: Request routing sidebar_label: Request routing --- -import Routing from '../../guide-shared/routing.mdx' +import Routing from '../../guide-shared/_routing.mdx' From 71817ef14b9556368bcabca79b467c40025893e9 Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Sun, 18 Dec 2022 13:31:32 +0200 Subject: [PATCH 10/14] rsocket-java - fix docs referencing python in java --- .../guides/rsocket-java/tutorial/01-request_routing.mdx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx b/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx index 260b0421..fea217de 100644 --- a/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx +++ b/content-docs/guides/rsocket-java/tutorial/01-request_routing.mdx @@ -16,7 +16,7 @@ We will modify the example from the previous step into a routed request response ### Routing request handler -The `handler_factory` method below replaces the `Handler` class from the previous step: +Below is the modified code for instantiating `SocketAcceptor`: ```java final SocketAcceptor socketAcceptor = (setup, sendingSocket) -> Mono.just(new RSocket() { @@ -57,7 +57,7 @@ If no routing metadata is found (*Line 24*) an exception is thrown. ## Client side -Let's modify the client side to call this new routed request. For readability and maintainability, we will create a `ChatClient` +Let's modify the client side to call this new routed request. For readability and maintainability, we will create a `Client` which will wrap the RSocket client and provide the methods for interacting with the server. ### Client class From 8149737615b1b8f27cdd8c3b1e32916b74f090e9 Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Sun, 18 Dec 2022 19:44:16 +0200 Subject: [PATCH 11/14] rsocket-java - extract python/java shared guide text --- content-docs/guides/guide-shared/_preface.mdx | 10 ++++++++++ content-docs/guides/rsocket-java/tutorial/index.mdx | 11 ++--------- content-docs/guides/rsocket-py/tutorial/index.mdx | 11 ++--------- 3 files changed, 14 insertions(+), 18 deletions(-) create mode 100644 content-docs/guides/guide-shared/_preface.mdx diff --git a/content-docs/guides/guide-shared/_preface.mdx b/content-docs/guides/guide-shared/_preface.mdx new file mode 100644 index 00000000..bfde2f48 --- /dev/null +++ b/content-docs/guides/guide-shared/_preface.mdx @@ -0,0 +1,10 @@ +We will be creating a chat application with a server and a client. + +The chat client will have the following functionality: +- Private messages between users +- Joining and sending messages to channels +- Uploading/Downloading files +- Getting server and client statistics (e.g. number of channels) + +Since the emphasis is on showcasing as much RSocket functionality as possible, some examples may be either a bit contrived, or +be possible to implement in a different way using RSocket. This is left as an exercise to the reader. diff --git a/content-docs/guides/rsocket-java/tutorial/index.mdx b/content-docs/guides/rsocket-java/tutorial/index.mdx index 682ca732..dae02395 100644 --- a/content-docs/guides/rsocket-java/tutorial/index.mdx +++ b/content-docs/guides/rsocket-java/tutorial/index.mdx @@ -12,16 +12,9 @@ If you find a problem, code or otherwise, please report an [issue](https://githu ## Preface -We will be creating a chat application with a server and a client. +import Preface from '../../guide-shared/_preface.mdx' -The chat client will have the following functionality: -- Private messages between users -- Joining and sending messages to channels -- Uploading/Downloading files -- Getting server and client statistics (e.g. number of channels) - -Since the emphasis is on showcasing as much RSocket functionality as possible, some examples may be either a bit contrived, or -be possible to implement in a different way using RSocket. This is left as an exercise to the reader. + ## Required knowledge diff --git a/content-docs/guides/rsocket-py/tutorial/index.mdx b/content-docs/guides/rsocket-py/tutorial/index.mdx index 354ab791..0ea8c6a5 100644 --- a/content-docs/guides/rsocket-py/tutorial/index.mdx +++ b/content-docs/guides/rsocket-py/tutorial/index.mdx @@ -12,16 +12,9 @@ If you find a problem, code or otherwise, please report an [issue](https://githu ## Preface -We will be setting up a chat application with a server and a client. +import Preface from '../../guide-shared/_preface.mdx' -The chat client will have the following functionality: -- Private messages between users -- Joining and sending messages to channels -- Uploading/Downloading files -- Getting server and client statistics (e.g. number of channels) - -Since the emphasis is on showcasing as much RSocket functionality as possible, some of the examples may be either a bit contrived, or -be possible to implement in a different way using RSocket. This is left as an exercise to the reader. + In the first steps the code will be written using only the core code. This results in more verbose code, but prevents the need for additional packages need be installed. From c558479e45aa52542a88d1752003b8edf6735be0 Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Sun, 18 Dec 2022 20:02:26 +0200 Subject: [PATCH 12/14] rsocket-java - added more java sections --- .../rsocket-java/tutorial/04-channels.mdx | 267 +++++++++++++++++ .../guides/rsocket-java/tutorial/04-files.mdx | 211 ++++++++++++++ .../rsocket-java/tutorial/05-statistics.mdx | 272 ++++++++++++++++++ sidebar-rsocket-java.js | 5 +- 4 files changed, 754 insertions(+), 1 deletion(-) create mode 100644 content-docs/guides/rsocket-java/tutorial/04-channels.mdx create mode 100644 content-docs/guides/rsocket-java/tutorial/04-files.mdx create mode 100644 content-docs/guides/rsocket-java/tutorial/05-statistics.mdx diff --git a/content-docs/guides/rsocket-java/tutorial/04-channels.mdx b/content-docs/guides/rsocket-java/tutorial/04-channels.mdx new file mode 100644 index 00000000..2c8fc824 --- /dev/null +++ b/content-docs/guides/rsocket-java/tutorial/04-channels.mdx @@ -0,0 +1,267 @@ +--- +slug: /guides/rsocket-java/tutorial/channels +title: Channels +sidebar_label: Channels +--- + +In this section we will add basic channel support: +- Joining and leaving channels +- Sending messages to channels + +See resulting code on [GitHub](https://github.com/rsocket/rsocket-py/tree/master/examples/tutorial/step4) + +## Shared code + +Let's add a `channel` property to the `Message` class. It will contain the name of the channel the message is intended for: + +```java +public class Message { + + // existing fields + + public String channel; + + // existing constructors + + public Message(String user, String content, String channel) { + this.user = user; + this.content = content; + this.channel = channel; + } +} +``` + +## Server side + +### Data-classes +We will add functionality to store the channel state. Belows is the contents of the new `ChatChannel` class: + +```java +package io.rsocket.guide; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +public class ChatChannel { + public String name; + + final public BlockingQueue messages = new LinkedBlockingQueue<>(); + + final public AtomicReference messageRouter = new AtomicReference<>(); + + final public Set users = new HashSet<>(); +} +``` + +```java +public class ChatData { + + // existing fields + + public final Map channelByName = new HashMap<>(); +} +``` + +In the `channel_users` dict, the keys are channel names, and the value is a set of user session ids. A [WeakSet](https://docs.python.org/3/library/weakref.html#weakref.WeakSet) is used to automatically remove logged-out users. + +In the `channel_messages` dict, the keys are the channel names, and the value is a [Queue](https://docs.python.org/3/library/asyncio-queue.html) of messages sent by users to the channel. + +### Helper methods + +Next, we will define some helper methods for managing channel messages: +- `ensure_channel_exists`: initialize the data for a new channel if it doesn't exist. +- `channel_message_delivery`: an asyncio task which will deliver channel messages to all the users in a channel. + +```java +public void ensureChannel(String channelName) { + if (!chatData.channelByName.containsKey(channelName)) { + ChatChannel chatChannel = new ChatChannel(); + chatChannel.name = channelName; + chatData.channelByName.put(channelName, chatChannel); + final var thread = new Thread(() -> channelMessageRouter(channelName)); + thread.start(); + chatChannel.messageRouter.set(thread); + } +} +``` + +If the channel doesn't exist yet (*Line 2*) It will be added to the `channel_users` and `channel_messages` dictionaries. +*Line 5* starts an asyncio task (described below) which will deliver messages sent to the channel, to the channel's users. + +```java +public void channelMessageRouter(String channelName) { + final var channel = chatData.channelByName.get(channelName); + while (true) { + try { + final var message = channel.messages.poll(20, TimeUnit.DAYS); + if (message != null) { + for (String user : channel.users) { + findUserByName(user).doOnNext(session -> { + try { + session.messages.put(message); + } catch (InterruptedException exception) { + throw new RuntimeException(exception); + } + }).block(); + } + } + } catch (Exception exception) { + break; + } + } +} +``` + +The above method will loop infinitely and watch the `channel_messages` queue of the specified +channel (*Line 8*). Upon receiving a message, it will be delivered to all the users in the channel (*Lines 9-13*). + +### Join/Leave Channel + +Now let's add the channel join/leave handling request-response endpoints. + +```java +case "channel.join": + final var channelJoin = payload.getDataUtf8(); + ensureChannel(channelJoin); + join(channelJoin, session.sessionId); + return Mono.just(EmptyPayload.INSTANCE); +case "channel.leave": + leave(payload.getDataUtf8(), session.sessionId); + return Mono.just(EmptyPayload.INSTANCE); +``` + +### Send channel message + +Next we add the ability to send channel message. We will modify the `send_message` method: + +```java +case "message": + final var message = fromJson(payload.getDataUtf8(), Message.class); + final var targetMessage = new Message(session.username, message.content, message.channel); + + if (message.channel != null) { + chatData.channelByName.get(message.channel).messages.add(targetMessage); + } else { + + return findUserByName(message.user) + .doOnNext(targetSession -> targetSession.messages.add(targetMessage)) + .thenReturn(EmptyPayload.INSTANCE); + } +``` + +*Lines 16-20* decide whether it is a private message or a channel message, and add it to the relevant queue. + +### List channels + +```java +case "channels": + return Flux.fromIterable(chatData.channelByName.keySet()).map(DefaultPayload::create); +``` + +*Lines 6-11* define an endpoint for getting a list of channels. It uses the `StreamFromGenerator` helper. Note that the argument to this class +is a factory method for the [generator](https://docs.python.org/3/glossary.html#term-generator), not the generator itself. + +### Get channel users + +```java +case "channel.users": + return Flux.fromIterable(chatData.channelByName.getOrDefault(payload.getDataUtf8(), new ChatChannel()).users) + .map(DefaultPayload::create); +``` +*Lines 6-11* define an endpoint for getting a list of users in a given channel. The `find_username_by_session` helper method is used to +convert the session ids to usernames. + +If the channel does not exist (*Line 10*) the `EmptyStream` helper can be used as a response. + +## Client side + +We will add the methods on the `ChatClient` to interact with the new server functionality: + +```py +from typing import List + +from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket +from rsocket.extensions.helpers import composite, route +from rsocket.frame_helpers import ensure_bytes +from rsocket.payload import Payload +from rsocket.helpers import utf8_decode + +from shared import encode_dataclass + +class ChatClient: + + async def join(self, channel_name: str): + request = Payload(ensure_bytes(channel_name), composite(route('channel.join'))) + await self._rsocket.request_response(request) + return self + + async def leave(self, channel_name: str): + request = Payload(ensure_bytes(channel_name), composite(route('channel.leave'))) + await self._rsocket.request_response(request) + return self + + async def channel_message(self, channel: str, content: str): + print(f'Sending {content} to channel {channel}') + await self._rsocket.request_response(Payload(encode_dataclass(Message(channel=channel, content=content)), + composite(route('message')))) + + async def list_channels(self) -> List[str]: + request = Payload(metadata=composite(route('channels'))) + response = await AwaitableRSocket(self._rsocket).request_stream(request) + return list(map(lambda _: utf8_decode(_.data), response)) + + async def get_users(self, channel_name: str) -> List[str]: + request = Payload(ensure_bytes(channel_name), composite(route('channel.users'))) + users = await AwaitableRSocket(self._rsocket).request_stream(request) + return [utf8_decode(user.data) for user in users] +``` + +*Lines 15-23* define the join/leave methods. They are both simple routed `request_response` calls, with the channel name as the payload data. + +*Lines 25-28* define the list_channels method. This method uses the `AwaitableRSocket` adapter to simplify getting the response stream as a list. + +*Lines 30-31* define the get_users method, which lists a channel's users. + +Update the `print_message` method to include the channel: + +```py +def print_message(data: bytes): + message = Message(**json.loads(data)) + print(f'{self._username}: from {message.user} ({message.channel}): {message.content}') +``` + +Let's test the new functionality using the following code: + +```py +async def messaging_example(user1: ChatClient, user2: ChatClient): + user1.listen_for_messages() + user2.listen_for_messages() + + await user1.join('channel1') + await user2.join('channel1') + + print(f'Channels: {await user1.list_channels()}') + + await user1.private_message('user2', 'private message from user1') + await user1.channel_message('channel1', 'channel message from user1') + + await asyncio.sleep(1) + + user1.stop_listening_for_messages() + user2.stop_listening_for_messages() +``` + +Call the example method from the `main` method and pass it the two chat clients: + +```py +user1 = ChatClient(client1) +user2 = ChatClient(client2) + +await user1.login('user1') +await user2.login('user2') + +await messaging_example(user1, user2) +``` diff --git a/content-docs/guides/rsocket-java/tutorial/04-files.mdx b/content-docs/guides/rsocket-java/tutorial/04-files.mdx new file mode 100644 index 00000000..0dd14a0d --- /dev/null +++ b/content-docs/guides/rsocket-java/tutorial/04-files.mdx @@ -0,0 +1,211 @@ +--- +slug: /guides/rsocket-java/tutorial/files +title: File upload/download +sidebar_label: File upload/download +--- + +In this section we will add very basic file upload/download functionality. All files will be stored in memory, +and downloadable by all users. + +See resulting code on [GitHub](https://github.com/rsocket/rsocket-py/tree/master/examples/tutorial/step5) + +## Shared + +First, define a mimetype which will represent file names in the payloads. This will be used by both server and client, so +place it in the shared module: + +```py +chat_filename_mimetype = b'chat/file-name' +``` + +## Server side + +### Data-classes + +Next, we need a place to store the files in memory. Add a dictionary to the `ChatData` class to store the files. +The keys will be the file names, and the values the file content. + +```py +from dataclasses import dataclass, field +from typing import Dict + +@dataclass(frozen=True) +class ChatData: + ... + files: Dict[str, bytes] = field(default_factory=dict) +``` + +### Helper methods + +Next, define a helper method which extracts the filename from the upload/download payload: + +```py +from shared import chat_filename_mimetype +from rsocket.extensions.composite_metadata import CompositeMetadata +from rsocket.helpers import utf8_decode + +def get_file_name(composite_metadata: CompositeMetadata): + return utf8_decode(composite_metadata.find_by_mimetype(chat_filename_mimetype)[0].content) +``` + +This helper uses the `find_by_mimetype` method of `CompositeMetadata` to get a list of metadata items with the +specified mimetype. + +### Endpoints + +Next, register the request-response endpoints for uploading and downloading files, and for retrieving a list of +available files: + +```py +from typing import Awaitable + +from shared import chat_filename_mimetype +from rsocket.extensions.composite_metadata import CompositeMetadata +from rsocket.extensions.helpers import composite, metadata_item +from rsocket.frame_helpers import ensure_bytes +from rsocket.helpers import create_response +from rsocket.payload import Payload +from rsocket.routing.request_router import RequestRouter +from rsocket.streams.stream_from_generator import StreamFromGenerator + +class ChatUserSession: + + def router_factory(self): + router = RequestRouter() + + @router.response('file.upload') + async def upload_file(payload: Payload, composite_metadata: CompositeMetadata) -> Awaitable[Payload]: + chat_data.files[get_file_name(composite_metadata)] = payload.data + return create_response() + + @router.response('file.download') + async def download_file(composite_metadata: CompositeMetadata) -> Awaitable[Payload]: + file_name = get_file_name(composite_metadata) + return create_response(chat_data.files[file_name], + composite(metadata_item(ensure_bytes(file_name), chat_filename_mimetype))) + + @router.stream('files') + async def get_file_names() -> Publisher: + count = len(chat_data.files) + generator = ((Payload(ensure_bytes(file_name)), index == count) for (index, file_name) in + enumerate(chat_data.files.keys(), 1)) + return StreamFromGenerator(lambda: generator) +``` + +The `upload_file` and `download_file` methods (*Lines 18-27*) extract the filename from the metadata using the helper method we created, +and set and get the file content from the `chat_data` storage respectively. + +In this section we introduce the second argument which can be passed to routed endpoints. If the session is set up to use +composite metadata, the `composite_metadata` parameter will contain a parsed structure of the metadata in the request payload. + +Line 34 uses the `StreamFromGenerator` helper which creates a stream publisher from a generator factory. + +The generator must return a tuple of two values for each iteration: +- Payload instance +- boolean value denoting if it is the last element in the generator. +The argument for the helper class is a method which returns a generator, not the generator itself. + +### Large file support + +In the `download_file` method (Line 24), even though the frame size limit is 16MB, larger files can be downloaded. +To allow this, fragmentation must be enabled. This is done by adding the `fragment_size_bytes` argument to the `RSocketServer` instantiation: + +```py +from rsocket.rsocket_server import RSocketServer +from rsocket.transports.tcp import TransportTCP + +def session(*connection): + RSocketServer(TransportTCP(*connection), + handler_factory=handler_factory, + fragment_size_bytes=1_000_000) +``` + +## Client side + +### Methods + +On the client side, we will add 3 methods to access the new server functionality: +- `upload` +- `download` +- `list_files` + +```py +from typing import List + +from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket +from rsocket.extensions.helpers import composite, route, metadata_item +from rsocket.frame_helpers import ensure_bytes +from rsocket.helpers import utf8_decode +from rsocket.payload import Payload + +from shared import chat_filename_mimetype + +class ChatClient: + + async def upload(self, file_name: str, content: bytes): + await self._rsocket.request_response(Payload(content, composite( + route('file.upload'), + metadata_item(ensure_bytes(file_name), chat_filename_mimetype) + ))) + + async def download(self, file_name: str): + return await self._rsocket.request_response(Payload( + metadata=composite( + route('file.download'), + metadata_item(ensure_bytes(file_name), chat_filename_mimetype) + ))) + + async def list_files(self) -> List[str]: + request = Payload(metadata=composite(route('files'))) + response = await AwaitableRSocket(self._rsocket).request_stream(request) + return list(map(lambda _: utf8_decode(_.data), response)) +``` + +*Lines 13-17* define the upload method. the `Payload` of the request-response consists of a body with the file's contents, +and metadata which contains routing and the filename. To specify the filename a custom mimetype was used **chat/file-name**. +This mime type was used to create a metadata item using the `metadata_item` method. the `composite` method was used to combine +the two metadata items to the complete metadata of the payload. + +*Lines 19-24* define the download method. It is similar to the upload method, except for the absence of the payload data, +and a different route: 'file.download'. + +*Lines 26-32* defines the list_files method. Same as the `list_channels` method in the previous section, +it uses the request-stream 'files' endpoint to get a list of files. + +### Large file support + +Same as on the server size, fragmentation must be enabled to allow uploading files larger than 16MB. +This is done by adding the `fragment_size_bytes` argument to the `RSocketClient` instantiation. Do this for both clients: + +```py +from rsocket.extensions.mimetypes import WellKnownMimeTypes +from rsocket.helpers import single_transport_provider +from rsocket.rsocket_client import RSocketClient +from rsocket.transports.tcp import TransportTCP + +async with RSocketClient(single_transport_provider(TransportTCP(*connection1)), + metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA, + fragment_size_bytes=1_000_000) as client1: + ... +``` + +We will try out the new functionality with the following code: + +```py +async def files_example(user1: ChatClient, user2: ChatClient): + file_contents = b'abcdefg1234567' + file_name = 'file_name_1.txt' + + await user1.upload(file_name, file_contents) + + print(f'Files: {await user1.list_files()}') + + download = await user2.download(file_name) + + if download.data != file_contents: + raise Exception('File download failed') + else: + print(f'Downloaded file: {len(download.data)} bytes') +``` + +call the `files_example` method from the main client method. diff --git a/content-docs/guides/rsocket-java/tutorial/05-statistics.mdx b/content-docs/guides/rsocket-java/tutorial/05-statistics.mdx new file mode 100644 index 00000000..d6a8ee62 --- /dev/null +++ b/content-docs/guides/rsocket-java/tutorial/05-statistics.mdx @@ -0,0 +1,272 @@ +--- +slug: /guides/rsocket-java/tutorial/statistics +title: Statistics +sidebar_label: Statistics +--- + +As a last step, we will add passing some statistics between the client and the server: +- The client will be able to send its memory usage to the server. +- The server will report the number of users and channels. The client will be able to specify which of these statistics it wants. + +See resulting code on [GitHub](https://github.com/rsocket/rsocket-py/tree/master/examples/tutorial/step6) + +## Shared code + +We will define some data-classes to represent the payloads being sent between the client and server: + +```py +from dataclasses import dataclass, field +from typing import Optional, List + +@dataclass(frozen=True) +class ServerStatistics: + user_count: Optional[int] = None + channel_count: Optional[int] = None + +@dataclass() +class ServerStatisticsRequest: + ids: Optional[List[str]] = field(default_factory=lambda: ['users', 'channels']) + period_seconds: Optional[int] = field(default_factory=lambda: 2) + +@dataclass(frozen=True) +class ClientStatistics: + memory_usage: Optional[int] = None +``` + +*Lines 4-7* define the data sent to the client upon request. It contains two optional fields, the user count and the channel count. + +*Lines 9-12* define a request from the client which specified which statistics it wants and how often to report. The `ids` list +represents the two values in the `ServerStatistics` class. + +*Lines 14-16* define the statistics sent from the client to the server. + +## Server side + +# Data-classes + +First we will add a field on the `UserSessionData` to store the last statistics sent by the client: + +```py +from dataclasses import dataclass +from typing import Optional + +from shared import ClientStatistics + +@dataclass() +class UserSessionData: + ... + statistics: Optional[ClientStatistics] = None +``` + +### Endpoints + +We will add two endpoints, one for receiving from the client, and one for requesting specific statistics from the server. + +#### Client send statistics + +```py +import json + +from shared import ClientStatistics +from rsocket.helpers import utf8_decode +from rsocket.payload import Payload +from rsocket.routing.request_router import RequestRouter + +class ChatUserSession: + def router_factory(self): + router = RequestRouter() + + ... + + @router.fire_and_forget('statistics') + async def receive_statistics(payload: Payload): + statistics = ClientStatistics(**json.loads(utf8_decode(payload.data))) + self._session.statistics = statistics +``` + +*Lines 14-17* defines an endpoint for receiving statistics from the client. It uses the fire-and-forget request type, since this +data is not critical to the application. No return value is required from this method, and if provided will be ignored. + +#### Receive requested statistics + +We will add a helper method for creating a new statistics response: + +```py +def new_statistics_data(statistics_request: ServerStatisticsRequest): + statistics_data = {} + + if 'users' in statistics_request.ids: + statistics_data['user_count'] = len(chat_data.user_session_by_id) + + if 'channels' in statistics_request.ids: + statistics_data['channel_count'] = len(chat_data.channel_messages) + + return ServerStatistics(**statistics_data) +``` + +Next we define the endpoint for sending statistics to the client: + +```py +import asyncio +import json + +from shared import ClientStatistics, ServerStatisticsRequest, ServerStatistics, encode_dataclass +from reactivestreams.publisher import DefaultPublisher +from reactivestreams.subscriber import Subscriber, DefaultSubscriber +from reactivestreams.subscription import DefaultSubscription +from rsocket.helpers import utf8_decode +from rsocket.payload import Payload +from rsocket.routing.request_router import RequestRouter + +class ChatUserSession: + def router_factory(self): + router = RequestRouter() + + @router.channel('statistics') + async def send_statistics(): + + class StatisticsChannel(DefaultPublisher, DefaultSubscriber, DefaultSubscription): + + def __init__(self, session: UserSessionData): + super().__init__() + self._session = session + self._requested_statistics = ServerStatisticsRequest() + + def cancel(self): + self._sender.cancel() + + def subscribe(self, subscriber: Subscriber): + super().subscribe(subscriber) + subscriber.on_subscribe(self) + self._sender = asyncio.create_task(self._statistics_sender()) + + async def _statistics_sender(self): + while True: + try: + await asyncio.sleep(self._requested_statistics.period_seconds) + next_message = new_statistics_data(self._requested_statistics) + + self._subscriber.on_next(dataclass_to_payload(next_message)) + except Exception: + logging.error('Statistics', exc_info=True) + + def on_next(self, value: Payload, is_complete=False): + request = ServerStatisticsRequest(**json.loads(utf8_decode(value.data))) + + logging.info(f'Received statistics request {request.ids}, {request.period_seconds}') + + if request.ids is not None: + self._requested_statistics.ids = request.ids + + if request.period_seconds is not None: + self._requested_statistics.period_seconds = request.period_seconds + + response = StatisticsChannel(self._session) + + return response, response +``` + +*Lines 16-57* defines an endpoint for sending statistics to the client. It uses the request-channel request type, which will allow +the client to both receive the server statistics, and update the server as to which statistics it wants to receive. + +## Client side + +On the client side we will add the methods to access the new server side functionality: +- `send_statistics` +- `listen_for_statistics` + +```py +import resource + +from shared import ServerStatistics, ClientStatistics +from rsocket.extensions.helpers import composite, route +from rsocket.payload import Payload + +class ChatClient: + + async def send_statistics(self): + memory_usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + payload = Payload(encode_dataclass(ClientStatistics(memory_usage=memory_usage)), + metadata=composite(route('statistics'))) + await self._rsocket.fire_and_forget(payload) +``` + +The `send_statistics` uses a fire-and-forget request (*Line 15*) to send statistics to the server. This request does not receive a response, +so does not wait for confirmation that the payload was delivered, as it is not critical information (at least for this tutorial). + +Next we will request statistics from the server. First we will define a handler to listen on the channel request and control it: + +```py +import json +from asyncio import Event +from datetime import timedelta +from typing import List + +from examples.tutorial.step6.models import ServerStatistics, ServerStatisticsRequest, dataclass_to_payload +from reactivestreams.publisher import DefaultPublisher +from reactivestreams.subscriber import DefaultSubscriber +from reactivestreams.subscription import DefaultSubscription +from rsocket.helpers import utf8_decode +from rsocket.payload import Payload + +class StatisticsHandler(DefaultPublisher, DefaultSubscriber, DefaultSubscription): + + def __init__(self): + super().__init__() + self.done = Event() + + def on_next(self, value: Payload, is_complete=False): + statistics = ServerStatistics(**json.loads(utf8_decode(value.data))) + print(statistics) + + if is_complete: + self.done.set() + + def cancel(self): + self.subscription.cancel() + + def set_requested_statistics(self, ids: List[str]): + self._subscriber.on_next(dataclass_to_payload(ServerStatisticsRequest(ids=ids))) + + def set_period(self, period: timedelta): + self._subscriber.on_next( + dataclass_to_payload(ServerStatisticsRequest(period_seconds=int(period.total_seconds())))) +``` + +Next we will use this new handler in the `ChatClient`: + +```py +from rsocket.extensions.helpers import composite, route +from rsocket.payload import Payload + +class ChatClient: + + def listen_for_statistics(self) -> StatisticsHandler: + self._statistics_subscriber = StatisticsHandler() + self._rsocket.request_channel(Payload(metadata=composite( + route('statistics') + )), publisher=self._statistics_subscriber).subscribe(self._statistics_subscriber) + return self._statistics_subscriber + + def stop_listening_for_statistics(self): + self._statistics_subscriber.cancel() +``` + +Finally, let's try out this new functionality in the client: + +```py +async def statistics_example(user1): + await user1.send_statistics() + + statistics_control = user1.listen_for_statistics() + + await asyncio.sleep(5) + + statistics_control.set_requested_statistics(['users']) + + await asyncio.sleep(5) + + user1.stop_listening_for_statistics() +``` + +Call this new method from the client `main` method. diff --git a/sidebar-rsocket-java.js b/sidebar-rsocket-java.js index 3fceeb21..d3fd3dd3 100644 --- a/sidebar-rsocket-java.js +++ b/sidebar-rsocket-java.js @@ -7,7 +7,10 @@ module.exports = [ "guides/rsocket-java/tutorial/base", "guides/rsocket-java/tutorial/request_routing", "guides/rsocket-java/tutorial/user_session", - "guides/rsocket-java/tutorial/messages" + "guides/rsocket-java/tutorial/messages", + "guides/rsocket-java/tutorial/channels", + "guides/rsocket-java/tutorial/files", + "guides/rsocket-java/tutorial/statistics" ] } ]; From e698d973e7f04a07c00020c7059c9658173212df Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Mon, 19 Dec 2022 18:22:56 +0200 Subject: [PATCH 13/14] rsocket-java guide - minor fix --- content-docs/guides/rsocket-java/tutorial/04-channels.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/content-docs/guides/rsocket-java/tutorial/04-channels.mdx b/content-docs/guides/rsocket-java/tutorial/04-channels.mdx index 2c8fc824..6d5285b6 100644 --- a/content-docs/guides/rsocket-java/tutorial/04-channels.mdx +++ b/content-docs/guides/rsocket-java/tutorial/04-channels.mdx @@ -178,7 +178,7 @@ If the channel does not exist (*Line 10*) the `EmptyStream` helper can be used a ## Client side -We will add the methods on the `ChatClient` to interact with the new server functionality: +We will add the methods on the `Client` to interact with the new server functionality: ```py from typing import List From 66fdf1734979f17ca35952898f6778997d0f5d59 Mon Sep 17 00:00:00 2001 From: jell-o-fishi Date: Fri, 23 Dec 2022 13:05:37 +0200 Subject: [PATCH 14/14] rsocket-java guide - work --- content-docs/guides/rsocket-java/index.mdx | 44 +----- .../rsocket-java/tutorial/05-statistics.mdx | 131 +++++++++++------- 2 files changed, 83 insertions(+), 92 deletions(-) diff --git a/content-docs/guides/rsocket-java/index.mdx b/content-docs/guides/rsocket-java/index.mdx index a2fdfa48..647c3f65 100644 --- a/content-docs/guides/rsocket-java/index.mdx +++ b/content-docs/guides/rsocket-java/index.mdx @@ -4,48 +4,8 @@ title: rsocket-java sidebar_label: Introduction --- -:::caution -The python package API is not stable. There may be changes until version 1.0.0. -::: - -The python `rsocket` package implements the 1.0 version of the [RSocket protocol](/about/protocol) -(excluding "resume" functionality) and is designed for use in python >= 3.8 using asyncio. +The java `rsocket` package implements the 1.0 version of the [RSocket protocol](/about/protocol). ## Guides -See [Quick Start](/guides/rsocket-py/simple) for a short getting started guide, and [Tutorial](/guides/rsocket-py/tutorial) for a more in depth -step by step construction of an application. - -Use the [Command-line tool](/guides/rsocket-py/cli) to quicly interact with an RSocket server without writing code. - -Other code snippets examples for [Client](/guides/rsocket-py/client), [Server](/guides/rsocket-py/server) -and [RxPy](/guides/rsocket-py/rxpy) integration are also available. - -API Documentation (Under construction) is available at [ReadTheDocs](https://rsocket.readthedocs.io/) - -## Installing - -A pip package is available when installing with -'pip install rsocket' ([rsocket](https://pypi.org/project/rsocket/)) - -Optionally, install using some extras: - -- rx: RxPy3 client -- reactivex: RxPy4 client -- aiohttp: Websocket server/client transport for aiohttp framework -- quart: Websocket server transport for quart framework -- quic: QUIC/HTTP3(wss) support -- cli: Command line interface - -## Status - -The following are currently implemented: - -- RSocketClient / RSocketServer -- Transports: -- TCP -- QUIC/HTTP3(wss) -- Websocket (aiohttp (server/client), quart (server) -- Simple load balancing -- Minimal integration with RxPy (>= 3.x) and reactivex -- Command line interface +See [Tutorial](/guides/rsocket-java/tutorial) for a step by step construction of an application. diff --git a/content-docs/guides/rsocket-java/tutorial/05-statistics.mdx b/content-docs/guides/rsocket-java/tutorial/05-statistics.mdx index d6a8ee62..b496091b 100644 --- a/content-docs/guides/rsocket-java/tutorial/05-statistics.mdx +++ b/content-docs/guides/rsocket-java/tutorial/05-statistics.mdx @@ -12,76 +12,107 @@ See resulting code on [GitHub](https://github.com/rsocket/rsocket-py/tree/master ## Shared code -We will define some data-classes to represent the payloads being sent between the client and server: +We will define some POJOs to represent the payloads being sent between the client and server. -```py -from dataclasses import dataclass, field -from typing import Optional, List - -@dataclass(frozen=True) -class ServerStatistics: - user_count: Optional[int] = None - channel_count: Optional[int] = None - -@dataclass() -class ServerStatisticsRequest: - ids: Optional[List[str]] = field(default_factory=lambda: ['users', 'channels']) - period_seconds: Optional[int] = field(default_factory=lambda: 2) - -@dataclass(frozen=True) -class ClientStatistics: - memory_usage: Optional[int] = None +:::note +The Jackson JSON annotations are optional. They are only required for compatibility with the client/server implementations of the other languages. +::: + +A `ServerStatistics` will hold the server channel and user count: + +```java +import com.fasterxml.jackson.annotation.JsonProperty; + +public class ServerStatistic { + @JsonProperty("user_count") + public Integer userCount; + + @JsonProperty("channel_count") + public Integer channelCount; + + public ServerStatistic() { + } + + public ServerStatistic(Integer userCount, Integer channelCount) { + this.userCount = userCount; + this.channelCount = channelCount; + } +} ``` -*Lines 4-7* define the data sent to the client upon request. It contains two optional fields, the user count and the channel count. +A `ClientStatistics` will hold the client's memory usage: +```java +import com.fasterxml.jackson.annotation.JsonProperty; -*Lines 9-12* define a request from the client which specified which statistics it wants and how often to report. The `ids` list -represents the two values in the `ServerStatistics` class. +public class ClientStatistics { + @JsonProperty("memory_usage") + public Long memoryUsage; -*Lines 14-16* define the statistics sent from the client to the server. + public ClientStatistics() { + } -## Server side + public ClientStatistics(Long memoryUsage) { + this.memoryUsage = memoryUsage; + } +} +``` -# Data-classes +And finally, the client will use a `StatisticsSettings` instance to tell the server which statistics it wants and how often: +```java +import com.fasterxml.jackson.annotation.JsonProperty; -First we will add a field on the `UserSessionData` to store the last statistics sent by the client: +public class ServerStatistic { + @JsonProperty("user_count") + public Integer userCount; -```py -from dataclasses import dataclass -from typing import Optional + @JsonProperty("channel_count") + public Integer channelCount; -from shared import ClientStatistics + public ServerStatistic() { + } -@dataclass() -class UserSessionData: - ... - statistics: Optional[ClientStatistics] = None + public ServerStatistic(Integer userCount, Integer channelCount) { + this.userCount = userCount; + this.channelCount = channelCount; + } +} ``` -### Endpoints +## Server side -We will add two endpoints, one for receiving from the client, and one for requesting specific statistics from the server. +### Session -#### Client send statistics +First we will add fields on the `Session` class to hold statistics and statistics-settings sent from the client: -```py -import json +```java +public class Session { -from shared import ClientStatistics -from rsocket.helpers import utf8_decode -from rsocket.payload import Payload -from rsocket.routing.request_router import RequestRouter + public StatisticsSettings statisticsSettings = new StatisticsSettings(); -class ChatUserSession: - def router_factory(self): - router = RequestRouter() + public ClientStatistics clientStatistics; +} +``` + +### Endpoints + +We will add two endpoints, one for receiving from the client, and one for sending specific statistics from the server. + +#### Client sent statistics + +```java +public Mono fireAndForget(Payload payload) { + final var route = requireRoute(payload); - ... + return Mono.defer(() -> { + switch (route) { + case "statistics": + session.clientStatistics = fromJson(payload.getDataUtf8(), ClientStatistics.class); + return Mono.empty(); + } - @router.fire_and_forget('statistics') - async def receive_statistics(payload: Payload): - statistics = ClientStatistics(**json.loads(utf8_decode(payload.data))) - self._session.statistics = statistics + throw new IllegalStateException(); + }); +} ``` *Lines 14-17* defines an endpoint for receiving statistics from the client. It uses the fire-and-forget request type, since this