diff --git a/stackalytics/processor/record_processor.py b/stackalytics/processor/record_processor.py index a9bbf9a09..383a5e099 100644 --- a/stackalytics/processor/record_processor.py +++ b/stackalytics/processor/record_processor.py @@ -381,6 +381,11 @@ class RecordProcessor(object): for r in self._process_blueprint(record): yield r + def _renew_record_date(self, record): + record['week'] = utils.timestamp_to_week(record['date']) + if ('release' not in record) or (not record['release']): + record['release'] = self._get_release(record['date']) + def process(self, record_iterator): for record in record_iterator: for r in self._apply_type_based_processing(record): @@ -388,9 +393,7 @@ class RecordProcessor(object): if r['company_name'] == '*robots': continue - r['week'] = utils.timestamp_to_week(r['date']) - if ('release' not in r) or (not r['release']): - r['release'] = self._get_release(r['date']) + self._renew_record_date(r) yield r @@ -421,12 +424,35 @@ class RecordProcessor(object): if need_update: yield record - def _get_records_for_users_to_update(self): - users_reviews = {} + def _update_records_with_user_info(self): + LOG.debug('Update user info in records') + + for record in self.runtime_storage_inst.get_all_records(): + need_update = False + + if record['user_id'] in self.updated_users: + user = utils.load_user(self.runtime_storage_inst, + record['user_id']) + user_company_name = user['companies'][0]['company_name'] + if record['company_name'] != user_company_name: + LOG.debug('Update record %s: company changed to: %s', + record['primary_key'], user_company_name) + record['company_name'] = user_company_name + need_update = True + if record['user_id'] != user['user_id']: + LOG.debug('Update record %s, user id changed to: %s', + record['primary_key'], user['user_id']) + record['user_id'] = user['user_id'] + need_update = True + + if need_update: + yield record + + def _update_records_with_blueprint_mention_info(self): + LOG.debug('Process blueprints and calculate mention info') + valid_blueprints = {} mentioned_blueprints = {} - core_engineers = {} - quarter_ago = int(time.time()) - 60 * 60 * 24 * 30 * 3 # a quarter ago for record in self.runtime_storage_inst.get_all_records(): for bp in record.get('blueprint_id', []): if bp in mentioned_blueprints: @@ -445,14 +471,6 @@ class RecordProcessor(object): 'date': record['date'] } - if record['record_type'] == 'review': - launchpad_id = record['launchpad_id'] - review = {'date': record['date'], 'id': record['id']} - if launchpad_id in users_reviews: - users_reviews[launchpad_id].append(review) - else: - users_reviews[launchpad_id] = [review] - for bp_name, bp in valid_blueprints.iteritems(): if bp_name in mentioned_blueprints: bp['count'] = mentioned_blueprints[bp_name]['count'] @@ -461,34 +479,9 @@ class RecordProcessor(object): bp['count'] = 0 bp['date'] = 0 - reviews_index = {} - for launchpad_id, reviews in users_reviews.iteritems(): - reviews.sort(key=lambda x: x['date']) - review_number = 0 - for review in reviews: - review_number += 1 - review['review_number'] = review_number - reviews_index[review['id']] = review - for record in self.runtime_storage_inst.get_all_records(): - need_update = False - if record['user_id'] in self.updated_users: - user = utils.load_user(self.runtime_storage_inst, - record['user_id']) - user_company_name = user['companies'][0]['company_name'] - if record['company_name'] != user_company_name: - LOG.debug('Update record %s: company changed to: %s', - record['primary_key'], user_company_name) - record['company_name'] = user_company_name - need_update = True - if record['user_id'] != user['user_id']: - LOG.debug('Update record %s, user id changed to: %s', - record['primary_key'], user['user_id']) - record['user_id'] = user['user_id'] - need_update = True - valid_bp = set([]) for bp in record.get('blueprint_id', []): if bp in valid_blueprints: @@ -509,12 +502,45 @@ class RecordProcessor(object): record['primary_key'], bp['count'], bp['date']) need_update = True + if need_update: + yield record + + def _update_records_with_review_number(self): + LOG.debug('Set review number in review records') + + users_reviews = {} + for record in self.runtime_storage_inst.get_all_records(): + if record['record_type'] == 'review': + launchpad_id = record['launchpad_id'] + review = {'date': record['date'], 'id': record['id']} + if launchpad_id in users_reviews: + users_reviews[launchpad_id].append(review) + else: + users_reviews[launchpad_id] = [review] + + reviews_index = {} + for launchpad_id, reviews in users_reviews.iteritems(): + reviews.sort(key=lambda x: x['date']) + review_number = 0 + for review in reviews: + review_number += 1 + review['review_number'] = review_number + reviews_index[review['id']] = review + + for record in self.runtime_storage_inst.get_all_records(): if record['record_type'] == 'review': review = reviews_index[record['id']] if record.get('review_number') != review['review_number']: record['review_number'] = review['review_number'] - need_update = True + yield record + def _determine_core_contributors(self): + LOG.debug('Determine core contributors') + + core_engineers = {} + quarter_ago = int(time.time()) - 60 * 60 * 24 * 30 * 3 # a quarter ago + + for record in self.runtime_storage_inst.get_all_records(): if (record['record_type'] == 'mark' and record['date'] > quarter_ago and record['value'] in [2, -2]): @@ -524,18 +550,15 @@ class RecordProcessor(object): core_engineers[user_id].add(module_branch) else: core_engineers[user_id] = set([module_branch]) - - if need_update: - yield record - for user in self.runtime_storage_inst.get_all_users(): core_old = user.get('core') user['core'] = list(core_engineers.get(user['user_id'], [])) if user['core'] != core_old: utils.store_user(self.runtime_storage_inst, user) - def _marks_with_disagreement(self): + def _update_records_with_disagreement(self): LOG.debug('Process marks to find disagreements') + marks_per_patch = {} for record in self.runtime_storage_inst.get_all_records(): if record['record_type'] == 'mark' and record['type'] == 'CRVW': @@ -576,7 +599,16 @@ class RecordProcessor(object): def finalize(self): self.runtime_storage_inst.set_records( - self._get_records_for_users_to_update()) + self._update_records_with_user_info()) self.runtime_storage_inst.set_records( - self._marks_with_disagreement()) + self._update_records_with_review_number()) + + self.runtime_storage_inst.set_records( + self._update_records_with_blueprint_mention_info()) + + self._determine_core_contributors() + + # disagreement calculation must go after determining core contributors + self.runtime_storage_inst.set_records( + self._update_records_with_disagreement())