Merge "Expose the gerrit watcher as a thread with defined transitions"

This commit is contained in:
Jenkins 2014-03-17 16:29:54 +00:00 committed by Gerrit Code Review
commit 5c85312837

View File

@ -23,8 +23,15 @@ import time
import paramiko import paramiko
CONNECTED = 'connected'
CONNECTING = 'connecting'
CONSUMING = 'consuming'
DEAD = 'dead'
DISCONNECTED = 'disconnected'
IDLE = 'idle'
class GerritWatcher(object):
class GerritWatcher(threading.Thread):
log = logging.getLogger("gerrit.GerritWatcher") log = logging.getLogger("gerrit.GerritWatcher")
def __init__( def __init__(
@ -37,6 +44,7 @@ class GerritWatcher(object):
All other parameters are optional and if not supplied are sourced from All other parameters are optional and if not supplied are sourced from
the gerrit instance. the gerrit instance.
""" """
super(GerritWatcher, self).__init__()
assert retry_delay >= 0, "Retry delay must be >= 0" assert retry_delay >= 0, "Retry delay must be >= 0"
self.username = username or gerrit.username self.username = username or gerrit.username
self.keyfile = keyfile or gerrit.keyfile self.keyfile = keyfile or gerrit.keyfile
@ -45,7 +53,7 @@ class GerritWatcher(object):
self.gerrit = gerrit self.gerrit = gerrit
self.connection_attempts = int(connection_attempts) self.connection_attempts = int(connection_attempts)
self.retry_delay = float(retry_delay) self.retry_delay = float(retry_delay)
self.connected = False self.state = IDLE
def _read(self, fd): def _read(self, fd):
l = fd.readline() l = fd.readline()
@ -122,6 +130,7 @@ class GerritWatcher(object):
"""Consumes events using the given client.""" """Consumes events using the given client."""
stdin, stdout, stderr = client.exec_command("gerrit stream-events") stdin, stdout, stderr = client.exec_command("gerrit stream-events")
self.state = CONSUMING
self._listen(stdout, stderr) self._listen(stdout, stderr)
ret = stdout.channel.recv_exit_status() ret = stdout.channel.recv_exit_status()
@ -132,9 +141,9 @@ class GerritWatcher(object):
" return code %s" % ret) " return code %s" % ret)
def _run(self): def _run(self):
self.connected = False self.state = CONNECTING
client = self._connect() client = self._connect()
self.connected = True self.state = CONNECTED
try: try:
self._consume(client) self._consume(client)
except Exception: except Exception:
@ -145,14 +154,19 @@ class GerritWatcher(object):
client.close() client.close()
except (IOError, paramiko.SSHException): except (IOError, paramiko.SSHException):
self.log.exception("Failure closing broken client") self.log.exception("Failure closing broken client")
self.state = DISCONNECTED
if self.retry_delay > 0: if self.retry_delay > 0:
self.log.info("Delaying consumption retry for %s seconds", self.log.info("Delaying consumption retry for %s seconds",
self.retry_delay) self.retry_delay)
time.sleep(self.retry_delay) time.sleep(self.retry_delay)
def run(self): def run(self):
try:
while True: while True:
self.state = DISCONNECTED
self._run() self._run()
finally:
self.state = DEAD
class Gerrit(object): class Gerrit(object):
@ -171,7 +185,7 @@ class Gerrit(object):
watcher = GerritWatcher(self, watcher = GerritWatcher(self,
connection_attempts=connection_attempts, connection_attempts=connection_attempts,
retry_delay=retry_delay) retry_delay=retry_delay)
self.watcher_thread = threading.Thread(target=watcher.run) self.watcher_thread = watcher
self.watcher_thread.daemon = True self.watcher_thread.daemon = True
self.watcher_thread.start() self.watcher_thread.start()