StatWriteBuffer.java
package sk.iway.iwcm.stat;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Map.Entry;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import sk.iway.iwcm.DBPool;
import sk.iway.iwcm.Logger;
/**
* StatWriteBuffer.java
*
* Acts as a synchronized buffer for Stat* inserts. Made this way in order to avoid
* database congestion caused by every user's HTTP request filling the database.
*
*@Title webjet7
*@Company Interway s.r.o. (www.interway.sk)
*@Copyright Interway s.r.o. (c) 2001-2010
*@author $Author: marosurbanec $
*@version $Revision: 1.3 $
*@created Date: 6.7.2010 18:35:44
*@modified $Date: 2004/08/16 06:26:11 $
*/
public class StatWriteBuffer
{
//guards access to buffer
private static final Lock lock = new ReentrantLock();
private static Map<UpdateInsertSqlPair, List<Object[]>> buffer = new HashMap<>();
//needed to construct a table from SQLException
private static Map<UpdateInsertSqlPair, String> sqlToTable = new HashMap<>();
private static int pendingCounter = 0;
private static Random random = new Random();
/**
* Adds an SQL statement into a buffer queue.
*
* @param sql SQL statement pushed into prepareStatement allocation
* @param table Base table name used for table creation if SQL fails. For inserting into stat_error_2010_07, pass "stat_error" as a parameter
* @param parameters Pushed into prepareStatement setObject calls
*/
public static void add(String sql, String table, Object...parameters)
{
UpdateInsertSqlPair insertOnly = new UpdateInsertSqlPair(null, sql);
Logger.debug(StatWriteBuffer.class, String.format("Appending to buffer: %s ", sql));
appendToBuffer(insertOnly, table, parameters);
}
/**
* Serves as a facade for a re-occuring pattern in stat tables:
* <code>
* int rowsTouched = executeUpdate();
* //insert new values into database in case there are none
* if (rowsTouched == 0)
* executeInsert();
* </code>
*
* Acts in a same way as add method
*
* <b>NOTE: insert and update must share the same number of parameters for this method to work correctly</b>
*
* @param SEE add(String, String, Object...)
*/
public static void addUpdateInsertPair(String update, String insert, String table, Object...parameters)
{
UpdateInsertSqlPair sql = new UpdateInsertSqlPair(update, insert);
Logger.debug(StatWriteBuffer.class, String.format("Appending to buffer: %s ", sql.toString()));
appendToBuffer(sql, table, parameters);
}
private static void appendToBuffer(UpdateInsertSqlPair sql, String table, Object... parameters)
{
lock.lock();
try
{
List<Object[]> statements = buffer.get(sql);
if (statements == null)
statements = new ArrayList<>();
statements.add(parameters);
sqlToTable.put(sql, table);
buffer.put(sql, statements);
pendingCounter++;
}
finally
{
lock.unlock();
}
}
private static Map<UpdateInsertSqlPair, List<Object[]>> releaseBuffer()
{
lock.lock();
Map<UpdateInsertSqlPair, List<Object[]>> oldBuffer = buffer;
buffer = new HashMap<>();
pendingCounter = 0;
return oldBuffer;
}
public static int size()
{
return pendingCounter;
}
/**
* Sweeps the buffer and flushes its contents into database.
* Launched by a background cron task.
*/
public static void main(String[] args)
{
try{
//sk.iway.iwcm.Logger.debug(StatWriteBuffer.class, "About to flush statistics buffer");
int statements = flushBuffer();
if (statements > 0)
{
sk.iway.iwcm.Logger.debug(StatWriteBuffer.class, String.format("Flushing stats done: %d statements executed", statements));
}
}
catch (Exception e) {
sk.iway.iwcm.Logger.error(e);
}
}
/**
* Flushes statistics data to database, effectively re-using previously
* prepared SQL statements for better performance.
*
* @return number of SQL statements executed
*/
private static int flushBuffer()
{
//v clustri dochadza ku konfliktom ked sa zapis spusti z cronu naraz, spravime nahodny sleep
try
{
long rndSleep = random.nextInt(10000);
Logger.debug(StatWriteBuffer.class, "sleep for: "+rndSleep+" ms");
Thread.sleep(rndSleep);
}
catch (InterruptedException e){sk.iway.iwcm.Logger.error(e);}
int flushCounter = 0;
Map<UpdateInsertSqlPair, List<Object[]>> oldBuffer = releaseBuffer();
Map<UpdateInsertSqlPair, String> oldMapping = releaseSqlToTableMapping();
lock.unlock();
for (Entry<UpdateInsertSqlPair, List<Object[]>> entry : oldBuffer.entrySet())
{
//if the first attempt fails, the reason liess probably in a non-existing table
//In that case, the method attempts to create the table from received SQL Exception
boolean success = batchSave(entry.getKey(), entry.getValue(), oldMapping);
if (!success)
batchSave(entry.getKey(), entry.getValue(), oldMapping);
flushCounter += entry.getValue().size();
}
return flushCounter;
}
private static Map<UpdateInsertSqlPair, String> releaseSqlToTableMapping()
{
Map<UpdateInsertSqlPair, String> sqlToTableName = sqlToTable;
sqlToTable = new HashMap<>();
return sqlToTableName;
}
private static boolean batchSave(UpdateInsertSqlPair sql, List<Object[]> statements, Map<UpdateInsertSqlPair, String> oldMapping)
{
Logger.debug(StatWriteBuffer.class, String.format("About to flush: %s", sql.toString()));
Connection db_conn = null;
PreparedStatement ps = null;
PreparedStatement psFollowing = null;
try
{
db_conn = DBPool.getConnection();
boolean isPair = sql.firstSql != null;
ps = db_conn.prepareStatement(isPair ? sql.firstSql : sql.followingSql);
psFollowing = isPair ? db_conn.prepareStatement(sql.followingSql) :null;
for (Object[] params : statements)
{
setParams(params, ps);
int rowsTouched = ps.executeUpdate();
if (isPair && rowsTouched == 0)
{
setParams(params, psFollowing);
if (psFollowing!=null) psFollowing.execute();
}
}
ps.close();
ps = null;
db_conn.close();
db_conn = null;
return true;
}
catch (Exception ex)
{
boolean created = StatNewDB.createStatTablesFromError(ex.getMessage(), null, oldMapping.get(sql));
if (created==false) {
sk.iway.iwcm.Logger.error(ex);
}
return false;
}
finally{
try{
if (ps != null) ps.close();
if (db_conn != null) db_conn.close();
}
catch (Exception ex2){sk.iway.iwcm.Logger.error(ex2);}
}
}
private static void setParams(Object[] params, PreparedStatement ps) throws SQLException
{
int parameterIndex = 1;
for (Object param : params)
{
ps.setObject(parameterIndex++, param);
}
}
}