+ "Respond to an incoming DROP directive to terminate a stream originating from this plugin"
+ # If stream in question hasn't been ended, end it. Either way mark it as dropped.
+ stream = self.producer_streams.get(stream_id)
+ if stream:
+ if stream.dropped:
+ self.__error('Stream already dropped')
+ else:
+ stream.dropped = True
+ if not stream.ended:
+ self.enqueue_message({'End': stream_id}, -2)
+ stream.ended = True
+
+ # Stream is dropped and ended, we can stop tracking it.
+ self.producer_streams.remove(stream_id)
+ else:
+ # We're not tracking this stream (anymore...?) - that shouldn't happen unless it's already been ended and dropped.
+ self.__error('Got a drop message for an unrecognized stream ID! ({})'.format(stream_id))
+
+ def do_end(self, stream_id):
+ "Respond to an incoming END directive indicating no more data will be provided in a stream"
+ stream = self.consumer_streams.get(stream_id)
+ if stream:
+ if stream.ended:
+ self.__error('Got END message for a stream which was already ENDed')
+ else:
+ stream.ended = True
+ if not stream.dropped:
+ self.enqueue_message({'Drop': stream_id}, -2)
+ stream.dropped = True
+
+ # Stream is dropped and ended, we can stop tracking it.
+ self.consumer_streams.remove(stream_id)
+ else:
+ # We're not tracking this stream (anymore...?) - that shouldn't happen unless it's already been ended and dropped.
+ self.__error('Got an END message for an unrecognized stream ID! ({})'.format(stream_id))
+
+ def drop_stream(self, stream_id):
+ "Issue a DROP message to request termination of an incoming stream"
+ stream = self.consumer_streams.get(stream_id)
+ if stream:
+ if stream.dropped:
+ self.__error('Stream {} already dropped'.format(stream_id))
+ else:
+ self.enqueue_message({'Drop': stream_id}, -2)
+ stream.dropped = True
+ else:
+ self.__error('Caller requested DROP of unrecognized stream ID: ({})'.format(stream_id))
+
+ def end_stream(self, stream_id):
+ "Issue an END message for an outgoing stream to indicate no further data will be provided for it"
+ stream = self.producer_streams.get(stream_di)
+ if stream:
+ # It's not an error in this case if we end a stream that's already ended, because we may send an END message automatically in response to a DROP...
+ # ...We need a better strategy for handling drop/end.
+ if not stream.ended:
+ # Note explicit "End" messages must have lower priority than any DATA messages for the stream, or the messages will be out of order!
+ # ...Need a better strategy for queueing messages, too...
+ self.enqueue_message({'End': stream_id}, 1)
+ stream.ended = True