Source code for pyborg.pyborg

#
# PyBorg: The python AI bot.
#
# Copyright (c) 2000, 2006, 2013-2021 Tom Morton, Sebastien Dailly, Jack Laxson
#
#
# This bot was inspired by the PerlBorg, by Eric Bock.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
# Tom Morton <tom@moretom.net>
# Seb Dailly <seb.dailly@gmail.com>
# Jack Laxson <jackjrabbit+pyborg@gmail.com>

import collections
import datetime
import json
import logging
import marshal
import os
import random
import re
import sys
import time
import uuid
import zipfile
from pathlib import Path
from random import randint
from typing import Any, Dict, Optional, Tuple, List, Callable, Union
from zlib import crc32

import attr
import click
import toml

from pyborg.util.censored_defaults import CENSORED_REASONABLE_DEFAULTS
from pyborg.util.util_cli import mk_folder

from . import __version__

logger = logging.getLogger(__name__)

try:
    import nltk
    logger.debug("Got nltk!")
except ImportError:
    nltk = None
    logger.debug("No nltk, won't be using advanced part of speech tagging.")

try:
    import systemd.daemon
except ImportError:
    systemd = None
    logger.debug("no systemd support detected")


def filter_message(message: str, bot) -> str:
    """
    Filter a message body so it is suitable for learning from and
    replying to. This involves removing confusing characters,
    padding ? and ! with ". " so they also terminate lines
    and converting to lower case.
    """
    # to lowercase
    message = message.lower()

    # remove garbage
    message = message.replace("\"", "")  # remove "s
    message = message.replace("\n", " ")  # remove newlines
    message = message.replace("\r", " ")  # remove carriage returns

    # remove matching brackets (unmatched ones are likely smileys :-) *cough*
    # should except out when not found.
    index = 0
    try:
        while True:
            index = message.index("(", index)
            # Remove matching ) bracket
            i = message.index(")", index + 1)
            message = message[0:i] + message[i + 1:]
            # And remove the (
            message = message[0:index] + message[index + 1:]
    except ValueError as e:
        logger.debug("filter_message error: %s", e)

    message = message.replace(";", ",")
    message = message.replace("?", " ? ")
    message = message.replace("!", " ! ")
    message = message.replace(".", " . ")
    message = message.replace(",", " , ")
    message = message.replace("'", " ' ")
    message = message.replace(":", " : ")

    # Find ! and ? and append full stops.
    #   message = message.replace(". ", ".. ")
    #   message = message.replace("? ", "?. ")
    #   message = message.replace("! ", "!. ")

    #   And correct the '...'
    #   message = message.replace("..  ..  .. ", ".... ")

    words = message.split()
    for x in range(0, len(words)):
        # are there aliases?
        for z in bot.settings.aliases.keys():
            for alias in bot.settings.aliases[z]:
                pattern = "^%s$" % alias
                if re.search(pattern, words[x]):
                    words[x] = z

    message = " ".join(words)
    return message
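
# Worked example (illustrative; assumes a bot whose settings define no aliases):
#
#   >>> filter_message("Hello (world)! How are you?", bot)
#   'hello world ! how are you ?'
#
# The matched parentheses are stripped, punctuation is padded with spaces so it
# tokenizes as separate words, and the whole message is lower-cased.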


@attr.s
class FakeCfg2:
    """fake it until you make it"""

    # use factories so instances don't share one mutable default object
    aliases: dict = attr.ib(default=attr.Factory(dict))
    num_aliases: int = attr.ib(default=0)
    censored: List[str] = attr.ib(default=attr.Factory(lambda: list(CENSORED_REASONABLE_DEFAULTS)), repr=False)
    ignore_list: list = attr.ib(default=attr.Factory(list))
    max_words: int = attr.ib(default=6000)
    num_words: int = attr.ib(default=0)
    num_contexts: int = attr.ib(default=0)
    no_save: bool = attr.ib(default=False)
    learning: bool = attr.ib(default=True)

    def save(self) -> None:
        logger.debug("Settings save called. Current state: %s", self)
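

# For reference, the settings shim above is filled in from ``pyborg.toml``;
# pyborg.load_settings() further down writes a minimal file on first run that
# looks roughly like this (max_words = false means "use the 50000 default"):
#
#   [pyborg-core]
#   max_words = false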


class FakeAns:
    """this is a cool thing"""
    def __init__(self) -> None:
        self.sentences = {}


@attr.s
class InternalCommand:
    name: str = attr.ib()
    function: Callable[..., str] = attr.ib()
    help: Union[str, bool] = attr.ib()
    input: bool = attr.ib(default=False)

    def get_help(self):
        if self.help is True:
            # introspect the function here
            return self.function.__doc__
        return self.help


def checkdict(pyb: "PyborgExperimental") -> str:
    "Check for broken links in the dictionary"
    t = time.time()
    num_broken = 0
    num_bad = 0
    # iterate over a snapshot: words may be deleted as we go
    for w in list(pyb.words.keys()):
        wlist = pyb.words[w]
        for i in range(len(wlist) - 1, -1, -1):
            line_idx = wlist[i]['hashval']
            word_num = wlist[i]['index']
            # Nasty critical error we should fix
            if line_idx not in pyb.lines:
                logger.debug("Removing broken link '%s' -> %d", w, line_idx)
                num_broken += 1
                del wlist[i]
            else:
                # Check the pointed-to word is correct
                split_line = pyb.lines[line_idx][0].split()
                if split_line[word_num] != w:
                    logger.error("Line '%s' word %d is not '%s' as expected.", pyb.lines[line_idx][0], word_num, w)
                    num_bad += 1
                    del wlist[i]
        if len(wlist) == 0:
            del pyb.words[w]
            pyb.settings.num_words -= 1
            logger.info('"%s" vaped totally', w)

    output = "Checked dictionary in %0.2fs. Fixed links: %d broken, %d bad." % (time.time() - t, num_broken, num_bad)
    logger.info(output)
    return output


def known_command(pyb: "PyborgExperimental", word: str) -> str:
    if word in pyb.words:
        c = len(pyb.words[word])
        msg = "%s is known (%d contexts)" % (word, c)
    else:
        msg = "%s is unknown." % word
    return msg


def known2(pyb: "PyborgExperimental", words: List[str]) -> str:
    msg = "Number of contexts: "
    for x in words:
        if x in pyb.words:
            c = len(pyb.words[x])
            msg += x + "/" + str(c) + " "
        else:
            msg += x + "/0 "
    return msg
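
# Example of the two "known" helpers (hypothetical brain contents in which
# "hello" has 5 stored contexts and "xyzzy" is unknown):
#
#   >>> known_command(pyb, "hello")
#   'hello is known (5 contexts)'
#   >>> known2(pyb, ["hello", "xyzzy"])
#   'Number of contexts: hello/5 xyzzy/0 '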


def _internal_commands_generate() -> Dict[str, InternalCommand]:
    return {
        "checkdict": InternalCommand(name="checkdict", function=checkdict, help="check the brain for broken links (legacy)"),
        "known": InternalCommand(name="known", function=known_command, input=True, help=True),
    }


def _create_new_database() -> str:
    mk_folder()
    folder = click.get_app_dir("Pyborg")
    name = datetime.datetime.now().strftime("%m-%d-%y-auto-{}.pyborg.json").format(str(uuid.uuid4())[:4])
    brain_path = os.path.join(folder, "brains", name)
    logger.info("Error reading saves. New database created.")
    return brain_path


def PyborgBridge(brain: Any) -> "PyborgExperimental":
    "cheat and make an api mapping to the old one"
    mk_folder()
    logger.info("Reading dictionary...")
    try:
        their_pyb = PyborgExperimental.from_brain(brain)
    except (EOFError, IOError) as e:
        # Create new database
        logger.debug(e)
        folder = click.get_app_dir("Pyborg")
        name = datetime.datetime.now().strftime("%m-%d-%y-auto-{}.pyborg.json").format(str(uuid.uuid4())[:4])
        brain_path = Path(folder, "brains", name)
        logger.info("Error reading saves. New database created.")
        their_pyb = PyborgExperimental(brain=brain_path, words={}, lines={})
    return their_pyb



@attr.s
class PyborgExperimental:
    brain: Path = attr.ib()
    words: Dict[str, Dict[str, int]] = attr.ib()
    lines: Dict[int, Tuple[str, int]] = attr.ib()
    settings_file: Path = attr.ib(default=Path(click.get_app_dir("Pyborg"), "pyborg.toml"))
    # factories so instances don't share one mutable default
    settings: FakeCfg2 = attr.ib(default=attr.Factory(FakeCfg2))
    internal_commands: Dict[str, InternalCommand] = attr.ib(default=attr.Factory(_internal_commands_generate))
    ver_string: str = attr.ib(default=f"I am a version {__version__} Pyborg")
    saves_version: str = attr.ib(default="1.4.0")
    ready: bool = attr.ib(default=False)
    has_nltk: bool = attr.ib(init=False)

    def __attrs_post_init__(self) -> None:
        self.has_nltk = nltk is not None

    def __repr__(self) -> str:
        return f"{self.ver_string} with {len(self.words)} words and {len(self.lines)} lines. With a settings of: {self.settings}"

    def __str__(self) -> str:
        return self.ver_string

    def on_ready(self) -> None:
        """Does nothing! Implement or override. Used internally for systemd notify."""

    @classmethod
    def from_brain(cls, brain: Path) -> "PyborgExperimental":
        words, lines = pyborg.load_brain_json(brain)
        # READY can be sent here
        instance = cls(brain=brain, lines=lines, words=words)
        instance.on_ready()
        instance.ready = True
        return instance

    def make_reply(self, body: str) -> str:
        # stub in the experimental API; not implemented yet
        pass

    def learn(self, body: str) -> None:
        # stub in the experimental API; not implemented yet
        pass

    def save(self) -> None:
        """Save brain as 1.4.0 JSON-Unsigned format"""
        logger.info("Writing dictionary...")
        folder = click.get_app_dir("Pyborg")
        logger.info("Saving pyborg brain to %s", self.brain)
        cnt: collections.Counter = collections.Counter()
        for key, value in self.words.items():
            cnt[type(key)] += 1
            # cnt[type(value)] += 1
            for i in value:
                cnt[type(i)] += 1
        logger.debug("Types: %s", cnt)
        logger.debug("Words: %s", self.words)
        logger.debug("Lines: %s", self.lines)
        brain = {'version': self.saves_version, 'words': self.words, 'lines': self.lines}
        tmp_file = os.path.join(folder, "tmp", "current.pyborg.json")
        with open(tmp_file, 'w') as f:
            # this can fail half way...
            json.dump(brain, f)
        # if we didn't crash
        os.rename(tmp_file, self.brain)
        logger.debug("Successful writing of brain & renaming. Quitting.")

    def save_brain(self) -> None:
        "for bridging. should be removed."
        self.save()

class PyborgSystemdNotify(PyborgExperimental):
    def on_ready(self) -> None:
        systemd.daemon.notify('READY=1')


class PyborgEmptyJSON(Exception):
    pass


class PyborgNoBrainException(Exception):
    pass
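

# A minimal sketch of the experimental API above (the brain path is
# hypothetical; from_brain() raises if the file is missing or unreadable):
#
#   pyb = PyborgExperimental.from_brain(Path("~/example.pyborg.json").expanduser())
#   print(repr(pyb))  # version banner plus word/line counts
#   pyb.save()        # atomic-ish write: dump to tmp, then os.rename()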


class pyborg:
    ver_string = "I am a version 2.0.0 Pyborg"
    saves_version = "1.4.0"

    # Main command list
    commandlist = "Pyborg commands:\n!checkdict, !contexts, !help, !known, !learning, !rebuilddict, \
!replace, !unlearn, !purge, !version, !words, !limit, !alias, !save, !censor, !uncensor, !owner"
    commanddict = {
        "help": "Owner command. Usage: !help [command]\nPrints information about using a command, or a list of commands if no command is given",
        "version": "Usage: !version\nDisplay what version of Pyborg we are running",
        "words": "Usage: !words\nDisplay how many words are known",
        "known": "Usage: !known word1 [word2 [...]]\nDisplays if one or more words are known, and how many contexts are known",
        "contexts": "Owner command. Usage: !contexts <phrase>\nPrint contexts containing <phrase>",
        "unlearn": "Owner command. Usage: !unlearn <expression>\nRemove all occurrences of a word or expression from the dictionary. For example '!unlearn of of' would remove all contexts containing double 'of's",
        "purge": "Owner command. Usage: !purge [number]\nRemove all occurrences of the words that appear in fewer than <number> contexts",
        "replace": "Owner command. Usage: !replace <old> <new>\nReplace all occurrences of word <old> in the dictionary with <new>",
        "learning": "Owner command. Usage: !learning [on|off]\nToggle bot learning. Without arguments shows the current setting",
        "checkdict": "Owner command. Usage: !checkdict\nChecks the dictionary for broken links. Shouldn't happen, but worth trying if you get KeyError crashes",
        "rebuilddict": "Owner command. Usage: !rebuilddict\nRebuilds dictionary links from the lines of known text. Takes a while. You probably don't need to do it unless your dictionary is very screwed",
        "censor": "Owner command. Usage: !censor [word1 [...]]\nPrevent the bot using one or more words. Without arguments lists the currently censored words",
        "uncensor": "Owner command. Usage: !uncensor word1 [word2 [...]]\nRemove censorship on one or more words",
        "limit": "Owner command. Usage: !limit [number]\nSet the number of words that pyBorg can learn",
        "alias": "Owner command. Usage: !alias : Show the different aliases\n!alias <alias> : show the words attached to this alias\n!alias <alias> <word> : link the word to the alias",
        "owner": "Usage: !owner password\nAdd the user to the owner list",
    }

    @staticmethod
    def load_brain_2(brain_path: Union[str, Path]) -> Tuple[Dict, Dict]:
        """1.2.0 marshal.zip loader

        Returns tuple (words, lines)"""
        saves_version = b"1.2.0"
        try:
            zfile = zipfile.ZipFile(brain_path, 'r')
            for filename in zfile.namelist():
                data = zfile.read(filename)
                with open(filename, 'w+b') as f:
                    f.write(data)
        except (EOFError, IOError) as e:
            logger.debug(e)
            print("no zip found")
            logger.info("No archive.zip (pyborg brain) found.")
        with open("version", "rb") as vers, open("words.dat", "rb") as words_f, open("lines.dat", "rb") as lines_f:
            x = vers.read()
            logger.debug("Saves Version: %s", x)
            if x != saves_version:
                print("Error loading dictionary\nPlease convert it before launching pyborg")
                logger.error("Error loading dictionary\nPlease convert it before launching pyborg")
                logger.debug("Pyborg version: %s", saves_version)
                sys.exit(1)
            words = marshal.loads(words_f.read())
            lines = marshal.loads(lines_f.read())
        return words, lines

    @staticmethod
    def load_brain_json(brain_path: Union[Path, str]) -> Tuple[Dict[str, int], Dict[int, Tuple[str, int]]]:
        """Load the new format"""
        saves_version = u"1.4.0"
        # folder = click.get_app_dir("Pyborg")
        logger.debug("Trying to open brain %s", brain_path)
        with open(brain_path) as f:
            raw_json = f.read()
        logger.debug(raw_json)
        try:
            brain = json.loads(raw_json)
        except json.decoder.JSONDecodeError as e:
            logger.exception(e)
            logger.info("Tried to open brain %s", brain_path)
            # if the file is just empty (for instance a tempfile from `tempfile`),
            # record it and raise a less scary error.
            # wrapping Paths is fine apparently... probably nasty but whatever
            if Path(brain_path).stat().st_size < 2:
                raise PyborgEmptyJSON from e
            else:
                raise e
        if brain['version'] == saves_version:
            logger.debug(brain['lines'])
            lines = {int(x): y for x, y in brain['lines'].items()}
            return brain['words'], lines
        else:
            print("Error loading dictionary\nPlease convert it before launching pyborg")
            logger.error("Error loading dictionary\nPlease convert it before launching pyborg")
            logger.debug("Pyborg version: %s", saves_version)
            raise PyborgNoBrainException()

    def save_brain(self) -> None:
        """Save brain as 1.4.0 JSON-Unsigned format"""
        logger.info("Writing dictionary...")
        saves_version = u"1.4.0"
        folder = click.get_app_dir("Pyborg")
        logger.info("Saving pyborg brain to %s", self.brain_path)
        cnt = collections.Counter()
        for key, value in self.words.items():
            cnt[type(key)] += 1
            # cnt[type(value)] += 1
            for i in value:
                cnt[type(i)] += 1
        logger.debug("Types: %s", cnt)
        logger.debug("Words: %s", self.words)
        logger.debug("Lines: %s", self.lines)
        brain = {'version': saves_version, 'words': self.words, 'lines': self.lines}
        tmp_file = os.path.join(folder, "tmp", "current.pyborg.json")
        with open(tmp_file, 'w') as f:
            # this can fail half way...
            json.dump(brain, f)
        # if we didn't crash
        os.rename(tmp_file, self.brain_path)
        logger.debug("Successful writing of brain & renaming.")

    def save_all(self) -> None:
        "Legacy wrapper for save_brain()"
        self.save_brain()

    def load_settings(self) -> FakeCfg2:
        toml_path = os.path.join(click.get_app_dir("Pyborg"), "pyborg.toml")
        if os.path.exists(click.get_app_dir("Pyborg")) and not os.path.exists(toml_path):
            settings = {'pyborg-core': {"max_words": False}}
            with open(toml_path, "w") as f:
                toml.dump(settings, f)
        d = toml.load(toml_path)['pyborg-core']
        if d['max_words']:
            cfg = FakeCfg2(max_words=d['max_words'])
        else:
            cfg = FakeCfg2(max_words=50000)
        return cfg

    def __repr__(self) -> str:
        return "{} with {} words and {} lines. With a settings of: {}".format(self.ver_string, len(self.words), len(self.lines), self.settings)

    def __init__(self, brain: Union[str, Path, None] = None) -> None:
        """
        Open the dictionary. Resize as required.
        """
        self.settings = self.load_settings()
        self.answers = FakeAns()
        self.unfilterd = {}
        mk_folder()
        # Read the dictionary
        logger.info("Reading dictionary...")
        if brain is None:
            self.brain_path = 'archive.zip'
        else:
            self.brain_path = brain
        try:
            self.words, self.lines = self.load_brain_json(self.brain_path)
        except (EOFError, IOError, json.decoder.JSONDecodeError, PyborgEmptyJSON) as e:
            # Create new database
            self.words = {}
            self.lines = {}
            logger.error(e)
            folder = click.get_app_dir("Pyborg")
            name = datetime.datetime.now().strftime("%m-%d-%y-auto-{}.pyborg.json").format(str(uuid.uuid4())[:4])
            self.brain_path = os.path.join(folder, "brains", name)
            logger.info("Error reading saves. New database created.")

        # Is a resizing required?
        if len(self.words) != self.settings.num_words:
            logger.info("Updating dictionary information...")
            self.settings.num_words = len(self.words)
            num_contexts = 0
            # Get number of contexts
            for x in self.lines.keys():
                num_contexts += len(self.lines[x][0].split())
            self.settings.num_contexts = num_contexts
            # Save new values
            self.settings.save()

        # Is an aliases update required?
        compteur = 0
        for x in self.settings.aliases.keys():
            compteur += len(self.settings.aliases[x])
        if compteur != self.settings.num_aliases:
            logger.info("check dictionary for new aliases")
            self.settings.num_aliases = compteur
            # iterate over snapshots: replace()/unlearn() mutate self.words
            for x in list(self.words.keys()):
                # are there aliases?
                if x[0] != '~':
                    for z in self.settings.aliases.keys():
                        for alias in self.settings.aliases[z]:
                            pattern = "^%s$" % alias
                            if re.search(pattern, x):
                                logger.info("replace %s with %s", x, z)
                                self.replace(x, z)
            for x in list(self.words.keys()):
                if not (x in self.settings.aliases.keys()) and x[0] == '~':
                    logger.info("unlearn %s", x)
                    self.settings.num_aliases -= 1
                    self.unlearn(x)
                    logger.info("unlearned aliases %s", x)

        # unlearn words in the unlearn.txt file.
        try:
            with open("unlearn.txt", 'r') as f:
                for line in f.readlines():
                    self.unlearn(line)
        except (EOFError, IOError):
            logger.debug("No words to unlearn")

        self.settings.save()

    def save_all_2(self) -> None:
        "legacy api"
        if not self.settings.no_save:
            print("Writing dictionary...")
            try:
                zfile = zipfile.ZipFile(self.brain_path, 'r')
                for filename in zfile.namelist():
                    data = zfile.read(filename)
                    with open(filename, 'w+b') as f:
                        f.write(data)
            except (OSError, IOError):
                print("no zip found. Is the program launched for the first time?")

            with open("words.dat", "wb") as f:
                f.write(marshal.dumps(self.words))
            with open("lines.dat", "wb") as f:
                f.write(marshal.dumps(self.lines))
            # save the version
            with open('version', 'w') as f:
                f.write(self.saves_version)
            # zip the files
            with zipfile.ZipFile(self.brain_path, "w") as f:
                f.write('words.dat')
                f.write('lines.dat')
                f.write('version')
            try:
                os.remove('words.dat')
                os.remove('lines.dat')
                os.remove('version')
            except (OSError, IOError):
                print("could not remove the files")

            with open("words.txt", "w") as f:
                # write each known word
                # Sort the list before exporting
                wordlist = []
                for key in self.words:
                    wordlist.append([key, len(self.words[key])])
                wordlist.sort(key=lambda x: x[1])
                for entry in wordlist:
                    f.write(str(entry[0]) + "\n\r")

            with open("sentences.txt", "w") as f:
                # write each unfiltered sentence
                # Sort the list before exporting
                wordlist = []
                for key in self.unfilterd:
                    wordlist.append([key, self.unfilterd[key]])
                # wordlist.sort(lambda x, y: cmp(y[1], x[1]))
                wordlist.sort(key=lambda x: x[1])
                for entry in wordlist:
                    f.write(str(entry[0]) + "\n")

            # Save settings
            self.settings.save()

    def process_msg(self, io_module, body, replyrate, learn: int, args, owner=0) -> None:
        """
        Process message 'body' and pass back to IO module with args.
        If owner==1 allow owner commands.
        """
        logger.debug("process_msg: %s", locals())

        # add trailing space so sentences are broken up correctly
        body = body + " "

        # Parse commands
        if body[0] == "!":
            logger.debug("sending do_commands...")
            self.do_commands(io_module, body, args, owner)
            return

        # Filter out garbage and do some formatting
        body = filter_message(body, self)

        # Learn from input
        if learn == 1:
            self.learn(body)

        # Make a reply if desired
        if randint(0, 99) < int(replyrate):
            message = ""
            # Look if we can find a prepared answer
            for sentence in self.answers.sentences.keys():
                pattern = "^%s$" % sentence
                if re.search(pattern, body):
                    message = self.answers.sentences[sentence][randint(0, len(self.answers.sentences[sentence]) - 1)]
                    break
            else:
                if body in self.unfilterd:
                    self.unfilterd[body] = self.unfilterd[body] + 1
                else:
                    self.unfilterd[body] = 0

            if message == "":
                # reply() can return None (censored/empty); normalize to ""
                message = self.reply(body) or ""

            # single word reply: always output
            if len(message.split()) == 1:
                io_module.output(message, args)
                return
            # empty. do not output
            if message == "":
                return
            # else output
            if owner == 0:
                time.sleep(.2 * len(message))
            io_module.output(message, args)

    def do_commands(self, io_module, body: str, args, owner: int) -> None:
        """
        Respond to user commands.
        """
        msg = ""
        command_list = body.split()
        logger.debug("do_commands.command_list: %s", command_list)
        command_list[0] = command_list[0].lower()

        # Guest commands.
        # Version string
        if command_list[0] == "!version":
            msg = self.ver_string

        # How many words do we know?
        elif command_list[0] == "!words":
            num_w = self.settings.num_words
            num_c = self.settings.num_contexts
            num_l = len(self.lines)
            if num_w != 0:
                num_cpw = num_c / float(num_w)  # contexts per word
            else:
                num_cpw = 0.0
            msg = "I know %d words (%d contexts, %.2f per word), %d lines." % (num_w, num_c, num_cpw, num_l)

        # Owner commands
        if owner == 1:
            # Save dictionary
            if command_list[0] == "!save":
                self.save_all()
                msg = "Dictionary saved"

            # Command list
            elif command_list[0] == "!help":
                if len(command_list) > 1:
                    # Help for a specific command
                    cmd = command_list[1].lower()
                    dic = None
                    if cmd in self.commanddict.keys():
                        dic = self.commanddict
                    elif cmd in io_module.commanddict.keys():
                        dic = io_module.commanddict
                    if dic:
                        for i in dic[cmd].split("\n"):
                            io_module.output(i, args)
                    else:
                        msg = "No help on command '%s'" % cmd
                else:
                    for i in self.commandlist.split("\n"):
                        io_module.output(i, args)
                    for i in io_module.commandlist.split("\n"):
                        io_module.output(i, args)

            # Change the max_words setting
            elif command_list[0] == "!limit":
                msg = "The max limit is "
                if len(command_list) == 1:
                    msg += str(self.settings.max_words)
                else:
                    limit = int(command_list[1].lower())
                    self.settings.max_words = limit
                    msg += "now " + command_list[1]

            # Rebuild the dictionary by discarding the word links and
            # re-parsing each line
            elif command_list[0] == "!rebuilddict":
                if self.settings.learning == 1:
                    t = time.time()
                    old_lines = self.lines
                    old_num_words = self.settings.num_words
                    old_num_contexts = self.settings.num_contexts
                    self.words = {}
                    self.lines = {}
                    self.settings.num_words = 0
                    self.settings.num_contexts = 0
                    for k in old_lines.keys():
                        self.learn(old_lines[k][0], old_lines[k][1])
                    msg = "Rebuilt dictionary in %0.2fs. Words %d (%+d), contexts %d (%+d)" % (
                        time.time() - t,
                        old_num_words,
                        self.settings.num_words - old_num_words,
                        old_num_contexts,
                        self.settings.num_contexts - old_num_contexts)

            # Remove rare words
            elif command_list[0] == "!purge":
                t = time.time()
                if len(command_list) == 2:
                    # maximum number of occurrences to remove
                    c_max = int(command_list[1])
                else:
                    c_max = 0
                number_removed = self.purge(c_max, io_module=io_module)
                msg = "Purged dictionary in %0.2fs. %d words removed" % (time.time() - t, number_removed)

            # Change a typo in the dictionary
            elif command_list[0] == "!replace":
                if len(command_list) < 3:
                    return
                old = command_list[1].lower()
                new = command_list[2].lower()
                msg = self.replace(old, new)

            # Print contexts [flooding...:-]
            elif command_list[0] == "!contexts":
                # This is a large lump of data and should
                # probably be printed, not module.output XXX

                # build context we are looking for
                context = " ".join(command_list[1:])
                context = context.lower()
                if context == "":
                    return
                io_module.output("Contexts containing \"" + context + "\":", args)
                # Build context list
                # Pad it
                context = " " + context + " "
                c = []
                # Search through contexts
                for x in self.lines.keys():
                    # get context
                    ctxt = self.lines[x][0]
                    # add leading whitespace for easy sloppy search code
                    ctxt = " " + ctxt + " "
                    if ctxt.find(context) != -1:
                        # Avoid duplicates (2 of a word
                        # in a single context)
                        if len(c) == 0:
                            c.append(self.lines[x][0])
                        elif c[len(c) - 1] != self.lines[x][0]:
                            c.append(self.lines[x][0])
                x = 0
                while x < 5:
                    if x < len(c):
                        io_module.output(c[x], args)
                    x += 1
                if len(c) == 5:
                    return
                if len(c) > 10:
                    io_module.output("...({} skipped)...".format(len(c) - 10), args)
                x = len(c) - 5
                if x < 5:
                    x = 5
                while x < len(c):
                    io_module.output(c[x], args)
                    x += 1

            # Remove a word from the vocabulary [use with care]
            elif command_list[0] == "!unlearn":
                # build context we are looking for
                context = " ".join(command_list[1:])
                context = context.lower()
                if context == "":
                    return
                print("Looking for: " + context)
                # Unlearn contexts containing 'context'
                t = time.time()
                self.unlearn(context)
                # we don't actually check if anything was
                # done..
                msg = "Unlearn done in %0.2fs" % (time.time() - t)

            # Query/toggle bot learning
            elif command_list[0] == "!learning":
                msg = "Learning mode "
                if len(command_list) == 1:
                    if self.settings.learning == 0:
                        msg += "off"
                    else:
                        msg += "on"
                else:
                    toggle = command_list[1].lower()
                    if toggle == "on":
                        msg += "on"
                        self.settings.learning = 1
                    else:
                        msg += "off"
                        self.settings.learning = 0

            # add a word to the 'censored' list
            elif command_list[0] == "!censor":
                # no arguments. list censored words
                if len(command_list) == 1:
                    if len(self.settings.censored) == 0:
                        msg = "No words censored"
                    else:
                        msg = "I will not use the word(s) %s" % ", ".join(self.settings.censored)
                # add every word listed to censored list
                else:
                    for x in range(1, len(command_list)):
                        if command_list[x] in self.settings.censored:
                            msg += "%s is already censored" % command_list[x]
                        else:
                            self.settings.censored.append(command_list[x].lower())
                            self.unlearn(command_list[x])
                            msg += "done"
                        msg += "\n"

            # remove a word from the censored list
            elif command_list[0] == "!uncensor":
                # Remove everyone listed from the ignore list
                # eg !unignore tom dick harry
                for x in range(1, len(command_list)):
                    try:
                        self.settings.censored.remove(command_list[x].lower())
                        msg = "done"
                    except ValueError as e:
                        logger.exception(e)

            elif command_list[0] == "!alias":
                # no arguments. list aliased words
                if len(command_list) == 1:
                    if len(self.settings.aliases) == 0:
                        msg = "No aliases"
                    else:
                        msg = "I will alias the word(s) %s" % ", ".join(self.settings.aliases.keys())
                # show the words attached to the given alias
                elif len(command_list) == 2:
                    if command_list[1][0] != '~':
                        command_list[1] = '~' + command_list[1]
                    if command_list[1] in self.settings.aliases.keys():
                        msg = "Those words: %s are aliases to %s" % (" ".join(self.settings.aliases[command_list[1]]), command_list[1])
                    else:
                        msg = "The alias %s is not known" % command_list[1][1:]
                elif len(command_list) > 2:
                    # create the aliases
                    msg = "The words: "
                    if command_list[1][0] != '~':
                        command_list[1] = '~' + command_list[1]
                    if not (command_list[1] in self.settings.aliases.keys()):
                        self.settings.aliases[command_list[1]] = [command_list[1][1:]]
                        self.replace(command_list[1][1:], command_list[1])
                        msg += command_list[1][1:] + " "
                    for x in range(2, len(command_list)):
                        msg += "%s " % command_list[x]
                        self.settings.aliases[command_list[1]].append(command_list[x])
                        # replace each word with its alias
                        self.replace(command_list[x], command_list[1])
                    msg += "have been aliased to %s" % command_list[1]

            # Quit
            elif command_list[0] == "!quit":
                # Close the dictionary
                self.save_all()
                sys.exit()

        # Save changes
        self.settings.save()

        logger.info(msg)
        if msg != "":
            io_module.output(msg, args)

    def replace(self, old: str, new: str) -> str:
        """
        Replace all occurrences of 'old' in the dictionary with
        'new'. Nice for fixing learnt typos.
        """
        try:
            pointers = self.words[old]
        except KeyError:
            return old + " not known."
        changed = 0

        for x in pointers:
            # pointers consist of {hashval, index} links into self.lines
            l = x['hashval']  # noqa: E741
            w = x['index']
            line = self.lines[l][0].split()
            number = self.lines[l][1]
            if line[w] != old:
                # fucked dictionary
                print("Broken link: %s %s" % (x, self.lines[l][0]))
                continue
            line[w] = new
            self.lines[l][0] = " ".join(line)
            self.lines[l][1] += number
            changed += 1

        if new in self.words:
            self.settings.num_words -= 1
            self.words[new].extend(self.words[old])
        else:
            self.words[new] = self.words[old]
        del self.words[old]
        return "%d instances of %s replaced with %s" % (changed, old, new)

    def purge(self, max_contexts: int, io_module=None) -> int:
        "Remove rare words from the dictionary. Returns number of words removed."
        liste = []
        compteur = 0

        for w in self.words.keys():
            digit = 0
            char = 0
            for c in w:
                if c.isalpha():
                    char += 1
                if c.isdigit():
                    digit += 1

            # count the words below this limit
            c = len(self.words[w])
            if c < 2 or (digit and char):
                liste.append(w)
                compteur += 1
                if compteur == max_contexts:
                    break

        if max_contexts < 1:
            # io_module.output(str(compteur)+" words to remove", args)
            if io_module:
                # I'm not gonna pass pyborg.process.args. This breaks the api technically.
                io_module.output("%s words to remove" % compteur, [])

        # remove the words
        for w in liste:
            self.unlearn(w)
        return len(liste)

    def unlearn(self, context: str) -> None:
        """
        Unlearn all contexts containing 'context'. If 'context'
        is a single word then all contexts containing that word
        will be removed, just like the old !unlearn <word>
        """
        # Pad thing to look for
        # We pad so we don't match 'shit' when searching for 'hit', etc.
        context = " " + context + " "
        # Search through contexts
        # count deleted items
        dellist = []
        # words that will have broken context due to this
        wordlist = []
        for x in self.lines.copy().keys():
            # get context. pad
            c = " " + self.lines[x][0] + " "
            if c.find(context) != -1:
                # Split line up
                wlist = self.lines[x][0].split()
                # add touched words to list
                for w in wlist:
                    if w not in wordlist:
                        wordlist.append(w)
                dellist.append(x)
                del self.lines[x]
        words = self.words
        # update links
        for x in wordlist:
            word_contexts = words[x]
            # Check all the word's links (backwards so we can delete)
            for y in range(len(word_contexts) - 1, -1, -1):
                # Check for any of the deleted contexts
                hashval = word_contexts[y]['hashval']
                if hashval in dellist:
                    del word_contexts[y]
                    self.settings.num_contexts -= 1
            if len(words[x]) == 0:
                del words[x]
                self.settings.num_words -= 1
                logger.info(f" \"{x}\" vaped totally")

    def _is_censored(self, word: str) -> bool:
        """DRY: check a word against the censored-patterns list."""
        for censored in self.settings.censored:
            if re.search(censored, word):
                logger.debug(f"word is censored: {word}")
                return True
        return False

    def reply(self, body) -> Optional[str]:
        """
        Reply to a line of text.
        """
        # split sentences into list of words
        _words = body.split(" ")
        words = []
        for i in _words:
            words += i.split()

        if len(words) == 0:
            logger.debug("Did not find any words to reply to.")
            return None

        # remove words on the ignore list
        words = [x for x in words if x not in self.settings.ignore_list and not x.isdigit()]
        logger.debug("reply: cleaned words: %s", words)
        # Find rarest word (excluding those unknown)
        index = []
        known = -1
        # The word has to have been seen in at least 3 different contexts to be chosen
        known_min = 3
        for w in words:
            logger.debug("known_loop: locals: %s", locals())
            if w in self.words:
                k = len(self.words[w])
                logger.debug("known_loop: k?? %s", k)
            else:
                continue
            if (known == -1 or k < known) and k > known_min:
                index = [w]
                known = k
                continue
            elif k == known:
                index.append(w)
                continue
        # Index now contains list of rarest known words in sentence
        # index = words
        # def find_known_words(words):
        #     d = dict()
        #     for w in words:
        #         if w in self.words:
        #             logger.debug(self.words[w])
        #             k = len(self.words[w])
        #             d[w] = k
        #     logger.debug("find_known_words: %s", d)
        #     idx = [x for x, y in d.items() if y > 3]
        #     logger.debug("find_known_words: %s", idx)
        #     return idx
        # index = find_known_words(words)
        if len(index) == 0:
            logger.debug("No words with at least 3 contexts were found.")
            logger.debug("reply:index: %s", index)
            return ""

        # Begin experimental NLP code
        def weight(pos: str) -> int:
            """Takes a POS tag and assigns a weight

            New: doubled the weights in 1.4"""
            lookup = {"NN": 8, "NNP": 10, "RB": 4, "NNS": 6, "NNPS": 10}
            try:
                ret = lookup[pos]
            except KeyError:
                ret = 2
            return ret

        def _mappable_nick_clean(pair: Tuple[str, str]) -> Tuple[str, int]:
            "mappable weight apply but with shortcut for #nick"
            word, pos = pair
            if word == "#nick":
                comp_weight = 1
            else:
                comp_weight = weight(pos)
            return (word, comp_weight)

        if nltk:
            # uses punkt
            tokenized = nltk.tokenize.casual.casual_tokenize(body)
            # uses averaged_perceptron_tagger
            tagged = nltk.pos_tag(tokenized)
            logger.info(tagged)
            weighted_choices = list(map(_mappable_nick_clean, tagged))
            population = [val for val, cnt in weighted_choices for i in range(cnt)]
            word = random.choice(population)
            # make sure the word is known
            counter = 0
            while word not in self.words and counter < 200:
                word = random.choice(population)
                counter += 1
            logger.debug("Ran choice %d times", counter)
        else:
            word = index[randint(0, len(index) - 1)]

        # Build sentence backwards from "chosen" word
        if self._is_censored(word):
            logger.debug("chosen word: %s***%s is censored. ignoring.", word[0], word[-1])
            return None
        sentence = [word]
        done = 0
        while done == 0:
            # create a dictionary which will contain all the words we can find before the "chosen" word
            pre_words = {"": 0}
            # this is to prevent the case where we have an ignore_listed word
            word = str(sentence[0].split(" ")[0])
            for x in range(0, len(self.words[word]) - 1):
                logger.debug(locals())
                logger.debug('trying to unpack: %s', self.words[word][x])
                l = self.words[word][x]['hashval']  # noqa: E741
                w = self.words[word][x]['index']
                context = self.lines[l][0]
                num_context = self.lines[l][1]
                cwords = context.split()
                # if the word is not the first of the context, look at the previous one
                if cwords[w] != word:
                    print(context)
                if w:
                    # look if we can find a pair with the chosen word and the previous one
                    if len(sentence) > 1 and len(cwords) > w + 1:
                        if sentence[1] != cwords[w + 1]:
                            continue
                    # if the word is in ignore_list, look at the previous word
                    look_for = cwords[w - 1]
                    if look_for in self.settings.ignore_list and w > 1:
                        look_for = cwords[w - 2] + " " + look_for
                    # save how many times we can find each word
                    if look_for not in pre_words:
                        pre_words[look_for] = num_context
                    else:
                        pre_words[look_for] += num_context
                else:
                    pre_words[""] += num_context

            # Sort the words
            liste = list(pre_words.items())  # this is a view in py3
            liste.sort(key=lambda x: x[1])

            numbers = [liste[0][1]]
            for x in range(1, len(liste)):
                numbers.append(liste[x][1] + numbers[x - 1])

            # take one of them from the list (randomly)
            mot = randint(0, numbers[len(numbers) - 1])
            for x in range(0, len(numbers)):
                if mot <= numbers[x]:
                    mot = liste[x][0]
                    break

            # if the word is already chosen, pick the next one
            while mot in sentence:
                x += 1
                if x >= len(liste) - 1:
                    mot = ''
                    break
                logger.info("the choosening: %s", liste[x])
                mot = liste[x][0]

            # logger.debug("mot1: %s", len(mot))
            mot = mot.split()
            mot.reverse()
            if mot == []:
                done = 1
            else:
                list(map((lambda x: sentence.insert(0, x)), mot))

        pre_words = sentence
        sentence = sentence[-2:]

        # Now build sentence forwards from "chosen" word

        # We've got:
        #   cwords:    ... cwords[w-1]  cwords[w]     cwords[w+1]  cwords[w+2]
        #   sentence:  ... sentence[-2] sentence[-1]  look_for     look_for ?
        # Given a known cwords[w], and maybe a known cwords[w-1], we are looking
        # for the cwords[w+1] to choose. cwords[w+2] is needed when cwords[w+1]
        # is in the ignored list.
        done = 0
        while done == 0:
            # create a dictionary which will contain all the words we can find after the "chosen" word
            post_words = {"": 0}
            word = str(sentence[-1].split(" ")[-1])
            for x in range(0, len(self.words[word])):
                l = self.words[word][x]['hashval']  # noqa: E741
                w = self.words[word][x]['index']
                context = self.lines[l][0]
                num_context = self.lines[l][1]
                cwords = context.split()
                # look if we can find a pair with the chosen word and the next one
                if len(sentence) > 1:
                    if sentence[len(sentence) - 2] != cwords[w - 1]:
                        continue
                if w < len(cwords) - 1:
                    # if the word is in ignore_list, look at the next word
                    look_for = cwords[w + 1]
                    if (look_for in self.settings.ignore_list or look_for in self.settings.censored) and w < len(cwords) - 2:
                        look_for = look_for + " " + cwords[w + 2]
                    if look_for not in post_words:
                        post_words[look_for] = num_context
                    else:
                        post_words[look_for] += num_context
                else:
                    post_words[""] += num_context
            # Sort the words
            liste = list(post_words.items())
            liste.sort(key=lambda x: x[1])
            numbers = [liste[0][1]]
            for x in range(1, len(liste)):
                numbers.append(liste[x][1] + numbers[x - 1])

            # take one of them from the list (randomly)
            mot = randint(0, numbers[len(numbers) - 1])
            for x in range(0, len(numbers)):
                if mot <= numbers[x]:
                    mot = liste[x][0]
                    break

            x = -1
            while mot in sentence:
                x += 1
                if x >= len(liste) - 1:
                    mot = ''
                    break
                mot = liste[x][0]

            # logger.debug("mot2: %s", len(mot))
            mot = mot.split()
            if mot == []:
                done = 1
            else:
                list(map(lambda x: sentence.append(x), mot))

        sentence = pre_words[:-2] + sentence  # this seems bogus? how does this work???

        # Replace aliases
        for x in range(0, len(sentence)):
            if sentence[x][0] == "~":
                sentence[x] = sentence[x][1:]

        # Insert space between each word
        list(map((lambda x: sentence.insert(1 + x * 2, " ")), range(0, len(sentence) - 1)))

        # correct the ' & , spaces problem
        # the code is not very good and can be improved, but it does its job...
        for x in range(0, len(sentence)):
            if sentence[x] == "'":
                sentence[x - 1] = ""
                sentence[x + 1] = ""
            if sentence[x] == ",":
                sentence[x - 1] = ""

        # logger.debug("final locals: %s", locals())
        # yolo
        for w in sentence:
            if self._is_censored(w):
                logger.debug(f"word in sentence: {w[0]}***{w[-1]} is censored. escaping.")
                return None

        final = "".join(sentence)
        return final

    def learn(self, body: str, num_context: int = 1) -> None:
        """
        Lines should be cleaned (filter_message()) before passing
        to this.
        """
        def learn_line(body: str, num_context: int) -> None:
            """
            Learn from a sentence.
            nb: there is a closure here...
            """
            logger.debug("entering learn_line")
            if nltk:
                words = nltk.word_tokenize(body)
            else:
                words = body.split()
            # Ignore sentences of < 1 words XXX was <3
            if len(words) < 1:
                return

            # voyelles = "aàâeéèêiîïoöôuüûy"
            voyelles = "aeiouy"
            logger.debug("reply:learn_line:words: %s", words)
            for x in range(0, len(words)):
                nb_voy = 0
                digit = 0
                char = 0
                for c in words[x]:
                    if c in voyelles:
                        nb_voy += 1
                    if c.isalpha():
                        char += 1
                    if c.isdigit():
                        digit += 1
                for censored in self.settings.censored:
                    if re.search(censored, words[x]):
                        logger.debug("word: %s***%s is censored. escaping.", words[x][0], words[x][-1])
                        return
                if len(words[x]) > 13 \
                        or (((nb_voy * 100) / len(words[x]) < 26) and len(words[x]) > 5) \
                        or (char and digit) \
                        or (words[x] in self.words) == 0 and self.settings.learning == 0:
                    # if a word has more than 13 characters, don't learn it
                    # (in French, this represents 12% of the words)
                    # and don't learn words with less than 25% vowels
                    # don't learn the sentence if one word is censored
                    # don't learn either if there are digits and letters in the word
                    # same if learning is off
                    logger.debug("reply:learn_line: Bailing because reasons?")
                    return
                elif "-" in words[x] or "_" in words[x]:
                    words[x] = "#nick"

            num_w = self.settings.num_words
            if num_w != 0:
                num_cpw = self.settings.num_contexts / float(num_w)  # contexts per word
            else:
                num_cpw = 0

            cleanbody = " ".join(words)

            # Hash collisions we don't care about. 2^32 is big :-)
            # Ok so this takes a bytes object... in python3 that's a pain
            cleanbody_b = bytes(cleanbody, "utf-8")
            # ok so crc32 got changed in 3...
            hashval = crc32(cleanbody_b) & 0xffffffff
            logger.debug(hashval)
            # Check context isn't already known
            if hashval not in self.lines:
                if not (num_cpw > 100 and self.settings.learning == 0):
                    self.lines[hashval] = [cleanbody, num_context]
                    # Add link for each word
                    for i, word in enumerate(words):
                        if word in self.words:
                            # Add entry. (line number, word number)
                            self.words[word].append({"hashval": hashval, "index": i})
                        else:
                            self.words[word] = [{"hashval": hashval, "index": i}]
                            self.settings.num_words += 1
                        self.settings.num_contexts += 1
            else:
                self.lines[hashval][1] += num_context

            # if max_words reached, don't learn more
            if self.settings.num_words >= self.settings.max_words:
                self.settings.learning = False

        # Split body text into sentences and parse them
        # one by one.
        body += " "
        logger.debug("reply:replying to %s", body)
        # map ( (lambda x : learn_line(self, x, num_context)), body.split(". "))
        for part in body.split('. '):
            learn_line(part, num_context)
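

# For orientation, learn() above produces the two in-memory structures that
# save_brain() serializes. A toy brain (hypothetical hash value) looks like:
#
#   lines = {2386271160: ["hello world", 1]}           # crc32(text) -> [text, count]
#   words = {
#       "hello": [{"hashval": 2386271160, "index": 0}],
#       "world": [{"hashval": 2386271160, "index": 1}],
#   }                                                   # word -> links into lines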