|
|
|
|
@ -16,12 +16,15 @@ |
|
|
|
|
package collector |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"encoding/binary" |
|
|
|
|
"fmt" |
|
|
|
|
"log/slog" |
|
|
|
|
"os" |
|
|
|
|
"strconv" |
|
|
|
|
"syscall" |
|
|
|
|
"unsafe" |
|
|
|
|
|
|
|
|
|
"github.com/alecthomas/kingpin/v2" |
|
|
|
|
"github.com/mdlayher/netlink" |
|
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
|
|
|
) |
|
|
|
|
@ -57,6 +60,11 @@ const ( |
|
|
|
|
tcpTxQueuedBytes |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
|
tcpstatSourcePorts = kingpin.Flag("collector.tcpstat.port.source", "List of tcpstat source ports").Strings() |
|
|
|
|
tcpstatDestPorts = kingpin.Flag("collector.tcpstat.port.dest", "List of tcpstat destination ports").Strings() |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
type tcpStatCollector struct { |
|
|
|
|
desc typedDesc |
|
|
|
|
logger *slog.Logger |
|
|
|
|
@ -72,7 +80,7 @@ func NewTCPStatCollector(logger *slog.Logger) (Collector, error) { |
|
|
|
|
desc: typedDesc{prometheus.NewDesc( |
|
|
|
|
prometheus.BuildFQName(namespace, "tcp", "connection_states"), |
|
|
|
|
"Number of connection states.", |
|
|
|
|
[]string{"state"}, nil, |
|
|
|
|
[]string{"state", "port", "direction"}, nil, |
|
|
|
|
), prometheus.GaugeValue}, |
|
|
|
|
logger: logger, |
|
|
|
|
}, nil |
|
|
|
|
@ -128,31 +136,97 @@ func parseInetDiagMsg(b []byte) *InetDiagMsg { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (c *tcpStatCollector) Update(ch chan<- prometheus.Metric) error { |
|
|
|
|
tcpStats, err := getTCPStats(syscall.AF_INET) |
|
|
|
|
messages, err := getMessagesFromSocket(syscall.AF_INET) |
|
|
|
|
if err != nil { |
|
|
|
|
return fmt.Errorf("couldn't get tcpstats: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// if enabled ipv6 system
|
|
|
|
|
tcpStats, err := parseTCPStats(messages) |
|
|
|
|
if err != nil { |
|
|
|
|
return fmt.Errorf("couldn't parse tcpstats: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if _, hasIPv6 := os.Stat(procFilePath("net/tcp6")); hasIPv6 == nil { |
|
|
|
|
tcp6Stats, err := getTCPStats(syscall.AF_INET6) |
|
|
|
|
messagesIPv6, err := getMessagesFromSocket(syscall.AF_INET6) |
|
|
|
|
if err != nil { |
|
|
|
|
return fmt.Errorf("couldn't get tcp6stats: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
tcp6Stats, err := parseTCPStats(messagesIPv6) |
|
|
|
|
if err != nil { |
|
|
|
|
return fmt.Errorf("couldn't parse tcp6stats: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for st, value := range tcp6Stats { |
|
|
|
|
tcpStats[st] += value |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for st, value := range tcpStats { |
|
|
|
|
ch <- c.desc.mustNewConstMetric(value, st.String()) |
|
|
|
|
messages = append(messages, messagesIPv6...) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
emitTotalTCPStats(c, ch, tcpStats) |
|
|
|
|
emitTCPStatsPerPort(c, ch, messages, *tcpstatSourcePorts, "source", true) |
|
|
|
|
emitTCPStatsPerPort(c, ch, messages, *tcpstatDestPorts, "dest", false) |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func getTCPStats(family uint8) (map[tcpConnectionState]float64, error) { |
|
|
|
|
func emitTotalTCPStats(c *tcpStatCollector, ch chan<- prometheus.Metric, stats map[tcpConnectionState]float64) { |
|
|
|
|
for st, value := range stats { |
|
|
|
|
ch <- c.desc.mustNewConstMetric(value, st.String(), "0", "total") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func emitTCPStatsPerPort( |
|
|
|
|
c *tcpStatCollector, |
|
|
|
|
ch chan<- prometheus.Metric, |
|
|
|
|
messages []netlink.Message, |
|
|
|
|
ports []string, |
|
|
|
|
direction string, |
|
|
|
|
isSource bool, |
|
|
|
|
) { |
|
|
|
|
if len(ports) == 0 { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
portSet := map[string]struct{}{} |
|
|
|
|
for _, p := range ports { |
|
|
|
|
portSet[p] = struct{}{} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
counts := map[string]map[string]float64{} |
|
|
|
|
|
|
|
|
|
for _, m := range messages { |
|
|
|
|
msg := parseInetDiagMsg(m.Data) |
|
|
|
|
|
|
|
|
|
state := tcpConnectionState(msg.State).String() |
|
|
|
|
|
|
|
|
|
var rawPort uint16 |
|
|
|
|
if isSource { |
|
|
|
|
rawPort = binary.BigEndian.Uint16(msg.ID.SourcePort[:]) |
|
|
|
|
} else { |
|
|
|
|
rawPort = binary.BigEndian.Uint16(msg.ID.DestPort[:]) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
portStr := strconv.Itoa(int(rawPort)) |
|
|
|
|
|
|
|
|
|
if _, ok := portSet[portStr]; ok { |
|
|
|
|
if _, ok := counts[state]; !ok { |
|
|
|
|
counts[state] = make(map[string]float64) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
counts[state][portStr]++ |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for state, portMap := range counts { |
|
|
|
|
for port, count := range portMap { |
|
|
|
|
ch <- c.desc.mustNewConstMetric(count, state, port, direction) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func getMessagesFromSocket(family uint8) ([]netlink.Message, error) { |
|
|
|
|
const TCPFAll = 0xFFF |
|
|
|
|
const InetDiagInfo = 2 |
|
|
|
|
const SockDiagByFamily = 20 |
|
|
|
|
@ -176,26 +250,20 @@ func getTCPStats(family uint8) (map[tcpConnectionState]float64, error) { |
|
|
|
|
}).Serialize(), |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
messages, err := conn.Execute(msg) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return parseTCPStats(messages) |
|
|
|
|
return conn.Execute(msg) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func parseTCPStats(msgs []netlink.Message) (map[tcpConnectionState]float64, error) { |
|
|
|
|
tcpStats := map[tcpConnectionState]float64{} |
|
|
|
|
stats := make(map[tcpConnectionState]float64) |
|
|
|
|
|
|
|
|
|
for _, m := range msgs { |
|
|
|
|
msg := parseInetDiagMsg(m.Data) |
|
|
|
|
|
|
|
|
|
tcpStats[tcpTxQueuedBytes] += float64(msg.WQueue) |
|
|
|
|
tcpStats[tcpRxQueuedBytes] += float64(msg.RQueue) |
|
|
|
|
tcpStats[tcpConnectionState(msg.State)]++ |
|
|
|
|
stats[tcpTxQueuedBytes] += float64(msg.WQueue) |
|
|
|
|
stats[tcpRxQueuedBytes] += float64(msg.RQueue) |
|
|
|
|
stats[tcpConnectionState(msg.State)]++ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return tcpStats, nil |
|
|
|
|
return stats, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (st tcpConnectionState) String() string { |
|
|
|
|
|