2017年8月22日 星期二

uPyCraft-micropython教程--讀取Mifare RFID RC522


參考資料
https://github.com/wendlers/micropython-mfrc522


--------------------------讀取 RC522- RFID Card ----------------------------
# read_RFID.py
# Program 1: launcher that runs the card-reading demo defined in read.py.
# Pin assignment used for this setup (same order as the MFRC522 constructor
# arguments sck, mosi, miso, rst, cs):
# Signal   ESP8266 GPIO   WiPy GPIO   Note
# sck      0              "GP14"
# mosi     2              "GP16"
# miso     4              "GP15"
# rst      5              "GP22"
# cs       14             "GP17"      Labeled SDA or NSS on most RFID-RC522 boards

import read
read.do_read()

#======================================================
#read.py
# 程式2
import mfrc522
from os import uname


def do_read():
    """Poll the reader forever and print block 8 of each card presented.

    The reader is created with board-specific pins chosen from
    ``os.uname()[0]``. For every detected card the tag type and the first
    four UID bytes are printed, the card is selected, block 8 is
    authenticated with key A (six 0xFF bytes) and its content printed.
    The loop ends when KeyboardInterrupt is raised.

    Raises:
        RuntimeError: the board is neither 'WiPy' nor 'esp8266'.
    """
    board = uname()[0]
    if board == 'WiPy':
        rdr = mfrc522.MFRC522("GP14", "GP16", "GP15", "GP22", "GP17")
    elif board == 'esp8266':
        rdr = mfrc522.MFRC522(0, 2, 4, 5, 14)
    else:
        raise RuntimeError("Unsupported platform")

    for banner in ("", "Place card before reader to read from address 0x08", ""):
        print(banner)

    try:
        while True:
            (stat, tag_type) = rdr.request(rdr.REQIDL)
            if stat != rdr.OK:
                # No card answered the request; poll again.
                continue

            (stat, raw_uid) = rdr.anticoll()
            if stat != rdr.OK:
                continue

            print("New card detected")
            print("  - tag type: 0x%02x" % tag_type)
            print("  - uid : 0x%02x%02x%02x%02x" % tuple(raw_uid[:4]))
            print("")

            if rdr.select_tag(raw_uid) != rdr.OK:
                print("Failed to select tag")
                continue

            key = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]
            if rdr.auth(rdr.AUTHENT1A, 8, key, raw_uid) != rdr.OK:
                print("Authentication error")
                continue

            print("Address 8 data: %s" % rdr.read(8))
            rdr.stop_crypto1()

    except KeyboardInterrupt:
        print("Bye")



#======================================================
# 驅動程式
#mfrc522.py
from machine import Pin, SPI
from os import uname


class MFRC522:
    """Driver for an MFRC522 RFID reader attached over SPI.

    Card-level methods report one of the status codes ``OK``, ``NOTAGERR``
    or ``ERR``. Register names mentioned in comments follow the NXP MFRC522
    datasheet.
    """

    # Status codes returned by the card-level methods.
    OK = 0
    NOTAGERR = 1
    ERR = 2

    # Values accepted as ``mode`` by request().
    REQIDL = 0x26
    REQALL = 0x52
    # Values accepted as ``mode`` by auth().
    AUTHENT1A = 0x60
    AUTHENT1B = 0x61

    def __init__(self, sck, mosi, miso, rst, cs):
        """Set up the pins and the SPI bus, then reset and configure the chip.

        Args:
            sck, mosi, miso, rst, cs: pin identifiers handed to ``machine.Pin``.

        Raises:
            RuntimeError: ``os.uname()[0]`` is not WiPy, LoPy, FiPy or esp8266.
        """
        self.sck = Pin(sck, Pin.OUT)
        self.mosi = Pin(mosi, Pin.OUT)
        self.miso = Pin(miso)
        self.rst = Pin(rst, Pin.OUT)
        self.cs = Pin(cs, Pin.OUT)

        # Hold the chip in reset and deselect it while the bus is configured.
        self.rst.value(0)
        self.cs.value(1)

        board = uname()[0]

        if board in ('WiPy', 'LoPy', 'FiPy'):
            self.spi = SPI(0)
            self.spi.init(SPI.MASTER, baudrate=1000000, pins=(self.sck, self.mosi, self.miso))
        elif board == 'esp8266':
            self.spi = SPI(baudrate=100000, polarity=0, phase=0, sck=self.sck, mosi=self.mosi, miso=self.miso)
            self.spi.init()
        else:
            raise RuntimeError("Unsupported platform")

        self.rst.value(1)
        self.init()

    def _wreg(self, reg, val):
        """Write the low 8 bits of *val* to register *reg*."""
        self.cs.value(0)
        # Address byte: register number shifted left by one, top bit clear.
        self.spi.write(bytes([(reg << 1) & 0x7e]))
        self.spi.write(bytes([val & 0xff]))
        self.cs.value(1)

    def _rreg(self, reg):
        """Return the current value (an int) of register *reg*."""
        self.cs.value(0)
        # Same address layout as _wreg, with the top bit set to request a read.
        self.spi.write(bytes([((reg << 1) & 0x7e) | 0x80]))
        val = self.spi.read(1)
        self.cs.value(1)

        return val[0]

    def _sflags(self, reg, mask):
        """Set the bits of *mask* in register *reg* (read-modify-write)."""
        self._wreg(reg, self._rreg(reg) | mask)

    def _cflags(self, reg, mask):
        """Clear the bits of *mask* in register *reg* (read-modify-write)."""
        self._wreg(reg, self._rreg(reg) & (~mask))

    def _tocard(self, cmd, send):
        """Run chip command *cmd* with the bytes of *send* and collect the reply.

        Args:
            cmd: 0x0E (authenticate) or 0x0C (transceive).
            send: iterable of ints loaded into the FIFO before the command starts.

        Returns:
            ``(stat, recv, bits)``: status code, list of received bytes (only
            filled for a successful transceive) and the number of valid
            received bits.
        """
        recv = []
        bits = irq_en = wait_irq = 0
        stat = self.ERR

        if cmd == 0x0E:
            irq_en = 0x12
            wait_irq = 0x10
        elif cmd == 0x0C:
            irq_en = 0x77
            wait_irq = 0x30

        self._wreg(0x02, irq_en | 0x80)   # ComIEnReg: enable the wanted IRQs
        self._cflags(0x04, 0x80)          # ComIrqReg: clear pending IRQ bits
        self._sflags(0x0A, 0x80)          # FIFOLevelReg: flush the FIFO
        self._wreg(0x01, 0x00)            # CommandReg: cancel any running command

        for c in send:
            self._wreg(0x09, c)
        self._wreg(0x01, cmd)

        if cmd == 0x0C:
            self._sflags(0x0D, 0x80)      # BitFramingReg: start transmission

        # Poll ComIrqReg until the awaited IRQ fires, the chip timer expires
        # (bit 0) or the poll budget runs out. The previous condition used
        # bitwise `~` as a logical not, so the timer bit was never honoured
        # and a missing card cost all 2000 polls.
        i = 2000
        while True:
            n = self._rreg(0x04)
            i -= 1
            if i == 0 or (n & 0x01) or (n & wait_irq):
                break

        self._cflags(0x0D, 0x80)

        if i:
            if (self._rreg(0x06) & 0x1B) == 0x00:
                stat = self.OK

                # Timer expiry counts as "no tag" when the timer IRQ is enabled
                # for this command, or when the awaited IRQ never arrived (so an
                # unanswered authentication is not reported as OK).
                timed_out = (n & 0x01) and ((irq_en & 0x01) or not (n & wait_irq))
                if timed_out:
                    stat = self.NOTAGERR
                elif cmd == 0x0C:
                    count = self._rreg(0x0A)          # bytes waiting in the FIFO
                    lbits = self._rreg(0x0C) & 0x07   # valid bits in the last byte
                    if lbits != 0:
                        bits = (count - 1) * 8 + lbits
                    else:
                        bits = count * 8

                    # Always read at least one and at most 16 bytes.
                    if count == 0:
                        count = 1
                    elif count > 16:
                        count = 16

                    for _ in range(count):
                        recv.append(self._rreg(0x09))
            else:
                stat = self.ERR

        return stat, recv, bits

    def _crc(self, data):
        """Return the chip-computed CRC of *data* as a two-item list."""
        self._cflags(0x05, 0x04)
        self._sflags(0x0A, 0x80)

        for c in data:
            self._wreg(0x09, c)

        self._wreg(0x01, 0x03)

        # Wait (at most 255 polls) for bit 0x04 of register 0x05 to be set.
        i = 0xFF
        while True:
            n = self._rreg(0x05)
            i -= 1
            if i == 0 or (n & 0x04):
                break

        return [self._rreg(0x22), self._rreg(0x21)]

    def init(self):
        """Reset the chip, program its configuration registers, enable the antenna."""
        self.reset()
        self._wreg(0x2A, 0x8D)
        self._wreg(0x2B, 0x3E)
        self._wreg(0x2D, 30)
        self._wreg(0x2C, 0)
        self._wreg(0x15, 0x40)
        self._wreg(0x11, 0x3D)
        self.antenna_on()

    def reset(self):
        """Write command 0x0F to the command register (0x01)."""
        self._wreg(0x01, 0x0F)

    def antenna_on(self, on=True):
        """Set (``on=True``) or clear (``on=False``) bits 0x03 of register 0x14.

        The original condition ``on and ~(reg & 0x03)`` was always true when
        *on* was true, so this is the behaviour the code always had.
        """
        if on:
            self._sflags(0x14, 0x03)
        else:
            self._cflags(0x14, 0x03)

    def request(self, mode):
        """Send request *mode* (REQIDL or REQALL) and look for a card.

        Returns:
            ``(stat, bits)``; *stat* is OK only when exactly 16 bits came back.
        """
        self._wreg(0x0D, 0x07)
        (stat, recv, bits) = self._tocard(0x0C, [mode])

        if stat != self.OK or bits != 0x10:
            stat = self.ERR

        return stat, bits

    def anticoll(self):
        """Run anti-collision and return ``(stat, recv)``.

        *stat* is OK only when five bytes were received and the XOR of the
        first four equals the fifth.
        """
        ser_chk = 0
        ser = [0x93, 0x20]

        self._wreg(0x0D, 0x00)
        (stat, recv, bits) = self._tocard(0x0C, ser)

        if stat == self.OK:
            if len(recv) == 5:
                for i in range(4):
                    ser_chk = ser_chk ^ recv[i]
                if ser_chk != recv[4]:
                    stat = self.ERR
            else:
                stat = self.ERR

        return stat, recv

    def select_tag(self, ser):
        """Select the card whose anticoll() bytes are *ser*; return OK or ERR."""
        buf = [0x93, 0x70] + ser[:5]
        buf += self._crc(buf)
        (stat, recv, bits) = self._tocard(0x0C, buf)
        return self.OK if (stat == self.OK) and (bits == 0x18) else self.ERR

    def auth(self, mode, addr, sect, ser):
        """Authenticate block *addr* with key *sect* and return the status code.

        Args:
            mode: AUTHENT1A or AUTHENT1B.
            addr: block address.
            sect: key as a list of ints.
            ser: card UID list; its first four items are used.
        """
        return self._tocard(0x0E, [mode, addr] + sect + ser[:4])[0]

    def stop_crypto1(self):
        """Clear bit 0x08 of register 0x08."""
        self._cflags(0x08, 0x08)

    def read(self, addr):
        """Read block *addr*; return the received byte list, or None on failure."""
        data = [0x30, addr]
        data += self._crc(data)
        (stat, recv, _) = self._tocard(0x0C, data)
        return recv if stat == self.OK else None

    def _acked(self, stat, recv, bits):
        """Return True when a transfer ended OK with a 4-bit 0x0A reply."""
        return (stat == self.OK and bits == 4 and bool(recv)
                and (recv[0] & 0x0F) == 0x0A)

    def write(self, addr, data):
        """Write the first 16 items of *data* to block *addr*.

        Returns:
            OK on success, ERR when either the write command or the data
            transfer is not acknowledged.
        """
        buf = [0xA0, addr]
        buf += self._crc(buf)
        (stat, recv, bits) = self._tocard(0x0C, buf)
        if not self._acked(stat, recv, bits):
            return self.ERR

        buf = [data[i] for i in range(16)]
        buf += self._crc(buf)
        (stat, recv, bits) = self._tocard(0x0C, buf)
        if not self._acked(stat, recv, bits):
            return self.ERR

        return stat

#======================================================
# Write program 1
# write_RFID.py: launcher that runs the card-writing demo defined in write.py.
# Pin assignment used for this setup (same order as the MFRC522 constructor
# arguments sck, mosi, miso, rst, cs):
# Signal   ESP8266 GPIO   WiPy GPIO   Note
# sck      0              "GP14"
# mosi     2              "GP16"
# miso     4              "GP15"
# rst      5              "GP22"
# cs       14             "GP17"      Labeled SDA on most RFID-RC522 boards

import write
write.do_write()


#======================================================
# 寫入程式2 
#write.py
import mfrc522
from os import uname


def do_write():
    """Poll the reader forever and write 16 bytes to block 8 of each card.

    The reader is created with board-specific pins chosen from
    ``os.uname()[0]``. For every detected card the tag type and the first
    four UID bytes are printed, the card is selected, block 8 is
    authenticated with key A (six 0xFF bytes) and the bytes 0x00..0x0f are
    written to it. The loop ends when KeyboardInterrupt is raised.

    Raises:
        RuntimeError: the board is neither 'WiPy' nor 'esp8266'.
    """
    board = uname()[0]
    if board == 'WiPy':
        rdr = mfrc522.MFRC522("GP14", "GP16", "GP15", "GP22", "GP17")
    elif board == 'esp8266':
        rdr = mfrc522.MFRC522(0, 2, 4, 5, 14)
    else:
        raise RuntimeError("Unsupported platform")

    for banner in ("", "Place card before reader to write address 0x08", ""):
        print(banner)

    try:
        while True:
            (stat, tag_type) = rdr.request(rdr.REQIDL)
            if stat != rdr.OK:
                # No card answered the request; poll again.
                continue

            (stat, raw_uid) = rdr.anticoll()
            if stat != rdr.OK:
                continue

            print("New card detected")
            print("  - tag type: 0x%02x" % tag_type)
            print("  - uid : 0x%02x%02x%02x%02x" % tuple(raw_uid[:4]))
            print("")

            if rdr.select_tag(raw_uid) != rdr.OK:
                print("Failed to select tag")
                continue

            key = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]
            if rdr.auth(rdr.AUTHENT1A, 8, key, raw_uid) != rdr.OK:
                print("Authentication error")
                continue

            stat = rdr.write(8, b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f")
            rdr.stop_crypto1()
            if stat == rdr.OK:
                print("Data written to card")
            else:
                print("Failed to write data to card")

    except KeyboardInterrupt:
        print("Bye")



#======================================================
# 程式3
# 驅動程式 ---重複



#======================================================
# 執行結果
Place card before reader to read from address 0x08
New card detected
  - tag type: 0x10
  - uid    : 0x7021ed10

Address 8 data: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
New card detected
  - tag type: 0x10
  - uid    : 0x4451bb96


Ready to download this file,please wait!
..........
download ok
exec(open('write.py').read(),globals())
>>>
>>>
Ready to download this file,please wait!
...
download ok
exec(open('write_RFID.py').read(),globals())

Place card before reader to write address 0x08

New card detected
  - tag type: 0x10
  - uid    : 0x4451bb96

Data written to card
New card detected
  - tag type: 0x10
  - uid    : 0x4451bb96

Data written to card
New card detected
  - tag type: 0x10
  - uid    : 0x4451bb96

Data written to card
New card detected
  - tag type: 0x10
  - uid    : 0x4451bb96

Data written to card
Bye
>>>
>>>
Ready to download this file,please wait!
.........
download ok
exec(open('read.py').read(),globals())
>>>
>>>
Place card before reader to read from address 0x08

New card detected
  - tag type: 0x10
  - uid    : 0x4451bb96

Address 8 data: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]



2017年8月21日 星期一

uPyCraft-micropython教程--網頁控制板載LED燈

uPyCraft-micropython教程--網頁控制板載LED



參考了一些網路資源js代碼,做了一個網頁控制PWM-LED燈的演示。

實驗準備:
1. uPyCraft
軟體,目前最新的是V0.20版本
2. FireBeetle-ESP32
主機板(在8266上也測試通過了,但是感覺是記憶體原因,快速通信容易掛)
3.
路由器(請將你的電腦和Nodemcu連到同一個網段)











#===============WebLED.py
from machine import Pin,PWM
import network
import os
import time
import socket
import gc

SSID="PTS-2F"   # Wi-Fi network name handed to connectWifi() by the startup code
PASSWORD=""     # Wi-Fi password handed to connectWifi() by the startup code
wlan=None       # network.WLAN station object; assigned by connectWifi()
s=None          # listening socket; assigned by ajaxWebserv()
led=None        # PWM object for Pin(2); assigned by the startup code

def connectWifi(ssid,passwd):
  """Join the Wi-Fi network *ssid* and block until an IP address is assigned.

  The station interface is stored in the module-level ``wlan``. The
  function polls once per second and returns True as soon as the first
  ``ifconfig()`` field is no longer '0.0.0.0'; there is no timeout.
  """
  global wlan
  station = network.WLAN(network.STA_IF)
  wlan = station
  station.active(True)
  station.disconnect()
  station.connect(ssid, passwd)
  while True:
    if station.ifconfig()[0] != '0.0.0.0':
      return True
    time.sleep(1)
def ajaxWebserv():
  """Serve the control page on port 80 and apply brightness requests forever.

  Requests whose text contains ``Val=<n>`` set the PWM duty of the
  module-level ``led`` to ``n * 100`` and echo the applied value; any other
  request is answered with the content of 'webCtrl.htm'. The listening
  socket is stored in the module-level ``s``.

  Compared with the original loop, a malformed ``Val`` no longer raises
  (it is ignored), the value is clamped to the 0..10 range offered by the
  page's inputs, and the client connection is closed even when handling
  fails.
  """
  global s,led
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  s.bind((wlan.ifconfig()[0], 80))
  s.listen(1)
  while True:
    conn, addr = s.accept()
    try:
      request = str(conn.recv(1024))
      conn.sendall('HTTP/1.1 200 OK\nConnection: close\nServer: FireBeetle\nContent-Type: text/html\n\n')

      ib = request.find('Val=')
      if ib > 0:
        ie = request.find(' ', ib)
        if ie < 0:
          # No terminating blank: take everything after 'Val='.
          ie = len(request)
        try:
          level = int(request[ib+4:ie])
        except ValueError:
          # Untrusted input: skip the LED update instead of crashing.
          level = None
        if level is not None:
          level = min(max(level, 0), 10)
          Val = str(level)
          print("Val =", Val)
          led.duty(level*100)
          conn.send(Val)
      else:
        with open('webCtrl.htm', 'r') as html:
          conn.sendall(html.read())
      conn.sendall('\r\n')
    finally:
      conn.close()
# Start the PWM LED, join Wi-Fi and run the web server. On any exit
# (including Ctrl-C) release whatever was actually created.
try:

  led=PWM(Pin(2),freq=100)
  led.init()
  led.duty(0)
  connectWifi(SSID, PASSWORD)
  ajaxWebserv()
except BaseException as exc:
  # Report why the server stopped instead of failing silently.
  print("WebLED stopped:", repr(exc))
  if (s):
    s.close()
  # led / wlan are still None when the failure happened before they were
  # created; guard so the cleanup itself cannot raise AttributeError.
  if led is not None:
    led.deinit()
  if wlan is not None:
    wlan.disconnect()
    wlan.active(False)


#======================webCtrl.htm===================

<html> 
<head> 
<title>ajaxWebCtrl</title> 
  <link rel="icon" href="data:;base64,=">
  <style type="text/css" id="style">
 .dimwrap { border: 2px solid red; height: 150px; width: 600px;}
 .dimSlide { transform: scaleX(2) rotate(0deg); width: 256px;position: relative; top: 80px; left: 90px;}
 .dimVal { width: 90px; height:30;position: relative; top: 20px; left: 20px}
  </style>
<script language="javascript">
// Send the changed control's value to the board ("Val=<n>") and copy the
// value echoed by the server into the other control, keeping the number
// box and the slider in sync.
function dimSet(evt) {
  var me, he;
  var resp, xmlhttp = new XMLHttpRequest();
  me = evt.target;
  he = document.getElementById(me.id == "dimVal" ? "dimSlide" : "dimVal"); // the other value node
  xmlhttp.onreadystatechange = function() {
    if (xmlhttp.readyState == 4 && xmlhttp.status == 200) {
      resp = xmlhttp.responseText;
      console.log(resp);
      var applied = parseInt(resp);
      if (!isNaN(applied)) {
        he.value = applied;
      }
    }
  };
  xmlhttp.open("GET", "Val=" + me.value, true);
  xmlhttp.send();
} // this closing brace was missing, which made the whole script a syntax error
</script> 
</head> 
<body> 
    <h1>PWM LED --- Web Control for FireBeetle</h1>
    <div class="dimwrap" id="dimwrap">
 <input class="dimVal" type="number" id="dimVal" min="0" max="10" step="1" value="0" onclick="dimSet(event)" oninput="dimSet(event)" > 
 <input class= "dimSlide" type="range" id="dimSlide" min="0" max="10" step="1" value="0" onclick="dimSet(event)" oninput="dimSet(event)" >
    </div>
</body>
</html>

MicroPython DHT11測試 4 : Try ....except 與WDT (watch dog Timer)



#測試 4 : Try ....except 與WDT (watch dog Timer)
#=============================================================================
# leds 0 = d3   ,   1 = d10   ,  2 = d4 ,  3 = d9 ,  4 =  d2  ,  5 = d1
#       9 = sdd2 , 10 = sdd3
#     12 = d6    ,  13 = d7   ,  14 = d5 , 15 = d8 , 16 = d0
#=============================================================================

from machine import Pin
from machine import WDT
import dht
import time
import urequests

p0=Pin(4, Pin.IN)                    # input pin handed to the DHT11 driver
p2=Pin(2, Pin.OUT)                   # output pin toggled by LED_blink()
d=dht.DHT11(p0)
host='http://api.thingspeak.com'
api_key='U52Q30TDQDA0G6IW'           # NOTE(review): API key is hard-coded in the source
wdt=WDT()                            # a WDT object can be created without arguments

def LED_blink(s):
    """Blink the LED on ``p2`` for about *s* seconds while feeding the watchdog.

    Runs ``10*s - 1`` on/off cycles of 100 ms each. The original body only
    named ``wdt`` as a bare expression, which does nothing; the watchdog is
    now fed on every cycle so the long wait cannot trigger a reset.
    """
    for i in range(1,10*s):
        wdt.feed()
        p2.value(1)
        time.sleep_ms(50)
        p2.value(0)
        time.sleep_ms(50)

# Main loop: measure, upload to ThingSpeak, then blink the LED while waiting
# for the next update period.
while True:
    try:
        d.measure()                
        t=d.temperature()      
        f=round(t * 9/5 + 32)      # Celsius -> Fahrenheit, rounded
        h=d.humidity()
        url='%s/update?api_key=%s&field1=%s&field2=%s&field3=%s' %(host, api_key, t, f, h)
        print('Temperature=', t, 'C', '/', f, 'F', 'Humidity=', h, '%')
        r=urequests.get(url)
        print('response=', r.text)
    # NOTE(review): the bare except also hides sensor errors raised by
    # d.measure(), although the message only mentions urequests.get().
    except:
        print('urequests.get() exception occurred!')
    LED_blink(16)

Micropython DHT11測試 3 : 等待 ThingSpeak 更新週期時閃爍 LED


                       手機安裝 ThingSpeak View APP






https://www.facebook.com/alexpts.wu/videos/pcb.10207569592676348/10207569594476393/?type=3&theater

DHT11結果的畫面


#測試 3 : 等待 ThingSpeak 更新週期時閃爍 LED
#===================================================================
# leds 0 = d3   ,   1 = d10   ,  2 = d4 ,  3 = d9 ,  4 =  d2  ,  5 = d1
#       9 = sdd2 , 10 = sdd3
#     12 = d6    ,  13 = d7   ,  14 = d5 , 15 = d8 , 16 = d0
#====================================================================

from machine import Pin
import dht
import time
import urequests

p0=Pin(4, Pin.IN)                    # input pin handed to the DHT11 driver
p2=Pin(2, Pin.OUT)                   # output pin toggled by LED_blink()
d=dht.DHT11(p0)
host='http://api.thingspeak.com'
api_key='U52Q30TDQDA0G6IW'           # NOTE(review): API key is hard-coded in the source

def LED_blink(s):
    """Blink the LED on ``p2``: ``10*s - 1`` cycles of 50 ms on, 50 ms off."""
    cycles = 10*s - 1
    for _ in range(cycles):
        for level in (1, 0):
            p2.value(level)
            time.sleep_ms(50)

# Main loop: measure, upload to ThingSpeak, then blink the LED while waiting
# for the next update period. Only the HTTP request is protected by try/except.
while True:
    d.measure()              
    t=d.temperature()    
    f=round(t * 9/5 + 32)    # Celsius -> Fahrenheit, rounded
    h=d.humidity()
    url='%s/update?api_key=%s&field1=%s&field2=%s&field3=%s' %(host, api_key, t, f, h)
    print('Temperature=', t, 'C', '/', f, 'F', 'Humidity=', h, '%')
    try:
        r=urequests.get(url)
        print('response=', r.text)
    except:
        print('urequests.get() exception occurred!')
    LED_blink(16)


#==============dht.py======================
# DHT11/DHT22 driver for MicroPython on ESP8266
# MIT license; Copyright (c) 2016 Damien P. George

import esp

class DHTBase:
    """Shared state and checksum-verified read for the DHT sensor classes."""

    def __init__(self, pin):
        """Remember *pin* and allocate the 5-byte buffer filled by measure()."""
        self.pin, self.buf = pin, bytearray(5)

    def measure(self):
        """Read a frame into ``self.buf`` via ``esp.dht_readinto``.

        Raises:
            Exception: the low byte of the sum of bytes 0-3 differs from byte 4.
        """
        frame = self.buf
        esp.dht_readinto(self.pin, frame)
        total = frame[0] + frame[1] + frame[2] + frame[3]
        if (total & 0xff) != frame[4]:
            raise Exception("checksum error")

class DHT11(DHTBase):
    """DHT11 values taken from the buffer filled by the last measure()."""

    def humidity(self):
        """Return buffer byte 0."""
        frame = self.buf
        return frame[0]

    def temperature(self):
        """Return buffer byte 2."""
        frame = self.buf
        return frame[2]

class DHT22(DHTBase):
    """DHT22 values decoded from the buffer filled by the last measure()."""

    def humidity(self):
        """Return the 16-bit value in bytes 0-1 multiplied by 0.1."""
        high, low = self.buf[0], self.buf[1]
        return (high << 8 | low) * 0.1

    def temperature(self):
        """Return the 15-bit value in bytes 2-3 times 0.1, negated if bit 7 of byte 2 is set."""
        high, low = self.buf[2], self.buf[3]
        magnitude = ((high & 0x7f) << 8 | low) * 0.1
        return -magnitude if high & 0x80 else magnitude

#======================urequests.py====================
import usocket
import ujson
try:
    import ussl
    SUPPORT_SSL = True
except ImportError:
    ussl = None
    SUPPORT_SSL = False

SUPPORT_TIMEOUT = hasattr(usocket.socket, 'settimeout')
CONTENT_TYPE_JSON = 'application/json'


class Response(object):
    """HTTP response whose body is read lazily from the underlying socket."""

    def __init__(self, status_code, raw):
        """Store the status code and the readable object holding the body."""
        self.encoding = 'utf-8'
        self._content = False   # False means "body not read yet"
        self.raw = raw
        self.status_code = status_code

    @property
    def content(self):
        """Body as returned by ``raw.read()``; read once, then ``raw`` is closed."""
        if self._content is not False:
            return self._content
        stream = self.raw
        self._content = stream.read()
        stream.close()
        self.raw = None
        return self._content

    @property
    def text(self):
        """Body decoded with ``self.encoding``; '' when the body is empty or None."""
        body = self.content
        if not body:
            return ''
        return str(body, self.encoding)

    def close(self):
        """Close an unread response; afterwards ``content`` is None."""
        stream = self.raw
        if stream is None:
            return
        self._content = None
        stream.close()
        self.raw = None

    def json(self):
        """Parse ``text`` with ``ujson.loads``."""
        return ujson.loads(self.text)

    def raise_for_status(self):
        """Raise OSError for 4xx ('Client error') and 5xx ('Server error') codes."""
        code = self.status_code
        if 400 <= code < 500:
            raise OSError('Client error: %s' % code)
        if 500 <= code < 600:
            raise OSError('Server error: %s' % code)


# Adapted from upip
def request(method, url, json=None, timeout=None, headers=None):
    """Send an HTTP/1.0 request and return a ``Response``.

    Args:
        method: HTTP verb written into the request line.
        url: 'http://host[:port]/path' or 'https://host[:port]/path'.
        json: optional object; serialised with ``ujson.dumps`` and sent as
            the body with content-type application/json.
        timeout: optional value passed to ``sock.settimeout``.
        headers: optional dict of extra header lines.

    Returns:
        ``Response`` wrapping the status code and the still-open socket.

    Raises:
        OSError: the URL scheme is neither http nor https.

    Unlike the original, the socket is closed when anything fails after it
    was created, and the header-skipping loop also stops at end of stream
    (the original looped forever if the peer closed before the blank line).
    """
    urlparts = url.split('/', 3)
    proto = urlparts[0]
    host = urlparts[2]
    urlpath = '' if len(urlparts) < 4 else urlparts[3]

    if proto == 'http:':
        port = 80
    elif proto == 'https:':
        port = 443
    else:
        raise OSError('Unsupported protocol: %s' % proto[:-1])

    if ':' in host:
        host, port = host.split(':')
        port = int(port)

    if json is not None:
        content = ujson.dumps(json)
        content_type = CONTENT_TYPE_JSON
    else:
        content = None

    ai = usocket.getaddrinfo(host, port)
    addr = ai[0][4]

    sock = usocket.socket()
    try:
        if timeout is not None:
            assert SUPPORT_TIMEOUT, 'Socket does not support timeout'
            sock.settimeout(timeout)

        sock.connect(addr)

        if proto == 'https:':
            assert SUPPORT_SSL, 'HTTPS not supported: could not find ussl'
            sock = ussl.wrap_socket(sock)

        sock.write('%s /%s HTTP/1.0\r\nHost: %s\r\n' % (method, urlpath, host))

        if headers is not None:
            for header in headers.items():
                sock.write('%s: %s\r\n' % header)

        if content is not None:
            sock.write('content-length: %s\r\n' % len(content))
            sock.write('content-type: %s\r\n' % content_type)
            sock.write('\r\n')
            sock.write(content)
        else:
            sock.write('\r\n')

        # Status line: only the numeric code is needed; the reason phrase
        # may be absent.
        status = int(sock.readline().split(None, 2)[1])

        # Skip headers; an empty read means the peer closed the connection.
        while True:
            line = sock.readline()
            if not line or line == b'\r\n':
                break
    except:
        # Do not leak the socket; the error itself is re-raised unchanged.
        sock.close()
        raise

    return Response(status, sock)


def get(url, **kwargs):
    """Send a GET request to *url*; keyword arguments are forwarded to request()."""
    return request('GET', url, **kwargs)


def post(url, **kwargs):
    """Send a POST request to *url*; keyword arguments are forwarded to request()."""
    return request('POST', url, **kwargs)



import usocket

class Response:
    """HTTP response whose body is read lazily from the object given as *f*."""

    def __init__(self, f):
        """Keep *f* as ``raw``; nothing is read until ``content`` is used."""
        self._cached = None
        self.encoding = "utf-8"
        self.raw = f

    def close(self):
        """Close ``raw`` if it is still open and drop any cached body."""
        stream = self.raw
        if stream:
            stream.close()
            self.raw = None
        self._cached = None

    @property
    def content(self):
        """Body as returned by ``raw.read()``; read once, then ``raw`` is closed."""
        if self._cached is not None:
            return self._cached
        stream = self.raw
        self._cached = stream.read()
        stream.close()
        self.raw = None
        return self._cached

    @property
    def text(self):
        """Body decoded with ``self.encoding``."""
        body = self.content
        return str(body, self.encoding)

    def json(self):
        """Parse the body with ``ujson.loads``."""
        import ujson
        return ujson.loads(self.content)


def request(method, url, data=None, json=None, headers={}, stream=None):
    """Send an HTTP/1.0 request and return a ``Response``.

    Args:
        method: HTTP verb written into the request line.
        url: 'http://host[:port][/path]' or 'https://host[:port][/path]'.
        data: optional request body.
        json: optional object; serialised with ``ujson.dumps`` and used as
            the body (``data`` must then be None).
        headers: mapping of extra headers; only read, never modified.
        stream: accepted but not used by this function.

    Returns:
        ``Response`` with ``status_code`` and ``reason`` set.

    Raises:
        ValueError: unsupported URL scheme, or a chunked response.
        NotImplementedError: a Location header arrives with a non-2xx status.

    Unlike the original, the socket is closed when anything fails after it
    was created, and the "chunked" error really is a ValueError (the
    original concatenated str and bytes, which raised TypeError instead).
    """
    try:
        proto, dummy, host, path = url.split("/", 3)
    except ValueError:
        proto, dummy, host = url.split("/", 2)
        path = ""
    if proto == "http:":
        port = 80
    elif proto == "https:":
        import ussl
        port = 443
    else:
        raise ValueError("Unsupported protocol: " + proto)

    if ":" in host:
        host, port = host.split(":", 1)
        port = int(port)

    ai = usocket.getaddrinfo(host, port)
    addr = ai[0][-1]
    s = usocket.socket()
    try:
        s.connect(addr)
        if proto == "https:":
            s = ussl.wrap_socket(s, server_hostname=host)
        s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
        if not "Host" in headers:
            s.write(b"Host: %s\r\n" % host)
        # Iterate over keys to avoid tuple alloc
        for k in headers:
            s.write(k)
            s.write(b": ")
            s.write(headers[k])
            s.write(b"\r\n")
        if json is not None:
            assert data is None
            import ujson
            data = ujson.dumps(json)
        if data:
            s.write(b"Content-Length: %d\r\n" % len(data))
        s.write(b"\r\n")
        if data:
            s.write(data)

        l = s.readline()
        protover, status, msg = l.split(None, 2)
        status = int(status)
        while True:
            l = s.readline()
            if not l or l == b"\r\n":
                break
            if l.startswith(b"Transfer-Encoding:"):
                if b"chunked" in l:
                    # l is bytes: decode before joining it to the message.
                    raise ValueError("Unsupported " + str(l, "utf-8"))
            elif l.startswith(b"Location:") and not 200 <= status <= 299:
                raise NotImplementedError("Redirects not yet supported")
    except:
        # Do not leak the socket; the error itself is re-raised unchanged.
        s.close()
        raise

    resp = Response(s)
    resp.status_code = status
    resp.reason = msg.rstrip()
    return resp


def head(url, **kw):
    """Send a HEAD request to *url*; keyword arguments are forwarded to request()."""
    return request("HEAD", url, **kw)

def get(url, **kw):
    """Send a GET request to *url*; keyword arguments are forwarded to request()."""
    return request("GET", url, **kw)

def post(url, **kw):
    """Send a POST request to *url*; keyword arguments are forwarded to request()."""
    return request("POST", url, **kw)

def put(url, **kw):
    """Send a PUT request to *url*; keyword arguments are forwarded to request()."""
    return request("PUT", url, **kw)

def patch(url, **kw):
    """Send a PATCH request to *url*; keyword arguments are forwarded to request()."""
    return request("PATCH", url, **kw)

def delete(url, **kw):
    """Send a DELETE request to *url*; keyword arguments are forwarded to request()."""
    return request("DELETE", url, **kw)


#========================http_client.py================
import usocket
import ujson
try:
    import ussl
    SUPPORT_SSL = True
except ImportError:
    ussl = None
    SUPPORT_SSL = False

SUPPORT_TIMEOUT = hasattr(usocket.socket, 'settimeout')
CONTENT_TYPE_JSON = 'application/json'


class Response(object):
    """HTTP response whose body is read lazily from the underlying socket."""

    def __init__(self, status_code, raw):
        """Store the status code and the readable object holding the body."""
        self.encoding = 'utf-8'
        self._content = False   # False means "body not read yet"
        self.raw = raw
        self.status_code = status_code

    @property
    def content(self):
        """Body as returned by ``raw.read()``; read once, then ``raw`` is closed."""
        if self._content is not False:
            return self._content
        stream = self.raw
        self._content = stream.read()
        stream.close()
        self.raw = None
        return self._content

    @property
    def text(self):
        """Body decoded with ``self.encoding``; '' when the body is empty or None."""
        body = self.content
        if not body:
            return ''
        return str(body, self.encoding)

    def close(self):
        """Close an unread response; afterwards ``content`` is None."""
        stream = self.raw
        if stream is None:
            return
        self._content = None
        stream.close()
        self.raw = None

    def json(self):
        """Parse ``text`` with ``ujson.loads``."""
        return ujson.loads(self.text)

    def raise_for_status(self):
        """Raise OSError for 4xx ('Client error') and 5xx ('Server error') codes."""
        code = self.status_code
        if 400 <= code < 500:
            raise OSError('Client error: %s' % code)
        if 500 <= code < 600:
            raise OSError('Server error: %s' % code)


# Adapted from upip
def request(method, url, json=None, timeout=None, headers=None):
    """Send an HTTP/1.0 request and return a ``Response``.

    Args:
        method: HTTP verb written into the request line.
        url: 'http://host[:port]/path' or 'https://host[:port]/path'.
        json: optional object; serialised with ``ujson.dumps`` and sent as
            the body with content-type application/json.
        timeout: optional value passed to ``sock.settimeout``.
        headers: optional dict of extra header lines.

    Returns:
        ``Response`` wrapping the status code and the still-open socket.

    Raises:
        OSError: the URL scheme is neither http nor https.

    Unlike the original, the socket is closed when anything fails after it
    was created, and the header-skipping loop also stops at end of stream
    (the original looped forever if the peer closed before the blank line).
    """
    urlparts = url.split('/', 3)
    proto = urlparts[0]
    host = urlparts[2]
    urlpath = '' if len(urlparts) < 4 else urlparts[3]

    if proto == 'http:':
        port = 80
    elif proto == 'https:':
        port = 443
    else:
        raise OSError('Unsupported protocol: %s' % proto[:-1])

    if ':' in host:
        host, port = host.split(':')
        port = int(port)

    if json is not None:
        content = ujson.dumps(json)
        content_type = CONTENT_TYPE_JSON
    else:
        content = None

    ai = usocket.getaddrinfo(host, port)
    addr = ai[0][4]

    sock = usocket.socket()
    try:
        if timeout is not None:
            assert SUPPORT_TIMEOUT, 'Socket does not support timeout'
            sock.settimeout(timeout)

        sock.connect(addr)

        if proto == 'https:':
            assert SUPPORT_SSL, 'HTTPS not supported: could not find ussl'
            sock = ussl.wrap_socket(sock)

        sock.write('%s /%s HTTP/1.0\r\nHost: %s\r\n' % (method, urlpath, host))

        if headers is not None:
            for header in headers.items():
                sock.write('%s: %s\r\n' % header)

        if content is not None:
            sock.write('content-length: %s\r\n' % len(content))
            sock.write('content-type: %s\r\n' % content_type)
            sock.write('\r\n')
            sock.write(content)
        else:
            sock.write('\r\n')

        # Status line: only the numeric code is needed; the reason phrase
        # may be absent.
        status = int(sock.readline().split(None, 2)[1])

        # Skip headers; an empty read means the peer closed the connection.
        while True:
            line = sock.readline()
            if not line or line == b'\r\n':
                break
    except:
        # Do not leak the socket; the error itself is re-raised unchanged.
        sock.close()
        raise

    return Response(status, sock)


def get(url, **kwargs):
    """Send a GET request to *url*; keyword arguments are forwarded to request()."""
    return request('GET', url, **kwargs)


def post(url, **kwargs):
    """Send a POST request to *url*; keyword arguments are forwarded to request()."""
    return request('POST', url, **kwargs)

Micropython DHT11測試 2 : 將溫濕度記錄在 ThingSpeak 網站




#================DHT11-2-main.py================================
#測試 2 : 將溫濕度記錄在 ThingSpeak 網站
#=============================================================================
# leds 0 = d3   ,   1 = d10   ,  2 = d4 ,  3 = d9 ,  4 =  d2  ,  5 = d1
#       9 = sdd2 , 10 = sdd3
#     12 = d6    ,  13 = d7   ,  14 = d5 , 15 = d8 , 16 = d0
#=============================================================================
#main.py
from machine import Pin
import dht  
import time
import urequests

p0=Pin(4, Pin.IN)                     # input pin handed to the DHT11 driver
d=dht.DHT11(p0)
host='http://api.thingspeak.com'      
api_key='U52Q30TDQDA0G6IW'            # NOTE(review): API key is hard-coded in the source

# Main loop: measure, upload the readings to ThingSpeak, wait 16 seconds.
# There is no error handling: any exception stops the script.
while True:
    d.measure()                
    t=d.temperature()      
    f=round(t * 9/5 + 32)             # Celsius -> Fahrenheit, rounded
    h=d.humidity()
    url='%s/update?api_key=%s&field1=%s&field2=%s&field3=%s' %(host, api_key, t, f, h)  
    print('Temperature=', t, 'C', '/', f, 'F', 'Humidity=', h, '%')
    #print('url=', url)  
    r=urequests.get(url)  
    print('response=', r.text)
    time.sleep(16)  


#============dht.py======================
# DHT11/DHT22 driver for MicroPython on ESP8266
# MIT license; Copyright (c) 2016 Damien P. George

import esp

class DHTBase:
    """Shared state and checksum-verified read for the DHT sensor classes."""

    def __init__(self, pin):
        """Remember *pin* and allocate the 5-byte buffer filled by measure()."""
        self.pin, self.buf = pin, bytearray(5)

    def measure(self):
        """Read a frame into ``self.buf`` via ``esp.dht_readinto``.

        Raises:
            Exception: the low byte of the sum of bytes 0-3 differs from byte 4.
        """
        frame = self.buf
        esp.dht_readinto(self.pin, frame)
        total = frame[0] + frame[1] + frame[2] + frame[3]
        if (total & 0xff) != frame[4]:
            raise Exception("checksum error")

class DHT11(DHTBase):
    """DHT11 values taken from the buffer filled by the last measure()."""

    def humidity(self):
        """Return buffer byte 0."""
        frame = self.buf
        return frame[0]

    def temperature(self):
        """Return buffer byte 2."""
        frame = self.buf
        return frame[2]

class DHT22(DHTBase):
    """DHT22 values decoded from the buffer filled by the last measure()."""

    def humidity(self):
        """Return the 16-bit value in bytes 0-1 multiplied by 0.1."""
        high, low = self.buf[0], self.buf[1]
        return (high << 8 | low) * 0.1

    def temperature(self):
        """Return the 15-bit value in bytes 2-3 times 0.1, negated if bit 7 of byte 2 is set."""
        high, low = self.buf[2], self.buf[3]
        magnitude = ((high & 0x7f) << 8 | low) * 0.1
        return -magnitude if high & 0x80 else magnitude



#====================urequests.py========================
import usocket

class Response:
    """HTTP response whose body is read lazily from the object given as *f*."""

    def __init__(self, f):
        """Keep *f* as ``raw``; nothing is read until ``content`` is used."""
        self._cached = None
        self.encoding = "utf-8"
        self.raw = f

    def close(self):
        """Close ``raw`` if it is still open and drop any cached body."""
        stream = self.raw
        if stream:
            stream.close()
            self.raw = None
        self._cached = None

    @property
    def content(self):
        """Body as returned by ``raw.read()``; read once, then ``raw`` is closed."""
        if self._cached is not None:
            return self._cached
        stream = self.raw
        self._cached = stream.read()
        stream.close()
        self.raw = None
        return self._cached

    @property
    def text(self):
        """Body decoded with ``self.encoding``."""
        body = self.content
        return str(body, self.encoding)

    def json(self):
        """Parse the body with ``ujson.loads``."""
        import ujson
        return ujson.loads(self.content)


def request(method, url, data=None, json=None, headers={}, stream=None):
    """Send an HTTP/1.0 request and return a ``Response``.

    Args:
        method: HTTP verb written into the request line.
        url: 'http://host[:port][/path]' or 'https://host[:port][/path]'.
        data: optional request body.
        json: optional object; serialised with ``ujson.dumps`` and used as
            the body (``data`` must then be None).
        headers: mapping of extra headers; only read, never modified.
        stream: accepted but not used by this function.

    Returns:
        ``Response`` with ``status_code`` and ``reason`` set.

    Raises:
        ValueError: unsupported URL scheme, or a chunked response.
        NotImplementedError: a Location header arrives with a non-2xx status.

    Unlike the original, the socket is closed when anything fails after it
    was created, and the "chunked" error really is a ValueError (the
    original concatenated str and bytes, which raised TypeError instead).
    """
    try:
        proto, dummy, host, path = url.split("/", 3)
    except ValueError:
        proto, dummy, host = url.split("/", 2)
        path = ""
    if proto == "http:":
        port = 80
    elif proto == "https:":
        import ussl
        port = 443
    else:
        raise ValueError("Unsupported protocol: " + proto)

    if ":" in host:
        host, port = host.split(":", 1)
        port = int(port)

    ai = usocket.getaddrinfo(host, port)
    addr = ai[0][-1]
    s = usocket.socket()
    try:
        s.connect(addr)
        if proto == "https:":
            s = ussl.wrap_socket(s, server_hostname=host)
        s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
        if not "Host" in headers:
            s.write(b"Host: %s\r\n" % host)
        # Iterate over keys to avoid tuple alloc
        for k in headers:
            s.write(k)
            s.write(b": ")
            s.write(headers[k])
            s.write(b"\r\n")
        if json is not None:
            assert data is None
            import ujson
            data = ujson.dumps(json)
        if data:
            s.write(b"Content-Length: %d\r\n" % len(data))
        s.write(b"\r\n")
        if data:
            s.write(data)

        l = s.readline()
        protover, status, msg = l.split(None, 2)
        status = int(status)
        while True:
            l = s.readline()
            if not l or l == b"\r\n":
                break
            if l.startswith(b"Transfer-Encoding:"):
                if b"chunked" in l:
                    # l is bytes: decode before joining it to the message.
                    raise ValueError("Unsupported " + str(l, "utf-8"))
            elif l.startswith(b"Location:") and not 200 <= status <= 299:
                raise NotImplementedError("Redirects not yet supported")
    except:
        # Do not leak the socket; the error itself is re-raised unchanged.
        s.close()
        raise

    resp = Response(s)
    resp.status_code = status
    resp.reason = msg.rstrip()
    return resp


def head(url, **kw):
    """Send a HEAD request to *url*; keyword arguments are forwarded to request()."""
    return request("HEAD", url, **kw)

def get(url, **kw):
    """Send a GET request to *url*; keyword arguments are forwarded to request()."""
    return request("GET", url, **kw)

def post(url, **kw):
    """Send a POST request to *url*; keyword arguments are forwarded to request()."""
    return request("POST", url, **kw)

def put(url, **kw):
    """Send a PUT request to *url*; keyword arguments are forwarded to request()."""
    return request("PUT", url, **kw)

def patch(url, **kw):
    """Send a PATCH request to *url*; keyword arguments are forwarded to request()."""
    return request("PATCH", url, **kw)

def delete(url, **kw):
    """Send a DELETE request to *url*; keyword arguments are forwarded to request()."""
    return request("DELETE", url, **kw)

microPython DHT11測試 1 : 連續測量溫溼度



#===============DHT11-1-main.py=================================
#測試 1 : 連續測量溫溼度
#=============================================================
# leds 0 = d3   ,   1 = d10   ,  2 = d4 ,  3 = d9 ,  4 =  d2  ,  5 = d1
#       9 = sdd2 , 10 = sdd3
#     12 = d6    ,  13 = d7   ,  14 = d5 , 15 = d8 , 16 = d0
#=============================================================
from machine import Pin
import dht    
import time

p0=Pin(4, Pin.IN)
d=dht.DHT11(p0)        # create the DHT11 object

# Print a fresh reading every 2 seconds, forever.
while True:
    d.measure()                  # take a new temperature/humidity measurement
    t=d.temperature()        # read the temperature in Celsius
    h=d.humidity()             # read the relative humidity
    print('Temperature=', t, 'C', 'Humidity=', h, '%')
    time.sleep(2)                  # pause for 2 seconds



#=============dht.py=====================
# DHT11/DHT22 driver for MicroPython on ESP8266
# MIT license; Copyright (c) 2016 Damien P. George

import esp

class DHTBase:
    """Shared state and checksum-verified read for the DHT sensor classes."""

    def __init__(self, pin):
        """Remember *pin* and allocate the 5-byte buffer filled by measure()."""
        self.pin, self.buf = pin, bytearray(5)

    def measure(self):
        """Read a frame into ``self.buf`` via ``esp.dht_readinto``.

        Raises:
            Exception: the low byte of the sum of bytes 0-3 differs from byte 4.
        """
        frame = self.buf
        esp.dht_readinto(self.pin, frame)
        total = frame[0] + frame[1] + frame[2] + frame[3]
        if (total & 0xff) != frame[4]:
            raise Exception("checksum error")

class DHT11(DHTBase):
    """DHT11 values taken from the buffer filled by the last measure()."""

    def humidity(self):
        """Return buffer byte 0."""
        frame = self.buf
        return frame[0]

    def temperature(self):
        """Return buffer byte 2."""
        frame = self.buf
        return frame[2]

class DHT22(DHTBase):
    """DHT22 values decoded from the buffer filled by the last measure()."""

    def humidity(self):
        """Return the 16-bit value in bytes 0-1 multiplied by 0.1."""
        high, low = self.buf[0], self.buf[1]
        return (high << 8 | low) * 0.1

    def temperature(self):
        """Return the 15-bit value in bytes 2-3 times 0.1, negated if bit 7 of byte 2 is set."""
        high, low = self.buf[2], self.buf[3]
        magnitude = ((high & 0x7f) << 8 | low) * 0.1
        return -magnitude if high & 0x80 else magnitude

2017年8月20日 星期日

MicroPython 讀取 DHT11 溫度濕度感應器

==參考小狐狸===

使用 uPyCraft V2.5

DHT11 模組有四隻腳, 包含電源 Vcc 與 GND, 數據輸出 Data 以及一隻用不到的腳 (在 I2C 模式才會用到). 電源 Vcc 可吃 3~5V, 因此吃 3.3V 的 ESP8266 可直用 (Data 腳可直連 GPIO 腳), 其接腳定義如下 :



為了使測量數據穩定, 最好在 Vcc 與 Data 腳之間並接一個 10K 歐姆電阻 (棕黑橙); 在 Vcc 與 GND 之間並接一個 0.1uF 電容 (104).

在 MicroPython 上使用 DHT11 很方便, 因為其功能已經被寫進 MicroPython 的內建模組 dht 了, 參考 :
首先匯入 dht 模組與 Pin 模組 (綁定哪一支 GPIO 腳擷取 DHT11 輸出之數據) :

import dht
from machine import Pin

接著就可以呼叫 dht 模組的 DHT 類別建構式來建立 DHT11 物件, 傳入要綁定之 GPIO 腳之 Pin 物件 :

d=dht.DHT11(Pin(0))    #綁定 GPIO 0 為 DHT11 數據輸入腳

不過在讀取溫濕度數據前必須先呼叫 DHT11 物件的 measure() 方法來擷取測量結果, 這樣才能讀取到最新的數據 :

d.measure()    #更新測量結果

這樣就可以讀取溫溼度數據了 :

t=d.temperature()     #傳回攝氏溫度
h=d.humidity()         #傳回相對溼度 (%)

注意, DHT11 的測量頻率是至多每秒一次, 亦即 measure() 方法每秒只能呼叫一次 :

"The DHT11 can be called no more than once per second and the DHT22 once every two seconds for most accurate results."



#===================DHT11-1.py===============
#測試 1 : 連續測量溫溼度
from machine import Pin
import dht    
import time

d2=Pin(4, Pin.IN)
d=dht.DHT11(d2)        # create the DHT11 object

# Print a fresh reading every 2 seconds, forever.
while True:
    d.measure()                  # take a new temperature/humidity measurement
    t=d.temperature()        # read the temperature in Celsius
    h=d.humidity()             # read the relative humidity
    print('Temperature=', t, 'C', 'Humidity=', h, '%')
    time.sleep(2)                  # pause for 2 seconds
#================dht.py==========================
# DHT11/DHT22 driver for MicroPython on ESP8266
# MIT license; Copyright (c) 2016 Damien P. George

import esp

class DHTBase:
    """Shared state and checksum-verified read for the DHT sensor classes."""

    def __init__(self, pin):
        """Remember *pin* and allocate the 5-byte buffer filled by measure()."""
        self.pin, self.buf = pin, bytearray(5)

    def measure(self):
        """Read a frame into ``self.buf`` via ``esp.dht_readinto``.

        Raises:
            Exception: the low byte of the sum of bytes 0-3 differs from byte 4.
        """
        frame = self.buf
        esp.dht_readinto(self.pin, frame)
        total = frame[0] + frame[1] + frame[2] + frame[3]
        if (total & 0xff) != frame[4]:
            raise Exception("checksum error")

class DHT11(DHTBase):
    """DHT11 values taken from the buffer filled by the last measure()."""

    def humidity(self):
        """Return buffer byte 0."""
        frame = self.buf
        return frame[0]

    def temperature(self):
        """Return buffer byte 2."""
        frame = self.buf
        return frame[2]

class DHT22(DHTBase):
    """DHT22 values decoded from the buffer filled by the last measure()."""

    def humidity(self):
        """Return the 16-bit value in bytes 0-1 multiplied by 0.1."""
        high, low = self.buf[0], self.buf[1]
        return (high << 8 | low) * 0.1

    def temperature(self):
        """Return the 15-bit value in bytes 2-3 times 0.1, negated if bit 7 of byte 2 is set."""
        high, low = self.buf[2], self.buf[3]
        magnitude = ((high & 0x7f) << 8 | low) * 0.1
        return -magnitude if high & 0x80 else magnitude

MicroPython讀取DHT11

MicroPython讀取DHT11



前言:
      DHT12
是奧松家的一款數字溫濕度感測器,是我們所熟知的DHT11的升級版。DHT12溫度檢測範圍是-20~60℃,溫度解析度為0.1℃(DHT11測量溫度為0~50℃,溫度測量解析度為1℃)。DHT12支援I2C和單匯流排通信,單匯流排模式下相容DHT11,但無法讀取0℃以下溫度,溫度測量解析度為1℃。DHT12濕度測量範圍20~99%,與DHT11無異。DHT12資料處理與DHT11有細微區別,所以自己稍微改了一下MicroPython官方模組以適配DHT12。

硬體:
NodeMCU                           *1
DHT12                             *1
麵包板                            *1
杜邦線                            *4

軟體:
uPyCraft v0.25




本例使用的是單匯流排模式。
接線如下:
VDD->3V3
SDA->pin(4) ===> NodeMcu D2
GND->GND
SCL->GND


#==================DHT11.py=====================
#Pin(4) ==  D2
#=================
import DHTsensor
import machine
import time
d = DHTsensor.DHT11(machine.Pin(4))
# Measure once per loop; the temperature is printed first and the humidity
# of the same measurement one second later.
while True:
  d.measure()
  print("temperature :",d.temperature())
  time.sleep(1)
  print("humidity :",d.humidity())



#==================DHTsensor.py===================

import esp

class DHTBase:
    """Shared state and checksum-verified read for the DHT sensor classes."""

    def __init__(self, pin):
        """Remember *pin* and allocate the 5-byte buffer filled by measure()."""
        self.pin, self.buf = pin, bytearray(5)

    def measure(self):
        """Read a frame into ``self.buf`` via ``esp.dht_readinto``.

        Raises:
            Exception: the low byte of the sum of bytes 0-3 differs from byte 4.
        """
        frame = self.buf
        esp.dht_readinto(self.pin, frame)
        total = frame[0] + frame[1] + frame[2] + frame[3]
        if (total & 0xff) != frame[4]:
            raise Exception("checksum error")

class DHT11(DHTBase):
    """DHT11 values taken from the buffer filled by the last measure()."""

    def humidity(self):
        """Return buffer byte 0."""
        frame = self.buf
        return frame[0]

    def temperature(self):
        """Return buffer byte 2."""
        frame = self.buf
        return frame[2]

class DHT12(DHTBase):
    """DHT12 values decoded from the buffer filled by the last measure().

    Bytes 0/1 hold the integer and tenths part of the humidity, bytes 2/3
    the integer and tenths part of the temperature.
    """

    def humidity(self):
        """Return byte 0 plus byte 1 tenths."""
        return self.buf[0] + self.buf[1] * 0.1

    def temperature(self):
        """Return the temperature, negative when the sign bit is set.

        Per the DHT12 data format the sign is bit 7 of the tenths byte
        (byte 3), so that bit is masked out of the fraction. The original
        tested bit 7 of the integer byte and left the fraction unmasked.
        """
        t = self.buf[2] + (self.buf[3] & 0x7f) * 0.1
        if self.buf[3] & 0x80:
            t = -t
        return t

class DHT22(DHTBase):
    """DHT22 values decoded from the buffer filled by the last measure()."""

    def humidity(self):
        """Return the 16-bit value in bytes 0-1 multiplied by 0.1."""
        high, low = self.buf[0], self.buf[1]
        return (high << 8 | low) * 0.1

    def temperature(self):
        """Return the 15-bit value in bytes 2-3 times 0.1, negated if bit 7 of byte 2 is set."""
        high, low = self.buf[2], self.buf[3]
        magnitude = ((high & 0x7f) << 8 | low) * 0.1
        return -magnitude if high & 0x80 else magnitude

2024_09 作業3 以Node-Red 為主

 2024_09 作業3  (以Node-Red 為主  Arduino 可能需要配合修改 ) Arduino 可能需要修改的部分 1)mqtt broker  2) 主題Topic (發行 接收) 3) WIFI ssid , password const char br...