from abc import ABCMeta


class SocketBase(object):
    """Base class for a named endpoint that owns a single ZeroMQ socket.

    The socket is created on demand by :meth:`create_socket` and released
    by :meth:`close`. While no socket is held, ``socket`` is ``None``.
    """

    # Python 2 metaclass syntax. On Python 3 this assignment is just an
    # ordinary class attribute and does not install ABCMeta.
    __metaclass__ = ABCMeta

    # Class-level defaults; both are overwritten per instance in __init__.
    name = None
    socket = None

    def __init__(self, name):
        """
        :param name: The name of the endpoint.
        :type name: str
        """
        self.name = name
        self.socket = None

    def create_socket(self, zmq_context, socket_type, port):
        """
        Create a ZeroMQ socket and connect it to ``tcp://localhost:<port>``.

        If this endpoint already holds a socket, that socket is closed first
        so that repeated calls do not leak the previous one.

        :param zmq_context: The ZeroMQ context to use.
        :type zmq_context: zmq.Context
        :param socket_type: The type of socket to create. Use zmq constants,
            e.g. zmq.SUB or zmq.REP.
        :type socket_type: int
        :param port: The port to use.
        :type port: int
        """
        # Release any socket from an earlier call before replacing it.
        self.close()
        self.socket = zmq_context.socket(socket_type)
        self.socket.connect("tcp://localhost:{}".format(port))

    def close(self):
        """Close the ZeroMQ socket.

        Does nothing when no socket is currently held, so it is safe to call
        repeatedly.
        """
        # Identity check rather than truthiness: a socket object must be
        # closed even if it happens to evaluate as falsy.
        if self.socket is None:
            return
        self.socket.close()
        self.socket = None