Source code for repo_stats.git_metrics

import ast
import contextlib
import subprocess
import time
from collections import Counter, defaultdict
from datetime import UTC, datetime
from pathlib import Path

import numpy as np
import requests

from repo_stats.utilities import fill_missed_months, update_cache


class GitMetrics:
    def __init__(self, token, repo_owner, repo_name, cache_dir) -> None:
        """
        Class for getting and processing repository data (commit history, issues, pull requests, contributors) from GitHub for a given repository.

        Arguments
        ---------
        token : str
            Authorization token for GitHub queries

        repo_owner : str
            Owner (or organization) of repository on GitHub

        repo_name : str
            Name of repository on GitHub

        cache_dir : str
            Path to directory that will be populated with caches of git data
        """
        self.token = token
        self.repo_owner = repo_owner
        self.repo_name = repo_name
        self.cache_dir = cache_dir
    def get_age(self, date):
        """
        Get the 'datetime' age of a string 'date'.

        Arguments
        ---------
        date : str
            Date with assumed string format "2024-01-01..."

        Returns
        -------
        age : 'datetime.timedelta' instance or int
            Age of the item (-1 if 'date' is None)
        """
        if date is None:
            return -1

        now = datetime.now(UTC)
        date_utc = datetime.strptime(date[:10], "%Y-%m-%d").replace(tzinfo=UTC)
        return now - date_utc
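    # Example for get_age (illustrative values, not taken from any repository): only the
    # leading "YYYY-MM-DD" of the string is parsed, so get_age("2024-01-01T00:00:00Z")
    # returns a datetime.timedelta measured from the current UTC time, and
    # get_age("2024-01-01T00:00:00Z").days gives the age in whole days;
    # get_age(None) returns the sentinel -1 instead of a timedelta.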
    def parse_log_line(self, line):
        """
        Break an individual 'git log' line 'line' into its component parts (commit hash, date, author).

        Arguments
        ---------
        line : str
            Line of 'git log' output with assumed format '"hash","date","author"'

        Returns
        -------
        parsed : list of str
            The commit's hash, date, author
        """
        line = line.split(",")
        return [line[ii].lstrip('"').rstrip('"\n') for ii in range(3)]
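    # Example for parse_log_line (hypothetical line, matching the --format used in
    # get_commits_via_git_log below):
    #     parse_log_line('"9fceb02d0ae598e95dc970b74767f19372d61af8","2024-01-01","Jane Doe"\n')
    # returns ['9fceb02d0ae598e95dc970b74767f19372d61af8', '2024-01-01', 'Jane Doe'].
    # Only the first three comma-separated fields are kept, so an author name that itself
    # contains a comma would be truncated.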
    def get_commits(self):
        """
        Obtain the commit history for a repository by querying the GitHub GraphQL API, and parse the output.

        Returns
        -------
        all_items : list of dict
            A dictionary entry for each commit in the history, including the identifiers below in 'query'
        """
        print("\nCollecting git commit history")

        cache_file = f"{self.cache_dir}/{self.repo_name}_commits.txt"

        if not Path(cache_file).exists():
            Path(cache_file).open("w").close()
        with Path(cache_file).open() as f:
            old_items = f.readlines()
        print(f" {len(old_items)} commits found in cache at {cache_file}")

        # NOTE: 'after' here differs from GitMetrics.get_issues_prs, as does its type in 'query' below
        # ('String' vs. 'String!') - see https://github.com/orgs/community/discussions/24443
        # convert string to dict and get relevant key
        after = None if old_items == [] else ast.literal_eval(old_items[-1].rstrip("\n"))["endCursor"]

        # For query syntax, see https://docs.github.com/en/graphql/reference/objects#repository
        # and https://docs.github.com/en/graphql/reference/objects#commit
        # and https://docs.github.com/en/graphql/reference/objects#gitactor
        # and https://docs.github.com/en/graphql/guides/using-pagination-in-the-graphql-api
        # To quickly test a query, try https://docs.github.com/en/graphql/overview/explorer
        query = """
        query($owner: String!, $name: String!, $after: String) {
            repository(name: $name, owner: $owner) {
                ref(qualifiedName: "main") {
                    target {
                        ... on Commit {
                            history(first: 100, after: $after) {
                                pageInfo {
                                    hasNextPage
                                    endCursor
                                }
                                edges {
                                    node {
                                        oid
                                        authoredDate
                                        author {
                                            name
                                            email
                                            user {
                                                databaseId
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        """

        headers = {"Authorization": f"token {self.token}"}

        # with 'after', traverse through items (issues or PRs) from oldest to newest
        variables = {
            "owner": self.repo_owner,
            "name": self.repo_name,
            "after": after,
        }

        # must traverse through pages of items
        has_next_page = True
        new_items = []
        items_retrieved = 0

        while has_next_page is True:
            response = requests.post(
                "https://api.github.com/graphql",
                json={"query": query, "variables": variables},
                headers=headers,
                timeout=180,
            )

            if response.status_code == 200:
                result = response.json()
                try:
                    result["data"]
                except KeyError:
                    print(f"Query syntax is likely wrong. Response to query: {result}")
                    raise

                items_retrieved += len(result["data"]["repository"]["ref"]["target"]["history"]["edges"])

                time_to_reset = datetime.fromtimestamp(
                    int(response.headers["X-RateLimit-Reset"]) - time.time(),
                    tz=UTC,
                ).strftime("%M:%S")

                if len(result["data"]["repository"]["ref"]["target"]["history"]["edges"]) > 0:
                    print(
                        f"\r Retrieved {items_retrieved} new commits (rate limit used: {response.headers['X-RateLimit-Used']} of {response.headers['X-RateLimit-Limit']} - resets in {time_to_reset})",
                        end="",
                        flush=True,
                    )

                    # store ID of the chronologically newest commit on current page, used to later reference newest item in cache
                    result["data"]["repository"]["ref"]["target"]["history"]["edges"][-1]["endCursor"] = result["data"][
                        "repository"
                    ]["ref"]["target"]["history"]["pageInfo"]["endCursor"]

                    new_items.extend(result["data"]["repository"]["ref"]["target"]["history"]["edges"])

                has_next_page = result["data"]["repository"]["ref"]["target"]["history"]["pageInfo"]["hasNextPage"]
                variables["after"] = result["data"]["repository"]["ref"]["target"]["history"]["pageInfo"]["endCursor"]

            else:
                msg = f"Query failed -- return code {response.status_code}"
                raise Exception(msg)

        # prevent last flush, without printing new line
        print(end="")

        return update_cache(cache_file, old_items, new_items)
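    # Cache format note (inferred from the read path above; the writing itself is handled
    # by update_cache in repo_stats.utilities): each cached line is expected to be a Python
    # dict literal for one commit 'edge', and the last line is expected to carry the
    # 'endCursor' key added to the final edge of each page, which ast.literal_eval reads
    # back so a later run can resume pagination where the cache left off.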
    def get_commits_via_git_log(self, repo_local_path):
        """
        Obtain the commit history for a repository with 'git log' and a local copy of the repository, and parse the output.

        Arguments
        ---------
        repo_local_path : str
            Path to local copy of repository

        Returns
        -------
        dates : list of str
            Date of each commit

        authors_with_commits : dict
            Keys are the authors and the value is a list of the commits they have contributed
        """
        print("\nCollecting git commit history")

        git_log = subprocess.run(
            args=f'git -C {repo_local_path} log --use-mailmap --date=iso-local --format="%H","%as","%aN"'.split(),
            stdout=subprocess.PIPE,
            # preserve non-English letters in names
            text=True,
            check=False,
        )

        dates = []
        authors_with_commits = defaultdict(list)

        for line in git_log.stdout.splitlines():
            commit, date, author = self.parse_log_line(line)
            dates.append(date)
            authors_with_commits[author].append(commit)

        if len(dates) == 0:
            msg = f"0 commits found for repository in git log. Check that 'repo_dir' {repo_local_path} in the .json parameter file is correct."
            raise RuntimeError(msg)

        return dates, authors_with_commits
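    # Example for get_commits_via_git_log (hypothetical instance 'gm' and path): given a
    # local clone at "/tmp/myrepo",
    #     dates, authors_with_commits = gm.get_commits_via_git_log("/tmp/myrepo")
    # would yield dates like ["2024-03-02", "2024-02-28", ...] (newest first, since
    # 'git log' output is reverse chronological) and a mapping such as
    # {"Jane Doe": ["<hash1>", "<hash2>"], ...}.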
    def process_commits(self, results, age_recent=90):
        """
        Process (obtain statistics for) git commit data.

        Arguments
        ---------
        results : list of dict
            A dictionary entry for each commit in the history (see `GitMetrics.get_commits`)

        age_recent : int, default=90
            Days before present used to categorize recent commit statistics

        Returns
        -------
        stats : dict
            Commit statistics:
            - 'age_recent_commit': the input arg 'age_recent'
            - 'unique_authors': each commit author, their number of commits and index of first commit
            - 'new_authors': list of authors with their first commit in 'age_recent'
            - 'n_recent_authors': number of authors with commits in 'age_recent'
            - 'authors_per_month': number of commit authors per month, over time
            - 'new_authors_per_month': number of new commit authors per month, over time
            - 'multi_authors_per_month': number of commit authors per month with >1 commit that month, over time
            - 'commits_for_each_author': number of commits per author
        """
        dates, authors, user_ids = [], [], []

        bots = [
            "dependabot[bot]",
            "github-actions",
            "github-actions[bot]",
            "meeseeksmachine",
            "odidev",
            "pre-commit-ci[bot]",
            "codetriage-readme-bot",
            "unknown",
        ]

        for ii in results:
            if ii["node"]["author"]["name"] not in bots:
                dates.append(ii["node"]["authoredDate"])
                authors.append(ii["node"]["author"]["name"])
                # some authors have None in 'user' field
                try:
                    user_ids.append(ii["node"]["author"]["user"]["databaseId"])
                except TypeError:
                    user_ids.append(ii["node"]["author"]["name"])

        print(f" {len(dates)} total commits")

        # assuming we don't have a .mailmap file to connect unique authors to multiple versions of their name and/or emails,
        # use their GitHub IDs (likely won't catch all variations)
        unique_ids = np.unique(user_ids, return_index=True, return_counts=True)
        # IDs that occur in commit history at least twice
        unique_ids_repeat = [x for i, x in enumerate(unique_ids[0]) if unique_ids[2][i] > 1]

        # set author name for all instances of a repeated ID to their name at the last (most recent) instance
        for i in unique_ids_repeat:
            idxs = np.where(np.array(user_ids) == i)[0]
            for j in idxs:
                authors[j] = authors[idxs[-1]]

        commits_for_each_author = Counter(authors)

        dates_strip_day = [d[:7] for d in dates]
        zipped = list(zip(dates_strip_day, authors, strict=False))
        unique_month_author_pairs = np.unique(zipped, axis=0, return_counts=True)

        # number of unique commit authors per month
        authors_per_month = np.unique([x[0] for x in unique_month_author_pairs[0]], axis=0, return_counts=True)
        # possible that not every month has commits,
        # so insert months without commits and 0 for their number of authors
        authors_per_month = fill_missed_months(authors_per_month)

        # number of authors per month with >1 commit that month
        multi_authors_per_month = np.unique(
            [x[0] for i, x in enumerate(unique_month_author_pairs[0]) if unique_month_author_pairs[1][i] > 1],
            axis=0,
            return_counts=True,
        )
        multi_authors_per_month = fill_missed_months(multi_authors_per_month)

        # '*last*' and '*first*' variables assume the git log is in reverse chronological order
        unique_authors_last_commit = np.unique(authors, return_index=True, return_counts=True)
        unique_authors_first_commit = np.unique(authors[::-1], return_index=True, return_counts=True)

        # last and first commit dates per author
        date_last_commit = [dates[i] for i in unique_authors_last_commit[1]]
        date_first_commit = [dates[::-1][i] for i in unique_authors_first_commit[1]]

        # number of new authors per month
        new_authors_per_month = np.unique([x[:7] for x in date_first_commit], return_counts=True)
        new_authors_per_month = fill_missed_months(new_authors_per_month)

        n_recent_authors, new_authors = 0, []
        for ii, jj in enumerate(unique_authors_last_commit[0]):
            last_commit_age = self.get_age(date_last_commit[ii])
            first_commit_age = self.get_age(date_first_commit[ii])
            if last_commit_age.days <= age_recent:
                n_recent_authors += 1
                # authors with their first commit(s) in this period
                if first_commit_age.days <= age_recent:
                    new_authors.append(str(jj))

        return {
            "age_recent_commit": age_recent,
            "unique_authors": unique_authors_first_commit,
            "new_authors": new_authors,
            "n_recent_authors": n_recent_authors,
            "authors_per_month": authors_per_month,
            "new_authors_per_month": new_authors_per_month,
            "multi_authors_per_month": multi_authors_per_month,
            "commits_for_each_author": commits_for_each_author,
        }
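    # Note on the '*_per_month' values returned by process_commits: np.unique(..., return_counts=True)
    # yields a (values, counts) pair, so e.g. authors_per_month is (array of "YYYY-MM" strings,
    # array of author counts); fill_missed_months (see repo_stats.utilities) is then assumed to
    # pad in any months with zero activity so the monthly series is gapless.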
    def get_issues_prs(self, item_type):
        """
        Obtain the issue or pull request history for a GitHub repository by querying the GraphQL API.

        Arguments
        ---------
        item_type : str
            One of ['issues', 'pullRequests'] to obtain the corresponding history

        Returns
        -------
        all_items : list of dict
            A dictionary entry for each issue or pull request in the history, including the identifiers below in 'query'
        """
        print(f"\nCollecting GitHub {item_type} history")

        supported_items = ["issues", "pullRequests"]
        if item_type not in supported_items:
            msg = f"item_type {item_type} invalid; must be one of {supported_items}"
            raise ValueError(msg)

        cache_file = f"{self.cache_dir}/{self.repo_name}_{item_type}.txt"

        if not Path(cache_file).exists():
            Path(cache_file).open("w").close()
        with Path(cache_file).open() as f:
            old_items = f.readlines()
        print(f" {len(old_items)} {item_type} found in cache at {cache_file}")

        after = "" if old_items == [] else ast.literal_eval(old_items[-1].rstrip("\n"))["endCursor"]

        # For query syntax, see https://docs.github.com/en/graphql/reference/objects#repository
        # and https://docs.github.com/en/graphql/reference/objects#issue
        # and https://docs.github.com/en/graphql/reference/objects#pullrequest
        # and https://docs.github.com/en/graphql/guides/using-pagination-in-the-graphql-api
        # To quickly test a query, try https://docs.github.com/en/graphql/overview/explorer
        query = (
            """
            query($owner: String!, $name: String!, $after: String!) {
                repository(owner: $owner, name: $name) {
                    """
            + item_type
            + """(first: 100, after: $after) {
                        totalCount
                        pageInfo {
                            hasNextPage
                            endCursor
                        }
                        edges {
                            node {
                                number
                                state
                                createdAt
                                updatedAt
                                closedAt
                                labels(first: 25) {
                                    edges {
                                        node {
                                            name
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
            """
        )

        headers = {"Authorization": f"token {self.token}"}

        # with 'after', traverse through items (issues or PRs) from oldest to newest
        variables = {
            "owner": self.repo_owner,
            "name": self.repo_name,
            "after": after,
        }

        # must traverse through pages of items
        has_next_page = True
        new_items = []
        items_retrieved = 0

        while has_next_page is True:
            response = requests.post(
                "https://api.github.com/graphql",
                json={"query": query, "variables": variables},
                headers=headers,
                timeout=180,
            )

            if response.status_code == 200:
                result = response.json()
                try:
                    result["data"]
                except KeyError:
                    print(f"Query syntax is likely wrong. Response to query: {result}")
                    raise

                items_retrieved += len(result["data"]["repository"][item_type]["edges"])
                items_total = result["data"]["repository"][item_type]["totalCount"]

                time_to_reset = datetime.fromtimestamp(
                    int(response.headers["X-RateLimit-Reset"]) - time.time(),
                    tz=UTC,
                ).strftime("%M:%S")

                if len(result["data"]["repository"][item_type]["edges"]) > 0:
                    print(
                        f"\r Retrieved {items_retrieved} new of {items_total} total {item_type} (rate limit used: {response.headers['X-RateLimit-Used']} of {response.headers['X-RateLimit-Limit']} - resets in {time_to_reset})",
                        end="",
                        flush=True,
                    )

                    # store ID of the chronologically newest item (issue or PR) on current page, used to later reference newest item in cache
                    result["data"]["repository"][item_type]["edges"][-1]["endCursor"] = result["data"]["repository"][
                        item_type
                    ]["pageInfo"]["endCursor"]

                    new_items.extend(result["data"]["repository"][item_type]["edges"])

                has_next_page = result["data"]["repository"][item_type]["pageInfo"]["hasNextPage"]
                variables["after"] = result["data"]["repository"][item_type]["pageInfo"]["endCursor"]

            else:
                msg = f"Query failed -- return code {response.status_code}"
                raise Exception(msg)

        # prevent last flush, without printing new line
        print(end="")

        return update_cache(cache_file, old_items, new_items)
    def process_issues_prs(self, results, items, labels, age_recent=90):
        """
        Process (obtain statistics for) and aggregate issue and pull request data in 'results'.

        Arguments
        ---------
        results : list of dict
            A dictionary entry for each issue or pull request in the history (see `GitMetrics.get_issues_prs`)

        items : list of str
            Names for the dictionary entries in the return 'issues_prs'

        labels : list of str
            GitHub labels (those added to an issue or pull request) to obtain additional statistics for

        age_recent : int, default=90
            Days before present used to categorize recent issue and pull request statistics

        Returns
        -------
        issues_prs : dict of dict
            Statistics for issues and separately for pull requests:
            - 'age_recent': the input arg 'age_recent'
            - 'recent_open': number of items (issues or pull requests) opened in 'age_recent'
            - 'recent_close': number of items closed in 'age_recent'
            - 'open_per_month': number of items opened per month, over time
            - 'close_per_month': number of items closed per month, over time
            - 'label_open': the input arg 'labels' and the number of currently open items with each label
        """
        issues_prs = {}

        for hh, ii in enumerate(results):
            recent_open, recent_close, date_open, date_close = 0, 0, [], []
            label_open_items = np.zeros(len(labels))

            for jj in ii:
                # store dates as year-month e.g. '2024-01'
                date_open.append(jj["node"]["createdAt"][:7])
                if jj["node"]["state"] != "OPEN":
                    date_close.append(jj["node"]["closedAt"][:7])

                # store age as days before present
                created_age = self.get_age(jj["node"]["createdAt"])
                if created_age != -1:
                    created_age = created_age.days
                    if created_age <= age_recent:
                        recent_open += 1

                closed_age = self.get_age(jj["node"]["closedAt"])
                if closed_age != -1:
                    closed_age = closed_age.days
                    if closed_age <= age_recent:
                        recent_close += 1

                if jj["node"]["state"] == "OPEN":
                    for kk in jj["node"]["labels"]["edges"]:
                        with contextlib.suppress(ValueError):
                            label_open_items[labels.index(kk["node"]["name"])] += 1

            open_per_month = np.unique(date_open, return_counts=True)
            close_per_month = np.unique(date_close, return_counts=True)

            # not every month has newly opened/closed issues/PRs,
            # so insert missed months
            open_per_month = fill_missed_months(open_per_month)
            close_per_month = fill_missed_months(close_per_month)

            issues_prs[items[hh]] = {
                "age_recent": age_recent,
                "recent_open": recent_open,
                "recent_close": recent_close,
                "open_per_month": open_per_month,
                "close_per_month": close_per_month,
                "label_open": dict(zip(labels, label_open_items, strict=False)),
            }

        return issues_prs
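# Minimal end-to-end usage sketch; every value below is a placeholder, and the 'items'
# names passed to process_issues_prs are illustrative choices, not fixed by the class.
if __name__ == "__main__":
    gm = GitMetrics(
        token="ghp_...",              # placeholder GitHub token
        repo_owner="example-owner",   # placeholder repository owner
        repo_name="example-repo",     # placeholder repository name
        cache_dir="/tmp/repo_stats",  # placeholder cache directory
    )

    # commit history and statistics via the GraphQL API
    commits = gm.get_commits()
    commit_stats = gm.process_commits(commits, age_recent=90)

    # issue and pull request histories, then aggregated statistics for both
    issues = gm.get_issues_prs("issues")
    prs = gm.get_issues_prs("pullRequests")
    issue_pr_stats = gm.process_issues_prs(
        [issues, prs],
        items=["issues", "prs"],
        labels=["Bug", "Enhancement"],  # placeholder label names
        age_recent=90,
    )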