"""md_planning main module."""
from datetime import date, datetime, timedelta
import pathlib
import re
import shutil
from typing import Any, Tuple, Union, Dict, List
from functools import singledispatch
from copy import deepcopy
import networkx as nx
from gantt import Resource, Task, Project, Milestone
import gantt
from criticalpath import Node
import yaml
from dateutil.parser import parse as parsedate, ParserError
import pertchart
import pint
import pandas as pd
from math import ceil
from contextlib import contextmanager, redirect_stderr, redirect_stdout
from os import devnull
# import debugpy
# debugpy.listen(5678)
# debugpy.wait_for_client()
# debugpy.breakpoint()
##### Work Units #####
# Tailored pint unit registry (overridable) for project management.
UREG = pint.UnitRegistry()

# Each entry is (pint definition, names that must resolve once it is defined).
# The definitions are applied in order because every work unit is expressed
# in terms of the one defined just before it.
_WORK_UNIT_DEFINITIONS = (
    # "unit": dimensionless unit to define things that come as units
    ("unit = [dimensionless] = _ = u", ()),
    # "batch": alias of "unit" to define things that come in a group of units
    ("@alias unit = batch = b", ()),
    ("workhour = 1 * hour = _ = wh = workhours", ("workhour", "wh")),
    # average number of working hours during the day
    ("workday = 8 * workhour = _ = wd = workdays", ("workday", "wd")),
    # average number of working days during a week
    ("workweek = 5 workday = _ = ww = workweeks", ("workweek", "ww")),
    # average number of working weeks in a month
    ("workmonth = 4.33 workweek = _ = wm = workmonths", ("workmonth", "wm")),
    ("workquarter = 3 * workmonth = _ = wq = workquarters", ("workquarter", "wq")),
)
for _definition, _expected_names in _WORK_UNIT_DEFINITIONS:
    UREG.define(_definition)
    # sanity check: both the long name and the symbol must be registered
    for _name in _expected_names:
        assert _name in UREG
# UREG.define("holiday = 6 * workweek = _ = holidays")
# assert "holiday" in UREG
# UREG.define("workyear = 52 * workweek; offset: -1 * holiday = _ = wy = workyears")
# assert "workyear" in UREG
# assert "wy" in UREG
@contextmanager
def suppress_stdout_stderr():
    """
    suppress_stdout_stderr silences stdout and stderr inside the ``with`` block.

    Both streams are redirected to the null device, which is closed again when
    the block exits.

    :return: yields the ``(err, out)`` pair produced by the two redirections
    :rtype: tuple
    """
    with open(devnull, "w") as sink:
        with redirect_stderr(sink) as err:
            with redirect_stdout(sink) as out:
                yield (err, out)
################################################################################
# Patching Resource class
################################################################################
def _resource_get_registry(self) -> pint.UnitRegistry:
    """Return the module level unit registry (``UREG``)."""
    registry = UREG
    return registry


# monkey patch: exposed as Resource.get_registry()
Resource.get_registry = _resource_get_registry
def _resource_get_unit(self):
    """Return the ``"unit"`` entry of the resource's ``price`` mapping."""
    price_definition = self.price
    return price_definition["unit"]


# monkey patch: exposed as Resource.get_unit()
Resource.get_unit = _resource_get_unit
def _resource_get_price(self):
    """Return the ``"value"`` entry of the resource's ``price`` mapping as a float."""
    raw_value = self.price["value"]
    return float(raw_value)


# monkey patch: exposed as Resource.get_price()
Resource.get_price = _resource_get_price
def _resource_convert(
    self, unit: str = "workday", number_of_days: Union[int, float, None] = None
) -> float:
    """
    convert finds the price of a resource per ``unit`` of work.

    Resources priced per "unit" or per "batch" have no time dimension of their
    own: their price is spread over ``number_of_days`` workdays first. Any
    other price unit is used directly as a time based rate.

    :param unit: the granularity of the analysis, defaults to "workday"
    :type unit: str, optional
    :param number_of_days: the total length of the associated task; required when the price of the resource is independent from time ("unit" or "batch"), defaults to None
    :type number_of_days: Union[int, float, None], optional
    :raises ValueError: if the resource is priced per unit/batch and number_of_days is not given
    :return: the price in the user's currency unit
    :rtype: float
    """
    registry = self.get_registry()
    price_unit = self.get_unit()
    price = self.get_price()
    if price_unit in ("unit", "batch"):
        if number_of_days is None:
            # without this guard the expression below would contain the text
            # "None" and fail with an unrelated unit parsing error
            raise ValueError(
                f"Resource {self.name} is priced per {price_unit}: "
                "number_of_days is required to convert its price"
            )
        if price_unit == "unit":
            expression = f"{price} / {number_of_days} / workday"
        else:
            expression = (
                f"{price} / {self.price['batch_size']} / {number_of_days} / workday"
            )
    else:
        expression = f" {price} / {price_unit}"
    # the registry is callable: it parses the expression into a quantity
    quantity = registry(expression)
    return quantity.to(f"1 / {unit}").m


# monkey patch: exposed as Resource.convert()
Resource.convert = _resource_convert
################################################################################
# Patching the Task class
################################################################################
def _is_using(self, resource: str) -> bool:
    """Return True when one of the task's resources is named ``resource``."""
    return any(assigned.name == resource for assigned in self.resources)


# monkey patch: exposed as Task.is_using()
Task.is_using = _is_using
def _is_active(self, date: Union[str, date, datetime]) -> bool:
    """
    is_active tells whether the task is running on the given day.

    :param date: day to test; strings are parsed with dateutil and datetime values are reduced to their date
    :type date: Union[str, date, datetime]
    :raises ValueError: if a string cannot be parsed as a date
    :return: True when start_date() <= day <= end_date()
    :rtype: bool
    """
    # NOTE: the parameter shadows the imported ``date`` class inside this body
    day = date
    if isinstance(day, str):
        try:
            day = parsedate(day).date()
        except ParserError as err:
            raise ValueError(f"Unrecognised Date: {date}") from err
    elif isinstance(day, datetime):
        # a datetime cannot be ordered against plain dates
        day = day.date()
    return self.start_date() <= day <= self.end_date()


# monkey patch: exposed as Task.is_active()
Task.is_active = _is_active
################################################################################
# Utility functions
################################################################################
def _normalize_resources(task: dict) -> dict:
    """
    _normalize_resources returns a copy of the task with "resources" as a list.

    A comma separated string becomes a list of stripped entries; entries that
    are not already of the form "<number> <name>" are prefixed with "1 ".
    A ``None`` value becomes an empty list. Any other value is left untouched.

    :param task: task definition
    :type task: dict
    :return: normalized deep copy of the task
    :rtype: dict
    """
    normalized = deepcopy(task)
    raw = normalized.get("resources")
    if isinstance(raw, str):
        quantified = re.compile(r"^(\d+\.?\d*)\s+(\w+)$")
        entries = [entry.strip() for entry in raw.split(",")]
        normalized["resources"] = [
            entry if quantified.match(entry) else f"1 {entry}" for entry in entries
        ]
    elif raw is None and "resources" in normalized:
        normalized["resources"] = []
    return normalized
def _normalize_duration(task: dict) -> dict:
    """
    _normalize_duration returns a copy of the task with duration and estimates.

    When "duration" is set, "best", "optimal" and "worst" are all set to it.
    Otherwise "duration" is computed as (best + 4 * optimal + worst) / 6,
    rounded to 3 decimals.

    :param task: task definition
    :type task: dict
    :raises KeyError: if "duration" is unset and an estimate is missing
    :return: normalized deep copy of the task
    :rtype: dict
    """
    normalized = deepcopy(task)
    fixed = normalized.get("duration")
    if fixed is not None:
        # no variability in the duration
        for estimate in ("best", "optimal", "worst"):
            normalized[estimate] = fixed
        return normalized
    weighted = normalized["best"] + 4 * normalized["optimal"] + normalized["worst"]
    normalized["duration"] = round(weighted / 6, 3)
    return normalized
def _normalize_depends_on(task: dict) -> dict:
    """
    _normalize_depends_on returns a copy of the task with "depends_on" as a list.

    A comma separated string becomes a list of stripped names and ``None``
    becomes an empty list. A missing key or any other value is left untouched.

    :param task: task definition
    :type task: dict
    :return: normalized deep copy of the task
    :rtype: dict
    """
    normalized = deepcopy(task)
    if "depends_on" not in normalized:
        return normalized
    parents = normalized["depends_on"]
    if parents is None:
        normalized["depends_on"] = []
    elif isinstance(parents, str):
        normalized["depends_on"] = [name.strip() for name in parents.split(",")]
    return normalized
def _normalize_list_task(task: Union[list, dict]) -> dict:
    """
    _normalize_list_task turns a positional (list) task into a dict task.

    List items are mapped, in order, to start, duration, percent_done,
    resources, depends_on, color, best, optimal and worst; missing trailing
    items take their default and the result is tagged ``"type": "task"``.
    A dict is returned as is (same object).

    :param task: task definition
    :type task: Union[list, dict]
    :raises ValueError: if the task is neither a list nor a dict
    :return: dict defined task
    :rtype: dict
    """
    if isinstance(task, dict):
        return task
    if not isinstance(task, list):
        raise ValueError(f"Unknown task definition {task}")
    ordered_defaults = (
        ("start", None),
        ("duration", None),
        ("percent_done", 0),
        ("resources", []),
        ("depends_on", []),
        ("color", None),
        ("best", None),
        ("optimal", None),
        ("worst", None),
    )
    normalized = {"type": "task"}
    for position, (field, fallback) in enumerate(ordered_defaults):
        normalized[field] = task[position] if position < len(task) else fallback
    return normalized
@singledispatch
def normalize_task(task: Any):
    """
    normalize_task changes a list defined task to a dict defined task.

    Dict defined tasks are returned unchanged.

    :param task: task defined as a list or a dict
    :type task: Union[list, dict]
    :raises NotImplementedError: if the task type is not a recognised format
    """
    raise NotImplementedError


@normalize_task.register(dict)
def _(task: dict):
    # already in the target format
    return task


@normalize_task.register(list)
def _(task: list):
    # positional layout of a list defined task, with the default of each field
    ordered_defaults = [
        ("start", None),
        ("duration", None),
        ("percent_done", 0),
        ("resources", []),
        ("depends_on", []),
        ("color", None),
        ("best", None),
        ("optimal", None),
        ("worst", None),
        ("is_milestone", False),
    ]
    return {
        field: (task[position] if position < len(task) else fallback)
        for position, (field, fallback) in enumerate(ordered_defaults)
    }
@singledispatch
def get_duration(task: Any):
    """
    get_duration makes the duration of a task primarily from the estimated task durations and if not available, the actual duration data.

    With estimates the result is (best + 4 * optimal + worst) / 6; in both
    cases the value is rounded to 3 decimals.

    :param task: the task definition
    :type task: Union[dict, list]
    :raises NotImplementedError: types other than dict and list will raise error
    :raises ValueError: if a dict task defines neither the estimates nor "duration"
    """
    raise NotImplementedError


@get_duration.register(dict)
def _(task: dict):
    estimates = [task.get(key) for key in ("best", "optimal", "worst")]
    if all(value is not None for value in estimates):  # \BUGFIX 406D86
        best, optimal, worst = estimates
        return round((best + 4 * optimal + worst) / 6, 3)
    try:
        return round(task["duration"], 3)
    except KeyError as err:
        raise ValueError(
            "Task must define either ('best', 'optimal', 'worst') or 'duration'."
        ) from err


@get_duration.register(list)
def _(task: list):
    # positional layout: start, duration, percent_done, resources, depends_on,
    # color, best, optimal, worst[, is_milestone]
    estimates = task[6:9]
    # Same rule as the dict form: the estimates win when all three are set.
    # Testing the slice rather than ``len(task) == 9`` also covers lists that
    # carry the trailing is_milestone item and lists whose estimates are None.
    if len(estimates) == 3 and all(value is not None for value in estimates):
        best, optimal, worst = estimates
        return round((best + 4 * optimal + worst) / 6, 3)
    return round(task[1], 3)
@singledispatch
def _get_resources(res: Any):
    """
    _get_resources helper function that returns resources in a project compatible format.

    :param res: resource
    :type res: Union[str, list, None]
    :raises NotImplementedError: if the data is not in a recognised format
    :return: the list of resources used by the task
    :rtype: List[str]
    """
    if res is not None:
        raise NotImplementedError
    return []


@_get_resources.register(str)
def _(res: str):
    # "a, b" -> ["a", "b"]; a string without comma gives a one item list
    return [name.strip() for name in res.split(",")]


@_get_resources.register(list)
def _(res: list):
    # lists are passed through untouched (same object)
    return res
@singledispatch
def get_resources(task: Any):
    """
    get_resources is a helper function that returns the resources used in a task in a project compatible format.

    Works on dict defined tasks and list defined tasks. A task that omits its
    resources (missing key, or a list too short to hold them) uses none.

    :param task: the task description
    :type task: Union[dict, list]
    :raises NotImplementedError: if the task format is not recognised.
    :return: list of resource names
    :rtype: List[str]
    """
    raise NotImplementedError


@get_resources.register(dict)
def _(task: dict):
    # a missing key is handled like an explicit None: no resources
    return _get_resources(task.get("resources"))


@get_resources.register(list)
def _(task: list):
    # resources sit at position 3 of a list defined task
    return _get_resources(task[3] if len(task) > 3 else None)
@singledispatch
def _get_dependencies(deps: Any):
    """
    _get_dependencies helper function that returns dependencies as a list of names.

    :param deps: dependencies
    :type deps: Union[str, list, None]
    :raises NotImplementedError: if the data is not in a recognised format
    :return: the list of task names the task depends on
    :rtype: List[str]
    """
    if deps is None:
        return []
    raise NotImplementedError


@_get_dependencies.register(str)
def _(deps: str):
    # Every name is stripped, including a single name without comma, so that
    # " a " and "a" designate the same task.
    return [name.strip() for name in deps.split(",")]


@_get_dependencies.register(list)
def _(deps: list):
    # lists are passed through untouched (same object)
    return deps
@singledispatch
def get_dependencies(task: Any):
    """
    get_dependencies get the dependencies from a task definition.

    Works on dict and list defined tasks. A task that omits its dependencies
    (missing key, or a list too short to hold them) has none.

    :param task: the task definition
    :type task: Union[dict, list]
    :raises NotImplementedError: if the task definition is not recognised
    :return: list of task names
    :rtype: List[str]
    """
    raise NotImplementedError


@get_dependencies.register(dict)
def _(task: dict):
    # a missing key is handled like an explicit None: no dependencies
    return _get_dependencies(task.get("depends_on"))


@get_dependencies.register(list)
def _(task: list):
    # dependencies sit at position 4 of a list defined task
    return _get_dependencies(task[4] if len(task) > 4 else None)
def rename(
    dict_in: dict,
    key_from: str,
    key_to: str,
    flexible: bool = False,
    flex_val: Any = None,
) -> dict:
    """
    Forgiving function to rename keys in a dictionary.

    The input dictionary is never modified: a deep copy is returned.

    :param dict_in: the input dictionary
    :type dict_in: dict
    :param key_from: the key to change
    :type key_from: str
    :param key_to: the key to rename to
    :type key_to: str
    :param flexible: when False a missing key_from raises KeyError; when True key_to is assigned flex_val instead, defaults to False
    :type flexible: bool, optional
    :param flex_val: value given to key_to when key_from is missing and flexible is True, defaults to None
    :type flex_val: Any, optional
    :raises KeyError: if key_from is missing and flexible is False
    :return: renamed deep copy of the dictionary
    :rtype: dict
    """
    if not flexible and key_from not in dict_in:
        raise KeyError(f"{key_from} not in input dictionary")
    data = deepcopy(dict_in)
    # Remove the old key before writing the new one: assigning first and
    # deleting afterwards would drop the entry when key_from == key_to.
    data[key_to] = data.pop(key_from, flex_val)
    return data
@singledispatch
def get_task_type(item: Any) -> Union[str, None]:
    """
    get_task_type returns the name of the task type: milestone or task.

    :param item: item to be examined
    :type item: Any
    :return: "task" or "milestone" for dict and list items, None for anything else
    :rtype: Union[str, None]
    """
    return None


@get_task_type.register(dict)
def _(item: Dict):
    # flat list of tasks: the type is declared explicitly
    declared = item.get("type")
    if declared in ("task", "milestone"):
        return declared
    # undeclared: "percent_done" marks a task, anything else is read as a
    # milestone (nested list of milestones and tasks)
    return "task" if "percent_done" in item else "milestone"


@get_task_type.register(list)
def _(item: List):
    # list definitions always describe a task
    return "task"
def is_nested(tasks: dict) -> bool:
    """
    is_nested recognizes the project format to determine if the tasks definition is in the flat or the nested format.

    The definition is nested as soon as one top level entry that is not a task
    holds a value that is itself a task or a milestone.

    :param tasks: full project task definition
    :type tasks: dict
    :return: is it a nested definition T/F
    :rtype: bool
    :BUGFIX: E17FB6 and 39651C
    """
    for entry in tasks.values():
        if get_task_type(entry) == "task":
            continue
        # not a task: look one level down for a task or a milestone
        if any(
            get_task_type(child) in ("task", "milestone") for child in entry.values()
        ):
            return True
    return False
def _walk(path, child):
    """
    _walk yields ``[path, definition]`` pairs while descending a task tree.

    Dicts recognised as tasks are tagged ``is_milestone = False`` in place and
    yielded. For any other dict every item is visited with its key appended to
    the path; items recognised as milestones first yield a fresh
    ``{"is_milestone": True}`` marker. Non dict values are yielded unchanged.

    :param path: keys leading to ``child``
    :type path: list
    :param child: value to examine
    :type child: Any
    """
    if not isinstance(child, dict):
        yield [tuple(path), child]
        return
    if get_task_type(child) == "task":
        child["is_milestone"] = False
        yield [tuple(path), child]
        return
    for key, value in child.items():
        branch = path + [key]
        if get_task_type(value) == "milestone":
            yield [tuple(branch), {"is_milestone": True}]
        yield from _walk(branch, value)
def walk_project_tasks(tasks: dict):
    """
    walk_project returns normalized tasks in a nested or flat project configuration

    In a flat definition the items of ``tasks`` are returned as is. In a
    nested definition the tree is walked and each milestone is given a
    duration of 0 and a dependency on every entry found exactly one level
    below it whose parent key equals the milestone's own name.

    :param tasks: project tasks
    :type tasks: dict
    :return: list of task_name, task_definition
    :rtype: List[Tuple(str, dict)]
    """
    if not is_nested(tasks):
        return list(tasks.items())
    entries = [
        (path, normalize_task(definition)) for path, definition in _walk([], tasks)
    ]
    for path, definition in entries:
        if not definition["is_milestone"]:
            continue
        milestone_name = path[-1]
        child_depth = len(path) + 1
        # Direct children are matched on depth and on the name of their parent
        # key only, not on the full path.
        definition["depends_on"] = [
            child_path[-1]
            for child_path, _ in entries
            if len(child_path) == child_depth and child_path[-2] == milestone_name
        ]
        definition["duration"] = 0
    return [(path[-1], definition) for path, definition in entries]
################################################################################
# Project reader class
################################################################################
class ProjectReader:
    """
    Utility for reading raw project data.

    Reads yaml data and returns a representation of the projects
    suitable for a PERT and gantt analysis.
    """

    def __init__(self, projstr: str, critical_color: Union[str, None] = None):
        """
        Read and setup project data.

        :param projstr: string representation of the project in yaml syntax
        :type projstr: str
        :param critical_color: string/name/hex representation of the color for the tasks on the critical path that do not define a color themselves, defaults to None
        :type critical_color: Union[str, None], optional
        :raises ValueError: if a project has no name or a task has unusable duration/estimates
        """
        self.projstr = projstr
        self.data = yaml.safe_load(projstr)
        self.font = self.data.get("Font", {})
        self.vacations = self.data.get("Vacations", [])
        self.resources = self.data.get("Resources", {})
        self.projects = {}
        self.tasks = {}
        self.pertcharts = {}
        for project in self.data.get("Projects", []):  # type: ignore
            proj_name = project.get("Name")
            if not proj_name:
                raise ValueError("Missing project name")
            self.projects[proj_name] = None
            tasks: dict = project.get("Tasks")
            for name, task in walk_project_tasks(tasks):
                self.tasks[name] = self._build_payload(proj_name, name, task)
            ##### State: Ready for post-processing #####
            # CPM for PertChart, restricted to the tasks of the current project
            critical_path = self._get_critical_path(
                {
                    k: t["cpm"]
                    for (k, t) in self.tasks.items()
                    if t["project"] == proj_name
                }
            )
            for task_name in critical_path:
                critical = self.tasks[task_name]
                critical["pertchart"]["responsible"] = "CRITICAL"
                # an explicit task color always wins over the critical color
                if critical["task"]["color"] is None:
                    critical["task"]["color"] = critical_color

    @staticmethod
    def _build_payload(proj_name: str, name: str, task: Union[list, dict]) -> dict:
        """
        _build_payload normalizes one task and derives its gantt, pert and cpm views.

        :param proj_name: name of the project owning the task
        :type proj_name: str
        :param name: task name
        :type name: str
        :param task: raw task definition
        :type task: Union[list, dict]
        :raises ValueError: if the duration or the estimates cannot be used
        :return: payload with "project", "task", "milestone", "pertchart", "cpm" and "is_milestone" entries
        :rtype: dict
        """
        ##### SETUP #####
        payload = {
            "project": proj_name,
            "task": {
                "name": name,
                "start": None,
                "stop": None,
                "duration": None,
                "depends_of": None,
                "resources": None,
                "percent_done": 0,
                "color": None,
                "fullname": None,
                "display": True,
                "state": "",
            },
            "milestone": {
                "name": name,
                "start": None,
                "depends_of": None,
                "color": None,
                "fullname": None,
                "display": True,
            },
            "pertchart": {},
            "cpm": {
                "name": name,
                "duration": None,
                "lag": 0,
                "depends_of": None,
            },
            "is_milestone": False,
        }
        ##### TRANSFORM #####
        task = _normalize_list_task(task)
        if task.get("is_milestone", False) or task.get("type", "task") == "task":
            try:
                task = _normalize_duration(task)
            except (TypeError, KeyError) as err:
                raise ValueError(
                    f"Definition error for duration or estimates in task named {name}"
                ) from err
        else:
            # entries typed as something other than "task" take no time
            task["duration"] = 0
        task = _normalize_depends_on(task)
        task = _normalize_resources(task)
        ##### UPDATE #####
        ##### CPM #####
        payload["cpm"]["name"] = name
        payload["cpm"]["duration"] = task["duration"]
        payload["cpm"]["lag"] = 0
        payload["cpm"]["depends_of"] = task.get("depends_on", None)
        ##### PERTCHART #####
        # tasks without predecessor hang from the "START" entry
        payload["pertchart"] = {
            "Tid": name,
            "start": 0,
            "duration": task["duration"],
            "end": 0,
            "responsible": "",
            "pred": task.get("depends_on", ["START"]) or ["START"],
        }
        ##### TASK #####
        for k in payload["task"]:
            payload["task"][k] = task.get(k, payload["task"][k])
        payload["task"]["depends_of"] = task.get("depends_on", None)
        ##### MILESTONE #####
        if task.get("is_milestone", False) or task.get("type", "task") == "milestone":
            payload["is_milestone"] = True
            for k in payload["milestone"]:
                payload["milestone"][k] = task.get(k, payload["milestone"][k])
            payload["milestone"]["depends_of"] = task.get("depends_on", [])
        return payload

    def _get_critical_path(self, cpm_data: dict) -> tuple:
        """
        _get_critical_path returns tuple of task (node) names on the critical path.

        :param cpm_data: cpm data as provided by the initialization
        :type cpm_data: dict
        :raises KeyError: if a task depends on a name missing from cpm_data
        :return: tuple of task names
        :rtype: tuple
        """
        proj = Node("project")
        nodes = {}
        for k, values in cpm_data.items():  # add nodes
            nodes[k] = Node(
                values["name"], duration=values["duration"], lag=values["lag"]
            )
            proj.add(nodes[k])
        for k, values in cpm_data.items():  # add links
            # "depends_of" is None for dict tasks defined without "depends_on":
            # such tasks simply have no incoming link
            for link in values["depends_of"] or []:
                proj.link(nodes[link], nodes[k])
        proj.update_all()
        return tuple(map(str, proj.get_critical_path()))  # type: ignore

    def build_project(self) -> tuple:
        """
        build_project returns a tuple with gantt project data and pertchart data.

        :return: pos 0 gantt data, pos 1 pert chart data
        :rtype: tuple
        """
        project_data = {
            "font": self.font,
            "vacations": self.vacations,
            "resources": self.resources,
            "projects": self.projects,
            "tasks": self.tasks,
        }
        # pert data is grouped per project, then keyed by task id
        pertchart_data = {
            k: {
                t["pertchart"]["Tid"]: t["pertchart"]
                for t in self.tasks.values()
                if t["project"] == k
            }
            for k in self.projects
        }
        return (project_data, pertchart_data)
################################################################################
# Pert drawer class
################################################################################
class PertDrawer:
    """
    PertDrawer draws the PERT chart of the tasks of one or several projects.

    The charts are produced by the ``pertchart`` package from the pert
    compatible task definitions given at initialization.
    """

    def __init__(self, tasks: dict):
        """
        Class initialization.

        :param tasks: pert compatible tasks definitions
        :type tasks: dict
        """
        self.tasks: dict = tasks

    def draw(
        self, project: Union[str, None] = None, out: Union[str, None] = None
    ) -> None:
        """
        draw Draws the network of tasks in self.tasks.

        :param project: name of the project to draw; when None every project in self.tasks is drawn in turn, defaults to None
        :type project: Union[str, None], optional
        :param out: prefix name of the files to output, will default to the name of the project to output "project_pert.pdf" and "project_pert.gv" (graphviz file)
        :type out: Union[str, None], optional
        """
        # XXX: this is not of the same type as the definition above...
        selected = list(self.tasks) if project is None else [project]
        chart = pertchart.PertChart()
        for proj in selected:
            values = chart.calculate_values(self.tasks[proj])
            # see bugfix 0C501D
            with suppress_stdout_stderr():
                chart.create_pert_chart(values)
            # see bugfix A14E36
            prefix = out if out else proj
            for produced, suffix in (
                ("PERT.gv.pdf", "_pert.pdf"),
                ("PERT.gv", "_pert.gv"),
            ):
                if pathlib.Path(produced).is_file():
                    shutil.move(produced, f"{prefix}{suffix}")

    # XXX: (refactor) not evidently useful usecase
    def get_critical_path(self, project: str):
        """
        get_critical_path returns the critical path of tasks in a specific project.

        :param project: name of the project
        :type project: str
        :raises NotImplementedError: always, the method is not implemented
        """
        raise NotImplementedError
################################################################################
# Gantt drawer class
################################################################################
[docs]class GanttDrawer:
"""
Gantt project representation from a yaml file definition.
see tests for project definition structure and class usage
"""
def __init__(self, data):
    """
    Class initialization.

    :param data: definition of the project as output by ProjectReader class
    :type data: dict
    """
    self.data = data
    self.registry = UREG
    # containers filled by the _register_* steps below
    self.projects: Dict[str, Project] = {}
    self.resources: Dict[str, Resource] = {}
    self.tasks: Dict[str, Task] = {}
    self.vacations = []
    # self._register_font_attribute() #svgwrite issue
    # order matters: tasks are registered last, after projects and resources
    for register in (
        self._register_projects,
        self._register_resources,
        self._register_vacations,
        self._register_tasks,
    ):
        register()
def _register_font_attribute(self):
    """
    _register_font_attributes changes the default presentation settings.

    NOTE(review): assumes the keys of the "font" mapping are the keyword
    names accepted by gantt.define_font_attributes -- confirm against the
    project file format.
    """
    # The gantt API takes keyword arguments. Passing the mapping positionally
    # stored the whole dict as the first attribute.
    gantt.define_font_attributes(**self.data["font"])
def _register_projects(self) -> None:
    """_register_projects creates one gantt Project per project name in the data."""
    registered = {}
    for project_name in self.data["projects"]:
        registered[project_name] = Project(project_name)
    self.projects = registered
def _register_resources(self) -> None:
    """
    _register_resources creates the gantt resources with their price and vacations.

    The price defaults to ``{"value": 0, "unit": "unit"}``. A "batch" price
    must define "batch_size", and a resource priced per unit or per batch
    cannot have vacations.

    :raises ValueError: if the definition of a resource is invalid
    """
    self.resources = {}
    definitions = self.data["resources"]
    for resource in definitions:
        try:
            definition = definitions[resource]
            res = Resource(resource)
            res.price = definition.get("price", {"value": 0, "unit": "unit"})
            price_unit = res.price["unit"]
            if price_unit == "batch" and "batch_size" not in res.price:
                raise ValueError(
                    f"Error in Resource {res.name}: batch resource definition missing key 'batch_size'"
                )
            vacations = definition.get("vacations")
            if vacations is not None:
                res.add_vacations(*vacations)
            if price_unit in ("unit", "batch") and res.vacations:
                raise ValueError(
                    f"Resource:{res.name} cannot have unit/batch unit type and holidays at the same time"
                )
        except Exception as err:
            # every failure is reported against the resource being defined
            raise ValueError(f"Definition error in {resource}") from err
        self.resources[resource] = res
def _register_vacations(self) -> None:
    """_register_vacations adds vacations at a multiproject level."""
    vacations = self.data.get("vacations")
    if vacations is None:
        return
    for day in vacations:
        gantt.add_vacations(day)
def _register_tasks(self) -> None:
    """
    _register_tasks creates the network of tasks to register for the projects.

    For each project the tasks are created in topological order of their
    dependencies, so that every predecessor exists before the tasks that
    depend on it. Task names must be unique so it is safe to refer to them
    verbatim.

    :raises ValueError: if a task depends on a name that is not a task of the same project
    """
    tasks = []
    for t in self.data["tasks"].values():
        definition = t["milestone"] if t["is_milestone"] else t["task"]
        tasks.append(
            {
                "project": t["project"],
                "task_name": definition["name"] or t["task"]["name"],
                "task": definition,
                "is_milestone": t["is_milestone"],
                # None (no "depends_on" in the source) means no predecessor
                "depends_of": definition["depends_of"]
                or t["task"]["depends_of"]
                or [],
            }
        )
    for proj_name, project in self.projects.items():
        filtered_tasks = [t for t in tasks if t["project"] == proj_name]
        by_name = {}
        for entry in filtered_tasks:
            by_name.setdefault(entry["task_name"], entry)
        linked_tasks = [
            (pred, entry["task_name"])
            for entry in filtered_tasks
            for pred in entry["depends_of"]
        ]
        graph = nx.DiGraph(linked_tasks)
        # A graph built from edges only misses the tasks that have neither
        # predecessor nor successor; add them so they get registered too.
        graph.add_nodes_from(entry["task_name"] for entry in filtered_tasks)
        task_registry = {}
        for task_name in nx.topological_sort(graph):
            try:
                to_register = by_name[task_name]
            except KeyError as err:
                raise ValueError(
                    f"Unknown task {task_name} used as a dependency in project {proj_name}"
                ) from err
            dependents = [task_registry[name] for name in to_register["depends_of"]]
            # NOTE: a resource is attached when its name is a substring of the
            # resource entry of the task ("2 Bob" uses "Bob")
            resources = [
                self.resources[known]
                for res in to_register["task"].get("resources") or []
                for known in self.resources
                if known in res
            ]
            if to_register["is_milestone"]:
                task_registry[task_name] = Milestone(
                    **{**to_register["task"], "depends_of": dependents}
                )
            else:
                task_registry[task_name] = Task(
                    **{
                        **to_register["task"],
                        "depends_of": dependents,
                        "resources": resources,
                    }
                )
            project.add_task(task_registry[task_name])
        self.tasks.update(task_registry)
def draw_tasks(
    self,
    project: str,
    filename: Union[str, None] = None,
    today: Union[date, str, None] = None,
    start: Union[date, str, None] = None,
    end: Union[date, str, None] = None,
    scale: Union[str, None] = None,
) -> None:
    """
    draw_tasks creates a svg visualization of the task execution in the project.

    :param project: project name
    :type project: str
    :param filename: file name override. If None, use the project name, defaults to None
    :type filename: Union[str, None], optional
    :param today: today's date for task burn down evaluation; the string "today" stands for the current day and other strings are parsed as dates, defaults to None
    :type today: Union[date, str, None], optional
    :param start: first day to draw; strings are parsed as dates, defaults to None
    :type start: Union[date, str, None], optional
    :param end: last day to draw; strings are parsed as dates, defaults to None
    :type end: Union[date, str, None], optional
    :param scale: one of "d"/"daily", "w"/"weekly", "m"/"monthly", "q"/"quarterly"; None means daily, defaults to None
    :type scale: Union[str, None], optional
    :raises ValueError: if the scale is not recognised
    """
    if filename is None:
        filename = f"{project}.svg"
    if today == "today":
        today = datetime.today().date()
    if isinstance(today, str):
        today = parsedate(today).date()
    if isinstance(start, str):
        start = parsedate(start).date()
    if isinstance(end, str):
        end = parsedate(end).date()
    scales = {
        "d": gantt.DRAW_WITH_DAILY_SCALE,
        "daily": gantt.DRAW_WITH_DAILY_SCALE,
        "w": gantt.DRAW_WITH_WEEKLY_SCALE,
        "weekly": gantt.DRAW_WITH_WEEKLY_SCALE,
        "m": gantt.DRAW_WITH_MONTHLY_SCALE,
        "monthly": gantt.DRAW_WITH_MONTHLY_SCALE,
        # (sic) the constant name is the one referenced on the gantt module
        "q": gantt.DRAW_WITH_QUATERLY_SCALE,
        "quarterly": gantt.DRAW_WITH_QUATERLY_SCALE,
    }
    try:
        gantt_scale = scales["d" if scale is None else scale]
    except KeyError:
        raise ValueError(
            f"Unknown scale {scale}, expected one of {', '.join(scales)}"
        ) from None
    self.projects[project].make_svg_for_tasks(
        filename=filename, today=today, start=start, end=end, scale=gantt_scale
    )
def draw_resources(
    self,
    project: str,
    filename: Union[str, None] = None,
    today: Union[date, str, None] = None,
    start: Union[date, str, None] = None,
    end: Union[date, str, None] = None,
    resources: Union[List[str], None] = None,
    one_line_for_tasks: bool = False,
    scale: Union[str, None] = None,
) -> None:
    """
    draw_resources creates a svg visualization of the resource use in the project.

    :param project: name of the project
    :type project: str
    :param filename: file name override. If None use the project name, defaults to None
    :type filename: Union[str, None], optional
    :param today: today's date for resource use WIP evaluation; the string "today" stands for the current day and other strings are parsed as dates, defaults to None
    :type today: Union[date, str, None], optional
    :param start: first day to draw; strings are parsed as dates, defaults to None
    :type start: Union[date, str, None], optional
    :param end: last day to draw; strings are parsed as dates, defaults to None
    :type end: Union[date, str, None], optional
    :param resources: names of the registered resources to draw; passed through unchanged when empty or None, defaults to None
    :type resources: Union[List[str], None], optional
    :param one_line_for_tasks: forwarded as is to the gantt drawing call, defaults to False
    :type one_line_for_tasks: bool, optional
    :param scale: one of "d"/"daily", "w"/"weekly", "m"/"monthly", "q"/"quarterly"; None means daily, defaults to None
    :type scale: Union[str, None], optional
    :raises ValueError: if the scale is not recognised
    """
    if filename is None:
        filename = f"{project}_resources.svg"
    if today == "today":
        today = datetime.today().date()
    if isinstance(today, str):
        today = parsedate(today).date()
    if isinstance(start, str):
        start = parsedate(start).date()
    if isinstance(end, str):
        end = parsedate(end).date()
    selected = resources
    if resources:
        # names are replaced by the registered resource objects
        selected = [self.resources[res] for res in resources]
    scales = {
        "d": gantt.DRAW_WITH_DAILY_SCALE,
        "daily": gantt.DRAW_WITH_DAILY_SCALE,
        "w": gantt.DRAW_WITH_WEEKLY_SCALE,
        "weekly": gantt.DRAW_WITH_WEEKLY_SCALE,
        "m": gantt.DRAW_WITH_MONTHLY_SCALE,
        "monthly": gantt.DRAW_WITH_MONTHLY_SCALE,
        # (sic) the constant name is the one referenced on the gantt module
        "q": gantt.DRAW_WITH_QUATERLY_SCALE,
        "quarterly": gantt.DRAW_WITH_QUATERLY_SCALE,
    }
    try:
        gantt_scale = scales["d" if scale is None else scale]
    except KeyError:
        raise ValueError(
            f"Unknown scale {scale}, expected one of {', '.join(scales)}"
        ) from None
    self.projects[project].make_svg_for_resources(
        filename=filename,
        today=today,
        start=start,
        end=end,
        resources=selected,
        one_line_for_tasks=one_line_for_tasks,
        scale=gantt_scale,
    )
[docs] def define_unit(self, unit: str) -> None:
"""
define_unit registers a new pint unit in the instance unit registry.
:param unit: unit definition (see pint documentation for syntax)
:type unit: str
"""
self.registry.define(unit)
[docs] def define_alias(self, alias: str) -> None:
"""
define_alias makes a new alias for an existing unit.
useful when using a unit in a multilingual and pluralized context
:param alias: alias definition, can start with "@alias" (full definition) or not (abbreviated syntax) (see pint documentation for more details)
:type alias: str
"""
if alias.strip().startswith("@alias"):
self.registry.define(alias)
else:
self.registry.define(f"@alias {alias}")
[docs] def get_unit(self, resource: str) -> str:
"""
get_unit helper function for a project object to get a resource's units
:param project: name of the project
:type project: str
:param resource: name of the resource
:type resource: str
"""
try:
return self.resources[resource].get_unit()
except KeyError:
raise ValueError(f"Unknown resource {resource}")
[docs] def get_price(self, resource: str) -> float:
"""
get_price returns the price of the specified resource
:param resource: name of the resource
:type resource: str
:return: the price of the resource
:rtype: float
"""
try:
return self.resources[resource].get_price()
except KeyError:
raise ValueError(f"Unknown resource {resource}")
def get_usage(self, task: str, resource: str) -> float:
    """
    get_usage returns the amount of a specified resource that a task uses.

    The task's resource entries are read from ``self.data["tasks"][task]["task"]["resources"]``.
    Each entry is a string whose first whitespace-separated token is the amount and which ends with the resource name.

    :param task: task name
    :type task: str
    :param resource: resource name
    :type resource: str
    :raises ValueError: if the task name is not defined
    :raises ValueError: if the resource name is not defined
    :return: the amount used of specified resource by a task, 0.0 if the task does not list the resource
    :rtype: float
    """
    try:
        task_resources = self.data["tasks"][task]["task"]["resources"]
    except KeyError as err:
        raise ValueError(f"Unknown task {task}") from err
    if resource not in self.resources:
        raise ValueError(f"Unknown resource {resource}")
    for entry in task_resources:
        if not entry.endswith(resource):
            continue
        # Require a whole-name match: the resource name must be preceded by
        # whitespace (or be the entire entry). A bare suffix test would let
        # "dev" match "2 seniordev" and report the wrong amount.
        prefix = entry[: len(entry) - len(resource)]
        if prefix and not prefix[-1].isspace():
            continue
        return float(entry.split()[0])
    return 0.0
def get_bounds(self) -> Tuple[date, date]:
    """
    get_bounds gives the earliest start date and the latest end date over every project of the file definition.

    When no project is defined, both values are None.

    :return: start_date, end_date
    :rtype: Tuple[date, date]
    """
    starts = []
    ends = []
    for project in self.projects.values():
        starts.append(project.start_date())
        ends.append(project.end_date())
    earliest = min(starts, default=None)
    latest = max(ends, default=None)
    return earliest, latest
def is_available(self, resource: str, date: Union[str, date]) -> bool:
    """
    is_available returns True if a specific resource is available to use on a specified date.

    A date outside the bounds returned by ``get_bounds()`` is never available; the same holds when no bounds exist.

    :param resource: resource name
    :type resource: str
    :param date: date at which availability is assessed; strings are parsed with dateutil, datetime objects are truncated to their date
    :type date: Union[str, date]
    :raises ValueError: if the resource name is not defined
    :raises ValueError: if the date string is not recognised by dateutil.parse
    :return: True/False
    :rtype: bool
    """
    start, stop = self.get_bounds()
    try:
        res = self.resources[resource]
    except KeyError as err:
        raise ValueError(f"Resource named {resource} not defined") from err
    # NOTE: the parameter shadows the imported ``date`` class inside this body.
    d = date
    if isinstance(d, str):
        try:
            d = parsedate(date).date()
        except ParserError as err:
            raise ValueError(f"Unrecognised Date: {date}") from err
    elif isinstance(d, datetime):
        # Same truncation as the string path; a datetime cannot be compared
        # with the date bounds below.
        d = d.date()
    if start is None or stop is None:
        # get_bounds() yields (None, None) when no project is defined.
        return False
    if start <= d <= stop:
        return res.is_available(d)
    return False
def is_using(self, task: str, resource: str) -> bool:
    """
    is_using tells, at the project level, whether a given task makes use of a given resource.

    The question is forwarded to the task object registered under ``task`` in ``self.tasks``.

    :param task: task name
    :type task: str
    :param resource: resource name
    :type resource: str
    :return: is task ABC using resource XYZ, True/False
    :rtype: bool
    """
    target = self.tasks[task]
    return target.is_using(resource)
def convert(
    self, resource: str, number_of_days: Union[int, float, None] = None
) -> float:
    """
    convert is a project-level shortcut to the ``convert`` method of a resource, which yields the daily price of a resource with units.

    :param resource: resource name
    :type resource: str
    :param number_of_days: length of the associated task for resources that are time independent, defaults to None
    :type number_of_days: Union[int, float, None], optional
    :return: a price in the user's currency unit
    :rtype: float
    """
    target = self.resources[resource]
    return target.convert(number_of_days=number_of_days)
def is_active(self, task: str, date: Union[str, date]) -> bool:
    """
    is_active is a utility function at the project level that gets the activity status of a task in a project at a given date.

    :param task: task name
    :type task: str
    :param date: the date when the activity is considered; strings are parsed with dateutil, datetime objects are truncated to their date
    :type date: Union[str, date]
    :raises ValueError: if the date is not recognised by dateutil.parse
    :return: the task is active: True/False
    :rtype: bool
    """
    # NOTE: the parameter shadows the imported ``date`` class inside this body.
    d = date
    if isinstance(d, str):
        try:
            d = parsedate(date).date()
        except ParserError as err:
            raise ValueError(f"Unrecognised Date: {date}") from err
    elif isinstance(d, datetime):
        # Treat datetime objects like parsed strings: keep the date only.
        d = d.date()
    return self.tasks[task].is_active(d)
def duration(self, task: str) -> float:
    """
    duration is a project-level shortcut that reads the ``duration`` attribute of a task.

    :param task: task name
    :type task: str
    :return: task duration
    :rtype: float
    """
    target = self.tasks[task]
    return target.duration
def get_cost(
    self, task: str, resource: str, date: Union[str, date]
) -> float:
    """
    get_cost returns the actual cost of a specific resource in a given task, on a given day.

    The cost is the converted resource price multiplied by the amount the task uses, scaled by the
    fractional part of the task length on the day that follows the last whole day, and zeroed when
    the resource is unavailable, unused by the task, or the task is inactive.

    :param task: task name
    :type task: str
    :param resource: resource name
    :type resource: str
    :param date: date of consideration; strings are parsed with dateutil, datetime objects are truncated to their date
    :type date: Union[str, date]
    :raises ValueError: if the date format is not recognised by dateutil.parse
    :return: the cost in the user's currency
    :rtype: float
    """
    # NOTE: the parameter shadows the imported ``date`` class inside this body.
    d = date
    if isinstance(d, str):
        try:
            d = parsedate(date).date()
        except ParserError as err:
            raise ValueError(f"Unrecognised Date: {date}") from err
    elif isinstance(d, datetime):
        # The date arithmetic below cannot mix date and datetime.
        d = d.date()
    mytask = self.tasks[task]
    task_start = mytask.start_date()
    # When no duration is set, fall back to the calendar span in days.
    # Dividing by a one-day timedelta yields a float; float(timedelta) raises TypeError.
    task_length = mytask.duration or (mytask.end_date() - task_start) / timedelta(
        days=1
    )
    task_days, task_decimals = (task_length // 1, task_length % 1)
    duration_left = (task_start + timedelta(days=task_days)) - d
    if duration_left.days > task_days:  # date is before the task starts
        return 0.0
    # Only the day right after the last whole day is charged partially.
    time_scaling = 1 if duration_left.days != 0 else task_decimals
    # The boolean factors zero the cost when the resource is unavailable,
    # not used by the task, or the task is not active on that day.
    return (
        self.convert(resource, number_of_days=ceil(task_length))
        * time_scaling
        * self.get_usage(task, resource)
        * self.is_available(resource, d)
        * self.is_using(task, resource)
        * self.is_active(task, d)
    )
def budget(self) -> pd.DataFrame:
    """
    budget records all the planned expenses.

    For each day within the bounds returned by ``get_bounds()``, each task and each resource of that task,
    the cost given by ``get_cost`` is recorded whenever it is not zero.
    The "category" and "subcategory" columns are filled with empty strings.

    :return: pandas.DataFrame with columns ["project", "task", "resource", "category", "subcategory", "date", "amount"]; empty when there are no bounds
    :rtype: pd.DataFrame
    """
    columns = [
        "project",
        "task",
        "resource",
        "category",
        "subcategory",
        "date",
        "amount",
    ]
    start, stop = self.get_bounds()
    if start is None or stop is None:
        # get_bounds() yields (None, None) when no project is defined.
        return pd.DataFrame([], columns=columns)
    span = (stop - start).days + 1  # the end date is included in the bounds
    # The day list does not depend on the task or resource: build it once.
    days = [start + timedelta(days=offset) for offset in range(span)]
    rows = []
    for proj_name, project in self.projects.items():
        for task in project.tasks:
            # Skip tasks without a ``resources`` attribute and tasks whose
            # attribute is None or empty.
            for resource in getattr(task, "resources", None) or ():
                for day in days:
                    amount = self.get_cost(task.name, resource.name, day)
                    if amount != 0:
                        rows.append(
                            [
                                proj_name,
                                task.name,
                                resource.name,
                                "",
                                "",
                                day,
                                amount,
                            ]
                        )
    return pd.DataFrame(rows, columns=columns)