Toggle navigation
首页
[
Markdown
]
```
$> ./nsqlookupd
$> ./nsqd --lookupd-tcp-address=127.0.0.1:4160
$> ./nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1 -tcp-address=127.0.0.1:4154 -http-address="0.0.0.0:4155" --data-path=/data/nsqdata
$> ./nsqadmin --lookupd-http-address=127.0.0.1:4161
```

php-nsq pub

```php
$nsqd_addr = array(
    "127.0.0.1:4150",
    "127.0.0.1:4154"
);

$nsq = new Nsq();
$is_true = $nsq->connect_nsqd($nsqd_addr);

for($i = 0; $i < 20; $i++){
    $nsq->publish("test", "nihao");
}
```

php-nsq 延时pub

```php
$deferred = new Nsq();
$isTrue = $deferred->connectNsqd($nsqdAddr);

for($i = 0; $i < 20; $i++){
    $deferred->deferredPublish("test", "message daly", 3000); // 第三值默认范围 millisecond default : [0 < millisecond < 3600000] ,可以更改 上面已提到
}
```

php-nsq sub

```php
$nsq_lookupd = new NsqLookupd("127.0.0.1:4161"); //the nsqlookupd http addr
$nsq = new Nsq();
$config = array(
    "topic" => "test",
    "channel" => "struggle",
    "rdy" => 2, //optional , default 1
    "connect_num" => 1, //optional , default 1
    "retry_delay_time" => 5000, //optional, default 0 , after 5000 msec, message will be retried
);

$nsq->subscribe($nsq_lookupd, $config, function($msg,$bev){
    echo $msg->payload;
    echo $msg->attempts;
    echo $msg->message_id;
    echo $msg->timestamp;
});
```

go client pub

```go
package main

import (
    "github.com/nsqio/go-nsq"
)

var producer *nsq.Producer

func main() {
    nsqd := "127.0.0.1:4150"
    producer, err := nsq.NewProducer(nsqd, nsq.NewConfig())
    producer.Publish("test", []byte("nihao"))
    if err != nil {
        panic(err)
    }
}
```

go client sub

```go
package main

import (
    "fmt"
    "sync"
    "github.com/nsqio/go-nsq"
)

type NSQHandler struct {}

func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
    fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
    return nil
}

func testNSQ() {
    waiter := sync.WaitGroup{}
    waiter.Add(1)

    go func() {
        defer waiter.Done()
        config:=nsq.NewConfig()
        config.MaxInFlight=9

        //建立多个连接
        for i := 0; i<10; i++ {
            consumer, err := nsq.NewConsumer("test", "struggle", config)
            if nil != err {
                fmt.Println("err", err)
                return
            }

            consumer.AddHandler(&NSQHandler{})
            err = consumer.ConnectToNSQD("127.0.0.1:4150")
            if nil != err {
                fmt.Println("err", err)
                return
            }
        }
        select{}
    }()

    waiter.Wait()
}

func main() {
    testNSQ();
}
```
[
Html
]