diff --git a/files/log-gearman-worker.py b/files/log-gearman-worker.py index 608cbc7..8e9ef10 100644 --- a/files/log-gearman-worker.py +++ b/files/log-gearman-worker.py @@ -192,7 +192,8 @@ class LogRetriever(threading.Thread): }) self.mqtt.publish_single(msg, fields.get('project'), fields.get('build_change'), - 'retrieve_logs') + 'retrieve_logs', + fields.get('build_queue')) except Exception as e: logging.exception("Exception handling log event.") job.sendWorkException(str(e).encode('utf-8')) @@ -204,7 +205,8 @@ class LogRetriever(threading.Thread): }) self.mqtt.publish_single(msg, fields.get('project'), fields.get('build_change'), - 'retrieve_logs') + 'retrieve_logs', + fields.get('build_queue')) def _retrieve_log(self, source_url, retry): # TODO (clarkb): This should check the content type instead of file @@ -321,7 +323,8 @@ class StdOutLogProcessor(object): }) self.mqtt.publish_single(msg, log.get('project'), log.get('build_change'), - 'logs_to_logstash') + 'logs_to_logstash', + log.get('build_queue')) class INETLogProcessor(object): @@ -353,7 +356,8 @@ class INETLogProcessor(object): }) self.mqtt.publish_single(msg, log.get('project'), log.get('build_change'), - 'logs_to_logstash') + 'logs_to_logstash', + log.get('build_queue')) except: logging.exception("Exception sending INET event.") # Logstash seems to take about a minute to start again. Wait 90 @@ -370,7 +374,8 @@ class INETLogProcessor(object): }) self.mqtt.publish_single(msg, log.get('project'), log.get('build_change'), - 'logs_to_logstash') + 'logs_to_logstash', + log.get('build_queue')) class UDPLogProcessor(INETLogProcessor): @@ -397,8 +402,14 @@ class PushMQTT(object): def _generate_topic(self, project, job_id, action): return '/'.join([self.base_topic, project, job_id, action]) - def publish_single(self, msg, project, job_id, action): - topic = self._generate_topic(project, job_id, action) + def publish_single(self, msg, project, job_id, action, build_queue=None): + if job_id: + topic = self._generate_topic(project, job_id, action) + elif build_queue: + topic = self._generate_topic(project, build_queue, action) + else + topic = self.base_topic + '/' + project + publish.single(topic, msg, hostname=self.hostname, port=self.port, client_id=self.client_id, keepalive=self.keepalive, will=self.will,