main.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762
  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/base64"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "net"
  12. "net/http"
  13. "net/url"
  14. "os"
  15. "os/exec"
  16. "path/filepath"
  17. "regexp"
  18. "runtime"
  19. "strconv"
  20. "strings"
  21. "time"
  22. )
  23. type AgentConfig struct {
  24. APIBase string
  25. AgentKey string
  26. Token string
  27. PollMS time.Duration
  28. }
  29. type HeartbeatResponse struct {
  30. OK bool `json:"ok"`
  31. Control struct {
  32. Enabled bool `json:"enabled"`
  33. MonitoringEnabled bool `json:"monitoringEnabled"`
  34. LogIngestEnabled bool `json:"logIngestEnabled"`
  35. ProactivePushEnabled bool `json:"proactivePushEnabled"`
  36. HeartbeatIntervalMs int `json:"heartbeatIntervalMs"`
  37. } `json:"control"`
  38. }
  39. type ClaimResponse struct {
  40. Job *Job `json:"job"`
  41. }
  42. type Job struct {
  43. RunID string `json:"runId"`
  44. TraceID string `json:"traceId"`
  45. TargetID string `json:"targetId"`
  46. TargetName string `json:"targetName"`
  47. ServiceKey string `json:"serviceKey"`
  48. Type string `json:"type"`
  49. Endpoint string `json:"endpoint"`
  50. Config map[string]any `json:"config"`
  51. }
  52. type ResultPayload struct {
  53. Status string `json:"status"`
  54. ErrorTag *string `json:"errorTag"`
  55. Summary string `json:"summary"`
  56. FailureReason *string `json:"failureReason"`
  57. Metrics map[string]any `json:"metrics"`
  58. Details map[string]any `json:"details"`
  59. LogExcerpt *string `json:"logExcerpt"`
  60. }
  61. func main() {
  62. cfg, err := loadConfig()
  63. if err != nil {
  64. fmt.Fprintln(os.Stderr, "load config failed:", err)
  65. os.Exit(1)
  66. }
  67. if err := loop(cfg); err != nil {
  68. fmt.Fprintln(os.Stderr, "agent loop failed:", err)
  69. os.Exit(1)
  70. }
  71. }
  72. func loadConfig() (*AgentConfig, error) {
  73. loadEnvFile("agent-config.env")
  74. apiBase := strings.TrimRight(envOr("MONITORING_API_BASE_URL", "http://127.0.0.1:3001"), "/")
  75. agentKey := strings.TrimSpace(envOr("MONITORING_AGENT_KEY", ""))
  76. token := strings.TrimSpace(envOr("MONITORING_AGENT_TOKEN", ""))
  77. if agentKey == "" || token == "" {
  78. return nil, errors.New("MONITORING_AGENT_KEY and MONITORING_AGENT_TOKEN are required")
  79. }
  80. pollValue := envOr("MONITORING_AGENT_POLL_MS", "10000")
  81. pollMS, err := strconv.Atoi(strings.TrimSpace(pollValue))
  82. if err != nil || pollMS <= 0 {
  83. pollMS = 10000
  84. }
  85. return &AgentConfig{
  86. APIBase: apiBase,
  87. AgentKey: agentKey,
  88. Token: token,
  89. PollMS: time.Duration(pollMS) * time.Millisecond,
  90. }, nil
  91. }
  92. func loop(cfg *AgentConfig) error {
  93. client := &http.Client{Timeout: 15 * time.Second}
  94. sleepFor := cfg.PollMS
  95. for {
  96. heartbeat, err := heartbeat(client, cfg)
  97. if err != nil {
  98. fmt.Fprintln(os.Stderr, "heartbeat failed:", err)
  99. time.Sleep(sleepFor)
  100. continue
  101. }
  102. if heartbeat.Control.HeartbeatIntervalMs > 0 {
  103. sleepFor = time.Duration(heartbeat.Control.HeartbeatIntervalMs) * time.Millisecond
  104. }
  105. if !heartbeat.Control.Enabled || !heartbeat.Control.MonitoringEnabled {
  106. time.Sleep(sleepFor)
  107. continue
  108. }
  109. claim, err := claimJob(client, cfg)
  110. if err != nil {
  111. fmt.Fprintln(os.Stderr, "claim job failed:", err)
  112. time.Sleep(sleepFor)
  113. continue
  114. }
  115. if claim.Job == nil {
  116. time.Sleep(sleepFor)
  117. continue
  118. }
  119. result := executeJob(claim.Job)
  120. if err := submitResult(client, cfg, claim.Job.RunID, result); err != nil {
  121. fmt.Fprintln(os.Stderr, "submit result failed:", err)
  122. }
  123. }
  124. }
  125. func heartbeat(client *http.Client, cfg *AgentConfig) (*HeartbeatResponse, error) {
  126. payload := map[string]any{
  127. "agentKey": cfg.AgentKey,
  128. "host": hostname(),
  129. "platform": runtime.GOOS,
  130. "version": "host-agent-go-0.1",
  131. "meta": map[string]any{"arch": runtime.GOARCH},
  132. }
  133. var response HeartbeatResponse
  134. if err := doJSON(client, cfg, http.MethodPost, "/api/monitoring/agent/heartbeat", payload, &response); err != nil {
  135. return nil, err
  136. }
  137. return &response, nil
  138. }
  139. func claimJob(client *http.Client, cfg *AgentConfig) (*ClaimResponse, error) {
  140. payload := map[string]any{"agentKey": cfg.AgentKey}
  141. var response ClaimResponse
  142. if err := doJSON(client, cfg, http.MethodPost, "/api/monitoring/agent/jobs/claim", payload, &response); err != nil {
  143. return nil, err
  144. }
  145. return &response, nil
  146. }
  147. func submitResult(client *http.Client, cfg *AgentConfig, runID string, result ResultPayload) error {
  148. return doJSON(client, cfg, http.MethodPost, "/api/monitoring/agent/jobs/"+runID+"/result", result, nil)
  149. }
  150. func doJSON(client *http.Client, cfg *AgentConfig, method, path string, payload any, out any) error {
  151. var body io.Reader
  152. if payload != nil {
  153. raw, err := json.Marshal(payload)
  154. if err != nil {
  155. return err
  156. }
  157. body = bytes.NewReader(raw)
  158. }
  159. req, err := http.NewRequestWithContext(context.Background(), method, cfg.APIBase+path, body)
  160. if err != nil {
  161. return err
  162. }
  163. req.Header.Set("Content-Type", "application/json")
  164. req.Header.Set("x-monitoring-agent-token", cfg.Token)
  165. resp, err := client.Do(req)
  166. if err != nil {
  167. return err
  168. }
  169. defer resp.Body.Close()
  170. data, err := io.ReadAll(resp.Body)
  171. if err != nil {
  172. return err
  173. }
  174. if resp.StatusCode >= 400 {
  175. return fmt.Errorf("HTTP_%d: %s", resp.StatusCode, strings.TrimSpace(string(data)))
  176. }
  177. if out != nil && len(data) > 0 {
  178. if err := json.Unmarshal(data, out); err != nil {
  179. return err
  180. }
  181. }
  182. return nil
  183. }
  184. func executeJob(job *Job) ResultPayload {
  185. switch job.Type {
  186. case "tcp":
  187. return executeTCPJob(job)
  188. case "redis":
  189. return executeRedisJob(job)
  190. case "rabbitmq":
  191. return executeRabbitMQJob(job)
  192. case "elasticsearch":
  193. return executeESJob(job)
  194. case "java-app":
  195. return executeJavaJob(job)
  196. default:
  197. return executeHTTPJob(job)
  198. }
  199. }
  200. func executeHTTPJob(job *Job) ResultPayload {
  201. client := &http.Client{Timeout: timeoutFor(job.Config, 8*time.Second)}
  202. req, err := http.NewRequest(http.MethodGet, job.Endpoint, nil)
  203. if err != nil {
  204. return failed("HTTP_CHECK_FAILED", "HTTP 检测失败", err.Error(), nil, map[string]any{"endpoint": job.Endpoint})
  205. }
  206. applyBasicAuth(req, job.Config)
  207. resp, err := client.Do(req)
  208. if err != nil {
  209. return failed("HTTP_CHECK_FAILED", "HTTP 检测失败", err.Error(), nil, map[string]any{"endpoint": job.Endpoint})
  210. }
  211. defer resp.Body.Close()
  212. body, _ := io.ReadAll(io.LimitReader(resp.Body, 64*1024))
  213. if resp.StatusCode >= 200 && resp.StatusCode < 400 {
  214. return success("HTTP 目标检测通过", map[string]any{"statusCode": resp.StatusCode, "responseBytes": len(body)}, map[string]any{"endpoint": job.Endpoint, "bodyPreview": string(body)})
  215. }
  216. return failed("HTTP_CHECK_FAILED", "HTTP 目标检测失败", fmt.Sprintf("HTTP_%d", resp.StatusCode), map[string]any{"statusCode": resp.StatusCode}, map[string]any{"endpoint": job.Endpoint, "bodyPreview": string(body)})
  217. }
  218. func executeTCPJob(job *Job) ResultPayload {
  219. host := stringValue(job.Config, "tcpHost", job.Endpoint)
  220. port := intValue(job.Config, "tcpPort", 0)
  221. if host == "" || port == 0 {
  222. return failed("TCP_CONFIG_INVALID", "TCP 检测失败", "tcpHost / tcpPort 未配置", nil, nil)
  223. }
  224. timeout := timeoutFor(job.Config, 5*time.Second)
  225. conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, strconv.Itoa(port)), timeout)
  226. if err != nil {
  227. return failed("TCP_CONNECT_FAILED", "TCP 端口不可达", err.Error(), map[string]any{"host": host, "port": port}, nil)
  228. }
  229. _ = conn.Close()
  230. return success("TCP 端口连通", map[string]any{"host": host, "port": port}, nil)
  231. }
  232. func executeRedisJob(job *Job) ResultPayload {
  233. host := stringValue(job.Config, "tcpHost", job.Endpoint)
  234. port := intValue(job.Config, "tcpPort", 6379)
  235. timeout := timeoutFor(job.Config, 5*time.Second)
  236. conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, strconv.Itoa(port)), timeout)
  237. if err != nil {
  238. return failed("REDIS_CONNECT_FAILED", "Redis 检测失败", err.Error(), map[string]any{"host": host, "port": port}, nil)
  239. }
  240. defer conn.Close()
  241. _ = conn.SetDeadline(time.Now().Add(timeout))
  242. if password := stringValue(job.Config, "authPassword", ""); password != "" {
  243. if _, err := conn.Write([]byte(respCommand("AUTH", password))); err != nil {
  244. return failed("REDIS_AUTH_FAILED", "Redis 检测失败", err.Error(), nil, nil)
  245. }
  246. if _, err := readRedisReply(conn); err != nil {
  247. return failed("REDIS_AUTH_FAILED", "Redis 检测失败", err.Error(), nil, nil)
  248. }
  249. }
  250. if _, err := conn.Write([]byte(respCommand("PING"))); err != nil {
  251. return failed("REDIS_PING_FAILED", "Redis 检测失败", err.Error(), nil, nil)
  252. }
  253. pingReply, err := readRedisReply(conn)
  254. if err != nil || !strings.Contains(strings.ToUpper(pingReply), "PONG") {
  255. return failed("REDIS_PING_FAILED", "Redis 检测失败", safeErr(err, "PING 未返回 PONG"), nil, nil)
  256. }
  257. if _, err := conn.Write([]byte(respCommand("INFO"))); err != nil {
  258. return failed("REDIS_INFO_FAILED", "Redis 检测失败", err.Error(), nil, nil)
  259. }
  260. infoText, err := readRedisReply(conn)
  261. if err != nil {
  262. return failed("REDIS_INFO_FAILED", "Redis 检测失败", err.Error(), nil, nil)
  263. }
  264. info := parseRedisInfo(infoText)
  265. return success("Redis 检测通过", map[string]any{
  266. "host": host,
  267. "port": port,
  268. "role": info["role"],
  269. "usedMemory": info["used_memory"],
  270. "connectedClients": info["connected_clients"],
  271. }, map[string]any{"infoPreview": trimMap(info, 30)})
  272. }
  273. func executeRabbitMQJob(job *Job) ResultPayload {
  274. client := &http.Client{Timeout: timeoutFor(job.Config, 8*time.Second)}
  275. overview, err := fetchJSON(client, job.Endpoint, stringValue(job.Config, "rabbitmqOverviewPath", "/api/overview"), job.Config)
  276. if err != nil {
  277. return failed("RABBITMQ_REQUEST_FAILED", "RabbitMQ 检测失败", err.Error(), nil, nil)
  278. }
  279. queues, err := fetchJSON(client, job.Endpoint, stringValue(job.Config, "rabbitmqQueuesPath", "/api/queues"), job.Config)
  280. if err != nil {
  281. return failed("RABBITMQ_REQUEST_FAILED", "RabbitMQ 检测失败", err.Error(), nil, nil)
  282. }
  283. nodes, err := fetchJSON(client, job.Endpoint, stringValue(job.Config, "rabbitmqNodesPath", "/api/nodes"), job.Config)
  284. if err != nil {
  285. return failed("RABBITMQ_REQUEST_FAILED", "RabbitMQ 检测失败", err.Error(), nil, nil)
  286. }
  287. return success("RabbitMQ 管理接口检测通过", map[string]any{
  288. "queueCount": arrayLen(queues),
  289. "nodeCount": arrayLen(nodes),
  290. }, map[string]any{
  291. "overview": overview,
  292. "queuePreview": firstItems(queues, 10),
  293. "nodePreview": firstItems(nodes, 10),
  294. })
  295. }
  296. func executeESJob(job *Job) ResultPayload {
  297. client := &http.Client{Timeout: timeoutFor(job.Config, 8*time.Second)}
  298. health, err := fetchJSON(client, job.Endpoint, stringValue(job.Config, "elasticsearchHealthPath", "/_cluster/health"), job.Config)
  299. if err != nil {
  300. return failed("ES_REQUEST_FAILED", "Elasticsearch 检测失败", err.Error(), nil, nil)
  301. }
  302. nodes, err := fetchJSON(client, job.Endpoint, stringValue(job.Config, "elasticsearchNodesPath", "/_nodes/stats"), job.Config)
  303. if err != nil {
  304. return failed("ES_REQUEST_FAILED", "Elasticsearch 检测失败", err.Error(), nil, nil)
  305. }
  306. healthMap, _ := health.(map[string]any)
  307. clusterStatus := strings.ToLower(fmt.Sprintf("%v", healthMap["status"]))
  308. ok := clusterStatus == "green" || clusterStatus == "yellow"
  309. if ok {
  310. return success("Elasticsearch 集群检测通过", map[string]any{
  311. "clusterStatus": clusterStatus,
  312. "nodeCount": healthMap["number_of_nodes"],
  313. }, map[string]any{"health": health, "nodesPreview": nodes})
  314. }
  315. return failed("ES_CLUSTER_UNHEALTHY", "Elasticsearch 集群异常", "cluster status="+clusterStatus, map[string]any{"clusterStatus": clusterStatus}, map[string]any{"health": health})
  316. }
  317. func executeJavaJob(job *Job) ResultPayload {
  318. processMode := strings.ToLower(stringValue(job.Config, "processMatchMode", "command"))
  319. processValue := strings.TrimSpace(stringValue(job.Config, "processMatchValue", ""))
  320. if processValue == "" {
  321. return failed("JAVA_CONFIG_INVALID", "Java 程序监测失败", "processMatchValue 未配置", nil, nil)
  322. }
  323. var pid string
  324. var commandLine string
  325. if processMode == "pid" {
  326. pid = processValue
  327. commandLine = processValue
  328. } else {
  329. out, err := runCommand("ps", "-eww", "-o", "pid=,command=")
  330. if err != nil {
  331. return failed("PROCESS_DISCOVERY_FAILED", "Java 程序监测失败", err.Error(), nil, nil)
  332. }
  333. pid, commandLine = findJavaProcess(out, processValue)
  334. if pid == "" {
  335. return failed("JAVA_PROCESS_NOT_FOUND", "Java 程序监测失败", "未找到匹配的 Java 进程", map[string]any{"keyword": processValue}, nil)
  336. }
  337. }
  338. details := map[string]any{
  339. "process": map[string]any{"pid": pid, "commandLine": commandLine},
  340. "agentKey": stringValue(job.Config, "agentKey", ""),
  341. }
  342. metrics := map[string]any{"pid": pid, "host": hostname()}
  343. if jcmdPath := firstExistingExecutable(stringValue(job.Config, "jcmdPath", ""), "/usr/bin/jcmd", "/bin/jcmd"); jcmdPath != "" {
  344. results := map[string]string{}
  345. for _, cmd := range stringSlice(job.Config, "jcmdCommands", []string{"VM.command_line", "GC.heap_info"}) {
  346. out, err := runCommand(jcmdPath, pid, cmd)
  347. results[cmd] = strings.TrimSpace(safeErr(err, out))
  348. }
  349. details["jcmd"] = results
  350. }
  351. if jstatPath := firstExistingExecutable(stringValue(job.Config, "jstatPath", ""), "/usr/bin/jstat", "/bin/jstat"); jstatPath != "" {
  352. results := map[string]string{}
  353. for _, option := range stringSlice(job.Config, "jstatOptions", []string{"-gcutil"}) {
  354. out, err := runCommand(jstatPath, option, pid)
  355. results[option] = strings.TrimSpace(safeErr(err, out))
  356. }
  357. details["jstat"] = results
  358. metrics["jstatCollected"] = true
  359. }
  360. var logExcerpt *string
  361. if logPath := stringValue(job.Config, "logFilePath", ""); logPath != "" {
  362. tailLines := intValue(job.Config, "logTailLines", 120)
  363. pattern := stringValue(job.Config, "logKeyword", "ERROR|WARN|Exception|Timeout|Refused")
  364. excerpt := buildLogExcerpt(logPath, tailLines, pattern)
  365. if excerpt != "" {
  366. logExcerpt = &excerpt
  367. }
  368. }
  369. return ResultPayload{
  370. Status: "success",
  371. ErrorTag: nil,
  372. Summary: "宿主机 Agent 已完成 Java 程序监测",
  373. FailureReason: nil,
  374. Metrics: metrics,
  375. Details: details,
  376. LogExcerpt: logExcerpt,
  377. }
  378. }
  379. func fetchJSON(client *http.Client, base, path string, config map[string]any) (any, error) {
  380. u, err := url.Parse(base)
  381. if err != nil {
  382. return nil, err
  383. }
  384. relative, err := url.Parse(path)
  385. if err != nil {
  386. return nil, err
  387. }
  388. target := u.ResolveReference(relative)
  389. req, err := http.NewRequest(http.MethodGet, target.String(), nil)
  390. if err != nil {
  391. return nil, err
  392. }
  393. applyBasicAuth(req, config)
  394. resp, err := client.Do(req)
  395. if err != nil {
  396. return nil, err
  397. }
  398. defer resp.Body.Close()
  399. if resp.StatusCode >= 400 {
  400. body, _ := io.ReadAll(io.LimitReader(resp.Body, 16*1024))
  401. return nil, fmt.Errorf("HTTP_%d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
  402. }
  403. var payload any
  404. if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
  405. return nil, err
  406. }
  407. return payload, nil
  408. }
  409. func applyBasicAuth(req *http.Request, config map[string]any) {
  410. username := stringValue(config, "authUsername", "")
  411. password := stringValue(config, "authPassword", "")
  412. if username == "" || password == "" {
  413. return
  414. }
  415. token := base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
  416. req.Header.Set("Authorization", "Basic "+token)
  417. }
  418. func buildLogExcerpt(logPath string, tailLines int, pattern string) string {
  419. if tailLines <= 0 {
  420. tailLines = 120
  421. }
  422. content, err := os.ReadFile(filepath.Clean(logPath))
  423. if err != nil {
  424. return ""
  425. }
  426. lines := strings.Split(string(content), "\n")
  427. start := 0
  428. if len(lines) > tailLines {
  429. start = len(lines) - tailLines
  430. }
  431. window := lines[start:]
  432. if pattern == "" {
  433. return strings.TrimSpace(strings.Join(window, "\n"))
  434. }
  435. re, err := regexp.Compile(pattern)
  436. if err != nil {
  437. return strings.TrimSpace(strings.Join(window, "\n"))
  438. }
  439. filtered := make([]string, 0, len(window))
  440. for _, line := range window {
  441. if re.MatchString(line) {
  442. filtered = append(filtered, line)
  443. }
  444. }
  445. if len(filtered) == 0 {
  446. return strings.TrimSpace(strings.Join(window, "\n"))
  447. }
  448. return strings.TrimSpace(strings.Join(filtered, "\n"))
  449. }
  450. func firstExistingExecutable(paths ...string) string {
  451. for _, candidate := range paths {
  452. candidate = strings.TrimSpace(candidate)
  453. if candidate == "" {
  454. continue
  455. }
  456. if _, err := os.Stat(candidate); err == nil {
  457. return candidate
  458. }
  459. }
  460. return ""
  461. }
  462. func findJavaProcess(psOutput, keyword string) (string, string) {
  463. loweredKeyword := strings.ToLower(strings.TrimSpace(keyword))
  464. scanner := bufio.NewScanner(strings.NewReader(psOutput))
  465. for scanner.Scan() {
  466. line := strings.TrimSpace(scanner.Text())
  467. loweredLine := strings.ToLower(line)
  468. if line == "" || !strings.Contains(loweredLine, "java") || !strings.Contains(loweredLine, loweredKeyword) {
  469. continue
  470. }
  471. parts := strings.Fields(line)
  472. if len(parts) < 2 {
  473. continue
  474. }
  475. return parts[0], strings.TrimSpace(strings.TrimPrefix(line, parts[0]))
  476. }
  477. return "", ""
  478. }
  479. func runCommand(name string, args ...string) (string, error) {
  480. cmd := exec.Command(name, args...)
  481. output, err := cmd.CombinedOutput()
  482. return string(output), err
  483. }
  484. func loadEnvFile(path string) {
  485. content, err := os.ReadFile(path)
  486. if err != nil {
  487. return
  488. }
  489. for _, line := range strings.Split(string(content), "\n") {
  490. trimmed := strings.TrimSpace(line)
  491. if trimmed == "" || strings.HasPrefix(trimmed, "#") {
  492. continue
  493. }
  494. index := strings.Index(trimmed, "=")
  495. if index < 0 {
  496. continue
  497. }
  498. key := strings.TrimSpace(trimmed[:index])
  499. value := strings.TrimSpace(trimmed[index+1:])
  500. if os.Getenv(key) == "" {
  501. _ = os.Setenv(key, value)
  502. }
  503. }
  504. }
  505. func envOr(key, fallback string) string {
  506. if value := strings.TrimSpace(os.Getenv(key)); value != "" {
  507. return value
  508. }
  509. return fallback
  510. }
  511. func stringValue(config map[string]any, key, fallback string) string {
  512. if config == nil {
  513. return fallback
  514. }
  515. value, ok := config[key]
  516. if !ok || value == nil {
  517. return fallback
  518. }
  519. return strings.TrimSpace(fmt.Sprintf("%v", value))
  520. }
  521. func intValue(config map[string]any, key string, fallback int) int {
  522. if config == nil {
  523. return fallback
  524. }
  525. value, ok := config[key]
  526. if !ok || value == nil {
  527. return fallback
  528. }
  529. switch typed := value.(type) {
  530. case float64:
  531. return int(typed)
  532. case int:
  533. return typed
  534. case string:
  535. parsed, err := strconv.Atoi(strings.TrimSpace(typed))
  536. if err == nil {
  537. return parsed
  538. }
  539. }
  540. return fallback
  541. }
  542. func stringSlice(config map[string]any, key string, fallback []string) []string {
  543. if config == nil {
  544. return fallback
  545. }
  546. value, ok := config[key]
  547. if !ok || value == nil {
  548. return fallback
  549. }
  550. switch typed := value.(type) {
  551. case []any:
  552. result := make([]string, 0, len(typed))
  553. for _, item := range typed {
  554. text := strings.TrimSpace(fmt.Sprintf("%v", item))
  555. if text != "" {
  556. result = append(result, text)
  557. }
  558. }
  559. if len(result) > 0 {
  560. return result
  561. }
  562. case []string:
  563. if len(typed) > 0 {
  564. return typed
  565. }
  566. }
  567. return fallback
  568. }
  569. func timeoutFor(config map[string]any, fallback time.Duration) time.Duration {
  570. ms := intValue(config, "timeoutMs", 0)
  571. if ms > 0 {
  572. return time.Duration(ms) * time.Millisecond
  573. }
  574. return fallback
  575. }
  576. func respCommand(parts ...string) string {
  577. var builder strings.Builder
  578. builder.WriteString("*")
  579. builder.WriteString(strconv.Itoa(len(parts)))
  580. builder.WriteString("\r\n")
  581. for _, part := range parts {
  582. builder.WriteString("$")
  583. builder.WriteString(strconv.Itoa(len(part)))
  584. builder.WriteString("\r\n")
  585. builder.WriteString(part)
  586. builder.WriteString("\r\n")
  587. }
  588. return builder.String()
  589. }
  590. func readRedisReply(conn net.Conn) (string, error) {
  591. reader := bufio.NewReader(conn)
  592. prefix, err := reader.ReadByte()
  593. if err != nil {
  594. return "", err
  595. }
  596. line, err := reader.ReadString('\n')
  597. if err != nil {
  598. return "", err
  599. }
  600. switch prefix {
  601. case '+', '-', ':':
  602. return strings.TrimSpace(line), nil
  603. case '$':
  604. size, err := strconv.Atoi(strings.TrimSpace(line))
  605. if err != nil || size < 0 {
  606. return "", errors.New("invalid bulk string size")
  607. }
  608. buf := make([]byte, size+2)
  609. if _, err := io.ReadFull(reader, buf); err != nil {
  610. return "", err
  611. }
  612. return string(buf[:size]), nil
  613. default:
  614. return strings.TrimSpace(line), nil
  615. }
  616. }
  617. func parseRedisInfo(info string) map[string]any {
  618. result := map[string]any{}
  619. for _, line := range strings.Split(info, "\n") {
  620. line = strings.TrimSpace(line)
  621. if line == "" || strings.HasPrefix(line, "#") || !strings.Contains(line, ":") {
  622. continue
  623. }
  624. parts := strings.SplitN(line, ":", 2)
  625. key := parts[0]
  626. value := parts[1]
  627. if n, err := strconv.ParseInt(value, 10, 64); err == nil {
  628. result[key] = n
  629. } else {
  630. result[key] = value
  631. }
  632. }
  633. return result
  634. }
  635. func trimMap(input map[string]any, limit int) map[string]any {
  636. output := map[string]any{}
  637. count := 0
  638. for key, value := range input {
  639. output[key] = value
  640. count++
  641. if count >= limit {
  642. break
  643. }
  644. }
  645. return output
  646. }
  647. func firstItems(value any, limit int) any {
  648. items, ok := value.([]any)
  649. if !ok {
  650. return value
  651. }
  652. if len(items) <= limit {
  653. return items
  654. }
  655. return items[:limit]
  656. }
  657. func arrayLen(value any) int {
  658. items, ok := value.([]any)
  659. if !ok {
  660. return 0
  661. }
  662. return len(items)
  663. }
  664. func hostname() string {
  665. name, err := os.Hostname()
  666. if err != nil {
  667. return "unknown"
  668. }
  669. return name
  670. }
  671. func success(summary string, metrics, details map[string]any) ResultPayload {
  672. if metrics == nil {
  673. metrics = map[string]any{}
  674. }
  675. if details == nil {
  676. details = map[string]any{}
  677. }
  678. return ResultPayload{
  679. Status: "success",
  680. ErrorTag: nil,
  681. Summary: summary,
  682. FailureReason: nil,
  683. Metrics: metrics,
  684. Details: details,
  685. LogExcerpt: nil,
  686. }
  687. }
  688. func failed(tag, summary, reason string, metrics, details map[string]any) ResultPayload {
  689. if metrics == nil {
  690. metrics = map[string]any{}
  691. }
  692. if details == nil {
  693. details = map[string]any{}
  694. }
  695. return ResultPayload{
  696. Status: "failed",
  697. ErrorTag: ptr(tag),
  698. Summary: summary,
  699. FailureReason: ptr(reason),
  700. Metrics: metrics,
  701. Details: details,
  702. LogExcerpt: nil,
  703. }
  704. }
  705. func ptr[T any](value T) *T {
  706. return &value
  707. }
  708. func safeErr(err error, fallback string) string {
  709. if err != nil {
  710. return err.Error()
  711. }
  712. return fallback
  713. }