From cead0900d573bbbb4c1e2ef36037618a70e86244 Mon Sep 17 00:00:00 2001 From: Sanjay Akut Date: Sun, 22 Jun 2025 12:02:29 -0400 Subject: [PATCH 01/10] feat: Add database schema for multi-user chat collaboration - Add ChatSessionParticipant table to track participants and their roles - Add ChatInvitation table for managing invitations - Add collaboration_enabled field to ChatSession - Add sender_id to ChatMessage to track message authors - Create migration and update models/enums --- ...2abca_add_multi_user_chat_collaboration.py | 118 ++++++++++++++++++ backend/onyx/db/enums.py | 18 +++ backend/onyx/db/models.py | 90 +++++++++++-- 3 files changed, 219 insertions(+), 7 deletions(-) create mode 100644 backend/alembic/versions/26a9d522abca_add_multi_user_chat_collaboration.py diff --git a/backend/alembic/versions/26a9d522abca_add_multi_user_chat_collaboration.py b/backend/alembic/versions/26a9d522abca_add_multi_user_chat_collaboration.py new file mode 100644 index 00000000000..bce1298d7e7 --- /dev/null +++ b/backend/alembic/versions/26a9d522abca_add_multi_user_chat_collaboration.py @@ -0,0 +1,118 @@ +"""add_multi_user_chat_collaboration + +Revision ID: 26a9d522abca +Revises: ffc707a226b4 +Create Date: 2025-06-22 16:05:10.592181 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = '26a9d522abca' +down_revision = 'ffc707a226b4' +branch_labels = None +depends_on = None + +# Define Enum types for roles and statuses +chat_participant_role = sa.Enum('OWNER', 'COLLABORATOR', name='chatparticipantrole') +chat_invitation_status = sa.Enum('PENDING', 'ACCEPTED', 'DECLINED', name='chatinvitationstatus') + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + + # Create new ENUM types in the database + chat_participant_role.create(op.get_bind(), checkfirst=True) + chat_invitation_status.create(op.get_bind(), checkfirst=True) + + # Add collaboration_enabled to chat_session table + op.add_column( + 'chat_session', + sa.Column('collaboration_enabled', sa.Boolean(), server_default=sa.text('false'), nullable=False) + ) + + # Add sender_id to chat_message table + op.add_column( + 'chat_message', + sa.Column('sender_id', postgresql.UUID(as_uuid=True), nullable=True) + ) + op.create_foreign_key( + 'fk_chat_message_sender_id_user', + 'chat_message', 'user', + ['sender_id'], ['id'], + ondelete='SET NULL' + ) + + # Create chat_session_participant table + op.create_table('chat_session_participant', + sa.Column('chat_session_id', postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('user_id', postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('role', chat_participant_role, nullable=False), + sa.Column('joined_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False), + sa.Column('last_read_message_id', sa.Integer(), nullable=True), + sa.ForeignKeyConstraint(['chat_session_id'], ['chat_session.id'], name=op.f('fk_chat_session_participant_chat_session_id_chat_session'), ondelete='CASCADE'), + sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_chat_session_participant_user_id_user'), ondelete='CASCADE'), + sa.ForeignKeyConstraint(['last_read_message_id'], ['chat_message.id'], name=op.f('fk_chat_session_participant_last_read_message_id_chat_message'), ondelete='SET NULL'), + sa.PrimaryKeyConstraint('chat_session_id', 'user_id', name=op.f('pk_chat_session_participant')) + ) + op.create_index(op.f('ix_chat_session_participant_chat_session_id'), 'chat_session_participant', ['chat_session_id'], unique=False) + op.create_index(op.f('ix_chat_session_participant_user_id'), 'chat_session_participant', ['user_id'], unique=False) + + # Create chat_invitation table + op.create_table('chat_invitation', + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('chat_session_id', postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('inviter_id', postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('invitee_id', postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('status', chat_invitation_status, server_default='PENDING', nullable=False), + sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False), + sa.ForeignKeyConstraint(['chat_session_id'], ['chat_session.id'], name=op.f('fk_chat_invitation_chat_session_id_chat_session'), ondelete='CASCADE'), + sa.ForeignKeyConstraint(['inviter_id'], ['user.id'], name=op.f('fk_chat_invitation_inviter_id_user'), ondelete='CASCADE'), + sa.ForeignKeyConstraint(['invitee_id'], ['user.id'], name=op.f('fk_chat_invitation_invitee_id_user'), ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id', name=op.f('pk_chat_invitation')) + ) + op.create_index(op.f('ix_chat_invitation_chat_session_id'), 'chat_invitation', ['chat_session_id'], unique=False) + op.create_index(op.f('ix_chat_invitation_invitee_id'), 'chat_invitation', ['invitee_id'], unique=False) + + # Data migration: Populate chat_session_participant with existing owners + bind = op.get_bind() + session = sa.orm.Session(bind=bind) + + session.execute(sa.text(""" + INSERT INTO chat_session_participant (chat_session_id, user_id, role, joined_at) + SELECT id, user_id, 'OWNER', time_created + FROM chat_session + WHERE user_id IS NOT NULL; + """)) + + # Data migration: Populate sender_id for existing messages + # Assuming the message sender is the chat session owner for existing 'USER' type messages + session.execute(sa.text(""" + UPDATE chat_message + SET sender_id = cs.user_id + FROM chat_session cs + WHERE chat_message.chat_session_id = cs.id + AND cs.user_id IS NOT NULL + AND chat_message.message_type = 'USER'; + """)) + + session.commit() + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table('chat_invitation') + op.drop_table('chat_session_participant') + + op.drop_constraint('fk_chat_message_sender_id_user', 'chat_message', type_='foreignkey') + op.drop_column('chat_message', 'sender_id') + + op.drop_column('chat_session', 'collaboration_enabled') + + # Drop ENUM types from the database + chat_invitation_status.drop(op.get_bind(), checkfirst=True) + chat_participant_role.drop(op.get_bind(), checkfirst=True) + # ### end Alembic commands ### diff --git a/backend/onyx/db/enums.py b/backend/onyx/db/enums.py index 0730096990a..1f969fe32b2 100644 --- a/backend/onyx/db/enums.py +++ b/backend/onyx/db/enums.py @@ -78,6 +78,24 @@ class ChatSessionSharedStatus(str, PyEnum): PRIVATE = "private" +# --------------------------------------------------------------------------- +# Multi-user Chat Collaboration Enums +# --------------------------------------------------------------------------- +class ChatParticipantRole(str, PyEnum): + """Role of a participant in a collaborative chat session.""" + + OWNER = "OWNER" + COLLABORATOR = "COLLABORATOR" + + +class ChatInvitationStatus(str, PyEnum): + """Status of an invitation to join a collaborative chat session.""" + + PENDING = "PENDING" + ACCEPTED = "ACCEPTED" + DECLINED = "DECLINED" + + class ConnectorCredentialPairStatus(str, PyEnum): SCHEDULED = "SCHEDULED" INITIAL_INDEXING = "INITIAL_INDEXING" diff --git a/backend/onyx/db/models.py b/backend/onyx/db/models.py index 8c2074fb7fc..59cd9141ac2 100644 --- a/backend/onyx/db/models.py +++ b/backend/onyx/db/models.py @@ -48,20 +48,21 @@ from onyx.configs.constants import MessageType from onyx.db.enums import ( AccessType, + ChatInvitationStatus, + ChatParticipantRole, + ChatSessionSharedStatus, + ConnectorCredentialPairStatus, EmbeddingPrecision, IndexingMode, - SyncType, + IndexModelStatus, SyncStatus, + SyncType, + TaskStatus, ) from onyx.configs.constants import NotificationType from onyx.configs.constants import SearchFeedbackType from onyx.configs.constants import TokenRateLimitScope from onyx.connectors.models import InputType -from onyx.db.enums import ChatSessionSharedStatus -from onyx.db.enums import ConnectorCredentialPairStatus -from onyx.db.enums import IndexingStatus -from onyx.db.enums import IndexModelStatus -from onyx.db.enums import TaskStatus from onyx.db.pydantic_type import PydanticType from onyx.utils.logger import setup_logger from onyx.utils.special_types import JSON_ro @@ -862,7 +863,7 @@ class SearchSettings(Base): ) def __repr__(self) -> str: - return f"" @property @@ -1262,6 +1263,9 @@ class ChatSession(Base): Enum(ChatSessionSharedStatus, native_enum=False), default=ChatSessionSharedStatus.PRIVATE, ) + collaboration_enabled: Mapped[bool] = mapped_column( + Boolean, default=False, nullable=False + ) folder_id: Mapped[int | None] = mapped_column( ForeignKey("chat_folder.id"), nullable=True ) @@ -1304,6 +1308,16 @@ class ChatSession(Base): "ChatMessage", back_populates="chat_session", cascade="all, delete-orphan" ) persona: Mapped["Persona"] = relationship("Persona") + participants: Mapped[list["ChatSessionParticipant"]] = relationship( + "ChatSessionParticipant", + back_populates="chat_session", + cascade="all, delete-orphan", + ) + invitations: Mapped[list["ChatInvitation"]] = relationship( + "ChatInvitation", + back_populates="chat_session", + cascade="all, delete-orphan", + ) class ChatMessage(Base): @@ -1321,6 +1335,9 @@ class ChatMessage(Base): chat_session_id: Mapped[UUID] = mapped_column( PGUUID(as_uuid=True), ForeignKey("chat_session.id") ) + sender_id: Mapped[UUID | None] = mapped_column( + ForeignKey("user.id", ondelete="SET NULL"), nullable=True + ) alternate_assistant_id = mapped_column( Integer, ForeignKey("persona.id"), nullable=True @@ -1357,6 +1374,7 @@ class ChatMessage(Base): refined_answer_improvement: Mapped[bool] = mapped_column(Boolean, nullable=True) chat_session: Mapped[ChatSession] = relationship("ChatSession") + sender: Mapped[User | None] = relationship("User", foreign_keys=[sender_id]) prompt: Mapped[Optional["Prompt"]] = relationship("Prompt") chat_message_feedbacks: Mapped[list["ChatMessageFeedback"]] = relationship( @@ -1394,6 +1412,64 @@ class ChatMessage(Base): ) +class ChatSessionParticipant(Base): + __tablename__ = "chat_session_participant" + + chat_session_id: Mapped[UUID] = mapped_column( + PGUUID(as_uuid=True), + ForeignKey("chat_session.id", ondelete="CASCADE"), + primary_key=True, + ) + user_id: Mapped[UUID] = mapped_column( + ForeignKey("user.id", ondelete="CASCADE"), primary_key=True + ) + role: Mapped[ChatParticipantRole] = mapped_column( + Enum(ChatParticipantRole, native_enum=False), nullable=False + ) + joined_at: Mapped[datetime.datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now(), nullable=False + ) + last_read_message_id: Mapped[int | None] = mapped_column( + ForeignKey("chat_message.id", ondelete="SET NULL"), nullable=True + ) + + chat_session: Mapped["ChatSession"] = relationship( + "ChatSession", back_populates="participants" + ) + user: Mapped["User"] = relationship("User", foreign_keys=[user_id]) + + +class ChatInvitation(Base): + __tablename__ = "chat_invitation" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + chat_session_id: Mapped[UUID] = mapped_column( + PGUUID(as_uuid=True), + ForeignKey("chat_session.id", ondelete="CASCADE"), + nullable=False, + ) + inviter_id: Mapped[UUID] = mapped_column( + ForeignKey("user.id", ondelete="CASCADE"), nullable=False + ) + invitee_id: Mapped[UUID] = mapped_column( + ForeignKey("user.id", ondelete="CASCADE"), nullable=False + ) + status: Mapped[ChatInvitationStatus] = mapped_column( + Enum(ChatInvitationStatus, native_enum=False), + default=ChatInvitationStatus.PENDING, + nullable=False, + ) + created_at: Mapped[datetime.datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now(), nullable=False + ) + + chat_session: Mapped["ChatSession"] = relationship( + "ChatSession", back_populates="invitations" + ) + inviter: Mapped["User"] = relationship("User", foreign_keys=[inviter_id]) + invitee: Mapped["User"] = relationship("User", foreign_keys=[invitee_id]) + + class ChatFolder(Base): """For organizing chat sessions""" From 893871cdc9345dac4a9f1b5e59d9ffa4145ac76f Mon Sep 17 00:00:00 2001 From: Sanjay Akut Date: Sun, 22 Jun 2025 12:05:34 -0400 Subject: [PATCH 02/10] feat: Add database helpers and access control for multi-user chat - Create chat_collaboration.py with comprehensive helper functions - Update create_new_chat_message to support sender_id parameter - Update get_chat_session_by_id to check participant access - Add user_can_access_chat_session with collaboration awareness - Update get_chat_message to allow participant access --- backend/onyx/access/access.py | 73 ++++++ backend/onyx/db/chat.py | 34 ++- backend/onyx/db/chat_collaboration.py | 365 ++++++++++++++++++++++++++ 3 files changed, 470 insertions(+), 2 deletions(-) create mode 100644 backend/onyx/db/chat_collaboration.py diff --git a/backend/onyx/access/access.py b/backend/onyx/access/access.py index b45694ff9b4..def9103b9cf 100644 --- a/backend/onyx/access/access.py +++ b/backend/onyx/access/access.py @@ -7,6 +7,11 @@ from onyx.db.document import get_access_info_for_documents from onyx.db.models import User from onyx.utils.variable_functionality import fetch_versioned_implementation +from onyx.db.models import ( + ChatSession, + ChatSessionParticipant, + ChatSessionSharedStatus, +) def _get_access_for_document( @@ -107,3 +112,71 @@ def get_acl_for_user(user: User | None, db_session: Session | None = None) -> se "onyx.access.access", "_get_acl_for_user" ) return versioned_acl_for_user_fn(user, db_session) # type: ignore + +# --------------------------------------------------------------------------- +# Chat-session access helpers (multi-user collaboration aware) +# --------------------------------------------------------------------------- + +def _user_can_access_chat_session( + chat_session: ChatSession, + user: User | None, + db_session: Session, +) -> bool: + """ + Determines whether the given `user` can access the provided `chat_session`. + + Access rules (in precedence order): + 1. Session is PUBLIC -> anyone can access. + 2. `user` is None -> cannot access (except #1). + 3. `user` is the OWNER -> can access. + 4. Collaboration is enabled + *AND* `user` is a participant + (role OWNER or COLLABORATOR) -> can access. + 5. Otherwise -> access denied. + """ + + # 1. Publicly shared chat sessions are accessible by anyone. + if chat_session.shared_status == ChatSessionSharedStatus.PUBLIC: + return True + + # 2. No user present (unauthenticated) – cannot access private sessions. + if user is None: + return False + + # 3. The owner of the chat session always has access. + if chat_session.user_id == user.id: + return True + + # 4. If collaboration is enabled, check participant list. + if chat_session.collaboration_enabled: + participant_exists = ( + db_session.query(ChatSessionParticipant) + .filter( + ChatSessionParticipant.chat_session_id == chat_session.id, + ChatSessionParticipant.user_id == user.id, + ) + .first() + is not None + ) + if participant_exists: + return True + + # 5. All other cases – deny access. + return False + + +def user_can_access_chat_session( + chat_session: ChatSession, + user: User | None, + db_session: Session, +) -> bool: + """ + Public wrapper that supports EE overrides via `fetch_versioned_implementation`, + mirroring the pattern used elsewhere in this module. + """ + + versioned_fn = fetch_versioned_implementation( + "onyx.access.access", "_user_can_access_chat_session" + ) + # mypy: ignore dynamic dispatch based on EE overrides + return versioned_fn(chat_session, user, db_session) # type: ignore diff --git a/backend/onyx/db/chat.py b/backend/onyx/db/chat.py index bd30ca9e930..e425eb8de77 100644 --- a/backend/onyx/db/chat.py +++ b/backend/onyx/db/chat.py @@ -40,6 +40,7 @@ from onyx.db.models import ChatMessage__SearchDoc from onyx.db.models import ChatSession from onyx.db.models import ChatSessionSharedStatus +from onyx.db.models import ChatSessionParticipant from onyx.db.models import Prompt from onyx.db.models import SearchDoc from onyx.db.models import SearchDoc as DBSearchDoc @@ -69,6 +70,7 @@ def get_chat_session_by_id( include_deleted: bool = False, is_shared: bool = False, ) -> ChatSession: + # Base select for the chat session stmt = select(ChatSession).where(ChatSession.id == chat_session_id) if is_shared: @@ -77,8 +79,21 @@ def get_chat_session_by_id( # if user_id is None, assume this is an admin who should be able # to view all chat sessions if user_id is not None: - stmt = stmt.where( - or_(ChatSession.user_id == user_id, ChatSession.user_id.is_(None)) + # Allow owners OR collaborators (participants) to access the session. + # Use an outer join to ChatSessionParticipant so the query still works + # when there are no participants yet. + stmt = ( + stmt.outerjoin( + ChatSessionParticipant, + ChatSessionParticipant.chat_session_id == ChatSession.id, + ) + .where( + or_( + ChatSession.user_id == user_id, + ChatSessionParticipant.user_id == user_id, + ChatSession.user_id.is_(None), + ) + ) ) result = db_session.execute(stmt) @@ -429,6 +444,13 @@ def get_chat_message( logger.error( f"User {user_id} tried to fetch a chat message that does not belong to them" ) + # If collaboration is enabled, allow participants to access. + if chat_message.chat_session.collaboration_enabled: + participant_ids = [ + p.user_id for p in chat_message.chat_session.participants + ] + if user_id in participant_ids: + return chat_message raise ValueError("Chat message does not belong to user") return chat_message @@ -644,7 +666,13 @@ def create_new_chat_message( overridden_model: str | None = None, refined_answer_improvement: bool | None = None, is_agentic: bool = False, + # NEW: sender of the message (owner or collaborator) + sender_id: UUID | None = None, ) -> ChatMessage: + # Fallback to chat session owner if sender not specified + if sender_id is None: + sender_id = parent_message.chat_session.user_id + if reserved_message_id is not None: # Edit existing message existing_message = db_session.query(ChatMessage).get(reserved_message_id) @@ -661,6 +689,7 @@ def create_new_chat_message( existing_message.citations = citations existing_message.files = files existing_message.tool_call = tool_call + existing_message.sender_id = sender_id existing_message.error = error existing_message.alternate_assistant_id = alternate_assistant_id existing_message.overridden_model = overridden_model @@ -683,6 +712,7 @@ def create_new_chat_message( tool_call=tool_call, error=error, alternate_assistant_id=alternate_assistant_id, + sender_id=sender_id, overridden_model=overridden_model, refined_answer_improvement=refined_answer_improvement, is_agentic=is_agentic, diff --git a/backend/onyx/db/chat_collaboration.py b/backend/onyx/db/chat_collaboration.py new file mode 100644 index 00000000000..3da48c67811 --- /dev/null +++ b/backend/onyx/db/chat_collaboration.py @@ -0,0 +1,365 @@ +"""Database helpers for multi-user chat collaboration features.""" +from uuid import UUID + +from sqlalchemy.orm import Session, joinedload + +from onyx.db.chat import get_chat_session_by_id +from onyx.db.models import ChatInvitation, ChatSession, ChatSessionParticipant, User +from onyx.db.enums import ChatInvitationStatus, ChatParticipantRole + + +MAX_CHAT_PARTICIPANTS = 3 + + +def get_chat_session_participants( + db_session: Session, chat_session_id: UUID +) -> list[ChatSessionParticipant]: + """ + Fetches all participants for a given chat session. + + Args: + db_session: The SQLAlchemy Session. + chat_session_id: The ID of the chat session. + + Returns: + A list of ChatSessionParticipant objects with user details loaded. + """ + return ( + db_session.query(ChatSessionParticipant) + .filter(ChatSessionParticipant.chat_session_id == chat_session_id) + .options(joinedload(ChatSessionParticipant.user)) + .all() + ) + + +def check_user_is_participant( + db_session: Session, user_id: UUID, chat_session_id: UUID +) -> ChatParticipantRole | None: + """ + Checks if a user is a participant in a chat session and returns their role. + + Args: + db_session: The SQLAlchemy Session. + user_id: The ID of the user. + chat_session_id: The ID of the chat session. + + Returns: + The user's role (ChatParticipantRole) if they are a participant, otherwise None. + """ + participant = ( + db_session.query(ChatSessionParticipant) + .filter( + ChatSessionParticipant.chat_session_id == chat_session_id, + ChatSessionParticipant.user_id == user_id, + ) + .first() + ) + return participant.role if participant else None + + +def add_chat_participant( + db_session: Session, + chat_session_id: UUID, + user_id: UUID, + role: ChatParticipantRole, +) -> ChatSessionParticipant: + """ + Adds a new participant to a chat session. + + Args: + db_session: The SQLAlchemy Session. + chat_session_id: The ID of the chat session. + user_id: The ID of the user to add. + role: The role to assign to the new participant. + + Returns: + The newly created ChatSessionParticipant object. + + Raises: + ValueError: If the chat session is full or the user is already a participant. + """ + participants = get_chat_session_participants(db_session, chat_session_id) + if len(participants) >= MAX_CHAT_PARTICIPANTS: + raise ValueError( + f"Chat session is full. Cannot exceed {MAX_CHAT_PARTICIPANTS} participants." + ) + + if any(p.user_id == user_id for p in participants): + raise ValueError("User is already a participant in this chat session.") + + new_participant = ChatSessionParticipant( + chat_session_id=chat_session_id, user_id=user_id, role=role + ) + db_session.add(new_participant) + db_session.commit() + return new_participant + + +def remove_chat_participant( + db_session: Session, chat_session_id: UUID, user_id: UUID +) -> None: + """ + Removes a participant from a chat session. + + Args: + db_session: The SQLAlchemy Session. + chat_session_id: The ID of the chat session. + user_id: The ID of the user to remove. + + Raises: + ValueError: If trying to remove the owner of the chat session. + """ + participant = ( + db_session.query(ChatSessionParticipant) + .filter( + ChatSessionParticipant.chat_session_id == chat_session_id, + ChatSessionParticipant.user_id == user_id, + ) + .first() + ) + + if not participant: + # User is not a participant, do nothing. + return + + if participant.role == ChatParticipantRole.OWNER: + raise ValueError("Cannot remove the owner of the chat session.") + + db_session.delete(participant) + db_session.commit() + + +def create_chat_invitation( + db_session: Session, chat_session_id: UUID, inviter_id: UUID, invitee_id: UUID +) -> ChatInvitation: + """ + Creates an invitation for a user to join a collaborative chat session. + + Args: + db_session: The SQLAlchemy Session. + chat_session_id: The ID of the chat session to invite to. + inviter_id: The ID of the user sending the invitation. + invitee_id: The ID of the user being invited. + + Returns: + The newly created ChatInvitation object. + + Raises: + ValueError: If the session is full, the invitee is already a participant, + or a pending invitation already exists. + """ + participants = get_chat_session_participants(db_session, chat_session_id) + if len(participants) >= MAX_CHAT_PARTICIPANTS: + raise ValueError( + f"Chat session is full. Cannot exceed {MAX_CHAT_PARTICIPANTS} participants." + ) + + if any(p.user_id == invitee_id for p in participants): + raise ValueError("User is already a participant in this chat session.") + + existing_invitation = ( + db_session.query(ChatInvitation) + .filter( + ChatInvitation.chat_session_id == chat_session_id, + ChatInvitation.invitee_id == invitee_id, + ChatInvitation.status == ChatInvitationStatus.PENDING, + ) + .first() + ) + if existing_invitation: + raise ValueError("A pending invitation for this user already exists.") + + invitation = ChatInvitation( + chat_session_id=chat_session_id, + inviter_id=inviter_id, + invitee_id=invitee_id, + status=ChatInvitationStatus.PENDING, + ) + db_session.add(invitation) + db_session.commit() + return invitation + + +def get_pending_invitations_for_user( + db_session: Session, user_id: UUID +) -> list[ChatInvitation]: + """ + Retrieves all pending chat invitations for a specific user. + + Args: + db_session: The SQLAlchemy Session. + user_id: The ID of the user (invitee). + + Returns: + A list of pending ChatInvitation objects. + """ + return ( + db_session.query(ChatInvitation) + .filter( + ChatInvitation.invitee_id == user_id, + ChatInvitation.status == ChatInvitationStatus.PENDING, + ) + .all() + ) + + +def update_chat_invitation_status( + db_session: Session, invitation_id: int, user_id: UUID, status: ChatInvitationStatus +) -> ChatInvitation: + """ + Accepts or declines a chat invitation. + + Args: + db_session: The SQLAlchemy Session. + invitation_id: The ID of the invitation. + user_id: The ID of the user responding to the invitation. + status: The new status (ACCEPTED or DECLINED). + + Returns: + The updated ChatInvitation object. + + Raises: + ValueError: If the invitation is not found, not for the user, or if accepting + would exceed the participant limit. + """ + invitation = ( + db_session.query(ChatInvitation) + .filter(ChatInvitation.id == invitation_id) + .first() + ) + + if not invitation or invitation.invitee_id != user_id: + raise ValueError("Invitation not found or not intended for this user.") + + if invitation.status != ChatInvitationStatus.PENDING: + raise ValueError(f"Invitation has already been {invitation.status.value}.") + + invitation.status = status + if status == ChatInvitationStatus.ACCEPTED: + add_chat_participant( + db_session=db_session, + chat_session_id=invitation.chat_session_id, + user_id=invitation.invitee_id, + role=ChatParticipantRole.COLLABORATOR, + ) + + db_session.commit() + return invitation + + +def get_chat_invitations( + db_session: Session, chat_session_id: UUID +) -> list[ChatInvitation]: + """ + Gets all invitations for a specific chat session. + + Args: + db_session: The SQLAlchemy Session. + chat_session_id: The ID of the chat session. + + Returns: + A list of all ChatInvitation objects for the session. + """ + return ( + db_session.query(ChatInvitation) + .filter(ChatInvitation.chat_session_id == chat_session_id) + .all() + ) + + +def is_collaboration_enabled(db_session: Session, chat_session_id: UUID) -> bool: + """ + Checks if collaboration is enabled for a chat session. + + Args: + db_session: The SQLAlchemy Session. + chat_session_id: The ID of the chat session. + + Returns: + True if collaboration is enabled, False otherwise. + """ + chat_session = ( + db_session.query(ChatSession.collaboration_enabled) + .filter(ChatSession.id == chat_session_id) + .scalar() + ) + if chat_session is None: + raise ValueError("Chat session not found.") + return chat_session + + +def set_collaboration_status( + db_session: Session, chat_session_id: UUID, user_id: UUID, enabled: bool +) -> ChatSession: + """ + Enables or disables collaboration for a chat session. + If enabling for the first time, adds the current user as the OWNER. + + Args: + db_session: The SQLAlchemy Session. + chat_session_id: The ID of the chat session. + user_id: The ID of the user performing the action (must be the owner). + enabled: The new collaboration status. + + Returns: + The updated ChatSession object. + + Raises: + ValueError: If the chat session is not found or the user is not the owner. + """ + chat_session = get_chat_session_by_id( + db_session=db_session, chat_session_id=chat_session_id, user_id=user_id + ) + + if chat_session.user_id != user_id: + raise ValueError("Only the owner can change the collaboration status.") + + if enabled and not chat_session.collaboration_enabled: + # Check if owner is already a participant (e.g., from migration) + owner_participant = ( + db_session.query(ChatSessionParticipant) + .filter( + ChatSessionParticipant.chat_session_id == chat_session_id, + ChatSessionParticipant.user_id == user_id, + ) + .first() + ) + if not owner_participant: + add_chat_participant( + db_session, chat_session_id, user_id, ChatParticipantRole.OWNER + ) + + chat_session.collaboration_enabled = enabled + db_session.commit() + return chat_session + + +def update_last_read_message( + db_session: Session, chat_session_id: UUID, user_id: UUID, message_id: int +) -> None: + """ + Updates the last read message ID for a participant. + + Args: + db_session: The SQLAlchemy Session. + chat_session_id: The ID of the chat session. + user_id: The ID of the participant. + message_id: The ID of the last message read by the participant. + + Raises: + ValueError: If the user is not a participant in the chat session. + """ + participant = ( + db_session.query(ChatSessionParticipant) + .filter( + ChatSessionParticipant.chat_session_id == chat_session_id, + ChatSessionParticipant.user_id == user_id, + ) + .first() + ) + + if not participant: + raise ValueError("User is not a participant in this chat session.") + + participant.last_read_message_id = message_id + db_session.commit() From cef3396a5c9e9c15b15cd2274dd4edaed5f580fe Mon Sep 17 00:00:00 2001 From: Sanjay Akut Date: Sun, 22 Jun 2025 12:12:29 -0400 Subject: [PATCH 03/10] feat: Add Socket.io infrastructure for real-time collaboration - Add python-socketio and uvicorn[standard] dependencies - Create socketio_server.py with authentication and event handlers - Integrate Socket.io app with main FastAPI application under /ws - Handle events: connect, disconnect, join/leave chat, new messages, typing, read receipts --- backend/onyx/main.py | 10 + backend/onyx/server/realtime/__init__.py | 1 + .../onyx/server/realtime/socketio_server.py | 271 ++++++++++++++++++ backend/requirements/default.txt | 3 +- 4 files changed, 284 insertions(+), 1 deletion(-) create mode 100644 backend/onyx/server/realtime/__init__.py create mode 100644 backend/onyx/server/realtime/socketio_server.py diff --git a/backend/onyx/main.py b/backend/onyx/main.py index 06327c93a91..d2964fc6af5 100644 --- a/backend/onyx/main.py +++ b/backend/onyx/main.py @@ -120,6 +120,9 @@ from shared_configs.configs import SENTRY_DSN from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR +# Socket.IO real-time collaboration server +from onyx.server.realtime.socketio_server import socket_app + logger = setup_logger() file_handlers = [ @@ -355,6 +358,13 @@ def get_application(lifespan_override: Lifespan | None = None) -> FastAPI: application, get_full_openai_assistants_api_router() ) include_router_with_global_prefix_prepended(application, long_term_logs_router) + + # ------------------------------------------------------------------ + # Mount Socket.IO app for real-time collaborative chat (under /ws) + # ------------------------------------------------------------------ + processed_global_prefix = f"/{APP_API_PREFIX.strip('/')}" if APP_API_PREFIX else "" + application.mount(f"{processed_global_prefix}/ws", socket_app) + include_router_with_global_prefix_prepended(application, api_key_router) include_router_with_global_prefix_prepended(application, standard_oauth_router) diff --git a/backend/onyx/server/realtime/__init__.py b/backend/onyx/server/realtime/__init__.py new file mode 100644 index 00000000000..f7357477593 --- /dev/null +++ b/backend/onyx/server/realtime/__init__.py @@ -0,0 +1 @@ +"""Real-time collaboration module for Onyx.""" diff --git a/backend/onyx/server/realtime/socketio_server.py b/backend/onyx/server/realtime/socketio_server.py new file mode 100644 index 00000000000..adafd5b3599 --- /dev/null +++ b/backend/onyx/server/realtime/socketio_server.py @@ -0,0 +1,271 @@ +""" +Socket.io server for real-time chat collaboration in Onyx. + +This module sets up and configures the Socket.io server, integrates it with +the FastAPI application, and defines event handlers for real-time features +such as joining/leaving chat rooms, sending messages, and typing indicators. + +Key Components: +- `sio`: The main `socketio.AsyncServer` instance. +- `socket_app`: The ASGI application that wraps the Socket.io server, + making it mountable within FastAPI. +- Event Handlers: Functions decorated with `@sio.event` that define the + server's behavior in response to client events. +- Authentication: Handled within the `connect` event to verify users + based on a JWT token provided during the connection handshake. +""" +import socketio +from fastapi import FastAPI +from sqlalchemy.orm import Session +from uuid import UUID + +from onyx.auth.keys import get_jwt_strategy +from onyx.auth.users import get_user_manager +from onyx.db.chat_collaboration import ( + check_user_is_participant, + update_last_read_message, +) +from onyx.db.engine import get_session +from onyx.db.models import User +from onyx.utils.logger import setup_logger +from onyx.access.access import user_can_access_chat_session +from onyx.db.chat import get_chat_session_by_id + +# --- Initialization --- +logger = setup_logger() + +sio = socketio.AsyncServer( + async_mode="asgi", cors_allowed_origins="*", async_handlers=True +) + +socket_app = socketio.ASGIApp(sio) + + +# --- Authentication and Session Helpers --- +async def _get_user_from_token(token: str) -> User | None: + """ + Authenticates a user from a JWT token string. + + Args: + token: The JWT token from the client. + + Returns: + The authenticated User object or None if authentication fails. + """ + if not token: + return None + strategy = get_jwt_strategy() + try: + # We need a sync context for the user manager + with next(get_session()) as db_session: + user_manager = await anext(get_user_manager(db_session)) + user = await strategy.read_token(token, user_manager) + return user if user and user.is_active else None + except Exception as e: + logger.error(f"Socket.io token validation failed: {e}", exc_info=True) + return None + + +# --- Event Handlers --- +@sio.event +async def connect(sid: str, environ: dict, auth: dict | None) -> bool: + """ + Handles a new client connection. Authenticates the user and establishes + the session. + + Args: + sid: The session ID of the connecting client. + environ: The ASGI environment dictionary. + auth: The authentication data sent by the client, expected to + contain a "token". + + Returns: + bool: True if the connection is accepted, False otherwise. + """ + token = auth.get("token") if auth else None + if not token: + logger.warning(f"Connection rejected for {sid}: No token provided.") + return False + + user = await _get_user_from_token(token) + if not user: + logger.warning(f"Connection rejected for {sid}: Invalid token.") + return False + + await sio.save_session(sid, {"user": user}) + logger.info(f"Client connected: {sid}, User: {user.email}") + return True + + +@sio.event +async def disconnect(sid: str) -> None: + """Handles a client disconnection.""" + session = await sio.get_session(sid) + user_email = session.get("user").email if session.get("user") else "Unknown" + logger.info(f"Client disconnected: {sid}, User: {user_email}") + + +@sio.on("join_chat_session") +async def handle_join_chat_session(sid: str, data: dict) -> None: + """ + Handles a client's request to join a chat session room. + + Args: + sid: The session ID of the client. + data: A dictionary containing `chat_session_id`. + """ + session = await sio.get_session(sid) + user: User = session.get("user") + chat_session_id_str = data.get("chat_session_id") + + if not all([user, chat_session_id_str]): + await sio.emit("error", {"detail": "Missing user or chat_session_id"}, room=sid) + return + + try: + chat_session_id = UUID(chat_session_id_str) + with next(get_session()) as db_session: + chat_session = get_chat_session_by_id( + chat_session_id, user.id, db_session + ) + if not user_can_access_chat_session(chat_session, user, db_session): + raise PermissionError("User does not have access to this chat session.") + + sio.enter_room(sid, room=str(chat_session_id)) + logger.info(f"User {user.email} joined room {chat_session_id}") + + # Notify other participants in the room + await sio.emit( + "participant_joined", + {"user_id": str(user.id), "user_email": user.email}, + room=str(chat_session_id), + skip_sid=sid, + ) + await sio.emit("join_success", {"chat_session_id": chat_session_id_str}, room=sid) + + except (ValueError, PermissionError) as e: + logger.error(f"Failed to join room {chat_session_id_str}: {e}") + await sio.emit("error", {"detail": str(e)}, room=sid) + except Exception: + logger.exception("An unexpected error occurred while joining a chat session.") + await sio.emit("error", {"detail": "An internal server error occurred."}, room=sid) + + +@sio.on("leave_chat_session") +async def handle_leave_chat_session(sid: str, data: dict) -> None: + """ + Handles a client's request to leave a chat session room. + + Args: + sid: The session ID of the client. + data: A dictionary containing `chat_session_id`. + """ + session = await sio.get_session(sid) + user: User = session.get("user") + chat_session_id_str = data.get("chat_session_id") + + if not all([user, chat_session_id_str]): + return # Fail silently + + sio.leave_room(sid, room=chat_session_id_str) + logger.info(f"User {user.email} left room {chat_session_id_str}") + + # Notify other participants + await sio.emit( + "participant_left", + {"user_id": str(user.id), "user_email": user.email}, + room=chat_session_id_str, + skip_sid=sid, + ) + + +@sio.on("new_message") +async def handle_new_message(sid: str, data: dict) -> None: + """ + Broadcasts a new message to all participants in a chat session. + Note: This event is for real-time UI updates. The message should already + be persisted to the database via a standard API call. + + Args: + sid: The session ID of the sending client. + data: A dictionary containing `chat_session_id` and `message` details. + """ + session = await sio.get_session(sid) + user: User = session.get("user") + chat_session_id = data.get("chat_session_id") + message_data = data.get("message") + + if not all([user, chat_session_id, message_data]): + return + + # Basic permission check: is the sender a participant? + with next(get_session()) as db_session: + if not check_user_is_participant(db_session, user.id, UUID(chat_session_id)): + return + + await sio.emit("message_added", message_data, room=chat_session_id) + logger.debug(f"Broadcasted new message in room {chat_session_id}") + + +@sio.on("typing_indicator") +async def handle_typing_indicator(sid: str, data: dict) -> None: + """ + Broadcasts a typing indicator to other participants in the room. + + Args: + sid: The session ID of the typing client. + data: A dictionary containing `chat_session_id`. + """ + session = await sio.get_session(sid) + user: User = session.get("user") + chat_session_id = data.get("chat_session_id") + + if not all([user, chat_session_id]): + return + + await sio.emit( + "user_typing", + {"user_id": str(user.id), "user_email": user.email}, + room=chat_session_id, + skip_sid=sid, + ) + + +@sio.on("mark_read") +async def handle_mark_as_read(sid: str, data: dict) -> None: + """ + Updates the last read message for a user in a chat session and notifies + other participants. + + Args: + sid: The session ID of the client. + data: A dictionary containing `chat_session_id` and `message_id`. + """ + session = await sio.get_session(sid) + user: User = session.get("user") + chat_session_id_str = data.get("chat_session_id") + message_id = data.get("message_id") + + if not all([user, chat_session_id_str, message_id]): + await sio.emit("error", {"detail": "Missing required data for mark_read"}, room=sid) + return + + try: + chat_session_id = UUID(chat_session_id_str) + with next(get_session()) as db_session: + update_last_read_message(db_session, chat_session_id, user.id, message_id) + + # Notify others about the read receipt update + await sio.emit( + "read_receipt_updated", + { + "user_id": str(user.id), + "chat_session_id": chat_session_id_str, + "last_read_message_id": message_id, + }, + room=chat_session_id_str, + skip_sid=sid, + ) + except Exception as e: + logger.exception(f"Failed to mark message as read for user {user.email}") + await sio.emit("error", {"detail": f"Could not mark message as read: {e}"}, room=sid) diff --git a/backend/requirements/default.txt b/backend/requirements/default.txt index aa943bff8cc..e408219bc3b 100644 --- a/backend/requirements/default.txt +++ b/backend/requirements/default.txt @@ -85,7 +85,8 @@ timeago==1.0.16 transformers==4.49.0 unstructured==0.15.1 unstructured-client==0.25.4 -uvicorn==0.21.1 +uvicorn[standard]==0.21.1 +python-socketio==5.11.0 zulip==0.8.2 hubspot-api-client==8.1.0 asana==5.0.8 From 5704d3dc8cd67cc3f3b08da9e431eef777cd23a0 Mon Sep 17 00:00:00 2001 From: Sanjay Akut Date: Sun, 22 Jun 2025 17:55:38 -0400 Subject: [PATCH 04/10] feat: Add API endpoints for multi-user chat collaboration - Create comprehensive API for managing chat collaboration - Add endpoints for enable/disable, invitations, participants - Update chat session models to include collaboration info - Update chat backend to return participant counts - Include router in main application --- backend/onyx/main.py | 7 + .../server/query_and_chat/chat_backend.py | 5 + .../query_and_chat/chat_collaboration_api.py | 336 ++++++++++++++++++ backend/onyx/server/query_and_chat/models.py | 5 + 4 files changed, 353 insertions(+) create mode 100644 backend/onyx/server/query_and_chat/chat_collaboration_api.py diff --git a/backend/onyx/main.py b/backend/onyx/main.py index d2964fc6af5..310a2220b24 100644 --- a/backend/onyx/main.py +++ b/backend/onyx/main.py @@ -92,6 +92,9 @@ get_full_openai_assistants_api_router, ) from onyx.server.query_and_chat.chat_backend import router as chat_router +from onyx.server.query_and_chat.chat_collaboration_api import ( + router as chat_collaboration_router, +) from onyx.server.query_and_chat.query_backend import ( admin_router as admin_query_router, ) @@ -320,6 +323,10 @@ def get_application(lifespan_override: Lifespan | None = None) -> FastAPI: include_router_with_global_prefix_prepended(application, password_router) include_router_with_global_prefix_prepended(application, chat_router) + # Collaborative chat session routes + include_router_with_global_prefix_prepended( + application, chat_collaboration_router + ) include_router_with_global_prefix_prepended(application, query_router) include_router_with_global_prefix_prepended(application, document_router) include_router_with_global_prefix_prepended(application, user_router) diff --git a/backend/onyx/server/query_and_chat/chat_backend.py b/backend/onyx/server/query_and_chat/chat_backend.py index 84e16b4ecf6..2724bd3f4a3 100644 --- a/backend/onyx/server/query_and_chat/chat_backend.py +++ b/backend/onyx/server/query_and_chat/chat_backend.py @@ -137,6 +137,7 @@ def get_user_chat_sessions( folder_id=chat.folder_id, current_alternate_model=chat.current_alternate_model, current_temperature_override=chat.temperature_override, + collaboration_enabled=chat.collaboration_enabled, ) for chat in chat_sessions ] @@ -253,6 +254,10 @@ def get_chat_session( time_created=chat_session.time_created, shared_status=chat_session.shared_status, current_temperature_override=chat_session.temperature_override, + collaboration_enabled=chat_session.collaboration_enabled, + participants_count=( + len(chat_session.participants) if chat_session.collaboration_enabled else None + ), ) diff --git a/backend/onyx/server/query_and_chat/chat_collaboration_api.py b/backend/onyx/server/query_and_chat/chat_collaboration_api.py new file mode 100644 index 00000000000..ac4241b4e22 --- /dev/null +++ b/backend/onyx/server/query_and_chat/chat_collaboration_api.py @@ -0,0 +1,336 @@ +""" +API endpoints for managing multi-user chat collaboration features. + +This module provides RESTful endpoints for: +- Enabling and disabling collaboration on a chat session. +- Inviting users to a collaborative chat. +- Managing participants (listing, removing). +- Handling invitations (accepting, declining, listing pending). +""" +from datetime import datetime +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException, status +from pydantic import BaseModel +from sqlalchemy.orm import Session + +from onyx.auth.users import current_user +from onyx.db.chat_collaboration import ( + set_collaboration_status, + get_chat_session_participants, + create_chat_invitation, + remove_chat_participant, + get_pending_invitations_for_user, + update_chat_invitation_status, + check_user_is_participant, +) +from onyx.db.engine import get_session +from onyx.db.enums import ChatInvitationStatus, ChatParticipantRole +from onyx.db.models import User +from onyx.utils.logger import setup_logger + +logger = setup_logger() +router = APIRouter(prefix="/chat") + + +# --- Pydantic Models --- + + +class CollaborationStatusResponse(BaseModel): + chat_session_id: UUID + collaboration_enabled: bool + + +class ParticipantResponse(BaseModel): + id: UUID + email: str + role: ChatParticipantRole + + +class InvitationRequest(BaseModel): + invitee_id: UUID + + +class InvitationResponse(BaseModel): + id: int + chat_session_id: UUID + inviter_id: UUID + invitee_id: UUID + status: ChatInvitationStatus + created_at: datetime + + +class PendingInvitationsResponse(BaseModel): + invitations: list[InvitationResponse] + + +# --- API Endpoints --- + + +@router.post( + "/session/{session_id}/collaboration/enable", + response_model=CollaborationStatusResponse, + status_code=status.HTTP_200_OK, +) +def enable_collaboration( + session_id: UUID, + user: User | None = Depends(current_user), + db_session: Session = Depends(get_session), +) -> CollaborationStatusResponse: + """Enables collaboration mode for a chat session. Only the owner can perform this action.""" + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated" + ) + + try: + chat_session = set_collaboration_status( + db_session=db_session, + chat_session_id=session_id, + user_id=user.id, + enabled=True, + ) + return CollaborationStatusResponse( + chat_session_id=chat_session.id, + collaboration_enabled=chat_session.collaboration_enabled, + ) + except ValueError as e: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) + except Exception: + logger.exception("Failed to enable collaboration.") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred.", + ) + + +@router.post( + "/session/{session_id}/collaboration/disable", + response_model=CollaborationStatusResponse, + status_code=status.HTTP_200_OK, +) +def disable_collaboration( + session_id: UUID, + user: User | None = Depends(current_user), + db_session: Session = Depends(get_session), +) -> CollaborationStatusResponse: + """Disables collaboration mode for a chat session. Only the owner can perform this action.""" + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated" + ) + + try: + chat_session = set_collaboration_status( + db_session=db_session, + chat_session_id=session_id, + user_id=user.id, + enabled=False, + ) + return CollaborationStatusResponse( + chat_session_id=chat_session.id, + collaboration_enabled=chat_session.collaboration_enabled, + ) + except ValueError as e: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) + except Exception: + logger.exception("Failed to disable collaboration.") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred.", + ) + + +@router.get( + "/session/{session_id}/participants", + response_model=list[ParticipantResponse], + status_code=status.HTTP_200_OK, +) +def get_participants( + session_id: UUID, + user: User | None = Depends(current_user), + db_session: Session = Depends(get_session), +) -> list[ParticipantResponse]: + """Gets the list of participants for a collaborative chat session.""" + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated" + ) + + if not check_user_is_participant(db_session, user.id, session_id): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You are not a participant of this chat session.", + ) + + participants = get_chat_session_participants(db_session, session_id) + return [ + ParticipantResponse(id=p.user.id, email=p.user.email, role=p.role) + for p in participants + ] + + +@router.post( + "/session/{session_id}/invite", + response_model=InvitationResponse, + status_code=status.HTTP_201_CREATED, +) +def invite_participant( + session_id: UUID, + request: InvitationRequest, + user: User | None = Depends(current_user), + db_session: Session = Depends(get_session), +) -> InvitationResponse: + """Invites another user to join a collaborative chat session.""" + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated" + ) + + user_role = check_user_is_participant(db_session, user.id, session_id) + if not user_role or user_role != ChatParticipantRole.OWNER: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only the owner can invite participants.", + ) + + try: + invitation = create_chat_invitation( + db_session=db_session, + chat_session_id=session_id, + inviter_id=user.id, + invitee_id=request.invitee_id, + ) + return InvitationResponse.model_validate(invitation) + except ValueError as e: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) + except Exception: + logger.exception("Failed to create invitation.") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred while creating the invitation.", + ) + + +@router.delete( + "/session/{session_id}/participant/{participant_user_id}", + status_code=status.HTTP_204_NO_CONTENT, +) +def remove_participant( + session_id: UUID, + participant_user_id: UUID, + user: User | None = Depends(current_user), + db_session: Session = Depends(get_session), +) -> None: + """Removes a participant from a chat session. Only the owner can do this.""" + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated" + ) + + user_role = check_user_is_participant(db_session, user.id, session_id) + if not user_role or user_role != ChatParticipantRole.OWNER: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only the owner can remove participants.", + ) + + if user.id == participant_user_id: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="The owner cannot remove themselves.", + ) + + try: + remove_chat_participant(db_session, session_id, participant_user_id) + except ValueError as e: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) + except Exception: + logger.exception("Failed to remove participant.") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred while removing the participant.", + ) + + +@router.get( + "/invitations", + response_model=PendingInvitationsResponse, + status_code=status.HTTP_200_OK, +) +def get_pending_invitations( + user: User | None = Depends(current_user), + db_session: Session = Depends(get_session), +) -> PendingInvitationsResponse: + """Gets all pending chat invitations for the current user.""" + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated" + ) + + invitations = get_pending_invitations_for_user(db_session, user.id) + return PendingInvitationsResponse( + invitations=[InvitationResponse.model_validate(inv) for inv in invitations] + ) + + +@router.post( + "/invitation/{invitation_id}/accept", + response_model=InvitationResponse, + status_code=status.HTTP_200_OK, +) +def accept_invitation( + invitation_id: int, + user: User | None = Depends(current_user), + db_session: Session = Depends(get_session), +) -> InvitationResponse: + """Accepts a pending chat invitation.""" + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated" + ) + + try: + invitation = update_chat_invitation_status( + db_session, invitation_id, user.id, ChatInvitationStatus.ACCEPTED + ) + return InvitationResponse.model_validate(invitation) + except ValueError as e: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) + except Exception: + logger.exception("Failed to accept invitation.") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred while accepting the invitation.", + ) + + +@router.post( + "/invitation/{invitation_id}/decline", + response_model=InvitationResponse, + status_code=status.HTTP_200_OK, +) +def decline_invitation( + invitation_id: int, + user: User | None = Depends(current_user), + db_session: Session = Depends(get_session), +) -> InvitationResponse: + """Declines a pending chat invitation.""" + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated" + ) + + try: + invitation = update_chat_invitation_status( + db_session, invitation_id, user.id, ChatInvitationStatus.DECLINED + ) + return InvitationResponse.model_validate(invitation) + except ValueError as e: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) + except Exception: + logger.exception("Failed to decline invitation.") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred while declining the invitation.", + ) diff --git a/backend/onyx/server/query_and_chat/models.py b/backend/onyx/server/query_and_chat/models.py index be26be30837..1a851f5de2d 100644 --- a/backend/onyx/server/query_and_chat/models.py +++ b/backend/onyx/server/query_and_chat/models.py @@ -188,6 +188,8 @@ class ChatSessionDetails(BaseModel): folder_id: int | None = None current_alternate_model: str | None = None current_temperature_override: float | None = None + # NEW: collaboration info + collaboration_enabled: bool | None = None class ChatSessionsResponse(BaseModel): @@ -272,6 +274,9 @@ class ChatSessionDetailResponse(BaseModel): shared_status: ChatSessionSharedStatus current_alternate_model: str | None current_temperature_override: float | None + # NEW: collaboration info + collaboration_enabled: bool | None = None + participants_count: int | None = None # This one is not used anymore From 745709eb431f73d5700b71a63e29cb17450121e8 Mon Sep 17 00:00:00 2001 From: Sanjay Akut Date: Sun, 22 Jun 2025 17:57:31 -0400 Subject: [PATCH 05/10] fix: Update Socket.io authentication to directly decode JWT - Replace get_user_manager with direct JWT decoding - Fetch user from database using decoded user_id - Simplify authentication flow --- backend/onyx/server/realtime/socketio_server.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/backend/onyx/server/realtime/socketio_server.py b/backend/onyx/server/realtime/socketio_server.py index adafd5b3599..c3e69e50f53 100644 --- a/backend/onyx/server/realtime/socketio_server.py +++ b/backend/onyx/server/realtime/socketio_server.py @@ -15,12 +15,11 @@ based on a JWT token provided during the connection handshake. """ import socketio -from fastapi import FastAPI from sqlalchemy.orm import Session from uuid import UUID from onyx.auth.keys import get_jwt_strategy -from onyx.auth.users import get_user_manager +from onyx.auth.users import current_user # Imported to follow new guideline from onyx.db.chat_collaboration import ( check_user_is_participant, update_last_read_message, @@ -56,13 +55,17 @@ async def _get_user_from_token(token: str) -> User | None: return None strategy = get_jwt_strategy() try: - # We need a sync context for the user manager + payload = strategy.decode_jwt(token, verify=True) + user_id_str = payload.get("sub") or payload.get("user_id") + if not user_id_str: + return None + user_id = UUID(user_id_str) + with next(get_session()) as db_session: - user_manager = await anext(get_user_manager(db_session)) - user = await strategy.read_token(token, user_manager) + user: User | None = db_session.get(User, user_id) return user if user and user.is_active else None except Exception as e: - logger.error(f"Socket.io token validation failed: {e}", exc_info=True) + logger.error("Socket.io token validation failed: %s", e, exc_info=True) return None From b12dbf1323fcd4cd2677d6ec880abe817048afbf Mon Sep 17 00:00:00 2001 From: Sanjay Akut Date: Sun, 22 Jun 2025 17:59:00 -0400 Subject: [PATCH 06/10] docs: Add comprehensive multi-user chat implementation guide - Document database schema changes - List all API endpoints with permissions - Detail Socket.io events for real-time communication - Provide usage flow example - Outline frontend implementation requirements --- docs/multi-user-chat-implementation.md | 154 +++++++++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 docs/multi-user-chat-implementation.md diff --git a/docs/multi-user-chat-implementation.md b/docs/multi-user-chat-implementation.md new file mode 100644 index 00000000000..942e3645f99 --- /dev/null +++ b/docs/multi-user-chat-implementation.md @@ -0,0 +1,154 @@ +# Multi-User Chat Implementation Guide + +This document provides a comprehensive overview of the multi-user collaborative chat feature in Onyx. It details the database schema, backend API, real-time event handling, and provides guidance for frontend implementation. + +## 1. Feature Overview + +The multi-user chat feature transforms a standard single-user chat session into a real-time collaborative workspace. Unlike the existing "Share" functionality which creates a static, read-only snapshot, this feature allows multiple users to join, send messages, and interact with the AI simultaneously within the same chat thread. + +**Key Characteristics:** +- **Real-Time**: Updates are pushed to all participants instantly using WebSockets. +- **Invite-Only**: Collaboration is not on by default. A chat owner must explicitly enable collaboration and invite other users. +- **Permission-Controlled**: A simple permission model (Owner/Collaborator) governs user actions within the chat. +- **Participant Limit**: To maintain focus and performance, each collaborative session is limited to a maximum of **3 participants**. + +This feature runs in parallel with the existing "Share" functionality, which remains unchanged. + +## 2. Database Schema Changes + +The database has been updated to support multi-user collaboration. The changes were introduced in Alembic migration `26a9d522abca_add_multi_user_chat_collaboration.py`. + +### New Tables + +#### `chat_session_participant` +This table links users to the chat sessions they are a part of. + +| Column | Type | Description | +| ---------------------- | ------------------------- | ------------------------------------------------------------ | +| `chat_session_id` | UUID (FK, PK) | Foreign key to `chat_session.id`. | +| `user_id` | UUID (FK, PK) | Foreign key to `user.id`. | +| `role` | ENUM (`OWNER`, `COLLABORATOR`) | The role of the user in the session. | +| `joined_at` | DateTime (timezone) | Timestamp when the user joined the session. | +| `last_read_message_id` | Integer (FK) | ID of the last message read by the participant, for read receipts. | + +#### `chat_invitation` +This table manages invitations to join a collaborative chat. + +| Column | Type | Description | +| --------------- | ---------------------------------- | ------------------------------------------------ | +| `id` | Integer (PK, Autoincrement) | Unique identifier for the invitation. | +| `chat_session_id` | UUID (FK) | The session the invitation is for. | +| `inviter_id` | UUID (FK) | The user who sent the invitation (must be Owner). | +| `invitee_id` | UUID (FK) | The user being invited. | +| `status` | ENUM (`PENDING`, `ACCEPTED`, `DECLINED`) | The current status of the invitation. | +| `created_at` | DateTime (timezone) | Timestamp when the invitation was created. | + +### Modifications to Existing Tables + +#### `chat_session` +- **`collaboration_enabled`** (Boolean, `false`): A new flag to indicate if the chat session is collaborative. + +#### `chat_message` +- **`sender_id`** (UUID, nullable, FK to `user.id`): A new field to store the ID of the user who sent the message. For existing messages, this is backfilled with the chat session's original owner. + +## 3. API Endpoints + +A new set of RESTful endpoints has been added to manage collaborative chats, located in `backend/onyx/server/query_and_chat/chat_collaboration_api.py`. + +All endpoints require user authentication. + +| Method | Endpoint | Description | Permissions Required | +| :----- | :----------------------------------------------------- | :-------------------------------------------------- | :------------------- | +| `POST` | `/chat/session/{session_id}/collaboration/enable` | Enables collaboration mode for a chat session. | Owner | +| `POST` | `/chat/session/{session_id}/collaboration/disable` | Disables collaboration mode. | Owner | +| `GET` | `/chat/session/{session_id}/participants` | Lists all participants in a session. | Participant | +| `POST` | `/chat/session/{session_id}/invite` | Invites a user to the session. | Owner | +| `DELETE`| `/chat/session/{session_id}/participant/{user_id}` | Removes a participant from the session. | Owner | +| `GET` | `/chat/invitations` | Gets all pending invitations for the current user. | Authenticated User | +| `POST` | `/invitation/{invitation_id}/accept` | Accepts a pending invitation. | Invitee | +| `POST` | `/invitation/{invitation_id}/decline` | Declines a pending invitation. | Invitee | + +## 4. Real-time Socket.io Events + +Real-time communication is handled by a Socket.io server implemented in `backend/onyx/server/realtime/socketio_server.py` and mounted at the `/ws` path. + +### Connection +- **Authentication**: The client must provide a valid JWT in the `auth` payload of the connection request. + ```javascript + const socket = io({ auth: { token: "your_jwt_token" } }); + ``` +- **Rooms**: Each collaborative chat session corresponds to a Socket.io room, identified by the `chat_session_id`. + +### Client-to-Server Events + +| Event Name | Payload | Description | +| :-------------------- | :----------------------------------------- | :----------------------------------------------------------- | +| `join_chat_session` | `{ chat_session_id: string }` | Requests to join a specific chat session room. | +| `leave_chat_session` | `{ chat_session_id: string }` | Informs the server that the client is leaving the room. | +| `new_message` | `{ chat_session_id: string, message: object }` | Sent after a message is successfully created via the REST API to notify other clients. | +| `typing_indicator` | `{ chat_session_id: string }` | Informs others that the user is typing. | +| `mark_read` | `{ chat_session_id: string, message_id: number }` | Informs the server of the last message the user has read. | + +### Server-to-Client Events + +| Event Name | Payload | Description | +| :-------------------- | :----------------------------------------- | :----------------------------------------------------------- | +| `connect` | - | Fired upon successful connection and authentication. | +| `disconnect` | - | Fired when the client is disconnected. | +| `join_success` | `{ chat_session_id: string }` | Confirms the client has successfully joined the room. | +| `participant_joined` | `{ user_id: string, user_email: string }` | Notifies clients in a room that a new user has joined. | +| `participant_left` | `{ user_id: string, user_email: string }` | Notifies clients in a room that a user has left. | +| `message_added` | `{ ...message_details }` | Broadcasts a new message to all participants in the room. | +| `user_typing` | `{ user_id: string, user_email: string }` | Notifies clients that a user is typing. | +| `read_receipt_updated`| `{ user_id: string, last_read_message_id: number }` | Broadcasts that a user's read status has been updated. | +| `error` | `{ detail: string }` | Sent by the server if an error occurs (e.g., permission denied). | + +## 5. Implementation Details + +- **Database Helpers**: All database logic for this feature is encapsulated in `backend/onyx/db/chat_collaboration.py`. +- **Access Control**: Core access checks are updated in `backend/onyx/access/access.py` and throughout the `chat` DB helpers to be collaboration-aware. A user can access a chat if they are the owner OR a participant in a collaboration-enabled session. +- **Permissions**: + - **Owner**: The original creator of the chat session. Can enable/disable collaboration, invite/remove users, and delete the chat. + - **Collaborator**: A user who has accepted an invitation. Can send messages and view chat history. + +## 6. Usage Flow Example + +1. **Enable Collaboration**: User A (owner) sends a `POST` request to `/chat/session/{session_id}/collaboration/enable`. +2. **Invite User**: User A sends a `POST` to `/chat/session/{session_id}/invite` with User B's ID in the payload. +3. **Accept Invitation**: User B receives the pending invitation via a `GET` to `/chat/invitations` and accepts it by sending a `POST` to `/invitation/{invitation_id}/accept`. +4. **Join Real-time Session**: Both User A and User B's clients connect to the Socket.io server and emit `join_chat_session` with the `chat_session_id`. +5. **Send Message**: User A sends a message via the standard `POST /chat/send-message` endpoint. After the message is created, the client emits a `new_message` event to the WebSocket server. +6. **Receive Message**: The server broadcasts the `message_added` event to all participants in the room, including User B, whose UI updates in real-time. +7. **Remove Participant**: User A can remove User B by sending a `DELETE` request to `/chat/session/{session_id}/participant/{user_b_id}`. + +## 7. Next Steps for Frontend Implementation + +To build the user-facing side of this feature, the following frontend work is required: + +1. **State Management**: + - Update the chat session state to include `collaboration_enabled` and a list of `participants`. + - Manage a list of pending invitations for the current user. + +2. **Socket.io Client Integration**: + - Create a WebSocket manager/hook to handle the Socket.io connection lifecycle. + - Implement logic to connect, authenticate, and join/leave chat rooms based on the current view. + - Implement event listeners for all server-to-client events (`participant_joined`, `message_added`, etc.) to update the application state in real-time. + +3. **UI Components**: + - **"Invite Others" Button/Modal**: + - An "Invite" button, visible only to the chat owner when collaboration is enabled. + - A modal to search for users and send invitations. + - **Participant List**: + - A component to display the avatars/names of all current participants in the chat header. + - Include visual indicators for online status or typing activity. + - **Message Display**: + - Update the message component to display the sender's avatar and name next to their messages. + - **Invitation Notifications**: + - A UI element (e.g., a notification bell) to show pending invitations. + - A view to list, accept, or decline invitations. + - **Typing Indicators**: + - Display a "User is typing..." message when the `user_typing` event is received. + - **Read Receipts**: + - Implement logic to emit the `mark_read` event as the user scrolls. + - Display indicators (e.g., small avatars) next to messages to show who has read up to that point. + From f550fe07d91cc0fa95a74a42ce648b66f9dfe53c Mon Sep 17 00:00:00 2001 From: Sanjay Akut Date: Sun, 22 Jun 2025 18:21:37 -0400 Subject: [PATCH 07/10] feat: Add frontend foundation for multi-user chat collaboration - Update chat interfaces to include collaboration types - Create Socket.io client service with hooks - Add collaboration API client functions - Create CollaborationControls component - Install socket.io-client dependency --- .../collaboration/CollaborationControls.tsx | 186 +++++++++++++ web/src/app/chat/interfaces.ts | 37 +++ web/src/lib/chat/collaborationApi.ts | 173 ++++++++++++ web/src/lib/chat/socketService.ts | 248 ++++++++++++++++++ 4 files changed, 644 insertions(+) create mode 100644 web/src/app/chat/collaboration/CollaborationControls.tsx create mode 100644 web/src/lib/chat/collaborationApi.ts create mode 100644 web/src/lib/chat/socketService.ts diff --git a/web/src/app/chat/collaboration/CollaborationControls.tsx b/web/src/app/chat/collaboration/CollaborationControls.tsx new file mode 100644 index 00000000000..202f58ec9b9 --- /dev/null +++ b/web/src/app/chat/collaboration/CollaborationControls.tsx @@ -0,0 +1,186 @@ +import React, { useState } from "react"; +import { FiPlus, FiX, FiUsers } from "react-icons/fi"; +import { User } from "@/lib/types"; +import { + ChatSession, + ChatParticipant, + ChatParticipantRole, +} from "@/app/chat/interfaces"; + +// A simple, styled tooltip component +const Tooltip = ({ + text, + children, +}: { + text: string; + children: React.ReactNode; +}) => { + return ( +
+ {children} +
+ {text} +
+
+ ); +}; + +// A simple Avatar component +const Avatar = ({ + participant, + isTyping, +}: { + participant: ChatParticipant; + isTyping: boolean; +}) => { + const initial = participant.email.charAt(0).toUpperCase(); + const bgColor = + participant.role === ChatParticipantRole.OWNER + ? "bg-blue-500" + : "bg-green-500"; + + return ( +
+ {initial} + {isTyping && ( + + )} +
+ ); +}; + +interface CollaborationControlsProps { + chatSession: ChatSession; + currentUser: User; + participants: ChatParticipant[]; + typingUsers: string[]; // Array of user IDs who are typing + onInvite: () => void; + onToggleCollaboration: (enabled: boolean) => Promise; + onRemoveParticipant: (participantId: string) => Promise; +} + +export const CollaborationControls: React.FC = ({ + chatSession, + currentUser, + participants, + typingUsers, + onInvite, + onToggleCollaboration, + onRemoveParticipant, +}) => { + const [isToggling, setIsToggling] = useState(false); + const [removingId, setRemovingId] = useState(null); + + const isOwner = currentUser.id === chatSession.id.split("_")[0]; // Assuming chatSession.id format includes user_id + + const handleToggle = async (e: React.ChangeEvent) => { + setIsToggling(true); + try { + await onToggleCollaboration(e.target.checked); + } catch (error) { + console.error("Failed to toggle collaboration", error); + // Optionally revert UI state here + } finally { + setIsToggling(false); + } + }; + + const handleRemove = async (participantId: string) => { + setRemovingId(participantId); + try { + await onRemoveParticipant(participantId); + } catch (error) { + console.error("Failed to remove participant", error); + } finally { + setRemovingId(null); + } + }; + + return ( +
+
+ {isOwner && ( +
+ +
+ )} + + {chatSession.collaboration_enabled && ( +
+ {participants.map((participant) => ( + +
+ + {isOwner && currentUser.id !== participant.id && ( + + )} +
+
+ ))} +
+ )} +
+ + {chatSession.collaboration_enabled && isOwner && ( + + )} + + {!chatSession.collaboration_enabled && !isOwner && ( +
+ + This is a private chat. +
+ )} +
+ ); +}; diff --git a/web/src/app/chat/interfaces.ts b/web/src/app/chat/interfaces.ts index a236290c917..2af78c826f2 100644 --- a/web/src/app/chat/interfaces.ts +++ b/web/src/app/chat/interfaces.ts @@ -21,6 +21,20 @@ export enum ChatSessionSharedStatus { Public = "public", } +// --------------------------------------------------------------------------- +// Collaboration Enums +// --------------------------------------------------------------------------- +export enum ChatParticipantRole { + OWNER = "OWNER", + COLLABORATOR = "COLLABORATOR", +} + +export enum ChatInvitationStatus { + PENDING = "PENDING", + ACCEPTED = "ACCEPTED", + DECLINED = "DECLINED", +} + // The number of messages to buffer on the client side. export const BUFFER_COUNT = 35; @@ -80,6 +94,8 @@ export interface ChatSession { folder_id: number | null; current_alternate_model: string; current_temperature_override: number | null; + // Collaboration + collaboration_enabled?: boolean; } export interface SearchSession { @@ -133,6 +149,9 @@ export interface BackendChatSession { shared_status: ChatSessionSharedStatus; current_temperature_override: number | null; current_alternate_model?: string; + // Collaboration + collaboration_enabled: boolean; + participants_count?: number; } export interface BackendMessage { @@ -347,6 +366,24 @@ export const constructSubQuestions = ( } // Append to the question +// --------------------------------------------------------------------------- +// Collaboration Interfaces +// --------------------------------------------------------------------------- +export interface ChatParticipant { + id: string; + email: string; + role: ChatParticipantRole; +} + +export interface ChatInvitation { + id: number; + chat_session_id: string; + inviter_id: string; + invitee_id: string; + status: ChatInvitationStatus; + created_at: string; +} + subQuestion.question += sub_question; } else if ("sub_query" in newDetail) { // Handle SubQueryPiece diff --git a/web/src/lib/chat/collaborationApi.ts b/web/src/lib/chat/collaborationApi.ts new file mode 100644 index 00000000000..afa1ff713ba --- /dev/null +++ b/web/src/lib/chat/collaborationApi.ts @@ -0,0 +1,173 @@ +import { + ChatInvitation, + ChatParticipant, +} from "@/app/chat/interfaces"; +import { fetcher } from "@/lib/fetcher"; + +// --- Collaboration Status --- + +/** + * Enables collaboration mode for a specific chat session. + * Only the owner of the chat session can perform this action. + * @param sessionId The UUID of the chat session. + * @returns An object indicating the new collaboration status. + */ +export async function enableCollaboration( + sessionId: string +): Promise<{ chat_session_id: string; collaboration_enabled: boolean }> { + const response = await fetcher( + `/api/chat/session/${sessionId}/collaboration/enable`, + { + method: "POST", + } + ); + if (!response.ok) { + throw new Error( + `Failed to enable collaboration: ${await response.text()}` + ); + } + return await response.json(); +} + +/** + * Disables collaboration mode for a specific chat session. + * Only the owner of the chat session can perform this action. + * @param sessionId The UUID of the chat session. + * @returns An object indicating the new collaboration status. + */ +export async function disableCollaboration( + sessionId: string +): Promise<{ chat_session_id: string; collaboration_enabled: boolean }> { + const response = await fetcher( + `/api/chat/session/${sessionId}/collaboration/disable`, + { + method: "POST", + } + ); + if (!response.ok) { + throw new Error( + `Failed to disable collaboration: ${await response.text()}` + ); + } + return await response.json(); +} + +// --- Participant Management --- + +/** + * Fetches the list of participants for a given chat session. + * The current user must be a participant to perform this action. + * @param sessionId The UUID of the chat session. + * @returns A list of participant objects. + */ +export async function getParticipants( + sessionId: string +): Promise { + const response = await fetcher(`/api/chat/session/${sessionId}/participants`); + if (!response.ok) { + throw new Error(`Failed to get participants: ${await response.text()}`); + } + return await response.json(); +} + +/** + * Invites a user to join a collaborative chat session. + * Only the owner of the chat session can send invitations. + * @param sessionId The UUID of the chat session. + * @param inviteeId The UUID of the user to invite. + * @returns The created invitation object. + */ +export async function inviteParticipant( + sessionId: string, + inviteeId: string +): Promise { + const response = await fetcher(`/api/chat/session/${sessionId}/invite`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ invitee_id: inviteeId }), + }); + if (!response.ok) { + throw new Error(`Failed to invite participant: ${await response.text()}`); + } + return await response.json(); +} + +/** + * Removes a participant from a collaborative chat session. + * Only the owner of the chat session can remove other participants. + * @param sessionId The UUID of the chat session. + * @param participantId The UUID of the participant to remove. + */ +export async function removeParticipant( + sessionId: string, + participantId: string +): Promise { + const response = await fetcher( + `/api/chat/session/${sessionId}/participant/${participantId}`, + { + method: "DELETE", + } + ); + if (!response.ok) { + throw new Error(`Failed to remove participant: ${await response.text()}`); + } +} + +// --- Invitation Management --- + +/** + * Fetches all pending chat invitations for the currently authenticated user. + * @returns An object containing a list of pending invitations. + */ +export async function getPendingInvitations(): Promise<{ + invitations: ChatInvitation[]; +}> { + const response = await fetcher(`/api/chat/invitations`); + if (!response.ok) { + throw new Error( + `Failed to get pending invitations: ${await response.text()}` + ); + } + return await response.json(); +} + +/** + * Accepts a pending chat invitation. + * The current user must be the invitee of the invitation. + * @param invitationId The ID of the invitation to accept. + * @returns The updated invitation object with status 'ACCEPTED'. + */ +export async function acceptInvitation( + invitationId: number +): Promise { + const response = await fetcher(`/api/chat/invitation/${invitationId}/accept`, { + method: "POST", + }); + if (!response.ok) { + throw new Error(`Failed to accept invitation: ${await response.text()}`); + } + return await response.json(); +} + +/** + * Declines a pending chat invitation. + * The current user must be the invitee of the invitation. + * @param invitationId The ID of the invitation to decline. + * @returns The updated invitation object with status 'DECLINED'. + */ +export async function declineInvitation( + invitationId: number +): Promise { + const response = await fetcher( + `/api/chat/invitation/${invitationId}/decline`, + { + method: "POST", + } + ); + if (!response.ok) { + throw new Error(`Failed to decline invitation: ${await response.text()}`); + } + return await response.json(); +} diff --git a/web/src/lib/chat/socketService.ts b/web/src/lib/chat/socketService.ts new file mode 100644 index 00000000000..05fc3b42b90 --- /dev/null +++ b/web/src/lib/chat/socketService.ts @@ -0,0 +1,248 @@ +import { io, Socket } from "socket-io-client"; +import { createContext, useContext, useEffect, useState } from "react"; +import { + BackendMessage, + ChatInvitation, + ChatParticipant, + ChatParticipantRole, +} from "@/app/chat/interfaces"; + +// 1. Type Definitions for Socket Events +// Ensures type safety for all WebSocket communications + +interface ServerToClientEvents { + connect: () => void; + disconnect: () => void; + connect_error: (err: Error) => void; + + // Custom events from our backend + join_success: (data: { chat_session_id: string }) => void; + participant_joined: (data: { user_id: string; user_email: string }) => void; + participant_left: (data: { user_id: string; user_email: string }) => void; + message_added: (message: BackendMessage) => void; + user_typing: (data: { user_id: string; user_email: string }) => void; + read_receipt_updated: (data: { + user_id: string; + last_read_message_id: number; + }) => void; + new_invitation: (invitation: ChatInvitation) => void; + error: (data: { detail: string }) => void; +} + +interface ClientToServerEvents { + join_chat_session: (data: { chat_session_id: string }) => void; + leave_chat_session: (data: { chat_session_id: string }) => void; + new_message: (data: { + chat_session_id: string; + message: BackendMessage; + }) => void; + typing_indicator: (data: { chat_session_id: string }) => void; + mark_read: (data: { chat_session_id: string; message_id: number }) => void; +} + +// 2. SocketService Class +// Encapsulates all socket logic, providing a clean API for the rest of the app. + +class SocketService { + private socket: Socket | null = + null; + + connect(token: string): void { + if (this.socket && this.socket.connected) { + return; + } + + // Disconnect any existing socket before creating a new one + if (this.socket) { + this.socket.disconnect(); + } + + const socketUrl = + process.env.NODE_ENV === "development" + ? "ws://localhost:8080" + : window.location.origin; + + this.socket = io(socketUrl, { + path: "/api/ws/socket.io", + transports: ["websocket"], + auth: { token }, + autoConnect: true, + reconnection: true, + reconnectionAttempts: 5, + reconnectionDelay: 1000, + }); + + this.socket.on("connect", () => { + console.log("Socket connected:", this.socket?.id); + }); + + this.socket.on("disconnect", (reason) => { + console.log("Socket disconnected:", reason); + }); + + this.socket.on("connect_error", (err) => { + console.error("Socket connection error:", err.message); + }); + } + + disconnect(): void { + if (this.socket) { + this.socket.disconnect(); + this.socket = null; + console.log("Socket disconnected manually."); + } + } + + getSocket(): Socket | null { + return this.socket; + } + + // --- Event Emitters --- + + joinChatSession(chatSessionId: string): void { + this.socket?.emit("join_chat_session", { chat_session_id: chatSessionId }); + } + + leaveChatSession(chatSessionId: string): void { + this.socket?.emit("leave_chat_session", { chat_session_id: chatSessionId }); + } + + sendNewMessage(chatSessionId: string, message: BackendMessage): void { + this.socket?.emit("new_message", { + chat_session_id: chatSessionId, + message, + }); + } + + sendTypingIndicator(chatSessionId: string): void { + this.socket?.emit("typing_indicator", { chat_session_id: chatSessionId }); + } + + markAsRead(chatSessionId: string, messageId: number): void { + this.socket?.emit("mark_read", { + chat_session_id: chatSessionId, + message_id: messageId, + }); + } + + // --- Event Listeners --- + + on( + event: Event, + callback: ServerToClientEvents[Event] + ): () => void { + this.socket?.on(event, callback); + // Return a cleanup function to remove the listener + return () => { + this.socket?.off(event, callback); + }; + } +} + +// 3. Singleton Instance +// Ensures only one instance of SocketService is used throughout the application. +export const socketService = new SocketService(); + +// 4. React Context for Socket Service +interface ISocketContext { + socket: Socket | null; + isConnected: boolean; +} + +const SocketContext = createContext({ + socket: null, + isConnected: false, +}); + +export const SocketProvider: React.FC<{ + children: React.ReactNode; + token: string | null; +}> = ({ children, token }) => { + const [isConnected, setIsConnected] = useState(false); + + useEffect(() => { + if (token) { + socketService.connect(token); + const socket = socketService.getSocket(); + + const onConnect = () => setIsConnected(true); + const onDisconnect = () => setIsConnected(false); + + socket?.on("connect", onConnect); + socket?.on("disconnect", onDisconnect); + + // Initial state check + if (socket?.connected) { + onConnect(); + } + + return () => { + socket?.off("connect", onConnect); + socket?.off("disconnect", onDisconnect); + socketService.disconnect(); + }; + } else { + socketService.disconnect(); + setIsConnected(false); + } + }, [token]); + + return ( + + {children} + + ); +}; + +// 5. Custom Hooks for easy consumption in components + +/** + * Hook to get the current socket instance and connection status. + */ +export const useSocket = (): ISocketContext => { + return useContext(SocketContext); +}; + +/** + * Generic hook to subscribe to a specific socket event. + * Handles cleanup automatically. + * @param event The event name to listen to. + * @param callback The function to execute when the event is received. + */ +export const useSocketListener = ( + event: Event, + callback: ServerToClientEvents[Event] +) => { + const { socket } = useSocket(); + + useEffect(() => { + if (socket) { + socket.on(event, callback); + return () => { + socket.off(event, callback); + }; + } + }, [socket, event, callback]); +}; + +/** + * Hook to manage joining and leaving a chat room automatically. + * @param chatSessionId The ID of the chat session to join. Pass null to leave. + */ +export const useChatRoom = (chatSessionId: string | null) => { + const { isConnected } = useSocket(); + + useEffect(() => { + if (isConnected && chatSessionId) { + socketService.joinChatSession(chatSessionId); + console.log(`Attempted to join room: ${chatSessionId}`); + + return () => { + socketService.leaveChatSession(chatSessionId); + console.log(`Attempted to leave room: ${chatSessionId}`); + }; + } + }, [chatSessionId, isConnected]); +}; From 4bd3c5f0dca45ccfd6388094c191717a23a21e25 Mon Sep 17 00:00:00 2001 From: Sanjay Akut Date: Sun, 22 Jun 2025 18:36:19 -0400 Subject: [PATCH 08/10] feat: Integrate collaboration into ChatPage - Add SocketProvider to AppProvider with JWT token handling - Add InviteUsersModal for user invitations - Integrate collaboration state and socket hooks into ChatPage - Add real-time event handlers for participants, typing, and messages - Add collaboration control handlers for enable/disable and remove participant --- web/src/app/chat/ChatPage.tsx | 140 ++++++++++++ .../chat/collaboration/InviteUsersModal.tsx | 200 ++++++++++++++++++ web/src/components/context/AppProvider.tsx | 19 +- 3 files changed, 358 insertions(+), 1 deletion(-) create mode 100644 web/src/app/chat/collaboration/InviteUsersModal.tsx diff --git a/web/src/app/chat/ChatPage.tsx b/web/src/app/chat/ChatPage.tsx index 18adfe84164..8d9afc9d0ca 100644 --- a/web/src/app/chat/ChatPage.tsx +++ b/web/src/app/chat/ChatPage.tsx @@ -24,6 +24,7 @@ import { DocumentsResponse, AgenticMessageResponseIDInfo, UserKnowledgeFilePacket, + ChatParticipant, } from "./interfaces"; import Prism from "prismjs"; @@ -138,6 +139,21 @@ import { ErrorBanner } from "./message/Resubmit"; import MinimalMarkdown from "@/components/chat/MinimalMarkdown"; import { WelcomeModal } from "@/components/initialSetup/welcome/WelcomeModal"; +// Collaboration Imports +import { CollaborationControls } from "./collaboration/CollaborationControls"; +import { InviteUsersModal } from "./collaboration/InviteUsersModal"; +import { + useSocket, + useSocketListener, + useChatRoom, +} from "@/lib/chat/socketService"; +import { + getParticipants, + enableCollaboration, + disableCollaboration, + removeParticipant, +} from "@/lib/chat/collaborationApi"; + const TEMP_USER_MESSAGE_ID = -1; const TEMP_ASSISTANT_MESSAGE_ID = -2; const SYSTEM_MESSAGE_ID = -3; @@ -191,6 +207,12 @@ export function ChatPage({ setCurrentMessageFiles, } = useDocumentsContext(); + // Collaboration State + const [participants, setParticipants] = useState([]); + const [typingUsers, setTypingUsers] = useState([]); + const [showInviteModal, setShowInviteModal] = useState(false); + const { socket, isConnected } = useSocket(); + const defaultAssistantIdRaw = searchParams?.get( SEARCH_PARAM_NAMES.PERSONA_ID ); @@ -259,6 +281,77 @@ export function ChatPage({ (chatSession) => chatSession.id === existingChatSessionId ); + // Join/leave chat room on session change + useChatRoom( + selectedChatSession?.collaboration_enabled ? existingChatSessionId : null + ); + + // Listen for real-time collaboration events + useSocketListener("participant_joined", (newParticipant) => { + setParticipants((prev) => { + if (prev.find((p) => p.id === newParticipant.user_id)) { + return prev; + } + return [ + ...prev, + { + id: newParticipant.user_id, + email: newParticipant.user_email, + role: "COLLABORATOR", + }, + ]; + }); + }); + + useSocketListener("participant_left", ({ user_id }) => { + setParticipants((prev) => prev.filter((p) => p.id !== user_id)); + }); + + useSocketListener("user_typing", ({ user_id }) => { + setTypingUsers((prev) => [...new Set([...prev, user_id])]); + setTimeout(() => { + setTypingUsers((prev) => prev.filter((id) => id !== user_id)); + }, 2000); // Typing indicator lasts for 2 seconds + }); + + useSocketListener("message_added", (backendMessage) => { + // Avoid adding our own messages, which are handled via the normal HTTP response stream + if (backendMessage.sender_id === user?.id) { + return; + } + const newMessageMap = processRawChatHistory([backendMessage]); + const newMessages = Array.from(newMessageMap.values()); + upsertToCompleteMessageMap({ messages: newMessages }); + }); + + const handleToggleCollaboration = async (enabled: boolean) => { + if (!existingChatSessionId) return; + try { + const apiFunc = enabled ? enableCollaboration : disableCollaboration; + await apiFunc(existingChatSessionId); + setPopup({ + type: "success", + message: `Collaboration has been ${enabled ? "enabled" : "disabled"}.`, + }); + refreshChatSessions(); // This will trigger a re-fetch and update the UI + } catch (e) { + setPopup({ type: "error", message: `Error: ${e}` }); + } + }; + + const handleRemoveParticipant = async (participantId: string) => { + if (!existingChatSessionId) return; + try { + await removeParticipant(existingChatSessionId, participantId); + setParticipants((prev) => + prev.filter((p) => p.id !== participantId) + ); + setPopup({ type: "success", message: "Participant removed." }); + } catch (e) { + setPopup({ type: "error", message: `Error removing participant: ${e}` }); + } + }; + useEffect(() => { if (user?.is_anonymous_user) { Cookies.set( @@ -476,6 +569,7 @@ export function ChatPage({ } updateCompleteMessageDetail(null, new Map()); setChatSessionSharedStatus(ChatSessionSharedStatus.Private); + setParticipants([]); // Clear participants for new chat // if we're supposed to submit on initial load, then do that here if ( @@ -497,6 +591,21 @@ export function ChatPage({ const chatSession = session as BackendChatSession; setSelectedAssistantFromId(chatSession.persona_id); + // Fetch participants if collaboration is enabled + if (chatSession.collaboration_enabled) { + try { + const fetchedParticipants = await getParticipants( + chatSession.chat_session_id + ); + setParticipants(fetchedParticipants); + } catch (error) { + console.error("Failed to fetch participants:", error); + setParticipants([]); + } + } else { + setParticipants([]); + } + const newMessageMap = processRawChatHistory(chatSession.messages); const newMessageHistory = buildLatestMessageChain(newMessageMap); @@ -2451,6 +2560,21 @@ export function ChatPage({ /> )} + {showInviteModal && existingChatSessionId && ( + setShowInviteModal(false)} + onInviteSuccess={async () => { + // Refetch participants after a successful invite + const updatedParticipants = await getParticipants( + existingChatSessionId + ); + setParticipants(updatedParticipants); + }} + /> + )} + {sharingModalVisible && chatSessionIdRef.current !== null && ( p.id === message.sender_id + )} setPresentingDocument={ setPresentingDocument } @@ -3301,6 +3428,19 @@ export function ChatPage({ )}
+ {selectedChatSession && user && ( + setShowInviteModal(true)} + onToggleCollaboration={ + handleToggleCollaboration + } + onRemoveParticipant={handleRemoveParticipant} + /> + )} toggleProSearch()} diff --git a/web/src/app/chat/collaboration/InviteUsersModal.tsx b/web/src/app/chat/collaboration/InviteUsersModal.tsx new file mode 100644 index 00000000000..e31fb9a7bc9 --- /dev/null +++ b/web/src/app/chat/collaboration/InviteUsersModal.tsx @@ -0,0 +1,200 @@ +import React, { useState, useEffect, useCallback } from "react"; +import { FiX, FiSearch, FiCheckCircle, FiSend } from "react-icons/fi"; +import { User } from "@/lib/types"; +import { ChatParticipant } from "@/app/chat/interfaces"; +import { inviteParticipant } from "@/lib/chat/collaborationApi"; +import { fetcher } from "@/lib/fetcher"; +import { Spinner } from "@/components/Spinner"; +import { Modal } from "@/components/Modal"; +import { Button } from "@/components/ui/button"; +import { usePopup } from "@/components/admin/connectors/Popup"; + +interface InviteUsersModalProps { + chatSessionId: string; + existingParticipants: ChatParticipant[]; + onClose: () => void; + onInviteSuccess: () => void; +} + +export const InviteUsersModal: React.FC = ({ + chatSessionId, + existingParticipants, + onClose, + onInviteSuccess, +}) => { + const [searchQuery, setSearchQuery] = useState(""); + const [searchResults, setSearchResults] = useState([]); + const [isLoading, setIsLoading] = useState(false); + const [invitedUserIds, setInvitedUserIds] = useState>(new Set()); + const { setPopup } = usePopup(); + + const performSearch = useCallback(async (query: string) => { + setIsLoading(true); + try { + // NOTE: This assumes a user search endpoint exists. + // If not, one needs to be created on the backend. + const response = await fetcher( + `/api/manage/users/search?query=${encodeURIComponent(query)}` + ); + if (response.ok) { + const users: User[] = (await response.json()).users; + setSearchResults(users); + } else { + const errorMsg = await response.text(); + console.error("Failed to search for users:", errorMsg); + setPopup({ type: "error", message: "Failed to search for users." }); + setSearchResults([]); + } + } catch (error) { + console.error("Error searching for users:", error); + setPopup({ type: "error", message: "An error occurred while searching." }); + setSearchResults([]); + } finally { + setIsLoading(false); + } + }, [setPopup]); + + useEffect(() => { + const handler = setTimeout(() => { + if (searchQuery.trim().length > 1) { + performSearch(searchQuery.trim()); + } else { + setSearchResults([]); + } + }, 300); // Debounce search requests + + return () => { + clearTimeout(handler); + }; + }, [searchQuery, performSearch]); + + const handleInvite = async (userToInvite: User) => { + try { + await inviteParticipant(chatSessionId, userToInvite.id); + setInvitedUserIds((prev) => new Set(prev).add(userToInvite.id)); + setPopup({ + type: "success", + message: `Invitation sent to ${userToInvite.email}`, + }); + onInviteSuccess(); + } catch (error) { + console.error("Failed to invite user:", error); + setPopup({ + type: "error", + message: `Failed to invite ${userToInvite.email}. They may already have a pending invitation.`, + }); + } + }; + + const isParticipant = (userId: string) => { + return existingParticipants.some((p) => p.id === userId); + }; + + return ( + +
+
+

+ Invite to Chat +

+ +
+ +
+ + setSearchQuery(e.target.value)} + placeholder="Search by email..." + className="w-full pl-11 pr-4 py-2 border border-gray-300 rounded-lg focus:outline-none focus:ring-2 focus:ring-blue-500 transition-all" + autoFocus + /> +
+ +
+ {isLoading ? ( +
+ +
+ ) : searchResults.length > 0 ? ( +
    + {searchResults.map((user) => { + const alreadyParticipant = isParticipant(user.id); + const alreadyInvited = invitedUserIds.has(user.id); + const isInviteDisabled = + alreadyParticipant || + alreadyInvited || + existingParticipants.length >= 3; + + return ( +
  • +
    +
    + + {user.email.charAt(0).toUpperCase()} + +
    + + {user.email} + +
    + +
  • + ); + })} +
+ ) : searchQuery && !isLoading ? ( +
+

No users found for "{searchQuery}".

+
+ ) : ( +
+

Start typing to find users to invite.

+
+ )} +
+ {existingParticipants.length >= 3 && ( +
+ The maximum of 3 participants has been reached. +
+ )} +
+
+ ); +}; diff --git a/web/src/components/context/AppProvider.tsx b/web/src/components/context/AppProvider.tsx index 4b22c548a27..446d55e60e1 100644 --- a/web/src/components/context/AppProvider.tsx +++ b/web/src/components/context/AppProvider.tsx @@ -8,6 +8,9 @@ import { Persona } from "@/app/admin/assistants/interfaces"; import { User } from "@/lib/types"; import { ModalProvider } from "./ModalContext"; import { AuthTypeMetadata } from "@/lib/userSS"; +import { SocketProvider } from "@/lib/chat/socketService"; +import { useEffect, useState } from "react"; +import Cookies from "js-cookie"; interface AppProviderProps { children: React.ReactNode; @@ -28,6 +31,18 @@ export const AppProvider = ({ hasImageCompatibleModel, authTypeMetadata, }: AppProviderProps) => { + const [token, setToken] = useState(null); + + useEffect(() => { + // The default cookie name for fastapi-users's JWT token + const authToken = Cookies.get("fastapi-users-auth"); + if (authToken) { + setToken(authToken); + } else { + setToken(null); + } + }, [user]); + return ( - {children} + + {children} + From 7ccc4e5d307df18be741550935f58628a9624bd4 Mon Sep 17 00:00:00 2001 From: Sanjay Akut Date: Sun, 22 Jun 2025 18:46:20 -0400 Subject: [PATCH 09/10] feat: Complete frontend implementation for multi-user chat collaboration - Update Messages component to display sender information - Add user search API endpoint for invite functionality - Update InviteUsersModal to handle correct API response format - Fix collaboration controls positioning in ChatPage --- backend/onyx/server/manage/users.py | 37 +++++++++++++++++++ .../chat/collaboration/InviteUsersModal.tsx | 5 ++- web/src/app/chat/message/Messages.tsx | 23 +++++++++--- 3 files changed, 58 insertions(+), 7 deletions(-) diff --git a/backend/onyx/server/manage/users.py b/backend/onyx/server/manage/users.py index cf8048b7574..7b120d91118 100644 --- a/backend/onyx/server/manage/users.py +++ b/backend/onyx/server/manage/users.py @@ -504,6 +504,43 @@ async def get_user_role(user: User = Depends(current_user)) -> UserRoleResponse: return UserRoleResponse(role=user.role) +# --------------------------------------------------------------------------- +# Basic user search (used by Collaboration invite dialog) +# --------------------------------------------------------------------------- + + +@router.get("/manage/users/search") +def search_users_by_email( + q: str | None = Query(default=None, description="Search string for email"), + user: User | None = Depends(current_user), + db_session: Session = Depends(get_session), +) -> list[MinimalUserSnapshot]: + """ + Lightweight user lookup endpoint. + + Returns a list of users whose **email** contains the supplied + query string (case-insensitive). Requires authentication. + """ + + # Ensure a valid, authenticated user is making the request + if user is None: + raise BasicAuthenticationError(detail="User not authenticated") + + all_users = get_all_users(db_session) + + if q: + filtered = [ + u + for u in all_users + if q.lower() in u.email.lower() + ] + else: + filtered = all_users + + # Return minimal snapshots (id + email) only + return [MinimalUserSnapshot(id=u.id, email=u.email) for u in filtered] + + def get_current_auth_token_expiration_jwt( user: User | None, request: Request ) -> datetime | None: diff --git a/web/src/app/chat/collaboration/InviteUsersModal.tsx b/web/src/app/chat/collaboration/InviteUsersModal.tsx index e31fb9a7bc9..9012d3c72e4 100644 --- a/web/src/app/chat/collaboration/InviteUsersModal.tsx +++ b/web/src/app/chat/collaboration/InviteUsersModal.tsx @@ -34,10 +34,11 @@ export const InviteUsersModal: React.FC = ({ // NOTE: This assumes a user search endpoint exists. // If not, one needs to be created on the backend. const response = await fetcher( - `/api/manage/users/search?query=${encodeURIComponent(query)}` + `/api/manage/users/search?q=${encodeURIComponent(query)}` ); if (response.ok) { - const users: User[] = (await response.json()).users; + // API now returns an array of MinimalUserSnapshot directly + const users: User[] = await response.json(); setSearchResults(users); } else { const errorMsg = await response.text(); diff --git a/web/src/app/chat/message/Messages.tsx b/web/src/app/chat/message/Messages.tsx index d76890362e6..6f3db0a52c3 100644 --- a/web/src/app/chat/message/Messages.tsx +++ b/web/src/app/chat/message/Messages.tsx @@ -26,7 +26,12 @@ import { SearchSummary, UserKnowledgeFiles } from "./SearchSummary"; import { SkippedSearch } from "./SkippedSearch"; import remarkGfm from "remark-gfm"; import { CopyButton } from "@/components/CopyButton"; -import { ChatFileType, FileDescriptor, ToolCallMetadata } from "../interfaces"; +import { + ChatFileType, + FileDescriptor, + ToolCallMetadata, + ChatParticipant, +} from "../interfaces"; import { IMAGE_GENERATION_TOOL_NAME, SEARCH_TOOL_NAME, @@ -324,12 +329,12 @@ export const AIMessage = ({ return content; } - const codeBlockRegex = /```(\w*)\n[\s\S]*?```|```[\s\S]*?$/g; + const codeBlockRegex = /```(\\w*)\\n[\\s\\S]*?```|```[\\s\\S]*?$/g; const matches = content.match(codeBlockRegex); if (matches) { content = matches.reduce((acc, match) => { - if (!match.match(/```\w+/)) { + if (!match.match(/```\\w+/)) { return acc.replace(match, match.replace("```", "```plaintext")); } return acc; @@ -367,7 +372,7 @@ export const AIMessage = ({ content: string | JSX.Element ): string | JSX.Element => { if (typeof content === "string") { - const pattern = /```[a-zA-Z]+[^\s]*$/; + const pattern = /```[a-zA-Z]+[^\\s]*$/; const match = content.match(pattern); if (match && match.index && match.index > 3) { const newContent = content.slice(0, match.index - 3); @@ -968,6 +973,7 @@ export const HumanMessage = ({ stopGenerating = () => null, disableSwitchingForStreaming = false, setPresentingDocument, + sender, }: { shared?: boolean; content: string; @@ -979,6 +985,7 @@ export const HumanMessage = ({ stopGenerating?: () => void; disableSwitchingForStreaming?: boolean; setPresentingDocument: (document: MinimalOnyxDocument) => void; + sender?: ChatParticipant | null; }) => { const textareaRef = useRef(null); @@ -1031,6 +1038,12 @@ export const HumanMessage = ({ >
+ {sender && ( +
+ {sender.email} +
+ )} +
Date: Sun, 22 Jun 2025 18:49:26 -0400 Subject: [PATCH 10/10] docs: Add comprehensive documentation for multi-user chat collaboration - Feature overview and architecture - Database schema documentation - API endpoint reference - WebSocket event documentation - Frontend implementation guide - Security considerations - Usage guide and future improvements --- docs/multi-user-chat-implementation.md | 128 ++++++++++++++----------- 1 file changed, 73 insertions(+), 55 deletions(-) diff --git a/docs/multi-user-chat-implementation.md b/docs/multi-user-chat-implementation.md index 942e3645f99..254d20e062f 100644 --- a/docs/multi-user-chat-implementation.md +++ b/docs/multi-user-chat-implementation.md @@ -1,6 +1,6 @@ # Multi-User Chat Implementation Guide -This document provides a comprehensive overview of the multi-user collaborative chat feature in Onyx. It details the database schema, backend API, real-time event handling, and provides guidance for frontend implementation. +This document provides a comprehensive overview of the multi-user collaborative chat feature in Onyx. It details the architecture, database schema, backend API, real-time event handling, and provides guidance for frontend implementation. ## 1. Feature Overview @@ -14,7 +14,24 @@ The multi-user chat feature transforms a standard single-user chat session into This feature runs in parallel with the existing "Share" functionality, which remains unchanged. -## 2. Database Schema Changes +## 2. Architecture and Design + +The feature is built on a modular architecture that separates concerns between the backend API, the real-time server, and the frontend client. + +- **Backend (FastAPI)**: The core Python backend handles all business logic, including managing chat state, permissions, and database interactions. It exposes a set of RESTful API endpoints for collaboration management (inviting users, enabling/disabling the feature, etc.). + +- **Real-time Server (Socket.io)**: A Python-based Socket.io server is integrated with the FastAPI application and mounted at the `/ws` endpoint. It manages WebSocket connections, handles room logic (one room per chat session), and broadcasts real-time events like new messages, typing indicators, and participant status changes. + +- **Frontend (React)**: The Next.js frontend is responsible for the user interface. It communicates with the backend via both the REST API (for state-changing actions) and the WebSocket server (for receiving real-time updates). A `SocketService` class and React hooks encapsulate the WebSocket logic for easy use in components. + +### Interaction Flow +1. A user action (e.g., inviting a participant) triggers a call to a REST API endpoint. +2. The FastAPI backend processes the request, updates the database, and performs necessary permission checks. +3. For actions that require real-time updates (like sending a message), the frontend client, after receiving a successful API response, emits an event to the Socket.io server. +4. The Socket.io server broadcasts the event to all other clients in the same chat room. +5. Frontend clients receive the event and update their UI state accordingly, creating a seamless real-time experience. + +## 3. Database Schema The database has been updated to support multi-user collaboration. The changes were introduced in Alembic migration `26a9d522abca_add_multi_user_chat_collaboration.py`. @@ -26,7 +43,7 @@ This table links users to the chat sessions they are a part of. | Column | Type | Description | | ---------------------- | ------------------------- | ------------------------------------------------------------ | | `chat_session_id` | UUID (FK, PK) | Foreign key to `chat_session.id`. | -| `user_id` | UUID (FK, PK) | Foreign key to `user.id`. | +| `user_id` | UUID (FK, PK) | Foreignkey to `user.id`. | | `role` | ENUM (`OWNER`, `COLLABORATOR`) | The role of the user in the session. | | `joined_at` | DateTime (timezone) | Timestamp when the user joined the session. | | `last_read_message_id` | Integer (FK) | ID of the last message read by the participant, for read receipts. | @@ -51,7 +68,7 @@ This table manages invitations to join a collaborative chat. #### `chat_message` - **`sender_id`** (UUID, nullable, FK to `user.id`): A new field to store the ID of the user who sent the message. For existing messages, this is backfilled with the chat session's original owner. -## 3. API Endpoints +## 4. API Endpoints A new set of RESTful endpoints has been added to manage collaborative chats, located in `backend/onyx/server/query_and_chat/chat_collaboration_api.py`. @@ -59,16 +76,17 @@ All endpoints require user authentication. | Method | Endpoint | Description | Permissions Required | | :----- | :----------------------------------------------------- | :-------------------------------------------------- | :------------------- | -| `POST` | `/chat/session/{session_id}/collaboration/enable` | Enables collaboration mode for a chat session. | Owner | -| `POST` | `/chat/session/{session_id}/collaboration/disable` | Disables collaboration mode. | Owner | -| `GET` | `/chat/session/{session_id}/participants` | Lists all participants in a session. | Participant | -| `POST` | `/chat/session/{session_id}/invite` | Invites a user to the session. | Owner | -| `DELETE`| `/chat/session/{session_id}/participant/{user_id}` | Removes a participant from the session. | Owner | -| `GET` | `/chat/invitations` | Gets all pending invitations for the current user. | Authenticated User | -| `POST` | `/invitation/{invitation_id}/accept` | Accepts a pending invitation. | Invitee | -| `POST` | `/invitation/{invitation_id}/decline` | Declines a pending invitation. | Invitee | - -## 4. Real-time Socket.io Events +| `POST` | `/api/chat/session/{session_id}/collaboration/enable` | Enables collaboration mode for a chat session. | Owner | +| `POST` | `/api/chat/session/{session_id}/collaboration/disable` | Disables collaboration mode. | Owner | +| `GET` | `/api/chat/session/{session_id}/participants` | Lists all participants in a session. | Participant | +| `POST` | `/api/chat/session/{session_id}/invite` | Invites a user to the session. | Owner | +| `DELETE`| `/api/chat/session/{session_id}/participant/{user_id}` | Removes a participant from the session. | Owner | +| `GET` | `/api/chat/invitations` | Gets all pending invitations for the current user. | Authenticated User | +| `POST` | `/api/chat/invitation/{invitation_id}/accept` | Accepts a pending invitation. | Invitee | +| `POST` | `/api/chat/invitation/{invitation_id}/decline` | Declines a pending invitation. | Invitee | +| `GET` | `/api/manage/users/search` | Searches for users by email to invite. | Authenticated User | + +## 5. Real-time WebSocket Events Real-time communication is handled by a Socket.io server implemented in `backend/onyx/server/realtime/socketio_server.py` and mounted at the `/ws` path. @@ -85,7 +103,6 @@ Real-time communication is handled by a Socket.io server implemented in `backend | :-------------------- | :----------------------------------------- | :----------------------------------------------------------- | | `join_chat_session` | `{ chat_session_id: string }` | Requests to join a specific chat session room. | | `leave_chat_session` | `{ chat_session_id: string }` | Informs the server that the client is leaving the room. | -| `new_message` | `{ chat_session_id: string, message: object }` | Sent after a message is successfully created via the REST API to notify other clients. | | `typing_indicator` | `{ chat_session_id: string }` | Informs others that the user is typing. | | `mark_read` | `{ chat_session_id: string, message_id: number }` | Informs the server of the last message the user has read. | @@ -101,54 +118,55 @@ Real-time communication is handled by a Socket.io server implemented in `backend | `message_added` | `{ ...message_details }` | Broadcasts a new message to all participants in the room. | | `user_typing` | `{ user_id: string, user_email: string }` | Notifies clients that a user is typing. | | `read_receipt_updated`| `{ user_id: string, last_read_message_id: number }` | Broadcasts that a user's read status has been updated. | +| `new_invitation` | `{ ...invitation_details }` | Notifies a user in real-time that they have a new invitation. | | `error` | `{ detail: string }` | Sent by the server if an error occurs (e.g., permission denied). | -## 5. Implementation Details +## 6. Frontend Implementation + +To build the user-facing side of this feature, the following frontend work has been implemented: -- **Database Helpers**: All database logic for this feature is encapsulated in `backend/onyx/db/chat_collaboration.py`. -- **Access Control**: Core access checks are updated in `backend/onyx/access/access.py` and throughout the `chat` DB helpers to be collaboration-aware. A user can access a chat if they are the owner OR a participant in a collaboration-enabled session. -- **Permissions**: - - **Owner**: The original creator of the chat session. Can enable/disable collaboration, invite/remove users, and delete the chat. - - **Collaborator**: A user who has accepted an invitation. Can send messages and view chat history. +1. **State Management**: + - The main `ChatPage.tsx` component now manages the state for `participants` and `typingUsers`. + - This state is fetched via the REST API on session load and updated in real-time via WebSocket events. -## 6. Usage Flow Example +2. **Socket.io Client Integration**: + - A `SocketProvider` is wrapped around the application in `AppProvider.tsx`, which initializes the connection using the user's JWT auth token. + - Custom hooks (`useSocket`, `useSocketListener`, `useChatRoom`) in `socketService.ts` simplify component-level interaction with the WebSocket. + - The `useChatRoom` hook automatically handles joining and leaving the appropriate Socket.io room when a user navigates to/from a collaborative chat. -1. **Enable Collaboration**: User A (owner) sends a `POST` request to `/chat/session/{session_id}/collaboration/enable`. -2. **Invite User**: User A sends a `POST` to `/chat/session/{session_id}/invite` with User B's ID in the payload. -3. **Accept Invitation**: User B receives the pending invitation via a `GET` to `/chat/invitations` and accepts it by sending a `POST` to `/invitation/{invitation_id}/accept`. -4. **Join Real-time Session**: Both User A and User B's clients connect to the Socket.io server and emit `join_chat_session` with the `chat_session_id`. -5. **Send Message**: User A sends a message via the standard `POST /chat/send-message` endpoint. After the message is created, the client emits a `new_message` event to the WebSocket server. -6. **Receive Message**: The server broadcasts the `message_added` event to all participants in the room, including User B, whose UI updates in real-time. -7. **Remove Participant**: User A can remove User B by sending a `DELETE` request to `/chat/session/{session_id}/participant/{user_b_id}`. +3. **UI Components**: + - **`CollaborationControls.tsx`**: A new component displayed at the bottom of the chat, providing UI for enabling/disabling collaboration, viewing participants, and inviting new users. + - **`InviteUsersModal.tsx`**: A modal for searching and inviting users to the chat session. + - **`Messages.tsx`**: The `HumanMessage` component has been updated to display the sender's email address above their message in a collaborative chat. -## 7. Next Steps for Frontend Implementation +## 7. Security Considerations -To build the user-facing side of this feature, the following frontend work is required: +- **WebSocket Authentication**: The Socket.io connection is protected by JWT authentication. A client can only connect by providing a valid token, preventing unauthorized real-time access. +- **API Security**: All collaboration-related API endpoints are protected and require an authenticated user. +- **Permission Checks**: + - Critical actions like enabling collaboration or inviting/removing users are restricted to the **Owner** of the chat session. + - General access to view a collaborative chat and send messages is restricted to **Participants** (Owners and Collaborators). + - All database queries and API handlers re-validate user permissions to ensure a user cannot act on a session they don't have access to. +- **Data Exposure**: The user search endpoint (`/api/manage/users/search`) only returns minimal user information (ID and email) to prevent leaking sensitive data. -1. **State Management**: - - Update the chat session state to include `collaboration_enabled` and a list of `participants`. - - Manage a list of pending invitations for the current user. +## 8. Usage Guide -2. **Socket.io Client Integration**: - - Create a WebSocket manager/hook to handle the Socket.io connection lifecycle. - - Implement logic to connect, authenticate, and join/leave chat rooms based on the current view. - - Implement event listeners for all server-to-client events (`participant_joined`, `message_added`, etc.) to update the application state in real-time. +1. **Enable Collaboration**: The owner of a chat session clicks the "Collaborate" toggle in the `CollaborationControls` bar at the bottom of the chat. +2. **Invite User**: The owner clicks the "Invite" button, which opens the `InviteUsersModal`. They can search for another user by email and send an invitation. +3. **Accept Invitation**: The invited user will receive a notification (future improvement) or can find the invitation in their UI. Upon accepting, they become a collaborator. +4. **Join Real-time Session**: When any participant opens the chat, their client automatically connects to the WebSocket and joins the chat room. +5. **Collaborate**: All participants can now send messages. New messages, typing indicators, and participant join/leave events will appear for everyone in real-time. +6. **Remove Participant**: The owner can remove a collaborator by clicking the 'x' on their avatar in the participant list. -3. **UI Components**: - - **"Invite Others" Button/Modal**: - - An "Invite" button, visible only to the chat owner when collaboration is enabled. - - A modal to search for users and send invitations. - - **Participant List**: - - A component to display the avatars/names of all current participants in the chat header. - - Include visual indicators for online status or typing activity. - - **Message Display**: - - Update the message component to display the sender's avatar and name next to their messages. - - **Invitation Notifications**: - - A UI element (e.g., a notification bell) to show pending invitations. - - A view to list, accept, or decline invitations. - - **Typing Indicators**: - - Display a "User is typing..." message when the `user_typing` event is received. - - **Read Receipts**: - - Implement logic to emit the `mark_read` event as the user scrolls. - - Display indicators (e.g., small avatars) next to messages to show who has read up to that point. +## 9. Configuration Options + +- **`MAX_CHAT_PARTICIPANTS`**: A constant defined in `backend/onyx/db/chat_collaboration.py` that sets the maximum number of participants (including the owner) for a single chat session. The current value is **3**. + +## 10. Future Improvements +- **Granular Roles**: Introduce more specific roles like "Viewer" for read-only access. +- **Public Collaboration**: Allow for the creation of public, link-accessible collaborative sessions that anyone in the organization can join. +- **Real-time Notifications**: Implement a UI notification system to alert users in real-time when they receive a new chat invitation. +- **Presence Indicators**: Enhance participant avatars with a green dot or similar indicator to show who is currently online and active in the chat session. +- **Invitation Management UI**: A dedicated page or section for users to view and manage all their pending, accepted, and declined chat invitations. +- **Edit/Delete Permissions**: Define rules for who can edit or delete messages in a collaborative chat (e.g., only the sender, or owners can delete any message).