
Integration Apache Kafka (Amazon MSK) – Attunity Replicate


1. Description

The following document details the integration of the Attunity Replicate and Apache Kafka tools using Amazon Managed Streaming for Apache Kafka (Amazon MSK), with the aim of ingesting and consuming data in real time from multiple platforms. The installation of the tools, their configuration, and the subsequent integration are presented using an example use case.


2. Concepts

2.1. Apache Kafka

2.1.1. What is Apache Kafka?

  • Apache Kafka is a platform for real-time distributed data streaming.
  • It allows you to store streams of records in a durable, fault-tolerant manner.
  • It processes streams of records as they occur.
  • High performance: it is designed for a large number of small messages.
  • It is used to build real-time streaming data pipelines and streaming applications.

2.1.2. Terms

Broker: Kafka server. Several brokers form a cluster.
Cluster: Group of two or more brokers that provides redundancy and scalability.
Producer: Kafka client that delivers messages to the broker.
Consumer: Kafka client that receives messages from the broker.
Topic: A category in which records are published. Each Kafka topic may have from zero to many consumers subscribed to its data.
Partition: Subdivision of a topic that allows redundancy and parallel reads/writes for better performance.
Message: Any buffer (text/binary). The format must be agreed between producers and consumers.
Message Key: Optional. Used for partitioning and compaction.
Offset: Position of a message within a partition. It is used by the consumer.

 

2.2. Amazon MSK

2.2.1. What is Amazon MSK?

  • Amazon MSK is a fully managed service that makes it easy to create and run applications that use Apache Kafka to process streaming data.
  • With a few clicks in the Amazon MSK console, you can create highly available Apache Kafka clusters with tweaks and configurations based on Apache Kafka deployment best practices.

2.3. Attunity Replicate

2.3.1. What is Attunity Replicate?

  • It is a tool that enables companies to accelerate the replication, ingestion, and streaming of data across a wide variety of databases, data warehouses, and Big Data platforms.
  • It facilitates moving large volumes of data safely, efficiently, and with very low impact on operations.
  • It simplifies massive ingestion into Big Data platforms from thousands of sources.


3. Cluster Amazon MSK

Before creating an Amazon MSK cluster, some preliminary configuration is required to prepare the environment.

3.1. Create a VPC for the MSK cluster

In the first step of getting started with Amazon MSK, the Amazon VPC console is used to create an Amazon Virtual Private Cloud (Amazon VPC). An MSK cluster will be created in this VPC later.

To perform this action, follow these steps:

 

  • Step 3: Choose Select to accept the default Amazon VPC configuration, called VPC with a Single Public Subnet.

 

  • Step 4: In VPC name, write the name of your VPC.
    Example: AWS-MSK-VPC
  • Step 5: In Availability Zone, choose us-east-1a.
  • Step 6: In Subnet name, write the name of the subnet.
    Example: AWS-MSK-VPCSubnet-1
  • Step 7: Choose Create VPC and then choose OK.
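
For reference, roughly the same setup can be created from the AWS CLI. This is a minimal sketch: the CIDR blocks are assumptions matching the wizard defaults, and the console wizard additionally creates and attaches the internet gateway and route table for you.

aws ec2 create-vpc --cidr-block 10.0.0.0/16
aws ec2 create-tags --resources <VPC_ID> --tags Key=Name,Value=AWS-MSK-VPC
aws ec2 create-subnet --vpc-id <VPC_ID> --cidr-block 10.0.0.0/24 --availability-zone us-east-1a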

 

3.2. Add subnets to the VPC

To ensure high availability and fault tolerance, Amazon MSK requires the cluster to be distributed across more than one subnet and more than one Availability Zone.

Amazon MSK requires that you create a minimum of 2 and a maximum of 3 subnets, each in a different Availability Zone.

To perform this action, follow these steps:

  • Step 3: In the list of subnets, find the previously created subnet and then find the Route table column. Copy the value associated with the subnet in that column and save it for later.

  • Step 4: Choose Create subnet.
  • Step 5: In the Name field, write the name of the subnet.
    Example: AWS-MSK-VPCSubnet-2
  • Step 6: In VPC, choose the VPC created in the previous section.
  • Step 7: In Availability Zone, choose us-east-1b.
  • Step 8: In IPv4 CIDR block, write 10.0.1.0/24.
  • Step 9: Choose Create and then Close.
  • Step 10: Select the newly created subnet in the list of subnets by checking the box next to it. Make sure no other checkbox in the list is selected.
  • Step 11: In the subnet view near the bottom of the screen, choose the Route Table tab and then choose Edit route table association.

  • Step 12: In the Route Table ID list, choose the route table with the value copied earlier in this procedure.

  • Step 13: Choose Save and then choose Close.
    To add another subnet, repeat these steps with a different name, Availability Zone, and CIDR block.
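
As with the first subnet, the second subnet can also be sketched with the AWS CLI (the values match the example above, and the route-table association mirrors Steps 10 to 13):

aws ec2 create-subnet --vpc-id <VPC_ID> --cidr-block 10.0.1.0/24 --availability-zone us-east-1b
aws ec2 associate-route-table --route-table-id <ROUTE_TABLE_ID> --subnet-id <NEW_SUBNET_ID>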

3.3. Creation of MSK Cluster

Once the VPC that will host the Amazon MSK cluster is configured, we can proceed to create the cluster.

To perform this action, follow these steps:

  • Step 1: Open the AWS Management Console using your authentication credentials.
  • Step 2: Once inside the console, select the MSK service.

  • Step 3: Press the Create cluster option.

  • Step 4: In the General section, specify the name the cluster will have (Cluster name).
    Example: kafka-test

 

  • Step 5: In the Networking area, for the VPC option select the previously created VPC.

  • Step 6: Once the VPC has been selected, choose the Availability Zones in which the cluster will be deployed, according to the subnets added to the VPC.
  • Step 7: After this, each subnet of the VPC must be assigned to a different Availability Zone, as follows.

  • Step 8: In the Brokers section, select the instance type on which the cluster will be deployed; for this document it is kafka.t3.small.

  • Step 9: Following AWS best practices, the following tags are assigned to the cluster.

  • Step 10: In the Encryption area, select the option Both TLS encrypted and plaintext traffic allowed to allow both encrypted and plain-text messages.

  • Step 11: Finally, press the Create cluster button at the bottom of the screen to deploy the cluster. The cluster creation process takes approximately 15 minutes.
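
For reference, an equivalent cluster can be created from the AWS CLI. This is a sketch under the assumptions used in this document (Kafka 2.2.1, two brokers, kafka.t3.small); the subnet and security group IDs are placeholders:

cat > brokernodegroupinfo.json <<'EOF'
{
  "InstanceType": "kafka.t3.small",
  "ClientSubnets": ["<SUBNET_ID_1>", "<SUBNET_ID_2>"],
  "SecurityGroups": ["<SECURITY_GROUP_ID>"]
}
EOF
aws kafka create-cluster --cluster-name kafka-test --kafka-version 2.2.1 \
    --number-of-broker-nodes 2 --broker-node-group-info file://brokernodegroupinfo.json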

4. Amazon MSK Client

4.1. Creation of Amazon MSK Client

In order to access the cluster, an EC2 instance will be created to act as the Amazon MSK client. This client will be used to create the topics and to host the services that will allow the data to be consumed. For simplicity's sake, we'll put this client machine in the same VPC as the Amazon MSK cluster.

To perform this action, the following steps must be carried out:

  • Step 1: Open the Amazon EC2 console at https://console.aws.amazon.com/ec2/.
  • Step 2: Choose Launch Instance.
  • Step 3: Choose Select next to Amazon Linux 2 AMI (HVM), SSD Volume Type.
  • Step 4: Choose the instance type that will host the client; it is recommended to use one with at least 4 GB of RAM. In this case, the t2.medium type is used.
  • Step 5: Choose Next: Configure Instance Details.
  • Step 6: In the Network list, choose the previously created VPC.
  • Step 7: In the Auto-assign Public IP list, choose Enable.
  • Step 8: On the menu near the top, choose 5. Add Tags and add the same tags used when creating the cluster.
  • Step 9: Choose Review and Launch and then choose Launch.
  • Step 10: Choose Create a new key pair, write a name to identify your key pair in Key pair name, and choose Download Key Pair. You can also use an existing key pair if you want to.
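
A CLI sketch of the same launch (the AMI ID, key pair, subnet, and tag values are placeholders to substitute):

aws ec2 run-instances --image-id <AMAZON_LINUX_2_AMI_ID> --instance-type t2.medium \
    --key-name <KEY_PAIR_NAME> --subnet-id <SUBNET_ID> --associate-public-ip-address \
    --tag-specifications 'ResourceType=instance,Tags=[{Key=owner,Value=<OWNER>},{Key=Project,Value=<PROJECT>}]'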

4.2. Amazon MSK Client Setup

Once the Amazon MSK client has been created, the security group rules must be configured to allow the connection between the cluster and the newly created client.

To perform this action, the following steps must be carried out:

  • Step 1: In the Amazon EC2 console, choose View Instances. Next, in the Security Groups column, choose the security group associated with the recently created client instance.

  • Step 2: Copy the Group ID value of that security group and save it for later.

  • Step 3: Open the Amazon VPC console at https://console.aws.amazon.com/vpc/.
  • Step 4: In the list of VPCs, copy the VPC ID associated with the VPC created at the beginning of this document.

  • Step 5: In the Security Groups section of the Amazon VPC console, paste the VPC ID copied in the previous step into the search box.

  • Step 6: Select the default security group of the VPC.

  • Step 7: On the Inbound Rules tab, choose Edit rules.

  • Step 8: Select Add Rule.
  • Step 9: In the new rule, choose All traffic in the Type column. In the second field of the Source column, write the ID of the client machine's security group obtained in Step 2.
  • Step 10: Select Save rules.

Repeat these steps to add an inbound rule in the security group of your client machine so that it can receive traffic from the VPC's default security group. Your client machine can now communicate bidirectionally with the MSK cluster.
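
The same pair of rules can be sketched with the AWS CLI, using the group IDs copied in the steps above:

# Allow the client to reach the cluster (inbound rule on the VPC's default security group)
aws ec2 authorize-security-group-ingress --group-id <VPC_DEFAULT_SG_ID> \
    --protocol all --source-group <CLIENT_SG_ID>
# Allow the cluster to reach the client (inbound rule on the client's security group)
aws ec2 authorize-security-group-ingress --group-id <CLIENT_SG_ID> \
    --protocol all --source-group <VPC_DEFAULT_SG_ID>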

Once this is done, the newly created and configured client can be accessed via SSH.

 

4.3. Creation of Topics

Once the client with access to the Amazon MSK cluster has been created, we proceed to create the topics used to send and consume messages.

4.3.1. Prerequisites

  • Install Java on the client machine using the following command:
    sudo yum install java-1.8.0
  • Download and unpack Apache Kafka on the client machine:
    wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
    tar -xzf kafka_2.12-2.2.1.tgz

The topics are then created with the following steps:

  • Step 1: Navigate to the Kafka directory
    cd kafka_2.12-2.2.1
  • Step 2: Get the ZookeeperConnectString value of the previously created cluster, which is needed to create the topics:
    aws kafka describe-cluster --region us-east-1 --cluster-arn "ClusterArn"

The ClusterArn value is obtained by selecting the previously created cluster in the Amazon MSK console: https://console.aws.amazon.com/msk/

Once the previous command is entered, the following result is obtained:

{
    "ClusterInfo": {
        "EncryptionInfo": {
            "EncryptionInTransit": {
                "ClientBroker": "TLS_PLAINTEXT",
                "InCluster": true
            },
            "EncryptionAtRest": {
                "DataVolumeKMSKeyId": "arn:aws:kms:us-east-1:581027662077:key/1db13367-6660-4bc8-a575-ffd215041ef1"
            }
        },
        "BrokerNodeGroupInfo": {
            "BrokerAZDistribution": "DEFAULT",
            "ClientSubnets": [
                "subnet-0538eb3e4ad6c12a5",
                "subnet-0fb732dbc7c9f752d"
            ],
            "StorageInfo": {
                "EbsStorageInfo": {
                    "VolumeSize": 100
                }
            },
            "SecurityGroups": [
                "sg-08b9d453e3f4d22eb"
            ],
            "InstanceType": "kafka.t3.small"
        },
        "ClusterName": "Kafka-test",
        "CurrentBrokerSoftwareInfo": {
            "KafkaVersion": "2.2.1"
        },
        "Tags": {
            "owner": "cvalenzuela",
            "Project": "poc-kafka"
        },
        "CreationTime": "2020-05-15T15:42:33.683Z",
        "NumberOfBrokerNodes": 2,
        "ZookeeperConnectString": "z-3.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:2181,z-1.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:2181,z-2.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:2181",
        "State": "ACTIVE",
        "CurrentVersion": "K3AEGXETSR30VB",
        "ClusterArn": "arn:aws:kafka:us-east-1:581027662077:cluster/Kafka-test/6fed5882-f025-4369-a775-71be9b525521-5",
        "EnhancedMonitoring": "DEFAULT",
        "OpenMonitoring": {
            "Prometheus": {
                "NodeExporter": {
                    "EnabledInBroker": false
                },
                "JmxExporter": {
                    "EnabledInBroker": false
                }
            }
        }
    }
}

  • Step 3: Create the topic using the following command:
    bin/kafka-topics.sh --create --zookeeper <ZookeeperConnectString> --replication-factor 2 --partitions 1 --topic <TopicName>
    If the command is successful, you will see the following message: Created topic <TopicName>.
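
To verify that the topic was created, the standard Kafka scripts can be used from the same Kafka directory (a sketch):

bin/kafka-topics.sh --list --zookeeper <ZookeeperConnectString>
bin/kafka-topics.sh --describe --zookeeper <ZookeeperConnectString> --topic <TopicName>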


4.4. Produce and consume data locally

To verify that the connection to the cluster and the newly created topic works correctly, a test is carried out producing and consuming data locally from the Amazon MSK client.

To do this, the following steps must be followed:

  • Step 1: Obtain the list of broker servers to which the data will be sent and from which it will be consumed:
    aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn <ClusterArn>

You must configure your AWS credentials on the client using the command aws configure.
The ClusterArn value is obtained by selecting the previously created cluster in the Amazon MSK console: https://console.aws.amazon.com/msk/

{
    "BootstrapBrokerStringTls": "b-2.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:9094,b-1.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:9094",
    "BootstrapBrokerString": "b-2.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:9092,b-1.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:9092"
}

  • Step 2: Copy the value obtained for the BootstrapBrokerString parameter.
  • Step 3: Navigate to the Kafka directory.
  • Step 4: To produce data, use the following command:
    bin/kafka-console-producer.sh --broker-list <BootstrapBrokerString> --topic <TopicName>
  • Step 5: Open a new Amazon MSK client console to be able to consume the data.
  • Step 6: Within the same Kafka folder accessed in Step 3, use the following command:
    bin/kafka-console-consumer.sh --bootstrap-server <BootstrapBrokerString> --topic <TopicName> --from-beginning

If everything has been executed correctly, a message written in the producer console (left) is replicated to the consumer console (right).
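
For a quick non-interactive check, a single test message can also be piped into the producer and read back with the same commands (a sketch):

echo "hello from Amazon MSK" | bin/kafka-console-producer.sh --broker-list <BootstrapBrokerString> --topic <TopicName>
bin/kafka-console-consumer.sh --bootstrap-server <BootstrapBrokerString> --topic <TopicName> --from-beginning --max-messages 1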


5. Data producer

Once the cluster and the Apache Kafka topic have been created using Amazon MSK, we proceed to configure the data producer, which is responsible for sending messages to the cluster. Attunity Replicate will be used for this, since it allows us to ingest data from multiple sources into a specific Apache Kafka topic.

5.1. Installation of Attunity Replicate

For the installation of this tool, an AMI image with Attunity Replicate pre-installed can be used, so only the EC2 instance for this tool needs to be created. In this case the image is called Attunity Replicate Capacitacion, and it is a Windows instance, so once created it can be accessed through Windows Remote Desktop.

This instance will be created within the VPC where the cluster is located, so the VPC must be configured when launching the instance from the image. To do this, the following steps must be performed:

  • Step 1: Open the Amazon EC2 console at https://console.aws.amazon.com/ec2/.
  • Step 2: Choose Launch Instance.
  • Step 3: Select the Attunity Replicate Capacitacion image from the My AMIs menu.

  • Step 4: Choose the instance type that will host the Attunity Replicate image; it is recommended to use one with at least 4 GB of RAM. In this case, the t2.medium type is used.
  • Step 5: Choose Next: Configure Instance Details.
  • Step 6: In the Network list, choose the VPC created in the step "Create a VPC for the MSK cluster" of this document.
  • Step 7: In the Auto-assign Public IP list, choose Enable.
  • Step 8: On the menu near the top, choose 5. Add Tags and add the same tags used when creating the cluster.
  • Step 9: Choose Review and Launch and then choose Launch.
  • Step 10: Choose Create a new key pair, type a name to identify your key pair in Key pair name, and choose Download Key Pair. You can also use an existing key pair if you prefer.


5.2. Attunity Replicate Setup

Once the instance containing Attunity Replicate is created, the rules of the security group must be configured to allow the connection between the cluster and Attunity Replicate.

To perform this action, the following steps must be carried out:

  • Step 1: In the Amazon EC2 console, choose View Instances. Next, in the Security Groups column, choose the security group associated with the newly created instance containing Attunity Replicate.

  • Step 2: Copy the Group ID value of that security group and save it for later.

  • Step 3: Open the Amazon VPC console at https://console.aws.amazon.com/vpc/.
  • Step 4: In the list of VPCs, copy the VPC ID value associated with the VPC created at the beginning of this document.

  • Step 5: In the Security Groups section of the Amazon VPC console, paste the VPC ID copied in the previous step into the search box.

  • Step 6: Choose the default security group of the VPC.

  • Step 7: On the Inbound Rules tab, choose Edit rules.

  • Step 8: Choose Add Rule.
  • Step 9: In the new rule, choose All traffic in the Type column. In the second field of the Source column, type the ID of the Attunity Replicate instance's security group obtained in Step 2.
  • Step 10: Choose Save rules.

Repeat these steps to add an inbound rule in the security group of the Attunity Replicate instance so that it can receive traffic from the VPC's default security group. The Attunity Replicate instance can now communicate bidirectionally with the MSK cluster.

Once this is done, the instance can be accessed through Windows Remote Desktop.

5.3. Access to Attunity Replicate

Once you have entered the Attunity Replicate EC2 instance, to access the tool you must search for Attunity Replicate Console in the Windows application search.

5.4. Connectors Setup

Attunity Replicate accesses and sends data through connectors, which can be of type source or target.

To manage these connectors select the following option:

5.4.1. Setup source connector

This type of connector allows information to be obtained from different sources. The source connector types supported by Attunity are:

  • Amazon RDS for MySQL 
  • Amazon RDS for PostgreSQL
  • Amazon RDS for SQL Server
  • DISAM (ARC)
  • File
  • File Channel
  • Google Cloud SQL for MySQL
  • Google Cloud SQL for PostgreSQL
  • HP NonStop Enscribe (AIS)
  • HP NonStop SQL/MP (AIS)
  • Hadoop
  • IBM DB2 for LUW
  • IBM DB2 for iSeries
  • IBM DB2 for z/OS
  • IBM IMS (ARC)
  • Microsoft Azure SQL Managed Instance
  • Microsoft SQL Server
  • MySQL
  • ODBC
  • ODBC with CDC
  • Oracle
  • PostgreSQL
  • RMS (ARC)
  • SAP Application
  • SAP Application (DB)
  • SAP HANA
  • SAP Sybase ASE
  • Teradata Database
  • VSAM (ARC)

To create a source connector, follow these steps:

Once inside the Manage Endpoint Connections tab, which is where the connectors are managed, select New Endpoint Connection.

Select type Source in the role field and complete the connector data according to its type.

As an example, an Amazon RDS for MySQL source connector is created.

The source data corresponds to a database containing information on cities of the world, available at the following link: https://dev.mysql.com/doc/index-other.html

Finally, it is verified that the connection test succeeds, and the connector is saved by pressing Save.

5.4.2. Setup target connector

This type of connector allows information from a source to be sent to another type of application. The target connector types supported by Attunity are:

  • Actian Vector
  • Amazon EMR
  • Amazon Kinesis Data Streams
  • Amazon Redshift
  • Amazon S3
  • File
  • File Channel
  • Google Cloud SQL for MySQL
  • Google Cloud SQL for PostgreSQL
  • Google Cloud Storage
  • Google Dataproc
  • HP Vertica
  • Hadoop
  • Hortonworks Data Platform (HDP)
  • Kafka
  • Log Stream
  • MapR Streams
  • MemSQL
  • Microsoft Azure ADLS
  • Microsoft Azure Database for MySQL
  • Microsoft Azure Database for PostgreSQL
  • Microsoft Azure Databricks
  • Microsoft Azure HDInsight
  • Microsoft Azure SQL Database
  • Microsoft SQL Server
  • MySQL
  • ODBC
  • Oracle
  • Pivotal Greenplum
  • PostgreSQL
  • SAP HANA
  • Snowflake on AWS
  • Snowflake on Azure
  • Teradata Database

To create a target connector, follow these steps:

Once inside the Manage Endpoint Connections tab, which is where the connectors are managed, select New Endpoint Connection.

Choose type Target in the role field and complete the connector data according to its type.

IMPORTANT: remember that the broker servers field is where the cluster is specified; in this case it is the BootstrapBrokerString value obtained in Step 1 of the section "Produce and consume data locally" of this document.

  • Add the BootstrapBrokerString value, removing the ports.
  • Then, in the Data Message Publishing section, select the topic to which the connector will send data; in this case the selected topic was test.
  • Finally, verify that the connection test succeeds and save the connector by pressing Save.

5.5. Tasks

Attunity Replicate lets you replicate data from a source point to a target point. To do this, tasks are used. Each task has a source endpoint (source connector) and a target endpoint (target connector).

5.5.1.  Creation of Tasks

To create a new task, press the New Task option on the main screen, as shown below.

After this, you must specify the name and description of the task, which should reflect the action performed by this replication use case.

The Replication Profile that describes the task to be performed must also be configured. There are three types of replication profile:

  • Unidirectional: replicates from a source endpoint to a target endpoint in one direction.
  • Bidirectional: synchronizes records between two endpoints.
  • Log Stream: allows a dedicated replication task to save transaction log data changes from a single source database and apply them to multiple targets, without the overhead of having to read the records separately for each target.

Finally, the task options are configured, where at least one of the following options must be chosen:

  • Full Load: when Full Load is triggered, Replicate loads the data from the source to the target endpoint.
  • Apply Changes: Attunity Replicate processes the changes. By default, change processing is performed for this task and can be viewed in the monitor view.
  • Store Changes: changes are stored in change tables or in an audit table. By default, changes are not stored.

In this particular case, the Unidirectional replication profile is selected, since data only needs to be sent to one endpoint. In the task options, Full Load is selected to perform an initial data load, and Apply Changes to send data as the database records change, as follows.

 

5.5.2. Task Configuration

Once the task is created, it must be configured for its operation.

As a first step, you must assign the connectors that will be used in the task; this is done by dragging them into position within the task, as follows.


After this, the tables to be replicated to the selected target are chosen.

 

Once the task that will replicate the data to the Kafka cluster has been created, it only remains to start it by pressing Run at the top of the interface.

5.6. Produce Data

Once Run is pressed, the Full Load task (the initial data load) starts; its progress can be observed in the Attunity Replicate monitor.

After the initial data load is performed, the task does not stop: because the Apply Changes option was selected, it waits for changes in the database in order to replicate each new message to the subscribed topic.

To see this, a new record is inserted into the city table of the database with the following data:

"Name": "Cibinong",
"CountryCode": "IDN",
"District": "West Java",
"Population": 101300

Once this is done, the change made in the database is displayed on the Attunity monitor

To observe the messages replicated by Attunity Replicate, follow Step 6 of the section "Produce and consume data locally" of this document.

The format of the message obtained by Amazon MSK is as follows:

{
    "magic": "atMSG",
    "type": "DT",
    "headers": null,
    "messageSchemaId": null,
    "messageSchema": null,
    "message": {
        "data": {
            "ID": 4097,
            "Name": "Jombang",
            "CountryCode": "IDN",
            "District": "East Java",
            "Population": 92600
        },
        "beforeData": null,
        "headers": {
            "operation": "INSERT",
            "changeSequence": "20200520193227000000000000000000005",
            "timestamp": "2020-05-20T19:32:27.000",
            "streamPosition": "mysql-bin-changelog.004646:350:0:416:19954418057435:mysql-bin-changelog.004646:292",
            "transactionId": "000000000000000000001226000000DB",
            "changeMask": "1F",
            "columnMask": "1F",
            "transactionEventCounter": 1,
            "transactionLastEvent": true
        }
    }
}


6. Data Consumer

Once the messages have been published to the Amazon MSK cluster topic, a consumer of this data must be created. To do this, a REST Proxy will be configured on the Amazon MSK client.

The Confluent REST Proxy will be used, which provides a RESTful interface to the Kafka cluster, making it easier to produce and consume messages, view the status of the cluster, and perform administrative actions without using the native Kafka clients or protocol.

6.1. REST Proxy Configuration

To perform the REST Proxy configuration, the following steps must be followed:

  • Step 1: Log in to the previously created Amazon MSK client from the Amazon EC2 console at https://console.aws.amazon.com/ec2/.
  • Step 2: Download and unpack Confluent using the following commands:
    curl -O http://packages.confluent.io/archive/5.5/confluent-5.5.0-2.12.tar.gz
    tar xzf confluent-5.5.0-2.12.tar.gz
  • Step 3: Edit the kafka-rest.properties file using the following command (you can use the text editor of your choice):
    vim confluent-5.5.0/etc/kafka-rest/kafka-rest.properties
  • Step 4: Replace the lines shown below with the data from the Amazon MSK cluster (a sketch of the resulting file follows these steps).
    ZookeeperConnectString: obtained in Step 2 of the section "Creation of Topics" of this document.
    BootstrapBrokerString: obtained in Step 1 of the section "Produce and consume data locally" of this document.

    IMPORTANT:
    Write PLAINTEXT:// before each broker server.
    Example: PLAINTEXT://b-2.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:9092,PLAINTEXT://b-1.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:9092
  • Step 5: Browse to the Confluent folder.
    cd confluent-5.5.0/
  • Step 6: Deploy the Confluent REST Proxy service using the following command:
    bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties
  • Step 7: Verify that the service is working correctly by requesting the following endpoint:
    http://<AMAZON_MSK_CLIENT_DNS>:8082/topics
    If there are no errors, you should obtain a list of the topics created in the Amazon MSK cluster:
    // 20200521183524
    // http://ec2-34-228-240-180.compute-1.amazonaws.com:8082/topics
    [
    "AWSKafkaTutorialTopic",
    "test"
    ]
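
For reference, after Step 4 the relevant lines of kafka-rest.properties should look roughly like this (a sketch; the connection strings are the example values from this document, so substitute your own):

zookeeper.connect=<ZookeeperConnectString>
bootstrap.servers=PLAINTEXT://b-2.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:9092,PLAINTEXT://b-1.kafka-test.m1vlzx.c5.kafka.us-east-1.amazonaws.com:9092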

6.2. REST Proxy API

The REST Proxy provided by Confluent exposes different endpoints for managing Apache Kafka through HTTP requests. It allows consuming and producing data, creating and deleting topics, obtaining consumer data, and more.

The documentation for this API is available at https://docs.confluent.io/current/kafka-rest/api.html#crest-api-v2

The attached file Amazon MSK.postman_collection.json (a Postman-compatible file) contains a set of requests that allow you to test the aforementioned API. Change the Amazon MSK cluster host in each request.
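
As a concrete illustration (a sketch based on the Confluent REST Proxy v2 documentation linked above; the consumer group and instance names are arbitrary examples), producing and consuming through the proxy looks roughly like this:

# Produce a JSON message to the topic "test"
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
     --data '{"records":[{"value":{"greeting":"hello"}}]}' \
     http://<AMAZON_MSK_CLIENT_DNS>:8082/topics/test

# Create a consumer instance, subscribe it to the topic, then fetch records
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
     --data '{"name":"ci1","format":"json","auto.offset.reset":"earliest"}' \
     http://<AMAZON_MSK_CLIENT_DNS>:8082/consumers/test_group
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
     --data '{"topics":["test"]}' \
     http://<AMAZON_MSK_CLIENT_DNS>:8082/consumers/test_group/instances/ci1/subscription
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
     http://<AMAZON_MSK_CLIENT_DNS>:8082/consumers/test_group/instances/ci1/records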

 

6.3. Creation of the consumer

For the creation of the data consumer, Node.js and the kafka-rest dependency (https://www.npmjs.com/package/kafka-rest) are used.

The code that creates the consumer is as follows:

'use strict'
var KafkaRest = require('kafka-rest');
var kafka = new KafkaRest({ 'url': 'http://<AMAZON_MSK_CLIENT_DNS>:8082' });

// Create the consumer
// CONSUMER_NAME = name of the consumer
kafka.consumer("CONSUMER_NAME").join({
    "format": "binary",
    "auto.offset.reset": "smallest",
    "auto.commit.enable": "false"
}, function (err, consumer_instance) {
    // Creating the consumer returns an instance that is used to subscribe to the topic
    // TOPIC = name of the topic
    var stream = consumer_instance.subscribe('TOPIC');
    // Establishes the connection to the topic and waits for new messages to arrive
    stream.on('data', function (msgs) {
        for (var i = 0; i < msgs.length; i++) {
            // JSON containing topic metadata
            var metadata = {
                "topic": stream.topic,
                "key": msgs[i].key,
                "partition": msgs[i].partition,
                "offset": msgs[i].offset
            }
            // JSON containing the message
            // The message arrives base64-encoded; it is exposed here as a Buffer
            var data = JSON.parse(msgs[i].value.toString('utf8'))
            console.log({ metadata, data });
        }
    });
    // Fires when the connection to the cluster is interrupted
    stream.on('error', function (err) {
        console.log("Something broke: " + err);
    });
});
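
Assuming the code above is saved as consumer.js (the filename is arbitrary), it can be run on the Amazon MSK client as follows:

npm install kafka-rest
node consumer.js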

6.4. Data consumption

The above code prints the topic messages on the console once they are produced by Attunity Replicate.

The format of the message obtained is as follows:

{
    metadata: {
        topic: 'test',
        key: null,
        partition: 0,
        offset: 16
    },
    data: {
        magic: 'atMSG',
        type: 'DT',
        headers: null,
        messageSchemaId: null,
        messageSchema: null,
        message: {
            data: {
                ID: 1017,
                Name: 'Jombang',
                CountryCode: 'IDN',
                District: 'East Java',
                Population: 92600
            },
            beforeData: null,
            headers: {
                operation: 'DELETE',
                changeSequence: '20200520194057000000000000000000029',
                timestamp: '2020-05-20T19:40:57.000',
                streamPosition: 'mysql-bin-changelog.004648:729:0:795:19963007992027:mysql-bin-changelog.004648:671',
                transactionId: '000000000000000000001228000000DB',
                changeMask: '01',
                columnMask: '1F',
                transactionEventCounter: 4,
                transactionLastEvent: true
            }
        }
    }
}


7. Connectors

Apache Kafka not only allows you to produce and consume data; it also allows you to add connectors that synchronize the messages arriving at a topic and send them to external systems such as databases, key-value stores, search indexes, and file systems. To perform this task, the Kafka Connect service provided by Confluent will be used on the Amazon MSK client.

For practical purposes, a connector will be created that sends the messages in JSON format to an S3 bucket.

7.1. Download Plugins

  • Step 1: Log in to the previously created Amazon MSK client from the Amazon EC2 console at https://console.aws.amazon.com/ec2/.
  • Step 2: Download and unpack Confluent using the following commands:
    curl -O http://packages.confluent.io/archive/5.5/confluent-5.5.0-2.12.tar.gz
    tar xzf confluent-5.5.0-2.12.tar.gz

If you have already completed the steps to configure the REST Proxy, skip Step 2.

  • Step 3: Download and unzip the Amazon S3 plugin in the main folder
    wget https://api.hub.confluent.io/api/plugins/confluentinc/kafka-connect-s3/versions/5.5.0/archive
    unzip archive
  • Step 4: Create a folder for the plugins in the main folder
    mkdir plugins
  • Step 5: Create a dedicated folder for the S3 connector inside the plugins folder
    mkdir plugins/kafka-connect-s3
  • Step 6: Copy the content of the plugin downloaded in Step 3 to the folder created in Step 5
    cp confluentinc-kafka-connect-s3-5.5.0/lib/* plugins/kafka-connect-s3/

7.2. Kafka Connect Configuration
To do this, the following steps must be followed:

  • Step 1: Edit the connect-standalone.properties file using the following command:
    vim confluent-5.5.0/etc/kafka/connect-standalone.properties
  • Step 2: Replace the lines shown below with the requested data:
    bootstrap.servers=<Replace_With_Your_BootstrapBrokerString>
    plugin.path=<PLUGIN_FOLDER_PATH>
    BootstrapBrokerString: obtained in Step 1 of the section "Produce and consume data locally" of this document.
    PLUGIN_FOLDER_PATH: path of the plugins folder created in Step 4 of the previous section.

7.3. Connector Setup

To do this, the following steps must be followed:

  • Step 1: Edit the quickstart-s3.properties file using the following command:
    vim confluent-5.5.0/etc/kafka-connect-s3/quickstart-s3.properties
Inside the file make the following configuration:

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=<TOPIC_NAME>
s3.region=<BUCKET_REGION>
s3.bucket.name=<BUCKET_NAME>
s3.part.size=5242880
flush.size=100000

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

storage.class=io.confluent.connect.s3.storage.S3Storage
#format.class=io.confluent.connect.s3.format.avro.AvroFormat
format.class=io.confluent.connect.s3.format.json.JsonFormat
#partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner

schema.compatibility=NONE

locale=es_CL
timezone=UTC
path.format=YYYY-MM-dd/HH
partition.duration.ms=3600000
rotate.interval.ms=60000
timestamp.extractor=Record

Replace the values enclosed in <>.

7.4. Deploy Connection

Once the plugin is downloaded and the files for both kafka-connect and the connector itself are configured, we proceed to deploy the connection and see how the messages contained in a topic are synchronized within the S3 bucket.

To deploy the connection use the following command inside the Confluent folder:

bin/connect-standalone etc/kafka/connect-standalone.properties etc/kafka-connect-s3/quickstart-s3.properties

If there are no problems, the topic data is replicated inside the bucket:
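
One way to verify the result is to list the bucket from the AWS CLI; by default the S3 sink connector writes objects under a topics/ prefix (a sketch; the bucket name is a placeholder):

aws s3 ls s3://<BUCKET_NAME>/topics/ --recursive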


8. Fail Recovery

Although the system is not expected to fail once deployed, it is necessary to consider how to mitigate or resolve failures if they occur. For this reason, the following sections detail how errors are handled in each component.

8.1. Producer

Since Attunity Replicate is used as the data producer, if the connection of any of the task's connectors fails, a detailed log of all the errors present in the operation can be obtained.


8.2. Amazon MSK Cluster

The main safeguard that Amazon MSK offers against a possible service outage is that brokers can be created and replicated in different Availability Zones. This not only provides high availability, but also means that if one broker fails, another in a different Availability Zone can take its place.

However, if a massive failure brings down the Amazon MSK cluster, the only remedy is to create a new one. Therefore, measures must be taken beforehand to mitigate the consequences should this become necessary.

Before such a failure occurs, it is recommended to use Kafka Connect to back up the message stream by sending it to an S3 bucket (see the Connectors section of this document), so that when a new cluster has to be created, this backup can be used to restore the data.

8.2.1. Recovery Procedure

  • Create the new cluster.
  • Isolate the old cluster by disabling the cluster services (kafka-connect, kafka-rest, etc.) to prevent users from trying to connect to it.
  • Load the S3 bucket backup into the new cluster (using Kafka Connect with an S3 source connector).
  • Enable kafka-connect for the new MSK cluster.
  • Update all services with the new cluster's broker endpoints.
  • Enable the services so that users can use the application again.

8.2.2. Metrics

Amazon MSK has metrics to measure certain aspects of the cluster. These metrics are available at https://docs.aws.amazon.com/es_es/msk/latest/developerguide/monitoring.html.

Among these, the only metric that reports the state of one of the cluster's components is ZooKeeperSessionState, which takes the following values:

NOT_CONNECTED ‘0.0’
ASSOCIATING ‘0.1’
CONNECTING ‘0.5’
CONNECTEDREADONLY ‘0.8’
CONNECTED ‘1.0’
CLOSED ‘5.0’
AUTH_FAILED ‘10.0’

 

This metric measures the status of the cluster's ZooKeeper session. ZooKeeper is used to store the state of the brokers.


8.3. Consumer

If the consumer fails, it must be able to resume from the last message consumed.

To achieve this, the consumer must be configured using the following options:

“auto.offset.reset”: “smallest”,

“auto.commit.enable”: “false”

The consumer instance saves the offset of the last consumed message by default, so it knows where to resume once some type of consumer failure occurs.

You must also configure what the consumer's initial offset will be. Use auto.offset.reset to define the consumer's behavior when there is no committed position (which is the case when the group is initialized for the first time) or when an offset is out of range. You can choose to reset the position to the earliest (smallest) offset or to the latest (largest) offset, which is the default.

By setting auto.commit.enable to false, we ensure that the offset only advances once the previous message has been successfully processed; if it is set to true, offsets are committed automatically after a certain interval.
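
For example, with auto.commit.enable set to false, an offset can be committed explicitly once a batch of messages has been processed. The following is a sketch using the commit endpoint of the Confluent REST Proxy v2 API (the group, instance, topic, and offset values are illustrative):

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
     --data '{"offsets":[{"topic":"test","partition":0,"offset":16}]}' \
     http://<AMAZON_MSK_CLIENT_DNS>:8082/consumers/<GROUP>/instances/<INSTANCE>/offsets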

