Skip to content

refactor(timeline): enhance the Timeline::send() and Timeline::send_reply() APIs when the timeline is threaded #5427

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 23, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions crates/matrix-sdk-ui/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ All notable changes to this project will be documented in this file.

## [Unreleased] - ReleaseDate

### Features

- `Timeline::send()` will now automatically fill the thread relationship, if the timeline has a
thread focus, and the sent event doesn't have a prefilled `relates_to` field (i.e. a relationship).
([5427](https://github.com/matrix-org/matrix-rust-sdk/pull/5427))

### Refactor

- [**breaking**] The MSRV has been bumped to Rust 1.88.
Expand Down
71 changes: 61 additions & 10 deletions crates/matrix-sdk-ui/src/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ use matrix_sdk::{
deserialized_responses::TimelineEvent,
event_cache::{EventCacheDropHandles, RoomEventCache},
executor::JoinHandle,
room::{Receipts, Room, edit::EditedContent, reply::Reply},
room::{
Receipts, Room,
edit::EditedContent,
reply::{EnforceThread, Reply},
},
send_queue::{RoomSendQueueError, SendHandle},
};
use mime::Mime;
Expand All @@ -46,7 +50,7 @@ use ruma::{
poll::unstable_start::{NewUnstablePollStartEventContent, UnstablePollStartEventContent},
receipt::{Receipt, ReceiptThread},
room::{
message::RoomMessageEventContentWithoutRelation,
message::{ReplyWithinThread, RoomMessageEventContentWithoutRelation},
pinned_events::RoomPinnedEventsEventContent,
},
},
Expand Down Expand Up @@ -270,18 +274,65 @@ impl Timeline {
/// If sending the message fails, the local echo item will change its
/// `send_state` to [`EventSendState::SendingFailed`].
///
/// This will do the right thing in the presence of threads:
/// - if this timeline is not focused on a thread, then it will send the
/// event as is.
/// - if this is a threaded timeline, and the event to send is a room
/// message without a relationship, it will automatically mark it as a
/// thread reply with the correct reply fallback, and send it.
///
/// # Arguments
///
/// * `content` - The content of the message event.
///
/// [`MessageLikeUnsigned`]: ruma::events::MessageLikeUnsigned
/// [`SyncMessageLikeEvent`]: ruma::events::SyncMessageLikeEvent
#[instrument(skip(self, content), fields(room_id = ?self.room().room_id()))]
pub async fn send(
&self,
content: AnyMessageLikeEventContent,
) -> Result<SendHandle, RoomSendQueueError> {
self.room().send_queue().send(content).await
pub async fn send(&self, content: AnyMessageLikeEventContent) -> Result<SendHandle, Error> {
// If this is a room event we're sending in a threaded timeline, we add the
// thread relation ourselves.
if let AnyMessageLikeEventContent::RoomMessage(ref room_msg_content) = content
&& room_msg_content.relates_to.is_none()
&& let Some(thread_root) = self.controller.thread_root()
{
// The latest event id is used for the reply-to fallback, for clients which
// don't handle threads. It should be correctly set to the latest
// event in the thread, which the timeline instance might or might
// not know about; in this case, we do a best effort of filling it, and resort
// to using the thread root if we don't know about any event.
//
// Note: we could trigger a back-pagination if the timeline is empty, and wait
// for the results, if the timeline is too often empty.
let latest_event_id = self
.controller
.items()
.await
.iter()
.rev()
.find_map(|item| {
if let TimelineItemKind::Event(event) = item.kind() {
event.event_id().map(ToOwned::to_owned)
} else {
None
}
})
.unwrap_or(thread_root);

let content = self
.room()
.make_reply_event(
// Note: this `.into()` gets rid of the relation, but we've checked previously
// that the `relates_to` field wasn't set.
room_msg_content.clone().into(),
Reply {
event_id: latest_event_id,
enforce_thread: EnforceThread::Threaded(ReplyWithinThread::No),
},
)
.await?;

Ok(self.room().send_queue().send(content.into()).await?)
} else {
// Otherwise, we send the message as is.
Ok(self.room().send_queue().send(content).await?)
}
}

/// Send a reply to the given event.
Expand Down
133 changes: 119 additions & 14 deletions crates/matrix-sdk-ui/tests/integration/timeline/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use eyeball_im::VectorDiff;
use futures_util::StreamExt as _;
use matrix_sdk::{
assert_let_timeout,
room::reply::{EnforceThread, Reply},
test_utils::mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
};
use matrix_sdk_test::{
Expand All @@ -32,8 +31,9 @@ use ruma::{
api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
event_id,
events::{
AnySyncTimelineEvent,
receipt::{ReceiptThread, ReceiptType},
room::message::{ReplyWithinThread, RoomMessageEventContentWithoutRelation},
room::message::{Relation, ReplacementMetadata, RoomMessageEventContent},
},
owned_event_id, room_id, user_id,
};
Expand Down Expand Up @@ -732,17 +732,12 @@ async fn test_thread_timeline_gets_local_echoes() {
// update.
let sent_event_id = event_id!("$sent_msg");
server.mock_room_state_encryption().plain().mount().await;
server.mock_room_send().ok(sent_event_id).mock_once().mount().await;
timeline
.send_reply(
RoomMessageEventContentWithoutRelation::text_plain("hello to you too!"),
Reply {
event_id: threaded_event_id.to_owned(),
enforce_thread: EnforceThread::Threaded(ReplyWithinThread::No),
},
)
.await
.unwrap();

let (event_receiver, mock_builder) =
server.mock_room_send().ok_with_capture(sent_event_id, *ALICE);
mock_builder.mock_once().mount().await;

timeline.send(RoomMessageEventContent::text_plain("hello to you too!").into()).await.unwrap();

// I get the local echo for the in-thread event.
assert_let_timeout!(Some(timeline_updates) = stream.next());
Expand All @@ -753,19 +748,41 @@ async fn test_thread_timeline_gets_local_echoes() {
assert!(event_item.is_local_echo());
assert!(event_item.event_id().is_none());

// The thread information is properly filled.
let msglike = event_item.content().as_msglike().unwrap();
assert_eq!(msglike.thread_root.as_ref(), Some(&thread_root_event_id));
assert!(msglike.in_reply_to.is_none());

// Then the local echo morphs into a sent local echo.
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 1);

assert_let!(VectorDiff::Set { index: 2, value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id(), Some(sent_event_id));
assert_eq!(event_item.event_id(), Some(sent_event_id));
assert!(event_item.content().reactions().unwrap().is_empty());

// Then nothing else.
assert_pending!(stream);

// The raw event includes the correctly reply-to fallback.
{
let raw_event = event_receiver.await.unwrap();
let event = raw_event.deserialize().unwrap();
assert_let!(
AnySyncTimelineEvent::MessageLike(
ruma::events::AnySyncMessageLikeEvent::RoomMessage(event)
) = event
);
let event = event.as_original().unwrap();
assert_let!(Some(Relation::Thread(thread)) = event.content.relates_to.clone());
assert_eq!(thread.event_id, thread_root_event_id);
assert!(thread.is_falling_back);
// The reply-to fallback is set to the latest in-thread event, not the thread
// root.
assert_eq!(thread.in_reply_to.unwrap().event_id, threaded_event_id);
}

// If I send a reaction for the in-thread event, the timeline gets updated, even
// though the reaction doesn't mention the thread directly.
server.mock_room_send().ok(event_id!("$reaction_id")).mock_once().mount().await;
Expand All @@ -791,6 +808,94 @@ async fn test_thread_timeline_gets_local_echoes() {
assert_pending!(stream);
}

#[async_test]
async fn test_thread_timeline_can_send_edit() {
// If I send an edit to a threaded timeline, it just works (aka the system to
// set the threaded relationship doesn't kick in, since there's already a
// relationship).

let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;

let room_id = room_id!("!a:b.c");
let thread_root_event_id = owned_event_id!("$root");
let threaded_event_id = event_id!("$threaded_event");

let room = server.sync_joined_room(&client, room_id).await;

let timeline = room
.timeline_builder()
.with_focus(TimelineFocus::Thread { root_event_id: thread_root_event_id.clone() })
.build()
.await
.unwrap();

let (initial_items, mut stream) = timeline.subscribe().await;

// At first, the timeline is empty.
assert!(initial_items.is_empty());
assert_pending!(stream);

// Start the timeline with an in-thread event.
let f = EventFactory::new();
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_event(
f.text_msg("hello world")
.sender(*ALICE)
.event_id(threaded_event_id)
.in_thread(&thread_root_event_id, threaded_event_id)
.server_ts(MilliSecondsSinceUnixEpoch::now()),
),
)
.await;

// Sanity check: I receive the event and the date divider.
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 2);

// If I send an edit to an in-thread event, the timeline receives an update.
// Note: it's a bit far fetched to send an edit without using
// `Timeline::edit()`, but since it's possible…
let sent_event_id = event_id!("$sent_msg");
server.mock_room_state_encryption().plain().mount().await;

// No mock_once here: this endpoint may or may not be called, depending on
// timing of the end of the test.
server.mock_room_send().ok(sent_event_id).mount().await;

timeline
.send(
RoomMessageEventContent::text_plain("bonjour monde")
.make_replacement(
ReplacementMetadata::new(threaded_event_id.to_owned(), None),
None,
)
.into(),
)
.await
.unwrap();

// I get the local echo for the in-thread event.
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 1);

assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();

// The thread information is (still) present.
let msglike = event_item.content().as_msglike().unwrap();
assert_eq!(msglike.thread_root.as_ref(), Some(&thread_root_event_id));
assert!(msglike.in_reply_to.is_none());

// Text is eagerly updated.
assert_eq!(msglike.as_message().unwrap().body(), "bonjour monde");

// Then we're done.
assert_pending!(stream);
}

#[async_test]
async fn test_sending_read_receipt_with_no_events_doesnt_unset_read_flag() {
// If a thread timeline has no events, then marking it as read doesn't unset the
Expand Down
Loading