A selector is basically an expression you can use to filter which messages will be dispatched through your consumer. According to the docs, it uses SQL 92 conditional expression syntax:
http://livedocs.adobe.com/blazeds/1/blazeds_devguide/help.html?content=messaging_6.html
A subtopic is sort of a special case of a selector, filtering out messages whose "DSSubtopic" header don't match the provided value.
The important thing to understand with both of these is that the client determines which messages are sent to it, and as such it cannot be relied upon entirely for security.
To implement secure server-based filtering of messages based on an authenticated user's identity, see my answer to a related question here:
Flex Messaging Security
As far as multiple Consumers vs. MultiTopicConsumer, not sure there. They're both going to use the same underlying ChannelSet, so it ought not to have a big performance difference. I think it's mostly a question of whether it's convenient to have one event handler that responds to all messages from the MultiTopicConsumer or whether it's easier to have separate event handlers for each Consumer.
Here is test code I wrote and use, at times, to test sending data to our client. It's a stripped down, bare bones Java example of a ServiceAdapter implementation. It removes a lot of unnecessary code from the existing examples on the web. It Compiles, works and I use it often in testing.
package your.package.structure.adapter;
import your.package.structure.device.DevicePort;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.Message;
import flex.messaging.services.MessageService;
import flex.messaging.services.ServiceAdapter;
import flex.messaging.util.UUIDUtils;
/**
* Test service adapter. Great for testing when you want to JUST SEND AN OBJECT and nothing
* else. This class has to stay in the main codebase (instead of test) because, when it's used
* it needs to be deployed to Tomcat.
* @author Kevin G
*
*/
public class TestServiceAdapter extends ServiceAdapter {
private volatile boolean running;
private Message createTestMessage() {
DevicePort objectToSend = new DevicePort("RouterDevice");
final AsyncMessage msg = new AsyncMessage();
msg.setDestination(getClass().getSimpleName() + "Destination");
msg.setClientId(UUIDUtils.createUUID());
msg.setMessageId(UUIDUtils.createUUID());
msg.setBody(objectToSend);
return msg;
}
private void sendMessageToClients(Message msg) {
((MessageService) getDestination().getService()).pushMessageToClients(msg, false);
}
/**
* @see flex.messaging.services.ServiceAdapter#start()
*/
@Override
public void start(){
super.start();
Thread messageSender = new Thread(){
public void run(){
running = true;
while(running){
sendMessageToClients(createTestMessage());
secondsToSleep(3);
}
}
};
messageSender.start();
}
/**
* @see flex.messaging.services.ServiceAdapter#stop()
*/
@Override
public void stop(){
super.stop();
running = false;
}
/**
* This method is called when a producer sends a message to the destination. Currently,
* we don't care when that happens.
*/
@Override
public Object invoke(Message message) {
if (message.getBody().equals("stop")) {
running = false;
}
return null;
}
private void secondsToSleep(int seconds) {
try{
Thread.sleep(seconds * 1000);
}catch(InterruptedException e){
System.out.println("TestServiceAdapter Interrupted while sending messages");
e.printStackTrace();
}
}
}
You need to set a few properties in tomcat to get this to work.
In messaging-config.xml
, you need to add an adapter and destination:
Add this line to the existing <adapters>
tag:
<adapter-definition id="TestServiceAdapter" class="your.package.structure.adapter.TestServiceAdapter"/>
Add this destination to that same messaging-config.xml
file:
<destination id="TestServiceAdapterDestination">
<channels>
<channel ref="my-streaming-amf"/>
</channels>
<adapter ref="TestServiceAdapter"/>
</destination>
Finally, make sure the "my-streaming-amf" channel is defined in services-config.xml
, as in:
<channel-definition id="my-streaming-amf" class="mx.messaging.channels.StreamingAMFChannel">
<endpoint url="http://{server.name}:{server.port}/{context.root}/messagebroker/streamingamf" class="flex.messaging.endpoints.StreamingAMFEndpoint"/>
<properties>
<!-- you don't need to set all these properties, this is just what we set, included for illustration, only -->
<idle-timeout-minutes>0</idle-timeout-minutes>
<max-streaming-clients>10</max-streaming-clients>
<server-to-client-heartbeat-millis>5000</server-to-client-heartbeat-millis>
<user-agent-settings>
<user-agent match-on="Safari" kickstart-bytes="2048" max-streaming-connections-per-session="10"/>
<user-agent match-on="MSIE" kickstart-bytes="2048" max-streaming-connections-per-session="15"/>
<user-agent match-on="Firefox" kickstart-bytes="2048" max-streaming-connections-per-session="10"/>
</user-agent-settings>
</properties>
</channel-definition>
Note that in blazeDS, these two config files (messaging-config.xml and services-config.xml) are located in the following directory:
/blazeds/tomcat/webapps/[nameOfYourApp]/WEB-INF/flex/
where [nameOfYourApp]
is the directory your webapp lives in.
I hope all that helps!
-kg
Best Answer
There is indeed a solution for this in the APIs. The first step is to write a class which extends the FlexClientOutboundQueueProcessor class. You need to override one method:
Basically all you need to do is write some logic to determine whether you should make the following call:
Simply put, if you don't add the message to the queue, then the message won't be pushed to the client. The other important method in this class is:
Which you can use to get the associated FlexSession and ultimately the authentication information that presumably exists in your app.
Once this is done, you just need to register the processor with the appropriate channels. Simply add this element within the "properties" element of the "channel-definition" element:
I believe you can also specify a nested "properties" element for the queue processor but I don't believe it's required.