$> ./nsqlookupd
$> ./nsqd --lookupd-tcp-address=127.0.0.1:4160
$> ./nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1 -tcp-address=127.0.0.1:4154 -http-address="0.0.0.0:4155" --data-path=/data/nsqdata
$> ./nsqadmin --lookupd-http-address=127.0.0.1:4161
php-nsq pub
$nsqd_addr = array(
"127.0.0.1:4150",
"127.0.0.1:4154"
);
$nsq = new Nsq();
$is_true = $nsq->connect_nsqd($nsqd_addr);
for($i = 0; $i < 20; $i++){
$nsq->publish("test", "nihao");
}
php-nsq 延时pub
$deferred = new Nsq();
$isTrue = $deferred->connectNsqd($nsqdAddr);
for($i = 0; $i < 20; $i++){
$deferred->deferredPublish("test", "message daly", 3000); // 第三值默认范围 millisecond default : [0 < millisecond < 3600000] ,可以更改 上面已提到
}
php-nsq sub
$nsq_lookupd = new NsqLookupd("127.0.0.1:4161"); //the nsqlookupd tcp addr
$nsq = new Nsq();
$config = array(
"topic" => "test",
"channel" => "struggle",
"rdy" => 2, //optional , default 1
"connect_num" => 1, //optional , default 1
"retry_delay_time" => 5000, //optional, default 0 , after 5000 msec, message will be retried
);
$nsq->subscribe($nsq_lookupd, $config, function($msg,$bev){
echo $msg->payload;
echo $msg->attempts;
echo $msg->message_id;
echo $msg->timestamp;
});
go client pub
package main
import (
"github.com/nsqio/go-nsq"
)
var producer *nsq.Producer
func main() {
nsqd := "127.0.0.1:4150"
producer, err := nsq.NewProducer(nsqd, nsq.NewConfig())
producer.Publish("test", []byte("nihao"))
if err != nil {
panic(err)
}
}
go client sub
package main
import (
"fmt"
"sync"
"github.com/nsqio/go-nsq"
)
type NSQHandler struct {}
func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
return nil
}
func testNSQ() {
waiter := sync.WaitGroup{}
waiter.Add(1)
go func() {
defer waiter.Done()
config:=nsq.NewConfig()
config.MaxInFlight=9
//建立多个连接
for i := 0; i<10; i++ {
consumer, err := nsq.NewConsumer("test", "struggle", config)
if nil != err {
fmt.Println("err", err)
return
}
consumer.AddHandler(&NSQHandler{})
err = consumer.ConnectToNSQD("127.0.0.1:4150")
if nil != err {
fmt.Println("err", err)
return
}
}
select{}
}()
waiter.Wait()
}
func main() {
testNSQ();
}
标签: nsq, go, golang