Improved routing events and added Shard started & Shard promoted to primary events

This commit is contained in:
Boaz Leskes 2013-12-19 21:24:06 +01:00
parent ff61163367
commit b48318f219
2 changed files with 126 additions and 41 deletions

View file

@ -32,9 +32,12 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableSet;
@ -332,6 +335,41 @@ public class ExportersService extends AbstractLifecycleComponent<ExportersServic
pendingEventsQueue.add(new IndexEvent.IndexCreateDelete(timestamp, clusterName, index, false, event.source()));
}
if (event.routingTableChanged()) {
for (ShardRoutingDelta changedShard : getAssignedShardRoutingDelta(event.previousState(), event.state())) {
ShardRouting current = changedShard.current;
if (changedShard.previous == null) {
pendingEventsQueue.add(new RoutingEvent.ShardInitializing(timestamp, clusterName, current,
event.state().nodes().get(current.currentNodeId())
));
} else if (current.state() != changedShard.previous.state()) {
// state change - remember these are only assigned shard
switch (current.state()) {
case STARTED:
pendingEventsQueue.add(new RoutingEvent.ShardStarted(timestamp, clusterName, current,
event.state().nodes().get(current.currentNodeId())
));
break;
case RELOCATING:
pendingEventsQueue.add(new RoutingEvent.ShardRelocating(timestamp, clusterName, changedShard.current,
event.state().nodes().get(current.currentNodeId()),
event.state().nodes().get(current.relocatingNodeId()))
);
break;
default:
// we shouldn't get here as INITIALIZING will not have a previous and UNASSIGNED will not be in the change list.
assert false : "changed shard has an unexpected state [" + current.state() + "]";
}
} else if (current.primary() && !changedShard.previous.primary()) {
pendingEventsQueue.add(new RoutingEvent.ShardPromotedToPrimary(timestamp, clusterName, current,
event.state().nodes().get(current.currentNodeId())
));
}
}
}
// check for index & cluster status changes
ClusterHealthResponse prevHealth = new ClusterHealthResponse(clusterName, event.previousState().metaData().concreteAllIndices()
, event.previousState());
@ -350,53 +388,69 @@ public class ExportersService extends AbstractLifecycleComponent<ExportersServic
pendingEventsQueue.add(new IndexEvent.IndexStatus(timestamp, clusterName, event.source(), indexHealth));
}
}
if (event.routingTableChanged()) {
}
// hunt for initializing shards
RoutingNodes previousRoutingNodes = event.previousState().routingNodes();
for (ShardRouting shardRouting : event.state().routingNodes().shardsWithState(ShardRoutingState.INITIALIZING)) {
RoutingNode oldRoutingNode = previousRoutingNodes.node(shardRouting.currentNodeId());
boolean changed = true;
if (oldRoutingNode != null) {
for (ShardRouting oldShardRouting : oldRoutingNode) {
if (oldShardRouting.equals(shardRouting)) {
changed = false;
break;
}
static class ShardRoutingDelta {
@Nullable
final public ShardRouting previous;
final public ShardRouting current;
public ShardRoutingDelta(@Nullable ShardRouting previous, ShardRouting current) {
this.previous = previous;
this.current = current;
}
}
protected List<ShardRoutingDelta> getAssignedShardRoutingDelta(ClusterState previousState, ClusterState currentState) {
List<ShardRoutingDelta> changedShards = new ArrayList<ShardRoutingDelta>();
for (IndexRoutingTable indexRoutingTable : currentState.routingTable()) {
IndexRoutingTable prevIndexRoutingTable = previousState.routingTable().getIndicesRouting().get(indexRoutingTable.index());
if (prevIndexRoutingTable == null) {
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
for (ShardRouting shardRouting : shardRoutingTable.assignedShards()) {
changedShards.add(new ShardRoutingDelta(null, shardRouting));
}
}
continue;
}
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
IndexShardRoutingTable prevShardRoutingTable = prevIndexRoutingTable.shard(shardRoutingTable.shardId().id());
if (prevShardRoutingTable == null) {
for (ShardRouting shardRouting : shardRoutingTable.assignedShards()) {
changedShards.add(new ShardRoutingDelta(null, shardRouting));
}
continue;
}
for (ShardRouting shardRouting : shardRoutingTable.getAssignedShards()) {
ShardRouting prevShardRouting = null;
for (ShardRouting candidate : prevShardRoutingTable.assignedShards()) {
if (candidate.currentNodeId().equals(shardRouting.currentNodeId())) {
prevShardRouting = candidate;
break;
} else if (shardRouting.currentNodeId().equals(candidate.relocatingNodeId())) {
// the shard relocated here
prevShardRouting = candidate;
break;
}
}
if (!changed) {
continue; // no event.
}
if (shardRouting.relocatingNodeId() != null) {
// if relocating node is not null, this shard is initializing due to a relocation
ShardRouting tmpShardRouting = new MutableShardRouting(
shardRouting.index(), shardRouting.id(), shardRouting.relocatingNodeId(),
shardRouting.currentNodeId(), shardRouting.primary(),
ShardRoutingState.RELOCATING, shardRouting.version());
DiscoveryNode relocatingTo = null;
if (tmpShardRouting.relocatingNodeId() != null) {
relocatingTo = event.state().nodes().get(tmpShardRouting.relocatingNodeId());
}
pendingEventsQueue.add(new RoutingEvent.ShardRelocating(timestamp, clusterName, tmpShardRouting,
relocatingTo, event.state().nodes().get(tmpShardRouting.currentNodeId())
));
} else {
pendingEventsQueue.add(new RoutingEvent.ShardInitializing(timestamp, clusterName, shardRouting,
event.state().nodes().get(shardRouting.currentNodeId())
));
if (prevShardRouting != null && prevShardRouting.equals(shardRouting)) {
continue; // nothing changed.
}
changedShards.add(new ShardRoutingDelta(prevShardRouting, shardRouting));
}
}
}
return changedShards;
}
class IndicesLifeCycleListener extends IndicesLifecycle.Listener {
@Override

View file

@ -89,7 +89,41 @@ public abstract class RoutingEvent extends Event {
@Override
String conciseDescription() {
return shardDescription(shardRouting) + " set to initializing on " + Utils.nodeDescription(node);
return shardDescription(shardRouting) + " initializing on " + Utils.nodeDescription(node);
}
}
public static class ShardStarted extends RoutingShardEvent {
public ShardStarted(long timestamp, String clusterName, ShardRouting shardRouting, DiscoveryNode node) {
super(timestamp, clusterName, shardRouting, node);
}
@Override
public String event() {
return "shard_started";
}
@Override
String conciseDescription() {
return shardDescription(shardRouting) + " started on " + Utils.nodeDescription(node);
}
}
public static class ShardPromotedToPrimary extends RoutingShardEvent {
public ShardPromotedToPrimary(long timestamp, String clusterName, ShardRouting shardRouting, DiscoveryNode node) {
super(timestamp, clusterName, shardRouting, node);
}
@Override
public String event() {
return "shard_promoted";
}
@Override
String conciseDescription() {
return shardRouting.shardId() + " promoted to primary on " + Utils.nodeDescription(node);
}
}
@ -110,11 +144,8 @@ public abstract class RoutingEvent extends Event {
@Override
String conciseDescription() {
String s = shardDescription(shardRouting) + " set to relocate";
if (relocatingTo != null) {
s += " to " + relocatingTo;
}
return s + " from " + Utils.nodeDescription(node);
return shardDescription(shardRouting) + " relocating to " + Utils.nodeDescription(relocatingTo) +
" from " + Utils.nodeDescription(node);
}
@Override