nuplugin.py: Various updates to support version check, metadata, signature, and messa...
authorGeorge Caswell <gcaswell@translations.com>
Tue, 10 Jun 2025 20:19:58 +0000 (16:19 -0400)
committerGeorge Caswell <gcaswell@translations.com>
Tue, 10 Jun 2025 20:19:58 +0000 (16:19 -0400)
nuplugin.py

index 026085776efa4fc03196ebc7bcf8475136d1693d..b51241bbede87ddcb33b24e83f3d8083b65df13c 100755 (executable)
@@ -1,9 +1,34 @@
 import argparse
 import os
+import queue
 import socket
 import sys
 import msgpack
 
+
+class stream_info:
+    id = None
+    dropped = False
+    closed = False
+    pending_acks = 0
+
+class stream_manager:
+    next_id = 0
+    max_id = 2**63-1 # IDs must fit in a signed 64 bit integer
+    streams = {}
+
+    def new_stream(id=None):
+        if id is None:
+            id = next_id
+            next_id = next_id + 1
+            if id > max_id:
+                raise Exception('Unable to allocate a stream ID')
+
+        if id in streams:
+            raise Exception('Stream ID is already in use')
+
+        streams[id] = stream_info(id)
+
 class server:
     ins = None
     outs = None
@@ -16,7 +41,7 @@ class server:
     compatible_version = '0.102.0'
     features = []
 
-    def __init__(self, ins, outs, errs=None, allow_localsocket=True):
+    def __init__(self, ins, outs, errs=None, allow_localsocket=True, version="0.1", signature=[]):
         self.ins = ins
         self.outs = outs
         self.errs = errs
@@ -26,6 +51,8 @@ class server:
         self.packer = msgpack.Packer(self.outs)
         if allow_localsocket:
             features.append({'name':'LocalSocket'})
+        self.message_queue = queue.PriorityQueue(32)
+        self.version = version
 
     def __send_encoding(self):
         self.outs.write(b'\x07msgpack')
@@ -37,14 +64,28 @@ class server:
     def __send(self, msg):
         self.packer.pack(msg)
 
+    def __error(self, message):
+        print(message, file=self.errs)
+
+    def enqueue_response(self, message, priority=0):
+        # Priority level guidelines:
+        # 0: miscellaneous traffic, stream messages, etc.
+        # -1: PipelineData/Ordering Call Responses (Call responses providing result values)
+        # -2: Introductory message Call Responses (metadata, signature, etc.), EngineCalls, etc.
+        # -3: Error messages
+        try:
+            self.message_queue.put((priority, message), block=True, timeout=None)
+        except queue.ShutDown:
+            self.error('Failed to enqueue message: message queue was shut down')
+
     def send_error_response(self, call_id, error_msg):
         response = {'CallResponse': [call_id, {'Error': {'msg': error_msg}}]}
-        self.enqueue_response(response)
+        self.enqueue_response(response, -3)
 
     def handle_custom_value_unsupported(self, call_id, value):
         # Custom values aren't yet supported by this module
         error_msg = "Custom values are not yet supported"
-        self.error(error_msg)
+        self.__error(error_msg)
         self.send_error_response(call_id, error_msg)
 
     def run_custom_value_op(self, call_id, value, op):
@@ -54,7 +95,7 @@ class server:
             # It should be a dict of just one key, and the key is the name of the operation being performed
             if len(op) != 1:
                 error_msg = "Unrecognized CustomValueOp format: ({})".format(op)
-                self.error(error_msg)
+                self.__error(error_msg)
                 self.send_error_response(call_id, error_msg)
                 return
 
@@ -76,20 +117,46 @@ class server:
                 self.handle_custom_value_unsupported(call_id, op_arg)
             case _:
                 error_msg = 'Unrecognized CustomValue operation: {}'.format(op_name)
-                self.error(error_msg)
+                self.__error(error_msg)
                 self.send_error_response(call_id, error_msg)
 
+    def run_method(self, call_id, method_name, call_info):
+        # Override this to implement dispatch to a plugin's individual methods
+        send_error_response(call_id, 'Unrecognized method name ({})'.format(method_name))
+
+    def is_compatible_version(self, version):
+        version_bits = version.split('.') if version else []
+        compatible_version_bits = self.compatible_version.split('.') if self.compatible_version else []
+
+        try:
+            if (len(version_bits) < 1) or (len(compatible_version_bits) < 1):
+                # Nothing to compare
+                return False
+
+            if (version_bits[0] != compatible_version_bits[0]):
+                # Different major version
+                return False
+
+            if (len(version_bits) >= 2) and (len(compatible_version_bits) >= 2) and (int(version_bits[1]) < int(compatible_version_bits[1])):
+                # Lesser minor version
+                return False
+        except:
+            # Failed to process the version numbers somehow, so fail
+            return False
+
+        return True
+
     def do_hello(self, msg):
         protocol = msg.get('protocol')
         version = msg.get('version')
         ok = False
 
         if protocol != 'nu-plugin':
-            self.error('Invalid protocol ({}) sent by engine'.format(protocol))
+            self.__error('Invalid protocol ({}) sent by engine'.format(protocol))
         elif not version:
-            self.error('No version number provided by engine')
-        elif not self.compatible_version(version):
-            self.error('Version number {} provided by the engine is incompatible with this plugin.'.format(version))
+            self.__error('No version number provided by engine')
+        elif not self.is_compatible_version(version):
+            self.__error('Version number {} provided by the engine is incompatible with this plugin.'.format(version))
         else:
             ok = True
 
@@ -97,6 +164,14 @@ class server:
             # Abort communication, terminate...
             self.shutdown = True
 
+    def do_call_metadata(self, call_id):
+        response = {'CallResponse': [call_id, {'Metadata': {'version': self.version}}]}
+        self.enqueue_response(response, -2)
+
+    def do_call_signature(self, call_id):
+        response = {'CallResponse': [call_id, {'Signature': self.signature}]}
+        self.enqueue_response(response, -2)
+
     def do_call(self, msg):
         [call_id, call_data] = msg['Call']
         call_name = call_data
@@ -104,7 +179,7 @@ class server:
         if type(call_data) is 'dict':
             if (len(call_data) != 1):
                 error_msg = 'Unrecognized call format: ({})'.format(call_data)
-                self.error(error_msg)
+                self.__error(error_msg)
                 self.send_error_response(call_id, error_msg)
                 return
             (call_name, call_arg) = [(k, v) for (k, v) in call_data.items()][0]
@@ -115,16 +190,19 @@ class server:
             case 'Signature':
                 do_call_signature(self, call_id)
             case 'Run':
-                self.run_method(call_id, call_data.get('Run'))
+                self.run_method(call_id, call_data.get('Run').get('name'), call_data.get('Run'))
             case 'CustomValueOp':
                 value = call_arg[0]
                 op = call_arg[1]
                 self.run_custom_value_op(call_id, value, op)
             case _:
                 error_msg = 'Unrecognized Call: {}'.format(call_name)
-                self.error(error_msg)
+                self.__error(error_msg)
                 self.send_error_response(call_id, error_msg)
 
+    def do_drop(self, stream_id):
+                
+
     def receive_loop(self):
         while not self.shutdown:
             try:
@@ -135,7 +213,7 @@ class server:
 
                 if type(msg) is dict:
                     if len(msg) != 1:
-                        self.error('Unsupported message format: ({})'.format(msg))
+                        self.__error('Unsupported message format: ({})'.format(msg))
                         return
                     (msg_type, msg_data) = [(k, v) for (k, v) in msg.items()][0]
 
@@ -149,14 +227,14 @@ class server:
                     case 'Drop':
                         self.do_drop(msg_data)
                     case 'EngineCallResponse':
-                        self.error('Got engine call response: Not yet supported by this module')
+                        self.__error('Got engine call response: Not yet supported by this module')
                     case 'Goodbye':
                         self.shutdown = True
                     case _:
-                        self.error('Got unrecognized message from engine: ({})'.format(msg))
+                        self.__error('Got unrecognized message from engine: ({})'.format(msg))
 
             except msgpack.exceptions.OutOfData:
-                self.error('Engine disconnected without \'Goodbye\' message: Shutting down')
+                self.__error('Engine disconnected without \'Goodbye\' message: Shutting down')
                 self.shutdown = True
 
     def run(self):