python amqp

python-mesa:Agent Based Model 简单教程 教程链接:https://mesa.readthedocs.io/en/latest/tutorials/intro_tutorial.html Mesa是用于构建,分析和可视化基于代理的模型(Agent Based Model)的模块化框架。

python-mesa:Agent Based Model 简单教程

教程链接:https://mesa.readthedocs.io/en/latest/tutorials/intro_tutorial.html

Mesa是用于构建,分析和可视化基于代理的模型(Agent Based Model)的模块化框架。

基于代理的模型是一种计算机模拟,涉及多个实体(代理),这些实体根据它们的编程行为互相作用和交互。代理可以用来代表活细胞,动物,个人,甚至整个组织或抽象实体。有时,我们可能会对系统中各个组件的行为有所认识,并希望研究各个部分在系统整体层面上产生的行为和效果;而在其他时候,我们可能希望研究系统的整体行为。
它适合研究各种模型仿真主体(代理人,也就是Agent)的合作与竞争关系。

安装:

pip install mesa

示例1:随机发红包游戏

假设有一个群,里面有十个人,游戏规则是这样的:
每个人初始有一元钱,游戏开始后,每轮每人随机选定群里的一个人(可以是自己)发一个1元红包。如果钱数为0,本轮就不用发。
游戏进行10轮,试着用代码描述这样的过程。
过程有三步:设置模型,添加调度器,进行仿真。
仿真主要有Agent和Model两类,一般在Agent中用面向对象的方式定义不同Agent中的竞争合作关系;而Model类用来管理若干个Agent,

本文完整代码链接:https://gitee.com/hzy15610046011/python_code_collection/tree/master/%E7%AE%97%E6%B3%95%E4%B8%8E%E5%BB%BA%E6%A8%A1/%E4%BB%BF%E7%9C%9F/mesa

from mesa import Agent, Model
from mesa.time import RandomActivation


class MoneyAgent(Agent):
    "带有固定初始财富的代理人Agent。"

    def __init__(self, id, model):
        super().__init__(id, model)  # 自身的id和归属的模型。
        self.wealth = 1

    def step(self):
        if (self.wealth == 0):
            return
        otherAgent = self.random.choice(self.model.schedule.agents)  
        # 在模型的多个Agent中,随机选出一个Agent
        otherAgent.wealth += 1  # 给他一块钱的红包。相当于是蒙特卡洛仿真一下。当然,过程结束之后谁得到多少钱是随机的。
        self.wealth -= 1

class MoneyModel(Model):
# 管理代理人的模型。
    def __init__(self, N):
        self.agentNum = N
        self.schedule = RandomActivation(self)
        # 创建Agents
        for i in range(self.agentNum):
            a = MoneyAgent(i, self)
            self.schedule.add(a)

    def step(self):
        self.schedule.step()


最后调用matplotlib进行绘图。绘出的图像是每位代理人的财富(纵轴)与人数(横轴)关系的直方图。

import matplotlib.pyplot as plt

agent_wealth = [a.wealth for a in model.schedule.agents]
plt.hist(agent_wealth)

这是输出的直方图
如果将这样的游戏反复进行一百次,每次游戏进行十轮。将每次游戏后每个人的钱数看做观察的样本,那么应该会产生100次×10人=1000个样本。这一千个样本中,金钱是怎样分布的?

我们不妨按照程序编一下,这部分完整代码见链接。
本文完整代码链接:https://gitee.com/hzy15610046011/python_code_collection/tree/master/%E7%AE%97%E6%B3%95%E4%B8%8E%E5%BB%BA%E6%A8%A1/%E4%BB%BF%E7%9C%9F/mesa
一千个样本频数分布。可见大概40%的人亏到0了
上面就是一千个样本频数分布。可见大概40%的人亏到0了

示例2:基于MESA的Web可视化模块的排队论模型(原创)

mesa库与netlogo类似,在内部的基础上,还有一个创建仿真界面的功能。这个仿真界面前端使用web技术,后端是基于python的tornado开发的,可以说是适应了Web技术大行其道的潮流吧。

当然这种技术也有缺点。缺点之一就是python和javascript的速度都不怎么样,比起基于java的netlogo还是力有不逮;其次,网络通信的速度也一般,最高仿真速率只有20fps,也就是说一秒钟刷新20次。

不多说了,先上代码吧。

from mesa import Agent
from mesa import Model
from mesa.datacollection import DataCollector
from mesa.space import Grid
from mesa.time import RandomActivation
import numpy as np
from mesa.visualization.modules import CanvasGrid, ChartModule, PieChartModule, TextElement
from mesa.visualization.ModularVisualization import ModularServer
from mesa.visualization.UserParam import UserSettableParameter
import mesa
print(mesa.__file__)
# 可以修改服务器函数中的一些功能.
class StatusElement(TextElement):
    '''
    显示当前服务的状态。.
    '''

    def __init__(self):
        pass

    def render(self, model):
        return "目前已经服务的人数: " + str(model.count_type(model, 'served'))


class QueueModel(Model):
    """
    一个简单的排队论模型。
    """

    def __init__(self, height=100, width=100, meanServiceTime=10, mtba=10):  # mtba的意思是平均到达的时间间隔
        # Initialize model parameters
        self.height = height
        self.width = width
        self.mtba = mtba
        self.meanServiceTime = meanServiceTime
        self.customerNum = 0
        self.interval = 0

        # Set up model objects
        self.schedule = RandomActivation(self)
        self.grid = Grid(width, height, torus=False)

        self.datacollector = DataCollector(
            {"serving": lambda m: self.count_type(m, "serving"),
             "served": lambda m: self.count_type(m, "served"),
             "waiting": lambda m: self.count_type(m, "waiting")})

        self.running = True
        self.datacollector.collect(self)
        
    def generateCustomer(self):

        c = Customer(self.customerNum, self, meanServiceTime=self.meanServiceTime)
        self.schedule.add(c)
        self.grid.place_agent(c, (0, 0))
        self.customerNum += 1
            
    
    def step(self):
        
        #for i in range(10):
        self.schedule.step()
        self.datacollector.collect(self)
        if (self.interval <= 0):  # 如果产生顾客的间隔归零
            self.generateCustomer()  # 就产生一个顾客
            self.interval = np.random.poisson(lam=self.mtba)
        else:
            self.interval -= 1

    @staticmethod
    def count_type(model, customer_condition):
        """
        Helper method to count trees in a given condition in a given model.
        """
        count = 0
        for customer in model.schedule.agents:
            if customer.condition == customer_condition:
                count += 1
        return count


class Customer(Agent):

    def __init__(self, pos, model, meanServiceTime):  # mtba是顾客到达的平均时间间隔(暂时是服从简单的random分布)

        super().__init__(pos, model)
        self.model = model
        self.pos = pos
        self.condition = "waiting"  # "serving","waiting","served"
        self.serviceTime = np.random.poisson(lam=meanServiceTime)  # 这一步是服务时间,可以做成随机生成的!

    def step(self):
        """
        If the tree is on fire, spread it to fine trees nearby.
        """
        if self.condition == "waiting":

            if (self.checkNextAgent() == False):  # 如果下个单元格没有顾客占用的话
                self.move(dx=1, dy=0)
                if self.pos[0] == 18:
                    self.condition = "serving"
        elif self.condition == "serving":
            self.serviceTime -= 1
            if (self.serviceTime <= 0):
                self.condition = "served"
                self.model.grid.remove_agent(self)

    def checkNextAgent(self):  # 检测下一个单元格是否有顾客占用。
        absoPos = self.convertAbsoPos(1, 0)
        nextAgent = self.model.grid.get_cell_list_contents([absoPos])
        if nextAgent == []:
            return False
        else:
            return True

    def convertAbsoPos(self, dx: int, dy: int):  # 输入相对这个Agent的位置返回绝对位置
        x, y = self.pos[0] + dx, self.pos[1] + dy
        width = self.model.grid.width
        if (x >= width):
            x -= width
        elif x < 0:
            x += width

        if (y >= width):
            y -= width
        elif y < 0:
            y += width

        return (x, y)

    def move(self, dx: int, dy: int):
        new_position = (self.pos[0] + dx, self.pos[1] + dy)  # 向右为正,向上为正。
        self.model.grid.move_agent(self, new_position)

    def get_pos(self):
        return self.pos


COLORS = {"serving": "#00AA00",
          "waiting": "#880000",
          "served": "#000000"}


def serviceTablePotrayal(customer):
    if customer is None:
        return
    
    portrayal = {"Shape": "rect", "w": 1, "h": 1, "Filled": "true", "Layer": 0}
    
    (x, y) = customer.get_pos()
    portrayal["x"] = x
    portrayal["y"] = y
    portrayal["Color"] = COLORS[customer.condition]
    return portrayal


height = 1
width = 20
canvas_element = CanvasGrid(serviceTablePotrayal, width, height, width * 20, height * 20)
customer_chart = ChartModule([{"Label": label, "Color": color} for (label, color) in COLORS.items()])
statusElement = StatusElement()
pie_chart = PieChartModule([{"Label": label, "Color": color} for (label, color) in COLORS.items()])
model_params = {
    "height": height,
    "width": width,
    "mtba": UserSettableParameter("slider", "顾客到达的平均时间(MTBA)", 10, 1, 20, 1),
    "meanServiceTime": UserSettableParameter("slider", "处理每位顾客要求所需的时间", 10, 1, 20, 1)
}
server = ModularServer(QueueModel, [canvas_element, customer_chart, statusElement], "Forest Fire", model_params)
server.launch()

# --------------------------------以下被注释掉的内容是不启动图形化界面也可以运行的。特点是仿真速度非常快。
#
# model= QueueModel(meanServiceTime=5,mtba=4.5)
# time = 0
# while(1):
#     time+=1
#     model.step()
#     print(time,"served: " + str(model.count_type(model,'served')),"waiting: " + str(model.count_type(model,'waiting')))

首先看原理。
QueueModel这个模型就是仿真模型了,而Agent就是Customer这个类。这些东西和示例一相同。

数据收集器

然后就是datacollector——这个数据收集器是做啥的呢?当然是收集当前的局部变量用的!定义顾客有三种状态,serving:正在被服务,served已经接受过服务,waiting:正在排队。就这三种状态。而datacollector的目的就是统计每种状态对应的人数:排队的,已经服务完了的,还有正在服务的。

然后写一下count_type方法,这个方法用于统计处于各种条件下的顾客人数。

排队本身的问题

在模型方面,写一下Model中的generateCustomer这个方法,用于生成顾客。生成顾客时,同时生成一个服从泊松分布的随机数self.interval。每次step的时候,随机数减一;减到零的时候,生成下一个顾客。

每个step中,若前方无人,则顾客向服务台运动一个单位。

着色与控件

定义网格画布控件:

def serviceTablePotrayal(tree):
    if tree is None:
        return
    portrayal = {"Shape": "rect", "w": 1, "h": 1, "Filled": "true", "Layer": 0}
    (x, y) = tree.get_pos()
    portrayal["x"] = x
    portrayal["y"] = y
    portrayal["Color"] = COLORS[tree.condition]
    return portrayal

height = 1
width = 20
canvas_element = CanvasGrid(serviceTablePotrayal, width, height, width * 20, height * 20)

首先分析ServiceTablePortrayal这个函数。这个函数中定义了portrayal,也就是每个色块(类似于netlogo中的turtle,作为可视化仿真的主体)的形状以及大小。比如图中的就是。形状为矩形、宽度高度都为1、有填充、位于0图层。
然后就是设置y和x的坐标代码了。显而易见调用了customer.get_pos()这个方法获取了坐标,然后刷新customer对应的色块的位置。

然后再分析一下画布的宽高数据。输入的width和height是有多少个格子的意思,而再往后的两个参数我输入的分别是width20和height20,这是啥意思呢?这个的意思是每个方格的宽度和高度,都是20像素!所以整个画布占用的像素点就是400*20。

插入图表

教程中提到过插入图表,于是就插入吧。具体插入方法由于我暂时也不太理解原理,所以在这里不讲,大概是在代码的160行左右插入了两个图表。删除图表之后其实也是可以运行的。

运行效果与速度优化

于是开始点击运行,效果如下:
在这里插入图片描述

这样自然可以运行,但是美中不足的就是最高帧率才20fps——和netlogo比起来岂不是要慢的螺旋升天了?不过不要急,办法还是有的。

python和java在速度上的差距或许很难弥补回来,但是在网络通信方面还是可以优化的。

为什么要限制在20fps?我读了一下源码,大概意思是这样的:Web页面的js脚本每隔一段时间就向服务端发送一个请求,这个间隔时间就是fps的倒数。想象一下一秒钟发20个请求就已经很了不起了吧——如果一秒钟几百几千个请求,岂不是要上天?

没事没事!有办法!

原理:当服务端接收到前端的请求后,驱动Model(目前的Model是QueueModel)的step方法执行一次。

那好办,只要是重写一个函数,让step多执行几次不就是了?

原来的step

    def step(self):
        
        #for i in range(10):
        self.schedule.step()
        self.datacollector.collect(self)
        if (self.interval <= 0):  # 如果产生顾客的间隔归零
            self.generateCustomer()  # 就产生一个顾客
            self.interval = np.random.poisson(lam=self.mtba)
        else:
            self.interval -= 1

改成下面这样:

    def step(self):
        for i in range(10):
            self.baseStep()
            
    
    def baseStep(self):
        
        #for i in range(10):
        self.schedule.step()
        self.datacollector.collect(self)
        if (self.interval <= 0):  # 如果产生顾客的间隔归零
            self.generateCustomer()  # 就产生一个顾客
            self.interval = np.random.poisson(lam=self.mtba)
        else:
            self.interval -= 1

这样一看,岂不是加速了十倍?不错不错!

加速效果:(由于执行时服务人数已经较多,所以看上去图线变化并不明显。但是如果看一下“已经服务的人数”这一栏中数字的变化,就能看出十分明显的差别!!)
在这里插入图片描述

当然要注意,加速的倍数不要太大,否则运算一次step的时间会长于两次请求之间的间隔,直接影响就是卡顿。我的电脑cpu是i5-6200u,当每秒刷新率为3fps时实测最大的加速倍数约为150~200倍,此时差不多能保证模型的流畅性。如果刷新率取最高200fps,恐怕还需要再降低一些。

知秋君
上一篇 2024-07-28 14:02
下一篇 2024-07-28 13:36

相关推荐