From 6739f65752e1a61ce77544c69cb478b7ea2ecdf3 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Wed, 15 Mar 2017 11:22:40 +0000 Subject: [PATCH] Implement event sending part of /rooms/$roomid/send/$type/$txnid (#39) This involves: - Parsing the HTTP request - Requesting auth events from the roomserver via the Query API - Building the event - Doing auth checks on the event - Sending it to the roomserver input log --- .../dendrite/clientapi/clientapi.go | 7 +- .../dendrite/clientapi/config/config.go | 2 + .../dendrite/clientapi/routing/routing.go | 7 +- .../dendrite/clientapi/writers/sendmessage.go | 148 ++++++++++++++++-- 4 files changed, 149 insertions(+), 15 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/clientapi.go b/src/github.com/matrix-org/dendrite/clientapi/clientapi.go index 9f17edea1..03d03d33d 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/clientapi.go +++ b/src/github.com/matrix-org/dendrite/clientapi/clientapi.go @@ -9,6 +9,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/config" "github.com/matrix-org/dendrite/clientapi/routing" + "github.com/matrix-org/dendrite/roomserver/api" log "github.com/Sirupsen/logrus" "github.com/matrix-org/dugong" @@ -52,7 +53,8 @@ func main() { KeyID: "ed25519:something", PrivateKey: privKey, KafkaProducerURIs: []string{"localhost:9092"}, - ClientAPIOutputTopic: "clientapiOutput", + ClientAPIOutputTopic: "roomserverInput", + RoomserverURL: "http://localhost:7777", } log.Info("Starting clientapi") @@ -61,7 +63,8 @@ func main() { if err != nil { log.Panicf("Failed to setup kafka producers(%s): %s", cfg.KafkaProducerURIs, err) } + queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomserverURL, nil) - routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg, producer) + routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg, producer, queryAPI) log.Fatal(http.ListenAndServe(bindAddr, nil)) } diff --git a/src/github.com/matrix-org/dendrite/clientapi/config/config.go b/src/github.com/matrix-org/dendrite/clientapi/config/config.go index 001da837b..7f048fca9 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/config/config.go +++ b/src/github.com/matrix-org/dendrite/clientapi/config/config.go @@ -15,4 +15,6 @@ type ClientAPI struct { KafkaProducerURIs []string // The topic for events which are written to the logs. ClientAPIOutputTopic string + // The URL of the roomserver which can service Query API requests + RoomserverURL string } diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index a98c05b57..24943da2d 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -7,6 +7,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/config" "github.com/matrix-org/dendrite/clientapi/readers" "github.com/matrix-org/dendrite/clientapi/writers" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/util" "github.com/prometheus/client_golang/prometheus" sarama "gopkg.in/Shopify/sarama.v1" @@ -16,7 +17,7 @@ const pathPrefixR0 = "/_matrix/client/r0" // Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client // to clients which need to make outbound HTTP requests. -func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI, producer sarama.SyncProducer) { +func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI, producer sarama.SyncProducer, queryAPI api.RoomserverQueryAPI) { apiMux := mux.NewRouter() r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() r0mux.Handle("/createRoom", make("createRoom", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { @@ -25,10 +26,10 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { return readers.Sync(req) }))) - r0mux.Handle("/rooms/{roomID}/send/{eventType}", + r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}", make("send_message", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { vars := mux.Vars(req) - return writers.SendMessage(req, vars["roomID"], vars["eventType"]) + return writers.SendMessage(req, vars["roomID"], vars["eventType"], vars["txnID"], cfg, queryAPI, producer) })), ) diff --git a/src/github.com/matrix-org/dendrite/clientapi/writers/sendmessage.go b/src/github.com/matrix-org/dendrite/clientapi/writers/sendmessage.go index bbb432ec8..717025fa8 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/writers/sendmessage.go +++ b/src/github.com/matrix-org/dendrite/clientapi/writers/sendmessage.go @@ -3,22 +3,150 @@ package writers import ( "net/http" - log "github.com/Sirupsen/logrus" + "encoding/json" + "fmt" "github.com/matrix-org/dendrite/clientapi/auth" + "github.com/matrix-org/dendrite/clientapi/config" + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + sarama "gopkg.in/Shopify/sarama.v1" + "time" ) -// SendMessage implements /rooms/{roomID}/send/{eventType} -func SendMessage(req *http.Request, roomID, eventType string) util.JSONResponse { - logger := util.GetLogger(req.Context()) +// http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-send-eventtype-txnid +type sendMessageResponse struct { + EventID string `json:"event_id"` +} + +// SendMessage implements /rooms/{roomID}/send/{eventType}/{txnID} +func SendMessage(req *http.Request, roomID, eventType, txnID string, cfg config.ClientAPI, queryAPI api.RoomserverQueryAPI, producer sarama.SyncProducer) util.JSONResponse { + // parse the incoming http request userID, resErr := auth.VerifyAccessToken(req) if resErr != nil { return *resErr } - logger.WithFields(log.Fields{ - "roomID": roomID, - "eventType": eventType, - "userID": userID, - }).Info("Doing stuff...") - return util.MessageResponse(404, "Not implemented yet") + var r map[string]interface{} // must be a JSON object + resErr = httputil.UnmarshalJSONRequest(req, &r) + if resErr != nil { + return *resErr + } + + // create the new event and set all the fields we can + builder := gomatrixserverlib.EventBuilder{ + Sender: userID, + RoomID: roomID, + Type: eventType, + StateKey: nil, + } + builder.SetContent(r) + + // work out what will be required in order to send this event + requiredStateEvents, err := stateNeeded(&builder) + if err != nil { + return httputil.LogThenError(req, err) + } + + // Ask the roomserver for information about this room + queryReq := api.QueryLatestEventsAndStateRequest{ + RoomID: roomID, + StateToFetch: requiredStateEvents, + } + var queryRes api.QueryLatestEventsAndStateResponse + if queryErr := queryAPI.QueryLatestEventsAndState(&queryReq, &queryRes); queryErr != nil { + return httputil.LogThenError(req, queryErr) + } + if !queryRes.RoomExists { + return util.JSONResponse{ + Code: 404, + JSON: jsonerror.NotFound("Room does not exist"), + } + } + + // set the fields we previously couldn't do and build the event + builder.PrevEvents = queryRes.LatestEvents // the current events will be the prev events of the new event + var refs []gomatrixserverlib.EventReference + for _, e := range queryRes.StateEvents { + refs = append(refs, e.EventReference()) + } + builder.AuthEvents = refs + eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.ServerName) + e, err := builder.Build(eventID, time.Now(), cfg.ServerName, cfg.KeyID, cfg.PrivateKey) + if err != nil { + return httputil.LogThenError(req, err) + } + + // check to see if this user can perform this operation + stateEvents := make([]*gomatrixserverlib.Event, len(queryRes.StateEvents)) + for i := range queryRes.StateEvents { + stateEvents[i] = &queryRes.StateEvents[i] + } + provider := gomatrixserverlib.NewAuthEvents(stateEvents) + if err = gomatrixserverlib.Allowed(e, &provider); err != nil { + return util.JSONResponse{ + Code: 403, + JSON: jsonerror.Forbidden(err.Error()), // TODO: Is this error string comprehensible to the client? + } + } + + // pass the new event to the roomserver + if err := sendToRoomserver(e, producer, cfg.ClientAPIOutputTopic); err != nil { + return httputil.LogThenError(req, err) + } + + return util.JSONResponse{ + Code: 200, + JSON: sendMessageResponse{e.EventID()}, + } +} + +func sendToRoomserver(e gomatrixserverlib.Event, producer sarama.SyncProducer, topic string) error { + var authEventIDs []string + for _, ref := range e.AuthEvents() { + authEventIDs = append(authEventIDs, ref.EventID) + } + ire := api.InputRoomEvent{ + Kind: api.KindNew, + Event: e.JSON(), + AuthEventIDs: authEventIDs, + } + + value, err := json.Marshal(ire) + if err != nil { + return err + } + var m sarama.ProducerMessage + m.Topic = topic + m.Key = sarama.StringEncoder(e.EventID()) + m.Value = sarama.ByteEncoder(value) + if _, _, err := producer.SendMessage(&m); err != nil { + return err + } + return nil +} + +func stateNeeded(builder *gomatrixserverlib.EventBuilder) (requiredStateEvents []common.StateKeyTuple, err error) { + authEvents, err := gomatrixserverlib.StateNeededForEventBuilder(builder) + if err != nil { + return + } + if authEvents.Create { + requiredStateEvents = append(requiredStateEvents, common.StateKeyTuple{"m.room.create", ""}) + } + if authEvents.JoinRules { + requiredStateEvents = append(requiredStateEvents, common.StateKeyTuple{"m.room.join_rules", ""}) + } + if authEvents.PowerLevels { + requiredStateEvents = append(requiredStateEvents, common.StateKeyTuple{"m.room.power_levels", ""}) + } + for _, userID := range authEvents.Member { + requiredStateEvents = append(requiredStateEvents, common.StateKeyTuple{"m.room.member", userID}) + } + for _, token := range authEvents.ThirdPartyInvite { + requiredStateEvents = append(requiredStateEvents, common.StateKeyTuple{"m.room.third_party_invite", token}) + } + return }