This document describes cell 0.0. For development docs, go here.
Adder¶
Actor that can add one to a given number. Adder actor can be used to implement a Counter.
from kombu import Connection
connection = Connection()
agent = dAgent(connection)
class Adder(Actor):
class state():
def add_one(self, i):
print 'Increasing %s with 1' % i
return i + 1
if __name__=='__main__':
import examples.adder
adder = agent.spawn(Adder)
adder.call('add-one', {'i':10})
Chat-users¶
from cell.actors import Actor
from cell.agents import dAgent
connection = Connection()
class User(Actor):
class state():
def post(self, msg):
print msg
def post(self, msg):
msg = 'Posting on the wall: %s' % msg
self.scatter('post', {'msg': msg})
def message_to(self, actor, msg):
a = User(id = actor, connection = self.connection)
msg = 'Actor %s is sending you a message: %s' %(self.id, msg)
a.call('post', {'msg':msg})
def connect(self):
if not agent:
agent = dAgent(self.connection)
return self.agent.spawn(self)
if __name__=='__main__':
import examples.chat
rumi = examples.chat.User(connection).connect()
rumi.post('Hello everyone')
ask = examples.chat.User(connection).connect()
ask.post('Hello everyone')
rumi.message_to(ask.id, 'How are you?')
ask.message_to(rumi.id, 'Fine.You?')
Map-reduce¶
import celery
from cell.actors import Actor
from cell.agents import dAgent
my_app = celery.Celery(broker='pyamqp://guest@localhost//')
agent = dAgent(connection=my_app.broker_connection())
class Aggregator(Actor):
def __init__(self, barrier=None, **kwargs):
self.barrier = barrier
super(Aggregator, self).__init__(**kwargs)
class state(Actor.state):
def __init__(self):
self.result = {}
super(Aggregator.state, self).__init__()
def aggregate(self, words):
for word, n in words.iteritems():
self.result.setdefault(word, 0)
self.result[word] += n
self.actor.barrier -= 1
if self.actor.barrier <= 0:
self.print_result()
def print_result(self):
for (key, val) in self.result.iteritems():
print "%s:%s" % (key, val)
class Reducer(Actor):
class state(Actor.state):
def __init__(self):
self.aggregator = None
super(Reducer.state, self).__init__()
def on_agent_ready(self):
self.aggregator = Aggregator(connection=self.actor.connection)
def count_lines(self, line, aggregator):
words = {}
for word in line.split(" "):
words.setdefault(word, 0)
words[word] += 1
self.aggregator.id = aggregator
self.aggregator.call('aggregate', {'words': words})
def on_agent_ready(self):
self.state.on_agent_ready()
class Mapper(Actor):
class state(Actor.state):
REDUCERS = 10
def on_agent_ready(self):
self.pool = []
for i in range(self.REDUCERS):
reducer = self.actor.agent.spawn(Reducer)
self.pool.append(reducer)
def count_document(self, file):
with open(file) as f:
lines = f.readlines()
count = 0
self.aggregator = agent.spawn(Aggregator, barrier=len(lines))
for line in lines:
reducer = self.pool[count % self.REDUCERS]
reducer.cast('count_lines',
{'line': line,
'aggregator': self.aggregator.id})
def on_agent_ready(self):
self.state.on_agent_ready()
if __name__ == '__main__':
import examples.map_reduce
file = "map_reduce_test.txt"
mapper = agent.spawn(examples.map_reduce.Mapper)
mapper.call('count_document', {'file': file})