Allow sending analytics to custom server

This commit is contained in:
Tulir Asokan 2023-09-29 14:19:48 +03:00
parent e1edb644c0
commit d2110f6ee7
8 changed files with 54 additions and 41 deletions

View file

@ -26,27 +26,26 @@ import (
"maunium.net/go/mautrix/id" "maunium.net/go/mautrix/id"
) )
const SegmentURL = "https://api.segment.io/v1/track" type AnalyticsClient struct {
url string
type SegmentClient struct {
key string key string
userID string userID string
log log.Logger log log.Logger
client http.Client client http.Client
} }
var Segment SegmentClient var Analytics AnalyticsClient
func (sc *SegmentClient) trackSync(userID id.UserID, event string, properties map[string]interface{}) error { func (sc *AnalyticsClient) trackSync(userID id.UserID, event string, properties map[string]interface{}) error {
var buf bytes.Buffer var buf bytes.Buffer
var segmentUserID string var analyticsUserID string
if Segment.userID != "" { if Analytics.userID != "" {
segmentUserID = Segment.userID analyticsUserID = Analytics.userID
} else { } else {
segmentUserID = userID.String() analyticsUserID = userID.String()
} }
err := json.NewEncoder(&buf).Encode(map[string]interface{}{ err := json.NewEncoder(&buf).Encode(map[string]interface{}{
"userId": segmentUserID, "userId": analyticsUserID,
"event": event, "event": event,
"properties": properties, "properties": properties,
}) })
@ -54,7 +53,7 @@ func (sc *SegmentClient) trackSync(userID id.UserID, event string, properties ma
return err return err
} }
req, err := http.NewRequest("POST", SegmentURL, &buf) req, err := http.NewRequest(http.MethodPost, sc.url, &buf)
if err != nil { if err != nil {
return err return err
} }
@ -70,11 +69,11 @@ func (sc *SegmentClient) trackSync(userID id.UserID, event string, properties ma
return nil return nil
} }
func (sc *SegmentClient) IsEnabled() bool { func (sc *AnalyticsClient) IsEnabled() bool {
return len(sc.key) > 0 return len(sc.key) > 0
} }
func (sc *SegmentClient) Track(userID id.UserID, event string, properties ...map[string]interface{}) { func (sc *AnalyticsClient) Track(userID id.UserID, event string, properties ...map[string]interface{}) {
if !sc.IsEnabled() { if !sc.IsEnabled() {
return return
} else if len(properties) > 1 { } else if len(properties) > 1 {

View file

@ -24,8 +24,11 @@ import (
type Config struct { type Config struct {
*bridgeconfig.BaseConfig `yaml:",inline"` *bridgeconfig.BaseConfig `yaml:",inline"`
SegmentKey string `yaml:"segment_key"` Analytics struct {
SegmentUserID string `yaml:"segment_user_id"` Host string `yaml:"host"`
Token string `yaml:"token"`
UserID string `yaml:"user_id"`
}
Metrics struct { Metrics struct {
Enabled bool `yaml:"enabled"` Enabled bool `yaml:"enabled"`

View file

@ -27,8 +27,9 @@ import (
func DoUpgrade(helper *up.Helper) { func DoUpgrade(helper *up.Helper) {
bridgeconfig.Upgrader.DoUpgrade(helper) bridgeconfig.Upgrader.DoUpgrade(helper)
helper.Copy(up.Str|up.Null, "segment_key") helper.Copy(up.Str|up.Null, "analytics", "host")
helper.Copy(up.Str|up.Null, "segment_user_id") helper.Copy(up.Str|up.Null, "analytics", "token")
helper.Copy(up.Str|up.Null, "analytics", "user_id")
helper.Copy(up.Bool, "metrics", "enabled") helper.Copy(up.Bool, "metrics", "enabled")
helper.Copy(up.Str, "metrics", "listen") helper.Copy(up.Str, "metrics", "listen")
@ -181,7 +182,7 @@ var SpacedBlocks = [][]string{
{"appservice", "database"}, {"appservice", "database"},
{"appservice", "id"}, {"appservice", "id"},
{"appservice", "as_token"}, {"appservice", "as_token"},
{"segment_key"}, {"analytics"},
{"metrics"}, {"metrics"},
{"whatsapp"}, {"whatsapp"},
{"bridge"}, {"bridge"},

View file

@ -76,10 +76,14 @@ appservice:
as_token: "This value is generated when generating the registration" as_token: "This value is generated when generating the registration"
hs_token: "This value is generated when generating the registration" hs_token: "This value is generated when generating the registration"
# Segment API key to track some events, like provisioning API login and encryption errors. # Segment-compatible analytics endpoint for tracking some events, like provisioning API login and encryption errors.
segment_key: null analytics:
# Optional user_id to use when sending Segment events. If null, defaults to using mxID. # Hostname of the tracking server. The path is hardcoded to /v1/track
segment_user_id: null host: api.segment.io
# API key to send with tracking requests. Tracking is disabled if this is null.
token: null
# Optional user ID for tracking events. If null, defaults to using Matrix user ID.
user_id: null
# Prometheus config. # Prometheus config.
metrics: metrics:

20
main.go
View file

@ -19,6 +19,7 @@ package main
import ( import (
_ "embed" _ "embed"
"net/http" "net/http"
"net/url"
"os" "os"
"strconv" "strconv"
"strings" "strings"
@ -90,13 +91,18 @@ func (br *WABridge) Init() {
br.EventProcessor.On(TypeMSC3381PollResponse, br.MatrixHandler.HandleMessage) br.EventProcessor.On(TypeMSC3381PollResponse, br.MatrixHandler.HandleMessage)
br.EventProcessor.On(TypeMSC3381V2PollResponse, br.MatrixHandler.HandleMessage) br.EventProcessor.On(TypeMSC3381V2PollResponse, br.MatrixHandler.HandleMessage)
Segment.log = br.Log.Sub("Segment") Analytics.log = br.Log.Sub("Analytics")
Segment.key = br.Config.SegmentKey Analytics.url = (&url.URL{
Segment.userID = br.Config.SegmentUserID Scheme: "https",
if Segment.IsEnabled() { Host: br.Config.Analytics.Host,
Segment.log.Infoln("Segment metrics are enabled") Path: "/v1/track",
if Segment.userID != "" { }).String()
Segment.log.Infoln("Overriding Segment user_id with %v", Segment.userID) Analytics.key = br.Config.Analytics.Token
Analytics.userID = br.Config.Analytics.UserID
if Analytics.IsEnabled() {
Analytics.log.Infoln("Analytics metrics are enabled")
if Analytics.userID != "" {
Analytics.log.Infoln("Overriding analytics user_id with %v", Analytics.userID)
} }
} }

View file

@ -773,7 +773,7 @@ func (portal *Portal) handleUndecryptableMessage(source *User, evt *events.Undec
if evt.IsUnavailable { if evt.IsUnavailable {
metricType = "unavailable" metricType = "unavailable"
} }
Segment.Track(source.MXID, "WhatsApp undecryptable message", map[string]interface{}{ Analytics.Track(source.MXID, "WhatsApp undecryptable message", map[string]interface{}{
"messageID": evt.Info.ID, "messageID": evt.Info.ID,
"undecryptableType": metricType, "undecryptableType": metricType,
}) })
@ -849,7 +849,7 @@ func (portal *Portal) handleMessage(source *User, evt *events.Message, historica
if evt.UnavailableRequestID != "" { if evt.UnavailableRequestID != "" {
resolveType = "phone" resolveType = "phone"
} }
Segment.Track(source.MXID, "WhatsApp undecryptable message resolved", map[string]interface{}{ Analytics.Track(source.MXID, "WhatsApp undecryptable message resolved", map[string]interface{}{
"messageID": evt.Info.ID, "messageID": evt.Info.ID,
"resolveType": resolveType, "resolveType": resolveType,
}) })

View file

@ -692,7 +692,7 @@ func (prov *ProvisioningAPI) Login(w http.ResponseWriter, r *http.Request) {
} }
} }
user.log.Debugln("Started login via provisioning API") user.log.Debugln("Started login via provisioning API")
Segment.Track(user.MXID, "$login_start") Analytics.Track(user.MXID, "$login_start")
for { for {
select { select {
@ -701,7 +701,7 @@ func (prov *ProvisioningAPI) Login(w http.ResponseWriter, r *http.Request) {
case whatsmeow.QRChannelSuccess.Event: case whatsmeow.QRChannelSuccess.Event:
jid := user.Client.Store.ID jid := user.Client.Store.ID
user.log.Debugln("Successful login as", jid, "via provisioning API") user.log.Debugln("Successful login as", jid, "via provisioning API")
Segment.Track(user.MXID, "$login_success") Analytics.Track(user.MXID, "$login_success")
_ = c.WriteJSON(map[string]interface{}{ _ = c.WriteJSON(map[string]interface{}{
"success": true, "success": true,
"jid": jid, "jid": jid,
@ -711,7 +711,7 @@ func (prov *ProvisioningAPI) Login(w http.ResponseWriter, r *http.Request) {
case whatsmeow.QRChannelTimeout.Event: case whatsmeow.QRChannelTimeout.Event:
user.log.Debugln("Login via provisioning API timed out") user.log.Debugln("Login via provisioning API timed out")
errCode := "login timed out" errCode := "login timed out"
Segment.Track(user.MXID, "$login_failure", map[string]interface{}{"error": errCode}) Analytics.Track(user.MXID, "$login_failure", map[string]interface{}{"error": errCode})
_ = c.WriteJSON(Error{ _ = c.WriteJSON(Error{
Error: "QR code scan timed out. Please try again.", Error: "QR code scan timed out. Please try again.",
ErrCode: errCode, ErrCode: errCode,
@ -719,7 +719,7 @@ func (prov *ProvisioningAPI) Login(w http.ResponseWriter, r *http.Request) {
case whatsmeow.QRChannelErrUnexpectedEvent.Event: case whatsmeow.QRChannelErrUnexpectedEvent.Event:
user.log.Debugln("Login via provisioning API failed due to unexpected event") user.log.Debugln("Login via provisioning API failed due to unexpected event")
errCode := "unexpected event" errCode := "unexpected event"
Segment.Track(user.MXID, "$login_failure", map[string]interface{}{"error": errCode}) Analytics.Track(user.MXID, "$login_failure", map[string]interface{}{"error": errCode})
_ = c.WriteJSON(Error{ _ = c.WriteJSON(Error{
Error: "Got unexpected event while waiting for QRs, perhaps you're already logged in?", Error: "Got unexpected event while waiting for QRs, perhaps you're already logged in?",
ErrCode: errCode, ErrCode: errCode,
@ -727,14 +727,14 @@ func (prov *ProvisioningAPI) Login(w http.ResponseWriter, r *http.Request) {
case whatsmeow.QRChannelClientOutdated.Event: case whatsmeow.QRChannelClientOutdated.Event:
user.log.Debugln("Login via provisioning API failed due to outdated client") user.log.Debugln("Login via provisioning API failed due to outdated client")
errCode := "bridge outdated" errCode := "bridge outdated"
Segment.Track(user.MXID, "$login_failure", map[string]interface{}{"error": errCode}) Analytics.Track(user.MXID, "$login_failure", map[string]interface{}{"error": errCode})
_ = c.WriteJSON(Error{ _ = c.WriteJSON(Error{
Error: "Got client outdated error while waiting for QRs. The bridge must be updated to continue.", Error: "Got client outdated error while waiting for QRs. The bridge must be updated to continue.",
ErrCode: errCode, ErrCode: errCode,
}) })
case whatsmeow.QRChannelScannedWithoutMultidevice.Event: case whatsmeow.QRChannelScannedWithoutMultidevice.Event:
errCode := "multidevice not enabled" errCode := "multidevice not enabled"
Segment.Track(user.MXID, "$login_failure", map[string]interface{}{"error": errCode}) Analytics.Track(user.MXID, "$login_failure", map[string]interface{}{"error": errCode})
_ = c.WriteJSON(Error{ _ = c.WriteJSON(Error{
Error: "Please enable the WhatsApp multidevice beta and scan the QR code again.", Error: "Please enable the WhatsApp multidevice beta and scan the QR code again.",
ErrCode: errCode, ErrCode: errCode,
@ -742,13 +742,13 @@ func (prov *ProvisioningAPI) Login(w http.ResponseWriter, r *http.Request) {
continue continue
case "error": case "error":
errCode := "fatal error" errCode := "fatal error"
Segment.Track(user.MXID, "$login_failure", map[string]interface{}{"error": errCode}) Analytics.Track(user.MXID, "$login_failure", map[string]interface{}{"error": errCode})
_ = c.WriteJSON(Error{ _ = c.WriteJSON(Error{
Error: "Fatal error while logging in", Error: "Fatal error while logging in",
ErrCode: errCode, ErrCode: errCode,
}) })
case "code": case "code":
Segment.Track(user.MXID, "$qrcode_retrieved") Analytics.Track(user.MXID, "$qrcode_retrieved")
_ = c.WriteJSON(map[string]interface{}{ _ = c.WriteJSON(map[string]interface{}{
"code": evt.Code, "code": evt.Code,
"timeout": int(evt.Timeout.Seconds()), "timeout": int(evt.Timeout.Seconds()),

View file

@ -498,7 +498,7 @@ func (user *User) createClient(sess *store.Device) {
user.Client.SetForceActiveDeliveryReceipts(user.bridge.Config.Bridge.ForceActiveDeliveryReceipts) user.Client.SetForceActiveDeliveryReceipts(user.bridge.Config.Bridge.ForceActiveDeliveryReceipts)
user.Client.AutomaticMessageRerequestFromPhone = true user.Client.AutomaticMessageRerequestFromPhone = true
user.Client.GetMessageForRetry = func(requester, to types.JID, id types.MessageID) *waProto.Message { user.Client.GetMessageForRetry = func(requester, to types.JID, id types.MessageID) *waProto.Message {
Segment.Track(user.MXID, "WhatsApp incoming retry (message not found)", map[string]interface{}{ Analytics.Track(user.MXID, "WhatsApp incoming retry (message not found)", map[string]interface{}{
"requester": user.obfuscateJID(requester), "requester": user.obfuscateJID(requester),
"messageID": id, "messageID": id,
}) })
@ -506,7 +506,7 @@ func (user *User) createClient(sess *store.Device) {
return nil return nil
} }
user.Client.PreRetryCallback = func(receipt *events.Receipt, messageID types.MessageID, retryCount int, msg *waProto.Message) bool { user.Client.PreRetryCallback = func(receipt *events.Receipt, messageID types.MessageID, retryCount int, msg *waProto.Message) bool {
Segment.Track(user.MXID, "WhatsApp incoming retry (accepted)", map[string]interface{}{ Analytics.Track(user.MXID, "WhatsApp incoming retry (accepted)", map[string]interface{}{
"requester": user.obfuscateJID(receipt.Sender), "requester": user.obfuscateJID(receipt.Sender),
"messageID": messageID, "messageID": messageID,
"retryCount": retryCount, "retryCount": retryCount,