Merge "Serve all paused handlers before unpausing"
This commit is contained in:
commit
d73d4da72c
@ -72,7 +72,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
self.pool_name = pool_name
|
||||
self.running = False
|
||||
self.stop_event = threading.Event()
|
||||
self.paused_handler = None
|
||||
self.paused_handlers = set()
|
||||
self.request_handlers = []
|
||||
self.watermark_sleep = nodepool.watermark_sleep
|
||||
self.zk = self.getZK()
|
||||
@ -219,7 +219,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
rh = pm.getRequestHandler(self, req)
|
||||
reasons_to_decline = rh.getDeclinedReasons()
|
||||
|
||||
if self.paused_handler and not reasons_to_decline:
|
||||
if self.paused_handlers and not reasons_to_decline:
|
||||
self.log.debug("Handler is paused, deferring request")
|
||||
continue
|
||||
|
||||
@ -275,7 +275,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
label_quota[node_type] -= 1
|
||||
|
||||
if rh.paused:
|
||||
self.paused_handler = rh
|
||||
self.paused_handlers.add(rh)
|
||||
self.request_handlers.append(rh)
|
||||
|
||||
# if we exceeded the timeout stop iterating here
|
||||
@ -295,7 +295,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
if not r.poll():
|
||||
active_handlers.append(r)
|
||||
if r.paused:
|
||||
self.paused_handler = r
|
||||
self.paused_handlers.add(r)
|
||||
else:
|
||||
log.debug("Removing request handler")
|
||||
except kze.SessionExpiredError:
|
||||
@ -435,17 +435,19 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
self.updateTenantLimits(
|
||||
self.nodepool.config.tenant_resource_limits)
|
||||
|
||||
if not self.paused_handler:
|
||||
self.component_info.paused = False
|
||||
else:
|
||||
if self.paused_handlers:
|
||||
self.component_info.paused = True
|
||||
# If we are paused, one request handler could not
|
||||
# If we are paused, some request handlers could not
|
||||
# satisfy their assigned requests, so give them
|
||||
# another shot. Unpause ourselves if it completed.
|
||||
self.paused_handler.run()
|
||||
if not self.paused_handler.paused:
|
||||
self.paused_handler = None
|
||||
self.component_info.paused = False
|
||||
# another shot. Unpause ourselves if all are completed.
|
||||
for rh in sorted(self.paused_handlers,
|
||||
key=lambda h: h.request.priority):
|
||||
rh.run()
|
||||
if not rh.paused:
|
||||
self.paused_handlers.remove(rh)
|
||||
|
||||
if not self.paused_handlers:
|
||||
self.component_info.paused = False
|
||||
|
||||
# Regardless of whether we are paused, run
|
||||
# assignHandlers. It will only accept requests if we
|
||||
@ -463,8 +465,9 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
self.stop_event.wait(self.watermark_sleep)
|
||||
|
||||
# Cleanup on exit
|
||||
if self.paused_handler:
|
||||
self.paused_handler.unlockNodeSet(clear_allocation=True)
|
||||
if self.paused_handlers:
|
||||
for rh in self.paused_handlers:
|
||||
rh.unlockNodeSet(clear_allocation=True)
|
||||
|
||||
def stop(self):
|
||||
'''
|
||||
|
@ -271,7 +271,7 @@ class TestDriverAws(tests.DBTestCase):
|
||||
# second request.
|
||||
pool_worker = pool.getPoolWorkers('ec2-us-west-2')
|
||||
for _ in iterate_timeout(30, Exception, 'paused handler'):
|
||||
if pool_worker[0].paused_handler:
|
||||
if pool_worker[0].paused_handlers:
|
||||
break
|
||||
|
||||
# Release the first node so that the second can be fulfilled.
|
||||
@ -326,7 +326,7 @@ class TestDriverAws(tests.DBTestCase):
|
||||
# second request.
|
||||
pool_worker = pool.getPoolWorkers('ec2-us-west-2')
|
||||
for _ in iterate_timeout(30, Exception, 'paused handler'):
|
||||
if pool_worker[0].paused_handler:
|
||||
if pool_worker[0].paused_handlers:
|
||||
break
|
||||
|
||||
# Release the first node so that the second can be fulfilled.
|
||||
|
@ -290,7 +290,7 @@ class TestDriverKubernetes(tests.DBTestCase):
|
||||
if pause:
|
||||
# The previous request should pause the handler
|
||||
pool_worker = pool.getPoolWorkers('kubespray')
|
||||
while not pool_worker[0].paused_handler:
|
||||
while not pool_worker[0].paused_handlers:
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
self.waitForNodeRequest(max_req, (zk.REQUESTED,))
|
||||
|
@ -235,7 +235,7 @@ class TestLauncher(tests.DBTestCase):
|
||||
# Wait until there is a paused request handler and check if there
|
||||
# are exactly two servers
|
||||
pool_worker = pool.getPoolWorkers('fake-provider')
|
||||
while not pool_worker[0].paused_handler:
|
||||
while not pool_worker[0].paused_handlers:
|
||||
time.sleep(0.1)
|
||||
self.assertEqual(len(client._server_list), 2)
|
||||
|
||||
@ -512,7 +512,7 @@ class TestLauncher(tests.DBTestCase):
|
||||
self.zk.storeNodeRequest(req2)
|
||||
|
||||
pool_worker = pool.getPoolWorkers('fake-provider')
|
||||
while not pool_worker[0].paused_handler:
|
||||
while not pool_worker[0].paused_handlers:
|
||||
time.sleep(0.1)
|
||||
|
||||
# The handler is paused now and the request should be in state PENDING
|
||||
@ -2080,7 +2080,7 @@ class TestLauncher(tests.DBTestCase):
|
||||
# causing request handling to pause.
|
||||
self.waitForNodeRequest(req, (zk.PENDING,))
|
||||
pool_worker = pool.getPoolWorkers('fake-provider')
|
||||
while not pool_worker[0].paused_handler:
|
||||
while not pool_worker[0].paused_handlers:
|
||||
time.sleep(0.1)
|
||||
self.assertTrue(mock_invalidatequotacache.called)
|
||||
|
||||
@ -2218,11 +2218,12 @@ class TestLauncher(tests.DBTestCase):
|
||||
|
||||
# Force an exception within the run handler.
|
||||
pool_worker = pool.getPoolWorkers('fake-provider')
|
||||
while not pool_worker[0].paused_handler:
|
||||
while not pool_worker[0].paused_handlers:
|
||||
time.sleep(0.1)
|
||||
pool_worker[0].paused_handler.hasProviderQuota = mock.Mock(
|
||||
side_effect=Exception('mock exception')
|
||||
)
|
||||
for rh in pool_worker[0].paused_handlers:
|
||||
rh.hasProviderQuota = mock.Mock(
|
||||
side_effect=Exception('mock exception')
|
||||
)
|
||||
|
||||
# The above exception should cause us to fail the paused request.
|
||||
req2 = self.waitForNodeRequest(req2, (zk.FAILED,))
|
||||
@ -2230,7 +2231,7 @@ class TestLauncher(tests.DBTestCase):
|
||||
|
||||
# The exception handling should make sure that we unpause AND remove
|
||||
# the request handler.
|
||||
while pool_worker[0].paused_handler:
|
||||
while pool_worker[0].paused_handlers:
|
||||
time.sleep(0.1)
|
||||
self.assertEqual(0, len(pool_worker[0].request_handlers))
|
||||
|
||||
@ -2321,7 +2322,7 @@ class TestLauncher(tests.DBTestCase):
|
||||
self.zk.storeNodeRequest(req2)
|
||||
|
||||
pool_worker = pool.getPoolWorkers('fake-provider')
|
||||
while not pool_worker[0].paused_handler:
|
||||
while not pool_worker[0].paused_handlers:
|
||||
time.sleep(0.1)
|
||||
|
||||
# The handler is paused now and the request should be in state PENDING
|
||||
@ -2413,6 +2414,60 @@ class TestLauncher(tests.DBTestCase):
|
||||
req = self.waitForNodeRequest(req)
|
||||
self.assertEqual(req.state, zk.FULFILLED)
|
||||
|
||||
def test_multiple_paused_requests(self):
    """Test that multiple paused requests are fulfilled in order.

    Two requests for the same label are submitted while the fake cloud
    reports an instance quota of zero, so both handlers end up paused.
    The quota is then raised to one instance: the first request must be
    fulfilled while the second stays pending, and the second must be
    fulfilled once the first request's node has been released and
    deleted.
    """
    max_instances = 0

    def fake_get_quota():
        # The closure reads ``max_instances`` at call time, so rebinding
        # it further down changes the quota the fake cloud reports.
        return (100, max_instances, 1000000)

    self.useFixture(fixtures.MockPatchObject(
        fakeprovider.FakeProvider.fake_cloud, '_get_quota',
        fake_get_quota
    ))

    req1 = zk.NodeRequest()
    req1.state = zk.REQUESTED
    req1.node_types.append('fake-label')
    self.zk.storeNodeRequest(req1)

    req2 = zk.NodeRequest()
    req2.state = zk.REQUESTED
    req2.node_types.append('fake-label')
    self.zk.storeNodeRequest(req2)

    configfile = self.setup_config('ignore_provider_quota_true.yaml')
    self.useBuilder(configfile)
    self.waitForImage('fake-provider', 'fake-image')

    pool = self.useNodepool(configfile, watermark_sleep=1)
    pool.start()

    self.waitForAnyNodeInState(zk.ABORTED)

    # Wait until both request handlers are paused.
    pool_worker = pool.getPoolWorkers('fake-provider')
    while len(pool_worker[0].paused_handlers) != 2:
        time.sleep(0.1)

    # Bump up the quota to allow the provider to allocate a node
    max_instances = 1
    req1 = self.waitForNodeRequest(req1)
    self.assertEqual(req1.state, zk.FULFILLED)

    # Pass the expected states as a tuple, like the other callers of
    # waitForNodeRequest, rather than as a bare state string.
    req2 = self.waitForNodeRequest(req2, (zk.PENDING,))

    # Release the node allocated to the first request
    req1_node = self.zk.getNode(req1.nodes[0])
    self.zk.lockNode(req1_node, blocking=False)

    req1_node.state = zk.USED
    self.zk.storeNode(req1_node)
    self.zk.unlockNode(req1_node)
    self.waitForNodeDeletion(req1_node)

    self.waitForNodeRequest(req2, (zk.FULFILLED,))
|
||||
|
||||
def test_request_order(self):
|
||||
"""Test that requests are handled in sorted order"""
|
||||
configfile = self.setup_config('node_no_min_ready.yaml')
|
||||
|
Loading…
x
Reference in New Issue
Block a user