Update curator to better metric storage

Now that the roll-up has been implemented the original shrink method is
no longer required or useful. This change cleans up things up.

Change-Id: I24fd5b4daafc2f48ee5a3421f6b58b157a7aff6c
Signed-off-by: Kevin Carter <kevin.carter@rackspace.com>
This commit is contained in:
Kevin Carter 2018-08-13 22:00:44 -05:00 committed by Kevin Carter (cloudnull)
parent 5f2fb9b022
commit 6da0fca375
7 changed files with 150 additions and 107 deletions

View File

@ -32,5 +32,5 @@
state: "started" state: "started"
options: options:
OnBootSec: 30min OnBootSec: 30min
OnUnitActiveSec: 24h OnUnitActiveSec: 6h
Persistent: true Persistent: true

View File

@ -17,7 +17,7 @@
cron: cron:
name: "Run curator" name: "Run curator"
minute: 0 minute: 0
hour: 2 hour: */6
user: "curator" user: "curator"
job: "/opt/elasticsearch-curator/bin/curator --config /var/lib/curator/curator.yml /var/lib/curator/actions.yml" job: "/opt/elasticsearch-curator/bin/curator --config /var/lib/curator/curator.yml /var/lib/curator/actions.yml"
cron_file: "elasticsearch-curator" cron_file: "elasticsearch-curator"

View File

@ -17,11 +17,16 @@
{# Delete index loop #} {# Delete index loop #}
{% for key in elastic_beat_retention_policy_keys -%} {% for key in elastic_beat_retention_policy_keys -%}
{% set delete_indices = {} -%} {% set delete_indices = {} -%}
{% set index_retention = hostvars[inventory_hostname]['elastic_' + key + '_retention'] -%} {# Total retention size in days #}
{% set _index_retention = hostvars[inventory_hostname]['elastic_' + key + '_retention'] -%}
{% set index_retention = ((_index_retention | int) > 0) | ternary(_index_retention, 1) | int %}
{# Total retention size in gigabytes #}
{% set _index_size = ((hostvars[inventory_hostname]['elastic_' + key + '_size'] | int) // 1024) -%}
{% set index_size = ((_index_size | int) > 0) | ternary(_index_size, 1) | int %}
{% set _ = delete_indices.update( {% set _ = delete_indices.update(
{ {
'action': 'delete_indices', 'action': 'delete_indices',
'description': 'Prune indices for ' + key + ' after ' ~ (index_retention | int) ~ ' days.', 'description': 'Prune indices for ' + key + ' after ' ~ index_retention ~ ' days or index is > ' ~ index_size ~ 'gb',
'options': { 'options': {
'ignore_empty_list': true, 'ignore_empty_list': true,
'disable_action': false 'disable_action': false
@ -29,7 +34,6 @@
} }
) )
-%} -%}
{# add the filter loop #}
{% set filters = [] -%} {% set filters = [] -%}
{% set _ = filters.append( {% set _ = filters.append(
{ {
@ -39,6 +43,15 @@
} }
) )
-%} -%}
{% set _ = filters.append(
{
'filtertype': 'space',
'disk_space': index_size,
'use_age': true,
'source': 'creation_date'
}
)
-%}
{% set _ = filters.append( {% set _ = filters.append(
{ {
'filtertype': 'age', 'filtertype': 'age',
@ -46,72 +59,12 @@
'direction': 'older', 'direction': 'older',
'timestring': '%Y.%m.%d', 'timestring': '%Y.%m.%d',
'unit': 'days', 'unit': 'days',
'unit_count': (index_retention | int) 'unit_count': index_retention
} }
) )
-%} -%}
{% set _ = delete_indices.update({'filters': filters}) -%} {% set _ = delete_indices.update({'filters': filters}) -%}
{% set _ = action_items.append(delete_indices) -%} {% set _ = action_items.append(delete_indices) -%}
{# Set shrink curator options #}
{% set shrink_indices = {} -%}
{% set _ = shrink_indices.update(
{
'action': 'shrink',
'description': 'Shrink ' + key + ' indices older than ' ~ (index_retention | int) // 4 ~ ' days',
'options': {
"disable_action": false,
"ignore_empty_list": true,
"shrink_node": "DETERMINISTIC",
"node_filters": {
"permit_masters": ((master_nodes | length) < (data_nodes | length)) | ternary(true, false),
"exclude_nodes": (groups['kibana'] | map('extract', hostvars, 'ansible_host') | list)
},
"number_of_shards": 1,
"number_of_replicas": (elasticsearch_number_of_replicas | int),
"shrink_suffix": '-shrink',
"copy_aliases": true,
"delete_after": true,
"post_allocation": {
"allocation_type": "include",
"key": "node_tag",
"value": "cold"
},
"wait_for_active_shards": 1,
"extra_settings": {
"settings": {
"index.codec": "best_compression"
}
},
"wait_for_completion": true,
"wait_for_rebalance": true,
"wait_interval": 9,
"max_wait": -1
}
}
)
-%}
{% set filters = [] -%}
{% set _ = filters.append(
{
'filtertype': 'pattern',
'kind': 'prefix',
'value': key + '-'
}
)
-%}
{% set _ = filters.append(
{
'filtertype': 'age',
'source': 'creation_date',
'direction': 'older',
'unit': 'days',
'unit_count': (index_retention | int) // 4
}
)
-%}
{% set _ = shrink_indices.update({'filters': filters}) -%}
{% set _ = action_items.append(shrink_indices) -%}
{% endfor -%} {% endfor -%}
{% set actions = {} -%} {% set actions = {} -%}

View File

@ -16,7 +16,9 @@
elastic_index_retention_algorithm: default elastic_index_retention_algorithm: default
### Elastic curator variables ### Elastic curator variables
## Default retention policy options. All retention options are in days. ## If any of these retention policy option are undefined a dynamic fact will be
## generated.
## These options are all in days.
# elastic_logstash_retention: 1 # elastic_logstash_retention: 1
# elastic_apm_retention: 1 # elastic_apm_retention: 1
# elastic_auditbeat_retention: 1 # elastic_auditbeat_retention: 1
@ -26,25 +28,69 @@ elastic_index_retention_algorithm: default
# elastic_metricbeat_retention: 1 # elastic_metricbeat_retention: 1
# elastic_packetbeat_retention: 1 # elastic_packetbeat_retention: 1
# This is used to calculate the storage a beat could generate per node, per day. ## These options are all in megabytes.
# This constant is used as a multiplier. If the expected storage is larger than # elastic_logstash_size: 1024
# the actual available storage after the buffer is calculated the multiplier # elastic_apm_size: 1024
# will be doubled there-by cutting the potential storage days in half. # elastic_auditbeat_size: 1024
elastic_beat_storage_constant: 512 # elastic_filebeat_size: 1024
# elastic_heartbeat_size: 1024
# elastic_journalbeat_size: 1024
# elastic_metricbeat_size: 1024
# elastic_packetbeat_size: 1024
## WHen a static retention policy option is not defined these options will be
## used for dynamic fact generation.
##
## Facts will be generated for the general retention using the total available
## storage from the ES data nodes, subtracting 25%. Using the weights, each
## index will be given a percentage of the total available storage. Indexes with
## higher weights are expected to use more storage. The list of hosts in a given
## index will be used to determine the number of days data can exist within an
## index before it's pruned.
## Example:
# es cluster has 4TiB of storage
# filebeat is deployed to 100 hosts
# filebeat has a weight of 10
# metricbeat is deployed to 125 hosts
# metricbeat has a weight of 2
#
# es storage in MiB: 4194304
# hosts and weighting total: (100 + 125) x (10 + 2) = 2700
# filebeat pct: (100 x 10) / 2700 = 0.37
# filebeat storage allowed: 0.37 * 4194304 = 1551892.48 MiB
# filebeat days allowed: 1551892.48 / (100 * 1024) = 15.1552 Days
# filebeat result: 15 days of retention or 1.5TiB of storage, whatever comes first
# metricbeat pct: (125 x 2) / 2700 = 0.09
# metricbeat storage allowed: 0.09 * 4194304 = 377487.36 MiB
# metricbeat days allowed: 377487.36 / (125 * 1024) = 2.94912 Days
# metricbeat result: 2 days of retention or 38GiB of storage, whatever comes first
## If any retention policy option is undefined a dynamic fact will be generated.
## Fact will be generated for the general retention using the storage constant
## per node, per index, where a given collector is expected to be deployed. The
## equation used will take the total available storage from the ES data nodes
## subtract 25% divided by the total number of data nodes. That is then divided
## by number of hosts assumed to be a beat target which is multiplied by the
## storage constant.
elastic_beat_retention_policy_hosts: elastic_beat_retention_policy_hosts:
logstash: "{{ groups['elastic-logstash'] | default([null]) | length }}" logstash:
apm: "{{ groups['apm-server'] | default([null]) | length }}" weight: 1
auditbeat: "{{ (groups['hosts'] | default([null]) | length) * 2 }}" hosts: "{{ groups['elastic-logstash'] | default([]) }}"
filebeat: "{{ (groups['hosts'] | default([null]) | length) * 2 }}" apm:
heartbeat: "{{ groups['kibana'][:3] | default([null]) | length }}" weight: 1
journalbeat: "{{ (groups['all'] | default([null]) | length) * 1.5 }}" hosts: "{{ groups['apm-server'] | default([]) }}"
metricbeat: "{{ (groups['all'] | default([null]) | length) * 1.5 }}" auditbeat:
packetbeat: "{{ (groups['hosts'] | default([null]) | length) * 5 }}" weight: 10
hosts: "{{ groups['hosts'] | default([]) }}"
filebeat:
weight: 10
hosts: "{{ groups['hosts'] | default([]) }}"
syslog:
weight: 1
hosts: "{{ groups['hosts'] | default([]) }}"
heartbeat:
weight: 1
hosts: "{{ groups['kibana'][:3] | default([]) }}"
journalbeat:
weight: 3
hosts: "{{ groups['all'] | default([]) }}"
metricbeat:
weight: 2
hosts: "{{ groups['all'] | default([]) }}"
packetbeat:
weight: 1
hosts: "{{ groups['hosts'] | default([]) }}"

View File

@ -18,21 +18,32 @@
url: "http://{{ coordination_nodes[0] }}/_nodes/{{ (data_nodes | map('extract', hostvars, 'ansible_host') | list) | join(',') }}/stats/fs" url: "http://{{ coordination_nodes[0] }}/_nodes/{{ (data_nodes | map('extract', hostvars, 'ansible_host') | list) | join(',') }}/stats/fs"
method: GET method: GET
register: elk_data register: elk_data
until: elk_data is success until:
- elk_data is success and elk_data['json'] is defined
retries: 5 retries: 5
delay: 5 delay: 10
run_once: true run_once: true
- name: Load data node variables - name: Set retention keys fact
set_fact:
es_storage_json: "{{ elk_data['json'] }}"
- name: Load retention algo variables
include_vars: "calculate_index_retention_{{ elastic_index_retention_algorithm }}.yml" include_vars: "calculate_index_retention_{{ elastic_index_retention_algorithm }}.yml"
tags: tags:
- always - always
- name: Set retention facts - name: Set retention facts (mb size)
set_fact: "elastic_{{ item.key }}_retention={{ (es_assumed_usable_storage_per_node | int) // ((item.value | int) * (es_storage_multiplier | int)) }}" set_fact: "elastic_{{ item.key }}_size={{ item.value }}"
when: when:
- hostvars[inventory_hostname]["elastic_" + item.key + "_retention"] is undefined - hostvars[inventory_hostname]["elastic_" ~ item.key ~ "_size"] is undefined
with_dict: "{{ elastic_beat_retention_policy_hosts }}" with_dict: "{{ es_storage_per_index }}"
- name: Set retention facts (days)
set_fact: "elastic_{{ item.key }}_retention={{ item.value }}"
when:
- hostvars[inventory_hostname]["elastic_" ~ item.key ~ "_retention"] is undefined
with_dict: "{{ es_days_per_index }}"
- name: Set retention keys fact - name: Set retention keys fact
set_fact: set_fact:

View File

@ -13,18 +13,46 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# Set available storage fact # Set available storage fact. This tasks the total amount of storage found
es_total_available_storage: "{{ ((elk_data['json']['nodes'].values() | list) | map(attribute='fs.total.total_in_bytes') | list | sum) // 1024 // 1024 }}" # within the data nodes of the elasticsearch cluster and converts bytes to
# megabytes.
es_total_available_storage: "{{ ((es_storage_json['nodes'].values() | list) | map(attribute='fs.total.total_in_bytes') | list | sum) // 1024 // 1024 }}"
# Set assumed buffer storage fact # Set assumed buffer storage fact. This will result in 25% of the total
# available storage.
es_assumed_buffer_storage: "{{ ((es_total_available_storage | int) * 0.25) | round | int }}" es_assumed_buffer_storage: "{{ ((es_total_available_storage | int) * 0.25) | round | int }}"
# Set usable buffer storage fact(s) # Set usable buffer storage fact(s). This is the toal storage minus the buffer.
es_usable_buffer_storage: "{{ (es_total_available_storage | int) - (es_assumed_buffer_storage | int) }}" es_usable_buffer_storage: "{{ (es_total_available_storage | int) - (es_assumed_buffer_storage | int) }}"
es_expected_storage: "{{ ((elastic_beat_retention_policy_hosts.values() | map('int') | list) | sum) * (elastic_beat_storage_constant | int) }}"
# Set buffer storage fact # This function will take the sum total of all hosts in the retention policy
es_assumed_usable_storage_per_node: "{{ (es_usable_buffer_storage | int) // (data_nodes | length | int) }}" # after weighting. Once the policy is set the sum total will be carved up into
# individual percentages of the total amount of usable storage after the buffer
# is calculated.
es_storage_per_index: |-
{%- set es_hash = {} %}
{%- set total_weight = (elastic_beat_retention_policy_hosts.values() | list | map(attribute='weight') | list | sum) %}
{%- set host_count = (elastic_beat_retention_policy_hosts.values() | list | map(attribute='hosts') | list | map('flatten') | list | length) %}
{%- set total_values = (total_weight | int) * (host_count | int) %}
{%- for key, value in elastic_beat_retention_policy_hosts.items() %}
{%- set value_pct = (((value.weight | int) * (value.hosts | length)) / (total_values | int)) %}
{%- set value_total = ((value_pct | float) * (es_usable_buffer_storage | int)) %}
{%- set _ = es_hash.__setitem__(key, value_total | int) %}
{%- endfor %}
{{ es_hash }}
# Set storage the mulitplier # The assumed number of days an index will be retained is based on the size of
es_storage_multiplier: "{{ ((es_usable_buffer_storage | int) < (es_expected_storage | int)) | ternary(((elastic_beat_storage_constant | int) * 2), elastic_beat_storage_constant | int) }}" # the given index. With the sizes all figured out in the function above this
# function will divide each retention size be a constant of 1024 and the number
# of hosts within a given collector segment.
es_days_per_index: |-
{%- set es_hash = {} %}
{%- for key, value in elastic_beat_retention_policy_hosts.items() %}
{%- if (es_storage_per_index[key] | int) > 0 %}
{%- set value_days = ((es_storage_per_index[key] | int) // ((value.hosts | length) * 1024)) %}
{%- set _ = es_hash.__setitem__(key, ((value_days | int) > 0) | ternary(value_days, 1) ) %}
{%- else %}
{%- set _ = es_hash.__setitem__(key, 1) %}
{%- endif %}
{%- endfor %}
{{ es_hash }}

View File

@ -37,16 +37,21 @@
- name: Create rollup block - name: Create rollup block
block: block:
- name: Set retention days fact - name: Set min retention days fact
set_fact: set_fact:
days_until_rollup: |- min_days_until_rollup: |-
{% set index_retention = [] %} {% set index_retention = [] %}
{% for item in ansible_play_hosts %} {% for item in ansible_play_hosts %}
{% set _ = index_retention.append((hostvars[item]['elastic_' + index_name + '_retention'] | int) // 3) %} {% set _ = index_retention.append(hostvars[item]['elastic_' + index_name + '_retention'] | int) %}
{% endfor %} {% endfor %}
{{ index_retention | min }} {{ index_retention | min }}
run_once: true run_once: true
- name: Set retention days fact
set_fact:
days_until_rollup: "{{ ((min_days_until_rollup | int) > 1) | ternary(((min_days_until_rollup | int) - 1), min_days_until_rollup) }}"
run_once: true
- name: Create rollup job - name: Create rollup job
uri: uri:
url: "{{ item.url }}" url: "{{ item.url }}"
@ -59,7 +64,7 @@
retries: 5 retries: 5
delay: 5 delay: 5
when: when:
- hostvars[inventory_hostname]['elastic_' + index_name + '_retention'] > days_until_rollup - (days_until_rollup | int) > 0
with_items: with_items:
- url: "http://{{ coordination_nodes[0] }}/_xpack/rollup/job/rollup_{{ index_name }}/_stop" - url: "http://{{ coordination_nodes[0] }}/_xpack/rollup/job/rollup_{{ index_name }}/_stop"
method: POST method: POST