 a7026aba8a
			
		
	
	a7026aba8a
	
	
	
		
			
			This adds optional SSL support to zookeeper-statsd. This could come in handy if we ever decide to turn off the plaintext localhost-only port. This also corrects the type handling for the latency value, which can be a floating point. Change-Id: Id39fc8bd924eda528723c40d2e7e24993a60d6a5
		
			
				
	
	
		
			168 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			168 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| 
 | |
| # Copyright (C) 2015 Hewlett-Packard Development Company, L.P.
 | |
| # Copyright (C) 2021 Acme Gating, LLC
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #    http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 | |
| # implied.
 | |
| #
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| 
 | |
| import logging
 | |
| import re
 | |
| import socket
 | |
| import time
 | |
| import os
 | |
| import ssl
 | |
| 
 | |
| from statsd.defaults.env import statsd
 | |
| 
 | |
| INTERVAL = 10
 | |
| GAUGES = [
 | |
|     'zk_avg_latency',
 | |
|     'zk_min_latency',
 | |
|     'zk_max_latency',
 | |
|     'zk_outstanding_requests',
 | |
|     'zk_znode_count',
 | |
|     'zk_followers',
 | |
|     'zk_synced_followers',
 | |
|     'zk_pending_syncs',
 | |
|     'zk_watch_count',
 | |
|     'zk_ephemerals_count',
 | |
|     'zk_approximate_data_size',
 | |
|     'zk_open_file_descriptor_count',
 | |
|     'zk_max_file_descriptor_count',
 | |
| ]
 | |
| 
 | |
| COUNTERS = [
 | |
|     'zk_packets_received',
 | |
|     'zk_packets_sent',
 | |
| ]
 | |
| 
 | |
| 
 | |
| class Socket:
 | |
|     def __init__(self, host, port, ca_cert, client_cert, client_key):
 | |
|         self.host = host
 | |
|         self.port = port
 | |
|         self.ca_cert = ca_cert
 | |
|         self.client_cert = client_cert
 | |
|         self.client_key = client_key
 | |
|         self.socket = None
 | |
| 
 | |
|     def open(self):
 | |
|         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | |
|         s.settimeout(5)
 | |
|         if self.client_key:
 | |
|             context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
 | |
|             context.load_verify_locations(self.ca_cert)
 | |
|             context.load_cert_chain(self.client_cert, self.client_key)
 | |
|             context.check_hostname = False
 | |
|             s = context.wrap_socket(s, server_hostname=self.host)
 | |
|         s.connect((self.host, self.port))
 | |
|         self.socket = s
 | |
| 
 | |
|     def __enter__(self):
 | |
|         self.open()
 | |
|         return self.socket
 | |
| 
 | |
|     def __exit__(self, etype, value, tb):
 | |
|         self.socket.close()
 | |
|         self.socket = None
 | |
| 
 | |
| 
 | |
| class ZooKeeperStats:
 | |
|     def __init__(self, host, port=None,
 | |
|                  ca_cert=None, client_cert=None, client_key=None):
 | |
|         if client_key:
 | |
|             port = port or 2281
 | |
|         else:
 | |
|             port = port or 2181
 | |
|         self.socket = Socket(host, port, ca_cert, client_cert, client_key)
 | |
|         # The hostname to use when reporting stats (e.g., zk01)
 | |
|         if host in ('localhost', '127.0.0.1', '::1'):
 | |
|             self.hostname = socket.gethostname()
 | |
|         else:
 | |
|             self.hostname = host
 | |
|         self.log = logging.getLogger("ZooKeeperStats")
 | |
|         self.prevdata = {}
 | |
| 
 | |
|     def command(self, command):
 | |
|         with self.socket as socket:
 | |
|             socket.send((command + '\n').encode('utf8'))
 | |
|             data = ''
 | |
|             while True:
 | |
|                 r = socket.recv(4096)
 | |
|                 data += r.decode('utf8')
 | |
|                 if not r:
 | |
|                     break
 | |
|             return data
 | |
| 
 | |
|     def getStats(self):
 | |
|         data = self.command('mntr')
 | |
|         lines = data.split('\n')
 | |
|         ret = []
 | |
|         for line in lines:
 | |
|             if not line:
 | |
|                 continue
 | |
|             if '\t' not in line:
 | |
|                 continue
 | |
|             key, value = line.split('\t')
 | |
|             ret.append((key, value))
 | |
|         return dict(ret)
 | |
| 
 | |
|     def reportStats(self, stats):
 | |
|         pipe = statsd.pipeline()
 | |
|         base = 'zk.%s.' % (self.hostname,)
 | |
|         for key in GAUGES:
 | |
|             try:
 | |
|                 value = stats.get(key, '0')
 | |
|                 if '.' in value:
 | |
|                     value = float(value)
 | |
|                 else:
 | |
|                     value = int(value)
 | |
|                 pipe.gauge(base + key, value)
 | |
|             except Exception:
 | |
|                 self.log.exception("Unable to process %s", key)
 | |
|         for key in COUNTERS:
 | |
|             try:
 | |
|                 newvalue = int(stats.get(key, 0))
 | |
|                 oldvalue = self.prevdata.get(key)
 | |
|                 if oldvalue is not None:
 | |
|                     value = newvalue - oldvalue
 | |
|                     pipe.incr(base + key, value)
 | |
|                 self.prevdata[key] = newvalue
 | |
|             except Exception:
 | |
|                 self.log.exception("Unable to process %s", key)
 | |
|         pipe.send()
 | |
| 
 | |
|     def run(self):
 | |
|         while True:
 | |
|             try:
 | |
|                 self._run()
 | |
|             except Exception:
 | |
|                 self.log.exception("Exception in main loop:")
 | |
| 
 | |
|     def _run(self):
 | |
|         time.sleep(INTERVAL)
 | |
|         stats = self.getStats()
 | |
|         self.reportStats(stats)
 | |
| 
 | |
| 
 | |
| ca_cert = os.environ.get("ZK_CA_CERT")
 | |
| client_cert = os.environ.get("ZK_CLIENT_CERT")
 | |
| client_key = os.environ.get("ZK_CLIENT_KEY")
 | |
| 
 | |
| logging.basicConfig(level=logging.DEBUG)
 | |
| p = ZooKeeperStats('localhost', ca_cert=ca_cert,
 | |
|                    client_cert=client_cert, client_key=client_key)
 | |
| p.run()
 |