Home > Net >  Testing Nats subscription in Go
Testing Nats subscription in Go

Time:08-05

I have a function designed to listen to a Nats subject and route the messages as it receives them:

func (conn *JetStreamConnection) SubscribeMultiple(ctx context.Context, subject string,
    subscribers ...*SubscriptionCallback) error {

    callbacks := make(map[string]func(*pnats.NatsMessage) (func(context.Context), error))
    for _, subscriber := range subscribers {
        callbacks[subscriber.Category] = subscriber.Callback
    }

    fullSubject := fmt.Sprintf("%s.*", subject)
    sub, err := conn.context.SubscribeSync(fullSubject, nats.Context(ctx))
    if err != nil {
        return err
    }

loop:
    for {

        select {
        case <-ctx.Done():
            break loop
        default:
        }

        msg, err := sub.NextMsgWithContext(ctx)
        if err != nil {
            return err
        }

        msg.InProgress()

        var message pnats.NatsMessage
        if err := conn.unmarshaller(msg.Data, &message); err != nil {
            msg.Term()
            return err
        }

        actualSubject := fmt.Sprintf("%s.%s", subject, message.Context.Category)
        subscriber, ok := callbacks[message.Context.Category]
        if !ok {
            msg.Nak()
            continue
        }

        callback, err := subscriber(&message)
        if err == nil {
            msg.Ack()
        } else {
            msg.Nak()
            return err
        }

        callback(ctx)
    }

    if err := sub.Unsubscribe(); err != nil {
        return err
    }

    return nil
}

My problem is that, since the SubscribeSync function produces a *nats.Subscription object, I have no way to mock out the test. How can I test around this object?

CodePudding user response:

You can put your loop in a separate function. This func can accept an interface that describes nats Subscription instead of *nats.Subscription. This way you will be able to create Subscription mocks with gomock or other tools. After that you can test the inside func separately

Something like this:

func (conn *JetStreamConnection) SubscribeMultiple(ctx context.Context, subject string,
    subscribers ...*SubscriptionCallback) error {

    callbacks := make(map[string]func(*pnats.NatsMessage) (func(context.Context), error))
    for _, subscriber := range subscribers {
        callbacks[subscriber.Category] = subscriber.Callback
    }

    fullSubject := fmt.Sprintf("%s.*", subject)
    sub, err := conn.context.SubscribeSync(fullSubject, nats.Context(ctx))
    if err != nil {
        return err
    }

    return run(ctx, sub)
}

//go:generate mockgen -source conn.go -destination ../mocks/conn.go -package mocks
type ISubscription interface{
    NextMsgWithContext(ctx context.Context) (*nats.Msg, error)
    Unsubscribe() error
}

func (conn *JetStreamConnection) run(ctx context.Context, sub ISubscription) error {
loop:
    for {

        select {
        case <-ctx.Done():
            break loop
        default:
        }

        msg, err := sub.NextMsgWithContext(ctx)
        if err != nil {
            return err
        }

        msg.InProgress()

        var message pnats.NatsMessage
        if err := conn.unmarshaller(msg.Data, &message); err != nil {
            msg.Term()
            return err
        }

        actualSubject := fmt.Sprintf("%s.%s", subject, message.Context.Category)
        subscriber, ok := callbacks[message.Context.Category]
        if !ok {
            msg.Nak()
            continue
        }

        callback, err := subscriber(&message)
        if err == nil {
            msg.Ack()
        } else {
            msg.Nak()
            return err
        }

        callback(ctx)
    }

    if err := sub.Unsubscribe(); err != nil {
        return err
    }
}

upd: if you still want to test SubscribeMultiple, you can create a Runner that will have only one func Run and take it as dependency for JetStreamConnection. Again, you can create a mock for Runner and test with it

  • Related