import argparse
import os
+import queue
import socket
import sys
import msgpack
+
+class stream_info:
+ id = None
+ dropped = False
+ closed = False
+ pending_acks = 0
+
+class stream_manager:
+ next_id = 0
+ max_id = 2**63-1 # IDs must fit in a signed 64 bit integer
+ streams = {}
+
+ def new_stream(id=None):
+ if id is None:
+ id = next_id
+ next_id = next_id + 1
+ if id > max_id:
+ raise Exception('Unable to allocate a stream ID')
+
+ if id in streams:
+ raise Exception('Stream ID is already in use')
+
+ streams[id] = stream_info(id)
+
class server:
ins = None
outs = None
compatible_version = '0.102.0'
features = []
- def __init__(self, ins, outs, errs=None, allow_localsocket=True):
+ def __init__(self, ins, outs, errs=None, allow_localsocket=True, version="0.1", signature=[]):
self.ins = ins
self.outs = outs
self.errs = errs
self.packer = msgpack.Packer(self.outs)
if allow_localsocket:
features.append({'name':'LocalSocket'})
+ self.message_queue = queue.PriorityQueue(32)
+ self.version = version
def __send_encoding(self):
self.outs.write(b'\x07msgpack')
def __send(self, msg):
self.packer.pack(msg)
+ def __error(self, message):
+ print(message, file=self.errs)
+
+ def enqueue_response(self, message, priority=0):
+ # Priority level guidelines:
+ # 0: miscellaneous traffic, stream messages, etc.
+ # -1: PipelineData/Ordering Call Responses (Call responses providing result values)
+ # -2: Introductory message Call Responses (metadata, signature, etc.), EngineCalls, etc.
+ # -3: Error messages
+ try:
+ self.message_queue.put((priority, message), block=True, timeout=None)
+ except queue.ShutDown:
+ self.error('Failed to enqueue message: message queue was shut down')
+
def send_error_response(self, call_id, error_msg):
response = {'CallResponse': [call_id, {'Error': {'msg': error_msg}}]}
- self.enqueue_response(response)
+ self.enqueue_response(response, -3)
def handle_custom_value_unsupported(self, call_id, value):
# Custom values aren't yet supported by this module
error_msg = "Custom values are not yet supported"
- self.error(error_msg)
+ self.__error(error_msg)
self.send_error_response(call_id, error_msg)
def run_custom_value_op(self, call_id, value, op):
# It should be a dict of just one key, and the key is the name of the operation being performed
if len(op) != 1:
error_msg = "Unrecognized CustomValueOp format: ({})".format(op)
- self.error(error_msg)
+ self.__error(error_msg)
self.send_error_response(call_id, error_msg)
return
self.handle_custom_value_unsupported(call_id, op_arg)
case _:
error_msg = 'Unrecognized CustomValue operation: {}'.format(op_name)
- self.error(error_msg)
+ self.__error(error_msg)
self.send_error_response(call_id, error_msg)
+ def run_method(self, call_id, method_name, call_info):
+ # Override this to implement dispatch to a plugin's individual methods
+ send_error_response(call_id, 'Unrecognized method name ({})'.format(method_name))
+
+ def is_compatible_version(self, version):
+ version_bits = version.split('.') if version else []
+ compatible_version_bits = self.compatible_version.split('.') if self.compatible_version else []
+
+ try:
+ if (len(version_bits) < 1) or (len(compatible_version_bits) < 1):
+ # Nothing to compare
+ return False
+
+ if (version_bits[0] != compatible_version_bits[0]):
+ # Different major version
+ return False
+
+ if (len(version_bits) >= 2) and (len(compatible_version_bits) >= 2) and (int(version_bits[1]) < int(compatible_version_bits[1])):
+ # Lesser minor version
+ return False
+ except:
+ # Failed to process the version numbers somehow, so fail
+ return False
+
+ return True
+
def do_hello(self, msg):
protocol = msg.get('protocol')
version = msg.get('version')
ok = False
if protocol != 'nu-plugin':
- self.error('Invalid protocol ({}) sent by engine'.format(protocol))
+ self.__error('Invalid protocol ({}) sent by engine'.format(protocol))
elif not version:
- self.error('No version number provided by engine')
- elif not self.compatible_version(version):
- self.error('Version number {} provided by the engine is incompatible with this plugin.'.format(version))
+ self.__error('No version number provided by engine')
+ elif not self.is_compatible_version(version):
+ self.__error('Version number {} provided by the engine is incompatible with this plugin.'.format(version))
else:
ok = True
# Abort communication, terminate...
self.shutdown = True
+ def do_call_metadata(self, call_id):
+ response = {'CallResponse': [call_id, {'Metadata': {'version': self.version}}]}
+ self.enqueue_response(response, -2)
+
+ def do_call_signature(self, call_id):
+ response = {'CallResponse': [call_id, {'Signature': self.signature}]}
+ self.enqueue_response(response, -2)
+
def do_call(self, msg):
[call_id, call_data] = msg['Call']
call_name = call_data
if type(call_data) is 'dict':
if (len(call_data) != 1):
error_msg = 'Unrecognized call format: ({})'.format(call_data)
- self.error(error_msg)
+ self.__error(error_msg)
self.send_error_response(call_id, error_msg)
return
(call_name, call_arg) = [(k, v) for (k, v) in call_data.items()][0]
case 'Signature':
do_call_signature(self, call_id)
case 'Run':
- self.run_method(call_id, call_data.get('Run'))
+ self.run_method(call_id, call_data.get('Run').get('name'), call_data.get('Run'))
case 'CustomValueOp':
value = call_arg[0]
op = call_arg[1]
self.run_custom_value_op(call_id, value, op)
case _:
error_msg = 'Unrecognized Call: {}'.format(call_name)
- self.error(error_msg)
+ self.__error(error_msg)
self.send_error_response(call_id, error_msg)
+ def do_drop(self, stream_id):
+
+
def receive_loop(self):
while not self.shutdown:
try:
if type(msg) is dict:
if len(msg) != 1:
- self.error('Unsupported message format: ({})'.format(msg))
+ self.__error('Unsupported message format: ({})'.format(msg))
return
(msg_type, msg_data) = [(k, v) for (k, v) in msg.items()][0]
case 'Drop':
self.do_drop(msg_data)
case 'EngineCallResponse':
- self.error('Got engine call response: Not yet supported by this module')
+ self.__error('Got engine call response: Not yet supported by this module')
case 'Goodbye':
self.shutdown = True
case _:
- self.error('Got unrecognized message from engine: ({})'.format(msg))
+ self.__error('Got unrecognized message from engine: ({})'.format(msg))
except msgpack.exceptions.OutOfData:
- self.error('Engine disconnected without \'Goodbye\' message: Shutting down')
+ self.__error('Engine disconnected without \'Goodbye\' message: Shutting down')
self.shutdown = True
def run(self):