Kafkarian Move oder wie man Partitionen verschiebt

Einführung

Hallo Habr! Ich arbeite als Java-Programmierer in einer Finanzorganisation. Ich beschloss, Habré zu prägen und meinen ersten Artikel zu schreiben. Aufgrund von Problemen mit der Anwesenheit von Entwicklern wurde ich beauftragt, den Kafka-Cluster von 2.0 auf 2.6 ohne Ausfallzeiten und Verlust von Nachrichten zu aktualisieren (Sie wissen, niemand mag es, wenn Geld in der Luft hängt oder irgendwo verloren geht). Ich möchte diese Erfahrung mit Ihnen teilen und konstruktives Feedback erhalten. Also, genug Wasser, lass uns zur Sache kommen.





Migrationsschema

Die Aufgabe wurde durch die Tatsache erschwert, dass es notwendig war, von alten virtuellen Maschinen auf neue zu migrieren, sodass die Option, den Broker auszuschalten, die Binärdateien zu ändern und ihn zu starten, nicht zu mir passte.





Unten sehen Sie ein Diagramm der Migration. Wir haben 3 Broker auf alten VMs, 3 Broker auf neuen VMs und jeder Broker hat seinen eigenen Zookeeper.





Migrationsplan

Damit wir migrieren können, ohne dass es jemand merkt, müssen wir "ein wenig" schwitzen. Unten finden Sie eine allgemeine Übersicht.





  1. Fügen Sie den Anwendungseinstellungen Adressen neuer Broker hinzu





  2. Schlag Zugang zwischen allen und allem





  3. Bereiten Sie die Infrastruktur auf neuen virtuellen Maschinen vor





  4. Erhebe eine neue Gruppe von Zookieepern und füge sie mit der alten zusammen





  5. Neue Kafka-Broker aufbauen





  6. Migrieren Sie alle Partitionen von alten zu neuen Brokern





  7. Deaktivieren Sie alte Kafka-Broker und alte Zookieepers





  8. Entfernen Sie alte Broker und Zukiper aus neuen Konfigurationen









. , , . "bootstrap.servers"





old-server1:9092,old-server2:9092,old-server3:9092,new-server4:9092,new-server5:9092,new-server6:9092







- . , . , .





  1. , 9092 -> ( 3 )





  2. <----> (+18 )





  3. 9092 (+6 )





  4. 2 3 : 2181, 2888, 3888 ( (18+6)*3 = 72)





: 99 . ! .





100 ,









, .





kafka





- kafka. , /etc/security/limits.conf





kafka hard nofile 262144
kafka soft nofile 262144
      
      



, , , .





, 262144, , ( ). 10 , .









java





/home/kafka/.bash_profile





export JAVA_HOME=/opt/java
export PATH=JAVA_HOME/bin:PATH
      
      



jre /opt/java









, , . .





setup.sh
tar -xf ../jdk1.8.0_181.tar.gz -C /opt/
mv /home/kafka/kafka /opt
mv /home/kafka/zookeeper /opt
mv /home/kafka/kafka-start.sh /opt
mv /home/kafka/scripts /opt

ln -sfn /opt/kafka/kafka_2.13-2.6.0 /opt/kafka/current
ln -sfn /opt/zookeeper/zookeeper-3.6.2 /opt/zookeeper/current
ln -sfn /opt/jdk1.8.0_181/ /opt/java

chown -R kafka:kafka /opt
chmod -R 700 /opt

#env_var start ------------------------------->

kafkaProfile=/home/kafka/.bash_profile

homeVar="export JAVA_HOME=/opt/java"
javaHome=$(cat $kafkaProfile | grep "$homeVar")


if [ "$javaHome" != "$homeVar" ]; then
    echo -e "\n$homeVar\n" >> $kafkaProfile
fi


pathVar="export PATH=\$JAVA_HOME/bin:\$PATH"
path=$(cat $kafkaProfile | grep "$pathVar")

if [ "$path" != "$pathVar" ]; then
    echo -e "\n$pathVar\n" >> $kafkaProfile
fi

#env_var end --------------------------------->




#ulimit start >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

limitsFile=/etc/security/limits.conf

soft="kafka soft nofile 262144"
limitSoft=$(cat $limitsFile | grep "$soft")

if [ "$limitSoft" != "$soft" ]; then
    echo -e "\n$soft\n" >> $limitsFile
fi


hard="kafka hard nofile 262144"
limitHard=$(cat $limitsFile | grep "$hard")

if [ "$limitHard" != "$hard" ]; then
    echo -e "\n$hard\n" >> $limitsFile
fi

#ulimit end >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
      
      







kafka





ulimit -n  = 262144
echo $JAVA_HOME  = /opt/java
echo $PATH    /opt/java/bin
      
      



, , , , myid, id .





/opt/zookeeper/current/conf/zoo.cfg





zoo.cfg
dataDir=/opt/zookeeper/zookeeper-data
server.1=server1:2888:3888
server.2=server2:2888:3888
server.3=server3:2888:3888

server.4=server4:2888:3888
server.5=server5:2888:3888
server.6=server6:2888:3888
      
      







/opt/zookeeper/current/bin/zkServer.sh start





.





zk-add-new-servers.sh
echo -e "\nserver.4=server4:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
echo -e "\nserver.5=server5:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
echo -e "\nserver.6=server6:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
      
      







, . .





/opt/zookeeper/zookeeper-3.4.13/bin/zkServer.sh restart
      
      



, .





.../zkServer.sh status
      
      



.





, server.properties broker.id zookeeper.connect





4





broker.id=4
zookeeper.connect=server4:2181,server5:2181,server6:2181/cluster_name
      
      







, , .





server.properties (2.0.0). .





inter.broker.protocol.version=2.0.0
      
      







nohup /opt/kafka/current/bin/kafka-server-start.sh /opt/kafka/config/server.properties > log.log 2>&1 &
      
      



. cli .





/opt/zookeeper/current/bin/zkCli.sh
      
      







ls /cluster_name/brokers/ids
      
      







[1,2,3,4,5,6]
      
      



6 , .





. - , , , . replication.factor . . .





, "", , .





json , .





{"topics": [{"topic": "foo1"},
              {"topic": "foo2"}],
  "version":1
 }
      
      



,





json kafka-reassign-partitions.sh --generate id --broker-list "4,5,6".









generate1.json
{"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2,3]},
                {"topic":"foo1","partition":0,"replicas":[3,2,1]},
                {"topic":"foo2","partition":2,"replicas":[1,2,3]},
                {"topic":"foo2","partition":0,"replicas":[3,2,1]},
                {"topic":"foo1","partition":1,"replicas":[2,3,1]},
                {"topic":"foo2","partition":1,"replicas":[2,3,1]}]
  }

  Proposed partition reassignment configuration

  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[5,4,6]},
                {"topic":"foo1","partition":0,"replicas":[4,5,6]},
                {"topic":"foo2","partition":2,"replicas":[6,4,5]},
                {"topic":"foo2","partition":0,"replicas":[4,5,6]},
                {"topic":"foo1","partition":1,"replicas":[5,4,6]},
                {"topic":"foo2","partition":1,"replicas":[4,5,6]}]
  }
      
      







Proposed partition reassignment configuration ( ). - . , json.





. "" , . kafka-reassign-helper.jar .









prepare-for-reassignment.sh
#       .    20
if [ "$#" -eq 0 ]
then
    echo "no arguments"
    exit 1
fi


echo "Start reassignment preparing"

/opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> topics.txt

echo created topics.txt

java -jar kafka-reassign-helper.jar generate topics.txt $1

fileCount=$(ls -dq generate*.json | wc -l)

echo "created $fileCount file for topics to move"

echo -e "\nCreating generated files\n"

mkdir -p generated
for ((i = 1; i < $fileCount+1; i++ ))
do
/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --broker-list "4,5,6" --topics-to-move-json-file "generate$i.json" --generate >> "generated/generated$i.txt"
echo "generated/generated$i.txt" created
done

echo -e "\nCreating execute/rollback files"

java -jar kafka-reassign-helper.jar execute $fileCount

echo -e "\nexecute/rollback files created"

echo -e "\nPreparing finished successfully!"
      
      







kafka-reassign-partitions.sh, --execute





move-partitions.sh
#     execute1.json, execute2.json ....
if [ "$#" -eq 0 ]
then
    echo "no arguments"
    exit 1
fi
        

/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file "execute/execute$1.json" --execute
      
      







. , . , , kafka-reassign-partitions.sh, --verify, .





, , .





reassign-verify.sh
progress=-1

while [ $progress != 0 ]
do

    progress=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "in progress" -c)
    complete=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "is complete" -c)
    failed=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "failed" -c)

    echo "In progress:" $progress;
    echo "Is complete:" $complete;
    echo "Failed:" $failed;

    sleep 2s

done
      
      







In progress 0. Is complete - . Failed 0.





move-partitions.sh reassign-verify.sh , .





log.dirs , - .





. kafka-server-stop.sh zkStop.sh .





. zoo.cfg





zk-remove-old-servers.sh
sed -i '/server.1/d' /opt/zookeeper/current/conf/zoo.cfg
sed -i '/server.2/d' /opt/zookeeper/current/conf/zoo.cfg
sed -i '/server.3/d' /opt/zookeeper/current/conf/zoo.cfg
      
      







, , , . : 2 1 .





server.properties





remove-old-protocol.sh
sed -i '/inter.broker.protocol.version=2.0.0/d' /opt/kafka/config/server.properties
      
      







, , insync, min.insync.replicas ( 2), default.replication.factor ( 3)





- 2 , 3, , , , insync .









check-insync.sh
input="check-topics.txt"
rm -f $input

/opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> check-topics.txt

checkPerIter=100
i=0
list=""
notInsync=0
while IFS= read -r line
do
 ((i=i+1))
 list+="${line}|"
 if [ $i -eq $checkPerIter ]
  then
   list=${list::${#list}-1}
   echo "checking $list"
   count=$(/opt/kafka/current/bin/kafka-topics.sh --describe --topic $list --zookeeper localhost:2181/cluster_name | egrep "Isr: [4-6/,]{3}$" -c)
   if [ "$count" -ne 0 ]
    then
     /opt/kafka/current/bin/kafka-topics.sh --describe --topic $list --zookeeper localhost:2181/cluster_name | egrep "Isr: [4-6/,]{3}$"
   fi
   ((notInsync=notInsync+count))
   list=""
   i=0
 fi
done < "$input"

echo "not insync: $notInsync"
      
      







Wenn wir am Ausgang nicht insync: 0 erhalten , können wir die Broker einzeln neu starten.





Das ist alles. Die Migration der Broker ist nun abgeschlossen, mit Ausnahme der anschließenden Neukonfiguration der Überwachung und anderer Hilfsangelegenheiten.





So sieht die Anweisung aus, die ich an die Admins geschickt habe, die alles im Kampf getan haben. Überraschenderweise haben sie es das erste Mal gemacht und keine Fragen gestellt.





README.txt
    2.0.0 -> 2.6.0

1 .  

1.1       

NEW4.tar.gz ->  4 
migration.tar.gz ->  4 

NEW5.tar.gz ->  5 
NEW6.tar.gz ->  6 

1.2  

tar -xf NEW4.tar.gz -C /home/kafka
tar -xf NEW5.tar.gz -C /home/kafka
tar -xf NEW6.tar.gz -C /home/kafka

1.3   root       (    ,   )

/home/kafka/scripts/setup.sh

1.4    kafka (   )

1.5     (   )

ulimit -n  = 262144
echo $JAVA_HOME  = /opt/java
echo $PATH    /opt/java/bin

  /opt   kafka kafka



2 .    

2.1    kafka (   )

2.2       (  )

/opt/scripts/zkStart.sh

2.3       

OLD1.tar.gz ->  1 
OLD2.tar.gz ->  2 
OLD3.tar.gz ->  3 

2.4  

tar -xf OLD1.tar.gz -C /home/kafka
tar -xf OLD2.tar.gz -C /home/kafka
tar -xf OLD3.tar.gz -C /home/kafka

2.5   root       (    )

/home/kafka/scripts/setup-old.sh

2.6    kafka    

2.7       (      )

/home/kafka/scripts/zk-add-new-servers.sh

2.8       (  )

/home/kafka/scripts/zkStatus.sh

      

2.9       
    

/home/kafka/scripts/zkRestart.sh

2.10       
     

/home/kafka/scripts/zkStatus.sh ( )
/opt/scripts/zkStatus.sh ( )

        follower
  leader

2.11   
    
/opt/kafka-start.sh

2.12      
   

/home/kafka/migration/zkCli.sh

ls /cluster_name/brokers/ids

   [1,2,3,4,5,6]

3      

3.1   (    )
/home/kafka/migration/prepare-for-assignment.sh  20

3.2           /home/kafka/migration/execute


:
  execute1.json
/home/kafka/migration/move-partitions.sh 1

      

:
/home/kafka/migration/reassign-verify.sh 1

     "in progress"  0        .3.2   

3.3          ,     /opt/kafka/kafka-data    
     

3.4       ,  
/opt/kafka/current/bin/kafka-server-stop.sh

3.5       ,  

/home/kafka/scripts/zkStop.sh

3.6      (      )
/opt/scripts/zk-remove-old-servers.sh

3.7     

/opt/scripts/zkRestart.sh

3.8       ( , 2 )

/opt/scripts/zkStatus.sh

3.9      (       )
/opt/scripts/remove-old-protocol.sh


3.10       insync

    /home/kafka/migration/check-insync.sh

not insync    0

3.11       

/opt/kafka/current/bin/kafka-server-stop.sh

    (ps aux | grep kafka    )

 /opt/kafka-start.sh

  3.10  

 !
      
      







Hoffe, es hat jemandem in dieser Angelegenheit geholfen. Ich warte auf Ihre Kommentare und Bemerkungen.





Alle Skripte und Anweisungen, einschließlich der für Rollbacks, finden Sie hier








All Articles