From 3a728545b9edce43649e8a397d062b8ab384c1bc Mon Sep 17 00:00:00 2001 From: George Caswell Date: Tue, 10 Jun 2025 16:19:58 -0400 Subject: [PATCH] nuplugin.py: Various updates to support version check, metadata, signature, and message queueing --- nuplugin.py | 110 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 94 insertions(+), 16 deletions(-) diff --git a/nuplugin.py b/nuplugin.py index 0260857..b51241b 100755 --- a/nuplugin.py +++ b/nuplugin.py @@ -1,9 +1,34 @@ import argparse import os +import queue import socket import sys import msgpack + +class stream_info: + id = None + dropped = False + closed = False + pending_acks = 0 + +class stream_manager: + next_id = 0 + max_id = 2**63-1 # IDs must fit in a signed 64 bit integer + streams = {} + + def new_stream(id=None): + if id is None: + id = next_id + next_id = next_id + 1 + if id > max_id: + raise Exception('Unable to allocate a stream ID') + + if id in streams: + raise Exception('Stream ID is already in use') + + streams[id] = stream_info(id) + class server: ins = None outs = None @@ -16,7 +41,7 @@ class server: compatible_version = '0.102.0' features = [] - def __init__(self, ins, outs, errs=None, allow_localsocket=True): + def __init__(self, ins, outs, errs=None, allow_localsocket=True, version="0.1", signature=[]): self.ins = ins self.outs = outs self.errs = errs @@ -26,6 +51,8 @@ class server: self.packer = msgpack.Packer(self.outs) if allow_localsocket: features.append({'name':'LocalSocket'}) + self.message_queue = queue.PriorityQueue(32) + self.version = version def __send_encoding(self): self.outs.write(b'\x07msgpack') @@ -37,14 +64,28 @@ class server: def __send(self, msg): self.packer.pack(msg) + def __error(self, message): + print(message, file=self.errs) + + def enqueue_response(self, message, priority=0): + # Priority level guidelines: + # 0: miscellaneous traffic, stream messages, etc. + # -1: PipelineData/Ordering Call Responses (Call responses providing result values) + # -2: Introductory message Call Responses (metadata, signature, etc.), EngineCalls, etc. + # -3: Error messages + try: + self.message_queue.put((priority, message), block=True, timeout=None) + except queue.ShutDown: + self.error('Failed to enqueue message: message queue was shut down') + def send_error_response(self, call_id, error_msg): response = {'CallResponse': [call_id, {'Error': {'msg': error_msg}}]} - self.enqueue_response(response) + self.enqueue_response(response, -3) def handle_custom_value_unsupported(self, call_id, value): # Custom values aren't yet supported by this module error_msg = "Custom values are not yet supported" - self.error(error_msg) + self.__error(error_msg) self.send_error_response(call_id, error_msg) def run_custom_value_op(self, call_id, value, op): @@ -54,7 +95,7 @@ class server: # It should be a dict of just one key, and the key is the name of the operation being performed if len(op) != 1: error_msg = "Unrecognized CustomValueOp format: ({})".format(op) - self.error(error_msg) + self.__error(error_msg) self.send_error_response(call_id, error_msg) return @@ -76,20 +117,46 @@ class server: self.handle_custom_value_unsupported(call_id, op_arg) case _: error_msg = 'Unrecognized CustomValue operation: {}'.format(op_name) - self.error(error_msg) + self.__error(error_msg) self.send_error_response(call_id, error_msg) + def run_method(self, call_id, method_name, call_info): + # Override this to implement dispatch to a plugin's individual methods + send_error_response(call_id, 'Unrecognized method name ({})'.format(method_name)) + + def is_compatible_version(self, version): + version_bits = version.split('.') if version else [] + compatible_version_bits = self.compatible_version.split('.') if self.compatible_version else [] + + try: + if (len(version_bits) < 1) or (len(compatible_version_bits) < 1): + # Nothing to compare + return False + + if (version_bits[0] != compatible_version_bits[0]): + # Different major version + return False + + if (len(version_bits) >= 2) and (len(compatible_version_bits) >= 2) and (int(version_bits[1]) < int(compatible_version_bits[1])): + # Lesser minor version + return False + except: + # Failed to process the version numbers somehow, so fail + return False + + return True + def do_hello(self, msg): protocol = msg.get('protocol') version = msg.get('version') ok = False if protocol != 'nu-plugin': - self.error('Invalid protocol ({}) sent by engine'.format(protocol)) + self.__error('Invalid protocol ({}) sent by engine'.format(protocol)) elif not version: - self.error('No version number provided by engine') - elif not self.compatible_version(version): - self.error('Version number {} provided by the engine is incompatible with this plugin.'.format(version)) + self.__error('No version number provided by engine') + elif not self.is_compatible_version(version): + self.__error('Version number {} provided by the engine is incompatible with this plugin.'.format(version)) else: ok = True @@ -97,6 +164,14 @@ class server: # Abort communication, terminate... self.shutdown = True + def do_call_metadata(self, call_id): + response = {'CallResponse': [call_id, {'Metadata': {'version': self.version}}]} + self.enqueue_response(response, -2) + + def do_call_signature(self, call_id): + response = {'CallResponse': [call_id, {'Signature': self.signature}]} + self.enqueue_response(response, -2) + def do_call(self, msg): [call_id, call_data] = msg['Call'] call_name = call_data @@ -104,7 +179,7 @@ class server: if type(call_data) is 'dict': if (len(call_data) != 1): error_msg = 'Unrecognized call format: ({})'.format(call_data) - self.error(error_msg) + self.__error(error_msg) self.send_error_response(call_id, error_msg) return (call_name, call_arg) = [(k, v) for (k, v) in call_data.items()][0] @@ -115,16 +190,19 @@ class server: case 'Signature': do_call_signature(self, call_id) case 'Run': - self.run_method(call_id, call_data.get('Run')) + self.run_method(call_id, call_data.get('Run').get('name'), call_data.get('Run')) case 'CustomValueOp': value = call_arg[0] op = call_arg[1] self.run_custom_value_op(call_id, value, op) case _: error_msg = 'Unrecognized Call: {}'.format(call_name) - self.error(error_msg) + self.__error(error_msg) self.send_error_response(call_id, error_msg) + def do_drop(self, stream_id): + + def receive_loop(self): while not self.shutdown: try: @@ -135,7 +213,7 @@ class server: if type(msg) is dict: if len(msg) != 1: - self.error('Unsupported message format: ({})'.format(msg)) + self.__error('Unsupported message format: ({})'.format(msg)) return (msg_type, msg_data) = [(k, v) for (k, v) in msg.items()][0] @@ -149,14 +227,14 @@ class server: case 'Drop': self.do_drop(msg_data) case 'EngineCallResponse': - self.error('Got engine call response: Not yet supported by this module') + self.__error('Got engine call response: Not yet supported by this module') case 'Goodbye': self.shutdown = True case _: - self.error('Got unrecognized message from engine: ({})'.format(msg)) + self.__error('Got unrecognized message from engine: ({})'.format(msg)) except msgpack.exceptions.OutOfData: - self.error('Engine disconnected without \'Goodbye\' message: Shutting down') + self.__error('Engine disconnected without \'Goodbye\' message: Shutting down') self.shutdown = True def run(self): -- 2.34.1