Skip to content

Commit 5267762

Browse files
authored
Add aliyun MNS queue support. (#19)
* Add aliyun MNS queue support. * Update mns queue: support credential refresh. * Add region in option. * Add examples for MNS queue. * Fix: do not return err when receive messages in empty queue. * Update comment.
1 parent 8a915d9 commit 5267762

File tree

6 files changed

+615
-12
lines changed

6 files changed

+615
-12
lines changed

cloud/common.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,24 @@ const (
2020
StandaloneRedisProviderV7 Provider = "standalone_redis_v7"
2121
ClusterRedisProviderV7 Provider = "cluster_redis_v7"
2222
AliCloudStorageProvider Provider = "alicloud_storage"
23+
AliCloudMNSQueueProvider Provider = "alicloud_mns_queue"
24+
)
25+
26+
type AliCloudCredentialType string
27+
28+
const (
29+
AliCloudAccessKeyCredentialType AliCloudCredentialType = "access_key"
30+
AliCloudECSRamRoleCredentialType AliCloudCredentialType = "ecs_ram_role"
31+
AliCloudOIDCRoleARNCredentialType AliCloudCredentialType = "oidc_role_arn"
32+
)
33+
34+
const (
35+
AliCloudEnvAccessKeyID = "ALIBABA_CLOUD_ACCESS_KEY_ID"
36+
AliCloudEnvAccessKeySecret = "ALIBABA_CLOUD_ACCESS_KEY_SECRET"
37+
AliCloudEnvRoleArn = "ALIBABA_CLOUD_ROLE_ARN"
38+
AliCloudEnvOIDCProviderArn = "ALIBABA_CLOUD_OIDC_PROVIDER_ARN"
39+
AliCloudEnvOIDCTokenFile = "ALIBABA_CLOUD_OIDC_TOKEN_FILE"
40+
AliCloudEnvRoleSessionName = "ALIBABA_CLOUD_ROLE_SESSION_NAME"
2341
)
2442

2543
var (

cloud/examples/queue/queue.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,24 @@ func main() {
7070
},
7171
}
7272
queue_examples("redis_cluster_queue_v7", clusterRedisQueueOptionV7)
73+
74+
// alicloud MNS queue: access_key CredentialType
75+
mnsQueueOption := queue.AliMNSClientOption{
76+
EndPoint: "http://account-id.mns.region.aliyuncs.com",
77+
CredentialType: cloud.AliCloudAccessKeyCredentialType,
78+
AccessKeyId: "alicloud_access_key_id",
79+
AccessKeySecret: "alicloud_access_key_secret",
80+
MessagePriority: 10,
81+
}
82+
queue_examples("mns_queue_name", mnsQueueOption)
83+
84+
// alicloud MNS queue: ecs_ram_role credentialType
85+
mnsQueueOption = queue.AliMNSClientOption{
86+
EndPoint: "http://account-id.mns.region.aliyuncs.com",
87+
CredentialType: cloud.AliCloudECSRamRoleCredentialType,
88+
}
89+
queue_examples("mns_queue_name", mnsQueueOption)
90+
alicloud_mns_queue_examples("mns_queue_name", mnsQueueOption)
7391
}
7492

7593
func queue_examples(queueOrTopicName string, option cloud.Option) {
@@ -125,3 +143,62 @@ func queue_examples(queueOrTopicName string, option cloud.Option) {
125143
fmt.Printf("ack message %s\n", message.Body())
126144
}
127145
}
146+
147+
// The following examples show mns queue specific examples: How to set message priority; how to set long polling period seconds.
148+
func alicloud_mns_queue_examples(queueOrTopicName string, option cloud.Option) {
149+
service, err := queue.GetQueueService(queueOrTopicName, option)
150+
if err != nil {
151+
fmt.Printf("get queue service error %s %+v %s\n", queueOrTopicName, option, err)
152+
return
153+
}
154+
defer service.Close()
155+
fmt.Printf("get service %+v\n", service)
156+
157+
producer, err := service.CreateProducer()
158+
if err != nil {
159+
fmt.Printf("create producer error %s\n", err)
160+
return
161+
}
162+
defer producer.Close()
163+
164+
consumer, err := service.CreateConsumer()
165+
if err != nil {
166+
fmt.Printf("create consumer error %s\n", err)
167+
return
168+
}
169+
defer consumer.Close()
170+
171+
ts := int(time.Now().Unix())
172+
var messages []string
173+
for i := 0; i < 3; i++ {
174+
messages = append(messages, fmt.Sprintf("message %d", ts+i))
175+
}
176+
for _, message := range messages {
177+
// rewrite message priority
178+
ctx := context.WithValue(context.TODO(), queue.ContextKeyAliMNSMessagePriority, 5)
179+
err = producer.SendMessage(ctx, message)
180+
if err != nil {
181+
fmt.Printf("producer send message error %s", err)
182+
return
183+
}
184+
fmt.Printf("producer send message %s\n", message)
185+
}
186+
// rewrite long polling period
187+
ctx := context.WithValue(context.TODO(), queue.ContextKeyAliMNSLongPollingWaitSeconds, 10)
188+
receivedMsgs, err := consumer.ReceiveMessages(ctx, 10)
189+
if err != nil {
190+
fmt.Printf("receive messages error %s", err)
191+
return
192+
}
193+
for _, message := range receivedMsgs {
194+
fmt.Printf("received message %s\n", message.Body())
195+
}
196+
for _, message := range receivedMsgs {
197+
err := consumer.AckMessage(context.TODO(), message)
198+
if err != nil {
199+
fmt.Printf("ack message error %s %s\n", message.Body(), err)
200+
return
201+
}
202+
fmt.Printf("ack message %s\n", message.Body())
203+
}
204+
}

0 commit comments

Comments
 (0)