--- /dev/null
+# Mattermost Bot.
+# Copyright (c) 2016-2021 by Someone <someone@somenet.org> (aka. Jan Vales <jan@jvales.net>)
+# published under MIT-License
+
+
class RequireException(Exception):
    """Signal that a required precondition was not met.

    Mostly raised by permission checks. The exception message is forwarded
    to the user who invoked the command.
    """
+
+
class AbstractCommand():
    """Base class for slash-command modules that get registered with the bot.

    Subclasses set ``TRIGGER`` (and optionally ``USEINFO`` / ``CONFIG``) and
    override the ``on_*`` hooks. The bot only ever calls the ``_on_*``
    wrappers, which add the guest check and turn a ``RequireException`` into
    an ephemeral text response for the caller.
    """

    URL = None
    TEAM_ID = None
    TRIGGER = None
    # Read-only defaults merged into the created slash command. Shared between
    # all instances, so override it in a subclass instead of mutating it.
    CONFIG = {"display_name": "somebot-command", "auto_complete": True}
    USEINFO = None

    bot = None
    mm_secret_token = None


    def __init__(self, team_id):
        """Bind this command instance to the team identified by ``team_id``."""
        self.TEAM_ID = team_id


#    def __str__(self):
#        return str(self.__class__)+" for team_id: "+str(self.TEAM_ID)
#    def __repr__(self):
#        return self.__str__()


    # can/should be overridden by the user
    def on_register(self):
        """Consider to override. Handles the post-command-registration logic at bot startup."""
        self._create_slash_command()

    # can/should be overridden by the user
    def on_shutdown(self):
        """Consider to override. Handles the shutdown-procedure."""
        return

    # can/should be overridden by the user
    def on_SIGUSR1(self, sigusr1_cnt):
        """Consider to override. Handles the SIGUSR1-signal."""
        return

    # should be overridden by the user
    def on_POST(self, request, data):
        """Override. Handles the post-command logic."""
        return

    # should be overridden by the user
    # manual command authentication needed!
    def on_POST_interactive(self, request, data):
        """Consider to override. Handles the interactive-message logic."""
        return

    # should be overridden by the user
    # manual command authentication needed!
    def on_POST_dialog(self, request, data):
        """Consider to override. Handles the dialog logic."""
        return


    def _on_register(self, bot):
        """Attach the bot, derive this command's URL, publish USEINFO and run ``on_register()``."""
        self.bot = bot
        self.URL = self.bot.local_websrv_url+"/"+self.TEAM_ID+"/"+self.TRIGGER
        if self.USEINFO:
            # setdefault: a topic that is already registered is not overwritten.
            self.bot.USETOPICS.setdefault("/"+self.TRIGGER, self.USEINFO)
        self.on_register()


    def _on_shutdown(self):
        """Forward the bot's shutdown notification to ``on_shutdown()``."""
        self.on_shutdown()


    def _on_SIGUSR1(self, sigusr1_cnt):
        """Forward the bot's SIGUSR1 notification to ``on_SIGUSR1()``."""
        self.on_SIGUSR1(sigusr1_cnt)


    def _guarded_dispatch(self, handler, request, data):
        """Run ``handler`` for non-guests; report a RequireException to the caller as ephemeral text."""
        try:
            self._require_not_guest(data)
            handler(request, data)
        except RequireException as ex:
            request.cmd_respond_text_temp(str(ex))


    def _on_POST(self, request, data):
        """Guarded entry point for slash-command requests."""
        self._guarded_dispatch(self.on_POST, request, data)


    def _on_POST_interactive(self, request, data):
        """Guarded entry point for interactive-message requests."""
        self._guarded_dispatch(self.on_POST_interactive, request, data)


    def _on_POST_dialog(self, request, data):
        """Guarded entry point for dialog requests."""
        self._guarded_dispatch(self.on_POST_dialog, request, data)


    def _create_slash_command(self):
        """(Re-)create this command's slash command and remember its secret token.

        Any existing custom slash command of the team that points at this
        command's endpoint, or that uses the same trigger (case-insensitive),
        is deleted first.
        """
        command_url = self.URL+"/command"
        trigger = self.TRIGGER.lower()

        # (possibly) delete old version of command
        # Commands are created with URL+"/command" below, so that is the URL an
        # earlier run left behind; the bare URL is still matched as before.
        own_urls = (self.URL, command_url)
        for command in self.bot.api.list_custom_slash_commands_for_team(self.TEAM_ID):
            if command["url"] in own_urls or command["trigger"].lower() == trigger:
                self.bot.api.delete_slash_command(command["id"])

        # create slash command
        res = self.bot.api.create_slash_command(self.TEAM_ID, trigger, command_url)
        res.update(self.CONFIG)
        self.bot.api.update_slash_command(res)
        self.mm_secret_token = res["token"]


    @staticmethod
    def _has_role(roles, role):
        """Return True if ``role`` is one of ``roles``.

        ``roles`` may be a whitespace-separated string or a collection of role
        names. A string is split into tokens first, so a role name that merely
        contains ``role`` as a substring does not count.
        """
        if isinstance(roles, str):
            roles = roles.split()
        return role in roles


    def _require_bot_admin(self, data):
        """
        Require exactly bot admin privileges.
        Throws RequireException if not privileged.
        """
        if data["user_id"] not in self.bot.admin_ids:
            raise RequireException("### Leave me alone. You are not by real dad.")


    def _require_system_admin(self, data, exc=True):
        """
        Require system admin privileges.
        Throws RequireException if not privileged and exc is true, otherwise returns False.
        Returns True if privileged.
        """
        user = self.bot.api.get_user(data["user_id"])
        if self._has_role(user["roles"], "system_admin"):
            return True
        if exc:
            raise RequireException("### You are not SYSTEM_ADMIN. :(")
        return False


    def _require_team_admin(self, data, exc=True):
        """
        Require at least team admin privileges (system admins pass too).
        Throws RequireException if not privileged and exc is true, otherwise returns False.
        Returns True if privileged.
        """
        team_member = self.bot.api.get_team_member(data["team_id"], data["user_id"])
        if self._has_role(team_member["roles"], "team_admin"):
            return True
        # Only ask for the system-level role when the team-level one is missing.
        if self._require_system_admin(data, exc=False):
            return True
        if exc:
            raise RequireException("### You are not TEAM_ADMIN. :(")
        return False


    def _require_channel_admin(self, data, exc=True):
        """
        Require at least channel admin privileges (team and system admins pass too).
        Throws RequireException if not privileged and exc is true, otherwise returns False.
        Returns True if privileged.
        """
        channel_member = self.bot.api.get_channel_member(data["channel_id"], data["user_id"])
        if self._has_role(channel_member["roles"], "channel_admin"):
            return True
        if self._require_team_admin(data, exc=False):
            return True
        if exc:
            raise RequireException("### You are not CHANNEL_ADMIN. :(")
        return False


    def _require_not_guest(self, data, exc=True):
        """
        Require to not be a guest.
        Throws RequireException if guest and exc is true, otherwise returns False.
        Returns True if not a guest.
        """
        channel_member = self.bot.api.get_channel_member(data["channel_id"], data["user_id"])
        if not self._has_role(channel_member["roles"], "channel_guest"):
            return True
        if exc:
            raise RequireException("### The bot cannot be used by guests. :(")
        return False
--- /dev/null
+# Mattermost Bot.
+# Copyright (c) 2016-2021 by Someone <someone@somenet.org> (aka. Jan Vales <jan@jvales.net>)
+# published under MIT-License
+
+import asyncio
+import atexit
+import json
+import logging
+import os
+import pprint
+import signal
+import sys
+import time
+import traceback
+import urllib
+
+from inspect import cleandoc
+from http.server import BaseHTTPRequestHandler, HTTPServer
+from urllib.request import Request, urlopen
+from socketserver import ThreadingMixIn
+
+import mattermost
+import mattermost.ws
+
+
logger = logging.getLogger(__name__)


class MMBot():
    """Mattermost bot core.

    Keeps the registry of slash-command modules (per team and trigger) and
    websocket modules (per event type), collects usage statistics and runs
    the local webserver that receives the slash-command, interactive-message
    and dialog callbacks.
    """

    def __init__(self, local_websrv_hostname="localhost", local_websrv_port=18065, api_user=None, api_user_pw=None, api_bearer=None, mm_api_url="http://localhost:8065/api", mm_ws_url="ws://localhost:8065/api/v4/websocket", debug_chan_id=None):
        """Set up bot state, install exit/signal handlers and log in to the API.

        Logs in with ``api_user``/``api_user_pw`` when both are given,
        otherwise with ``api_bearer``. Without any credentials ``self.api``
        stays ``None``.
        """
        self.local_websrv_hostname = local_websrv_hostname
        self.local_websrv_port = local_websrv_port
        self.api_user = api_user
        self.mm_api_url = mm_api_url
        self.mm_ws_url = mm_ws_url

        # start() treats a None hostname as "no webserver", so the URL must not
        # be built by string concatenation in that case.
        if self.local_websrv_hostname is None:
            self.local_websrv_url = None
        else:
            self.local_websrv_url = ("http://"+self.local_websrv_hostname+":"+str(self.local_websrv_port)+"/").strip("/")
        self.modules = dict()
        self.wsmodules = dict()
        self.api = None
        self.debug_chan_id = debug_chan_id
        self.loop = asyncio.new_event_loop()
        self.command_stats = dict()
        self.admin_ids = []

        self.mmws = None
        self.sigusr1_cnt = 0

        atexit.register(self.on_shutdown)
        signal.signal(signal.SIGUSR1, self.on_SIGUSR1)
        signal.signal(signal.SIGTERM, self.shutdown)
        signal.signal(signal.SIGINT, self.shutdown)


        # Core-Command: /use-data
        self.USETOPICS = {"bot":cleandoc("""##### Here I am, brain the size of a planet, and they ask me to fill in some missing MM-features and be fun ... It gives me a headache.
            Written by **``@someone``** in python3. Big thanks to contributors: **``@ju``**, **``@gittenburg``**
            Inspiring ideas by: ``@bearza``, ``@frunobulax``, **``@x5468656f``**

            Feel like contributing too? Talk to **``@someone``**. :)
            The repository is here: https://git.somenet.org/pub/jan/mattermost-bot.git
            """)}

        if api_user is not None and api_user_pw is not None:
            logger.info("User credentials given. Trying to login.")
            self.api = mattermost.MMApi(self.mm_api_url)
            self.api.login(api_user, api_user_pw)
        elif api_user is not None or api_bearer is not None:
            # A bearer token alone is enough to log in; the api_user-only case
            # keeps its previous behaviour.
            self.api = mattermost.MMApi(self.mm_api_url)
            self.api.login(bearer=api_bearer)


    # Register a module with the bot.
    def register(self, module):
        """Register a command module under its TEAM_ID and TRIGGER.

        Raises Exception if that team/trigger pair is already registered.
        """
        team_modules = self.modules.setdefault(module.TEAM_ID, dict())
        if module.TRIGGER in team_modules:
            raise Exception("Multiple registration attempts for module: "+module.TRIGGER+" and team: "+module.TEAM_ID)

        team_modules[module.TRIGGER] = module
        module._on_register(self)


    # Register a websocket handling module with the bot.
    # There is no useful way to discriminate WS-events by originating teams atm. :(
    def register_ws(self, module, eventlist):
        """Register a websocket module (keyed by its NAME) for every event type in ``eventlist``.

        Raises Exception if the module is already registered for an event type.
        """
        for evtype in eventlist:
            evtype_modules = self.wsmodules.setdefault(evtype, dict())
            if module.NAME in evtype_modules:
                raise Exception("Multiple registration attempts for module: "+module.NAME+" and evtype: "+evtype)

            evtype_modules[module.NAME] = module
            module._on_register_ws_evtype(self, evtype)


    def start(self):
        """Create the websocket client (if configured) and run the webserver (if configured)."""
        logger.info("Starting: Almost there.")
        logger.info(pprint.pformat(self.modules))
        logger.info(pprint.pformat(self.wsmodules))

        if self.mm_ws_url is not None:
            self.mmws = mattermost.ws.MMws(self.websocket_handler, self.api, self.mm_ws_url)

        if self.local_websrv_hostname is not None and self.local_websrv_port is not None:
            self.start_webserver()


    def on_shutdown(self):
        """Close the websocket, notify all modules, log out and dump the statistics.

        Registered via atexit. Failures of single modules or of the logout are
        logged and do not stop the remaining shutdown steps.
        """
        logger.info("Shutting down ...")

        if self.mmws:
            self.mmws.close_websocket()
            # todo: stop webserver + WS?

        # Command modules first, then websocket modules (once per event type).
        registered = [module for team_modules in self.modules.values() for module in team_modules.values()]
        registered += [module for evtype_modules in self.wsmodules.values() for module in evtype_modules.values()]
        for module in registered:
            try:
                module._on_shutdown()
            except Exception:
                logger.exception("Module failed during shutdown: %r", module)

        # self.api is None when the bot was created without credentials.
        if self.api is not None:
            try:
                self.api.logout()
            except Exception:
                logger.exception("Logout failed during shutdown.")

        self.command_stats_dump()
        logger.info("BYE.")



    ########
    # misc #
    ########
    def shutdown(self, unk1=None, unk2=None, err=0):
        """Signal handler for SIGTERM/SIGINT: exit the process with status ``err``."""
        sys.exit(err)


    def on_SIGUSR1(self, unk1=None, unk2=None):
        """Signal handler for SIGUSR1: dump the statistics and notify all modules."""
        logger.info("on_SIGUSR1()")
        self.sigusr1_cnt += 1
        self.command_stats_inc("internal::SIGUSR1")
        self.command_stats_dump()

        # TODO: reinit teams.

        for team_modules in self.modules.values():
            for module in team_modules.values():
                module._on_SIGUSR1(self.sigusr1_cnt)

        for evtype_modules in self.wsmodules.values():
            for module in evtype_modules.values():
                module._on_SIGUSR1(self.sigusr1_cnt)


    def debug_chan(self, message):
        """Post ``message`` to the debug channel, if one is configured."""
        if self.debug_chan_id is None:
            logger.info("debug_chan() called, but debug_chan_id is unspecified.")
            return
        self.api.create_post(self.debug_chan_id, "``AUTODELETE-DAY``\n"+message)


    def command_stats_inc(self, command, amount=1):
        """Add ``amount`` to the usage counter of ``command``."""
        if command in self.command_stats:
            self.command_stats[command] += amount
        else:
            self.command_stats[command] = amount


    def command_stats_dump(self):
        """Merge the collected command statistics into the stats file and reset them."""
        self.dump_stats_json(self.command_stats, "/tmp/somebot_command_stats.json", "#command_usage #mmstats")
        self.command_stats = dict()


    def dump_stats_json(self, stats_data, file_path, header="", footer="", no_data_text=""):
        """Merge ``stats_data`` with the counters stored in ``file_path`` and write the result back.

        The counters found in the file are added to ``stats_data`` in place.
        Nothing is written when there is no new data and the file already
        carries the same header, footer and no_data_text. A missing or
        unusable file (invalid JSON, wrong structure, not UTF-8) is replaced.
        """
        do_write = bool(stats_data)

        # The file is written as UTF-8 below, so it is read back as UTF-8 too.
        old_stats = None
        try:
            with open(file_path, "r", encoding="utf-8") as file:
                loaded = json.load(file)
        except (FileNotFoundError, json.JSONDecodeError, UnicodeDecodeError):
            loaded = None

        # Only accept the structure this method writes: a dict with a "data" dict.
        if isinstance(loaded, dict) and isinstance(loaded.get("data"), dict):
            old_stats = loaded

        if old_stats is None:
            do_write = True
        else:
            for item, cnt in old_stats["data"].items():
                if item in stats_data:
                    stats_data[item] += cnt
                else:
                    stats_data[item] = cnt

        # if no data, but file exists: skip write
        expected_texts = (("header", header), ("footer", footer), ("no_data_text", no_data_text))
        if not do_write and all(key in old_stats and old_stats[key] == text for key, text in expected_texts):
            return

        logger.info("dump_stats_json(): writing file: %s", file_path)
        self.command_stats_inc("internal::dump_stats_json:"+file_path)

        with open(file_path, "w", encoding="utf-8") as file:
            json.dump({"header":header, "footer":footer, "no_data_text":no_data_text, "data":dict(sorted(stats_data.items()))}, file, ensure_ascii=False, indent=2)



    ##########################
    # Bot's websocket client #
    ##########################
    def websocket_handler(self, mmws, event_data):
        """Pass a websocket event to every module registered for its event type.

        A module returning a truthy value is counted in the statistics.
        Exceptions raised by a module are logged and do not affect the others.
        """
        # Frames without an "event" key match no module and are ignored.
        evtype = event_data.get("event")
        for module_name, module in self.wsmodules.get(evtype, {}).items():
            try:
                if module.on_WS_EVENT(event_data):
                    self.command_stats_inc("ws::"+evtype+"::"+module_name)
            except Exception:
                logger.error(traceback.format_exc())


    ###################
    # Bot's webserver #
    ###################
    def _build_request_handler(self):
        """Create the request handler class used by the webserver.

        The returned class gets its ``bot`` attribute assigned when the server
        starts serving.
        """

        class RequestHandler(BaseHTTPRequestHandler):
            bot = None
            handled = False
            responseURL = None

            def _read_payload(self):
                """Parse the request body into a dict; respond with an error and return None on failure."""
                content_type = (self.headers.get("Content-Type") or "").split(";", 1)[0].strip().lower()
                if content_type not in ("application/x-www-form-urlencoded", "application/json"):
                    self.respond(415)
                    return None

                try:
                    length = int(self.headers.get("Content-Length") or 0)
                    if length < 0:
                        raise ValueError("negative Content-Length")
                    body = self.rfile.read(length).decode("utf-8")
                    if content_type == "application/json":
                        data = json.loads(body)
                    else:
                        # only accept first occurence
                        data = {k: v[0] for k, v in urllib.parse.parse_qs(body, keep_blank_values=True).items()}
                except ValueError:
                    # bad Content-Length, undecodable body or malformed JSON
                    self.respond(400)
                    return None

                if not isinstance(data, dict):
                    self.respond(400)
                    return None
                return data


            def do_POST(self):
                """Dispatch a POST to /<team_id>/<trigger>/<command|interactive|dialog>."""
                self.handled = False
                self.responseURL = None

                data = self._read_payload()
                if data is None:
                    return

                # store responseURL
                if "response_url" in data:
                    self.responseURL = data["response_url"]

                # handle call
                splitpath = self.path.strip("/").split("/")
                module = None
                if len(splitpath) >= 2:
                    module = self.bot.modules.get(splitpath[0], {}).get(splitpath[1])
                if not module:
                    self.respond(404)
                    return

                action = splitpath[2] if len(splitpath) > 2 else None

                # /command
                if action == "command":
                    # A module without a token must never authenticate a request.
                    if module.mm_secret_token is not None and "token" in data and module.mm_secret_token == data["token"]:
                        self.bot.command_stats_inc("/"+splitpath[1]+"/"+splitpath[2])
                        module._on_POST(self, data)
                    else:
                        # NOTE(review): this logs the expected secret and stops the whole bot on any mismatch.
                        logger.warning("mm_secret_token mismatch expected/got -%s-%s-", module.mm_secret_token, data.get("token"))
                        self.respond(403)
                        self.bot.on_shutdown()
                        os._exit(1)

                # interactive button-handler. TODO auth!
                elif action == "interactive":
                    self.bot.command_stats_inc("/"+splitpath[1]+"/"+splitpath[2])
                    module._on_POST_interactive(self, data)

                # dialog-handler: TODO: auth!
                elif action == "dialog":
                    self.bot.command_stats_inc("/"+splitpath[1]+"/"+splitpath[2])
                    module._on_POST_dialog(self, data)

                else: #something weird.
                    self.bot.command_stats_inc("/"+splitpath[1]+' --- SOMETHINGS WEIRD!!!')

                # always try to fail. If previously responded to, nothing will happen.
                self.respond(400, only_direct=True)


            def _cmd_respond_text(self, response_type, message, props, http):
                """Send a text response of the given response_type, with props if given."""
                payload = {"skip_slack_parsing":True, "response_type":response_type, "text":message}
                if props is not None:
                    payload["props"] = props
                self.respond(http, payload)


            def cmd_respond_text_temp(self, message, props=None, http=200):
                """Respond with an ephemeral text message."""
                self._cmd_respond_text("ephemeral", message, props, http)


            def cmd_respond_text_chan(self, message, props=None, http=200):
                """Respond with an in_channel text message."""
                self._cmd_respond_text("in_channel", message, props, http)


            def respond(self, err, message=None, only_direct=False):
                """Send ``message`` as JSON with HTTP status ``err``.

                The first response is written to the open request. Later ones
                are POSTed to the stored response_url, or dropped when
                ``only_direct`` is set or no response_url is known.
                """
                # first response handled by returning in-request.
                # Must be fast before mm drops the connection.
                if self.handled and only_direct:
                    return

                if self.handled and not self.responseURL:
                    logger.error("Multiple responses without response url. ignoring.")
                    traceback.print_stack()
                    return

                if self.handled:
                    logger.info("Multiple responses. Using responseURL: %s", self.responseURL)
                    # response_url comes from the request body: only talk http(s).
                    if not str(self.responseURL).lower().startswith(("http://", "https://")):
                        logger.error("Refusing to use non-http(s) response url.")
                        return
                    req = Request(self.responseURL, data=bytes(json.dumps(message), "utf8"), method='POST')
                    req.add_header("Content-Type", "application/json")
                    # Close the connection right away; the reply is not used.
                    with urlopen(req, timeout=3):
                        pass
                    return

                self.handled = True
                # No message means an empty body, and Content-Length has to say so.
                body = b"" if message is None else bytes(json.dumps(message), "utf8")
                self.send_response(err)
                self.send_header("Content-Type", "application/json")
                self.send_header("Content-Length", str(len(body)))
                self.end_headers()
                if body:
                    self.wfile.write(body)

        return RequestHandler


    def start_webserver(self):
        """Start the threaded webserver and serve requests; does not return while serving."""
        logger.info("Starting webserver.")

        class MyHTTPServer(ThreadingMixIn, HTTPServer):
            def serve_forever(self, bot):
                # Hand the bot to the handler class before the first request arrives.
                self.RequestHandlerClass.bot = bot
                HTTPServer.serve_forever(self)

        self.httpd = MyHTTPServer((self.local_websrv_hostname, self.local_websrv_port), self._build_request_handler())
        self.httpd.serve_forever(self)