Only send mqtt events for processed files
We were previously sending events for every file we attempted to process, not just those that were processed and also for every single log line event. This effectively doubled the io performed by the logstash workers which seemed to slow the whole pipeline down. Trim it down to only recording events for log files that are processed which should significantly trim down the total number of events. Change-Id: I0daf3eb2e2b3240e3efa4f2c7bac57de99505df0
This commit is contained in:
parent
becc05e0aa
commit
88e0d21347
@ -140,6 +140,7 @@ class LogRetriever(threading.Thread):
|
||||
|
||||
def _handle_event(self):
|
||||
fields = {}
|
||||
log_lines = None
|
||||
source_url = ''
|
||||
job = self.gearman_worker.getJob()
|
||||
try:
|
||||
@ -184,7 +185,8 @@ class LogRetriever(threading.Thread):
|
||||
for f in all_filters:
|
||||
f.close()
|
||||
job.sendWorkComplete()
|
||||
if self.mqtt:
|
||||
# Only send mqtt events for log files we processed.
|
||||
if self.mqtt and log_lines:
|
||||
msg = json.dumps({
|
||||
'build_uuid': fields.get('build_uuid'),
|
||||
'source_url': source_url,
|
||||
@ -301,10 +303,9 @@ class LogRetriever(threading.Thread):
|
||||
|
||||
|
||||
class StdOutLogProcessor(object):
|
||||
def __init__(self, logq, pretty_print=False, mqtt=None):
|
||||
def __init__(self, logq, pretty_print=False):
|
||||
self.logq = logq
|
||||
self.pretty_print = pretty_print
|
||||
self.mqtt = mqtt
|
||||
|
||||
def handle_log_event(self):
|
||||
log = self.logq.get()
|
||||
@ -315,27 +316,16 @@ class StdOutLogProcessor(object):
|
||||
print(json.dumps(log))
|
||||
# Push each log event through to keep logstash up to date.
|
||||
sys.stdout.flush()
|
||||
if self.mqtt:
|
||||
msg = json.dumps({
|
||||
'build_uuid': log.get('build_uuid'),
|
||||
'source_url': log.get('log_url'),
|
||||
'status': 'success',
|
||||
})
|
||||
self.mqtt.publish_single(msg, log.get('project'),
|
||||
log.get('build_change'),
|
||||
'logs_to_logstash',
|
||||
log.get('build_queue'))
|
||||
|
||||
|
||||
class INETLogProcessor(object):
|
||||
socket_type = None
|
||||
|
||||
def __init__(self, logq, host, port, mqtt=None):
|
||||
def __init__(self, logq, host, port):
|
||||
self.logq = logq
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.socket = None
|
||||
self.mqtt = mqtt
|
||||
|
||||
def _connect_socket(self):
|
||||
logging.debug("Creating socket.")
|
||||
@ -348,16 +338,6 @@ class INETLogProcessor(object):
|
||||
if self.socket is None:
|
||||
self._connect_socket()
|
||||
self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
|
||||
if self.mqtt:
|
||||
msg = json.dumps({
|
||||
'build_uuid': log.get('build_uuid'),
|
||||
'source_url': log.get('log_url'),
|
||||
'status': 'success',
|
||||
})
|
||||
self.mqtt.publish_single(msg, log.get('project'),
|
||||
log.get('build_change'),
|
||||
'logs_to_logstash',
|
||||
log.get('build_queue'))
|
||||
except:
|
||||
logging.exception("Exception sending INET event.")
|
||||
# Logstash seems to take about a minute to start again. Wait 90
|
||||
@ -367,15 +347,6 @@ class INETLogProcessor(object):
|
||||
semi_busy_wait(90)
|
||||
self._connect_socket()
|
||||
self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
|
||||
if self.mqtt:
|
||||
msg = json.dumps({
|
||||
'build_uuid': log.get('build_uuid'),
|
||||
'status': 'success',
|
||||
})
|
||||
self.mqtt.publish_single(msg, log.get('project'),
|
||||
log.get('build_change'),
|
||||
'logs_to_logstash',
|
||||
log.get('build_queue'))
|
||||
|
||||
|
||||
class UDPLogProcessor(INETLogProcessor):
|
||||
@ -496,17 +467,15 @@ class Server(object):
|
||||
if self.output_mode == "tcp":
|
||||
self.processor = TCPLogProcessor(self.logqueue,
|
||||
self.output_host,
|
||||
self.output_port,
|
||||
mqtt=self.mqtt)
|
||||
self.output_port)
|
||||
elif self.output_mode == "udp":
|
||||
self.processor = UDPLogProcessor(self.logqueue,
|
||||
self.output_host,
|
||||
self.output_port,
|
||||
mqtt=self.mqtt)
|
||||
self.output_port)
|
||||
else:
|
||||
# Note this processor will not work if the process is run as a
|
||||
# daemon. You must use the --foreground option.
|
||||
self.processor = StdOutLogProcessor(self.logqueue, mqtt=self.mqtt)
|
||||
self.processor = StdOutLogProcessor(self.logqueue)
|
||||
|
||||
def main(self):
|
||||
self.setup_retriever()
|
||||
|
Loading…
x
Reference in New Issue
Block a user