from pathlib import Path
import sys
import yaml
ROOT_PKG = Path(__file__).parents[1] # Points to install-dir/src/
sys.path.insert(0, str(ROOT_PKG))
from htv.constants import CONF_PATH, DEPENDENCIES, RUNTIME_CONF, DEFAULT_CONF
from collections.abc import Iterable
from datetime import datetime
from typing import TextIO, Any
from tqdm import tqdm
import webbrowser
import subprocess
import pyperclip
import time
import json
import os
import re
# Names exported by a star-import of this module.
# NOTE(review): 'FsTools' is not defined in the part of the file visible here — confirm it is
# defined elsewhere in this module, otherwise a star-import raises AttributeError.
__all__ = [
'CONF',
'FsTools',
'Conf',
'Templater'
]
##### C L A S S E S #####
class Cache:
"""
Implements a cache using a temp file.
Cache contains a list of paths, which is overwritten everytime the method :class:`resources.HtbVault.list_resources` is called.
"""
__route__ = Path('/tmp/.htbtlk.cc')
@staticmethod
def get(index: int = None) -> Path | list[Path] | None:
"""Get cache entries
:param index: If None all the contents of the cache are returned
:return: `Path` or a list of them. None if cache was empty or index provided out of bounds
"""
try: # load last list results (.tmp)
with open(Cache.__route__, 'r') as file:
_cc = [Path(i) for i in json.load(file)]
return _cc if index is None else _cc.pop(index)
except FileNotFoundError:
print(f"[!] No cached results. Use option `list` to reload cache")
return None
@staticmethod
def set(items: list) -> None:
"""Update cache with provided items"""
with open(Cache.__route__, 'w') as file:
json.dump([str(i) for i in items], file)
@staticmethod
def clear() -> None:
"""Clears cache"""
if Cache.__route__.exists():
Cache.__route__.unlink()
class Conf(dict):
    """
    Configuration class. A ``dict`` whose contents are mirrored to the YAML file at ``CONF_PATH``.

    Keys are stored upper-cased. String/Path values containing ``$`` are expanded with
    :func:`os.path.expandvars` and stored as :class:`~pathlib.Path`; the unexpanded text is kept in an
    instance attribute named ``_<key in lowercase>`` so it can be written back to disk unexpanded.

    NOTE(review): because of that naming scheme, keys called ``DEFAULT`` or ``RUNTIME`` holding a ``$`` value
    would overwrite the internal ``_default`` / ``_runtime`` attributes — avoid those key names.
    """

    def __init__(self, runtime: dict, default: dict):
        """Initialize a Configuration instance

        :param runtime: runtime parameters
        :param default: default parameters. If not found in local file, they will be added again
        """
        super().__init__()
        self._default = default
        self._runtime = runtime
        self.load(**self._runtime)

    def _forget_original(self, key: str) -> None:
        """Drop the saved unexpanded text of `key`, if there is one

        :param key: Name of the configuration parameter
        :return: None
        """
        _attr = f'_{key.lower()}'
        # Only instance attributes holding text are "originals"; never touch methods or internal dicts
        if isinstance(vars(self).get(_attr), str):
            del vars(self)[_attr]

    def _save(self) -> None:
        """Save current configuration

        Save current configuration parameters to disk.
        Runtime values (keys starting by `_`) are not dump into local file.
        If a parameter contained environment variables, they are shortened again

        :return: None
        """
        _data = {}
        for k, v in self.items():
            if k.startswith('_'):  # Skip keys starting with '_', they are only used in execution time
                continue
            _original = vars(self).get(f'_{k.lower()}')
            if isinstance(v, int | float):
                _data[k] = v
            elif isinstance(_original, str):  # If vars were expanded, replace them
                _data[k] = _original
            elif isinstance(v, dict | list):
                _data[k] = v
            else:
                _data[k] = str(v)  # cast any non-numeric value to str to avoid serialization problems
        # Serialize before opening the file: opening with 'w' truncates it, so a serialization
        # error must not leave an empty configuration file behind
        _serialized = yaml.dump(_data)
        with open(CONF_PATH, 'w') as file:  # Save changes to disk
            file.write(_serialized)

    def load(self, **kwargs) -> None:
        """Load configuration

        (Re)load configuration from local file. Then the provided kwargs are added.
        If local file does not exist it is created.
        The configured runtime parameters are added.
        Additionally, if any default parameter is missing the configuration is reset to defaults.

        :param kwargs: parameters to be added to the configuration
        :return: None
        """
        try:
            with open(CONF_PATH, 'r') as file:
                _data = yaml.safe_load(file)
            if not isinstance(_data, dict):  # Empty file (None) or not a mapping: nothing usable was saved
                _data = {}
            for k in self._default:
                # Parameters are saved upper-cased, so accept both spellings of a default key
                if k not in _data and k.upper() not in _data:
                    raise KeyError(f"Missing required configuration parameter '{k}'")
        except (FileNotFoundError, KeyError) as e:
            if isinstance(e, KeyError):
                print(f"[!] '{e}'")
            else:
                print(f"[!] Conf file not found")
            self.reset()  # reset to default config
            self.load()  # Load configuration
        else:
            self.update_values(**_data)  # Load saved configuration
        finally:
            self.update_values(**kwargs)  # Add custom parameters

    def update_values(self, **kwargs) -> None:
        """Update configuration parameters

        If a parameter contains environment variables, they are expanded again.
        Changes are saved to disk.

        :param kwargs: Values to be updated in the configuration
        :return: None
        """
        for k, v in kwargs.items():
            _key = k.upper()
            if isinstance(v, str | Path) and '$' in str(v):  # String contains env variables
                self.__setattr__(f'_{k.lower()}', str(v))  # Save original value
                self[_key] = Path(os.path.expandvars(v))  # Expand variables
            else:
                # A genuinely new value supersedes a previously saved unexpanded original; re-assigning
                # the current (expanded) value keeps the original so the file stays unexpanded
                if str(v) != str(self.get(_key)):
                    self._forget_original(_key)
                self[_key] = v
        self._save()

    def remove_values(self, *args) -> None:
        """Remove configuration values

        Remove configuration values then save changes to disk

        :param args: Name(s) of the params to be removed from configuration
        :return: None
        """
        for v in args:
            _key = v.upper()
            if _key in self:
                self.pop(_key)
                self._forget_original(_key)
        self._save()

    def reset(self) -> None:
        """Resets the configuration to defaults parameters

        :return: None
        """
        print(f"[*] Resetting default config...")
        self.clear()
        self.update_values(**self._default)  # Default conf values
        self.update_values(**self._runtime)  # Add runtime parameters
        self._save()
class Git:
    """
    Static class to interact with the repository

    Commands are run with the directory ``CONF['VAULT_DIR']`` as working directory and are passed as
    argument lists (no shell), so user-provided text is never interpreted by a shell.
    """
    _EMAIL_PATTERN = r'.+@\w+\.\w{2,}'

    @staticmethod
    def _prompt_email() -> str:
        """Prompt the user until the input looks like an email address

        :return: The accepted input
        """
        while True:
            email_input = input('>> email: ')
            if re.match(Git._EMAIL_PATTERN, email_input) is not None:
                return email_input

    @staticmethod
    def init() -> None:
        """Initialize Vault repository"""
        subprocess.run(['git', 'init', '--initial-branch=main'], check=True, cwd=CONF['VAULT_DIR'])

    @staticmethod
    def add_ssh(email: str | None = None) -> None:
        """Create SSH keys to authenticate on GitHub

        :param email: Email associated to your GitHub account. If missing or malformed, it is prompted
        """
        _key_name = 'gh'
        _key_path = Path.home() / '.ssh' / _key_name
        if email is None or re.match(Git._EMAIL_PATTERN, email) is None:
            email = Git._prompt_email()  # The validated input is the one used as key comment
        print("[*] Generating keys...")
        subprocess.run(['ssh-keygen', '-f', str(_key_path), '-t', 'ed25519', '-C', email], check=True)
        print("[*] Updating SSH configuration for github.com")
        with open(_key_path.parent / 'config', 'a') as ssh_config:
            ssh_config.write(f"Host github.com\n\tUser git\n\tIdentityFile ~/.ssh/{_key_name}\n")
        print(f"[+] Done\n[*] To complete the setup upload your public key (~/.ssh/{_key_name}.pub) to your GitHub account")
        print(f"[*] Public key: {_key_path.with_name(f'{_key_name}.pub').read_text()}")

    @staticmethod
    def config_git_user() -> None:
        """
        Prompts user to input name and email to be used for commits.
        """
        print('[*] Git Name and email must be configured to push into the repository')
        print('[*] Who is in charge of this vault?')
        name_input = ''
        while name_input == '':
            name_input = input('>> name: ')
        email_input = Git._prompt_email()
        subprocess.run(['git', 'config', 'user.name', name_input], check=True, cwd=CONF['VAULT_DIR'])
        subprocess.run(['git', 'config', 'user.email', email_input], check=True, cwd=CONF['VAULT_DIR'])
        # Report success only once both settings have actually been applied
        print(f"[+] Git user configured {name_input} ({email_input})")

    @staticmethod
    def commit(msg: str) -> None:
        """Commit all changes

        :param msg: Message associated to the commit
        """
        subprocess.run(['git', 'add', '.'], check=True, cwd=CONF['VAULT_DIR'])
        subprocess.run(['git', 'commit', '-am', msg], check=True, cwd=CONF['VAULT_DIR'])

    @staticmethod
    def push() -> None:
        """Push changes to remote"""
        _proc = subprocess.run(['git', 'branch', '-vv'], capture_output=True, text=True, cwd=CONF['VAULT_DIR'])
        # `git branch -vv` prints the upstream as "[origin/main]" or "[origin/main: ahead N]".
        # Use membership tests: str.find() returns -1 (truthy) when the text is missing
        if '[origin/main]' in _proc.stdout or '[origin/main:' in _proc.stdout:  # Upstream configured
            subprocess.run(['git', 'push'], cwd=CONF['VAULT_DIR'])
        else:  # Set upstream for main branch, then push
            subprocess.run(['git', 'push', '-u', 'origin', 'main'], cwd=CONF['VAULT_DIR'])
class Templater:
@staticmethod
def clean_description(text) -> str:
while re.search(r":\n{3}(.+\n{2})*", text) is not None:
start, end = re.search(r":\n{3}(.+\n{2})*", text).span()
list_text = text[start:end].strip()
list_text = re.sub(r":\n{3}", ':\n- ', list_text)
list_text = re.sub(r"\n\n", "\n- ", list_text)
text = text[:start] + f"{list_text}\n" + text[end:]
return text
@staticmethod
def front_matter(resource):
"""Generates front-matter
:param resource: HtvResource whose front-matter will be generated
"""
# layout: page # 'page' for JeKyll pages, 'post' for post pages, ...
# title: About
# date: 2022-02-02 # Overwrite file name date
# categories: [cat1, cat2]
# permalink: /:categories /: year /:mont /: day /:title.whatever
# filename: YYYY-MM-DD-TITLE.md
return yaml.dump(dict(
title=resource,
date=Templater.now(),
categories=[resource.categories[0], resource.categories[-1]], # [parent_cat, resource_type]
tags=[*resource.metadata.tags, *resource.categories], # TAG names should always be lowercase
# description=resource.metadata.description,
#permalink='/:categories /: year /:mont /: day /:title.whatever'
))
@staticmethod
def backlink(resource: str | Path | Any) -> str | None:
"""
:param resource: HtvResource or path pointing to a category
:return : Link chain from Home to resource, in Markdown format
"""
if isinstance(resource, str | Path): # Generate from path: htb/academy/module -> module, academy, htb
_link_parts = [f"[Home]({'../' * len(Path(resource).parents)}README.md)"]
for ind, _ in enumerate(reversed(resource.__str__().split('/'))):
_link_parts.insert(1, f"[{_}]({'../' * ind if ind > 0 else './'}README.md)")
return ' > '.join(_link_parts)
else:
try: # Generate from HtvResource: htb/academy/module -> htb, academy, module
_link_parts = [f"[Home]({'../' * (len(resource.categories) + 1)}README.md)", resource.metadata.title]
for ind, _ in enumerate(reversed(resource.categories), 1): # Add parent categories links
_link_parts.insert(1, f"[{_}]({'../' * ind}README.md)")
return ' > '.join(_link_parts)
except AttributeError:
print(f"[-] Cannot generate backlink of resource '{resource}'")
return None
@staticmethod
def pagination(resource, section) -> str:
"""Generates pagination link
:param resource: HtvResource instance owning the section
:param section: HtvModule.Section whose pagination links will be generated
"""
_nav_menu_md = ['---\n']
ind = resource.sections.index(section)
if ind < len(resource.sections) - 1:
_nav_menu_md.append(
f"[Next: {resource.sections[ind + 1].title}]"
f"(./{resource.sections[ind + 1].__file_name__})"
f"< br >")
elif ind > 0:
_nav_menu_md.append(
f"[Previous: {resource.sections[ind - 1].title}]"
f"(./{resource.sections[ind - 1].__file_name__})"
f"< br >")
return '\n'.join(_nav_menu_md)
@staticmethod
def now() -> str:
"""
:return : Timezone timestamp
"""
return datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S %z')
@staticmethod
def class_str(obj) -> str:
return re.sub(r"(<class '\w+\.|'>)", '', str(obj.__class__))
@staticmethod
def camel_case(text: str, sep: str = ' ', lower_first: bool = False):
_cased = ''
for ind, w in enumerate(text.split(sep)):
_cased += w if (ind == 0 and lower_first) else w[0].upper() + w[1:]
return _cased
##### F U N C T I O N S #####
def open_browser_tab(url, quiet: bool = True, delay: int = 0) -> None:
    """Open url in a new tab

    :param url: URL to be opened
    :param quiet: If True the page is opened from a detached Python process whose output is discarded.
                  Else it is opened from the current process
    :param delay: Seconds to wait before and after opening the page
    :return: None
    """
    time.sleep(delay)
    if quiet:
        # The URL travels as an argument (argv), never as part of a shell/Python source string, so quotes
        # or shell metacharacters in it cannot break or inject commands.
        # Popen + DEVNULL does not wait for the child nor for a browser holding captured pipes open.
        subprocess.Popen(
            [
                sys.executable or 'python3',
                '-c',
                'import sys, webbrowser; webbrowser.open_new_tab(sys.argv[1])',
                str(url),
            ],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,
        )
    else:
        webbrowser.open_new_tab(url)
    time.sleep(delay)
def check_updates() -> int:
    """Check for dependencies updates

    Refreshes the apt package index, then upgrades the packages listed in constants.DEPENDENCIES.
    Both commands are run through `sudo`.

    :return: 0 if dependencies are updated successfully. 1 if update failed. 2 if operation canceled
    """
    # TODO: run pip install -U -r requirements.txt
    print("[*] Checking for updates...")
    status = 0
    try:
        # Output of the index refresh is captured (hidden); the upgrade itself is shown to the user
        subprocess.run('sudo apt update', capture_output=True, shell=True, check=True)
        packages = ' '.join(DEPENDENCIES)
        subprocess.run(f"sudo apt upgrade {packages}", shell=True, check=True)
    except KeyboardInterrupt:
        print("[!] Update cancelled")
        status = 2
    except subprocess.CalledProcessError:
        print("[!] Error updating dependencies")
        status = 1

    if status == 0:
        print("[+] Dependencies updated successfully")
    return status
def add_extensions(**kwargs) -> None:
    """Load add-on extensions

    The given extensions are merged into the `EXTENSIONS` configuration parameter, which is then
    updated through `CONF.update_values`.

    :param kwargs: Extensions to be added, as extension=category pairs. Names that are not valid
                   identifiers must be unpacked from a dict. For example: `**{'.ovpn': 'htb.vpn'}`
    """
    extensions = CONF.get('EXTENSIONS', {})
    extensions.update(kwargs)
    CONF.update_values(EXTENSIONS=extensions)
##### D Y N A M I C V A L U E S #####
"""Dynamic configuration"""
# Module-level configuration instance, created at import time.
# Constructing it calls Conf.load(), which reads the file at CONF_PATH (resetting it to the defaults
# when it is missing or lacks a default parameter) and then applies the runtime parameters.
CONF = Conf(runtime=RUNTIME_CONF,default=DEFAULT_CONF)