This commit is contained in:
@@ -73,9 +73,16 @@ func runSingleWorker(ctx context.Context, rdb *redis.Client, consumerID int, sc
|
||||
log.Error().Err(err).Msg("Failed to unmarshal payload")
|
||||
continue
|
||||
}
|
||||
storageKeys := make([]string, len(data))
|
||||
for i, item := range data {
|
||||
storageKeys[i] = item.StorageKey
|
||||
storageKeys := make([]string, 0, len(data))
|
||||
for _, item := range data {
|
||||
if item != nil && item.StorageKey != "" {
|
||||
storageKeys = append(storageKeys, item.StorageKey)
|
||||
}
|
||||
}
|
||||
if len(storageKeys) == 0 {
|
||||
log.Info().Str("worker", consumerName).Msg("No valid storage keys found in bulk delete payload")
|
||||
rdb.XAck(ctx, constants.StreamStorageName, constants.GroupStorageName, message.ID)
|
||||
continue
|
||||
}
|
||||
log.Info().
|
||||
Str("worker", consumerName).
|
||||
|
||||
Reference in New Issue
Block a user