Service Map Data API at Runtime (#54027) (#54655)

* [APM] Runtime service maps

* Make nodes interactive

* Don't use smaller range query on initial request

* Address feedback from Ron

* Get all services separately

* Get single service as well

* Query both transactions/spans for initial request

* Optimize 'top' query for service maps

* Use agent.name from scripted metric

* adds basic loading overlay

* filter out service map node self reference edges from being rendered

* Make service map initial load time range configurable with
`xpack.apm.serviceMapInitialTimeRange` default to last 1 hour in
milliseconds

* ensure destination.address is not missing in the composite agg when
fetching sample trace ids

* wip: added incremental data fetch & progress bar

* implement progressive loading design while blocking service map interaction during loading

* adds filter that destination.address exists before fetching sample trace ids

* reduce pairs of connections to 1 bi-directional connection with arrows on both ends of the edge

* Optimize query; add update button

* Allow user interaction after 5s, auto update in that time, otherwise
show toast for user to update the map with button

* Correctly reduce nodes/connections

* - remove non-interactive state while loading
- use cytoscape element definition types

* - readability improvements to the ServiceMap component
- only show the update map button toast after last request loads

* addresses feedback for changes to the Cytoscape component

* Add span.type/span.subtype do external nodes

* PR feedback

Co-authored-by: Dario Gieselaar <d.gieselaar@gmail.com>

Co-authored-by: Dario Gieselaar <d.gieselaar@gmail.com>
This commit is contained in:
Oliver Gupte 2020-01-13 14:54:45 -08:00 committed by GitHub
parent 9e17412dfd
commit c38ed9bfff
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 1143 additions and 64 deletions

View file

@ -4,6 +4,8 @@ exports[`Error CLIENT_GEO_COUNTRY_ISO_CODE 1`] = `undefined`;
exports[`Error CONTAINER_ID 1`] = `undefined`;
exports[`Error DESTINATION_ADDRESS 1`] = `undefined`;
exports[`Error ERROR_CULPRIT 1`] = `"handleOopsie"`;
exports[`Error ERROR_EXC_HANDLED 1`] = `undefined`;
@ -112,6 +114,8 @@ exports[`Span CLIENT_GEO_COUNTRY_ISO_CODE 1`] = `undefined`;
exports[`Span CONTAINER_ID 1`] = `undefined`;
exports[`Span DESTINATION_ADDRESS 1`] = `undefined`;
exports[`Span ERROR_CULPRIT 1`] = `undefined`;
exports[`Span ERROR_EXC_HANDLED 1`] = `undefined`;
@ -220,6 +224,8 @@ exports[`Transaction CLIENT_GEO_COUNTRY_ISO_CODE 1`] = `undefined`;
exports[`Transaction CONTAINER_ID 1`] = `"container1234567890abcdef"`;
exports[`Transaction DESTINATION_ADDRESS 1`] = `undefined`;
exports[`Transaction ERROR_CULPRIT 1`] = `undefined`;
exports[`Transaction ERROR_EXC_HANDLED 1`] = `undefined`;

View file

@ -14,6 +14,8 @@ export const HTTP_REQUEST_METHOD = 'http.request.method';
export const USER_ID = 'user.id';
export const USER_AGENT_NAME = 'user_agent.name';
export const DESTINATION_ADDRESS = 'destination.address';
export const OBSERVER_VERSION_MAJOR = 'observer.version_major';
export const OBSERVER_LISTENING = 'observer.listening';
export const PROCESSOR_EVENT = 'processor.event';

View file

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export interface ServiceConnectionNode {
'service.name': string;
'service.environment': string | null;
'agent.name': string;
}
export interface ExternalConnectionNode {
'destination.address': string;
'span.type': string;
'span.subtype': string;
}
export type ConnectionNode = ServiceConnectionNode | ExternalConnectionNode;
export interface Connection {
source: ConnectionNode;
destination: ConnectionNode;
}

View file

@ -71,7 +71,8 @@ export const apm: LegacyPluginInitializer = kibana => {
autocreateApmIndexPattern: Joi.boolean().default(true),
// service map
serviceMapEnabled: Joi.boolean().default(false)
serviceMapEnabled: Joi.boolean().default(false),
serviceMapInitialTimeRange: Joi.number().default(60 * 1000 * 60) // last 1 hour
}).default();
},

View file

@ -73,6 +73,7 @@ export function Cytoscape({
cy.on('data', event => {
// Add the "primary" class to the node if its id matches the serviceName.
if (cy.nodes().length > 0 && serviceName) {
cy.nodes().removeClass('primary');
cy.getElementById(serviceName).addClass('primary');
}

View file

@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import theme from '@elastic/eui/dist/eui_theme_light.json';
import React from 'react';
import { EuiProgress, EuiText, EuiSpacer } from '@elastic/eui';
import styled from 'styled-components';
import { i18n } from '@kbn/i18n';
const Container = styled.div`
position: relative;
`;
const Overlay = styled.div`
position: absolute;
top: 0;
z-index: 1;
display: flex;
flex-direction: column;
align-items: center;
width: 100%;
padding: ${theme.gutterTypes.gutterMedium};
`;
const ProgressBarContainer = styled.div`
width: 50%;
max-width: 600px;
`;
interface Props {
children: React.ReactNode;
isLoading: boolean;
percentageLoaded: number;
}
export const LoadingOverlay = ({
children,
isLoading,
percentageLoaded
}: Props) => (
<Container>
{isLoading && (
<Overlay>
<ProgressBarContainer>
<EuiProgress
value={percentageLoaded}
max={100}
color="primary"
size="m"
/>
</ProgressBarContainer>
<EuiSpacer size="s" />
<EuiText size="s" textAlign="center">
{i18n.translate('xpack.apm.loadingServiceMap', {
defaultMessage:
'Loading service map... This might take a short while.'
})}
</EuiText>
</Overlay>
)}
{children}
</Container>
);

View file

@ -8,17 +8,13 @@ import theme from '@elastic/eui/dist/eui_theme_light.json';
import { icons, defaultIcon } from './icons';
const layout = {
animate: true,
animationEasing: theme.euiAnimSlightBounce as cytoscape.Css.TransitionTimingFunction,
animationDuration: parseInt(theme.euiAnimSpeedFast, 10),
name: 'dagre',
nodeDimensionsIncludeLabels: true,
rankDir: 'LR',
spacingFactor: 2
rankDir: 'LR'
};
function isDatabaseOrExternal(agentName: string) {
return agentName === 'database' || agentName === 'external';
return !agentName;
}
const style: cytoscape.Stylesheet[] = [
@ -47,7 +43,7 @@ const style: cytoscape.Stylesheet[] = [
'font-family': 'Inter UI, Segoe UI, Helvetica, Arial, sans-serif',
'font-size': theme.euiFontSizeXS,
height: theme.avatarSizing.l.size,
label: 'data(id)',
label: 'data(label)',
'min-zoomed-font-size': theme.euiSizeL,
'overlay-opacity': 0,
shape: (el: cytoscape.NodeSingular) =>
@ -76,7 +72,18 @@ const style: cytoscape.Stylesheet[] = [
//
// @ts-ignore
'target-distance-from-node': theme.paddingSizes.xs,
width: 2
width: 1,
'source-arrow-shape': 'none'
}
},
{
selector: 'edge[bidirectional]',
style: {
'source-arrow-shape': 'triangle',
'target-arrow-shape': 'triangle',
// @ts-ignore
'source-distance-from-node': theme.paddingSizes.xs,
'target-distance-from-node': theme.paddingSizes.xs
}
}
];

View file

@ -0,0 +1,158 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { ValuesType } from 'utility-types';
import { sortBy, isEqual } from 'lodash';
import { Connection, ConnectionNode } from '../../../../common/service_map';
import { ServiceMapAPIResponse } from '../../../../server/lib/service_map/get_service_map';
import { getAPMHref } from '../../shared/Links/apm/APMLink';
function getConnectionNodeId(node: ConnectionNode): string {
if ('destination.address' in node) {
// use a prefix to distinguish exernal destination ids from services
return `>${node['destination.address']}`;
}
return node['service.name'];
}
function getConnectionId(connection: Connection) {
return `${getConnectionNodeId(connection.source)}~${getConnectionNodeId(
connection.destination
)}`;
}
export function getCytoscapeElements(
responses: ServiceMapAPIResponse[],
search: string
) {
const discoveredServices = responses.flatMap(
response => response.discoveredServices
);
const serviceNodes = responses
.flatMap(response => response.services)
.map(service => ({
...service,
id: service['service.name']
}));
// maps destination.address to service.name if possible
function getConnectionNode(node: ConnectionNode) {
let mappedNode: ConnectionNode | undefined;
if ('destination.address' in node) {
mappedNode = discoveredServices.find(map => isEqual(map.from, node))?.to;
}
if (!mappedNode) {
mappedNode = node;
}
return {
...mappedNode,
id: getConnectionNodeId(mappedNode)
};
}
// build connections with mapped nodes
const connections = responses
.flatMap(response => response.connections)
.map(connection => {
const source = getConnectionNode(connection.source);
const destination = getConnectionNode(connection.destination);
return {
source,
destination,
id: getConnectionId({ source, destination })
};
})
.filter(connection => connection.source.id !== connection.destination.id);
const nodes = connections
.flatMap(connection => [connection.source, connection.destination])
.concat(serviceNodes);
type ConnectionWithId = ValuesType<typeof connections>;
type ConnectionNodeWithId = ValuesType<typeof nodes>;
const connectionsById = connections.reduce((connectionMap, connection) => {
return {
...connectionMap,
[connection.id]: connection
};
}, {} as Record<string, ConnectionWithId>);
const nodesById = nodes.reduce((nodeMap, node) => {
return {
...nodeMap,
[node.id]: node
};
}, {} as Record<string, ConnectionNodeWithId>);
const cyNodes = (Object.values(nodesById) as ConnectionNodeWithId[]).map(
node => {
let data = {};
if ('service.name' in node) {
data = {
href: getAPMHref(
`/services/${node['service.name']}/service-map`,
search
),
agentName: node['agent.name'] || node['agent.name']
};
}
return {
group: 'nodes' as const,
data: {
id: node.id,
label:
'service.name' in node
? node['service.name']
: node['destination.address'],
...data
}
};
}
);
// instead of adding connections in two directions,
// we add a `bidirectional` flag to use in styling
const dedupedConnections = (sortBy(
Object.values(connectionsById),
// make sure that order is stable
'id'
) as ConnectionWithId[]).reduce<
Array<ConnectionWithId & { bidirectional?: boolean }>
>((prev, connection) => {
const reversedConnection = prev.find(
c =>
c.destination.id === connection.source.id &&
c.source.id === connection.destination.id
);
if (reversedConnection) {
reversedConnection.bidirectional = true;
return prev;
}
return prev.concat(connection);
}, []);
const cyEdges = dedupedConnections.map(connection => {
return {
group: 'edges' as const,
data: {
id: connection.id,
source: connection.source.id,
target: connection.destination.id,
bidirectional: connection.bidirectional ? true : undefined
}
};
}, []);
return [...cyNodes, ...cyEdges];
}

View file

@ -5,13 +5,30 @@
*/
import theme from '@elastic/eui/dist/eui_theme_light.json';
import React from 'react';
import { useFetcher } from '../../../hooks/useFetcher';
import React, {
useMemo,
useEffect,
useState,
useRef,
useCallback
} from 'react';
import { find, isEqual } from 'lodash';
import { i18n } from '@kbn/i18n';
import { EuiButton } from '@elastic/eui';
import { ElementDefinition } from 'cytoscape';
import { toMountPoint } from '../../../../../../../../src/plugins/kibana_react/public';
import { ServiceMapAPIResponse } from '../../../../server/lib/service_map/get_service_map';
import { useLicense } from '../../../hooks/useLicense';
import { useUrlParams } from '../../../hooks/useUrlParams';
import { Controls } from './Controls';
import { Cytoscape } from './Cytoscape';
import { PlatinumLicensePrompt } from './PlatinumLicensePrompt';
import { useCallApmApi } from '../../../hooks/useCallApmApi';
import { useDeepObjectIdentity } from '../../../hooks/useDeepObjectIdentity';
import { useLocation } from '../../../hooks/useLocation';
import { LoadingOverlay } from './LoadingOverlay';
import { useApmPluginContext } from '../../../hooks/useApmPluginContext';
import { getCytoscapeElements } from './get_cytoscape_elements';
interface ServiceMapProps {
serviceName?: string;
@ -37,37 +54,159 @@ ${theme.euiColorLightShade}`,
margin: `-${theme.gutterTypes.gutterLarge}`
};
export function ServiceMap({ serviceName }: ServiceMapProps) {
const {
urlParams: { start, end }
} = useUrlParams();
const MAX_REQUESTS = 5;
export function ServiceMap({ serviceName }: ServiceMapProps) {
const callApmApi = useCallApmApi();
const license = useLicense();
const { search } = useLocation();
const { urlParams, uiFilters } = useUrlParams();
const { notifications } = useApmPluginContext().core;
const params = useDeepObjectIdentity({
start: urlParams.start,
end: urlParams.end,
environment: urlParams.environment,
serviceName,
uiFilters: {
...uiFilters,
environment: undefined
}
});
const renderedElements = useRef<ElementDefinition[]>([]);
const openToast = useRef<string | null>(null);
const [responses, setResponses] = useState<ServiceMapAPIResponse[]>([]);
const [isLoading, setIsLoading] = useState(true);
const [percentageLoaded, setPercentageLoaded] = useState(0);
const [, _setUnusedState] = useState(false);
const elements = useMemo(() => getCytoscapeElements(responses, search), [
responses,
search
]);
const forceUpdate = useCallback(() => _setUnusedState(value => !value), []);
const getNext = useCallback(
async (input: { reset?: boolean; after?: string | undefined }) => {
const { start, end, uiFilters: strippedUiFilters, ...query } = params;
if (input.reset) {
renderedElements.current = [];
setResponses([]);
}
const { data } = useFetcher(
callApmApi => {
if (start && end) {
return callApmApi({
pathname: '/api/apm/service-map',
params: { query: { start, end } }
});
setIsLoading(true);
try {
const data = await callApmApi({
pathname: '/api/apm/service-map',
params: {
query: {
...query,
start,
end,
uiFilters: JSON.stringify(strippedUiFilters),
after: input.after
}
}
});
setResponses(resp => resp.concat(data));
setIsLoading(false);
const shouldGetNext =
responses.length + 1 < MAX_REQUESTS && data.after;
if (shouldGetNext) {
setPercentageLoaded(value => value + 30); // increase loading bar 30%
await getNext({ after: data.after });
}
} catch (error) {
setIsLoading(false);
notifications.toasts.addError(error, {
title: i18n.translate('xpack.apm.errorServiceMapData', {
defaultMessage: `Error loading service connections`
})
});
}
}
},
[start, end]
[callApmApi, params, responses.length, notifications.toasts]
);
const elements = Array.isArray(data) ? data : [];
const license = useLicense();
useEffect(() => {
const loadServiceMaps = async () => {
setPercentageLoaded(5);
await getNext({ reset: true });
setPercentageLoaded(100);
};
loadServiceMaps();
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [params]);
useEffect(() => {
if (renderedElements.current.length === 0) {
renderedElements.current = elements;
return;
}
const newElements = elements.filter(element => {
return !find(renderedElements.current, el => isEqual(el, element));
});
const updateMap = () => {
renderedElements.current = elements;
if (openToast.current) {
notifications.toasts.remove(openToast.current);
}
forceUpdate();
};
if (newElements.length > 0 && percentageLoaded === 100) {
openToast.current = notifications.toasts.add({
title: i18n.translate('xpack.apm.newServiceMapData', {
defaultMessage: `Newly discovered connections are available.`
}),
onClose: () => {
openToast.current = null;
},
toastLifeTimeMs: 24 * 60 * 60 * 1000,
text: toMountPoint(
<EuiButton onClick={updateMap}>
{i18n.translate('xpack.apm.updateServiceMap', {
defaultMessage: 'Update map'
})}
</EuiButton>
)
}).id;
}
return () => {
if (openToast.current) {
notifications.toasts.remove(openToast.current);
}
};
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [elements, percentageLoaded]);
const isValidPlatinumLicense =
license?.isActive &&
(license?.type === 'platinum' || license?.type === 'trial');
return isValidPlatinumLicense ? (
<Cytoscape
elements={elements}
serviceName={serviceName}
style={cytoscapeDivStyle}
>
<Controls />
</Cytoscape>
<LoadingOverlay isLoading={isLoading} percentageLoaded={percentageLoaded}>
<Cytoscape
elements={renderedElements.current}
serviceName={serviceName}
style={cytoscapeDivStyle}
>
<Controls />
</Cytoscape>
</LoadingOverlay>
) : (
<PlatinumLicensePrompt />
);

View file

@ -11,7 +11,7 @@ import {
IndicesDeleteParams,
IndicesCreateParams
} from 'elasticsearch';
import { merge } from 'lodash';
import { merge, uniqueId } from 'lodash';
import { cloneDeep, isString } from 'lodash';
import { KibanaRequest } from 'src/core/server';
import { OBSERVER_VERSION_MAJOR } from '../../../common/elasticsearch_fieldnames';
@ -127,6 +127,23 @@ export function getESClient(
? callAsInternalUser
: callAsCurrentUser;
const debug = context.params.query._debug;
function withTime<T>(
fn: (log: typeof console.log) => Promise<T>
): Promise<T> {
const log = console.log.bind(console, uniqueId());
if (!debug) {
return fn(log);
}
const time = process.hrtime();
return fn(log).then(data => {
const now = process.hrtime(time);
log(`took: ${Math.round(now[0] * 1000 + now[1] / 1e6)}ms`);
return data;
});
}
return {
search: async <
TDocument = unknown,
@ -141,27 +158,29 @@ export function getESClient(
apmOptions
);
if (context.params.query._debug) {
console.log(`--DEBUG ES QUERY--`);
console.log(
`${request.url.pathname} ${JSON.stringify(context.params.query)}`
);
console.log(`GET ${nextParams.index}/_search`);
console.log(JSON.stringify(nextParams.body, null, 2));
}
return withTime(log => {
if (context.params.query._debug) {
log(`--DEBUG ES QUERY--`);
log(
`${request.url.pathname} ${JSON.stringify(context.params.query)}`
);
log(`GET ${nextParams.index}/_search`);
log(JSON.stringify(nextParams.body, null, 2));
}
return (callMethod('search', nextParams) as unknown) as Promise<
ESSearchResponse<TDocument, TSearchRequest>
>;
return (callMethod('search', nextParams) as unknown) as Promise<
ESSearchResponse<TDocument, TSearchRequest>
>;
});
},
index: <Body>(params: APMIndexDocumentParams<Body>) => {
return callMethod('index', params);
return withTime(() => callMethod('index', params));
},
delete: (params: IndicesDeleteParams) => {
return callMethod('delete', params);
return withTime(() => callMethod('delete', params));
},
indicesCreate: (params: IndicesCreateParams) => {
return callMethod('indices.create', params);
return withTime(() => callMethod('indices.create', params));
}
};
}

View file

@ -0,0 +1,129 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { PromiseReturnType } from '../../../typings/common';
import {
Setup,
SetupTimeRange,
SetupUIFilters
} from '../helpers/setup_request';
import { getServiceMapFromTraceIds } from './get_service_map_from_trace_ids';
import { getTraceSampleIds } from './get_trace_sample_ids';
import { getServicesProjection } from '../../../common/projections/services';
import { mergeProjection } from '../../../common/projections/util/merge_projection';
import {
SERVICE_AGENT_NAME,
SERVICE_NAME
} from '../../../common/elasticsearch_fieldnames';
export interface IEnvOptions {
setup: Setup & SetupTimeRange & SetupUIFilters;
serviceName?: string;
environment?: string;
after?: string;
}
async function getConnectionData({
setup,
serviceName,
environment,
after
}: IEnvOptions) {
const { traceIds, after: nextAfter } = await getTraceSampleIds({
setup,
serviceName,
environment,
after
});
const serviceMapData = traceIds.length
? await getServiceMapFromTraceIds({
setup,
serviceName,
environment,
traceIds
})
: { connections: [], discoveredServices: [] };
return {
after: nextAfter,
...serviceMapData
};
}
async function getServicesData(options: IEnvOptions) {
// only return services on the first request for the global service map
if (options.after) {
return [];
}
const { setup } = options;
const projection = getServicesProjection({ setup });
const { filter } = projection.body.query.bool;
const params = mergeProjection(projection, {
body: {
size: 0,
query: {
bool: {
...projection.body.query.bool,
filter: options.serviceName
? filter.concat({
term: {
[SERVICE_NAME]: options.serviceName
}
})
: filter
}
},
aggs: {
services: {
terms: {
field: projection.body.aggs.services.terms.field,
size: 500
},
aggs: {
agent_name: {
terms: {
field: SERVICE_AGENT_NAME
}
}
}
}
}
}
});
const { client } = setup;
const response = await client.search(params);
return (
response.aggregations?.services.buckets.map(bucket => {
return {
'service.name': bucket.key as string,
'agent.name':
(bucket.agent_name.buckets[0]?.key as string | undefined) || '',
'service.environment': options.environment || null
};
}) || []
);
}
export type ServiceMapAPIResponse = PromiseReturnType<typeof getServiceMap>;
export async function getServiceMap(options: IEnvOptions) {
const [connectionData, servicesData] = await Promise.all([
getConnectionData(options),
getServicesData(options)
]);
return {
...connectionData,
services: servicesData
};
}

View file

@ -0,0 +1,280 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { uniq, find } from 'lodash';
import { Setup } from '../helpers/setup_request';
import {
TRACE_ID,
PROCESSOR_EVENT
} from '../../../common/elasticsearch_fieldnames';
import {
Connection,
ServiceConnectionNode,
ConnectionNode,
ExternalConnectionNode
} from '../../../common/service_map';
export async function getServiceMapFromTraceIds({
setup,
traceIds,
serviceName,
environment
}: {
setup: Setup;
traceIds: string[];
serviceName?: string;
environment?: string;
}) {
const { indices, client } = setup;
const serviceMapParams = {
index: [
indices['apm_oss.spanIndices'],
indices['apm_oss.transactionIndices']
],
body: {
size: 0,
query: {
bool: {
filter: [
{
terms: {
[PROCESSOR_EVENT]: ['span', 'transaction']
}
},
{
terms: {
[TRACE_ID]: traceIds
}
}
]
}
},
aggs: {
service_map: {
scripted_metric: {
init_script: {
lang: 'painless',
source: `state.eventsById = new HashMap();
String[] fieldsToCopy = new String[] {
'parent.id',
'service.name',
'service.environment',
'destination.address',
'trace.id',
'processor.event',
'span.type',
'span.subtype',
'agent.name'
};
state.fieldsToCopy = fieldsToCopy;`
},
map_script: {
lang: 'painless',
source: `def id;
if (!doc['span.id'].empty) {
id = doc['span.id'].value;
} else {
id = doc['transaction.id'].value;
}
def copy = new HashMap();
copy.id = id;
for(key in state.fieldsToCopy) {
if (!doc[key].empty) {
copy[key] = doc[key].value;
}
}
state.eventsById[id] = copy`
},
combine_script: {
lang: 'painless',
source: `return state.eventsById;`
},
reduce_script: {
lang: 'painless',
source: `
def getDestination ( def event ) {
def destination = new HashMap();
destination['destination.address'] = event['destination.address'];
destination['span.type'] = event['span.type'];
destination['span.subtype'] = event['span.subtype'];
return destination;
}
def processAndReturnEvent(def context, def eventId) {
if (context.processedEvents[eventId] != null) {
return context.processedEvents[eventId];
}
def event = context.eventsById[eventId];
if (event == null) {
return null;
}
def service = new HashMap();
service['service.name'] = event['service.name'];
service['service.environment'] = event['service.environment'];
service['agent.name'] = event['agent.name'];
def basePath = new ArrayList();
def parentId = event['parent.id'];
def parent;
if (parentId != null && parentId != event['id']) {
parent = processAndReturnEvent(context, parentId);
if (parent != null) {
/* copy the path from the parent */
basePath.addAll(parent.path);
/* flag parent path for removal, as it has children */
context.locationsToRemove.add(parent.path);
/* if the parent has 'destination.address' set, and the service is different,
we've discovered a service */
if (parent['destination.address'] != null
&& parent['destination.address'] != ""
&& (parent['span.type'] == 'external'
|| parent['span.type'] == 'messaging')
&& (parent['service.name'] != event['service.name']
|| parent['service.environment'] != event['service.environment']
)
) {
def parentDestination = getDestination(parent);
context.externalToServiceMap.put(parentDestination, service);
}
}
}
def lastLocation = basePath.size() > 0 ? basePath[basePath.size() - 1] : null;
def currentLocation = service;
/* only add the current location to the path if it's different from the last one*/
if (lastLocation == null || !lastLocation.equals(currentLocation)) {
basePath.add(currentLocation);
}
/* if there is an outgoing span, create a new path */
if (event['span.type'] == 'external' || event['span.type'] == 'messaging') {
def outgoingLocation = getDestination(event);
def outgoingPath = new ArrayList(basePath);
outgoingPath.add(outgoingLocation);
context.paths.add(outgoingPath);
}
event.path = basePath;
context.processedEvents[eventId] = event;
return event;
}
def context = new HashMap();
context.processedEvents = new HashMap();
context.eventsById = new HashMap();
context.paths = new HashSet();
context.externalToServiceMap = new HashMap();
context.locationsToRemove = new HashSet();
for (state in states) {
context.eventsById.putAll(state);
}
for (entry in context.eventsById.entrySet()) {
processAndReturnEvent(context, entry.getKey());
}
def paths = new HashSet();
for(foundPath in context.paths) {
if (!context.locationsToRemove.contains(foundPath)) {
paths.add(foundPath);
}
}
def response = new HashMap();
response.paths = paths;
def discoveredServices = new HashSet();
for(entry in context.externalToServiceMap.entrySet()) {
def map = new HashMap();
map.from = entry.getKey();
map.to = entry.getValue();
discoveredServices.add(map);
}
response.discoveredServices = discoveredServices;
return response;`
}
}
}
}
}
};
const serviceMapResponse = await client.search(serviceMapParams);
const scriptResponse = serviceMapResponse.aggregations?.service_map.value as {
paths: ConnectionNode[][];
discoveredServices: Array<{
from: ExternalConnectionNode;
to: ServiceConnectionNode;
}>;
};
let paths = scriptResponse.paths;
if (serviceName || environment) {
paths = paths.filter(path => {
return path.some(node => {
let matches = true;
if (serviceName) {
matches =
matches &&
'service.name' in node &&
node['service.name'] === serviceName;
}
if (environment) {
matches =
matches &&
'service.environment' in node &&
node['service.environment'] === environment;
}
return matches;
});
});
}
const connections = uniq(
paths.flatMap(path => {
return path.reduce((conns, location, index) => {
const prev = path[index - 1];
if (prev) {
return conns.concat({
source: prev,
destination: location
});
}
return conns;
}, [] as Connection[]);
}, [] as Connection[]),
(value, index, array) => {
return find(array, value);
}
);
return {
connections,
discoveredServices: scriptResponse.discoveredServices
};
}

View file

@ -0,0 +1,177 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { uniq, take, sortBy } from 'lodash';
import {
Setup,
SetupUIFilters,
SetupTimeRange
} from '../helpers/setup_request';
import { rangeFilter } from '../helpers/range_filter';
import { ESFilter } from '../../../typings/elasticsearch';
import {
PROCESSOR_EVENT,
SERVICE_NAME,
SERVICE_ENVIRONMENT,
SPAN_TYPE,
SPAN_SUBTYPE,
DESTINATION_ADDRESS,
TRACE_ID
} from '../../../common/elasticsearch_fieldnames';
const MAX_TRACES_TO_INSPECT = 1000;
export async function getTraceSampleIds({
after,
serviceName,
environment,
setup
}: {
after?: string;
serviceName?: string;
environment?: string;
setup: Setup & SetupTimeRange & SetupUIFilters;
}) {
const isTop = !after;
const { start, end, client, indices, config } = setup;
const rangeEnd = end;
const rangeStart = isTop
? rangeEnd - config['xpack.apm.serviceMapInitialTimeRange']
: start;
const rangeQuery = { range: rangeFilter(rangeStart, rangeEnd) };
const query = {
bool: {
filter: [
{
term: {
[PROCESSOR_EVENT]: 'span'
}
},
{
exists: {
field: DESTINATION_ADDRESS
}
},
rangeQuery
] as ESFilter[]
}
} as { bool: { filter: ESFilter[]; must_not?: ESFilter[] | ESFilter } };
if (serviceName) {
query.bool.filter.push({ term: { [SERVICE_NAME]: serviceName } });
}
if (environment) {
query.bool.filter.push({ term: { [SERVICE_ENVIRONMENT]: environment } });
}
const afterObj =
after && after !== 'top'
? { after: JSON.parse(Buffer.from(after, 'base64').toString()) }
: {};
const params = {
index: [indices['apm_oss.spanIndices']],
body: {
size: 0,
query,
aggs: {
connections: {
composite: {
size: 1000,
...afterObj,
sources: [
{ [SERVICE_NAME]: { terms: { field: SERVICE_NAME } } },
{
[SERVICE_ENVIRONMENT]: {
terms: { field: SERVICE_ENVIRONMENT, missing_bucket: true }
}
},
{
[SPAN_TYPE]: {
terms: { field: SPAN_TYPE, missing_bucket: true }
}
},
{
[SPAN_SUBTYPE]: {
terms: { field: SPAN_SUBTYPE, missing_bucket: true }
}
},
{
[DESTINATION_ADDRESS]: {
terms: { field: DESTINATION_ADDRESS }
}
}
]
},
aggs: {
sample: {
sampler: {
shard_size: 30
},
aggs: {
trace_ids: {
terms: {
field: TRACE_ID,
execution_hint: 'map' as const,
// remove bias towards large traces by sorting on trace.id
// which will be random-esque
order: {
_key: 'desc' as const
}
}
}
}
}
}
}
}
}
};
const tracesSampleResponse = await client.search<
{ trace: { id: string } },
typeof params
>(params);
let nextAfter: string | undefined;
const receivedAfterKey =
tracesSampleResponse.aggregations?.connections.after_key;
if (!after) {
nextAfter = 'top';
} else if (receivedAfterKey) {
nextAfter = Buffer.from(JSON.stringify(receivedAfterKey)).toString(
'base64'
);
}
// make sure at least one trace per composite/connection bucket
// is queried
const traceIdsWithPriority =
tracesSampleResponse.aggregations?.connections.buckets.flatMap(bucket =>
bucket.sample.trace_ids.buckets.map((sampleDocBucket, index) => ({
traceId: sampleDocBucket.key as string,
priority: index
}))
) || [];
const traceIds = take(
uniq(
sortBy(traceIdsWithPriority, 'priority').map(({ traceId }) => traceId)
),
MAX_TRACES_TO_INSPECT
);
return {
after: nextAfter,
traceIds
};
}

View file

@ -58,7 +58,7 @@ import {
uiFiltersEnvironmentsRoute
} from './ui_filters';
import { createApi } from './create_api';
import { serviceMapRoute } from './services';
import { serviceMapRoute } from './service_map';
const createApmApi = () => {
const api = createApi()
@ -118,10 +118,12 @@ const createApmApi = () => {
.add(transactionsLocalFiltersRoute)
.add(serviceNodesLocalFiltersRoute)
.add(uiFiltersEnvironmentsRoute)
.add(serviceMapRoute)
// Transaction
.add(transactionByTraceIdRoute);
.add(transactionByTraceIdRoute)
// Service map
.add(serviceMapRoute);
return api;
};

View file

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import * as t from 'io-ts';
import Boom from 'boom';
import { setupRequest } from '../lib/helpers/setup_request';
import { createRoute } from './create_route';
import { uiFiltersRt, rangeRt } from './default_api_types';
import { getServiceMap } from '../lib/service_map/get_service_map';
export const serviceMapRoute = createRoute(() => ({
path: '/api/apm/service-map',
params: {
query: t.intersection([
t.partial({ environment: t.string, serviceName: t.string }),
uiFiltersRt,
rangeRt,
t.partial({ after: t.string })
])
},
handler: async ({ context, request }) => {
if (!context.config['xpack.apm.serviceMapEnabled']) {
throw Boom.notFound();
}
const setup = await setupRequest(context, request);
const {
query: { serviceName, environment, after }
} = context.params;
return getServiceMap({ setup, serviceName, environment, after });
}
}));

View file

@ -5,7 +5,6 @@
*/
import * as t from 'io-ts';
import Boom from 'boom';
import { AgentName } from '../../typings/es_schemas/ui/fields/Agent';
import {
createApmTelementry,
@ -18,7 +17,6 @@ import { getServiceTransactionTypes } from '../lib/services/get_service_transact
import { getServiceNodeMetadata } from '../lib/services/get_service_node_metadata';
import { createRoute } from './create_route';
import { uiFiltersRt, rangeRt } from './default_api_types';
import { getServiceMap } from '../lib/services/map';
import { getServiceAnnotations } from '../lib/services/annotations';
export const servicesRoute = createRoute(() => ({
@ -87,19 +85,6 @@ export const serviceNodeMetadataRoute = createRoute(() => ({
}
}));
export const serviceMapRoute = createRoute(() => ({
path: '/api/apm/service-map',
params: {
query: rangeRt
},
handler: async ({ context }) => {
if (context.config['xpack.apm.serviceMapEnabled']) {
return getServiceMap();
}
return new Boom('Not found', { statusCode: 404 });
}
}));
export const serviceAnnotationsRoute = createRoute(() => ({
path: '/api/apm/services/{serviceName}/annotations',
params: {

View file

@ -36,6 +36,19 @@ interface MetricsAggregationResponsePart {
value: number | null;
}
type GetCompositeKeys<
TAggregationOptionsMap extends AggregationOptionsMap
> = TAggregationOptionsMap extends {
composite: { sources: Array<infer Source> };
}
? keyof Source
: never;
type CompositeOptionsSource = Record<
string,
{ terms: { field: string; missing_bucket?: boolean } } | undefined
>;
export interface AggregationOptionsByType {
terms: {
field: string;
@ -97,6 +110,22 @@ export interface AggregationOptionsByType {
buckets_path: BucketsPath;
script?: Script;
};
composite: {
size?: number;
sources: CompositeOptionsSource[];
after?: Record<string, string | number | null>;
};
diversified_sampler: {
shard_size?: number;
max_docs_per_value?: number;
} & ({ script: Script } | { field: string }); // TODO use MetricsAggregationOptions if possible
scripted_metric: {
params?: Record<string, any>;
init_script?: Script;
map_script: Script;
combine_script: Script;
reduce_script: Script;
};
}
type AggregationType = keyof AggregationOptionsByType;
@ -229,6 +258,24 @@ interface AggregationResponsePart<
value: number | null;
}
| undefined;
composite: {
after_key: Record<GetCompositeKeys<TAggregationOptionsMap>, number>;
buckets: Array<
{
key: Record<GetCompositeKeys<TAggregationOptionsMap>, number>;
doc_count: number;
} & BucketSubAggregationResponse<
TAggregationOptionsMap['aggs'],
TDocument
>
>;
};
diversified_sampler: {
doc_count: number;
} & AggregationResponseMap<TAggregationOptionsMap['aggs'], TDocument>;
scripted_metric: {
value: unknown;
};
}
// Type for debugging purposes. If you see an error in AggregationResponseMap

View file

@ -56,6 +56,7 @@ export interface ESFilter {
| string
| string[]
| number
| boolean
| Record<string, unknown>
| ESFilter[];
};

View file

@ -16,6 +16,7 @@ export const config = {
},
schema: schema.object({
serviceMapEnabled: schema.boolean({ defaultValue: false }),
serviceMapInitialTimeRange: schema.number({ defaultValue: 60 * 1000 * 60 }), // last 1 hour
autocreateApmIndexPattern: schema.boolean({ defaultValue: true }),
ui: schema.object({
enabled: schema.boolean({ defaultValue: true }),
@ -37,6 +38,7 @@ export function mergeConfigs(apmOssConfig: APMOSSConfig, apmConfig: APMXPackConf
'apm_oss.onboardingIndices': apmOssConfig.onboardingIndices,
'apm_oss.indexPattern': apmOssConfig.indexPattern,
'xpack.apm.serviceMapEnabled': apmConfig.serviceMapEnabled,
'xpack.apm.serviceMapInitialTimeRange': apmConfig.serviceMapInitialTimeRange,
'xpack.apm.ui.enabled': apmConfig.ui.enabled,
'xpack.apm.ui.maxTraceItems': apmConfig.ui.maxTraceItems,
'xpack.apm.ui.transactionGroupBucketSize': apmConfig.ui.transactionGroupBucketSize,