Source code for vineyard.io.byte

#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020-2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

''' This module exposes support for ByteStream, which can be used like:

.. code:: python

    # create a builder, then seal it as stream
    >>> stream = ByteStream.new(client)
    >>> stream
    ByteStream <o0001e09ddd98fd70>

    # use the writer to put chunks
    >>> writer = stream.open_writer(client)
    >>> chunk = writer.next(1024)
    >>> chunk
    <memory at 0x136ca2ac0>
    >>> len(chunk)
    1024
    >>> chunk.readonly
    False
    >>> vineyard.memory_copy(chunk, src=b'abcde', offset=0)

    # mark the stream as finished
    >>> writer.finish()

    # open a reader
    >>> reader = stream.open_reader(client)
    >>> chunk = reader.next()
    >>> chunk
    <memory at 0x136d207c0>
    >>> len(chunk)
    1024
    >>> chunk.readonly
    True
    >>> bytes(chunk[:10])
    b'abcde\x00\x00\x00\x00\x00'

    # the reader reaches the end of the stream
    >>> chunk = reader.next()
    ---------------------------------------------------------------------------
    StreamDrainedException                    Traceback (most recent call last)
    ~/libvineyard/python/vineyard/io/byte.py in next(self)
        108
    --> 109         def next(self) -> memoryview:
        110             try:

    StreamDrainedException: Stream drain: Stream drained: no more chunks

    The above exception was the direct cause of the following exception:

    StopIteration                             Traceback (most recent call last)
    <ipython-input-11-d8809de11870> in <module>
    ----> 1 chunk = reader.next()

    ~/libvineyard/python/vineyard/io/byte.py in next(self)
        109         def next(self) -> memoryview:
        110             try:
    --> 111                 return self._client.next_buffer_chunk(self._stream)
        112             except StreamDrainedException as e:
        113                 raise StopIteration('No more chunks') from e

    StopIteration: No more chunks
'''

import json
from io import BytesIO
from typing import Dict

from vineyard._C import ObjectID
from vineyard._C import ObjectMeta
from vineyard._C import StreamDrainedException
from vineyard._C import memory_copy
from vineyard.io.stream import BaseStream


class ByteStream(BaseStream):
    """A vineyard stream whose chunks are raw byte buffers.

    An instance wraps the stream's metadata together with an optional
    dictionary of parameters. Use :meth:`new` to create a stream through a
    client. ``_open_new_reader`` and ``_open_new_writer`` build the nested
    :class:`Reader` and :class:`Writer` for this stream.
    """

    def __init__(self, meta: ObjectMeta, params: Dict = None):
        """Wrap existing stream metadata.

        Args:
            meta: Metadata of the stream object; forwarded to ``BaseStream``.
            params: Optional parameters attached to the stream. Stored as
                given, so ``None`` stays ``None``.
        """
        super().__init__(meta)
        self._params = params

    @property
    def params(self):
        """The parameters passed to the constructor (may be ``None``)."""
        return self._params

    @staticmethod
    def new(client, params: Dict = None, meta: ObjectMeta = None) -> "ByteStream":
        """Create a new byte stream through ``client``.

        Args:
            client: Object providing ``create_metadata`` and
                ``create_stream``.
            params: Optional parameters stored under the ``params_`` metadata
                key; an empty dict is used when omitted.
            meta: Optional metadata to populate; a fresh ``ObjectMeta`` is
                created when omitted.

        Returns:
            A ``ByteStream`` wrapping the metadata returned by
            ``client.create_metadata``.
        """
        if meta is None:
            meta = ObjectMeta()
        meta['typename'] = 'vineyard::ByteStream'
        if params is None:
            params = dict()
        meta['params_'] = params
        # Persist the metadata first; the stream is created from the id of
        # the metadata object handed back by the client.
        meta = client.create_metadata(meta)
        client.create_stream(meta.id)
        return ByteStream(meta, params)

    class Reader(BaseStream.Reader):
        """Pulls chunks out of a byte stream."""

        def __init__(self, client, stream: ObjectID):
            # NOTE(review): the base class presumably stores these as
            # ``self._client`` / ``self._stream``, which ``next`` relies on.
            super().__init__(client, stream)

        def next(self) -> memoryview:
            """Return the next chunk of the stream.

            Raises:
                StopIteration: When the client raises
                    ``StreamDrainedException``; the original exception is
                    attached as the cause.
            """
            try:
                return self._client.next_buffer_chunk(self._stream)
            except StreamDrainedException as e:
                raise StopIteration('No more chunks') from e

    class Writer(BaseStream.Writer):
        """Pushes chunks into a byte stream, with optional write buffering."""

        def __init__(self, client, stream: ObjectID):
            super().__init__(client, stream)
            # Flush threshold for ``write`` in bytes: 256 MiB by default.
            self._buffer_size_limit = 1024 * 1024 * 256
            self._buffer = BytesIO()

        @property
        def buffer_size_limit(self):
            """Number of buffered bytes at which ``write`` flushes a chunk."""
            return self._buffer_size_limit

        @buffer_size_limit.setter
        def buffer_size_limit(self, value: int):
            self._buffer_size_limit = value

        def next(self, size: int) -> memoryview:
            """Request a new chunk of ``size`` bytes from the client."""
            return self._client.new_buffer_chunk(self._stream, size)

        def write(self, data: bytes):
            """Append ``data`` to the internal buffer and flush it if the
            buffered size has reached ``buffer_size_limit``."""
            self._buffer.write(data)
            self._try_flush_buffer()

        def _try_flush_buffer(self, force=False):
            """Copy the buffered bytes into a new chunk when a flush is due.

            A flush is due when the buffered size has reached
            ``buffer_size_limit``, or when ``force`` is true and the buffer
            is not empty. After a flush the buffer is replaced by an empty
            one.
            """
            # Scope the exported view with ``with``: while a view from
            # ``getbuffer()`` is alive the BytesIO cannot be resized, so a
            # lingering view would make a later ``write`` fail.
            with self._buffer.getbuffer() as view:
                pending = len(view)
                flush_due = pending >= self._buffer_size_limit or (
                    force and pending > 0
                )
                if not flush_due:
                    return
                # With a limit of zero an empty buffer is "due" as well;
                # do not request a zero-sized chunk in that case.
                if pending > 0:
                    chunk = self.next(pending)
                    memory_copy(chunk, view)
            self._buffer = BytesIO()

        def finish(self):
            """Flush any buffered bytes, then stop the stream.

            Returns:
                Whatever ``client.stop_stream`` returns.
            """
            self._try_flush_buffer(True)
            # NOTE(review): the ``False`` argument presumably signals that
            # the stream did not fail; confirm against the client API.
            return self._client.stop_stream(self._stream, False)

    def _open_new_reader(self, client):
        """Build a ``ByteStream.Reader`` bound to this stream's id."""
        return ByteStream.Reader(client, self.id)

    def _open_new_writer(self, client):
        """Build a ``ByteStream.Writer`` bound to this stream's id."""
        return ByteStream.Writer(client, self.id)
def byte_stream_resolver(obj):
    """Build a ``ByteStream`` from a vineyard object.

    Args:
        obj: Object exposing a ``meta`` mapping. When the mapping contains
            the ``params_`` key, its value is decoded as JSON and used as
            the stream parameters.

    Returns:
        A ``ByteStream`` constructed from ``obj.meta`` and the decoded
        parameters, or an empty dict when ``params_`` is absent.
    """
    meta = obj.meta
    if 'params_' in meta:
        params = json.loads(meta['params_'])
    else:
        # Must be a dict *instance*; the bare name ``dict`` would hand the
        # type object itself to the stream as its parameters.
        params = {}
    return ByteStream(obj.meta, params)


def register_byte_stream_types(_builder_ctx, resolver_ctx):
    """Register the byte stream resolver for ``vineyard::ByteStream``.

    Args:
        _builder_ctx: Unused by this function.
        resolver_ctx: Resolver registry providing ``register(name, fn)``;
            nothing is registered when it is ``None``.
    """
    if resolver_ctx is not None:
        resolver_ctx.register('vineyard::ByteStream', byte_stream_resolver)