
Python Script to Decode NRF in MQTT.
This is a simple script that decodes NRF messages arriving in MQTT on a single topic and writes the unencrypted data to a new topic.
In Meshtastic, two main hardware variants are in use: NRF-based and ESP32-based devices. Each has its own pluses and minuses. The most common devices, such as those from RAK and Seeed Studio, are NRF based. They are popular because they draw very little power, so with the proper power settings you can get a week or so on a single charge. They also tend to be production-quality devices rather than just development boards.
NRF devices, however, don't support sending to MQTT in JSON format. They only send in the encrypted protobuf format used by Meshtastic. This makes using them as a tracker inside Home Assistant or the TIG stack impossible unless you decode the data sent to MQTT. ESP32 devices can be configured to send in either format, encrypted or directly in JSON.
I have created a Python script that pulls the encrypted data from a topic, decrypts it, and pushes it to another, unencrypted topic. This is a work in progress, but it lets you see the data in the payload so that you can use it.
You will need to follow these instructions to set it up. I use Ubuntu Linux so all instructions are based on that operating system.
Create the Python Script
- Ensure Python is installed.
- Make a new directory to store your script in. The service file later in this guide assumes /scripts.
- cd to the script directory.
- Run nano meshtastic_nrftojson.py, enter the script below into it, and save it.
import json
import base64
import paho.mqtt.client as mqtt
from paho.mqtt.client import CallbackAPIVersion
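# Newer meshtastic releases keep the protobuf modules under meshtastic.protobuf
try:
    from meshtastic.protobuf import mqtt_pb2, mesh_pb2, telemetry_pb2
except ImportError:
    from meshtastic import mqtt_pb2, mesh_pb2, telemetry_pb2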
from google.protobuf.json_format import MessageToDict
from Crypto.Cipher import AES
from Crypto.Util import Counter
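# --- CONFIGURATION ---
MQTT_HOST = "10.10.10.10"
MQTT_PORT = 1883
MQTT_USER = "mqttuser"  # Change this to a valid MQTT user
MQTT_PASS = "newpassword"  # Change this to that user's MQTT password
MQTT_TOPIC = "msh/US/2/e/LongFast/!8a35dfcc"  # Change this to the encrypted topic
DESTINATION_TOPIC = "msh/US/2/json/nrfjson"  # Change this to the topic that should receive the unencrypted data
PSK_BASE64 = "AQ=="  # If your channel uses a different key, change this to match.
# "AQ==" is the one-byte shorthand for the default channel key; AES needs the full 16-byte key.
DEFAULT_KEY = base64.b64decode("1PG7OiApB1nwvP+rz05pAQ==")
CHANNEL_KEY = base64.b64decode(PSK_BASE64)
if len(CHANNEL_KEY) == 1:
    # A one-byte PSK of N selects the default key with its last byte raised by N - 1
    CHANNEL_KEY = DEFAULT_KEY[:-1] + bytes([(DEFAULT_KEY[-1] + CHANNEL_KEY[0] - 1) & 0xFF])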

def decrypt_packet(encrypted_bytes, key, from_id, packet_id):
    """AES-CTR decryption for Meshtastic packets."""
    # 16-byte nonce: packet ID as 8 little-endian bytes, sender node number as 4, then 4 zero bytes
    nonce = packet_id.to_bytes(8, 'little') + from_id.to_bytes(4, 'little') + b'\x00' * 4
    ctr = Counter.new(128, initial_value=int.from_bytes(nonce, 'big'), little_endian=False)
    cipher = AES.new(key, AES.MODE_CTR, counter=ctr)
    return cipher.decrypt(encrypted_bytes)

def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code == 0:
        print(f"Connected to {MQTT_HOST}. Bridging nRF52 data...")
        client.subscribe(MQTT_TOPIC)

def on_message(client, userdata, msg):
    try:
        # 1. Parse the Envelope
        envelope = mqtt_pb2.ServiceEnvelope()
        envelope.ParseFromString(msg.payload)
        packet = envelope.packet
        # The sender field is named "from", a Python reserved word, so read it with getattr
        from_id = getattr(packet, "from", 0)
        packet_id = getattr(packet, "id", 0)
        # 2. Decrypt if necessary
        if packet.encrypted:
            try:
                decrypted_data = decrypt_packet(packet.encrypted, CHANNEL_KEY, from_id, packet_id)
                packet.decoded.ParseFromString(decrypted_data)
            except Exception as e:
                # Decryption failed or key incorrect; report it instead of hiding it
                print(f"Decrypt failed for packet {packet_id}: {e}")
        # 3. Convert to a dictionary (always_print_fields_with_no_presence needs a recent protobuf release)
        packet_dict = MessageToDict(
            packet,
            preserving_proto_field_name=True,
            always_print_fields_with_no_presence=True
        )
        decoded_payload = packet_dict.get("decoded", {})
        portnum = decoded_payload.get("portnum")
        # --- DATA DECODING ---
        if portnum == "POSITION_APP":
            # Reparse binary for precision
            pos = mesh_pb2.Position()
            pos.ParseFromString(packet.decoded.payload)
            decoded_payload["latitude"] = pos.latitude_i / 1e7 if pos.latitude_i else None
            decoded_payload["longitude"] = pos.longitude_i / 1e7 if pos.longitude_i else None
            decoded_payload["altitude"] = pos.altitude if pos.altitude else None
        elif portnum == "NODEINFO_APP":
            # Reparse binary for User information
            user = mesh_pb2.User()
            user.ParseFromString(packet.decoded.payload)
            decoded_payload["long_name"] = user.long_name
            decoded_payload["short_name"] = user.short_name
            decoded_payload["hw_model"] = user.hw_model
            decoded_payload["node_id"] = user.id
        elif portnum == "TELEMETRY_APP":
            # Reparse binary for Telemetry metrics
            tel = telemetry_pb2.Telemetry()
            tel.ParseFromString(packet.decoded.payload)
            # 1. Device Metrics (Battery, Voltage, Utilization)
            if tel.HasField("device_metrics"):
                dm = tel.device_metrics
                decoded_payload["battery_level"] = dm.battery_level if dm.battery_level else None
                decoded_payload["voltage"] = dm.voltage if dm.voltage else None
                decoded_payload["channel_utilization"] = dm.channel_utilization
                decoded_payload["air_util_tx"] = dm.air_util_tx
            # 2. Environment Metrics (Temp, Humidity, Pressure)
            elif tel.HasField("environment_metrics"):
                em = tel.environment_metrics
                decoded_payload["temperature"] = em.temperature if em.temperature else None
                decoded_payload["relative_humidity"] = em.relative_humidity if em.relative_humidity else None
                decoded_payload["barometric_pressure"] = em.barometric_pressure if em.barometric_pressure else None
            # 3. Power Metrics (Current, Bus Voltage)
            elif tel.HasField("power_metrics"):
                pm = tel.power_metrics
                decoded_payload["ch1_voltage"] = pm.ch1_voltage
                decoded_payload["ch1_current"] = pm.ch1_current
        elif portnum == "TEXT_MESSAGE_APP":
            # MessageToDict base64-encodes the bytes payload, so decode it back to text
            raw_b64 = decoded_payload.get("payload", "")
            if raw_b64:
                decoded_payload["text"] = base64.b64decode(raw_b64).decode('utf-8', errors='ignore')
        # 4. Standardized Output
        output = {
            "from": from_id,
            "to": packet_dict.get("to", 0),
            "type": "packet",
            "payload": decoded_payload,
            "rssi": packet_dict.get("rx_rssi", 0),
            "snr": packet_dict.get("rx_snr", 0)
        }
        # --- PUBLISH TO DESTINATION TOPIC ---
        json_payload = json.dumps(output)
        client.publish(DESTINATION_TOPIC, payload=json_payload, qos=1)
        # Optional: Keep the print for debugging
        print(f"Published decoded packet from {from_id} to {DESTINATION_TOPIC}")
        print(json.dumps(output, indent=2))
    except Exception as e:
        print(f"Bridge Error: {e}")

def main():
    client = mqtt.Client(CallbackAPIVersion.VERSION2)
    if MQTT_USER:
        client.username_pw_set(MQTT_USER, MQTT_PASS)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(MQTT_HOST, MQTT_PORT, 60)
    client.loop_forever()

if __name__ == "__main__":
    main()
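To make the AES-CTR nonce easier to picture, here is a small standalone sketch that uses only the standard library. It assumes, as I understand the Meshtastic firmware, that the 16-byte nonce is the packet ID as 8 little-endian bytes, then the sender's node number as 4 little-endian bytes, then 4 zero bytes. The helper name build_nonce and the sample packet ID are made up for illustration.

```python
def build_nonce(packet_id: int, from_id: int) -> bytes:
    """Return the assumed 16-byte AES-CTR nonce for one packet (hypothetical helper)."""
    return (
        packet_id.to_bytes(8, "little")  # packet ID, zero-padded to 64 bits
        + from_id.to_bytes(4, "little")  # sender node number
        + b"\x00" * 4                    # remaining bytes left at zero
    )


# Sample values only: a made-up packet ID and the node number behind !8a35dfcc
nonce = build_nonce(0x12345678, 0x8A35DFCC)
print(len(nonce), nonce.hex())
```

Because the nonce depends on both the packet ID and the sender, a wrong value for either one produces garbage instead of a readable payload, even when the channel key is correct.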
Set up the environment
- Run apt update
- Run python3 -m venv mesh-venv
- Run source mesh-venv/bin/activate
- Run pip install paho-mqtt meshtastic pycryptodome (the script uses CallbackAPIVersion, so it needs paho-mqtt 2.0 or newer)
- You can now run the script inside the Python venv by typing python3 meshtastic_nrftojson.py and pressing Enter. It will wait for a message on the source topic you defined. Send one and check that it shows up in the destination topic. The script also prints the data it is sending, along with any errors, to the screen.
- You will want to run this as a service on your device so that it starts on boot.
- Run nano /etc/systemd/system/meshtastic-nrftojson.service, enter the following into the file, and save it. The paths assume the script and the mesh-venv directory are in /scripts, so adjust them if you used a different directory.
[Unit]
Description=Meshtastic nRF52 MQTT to JSON Bridge
After=network.target
[Service]
# Path to your virtual env's python and your script
ExecStart=/scripts/mesh-venv/bin/python3 -u /scripts/meshtastic_nrftojson.py
WorkingDirectory=/scripts
Restart=always
User=root
# Ensures logs are sent immediately to journalctl
Environment=PYTHONUNBUFFERED=1
[Install]
WantedBy=multi-user.target
- Run systemctl daemon-reload
- Run systemctl enable meshtastic-nrftojson.service
- Run systemctl start meshtastic-nrftojson.service
- To manage the service you can use these commands.
- service meshtastic-nrftojson start
- service meshtastic-nrftojson restart
- service meshtastic-nrftojson stop
- service meshtastic-nrftojson status
- This service will allow you to see the telemetry, GPS, and message data that is sent. It handles a single topic and was made for the primary channel, which carries all of the telemetry and GPS data. To monitor secondary channels, set it up again with a new .py file and an additional service.
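Once the bridge is publishing, anything that can read JSON from the destination topic can use the data. Here is a minimal sketch of picking a position out of one published message. It relies only on the fields the script above writes ("from", "payload", "portnum", "latitude", "longitude"). The helper names node_num_to_id and extract_position and the sample message are made up for illustration.

```python
import json


def node_num_to_id(node_num: int) -> str:
    """Format a numeric node number in the !hex style used in Meshtastic topics."""
    return f"!{node_num:08x}"


def extract_position(message: str):
    """Return (node_id, latitude, longitude) for a position packet, else None."""
    data = json.loads(message)
    payload = data.get("payload", {})
    if payload.get("portnum") != "POSITION_APP":
        return None
    lat, lon = payload.get("latitude"), payload.get("longitude")
    if lat is None or lon is None:
        return None  # the bridge publishes None when a coordinate is missing
    return node_num_to_id(data["from"]), lat, lon


# Made-up sample in the shape the bridge publishes
sample = json.dumps({
    "from": 0x8A35DFCC, "to": 0xFFFFFFFF, "type": "packet",
    "payload": {"portnum": "POSITION_APP", "latitude": 40.0, "longitude": -105.25},
    "rssi": -90, "snr": 6.5,
})
print(extract_position(sample))
```

The "from" value in the output is the numeric node number, so converting it to the !hex form makes it easier to match against the node IDs you see in topics and in the Meshtastic app.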
