| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762 |
- package main
- import (
- "bufio"
- "bytes"
- "context"
- "encoding/base64"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "net"
- "net/http"
- "net/url"
- "os"
- "os/exec"
- "path/filepath"
- "regexp"
- "runtime"
- "strconv"
- "strings"
- "time"
- )
- type AgentConfig struct {
- APIBase string
- AgentKey string
- Token string
- PollMS time.Duration
- }
- type HeartbeatResponse struct {
- OK bool `json:"ok"`
- Control struct {
- Enabled bool `json:"enabled"`
- MonitoringEnabled bool `json:"monitoringEnabled"`
- LogIngestEnabled bool `json:"logIngestEnabled"`
- ProactivePushEnabled bool `json:"proactivePushEnabled"`
- HeartbeatIntervalMs int `json:"heartbeatIntervalMs"`
- } `json:"control"`
- }
- type ClaimResponse struct {
- Job *Job `json:"job"`
- }
- type Job struct {
- RunID string `json:"runId"`
- TraceID string `json:"traceId"`
- TargetID string `json:"targetId"`
- TargetName string `json:"targetName"`
- ServiceKey string `json:"serviceKey"`
- Type string `json:"type"`
- Endpoint string `json:"endpoint"`
- Config map[string]any `json:"config"`
- }
- type ResultPayload struct {
- Status string `json:"status"`
- ErrorTag *string `json:"errorTag"`
- Summary string `json:"summary"`
- FailureReason *string `json:"failureReason"`
- Metrics map[string]any `json:"metrics"`
- Details map[string]any `json:"details"`
- LogExcerpt *string `json:"logExcerpt"`
- }
- func main() {
- cfg, err := loadConfig()
- if err != nil {
- fmt.Fprintln(os.Stderr, "load config failed:", err)
- os.Exit(1)
- }
- if err := loop(cfg); err != nil {
- fmt.Fprintln(os.Stderr, "agent loop failed:", err)
- os.Exit(1)
- }
- }
- func loadConfig() (*AgentConfig, error) {
- loadEnvFile("agent-config.env")
- apiBase := strings.TrimRight(envOr("MONITORING_API_BASE_URL", "http://127.0.0.1:3001"), "/")
- agentKey := strings.TrimSpace(envOr("MONITORING_AGENT_KEY", ""))
- token := strings.TrimSpace(envOr("MONITORING_AGENT_TOKEN", ""))
- if agentKey == "" || token == "" {
- return nil, errors.New("MONITORING_AGENT_KEY and MONITORING_AGENT_TOKEN are required")
- }
- pollValue := envOr("MONITORING_AGENT_POLL_MS", "10000")
- pollMS, err := strconv.Atoi(strings.TrimSpace(pollValue))
- if err != nil || pollMS <= 0 {
- pollMS = 10000
- }
- return &AgentConfig{
- APIBase: apiBase,
- AgentKey: agentKey,
- Token: token,
- PollMS: time.Duration(pollMS) * time.Millisecond,
- }, nil
- }
- func loop(cfg *AgentConfig) error {
- client := &http.Client{Timeout: 15 * time.Second}
- sleepFor := cfg.PollMS
- for {
- heartbeat, err := heartbeat(client, cfg)
- if err != nil {
- fmt.Fprintln(os.Stderr, "heartbeat failed:", err)
- time.Sleep(sleepFor)
- continue
- }
- if heartbeat.Control.HeartbeatIntervalMs > 0 {
- sleepFor = time.Duration(heartbeat.Control.HeartbeatIntervalMs) * time.Millisecond
- }
- if !heartbeat.Control.Enabled || !heartbeat.Control.MonitoringEnabled {
- time.Sleep(sleepFor)
- continue
- }
- claim, err := claimJob(client, cfg)
- if err != nil {
- fmt.Fprintln(os.Stderr, "claim job failed:", err)
- time.Sleep(sleepFor)
- continue
- }
- if claim.Job == nil {
- time.Sleep(sleepFor)
- continue
- }
- result := executeJob(claim.Job)
- if err := submitResult(client, cfg, claim.Job.RunID, result); err != nil {
- fmt.Fprintln(os.Stderr, "submit result failed:", err)
- }
- }
- }
- func heartbeat(client *http.Client, cfg *AgentConfig) (*HeartbeatResponse, error) {
- payload := map[string]any{
- "agentKey": cfg.AgentKey,
- "host": hostname(),
- "platform": runtime.GOOS,
- "version": "host-agent-go-0.1",
- "meta": map[string]any{"arch": runtime.GOARCH},
- }
- var response HeartbeatResponse
- if err := doJSON(client, cfg, http.MethodPost, "/api/monitoring/agent/heartbeat", payload, &response); err != nil {
- return nil, err
- }
- return &response, nil
- }
- func claimJob(client *http.Client, cfg *AgentConfig) (*ClaimResponse, error) {
- payload := map[string]any{"agentKey": cfg.AgentKey}
- var response ClaimResponse
- if err := doJSON(client, cfg, http.MethodPost, "/api/monitoring/agent/jobs/claim", payload, &response); err != nil {
- return nil, err
- }
- return &response, nil
- }
- func submitResult(client *http.Client, cfg *AgentConfig, runID string, result ResultPayload) error {
- return doJSON(client, cfg, http.MethodPost, "/api/monitoring/agent/jobs/"+runID+"/result", result, nil)
- }
- func doJSON(client *http.Client, cfg *AgentConfig, method, path string, payload any, out any) error {
- var body io.Reader
- if payload != nil {
- raw, err := json.Marshal(payload)
- if err != nil {
- return err
- }
- body = bytes.NewReader(raw)
- }
- req, err := http.NewRequestWithContext(context.Background(), method, cfg.APIBase+path, body)
- if err != nil {
- return err
- }
- req.Header.Set("Content-Type", "application/json")
- req.Header.Set("x-monitoring-agent-token", cfg.Token)
- resp, err := client.Do(req)
- if err != nil {
- return err
- }
- defer resp.Body.Close()
- data, err := io.ReadAll(resp.Body)
- if err != nil {
- return err
- }
- if resp.StatusCode >= 400 {
- return fmt.Errorf("HTTP_%d: %s", resp.StatusCode, strings.TrimSpace(string(data)))
- }
- if out != nil && len(data) > 0 {
- if err := json.Unmarshal(data, out); err != nil {
- return err
- }
- }
- return nil
- }
- func executeJob(job *Job) ResultPayload {
- switch job.Type {
- case "tcp":
- return executeTCPJob(job)
- case "redis":
- return executeRedisJob(job)
- case "rabbitmq":
- return executeRabbitMQJob(job)
- case "elasticsearch":
- return executeESJob(job)
- case "java-app":
- return executeJavaJob(job)
- default:
- return executeHTTPJob(job)
- }
- }
- func executeHTTPJob(job *Job) ResultPayload {
- client := &http.Client{Timeout: timeoutFor(job.Config, 8*time.Second)}
- req, err := http.NewRequest(http.MethodGet, job.Endpoint, nil)
- if err != nil {
- return failed("HTTP_CHECK_FAILED", "HTTP 检测失败", err.Error(), nil, map[string]any{"endpoint": job.Endpoint})
- }
- applyBasicAuth(req, job.Config)
- resp, err := client.Do(req)
- if err != nil {
- return failed("HTTP_CHECK_FAILED", "HTTP 检测失败", err.Error(), nil, map[string]any{"endpoint": job.Endpoint})
- }
- defer resp.Body.Close()
- body, _ := io.ReadAll(io.LimitReader(resp.Body, 64*1024))
- if resp.StatusCode >= 200 && resp.StatusCode < 400 {
- return success("HTTP 目标检测通过", map[string]any{"statusCode": resp.StatusCode, "responseBytes": len(body)}, map[string]any{"endpoint": job.Endpoint, "bodyPreview": string(body)})
- }
- return failed("HTTP_CHECK_FAILED", "HTTP 目标检测失败", fmt.Sprintf("HTTP_%d", resp.StatusCode), map[string]any{"statusCode": resp.StatusCode}, map[string]any{"endpoint": job.Endpoint, "bodyPreview": string(body)})
- }
- func executeTCPJob(job *Job) ResultPayload {
- host := stringValue(job.Config, "tcpHost", job.Endpoint)
- port := intValue(job.Config, "tcpPort", 0)
- if host == "" || port == 0 {
- return failed("TCP_CONFIG_INVALID", "TCP 检测失败", "tcpHost / tcpPort 未配置", nil, nil)
- }
- timeout := timeoutFor(job.Config, 5*time.Second)
- conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, strconv.Itoa(port)), timeout)
- if err != nil {
- return failed("TCP_CONNECT_FAILED", "TCP 端口不可达", err.Error(), map[string]any{"host": host, "port": port}, nil)
- }
- _ = conn.Close()
- return success("TCP 端口连通", map[string]any{"host": host, "port": port}, nil)
- }
- func executeRedisJob(job *Job) ResultPayload {
- host := stringValue(job.Config, "tcpHost", job.Endpoint)
- port := intValue(job.Config, "tcpPort", 6379)
- timeout := timeoutFor(job.Config, 5*time.Second)
- conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, strconv.Itoa(port)), timeout)
- if err != nil {
- return failed("REDIS_CONNECT_FAILED", "Redis 检测失败", err.Error(), map[string]any{"host": host, "port": port}, nil)
- }
- defer conn.Close()
- _ = conn.SetDeadline(time.Now().Add(timeout))
- if password := stringValue(job.Config, "authPassword", ""); password != "" {
- if _, err := conn.Write([]byte(respCommand("AUTH", password))); err != nil {
- return failed("REDIS_AUTH_FAILED", "Redis 检测失败", err.Error(), nil, nil)
- }
- if _, err := readRedisReply(conn); err != nil {
- return failed("REDIS_AUTH_FAILED", "Redis 检测失败", err.Error(), nil, nil)
- }
- }
- if _, err := conn.Write([]byte(respCommand("PING"))); err != nil {
- return failed("REDIS_PING_FAILED", "Redis 检测失败", err.Error(), nil, nil)
- }
- pingReply, err := readRedisReply(conn)
- if err != nil || !strings.Contains(strings.ToUpper(pingReply), "PONG") {
- return failed("REDIS_PING_FAILED", "Redis 检测失败", safeErr(err, "PING 未返回 PONG"), nil, nil)
- }
- if _, err := conn.Write([]byte(respCommand("INFO"))); err != nil {
- return failed("REDIS_INFO_FAILED", "Redis 检测失败", err.Error(), nil, nil)
- }
- infoText, err := readRedisReply(conn)
- if err != nil {
- return failed("REDIS_INFO_FAILED", "Redis 检测失败", err.Error(), nil, nil)
- }
- info := parseRedisInfo(infoText)
- return success("Redis 检测通过", map[string]any{
- "host": host,
- "port": port,
- "role": info["role"],
- "usedMemory": info["used_memory"],
- "connectedClients": info["connected_clients"],
- }, map[string]any{"infoPreview": trimMap(info, 30)})
- }
- func executeRabbitMQJob(job *Job) ResultPayload {
- client := &http.Client{Timeout: timeoutFor(job.Config, 8*time.Second)}
- overview, err := fetchJSON(client, job.Endpoint, stringValue(job.Config, "rabbitmqOverviewPath", "/api/overview"), job.Config)
- if err != nil {
- return failed("RABBITMQ_REQUEST_FAILED", "RabbitMQ 检测失败", err.Error(), nil, nil)
- }
- queues, err := fetchJSON(client, job.Endpoint, stringValue(job.Config, "rabbitmqQueuesPath", "/api/queues"), job.Config)
- if err != nil {
- return failed("RABBITMQ_REQUEST_FAILED", "RabbitMQ 检测失败", err.Error(), nil, nil)
- }
- nodes, err := fetchJSON(client, job.Endpoint, stringValue(job.Config, "rabbitmqNodesPath", "/api/nodes"), job.Config)
- if err != nil {
- return failed("RABBITMQ_REQUEST_FAILED", "RabbitMQ 检测失败", err.Error(), nil, nil)
- }
- return success("RabbitMQ 管理接口检测通过", map[string]any{
- "queueCount": arrayLen(queues),
- "nodeCount": arrayLen(nodes),
- }, map[string]any{
- "overview": overview,
- "queuePreview": firstItems(queues, 10),
- "nodePreview": firstItems(nodes, 10),
- })
- }
- func executeESJob(job *Job) ResultPayload {
- client := &http.Client{Timeout: timeoutFor(job.Config, 8*time.Second)}
- health, err := fetchJSON(client, job.Endpoint, stringValue(job.Config, "elasticsearchHealthPath", "/_cluster/health"), job.Config)
- if err != nil {
- return failed("ES_REQUEST_FAILED", "Elasticsearch 检测失败", err.Error(), nil, nil)
- }
- nodes, err := fetchJSON(client, job.Endpoint, stringValue(job.Config, "elasticsearchNodesPath", "/_nodes/stats"), job.Config)
- if err != nil {
- return failed("ES_REQUEST_FAILED", "Elasticsearch 检测失败", err.Error(), nil, nil)
- }
- healthMap, _ := health.(map[string]any)
- clusterStatus := strings.ToLower(fmt.Sprintf("%v", healthMap["status"]))
- ok := clusterStatus == "green" || clusterStatus == "yellow"
- if ok {
- return success("Elasticsearch 集群检测通过", map[string]any{
- "clusterStatus": clusterStatus,
- "nodeCount": healthMap["number_of_nodes"],
- }, map[string]any{"health": health, "nodesPreview": nodes})
- }
- return failed("ES_CLUSTER_UNHEALTHY", "Elasticsearch 集群异常", "cluster status="+clusterStatus, map[string]any{"clusterStatus": clusterStatus}, map[string]any{"health": health})
- }
- func executeJavaJob(job *Job) ResultPayload {
- processMode := strings.ToLower(stringValue(job.Config, "processMatchMode", "command"))
- processValue := strings.TrimSpace(stringValue(job.Config, "processMatchValue", ""))
- if processValue == "" {
- return failed("JAVA_CONFIG_INVALID", "Java 程序监测失败", "processMatchValue 未配置", nil, nil)
- }
- var pid string
- var commandLine string
- if processMode == "pid" {
- pid = processValue
- commandLine = processValue
- } else {
- out, err := runCommand("ps", "-eww", "-o", "pid=,command=")
- if err != nil {
- return failed("PROCESS_DISCOVERY_FAILED", "Java 程序监测失败", err.Error(), nil, nil)
- }
- pid, commandLine = findJavaProcess(out, processValue)
- if pid == "" {
- return failed("JAVA_PROCESS_NOT_FOUND", "Java 程序监测失败", "未找到匹配的 Java 进程", map[string]any{"keyword": processValue}, nil)
- }
- }
- details := map[string]any{
- "process": map[string]any{"pid": pid, "commandLine": commandLine},
- "agentKey": stringValue(job.Config, "agentKey", ""),
- }
- metrics := map[string]any{"pid": pid, "host": hostname()}
- if jcmdPath := firstExistingExecutable(stringValue(job.Config, "jcmdPath", ""), "/usr/bin/jcmd", "/bin/jcmd"); jcmdPath != "" {
- results := map[string]string{}
- for _, cmd := range stringSlice(job.Config, "jcmdCommands", []string{"VM.command_line", "GC.heap_info"}) {
- out, err := runCommand(jcmdPath, pid, cmd)
- results[cmd] = strings.TrimSpace(safeErr(err, out))
- }
- details["jcmd"] = results
- }
- if jstatPath := firstExistingExecutable(stringValue(job.Config, "jstatPath", ""), "/usr/bin/jstat", "/bin/jstat"); jstatPath != "" {
- results := map[string]string{}
- for _, option := range stringSlice(job.Config, "jstatOptions", []string{"-gcutil"}) {
- out, err := runCommand(jstatPath, option, pid)
- results[option] = strings.TrimSpace(safeErr(err, out))
- }
- details["jstat"] = results
- metrics["jstatCollected"] = true
- }
- var logExcerpt *string
- if logPath := stringValue(job.Config, "logFilePath", ""); logPath != "" {
- tailLines := intValue(job.Config, "logTailLines", 120)
- pattern := stringValue(job.Config, "logKeyword", "ERROR|WARN|Exception|Timeout|Refused")
- excerpt := buildLogExcerpt(logPath, tailLines, pattern)
- if excerpt != "" {
- logExcerpt = &excerpt
- }
- }
- return ResultPayload{
- Status: "success",
- ErrorTag: nil,
- Summary: "宿主机 Agent 已完成 Java 程序监测",
- FailureReason: nil,
- Metrics: metrics,
- Details: details,
- LogExcerpt: logExcerpt,
- }
- }
- func fetchJSON(client *http.Client, base, path string, config map[string]any) (any, error) {
- u, err := url.Parse(base)
- if err != nil {
- return nil, err
- }
- relative, err := url.Parse(path)
- if err != nil {
- return nil, err
- }
- target := u.ResolveReference(relative)
- req, err := http.NewRequest(http.MethodGet, target.String(), nil)
- if err != nil {
- return nil, err
- }
- applyBasicAuth(req, config)
- resp, err := client.Do(req)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- if resp.StatusCode >= 400 {
- body, _ := io.ReadAll(io.LimitReader(resp.Body, 16*1024))
- return nil, fmt.Errorf("HTTP_%d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
- }
- var payload any
- if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
- return nil, err
- }
- return payload, nil
- }
- func applyBasicAuth(req *http.Request, config map[string]any) {
- username := stringValue(config, "authUsername", "")
- password := stringValue(config, "authPassword", "")
- if username == "" || password == "" {
- return
- }
- token := base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
- req.Header.Set("Authorization", "Basic "+token)
- }
- func buildLogExcerpt(logPath string, tailLines int, pattern string) string {
- if tailLines <= 0 {
- tailLines = 120
- }
- content, err := os.ReadFile(filepath.Clean(logPath))
- if err != nil {
- return ""
- }
- lines := strings.Split(string(content), "\n")
- start := 0
- if len(lines) > tailLines {
- start = len(lines) - tailLines
- }
- window := lines[start:]
- if pattern == "" {
- return strings.TrimSpace(strings.Join(window, "\n"))
- }
- re, err := regexp.Compile(pattern)
- if err != nil {
- return strings.TrimSpace(strings.Join(window, "\n"))
- }
- filtered := make([]string, 0, len(window))
- for _, line := range window {
- if re.MatchString(line) {
- filtered = append(filtered, line)
- }
- }
- if len(filtered) == 0 {
- return strings.TrimSpace(strings.Join(window, "\n"))
- }
- return strings.TrimSpace(strings.Join(filtered, "\n"))
- }
- func firstExistingExecutable(paths ...string) string {
- for _, candidate := range paths {
- candidate = strings.TrimSpace(candidate)
- if candidate == "" {
- continue
- }
- if _, err := os.Stat(candidate); err == nil {
- return candidate
- }
- }
- return ""
- }
- func findJavaProcess(psOutput, keyword string) (string, string) {
- loweredKeyword := strings.ToLower(strings.TrimSpace(keyword))
- scanner := bufio.NewScanner(strings.NewReader(psOutput))
- for scanner.Scan() {
- line := strings.TrimSpace(scanner.Text())
- loweredLine := strings.ToLower(line)
- if line == "" || !strings.Contains(loweredLine, "java") || !strings.Contains(loweredLine, loweredKeyword) {
- continue
- }
- parts := strings.Fields(line)
- if len(parts) < 2 {
- continue
- }
- return parts[0], strings.TrimSpace(strings.TrimPrefix(line, parts[0]))
- }
- return "", ""
- }
- func runCommand(name string, args ...string) (string, error) {
- cmd := exec.Command(name, args...)
- output, err := cmd.CombinedOutput()
- return string(output), err
- }
- func loadEnvFile(path string) {
- content, err := os.ReadFile(path)
- if err != nil {
- return
- }
- for _, line := range strings.Split(string(content), "\n") {
- trimmed := strings.TrimSpace(line)
- if trimmed == "" || strings.HasPrefix(trimmed, "#") {
- continue
- }
- index := strings.Index(trimmed, "=")
- if index < 0 {
- continue
- }
- key := strings.TrimSpace(trimmed[:index])
- value := strings.TrimSpace(trimmed[index+1:])
- if os.Getenv(key) == "" {
- _ = os.Setenv(key, value)
- }
- }
- }
- func envOr(key, fallback string) string {
- if value := strings.TrimSpace(os.Getenv(key)); value != "" {
- return value
- }
- return fallback
- }
- func stringValue(config map[string]any, key, fallback string) string {
- if config == nil {
- return fallback
- }
- value, ok := config[key]
- if !ok || value == nil {
- return fallback
- }
- return strings.TrimSpace(fmt.Sprintf("%v", value))
- }
- func intValue(config map[string]any, key string, fallback int) int {
- if config == nil {
- return fallback
- }
- value, ok := config[key]
- if !ok || value == nil {
- return fallback
- }
- switch typed := value.(type) {
- case float64:
- return int(typed)
- case int:
- return typed
- case string:
- parsed, err := strconv.Atoi(strings.TrimSpace(typed))
- if err == nil {
- return parsed
- }
- }
- return fallback
- }
- func stringSlice(config map[string]any, key string, fallback []string) []string {
- if config == nil {
- return fallback
- }
- value, ok := config[key]
- if !ok || value == nil {
- return fallback
- }
- switch typed := value.(type) {
- case []any:
- result := make([]string, 0, len(typed))
- for _, item := range typed {
- text := strings.TrimSpace(fmt.Sprintf("%v", item))
- if text != "" {
- result = append(result, text)
- }
- }
- if len(result) > 0 {
- return result
- }
- case []string:
- if len(typed) > 0 {
- return typed
- }
- }
- return fallback
- }
- func timeoutFor(config map[string]any, fallback time.Duration) time.Duration {
- ms := intValue(config, "timeoutMs", 0)
- if ms > 0 {
- return time.Duration(ms) * time.Millisecond
- }
- return fallback
- }
- func respCommand(parts ...string) string {
- var builder strings.Builder
- builder.WriteString("*")
- builder.WriteString(strconv.Itoa(len(parts)))
- builder.WriteString("\r\n")
- for _, part := range parts {
- builder.WriteString("$")
- builder.WriteString(strconv.Itoa(len(part)))
- builder.WriteString("\r\n")
- builder.WriteString(part)
- builder.WriteString("\r\n")
- }
- return builder.String()
- }
- func readRedisReply(conn net.Conn) (string, error) {
- reader := bufio.NewReader(conn)
- prefix, err := reader.ReadByte()
- if err != nil {
- return "", err
- }
- line, err := reader.ReadString('\n')
- if err != nil {
- return "", err
- }
- switch prefix {
- case '+', '-', ':':
- return strings.TrimSpace(line), nil
- case '$':
- size, err := strconv.Atoi(strings.TrimSpace(line))
- if err != nil || size < 0 {
- return "", errors.New("invalid bulk string size")
- }
- buf := make([]byte, size+2)
- if _, err := io.ReadFull(reader, buf); err != nil {
- return "", err
- }
- return string(buf[:size]), nil
- default:
- return strings.TrimSpace(line), nil
- }
- }
- func parseRedisInfo(info string) map[string]any {
- result := map[string]any{}
- for _, line := range strings.Split(info, "\n") {
- line = strings.TrimSpace(line)
- if line == "" || strings.HasPrefix(line, "#") || !strings.Contains(line, ":") {
- continue
- }
- parts := strings.SplitN(line, ":", 2)
- key := parts[0]
- value := parts[1]
- if n, err := strconv.ParseInt(value, 10, 64); err == nil {
- result[key] = n
- } else {
- result[key] = value
- }
- }
- return result
- }
- func trimMap(input map[string]any, limit int) map[string]any {
- output := map[string]any{}
- count := 0
- for key, value := range input {
- output[key] = value
- count++
- if count >= limit {
- break
- }
- }
- return output
- }
- func firstItems(value any, limit int) any {
- items, ok := value.([]any)
- if !ok {
- return value
- }
- if len(items) <= limit {
- return items
- }
- return items[:limit]
- }
- func arrayLen(value any) int {
- items, ok := value.([]any)
- if !ok {
- return 0
- }
- return len(items)
- }
- func hostname() string {
- name, err := os.Hostname()
- if err != nil {
- return "unknown"
- }
- return name
- }
- func success(summary string, metrics, details map[string]any) ResultPayload {
- if metrics == nil {
- metrics = map[string]any{}
- }
- if details == nil {
- details = map[string]any{}
- }
- return ResultPayload{
- Status: "success",
- ErrorTag: nil,
- Summary: summary,
- FailureReason: nil,
- Metrics: metrics,
- Details: details,
- LogExcerpt: nil,
- }
- }
- func failed(tag, summary, reason string, metrics, details map[string]any) ResultPayload {
- if metrics == nil {
- metrics = map[string]any{}
- }
- if details == nil {
- details = map[string]any{}
- }
- return ResultPayload{
- Status: "failed",
- ErrorTag: ptr(tag),
- Summary: summary,
- FailureReason: ptr(reason),
- Metrics: metrics,
- Details: details,
- LogExcerpt: nil,
- }
- }
- func ptr[T any](value T) *T {
- return &value
- }
- func safeErr(err error, fallback string) string {
- if err != nil {
- return err.Error()
- }
- return fallback
- }
|