Source code for repo_people.repo_people

import os
import json
import csv
import logging
import warnings
import dataclasses
import threading
import concurrent.futures
import time
from collections import Counter
from datetime import datetime, timezone
from urllib.parse import urlparse
warnings.filterwarnings("ignore", category=ResourceWarning)
# Suppress PyGithub's verbose backoff messages
logging.getLogger("github.Requester").setLevel(logging.ERROR)
from github import Github, Auth
from typing import Optional, List, Dict, Set
from .users import GitHubUserInfo
from .utils import validate_owner_repo, _is_bot
from . import export

__all__ = ["RepoPeople", "UserDataView"]


class UserDataView(dict):
    """
    Dictionary of user records with per-field "column" access.

    Instances are what :meth:`RepoPeople.get_users` and
    :meth:`RepoPeople.get_users_async` hand back. They behave exactly like a
    plain ``dict`` keyed by login; on top of that, reading an attribute whose
    name is a recognised profile field projects that single field out of
    every record::

        user_data = rp.get_users()
        user_data.email_public
        # {"alice": {"email_public": "alice@example.com"}, "bob": {"email_public": ""}, ...}

    Any other attribute name raises :exc:`AttributeError`.
    """

    # Lazily-built cache of accepted field names; ``None`` until first use.
    _valid_fields: Optional[frozenset] = None

    @classmethod
    def _get_valid_fields(cls) -> frozenset:
        """Return (building and caching on first call) the accepted field names."""
        if cls._valid_fields is not None:
            return cls._valid_fields
        # Imported here rather than at module level, as in the original design.
        from .users import UserSnapshot
        names = {field.name for field in dataclasses.fields(UserSnapshot)}
        # "roles" is accepted in addition to the UserSnapshot dataclass fields.
        names.add("roles")
        cls._valid_fields = frozenset(names)
        return cls._valid_fields

    @classmethod
    def _clear_valid_fields_cache(cls) -> None:
        """Drop the cached field-name set (handy in tests that patch UserSnapshot)."""
        cls._valid_fields = None

    def __getattr__(self, name: str):
        """Project field *name* across all records, or raise ``AttributeError``."""
        # Underscore-prefixed lookups (dunder/private) are never treated as
        # fields; this keeps pickle/copy protocol probes from being hijacked.
        if name.startswith("_"):
            raise AttributeError(name)

        allowed = self._get_valid_fields()
        if name not in allowed:
            raise AttributeError(
                f"'UserDataView' object has no attribute {name!r}. "
                f"Valid fields: {sorted(allowed)}"
            )

        projection = {}
        for username, record in self.items():
            projection[username] = {name: record.get(name)}
        return projection


class RepoPeople:
    """
    Collects and exports all user data for a given GitHub repository.

    Usernames are gathered for every repo role (contributors, maintainers,
    stargazers, watchers, issue/PR authors, fork owners, commit authors,
    dependents); the full GitHub profile of each unique user is then fetched
    via the GitHub API.

    Basic usage::

        rp = RepoPeople("owner", "repo", token="ghp_...")
        user_data = rp.get_users(export_json=True)
    """

    # ------------------------------------------------------------------
    # Step 1 - collect usernames from every repo role
    # ------------------------------------------------------------------

    # All valid role keys that can be passed to the roles parameter
    VALID_ROLES: Set[str] = {
        "contributors",
        "maintainers",
        "stargazers",
        "watchers",
        "issue_authors",
        "pr_authors",
        "fork_owners",
        "commit_authors",
        "dependents",
    }

    def __init__(
        self,
        owner: str,
        repo: str,
        token: Optional[str] = None,
        outdir: Optional[str] = None,
        skip_codeowners: bool = False,
        skip_collaborators: bool = False,
    ):
        """
        Validate the target, store settings and open the GitHub client.

        Raises ``ConnectionError`` when the initial rate-limit probe fails.
        """
        validate_owner_repo(owner, repo)

        self.owner = owner
        self.repo = repo
        self.skip_codeowners = skip_codeowners
        self.skip_collaborators = skip_collaborators

        # The token lives on a private attribute so it is less likely to leak
        # through repr(), vars() or debug logging.
        self._token = token

        # Output files share one flat directory and carry an owner_repo_ prefix.
        self.outdir = outdir or "outputs"
        self.file_prefix = f"{owner}_{repo}_"

        # Authenticated client when a token was supplied, anonymous otherwise.
        if token:
            self.gh = Github(auth=Auth.Token(token))
        else:
            self.gh = Github()

        # Probe the API up front so a bad token/connection fails immediately.
        try:
            self.gh.get_rate_limit()
        except Exception as e:
            raise ConnectionError(
                f"GitHub connection failed — verify your token. ({e})"
            ) from e

        self.repo_obj = self.gh.get_repo(f"{owner}/{repo}")

    @property
    def token(self) -> Optional[str]:
        """GitHub personal access token (private; store via constructor only)."""
        return self._token

    def __repr__(self) -> str:
        # The token is deliberately left out of the representation.
        role_count = len(self.VALID_ROLES)
        return (
            f"RepoPeople(owner={self.owner!r}, repo={self.repo!r}, "
            f"outdir={self.outdir!r}, valid_roles={role_count})"
        )

    def _print_rate_limit_status(self, context: str = "") -> None:
        """Print the current GitHub rate-limit window when available."""
        try:
            remaining, total_limit = self.gh.rate_limiting
            seconds_left = self.gh.rate_limiting_resettime - time.time()
            reset_in = max(0, int(seconds_left / 60))
            if self._token:
                auth_state = "authenticated"
            else:
                auth_state = "unauthenticated"
            prefix = f"{context} " if context else ""
            print(
                f"{prefix}Rate limit: {remaining}/{total_limit} remaining, "
                f"resets in {reset_in}m ({auth_state})"
            )
        except Exception:
            # Purely informational output: any failure is ignored on purpose.
            pass
def collect_all_usernames(
    self,
    roles: Optional[List[str]] = None,
) -> Dict[str, List[str]]:
    """
    Fetch usernames for each repo role and return them grouped by role.

    Parameters:
        roles -- role names to collect; ``None`` (default) collects every
                 role. A single role may be given as a plain string.
                 Duplicates are fetched once. An empty list returns ``{}``
                 without touching the network.

    Returns a dict mapping each requested role (in the order requested, or
    in the built-in order when ``roles`` is ``None``) to whatever the
    matching ``export.export_*`` helper returns with ``return_data=True``
    (annotated here as a list of login strings).

    Raises ``ValueError`` if any requested role is not in ``VALID_ROLES``.
    Exceptions raised by an individual fetcher propagate to the caller.
    """
    if roles is not None:
        # A bare string would otherwise be validated character by character.
        if isinstance(roles, str):
            roles = [roles]
        invalid = set(roles) - self.VALID_ROLES
        if invalid:
            raise ValueError(f"Invalid role(s): {invalid}. Valid roles: {self.VALID_ROLES}")

    # Map each role name to a zero-argument callable that fetches it.
    # Nothing is called here, so unrequested roles cost no API traffic.
    role_fetchers = {
        "contributors": lambda: export.export_contributors(
            self.owner, self.repo, self.token, self.outdir, return_data=True
        ),
        "maintainers": lambda: export.export_maintainers(
            self.owner, self.repo, self.token, self.outdir,
            self.skip_codeowners, self.skip_collaborators, return_data=True
        ),
        "stargazers": lambda: export.export_stargazers(
            self.owner, self.repo, self.token, self.outdir, return_data=True
        ),
        "watchers": lambda: export.export_watchers(
            self.owner, self.repo, self.token, self.outdir, return_data=True
        ),
        "issue_authors": lambda: export.export_issue_authors(
            self.owner, self.repo, self.token, self.outdir, return_data=True
        ),
        "pr_authors": lambda: export.export_pr_authors(
            self.owner, self.repo, self.token, self.outdir, return_data=True
        ),
        "fork_owners": lambda: export.export_fork_owners(
            self.owner, self.repo, self.token, self.outdir, return_data=True
        ),
        "commit_authors": lambda: export.export_commit_authors(
            self.owner, self.repo, self.token, self.outdir, return_data=True
        ),
        # The dependents exporter is called without a token.
        "dependents": lambda: export.export_dependents(
            self.owner, self.repo, self.outdir, return_data=True
        ),
    }

    # Preserve the caller's order but fetch each role only once.
    if roles is None:
        active_roles = list(role_fetchers)
    else:
        active_roles = list(dict.fromkeys(roles))

    # ThreadPoolExecutor rejects max_workers=0, so an empty request must
    # short-circuit instead of reaching the pool.
    if not active_roles:
        return {}

    pool_size = min(len(active_roles), len(role_fetchers))
    with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as executor:
        futures = {role: executor.submit(role_fetchers[role]) for role in active_roles}
        # Collect in submission order so the output ordering is deterministic.
        return {role: future.result() for role, future in futures.items()}
# ------------------------------------------------------------------ # Step 2 - fetch full GitHub profile for each unique user # ------------------------------------------------------------------
def get_user_details(
    self,
    usernames: List[str],
    save_each_iteration: bool = False,
    limit: Optional[int] = None,
    exclude: Optional[List[str]] = None,
    exclude_bots: bool = False,
    resume: bool = False,
    verbose: bool = True,
    include_social_accounts: bool = False,
    workers: int = 1,
) -> Dict[str, dict]:
    """
    Fetch full GitHub profile details for each username via the GitHub API.

    Parameters:
        usernames -- logins to fetch, processed in the order given (this
                     method does not sort them).
        save_each_iteration -- rewrite ``<prefix>user_details.json`` whenever
                     the number of collected records reaches a multiple of
                     10, plus once at the end, so progress survives an
                     interruption.
        limit -- keep only the first N logins that survive filtering.
        exclude -- logins to skip.
        exclude_bots -- skip logins ending in '[bot]' before fetching, and
                     drop fetched profiles whose ``is_bot`` flag is set.
        resume -- load an existing ``<prefix>user_details.json`` and skip
                     the logins it already contains.
        verbose -- print a line for each login being fetched.
        include_social_accounts -- forwarded to ``GitHubUserInfo.to_dict``.
        workers -- number of concurrent fetch threads (default 1). Values
                     above 32 are capped at 32 and values below 1 are raised
                     to 1, each with a ``UserWarning``.

    Returns a dict keyed by login. Logins whose fetch raises are reported
    with a warning line and left out of the result.
    """
    save_path = os.path.join(self.outdir, f"{self.file_prefix}user_details.json")

    # Load existing data from disk when resuming
    if resume and os.path.isfile(save_path):
        with open(save_path, "r", encoding="utf-8") as f:
            user_data = json.load(f)
        print(f" Resuming — {len(user_data)} users already fetched, skipping them.")
    else:
        user_data = {}

    # Exclusion set = already-fetched logins + explicit excludes
    exclude_set: Set[str] = set(user_data)
    if exclude:
        exclude_set.update(exclude)

    # Filter first, then apply the limit to what is left
    filtered = [
        login
        for login in usernames
        if login not in exclude_set
        and not (exclude_bots and login.endswith("[bot]"))
    ]
    if limit is not None:
        filtered = filtered[:limit]

    if save_each_iteration or resume:
        os.makedirs(self.outdir, exist_ok=True)

    # Keep the pool size inside a safe range. The upper bound prevents
    # connection pool exhaustion. The lower bound is needed because
    # ThreadPoolExecutor raises ValueError for max_workers <= 0.
    _MAX_WORKERS = 32
    if workers > _MAX_WORKERS:
        warnings.warn(
            f"workers={workers} exceeds the maximum of {_MAX_WORKERS}; capping at {_MAX_WORKERS}.",
            UserWarning,
            stacklevel=2,
        )
        workers = _MAX_WORKERS
    elif workers < 1:
        warnings.warn(
            f"workers={workers} is below the minimum of 1; using 1.",
            UserWarning,
            stacklevel=2,
        )
        workers = 1

    total = len(filtered)
    completed = 0
    failed: List[str] = []

    def _save() -> None:
        # Rewrite the whole progress file with everything collected so far.
        with open(save_path, "w", encoding="utf-8") as f:
            json.dump(user_data, f, indent=2, ensure_ascii=False, default=str)

    def _fetch_one(login: str) -> dict:
        # Runs in a worker thread; it only fetches and returns, never
        # touching the shared result containers.
        if verbose:
            print(f" Fetching: {login}")
        info = GitHubUserInfo(self.gh, username=login)
        return info.to_dict(include_social_accounts=include_social_accounts)

    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(_fetch_one, login): login for login in filtered}
        # Results are consumed here, in the calling thread only, so
        # user_data / failed need no lock.
        for future in concurrent.futures.as_completed(futures):
            login = futures[future]
            try:
                data = future.result()
                if exclude_bots and data.get("is_bot"):
                    # Bot detected from the profile rather than the login suffix
                    pass
                elif data.get("login"):
                    # Only records carrying a login are stored
                    user_data[data["login"]] = data
                    # Persist in batches of 10 to reduce I/O overhead
                    if save_each_iteration and len(user_data) % 10 == 0:
                        _save()
            except Exception as e:
                print(f" [WARNING] Could not fetch data for {login}: {e}")
                failed.append(login)

            completed += 1

            # Progress line every 50 users and at the end. The values come
            # from attributes of the client object rather than a dedicated
            # rate-limit request.
            if completed % 50 == 0 or completed == total:
                try:
                    remaining, total_limit = self.gh.rate_limiting
                    reset_epoch = self.gh.rate_limiting_resettime
                    reset_in = max(0, int((reset_epoch - time.time()) / 60))
                    print(
                        f" [Progress: {completed}/{total} | "
                        f"Rate limit: {remaining}/{total_limit} remaining, "
                        f"resets in {reset_in}m]"
                    )
                except Exception:
                    # Progress output is best-effort and must not abort the run.
                    pass

    # Summary of any users that could not be fetched
    if failed:
        print(f" Skipped {len(failed)} user(s): {failed}")

    # Final flush for whatever did not land on a batch boundary
    if save_each_iteration and user_data:
        _save()

    return user_data
# ------------------------------------------------------------------ # Step 3 - export to file # ------------------------------------------------------------------
def export_to_json(
    self,
    user_data: Dict[str, dict],
    filename: Optional[str] = None,
    lines: bool = False,
) -> str:
    """Serialise the user data dict to a JSON file in outdir and return its path.

    Parameters
    ----------
    lines:
        ``False`` (default) writes one pretty-printed JSON object holding
        every user. ``True`` writes JSON Lines instead: one compact JSON
        object per user per line, which suits streaming into downstream
        tools. In that mode the default filename ends in ``.jsonl`` rather
        than ``.json``; an explicit *filename* is always used as given.
    """
    if not filename:
        # Only an omitted filename selects the .jsonl default; an
        # empty-string filename falls back to the .json name.
        suffix = ".jsonl" if (lines and filename is None) else ".json"
        filename = f"{self.file_prefix}user_details{suffix}"

    os.makedirs(self.outdir, exist_ok=True)
    path = os.path.join(self.outdir, filename)

    with open(path, "w", encoding="utf-8") as out:
        if not lines:
            json.dump(user_data, out, indent=2, ensure_ascii=False, default=str)
        else:
            out.writelines(
                json.dumps(record, ensure_ascii=False, default=str) + "\n"
                for record in user_data.values()
            )
    return path
def export_to_csv(
    self,
    user_data: Dict[str, dict],
    filename: Optional[str] = None,
) -> str:
    """
    Write flattened user data to a CSV file in outdir.

    The header is the union of keys across all records, in first-seen
    order, so a field that only appears in later records still gets a
    column. Records lacking a column get an empty cell. List/tuple values
    are serialised as semicolon-separated strings.

    Returns the output path, or an empty string if user_data is empty.
    """
    if not user_data:
        return ""

    filename = filename or f"{self.file_prefix}user_details.csv"
    os.makedirs(self.outdir, exist_ok=True)
    path = os.path.join(self.outdir, filename)

    # dict.fromkeys de-duplicates while keeping first-seen column order.
    # When every record has the same keys this equals the first record's keys.
    fields = list(
        dict.fromkeys(key for record in user_data.values() for key in record)
    )

    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fields, restval="", extrasaction="ignore")
        writer.writeheader()
        for record in user_data.values():
            # Flatten list/tuple values to semicolon-separated strings
            writer.writerow(
                {
                    key: (";".join(map(str, value)) if isinstance(value, (list, tuple)) else value)
                    for key, value in record.items()
                }
            )
    return path
def export_to_markdown(
    self,
    user_data: Dict[str, dict],
    filename: Optional[str] = None,
    fields: Optional[List[str]] = None,
) -> str:
    """
    Write user data as a Markdown table to a file in outdir.

    Parameters:
        user_data -- records keyed by login; one table row per record.
        filename -- output name; defaults to ``<prefix>user_details.md``.
        fields -- column names; defaults to a concise summary set.

    Missing, ``None`` and empty values render as blank cells. Numeric and
    boolean values are always rendered, so ``0`` followers shows as ``0``.
    Pipe characters are escaped and line breaks are replaced by spaces so
    that every record stays on one table row.

    Returns the output path, or an empty string if user_data is empty.
    """
    if not user_data:
        return ""

    filename = filename or f"{self.file_prefix}user_details.md"
    # Default columns for a readable summary table
    default_fields = ["login", "name", "location", "company", "followers", "public_repos", "html_url"]
    cols = fields if fields is not None else default_fields

    def _cell(value) -> str:
        # Blank out None / empty strings / empty containers, but keep
        # numbers and booleans: 0 and False are real values, not "missing".
        if value is None or (not value and not isinstance(value, (int, float))):
            return ""
        text = str(value).replace("|", "\\|")
        # A raw newline would terminate the Markdown table row early.
        return text.replace("\r\n", " ").replace("\n", " ").replace("\r", " ")

    os.makedirs(self.outdir, exist_ok=True)
    path = os.path.join(self.outdir, filename)

    with open(path, "w", encoding="utf-8") as f:
        # Header row followed by the alignment/separator row
        f.write("| " + " | ".join(cols) + " |\n")
        f.write("| " + " | ".join(["---"] * len(cols)) + " |\n")
        for record in user_data.values():
            row = [_cell(record.get(col)) for col in cols]
            f.write("| " + " | ".join(row) + " |\n")
    return path
def print_markdown(
    self,
    user_data: Dict[str, dict],
    fields: Optional[List[str]] = None,
) -> None:
    """
    Print a Markdown table of user data to stdout.

    Useful for quick inspection in a terminal or notebook. ``fields``
    selects the columns; by default a concise summary set is used.

    Missing, ``None`` and empty values render as blank cells. Numeric and
    boolean values are always rendered, so ``0`` followers shows as ``0``.
    Pipe characters are escaped and line breaks are replaced by spaces so
    that every record stays on one table row.

    Does nothing when user_data is empty.
    """
    if not user_data:
        return

    default_fields = ["login", "name", "location", "company", "followers", "public_repos", "html_url"]
    cols = fields if fields is not None else default_fields

    def _cell(value) -> str:
        # Blank out None / empty strings / empty containers, but keep
        # numbers and booleans: 0 and False are real values, not "missing".
        if value is None or (not value and not isinstance(value, (int, float))):
            return ""
        text = str(value).replace("|", "\\|")
        # A raw newline would terminate the Markdown table row early.
        return text.replace("\r\n", " ").replace("\n", " ").replace("\r", " ")

    # Header row followed by the alignment/separator row
    print("| " + " | ".join(cols) + " |")
    print("| " + " | ".join(["---"] * len(cols)) + " |")
    for record in user_data.values():
        row = [_cell(record.get(col)) for col in cols]
        print("| " + " | ".join(row) + " |")
# ------------------------------------------------------------------ # Analysis helpers # ------------------------------------------------------------------
def summarise(self, user_data: Dict[str, dict], top_n: int = 5) -> dict:
    """
    Print a summary of the fetched user data and return it as a dict.

    The summary covers the total user count, the bot/human split, the
    ``top_n`` most common locations and companies, the spread of account
    ages over four fixed bands, and how many users carry each role.
    An empty ``user_data`` prints a notice and returns ``{}``.
    """
    users = list(user_data.values())
    total = len(users)
    if total == 0:
        print("No user data to summarise.")
        return {}

    def _preferred(user: dict, normalized_key: str, raw_key: str):
        # Normalised value when present and non-empty, else the raw one.
        return user.get(normalized_key) or user.get(raw_key)

    def _band(days: int) -> str:
        # Fixed age bands, boundaries expressed in days.
        for upper, label in ((365, "< 1 year"), (1825, "1–5 years"), (3650, "5–10 years")):
            if days < upper:
                return label
        return "> 10 years"

    # Bot vs human
    bots = len([u for u in users if u.get("is_bot")])
    humans = total - bots

    # Location / company frequencies; users with no value are not counted
    location_values = (_preferred(u, "location_normalized", "location") for u in users)
    locations = Counter(value for value in location_values if value)
    company_values = (_preferred(u, "company_normalized", "company") for u in users)
    companies = Counter(value for value in company_values if value)

    # Only numeric ages take part in the age distribution
    ages = [
        u.get("account_age_days", 0)
        for u in users
        if isinstance(u.get("account_age_days"), (int, float))
    ]
    ages.sort()
    age_bands = Counter(map(_band, ages))

    # How many users appear under each role
    role_distribution: Dict[str, int] = dict(
        Counter(role for u in users for role in (u.get("roles") or []))
    )

    summary = {
        "total": total,
        "humans": humans,
        "bots": bots,
        "top_locations": locations.most_common(top_n),
        "top_companies": companies.most_common(top_n),
        "account_age_distribution": dict(age_bands),
        "role_distribution": role_distribution,
    }

    # Formatted report
    print(f"\n=== User Summary: {self.owner}/{self.repo} ===")
    print(f" Total users : {total}")
    print(f" Humans : {humans}")
    print(f" Bots : {bots}")

    print(f"\n Top {top_n} locations:")
    for loc, count in summary["top_locations"]:
        print(f" {loc}: {count}")

    print(f"\n Top {top_n} companies:")
    for co, count in summary["top_companies"]:
        print(f" {co}: {count}")

    print("\n Account age distribution:")
    for band in ("< 1 year", "1–5 years", "5–10 years", "> 10 years"):
        print(f" {band}: {age_bands.get(band, 0)}")

    if role_distribution:
        print("\n Role distribution:")
        for role in sorted(role_distribution):
            print(f" {role}: {role_distribution[role]}")

    print()
    return summary
def top_users(
    self,
    user_data: Dict[str, dict],
    n: int = 10,
    by: str = "followers",
) -> List[dict]:
    """
    Return the N highest-ranked user records for one numeric profile field.

    Typical choices for 'by' are followers, public_repos, account_age_days,
    following, public_gists and total_public_stars_sampled. A record where
    the field is absent or empty is scored as 0 and therefore ranks last.
    """
    def _score(user: dict):
        # Missing / None / empty values count as zero.
        value = user.get(by)
        return value if value else 0

    ranked = list(user_data.values())
    # In-place stable sort, highest score first; ties keep their input order.
    ranked.sort(key=_score, reverse=True)
    return ranked[:n]
def compare(
    self,
    other: "RepoPeople",
    user_data_self: Dict[str, dict],
    user_data_other: Dict[str, dict],
) -> Dict[str, object]:
    """
    Compare the user populations of this repo and another ``RepoPeople``.

    Only the keys (logins) of the two user-data dicts are compared. The
    result has three entries, each a sorted list of logins:

    - ``"only_in_self"`` — present in this repo's data but not the other's.
    - ``"only_in_other"`` — present in the other repo's data but not this one's.
    - ``"in_both"`` — present in both.

    Example::

        rp_a = RepoPeople("owner", "repo-a", token="ghp_...")
        rp_b = RepoPeople("owner", "repo-b", token="ghp_...")
        data_a = rp_a.get_users()
        data_b = rp_b.get_users()
        diff = rp_a.compare(rp_b, data_a, data_b)
        print(diff["in_both"])
    """
    mine = set(user_data_self.keys())
    theirs = set(user_data_other.keys())

    result: Dict[str, object] = {}
    result["only_in_self"] = sorted(mine.difference(theirs))
    result["only_in_other"] = sorted(theirs.difference(mine))
    result["in_both"] = sorted(mine.intersection(theirs))
    return result
def get_users(
    self,
    export: bool = False,
    export_csv: bool = False,
    save_each_iteration: bool = False,
    limit: Optional[int] = None,
    roles: Optional[List[str]] = None,
    exclude: Optional[List[str]] = None,
    exclude_bots: bool = False,
    resume: bool = False,
    verbose: bool = True,
    fields: Optional[List[str]] = None,
    include_social_accounts: bool = False,
    workers: int = 1,
) -> UserDataView:
    """
    Full pipeline: collect all repo usernames -> fetch user details -> export.

    Steps:
        1. Collect usernames from the requested repo roles.
        2. Deduplicate across roles and sort the logins.
        3. Fetch the full GitHub profile for each unique user.
        4. Optionally export to user_details.json / user_details.csv inside outdir.

    Parameters:
        export -- save results to user_details.json when True.
        export_csv -- save results to user_details.csv when True.
        save_each_iteration -- forwarded to get_user_details, which
            periodically rewrites user_details.json while fetching.
        limit -- stop after fetching this many user profiles.
        roles -- only collect users from these role categories
            (e.g. ["contributors", "stargazers"]); a single string is accepted.
        exclude -- list of logins to skip entirely.
        exclude_bots -- skip logins ending in '[bot]' and profiles with is_bot=True.
        resume -- load existing user_details.json and skip already-fetched users.
        verbose -- print a line for each user being fetched.
        fields -- if set, only these attributes are kept per user in the output
            (e.g. ["login", "type", "updated_at"]); a single string is accepted.
        include_social_accounts -- fetch each user's linked social accounts
            (LinkedIn, Mastodon, YouTube, npm, …). Costs one extra API call per user.
        workers -- number of concurrent fetch threads (default 1 = sequential).

    Returns a ``UserDataView`` keyed by GitHub login. Each record always
    includes a "roles" key holding the sorted, duplicate-free list of roles
    the user appeared under, regardless of the fields parameter.

    Raises ``ValueError`` for unknown field or role names, before any
    network call is made.
    """
    # Validate fields against UserSnapshot before any network calls
    if fields is not None:
        from .users import UserSnapshot
        valid_fields = {f.name for f in dataclasses.fields(UserSnapshot)}
        if isinstance(fields, str):
            fields = [fields]
        invalid = [f for f in fields if f not in valid_fields]
        if invalid:
            raise ValueError(
                f"Invalid field(s): {invalid}. "
                f"Valid fields are: {sorted(valid_fields)}"
            )

    # Validate roles before any network calls
    if roles is not None:
        if isinstance(roles, str):
            roles = [roles]
        invalid_roles = [r for r in roles if r not in self.VALID_ROLES]
        if invalid_roles:
            raise ValueError(
                f"Invalid role(s): {invalid_roles}. "
                f"Valid roles are: {sorted(self.VALID_ROLES)}"
            )

    # Step 1: collect usernames from the requested roles
    print(f"Collecting users for {self.owner}/{self.repo}...")
    username_groups = self.collect_all_usernames(roles=roles)

    # login -> {roles}. A set is used so that a login listed more than once
    # under the same role does not get that role repeated in the output.
    login_roles: Dict[str, Set[str]] = {}
    for role, logins in username_groups.items():
        for login in logins:
            login_roles.setdefault(login, set()).add(role)

    # Deduplicate across all collected roles (empty logins are dropped)
    all_logins: Set[str] = {
        login
        for logins in username_groups.values()
        for login in logins
        if login
    }
    print(f"Found {len(all_logins)} unique users across all roles.")

    # Step 2: fetch full GitHub profile for each unique user
    print("Fetching user details from GitHub API...")
    self._print_rate_limit_status("Preflight")
    user_data = self.get_user_details(
        sorted(all_logins),
        save_each_iteration=save_each_iteration,
        limit=limit,
        exclude=exclude,
        exclude_bots=exclude_bots,
        resume=resume,
        verbose=verbose,
        include_social_accounts=include_social_accounts,
        workers=workers,
    )
    print(f"Retrieved profile data for {len(user_data)} users.")

    # Restrict each record to the requested subset of fields
    if fields:
        wanted = set(fields)
        user_data = {
            login: {k: v for k, v in record.items() if k in wanted}
            for login, record in user_data.items()
        }

    # Annotate each record with the roles the user appeared under
    for login, record in user_data.items():
        record["roles"] = sorted(login_roles.get(login, ()))

    # Step 3: export to file(s)
    os.makedirs(self.outdir, exist_ok=True)
    if export:
        path = self.export_to_json(user_data)
        print(f"Exported to: {path}")
    if export_csv:
        path = self.export_to_csv(user_data)
        print(f"Exported to: {path}")

    return UserDataView(user_data)
# ------------------------------------------------------------------ # Async API (asyncio + aiohttp) # ------------------------------------------------------------------
async def get_user_details_async(
    self,
    usernames: List[str],
    save_each_iteration: bool = False,
    limit: Optional[int] = None,
    exclude: Optional[List[str]] = None,
    exclude_bots: bool = False,
    resume: bool = False,
    verbose: bool = True,
    concurrency: int = 10,
) -> Dict[str, dict]:
    """
    Async counterpart of get_user_details, built on aiohttp.

    Profiles are read straight from the GitHub REST API. For every login
    four requests are issued together (``/users/{login}``, ``/orgs``,
    ``/events/public`` and ``/repos``), and an ``asyncio.Semaphore`` limits
    how many logins are processed at the same time.

    Parameters:
        usernames -- list of GitHub logins to fetch.
        save_each_iteration -- rewrite user_details.json after each stored record.
        limit -- keep only the first N logins that survive filtering.
        exclude -- logins to skip.
        exclude_bots -- skip logins ending in '[bot]' before fetching, and
            drop fetched accounts that ``_is_bot`` flags.
        resume -- load user_details.json and skip logins already in it.
        verbose -- print a line per fetched user.
        concurrency -- max logins in flight at once (default 10). Each
            login uses up to four simultaneous requests. Values below 1
            are raised to 1 with a ``UserWarning``.

    Returns a dict keyed by login with profile data dicts. A login whose
    fetch or processing fails is reported with a warning line and left out;
    one failing login does not abort the others.
    """
    import asyncio

    save_path = os.path.join(self.outdir, f"{self.file_prefix}user_details.json")

    # Load existing data when resuming
    if resume and os.path.isfile(save_path):
        with open(save_path, "r", encoding="utf-8") as f:
            user_data: Dict[str, dict] = json.load(f)
        print(f" Resuming — {len(user_data)} users already fetched, skipping them.")
    else:
        user_data = {}

    # Exclusion set = already-fetched logins + explicit excludes
    exclude_set: Set[str] = set(user_data)
    if exclude:
        exclude_set.update(exclude)

    # Filter, strip bots by login suffix, then apply the limit
    filtered = [
        login
        for login in usernames
        if login not in exclude_set
        and not (exclude_bots and login.endswith("[bot]"))
    ]
    if limit is not None:
        filtered = filtered[:limit]

    if save_each_iteration or resume:
        os.makedirs(self.outdir, exist_ok=True)

    # Nothing left to fetch (e.g. a fully resumed run): no HTTP session needed.
    if not filtered:
        return user_data

    import aiohttp

    # Semaphore(0) would block every task forever and a negative value
    # raises ValueError, so clamp to at least one slot.
    if concurrency < 1:
        warnings.warn(
            f"concurrency={concurrency} is below the minimum of 1; using 1.",
            UserWarning,
            stacklevel=2,
        )
        concurrency = 1

    # Auth headers for the raw REST calls
    headers = {
        "Accept": "application/vnd.github+json",
        "User-Agent": "repo-people/async",
    }
    if self.token:
        headers["Authorization"] = f"Bearer {self.token}"

    sem = asyncio.Semaphore(concurrency)
    failed: List[str] = []
    lock = asyncio.Lock()

    def _build_record(login: str, raw: dict, orgs_data, events_data, repos_data) -> dict:
        # Turn the four raw API payloads into one flat profile record.

        # --- Derived string fields (no extra calls needed) ---
        email = raw.get("email") or ""
        email_domain = email.split("@", 1)[1].lower() if "@" in email else ""
        blog = raw.get("blog") or ""
        blog_host = (urlparse(blog).hostname or "").lower() if blog else ""
        company = raw.get("company") or ""
        company_normalized = company.strip()
        if company_normalized.startswith("@"):
            company_normalized = company_normalized[1:]
        location = raw.get("location") or ""
        location_normalized = location.strip().lower()

        # --- Orgs (a failed sub-request arrives as None -> empty list) ---
        orgs_list = orgs_data if isinstance(orgs_data, list) else []
        public_orgs = [o.get("login", "") for o in orgs_list if o.get("login")]

        # --- Most recent public event, used for recently_active ---
        events_list = events_data if isinstance(events_data, list) else []
        last_public_event_at = events_list[0].get("created_at", "") if events_list else ""

        # --- Repos: language tallies plus star/fork sums over the sample ---
        repos_list = repos_data if isinstance(repos_data, list) else []
        lang_counts: Dict[str, int] = {}
        total_stars = 0
        total_forks = 0
        for repo in repos_list:
            lang = repo.get("language")
            if lang:
                lang_counts[lang] = lang_counts.get(lang, 0) + 1
            # "or 0" guards against explicit nulls in the payload
            total_stars += repo.get("stargazers_count") or 0
            total_forks += repo.get("forks_count") or 0
        top_languages = sorted(lang_counts.items(), key=lambda x: x[1], reverse=True)[:3]

        # --- Computed date/ratio metrics ---
        created_str = raw.get("created_at", "") or ""
        updated_str = raw.get("updated_at", "") or ""
        account_age_days = 0
        repos_per_year = 0.0
        if created_str:
            try:
                created_dt = datetime.fromisoformat(created_str.replace("Z", "+00:00"))
                account_age_days = (datetime.now(timezone.utc) - created_dt).days
                # Accounts younger than a year are treated as one year old
                repos_per_year = round(
                    (raw.get("public_repos") or 0) / max(account_age_days / 365, 1), 2
                )
            except ValueError:
                # Unparseable timestamp: keep the zero defaults
                pass

        followers = raw.get("followers", 0) or 0
        following = raw.get("following", 0) or 0
        followers_following_ratio = round(
            followers / following if following else float(followers), 2
        )

        # Active = a public event within the last 90 days
        recently_active = False
        if last_public_event_at:
            try:
                ev_dt = datetime.fromisoformat(last_public_event_at.replace("Z", "+00:00"))
                recently_active = (datetime.now(timezone.utc) - ev_dt).days <= 90
            except ValueError:
                pass

        return {
            "login": raw.get("login", ""),
            "id": raw.get("id"),
            "node_id": raw.get("node_id", ""),
            "type": raw.get("type", ""),
            "name": raw.get("name") or "",
            "company": company,
            "location": location,
            "email_public": email,
            "email_domain": email_domain,
            "blog": blog,
            "blog_host": blog_host,
            "twitter": raw.get("twitter_username") or "",
            "bio": raw.get("bio") or "",
            "avatar_url": raw.get("avatar_url", ""),
            "html_url": raw.get("html_url", ""),
            "hireable": raw.get("hireable"),
            "site_admin": raw.get("site_admin", False),
            "created_at": created_str,
            "updated_at": updated_str,
            "followers": followers,
            "following": following,
            "public_repos": raw.get("public_repos", 0),
            "public_gists": raw.get("public_gists", 0),
            "public_orgs": public_orgs,
            "orgs_public_count": len(public_orgs),
            "is_bot": _is_bot(raw.get("login", login), raw.get("type", "")),
            "last_public_event_at": last_public_event_at,
            "has_public_email": bool(email),
            "has_blog": bool(blog),
            "has_twitter": bool(raw.get("twitter_username")),
            "company_normalized": company_normalized,
            "location_normalized": location_normalized,
            "account_age_days": account_age_days,
            "followers_following_ratio": followers_following_ratio,
            "repos_per_year": repos_per_year,
            "recently_active": recently_active,
            "top_languages": top_languages,
            "total_public_stars_sampled": total_stars,
            "total_public_forks_sampled": total_forks,
            # Optional fields that the async path never populates
            "ssh_keys_count": None,
            "gpg_keys_count": None,
            "starred_repos_sampled": None,
            "social_accounts": None,
            "is_collaborator": None,
            "permission_on_repo": None,
        }

    async def _fetch_one(session: "aiohttp.ClientSession", login: str) -> None:
        async with sem:
            if verbose:
                print(f" Fetching: {login}")

            # GET a URL and return parsed JSON, or None on any non-200 status
            async def _get_json(url: str, params=None):
                async with session.get(url, headers=headers, params=params) as resp:
                    return await resp.json() if resp.status == 200 else None

            base_url = f"https://api.github.com/users/{login}"
            try:
                # Base profile, orgs, latest public event and owned repos together
                raw, orgs_data, events_data, repos_data = await asyncio.gather(
                    _get_json(base_url),
                    _get_json(f"{base_url}/orgs", {"per_page": 100}),
                    _get_json(f"{base_url}/events/public", {"per_page": 1}),
                    _get_json(f"{base_url}/repos", {"per_page": 50, "type": "owner"}),
                )
                if raw is None:
                    raise ValueError("HTTP error fetching base profile")

                # Skip bot accounts flagged by profile type or login pattern
                if exclude_bots and _is_bot(login, raw.get("type", "")):
                    return

                # Built inside the try so a malformed payload only fails this login
                record = _build_record(login, raw, orgs_data, events_data, repos_data)
            except Exception as e:
                print(f" [WARNING] Could not fetch data for {login}: {e}")
                async with lock:
                    failed.append(login)
                return

            if record.get("login"):
                async with lock:
                    user_data[record["login"]] = record
                    if save_each_iteration:
                        with open(save_path, "w", encoding="utf-8") as f:
                            json.dump(user_data, f, indent=2, ensure_ascii=False, default=str)

    async with aiohttp.ClientSession() as session:
        # return_exceptions=True lets every task finish before the session
        # closes, instead of the first stray error tearing it down mid-flight.
        outcomes = await asyncio.gather(
            *(_fetch_one(session, login) for login in filtered),
            return_exceptions=True,
        )

    for outcome in outcomes:
        if isinstance(outcome, BaseException):
            print(f" [ERROR] Unexpected error during async fetch: {outcome}")

    if failed:
        print(f" Skipped {len(failed)} user(s): {failed}")

    return user_data
async def get_users_async(
    self,
    export: bool = False,
    export_csv: bool = False,
    save_each_iteration: bool = False,
    limit: Optional[int] = None,
    roles: Optional[List[str]] = None,
    exclude: Optional[List[str]] = None,
    exclude_bots: bool = False,
    resume: bool = False,
    verbose: bool = True,
    fields: Optional[List[str]] = None,
    concurrency: int = 10,
) -> UserDataView:
    """
    Async version of get_users.

    Usernames are collected through the synchronous
    ``collect_all_usernames`` call; the profiles are then fetched
    concurrently via ``get_user_details_async``.

    Parameters:
        export -- save results to user_details.json.
        export_csv -- save results to user_details.csv.
        save_each_iteration -- forwarded to get_user_details_async.
        limit -- cap the number of profiles fetched.
        roles -- restrict which role categories are collected; a single
            string is accepted.
        exclude -- logins to skip entirely.
        exclude_bots -- skip bot accounts.
        resume -- skip logins already in user_details.json.
        verbose -- print per-user progress.
        fields -- restrict which fields appear in the output dict; a single
            string is accepted.
        concurrency -- forwarded to get_user_details_async (default 10).

    Returns a ``UserDataView`` keyed by GitHub login. Every record carries
    a 'roles' key holding the sorted, duplicate-free list of roles the user
    appeared under.

    Raises ``ValueError`` for unknown field or role names, before any
    network call is made.
    """
    # Validate fields before any network calls
    if fields is not None:
        from .users import UserSnapshot
        valid_fields = {f.name for f in dataclasses.fields(UserSnapshot)}
        if isinstance(fields, str):
            fields = [fields]
        invalid = [f for f in fields if f not in valid_fields]
        if invalid:
            raise ValueError(
                f"Invalid field(s): {invalid}. "
                f"Valid fields are: {sorted(valid_fields)}"
            )

    # Validate roles before any network calls
    if roles is not None:
        if isinstance(roles, str):
            roles = [roles]
        invalid_roles = [r for r in roles if r not in self.VALID_ROLES]
        if invalid_roles:
            raise ValueError(
                f"Invalid role(s): {invalid_roles}. "
                f"Valid roles are: {sorted(self.VALID_ROLES)}"
            )

    # Step 1: collect usernames synchronously (no async needed here)
    print(f"Collecting users for {self.owner}/{self.repo}...")
    username_groups = self.collect_all_usernames(roles=roles)

    # login -> {roles}. A set is used so that a login listed more than once
    # under the same role does not get that role repeated in the output.
    login_roles: Dict[str, Set[str]] = {}
    for role, logins in username_groups.items():
        for login in logins:
            login_roles.setdefault(login, set()).add(role)

    # Deduplicate across all collected roles (empty logins are dropped)
    all_logins: Set[str] = {
        login
        for logins in username_groups.values()
        for login in logins
        if login
    }
    print(f"Found {len(all_logins)} unique users across all roles.")

    # Step 2: fetch profiles asynchronously
    print("Fetching user details from GitHub API (async)...")
    user_data = await self.get_user_details_async(
        sorted(all_logins),
        save_each_iteration=save_each_iteration,
        limit=limit,
        exclude=exclude,
        exclude_bots=exclude_bots,
        resume=resume,
        verbose=verbose,
        concurrency=concurrency,
    )
    print(f"Retrieved profile data for {len(user_data)} users.")

    # Restrict to requested field subset
    if fields:
        wanted = set(fields)
        user_data = {
            login: {k: v for k, v in record.items() if k in wanted}
            for login, record in user_data.items()
        }

    # Annotate every record with the roles the user appeared under
    for login, record in user_data.items():
        record["roles"] = sorted(login_roles.get(login, ()))

    # Step 3: export
    os.makedirs(self.outdir, exist_ok=True)
    if export:
        path = self.export_to_json(user_data)
        print(f"Exported to: {path}")
    if export_csv:
        path = self.export_to_csv(user_data)
        print(f"Exported to: {path}")

    return UserDataView(user_data)