Starting to implement main & CLI

Adds tests for date utils and CLI
This commit is contained in:
Yann Weber 2023-11-25 14:36:21 +01:00
commit 0ccf58e5b5
2 changed files with 267 additions and 13 deletions

133
git_oh.py
View file

@ -1,18 +1,47 @@
#!/usr/bin/python3 #!/usr/bin/python3
""" Entrouvert test """ """ Entrouvert test """
import argparse
import calendar
import datetime import datetime
import shutil import shutil
import sys
import tempfile import tempfile
import warnings import warnings
from collections.abc import Sequence from collections.abc import Sequence
from typing import Generator from typing import Generator
import git import git
from git.repo.base import Repo from git.repo.base import Repo
from git.objects.commit import Commit from git.objects.commit import Commit
def main():
""" Main function parse args from CLI and run the program """
args = parse_args()
repo = TempRemoteRepo(args.url, silent=args.silent)
commit_stats = {}
for commit in repo:
author = commit.author
if author not in commit_stats:
commit_stats[author] = {'total': 0, 'off': 0}
commit_stats[author]['total'] += 1
if not in_office_hours(commit.committed_datetime,
starthour=args.daystart,
stophour=args.daystop,
weekend=args.weekend):
print(f"{commit} {commit.author.name:>30s}\
{commit.committed_datetime.strftime('%a %H:%M %Y-%m-%d')}")
commit_stats[author]['off'] += 1
for author, stats in commit_stats.items():
pct_off = (stats['off'] / stats['total'])*100
print(f"{author.name:20s} : {pct_off:3.1f}% off ({stats['off']}/{stats['total']})")
def in_office_hours(moment:datetime.datetime=datetime.datetime.now(), def in_office_hours(moment:datetime.datetime=datetime.datetime.now(),
starthour:datetime.time=datetime.time(8,0,0), starthour:datetime.time=datetime.time(8,0,0),
@ -60,6 +89,16 @@ def iter_commits(repo:Repo)->Generator[Commit, None, None]:
encountered.add(commit.binsha) encountered.add(commit.binsha)
yield commit yield commit
class RemoteFetchProgress(git.RemoteProgress):
""" Report progress for remote fetch """
def update(self, op_code, cur_count, max_count=None, message=""):
""" Display progression on a single line of stderr """
if not max_count:
return
pct = (cur_count / max_count)*100
msg = f"{pct:5.1f}% {message}"
print(f"{msg:<80s}", end="\r", file=sys.stderr)
class TempRemoteRepo(Repo): class TempRemoteRepo(Repo):
""" A temporary repository referencing a remote repository """ A temporary repository referencing a remote repository
@ -67,16 +106,19 @@ class TempRemoteRepo(Repo):
Allows to iterate on all commits without cloning the remote Allows to iterate on all commits without cloning the remote
""" """
def __init__(self, remote_url:str): def __init__(self, remote_url:str, silent=True):
""" Initialize a new empty repository referencing a remote repo """ Initialize a new empty repository referencing a remote repo
Arguments : Arguments :
- remote_url : The url of the remote repo to reference - remote_url : The url of the remote repo to reference
- silent : if False display progress on stderr
""" """
self.temppath = tempfile.mkdtemp(prefix="git_oh_") self.temppath = tempfile.mkdtemp(prefix="git_oh_")
git.Repo.init(self.temppath) git.Repo.init(self.temppath)
super().__init__(self.temppath) super().__init__(self.temppath)
self.create_remote("origin", remote_url).fetch()
remote_progress = None if silent else RemoteFetchProgress()
self.create_remote("origin", remote_url).fetch(progress=remote_progress)
def __del__(self): def __del__(self):
shutil.rmtree(self.temppath) shutil.rmtree(self.temppath)
@ -84,3 +126,88 @@ class TempRemoteRepo(Repo):
def __iter__(self)->Generator[Commit, None, None]: def __iter__(self)->Generator[Commit, None, None]:
return iter_commits(self) return iter_commits(self)
def valid_day(value:str) -> int:
""" Take a locale dayname and convert it to a day number
Valid days are calendar.day_name, calendar.day_abbr and integers
in [0..6]
Arguments :
- value : the value to convert
Returns an int in [0..6]
"""
value = value.lower()
try:
return [day.lower() for day in calendar.day_name].index(value)
except ValueError:
pass
try:
return [day.lower() for day in calendar.day_abbr].index(value)
except ValueError:
pass
try:
val = int(value)
if val not in range(7):
raise ValueError()
return val
except ValueError:
pass
days = [str(daynum) for daynum in range(7)]
days += [day.lower() for day in calendar.day_abbr]
days += [day.lower() for day in calendar.day_name]
valid_days = ', '.join(days)
raise ValueError(f"Invalid day {value!r}. Valid days are : {valid_days}")
def parse_args(argv:list[str]=None):
""" Argument parser for CLI
Exits printing help if invalid arguments given, else returns
arguments in a named tuple.
"""
parser = argparse.ArgumentParser(description="""Program description
With fancy multiline explanations
""")
parser.add_argument("url", help="Git repository URL")
parser.add_argument("-d", "--daystart", type=datetime.time.fromisoformat,
help="Day start time (ISO 8601)")
parser.add_argument("-D", "--daystop", type=datetime.time.fromisoformat,
help="Day stop time (ISO 8601)")
parser.add_argument("-w", "--weekend", type=str, default="5,6",
help="Indicate days off separated by ','. Locale day names or \
days number (0..6 starting with monday) can be used. By default saturday \
and sunday are off. Use special value -w NUL to indicate no days off.")
parser.add_argument("-s", "--silent", action="store_true", default=False,
help="Do not display fetch progression on stderr")
args = parser.parse_args(argv)
# parse weekend days list
if args.weekend == 'NUL':
args.weekend = []
else:
weekend = []
for day in args.weekend.split(','):
try:
weekend.append(valid_day(day.strip()))
except ValueError as expt:
print(expt, file=sys.stderr)
parser.print_help()
sys.exit(1)
args.weekend = list(set(weekend))
return args
if __name__ == "__main__":
main()

147
test.py
View file

@ -1,6 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" Tests for git_oh script """ """ Tests for git_oh script """
import calendar
import datetime import datetime
import gc import gc
import os import os
@ -74,7 +75,7 @@ class TestGitIterations(unittest.TestCase):
self.assertEqual(commits, found_commits) self.assertEqual(commits, found_commits)
def test_remote(self): def test_temp_remote_repo(self):
""" Testing TempRemoteRepo class commit fetch __iter__ method """ """ Testing TempRemoteRepo class commit fetch __iter__ method """
commits = {self.git_commit_mod(author=self.actors[0]).hexsha commits = {self.git_commit_mod(author=self.actors[0]).hexsha
for _ in range(10)} for _ in range(10)}
@ -85,30 +86,35 @@ class TestGitIterations(unittest.TestCase):
self.assertEqual(commits, found_commits) self.assertEqual(commits, found_commits)
def test_remote_cleanup(self): def test_temp_remote_repo_cleanup(self):
""" Testing TempRemoteRepo class cleanup """ """ Testing TempRemoteRepo class cleanup """
self.git_commit_mod(author=self.actors[0])
repo = git_oh.TempRemoteRepo(f"file://{self.repo_path:s}") repo = git_oh.TempRemoteRepo(f"file://{self.repo_path:s}")
tmppath = repo.temppath tmppath = repo.temppath
_ = list(repo)
self.assertTrue(os.path.isdir(tmppath)) self.assertTrue(os.path.isdir(tmppath))
# ref to commits should prevent gc to call repo.__del__()
commits = list(repo)
del repo del repo
gc.collect() # asking gc to call repo.__del__() gc.collect()
self.assertTrue(os.path.isdir(tmppath))
del commits
gc.collect() # not more refs, gc should have called repo.__del__()
self.assertFalse(os.path.isdir(tmppath)) self.assertFalse(os.path.isdir(tmppath))
def test_remote_branches(self): def test_temp_remote_repo_branches(self):
""" Testing TempRemoteRepo branch commit fetch """ """ Testing TempRemoteRepo branch commit fetch """
commits = [] commits = []
for i in range(5): for i in range(5):
commits += [self.git_commit_mod(author=self.actors[0]) commits += [self.git_commit_mod(author=self.actors[0])
for _ in range(10)] for _ in range(10)]
self.repo.git.checkout("HEAD", b=f"branch-{i:d}") self.repo.git.checkout("HEAD", b=f"branch-{i:d}")
repo_url = f"file://{self.repo_path:s}"
repo = git_oh.TempRemoteRepo(f"file://{self.repo_path:s}") repo = git_oh.TempRemoteRepo(repo_url)
found_commits = {commit.hexsha for commit in repo} found_commits = {commit.hexsha for commit in repo}
commits = {commit.hexsha for commit in commits} commits = {commit.hexsha for commit in commits}
@ -132,5 +138,126 @@ class TestGitIterations(unittest.TestCase):
return self.repo.index.commit(msg, **commit_kwargs) return self.repo.index.commit(msg, **commit_kwargs)
class TestTimeUtils(unittest.TestCase):
""" Testing in_office_hours() filter and valid_day() argument validator """
def test_valid_days(self):
""" Test day argument validator/convertion """
for i, day in enumerate(calendar.day_name):
self.assertEqual(i, git_oh.valid_day(day))
for i, day in enumerate(calendar.day_abbr):
self.assertEqual(i, git_oh.valid_day(day))
for i in range(7):
with self.subTest(day=i):
self.assertEqual(i, git_oh.valid_day(str(i)))
def test_office_hours_weekend(self):
""" Test in_office_hours filtering weekend """
weekends = ([5,6], [0], [], [1,2,3,4,5,6])
daydelta = datetime.timedelta(days=1)
for weekend in weekends:
test_dt = datetime.datetime(1988,12,13,12,0,0)
for _ in range(14):
with self.subTest(weekend=weekend, moment=test_dt):
res = git_oh.in_office_hours(test_dt,
starthour=datetime.time(8,0,0),
stophour=datetime.time(20,0,0),
weekend=weekend)
expected = test_dt.weekday() not in weekend
self.assertEqual(expected, res)
test_dt -= daydelta
def test_office_hours(self):
""" Test in_office_hours filtering hours """
starthour = datetime.time(8,0)
stophour = datetime.time(20,0)
weekend = (5,6)
off_oh = [(7,59,59),(20,0,1), (20,1), (0,0), (1,0)]
in_oh = [(8,0), (20,0), (12,0)]
for time_arg in off_oh:
moment = datetime.datetime(2023, 11, 1, *time_arg)
self.assertFalse(git_oh.in_office_hours(moment,
starthour, stophour, weekend=weekend))
for time_arg in in_oh:
moment = datetime.datetime(2023, 11, 1, *time_arg)
self.assertTrue(git_oh.in_office_hours(moment,
starthour, stophour, weekend=weekend))
def test_office_hours_tz_drop(self):
""" Checks that in_office_hours do not use tzinfo to compare
moment with start & stop hours
"""
moment = datetime.datetime.fromisoformat("2023-11-01T08:00:01+0200")
self.assertTrue(git_oh.in_office_hours(
moment,
datetime.time(8,0),
datetime.time(8,1),
weekend=[]))
moment = moment.astimezone(datetime.timezone.utc)
self.assertFalse(git_oh.in_office_hours(
moment,
datetime.time(8,0),
datetime.time(8,1),
weekend=[]))
def test_office_hours_invalid_weekend(self):
""" Test in_office_hours weekend validation """
bad_weekend = ([-1,0,1], [5,6,7], [7])
for badarg in bad_weekend:
with self.assertRaises(ValueError):
git_oh.in_office_hours(datetime.datetime.now(),
weekend=badarg)
def test_office_hours_tz_warn(self):
""" in_office_hours should warn when tz aware times are given """
utc_tz = datetime.timezone.utc
some_tz = datetime.timezone(datetime.timedelta(hours=2))
tz_aware = [
(datetime.time(1,2,tzinfo=utc_tz), datetime.time(0,0)),
(datetime.time(3,4,tzinfo=some_tz), datetime.time(1,0)),
(datetime.time(4,5), datetime.time(2,0, tzinfo=utc_tz)),
(datetime.time(4,5), datetime.time(2,0, tzinfo=some_tz)),
(datetime.time(1,2,tzinfo=utc_tz),
datetime.time(0,0, tzinfo=some_tz)),
]
for bad_args in tz_aware:
with self.assertWarns(Warning):
git_oh.in_office_hours(datetime.datetime.now(), *bad_args)
class TestCli(unittest.TestCase):
""" Testing CLI arguments parsing """
def test_weekend_parser(self):
""" Testing weekend parser """
for _ in range(10):
weekend = random.choices(list(range(7)), k=random.randint(1,5))
weekend_arg = " , ".join([calendar.day_abbr[i] for i in weekend])
with self.subTest(weekend=f"-w '{weekend_arg}'"):
try:
cliargs = ["file://fake_url", "-w", weekend_arg]
args = git_oh.parse_args(cliargs)
self.assertEqual(set(weekend), set(args.weekend))
except SystemExit:
self.fail(msg="Parser fails to parse --weekend argument")
def test_nul_weekend_parser(self):
""" Testing -w NUl argument support """
try:
args = git_oh.parse_args(["file://fake_url", "-w", "NUL"])
except SystemExit:
self.fail(msg="Parser fails to parse -w NUL")
self.assertEqual(len(args.weekend), 0)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()