#! /usr/bin/env python# -*- coding: utf-8 -*-## Copyright 2020-2023 Alibaba Group Holding Limited.## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.#''' This module exposes support for ByteStream, which can be used like this: .. code:: python # create a builder, then seal it as stream >>> stream = ByteStream.new(client) >>> stream ByteStream <o0001e09ddd98fd70> # use a writer to put chunks >>> writer = stream.open_writer(client) >>> chunk = writer.next(1024) >>> chunk <memory at 0x136ca2ac0> >>> len(chunk) 1024 >>> chunk.readonly False >>> vineyard.memory_copy(chunk, src=b'abcde', offset=0) # mark the stream as finished >>> writer.finish() # open a reader >>> reader = stream.open_reader(client) >>> chunk = reader.next() >>> chunk <memory at 0x136d207c0> >>> len(chunk) 1234 >>> chunk.readonly True >>> bytes(chunk[:10]) b'abcde\x00\x00\x00\x00\x00' # the reader reaches the end of the stream >>> chunk = reader.next() --------------------------------------------------------------------------- StreamDrainedException Traceback (most recent call last) ~/libvineyard/python/vineyard/io/byte.py in next(self) 108 --> 109 def next(self) -> memoryview: 110 try: StreamDrainedException: Stream drain: Stream drained: no more chunks The above exception was the direct cause of the following exception: StopIteration Traceback (most recent call last) <ipython-input-11-d8809de11870> in <module> ----> 1 chunk = reader.next() ~/libvineyard/python/vineyard/io/byte.py in next(self) 109 def next(self) -> memoryview: 110 try: --> 
111 return self._client.next_buffer_chunk(self._stream) 112 except StreamDrainedException as e: 113 raise StopIteration('No more chunks') from e StopIteration: No more chunks'''importjsonfromioimportBytesIOfromtypingimportDictfromvineyard._CimportObjectIDfromvineyard._CimportObjectMetafromvineyard._CimportStreamDrainedExceptionfromvineyard._Cimportmemory_copyfromvineyard.io.streamimportBaseStream
class ByteStream(BaseStream):
    """A stream whose chunks are raw byte buffers.

    Instances wrap an ``ObjectMeta`` (handed to ``BaseStream``) together with
    an optional dictionary of user parameters. Use :meth:`new` to create a
    stream through a client; readers and writers are built by
    ``_open_new_reader`` / ``_open_new_writer``.
    """

    def __init__(self, meta: ObjectMeta, params: Dict = None):
        """Wrap existing stream metadata.

        Args:
            meta: Metadata of the stream object, forwarded to ``BaseStream``.
            params: Optional parameters associated with the stream; stored
                as-is and exposed through :attr:`params`.
        """
        super().__init__(meta)
        self._params = params

    @property
    def params(self):
        """The parameters given at construction time (may be ``None``)."""
        return self._params

    @staticmethod
    def new(client, params: Dict = None, meta: ObjectMeta = None) -> "ByteStream":
        """Create a new byte stream through ``client``.

        Args:
            client: Client used to create the metadata and the stream; it
                must provide ``create_metadata`` and ``create_stream``.
            params: Optional stream parameters; an empty dict is used when
                omitted.
            meta: Optional metadata to populate; a fresh ``ObjectMeta`` is
                used when omitted.

        Returns:
            A ``ByteStream`` wrapping the metadata returned by
            ``client.create_metadata``.
        """
        if meta is None:
            meta = ObjectMeta()
        meta['typename'] = 'vineyard::ByteStream'
        if params is None:
            params = dict()
        meta['params_'] = params
        # Use the metadata returned by the client from here on: its ``id`` is
        # what identifies the stream to ``create_stream``.
        meta = client.create_metadata(meta)
        client.create_stream(meta.id)
        return ByteStream(meta, params)

    class Reader(BaseStream.Reader):
        """Reader that pulls chunks of a byte stream from the client."""

        def __init__(self, client, stream: ObjectID):
            """Bind the reader to ``client`` and the stream id ``stream``."""
            super().__init__(client, stream)

        def next(self) -> memoryview:
            """Return the next chunk of the stream.

            Raises:
                StopIteration: When the client reports the stream as drained
                    (``StreamDrainedException``); the original exception is
                    kept as the cause.
            """
            try:
                return self._client.next_buffer_chunk(self._stream)
            except StreamDrainedException as e:
                raise StopIteration('No more chunks') from e

    class Writer(BaseStream.Writer):
        """Writer that pushes chunks to a byte stream.

        Besides requesting chunks directly with :meth:`next`, data can be
        appended with :meth:`write`; it is accumulated in an in-memory buffer
        and emitted as a single chunk once the buffer reaches
        :attr:`buffer_size_limit` bytes, or when :meth:`finish` is called.
        """

        def __init__(self, client, stream: ObjectID):
            """Bind the writer to ``client`` and the stream id ``stream``."""
            super().__init__(client, stream)

            # Default flush threshold: 256 MiB.
            self._buffer_size_limit = 1024 * 1024 * 256
            self._buffer = BytesIO()

        @property
        def buffer_size_limit(self):
            """Buffered byte count at which :meth:`write` emits a chunk."""
            return self._buffer_size_limit

        @buffer_size_limit.setter
        def buffer_size_limit(self, value: int):
            self._buffer_size_limit = value

        def next(self, size: int) -> memoryview:
            """Ask the client for a new chunk of ``size`` bytes to fill in."""
            return self._client.new_buffer_chunk(self._stream, size)

        def write(self, data: bytes):
            """Append ``data`` to the internal buffer, flushing it if full."""
            self._buffer.write(data)
            self._try_flush_buffer()

        def _try_flush_buffer(self, force=False):
            """Emit the buffered bytes as one chunk when appropriate.

            A chunk is emitted when the buffer is non-empty and either
            ``force`` is true or the buffered size has reached
            :attr:`buffer_size_limit`. Afterwards the buffer is replaced by
            an empty one.
            """
            # ``getbuffer()`` exports the BytesIO's memory; while the export
            # is alive the BytesIO cannot be resized. Holding the view in a
            # ``with`` block releases it deterministically, even if
            # requesting or filling the chunk raises, so later ``write``
            # calls never hit a stale export.
            with self._buffer.getbuffer() as view:
                size = len(view)
                if size == 0:
                    # Nothing buffered: there is no chunk to emit.
                    return
                if not force and size < self._buffer_size_limit:
                    return
                chunk = self.next(size)
                memory_copy(chunk, view)
            self._buffer = BytesIO()

        def finish(self):
            """Flush any buffered bytes and stop the stream.

            Returns:
                Whatever ``client.stop_stream`` returns.
            """
            self._try_flush_buffer(True)
            # NOTE(review): the second argument is passed as ``False``;
            # presumably it means "not failed" -- confirm against the client
            # API.
            return self._client.stop_stream(self._stream, False)

    def _open_new_reader(self, client):
        """Build a :class:`ByteStream.Reader` for this stream on ``client``."""
        return ByteStream.Reader(client, self.id)

    def _open_new_writer(self, client):
        """Build a :class:`ByteStream.Writer` for this stream on ``client``."""
        return ByteStream.Writer(client, self.id)