From: George Caswell Date: Wed, 18 Jun 2025 15:37:45 +0000 (-0400) Subject: Improved support for stream control messages X-Git-Url: https://scope-eye.net/git/?a=commitdiff_plain;h=9ceb4daf282046e2757114a11a5ae88d7dc1c7e2;p=python_nu_plugin.git Improved support for stream control messages --- diff --git a/nuplugin.py b/nuplugin.py index b51241b..3207c66 100755 --- a/nuplugin.py +++ b/nuplugin.py @@ -9,7 +9,7 @@ import msgpack class stream_info: id = None dropped = False - closed = False + ended = False pending_acks = 0 class stream_manager: @@ -40,6 +40,7 @@ class server: packer = None compatible_version = '0.102.0' features = [] + producer_streams = None def __init__(self, ins, outs, errs=None, allow_localsocket=True, version="0.1", signature=[]): self.ins = ins @@ -53,6 +54,7 @@ class server: features.append({'name':'LocalSocket'}) self.message_queue = queue.PriorityQueue(32) self.version = version + self.producer_streams = stream_manager() def __send_encoding(self): self.outs.write(b'\x07msgpack') @@ -67,11 +69,12 @@ class server: def __error(self, message): print(message, file=self.errs) - def enqueue_response(self, message, priority=0): + def enqueue_message(self, message, priority=0): # Priority level guidelines: - # 0: miscellaneous traffic, stream messages, etc. + # 1: "end" messages for outgoing streams. (A stream "end" message must not be sent until after any data messages for that stream - unless the consumer dropped it first!) + # 0: miscellaneous traffic, stream data messages, etc. # -1: PipelineData/Ordering Call Responses (Call responses providing result values) - # -2: Introductory message Call Responses (metadata, signature, etc.), EngineCalls, etc. + # -2: Introductory message Call Responses (metadata, signature, etc.), EngineCalls, stream flow control messages, etc. # -3: Error messages try: self.message_queue.put((priority, message), block=True, timeout=None) @@ -80,7 +83,7 @@ class server: def send_error_response(self, call_id, error_msg): response = {'CallResponse': [call_id, {'Error': {'msg': error_msg}}]} - self.enqueue_response(response, -3) + self.enqueue_message(response, -3) def handle_custom_value_unsupported(self, call_id, value): # Custom values aren't yet supported by this module @@ -166,11 +169,11 @@ class server: def do_call_metadata(self, call_id): response = {'CallResponse': [call_id, {'Metadata': {'version': self.version}}]} - self.enqueue_response(response, -2) + self.enqueue_message(response, -2) def do_call_signature(self, call_id): response = {'CallResponse': [call_id, {'Signature': self.signature}]} - self.enqueue_response(response, -2) + self.enqueue_message(response, -2) def do_call(self, msg): [call_id, call_data] = msg['Call'] @@ -201,7 +204,65 @@ class server: self.send_error_response(call_id, error_msg) def do_drop(self, stream_id): - + "Respond to an incoming DROP directive to terminate a stream originating from this plugin" + # If stream in question hasn't been ended, end it. Either way mark it as dropped. + stream = self.producer_streams.get(stream_id) + if stream: + if stream.dropped: + self.__error('Stream already dropped') + else: + stream.dropped = True + if not stream.ended: + self.enqueue_message({'End': stream_id}, -2) + stream.ended = True + + # Stream is dropped and ended, we can stop tracking it. + self.producer_streams.remove(stream_id) + else: + # We're not tracking this stream (anymore...?) - that shouldn't happen unless it's already been ended and dropped. + self.__error('Got a drop message for an unrecognized stream ID! ({})'.format(stream_id)) + + def do_end(self, stream_id): + "Respond to an incoming END directive indicating no more data will be provided in a stream" + stream = self.consumer_streams.get(stream_id) + if stream: + if stream.ended: + self.__error('Got END message for a stream which was already ENDed') + else: + stream.ended = True + if not stream.dropped: + self.enqueue_message({'Drop': stream_id}, -2) + stream.dropped = True + + # Stream is dropped and ended, we can stop tracking it. + self.consumer_streams.remove(stream_id) + else: + # We're not tracking this stream (anymore...?) - that shouldn't happen unless it's already been ended and dropped. + self.__error('Got an END message for an unrecognized stream ID! ({})'.format(stream_id)) + + def drop_stream(self, stream_id): + "Issue a DROP message to request termination of an incoming stream" + stream = self.consumer_streams.get(stream_id) + if stream: + if stream.dropped: + self.__error('Stream {} already dropped'.format(stream_id)) + else: + self.enqueue_message({'Drop': stream_id}, -2) + stream.dropped = True + else: + self.__error('Caller requested DROP of unrecognized stream ID: ({})'.format(stream_id)) + + def end_stream(self, stream_id): + "Issue an END message for an outgoing stream to indicate no further data will be provided for it" + stream = self.producer_streams.get(stream_di) + if stream: + # It's not an error in this case if we end a stream that's already ended, because we may send an END message automatically in response to a DROP... + # ...We need a better strategy for handling drop/end. + if not stream.ended: + # Note explicit "End" messages must have lower priority than any DATA messages for the stream, or the messages will be out of order! + # ...Need a better strategy for queueing messages, too... + self.enqueue_message({'End': stream_id}, 1) + stream.ended = True def receive_loop(self): while not self.shutdown: @@ -226,6 +287,10 @@ class server: self.do_ack(msg_data) case 'Drop': self.do_drop(msg_data) + case 'End': + self.do_end(msg_data) + case 'Data': + self.do_data(msg_data) case 'EngineCallResponse': self.__error('Got engine call response: Not yet supported by this module') case 'Goodbye':