Big / Bug-Daten: Analyse des Apache Flink-Quellcodes

image1.png


Big Data-Anwendungen verarbeiten große Informationsmengen, häufig in Echtzeit. Natürlich müssen solche Anwendungen sehr zuverlässig sein, damit kein Fehler im Code die Datenverarbeitung beeinträchtigen kann. Um eine hohe Zuverlässigkeit zu erreichen, muss die Qualität des für diesen Bereich entwickelten Projektcodes genau überwacht werden. Der statische Analysator PVS-Studio behandelt dieses Problem. Heute wurde das von der Apache Software Foundation, einem der führenden Unternehmen auf dem Big Data-Softwaremarkt, entwickelte Apache Flink-Projekt als Testobjekt für den Analysator ausgewählt.



Was ist Apache Flink? Es ist ein Open-Source-Framework für die verteilte Verarbeitung großer Datenmengen. Es wurde 2010 als Alternative zu Hadoop MapReduce an der Technischen Universität Berlin entwickelt. Das Framework basiert auf einer verteilten Ausführungs-Engine für Batch- und Stream-Datenverarbeitungsanwendungen. Diese Engine ist in Java und Scala geschrieben. Heute kann Apache Flink in Projekten verwendet werden, die mit Java, Scala, Python und sogar SQL geschrieben wurden.



Projektanalyse



Nachdem ich den Quellcode des Projekts heruntergeladen hatte, begann ich mit dem Erstellen des Projekts mit dem Befehl 'mvn clean package -DskipTests', der in den Anweisungen auf GitHub angegeben ist . Während der Montage mit dem Dienstprogramm CLOC stellte ich fest, dass das Projekt 10838 Java-Dateien enthält, die ungefähr 1,3 Millionen Codezeilen enthalten. Darüber hinaus gab es bereits 3833 Test-Java-Dateien, was mehr als 1/3 aller Java-Dateien entspricht. Mir ist auch aufgefallen, dass das Projekt den statischen Code-Analysator FindBugs und das Dienstprogramm Cobertura verwendet, das Informationen zur Codeabdeckung durch Tests bereitstellt. Vor diesem Hintergrund wird deutlich, dass die Entwickler von Apache Flink die Codequalität und die Testabdeckung während der Entwicklung sorgfältig überwacht haben.



Nach einem erfolgreichen Build habe ich das Projekt in IntelliJ IDEA geöffnet und die Analyse mit dem Plugin PVS-Studio für IDEA und Android Studio ausgeführt . Die Warnungen des Analysators wurden wie folgt verteilt:



  • 183 hoch;
  • 759 Medium;
  • 545 Niedrig.


Etwa 2/3 der PVS-Studio-Analysator-Trigger wurden Testdateien zugewiesen. Angesichts dieser Tatsache und der Größe der Codebasis des Projekts können wir sagen, dass es den Apache Flink-Entwicklern gelungen ist, die Qualität des Codes im Vordergrund zu halten.



Nachdem ich die Warnungen des Analysators genauer untersucht habe, habe ich meiner Meinung nach die interessantesten ausgewählt. Mal sehen, was PVS-Studio in diesem Projekt gefunden hat!





Nur ein bisschen Nachlässigkeit



V6001 Links und rechts vom Operator '==' befinden sich identische Unterausdrücke ' processData '. CheckpointStatistics.java (229)



@Override
public boolean equals(Object o) 
{
  ....
  CheckpointStatistics that = (CheckpointStatistics) o;
  return id == that.id &&
    savepoint == that.savepoint &&
    triggerTimestamp == that.triggerTimestamp &&
    latestAckTimestamp == that.latestAckTimestamp &&
    stateSize == that.stateSize &&
    duration == that.duration &&
    alignmentBuffered == that.alignmentBuffered &&
    processedData == processedData &&                // <=
    persistedData == that.persistedData &&
    numSubtasks == that.numSubtasks &&
    numAckSubtasks == that.numAckSubtasks &&
    status == that.status &&
    Objects.equals(checkpointType, that.checkpointType) &&
    Objects.equals(
      checkpointStatisticsPerTask, 
      that.checkpointStatisticsPerTask);
}
      
      





Vor dem Hintergrund anderer Ausdrücke ist dieser Fehler nicht sehr auffällig. Beim Überschreiben der equals- Methode für die CheckpointStatistics- Klasse hat der Programmierer einen Fehler im Ausdruck processData == verarbeiteter Daten gemacht , der bedeutungslos ist, da er immer wahr ist. In ähnlicher Weise sollte der Rest des Ausdrucks im Gegenzug mit dem Feld des aktuellen Objekts this und dem Objekt That verglichen werden : processData == that.processedData... Diese Situation ist eines der typischen Fehlermuster in Vergleichsfunktionen, die im Artikel "Das Böse lebt in Vergleichsfunktionen " ausführlich beschrieben werden . Es stellt sich also heraus, dass nur "ein wenig Unaufmerksamkeit" die Logik der Überprüfung der Äquivalenz von Objekten der CheckpointStatistics- Klasse gebrochen hat .



Ausdruck ist immer wahr



V6007 Der Ausdruck 'input2.length> 0' ist immer wahr. Operator.java (283)



public static <T> Operator<T> createUnionCascade(Operator<T> input1, 
                                                 Operator<T>... input2) 
{
  if (input2 == null || input2.length == 0) 
  {
    return input1;                                // <=
  } 
  else if (input2.length == 1 && input1 == null) 
  {
    return input2[0];
  }
  ....
  if (input1 != null) 
  {
    ....
  } 
  else if (input2.length > 0 && input2[0] != null) // <=
  {
    ....
  } 
  else 
  {
    ....
  }
}
      
      





Bei dieser Methode erwies sich der Analysator als aufmerksamer als eine Person, die er auf seine eigene Art und Weise zu melden beschloss, was darauf hinweist, dass der Ausdruck input2.length> 0 immer wahr ist. Der Grund ist, dass wenn die Länge des Eingangs2- Arrays 0 ist, die Bedingung input2 == null || ist input2.length == 0 des ersten if in der Methode ist true, und die Ausführung der Methode wird unterbrochen, bevor die Zeile mit dem Ausdruck input2.length> 0 erreicht wird .



Allsehender Analysator



V6007 Der Ausdruck 'slotSharingGroup == null' ist immer falsch. StreamGraphGenerator.java (510)



private <T> Collection<Integer> transformFeedback(....)
{
  ....
  String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
  if (slotSharingGroup == null)
  {
    slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
  }
  ....
}
      
      





Der Analysator hat gemeldet, dass slotSharingGroup == null immer falsch ist. Dies deutet darauf hin, dass die Methode DetermineSlotSharingGroup niemals null zurückgibt . Ist der Analysator so intelligent, dass er alle Werte berechnen konnte, die diese Methode zurückgeben kann? Lassen Sie uns besser alles selbst überprüfen:



public class StreamGraphGenerator 
{
  ....
  public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
  ....
  private String determineSlotSharingGroup(String specifiedGroup, 
                                           Collection<Integer> inputIds) 
  {
    if (specifiedGroup != null)
    {
      return specifiedGroup; // <= 1
    }
    else
    {
      String inputGroup = null;
      for (int id: inputIds)
      {
        String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
        if (inputGroup == null)
        {
          inputGroup = inputGroupCandidate;
        }
        else if (!inputGroup.equals(inputGroupCandidate))
        {
          return DEFAULT_SLOT_SHARING_GROUP; // <= 2
        }
      }
      return inputGroup == null 
             ? DEFAULT_SLOT_SHARING_GROUP 
             : inputGroup; // <= 3
    }
  }
  ....
}
      
      





In der Reihenfolge gehen wir alle Rückgaben durch und sehen, was diese Methode wiedererlangen kann:



  • Die erste Rückgabe gibt das Argument an die angegebene Group- Methode zurück , jedoch nur, wenn es nicht null ist .
  • return for DEFAULT_SLOT_SHARING_GROUP, ;
  • return inputGroup, null. DEFAULT_SLOT_SHARING_GROUP.


Es stellte sich heraus, dass der Analysator wirklich in der Lage war, die Unmöglichkeit der Rückgabe von Null aus der Methode DetermineSlotSharingGroup zu berechnen, und warnte uns davor und wies auf die Sinnlosigkeit der Überprüfung von slotSharingGroup == Null hin . Und obwohl diese Situation nicht fehlerhaft ist, kann ein solcher zusätzlicher Schutz des Analysators in einem anderen Fall einen Fehler erkennen. Zum Beispiel, wenn Sie eine Methode benötigen, um unter bestimmten Bedingungen null zurückzugeben.



Sammle sie alle



V6007 Der Ausdruck 'currentCount <= lastEnd' ist immer wahr. CountSlidingWindowAssigner.java (75)



V6007 Der Ausdruck 'lastStart <= currentCount' ist immer wahr. CountSlidingWindowAssigner.java (75)



@Override
public Collection<CountWindow> assignWindows(....) throws IOException 
{
  Long countValue = count.value();
  long currentCount = countValue == null ? 0L : countValue;
  count.update(currentCount + 1);
  long lastId = currentCount / windowSlide;
  long lastStart = lastId * windowSlide;
  long lastEnd = lastStart + windowSize - 1;
  List<CountWindow> windows = new ArrayList<>();
  while (lastId >= 0 && 
         lastStart <= currentCount && 
         currentCount <= lastEnd) 
  {
    if (lastStart <= currentCount && currentCount <= lastEnd) // <=
    {
      windows.add(new CountWindow(lastId));
    }
    lastId--;
    lastStart -= windowSlide;
    lastEnd -= windowSlide;
  }
  return windows;
}
      
      





Der Analysator warnt davor, dass die Ausdrücke currentCount <= lastEnd und lastStart <= currentCount immer wahr sind. Und tatsächlich, wenn Sie sich den Zustand der while-Schleife ansehen , dann gibt es genau die gleichen Ausdrücke. Dies bedeutet, dass diese Ausdrücke innerhalb der Schleife immer wahr sind, sodass alle in der Schleife erstellten Objekte vom Typ CountWindow zur Fensterliste hinzugefügt werden . Es gibt viele Optionen für das Erscheinen dieser bedeutungslosen Prüfung, und das erste, was mir in den Sinn kommt, ist entweder ein Refactoring-Artefakt oder die Bestätigung eines Entwicklers. Aber es kann ein Fehler sein, wenn Sie etwas anderes überprüfen wollten ...



Falsche Argumentreihenfolge



V6029 Mögliche falsche Reihenfolge der an die Methode übergebenen Argumente: 'hasBufferForReleasedChannel', 'hasBufferForRemovedChannel'. NettyMessageClientDecoderDelegateTest.java (165), NettyMessageClientDecoderDelegateTest.java (166)



private void testNettyMessageClientDecoding(
       boolean hasEmptyBuffer,
       boolean hasBufferForReleasedChannel,
       boolean hasBufferForRemovedChannel) throws Exception 
{
  ....
  List<BufferResponse> messages = createMessageList (
    hasEmptyBuffer,
    hasBufferForReleasedChannel,
    hasBufferForRemovedChannel);
  ....
}
      
      





Javas mangelnde Fähigkeit, eine Methode mit benannten Parametern aufzurufen, ist für Entwickler manchmal ein grausamer Witz. Genau dies geschah, als der Analysator auf die Methode createMessageList zeigte . Bei der Definition dieser Methode wird deutlich, dass der Parameter hasBufferForRemovedChannel vor dem Parameter hasBufferForReleasedChannel an die Methode übergeben werden muss :



private List<BufferResponse> createMessageList(
  boolean hasEmptyBuffer,
  boolean hasBufferForRemovedChannel,
  boolean hasBufferForReleasedChannel) 
{
  ....
  if (hasBufferForReleasedChannel) {
    addBufferResponse(messages, 
                      releasedInputChannelId, 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  if (hasBufferForRemovedChannel) {
    addBufferResponse(messages, 
                      new InputChannelID(), 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  ....
  return messages;
}
      
      





Beim Aufrufen der Methode hat der Entwickler jedoch die Reihenfolge dieser Argumente vertauscht , weshalb die Logik der Methode createMessageList unterbrochen wird, wenn sich die Werte der gemischten Argumente unterscheiden.



Oh, dieses Kopieren und Einfügen



V6032 Es ist seltsam, dass der Hauptteil der Methode 'seekToFirst' dem Körper einer anderen Methode 'seekToLast' vollständig entspricht. RocksIteratorWrapper.java (53), RocksIteratorWrapper.java (59)



public class RocksIteratorWrapper implements RocksIteratorInterface, Closeable {
  ....
  private RocksIterator iterator;
  ....

  @Override
  public void seekToFirst() {
    iterator.seekToFirst(); // <=
    status(); 
  }
  
  @Override
  public void seekToLast() {
    iterator.seekToFirst();  // <=
    status();
  }
  
  ....
}
      
      





Die Körper der Methoden seekToFirst und seekToLast sind identisch. Darüber hinaus werden beide Methoden im Code verwendet.



Hier ist etwas unrein! Wenn Sie sich ansehen, über welche Methoden das Iteratorobjekt verfügt , wird deutlich, welchen Fehler der Analysator gefunden hat:



public class RocksIterator extends AbstractRocksIterator<RocksDB>
{
  ....
}

public abstract class AbstractRocksIterator<....> extends ....
{
  ....
  public void seekToFirst() // <=
  {
    assert this.isOwningHandle();
    this.seekToFirst0(this.nativeHandle_);
  }
  
  public void seekToLast() // <=
  {
    assert this.isOwningHandle();
    this.seekToLast0(this.nativeHandle_);
  }
  ....
}
      
      





Es stellt sich heraus, dass die Methode seekToLast- Klasse RocksIteratorWrapper mit der Copy-Paste-Methode seekToFirst derselben Klasse erstellt wurde. Doch aus irgendeinem Grund vergessen der Entwickler zu ersetzen , die Iterator ‚s seekToFirst Methode Aufruf mit seekToLast .



Verwechslung mit Formatzeichenfolgen



V6046 Falsches Format. Eine andere Anzahl von Formatelementen wird erwartet. Nicht verwendete Argumente: 1. UnsignedTypeConversionITCase.java (102)



public static void prepareMariaDB() throws IllegalStateException {
  ....
  if (!initDbSuccess) {
    throw new IllegalStateException(
      String.format(
        "Initialize MySQL database instance failed after {} attempts," + // <=
        " please open an issue.", INITIALIZE_DB_MAX_RETRY));
  }
}
      
      





Die Formatzeichenfolgen der String.format- Methode und der Java-Logger sind unterschiedlich. Im Gegensatz zur Formatzeichenfolge der String.format- Methode , bei der Argumentersetzungen mit dem Zeichen '%' angegeben werden, verwenden die Zeichenfolgen des Loggerformats stattdessen die Zeichenkombination '{}'. Aufgrund dieser Verwirrung trat dieser Fehler auf. Als Formatzeichenfolge wird eine Zeichenfolge an die String.format- Methode übergeben , die höchstwahrscheinlich von einer anderen Stelle kopiert wurde, an der sie in einem Logger verwendet wurde. Als Ergebnis wird der Wert des INITIALIZE_DB_MAX_RETRY wird Feldes nicht ersetzt wird in der Illegal Nachricht. Anstelle von '{}' weiß derjenige, der diese Ausnahme abfängt oder protokolliert, nie, wie viele Versuche unternommen wurden, eine Verbindung zur Datenbank herzustellen.



Abnormale Verteilung



V6048 Dieser Ausdruck kann vereinfacht werden. Der Operand 'index' in der Operation ist gleich 0. CollectionUtil.java (76)



public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets;                                 // <=
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))

           .add(element); 
  }

  return buckets.values();
}
      
      





Die Partitionsmethode teilt die Elemente aus der Elementsammlung in mehrere Segmente auf und gibt diese Segmente zurück. Aufgrund des vom Analysator angezeigten Fehlers tritt jedoch keine Trennung auf. Der Ausdruck, der zur Bestimmung des Segmentnummernindex % numBuckets verwendet wird, ist immer 0, da der Index immer 0 ist. Ich dachte ursprünglich, dass der Code für diese Methode überarbeitet wurde, weshalb sie vergessen haben, ein Inkrement der Indexvariablen in die for- Schleife einzufügen . Aber auf das Commit schauen Wo diese Methode hinzugefügt wurde, stellte sich heraus, dass dieser Fehler mit dieser Methode einherging. Korrigierte Version des Codes:



public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets; 
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))
           .add(element);
    index++;
  }

  return buckets.values();
}
      
      





Inkompatibler Typ



V6066 Der als Argument übergebene Objekttyp ist nicht mit dem Auflistungstyp kompatibel: String, ListStateDescriptor <NextTransactionalIdHint>. FlinkKafkaProducer.java (1083)



public interface OperatorStateStore 
{
  Set<String> getRegisteredStateNames();
}
public class FlinkKafkaProducer<IN> extends ....
{
  ....
  private static final 
  ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>
  NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR = ....;

  @Override
  public void initializeState(FunctionInitializationContext context).... 
  {
    ....
    if (context.getOperatorStateStore()
               .getRegisteredStateNames()
               .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR))    // <=
    {
       migrateNextTransactionalIdHindState(context);
    }
    ....
  }
}
      
      





Der Ausdruck, auf den der Analysator zeigt, ist immer falsch. Dies bedeutet, dass der Aufruf der Methode migrateNextTransactionalIdHindState niemals erfolgt. Wie kam es, dass jemand eine Sammlung vom Typ Set <String> nach einem Element eines völlig anderen Typs durchsucht - ListStateDescriptor <FlinkKafkaProducer.NextTransactionalIdHint> ? Ohne die Hilfe des Analysators hätte ein solcher Fehler höchstwahrscheinlich sehr lange im Code gelebt, da er nicht ins Auge fällt und es ohne gründliche Überprüfung dieser Methode einfach unmöglich ist, ihn zu finden.



Nichtatomare Variablenänderung



V6074 Nichtatomare Modifikation der flüchtigen Variablen. Überprüfen Sie 'currentNumAcknowledgedSubtasks'. PendingCheckpointStats.java (131)



boolean reportSubtaskStats(JobVertexID jobVertexId, SubtaskStateStats subtask) {
  TaskStateStats taskStateStats = taskStats.get(jobVertexId);

  if (taskStateStats != null && taskStateStats.reportSubtaskStats(subtask)) {
    currentNumAcknowledgedSubtasks++;                // <=
    latestAcknowledgedSubtask = subtask;

    currentStateSize += subtask.getStateSize();      // <=

    long processedData = subtask.getProcessedData();
    if (processedData > 0) {
      currentProcessedData += processedData;         // <=
    }

    long persistedData = subtask.getPersistedData();
    if (persistedData > 0) {
      currentPersistedData += persistedData;         // <=
    }
    return true;
  } else {
    return false;
  }
}
      
      





Plus 3 weitere Warnungen des Analysators in derselben Methode:



  • V6074 Nichtatomare Modifikation der flüchtigen Variablen. Überprüfen Sie 'currentStateSize'. PendingCheckpointStats.java (134)
  • V6074 Nichtatomare Modifikation der flüchtigen Variablen. Überprüfen Sie 'currentProcessedData'. PendingCheckpointStats.java (138)
  • V6074 Nichtatomare Modifikation der flüchtigen Variablen. Überprüfen Sie 'currentPersistedData'. PendingCheckpointStats.java (143)


Der Analysator schlug vor, dass bis zu 4 flüchtige Felder in der Methode nichtatomar sind. Und der Analysator erweist sich wie immer als richtig, da die Operationen ++ und + = tatsächlich eine Folge mehrerer Lese-, Änderungs- und Schreiboperationen sind. Wie Sie wissen, ist der flüchtige Wert eines Feldes für alle Threads sichtbar, was bedeutet, dass einige der Feldänderungen aufgrund einer Racebedingung verloren gehen können. Weitere Informationen hierzu finden Sie in der Beschreibung der Diagnose.



Fazit



In Big Data-Projekten ist Zuverlässigkeit eine der Hauptanforderungen, daher muss die Qualität des darin enthaltenen Codes genau überwacht werden. Apache Flink-Entwickler wurden dabei von mehreren Tools unterstützt und haben auch eine beträchtliche Anzahl von Tests geschrieben. Selbst unter solchen Bedingungen konnte der PVS-Studio-Analysator jedoch Fehler finden. Es ist unmöglich, Fehler vollständig zu beseitigen, aber wenn Sie regelmäßig verschiedene statische Code-Analyse-Tools verwenden, können Sie diesem Ideal näher kommen. Ja, genau regelmäßig. Nur bei regelmäßiger Anwendung zeigt die statische Analyse ihre Wirksamkeit, die in diesem Artikel ausführlicher beschrieben wird .





Wenn Sie diesen Artikel einem englischsprachigen Publikum zugänglich machen möchten, verwenden Sie bitte den Übersetzungslink: Valery Komarov. Big / Bug-Daten: Analyse des Apache Flink-Quellcodes .



All Articles