mirror of
https://github.com/matrix-org/dendrite
synced 2024-12-05 03:22:34 +01:00
Fix: Edited messages appear twice in fulltext search (#3363)
As stated in https://github.com/matrix-org/dendrite/issues/3358 the search response contains both original and edited message. This PR fixes it by removing of the original message from the fulltext index after indexing the edit message event. I also made some cosmetic changes/fixes i found in the code Signed-off-by: `Alexander Dubovikov <d.lexand@gmail.com>`
This commit is contained in:
parent
affb6977e4
commit
9897959731
6 changed files with 115 additions and 4 deletions
|
@ -328,7 +328,7 @@ func AdminPurgeRoom(req *http.Request, rsAPI roomserverAPI.ClientRoomserverAPI)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func AdminResetPassword(req *http.Request, cfg *config.ClientAPI, device *api.Device, userAPI api.ClientUserAPI) util.JSONResponse {
|
func AdminResetPassword(req *http.Request, cfg *config.ClientAPI, device *api.Device, userAPI userapi.ClientUserAPI) util.JSONResponse {
|
||||||
if req.Body == nil {
|
if req.Body == nil {
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusBadRequest,
|
Code: http.StatusBadRequest,
|
||||||
|
@ -423,7 +423,7 @@ func AdminReindex(req *http.Request, cfg *config.ClientAPI, device *api.Device,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func AdminMarkAsStale(req *http.Request, cfg *config.ClientAPI, keyAPI api.ClientKeyAPI) util.JSONResponse {
|
func AdminMarkAsStale(req *http.Request, cfg *config.ClientAPI, keyAPI userapi.ClientKeyAPI) util.JSONResponse {
|
||||||
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
|
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
|
|
|
@ -70,7 +70,7 @@ func GetPushRulesByKind(ctx context.Context, scope, kind string, device *userapi
|
||||||
}
|
}
|
||||||
rulesPtr := pushRuleSetKindPointer(ruleSet, pushrules.Kind(kind))
|
rulesPtr := pushRuleSetKindPointer(ruleSet, pushrules.Kind(kind))
|
||||||
// Even if rulesPtr is not nil, there may not be any rules for this kind
|
// Even if rulesPtr is not nil, there may not be any rules for this kind
|
||||||
if rulesPtr == nil || (rulesPtr != nil && len(*rulesPtr) == 0) {
|
if rulesPtr == nil || len(*rulesPtr) == 0 {
|
||||||
return errorResponse(ctx, spec.InvalidParam("invalid push rules kind"), "pushRuleSetKindPointer failed")
|
return errorResponse(ctx, spec.InvalidParam("invalid push rules kind"), "pushRuleSetKindPointer failed")
|
||||||
}
|
}
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
|
|
|
@ -265,7 +265,7 @@ func createEvents(eventsJSON []string, roomVer gomatrixserverlib.RoomVersion) ([
|
||||||
for i, eventJSON := range eventsJSON {
|
for i, eventJSON := range eventsJSON {
|
||||||
pdu, evErr := roomVerImpl.NewEventFromTrustedJSON([]byte(eventJSON), false)
|
pdu, evErr := roomVerImpl.NewEventFromTrustedJSON([]byte(eventJSON), false)
|
||||||
if evErr != nil {
|
if evErr != nil {
|
||||||
return nil, fmt.Errorf("failed to make event: %s", err.Error())
|
return nil, fmt.Errorf("failed to make event: %s", evErr.Error())
|
||||||
}
|
}
|
||||||
ev := types.HeaderedEvent{PDU: pdu}
|
ev := types.HeaderedEvent{PDU: pdu}
|
||||||
events[i] = &ev
|
events[i] = &ev
|
||||||
|
|
|
@ -601,9 +601,11 @@ func (s *OutputRoomEventConsumer) writeFTS(ev *rstypes.HeaderedEvent, pduPositio
|
||||||
}
|
}
|
||||||
e.SetContentType(ev.Type())
|
e.SetContentType(ev.Type())
|
||||||
|
|
||||||
|
var relatesTo gjson.Result
|
||||||
switch ev.Type() {
|
switch ev.Type() {
|
||||||
case "m.room.message":
|
case "m.room.message":
|
||||||
e.Content = gjson.GetBytes(ev.Content(), "body").String()
|
e.Content = gjson.GetBytes(ev.Content(), "body").String()
|
||||||
|
relatesTo = gjson.GetBytes(ev.Content(), "m\\.relates_to")
|
||||||
case spec.MRoomName:
|
case spec.MRoomName:
|
||||||
e.Content = gjson.GetBytes(ev.Content(), "name").String()
|
e.Content = gjson.GetBytes(ev.Content(), "name").String()
|
||||||
case spec.MRoomTopic:
|
case spec.MRoomTopic:
|
||||||
|
@ -622,6 +624,22 @@ func (s *OutputRoomEventConsumer) writeFTS(ev *rstypes.HeaderedEvent, pduPositio
|
||||||
if err := s.fts.Index(e); err != nil {
|
if err := s.fts.Index(e); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// If the event is an edited message we remove the original event from the index
|
||||||
|
// to avoid duplicates in the search results.
|
||||||
|
if relatesTo.Exists() {
|
||||||
|
relatedData := relatesTo.Map()
|
||||||
|
if _, ok := relatedData["rel_type"]; ok && relatedData["rel_type"].Str == "m.replace" {
|
||||||
|
// We remove the original event from the index
|
||||||
|
if srcEventID, ok := relatedData["event_id"]; ok {
|
||||||
|
if err := s.fts.Delete(srcEventID.Str); err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"event_id": ev.EventID(),
|
||||||
|
"src_id": srcEventID.Str,
|
||||||
|
}).WithError(err).Error("Failed to delete edited message from the fulltext index")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,12 +4,14 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/gorilla/mux"
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/httputil"
|
"github.com/matrix-org/dendrite/internal/httputil"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
@ -17,6 +19,7 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
|
|
||||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
@ -1324,6 +1327,95 @@ func TestUpdateRelations(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRemoveEditedEventFromSearchIndex(t *testing.T) {
|
||||||
|
user := test.NewUser(t)
|
||||||
|
alice := userapi.Device{
|
||||||
|
ID: "ALICEID",
|
||||||
|
UserID: user.ID,
|
||||||
|
AccessToken: "ALICE_BEARER_TOKEN",
|
||||||
|
DisplayName: "Alice",
|
||||||
|
AccountType: userapi.AccountTypeUser,
|
||||||
|
}
|
||||||
|
|
||||||
|
routers := httputil.NewRouters()
|
||||||
|
|
||||||
|
cfg, processCtx, close := testrig.CreateConfig(t, test.DBTypeSQLite)
|
||||||
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
||||||
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
||||||
|
defer close()
|
||||||
|
|
||||||
|
// Use an actual roomserver for this
|
||||||
|
natsInstance := jetstream.NATSInstance{}
|
||||||
|
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
|
||||||
|
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
|
||||||
|
|
||||||
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
|
|
||||||
|
room := test.NewRoom(t, user)
|
||||||
|
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, caching.DisableMetrics)
|
||||||
|
|
||||||
|
if err := api.SendEvents(processCtx.Context(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
|
||||||
|
t.Fatalf("failed to send events: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ev1 := room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{"body": "first"})
|
||||||
|
ev2 := room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{
|
||||||
|
"body": " * first",
|
||||||
|
"m.new_content": map[string]interface{}{
|
||||||
|
"body": "first",
|
||||||
|
"msgtype": "m.text",
|
||||||
|
},
|
||||||
|
"m.relates_to": map[string]interface{}{
|
||||||
|
"event_id": ev1.EventID(),
|
||||||
|
"rel_type": "m.replace",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
events := []*rstypes.HeaderedEvent{ev1, ev2}
|
||||||
|
|
||||||
|
for _, e := range events {
|
||||||
|
roomEvents := append([]*rstypes.HeaderedEvent{}, e)
|
||||||
|
if err := api.SendEvents(processCtx.Context(), rsAPI, api.KindNew, roomEvents, "test", "test", "test", nil, false); err != nil {
|
||||||
|
t.Fatalf("failed to send events: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool {
|
||||||
|
// wait for the last sent eventID to come down sync
|
||||||
|
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, e.EventID())
|
||||||
|
|
||||||
|
return gjson.Get(syncBody, path).Exists()
|
||||||
|
})
|
||||||
|
|
||||||
|
// We search that event is the only one nad is the exact event we sent
|
||||||
|
searchResult := searchRequest(t, routers.Client, alice.AccessToken, "first", []string{room.ID})
|
||||||
|
results := gjson.GetBytes(searchResult, fmt.Sprintf(`search_categories.room_events.groups.room_id.%s.results`, room.ID))
|
||||||
|
assert.True(t, results.Exists(), "Should be a search response")
|
||||||
|
assert.Equal(t, 1, len(results.Array()), "Should be exactly one result")
|
||||||
|
assert.Equal(t, e.EventID(), results.Array()[0].String(), "Should be only found exact event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func searchRequest(t *testing.T, router *mux.Router, accessToken, searchTerm string, roomList []string) []byte {
|
||||||
|
t.Helper()
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
rq := test.NewRequest(t, "POST", "/_matrix/client/v3/search", test.WithQueryParams(map[string]string{
|
||||||
|
"access_token": accessToken,
|
||||||
|
}), test.WithJSONBody(t, map[string]interface{}{
|
||||||
|
"search_categories": map[string]interface{}{
|
||||||
|
"room_events": map[string]interface{}{
|
||||||
|
"filters": roomList,
|
||||||
|
"search_term": searchTerm,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
|
||||||
|
router.ServeHTTP(w, rq)
|
||||||
|
assert.Equal(t, 200, w.Code)
|
||||||
|
defer w.Result().Body.Close()
|
||||||
|
body, err := io.ReadAll(w.Result().Body)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
return body
|
||||||
|
}
|
||||||
func syncUntil(t *testing.T,
|
func syncUntil(t *testing.T,
|
||||||
routers httputil.Routers, accessToken string,
|
routers httputil.Routers, accessToken string,
|
||||||
skip bool,
|
skip bool,
|
||||||
|
|
|
@ -71,6 +71,7 @@ func CreateConfig(t *testing.T, dbType test.DBType) (*config.Dendrite, *process.
|
||||||
SingleDatabase: false,
|
SingleDatabase: false,
|
||||||
})
|
})
|
||||||
cfg.Global.ServerName = "test"
|
cfg.Global.ServerName = "test"
|
||||||
|
cfg.SyncAPI.Fulltext.Enabled = true
|
||||||
cfg.SyncAPI.Fulltext.InMemory = true
|
cfg.SyncAPI.Fulltext.InMemory = true
|
||||||
// use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use
|
// use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use
|
||||||
// the file system event with InMemory=true :(
|
// the file system event with InMemory=true :(
|
||||||
|
|
Loading…
Reference in a new issue