import React from "react";

const DockerComposeRawHTML = `
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka-net
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "baeldung:1:1"
    networks:
      - kafka-net
networks:
  kafka-net:
    driver: bridge
`;

const createTopicCommand = `
docker-compose exec kafka kafka-topics.sh --create --topic nipuna 
--partitions 1 --replication-factor 1 --bootstrap-server kafka:9092
`;

const StartConsumerCommand = `
docker-compose exec kafka kafka-console-consumer.sh --topic nipuna
--from-beginning --bootstrap-server kafka:9092
`;

const PublishMessagesCommand = `
docker-compose exec kafka kafka-console-producer.sh --topic nipuna
          --broker-list kafka:9092
`;

const applicationYmlConfig = `
kafka:
  topic: forgotpassword
  bootstrapServers: 0.0.0.0:9092
`;

const kafkaApiConfig = `
package com.nipuna.configuration;


import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
@Getter
public class KafkaApiConfig {

    @Value("dollarSymbol{kafka.topic}")
    private String TOPIC;

    @Value("dollarSymbol{kafka.bootstrapServers}")
    private String BOOTSTRAP_SERVERS;

}

`;

const ProducerRawHTML = `
@Service
@Slf4j
@AllArgsConstructor
public class KafkaProducerService {

    private final KafkaApiConfig kafkaApiConfig;
    private static final String SERIALIZER = 
    "org.apache.kafka.common.serialization.StringSerializer";

    public void sendDetailsToConsumer(MailRequest mailRequest) {
        if (mailRequest.getEmail() != null) {
            UUID customResetCode = UUID.randomUUID();

            final String TOPIC = mailRequest.getTopic();
            final String BOOTSTRAP_SERVERS = kafkaApiConfig.getBOOTSTRAP_SERVERS();

            Properties props = new Properties();
            props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
            props.put("key.serializer", SERIALIZER);
            props.put("value.serializer", SERIALIZER);

            try (Producer<String, String> producer = 
              new KafkaProducer<>(props)) {

                String message = mailRequest.getEmail() + "," 
                + mailRequest.getFirstName() + "," +
                        mailRequest.getLastName() + "," 
                        + customResetCode.toString();

                ProducerRecord<String, String> record =
                 new ProducerRecord<>(TOPIC, message);
                producer.send(record);
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}
`;

const ConsumerRawHTML = `
@Service
@Slf4j
@AllArgsConstructor
public class KafkaConsumerService {
    private static final String GROUP_ID = "my-group-id";
    private static final String STRING_DESERIALIZER = 
    "org.apache.kafka.common.serialization.StringDeserializer";

    private final EmailService emailService;
    private final KafkaApiConfig kafkaApiConfig;

    public void sendMail(String mailMessage) {
        if (mailMessage != null) {

            final String TOPIC = kafkaApiConfig.getTOPIC();
            final String BOOTSTRAP_SERVERS = 
            kafkaApiConfig.getBOOTSTRAP_SERVERS();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
            props.setProperty("group.id", GROUP_ID);
            props.setProperty("key.deserializer", STRING_DESERIALIZER);
            props.setProperty("value.deserializer", STRING_DESERIALIZER);
            props.setProperty("enable.auto.commit", "true");


            try (KafkaConsumer<String, String> consumer = 
              new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList(TOPIC));

                String[] userDetails = mailMessage.split(",");
                String emailAddress = userDetails[0];
                String firstName = userDetails[1];
                String lastName = userDetails[2];
                String password = userDetails[3];

                StringBuilder emailContent = new StringBuilder();
                emailContent.append("Dear ").append(firstName)
                .append(" ").append(lastName)
                        .append(",<br>Your reset password code is ")
                        .append(password);

                System.out.println("Sending email to: " + emailAddress +
                 " with details: " + firstName + ", " + lastName +
                  ", " + password);
                emailService.sendResetPasswordEmail(emailAddress, 
                  String.valueOf(emailContent));

            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}
`;

const Consumer1RawHTML = `
@Service
@Slf4j
@AllArgsConstructor
public class KafkaConsumerService1 {
    private static final String GROUP_ID = "my-group-id-1";
    private static final String STRING_DESERIALIZER =
     "org.apache.kafka.common.serialization.StringDeserializer";

    private final EmailService emailService;
    private final KafkaApiConfig kafkaApiConfig;

    @KafkaListener(topics = "topic1", groupId = "my-group-id-1")
    public void sendMail(String mailMessage) {
        if (mailMessage != null) {

            final String TOPIC = "topic1";
            final String BOOTSTRAP_SERVERS =
             kafkaApiConfig.getBOOTSTRAP_SERVERS();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
            props.setProperty("group.id", GROUP_ID);
            props.setProperty("key.deserializer", STRING_DESERIALIZER);
            props.setProperty("value.deserializer", STRING_DESERIALIZER);
            props.setProperty("enable.auto.commit", "true");


            try (KafkaConsumer<String, String> consumer = 
              new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList(TOPIC));

                String[] userDetails = mailMessage.split(",");
                String emailAddress = userDetails[0];
                String firstName = userDetails[1];
                String lastName = userDetails[2];
                String password = userDetails[3];

                StringBuilder emailContent = new StringBuilder();
                emailContent.append("Dear ").append(firstName)
                .append(" ").append(lastName)
                        .append(",<br>Topic1:Your reset password code is ")
                        .append(password);

                System.out.println("Sending email to: " + emailAddress + 
                " with details: " + firstName + ", " + lastName + ", " + password);
                emailService.sendResetPasswordEmail(emailAddress, 
                  String.valueOf(emailContent));

            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}
`;

const Consumer2RawHTML = `
@Service
@Slf4j
@AllArgsConstructor
public class KafkaConsumerService2 {
    private static final String GROUP_ID = "my-group-id-2";
    private static final String STRING_DESERIALIZER = 
    "org.apache.kafka.common.serialization.StringDeserializer";

    private final EmailService emailService;
    private final KafkaApiConfig kafkaApiConfig;

    @KafkaListener(topics = "topic2", groupId = "my-group-id-2")
    public void sendMail(String mailMessage) {
        if (mailMessage != null) {

            final String TOPIC = "topic2";
            final String BOOTSTRAP_SERVERS = kafkaApiConfig.getBOOTSTRAP_SERVERS();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
            props.setProperty("group.id", GROUP_ID);
            props.setProperty("key.deserializer", STRING_DESERIALIZER);
            props.setProperty("value.deserializer", STRING_DESERIALIZER);
            props.setProperty("enable.auto.commit", "true");


            try (KafkaConsumer<String, String> consumer = 
              new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList(TOPIC));

                String[] userDetails = mailMessage.split(",");
                String emailAddress = userDetails[0];
                String firstName = userDetails[1];
                String lastName = userDetails[2];
                String password = userDetails[3];

                StringBuilder emailContent = new StringBuilder();
                emailContent.append("Dear ").append(firstName).append(" ")
                .append(lastName)
                .append(",<br>Topic2:Your reset password code is ")
                .append(password);

                System.out.println("Sending email to: " + emailAddress +
                 " with details: " + firstName + ", " + lastName + ", " + password);
                emailService.sendResetPasswordEmail(emailAddress, 
                  String.valueOf(emailContent));

            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}
`;

const kafka: React.FC = () => {
  return (
    <div>
      <h2>Apache Kafka Overview</h2>

      <p>
        Apache Kafka is an open-source distributed event streaming platform used
        for building real-time data pipelines and streaming applications. In
        simple terms, Kafka is like a super-smart messenger for data. Imagine
        you have a bunch of computers or systems that need to talk to each other
        and share information in real-time. Instead of them directly talking to
        each other, they send their messages to Kafka, and Kafka makes sure the
        messages get to the right places. Kafka is designed to handle
        large-scale, fault-tolerant, and high-throughput data streaming.
      </p>

      <h3>Key Concepts and Features of Kafka:</h3>

      <ul>
        <li>
          <strong>Producers:</strong> Producers are responsible for publishing
          messages to Kafka topics. They send messages to specific topics, and
          Kafka ensures that these messages are distributed across partitions.
          Think of them as message senders. They have some information
          (messages) that they want to share.
        </li>
        <li>
          <strong>Consumers:</strong> Consumers are applications or processes
          that subscribe to topics and process the messages. Each partition is
          consumed by only one consumer at a time, allowing for parallel
          processing. Think of them as message receivers. They want to know
          what's happening and get the messages.
        </li>
        <li>
          <strong>Publish-Subscribe Model:</strong> Kafka follows a
          publish-subscribe messaging model. Producers publish messages to
          topics, and consumers subscribe to these topics to receive the
          messages.
        </li>
        <li>
          <strong>Topics:</strong> Topics are logical channels or categories to
          which messages are published. Producers send messages to topics, and
          consumers subscribe to topics to receive the messages.
        </li>
        <li>
          <strong>Partitions:</strong> Each topic can be divided into
          partitions, which allows Kafka to parallelize the processing of
          messages. Partitions also provide fault tolerance and scalability.
          Think of them as sections within a channel. They help in managing and
          processing messages more efficiently. Imagine you have a "sales"
          channel, and you split it into different sections (partitions) so that
          different people can handle different parts simultaneously.
        </li>
        <li>
          <strong>ZooKeeper:</strong> Kafka uses Apache ZooKeeper for
          distributed coordination and management of the Kafka cluster.
          ZooKeeper is responsible for electing leaders, managing configuration,
          and maintaining metadata.
        </li>
      </ul>

      <p>
        Kafka is widely used for various purposes, including real-time event
        processing, log aggregation, data integration, and building scalable and
        fault-tolerant microservices architectures. Its ability to handle large
        volumes of data in real-time makes it a popular choice in modern data
        architectures.
      </p>
      <h2>Kafka Setup:</h2>

      <p>
        We use Docker Compose tool to run services like Zookeeper and Kafka in a
        containerized environment. Below is the docker-compose.yml file:
      </p>
      <div className="listitems">
        <pre>{DockerComposeRawHTML}</pre>
      </div>
      <p>Save this file in the local directory of your system.</p>

      <h2>2. Start the Kafka Cluster:</h2>

      <p>
        Open terminal and navigate to the directory where docker-compose.yml is
        saved and run the following command to start the Kafka and Zookeeper in
        detached mode:
      </p>

      <pre className="listitems">
        <code>docker-compose up -d</code>
      </pre>

      <p>Check the running containers using the below command:</p>

      <pre className="listitems">
        <code>docker-compose ps</code>
      </pre>

      <h2>3. Creating a Kafka Topic:</h2>

      <p>
        With the Kafka cluster up, create a Kafka topic named "nipuna" using the
        following command:
      </p>
      <div className="listitems">
        <pre>{createTopicCommand}</pre>
      </div>

      <h2>4. Publishing and Consuming Messages:</h2>

      <p>Start a consumer to consume messages from the "nipuna" topic:</p>

      <div className="listitems">
        <pre>{StartConsumerCommand}</pre>
      </div>

      <p>To publish messages to the topic, use the following command:</p>

      <div className="listitems">
        <pre>{PublishMessagesCommand}</pre>
      </div>

      <p>
        By executing these commands, you can efficiently publish and consume
        messages within the Kafka topic "nipuna" in a Dockerized environment.
      </p>

      <h2>5. Dependencies:</h2>

      <p>The kafka dependencies we add in the build.gradle file are:</p>
      <pre>
        <code>
          <strong>org.apache.kafka:kafka-clients:3.5.0</strong> - Dependency for
          Kafka client libraries.
          <br />
          <strong>org.springframework.kafka:spring-kafka:3.1.1</strong> -
          Dependency for Spring Kafka integration.
        </code>
      </pre>

      <h2>6. application.yml file config:</h2>
      <div className="listitems">
        <pre>{applicationYmlConfig}</pre>
      </div>

      <h2>7. KafkaApiConfig:</h2>
      <p>Add new class for KafkaApiConfig</p>
      <div className="listitems">
        <pre>{kafkaApiConfig}</pre>
      </div>
      <p><strong>Note:</strong> dollarSymbol text in above code should be replaced with dollar symbol</p>

      <h2>8. Kafka Producer Service:</h2>
      <p>Below is how producer code looks like:</p>
      <div className="listitems">
        <pre>{ProducerRawHTML}</pre>
      </div>
      <p>
        This is a Spring @Service class for Kafka producer. It uses constructor
        injection for KafkaApiConfig. The sendDetailsToConsumer method is
        responsible for sending messages to Kafka. It uses KafkaProducer to
        create a message with data from MailRequest and sends it to the
        specified topic.
      </p>

      <h2>9. Kafka Consumer Service:</h2>
      <p>Below is how consumer code looks like:</p>
      <div className="listitems">
        <pre>{ConsumerRawHTML}</pre>
      </div>
      <p>
        This is a Spring @Service class for Kafka consumer. It uses constructor
        injection for EmailService and KafkaApiConfig. The @KafkaListener
        annotation is used to listen to messages on the specified topic and
        group ID. The sendMail method handles the consumed message and performs
        actions like sending an email.
      </p>

      <h2>10. Consumer Services with Different Topics:</h2>
      <p>Below is how consumers code looks like if we have two consumers:</p>
      <div className="listitems">
        <pre>{Consumer1RawHTML}</pre>
      </div>
      <div className="listitems">
        <pre>{Consumer2RawHTML}</pre>
      </div>
      <p>
        The provided code includes two additional consumer services
        (KafkaConsumerService1 and KafkaConsumerService2) for different topics.
        These services follow similar patterns but consume messages from
        different topics.
      </p>

      <p>
        The consumer services have similar structures, where the @KafkaListener
        annotation specifies the topic and group ID for each consumer.
      </p>

      <p>
        Please note that it's common to externalize Kafka configurations (like
        topic names) into properties files to make the application more
        configurable and maintainable. Also, ensure that the necessary Kafka
        topics exist before starting the application.
      </p>
    </div>
  );
};

export default kafka;
