手機安裝 ThingSpeak View APP
https://www.facebook.com/alexpts.wu/videos/pcb.10207569592676348/10207569594476393/?type=3&theater
DHT11結果的畫面
#測試 3 : 等待 ThingSpeak 更新週期時閃爍 LED
#===================================================================
# leds 0 = d3 , 1 = d10 , 2 = d4 , 3 = d9 , 4 = d2 , 5 = d1
# 9 = sdd2 , 10 = sdd3
# 12 = d6 , 13 = d7 , 14 = d5 , 15 = d8 , 16 = d0
#====================================================================
from machine import Pin
import dht
import time
import urequests
p0=Pin(4, Pin.IN)
p2=Pin(2, Pin.OUT)
d=dht.DHT11(p0)
host='http://api.thingspeak.com'
api_key='U52Q30TDQDA0G6IW'
def LED_blink(s):
for i in range(1,10*s):
p2.value(1)
time.sleep_ms(50)
p2.value(0)
time.sleep_ms(50)
while True:
d.measure()
t=d.temperature()
f=round(t * 9/5 + 32)
h=d.humidity()
url='%s/update?api_key=%s&field1=%s&field2=%s&field3=%s' %(host, api_key, t, f, h)
print('Temperature=', t, 'C', '/', f, 'F', 'Humidity=', h, '%')
try:
r=urequests.get(url)
print('response=', r.text)
except:
print('urequests.get() exception occurred!')
LED_blink(16)
#==============dth.py======================
# DHT11/DHT22 driver for MicroPython on ESP8266
# MIT license; Copyright (c) 2016 Damien P. George
import esp
class DHTBase:
def __init__(self, pin):
self.pin = pin
self.buf = bytearray(5)
def measure(self):
buf = self.buf
esp.dht_readinto(self.pin, buf)
if (buf[0] + buf[1] + buf[2] + buf[3]) & 0xff != buf[4]:
raise Exception("checksum error")
class DHT11(DHTBase):
def humidity(self):
return self.buf[0]
def temperature(self):
return self.buf[2]
class DHT22(DHTBase):
def humidity(self):
return (self.buf[0] << 8 | self.buf[1]) * 0.1
def temperature(self):
t = ((self.buf[2] & 0x7f) << 8 | self.buf[3]) * 0.1
if self.buf[2] & 0x80:
t = -t
return t
#======================urequests.py====================
import usocket
import ujson
try:
import ussl
SUPPORT_SSL = True
except ImportError:
ussl = None
SUPPORT_SSL = False
SUPPORT_TIMEOUT = hasattr(usocket.socket, 'settimeout')
CONTENT_TYPE_JSON = 'application/json'
class Response(object):
def __init__(self, status_code, raw):
self.status_code = status_code
self.raw = raw
self._content = False
self.encoding = 'utf-8'
@property
def content(self):
if self._content is False:
self._content = self.raw.read()
self.raw.close()
self.raw = None
return self._content
@property
def text(self):
content = self.content
return str(content, self.encoding) if content else ''
def close(self):
if self.raw is not None:
self._content = None
self.raw.close()
self.raw = None
def json(self):
return ujson.loads(self.text)
def raise_for_status(self):
if 400 <= self.status_code < 500:
raise OSError('Client error: %s' % self.status_code)
if 500 <= self.status_code < 600:
raise OSError('Server error: %s' % self.status_code)
# Adapted from upip
def request(method, url, json=None, timeout=None, headers=None):
urlparts = url.split('/', 3)
proto = urlparts[0]
host = urlparts[2]
urlpath = '' if len(urlparts) < 4 else urlparts[3]
if proto == 'http:':
port = 80
elif proto == 'https:':
port = 443
else:
raise OSError('Unsupported protocol: %s' % proto[:-1])
if ':' in host:
host, port = host.split(':')
port = int(port)
if json is not None:
content = ujson.dumps(json)
content_type = CONTENT_TYPE_JSON
else:
content = None
ai = usocket.getaddrinfo(host, port)
addr = ai[0][4]
sock = usocket.socket()
if timeout is not None:
assert SUPPORT_TIMEOUT, 'Socket does not support timeout'
sock.settimeout(timeout)
sock.connect(addr)
if proto == 'https:':
assert SUPPORT_SSL, 'HTTPS not supported: could not find ussl'
sock = ussl.wrap_socket(sock)
sock.write('%s /%s HTTP/1.0\r\nHost: %s\r\n' % (method, urlpath, host))
if headers is not None:
for header in headers.items():
sock.write('%s: %s\r\n' % header)
if content is not None:
sock.write('content-length: %s\r\n' % len(content))
sock.write('content-type: %s\r\n' % content_type)
sock.write('\r\n')
sock.write(content)
else:
sock.write('\r\n')
l = sock.readline()
protover, status, msg = l.split(None, 2)
# Skip headers
while sock.readline() != b'\r\n':
pass
return Response(int(status), sock)
def get(url, **kwargs):
return request('GET', url, **kwargs)
def post(url, **kwargs):
return request('POST', url, **kwargs)
import usocket
class Response:
def __init__(self, f):
self.raw = f
self.encoding = "utf-8"
self._cached = None
def close(self):
if self.raw:
self.raw.close()
self.raw = None
self._cached = None
@property
def content(self):
if self._cached is None:
self._cached = self.raw.read()
self.raw.close()
self.raw = None
return self._cached
@property
def text(self):
return str(self.content, self.encoding)
def json(self):
import ujson
return ujson.loads(self.content)
def request(method, url, data=None, json=None, headers={}, stream=None):
try:
proto, dummy, host, path = url.split("/", 3)
except ValueError:
proto, dummy, host = url.split("/", 2)
path = ""
if proto == "http:":
port = 80
elif proto == "https:":
import ussl
port = 443
else:
raise ValueError("Unsupported protocol: " + proto)
if ":" in host:
host, port = host.split(":", 1)
port = int(port)
ai = usocket.getaddrinfo(host, port)
addr = ai[0][-1]
s = usocket.socket()
s.connect(addr)
if proto == "https:":
s = ussl.wrap_socket(s, server_hostname=host)
s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
if not "Host" in headers:
s.write(b"Host: %s\r\n" % host)
# Iterate over keys to avoid tuple alloc
for k in headers:
s.write(k)
s.write(b": ")
s.write(headers[k])
s.write(b"\r\n")
if json is not None:
assert data is None
import ujson
data = ujson.dumps(json)
if data:
s.write(b"Content-Length: %d\r\n" % len(data))
s.write(b"\r\n")
if data:
s.write(data)
l = s.readline()
protover, status, msg = l.split(None, 2)
status = int(status)
#print(protover, status, msg)
while True:
l = s.readline()
if not l or l == b"\r\n":
break
#print(l)
if l.startswith(b"Transfer-Encoding:"):
if b"chunked" in l:
raise ValueError("Unsupported " + l)
elif l.startswith(b"Location:") and not 200 <= status <= 299:
raise NotImplementedError("Redirects not yet supported")
resp = Response(s)
resp.status_code = status
resp.reason = msg.rstrip()
return resp
def head(url, **kw):
return request("HEAD", url, **kw)
def get(url, **kw):
return request("GET", url, **kw)
def post(url, **kw):
return request("POST", url, **kw)
def put(url, **kw):
return request("PUT", url, **kw)
def patch(url, **kw):
return request("PATCH", url, **kw)
def delete(url, **kw):
return request("DELETE", url, **kw)
#========================http_client.py================
import usocket
import ujson
try:
import ussl
SUPPORT_SSL = True
except ImportError:
ussl = None
SUPPORT_SSL = False
SUPPORT_TIMEOUT = hasattr(usocket.socket, 'settimeout')
CONTENT_TYPE_JSON = 'application/json'
class Response(object):
def __init__(self, status_code, raw):
self.status_code = status_code
self.raw = raw
self._content = False
self.encoding = 'utf-8'
@property
def content(self):
if self._content is False:
self._content = self.raw.read()
self.raw.close()
self.raw = None
return self._content
@property
def text(self):
content = self.content
return str(content, self.encoding) if content else ''
def close(self):
if self.raw is not None:
self._content = None
self.raw.close()
self.raw = None
def json(self):
return ujson.loads(self.text)
def raise_for_status(self):
if 400 <= self.status_code < 500:
raise OSError('Client error: %s' % self.status_code)
if 500 <= self.status_code < 600:
raise OSError('Server error: %s' % self.status_code)
# Adapted from upip
def request(method, url, json=None, timeout=None, headers=None):
urlparts = url.split('/', 3)
proto = urlparts[0]
host = urlparts[2]
urlpath = '' if len(urlparts) < 4 else urlparts[3]
if proto == 'http:':
port = 80
elif proto == 'https:':
port = 443
else:
raise OSError('Unsupported protocol: %s' % proto[:-1])
if ':' in host:
host, port = host.split(':')
port = int(port)
if json is not None:
content = ujson.dumps(json)
content_type = CONTENT_TYPE_JSON
else:
content = None
ai = usocket.getaddrinfo(host, port)
addr = ai[0][4]
sock = usocket.socket()
if timeout is not None:
assert SUPPORT_TIMEOUT, 'Socket does not support timeout'
sock.settimeout(timeout)
sock.connect(addr)
if proto == 'https:':
assert SUPPORT_SSL, 'HTTPS not supported: could not find ussl'
sock = ussl.wrap_socket(sock)
sock.write('%s /%s HTTP/1.0\r\nHost: %s\r\n' % (method, urlpath, host))
if headers is not None:
for header in headers.items():
sock.write('%s: %s\r\n' % header)
if content is not None:
sock.write('content-length: %s\r\n' % len(content))
sock.write('content-type: %s\r\n' % content_type)
sock.write('\r\n')
sock.write(content)
else:
sock.write('\r\n')
l = sock.readline()
protover, status, msg = l.split(None, 2)
# Skip headers
while sock.readline() != b'\r\n':
pass
return Response(int(status), sock)
def get(url, **kwargs):
return request('GET', url, **kwargs)
def post(url, **kwargs):
return request('POST', url, **kwargs)
沒有留言:
張貼留言