Cluster & Index status events. All events output cluster name.

This commit is contained in:
Boaz Leskes 2013-12-18 17:15:32 +01:00
parent 73a3ea2179
commit 10aeee046c
8 changed files with 168 additions and 70 deletions

View file

@ -20,6 +20,8 @@ package org.elasticsearch.marvel.monitor;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterIndexHealth;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
@ -67,6 +69,7 @@ public class ExportersService extends AbstractLifecycleComponent<ExportersServic
private final NodeService nodeService;
private final ClusterService clusterService;
private final Client client;
private final String clusterName;
private final IndicesLifecycle.Listener indicesLifeCycleListener;
private final ClusterStateListener clusterStateEventListener;
@ -92,6 +95,7 @@ public class ExportersService extends AbstractLifecycleComponent<ExportersServic
this.interval = componentSettings.getAsTime("interval", TimeValue.timeValueSeconds(5));
this.indicesToExport = componentSettings.getAsArray("indices", this.indicesToExport, true);
this.client = client;
this.clusterName = clusterName.value();
indicesLifeCycleListener = new IndicesLifeCycleListener();
clusterStateEventListener = new ClusterStateListener();
@ -279,20 +283,76 @@ public class ExportersService extends AbstractLifecycleComponent<ExportersServic
}
// only collect if i'm master.
long timestamp = System.currentTimeMillis();
if (!event.previousState().nodes().localNodeMaster()) {
pendingEventsQueue.add(new NodeEvent.ElectedAsMaster(timestamp, event.state().nodes().localNode(), event.source()));
}
for (DiscoveryNode node : event.nodesDelta().addedNodes()) {
pendingEventsQueue.add(new NodeEvent.NodeJoinLeave(timestamp, node, true, event.source()));
pendingEventsQueue.add(new NodeEvent.NodeJoinLeave(timestamp, clusterName, node, true, event.source()));
}
for (DiscoveryNode node : event.nodesDelta().removedNodes()) {
pendingEventsQueue.add(new NodeEvent.NodeJoinLeave(timestamp, node, false, event.source()));
pendingEventsQueue.add(new NodeEvent.NodeJoinLeave(timestamp, clusterName, node, false, event.source()));
}
if (!event.previousState().nodes().localNodeMaster()) {
pendingEventsQueue.add(new NodeEvent.ElectedAsMaster(timestamp, clusterName, event.state().nodes().localNode(),
event.source()));
}
if (event.blocksChanged()) {
// TODO: Add index blocks
List<ClusterBlock> removed = newArrayList();
List<ClusterBlock> added = newArrayList();
ImmutableSet<ClusterBlock> currentBlocks = event.state().blocks().global();
ImmutableSet<ClusterBlock> previousBlocks = event.previousState().blocks().global();
for (ClusterBlock block : previousBlocks) {
if (!currentBlocks.contains(block)) {
removed.add(block);
}
}
for (ClusterBlock block : currentBlocks) {
if (!previousBlocks.contains(block)) {
added.add(block);
}
}
for (ClusterBlock block : added) {
pendingEventsQueue.add(new ClusterEvent.ClusterBlock(timestamp, clusterName, block, true, event.source()));
}
for (ClusterBlock block : removed) {
pendingEventsQueue.add(new ClusterEvent.ClusterBlock(timestamp, clusterName, block, false, event.source()));
}
}
for (String index : event.indicesCreated()) {
pendingEventsQueue.add(new IndexEvent.IndexCreateDelete(timestamp, clusterName, index, true, event.source()));
}
for (String index : event.indicesDeleted()) {
pendingEventsQueue.add(new IndexEvent.IndexCreateDelete(timestamp, clusterName, index, false, event.source()));
}
// check for index & cluster status changes
ClusterHealthResponse prevHealth = new ClusterHealthResponse(clusterName, event.previousState().metaData().concreteAllIndices()
, event.previousState());
ClusterHealthResponse curHealth = new ClusterHealthResponse(clusterName, event.state().metaData().concreteAllIndices(), event.state());
if (prevHealth.getStatus() != curHealth.getStatus()) {
pendingEventsQueue.add(new ClusterEvent.ClusterStatus(timestamp, clusterName, event.source(), curHealth));
}
for (ClusterIndexHealth indexHealth : curHealth) {
ClusterIndexHealth prevIndexHealth = prevHealth.getIndices().get(indexHealth.getIndex());
if (prevIndexHealth != null && prevIndexHealth.getStatus() == indexHealth.getStatus()) {
continue;
}
pendingEventsQueue.add(new IndexEvent.IndexStatus(timestamp, clusterName, event.source(), indexHealth));
}
if (event.routingTableChanged()) {
// hunt for initializing shards
RoutingNodes previousRoutingNodes = event.previousState().routingNodes();
for (ShardRouting shardRouting : event.state().routingNodes().shardsWithState(ShardRoutingState.INITIALIZING)) {
@ -320,11 +380,11 @@ public class ExportersService extends AbstractLifecycleComponent<ExportersServic
if (tmpShardRouting.relocatingNodeId() != null) {
relocatingTo = event.state().nodes().get(tmpShardRouting.relocatingNodeId());
}
pendingEventsQueue.add(new RoutingEvent.ShardRelocating(timestamp, tmpShardRouting,
pendingEventsQueue.add(new RoutingEvent.ShardRelocating(timestamp, clusterName, tmpShardRouting,
relocatingTo, event.state().nodes().get(tmpShardRouting.currentNodeId())
));
} else {
pendingEventsQueue.add(new RoutingEvent.ShardInitializing(timestamp, shardRouting,
pendingEventsQueue.add(new RoutingEvent.ShardInitializing(timestamp, clusterName, shardRouting,
event.state().nodes().get(shardRouting.currentNodeId())
));
}
@ -332,42 +392,8 @@ public class ExportersService extends AbstractLifecycleComponent<ExportersServic
}
}
if (event.blocksChanged()) {
// TODO: Add index blocks
List<ClusterBlock> removed = newArrayList();
List<ClusterBlock> added = newArrayList();
ImmutableSet<ClusterBlock> currentBlocks = event.state().blocks().global();
ImmutableSet<ClusterBlock> previousBlocks = event.previousState().blocks().global();
for (ClusterBlock block : previousBlocks) {
if (!currentBlocks.contains(block)) {
removed.add(block);
}
}
for (ClusterBlock block : currentBlocks) {
if (!previousBlocks.contains(block)) {
added.add(block);
}
}
for (ClusterBlock block : added) {
pendingEventsQueue.add(new ClusterEvent.ClusterBlock(timestamp, block, true, event.source()));
}
for (ClusterBlock block : removed) {
pendingEventsQueue.add(new ClusterEvent.ClusterBlock(timestamp, block, false, event.source()));
}
}
for (String index : event.indicesCreated()) {
pendingEventsQueue.add(new IndexEvent.IndexCreateDelete(timestamp, index, true, event.source()));
}
for (String index : event.indicesDeleted()) {
pendingEventsQueue.add(new IndexEvent.IndexCreateDelete(timestamp, index, false, event.source()));
}
}
}
@ -383,7 +409,7 @@ public class ExportersService extends AbstractLifecycleComponent<ExportersServic
}
}
pendingEventsQueue.add(new ShardEvent(System.currentTimeMillis(), currentState,
pendingEventsQueue.add(new ShardEvent(System.currentTimeMillis(), clusterName, currentState,
indexShard.shardId(), clusterService.localNode(), relocatingNode, indexShard.routingEntry(), reason));
}
}

View file

@ -19,6 +19,8 @@ package org.elasticsearch.marvel.monitor.event;
*/
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -28,8 +30,8 @@ public abstract class ClusterEvent extends Event {
protected final String event_source;
public ClusterEvent(long timestamp, String event_source) {
super(timestamp);
public ClusterEvent(long timestamp, String clusterName, String event_source) {
super(timestamp, clusterName);
this.event_source = event_source;
}
@ -52,8 +54,8 @@ public abstract class ClusterEvent extends Event {
private final org.elasticsearch.cluster.block.ClusterBlock block;
private boolean added;
public ClusterBlock(long timestamp, org.elasticsearch.cluster.block.ClusterBlock block, boolean added, String event_source) {
super(timestamp, event_source);
public ClusterBlock(long timestamp, String clusterName, org.elasticsearch.cluster.block.ClusterBlock block, boolean added, String event_source) {
super(timestamp, clusterName, event_source);
this.block = block;
this.added = added;
}
@ -77,4 +79,32 @@ public abstract class ClusterEvent extends Event {
return builder;
}
}
public static class ClusterStatus extends ClusterEvent {
ClusterHealthResponse clusterHealth;
public ClusterStatus(long timestamp, String clusterName, String event_source, ClusterHealthResponse clusterHealth) {
super(timestamp, clusterName, event_source);
this.clusterHealth = clusterHealth;
}
@Override
protected String event() {
return "cluster_status";
}
@Override
String conciseDescription() {
return "cluster status is " + clusterHealth.getStatus().name();
}
@Override
public XContentBuilder addXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException {
// disable parent outputting of cluster name, it's part of the cluster health.
ToXContent.Params p = new ToXContent.DelegatingMapParams(ImmutableMap.of("output_cluster_name", "false"), params);
super.addXContentBody(builder, p);
return clusterHealth.toXContent(builder, params);
}
}
}

View file

@ -31,15 +31,21 @@ public abstract class Event {
public final static DateTimeFormatter datePrinter = Joda.forPattern("date_time").printer();
protected long timestamp;
protected String clusterName;
public Event(long timestamp) {
public Event(long timestamp, String clusterName) {
this.timestamp = timestamp;
this.clusterName = clusterName;
}
public long timestamp() {
return timestamp;
}
public String clusterName() {
return clusterName;
}
/**
* @return event's type as a short string without spaces
*/
@ -57,6 +63,9 @@ public abstract class Event {
public XContentBuilder addXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.field("@timestamp", datePrinter.print(timestamp));
if (params.paramAsBoolean("output_cluster_name", true)) {
builder.field("cluster_name", clusterName);
}
builder.field("message", conciseDescription());
return builder;
}

View file

@ -19,17 +19,20 @@ package org.elasticsearch.marvel.monitor.event;
*/
import org.elasticsearch.action.admin.cluster.health.ClusterIndexHealth;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Map;
public abstract class IndexEvent extends Event {
protected final String event_source;
public IndexEvent(long timestamp, String event_source) {
super(timestamp);
public IndexEvent(long timestamp, String clusterName, String event_source) {
super(timestamp, clusterName);
this.event_source = event_source;
}
@ -53,8 +56,8 @@ public abstract class IndexEvent extends Event {
private final String index;
private boolean created;
public IndexCreateDelete(long timestamp, String index, boolean created, String event_source) {
super(timestamp, event_source);
public IndexCreateDelete(long timestamp, String clusterName, String index, boolean created, String event_source) {
super(timestamp, clusterName, event_source);
this.index = index;
this.created = created;
}
@ -76,4 +79,33 @@ public abstract class IndexEvent extends Event {
return builder;
}
}
public static class IndexStatus extends IndexEvent {
ClusterIndexHealth indexHealth;
Map<String, String> SHARD_LEVEL_MAP = ImmutableMap.of("level", "shards");
public IndexStatus(long timestamp, String clusterName, String event_source, ClusterIndexHealth indexHealth) {
super(timestamp, clusterName, event_source);
this.indexHealth = indexHealth;
}
@Override
protected String event() {
return "index_status";
}
@Override
String conciseDescription() {
return "[" + indexHealth.getIndex() + "] status is " + indexHealth.getStatus().name();
}
@Override
public XContentBuilder addXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException {
super.addXContentBody(builder, params);
builder.field("index", indexHealth.getIndex());
return indexHealth.toXContent(builder, new ToXContent.DelegatingMapParams(SHARD_LEVEL_MAP, params));
}
}
}

View file

@ -30,8 +30,8 @@ public abstract class NodeEvent extends Event {
protected final String event_source;
public NodeEvent(long timestamp, String event_source) {
super(timestamp);
public NodeEvent(long timestamp, String clusterName, String event_source) {
super(timestamp, clusterName);
this.event_source = event_source;
}
@ -55,8 +55,8 @@ public abstract class NodeEvent extends Event {
private final DiscoveryNode node;
public ElectedAsMaster(long timestamp, DiscoveryNode node, String event_source) {
super(timestamp, event_source);
public ElectedAsMaster(long timestamp, String clusterName, DiscoveryNode node, String event_source) {
super(timestamp, clusterName, event_source);
this.node = node;
}
@ -67,7 +67,7 @@ public abstract class NodeEvent extends Event {
@Override
String conciseDescription() {
return node.toString() + " became master";
return Utils.nodeDescription(node) + " became master";
}
// no need to render node as XContent as it will be done by the exporter.
@ -78,8 +78,8 @@ public abstract class NodeEvent extends Event {
private final DiscoveryNode node;
private boolean joined;
public NodeJoinLeave(long timestamp, DiscoveryNode node, boolean joined, String event_source) {
super(timestamp, event_source);
public NodeJoinLeave(long timestamp, String clusterName, DiscoveryNode node, boolean joined, String event_source) {
super(timestamp, clusterName, event_source);
this.node = node;
this.joined = joined;
}

View file

@ -29,8 +29,8 @@ import java.io.IOException;
public abstract class RoutingEvent extends Event {
public RoutingEvent(long timestamp) {
super(timestamp);
public RoutingEvent(long timestamp, String clusterName) {
super(timestamp, clusterName);
}
@Override
@ -56,8 +56,8 @@ public abstract class RoutingEvent extends Event {
protected final ShardRouting shardRouting;
protected final DiscoveryNode node;
public RoutingShardEvent(long timestamp, ShardRouting shardRouting, DiscoveryNode node) {
super(timestamp);
public RoutingShardEvent(long timestamp, String clusterName, ShardRouting shardRouting, DiscoveryNode node) {
super(timestamp, clusterName);
this.node = node;
this.shardRouting = shardRouting;
}
@ -78,8 +78,8 @@ public abstract class RoutingEvent extends Event {
public static class ShardInitializing extends RoutingShardEvent {
public ShardInitializing(long timestamp, ShardRouting shardRouting, DiscoveryNode node) {
super(timestamp, shardRouting, node);
public ShardInitializing(long timestamp, String clusterName, ShardRouting shardRouting, DiscoveryNode node) {
super(timestamp, clusterName, shardRouting, node);
}
@Override
@ -97,8 +97,9 @@ public abstract class RoutingEvent extends Event {
final DiscoveryNode relocatingTo;
public ShardRelocating(long timestamp, ShardRouting shardRouting, DiscoveryNode node, DiscoveryNode relocatingTo) {
super(timestamp, shardRouting, node);
public ShardRelocating(long timestamp, String clusterName, ShardRouting shardRouting,
DiscoveryNode node, DiscoveryNode relocatingTo) {
super(timestamp, clusterName, shardRouting, node);
this.relocatingTo = relocatingTo;
}

View file

@ -41,9 +41,9 @@ public class ShardEvent extends Event {
private IndexShardState shardState;
public ShardEvent(long timestamp, IndexShardState shardState, ShardId shardId, DiscoveryNode node,
public ShardEvent(long timestamp, String clusterName, IndexShardState shardState, ShardId shardId, DiscoveryNode node,
DiscoveryNode relocatingNode, ShardRouting shardRouting, String reason) {
super(timestamp);
super(timestamp, clusterName);
this.shardState = shardState;
this.shardId = shardId;
this.reason = reason;

View file

@ -512,7 +512,7 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
@Override
public void render(int index, XContentBuilder builder) throws IOException {
builder.startObject();
builder.field("cluster_name", clusterName.value());
// events output cluster name.
addNodeInfo(builder, "_source_node");
events[index].addXContentBody(builder, xContentParams);
builder.endObject();