Source code for clgen.corpus

#
# Copyright 2016, 2017 Chris Cummins <[email protected]>.
#
# This file is part of CLgen.
#
# CLgen is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CLgen is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with CLgen.  If not, see <http://www.gnu.org/licenses/>.
#
"""
Manipulating and handling training corpora.
"""
import re
import codecs
import numpy as np

from checksumdir import dirhash
from collections import Counter
from copy import deepcopy
from labm8 import crypto
from labm8 import fs
from labm8 import jsonutil
from labm8 import lockfile
from labm8 import tar
from labm8 import types
from six.moves import cPickle
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from time import time

import clgen
from clgen import atomizer
from clgen import clutil
from clgen import dbutil
from clgen import explore
from clgen import features
from clgen import fetch
from clgen import log
from clgen import preprocess


# Default options used for corpus. Any values provided by the user will override
# these defaults.
DEFAULT_CORPUS_OPTS = {
    "eof": False,
    "batch_size": 50,
    "seq_length": 50,
    "vocabulary": "char",
    "encoding": "default",
    "preserve_order": False,
}
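

# Illustrative usage sketch, not part of the original module: any keyword
# argument passed to Corpus() overrides the corresponding default above, and
# unknown keys raise clgen.UserError. The content ID and path below are
# hypothetical placeholders.
def _example_corpus_opts() -> None:
    corpus = Corpus("example-contentid",   # hypothetical content ID
                    path="~/kernels",      # hypothetical directory of sources
                    seq_length=100,        # override the default of 50
                    eof=True)              # insert '// EOF' markers between files
    log.info(corpus)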


class FeaturesError(clgen.CLgenError):
    """
    Thrown in case of error during features encoding.
    """
    pass


def unpack_directory_if_needed(path: str) -> str:
    """
    If path is a tarball, unpack it. If path doesn't exist but there is a
    tarball with the same name, unpack it.

    Arguments:
        path (str): Path to directory or tarball.

    Returns:
        str: Path to directory.

    Raises:
        clgen.InternalError: If unable to extract archive.
    """
    if fs.isdir(path):
        return path

    if fs.isfile(path) and path.endswith(".tar.bz2"):
        log.info("unpacking '{}'".format(path))
        tar.unpack_archive(path)
        return re.sub(r'.tar.bz2$', '', path)

    if fs.isfile(path + ".tar.bz2"):
        log.info("unpacking '{}'".format(path + ".tar.bz2"))
        tar.unpack_archive(path + ".tar.bz2")
        return path

    raise clgen.InternalError("cannot interpret archive '{path}'"
                              .format(**vars()))


def get_atomizer(corpus: str, vocab: str="char") -> atomizer.Atomizer:
    """
    Get atomizer for a corpus.

    Arguments:
        corpus (str): Corpus.
        vocab (str, optional): Vocabulary type.

    Returns:
        atomizer.Atomizer: Atomizer.
    """
    atomizers = {
        "char": atomizer.CharacterAtomizer,
        "greedy": atomizer.GreedyAtomizer,
    }
    atomizerclass = atomizers.get(vocab, None)
    if atomizerclass is None:
        raise clgen.UserError(
            "Unknown vocabulary type '{bad}'. Supported values: {good}".format(
                bad=vocab, good=", ".join(sorted(atomizers.keys()))))
    else:
        return atomizerclass.from_text(corpus)


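# Illustrative usage sketch, not part of the original module: build a
# character-level atomizer from a small in-memory corpus and encode a string
# with it. The example source text is made up.
def _example_get_atomizer() -> None:
    text = "__kernel void A(__global float* a) { a[0] = 1; }"
    a = get_atomizer(text, vocab="char")
    encoded = a.atomize(text)  # encode text into vocabulary indices
    log.info("vocab size", a.vocab_size, "encoded length", len(encoded))

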
def get_features(code: str, **kwargs) -> np.array:
    """
    Get features for code.

    Arguments:
        code (str): Source code.
        **kwargs (dict, optional): Arguments to features.features()

    Returns:
        np.array: Feature values.
    """
    with NamedTemporaryFile() as outfile:
        outfile.write(code.encode("utf-8"))
        outfile.seek(0)
        f = features.to_np_arrays([outfile.name], **kwargs)
    if len(f) != 1:
        log.error("features:", f)
        raise FeaturesError("code contains more than one kernel")
    return f[0]


def encode(kernels_db: str, encoding: str) -> None:
    """
    Encode a kernels database.

    Arguments:
        kernels_db (str): Path to kernels database.
        encoding (str): Encoding type.
    """
    def _default(kernels_db: str) -> None:
        pass

    def _static_features(kernels_db: str) -> None:
        log.verbose("Static feature encoding")

        db = dbutil.connect(kernels_db)
        c = db.cursor()
        c.execute("SELECT id,contents FROM PreprocessedFiles WHERE status=0")
        for row in list(c.fetchall()):
            id, contents = row
            c.execute("DELETE FROM PreprocessedFiles WHERE id=?", (id,))
            for i, kernel in enumerate(clutil.get_cl_kernels(contents)):
                features = get_features(kernel)
                kid = "{}-{}".format(id, i)
                if len(features) == 8:
                    log.verbose("features", kid)
                    feature_str = ("/* {:10} {:10} {:10} {:10} {:10} {:10}"
                                   "{:10.3f} {:10.3f} */".format(
                                       int(features[0]),
                                       int(features[1]),
                                       int(features[2]),
                                       int(features[3]),
                                       int(features[4]),
                                       int(features[5]),
                                       features[6],
                                       features[7]))
                    newsource = feature_str + '\n' + kernel
                    c.execute("""
                        INSERT INTO PreprocessedFiles (id,contents,status)
                        VALUES (?,?,?)
                    """, (kid, newsource, 0))
                else:
                    log.verbose("ignored", kid)
        c.close()
        db.commit()

    # dispatch encoder based on encoding
    encoders = {
        "default": _default,
        "static_features": _static_features,
    }
    encoder = encoders.get(encoding, None)
    if encoder is None:
        raise clgen.UserError(
            "Unknown encoding type '{bad}'. Supported values: {good}".format(
                bad=encoding, good=", ".join(sorted(encoders.keys()))))
    else:
        encoder(kernels_db)


class Corpus(clgen.CLgenObject):
    """
    Representation of a training corpus.
    """
    def __init__(self, contentid: str, path: str=None, **opts):
        """
        Instantiate a corpus.

        If this is a new corpus, a number of files will be created, which may
        take some time.

        Arguments:
            contentid (str): ID of corpus content.
            path (str, optional): Path to corpus.
            **opts: Keyword options.
        """
        # Validate options
        for key in opts.keys():
            if key not in DEFAULT_CORPUS_OPTS:
                raise clgen.UserError(
                    "Unsupported corpus option '{}'. Valid keys: {}".format(
                        key, ','.join(sorted(DEFAULT_CORPUS_OPTS.keys()))))

        self.opts = deepcopy(DEFAULT_CORPUS_OPTS)
        types.update(self.opts, opts)
        self.opts["id"] = contentid

        # check that contentid exists
        if (path is None and
            not fs.isdir(clgen.cachepath("contentfiles", contentid))):
            raise clgen.UserError("corpus {contentid} not found"
                                  .format(**vars()))

        self.contentid = contentid
        self.contentcache = clgen.mkcache("contentfiles", contentid)
        self.kernels_db = self.contentcache.keypath('kernels.db')

        self.hash = self._hash(contentid, self.opts)
        self.cache = clgen.mkcache("corpus", self.hash)

        log.debug("contentfiles {self.contentid}".format(**vars()))
        log.debug("corpus {hash}".format(hash=self.hash))

        # validate metadata against cache
        self.stats = {
            "preprocess_time": 0
        }
        meta = deepcopy(self.to_json())
        if self.cache.get("META"):
            cached_meta = jsonutil.read_file(self.cache["META"])
            self.stats = cached_meta["stats"]  # restore stats
            del cached_meta["stats"]
            del meta["stats"]
            if meta != cached_meta:
                raise clgen.InternalError("corpus metadata mismatch")
        else:
            self._flush_meta()

        with self.lock.acquire():
            self._create_files(path)

    def _flush_meta(self):
        jsonutil.write_file(self.cache.keypath("META"), self.to_json())

    def _create_files(self, path):
        def _init_error(err: Exception) -> None:
            """ tidy up in case of error """
            log.error("corpus creation failed. Deleting corpus files")
            paths = [
                self.contentcache.keypath("kernels.db"),
                self.cache.keypath("corpus.txt"),
                self.cache.keypath("tensor.npy"),
                self.cache.keypath("atomizer.pkl")
            ]
            for path in paths:
                if fs.exists(path):
                    log.info("removing", path)
                    fs.rm(path)
            raise err

        try:
            if path is not None:
                if not fs.isdir(path):
                    raise clgen.UserError(
                        "Corpus path '{}' is not a directory".format(path))

                # create kernels database if necessary
                try:
                    self.contentcache["kernels.db"]
                except KeyError:
                    self._create_kernels_db(path)

            # preprocess and encode kernel db
            modified = False
            preprocess_time = time()
            encoding = self.opts["encoding"]
            if preprocess.preprocess_db(self.contentcache["kernels.db"]):
                modified = True
                encode(self.contentcache["kernels.db"], encoding)

            if modified:
                preprocess_time = time() - preprocess_time
                self.stats["preprocess_time"] += preprocess_time
                self._flush_meta()

            # create corpus text if not exists
            try:
                self.cache["corpus.txt"]
            except KeyError:
                self._create_txt()
                assert(self.cache["corpus.txt"])

            # create atomizer if needed
            try:
                self.cache["atomizer.pkl"]
                self._load_atomizer()
            except KeyError:
                self._create_atomizer(self.opts["vocabulary"])
                assert(self.cache["atomizer.pkl"])
        except Exception as e:
            _init_error(e)

    def _hash(self, contentid: str, opts: dict) -> str:
        """ compute corpus hash """
        return crypto.sha1_list(contentid, *types.dict_values(opts))

    def _create_kernels_db(self, path: str) -> None:
        """creates and caches kernels.db"""
        log.debug("creating database")

        # create a database and put it in the cache
        tmppath = self.contentcache.keypath("kernels.db.tmp")
        dbutil.create_db(tmppath)
        self.contentcache["kernels.db"] = tmppath

        # get a list of files in the corpus
        filelist = [f for f in fs.ls(path, abspaths=True, recursive=True)
                    if fs.isfile(f)]

        # import files into database
        fetch.fetch_fs(self.contentcache["kernels.db"], filelist)

    def _create_txt(self) -> None:
        """creates and caches corpus.txt"""
        log.debug("creating corpus")

        # TODO: additional options in corpus JSON to accommodate for EOF,
        # different encodings etc.
        tmppath = self.cache.keypath("corpus.txt.tmp")
        dbutil.dump_db(self.contentcache["kernels.db"], tmppath)
        self.cache["corpus.txt"] = tmppath

    def _read_txt(self) -> str:
        with codecs.open(self.cache["corpus.txt"], encoding="utf-8") as infile:
            return infile.read()

    def _create_atomizer(self, vocab: str="char") -> None:
        """creates and caches atomizer.pkl"""
        log.debug("creating vocab file")

        data = self._read_txt()

        self.atomizer = get_atomizer(data, vocab)

        self.atoms = self.atomizer.atoms
        self.vocab_size = self.atomizer.vocab_size
        self.vocab = self.atomizer.vocab

        tmp_vocab_file = self.cache.keypath("atomizer.tmp.pkl")
        with open(tmp_vocab_file, 'wb') as f:
            cPickle.dump(self.atomizer, f)
        self.cache["atomizer.pkl"] = tmp_vocab_file

    def _load_atomizer(self) -> None:
        with open(self.cache["atomizer.pkl"], 'rb') as infile:
            self.atomizer = cPickle.load(infile)

        self.atoms = self.atomizer.atoms
        self.vocab_size = self.atomizer.vocab_size
        self.vocab = self.atomizer.vocab

    def _generate_kernel_corpus(self) -> str:
        """ dump all kernels into a string in a random order """
        db = dbutil.connect(self.contentcache["kernels.db"])
        c = db.cursor()

        # if preserving order, order by line count. Else, order randomly
        orderby = "LC(contents)" if self.opts["preserve_order"] else "RANDOM()"

        c.execute("SELECT PreprocessedFiles.Contents FROM PreprocessedFiles "
                  "WHERE status=0 ORDER BY {orderby}".format(orderby=orderby))

        # If file separators are requested, insert EOF markers between files
        sep = '\n\n// EOF\n\n' if self.opts["eof"] else '\n\n'

        return sep.join(row[0] for row in c.fetchall())

    def create_batches(self) -> None:
        """
        Create batches for training.
        """
        self.reset_batch_pointer()

        # generate a kernel corpus
        data = self._generate_kernel_corpus()

        # encode corpus into vocab indices
        self._tensor = self.atomizer.atomize(data)

        batch_size = self.batch_size
        seq_length = self.seq_length

        # set corpus size and number of batches
        self._size = len(self._tensor)
        self._num_batches = int(self.size / (batch_size * seq_length))
        if self.num_batches == 0:
            raise clgen.UserError(
                "Not enough data. Use a smaller seq_length and batch_size")

        # split into batches
        self._tensor = self._tensor[:self.num_batches * batch_size * seq_length]
        xdata = self._tensor
        ydata = np.copy(self._tensor)
        ydata[:-1] = xdata[1:]
        ydata[-1] = xdata[0]
        self._x_batches = np.split(xdata.reshape(batch_size, -1),
                                   self.num_batches, 1)
        self._y_batches = np.split(ydata.reshape(batch_size, -1),
                                   self.num_batches, 1)

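    # Illustrative note, not part of the original source: in create_batches()
    # above, ydata is xdata shifted left by one element, so every input token
    # is paired with the token that follows it as its prediction target, and
    # the final target wraps around to the first token. For example:
    #
    #   xdata = [3, 1, 4, 1, 5, 9]
    #   ydata = [1, 4, 1, 5, 9, 3]
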
    @property
    def lock(self):
        lockpath = self.cache.keypath("LOCK")
        return lockfile.LockFile(lockpath)

    @property
    def batch_size(self) -> int:
        return self.opts["batch_size"]

    @property
    def seq_length(self) -> int:
        return self.opts["seq_length"]

    @property
    def size(self) -> int:
        """
        Return the atomized size of the corpus.
        """
        try:
            return self._size
        except AttributeError:
            self.create_batches()
            return self._size

    @property
    def num_batches(self) -> int:
        try:
            return self._num_batches
        except AttributeError:
            self.create_batches()
            return self._num_batches

    def reset_batch_pointer(self) -> None:
        """
        Resets batch pointer to first batch.
        """
        self._pointer = 0

    def next_batch(self) -> tuple:
        """
        Fetch next batch indices.

        Returns:
            (np.array, np.array): X, Y batch tuple.
        """
        x = self._x_batches[self._pointer]
        y = self._y_batches[self._pointer]
        self._pointer += 1
        return x, y

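    # Illustrative usage sketch, not part of the original source: a training
    # loop typically walks every batch once per epoch, where each batch pair
    # has shape (batch_size, seq_length):
    #
    #   corpus.create_batches()
    #   for _ in range(corpus.num_batches):
    #       x, y = corpus.next_batch()
    #       ...  # feed x (inputs) and y (targets) to the model
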
    def set_batch_pointer(self, pointer: int) -> None:
        """
        Set batch pointer.

        Arguments:
            pointer (int): New batch pointer.
        """
        self._pointer = pointer

    def preprocessed(self, status: int=0) -> list:
        """
        Return an iterator over all preprocessed kernels.

        Arguments:
            status (int, optional): Pre-processed status, {0, 1, 2} for
                {good, bad, ugly}.

        Returns:
            sequence of str: Sources.
        """
        db = dbutil.connect(self.contentcache["kernels.db"])
        c = db.cursor()
        query = c.execute(
            "SELECT Contents FROM PreprocessedFiles WHERE status={status}"
            .format(**vars()))
        for row in query.fetchall():
            yield row[0]

    def contentfiles(self) -> list:
        """
        Return an iterator over all un-processed samples.

        Returns:
            sequence of str: Samples.
        """
        db = dbutil.connect(self.contentcache["kernels.db"])
        c = db.cursor()
        query = c.execute("SELECT Contents FROM ContentFiles")
        for row in query.fetchall():
            yield row[0]

    def most_common_prototypes(self, n: int) -> tuple:
        """
        Return the n most frequently occurring prototypes.

        Arguments:
            n (int): Number of prototypes to return.

        Returns:
            tuple of list of tuples, int: (ratio, prototype) pairs, and the
                total number of prototypes.
        """
        from clgen import clutil

        prototypes = []
        for kernel in self.preprocessed():
            try:
                prototype = clutil.KernelPrototype.from_source(kernel)
                if prototype.is_synthesizable:
                    prototypes.append(", ".join(str(x) for x in prototype.args))
            except clutil.PrototypeException:
                pass

        # Convert frequency into ratios
        counter = Counter(prototypes)
        results = []
        for row in counter.most_common(n):
            prototype, freq = row
            ratio = freq / len(prototypes)
            results.append((ratio, prototype))

        return results, len(prototypes)

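    # Illustrative usage sketch, not part of the original source: each entry
    # of the returned list is a (ratio, prototype) pair, e.g.
    #
    #   results, total = corpus.most_common_prototypes(5)
    #   for ratio, prototype in results:
    #       print("{:.1%}  {}".format(ratio, prototype))
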
    def __repr__(self) -> str:
        hash = self.hash
        nf = dbutil.num_good_kernels(self.contentcache['kernels.db'])
        v = self.opts["vocabulary"]
        nt = self.atomizer.vocab_size
        size = self.size
        return ("corpus[{hash}]: {nf} files, {size} tokens using {v} "
                "vocabulary of size {nt}".format(**vars()))

    def to_json(self) -> dict:
        d = deepcopy(self.opts)
        d["contentfiles"] = self.contentid
        d["stats"] = self.stats
        return d

    def __eq__(self, rhs) -> bool:
        if not isinstance(rhs, Corpus):
            return False
        return rhs.hash == self.hash

    def __ne__(self, rhs) -> bool:
        return not self.__eq__(rhs)

    @staticmethod
    def from_json(corpus_json: dict):
        """
        Instantiate Corpus from JSON.

        Arguments:
            corpus_json (dict): Specification.

        Returns:
            Corpus: Instantiated corpus.
        """
        path = corpus_json.pop("path", None)
        uid = corpus_json.pop("id", None)

        if path:
            path = unpack_directory_if_needed(fs.abspath(path))
            if not fs.isdir(path):
                raise clgen.UserError(
                    "Corpus path '{}' is not a directory".format(path))
            uid = dirhash(path, 'sha1')
        elif uid:
            cache_path = clgen.mkcache("contentfiles", uid).path
            if not fs.isdir(cache_path):
                raise clgen.UserError("Corpus content {} not found".format(uid))
        else:
            raise clgen.UserError("No corpus path or ID provided")

        if "stats" in corpus_json:  # ignore stats
            del corpus_json["stats"]

        if "contentfiles" in corpus_json:
            del corpus_json["contentfiles"]

        return Corpus(uid, path=path, **corpus_json)
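

# Illustrative usage sketch, not part of the original module: instantiating a
# corpus from a JSON-style specification, as a model or command-line front end
# might do. The path below is a hypothetical placeholder.
def _example_from_json() -> None:
    spec = {
        "path": "~/kernels",     # hypothetical directory of content files
        "vocabulary": "greedy",  # any key from DEFAULT_CORPUS_OPTS may appear
        "seq_length": 100,
    }
    corpus = Corpus.from_json(spec)
    log.info(corpus)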