神刀安全网

ELK维护的一些点(二)

很杂, 涉及到最近处理的一些点

根据string转浮点数的某个字段排序

一个字段, resp_time , mapping中是string, 有需求是, 按照响应时间降序排序, 此时需要构造qsl(在search中使用), 使用该字段转换为浮点数, 降序排列

第一步, 修改es配置, 增加groovy支持

elasticsearch.yml中加入

script.engine.groovy.inline.search: on

然后, 执行rolling restart, 逐一重启集群每个节点

第二步, 构造qsl, sort 中, 增加 _script 使用groovy脚本, 将对应字段从string转成数字, 再进行排序

'sort': [{'_script': {'lang': 'groovy',                        'order': 'desc',                        'script': 'Float.parseFloat(doc["resp_time"].value)',                        'type': 'number'}},           {'@timestamp': 'desc'}           ]

scripting文档

fielddata-format-disabled 导致的排序失效

有个集群, 升级后, 发现 resp_time 字段的mapping是

"resp_time" : { "type" : "string", "norms" : {     "enabled" : false }, "fielddata" : {     "format" : "disabled" }, "fields" : {     "raw" : {     "type" : "string",     "index" : "not_analyzed",     "ignore_above" : 256     } }

注意这里的, 是因为升级es 2.0之后, 默认值变更带来的问题

"fielddata" : {   "format" : "disabled" },

fielddata文档

此时, 排序的qsl将会报错, 无法按照对应要求排序

Field data loading is forbidden on resp_time

解决方案, 挺简单的, 使用 foo.raw 即可

'sort': [{'_script': {'lang': 'groovy',     'order': 'desc',     'script': 'Float.parseFloat(doc["resp_time.raw"].value)',     'type': 'number'}}, {'@timestamp': 'desc'} ]

使用聚合

把string类型的 resp_time 放到 aggs 中做聚合的时候.

"aggs": {      "resp_time_stats": {"stats": {"script": 'Float.parseFloat(doc["resp_time.raw"].value)'}} }

此时, 会报错

{u'error': {u'failed_shards': [{u'index': u'logstash-2016.04.10',                                 u'node': u'AvemqKN-RGKy68zJXUapBg',                                 u'reason': {u'reason': u'scripts of type [inline], operation [aggs] and lang [groovy] are disabled',                                             u'type': u'script_exception'},                                 u'shard': 0}],             u'grouped': True,             u'phase': u'query',             u'reason': u'all shards failed',             u'root_cause': [{u'reason': u'scripts of type [inline], operation [aggs] and lang [groovy] are disabled',                              u'type': u'script_exception'}],             u'type': u'search_phase_execution_exception'},  u'status': 500}

处理, es加配置, 逐一重启

script.engine.groovy.inline.aggs: on

相关 文档

logstash grok default patterns

默认的一些pattern, 见 grok-patterns

grok检查在线实时编辑, https://grokdebug.herokuapp.com/

logstash codec multiline 限制行数和日志大小

配置, 具体见 multiline文档

input {         codec => multiline {             patterns_dir => "./patterns"             pattern => ""             what => "previous"             negate  => true             max_lines => 100             max_bytes => "50kib"         } }

单位 bytes

实践中, 使用 max_bytes , 当 what=previous + negate=true 的情况下, 即不匹配模式的, 归属前一部分, 这种情况下, 性能ok, 反之 what=next + negate=true 的情况下, 不匹配成功归属于后半部分, 此时产生的cpu消耗非常之大, 可以将一台机器跑满.

另外, 假设配置 max_bytes=1M , 此时用户打了50M, 会给这个event打上tag multiline_codec_max_bytes_reache , 但是, 这50M 最终还是会经logstash灌入到es里面. 即, 超了, 但是并不自动截掉

这时候, 我们可以, 使用 mutate-replace 直接替换掉

# if multiline_codec_max_lines_reached     if ("multiline_codec_max_bytes_reached" in [tags]) {         mutate {             replace => {                 "message" => "Log System Warnning: multiline_codec_max_lines_reached, Your log has exceeded 50kB(51200 chars), it was blocked by log system. Please check your code to make your log info shorter and useful"                 "msg" => "Log System Warnning: multiline_codec_max_lines_reached, Your log has exceeded 50kB(51200 chars), it was blocked by log system. Please check your code to make your log info shorter and useful"             }         }     }

使用supervisord管理logstash进程

之前提到, 升级集群后, 使用supervisord统一管理logstash进程,链接

查看当前机器logstash进程top

有时, 需要上机器看看对应采集端所有logstash进程是否存在问题, 常常用到 top 命令, 所以写了个简单的脚本, 配合supervisord的脚本使用

ltop.sh

#!/bin/bash ./logstashd.sh status top -p $(./logstashd.sh status | awk '{print $4}' | awk -F',' '{print $1}' | tr '/n' ',' | sed 's/,$//g')

进程占用cpu检测脚本

#!/bin/bash BASEDIR=$(dirname $0) cd $BASEDIR CURRENT_DIR=`pwd`  exec >> /tmp/log/monitor.log 2>&1 echo "==============================================" date function check() {     PNAME=$1     PID=$2     CPU_USE=$(ps -p $PID -o %cpu | sed -n '2p')     INT_CPU_USE=$(printf "%.0f/n" $CPU_USE)     echo $PNAME" - "$CPU_USE" - "$INT_CPU_USE      if [ $INT_CPU_USE -gt 85 ]     then        echo "$PNAME cpu usage greater than 85%,do restart"        ./logstashd.sh restart $PNAME     fi } export -f check ./logstashd.sh status | awk '{print "-", $1, $4}' | awk -F',' '{print $1}' | xargs -n3 bash -c 'check $@'

数据盘满了导致集群状态yellow

机器节点本身有1T 硬盘, 由两块盘组成, 配置es的时候, 数据分别写到了两个盘上, 然后有一天集群状态告警了

"status" : "yellow",

查看es的日志

[2016-03-21 12:43:45,934][INFO ][cluster.routing.allocation.decider] [node_01] low disk watermark [85%] exceeded on [AvemqKN-RGKy68zJXUapBg][node_01][/data/LogNewData/xxx/nodes/0] free: 75.5gb[14.1%], replicas will not be assigned to this node

处理: 腾磁盘空间出来, es会自动检测恢复

PS: 磁盘大小要预估好

查看redis中队列的堆积

历史遗留问题, 有些节点采集发送到redis的key, 在indexer阶段并没有被消费, 导致越堆越多….

这时候, 可以通过redis查下哪些队列堆积了

bin/redis-cli -h 127.0.0.1 -p 6379 -a blueking_log --bigkeys

需要redis版本支持 bigkeys => This is a "new" feature beginning with 2.8

解析失败丢弃及黑名单实现

grok解析失败, 丢弃

if ("_grokparsefailure" in [tags]) {     drop {} }

有时候, 需要禁止采集某些文件, 但由于 file 类型的 exclude 只能用文件名, 而没有更强大的规则, 所以只能采集进来再丢弃, 此时, 可以根据路径grok解析出关键字, 然后判断丢弃

if ([keyworod] in ["data", "not_exists"]) {     drop {} }

启动限制使用的worker数

默认情况, 有可能把所有cpu跑满, 这时候, 可以专门加下

-w, --pipeline-workers COUNT  Sets the number of pipeline workers to run. (default: 24)  logstash agent -f conf/xxx.conf -w 2

几个简单脚本

health.sh

#!/bin/bash curl 'http://127.0.0.1:9200/_cluster/health?pretty=true'

indices.sh

#!/bin/bash curl 'http://127.0.0.1:9200/_cat/indices?v' | sort -k 3

转载本站任何文章请注明:转载至神刀安全网,谢谢神刀安全网 » ELK维护的一些点(二)

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址