#!/usr/bin/env python
# -*- encoding: utf-8 -*-
#
# IRC library for the bot.
#
# Callbacks are registered with the ``handle_*`` decorators below.  For every
# line received from the server, ``process`` parses it and runs the matching
# callbacks, most recently registered first.  A callback returns HANDLER_LAST
# to stop the chain for that event; any other return value (including None)
# lets the next callback run.
#
# The code is written to run unchanged on Python 2.6+ and Python 3.

import re
import socket as soc
import sys

# Constant values: what a handler returns to steer the dispatch loop.
HANDLER_NEXT = 0  # let the next registered handler run
HANDLER_LAST = 1  # stop processing this event

# Global variables
socket = soc.socket(soc.AF_INET, soc.SOCK_STREAM)
log_to = None  # stream used by log(); None means sys.stdout
bot_nick = 'DelrothBot'
bot_password = None
bot_identified = False

# Here are defined handlers
ident_handlers = []
public_message_handlers = []   # list of (callable, compiled regexp)
private_message_handlers = []  # list of (callable, compiled regexp)
ping_handlers = []
ctcp_handlers = []
join_handlers = []
nick_handlers = []
part_handlers = []
kick_handlers = []
mode_handlers = []
invite_handlers = []
# NOTE(review): nothing in this file dispatches to unknown_response_handlers
# (or to the ctcp/join/nick/part/kick/mode lists) yet.
unknown_response_handlers = []


# Utilities
def assert_argument_count(func, n):
    """Raise TypeError unless *func* declares exactly *n* positional arguments."""
    if func.__code__.co_argcount != n:
        raise TypeError('%s must accept exactly %d argument(s)'
                        % (getattr(func, '__name__', repr(func)), n))


class HandlerException(Exception):
    """Raised by a handler to abort itself and stop the handler chain."""
    pass


def call_handler(handler, *args):
    """Call *handler* with *args* and return its result.

    A HandlerException raised by the handler is reported on standard output
    and converted into HANDLER_LAST; any other exception propagates.
    """
    try:
        return handler(*args)
    except HandlerException:
        # Single parenthesised argument: valid as a Python 2 print statement
        # and as a Python 3 print() call.
        print("Intercepted exception from %s (in %s)." % (
            getattr(handler, '__name__', repr(handler)),
            getattr(handler, '__module__', '?')))
        return HANDLER_LAST


def _run_handlers(handlers, *args):
    """Call each handler in *handlers* with *args* until one returns HANDLER_LAST."""
    for handler in handlers:
        if call_handler(handler, *args) == HANDLER_LAST:
            break


def _run_regexp_handlers(handlers, content, build_args):
    """Run the (handler, pattern) pairs whose pattern matches *content*.

    *build_args* maps the tuple of matched groups to the handler's argument
    tuple.  The chain stops when a handler returns HANDLER_LAST.
    """
    for handler, pattern in handlers:
        match = pattern.match(content)
        if match is None:
            continue
        if call_handler(handler, *build_args(match.groups())) == HANDLER_LAST:
            break


# Here are decorators for all those handlers
def handle_ident(func):
    """Register *func* (no arguments) to run when the bot identifies itself."""
    global ident_handlers
    ident_handlers = [func] + ident_handlers
    return func


def _call_ident():
    _run_handlers(ident_handlers)


def handle_public_message(**kwargs):
    """Return a decorator registering a channel-message handler.

    Keyword ``regexp`` (default ``'(.*)'``) is matched against the start of
    the message; the handler is called as ``func(sender, receiver, groups)``
    where *groups* is the tuple of matched groups.
    """
    pattern = re.compile(kwargs.get('regexp', '(.*)'))

    def decorator(func):
        global public_message_handlers
        public_message_handlers = [(func, pattern)] + public_message_handlers
        return func
    return decorator


def _call_public_message(sender, receiver, content):
    _run_regexp_handlers(public_message_handlers, content,
                         lambda groups: (sender, receiver, groups))


def handle_private_message(**kwargs):
    """Return a decorator registering a private-message / notice handler.

    Keyword ``regexp`` (default ``'.*'``) is matched against the start of the
    message; the handler is called as ``func(sender, groups, notice)`` where
    *notice* is True for NOTICE and False for PRIVMSG.
    """
    pattern = re.compile(kwargs.get('regexp', '.*'))

    def decorator(func):
        global private_message_handlers
        private_message_handlers = [(func, pattern)] + private_message_handlers
        return func
    return decorator


def _call_private_message(sender, content, notice=False):
    _run_regexp_handlers(private_message_handlers, content,
                         lambda groups: (sender, groups, notice))


def handle_ping(func):
    """Register *func(answer)* to run when the server sends PING."""
    global ping_handlers
    ping_handlers = [func] + ping_handlers
    return func


def _call_ping(answer):
    _run_handlers(ping_handlers, answer)


def handle_invite(func):
    """Register *func(channel)* to run when the bot receives an INVITE."""
    global invite_handlers
    invite_handlers = [func] + invite_handlers
    return func


def _call_invite(channel):
    _run_handlers(invite_handlers, channel)


def handle_unknown_response(func):
    """Register *func(msg)* in the unknown-response handler list."""
    global unknown_response_handlers
    unknown_response_handlers = [func] + unknown_response_handlers
    return func


def _call_unknown_response(msg):
    _run_handlers(unknown_response_handlers, msg)


# Here is the main code
class UserMask(object):
    """Message prefix split into ``nick``, ``user`` and ``host``.

    When the prefix is not of the form ``nick!user@host`` the whole prefix is
    stored in ``nick`` and the other two attributes stay empty.
    """

    regexp = re.compile(r'(.+?)!(.+?)@(.+)')

    def __init__(self, prefix):
        self.nick = self.user = self.host = ''
        match = UserMask.regexp.match(prefix)
        if match:
            self.nick, self.user, self.host = match.groups()
        else:
            self.nick = prefix


class Channel(object):
    """Holds a channel name."""

    def __init__(self, name):
        self.name = name


def _to_text(data):
    """Return *data* as ``str``, decoding it as UTF-8 when it is not one already.

    On Python 2 received data already is ``str`` and is returned untouched.
    """
    if isinstance(data, str):
        return data
    return data.decode('utf-8', 'replace')


def connect(host, port=6667, logto=sys.stdout):
    """Connect to *host*:*port* and process incoming lines until disconnection.

    *logto* is the stream that log() writes to.  The function returns when
    the server closes the connection, a socket error occurs, or the read is
    interrupted from the keyboard.
    """
    global log_to
    log_to = logto
    socket.connect((host, port))
    pending = b''
    while True:
        try:
            chunk = socket.recv(4096)
        except (soc.error, KeyboardInterrupt):
            break
        if not chunk:
            # Empty read: the peer closed the connection.
            break
        pending += chunk
        # Hand over every complete CRLF-terminated line; keep the remainder.
        while True:
            line, separator, rest = pending.partition(b'\r\n')
            if not separator:
                break
            pending = rest
            process(_to_text(line))


def log(msg, out):
    """Write *msg* to the log stream, prefixed '>>>' if *out* else '<<<'."""
    stream = log_to if log_to is not None else sys.stdout
    marker = ">>>" if out else "<<<"
    stream.write('%s %s\n' % (marker, str(msg)))


def process(text):
    """Parse one line received from the server and run the matching handlers.

    The first line ever processed triggers the ident handlers.  Handlers get
    a UserMask built from the message prefix as sender, or None when the line
    has no prefix.  INVITE, NOTICE and PRIVMSG lines with fewer than two
    parameters are ignored.
    """
    global bot_identified
    umask = None
    log(text, False)

    # Optional ":prefix " in front of the command.
    if text.startswith(":"):
        pos = text.find(' ')
        if pos != -1:
            umask = UserMask(text[1:pos])
            text = text[pos + 1:]

    pos = text.find(' ')
    if pos > 0:
        cmd = text[:pos]
        params = text[pos + 1:]
    else:
        cmd = text
        params = ''

    # Space-separated parameters; the first one starting with ':' swallows
    # the rest of the line (without the colon).
    tokens = params.split(' ')
    ps = tokens
    for index, token in enumerate(tokens):
        if token.startswith(':'):
            ps = tokens[:index] + [' '.join(tokens[index:])[1:]]
            break

    cmd = cmd.upper()

    if not bot_identified:
        _call_ident()
        bot_identified = True

    if cmd == 'PING':
        _call_ping(ps[0])
        return
    if cmd not in ('INVITE', 'NOTICE', 'PRIVMSG') or len(ps) < 2:
        return
    if cmd == 'INVITE':
        _call_invite(ps[1])
    elif cmd == 'NOTICE':
        _call_private_message(umask, ps[1], True)
    elif ps[0].startswith('#'):
        _call_public_message(umask, ps[0], ps[1])
    else:
        _call_private_message(umask, ps[1])


def raw_send(text):
    """Log *text* and send it to the server followed by CRLF."""
    log(text, True)
    data = text + "\r\n"
    if not isinstance(data, bytes):
        # Python 3 str (or Python 2 unicode): sockets need bytes.
        data = data.encode('utf-8')
    # sendall() keeps writing until the whole line is out; send() may stop early.
    socket.sendall(data)


def send(text, *params):
    """Send command *text* followed by *params*.

    The first parameter containing a space is prefixed with ':' so that it
    is read as the trailing parameter.
    """
    parts = [text]
    trailing_used = False
    for param in params:
        if ' ' in param and not trailing_used:
            trailing_used = True
            param = ':' + param
        parts.append(param)
    raw_send(' '.join(parts))


def pong(r):
    """Send PONG with parameter *r*."""
    send('PONG', r)


def nick(n):
    """Send NICK with parameter *n*."""
    send('NICK', n)


def user(u, h, s, r):
    """Send USER with parameters *u*, *h*, *s* and *r*."""
    send('USER', u, h, s, r)


def pass_(p):
    """Send PASS with parameter *p*."""
    send('PASS', p)


def join(c):
    """Send JOIN with parameter *c*."""
    send('JOIN', c)


def part(c):
    """Send PART with parameter *c*."""
    send('PART', c)


@handle_ident
def _default_ident_handler():
    """Send PASS (when a password is set), then NICK and USER."""
    log('Sending identification to the server', True)
    # PASS has to precede NICK/USER: it is not accepted once registration
    # has completed.
    if bot_password:
        pass_(bot_password)
    nick(bot_nick)
    user(bot_nick, bot_nick, bot_nick, 'Python IRC Bot v1.0a1')
    return HANDLER_NEXT


@handle_ping
def _default_ping_handler(reply):
    """Answer a PING with the matching PONG."""
    log('Replying a PING command', True)
    pong(reply)
    return HANDLER_LAST


@handle_invite
def _default_invite_handler(channel):
    """Join any channel the bot is invited to."""
    log('Invited on %s' % channel, False)
    join(channel)
    return HANDLER_NEXT


if __name__ == '__main__':
    @handle_public_message(regexp=r'^die$')
    def quit_action(sender, receiver, content):
        # sys.exit() raises SystemExit, so nothing after it would run.
        sys.exit(0)

    connect('irc.epiknet.org')