-- This program is a gateway between xAP clients that subscribe via an
-- MQTT broker and those that publish their messages via the iServer
-- or directly as UDP packets.  It operates by subscribing to the MQTT
-- broker and listening for all topics being published.  As each
-- message via the broker is an xAP packet it picks up the
-- xap-hbeat/xap-header and extracts the source= tag.  From this it
-- can create an xAP FILTER that allows it to redirect xAP messages to
-- the broker.

module(...,package.seeall)

require("xap")
MQTT = require("mqtt_library")

info = {version="1.1", description="xAP/MQTT gateway"}

-- nil port means use the default
broker = {host="172.16.10.150", port=nil} -- mqtt broker location

-- Set of MQTT topics for which an xAP filter has already been created.
subscribedTopics = {}

-- Escape a value so it can be embedded inside a JSON string literal.
-- nil becomes the empty string; quotes, backslashes and control
-- characters are escaped so the resulting document stays valid JSON.
local function jsonEscape(value)
  if value == nil then
    return ""
  end
  -- Extra parentheses drop gsub's second result (the match count).
  return (tostring(value):gsub('[%c"\\]', function(ch)
    if ch == '"' then
      return '\\"'
    elseif ch == '\\' then
      return '\\\\'
    end
    return string.format("\\u%04x", ch:byte())
  end))
end

-- Convert an xAP address into an MQTT topic: a.b.c:d.e -> a/b/c/d/e
-- @param addr xAP address string
-- @return the topic string (only the string, not gsub's match count)
function xAPAddrToTopic(addr)
  return (addr:gsub("[%.:]", "/"))
end

-- Build the JSON payload published to MQTT from the frame's
-- "input.state" section (intended for HomeAssistant).
-- @param frame object exposing getValue(section, key)
-- @return a string of the form {"text":"...","state":"..."}
function sendJsonValue(frame)
  local text = frame:getValue("input.state", "text")
  local state = frame:getValue("input.state", "state")
  -- A state of '?' is passed through unchanged.  A missing value is
  -- emitted as an empty string instead of raising a concatenation error.
  return '{"text":"' .. jsonEscape(text) ..
         '","state":"' .. jsonEscape(state) .. '"}'
end

-- Increment the hop count stored in f[section].
-- Does nothing when the section is missing; a missing (or non-numeric)
-- hop value is set to 2.
-- @param f       frame (indexable by section name)
-- @param section section name, e.g. 'xap-header' or 'xap-hbeat'
function bumpHop(f, section)
  if section == nil then
    return
  end
  local block = f[section]
  if block == nil then
    return
  end
  local hop = tonumber(block.hop)
  if hop then
    block.hop = hop + 1
  else
    block.hop = 2
  end
end

-- xAP filter callback: forward an xAP frame to the MQTT broker.
-- Only frames carrying an input.state "text" value are forwarded; the
-- topic is derived from the frame's source address.
-- @param frame the xAP frame delivered by the filter
function toMqtt(frame)
  local source = frame:getValue("xap-header", "source")
  local target = frame:getValue("xap-header", "target")

  -- Filtering out some pointless messages
  if frame:getValue("input.state", "text") == nil then
    return
  end

  -- Without a source address no topic can be built.
  if source == nil then
    return
  end

  -- The section name is required, otherwise bumpHop has nothing to bump.
  bumpHop(frame, "xap-header")

  -- Build MQTT topic from source address
  local topic = xAPAddrToTopic(source)

  -- If target is empty (and it usually is), fill it with the source address
  if target == nil then
    frame:setValue('xap-header', 'target', source)
  end

  -- Rebadge the message as coming from the gateway (this changes the
  -- source address to xap.defaultKeys.source); fromMqtt uses this fact
  -- to ignore messages from ourself.
  frame:setValue("xap-header", "source", xap.defaultKeys.source)

  -- Only publish (and only report a publish) when a connected client exists.
  if broker.c and broker.c.connected then
    -- Sends stripped out frame in name:value pair format
    broker.c:publish(topic, sendJsonValue(frame))
    print("Publish to topic: "..topic)
    -- print("Payload: "..tostring(frame))
  end
end

-- MQTT message callback: forward a payload received from the broker
-- onto the xAP network, and create an xAP filter the first time a
-- topic is seen.
-- @param topic   MQTT topic the message arrived on
-- @param payload message body, parsed as an xAP frame
function fromMqtt(topic, payload)
  local f = xap.Frame(payload)

  if f:getValue("xap-header", "class") == 'xAPBSC.event' then
    return -- Don't want events from MQTT
  end

  -- This happens as we subscribe to # so we get our own publish.
  if f:getValue("xap-header", "source") == xap.defaultKeys.source then
    -- print(xap.defaultKeys.source);
    return -- Message was from us to the MQTT
  end

  -- For new topics
  if subscribedTopics[topic] == nil then
    local source = f:getValue("xap-header", "source")
    if source == nil then
      source = f:getValue("xap-hbeat", "source")
    end
    if source == nil then
      -- Handle a bad xAP packet - missing source!
      return
    end

    print("Bind topic "..topic.." to "..source)
    -- NOTE(review): the filter matches the xAP target against the MQTT
    -- topic string, not the source address printed above - confirm
    -- this is intended.
    local flt = xap.Filter()
    flt:add("xap-header", "target", topic)
    flt:callback(toMqtt)
    subscribedTopics[topic] = true
  end

  -- Sending
  if f['xap-header'] then
    -- sendShort only for xap-header messages
    bumpHop(f, 'xap-header')
    xap.sendShort(tostring(f))
    -- print("shortSending: "..tostring(f));
  else
    -- expect the xap-hbeat to be fully formed
    bumpHop(f, 'xap-hbeat')
    xap.send(tostring(f))
    -- print("Sending: "..tostring(f));
  end
end

-- Create a fresh MQTT client in broker.c, connect it using
-- xap.defaultKeys.source as the client id and, on success, subscribe
-- to every topic.
function connect()
  print("Connecting...")
  broker.c = MQTT.client.create(broker.host, broker.port, fromMqtt)
  broker.c:connect(xap.defaultKeys.source)
  print("@connect: "..xap.defaultKeys.source);
  if broker.c.connected then
    print("Connected")
    -- As long as we don't have a lot of topics and high volumes we
    -- should be able to keep up with filtering and forwarding.
    -- I know subscribing to # is not recommended. :(
    broker.c:subscribe({"#"})
  end
end

-- Applet entry point: connect to the broker, install the outbound
-- event filter and start the timer that services the MQTT client.
function init()
  connect()

  -- Create a filter for all event msgs out.
  -- Tried to use multiple 'adds' but this didn't work, so just doing all
  -- NOTE(review): the source pattern says "dbzoo.hal" while other
  -- comments in this file mention "dbzoo.hah" - confirm the vendor id.
  local flt = xap.Filter()
  flt:add("xap-header", "source", "dbzoo.hal.*:>")
  flt:add("xap-header", "class", "xAPBSC.event")
  flt:callback(toMqtt) -- straight to MQTT out

  -- Pump the mqtt client every second.
  xap.Timer(function()
    -- If the mqtt server goes away recover.
    if broker.c and broker.c.connected then
      broker.c:handler()
    else
      connect()
    end
  end, 1):start(true) -- and fire now to get us going at 0sec.
end