fix simplify code to start using context (#9350)

This commit is contained in:
Harshavardhana 2020-04-16 10:56:18 -07:00 committed by GitHub
parent 787dbaff36
commit 69fb68ef0b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 141 additions and 152 deletions

View file

@ -982,16 +982,13 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set(xhttp.ContentType, "text/event-stream")
doneCh := make(chan struct{})
defer close(doneCh)
// Trace Publisher and peer-trace-client uses nonblocking send and hence does not wait for slow receivers.
// Use buffered channel to take care of burst sends or slow w.Write()
traceCh := make(chan interface{}, 4000)
peers := getRestClients(globalEndpoints)
globalHTTPTrace.Subscribe(traceCh, doneCh, func(entry interface{}) bool {
globalHTTPTrace.Subscribe(traceCh, ctx.Done(), func(entry interface{}) bool {
return mustTrace(entry, trcAll, trcErr)
})
@ -999,7 +996,7 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
if peer == nil {
continue
}
peer.Trace(traceCh, doneCh, trcAll, trcErr)
peer.Trace(traceCh, ctx.Done(), trcAll, trcErr)
}
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
@ -1018,7 +1015,7 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
return
}
w.(http.Flusher).Flush()
case <-GlobalServiceDoneCh:
case <-ctx.Done():
return
}
}
@ -1052,20 +1049,18 @@ func (a adminAPIHandlers) ConsoleLogHandler(w http.ResponseWriter, r *http.Reque
w.Header().Add("Connection", "close")
w.Header().Set(xhttp.ContentType, "text/event-stream")
doneCh := make(chan struct{})
defer close(doneCh)
logCh := make(chan interface{}, 4000)
peers := getRestClients(globalEndpoints)
globalConsoleSys.Subscribe(logCh, doneCh, node, limitLines, logKind, nil)
globalConsoleSys.Subscribe(logCh, ctx.Done(), node, limitLines, logKind, nil)
for _, peer := range peers {
if peer == nil {
continue
}
if node == "" || strings.EqualFold(peer.host.Name, node) {
peer.ConsoleLog(logCh, doneCh)
peer.ConsoleLog(logCh, ctx.Done())
}
}
@ -1089,7 +1084,7 @@ func (a adminAPIHandlers) ConsoleLogHandler(w http.ResponseWriter, r *http.Reque
return
}
w.(http.Flusher).Flush()
case <-GlobalServiceDoneCh:
case <-ctx.Done():
return
}
}
@ -1175,65 +1170,39 @@ func (a adminAPIHandlers) OBDInfoHandler(w http.ResponseWriter, r *http.Request)
}
vars := mux.Vars(r)
pulse := make(chan struct{})
obdDone := make(chan struct{})
obdInfo := madmin.OBDInfo{}
obdInfoCh := make(chan madmin.OBDInfo)
enc := json.NewEncoder(w)
doPartialWrite := func() {
logger.LogIf(ctx, enc.Encode(obdInfo))
partialWrite := func(oinfo madmin.OBDInfo) {
obdInfoCh <- oinfo
}
partialWrite := func() {
pulse <- struct{}{}
}
finish := func() {
obdDone <- struct{}{}
}
setCommonHeaders(w)
w.Header().Set(xhttp.ContentType, "text/event-stream")
w.WriteHeader(http.StatusOK)
errResp := func(err error) {
errorResponse := getAPIErrorResponse(ctx, toAdminAPIErr(ctx, err), r.URL.String(),
w.Header().Get(xhttp.AmzRequestID), globalDeploymentID)
encodedErrorResponse := encodeResponse(errorResponse)
obdInfo.Error = string(encodedErrorResponse)
finish()
logger.LogIf(ctx, enc.Encode(obdInfo))
}
deadline := 3600 * time.Second
deadlineStr := r.URL.Query().Get("deadline")
if deadlineStr != "" {
if dstr := r.URL.Query().Get("deadline"); dstr != "" {
var err error
deadline, err = time.ParseDuration(deadlineStr)
deadline, err = time.ParseDuration(dstr)
if err != nil {
errResp(err)
return
}
}
deadlinedCtx, cancel := context.WithDeadline(ctx, time.Now().Add(deadline))
setCommonHeaders(w)
w.Header().Set(xhttp.ContentType, string(mimeJSON))
w.WriteHeader(http.StatusOK)
deadlinedCtx, cancel := context.WithTimeout(ctx, deadline)
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
go func() {
loop:
for {
select {
case <-ticker.C:
doPartialWrite()
case <-pulse:
doPartialWrite()
case <-obdDone:
break loop
}
}
w.(http.Flusher).Flush()
cancel()
}()
defer cancel()
nsLock := objectAPI.NewNSLock(deadlinedCtx, minioMetaBucket, "obd-in-progress")
if err := nsLock.GetLock(newDynamicTimeout(deadline, deadline)); err != nil { // returns a locked lock
@ -1242,89 +1211,114 @@ func (a adminAPIHandlers) OBDInfoHandler(w http.ResponseWriter, r *http.Request)
}
defer nsLock.Unlock()
if cpu, ok := vars["syscpu"]; ok && cpu == "true" {
cpuInfo := getLocalCPUOBDInfo(deadlinedCtx)
go func() {
defer close(obdInfoCh)
obdInfo.Sys.CPUInfo = append(obdInfo.Sys.CPUInfo, cpuInfo)
obdInfo.Sys.CPUInfo = append(obdInfo.Sys.CPUInfo, globalNotificationSys.CPUOBDInfo(deadlinedCtx)...)
partialWrite()
}
if cpu, ok := vars["syscpu"]; ok && cpu == "true" {
cpuInfo := getLocalCPUOBDInfo(deadlinedCtx)
if diskHw, ok := vars["sysdiskhw"]; ok && diskHw == "true" {
diskHwInfo := getLocalDiskHwOBD(deadlinedCtx)
obdInfo.Sys.DiskHwInfo = append(obdInfo.Sys.DiskHwInfo, diskHwInfo)
obdInfo.Sys.DiskHwInfo = append(obdInfo.Sys.DiskHwInfo, globalNotificationSys.DiskHwOBDInfo(deadlinedCtx)...)
partialWrite()
}
if osInfo, ok := vars["sysosinfo"]; ok && osInfo == "true" {
osInfo := getLocalOsInfoOBD(deadlinedCtx)
obdInfo.Sys.OsInfo = append(obdInfo.Sys.OsInfo, osInfo)
obdInfo.Sys.OsInfo = append(obdInfo.Sys.OsInfo, globalNotificationSys.OsOBDInfo(deadlinedCtx)...)
partialWrite()
}
if mem, ok := vars["sysmem"]; ok && mem == "true" {
memInfo := getLocalMemOBD(deadlinedCtx)
obdInfo.Sys.MemInfo = append(obdInfo.Sys.MemInfo, memInfo)
obdInfo.Sys.MemInfo = append(obdInfo.Sys.MemInfo, globalNotificationSys.MemOBDInfo(deadlinedCtx)...)
partialWrite()
}
if proc, ok := vars["sysprocess"]; ok && proc == "true" {
procInfo := getLocalProcOBD(deadlinedCtx)
obdInfo.Sys.ProcInfo = append(obdInfo.Sys.ProcInfo, procInfo)
obdInfo.Sys.ProcInfo = append(obdInfo.Sys.ProcInfo, globalNotificationSys.ProcOBDInfo(deadlinedCtx)...)
partialWrite()
}
if config, ok := vars["minioconfig"]; ok && config == "true" {
cfg, err := readServerConfig(ctx, objectAPI)
logger.LogIf(ctx, err)
obdInfo.Minio.Config = cfg
partialWrite()
}
if drive, ok := vars["perfdrive"]; ok && drive == "true" {
// Get drive obd details from local server's drive(s)
driveOBDSerial := getLocalDrivesOBD(deadlinedCtx, false, globalEndpoints, r)
driveOBDParallel := getLocalDrivesOBD(deadlinedCtx, true, globalEndpoints, r)
errStr := ""
if driveOBDSerial.Error != "" {
errStr = "serial: " + driveOBDSerial.Error
}
if driveOBDParallel.Error != "" {
errStr = errStr + " parallel: " + driveOBDParallel.Error
obdInfo.Sys.CPUInfo = append(obdInfo.Sys.CPUInfo, cpuInfo)
obdInfo.Sys.CPUInfo = append(obdInfo.Sys.CPUInfo, globalNotificationSys.CPUOBDInfo(deadlinedCtx)...)
partialWrite(obdInfo)
}
driveOBD := madmin.ServerDrivesOBDInfo{
Addr: driveOBDSerial.Addr,
Serial: driveOBDSerial.Serial,
Parallel: driveOBDParallel.Parallel,
Error: errStr,
if diskHw, ok := vars["sysdiskhw"]; ok && diskHw == "true" {
diskHwInfo := getLocalDiskHwOBD(deadlinedCtx)
obdInfo.Sys.DiskHwInfo = append(obdInfo.Sys.DiskHwInfo, diskHwInfo)
obdInfo.Sys.DiskHwInfo = append(obdInfo.Sys.DiskHwInfo, globalNotificationSys.DiskHwOBDInfo(deadlinedCtx)...)
partialWrite(obdInfo)
}
obdInfo.Perf.DriveInfo = append(obdInfo.Perf.DriveInfo, driveOBD)
// Notify all other MinIO peers to report drive obd numbers
driveOBDs := globalNotificationSys.DriveOBDInfo(deadlinedCtx)
obdInfo.Perf.DriveInfo = append(obdInfo.Perf.DriveInfo, driveOBDs...)
if osInfo, ok := vars["sysosinfo"]; ok && osInfo == "true" {
osInfo := getLocalOsInfoOBD(deadlinedCtx)
partialWrite()
obdInfo.Sys.OsInfo = append(obdInfo.Sys.OsInfo, osInfo)
obdInfo.Sys.OsInfo = append(obdInfo.Sys.OsInfo, globalNotificationSys.OsOBDInfo(deadlinedCtx)...)
partialWrite(obdInfo)
}
if mem, ok := vars["sysmem"]; ok && mem == "true" {
memInfo := getLocalMemOBD(deadlinedCtx)
obdInfo.Sys.MemInfo = append(obdInfo.Sys.MemInfo, memInfo)
obdInfo.Sys.MemInfo = append(obdInfo.Sys.MemInfo, globalNotificationSys.MemOBDInfo(deadlinedCtx)...)
partialWrite(obdInfo)
}
if proc, ok := vars["sysprocess"]; ok && proc == "true" {
procInfo := getLocalProcOBD(deadlinedCtx)
obdInfo.Sys.ProcInfo = append(obdInfo.Sys.ProcInfo, procInfo)
obdInfo.Sys.ProcInfo = append(obdInfo.Sys.ProcInfo, globalNotificationSys.ProcOBDInfo(deadlinedCtx)...)
partialWrite(obdInfo)
}
if config, ok := vars["minioconfig"]; ok && config == "true" {
cfg, err := readServerConfig(ctx, objectAPI)
logger.LogIf(ctx, err)
obdInfo.Minio.Config = cfg
partialWrite(obdInfo)
}
if drive, ok := vars["perfdrive"]; ok && drive == "true" {
// Get drive obd details from local server's drive(s)
driveOBDSerial := getLocalDrivesOBD(deadlinedCtx, false, globalEndpoints, r)
driveOBDParallel := getLocalDrivesOBD(deadlinedCtx, true, globalEndpoints, r)
errStr := ""
if driveOBDSerial.Error != "" {
errStr = "serial: " + driveOBDSerial.Error
}
if driveOBDParallel.Error != "" {
errStr = errStr + " parallel: " + driveOBDParallel.Error
}
driveOBD := madmin.ServerDrivesOBDInfo{
Addr: driveOBDSerial.Addr,
Serial: driveOBDSerial.Serial,
Parallel: driveOBDParallel.Parallel,
Error: errStr,
}
obdInfo.Perf.DriveInfo = append(obdInfo.Perf.DriveInfo, driveOBD)
// Notify all other MinIO peers to report drive obd numbers
driveOBDs := globalNotificationSys.DriveOBDInfo(deadlinedCtx)
obdInfo.Perf.DriveInfo = append(obdInfo.Perf.DriveInfo, driveOBDs...)
partialWrite(obdInfo)
}
if net, ok := vars["perfnet"]; ok && net == "true" && globalIsDistXL {
obdInfo.Perf.Net = append(obdInfo.Perf.Net, globalNotificationSys.NetOBDInfo(deadlinedCtx))
obdInfo.Perf.Net = append(obdInfo.Perf.Net, globalNotificationSys.DispatchNetOBDInfo(deadlinedCtx)...)
obdInfo.Perf.NetParallel = globalNotificationSys.NetOBDParallelInfo(deadlinedCtx)
partialWrite(obdInfo)
}
}()
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case oinfo, ok := <-obdInfoCh:
if !ok {
return
}
logger.LogIf(ctx, enc.Encode(oinfo))
w.(http.Flusher).Flush()
case <-ticker.C:
if _, err := w.Write([]byte(" ")); err != nil {
return
}
w.(http.Flusher).Flush()
case <-deadlinedCtx.Done():
w.(http.Flusher).Flush()
return
}
}
if net, ok := vars["perfnet"]; ok && net == "true" && globalIsDistXL {
obdInfo.Perf.Net = append(obdInfo.Perf.Net, globalNotificationSys.NetOBDInfo(deadlinedCtx))
obdInfo.Perf.Net = append(obdInfo.Perf.Net, globalNotificationSys.DispatchNetOBDInfo(deadlinedCtx)...)
obdInfo.Perf.NetParallel = globalNotificationSys.NetOBDParallelInfo(deadlinedCtx)
partialWrite()
}
finish()
}
// ServerInfoHandler - GET /minio/admin/v3/info
@ -1488,7 +1482,7 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque
func fetchLambdaInfo(cfg config.Config) []map[string][]madmin.TargetIDStatus {
// Fetch the configured targets
targetList, err := notify.FetchRegisteredTargets(cfg, GlobalServiceDoneCh, NewGatewayHTTPTransport(), true, false)
targetList, err := notify.FetchRegisteredTargets(cfg, GlobalContext.Done(), NewGatewayHTTPTransport(), true, false)
if err != nil && err != notify.ErrTargetsOffline {
logger.LogIf(GlobalContext, err)
return nil

View file

@ -282,16 +282,13 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
w.Header().Set(xhttp.ContentType, "text/event-stream")
doneCh := make(chan struct{})
defer close(doneCh)
// Listen Publisher and peer-listen-client uses nonblocking send and hence does not wait for slow receivers.
// Use buffered channel to take care of burst sends or slow w.Write()
listenCh := make(chan interface{}, 4000)
peers := getRestClients(globalEndpoints)
globalHTTPListen.Subscribe(listenCh, doneCh, func(evI interface{}) bool {
globalHTTPListen.Subscribe(listenCh, ctx.Done(), func(evI interface{}) bool {
ev, ok := evI.(event.Event)
if !ok {
return false
@ -310,7 +307,7 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
if peer == nil {
continue
}
peer.Listen(listenCh, doneCh, values)
peer.Listen(listenCh, ctx.Done(), values)
}
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
@ -336,7 +333,7 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
return
}
w.(http.Flusher).Flush()
case <-GlobalServiceDoneCh:
case <-ctx.Done():
return
}
}

View file

@ -307,7 +307,7 @@ func validateConfig(s config.Config) error {
return err
}
return notify.TestNotificationTargets(s, GlobalServiceDoneCh, NewGatewayHTTPTransport(),
return notify.TestNotificationTargets(s, GlobalContext.Done(), NewGatewayHTTPTransport(),
globalNotificationSys.ConfiguredTargetIDs())
}
@ -469,12 +469,12 @@ func lookupConfigs(s config.Config) {
}
}
globalConfigTargetList, err = notify.GetNotificationTargets(s, GlobalServiceDoneCh, NewGatewayHTTPTransport())
globalConfigTargetList, err = notify.GetNotificationTargets(s, GlobalContext.Done(), NewGatewayHTTPTransport())
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to initialize notification target(s): %w", err))
}
globalEnvTargetList, err = notify.GetNotificationTargets(newServerConfig(), GlobalServiceDoneCh, NewGatewayHTTPTransport())
globalEnvTargetList, err = notify.GetNotificationTargets(newServerConfig(), GlobalContext.Done(), NewGatewayHTTPTransport())
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to initialize notification target(s): %w", err))
}

View file

@ -186,16 +186,14 @@ func (sys *ConfigSys) Load(objAPI ObjectLayer) error {
}
// WatchConfigNASDisk - watches nas disk on periodic basis.
func (sys *ConfigSys) WatchConfigNASDisk(objAPI ObjectLayer) {
func (sys *ConfigSys) WatchConfigNASDisk(ctx context.Context, objAPI ObjectLayer) {
configInterval := globalRefreshIAMInterval
watchDisk := func() {
ticker := time.NewTicker(configInterval)
defer ticker.Stop()
for {
select {
case <-GlobalServiceDoneCh:
case <-ctx.Done():
return
case <-ticker.C:
case <-time.After(configInterval):
loadConfig(objAPI)
}
}

View file

@ -74,7 +74,7 @@ func (sys *HTTPConsoleLoggerSys) HasLogListeners() bool {
}
// Subscribe starts console logging for this node.
func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh chan struct{}, node string, last int, logKind string, filter func(entry interface{}) bool) {
func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, node string, last int, logKind string, filter func(entry interface{}) bool) {
// Enable console logging for remote client.
if !sys.HasLogListeners() {
logger.AddTarget(sys)

View file

@ -757,13 +757,13 @@ func (fs *FSObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u
// Removes multipart uploads if any older than `expiry` duration
// on all buckets for every `cleanupInterval`, this function is
// blocking and should be run in a go-routine.
func (fs *FSObjects) cleanupStaleMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration, doneCh <-chan struct{}) {
func (fs *FSObjects) cleanupStaleMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration) {
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
for {
select {
case <-doneCh:
case <-ctx.Done():
return
case <-ticker.C:
now := time.Now()

View file

@ -51,7 +51,7 @@ func TestFSCleanupMultipartUploadsInRoutine(t *testing.T) {
cleanupWg.Add(1)
go func() {
defer cleanupWg.Done()
fs.cleanupStaleMultipartUploads(GlobalContext, time.Millisecond, 0, ctx.Done())
fs.cleanupStaleMultipartUploads(ctx, time.Millisecond, 0)
}()
// Wait for 100ms such that - we have given enough time for

View file

@ -179,7 +179,7 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) {
// or cause changes on backend format.
fs.fsFormatRlk = rlk
go fs.cleanupStaleMultipartUploads(ctx, GlobalMultipartCleanupInterval, GlobalMultipartExpiry, GlobalServiceDoneCh)
go fs.cleanupStaleMultipartUploads(ctx, GlobalMultipartCleanupInterval, GlobalMultipartExpiry)
// Return successfully initialized object layer.
return fs, nil

View file

@ -244,7 +244,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
logger.FatalIf(globalNotificationSys.Init(buckets, newObject), "Unable to initialize notification system")
// Start watching disk for reloading config, this
// is only enabled for "NAS" gateway.
globalConfigSys.WatchConfigNASDisk(newObject)
globalConfigSys.WatchConfigNASDisk(GlobalContext, newObject)
}
// This is only to uniquely identify each gateway deployments.
globalDeploymentID = env.Get("MINIO_GATEWAY_DEPLOYMENT_ID", mustGetUUID())

View file

@ -679,13 +679,13 @@ func getGWContentPath(object string) string {
}
// Clean-up the stale incomplete encrypted multipart uploads. Should be run in a Go routine.
func (l *s3EncObjects) cleanupStaleEncMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration, doneCh <-chan struct{}) {
func (l *s3EncObjects) cleanupStaleEncMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration) {
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
for {
select {
case <-doneCh:
case <-ctx.Done():
return
case <-ticker.C:
l.cleanupStaleEncMultipartUploadsOnGW(ctx, expiry)

View file

@ -242,7 +242,7 @@ func (g *S3) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error)
// Start stale enc multipart uploads cleanup routine.
go encS.cleanupStaleEncMultipartUploads(minio.GlobalContext,
minio.GlobalMultipartCleanupInterval, minio.GlobalMultipartExpiry, minio.GlobalServiceDoneCh)
minio.GlobalMultipartCleanupInterval, minio.GlobalMultipartExpiry)
return &encS, nil
}

View file

@ -834,7 +834,7 @@ func (client *peerRESTClient) BackgroundHealStatus() (madmin.BgHealState, error)
return state, err
}
func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh chan struct{}, trcAll, trcErr bool) {
func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh <-chan struct{}, trcAll, trcErr bool) {
values := make(url.Values)
values.Set(peerRESTTraceAll, strconv.FormatBool(trcAll))
values.Set(peerRESTTraceErr, strconv.FormatBool(trcErr))
@ -876,7 +876,7 @@ func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh chan stru
}
}
func (client *peerRESTClient) doListen(listenCh chan interface{}, doneCh chan struct{}, v url.Values) {
func (client *peerRESTClient) doListen(listenCh chan interface{}, doneCh <-chan struct{}, v url.Values) {
// To cancel the REST request in case doneCh gets closed.
ctx, cancel := context.WithCancel(GlobalContext)
@ -915,7 +915,7 @@ func (client *peerRESTClient) doListen(listenCh chan interface{}, doneCh chan st
}
// Listen - listen on peers.
func (client *peerRESTClient) Listen(listenCh chan interface{}, doneCh chan struct{}, v url.Values) {
func (client *peerRESTClient) Listen(listenCh chan interface{}, doneCh <-chan struct{}, v url.Values) {
go func() {
for {
client.doListen(listenCh, doneCh, v)
@ -931,7 +931,7 @@ func (client *peerRESTClient) Listen(listenCh chan interface{}, doneCh chan stru
}
// Trace - send http trace request to peer nodes
func (client *peerRESTClient) Trace(traceCh chan interface{}, doneCh chan struct{}, trcAll, trcErr bool) {
func (client *peerRESTClient) Trace(traceCh chan interface{}, doneCh <-chan struct{}, trcAll, trcErr bool) {
go func() {
for {
client.doTrace(traceCh, doneCh, trcAll, trcErr)
@ -947,7 +947,7 @@ func (client *peerRESTClient) Trace(traceCh chan interface{}, doneCh chan struct
}
// ConsoleLog - sends request to peer nodes to get console logs
func (client *peerRESTClient) ConsoleLog(logCh chan interface{}, doneCh chan struct{}) {
func (client *peerRESTClient) ConsoleLog(logCh chan interface{}, doneCh <-chan struct{}) {
go func() {
for {
// get cancellation context to properly unsubscribe peers

View file

@ -50,7 +50,7 @@ func (ps *PubSub) Publish(item interface{}) {
}
// Subscribe - Adds a subscriber to pubsub system
func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh chan struct{}, filter func(entry interface{}) bool) {
func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, filter func(entry interface{}) bool) {
ps.Lock()
defer ps.Unlock()