Source code for pykern.pkcli.github_orgmode

# -*- coding: utf-8 -*-
"""convert to/from orgmode

:copyright: Copyright (c) 2022 RadiaSoft LLC.  All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdlog, pkdp
import pykern.pkcli
import pykern.pkcli.github
import pykern.pkcollections
import pykern.pkio
import re


_PROPERTIES = (
    # order matters, and underscore is a not a GitHub API name
    "assignees",
    "_repo",
    "created_at",
    "html_url",
    "milestone",
    "number",
    "user",
)


[docs] def assignee_issues(user, org_d="~/org"): """Export issues for auth user to orgmode file named ``org_d/user.org`` Args: user (str): user name to use org_d (str): where to store org files [~/org] Returns: str: Name of org file created """ return str(_OrgModeGen(user=user, org_d=org_d).from_issues())
[docs] def from_issues(*repos, org_d="~/org"): """Export issues to orgmode file named ``org_d/repo.org`` Args: repos (str): will add radiasoft/ if missing org_d (str): where to store org files [~/org] Returns: str: Name of org file created """ return str(_OrgModeGen(repos=repos, org_d=org_d).from_issues())
[docs] def test_data(repo, path): """Used to generate the unit test data""" from pykern import pkjson if len(repo): r = pykern.pkcli.github.GitHub().repo_arg(repo) res = _dict(r) else: r = pykern.pkcli.github.GitHub().login() res = PKDict() res._issues = sorted( [_dict(i) for i in r.issues(state="open")], key=lambda x: x.number, ) pkjson.dump_pretty(res, filename=path)
[docs] def to_issues(org_path, dry_run=False): """Import (existing) issues from ``org_d/repo.org`` Args: org_path (str): org mode file dry_run (bool): whether to update issues or not Returns: str: updates made """ return _OrgModeParser(org_path=org_path).to_issues(dry_run)
class _Base: _NON_PROPERTIES = ("body", "labels", "title") _ATTRS = tuple(k for k in _PROPERTIES + _NON_PROPERTIES if not k.startswith("_")) _COMMENT_TAG = ":_separator_:" def __init__(self, **kwargs): self._github = pykern.pkcli.github.GitHub() def _issue_as_dict(self, issue): def _str(item): if isinstance(item, list): return " ".join(sorted(_str(i) for i in item)) if isinstance(item, str): return item if item is None: return "" if isinstance(item, int): return str(item) if isinstance(item, dict): for k in "name", "login", "title": if k in item: return item[k] raise AssertionError(f"unknown type={type(item)} item={item}") i = _dict(issue) return PKDict({k: _str(i[k]) for k in self._ATTRS if k in i}) def _iter_issues(self, issues): for i in issues: if not i.pull_request(): yield i def _open_issues(self, repo): """Ignores pull requests""" return self._iter_issues(repo.issues(state="open")) class _OrgModeGen(_Base): _TITLE = re.compile(r"^(\d{4})-?(\d\d)-?(\d\d)\s*(.*)") # strict for now _HTML_URL = re.compile(r"https://github.com/([\.\w-]+/[\.\w-]+)/issues/\d+$") _NO_DEADLINES_MARK = "ISSUES DO NOT HAVE DEADLINES AFTER THIS " + _Base._COMMENT_TAG _CFG = "#+STARTUP: showeverything\n#+COLUMNS: %13DEADLINE %50ITEM %number(Num) %15assignees %TAGS\n" def __init__(self, **kwargs): super().__init__(**kwargs) self._org_d = pykern.pkio.py_path(kwargs["org_d"]) if "repos" in kwargs: if not kwargs["repos"]: self._error("no repos supplied") if len(kwargs["repos"]) == 1: r = self._github.repo_arg(kwargs["repos"][0]) b = f"{r.organization['login']}-{r.name}" self._issues = self._open_issues(r) else: b = "issues" self._issues = [] for r in kwargs["repos"]: self._issues.extend(self._open_issues(self._github.repo_arg(r))) elif "user" in kwargs: b = kwargs["user"] self._issues = self._iter_issues( ( i.issue for i in self._github.login().search_issues( query=f"assignee:{b} state:open" ) ), ) else: raise AssertionError(f"kwargs={kwargs} invalid") self._org_path = self._org_d.join(f"{b}.org") def from_issues(self): self._no_deadlines = None return self._write(self._CFG + "".join(self._issue(i) for i in self._sorted())) def _error(self, msg): pykern.pkcli.command_error("{} org_d={}", msg, self._org_d) def _issue(self, issue): def _deadline(): d = issue.get("_deadline") if not d: return "" return f"DEADLINE: <{d}>\n" def _drawer(name, body): return f":{name}:\n{body}:END:\n" def _properties(): return _drawer( "PROPERTIES", "".join(_property(k) for k in _PROPERTIES), ) def _property(name): res = f":{name}:" if len(issue[name]) > 0: res += f" {issue[name]}" return res + "\n" def _tags(): if not issue.labels: return "" return " :" + issue.labels.replace(" ", ":") + ":" def _title(): res = "" if self._no_deadlines is None and issue.get("_deadline") is None: self._no_deadlines = True res = f"* {self._NO_DEADLINES_MARK}\n" return f"{res}* {issue.title or ''}{_tags()}\n" return _title() + pykern.pkcli.github.GitHub.indent2( _deadline() + _properties() + _drawer("BODY", pykern.pkcli.github.GitHub.issue_body(issue)), ) def _sorted(self): repo_name_cache = PKDict() def _dict(issue): res = self._issue_as_dict(issue) m = self._TITLE.search(res.title) if m: k = res._deadline = f"{m.group(1)}-{m.group(2)}-{m.group(3)}" res.title = m.group(4) else: k = "9999-01-01" # created_at makes deterministic res._key = f"{k} {res.number:>010} {res.created_at}" res._repo = _repo_name(res.html_url) return res def _repo_name(url): return repo_name_cache.pksetdefault(url, lambda: _repo_name_calc(url))[url] def _repo_name_calc(url): m = self._HTML_URL.search(url) if not m: raise ValueError(f"html_url={url} invalid format") return m.group(1) return sorted([_dict(i) for i in self._issues], key=lambda x: x._key) def _write(self, text): return pykern.pkio.write_text(self._org_path, text) class _OrgModeParser(_Base): _ARRAY_ATTRS = ("assignees", "labels") _DEADLINE = re.compile(r"^\s*DEADLINE:\s*<(\d{4})-(\d\d)-(\d\d)") _HEADING = re.compile(r"^\*+\s*(.*)") #: orgmode indent is always 2; POSIT indent2() indents by 2 _INDENT = 2 _PROPERTY = re.compile(r"^:(\w+):\s*(.*)") _TAGS = re.compile(r"^(.+)\s+(:(?:\S+:)+)\s*$") def __init__(self, org_path): super().__init__() self._org_path = pykern.pkio.py_path(org_path) def to_issues(self, dry_run): self._dry_run = dry_run self._lines = pykern.pkio.read_text(self._org_path).splitlines() self._parse() return self._update() def _add_issue(self, issue): i = self._repos.setdefault(issue._repo, PKDict()) if issue.number in i: self._error( f"number={issue.number} duplicated in prev={i[issue.number]} curr={issue}", ) i[issue.number] = issue def _body(self, issue): issue.body = "\n".join(self._drawer("BODY")) if len(issue.body): issue.body += "\n" def _deadline(self, issue): l = self._next("DEADLINE:") m = self._DEADLINE.search(l) if not m: # Optional self._lines.insert(0, l) return issue.title = f"{m.group(1)}{m.group(2)}{m.group(3)} {issue.title}" def _drawer(self, name): def _line(key): x = f":{key}:" l = self._next(x)[self._INDENT :] if x == l: return None if key != "END": self._error(f"expect={x} but got line={l}") return l _line(name) res = [] while True: l = _line("END") if l is None: break res.append(l) return res def _error(self, msg): pykern.pkcli.command_error("{} path={}", msg, self._org_path) def _next(self, expect=None): while self._lines: res = self._lines.pop(0) if res.startswith("#"): continue return res if expect is None: return None self._error(f"expect={expect} but got EOF") def _parse(self): self._repos = PKDict() while True: l = self._next(None) if l is None: return m = self._HEADING.search(l) if not m: continue if self._COMMENT_TAG in m.group(1): continue self._add_issue(self._parse_issue(l, m.group(1))) def _parse_issue(self, line, title): res = PKDict() self._title(res, title) self._deadline(res) self._properties(res) self._body(res) return res def _properties(self, issue): for l in self._drawer("PROPERTIES"): m = self._PROPERTY.search(l) if not m: self._error(f"expected :property: value but got line={l}") issue[m.group(1)] = m.group(2).strip() def _title(self, issue, line): m = self._TAGS.search(line) if m: issue.title = m.group(1) issue.labels = " ".join( sorted(t for t in m.group(2).split(":") if len(t) > 0), ) else: issue.title = line issue.labels = "" def _update(self): res = PKDict() for k in sorted(self._repos): e = self._update_repo(self._github.repo_arg(k), self._repos[k]) if e: res[k] = e return res def _update_repo(self, repo, issues): def _fix_milestone(edits): m = edits.get("milestone") if m is None: return edits res = edits.copy() try: v = self._github.milestone(repo, m) except KeyError: v = repo.create_milestone(m).number res.milestone = v return res def _edits(base, update): res = PKDict() for k in "assignees", "body", "labels", "milestone", "title": if base[k] != update[k]: res[k] = ( sorted(update[k].split()) if k in self._ARRAY_ATTRS else update[k] ) if k == "labels": y = set(res[k]) - set(x.name for x in repo.labels()) if y: # labels will be created automatically, which is unlikely # something we want, since labels have colors and we # have a tool to create labels. raise ValueError(f"non-existent labels={y}") return res res = PKDict() # only update issues that are still open for i in self._open_issues(repo): try: u = issues.get(str(i.number)) if not u: continue e = None e = _edits(self._issue_as_dict(i), u) if e and not self._dry_run: i.edit(**_fix_milestone(e)) if e: res[i.number] = e except Exception: pkdlog("edits={} error in issue={}", e, f"{repo}#{i.number}") raise return res def _dict(model): return pykern.pkcollections.canonicalize(model.as_dict())