nodejs消费rabbitmq队列消息 nodejs
index.js
var amqp = require('amqplib/callback_api');
const MyConsume = require('./MyConsume');
amqp.connect('amqp://name:password!@localhost:5672/vhost', function (error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function (error1, channel) {
if (error1) {
throw error1;
}
var exchangeName = 'my_topic'; // 你的 exchange 名称
var exchangeType = 'topic'; // exchange 类型,可以是 direct、fanout、topic 等
var routingKey = 'my_routing_key'; // 你的 routing key
var queueName = 'my_queue'; // 你的队列名称
channel.assertExchange(exchangeName, exchangeType, { durable: true }, function(error2, ok) {
if (error2) {
throw error2;
}
console.log('Exchange ' + exchangeName + ' is ready');
// 声明一个队列
channel.assertQueue(queueName, { durable: true }, function(error3, q) {
if (error3) {
throw error3;
}
console.log('Queue ' +queueName + ' is created');
// 将队列绑定到 exchange,并指定 routing key
channel.bindQueue(queueName, exchangeName, routingKey);
console.log('Queue ' + queueName + ' is bound to Exchange ' + exchangeName + ' with routing key ' + routingKey);
// 在这里开始消费消息
channel.consume(queueName, function(msg) {
MyConsume.handleMessage(msg,channel);
}, {
noAck: false
});
});
});
});
});
MyConsume.js
const { UserModel } = require('./UserModel');
function handleMessage(msg,channel) {
UserModel.create({
user_name: 'Example Bookmark',
url: 'http://www.example.com',
type_id: 1,
order: 1,
is_recommend: true,
status: true
}).then(bookmark => {
console.log('New bookmark created:');
console.log(new Date());
}).catch(error => {
console.error('Error creating bookmark:', error);
});
// 在这里可以添加你的其他处理逻辑
channel.ack(msg)
}
module.exports = {
handleMessage: handleMessage
};
使用Mysql连接池,开10个mysql连接,消费3万rabbitmq消息,每条消息的逻辑是往mysql数据表写入一条数据,30秒写入完.
以上nodejs代码的写法我用php实现不到.哪怕是用swoole.如何用swoole来实现请高人指教.
Nodejs Http 阻塞业务接口 压测 nodejs
const http = require('http');
// 创建一个 HTTP 服务器
const server = http.createServer((req, res) => {
// 设置响应头
res.writeHead(200, {'Content-Type': 'application/json'});
// 定义接口路径
if (req.url === '/index') {
// 模拟返回的数据
const data = {
message: 'Hello, this is the API response!'
};
var currentTime = new Date();
data.message=currentTime;
// 将数据转换为 JSON 格式并发送响应
res.end(JSON.stringify(data));
} else if (req.url === '/google') {
const options = {
hostname: 'cn.bing.com',
path:'/HPImageArchive.aspx?format=js&idx=0&n=1',
port: 80,
method: 'GET',
timeout:1000,
};
// 模拟返回的数据
fetchDataFromServer(options, (error, data) => {
if (error) {
res.writeHead(200, {'Content-Type': 'application/json'});
res.end(JSON.stringify(error));
} else {
const googleData = {
message: data
};
res.writeHead(200, {'Content-Type': 'application/json'});
res.end(JSON.stringify(googleData));
}
});
} else {
// 处理未知路径的请求
res.writeHead(200, {'Content-Type': 'application/json'});
res.end('Not Found');
}
});
function fetchDataFromServer(options, callback) {
const req = http.request(options, (res) => {
let data = '';
// 接收响应数据
res.on('data', (chunk) => {
data += chunk;
});
// 响应完成后处理数据
res.on('end', () => {
// 调用回调函数并传递获取的数据
callback(null, data);
});
});
// 处理请求错误
req.on('error', (e) => {
// 调用回调函数并传递错误信息
callback(e, null);
});
// 结束请求
req.end();
}
// 监听端口
const port = 3000;
server.listen(port, () => {
console.log(`Server is running on http://localhost:${port}`);
});
阻塞接口压测
wrk -c 100 -t 10 -d 15s http://127.0.0.1:3000/google
QPS几十
同时压测普通接口
wrk -c 100 -t 10 -d 10s http://127.0.0.1:3000/index
QPS 3万左右,没有明显影响.
golang的面向对象
php 代码
<?php
class Cat {
private $age;
private $color;
public function __construct($age, $color) {
$this->age = $age;
$this->color = $color;
}
public function getAge() {
return $this->age;
}
public function getColor() {
return $this->color;
}
public function eat() {
echo "Cat is eating.\n";
}
public function sleep() {
echo "Cat is sleeping.\n";
}
}
$myCat = new Cat(3, 'brown');
echo "Age: " . $myCat->getAge() . "\n";
echo "Color: " . $myCat->getColor() . "\n";
$myCat->eat();
$myCat->sleep();
?>
golang
package main
import "fmt"
// Cat ç»æä½
type Cat struct {
age int
color string
}
func NewCat(age int, color string) *Cat {
return &Cat{age, color}
}
func (c *Cat) GetAge() int {
return c.age
}
func (c *Cat) GetColor() string {
return c.color
}
func (c *Cat) Eat() {
fmt.Println("Cat is eating.")
}
func (c *Cat) Sleep() {
fmt.Println("Cat is sleeping.")
}
func main() {
myCat := NewCat(3, "brown")
fmt.Println("Age:", myCat.GetAge())
fmt.Println("Color:", myCat.GetColor())
myCat.Eat()
myCat.Sleep()
}
xxl-job-admin 大量定时任务导致linux inodes满了
日志
/var/www/java/xxl-job
删除文件
php rabbitmq 队列持久化,消息持久化 php
ubuntu停止rabbitmq
service rabbitmq-server stop
ubuntu启动rabbitmq
service rabbitmq-server start
queue队列持久化
$channel->queue_declare($queue, false, true, false, false);
durable = true 队列持久化
message消息持久化
$msg = new AMQPMessage('Hello World!'.$time, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
重启 rabbitmq-server
queue和ready message依然存在
云服务器放通内网 网络
安全组设置
放通内网
来源 端口 策略 备注
10.0.0.0/8 ALL 允许 放通内网
172.16.0.0/12 ALL 允许 放通内网
192.168.0.0/16 ALL 允许 放通内网
放通80端口给所有外网访问
来源 端口 策略 备注
ipv4
0.0.0.0/0 TCP:80 允许 web
ipv6
::/0 TCP:80 允许 web
gpt解释
172.16.0.0/12 是一个私有的 IPv4 地址范围1。在这个表示法中,/12 表示网络号位数23。这意味着前12位是网络号,其余的是主机号2。这个范围的地址是 172.16.0.0 到 172.31.255.25524。这些地址通常被用于内部网络,不会被路由器转发到互联网上1。这是由 RFC(由很多专家制定的规范标准)指定的2。具体来说,172.16.0.0/12 的子网掩码是 255.240.0.034。这个子网掩码是通过取二进制表示的前12位为1,其余位为0,然后转换为十进制得到的3。例如,二进制的 11111111.11110000.00000000.00000000 转换为十进制就是 255.240.0.03。这样,我们就可以知道一个 IP 地址是否在这个范围内1。希望这个解释对你有所帮助!
centos8安装多版本php,remi-php php
安装
https://itlanyan.com/install-newest-php7-and-php8-with-remi-repo/
完全卸载php
https://www.cnblogs.com/NeigeBolg/p/13395128.html
php配置位置
/etc/opt/remi/php73/
软连接 php -v
ln -s /opt/remi/php82/root/usr/bin/php /usr/bin/php
ln -s /opt/remi/php82/root/usr/bin/php-cgi /usr/bin/php-cgi
ln -s /opt/remi/php82/root/usr/bin/phar.phar /usr/bin/php.phar
启动 对应版本php-fpm
service php73-php-fpm restart
swoole的协程能否减少单个数据库语句查询的时间呢? php
问: 如果一个接口需要查询数据库并返回数据.而这个数据库查询语句需要1秒时间.那么swoole的协程能否减少请求时间呢?
答:
Swoole的协程可以帮助提高并发处理能力,但它并不能直接减少单个数据库查询的执行时间。如果你的数据库查询需要1秒钟,那么无论是否使用协程,这个查询本身的执行时间都是1秒钟。
然而,Swoole的协程可以在等待这个查询结果的同时,去执行其他的任务。例如,如果你有10个这样的查询需要执行,而且这些查询可以并行执行,那么使用Swoole的协程,理论上你可以在大约1秒钟的时间内完成所有的查询,因为这些查询是同时进行的。而如果不使用协程,你可能需要10秒钟的时间,因为每个查询都需要等待上一个查询完成后才能开始。
所以,虽然Swoole的协程不能减少单个查询的执行时间,但它可以通过并行处理来提高整体的处理能力,从而在处理多个查询时,可以显著减少总的等待时间。
golang 处理http请求 并发测试 golang
目的测试慢接口是否会阻塞服务,导致其他正常接口无法请求.
测试代码
package main
import (
"fmt"
"net/http"
"time"
)
func helloHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello, World")
}
func sleepHandler(w http.ResponseWriter, r *http.Request) {
time.Sleep(2 * time.Second)
fmt.Fprintf(w, "you are sleeping")
}
func main() {
http.HandleFunc("/", helloHandler)
http.HandleFunc("/sleep", sleepHandler)
fmt.Println("Starting server at port 8081")
if err := http.ListenAndServe(":8081", nil); err != nil {
fmt.Println("Failed to start server:", err)
}
}
方法如下:
A接口sleep2秒,
B接口hello world,
单独压测hello word接口
wrk -c 1000 -t 10 -d 10s http://127.0.0.1:8081
QPS 315572.06
同时压测 sleep接口 和 hello world接口
wrk -c 1000 -t 10 -d 15s http://127.0.0.1:8081/sleep
wrk -c 1000 -t 10 -d 10s http://127.0.0.1:8081
sleep接口 QPS 463.59 全部timeout
hello world接口 QPS 329326.68 几乎没有被sleep接口影响.
同样的测试,来测试php-fpm,大概就知道php-fpm有什么性能上的问题.
还有人在说什么 php项目的瓶颈是数据库性能,之类.
用了swoole框架,哪怕某些接口数据库查询慢,也就单单是那接口慢,不会影响到其他的接口
swoole框架真的比php-fpm要高并发吗? php
之前写了一篇文章,做了一个粗疏的测试.来证明,在慢数据库查询下,swoole跟php-fpm的并发数一样地少.相差无几.经过网友的讨论和指教.我发现我的结论是错误的且有可能误导其他开发者.
所以现在找一个更加贴近现实的生产环境的测试方法.这里先说结论,swoole是真正能解决php项目性能差,并发低的方案之一.
这里首先要说php-fpm的多进程模式,php-fpm的处理方法.只会一个请求一个请求地处理.怎么证明这个事情?可以写一个sleep(30)秒的接口.开2个静态进程.快速请求两次.然后再去请求其他没有阻塞的接口.看看是不是连其他正常的接口也请求不了.如果是,就能说明php-fpm的处理方法.
但php-fpm的进程是独立.严格上说一个进程慢,需要时间处理,是不会影响其他进程的响应时间的.你可以这样试,同样开两个php-fpm进程,请求一次sleep(30)秒的接口.然后同时压测另外正常的接口.QPS还是有可能上千的.但如果慢的接口的请求把所有的php-fpm进程都占用了,就会影响都其他正常接口的响应速度.出现类似 "阻塞" 的现象.
这个情况也一般php项目速度慢的情况.我认为在介绍,swoole的好处时候.说php-fpm开销大,每次载入框架代码这些其实意义不大.因为一个接口用php-fpm写响应速度是 100ms.换成swoole框架能提高 到80ms.意义不是特别大.
压测方法,
写两个接口,一个等待一秒返回,另一个hello world.同时压测.这样的目的是模拟生产环境.有个别的接口特别慢.拖慢了整个项目的请求速度.
php-fpm sleep代码
sleep(1)
hyperf sleep1秒接口
try {
$Client= new Client();
$Client->get("https://www.google.com",["timeout"=>1]);
}catch (\Exception $exception ){
}
hello world代码
echo "hello word"
thinkphp8 php-fpm 静态模式 2个进程
单独压测hello world接口
wrk -c 100 -t 10 -d 10s http://tp8.localhost/index/index
QPS 1871
sleep接口和hello world接口,同时压.
wrk -c 10 -t 10 -d 10s http://tp8.localhost/sleep/index/
wrk -c 100 -t 10 -d 10s http://tp8.localhost/index/index
hello world接口 QPS 19.97
hyperf 开2个进程
hyperf 不用sleep模拟慢接口,用http请求来模拟.
hello world接口单独压测
wrk -c 100 -t 10 -d 10s http://127.0.0.1:9501/index
QPS 58576
sleep接口和hello world接口,同时压.
wrk -c 10 -t 10 -d 10s http://127.0.0.1:9501/sleep
wrk -c 100 -t 10 -d 10s http://127.0.0.1:9501/index
hello world接口 QPS 56826
hello world接口几乎没有受慢接口影响
得出结论
单个接口请求,php-fpm跟swoole也许有10倍的差距.但是这个不是重点.重点是采用swoole框架写的项目整体不会被慢的接口拖慢.这个比较有意义.
这里说的swoole性能比php-fpm强10倍.不是说一个接口用来用php-fpm请求时间是1秒,改用了swoole框架就变成了0.1秒.
而是说某些情况.swoole的并发数可以比php-fpm高10倍.
为什么php-fpm会出现"阻塞"的情况.和什么情况下才会出现阻塞.
php-fpm进程数是20,一个sleep(1)的接口,每秒并发请求数为20.这样就会把20个进程用占用.而其他接口的请求就会出现"阻塞"的情况.因为没有进程可用了,但如果并发是10去请求慢接口.还有剩下10个进程去处理其他接口的请求.这时候就不会出现"阻塞".
google请求解释
$Client->get("https://www.google.com",["timeout"=>1]);
解释一下请求google地址,在墙内一定是请求不到的.设置1秒超时.为了代替php-fpm的sleep1秒.