Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ config :supavisor,
reconnect_retries: System.get_env("RECONNECT_RETRIES", "5") |> String.to_integer(),
subscribe_retries: System.get_env("SUBSCRIBE_RETRIES", "20") |> String.to_integer()

config :prom_ex, storage_adapter: PromEx.Storage.Peep
config :prom_ex, storage_adapter: Supavisor.Monitoring.PromEx.Store

# Configures the endpoint
config :supavisor, SupavisorWeb.Endpoint,
Expand Down
1 change: 1 addition & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
[
pkgs.protobuf
pkgs.cargo-outdated
pkgs.prom2json
]
++ lib.optionals pkgs.stdenv.isDarwin (with pkgs.darwin.apple_sdk; [
frameworks.System
Expand Down
2 changes: 1 addition & 1 deletion lib/supavisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ defmodule Supavisor do
pool: get_local_pool(id)
}

if Map.values(workers) |> Enum.member?(nil) do
if nil in Map.values(workers) do
Logger.error("Could not get workers for tenant #{inspect(id)}")
{:error, :worker_not_found}
else
Expand Down
15 changes: 13 additions & 2 deletions lib/supavisor/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ defmodule Supavisor.Application do
az: Application.get_env(:supavisor, :availability_zone),
region: region,
location: System.get_env("LOCATION_KEY") || region,
instance_id: System.get_env("INSTANCE_ID")
instance_id: System.get_env("INSTANCE_ID"),
short_node_id: short_node_id()
}

:ok =
Expand Down Expand Up @@ -118,7 +119,6 @@ defmodule Supavisor.Application do
if @metrics_disabled do
children
else
PromEx.set_metrics_tags()
children ++ [PromEx, Supavisor.TenantsMetrics, Supavisor.MetricsCleaner]
end

Expand All @@ -143,4 +143,15 @@ defmodule Supavisor.Application do
SupavisorWeb.Endpoint.config_change(changed, removed)
:ok
end

# Returns the part of the configured `:fly_alloc_id` (application env of
# `:supavisor`) that precedes the first "-".
#
# Yields `nil` when the key is not set, when its value is not a binary, or
# when the value contains no "-" at all.
@spec short_node_id() :: String.t() | nil
defp short_node_id do
  case Application.fetch_env(:supavisor, :fly_alloc_id) do
    {:ok, alloc_id} when is_binary(alloc_id) ->
      # `parts: 2` splits on the first dash only, so a two-element result
      # means at least one dash was present.
      case String.split(alloc_id, "-", parts: 2) do
        [prefix, _rest] -> prefix
        _no_dash -> nil
      end

    _missing_or_invalid ->
      nil
  end
end
end
2 changes: 2 additions & 0 deletions lib/supavisor/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,8 @@ defmodule Supavisor.ClientHandler do
@spec app_name(any()) :: String.t()
def app_name(name) when is_binary(name), do: name

def app_name(nil), do: "Supavisor"

def app_name(name) do
Logger.debug("ClientHandler: Invalid application name #{inspect(name)}")
"Supavisor"
Expand Down
226 changes: 160 additions & 66 deletions lib/supavisor/monitoring/prom_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,49 @@ defmodule Supavisor.Monitoring.PromEx do
use PromEx, otp_app: :supavisor
require Logger

alias Peep.Storage
alias PromEx.Plugins
alias Supavisor.PromEx.Plugins.{OsMon, Tenant}
alias Telemetry.Metrics

defmodule Store do
  @moduledoc """
  Storage module for PromEx that provides the additional functionality of
  global tags (extracted from the primary Logger metadata at the time the
  storage child spec is built).

  It also disables scraping through `PromEx.scrape/1`, as that function should
  not be used directly. Scraping is exposed via
  `Supavisor.Monitoring.PromEx.get_metrics/0` instead.
  """

  @behaviour PromEx.Storage

  @impl true
  def scrape(_name) do
    # Hack to not report errors from ETSCronFlusher: when the calling process
    # was started with PromEx.ETSCronFlusher as its initial call we answer with
    # an empty payload; every other caller gets an exception.
    case Process.get(:"$initial_call") do
      {PromEx.ETSCronFlusher, _fun, _arity} ->
        ""

      _other_caller ->
        raise "Do not use PromEx.scrape/1, instead use Supavisor.Monitoring.PromEx.fetch_cluster_metrics/0"
    end
  end

  @impl true
  def child_spec(name, metrics) do
    global_tags = :logger.get_primary_config().metadata
    tag_keys = Map.keys(global_tags)

    # Every metric definition must list the global tag keys among its own
    # tags, otherwise the values passed as `global_tags` would not be attached.
    # NOTE(review): presumed from how Peep treats `tags` — confirm against Peep.
    tagged_metrics = for metric <- metrics, do: append_tags(metric, tag_keys)

    Peep.child_spec(name: name, metrics: tagged_metrics, global_tags: global_tags)
  end

  # Appends `extra_keys` to the tag list of a metric definition.
  defp append_tags(%{tags: tags} = metric, extra_keys) do
    %{metric | tags: tags ++ extra_keys}
  end
end

@impl true
def plugins do
Expand All @@ -28,97 +69,150 @@ defmodule Supavisor.Monitoring.PromEx do
]
end

@spec set_metrics_tags() :: map()
def set_metrics_tags do
metrics_tags = :logger.get_primary_config().metadata

metrics_tags =
case short_node_id() do
nil -> metrics_tags
short_alloc_id -> Map.put(metrics_tags, :short_alloc_id, short_alloc_id)
end

Application.put_env(:supavisor, :metrics_tags, metrics_tags)
metrics_tags
end

@spec short_node_id() :: String.t() | nil
def short_node_id do
with {:ok, fly_alloc_id} when is_binary(fly_alloc_id) <-
Application.fetch_env(:supavisor, :fly_alloc_id),
[short_alloc_id, _] <- String.split(fly_alloc_id, "-", parts: 2) do
short_alloc_id
else
_ -> nil
end
end

@spec get_metrics() :: iodata()
def get_metrics do
metrics_tags =
case Application.fetch_env(:supavisor, :metrics_tags) do
:error -> set_metrics_tags()
{:ok, tags} -> tags
end

def_tags = Enum.map_join(metrics_tags, ",", fn {k, v} -> "#{k}=\"#{v}\"" end)

metrics =
PromEx.get_metrics(__MODULE__)
|> String.split("\n")
|> Enum.map(&parse_and_add_tags(&1, def_tags))

Supavisor.Monitoring.PromEx.ETSCronFlusher
|> PromEx.ETSCronFlusher.defer_ets_flush()
fetch_metrics()
|> Peep.Prometheus.export()
end

metrics
@doc """
Fetches the metrics of every node in the cluster (see
`fetch_cluster_metrics/0`) and renders them with `Peep.Prometheus.export/1`.
"""
@spec get_cluster_metrics() :: iodata()
def get_cluster_metrics do
  cluster_metrics = fetch_cluster_metrics()
  Peep.Prometheus.export(cluster_metrics)
end

@spec do_cache_tenants_metrics() :: list
def do_cache_tenants_metrics do
metrics = get_metrics() |> IO.iodata_to_binary() |> String.split("\n")

pools =
Registry.select(Supavisor.Registry.TenantClients, [{{:"$1", :_, :_}, [], [:"$1"]}])
|> Enum.uniq()

_ =
Enum.reduce(pools, metrics, fn {{_type, tenant}, _, _, _, _}, acc ->
{matched, rest} = Enum.split_with(acc, &String.contains?(&1, "tenant=\"#{tenant}\""))

if matched != [] do
Cachex.put(Supavisor.Cache, {:metrics, tenant}, Enum.join(matched, "\n"))
end
Enum.each(pools, fn {{_type, tenant}, _, _, _, _} ->
metrics = fetch_metrics_for(tenant: tenant)

rest
end)
if metrics != %{} do
Cachex.put(Supavisor.Cache, {:metrics, tenant}, metrics)
end
end)

pools
end

@doc """
Fetches the metrics of `tenant` from every node in the cluster (see
`fetch_cluster_tenant_metrics/1`) and renders them with
`Peep.Prometheus.export/1`.
"""
@spec get_cluster_tenant_metrics(String.t()) :: iodata()
def get_cluster_tenant_metrics(tenant) do
  tenant_metrics = fetch_cluster_tenant_metrics(tenant)
  Peep.Prometheus.export(tenant_metrics)
end

@doc """
Returns the cached metrics of `tenant` from this node, ready to be sent as a
response body.

The cache entry is looked up under `{:metrics, tenant}` in `Supavisor.Cache`:

  * a binary entry is returned unchanged,
  * a map entry is rendered with `Peep.Prometheus.export/1`,
  * anything else (including a missing entry) yields `""`.

The result is `iodata()`, not necessarily a flat string, because the export
branch returns whatever `Peep.Prometheus.export/1` produces.
"""
@spec get_tenant_metrics(String.t()) :: iodata()
def get_tenant_metrics(tenant) do
  case Cachex.get(Supavisor.Cache, {:metrics, tenant}) do
    {_, metrics} when is_binary(metrics) -> metrics
    {_, metrics} when is_map(metrics) -> Peep.Prometheus.export(metrics)
    _ -> ""
  end
end

@spec parse_and_add_tags(String.t(), String.t()) :: iodata()
defp parse_and_add_tags(line, def_tags) do
case Regex.run(~r/(?!\#)^(\w+)(?:{(.*?)})?\s*(.+)$/, line) do
nil ->
[line, "\n"]
@doc """
Returns all metrics currently held by this module's Peep collector, in the
raw (not yet exported) form produced by `Peep.get_all_metrics/1`.
"""
@spec fetch_metrics() :: map()
def fetch_metrics do
  collector = __metrics_collector_name__()
  Peep.get_all_metrics(collector)
end

@doc """
Returns the raw metrics map cached for `tenant` under `{:metrics, tenant}` in
`Supavisor.Cache`, or an empty map when the cache holds no map for that key.
"""
@spec fetch_tenant_metrics(String.t()) :: map()
def fetch_tenant_metrics(tenant) do
  cached = Cachex.get(Supavisor.Cache, {:metrics, tenant})

  case cached do
    {_status, %{} = metrics} -> metrics
    _miss_or_other -> %{}
  end
end

@doc """
Collects the raw metrics of the local node and of every node in
`Node.list/0`, and merges them into a single map.
"""
@spec fetch_cluster_metrics() :: map()
def fetch_cluster_metrics do
  gather_from_cluster(&fetch_node_metrics/1)
end

@doc """
Collects the raw metrics cached for `tenant` on the local node and on every
node in `Node.list/0`, and merges them into a single map.
"""
@spec fetch_cluster_tenant_metrics(String.t()) :: map()
def fetch_cluster_tenant_metrics(tenant) do
  gather_from_cluster(&fetch_node_tenant_metrics(&1, tenant))
end

# Runs `fetch_fun` once per cluster node (concurrently) and folds the per-node
# maps together with `merge_metrics/2`.
#
# The node list always contains `node()`, so the stream is never empty and
# `Enum.reduce/2` without an initial accumulator is safe here.
@spec gather_from_cluster((node() -> map())) :: map()
defp gather_from_cluster(fetch_fun) do
  [node() | Node.list()]
  # No stream-level timeout is applied to the tasks (`timeout: :infinity`).
  |> Task.async_stream(fetch_fun, timeout: :infinity)
  |> Stream.map(fn {_, metrics} -> metrics end)
  |> Enum.reduce(&merge_metrics/2)
end

# Fetches all raw metrics from `node` by calling `fetch_metrics/0` there.
@spec fetch_node_metrics(atom()) :: map()
defp fetch_node_metrics(node) do
  do_fetch(node, :fetch_metrics, [])
end

# Fetches the raw metrics cached for `tenant` on `node` by calling
# `fetch_tenant_metrics/1` there.
@spec fetch_node_tenant_metrics(atom(), String.t()) :: map()
defp fetch_node_tenant_metrics(node, tenant) do
  do_fetch(node, :fetch_tenant_metrics, [tenant])
end

@spec do_fetch(node(), atom(), list()) :: map()
defp do_fetch(node, f, a) do
case :rpc.call(node, __MODULE__, f, a, 25_000) do
map when is_map(map) ->
map

[_, key, tags, value] ->
tags =
if tags == "" do
def_tags
else
[tags, ",", def_tags]
end
{:badrpc, reason} ->
Logger.error(
"Cannot fetch metrics from the node #{inspect(node)} because #{inspect(reason)} (call #{f} with #{inspect(a)})"
)

[key, "{", tags, "}", value, "\n"]
%{}
end
end

# Merges two raw metrics maps. When the same metric definition is present on
# both sides, its values are combined according to the metric type.
defp merge_metrics(left, right) do
  Map.merge(left, right, fn metric, lhs, rhs -> do_merge(metric, lhs, rhs) end)
end

# Counter and Sum: values under the same key are added together.
defp do_merge(%kind{}, lhs, rhs) when kind in [Metrics.Counter, Metrics.Sum] do
  sum_merge(lhs, rhs)
end

# LastValue: on a key clash the entry from the second map is kept.
defp do_merge(%Metrics.LastValue{}, lhs, rhs) do
  Map.merge(lhs, rhs)
end

# Distribution: values are themselves maps, which are summed key by key.
defp do_merge(%Metrics.Distribution{}, lhs, rhs) do
  Map.merge(lhs, rhs, fn _key, lhs_inner, rhs_inner -> sum_merge(lhs_inner, rhs_inner) end)
end

# Merges two maps, adding the values of keys present in both.
defp sum_merge(lhs, rhs) do
  Map.merge(lhs, rhs, fn _key, x, y -> x + y end)
end

@doc """
Returns the raw metrics of the local collector whose tag map contains every
`{name, value}` pair given in `tags`, grouped as
`%{metric => %{tags => value}}`.

With an empty `tags` list no filtering is applied.

The rows are read directly from the collector's storage with `:ets.select/2`,
so this only works while the storage is ETS-backed.
"""
@spec fetch_metrics_for(keyword()) :: map()
def fetch_metrics_for(tags) do
  # One guard per requested tag; all guards of a match clause must hold.
  # `map_get` on a missing key makes the guard fail, so such rows are skipped.
  guards =
    for {name, value} <- tags do
      {:"=:=", {:map_get, {:const, name}, :"$1"}, {:const, value}}
    end

  # Rows are keyed differently depending on the metric type, as handled by
  # `group_metric/2`: Counter/Sum rows use `{metric, tags, _}` while
  # LastValue/Distribution rows use `{metric, tags}`. Both shapes must be
  # selected, otherwise LastValue and Distribution metrics are silently dropped.
  match_spec = [
    {{{:_, :"$1", :_}, :_}, guards, [:"$_"]},
    {{{:_, :"$1"}, :_}, guards, [:"$_"]}
  ]

  {_, store} = Peep.Persistent.storage(__metrics_collector_name__())

  store
  |> List.wrap()
  |> Enum.flat_map(fn tid -> :ets.select(tid, match_spec) end)
  |> group_metrics(%{})
end

# Copied from Peep. Probably will work only with ETS storage (that we
# currently use).
# To be removed if Peep accepts the feature request for similar functionality,
# see: https://github.com/rkallos/peep/issues/35
#
# Folds a list of raw storage rows into `acc`, producing a map shaped as
# `%{metric => %{tags => value}}`.
defp group_metrics(rows, acc) when is_list(rows) do
  Enum.reduce(rows, acc, &group_metric/2)
end

# Counter and Sum rows carry a third key element which is ignored here
# (presumably a per-writer shard id — confirm against Peep); the values of all
# rows sharing the same metric and tags are added up.
defp group_metric({{%kind{} = metric, tags, _}, value}, acc)
     when kind in [Metrics.Counter, Metrics.Sum] do
  path = [Access.key(metric, %{}), Access.key(tags, 0)]
  update_in(acc, path, fn current -> current + value end)
end

# LastValue rows: the row's value is stored as-is, replacing any earlier one.
defp group_metric({{%Metrics.LastValue{} = metric, tags}, value}, acc) do
  path = [Access.key(metric, %{}), Access.key(tags)]
  put_in(acc, path, value)
end

# Distribution rows hold an atomics-based value that is converted with
# `Storage.Atomics.values/1` before being stored.
defp group_metric({{%Metrics.Distribution{} = metric, tags}, atomics}, acc) do
  path = [Access.key(metric, %{}), Access.key(tags)]
  put_in(acc, path, Storage.Atomics.values(atomics))
end
end
39 changes: 3 additions & 36 deletions lib/supavisor_web/controllers/metrics_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,52 +10,19 @@ defmodule SupavisorWeb.MetricsController do

@spec index(Plug.Conn.t(), any()) :: Plug.Conn.t()
def index(conn, _) do
cluster_metrics = fetch_cluster_metrics()
cluster_metrics = PromEx.get_cluster_metrics()

conn
|> put_resp_content_type("text/plain")
|> send_resp(200, cluster_metrics)
end

def tenant(conn, %{"external_id" => ext_id}) do
cluster_metrics = fetch_cluster_metrics(ext_id)
code = if cluster_metrics == "", do: 404, else: 200
cluster_metrics = PromEx.get_cluster_tenant_metrics(ext_id)
code = if cluster_metrics == [], do: 404, else: 200

conn
|> put_resp_content_type("text/plain")
|> send_resp(code, [cluster_metrics, "\n"])
end

@spec fetch_cluster_metrics() :: String.t()
def fetch_cluster_metrics do
Node.list()
|> Task.async_stream(&fetch_node_metrics/1, timeout: :infinity)
|> Enum.reduce(PromEx.get_metrics(), &merge_node_metrics/2)
end

@spec fetch_node_metrics(atom()) :: {atom(), term()}
def fetch_node_metrics(node) do
{node, :rpc.call(node, PromEx, :get_metrics, [], 25_000)}
end

@spec fetch_cluster_metrics(String.t()) :: String.t()
def fetch_cluster_metrics(tenant) do
Node.list()
|> Task.async_stream(&fetch_node_metrics(&1, tenant), timeout: :infinity)
|> Enum.reduce(PromEx.get_tenant_metrics(tenant), &merge_node_metrics/2)
end

@spec fetch_node_metrics(atom(), String.t()) :: {atom(), term()}
def fetch_node_metrics(node, tenant) do
{node, :rpc.call(node, PromEx, :get_tenant_metrics, [tenant], 25_000)}
end

def merge_node_metrics({_, {node, {:badrpc, reason}}}, acc) do
Logger.error("Cannot fetch metrics from the node #{inspect(node)} because #{inspect(reason)}")
acc
end

def merge_node_metrics({_, {_node, metrics}}, acc) do
[metrics, "\n" | acc]
end
end
Loading
Loading