Skip to content

Commit

Permalink
Thread scheduling improvements (#61)
Browse files Browse the repository at this point in the history
Do not starve other collectors if one collector is pushing too much data
Do not sleep when there is data to be read/sent
  • Loading branch information
rshivane committed Aug 14, 2018
1 parent 704f604 commit 8a7fb93
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 15 deletions.
81 changes: 74 additions & 7 deletions collectors/0/udp_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@

import socket
import sys
import threading
import time

try:
from queue import Queue, Empty, Full
except ImportError:
from Queue import Queue, Empty, Full

from collectors.lib import utils

try:
Expand All @@ -27,6 +34,60 @@
PORT = 8953
SIZE = 8192

MAX_UNFLUSHED_DATA = 8192
MAX_PACKETS_IN_MEMORY = 100

ALIVE = True
FLUSH_BEFORE_EXIT = udp_bridge_conf and udp_bridge_conf.flush_before_exit()


class ReaderQueue(Queue):
    """Bounded queue that drops (rather than blocks) when full."""

    def nput(self, value):
        """Non-blocking put; logs and returns False if the queue is full."""
        try:
            self.put_nowait(value)
            return True
        except Full:
            utils.err("DROPPED LINES [%d] bytes" % len(value))
            return False


class SenderThread(threading.Thread):
    """Drains the reader queue and prints datapoints to stdout.

    Flushes stdout whenever more than MAX_UNFLUSHED_DATA bytes are
    pending or the flush deadline has passed.
    """

    def __init__(self, readerq, flush_delay):
        """readerq: queue of packets to print; flush_delay: seconds between flushes."""
        super(SenderThread, self).__init__()
        self.readerq = readerq
        self.flush_delay = flush_delay

    def run(self):
        global ALIVE
        pending_bytes = 0               # bytes printed since the last flush
        next_flush = int(time.time())
        drained = False                 # True once a queue read times out
        # Keep going while ALIVE; if FLUSH_BEFORE_EXIT is set, also keep
        # draining after shutdown until the queue comes up empty.
        while ALIVE or (FLUSH_BEFORE_EXIT and not drained):
            try:
                packet = self.readerq.get(True, 2)
            except Empty:
                drained = True
            else:
                trace("DEQUEUED")
                print(packet)
                trace("PRINTED")
                pending_bytes += len(packet)
                drained = False

            now = int(time.time())
            if pending_bytes > MAX_UNFLUSHED_DATA or now > next_flush:
                next_flush = now + self.flush_delay
                sys.stdout.flush()
                pending_bytes = 0

        trace("SENDER THREAD EXITING")


def trace(msg):
    """Debug-tracing hook; a no-op unless the err call below is re-enabled."""
    # utils.err(msg)
    return None


def main():
if not (udp_bridge_conf and udp_bridge_conf.enabled()):
Expand Down Expand Up @@ -54,27 +115,33 @@ def removePut(line):
except AttributeError:
flush_delay = 60

flush_timeout = int(time.time())
global ALIVE
readerq = ReaderQueue(MAX_PACKETS_IN_MEMORY)
sender = SenderThread(readerq, flush_delay)
sender.start()

try:
try:
while 1:
while ALIVE:
data, address = sock.recvfrom(SIZE)
if data:
trace("Read packet:")
lines = data.splitlines()
data = '\n'.join(map(removePut, lines))
trace(data)
if not data:
utils.err("invalid data")
break
print(data)
now = int(time.time())
if now > flush_timeout:
sys.stdout.flush()
flush_timeout = now + flush_delay
readerq.nput(data)
trace("ENQUEUED")

except KeyboardInterrupt:
utils.err("keyboard interrupt, exiting")
finally:
ALIVE = False
sock.close()
sender.join()
trace("MAIN THREAD EXITING")


if __name__ == "__main__":
Expand Down
4 changes: 4 additions & 0 deletions collectors/etc/udp_bridge_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@ def flush_delay():

def usetcp():
    """Config flag: TCP mode is disabled in this default config."""
    return False


def flush_before_exit():
    """Config flag: when True, udp_bridge keeps draining its queue on exit; False here."""
    return False
6 changes: 6 additions & 0 deletions mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,19 @@ def __init__(self):
def write(self, outString):
    # Mock write: capture the output line for later assertions instead of emitting it.
    self.lines += [outString]

def flush(self):
    # Mock flush: nothing is buffered, so there is nothing to do.
    return None

class Stdout():
    """Mock stdout that records every write for test assertions."""

    def __init__(self):
        self.lines = []  # captured writes, in order

    def write(self, outString):
        # Record instead of printing.
        self.lines += [outString]

    def flush(self):
        # No buffering in the mock; flush is a no-op.
        return None

class Utils():
def __init__(self):
self.drop_privileges = lambda: None
Expand Down
44 changes: 36 additions & 8 deletions tcollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import sys
import threading
import time
from datetime import datetime
import json
import base64
import zlib
Expand Down Expand Up @@ -322,11 +323,11 @@ def run(self):
# while breaking out every once in a while to setup selects
# on new children.
while ALIVE:
lines_processed = 0
lines_dropped_init = self.lines_dropped
alc = all_living_collectors()
for col in alc:
for line in col.collect():
self.process_line(col, line)
lines_processed += self.process_collector(col)

if self.lines_dropped > lines_dropped_init:
LOG.error("DROPPED LINES: %s", self.lines_dropped - lines_dropped_init)
Expand All @@ -339,9 +340,28 @@ def run(self):
for col in all_collectors():
col.evict_old_keys(now)

# and here is the loop that we really should get rid of, this
# just prevents us from spinning right now
time.sleep(1)
LOG.debug('Reader -> readerq size: %s, lines processed: %s', self.readerq.qsize(), lines_processed)
if lines_processed < 100:
# and here is the loop that we really should get rid of, this
# just prevents us from spinning right now
time.sleep(1)

def process_collector(self, col):
    """Drain output lines from one collector, yielding after ~1s of continuous reads
    so a chatty collector cannot starve the others.

    Returns the number of lines processed from this collector.
    """
    LOG.debug('Processing collector (%s)', col.name)
    started = int(time.time())
    count = 0
    for line in col.collect():
        self.process_line(col, line)
        count += 1

        if count % 1000:
            # Only check the clock once every 1000 lines.
            continue
        now = int(time.time())
        if now - started > 1:
            LOG.info('Yielding reading output from collector: %s (pid=%d) that started at [%s]', col.name,
                     col.proc.pid, datetime.fromtimestamp(started).strftime('%Y-%m-%d %H:%M:%S,%f'))
            break
    return count

def process_line(self, col, line):
"""Parses the given line and appends the result to the reader queue."""
Expand Down Expand Up @@ -546,6 +566,7 @@ def run(self):
own packet."""

errors = 0 # How many uncaught exceptions in a row we got.
send_time = int(time.time())
while ALIVE:
try:
self.maintain_conn()
Expand All @@ -555,7 +576,6 @@ def run(self):
except Empty:
continue
self.sendq.append(line)
time.sleep(5) # Wait for more data
while True:
# prevents self.sendq fast growing in case of sending fails
# in send_data()
Expand All @@ -567,8 +587,14 @@ def run(self):
break
self.sendq.append(line)

if ALIVE:
if not ALIVE:
break

now = int(time.time())
if now - send_time > 5 or len(self.sendq) > MAX_SENDQ_SIZE:
LOG.info('Sender -> readerq size: %s', self.reader.readerq.qsize())
self.send_data()
send_time = now
errors = 0 # We managed to do a successful iteration.
except (ArithmeticError, EOFError, EnvironmentError, HTTPException, LookupError, ValueError) as e:
errors += 1
Expand All @@ -583,6 +609,7 @@ def run(self):
LOG.exception('Uncaught exception in SenderThread, going to exit')
shutdown()
raise
pass

def verify_conn(self):
"""Periodically verify that our connection to the TSD is OK
Expand Down Expand Up @@ -800,6 +827,7 @@ def send_data_via_http(self):
print("Would have sent:\n%s" % json.dumps(metrics,
sort_keys=True,
indent=4))
self.sendq = []
return

if (self.current_tsd == -1) or (len(self.hosts) > 1):
Expand Down Expand Up @@ -1439,7 +1467,7 @@ def spawn_collector(col):
# other logic and it makes no sense to update the last spawn time if the
# collector didn't actually start.
col.lastspawn = int(time.time())
# Without setting last_datapoint here, a long running check (>15s) will be
# Without setting last_datapoint here, a long running check (>15s) will be
# killed by check_children() the first time check_children is called.
col.last_datapoint = col.lastspawn
set_nonblocking(col.proc.stdout.fileno())
Expand Down
1 change: 1 addition & 0 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def setUp(self):
self.udp_globals['socket'] = mocks.Socket()
self.udp_globals['sys'] = mocks.Sys()
self.udp_globals['udp_bridge_conf'].enabled = lambda: True
self.udp_globals['udp_bridge_conf'].flush_before_exit = lambda: True
self.udp_globals['utils'] = mocks.Utils()

def exec_script(self):
Expand Down

0 comments on commit 8a7fb93

Please sign in to comment.