Add severity info to logstash and filter out DEBUG lines
This adds severity as a logstash field for every oslo formatted log line, and does not add any lines which are at DEBUG level. This means we no longer rely on the level=INFO query paremeter in order to remove DEBUG lines, so we will avoid sending them to logstash regardless of whether os-loganalyze is used. Change-Id: I8c4ac76a7fa0c3badd82fc7c54959ef6eb052732
This commit is contained in:
parent
8a8693dabd
commit
625bb48d13
@ -76,7 +76,7 @@ class CRM114Filter(object):
|
|||||||
|
|
||||||
def process(self, data):
|
def process(self, data):
|
||||||
if not self.p:
|
if not self.p:
|
||||||
return
|
return True
|
||||||
self.p.stdin.write(data['message'].encode('utf-8') + '\n')
|
self.p.stdin.write(data['message'].encode('utf-8') + '\n')
|
||||||
(r, w, x) = select.select([self.p.stdout], [],
|
(r, w, x) = select.select([self.p.stdout], [],
|
||||||
[self.p.stdin, self.p.stdout], 20)
|
[self.p.stdin, self.p.stdout], 20)
|
||||||
@ -92,6 +92,7 @@ class CRM114Filter(object):
|
|||||||
raise FilterException('Early EOF from CRM114')
|
raise FilterException('Early EOF from CRM114')
|
||||||
r = r.strip()
|
r = r.strip()
|
||||||
data['error_pr'] = float(r)
|
data['error_pr'] = float(r)
|
||||||
|
return True
|
||||||
|
|
||||||
def _catchOSError(self, method):
|
def _catchOSError(self, method):
|
||||||
try:
|
try:
|
||||||
@ -133,6 +134,35 @@ class CRM114FilterFactory(object):
|
|||||||
return CRM114Filter(self.script, path, fields['build_status'])
|
return CRM114Filter(self.script, path, fields['build_status'])
|
||||||
|
|
||||||
|
|
||||||
|
class SeverityFilter(object):
|
||||||
|
DATEFMT = '\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}((\.|\,)\d{3,6})?'
|
||||||
|
SEVERITYFMT = '(DEBUG|INFO|WARNING|ERROR|TRACE|AUDIT|CRITICAL)'
|
||||||
|
OSLO_LOGMATCH = ('^(?P<date>%s)(?P<line>(?P<pid> \d+)? '
|
||||||
|
'(?P<severity>%s).*)' %
|
||||||
|
(DATEFMT, SEVERITYFMT))
|
||||||
|
OSLORE = re.compile(OSLO_LOGMATCH)
|
||||||
|
|
||||||
|
def process(self, data):
|
||||||
|
msg = data['message']
|
||||||
|
m = self.OSLORE.match(msg)
|
||||||
|
if m:
|
||||||
|
data['severity'] = m.group('severity')
|
||||||
|
if data['severity'].lower == 'debug':
|
||||||
|
# Ignore debug-level lines
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class SeverityFilterFactory(object):
|
||||||
|
name = "Severity"
|
||||||
|
|
||||||
|
def create(self, fields):
|
||||||
|
return SeverityFilter()
|
||||||
|
|
||||||
|
|
||||||
class LogRetriever(threading.Thread):
|
class LogRetriever(threading.Thread):
|
||||||
def __init__(self, gearman_worker, filters, logq, mqtt=None):
|
def __init__(self, gearman_worker, filters, logq, mqtt=None):
|
||||||
threading.Thread.__init__(self)
|
threading.Thread.__init__(self)
|
||||||
@ -179,17 +209,22 @@ class LogRetriever(threading.Thread):
|
|||||||
base_event.update(fields)
|
base_event.update(fields)
|
||||||
base_event["tags"] = tags
|
base_event["tags"] = tags
|
||||||
for line in log_lines:
|
for line in log_lines:
|
||||||
|
keep_line = True
|
||||||
out_event = base_event.copy()
|
out_event = base_event.copy()
|
||||||
out_event["message"] = line
|
out_event["message"] = line
|
||||||
new_filters = []
|
new_filters = []
|
||||||
for f in filters:
|
for f in filters:
|
||||||
|
if not keep_line:
|
||||||
|
new_filters.append(f)
|
||||||
|
continue
|
||||||
try:
|
try:
|
||||||
f.process(out_event)
|
keep_line = f.process(out_event)
|
||||||
new_filters.append(f)
|
new_filters.append(f)
|
||||||
except FilterException:
|
except FilterException:
|
||||||
logging.exception("Exception filtering event: "
|
logging.exception("Exception filtering event: "
|
||||||
"%s" % line.encode("utf-8"))
|
"%s" % line.encode("utf-8"))
|
||||||
filters = new_filters
|
filters = new_filters
|
||||||
|
if keep_line:
|
||||||
self.logq.put(out_event)
|
self.logq.put(out_event)
|
||||||
finally:
|
finally:
|
||||||
for f in all_filters:
|
for f in all_filters:
|
||||||
@ -420,6 +455,9 @@ class Server(object):
|
|||||||
self.logqueue = Queue.Queue(16384)
|
self.logqueue = Queue.Queue(16384)
|
||||||
self.processor = None
|
self.processor = None
|
||||||
self.filter_factories = []
|
self.filter_factories = []
|
||||||
|
# Run the severity filter first so it can filter out chatty
|
||||||
|
# logs.
|
||||||
|
self.filter_factories.append(SeverityFilterFactory())
|
||||||
crmscript = self.config.get('crm114-script')
|
crmscript = self.config.get('crm114-script')
|
||||||
crmdata = self.config.get('crm114-data')
|
crmdata = self.config.get('crm114-data')
|
||||||
if crmscript and crmdata:
|
if crmscript and crmdata:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user