GoFrame 框架中使用 mqtt
// SPDX-License-Identifier: MIT // SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co // SPDX-FileContributor: mochi-co package mqtt import ( "flag" "gfmqtt/internal/mqtt/app" "github.com/mochi-mqtt/server/v2/hooks/auth" "github.com/mochi-mqtt/server/v2/hooks/storage/badger" "github.com/mochi-mqtt/server/v2/packets" "log" "math" "os" "os/signal" "syscall" mqttLib "github.com/mochi-mqtt/server/v2" "github.com/mochi-mqtt/server/v2/listeners" ) func Server() { tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener") wsAddr := flag.String("ws", ":1882", "network address for Websocket listener") infoAddr := flag.String("info", ":1881", "network address for web info dashboard listener") flag.Parse() // 创建信号用于等待服务端关闭信号 sigs := make(chan os.Signal, 1) done := make(chan bool, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigs done <- true }() // 创建新的 MQTT 服务器。 //server := mqttLib.New(nil) server := mqttLib.New(&mqttLib.Options{ //Capabilities: mqttLib.NewDefaultServerCapabilities(), Capabilities: &mqttLib.Capabilities{ MaximumMessageExpiryInterval: 60 * 60 * 24, // maximum message expiry if message expiry is 0 or over MaximumClientWritesPending: 1024 * 8, // maximum number of pending message writes for a client MaximumSessionExpiryInterval: math.MaxUint32, // maximum number of seconds to keep disconnected sessions MaximumPacketSize: 0, // no maximum packet size //maximumPacketID: math.MaxUint16, ReceiveMaximum: 1024, // maximum number of concurrent qos messages per client MaximumInflight: 1024 * 8, // maximum number of qos > 0 messages can be stored TopicAliasMaximum: math.MaxUint16, // maximum topic alias value SharedSubAvailable: 1, // shared subscriptions are available MinimumProtocolVersion: 3, // minimum supported mqtt version (3.0.0) MaximumQos: 2, // maximum qos value available to clients RetainAvailable: 1, // retain messages is available WildcardSubAvailable: 1, // wildcard subscriptions are available SubIDAvailable: 1, // subscription identifiers are available }, ClientNetWriteBufferSize: 4096, ClientNetReadBufferSize: 4096, SysTopicResendInterval: 10, InlineClient: true, }) // 允许所有连接(权限)。 _ = server.AddHook(new(auth.AllowHook), nil) _ = server.AddHook(new(app.MyHook), nil) _ = server.AddHook(new(badger.Hook), &badger.Options{ Path: "aa.txt", }) // 在标1883端口上创建一个 TCP 服务端。 tcp := listeners.NewTCP("t1", *tcpAddr, nil) err := server.AddListener(tcp) if err != nil { log.Fatal(err) } ws := listeners.NewWebsocket("ws1", *wsAddr, nil) err = server.AddListener(ws) if err != nil { log.Fatal(err) } stats := listeners.NewHTTPStats("stats", *infoAddr, nil, server.Info) err = server.AddListener(stats) if err != nil { log.Fatal(err) } go func() { err := server.Serve() if err != nil { log.Fatal(err) } }() callbackFn := func(cl *mqttLib.Client, sub packets.Subscription, pk packets.Packet) { server.Log.Info("inline client received message from subscription", "client", cl.ID, "subscriptionId", sub.Identifier, "topic", pk.TopicName, "payload", string(pk.Payload)) } server.Subscribe("result", 1, callbackFn) // 服务端等待关闭信号 <-done // 关闭服务端时需要做的一些清理工作 server.Log.Warn("caught signal, stopping...") _ = server.Close() server.Log.Info("main.go finished") }
hook 处理
package app import ( "bytes" "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/frame/g" mqtt "github.com/mochi-mqtt/server/v2" "github.com/mochi-mqtt/server/v2/packets" "log" ) type MyHook struct { mqtt.HookBase } // ID returns the ID of the hook. func (h *MyHook) ID() string { return "OnPacketRead" } // Provides indicates which hook methods this hook provides. func (h *MyHook) Provides(b byte) bool { return bytes.Contains([]byte{ mqtt.OnPacketRead, }, []byte{b}) } // OnPacketRead 测试 func (h *MyHook) OnPacketRead(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) { //log.Println(pk.Payload) log.Println(string(pk.Payload)) g.Dump(gjson.New(pk.Payload).String()) g.Dump(pk.FixedHeader.Qos) //cl.WritePacket("hello") return pk, nil }
在 internal/cmd/cmd.go
package cmd import ( "context" "gfmqtt/internal/mqtt" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/ghttp" "github.com/gogf/gf/v2/os/gcmd" "gfmqtt/internal/controller/hello" ) var ( Main = gcmd.Command{ Name: "main", Usage: "main", Brief: "start http server", Func: func(ctx context.Context, parser *gcmd.Parser) (err error) { go mqtt.Server() s := g.Server() s.Group("/", func(group *ghttp.RouterGroup) { group.Middleware(ghttp.MiddlewareHandlerResponse) group.Bind( hello.NewV1(), ) }) s.Run() return nil }, } )