/*
 * RhasspyVisualConversationTool/RhasspyVisualConversationTool/src/com/jens/rhasspy/visualtool/MQTT.java
 *
 * 280 lines
 * 7.2 KiB
 * Java
 */

package com.jens.rhasspy.visualtool;
import java.nio.charset.StandardCharsets;
import java.util.Calendar;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;

import com.jens.rhasspy.visualtool.lib.JSONObject;
public class MQTT implements MqttCallbackExtended
{
final static String clientId = "RhasspyVisualConversationTool-" + Settings.siteId;
final static int maximumReconnectionAttempts = -1;
final static boolean forceReconnection = false;
Timer timeoutTimer = null;
static String getServerUri()
{
if(Settings.mqttClientUseSsl)
return "ssl://" + Settings.mqttClientServerHostname + ":" + String.valueOf(Settings.mqttClientServerPort);
else
return "tcp://" + Settings.mqttClientServerHostname + ":" + String.valueOf(Settings.mqttClientServerPort);
}
static MQTT instance = null;
static MqttClient client = null;
public static MQTT getInstance()
{
if(instance == null)
instance = new MQTT();
return instance;
}
public MqttClient getClient()
{
if(client == null)
{
try
{
/*
* Documentation of options:
* https://www.ibm.com/support/knowledgecenter/SSFKSJ_7.5.0/com.ibm.mq.javadoc.doc/WMQMQxrClasses/org/eclipse/paho/client/mqttv3/MqttConnectOptions.html#setKeepAliveInterval(int)
*/
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setAutomaticReconnect(true);
if(Settings.mqttClientUseSsl)
{
// Properties props = new Properties();
// props.put("secureProtocol", "TLSv1_method");
// options.setSSLProperties(props);
// options.setSSLProperties(props);
options.setHttpsHostnameVerificationEnabled(true);
}
if(Settings.mqttClientUseAuthentication)
{
options.setUserName(Settings.mqttClientUsername);
options.setPassword(Settings.mqttClientPassword.toCharArray());
}
client = new MqttClient(getServerUri(), clientId);
client.setCallback(this);
}
catch (MqttException e)
{
Miscellaneous.logEvent(Miscellaneous.getStackTraceAsString(e), 1);
}
}
return client;
}
protected boolean subscribeToTopic(String topic)
{
Miscellaneous.logEvent("Subscribing to topic " + topic, 3);
try
{
if(!getClient().isConnected())
{
Miscellaneous.logEvent("Connecting to MQTT server " + Settings.mqttClientServerHostname + " on port " + String.valueOf(Settings.mqttClientServerPort) + ".", 3);
// getClient().setCallback(this);
getClient().connect();
}
getClient().subscribe(topic);
return true;
}
catch (MqttException e)
{
Miscellaneous.logEvent("Error subscribing to topic " + topic + ": " + Miscellaneous.getStackTraceAsString(e), 1);
}
return false;
}
protected boolean unsubscribeFromTopic(String topic)
{
Miscellaneous.logEvent("Unsubscribing from topic " + topic, 3);
try
{
if(getClient().isConnected())
{
getClient().unsubscribe(topic);
}
return true;
}
catch (MqttException e)
{
Miscellaneous.logEvent(Miscellaneous.getStackTraceAsString(e), 1);
}
return false;
}
@Override
public void connectionLost(Throwable arg0)
{
Miscellaneous.logEvent("Connection to MQTT server lost.", 3);
Miscellaneous.logEvent("Reason for loss of MQTT connection: " + Miscellaneous.getStackTraceAsString(arg0), 5);
GUI.getInstance().updateConnectionStatus(false);
// Currently set to autoreconnect, but that doesn't always seem to work
// reconnectManually();
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0)
{
// TODO Auto-generated method stub
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception
{
Miscellaneous.logEvent("Received MQTT message from topic: " + topic + ", message: " + message, 3);
MqttMessageDistributor mmd = new MqttMessageDistributor(topic, message);
Thread worker = new Thread(mmd);
worker.start();
if(MqttTopic.isMatched(Settings.topicNameSessionEnded, topic))
unscheduleTimeout();
else
scheduleTimeout();
}
class MqttMessageDistributor implements Runnable
{
String topic;
MqttMessage message;
public MqttMessageDistributor(String topic, MqttMessage message)
{
this.topic = topic;
this.message = message;
}
@Override
public void run()
{
Thread.setDefaultUncaughtExceptionHandler(Miscellaneous.getUncaughtExceptionHandler(3));
try
{
Object[] object = new Object[3];
object[0] = message;
object[1] = topic;
object[2] = Calendar.getInstance();
Miscellaneous.logEvent("New message received on topic " + object[1] + Miscellaneous.lineSeparator + object[0], 2);
String message = new String(((MqttMessage)object[0]).getPayload());
// if(!StringUtils.isEmpty(message))
if(message != null && message.length() > 0)
{
JSONObject jo = new JSONObject(message);
if(jo.has(Settings.jsonObjectStringSiteId) && jo.getString(Settings.jsonObjectStringSiteId).equalsIgnoreCase(Settings.siteId))
{
GUI.getInstance().handleNewMessage(object);
// for(String ignoredSite : Settings.ignoredSatellites.split(";"))
// {
// if(ignoredSite.equalsIgnoreCase(jo.getString(Settings.jsonObjectStringSiteId)))
// {
// Miscellaneous.logEvent("Ignoring message as it's targeted to " + ignoredSite, 3);
// return;
// }
// }
}
else
Miscellaneous.logEvent("Message not for me.", 1);
}
}
catch(Exception e)
{
Miscellaneous.logEvent("Error after receiving MQTT message: " + Miscellaneous.getStackTraceAsString(e), 1);
}
}
}
public boolean publish(String topic, String message)
{
try
{
getClient().publish(topic, new MqttMessage(message.getBytes()));
return true;
}
catch (MqttPersistenceException e)
{
Miscellaneous.logEvent(Miscellaneous.getStackTraceAsString(e), 1);
}
catch (MqttException e)
{
Miscellaneous.logEvent(Miscellaneous.getStackTraceAsString(e), 1);
}
return false;
}
@Override
public void connectComplete(boolean reconnect, String serverURI)
{
if(reconnect)
Miscellaneous.logEvent("Reconnected to MQTT server " + serverURI, 3);
else
Miscellaneous.logEvent("Connected to MQTT server " + serverURI, 3);
GUI.getInstance().updateConnectionStatus(true);
}
void scheduleTimeout()
{
TimerTask timerTask = new TimerTask()
{
@Override
public void run()
{
Object[] object = new Object[3];
object[0] = new MqttMessage();
object[1] = Settings.topicNameSessionEnded;
object[2] = Calendar.getInstance();
GUI.getInstance().handleNewMessage(object);
}
};
// if(timeoutTimer == null)
timeoutTimer = new Timer();
// else
// timeoutTimer.cancel();
timeoutTimer.schedule(timerTask, Settings.sessionTimeout);
}
void unscheduleTimeout()
{
if(timeoutTimer != null)
timeoutTimer.cancel();
}
}