17 mayo, 2012

Tratamiento de ResultSet con multiples hilos

Aunque el título de la entrada no es el mas adecuado, la implementación de esta técnica permite procesar aparentemente un ResultSet con múltiples hilos.

Típicamente, en una aplicación que manipula una gran cantidad de datos con un ResultSet, tenemos el problema de que el tratamiento puede llegar a ser costoso en terminos de tiempo. Esto se debe a que el ResultSet no es thread-safe, es decir, no puede se accedido desde múltiples hilos a la vez.

Por lo tanto, un escenario como el que se ilustra abajo es técnicamente imposible:




Luego, se está obligado a utilizar un único hilo que acceda al ResultSet, lo que se traduce en un desperdicio de recursos y en un altísimo tiempo de espera en caso de contar con grandes volúmenes de información.

Una solución simple, es almacenar los datos en un objeto que actúe como buffer y que pued ser leído desde múltiples hilos. De este modo, se tiene por un lado a un hilo único accediendo al ResultSet y dejando los datos en nuestro buffer, y por otro lado a varios hilos que leen los datos de este buffer y los procesan en forma paralela.

Se llega a un escenario más cómodo, que permite el uso de múltiples hilos para el procesamiento de nuestros datos:



Para esto, se necesita un JavaBean que contenga los datos que son leídos desde el ResultSet, una cola para almacenar estos JavaBeans a medida que se van leyendo y una clase que extienda Thread (o implemente Runnable) que será la encargada del proceso. 

La secuencia sería más o menos así:
  • Obtener el ResultSet
  • En un hilo, leer el ResultSet, crear los Beans de Datos y llenar el buffer.
  • Lanzar uno o varios hilos que lean el buffer.
Adicionalmente se necesitan algunos mecanismos de control para:
  • Ganatizar que el Buffer no crezca descontroladamente.
  • Asegurar que los hilos de proceso se esten ejecutando hasta que se haya leido completamente el ResultSet.
  • No continuar hasta que todos los hilos de proceso hayan terminado (este paso puede ser opcional).

Existen muchas formas de implementar esta lógica. Aqui propongo una simple, que puede ser mejorada en gran medida.

Primero: Definimos una clase que actue como "Bean" de datos:

class Bean {
        
        private String dato1;
        private String dato2;
        private String dato3;

        public String getDato1() {
            return dato1;
        }

        public void setDato1(String dato1) {
            this.dato1 = dato1;
        }

        public String getDato2() {
            return dato2;
        }

        public void setDato2(String dato2) {
            this.dato2 = dato2;
        }

        public String getDato3() {
            return dato3;
        }

        public void setDato3(String dato3) {
            this.dato3 = dato3;
        }
        
    }

Luego, definimos una clase donde se hará el proceso:



class Proceso {
    
    private static final int MAXIMO_BUFFER = 10;
    private static final int MAXIMO_HILOS_PROCESO = 5;
    private ResultSet rs;
    private Thread[] hilosProceso;
    private volatile boolean leyendoRs;
    private LinkedBlockingQueue buffer;

    /**
     * Este metodo debe ser lanzado desde un hilo que no sea el 
     * event-dispatch-thread
     */
    public void procesar() throws SQLException {
        buffer = new LinkedBlockingQueue();
        hilosProceso = new Thread[MAXIMO_HILOS_PROCESO];
        rs = leerBaseDatos();
        leyendoRs = true;
        Runnable proceso = new Runnable() {

            @Override
            public void run() {
                while (true) {
                    Bean b = buffer.poll();
                    if (b == null) {
                        if (leyendoRs) {
                            try {
                                Thread.sleep(250);
                            } catch (InterruptedException ex) {
                                ex.printStackTrace();
                            }
                        } else {
                            break;
                        }
                    } else {
                        //Hacer algo con Bean b
                    }
                }
            }
        };
        for (int i = 0; i < MAXIMO_HILOS_PROCESO; i++) {
            hilosProceso[i] = new Thread(proceso, "Hilo Proceso " + i);
            hilosProceso[i].start();
        }
        
        while (rs.next()) {
            Bean b = new Bean();
            b.setDato1(rs.getString("dato1"));
            b.setDato2(rs.getString("dato2"));
            b.setDato3(rs.getString("dato3"));
            buffer.offer(b);
            if (buffer.size() >= MAXIMO_BUFFER) {
                try {
                    Thread.sleep(250);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        }
        rs.close();
        leyendoRs = false;
        
        for (Thread hilo : hilosProceso) {
            if (hilo.isAlive()) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        }
        
        
        /*
         * En este punto estamos completamente seguros de que 
         * todos los datos han sido procesados.
         * 
         */
    }

}

Como puede verse, la clase que procesa los datos es bastante simple. Utilizamos como buffer la clase LinkedBlockingQueue que permite un acceso concurrente a los metodos offer() y poll(). 

Obetenemos los datos con algún método a nuestro gusto, y luego levantamos un flag que indica que estamos leyendo los datos. 

Utilizamos la interfaz Runnable y definimos una pequeña clase anónima que procesa los datos. Esta clase está constantemente leyendo los datos desde el buffer. Si en algún momento no se han leído datos, pero el flag de lectura aún está a true, el hilo esperará 250 milisegundos y continuará su lectura. Esto permite que no se detengan los hilos aunque se produzca algun retraso en el llenado del buffer.

El bloque while que lee el RsultSet crea las instancias de nuestro bean de datos y las encola. Contiene un pequeño bloque de control, el cual verifica que si el tamaño del buffer supera un máximo establecido, se esperan 250 milisegundos antes de continuar. Esta tecnica me ha dado mejores resultados que definir el tamaño máximo de LinkedBlockingQueue (se me estaban perdiendo datos). Cuando termina el ciclo de lectura del ResultSet, el flag de lectura se lleva a false, lo que indica a los hilos de proceso que ya no habrá mas datos para procesar.

Finalmente, existe un pequeño bloque de control, el cual verifica que mientras existan hilos de proceso activos, se esperará. Por lo tanto en este punto la ejecución no continuará hasta que todos los hilos hayan finalizado.

El código es bastante rudimentario, pero es efectivo. Pueden hacerse varias mejoras en las secciones de control (sobre todo la del final), pero prefiero publicar este código ya que es simple de entender. Todas las mejoras y extensiones quedan a gusto del consumidor ;-)


bytes!




No hay comentarios.: