多读书多实践,勤思考善领悟

监控组件kapacitor使用说明

本文于1638天之前发表,文中内容可能已经过时。

初识

TIGK技术栈是一个开源的监控方案,其实本身技术栈为TICK,即Telegraf,InfluxDB,Chronograf,Kapacitor,但是由于Chronograf没有Grafana扩展性和易用性强,暂时使用了Grafana作为替代方案。

TIGK技术栈包括
InfuluxDB go语言开发的时序数据库 时序数据库相比传统关系型数据库更关注数据的实时性和并发插入时的承受能力 开放有restfulAPI。

Telegraf 同样是go语言开发的在目标机器上的agent采集工具,作为服务而言它很轻量级,并且扩展性也强,支持在linux系统下使用脚本对应用,容器等进行监控,监控采集的数据会发送给influxDB。

Grafana 可视化的监控展示服务,提供包括折线图,饼图,仪表盘等多种监控数据可视化UI,支持多种不同的时序数据库数据源,Grafana对每种数据源提供不同的查询方法,而且能很好的支持每种数据源的特性。

Kapacitor TIGK技术栈的告警服务,用户通过tickScript脚本来对时序数据库当中的数据进行过滤,筛选,批处理等进行告警,告警信息可以通过日志保存在本地,或回插到influxdb,还可以直接在告警产生后发起http请求到指定地址,kapacitor支持数据流(stream)和批处理(batch)数据。

kapacitor是一个脚本定义告警规则服务,与influxdb强相关,安装kapacitor后通过配置kapacitor.conf文件来配置和influxdb连接(通常是与influxdb开放的本地端口8086连接)

安装:

1
wget https://dl.influxdata.com/kapacitor/releases/kapacitor-1.5.3.x86_64.rpm

启停服务:

1
2
service kapacitor start
service kapacitor stop

如果influxdb安装在本地且开启了8086端口,则不需要进行任何配置,如果influxdb安装在其它机器,则需要配置kapacitor连接influxdb的api

1
vi /etc/kapacitor/kapacitor.conf

kapacitor常用命令

kapacitor list tasks –显示所有当前kapacitor加载的脚本
kapacitor define -tick -dbrp -type –安装脚本到kapacitor
kapacitor enable –脚本监控开启
kapacitor disable –脚本监控关闭
kapacitor show –显示某一个脚本的执行状态
kapacitor record -task -duration –记录某一时间段内某脚本运行状态(测试用)
kapacitor replay kapacitor replay -recording -task –重复执行测试结果

比如以下这个脚本:

1
vi task01.tick

1
2
3
4
5
6
var data = stream
|from()
.measurement('cpu')
|alert()
.info(lambda: "usage_idle"<=20)
.post('http://192.168.199.215:9001')

此脚本作用为,在抓取到cpu表中cpu空闲率低于百分之20时即产生告警,
首先需要使用kapacitor define安装

1
kapacitor define task01 -tick task01.tick -dbrp "monitor_db.autogen" -type "stream"

然后使用 kapacitor record执行脚本采集

1
kapacitor record stream -task task01 -duration 10s

此命令会阻塞10秒,十秒以后系统会返回一个rid,通过此rid可以使用命令

1
kapacitor list recordings $rid

来查看脚本执行状态,通常如果size那一栏不为空,则说明脚本有数据流入,则当前脚本是有效的
如果脚本当中有触发告警,可以到/vars/log/kapacitor.log或/tmp/alert.log当中查看告警触发结果

确认告警脚本无误后,使用enable则可开启当前脚本的监控

1
kapacitor enable task01

脚本核心概念

  • task:
    task是一个tickscript的最大粒度单位,可以说,一个tickscript就是一个task,task标识在kapacitor当中处理一段数据(a set of data)的任务集,一段脚本要起作用,首先需要经过kapacitor对当前的task文件进行编译(define)然后enable,同事还可以使用record命令来检测某一个task最近20分钟进入的数据流和节点处理的结果

  • pipeline
    和名称类似,pipeline就是指管道,标识数据的流向
    kapaciotr当中的数据是由influxdb处监听获取,从管道
    自上而下进行处理,可以有分支,可以有交汇,但是不可以回溯,管道根据数据源来源方式不同,可以分为两种,即stream和batch

  • stream 数据流
    kapacitor关键字之一,用来标识数据流来源的入口,即它会监听每一个插入influxdb的数据,然后通过管道当中 的各个节点进行处理,最终流向管道底部
    格式:

    1
    2
    3
     var varible_name = stream
    |from()
    ...
  • batch 批处理
    kapacitor关键字之一,用来标识批处理数据来源的入口,可以理解为每隔一段时间执行一条influxQL去influxdb当中查询数据,查询到的结果集会随当前batch所在的管道流向底部
    格式:

    1
    2
    3
     var varible_name = batch
    |query('''select usage_idle from cpu where host = 'xxx'''')
    ...

    stream是监听当前influxdb每一个时间点插入的数据流,而batch则是获取的InfluxQL查询的结果集,此结果集应该是单行的,stream下面可以不用直接跟from()方法指定数据库和表(即会监听当前所有插入influxdb的数据)
    但是batch方法下必须指定query()作为批处理对数据进行筛选

  • node
    用来组成管道的节点,可以理解为我们常见的水管上用来拼接成管道的连接头,也可以理解为processor point,即数据处理器节点,一个pipeline是通过多个node组成的

  • filed和tag
    是influxdb数据库的字段类型
    field表示当前插入数据值,它带有数据类型,包括double int等
    tag其实就是标签的意思,通常用于数据筛选,它的数据类型一定是字符串类型
    eg:

    1
    2
    3
    host       cpu_usage_idle
    --------------------
    '1138' 58.5

表示1138主机上的cpu使用空闲率为58.5%
在kapacitor当中是会将filed和tag进行区分的,一个明显的区别就是filed在node的处理过程当中可能会被销毁,但是tag不会

  • syntax-subspace
    语法子空间
    kapacitor借鉴了很多种语言做出了tickscript,而tickscript当中包含语法子空间的
    简单的说就是在一个task当中,不止包含tickscript语法
    举个栗子
    在使用query方法时

    1
    2
    batch
    query('''select count(usage_idle) from cpu where time > now() - 10s limit 1''')

    这里包含了influxQL查询语句,influxQL语法,就是一个语法子空间
    还有比如使用eval模块时

    1
    |eval(lambda : if("usage_idle" > 80)).as('res')

这里使用lambda表达式,则使用了tickscript 和 lambda语法
通常在kapacitor当中也只会包含这两种另外的语法

  • chaining method和property method
    连接方法和连接方法的属性方法(子方法)
    之前有说过node
    定义一个node的方式则是使用一个管道符号(“|”)然后跟一个chaining method,概念上可以把chaining method直接认为是node
    比如
    1
    2
    var data = stream
    |from()

上面的from就是一个chaining method,和管道符号一起组成了一个node
而属性方法则是定义在当前连接方法下的子方法
比如as()
连接方法join()和连接方法eval()都有as()作为properties method 但是是两个完全不同的方法(join()下的as是为交汇的pipeline的结果集字段赋予前缀,eval()下的as是创建一个新的field字段供后面的节点使用)
eg:

1
2
3
4
var data=stream
|from()
.database('aomp_monitor_db')
.retentionPolicy('one_day')

这里的database和retentionPolicy都是定义在chaining method下的property method,用来指定数据库和数据保存方法
需要注意的是,property method的执行顺序是在chaining method执行之前按顺序执行,property method的同一个chaining method下的property metohd的执行顺序也会影响执行结果(亲测)

就像不同的水管只能装不同的连接器一样,不同的node后面只能跟指定的node,有的node作为pipeline的终端后面也不可以跟node,具体可以上官网看node之间的关联关系

kapacitor脚本语法

  1. 脚本结构:
  • 通常一个task被拆分为两个部分
    declaration和expression
    即声明和操作
    声明即是声明变量
    操作则是对数据进行处理评测并发出警告等,即是上面写的pipeline定义和处理
    比如:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    var v_alarmRuleId=1894653
    var v_alarmPolicyId=1894654
    var v_level='warn'
    var v_host='{{ index .Tags "host" }}'
    var v_target_id='{{ index .Tags "host" }}'
    var v_db='aomp_monitor_db'
    var v_rp='autogen'
    var v_measurement='cpu'
    var v_groupBy='host'
    var v_whereFilter=lambda: ("host"=='1889911' OR "host"=='1874471' OR "host"=='1855534')
    var v_name='测试'
    var v_field='usage_idle'
    var v_idVar=v_name +':{{.Group}}'
    var v_message='id:{{ .ID }} ,name: {{ .Name}},taskname:{{.TaskName}},Group:{{.Group}}, host: {{ index .Tags "host" }} ,Level :warn, value:{{ index .Fields "value" }},time:{{.Time}}'
    var v_idTag='alertID'
    var v_levelTag='level'
    var v_every=10s
    var v_messageField='message'
    var v_durationField='duration'
    var v_value='{{ index .Fields "value" }}'
    var data = stream
    |from()
    .database(v_db)
    .retentionPolicy(v_rp)
    .measurement(v_measurement)
    .where(v_whereFilter)
    .groupBy(v_groupBy)
    |eval(lambda: if(("usage_idle"<=10), 1, 0)).as('res').keep()
    var trigger = data
    |alert()
    .info(lambda: "res" == 1)
    .id(v_idVar)
    .message(v_message)
    .messageField(v_messageField)
    .durationField(v_durationField)
    .stateChangesOnly()
    .details('{"v_alarmRuleId":"1894653","v_alarmPolicyId":"1894654","v_level":"warn","v_host":"{{ index .Tags "host" }}","v_target_id":"{{ index .Tags "host" }}","v_db":"aomp_monitor_db","v_rp":"autogen","v_measurement":"cpu","v_groupBy":"host","v_name":"测试多条件告警规则策略","v_field":"usage_idle","v_idVar":"v_name +:{{.Group}}","v_message":"id:{{ .ID }} ,name: {{ .Name}},taskname:{{.TaskName}},Group:{{.Group}}, host: {{ index .Tags "host" }} ,Level :warn, value:{{ index .Fields "value" }},time:{{.Time}}","v_idTag":"alertID","v_levelTag":"level","v_every":"10s","v_messageField":"message","v_durationField":"duration","v_value":"{{ index .Fields "value" }}"}')
    .post('http://192.168.183.81:7001/open_api/alarm_notify/receive')
    var solve_trigger = data
    | window()
    .period(60s)
    .every(v_every)
    |alert()
    .info(lambda: "res" == 0)
    .warn(lambda: "res" == 1)
    .id(v_idVar)
    .message(v_message)
    .messageField(v_messageField)
    .durationField(v_durationField)
    .stateChangesOnly()
    .details('{"v_alarmRuleId":"1894653","v_alarmPolicyId":"1894654","v_level":"warn","v_host":"{{ index .Tags "host" }}","v_target_id":"{{ index .Tags "host" }}","v_db":"aomp_monitor_db","v_rp":"autogen","v_measurement":"cpu","v_groupBy":"host","v_name":"测试多条件告警规则策略","v_field":"usage_idle","v_idVar":"v_name +:{{.Group}}","v_message":"id:{{ .ID }} ,name: {{ .Name}},taskname:{{.TaskName}},Group:{{.Group}}, host: {{ index .Tags "host" }} ,Level :warn, value:{{ index .Fields "value" }},time:{{.Time}}","v_idTag":"alertID","v_levelTag":"level","v_every":"10s","v_messageField":"message","v_durationField":"duration","v_value":"{{ index .Fields "value" }}"}')
    .post('http://192.168.183.81:7001/open_api/alarm_notify/solve')
  1. 变量
    kapacitor脚本中所有声明的变量都是常量,每一次评估时变量的内容不可以修改
    类似kapacitor这样的脚本有一个概念叫evaluate,即评估,每一次评估则意味着一次数据进入脚本,在数据在pipeline当中流动的时候,当前脚本定义的变量值和流动的数据本身,是不可修改的
    变量声明格式类似go和javascript
    1
    var v_db = 'monitor_linux_db'

变量名必须以英文开头,区分大小写,英文开头后可以跟数字,下划线等
变量声明不可以用kapacitor关键字,比如stream和batch

  1. 空格
    tickscript中,在expression部分,空格是可以忽略的,但是在声明变量的declaration部分要求必须使用空格将var标识和变量名隔开

  2. 单双引号
    单引号在任何时候都表示字符串,
    使用\ 反斜杠进行转义
    同时kapacitor也支持类似python的语法,用三个连续的单引号表示长字符串(避免过多的转义符)

在lambda表达式和influxql语句当中,使用双引号来表示某一个field或者tag,尤其是在influxql语句当中,强烈建议字段名加上双引号,因为influxql有特别多的内置关键字,使用双引号可以将字段名和关键字进行区分

  1. 数据结构
    常用的数据结构包含
    string duration int float lamdba list
  • string
    使用单引号表示字符串
    当需要在字符串当中引用变量时:
    引用当前script当中定义的变量直接使用{{.valrible_name }} 比如{{.ID }}
    引用结果集或者stream当中定义的变量使用{{index .Fileds "usage_idle" }}或者{{index.Tags "host" }}

  • duration
    时间数据类型
    这里列举一下时间数据类型常见字面量
    1d 1w 1h 1m 1s 1ms 1u
    天 周 小时 分 秒 毫秒 微秒

  • regular expression
    正则表达式类型
    字面量:

    1
    var cz_reg = /^cz\d+/
  • lambda表达式
    通常用来做数据处理并返回一个处理后的值
    常见字面量
    var my_lambda = lambda:1>0
    返回一个值为true的lambda表达式类型变量

  • int 和 float
    如果没有在使用lineprotocol插入数据到influxdb的时候指定某个filed字段为int类型,那么默认都是float类型,需要注意的是,有的lambda表达式方法需要的参数限制了数据类型为int,这个时候可以用int()方法强转一下
  1. comparison operator 比较器
  • lamdba比较器
    使用 == 和 !=,以及AND 和 OR 进行表达式比较
  • influxql比较器
    除了普遍的sql语法外,influxQL还支持使用=~和!~
    来进行正则表达式的匹配

7.关于node的分类

  • 关于node
    node分类为
    datasource definition nodes 比如 batch和stream
    data definition node 比如 from() query()
    data manipulation nodes 比如 sample() default()
    processing nodes
    processing nodes下分
    数据构造处理节点和传输处理节点
    比如alert()是数据传输处理节点
    join()是数据构造处理节点

8.kapacitor还可以调用自己的go方法
使用@+方法名进行调用
参考官方文档
http://docs.influxdata.com/