nodejs消费rabbitmq队列消息 nodejs

index.js

var amqp = require('amqplib/callback_api');

const MyConsume = require('./MyConsume');

amqp.connect('amqp://name:password!@localhost:5672/vhost', function (error0, connection) {
    if (error0) {
        throw error0;
    }
    connection.createChannel(function (error1, channel) {
        if (error1) {
            throw error1;
        }

        var exchangeName = 'my_topic'; // 你的 exchange 名称
        var exchangeType = 'topic'; // exchange 类型,可以是 direct、fanout、topic 等
        var routingKey = 'my_routing_key'; // 你的 routing key
        var queueName = 'my_queue'; // 你的队列名称

        channel.assertExchange(exchangeName, exchangeType, { durable: true }, function(error2, ok) {
            if (error2) {
                throw error2;
            }
            console.log('Exchange ' + exchangeName + ' is ready');

            // 声明一个队列
            channel.assertQueue(queueName, { durable: true  }, function(error3, q) {
                if (error3) {
                    throw error3;
                }
                console.log('Queue ' +queueName + ' is created');

                // 将队列绑定到 exchange,并指定 routing key
                channel.bindQueue(queueName, exchangeName, routingKey);
                console.log('Queue ' + queueName + ' is bound to Exchange ' + exchangeName + ' with routing key ' + routingKey);

                // 在这里开始消费消息
                channel.consume(queueName, function(msg) {
                  MyConsume.handleMessage(msg,channel);
                }, {
                    noAck: false
                });
            });
        });
    });
});

MyConsume.js

const { UserModel } = require('./UserModel');

function handleMessage(msg,channel) {
    UserModel.create({
        user_name: 'Example Bookmark',
        url: 'http://www.example.com',
        type_id: 1,
        order: 1,
        is_recommend: true,
        status: true
      }).then(bookmark => {
        console.log('New bookmark created:');
        console.log(new Date());
      }).catch(error => {
        console.error('Error creating bookmark:', error);
      });
    // 在这里可以添加你的其他处理逻辑
    channel.ack(msg)
}

module.exports = {
    handleMessage: handleMessage
};

使用Mysql连接池,开10个mysql连接,消费3万rabbitmq消息,每条消息的逻辑是往mysql数据表写入一条数据,30秒写入完.

以上nodejs代码的写法我用php实现不到.哪怕是用swoole.如何用swoole来实现请高人指教.

标签: rabbitmq

xiang 发布于  2024-1-29 10:06 

Nodejs Http 阻塞业务接口 压测 nodejs

const http = require('http');

// 创建一个 HTTP 服务器
const server = http.createServer((req, res) => {
  // 设置响应头
  res.writeHead(200, {'Content-Type': 'application/json'});

  // 定义接口路径
  if (req.url === '/index') {
    // 模拟返回的数据
    const data = {
      message: 'Hello, this is the API response!'
    };

    var currentTime = new Date();

    data.message=currentTime;

    // 将数据转换为 JSON 格式并发送响应
    res.end(JSON.stringify(data));
  } else if (req.url === '/google') {
    const options = {
        hostname: 'cn.bing.com',
        path:'/HPImageArchive.aspx?format=js&idx=0&n=1',
        port: 80,
        method: 'GET',
        timeout:1000,
      };

    // 模拟返回的数据
     fetchDataFromServer(options, (error, data) => {
        if (error) {
            res.writeHead(200, {'Content-Type': 'application/json'});
            res.end(JSON.stringify(error));

        } else {
            const googleData = {
                message: data
              };
            res.writeHead(200, {'Content-Type': 'application/json'});
            res.end(JSON.stringify(googleData));

        }
      });

  } else {
    // 处理未知路径的请求
    res.writeHead(200, {'Content-Type': 'application/json'});
    res.end('Not Found');
  }
});

function fetchDataFromServer(options, callback) {
    const req = http.request(options, (res) => {
      let data = '';

      // 接收响应数据
      res.on('data', (chunk) => {
        data += chunk;
      });

      // 响应完成后处理数据
      res.on('end', () => {
        // 调用回调函数并传递获取的数据
        callback(null, data);
      });
    });

    // 处理请求错误
    req.on('error', (e) => {
      // 调用回调函数并传递错误信息
      callback(e, null);
    });

    // 结束请求
    req.end();
  }

// 监听端口
const port = 3000;
server.listen(port, () => {
  console.log(`Server is running on http://localhost:${port}`);
});

阻塞接口压测

wrk -c 100 -t 10 -d 15s http://127.0.0.1:3000/google

QPS几十

同时压测普通接口

wrk -c 100 -t 10 -d 10s http://127.0.0.1:3000/index

QPS 3万左右,没有明显影响.


xiang 发布于  2024-1-29 10:06 

golang的面向对象

php 代码

<?php

class Cat {
    private $age;
    private $color;

    public function __construct($age, $color) {
        $this->age = $age;
        $this->color = $color;
    }

    public function getAge() {
        return $this->age;
    }

    public function getColor() {
        return $this->color;
    }

    public function eat() {
        echo "Cat is eating.\n";
    }

    public function sleep() {
        echo "Cat is sleeping.\n";
    }
}

$myCat = new Cat(3, 'brown');
echo "Age: " . $myCat->getAge() . "\n";
echo "Color: " . $myCat->getColor() . "\n";
$myCat->eat();
$myCat->sleep();

?>

golang

package main

import "fmt"

// Cat 结构体
type Cat struct {
    age   int
    color string
}

func NewCat(age int, color string) *Cat {
    return &Cat{age, color}
}

func (c *Cat) GetAge() int {
    return c.age
}

func (c *Cat) GetColor() string {
    return c.color
}

func (c *Cat) Eat() {
    fmt.Println("Cat is eating.")
}

func (c *Cat) Sleep() {
    fmt.Println("Cat is sleeping.")
}

func main() {

    myCat := NewCat(3, "brown")
    fmt.Println("Age:", myCat.GetAge())
    fmt.Println("Color:", myCat.GetColor())
    myCat.Eat()
    myCat.Sleep()
}

xiang 发布于  2024-1-19 22:57 

xxl-job-admin 大量定时任务导致linux inodes满了

日志
/var/www/java/xxl-job

删除文件


xiang 发布于  2024-1-16 11:18 

php rabbitmq 队列持久化,消息持久化 php

ubuntu停止rabbitmq

service rabbitmq-server stop

ubuntu启动rabbitmq

service rabbitmq-server start

queue队列持久化

  $channel->queue_declare($queue, false, true, false, false);

durable = true 队列持久化


message消息持久化

 $msg = new AMQPMessage('Hello World!'.$time, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));

重启 rabbitmq-server

queue和ready message依然存在

标签: rabbitmq

xiang 发布于  2024-1-13 10:27 

云服务器放通内网 网络

安全组设置

放通内网

来源 端口 策略 备注
10.0.0.0/8 ALL 允许 放通内网

172.16.0.0/12 ALL 允许 放通内网

192.168.0.0/16 ALL 允许 放通内网


放通80端口给所有外网访问

来源 端口 策略 备注
ipv4
0.0.0.0/0 TCP:80 允许 web

ipv6
::/0 TCP:80 允许 web


gpt解释
172.16.0.0/12 是一个私有的 IPv4 地址范围1。在这个表示法中,/12 表示网络号位数23。这意味着前12位是网络号,其余的是主机号2。这个范围的地址是 172.16.0.0 到 172.31.255.25524。这些地址通常被用于内部网络,不会被路由器转发到互联网上1。这是由 RFC(由很多专家制定的规范标准)指定的2。具体来说,172.16.0.0/12 的子网掩码是 255.240.0.034。这个子网掩码是通过取二进制表示的前12位为1,其余位为0,然后转换为十进制得到的3。例如,二进制的 11111111.11110000.00000000.00000000 转换为十进制就是 255.240.0.03。这样,我们就可以知道一个 IP 地址是否在这个范围内1。希望这个解释对你有所帮助!


xiang 发布于  2024-1-11 10:28 

centos8安装多版本php,remi-php php

安装
https://itlanyan.com/install-newest-php7-and-php8-with-remi-repo/

完全卸载php
https://www.cnblogs.com/NeigeBolg/p/13395128.html

php配置位置
/etc/opt/remi/php73/

软连接 php -v
ln -s /opt/remi/php82/root/usr/bin/php /usr/bin/php

ln -s /opt/remi/php82/root/usr/bin/php-cgi /usr/bin/php-cgi

ln -s /opt/remi/php82/root/usr/bin/phar.phar /usr/bin/php.phar

启动 对应版本php-fpm
service php73-php-fpm restart

标签: php

xiang 发布于  2024-1-2 16:23 

swoole的协程能否减少单个数据库语句查询的时间呢? php

问: 如果一个接口需要查询数据库并返回数据.而这个数据库查询语句需要1秒时间.那么swoole的协程能否减少请求时间呢?

答:
Swoole的协程可以帮助提高并发处理能力,但它并不能直接减少单个数据库查询的执行时间。如果你的数据库查询需要1秒钟,那么无论是否使用协程,这个查询本身的执行时间都是1秒钟。

然而,Swoole的协程可以在等待这个查询结果的同时,去执行其他的任务。例如,如果你有10个这样的查询需要执行,而且这些查询可以并行执行,那么使用Swoole的协程,理论上你可以在大约1秒钟的时间内完成所有的查询,因为这些查询是同时进行的。而如果不使用协程,你可能需要10秒钟的时间,因为每个查询都需要等待上一个查询完成后才能开始。

所以,虽然Swoole的协程不能减少单个查询的执行时间,但它可以通过并行处理来提高整体的处理能力,从而在处理多个查询时,可以显著减少总的等待时间。

标签: swoole

xiang 发布于  2023-12-28 15:34 

golang 处理http请求 并发测试 golang

目的测试慢接口是否会阻塞服务,导致其他正常接口无法请求.

测试代码

package main

import (
    "fmt"
    "net/http"
    "time"
)

func helloHandler(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintf(w, "Hello, World")
}

func sleepHandler(w http.ResponseWriter, r *http.Request) {
    time.Sleep(2 * time.Second)
    fmt.Fprintf(w, "you are sleeping")
}

func main() {
    http.HandleFunc("/", helloHandler)

    http.HandleFunc("/sleep", sleepHandler)

    fmt.Println("Starting server at port 8081")
    if err := http.ListenAndServe(":8081", nil); err != nil {
        fmt.Println("Failed to start server:", err)
    }
}

方法如下:

A接口sleep2秒,
B接口hello world,

单独压测hello word接口
wrk -c 1000 -t 10 -d 10s http://127.0.0.1:8081

QPS 315572.06

同时压测 sleep接口 和 hello world接口
wrk -c 1000 -t 10 -d 15s http://127.0.0.1:8081/sleep
wrk -c 1000 -t 10 -d 10s http://127.0.0.1:8081

sleep接口 QPS 463.59 全部timeout
hello world接口 QPS 329326.68 几乎没有被sleep接口影响.

同样的测试,来测试php-fpm,大概就知道php-fpm有什么性能上的问题.
还有人在说什么 php项目的瓶颈是数据库性能,之类.
用了swoole框架,哪怕某些接口数据库查询慢,也就单单是那接口慢,不会影响到其他的接口

标签: golang 压测

xiang 发布于  2023-12-28 14:22