diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go
index a297e1aff..4e0ca2105 100644
--- a/cmd/admin-handlers.go
+++ b/cmd/admin-handlers.go
@@ -982,16 +982,13 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
 
 	w.Header().Set(xhttp.ContentType, "text/event-stream")
 
-	doneCh := make(chan struct{})
-	defer close(doneCh)
-
 	// Trace Publisher and peer-trace-client uses nonblocking send and hence does not wait for slow receivers.
 	// Use buffered channel to take care of burst sends or slow w.Write()
 	traceCh := make(chan interface{}, 4000)
 
 	peers := getRestClients(globalEndpoints)
 
-	globalHTTPTrace.Subscribe(traceCh, doneCh, func(entry interface{}) bool {
+	globalHTTPTrace.Subscribe(traceCh, ctx.Done(), func(entry interface{}) bool {
 		return mustTrace(entry, trcAll, trcErr)
 	})
 
@@ -999,7 +996,7 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
 		if peer == nil {
 			continue
 		}
-		peer.Trace(traceCh, doneCh, trcAll, trcErr)
+		peer.Trace(traceCh, ctx.Done(), trcAll, trcErr)
 	}
 
 	keepAliveTicker := time.NewTicker(500 * time.Millisecond)
@@ -1018,7 +1015,7 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
 				return
 			}
 			w.(http.Flusher).Flush()
-		case <-GlobalServiceDoneCh:
+		case <-ctx.Done():
 			return
 		}
 	}
@@ -1052,20 +1049,18 @@ func (a adminAPIHandlers) ConsoleLogHandler(w http.ResponseWriter, r *http.Reque
 	w.Header().Add("Connection", "close")
 	w.Header().Set(xhttp.ContentType, "text/event-stream")
 
-	doneCh := make(chan struct{})
-	defer close(doneCh)
 	logCh := make(chan interface{}, 4000)
 
 	peers := getRestClients(globalEndpoints)
 
-	globalConsoleSys.Subscribe(logCh, doneCh, node, limitLines, logKind, nil)
+	globalConsoleSys.Subscribe(logCh, ctx.Done(), node, limitLines, logKind, nil)
 
 	for _, peer := range peers {
 		if peer == nil {
 			continue
 		}
 		if node == "" || strings.EqualFold(peer.host.Name, node) {
-			peer.ConsoleLog(logCh, doneCh)
+			peer.ConsoleLog(logCh, ctx.Done())
 		}
 	}
 
@@ -1089,7 +1084,7 @@ func (a adminAPIHandlers) ConsoleLogHandler(w http.ResponseWriter, r *http.Reque
 				return
 			}
 			w.(http.Flusher).Flush()
-		case <-GlobalServiceDoneCh:
+		case <-ctx.Done():
 			return
 		}
 	}
@@ -1175,65 +1170,39 @@ func (a adminAPIHandlers) OBDInfoHandler(w http.ResponseWriter, r *http.Request)
 	}
 
 	vars := mux.Vars(r)
-	pulse := make(chan struct{})
-	obdDone := make(chan struct{})
 	obdInfo := madmin.OBDInfo{}
+	obdInfoCh := make(chan madmin.OBDInfo)
 
 	enc := json.NewEncoder(w)
-	doPartialWrite := func() {
-		logger.LogIf(ctx, enc.Encode(obdInfo))
+	partialWrite := func(oinfo madmin.OBDInfo) {
+		obdInfoCh <- oinfo
 	}
 
-	partialWrite := func() {
-		pulse <- struct{}{}
-	}
-
-	finish := func() {
-		obdDone <- struct{}{}
-	}
+	setCommonHeaders(w)
+	w.Header().Set(xhttp.ContentType, "text/event-stream")
+	w.WriteHeader(http.StatusOK)
 
 	errResp := func(err error) {
 		errorResponse := getAPIErrorResponse(ctx, toAdminAPIErr(ctx, err), r.URL.String(),
 			w.Header().Get(xhttp.AmzRequestID), globalDeploymentID)
 		encodedErrorResponse := encodeResponse(errorResponse)
 		obdInfo.Error = string(encodedErrorResponse)
-		finish()
+		logger.LogIf(ctx, enc.Encode(obdInfo))
 	}
 
 	deadline := 3600 * time.Second
-	deadlineStr := r.URL.Query().Get("deadline")
-	if deadlineStr != "" {
+	if dstr := r.URL.Query().Get("deadline"); dstr != "" {
 		var err error
-		deadline, err = time.ParseDuration(deadlineStr)
+		deadline, err = time.ParseDuration(dstr)
 		if err != nil {
 			errResp(err)
 			return
 		}
 	}
 
-	deadlinedCtx, cancel := context.WithDeadline(ctx, time.Now().Add(deadline))
-
-	setCommonHeaders(w)
-	w.Header().Set(xhttp.ContentType, string(mimeJSON))
-	w.WriteHeader(http.StatusOK)
+	deadlinedCtx, cancel := context.WithTimeout(ctx, deadline)
 
-	ticker := time.NewTicker(30 * time.Second)
-	defer ticker.Stop()
-
-	go func() {
-	loop:
-		for {
-			select {
-			case <-ticker.C:
-				doPartialWrite()
-			case <-pulse:
-				doPartialWrite()
-			case <-obdDone:
-				break loop
-			}
-		}
-		w.(http.Flusher).Flush()
-		cancel()
-	}()
+	defer cancel()
 
 	nsLock := objectAPI.NewNSLock(deadlinedCtx, minioMetaBucket, "obd-in-progress")
 	if err := nsLock.GetLock(newDynamicTimeout(deadline, deadline)); err != nil { // returns a locked lock
@@ -1242,89 +1211,114 @@ func (a adminAPIHandlers) OBDInfoHandler(w http.ResponseWriter, r *http.Request)
 	}
 	defer nsLock.Unlock()
 
-	if cpu, ok := vars["syscpu"]; ok && cpu == "true" {
-		cpuInfo := getLocalCPUOBDInfo(deadlinedCtx)
+	go func() {
+		defer close(obdInfoCh)
 
-		obdInfo.Sys.CPUInfo = append(obdInfo.Sys.CPUInfo, cpuInfo)
-		obdInfo.Sys.CPUInfo = append(obdInfo.Sys.CPUInfo, globalNotificationSys.CPUOBDInfo(deadlinedCtx)...)
-		partialWrite()
-	}
+		if cpu, ok := vars["syscpu"]; ok && cpu == "true" {
+			cpuInfo := getLocalCPUOBDInfo(deadlinedCtx)
 
-	if diskHw, ok := vars["sysdiskhw"]; ok && diskHw == "true" {
-		diskHwInfo := getLocalDiskHwOBD(deadlinedCtx)
-
-		obdInfo.Sys.DiskHwInfo = append(obdInfo.Sys.DiskHwInfo, diskHwInfo)
-		obdInfo.Sys.DiskHwInfo = append(obdInfo.Sys.DiskHwInfo, globalNotificationSys.DiskHwOBDInfo(deadlinedCtx)...)
-		partialWrite()
-	}
-
-	if osInfo, ok := vars["sysosinfo"]; ok && osInfo == "true" {
-		osInfo := getLocalOsInfoOBD(deadlinedCtx)
-
-		obdInfo.Sys.OsInfo = append(obdInfo.Sys.OsInfo, osInfo)
-		obdInfo.Sys.OsInfo = append(obdInfo.Sys.OsInfo, globalNotificationSys.OsOBDInfo(deadlinedCtx)...)
-		partialWrite()
-	}
-
-	if mem, ok := vars["sysmem"]; ok && mem == "true" {
-		memInfo := getLocalMemOBD(deadlinedCtx)
-
-		obdInfo.Sys.MemInfo = append(obdInfo.Sys.MemInfo, memInfo)
-		obdInfo.Sys.MemInfo = append(obdInfo.Sys.MemInfo, globalNotificationSys.MemOBDInfo(deadlinedCtx)...)
-		partialWrite()
-	}
-
-	if proc, ok := vars["sysprocess"]; ok && proc == "true" {
-		procInfo := getLocalProcOBD(deadlinedCtx)
-
-		obdInfo.Sys.ProcInfo = append(obdInfo.Sys.ProcInfo, procInfo)
-		obdInfo.Sys.ProcInfo = append(obdInfo.Sys.ProcInfo, globalNotificationSys.ProcOBDInfo(deadlinedCtx)...)
-		partialWrite()
-	}
-
-	if config, ok := vars["minioconfig"]; ok && config == "true" {
-		cfg, err := readServerConfig(ctx, objectAPI)
-		logger.LogIf(ctx, err)
-		obdInfo.Minio.Config = cfg
-		partialWrite()
-	}
-
-	if drive, ok := vars["perfdrive"]; ok && drive == "true" {
-		// Get drive obd details from local server's drive(s)
-		driveOBDSerial := getLocalDrivesOBD(deadlinedCtx, false, globalEndpoints, r)
-		driveOBDParallel := getLocalDrivesOBD(deadlinedCtx, true, globalEndpoints, r)
-
-		errStr := ""
-		if driveOBDSerial.Error != "" {
-			errStr = "serial: " + driveOBDSerial.Error
-		}
-		if driveOBDParallel.Error != "" {
-			errStr = errStr + " parallel: " + driveOBDParallel.Error
+			obdInfo.Sys.CPUInfo = append(obdInfo.Sys.CPUInfo, cpuInfo)
+			obdInfo.Sys.CPUInfo = append(obdInfo.Sys.CPUInfo, globalNotificationSys.CPUOBDInfo(deadlinedCtx)...)
+			partialWrite(obdInfo)
 		}
 
-		driveOBD := madmin.ServerDrivesOBDInfo{
-			Addr:     driveOBDSerial.Addr,
-			Serial:   driveOBDSerial.Serial,
-			Parallel: driveOBDParallel.Parallel,
-			Error:    errStr,
+		if diskHw, ok := vars["sysdiskhw"]; ok && diskHw == "true" {
+			diskHwInfo := getLocalDiskHwOBD(deadlinedCtx)
+
+			obdInfo.Sys.DiskHwInfo = append(obdInfo.Sys.DiskHwInfo, diskHwInfo)
+			obdInfo.Sys.DiskHwInfo = append(obdInfo.Sys.DiskHwInfo, globalNotificationSys.DiskHwOBDInfo(deadlinedCtx)...)
+			partialWrite(obdInfo)
 		}
-		obdInfo.Perf.DriveInfo = append(obdInfo.Perf.DriveInfo, driveOBD)
 
-		// Notify all other MinIO peers to report drive obd numbers
-		driveOBDs := globalNotificationSys.DriveOBDInfo(deadlinedCtx)
-		obdInfo.Perf.DriveInfo = append(obdInfo.Perf.DriveInfo, driveOBDs...)
+		if osInfo, ok := vars["sysosinfo"]; ok && osInfo == "true" {
+			osInfo := getLocalOsInfoOBD(deadlinedCtx)
 
-		partialWrite()
+			obdInfo.Sys.OsInfo = append(obdInfo.Sys.OsInfo, osInfo)
+			obdInfo.Sys.OsInfo = append(obdInfo.Sys.OsInfo, globalNotificationSys.OsOBDInfo(deadlinedCtx)...)
+			partialWrite(obdInfo)
+		}
+
+		if mem, ok := vars["sysmem"]; ok && mem == "true" {
+			memInfo := getLocalMemOBD(deadlinedCtx)
+
+			obdInfo.Sys.MemInfo = append(obdInfo.Sys.MemInfo, memInfo)
+			obdInfo.Sys.MemInfo = append(obdInfo.Sys.MemInfo, globalNotificationSys.MemOBDInfo(deadlinedCtx)...)
+			partialWrite(obdInfo)
+		}
+
+		if proc, ok := vars["sysprocess"]; ok && proc == "true" {
+			procInfo := getLocalProcOBD(deadlinedCtx)
+
+			obdInfo.Sys.ProcInfo = append(obdInfo.Sys.ProcInfo, procInfo)
+			obdInfo.Sys.ProcInfo = append(obdInfo.Sys.ProcInfo, globalNotificationSys.ProcOBDInfo(deadlinedCtx)...)
+			partialWrite(obdInfo)
+		}
+
+		if config, ok := vars["minioconfig"]; ok && config == "true" {
+			cfg, err := readServerConfig(ctx, objectAPI)
+			logger.LogIf(ctx, err)
+			obdInfo.Minio.Config = cfg
+			partialWrite(obdInfo)
+		}
+
+		if drive, ok := vars["perfdrive"]; ok && drive == "true" {
+			// Get drive obd details from local server's drive(s)
+			driveOBDSerial := getLocalDrivesOBD(deadlinedCtx, false, globalEndpoints, r)
+			driveOBDParallel := getLocalDrivesOBD(deadlinedCtx, true, globalEndpoints, r)
+
+			errStr := ""
+			if driveOBDSerial.Error != "" {
+				errStr = "serial: " + driveOBDSerial.Error
+			}
+			if driveOBDParallel.Error != "" {
+				errStr = errStr + " parallel: " + driveOBDParallel.Error
+			}
+
+			driveOBD := madmin.ServerDrivesOBDInfo{
+				Addr:     driveOBDSerial.Addr,
+				Serial:   driveOBDSerial.Serial,
+				Parallel: driveOBDParallel.Parallel,
+				Error:    errStr,
+			}
+			obdInfo.Perf.DriveInfo = append(obdInfo.Perf.DriveInfo, driveOBD)
+
+			// Notify all other MinIO peers to report drive obd numbers
+			driveOBDs := globalNotificationSys.DriveOBDInfo(deadlinedCtx)
+			obdInfo.Perf.DriveInfo = append(obdInfo.Perf.DriveInfo, driveOBDs...)
+
+			partialWrite(obdInfo)
+		}
+
+		if net, ok := vars["perfnet"]; ok && net == "true" && globalIsDistXL {
+			obdInfo.Perf.Net = append(obdInfo.Perf.Net, globalNotificationSys.NetOBDInfo(deadlinedCtx))
+			obdInfo.Perf.Net = append(obdInfo.Perf.Net, globalNotificationSys.DispatchNetOBDInfo(deadlinedCtx)...)
+			obdInfo.Perf.NetParallel = globalNotificationSys.NetOBDParallelInfo(deadlinedCtx)
+			partialWrite(obdInfo)
+		}
+	}()
+
+	ticker := time.NewTicker(30 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case oinfo, ok := <-obdInfoCh:
+			if !ok {
+				return
+			}
+			logger.LogIf(ctx, enc.Encode(oinfo))
+			w.(http.Flusher).Flush()
+		case <-ticker.C:
+			if _, err := w.Write([]byte(" ")); err != nil {
+				return
+			}
+			w.(http.Flusher).Flush()
+		case <-deadlinedCtx.Done():
+			w.(http.Flusher).Flush()
+			return
+		}
 	}
-
-	if net, ok := vars["perfnet"]; ok && net == "true" && globalIsDistXL {
-		obdInfo.Perf.Net = append(obdInfo.Perf.Net, globalNotificationSys.NetOBDInfo(deadlinedCtx))
-		obdInfo.Perf.Net = append(obdInfo.Perf.Net, globalNotificationSys.DispatchNetOBDInfo(deadlinedCtx)...)
-		obdInfo.Perf.NetParallel = globalNotificationSys.NetOBDParallelInfo(deadlinedCtx)
-		partialWrite()
-	}
-
-	finish()
 }
 
 // ServerInfoHandler - GET /minio/admin/v3/info
@@ -1488,7 +1482,7 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque
 func fetchLambdaInfo(cfg config.Config) []map[string][]madmin.TargetIDStatus {
 
 	// Fetch the configured targets
-	targetList, err := notify.FetchRegisteredTargets(cfg, GlobalServiceDoneCh, NewGatewayHTTPTransport(), true, false)
+	targetList, err := notify.FetchRegisteredTargets(cfg, GlobalContext.Done(), NewGatewayHTTPTransport(), true, false)
 	if err != nil && err != notify.ErrTargetsOffline {
 		logger.LogIf(GlobalContext, err)
 		return nil
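Review note on the admin handler changes above: all three streaming handlers now key off the request context instead of a hand-rolled `doneCh`, and `OBDInfoHandler` is restructured so a single producer goroutine sends progressively richer `madmin.OBDInfo` snapshots over `obdInfoCh` while the handler loop encodes them and emits keep-alives. A minimal, self-contained sketch of that shape, with hypothetical names (`report`, `streamHandler`) and a toy probe standing in for the real OBD collectors:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// report is a stand-in for madmin.OBDInfo: a snapshot that grows as probes finish.
type report struct {
	Step int    `json:"step"`
	Note string `json:"note"`
}

func streamHandler(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	w.Header().Set("Content-Type", "text/event-stream")
	w.WriteHeader(http.StatusOK)

	results := make(chan report)
	go func() {
		defer close(results) // a closed channel, not a finish() call, signals completion
		for i := 1; i <= 3; i++ {
			select {
			case results <- report{Step: i, Note: "probe done"}:
			case <-ctx.Done(): // client went away; stop producing
				return
			}
			time.Sleep(50 * time.Millisecond) // simulate a slow probe
		}
	}()

	enc := json.NewEncoder(w)
	ticker := time.NewTicker(25 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case rep, ok := <-results:
			if !ok {
				return // producer finished
			}
			if err := enc.Encode(rep); err != nil {
				return
			}
			w.(http.Flusher).Flush()
		case <-ticker.C:
			if _, err := w.Write([]byte(" ")); err != nil {
				return // write error means the client disconnected
			}
			w.(http.Flusher).Flush()
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	http.HandleFunc("/stream", streamHandler)
	fmt.Println("listening on :8080")
	http.ListenAndServe(":8080", nil)
}
```

Keeping all writes in the consumer loop also means the response writer is never shared across goroutines.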
diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go
index b15dcac94..1fc665832 100644
--- a/cmd/bucket-notification-handlers.go
+++ b/cmd/bucket-notification-handlers.go
@@ -282,16 +282,13 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
 
 	w.Header().Set(xhttp.ContentType, "text/event-stream")
 
-	doneCh := make(chan struct{})
-	defer close(doneCh)
-
 	// Listen Publisher and peer-listen-client uses nonblocking send and hence does not wait for slow receivers.
 	// Use buffered channel to take care of burst sends or slow w.Write()
 	listenCh := make(chan interface{}, 4000)
 
 	peers := getRestClients(globalEndpoints)
 
-	globalHTTPListen.Subscribe(listenCh, doneCh, func(evI interface{}) bool {
+	globalHTTPListen.Subscribe(listenCh, ctx.Done(), func(evI interface{}) bool {
 		ev, ok := evI.(event.Event)
 		if !ok {
 			return false
@@ -310,7 +307,7 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
 		if peer == nil {
 			continue
 		}
-		peer.Listen(listenCh, doneCh, values)
+		peer.Listen(listenCh, ctx.Done(), values)
 	}
 
 	keepAliveTicker := time.NewTicker(500 * time.Millisecond)
@@ -336,7 +333,7 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
 				return
 			}
 			w.(http.Flusher).Flush()
-		case <-GlobalServiceDoneCh:
+		case <-ctx.Done():
 			return
 		}
 	}
diff --git a/cmd/config-current.go b/cmd/config-current.go
index 6d46de034..1497ece11 100644
--- a/cmd/config-current.go
+++ b/cmd/config-current.go
@@ -307,7 +307,7 @@ func validateConfig(s config.Config) error {
 		return err
 	}
 
-	return notify.TestNotificationTargets(s, GlobalServiceDoneCh, NewGatewayHTTPTransport(),
+	return notify.TestNotificationTargets(s, GlobalContext.Done(), NewGatewayHTTPTransport(),
 		globalNotificationSys.ConfiguredTargetIDs())
 }
 
@@ -469,12 +469,12 @@ func lookupConfigs(s config.Config) {
 		}
 	}
 
-	globalConfigTargetList, err = notify.GetNotificationTargets(s, GlobalServiceDoneCh, NewGatewayHTTPTransport())
+	globalConfigTargetList, err = notify.GetNotificationTargets(s, GlobalContext.Done(), NewGatewayHTTPTransport())
 	if err != nil {
 		logger.LogIf(ctx, fmt.Errorf("Unable to initialize notification target(s): %w", err))
 	}
 
-	globalEnvTargetList, err = notify.GetNotificationTargets(newServerConfig(), GlobalServiceDoneCh, NewGatewayHTTPTransport())
+	globalEnvTargetList, err = notify.GetNotificationTargets(newServerConfig(), GlobalContext.Done(), NewGatewayHTTPTransport())
 	if err != nil {
 		logger.LogIf(ctx, fmt.Errorf("Unable to initialize notification target(s): %w", err))
 	}
diff --git a/cmd/config.go b/cmd/config.go
index cce69e154..895ecaffb 100644
--- a/cmd/config.go
+++ b/cmd/config.go
@@ -186,16 +186,14 @@ func (sys *ConfigSys) Load(objAPI ObjectLayer) error {
 }
 
 // WatchConfigNASDisk - watches nas disk on periodic basis.
-func (sys *ConfigSys) WatchConfigNASDisk(objAPI ObjectLayer) {
+func (sys *ConfigSys) WatchConfigNASDisk(ctx context.Context, objAPI ObjectLayer) {
 	configInterval := globalRefreshIAMInterval
 	watchDisk := func() {
-		ticker := time.NewTicker(configInterval)
-		defer ticker.Stop()
 		for {
 			select {
-			case <-GlobalServiceDoneCh:
+			case <-ctx.Done():
 				return
-			case <-ticker.C:
+			case <-time.After(configInterval):
 				loadConfig(objAPI)
 			}
 		}
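Review note on `WatchConfigNASDisk`: the loop now takes a `context.Context` and swaps the ticker for `time.After`, so the next wait is armed only after `loadConfig` returns and there is no ticker left to stop. A hedged sketch of the same loop shape; `watch` and `reload` are illustrative names, not MinIO APIs:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// watch invokes reload every interval until ctx is cancelled. With
// time.After the wait restarts only after the previous reload has
// finished, so a slow reload never causes tick pile-up.
func watch(ctx context.Context, interval time.Duration, reload func() error) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval):
			if err := reload(); err != nil {
				fmt.Println("reload failed:", err)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	watch(ctx, 100*time.Millisecond, func() error {
		fmt.Println("config reloaded")
		return nil
	})
	fmt.Println("watcher stopped")
}
```

The trade-off: `time.After` allocates a fresh timer on each iteration, which is negligible at config-refresh intervals but would matter in a hot loop.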
diff --git a/cmd/consolelogger.go b/cmd/consolelogger.go
index b6ebda107..103a892d1 100644
--- a/cmd/consolelogger.go
+++ b/cmd/consolelogger.go
@@ -74,7 +74,7 @@ func (sys *HTTPConsoleLoggerSys) HasLogListeners() bool {
 }
 
 // Subscribe starts console logging for this node.
-func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh chan struct{}, node string, last int, logKind string, filter func(entry interface{}) bool) {
+func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, node string, last int, logKind string, filter func(entry interface{}) bool) {
 	// Enable console logging for remote client.
 	if !sys.HasLogListeners() {
 		logger.AddTarget(sys)
diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go
index 0e96fa422..6b15424e9 100644
--- a/cmd/fs-v1-multipart.go
+++ b/cmd/fs-v1-multipart.go
@@ -757,13 +757,13 @@ func (fs *FSObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u
 // Removes multipart uploads if any older than `expiry` duration
 // on all buckets for every `cleanupInterval`, this function is
 // blocking and should be run in a go-routine.
-func (fs *FSObjects) cleanupStaleMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration, doneCh <-chan struct{}) {
+func (fs *FSObjects) cleanupStaleMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration) {
 	ticker := time.NewTicker(cleanupInterval)
 	defer ticker.Stop()
 
 	for {
 		select {
-		case <-doneCh:
+		case <-ctx.Done():
 			return
 		case <-ticker.C:
 			now := time.Now()
diff --git a/cmd/fs-v1-multipart_test.go b/cmd/fs-v1-multipart_test.go
index 9ac60d14b..35a11021c 100644
--- a/cmd/fs-v1-multipart_test.go
+++ b/cmd/fs-v1-multipart_test.go
@@ -51,7 +51,7 @@ func TestFSCleanupMultipartUploadsInRoutine(t *testing.T) {
 	cleanupWg.Add(1)
 	go func() {
 		defer cleanupWg.Done()
-		fs.cleanupStaleMultipartUploads(GlobalContext, time.Millisecond, 0, ctx.Done())
+		fs.cleanupStaleMultipartUploads(ctx, time.Millisecond, 0)
 	}()
 
 	// Wait for 100ms such that - we have given enough time for
diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go
index 58f9f3e92..546fe8b8d 100644
--- a/cmd/fs-v1.go
+++ b/cmd/fs-v1.go
@@ -179,7 +179,7 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) {
 	// or cause changes on backend format.
 	fs.fsFormatRlk = rlk
 
-	go fs.cleanupStaleMultipartUploads(ctx, GlobalMultipartCleanupInterval, GlobalMultipartExpiry, GlobalServiceDoneCh)
+	go fs.cleanupStaleMultipartUploads(ctx, GlobalMultipartCleanupInterval, GlobalMultipartExpiry)
 
 	// Return successfully initialized object layer.
 	return fs, nil
diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go
index b5a299079..eef19f7e0 100644
--- a/cmd/gateway-main.go
+++ b/cmd/gateway-main.go
@@ -244,7 +244,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
 		logger.FatalIf(globalNotificationSys.Init(buckets, newObject), "Unable to initialize notification system")
 		// Start watching disk for reloading config, this
 		// is only enabled for "NAS" gateway.
-		globalConfigSys.WatchConfigNASDisk(newObject)
+		globalConfigSys.WatchConfigNASDisk(GlobalContext, newObject)
 	}
 	// This is only to uniquely identify each gateway deployments.
 	globalDeploymentID = env.Get("MINIO_GATEWAY_DEPLOYMENT_ID", mustGetUUID())
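Review note on the multipart cleanup changes: dropping the trailing `doneCh` parameter makes the context the only stop signal, which is exactly what the updated test exercises — cancel the context, then wait for the goroutine to exit. A sketch of that call shape with illustrative names (`cleanupLoop`, `sweep`):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// cleanupLoop follows the slimmed-down cleanupStaleMultipartUploads
// signature: the context is the only stop signal, so callers cancel
// it instead of closing a separate doneCh.
func cleanupLoop(ctx context.Context, interval time.Duration, sweep func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			sweep()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		cleanupLoop(ctx, 10*time.Millisecond, func() { fmt.Println("sweep") })
	}()

	time.Sleep(35 * time.Millisecond) // let a few sweeps run
	cancel()                          // the test-style shutdown: cancel, then wait
	wg.Wait()
	fmt.Println("cleanup goroutine exited")
}
```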
diff --git a/cmd/gateway/s3/gateway-s3-sse.go b/cmd/gateway/s3/gateway-s3-sse.go
index dbafd8331..5e8d8047f 100644
--- a/cmd/gateway/s3/gateway-s3-sse.go
+++ b/cmd/gateway/s3/gateway-s3-sse.go
@@ -679,13 +679,13 @@ func getGWContentPath(object string) string {
 }
 
 // Clean-up the stale incomplete encrypted multipart uploads. Should be run in a Go routine.
-func (l *s3EncObjects) cleanupStaleEncMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration, doneCh <-chan struct{}) {
+func (l *s3EncObjects) cleanupStaleEncMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration) {
 	ticker := time.NewTicker(cleanupInterval)
 	defer ticker.Stop()
 
 	for {
 		select {
-		case <-doneCh:
+		case <-ctx.Done():
 			return
 		case <-ticker.C:
 			l.cleanupStaleEncMultipartUploadsOnGW(ctx, expiry)
diff --git a/cmd/gateway/s3/gateway-s3.go b/cmd/gateway/s3/gateway-s3.go
index 54d6ea386..d7c3fea0c 100644
--- a/cmd/gateway/s3/gateway-s3.go
+++ b/cmd/gateway/s3/gateway-s3.go
@@ -242,7 +242,7 @@ func (g *S3) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error)
 
 	// Start stale enc multipart uploads cleanup routine.
 	go encS.cleanupStaleEncMultipartUploads(minio.GlobalContext,
-		minio.GlobalMultipartCleanupInterval, minio.GlobalMultipartExpiry, minio.GlobalServiceDoneCh)
+		minio.GlobalMultipartCleanupInterval, minio.GlobalMultipartExpiry)
 
 	return &encS, nil
 }
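The peer client and pubsub diffs below widen `doneCh` from `chan struct{}` to `<-chan struct{}`. That is the change that lets `ctx.Done()` — which returns a receive-only channel — be passed straight through, while existing callers holding a plain `chan struct{}` still compile, since bidirectional channels convert implicitly to receive-only. A tiny, self-contained demonstration (`wait` is a hypothetical helper):

```go
package main

import (
	"context"
	"fmt"
)

// wait blocks until the done channel is closed. Taking a receive-only
// <-chan struct{} is the key: ctx.Done() returns exactly that type,
// whereas a parameter declared as chan struct{} would reject it.
func wait(name string, doneCh <-chan struct{}) {
	<-doneCh
	fmt.Println(name, "released")
}

func main() {
	// A plain bidirectional channel converts implicitly to <-chan struct{}.
	legacy := make(chan struct{})
	close(legacy)
	wait("legacy doneCh", legacy)

	// A context's done channel now fits the same parameter directly,
	// with no adapter goroutine in between.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	wait("ctx.Done()", ctx.Done())
}
```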
diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go
index 33cf7607d..b0a04075b 100644
--- a/cmd/peer-rest-client.go
+++ b/cmd/peer-rest-client.go
@@ -834,7 +834,7 @@ func (client *peerRESTClient) BackgroundHealStatus() (madmin.BgHealState, error)
 	return state, err
 }
 
-func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh chan struct{}, trcAll, trcErr bool) {
+func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh <-chan struct{}, trcAll, trcErr bool) {
 	values := make(url.Values)
 	values.Set(peerRESTTraceAll, strconv.FormatBool(trcAll))
 	values.Set(peerRESTTraceErr, strconv.FormatBool(trcErr))
@@ -876,7 +876,7 @@ func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh chan stru
 	}
 }
 
-func (client *peerRESTClient) doListen(listenCh chan interface{}, doneCh chan struct{}, v url.Values) {
+func (client *peerRESTClient) doListen(listenCh chan interface{}, doneCh <-chan struct{}, v url.Values) {
 	// To cancel the REST request in case doneCh gets closed.
 	ctx, cancel := context.WithCancel(GlobalContext)
 
@@ -915,7 +915,7 @@ func (client *peerRESTClient) doListen(listenCh chan interface{}, doneCh chan st
 }
 
 // Listen - listen on peers.
-func (client *peerRESTClient) Listen(listenCh chan interface{}, doneCh chan struct{}, v url.Values) {
+func (client *peerRESTClient) Listen(listenCh chan interface{}, doneCh <-chan struct{}, v url.Values) {
 	go func() {
 		for {
 			client.doListen(listenCh, doneCh, v)
@@ -931,7 +931,7 @@ func (client *peerRESTClient) Listen(listenCh chan interface{}, doneCh chan stru
 }
 
 // Trace - send http trace request to peer nodes
-func (client *peerRESTClient) Trace(traceCh chan interface{}, doneCh chan struct{}, trcAll, trcErr bool) {
+func (client *peerRESTClient) Trace(traceCh chan interface{}, doneCh <-chan struct{}, trcAll, trcErr bool) {
 	go func() {
 		for {
 			client.doTrace(traceCh, doneCh, trcAll, trcErr)
@@ -947,7 +947,7 @@ func (client *peerRESTClient) Trace(traceCh chan interface{}, doneCh chan struct
 }
 
 // ConsoleLog - sends request to peer nodes to get console logs
-func (client *peerRESTClient) ConsoleLog(logCh chan interface{}, doneCh chan struct{}) {
+func (client *peerRESTClient) ConsoleLog(logCh chan interface{}, doneCh <-chan struct{}) {
 	go func() {
 		for {
 			// get cancellation context to properly unsubscribe peers
diff --git a/pkg/pubsub/pubsub.go b/pkg/pubsub/pubsub.go
index 795ff99fd..2e0820782 100644
--- a/pkg/pubsub/pubsub.go
+++ b/pkg/pubsub/pubsub.go
@@ -50,7 +50,7 @@ func (ps *PubSub) Publish(item interface{}) {
 }
 
 // Subscribe - Adds a subscriber to pubsub system
-func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh chan struct{}, filter func(entry interface{}) bool) {
+func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, filter func(entry interface{}) bool) {
 	ps.Lock()
 	defer ps.Unlock()
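Closing note on `pkg/pubsub`: `Subscribe` only ever receives from `doneCh`, so the receive-only parameter is the honest signature. A minimal sketch of the subscribe/unsubscribe pattern — not the real `pkg/pubsub` implementation (which additionally takes a filter func), though the nonblocking send in `Publish` mirrors the behavior the handler comments above describe:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pubSub registers subscriber channels; Subscribe spawns a goroutine
// that removes the channel once doneCh fires, which is why doneCh
// only needs to be receive-only.
type pubSub struct {
	sync.Mutex
	subs []chan interface{}
}

func (ps *pubSub) Publish(item interface{}) {
	ps.Lock()
	defer ps.Unlock()
	for _, ch := range ps.subs {
		select {
		case ch <- item: // nonblocking send: slow receivers miss events
		default:
		}
	}
}

func (ps *pubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}) {
	ps.Lock()
	ps.subs = append(ps.subs, subCh)
	ps.Unlock()

	go func() {
		<-doneCh // unsubscribe when the caller's context ends
		ps.Lock()
		defer ps.Unlock()
		for i, ch := range ps.subs {
			if ch == subCh {
				ps.subs = append(ps.subs[:i], ps.subs[i+1:]...)
				break
			}
		}
	}()
}

func main() {
	ps := &pubSub{}
	sub := make(chan interface{}, 8)
	done := make(chan struct{})

	ps.Subscribe(sub, done)
	ps.Publish("hello")
	fmt.Println(<-sub)

	close(done)
	time.Sleep(10 * time.Millisecond) // let the unsubscribe goroutine run
	ps.Publish("dropped")             // no subscribers remain
}
```

Passing `ctx.Done()` here ties a subscription's lifetime to the HTTP request, which is precisely what the Trace, ConsoleLog, and Listen handlers in this patch rely on.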