IOException.de

Icon

Ausgewählter Nerdkram von Informatikstudenten der Uni Ulm

SequentialMessageQueues in Java

Viele Netzwerkprotokolle teilen ihre Kommunikationsentitäten in Pakete oder Nachrichten auf. Manche Protokolle erwarten außerdem eine sequentielle Abarbeitung (z.B. TCP auf Transportebene), auch wenn die darunterliegenden Protokollschichten dass nicht unbedingt unterstützen. Für die Implementierung eines Nachrichtenpuffers für RTSP-Nachrichten, die ungeordnet ankommen können, aber sequentiell abgearbeitet werden müssen, habe ich eine entsprechende Datenstruktur in Java erstellt. RTSP-Nachrichten besitzen ein Cseq-Header-Feld, welches die einzelnen Nachrichten durchnummiert und als ordnendes Element genutzt werden kann.

Die folgende generische Implementierung erwartet, dass alle Nachrichtenelemete eindeutige und miteinander vergleichbare Identifier besitzen. Desweiteren erlaubt diese Implementierung zwar das parallele Schreiben in die Queue, allerdings sollte das Lesen durch einen einzelnen Thread realisiert werden. Ansonsten kann die Ordnung nach dem Entnehmen aus der Queue durch unterschiedlich lange Laufzeiten der Worker-Threads wieder verloren gehen. Außerdem realisiert diese Implementierung die sequentielle Ordnung eines vollständigen Nachrichtenstroms. Nachrichtenverluste oder Timeouts werden nicht behandelt. Hierfür eignen sich eher Automatic repeat request Protokolle.

Interface für MessageQueue
Eine MessageQueue bietet die grundlegenden Funktionen zum Einfügen und Entfernen von Elementen sowie Hilfsmethoden für die Größe der Queue an.

/**
 * Interface for message queues.
 * 
 * @author Benjamin Erb
 * 
 * @param <E>
 *            Element type
 */
public interface MessageQueue<E>
{
	/**
	 * Adds element to queue. Blocks until element can be added.
	 * 
	 * @param e
	 *            new element
	 * @throws InterruptedException
	 */
	void push(E e) throws InterruptedException;

	/**
	 * Takes an element from the queue. Blocks until element becomes available.
	 * 
	 * @return taken element
	 * @throws InterruptedException
	 */
	E pop() throws InterruptedException;

	/**
	 * Gets the size of the queue
	 * 
	 * @return
	 */
	int size();

	/**
	 * Checks whether queue is empty or not.
	 * 
	 * @return
	 */
	boolean empty();

}

Interface für speziellen Comparator
Dieses spezielel Comparator-Interface fügt Methoden hinzu für das Erfragen von vorherigen und nachfolgenden Identifiern an. Außerdem können Nachrichten, Identifier oder eine Kombination aus beidem verglichen werden.

import java.util.Comparator;

/**
 * An enhanced comparator interface for retrieving preceding and subsequent identifiers.  
 * 
 * @author Benjamin Erb
 *
 * @param <E> Element type
 * @param <T> Element identifier
 */
/**
 */
public interface SequentialComparator<E, T extends Comparable<T>> extends Comparator<E>
{
	/**
	 * Returns the subsequent identifier of a given entity.
	 * 
	 * @param t
	 *            entity
	 * @return subsequent identifier
	 */
	T getNext(E e);

	/**
	 * Returns the preceding identifier of a given entity.
	 * 
	 * @param t
	 *            entity
	 * @return preceding identifier
	 */
	T getPrevious(E e);

	/**
	 * Compares two entities.
	 * 
	 * @param t1
	 *            entity 1
	 * @param t2
	 *            entity 2
	 * @return a negative integer, zero, or a positive integer as the first
	 *         argument is less than, equal to, or greater than the second.
	 */
	int compare(T t1, T t2);

	/**
	 * Compares an entity and an identifier.
	 * 
	 * @param t1
	 *            entity
	 * @param e2
	 *            identifier
	 * @return a negative integer, zero, or a positive integer as the first
	 *         argument is less than, equal to, or greater than the second.
	 */
	int compare(T t1, E e2);

	/**
	 * Compares an identifier and an entity.
	 * 
	 * @param e1
	 *            identifier
	 * @param t2
	 *            entity
	 * @return a negative integer, zero, or a positive integer as the first
	 *         argument is less than, equal to, or greater than the second.
	 */
	int compare(E e1, T t2);
}

SequentialMessageQueue Implementierung
Die eigentliche Queue-Implementierung greift intern auf eine PriorityBlockingQueue zurück, allerdings wird durch die Kapselung sichergestellt, dass nur das nächste zu erwartende Element entnommen werden kann.

import java.util.concurrent.PriorityBlockingQueue;

/**
 * A message queue for sequential message processing. This queue orders incoming
 * messages entities by their identifiers and outputs entites in-order. This
 * queue supports more than one writing thread. However, it is designed for one
 * reading/consuming thread in order to prevent out-of-order processing by
 * multiple threads.
 * 
 * @author Benjamin Erb
 * 
 * @param <E>
 *            Message entity
 * @param <T>
 *            Message identifier
 */
public class SequentialMessageQueue<E, T extends Comparable<T>> implements MessageQueue<E>
{
	/**
	 * Lock object for queueing
	 */
	private final Object lock = new Object();

	/**
	 * internal queue
	 */
	private final PriorityBlockingQueue<E> internalQueue;
	/**
	 * comparator
	 */
	private final SequentialComparator<E, T> comparator;

	/**
	 * Represents the identifier of the next expected entity
	 */
	private T expectedIdentifier;

	public SequentialMessageQueue(SequentialComparator<E, T> comparator, T initialIdentifier)
	{
		this.internalQueue = new PriorityBlockingQueue<E>(16, comparator);
		this.comparator = comparator;
		this.expectedIdentifier = initialIdentifier;
	}

	@Override
	public boolean empty()
	{
		return internalQueue.isEmpty();
	}

	@Override
	public E pop() throws InterruptedException
	{
		synchronized (lock)
		{
			E firstElement;
			while ((firstElement = internalQueue.peek()) == null || comparator.compare(firstElement, expectedIdentifier) != 0)
			{
				lock.wait();
			}
			internalQueue.remove();
			expectedIdentifier = comparator.getNext(firstElement);
			return firstElement;
		}
	}

	@Override
	public void push(E e) throws InterruptedException
	{
		synchronized (lock)
		{
			internalQueue.put(e);
			lock.notifyAll();
		}

	}

	@Override
	public int size()
	{
		return internalQueue.size();
	}

	/**
	 * Returns the used comparator
	 * 
	 * @return
	 */
	protected SequentialComparator<E, T> getSequentialComparator()
	{
		return comparator;
	}

}

Kategorie: java

Tags: ,

Diese Icons verlinken auf Bookmark Dienste bei denen Nutzer neue Inhalte finden und mit anderen teilen können.
  • MisterWong
  • Y!GG
  • Webnews
  • Digg
  • del.icio.us
  • StumbleUpon
  • Reddit
  • Facebook

Kommentar