Source code for visioncpp.backend

from __future__ import print_function

import visioncpp as vp
from visioncpp import cache

import logging
import os
import sys
import pkgconfig

from ctypes import cdll
from pkg_resources import resource_filename
from shutil import rmtree
from subprocess import Popen, PIPE, STDOUT
from tempfile import mkdtemp


[docs]def get_host_cflags(): """ Get compilation cflags. Returns: str[]: List of compiler flags. """ return [ "-x", "c++", "-std=c++11", # Workaround for issue with libc++ "-D_GLIBCXX_USE_CXX11_ABI=0", "-I" + os.path.join(vp.computecpp_prefix, "include"), "-I" + resource_filename(__name__, os.path.join("lib", "include")), ] + pkgconfig.cflags("opencv").split()
[docs]def get_device_cflags(): """ Get integration header compilation cflags. Returns: str[]: List of compiler flags. """ proc = Popen([ os.path.join(vp.computecpp_prefix, "bin", "computecpp_info"), "--dump-device-compiler-flags" ], stdout=PIPE, stderr=PIPE) stdout, _ = proc.communicate() return stdout.decode("utf-8").split()
[docs]def get_ldflags(): """ Get link flags. Returns: str[]: List of link flags. """ libdirs = [os.path.join(vp.computecpp_prefix, "lib")] libs = ["ComputeCpp", "pthread"] return (pkgconfig.libs("opencv").split() + ["-L" + x for x in libdirs] + ["-l" + x for x in libs])
[docs]def invoke_computecpp(args, stdin=None): """ Invoke ComputeCpp compute++ compiler. Arguments: args (str[]): Arguments. stdin (str, optional): Compiler input. """ if stdin is not None: stdin = stdin.encode("utf-8") computecpp = os.path.join(vp.computecpp_prefix, "bin", "compute++") cmd = [computecpp] + args logging.info(" ".join(cmd)) process = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) stdout, stderr = process.communicate(stdin) if process.returncode != 0: print("================", file=sys.stderr) print("ComputeCpp Error", file=sys.stderr) print("================", file=sys.stderr) print(file=sys.stderr) if stdin is not None: print("=== compute++ input:", file=sys.stderr) print(file=sys.stderr) print(stdin.decode("utf-8"), file=sys.stderr, end="") print(file=sys.stderr) if stdout is not None: print("=== compute++ output:", file=sys.stderr) print(file=sys.stderr) print(stdout.decode("utf-8"), file=sys.stderr, end="") print(file=sys.stderr) if stderr is not None: print("=== compute++ error output:", file=sys.stderr) print(file=sys.stderr) print(stderr.decode("utf-8"), file=sys.stderr, end="") print(file=sys.stderr) print("=========================", file=sys.stderr) print("FATAL ERROR. TERMINATING.", file=sys.stderr) sys.exit(process.returncode)
[docs]def host_compile(code, stub, dir="/tmp"): """ Compile object file. Arguments: code (str): C++ code. stub (str): Path to integration header. dir (str, optional): Output directory. Returns: str: Path to compiled object file. """ dest = os.path.join(dir, "host.o") assert(not os.path.exists(dest)) args = get_host_cflags() + [ "-c", "-", "-fPIC", "-include", stub, "-o", dest ] invoke_computecpp(args, stdin=code) assert(os.path.exists(dest)) return dest
[docs]def stub_file(code, dir="/tmp"): """ Compile integration header. Arguments: code (str): C++ code. dir (str, optional): Output directory. Returns: str: Path to compiled integration header file. """ dest = os.path.join(dir, "stub.sycl") assert(not os.path.exists(dest)) args = get_host_cflags() + get_device_cflags() + [ "-c", "-", "-o", dest ] invoke_computecpp(args, stdin=code) assert(os.path.exists(dest)) return dest
[docs]def check_for_computecpp(): """ Check that ComputeCpp files exist. Raises: VisionCppException: If ComputeCpp file(s) are mising. """ def must_exist(path): """ Check that a file exists. Arguments: path (str): Path to file. Returns: str: Path to file (same as argument). Raises: VisionCppException: If file does not exist. """ if not os.path.exists(path): raise vp.VisionCppException( "file '{}' not found. Is ComputeCpp installed?" .format(path)) return path must_exist(os.path.join(vp.computecpp_prefix, "bin", "computecpp_info")) must_exist(os.path.join(vp.computecpp_prefix, "bin", "compute++")) must_exist(os.path.join(vp.computecpp_prefix, "lib", "libComputeCpp.so"))
[docs]def compile_cpp_code(code): """ Compile C++ code to a dynamic library. Arguments: code (str): C++ socde. Returns: str: Path to binary. """ uid = cache.get_uid(code) if cache.is_cached(uid): logging.info("Found cached binary {}".format(uid)) binary = cache.load(uid) else: check_for_computecpp() counter = {"val": 0} def progress(msg): text = "{}: {}".format(counter["val"], msg) if msg else "" if logging.getLogger().getEffectiveLevel() <= logging.INFO: end = "\n" else: end = "" print("\r\033[K {}".format(text), end=end) counter["val"] += 1 sys.stdout.flush() tmpdir = mkdtemp(prefix="visioncpp-") try: progress("compiling device code ...") stub = stub_file(code, dir=tmpdir) progress("compiling host code ...") host = host_compile(code, stub, dir=tmpdir) progress("linking executable ...") tmpbin = link(host, dir=tmpdir) progress("") binary = cache.emplace(uid, tmpbin) except Exception as e: rmtree(tmpdir) raise e rmtree(tmpdir) return binary
[docs]def run_binary(binary): """ Execute a program binary. Arguments: binary (str): Path to binary. Raises: VisionCppException: If program returns non-zero exit status. """ assert(binary and os.path.exists(binary)) lib = cdll.LoadLibrary(binary) lib.native_expression_tree()