class stream_info:
id = None
dropped = False
- closed = False
+ ended = False
pending_acks = 0
class stream_manager:
packer = None
compatible_version = '0.102.0'
features = []
+ producer_streams = None
def __init__(self, ins, outs, errs=None, allow_localsocket=True, version="0.1", signature=[]):
self.ins = ins
features.append({'name':'LocalSocket'})
self.message_queue = queue.PriorityQueue(32)
self.version = version
+ self.producer_streams = stream_manager()
def __send_encoding(self):
self.outs.write(b'\x07msgpack')
def __error(self, message):
print(message, file=self.errs)
- def enqueue_response(self, message, priority=0):
+ def enqueue_message(self, message, priority=0):
# Priority level guidelines:
- # 0: miscellaneous traffic, stream messages, etc.
+ # 1: "end" messages for outgoing streams. (A stream "end" message must not be sent until after any data messages for that stream - unless the consumer dropped it first!)
+ # 0: miscellaneous traffic, stream data messages, etc.
# -1: PipelineData/Ordering Call Responses (Call responses providing result values)
- # -2: Introductory message Call Responses (metadata, signature, etc.), EngineCalls, etc.
+ # -2: Introductory message Call Responses (metadata, signature, etc.), EngineCalls, stream flow control messages, etc.
# -3: Error messages
try:
self.message_queue.put((priority, message), block=True, timeout=None)
def send_error_response(self, call_id, error_msg):
response = {'CallResponse': [call_id, {'Error': {'msg': error_msg}}]}
- self.enqueue_response(response, -3)
+ self.enqueue_message(response, -3)
def handle_custom_value_unsupported(self, call_id, value):
# Custom values aren't yet supported by this module
def do_call_metadata(self, call_id):
response = {'CallResponse': [call_id, {'Metadata': {'version': self.version}}]}
- self.enqueue_response(response, -2)
+ self.enqueue_message(response, -2)
def do_call_signature(self, call_id):
response = {'CallResponse': [call_id, {'Signature': self.signature}]}
- self.enqueue_response(response, -2)
+ self.enqueue_message(response, -2)
def do_call(self, msg):
[call_id, call_data] = msg['Call']
self.send_error_response(call_id, error_msg)
def do_drop(self, stream_id):
-
+ "Respond to an incoming DROP directive to terminate a stream originating from this plugin"
+ # If stream in question hasn't been ended, end it. Either way mark it as dropped.
+ stream = self.producer_streams.get(stream_id)
+ if stream:
+ if stream.dropped:
+ self.__error('Stream already dropped')
+ else:
+ stream.dropped = True
+ if not stream.ended:
+ self.enqueue_message({'End': stream_id}, -2)
+ stream.ended = True
+
+ # Stream is dropped and ended, we can stop tracking it.
+ self.producer_streams.remove(stream_id)
+ else:
+ # We're not tracking this stream (anymore...?) - that shouldn't happen unless it's already been ended and dropped.
+ self.__error('Got a drop message for an unrecognized stream ID! ({})'.format(stream_id))
+
+ def do_end(self, stream_id):
+ "Respond to an incoming END directive indicating no more data will be provided in a stream"
+ stream = self.consumer_streams.get(stream_id)
+ if stream:
+ if stream.ended:
+ self.__error('Got END message for a stream which was already ENDed')
+ else:
+ stream.ended = True
+ if not stream.dropped:
+ self.enqueue_message({'Drop': stream_id}, -2)
+ stream.dropped = True
+
+ # Stream is dropped and ended, we can stop tracking it.
+ self.consumer_streams.remove(stream_id)
+ else:
+ # We're not tracking this stream (anymore...?) - that shouldn't happen unless it's already been ended and dropped.
+ self.__error('Got an END message for an unrecognized stream ID! ({})'.format(stream_id))
+
+ def drop_stream(self, stream_id):
+ "Issue a DROP message to request termination of an incoming stream"
+ stream = self.consumer_streams.get(stream_id)
+ if stream:
+ if stream.dropped:
+ self.__error('Stream {} already dropped'.format(stream_id))
+ else:
+ self.enqueue_message({'Drop': stream_id}, -2)
+ stream.dropped = True
+ else:
+ self.__error('Caller requested DROP of unrecognized stream ID: ({})'.format(stream_id))
+
+ def end_stream(self, stream_id):
+ "Issue an END message for an outgoing stream to indicate no further data will be provided for it"
+ stream = self.producer_streams.get(stream_di)
+ if stream:
+ # It's not an error in this case if we end a stream that's already ended, because we may send an END message automatically in response to a DROP...
+ # ...We need a better strategy for handling drop/end.
+ if not stream.ended:
+ # Note explicit "End" messages must have lower priority than any DATA messages for the stream, or the messages will be out of order!
+ # ...Need a better strategy for queueing messages, too...
+ self.enqueue_message({'End': stream_id}, 1)
+ stream.ended = True
def receive_loop(self):
while not self.shutdown:
self.do_ack(msg_data)
case 'Drop':
self.do_drop(msg_data)
+ case 'End':
+ self.do_end(msg_data)
+ case 'Data':
+ self.do_data(msg_data)
case 'EngineCallResponse':
self.__error('Got engine call response: Not yet supported by this module')
case 'Goodbye':