#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020-2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import atexit
import contextlib
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import textwrap
import time
from .._C import connect
from .etcd import start_etcd
from .utils import check_socket
from .utils import find_vineyardd_path
logger = logging.getLogger('vineyard')
[docs]@contextlib.contextmanager
def start_vineyardd(
etcd_endpoints=None,
etcd_prefix=None,
vineyardd_path=None,
size='256M',
socket=None,
rpc=True,
rpc_socket_port=9600,
debug=False,
):
"""Launch a local vineyard cluster.
Parameters:
etcd_endpoint: str
Launching vineyard using specified etcd endpoints. If not specified,
vineyard will launch its own etcd instance.
etcd_prefix: str
Specify a common prefix to establish a local vineyard cluster.
vineyardd_path: str
Location of vineyard server program. If not specified, vineyard will
use its own bundled vineyardd binary.
size: int
The memory size limit for vineyard's shared memory. The memory size
can be a plain integer or as a fixed-point number using one of these
suffixes:
.. code::
E, P, T, G, M, K.
You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki.
For example, the following represent roughly the same value:
.. code::
128974848, 129k, 129M, 123Mi, 1G, 10Gi, ...
socket: str
The UNIX domain socket socket path that vineyard server will listen on.
Default is None.
When the socket parameter is None, a random path under temporary directory
will be generated and used.
rpc_socket_port: int
The port that vineyard will use to privode RPC service.
debug: bool
Whether print debug logs.
Returns:
(proc, socket):
Yields a tuple with the subprocess as the first element and the UNIX-domain
IPC socket as the second element.
"""
if not vineyardd_path:
vineyardd_path = find_vineyardd_path()
if not vineyardd_path:
raise RuntimeError('Unable to find the "vineyardd" executable')
if not socket:
socketfp = tempfile.NamedTemporaryFile(
delete=True, prefix='vineyard-', suffix='.sock'
)
socket = socketfp.name
socketfp.close()
if etcd_endpoints is None:
etcd_ctx = start_etcd()
_etcd_proc, etcd_endpoints = etcd_ctx.__enter__() # pylint: disable=no-member
else:
etcd_ctx = None
env = os.environ.copy()
if debug:
env['GLOG_v'] = 11
command = [
vineyardd_path,
'--deployment',
'local',
'--size',
str(size),
'--socket',
socket,
'--rpc' if rpc else '--norpc',
'--rpc_socket_port',
str(rpc_socket_port),
'--etcd_endpoint',
etcd_endpoints,
]
if etcd_prefix is not None:
command.extend(('--etcd_prefix', etcd_prefix))
proc = None
try:
proc = subprocess.Popen(
command,
env=env,
stdout=subprocess.PIPE,
stderr=sys.__stderr__,
universal_newlines=True,
encoding='utf-8',
)
# wait for vineyardd ready: check the rpc port and ipc sockets
rc = proc.poll()
while rc is None:
if check_socket(socket) and (
(not rpc) or check_socket(('0.0.0.0', rpc_socket_port))
):
break
time.sleep(1)
rc = proc.poll()
if rc is not None:
err = textwrap.indent(proc.stdout.read(), ' ' * 4)
raise RuntimeError(
'vineyardd exited unexpectedly '
'with code %d, error is:\n%s' % (rc, err)
)
logger.debug('vineyardd is ready.............')
yield proc, socket, etcd_endpoints
finally:
logger.debug('Local vineyardd being killed')
if proc is not None and proc.poll() is None:
proc.terminate()
proc.wait()
try:
shutil.rmtree(socket)
except Exception: # pylint: disable=broad-except
pass
if etcd_ctx is not None:
etcd_ctx.__exit__(None, None, None) # pylint: disable=no-member
__default_instance_contexts = {}
[docs]def init(num_instances=1, **kw):
"""
Launching a local vineyardd instance and get a client as easy as possible
In a clean environment, simply use:
.. code:: python
vineyard.init()
It will launch a local vineyardd and return a connected client to the
vineyardd.
It will also setup the environment variable :code:`VINEYARD_IPC_SOCKET`.
For the case to establish a local vineyard cluster consists of multiple
vineyardd instances, using the :code:`num_instances` parameter:
.. code:: python
client1, client2, client3 = vineyard.init(num_instances=3)
In this case, three vineyardd instances will be launched.
The init method can only be called once in a process, to get the established
sockets or clients later in the process, use :code:`get_current_socket` or
:code:`get_current_client` respectively.
"""
assert __default_instance_contexts == {}
if 'VINEYARD_IPC_SOCKET' in os.environ:
raise ValueError(
"VINEYARD_IPC_SOCKET has already been set: %s, which "
"means there might be a vineyard daemon already running "
"locally" % os.environ['VINEYARD_IPC_SOCKET']
)
etcd_endpoints = None
etcd_prefix = f'vineyard_init_at_{time.time()}'
for idx in range(num_instances):
ctx = start_vineyardd(
etcd_endpoints=etcd_endpoints, etcd_prefix=etcd_prefix, rpc=False, **kw
)
_, ipc_socket, etcd_endpoints = ctx.__enter__() # pylint: disable=no-member
client = connect(ipc_socket)
__default_instance_contexts[ipc_socket] = (ctx, client)
if idx == 0:
os.environ['VINEYARD_IPC_SOCKET'] = ipc_socket
return get_current_client()
[docs]def get_current_client():
"""
Get current vineyard IPC clients established by :code:`vineyard.init()`.
Raises:
ValueError if vineyard is not initialized.
"""
if not __default_instance_contexts:
raise ValueError(
'Vineyard has not been initialized, '
'use vineyard.init() to launch vineyard instances'
)
clients = [v[1] for _, v in __default_instance_contexts.items()]
return clients if len(clients) > 1 else clients[0]
[docs]def get_current_socket():
"""
Get current vineyard UNIX-domain socket established by :code:`vineyard.init()`.
Raises:
ValueError if vineyard is not initialized.
"""
if not __default_instance_contexts:
raise ValueError(
'Vineyard has not been initialized, '
'use vineyard.init() to launch vineyard instances'
)
sockets = __default_instance_contexts.keys()
return sockets if len(sockets) > 1 else sockets[0]
[docs]def shutdown():
"""
Shutdown the vineyardd instances launched by previous :code:`vineyard.init()`.
"""
global __default_instance_contexts
if __default_instance_contexts:
for ipc_socket in reversed(__default_instance_contexts.keys()):
__default_instance_contexts[ipc_socket][0].__exit__(None, None, None)
# NB. don't pop pre-existing env if we not launch
os.environ.pop('VINEYARD_IPC_SOCKET', None)
__default_instance_contexts = {}
@atexit.register
def __shutdown_handler():
try:
shutdown()
except Exception: # pylint: disable=broad-except
pass
__all__ = ['start_vineyardd']