cleaned up, and updated to latest version.

This commit is contained in:
Mahasri Kalavala
2022-11-29 19:50:52 -05:00
parent 9a8c586542
commit 3bedcbdc20
77 changed files with 3853 additions and 7279 deletions

View File

@@ -1,179 +0,0 @@
# """
# Search images for tagged objects via a local Tagbox instance.
# For more details about this platform, please refer to the documentation at
# https://home-assistant.io/components/image_processing.tagbox
# This file is stolen from @robmarkcole's repo
# """
# import base64
# import requests
# import logging
# import voluptuous as vol
# from homeassistant.core import (
# callback, split_entity_id)
# import homeassistant.helpers.config_validation as cv
# from homeassistant.components.image_processing import (
# PLATFORM_SCHEMA, ImageProcessingEntity, ATTR_CONFIDENCE, CONF_CONFIDENCE,
# CONF_SOURCE, CONF_ENTITY_ID, CONF_NAME)
# from homeassistant.const import (
# ATTR_ENTITY_ID, ATTR_NAME, CONF_IP_ADDRESS, CONF_PORT)
# from homeassistant.util.async_ import run_callback_threadsafe
# _LOGGER = logging.getLogger(__name__)
# CLASSIFIER = 'tagbox'
# EVENT_DETECT_TAG = 'image_processing.detect_tag'
# TIMEOUT = 9
# PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
# vol.Required(CONF_IP_ADDRESS): cv.string,
# vol.Required(CONF_PORT): cv.port,
# })
# def encode_image(image):
# """base64 encode an image stream."""
# base64_img = base64.b64encode(image).decode('ascii')
# return base64_img
# def get_matched_tags(tags, confidence):
# """Return the name and rounded confidence of matched tags."""
# return {tag['name']: tag['confidence']
# for tag in tags if tag['confidence'] > confidence}
# def parse_tags(api_tags):
# """Parse the API tag data into the format required."""
# parsed_tags = []
# for entry in api_tags:
# tag = {}
# tag[ATTR_NAME] = entry['tag']
# tag[ATTR_CONFIDENCE] = round(100.0*entry['confidence'], 2)
# parsed_tags.append(tag)
# return parsed_tags
# def post_image(url, image):
# """Post an image to the classifier."""
# try:
# response = requests.post(
# url,
# json={"base64": encode_image(image)},
# timeout=TIMEOUT
# )
# return response
# except requests.exceptions.ConnectionError:
# _LOGGER.error("ConnectionError: Is %s running?", CLASSIFIER)
# def setup_platform(hass, config, add_devices, discovery_info=None):
# """Set up the classifier."""
# entities = []
# for camera in config[CONF_SOURCE]:
# entities.append(ImageProcessingTagEntity(
# config[CONF_IP_ADDRESS],
# config[CONF_PORT],
# camera[CONF_ENTITY_ID],
# camera.get(CONF_NAME),
# config[CONF_CONFIDENCE],
# ))
# add_devices(entities)
# class ImageProcessingTagEntity(ImageProcessingEntity):
# """Perform a tag search via a Tagbox."""
# def __init__(self, ip, port, camera_entity, name, confidence):
# """Init with the IP and PORT"""
# super().__init__()
# self._url_check = "http://{}:{}/{}/check".format(ip, port, CLASSIFIER)
# self._camera = camera_entity
# if name:
# self._name = name
# else:
# camera_name = split_entity_id(camera_entity)[1]
# self._name = "{} {}".format(
# CLASSIFIER, camera_name)
# self._confidence = confidence
# self.tags = []
# self._matched = {}
# def process_image(self, image):
# """Process an image."""
# response = post_image(self._url_check, image)
# if response is not None:
# response_json = response.json()
# if response_json['success']:
# api_tags = response_json['tags'] + response_json['custom_tags']
# tags = parse_tags(api_tags)
# self.process_tags(tags)
# self._matched = get_matched_tags(tags, self.confidence)
# else:
# self.tags = []
# self._matched = {}
# @property
# def confidence(self):
# """Return minimum confidence for send events."""
# return self._confidence
# @property
# def state(self):
# """Return the state of the entity."""
# state = None
# if len(self._matched) > 0:
# return self.tags[0][ATTR_NAME]
# return state
# def process_tags(self, tags):
# """Send event with detected tags and store data."""
# run_callback_threadsafe(
# self.hass.loop, self.async_process_tags, tags).result()
# @callback
# def async_process_tags(self, tags):
# """Send event with detected tags and store data.
# Tags are a dict in follow format:
# [
# {
# ATTR_CONFIDENCE: 80,
# ATTR_NAME: 'people',
# },
# ]
# This method must be run in the event loop.
# """
# # Send events
# for tag in tags:
# tag.update({ATTR_ENTITY_ID: self.entity_id})
# if tag[ATTR_CONFIDENCE] > self.confidence:
# self.hass.async_add_job(
# self.hass.bus.async_fire, EVENT_DETECT_TAG, tag
# )
# # Update entity store
# self.tags = tags
# @property
# def camera_entity(self):
# """Return camera entity id from process pictures."""
# return self._camera
# @property
# def name(self):
# """Return the name of the sensor."""
# return self._name
# @property
# def device_state_attributes(self):
# """Return other details about the sensor state."""
# return {
# 'tags': self.tags,
# 'total_tags': len(self.tags),
# 'matched_tags': self._matched,
# 'total_matched_tags': len(self._matched),
# }

View File

@@ -1,347 +0,0 @@
# """
# Component that performs TensorFlow classification on images.
# For a quick start, pick a pre-trained COCO model from:
# https://github.com/tensorflow/models/blob/master/research/object_detection/g3doc/detection_model_zoo.md
# For more details about this platform, please refer to the documentation at
# https://home-assistant.io/components/image_processing.tensorflow/
# """
# import logging
# import sys
# import os
# import voluptuous as vol
# from homeassistant.components.image_processing import (
# CONF_CONFIDENCE, CONF_ENTITY_ID, CONF_NAME, CONF_SOURCE, PLATFORM_SCHEMA,
# ImageProcessingEntity)
# from homeassistant.core import split_entity_id
# from homeassistant.helpers import template
# import homeassistant.helpers.config_validation as cv
# REQUIREMENTS = ['numpy==1.15.3', 'pillow==5.2.0', 'protobuf==3.6.1']
# _LOGGER = logging.getLogger(__name__)
# ATTR_MATCHES = 'matches'
# ATTR_SUMMARY = 'summary'
# ATTR_TOTAL_MATCHES = 'total_matches'
# CONF_FILE_OUT = 'file_out'
# CONF_MODEL = 'model'
# CONF_GRAPH = 'graph'
# CONF_LABELS = 'labels'
# CONF_MODEL_DIR = 'model_dir'
# CONF_CATEGORIES = 'categories'
# CONF_CATEGORY = 'category'
# CONF_AREA = 'area'
# CONF_TOP = 'top'
# CONF_LEFT = 'left'
# CONF_BOTTOM = 'bottom'
# CONF_RIGHT = 'right'
# AREA_SCHEMA = vol.Schema({
# vol.Optional(CONF_TOP, default=0): cv.small_float,
# vol.Optional(CONF_LEFT, default=0): cv.small_float,
# vol.Optional(CONF_BOTTOM, default=1): cv.small_float,
# vol.Optional(CONF_RIGHT, default=1): cv.small_float
# })
# CATEGORY_SCHEMA = vol.Schema({
# vol.Required(CONF_CATEGORY): cv.string,
# vol.Optional(CONF_AREA): AREA_SCHEMA
# })
# PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
# vol.Optional(CONF_FILE_OUT, default=[]):
# vol.All(cv.ensure_list, [cv.template]),
# vol.Required(CONF_MODEL): vol.Schema({
# vol.Required(CONF_GRAPH): cv.isfile,
# vol.Optional(CONF_LABELS): cv.isfile,
# vol.Optional(CONF_MODEL_DIR): cv.isdir,
# vol.Optional(CONF_AREA): AREA_SCHEMA,
# vol.Optional(CONF_CATEGORIES, default=[]):
# vol.All(cv.ensure_list, [vol.Any(
# cv.string,
# CATEGORY_SCHEMA
# )])
# })
# })
# def draw_box(draw, box, img_width,
# img_height, text='', color=(255, 255, 0)):
# """Draw bounding box on image."""
# ymin, xmin, ymax, xmax = box
# (left, right, top, bottom) = (xmin * img_width, xmax * img_width,
# ymin * img_height, ymax * img_height)
# draw.line([(left, top), (left, bottom), (right, bottom),
# (right, top), (left, top)], width=5, fill=color)
# if text:
# draw.text((left, abs(top-15)), text, fill=color)
# def setup_platform(hass, config, add_entities, discovery_info=None):
# """Set up the TensorFlow image processing platform."""
# model_config = config.get(CONF_MODEL)
# model_dir = model_config.get(CONF_MODEL_DIR) \
# or hass.config.path('tensorflow')
# labels = model_config.get(CONF_LABELS) \
# or hass.config.path('tensorflow', 'object_detection',
# 'data', 'mscoco_label_map.pbtxt')
# # Make sure locations exist
# if not os.path.isdir(model_dir) or not os.path.exists(labels):
# _LOGGER.error("Unable to locate tensorflow models or label map.")
# return
# # append custom model path to sys.path
# sys.path.append(model_dir)
# try:
# # Verify that the TensorFlow Object Detection API is pre-installed
# # pylint: disable=unused-import,unused-variable
# os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
# import tensorflow as tf # noqa
# from object_detection.utils import label_map_util # noqa
# except ImportError:
# # pylint: disable=line-too-long
# _LOGGER.error(
# "No TensorFlow Object Detection library found! Install or compile "
# "for your system following instructions here: "
# "https://github.com/tensorflow/models/blob/master/research/object_detection/g3doc/installation.md") # noqa
# return
# try:
# # Display warning that PIL will be used if no OpenCV is found.
# # pylint: disable=unused-import,unused-variable
# import cv2 # noqa
# except ImportError:
# _LOGGER.warning("No OpenCV library found. "
# "TensorFlow will process image with "
# "PIL at reduced resolution.")
# # setup tensorflow graph, session, and label map to pass to processor
# # pylint: disable=no-member
# detection_graph = tf.Graph()
# with detection_graph.as_default():
# od_graph_def = tf.GraphDef()
# with tf.gfile.GFile(model_config.get(CONF_GRAPH), 'rb') as fid:
# serialized_graph = fid.read()
# od_graph_def.ParseFromString(serialized_graph)
# tf.import_graph_def(od_graph_def, name='')
# session = tf.Session(graph=detection_graph)
# label_map = label_map_util.load_labelmap(labels)
# categories = label_map_util.convert_label_map_to_categories(
# label_map, max_num_classes=90, use_display_name=True)
# category_index = label_map_util.create_category_index(categories)
# entities = []
# for camera in config[CONF_SOURCE]:
# entities.append(TensorFlowImageProcessor(
# hass, camera[CONF_ENTITY_ID], camera.get(CONF_NAME),
# session, detection_graph, category_index, config))
# add_entities(entities)
# class TensorFlowImageProcessor(ImageProcessingEntity):
# """Representation of an TensorFlow image processor."""
# def __init__(self, hass, camera_entity, name, session, detection_graph,
# category_index, config):
# """Initialize the TensorFlow entity."""
# model_config = config.get(CONF_MODEL)
# self.hass = hass
# self._camera_entity = camera_entity
# if name:
# self._name = name
# else:
# self._name = "TensorFlow {0}".format(
# split_entity_id(camera_entity)[1])
# self._session = session
# self._graph = detection_graph
# self._category_index = category_index
# self._min_confidence = config.get(CONF_CONFIDENCE)
# self._file_out = config.get(CONF_FILE_OUT)
# # handle categories and specific detection areas
# categories = model_config.get(CONF_CATEGORIES)
# self._include_categories = []
# self._category_areas = {}
# for category in categories:
# if isinstance(category, dict):
# category_name = category.get(CONF_CATEGORY)
# category_area = category.get(CONF_AREA)
# self._include_categories.append(category_name)
# self._category_areas[category_name] = [0, 0, 1, 1]
# if category_area:
# self._category_areas[category_name] = [
# category_area.get(CONF_TOP),
# category_area.get(CONF_LEFT),
# category_area.get(CONF_BOTTOM),
# category_area.get(CONF_RIGHT)
# ]
# else:
# self._include_categories.append(category)
# self._category_areas[category] = [0, 0, 1, 1]
# # Handle global detection area
# self._area = [0, 0, 1, 1]
# area_config = model_config.get(CONF_AREA)
# if area_config:
# self._area = [
# area_config.get(CONF_TOP),
# area_config.get(CONF_LEFT),
# area_config.get(CONF_BOTTOM),
# area_config.get(CONF_RIGHT)
# ]
# template.attach(hass, self._file_out)
# self._matches = {}
# self._total_matches = 0
# self._last_image = None
# @property
# def camera_entity(self):
# """Return camera entity id from process pictures."""
# return self._camera_entity
# @property
# def name(self):
# """Return the name of the image processor."""
# return self._name
# @property
# def state(self):
# """Return the state of the entity."""
# return self._total_matches
# @property
# def device_state_attributes(self):
# """Return device specific state attributes."""
# return {
# ATTR_MATCHES: self._matches,
# ATTR_SUMMARY: {category: len(values)
# for category, values in self._matches.items()},
# ATTR_TOTAL_MATCHES: self._total_matches
# }
# def _save_image(self, image, matches, paths):
# from PIL import Image, ImageDraw
# import io
# img = Image.open(io.BytesIO(bytearray(image))).convert('RGB')
# img_width, img_height = img.size
# draw = ImageDraw.Draw(img)
# # Draw custom global region/area
# if self._area != [0, 0, 1, 1]:
# draw_box(draw, self._area,
# img_width, img_height,
# "Detection Area", (0, 255, 255))
# for category, values in matches.items():
# # Draw custom category regions/areas
# if (category in self._category_areas
# and self._category_areas[category] != [0, 0, 1, 1]):
# label = "{} Detection Area".format(category.capitalize())
# draw_box(draw, self._category_areas[category], img_width,
# img_height, label, (0, 255, 0))
# # Draw detected objects
# for instance in values:
# label = "{0} {1:.1f}%".format(category, instance['score'])
# draw_box(draw, instance['box'],
# img_width, img_height,
# label, (255, 255, 0))
# for path in paths:
# _LOGGER.info("Saving results image to %s", path)
# img.save(path)
# def process_image(self, image):
# """Process the image."""
# import numpy as np
# try:
# import cv2 # pylint: disable=import-error
# img = cv2.imdecode(
# np.asarray(bytearray(image)), cv2.IMREAD_UNCHANGED)
# inp = img[:, :, [2, 1, 0]] # BGR->RGB
# inp_expanded = inp.reshape(1, inp.shape[0], inp.shape[1], 3)
# except ImportError:
# from PIL import Image
# import io
# img = Image.open(io.BytesIO(bytearray(image))).convert('RGB')
# img.thumbnail((460, 460), Image.ANTIALIAS)
# img_width, img_height = img.size
# inp = np.array(img.getdata()).reshape(
# (img_height, img_width, 3)).astype(np.uint8)
# inp_expanded = np.expand_dims(inp, axis=0)
# image_tensor = self._graph.get_tensor_by_name('image_tensor:0')
# boxes = self._graph.get_tensor_by_name('detection_boxes:0')
# scores = self._graph.get_tensor_by_name('detection_scores:0')
# classes = self._graph.get_tensor_by_name('detection_classes:0')
# boxes, scores, classes = self._session.run(
# [boxes, scores, classes],
# feed_dict={image_tensor: inp_expanded})
# boxes, scores, classes = map(np.squeeze, [boxes, scores, classes])
# classes = classes.astype(int)
# matches = {}
# total_matches = 0
# for box, score, obj_class in zip(boxes, scores, classes):
# score = score * 100
# boxes = box.tolist()
# # Exclude matches below min confidence value
# if score < self._min_confidence:
# continue
# # Exclude matches outside global area definition
# if (boxes[0] < self._area[0] or boxes[1] < self._area[1]
# or boxes[2] > self._area[2] or boxes[3] > self._area[3]):
# continue
# category = self._category_index[obj_class]['name']
# # Exclude unlisted categories
# if (self._include_categories
# and category not in self._include_categories):
# continue
# # Exclude matches outside category specific area definition
# if (self._category_areas
# and (boxes[0] < self._category_areas[category][0]
# or boxes[1] < self._category_areas[category][1]
# or boxes[2] > self._category_areas[category][2]
# or boxes[3] > self._category_areas[category][3])):
# continue
# # If we got here, we should include it
# if category not in matches.keys():
# matches[category] = []
# matches[category].append({
# 'score': float(score),
# 'box': boxes
# })
# total_matches += 1
# # Save Images
# if total_matches and self._file_out:
# paths = []
# for path_template in self._file_out:
# if isinstance(path_template, template.Template):
# paths.append(path_template.render(
# camera_entity=self._camera_entity))
# else:
# paths.append(path_template)
# self._save_image(image, matches, paths)
# self._matches = matches
# self._total_matches = total_matches

View File

@@ -1,267 +0,0 @@
"""
@ Author : Suresh Kalavala
@ Date : 05/24/2017
@ Description : Life365 Sensor - It queries Life360 API and retrieves
data at a specified interval and dumps into MQTT
@ Notes: Copy this file and place it in your
"Home Assistant Config folder\custom_components\sensor\" folder
Copy corresponding Life365 Package frommy repo,
and make sure you have MQTT installed and Configured
Make sure the life365 password doesn't contain '#' or '$' symbols
"""
from datetime import timedelta
import logging
import subprocess
import json
import voluptuous as vol
import homeassistant.components.mqtt as mqtt
from io import StringIO
from homeassistant.components.mqtt import (CONF_STATE_TOPIC, CONF_COMMAND_TOPIC, CONF_QOS, CONF_RETAIN)
from homeassistant.helpers import template
from homeassistant.exceptions import TemplateError
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
CONF_NAME, CONF_VALUE_TEMPLATE, CONF_UNIT_OF_MEASUREMENT,
STATE_UNKNOWN)
from homeassistant.helpers.entity import Entity
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
DEPENDENCIES = ['mqtt']
DEFAULT_NAME = 'Life365 Sensor'
CONST_MQTT_TOPIC = "mqtt_topic"
CONST_STATE_ERROR = "error"
CONST_STATE_RUNNING = "running"
CONST_USERNAME = "username"
CONST_PASSWORD = "password"
COMMAND1 = "curl -s -X POST -H \"Authorization: Basic cFJFcXVnYWJSZXRyZTRFc3RldGhlcnVmcmVQdW1hbUV4dWNyRUh1YzptM2ZydXBSZXRSZXN3ZXJFQ2hBUHJFOTZxYWtFZHI0Vg==\" -F \"grant_type=password\" -F \"username=USERNAME360\" -F \"password=PASSWORD360\" https://api.life360.com/v3/oauth2/token.json | grep -Po '(?<=\"access_token\":\")\\w*'"
COMMAND2 = "curl -s -X GET -H \"Authorization: Bearer ACCESS_TOKEN\" https://api.life360.com/v3/circles.json | grep -Po '(?<=\"id\":\")[\\w-]*'"
COMMAND3 = "curl -s -X GET -H \"Authorization: Bearer ACCESS_TOKEN\" https://api.life360.com/v3/circles/ID"
SCAN_INTERVAL = timedelta(seconds=60)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONST_USERNAME): cv.string,
vol.Required(CONST_PASSWORD): cv.string,
vol.Required(CONST_MQTT_TOPIC): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
})
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up the Life365 Sensor."""
name = config.get(CONF_NAME)
username = config.get(CONST_USERNAME)
password = config.get(CONST_PASSWORD)
mqtt_topic = config.get(CONST_MQTT_TOPIC)
unit = config.get(CONF_UNIT_OF_MEASUREMENT)
value_template = config.get(CONF_VALUE_TEMPLATE)
if value_template is not None:
value_template.hass = hass
data = Life365SensorData(username, password, COMMAND1, COMMAND2, COMMAND3, mqtt_topic, hass)
add_devices([Life365Sensor(hass, data, name, unit, value_template)])
class Life365Sensor(Entity):
"""Representation of a sensor."""
def __init__(self, hass, data, name, unit_of_measurement, value_template):
"""Initialize the sensor."""
self._hass = hass
self.data = data
self._name = name
self._state = STATE_UNKNOWN
self._unit_of_measurement = unit_of_measurement
self._value_template = value_template
self.update()
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def unit_of_measurement(self):
"""Return the unit the value is expressed in."""
return self._unit_of_measurement
@property
def state(self):
"""Return the state of the device."""
return self._state
def update(self):
"""Get the latest data and updates the state."""
self.data.update()
value = self.data.value
if value is None:
value = STATE_UNKNOWN
elif self._value_template is not None:
self._state = self._value_template.render_with_possible_json_value(
value, STATE_UNKNOWN)
else:
self._state = value
class Life365SensorData(object):
"""The class for handling the data retrieval."""
def __init__(self, username, password, command1, command2, command3, mqtt_topic, hass):
"""Initialize the data object."""
self.username = username
self.password = password
self.COMMAND_ACCESS_TOKEN = command1
self.COMMAND_ID = command2
self.COMMAND_MEMBERS = command3
self.hass = hass
self.value = None
self.mqtt_topic = mqtt_topic
self.mqtt_retain = True
self.mqtt_qos = 0
def update(self):
try:
""" Prepare and Execute Commands """
self.COMMAND_ACCESS_TOKEN = self.COMMAND_ACCESS_TOKEN.replace("USERNAME360", self.username)
self.COMMAND_ACCESS_TOKEN = self.COMMAND_ACCESS_TOKEN.replace("PASSWORD360", self.password)
access_token = self.exec_shell_command( self.COMMAND_ACCESS_TOKEN )
if access_token == None:
self.value = CONST_STATE_ERROR
return None
self.COMMAND_ID = self.COMMAND_ID.replace("ACCESS_TOKEN", access_token)
id = self.exec_shell_command( self.COMMAND_ID )
if id == None:
self.value = CONST_STATE_ERROR
return None
self.COMMAND_MEMBERS = self.COMMAND_MEMBERS.replace("ACCESS_TOKEN", access_token)
self.COMMAND_MEMBERS = self.COMMAND_MEMBERS.replace("ID", id)
payload = self.exec_shell_command( self.COMMAND_MEMBERS )
if payload != None:
self.save_payload_to_mqtt ( self.mqtt_topic, payload )
data = json.loads ( payload )
for member in data["members"]:
topic = StringBuilder()
topic.Append("owntracks/")
topic.Append(member["firstName"].lower())
topic.Append("/")
topic.Append(member["firstName"].lower())
topic = topic
msgPayload = StringBuilder()
msgPayload.Append("{")
msgPayload.Append("\"t\":\"p\"")
msgPayload.Append(",")
msgPayload.Append("\"tst\":")
msgPayload.Append(member['location']['timestamp'])
msgPayload.Append(",")
msgPayload.Append("\"acc\":")
msgPayload.Append(member['location']['accuracy'])
msgPayload.Append(",")
msgPayload.Append("\"_type\":\"location\"")
msgPayload.Append(",")
msgPayload.Append("\"alt\":\"0\"")
msgPayload.Append(",")
msgPayload.Append("\"_cp\":\"false\"")
msgPayload.Append(",")
msgPayload.Append("\"lon\":")
msgPayload.Append(member['location']['longitude'])
msgPayload.Append(",")
msgPayload.Append("\"lat\":")
msgPayload.Append(member['location']['latitude'])
msgPayload.Append(",")
msgPayload.Append("\"batt\":")
msgPayload.Append(member['location']['battery'])
msgPayload.Append(",")
if str(member['location']['wifiState']) == "1":
msgPayload.Append("\"conn\":\"w\"")
msgPayload.Append(",")
msgPayload.Append("\"vel\":")
msgPayload.Append(str(member['location']['speed']))
msgPayload.Append(",")
msgPayload.Append("\"charging\":")
msgPayload.Append(member['location']['charge'])
msgPayload.Append("}")
self.save_payload_to_mqtt ( str(topic), str(msgPayload) )
self.value = CONST_STATE_RUNNING
else:
self.value = CONST_STATE_ERROR
except Exception as e:
self.value = CONST_STATE_ERROR
def exec_shell_command( self, command ):
output = None
try:
output = subprocess.check_output( command, shell=True, timeout=50 )
output = output.strip().decode('utf-8')
except subprocess.CalledProcessError:
""" _LOGGER.error("Command failed: %s", command)"""
self.value = CONST_STATE_ERROR
output = None
except subprocess.TimeoutExpired:
""" _LOGGER.error("Timeout for command: %s", command)"""
self.value = CONST_STATE_ERROR
output = None
if output == None:
_LOGGER.error( "Life365 has not responsed well. Nothing to worry, will try again!" )
self.value = CONST_STATE_ERROR
return None
else:
return output
def save_payload_to_mqtt( self, topic, payload ):
try:
"""mqtt.async_publish ( self.hass, topic, payload, self.mqtt_qos, self.mqtt_retain )"""
_LOGGER.info("topic: %s", topic)
_LOGGER.info("payload: %s", payload)
mqtt.publish ( self.hass, topic, payload, self.mqtt_qos, self.mqtt_retain )
except:
_LOGGER.error( "Error saving Life365 data to mqtt." )
class StringBuilder:
_file_str = None
def __init__(self):
self._file_str = StringIO()
def Append(self, str):
self._file_str.write(str)
def __str__(self):
return self._file_str.getvalue()

View File

@@ -1,269 +0,0 @@
"""
@Author: Suresh Kalavala
@Date: 03/03/2018
Custom Sensor: Palo Alto device integration with Home Assistant.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.paloalto/
"""
import ssl
import logging
import urllib.request
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
import xml.etree.ElementTree as ET
from enum import Enum
from datetime import timedelta
from homeassistant.helpers.entity import Entity
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (CONF_NAME, CONF_API_KEY, CONF_IP_ADDRESS,
CONF_SSL, CONF_VERIFY_SSL,
CONF_MONITORED_CONDITIONS)
_LOGGER = logging.getLogger(__name__)
DEFAULT_NAME = 'PaloAlto'
DEFAULT_SSL = False
DEFAULT_VERIFY_SSL = True
CONST_COMMAND = "COMMAND"
CONST_OPS_ENDPOINT = '/api/?type=op&cmd=COMMAND'
CONST_CONFIG_ENDPOINT = '/api/?type=config&action=get&xpath=COMMAND'
PA_OPS_ACTIVE_USERS = "<show><admins></admins></show>"
PA_CONF_SYS_INFO = "<show><system><info></info></system></show>"
PA_CONF_GP_USERS = "<show><global-protect-portal><current-user>" \
"</current-user></global-protect-portal></show>"
PA_CONF_TEMPERATURE = "<show><system><environmentals><thermal>" \
"</thermal></environmentals></system></show>"
SCAN_INTERVAL = timedelta(seconds=120)
MONITORED_CONDITIONS = {
'host_name': ['Host Name', 'x', 'mdi:fire'],
'up_time': ['Up Time', 'x', 'mdi:clock'],
'serial_no': ['Serial Number', 'x', 'mdi:counter'],
'sw_version': ['Software Version', 'x', 'mdi:counter'],
'gp_version': ['Global protect Version', 'x', 'mdi:counter'],
'logdb_version': ['LogDB Version', 'x', 'mdi:book-open'],
'operation_mode': ['Operation Mode', 'x', 'mdi:book-open'],
'core_temp': ['Core Temperature', 'x', 'mdi:oil-temperature'],
'sys_temp': ['System Temperature', 'x', 'mdi:oil-temperature'],
'gp_user_count': ['Global Protect User Count', 'vpn users', 'mdi:counter'],
'gp_users': ['Global Protect Users', 'vpn users', 'mdi:account-multiple'],
'loggedin_user_count': ['Loggedin User Count', 'users', 'mdi:counter'],
'loggedin_users': ['Loggedin Users', 'users', 'mdi:account-multiple'],
}
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Required(CONF_API_KEY): cv.string,
vol.Required(CONF_IP_ADDRESS): cv.string,
vol.Optional(CONF_SSL, default=DEFAULT_SSL): cv.boolean,
vol.Optional(CONF_MONITORED_CONDITIONS,
default=list(MONITORED_CONDITIONS)):
vol.All(cv.ensure_list, [vol.In(MONITORED_CONDITIONS)]),
vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL): cv.boolean,
})
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up the Palo Alto VPN User Sensor."""
name = config.get(CONF_NAME)
host = config.get(CONF_IP_ADDRESS)
use_ssl = config.get(CONF_SSL)
verify_ssl = config.get(CONF_VERIFY_SSL)
api_key = config.get(CONF_API_KEY)
sensors = []
try:
api = PaloAltoApi(host, use_ssl, verify_ssl, api_key)
for condition in config[CONF_MONITORED_CONDITIONS]:
sensor = PaloAltoSensor(hass, api, name, condition)
sensors.append(sensor)
add_devices(sensors, True)
except Exception as err:
_LOGGER.error("Failed to setup Palo Alto Sensor. Error: " + str(err))
class PaloAltoSensor(Entity):
"""Representation of a sensor."""
def __init__(self, hass, api, name, variable):
"""Initialize the sensor."""
self._hass = hass
self._api = api
self._name = name
self._var_id = variable
variable_info = MONITORED_CONDITIONS[variable]
self._var_name = variable_info[0]
self._var_units = variable_info[1]
self._var_icon = variable_info[2]
@property
def name(self):
"""Return the name of the sensor."""
return "{} {}".format(self._name, self._var_name)
@property
def icon(self):
"""Icon to use in the frontend, if any."""
return self._var_icon
@property
def state(self):
"""Return the state of the device."""
return self._api.data[self._var_id]
@property
def available(self):
"""Could the device be accessed during the last update call."""
return self._api.available
def update(self):
"""Get the latest data and updates the state."""
self._api.update()
class PaloAltoApi(object):
"""The class for handling the data retrieval from Palo Alto Device."""
def __init__(self, host, use_ssl, verify_ssl, api_key):
"""Initialize the Palo Alto API."""
self._host = host
self._use_ssl = use_ssl
self._verify_ssl = verify_ssl
self._api_key = api_key
self._usersdata = None
self._sysinfo = None
self._gp_users = None
self._temperature = None
self.available = True
self._sensors = {}
@property
def data(self):
"""Return data."""
return self._sensors
def get_uri_scheme(self, use_ssl):
"""Return proper uril scheme based on config setting."""
return 'https://' if use_ssl else 'http://'
def get_resource(self, use_ssl, host, api_key, endpoint):
"""Prepare the URL."""
uri_scheme = self.get_uri_scheme(use_ssl)
if endpoint == EndPointType.Operational:
return "{}{}{}&key={}".format(uri_scheme, self._host,
CONST_OPS_ENDPOINT, self._api_key)
else:
return "{}{}{}&key={}".format(uri_scheme, self._host,
CONST_CONFIG_ENDPOINT, self._api_key)
def http_request(self, url):
"""HTTP request to the Palo Alto device."""
content = None
context = None
try:
if self._use_ssl and not self._verify_ssl:
context = ssl._create_unverified_context()
response = urllib.request.urlopen(url, context=context)
content = response.read()
except Exception as ex:
_LOGGER.error(str(ex))
content = None
return content
def update(self):
"""Get Operational and Configuration urls."""
ops_url = self.get_resource(self._use_ssl, self._host,
self._api_key, EndPointType.Operational)
users_url = ops_url.replace(CONST_COMMAND, PA_OPS_ACTIVE_USERS)
self._usersdata = self.http_request(users_url)
sysinfo_url = ops_url.replace(CONST_COMMAND, PA_CONF_SYS_INFO)
self._sysinfo = self.http_request(sysinfo_url)
gp_users_url = ops_url.replace(CONST_COMMAND, PA_CONF_GP_USERS)
self._gp_users = self.http_request(gp_users_url)
temperature_url = ops_url.replace(CONST_COMMAND, PA_CONF_TEMPERATURE)
self._temperature = self.http_request(temperature_url)
"""parse the xml data"""
self.parse_data()
def parse_globalprotect_users(self):
"""Parses global protect users xml."""
user_count = 0
vpn_users = []
root = ET.fromstring(self._gp_users)
nodes = root.findall('result/gp-portal-users/user')
for user in nodes:
user_count += 1
vpn_users.append(user.find('username').text)
if user_count != 0:
self._sensors["gp_users"] = ', '.join(vpn_users)
else:
self._sensors["gp_users"] = "None"
self._sensors["gp_user_count"] = user_count
def parse_temperature(self):
"""Parses environment/temperature values."""
root = ET.fromstring(self._temperature)
nodes = root.findall('result/thermal/Slot1/entry/DegreesC')
self._sensors["core_temp"] = round(float(nodes[0].text), 2)
self._sensors["sys_temp"] = round(float(nodes[1].text), 2)
def parse_system_info(self):
"""Parses System Information."""
root = ET.fromstring(self._sysinfo)
sys_node = root.findall('result/system')
self._sensors["up_time"] = sys_node[0].find('uptime').text
self._sensors["serial_no"] = sys_node[0].find('serial').text
self._sensors["host_name"] = sys_node[0].find('hostname').text
self._sensors["sw_version"] = sys_node[0].find('sw-version').text
self._sensors["logdb_version"] = sys_node[0].find(
'logdb-version').text
self._sensors["operation_mode"] = sys_node[0].find(
'operational-mode').text
self._sensors["gp_version"] = sys_node[0].find(
'global-protect-client-package-version').text
def parse_active_users(self):
"""Parses Active Users XML."""
root = ET.fromstring(self._usersdata)
nodes = root.findall('result/admins/entry')
count = 0
users = []
for item in nodes:
count += 1
users.append(item.find('admin').text)
if count > 0:
self._sensors["loggedin_users"] = ', '.join(users)
else:
self._sensors["loggedin_users"] = "None"
self._sensors["loggedin_user_count"] = count
def parse_data(self):
"""Parses data and populates sensors."""
self.parse_globalprotect_users()
self.parse_temperature()
self.parse_system_info()
self.parse_active_users()
class EndPointType(Enum):
"""Enum that indicates that type of endpoint that is."""
Operational = "operational"
Configuration = "configuration"

View File

@@ -0,0 +1,988 @@
"""Support to send and receive Telegram messages."""
from functools import partial
import importlib
import io
from ipaddress import ip_network
import logging
import requests
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
from telegram import (
Bot,
InlineKeyboardButton,
InlineKeyboardMarkup,
ReplyKeyboardMarkup,
ReplyKeyboardRemove,
)
from telegram.error import TelegramError
from telegram.parsemode import ParseMode
from telegram.utils.request import Request
import voluptuous as vol
from homeassistant.const import (
ATTR_COMMAND,
ATTR_LATITUDE,
ATTR_LONGITUDE,
CONF_API_KEY,
CONF_PLATFORM,
CONF_URL,
HTTP_BEARER_AUTHENTICATION,
HTTP_DIGEST_AUTHENTICATION,
)
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.exceptions import TemplateError
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.typing import ConfigType
_LOGGER = logging.getLogger(__name__)
ATTR_DATA = "data"
ATTR_MESSAGE = "message"
ATTR_TITLE = "title"
ATTR_ARGS = "args"
ATTR_AUTHENTICATION = "authentication"
ATTR_CALLBACK_QUERY = "callback_query"
ATTR_CALLBACK_QUERY_ID = "callback_query_id"
ATTR_CAPTION = "caption"
ATTR_CHAT_ID = "chat_id"
ATTR_CHAT_INSTANCE = "chat_instance"
ATTR_DISABLE_NOTIF = "disable_notification"
ATTR_DISABLE_WEB_PREV = "disable_web_page_preview"
ATTR_EDITED_MSG = "edited_message"
ATTR_FILE = "file"
ATTR_FROM_FIRST = "from_first"
ATTR_FROM_LAST = "from_last"
ATTR_KEYBOARD = "keyboard"
ATTR_KEYBOARD_INLINE = "inline_keyboard"
ATTR_MESSAGEID = "message_id"
ATTR_MSG = "message"
ATTR_MSGID = "id"
ATTR_PARSER = "parse_mode"
ATTR_PASSWORD = "password"
ATTR_REPLY_TO_MSGID = "reply_to_message_id"
ATTR_REPLYMARKUP = "reply_markup"
ATTR_SHOW_ALERT = "show_alert"
ATTR_STICKER_ID = "sticker_id"
ATTR_TARGET = "target"
ATTR_TEXT = "text"
ATTR_URL = "url"
ATTR_USER_ID = "user_id"
ATTR_USERNAME = "username"
ATTR_VERIFY_SSL = "verify_ssl"
ATTR_TIMEOUT = "timeout"
ATTR_MESSAGE_TAG = "message_tag"
ATTR_CHANNEL_POST = "channel_post"
CONF_ALLOWED_CHAT_IDS = "allowed_chat_ids"
CONF_PROXY_URL = "proxy_url"
CONF_PROXY_PARAMS = "proxy_params"
CONF_TRUSTED_NETWORKS = "trusted_networks"
DOMAIN = "telegram_bot"
SERVICE_SEND_MESSAGE = "send_message"
SERVICE_SEND_PHOTO = "send_photo"
SERVICE_SEND_STICKER = "send_sticker"
SERVICE_SEND_ANIMATION = "send_animation"
SERVICE_SEND_VIDEO = "send_video"
SERVICE_SEND_VOICE = "send_voice"
SERVICE_SEND_DOCUMENT = "send_document"
SERVICE_SEND_LOCATION = "send_location"
SERVICE_EDIT_MESSAGE = "edit_message"
SERVICE_EDIT_CAPTION = "edit_caption"
SERVICE_EDIT_REPLYMARKUP = "edit_replymarkup"
SERVICE_ANSWER_CALLBACK_QUERY = "answer_callback_query"
SERVICE_DELETE_MESSAGE = "delete_message"
SERVICE_LEAVE_CHAT = "leave_chat"
EVENT_TELEGRAM_CALLBACK = "telegram_callback"
EVENT_TELEGRAM_COMMAND = "telegram_command"
EVENT_TELEGRAM_TEXT = "telegram_text"
EVENT_TELEGRAM_SENT = "telegram_sent"
PARSER_HTML = "html"
PARSER_MD = "markdown"
DEFAULT_TRUSTED_NETWORKS = [ip_network("149.154.160.0/20"), ip_network("91.108.4.0/22")]
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.All(
cv.ensure_list,
[
vol.Schema(
{
vol.Required(CONF_PLATFORM): vol.In(
("broadcast", "polling", "webhooks")
),
vol.Required(CONF_API_KEY): cv.string,
vol.Required(CONF_ALLOWED_CHAT_IDS): vol.All(
cv.ensure_list, [vol.Coerce(int)]
),
vol.Optional(ATTR_PARSER, default=PARSER_MD): cv.string,
vol.Optional(CONF_PROXY_URL): cv.string,
vol.Optional(CONF_PROXY_PARAMS): dict,
# webhooks
vol.Optional(CONF_URL): cv.url,
vol.Optional(
CONF_TRUSTED_NETWORKS, default=DEFAULT_TRUSTED_NETWORKS
): vol.All(cv.ensure_list, [ip_network]),
}
)
],
)
},
extra=vol.ALLOW_EXTRA,
)
BASE_SERVICE_SCHEMA = vol.Schema(
{
vol.Optional(ATTR_TARGET): vol.All(cv.ensure_list, [vol.Coerce(int)]),
vol.Optional(ATTR_PARSER): cv.string,
vol.Optional(ATTR_DISABLE_NOTIF): cv.boolean,
vol.Optional(ATTR_DISABLE_WEB_PREV): cv.boolean,
vol.Optional(ATTR_KEYBOARD): vol.All(cv.ensure_list, [cv.string]),
vol.Optional(ATTR_KEYBOARD_INLINE): cv.ensure_list,
vol.Optional(ATTR_TIMEOUT): cv.positive_int,
vol.Optional(ATTR_MESSAGE_TAG): cv.string,
},
extra=vol.ALLOW_EXTRA,
)
SERVICE_SCHEMA_SEND_MESSAGE = BASE_SERVICE_SCHEMA.extend(
{vol.Required(ATTR_MESSAGE): cv.template, vol.Optional(ATTR_TITLE): cv.template}
)
SERVICE_SCHEMA_SEND_FILE = BASE_SERVICE_SCHEMA.extend(
{
vol.Optional(ATTR_URL): cv.template,
vol.Optional(ATTR_FILE): cv.template,
vol.Optional(ATTR_CAPTION): cv.template,
vol.Optional(ATTR_USERNAME): cv.string,
vol.Optional(ATTR_PASSWORD): cv.string,
vol.Optional(ATTR_AUTHENTICATION): cv.string,
vol.Optional(ATTR_VERIFY_SSL): cv.boolean,
}
)
SERVICE_SCHEMA_SEND_STICKER = SERVICE_SCHEMA_SEND_FILE.extend(
{vol.Optional(ATTR_STICKER_ID): cv.string}
)
SERVICE_SCHEMA_SEND_LOCATION = BASE_SERVICE_SCHEMA.extend(
{
vol.Required(ATTR_LONGITUDE): cv.template,
vol.Required(ATTR_LATITUDE): cv.template,
}
)
SERVICE_SCHEMA_EDIT_MESSAGE = SERVICE_SCHEMA_SEND_MESSAGE.extend(
{
vol.Required(ATTR_MESSAGEID): vol.Any(
cv.positive_int, vol.All(cv.string, "last")
),
vol.Required(ATTR_CHAT_ID): vol.Coerce(int),
}
)
SERVICE_SCHEMA_EDIT_CAPTION = vol.Schema(
{
vol.Required(ATTR_MESSAGEID): vol.Any(
cv.positive_int, vol.All(cv.string, "last")
),
vol.Required(ATTR_CHAT_ID): vol.Coerce(int),
vol.Required(ATTR_CAPTION): cv.template,
vol.Optional(ATTR_KEYBOARD_INLINE): cv.ensure_list,
},
extra=vol.ALLOW_EXTRA,
)
SERVICE_SCHEMA_EDIT_REPLYMARKUP = vol.Schema(
{
vol.Required(ATTR_MESSAGEID): vol.Any(
cv.positive_int, vol.All(cv.string, "last")
),
vol.Required(ATTR_CHAT_ID): vol.Coerce(int),
vol.Required(ATTR_KEYBOARD_INLINE): cv.ensure_list,
},
extra=vol.ALLOW_EXTRA,
)
SERVICE_SCHEMA_ANSWER_CALLBACK_QUERY = vol.Schema(
{
vol.Required(ATTR_MESSAGE): cv.template,
vol.Required(ATTR_CALLBACK_QUERY_ID): vol.Coerce(int),
vol.Optional(ATTR_SHOW_ALERT): cv.boolean,
},
extra=vol.ALLOW_EXTRA,
)
SERVICE_SCHEMA_DELETE_MESSAGE = vol.Schema(
{
vol.Required(ATTR_CHAT_ID): vol.Coerce(int),
vol.Required(ATTR_MESSAGEID): vol.Any(
cv.positive_int, vol.All(cv.string, "last")
),
},
extra=vol.ALLOW_EXTRA,
)
SERVICE_SCHEMA_LEAVE_CHAT = vol.Schema({vol.Required(ATTR_CHAT_ID): vol.Coerce(int)})
SERVICE_MAP = {
SERVICE_SEND_MESSAGE: SERVICE_SCHEMA_SEND_MESSAGE,
SERVICE_SEND_PHOTO: SERVICE_SCHEMA_SEND_FILE,
SERVICE_SEND_STICKER: SERVICE_SCHEMA_SEND_STICKER,
SERVICE_SEND_ANIMATION: SERVICE_SCHEMA_SEND_FILE,
SERVICE_SEND_VIDEO: SERVICE_SCHEMA_SEND_FILE,
SERVICE_SEND_VOICE: SERVICE_SCHEMA_SEND_FILE,
SERVICE_SEND_DOCUMENT: SERVICE_SCHEMA_SEND_FILE,
SERVICE_SEND_LOCATION: SERVICE_SCHEMA_SEND_LOCATION,
SERVICE_EDIT_MESSAGE: SERVICE_SCHEMA_EDIT_MESSAGE,
SERVICE_EDIT_CAPTION: SERVICE_SCHEMA_EDIT_CAPTION,
SERVICE_EDIT_REPLYMARKUP: SERVICE_SCHEMA_EDIT_REPLYMARKUP,
SERVICE_ANSWER_CALLBACK_QUERY: SERVICE_SCHEMA_ANSWER_CALLBACK_QUERY,
SERVICE_DELETE_MESSAGE: SERVICE_SCHEMA_DELETE_MESSAGE,
SERVICE_LEAVE_CHAT: SERVICE_SCHEMA_LEAVE_CHAT,
}
def load_data(
hass,
url=None,
filepath=None,
username=None,
password=None,
authentication=None,
num_retries=5,
verify_ssl=None,
):
"""Load data into ByteIO/File container from a source."""
try:
if url is not None:
# Load data from URL
params = {"timeout": 15}
if authentication == HTTP_BEARER_AUTHENTICATION and password is not None:
params["headers"] = {"Authorization": f"Bearer {password}"}
elif username is not None and password is not None:
if authentication == HTTP_DIGEST_AUTHENTICATION:
params["auth"] = HTTPDigestAuth(username, password)
else:
params["auth"] = HTTPBasicAuth(username, password)
if verify_ssl is not None:
params["verify"] = verify_ssl
retry_num = 0
while retry_num < num_retries:
req = requests.get(url, **params)
if not req.ok:
_LOGGER.warning(
"Status code %s (retry #%s) loading %s",
req.status_code,
retry_num + 1,
url,
)
else:
data = io.BytesIO(req.content)
if data.read():
data.seek(0)
data.name = url
return data
_LOGGER.warning("Empty data (retry #%s) in %s)", retry_num + 1, url)
retry_num += 1
_LOGGER.warning("Can't load data in %s after %s retries", url, retry_num)
elif filepath is not None:
if hass.config.is_allowed_path(filepath):
return open(filepath, "rb")
_LOGGER.warning("'%s' are not secure to load data from!", filepath)
else:
_LOGGER.warning("Can't load data. No data found in params!")
except (OSError, TypeError) as error:
_LOGGER.error("Can't load data into ByteIO: %s", error)
return None
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Telegram bot component."""
if not config[DOMAIN]:
return False
for p_config in config[DOMAIN]:
p_type = p_config.get(CONF_PLATFORM)
platform = importlib.import_module(f".{p_config[CONF_PLATFORM]}", __name__)
_LOGGER.info("Setting up %s.%s", DOMAIN, p_type)
try:
receiver_service = await platform.async_setup_platform(hass, p_config)
if receiver_service is False:
_LOGGER.error("Failed to initialize Telegram bot %s", p_type)
return False
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Error setting up platform %s", p_type)
return False
bot = initialize_bot(p_config)
notify_service = TelegramNotificationService(
hass, bot, p_config.get(CONF_ALLOWED_CHAT_IDS), p_config.get(ATTR_PARSER)
)
async def async_send_telegram_message(service: ServiceCall) -> None:
"""Handle sending Telegram Bot message service calls."""
def _render_template_attr(data, attribute):
if attribute_templ := data.get(attribute):
if any(
isinstance(attribute_templ, vtype) for vtype in (float, int, str)
):
data[attribute] = attribute_templ
else:
attribute_templ.hass = hass
try:
data[attribute] = attribute_templ.async_render(
parse_result=False
)
except TemplateError as exc:
_LOGGER.error(
"TemplateError in %s: %s -> %s",
attribute,
attribute_templ.template,
exc,
)
data[attribute] = attribute_templ.template
msgtype = service.service
kwargs = dict(service.data)
for attribute in (
ATTR_MESSAGE,
ATTR_TITLE,
ATTR_URL,
ATTR_FILE,
ATTR_CAPTION,
ATTR_LONGITUDE,
ATTR_LATITUDE,
):
_render_template_attr(kwargs, attribute)
_LOGGER.debug("New telegram message %s: %s", msgtype, kwargs)
if msgtype == SERVICE_SEND_MESSAGE:
await hass.async_add_executor_job(
partial(notify_service.send_message, **kwargs)
)
elif msgtype in [
SERVICE_SEND_PHOTO,
SERVICE_SEND_ANIMATION,
SERVICE_SEND_VIDEO,
SERVICE_SEND_VOICE,
SERVICE_SEND_DOCUMENT,
]:
await hass.async_add_executor_job(
partial(notify_service.send_file, msgtype, **kwargs)
)
elif msgtype == SERVICE_SEND_STICKER:
await hass.async_add_executor_job(
partial(notify_service.send_sticker, **kwargs)
)
elif msgtype == SERVICE_SEND_LOCATION:
await hass.async_add_executor_job(
partial(notify_service.send_location, **kwargs)
)
elif msgtype == SERVICE_ANSWER_CALLBACK_QUERY:
await hass.async_add_executor_job(
partial(notify_service.answer_callback_query, **kwargs)
)
elif msgtype == SERVICE_DELETE_MESSAGE:
await hass.async_add_executor_job(
partial(notify_service.delete_message, **kwargs)
)
else:
await hass.async_add_executor_job(
partial(notify_service.edit_message, msgtype, **kwargs)
)
# Register notification services
for service_notif, schema in SERVICE_MAP.items():
hass.services.async_register(
DOMAIN, service_notif, async_send_telegram_message, schema=schema
)
return True
def initialize_bot(p_config):
"""Initialize telegram bot with proxy support."""
api_key = p_config.get(CONF_API_KEY)
proxy_url = p_config.get(CONF_PROXY_URL)
proxy_params = p_config.get(CONF_PROXY_PARAMS)
if proxy_url is not None:
request = Request(
con_pool_size=8, proxy_url=proxy_url, urllib3_proxy_kwargs=proxy_params
)
else:
request = Request(con_pool_size=8)
return Bot(token=api_key, request=request)
class TelegramNotificationService:
"""Implement the notification services for the Telegram Bot domain."""
def __init__(self, hass, bot, allowed_chat_ids, parser):
"""Initialize the service."""
self.allowed_chat_ids = allowed_chat_ids
self._default_user = self.allowed_chat_ids[0]
self._last_message_id = {user: None for user in self.allowed_chat_ids}
self._parsers = {PARSER_HTML: ParseMode.HTML, PARSER_MD: ParseMode.MARKDOWN}
self._parse_mode = self._parsers.get(parser)
self.bot = bot
self.hass = hass
def _get_msg_ids(self, msg_data, chat_id):
"""Get the message id to edit.
This can be one of (message_id, inline_message_id) from a msg dict,
returning a tuple.
**You can use 'last' as message_id** to edit
the message last sent in the chat_id.
"""
message_id = inline_message_id = None
if ATTR_MESSAGEID in msg_data:
message_id = msg_data[ATTR_MESSAGEID]
if (
isinstance(message_id, str)
and (message_id == "last")
and (self._last_message_id[chat_id] is not None)
):
message_id = self._last_message_id[chat_id]
else:
inline_message_id = msg_data["inline_message_id"]
return message_id, inline_message_id
def _get_target_chat_ids(self, target):
"""Validate chat_id targets or return default target (first).
:param target: optional list of integers ([12234, -12345])
:return list of chat_id targets (integers)
"""
if target is not None:
if isinstance(target, int):
target = [target]
chat_ids = [t for t in target if t in self.allowed_chat_ids]
if chat_ids:
return chat_ids
_LOGGER.warning(
"Disallowed targets: %s, using default: %s", target, self._default_user
)
return [self._default_user]
def _get_msg_kwargs(self, data):
"""Get parameters in message data kwargs."""
def _make_row_inline_keyboard(row_keyboard):
"""Make a list of InlineKeyboardButtons.
It can accept:
- a list of tuples like:
`[(text_b1, data_callback_b1),
(text_b2, data_callback_b2), ...]
- a string like: `/cmd1, /cmd2, /cmd3`
- or a string like: `text_b1:/cmd1, text_b2:/cmd2`
"""
buttons = []
if isinstance(row_keyboard, str):
for key in row_keyboard.split(","):
if ":/" in key:
# commands like: 'Label:/cmd' become ('Label', '/cmd')
label = key.split(":/")[0]
command = key[len(label) + 1 :]
buttons.append(
InlineKeyboardButton(label, callback_data=command)
)
else:
# commands like: '/cmd' become ('CMD', '/cmd')
label = key.strip()[1:].upper()
buttons.append(InlineKeyboardButton(label, callback_data=key))
elif isinstance(row_keyboard, list):
for entry in row_keyboard:
text_btn, data_btn = entry
buttons.append(
InlineKeyboardButton(text_btn, callback_data=data_btn)
)
else:
raise ValueError(str(row_keyboard))
return buttons
# Defaults
params = {
ATTR_PARSER: self._parse_mode,
ATTR_DISABLE_NOTIF: False,
ATTR_DISABLE_WEB_PREV: None,
ATTR_REPLY_TO_MSGID: None,
ATTR_REPLYMARKUP: None,
ATTR_TIMEOUT: None,
ATTR_MESSAGE_TAG: None,
}
if data is not None:
if ATTR_PARSER in data:
params[ATTR_PARSER] = self._parsers.get(
data[ATTR_PARSER], self._parse_mode
)
if ATTR_TIMEOUT in data:
params[ATTR_TIMEOUT] = data[ATTR_TIMEOUT]
if ATTR_DISABLE_NOTIF in data:
params[ATTR_DISABLE_NOTIF] = data[ATTR_DISABLE_NOTIF]
if ATTR_DISABLE_WEB_PREV in data:
params[ATTR_DISABLE_WEB_PREV] = data[ATTR_DISABLE_WEB_PREV]
if ATTR_REPLY_TO_MSGID in data:
params[ATTR_REPLY_TO_MSGID] = data[ATTR_REPLY_TO_MSGID]
if ATTR_MESSAGE_TAG in data:
params[ATTR_MESSAGE_TAG] = data[ATTR_MESSAGE_TAG]
# Keyboards:
if ATTR_KEYBOARD in data:
keys = data.get(ATTR_KEYBOARD)
keys = keys if isinstance(keys, list) else [keys]
if keys:
params[ATTR_REPLYMARKUP] = ReplyKeyboardMarkup(
[[key.strip() for key in row.split(",")] for row in keys]
)
else:
params[ATTR_REPLYMARKUP] = ReplyKeyboardRemove(True)
elif ATTR_KEYBOARD_INLINE in data:
keys = data.get(ATTR_KEYBOARD_INLINE)
keys = keys if isinstance(keys, list) else [keys]
params[ATTR_REPLYMARKUP] = InlineKeyboardMarkup(
[_make_row_inline_keyboard(row) for row in keys]
)
return params
def _send_msg(self, func_send, msg_error, message_tag, *args_msg, **kwargs_msg):
"""Send one message."""
try:
out = func_send(*args_msg, **kwargs_msg)
if not isinstance(out, bool) and hasattr(out, ATTR_MESSAGEID):
chat_id = out.chat_id
message_id = out[ATTR_MESSAGEID]
self._last_message_id[chat_id] = message_id
_LOGGER.debug(
"Last message ID: %s (from chat_id %s)",
self._last_message_id,
chat_id,
)
event_data = {
ATTR_CHAT_ID: chat_id,
ATTR_MESSAGEID: message_id,
}
if message_tag is not None:
event_data[ATTR_MESSAGE_TAG] = message_tag
self.hass.bus.fire(EVENT_TELEGRAM_SENT, event_data)
elif not isinstance(out, bool):
_LOGGER.warning(
"Update last message: out_type:%s, out=%s", type(out), out
)
return out
except TelegramError as exc:
_LOGGER.error(
"%s: %s. Args: %s, kwargs: %s", msg_error, exc, args_msg, kwargs_msg
)
def send_message(self, message="", target=None, **kwargs):
"""Send a message to one or multiple pre-allowed chat IDs."""
title = kwargs.get(ATTR_TITLE)
text = f"{title}\n{message}" if title else message
params = self._get_msg_kwargs(kwargs)
for chat_id in self._get_target_chat_ids(target):
_LOGGER.debug("Send message in chat ID %s with params: %s", chat_id, params)
self._send_msg(
self.bot.send_message,
"Error sending message",
params[ATTR_MESSAGE_TAG],
chat_id,
text,
parse_mode=params[ATTR_PARSER],
disable_web_page_preview=params[ATTR_DISABLE_WEB_PREV],
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_to_message_id=params[ATTR_REPLY_TO_MSGID],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
)
def delete_message(self, chat_id=None, **kwargs):
"""Delete a previously sent message."""
chat_id = self._get_target_chat_ids(chat_id)[0]
message_id, _ = self._get_msg_ids(kwargs, chat_id)
_LOGGER.debug("Delete message %s in chat ID %s", message_id, chat_id)
deleted = self._send_msg(
self.bot.delete_message, "Error deleting message", None, chat_id, message_id
)
# reduce message_id anyway:
if self._last_message_id[chat_id] is not None:
# change last msg_id for deque(n_msgs)?
self._last_message_id[chat_id] -= 1
return deleted
def edit_message(self, type_edit, chat_id=None, **kwargs):
"""Edit a previously sent message."""
chat_id = self._get_target_chat_ids(chat_id)[0]
message_id, inline_message_id = self._get_msg_ids(kwargs, chat_id)
params = self._get_msg_kwargs(kwargs)
_LOGGER.debug(
"Edit message %s in chat ID %s with params: %s",
message_id or inline_message_id,
chat_id,
params,
)
if type_edit == SERVICE_EDIT_MESSAGE:
message = kwargs.get(ATTR_MESSAGE)
title = kwargs.get(ATTR_TITLE)
text = f"{title}\n{message}" if title else message
_LOGGER.debug("Editing message with ID %s", message_id or inline_message_id)
return self._send_msg(
self.bot.edit_message_text,
"Error editing text message",
params[ATTR_MESSAGE_TAG],
text,
chat_id=chat_id,
message_id=message_id,
inline_message_id=inline_message_id,
parse_mode=params[ATTR_PARSER],
disable_web_page_preview=params[ATTR_DISABLE_WEB_PREV],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
)
if type_edit == SERVICE_EDIT_CAPTION:
return self._send_msg(
self.bot.edit_message_caption,
"Error editing message attributes",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
message_id=message_id,
inline_message_id=inline_message_id,
caption=kwargs.get(ATTR_CAPTION),
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
parse_mode=params[ATTR_PARSER],
)
return self._send_msg(
self.bot.edit_message_reply_markup,
"Error editing message attributes",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
message_id=message_id,
inline_message_id=inline_message_id,
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
)
def answer_callback_query(
self, message, callback_query_id, show_alert=False, **kwargs
):
"""Answer a callback originated with a press in an inline keyboard."""
params = self._get_msg_kwargs(kwargs)
_LOGGER.debug(
"Answer callback query with callback ID %s: %s, alert: %s",
callback_query_id,
message,
show_alert,
)
self._send_msg(
self.bot.answer_callback_query,
"Error sending answer callback query",
params[ATTR_MESSAGE_TAG],
callback_query_id,
text=message,
show_alert=show_alert,
timeout=params[ATTR_TIMEOUT],
)
def send_file(self, file_type=SERVICE_SEND_PHOTO, target=None, **kwargs):
"""Send a photo, sticker, video, or document."""
params = self._get_msg_kwargs(kwargs)
file_content = load_data(
self.hass,
url=kwargs.get(ATTR_URL),
filepath=kwargs.get(ATTR_FILE),
username=kwargs.get(ATTR_USERNAME),
password=kwargs.get(ATTR_PASSWORD),
authentication=kwargs.get(ATTR_AUTHENTICATION),
verify_ssl=kwargs.get(ATTR_VERIFY_SSL),
)
if file_content:
for chat_id in self._get_target_chat_ids(target):
_LOGGER.debug("Sending file to chat ID %s", chat_id)
if file_type == SERVICE_SEND_PHOTO:
self._send_msg(
self.bot.send_photo,
"Error sending photo",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
photo=file_content,
caption=kwargs.get(ATTR_CAPTION),
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
parse_mode=params[ATTR_PARSER],
)
elif file_type == SERVICE_SEND_STICKER:
self._send_msg(
self.bot.send_sticker,
"Error sending sticker",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
sticker=file_content,
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
)
elif file_type == SERVICE_SEND_VIDEO:
self._send_msg(
self.bot.send_video,
"Error sending video",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
video=file_content,
caption=kwargs.get(ATTR_CAPTION),
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
parse_mode=params[ATTR_PARSER],
)
elif file_type == SERVICE_SEND_DOCUMENT:
self._send_msg(
self.bot.send_document,
"Error sending document",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
document=file_content,
caption=kwargs.get(ATTR_CAPTION),
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
parse_mode=params[ATTR_PARSER],
)
elif file_type == SERVICE_SEND_VOICE:
self._send_msg(
self.bot.send_voice,
"Error sending voice",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
voice=file_content,
caption=kwargs.get(ATTR_CAPTION),
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
)
elif file_type == SERVICE_SEND_ANIMATION:
self._send_msg(
self.bot.send_animation,
"Error sending animation",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
animation=file_content,
caption=kwargs.get(ATTR_CAPTION),
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
parse_mode=params[ATTR_PARSER],
)
file_content.seek(0)
else:
_LOGGER.error("Can't send file with kwargs: %s", kwargs)
def send_sticker(self, target=None, **kwargs):
"""Send a sticker from a telegram sticker pack."""
params = self._get_msg_kwargs(kwargs)
stickerid = kwargs.get(ATTR_STICKER_ID)
if stickerid:
for chat_id in self._get_target_chat_ids(target):
self._send_msg(
self.bot.send_sticker,
"Error sending sticker",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
sticker=stickerid,
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_markup=params[ATTR_REPLYMARKUP],
timeout=params[ATTR_TIMEOUT],
)
else:
self.send_file(SERVICE_SEND_STICKER, target, **kwargs)
def send_location(self, latitude, longitude, target=None, **kwargs):
"""Send a location."""
latitude = float(latitude)
longitude = float(longitude)
params = self._get_msg_kwargs(kwargs)
for chat_id in self._get_target_chat_ids(target):
_LOGGER.debug(
"Send location %s/%s to chat ID %s", latitude, longitude, chat_id
)
self._send_msg(
self.bot.send_location,
"Error sending location",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
latitude=latitude,
longitude=longitude,
disable_notification=params[ATTR_DISABLE_NOTIF],
timeout=params[ATTR_TIMEOUT],
)
def leave_chat(self, chat_id=None):
"""Remove bot from chat."""
chat_id = self._get_target_chat_ids(chat_id)[0]
_LOGGER.debug("Leave from chat ID %s", chat_id)
leaved = self._send_msg(
self.bot.leave_chat, "Error leaving chat", None, chat_id
)
return leaved
class BaseTelegramBotEntity:
"""The base class for the telegram bot."""
def __init__(self, hass, allowed_chat_ids):
"""Initialize the bot base class."""
self.allowed_chat_ids = allowed_chat_ids
self.hass = hass
def _get_message_data(self, msg_data):
"""Return boolean msg_data_is_ok and dict msg_data."""
if not msg_data:
return False, None
bad_fields = (
"text" not in msg_data and "data" not in msg_data and "chat" not in msg_data
)
if bad_fields or "from" not in msg_data:
# Message is not correct.
_LOGGER.error("Incoming message does not have required data (%s)", msg_data)
return False, None
if (
msg_data["from"].get("id") not in self.allowed_chat_ids
and msg_data["chat"].get("id") not in self.allowed_chat_ids
# and msg_data["message"]["chat"].get("id") not in self.allowed_chat_ids
):
# Neither from id nor chat id was in allowed_chat_ids,
# origin is not allowed.
_LOGGER.error("Incoming message is not allowed (%s)", msg_data)
return True, None
data = {
ATTR_USER_ID: msg_data["from"]["id"],
ATTR_FROM_FIRST: msg_data["from"]["first_name"],
}
if "message_id" in msg_data:
data[ATTR_MSGID] = msg_data["message_id"]
if "last_name" in msg_data["from"]:
data[ATTR_FROM_LAST] = msg_data["from"]["last_name"]
if "chat" in msg_data:
data[ATTR_CHAT_ID] = msg_data["chat"]["id"]
elif ATTR_MESSAGE in msg_data and "chat" in msg_data[ATTR_MESSAGE]:
data[ATTR_CHAT_ID] = msg_data[ATTR_MESSAGE]["chat"]["id"]
return True, data
def _get_channel_post_data(self, msg_data):
"""Return boolean msg_data_is_ok and dict msg_data."""
if not msg_data:
return False, None
if "sender_chat" in msg_data and "chat" in msg_data and "text" in msg_data:
if (
msg_data["sender_chat"].get("id") not in self.allowed_chat_ids
and msg_data["chat"].get("id") not in self.allowed_chat_ids
):
# Neither sender_chat id nor chat id was in allowed_chat_ids,
# origin is not allowed.
_LOGGER.error("Incoming message is not allowed (%s)", msg_data)
return True, None
data = {
ATTR_MSGID: msg_data["message_id"],
ATTR_CHAT_ID: msg_data["chat"]["id"],
ATTR_TEXT: msg_data["text"],
}
return True, data
_LOGGER.error("Incoming message does not have required data (%s)", msg_data)
return False, None
def process_message(self, data):
"""Check for basic message rules and fire an event if message is ok."""
if ATTR_MSG in data or ATTR_EDITED_MSG in data:
event = EVENT_TELEGRAM_COMMAND
if ATTR_MSG in data:
data = data.get(ATTR_MSG)
else:
data = data.get(ATTR_EDITED_MSG)
message_ok, event_data = self._get_message_data(data)
if event_data is None:
return message_ok
if ATTR_MSGID in data:
event_data[ATTR_MSGID] = data[ATTR_MSGID]
if "text" in data:
if data["text"][0] == "/":
pieces = data["text"].split(" ")
event_data[ATTR_COMMAND] = pieces[0]
event_data[ATTR_ARGS] = pieces[1:]
else:
event_data[ATTR_TEXT] = data["text"]
event = EVENT_TELEGRAM_TEXT
else:
_LOGGER.warning("Message without text data received: %s", data)
event_data[ATTR_TEXT] = str(data)
event = EVENT_TELEGRAM_TEXT
self.hass.bus.async_fire(event, event_data)
return True
if ATTR_CALLBACK_QUERY in data:
event = EVENT_TELEGRAM_CALLBACK
data = data.get(ATTR_CALLBACK_QUERY)
message_ok, event_data = self._get_message_data(data)
if event_data is None:
return message_ok
query_data = event_data[ATTR_DATA] = data[ATTR_DATA]
if query_data[0] == "/":
pieces = query_data.split(" ")
event_data[ATTR_COMMAND] = pieces[0]
event_data[ATTR_ARGS] = pieces[1:]
event_data[ATTR_MSG] = data[ATTR_MSG]
event_data[ATTR_CHAT_INSTANCE] = data[ATTR_CHAT_INSTANCE]
event_data[ATTR_MSGID] = data[ATTR_MSGID]
self.hass.bus.async_fire(event, event_data)
return True
if ATTR_CHANNEL_POST in data:
event = EVENT_TELEGRAM_TEXT
data = data.get(ATTR_CHANNEL_POST)
message_ok, event_data = self._get_channel_post_data(data)
if event_data is None:
return message_ok
self.hass.bus.async_fire(event, event_data)
return True
_LOGGER.warning("Message with unknown data received: %s", data)
return True

View File

@@ -0,0 +1,17 @@
"""Support for Telegram bot to send messages only."""
import logging
from . import initialize_bot
_LOGGER = logging.getLogger(__name__)
async def async_setup_platform(hass, config):
"""Set up the Telegram broadcast platform."""
bot = initialize_bot(config)
bot_config = await hass.async_add_executor_job(bot.getMe)
_LOGGER.debug(
"Telegram broadcast platform setup with bot %s", bot_config["username"]
)
return True

View File

@@ -0,0 +1,11 @@
{
"domain": "telegram_bot",
"name": "Telegram bot",
"version": "1.0",
"documentation": "https://www.home-assistant.io/integrations/telegram_bot",
"requirements": ["python-telegram-bot==13.1", "PySocks==1.7.1"],
"dependencies": ["http"],
"codeowners": [],
"iot_class": "cloud_push",
"loggers": ["telegram"]
}

View File

@@ -0,0 +1,99 @@
"""Support for Telegram bot using polling."""
import logging
from telegram import Update
from telegram.error import NetworkError, RetryAfter, TelegramError, TimedOut
from telegram.ext import CallbackContext, Dispatcher, Handler, Updater
from telegram.utils.types import HandlerArg
from homeassistant.const import EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP
from . import CONF_ALLOWED_CHAT_IDS, BaseTelegramBotEntity, initialize_bot
_LOGGER = logging.getLogger(__name__)
async def async_setup_platform(hass, config):
"""Set up the Telegram polling platform."""
bot = initialize_bot(config)
pol = TelegramPoll(bot, hass, config[CONF_ALLOWED_CHAT_IDS])
def _start_bot(_event):
"""Start the bot."""
pol.start_polling()
def _stop_bot(_event):
"""Stop the bot."""
pol.stop_polling()
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, _start_bot)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _stop_bot)
return True
def process_error(update: Update, context: CallbackContext):
"""Telegram bot error handler."""
try:
raise context.error
except (TimedOut, NetworkError, RetryAfter):
# Long polling timeout or connection problem. Nothing serious.
pass
except TelegramError:
_LOGGER.error('Update "%s" caused error: "%s"', update, context.error)
def message_handler(handler):
"""Create messages handler."""
class MessageHandler(Handler):
"""Telegram bot message handler."""
def __init__(self):
"""Initialize the messages handler instance."""
super().__init__(handler)
def check_update(self, update):
"""Check is update valid."""
return isinstance(update, Update)
def handle_update(
self,
update: HandlerArg,
dispatcher: Dispatcher,
check_result: object,
context: CallbackContext = None,
):
"""Handle update."""
optional_args = self.collect_optional_args(dispatcher, update)
context.args = optional_args
return self.callback(update, context)
return MessageHandler()
class TelegramPoll(BaseTelegramBotEntity):
"""Asyncio telegram incoming message handler."""
def __init__(self, bot, hass, allowed_chat_ids):
"""Initialize the polling instance."""
BaseTelegramBotEntity.__init__(self, hass, allowed_chat_ids)
self.updater = Updater(bot=bot, workers=4)
self.dispatcher = self.updater.dispatcher
self.dispatcher.add_handler(message_handler(self.process_update))
self.dispatcher.add_error_handler(process_error)
def start_polling(self):
"""Start the polling task."""
self.updater.start_polling()
def stop_polling(self):
"""Stop the polling task."""
self.updater.stop()
def process_update(self, update: HandlerArg, context: CallbackContext):
"""Process incoming message."""
self.process_message(update.to_dict())

View File

@@ -0,0 +1,840 @@
# Describes the format for available Telegram bot services
send_message:
name: Send message
description: Send a notification.
fields:
message:
name: Message
description: Message body of the notification.
required: true
example: The garage door has been open for 10 minutes.
selector:
text:
title:
name: Title
description: Optional title for your notification. Will be composed as '%title\n%message'
example: "Your Garage Door Friend"
selector:
text:
target:
name: Target
description: An array of pre-authorized chat_ids to send the notification to. If not present, first allowed chat_id is the default.
example: "[12345, 67890] or 12345"
selector:
object:
parse_mode:
name: Parse mode
description: "Parser for the message text."
selector:
select:
options:
- "html"
- "markdown"
- "markdown2"
disable_notification:
name: Disable notification
description: Sends the message silently. iOS users and Web users will not receive a notification, Android users will receive a notification with no sound.
selector:
boolean:
disable_web_page_preview:
name: Disable web page preview
description: Disables link previews for links in the message.
selector:
boolean:
timeout:
name: Timeout
description: Timeout for send message. Will help with timeout errors (poor internet connection, etc)s
selector:
number:
min: 1
max: 3600
unit_of_measurement: seconds
keyboard:
name: Keyboard
description: List of rows of commands, comma-separated, to make a custom keyboard. Empty list clears a previously set keyboard.
example: '["/command1, /command2", "/command3"]'
selector:
object:
inline_keyboard:
name: Inline keyboard
description: List of rows of commands, comma-separated, to make a custom inline keyboard with buttons with associated callback data.
example: '["/button1, /button2", "/button3"] or ["Text button1:/button1, Text button2:/button2", "Text button3:/button3"] or [[["Text button1", "/button1"], ["Text button2", "/button2"]], [["Text button3", "/button3"]]]'
selector:
object:
message_tag:
name: Message tag
description: "Tag for sent message. In telegram_sent event data: {{trigger.event.data.message_tag}}"
example: "msg_to_edit"
selector:
text:
send_photo:
name: Send photo
description: Send a photo.
fields:
url:
name: URL
description: Remote path to an image.
example: "http://example.org/path/to/the/image.png"
selector:
text:
file:
name: File
description: Local path to an image.
example: "/path/to/the/image.png"
selector:
text:
caption:
name: Caption
description: The title of the image.
example: "My image"
selector:
text:
username:
name: Username
description: Username for a URL which require HTTP authentication.
example: myuser
selector:
text:
password:
name: Password
description: Password (or bearer token) for a URL which require HTTP authentication.
example: myuser_pwd
selector:
text:
authentication:
name: Authentication method
description: Define which authentication method to use. Set to `digest` to use HTTP digest authentication, or `bearer_token` for OAuth 2.0 bearer token authentication. Defaults to `basic`.
default: digest
selector:
select:
options:
- "digest"
- "bearer_token"
target:
name: Target
description: An array of pre-authorized chat_ids to send the document to. If not present, first allowed chat_id is the default.
example: "[12345, 67890] or 12345"
selector:
object:
parse_mode:
name: Parse mode
description: "Parser for the message text."
selector:
select:
options:
- "html"
- "markdown"
- "markdown2"
disable_notification:
name: Disable notification
description: Sends the message silently. iOS users and Web users will not receive a notification, Android users will receive a notification with no sound.
selector:
boolean:
verify_ssl:
name: Verify SSL
description: Enable or disable SSL certificate verification. Set to false if you're downloading the file from a URL and you don't want to validate the SSL certificate of the server.
selector:
boolean:
timeout:
name: Timeout
description: Timeout for send photo. Will help with timeout errors (poor internet connection, etc)
selector:
number:
min: 1
max: 3600
unit_of_measurement: seconds
keyboard:
name: Keyboard
description: List of rows of commands, comma-separated, to make a custom keyboard.
example: '["/command1, /command2", "/command3"]'
selector:
object:
inline_keyboard:
name: Inline keyboard
description: List of rows of commands, comma-separated, to make a custom inline keyboard with buttons with associated callback data.
example: '["/button1, /button2", "/button3"] or [[["Text button1", "/button1"], ["Text button2", "/button2"]], [["Text button3", "/button3"]]]'
selector:
object:
message_tag:
name: Message tag
description: "Tag for sent message. In telegram_sent event data: {{trigger.event.data.message_tag}}"
example: "msg_to_edit"
selector:
text:
send_sticker:
name: Send sticker
description: Send a sticker.
fields:
url:
name: URL
description: Remote path to a static .webp or animated .tgs sticker.
example: "http://example.org/path/to/the/sticker.webp"
selector:
text:
file:
name: File
description: Local path to a static .webp or animated .tgs sticker.
example: "/path/to/the/sticker.webp"
selector:
text:
sticker_id:
name: Sticker ID
description: ID of a sticker that exists on telegram servers
example: CAACAgIAAxkBAAEDDldhZD-hqWclr6krLq-FWSfCrGNmOQAC9gAD9HsZAAFeYY-ltPYnrCEE
selector:
text:
username:
name: Username
description: Username for a URL which require HTTP authentication.
example: myuser
selector:
text:
password:
name: Password
description: Password (or bearer token) for a URL which require HTTP authentication.
example: myuser_pwd
selector:
text:
authentication:
name: Authentication method
description: Define which authentication method to use. Set to `digest` to use HTTP digest authentication, or `bearer_token` for OAuth 2.0 bearer token authentication. Defaults to `basic`.
default: digest
selector:
select:
options:
- "digest"
- "bearer_token"
target:
name: Target
description: An array of pre-authorized chat_ids to send the document to. If not present, first allowed chat_id is the default.
example: "[12345, 67890] or 12345"
selector:
object:
disable_notification:
name: Disable notification
description: Sends the message silently. iOS users and Web users will not receive a notification, Android users will receive a notification with no sound.
selector:
boolean:
verify_ssl:
name: Verify SSL
description: Enable or disable SSL certificate verification. Set to false if you're downloading the file from a URL and you don't want to validate the SSL certificate of the server.
selector:
boolean:
timeout:
name: Timeout
description: Timeout for send sticker. Will help with timeout errors (poor internet connection, etc)
selector:
number:
min: 1
max: 3600
unit_of_measurement: seconds
keyboard:
name: Keyboard
description: List of rows of commands, comma-separated, to make a custom keyboard.
example: '["/command1, /command2", "/command3"]'
selector:
object:
inline_keyboard:
name: Inline keyboard
description: List of rows of commands, comma-separated, to make a custom inline keyboard with buttons with associated callback data.
example: '["/button1, /button2", "/button3"] or [[["Text button1", "/button1"], ["Text button2", "/button2"]], [["Text button3", "/button3"]]]'
selector:
object:
message_tag:
name: Message tag
description: "Tag for sent message. In telegram_sent event data: {{trigger.event.data.message_tag}}"
example: "msg_to_edit"
selector:
text:
send_animation:
name: Send animation
description: Send an anmiation.
fields:
url:
name: URL
description: Remote path to a GIF or H.264/MPEG-4 AVC video without sound.
example: "http://example.org/path/to/the/animation.gif"
selector:
text:
file:
name: File
description: Local path to a GIF or H.264/MPEG-4 AVC video without sound.
example: "/path/to/the/animation.gif"
selector:
text:
caption:
name: Caption
description: The title of the animation.
example: "My animation"
selector:
text:
username:
name: Username
description: Username for a URL which require HTTP authentication.
example: myuser
selector:
text:
password:
name: Password
description: Password (or bearer token) for a URL which require HTTP authentication.
example: myuser_pwd
selector:
text:
authentication:
name: Authentication method
description: Define which authentication method to use. Set to `digest` to use HTTP digest authentication, or `bearer_token` for OAuth 2.0 bearer token authentication. Defaults to `basic`.
default: digest
selector:
select:
options:
- "digest"
- "bearer_token"
target:
name: Target
description: An array of pre-authorized chat_ids to send the document to. If not present, first allowed chat_id is the default.
example: "[12345, 67890] or 12345"
selector:
object:
parse_mode:
name: Parse Mode
description: "Parser for the message text."
selector:
select:
options:
- "html"
- "markdown"
- "markdown2"
disable_notification:
name: Disable notification
description: Sends the message silently. iOS users and Web users will not receive a notification, Android users will receive a notification with no sound.
selector:
boolean:
verify_ssl:
name: Verify SSL
description: Enable or disable SSL certificate verification. Set to false if you're downloading the file from a URL and you don't want to validate the SSL certificate of the server.
selector:
boolean:
timeout:
name: Timeout
description: Timeout for send sticker. Will help with timeout errors (poor internet connection, etc)
selector:
number:
min: 1
max: 3600
unit_of_measurement: seconds
keyboard:
name: Keyboard
description: List of rows of commands, comma-separated, to make a custom keyboard.
example: '["/command1, /command2", "/command3"]'
selector:
object:
inline_keyboard:
name: Inline keyboard
description: List of rows of commands, comma-separated, to make a custom inline keyboard with buttons with associated callback data.
example: '["/button1, /button2", "/button3"] or [[["Text button1", "/button1"], ["Text button2", "/button2"]], [["Text button3", "/button3"]]]'
selector:
object:
send_video:
name: Send video
description: Send a video.
fields:
url:
name: URL
description: Remote path to a video.
example: "http://example.org/path/to/the/video.mp4"
selector:
text:
file:
name: File
description: Local path to a video.
example: "/path/to/the/video.mp4"
selector:
text:
caption:
name: Caption
description: The title of the video.
example: "My video"
selector:
text:
username:
name: Username
description: Username for a URL which require HTTP authentication.
example: myuser
selector:
text:
password:
name: Password
description: Password (or bearer token) for a URL which require HTTP authentication.
example: myuser_pwd
selector:
text:
authentication:
name: Authentication method
description: Define which authentication method to use. Set to `digest` to use HTTP digest authentication, or `bearer_token` for OAuth 2.0 bearer token authentication. Defaults to `basic`.
default: digest
selector:
select:
options:
- "digest"
- "bearer_token"
target:
name: Target
description: An array of pre-authorized chat_ids to send the document to. If not present, first allowed chat_id is the default.
example: "[12345, 67890] or 12345"
selector:
object:
parse_mode:
name: Parse mode
description: "Parser for the message text."
selector:
select:
options:
- "html"
- "markdown"
- "markdown2"
disable_notification:
name: Disable notification
description: Sends the message silently. iOS users and Web users will not receive a notification, Android users will receive a notification with no sound.
selector:
boolean:
verify_ssl:
name: Verify SSL
description: Enable or disable SSL certificate verification. Set to false if you're downloading the file from a URL and you don't want to validate the SSL certificate of the server.
selector:
boolean:
timeout:
name: Timeout
description: Timeout for send video. Will help with timeout errors (poor internet connection, etc)
selector:
number:
min: 1
max: 3600
unit_of_measurement: seconds
keyboard:
name: Keyboard
description: List of rows of commands, comma-separated, to make a custom keyboard.
example: '["/command1, /command2", "/command3"]'
selector:
object:
inline_keyboard:
name: Inline keyboard
description: List of rows of commands, comma-separated, to make a custom inline keyboard with buttons with associated callback data.
example: '["/button1, /button2", "/button3"] or [[["Text button1", "/button1"], ["Text button2", "/button2"]], [["Text button3", "/button3"]]]'
selector:
object:
message_tag:
name: Message tag
description: "Tag for sent message. In telegram_sent event data: {{trigger.event.data.message_tag}}"
example: "msg_to_edit"
selector:
text:
send_voice:
name: Send voice
description: Send a voice message.
fields:
url:
name: URL
description: Remote path to a voice message.
example: "http://example.org/path/to/the/voice.opus"
selector:
text:
file:
name: File
description: Local path to a voice message.
example: "/path/to/the/voice.opus"
selector:
text:
caption:
name: Caption
description: The title of the voice message.
example: "My microphone recording"
selector:
text:
username:
name: Username
description: Username for a URL which require HTTP authentication.
example: myuser
selector:
text:
password:
name: Password
description: Password (or bearer token) for a URL which require HTTP authentication.
example: myuser_pwd
selector:
text:
authentication:
name: Authentication method
description: Define which authentication method to use. Set to `digest` to use HTTP digest authentication, or `bearer_token` for OAuth 2.0 bearer token authentication. Defaults to `basic`.
default: digest
selector:
select:
options:
- "digest"
- "bearer_token"
target:
name: Target
description: An array of pre-authorized chat_ids to send the document to. If not present, first allowed chat_id is the default.
example: "[12345, 67890] or 12345"
selector:
object:
disable_notification:
name: Disable notification
description: Sends the message silently. iOS users and Web users will not receive a notification, Android users will receive a notification with no sound.
selector:
boolean:
verify_ssl:
name: Verify SSL
description: Enable or disable SSL certificate verification. Set to false if you're downloading the file from a URL and you don't want to validate the SSL certificate of the server.
selector:
boolean:
timeout:
name: Timeout
description: Timeout for send voice. Will help with timeout errors (poor internet connection, etc)
selector:
number:
min: 1
max: 3600
unit_of_measurement: seconds
keyboard:
name: Keyboard
description: List of rows of commands, comma-separated, to make a custom keyboard.
example: '["/command1, /command2", "/command3"]'
selector:
object:
inline_keyboard:
name: Inline keyboard
description: List of rows of commands, comma-separated, to make a custom inline keyboard with buttons with associated callback data.
example: '["/button1, /button2", "/button3"] or [[["Text button1", "/button1"], ["Text button2", "/button2"]], [["Text button3", "/button3"]]]'
selector:
object:
message_tag:
name: Message tag
description: "Tag for sent message. In telegram_sent event data: {{trigger.event.data.message_tag}}"
example: "msg_to_edit"
selector:
text:
send_document:
name: Send document
description: Send a document.
fields:
url:
name: URL
description: Remote path to a document.
example: "http://example.org/path/to/the/document.odf"
selector:
text:
file:
name: File
description: Local path to a document.
example: "/tmp/whatever.odf"
selector:
text:
caption:
name: Caption
description: The title of the document.
example: Document Title xy
selector:
text:
username:
name: Username
description: Username for a URL which require HTTP authentication.
example: myuser
selector:
text:
password:
name: Password
description: Password (or bearer token) for a URL which require HTTP authentication.
example: myuser_pwd
selector:
text:
authentication:
name: Authentication method
description: Define which authentication method to use. Set to `digest` to use HTTP digest authentication, or `bearer_token` for OAuth 2.0 bearer token authentication. Defaults to `basic`.
default: digest
selector:
select:
options:
- "digest"
- "bearer_token"
target:
name: Target
description: An array of pre-authorized chat_ids to send the document to. If not present, first allowed chat_id is the default.
example: "[12345, 67890] or 12345"
selector:
object:
parse_mode:
name: Parse mode
description: "Parser for the message text."
selector:
select:
options:
- "html"
- "markdown"
- "markdown2"
disable_notification:
name: Disable notification
description: Sends the message silently. iOS users and Web users will not receive a notification, Android users will receive a notification with no sound.
selector:
boolean:
verify_ssl:
name: Verify SSL
description: Enable or disable SSL certificate verification. Set to false if you're downloading the file from a URL and you don't want to validate the SSL certificate of the server.
selector:
boolean:
timeout:
name: Timeout
description: Timeout for send document. Will help with timeout errors (poor internet connection, etc)
selector:
number:
min: 1
max: 3600
unit_of_measurement: seconds
keyboard:
name: Keyboard
description: List of rows of commands, comma-separated, to make a custom keyboard.
example: '["/command1, /command2", "/command3"]'
selector:
object:
inline_keyboard:
name: Inline keyboard
description: List of rows of commands, comma-separated, to make a custom inline keyboard with buttons with associated callback data.
example: '["/button1, /button2", "/button3"] or [[["Text button1", "/button1"], ["Text button2", "/button2"]], [["Text button3", "/button3"]]]'
selector:
object:
message_tag:
name: Message tag
description: "Tag for sent message. In telegram_sent event data: {{trigger.event.data.message_tag}}"
example: "msg_to_edit"
selector:
text:
send_location:
name: Send location
description: Send a location.
fields:
latitude:
name: Latitude
description: The latitude to send.
required: true
selector:
number:
min: -90
max: 90
step: 0.001
unit_of_measurement: "°"
longitude:
name: Longitude
description: The longitude to send.
required: true
selector:
number:
min: -180
max: 180
step: 0.001
unit_of_measurement: "°"
target:
name: Target
description: An array of pre-authorized chat_ids to send the location to. If not present, first allowed chat_id is the default.
example: "[12345, 67890] or 12345"
selector:
object:
disable_notification:
name: Disable notification
description: Sends the message silently. iOS users and Web users will not receive a notification, Android users will receive a notification with no sound.
selector:
boolean:
timeout:
name: Timeout
description: Timeout for send photo. Will help with timeout errors (poor internet connection, etc)
selector:
number:
min: 1
max: 3600
unit_of_measurement: seconds
keyboard:
name: Keyboard
description: List of rows of commands, comma-separated, to make a custom keyboard.
example: '["/command1, /command2", "/command3"]'
selector:
object:
inline_keyboard:
name: Inline keyboard
description: List of rows of commands, comma-separated, to make a custom inline keyboard with buttons with associated callback data.
example: '["/button1, /button2", "/button3"] or [[["Text button1", "/button1"], ["Text button2", "/button2"]], [["Text button3", "/button3"]]]'
selector:
object:
message_tag:
name: Message tag
description: "Tag for sent message. In telegram_sent event data: {{trigger.event.data.message_tag}}"
example: "msg_to_edit"
selector:
text:
edit_message:
name: Edit message
description: Edit a previously sent message.
fields:
message_id:
name: Message ID
description: id of the message to edit.
required: true
example: "{{ trigger.event.data.message.message_id }}"
selector:
text:
chat_id:
name: Chat ID
description: The chat_id where to edit the message.
required: true
example: 12345
selector:
text:
message:
name: Message
description: Message body of the notification.
example: The garage door has been open for 10 minutes.
selector:
text:
title:
name: Title
description: Optional title for your notification. Will be composed as '%title\n%message'
example: "Your Garage Door Friend"
selector:
text:
parse_mode:
name: Parse mode
description: "Parser for the message text."
selector:
select:
options:
- "html"
- "markdown"
- "markdown2"
disable_web_page_preview:
name: Disable web page preview
description: Disables link previews for links in the message.
selector:
boolean:
inline_keyboard:
name: Inline keyboard
description: List of rows of commands, comma-separated, to make a custom inline keyboard with buttons with associated callback data.
example: '["/button1, /button2", "/button3"] or [[["Text button1", "/button1"], ["Text button2", "/button2"]], [["Text button3", "/button3"]]]'
selector:
object:
edit_caption:
name: Edit caption
description: Edit the caption of a previously sent message.
fields:
message_id:
name: Message ID
description: id of the message to edit.
required: true
example: "{{ trigger.event.data.message.message_id }}"
selector:
text:
chat_id:
name: Chat ID
description: The chat_id where to edit the caption.
required: true
example: 12345
selector:
text:
caption:
name: Caption
description: Message body of the notification.
required: true
example: The garage door has been open for 10 minutes.
selector:
text:
inline_keyboard:
name: Inline keyboard
description: List of rows of commands, comma-separated, to make a custom inline keyboard with buttons with associated callback data.
example: '["/button1, /button2", "/button3"] or [[["Text button1", "/button1"], ["Text button2", "/button2"]], [["Text button3", "/button3"]]]'
selector:
object:
edit_replymarkup:
name: Edit reply markup
description: Edit the inline keyboard of a previously sent message.
fields:
message_id:
name: Message ID
description: id of the message to edit.
required: true
example: "{{ trigger.event.data.message.message_id }}"
selector:
text:
chat_id:
name: Chat ID
description: The chat_id where to edit the reply_markup.
required: true
example: 12345
selector:
text:
inline_keyboard:
name: Inline keyboard
description: List of rows of commands, comma-separated, to make a custom inline keyboard with buttons with associated callback data.
required: true
example: '["/button1, /button2", "/button3"] or [[["Text button1", "/button1"], ["Text button2", "/button2"]], [["Text button3", "/button3"]]]'
selector:
object:
answer_callback_query:
name: Answer callback query
description: Respond to a callback query originated by clicking on an online keyboard button. The answer will be displayed to the user as a notification at the top of the chat screen or as an alert.
fields:
message:
name: Message
description: Unformatted text message body of the notification.
required: true
example: "OK, I'm listening"
selector:
text:
callback_query_id:
name: Callback query ID
description: Unique id of the callback response.
required: true
example: "{{ trigger.event.data.id }}"
selector:
text:
show_alert:
name: Show alert
description: Show a permanent notification.
required: true
selector:
boolean:
timeout:
name: Timeout
description: Timeout for sending the answer. Will help with timeout errors (poor internet connection, etc)
selector:
number:
min: 1
max: 3600
unit_of_measurement: seconds
delete_message:
name: Delete message
description: Delete a previously sent message.
fields:
message_id:
name: Message ID
description: id of the message to delete.
required: true
example: "{{ trigger.event.data.message.message_id }}"
selector:
text:
chat_id:
name: Chat ID
description: The chat_id where to delete the message.
required: true
example: 12345
selector:
text:

View File

@@ -0,0 +1,107 @@
"""Support for Telegram bots using webhooks."""
import datetime as dt
from http import HTTPStatus
from ipaddress import ip_address
import logging
from telegram.error import TimedOut
from homeassistant.components.http import HomeAssistantView
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.helpers.network import get_url
from . import (
CONF_ALLOWED_CHAT_IDS,
CONF_TRUSTED_NETWORKS,
CONF_URL,
BaseTelegramBotEntity,
initialize_bot,
)
_LOGGER = logging.getLogger(__name__)
TELEGRAM_HANDLER_URL = "/api/telegram_webhooks"
REMOVE_HANDLER_URL = ""
async def async_setup_platform(hass, config):
"""Set up the Telegram webhooks platform."""
bot = initialize_bot(config)
current_status = await hass.async_add_executor_job(bot.getWebhookInfo)
if not (base_url := config.get(CONF_URL)):
base_url = get_url(hass, require_ssl=True, allow_internal=False)
# Some logging of Bot current status:
last_error_date = getattr(current_status, "last_error_date", None)
if (last_error_date is not None) and (isinstance(last_error_date, int)):
last_error_date = dt.datetime.fromtimestamp(last_error_date)
_LOGGER.info(
"Telegram webhook last_error_date: %s. Status: %s",
last_error_date,
current_status,
)
else:
_LOGGER.debug("telegram webhook Status: %s", current_status)
handler_url = f"{base_url}{TELEGRAM_HANDLER_URL}"
if not handler_url.startswith("https"):
_LOGGER.error("Invalid telegram webhook %s must be https", handler_url)
return False
def _try_to_set_webhook():
retry_num = 0
while retry_num < 3:
try:
return bot.setWebhook(handler_url, timeout=5)
except TimedOut:
retry_num += 1
_LOGGER.warning("Timeout trying to set webhook (retry #%d)", retry_num)
if current_status and current_status["url"] != handler_url:
result = await hass.async_add_executor_job(_try_to_set_webhook)
if result:
_LOGGER.info("Set new telegram webhook %s", handler_url)
else:
_LOGGER.error("Set telegram webhook failed %s", handler_url)
return False
hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STOP, lambda event: bot.setWebhook(REMOVE_HANDLER_URL)
)
hass.http.register_view(
BotPushReceiver(
hass, config[CONF_ALLOWED_CHAT_IDS], config[CONF_TRUSTED_NETWORKS]
)
)
return True
class BotPushReceiver(HomeAssistantView, BaseTelegramBotEntity):
"""Handle pushes from Telegram."""
requires_auth = False
url = TELEGRAM_HANDLER_URL
name = "telegram_webhooks"
def __init__(self, hass, allowed_chat_ids, trusted_networks):
"""Initialize the class."""
BaseTelegramBotEntity.__init__(self, hass, allowed_chat_ids)
self.trusted_networks = trusted_networks
async def post(self, request):
"""Accept the POST from telegram."""
real_ip = ip_address(request.remote)
if not any(real_ip in net for net in self.trusted_networks):
_LOGGER.warning("Access denied from %s", real_ip)
return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED)
try:
data = await request.json()
except ValueError:
return self.json_message("Invalid JSON", HTTPStatus.BAD_REQUEST)
if not self.process_message(data):
return self.json_message("Invalid message", HTTPStatus.BAD_REQUEST)
return None

View File

@@ -1,141 +0,0 @@
"""
Custom Component, written by @skalavala - based on the existing TCP component.
Support for UDP socket based sensors.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.udp/
"""
import logging
import socket
import select
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
CONF_NAME, CONF_HOST, CONF_PORT, CONF_PAYLOAD, CONF_TIMEOUT,
CONF_UNIT_OF_MEASUREMENT, CONF_VALUE_TEMPLATE)
from homeassistant.exceptions import TemplateError
from homeassistant.helpers.entity import Entity
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
CONF_BUFFER_SIZE = 'buffer_size'
CONF_VALUE_ON = 'value_on'
DEFAULT_BUFFER_SIZE = 1024
DEFAULT_NAME = 'UDP Sensor'
DEFAULT_TIMEOUT = 10
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PORT): cv.port,
vol.Required(CONF_PAYLOAD): cv.string,
vol.Optional(CONF_BUFFER_SIZE, default=DEFAULT_BUFFER_SIZE):
cv.positive_int,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int,
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
vol.Optional(CONF_VALUE_ON): cv.string,
vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
})
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the UDP Sensor."""
add_entities([UdpSensor(hass, config)])
class UdpSensor(Entity):
"""Implementation of a UDP socket based sensor."""
required = tuple()
def __init__(self, hass, config):
"""Set all the config values if they exist and get initial state."""
value_template = config.get(CONF_VALUE_TEMPLATE)
if value_template is not None:
value_template.hass = hass
self._hass = hass
self._config = {
CONF_NAME: config.get(CONF_NAME),
CONF_HOST: config.get(CONF_HOST),
CONF_PORT: config.get(CONF_PORT),
CONF_TIMEOUT: config.get(CONF_TIMEOUT),
CONF_PAYLOAD: config.get(CONF_PAYLOAD),
CONF_UNIT_OF_MEASUREMENT: config.get(CONF_UNIT_OF_MEASUREMENT),
CONF_VALUE_TEMPLATE: value_template,
CONF_VALUE_ON: config.get(CONF_VALUE_ON),
CONF_BUFFER_SIZE: config.get(CONF_BUFFER_SIZE),
}
self._state = None
self.update()
@property
def name(self):
"""Return the name of this sensor."""
name = self._config[CONF_NAME]
if name is not None:
return name
return super(UdpSensor, self).name
@property
def state(self):
"""Return the state of the device."""
return self._state
@property
def unit_of_measurement(self):
"""Return the unit of measurement of this entity."""
return self._config[CONF_UNIT_OF_MEASUREMENT]
def update(self):
"""Get the latest value for this sensor."""
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.settimeout(self._config[CONF_TIMEOUT])
try:
sock.connect(
(self._config[CONF_HOST], self._config[CONF_PORT]))
except socket.error as err:
_LOGGER.error(
"Unable to connect to %s on port %s: %s",
self._config[CONF_HOST], self._config[CONF_PORT], err)
return
try:
sock.send(self._config[CONF_PAYLOAD].encode())
except socket.error as err:
_LOGGER.error(
"Unable to send payload %r to %s on port %s: %s",
self._config[CONF_PAYLOAD], self._config[CONF_HOST],
self._config[CONF_PORT], err)
return
readable, _, _ = select.select(
[sock], [], [], self._config[CONF_TIMEOUT])
if not readable:
_LOGGER.warning(
"Timeout (%s second(s)) waiting for a response after "
"sending %r to %s on port %s.",
self._config[CONF_TIMEOUT], self._config[CONF_PAYLOAD],
self._config[CONF_HOST], self._config[CONF_PORT])
return
value = sock.recv(self._config[CONF_BUFFER_SIZE]).decode()
if self._config[CONF_VALUE_TEMPLATE] is not None:
try:
self._state = self._config[CONF_VALUE_TEMPLATE].render(
value=value)
return
except TemplateError:
_LOGGER.error(
"Unable to render template of %r with value: %r",
self._config[CONF_VALUE_TEMPLATE], value)
return
self._state = value