I have a function designed to listen to a Nats subject and route the messages as it receives them:
func (conn *JetStreamConnection) SubscribeMultiple(ctx context.Context, subject string,
subscribers ...*SubscriptionCallback) error {
callbacks := make(map[string]func(*pnats.NatsMessage) (func(context.Context), error))
for _, subscriber := range subscribers {
callbacks[subscriber.Category] = subscriber.Callback
}
fullSubject := fmt.Sprintf("%s.*", subject)
sub, err := conn.context.SubscribeSync(fullSubject, nats.Context(ctx))
if err != nil {
return err
}
loop:
for {
select {
case <-ctx.Done():
break loop
default:
}
msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
return err
}
msg.InProgress()
var message pnats.NatsMessage
if err := conn.unmarshaller(msg.Data, &message); err != nil {
msg.Term()
return err
}
actualSubject := fmt.Sprintf("%s.%s", subject, message.Context.Category)
subscriber, ok := callbacks[message.Context.Category]
if !ok {
msg.Nak()
continue
}
callback, err := subscriber(&message)
if err == nil {
msg.Ack()
} else {
msg.Nak()
return err
}
callback(ctx)
}
if err := sub.Unsubscribe(); err != nil {
return err
}
return nil
}
My problem is that, since the SubscribeSync
function produces a *nats.Subscription
object, I have no way to mock out the test. How can I test around this object?
CodePudding user response:
You can put your loop in a separate function. This func can accept an interface that describes nats Subscription
instead of *nats.Subscription
. This way you will be able to create Subscription
mocks with gomock or other tools. After that you can test the inside func separately
Something like this:
func (conn *JetStreamConnection) SubscribeMultiple(ctx context.Context, subject string,
subscribers ...*SubscriptionCallback) error {
callbacks := make(map[string]func(*pnats.NatsMessage) (func(context.Context), error))
for _, subscriber := range subscribers {
callbacks[subscriber.Category] = subscriber.Callback
}
fullSubject := fmt.Sprintf("%s.*", subject)
sub, err := conn.context.SubscribeSync(fullSubject, nats.Context(ctx))
if err != nil {
return err
}
return run(ctx, sub)
}
//go:generate mockgen -source conn.go -destination ../mocks/conn.go -package mocks
type ISubscription interface{
NextMsgWithContext(ctx context.Context) (*nats.Msg, error)
Unsubscribe() error
}
func (conn *JetStreamConnection) run(ctx context.Context, sub ISubscription) error {
loop:
for {
select {
case <-ctx.Done():
break loop
default:
}
msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
return err
}
msg.InProgress()
var message pnats.NatsMessage
if err := conn.unmarshaller(msg.Data, &message); err != nil {
msg.Term()
return err
}
actualSubject := fmt.Sprintf("%s.%s", subject, message.Context.Category)
subscriber, ok := callbacks[message.Context.Category]
if !ok {
msg.Nak()
continue
}
callback, err := subscriber(&message)
if err == nil {
msg.Ack()
} else {
msg.Nak()
return err
}
callback(ctx)
}
if err := sub.Unsubscribe(); err != nil {
return err
}
}
upd: if you still want to test SubscribeMultiple
, you can create a Runner that will have only one func Run
and take it as dependency for JetStreamConnection. Again, you can create a mock for Runner
and test with it