Mi solución a la Stratio Challenge

Hace unos meses participé en la Stratio Challenge, una prueba tipo concurso que hacía la gente de la empresa Stratio para atraer talento, con un premio de 500€ al equipo ganador.

A continuación pongo mi solución (o ideas de soluciones, porque hablo de varias posibilidades), que tuvo una mención especial junto a otras 4. En mi opinión es mucho más simple que la solución ganadora, aunque también probablemente mucho menos potente. 😛

Introducción

Esta solución utiliza InfluxDB. Lo primero, aclarar que yo no tengo ninguna experiencia con InfluxDB, pero sí que he trabajado con Graphite, que es una herramienta para guardar series temporales bastante madura, pero más orientada a monitorización de servidores.

La razón por la que creo que os convendría más usar InfluxDB que Grahpite es porque el query language de Graphite es bastante limitado, dificultando bastante los “groupby” que buscáis.

De la descripción del problema entiendo que una de las partes más importantes es poder guardar centenares de eventos de impresiones por segundo sin problemas. Según he leído en algunos textos, InfluxDB tiene la idea de soportar unas 20.000 escrituras por segundo en un sólo servidor. Independientemente de el número en sí, el hecho de ser una base de datos orientada a guardar series temporales y con énfasis en ser rápida haciendo esto ya indica que puede ser una buena opción. :)

Por otra parte, otro de los requerimientos básicos es poder hacer queries con group by eficientemente. InfluxDB permite hacer queries con un idioma sql-like que admite group by por columnas, pero tiene el problema de base que no indexa las columnas de los registros, por lo que tendría que leer toda la serie temporal para hacer el group by. No obstante, en la versión 0.9 se incorpora una feature llamada “tags” que permite hacer estas queries mucho más eficientemente y más sencillas, y por eso creo que la solución debería pasar por usar InfluxDB 0.9.

A continuación detallo la propuesta de modelo.

Modelo de datos

En InfluxDB 0.9 almacenaríamos tres métricas diferentes, con los siguientes campos:

  • prints: timestamp, id
  • clicks: timestamp, id, print_id
  • conversions: timestamp, id, click_id

Tener en cuenta que, al ser una base de datos orientada a series temporales, el camp “timestamp” viene de serie en todas las métricas que se guardan.

Estas tres métricas tendrían los siguientes tags asociados:

  • country
  • user_agent
  • campaign
  • advertirser
  • publisher

InfluxDB crea una serie para cada combinación de métrica y tags, de forma que cuando se quiere hacer una query con un “where” o un “group by” con estos campos no hace falta recorrer toda la serie temporal, sino que actúan como columnas indexadas.

Nótese que el modelo anterior está de-normalizado, ya que se podrían obtener los tags de cada click y conversión siguiendo las “foreign keys” conversion.click_id → click.id, click.print_id → print.id, pero esto sería bastante más lento ya que los campos de id no están indexados.

Consultas

Con este modelo se pueden hacer fácilmente las tres consultas que se plantean:

  • El número de impresiones, clicks y compras (conversion rates) en un período de tiempo determinado. select value from <metric> where time > now() – 3h
  • ¿Podríamos obtenerlos agrupados por país, user agent, campaña, advertiser o publisher (sólo por una de ellas)? select value from <metric> where time > now() – 3h group by <tag>
  • ¿Seríamos capaces de combinar diferentes variables (dos o más)? select value from <metric> where time > now() – 3h group by <tag-1>, <tag-2>

Si se quiere granularidad de una hora, también se puede añadir un “group by time(1h)” al final de cada una de estas queries. No obstante, por razones de performance, lo mejor sería hacer downsampling de las anteriores métricas mediante lo que en InfluxDB se llama una continuous query:

SELECT count(value) FROM /^(prints|clicks|conversions)$/ group by time(1h) INTO 1h.:series_name

Esto creará las métricas 1h.prints, 1h.clicks y 1h.conversions conservando los tags donde se pueden hacer queries con group by de forma muchísimo más eficiente. Además, en la última versión de InfluxDB estas queries son verdaderamente continuas:

The second part is that continuous queries will now actually run continuously. Previously, continuous queries were run at each time interval for the last period of time. This meant that if you had data collection that lagged behind or if you were loading historical data, it wouldn’t be included in the output of a continuous query. The 0.9.0 version will fix this.

Esto nos permitiría contar el número de clics de la hora en curso, sin necesidad de que haya pasado ya la hora.

Eventos

Ahora veamos cómo contabilizar los eventos.

Como frontend web se puede dejar la API HTTP que ya utilizáis actualmente. Se necesita un servidor+app que pueda manejar bastantes peticiones por segundo, para que no sea este el cuello de botella. Esta app se tiene que encargar únicamente de contabilizar los eventos, no hacer “informes”. Se me viene a la cabeza Node.js, en el que no he programado nunca, pero para esta tarea tan sencillita seguro que va genial.

La comunicación con InfluxDB se puede hacer directamente por HTTP, aunque hay varias librerías wrappers disponibles para varios lenguajes. Por eso lo que mola aquí es tener un servidor+app que al hacer las peticiones HTTP sea non-blocking, ya que la inmensísima mayoría del tiempo se perderá en establecer la conexión HTTP con InfluxDB. Por eso he dicho Node.js.

Cómo:

  • Grabar impresiones. El servidor recibe una petición POST en /prints con los datos de país, user_agent, campaign, advertiser y publisher y la envía a InfluxDB a la métrica “prints” con los tags adecuados. Trivial.
  • Grabar clicks. Se recibe POST en /clicks con el print.id asociado. La app hace una query a InfluxDB buscando impresiones en las últimas 6h con el id en cuestión. Aquí la gracia es que, como un clic sólo se considera válido hasta un máximo de 6 horas después de la impresión del ad, se puede restringir la query a ese periodo horario y, si no se encuentra la impresión (aunque se haya producido antes), no hacer nada. Si se encuentra la impresión, simplemente se registra un punto nuevo en la métrica “clicks” con los tags de la impresión.
  • Grabar compras. Igual que la anterior, pero ahora la búsqueda del id del clic está restringida a 30 días atrás. Como hay 1000 clics menos que impresiones, aunque haya que buscar clics hasta 60 días atrás es asumible. Y aunque se tardase bastante (varios segundos) es también perfectamente asumible porque no se esperan recibir apenas compras por segundo.

Respecto al tema temporal, InfluxDB permite definir políticas de retención por las cuales podríamos, por ejemplo, descartar todas las impresiones de hace más de 6h y los clics de hace más de 30 días. Por supuesto, manteniendo la query continua que almacena su volumen por hora hasta que nos apetezca.

Interfaz de reporting

Esto ya creo que lo podéis hacer con la tecnología que os dé la gana. Asumo que el número de queries de tipo “reporting” con group by son órdenes de magnitud menos que el número de clics. Entonces se trata simplemente de montar una app bonita para el cliente que permita seleccionar filtros, mostrar gráficos, etc., que ya lo tendréis montado.

Las queries a hacer están en la sección anterior. Añadir aquí que InfluxDB 0.9 tiene una API para hacer discovery de los valores de los tags usados:

SHOW TAG VALUES WITH KEY=<tag-key>

Donde <tag-key> puede ser:

  • country
  • user_agent
  • campaign
  • advertirser
  • publisher

Eficiencia

Respecto a esta solución, me gustaría puntualizar cuáles pienso que son las claves para que el sistema sea eficiente.

Las columnas sobre las que hacer groupby están “indexadas”, de forma que no hay que recorrer toda la serie temporal para buscar los puntos correspondientes a una cierta categoría (país, advertiser, etc.). Esto en InfluxDB (y otros) se consigue simplemente almacenando cada combinación en una serie diferente, ya que tener miles de series y mezclarlas condicionalmente sale más barato que hacer búsquedas condicionales en una única serie muy larga.

Las queries se pueden hacer sobre series agregadas de conteos, con granularidad más gruesa (1h). Esto evita tener que hacer agregaciones comunes una y otra vez, ya que se asume que se esperan granularidades más gruesas para datos más antiguos. En InfluxDB se hace mediante continuous queries para agregar los conteos por hora, pero si se quiere se pueden incorporar también agregados por día, semana, mes, etc. para vistas más eficientes de períodos más largos. Esto es una cosa que Graphite (Whisper) hacer muy sencillo, ya que selecciona automáticamente la granularidad en función del rango de tiempo que se pida.

Cuando se buscan impresiones a las que asignar un clic o clics a los que asociar compras, se hace en un rango limitado de tiempo, de forma que hay que explorar pocos ids. Si los ids de las impresiones y de los clics son autonuméricos ascendentes, hay una forma mucho más eficiente de hacerlo: simplemente coger el punto más antiguo en una query limitada a 6h (30 días) y, si su id es menor que la que referencia del clic (la compra), aceptar el clic (compra) y, si no, rechazarlo.

Estabilidad y alternativas

Es cierto que InfluxDB 0.9 aún no ha salido y probablemente estará verde al princpio, pero su modelo de datos es más adecuado que el de la versión 0.8 donde no hay tags, con lo cual hay que recurrir la viejo truco de usar nombres de series con una convención estilo prints.<country>.<user_agent>.<campaign>.<advertiser>.<publisher>, o guardando todo en “prints” y luego haciendo un fanout con una query continua. Es decir, es posible, pero no me parece tan elegante y por eso he utilizado la versión 0.9 en mi ejemplo.

A continuación enumero algunas posibles alternativas a la solución que propongo y las razones por las que he elegido InfluxDB en lugar de ellas.

Druid.io. A simple vista, la opción más avanzada y apropiada para tratar el problema de análisis de campañas de ads en tiempo semi-real (dashboard). Lo utilizan en sitios como eBay, Netflix, PayPal o Yahoo! con estos fines, y aquí podéis ver un vídeo sobre cómo lo utilizan en Criteo, una empresa líder en márqueting online. La variedad y potencia de las queries que puede hacer es mucho mayor que la de InfluxDB (aunque también más intrincadas, escritas en JSON). Además, tiene un modelo de ingestión y proceso de datos y clustering con nodos con varios roles para High Availability muy avanzado y documentado, a diferencia de InfluxDB, donde el sistema de clustering parece más rudimentario, simplemente para conseguir sharding y réplicas de los datos.

No obstante, la potencia de Druid.io tiene un precio: es muchísimo más complicado de entender, configurar adecuadamente y mantener. La documentación parece la de una nave espacial y tiene dependencias secundarias, como Apache Zookeeper y Kafka, constituyendo una pesadilla de multitud de piezas móviles, en mi opinión. InfluxDB es mucho más sencillo de entender y configurar. Quizá fuese interesante diseñar un prototipo con InfluxDB y, si se ve que se necesita hacer cosas más avanzadas (queries con agregados heurísticos, ingerir una ingente cantidad de datos por segundo en un sistema totalmente federado, etc.), emplear el tiempo que se necesita en aprender a usar y configurar correctamente Druid.io.

Prometheus.io. Menos orientado a analítica y más orientado a sistemas de monitorización tipo Graphite. Su ventaja respecto a InfluxDB es que está bastante más maduro, al menos mucho más maduro que InfluxDB 0.9. Está desarrollado por la gente de SoundCloud y usado por Docker y Boxever. El modelo de datos es muy similar al de InfluxDB 0.9 y tiene un mecanismo para agregar datos por unidad de tiempo (una hora en nuestro caso) efectivamente idéntico a las continuous queries de InfluxDB. Por otra parte, tiene como ventajas un lenguaje de queries más avanzado y documentado y un dashboard (PromDash) y sistema de alertas de serie.

Una diferencia fundamental de Prometheus.io a la hora de solucionar nuestro problema es que, dada su mayor orientación como sistema de monitorización, no permite almacenar y consultar de manera eficiente el id de ad o click junto a cada punto que se graba (que en realidad son sólo counts). Por tanto, para desarrollar la funcionalidad de aceptar clics o compras sólo hasta un cierto límite de tiempo, convendría usar un sistema secundario como una base de datos de clave-valor que permita especificar TTLs, como por ejemplo Redis. Así, al llegar cada ad se guardaría su id en Redis con un TTL de 6h y, al recibirse un clic asociado a un ad, simplemente se tendría que buscar su id en Redis, que sólo se encontraría si han pasado menos de 6h desde que se mostró el anuncio. El proceso es similar para las ventas.

Conclusiones

La solución que he propuesto usando InfluxDB 0.9 no es la más estable en cuanto a software utilizado o la más potente, pero sí es la más sencilla que he encontrado que satisface todos los requerimientos del problema que se propone.

La decisión por una u otra tecnología y arquitectura pasaría sin duda por invertir unos pocos días en probar las distintas opciones construyendo prototipos.

Leave a Reply

Your email address will not be published. Required fields are marked *