fix: locking in some situations for IAM store (#13595)

- Fix a bug where read locks were taken instead of write locks in some situations
- Remove an unnecessary lock when updating based on notifications.
This commit is contained in:
Aditya Manthramurthy 2021-11-07 17:42:32 -08:00 committed by GitHub
parent 12e6907512
commit fe0df01448
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 19 deletions

View File

@ -474,8 +474,8 @@ func (store *IAMStoreSys) GetMappedPolicy(name string, isGroup bool) (MappedPoli
// change (e.g. peer notification for object storage and etcd watch
// notification).
func (store *IAMStoreSys) GroupNotificationHandler(ctx context.Context, group string) error {
cache := store.rlock()
defer store.runlock()
cache := store.lock()
defer store.unlock()
err := store.loadGroup(ctx, group, cache.iamGroupsMap)
if err != nil && err != errNoSuchGroup {
@ -730,8 +730,8 @@ func (store *IAMStoreSys) GetGroupDescription(group string) (gd madmin.GroupDesc
// ListGroups - lists groups. Since this is not going to be a frequent
// operation, we fetch this info from storage, and refresh the cache as well.
func (store *IAMStoreSys) ListGroups(ctx context.Context) (res []string, err error) {
cache := store.rlock()
defer store.runlock()
cache := store.lock()
defer store.unlock()
if store.getUsersSysType() == MinIOUsersSysType {
m := map[string]GroupInfo{}
@ -834,8 +834,8 @@ func (store *IAMStoreSys) PolicyNotificationHandler(ctx context.Context, policy
return errInvalidArgument
}
cache := store.rlock()
defer store.runlock()
cache := store.lock()
defer store.unlock()
err := store.loadPolicyDoc(ctx, policy, cache.iamPolicyDocsMap)
if err == errNoSuchPolicy {
@ -1165,8 +1165,8 @@ func (store *IAMStoreSys) PolicyMappingNotificationHandler(ctx context.Context,
return errInvalidArgument
}
cache := store.rlock()
defer store.runlock()
cache := store.lock()
defer store.unlock()
m := cache.iamGroupPolicyMap
if !isGroup {
@ -1189,8 +1189,8 @@ func (store *IAMStoreSys) UserNotificationHandler(ctx context.Context, accessKey
return errInvalidArgument
}
cache := store.rlock()
defer store.runlock()
cache := store.lock()
defer store.unlock()
err := store.loadUser(ctx, accessKey, userType, cache.iamUsersMap)
if err == errNoSuchUser {
@ -1678,8 +1678,8 @@ func (store *IAMStoreSys) UpdateUserIdentity(ctx context.Context, cred auth.Cred
// LoadUser - attempts to load user info from storage and updates cache.
func (store *IAMStoreSys) LoadUser(ctx context.Context, accessKey string) {
cache := store.rlock()
defer store.runlock()
cache := store.lock()
defer store.unlock()
_, found := cache.iamUsersMap[accessKey]
if !found {

View File

@ -351,13 +351,6 @@ func (sys *IAMSys) loadWatchedEvent(outerCtx context.Context, event iamWatchEven
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
defer cancel()
// We need to read from storage and write to in-memory map, so we need
// only a read lock on storage, however in some cases we modify storage
// too (e.g. when credentials from storage are expired, we delete them),
// so we take write locks for both.
sys.Lock()
defer sys.Unlock()
if event.isCreated {
switch {
case usersPrefix: