Micropython: ESP-Now avec ESP32 – Recevez des données (plusieurs à un)

Micropython: ESP-Now avec ESP32 - Recevez des données (plusieurs à un)

Dans ce guide MicropyThon, nous vous montrerons comment configurer un ESP32 pour recevoir et afficher des données de plusieurs cartes ESP32 à l’aide du protocole de communication ESP-Now (configuration de plusieurs à un). Nous allons construire un exemple de projet où les cartes des expéditeurs transmettent des données de capteur au format JSON, et une seule carte de récepteur collecte les données et les affiche sur un affichage OLED. Le tutoriel utilisera deux cartes d’expéditeur à titre d’exemple, mais la configuration peut être facilement mise à l’échelle pour en inclure plus.

Micropython: ESP-Now avec ESP32 - Receive les données de plusieurs planches (plusieurs à un)

Utiliser Arduino IDE? Suivez ce tutoriel à la place: ESP-Now avec ESP32: recevez des données de plusieurs planches (plusieurs à un).

Prérequis – Firmware Micropython

Pour suivre ce didacticiel, vous avez besoin de micro-firmware Micropython installé sur vos cartes ESP32 ou ESP8266. Vous avez également besoin d’un IDE pour écrire et télécharger le code sur votre carte. Nous suggérons d’utiliser Thonny IDE:

Nouveau sur Micropython? Consultez notre ebook: Micropython Programming avec ESP32 et ESP8266 Ebook (2e édition)

Présentation ESP-NOW

ESP-Now est un protocole de communication sans fil développé par ESPRESSIF qui permet à plusieurs cartes ESP32 ou ESP8266 d’échanger de petites quantités de données sans utiliser Wi-Fi ou Bluetooth. ESP-Now ne nécessite pas de connexion Wi-Fi complète (bien que le contrôleur Wi-Fi doit être activé), ce qui le rend idéal pour les applications de faible puissance et de faible latence comme les réseaux de capteurs, les télécommandes ou l’échange de données entre les cartes.

ESP-NOW - logo ESP32

ESP-Now utilise un modèle de communication sans connexion, ce qui signifie que les périphériques peuvent envoyer et recevoir des données sans se connecter à un routeur ou configurer un point d’accès (contrairement à la communication HTTP entre les cartes). Il prend en charge unicast (envoyer des données à un appareil spécifique à l’aide de son adresse MAC) et diffuser (envoyer des données à tous les appareils à l’aide d’une adresse MAC diffusée).

Nouveau sur ESP-NOW? Lisez notre guide de démarrage: Micropython: ESP-Now avec ESP32 (démarrage).

Aperçu du projet

Ce tutoriel montre comment configurer une carte ESP32 pour recevoir des données de plusieurs cartes ESP32 via le protocole de communication ESP-Now (configuration de plusieurs à un) comme indiqué dans la figure suivante.

ESP-Now ESP32 Recevez des données de plusieurs planches (plusieurs à une configuration)
  • Un conseil d’administration ESP32 agit comme un récepteur;
  • Plusieurs cartes ESP32 agissent comme des expéditeurs. Dans cet exemple, nous utilisons deux cartes d’expéditeur ESP32. Vous pouvez ajouter plus de planches à votre configuration si nécessaire.
  • La carte réceptrice ESP32 reçoit les messages de tous les expéditeurs et identifie la carte a envoyé le message.
  • Par exemple, nous échangerons des valeurs de capteur BME280 entre les cartes. Vous pouvez modifier ce projet pour utiliser tout autre capteur ou échanger d’autres données entre les cartes.

Pièces requises

Pour suivre le projet dans ce tutoriel, vous aurez besoin des parties suivantes:

Vous pouvez utiliser les liens précédents ou aller directement à makeradvisor.com/tools pour trouver toutes les pièces de vos projets au meilleur prix!

1758844807 849 Micropython ESP Now avec ESP32 Recevez des donnees plusieurs a

ESP32: obtenir une adresse MAC à la carte

Pour communiquer via ESP-Now, vous devez connaître l’adresse MAC de vos conseils. Pour obtenir l’adresse MAC de votre carte, vous pouvez copier le code suivant sur Thonny IDE et l’exécuter sur votre carte.

# Rui Santos & Sara Santos - Raspberryme.com
# Complete project details at https://Raspberryme.com/micropython-esp-now-esp32/
 
import network

wlan = network.WLAN(network.STA_IF)
wlan.active(True)

# Get MAC address (returns bytes)
mac = wlan.config('mac')

# Convert to human-readable format
mac_address=":".join('%02x' % b for b in mac)

print("MAC Address:", mac_address)

Afficher le code brut

Après avoir exécuté le code, il devrait imprimer l’adresse MAC de la carte sur le shell.

Thonny IDE - Obtenez l'adresse MAC de la carte ESP32 - Micropython

Obtenez l’adresse MAC pour tous vos conseils.

Par exemple, dans mon cas, je comprends:

  • Board des expéditeurs 1: 24: 0a: C4: 31: 40: 50
  • Board de l’expéditeur 2: 30: AE: A4: F6: 7D: 4C
  • Conseil du récepteur: 30: AE: A4: 07: 0D: 64

Préparer les tableaux des expéditeurs

Pour ce tutoriel, nous enverrons des données à une carte réceptrice ESP32 (via ESP-Now) à partir de deux cartes différentes. Vous pouvez modifier ce projet pour envoyer des données à partir de plus de cartes.

Chaque carte sera identifiée par un ID (un nombre que nous attribuerons à chaque carte:

  • Id = 1 pour la carte1
  • Id = 2 pour la carte2
Deux planches ESP32 avec un capteur BME280

Câblage du capteur BME280

Chaque carte des expéditeurs enverra des données environnementales à partir d’un capteur BME280. Tirez un capteur BME280 à chacune de vos planches. Nous utiliserons les broches I2C par défaut ESP32.

En savoir plus sur I2C avec la communication ESP32: ESP32 I2C: Définissez des épingles, plusieurs interfaces de bus et périphériques (Arduino IDE).

ESP32 BME280 Température Température Humidité Pression de câblage Circuit

Vous ne connaissez pas le BME280 avec l’ESP32? Lisez ce tutoriel: Micropython: BME280 avec ESP32 (pression, température, humidité).

Si vous utilisez un modèle de carte différent, les broches I2C par défaut peuvent être différentes.

Importation de la bibliothèque BME280

La bibliothèque pour s’interfacer avec le capteur BME280 ne fait pas partie de la bibliothèque Micropython standard. Ainsi, vous devez télécharger le fichier de bibliothèque dans chacun de vos cartes d’expéditeur.

Module BME280.py Micropython

1. Copiez le code suivant dans un nouveau fichier sur Thonny IDE.

from machine import I2C
import time

# BME280 default address.
BME280_I2CADDR = 0x76

# Operating Modes
BME280_OSAMPLE_1 = 1
BME280_OSAMPLE_2 = 2
BME280_OSAMPLE_4 = 3
BME280_OSAMPLE_8 = 4
BME280_OSAMPLE_16 = 5

# BME280 Registers

BME280_REGISTER_DIG_T1 = 0x88  # Trimming parameter registers
BME280_REGISTER_DIG_T2 = 0x8A
BME280_REGISTER_DIG_T3 = 0x8C

BME280_REGISTER_DIG_P1 = 0x8E
BME280_REGISTER_DIG_P2 = 0x90
BME280_REGISTER_DIG_P3 = 0x92
BME280_REGISTER_DIG_P4 = 0x94
BME280_REGISTER_DIG_P5 = 0x96
BME280_REGISTER_DIG_P6 = 0x98
BME280_REGISTER_DIG_P7 = 0x9A
BME280_REGISTER_DIG_P8 = 0x9C
BME280_REGISTER_DIG_P9 = 0x9E

BME280_REGISTER_DIG_H1 = 0xA1
BME280_REGISTER_DIG_H2 = 0xE1
BME280_REGISTER_DIG_H3 = 0xE3
BME280_REGISTER_DIG_H4 = 0xE4
BME280_REGISTER_DIG_H5 = 0xE5
BME280_REGISTER_DIG_H6 = 0xE6
BME280_REGISTER_DIG_H7 = 0xE7

BME280_REGISTER_CHIPID = 0xD0
BME280_REGISTER_VERSION = 0xD1
BME280_REGISTER_SOFTRESET = 0xE0

BME280_REGISTER_CONTROL_HUM = 0xF2
BME280_REGISTER_CONTROL = 0xF4
BME280_REGISTER_CONFIG = 0xF5
BME280_REGISTER_PRESSURE_DATA = 0xF7
BME280_REGISTER_TEMP_DATA = 0xFA
BME280_REGISTER_HUMIDITY_DATA = 0xFD


class Device:
  """Class for communicating with an I2C device.

  Allows reading and writing 8-bit, 16-bit, and byte array values to
  registers on the device."""

  def __init__(self, address, i2c):
    """Create an instance of the I2C device at the specified address using
    the specified I2C interface object."""
    self._address = address
    self._i2c = i2c

  def writeRaw8(self, value):
    """Write an 8-bit value on the bus (without register)."""
    value = value & 0xFF
    self._i2c.writeto(self._address, value)

  def write8(self, register, value):
    """Write an 8-bit value to the specified register."""
    b=bytearray(1)
    b[0]=value & 0xFF
    self._i2c.writeto_mem(self._address, register, b)

  def write16(self, register, value):
    """Write a 16-bit value to the specified register."""
    value = value & 0xFFFF
    b=bytearray(2)
    b[0]= value & 0xFF
    b[1]= (value>>8) & 0xFF
    self.i2c.writeto_mem(self._address, register, value)

  def readRaw8(self):
    """Read an 8-bit value on the bus (without register)."""
    return int.from_bytes(self._i2c.readfrom(self._address, 1),'little') & 0xFF

  def readU8(self, register):
    """Read an unsigned byte from the specified register."""
    return int.from_bytes(
        self._i2c.readfrom_mem(self._address, register, 1),'little') & 0xFF

  def readS8(self, register):
    """Read a signed byte from the specified register."""
    result = self.readU8(register)
    if result > 127:
      result -= 256
    return result

  def readU16(self, register, little_endian=True):
    """Read an unsigned 16-bit value from the specified register, with the
    specified endianness (default little endian, or least significant byte
    first)."""
    result = int.from_bytes(
        self._i2c.readfrom_mem(self._address, register, 2),'little') & 0xFFFF
    if not little_endian:
      result = ((result << 8) & 0xFF00) + (result >> 8)
    return result

  def readS16(self, register, little_endian=True):
    """Read a signed 16-bit value from the specified register, with the
    specified endianness (default little endian, or least significant byte
    first)."""
    result = self.readU16(register, little_endian)
    if result > 32767:
      result -= 65536
    return result

  def readU16LE(self, register):
    """Read an unsigned 16-bit value from the specified register, in little
    endian byte order."""
    return self.readU16(register, little_endian=True)

  def readU16BE(self, register):
    """Read an unsigned 16-bit value from the specified register, in big
    endian byte order."""
    return self.readU16(register, little_endian=False)

  def readS16LE(self, register):
    """Read a signed 16-bit value from the specified register, in little
    endian byte order."""
    return self.readS16(register, little_endian=True)

  def readS16BE(self, register):
    """Read a signed 16-bit value from the specified register, in big
    endian byte order."""
    return self.readS16(register, little_endian=False)


class BME280:
  def __init__(self, mode=BME280_OSAMPLE_1, address=BME280_I2CADDR, i2c=None,
               **kwargs):
    # Check that mode is valid.
    if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4,
                    BME280_OSAMPLE_8, BME280_OSAMPLE_16]:
        raise ValueError(
            'Unexpected mode value {0}. Set mode to one of '
            'BME280_ULTRALOWPOWER, BME280_STANDARD, BME280_HIGHRES, or '
            'BME280_ULTRAHIGHRES'.format(mode))
    self._mode = mode
    # Create I2C device.
    if i2c is None:
      raise ValueError('An I2C object is required.')
    self._device = Device(address, i2c)
    # Load calibration values.
    self._load_calibration()
    self._device.write8(BME280_REGISTER_CONTROL, 0x3F)
    self.t_fine = 0

  def _load_calibration(self):

    self.dig_T1 = self._device.readU16LE(BME280_REGISTER_DIG_T1)
    self.dig_T2 = self._device.readS16LE(BME280_REGISTER_DIG_T2)
    self.dig_T3 = self._device.readS16LE(BME280_REGISTER_DIG_T3)

    self.dig_P1 = self._device.readU16LE(BME280_REGISTER_DIG_P1)
    self.dig_P2 = self._device.readS16LE(BME280_REGISTER_DIG_P2)
    self.dig_P3 = self._device.readS16LE(BME280_REGISTER_DIG_P3)
    self.dig_P4 = self._device.readS16LE(BME280_REGISTER_DIG_P4)
    self.dig_P5 = self._device.readS16LE(BME280_REGISTER_DIG_P5)
    self.dig_P6 = self._device.readS16LE(BME280_REGISTER_DIG_P6)
    self.dig_P7 = self._device.readS16LE(BME280_REGISTER_DIG_P7)
    self.dig_P8 = self._device.readS16LE(BME280_REGISTER_DIG_P8)
    self.dig_P9 = self._device.readS16LE(BME280_REGISTER_DIG_P9)

    self.dig_H1 = self._device.readU8(BME280_REGISTER_DIG_H1)
    self.dig_H2 = self._device.readS16LE(BME280_REGISTER_DIG_H2)
    self.dig_H3 = self._device.readU8(BME280_REGISTER_DIG_H3)
    self.dig_H6 = self._device.readS8(BME280_REGISTER_DIG_H7)

    h4 = self._device.readS8(BME280_REGISTER_DIG_H4)
    h4 = (h4 << 24) >> 20
    self.dig_H4 = h4 | (self._device.readU8(BME280_REGISTER_DIG_H5) & 0x0F)

    h5 = self._device.readS8(BME280_REGISTER_DIG_H6)
    h5 = (h5 << 24) >> 20
    self.dig_H5 = h5 | (
        self._device.readU8(BME280_REGISTER_DIG_H5) >> 4 & 0x0F)

  def read_raw_temp(self):
    """Reads the raw (uncompensated) temperature from the sensor."""
    meas = self._mode
    self._device.write8(BME280_REGISTER_CONTROL_HUM, meas)
    meas = self._mode << 5 | self._mode << 2 | 1
    self._device.write8(BME280_REGISTER_CONTROL, meas)
    sleep_time = 1250 + 2300 * (1 << self._mode)

    sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
    sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
    time.sleep_us(sleep_time)  # Wait the required time
    msb = self._device.readU8(BME280_REGISTER_TEMP_DATA)
    lsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 1)
    xlsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 2)
    raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
    return raw

  def read_raw_pressure(self):
    """Reads the raw (uncompensated) pressure level from the sensor."""
    """Assumes that the temperature has already been read """
    """i.e. that enough delay has been provided"""
    msb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA)
    lsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 1)
    xlsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 2)
    raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
    return raw

  def read_raw_humidity(self):
    """Assumes that the temperature has already been read """
    """i.e. that enough delay has been provided"""
    msb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA)
    lsb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA + 1)
    raw = (msb << 8) | lsb
    return raw

  def read_temperature(self):
    """Get the compensated temperature in 0.01 of a degree celsius."""
    adc = self.read_raw_temp()
    var1 = ((adc >> 3) - (self.dig_T1 << 1)) * (self.dig_T2 >> 11)
    var2 = ((
        (((adc >> 4) - self.dig_T1) * ((adc >> 4) - self.dig_T1)) >> 12) *
        self.dig_T3) >> 14
    self.t_fine = var1 + var2
    return (self.t_fine * 5 + 128) >> 8

  def read_pressure(self):
    """Gets the compensated pressure in Pascals."""
    adc = self.read_raw_pressure()
    var1 = self.t_fine - 128000
    var2 = var1 * var1 * self.dig_P6
    var2 = var2 + ((var1 * self.dig_P5) << 17)
    var2 = var2 + (self.dig_P4 << 35)
    var1 = (((var1 * var1 * self.dig_P3) >> 8) +
            ((var1 * self.dig_P2) >> 12))
    var1 = (((1 << 47) + var1) * self.dig_P1) >> 33
    if var1 == 0:
      return 0
    p = 1048576 - adc
    p = (((p << 31) - var2) * 3125) // var1
    var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) >> 25
    var2 = (self.dig_P8 * p) >> 19
    return ((p + var1 + var2) >> 8) + (self.dig_P7 << 4)

  def read_humidity(self):
    adc = self.read_raw_humidity()
    # print 'Raw humidity = {0:d}'.format (adc)
    h = self.t_fine - 76800
    h = (((((adc << 14) - (self.dig_H4 << 20) - (self.dig_H5 * h)) +
         16384) >> 15) * (((((((h * self.dig_H6) >> 10) * (((h *
                          self.dig_H3) >> 11) + 32768)) >> 10) + 2097152) *
                          self.dig_H2 + 8192) >> 14))
    h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.dig_H1) >> 4)
    h = 0 if h < 0 else h
    h = 419430400 if h > 419430400 else h
    return h >> 12

  @property
  def temperature(self):
    "Return the temperature in degrees."
    t = self.read_temperature()
    ti = t // 100
    td = t - ti * 100
    return "{}.{:02d}C".format(ti, td)

  @property
  def pressure(self):
    "Return the temperature in hPa."
    p = self.read_pressure() // 256
    pi = p // 100
    pd = p - pi * 100
    return "{}.{:02d}hPa".format(pi, pd)

  @property
  def humidity(self):
    "Return the humidity in percent."
    h = self.read_humidity()
    hi = h // 1024
    hd = h * 100 // 1024 - hi * 100
    return "{}.{:02d}%".format(hi, hd)

Afficher le code brut

2. Allez dans le fichier> Enregistrer sous…

Thonny IDE ESP32 ESP8266 Micropython Enregistrer la bibliothèque de fichiers à l'appareil Enregistrer sous

3. Sélectionnez Enregistrer sur «Micropython Device»:

Thonny IDE ESP32 ESP8266 Micropython Enregistrer la bibliothèque de fichiers à Sélection de l'appareil

4. Nommez votre fichier comme bme280.py et appuyez sur le bouton OK:

Bibliothèque BME280 Nouveau fichier micropython thonny ide

Et c’est tout. La bibliothèque a été téléchargée sur votre carte.

Répétez ce processus pour tous vos tableaux d’expéditeurs.

ESP32 ESP-NOW SEPHERS – Script micropython

Le code suivant lit les données du capteur BME280 et les envoie via ESP-Now dans une variable JSON à la carte réceptrice ESP32. N’oubliez pas de modifier le code avec l’adresse MAC de votre carte de récepteur. De plus, n’oubliez pas de modifier l’ID de la carte pour chacun de vos conseils.

Dans ce code, nous utilisons le module Aioespnow, qui nous permet d’utiliser ESP-Now de manière asynchrone. Pour en savoir plus sur la programmation asynchrone avec Micropython, lisez le tutoriel suivant: Micropython: ESP32 / ESP8266 Programmation asynchrone – Exécutez plusieurs tâches.

# Rui Santos & Sara Santos - Raspberryme.com
# Complete project details at https://Raspberryme.com/micropython-esp-now-esp32-many-to-one/

import network
import aioespnow
import asyncio
import time
import ujson
from machine import Pin, I2C
import BME280

# Board ID
BOARD_ID = 1

# Receiver's MAC address
peer_mac = b'\xff\xff\xff\xff\xff\xff'

# Interval for sending data (in seconds)
send_interval = 10 

# Initialize I2C and BME280
try:
    i2c = I2C(0, scl=Pin(22), sda=Pin(21))  # Adjust pins as needed
    bme = BME280.BME280(i2c=i2c, address=0x76)
except OSError as err:
    print("Failed to initialize BME280:", err)
    raise

# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
try:
    sta.active(True)
    sta.config(channel=1)  # Set channel explicitly
    sta.disconnect()
except OSError as err:
    print("Failed to initialize Wi-Fi:", err)
    raise

# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
    e.active(True)
except OSError as err:
    print("Failed to initialize AIOESPNow:", err)
    raise

# Add peer
try:
    e.add_peer(peer_mac)
except OSError as err:
    print("Failed to add peer:", err)
    raise

# Counter for reading ID
reading_id = 0

def read_temperature():
    try:
        return float(bme.temperature[:-1])  # Remove 'C' from string
    except Exception as err:
        print("Error reading temperature:", err)
        return 0.0

def read_humidity():
    try:
        return float(bme.humidity[:-1])  # Remove '%' from string
    except Exception as err:
        print("Error reading humidity:", err)
        return 0.0

def prepare_sensor_data():
    global reading_id
    data = {
        'id': BOARD_ID,
        'temp': read_temperature(),
        'hum': read_humidity(),
        'readingId': reading_id
    }
    reading_id += 1
    # Serialize to JSON and encode to bytes
    return ujson.dumps(data).encode('utf-8')

async def send_messages(e, peer):
    while True:
        try:
            # Prepare and serialize sensor data
            message = prepare_sensor_data()
            # Send JSON bytes
            if await e.asend(peer, message, sync=True):
                print(f"Sent data: {message.decode('utf-8')}")
            else:
                print("Failed to send data")
        except OSError as err:
            print("Send error:", err)
            await asyncio.sleep(5)
        await asyncio.sleep(send_interval)  # Wait before next send

async def main(e, peer):
    try:
        await send_messages(e, peer)
    except Exception as err:
        print(f"Error in main: {err}")
        await asyncio.sleep(5)
        raise

# Run the async program
try:
    asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
    print("Stopping sender...")
    e.active(False)
    sta.active(False)

Afficher le code brut

Comment fonctionne le code?

Jetons un coup d’œil sur le fonctionnement du code de la carte de l’expéditeur. Alternativement, vous pouvez passer à la section suivante.

Importation de bibliothèques

Commencez par importer les bibliothèques requises pour utiliser ESP-Now, le capteur BME280 et gérer les variables JSON.

import network
import aioespnow
import asyncio
import time
import ujson
from machine import Pin, I2C
import BME280

Définition de l’ID de la carte

Donnez une pièce d’identité unique à votre planche afin que le récepteur puisse facilement l’identifier. Ici, nous numérotions simplement les planches, mais vous pouvez utiliser une méthode différente, comme leur donner un nom, ou simplement les identifier par l’adresse MAC (dans ce cas, vous n’avez pas besoin de ce paramètre).

# Board ID
BOARD_ID = 2

Adresse MAC du récepteur

Insérez l’adresse MAC du récepteur. Par exemple, dans mon cas, l’adresse MAC de la carte réceptrice est de 30: AE: A4: 07: 0D: 64. Donc, je dois ajouter de l’OT au code en octets comme ceci:

# Receiver's MAC address
peer_mac = b'\x30\xae\xa4\x07\x0d\x64'

Intervalle d’envoi

Nous enverrons de nouvelles lectures via ESP-Now toutes les 10 secondes. Vous pouvez ajuster cette période sur la variable Send_interval.

# Interval for sending data (in seconds)
send_interval = 10 

Initialiser le capteur BME280

Initialisez une communication I2C sur GPIOS 22 et 21. Ajustez si vous utilisez différentes broches. Nous initialisons également le capteur BME280.

# Initialize I2C and BME280
try:
    i2c = I2C(0, scl=Pin(22), sda=Pin(21))  # Adjust pins as needed
    bme = BME280.BME280(i2c=i2c, address=0x76)
except OSError as err:
    print("Failed to initialize BME280:", err)
    raise

Initialiser l’interface Wi-Fi et ESP-Now

Pour utiliser ESP-Now, nous devons activer l’interface Wi-Fi. Dans les lignes suivantes, nous initialisons le protocole de communication Wi-Fi et ESP-Now à l’aide du module AIOSPNow.

# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
try:
    sta.active(True)
    sta.config(channel=1)  # Set channel explicitly
    sta.disconnect()
except OSError as err:
    print("Failed to initialize Wi-Fi:", err)
    raise

# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
    e.active(True)
except OSError as err:
    print("Failed to initialize AIOESPNow:", err)
    raise

Ajouter des pairs

Dans la ligne suivante, nous ajoutons la carte récepteur en tant que pair. La fonction E.Add_peer (Peer_MAC) est nécessaire pour une communication Unicast fiable dans Aioespnow. Il enregistre l’adresse MAC du récepteur pour s’assurer que l’expéditeur peut envoyer des messages à cette carte spécifique.

# Add peer
try:
    e.add_peer(peer_mac)
except OSError as err:
    print("Failed to add peer:", err)
    raise

Obtenir des données de capteur BME280

Les fonctions suivantes renvoient la température et l’humidité du capteur BME280.

def read_temperature():
    try:
        return float(bme.temperature[:-1])  # Remove 'C' from string
    except Exception as err:
        print("Error reading temperature:", err)
        return 0.0

def read_humidity():
    try:
        return float(bme.humidity[:-1])  # Remove '%' from string
    except Exception as err:
        print("Error reading humidity:", err)
        return 0.0

Les fonctions de la bibliothèque BME280 renvoient les résultats avec le caractère de l’unité, c’est pourquoi nous supprimons les caractères à la fin des lectures.

En savoir plus sur l’interfaçage du BME280 avec l’ESP32 en utilisant Micropython: Micropython: BME280 avec ESP32 (pression, température, humidité).

Ajout des données à une variable JSON

Nous créons une fonction appelée préparent_sensor_data () qui renverra une variable JSON contenant les données que nous souhaitons envoyer. Dans ce cas, nous envoyons l’ID du conseil d’administration, la température, l’humidité et l’ID de lecture (juste un nombre pour suivre le nombre de lectures prises depuis le début du programme).

def prepare_sensor_data():
    global reading_id
    data = {
        'id': BOARD_ID,
        'temp': read_temperature(),
        'hum': read_humidity(),
        'readingId': reading_id
    }
    reading_id += 1
    # Serialize to JSON and encode to bytes
    return ujson.dumps(data).encode('utf-8')

Envoi de messages via ESP-Now

Ensuite, nous créons une fonction asynchrone pour envoyer les messages ESP-Now de manière asynchrone.

async def send_messages(e, peer):
    while True:
        try:
            # Prepare and serialize sensor data
            message = prepare_sensor_data()
            # Send JSON bytes
            if await e.asend(peer, message, sync=True):
                print(f"Sent data: {message.decode('utf-8')}")
            else:
                print("Failed to send data")
        except OSError as err:
            print("Send error:", err)
            await asyncio.sleep(5)
        await asyncio.sleep(send_interval)  # Wait before next send

Exécuter les tâches asynchrones

Ensuite, la fonction suivante exécute la tâche Send_Messages pour envoyer des données via ESP-Now.

async def main(e, peer):
    try:
        await send_messages(e, peer)
    except Exception as err:
        print(f"Error in main: {err}")
        await asyncio.sleep(5)
        raise

Enfin, nous commençons le programme asynchrone.

# Run the async program
try:
    asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
    print("Stopping sender...")
    e.active(False)
    sta.active(False)

Exécuter et télécharger le code sur vos planches

Après avoir copié le code sur Thonny IDE et effectué les modifications requises, testez le code en l’exécutant sur Thonny IDE.

Après avoir établi une connexion avec votre carte, vous pouvez cliquer sur le bouton Green Exécuter.

Bouton de run green thonny ide

Vous devriez obtenir quelque chose de similaire, comme indiqué dans l’image ci-dessous, dans votre coque micropython.

ESP32 Sender ESP-NOW - Échec de l'envoi

Pour le moment, la livraison échouera car nous n’avons pas encore défini la carte récepteur.

Téléchargement du code sur vos planches

Remarque importante: le simple fait d’exécuter le fichier avec Thonny ne le copie pas en permanence au système de fichiers de la carte. Cela signifie que si vous le débranchez à partir de votre ordinateur et appliquez de l’alimentation à la carte, rien ne se passera car il n’a pas de fichier Python enregistré sur son système de fichiers. La fonction Thonny IDE Run est utile pour tester le code, mais si vous souhaitez le télécharger en permanence sur votre carte, vous devez créer et enregistrer un fichier sur le système de fichiers de la carte.

Pour exécuter le code sur vos planches sans être connecté à votre ordinateur, vous devez le télécharger sur le système de fichiers de la carte avec le nom main.py.

Allez dans Fichier> Enregistrer sous … Appareil micropython.

Thonny IDE ESP32 ESP8266 Micropython Enregistrer la bibliothèque de fichiers à Sélection de l'appareil

Appelez ce fichier main.py et enregistrez-le sur la carte.

Téléchargement du fichier MicropyThon principal sur la carte ESP32 à l'aide de Thonny IDE

Maintenant, si vous redémarrez votre carte, il commencera à exécuter le code. Vous ne pourrez peut-être pas accéder à la coque Micropython par la suite.

Préparation de la carte réceptrice

La carte réceptrice ESP32 recevra les données des différentes cartes de l’expéditeur et l’affichera sur l’écran OLED. Dans cet exemple, nous recevons des données de deux conseils différents

ESP-NOW avec Micropython ESP32: Configuration avec un récepteur et deux cartes d'expéditeur

Câblage de l’écran OLED

Tirez l’écran OLED à votre carte ESP32. Nous utilisons les broches I2C par défaut – GPIO 22 (SCL) et GPIO21 (SDA). Ajustez si vous utilisez un modèle ESP32 avec un épingle différent.

ESP32 avec diagramme de câblage d'affichage OLED

Importation de la bibliothèque SSD1306

La bibliothèque pour s’interfacer avec le capteur OLED ne fait pas partie de la bibliothèque Micropython standard. Vous devez donc télécharger le fichier de bibliothèque sur votre carte récepteur.

Module SSD1306.PY Micropython

1. Copiez le code suivant dans un nouveau fichier sur Thonny IDE.

# MicroPython SSD1306 OLED driver, I2C and SPI interfaces created by Adafruit

import time
import framebuf

# register definitions
SET_CONTRAST        = const(0x81)
SET_ENTIRE_ON       = const(0xa4)
SET_NORM_INV        = const(0xa6)
SET_DISP            = const(0xae)
SET_MEM_ADDR        = const(0x20)
SET_COL_ADDR        = const(0x21)
SET_PAGE_ADDR       = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP       = const(0xa0)
SET_MUX_RATIO       = const(0xa8)
SET_COM_OUT_DIR     = const(0xc0)
SET_DISP_OFFSET     = const(0xd3)
SET_COM_PIN_CFG     = const(0xda)
SET_DISP_CLK_DIV    = const(0xd5)
SET_PRECHARGE       = const(0xd9)
SET_VCOM_DESEL      = const(0xdb)
SET_CHARGE_PUMP     = const(0x8d)


class SSD1306:
    def __init__(self, width, height, external_vcc):
        self.width = width
        self.height = height
        self.external_vcc = external_vcc
        self.pages = self.height // 8
        # Note the subclass must initialize self.framebuf to a framebuffer.
        # This is necessary because the underlying data buffer is different
        # between I2C and SPI implementations (I2C needs an extra byte).
        self.poweron()
        self.init_display()

    def init_display(self):
        for cmd in (
            SET_DISP | 0x00, # off
            # address setting
            SET_MEM_ADDR, 0x00, # horizontal
            # resolution and layout
            SET_DISP_START_LINE | 0x00,
            SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
            SET_MUX_RATIO, self.height - 1,
            SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
            SET_DISP_OFFSET, 0x00,
            SET_COM_PIN_CFG, 0x02 if self.height == 32 else 0x12,
            # timing and driving scheme
            SET_DISP_CLK_DIV, 0x80,
            SET_PRECHARGE, 0x22 if self.external_vcc else 0xf1,
            SET_VCOM_DESEL, 0x30, # 0.83*Vcc
            # display
            SET_CONTRAST, 0xff, # maximum
            SET_ENTIRE_ON, # output follows RAM contents
            SET_NORM_INV, # not inverted
            # charge pump
            SET_CHARGE_PUMP, 0x10 if self.external_vcc else 0x14,
            SET_DISP | 0x01): # on
            self.write_cmd(cmd)
        self.fill(0)
        self.show()

    def poweroff(self):
        self.write_cmd(SET_DISP | 0x00)

    def contrast(self, contrast):
        self.write_cmd(SET_CONTRAST)
        self.write_cmd(contrast)

    def invert(self, invert):
        self.write_cmd(SET_NORM_INV | (invert & 1))

    def show(self):
        x0 = 0
        x1 = self.width - 1
        if self.width == 64:
            # displays with width of 64 pixels are shifted by 32
            x0 += 32
            x1 += 32
        self.write_cmd(SET_COL_ADDR)
        self.write_cmd(x0)
        self.write_cmd(x1)
        self.write_cmd(SET_PAGE_ADDR)
        self.write_cmd(0)
        self.write_cmd(self.pages - 1)
        self.write_framebuf()

    def fill(self, col):
        self.framebuf.fill(col)

    def pixel(self, x, y, col):
        self.framebuf.pixel(x, y, col)

    def scroll(self, dx, dy):
        self.framebuf.scroll(dx, dy)

    def text(self, string, x, y, col=1):
        self.framebuf.text(string, x, y, col)


class SSD1306_I2C(SSD1306):
    def __init__(self, width, height, i2c, addr=0x3c, external_vcc=False):
        self.i2c = i2c
        self.addr = addr
        self.temp = bytearray(2)
        # Add an extra byte to the data buffer to hold an I2C data/command byte
        # to use hardware-compatible I2C transactions.  A memoryview of the
        # buffer is used to mask this byte from the framebuffer operations
        # (without a major memory hit as memoryview doesn't copy to a separate
        # buffer).
        self.buffer = bytearray(((height // 8) * width) + 1)
        self.buffer[0] = 0x40  # Set first byte of data buffer to Co=0, D/C=1
        self.framebuf = framebuf.FrameBuffer1(memoryview(self.buffer)[1:], width, height)
        super().__init__(width, height, external_vcc)

    def write_cmd(self, cmd):
        self.temp[0] = 0x80 # Co=1, D/C#=0
        self.temp[1] = cmd
        self.i2c.writeto(self.addr, self.temp)

    def write_framebuf(self):
        # Blast out the frame buffer using a single I2C transaction to support
        # hardware I2C interfaces.
        self.i2c.writeto(self.addr, self.buffer)

    def poweron(self):
        pass


class SSD1306_SPI(SSD1306):
    def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
        self.rate = 10 * 1024 * 1024
        dc.init(dc.OUT, value=0)
        res.init(res.OUT, value=0)
        cs.init(cs.OUT, value=1)
        self.spi = spi
        self.dc = dc
        self.res = res
        self.cs = cs
        self.buffer = bytearray((height // 8) * width)
        self.framebuf = framebuf.FrameBuffer1(self.buffer, width, height)
        super().__init__(width, height, external_vcc)

    def write_cmd(self, cmd):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs.high()
        self.dc.low()
        self.cs.low()
        self.spi.write(bytearray([cmd]))
        self.cs.high()

    def write_framebuf(self):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs.high()
        self.dc.high()
        self.cs.low()
        self.spi.write(self.buffer)
        self.cs.high()

    def poweron(self):
        self.res.high()
        time.sleep_ms(1)
        self.res.low()
        time.sleep_ms(10)
        self.res.high()

Afficher le code brut

2. Allez dans le fichier> Enregistrer AS et sélectionner le périphérique Micropython.

Thonny IDE ESP32 ESP8266 Micropython Enregistrer la bibliothèque de fichiers à Sélection de l'appareil

3. Nommez le fichier ssd1306.py et cliquez sur OK pour enregistrer le fichier sur le système de fichiers ESP.

Et c’est tout. La bibliothèque a été téléchargée sur votre carte. Maintenant, vous pouvez utiliser les fonctionnalités de la bibliothèque dans votre code en important la bibliothèque.

ESP-Now Receiver Board – Script micropython

Le code suivant configurera votre carte ESP32 en tant que récepteur ESP-Now pour obtenir des données des deux autres cartes des expéditeurs. Après avoir reçu les données, il sera affiché sur l’écran OLED.

N’oubliez pas de modifier le code pour ajouter les adresses MAC de vos cartes de l’expéditeur.

# Rui Santos & Sara Santos - Raspberryme.com
# Complete project details at https://Raspberryme.com/micropython-esp-now-esp32-many-to-one/

import network
import aioespnow
import asyncio
import ujson
from machine import Pin, I2C
import ssd1306

# Initialize I2C for OLED
try:
    i2c = I2C(0, scl=Pin(22), sda=Pin(21))  # Adjust pins as needed
    display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)
    print("SSD1306 initialized")
except Exception as err:
    print("Failed to initialize SSD1306:", err)
    raise

# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
try:
    sta.active(True)
    sta.config(channel=1)  # Set channel explicitly
    sta.disconnect()
except OSError as err:
    print("Failed to initialize Wi-Fi:", err)
    raise

# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
    e.active(True)
except OSError as err:
    print("Failed to initialize AIOESPNow:", err)
    raise

# Sender's MAC addresses (replace with actual sender MACs)
sender_mac_1 = b'\x24\x0a\xc4\x31\x40\x50'  # First sender's MAC (Board ID=1)
sender_mac_2 = b'\x30\xae\xa4\xf6\x7d\x4c'  # Second sender's MAC (Board ID=2)

# Add peers
try:
    e.add_peer(sender_mac_1)
except OSError as err:
    print(f"Failed to add peer {sender_mac_1.hex()}:", err)
    raise

try:
    e.add_peer(sender_mac_2)
except OSError as err:
    print(f"Failed to add peer {sender_mac_2.hex()}:", err)
    raise

# Dictionary to store latest readings for each board
board_readings = {
    1: {'temp': 0.0, 'hum': 0.0, 'readingId': 0},
    2: {'temp': 0.0, 'hum': 0.0, 'readingId': 0}
}

# Update OLED display with temperature and humidity for both boards on separate lines.
def update_display():
    try:
        display.fill(0)
        # Board 1 data
        display.text("Board 1:", 0, 0)
        display.text(f"Temp: {board_readings[1]['temp']:.1f} C", 0, 10)
        display.text(f"Hum: {board_readings[1]['hum']:.1f} %", 0, 20)
        # Board 2 data
        display.text("Board 2:", 0, 32)
        display.text(f"Temp: {board_readings[2]['temp']:.1f} C", 0, 42)
        display.text(f"Hum: {board_readings[2]['hum']:.1f} %", 0, 52)
        display.show()
        print("Display updated")
    except Exception as err:
        print("Error updating display:", err)

# Async function to receive and process messages.
async def receive_messages(e):
    print("Listening for ESP-NOW messages...")
    while True:
        try:
            async for mac, msg in e:
                try:
                    # Decode and parse JSON message
                    json_str = msg.decode('utf-8')
                    data = ujson.loads(json_str)
                    
                    # Extract parameters
                    board_id = data['id']
                    temperature = data['temp']
                    humidity = data['hum']
                    reading_id = data['readingId']
                    
                    # Store in board_readings dictionary
                    if board_id in (1, 2):
                        board_readings[board_id] = {
                            'temp': temperature,
                            'hum': humidity,
                            'readingId': reading_id
                        }
                        # Update OLED display
                        update_display()
                    
                    # Display on MicroPython terminal
                    print(f"\nReceived from {mac.hex()}:")
                    print(f"  Board ID: {board_id}")
                    print(f"  Temperature: {temperature} C")
                    print(f"  Humidity: {humidity} %")
                    print(f"  Reading ID: {reading_id}")
                except (ValueError, KeyError) as err:
                    print(f"Error parsing JSON: {err}")
        except OSError as err:
            print("Receive error:", err)
            await asyncio.sleep(5)

async def main(e):
    await receive_messages(e)

# Run the async program
try:
    asyncio.run(main(e))
except KeyboardInterrupt:
    print("Stopping receiver...")
    e.active(False)
    sta.active(False)

Afficher le code brut

Comment fonctionne le code?

Jetons un coup d’œil sur le fonctionnement du code de la carte de l’expéditeur. Alternativement, vous pouvez passer à la section suivante.

Importation de bibliothèques

Commencez par importer les bibliothèques requises (assurez-vous de télécharger le fichier ssd1306.py précédemment pour ajouter la prise en charge de l’interfaçage avec l’écran).

import network
import aioespnow
import asyncio
import ujson
from machine import Pin, I2C
import ssd1306

Initialisation de l’écran OLED

Initialisez la communication I2C et l’affichage OLED avec les lignes suivantes.

# Initialize I2C for OLED
try:
    i2c = I2C(0, scl=Pin(22), sda=Pin(21))  # Adjust pins as needed
    display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)
    print("SSD1306 initialized")
except Exception as err:
    print("Failed to initialize SSD1306:", err)
    raise

Initialisation de l’interface WiFi et ESP-Now

Initialisez l’interface WiFi et ESP-Now.

# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
try:
    sta.active(True)
    sta.config(channel=1)  # Set channel explicitly
    sta.disconnect()
except OSError as err:
    print("Failed to initialize Wi-Fi:", err)
    raise

# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
    e.active(True)
except OSError as err:
    print("Failed to initialize AIOESPNow:", err)
    raise

Ajouter les tableaux des expéditeurs en tant que pairs

Ajoutez les cartes des expéditeurs en tant que pairs (il n’est pas obligatoire d’ajouter les cartes de l’expéditeur en tant que pairs du côté récepteur, mais il garantit plus de fiabilité dans la communication).

Insérez les adresses MAC des cartes de l’expéditeur dans les variables suivantes (assurez-vous d’insérer l’adresse MAC au format des octets).

# Sender's MAC addresses (replace with actual sender MACs)
sender_mac_1 = b'\x24\x0a\xc4\x31\x40\x50'  # First sender's MAC (Board ID=1)
sender_mac_2 = b'\x30\xae\xa4\xf6\x7d\x4c'  # Second sender's MAC (Board ID=2)

Ensuite, ajoutez-les en tant que pairs ESP-Now:

# Add peers
try:
    e.add_peer(sender_mac_1)
except OSError as err:
    print(f"Failed to add peer {sender_mac_1.hex()}:", err)
    raise

try:
    e.add_peer(sender_mac_2)
except OSError as err:
    print(f"Failed to add peer {sender_mac_2.hex()}:", err)
    raise

Dictionnaire pour sauver les lectures

Créez un dictionnaire pour stocker les dernières lectures de capteurs reçues de chaque planche. Enregistrer ces informations dans un dictionnaire est un excellent moyen de sauvegarder toutes les données reçues dans une seule variable.

board_readings = {
    1: {'temp': 0.0, 'hum': 0.0, 'readingId': 0},
    2: {'temp': 0.0, 'hum': 0.0, 'readingId': 0}
}

Mise à jour de l’affichage

La fonction update_display () obtient les données reçues de chaque carte, qui est déjà stockée dans la variable Board_Readings, et l’affiche sur l’écran OLED.

def update_display():
    try:
        display.fill(0)
        # Board 1 data
        display.text("Board 1:", 0, 0)
        display.text(f"Temp: {board_readings[1]['temp']:.1f} C", 0, 10)
        display.text(f"Hum: {board_readings[1]['hum']:.1f} %", 0, 20)
        # Board 2 data
        display.text("Board 2:", 0, 32)
        display.text(f"Temp: {board_readings[2]['temp']:.1f} C", 0, 42)
        display.text(f"Hum: {board_readings[2]['hum']:.1f} %", 0, 52)
        display.show()
        print("Display updated")
    except Exception as err:
        print("Error updating display:", err)

Pour en savoir plus sur l’interfaçage de l’OLED avec l’ESP32 à l’aide de MicropyThon, vous pouvez vérifier notre tutoriel: Micropython: Affichage OLED avec ESP32 et ESP8266.

Recevoir des messages ESP-now

Nous créons une fonction asynchrone pour obtenir et gérer les messages reçus via ESP-Now.

async def receive_messages(e):
    print("Listening for ESP-NOW messages...")
    while True:
        try:
            async for mac, msg in e:

Les données sont disponibles au format JSON. Nous décodons et analysons d’abord le message JSON.

try:
    # Decode and parse JSON message
    json_str = msg.decode('utf-8')
    data = ujson.loads(json_str)

Nous extrassons chacun des paramètres en variables individuelles.

# Extract parameters into individual variables
board_id = data['id']
temperature = data['temp']
humidity = data['hum']
reading_id = data['readingId']

Enfin, nous enregistrons les données au bon endroit dans notre dictionnaire Board_Readings en fonction de l’ID du conseil d’administration.

# Store in board_readings dictionary
if board_id in (1, 2):
    board_readings[board_id] = {
    'temp': temperature,
    'hum': humidity,
    'readingId': reading_id
}

Après cela, la variable Board_Readings est mise à jour, nous pouvons donc maintenant appeler la fonction Updated_Display () pour mettre à jour l’affichage avec les informations actuelles.

update_display()

Enfin, nous imprimons les données reçues dans le shell Micropython.

# Display on serial monitor
print(f"\nReceived from {mac.hex()}:")
print(f"  Board ID: {board_id}")
print(f"  Temperature: {temperature} C")
print(f"  Humidity: {humidity} %")
print(f"  Reading ID: {reading_id}")

Exécuter les tâches asynchrones

Ensuite, la fonction suivante exécute la tâche reçoit_messages pour envoyer des données via ESP-Now.

async def main(e):
    await receive_messages(e)

Enfin, nous commençons le programme asynchrone.

# Run the async program
try:
    asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
    print("Stopping sender...")
    e.active(False)
    sta.active(False)

Démonstration

Téléchargez et / ou exécutez le code précédent sur votre carte récepteur. Si vous êtes connecté au terminal Thonny IDE, il imprimera les données reçues.

Dans le même temps, il imprimera les lectures reçues de chacune des cartes de l’expéditeur sur l’écran OLED.

Récepteur ESP-Now affichant les données de plusieurs planches ESP32 sur l'écran OLED

Emballage

Dans ce tutoriel, vous avez appris à envoyer des données de plusieurs expéditeurs ESP32 à une carte réceptrice ESP32 via ESP-Now à l’aide de Micropython. Nous vous avons également montré comment envoyer plusieurs variables de données au format JSON et comment les analyser et les gérer pour obtenir les informations individuelles que nous voulons.

Le projet de ce didacticiel peut être facilement adapté pour ajouter plus de cartes d’expéditeur et envoyer toutes les autres informations que vous souhaitez.

Nous espérons que vous avez trouvé ce guide utile. Nous avons plus de guides ESP-Now avec Micropython que vous pouvez trouver utile:

Enfin, si vous voulez en savoir plus sur Micropython, n’oubliez pas de consulter nos ressources:

Plongez dans l’histoire de Raspberry Pi avec cette vidéo :

YouTube video