mirror of
https://github.com/matrix-org/dendrite
synced 2025-01-21 13:41:56 +01:00
Yggdrasil demo updates
Squashed commit of the following: commit 6c2c48f862c1b6f8e741c57804282eceffe02487 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Fri Jul 10 16:28:09 2020 +0100 Add README.md commit 5eeefdadf8e3881dd7a32559a92be49bd7ddaf47 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Fri Jul 10 10:18:50 2020 +0100 Fix wedge in federation sender commit e2ebffbfba25cf82378393940a613ec32bfb909f Merge: 0883ef88abf26c12
Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Fri Jul 10 09:51:23 2020 +0100 Merge branch 'master' into neilalexander/yggdrasil commit 0883ef8870e340f2ae9a0c37ed939dc2ab9911f6 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Fri Jul 10 09:51:06 2020 +0100 Adjust timeouts commit ba2d53199910f13b60cc892debe96a962e8c9acb Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Thu Jul 9 16:34:40 2020 +0100 Try to wake up from peers/sessions properly commit 73f42eb494741ba5b0e0cef43654708e3c8eb399 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Thu Jul 9 15:43:38 2020 +0100 Use TransactionWriter to reduce database lock issues on SQLite commit 08bfe63241a18c58c539c91b9f52edccda63a611 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Thu Jul 9 12:38:02 2020 +0100 Un-wedge federation Squashed commit of the following: commitaee933f878
Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Thu Jul 9 12:22:41 2020 +0100 Un-goroutine the goroutines commit478374e5d1
Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Thu Jul 9 12:09:31 2020 +0100 Reduce federation sender wedges commit 40cc62c54d9e3a863868214c48b7c18e522a4772 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Thu Jul 9 10:02:52 2020 +0100 Handle switching in/out background more reliably
This commit is contained in:
parent
abf26c12f1
commit
08e9d996b6
10 changed files with 128 additions and 75 deletions
|
@ -3,6 +3,7 @@ package gobind
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -25,12 +26,17 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
||||||
|
"go.uber.org/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
type DendriteMonolith struct {
|
type DendriteMonolith struct {
|
||||||
|
logger logrus.Logger
|
||||||
YggdrasilNode *yggconn.Node
|
YggdrasilNode *yggconn.Node
|
||||||
StorageDirectory string
|
StorageDirectory string
|
||||||
listener net.Listener
|
listener net.Listener
|
||||||
|
httpServer *http.Server
|
||||||
|
httpListening atomic.Bool
|
||||||
|
yggListening atomic.Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *DendriteMonolith) BaseURL() string {
|
func (m *DendriteMonolith) BaseURL() string {
|
||||||
|
@ -58,9 +64,10 @@ func (m *DendriteMonolith) DisconnectMulticastPeers() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *DendriteMonolith) Start() {
|
func (m *DendriteMonolith) Start() {
|
||||||
logger := logrus.Logger{
|
m.logger = logrus.Logger{
|
||||||
Out: BindLogger{},
|
Out: BindLogger{},
|
||||||
}
|
}
|
||||||
|
m.logger.SetOutput(BindLogger{})
|
||||||
logrus.SetOutput(BindLogger{})
|
logrus.SetOutput(BindLogger{})
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
@ -162,38 +169,39 @@ func (m *DendriteMonolith) Start() {
|
||||||
base.UseHTTPAPIs,
|
base.UseHTTPAPIs,
|
||||||
)
|
)
|
||||||
|
|
||||||
ygg.NotifySessionNew(func(boxPubKey crypto.BoxPubKey) {
|
ygg.NewSession = func(serverName gomatrixserverlib.ServerName) {
|
||||||
serv := gomatrixserverlib.ServerName(boxPubKey.String())
|
logrus.Infof("Found new session %q", serverName)
|
||||||
|
time.Sleep(time.Second * 3)
|
||||||
req := &api.PerformServersAliveRequest{
|
req := &api.PerformServersAliveRequest{
|
||||||
Servers: []gomatrixserverlib.ServerName{serv},
|
Servers: []gomatrixserverlib.ServerName{serverName},
|
||||||
}
|
}
|
||||||
res := &api.PerformServersAliveResponse{}
|
res := &api.PerformServersAliveResponse{}
|
||||||
if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil {
|
if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil {
|
||||||
logrus.WithError(err).Warnf("Failed to notify server %q alive due to new session", serv)
|
logrus.WithError(err).Warn("Failed to notify server alive due to new session")
|
||||||
} else {
|
|
||||||
logrus.Infof("Notified server %q alive due to new session", serv)
|
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
|
|
||||||
ygg.NotifyLinkNew(func(boxPubKey crypto.BoxPubKey, linkType, remote string) {
|
ygg.NotifyLinkNew(func(_ crypto.BoxPubKey, sigPubKey crypto.SigPubKey, linkType, remote string) {
|
||||||
serv := gomatrixserverlib.ServerName(boxPubKey.String())
|
serverName := hex.EncodeToString(sigPubKey[:])
|
||||||
|
logrus.Infof("Found new peer %q", serverName)
|
||||||
|
time.Sleep(time.Second * 3)
|
||||||
req := &api.PerformServersAliveRequest{
|
req := &api.PerformServersAliveRequest{
|
||||||
Servers: []gomatrixserverlib.ServerName{serv},
|
Servers: []gomatrixserverlib.ServerName{
|
||||||
|
gomatrixserverlib.ServerName(serverName),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
res := &api.PerformServersAliveResponse{}
|
res := &api.PerformServersAliveResponse{}
|
||||||
if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil {
|
if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil {
|
||||||
logrus.WithError(err).Warnf("Failed to notify server %q alive due to new peer", serv)
|
logrus.WithError(err).Warn("Failed to notify server alive due to new session")
|
||||||
} else {
|
|
||||||
logrus.Infof("Notified server %q alive due to new peer", serv)
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
// Build both ends of a HTTP multiplex.
|
// Build both ends of a HTTP multiplex.
|
||||||
httpServer := &http.Server{
|
m.httpServer = &http.Server{
|
||||||
Addr: ":0",
|
Addr: ":0",
|
||||||
TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
|
TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
|
||||||
ReadTimeout: 15 * time.Second,
|
ReadTimeout: 30 * time.Second,
|
||||||
WriteTimeout: 45 * time.Second,
|
WriteTimeout: 30 * time.Second,
|
||||||
IdleTimeout: 60 * time.Second,
|
IdleTimeout: 60 * time.Second,
|
||||||
BaseContext: func(_ net.Listener) context.Context {
|
BaseContext: func(_ net.Listener) context.Context {
|
||||||
return context.Background()
|
return context.Background()
|
||||||
|
@ -201,19 +209,33 @@ func (m *DendriteMonolith) Start() {
|
||||||
Handler: base.BaseMux,
|
Handler: base.BaseMux,
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
m.Resume()
|
||||||
logger.Info("Listening on ", ygg.DerivedServerName())
|
|
||||||
logger.Fatal(httpServer.Serve(ygg))
|
|
||||||
}()
|
|
||||||
go func() {
|
|
||||||
logger.Info("Listening on ", m.BaseURL())
|
|
||||||
logger.Fatal(httpServer.Serve(m.listener))
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *DendriteMonolith) Stop() {
|
func (m *DendriteMonolith) Resume() {
|
||||||
if err := m.listener.Close(); err != nil {
|
logrus.Info("Resuming monolith")
|
||||||
logrus.Warn("Error stopping listener:", err)
|
if listener, err := net.Listen("tcp", "localhost:65432"); err == nil {
|
||||||
|
m.listener = listener
|
||||||
|
}
|
||||||
|
if m.yggListening.CAS(false, true) {
|
||||||
|
go func() {
|
||||||
|
m.logger.Info("Listening on ", m.YggdrasilNode.DerivedServerName())
|
||||||
|
m.logger.Fatal(m.httpServer.Serve(m.YggdrasilNode))
|
||||||
|
m.yggListening.Store(false)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
if m.httpListening.CAS(false, true) {
|
||||||
|
go func() {
|
||||||
|
m.logger.Info("Listening on ", m.BaseURL())
|
||||||
|
m.logger.Fatal(m.httpServer.Serve(m.listener))
|
||||||
|
m.httpListening.Store(false)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *DendriteMonolith) Suspend() {
|
||||||
|
m.logger.Info("Suspending monolith")
|
||||||
|
if err := m.httpServer.Close(); err != nil {
|
||||||
|
m.logger.Warn("Error stopping HTTP server:", err)
|
||||||
}
|
}
|
||||||
m.YggdrasilNode.Stop()
|
|
||||||
}
|
}
|
||||||
|
|
22
cmd/dendrite-demo-yggdrasil/README.md
Normal file
22
cmd/dendrite-demo-yggdrasil/README.md
Normal file
|
@ -0,0 +1,22 @@
|
||||||
|
# Yggdrasil Demo
|
||||||
|
|
||||||
|
This is the Dendrite Yggdrasil demo! It's easy to get started - all you need is Go 1.13 or later.
|
||||||
|
|
||||||
|
To run the homeserver, start at the root of the Dendrite repository and run:
|
||||||
|
|
||||||
|
```
|
||||||
|
go run ./cmd/dendrite-demo-yggdrasil
|
||||||
|
```
|
||||||
|
|
||||||
|
The following command line arguments are accepted:
|
||||||
|
|
||||||
|
* `-peer tcp://a.b.c.d:e` to specify a static Yggdrasil peer to connect to - you will need to supply this if you do not have another Yggdrasil node on your network
|
||||||
|
* `-port 12345` to specify a port to listen on for client connections
|
||||||
|
|
||||||
|
If you need to find an internet peer, take a look at [this list](https://publicpeers.neilalexander.dev/).
|
||||||
|
|
||||||
|
Then point your favourite Matrix client to the homeserver URL`http://localhost:8008` (or whichever `-port` you specified), create an account and log in.
|
||||||
|
|
||||||
|
If your peering connection is operational then you should see a `Connected TCP:` line in the log output. If not then try a different peer.
|
||||||
|
|
||||||
|
Once logged in, you should be able to open the room directory or join a room by its ID.
|
|
@ -17,6 +17,7 @@ package main
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"encoding/hex"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
@ -154,27 +155,28 @@ func main() {
|
||||||
base.UseHTTPAPIs,
|
base.UseHTTPAPIs,
|
||||||
)
|
)
|
||||||
|
|
||||||
ygg.NotifySessionNew(func(boxPubKey crypto.BoxPubKey) {
|
ygg.NewSession = func(serverName gomatrixserverlib.ServerName) {
|
||||||
|
logrus.Infof("Found new session %q", serverName)
|
||||||
req := &api.PerformServersAliveRequest{
|
req := &api.PerformServersAliveRequest{
|
||||||
Servers: []gomatrixserverlib.ServerName{
|
Servers: []gomatrixserverlib.ServerName{serverName},
|
||||||
gomatrixserverlib.ServerName(boxPubKey.String()),
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
res := &api.PerformServersAliveResponse{}
|
res := &api.PerformServersAliveResponse{}
|
||||||
if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil {
|
if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil {
|
||||||
logrus.WithError(err).Warn("Failed to notify server alive due to new session")
|
logrus.WithError(err).Warn("Failed to notify server alive due to new session")
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
|
|
||||||
ygg.NotifyLinkNew(func(boxPubKey crypto.BoxPubKey, linkType, remote string) {
|
ygg.NotifyLinkNew(func(_ crypto.BoxPubKey, sigPubKey crypto.SigPubKey, linkType, remote string) {
|
||||||
|
serverName := hex.EncodeToString(sigPubKey[:])
|
||||||
|
logrus.Infof("Found new peer %q", serverName)
|
||||||
req := &api.PerformServersAliveRequest{
|
req := &api.PerformServersAliveRequest{
|
||||||
Servers: []gomatrixserverlib.ServerName{
|
Servers: []gomatrixserverlib.ServerName{
|
||||||
gomatrixserverlib.ServerName(boxPubKey.String()),
|
gomatrixserverlib.ServerName(serverName),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
res := &api.PerformServersAliveResponse{}
|
res := &api.PerformServersAliveResponse{}
|
||||||
if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil {
|
if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil {
|
||||||
logrus.WithError(err).Warn("Failed to notify server alive due to new link")
|
logrus.WithError(err).Warn("Failed to notify server alive due to new session")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -46,7 +46,8 @@ func (n *Node) CreateClient(
|
||||||
tr.RegisterProtocol(
|
tr.RegisterProtocol(
|
||||||
"matrix", &yggroundtripper{
|
"matrix", &yggroundtripper{
|
||||||
inner: &http.Transport{
|
inner: &http.Transport{
|
||||||
ResponseHeaderTimeout: 15 * time.Second,
|
TLSHandshakeTimeout: 20 * time.Second,
|
||||||
|
ResponseHeaderTimeout: 10 * time.Second,
|
||||||
IdleConnTimeout: 60 * time.Second,
|
IdleConnTimeout: 60 * time.Second,
|
||||||
DialContext: n.yggdialerctx,
|
DialContext: n.yggdialerctx,
|
||||||
},
|
},
|
||||||
|
@ -62,7 +63,8 @@ func (n *Node) CreateFederationClient(
|
||||||
tr.RegisterProtocol(
|
tr.RegisterProtocol(
|
||||||
"matrix", &yggroundtripper{
|
"matrix", &yggroundtripper{
|
||||||
inner: &http.Transport{
|
inner: &http.Transport{
|
||||||
ResponseHeaderTimeout: 15 * time.Second,
|
TLSHandshakeTimeout: 20 * time.Second,
|
||||||
|
ResponseHeaderTimeout: 10 * time.Second,
|
||||||
IdleConnTimeout: 60 * time.Second,
|
IdleConnTimeout: 60 * time.Second,
|
||||||
DialContext: n.yggdialerctx,
|
DialContext: n.yggdialerctx,
|
||||||
},
|
},
|
||||||
|
|
|
@ -55,6 +55,7 @@ type Node struct {
|
||||||
quicConfig *quic.Config
|
quicConfig *quic.Config
|
||||||
sessions sync.Map // string -> quic.Session
|
sessions sync.Map // string -> quic.Session
|
||||||
incoming chan QUICStream
|
incoming chan QUICStream
|
||||||
|
NewSession func(remote gomatrixserverlib.ServerName)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) BuildName() string {
|
func (n *Node) BuildName() string {
|
||||||
|
@ -137,7 +138,7 @@ func Setup(instanceName, storageDirectory string) (*Node, error) {
|
||||||
MaxIncomingStreams: 0,
|
MaxIncomingStreams: 0,
|
||||||
MaxIncomingUniStreams: 0,
|
MaxIncomingUniStreams: 0,
|
||||||
KeepAlive: true,
|
KeepAlive: true,
|
||||||
MaxIdleTimeout: time.Second * 60,
|
MaxIdleTimeout: time.Minute * 15,
|
||||||
HandshakeTimeout: time.Second * 15,
|
HandshakeTimeout: time.Second * 15,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,7 +190,9 @@ func (n *Node) PeerCount() int {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) KnownNodes() []gomatrixserverlib.ServerName {
|
func (n *Node) KnownNodes() []gomatrixserverlib.ServerName {
|
||||||
nodemap := map[string]struct{}{}
|
nodemap := map[string]struct{}{
|
||||||
|
"b5ae50589e50991dd9dd7d59c5c5f7a4521e8da5b603b7f57076272abc58b374": struct{}{},
|
||||||
|
}
|
||||||
for _, peer := range n.core.GetSwitchPeers() {
|
for _, peer := range n.core.GetSwitchPeers() {
|
||||||
nodemap[hex.EncodeToString(peer.SigningKey[:])] = struct{}{}
|
nodemap[hex.EncodeToString(peer.SigningKey[:])] = struct{}{}
|
||||||
}
|
}
|
||||||
|
@ -264,18 +267,10 @@ func (n *Node) SetStaticPeer(uri string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) NotifyLinkNew(f func(boxPubKey crypto.BoxPubKey, linkType, remote string)) {
|
func (n *Node) NotifyLinkNew(f func(boxPubKey crypto.BoxPubKey, sigPubKey crypto.SigPubKey, linkType, remote string)) {
|
||||||
n.core.NotifyLinkNew(f)
|
n.core.NotifyLinkNew(f)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) NotifyLinkGone(f func(boxPubKey crypto.BoxPubKey, linkType, remote string)) {
|
func (n *Node) NotifyLinkGone(f func(boxPubKey crypto.BoxPubKey, sigPubKey crypto.SigPubKey, linkType, remote string)) {
|
||||||
n.core.NotifyLinkGone(f)
|
n.core.NotifyLinkGone(f)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) NotifySessionNew(f func(boxPubKey crypto.BoxPubKey)) {
|
|
||||||
n.core.NotifySessionNew(f)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Node) NotifySessionGone(f func(boxPubKey crypto.BoxPubKey)) {
|
|
||||||
n.core.NotifySessionGone(f)
|
|
||||||
}
|
|
||||||
|
|
|
@ -29,6 +29,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lucas-clemente/quic-go"
|
"github.com/lucas-clemente/quic-go"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -56,6 +57,12 @@ func (n *Node) listenFromYgg() {
|
||||||
func (n *Node) listenFromQUIC(session quic.Session) {
|
func (n *Node) listenFromQUIC(session quic.Session) {
|
||||||
n.sessions.Store(session.RemoteAddr().String(), session)
|
n.sessions.Store(session.RemoteAddr().String(), session)
|
||||||
defer n.sessions.Delete(session.RemoteAddr())
|
defer n.sessions.Delete(session.RemoteAddr())
|
||||||
|
if n.NewSession != nil {
|
||||||
|
if len(session.ConnectionState().PeerCertificates) == 1 {
|
||||||
|
subjectName := session.ConnectionState().PeerCertificates[0].Subject.CommonName
|
||||||
|
go n.NewSession(gomatrixserverlib.ServerName(subjectName))
|
||||||
|
}
|
||||||
|
}
|
||||||
for {
|
for {
|
||||||
st, err := session.AcceptStream(context.TODO())
|
st, err := session.AcceptStream(context.TODO())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -256,7 +256,10 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
// PDUs waiting to be sent. By sending a message into the wake chan,
|
// PDUs waiting to be sent. By sending a message into the wake chan,
|
||||||
// the next loop iteration will try processing these PDUs again,
|
// the next loop iteration will try processing these PDUs again,
|
||||||
// subject to the backoff.
|
// subject to the backoff.
|
||||||
oq.notifyPDUs <- true
|
select {
|
||||||
|
case oq.notifyPDUs <- true:
|
||||||
|
default:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if transaction {
|
} else if transaction {
|
||||||
// If we successfully sent the transaction then clear out
|
// If we successfully sent the transaction then clear out
|
||||||
|
@ -384,7 +387,7 @@ func (oq *destinationQueue) nextTransaction(
|
||||||
// TODO: we should check for 500-ish fails vs 400-ish here,
|
// TODO: we should check for 500-ish fails vs 400-ish here,
|
||||||
// since we shouldn't queue things indefinitely in response
|
// since we shouldn't queue things indefinitely in response
|
||||||
// to a 400-ish error
|
// to a 400-ish error
|
||||||
ctx, cancel = context.WithTimeout(context.Background(), time.Second*15)
|
ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
_, err = oq.client.SendTransaction(ctx, t)
|
_, err = oq.client.SendTransaction(ctx, t)
|
||||||
switch err.(type) {
|
switch err.(type) {
|
||||||
|
|
|
@ -239,39 +239,37 @@ func (d *Database) CleanTransactionPDUs(
|
||||||
serverName gomatrixserverlib.ServerName,
|
serverName gomatrixserverlib.ServerName,
|
||||||
transactionID gomatrixserverlib.TransactionID,
|
transactionID gomatrixserverlib.TransactionID,
|
||||||
) error {
|
) error {
|
||||||
var err error
|
|
||||||
var nids []int64
|
|
||||||
var deleteNIDs []int64
|
var deleteNIDs []int64
|
||||||
|
nids, err := d.selectQueuePDUs(ctx, nil, serverName, transactionID, 50)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("d.selectQueuePDUs: %w", err)
|
||||||
|
}
|
||||||
if err = d.queuePDUsWriter.Do(d.db, func(txn *sql.Tx) error {
|
if err = d.queuePDUsWriter.Do(d.db, func(txn *sql.Tx) error {
|
||||||
nids, err = d.selectQueuePDUs(ctx, txn, serverName, transactionID, 50)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("d.selectQueuePDUs: %w", err)
|
|
||||||
}
|
|
||||||
if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil {
|
if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil {
|
||||||
return fmt.Errorf("d.deleteQueueTransaction: %w", err)
|
return fmt.Errorf("d.deleteQueueTransaction: %w", err)
|
||||||
}
|
}
|
||||||
var count int64
|
|
||||||
for _, nid := range nids {
|
|
||||||
count, err = d.selectQueueReferenceJSONCount(ctx, txn, nid)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err)
|
|
||||||
}
|
|
||||||
if count == 0 {
|
|
||||||
deleteNIDs = append(deleteNIDs, nid)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = d.queueJSONWriter.Do(d.db, func(txn *sql.Tx) error {
|
var count int64
|
||||||
if len(deleteNIDs) > 0 {
|
for _, nid := range nids {
|
||||||
|
count, err = d.selectQueueReferenceJSONCount(ctx, nil, nid)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err)
|
||||||
|
}
|
||||||
|
if count == 0 {
|
||||||
|
deleteNIDs = append(deleteNIDs, nid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(deleteNIDs) > 0 {
|
||||||
|
err = d.queueJSONWriter.Do(d.db, func(txn *sql.Tx) error {
|
||||||
if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil {
|
if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil {
|
||||||
return fmt.Errorf("d.deleteQueueJSON: %w", err)
|
return fmt.Errorf("d.deleteQueueJSON: %w", err)
|
||||||
}
|
}
|
||||||
}
|
return nil
|
||||||
return nil
|
})
|
||||||
})
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -36,7 +36,7 @@ require (
|
||||||
github.com/uber-go/atomic v1.3.0 // indirect
|
github.com/uber-go/atomic v1.3.0 // indirect
|
||||||
github.com/uber/jaeger-client-go v2.15.0+incompatible
|
github.com/uber/jaeger-client-go v2.15.0+incompatible
|
||||||
github.com/uber/jaeger-lib v1.5.0
|
github.com/uber/jaeger-lib v1.5.0
|
||||||
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200708124809-79077e271c6d
|
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200709151813-3c2f73ac5e86
|
||||||
go.uber.org/atomic v1.4.0
|
go.uber.org/atomic v1.4.0
|
||||||
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5
|
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5
|
||||||
golang.org/x/mobile v0.0.0-20200629153529-33b80540585f // indirect
|
golang.org/x/mobile v0.0.0-20200629153529-33b80540585f // indirect
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -663,6 +663,8 @@ github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200708123331-4e0b0e723459
|
||||||
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200708123331-4e0b0e723459/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE=
|
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200708123331-4e0b0e723459/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE=
|
||||||
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200708124809-79077e271c6d h1:ly327dysc3r7lfG+AKJWPSAQmGf4h++fk+Y2dD8nDV4=
|
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200708124809-79077e271c6d h1:ly327dysc3r7lfG+AKJWPSAQmGf4h++fk+Y2dD8nDV4=
|
||||||
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200708124809-79077e271c6d/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE=
|
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200708124809-79077e271c6d/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE=
|
||||||
|
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200709151813-3c2f73ac5e86 h1:l1zL1Cu/oi8MaBfcKHz4aMdSF5OWOT82SL6y5qP2law=
|
||||||
|
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200709151813-3c2f73ac5e86/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE=
|
||||||
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
|
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
|
||||||
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
|
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
|
||||||
go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA=
|
go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA=
|
||||||
|
|
Loading…
Add table
Reference in a new issue