#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020-2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import logging
import traceback
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from urllib.parse import urlparse
from vineyard._C import ObjectID
from vineyard._C import ObjectMeta
from vineyard._C import StreamDrainedException
from vineyard.core.driver import registerize
from vineyard.core.resolver import resolver_context
logger = logging.getLogger('vineyard')
@registerize
def read(
    path,
    *args,
    handlers=None,
    accumulate=False,
    chunk_hook: Optional[Callable] = None,
    **kwargs
):
    """Open a path and read it as a single stream.
    Parameters
    ----------
    path: str
        Path to read, the last reader registered for the scheme of
        the path will be used.
    handlers: list, optional
        If not None, launched worker processes will be appended to
        this list for further customized job lifecycle management.
        Default is None.
    accumulate: bool, optional
        If :code:`accumulate` is True, the result is returned as a
        dataframe rather than as a dataframe stream. Default is False.
    chunk_hook: callable, optional
        If the read/write target is a global dataframe (e.g., csv,
        orc, parquet, etc.), the hook will be called for each chunk
        to be read or written (usually a :code:`pyarrow.RecordBatch`).
        The hook should return a :code:`pyarrow.RecordBatch` object,
        and should be stateless, as the invocation order is not
        guaranteed. E.g.,

        .. code:: python

            def exchange_column(batch):
                import pyarrow as pa
                columns = list(batch.columns)
                fields = list(batch.schema)
                # swap the first two columns together with their schema
                # fields, keeping the schema consistent with the arrays
                columns[0], columns[1] = columns[1], columns[0]
                fields[0], fields[1] = fields[1], fields[0]
                return pa.RecordBatch.from_arrays(columns, schema=pa.schema(fields))
    vineyard_ipc_socket: str
        The IPC socket location of the local or remote vineyard
        instance that the launched readers will use to establish
        connections with the vineyard server.
    vineyard_endpoint: str, optional
        An optional address of vineyard's RPC socket, used for
        retrieving the server's information on the client side. If
        not provided, the `vineyard_ipc_socket` will be used, or
        vineyard's IPC or RPC endpoints will be discovered from
        environment variables.
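
    Examples
    --------
    A minimal sketch of reading a dataset as a stream; the scheme, path
    and socket location below are hypothetical placeholders:

    .. code:: python

        stream = vineyard.io.read(
            'hdfs://path/to/dataset.csv',
            vineyard_ipc_socket='/var/run/vineyard.sock',
        )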
    """
    parsed = urlparse(path)
    if read._factory and read._factory.get(parsed.scheme):
        errors = []
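        # try readers from the most recently registered one backwards;
        # the first reader that returns a non-None result wins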
        for reader in read._factory[parsed.scheme][::-1]:
            try:
                proc_kwargs = kwargs.copy()
                r = reader(
                    path,
                    proc_kwargs.pop('vineyard_ipc_socket'),
                    *args,
                    handlers=handlers,
                    accumulate=accumulate,
                    chunk_hook=chunk_hook,
                    **proc_kwargs
                )
                if r is not None:
                    return r
            except Exception:  # pylint: disable=broad-except
                errors.append('%s: %s' % (reader.__name__, traceback.format_exc()))
        raise RuntimeError(
            'Unable to find a proper IO driver for %s, potential causes are:\n %s'
            % (path, '\n'.join(errors))
        )
    else:
        raise ValueError("No IO driver registered for %s" % path) 
@registerize
def write(
    path, stream, *args, handlers=None, chunk_hook: Optional[Callable] = None, **kwargs
):
    """Write the stream to a given path.
    Parameters
    ----------
    path: str
        Path to write, the last writer registered for the scheme of the path
        will be used.
    stream: vineyard stream
        Stream that produces the data to write.
    handlers: list, optional
        If not None, launched worker processes will be appended to
        this list for further customized job lifecycle management.
        Default is None.
    chunk_hook: callable, optional
        If the read/write target is a global dataframe (e.g., csv,
        orc, parquet, etc.), the hook will be called for each chunk
        to be read or written (usually a :code:`pyarrow.RecordBatch`).
        The hook should return a :code:`pyarrow.RecordBatch` object,
        and should be stateless, as the invocation order is not
        guaranteed. E.g.,

        .. code:: python

            def exchange_column(batch):
                import pyarrow as pa
                columns = list(batch.columns)
                fields = list(batch.schema)
                # swap the first two columns together with their schema
                # fields, keeping the schema consistent with the arrays
                columns[0], columns[1] = columns[1], columns[0]
                fields[0], fields[1] = fields[1], fields[0]
                return pa.RecordBatch.from_arrays(columns, schema=pa.schema(fields))
    vineyard_ipc_socket: str
        The IPC socket location of the local or remote vineyard
        instance that the launched writers will use to establish
        connections with the vineyard server.
    vineyard_endpoint: str, optional
        An optional address of vineyard's RPC socket, used for
        retrieving the server's information on the client side. If
        not provided, the `vineyard_ipc_socket` will be used, or
        vineyard's IPC or RPC endpoints will be discovered from
        environment variables.
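
    Examples
    --------
    A minimal sketch of writing a dataframe stream back out; the scheme,
    path and socket location below are hypothetical placeholders:

    .. code:: python

        vineyard.io.write(
            'hdfs://path/to/output.csv',
            dataframe_stream,
            vineyard_ipc_socket='/var/run/vineyard.sock',
        )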
    """
    parsed = urlparse(path)
    if write._factory and write._factory.get(parsed.scheme):
        errors = []
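        # try writers from the most recently registered one backwards;
        # the first writer that completes without raising wins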
        for writer in write._factory[parsed.scheme][::-1]:
            try:
                proc_kwargs = kwargs.copy()
                writer(
                    path,
                    stream,
                    proc_kwargs.pop('vineyard_ipc_socket'),
                    *args,
                    handlers=handlers,
                    chunk_hook=chunk_hook,
                    **proc_kwargs
                )
            except Exception:  # pylint: disable=broad-except
                exc = traceback.format_exc()
                errors.append('%s: %s' % (writer.__name__, exc))
                if 'StreamFailedException: Stream failed' in exc:
                    # if the source stream has already failed, we should
                    # fail immediately and don't try other drivers.
                    break
                continue
            else:
                return
        raise RuntimeError(
            'Unable to find a proper IO driver for %s, potential causes are:\n %s'
            % (path, '\n'.join(errors))
        )
    else:
        raise ValueError("No IO driver registered for %s" % path) 
def open(
    path,
    *args,
    mode='r',
    handlers=None,
    chunk_hook: Optional[Callable] = None,
    **kwargs
):
    """Open a path as a reader or writer, depends on the parameter :code:`mode`.
    If :code:`mode` is :code:`r`, it will open a stream for read, and open a
    stream for write when :code:`mode` is :code:`w`.
    Parameters
    ----------
    path: str
        Path to open.
    mode: str
        How to open the path: :code:`'r'` for read and :code:`'w'`
        for write.
    handlers: list, optional
        If not None, launched worker processes will be appended to
        this list; each process handle can be joined (with
        :code:`join`) to capture possible errors raised during the
        I/O process.
    chunk_hook: callable, optional
        If the read/write target is a global dataframe (e.g., csv,
        orc, parquet, etc.), the hook will be called for each chunk
        to be read or written (usually a :code:`pyarrow.RecordBatch`).
        The hook should return a :code:`pyarrow.RecordBatch` object,
        and should be stateless, as the invocation order is not
        guaranteed. E.g.,

        .. code:: python

            def exchange_column(batch):
                import pyarrow as pa
                columns = list(batch.columns)
                fields = list(batch.schema)
                # swap the first two columns together with their schema
                # fields, keeping the schema consistent with the arrays
                columns[0], columns[1] = columns[1], columns[0]
                fields[0], fields[1] = fields[1], fields[0]
                return pa.RecordBatch.from_arrays(columns, schema=pa.schema(fields))
    vineyard_ipc_socket: str
        Vineyard's IPC socket location.
    vineyard_endpoint: str
        Vineyard's RPC socket address.

    See Also
    --------
    vineyard.io.read
    vineyard.io.write
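
    Examples
    --------
    A minimal sketch; the paths and socket location below are
    hypothetical placeholders:

    .. code:: python

        # open for reading: returns a stream over the dataset
        stream = vineyard.io.open(
            'file:///tmp/dataset.csv',
            vineyard_ipc_socket='/var/run/vineyard.sock',
        )

        # open for writing: pass the source stream as a positional argument
        vineyard.io.open(
            'file:///tmp/output.csv',
            stream,
            mode='w',
            vineyard_ipc_socket='/var/run/vineyard.sock',
        )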
    """
    parsed = urlparse(path)
    if not parsed.scheme:
        path = 'file://' + path
    if mode == 'r':
        return read(path, *args, handlers=handlers, chunk_hook=chunk_hook, **kwargs)
    if mode == 'w':
        return write(path, *args, handlers=handlers, chunk_hook=chunk_hook, **kwargs)
    raise RuntimeError('Opening %s with mode %s is not supported' % (path, mode)) 
class BaseStream:
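    """A wrapper over a vineyard stream object that provides chunk-oriented
    :class:`Reader` and :class:`Writer` accessors.

    A minimal usage sketch (the ``client`` and ``stream`` objects below
    are hypothetical placeholders):

    .. code:: python

        # producer side: allocate and fill chunks, then finish the stream
        writer = stream.open_writer(client)
        buffer = writer.next(1024)  # a writable 1024-byte buffer chunk
        writer.finish()

        # consumer side: drain chunks until the stream is exhausted
        reader = stream.open_reader(client)
        while True:
            try:
                chunk = reader.next()
            except StopIteration:
                break
    """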
    class Reader:
        def __init__(self, client, stream: ObjectID, resolver=None):
            self._client = client
            self._stream = stream
            self._resolver = resolver
            self._client.open_stream(stream, 'r')
        def next(self) -> object:
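            """Fetch the next chunk and resolve it into a Python object,
            raising :code:`StopIteration` once the stream is drained."""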
            try:
                chunk = self._client.next_chunk(self._stream)
            except StreamDrainedException as e:
                raise StopIteration('No more chunks') from e
            if self._resolver is not None:
                return self._resolver(chunk)
            else:
                with resolver_context() as ctx:
                    return ctx.run(chunk)
        def next_metadata(self) -> ObjectMeta:
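            """Fetch the metadata of the next chunk, raising
            :code:`StopIteration` once the stream is drained."""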
            try:
                return self._client.next_chunk_meta(self._stream)
            except StreamDrainedException as e:
                raise StopIteration('No more chunks') from e
        def __str__(self) -> str:
            return repr(self)
        def __repr__(self) -> str:
            return '%s of Stream <%r>' % (self.__class__, self._stream)
    class Writer:
        def __init__(self, client, stream: ObjectID):
            self._client = client
            self._stream = stream
            self._client.open_stream(stream, 'w')
        def next(self, size: int) -> memoryview:
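            """Allocate a buffer chunk of :code:`size` bytes in the stream
            and return a writable memoryview over it."""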
            return self._client.new_buffer_chunk(self._stream, size)
        def append(self, chunk: ObjectID):
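            """Push an existing vineyard object into the stream as the
            next chunk."""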
            return self._client.push_chunk(self._stream, chunk)
        def fail(self):
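            """Stop the stream and mark it as failed."""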
            return self._client.stop_stream(self._stream, True)
        def finish(self):
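            """Stop the stream and mark it as successfully finished."""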
            return self._client.stop_stream(self._stream, False)
        def __str__(self) -> str:
            return repr(self)
        def __repr__(self) -> str:
            return '%s of Stream <%r>' % (self.__class__, self._stream)
    def __init__(self, meta: ObjectMeta, resolver=None):
        self._meta = meta
        self._stream = meta.id
        self._resolver = resolver
        self._reader = None
        self._writer = None
    @property
    def id(self) -> ObjectID:
        return self._stream
    @property
    def meta(self) -> ObjectMeta:
        return self._meta
    @property
    def reader(self) -> "BaseStream.Reader":
        return self.open_reader()
    def __str__(self) -> str:
        return repr(self)
    def __repr__(self) -> str:
        return '%s <%r>' % (self.__class__.__name__, self._stream)
    def _open_new_reader(self, client) -> "BaseStream.Reader":
        """Always open a new reader."""
        return BaseStream.Reader(client, self.id, self._resolver)
    def open_reader(self, client=None) -> "BaseStream.Reader":
        if self._reader is None:
            if client is None:
                client = self._meta._client
            self._reader = self._open_new_reader(client)
        return self._reader
    @property
    def writer(self) -> "BaseStream.Writer":
        return self.open_writer()
    def _open_new_writer(self, client) -> "BaseStream.Writer":
        return BaseStream.Writer(client, self.id)
    def open_writer(self, client=None) -> "BaseStream.Writer":
        if self._writer is None:
            if client is None:
                client = self._meta._client
            self._writer = self._open_new_writer(client)
        return self._writer
    def drop(self, client=None):
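        """Drop the stream from vineyard, if the client supports
        dropping streams."""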
        if client is None:
            client = self._meta._client
        if hasattr(client, 'drop_stream'):
            client.drop_stream(self.id)
class StreamCollection:
    """A stream collection is a set of stream, where each element is a stream, or,
    another stream collection.
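
    A minimal construction sketch (the ``client`` and the member streams
    below are hypothetical placeholders):

    .. code:: python

        collection = StreamCollection.new(
            client,
            metadata={StreamCollection.KEY_OF_PATH: 'dataset'},
            streams=[stream1.id, stream2.id],
        )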
    """
    KEY_OF_STREAMS = '__streams'
    KEY_OF_PATH = '__path'
    KEY_OF_GLOBAL = '__global'
    KEY_OF_OPTIONS = '__options'
    def __init__(self, meta: ObjectMeta, streams: List[ObjectID]):
        self._meta = meta
        self._streams = streams
        if StreamCollection.KEY_OF_GLOBAL in self._meta:
            self._global = self._meta[StreamCollection.KEY_OF_GLOBAL]
        else:
            self._global = False
    @staticmethod
    def new(
        client, metadata: Dict, streams: List[ObjectID], meta: ObjectMeta = None
    ) -> "StreamCollection":
        if meta is None:
            meta = ObjectMeta()
        meta['typename'] = 'vineyard::StreamCollection'
        for k, v in metadata.items():
            if k not in [
                'id',
                'signature',
                'instance_id',
                'transient',
                'global',
                'typename',
            ]:
                meta[k] = v
        meta[StreamCollection.KEY_OF_STREAMS] = [int(s) for s in streams]
        meta = client.create_metadata(meta)
        return StreamCollection(meta, streams)
    @property
    def id(self):
        return self.meta.id
    @property
    def meta(self):
        return self._meta
    @property
    def isglobal(self):
        return self._global
    @property
    def streams(self):
        return self._streams
    def __repr__(self) -> str:
        return "StreamCollection: %s [%s]" % (
            repr(self.id),
            [repr(s) for s in self.streams],
        )
    def __str__(self) -> str:
        return repr(self)
def stream_collection_resolver(obj):
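    """Resolve a ``vineyard::StreamCollection`` metadata object into a
    :class:`StreamCollection` instance."""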
    meta = obj.meta
    streams = json.loads(meta[StreamCollection.KEY_OF_STREAMS])
    return StreamCollection(meta, [ObjectID(s) for s in streams])
def register_stream_collection_types(_builder_ctx, resolver_ctx):
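    """Register the resolver for ``vineyard::StreamCollection`` into the
    given resolver context; the builder context is currently unused."""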
    if resolver_ctx is not None:
        resolver_ctx.register('vineyard::StreamCollection', stream_collection_resolver)
__all__ = [
    'open',
    'read',
    'write',
    'BaseStream',
    'StreamCollection',
    'register_stream_collection_types',
]