// Command main is a host-side monitoring agent. It periodically sends a
// heartbeat to a monitoring API, claims pending check jobs, executes them
// locally (HTTP, TCP, Redis, RabbitMQ, Elasticsearch, or Java process checks),
// and posts the result back to the API.
package main

import (
	"bufio"
	"bytes"
	"context"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"path/filepath"
	"regexp"
	"runtime"
	"strconv"
	"strings"
	"time"
)

const (
	// commandTimeout bounds every external command started by runCommand so a
	// hung tool cannot stall the agent loop indefinitely.
	commandTimeout = 30 * time.Second

	// maxLogTailBytes is the most that buildLogExcerpt reads from the end of a
	// log file; older content is never loaded into memory.
	maxLogTailBytes int64 = 4 << 20

	// maxPsLineBytes is the longest single line of process-listing output that
	// findJavaProcess accepts.
	maxPsLineBytes = 1 << 20

	// maxRedisBulkBytes caps the bulk-string length accepted from a Redis
	// server so a bogus length prefix cannot trigger a huge allocation.
	maxRedisBulkBytes = 16 << 20
)

// AgentConfig holds the settings the agent reads from the environment.
type AgentConfig struct {
	APIBase  string        // base URL of the monitoring API, without a trailing slash
	AgentKey string        // value sent as "agentKey" in request payloads
	Token    string        // value sent in the x-monitoring-agent-token header
	PollMS   time.Duration // default sleep between polling iterations
}

// HeartbeatResponse is the JSON body returned by the heartbeat endpoint.
type HeartbeatResponse struct {
	OK      bool `json:"ok"`
	Control struct {
		Enabled              bool `json:"enabled"`
		MonitoringEnabled    bool `json:"monitoringEnabled"`
		LogIngestEnabled     bool `json:"logIngestEnabled"`
		ProactivePushEnabled bool `json:"proactivePushEnabled"`
		HeartbeatIntervalMs  int  `json:"heartbeatIntervalMs"`
	} `json:"control"`
}

// ClaimResponse is the JSON body returned by the job-claim endpoint. Job is nil
// when there is nothing to run.
type ClaimResponse struct {
	Job *Job `json:"job"`
}

// Job describes one check to execute.
type Job struct {
	RunID      string         `json:"runId"`
	TraceID    string         `json:"traceId"`
	TargetID   string         `json:"targetId"`
	TargetName string         `json:"targetName"`
	ServiceKey string         `json:"serviceKey"`
	Type       string         `json:"type"`
	Endpoint   string         `json:"endpoint"`
	Config     map[string]any `json:"config"`
}

// ResultPayload is the JSON body posted back for a finished job.
type ResultPayload struct {
	Status        string         `json:"status"`
	ErrorTag      *string        `json:"errorTag"`
	Summary       string         `json:"summary"`
	FailureReason *string        `json:"failureReason"`
	Metrics       map[string]any `json:"metrics"`
	Details       map[string]any `json:"details"`
	LogExcerpt    *string        `json:"logExcerpt"`
}

// main loads the configuration and runs the polling loop, exiting with status 1
// if either step reports an error.
func main() {
	cfg, err := loadConfig()
	if err != nil {
		fmt.Fprintln(os.Stderr, "load config failed:", err)
		os.Exit(1)
	}
	if err := loop(cfg); err != nil {
		fmt.Fprintln(os.Stderr, "agent loop failed:", err)
		os.Exit(1)
	}
}

// loadConfig builds the agent configuration from environment variables, after
// first merging in values from agent-config.env in the working directory.
// It returns an error when the agent key or token is missing. An invalid or
// non-positive poll interval falls back to 10000 ms.
func loadConfig() (*AgentConfig, error) {
	loadEnvFile("agent-config.env")

	apiBase := strings.TrimRight(envOr("MONITORING_API_BASE_URL", "http://127.0.0.1:3001"), "/")
	agentKey := strings.TrimSpace(envOr("MONITORING_AGENT_KEY", ""))
	token := strings.TrimSpace(envOr("MONITORING_AGENT_TOKEN", ""))
	if agentKey == "" || token == "" {
		return nil, errors.New("MONITORING_AGENT_KEY and MONITORING_AGENT_TOKEN are required")
	}

	pollValue := envOr("MONITORING_AGENT_POLL_MS", "10000")
	pollMS, err := strconv.Atoi(strings.TrimSpace(pollValue))
	if err != nil || pollMS <= 0 {
		pollMS = 10000
	}

	return &AgentConfig{
		APIBase:  apiBase,
		AgentKey: agentKey,
		Token:    token,
		PollMS:   time.Duration(pollMS) * time.Millisecond,
	}, nil
}

// loop runs the heartbeat / claim / execute / submit cycle forever. It sleeps
// between iterations whenever there is nothing to do or a request fails; after
// a job has been executed it immediately starts the next iteration. A positive
// heartbeatIntervalMs from the server replaces the sleep interval.
func loop(cfg *AgentConfig) error {
	client := &http.Client{Timeout: 15 * time.Second}
	sleepFor := cfg.PollMS

	for {
		hb, err := heartbeat(client, cfg)
		if err != nil {
			fmt.Fprintln(os.Stderr, "heartbeat failed:", err)
			time.Sleep(sleepFor)
			continue
		}
		if hb.Control.HeartbeatIntervalMs > 0 {
			sleepFor = time.Duration(hb.Control.HeartbeatIntervalMs) * time.Millisecond
		}
		if !hb.Control.Enabled || !hb.Control.MonitoringEnabled {
			time.Sleep(sleepFor)
			continue
		}

		claim, err := claimJob(client, cfg)
		if err != nil {
			fmt.Fprintln(os.Stderr, "claim job failed:", err)
			time.Sleep(sleepFor)
			continue
		}
		if claim.Job == nil {
			time.Sleep(sleepFor)
			continue
		}

		result := executeJob(claim.Job)
		if err := submitResult(client, cfg, claim.Job.RunID, result); err != nil {
			fmt.Fprintln(os.Stderr, "submit result failed:", err)
		}
	}
}

// heartbeat reports the agent's host, platform, and version to the API and
// returns the decoded control settings.
func heartbeat(client *http.Client, cfg *AgentConfig) (*HeartbeatResponse, error) {
	payload := map[string]any{
		"agentKey": cfg.AgentKey,
		"host":     hostname(),
		"platform": runtime.GOOS,
		"version":  "host-agent-go-0.1",
		"meta":     map[string]any{"arch": runtime.GOARCH},
	}
	var response HeartbeatResponse
	if err := doJSON(client, cfg, http.MethodPost, "/api/monitoring/agent/heartbeat", payload, &response); err != nil {
		return nil, err
	}
	return &response, nil
}

// claimJob asks the API for the next job for this agent.
func claimJob(client *http.Client, cfg *AgentConfig) (*ClaimResponse, error) {
	payload := map[string]any{"agentKey": cfg.AgentKey}
	var response ClaimResponse
	if err := doJSON(client, cfg, http.MethodPost, "/api/monitoring/agent/jobs/claim", payload, &response); err != nil {
		return nil, err
	}
	return &response, nil
}

// submitResult posts the result of run runID to the API. The run ID is
// path-escaped so it always occupies exactly one URL path segment.
func submitResult(client *http.Client, cfg *AgentConfig, runID string, result ResultPayload) error {
	path := "/api/monitoring/agent/jobs/" + url.PathEscape(runID) + "/result"
	return doJSON(client, cfg, http.MethodPost, path, result, nil)
}

// doJSON sends payload as JSON to cfg.APIBase+path with the agent token header
// and, when out is non-nil and the response body is non-empty, decodes the
// response into out. Any status >= 400 is returned as an "HTTP_<code>: <body>"
// error.
func doJSON(client *http.Client, cfg *AgentConfig, method, path string, payload any, out any) error {
	var body io.Reader
	if payload != nil {
		raw, err := json.Marshal(payload)
		if err != nil {
			return err
		}
		body = bytes.NewReader(raw)
	}

	req, err := http.NewRequestWithContext(context.Background(), method, cfg.APIBase+path, body)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("x-monitoring-agent-token", cfg.Token)

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	if resp.StatusCode >= 400 {
		return fmt.Errorf("HTTP_%d: %s", resp.StatusCode, strings.TrimSpace(string(data)))
	}
	if out != nil && len(data) > 0 {
		if err := json.Unmarshal(data, out); err != nil {
			return err
		}
	}
	return nil
}

// executeJob dispatches a job to the executor for its type. Any type that is
// not listed explicitly is treated as an HTTP check.
func executeJob(job *Job) ResultPayload {
	switch job.Type {
	case "tcp":
		return executeTCPJob(job)
	case "redis":
		return executeRedisJob(job)
	case "rabbitmq":
		return executeRabbitMQJob(job)
	case "elasticsearch":
		return executeESJob(job)
	case "java-app":
		return executeJavaJob(job)
	default:
		return executeHTTPJob(job)
	}
}

// executeHTTPJob issues a GET to the job endpoint (with optional basic auth)
// and succeeds for any status in [200, 400). Up to 64 KiB of the response body
// is attached as a preview.
func executeHTTPJob(job *Job) ResultPayload {
	client := &http.Client{Timeout: timeoutFor(job.Config, 8*time.Second)}
	req, err := http.NewRequest(http.MethodGet, job.Endpoint, nil)
	if err != nil {
		return failed("HTTP_CHECK_FAILED", "HTTP 检测失败", err.Error(), nil, map[string]any{"endpoint": job.Endpoint})
	}
	applyBasicAuth(req, job.Config)

	resp, err := client.Do(req)
	if err != nil {
		return failed("HTTP_CHECK_FAILED", "HTTP 检测失败", err.Error(), nil, map[string]any{"endpoint": job.Endpoint})
	}
	defer resp.Body.Close()

	// The body is only a best-effort preview; a read error still leaves the
	// status code, which is what decides the outcome.
	body, _ := io.ReadAll(io.LimitReader(resp.Body, 64*1024))
	if resp.StatusCode >= 200 && resp.StatusCode < 400 {
		return success("HTTP 目标检测通过",
			map[string]any{"statusCode": resp.StatusCode, "responseBytes": len(body)},
			map[string]any{"endpoint": job.Endpoint, "bodyPreview": string(body)})
	}
	return failed("HTTP_CHECK_FAILED", "HTTP 目标检测失败", fmt.Sprintf("HTTP_%d", resp.StatusCode),
		map[string]any{"statusCode": resp.StatusCode},
		map[string]any{"endpoint": job.Endpoint, "bodyPreview": string(body)})
}

// executeTCPJob checks that a TCP connection to tcpHost:tcpPort can be opened
// within the job timeout. The host falls back to the job endpoint.
func executeTCPJob(job *Job) ResultPayload {
	host := stringValue(job.Config, "tcpHost", job.Endpoint)
	port := intValue(job.Config, "tcpPort", 0)
	if host == "" || port == 0 {
		return failed("TCP_CONFIG_INVALID", "TCP 检测失败", "tcpHost / tcpPort 未配置", nil, nil)
	}

	timeout := timeoutFor(job.Config, 5*time.Second)
	conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, strconv.Itoa(port)), timeout)
	if err != nil {
		return failed("TCP_CONNECT_FAILED", "TCP 端口不可达", err.Error(), map[string]any{"host": host, "port": port}, nil)
	}
	_ = conn.Close() // only reachability matters; a close error changes nothing
	return success("TCP 端口连通", map[string]any{"host": host, "port": port}, nil)
}

// executeRedisJob connects to Redis, optionally authenticates with
// authPassword, then issues PING and INFO. An error reply from the server
// (for example a rejected AUTH) fails the step that received it.
func executeRedisJob(job *Job) ResultPayload {
	host := stringValue(job.Config, "tcpHost", job.Endpoint)
	port := intValue(job.Config, "tcpPort", 6379)
	timeout := timeoutFor(job.Config, 5*time.Second)

	conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, strconv.Itoa(port)), timeout)
	if err != nil {
		return failed("REDIS_CONNECT_FAILED", "Redis 检测失败", err.Error(), map[string]any{"host": host, "port": port}, nil)
	}
	defer conn.Close()
	// A failure to set the deadline only removes the overall time bound.
	_ = conn.SetDeadline(time.Now().Add(timeout))

	// One buffered reader for the whole conversation, so bytes buffered while
	// reading one reply are still available for the next.
	reader := bufio.NewReader(conn)

	if password := stringValue(job.Config, "authPassword", ""); password != "" {
		if _, err := redisRoundTrip(conn, reader, "AUTH", password); err != nil {
			return failed("REDIS_AUTH_FAILED", "Redis 检测失败", err.Error(), nil, nil)
		}
	}

	pingReply, err := redisRoundTrip(conn, reader, "PING")
	if err != nil || !strings.Contains(strings.ToUpper(pingReply), "PONG") {
		return failed("REDIS_PING_FAILED", "Redis 检测失败", safeErr(err, "PING 未返回 PONG"), nil, nil)
	}

	infoText, err := redisRoundTrip(conn, reader, "INFO")
	if err != nil {
		return failed("REDIS_INFO_FAILED", "Redis 检测失败", err.Error(), nil, nil)
	}

	info := parseRedisInfo(infoText)
	return success("Redis 检测通过", map[string]any{
		"host":             host,
		"port":             port,
		"role":             info["role"],
		"usedMemory":       info["used_memory"],
		"connectedClients": info["connected_clients"],
	}, map[string]any{"infoPreview": trimMap(info, 30)})
}

// redisRoundTrip writes one command to w and reads one reply from reader.
func redisRoundTrip(w io.Writer, reader *bufio.Reader, parts ...string) (string, error) {
	if _, err := io.WriteString(w, respCommand(parts...)); err != nil {
		return "", err
	}
	return readRedisReplyFrom(reader)
}

// executeRabbitMQJob fetches the overview, queue, and node listings from the
// RabbitMQ management API under the job endpoint. Any request failure fails
// the job.
func executeRabbitMQJob(job *Job) ResultPayload {
	client := &http.Client{Timeout: timeoutFor(job.Config, 8*time.Second)}

	overview, err := fetchJSON(client, job.Endpoint, stringValue(job.Config, "rabbitmqOverviewPath", "/api/overview"), job.Config)
	if err != nil {
		return failed("RABBITMQ_REQUEST_FAILED", "RabbitMQ 检测失败", err.Error(), nil, nil)
	}
	queues, err := fetchJSON(client, job.Endpoint, stringValue(job.Config, "rabbitmqQueuesPath", "/api/queues"), job.Config)
	if err != nil {
		return failed("RABBITMQ_REQUEST_FAILED", "RabbitMQ 检测失败", err.Error(), nil, nil)
	}
	nodes, err := fetchJSON(client, job.Endpoint, stringValue(job.Config, "rabbitmqNodesPath", "/api/nodes"), job.Config)
	if err != nil {
		return failed("RABBITMQ_REQUEST_FAILED", "RabbitMQ 检测失败", err.Error(), nil, nil)
	}

	return success("RabbitMQ 管理接口检测通过", map[string]any{
		"queueCount": arrayLen(queues),
		"nodeCount":  arrayLen(nodes),
	}, map[string]any{
		"overview":     overview,
		"queuePreview": firstItems(queues, 10),
		"nodePreview":  firstItems(nodes, 10),
	})
}

// executeESJob fetches cluster health and node stats from Elasticsearch and
// succeeds when the reported cluster status is "green" or "yellow".
func executeESJob(job *Job) ResultPayload {
	client := &http.Client{Timeout: timeoutFor(job.Config, 8*time.Second)}

	health, err := fetchJSON(client, job.Endpoint, stringValue(job.Config, "elasticsearchHealthPath", "/_cluster/health"), job.Config)
	if err != nil {
		return failed("ES_REQUEST_FAILED", "Elasticsearch 检测失败", err.Error(), nil, nil)
	}
	nodes, err := fetchJSON(client, job.Endpoint, stringValue(job.Config, "elasticsearchNodesPath", "/_nodes/stats"), job.Config)
	if err != nil {
		return failed("ES_REQUEST_FAILED", "Elasticsearch 检测失败", err.Error(), nil, nil)
	}

	// A non-object health response leaves healthMap nil; indexing a nil map is
	// safe and the status then formats as "<nil>", which is reported unhealthy.
	healthMap, _ := health.(map[string]any)
	clusterStatus := strings.ToLower(fmt.Sprintf("%v", healthMap["status"]))
	if clusterStatus == "green" || clusterStatus == "yellow" {
		return success("Elasticsearch 集群检测通过", map[string]any{
			"clusterStatus": clusterStatus,
			"nodeCount":     healthMap["number_of_nodes"],
		}, map[string]any{"health": health, "nodesPreview": nodes})
	}
	return failed("ES_CLUSTER_UNHEALTHY", "Elasticsearch 集群异常", "cluster status="+clusterStatus,
		map[string]any{"clusterStatus": clusterStatus},
		map[string]any{"health": health})
}

// executeJavaJob locates a Java process (by explicit pid, or by keyword match
// against the process listing), then collects jcmd and jstat output when those
// tools exist, plus an optional log excerpt. Once a process has been
// identified the result is always reported as "success"; tool failures are
// recorded inside the details rather than failing the job.
func executeJavaJob(job *Job) ResultPayload {
	processMode := strings.ToLower(stringValue(job.Config, "processMatchMode", "command"))
	processValue := strings.TrimSpace(stringValue(job.Config, "processMatchValue", ""))
	if processValue == "" {
		return failed("JAVA_CONFIG_INVALID", "Java 程序监测失败", "processMatchValue 未配置", nil, nil)
	}

	var pid string
	var commandLine string
	if processMode == "pid" {
		// In pid mode the configured value is used as-is; it is not verified
		// against the process table.
		pid = processValue
		commandLine = processValue
	} else {
		out, err := runCommand("ps", "-eww", "-o", "pid=,command=")
		if err != nil {
			return failed("PROCESS_DISCOVERY_FAILED", "Java 程序监测失败", err.Error(), nil, nil)
		}
		pid, commandLine = findJavaProcess(out, processValue)
		if pid == "" {
			return failed("JAVA_PROCESS_NOT_FOUND", "Java 程序监测失败", "未找到匹配的 Java 进程", map[string]any{"keyword": processValue}, nil)
		}
	}

	details := map[string]any{
		"process":  map[string]any{"pid": pid, "commandLine": commandLine},
		"agentKey": stringValue(job.Config, "agentKey", ""),
	}
	metrics := map[string]any{"pid": pid, "host": hostname()}

	if jcmdPath := firstExistingExecutable(stringValue(job.Config, "jcmdPath", ""), "/usr/bin/jcmd", "/bin/jcmd"); jcmdPath != "" {
		results := map[string]string{}
		for _, cmd := range stringSlice(job.Config, "jcmdCommands", []string{"VM.command_line", "GC.heap_info"}) {
			out, err := runCommand(jcmdPath, pid, cmd)
			results[cmd] = strings.TrimSpace(safeErr(err, out))
		}
		details["jcmd"] = results
	}

	if jstatPath := firstExistingExecutable(stringValue(job.Config, "jstatPath", ""), "/usr/bin/jstat", "/bin/jstat"); jstatPath != "" {
		results := map[string]string{}
		for _, option := range stringSlice(job.Config, "jstatOptions", []string{"-gcutil"}) {
			out, err := runCommand(jstatPath, option, pid)
			results[option] = strings.TrimSpace(safeErr(err, out))
		}
		details["jstat"] = results
		metrics["jstatCollected"] = true
	}

	var logExcerpt *string
	if logPath := stringValue(job.Config, "logFilePath", ""); logPath != "" {
		tailLines := intValue(job.Config, "logTailLines", 120)
		pattern := stringValue(job.Config, "logKeyword", "ERROR|WARN|Exception|Timeout|Refused")
		if excerpt := buildLogExcerpt(logPath, tailLines, pattern); excerpt != "" {
			logExcerpt = &excerpt
		}
	}

	return ResultPayload{
		Status:        "success",
		ErrorTag:      nil,
		Summary:       "宿主机 Agent 已完成 Java 程序监测",
		FailureReason: nil,
		Metrics:       metrics,
		Details:       details,
		LogExcerpt:    logExcerpt,
	}
}

// fetchJSON resolves path against base, issues a GET (with optional basic
// auth), and decodes the JSON response. A status >= 400 is returned as an
// "HTTP_<code>: <body>" error carrying at most 16 KiB of the body.
func fetchJSON(client *http.Client, base, path string, config map[string]any) (any, error) {
	u, err := url.Parse(base)
	if err != nil {
		return nil, err
	}
	relative, err := url.Parse(path)
	if err != nil {
		return nil, err
	}
	target := u.ResolveReference(relative)

	req, err := http.NewRequest(http.MethodGet, target.String(), nil)
	if err != nil {
		return nil, err
	}
	applyBasicAuth(req, config)

	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 400 {
		// The body is only used to enrich the error message.
		body, _ := io.ReadAll(io.LimitReader(resp.Body, 16*1024))
		return nil, fmt.Errorf("HTTP_%d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
	}

	var payload any
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// applyBasicAuth sets a Basic Authorization header on req when both
// authUsername and authPassword are present in config.
func applyBasicAuth(req *http.Request, config map[string]any) {
	username := stringValue(config, "authUsername", "")
	password := stringValue(config, "authPassword", "")
	if username == "" || password == "" {
		return
	}
	token := base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
	req.Header.Set("Authorization", "Basic "+token)
}

// buildLogExcerpt returns the last tailLines lines of the file at logPath,
// narrowed to the lines matching the regular expression pattern. When pattern
// is empty, fails to compile, or matches nothing, the unfiltered tail is
// returned. A non-positive tailLines defaults to 120. An unreadable file
// yields "". Only the trailing maxLogTailBytes of the file are examined.
func buildLogExcerpt(logPath string, tailLines int, pattern string) string {
	if tailLines <= 0 {
		tailLines = 120
	}
	content, err := readLogTail(filepath.Clean(logPath), maxLogTailBytes)
	if err != nil {
		return ""
	}

	// Drop the trailing line terminator first; otherwise Split yields a final
	// empty element that takes up one slot of the requested tail.
	lines := strings.Split(strings.TrimRight(string(content), "\r\n"), "\n")
	if len(lines) > tailLines {
		lines = lines[len(lines)-tailLines:]
	}
	window := strings.TrimSpace(strings.Join(lines, "\n"))
	if pattern == "" {
		return window
	}
	re, err := regexp.Compile(pattern)
	if err != nil {
		return window
	}

	filtered := make([]string, 0, len(lines))
	for _, line := range lines {
		if re.MatchString(line) {
			filtered = append(filtered, line)
		}
	}
	if len(filtered) == 0 {
		return window
	}
	return strings.TrimSpace(strings.Join(filtered, "\n"))
}

// readLogTail returns the contents of path, limited to roughly the last limit
// bytes. When the file is larger than limit, reading starts one byte before
// the window and everything up to and including the first newline is
// discarded, so the result begins on a line boundary. A non-positive limit
// reads the whole file.
func readLogTail(path string, limit int64) ([]byte, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	info, err := file.Stat()
	if err != nil {
		return nil, err
	}
	if limit <= 0 || info.Size() <= limit {
		return io.ReadAll(file)
	}

	if _, err := file.Seek(info.Size()-limit-1, io.SeekStart); err != nil {
		return nil, err
	}
	data, err := io.ReadAll(file)
	if err != nil {
		return nil, err
	}
	if idx := bytes.IndexByte(data, '\n'); idx >= 0 {
		data = data[idx+1:]
	}
	return data, nil
}

// firstExistingExecutable returns the first non-blank candidate path that
// exists and is not a directory, or "" when none qualifies.
func firstExistingExecutable(paths ...string) string {
	for _, candidate := range paths {
		candidate = strings.TrimSpace(candidate)
		if candidate == "" {
			continue
		}
		if info, err := os.Stat(candidate); err == nil && !info.IsDir() {
			return candidate
		}
	}
	return ""
}

// findJavaProcess scans process-listing output (one "pid command" entry per
// line) and returns the pid and command line of the first entry that contains
// both "java" and keyword, compared case-insensitively. It returns two empty
// strings when nothing matches.
func findJavaProcess(psOutput, keyword string) (string, string) {
	loweredKeyword := strings.ToLower(strings.TrimSpace(keyword))

	scanner := bufio.NewScanner(strings.NewReader(psOutput))
	// Command lines with long classpaths can exceed the scanner's default
	// 64 KiB token limit, which would end the scan early and hide every later
	// process.
	scanner.Buffer(make([]byte, 0, 64*1024), maxPsLineBytes)

	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" {
			continue
		}
		loweredLine := strings.ToLower(line)
		if !strings.Contains(loweredLine, "java") || !strings.Contains(loweredLine, loweredKeyword) {
			continue
		}
		parts := strings.Fields(line)
		if len(parts) < 2 {
			continue
		}
		return parts[0], strings.TrimSpace(strings.TrimPrefix(line, parts[0]))
	}
	return "", ""
}

// runCommand runs name with args and returns its combined stdout and stderr.
// The command is killed if it runs longer than commandTimeout, in which case
// the returned error wraps context.DeadlineExceeded.
func runCommand(name string, args ...string) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), commandTimeout)
	defer cancel()

	output, err := exec.CommandContext(ctx, name, args...).CombinedOutput()
	if err != nil && ctx.Err() != nil {
		return string(output), fmt.Errorf("%s: %w", name, ctx.Err())
	}
	return string(output), err
}

// loadEnvFile reads KEY=VALUE lines from path and sets each key whose current
// environment value is empty. Blank lines, lines starting with "#", lines
// without "=", and lines with an empty key are skipped. A missing or
// unreadable file is ignored.
func loadEnvFile(path string) {
	content, err := os.ReadFile(path)
	if err != nil {
		return
	}
	for _, line := range strings.Split(string(content), "\n") {
		trimmed := strings.TrimSpace(line)
		if trimmed == "" || strings.HasPrefix(trimmed, "#") {
			continue
		}
		key, value, found := strings.Cut(trimmed, "=")
		if !found {
			continue
		}
		key = strings.TrimSpace(key)
		if key == "" {
			continue
		}
		if os.Getenv(key) == "" {
			// Best effort: a key the OS rejects is simply left unset.
			_ = os.Setenv(key, strings.TrimSpace(value))
		}
	}
}

// envOr returns the trimmed value of environment variable key, or fallback
// when it is unset or blank.
func envOr(key, fallback string) string {
	if value := strings.TrimSpace(os.Getenv(key)); value != "" {
		return value
	}
	return fallback
}

// stringValue returns config[key] formatted with %v and trimmed, or fallback
// when the key is missing or nil.
func stringValue(config map[string]any, key, fallback string) string {
	if config == nil {
		return fallback
	}
	value, ok := config[key]
	if !ok || value == nil {
		return fallback
	}
	return strings.TrimSpace(fmt.Sprintf("%v", value))
}

// intValue returns config[key] as an int. It accepts float64 (truncated), int,
// and decimal strings; anything else yields fallback.
func intValue(config map[string]any, key string, fallback int) int {
	if config == nil {
		return fallback
	}
	value, ok := config[key]
	if !ok || value == nil {
		return fallback
	}
	switch typed := value.(type) {
	case float64:
		return int(typed)
	case int:
		return typed
	case string:
		if parsed, err := strconv.Atoi(strings.TrimSpace(typed)); err == nil {
			return parsed
		}
	}
	return fallback
}

// stringSlice returns config[key] as a list of non-blank strings. It accepts
// []any (each item formatted with %v and trimmed) and []string; a missing,
// empty, or differently typed value yields fallback.
func stringSlice(config map[string]any, key string, fallback []string) []string {
	if config == nil {
		return fallback
	}
	value, ok := config[key]
	if !ok || value == nil {
		return fallback
	}
	switch typed := value.(type) {
	case []any:
		result := make([]string, 0, len(typed))
		for _, item := range typed {
			if text := strings.TrimSpace(fmt.Sprintf("%v", item)); text != "" {
				result = append(result, text)
			}
		}
		if len(result) > 0 {
			return result
		}
	case []string:
		if len(typed) > 0 {
			return typed
		}
	}
	return fallback
}

// timeoutFor returns the job's timeoutMs setting as a duration, or fallback
// when it is absent or not positive.
func timeoutFor(config map[string]any, fallback time.Duration) time.Duration {
	if ms := intValue(config, "timeoutMs", 0); ms > 0 {
		return time.Duration(ms) * time.Millisecond
	}
	return fallback
}

// respCommand encodes parts as a RESP array of bulk strings.
func respCommand(parts ...string) string {
	var builder strings.Builder
	builder.WriteString("*")
	builder.WriteString(strconv.Itoa(len(parts)))
	builder.WriteString("\r\n")
	for _, part := range parts {
		builder.WriteString("$")
		builder.WriteString(strconv.Itoa(len(part)))
		builder.WriteString("\r\n")
		builder.WriteString(part)
		builder.WriteString("\r\n")
	}
	return builder.String()
}

// readRedisReply reads a single reply from conn through a fresh buffered
// reader. Callers that read several replies from the same connection should
// share one reader via readRedisReplyFrom so buffered bytes are not lost.
func readRedisReply(conn net.Conn) (string, error) {
	return readRedisReplyFrom(bufio.NewReader(conn))
}

// readRedisReplyFrom reads one RESP reply from reader. An error reply ("-...")
// is returned as a Go error. A bulk string ("$<n>") returns its payload; a
// negative, unparsable, or oversized length is an error. For every other
// prefix the trimmed remainder of the first line is returned.
func readRedisReplyFrom(reader *bufio.Reader) (string, error) {
	prefix, err := reader.ReadByte()
	if err != nil {
		return "", err
	}
	line, err := reader.ReadString('\n')
	if err != nil {
		return "", err
	}
	text := strings.TrimSpace(line)

	switch prefix {
	case '-':
		return "", errors.New("redis error reply: " + text)
	case '$':
		size, err := strconv.Atoi(text)
		if err != nil || size < 0 {
			return "", errors.New("invalid bulk string size")
		}
		if size > maxRedisBulkBytes {
			return "", errors.New("bulk string too large")
		}
		buf := make([]byte, size+2) // payload plus the trailing CRLF
		if _, err := io.ReadFull(reader, buf); err != nil {
			return "", err
		}
		return string(buf[:size]), nil
	default:
		return text, nil
	}
}

// parseRedisInfo parses "key:value" lines from INFO output, skipping blank
// lines, "#" section headers, and lines without ":". Values that parse as
// base-10 integers are stored as int64; all others are stored as strings.
func parseRedisInfo(info string) map[string]any {
	result := map[string]any{}
	for _, line := range strings.Split(info, "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		key, value, found := strings.Cut(line, ":")
		if !found {
			continue
		}
		if n, err := strconv.ParseInt(value, 10, 64); err == nil {
			result[key] = n
		} else {
			result[key] = value
		}
	}
	return result
}

// trimMap returns a copy of input holding at most limit entries. Which entries
// are kept depends on map iteration order and is therefore not deterministic.
func trimMap(input map[string]any, limit int) map[string]any {
	output := map[string]any{}
	count := 0
	for key, value := range input {
		output[key] = value
		count++
		if count >= limit {
			break
		}
	}
	return output
}

// firstItems returns the first limit elements when value is a []any; any
// other value is returned unchanged.
func firstItems(value any, limit int) any {
	items, ok := value.([]any)
	if !ok {
		return value
	}
	if len(items) <= limit {
		return items
	}
	return items[:limit]
}

// arrayLen returns the length of value when it is a []any, and 0 otherwise.
func arrayLen(value any) int {
	items, ok := value.([]any)
	if !ok {
		return 0
	}
	return len(items)
}

// hostname returns the OS hostname, or "unknown" when it cannot be determined.
func hostname() string {
	name, err := os.Hostname()
	if err != nil {
		return "unknown"
	}
	return name
}

// success builds a "success" result. Nil metrics or details are replaced with
// empty maps so they encode as JSON objects rather than null.
func success(summary string, metrics, details map[string]any) ResultPayload {
	if metrics == nil {
		metrics = map[string]any{}
	}
	if details == nil {
		details = map[string]any{}
	}
	return ResultPayload{
		Status:        "success",
		ErrorTag:      nil,
		Summary:       summary,
		FailureReason: nil,
		Metrics:       metrics,
		Details:       details,
		LogExcerpt:    nil,
	}
}

// failed builds a "failed" result carrying the error tag and failure reason.
// Nil metrics or details are replaced with empty maps.
func failed(tag, summary, reason string, metrics, details map[string]any) ResultPayload {
	if metrics == nil {
		metrics = map[string]any{}
	}
	if details == nil {
		details = map[string]any{}
	}
	return ResultPayload{
		Status:        "failed",
		ErrorTag:      ptr(tag),
		Summary:       summary,
		FailureReason: ptr(reason),
		Metrics:       metrics,
		Details:       details,
		LogExcerpt:    nil,
	}
}

// ptr returns a pointer to a copy of value.
func ptr[T any](value T) *T {
	return &value
}

// safeErr returns err's message when err is non-nil, and fallback otherwise.
func safeErr(err error, fallback string) string {
	if err != nil {
		return err.Error()
	}
	return fallback
}