GoFrame 框架中使用 mqtt 协议通讯

 Golang  2024-03-30  admin  733  989

GoFrame 框架中使用 mqtt

// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co

package mqtt

import (
	"flag"
	"gfmqtt/internal/mqtt/app"
	"github.com/mochi-mqtt/server/v2/hooks/auth"
	"github.com/mochi-mqtt/server/v2/hooks/storage/badger"
	"github.com/mochi-mqtt/server/v2/packets"
	"log"
	"math"
	"os"
	"os/signal"
	"syscall"

	mqttLib "github.com/mochi-mqtt/server/v2"
	"github.com/mochi-mqtt/server/v2/listeners"
)

func Server() {
	tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener")
	wsAddr := flag.String("ws", ":1882", "network address for Websocket listener")
	infoAddr := flag.String("info", ":1881", "network address for web info dashboard listener")
	flag.Parse()

	// 创建信号用于等待服务端关闭信号
	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		done <- true
	}()

	// 创建新的 MQTT 服务器。
	//server := mqttLib.New(nil)
	server := mqttLib.New(&mqttLib.Options{
		//Capabilities:             mqttLib.NewDefaultServerCapabilities(),
		Capabilities: &mqttLib.Capabilities{
			MaximumMessageExpiryInterval: 60 * 60 * 24,   // maximum message expiry if message expiry is 0 or over
			MaximumClientWritesPending:   1024 * 8,       // maximum number of pending message writes for a client
			MaximumSessionExpiryInterval: math.MaxUint32, // maximum number of seconds to keep disconnected sessions
			MaximumPacketSize:            0,              // no maximum packet size
			//maximumPacketID:              math.MaxUint16,
			ReceiveMaximum:         1024,           // maximum number of concurrent qos messages per client
			MaximumInflight:        1024 * 8,       // maximum number of qos > 0 messages can be stored
			TopicAliasMaximum:      math.MaxUint16, // maximum topic alias value
			SharedSubAvailable:     1,              // shared subscriptions are available
			MinimumProtocolVersion: 3,              // minimum supported mqtt version (3.0.0)
			MaximumQos:             2,              // maximum qos value available to clients
			RetainAvailable:        1,              // retain messages is available
			WildcardSubAvailable:   1,              // wildcard subscriptions are available
			SubIDAvailable:         1,              // subscription identifiers are available
		},
		ClientNetWriteBufferSize: 4096,
		ClientNetReadBufferSize:  4096,
		SysTopicResendInterval:   10,
		InlineClient:             true,
	})

	// 允许所有连接(权限)。
	_ = server.AddHook(new(auth.AllowHook), nil)
	_ = server.AddHook(new(app.MyHook), nil)

	_ = server.AddHook(new(badger.Hook), &badger.Options{
		Path: "aa.txt",
	})

	// 在标1883端口上创建一个 TCP 服务端。
	tcp := listeners.NewTCP("t1", *tcpAddr, nil)
	err := server.AddListener(tcp)
	if err != nil {
		log.Fatal(err)
	}

	ws := listeners.NewWebsocket("ws1", *wsAddr, nil)
	err = server.AddListener(ws)
	if err != nil {
		log.Fatal(err)
	}

	stats := listeners.NewHTTPStats("stats", *infoAddr, nil, server.Info)
	err = server.AddListener(stats)
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		err := server.Serve()
		if err != nil {
			log.Fatal(err)
		}
	}()

	callbackFn := func(cl *mqttLib.Client, sub packets.Subscription, pk packets.Packet) {
		server.Log.Info("inline client received message from subscription", "client", cl.ID, "subscriptionId", sub.Identifier, "topic", pk.TopicName, "payload", string(pk.Payload))
	}
	server.Subscribe("result", 1, callbackFn)

	// 服务端等待关闭信号
	<-done

	// 关闭服务端时需要做的一些清理工作
	server.Log.Warn("caught signal, stopping...")
	_ = server.Close()
	server.Log.Info("main.go finished")

}

hook 处理

package app

import (
    "bytes"
    "github.com/gogf/gf/v2/encoding/gjson"
    "github.com/gogf/gf/v2/frame/g"
    mqtt "github.com/mochi-mqtt/server/v2"
    "github.com/mochi-mqtt/server/v2/packets"
    "log"
)

type MyHook struct {
    mqtt.HookBase
}

// ID returns the ID of the hook.
func (h *MyHook) ID() string {
    return "OnPacketRead"
}

// Provides indicates which hook methods this hook provides.
func (h *MyHook) Provides(b byte) bool {
    return bytes.Contains([]byte{
       mqtt.OnPacketRead,
    }, []byte{b})
}

// OnPacketRead 测试
func (h *MyHook) OnPacketRead(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
    //log.Println(pk.Payload)
    log.Println(string(pk.Payload))
    g.Dump(gjson.New(pk.Payload).String())
    g.Dump(pk.FixedHeader.Qos)

    //cl.WritePacket("hello")
    return pk, nil
}

在 internal/cmd/cmd.go

package cmd

import (
    "context"
    "gfmqtt/internal/mqtt"
    "github.com/gogf/gf/v2/frame/g"
    "github.com/gogf/gf/v2/net/ghttp"
    "github.com/gogf/gf/v2/os/gcmd"

    "gfmqtt/internal/controller/hello"
)

var (
    Main = gcmd.Command{
       Name:  "main",
       Usage: "main",
       Brief: "start http server",
       Func: func(ctx context.Context, parser *gcmd.Parser) (err error) {
          go mqtt.Server()

          s := g.Server()
          s.Group("/", func(group *ghttp.RouterGroup) {
             group.Middleware(ghttp.MiddlewareHandlerResponse)
             group.Bind(
                hello.NewV1(),
             )
          })

          s.Run()
          return nil
       },
    }
)


如果文章对您有帮助,点击下方的广告,支持一下作者吧!

相关推荐


esp32 使用mqtt 通讯

esp32 使用mqtt 通讯基于go的mqtt 客户端packagemain import( &quot;fmt&quot; &quot;github.com/gogf/gf/v2/encoding/gjson&quot; &quot;github.com/gogf/gf/v2/frame/g&quot; &quot;gobot.io/x/gobot&quot; &qu

workerman是一个高性能的PHP socket 服务器框架

Workerman是一款开源高性能异步PHP socket即时通讯框架。支持高并发,超高稳定性,被广泛的用于手机app、移动通讯,微信小程序,手游服务端、网络游戏、PHP聊天室、硬件通讯、智能家居、车联网、物联网等领域的开发。 支持TCP长连接,支持Websocket、HTTP等协议,支持自定义协议。拥有异步Mysql、异步Redis、异步Http、MQTT物联网客户端、异步消息队列等众多高性能组