Introduction

Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library (Documents).
RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that the letter carrier will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office, and a letter carrier (see tutorial).

This article is about using Pick connecting to RabbitMQ then send and receive Message queue

Prerequisities

Install Pika

First be sure that python installed in your environment.
then install pika:

pip install pika

Run RabbitMQ docker image

It is suggested to use official docker image of RabbitMQ.
Search possible RabbitMQ image:

docker search rabbitmq

Pull the corresponding RabbitMQ image:

docker pull rabbitmq

Then run the image:

docker run -d -p 15672:15672  -p  5672:5672  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq --hostname=rabbitmqhostone  rabbitmq:latest

Parameters:

  • 15672 :indicates the port number of the RabbitMQ console, which can be used to perform RabbitMQ-related operations through the console in the browser。
  • 5672 : represents the TCP port number monitored by RabbitMQ. The application can establish a TCP connection with RabbitMQ through this port to complete subsequent asynchronous message communication
  • RABBITMQ_DEFAULT_USER:used to set the user name for logging in to the console, here setted as admin
  • RABBITMQ_DEFAULT_PASS:used to set the password for logging in to the console, here setted as admin

After the container is successfully started, you can enter the address in the browser: http://ip:15672/ to access the console.
My server is an digitalocean server, so I need to open the port, otherwise it will not be accessible.
There is a very important question here: RabbitMQ can only access localhost:15762 by default for security reasons. If you want to use other ip, you need to configure it yourself.

some Issues

I encountered this problem after restarting the computer today. After the rabbitmq service was started, the localhost:15672 could not be accessed.At the same time, the project could not connect to the rabbitmq service. After researching for a long time, it was finally solved. Here I make notes just for reference.
Try:

rabbitmq-plugins enable rabbitmq_management

to re-enable the plugins. This trick can solve 80% of the problems.

Sending

Send
The following naive send.py will send a single message to the queue. The first thing we need to do is to establish a connection with RabbitMQ server.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

Sending example with Sqlite3

#!/usr/bin/env python
import pika
import sqlite3
import K_lines
import json
import time
conn = sqlite3.connect('klines1.db')

start_0 = 1631141776000
credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672,'/', credentials))
channel = connection.channel()
channel.queue_declare(queue='hello')

for i in range(0,14):  # 14x7 days
    filename = "klines" + str(14-i) + ".db" # 7 days as one db files
    conn = sqlite3.connect(filename)
    start = start_0 + i * 7 * 24 * 60 * 60 * 1000
    for index in range(0,10080):   # 7 days
        timest = index*60*1000+start
        cursor = conn.execute("SELECT PAIR,INTERVAL,OPEN_TIME,CLOSE_TIME,HIGH,LOW,OPEN_PRICE,CLOSE_PRICE,VOLUME,ACTIVE_TAKER_VOLUME from time%d" %timest)

        msg = []
        for row in cursor:
            timeArray1 = time.strptime(row[2], "%Y-%m-%d %H:%M:%S")
            timeArray2 = time.strptime(row[3], "%Y-%m-%d %H:%M:%S")
            timestamp1 = int(time.mktime(timeArray1)*1000)
            timestamp2 = int(time.mktime(timeArray2)*1000)
            msg.append({"pair": row[0], "interval": row[1], "open_time": timestamp1, "close_time": timestamp2, "high": row[4], "low": row[5], "open": row[6], "close": row[7], "volume": row[8], "active_taker_volume": row[9]})
        json_msg = json.dumps(msg)
        channel.basic_publish(exchange='', routing_key='hello', body=json_msg)
        print(" [x] Sent 'time%d'" %timest)
        time.sleep(0.025)
    conn.close()
channel.basic_publish(exchange='', routing_key='hello', body="end of test")
connection.close()

to be continue ...