Skip to content
Permalink
9bfb9ba527
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
91 lines (76 sloc) 2.92 KB
parser = require "./parser"
Events = require "./Events"
Scripts = require "./Scripts"
# Redis connection used by limiters backed by the "redis" datastore.
#
# Holds two clients: `@client` for regular commands and `@subscriber`
# (a duplicate of `@client`) for pub/sub. `@ready` is a promise that resolves
# with `{ client, subscriber }` once both clients are ready and every script
# listed in `Scripts.names` has been loaded.
class RedisConnection
  datastore: "redis"

  # Option defaults handed to `parser.load` together with the caller's options.
  defaults:
    Redis: null
    clientOptions: {}
    client: null
    Promise: Promise
    Events: null

  # options - object whose recognised keys are the ones listed in `defaults`.
  constructor: (options={}) ->
    parser.load options, @defaults, @

    # The require call is obfuscated on purpose, or else Webpack/Angular will
    # try to inline the optional redis module. To override this behavior:
    # pass the redis module to Bottleneck as the 'Redis' option.
    @Redis ?= eval("require")("redis")
    @Events ?= new Events @
    @terminated = false

    @client ?= @Redis.createClient @clientOptions
    @subscriber = @client.duplicate()
    @limiters = {}
    @shas = {}

    # Both clients must be ready before the scripts are loaded.
    bothConnected = @Promise.all [@_setup(@client, false), @_setup(@subscriber, true)]
    scriptsLoaded = bothConnected.then => @_loadScripts()
    @ready = scriptsLoaded.then => { client: @client, subscriber: @subscriber }

  # Wires the event listeners of one client.
  #
  # client - the redis client to prepare.
  # sub    - truthy when the client is the pub/sub subscriber; incoming
  #          messages are then forwarded to the limiter registered for the
  #          channel, if there is one.
  #
  # Returns a promise that resolves once the client reports being ready.
  _setup: (client, sub) ->
    # 0 presumably lifts the listener cap (Node EventEmitter semantics).
    client.setMaxListeners 0
    new @Promise (resolve, reject) =>
      client.on "error", (err) => @Events.trigger "error", err
      if sub
        client.on "message", (channel, message) =>
          limiter = @limiters[channel]
          if limiter?
            limiter._store.onMessage channel, message
      if client.ready
        resolve()
      else
        client.once "ready", resolve

  # Sends the payload of script `name` through SCRIPT LOAD and remembers the
  # first reply in `@shas[name]`.
  #
  # Returns a promise for that first reply; rejects when exec reports an error.
  _loadScript: (name) ->
    new @Promise (resolve, reject) =>
      source = Scripts.payload name
      transaction = @client.multi [["script", "load", source]]
      transaction.exec (err, replies) =>
        if err?
          reject err
        else
          @shas[name] = replies[0]
          resolve replies[0]

  # Loads every script named in `Scripts.names`.
  _loadScripts: ->
    loading = Scripts.names.map (scriptName) => @_loadScript scriptName
    @Promise.all loading

  # Waits for the connection to be ready, then runs the single command `cmd`
  # inside a MULTI via `exec_atomic`.
  #
  # Returns a promise for the first reply; rejects when an error is reported.
  __runCommand__: (cmd) ->
    await @ready
    new @Promise (resolve, reject) =>
      @client.multi([cmd]).exec_atomic (err, replies) ->
        if err?
          reject err
        else
          resolve replies[0]

  # Subscribes to both channels of `instance` and registers it in `@limiters`
  # under each channel name once the matching "subscribe" event arrives.
  #
  # Returns a promise that resolves when both subscriptions are confirmed.
  __addLimiter__: (instance) ->
    channels = [instance.channel(), instance.channel_client()]
    subscriptions = channels.map (channel) =>
      new @Promise (resolve, reject) =>
        onSubscribed = (subscribedTo) =>
          # "subscribe" events for other channels are not ours to handle.
          if subscribedTo is channel
            @subscriber.removeListener "subscribe", onSubscribed
            @limiters[channel] = instance
            resolve()
        # The listener has to be in place before the subscription is requested.
        @subscriber.on "subscribe", onSubscribed
        @subscriber.subscribe channel
    @Promise.all subscriptions

  # Unsubscribes from both channels of `instance` (skipped once the connection
  # has been terminated) and removes the matching `@limiters` entries.
  #
  # Returns a promise that settles when both channels have been processed.
  __removeLimiter__: (instance) ->
    channels = [instance.channel(), instance.channel_client()]
    @Promise.all channels.map (channel) =>
      if not @terminated
        await new @Promise (resolve, reject) =>
          @subscriber.unsubscribe channel, (err, unsubscribedFrom) ->
            if err?
              reject err
            else if unsubscribedFrom is channel
              resolve()
      delete @limiters[channel]

  # Builds the argument list for running script `name`:
  # the stored sha, the number of keys, the keys, then `args` and `cb`.
  __scriptArgs__: (name, id, args, cb) ->
    scriptKeys = Scripts.keys name, id
    leading = [@shas[name], scriptKeys.length]
    leading.concat scriptKeys, args, cb

  # Returns the client's `evalsha` bound to the client. `name` is not used.
  __scriptFn__: (name) ->
    @client.evalsha.bind @client

  # Stops the heartbeat interval of every registered limiter, forgets the
  # limiters, marks the connection as terminated and ends both clients.
  #
  # flush - forwarded to `end` on both clients (defaults to true).
  #
  # Returns an already-resolved promise.
  disconnect: (flush=true) ->
    for channel in Object.keys @limiters
      clearInterval @limiters[channel]._store.heartbeat
    @limiters = {}
    @terminated = true

    @client.end flush
    @subscriber.end flush
    @Promise.resolve()
# The connection class is this module's only export.
module.exports = RedisConnection