#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020-2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
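"""Stream I/O utilities for vineyard: the pluggable ``read``/``write``/``open``
driver registry, the chunk-oriented :class:`BaseStream`, and
:class:`StreamCollection` for grouping streams.
"""
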
import json
import logging
import traceback
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from urllib.parse import urlparse

from vineyard._C import ObjectID
from vineyard._C import ObjectMeta
from vineyard._C import StreamDrainedException
from vineyard.core.driver import registerize
from vineyard.core.resolver import resolver_context

logger = logging.getLogger('vineyard')


@registerize
def read(
path,
*args,
handlers=None,
accumulate=False,
chunk_hook: Optional[Callable] = None,
**kwargs
):
"""Open a path and read it as a single stream.
Parameters
----------
path: str
Path to read, the last reader registered for the scheme of
the path will be used.
handlers: list, optional
If handlers is not None, launched worker processes will be
emplaced into the list for further customized job lifecycle
management. Default is None.
accumulate: bool, optional
If :code:`accumulate` is True, it will return a data frame,
rather than dataframe stream. Default is False.
chunk_hook: callable, optional
If the read/write target is a global dataframe (e.g., csv,
orc, parquet, etc.), the hook will be called for each chunk
to be read or write (usually a :code:`pyarrow.RecordBatch`).
The hook should return a :code:`pyarrow.RecordBatch` object
and should be stateless as the invoke order is not guaranteed.
E.g.,
.. code:: python
def exchange_column(batch):
import pyarrow as pa
columns = batch.columns
first = columns[0]
second = columns[1]
columns = [second, first] + columns[2:]
return pa.RecordBatch.from_arrays(columns, schema=batch.schema)
vineyard_ipc_socket: str
The local or remote vineyard's IPC socket location that the
remote readers will use to establish connections with the
vineyard server.
vineyard_endpoint: str, optional
An optional address of vineyard's RPC socket, which will be
used for retrieving server's information on the client side.
If not provided, the `vineyard_ipc_socket` will be used, or
it will tries to discovery vineyard's IPC or RPC endpoints
from environment variables.
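
    Examples
    --------
    A minimal sketch of reading a local CSV file as a dataframe stream; the
    file path and socket location below are illustrative assumptions, not
    defaults:

    .. code:: python

        import vineyard.io

        stream = vineyard.io.read(
            'file:///tmp/data.csv',
            vineyard_ipc_socket='/tmp/vineyard.sock',
        )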
"""
parsed = urlparse(path)
if read._factory and read._factory.get(parsed.scheme):
        errors = []
        # try the registered readers in reverse order, so that the most
        # recently registered reader for the scheme takes precedence
        for reader in read._factory[parsed.scheme][::-1]:
try:
proc_kwargs = kwargs.copy()
r = reader(
path,
proc_kwargs.pop('vineyard_ipc_socket'),
*args,
handlers=handlers,
accumulate=accumulate,
chunk_hook=chunk_hook,
**proc_kwargs
)
if r is not None:
return r
except Exception: # pylint: disable=broad-except
errors.append('%s: %s' % (reader.__name__, traceback.format_exc()))
raise RuntimeError(
'Unable to find a proper IO driver for %s, potential causes are:\n %s'
% (path, '\n'.join(errors))
)
else:
raise ValueError("No IO driver registered for %s" % path)


@registerize
def write(
path, stream, *args, handlers=None, chunk_hook: Optional[Callable] = None, **kwargs
):
"""Write the stream to a given path.
Parameters
----------
path: str
Path to write, the last writer registered for the scheme of the path
will be used.
stream: vineyard stream
Stream that produces the data to write.
handlers: list, optional
If handlers is not None, launched worker processes will be
emplaced into the list for further customized job lifecycle
management. Default is None.
chunk_hook: callable, optional
If the read/write target is a global dataframe (e.g., csv,
orc, parquet, etc.), the hook will be called for each chunk
to be read or write (usually a :code:`pyarrow.RecordBatch`).
The hook should return a :code:`pyarrow.RecordBatch` object
and should be stateless as the invoke order is not guaranteed.
E.g.,
.. code:: python
def exchange_column(batch):
import pyarrow as pa
columns = batch.columns
first = columns[0]
second = columns[1]
columns = [second, first] + columns[2:]
return pa.RecordBatch.from_arrays(columns, schema=batch.schema)
vineyard_ipc_socket: str
The local or remote vineyard's IPC socket location that the remote
readers will use to establish connections with the vineyard server.
vineyard_endpoint: str, optional
An optional address of vineyard's RPC socket, which will be used for
retrieving server's information on the client side. If not provided,
the `vineyard_ipc_socket` will be used, or it will tries to discovery
vineyard's IPC or RPC endpoints from environment variables.
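
    Examples
    --------
    A minimal sketch of dumping a stream back to a local file; the file path
    and socket location below are illustrative assumptions, not defaults:

    .. code:: python

        import vineyard.io

        vineyard.io.write(
            'file:///tmp/output.csv',
            stream,
            vineyard_ipc_socket='/tmp/vineyard.sock',
        )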
"""
parsed = urlparse(path)
if write._factory and write._factory.get(parsed.scheme):
        errors = []
        # try the registered writers in reverse order, so that the most
        # recently registered writer for the scheme takes precedence
        for writer in write._factory[parsed.scheme][::-1]:
try:
proc_kwargs = kwargs.copy()
writer(
path,
stream,
proc_kwargs.pop('vineyard_ipc_socket'),
*args,
handlers=handlers,
chunk_hook=chunk_hook,
**proc_kwargs
)
except Exception: # pylint: disable=broad-except
exc = traceback.format_exc()
errors.append('%s: %s' % (writer.__name__, exc))
                if 'StreamFailedException: Stream failed' in exc:
                    # if the source stream has already failed, fail
                    # immediately rather than trying other drivers
                    break
                continue
else:
return
raise RuntimeError(
'Unable to find a proper IO driver for %s, potential causes are:\n %s'
% (path, '\n'.join(errors))
)
else:
raise ValueError("No IO driver registered for %s" % path)


def open(
path,
*args,
mode='r',
handlers=None,
chunk_hook: Optional[Callable] = None,
**kwargs
):
"""Open a path as a reader or writer, depends on the parameter :code:`mode`.
If :code:`mode` is :code:`r`, it will open a stream for read, and open a
stream for write when :code:`mode` is :code:`w`.
Parameters
----------
path: str
Path to open.
mode: char
Mode about how to open the path, :code:`r` is for read and :code:`w` for write.
handlers:
A dict that will be filled with a :code:`handler` that contains the process
handler of the underlying read/write process that can be joined using
:code:`join` to capture the possible errors during the I/O proceeding.
chunk_hook: callable, optional
If the read/write target is a global dataframe (e.g., csv,
orc, parquet, etc.), the hook will be called for each chunk
to be read or write (usually a :code:`pyarrow.RecordBatch`).
The hook should return a :code:`pyarrow.RecordBatch` object
and should be stateless as the invoke order is not guaranteed.
E.g.,
.. code:: python
def exchange_column(batch):
import pyarrow as pa
columns = batch.columns
first = columns[0]
second = columns[1]
columns = [second, first] + columns[2:]
return pa.RecordBatch.from_arrays(columns, schema=batch.schema)
vineyard_ipc_socket: str
Vineyard's IPC socket location.
vineyard_endpoint: str
Vineyard's RPC socket address.
See Also
--------
vineyard.io.read
vineyard.io.write
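
    Examples
    --------
    A minimal sketch; the paths and socket location below are illustrative
    assumptions:

    .. code:: python

        import vineyard.io

        # scheme-less paths are treated as local files
        stream = vineyard.io.open(
            '/tmp/data.csv', mode='r', vineyard_ipc_socket='/tmp/vineyard.sock'
        )
        vineyard.io.open(
            '/tmp/output.csv',
            stream,
            mode='w',
            vineyard_ipc_socket='/tmp/vineyard.sock',
        )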
"""
parsed = urlparse(path)
if not parsed.scheme:
path = 'file://' + path
if mode == 'r':
return read(path, *args, handlers=handlers, chunk_hook=chunk_hook, **kwargs)
if mode == 'w':
return write(path, *args, handlers=handlers, chunk_hook=chunk_hook, **kwargs)
raise RuntimeError('Opening %s with mode %s is not supported' % (path, mode))


class BaseStream:
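    """Base class of vineyard streams, wrapping the chunk-oriented
    :class:`Reader` and :class:`Writer` interfaces.

    A sketch of typical usage, assuming a connected vineyard client and a
    concrete stream instance (``chunk_id`` is a hypothetical placeholder):

    .. code:: python

        # producer side: push chunks, then mark the stream as finished
        writer = stream.open_writer(client)
        writer.append(chunk_id)
        writer.finish()

        # consumer side: pull chunks until the stream is drained
        reader = stream.open_reader(client)
        while True:
            try:
                chunk = reader.next()
            except StopIteration:
                break
    """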
class Reader:
def __init__(self, client, stream: ObjectID, resolver=None):
self._client = client
self._stream = stream
self._resolver = resolver
self._client.open_stream(stream, 'r')

        def next(self) -> object:
            """Fetch and resolve the next chunk, raising :code:`StopIteration`
            once the stream has been drained."""
            try:
                chunk = self._client.next_chunk(self._stream)
            except StreamDrainedException as e:
                raise StopIteration('No more chunks') from e
            # resolve with the stream's own resolver if one is attached,
            # otherwise fall back to the default resolver context
            if self._resolver is not None:
                return self._resolver(chunk)
            else:
                with resolver_context() as ctx:
                    return ctx.run(chunk)

        def next_metadata(self) -> ObjectMeta:
            """Fetch the metadata of the next chunk without resolving it,
            raising :code:`StopIteration` once the stream has been drained."""
            try:
                return self._client.next_chunk_meta(self._stream)
            except StreamDrainedException as e:
                raise StopIteration('No more chunks') from e

def __str__(self) -> str:
return repr(self)
def __repr__(self) -> str:
return '%s of Stream <%r>' % (self.__class__, self._stream)
class Writer:
def __init__(self, client, stream: ObjectID):
self._client = client
self._stream = stream
self._client.open_stream(stream, 'w')

        def next(self, size: int) -> memoryview:
            """Allocate a new buffer chunk of ``size`` bytes in the stream
            and return it to be filled."""
            return self._client.new_buffer_chunk(self._stream, size)

        def append(self, chunk: ObjectID):
            """Push an existing object into the stream as the next chunk."""
            return self._client.push_chunk(self._stream, chunk)

        def fail(self):
            """Mark the stream as failed, aborting its readers."""
            return self._client.stop_stream(self._stream, True)

        def finish(self):
            """Mark the stream as finished, i.e., normally completed."""
            return self._client.stop_stream(self._stream, False)

def __str__(self) -> str:
return repr(self)
def __repr__(self) -> str:
return '%s of Stream <%r>' % (self.__class__, self._stream)
def __init__(self, meta: ObjectMeta, resolver=None):
self._meta = meta
self._stream = meta.id
self._resolver = resolver
self._reader = None
self._writer = None
@property
def id(self) -> ObjectID:
return self._stream
@property
def meta(self) -> ObjectMeta:
return self._meta
@property
def reader(self) -> "BaseStream.Reader":
return self.open_reader()
def __str__(self) -> str:
return repr(self)
def __repr__(self) -> str:
return '%s <%r>' % (self.__class__.__name__, self._stream)
def _open_new_reader(self, client) -> "BaseStream.Reader":
'''Always open a new reader.'''
return BaseStream.Reader(client, self.id, self._resolver)
def open_reader(self, client=None) -> "BaseStream.Reader":
if self._reader is None:
if client is None:
client = self._meta._client
self._reader = self._open_new_reader(client)
return self._reader
@property
def writer(self) -> "BaseStream.Writer":
return self.open_writer()
def _open_new_writer(self, client) -> "BaseStream.Writer":
return BaseStream.Writer(client, self.id)
def open_writer(self, client=None) -> "BaseStream.Writer":
if self._writer is None:
if client is None:
client = self._meta._client
self._writer = self._open_new_writer(client)
return self._writer

    def drop(self, client=None):
        """Destroy the stream on the vineyard server, if the client
        supports dropping streams."""
        if client is None:
            client = self._meta._client
        if hasattr(client, 'drop_stream'):
            client.drop_stream(self.id)


class StreamCollection:
    """A stream collection is a set of streams, where each element is a
    stream or another stream collection.
    """

KEY_OF_STREAMS = '__streams'
KEY_OF_PATH = '__path'
KEY_OF_GLOBAL = '__global'
KEY_OF_OPTIONS = '__options'
def __init__(self, meta: ObjectMeta, streams: List[ObjectID]):
self._meta = meta
self._streams = streams
if StreamCollection.KEY_OF_GLOBAL in self._meta:
self._global = self._meta[StreamCollection.KEY_OF_GLOBAL]
else:
self._global = False
@staticmethod
    def new(
        client,
        metadata: Dict,
        streams: List[ObjectID],
        meta: Optional[ObjectMeta] = None,
    ) -> "StreamCollection":
if meta is None:
meta = ObjectMeta()
meta['typename'] = 'vineyard::StreamCollection'
for k, v in metadata.items():
if k not in [
'id',
'signature',
'instance_id',
'transient',
'global',
'typename',
]:
meta[k] = v
meta[StreamCollection.KEY_OF_STREAMS] = [int(s) for s in streams]
meta = client.create_metadata(meta)
return StreamCollection(meta, streams)
@property
def id(self):
return self.meta.id
@property
def meta(self):
return self._meta
@property
def isglobal(self):
return self._global
@property
def streams(self):
return self._streams
def __repr__(self) -> str:
return "StreamCollection: %s [%s]" % (
repr(self.id),
[repr(s) for s in self.streams],
)
def __str__(self) -> str:
return repr(self)


def stream_collection_resolver(obj):
    """Resolve a ``vineyard::StreamCollection`` object into a
    :class:`StreamCollection`, decoding the stream ID list from its
    metadata."""
    meta = obj.meta
    streams = json.loads(meta[StreamCollection.KEY_OF_STREAMS])
    return StreamCollection(meta, [ObjectID(s) for s in streams])


def register_stream_collection_types(_builder_ctx, resolver_ctx):
    """Register the stream collection resolver into the given resolver
    context."""
    if resolver_ctx is not None:
        resolver_ctx.register('vineyard::StreamCollection', stream_collection_resolver)


__all__ = [
'open',
'read',
'write',
'BaseStream',
'StreamCollection',
'register_stream_collection_types',
]