#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020-2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

'''This module exposes the support for :class:`DataframeStream`, which can be
used as follows:

.. code:: python

    # create a dataframe stream
    >>> stream = DataframeStream.new(client)
    >>> stream
    DataframeStream <o0001e09ddd98fd70>

    # open a writer and write record batches as chunks
    >>> writer = stream.open_writer(client)
    >>> writer.write_table(
            pa.Table.from_pandas(
                pd.DataFrame({"x": [1,2,3], "y": [4,5,6]})))

    # mark the stream as finished
    >>> writer.finish()

    # open a reader
    >>> reader = stream.open_reader(client)
    >>> batch = reader.next()
    >>> batch
    pyarrow.RecordBatch
    x: int64
    y: int64

    # the reader raises StopIteration when the stream is drained
    >>> batch = reader.next()
    ---------------------------------------------------------------------------
    StreamDrainedException                    Traceback (most recent call last)
    ~/libvineyard/python/vineyard/io/dataframe.py in next(self)
         97         try:
    ---> 98             buffer = self._client.next_buffer_chunk(self._stream)
         99             with pa.ipc.open_stream(buffer) as reader:

    StreamDrainedException: Stream drain: Stream drained: no more chunks

    The above exception was the direct cause of the following exception:

    StopIteration                             Traceback (most recent call last)
    <ipython-input-11-10f09bf65f8a> in <module>
    ----> 1 batch = reader.next()

    ~/libvineyard/python/vineyard/io/dataframe.py in next(self)
        100                 return reader.read_next_batch()
        101         except StreamDrainedException as e:
    --> 102             raise StopIteration('No more chunks') from e
        103
        104     def __str__(self) -> str:

    StopIteration: No more chunks
'''

import contextlib
import json
from io import BytesIO
from typing import Dict

import pyarrow as pa
import pyarrow.ipc  # pylint: disable=unused-import

from vineyard._C import ObjectID
from vineyard._C import ObjectMeta
from vineyard._C import StreamDrainedException
from vineyard._C import memory_copy
from vineyard.io.stream import BaseStream
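
# Wire format note (derived from `Writer.write` and `Reader.next` below): each
# chunk of a DataframeStream is a self-contained Arrow IPC stream payload that
# carries exactly one record batch, so every chunk can be decoded on its own
# with `pa.ipc.open_stream` without any shared schema state between chunks.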
class DataframeStream(BaseStream):
    def __init__(self, meta: ObjectMeta, params: Dict = None):
        super().__init__(meta)
        self._params = params

    @property
    def params(self):
        return self._params

    @staticmethod
    def new(
        client, params: Dict = None, meta: ObjectMeta = None
    ) -> "DataframeStream":
        '''Create a new dataframe stream in vineyard and return its handle.'''
        if meta is None:
            meta = ObjectMeta()
        meta['typename'] = 'vineyard::DataframeStream'
        if params is None:
            params = dict()
        meta['params_'] = params
        meta = client.create_metadata(meta)
        client.create_stream(meta.id)
        return DataframeStream(meta, params)

    class Reader(BaseStream.Reader):
        def __init__(self, client, stream: ObjectID):
            super().__init__(client, stream)

        def next(self) -> pa.RecordBatch:
            '''Fetch the next chunk and decode it as a record batch.

            Raises :class:`StopIteration` when the stream is drained.
            '''
            try:
                buffer = self._client.next_buffer_chunk(self._stream)
                with pa.ipc.open_stream(buffer) as reader:
                    return reader.read_next_batch()
            except StreamDrainedException as e:
                raise StopIteration('No more chunks') from e

        def read_table(self) -> pa.Table:
            '''Exhaust the stream and assemble all batches into a table.'''
            batches = []
            while True:
                try:
                    batches.append(self.next())
                except StopIteration:
                    break
            return pa.Table.from_batches(batches)

    class Writer(BaseStream.Writer):
        def __init__(self, client, stream: ObjectID):
            super().__init__(client, stream)
            self._buffer = BytesIO()

        def next(self, size: int) -> memoryview:
            '''Allocate the next buffer chunk of the given size in the stream.'''
            return self._client.new_buffer_chunk(self._stream, size)

        def write(self, batch: pa.RecordBatch):
            '''Serialize a record batch as an Arrow IPC stream payload and
            copy it into a newly allocated chunk.'''
            sink = BytesIO()
            with pa.ipc.new_stream(sink, batch.schema) as writer:
                writer.write(batch)
            view = sink.getbuffer()
            if len(view) > 0:
                buffer = self.next(len(view))
                memory_copy(buffer, view)

        def write_table(self, table: pa.Table):
            '''Write a table to the stream, one chunk per record batch.'''
            for batch in table.to_batches():
                self.write(batch)

        def finish(self):
            '''Mark the stream as finished so that readers stop after the
            last chunk.'''
            return self._client.stop_stream(self._stream, False)

    def _open_new_reader(self, client):
        return DataframeStream.Reader(client, self.id)

    def _open_new_writer(self, client):
        return DataframeStream.Writer(client, self.id)
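

# A minimal end-to-end sketch (illustrative, not part of the module API):
# round-trip a pandas dataframe through a DataframeStream, mirroring the
# doctest in the module docstring. It assumes a running vineyardd instance;
# the socket path passed to `vineyard.connect` below is hypothetical and
# should match your deployment.
if __name__ == '__main__':
    import pandas as pd

    import vineyard

    client = vineyard.connect('/var/run/vineyard.sock')  # hypothetical path
    stream = DataframeStream.new(client)

    # write a table, one chunk per record batch, then mark the stream finished
    writer = stream.open_writer(client)
    writer.write_table(
        pa.Table.from_pandas(pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}))
    )
    writer.finish()

    # drain the stream back into a single table
    reader = stream.open_reader(client)
    print(reader.read_table().to_pandas())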