Database
 sql >> Base de Dados >  >> RDS >> Database

O que são fluxos sequenciais versus paralelos em Java?


Java pode paralelizar operações de fluxo para alavancar sistemas multinúcleo. Este artigo fornece uma perspectiva e mostra como o fluxo paralelo pode melhorar o desempenho com exemplos apropriados.

Fluxos em Java


Um fluxo em Java é uma sequência de objetos representados como um canal de dados. Geralmente tem uma fonte onde os dados estão situados e um destino onde é transmitido. Observe que um fluxo não é um repositório; em vez disso, ele opera em uma fonte de dados, como um array ou uma coleção. Os bits intermediários na passagem são, na verdade, chamados de fluxo. Durante o processo de transmissão, o fluxo geralmente passa por uma ou mais transformações possíveis, como filtragem ou classificação, ou pode ser qualquer outro processo operando nos dados. Isso personaliza os dados originais em um formato diferente, normalmente, de acordo com a necessidade do programador. Portanto, um novo fluxo é criado de acordo com a operação aplicada nele. Por exemplo, quando um fluxo é classificado, ele resulta em um novo fluxo que produz um resultado que é classificado. Isso significa que os novos dados são uma cópia transformada do original, em vez de estarem no formato original.

Stream sequencial


Qualquer operação de fluxo em Java, a menos que especificada explicitamente como paralela, é processada sequencialmente. Eles são basicamente fluxos não paralelos usados ​​em um único thread para processar seu pipeline. Os fluxos sequenciais nunca tiram proveito do sistema multicore, mesmo que o sistema subjacente possa suportar execução paralela. O que acontece, por exemplo, quando aplicamos multithreading para processar o stream? Mesmo assim, ele opera em um único núcleo por vez. No entanto, ele pode pular de um núcleo para outro, a menos que seja explicitamente fixado em um núcleo específico. Por exemplo, o processamento em quatro threads diferentes versus quatro núcleos diferentes é obviamente diferente onde o primeiro não é compatível com o último. É bem possível executar vários threads em um único ambiente de núcleo, mas o processamento paralelo é um gênero completamente diferente. Um programa precisa ser projetado desde o início para programação paralela, além de ser executado em um ambiente que o suporte. Esta é a razão pela qual a programação paralela é uma arena complexa.

Vamos tentar um exemplo para ilustrar melhor a ideia.
package org.mano.example;

import java.util.Arrays;
import java.util.List;

public class Main2 {
   public static oid main(String[] args) {
      List<Integer> list=Arrays.asList(1,2,3,4,5,6,7,8,9);
      list.stream().forEach(System.out::println);
      System.out.println();
      list.parallelStream().forEach(System.out::println);
   }
}

Saída

123456789
685973214

Este exemplo é uma ilustração de q fluxo sequencial, bem como q fluxo paralelo em operação. O list.stream() funciona em sequência em um único thread com o println() Operação. list.parallelStream() , por outro lado, é processado em paralelo, aproveitando ao máximo o ambiente multicore subjacente. O aspecto interessante está na saída do programa anterior. No caso de um fluxo sequencial, o conteúdo da lista é impresso em uma sequência ordenada. A saída do fluxo paralelo, por outro lado, não é ordenada e a sequência muda toda vez que o programa é executado. Isso significa pelo menos uma coisa:essa invocação do list.parallelStream() O método torna o println instrução operam em vários threads, algo que list.stream() faz em um único fio.

Transmissão paralela


A principal motivação por trás do uso de um fluxo paralelo é tornar o processamento de fluxo uma parte da programação paralela, mesmo que o programa inteiro não seja paralelizado. O fluxo paralelo aproveita os processadores multicore, resultando em um aumento substancial no desempenho. Ao contrário de qualquer programação paralela, eles são complexos e propensos a erros. No entanto, a biblioteca de fluxo Java oferece a capacidade de fazer isso de maneira fácil e confiável. O programa inteiro não pode ser paralelizado. mas pelo menos a parte que lida com o fluxo pode ser paralelizada. Eles são realmente bastante simples no sentido de que podemos invocar alguns métodos e o resto é resolvido. Existem algumas maneiras de fazê-lo. Uma dessas maneiras é obter um fluxo paralelo invocando o parallelStream() método definido por Coleção . Outra maneira é invocar o parallel() método definido por BaseStream em um fluxo sequencial. O fluxo sequencial é paralelizado pela invocação. Observe que a plataforma subjacente deve suportar programação paralela, como em um sistema multicore. Caso contrário, não há sentido na invocação. Nesse caso, o fluxo seria processado em sequência, mesmo que tivéssemos feito a invocação. Se a invocação for feita em um stream já paralelo, não faz nada e simplesmente retorna o stream.

Para garantir que o resultado do processamento paralelo aplicado no fluxo seja o mesmo obtido por meio do processamento sequencial, os fluxos paralelos devem ser sem estado, não interferentes e associativos.

Um exemplo rápido

package org.mano.example;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class Main {

   public static void main(String[] args) {
      List<Employee> employees = Arrays.asList(
         new Employee(1276, "FFF",2000.00),
         new Employee(7865, "AAA",1200.00),
         new Employee(4975, "DDD",3000.00),
         new Employee(4499, "CCC",1500.00),
         new Employee(9937, "GGG",2800.00),
         new Employee(5634, "HHH",1100.00),
         new Employee(9276, "BBB",3200.00),
         new Employee(6852, "EEE",3400.00));

      System.out.println("Original List");
      printList(employees);

      // Using sequential stream
      long start = System.currentTimeMillis();
      List<Employee> sortedItems = employees.stream()
         .sorted(Comparator
            .comparing(Employee::getName))
         .collect(Collectors.toList());
      long end = System.currentTimeMillis();

      System.out.println("sorted using sequential stream");
      printList(sortedItems);
      System.out.println("Total the time taken process :"
         + (end - start) + " milisec.");

      // Using parallel stream
      start = System.currentTimeMillis();
      List<Employee> anotherSortedItems = employees
         .parallelStream().sorted(Comparator
            .comparing(Employee::getName))
         .collect(Collectors.toList());
      end = System.currentTimeMillis();

      System.out.println("sorted using parallel stream");
      printList(anotherSortedItems);
      System.out.println("Total the time taken process :"
         + (end - start) + " milisec.");


      double totsal=employees.parallelStream()
         .map(e->e.getSalary())
         .reduce(0.00,(a1,a2)->a1+a2);
      System.out.println("Total Salary expense: "+totsal);
      Optional<Employee> maxSal=employees.parallelStream()
         .reduce((Employee e1, Employee e2)->
         e1.getSalary()<e2.getSalary()?e2:e1);
      if(maxSal.isPresent())
         System.out.println(maxSal.get().toString());
   }

   public static void printList(List<Employee> list) {
      for (Employee e : list)
         System.out.println(e.toString());
   }
}


package org.mano.example;

public class Employee {
   private int empid;
   private String name;
   private double salary;

   public Employee() {
      super();
   }

   public Employee(int empid, String name,
         double salary) {
      super();
      this.empid = empid;
      this.name = name;
      this.salary = salary;
   }

   public int getEmpid() {
      return empid;
   }

   public void setEmpid(int empid) {
      this.empid = empid;
   }

   public String getName() {
      return name;
   }

   public void setName(String name) {
      this.name = name;
   }

   public double getSalary() {
      return salary;
   }

   public void setSalary(double salary) {
      this.salary = salary;
   }

   @Override
   public String toString() {
      return "Employee [empid=" + empid + ", name="
         + name + ", salary=" + salary + "]";
   }
}

No código anterior, observe como aplicamos a classificação em um fluxo usando a execução sequencial.
List<Employee> sortedItems = employees.stream()
               .sorted(Comparator
               .comparing(Employee::getName))
               .collect(Collectors.toList());

e a execução paralela é alcançada alterando ligeiramente o código.
List<Employee> anotherSortedItems = employees
               .parallelStream().sorted(Comparator
               .comparing(Employee::getName))
               .collect(Collectors.toList());

Também vamos comparar o tempo do sistema para ter uma ideia de qual parte do código leva mais tempo. A operação paralela começa quando o fluxo paralelo é obtido explicitamente pelo parallelStream() método. Existe outro método interessante, chamado reduce() . Quando aplicamos esse método a um fluxo paralelo, a operação pode ocorrer em diferentes threads.

No entanto, sempre podemos alternar entre paralelo e sequencial conforme a necessidade. Se quisermos alterar o fluxo paralelo para sequencial, podemos fazê-lo invocando o sequential() método especificado por BaseStream . Como vimos em nosso primeiro programa, a operação realizada no stream pode ser ordenada ou não ordenada de acordo com a ordem dos elementos. Isso significa que a ordem depende da fonte de dados. Esta, no entanto, não é a situação no caso de fluxos paralelos. Para aumentar o desempenho, eles são processados ​​em paralelo. Como isso é feito sem nenhuma sequência, onde cada partição do fluxo é processada independentemente das outras partições sem qualquer coordenação, a consequência é imprevisivelmente desordenada. Mas, se quisermos executar especificamente uma operação em cada elemento no fluxo paralelo a ser ordenado, podemos considerar o método forEachOrdered() método, que é uma alternativa ao método forEach() método.

Conclusão


As APIs de fluxo fazem parte do Java há muito tempo, mas adicionar o ajuste do processamento paralelo é muito acolhedor e, ao mesmo tempo, um recurso bastante intrigante. Isso é particularmente verdadeiro porque as máquinas modernas são multicore e há um estigma de que o projeto de programação paralela é complexo. As APIs fornecidas pelo Java fornecem a capacidade de incorporar um toque de ajustes de programação paralela em um programa Java que tem o design geral de execução sequencial. Esta é talvez a melhor parte deste recurso.