Finally got around to installing This amazing component by @ludeeus

This commit is contained in:
ccostan
2020-02-13 21:52:51 -05:00
parent eed7d54bf2
commit 3b8847acd9
55 changed files with 7396 additions and 0 deletions

View File

@@ -0,0 +1,6 @@
"""Initialize repositories."""
from .theme import HacsTheme
from .integration import HacsIntegration
from .python_script import HacsPythonScript
from .appdaemon import HacsAppdaemon
from .plugin import HacsPlugin

View File

@@ -0,0 +1,95 @@
"""Class for appdaemon apps in HACS."""
from aiogithubapi import AIOGitHubException
from .repository import HacsRepository, register_repository_class
from ..hacsbase.exceptions import HacsException
@register_repository_class
class HacsAppdaemon(HacsRepository):
"""Appdaemon apps in HACS."""
category = "appdaemon"
def __init__(self, full_name):
"""Initialize."""
super().__init__()
self.information.full_name = full_name
self.information.category = self.category
self.content.path.local = self.localpath
self.content.path.remote = "apps"
@property
def localpath(self):
"""Return localpath."""
return f"{self.system.config_path}/appdaemon/apps/{self.information.name}"
async def validate_repository(self):
"""Validate."""
await self.common_validate()
# Custom step 1: Validate content.
try:
addir = await self.repository_object.get_contents("apps", self.ref)
except AIOGitHubException:
raise HacsException(
f"Repostitory structure for {self.ref.replace('tags/','')} is not compliant"
)
if not isinstance(addir, list):
self.validate.errors.append("Repostitory structure not compliant")
self.content.path.remote = addir[0].path
self.information.name = addir[0].name
self.content.objects = await self.repository_object.get_contents(
self.content.path.remote, self.ref
)
self.content.files = []
for filename in self.content.objects:
self.content.files.append(filename.name)
# Handle potential errors
if self.validate.errors:
for error in self.validate.errors:
if not self.system.status.startup:
self.logger.error(error)
return self.validate.success
async def registration(self):
"""Registration."""
if not await self.validate_repository():
return False
# Run common registration steps.
await self.common_registration()
# Set local path
self.content.path.local = self.localpath
async def update_repository(self):
"""Update."""
if self.github.ratelimits.remaining == 0:
return
await self.common_update()
# Get appdaemon objects.
if self.repository_manifest:
if self.repository_manifest.content_in_root:
self.content.path.remote = ""
if self.content.path.remote == "apps":
addir = await self.repository_object.get_contents(
self.content.path.remote, self.ref
)
self.content.path.remote = addir[0].path
self.information.name = addir[0].name
self.content.objects = await self.repository_object.get_contents(
self.content.path.remote, self.ref
)
self.content.files = []
for filename in self.content.objects:
self.content.files.append(filename.name)
# Set local path
self.content.path.local = self.localpath

View File

@@ -0,0 +1,165 @@
"""Class for integrations in HACS."""
import json
from aiogithubapi import AIOGitHubException
from homeassistant.loader import async_get_custom_components
from .repository import HacsRepository, register_repository_class
from ..hacsbase.exceptions import HacsException
@register_repository_class
class HacsIntegration(HacsRepository):
"""Integrations in HACS."""
category = "integration"
def __init__(self, full_name):
"""Initialize."""
super().__init__()
self.information.full_name = full_name
self.information.category = self.category
self.domain = None
self.content.path.remote = "custom_components"
self.content.path.local = self.localpath
@property
def localpath(self):
"""Return localpath."""
return f"{self.system.config_path}/custom_components/{self.domain}"
async def validate_repository(self):
"""Validate."""
await self.common_validate()
# Attach repository
if self.repository_object is None:
self.repository_object = await self.github.get_repo(
self.information.full_name
)
# Custom step 1: Validate content.
if self.repository_manifest:
if self.repository_manifest.content_in_root:
self.content.path.remote = ""
if self.content.path.remote == "custom_components":
try:
ccdir = await self.repository_object.get_contents(
self.content.path.remote, self.ref
)
except AIOGitHubException:
raise HacsException(
f"Repostitory structure for {self.ref.replace('tags/','')} is not compliant"
)
for item in ccdir or []:
if item.type == "dir":
self.content.path.remote = item.path
break
if self.repository_manifest.zip_release:
self.content.objects = self.releases.last_release_object.assets
else:
self.content.objects = await self.repository_object.get_contents(
self.content.path.remote, self.ref
)
self.content.files = []
for filename in self.content.objects or []:
self.content.files.append(filename.name)
if not await self.get_manifest():
self.validate.errors.append("Missing manifest file.")
# Handle potential errors
if self.validate.errors:
for error in self.validate.errors:
if not self.system.status.startup:
self.logger.error(error)
return self.validate.success
async def registration(self):
"""Registration."""
if not await self.validate_repository():
return False
# Run common registration steps.
await self.common_registration()
# Get the content of the manifest file.
await self.get_manifest()
# Set local path
self.content.path.local = self.localpath
async def update_repository(self):
"""Update."""
if self.github.ratelimits.remaining == 0:
return
await self.common_update()
# Get integration objects.
if self.repository_manifest:
if self.repository_manifest.content_in_root:
self.content.path.remote = ""
if self.content.path.remote == "custom_components":
ccdir = await self.repository_object.get_contents(
self.content.path.remote, self.ref
)
if not isinstance(ccdir, list):
self.validate.errors.append("Repostitory structure not compliant")
self.content.path.remote = ccdir[0].path
try:
self.content.objects = await self.repository_object.get_contents(
self.content.path.remote, self.ref
)
except AIOGitHubException:
return
self.content.files = []
if isinstance(self.content.objects, list):
for filename in self.content.objects or []:
self.content.files.append(filename.name)
await self.get_manifest()
# Set local path
self.content.path.local = self.localpath
async def reload_custom_components(self):
"""Reload custom_components (and config flows)in HA."""
self.logger.info("Reloading custom_component cache")
del self.hass.data["custom_components"]
await async_get_custom_components(self.hass)
async def get_manifest(self):
"""Get info from the manifest file."""
manifest_path = f"{self.content.path.remote}/manifest.json"
try:
manifest = await self.repository_object.get_contents(
manifest_path, self.ref
)
manifest = json.loads(manifest.content)
except Exception: # pylint: disable=broad-except
return False
if manifest:
try:
self.manifest = manifest
self.information.authors = manifest["codeowners"]
self.domain = manifest["domain"]
self.information.name = manifest["name"]
self.information.homeassistant_version = manifest.get("homeassistant")
# Set local path
self.content.path.local = self.localpath
return True
except KeyError as exception:
raise HacsException(
f"Missing expected key {exception} in 'manifest.json'"
)
return False

View File

@@ -0,0 +1,42 @@
"""
Manifest handling of a repository.
https://hacs.xyz/docs/publish/start#hacsjson
"""
from typing import List
import attr
from custom_components.hacs.hacsbase.exceptions import HacsException
@attr.s(auto_attribs=True)
class HacsManifest:
"""HacsManifest class."""
name: str = None
content_in_root: bool = False
zip_release: bool = False
filename: str = None
manifest: dict = {}
hacs: str = None
hide_default_branch: bool = False
domains: List[str] = []
country: List[str] = []
homeassistant: str = None
persistent_directory: str = None
iot_class: str = None
render_readme: bool = False
@staticmethod
def from_dict(manifest: dict):
"""Set attributes from dicts."""
if manifest is None:
raise HacsException("Missing manifest data")
manifest_data = HacsManifest()
manifest_data.manifest = manifest
for key in manifest:
setattr(manifest_data, key, manifest[key])
return manifest_data

View File

@@ -0,0 +1,172 @@
"""Class for plugins in HACS."""
import json
from aiogithubapi import AIOGitHubException
from .repository import HacsRepository, register_repository_class
from ..hacsbase.exceptions import HacsException
@register_repository_class
class HacsPlugin(HacsRepository):
"""Plugins in HACS."""
category = "plugin"
def __init__(self, full_name):
"""Initialize."""
super().__init__()
self.information.full_name = full_name
self.information.category = self.category
self.information.file_name = None
self.information.javascript_type = None
self.content.path.local = (
f"{self.system.config_path}/www/community/{full_name.split('/')[-1]}"
)
async def validate_repository(self):
"""Validate."""
# Run common validation steps.
await self.common_validate()
# Custom step 1: Validate content.
await self.get_plugin_location()
if self.content.path.remote is None:
raise HacsException(
f"Repostitory structure for {self.ref.replace('tags/','')} is not compliant"
)
if self.content.path.remote == "release":
self.content.single = True
self.content.files = []
for filename in self.content.objects:
self.content.files.append(filename.name)
# Handle potential errors
if self.validate.errors:
for error in self.validate.errors:
if not self.system.status.startup:
self.logger.error(error)
return self.validate.success
async def registration(self):
"""Registration."""
if not await self.validate_repository():
return False
# Run common registration steps.
await self.common_registration()
async def update_repository(self):
"""Update."""
if self.github.ratelimits.remaining == 0:
return
# Run common update steps.
await self.common_update()
# Get plugin objects.
await self.get_plugin_location()
# Get JS type
await self.parse_readme_for_jstype()
if self.content.path.remote is None:
self.validate.errors.append("Repostitory structure not compliant")
if self.content.path.remote == "release":
self.content.single = True
self.content.files = []
for filename in self.content.objects:
self.content.files.append(filename.name)
async def get_plugin_location(self):
"""Get plugin location."""
if self.content.path.remote is not None:
return
possible_locations = ["dist", "release", ""]
if self.repository_manifest:
if self.repository_manifest.content_in_root:
possible_locations = [""]
for location in possible_locations:
if self.content.path.remote is not None:
continue
try:
objects = []
files = []
if location != "release":
try:
objects = await self.repository_object.get_contents(
location, self.ref
)
except AIOGitHubException:
continue
else:
await self.get_releases()
if self.releases.releases:
if self.releases.last_release_object.assets is not None:
objects = self.releases.last_release_object.assets
for item in objects:
if item.name.endswith(".js"):
files.append(item.name)
# Handler for plug requirement 3
valid_filenames = [
f"{self.information.name.replace('lovelace-', '')}.js",
f"{self.information.name}.js",
f"{self.information.name}.umd.js",
f"{self.information.name}-bundle.js",
]
if self.repository_manifest:
if self.repository_manifest.filename:
valid_filenames.append(self.repository_manifest.filename)
for name in valid_filenames:
if name in files:
# YES! We got it!
self.information.file_name = name
self.content.path.remote = location
self.content.objects = objects
self.content.files = files
break
except SystemError:
pass
async def get_package_content(self):
"""Get package content."""
try:
package = await self.repository_object.get_contents("package.json")
package = json.loads(package.content)
if package:
self.information.authors = package["author"]
except Exception: # pylint: disable=broad-except
pass
async def parse_readme_for_jstype(self):
"""Parse the readme looking for js type."""
readme = None
readme_files = ["readme", "readme.md"]
root = await self.repository_object.get_contents("")
for file in root:
if file.name.lower() in readme_files:
readme = await self.repository_object.get_contents(file.name)
break
if readme is None:
return
readme = readme.content
for line in readme.splitlines():
if "type: module" in line:
self.information.javascript_type = "module"
break
elif "type: js" in line:
self.information.javascript_type = "js"
break

View File

@@ -0,0 +1,87 @@
"""Class for python_scripts in HACS."""
from aiogithubapi import AIOGitHubException
from .repository import HacsRepository, register_repository_class
from ..hacsbase.exceptions import HacsException
@register_repository_class
class HacsPythonScript(HacsRepository):
"""python_scripts in HACS."""
category = "python_script"
def __init__(self, full_name):
"""Initialize."""
super().__init__()
self.information.full_name = full_name
self.information.category = self.category
self.content.path.remote = "python_scripts"
self.content.path.local = f"{self.system.config_path}/python_scripts"
self.content.single = True
async def validate_repository(self):
"""Validate."""
# Run common validation steps.
await self.common_validate()
# Custom step 1: Validate content.
try:
self.content.objects = await self.repository_object.get_contents(
self.content.path.remote, self.ref
)
except AIOGitHubException:
raise HacsException(
f"Repostitory structure for {self.ref.replace('tags/','')} is not compliant"
)
if not isinstance(self.content.objects, list):
self.validate.errors.append("Repostitory structure not compliant")
self.content.files = []
for filename in self.content.objects:
self.content.files.append(filename.name)
# Handle potential errors
if self.validate.errors:
for error in self.validate.errors:
if not self.system.status.startup:
self.logger.error(error)
return self.validate.success
async def registration(self):
"""Registration."""
if not await self.validate_repository():
return False
# Run common registration steps.
await self.common_registration()
# Set name
self.information.name = self.content.objects[0].name.replace(".py", "")
async def update_repository(self): # lgtm[py/similar-function]
"""Update."""
if self.github.ratelimits.remaining == 0:
return
# Run common update steps.
await self.common_update()
# Get python_script objects.
if self.repository_manifest:
if self.repository_manifest.content_in_root:
self.content.path.remote = ""
self.content.objects = await self.repository_object.get_contents(
self.content.path.remote, self.ref
)
self.content.files = []
for filename in self.content.objects:
self.content.files.append(filename.name)
# Update name
self.information.name = self.content.objects[0].name.replace(".py", "")
self.content.files = []
for filename in self.content.objects:
self.content.files.append(filename.name)

View File

@@ -0,0 +1,603 @@
"""Repository."""
# pylint: disable=broad-except, bad-continuation, no-member
import pathlib
import json
import os
import tempfile
import zipfile
from integrationhelper import Validate, Logger
from aiogithubapi import AIOGitHubException
from .manifest import HacsManifest
from ..helpers.misc import get_repository_name
from ..hacsbase import Hacs
from ..hacsbase.exceptions import HacsException
from ..hacsbase.backup import Backup
from ..handler.download import async_download_file, async_save_file
from ..helpers.misc import version_left_higher_then_right
from ..helpers.install import install_repository, version_to_install
RERPOSITORY_CLASSES = {}
def register_repository_class(cls):
"""Register class."""
RERPOSITORY_CLASSES[cls.category] = cls
return cls
class RepositoryVersions:
"""Versions."""
available = None
available_commit = None
installed = None
installed_commit = None
class RepositoryStatus:
"""Repository status."""
hide = False
installed = False
last_updated = None
new = True
selected_tag = None
show_beta = False
track = True
updated_info = False
first_install = True
class RepositoryInformation:
"""RepositoryInformation."""
additional_info = None
authors = []
category = None
default_branch = None
description = ""
state = None
full_name = None
file_name = None
javascript_type = None
homeassistant_version = None
last_updated = None
uid = None
stars = 0
info = None
name = None
topics = []
class RepositoryReleases:
"""RepositoyReleases."""
last_release = None
last_release_object = None
last_release_object_downloads = None
published_tags = []
objects = []
releases = False
class RepositoryPath:
"""RepositoryPath."""
local = None
remote = None
class RepositoryContent:
"""RepositoryContent."""
path = None
files = []
objects = []
single = False
class HacsRepository(Hacs):
"""HacsRepository."""
def __init__(self):
"""Set up HacsRepository."""
self.data = {}
self.content = RepositoryContent()
self.content.path = RepositoryPath()
self.information = RepositoryInformation()
self.repository_object = None
self.status = RepositoryStatus()
self.state = None
self.manifest = {}
self.repository_manifest = HacsManifest.from_dict({})
self.validate = Validate()
self.releases = RepositoryReleases()
self.versions = RepositoryVersions()
self.pending_restart = False
self.logger = None
self.tree = []
self.treefiles = []
self.ref = None
@property
def pending_upgrade(self):
"""Return pending upgrade."""
if self.status.installed:
if self.status.selected_tag is not None:
if self.status.selected_tag == self.information.default_branch:
if self.versions.installed_commit != self.versions.available_commit:
return True
return False
if self.display_installed_version != self.display_available_version:
return True
return False
@property
def config_flow(self):
"""Return bool if integration has config_flow."""
if self.manifest:
if self.information.full_name == "hacs/integration":
return False
return self.manifest.get("config_flow", False)
return False
@property
def custom(self):
"""Return flag if the repository is custom."""
if self.information.full_name.split("/")[0] in [
"custom-components",
"custom-cards",
]:
return False
if self.information.full_name in self.common.default:
return False
if self.information.full_name == "hacs/integration":
return False
return True
@property
def can_install(self):
"""Return bool if repository can be installed."""
target = None
if self.information.homeassistant_version is not None:
target = self.information.homeassistant_version
if self.repository_manifest is not None:
if self.repository_manifest.homeassistant is not None:
target = self.repository_manifest.homeassistant
if target is not None:
if self.releases.releases:
if not version_left_higher_then_right(self.system.ha_version, target):
return False
return True
@property
def display_name(self):
"""Return display name."""
return get_repository_name(
self.repository_manifest,
self.information.name,
self.information.category,
self.manifest,
)
@property
def display_status(self):
"""Return display_status."""
if self.status.new:
status = "new"
elif self.pending_restart:
status = "pending-restart"
elif self.pending_upgrade:
status = "pending-upgrade"
elif self.status.installed:
status = "installed"
else:
status = "default"
return status
@property
def display_status_description(self):
"""Return display_status_description."""
description = {
"default": "Not installed.",
"pending-restart": "Restart pending.",
"pending-upgrade": "Upgrade pending.",
"installed": "No action required.",
"new": "This is a newly added repository.",
}
return description[self.display_status]
@property
def display_installed_version(self):
"""Return display_authors"""
if self.versions.installed is not None:
installed = self.versions.installed
else:
if self.versions.installed_commit is not None:
installed = self.versions.installed_commit
else:
installed = ""
return installed
@property
def display_available_version(self):
"""Return display_authors"""
if self.versions.available is not None:
available = self.versions.available
else:
if self.versions.available_commit is not None:
available = self.versions.available_commit
else:
available = ""
return available
@property
def display_version_or_commit(self):
"""Does the repositoriy use releases or commits?"""
if self.releases.releases:
version_or_commit = "version"
else:
version_or_commit = "commit"
return version_or_commit
@property
def main_action(self):
"""Return the main action."""
actions = {
"new": "INSTALL",
"default": "INSTALL",
"installed": "REINSTALL",
"pending-restart": "REINSTALL",
"pending-upgrade": "UPGRADE",
}
return actions[self.display_status]
async def common_validate(self):
"""Common validation steps of the repository."""
# Attach helpers
self.validate.errors = []
self.logger = Logger(
f"hacs.repository.{self.information.category}.{self.information.full_name}"
)
if self.ref is None:
self.ref = version_to_install(self)
# Step 1: Make sure the repository exist.
self.logger.debug("Checking repository.")
try:
self.repository_object = await self.github.get_repo(
self.information.full_name
)
self.data = self.repository_object.attributes
except Exception as exception: # Gotta Catch 'Em All
if not self.system.status.startup:
self.logger.error(exception)
self.validate.errors.append("Repository does not exist.")
return
if not self.tree:
self.tree = await self.repository_object.get_tree(self.ref)
self.treefiles = []
for treefile in self.tree:
self.treefiles.append(treefile.full_path)
# Step 2: Make sure the repository is not archived.
if self.repository_object.archived:
self.validate.errors.append("Repository is archived.")
return
# Step 3: Make sure the repository is not in the blacklist.
if self.information.full_name in self.common.blacklist:
self.validate.errors.append("Repository is in the blacklist.")
return
# Step 4: default branch
self.information.default_branch = self.repository_object.default_branch
# Step 5: Get releases.
await self.get_releases()
# Step 6: Get the content of hacs.json
await self.get_repository_manifest_content()
# Set repository name
self.information.name = self.information.full_name.split("/")[1]
async def common_registration(self):
"""Common registration steps of the repository."""
# Attach logger
if self.logger is None:
self.logger = Logger(
f"hacs.repository.{self.information.category}.{self.information.full_name}"
)
# Attach repository
if self.repository_object is None:
self.repository_object = await self.github.get_repo(
self.information.full_name
)
# Set id
self.information.uid = str(self.repository_object.id)
# Set topics
self.information.topics = self.repository_object.topics
# Set stargazers_count
self.information.stars = self.repository_object.attributes.get(
"stargazers_count", 0
)
# Set description
if self.repository_object.description:
self.information.description = self.repository_object.description
async def common_update(self):
"""Common information update steps of the repository."""
# Attach logger
if self.logger is None:
self.logger = Logger(
f"hacs.repository.{self.information.category}.{self.information.full_name}"
)
self.logger.debug("Getting repository information")
# Set ref
if self.ref is None:
self.ref = version_to_install(self)
# Attach repository
self.repository_object = await self.github.get_repo(self.information.full_name)
# Update tree
self.tree = await self.repository_object.get_tree(self.ref)
self.treefiles = []
for treefile in self.tree:
self.treefiles.append(treefile.full_path)
# Update description
if self.repository_object.description:
self.information.description = self.repository_object.description
# Set stargazers_count
self.information.stars = self.repository_object.attributes.get(
"stargazers_count", 0
)
# Update default branch
self.information.default_branch = self.repository_object.default_branch
# Update last updaeted
self.information.last_updated = self.repository_object.attributes.get(
"pushed_at", 0
)
# Update topics
self.information.topics = self.repository_object.topics
# Update last available commit
await self.repository_object.set_last_commit()
self.versions.available_commit = self.repository_object.last_commit
# Get the content of hacs.json
await self.get_repository_manifest_content()
# Update "info.md"
await self.get_info_md_content()
# Update releases
await self.get_releases()
async def install(self):
"""Common installation steps of the repository."""
await install_repository(self)
async def download_zip(self, validate):
"""Download ZIP archive from repository release."""
try:
contents = False
for release in self.releases.objects:
self.logger.info(f"ref: {self.ref} --- tag: {release.tag_name}")
if release.tag_name == self.ref.split("/")[1]:
contents = release.assets
if not contents:
return validate
for content in contents or []:
filecontent = await async_download_file(self.hass, content.download_url)
if filecontent is None:
validate.errors.append(f"[{content.name}] was not downloaded.")
continue
result = await async_save_file(
f"{tempfile.gettempdir()}/{self.repository_manifest.filename}",
filecontent,
)
with zipfile.ZipFile(
f"{tempfile.gettempdir()}/{self.repository_manifest.filename}", "r"
) as zip_file:
zip_file.extractall(self.content.path.local)
if result:
self.logger.info(f"download of {content.name} complete")
continue
validate.errors.append(f"[{content.name}] was not downloaded.")
except Exception:
validate.errors.append(f"Download was not complete.")
return validate
async def download_content(self, validate, directory_path, local_directory, ref):
"""Download the content of a directory."""
from custom_components.hacs.helpers.download import download_content
validate = await download_content(self, validate, local_directory)
return validate
async def get_repository_manifest_content(self):
"""Get the content of the hacs.json file."""
if self.ref is None:
self.ref = version_to_install(self)
try:
manifest = await self.repository_object.get_contents("hacs.json", self.ref)
self.repository_manifest = HacsManifest.from_dict(
json.loads(manifest.content)
)
except (AIOGitHubException, Exception): # Gotta Catch 'Em All
pass
async def get_info_md_content(self):
"""Get the content of info.md"""
from ..handler.template import render_template
if self.ref is None:
self.ref = version_to_install(self)
info = None
info_files = ["info", "info.md"]
if self.repository_manifest is not None:
if self.repository_manifest.render_readme:
info_files = ["readme", "readme.md"]
try:
root = await self.repository_object.get_contents("", self.ref)
for file in root:
if file.name.lower() in info_files:
info = await self.repository_object.get_contents(
file.name, self.ref
)
break
if info is None:
self.information.additional_info = ""
else:
info = info.content.replace("<svg", "<disabled").replace(
"</svg", "</disabled"
)
self.information.additional_info = render_template(info, self)
except (AIOGitHubException, Exception):
self.information.additional_info = ""
async def get_releases(self):
"""Get repository releases."""
if self.status.show_beta:
self.releases.objects = await self.repository_object.get_releases(
prerelease=True, returnlimit=self.configuration.release_limit
)
else:
self.releases.objects = await self.repository_object.get_releases(
prerelease=False, returnlimit=self.configuration.release_limit
)
if not self.releases.objects:
return
self.releases.releases = True
self.releases.published_tags = []
for release in self.releases.objects:
self.releases.published_tags.append(release.tag_name)
self.releases.last_release_object = self.releases.objects[0]
if self.status.selected_tag is not None:
if self.status.selected_tag != self.information.default_branch:
for release in self.releases.objects:
if release.tag_name == self.status.selected_tag:
self.releases.last_release_object = release
break
if self.releases.last_release_object.assets:
self.releases.last_release_object_downloads = self.releases.last_release_object.assets[
0
].attributes.get(
"download_count"
)
self.versions.available = self.releases.objects[0].tag_name
def remove(self):
"""Run remove tasks."""
# Attach logger
if self.logger is None:
self.logger = Logger(
f"hacs.repository.{self.information.category}.{self.information.full_name}"
)
self.logger.info("Starting removal")
if self.information.uid in self.common.installed:
self.common.installed.remove(self.information.uid)
for repository in self.repositories:
if repository.information.uid == self.information.uid:
self.repositories.remove(repository)
async def uninstall(self):
"""Run uninstall tasks."""
# Attach logger
if self.logger is None:
self.logger = Logger(
f"hacs.repository.{self.information.category}.{self.information.full_name}"
)
self.logger.info("Uninstalling")
await self.remove_local_directory()
self.status.installed = False
if self.information.category == "integration":
if self.config_flow:
await self.reload_custom_components()
else:
self.pending_restart = True
elif self.information.category == "theme":
try:
await self.hass.services.async_call("frontend", "reload_themes", {})
except Exception: # pylint: disable=broad-except
pass
if self.information.full_name in self.common.installed:
self.common.installed.remove(self.information.full_name)
self.versions.installed = None
self.versions.installed_commit = None
self.hass.bus.async_fire(
"hacs/repository",
{
"id": 1337,
"action": "uninstall",
"repository": self.information.full_name,
},
)
async def remove_local_directory(self):
"""Check the local directory."""
import shutil
from asyncio import sleep
try:
if self.information.category == "python_script":
local_path = "{}/{}.py".format(
self.content.path.local, self.information.name
)
elif self.information.category == "theme":
local_path = "{}/{}.yaml".format(
self.content.path.local, self.information.name
)
else:
local_path = self.content.path.local
if os.path.exists(local_path):
self.logger.debug(f"Removing {local_path}")
if self.information.category in ["python_script", "theme"]:
os.remove(local_path)
else:
shutil.rmtree(local_path)
while os.path.exists(local_path):
await sleep(1)
except Exception as exception:
self.logger.debug(f"Removing {local_path} failed with {exception}")
return

View File

@@ -0,0 +1,110 @@
"""Class for themes in HACS."""
from .repository import HacsRepository, register_repository_class
from ..hacsbase.exceptions import HacsException
from ..helpers.filters import filter_content_return_one_of_type, find_first_of_filetype
@register_repository_class
class HacsTheme(HacsRepository):
"""Themes in HACS."""
category = "theme"
def __init__(self, full_name):
"""Initialize."""
super().__init__()
self.information.full_name = full_name
self.information.category = self.category
self.content.path.remote = "themes"
self.content.path.local = f"{self.system.config_path}/themes"
self.content.single = False
async def validate_repository(self):
"""Validate."""
# Run common validation steps.
await self.common_validate()
# Custom step 1: Validate content.
compliant = False
for treefile in self.treefiles:
self.logger.debug(treefile)
if treefile.startswith("themes/") and treefile.endswith(".yaml"):
compliant = True
break
if not compliant:
raise HacsException(
f"Repostitory structure for {self.ref.replace('tags/','')} is not compliant"
)
if self.repository_manifest:
if self.repository_manifest.content_in_root:
self.content.path.remote = ""
self.content.objects = await self.repository_object.get_contents(
self.content.path.remote, self.ref
)
if not isinstance(self.content.objects, list):
self.validate.errors.append("Repostitory structure not compliant")
self.content.files = filter_content_return_one_of_type(
self.treefiles, "themes", "yaml"
)
# Handle potential errors
if self.validate.errors:
for error in self.validate.errors:
if not self.system.status.startup:
self.logger.error(error)
return self.validate.success
async def registration(self):
"""Registration."""
if not await self.validate_repository():
return False
# Run common registration steps.
await self.common_registration()
# Set name
if self.repository_manifest.filename is not None:
self.information.file_name = self.repository_manifest.filename
else:
self.information.file_name = find_first_of_filetype(
self.content.files, "yaml"
).split("/")[-1]
self.information.name = self.information.file_name.replace(".yaml", "")
self.content.path.local = (
f"{self.system.config_path}/themes/{self.information.name}"
)
async def update_repository(self): # lgtm[py/similar-function]
"""Update."""
if self.github.ratelimits.remaining == 0:
return
# Run common update steps.
await self.common_update()
# Get theme objects.
if self.repository_manifest:
if self.repository_manifest.content_in_root:
self.content.path.remote = ""
self.content.objects = await self.repository_object.get_contents(
self.content.path.remote, self.ref
)
self.content.files = filter_content_return_one_of_type(
self.treefiles, "themes", "yaml"
)
# Update name
if self.repository_manifest.filename is not None:
self.information.file_name = self.repository_manifest.filename
else:
self.information.file_name = find_first_of_filetype(
self.content.files, "yaml"
).split("/")[-1]
self.information.name = self.information.file_name.replace(".yaml", "")
self.content.path.local = (
f"{self.system.config_path}/themes/{self.information.name}"
)