- Made changes so that we verify each group of exists messages with

the same owner and raw_id separately. This ensures that if there are
  multiple exists notifications for each owner pending verification,
  we send an individual verified notification for each of them.

- Fixed race condition which was causing multiple verified messages to be
  sent by starting the async verify process per owner instead of per image.
This commit is contained in:
Anuj Mathur 2013-10-15 19:20:42 +05:30
parent 6e394705e5
commit ceb7908403
4 changed files with 103 additions and 144 deletions

View File

@ -486,10 +486,19 @@ class ImageExists(models.Model):
self.status = new_status
@staticmethod
def find(ending_max, status):
def find_and_group_by_owner_and_raw_id(ending_max, status):
params = {'audit_period_ending__lte': dt.dt_to_decimal(ending_max),
'status': status}
return ImageExists.objects.select_related().filter(**params).order_by('id')
ordered_exists = ImageExists.objects.select_related().\
filter(**params).order_by('owner')
result = {}
for exist in ordered_exists:
key = "%s-%s" % (exist.owner, exist.raw_id)
if key in result:
result[key].append(exist)
else:
result[key] = [exist]
return result
def mark_verified(self):
self.status = InstanceExists.VERIFIED
@ -501,15 +510,6 @@ class ImageExists(models.Model):
self.fail_reason = reason
self.save()
@staticmethod
def are_all_exists_for_owner_verified(owner, audit_period_beginning,
audit_period_ending):
return ImageExists.objects.filter(
~Q(status=ImageExists.VERIFIED),
audit_period_beginning=audit_period_beginning,
audit_period_ending=audit_period_ending,
owner=owner).count() == 0
def get_model_fields(model):
return model._meta.fields

View File

@ -407,87 +407,99 @@ class GlanceVerifierTestCase(StacktachBaseTestCase):
self.mox.VerifyAll()
def test_verify_should_verify_exists_for_usage_and_delete(self):
exist = self.mox.CreateMockAnything()
exist1 = self.mox.CreateMockAnything()
exist2 = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(glance_verifier, '_verify_for_usage')
glance_verifier._verify_for_usage(exist)
self.mox.StubOutWithMock(glance_verifier, '_verify_for_delete')
glance_verifier._verify_for_delete(exist)
self.mox.StubOutWithMock(glance_verifier, '_verify_validity')
for exist in [exist1, exist2]:
glance_verifier._verify_for_usage(exist)
glance_verifier._verify_for_delete(exist)
glance_verifier._verify_validity(exist)
exist.mark_verified()
self.mox.ReplayAll()
verified, exist = glance_verifier._verify(exist)
verified, exist = glance_verifier._verify([exist1, exist2])
self.mox.VerifyAll()
self.assertTrue(verified)
def test_verify_exist_marks_exist_failed_if_field_mismatch_exception(self):
exist1 = self.mox.CreateMockAnything()
exist2 = self.mox.CreateMockAnything()
def test_verify_exist_marks_exist_as_failed_if_field_mismatch_exception_is_raised(self):
exist = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(glance_verifier, '_verify_for_usage')
self.mox.StubOutWithMock(glance_verifier, '_verify_for_delete')
self.mox.StubOutWithMock(glance_verifier, '_verify_validity')
field_mismatch_exc = FieldMismatch('field', 'expected', 'actual')
glance_verifier._verify_for_usage(exist).AndRaise(exception=field_mismatch_exc)
exist.mark_failed(reason='FieldMismatch')
glance_verifier._verify_for_usage(exist1).AndRaise(
exception=field_mismatch_exc)
exist1.mark_failed(reason='FieldMismatch')
glance_verifier._verify_for_usage(exist2)
glance_verifier._verify_for_delete(exist2)
glance_verifier._verify_validity(exist2)
exist2.mark_verified()
self.mox.ReplayAll()
verified, exist = glance_verifier._verify(exist)
verified, exist = glance_verifier._verify([exist1, exist2])
self.mox.VerifyAll()
self.assertFalse(verified)
def test_verify_for_range_without_callback(self):
when_max = datetime.utcnow()
results = self.mox.CreateMockAnything()
models.ImageExists.PENDING = 'pending'
models.ImageExists.VERIFYING = 'verifying'
models.ImageExists.PENDING = 'pending'
self.mox.StubOutWithMock(models.ImageExists, 'find')
models.ImageExists.find(
ending_max=when_max,
status=models.ImageExists.PENDING).AndReturn(results)
results.count().AndReturn(2)
exist1 = self.mox.CreateMockAnything()
exist2 = self.mox.CreateMockAnything()
results.__getslice__(0, 1000).AndReturn(results)
results.__iter__().AndReturn([exist1, exist2].__iter__())
exist3 = self.mox.CreateMockAnything()
results = {'owner1': [exist1, exist2], 'owner2': [exist3]}
models.ImageExists.find_and_group_by_owner_and_raw_id(
ending_max=when_max,
status=models.ImageExists.PENDING).AndReturn(results)
exist1.save()
exist2.save()
self.pool.apply_async(glance_verifier._verify, args=(exist1,),
callback=None)
self.pool.apply_async(glance_verifier._verify, args=(exist2,),
exist3.save()
self.pool.apply_async(glance_verifier._verify,
args=([exist1, exist2],), callback=None)
self.pool.apply_async(glance_verifier._verify, args=([exist3],),
callback=None)
self.mox.ReplayAll()
self.glance_verifier.verify_for_range(when_max)
self.assertEqual(exist1.status, 'verifying')
self.assertEqual(exist2.status, 'verifying')
self.assertEqual(exist3.status, 'verifying')
self.mox.VerifyAll()
def test_verify_for_range_with_callback(self):
callback = self.mox.CreateMockAnything()
when_max = datetime.utcnow()
results = self.mox.CreateMockAnything()
models.ImageExists.PENDING = 'pending'
models.ImageExists.VERIFYING = 'verifying'
models.ImageExists.find(
ending_max=when_max,
status=models.ImageExists.PENDING).AndReturn(results)
results.count().AndReturn(2)
exist1 = self.mox.CreateMockAnything()
exist2 = self.mox.CreateMockAnything()
results.__getslice__(0, 1000).AndReturn(results)
results.__iter__().AndReturn([exist1, exist2].__iter__())
exist3 = self.mox.CreateMockAnything()
results = {'owner1': [exist1, exist2], 'owner2': [exist3]}
models.ImageExists.find_and_group_by_owner_and_raw_id(
ending_max=when_max,
status=models.ImageExists.PENDING).AndReturn(results)
exist1.save()
exist2.save()
self.pool.apply_async(glance_verifier._verify, args=(exist1,),
exist3.save()
self.pool.apply_async(glance_verifier._verify, args=([exist1, exist2],),
callback=callback)
self.pool.apply_async(glance_verifier._verify, args=(exist2,),
self.pool.apply_async(glance_verifier._verify, args=([exist3],),
callback=callback)
self.mox.ReplayAll()
self.glance_verifier.verify_for_range(
when_max, callback=callback)
self.assertEqual(exist1.status, 'verifying')
self.assertEqual(exist2.status, 'verifying')
self.assertEqual(exist3.status, 'verifying')
self.mox.VerifyAll()
def test_send_verified_notification_routing_keys(self):
@ -507,9 +519,6 @@ class GlanceVerifierTestCase(StacktachBaseTestCase):
exist.audit_period_beginning = datetime(2013, 10, 10)
exist.audit_period_ending = datetime(2013, 10, 10, 23, 59, 59)
exist.owner = "1"
models.ImageExists.are_all_exists_for_owner_verified(
exist.owner, exist.audit_period_beginning,
exist.audit_period_ending).AndReturn(True)
self.mox.StubOutWithMock(uuid, 'uuid4')
uuid.uuid4().AndReturn('some_other_uuid')
self.mox.StubOutWithMock(kombu.pools, 'producers')
@ -550,9 +559,6 @@ class GlanceVerifierTestCase(StacktachBaseTestCase):
exist.audit_period_beginning = datetime(2013, 10, 10)
exist.audit_period_ending = datetime(2013, 10, 10, 23, 59, 59)
exist.owner = "1"
models.ImageExists.are_all_exists_for_owner_verified(
exist.owner, exist.audit_period_beginning,
exist.audit_period_ending).AndReturn(True)
self.mox.StubOutWithMock(kombu.pools, 'producers')
self.mox.StubOutWithMock(kombu.common, 'maybe_declare')
producer = self.mox.CreateMockAnything()
@ -573,29 +579,3 @@ class GlanceVerifierTestCase(StacktachBaseTestCase):
self.glance_verifier.send_verified_notification(exist, exchange,
connection)
self.mox.VerifyAll()
def test_do_not_send_verified_if_all_exists_for_owner_not_verified(self):
connection = self.mox.CreateMockAnything()
exchange = self.mox.CreateMockAnything()
exist = self.mox.CreateMockAnything()
exist.raw = self.mox.CreateMockAnything()
exist_dict = [
'monitor.info',
{
'event_type': 'test',
'message_id': 'some_uuid'
}
]
exist_str = json.dumps(exist_dict)
exist.raw.json = exist_str
exist.audit_period_beginning = datetime(2013, 10, 10)
exist.audit_period_ending = datetime(2013, 10, 10, 23, 59, 59)
exist.owner = "1"
models.ImageExists.are_all_exists_for_owner_verified(
exist.owner, exist.audit_period_beginning,
exist.audit_period_ending).AndReturn(False)
self.mox.ReplayAll()
self.glance_verifier.send_verified_notification(
exist, exchange, connection)
self.mox.VerifyAll()

View File

@ -77,61 +77,40 @@ class ImageExistsTestCase(unittest.TestCase):
def tearDown(self):
self.mox.UnsetStubs()
def test_find_should_return_records_with_date_and_status_in_audit_period(self):
def test_group_exists_with_date_status_in_audit_period_by_owner_rawid(self):
end_max = datetime.utcnow()
status = 'pending'
exist1 = self.mox.CreateMockAnything()
exist1.owner = "owner1"
exist1.raw_id = "1"
exist2 = self.mox.CreateMockAnything()
exist2.owner = "owner2"
exist2.raw_id = "2"
exist3 = self.mox.CreateMockAnything()
exist3.owner = "owner1"
exist3.raw_id = "1"
exist4 = self.mox.CreateMockAnything()
exist4.owner = "owner1"
exist4.raw_id = "3"
ordered_results = [exist1, exist3, exist4, exist2]
unordered_results = self.mox.CreateMockAnything()
expected_results = [1, 2]
related_results = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(ImageExists.objects, 'select_related')
ImageExists.objects.select_related().AndReturn(related_results)
related_results.filter(audit_period_ending__lte=dt.dt_to_decimal(
end_max), status=status).AndReturn(unordered_results)
unordered_results.order_by('id').AndReturn(expected_results)
related_results.filter(
audit_period_ending__lte=dt.dt_to_decimal(end_max),
status=status).AndReturn(unordered_results)
unordered_results.order_by('owner').AndReturn(ordered_results)
self.mox.ReplayAll()
results = ImageExists.find(end_max, status)
results = ImageExists.find_and_group_by_owner_and_raw_id(end_max,
status)
self.mox.VerifyAll()
self.assertEqual(results, [1, 2])
def test_return_true_if_all_exists_for_owner_are_verified(self):
owner = "1"
audit_period_beginning = datetime(2013, 10, 10)
audit_period_ending = datetime(2013, 10, 10, 23, 59, 59)
results = self.mox.CreateMockAnything()
results.count().AndReturn(0)
self.mox.StubOutWithMock(ImageExists.objects, 'filter')
ImageExists.objects.filter(
mox.IgnoreArg(), owner=owner,
audit_period_beginning=audit_period_beginning,
audit_period_ending=audit_period_ending).AndReturn(results)
self.mox.ReplayAll()
self.assertTrue(models.ImageExists.are_all_exists_for_owner_verified(
owner, audit_period_beginning, audit_period_ending))
self.mox.VerifyAll()
def test_return_false_if_all_exists_for_owner_are_verified(self):
owner = "1"
audit_period_beginning = datetime(2013, 10, 10)
audit_period_ending = datetime(2013, 10, 10, 23, 59, 59)
results = self.mox.CreateMockAnything()
results.count().AndReturn(1)
self.mox.StubOutWithMock(ImageExists.objects, 'filter')
ImageExists.objects.filter(
mox.IgnoreArg(), owner=owner,
audit_period_beginning=audit_period_beginning,
audit_period_ending=audit_period_ending).AndReturn(results)
self.mox.ReplayAll()
self.assertFalse(ImageExists.are_all_exists_for_owner_verified(
owner=owner, audit_period_beginning=audit_period_beginning,
audit_period_ending=audit_period_ending))
self.mox.VerifyAll()
self.assertEqual(results, {'owner1-1': [exist1, exist3],
'owner1-3': [exist4],
'owner2-2': [exist2]})
class InstanceExistsTestCase(unittest.TestCase):

View File

@ -121,20 +121,21 @@ def _verify_for_delete(exist, delete=None):
delete.deleted_at)
def _verify(exist):
verified = False
def _verify(exists):
verified = True
for exist in exists:
try:
_verify_for_usage(exist)
_verify_for_delete(exist)
_verify_validity(exist)
verified = True
exist.mark_verified()
except Exception, e:
verified = False
exist.mark_failed(reason=e.__class__.__name__)
LOG.exception("glance: %s" % e)
return verified, exist
return verified, exists[0]
class GlanceVerifier(Verifier):
@ -142,18 +143,21 @@ class GlanceVerifier(Verifier):
super(GlanceVerifier, self).__init__(config, pool=pool)
def verify_for_range(self, ending_max, callback=None):
exists = models.ImageExists.find(
ending_max=ending_max, status=models.ImageExists.PENDING)
count = exists.count()
exists_grouped_by_owner_and_rawid = \
models.ImageExists.find_and_group_by_owner_and_raw_id(
ending_max=ending_max,
status=models.ImageExists.PENDING)
count = len(exists_grouped_by_owner_and_rawid)
added = 0
update_interval = datetime.timedelta(seconds=30)
next_update = datetime.datetime.utcnow() + update_interval
LOG.info("glance: Adding %s exists to queue." % count)
LOG.info("glance: Adding %s per-owner exists to queue." % count)
while added < count:
for exist in exists[0:1000]:
for exists in exists_grouped_by_owner_and_rawid.values():
for exist in exists:
exist.status = models.ImageExists.VERIFYING
exist.save()
result = self.pool.apply_async(_verify, args=(exist,),
result = self.pool.apply_async(_verify, args=(exists,),
callback=callback)
self.results.append(result)
added += 1
@ -166,10 +170,6 @@ class GlanceVerifier(Verifier):
def send_verified_notification(self, exist, connection, exchange,
routing_keys=None):
if not models.ImageExists.are_all_exists_for_owner_verified(
exist.owner, exist.audit_period_beginning,
exist.audit_period_ending):
return
body = exist.raw.json
json_body = json.loads(body)
json_body[1]['event_type'] = self.config.glance_event_type()