Source code for fatbotslim.irc.bot

# -*- coding: utf-8 -*-
#
# This file is part of FatBotSlim.
#
# FatBotSlim is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# FatBotSlim is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with FatBotSlim. If not, see <http://www.gnu.org/licenses/>.
#
"""
.. module:: fatbotslim.irc.bot

.. moduleauthor:: Mathieu D. (MatToufoutu)

This module contains IRC protocol related stuff.
"""

import re
from random import choice

from gevent import spawn, joinall, killall
from gevent.pool import Group

from fatbotslim.irc import u
from fatbotslim.irc.codes import *
from fatbotslim.irc.tcp import TCP, SSL
from fatbotslim.handlers import CTCPHandler, PingHandler, UnknownCodeHandler, RightsHandler
from fatbotslim.log import create_logger


ctcp_re = re.compile(ur'\x01(.*?)\x01')
log = create_logger(__name__)


[docs]class NullMessage(Exception): """ Raised when an empty line is received from the server. """ pass
[docs]class Message(object): """ Holds informations about a line received from the server. """ def __init__(self, data): """ :param data: line received from the server. :type data: unicode """ self._raw = data self.erroneous = False self.propagate = True try: self.src, self.dst, self.command, self.args = Message.parse(data) except IndexError: self.src, self.dst, self.command, self.args = [None] * 4 self.erroneous = True def __str__(self): return u"<Message(src='{0}', dst='{1}', command='{2}', args={3})>".format( self.src.name, self.dst, self.command, self.args ) @classmethod
[docs] def parse(cls, data): """ Extracts message informations from `data`. :param data: received line. :type data: unicode :return: extracted informations (source, destination, command, args). :rtype: tuple(Source, str, str, list) :raise: :class:`fatbotslim.irc.NullMessage` if `data` is empty. """ src = u'' dst = None if data[0] == u':': src, data = data[1:].split(u' ', 1) if u' :' in data: data, trailing = data.split(u' :', 1) args = data.split() args.extend(trailing.split()) else: args = data.split() command = args.pop(0) if command in (PRIVMSG, NOTICE): dst = args.pop(0) if ctcp_re.match(args[0]): args = args[0].strip(u'\x01').split() command = u'CTCP_' + args.pop(0) return Source(src), dst, command, args
[docs]class Source(object): """ Holds informations about a message sender. """ def __init__(self, prefix): """ :param prefix: prefix with format ``<servername>|<nick>['!'<user>]['@'<host>]``. :type prefix: unicode """ self._raw = prefix self.name, self.mode, self.user, self.host = Source.parse(prefix) def __str__(self): return u"<Source(nick='{0}', mode='{1}', user='{2}', host='{3}')>".format( self.name, self.mode, self.user, self.host ) @classmethod
[docs] def parse(cls, prefix): """ Extracts informations from `prefix`. :param prefix: prefix with format ``<servername>|<nick>['!'<user>]['@'<host>]``. :type prefix: unicode :return: extracted informations (nickname or host, mode, username, host). :rtype: tuple(str, str, str, str) """ try: nick, rest = prefix.split(u'!') except ValueError: return prefix, None, None, None try: mode, rest = rest.split(u'=') except ValueError: mode, rest = None, rest try: user, host = rest.split(u'@') except ValueError: return nick, mode, rest, None return nick, mode, user, host
[docs]class IRC(object): """ The main IRC bot class. """ quit_msg = u"I'll be back!" default_handlers = [ CTCPHandler, PingHandler, UnknownCodeHandler, RightsHandler ] def __init__(self, settings): """ The only expected argument is the bot's configuration, it should be a :class:`dict` with at least the following keys defined: * server: the ircd's host (:class:`str`) * port: the ircd's port (:class:`int`) * ssl: connect to the server using SSL (:class:`bool`) * channels: the channels to join upon connection (:class:`list`) * nick: the bot's nickname (:class:`str`) * realname: the bot's real name (:class:`str`) :param settings: bot configuration. :type settings: dict """ self.server = settings['server'] self.port = settings['port'] self.ssl = settings['ssl'] self.channels = map(u, settings['channels']) self.nick = u(settings['nick']) self.realname = u(settings['realname']) self.handlers = [] self._pool = Group() self.rights = None log.setLevel(settings.get('loglevel', 'INFO')) for handler in self.default_handlers: self.add_handler(handler) def _create_connection(self): """ Creates a transport channel. :return: transport channel instance :rtype: :class:`fatbotslim.irc.tcp.TCP` or :class:`fatbotslim.irc.tcp.SSL` """ transport = SSL if self.ssl else TCP return transport(self.server, self.port) def _connect(self): """ Connects the bot to the server and identifies itself. """ self.conn = self._create_connection() spawn(self.conn.connect) self.set_nick(self.nick) self.cmd(u'USER', u'{0} 3 * {1}'.format(self.nick, self.realname)) def _send(self, command): """ Sends a raw line to the server. :param command: line to send. :type command: unicode """ command = command.encode('utf-8') log.debug('>> ' + command) self.conn.oqueue.put(command) def _event_loop(self): """ The main event loop. Data from the server is parsed here using :func:`_parse_msg`. Parsed events are put in the object's event queue (`self.events`). """ while True: orig_line = self.conn.iqueue.get() log.debug('<< ' + orig_line) line = u(orig_line, errors='replace').strip() err_msg = False try: message = Message(line) except ValueError: err_msg = True if err_msg or message.erroneous: log.error("Received a line that can't be parsed: \"%s\"" % orig_line) continue if message.command == ERR_NICKNAMEINUSE: self.set_nick(IRC.randomize_nick(self.nick)) elif message.command == RPL_CONNECTED: for channel in self.channels: self.join(channel) self._handle(message) def _handle(self, msg): """ Pass a received message to the registered handlers. :param msg: received message :type msg: :class:`fatbotslim.irc.Message` """ def handler_yielder(): for handler in self.handlers: yield handler def handler_callback(_): if msg.propagate: try: h = hyielder.next() g = self._pool.spawn(handler_runner, h) g.link(handler_callback) except StopIteration: pass def handler_runner(h): for command in h.commands: if command == msg.command: method = getattr(h, h.commands[command]) method(msg) hyielder = handler_yielder() try: next_handler = hyielder.next() g = self._pool.spawn(handler_runner, next_handler) g.link(handler_callback) except StopIteration: pass @classmethod
[docs] def randomize_nick(cls, base, suffix_length=3): """ Generates a pseudo-random nickname. :param base: prefix to use for the generated nickname. :type base: unicode :param suffix_length: amount of digits to append to `base` :type suffix_length: int :return: generated nickname. :rtype: unicode """ suffix = u''.join(choice(u'0123456789') for _ in range(suffix_length)) return u'{0}{1}'.format(base, suffix)
[docs] def enable_rights(self): """ Enables rights management provided by :class:`fatbotslim.handlers.RightsHandler`. """ if self.rights is None: handler_instance = RightsHandler(self) self.handlers.insert(len(self.default_handlers), handler_instance)
[docs] def disable_rights(self): """ Disables rights management provided by :class:`fatbotslim.handlers.RightsHandler`. """ for handler in self.handlers: if isinstance(handler, RightsHandler): self.handlers.remove(handler) break self.rights = None
[docs] def add_handler(self, handler, args=None, kwargs=None): """ Registers a new handler. :param handler: handler to register. :type handler: :class:`fatbotslim.handlers.BaseHandler` :param args: positional arguments to pass to the handler's constructor. :type args: list :param kwargs: keyword arguments to pass to the handler's constructor. :type kwargs: dict """ args = [] if args is None else args kwargs = {} if kwargs is None else kwargs handler_instance = handler(self, *args, **kwargs) if isinstance(handler_instance, RightsHandler): self.rights = handler_instance if handler_instance not in self.handlers: self.handlers.append(handler_instance)
[docs] def cmd(self, command, args, prefix=None): """ Sends a command to the server. :param command: IRC code to send. :type command: unicode :param args: arguments to pass with the command. :type args: basestring :param prefix: optional prefix to prepend to the command. :type prefix: str or None """ if prefix is None: prefix = u'' raw_cmd = u'{0} {1} {2}'.format(prefix, command, args).strip() self._send(raw_cmd)
[docs] def ctcp_reply(self, command, dst, message=None): """ Sends a reply to a CTCP request. :param command: CTCP command to use. :type command: str :param dst: sender of the initial request. :type dst: str :param message: data to attach to the reply. :type message: str """ if message is None: raw_cmd = u'\x01{0}\x01'.format(command) else: raw_cmd = u'\x01{0} {1}\x01'.format(command, message) self.notice(dst, raw_cmd)
[docs] def msg(self, target, msg): """ Sends a message to an user or channel. :param target: user or channel to send to. :type target: str :param msg: message to send. :type msg: str """ self.cmd(u'PRIVMSG', u'{0} :{1}'.format(target, msg))
[docs] def notice(self, target, msg): """ Sends a NOTICE to an user or channel. :param target: user or channel to send to. :type target: str :param msg: message to send. :type msg: basestring """ self.cmd(u'NOTICE', u'{0} :{1}'.format(target, msg))
[docs] def join(self, channel): """ Make the bot join a channel. :param channel: new channel to join. :type channel: str """ self.cmd(u'JOIN', channel)
[docs] def set_nick(self, nick): """ Changes the bot's nickname. :param nick: new nickname to use :type nick: unicode """ self.cmd(u'NICK', nick)
[docs] def disconnect(self): """ Disconnects the bot from the server. """ self.cmd(u'QUIT', u':{0}'.format(self.quit_msg))
[docs] def run(self): """ Connects the bot and starts the event loop. """ self._connect() self._event_loop()
[docs]def run_bots(bots): """ Run many bots in parallel. :param bots: IRC bots to run. :type bots: list """ greenlets = [spawn(bot.run) for bot in bots] try: joinall(greenlets) except KeyboardInterrupt: for bot in bots: bot.disconnect() finally: killall(greenlets)