diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index 81ed2f558d..15fcb32548 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -8,12 +8,21 @@ KS_BEGIN_EXTERN_C /** * */ -KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht); +KS_DECLARE(ks_status_t) ks_dht2_idle_expirations(ks_dht2_t *dht); + KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr); +KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); +KS_DECLARE(ks_status_t) ks_dht2_send_error(ks_dht2_t *dht, + ks_sockaddr_t *raddr, + uint8_t *transactionid, + ks_size_t transactionid_length, + long long errorcode, + const char *errorstr); KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); +KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index c82b1c49f1..d0460d5425 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -75,10 +75,13 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const ks_dht2_nodeid_raw_t ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); ks_dht2_register_type(dht, "q", ks_dht2_process_query); ks_dht2_register_type(dht, "r", ks_dht2_process_response); - // @todo ks_hash_insert the r/e callbacks into type registry + ks_dht2_register_type(dht, "e", ks_dht2_process_error); ks_hash_create(&dht->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); ks_dht2_register_query(dht, "ping", ks_dht2_process_query_ping); + + ks_hash_create(&dht->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); + // @todo register 301 error for internal get/put CAS hash mismatch retry handler dht->bind_ipv4 = KS_FALSE; dht->bind_ipv6 = KS_FALSE; @@ -111,7 +114,6 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht) dht->recv_buffer_length = 0; for (int32_t i = 0; i < dht->endpoints_size; ++i) { ks_dht2_endpoint_t *ep = dht->endpoints[i]; - //ks_hash_remove(dht->endpoints_hash, ep->addr.host); ks_dht2_endpoint_deinit(ep); ks_dht2_endpoint_free(ep); } @@ -139,6 +141,10 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht) ks_hash_destroy(&dht->registry_query); dht->registry_query = NULL; } + if (dht->registry_error) { + ks_hash_destroy(&dht->registry_error); + dht->registry_error = NULL; + } ks_dht2_nodeid_deinit(&dht->nodeid); @@ -191,6 +197,18 @@ KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; } +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_register_error(ks_dht2_t *dht, const char *value, ks_dht2_message_callback_t callback) +{ + ks_assert(dht); + ks_assert(value); + ks_assert(callback); + + return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; +} + /** * */ @@ -297,9 +315,114 @@ KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout) } } + ks_dht2_idle(dht); + return KS_STATUS_SUCCESS; } +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_maketid(ks_dht2_t *dht) +{ + ks_assert(dht); + + return KS_STATUS_SUCCESS; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht) +{ + ks_assert(dht); + + if (ks_dht2_idle_expirations(dht) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + + return KS_STATUS_SUCCESS; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_idle_expirations(ks_dht2_t *dht) +{ + ks_hash_iterator_t *it = NULL; + ks_time_t now = ks_time_now_sec(); + + ks_assert(dht); + + // @todo add delay between checking expirations, every 10 seconds? + + ks_hash_write_lock(dht->transactions_hash); + for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + const void *key = NULL; + ks_dht2_transaction_t *value = NULL; + ks_bool_t remove = KS_FALSE; + + ks_hash_this(it, &key, NULL, (void **)&value); + if (value->finished) { + remove = KS_TRUE; + } else if (value->expiration <= now) { + ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid); + remove = KS_TRUE; + } + if (remove) { + ks_hash_remove(dht->transactions_hash, (char *)key); + ks_pool_free(value->pool, value); + } + } + ks_hash_write_unlock(dht->transactions_hash); + + return KS_STATUS_SUCCESS; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr) +{ + ks_dht2_message_t message; + ks_dht2_message_callback_t callback; + ks_status_t ret = KS_STATUS_FAIL; + + ks_assert(dht); + ks_assert(raddr); + + ks_log(KS_LOG_DEBUG, "Received message from %s %d\n", raddr->host, raddr->port); + if (raddr->family != AF_INET && raddr->family != AF_INET6) { + ks_log(KS_LOG_DEBUG, "Message from unsupported address family\n"); + return KS_STATUS_FAIL; + } + + // @todo blacklist check for bad actor nodes + + if (ks_dht2_message_prealloc(&message, dht->pool) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + + if (ks_dht2_message_init(&message, KS_FALSE) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + + if (ks_dht2_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) { + goto done; + } + + if (!(callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) { + ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type); + } else { + ret = callback(dht, raddr, &message); + } + + done: + ks_dht2_message_deinit(&message); + + return ret; +} + /** * */ @@ -350,64 +473,43 @@ KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dh /** * */ -KS_DECLARE(ks_status_t) ks_dht2_maketid(ks_dht2_t *dht) +KS_DECLARE(ks_status_t) ks_dht2_send_error(ks_dht2_t *dht, + ks_sockaddr_t *raddr, + uint8_t *transactionid, + ks_size_t transactionid_length, + long long errorcode, + const char *errorstr) { - ks_assert(dht); - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht) -{ - ks_assert(dht); - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr) -{ - ks_dht2_message_t message; - ks_dht2_message_callback_t callback; + ks_dht2_message_t error; + struct bencode *e; ks_status_t ret = KS_STATUS_FAIL; ks_assert(dht); ks_assert(raddr); + ks_assert(transactionid); + ks_assert(errorstr); - ks_log(KS_LOG_DEBUG, "Received message from %s %d\n", raddr->host, raddr->port); - if (raddr->family != AF_INET && raddr->family != AF_INET6) { - ks_log(KS_LOG_DEBUG, "Message from unsupported address family\n"); + if (ks_dht2_message_prealloc(&error, dht->pool) != KS_STATUS_SUCCESS) { return KS_STATUS_FAIL; } - // @todo blacklist check for bad actor nodes - - if (ks_dht2_message_prealloc(&message, dht->pool) != KS_STATUS_SUCCESS) { + if (ks_dht2_message_init(&error, KS_TRUE) != KS_STATUS_SUCCESS) { return KS_STATUS_FAIL; } - if (ks_dht2_message_init(&message, KS_FALSE) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - - if (ks_dht2_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) { + if (ks_dht2_message_error(&error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) { goto done; } - - if (!(callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) { - ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type); - } else { - ret = callback(dht, raddr, &message); - } + + // @note e joins response.data and will be freed with it + ben_list_append(e, ben_int(errorcode)); + ben_list_append(e, ben_blob(errorstr, strlen(errorstr))); + + ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode); + ret = ks_dht2_send(dht, raddr, &error); done: - ks_dht2_message_deinit(&message); - + ks_dht2_message_deinit(&error); return ret; } @@ -497,13 +599,96 @@ KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t * if (!transaction) { ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid); + } else if (!ks_addr_cmp(raddr, &transaction->raddr)) { + ks_log(KS_LOG_DEBUG, + "Message response rejected due to spoofing from %s %d, expected %s %d\n", + raddr->host, + raddr->port, + transaction->raddr.host, + transaction->raddr.port); } else { + // @todo mark transaction for later removal + transaction->finished = KS_TRUE; ret = transaction->callback(dht, raddr, message); } return ret; } +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message) +{ + struct bencode *e; + struct bencode *ec; + struct bencode *es; + const char *et; + ks_size_t es_len; + long long errorcode; + char error[KS_DHT_MESSAGE_ERROR_MAX_SIZE]; + ks_dht2_transaction_t *transaction; + uint32_t *tid; + uint32_t transactionid; + ks_status_t ret = KS_STATUS_FAIL; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(message); + + // @todo start of ks_dht2_message_parse_error + e = ben_dict_get_by_str(message->data, "e"); + if (!e) { + ks_log(KS_LOG_DEBUG, "Message error missing required key 'e'\n"); + return KS_STATUS_FAIL; + } + ec = ben_list_get(e, 0); + es = ben_list_get(e, 1); + es_len = ben_str_len(es); + if (es_len >= KS_DHT_MESSAGE_ERROR_MAX_SIZE) { + ks_log(KS_LOG_DEBUG, "Message error value has an unexpectedly large size of %d\n", es_len); + return KS_STATUS_FAIL; + } + errorcode = ben_int_val(ec); + et = ben_str_val(es); + + memcpy(error, et, es_len); + error[es_len] = '\0'; + // todo end of ks_dht2_message_parse_error + + message->args = e; + + tid = (uint32_t *)message->transactionid; + transactionid = ntohl(*tid); + + transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED); + ks_hash_read_unlock(dht->transactions_hash); + + if (!transaction) { + ks_log(KS_LOG_DEBUG, "Message error rejected with unknown transaction id %d\n", transactionid); + } else if (!ks_addr_cmp(raddr, &transaction->raddr)) { + ks_log(KS_LOG_DEBUG, + "Message error rejected due to spoofing from %s %d, expected %s %d\n", + raddr->host, + raddr->port, + transaction->raddr.host, + transaction->raddr.port); + } else { + // @todo mark transaction for later removal + ks_dht2_message_callback_t callback; + transaction->finished = KS_TRUE; + + if ((callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED))) { + ret = callback(dht, raddr, message); + } else { + ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error); + ret = KS_STATUS_SUCCESS; + } + } + + return ret; +} + /** * */ @@ -585,7 +770,7 @@ KS_DECLARE(ks_status_t) ks_dht2_send_query_ping(ks_dht2_t *dht, ks_sockaddr_t *r goto done; } - if (ks_dht2_transaction_init(transaction, transactionid, ks_dht2_process_response_ping) != KS_STATUS_SUCCESS) { + if (ks_dht2_transaction_init(transaction, raddr, transactionid, ks_dht2_process_response_ping) != KS_STATUS_SUCCESS) { goto done; } @@ -634,6 +819,7 @@ KS_DECLARE(ks_status_t) ks_dht2_send_response_ping(ks_dht2_t *dht, ks_assert(dht); ks_assert(raddr); + ks_assert(transactionid); if (ks_dht2_message_prealloc(&response, dht->pool) != KS_STATUS_SUCCESS) { return KS_STATUS_FAIL; diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index b6ed56c3ae..8985387ef1 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -16,7 +16,9 @@ KS_BEGIN_EXTERN_C #define KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE 20 #define KS_DHT_MESSAGE_TYPE_MAX_SIZE 20 #define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20 +#define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256 +#define KS_DHT_TRANSACTION_EXPIRATION_DELAY 30 typedef struct ks_dht2_s ks_dht2_t; typedef struct ks_dht2_nodeid_s ks_dht2_nodeid_t; @@ -54,9 +56,11 @@ struct ks_dht2_endpoint_s { struct ks_dht2_transaction_s { ks_pool_t *pool; + ks_sockaddr_t raddr; uint32_t transactionid; ks_dht2_message_callback_t callback; - // @todo expiration data + ks_time_t expiration; + ks_bool_t finished; }; @@ -71,6 +75,7 @@ struct ks_dht2_s { ks_hash_t *registry_type; ks_hash_t *registry_query; + ks_hash_t *registry_error; ks_bool_t bind_ipv4; ks_bool_t bind_ipv6; @@ -137,6 +142,10 @@ KS_DECLARE(ks_status_t) ks_dht2_message_response(ks_dht2_message_t *message, uint8_t *transactionid, ks_size_t transactionid_length, struct bencode **args); +KS_DECLARE(ks_status_t) ks_dht2_message_error(ks_dht2_message_t *message, + uint8_t *transactionid, + ks_size_t transactionid_length, + struct bencode **args); /** * @@ -150,6 +159,7 @@ KS_DECLARE(ks_status_t) ks_dht2_transaction_prealloc(ks_dht2_transaction_t *tras KS_DECLARE(ks_status_t) ks_dht2_transaction_free(ks_dht2_transaction_t *transaction); KS_DECLARE(ks_status_t) ks_dht2_transaction_init(ks_dht2_transaction_t *transaction, + ks_sockaddr_t *raddr, uint32_t transactionid, ks_dht2_message_callback_t callback); KS_DECLARE(ks_status_t) ks_dht2_transaction_deinit(ks_dht2_transaction_t *transaction); diff --git a/libs/libks/src/dht/ks_dht_message.c b/libs/libks/src/dht/ks_dht_message.c index a84eb60e77..ed88a0fe63 100644 --- a/libs/libks/src/dht/ks_dht_message.c +++ b/libs/libks/src/dht/ks_dht_message.c @@ -207,6 +207,33 @@ KS_DECLARE(ks_status_t) ks_dht2_message_response(ks_dht2_message_t *message, return KS_STATUS_SUCCESS; } +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_message_error(ks_dht2_message_t *message, + uint8_t *transactionid, + ks_size_t transactionid_length, + struct bencode **args) +{ + struct bencode *e; + + ks_assert(message); + ks_assert(transactionid); + + ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length)); + ben_dict_set(message->data, ben_blob("y", 1), ben_blob("e", 1)); + + // @note r joins message->data and will be freed with it + e = ben_list(); + ben_dict_set(message->data, ben_blob("e", 1), e); + + if (args) { + *args = e; + } + + return KS_STATUS_SUCCESS; +} + /* For Emacs: * Local Variables: diff --git a/libs/libks/src/dht/ks_dht_transaction.c b/libs/libks/src/dht/ks_dht_transaction.c index 9cfe883433..3b62f8eaf3 100644 --- a/libs/libks/src/dht/ks_dht_transaction.c +++ b/libs/libks/src/dht/ks_dht_transaction.c @@ -48,15 +48,20 @@ KS_DECLARE(ks_status_t) ks_dht2_transaction_free(ks_dht2_transaction_t *transact * */ KS_DECLARE(ks_status_t) ks_dht2_transaction_init(ks_dht2_transaction_t *transaction, + ks_sockaddr_t *raddr, uint32_t transactionid, ks_dht2_message_callback_t callback) { ks_assert(transaction); + ks_assert(raddr); ks_assert(transaction->pool); ks_assert(callback); + transaction->raddr = *raddr; transaction->transactionid = transactionid; transaction->callback = callback; + transaction->expiration = ks_time_now_sec() + KS_DHT_TRANSACTION_EXPIRATION_DELAY; + transaction->finished = KS_FALSE; return KS_STATUS_SUCCESS; } @@ -68,8 +73,11 @@ KS_DECLARE(ks_status_t) ks_dht2_transaction_deinit(ks_dht2_transaction_t *transa { ks_assert(transaction); + transaction->raddr = (const ks_sockaddr_t){ 0 }; transaction->transactionid = 0; transaction->callback = NULL; + transaction->expiration = 0; + transaction->finished = KS_FALSE; return KS_STATUS_SUCCESS; } diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index 482449c689..f54b776511 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -10,6 +10,7 @@ ks_status_t dht_z_callback(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message { diag("dht_z_callback\n"); ok(message->transactionid[0] == '4' && message->transactionid[1] == '2'); + ks_dht2_send_error(dht, raddr, message->transactionid, message->transactionid_length, 201, "Generic test error"); return KS_STATUS_SUCCESS; } @@ -97,6 +98,8 @@ int main() { err = ks_dht2_process(dht1, &raddr); ok(err == KS_STATUS_SUCCESS); + err = ks_dht2_pulse(&dht2, 1000); + ok(err == KS_STATUS_SUCCESS); //buflen = strlen(TEST_DHT1_PROCESS_QUERY_PING_BUFFER); //memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_QUERY_PING_BUFFER, buflen);