/*
* Copyright (C) 2008 Search Solution Corporation. All rights reserved by Search Solution.
*
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* - Neither the name of the <ORGANIZATION> nor the names of its contributors
* may be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
* OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
* OF SUCH DAMAGE.
*
*/
/*
* cci_handle_mng.c -
*/
#ident "$Id$"
/************************************************************************
* IMPORTED SYSTEM HEADER FILES *
************************************************************************/
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#if defined(WINDOWS)
#include <winsock2.h>
#include <windows.h>
#else
#include <netdb.h>
#include <pthread.h>
#endif
/************************************************************************
* OTHER IMPORTED HEADER FILES *
************************************************************************/
#include "cci_common.h"
#include "cci_handle_mng.h"
#include "cas_cci.h"
#include "cci_util.h"
#include "cci_query_execute.h"
#include "cas_protocol.h"
#include "cci_network.h"
#include "cci_map.h"
/************************************************************************
* PRIVATE DEFINITIONS *
************************************************************************/
#define MAX_CON_HANDLE 1024
#define REQ_HANDLE_ALLOC_SIZE 256
#define CCI_MAX_CONNECTION_POOL 256
/************************************************************************
* PRIVATE TYPE DEFINITIONS *
************************************************************************/
typedef struct
{
T_ALTER_HOST host; /* host info (ip, port) */
bool is_reachable;
} T_HOST_STATUS;
static T_HOST_STATUS host_status[MAX_CON_HANDLE];
static int host_status_count = 0;
#if defined(WINDOWS)
HANDLE host_status_mutex;
#else
T_MUTEX host_status_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif
static int conn_pool[CCI_MAX_CONNECTION_POOL];
static unsigned int num_conn_pool = 0;
/************************************************************************
* PRIVATE FUNCTION PROTOTYPES *
************************************************************************/
static int compare_conn_info (unsigned char *ip_addr, int port, char *dbname,
char *dbuser, char *dbpasswd,
T_CON_HANDLE * con_handle);
static int init_con_handle (T_CON_HANDLE * con_handle, char *ip_str, int port,
char *db_name, char *db_user, char *db_passwd);
static int new_con_handle_id (void);
static int new_req_handle_id (T_CON_HANDLE * con_handle);
static void con_handle_content_free (T_CON_HANDLE * con_handle);
static void ipstr2uchar (char *ip_str, unsigned char *ip_addr);
static int is_ip_str (char *ip_str);
static int hostname2uchar (char *hostname, unsigned char *ip_addr);
static int hm_find_host_status_index (unsigned char *ip_addr, int port);
static void hm_set_host_status_by_addr (unsigned char *ip_addr, int port,
bool is_reachable);
static THREAD_RET_T THREAD_CALLING_CONVENTION hm_thread_health_checker (void
*arg);
/************************************************************************
* INTERFACE VARIABLES *
************************************************************************/
/************************************************************************
* PUBLIC VARIABLES *
************************************************************************/
static T_CON_HANDLE *con_handle_table[MAX_CON_HANDLE];
static int con_handle_current_index;
/************************************************************************
* PRIVATE VARIABLES *
************************************************************************/
/************************************************************************
* IMPLEMENTATION OF PUBLIC FUNCTIONS *
************************************************************************/
static int
compare_conn_info (unsigned char *ip_addr, int port, char *dbname,
char *dbuser, char *dbpasswd, T_CON_HANDLE * con_handle)
{
if (memcmp (ip_addr, con_handle->ip_addr, 4) != 0 ||
port != con_handle->port ||
strcmp (dbname, con_handle->db_name) != 0 ||
strcmp (dbuser, con_handle->db_user) != 0 ||
strcmp (dbpasswd, con_handle->db_passwd) != 0)
{
return 0;
}
return 1;
}
T_CON_HANDLE *
hm_get_con_from_pool (unsigned char *ip_addr, int port, char *dbname,
char *dbuser, char *dbpasswd)
{
int con = -1;
unsigned int i;
for (i = 0; i < num_conn_pool; i++)
{
if (compare_conn_info (ip_addr, port, dbname, dbuser, dbpasswd,
con_handle_table[conn_pool[i] - 1]))
{
con = conn_pool[i];
conn_pool[i] = conn_pool[--num_conn_pool];
break;
}
}
if (con < 0)
{
return NULL;
}
return con_handle_table[con - 1];
}
int
hm_put_con_to_pool (int con)
{
if (num_conn_pool >= sizeof (conn_pool) / sizeof (int))
{
return -1;
}
conn_pool[num_conn_pool++] = con;
return 0;
}
int
hm_ip_str_to_addr (char *ip_str, unsigned char *ip_addr)
{
if (is_ip_str (ip_str))
{
ipstr2uchar (ip_str, ip_addr);
}
else
{
if (hostname2uchar (ip_str, ip_addr) < 0)
{
return CCI_ER_HOSTNAME;
}
}
return 0;
}
void
hm_con_handle_table_init ()
{
int i;
for (i = 0; i < MAX_CON_HANDLE; i++)
{
con_handle_table[i] = NULL;
}
con_handle_current_index = 0;
}
T_CON_HANDLE *
hm_con_handle_alloc (char *ip_str, int port, char *db_name, char *db_user,
char *db_passwd)
{
int handle_id;
int error = 0;
T_CON_HANDLE *con_handle = NULL;
handle_id = new_con_handle_id ();
if (handle_id <= 0)
goto error_end;
con_handle = (T_CON_HANDLE *) MALLOC (sizeof (T_CON_HANDLE));
if (con_handle == NULL)
{
goto error_end;
}
error = init_con_handle (con_handle, ip_str, port, db_name, db_user,
db_passwd);
if (error < 0)
{
goto error_end;
}
con_handle_table[handle_id - 1] = con_handle;
con_handle->id = handle_id;
return con_handle;
error_end:
FREE_MEM (con_handle);
return NULL;
}
int
hm_con_handle_free (T_CON_HANDLE * con_handle)
{
if (con_handle == NULL)
{
return CCI_ER_CON_HANDLE;
}
con_handle_table[con_handle->id - 1] = NULL;
if (!IS_INVALID_SOCKET (con_handle->sock_fd))
{
CLOSE_SOCKET (con_handle->sock_fd);
con_handle->sock_fd = INVALID_SOCKET;
}
hm_req_handle_free_all (con_handle);
con_handle_content_free (con_handle);
FREE_MEM (con_handle);
return CCI_ER_NO_ERROR;
}
static int
hm_pool_add_node_to_list (T_REQ_HANDLE ** head, T_REQ_HANDLE ** tail,
T_REQ_HANDLE * target)
{
target->next = NULL;
target->prev = *tail;
if (*tail == NULL)
{
*head = target;
}
else
{
(*tail)->next = target;
}
*tail = target;
return CCI_ER_NO_ERROR;
}
static int
hm_pool_drop_node_from_list (T_REQ_HANDLE ** head, T_REQ_HANDLE ** tail,
T_REQ_HANDLE * target)
{
T_REQ_HANDLE *prev_target, *next_target;
assert (*head != NULL && *tail != NULL);
prev_target = target->prev;
next_target = target->next;
if (prev_target != NULL)
{
prev_target->next = next_target;
}
if (next_target != NULL)
{
next_target->prev = prev_target;
}
if (target == *head)
{
*head = next_target;
}
if (target == *tail)
{
*tail = prev_target;
}
return CCI_ER_NO_ERROR;
}
static int
hm_pool_move_node_from_use_to_lru (T_CON_HANDLE * connection,
int statement_id)
{
T_REQ_HANDLE *target;
statement_id = statement_id % CON_HANDLE_ID_FACTOR;
target = connection->req_handle_table[statement_id - 1];
assert (target != NULL);
/* cut from use */
hm_pool_drop_node_from_list (&connection->pool_use_head,
&connection->pool_use_tail, target);
/* add to lru */
hm_pool_add_node_to_list (&connection->pool_lru_head,
&connection->pool_lru_tail, target);
connection->open_prepared_statement_count++;
return CCI_ER_NO_ERROR;
}
static int
hm_pool_move_node_from_lru_to_use (T_CON_HANDLE * connection,
int statement_id)
{
T_REQ_HANDLE *target;
statement_id = statement_id % CON_HANDLE_ID_FACTOR;
target = connection->req_handle_table[statement_id - 1];
assert (target != NULL);
/* cut from lru */
hm_pool_drop_node_from_list (&connection->pool_lru_head,
&connection->pool_lru_tail, target);
connection->open_prepared_statement_count--;
/* add to use */
hm_pool_add_node_to_list (&connection->pool_use_head,
&connection->pool_use_tail, target);
return CCI_ER_NO_ERROR;
}
static T_REQ_HANDLE *
hm_pool_victimize_last_node_from_lru (T_CON_HANDLE * connection)
{
T_REQ_HANDLE *victim;
if (connection->pool_lru_head == NULL)
{
return NULL;
}
victim = connection->pool_lru_head;
hm_pool_drop_node_from_list (&connection->pool_lru_head,
&connection->pool_lru_tail, victim);
connection->open_prepared_statement_count--;
return victim;
}
int
hm_pool_restore_used_statements (T_CON_HANDLE * connection)
{
T_REQ_HANDLE *r;
r = connection->pool_use_head;
while (r != NULL)
{
int statement = r->req_handle_index;
req_handle_content_free_for_pool (r);
r = r->next;
hm_pool_move_node_from_use_to_lru (connection, statement);
}
connection->pool_use_head = NULL;
connection->pool_use_tail = NULL;
return CCI_ER_NO_ERROR;
}
int
hm_pool_add_statement_to_use (T_CON_HANDLE * connection, int statement_id)
{
T_REQ_HANDLE *statement;
statement_id = statement_id % CON_HANDLE_ID_FACTOR;
statement = connection->req_handle_table[statement_id - 1];
assert (statement != NULL);
hm_pool_add_node_to_list (&connection->pool_use_head,
&connection->pool_use_tail, statement);
return CCI_ER_NO_ERROR;
}
int
hm_req_handle_alloc (T_CON_HANDLE * con_handle,
T_REQ_HANDLE ** ret_req_handle)
{
int req_handle_id;
T_REQ_HANDLE *req_handle = NULL;
*ret_req_handle = NULL;
req_handle_id = new_req_handle_id (con_handle);
if (req_handle_id < 0)
{
return (req_handle_id);
}
req_handle = (T_REQ_HANDLE *) MALLOC (sizeof (T_REQ_HANDLE));
if (req_handle == NULL)
{
return CCI_ER_NO_MORE_MEMORY;
}
memset (req_handle, 0, sizeof (T_REQ_HANDLE));
req_handle->req_handle_index = req_handle_id;
req_handle->mapped_stmt_id = -1;
req_handle->fetch_size = 100;
req_handle->query_timeout = con_handle->query_timeout;
req_handle->shard_id = CCI_SHARD_ID_INVALID;
req_handle->is_fetch_completed = 0;
con_handle->req_handle_table[req_handle_id - 1] = req_handle;
++(con_handle->req_handle_count);
*ret_req_handle = req_handle;
return MAKE_REQ_ID (con_handle->id, req_handle_id);
}
int
hm_req_add_to_pool (T_CON_HANDLE * con, char *sql, int mapped_statement_id,
T_REQ_HANDLE * statement)
{
char *key;
int *data;
int error;
data = mht_get (con->stmt_pool, sql);
if (data != NULL)
{
hm_pool_drop_node_from_list (&con->pool_use_head, &con->pool_use_tail,
statement);
return CCI_ER_REQ_HANDLE;
}
if (HAS_REACHED_LIMIT_OPEN_STATEMENT (con))
{
T_REQ_HANDLE *victim = hm_pool_victimize_last_node_from_lru (con);
if (victim->handle_type == HANDLE_PREPARE
|| victim->handle_type == HANDLE_SCHEMA_INFO)
{
/* because the statement will be terminated by restarting cas
* all errors of qe_close_req_handle() are ignored
*/
qe_close_req_handle (victim, con);
}
mht_rem (con->stmt_pool, victim->sql_text, true, true);
hm_req_handle_free (con, victim);
}
key = strdup (sql);
if (key == NULL)
{
return CCI_ER_NO_MORE_MEMORY;
}
data = MALLOC (sizeof (int));
if (data == NULL)
{
FREE (key);
return CCI_ER_NO_MORE_MEMORY;
}
map_get_ots_value (mapped_statement_id, data, true);
if (!mht_put_data (con->stmt_pool, key, data))
{
FREE (key);
FREE (data);
return CCI_ER_NO_MORE_MEMORY;
}
hm_pool_move_node_from_use_to_lru (con, *data);
return CCI_ER_NO_ERROR;
}
int
hm_req_get_from_pool (T_CON_HANDLE * con, T_REQ_HANDLE ** req, char *sql)
{
int req_id, error;
void *data;
data = mht_rem (con->stmt_pool, sql, true, false);
if (data == NULL)
{
return CCI_ER_REQ_HANDLE;
}
req_id = *((int *) data);
FREE_MEM (data);
hm_pool_move_node_from_lru_to_use (con, req_id);
if (req != NULL)
{
*req = con->req_handle_table[GET_REQ_ID (req_id) - 1];
}
return req_id;
}
T_CCI_ERROR_CODE
hm_get_connection_by_resolved_id (int resolved_id, T_CON_HANDLE ** connection)
{
if (connection == NULL || con_handle_table[resolved_id - 1] == NULL)
{
return CCI_ER_CON_HANDLE;
}
*connection = con_handle_table[resolved_id - 1];
assert (*connection != NULL);
return CCI_ER_NO_ERROR;
}
static T_CCI_ERROR_CODE
hm_get_connection_internal (int mapped_id, T_CON_HANDLE ** connection,
bool force)
{
T_CCI_ERROR_CODE error;
int connection_id;
if (connection == NULL)
{
return CCI_ER_CON_HANDLE;
}
*connection = NULL;
error = map_get_otc_value (mapped_id, &connection_id, force);
if (error != CCI_ER_NO_ERROR || connection_id < 1
|| connection_id > MAX_CON_HANDLE)
{
return CCI_ER_CON_HANDLE;
}
*connection = con_handle_table[connection_id - 1];
if (*connection == NULL)
{
return CCI_ER_CON_HANDLE;
}
return CCI_ER_NO_ERROR;
}
T_CCI_ERROR_CODE
hm_get_connection_force (int mapped_id, T_CON_HANDLE ** connection)
{
return hm_get_connection_internal (mapped_id, connection, true);
}
T_CCI_ERROR_CODE
hm_get_connection (int mapped_id, T_CON_HANDLE ** connection)
{
return hm_get_connection_internal (mapped_id, connection, false);
}
T_CCI_ERROR_CODE
hm_get_statement (int mapped_id, T_CON_HANDLE ** connection,
T_REQ_HANDLE ** statement)
{
int connection_id;
int statement_id;
T_CON_HANDLE *conn;
if (connection != NULL)
{
*connection = NULL;
}
if (statement == NULL)
{
return CCI_ER_REQ_HANDLE;
}
*statement = NULL;
if (map_get_ots_value (mapped_id, &statement_id, false) != CCI_ER_NO_ERROR)
{
return CCI_ER_REQ_HANDLE;
}
connection_id = GET_CON_ID (statement_id);
statement_id = GET_REQ_ID (statement_id);
if (connection_id < 1 || statement_id < 1)
{
return CCI_ER_REQ_HANDLE;
}
conn = con_handle_table[connection_id - 1];
if (conn == NULL)
{
return CCI_ER_REQ_HANDLE;
}
if (statement_id > conn->max_req_handle)
{
return CCI_ER_REQ_HANDLE;
}
*statement = conn->req_handle_table[statement_id - 1];
if (connection != NULL)
{
*connection = conn;
}
assert (*statement != NULL);
return CCI_ER_NO_ERROR;
}
static T_CCI_ERROR_CODE
hm_release_connection_internal (int mapped_id,
T_CON_HANDLE ** connection,
bool delete_handle)
{
T_CCI_ERROR_CODE error;
error = map_close_otc (mapped_id);
if (error != CCI_ER_NO_ERROR)
{
return error;
}
if (delete_handle)
{
error = hm_con_handle_free (*connection);
}
*connection = NULL;
return error;
}
T_CCI_ERROR_CODE
hm_release_connection (int mapped_id, T_CON_HANDLE ** connection)
{
return hm_release_connection_internal (mapped_id, connection, false);
}
T_CCI_ERROR_CODE
hm_delete_connection (int mapped_id, T_CON_HANDLE ** connection)
{
return hm_release_connection_internal (mapped_id, connection, true);
}
T_CCI_ERROR_CODE
hm_release_statement (int mapped_id, T_CON_HANDLE ** connection,
T_REQ_HANDLE ** statement)
{
T_CCI_ERROR_CODE error;
error = map_close_ots (mapped_id);
if (error != CCI_ER_NO_ERROR)
{
return error;
}
*connection = NULL;
*statement = NULL;
return error;
}
void
hm_req_handle_free (T_CON_HANDLE * con_handle, T_REQ_HANDLE * req_handle)
{
con_handle->req_handle_table[req_handle->req_handle_index - 1] = NULL;
--(con_handle->req_handle_count);
req_handle_content_free (req_handle, 0);
FREE_MEM (req_handle);
}
void
hm_req_handle_free_all (T_CON_HANDLE * con_handle)
{
int i;
T_REQ_HANDLE *req_handle = NULL;
for (i = 0; i < con_handle->max_req_handle; i++)
{
req_handle = con_handle->req_handle_table[i];
if (req_handle == NULL)
continue;
req_handle_content_free (req_handle, 0);
FREE_MEM (req_handle);
con_handle->req_handle_table[i] = NULL;
--(con_handle->req_handle_count);
}
}
void
hm_req_handle_fetch_buf_free (T_REQ_HANDLE * req_handle)
{
int i, j, fetched_tuple;
if (req_handle->tuple_value)
{
fetched_tuple = req_handle->fetched_tuple_end -
req_handle->fetched_tuple_begin + 1;
for (i = 0; i < fetched_tuple; i++)
{
#if defined(WINDOWS)
for (j = 0; j < req_handle->num_col_info; j++)
{
FREE_MEM (req_handle->tuple_value[i].decoded_ptr[j]);
}
FREE_MEM (req_handle->tuple_value[i].decoded_ptr);
#endif
FREE_MEM (req_handle->tuple_value[i].column_ptr);
}
FREE_MEM (req_handle->tuple_value);
}
FREE_MEM (req_handle->msg_buf);
req_handle->fetched_tuple_begin = req_handle->fetched_tuple_end = 0;
req_handle->cur_fetch_tuple_index = -1;
req_handle->is_fetch_completed = 0;
}
int
hm_conv_value_buf_alloc (T_VALUE_BUF * val_buf, int size)
{
if (size <= val_buf->size)
return 0;
FREE_MEM (val_buf->data);
val_buf->size = 0;
val_buf->data = MALLOC (size);
if (val_buf->data == NULL)
return CCI_ER_NO_MORE_MEMORY;
val_buf->size = size;
return 0;
}
void
hm_invalidate_all_req_handle (T_CON_HANDLE * con_handle)
{
int i;
int count = 0;
T_REQ_HANDLE *curr_req_handle;
for (i = 0; i < con_handle->max_req_handle; ++i)
{
if (count == con_handle->req_handle_count)
{
break;
}
curr_req_handle = con_handle->req_handle_table[i];
if (curr_req_handle == NULL)
{
continue;
}
curr_req_handle->valid = 0;
curr_req_handle->shard_id = CCI_SHARD_ID_INVALID;
++count;
}
}
void
hm_conv_value_buf_clear (T_VALUE_BUF * val_buf)
{
FREE_MEM (val_buf->data);
val_buf->size = 0;
}
void
req_handle_col_info_free (T_REQ_HANDLE * req_handle)
{
int i;
if (req_handle->col_info)
{
for (i = 0; i < req_handle->num_col_info; i++)
{
FREE_MEM (req_handle->col_info[i].col_name);
FREE_MEM (req_handle->col_info[i].real_attr);
FREE_MEM (req_handle->col_info[i].class_name);
FREE_MEM (req_handle->col_info[i].default_value);
}
FREE_MEM (req_handle->col_info);
}
}
void
req_handle_content_free (T_REQ_HANDLE * req_handle, int reuse)
{
/*
For reusing invalidated req handle,
sql_text and prepare flag of req handle are needed.
So, they must not be freed.
*/
req_close_query_result (req_handle);
req_handle_col_info_free (req_handle);
req_handle->valid = 0;
req_handle->shard_id = CCI_SHARD_ID_INVALID;
req_handle->is_fetch_completed = 0;
if (!reuse)
{
FREE_MEM (req_handle->sql_text);
qe_bind_value_free (req_handle->num_bind, req_handle->bind_value);
FREE_MEM (req_handle->bind_mode);
FREE_MEM (req_handle->bind_value);
}
}
void
req_handle_content_free_for_pool (T_REQ_HANDLE * req_handle)
{
req_close_query_result (req_handle);
qe_bind_value_free (req_handle->num_bind, req_handle->bind_value);
}
int
req_close_query_result (T_REQ_HANDLE * req_handle)
{
assert (req_handle != NULL);
hm_req_handle_fetch_buf_free (req_handle);
hm_conv_value_buf_clear (&(req_handle->conv_value_buffer));
if (req_handle->num_query_res == 0 || req_handle->qr == NULL)
{
assert (req_handle->num_query_res == 0 && req_handle->qr == NULL);
return CCI_ER_RESULT_SET_CLOSED;
}
QUERY_RESULT_FREE (req_handle);
return CCI_ER_NO_ERROR;
}
static int
hm_find_host_status_index (unsigned char *ip_addr, int port)
{
int i, index = -1;
for (i = 0; i < host_status_count; i++)
{
if (memcmp (host_status[i].host.ip_addr, ip_addr, 4) == 0
&& host_status[i].host.port == port)
{
index = i;
break;
}
}
return index;
}
#if defined (ENABLE_UNUSED_FUNCTION)
int
hm_get_ha_connected_host (T_CON_HANDLE * con_handle)
{
int i, cur_host_id = -1;
MUTEX_LOCK (ha_status_mutex);
i = hm_find_ha_status_index (con_handle);
if (i >= 0)
{
cur_host_id = ha_status[i].cur_host_id;
}
MUTEX_UNLOCK (ha_status_mutex);
return cur_host_id;
}
#endif
bool
hm_is_host_reachable (T_CON_HANDLE * con_handle, int host_id)
{
int i;
unsigned char *ip_addr = con_handle->alter_hosts[host_id].ip_addr;
int port = con_handle->alter_hosts[host_id].port;
bool is_reachable = true;
i = hm_find_host_status_index (ip_addr, port);
if (i >= 0)
{
is_reachable = host_status[i].is_reachable;
}
return is_reachable;
}
void
hm_set_host_status (T_CON_HANDLE * con_handle, int host_id, bool is_reachable)
{
unsigned char *ip_addr = con_handle->alter_hosts[host_id].ip_addr;
int port = con_handle->alter_hosts[host_id].port;
hm_set_host_status_by_addr (ip_addr, port, is_reachable);
if (!is_reachable)
{
con_handle->last_failure_time = time (NULL);
}
}
T_BROKER_VERSION
hm_get_broker_version (T_CON_HANDLE * con_handle)
{
T_BROKER_VERSION version = 0;
if (con_handle->broker_info[BROKER_INFO_PROTO_VERSION]
& CAS_PROTO_INDICATOR)
{
version =
CAS_PROTO_UNPACK_NET_VER (con_handle->
broker_info[BROKER_INFO_PROTO_VERSION]);
}
else
{
version =
CAS_MAKE_VER (con_handle->broker_info[BROKER_INFO_MAJOR_VERSION],
con_handle->broker_info[BROKER_INFO_MINOR_VERSION],
con_handle->broker_info[BROKER_INFO_PATCH_VERSION]);
}
return version;
}
bool
hm_broker_understand_the_protocol (T_BROKER_VERSION broker_version, int require)
{
if (broker_version >= CAS_PROTO_MAKE_VER (require))
{
return true;
}
else
{
return false;
}
}
bool
hm_broker_match_the_protocol (T_BROKER_VERSION broker_version, int require)
{
if (broker_version == CAS_PROTO_MAKE_VER (require))
{
return true;
}
else
{
return false;
}
}
bool
hm_broker_reconnect_when_server_down (T_CON_HANDLE * con_handle)
{
char f = con_handle->broker_info[BROKER_INFO_FUNCTION_FLAG];
return (f & BROKER_RECONNECT_WHEN_SERVER_DOWN) ==
BROKER_RECONNECT_WHEN_SERVER_DOWN;
}
void
hm_check_rc_time (T_CON_HANDLE * con_handle)
{
time_t cur_time, failure_time;
if (IS_INVALID_SOCKET (con_handle->sock_fd))
{
return;
}
if (con_handle->alter_host_id > 0 && con_handle->rc_time > 0)
{
cur_time = time (NULL);
failure_time = con_handle->last_failure_time;
if (failure_time > 0 && con_handle->rc_time < (cur_time - failure_time))
{
if (hm_is_host_reachable (con_handle, 0))
{
con_handle->force_failback = true;
con_handle->last_failure_time = 0;
}
}
}
}
void
hm_create_health_check_th (void)
{
int rv;
pthread_attr_t thread_attr;
pthread_t health_check_th;
#if !defined(WINDOWS)
rv = pthread_attr_init (&thread_attr);
rv = pthread_attr_setdetachstate (&thread_attr, PTHREAD_CREATE_DETACHED);
rv = pthread_attr_setscope (&thread_attr, PTHREAD_SCOPE_SYSTEM);
#endif /* WINDOWS */
rv =
pthread_create (&health_check_th, &thread_attr, hm_thread_health_checker,
(void *) NULL);
}
/************************************************************************
* IMPLEMENTATION OF PRIVATE FUNCTIONS *
************************************************************************/
bool
hm_is_empty_session (T_CCI_SESSION_ID * id)
{
size_t i;
for (i = 0; i < DRIVER_SESSION_SIZE; i++)
{
if (id->id[i] != 0)
{
return false;
}
}
return true;
}
void
hm_make_empty_session (T_CCI_SESSION_ID * id)
{
memset (id->id, 0, DRIVER_SESSION_SIZE);
}
static int
init_con_handle (T_CON_HANDLE * con_handle, char *ip_str, int port,
char *db_name, char *db_user, char *db_passwd)
{
unsigned char ip_addr[4];
if (is_ip_str (ip_str))
{
ipstr2uchar (ip_str, ip_addr);
}
else
{
if (hostname2uchar (ip_str, ip_addr) < 0)
return CCI_ER_HOSTNAME;
}
memset (con_handle, 0, sizeof (T_CON_HANDLE));
memcpy (con_handle->ip_addr, ip_addr, 4);
con_handle->port = port;
ALLOC_COPY (con_handle->db_name, db_name);
ALLOC_COPY (con_handle->db_user, db_user);
ALLOC_COPY (con_handle->db_passwd, db_passwd);
snprintf (con_handle->url, SRV_CON_URL_SIZE,
"cci:cubrid:%d.%d.%d.%d:%d:%s:%s:********:",
ip_addr[0], ip_addr[1], ip_addr[2], ip_addr[3], port,
(db_name ? db_name : ""), (db_user ? db_user : ""));
con_handle->sock_fd = -1;
con_handle->isolation_level = TRAN_UNKNOWN_ISOLATION;
con_handle->lock_timeout = CCI_LOCK_TIMEOUT_DEFAULT;
con_handle->is_retry = 0;
con_handle->used = false;
con_handle->con_status = CCI_CON_STATUS_OUT_TRAN;
con_handle->autocommit_mode = CCI_AUTOCOMMIT_TRUE;
hm_make_empty_session (&con_handle->session_id);
con_handle->max_req_handle = REQ_HANDLE_ALLOC_SIZE;
con_handle->req_handle_table = (T_REQ_HANDLE **)
MALLOC (sizeof (T_REQ_HANDLE *) * con_handle->max_req_handle);
if (con_handle->req_handle_table == NULL)
{
FREE_MEM (con_handle->db_name);
FREE_MEM (con_handle->db_user);
FREE_MEM (con_handle->db_passwd);
memset (con_handle, 0, sizeof (T_CON_HANDLE));
return CCI_ER_NO_MORE_MEMORY;
}
con_handle->stmt_pool = mht_create (0, 1000, mht_5strhash,
mht_strcasecmpeq);
if (con_handle->stmt_pool == NULL)
{
FREE_MEM (con_handle->db_name);
FREE_MEM (con_handle->db_user);
FREE_MEM (con_handle->db_passwd);
FREE_MEM (con_handle->req_handle_table);
return CCI_ER_NO_MORE_MEMORY;
}
memset (con_handle->req_handle_table,
0, sizeof (T_REQ_HANDLE *) * con_handle->max_req_handle);
con_handle->req_handle_count = 0;
con_handle->open_prepared_statement_count = 0;
memset (con_handle->broker_info, 0, BROKER_INFO_SIZE);
con_handle->cas_info[CAS_INFO_STATUS] = CAS_INFO_STATUS_INACTIVE;
con_handle->cas_info[CAS_INFO_RESERVED_1] = CAS_INFO_RESERVED_DEFAULT;
con_handle->cas_info[CAS_INFO_RESERVED_2] = CAS_INFO_RESERVED_DEFAULT;
con_handle->cas_info[CAS_INFO_ADDITIONAL_FLAG] = CAS_INFO_RESERVED_DEFAULT;
memset (con_handle->alter_hosts, 0,
sizeof (T_ALTER_HOST) * ALTER_HOST_MAX_SIZE);
con_handle->load_balance = false;
con_handle->force_failback = false;
con_handle->alter_host_count = 0;
con_handle->alter_host_id = -1;
con_handle->rc_time = 600;
con_handle->last_failure_time = 0;
con_handle->datasource = NULL;
con_handle->login_timeout = 0;
con_handle->query_timeout = 0;
con_handle->disconnect_on_query_timeout = false;
con_handle->start_time.tv_sec = 0;
con_handle->start_time.tv_usec = 0;
con_handle->current_timeout = 0;
con_handle->log_filename = NULL;
con_handle->log_on_exception = false;
con_handle->log_slow_queries = false;
con_handle->slow_query_threshold_millis = 60000;
con_handle->log_trace_api = false;
con_handle->log_trace_network = false;
con_handle->deferred_max_close_handle_count =
DEFERRED_CLOSE_HANDLE_ALLOC_SIZE;
con_handle->deferred_close_handle_list =
(int *) MALLOC (sizeof (int) *
con_handle->deferred_max_close_handle_count);
con_handle->deferred_close_handle_count = 0;
con_handle->no_backslash_escapes = CCI_NO_BACKSLASH_ESCAPES_NOT_SET;
con_handle->last_insert_id = NULL;
con_handle->shard_id = CCI_SHARD_ID_INVALID;
return 0;
}
static int
new_con_handle_id ()
{
int i;
for (i = 0; i < MAX_CON_HANDLE; i++)
{
con_handle_current_index =
(con_handle_current_index + 1) % MAX_CON_HANDLE;
if (con_handle_table[con_handle_current_index] == NULL)
{
return (con_handle_current_index + 1);
}
}
return CCI_ER_ALLOC_CON_HANDLE;
}
static int
new_req_handle_id (T_CON_HANDLE * con_handle)
{
int i;
int handle_id = 0;
int new_max_req_handle;
T_REQ_HANDLE **new_req_handle_table = NULL;
for (i = 0; i < con_handle->max_req_handle; i++)
{
if (con_handle->req_handle_table[i] == NULL)
return (i + 1);
}
new_max_req_handle = con_handle->max_req_handle + REQ_HANDLE_ALLOC_SIZE;
new_req_handle_table = (T_REQ_HANDLE **)
REALLOC (con_handle->req_handle_table,
sizeof (T_REQ_HANDLE *) * new_max_req_handle);
if (new_req_handle_table == NULL)
return CCI_ER_NO_MORE_MEMORY;
handle_id = con_handle->max_req_handle + 1;
memset (new_req_handle_table + con_handle->max_req_handle, 0,
REQ_HANDLE_ALLOC_SIZE * sizeof (T_REQ_HANDLE *));
con_handle->max_req_handle = new_max_req_handle;
con_handle->req_handle_table = new_req_handle_table;
return handle_id;
}
static void
con_handle_content_free (T_CON_HANDLE * con_handle)
{
FREE_MEM (con_handle->db_name);
FREE_MEM (con_handle->db_user);
FREE_MEM (con_handle->db_passwd);
con_handle->url[0] = '\0';
FREE_MEM (con_handle->req_handle_table);
FREE_MEM (con_handle->deferred_close_handle_list);
FREE_MEM (con_handle->last_insert_id);
if (con_handle->stmt_pool != NULL)
{
mht_destroy (con_handle->stmt_pool, true, true);
}
FREE_MEM (con_handle->log_filename);
}
static void
ipstr2uchar (char *ip_str, unsigned char *ip_addr)
{
int ip0, ip1, ip2, ip3;
if (ip_str == NULL)
{
memset (ip_addr, 0, 4);
return;
}
ip0 = ip1 = ip2 = ip3 = 0;
sscanf (ip_str, "%d%*c%d%*c%d%*c%d", &ip0, &ip1, &ip2, &ip3);
ip_addr[0] = (unsigned char) ip0;
ip_addr[1] = (unsigned char) ip1;
ip_addr[2] = (unsigned char) ip2;
ip_addr[3] = (unsigned char) ip3;
}
static int
is_ip_str (char *ip_str)
{
char *p;
for (p = ip_str; *p; p++)
{
if ((*p >= '0' && *p <= '9') || (*p == '.'))
continue;
return 0;
}
return 1;
}
static int
hostname2uchar (char *hostname, unsigned char *ip_addr)
{
struct hostent *hp;
hp = gethostbyname (hostname);
if (hp == NULL)
return -1;
memcpy (ip_addr, hp->h_addr_list[0], 4);
return 0;
}
static void
hm_set_host_status_by_addr (unsigned char *ip_addr, int port,
bool is_reachable)
{
int i;
MUTEX_LOCK (host_status_mutex);
i = hm_find_host_status_index (ip_addr, port);
if (i < 0)
{
i = host_status_count;
memcpy (host_status[i].host.ip_addr, ip_addr, 4);
host_status[i].host.port = port;
host_status_count++;
}
host_status[i].is_reachable = is_reachable;
MUTEX_UNLOCK (host_status_mutex);
}
static THREAD_RET_T THREAD_CALLING_CONVENTION
hm_thread_health_checker (void *arg)
{
int i;
unsigned char *ip_addr;
int port;
time_t start_time;
time_t elapsed_time;
while (1)
{
start_time = time (NULL);
for (i = 0; i < host_status_count; i++)
{
ip_addr = host_status[i].host.ip_addr;
port = host_status[i].host.port;
if (!host_status[i].is_reachable && net_check_broker_alive
(ip_addr, port, BROKER_HEALTH_CHECK_TIMEOUT))
{
hm_set_host_status_by_addr (ip_addr, port, true);
}
}
elapsed_time = time (NULL) - start_time;
if (elapsed_time < MONITORING_INTERVAL)
{
SLEEP_MILISEC (MONITORING_INTERVAL - elapsed_time, 0);
}
}
return (THREAD_RET_T) 0;
}