## 📋 Article Overview
**Learning objectives:**
- Master the core concepts and deployment of the Prometheus monitoring system
- Learn to define and collect custom business metrics
- Configure Grafana visualization dashboards
- Build intelligent alerting rules and notification pipelines
- Master performance bottleneck analysis and optimization techniques
- Understand monitoring system best practices
**Tech stack:**
- Prometheus (metric collection and storage)
- Grafana (visualization dashboards)
- AlertManager (alert management)
- Python Client (metric exposition)
- FastAPI (integrated monitoring)
- Docker (containerized deployment)

**Estimated reading time:** 70 minutes

**Prerequisites:**

- Understanding of time-series database concepts
- Familiarity with Docker-based deployment
- Working knowledge of FastAPI
- Understanding of the workflow execution engine (see parts 4-5 of this series)
- Familiarity with the logging system design (see part 17)

## Business Scenarios: Why a Performance Monitoring System?

In a workflow automation platform, performance monitoring is key to system stability and user experience. Running without monitoring is like driving blindfolded.

**Scenario 1: System performance degradation**

Problem:
- Workflow execution times suddenly increase
- Users complain that the system is slow
- Nobody knows which component is at fault

Traditional approach:
- Read the logs (huge volume, hard to analyze)
- Test manually (the issue cannot be reproduced)
- Guess from experience (inefficient)

With monitoring:
- Real-time performance charts
- Automatic anomaly detection
- Precise bottleneck localization
- Historical trend analysis

**Scenario 2: Abnormal resource usage**

Problem:
- Database connection pool exhausted
- Redis memory usage too high
- Celery queues backing up
- CPU/memory spikes

With Prometheus monitoring:
- Connection pool usage 85% → alert
- Redis memory 4 GB / 8 GB → normal
- Celery queue length 1000 → alert
- CPU usage 90% → alert

**Scenario 3: Abnormal business metrics**

Requirements:
- Monitor workflow success rate
- Track node execution duration
- Count active users
- Analyze error type distribution

Grafana dashboard:

```text
┌───────────────────────────────────────────────────┐
│ Workflow success rate: 97.5% ↓ (yesterday 98.2%)  │
│ Average execution time: 2.3s ↑ (yesterday 1.8s)   │
│ Active users: 1,234 ↑ (yesterday 1,100)           │
│ Error rate: 2.5% ↑ (yesterday 1.8%)               │
└───────────────────────────────────────────────────┘
```

### Industry solutions compared

| Solution | Strengths | Weaknesses | Cost | Best fit |
|---|---|---|---|---|
| Prometheus + Grafana | Open source, rich ecosystem, flexible and powerful | You operate it yourself | Low | Small and medium projects |
| DataDog | Full-featured, easy to use, SaaS | Expensive | High | Large enterprises |
| New Relic | Professional APM, deep tracing | Expensive, steep learning curve | High | Enterprise applications |
| CloudWatch | AWS-native, good integration | AWS only, limited features | Medium | AWS environments |
| In-house monitoring | Fully customizable | High development and maintenance cost | High | Special requirements |

## 🏗️ Architecture Design

### Overall architecture

```mermaid
graph TB
    subgraph APP_LAYER["Application layer"]
        APP[FastAPI app]
        WORKER[Celery worker]
        ENGINE[Workflow engine]
    end
    subgraph EXPOSE["Metric exposition"]
        APP_METRICS["/metrics endpoint"]
        WORKER_METRICS[Worker metrics]
        CUSTOM_METRICS[Custom metrics]
    end
    subgraph PROMETHEUS["Prometheus"]
        PROM[Prometheus server]
        SCRAPE[Metric scraping]
        TSDB[(Time-series DB)]
        RULES[Alert rules]
    end
    subgraph ALERTING["AlertManager"]
        AM[AlertManager]
        ROUTE[Routing rules]
        NOTIFY[Notification dispatch]
    end
    subgraph GRAFANA_LAYER["Grafana"]
        GRAFANA[Grafana server]
        DASHBOARD[Dashboards]
        PANEL[Panels]
    end
    subgraph CHANNELS["Notification channels"]
        EMAIL[Email]
        SLACK[Slack]
        WEBHOOK[Webhook]
        SMS[SMS]
    end

    APP --> APP_METRICS
    WORKER --> WORKER_METRICS
    ENGINE --> CUSTOM_METRICS
    APP_METRICS --> SCRAPE
    WORKER_METRICS --> SCRAPE
    CUSTOM_METRICS --> SCRAPE
    SCRAPE --> PROM
    PROM --> TSDB
    PROM --> RULES
    RULES --> AM
    AM --> ROUTE
    ROUTE --> NOTIFY
    NOTIFY --> EMAIL
    NOTIFY --> SLACK
    NOTIFY --> WEBHOOK
    NOTIFY --> SMS
    TSDB --> GRAFANA
    GRAFANA --> DASHBOARD
    DASHBOARD --> PANEL

    style PROM fill:#E6522C
    style GRAFANA fill:#F46800
    style AM fill:#00D1B2
    style TSDB fill:#3B82F6
```

### Core modules

**1. Metric collection layer**
- Application metrics: HTTP requests, response time, error rate
- System metrics: CPU, memory, disk, network
- Business metrics: workflow executions, node performance, user behavior
- Middleware metrics: database, Redis, Celery

**2. Prometheus server**
- Metric scraping: periodically pulls each service's /metrics endpoint
- Time-series storage: efficient storage of time-series data
- Query engine: the PromQL query language
- Alert evaluation: evaluates alerting rules in real time

**3. AlertManager**
- Alert grouping: merges similar alerts
- Alert routing: dispatches alerts by label
- Alert inhibition: prevents alert storms
- Notification delivery: multi-channel notifications

**4. Grafana**
- Data source management: connects to Prometheus
- Dashboard design: visualization configuration
- Chart rendering: multiple panel types
- Alert integration: visualizes alert state

### Data flow

```mermaid
sequenceDiagram
    participant App as FastAPI app
    participant Metrics as /metrics endpoint
    participant Prom as Prometheus
    participant TSDB as Time-series DB
    participant Rules as Alert rules
    participant AM as AlertManager
    participant Grafana as Grafana
    participant User as User

    App->>Metrics: Record metrics
    Note over App,Metrics: Counter, Gauge, Histogram

    loop Every 15 seconds
        Prom->>Metrics: Scrape metrics
        Metrics-->>Prom: Return metric data
        Prom->>TSDB: Store time series
    end

    Prom->>Rules: Evaluate alert rules
    alt Alert fires
        Rules->>AM: Send alert
        AM->>AM: Group / route / inhibit
        AM->>User: Send notification
    end

    User->>Grafana: Open dashboard
    Grafana->>Prom: Query metrics
    Prom->>TSDB: Read data
    TSDB-->>Prom: Return data
    Prom-->>Grafana: Return query result
    Grafana-->>User: Render charts
```

## Code Implementation
### 1. Prometheus metric definitions

```python
# monitoring/metrics.py
import time
from functools import wraps

from prometheus_client import Counter, Gauge, Histogram
from prometheus_client import CollectorRegistry, generate_latest

# Create a dedicated registry
registry = CollectorRegistry()

# ---- HTTP request metrics ----

# Total requests (Counter: monotonically increasing)
http_requests_total = Counter(
    "http_requests_total", "Total HTTP requests",
    ["method", "endpoint", "status"], registry=registry,
)

# Request latency (Histogram: distribution)
http_request_duration_seconds = Histogram(
    "http_request_duration_seconds", "HTTP request latency",
    ["method", "endpoint"],
    buckets=(0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0), registry=registry,
)

# In-flight requests (Gauge: can go up and down)
http_requests_in_progress = Gauge(
    "http_requests_in_progress", "HTTP requests in progress",
    ["method", "endpoint"], registry=registry,
)

# ---- Workflow execution metrics ----

workflow_executions_total = Counter(
    "workflow_executions_total", "Total workflow executions",
    ["workflow_id", "status"], registry=registry,
)

workflow_execution_duration_seconds = Histogram(
    "workflow_execution_duration_seconds", "Workflow execution duration",
    ["workflow_id"], buckets=(1, 5, 10, 30, 60, 120, 300, 600), registry=registry,
)

workflow_executions_in_progress = Gauge(
    "workflow_executions_in_progress", "Workflows currently executing", registry=registry,
)

# Workflow success rate (computed in the decorator below)
workflow_success_rate = Gauge(
    "workflow_success_rate", "Workflow success rate", ["workflow_id"], registry=registry,
)

# ---- Node execution metrics ----

node_executions_total = Counter(
    "node_executions_total", "Total node executions",
    ["node_type", "status"], registry=registry,
)

node_execution_duration_seconds = Histogram(
    "node_execution_duration_seconds", "Node execution duration",
    ["node_type"], buckets=(0.1, 0.5, 1, 2, 5, 10, 30), registry=registry,
)

# ---- Database metrics ----

db_connections_total = Gauge("db_connections_total", "Total database connections", registry=registry)
db_connections_in_use = Gauge("db_connections_in_use", "Database connections in use", registry=registry)
db_connections_idle = Gauge("db_connections_idle", "Idle database connections", registry=registry)

db_query_duration_seconds = Histogram(
    "db_query_duration_seconds", "Database query duration",
    ["operation"], buckets=(0.001, 0.01, 0.05, 0.1, 0.5, 1.0), registry=registry,
)

# ---- Redis metrics ----

redis_operations_total = Counter(
    "redis_operations_total", "Total Redis operations",
    ["operation", "status"], registry=registry,
)

redis_operation_duration_seconds = Histogram(
    "redis_operation_duration_seconds", "Redis operation duration",
    ["operation"], buckets=(0.001, 0.005, 0.01, 0.05, 0.1), registry=registry,
)

redis_memory_used_bytes = Gauge("redis_memory_used_bytes", "Redis memory used in bytes", registry=registry)

# ---- Celery metrics ----

celery_tasks_total = Counter(
    "celery_tasks_total", "Total Celery tasks", ["task_name", "status"], registry=registry,
)

celery_task_duration_seconds = Histogram(
    "celery_task_duration_seconds", "Celery task duration",
    ["task_name"], buckets=(1, 5, 10, 30, 60, 300), registry=registry,
)

celery_queue_length = Gauge("celery_queue_length", "Celery queue length", ["queue_name"], registry=registry)

# ---- Business metrics ----

active_users = Gauge("active_users", "Number of active users", registry=registry)
user_operations_total = Counter("user_operations_total", "Total user operations", ["operation_type"], registry=registry)
error_rate = Gauge("error_rate", "Error rate", ["error_type"], registry=registry)

# ---- Decorators: record metrics automatically ----

def track_request_metrics(func):
    """Track HTTP request metrics around a request handler."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        method = kwargs["request"].method if "request" in kwargs else "UNKNOWN"
        endpoint = func.__name__

        # Increment in-flight requests
        http_requests_in_progress.labels(method=method, endpoint=endpoint).inc()
        start_time = time.time()
        status = "success"
        try:
            return await func(*args, **kwargs)
        except Exception:
            status = "error"
            raise
        finally:
            # Record request duration
            duration = time.time() - start_time
            http_request_duration_seconds.labels(method=method, endpoint=endpoint).observe(duration)
            # Increment total requests
            http_requests_total.labels(method=method, endpoint=endpoint, status=status).inc()
            # Decrement in-flight requests
            http_requests_in_progress.labels(method=method, endpoint=endpoint).dec()
    return wrapper


def track_workflow_metrics(func):
    """Track workflow execution metrics.

    The workflow_id is read from the keyword arguments if present, otherwise from
    the last positional argument, so the decorator also works on methods whose
    first positional argument is ``self``.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        workflow_id = kwargs.get("workflow_id") or (args[-1] if args else "unknown")

        # Increment the number of workflows currently executing
        workflow_executions_in_progress.inc()
        start_time = time.time()
        status = "success"
        try:
            return await func(*args, **kwargs)
        except Exception:
            status = "failure"
            raise
        finally:
            # Record execution duration
            duration = time.time() - start_time
            workflow_execution_duration_seconds.labels(workflow_id=workflow_id).observe(duration)
            # Increment total executions
            workflow_executions_total.labels(workflow_id=workflow_id, status=status).inc()
            # Decrement the number of workflows currently executing
            workflow_executions_in_progress.dec()

            # Update the success rate (simplified; reads prometheus_client internals)
            success = workflow_executions_total.labels(workflow_id=workflow_id, status="success")._value.get()
            failure = workflow_executions_total.labels(workflow_id=workflow_id, status="failure")._value.get()
            total = success + failure
            if total > 0:
                workflow_success_rate.labels(workflow_id=workflow_id).set(success / total)
    return wrapper


def track_node_metrics(node_type: str):
    """Decorator factory that tracks node execution metrics for a given node type."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            status = "success"
            try:
                return await func(*args, **kwargs)
            except Exception:
                status = "failure"
                raise
            finally:
                # Record execution duration and total executions
                duration = time.time() - start_time
                node_execution_duration_seconds.labels(node_type=node_type).observe(duration)
                node_executions_total.labels(node_type=node_type, status=status).inc()
        return wrapper
    return decorator

# ---- Metric export ----

def get_metrics() -> bytes:
    """Return the registry contents in Prometheus exposition format."""
    return generate_latest(registry)
```
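The definitions above can be sanity-checked without running the full platform. A minimal, illustrative snippet (the `fake_http_node` coroutine is hypothetical, used only to exercise the decorator):

```python
# Quick local check of the metric definitions (illustrative only)
import asyncio

from monitoring.metrics import get_metrics, track_node_metrics


@track_node_metrics("http_request")
async def fake_http_node():
    await asyncio.sleep(0.05)  # pretend to do some work
    return {"ok": True}


async def main():
    await fake_http_node()
    # The exposition text now contains node_executions_total and
    # node_execution_duration_seconds samples for node_type="http_request"
    print(get_metrics().decode())


asyncio.run(main())
```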
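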
### 2. FastAPI integration

```python
# api/monitoring.py
from datetime import datetime

from fastapi import APIRouter, Response

from monitoring.metrics import (
    get_metrics,
    track_request_metrics,
    db_connections_total,
    db_connections_in_use,
    db_connections_idle,
    redis_memory_used_bytes,
    celery_queue_length,
    active_users,
)
from database import engine
from redis_client import redis
from celery_app import celery_app

router = APIRouter()


@router.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint.

    Returns metric data in the Prometheus exposition format.
    """
    # Update database connection pool metrics
    pool = engine.pool
    db_connections_total.set(pool.size())
    db_connections_in_use.set(pool.checkedout())
    db_connections_idle.set(pool.size() - pool.checkedout())

    # Update Redis memory metrics
    redis_info = await redis.info("memory")
    redis_memory_used_bytes.set(redis_info["used_memory"])

    # Update Celery queue lengths
    inspect = celery_app.control.inspect()
    active_tasks = inspect.active()
    if active_tasks:
        for queue_name, tasks in active_tasks.items():
            celery_queue_length.labels(queue_name=queue_name).set(len(tasks))

    # Update the active user count (example value; in practice query Redis or the database)
    active_users.set(1234)

    # Return the metrics
    metrics_data = get_metrics()
    return Response(content=metrics_data, media_type="text/plain")


@router.get("/health")
@track_request_metrics
async def health_check():
    """Health check endpoint."""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
    }
```

To make this endpoint scrapeable, the router still has to be mounted on the application. A minimal sketch, assuming a `main.py` entry point (the module name and `/api` prefix are assumptions chosen to match the `metrics_path: /api/metrics` used in the Prometheus configuration below):

```python
# main.py (hypothetical entry point; adjust names to your project layout)
from fastapi import FastAPI

from api.monitoring import router as monitoring_router

app = FastAPI(title="QuantumFlow API")

# Mount the monitoring router under /api so Prometheus can scrape /api/metrics
app.include_router(monitoring_router, prefix="/api")
```

After starting the app, `curl http://localhost:8000/api/metrics` should return the exposition-format text that Prometheus scrapes.

### 3. Workflow engine integration

```python
# engine/workflow_engine.py
import networkx as nx

from monitoring.metrics import track_workflow_metrics, track_node_metrics


class WorkflowEngine:
    """Workflow execution engine with monitoring integrated."""

    @track_workflow_metrics
    async def execute_workflow(self, workflow_id: str):
        """Execute a workflow; execution metrics are recorded automatically."""
        workflow = await self.load_workflow(workflow_id)
        dag = self.build_dag(workflow)
        execution_order = list(nx.topological_sort(dag))

        results = {}
        for node_id in execution_order:
            node = dag.nodes[node_id]["data"]
            results[node_id] = await self.execute_node(node, results)
        return results

    async def execute_node(self, node, context):
        """Execute a node, dispatching to type-specific handlers with type-specific monitoring."""
        node_type = node.type

        if node_type == "http_request":
            return await self._execute_http_node(node, context)
        elif node_type == "database":
            return await self._execute_db_node(node, context)
        # ... other node types

    @track_node_metrics("http_request")
    async def _execute_http_node(self, node, context):
        """Execute an HTTP request node."""
        # HTTP request logic goes here
        pass

    @track_node_metrics("database")
    async def _execute_db_node(self, node, context):
        """Execute a database node."""
        # Database operation logic goes here
        pass
```

### 4. Docker Compose configuration

```yaml
# docker-compose.monitoring.yml
version: "3.8"

services:
  # Prometheus
  prometheus:
    image: prom/prometheus:latest
    container_name: quantumflow-prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
      - ./monitoring/rules:/etc/prometheus/rules
      - prometheus-data:/prometheus
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"
      - "--storage.tsdb.path=/prometheus"
      - "--web.console.libraries=/usr/share/prometheus/console_libraries"
      - "--web.console.templates=/usr/share/prometheus/consoles"
      - "--storage.tsdb.retention.time=30d"
    networks:
      - monitoring
    restart: unless-stopped

  # Grafana
  grafana:
    image: grafana/grafana:latest
    container_name: quantumflow-grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_USER=admin
      - GF_SECURITY_ADMIN_PASSWORD=admin123
      - GF_USERS_ALLOW_SIGN_UP=false
    volumes:
      - ./monitoring/grafana/provisioning:/etc/grafana/provisioning
      - ./monitoring/grafana/dashboards:/var/lib/grafana/dashboards
      - grafana-data:/var/lib/grafana
    networks:
      - monitoring
    restart: unless-stopped
    depends_on:
      - prometheus

  # AlertManager
  alertmanager:
    image: prom/alertmanager:latest
    container_name: quantumflow-alertmanager
    ports:
      - "9093:9093"
    volumes:
      - ./monitoring/alertmanager.yml:/etc/alertmanager/alertmanager.yml
      - alertmanager-data:/alertmanager
    command:
      - "--config.file=/etc/alertmanager/alertmanager.yml"
      - "--storage.path=/alertmanager"
    networks:
      - monitoring
    restart: unless-stopped

  # Node Exporter (system metrics)
  node-exporter:
    image: prom/node-exporter:latest
    container_name: quantumflow-node-exporter
    ports:
      - "9100:9100"
    command:
      - "--path.procfs=/host/proc"
      - "--path.sysfs=/host/sys"
      - "--collector.filesystem.mount-points-exclude=^/(sys|proc|dev|host|etc)($$|/)"
    volumes:
      - /proc:/host/proc:ro
      - /sys:/host/sys:ro
      - /:/rootfs:ro
    networks:
      - monitoring
    restart: unless-stopped

volumes:
  prometheus-data:
  grafana-data:
  alertmanager-data:

networks:
  monitoring:
    driver: bridge
```
### 5. Prometheus configuration

```yaml
# monitoring/prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s
  external_labels:
    cluster: quantumflow-production
    environment: production

# Alert rule files
rule_files:
  - /etc/prometheus/rules/*.yml

# AlertManager configuration
alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - alertmanager:9093

# Scrape configuration
scrape_configs:
  # Prometheus itself
  - job_name: prometheus
    static_configs:
      - targets: ["localhost:9090"]

  # FastAPI application
  - job_name: quantumflow-api
    static_configs:
      - targets: ["api:8000"]
    metrics_path: /api/metrics
    scrape_interval: 10s

  # Celery worker
  - job_name: quantumflow-worker
    static_configs:
      - targets: ["worker:9091"]
    scrape_interval: 10s

  # Node Exporter (system metrics)
  - job_name: node-exporter
    static_configs:
      - targets: ["node-exporter:9100"]

  # PostgreSQL Exporter
  - job_name: postgres
    static_configs:
      - targets: ["postgres-exporter:9187"]

  # Redis Exporter
  - job_name: redis
    static_configs:
      - targets: ["redis-exporter:9121"]
```

### 6. Alert rule configuration

```yaml
# monitoring/rules/alerts.yml
groups:
  - name: quantumflow_alerts
    interval: 30s
    rules:
      # ---- System-level alerts ----

      # High CPU usage
      - alert: HighCPUUsage
        expr: 100 - (avg by(instance) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 80
        for: 5m
        labels:
          severity: warning
          category: system
        annotations:
          summary: High CPU usage
          description: "Instance {{ $labels.instance }} CPU usage is {{ $value }}%"

      # High memory usage
      - alert: HighMemoryUsage
        expr: (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100 > 85
        for: 5m
        labels:
          severity: warning
          category: system
        annotations:
          summary: High memory usage
          description: "Instance {{ $labels.instance }} memory usage is {{ $value }}%"

      # Low disk space
      - alert: LowDiskSpace
        expr: (1 - (node_filesystem_avail_bytes / node_filesystem_size_bytes)) * 100 > 85
        for: 10m
        labels:
          severity: critical
          category: system
        annotations:
          summary: Low disk space
          description: "Instance {{ $labels.instance }} disk usage is {{ $value }}%"

      # ---- Application-level alerts ----

      # High HTTP error rate
      - alert: HighHTTPErrorRate
        expr: |
          (
            sum(rate(http_requests_total{status=~"5.."}[5m]))
            /
            sum(rate(http_requests_total[5m]))
          ) * 100 > 5
        for: 5m
        labels:
          severity: critical
          category: application
        annotations:
          summary: High HTTP error rate
          description: "HTTP 5xx error rate is {{ $value }}%"

      # High request latency
      - alert: HighRequestLatency
        expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 2
        for: 5m
        labels:
          severity: warning
          category: application
        annotations:
          summary: High request latency
          description: "95th percentile latency is {{ $value }} seconds"

      # Too many active requests
      - alert: TooManyActiveRequests
        expr: sum(http_requests_in_progress) > 100
        for: 5m
        labels:
          severity: warning
          category: application
        annotations:
          summary: Too many active requests
          description: "Current number of in-flight requests is {{ $value }}"

      # ---- Workflow alerts ----

      # High workflow failure rate
      - alert: HighWorkflowFailureRate
        expr: |
          (
            sum(rate(workflow_executions_total{status="failure"}[10m]))
            /
            sum(rate(workflow_executions_total[10m]))
          ) * 100 > 10
        for: 10m
        labels:
          severity: critical
          category: workflow
        annotations:
          summary: High workflow failure rate
          description: "Workflow failure rate is {{ $value }}%"

      # Slow workflow execution
      - alert: SlowWorkflowExecution
        expr: histogram_quantile(0.95, rate(workflow_execution_duration_seconds_bucket[10m])) > 300
        for: 10m
        labels:
          severity: warning
          category: workflow
        annotations:
          summary: Slow workflow execution
          description: "95th percentile execution time is {{ $value }} seconds"

      # Too many active workflows
      - alert: TooManyActiveWorkflows
        expr: workflow_executions_in_progress > 50
        for: 5m
        labels:
          severity: warning
          category: workflow
        annotations:
          summary: Too many active workflows
          description: "{{ $value }} workflows are currently executing"

      # ---- Database alerts ----

      # Database connection pool nearly exhausted
      - alert: DatabaseConnectionPoolExhausted
        expr: (db_connections_in_use / db_connections_total) * 100 > 90
        for: 5m
        labels:
          severity: critical
          category: database
        annotations:
          summary: Database connection pool nearly exhausted
          description: "Connection pool usage is {{ $value }}%"
      # Slow database queries
      - alert: SlowDatabaseQueries
        expr: histogram_quantile(0.95, rate(db_query_duration_seconds_bucket[5m])) > 1
        for: 5m
        labels:
          severity: warning
          category: database
        annotations:
          summary: Slow database queries
          description: "95th percentile query time is {{ $value }} seconds"

      # ---- Redis alerts ----

      # High Redis memory usage
      - alert: HighRedisMemoryUsage
        expr: (redis_memory_used_bytes / redis_memory_max_bytes) * 100 > 85
        for: 5m
        labels:
          severity: warning
          category: redis
        annotations:
          summary: High Redis memory usage
          description: "Redis memory usage is {{ $value }}%"

      # ---- Celery alerts ----

      # Celery queue backlog
      - alert: CeleryQueueBacklog
        expr: celery_queue_length > 1000
        for: 10m
        labels:
          severity: warning
          category: celery
        annotations:
          summary: Celery queue backlog
          description: "Queue {{ $labels.queue_name }} length is {{ $value }}"

      # High Celery task failure rate
      - alert: HighCeleryTaskFailureRate
        expr: |
          (
            sum(rate(celery_tasks_total{status="failure"}[10m]))
            /
            sum(rate(celery_tasks_total[10m]))
          ) * 100 > 10
        for: 10m
        labels:
          severity: critical
          category: celery
        annotations:
          summary: High Celery task failure rate
          description: "Task failure rate is {{ $value }}%"
```

### 7. AlertManager configuration

```yaml
# monitoring/alertmanager.yml
global:
  resolve_timeout: 5m
  smtp_smarthost: smtp.gmail.com:587
  smtp_from: alerts@quantumflow.com
  smtp_auth_username: alerts@quantumflow.com
  smtp_auth_password: your-password

# Alert routing
route:
  group_by: [alertname, cluster, service]
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 12h
  receiver: default
  routes:
    # System alerts -> ops team
    - match:
        category: system
      receiver: ops-team
      continue: true

    # Application alerts -> dev team
    - match:
        category: application
      receiver: dev-team
      continue: true

    # Workflow alerts -> product team
    - match:
        category: workflow
      receiver: product-team
      continue: true

    # Critical alerts -> everyone
    - match:
        severity: critical
      receiver: all-teams

# Alert receivers
receivers:
  - name: default
    email_configs:
      - to: team@quantumflow.com

  - name: ops-team
    email_configs:
      - to: ops@quantumflow.com
    slack_configs:
      - api_url: https://hooks.slack.com/services/YOUR/WEBHOOK/URL
        channel: "#ops-alerts"
        title: Ops alert
        text: "{{ range .Alerts }}{{ .Annotations.summary }}\n{{ end }}"

  - name: dev-team
    email_configs:
      - to: dev@quantumflow.com
    slack_configs:
      - api_url: https://hooks.slack.com/services/YOUR/WEBHOOK/URL
        channel: "#dev-alerts"
        title: Dev alert

  - name: product-team
    email_configs:
      - to: product@quantumflow.com
    slack_configs:
      - api_url: https://hooks.slack.com/services/YOUR/WEBHOOK/URL
        channel: "#product-alerts"
        title: Product alert

  - name: all-teams
    email_configs:
      - to: team@quantumflow.com
    slack_configs:
      - api_url: https://hooks.slack.com/services/YOUR/WEBHOOK/URL
        channel: "#critical-alerts"
        title: Critical alert
    webhook_configs:
      - url: http://api:8000/api/alerts/webhook

# Inhibition rules
inhibit_rules:
  # When a critical alert is firing, suppress warning-level alerts with the same name and instance
  - source_match:
      severity: critical
    target_match:
      severity: warning
    equal: [alertname, instance]
```
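The `webhook_configs` entry above points AlertManager at an endpoint on the API itself (`/api/alerts/webhook`) that is not shown elsewhere in this article. A minimal sketch of what such a receiver could look like, assuming the same FastAPI app; the payload fields follow AlertManager's standard webhook JSON format:

```python
# api/alerts.py (hypothetical receiver for the AlertManager webhook above)
from fastapi import APIRouter, Request

router = APIRouter()


@router.post("/alerts/webhook")
async def alertmanager_webhook(request: Request):
    """Receive grouped alerts pushed by AlertManager's webhook_configs."""
    payload = await request.json()
    # AlertManager sends a JSON body with "status", "groupLabels" and an "alerts" list
    for alert in payload.get("alerts", []):
        labels = alert.get("labels", {})
        summary = alert.get("annotations", {}).get("summary", "")
        # Hand off to whatever internal handling you use (ticketing, on-call paging, ...)
        print(f"[{payload.get('status')}] {labels.get('alertname')}: {summary}")
    return {"received": len(payload.get("alerts", []))}
```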
### 8. Grafana dashboard configuration

```json
{
  "dashboard": {
    "id": null,
    "uid": "quantumflow-overview",
    "title": "QuantumFlow System Overview",
    "tags": ["quantumflow", "overview"],
    "timezone": "browser",
    "schemaVersion": 16,
    "version": 1,
    "refresh": "30s",
    "panels": [
      {
        "id": 1,
        "title": "HTTP request rate",
        "type": "graph",
        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 0},
        "targets": [
          {"expr": "sum(rate(http_requests_total[5m])) by (status)", "legendFormat": "{{ status }}", "refId": "A"}
        ],
        "yaxes": [{"format": "reqps", "label": "requests/s"}, {"format": "short"}]
      },
      {
        "id": 2,
        "title": "HTTP request latency",
        "type": "graph",
        "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0},
        "targets": [
          {"expr": "histogram_quantile(0.50, rate(http_request_duration_seconds_bucket[5m]))", "legendFormat": "P50", "refId": "A"},
          {"expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))", "legendFormat": "P95", "refId": "B"},
          {"expr": "histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[5m]))", "legendFormat": "P99", "refId": "C"}
        ],
        "yaxes": [{"format": "s", "label": "latency"}, {"format": "short"}]
      },
      {
        "id": 3,
        "title": "Workflow success rate",
        "type": "stat",
        "gridPos": {"h": 4, "w": 6, "x": 0, "y": 8},
        "targets": [{"expr": "avg(workflow_success_rate) * 100", "refId": "A"}],
        "options": {
          "graphMode": "area",
          "colorMode": "value",
          "unit": "percent",
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {"value": 0, "color": "red"},
              {"value": 90, "color": "yellow"},
              {"value": 95, "color": "green"}
            ]
          }
        }
      },
      {
        "id": 4,
        "title": "Active workflows",
        "type": "stat",
        "gridPos": {"h": 4, "w": 6, "x": 6, "y": 8},
        "targets": [{"expr": "workflow_executions_in_progress", "refId": "A"}],
        "options": {"graphMode": "area", "colorMode": "value", "unit": "short"}
      },
      {
        "id": 5,
        "title": "Database connection pool",
        "type": "gauge",
        "gridPos": {"h": 4, "w": 6, "x": 12, "y": 8},
        "targets": [{"expr": "(db_connections_in_use / db_connections_total) * 100", "refId": "A"}],
        "options": {
          "showThresholdLabels": false,
          "showThresholdMarkers": true,
          "unit": "percent",
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {"value": 0, "color": "green"},
              {"value": 70, "color": "yellow"},
              {"value": 90, "color": "red"}
            ]
          }
        }
      },
      {
        "id": 6,
        "title": "Celery queue length",
        "type": "stat",
        "gridPos": {"h": 4, "w": 6, "x": 18, "y": 8},
        "targets": [{"expr": "sum(celery_queue_length)", "refId": "A"}],
        "options": {
          "graphMode": "area",
          "colorMode": "value",
          "unit": "short",
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {"value": 0, "color": "green"},
              {"value": 500, "color": "yellow"},
              {"value": 1000, "color": "red"}
            ]
          }
        }
      },
      {
        "id": 7,
        "title": "Node execution duration distribution",
        "type": "heatmap",
        "gridPos": {"h": 8, "w": 24, "x": 0, "y": 12},
        "targets": [
          {"expr": "sum(rate(node_execution_duration_seconds_bucket[5m])) by (le, node_type)", "format": "heatmap", "refId": "A"}
        ]
      },
      {
        "id": 8,
        "title": "System resource usage",
        "type": "graph",
        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 20},
        "targets": [
          {"expr": "100 - (avg(rate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100)", "legendFormat": "CPU usage", "refId": "A"},
          {"expr": "(1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100", "legendFormat": "Memory usage", "refId": "B"}
        ],
        "yaxes": [{"format": "percent", "label": "usage"}, {"format": "short"}]
      },
      {
        "id": 9,
        "title": "Error type distribution",
        "type": "piechart",
        "gridPos": {"h": 8, "w": 12, "x": 12, "y": 20},
        "targets": [
          {"expr": "sum(rate(node_executions_total{status=\"failure\"}[1h])) by (node_type)", "refId": "A"}
        ]
      }
    ]
  }
}
```
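Instead of importing this JSON by hand, it can be pushed through Grafana's HTTP API. A minimal sketch using `requests`; the admin credentials, localhost URL, and dashboard file path are assumptions that match the Docker Compose defaults above:

```python
# scripts/push_dashboard.py (hypothetical helper, not part of the platform codebase)
import json

import requests

GRAFANA_URL = "http://localhost:3000"
AUTH = ("admin", "admin123")  # matches GF_SECURITY_ADMIN_* in docker-compose.monitoring.yml

with open("monitoring/grafana/dashboards/quantumflow-overview.json") as f:
    payload = json.load(f)  # already wrapped as {"dashboard": {...}}

payload["overwrite"] = True  # replace an existing dashboard with the same uid

# POST /api/dashboards/db creates or updates a dashboard
resp = requests.post(f"{GRAFANA_URL}/api/dashboards/db", json=payload, auth=AUTH)
resp.raise_for_status()
print(resp.json())
```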
## Testing and Validation

### Performance test script

```python
# tests/performance_test.py
import asyncio
import statistics
import time

import aiohttp


async def test_api_performance(url: str, num_requests: int = 1000, concurrency: int = 10):
    """API performance test.

    Args:
        url: URL to test
        num_requests: total number of requests
        concurrency: number of concurrent requests per batch
    """

    async def make_request(session, request_id):
        start_time = time.time()
        try:
            async with session.get(url) as response:
                await response.text()
                duration = time.time() - start_time
                return {
                    "request_id": request_id,
                    "status": response.status,
                    "duration": duration,
                    "success": response.status == 200,
                }
        except Exception as e:
            duration = time.time() - start_time
            return {
                "request_id": request_id,
                "status": 0,
                "duration": duration,
                "success": False,
                "error": str(e),
            }

    # Create a session and send requests in batches
    async with aiohttp.ClientSession() as session:
        results = []
        for batch_start in range(0, num_requests, concurrency):
            batch_end = min(batch_start + concurrency, num_requests)
            batch_tasks = [make_request(session, i) for i in range(batch_start, batch_end)]
            batch_results = await asyncio.gather(*batch_tasks)
            results.extend(batch_results)

    # Summarize the results
    durations = [r["duration"] for r in results]
    success_count = sum(1 for r in results if r["success"])

    print("\nPerformance test results:")
    print(f"Total requests: {num_requests}")
    print(f"Successful: {success_count}")
    print(f"Failed: {num_requests - success_count}")
    print(f"Success rate: {success_count / num_requests * 100:.2f}%")
    print("\nLatency statistics:")
    print(f"Min: {min(durations):.3f}s")
    print(f"Max: {max(durations):.3f}s")
    print(f"Mean: {statistics.mean(durations):.3f}s")
    print(f"Median: {statistics.median(durations):.3f}s")
    print(f"P95: {statistics.quantiles(durations, n=20)[18]:.3f}s")
    print(f"P99: {statistics.quantiles(durations, n=100)[98]:.3f}s")


if __name__ == "__main__":
    asyncio.run(test_api_performance(
        url="http://localhost:8000/api/workflows",
        num_requests=1000,
        concurrency=50,
    ))
```

## Performance Optimization

### 1. Metric collection optimization

```python
# Use batched updates to reduce lock contention
import threading

from prometheus_client import Counter


class BatchedCounter:
    """Counter wrapper that buffers increments and flushes them in batches."""

    def __init__(self, counter: Counter, batch_size: int = 100):
        self.counter = counter
        self.batch_size = batch_size
        self.buffer = {}
        self.lock = threading.Lock()

    def inc(self, labels: dict, value: float = 1):
        """Buffer an increment; flush once the buffer is large enough."""
        key = tuple(sorted(labels.items()))
        with self.lock:
            self.buffer[key] = self.buffer.get(key, 0) + value
            if len(self.buffer) >= self.batch_size:
                self._flush()

    def _flush(self):
        """Apply the buffered increments to the underlying Prometheus counter."""
        for key, value in self.buffer.items():
            labels = dict(key)
            self.counter.labels(**labels).inc(value)
        self.buffer.clear()
```

### 2. Prometheus query optimization

```promql
# Bad: expensive query, recomputed from raw series on every evaluation
sum(rate(http_requests_total[5m])) by (endpoint)

# Better: query a recording rule that is pre-computed
sum(http_requests:rate5m) by (endpoint)
```

Recording rule configuration:

```yaml
# monitoring/rules/recording.yml
groups:
  - name: http_metrics
    interval: 30s
    rules:
      # Pre-compute the 5-minute request rate
      - record: http_requests:rate5m
        expr: rate(http_requests_total[5m])

      # Pre-compute the error rate
      - record: http_requests:error_rate
        expr: |
          sum(rate(http_requests_total{status=~"5.."}[5m]))
          /
          sum(rate(http_requests_total[5m]))
```

### 3. Grafana query optimization

```json
{
  "targets": [
    {
      "expr": "http_requests:rate5m",
      "interval": "30s",
      "intervalFactor": 2,
      "step": 60
    }
  ]
}
```

## Going Deeper

### 1. Building a custom exporter

```python
# exporters/workflow_exporter.py
import asyncio
import time

from prometheus_client import start_http_server, Gauge

from database import get_db

# Metric definitions
workflow_count = Gauge("workflow_count", "Total workflows", ["status"])
execution_count = Gauge("execution_count", "Total executions", ["status"])


async def collect_metrics():
    """Collect business metrics from the database."""
    async for db in get_db():
        # Count active workflows
        active_workflows = await db.execute(
            "SELECT COUNT(*) FROM workflows WHERE status = 'active'"
        )
        workflow_count.labels(status="active").set(active_workflows.scalar())

        # Count successful executions
        success_executions = await db.execute(
            "SELECT COUNT(*) FROM executions WHERE status = 'success'"
        )
        execution_count.labels(status="success").set(success_executions.scalar())


if __name__ == "__main__":
    # Start the metrics HTTP server
    start_http_server(9091)

    # Collect metrics periodically
    while True:
        asyncio.run(collect_metrics())
        time.sleep(60)
```
### 2. Distributed tracing integration

```python
# Integrating OpenTelemetry
from opentelemetry import trace
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.trace import TracerProvider

# Configure tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Configure metrics
reader = PrometheusMetricReader()
provider = MeterProvider(metric_readers=[reader])


@tracer.start_as_current_span("execute_workflow")
async def execute_workflow(workflow_id: str):
    """Workflow execution with tracing."""
    span = trace.get_current_span()
    span.set_attribute("workflow.id", workflow_id)

    # Execution logic
    result = await _do_execute(workflow_id)

    span.set_attribute("workflow.status", result.status)
    return result
```

### 3. Cost optimization

```yaml
# Prometheus data retention strategy
global:
  scrape_interval: 15s

# Different jobs scrape at different frequencies and keep different series
scrape_configs:
  - job_name: high-frequency
    scrape_interval: 5s
    # Keep for 1 day only
    metric_relabel_configs:
      - source_labels: [__name__]
        regex: "http_.*"
        action: keep

  - job_name: low-frequency
    scrape_interval: 60s
    # Keep for 30 days
```

## References

Official documentation:

- [Prometheus Documentation](https://prometheus.io/docs)