阿里云物联网平台最完全的使用教程

简介:包括内容如下(详细到每一个细节和步骤,如果还不清楚,可以进入阿里云控制台创建工单,请教阿里的工程师) 使用环境:(使用蜂窝网进行过测试,和WiFi直连并无差别,可以直接使用)

阿里云物联网平台最完全的使用教程

一、阿里云账号说明

1、基本功能说明

进入阿里云官网创建主账号

https://www.aliyun.com/?spm=a2c4g.11186623.amxosvpfn.2.15f5293ewZtPYC
创建完成之后,进入控制台并选择Access Key管理,如图所示
阿里云物联网平台最完全的使用教程

创建子账号,并将物联网平台的所有权限给予子账号,以后我们就用子账号进行各类操作,注意保存得到的三元组,这是接入物联网平台的关键之一
阿里云物联网平台最完全的使用教程
阿里云物联网平台最完全的使用教程

2、开通物联网服务

https://www.aliyun.com/product/iot?spm=5176.10695662.J_3717714080.2.1ce83318Gaytdw
选择开通即可,前两个月赠送的免费消息足够用了,选择进入管理控制台
阿里云物联网平台最完全的使用教程

https://iot.console.aliyun.com/lk/summary
阿里云物联网平台最完全的使用教程

指出来的这几个是需要用到的功能

二、物联网平台的基本使用

1、创建产品,如下图

阿里云物联网平台最完全的使用教程
阿里云物联网平台最完全的使用教程

2、添加设备

阿里云物联网平台最完全的使用教程
阿里云物联网平台最完全的使用教程

三、设备接入物联网平台

1、开发环境设置

https://help.aliyun.com/document_detail/98292.html?spm=a2c4g.11186623.6.683.7d5b1f19UYzxqv
环境win10,pycharm2020,python3.8(Ubuntu16 64-bit和Ubuntu18 arm架构同理)
(1)python3.8的安装和pycharm的安装略过(python3.8需要安装pip下载工具)
(2)环境配置
无需参考官方文档配置虚拟环境,直接用pycharm就好了
直接win+R进入win10命令行控制环境
输入:

pip install aliyun-iot-linkkit

2、连接

https://help.aliyun.com/document_detail/98293.html?spm=a2c4g.11186623.6.684.61c84912ccMTDp
使用一机一密方式进行

from linkkit import linkkit
import sys
 
#一机一密认证
lk = linkkit.LinkKit(
    host_name="cn-shanghai",
    product_key="aG*******k",
    device_name="Test1",
    device_secret="****************")
 
#连接上物联网平台后的回调,成功连接session_flag和rc返回0
def onconnect(sessionflag, rc, userdata):
    print("onconnect:%d,rc:%d:" % (sessionflag, rc))
    pass
 
#断开连接物联网平台后的回调,断开后rc返回1
def on_disconnect(rc, userdata):
    print("on_disconnect:rc:%d:" % rc)
 
#当出现网络波动时,程序会自动循环调用连接,显示的效果为这两个回调函数会被一直调用
lk.onconnect = onconnect
lk.ondisconnect = ondisconnect
 
lk.connect_async()  # 连接物联网平台
lk.startworkerloop()  # 保持连接
 
print("connected")
注意

1、lk后面的是你创建产品设备对应的三元组|
2、注意所有的回调函数放在连接之前,程序会一直执行,只要出现相应的操作回调函数就会被调用,即只要连接上时,就输出rc=0,只要断开连接时,就输出rc=1 |
阿里云物联网平台最完全的使用教程

3、自定义MQTT通信

(1)创建自定义的Topic(注意:Topic的权限与代码中的函数要一一对应,例如权限为订阅,那么在通信时选择的应该是subscribe回调,可以接收到云端消息,发布同理)

阿里云物联网平台最完全的使用教程

Topic的名字是作为通信的凭证
阿里云物联网平台最完全的使用教程

(2)实现(首先需要连接上阿里云物联网平台,再构造逻辑进行相应操作,https://help.aliyun.com/document_detail/98295.html?spm=a2c4g.11186623.6.685.6d596dc9OWMDE9)

from linkkit import linkkit
import sys
import time
 
#一机一密认证
lk = linkkit.LinkKit(
    host_name="cn-shanghai",
    product_key="a*************Gk",
    device_name="Test1",
    device_secret="****************************")
 
 
#连接上物联网平台后的回调,成功连接session_flag和rc返回0
def onconnect(sessionflag, rc, userdata):
    print("onconnect:%d,rc:%d:" % (sessionflag, rc))
    pass
 
 
#断开连接物联网平台后的回调,断开后rc返回1
def on_disconnect(rc, userdata):
    print("on_disconnect:rc:%d:" % rc)
 
 
#订阅topic回调
def onsubscribetopic(mid, granted_qos, userdata):
    print("onsubscribetopic mid:%d, granted_qos:%s" %
          (mid, str(','.join('%s' % it for it in granted_qos))))
    pass
 
 
#取消订阅回调
def onunsubscribetopic(mid, userdata):
    print("onunsubscribetopic mid:%d" % mid)
    pass
 
 
#接收消息回调,调用函数时会一直执行print,效果为:如果调用该方法,发布消息的topic每次发布消息都会被输出到控制台并打印出来
def ontopicmessage(topic, payload, qos, userdata):
    print("ontopicmessage:" + topic + " payload:" + str(payload) + " qos:" + str(qos))
    pass
 
 
#当出现网络波动时,程序会自动循环调用连接,显示的效果为这两个回调函数会被一直调用
lk.onconnect = onconnect
lk.ondisconnect = ondisconnect
 
lk.onsubscribetopic = onsubscribetopic  # 订阅topic回调
lk.onunsubscribetopic = onunsubscribetopic  # 取消订阅topic回调
 
lk.ontopicmessage = ontopicmessage  # 接收topic消息回调
 
 
lk.connect_async()  # 连接物联网平台
lk.startworkerloop()  # 保持连接
 
print("connected")
 
 
 
#增加while循环的作用:保证物联网平台是连接上之后再进行通信的
while True:
    try:
        msg = input()  # 获取从控制台的输入
    except KeyboardInterrupt:
        sys.exit()
    else:
        if msg == "1":
            lk.disconnect()
        elif msg == "2":
            lk.connect_async()
        elif msg == "3":  # 输入为3时,订阅get这个topic,每个订阅的topic只需订阅一次即可,会在物联网平台的设备topic列表中显示
            rc, mid = lk.subscribetopic(lk.tofull_topic("user/get"))  # 注意topic只需要写成这样的格式即可,格式需要完全一致,全称会自动补全,不需要输入设备名
            if rc == 0:  # rc返回值为0时则表示订阅成功
                print("subscribe topic success:%r, mid:%r" % (rc, mid))
            else:
                print("subscribe topic fail:%d" % rc)
        elif msg == "4":  # 取消订阅
            rc, mid = lk.unsubscribetopic(lk.tofull_topic("user/get"))
            if rc == 0:
                print("unsubscribe topic success:%r, mid:%r" % (rc, mid))
            else:
                print("unsubscribe topic fail:%d" % rc)
        elif msg == "5":  # 发布消息“123”给自定义的reciver这个topic
            rc, mid = lk.publishtopic(lk.tofull_topic("user/test"), "123")
            if rc == 0:
                print("publish topic success:%r, mid:%r" % (rc, mid))
            else:
                print("publish topic fail:%d" % rc)
        elif msg == "6":  # 同时订阅多个topic
            rc, mid = lk.subscribetopic([(lk.tofull_topic("user/sender"), 1),
                                          (lk.tofulltopic("user/get"), 1),
                                          (lk.tofulltopic("user/test"), 1)])
            if rc == 0:
                print("subscribe multiple topics success:%r, mid:%r" % (rc, mid))
            else:
                print("subscribe multiple topics fail:%d" % rc)
        elif msg == "7":  # 同时取消订阅多个topic
            rc, mid = lk.unsubscribetopic([lk.tofulltopic("user/get"), lk.tofull_topic("user/test")])
            if rc == 0:
                print("unsubscribe multiple topics success:%r, mid:%r" % (rc, mid))
            else:
                print("unsubscribe multiple topics fail:%d" % rc)
        elif msg == "8":  # RRPC
             lk.ontopicmessage = ontopicmessage
        elif msg == "11":  # 物模型通信,属性上报
            prop_data = {
                "Test001": "hh",
                "memory_usage": 99
            }
            rc, requestid = lk.thingpostproperty(propdata)
            if rc == 0:
                print("propertydata post success:%r, requestid:%r" % (rc, request_id))
            else:
                print("property_data post fail:%d" % rc)
        elif msg == "12":  # 物模型通信,事件1上报
            event_data = {
                "Testdata001": 100
            }
            rc, requestid = lk.thingtriggerevent(("Test001event",event_data))
            if rc == 0:
                print("eventdata post success:%r, requestid:%r" % (rc, request_id))
            else:
                print("event_data post fail:%d" % rc)
        elif msg == "13":  # 物模型通信,事件2上报
            event_data = {
                "Testdata002": 1
            }
            rc, requestid = lk.thingtriggerevent(("Test002event",event_data))
            if rc == 0:
                print("eventdata post success:%r, requestid:%r" % (rc, request_id))
            else:
                print("event_data post fail:%d" % rc)
        elif msg == "98":  # 打印topic列表?
            ret = lk.dumpusertopics()
            print("user topics:%s", str(ret))
        elif msg == "99":  # 断开连接
            lk.destruct()
            print("destructed")
 
        else:
            sys.exit()
注意点

1、自行查看自定义MQTT通信的代码
2、接收消息回调,当云端发送消息到设备时发生作用,可将接收的数据输出到控制台(回调函数是程序执行过程中一直在执行的代码,只要满足相应的条件就会被运行)
3、注意Topic的格式为:“user/test”(只能是这样的,不需要输入完整的topic名称,sdk会自动补全名称,即如下图部分)
阿里云物联网平台最完全的使用教程

4、设备发送到云端消息查看,如下图:
阿里云物联网平台最完全的使用教程

4、物模型通信

(1)自定义物模型

点编辑草稿进行自定义物模型创建
阿里云物联网平台最完全的使用教程

自定义各项数据
阿里云物联网平台最完全的使用教程

点击生成设备端代码或者物模型TSL查看自定义的物模型的名称等信息(这个是作为物模型通信的凭证)
阿里云物联网平台最完全的使用教程

(2)实现(https://help.aliyun.com/document_detail/98370.html?spm=a2c4g.11186623.6.686.7e1352f7jxsvvk)

完整测试代码

from linkkit import linkkit
import sys
import time

# 一机一密认证
lk = linkkit.LinkKit(
    host_name="cn-shanghai",
    product_key="a1******Gk",
    device_name="Test1",
    device_secret="e2b*****************afd06abb")

# 配置物模型文件
lk.thing_setup("tsl.json")


# 物模型可用时回调函数
# def on_thing_enable(self, userdata):
#     print("on_thing_enable")
def on_thing_enable(userdata):
    print("on_thing_enable")
    pass


# 物模型不可用时回调函数
def on_thing_disable(userdata):
    print("on_thing_disable")


# 属性上报回调
# def on_thing_prop_post(self, request_id, code, data, message, userdata):
#     print("on_thing_prop_post request id:%s, code:%d, data:%s message:%s" % (request_id, code, str(data), message))
def on_thing_prop_post(request_id, code, data, message, userdata):
    print("on_thing_prop_post request id:%s, code:%d, data:%s message:%s" %
          (request_id, code, str(data), message))


# 事件上报回调
# def on_thing_event_post(self, event, request_id, code, data, message, userdata):
#     print("on_thing_event_post request id:%s, code:%d, data:%s message:%s" % (event, code, str(data), message))
def on_thing_event_post(event, request_id, code, data, message, userdata):
    print("on_thing_event_post event:%s,request id:%s, code:%d, data:%s, message:%s" %
          (event, request_id, code, str(data), message))


# RRPC请求回调
def on_topic_rrpc_message(id, topic, payload, qos, userdata):
    print("on_topic_rrpc_message: id:%s, topic:%s, payload:%s" % (id, topic, payload))
    lk.thing_answer_rrpc(id, payload)


# service请求回调
def on_thing_call_service(identifier, request_id, params, userdata):
    print("on_thing_call_service: identifier:%s, request_id:%s, params:%s" % (identifier, request_id, params))
    lk.thing_answer_service(identifier, request_id, 200, {})


# 连接上物联网平台后的回调,成功连接session_flag和rc返回0
def on_connect(session_flag, rc, userdata):
    print("on_connect:%d,rc:%d:" % (session_flag, rc))
    pass


# 断开连接物联网平台后的回调,断开后rc返回1
def on_disconnect(rc, userdata):
    print("on_disconnect:rc:%d:" % rc)


# 订阅topic回调
def on_subscribe_topic(mid, granted_qos, userdata):
    print("on_subscribe_topic mid:%d, granted_qos:%s" %
          (mid, str(','.join('%s' % it for it in granted_qos))))
    pass


# 取消订阅回调
def on_unsubscribe_topic(mid, userdata):
    print("on_unsubscribe_topic mid:%d" % mid)
    pass


# 接收消息回调,调用函数时会一直执行print,效果为:如果调用该方法,发布消息的topic每次发布消息都会被输出到控制台并打印出来
def on_topic_message(topic, payload, qos, userdata):
    print("on_topic_message:" + topic + " payload:" + str(payload) + " qos:" + str(qos))
    pass


# 当出现网络波动时,程序会自动循环调用连接,显示的效果为这两个回调函数会被一直调用
lk.on_connect = on_connect
lk.on_disconnect = on_disconnect

lk.on_subscribe_topic = on_subscribe_topic  # 订阅topic回调
lk.on_unsubscribe_topic = on_unsubscribe_topic  # 取消订阅topic回调

lk.on_topic_message = on_topic_message  # 接收topic消息回调

lk.on_topic_rrpc_message = on_topic_rrpc_message  # 普通,接收RRPC请求回调
lk.on_thing_call_service = on_thing_call_service  # 物模型,处理同步类型的service

lk.on_thing_enable = on_thing_enable  # 物模型功能可用时回调
lk.on_thing_disable = on_thing_disable  # 物模型功能不可用时回调

lk.on_thing_prop_post = on_thing_prop_post  # 属性上报成功时回调
lk.on_thing_event_post = on_thing_event_post  # 事件上报成功时回调


lk.connect_async()  # 连接物联网平台
lk.start_worker_loop()  # 保持连接

print("connected")



# 增加while循环的作用:保证物联网平台是连接上之后再进行通信的
while True:
    try:
        msg = input()  # 获取从控制台的输入
    except KeyboardInterrupt:
        sys.exit()
    else:
        if msg == "1":
            lk.disconnect()
        elif msg == "2":
            lk.connect_async()
        elif msg == "3":  # 输入为3时,订阅get这个topic,每个订阅的topic只需订阅一次即可,会在物联网平台的设备topic列表中显示
            rc, mid = lk.subscribe_topic(lk.to_full_topic("user/get"))  # 注意topic只需要写成这样的格式即可,格式需要完全一致,全称会自动补全,不需要输入设备名
            if rc == 0:  # rc返回值为0时则表示订阅成功
                print("subscribe topic success:%r, mid:%r" % (rc, mid))
            else:
                print("subscribe topic fail:%d" % rc)
        elif msg == "4":  # 取消订阅
            rc, mid = lk.unsubscribe_topic(lk.to_full_topic("user/get"))
            if rc == 0:
                print("unsubscribe topic success:%r, mid:%r" % (rc, mid))
            else:
                print("unsubscribe topic fail:%d" % rc)
        elif msg == "5":  # 发布消息“123”给自定义的test这个topic
            rc, mid = lk.publish_topic(lk.to_full_topic("user/test"), "123")
            if rc == 0:
                print("publish topic success:%r, mid:%r" % (rc, mid))
            else:
                print("publish topic fail:%d" % rc)
        elif msg == "6":  # 同时订阅多个topic
            rc, mid = lk.subscribe_topic([(lk.to_full_topic("user/sender"), 1),
                                          (lk.to_full_topic("user/get"), 1),
                                          (lk.to_full_topic("user/test"), 1)])
            if rc == 0:
                print("subscribe multiple topics success:%r, mid:%r" % (rc, mid))
            else:
                print("subscribe multiple topics fail:%d" % rc)
        elif msg == "7":  # 同时取消订阅多个topic
            rc, mid = lk.unsubscribe_topic([lk.to_full_topic("user/get"), lk.to_full_topic("user/test")])
            if rc == 0:
                print("unsubscribe multiple topics success:%r, mid:%r" % (rc, mid))
            else:
                print("unsubscribe multiple topics fail:%d" % rc)
        elif msg == "8":  # RRPC
            rc, mid = lk.subscribe_rrpc_topic("/testA")
            if rc == 0:  # rc返回值为0时则表示订阅成功
                print("subscribe topic success:%r, mid:%r" % (rc, mid))
            else:
                print("subscribe topic fail:%d" % rc)
        elif msg == "11":  # 物模型通信,属性上报
            prop_data = {
                "Test001": "hh",
                "memory_usage": 99
            }
            rc, request_id = lk.thing_post_property(prop_data)
            if rc == 0:
                print("property_data post success:%r, request_id:%r" % (rc, request_id))
            else:
                print("property_data post fail:%d" % rc)
        elif msg == "12":  # 物模型通信,事件1上报
            event_data = {
                "Testdata001": 100
            }
            rc, request_id = lk.thing_trigger_event(("Test001_event",event_data))
            if rc == 0:
                print("event_data post success:%r, request_id:%r" % (rc, request_id))
            else:
                print("event_data post fail:%d" % rc)
        elif msg == "13":  # 物模型通信,事件2上报
            event_data = {
                "Testdata002": 1
            }
            rc, request_id = lk.thing_trigger_event(("Test002_event",event_data))
            if rc == 0:
                print("event_data post success:%r, request_id:%r" % (rc, request_id))
            else:
                print("event_data post fail:%d" % rc)
        elif msg == "98":  # 打印topic列表?
            ret = lk.dump_user_topics()
            print("user topics:%s", str(ret))
        elif msg == "99":  # 断开连接
            lk.destruct()
            print("destructed")

        else:
            sys.exit()



RRPC代码

注意:上面实现的完整代码中rrpc部分和阿里云的文档具有差别,以我这个为准,主要是在回调函数的名称和函数定义是的self上的差别,还要注意订阅rrpc的topic时候的订阅方法和mqtt的订阅方法不一致

阿里云python sdk中的rrpc功能
rrpc在线调用
下面的代码中注意需要加上自定的topic时的格式需要和上面实现代码中的逻辑8中的topic格式一致

!/usr/bin/env python
coding=utf-8
 
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
client = AcsClient('LTAI************XR3', 'Dh******9fw', 'cn-shanghai')
 
request = CommonRequest()
request.setacceptformat('json')
request.set_domain('iot.cn-shanghai.aliyuncs.com')
request.set_method('POST')
request.setprotocoltype('https') # https | http
request.set_version('2018-01-20')
request.setactionname('RRpc')
 
request.addqueryparam('RegionId', "cn-shanghai")
request.addqueryparam('DeviceName', "Test1")
request.addqueryparam('Timeout', "5000")
request.addqueryparam('RequestBase64Byte', "dG*********Gxl")
request.addqueryparam('ProductKey', "a1V************Gk")
request.add_query_param('Topic', "/testA")
 
response = client.do_action(request)
python2:  print(response)
print(str(response, encoding = 'utf-8'))
注意点

1、首先注意要配置物模型文件,即lk.thing_setup(“tsl.json”)
2、注意属性、事件和服务的上报方式各不相同
3、查看上报数据,如下图
阿里云物联网平台最完全的使用教程

四、服务端开发

1、AMQP(https://help.aliyun.com/document_detail/142489.html?spm=a2c4g.11186623.6.623.3e36354e3xozA7)

python3的sdk补充链接

需要注意的是签名的url一栏填写的是物联网平台实例amqp服务链接中除掉amqp://后面的部分,如下图

阿里云物联网平台最完全的使用教程

(1)环境设置(Qpid Proton 0.29.0直接下载地址:下载网站)

1)Ubuntu18,Python2.7,Qpid Proton 0.29.0(当前测试环境,证实可用)

安装教程

2)其他Linux系统下的python2.7和win10下的C++

安装教程

3)测试proton是否可用
import proton;print('%s' % 'SSL present' if proton.SSL.present() else 'SSL NOT AVAILABLE')

(2)实现(https://help.aliyun.com/document_detail/143597.html?spm=a2c4g.11186623.6.626.3800719ftRcJ40)

# encoding=utf-8
import sys
import logging
import time
from proton.handlers import MessagingHandler
from proton.reactor import Container
import hashlib
import hmac
import base64

reload(sys)
sys.setdefaultencoding('utf-8')
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
console_handler = logging.StreamHandler(sys.stdout)

def current_time_millis():
    return str(int(round(time.time() * 1000)))


def do_sign(secret, sign_content):
    m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
    return base64.b64encode(m.digest())


class AmqpClient(MessagingHandler):
    def __init__(self):
        super(AmqpClient, self).__init__()

    def on_start(self, event):
        #  接入域名,请参见AMQP客户端接入说明文档。
        url = "amqps://18************019.iot-amqp.cn-shanghai.aliyuncs.com:5671"
        accessKey = "LTA*****************XR3"
        accessSecret = "Dhc*************Q19fw"
        consumerGroupId = "xoZ***********0100"
        # iotInstanceId:购买的实例请填写实例ID,公共实例请填空字符串""。
        iotInstanceId = ""
        clientId = "test1"
        # 签名方法:支持hmacmd5,hmacsha1和hmacsha256。
        signMethod = "hmacsha1"
        timestamp = current_time_millis()
        # userName组装方法,请参见AMQP客户端接入说明文档。
        userName = clientId + "|authMode=aksign" + ",signMethod=" + signMethod 
                        + ",timestamp=" + timestamp + ",authId=" + accessKey 
                        + ",iotInstanceId=" + iotInstanceId + ",consumerGroupId=" + consumerGroupId + "|"
        signContent = "authId=" + accessKey + "&timestamp=" + timestamp
        # 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
        passWord = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
        conn = event.container.connect(url, user=userName, password=passWord, heartbeat=60)
        self.receiver = event.container.create_receiver(conn)

    # 当连接成功建立被调用。
    def on_connection_opened(self, event):
        logger.info("Connection established, remoteUrl: %s", event.connection.hostname)

    # 当连接关闭时被调用。
    def on_connection_closed(self, event):
        logger.info("Connection closed: %s", self)

    # 当远端因错误而关闭连接时被调用。
    def on_connection_error(self, event):
        logger.info("Connection error")

    # 当建立AMQP连接错误时被调用,包括身份验证错误和套接字错误。
    def on_transport_error(self, event):
        if event.transport.condition:
            if event.transport.condition.info:
                logger.error("%s: %s: %s" % (
                    event.transport.condition.name, event.transport.condition.description,
                    event.transport.condition.info))
            else:
                logger.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description))
        else:
            logging.error("Unspecified transport error")

    # 当收到消息时被调用。
    def on_message(self, event):
        message = event.message
        content = message.body.decode('utf-8')
        topic = message.properties.get("topic")
        message_id = message.properties.get("messageId")
        print("receive message: message_id=%s, topic=%s, content=%s" % (message_id, topic, content))
        event.receiver.flow(1)

Container(AmqpClient()).run()
注意

1、文中格式一定需要保持一致,accessKey、accessSecret注意要保证该RAM账号具有物联网平台权限
url:
阿里云物联网平台最完全的使用教程
consumerGroupId:
阿里云物联网平台最完全的使用教程

2、MNS(https://help.aliyun.com/document_detail/32305.html?spm=a2c4g.11186623.6.624.20b33dc31he3Dc,https://help.aliyun.com/document_detail/68948.html?spm=a2c4g.11186623.6.629.7c8b5608a7Kq8s)

(收费较高不考虑)
下载对应SDK后就在根目录进行调试
若是能够调试出接收消息的代码,请发给我,感谢

#!/usr/bin/env python
#coding=utf8

# Copyright (C) 2015, Alibaba Cloud Computing

#Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

#The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

#THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

import sys
import time
from mns.account import Account
from mns.queue import *
from mns.topic import *
from mns.subscription import *
try:
    import configparser as ConfigParser
except ImportError:
    import ConfigParser as ConfigParser

cfgFN = "sample.cfg"
required_ops = [("Base", "AccessKeyId"), ("Base", "AccessKeySecret"), ("Base", "Endpoint")]
optional_ops = [("Optional", "SecurityToken")]

parser = ConfigParser.ConfigParser()
parser.read(cfgFN)
for sec,op in required_ops:
    if not parser.has_option(sec, op):
        sys.stderr.write("ERROR: need (%s, %s) in %s.n" % (sec,op,cfgFN))
        sys.stderr.write("Read README to get help inforamtion.n")
        sys.exit(1)

#获取配置信息
## AccessKeyId      阿里云官网获取
## AccessKeySecret  阿里云官网获取
## Endpoint         阿里云消息和通知服务官网获取, Example: http://$AccountId.mns.cn-hangzhou.aliyuncs.com
## WARNING: Please do not hard code your accessId and accesskey in next line.(more information: https://yq.aliyun.com/articles/55947)
accessKeyId = parser.get("Base", "AccessKeyId")
accessKeySecret = parser.get("Base", "AccessKeySecret")
endpoint = parser.get("Base", "Endpoint")
securityToken = ""
if parser.has_option("Optional", "SecurityToken") and parser.get("Optional", "SecurityToken") != "$SecurityToken":
    securityToken = parser.get("Optional", "SecurityToken")


#初始化my_account
my_account = Account(endpoint, accessKeyId, accessKeySecret, securityToken)

##############Queue 相关操作#####################
my_queue = my_account.get_queue("MyQueue-%s" % time.strftime("%y%m%d-%H%M%S", time.localtime()))

#创建队列
## message被receive后,持续不可消费的时间   100秒
## message body的最大长度                   10240Byte
## message最长存活时间                      3600秒
## 新message可消费的默认延迟时间            10秒
## receive message时,长轮询时间            20秒
queue_meta = QueueMeta()
queue_meta.set_visibilitytimeout(100)
queue_meta.set_maximum_message_size(10240)
queue_meta.set_message_retention_period(3600)
queue_meta.set_delay_seconds(10)
queue_meta.set_polling_wait_seconds(20)
queue_meta.set_logging_enabled(True)
try:
    queue_url = my_queue.create(queue_meta)
    sys.stdout.write("Create Queue Succeed!nQueueURL:%snn" % queue_url)
except MNSExceptionBase as e:
    sys.stderr.write("Create Queue Fail!nException:%snn" % e)
    sys.exit(1)

#修改队列属性
## message被receive后,持续不可消费的时间   50秒
## message body的最大长度                   5120Byte
## message最长存活时间                      1800秒
## 新message可消费的默认延迟时间            5秒
## receive message时,长轮询时间            10秒
queue_meta = QueueMeta()
queue_meta.set_visibilitytimeout(50)
queue_meta.set_maximum_message_size(5120)
queue_meta.set_message_retention_period(1800)
queue_meta.set_delay_seconds(5)
queue_meta.set_polling_wait_seconds(10)
try:
    queue_url = my_queue.set_attributes(queue_meta)
    sys.stdout.write("Set Queue Attributes Succeed!nn")
except MNSExceptionBase as e:
    sys.stderr.write("Set Queue Attributes Fail!nException:%snn" % e)
    sys.exit(1)

#获取队列属性
## 除可设置属性外,返回如下属性:
## ActiveMessages:      可消费消息数,近似值
## InactiveMessages:   正在被消费的消息数,近似值
## DelayMessages:      延迟消息数,近似值
## CreateTime:         queue创建时间,单位:秒
## LastModifyTime:     修改queue属性的最近时间,单位:秒
try:
    queue_meta = my_queue.get_attributes()
    sys.stdout.write("Get Queue Attributes Succeed! 
                      nQueueName: %snVisibilityTimeout: %s 
                      nMaximumMessageSize: %snDelaySeconds: %s 
                      nPollingWaitSeconds: %snActiveMessages: %s 
                      nInactiveMessages: %snDelayMessages: %s 
                      nCreateTime: %snLastModifyTime: %snn" %
                      (queue_meta.queue_name, queue_meta.visibility_timeout,
                       queue_meta.maximum_message_size, queue_meta.delay_seconds,
                       queue_meta.polling_wait_seconds, queue_meta.active_messages,
                       queue_meta.inactive_messages, queue_meta.delay_messages,
                       queue_meta.create_time, queue_meta.last_modify_time))
except MNSExceptionBase as e:
    sys.stderr.write("Get Queue Attributes Fail!nException:%snn" % e)
    sys.exit(1)

#列出所有队列
## prefix               指定queue name前缀
## ret_number           单次list_queue最大返回队列个数
## marker               list_queue的开始位置; 当一次list queue不能列出所有队列时,返回的next_marker作为下一次list queue的marker参数
try:
    prefix = u""
    ret_number = 10
    marker = u""
    total_qcount = 0
    while(True):
        queue_url_list, next_marker = my_account.list_queue(prefix, ret_number, marker)
        total_qcount += len(queue_url_list)
        for queue_url in queue_url_list:
            sys.stdout.write("QueueURL:%sn" % queue_url)
        if(next_marker == ""):
            break
        marker = next_marker
    sys.stdout.write("List Queue Succeed! Total Queue Count:%s!nn" % total_qcount)
except MNSExceptionBase as e:
    sys.stderr.write("List Queue Fail!nException:%snn" % e)
    sys.exit(1)

#发送消息
## set_delayseconds     设置消息的延迟时间,单位:秒
## set_priority         设置消息的优先级
## 返回如下属性:
## MessageId            消息编号
## MessageBodyMd5       消息正文的MD5值
msg_body = "I am test Message."
message = Message(msg_body)
message.set_delayseconds(0)
message.set_priority(10)
try:
    send_msg = my_queue.send_message(message)
    sys.stdout.write("Send Message Succeed.nMessageBody:%snMessageId:%snMessageBodyMd5:%snn" % (msg_body, send_msg.message_id, send_msg.message_body_md5))
except MNSExceptionBase as e:
    sys.stderr.write("Send Message Fail!nException:%snn" % e)
    sys.exit(1)
本文章来源于互联网,如有侵权,请联系删除!原文地址:阿里云物联网平台最完全的使用教程

相关推荐: 物联网练习题

1. 【判断】NB-IoT的系统带宽是200KHz。 对 2. 【单选】以下哪个不属于物联网操作系统? D.STM32 3. 【单选】以下选项中,哪项不属于HuaweiLiteOS中间框架? D.开放API 4. 【单选】物联网中,eSIM卡和vSIM卡,可通…