Source code for vineyard

#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020-2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import contextlib
import logging
import os
import sys
from typing import Any
from typing import Dict
from typing import Generator
from typing import Optional
from typing import Tuple
from typing import Union

from .version import __version__

__doc__ = """
Vineyard - an in-memory immutable data manager. (Project under CNCF)
====================================================================

Vineyard (v6d) is an in-memory immutable data manager that provides
out-of-the-box high-level abstraction and zero-copy in-memory
sharing for distributed data in big data tasks, such as graph analytics
(e.g., GraphScope), numerical computing (e.g., Mars), and machine learning.
"""

# pylint: disable=import-outside-toplevel,wrong-import-position

# Package-wide logger; the helpers below use it to report non-fatal failures.
logger = logging.getLogger('vineyard')


@contextlib.contextmanager
def envvars(
    key: Union[str, Dict[str, str]],
    value: Union[str, None] = None,
    append: bool = False,
) -> Generator[os._Environ, Any, Any]:
    """Create a context with the specified environment variables set.

    It is useful for setting the :code:`VINEYARD_IPC_SOCKET` environment
    variable to obtain a proper default vineyard client.

    The previous state of every touched variable is restored when the
    context exits, even if the body raises an exception.

    This context manager can be used as

    .. code:: python

        with envvars('KEY', 'value'):
            # env :code:`KEY` is set to :code:`value`.

        with envvars('KEY', 'value', append=True):
            # if :code:`KEY` already exists, :code:`:value` is appended to
            # its current value, otherwise it is set to :code:`value`.

        with envvars({'KEY1': 'value1', 'KEY2': 'value2'}):
            # env :code:`KEY1` and :code:`KEY2` are both set.

        with envvars('KEY'):
            # no value given: the environment is left untouched.

    Parameters:
        key: str or dict
            Either the name of a single variable, or a mapping from variable
            names to the values they should take.
        value: str, optional
            The value for :code:`key` when :code:`key` is a string. When it
            is None nothing is changed. Ignored when :code:`key` is a
            mapping.
        append: bool
            If true, and the variable already exists, the new value is
            appended to the existing one using :code:`:` as the separator.

    Yields:
        :code:`os.environ`, with the requested variables applied.
    """
    if isinstance(key, str):
        # A bare key without a value leaves the environment untouched.
        items: Dict[str, str] = {} if value is None else {key: value}
    else:
        items = key

    original_items: Dict[str, Optional[str]] = {}
    try:
        for k, v in items.items():
            original_items[k] = os.environ.get(k, None)
            if append and original_items[k] is not None:
                os.environ[k] = original_items[k] + ':' + v
            else:
                os.environ[k] = v

        yield os.environ
    finally:
        # Always roll back, including when the body (or a failed assignment
        # above) raised, so that no modified variable leaks out of the context.
        for k, v in original_items.items():
            if v is not None:
                os.environ[k] = v
            else:
                # The body may have removed the variable already; tolerate that.
                os.environ.pop(k, None)


def _init_global_context():
    """Import the native ``_C`` extension with the registry library exposed.

    The path of the platform-specific ``libvineyard_internal_registry``
    library that sits next to this file is published through the
    ``__VINEYARD_INTERNAL_REGISTRY`` environment variable while the
    extension is being imported.

    When the ``VINEYARD_DEVELOP`` environment variable is set, the extension
    is imported with the ``RTLD_GLOBAL | RTLD_LAZY`` dlopen flags, provided
    those constants can be found in ``os`` or in the legacy ``DLFCN`` module.
    The previous dlopen flags are restored afterwards.

    Raises:
        RuntimeError: if the platform is neither ``linux`` nor ``darwin``.
    """
    import os as _dl_flags  # pylint: disable=reimported

    if sys.platform == 'linux':
        registry = os.path.join(
            os.path.dirname(__file__), 'libvineyard_internal_registry.so'
        )
    elif sys.platform == 'darwin':
        registry = os.path.join(
            os.path.dirname(__file__), 'libvineyard_internal_registry.dylib'
        )
    else:
        raise RuntimeError("Unsupported platform: %s" % sys.platform)

    ctx = {'__VINEYARD_INTERNAL_REGISTRY': registry}

    if os.environ.get('VINEYARD_DEVELOP', None) is None:
        with envvars(ctx):  # n.b., no append
            from . import _C
        return

    if not hasattr(_dl_flags, 'RTLD_GLOBAL') or not hasattr(_dl_flags, 'RTLD_LAZY'):
        try:
            # next try if DLFCN exists
            import DLFCN as _dl_flags  # noqa: N811
        except ImportError:
            _dl_flags = None

    if _dl_flags is not None:
        old_flags = sys.getdlopenflags()

        # import the extension module
        sys.setdlopenflags(_dl_flags.RTLD_GLOBAL | _dl_flags.RTLD_LAZY)
        try:
            with envvars(ctx):  # n.b., no append
                from . import _C  # noqa: F811
        finally:
            # restore the interpreter-wide flags even if the import failed, so
            # that later, unrelated extension imports are not affected
            sys.setdlopenflags(old_flags)


# Run the bootstrap before the ``from ._C import ...`` statements below, then
# drop the helper so that it does not remain in the package namespace.
_init_global_context()
del _init_global_context


from . import core
from . import csi
from . import data
from . import deploy
from . import io
from . import launcher
from . import shared_memory
from ._C import ArrowErrorException
from ._C import AssertionFailedException
from ._C import Blob
from ._C import BlobBuilder
from ._C import ConnectionErrorException
from ._C import ConnectionFailedException
from ._C import EndOfFileException
from ._C import EtcdErrorException
from ._C import InstanceStatus
from ._C import InvalidException
from ._C import InvalidStreamStateException
from ._C import IOErrorException
from ._C import IPCClient
from ._C import KeyErrorException
from ._C import MetaTreeInvalidException
from ._C import MetaTreeLinkInvalidException
from ._C import MetaTreeNameInvalidException
from ._C import MetaTreeNameNotExistsException
from ._C import MetaTreeSubtreeNotExistsException
from ._C import MetaTreeTypeInvalidException
from ._C import MetaTreeTypeNotExistsException
from ._C import NotEnoughMemoryException
from ._C import NotImplementedException
from ._C import Object
from ._C import ObjectBuilder
from ._C import ObjectExistsException
from ._C import ObjectID
from ._C import ObjectMeta
from ._C import ObjectName
from ._C import ObjectNotExistsException
from ._C import ObjectNotSealedException
from ._C import ObjectSealedException
from ._C import RemoteBlob
from ._C import RemoteBlobBuilder
from ._C import RPCClient
from ._C import StreamDrainedException
from ._C import StreamFailedException
from ._C import TypeErrorException
from ._C import UnknownErrorException
from ._C import UserInputErrorException
from ._C import VineyardServerNotReadyException
from ._C import _connect
from ._C import memory_copy
from .core import Client
from .core import builder_context
from .core import default_builder_context
from .core import default_driver_context
from .core import default_resolver_context
from .core import driver_context
from .core import resolver_context
from .core.builder import BuilderContext
from .core.resolver import ResolverContext
from .data import register_builtin_types
from .data.graph import Graph
from .deploy.local import get_current_socket
from .deploy.local import shutdown
from .deploy.local import try_init


def _init_vineyard_modules():  # noqa: C901
    """Resolve registered vineyard modules in the following order:

    * /etc/vineyard/config.py
    * {sys.prefix}/etc/vineyard/config.py
    * /usr/share/vineyard/01-xxx.py
    * /usr/local/share/vineyard/01-xxx.py
    * {sys.prefix}/share/vineyard/02-xxxx.py
    * $HOME/.vineyard/03-xxxxx.py

    Then import packages like vineyard.drivers.*:

    * vineyard.drivers.io

    Every load is best-effort: a file or module that fails to import is
    reported on the package logger at debug level and then skipped.
    """

    import glob
    import importlib.util
    import pkgutil
    import site
    import sysconfig

    def _import_module_from_file(filepath):
        """Execute the Python file at ``filepath`` if it exists."""
        filepath = os.path.expanduser(os.path.expandvars(filepath))
        if os.path.exists(filepath):
            try:
                spec = importlib.util.spec_from_file_location(
                    "vineyard._contrib", filepath
                )
                mod = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(mod)
            except Exception:  # pylint: disable=broad-except
                logger.debug("Failed to load %s", filepath, exc_info=True)

    def _import_module_from_qualified_name(module):
        """Import ``module`` by its dotted name, ignoring failures."""
        try:
            importlib.import_module(module)
        except Exception:  # pylint: disable=broad-except
            logger.debug('Failed to load module %s', module, exc_info=True)

    def _import_modules_from_pattern(pattern):
        """Load every file matching the glob ``pattern``."""
        # glob returns matches in arbitrary order; sort them so that the
        # load order is deterministic and follows the filename prefixes.
        for filepath in sorted(glob.glob(pattern)):
            _import_module_from_file(filepath)

    _import_module_from_file('/etc/vineyard/config.py')
    # n.b.: the components joined onto sys.prefix must be relative, as
    # os.path.join discards everything before an absolute component.
    _import_module_from_file(
        os.path.join(sys.prefix, 'etc', 'vineyard', 'config.py')
    )
    _import_modules_from_pattern('/usr/share/vineyard/*-*.py')
    _import_modules_from_pattern('/usr/local/share/vineyard/*-*.py')
    _import_modules_from_pattern(
        os.path.join(sys.prefix, 'share', 'vineyard', '*-*.py')
    )
    # expanduser only understands "~"; it does not expand "$HOME".
    _import_modules_from_pattern(os.path.expanduser('~/.vineyard/*-*.py'))

    package_sites = set()
    pkg_sites = site.getsitepackages()
    if not isinstance(pkg_sites, (list, tuple)):
        pkg_sites = [pkg_sites]
    package_sites.update(pkg_sites)
    pkg_sites = site.getusersitepackages()
    if not isinstance(pkg_sites, (list, tuple)):
        pkg_sites = [pkg_sites]
    package_sites.update(pkg_sites)

    paths = sysconfig.get_paths()
    if 'purelib' in paths:
        package_sites.add(paths['purelib'])
    if 'platlib' in paths:
        package_sites.add(paths['platlib'])

    # add relative path
    package_sites.add(os.path.join(os.path.dirname(__file__), '..'))

    # dedup
    deduped = {os.path.abspath(pkg_site) for pkg_site in package_sites}

    for pkg_site in deduped:
        for _, mod, _ in pkgutil.iter_modules(
            [os.path.join(pkg_site, 'vineyard', 'drivers')]
        ):
            _import_module_from_qualified_name('vineyard.drivers.%s' % mod)


# Loading the optional configuration files and driver modules is best-effort:
# a failure here must never prevent the package itself from being imported.
try:
    _init_vineyard_modules()
except Exception:  # pylint: disable=broad-except
    # Keep going, but leave a trace instead of swallowing the error silently.
    logger.debug('Failed to initialize vineyard modules', exc_info=True)
del _init_vineyard_modules


def connect(*args, **kwargs):
    """
    Connect to vineyard by specified UNIX-domain socket or TCP endpoint.

    If no arguments are provided, none of the environment variables
    :code:`VINEYARD_IPC_SOCKET`, :code:`VINEYARD_RPC_ENDPOINT` and
    :code:`VINEYARD_CONFIG` is set, and neither of the default configuration
    files :code:`/var/run/vineyard-config.yaml` and
    :code:`/var/run/vineyard/vineyard-config.yaml` exists, a standalone
    vineyardd server is launched first (via :code:`try_init()`) and the
    client then connects to it.

    All arguments are forwarded unchanged to :class:`Client`. The
    `connect()` method has various overloads:

    .. function:: connect(socket: str,
                          username: str = None,
                          password: str = None) -> IPCClient
        :noindex:

        Connect to vineyard via UNIX domain socket for IPC service:

        .. code:: python

            client = vineyard.connect('/var/run/vineyard.sock')

        Parameters:
            socket: str
                UNIX domain socket path to setup an IPC connection.
            username: str
                Username to login, default to None.
            password: str
                Password to login, default to None.

        Returns:
            IPCClient: The connected IPC client.

    .. function:: connect(host: str,
                          port: int or str,
                          username: str = None,
                          password: str = None) -> RPCClient
        :noindex:

        Connect to vineyard via TCP socket.

        Parameters:
            host: str
                Hostname to connect to.
            port: int or str
                The TCP port that the vineyard TCP service listens on.
            username: str
                Username to login, default to None.
            password: str
                Password to login, default to None.

        Returns:
            RPCClient: The connected RPC client.

    .. function:: connect(endpoint: (str, int or str),
                          username: str = None,
                          password: str = None) -> RPCClient
        :noindex:

        Connect to vineyard via TCP socket.

        Parameters:
            endpoint: tuple(str, int or str)
                Endpoint to connect to. The parameter is a tuple, in which
                the first element is the host, and the second element is
                the port (can be an int or a str).
            username: str
                Username to login, default to None.
            password: str
                Password to login, default to None.

        Returns:
            RPCClient: The connected RPC client.

    .. function:: connect(username: str = None,
                          password: str = None) -> IPCClient or RPCClient
        :noindex:

        Connect to vineyard via UNIX domain socket or TCP endpoint. This
        form normally takes no arguments. It first tries to resolve the IPC
        socket from the environment variable `VINEYARD_IPC_SOCKET` and
        connect to it. If that fails to establish a connection with the
        vineyard server, it tries to resolve the RPC endpoint from the
        environment variable `VINEYARD_RPC_ENDPOINT`. If both attempts
        fail, it tries to resolve the configuration file that contains the
        IPC socket and RPC endpoint from the environment variable
        `VINEYARD_CONFIG`, and then connects to the vineyard server with
        the resolved configuration.

        If all of the above fail, a :class:`ConnectionFailedException` is
        raised.

        In rare cases the user may not be sure whether the IPC socket or
        RPC endpoint is available, i.e., the variable might be
        :code:`None`. In such cases `None` can be passed as the argument,
        and the resolution described above is performed.

        Parameters:
            username: str
                Username to login, default to None.
            password: str
                Password to login, default to None.

        Raises:
            ConnectionFailedException
    """
    # Explicit arguments always win: hand them straight to the client.
    if args or kwargs:
        return Client(*args, **kwargs)

    env_keys = (
        'VINEYARD_IPC_SOCKET',
        'VINEYARD_RPC_ENDPOINT',
        'VINEYARD_CONFIG',
    )
    default_configs = (
        "/var/run/vineyard-config.yaml",
        "/var/run/vineyard/vineyard-config.yaml",
    )

    has_env = any(env_key in os.environ for env_key in env_keys)
    if not has_env and not any(os.path.exists(cfg) for cfg in default_configs):
        # Nothing tells us where a server lives, so start one ourselves.
        logger.info(
            'No vineyard socket or endpoint is specified, '
            'try to launch a standalone one.'
        )
        try_init()

    return Client(*args, **kwargs)


def put(
    value: Any,
    builder: Optional[BuilderContext] = None,
    persist: bool = False,
    name: Optional[str] = None,
    **kwargs,
):
    """
    Put a python value to vineyard using a client obtained from
    :code:`connect()` without arguments.

    The vineyard server is located by the following environment variables:

        VINEYARD_IPC_SOCKET: UNIX domain socket path to setup an IPC
                             connection. E.g. /var/run/vineyard.sock
        VINEYARD_RPC_ENDPOINT: TCP endpoint to setup an RPC connection.
                               E.g. 127.0.0.1:9600
        VINEYARD_CONFIG: Either a path to a YAML configuration file or a
                         path to a directory containing the default config
                         file `vineyard-config.yaml`. The configuration
                         file should look like:

                         .. code:: yaml

                             Vineyard:
                                 IPCSocket: '/path/to/vineyard.sock'
                                 RPCEndpoint: 'hostname1:port1,hostname2:port2,...'

    Example:

    .. code:: python

        >>> os.environ['VINEYARD_IPC_SOCKET'] = '/var/run/vineyard.sock'
        >>> arr = np.arange(8)
        >>> arr_id = vineyard.put(arr)
        >>> arr_id
        00002ec13bc81226

    Parameters:
        value:
            The python value that will be put to vineyard. Supported python
            value types are decided by modules that registered to vineyard.
            By default, python value can be put to vineyard after serialized
            as a bytes buffer using pickle.
        builder: optional
            When putting python value to vineyard, an optional *builder* can
            be specified to tell vineyard how to construct the corresponding
            vineyard :class:`Object`. If not specified, the default builder
            context will be used to select a proper builder.
        persist: bool, optional
            If true, persist the object after creation.
        name: str, optional
            If given, the name will be automatically associated with the
            resulting object. Note that it only takes effect when the object
            is persisted.
        kw:
            User-specific arguments, forwarded to the client's :code:`put`.

    Returns:
        ObjectID: The result object id will be returned.
    """
    return connect().put(value, builder, persist, name, **kwargs)


def get(
    object_id: Optional[ObjectID] = None,
    name: Optional[str] = None,
    resolver: Optional[ResolverContext] = None,
    fetch: bool = False,
    **kwargs,
):
    """
    Get a vineyard object as a python value using a client obtained from
    :code:`connect()` without arguments.

    The vineyard server is located by the following environment variables:

        VINEYARD_IPC_SOCKET: UNIX domain socket path to setup an IPC
                             connection. E.g. /var/run/vineyard.sock
        VINEYARD_RPC_ENDPOINT: TCP endpoint to setup an RPC connection.
                               E.g. 127.0.0.1:9600
        VINEYARD_CONFIG: Either a path to a YAML configuration file or a
                         path to a directory containing the default config
                         file `vineyard-config.yaml`. The configuration
                         file should look like:

                         .. code:: yaml

                             Vineyard:
                                 IPCSocket: '/path/to/vineyard.sock'
                                 RPCEndpoint: 'hostname1:port1,hostname2:port2,...'

    Example:

    .. code:: python

        >>> os.environ['VINEYARD_IPC_SOCKET'] = '/var/run/vineyard.sock'
        >>> arr_id = vineyard.ObjectID('00002ec13bc81226')
        >>> arr = vineyard.get(arr_id)
        >>> arr
        array([0, 1, 2, 3, 4, 5, 6, 7])

    Parameters:
        object_id: ObjectID, optional
            The object id that will be obtained from vineyard.
        name: str, optional
            The object name that will be obtained from vineyard, ignored if
            ``object_id`` is not None.
        resolver: optional
            When retrieving vineyard object, an optional *resolver* can be
            specified. If no resolver given, the default resolver context
            will be used.
        fetch: bool
            Whether to trigger a migration when the target object is located
            on remote instances.
        kw:
            User-specific arguments, forwarded to the client's :code:`get`.

    Returns:
        A python object returned by the resolver, obtained by resolving a
        vineyard object.
    """
    return connect().get(object_id, name, resolver, fetch, **kwargs)