Wie ich ein intelligentes Aquarium gemacht habe (Backend)

Bild



Prolog



Als Programmierer in einem der großen und erfolgreichen Unternehmen in Moskau habe ich nie aufgehört, meine Programmierkenntnisse zu verbessern, und verschiedene Kurse auf der Udemy-Plattform besucht.

Natürlich war es langweilig, nur den Kurs zu sehen und alles zu wiederholen, nachdem der Autor es getan hatte, und es gab Momente, die ich aufgrund meiner damaligen Inkompetenz nicht verstand. Sie müssen Ihre Projekte basierend auf dem erstellen, was der Autor des Kurses gibt - ich dachte, und natürlich hatte ich Recht. Nur echte Schwierigkeiten und ihre Lösung geben Ihnen unschätzbare Erfahrungen, dies ist echtes Lernen.



web , . Full-stack , , , . JavaScript Express, Appolo GraphQL ( , - REST ), Apollo GQL Vue.



, , , , . , , , " ", "The Facebook", "Instagram" .. , , .



, , , ? ...



, 15 3 . , " , , , ". . "" — , , , .



, . - , . - . .



, " " — 20-50 , "" . , (, , , ), , RGB . , . . . . . . , , .



Arduino. , , .





NodeMCU — "" WI-FI (ESP 8266), , Arduino nano wi-fi , .. NodeMCU, .





Arduino IDE ( Windows 10, ), WI-FI. , , . , , Lua( , Micro python, )



, , , C++ Arduino JS Python(). :



init.lua



print ( "Waiting ...")
tmr.register (0, 5000, tmr.ALARM_SINGLE, function (t) tmr.unregister (0); print ( "Starting ..."); dofile ( "main.lua") end)
tmr.start (0)


main.lua



--WiFi Settup
wifi.setmode(wifi.STATION)
local cfg={}
cfg.ssid="wifi_point_name"
cfg.pwd="point_pass"
wifi.sta.config(cfg)
cfg = nil
collectgarbage()

=wifi.ap.getip()


WI-FI , , begin end. esp-8266 , . , Micro python, , lua .



esptool



pip install -g esptool

esptool.py --port COM3 --baud 460800 write_flash --flash_size=detect 0 esp8266-20191220-v1.12.bin


IDE EsPy "" .



, , :



  • boot.py — , . , Wi-Fi WebREPL;
  • main.py — , boot.py, .


WebREPLssh raspberri, , EsPy esptool, . — WI-FI — .



boot.py



import network
import time

'''    WiFi '''
wlan_id = "my_point"
wlan_pass = "strong_pass"

wlan = network.WLAN(network.STA_IF)
wlan.active(True)

if wlan.isconnected() == False:
    wlan.connect(wlan_id, wlan_pass)
    while wlan.isconnected() == False:
        time.sleep(3)
        print("Connection Fail...")
print('Device IP:', wlan.ifconfig()[0])


main.py , , asyncio, ( ).



main.py



from app import app_start

app_start()


MicroPyServer ( ), - ( ). .



app.py



from micropyserver import MicroPyServer

import ujson

server = MicroPyServer()

def send(self, **kwargs):
        '''
             
        '''
        server.send(
            ujson.dumps(kwargs),
            content_type="Content-Type: application/json",
            #    CORS 
            extra_headers=["Access-Control-Allow-Origin: *"]
        )
        gc.collect()

def healthcheck():
    send(success=1, healthcheck='green')

server.add_route("/healthcheck", healthcheck)

def app_start():
    server.start()


() , , . .



  • main.py
  • boot.py
  • app.py
  • micropyserver.py


, http://192.168.1.70/ . , http://192.168.1.70/healthcheck, — ,





// Json from chrome browser
{
    "success": 1,
    "healthcheck": "green",
}


hardware ( , LED-, LCD- .. ), , , . , , .



app.py



from heater import Heater

# ...  

class HttpHelper:
    """
          GET 
    """
    def __init__(self):
        pass

    def parse(self, request):
        '''
               
        '''
        lines = request.split("\r\n")

        result = {
            'lines': lines,
            'method': ure.search("^([A-Z]+)", lines[0]).group(1),
            'path': ure.search(
                "^[A-Z]+\\s+(/[-a-zA-Z0-9_.]*)", lines[0]
            ).group(1),
        }

        param_split = ure.sub("\/([a-z]+_?)+?\?", '', lines[0].split(" ")[1])
        result['params'] = self.get_params(param_split.split("&"))

        return result

    def get_params(self, params_as_array):
        '''
               query 
        '''
        params = {}

        for element in params_as_array:
            splited = element.split("=")

            params[splited[0]] = splited[1]

        return params

    def send(self, **kwargs):
        '''
             
        '''
        server.send(
            ujson.dumps(kwargs),
            content_type="Content-Type: application/json",
            extra_headers=["Access-Control-Allow-Origin: *"]
        )
        gc.collect()

# ...  

def get_water_temperature_C(request):
    '''
           
    '''
    try:
        water_heater.get_water_tmp_C()

        http_helper.send(
            success=1,
            water_temperature_c=water_heater.water_tmp
        )

    except Exception as e:
        print(e)
        http_helper.send(success=0, error=e)

# ...  

server.add_route("/healthcheck", healthcheck)
server.add_route("/get_water_tmp", get_water_temperature_C)

# ...  


heater.py



import machine
import onewire
import ds18x20

HEATER_PIN = 2

class Heater:
    def __init__(self):
        self.water_tmp = 0.00
        self.heater_pin = machine.Pin(HEATER_PIN)
        self.sensor = ds18x20.DS18X20(onewire.OneWire(self.heater_pin))

    def get_water_tmp_C(self):
        rows = self.sensor.scan()

        self.sensor.convert_temp()

        for rom in rows:
            self.water_tmp = self.sensor.read_temp(rom)

        return self.water_tmp


LED . , . 5 :





  • 2
  • 1
  • 1
  • 1


32 , (24V) . , , " ", , , . , . -, , 100% . , .



, API.



LCD , IP , ip . frontend , .







IOT , . MicroPyServer, . , , , try except, .



micropyserver.py



# ...  

def _get_request(self):
        """ Return request body """
        #   4096 ,     MemoryError
        return str(self._connect.recv(1024), "utf8")


:



  • Automatisieren Sie die Arbeit mit Beleuchtung, so dass er selbst die Beleuchtung im Raum überprüft + die Wirkung von Sonnenuntergang und Morgengrauen und bei Nacht - Mondlicht erzeugt.
  • Wasseranalyse automatisieren
  • Automatisieren Sie die Warmwasserbereitung
  • Durchflusspumpe automatisieren
  • Auto Feeder
  • Autofill



All Articles