Was ist der unterschied zwischen topics und messa-ges

Java EE (Java Platform, Enterprise Edition) ist eine durch Schnittstellen definierte Architektur f�r Unternehmensanwendungen, bestehend aus verschiedenen Komponenten.

JMS (Java Message Service) stellt einen wichtigen Bestandteil von Java EE dar, um asynchrone Kommunikation (�ber einen Message Broker) zu erm�glichen, was auch unter dem Begriff MOM (Message Oriented Middleware) bekannt ist.

Die Vorteile von asynchroner Kommunikation per JMS sind:
+ Zeitliche Entkopplung zwischen Beauftragung und Bearbeitung
+ Lose Kopplung erleichtert Austausch einzelner Systeme
+ Flexibel konfigurierbare Kommunikation und Diensteverteilung
+ Plattform- und programmiersprachenunabh�ngige Kommunikation
+ Anbindung an Fremdsysteme, z.B. Hosts
+ Batchbetrieb ist m�glich
+ Verteilte Anwendungen
+ Verteilte Transaktionen
+ Standardisierung, Wartungsfreundlichkeit
+ Definierte Sicherheitsmechanismen
+ Hohe Ausfallsicherheit erreichbar
+ Hohe Skalierbarkeit

Inhalt

  1. Wichtige Klassen und Methoden
    • �berblick, Nachrichtenkan�le, Transaktion/Best�tigung, Nachrichtenempf�nger, Nachrichtensender
  2. Vorbereitungen f�r die Beispiele
  3. Namen im JNDI-Context
  4. Queue (Point-to-Point)
    • QueueSender, Queue-receive, Queue-Listener
  5. Topic (Publish-and-Subscribe)
    • TopicPublisher, Topic-receive, Topic-Listener (mit Filter)
  6. QueueRequestor
    • QueueRequestor, QueueRequestor-reply
  7. MDB (Message Driven Bean)
  8. Links auf weiterf�hrende Informationen

Wichtige Klassen und Methoden

�berblick
  Queue
(Point-to-Point, "PTP")
Topic
(Publish-and-Subscribe, "pub-sub")
InitialContext InitialContext InitialContext
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Session QueueSession TopicSession
Destination Queue Topic
MessageConsumer QueueReceiver TopicSubscriber
MessageProducer QueueSender TopicPublisher
Requestor QueueRequestor TopicRequestor
Message Message Message
Nachrichtenkan�le Queue (Point-to-Point, PTP)Queues (Warteschlangen) sind Nachrichtenkan�le, die normalerweise zwischen genau einem Sender und einem Empf�nger aufgebaut werden (Punkt-zu-Punkt-Verbindung, �hnlich wie eine E-Mail). Wenn ein Empf�nger eine Nachricht erh�lt, wird sie normalerweise aus dem Nachrichtenkanal entfernt. Topic (Point-to-Multipoint, Publish-and-Subscribe, pub/sub, Publizieren/Abonnieren)Topics (Themen) sind themenorientierte Nachrichtenkan�le und haben normalerweise viele Konsumenten. Sender publizieren Nachrichten ('Publish') und Empf�nger abbonieren sie ('Subscribe'). Wenn ein Empf�nger eine Nachricht erh�lt, bleibt sie normalerweise trotzdem weiter im Nachrichtenkanal, damit auch weitere Konsumenten die Nachricht empfangen k�nnen. Es kann ein Verfallsdatum f�r die Nachrichten geben oder sie werden durch neuere ersetzt. Transaktionskontrolle und Best�tigungsmodusBeim Erzeugen der Session �ber QueueConnection.createQueueSession() beziehungsweise TopicConnection.createTopicSession() werden die beiden Parameter 'transacted' und 'acknowledgeMode' gesetzt: transacted�ber den 'transacted'-Parameter wird gesteuert, ob es sich um eine transaktionsorientierte Sitzung handelt. Dann werden die in dieser Session erzeugten Nachrichten zwischengespeichert und erst beim abschlie�enden Commit gemeinsam abgesendet. acknowledgeMode�ber den 'acknowledgeMode'-Parameter wird der Best�tigungsmodus gesetzt: 'AUTO_ACKNOWLEDGE', 'CLIENT_ACKNOWLEDGE' oder 'DUPS_OK_ACKNOWLEDGE'. Nachrichtenempf�nger MessageConsumer . receive()Anders als ein Listener wartet 'receive()' synchron auf Messages. MessageConsumer . setMessageListener( MessageListener )
MessageListener . onMessage( Message )
Anders als bei 'receive()' braucht ein Listener nicht auf Messages zu warten, sondern empf�ngt sie asynchron, indem er sich �ber 'setMessageListener()' registriert, wodurch Messages an die 'onMessage(Message)'-Callback-Methode geschickt werden. Nachrichtensender QueueSender . send( Message )
TopicPublisher . publish( Message )
'send()' und 'publish()' versenden Nachrichten ohne eine Antwortnachricht zu erwarten. QueueRequestor . request( Message )
TopicRequestor . request( Message )
Ein Requestor verschickt eine Nachricht, �ffnet einen tempor�ren R�ckkanal, macht diesen �ber das JMSReplyTo-Feld zug�nglich, registriert sich als Empf�nger und wartet auf eine Antwortnachricht. Messages Message-Aufbau
Header Enth�lt vom JMS gesetzte allgemeine Nachrichteninformationen wie: JMSCorrelationID, JMSDeliveryMode, JMSDestination, JMSExpiration, JMSMessageID, JMSPriority, JMSRedelivered, JMSReplyTo, JMSTimestamp, JMSType.
Properties Enth�lt von den Anwendungen gesetzte Nachrichtenzusatzinformationen als Attribute in Form von Key/Value-Paaren, deren Values von verschiedenen Java-Typen sein k�nnen. Die Attribute werden gesetzt und gelesen �ber 'get<Type>Property()' und 'set<Type>Property()'. Ein m�glicher Verwendungszweck dieser Attribute ist eine m�gliche Filterung der Nachrichten bei den Empf�ngern.
Body Enth�lt die eigentliche Nachricht, die einem der folgenden f�nf Message-Typen entspricht.
Message-Typen
TextMessage �bermittelt einzelnen Text-String.
MapMessage �bermittelt (mehrere) Attribute als Key/Value-Paare.
Dabei k�nnen die Values nicht nur Strings sein,
sondern auch viele andere Java-Typen.
ObjectMessage �bermittelt ein Java-Object (welches 'Serializable' implementieren muss).
StreamMessage �bermittelt Streams �hnlich wie 'DataOutputStream'.
Anders als 'BytesMessage' �bergibt 'StreamMessage' auch Datentypen.
BytesMessage �bermittelt Streams �hnlich wie 'DataOutputStream'.
Anders als 'StreamMessage' �bergibt 'BytesMessage' nicht interpretierte Rohdaten.

Vorbereitungen f�r die Beispiele

F�r die folgenden Beispiel wird ein Java EE Application Server f�r die JNDI- und JMS-Dienste ben�tigt. Es wird im Folgenden davon ausgegangen, dass JBoss installiert wird. Mit kleinen Modifikationen sind die Beispiele auch mit anderen Java EE Application Servern lauff�hig.

  1. Installieren Sie JBoss wie beschrieben unter 'jee-ejb2.htm#InstallationJBoss'.

    Wenn Sie nicht JBoss verwenden wollen, m�ssen Sie einen anderen Java EE Application Server f�r die JNDI- und JMS-Dienste installieren.

  2. Legen Sie ein Projektverzeichnis an, z.B. 'D:\MeinWorkspace\MeinJmsProjekt' (im Folgenden '<MeinJmsProjekt>' genannt).

  3. Legen Sie im Projektverzeichnis '<MeinJmsProjekt>' das Unterverzeichnis '<MeinJmsProjekt>\lib' an und kopieren Sie dort hinein die vom Client ben�tigten Libs. F�r JBosss ist dies die Datei 'jbossall-client.jar' aus dem JBoss-Verzeichnis 'C:\JBoss\client'.

  4. Legen Sie im Projektverzeichnis '<MeinJmsProjekt>' das Unterverzeichnis '<MeinJmsProjekt>\conf' und darin die folgende Datei 'jndi.properties' an (wird von 'InitialContext()' ben�tigt):

    java.naming.provider.url=jnp://localhost:1099 java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory jnp.socket.Factory=org.jnp.interfaces.TimedSocketFactory

    Wenn Sie nicht JBoss verwenden oder wenn JBoss auf einem anderen Rechner installiert ist, m�ssen Sie die Datei anpassen, wie zum Beispiel beschrieben unter: jee-jndi.htm#jndi.properties.

  5. JMS-Queues und -Topics m�ssen beim Java EE Application Server bzw. JMS-Provider angemeldet werden. F�r JBoss muss hierf�r eine '*-service.xml'-Datei in das JBoss-Deploy-Verzeichnis kopiert werden.

    Legen Sie im Unterverzeichnis '<MeinJmsProjekt>\conf' die folgende Datei 'wetter-service.xml' an:

    <?xml version="1.0" encoding="UTF-8"?> <!-- Destination without a configured SecurityManager or without a SecurityConf will default to role guest with read=true, write=true, create=false. --> <server> <mbean code="org.jboss.mq.server.jmx.Topic" name="jboss.mq.destination:service=Topic,name=WetterTopic"> <depends optional-attribute-name="DestinationManager"> jboss.mq:service=DestinationManager </depends> <depends optional-attribute-name="SecurityManager"> jboss.mq:service=SecurityManager </depends> </mbean> </server>

    Kopieren Sie diese Datei in das JBoss-Deploy-Verzeichnis:

    copy D:\MeinWorkspace\MeinJmsProjekt\conf\wetter-service.xml C:\JBoss\server\default\deploy

    Bei anderen Java EE Application Servern m�ssen entweder �hnliche Dateien erstellt werden oder JMS wird �ber die Server-Konfiguration (z.B. Web-Konsole) eingerichtet.

Namen im JNDI-Context anzeigen

JNDI (Java Naming and Directory Interface) bietet einen Namens- und Verzeichnisdienst, �ber den Objekte und Dienste gefunden und verf�gbar gemacht werden.

Besonders zu Beginn kann es sehr hilfreich sein, die genauen Namen aller JNDI-Eintr�ge anzeigen zu lassen, da sie bei den verschiedenen Java EE Application Servern unterschiedlich vergeben werden k�nnen.

F�hren Sie hierf�r das Testprogramm unter jee-jndi.htm#Auslesen-Namen-im-JNDI aus.

Queue (Point-to-Point)

  1. QueueSender

    Vergegenw�rtigen Sie sich den Unterschied zwischen 'Queue' und 'Topic'.

    Der folgende QueueSender sendet zehn Nachrichten an ein Queue.

    Speichern Sie im Verzeichnis '<MeinJmsProjekt>\src\meinjmspackage' die folgende Datei 'MyQueueSender.java':

    package meinjmspackage; import javax.jms.*; import javax.naming.*; public class MyQueueSender { public static void main( String[] args ) throws NamingException, JMSException { final String MEINE_QUEUE = "queue/testQueue"; Context ctx = null; QueueConnection connect = null; QueueSession session = null; Queue queue = null; QueueSender sender = null; try { ctx = new InitialContext(); QueueConnectionFactory fact = (QueueConnectionFactory) ctx.lookup( "ConnectionFactory" ); connect = fact.createQueueConnection(); session = connect.createQueueSession( false, Session.AUTO_ACKNOWLEDGE ); try { queue = (Queue) ctx.lookup( MEINE_QUEUE ); } catch( NameNotFoundException ex ) { queue = session.createQueue( MEINE_QUEUE ); ctx.bind( MEINE_QUEUE, queue ); } sender = session.createSender( queue ); connect.start(); for( int i=0; i<10; i++ ) { TextMessage msg = session.createTextMessage(); msg.setText( "Die " + (i+1) + ". Meldung des MyQueueSenders." ); sender.send( msg ); System.out.println( "Sending " + msg.getText() ); // System.out.println( "Sending " + msg.toString() ); } } finally { try { if( null != sender ) sender.close(); } catch( Exception ex ) {/*ok*/} try { if( null != session ) session.close(); } catch( Exception ex ) {/*ok*/} try { if( null != connect ) connect.close(); } catch( Exception ex ) {/*ok*/} try { if( null != ctx ) ctx.close(); } catch( Exception ex ) {/*ok*/} } } }
  2. Queue-receive

    Der folgende QueueReceiver empf�ngt maximal 20 Sekunden lang bis zu 20 Nachrichten von einer Queue.

    Anders als ein Listener wartet 'QueueReceiver.receive(1000)' synchron auf Messages (mit vorgegebenem Timeout).

    Speichern Sie im Verzeichnis '<MeinJmsProjekt>\src\meinjmspackage' die folgende Datei 'MyQueueReceive.java':

    package meinjmspackage; import javax.jms.*; import javax.naming.*; public class MyQueueReceive { public static void main( String[] args ) throws Exception { final String MEINE_QUEUE = "queue/testQueue"; Context ctx = null; QueueConnection connect = null; QueueSession session = null; Queue queue = null; QueueReceiver receiver = null; try { ctx = new InitialContext(); QueueConnectionFactory fact = (QueueConnectionFactory) ctx.lookup( "ConnectionFactory" ); connect = fact.createQueueConnection(); session = connect.createQueueSession( false, Session.AUTO_ACKNOWLEDGE ); try { queue = (Queue) ctx.lookup( MEINE_QUEUE ); } catch( NameNotFoundException ex ) { queue = session.createQueue( MEINE_QUEUE ); ctx.bind( MEINE_QUEUE, queue ); } receiver = session.createReceiver( queue ); connect.start(); for( int i=0; i<20; i++ ) { TextMessage msg = (TextMessage) receiver.receive( 1000 ); if( null != msg ) { System.out.println( msg.getText() ); msg.acknowledge(); } } } finally { try { if( null != receiver ) receiver.close(); } catch( Exception ex ) {/*ok*/} try { if( null != session ) session.close(); } catch( Exception ex ) {/*ok*/} try { if( null != connect ) connect.close(); } catch( Exception ex ) {/*ok*/} try { if( null != ctx ) ctx.close(); } catch( Exception ex ) {/*ok*/} } } }
  3. Queue-Listener

    Der folgende Queue-Listener empf�ngt 20 Sekunden lang Nachrichten von einer Queue.

    Anders als bei 'receive()' braucht er als Listener nicht auf Messages zu warten, sondern empf�ngt sie asynchron, indem er sich �ber 'QueueReceiver.setMessageListener(new MyQueueListener())' registriert, wodurch Messages an die 'onMessage(Message message)'-Callback-Methode geschickt werden.

    Speichern Sie im Verzeichnis '<MeinJmsProjekt>\src\meinjmspackage' die folgende Datei 'MyQueueListener.java':

    package meinjmspackage; import javax.jms.*; import javax.naming.*; public class MyQueueListener implements MessageListener { public static void main( String[] args ) throws Exception { final String MEINE_QUEUE = "queue/testQueue"; Context ctx = null; QueueConnection connect = null; QueueSession session = null; Queue queue = null; QueueReceiver receiver = null; try { ctx = new InitialContext(); QueueConnectionFactory fact = (QueueConnectionFactory) ctx.lookup( "ConnectionFactory" ); connect = fact.createQueueConnection(); session = connect.createQueueSession( false, Session.AUTO_ACKNOWLEDGE ); try { queue = (Queue) ctx.lookup( MEINE_QUEUE ); } catch( NameNotFoundException ex ) { queue = session.createQueue( MEINE_QUEUE ); ctx.bind( MEINE_QUEUE, queue ); } receiver = session.createReceiver( queue ); receiver.setMessageListener( new MyQueueListener() ); connect.start(); Thread.sleep( 20000 ); } finally { try { if( null != receiver ) receiver.close(); } catch( Exception ex ) {/*ok*/} try { if( null != session ) session.close(); } catch( Exception ex ) {/*ok*/} try { if( null != connect ) connect.close(); } catch( Exception ex ) {/*ok*/} try { if( null != ctx ) ctx.close(); } catch( Exception ex ) {/*ok*/} } } public void onMessage( Message message ) { try { TextMessage msg = (TextMessage) message; System.out.println( msg.getText() ); message.acknowledge(); } catch( JMSException ex ) { System.out.println( ex.getMessage() ); } } }
  4. Achten Sie darauf, dass die 'Vorbereitungen' durchgef�hrt sind.

    Pr�fen Sie mit der 'JNDI-Context-Testroutine', ob unter 'queue' der Eintrag 'queue/testQueue' aufgelistet ist.

    Starten Sie JBoss und �ffnen Sie drei Kommandozeilenfenster.

    Geben Sie im ersten Kommandozeilenfenster ein:

    cd \MeinWorkspace\MeinJmsProjekt

    set CLASSPATH=.;bin;conf;lib/jbossall-client.jar

    javac -d bin src/meinjmspackage/*.java

    java meinjmspackage.MyQueueListener

    Geben Sie im zweiten Kommandozeilenfenster ein:

    cd \MeinWorkspace\MeinJmsProjekt

    set CLASSPATH=.;bin;conf;lib/jbossall-client.jar

    java meinjmspackage.MyQueueReceive

    Geben Sie im dritten Kommandozeilenfenster ein:

    cd \MeinWorkspace\MeinJmsProjekt

    set CLASSPATH=.;bin;conf;lib/jbossall-client.jar

    java meinjmspackage.MyQueueSender

    Starten Sie dabei das jeweils letzte 'java ...'-Kommando in der Reihenfolge der Kommandozeilenfenster, also 'MyQueueSender' zuletzt. Starten Sie diese drei Kommandos m�glichst kurz hintereinander.

    In allen drei Kommandozeilenfenstern sollen Nachrichten erscheinen. Im 'MyQueueSender'-Kommandozeilenfenster erscheinen alle zehn Nachrichten. Die beiden anderen Kommandozeilenfenster teilen sich die Nachrichten, jede Nachricht erscheint also in genau einem Empf�nger-Kommandozeilenfenster.

Topic (Publish-and-Subscribe)

  1. TopicPublisher

    Vergegenw�rtigen Sie sich den Unterschied zwischen 'Queue' und 'Topic'.

    Der folgende TopicPublisher sendet zehn Wetternachrichten an ein Topic.

    Dabei gibt es eine Besonderheit: W�hrend 'Wetterlage', 'Windrichtung' und 'Temperatur' normal �ber eine 'MapMessage' im Body der 'Message' �bermittelt werden, wird die 'Stadt'-Information getrennt davon als 'Message'-Property gesetzt. Dies werden wir weiter unten im 'Topic-Listener'-Beispiel f�r einen Nachrichtenfilter ausnutzen.

    Speichern Sie im Verzeichnis '<MeinJmsProjekt>\src\meinjmspackage' die folgende Datei 'MyTopicPublisher.java':

    package meinjmspackage; import javax.jms.*; import javax.naming.*; public class MyTopicPublisher { public static void main( String[] args ) throws NamingException, JMSException { final String WETTER_TOPIC = "topic/WetterTopic"; final String[] STAEDTE = { "Aachen", "Berlin" }; final String[] WETTERLAGEN = { "Sonnig", "Wolkig", "Regen " }; final String[] WINDRICHTUNGEN = { "Nord", "West", "Sued", "Ost " }; Context ctx = null; TopicConnection connect = null; TopicSession session = null; Topic topic = null; TopicPublisher sender = null; try { ctx = new InitialContext(); TopicConnectionFactory fact = (TopicConnectionFactory) ctx.lookup( "ConnectionFactory" ); connect = fact.createTopicConnection(); session = connect.createTopicSession( false, Session.AUTO_ACKNOWLEDGE ); try { topic = (Topic) ctx.lookup( WETTER_TOPIC ); } catch( NameNotFoundException ex ) { topic = session.createTopic( WETTER_TOPIC ); ctx.bind( WETTER_TOPIC, topic ); } sender = session.createPublisher( topic ); connect.start(); for( int i=0; i<10; i++ ) { int rndm = (int) (Math.random() * 300); String stadt = STAEDTE[rndm % STAEDTE.length]; String wetterlage = WETTERLAGEN[rndm % WETTERLAGEN.length]; String windrichtung = WINDRICHTUNGEN[rndm % WINDRICHTUNGEN.length]; double temperatur = (double) (rndm - 50) / 10; MapMessage msg = session.createMapMessage(); msg.setStringProperty( "Stadt", stadt ); msg.setString( "Wetterlage", wetterlage ); msg.setString( "Windrichtung", windrichtung ); msg.setDouble( "Temperatur", temperatur ); sender.publish( msg ); System.out.println( "Sending Stadt=" + stadt + " Wl=" + wetterlage + " Wr=" + windrichtung + " T=" + temperatur + "C" ); // System.out.println( "Sending " + msg.toString() ); } } finally { try { if( null != sender ) sender.close(); } catch( Exception ex ) {/*ok*/} try { if( null != session ) session.close(); } catch( Exception ex ) {/*ok*/} try { if( null != connect ) connect.close(); } catch( Exception ex ) {/*ok*/} try { if( null != ctx ) ctx.close(); } catch( Exception ex ) {/*ok*/} } } }
  2. Topic-receive

    Der folgende Topic-Empf�nger empf�ngt maximal 20 Sekunden lang bis zu 20 Wetternachrichten von einem Topic.

    Anders als ein Listener wartet 'TopicSubscriber.receive(1000)' synchron auf Messages (mit vorgegebenem Timeout).

    Speichern Sie im Verzeichnis '<MeinJmsProjekt>\src\meinjmspackage' die folgende Datei 'MyTopicReceive.java':

    package meinjmspackage; import javax.jms.*; import javax.naming.*; public class MyTopicReceive { public static void main( String[] args ) throws Exception { final String WETTER_TOPIC = "topic/WetterTopic"; Context ctx = null; TopicConnection connect = null; TopicSession session = null; Topic topic = null; TopicSubscriber subscrib = null; try { ctx = new InitialContext(); TopicConnectionFactory fact = (TopicConnectionFactory) ctx.lookup( "ConnectionFactory" ); connect = fact.createTopicConnection(); session = connect.createTopicSession( false, Session.AUTO_ACKNOWLEDGE ); try { topic = (Topic) ctx.lookup( WETTER_TOPIC ); } catch( NameNotFoundException ex ) { topic = session.createTopic( WETTER_TOPIC ); ctx.bind( WETTER_TOPIC, topic ); } subscrib = session.createSubscriber( topic ); connect.start(); for( int i=0; i<20; i++ ) { MapMessage msg = (MapMessage)subscrib.receive( 1000 ); if( null != msg ) { System.out.println( "Receiving: Stadt=" + msg.getStringProperty( "Stadt" ) + " Wl=" + msg.getString( "Wetterlage" ) + " Wr=" + msg.getString( "Windrichtung" ) + " T=" + msg.getDouble( "Temperatur" ) + "C" ); msg.acknowledge(); } } } finally { try { if( null != subscrib ) subscrib.close(); } catch( Exception ex ) {/*ok*/} try { if( null != session ) session.close(); } catch( Exception ex ) {/*ok*/} try { if( null != connect ) connect.close(); } catch( Exception ex ) {/*ok*/} try { if( null != ctx ) ctx.close(); } catch( Exception ex ) {/*ok*/} } } }
  3. Topic-Listener (mit Filter)

    Der folgende Topic-Listener empf�ngt 20 Sekunden lang Wetternachrichten von einem Topic.

    Anders als bei 'receive()' braucht er als Listener nicht auf Messages zu warten, sondern empf�ngt sie asynchron, indem er sich �ber 'TopicSubscriber.setMessageListener(new MyTopicListener())' registriert, wodurch Messages an die 'onMessage(Message message)'-Callback-Methode geschickt werden.

    Dabei gibt es eine Besonderheit: Oben im 'TopicPublisher'-Beispiel wurden 'Wetterlage', 'Windrichtung' und 'Temperatur' normal �ber eine 'MapMessage' im Body der 'Message' �bermittelt, w�hrend die 'Stadt'-Information getrennt davon als 'Message'-Property gesetzt wurde. Dies wird jetzt f�r einen Nachrichtenfilter ausgenutzt. Wird ein Stadtname als Kommandozeilen-Argument �bergeben, wird 'createSubscriber()' nicht mit
    subscrib = session.createSubscriber( topic ); sondern stattdessen mit
    subscrib = session.createSubscriber( topic, "Stadt='"+ args[0] + "'", true ); aufgerufen, und es werden nur Nachrichten f�r die gew�hlte Stadt ausfiltert und �bermittelt.

    Speichern Sie im Verzeichnis '<MeinJmsProjekt>\src\meinjmspackage' die folgende Datei 'MyTopicListener.java':

    package meinjmspackage; import javax.jms.*; import javax.naming.*; public class MyTopicListener implements MessageListener { public static void main( String[] args ) throws Exception { final String WETTER_TOPIC = "topic/WetterTopic"; Context ctx = null; TopicConnection connect = null; TopicSession session = null; Topic topic = null; TopicSubscriber subscrib = null; try { ctx = new InitialContext(); TopicConnectionFactory fact = (TopicConnectionFactory) ctx.lookup( "ConnectionFactory" ); connect = fact.createTopicConnection(); session = connect.createTopicSession( false, Session.AUTO_ACKNOWLEDGE ); try { topic = (Topic) ctx.lookup( WETTER_TOPIC ); } catch( NameNotFoundException ex ) { topic = session.createTopic( WETTER_TOPIC ); ctx.bind( WETTER_TOPIC, topic ); } if( null != args && 0 < args.length ) subscrib = session.createSubscriber( topic, "Stadt='"+ args[0] + "'", true ); else subscrib = session.createSubscriber( topic ); subscrib.setMessageListener( new MyTopicListener() ); connect.start(); Thread.sleep( 20000 ); } finally { try { if( null != subscrib ) subscrib.close(); } catch( Exception ex ) {/*ok*/} try { if( null != session ) session.close(); } catch( Exception ex ) {/*ok*/} try { if( null != connect ) connect.close(); } catch( Exception ex ) {/*ok*/} try { if( null != ctx ) ctx.close(); } catch( Exception ex ) {/*ok*/} } } public void onMessage( Message message ) { try { MapMessage msg = (MapMessage)message; System.out.println( "Receiving: Stadt=" + msg.getStringProperty( "Stadt" ) + " Wl=" + msg.getString( "Wetterlage" ) + " Wr=" + msg.getString( "Windrichtung" ) + " T=" + msg.getDouble( "Temperatur" ) + "C" ); message.acknowledge(); } catch( JMSException ex ) { System.out.println( ex.getMessage() ); } } }
  4. Achten Sie darauf, dass die 'Vorbereitungen' durchgef�hrt sind.

    Pr�fen Sie mit der 'JNDI-Context-Testroutine', ob unter 'topic' der Eintrag 'topic/WetterTopic' aufgelistet ist.

    Starten Sie JBoss und �ffnen Sie drei Kommandozeilenfenster.

    Geben Sie im ersten Kommandozeilenfenster ein:

    cd \MeinWorkspace\MeinJmsProjekt

    set CLASSPATH=.;bin;conf;lib/jbossall-client.jar

    javac -d bin src/meinjmspackage/*.java

    java meinjmspackage.MyTopicListener

    Geben Sie im zweiten Kommandozeilenfenster ein:

    cd \MeinWorkspace\MeinJmsProjekt

    set CLASSPATH=.;bin;conf;lib/jbossall-client.jar

    java meinjmspackage.MyTopicReceive

    Geben Sie im dritten Kommandozeilenfenster ein:

    cd \MeinWorkspace\MeinJmsProjekt

    set CLASSPATH=.;bin;conf;lib/jbossall-client.jar

    java meinjmspackage.MyTopicPublisher

    Starten Sie dabei das jeweils letzte 'java ...'-Kommando in der Reihenfolge der Kommandozeilenfenster, also 'MyTopicPublisher' zuletzt. Starten Sie diese drei Kommandos m�glichst kurz hintereinander.

    In allen drei Kommandozeilenfenstern erscheinen die gleichen Wetternachrichten (allerdings nicht immer in der gleichen Reihenfolge).

    Geben Sie anschlie�end in einem Kommandozeilenfenster ein:

    java meinjmspackage.MyTopicListener Aachen

    Geben Sie sofort danach in einem anderen Kommandozeilenfenster ein:

    java meinjmspackage.MyTopicPublisher

    Jetzt werden nur noch Wetternachrichten f�r Aachen empfangen.

QueueRequestor

  1. QueueRequestor

    Der folgende QueueRequestor sendet zehn Nachrichten an ein Queue. Anders als die bisherigen Sender/Publisher erwartet er eine Antwortnachricht. F�r jede Nachricht �ffnet er einen tempor�ren R�ckkanal, macht diesen �ber das JMSReplyTo-Feld zug�nglich, registriert sich als Empf�nger und wartet auf eine Antwort.

    Speichern Sie im Verzeichnis '<MeinJmsProjekt>\src\meinjmspackage' die folgende Datei 'MyQueueRequestor.java':

    package meinjmspackage; import javax.jms.*; import javax.naming.*; public class MyQueueRequestor { public static void main( String[] args ) throws NamingException, JMSException { final String MEINE_QUEUE = "queue/testQueue"; Context ctx = null; QueueConnection connect = null; QueueSession session = null; Queue queue = null; QueueRequestor sender = null; try { ctx = new InitialContext(); QueueConnectionFactory fact = (QueueConnectionFactory) ctx.lookup( "ConnectionFactory" ); connect = fact.createQueueConnection(); session = connect.createQueueSession( false, Session.AUTO_ACKNOWLEDGE ); try { queue = (Queue) ctx.lookup( MEINE_QUEUE ); } catch( NameNotFoundException ex ) { queue = session.createQueue( MEINE_QUEUE ); ctx.bind( MEINE_QUEUE, queue ); } sender = new QueueRequestor( session, queue ); connect.start(); for( int i=0; i<10; i++ ) { TextMessage msg = session.createTextMessage(); msg.setText( "Die " + (i+1) + ". Anfrage des MyQueueRequestors." ); System.out.println( "Sending: " + msg.getText() ); TextMessage answer = (TextMessage) sender.request( msg ); System.out.println( "Reveived: " + answer.getText() ); } } finally { try { if( null != sender ) sender.close(); } catch( Exception ex ) {/*ok*/} try { if( null != session ) session.close(); } catch( Exception ex ) {/*ok*/} try { if( null != connect ) connect.close(); } catch( Exception ex ) {/*ok*/} try { if( null != ctx ) ctx.close(); } catch( Exception ex ) {/*ok*/} } } }
  2. QueueRequestor-reply

    Der folgende Nachrichtenbeantworter empf�ngt 20 Sekunden lang Nachrichten von einer Queue und beantwortet sie.

    Speichern Sie im Verzeichnis '<MeinJmsProjekt>\src\meinjmspackage' die folgende Datei 'MyQueueRequestorAnswer.java':

    package meinjmspackage; import javax.jms.*; import javax.naming.*; public class MyQueueRequestorAnswer implements MessageListener { final String MEINE_QUEUE = "queue/testQueue"; Context ctx = null; QueueConnection connect = null; QueueSession session = null; Queue queue = null; QueueReceiver receiver = null; QueueSender replySnd = null; public static void main( String[] args ) throws Exception { new MyQueueRequestorAnswer(); } public MyQueueRequestorAnswer() throws Exception { try { ctx = new InitialContext(); QueueConnectionFactory fact = (QueueConnectionFactory) ctx.lookup( "ConnectionFactory" ); connect = fact.createQueueConnection(); session = connect.createQueueSession( false, Session.AUTO_ACKNOWLEDGE ); try { queue = (Queue) ctx.lookup( MEINE_QUEUE ); } catch( NameNotFoundException ex ) { queue = session.createQueue( MEINE_QUEUE ); ctx.bind( MEINE_QUEUE, queue ); } receiver = session.createReceiver( queue ); receiver.setMessageListener( this ); connect.start(); Thread.sleep( 20000 ); } finally { try { if( null != replySnd ) replySnd.close(); } catch( Exception ex ) {/*ok*/} try { if( null != receiver ) receiver.close(); } catch( Exception ex ) {/*ok*/} try { if( null != session ) session.close(); } catch( Exception ex ) {/*ok*/} try { if( null != connect ) connect.close(); } catch( Exception ex ) {/*ok*/} try { if( null != ctx ) ctx.close(); } catch( Exception ex ) {/*ok*/} } } public void onMessage( Message message ) { try { TextMessage msg = (TextMessage) message; System.out.println( "Reveived: " + msg.getText() ); TextMessage answer = session.createTextMessage(); answer.setText( "Echo '" + msg.getText() + "'." ); System.out.println( "Answering: " + answer.getText() ); Queue reply = (Queue)msg.getJMSReplyTo(); replySnd = session.createSender( reply ); replySnd.send( answer ); replySnd.close(); replySnd = null; message.acknowledge(); } catch( JMSException ex ) { System.out.println( ex.getMessage() ); } } }
  3. Achten Sie darauf, dass die 'Vorbereitungen' durchgef�hrt sind.

    Pr�fen Sie mit der 'JNDI-Context-Testroutine', ob unter 'queue' der Eintrag 'queue/testQueue' aufgelistet ist.

    Starten Sie JBoss und �ffnen Sie zwei Kommandozeilenfenster.

    Geben Sie im ersten Kommandozeilenfenster ein:

    cd \MeinWorkspace\MeinJmsProjekt

    set CLASSPATH=.;bin;conf;lib/jbossall-client.jar

    javac -d bin src/meinjmspackage/*.java

    java meinjmspackage.MyQueueRequestorAnswer

    Geben Sie im zweiten Kommandozeilenfenster ein:

    cd \MeinWorkspace\MeinJmsProjekt

    set CLASSPATH=.;bin;conf;lib/jbossall-client.jar

    java meinjmspackage.MyQueueRequestor

    Starten Sie dabei das jeweils letzte 'java ...'-Kommando in der Reihenfolge der Kommandozeilenfenster, also 'MyQueueRequestor' zuletzt. Starten Sie diese zwei Kommandos m�glichst kurz hintereinander.

    In den beiden Kommandozeilenfenstern k�nnen Sie das Anfrage-/Antwort-Spiel beobachten.

MDB (Message Driven Bean)

Erl�uterungen und ein Programmierbeispiel zu MDB (Message Driven Bean) finden Sie in jee-ejb2.htm#Beispiel-MDB.

Links auf weiterf�hrende Informationen

  • JMS: //www.oracle.com/technetwork/java/jms
  • Java EE Tutorial: //docs.oracle.com/javaee/7/tutorial
  • Java EE API: //docs.oracle.com/javaee/7/api
  • Java SE API: //docs.oracle.com/javase/7/docs/api
  • JBoss: //www.jboss.org/jbossas
  • Bond/Law/Longshaw/Haywood/Roxburgh, J2EE in 21 Days, 2004: Rezension, Amazon.de 0672325586
  • Stark, J2EE, 2004: Rezension, Amazon.de 3827321840
Weitere Themen: andere TechDocs | JSP | EJB | SQL
© 1998-2007 Torsten Horn, Aachen

Was ist ein Topic bei MQTT?

MQTT Topics Die Kommunikation von MQTT stützt sich auf ein sogenanntes “Topic”-Prinzip: Jede Nachricht wir einem Topic zugeordnet. Das heißt, jede valide MQTT-Nachricht enthält eine Payload mit einem zugehörigen Topic. Topics sind von der Funktions- und Schreibsyntax Ordnern in einem Filesystem sehr ähnlich.

Was ist eine JMS Queue?

jms. Queue. Bei diesem Konzept veröffentlicht der Sender, in diesem Fall auch Publisher genannt eine Nachricht für viele Abnehmer (Subscriber). Die Nachricht wird an alle registrierten Abnehmer automatisch verteilt.

Wo wird MQTT eingesetzt?

Das Nachrichtenprotokoll MQTT ist gerade für Umgebungen mit niedriger Bandbreite und hoher Latenz und damit für die Machine-to-Machine-Kommunikation (M2M) geeignet. Das über einen zentralen Broker betriebene Publish-and-Subscribe-Prinzip ist daher im IoT sehr beliebt.

Wer nutzt MQTT?

Aber auch klassische Nachrichten, wie Aktienkurse oder Kurzmitteilungen. Facebook nutzt MQTT für seine Mobile-Apps. MQTT wurde ursprünglich von IBM und Eurotech um die Jahrtausendwende konzipiert, um Telemetriedaten zwischen Sensoren und Servern über unzuverlässige Datenverbindungen zu übertragen.

Toplist

Neuester Beitrag

Stichworte