Handle err returned by rpc.Server.RegisterName (#2910)

This commit is contained in:
Krishnan Parthasarathi 2016-10-13 11:43:24 +05:30 committed by Harshavardhana
parent 84acc820c7
commit b59bac670a
7 changed files with 56 additions and 21 deletions

View file

@ -79,7 +79,7 @@ type controlAPIHandlers struct {
}
// Register control RPC handlers.
func registerControlRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfig) {
func registerControlRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfig) (err error) {
// Initialize Control.
ctrlHandlers := &controlAPIHandlers{
ObjectAPI: newObjectLayerFn,
@ -89,8 +89,12 @@ func registerControlRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfig)
}
ctrlRPCServer := rpc.NewServer()
ctrlRPCServer.RegisterName("Control", ctrlHandlers)
err = ctrlRPCServer.RegisterName("Control", ctrlHandlers)
if err != nil {
return traceError(err)
}
ctrlRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter()
ctrlRouter.Path(controlPath).Handler(ctrlRPCServer)
return nil
}

View file

@ -77,9 +77,9 @@ type lockServer struct {
}
// Register distributed NS lock handlers.
func registerDistNSLockRouter(mux *router.Router, serverConfig serverCmdConfig) {
func registerDistNSLockRouter(mux *router.Router, serverConfig serverCmdConfig) error {
lockServers := newLockServers(serverConfig)
registerStorageLockers(mux, lockServers)
return registerStorageLockers(mux, lockServers)
}
// Create one lock server for every local storage rpc server.
@ -128,13 +128,17 @@ func newLockServers(serverConfig serverCmdConfig) (lockServers []*lockServer) {
}
// registerStorageLockers - register locker rpc handlers for net/rpc library clients
func registerStorageLockers(mux *router.Router, lockServers []*lockServer) {
func registerStorageLockers(mux *router.Router, lockServers []*lockServer) error {
for _, lockServer := range lockServers {
lockRPCServer := rpc.NewServer()
lockRPCServer.RegisterName("Dsync", lockServer)
err := lockRPCServer.RegisterName("Dsync", lockServer)
if err != nil {
return traceError(err)
}
lockRouter := mux.PathPrefix(reservedBucket).Subrouter()
lockRouter.Path(path.Join("/lock", lockServer.rpcPath)).Handler(lockRPCServer)
}
return nil
}
/// Distributed lock handlers

View file

@ -67,24 +67,36 @@ func newObjectLayer(storageDisks []StorageAPI) (ObjectLayer, error) {
}
// configureServer handler returns final handler for the http server.
func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
func configureServerHandler(srvCmdConfig serverCmdConfig) (http.Handler, error) {
// Initialize router.
mux := router.NewRouter()
// Initialize distributed NS lock.
if srvCmdConfig.isDistXL {
// Register storage rpc router only if its a distributed setup.
registerStorageRPCRouters(mux, srvCmdConfig)
err := registerStorageRPCRouters(mux, srvCmdConfig)
if err != nil {
return nil, err
}
// Register distributed namespace lock.
registerDistNSLockRouter(mux, srvCmdConfig)
err = registerDistNSLockRouter(mux, srvCmdConfig)
if err != nil {
return nil, err
}
}
// Register S3 peer communication router.
registerS3PeerRPCRouter(mux)
err := registerS3PeerRPCRouter(mux)
if err != nil {
return nil, err
}
// Register controller rpc router.
registerControlRPCRouter(mux, srvCmdConfig)
err = registerControlRPCRouter(mux, srvCmdConfig)
if err != nil {
return nil, err
}
// set environmental variable MINIO_BROWSER=off to disable minio web browser.
// By default minio web browser is enabled.
@ -122,5 +134,5 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
}
// Register rest of the handlers.
return registerHandlers(mux, handlerFns...)
return registerHandlers(mux, handlerFns...), nil
}

View file

@ -30,14 +30,18 @@ type s3PeerAPIHandlers struct {
ObjectAPI func() ObjectLayer
}
func registerS3PeerRPCRouter(mux *router.Router) {
func registerS3PeerRPCRouter(mux *router.Router) error {
s3PeerHandlers := &s3PeerAPIHandlers{
ObjectAPI: newObjectLayerFn,
}
s3PeerRPCServer := rpc.NewServer()
s3PeerRPCServer.RegisterName("S3", s3PeerHandlers)
err := s3PeerRPCServer.RegisterName("S3", s3PeerHandlers)
if err != nil {
return traceError(err)
}
s3PeerRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter()
s3PeerRouter.Path(s3Path).Handler(s3PeerRPCServer)
return nil
}

View file

@ -346,7 +346,8 @@ func serverMain(c *cli.Context) {
}
// Configure server.
handler := configureServerHandler(srvConfig)
handler, err := configureServerHandler(srvConfig)
fatalIf(err, "Unable to configure one of server's RPC services.")
// Set nodes for dsync for distributed setup.
if srvConfig.isDistXL {
@ -375,7 +376,7 @@ func serverMain(c *cli.Context) {
}(tls)
// Wait for formatting of disks.
err := waitForFormatDisks(firstDisk, endPoints[0], storageDisks)
err = waitForFormatDisks(firstDisk, endPoints[0], storageDisks)
fatalIf(err, "formatting storage disks failed")
// Once formatted, initialize object layer.

View file

@ -257,17 +257,23 @@ func newRPCServer(serverConfig serverCmdConfig) (servers []*storageServer, err e
}
// registerStorageRPCRouter - register storage rpc router.
func registerStorageRPCRouters(mux *router.Router, srvCmdConfig serverCmdConfig) {
func registerStorageRPCRouters(mux *router.Router, srvCmdConfig serverCmdConfig) error {
// Initialize storage rpc servers for every disk that is hosted on this node.
storageRPCs, err := newRPCServer(srvCmdConfig)
fatalIf(err, "Unable to initialize storage RPC server.")
if err != nil {
return traceError(err)
}
// Create a unique route for each disk exported from this node.
for _, stServer := range storageRPCs {
storageRPCServer := rpc.NewServer()
storageRPCServer.RegisterName("Storage", stServer)
err = storageRPCServer.RegisterName("Storage", stServer)
if err != nil {
return traceError(err)
}
// Add minio storage routes.
storageRouter := mux.PathPrefix(reservedBucket).Subrouter()
storageRouter.Path(path.Join("/storage", stServer.path)).Handler(storageRPCServer)
}
return nil
}

View file

@ -183,12 +183,16 @@ func StartTestServer(t TestErrHandler, instanceType string) TestServer {
}
// Run TestServer.
testServer.Server = httptest.NewServer(configureServerHandler(
httpHandler, err := configureServerHandler(
serverCmdConfig{
disks: disks,
storageDisks: storageDisks,
},
))
)
if err != nil {
t.Fatalf("Failed to configure one of the RPC services <ERROR> %s", err)
}
testServer.Server = httptest.NewServer(httpHandler)
testServer.Obj = objLayer
globalObjLayerMutex.Lock()