Improved support for stream control messages master
author George Caswell <gcaswell@translations.com>
Wed, 18 Jun 2025 15:37:45 +0000 (11:37 -0400)
committer George Caswell <gcaswell@translations.com>
Wed, 18 Jun 2025 15:37:45 +0000 (11:37 -0400)
nuplugin.py

index b51241bbede87ddcb33b24e83f3d8083b65df13c..3207c666b14de041a6db9e1262150d8e30bf5a4a 100755 (executable)
@@ -9,7 +9,7 @@ import msgpack
 class stream_info:
     id = None
     dropped = False
-    closed = False
+    ended = False
     pending_acks = 0
 
 class stream_manager:
@@ -40,6 +40,7 @@ class server:
     packer = None
     compatible_version = '0.102.0'
     features = []
+    producer_streams = None
 
     def __init__(self, ins, outs, errs=None, allow_localsocket=True, version="0.1", signature=[]):
         self.ins = ins
@@ -53,6 +54,7 @@ class server:
             features.append({'name':'LocalSocket'})
         self.message_queue = queue.PriorityQueue(32)
         self.version = version
+        self.producer_streams = stream_manager()
 
     def __send_encoding(self):
         self.outs.write(b'\x07msgpack')
@@ -67,11 +69,12 @@ class server:
     def __error(self, message):
         print(message, file=self.errs)
 
-    def enqueue_response(self, message, priority=0):
+    def enqueue_message(self, message, priority=0):
         # Priority level guidelines:
-        # 0: miscellaneous traffic, stream messages, etc.
+        # 1: "end" messages for outgoing streams. (A stream "end" message must not be sent until after any data messages for that stream - unless the consumer dropped it first!)
+        # 0: miscellaneous traffic, stream data messages, etc.
         # -1: PipelineData/Ordering Call Responses (Call responses providing result values)
-        # -2: Introductory message Call Responses (metadata, signature, etc.), EngineCalls, etc.
+        # -2: Introductory message Call Responses (metadata, signature, etc.), EngineCalls, stream flow control messages, etc.
         # -3: Error messages
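+        # (Lower numbers are sent first: the PriorityQueue drains -3 before -2 and 0 before 1,
+        #  so a stream's Data messages always go out ahead of its explicit End.)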
         try:
             self.message_queue.put((priority, message), block=True, timeout=None)
@@ -80,7 +83,7 @@ class server:
 
     def send_error_response(self, call_id, error_msg):
         response = {'CallResponse': [call_id, {'Error': {'msg': error_msg}}]}
-        self.enqueue_response(response, -3)
+        self.enqueue_message(response, -3)
 
     def handle_custom_value_unsupported(self, call_id, value):
         # Custom values aren't yet supported by this module
@@ -166,11 +169,11 @@ class server:
 
     def do_call_metadata(self, call_id):
         response = {'CallResponse': [call_id, {'Metadata': {'version': self.version}}]}
-        self.enqueue_response(response, -2)
+        self.enqueue_message(response, -2)
 
     def do_call_signature(self, call_id):
         response = {'CallResponse': [call_id, {'Signature': self.signature}]}
-        self.enqueue_response(response, -2)
+        self.enqueue_message(response, -2)
 
     def do_call(self, msg):
         [call_id, call_data] = msg['Call']
@@ -201,7 +204,65 @@ class server:
                 self.send_error_response(call_id, error_msg)
 
     def do_drop(self, stream_id):
-                
+        "Respond to an incoming DROP directive to terminate a stream originating from this plugin"
+        # If the stream in question hasn't been ended, end it. Either way, mark it as dropped.
+        stream = self.producer_streams.get(stream_id)
+        if stream:
+            if stream.dropped:
+                self.__error('Stream already dropped')
+            else:
+                stream.dropped = True
+            if not stream.ended:
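+                # -2 here rather than 1: the consumer has already dropped the stream, so this End
+                # no longer needs to stay behind any queued Data messages (see enqueue_message).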
+                self.enqueue_message({'End': stream_id}, -2)
+                stream.ended = True
+
+            # Stream is dropped and ended, we can stop tracking it.
+            self.producer_streams.remove(stream_id)
+        else:
+            # We're not tracking this stream (anymore...?) - that shouldn't happen unless it's already been ended and dropped.
+            self.__error('Got a DROP message for an unrecognized stream ID! ({})'.format(stream_id))
+
+    def do_end(self, stream_id):
+        "Respond to an incoming END directive indicating no more data will be provided in a stream"
+        stream = self.consumer_streams.get(stream_id)
+        if stream:
+            if stream.ended:
+                self.__error('Got an END message for a stream that was already ended')
+            else:
+                stream.ended = True
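+            # Mirror do_drop: answer the End with a Drop, unless we've already sent one.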
+            if not stream.dropped:
+                self.enqueue_message({'Drop': stream_id}, -2)
+                stream.dropped = True
+
+            # Stream is dropped and ended, we can stop tracking it.
+            self.consumer_streams.remove(stream_id)
+        else:
+            # We're not tracking this stream (anymore...?) - that shouldn't happen unless it's already been ended and dropped.
+            self.__error('Got an END message for an unrecognized stream ID! ({})'.format(stream_id))
+
+    def drop_stream(self, stream_id):
+        "Issue a DROP message to request termination of an incoming stream"
+        stream = self.consumer_streams.get(stream_id)
+        if stream:
+            if stream.dropped:
+                self.__error('Stream {} already dropped'.format(stream_id))
+            else:
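+                # The stream stays in consumer_streams until the producer's End arrives; do_end removes it then.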
+                self.enqueue_message({'Drop': stream_id}, -2)
+                stream.dropped = True
+        else:
+            self.__error('Caller requested DROP of unrecognized stream ID: ({})'.format(stream_id))
+
+    def end_stream(self, stream_id):
+        "Issue an END message for an outgoing stream to indicate no further data will be provided for it"
+        stream = self.producer_streams.get(stream_id)
+        if stream:
+            # It's not an error here to end a stream that has already ended: we may have sent an END automatically in response to a DROP (see do_drop).
+            # (We still need a better strategy for handling drop/end.)
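+            # The stream stays in producer_streams until the consumer's Drop arrives; do_drop removes it then.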
+            if not stream.ended:
+                # Note: an explicit "End" must go out after any queued Data messages for the stream (hence priority 1, which drains after 0), or the messages will arrive out of order!
+                # (We need a better strategy for queueing messages, too.)
+                self.enqueue_message({'End': stream_id}, 1)
+                stream.ended = True
 
     def receive_loop(self):
         while not self.shutdown:
@@ -226,6 +287,10 @@ class server:
                         self.do_ack(msg_data)
                     case 'Drop':
                         self.do_drop(msg_data)
+                    case 'End':
+                        self.do_end(msg_data)
+                    case 'Data':
+                        self.do_data(msg_data)
                     case 'EngineCallResponse':
                         self.__error('Got engine call response: Not yet supported by this module')
                     case 'Goodbye':